diff --git a/BUILD.bazel b/BUILD.bazel index ab9cbf333..e0c6bf1fd 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -674,6 +674,7 @@ cc_library( deps = [ ":agent_manager_rpc", ":gcs", + ":gcs_client_lib", ":node_manager_fbs", ":node_manager_rpc", ":object_manager", @@ -682,7 +683,6 @@ cc_library( ":ray_common", ":ray_util", ":runtime_env_rpc", - ":service_based_gcs_client_lib", ":stats_lib", ":worker_rpc", "//src/ray/protobuf:common_cc_proto", @@ -755,11 +755,11 @@ cc_library( visibility = ["//visibility:public"], deps = [ ":gcs", + ":gcs_client_lib", ":plasma_client", ":ray_common", ":ray_util", ":raylet_client_lib", - ":service_based_gcs_client_lib", ":stats_lib", ":worker_rpc", "//src/ray/protobuf:worker_cc_proto", @@ -936,6 +936,7 @@ cc_test( tags = ["team:core"], deps = [ ":core_worker_lib", + ":gcs_client_lib", ":ray_mock", "@com_google_googletest//:gtest_main", ], @@ -1466,6 +1467,9 @@ cc_library( ], copts = COPTS, strip_include_prefix = "src", + deps = [ + ":gcs_client_lib", + ], ) cc_test( @@ -1525,6 +1529,7 @@ cc_test( copts = COPTS, tags = ["team:core"], deps = [ + ":gcs_client_lib", ":gcs_server_lib", ":gcs_server_test_util", ":gcs_test_util_lib", @@ -1717,17 +1722,15 @@ cc_test( ) cc_library( - name = "service_based_gcs_client_lib", - srcs = glob( - [ - "src/ray/gcs/gcs_client/service_based_*.cc", - ], - ), - hdrs = glob( - [ - "src/ray/gcs/gcs_client/service_based_*.h", - ], - ), + name = "gcs_client_lib", + srcs = [ + "src/ray/gcs/gcs_client/accessor.cc", + "src/ray/gcs/gcs_client/gcs_client.cc", + ], + hdrs = [ + "src/ray/gcs/gcs_client/accessor.h", + "src/ray/gcs/gcs_client/gcs_client.h", + ], copts = COPTS, strip_include_prefix = "src", deps = [ @@ -1755,7 +1758,7 @@ cc_library( strip_include_prefix = "src", visibility = ["//visibility:public"], deps = [ - ":service_based_gcs_client_lib", + ":gcs_client_lib", ], ) @@ -1776,10 +1779,10 @@ cc_test( ], tags = ["team:core"], deps = [ + ":gcs_client_lib", ":gcs_server_lib", ":gcs_test_util_lib", ":global_state_accessor_lib", - ":service_based_gcs_client_lib", "@com_google_googletest//:gtest_main", ], ) @@ -1787,7 +1790,7 @@ cc_test( cc_test( name = "gcs_server_test", srcs = [ - "src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc", + "src/ray/gcs/gcs_client/test/gcs_client_test.cc", ], args = [ "$(location redis-server)", @@ -1800,9 +1803,9 @@ cc_test( ], tags = ["team:core"], deps = [ + ":gcs_client_lib", ":gcs_server_lib", ":gcs_test_util_lib", - ":service_based_gcs_client_lib", "@com_google_googletest//:gtest_main", ], ) @@ -1816,6 +1819,7 @@ cc_test( copts = COPTS, tags = ["team:core"], deps = [ + ":gcs_client_lib", ":gcs_server_lib", ":gcs_server_test_util", ":gcs_test_util_lib", diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index ae20297a9..885aba55c 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -276,7 +276,7 @@ cdef extern from "ray/core_worker/common.h" nogil: const c_string &GetSpilledURL() const const CNodeID &GetSpilledNodeID() const -cdef extern from "ray/gcs/gcs_client.h" nogil: +cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": CGcsClientOptions(const c_string &ip, int port, const c_string &password) diff --git a/python/ray/includes/gcs_client.pxd b/python/ray/includes/gcs_client.pxd index b25ba6c1a..12031913b 100644 --- a/python/ray/includes/gcs_client.pxd +++ b/python/ray/includes/gcs_client.pxd @@ -12,7 +12,7 @@ from ray.includes.common cimport ( CGcsClientOptions, ) -cdef extern from "ray/gcs/accessor.h" nogil: +cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CInternalKVAccessor "ray::gcs::InternalKVAccessor": CRayStatus Put(const c_string &key, const c_string &value, @@ -23,7 +23,7 @@ cdef extern from "ray/gcs/accessor.h" nogil: CRayStatus Exists(const c_string &key, c_bool &exist) CRayStatus Keys(const c_string &key, c_vector[c_string]& results) -cdef extern from "ray/gcs/gcs_client.h" nogil: +cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: cdef cppclass CGcsClient "ray::gcs::GcsClient": CInternalKVAccessor &InternalKV() @@ -31,15 +31,15 @@ cdef extern from "ray/gcs/gcs_client.h" nogil: cdef extern from * namespace "_gcs_maker": """ #include "ray/common/asio/instrumented_io_context.h" - #include "ray/gcs/gcs_client/service_based_gcs_client.h" + #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/common/asio/instrumented_io_context.h" #include #include namespace _gcs_maker { - class RayletGcsClient : public ray::gcs::ServiceBasedGcsClient { + class RayletGcsClient : public ray::gcs::GcsClient { public: RayletGcsClient(const ray::gcs::GcsClientOptions &options) - : ray::gcs::ServiceBasedGcsClient(options), + : ray::gcs::GcsClient(options), work_(io_context_), thread_([this](){ io_context_.run(); diff --git a/src/mock/ray/core_worker/core_worker.h b/src/mock/ray/core_worker/core_worker.h index bf6d18848..4e304875c 100644 --- a/src/mock/ray/core_worker/core_worker.h +++ b/src/mock/ray/core_worker/core_worker.h @@ -13,7 +13,7 @@ // limitations under the License. #pragma once #include "gmock/gmock.h" -#include "mock/ray/gcs/gcs_client.h" +#include "mock/ray/gcs/gcs_client/gcs_client.h" namespace ray { namespace core { diff --git a/src/mock/ray/gcs/accessor.h b/src/mock/ray/gcs/gcs_client/accessor.h similarity index 99% rename from src/mock/ray/gcs/accessor.h rename to src/mock/ray/gcs/gcs_client/accessor.h index e00646635..287fb2fa8 100644 --- a/src/mock/ray/gcs/accessor.h +++ b/src/mock/ray/gcs/gcs_client/accessor.h @@ -13,6 +13,7 @@ // limitations under the License. #pragma once #include "gmock/gmock.h" +#include "ray/gcs/gcs_client/accessor.h" namespace ray { namespace gcs { diff --git a/src/mock/ray/gcs/gcs_client.h b/src/mock/ray/gcs/gcs_client/gcs_client.h similarity index 98% rename from src/mock/ray/gcs/gcs_client.h rename to src/mock/ray/gcs/gcs_client/gcs_client.h index a3c88dd11..396e7ea16 100644 --- a/src/mock/ray/gcs/gcs_client.h +++ b/src/mock/ray/gcs/gcs_client/gcs_client.h @@ -14,7 +14,7 @@ #pragma once -#include "mock/ray/gcs/accessor.h" +#include "mock/ray/gcs/gcs_client/accessor.h" namespace ray { namespace gcs { @@ -35,6 +35,7 @@ class MockGcsClient : public GcsClient { MOCK_METHOD(void, Disconnect, (), (override)); MOCK_METHOD((std::pair), GetGcsServerAddress, (), (override)); MOCK_METHOD(std::string, DebugString, (), (const, override)); + MockGcsClient() { mock_job_accessor = new MockJobInfoAccessor(); mock_actor_accessor = new MockActorInfoAccessor(); diff --git a/src/mock/ray/gcs/gcs_client/service_based_accessor.h b/src/mock/ray/gcs/gcs_client/service_based_accessor.h deleted file mode 100644 index 7a3e59701..000000000 --- a/src/mock/ray/gcs/gcs_client/service_based_accessor.h +++ /dev/null @@ -1,361 +0,0 @@ -// Copyright The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#pragma once - -#include "gmock/gmock.h" -namespace ray { -namespace gcs { - -class MockServiceBasedJobInfoAccessor : public ServiceBasedJobInfoAccessor { - public: - MOCK_METHOD(Status, AsyncAdd, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncMarkFinished, - (const JobID &job_id, const StatusCallback &callback), (override)); - MOCK_METHOD(Status, AsyncSubscribeAll, - ((const SubscribeCallback &subscribe), - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncGetAll, (const MultiItemCallback &callback), - (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); - MOCK_METHOD(Status, AsyncGetNextJobID, (const ItemCallback &callback), - (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedActorInfoAccessor : public ServiceBasedActorInfoAccessor { - public: - MOCK_METHOD(Status, AsyncGet, - (const ActorID &actor_id, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetAll, - (const MultiItemCallback &callback), (override)); - MOCK_METHOD(Status, AsyncGetByName, - (const std::string &name, const std::string &ray_namespace, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncListNamedActors, - (bool all_namespaces, const std::string &ray_namespace, - const ItemCallback> &callback), - (override)); - MOCK_METHOD(Status, AsyncRegisterActor, - (const TaskSpecification &task_spec, const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncCreateActor, - (const TaskSpecification &task_spec, const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncKillActor, - (const ActorID &actor_id, bool force_kill, bool no_restart, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncSubscribeAll, - ((const SubscribeCallback &subscribe), - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncSubscribe, - (const ActorID &actor_id, - (const SubscribeCallback &subscribe), - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncUnsubscribe, (const ActorID &actor_id), (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); - MOCK_METHOD(bool, IsActorUnsubscribed, (const ActorID &actor_id), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedNodeInfoAccessor : public ServiceBasedNodeInfoAccessor { - public: - MOCK_METHOD(Status, RegisterSelf, - (const rpc::GcsNodeInfo &local_node_info, const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, DrainSelf, (), (override)); - MOCK_METHOD(const NodeID &, GetSelfId, (), (const, override)); - MOCK_METHOD(const rpc::GcsNodeInfo &, GetSelfInfo, (), (const, override)); - MOCK_METHOD(Status, AsyncRegister, - (const rpc::GcsNodeInfo &node_info, const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncDrainNode, - (const NodeID &node_id, const StatusCallback &callback), (override)); - MOCK_METHOD(Status, AsyncGetAll, (const MultiItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncSubscribeToNodeChange, - ((const SubscribeCallback &subscribe), - const StatusCallback &done), - (override)); - MOCK_METHOD(boost::optional, Get, - (const NodeID &node_id, bool filter_dead_nodes), (const, override)); - MOCK_METHOD((const std::unordered_map &), GetAll, (), - (const, override)); - MOCK_METHOD(bool, IsRemoved, (const NodeID &node_id), (const, override)); - MOCK_METHOD(Status, AsyncReportHeartbeat, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); - MOCK_METHOD(Status, AsyncGetInternalConfig, - (const OptionalItemCallback &callback), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedNodeResourceInfoAccessor - : public ServiceBasedNodeResourceInfoAccessor { - public: - MOCK_METHOD(Status, AsyncGetResources, - (const NodeID &node_id, const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetAllAvailableResources, - (const MultiItemCallback &callback), (override)); - MOCK_METHOD(Status, AsyncUpdateResources, - (const NodeID &node_id, const ResourceMap &resources, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncDeleteResources, - (const NodeID &node_id, const std::vector &resource_names, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncSubscribeToResources, - (const ItemCallback &subscribe, - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncReportResourceUsage, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(void, AsyncReReportResourceUsage, (), (override)); - MOCK_METHOD(Status, AsyncGetAllResourceUsage, - (const ItemCallback &callback), (override)); - MOCK_METHOD(Status, AsyncSubscribeBatchedResourceUsage, - (const ItemCallback &subscribe, - const StatusCallback &done), - (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedTaskInfoAccessor : public ServiceBasedTaskInfoAccessor { - public: - MOCK_METHOD(Status, AsyncAdd, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGet, - (const TaskID &task_id, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncAddTaskLease, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetTaskLease, - (const TaskID &task_id, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD( - Status, AsyncSubscribeTaskLease, - (const TaskID &task_id, - (const SubscribeCallback> &subscribe), - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncUnsubscribeTaskLease, (const TaskID &task_id), (override)); - MOCK_METHOD(Status, AttemptTaskReconstruction, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); - MOCK_METHOD(bool, IsTaskLeaseUnsubscribed, (const TaskID &task_id), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedObjectInfoAccessor : public ServiceBasedObjectInfoAccessor { - public: - MOCK_METHOD(Status, AsyncGetLocations, - (const ObjectID &object_id, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetAll, - (const MultiItemCallback &callback), (override)); - MOCK_METHOD(Status, AsyncAddLocation, - (const ObjectID &object_id, const NodeID &node_id, size_t object_size, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncAddSpilledUrl, - (const ObjectID &object_id, const std::string &spilled_url, - const NodeID &node_id, size_t object_size, const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncRemoveLocation, - (const ObjectID &object_id, const NodeID &node_id, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncSubscribeToLocations, - (const ObjectID &object_id, - (const SubscribeCallback> - &subscribe), - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncUnsubscribeToLocations, (const ObjectID &object_id), - (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); - MOCK_METHOD(bool, IsObjectUnsubscribed, (const ObjectID &object_id), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedStatsInfoAccessor : public ServiceBasedStatsInfoAccessor { - public: - MOCK_METHOD(Status, AsyncAddProfileData, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetAll, - (const MultiItemCallback &callback), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedErrorInfoAccessor : public ServiceBasedErrorInfoAccessor { - public: - MOCK_METHOD(Status, AsyncReportJobError, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedWorkerInfoAccessor : public ServiceBasedWorkerInfoAccessor { - public: - MOCK_METHOD(Status, AsyncSubscribeToWorkerFailures, - (const ItemCallback &subscribe, - const StatusCallback &done), - (override)); - MOCK_METHOD(Status, AsyncReportWorkerFailure, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGet, - (const WorkerID &worker_id, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetAll, - (const MultiItemCallback &callback), (override)); - MOCK_METHOD(Status, AsyncAdd, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); - MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedPlacementGroupInfoAccessor - : public ServiceBasedPlacementGroupInfoAccessor { - public: - MOCK_METHOD(Status, AsyncCreatePlacementGroup, - (const PlacementGroupSpecification &placement_group_spec, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncRemovePlacementGroup, - (const PlacementGroupID &placement_group_id, - const StatusCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGet, - (const PlacementGroupID &placement_group_id, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetByName, - (const std::string &name, const std::string &ray_namespace, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncGetAll, - (const MultiItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncWaitUntilReady, - (const PlacementGroupID &placement_group_id, - const StatusCallback &callback), - (override)); -}; - -} // namespace gcs -} // namespace ray - -namespace ray { -namespace gcs { - -class MockServiceBasedInternalKVAccessor : public ServiceBasedInternalKVAccessor { - public: - MOCK_METHOD(Status, AsyncInternalKVKeys, - (const std::string &prefix, - const OptionalItemCallback> &callback), - (override)); - MOCK_METHOD(Status, AsyncInternalKVGet, - (const std::string &key, const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncInternalKVPut, - (const std::string &key, const std::string &value, bool overwrite, - const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncInternalKVExists, - (const std::string &key, const OptionalItemCallback &callback), - (override)); - MOCK_METHOD(Status, AsyncInternalKVDel, - (const std::string &key, const StatusCallback &callback), (override)); -}; - -} // namespace gcs -} // namespace ray diff --git a/src/ray/core_worker/actor_creator.h b/src/ray/core_worker/actor_creator.h index d9456d00b..eae87449d 100644 --- a/src/ray/core_worker/actor_creator.h +++ b/src/ray/core_worker/actor_creator.h @@ -14,8 +14,9 @@ #pragma once #include + #include "ray/common/ray_config.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" namespace ray { namespace core { diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 56bcc2780..d2b1316c0 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -19,7 +19,7 @@ #include "ray/core_worker/actor_handle.h" #include "ray/core_worker/reference_count.h" #include "ray/core_worker/transport/direct_actor_transport.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" namespace ray { namespace core { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 777f9bee4..a1ab654e1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -20,7 +20,7 @@ #include "ray/common/task/task_util.h" #include "ray/core_worker/context.h" #include "ray/core_worker/transport/direct_actor_transport.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/stats/stats.h" #include "ray/util/event.h" #include "ray/util/util.h" @@ -488,7 +488,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ options_.gcs_options.password_, /*enable_sync_conn=*/false, /*enable_async_conn=*/false, /*enable_subscribe_conn=*/true); - gcs_client_ = std::make_shared( + gcs_client_ = std::make_shared( gcs_options, [this](std::pair *address) { absl::MutexLock lock(&gcs_server_address_mutex_); if (gcs_server_address_.second != 0) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index f28596b8b..1b4984ba4 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -33,7 +33,7 @@ #include "ray/core_worker/store_provider/plasma_store_provider.h" #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/direct_task_transport.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/pubsub/publisher.h" #include "ray/pubsub/subscriber.h" #include "ray/raylet_client/raylet_client.h" diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index 03f13a030..42cbb8f9e 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -20,7 +20,7 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/asio/periodical_runner.h" #include "ray/core_worker/context.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" namespace ray { namespace core { diff --git a/src/ray/core_worker/test/actor_creator_test.cc b/src/ray/core_worker/test/actor_creator_test.cc index b7c5df893..263c12c6b 100644 --- a/src/ray/core_worker/test/actor_creator_test.cc +++ b/src/ray/core_worker/test/actor_creator_test.cc @@ -16,7 +16,7 @@ #include "gtest/gtest.h" #include "ray/core_worker/actor_creator.h" #include "ray/common/test_util.h" -#include "mock/ray/gcs/gcs_client.h" +#include "mock/ray/gcs/gcs_client/gcs_client.h" // clang-format on namespace ray { diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index d62cd8c9c..0e0c85d35 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -20,18 +20,17 @@ #include "ray/common/test_util.h" #include "ray/core_worker/reference_count.h" #include "ray/core_worker/transport/direct_actor_transport.h" -#include "ray/gcs/gcs_client/service_based_accessor.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/accessor.h" +#include "ray/gcs/gcs_client/gcs_client.h" namespace ray { namespace core { using ::testing::_; -class MockActorInfoAccessor : public gcs::ServiceBasedActorInfoAccessor { +class MockActorInfoAccessor : public gcs::ActorInfoAccessor { public: - MockActorInfoAccessor(gcs::ServiceBasedGcsClient *client) - : gcs::ServiceBasedActorInfoAccessor(client) {} + MockActorInfoAccessor(gcs::GcsClient *client) : gcs::ActorInfoAccessor(client) {} ~MockActorInfoAccessor() {} @@ -61,9 +60,9 @@ class MockActorInfoAccessor : public gcs::ServiceBasedActorInfoAccessor { callback_map_; }; -class MockGcsClient : public gcs::ServiceBasedGcsClient { +class MockGcsClient : public gcs::GcsClient { public: - MockGcsClient(gcs::GcsClientOptions options) : gcs::ServiceBasedGcsClient(options) {} + MockGcsClient(gcs::GcsClientOptions options) : gcs::GcsClient(options) {} void Init(MockActorInfoAccessor *actor_info_accessor) { actor_accessor_.reset(actor_info_accessor); diff --git a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc index 1f1bab697..28a4e28f1 100644 --- a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc @@ -18,7 +18,7 @@ #include "gtest/gtest.h" #include "ray/core_worker/actor_creator.h" #include "mock/ray/core_worker/task_manager.h" -#include "mock/ray/gcs/gcs_client.h" +#include "mock/ray/gcs/gcs_client/gcs_client.h" // clang-format on namespace ray { diff --git a/src/ray/gcs/accessor.cc b/src/ray/gcs/accessor.cc deleted file mode 100644 index d7ee82527..000000000 --- a/src/ray/gcs/accessor.cc +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/gcs/accessor.h" -#include - -namespace ray { -namespace gcs { - -Status InternalKVAccessor::Put(const std::string &key, const std::string &value, - bool overwrite, bool &added) { - std::promise ret_promise; - RAY_CHECK_OK(AsyncInternalKVPut( - key, value, overwrite, - [&ret_promise, &added](Status status, boost::optional added_num) { - added = static_cast(added_num.value_or(0)); - ret_promise.set_value(status); - })); - return ret_promise.get_future().get(); -} - -Status InternalKVAccessor::Keys(const std::string &prefix, - std::vector &value) { - std::promise ret_promise; - RAY_CHECK_OK( - AsyncInternalKVKeys(prefix, [&ret_promise, &value](Status status, auto &values) { - value = values.value_or(std::vector()); - ret_promise.set_value(status); - })); - return ret_promise.get_future().get(); -} - -Status InternalKVAccessor::Get(const std::string &key, std::string &value) { - std::promise ret_promise; - RAY_CHECK_OK(AsyncInternalKVGet(key, [&ret_promise, &value](Status status, auto &v) { - if (v) { - value = *v; - } - ret_promise.set_value(status); - })); - return ret_promise.get_future().get(); -} - -Status InternalKVAccessor::Del(const std::string &key) { - std::promise ret_promise; - RAY_CHECK_OK(AsyncInternalKVDel( - key, [&ret_promise](Status status) { ret_promise.set_value(status); })); - return ret_promise.get_future().get(); -} - -Status InternalKVAccessor::Exists(const std::string &key, bool &exist) { - std::promise ret_promise; - RAY_CHECK_OK(AsyncInternalKVExists( - key, [&ret_promise, &exist](Status status, const boost::optional &value) { - if (value) { - exist = *value; - } - ret_promise.set_value(status); - })); - return ret_promise.get_future().get(); -} - -} // namespace gcs -} // namespace ray diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/accessor.cc similarity index 85% rename from src/ray/gcs/gcs_client/service_based_accessor.cc rename to src/ray/gcs/gcs_client/accessor.cc index 513911859..415feacf5 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -12,21 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "ray/gcs/gcs_client/service_based_accessor.h" +#include "ray/gcs/gcs_client/accessor.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include + +#include "ray/gcs/gcs_client/gcs_client.h" namespace ray { namespace gcs { using namespace ray::rpc; -ServiceBasedJobInfoAccessor::ServiceBasedJobInfoAccessor( - ServiceBasedGcsClient *client_impl) - : client_impl_(client_impl) {} +JobInfoAccessor::JobInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedJobInfoAccessor::AsyncAdd( - const std::shared_ptr &data_ptr, const StatusCallback &callback) { +Status JobInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { JobID job_id = JobID::FromBinary(data_ptr->job_id()); RAY_LOG(DEBUG) << "Adding job, job id = " << job_id << ", driver pid = " << data_ptr->driver_pid(); @@ -45,8 +45,8 @@ Status ServiceBasedJobInfoAccessor::AsyncAdd( return Status::OK(); } -Status ServiceBasedJobInfoAccessor::AsyncMarkFinished(const JobID &job_id, - const StatusCallback &callback) { +Status JobInfoAccessor::AsyncMarkFinished(const JobID &job_id, + const StatusCallback &callback) { RAY_LOG(DEBUG) << "Marking job state, job id = " << job_id; rpc::MarkJobFinishedRequest request; request.set_job_id(job_id.Binary()); @@ -62,7 +62,7 @@ Status ServiceBasedJobInfoAccessor::AsyncMarkFinished(const JobID &job_id, return Status::OK(); } -Status ServiceBasedJobInfoAccessor::AsyncSubscribeAll( +Status JobInfoAccessor::AsyncSubscribeAll( const SubscribeCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); fetch_all_data_operation_ = [this, subscribe](const StatusCallback &done) { @@ -85,7 +85,7 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeAll( [this, done](const Status &status) { fetch_all_data_operation_(done); }); } -void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { +void JobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for job info."; auto fetch_all_done = [](const Status &status) { RAY_LOG(INFO) << "Finished fetching all job information from gcs server after gcs " @@ -108,7 +108,7 @@ void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restart } } -Status ServiceBasedJobInfoAccessor::AsyncGetAll( +Status JobInfoAccessor::AsyncGetAll( const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all job info."; RAY_CHECK(callback); @@ -122,8 +122,7 @@ Status ServiceBasedJobInfoAccessor::AsyncGetAll( return Status::OK(); } -Status ServiceBasedJobInfoAccessor::AsyncGetNextJobID( - const ItemCallback &callback) { +Status JobInfoAccessor::AsyncGetNextJobID(const ItemCallback &callback) { RAY_LOG(DEBUG) << "Getting next job id"; rpc::GetNextJobIDRequest request; client_impl_->GetGcsRpcClient().GetNextJobID( @@ -136,11 +135,10 @@ Status ServiceBasedJobInfoAccessor::AsyncGetNextJobID( return Status::OK(); } -ServiceBasedActorInfoAccessor::ServiceBasedActorInfoAccessor( - ServiceBasedGcsClient *client_impl) +ActorInfoAccessor::ActorInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedActorInfoAccessor::AsyncGet( +Status ActorInfoAccessor::AsyncGet( const ActorID &actor_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting actor info, actor id = " << actor_id << ", job id = " << actor_id.JobId(); @@ -161,7 +159,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGet( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncGetAll( +Status ActorInfoAccessor::AsyncGetAll( const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all actor info."; rpc::GetAllActorInfoRequest request; @@ -174,7 +172,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetAll( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncGetByName( +Status ActorInfoAccessor::AsyncGetByName( const std::string &name, const std::string &ray_namespace, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting actor info, name = " << name; @@ -195,7 +193,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetByName( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncListNamedActors( +Status ActorInfoAccessor::AsyncListNamedActors( bool all_namespaces, const std::string &ray_namespace, const ItemCallback> &callback) { RAY_LOG(DEBUG) << "Listing actors"; @@ -210,8 +208,8 @@ Status ServiceBasedActorInfoAccessor::AsyncListNamedActors( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncRegisterActor( - const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { +Status ActorInfoAccessor::AsyncRegisterActor(const ray::TaskSpecification &task_spec, + const ray::gcs::StatusCallback &callback) { RAY_CHECK(task_spec.IsActorCreationTask() && callback); rpc::RegisterActorRequest request; request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); @@ -227,9 +225,9 @@ Status ServiceBasedActorInfoAccessor::AsyncRegisterActor( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncKillActor( - const ActorID &actor_id, bool force_kill, bool no_restart, - const ray::gcs::StatusCallback &callback) { +Status ActorInfoAccessor::AsyncKillActor(const ActorID &actor_id, bool force_kill, + bool no_restart, + const ray::gcs::StatusCallback &callback) { rpc::KillActorViaGcsRequest request; request.set_actor_id(actor_id.Binary()); request.set_force_kill(force_kill); @@ -248,7 +246,7 @@ Status ServiceBasedActorInfoAccessor::AsyncKillActor( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncCreateActor( +Status ActorInfoAccessor::AsyncCreateActor( const ray::TaskSpecification &task_spec, const rpc::ClientCallback &callback) { RAY_CHECK(task_spec.IsActorCreationTask() && callback); @@ -265,7 +263,7 @@ Status ServiceBasedActorInfoAccessor::AsyncCreateActor( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll( +Status ActorInfoAccessor::AsyncSubscribeAll( const SubscribeCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); @@ -291,7 +289,7 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll( [this, done](const Status &status) { fetch_all_data_operation_(done); }); } -Status ServiceBasedActorInfoAccessor::AsyncSubscribe( +Status ActorInfoAccessor::AsyncSubscribe( const ActorID &actor_id, const SubscribeCallback &subscribe, const StatusCallback &done) { @@ -329,7 +327,7 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe( [fetch_data_operation, done](const Status &status) { fetch_data_operation(done); }); } -Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) { +Status ActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) { RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id << ", job id = " << actor_id.JobId(); auto status = client_impl_->GetGcsSubscriber().UnsubscribeActor(actor_id); @@ -341,7 +339,7 @@ Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) return status; } -void ServiceBasedActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { +void ActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for actor info."; auto fetch_all_done = [](const Status &status) { RAY_LOG(INFO) << "Finished fetching all actor information from gcs server after gcs " @@ -381,16 +379,14 @@ void ServiceBasedActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_resta } } -bool ServiceBasedActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) { +bool ActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) { return client_impl_->GetGcsSubscriber().IsActorUnsubscribed(actor_id); } -ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor( - ServiceBasedGcsClient *client_impl) - : client_impl_(client_impl) {} +NodeInfoAccessor::NodeInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info, - const StatusCallback &callback) { +Status NodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info, + const StatusCallback &callback) { auto node_id = NodeID::FromBinary(local_node_info.node_id()); RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id << ", address is = " << local_node_info.node_manager_address(); @@ -416,7 +412,7 @@ Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_ return Status::OK(); } -Status ServiceBasedNodeInfoAccessor::DrainSelf() { +Status NodeInfoAccessor::DrainSelf() { RAY_CHECK(!local_node_id_.IsNil()) << "This node is disconnected."; NodeID node_id = NodeID::FromBinary(local_node_info_.node_id()); RAY_LOG(INFO) << "Unregistering node info, node id = " << node_id; @@ -435,14 +431,12 @@ Status ServiceBasedNodeInfoAccessor::DrainSelf() { return Status::OK(); } -const NodeID &ServiceBasedNodeInfoAccessor::GetSelfId() const { return local_node_id_; } +const NodeID &NodeInfoAccessor::GetSelfId() const { return local_node_id_; } -const GcsNodeInfo &ServiceBasedNodeInfoAccessor::GetSelfInfo() const { - return local_node_info_; -} +const GcsNodeInfo &NodeInfoAccessor::GetSelfInfo() const { return local_node_info_; } -Status ServiceBasedNodeInfoAccessor::AsyncRegister(const rpc::GcsNodeInfo &node_info, - const StatusCallback &callback) { +Status NodeInfoAccessor::AsyncRegister(const rpc::GcsNodeInfo &node_info, + const StatusCallback &callback) { NodeID node_id = NodeID::FromBinary(node_info.node_id()); RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id; rpc::RegisterNodeRequest request; @@ -459,8 +453,8 @@ Status ServiceBasedNodeInfoAccessor::AsyncRegister(const rpc::GcsNodeInfo &node_ return Status::OK(); } -Status ServiceBasedNodeInfoAccessor::AsyncDrainNode(const NodeID &node_id, - const StatusCallback &callback) { +Status NodeInfoAccessor::AsyncDrainNode(const NodeID &node_id, + const StatusCallback &callback) { RAY_LOG(DEBUG) << "Draining node, node id = " << node_id; rpc::DrainNodeRequest request; auto draining_request = request.add_drain_node_data(); @@ -477,8 +471,7 @@ Status ServiceBasedNodeInfoAccessor::AsyncDrainNode(const NodeID &node_id, return Status::OK(); } -Status ServiceBasedNodeInfoAccessor::AsyncGetAll( - const MultiItemCallback &callback) { +Status NodeInfoAccessor::AsyncGetAll(const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting information of all nodes."; rpc::GetAllNodeInfoRequest request; client_impl_->GetGcsRpcClient().GetAllNodeInfo( @@ -495,7 +488,7 @@ Status ServiceBasedNodeInfoAccessor::AsyncGetAll( return Status::OK(); } -Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange( +Status NodeInfoAccessor::AsyncSubscribeToNodeChange( const SubscribeCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); RAY_CHECK(node_change_callback_ == nullptr); @@ -524,8 +517,8 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange( }); } -const GcsNodeInfo *ServiceBasedNodeInfoAccessor::Get(const NodeID &node_id, - bool filter_dead_nodes) const { +const GcsNodeInfo *NodeInfoAccessor::Get(const NodeID &node_id, + bool filter_dead_nodes) const { RAY_CHECK(!node_id.IsNil()); auto entry = node_cache_.find(node_id); if (entry != node_cache_.end()) { @@ -537,16 +530,15 @@ const GcsNodeInfo *ServiceBasedNodeInfoAccessor::Get(const NodeID &node_id, return nullptr; } -const std::unordered_map &ServiceBasedNodeInfoAccessor::GetAll() - const { +const std::unordered_map &NodeInfoAccessor::GetAll() const { return node_cache_; } -bool ServiceBasedNodeInfoAccessor::IsRemoved(const NodeID &node_id) const { +bool NodeInfoAccessor::IsRemoved(const NodeID &node_id) const { return removed_nodes_.count(node_id) == 1; } -Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( +Status NodeInfoAccessor::AsyncReportHeartbeat( const std::shared_ptr &data_ptr, const StatusCallback &callback) { rpc::ReportHeartbeatRequest request; @@ -560,7 +552,7 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( return Status::OK(); } -void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) { +void NodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) { NodeID node_id = NodeID::FromBinary(node_info.node_id()); bool is_alive = (node_info.state() == GcsNodeInfo::ALIVE); auto entry = node_cache_.find(node_id); @@ -613,7 +605,7 @@ void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_in } } -void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { +void NodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for node info."; auto fetch_all_done = [](const Status &status) { RAY_LOG(INFO) << "Finished fetching all node information from gcs server after gcs " @@ -637,7 +629,7 @@ void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar } } -Status ServiceBasedNodeInfoAccessor::AsyncGetInternalConfig( +Status NodeInfoAccessor::AsyncGetInternalConfig( const OptionalItemCallback &callback) { rpc::GetInternalConfigRequest request; client_impl_->GetGcsRpcClient().GetInternalConfig( @@ -653,11 +645,10 @@ Status ServiceBasedNodeInfoAccessor::AsyncGetInternalConfig( return Status::OK(); } -ServiceBasedNodeResourceInfoAccessor::ServiceBasedNodeResourceInfoAccessor( - ServiceBasedGcsClient *client_impl) +NodeResourceInfoAccessor::NodeResourceInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedNodeResourceInfoAccessor::AsyncGetResources( +Status NodeResourceInfoAccessor::AsyncGetResources( const NodeID &node_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting node resources, node id = " << node_id; rpc::GetResourcesRequest request; @@ -677,7 +668,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncGetResources( return Status::OK(); } -Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllAvailableResources( +Status NodeResourceInfoAccessor::AsyncGetAllAvailableResources( const MultiItemCallback &callback) { rpc::GetAllAvailableResourcesRequest request; client_impl_->GetGcsRpcClient().GetAllAvailableResources( @@ -692,8 +683,9 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllAvailableResources( return Status::OK(); } -Status ServiceBasedNodeResourceInfoAccessor::AsyncUpdateResources( - const NodeID &node_id, const ResourceMap &resources, const StatusCallback &callback) { +Status NodeResourceInfoAccessor::AsyncUpdateResources(const NodeID &node_id, + const ResourceMap &resources, + const StatusCallback &callback) { RAY_LOG(DEBUG) << "Updating node resources, node id = " << node_id; rpc::UpdateResourcesRequest request; request.set_node_id(node_id.Binary()); @@ -719,7 +711,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncUpdateResources( return Status::OK(); } -Status ServiceBasedNodeResourceInfoAccessor::AsyncReportResourceUsage( +Status NodeResourceInfoAccessor::AsyncReportResourceUsage( const std::shared_ptr &data_ptr, const StatusCallback &callback) { absl::MutexLock lock(&mutex_); last_resource_usage_->SetAvailableResources( @@ -739,7 +731,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncReportResourceUsage( return Status::OK(); } -void ServiceBasedNodeResourceInfoAccessor::AsyncReReportResourceUsage() { +void NodeResourceInfoAccessor::AsyncReReportResourceUsage() { absl::MutexLock lock(&mutex_); if (cached_resource_usage_.has_resources()) { RAY_LOG(INFO) << "Rereport resource usage."; @@ -750,7 +742,7 @@ void ServiceBasedNodeResourceInfoAccessor::AsyncReReportResourceUsage() { } } -void ServiceBasedNodeResourceInfoAccessor::FillResourceUsageRequest( +void NodeResourceInfoAccessor::FillResourceUsageRequest( rpc::ReportResourceUsageRequest &resources) { SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage()); @@ -778,7 +770,7 @@ void ServiceBasedNodeResourceInfoAccessor::FillResourceUsageRequest( } } -Status ServiceBasedNodeResourceInfoAccessor::AsyncSubscribeBatchedResourceUsage( +Status NodeResourceInfoAccessor::AsyncSubscribeBatchedResourceUsage( const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); @@ -789,7 +781,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncSubscribeBatchedResourceUsage( return subscribe_batch_resource_usage_operation_(done); } -Status ServiceBasedNodeResourceInfoAccessor::AsyncDeleteResources( +Status NodeResourceInfoAccessor::AsyncDeleteResources( const NodeID &node_id, const std::vector &resource_names, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Deleting node resources, node id = " << node_id; @@ -817,7 +809,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncDeleteResources( return Status::OK(); } -Status ServiceBasedNodeResourceInfoAccessor::AsyncSubscribeToResources( +Status NodeResourceInfoAccessor::AsyncSubscribeToResources( const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); subscribe_resource_operation_ = [this, subscribe](const StatusCallback &done) { @@ -826,8 +818,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncSubscribeToResources( return subscribe_resource_operation_(done); } -void ServiceBasedNodeResourceInfoAccessor::AsyncResubscribe( - bool is_pubsub_server_restarted) { +void NodeResourceInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info."; // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub // server. @@ -841,7 +832,7 @@ void ServiceBasedNodeResourceInfoAccessor::AsyncResubscribe( } } -Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage( +Status NodeResourceInfoAccessor::AsyncGetAllResourceUsage( const ItemCallback &callback) { rpc::GetAllResourceUsageRequest request; client_impl_->GetGcsRpcClient().GetAllResourceUsage( @@ -854,12 +845,10 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage( return Status::OK(); } -ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( - ServiceBasedGcsClient *client_impl) - : client_impl_(client_impl) {} +TaskInfoAccessor::TaskInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedTaskInfoAccessor::AsyncAdd( - const std::shared_ptr &data_ptr, const StatusCallback &callback) { +Status TaskInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { TaskID task_id = TaskID::FromBinary(data_ptr->task().task_spec().task_id()); JobID job_id = JobID::FromBinary(data_ptr->task().task_spec().job_id()); RAY_LOG(DEBUG) << "Adding task, task id = " << task_id << ", job id = " << job_id; @@ -877,7 +866,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncAdd( return Status::OK(); } -Status ServiceBasedTaskInfoAccessor::AsyncGet( +Status TaskInfoAccessor::AsyncGet( const TaskID &task_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting task, task id = " << task_id << ", job id = " << task_id.JobId(); @@ -896,7 +885,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet( return Status::OK(); } -Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease( +Status TaskInfoAccessor::AsyncAddTaskLease( const std::shared_ptr &data_ptr, const StatusCallback &callback) { TaskID task_id = TaskID::FromBinary(data_ptr->task_id()); NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id()); @@ -917,7 +906,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease( return Status::OK(); } -Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease( +Status TaskInfoAccessor::AsyncGetTaskLease( const TaskID &task_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id << ", job id = " << task_id.JobId(); @@ -937,7 +926,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease( return Status::OK(); } -Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease( +Status TaskInfoAccessor::AsyncSubscribeTaskLease( const TaskID &task_id, const SubscribeCallback> &subscribe, const StatusCallback &done) { @@ -970,7 +959,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease( [fetch_data_operation, done](const Status &status) { fetch_data_operation(done); }); } -Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) { +Status TaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) { RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id << ", job id = " << task_id.JobId(); auto status = client_impl_->GetGcsSubscriber().UnsubscribeTaskLease(task_id); @@ -981,7 +970,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &tas return status; } -Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction( +Status TaskInfoAccessor::AttemptTaskReconstruction( const std::shared_ptr &data_ptr, const StatusCallback &callback) { auto num_reconstructions = data_ptr->num_reconstructions(); @@ -1007,7 +996,7 @@ Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction( return Status::OK(); } -void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { +void TaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for task info."; // If only the GCS sever has restarted, we only need to fetch data from the GCS server. // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub @@ -1026,15 +1015,14 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar } } -bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) { +bool TaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) { return client_impl_->GetGcsSubscriber().IsTaskLeaseUnsubscribed(task_id); } -ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor( - ServiceBasedGcsClient *client_impl) +ObjectInfoAccessor::ObjectInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedObjectInfoAccessor::AsyncGetLocations( +Status ObjectInfoAccessor::AsyncGetLocations( const ObjectID &object_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id @@ -1052,7 +1040,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncGetLocations( return Status::OK(); } -Status ServiceBasedObjectInfoAccessor::AsyncGetAll( +Status ObjectInfoAccessor::AsyncGetAll( const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all object locations."; rpc::GetAllObjectLocationsRequest request; @@ -1070,10 +1058,9 @@ Status ServiceBasedObjectInfoAccessor::AsyncGetAll( return Status::OK(); } -Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_id, - const NodeID &node_id, - size_t object_size, - const StatusCallback &callback) { +Status ObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_id, + const NodeID &node_id, size_t object_size, + const StatusCallback &callback) { RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id << ", node id = " << node_id << ", job id = " << object_id.TaskId().JobId(); @@ -1102,9 +1089,11 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i return Status::OK(); } -Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( - const ObjectID &object_id, const std::string &spilled_url, - const NodeID &spilled_node_id, size_t object_size, const StatusCallback &callback) { +Status ObjectInfoAccessor::AsyncAddSpilledUrl(const ObjectID &object_id, + const std::string &spilled_url, + const NodeID &spilled_node_id, + size_t object_size, + const StatusCallback &callback) { RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id << ", spilled_url = " << spilled_url << ", job id = " << object_id.TaskId().JobId(); @@ -1130,8 +1119,9 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( return Status::OK(); } -Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation( - const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) { +Status ObjectInfoAccessor::AsyncRemoveLocation(const ObjectID &object_id, + const NodeID &node_id, + const StatusCallback &callback) { RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id << ", node id = " << node_id << ", job id = " << object_id.TaskId().JobId(); @@ -1158,7 +1148,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation( return Status::OK(); } -Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( +Status ObjectInfoAccessor::AsyncSubscribeToLocations( const ObjectID &object_id, const SubscribeCallback> &subscribe, const StatusCallback &done) { @@ -1211,7 +1201,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( [fetch_data_operation, done](const Status &status) { fetch_data_operation(done); }); } -void ServiceBasedObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { +void ObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for object locations."; // If only the GCS sever has restarted, we only need to fetch data from the GCS server. // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub @@ -1237,8 +1227,7 @@ void ServiceBasedObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_rest } } -Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations( - const ObjectID &object_id) { +Status ObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id << ", job id = " << object_id.TaskId().JobId(); auto status = client_impl_->GetGcsSubscriber().UnsubscribeObject(object_id); @@ -1250,15 +1239,14 @@ Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations( return status; } -bool ServiceBasedObjectInfoAccessor::IsObjectUnsubscribed(const ObjectID &object_id) { +bool ObjectInfoAccessor::IsObjectUnsubscribed(const ObjectID &object_id) { return client_impl_->GetGcsSubscriber().IsObjectUnsubscribed(object_id); } -ServiceBasedStatsInfoAccessor::ServiceBasedStatsInfoAccessor( - ServiceBasedGcsClient *client_impl) +StatsInfoAccessor::StatsInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedStatsInfoAccessor::AsyncAddProfileData( +Status StatsInfoAccessor::AsyncAddProfileData( const std::shared_ptr &data_ptr, const StatusCallback &callback) { NodeID node_id = NodeID::FromBinary(data_ptr->component_id()); @@ -1279,7 +1267,7 @@ Status ServiceBasedStatsInfoAccessor::AsyncAddProfileData( return Status::OK(); } -Status ServiceBasedStatsInfoAccessor::AsyncGetAll( +Status StatsInfoAccessor::AsyncGetAll( const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all profile info."; RAY_CHECK(callback); @@ -1294,11 +1282,10 @@ Status ServiceBasedStatsInfoAccessor::AsyncGetAll( return Status::OK(); } -ServiceBasedErrorInfoAccessor::ServiceBasedErrorInfoAccessor( - ServiceBasedGcsClient *client_impl) +ErrorInfoAccessor::ErrorInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedErrorInfoAccessor::AsyncReportJobError( +Status ErrorInfoAccessor::AsyncReportJobError( const std::shared_ptr &data_ptr, const StatusCallback &callback) { auto job_id = JobID::FromBinary(data_ptr->job_id()); @@ -1316,11 +1303,10 @@ Status ServiceBasedErrorInfoAccessor::AsyncReportJobError( return Status::OK(); } -ServiceBasedWorkerInfoAccessor::ServiceBasedWorkerInfoAccessor( - ServiceBasedGcsClient *client_impl) +WorkerInfoAccessor::WorkerInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures( +Status WorkerInfoAccessor::AsyncSubscribeToWorkerFailures( const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); subscribe_operation_ = [this, subscribe](const StatusCallback &done) { @@ -1329,7 +1315,7 @@ Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures( return subscribe_operation_(done); } -void ServiceBasedWorkerInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { +void WorkerInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for worker failures."; // If the pub-sub server has restarted, we need to resubscribe to the pub-sub server. if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) { @@ -1337,7 +1323,7 @@ void ServiceBasedWorkerInfoAccessor::AsyncResubscribe(bool is_pubsub_server_rest } } -Status ServiceBasedWorkerInfoAccessor::AsyncReportWorkerFailure( +Status WorkerInfoAccessor::AsyncReportWorkerFailure( const std::shared_ptr &data_ptr, const StatusCallback &callback) { rpc::Address worker_address = data_ptr->worker_address(); @@ -1356,7 +1342,7 @@ Status ServiceBasedWorkerInfoAccessor::AsyncReportWorkerFailure( return Status::OK(); } -Status ServiceBasedWorkerInfoAccessor::AsyncGet( +Status WorkerInfoAccessor::AsyncGet( const WorkerID &worker_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting worker info, worker id = " << worker_id; @@ -1375,7 +1361,7 @@ Status ServiceBasedWorkerInfoAccessor::AsyncGet( return Status::OK(); } -Status ServiceBasedWorkerInfoAccessor::AsyncGetAll( +Status WorkerInfoAccessor::AsyncGetAll( const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all worker info."; rpc::GetAllWorkerInfoRequest request; @@ -1388,9 +1374,8 @@ Status ServiceBasedWorkerInfoAccessor::AsyncGetAll( return Status::OK(); } -Status ServiceBasedWorkerInfoAccessor::AsyncAdd( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) { +Status WorkerInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { rpc::AddWorkerInfoRequest request; request.mutable_worker_data()->CopyFrom(*data_ptr); client_impl_->GetGcsRpcClient().AddWorkerInfo( @@ -1402,11 +1387,10 @@ Status ServiceBasedWorkerInfoAccessor::AsyncAdd( return Status::OK(); } -ServiceBasedPlacementGroupInfoAccessor::ServiceBasedPlacementGroupInfoAccessor( - ServiceBasedGcsClient *client_impl) +PlacementGroupInfoAccessor::PlacementGroupInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( +Status PlacementGroupInfoAccessor::AsyncCreatePlacementGroup( const ray::PlacementGroupSpecification &placement_group_spec, const StatusCallback &callback) { rpc::CreatePlacementGroupRequest request; @@ -1434,7 +1418,7 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( return Status::OK(); } -Status ServiceBasedPlacementGroupInfoAccessor::AsyncRemovePlacementGroup( +Status PlacementGroupInfoAccessor::AsyncRemovePlacementGroup( const ray::PlacementGroupID &placement_group_id, const StatusCallback &callback) { rpc::RemovePlacementGroupRequest request; request.set_placement_group_id(placement_group_id.Binary()); @@ -1448,7 +1432,7 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncRemovePlacementGroup( return Status::OK(); } -Status ServiceBasedPlacementGroupInfoAccessor::AsyncGet( +Status PlacementGroupInfoAccessor::AsyncGet( const PlacementGroupID &placement_group_id, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting placement group info, placement group id = " @@ -1469,7 +1453,7 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncGet( return Status::OK(); } -Status ServiceBasedPlacementGroupInfoAccessor::AsyncGetByName( +Status PlacementGroupInfoAccessor::AsyncGetByName( const std::string &name, const std::string &ray_namespace, const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting named placement group info, name = " << name; @@ -1490,7 +1474,7 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncGetByName( return Status::OK(); } -Status ServiceBasedPlacementGroupInfoAccessor::AsyncGetAll( +Status PlacementGroupInfoAccessor::AsyncGetAll( const MultiItemCallback &callback) { RAY_LOG(DEBUG) << "Getting all placement group info."; rpc::GetAllPlacementGroupRequest request; @@ -1504,7 +1488,7 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncGetAll( return Status::OK(); } -Status ServiceBasedPlacementGroupInfoAccessor::AsyncWaitUntilReady( +Status PlacementGroupInfoAccessor::AsyncWaitUntilReady( const PlacementGroupID &placement_group_id, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Waiting for placement group until ready, placement group id = " << placement_group_id; @@ -1522,11 +1506,10 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncWaitUntilReady( return Status::OK(); } -ServiceBasedInternalKVAccessor::ServiceBasedInternalKVAccessor( - ServiceBasedGcsClient *client_impl) +InternalKVAccessor::InternalKVAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status ServiceBasedInternalKVAccessor::AsyncInternalKVGet( +Status InternalKVAccessor::AsyncInternalKVGet( const std::string &key, const OptionalItemCallback &callback) { rpc::InternalKVGetRequest req; req.set_key(key); @@ -1541,9 +1524,9 @@ Status ServiceBasedInternalKVAccessor::AsyncInternalKVGet( return Status::OK(); } -Status ServiceBasedInternalKVAccessor::AsyncInternalKVPut( - const std::string &key, const std::string &value, bool overwrite, - const OptionalItemCallback &callback) { +Status InternalKVAccessor::AsyncInternalKVPut(const std::string &key, + const std::string &value, bool overwrite, + const OptionalItemCallback &callback) { rpc::InternalKVPutRequest req; req.set_key(key); req.set_value(value); @@ -1555,7 +1538,7 @@ Status ServiceBasedInternalKVAccessor::AsyncInternalKVPut( return Status::OK(); } -Status ServiceBasedInternalKVAccessor::AsyncInternalKVExists( +Status InternalKVAccessor::AsyncInternalKVExists( const std::string &key, const OptionalItemCallback &callback) { rpc::InternalKVExistsRequest req; req.set_key(key); @@ -1566,8 +1549,8 @@ Status ServiceBasedInternalKVAccessor::AsyncInternalKVExists( return Status::OK(); } -Status ServiceBasedInternalKVAccessor::AsyncInternalKVDel( - const std::string &key, const StatusCallback &callback) { +Status InternalKVAccessor::AsyncInternalKVDel(const std::string &key, + const StatusCallback &callback) { rpc::InternalKVDelRequest req; req.set_key(key); client_impl_->GetGcsRpcClient().InternalKVDel( @@ -1577,7 +1560,7 @@ Status ServiceBasedInternalKVAccessor::AsyncInternalKVDel( return Status::OK(); } -Status ServiceBasedInternalKVAccessor::AsyncInternalKVKeys( +Status InternalKVAccessor::AsyncInternalKVKeys( const std::string &prefix, const OptionalItemCallback> &callback) { rpc::InternalKVKeysRequest req; @@ -1593,5 +1576,58 @@ Status ServiceBasedInternalKVAccessor::AsyncInternalKVKeys( return Status::OK(); } +Status InternalKVAccessor::Put(const std::string &key, const std::string &value, + bool overwrite, bool &added) { + std::promise ret_promise; + RAY_CHECK_OK(AsyncInternalKVPut( + key, value, overwrite, + [&ret_promise, &added](Status status, boost::optional added_num) { + added = static_cast(added_num.value_or(0)); + ret_promise.set_value(status); + })); + return ret_promise.get_future().get(); +} + +Status InternalKVAccessor::Keys(const std::string &prefix, + std::vector &value) { + std::promise ret_promise; + RAY_CHECK_OK( + AsyncInternalKVKeys(prefix, [&ret_promise, &value](Status status, auto &values) { + value = values.value_or(std::vector()); + ret_promise.set_value(status); + })); + return ret_promise.get_future().get(); +} + +Status InternalKVAccessor::Get(const std::string &key, std::string &value) { + std::promise ret_promise; + RAY_CHECK_OK(AsyncInternalKVGet(key, [&ret_promise, &value](Status status, auto &v) { + if (v) { + value = *v; + } + ret_promise.set_value(status); + })); + return ret_promise.get_future().get(); +} + +Status InternalKVAccessor::Del(const std::string &key) { + std::promise ret_promise; + RAY_CHECK_OK(AsyncInternalKVDel( + key, [&ret_promise](Status status) { ret_promise.set_value(status); })); + return ret_promise.get_future().get(); +} + +Status InternalKVAccessor::Exists(const std::string &key, bool &exist) { + std::promise ret_promise; + RAY_CHECK_OK(AsyncInternalKVExists( + key, [&ret_promise, &exist](Status status, const boost::optional &value) { + if (value) { + exist = *value; + } + ret_promise.set_value(status); + })); + return ret_promise.get_future().get(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/gcs_client/accessor.h similarity index 79% rename from src/ray/gcs/accessor.h rename to src/ray/gcs/gcs_client/accessor.h index fe84e3cd2..50eec25c6 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -21,6 +21,7 @@ #include "ray/gcs/callback.h" #include "ray/gcs/entry_change_notification.h" #include "ray/rpc/client_call.h" +#include "ray/util/sequencer.h" #include "src/ray/protobuf/gcs.pb.h" #include "src/ray/protobuf/gcs_service.pb.h" @@ -28,27 +29,34 @@ namespace ray { namespace gcs { +using SubscribeOperation = std::function; + +using FetchDataOperation = std::function; + +class GcsClient; + /// \class ActorInfoAccessor /// `ActorInfoAccessor` is a sub-interface of `GcsClient`. /// This class includes all the methods that are related to accessing /// actor information in the GCS. class ActorInfoAccessor { public: + ActorInfoAccessor() = default; + explicit ActorInfoAccessor(GcsClient *client_impl); virtual ~ActorInfoAccessor() = default; - /// Get actor specification from GCS asynchronously. /// /// \param actor_id The ID of actor to look up in the GCS. /// \param callback Callback that will be called after lookup finishes. /// \return Status virtual Status AsyncGet(const ActorID &actor_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Get all actor specification from the GCS asynchronously. /// /// \param callback Callback that will be called after lookup finishes. /// \return Status - virtual Status AsyncGetAll(const MultiItemCallback &callback) = 0; + virtual Status AsyncGetAll(const MultiItemCallback &callback); /// Get actor specification for a named actor from the GCS asynchronously. /// @@ -58,7 +66,7 @@ class ActorInfoAccessor { /// \return Status virtual Status AsyncGetByName( const std::string &name, const std::string &ray_namespace, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// List all named actors from the GCS asynchronously. /// @@ -68,7 +76,7 @@ class ActorInfoAccessor { /// \return Status virtual Status AsyncListNamedActors( bool all_namespaces, const std::string &ray_namespace, - const ItemCallback> &callback) = 0; + const ItemCallback> &callback); /// Register actor to GCS asynchronously. /// @@ -76,7 +84,7 @@ class ActorInfoAccessor { /// \param callback Callback that will be called after the actor info is written to GCS. /// \return Status virtual Status AsyncRegisterActor(const TaskSpecification &task_spec, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Kill actor via GCS asynchronously. /// @@ -86,7 +94,7 @@ class ActorInfoAccessor { /// \param callback Callback that will be called after the actor is destroyed. /// \return Status virtual Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Asynchronously request GCS to create the actor. /// @@ -100,7 +108,7 @@ class ActorInfoAccessor { /// \return Status virtual Status AsyncCreateActor( const TaskSpecification &task_spec, - const rpc::ClientCallback &callback) = 0; + const rpc::ClientCallback &callback); /// Subscribe to any register or update operations of actors. /// @@ -111,7 +119,7 @@ class ActorInfoAccessor { /// \return Status virtual Status AsyncSubscribeAll( const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); /// Subscribe to any update operations of an actor. /// @@ -122,13 +130,13 @@ class ActorInfoAccessor { virtual Status AsyncSubscribe( const ActorID &actor_id, const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); /// Cancel subscription to an actor. /// /// \param actor_id The ID of the actor to be unsubscribed to. /// \return Status - virtual Status AsyncUnsubscribe(const ActorID &actor_id) = 0; + virtual Status AsyncUnsubscribe(const ActorID &actor_id); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -137,16 +145,35 @@ class ActorInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); /// Check if the specified actor is unsubscribed. /// /// \param actor_id The ID of the actor. /// \return Whether the specified actor is unsubscribed. - virtual bool IsActorUnsubscribed(const ActorID &actor_id) = 0; + virtual bool IsActorUnsubscribed(const ActorID &actor_id); - protected: - ActorInfoAccessor() = default; + private: + /// Save the subscribe operation in this function, so we can call it again when PubSub + /// server restarts from a failure. + SubscribeOperation subscribe_all_operation_; + + /// Save the fetch data operation in this function, so we can call it again when GCS + /// server restarts from a failure. + FetchDataOperation fetch_all_data_operation_; + + // Mutex to protect the subscribe_operations_ field and fetch_data_operations_ field. + absl::Mutex mutex_; + + /// Save the subscribe operation of actors. + std::unordered_map subscribe_operations_ + GUARDED_BY(mutex_); + + /// Save the fetch data operation of actors. + std::unordered_map fetch_data_operations_ + GUARDED_BY(mutex_); + + GcsClient *client_impl_; }; /// \class JobInfoAccessor @@ -155,8 +182,9 @@ class ActorInfoAccessor { /// job information in the GCS. class JobInfoAccessor { public: + JobInfoAccessor() = default; + explicit JobInfoAccessor(GcsClient *client_impl); virtual ~JobInfoAccessor() = default; - /// Add a job to GCS asynchronously. /// /// \param data_ptr The job that will be add to GCS. @@ -164,15 +192,14 @@ class JobInfoAccessor { /// to GCS. /// \return Status virtual Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Mark job as finished in GCS asynchronously. /// /// \param job_id ID of the job that will be make finished to GCS. /// \param callback Callback that will be called after update finished. /// \return Status - virtual Status AsyncMarkFinished(const JobID &job_id, - const StatusCallback &callback) = 0; + virtual Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback); /// Subscribe to job updates. /// @@ -181,13 +208,13 @@ class JobInfoAccessor { /// \return Status virtual Status AsyncSubscribeAll( const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); /// Get all job info from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finished. /// \return Status - virtual Status AsyncGetAll(const MultiItemCallback &callback) = 0; + virtual Status AsyncGetAll(const MultiItemCallback &callback); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -196,16 +223,24 @@ class JobInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); /// Increment and get next job id. This is not idempotent. /// /// \param done Callback that will be called when request successfully. /// \return Status - virtual Status AsyncGetNextJobID(const ItemCallback &callback) = 0; + virtual Status AsyncGetNextJobID(const ItemCallback &callback); - protected: - JobInfoAccessor() = default; + private: + /// Save the fetch data operation in this function, so we can call it again when GCS + /// server restarts from a failure. + FetchDataOperation fetch_all_data_operation_; + + /// Save the subscribe operation in this function, so we can call it again when PubSub + /// server restarts from a failure. + SubscribeOperation subscribe_operation_; + + GcsClient *client_impl_; }; /// \class TaskInfoAccessor @@ -214,8 +249,9 @@ class JobInfoAccessor { /// task information in the GCS. class TaskInfoAccessor { public: - virtual ~TaskInfoAccessor() {} - + TaskInfoAccessor() = default; + explicit TaskInfoAccessor(GcsClient *client_impl); + virtual ~TaskInfoAccessor() = default; /// Add a task to GCS asynchronously. /// /// \param data_ptr The task that will be added to GCS. @@ -223,7 +259,7 @@ class TaskInfoAccessor { /// to GCS. /// \return Status virtual Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Get task information from GCS asynchronously. /// @@ -231,7 +267,7 @@ class TaskInfoAccessor { /// \param callback Callback that is called after lookup finished. /// \return Status virtual Status AsyncGet(const TaskID &task_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Add a task lease to GCS asynchronously. /// @@ -240,7 +276,7 @@ class TaskInfoAccessor { /// to GCS. /// \return Status virtual Status AsyncAddTaskLease(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Get task lease information from GCS asynchronously. /// @@ -248,8 +284,7 @@ class TaskInfoAccessor { /// \param callback Callback that is called after lookup finished. /// \return Status virtual Status AsyncGetTaskLease( - const TaskID &task_id, - const OptionalItemCallback &callback) = 0; + const TaskID &task_id, const OptionalItemCallback &callback); /// Subscribe asynchronously to the event that the given task lease is added in GCS. /// @@ -261,13 +296,13 @@ class TaskInfoAccessor { virtual Status AsyncSubscribeTaskLease( const TaskID &task_id, const SubscribeCallback> &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); /// Cancel subscription to a task lease asynchronously. /// /// \param task_id The ID of the task to be unsubscribed to. /// \return Status - virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id) = 0; + virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id); /// Attempt task reconstruction to GCS asynchronously. /// @@ -277,7 +312,7 @@ class TaskInfoAccessor { /// \return Status virtual Status AttemptTaskReconstruction( const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -286,16 +321,24 @@ class TaskInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); /// Check if the specified task lease is unsubscribed. /// /// \param task_id The ID of the task. /// \return Whether the specified task lease is unsubscribed. - virtual bool IsTaskLeaseUnsubscribed(const TaskID &task_id) = 0; + virtual bool IsTaskLeaseUnsubscribed(const TaskID &task_id); - protected: - TaskInfoAccessor() = default; + private: + /// Save the subscribe operations, so we can call them again when PubSub + /// server restarts from a failure. + std::unordered_map subscribe_task_lease_operations_; + + /// Save the fetch data operation in this function, so we can call it again when GCS + /// server restarts from a failure. + std::unordered_map fetch_task_lease_data_operations_; + + GcsClient *client_impl_; }; /// `ObjectInfoAccessor` is a sub-interface of `GcsClient`. @@ -303,8 +346,9 @@ class TaskInfoAccessor { /// object information in the GCS. class ObjectInfoAccessor { public: - virtual ~ObjectInfoAccessor() {} - + ObjectInfoAccessor() = default; + explicit ObjectInfoAccessor(GcsClient *client_impl); + virtual ~ObjectInfoAccessor() = default; /// Get object's locations from GCS asynchronously. /// /// \param object_id The ID of object to lookup in GCS. @@ -312,14 +356,13 @@ class ObjectInfoAccessor { /// \return Status virtual Status AsyncGetLocations( const ObjectID &object_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Get all object's locations from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finished. /// \return Status - virtual Status AsyncGetAll( - const MultiItemCallback &callback) = 0; + virtual Status AsyncGetAll(const MultiItemCallback &callback); /// Add location of object to GCS asynchronously. /// @@ -328,7 +371,7 @@ class ObjectInfoAccessor { /// \param callback Callback that will be called after object has been added to GCS. /// \return Status virtual Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id, - size_t object_size, const StatusCallback &callback) = 0; + size_t object_size, const StatusCallback &callback); /// Add spilled location of object to GCS asynchronously. /// @@ -340,7 +383,7 @@ class ObjectInfoAccessor { virtual Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, const NodeID &spilled_node_id, size_t object_size, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Remove location of object from GCS asynchronously. /// @@ -349,7 +392,7 @@ class ObjectInfoAccessor { /// \param callback Callback that will be called after the delete finished. /// \return Status virtual Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Subscribe to any update of an object's location. /// @@ -362,13 +405,13 @@ class ObjectInfoAccessor { const ObjectID &object_id, const SubscribeCallback> &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); /// Cancel subscription to any update of an object's location. /// /// \param object_id The ID of the object to be unsubscribed to. /// \return Status - virtual Status AsyncUnsubscribeToLocations(const ObjectID &object_id) = 0; + virtual Status AsyncUnsubscribeToLocations(const ObjectID &object_id); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -377,16 +420,32 @@ class ObjectInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); /// Check if the specified object is unsubscribed. /// /// \param object_id The ID of the object. /// \return Whether the specified object is unsubscribed. - virtual bool IsObjectUnsubscribed(const ObjectID &object_id) = 0; + virtual bool IsObjectUnsubscribed(const ObjectID &object_id); - protected: - ObjectInfoAccessor() = default; + private: + // Mutex to protect the subscribe_object_operations_ field and + // fetch_object_data_operations_ field. + absl::Mutex mutex_; + + /// Save the subscribe operations, so we can call them again when PubSub + /// server restarts from a failure. + std::unordered_map subscribe_object_operations_ + GUARDED_BY(mutex_); + + /// Save the fetch data operation in this function, so we can call it again when GCS + /// server restarts from a failure. + std::unordered_map fetch_object_data_operations_ + GUARDED_BY(mutex_); + + GcsClient *client_impl_; + + Sequencer sequencer_; }; /// \class NodeInfoAccessor @@ -395,15 +454,16 @@ class ObjectInfoAccessor { /// node information in the GCS. class NodeInfoAccessor { public: + NodeInfoAccessor() = default; + explicit NodeInfoAccessor(GcsClient *client_impl); virtual ~NodeInfoAccessor() = default; - /// Register local node to GCS asynchronously. /// /// \param node_info The information of node to register to GCS. /// \param callback Callback that will be called when registration is complete. /// \return Status virtual Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Drain (remove the information of the node from the cluster) the local node from GCS /// synchronously. @@ -411,17 +471,17 @@ class NodeInfoAccessor { /// Once the RPC is replied, it is guaranteed that GCS drains the information of the /// local node, and all the nodes in the cluster will "eventually" be informed that the /// node is drained. \return Status - virtual Status DrainSelf() = 0; + virtual Status DrainSelf(); /// Get id of local node which was registered by 'RegisterSelf'. /// /// \return NodeID - virtual const NodeID &GetSelfId() const = 0; + virtual const NodeID &GetSelfId() const; /// Get information of local node which was registered by 'RegisterSelf'. /// /// \return GcsNodeInfo - virtual const rpc::GcsNodeInfo &GetSelfInfo() const = 0; + virtual const rpc::GcsNodeInfo &GetSelfInfo() const; /// Register a node to GCS asynchronously. /// @@ -429,7 +489,7 @@ class NodeInfoAccessor { /// \param callback Callback that will be called when registration is complete. /// \return Status virtual Status AsyncRegister(const rpc::GcsNodeInfo &node_info, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Drain (remove the information of the node from the cluster) the local node from GCS /// asynchronously. @@ -439,14 +499,13 @@ class NodeInfoAccessor { /// \param node_id The ID of node that to be unregistered. /// \param callback Callback that will be called when unregistration is complete. /// \return Status - virtual Status AsyncDrainNode(const NodeID &node_id, - const StatusCallback &callback) = 0; + virtual Status AsyncDrainNode(const NodeID &node_id, const StatusCallback &callback); /// Get information of all nodes from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finishes. /// \return Status - virtual Status AsyncGetAll(const MultiItemCallback &callback) = 0; + virtual Status AsyncGetAll(const MultiItemCallback &callback); /// Subscribe to node addition and removal events from GCS and cache those information. /// @@ -457,7 +516,7 @@ class NodeInfoAccessor { /// \return Status virtual Status AsyncSubscribeToNodeChange( const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); /// Get node information from local cache. /// Non-thread safe. @@ -467,9 +526,9 @@ class NodeInfoAccessor { /// \param node_id The ID of node to look up in local cache. /// \param filter_dead_nodes Whether or not if this method will filter dead nodes. /// \return The item returned by GCS. If the item to read doesn't exist or the node is - /// dead, this optional object is empty. - virtual const rpc::GcsNodeInfo *Get(const NodeID &node_id, - bool filter_dead_nodes = true) const = 0; + virtual /// dead, this optional object is empty. + const rpc::GcsNodeInfo * + Get(const NodeID &node_id, bool filter_dead_nodes = true) const; /// Get information of all nodes from local cache. /// Non-thread safe. @@ -477,7 +536,7 @@ class NodeInfoAccessor { /// is called before. /// /// \return All nodes in cache. - virtual const std::unordered_map &GetAll() const = 0; + virtual const std::unordered_map &GetAll() const; /// Search the local cache to find out if the given node is removed. /// Non-thread safe. @@ -486,17 +545,17 @@ class NodeInfoAccessor { /// /// \param node_id The id of the node to check. /// \return Whether the node is removed. - virtual bool IsRemoved(const NodeID &node_id) const = 0; + virtual bool IsRemoved(const NodeID &node_id) const; /// Report heartbeat of a node to GCS asynchronously. /// /// \param data_ptr The heartbeat that will be reported to GCS. /// \param callback Callback that will be called after report finishes. /// \return Status - // TODO(micafan) NodeStateAccessor will call this method to report heartbeat. - virtual Status AsyncReportHeartbeat( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + virtual // TODO(micafan) NodeStateAccessor will call this method to report heartbeat. + Status + AsyncReportHeartbeat(const std::shared_ptr &data_ptr, + const StatusCallback &callback); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -505,17 +564,41 @@ class NodeInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); /// Get the internal config string from GCS. /// /// \param callback Processes a map of config options /// \return Status virtual Status AsyncGetInternalConfig( - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); - protected: - NodeInfoAccessor() = default; + private: + /// Save the subscribe operation in this function, so we can call it again when PubSub + /// server restarts from a failure. + SubscribeOperation subscribe_node_operation_; + + /// Save the fetch data operation in this function, so we can call it again when GCS + /// server restarts from a failure. + FetchDataOperation fetch_node_data_operation_; + + void HandleNotification(const rpc::GcsNodeInfo &node_info); + + GcsClient *client_impl_; + + using NodeChangeCallback = + std::function; + + rpc::GcsNodeInfo local_node_info_; + NodeID local_node_id_; + + /// The callback to call when a new node is added or a node is removed. + NodeChangeCallback node_change_callback_{nullptr}; + + /// A cache for information about all nodes. + std::unordered_map node_cache_; + /// The set of removed nodes. + std::unordered_set removed_nodes_; }; /// \class NodeResourceInfoAccessor @@ -524,8 +607,9 @@ class NodeInfoAccessor { /// node resource information in the GCS. class NodeResourceInfoAccessor { public: + NodeResourceInfoAccessor() = default; + explicit NodeResourceInfoAccessor(GcsClient *client_impl); virtual ~NodeResourceInfoAccessor() = default; - // TODO(micafan) Define ResourceMap in GCS proto. typedef std::unordered_map> ResourceMap; @@ -536,14 +620,14 @@ class NodeResourceInfoAccessor { /// \param callback Callback that will be called after lookup finishes. /// \return Status virtual Status AsyncGetResources(const NodeID &node_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Get available resources of all nodes from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finishes. /// \return Status virtual Status AsyncGetAllAvailableResources( - const MultiItemCallback &callback) = 0; + const MultiItemCallback &callback); /// Update resources of node in GCS asynchronously. /// @@ -551,7 +635,7 @@ class NodeResourceInfoAccessor { /// \param resources The dynamic resources of node to be updated. /// \param callback Callback that will be called after update finishes. virtual Status AsyncUpdateResources(const NodeID &node_id, const ResourceMap &resources, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Delete resources of a node from GCS asynchronously. /// @@ -560,7 +644,7 @@ class NodeResourceInfoAccessor { /// \param callback Callback that will be called after delete finishes. virtual Status AsyncDeleteResources(const NodeID &node_id, const std::vector &resource_names, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Subscribe to node resource changes. /// @@ -568,8 +652,7 @@ class NodeResourceInfoAccessor { /// \param done Callback that will be called when subscription is complete. /// \return Status virtual Status AsyncSubscribeToResources( - const ItemCallback &subscribe, - const StatusCallback &done) = 0; + const ItemCallback &subscribe, const StatusCallback &done); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -578,7 +661,7 @@ class NodeResourceInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); /// Report resource usage of a node to GCS asynchronously. /// @@ -587,13 +670,13 @@ class NodeResourceInfoAccessor { /// \return Status virtual Status AsyncReportResourceUsage( const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Resend resource usage when GCS restarts from a failure. - virtual void AsyncReReportResourceUsage() = 0; + virtual void AsyncReReportResourceUsage(); /// Return resources in last report. Used by light heartbeat. - const std::shared_ptr &GetLastResourceUsage() { + virtual const std::shared_ptr &GetLastResourceUsage() { return last_resource_usage_; } @@ -602,7 +685,7 @@ class NodeResourceInfoAccessor { /// \param callback Callback that will be called after lookup finishes. /// \return Status virtual Status AsyncGetAllResourceUsage( - const ItemCallback &callback) = 0; + const ItemCallback &callback); /// Subscribe batched state of all nodes from GCS. /// @@ -612,15 +695,33 @@ class NodeResourceInfoAccessor { /// \return Status virtual Status AsyncSubscribeBatchedResourceUsage( const ItemCallback &subscribe, - const StatusCallback &done) = 0; + const StatusCallback &done); + + /// Fill resource fields with cached resources. Used by light resource usage report. + virtual void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage); protected: - NodeResourceInfoAccessor() = default; - /// Cache which stores resource usage in last report used to check if they are changed. /// Used by light resource usage report. std::shared_ptr last_resource_usage_ = std::make_shared(); + + private: + // Mutex to protect the cached_resource_usage_ field. + absl::Mutex mutex_; + + /// Save the resource usage data, so we can resend it again when GCS server restarts + /// from a failure. + rpc::ReportResourceUsageRequest cached_resource_usage_ GUARDED_BY(mutex_); + + /// Save the subscribe operation in this function, so we can call it again when PubSub + /// server restarts from a failure. + SubscribeOperation subscribe_resource_operation_; + SubscribeOperation subscribe_batch_resource_usage_operation_; + + GcsClient *client_impl_; + + Sequencer sequencer_; }; /// \class ErrorInfoAccessor @@ -629,8 +730,9 @@ class NodeResourceInfoAccessor { /// error information in the GCS. class ErrorInfoAccessor { public: + ErrorInfoAccessor() = default; + explicit ErrorInfoAccessor(GcsClient *client_impl); virtual ~ErrorInfoAccessor() = default; - /// Report a job error to GCS asynchronously. /// The error message will be pushed to the driver of a specific if it is /// a job internal error, or broadcast to all drivers if it is a system error. @@ -643,10 +745,10 @@ class ErrorInfoAccessor { /// \param callback Callback that will be called when report is complete. /// \return Status virtual Status AsyncReportJobError(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); - protected: - ErrorInfoAccessor() = default; + private: + GcsClient *client_impl_; }; /// \class StatsInfoAccessor @@ -655,8 +757,9 @@ class ErrorInfoAccessor { /// stats in the GCS. class StatsInfoAccessor { public: + StatsInfoAccessor() = default; + explicit StatsInfoAccessor(GcsClient *client_impl); virtual ~StatsInfoAccessor() = default; - /// Add profile data to GCS asynchronously. /// /// \param data_ptr The profile data that will be added to GCS. @@ -664,17 +767,16 @@ class StatsInfoAccessor { /// \return Status virtual Status AsyncAddProfileData( const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Get all profile info from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finished. /// \return Status - virtual Status AsyncGetAll( - const MultiItemCallback &callback) = 0; + virtual Status AsyncGetAll(const MultiItemCallback &callback); - protected: - StatsInfoAccessor() = default; + private: + GcsClient *client_impl_; }; /// \class WorkerInfoAccessor @@ -683,8 +785,9 @@ class StatsInfoAccessor { /// worker information in the GCS. class WorkerInfoAccessor { public: + WorkerInfoAccessor() = default; + explicit WorkerInfoAccessor(GcsClient *client_impl); virtual ~WorkerInfoAccessor() = default; - /// Subscribe to all unexpected failure of workers from GCS asynchronously. /// Note that this does not include workers that failed due to node failure /// and only fileds in WorkerDeltaData would be published. @@ -693,8 +796,7 @@ class WorkerInfoAccessor { /// \param done Callback that will be called when subscription is complete. /// \return Status virtual Status AsyncSubscribeToWorkerFailures( - const ItemCallback &subscribe, - const StatusCallback &done) = 0; + const ItemCallback &subscribe, const StatusCallback &done); /// Report a worker failure to GCS asynchronously. /// @@ -703,7 +805,7 @@ class WorkerInfoAccessor { /// \param Status virtual Status AsyncReportWorkerFailure( const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Get worker specification from GCS asynchronously. /// @@ -711,13 +813,13 @@ class WorkerInfoAccessor { /// \param callback Callback that will be called after lookup finishes. /// \return Status virtual Status AsyncGet(const WorkerID &worker_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Get all worker info from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finished. /// \return Status - virtual Status AsyncGetAll(const MultiItemCallback &callback) = 0; + virtual Status AsyncGetAll(const MultiItemCallback &callback); /// Add worker information to GCS asynchronously. /// @@ -726,7 +828,7 @@ class WorkerInfoAccessor { /// to GCS. /// \return Status virtual Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -735,16 +837,21 @@ class WorkerInfoAccessor { /// server. /// /// \param is_pubsub_server_restarted Whether pubsub server is restarted. - virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + virtual void AsyncResubscribe(bool is_pubsub_server_restarted); - protected: - WorkerInfoAccessor() = default; + private: + /// Save the subscribe operation in this function, so we can call it again when GCS + /// restarts from a failure. + SubscribeOperation subscribe_operation_; + + GcsClient *client_impl_; }; class PlacementGroupInfoAccessor { public: + PlacementGroupInfoAccessor() = default; + explicit PlacementGroupInfoAccessor(GcsClient *client_impl); virtual ~PlacementGroupInfoAccessor() = default; - /// Create a placement group to GCS asynchronously. /// /// \param placement_group_spec The specification for the placement group creation task. @@ -753,7 +860,7 @@ class PlacementGroupInfoAccessor { /// \return Status. virtual Status AsyncCreatePlacementGroup( const PlacementGroupSpecification &placement_group_spec, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Get a placement group data from GCS asynchronously by id. /// @@ -761,7 +868,7 @@ class PlacementGroupInfoAccessor { /// \return Status. virtual Status AsyncGet( const PlacementGroupID &placement_group_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Get a placement group data from GCS asynchronously by name. /// @@ -769,14 +876,14 @@ class PlacementGroupInfoAccessor { /// \return Status. virtual Status AsyncGetByName( const std::string &placement_group_name, const std::string &ray_namespace, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Get all placement group info from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finished. /// \return Status virtual Status AsyncGetAll( - const MultiItemCallback &callback) = 0; + const MultiItemCallback &callback); /// Remove a placement group to GCS asynchronously. /// @@ -785,7 +892,7 @@ class PlacementGroupInfoAccessor { /// removed from GCS. /// \return Status virtual Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id, - const StatusCallback &callback) = 0; + const StatusCallback &callback); /// Wait for a placement group until ready asynchronously. /// @@ -793,16 +900,17 @@ class PlacementGroupInfoAccessor { /// \param callback Callback that will be called after the placement group is created. /// \return Status virtual Status AsyncWaitUntilReady(const PlacementGroupID &placement_group_id, - const StatusCallback &callback) = 0; + const StatusCallback &callback); - protected: - PlacementGroupInfoAccessor() = default; + private: + GcsClient *client_impl_; }; class InternalKVAccessor { public: + InternalKVAccessor() = default; + explicit InternalKVAccessor(GcsClient *client_impl); virtual ~InternalKVAccessor() = default; - /// Asynchronously list keys with prefix stored in internal kv /// /// \param prefix The prefix to scan. @@ -810,14 +918,14 @@ class InternalKVAccessor { /// \return Status virtual Status AsyncInternalKVKeys( const std::string &prefix, - const OptionalItemCallback> &callback) = 0; + const OptionalItemCallback> &callback); /// Asynchronously get the value for a given key. /// /// \param key The key to lookup. /// \param callback Callback that will be called after get the value. - virtual Status AsyncInternalKVGet( - const std::string &key, const OptionalItemCallback &callback) = 0; + virtual Status AsyncInternalKVGet(const std::string &key, + const OptionalItemCallback &callback); /// Asynchronously set the value for a given key. /// @@ -827,7 +935,7 @@ class InternalKVAccessor { /// \return Status virtual Status AsyncInternalKVPut(const std::string &key, const std::string &value, bool overwrite, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Asynchronously check the existence of a given key /// @@ -835,7 +943,7 @@ class InternalKVAccessor { /// \param callback Callback that will be called after the operation. /// \return Status virtual Status AsyncInternalKVExists(const std::string &key, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback); /// Asynchronously delete a key /// @@ -843,7 +951,7 @@ class InternalKVAccessor { /// \param callback Callback that will be called after the operation. /// \return Status virtual Status AsyncInternalKVDel(const std::string &key, - const StatusCallback &callback) = 0; + const StatusCallback &callback); // These are sync functions of the async above @@ -852,7 +960,7 @@ class InternalKVAccessor { /// \param prefix The prefix to scan. /// \param value It's an output parameter. It'll be set to the keys with `prefix` /// \return Status - Status Keys(const std::string &prefix, std::vector &value); + virtual Status Keys(const std::string &prefix, std::vector &value); /// Set the in the store /// @@ -863,21 +971,21 @@ class InternalKVAccessor { /// \param added It's an output parameter. It'll be set to be true if /// any row is added. /// \return Status - Status Put(const std::string &key, const std::string &value, bool overwrite, - bool &added); + virtual Status Put(const std::string &key, const std::string &value, bool overwrite, + bool &added); /// Retrive the value associated with a key /// /// \param key The key to lookup /// \param value It's an output parameter. It'll be set to the value of the key /// \return Status - Status Get(const std::string &key, std::string &value); + virtual Status Get(const std::string &key, std::string &value); /// Delete the key /// /// \param key The key to delete /// \return Status - Status Del(const std::string &key); + virtual Status Del(const std::string &key); /// Check existence of a key in the store /// @@ -885,10 +993,10 @@ class InternalKVAccessor { /// \param exist It's an output parameter. It'll be true if the key exists in the /// system. Otherwise, it'll be set to be false. /// \return Status - Status Exists(const std::string &key, bool &exist); + virtual Status Exists(const std::string &key, bool &exist); - protected: - InternalKVAccessor() = default; + private: + GcsClient *client_impl_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc similarity index 87% rename from src/ray/gcs/gcs_client/service_based_gcs_client.cc rename to src/ray/gcs/gcs_client/gcs_client.cc index 27fb805d4..059bd3bcd 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include #include "ray/common/ray_config.h" -#include "ray/gcs/gcs_client/service_based_accessor.h" +#include "ray/gcs/gcs_client/accessor.h" #include "ray/pubsub/subscriber.h" extern "C" { @@ -77,15 +77,15 @@ void GcsSubscriberClient::PubsubCommandBatch( } // namespace -ServiceBasedGcsClient::ServiceBasedGcsClient( +GcsClient::GcsClient( const GcsClientOptions &options, std::function *)> get_gcs_server_address_func) - : GcsClient(options), + : options_(options), get_server_address_func_(std::move(get_gcs_server_address_func)), last_reconnect_timestamp_ms_(0), last_reconnect_address_(std::make_pair("", -1)) {} -Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) { +Status GcsClient::Connect(instrumented_io_context &io_service) { RAY_CHECK(!is_connected_); if (options_.server_ip_.empty()) { @@ -169,18 +169,17 @@ Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) { gcs_subscriber_ = std::make_unique(redis_client_, gcs_address, std::move(subscriber)); - job_accessor_ = std::make_unique(this); - actor_accessor_ = std::make_unique(this); - node_accessor_ = std::make_unique(this); - node_resource_accessor_ = std::make_unique(this); - task_accessor_ = std::make_unique(this); - object_accessor_ = std::make_unique(this); - stats_accessor_ = std::make_unique(this); - error_accessor_ = std::make_unique(this); - worker_accessor_ = std::make_unique(this); - placement_group_accessor_ = - std::make_unique(this); - internal_kv_accessor_ = std::make_unique(this); + job_accessor_ = std::make_unique(this); + actor_accessor_ = std::make_unique(this); + node_accessor_ = std::make_unique(this); + node_resource_accessor_ = std::make_unique(this); + task_accessor_ = std::make_unique(this); + object_accessor_ = std::make_unique(this); + stats_accessor_ = std::make_unique(this); + error_accessor_ = std::make_unique(this); + worker_accessor_ = std::make_unique(this); + placement_group_accessor_ = std::make_unique(this); + internal_kv_accessor_ = std::make_unique(this); // Init gcs service address check timer. periodical_runner_ = std::make_unique(io_service); periodical_runner_->RunFnPeriodically( @@ -190,13 +189,13 @@ Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) { is_connected_ = true; - RAY_LOG(DEBUG) << "ServiceBasedGcsClient connected."; + RAY_LOG(DEBUG) << "GcsClient connected."; return Status::OK(); } -void ServiceBasedGcsClient::Disconnect() { +void GcsClient::Disconnect() { if (!is_connected_) { - RAY_LOG(WARNING) << "ServiceBasedGcsClient has been disconnected."; + RAY_LOG(WARNING) << "GcsClient has been disconnected."; return; } is_connected_ = false; @@ -204,15 +203,16 @@ void ServiceBasedGcsClient::Disconnect() { gcs_subscriber_.reset(); redis_client_->Disconnect(); redis_client_.reset(); - RAY_LOG(DEBUG) << "ServiceBasedGcsClient Disconnected."; + RAY_LOG(DEBUG) << "GcsClient Disconnected."; } -std::pair ServiceBasedGcsClient::GetGcsServerAddress() { +std::pair GcsClient::GetGcsServerAddress() { return current_gcs_server_address_; } -bool ServiceBasedGcsClient::GetGcsServerAddressFromRedis( - redisContext *context, std::pair *address, int max_attempts) { +bool GcsClient::GetGcsServerAddressFromRedis(redisContext *context, + std::pair *address, + int max_attempts) { // Get gcs server address. int num_attempts = 0; redisReply *reply = nullptr; @@ -250,7 +250,7 @@ bool ServiceBasedGcsClient::GetGcsServerAddressFromRedis( return false; } -void ServiceBasedGcsClient::PeriodicallyCheckGcsServerAddress() { +void GcsClient::PeriodicallyCheckGcsServerAddress() { std::pair address; if (get_server_address_func_(&address)) { if (address != current_gcs_server_address_) { @@ -262,7 +262,7 @@ void ServiceBasedGcsClient::PeriodicallyCheckGcsServerAddress() { } } -void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType type) { +void GcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType type) { switch (type) { case rpc::GcsServiceFailureType::RPC_DISCONNECT: // If the GCS server address does not change, reconnect to GCS server. @@ -285,7 +285,7 @@ void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType } } -void ServiceBasedGcsClient::ReconnectGcsServer() { +void GcsClient::ReconnectGcsServer() { std::pair address; int index = 0; for (; index < RayConfig::instance().ping_gcs_rpc_server_max_retries(); ++index) { diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h similarity index 67% rename from src/ray/gcs/gcs_client.h rename to src/ray/gcs/gcs_client/gcs_client.h index 7b0d0917e..308fd5bf3 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -18,11 +18,16 @@ #include #include #include -#include "gtest/gtest_prod.h" +#include "gtest/gtest_prod.h" #include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/asio/periodical_runner.h" +#include "ray/common/id.h" #include "ray/common/status.h" -#include "ray/gcs/accessor.h" +#include "ray/gcs/gcs_client/accessor.h" +#include "ray/gcs/pubsub/gcs_pub_sub.h" +#include "ray/gcs/redis_client.h" +#include "ray/rpc/gcs_server/gcs_rpc_client.h" #include "ray/util/logging.h" namespace ray { @@ -69,22 +74,29 @@ class GcsClientOptions { /// /// To read and write from the GCS, `Connect()` must be called and return Status::OK. /// Before exit, `Disconnect()` must be called. -class GcsClient : public std::enable_shared_from_this { +class RAY_EXPORT GcsClient : public std::enable_shared_from_this { public: - virtual ~GcsClient() {} + GcsClient() = default; + /// Constructor of GcsClient. + /// + /// \param options Options for client. + /// \param get_gcs_server_address_func Function to get GCS server address. + explicit GcsClient(const GcsClientOptions &options, + std::function *)> + get_gcs_server_address_func = {}); + + virtual ~GcsClient() = default; /// Connect to GCS Service. Non-thread safe. /// This function must be called before calling other functions. /// /// \return Status - virtual Status Connect(instrumented_io_context &io_service) = 0; + virtual Status Connect(instrumented_io_context &io_service); /// Disconnect with GCS Service. Non-thread safe. - virtual void Disconnect() = 0; + virtual void Disconnect(); - virtual std::pair GetGcsServerAddress() { - return std::make_pair("", 0); - } + virtual std::pair GetGcsServerAddress(); /// Return client information for debug. virtual std::string DebugString() const { return ""; } @@ -161,14 +173,13 @@ class GcsClient : public std::enable_shared_from_this { /// Get the sub-interface for accessing worker information in GCS. /// This function is thread safe. - InternalKVAccessor &InternalKV() { return *internal_kv_accessor_; } + virtual InternalKVAccessor &InternalKV() { return *internal_kv_accessor_; } + + virtual GcsSubscriber &GetGcsSubscriber() { return *gcs_subscriber_; } + + virtual rpc::GcsRpcClient &GetGcsRpcClient() { return *gcs_rpc_client_; } protected: - /// Constructor of GcsClient. - /// - /// \param options Options for client. - GcsClient(const GcsClientOptions &options = GcsClientOptions()) : options_(options) {} - GcsClientOptions options_; /// Whether this client is connected to GCS. @@ -185,6 +196,48 @@ class GcsClient : public std::enable_shared_from_this { std::unique_ptr worker_accessor_; std::unique_ptr placement_group_accessor_; std::unique_ptr internal_kv_accessor_; + + private: + /// Get gcs server address from redis. + /// This address is set by GcsServer::StoreGcsServerAddressInRedis function. + /// + /// \param context The context of redis. + /// \param address The address of gcs server. + /// \param max_attempts The maximum number of times to get gcs server rpc address. + /// \return Returns true if gcs server address is obtained, False otherwise. + bool GetGcsServerAddressFromRedis(redisContext *context, + std::pair *address, + int max_attempts = 1); + + /// Fire a periodic timer to check if GCS sever address has changed. + void PeriodicallyCheckGcsServerAddress(); + + /// This function is used to redo subscription and reconnect to GCS RPC server when gcs + /// service failure is detected. + /// + /// \param type The type of GCS service failure. + void GcsServiceFailureDetected(rpc::GcsServiceFailureType type); + + /// Reconnect to GCS RPC server. + void ReconnectGcsServer(); + + std::shared_ptr redis_client_; + + const UniqueID gcs_client_id_ = UniqueID::FromRandom(); + + std::unique_ptr gcs_subscriber_; + + // Gcs rpc client + std::shared_ptr gcs_rpc_client_; + std::unique_ptr client_call_manager_; + + // The runner to run function periodically. + std::unique_ptr periodical_runner_; + std::function *)> get_server_address_func_; + std::function resubscribe_func_; + std::pair current_gcs_server_address_; + int64_t last_reconnect_timestamp_ms_; + std::pair last_reconnect_address_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 6a6c65548..e1b0de7fd 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -37,7 +37,7 @@ GlobalStateAccessor::GlobalStateAccessor(const std::string &redis_address, options.enable_sync_conn_ = true; options.enable_async_conn_ = false; options.enable_subscribe_conn_ = false; - gcs_client_ = std::make_unique(options); + gcs_client_ = std::make_unique(options); io_service_ = std::make_unique(); diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index 1408baac1..9679759c3 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -16,9 +16,8 @@ #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" - #include "ray/common/asio/instrumented_io_context.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/rpc/server_call.h" namespace ray { @@ -248,7 +247,7 @@ class GlobalStateAccessor { /// Whether this client is connected to gcs server. bool is_connected_ GUARDED_BY(mutex_) = false; - std::unique_ptr gcs_client_ GUARDED_BY(mutex_); + std::unique_ptr gcs_client_ GUARDED_BY(mutex_); std::unique_ptr thread_io_service_; std::unique_ptr io_service_; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h deleted file mode 100644 index bb97cf1d6..000000000 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ /dev/null @@ -1,500 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "ray/common/task/task_spec.h" -#include "ray/gcs/accessor.h" -#include "ray/util/sequencer.h" -#include "src/ray/protobuf/gcs_service.pb.h" - -namespace ray { -namespace gcs { - -using SubscribeOperation = std::function; - -using FetchDataOperation = std::function; - -class ServiceBasedGcsClient; - -/// \class ServiceBasedJobInfoAccessor -/// ServiceBasedJobInfoAccessor is an implementation of `JobInfoAccessor` -/// that uses GCS Service as the backend. -class ServiceBasedJobInfoAccessor : public JobInfoAccessor { - public: - explicit ServiceBasedJobInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedJobInfoAccessor() = default; - - Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override; - - Status AsyncSubscribeAll(const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncGetAll(const MultiItemCallback &callback) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - Status AsyncGetNextJobID(const ItemCallback &callback) override; - - private: - /// Save the fetch data operation in this function, so we can call it again when GCS - /// server restarts from a failure. - FetchDataOperation fetch_all_data_operation_; - - /// Save the subscribe operation in this function, so we can call it again when PubSub - /// server restarts from a failure. - SubscribeOperation subscribe_operation_; - - ServiceBasedGcsClient *client_impl_; -}; - -/// \class ServiceBasedActorInfoAccessor -/// ServiceBasedActorInfoAccessor is an implementation of `ActorInfoAccessor` -/// that uses GCS Service as the backend. -class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { - public: - explicit ServiceBasedActorInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedActorInfoAccessor() = default; - - Status AsyncGet(const ActorID &actor_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetAll(const MultiItemCallback &callback) override; - - Status AsyncGetByName( - const std::string &name, const std::string &ray_namespace, - const OptionalItemCallback &callback) override; - - Status AsyncListNamedActors( - bool all_namespaces, const std::string &ray_namespace, - const ItemCallback> &callback) override; - - Status AsyncRegisterActor(const TaskSpecification &task_spec, - const StatusCallback &callback) override; - - Status AsyncCreateActor( - const TaskSpecification &task_spec, - const rpc::ClientCallback &callback) override; - - Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart, - const StatusCallback &callback) override; - - Status AsyncSubscribeAll( - const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncSubscribe(const ActorID &actor_id, - const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncUnsubscribe(const ActorID &actor_id) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - bool IsActorUnsubscribed(const ActorID &actor_id) override; - - private: - /// Save the subscribe operation in this function, so we can call it again when PubSub - /// server restarts from a failure. - SubscribeOperation subscribe_all_operation_; - - /// Save the fetch data operation in this function, so we can call it again when GCS - /// server restarts from a failure. - FetchDataOperation fetch_all_data_operation_; - - // Mutex to protect the subscribe_operations_ field and fetch_data_operations_ field. - absl::Mutex mutex_; - - /// Save the subscribe operation of actors. - std::unordered_map subscribe_operations_ - GUARDED_BY(mutex_); - - /// Save the fetch data operation of actors. - std::unordered_map fetch_data_operations_ - GUARDED_BY(mutex_); - - ServiceBasedGcsClient *client_impl_; -}; - -/// \class ServiceBasedNodeInfoAccessor -/// ServiceBasedNodeInfoAccessor is an implementation of `NodeInfoAccessor` -/// that uses GCS Service as the backend. -class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { - public: - explicit ServiceBasedNodeInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedNodeInfoAccessor() = default; - - Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info, - const StatusCallback &callback) override; - - Status DrainSelf() override; - - const NodeID &GetSelfId() const override; - - const rpc::GcsNodeInfo &GetSelfInfo() const override; - - Status AsyncRegister(const rpc::GcsNodeInfo &node_info, - const StatusCallback &callback) override; - - Status AsyncDrainNode(const NodeID &node_id, const StatusCallback &callback) override; - - Status AsyncGetAll(const MultiItemCallback &callback) override; - - Status AsyncSubscribeToNodeChange( - const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - const rpc::GcsNodeInfo *Get(const NodeID &node_id, - bool filter_dead_nodes = false) const override; - - const std::unordered_map &GetAll() const override; - - bool IsRemoved(const NodeID &node_id) const override; - - Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - Status AsyncGetInternalConfig( - const OptionalItemCallback &callback) override; - - private: - /// Save the subscribe operation in this function, so we can call it again when PubSub - /// server restarts from a failure. - SubscribeOperation subscribe_node_operation_; - - /// Save the fetch data operation in this function, so we can call it again when GCS - /// server restarts from a failure. - FetchDataOperation fetch_node_data_operation_; - - void HandleNotification(const rpc::GcsNodeInfo &node_info); - - ServiceBasedGcsClient *client_impl_; - - using NodeChangeCallback = - std::function; - - rpc::GcsNodeInfo local_node_info_; - NodeID local_node_id_; - - /// The callback to call when a new node is added or a node is removed. - NodeChangeCallback node_change_callback_{nullptr}; - - /// A cache for information about all nodes. - std::unordered_map node_cache_; - /// The set of removed nodes. - std::unordered_set removed_nodes_; -}; - -/// \class ServiceBasedNodeResourceInfoAccessor -/// ServiceBasedNodeResourceInfoAccessor is an implementation of -/// `NodeResourceInfoAccessor` that uses GCS Service as the backend. -class ServiceBasedNodeResourceInfoAccessor : public NodeResourceInfoAccessor { - public: - explicit ServiceBasedNodeResourceInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedNodeResourceInfoAccessor() = default; - - Status AsyncGetResources(const NodeID &node_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetAllAvailableResources( - const MultiItemCallback &callback) override; - - Status AsyncUpdateResources(const NodeID &node_id, const ResourceMap &resources, - const StatusCallback &callback) override; - - Status AsyncDeleteResources(const NodeID &node_id, - const std::vector &resource_names, - const StatusCallback &callback) override; - - Status AsyncSubscribeToResources(const ItemCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncReportResourceUsage(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - void AsyncReReportResourceUsage() override; - - /// Fill resource fields with cached resources. Used by light resource usage report. - void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage); - - Status AsyncGetAllResourceUsage( - const ItemCallback &callback) override; - - Status AsyncSubscribeBatchedResourceUsage( - const ItemCallback &subscribe, - const StatusCallback &done) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - private: - // Mutex to protect the cached_resource_usage_ field. - absl::Mutex mutex_; - - /// Save the resource usage data, so we can resend it again when GCS server restarts - /// from a failure. - rpc::ReportResourceUsageRequest cached_resource_usage_ GUARDED_BY(mutex_); - - /// Save the subscribe operation in this function, so we can call it again when PubSub - /// server restarts from a failure. - SubscribeOperation subscribe_resource_operation_; - SubscribeOperation subscribe_batch_resource_usage_operation_; - - ServiceBasedGcsClient *client_impl_; - - Sequencer sequencer_; -}; - -/// \class ServiceBasedTaskInfoAccessor -/// ServiceBasedTaskInfoAccessor is an implementation of `TaskInfoAccessor` -/// that uses GCS service as the backend. -class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { - public: - explicit ServiceBasedTaskInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedTaskInfoAccessor() = default; - - Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncGet(const TaskID &task_id, - const OptionalItemCallback &callback) override; - - Status AsyncAddTaskLease(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncGetTaskLease( - const TaskID &task_id, - const OptionalItemCallback &callback) override; - - Status AsyncSubscribeTaskLease( - const TaskID &task_id, - const SubscribeCallback> &subscribe, - const StatusCallback &done) override; - - Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override; - - Status AttemptTaskReconstruction( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override; - - private: - /// Save the subscribe operations, so we can call them again when PubSub - /// server restarts from a failure. - std::unordered_map subscribe_task_lease_operations_; - - /// Save the fetch data operation in this function, so we can call it again when GCS - /// server restarts from a failure. - std::unordered_map fetch_task_lease_data_operations_; - - ServiceBasedGcsClient *client_impl_; -}; - -/// \class ServiceBasedObjectInfoAccessor -/// ServiceBasedObjectInfoAccessor is an implementation of `ObjectInfoAccessor` -/// that uses GCS service as the backend. -class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor { - public: - explicit ServiceBasedObjectInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedObjectInfoAccessor() = default; - - Status AsyncGetLocations( - const ObjectID &object_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetAll(const MultiItemCallback &callback) override; - - Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id, - size_t object_size, const StatusCallback &callback) override; - - Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, - const NodeID &node_id, size_t object_size, - const StatusCallback &callback) override; - - Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id, - const StatusCallback &callback) override; - - Status AsyncSubscribeToLocations( - const ObjectID &object_id, - const SubscribeCallback> - &subscribe, - const StatusCallback &done) override; - - Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - bool IsObjectUnsubscribed(const ObjectID &object_id) override; - - private: - // Mutex to protect the subscribe_object_operations_ field and - // fetch_object_data_operations_ field. - absl::Mutex mutex_; - - /// Save the subscribe operations, so we can call them again when PubSub - /// server restarts from a failure. - std::unordered_map subscribe_object_operations_ - GUARDED_BY(mutex_); - - /// Save the fetch data operation in this function, so we can call it again when GCS - /// server restarts from a failure. - std::unordered_map fetch_object_data_operations_ - GUARDED_BY(mutex_); - - ServiceBasedGcsClient *client_impl_; - - Sequencer sequencer_; -}; - -/// \class ServiceBasedStatsInfoAccessor -/// ServiceBasedStatsInfoAccessor is an implementation of `StatsInfoAccessor` -/// that uses GCS Service as the backend. -class ServiceBasedStatsInfoAccessor : public StatsInfoAccessor { - public: - explicit ServiceBasedStatsInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedStatsInfoAccessor() = default; - - Status AsyncAddProfileData(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncGetAll(const MultiItemCallback &callback) override; - - private: - ServiceBasedGcsClient *client_impl_; -}; - -/// \class ServiceBasedErrorInfoAccessor -/// ServiceBasedErrorInfoAccessor is an implementation of `ErrorInfoAccessor` -/// that uses GCS Service as the backend. -class ServiceBasedErrorInfoAccessor : public ErrorInfoAccessor { - public: - explicit ServiceBasedErrorInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedErrorInfoAccessor() = default; - - Status AsyncReportJobError(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - private: - ServiceBasedGcsClient *client_impl_; -}; - -/// \class ServiceBasedWorkerInfoAccessor -/// ServiceBasedWorkerInfoAccessor is an implementation of `WorkerInfoAccessor` -/// that uses GCS Service as the backend. -class ServiceBasedWorkerInfoAccessor : public WorkerInfoAccessor { - public: - explicit ServiceBasedWorkerInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedWorkerInfoAccessor() = default; - - Status AsyncSubscribeToWorkerFailures( - const ItemCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncReportWorkerFailure(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncGet(const WorkerID &worker_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetAll(const MultiItemCallback &callback) override; - - Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - void AsyncResubscribe(bool is_pubsub_server_restarted) override; - - private: - /// Save the subscribe operation in this function, so we can call it again when GCS - /// restarts from a failure. - SubscribeOperation subscribe_operation_; - - ServiceBasedGcsClient *client_impl_; -}; - -/// \class ServiceBasedPlacementGroupInfoAccessor -/// ServiceBasedPlacementGroupInfoAccessor is an implementation of -/// `PlacementGroupInfoAccessor` that uses GCS Service as the backend. - -class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor { - // TODO(AlisaWu):fill the ServiceAccessor. - public: - explicit ServiceBasedPlacementGroupInfoAccessor(ServiceBasedGcsClient *client_impl); - - virtual ~ServiceBasedPlacementGroupInfoAccessor() = default; - - Status AsyncCreatePlacementGroup( - const PlacementGroupSpecification &placement_group_spec, - const StatusCallback &callback) override; - - Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id, - const StatusCallback &callback) override; - - Status AsyncGet( - const PlacementGroupID &placement_group_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetByName( - const std::string &name, const std::string &ray_namespace, - const OptionalItemCallback &callback) override; - - Status AsyncGetAll( - const MultiItemCallback &callback) override; - - Status AsyncWaitUntilReady(const PlacementGroupID &placement_group_id, - const StatusCallback &callback) override; - - private: - ServiceBasedGcsClient *client_impl_; -}; - -class ServiceBasedInternalKVAccessor : public InternalKVAccessor { - public: - explicit ServiceBasedInternalKVAccessor(ServiceBasedGcsClient *client_impl); - ~ServiceBasedInternalKVAccessor() override = default; - - Status AsyncInternalKVKeys( - const std::string &prefix, - const OptionalItemCallback> &callback) override; - Status AsyncInternalKVGet(const std::string &key, - const OptionalItemCallback &callback) override; - Status AsyncInternalKVPut(const std::string &key, const std::string &value, - bool overwrite, - const OptionalItemCallback &callback) override; - Status AsyncInternalKVExists(const std::string &key, - const OptionalItemCallback &callback) override; - Status AsyncInternalKVDel(const std::string &key, - const StatusCallback &callback) override; - - private: - ServiceBasedGcsClient *client_impl_; -}; - -} // namespace gcs -} // namespace ray diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.h b/src/ray/gcs/gcs_client/service_based_gcs_client.h deleted file mode 100644 index c6236c0e2..000000000 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.h +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include "ray/common/asio/instrumented_io_context.h" -#include "ray/common/asio/periodical_runner.h" -#include "ray/common/id.h" -#include "ray/gcs/gcs_client.h" -#include "ray/gcs/pubsub/gcs_pub_sub.h" -#include "ray/gcs/redis_client.h" -#include "ray/rpc/gcs_server/gcs_rpc_client.h" - -namespace ray { -namespace gcs { - -class RAY_EXPORT ServiceBasedGcsClient : public GcsClient { - public: - explicit ServiceBasedGcsClient(const GcsClientOptions &options, - std::function *)> - get_gcs_server_address_func = {}); - - Status Connect(instrumented_io_context &io_service) override; - - void Disconnect() override; - - GcsSubscriber &GetGcsSubscriber() { return *gcs_subscriber_; } - - rpc::GcsRpcClient &GetGcsRpcClient() { return *gcs_rpc_client_; } - - std::pair GetGcsServerAddress() override; - - private: - /// Get gcs server address from redis. - /// This address is set by GcsServer::StoreGcsServerAddressInRedis function. - /// - /// \param context The context of redis. - /// \param address The address of gcs server. - /// \param max_attempts The maximum number of times to get gcs server rpc address. - /// \return Returns true if gcs server address is obtained, False otherwise. - bool GetGcsServerAddressFromRedis(redisContext *context, - std::pair *address, - int max_attempts = 1); - - /// Fire a periodic timer to check if GCS sever address has changed. - void PeriodicallyCheckGcsServerAddress(); - - /// This function is used to redo subscription and reconnect to GCS RPC server when gcs - /// service failure is detected. - /// - /// \param type The type of GCS service failure. - void GcsServiceFailureDetected(rpc::GcsServiceFailureType type); - - /// Reconnect to GCS RPC server. - void ReconnectGcsServer(); - - const UniqueID gcs_client_id_ = UniqueID::FromRandom(); - - std::shared_ptr redis_client_; - - std::unique_ptr gcs_subscriber_; - - // Gcs rpc client - std::shared_ptr gcs_rpc_client_; - std::unique_ptr client_call_manager_; - - // The runner to run function periodically. - std::unique_ptr periodical_runner_; - std::function *)> get_server_address_func_; - std::function resubscribe_func_; - std::pair current_gcs_server_address_; - int64_t last_reconnect_timestamp_ms_; - std::pair last_reconnect_address_; -}; - -} // namespace gcs -} // namespace ray diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc similarity index 96% rename from src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc rename to src/ray/gcs/gcs_client/test/gcs_client_test.cc index 8c4617cff..18dc4ec6f 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "absl/strings/substitute.h" #include "gtest/gtest.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/test_util.h" -#include "ray/gcs/gcs_client/service_based_accessor.h" +#include "ray/gcs/gcs_client/accessor.h" #include "ray/gcs/gcs_server/gcs_server.h" #include "ray/gcs/test/gcs_test_util.h" #include "ray/rpc/gcs_server/gcs_rpc_client.h" @@ -26,9 +26,9 @@ namespace ray { -class ServiceBasedGcsClientTest : public ::testing::TestWithParam { +class GcsClientTest : public ::testing::TestWithParam { public: - ServiceBasedGcsClientTest() { + GcsClientTest() { RayConfig::instance().initialize(absl::Substitute(R"( { "ping_gcs_rpc_server_max_retries": 60, @@ -41,7 +41,7 @@ class ServiceBasedGcsClientTest : public ::testing::TestWithParam { TestSetupUtil::StartUpRedisServers(std::vector()); } - virtual ~ServiceBasedGcsClientTest() { TestSetupUtil::ShutDownRedisServers(); } + virtual ~GcsClientTest() { TestSetupUtil::ShutDownRedisServers(); } protected: void SetUp() override { @@ -81,7 +81,7 @@ class ServiceBasedGcsClientTest : public ::testing::TestWithParam { // Create GCS client. gcs::GcsClientOptions options(config_.redis_address, config_.redis_port, config_.redis_password); - gcs_client_.reset(new gcs::ServiceBasedGcsClient(options)); + gcs_client_.reset(new gcs::GcsClient(options)); RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_)); } @@ -575,9 +575,9 @@ class ServiceBasedGcsClientTest : public ::testing::TestWithParam { const std::chrono::milliseconds timeout_ms_{2000}; }; -INSTANTIATE_TEST_SUITE_P(RedisMigration, ServiceBasedGcsClientTest, testing::Bool()); +INSTANTIATE_TEST_SUITE_P(RedisMigration, GcsClientTest, testing::Bool()); -TEST_P(ServiceBasedGcsClientTest, TestJobInfo) { +TEST_P(GcsClientTest, TestJobInfo) { // Create job table data. JobID add_job_id = JobID::FromInt(1); auto job_table_data = Mocker::GenJobTableData(add_job_id); @@ -594,13 +594,13 @@ TEST_P(ServiceBasedGcsClientTest, TestJobInfo) { WaitForExpectedCount(job_updates, 2); } -TEST_P(ServiceBasedGcsClientTest, TestGetNextJobID) { +TEST_P(GcsClientTest, TestGetNextJobID) { JobID job_id1 = GetNextJobID(); JobID job_id2 = GetNextJobID(); ASSERT_TRUE(job_id1.ToInt() + 1 == job_id2.ToInt()); } -TEST_P(ServiceBasedGcsClientTest, TestActorSubscribeAll) { +TEST_P(GcsClientTest, TestActorSubscribeAll) { // NOTE: `TestActorSubscribeAll` will subscribe to all actor messages, so we need to // execute it before `TestActorInfo`, otherwise `TestActorSubscribeAll` will receive // messages from `TestActorInfo`. @@ -630,7 +630,7 @@ TEST_P(ServiceBasedGcsClientTest, TestActorSubscribeAll) { EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); } -TEST_P(ServiceBasedGcsClientTest, TestActorInfo) { +TEST_P(GcsClientTest, TestActorInfo) { // Create actor table data. JobID job_id = JobID::FromInt(1); AddJob(job_id); @@ -654,7 +654,7 @@ TEST_P(ServiceBasedGcsClientTest, TestActorInfo) { WaitForActorUnsubscribed(actor_id); } -TEST_P(ServiceBasedGcsClientTest, TestNodeInfo) { +TEST_P(GcsClientTest, TestNodeInfo) { // Create gcs node info. auto gcs_node1_info = Mocker::GenNodeInfo(); NodeID node1_id = NodeID::FromBinary(gcs_node1_info->node_id()); @@ -708,7 +708,7 @@ TEST_P(ServiceBasedGcsClientTest, TestNodeInfo) { ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node2_id)); } -TEST_P(ServiceBasedGcsClientTest, TestNodeResources) { +TEST_P(GcsClientTest, TestNodeResources) { // Subscribe to node resource changes. std::atomic add_count(0); std::atomic remove_count(0); @@ -739,7 +739,7 @@ TEST_P(ServiceBasedGcsClientTest, TestNodeResources) { ASSERT_TRUE(GetResources(node_id).empty()); } -TEST_P(ServiceBasedGcsClientTest, TestNodeResourceUsage) { +TEST_P(GcsClientTest, TestNodeResourceUsage) { // Subscribe batched state of all nodes from GCS. std::atomic resource_batch_count(0); auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { @@ -768,7 +768,7 @@ TEST_P(ServiceBasedGcsClientTest, TestNodeResourceUsage) { resource_value); } -TEST_P(ServiceBasedGcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) { +TEST_P(GcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) { // Subscribe batched state of all nodes from GCS. std::atomic resource_batch_count(0); auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { @@ -795,7 +795,7 @@ TEST_P(ServiceBasedGcsClientTest, TestNodeResourceUsageWithLightResourceUsageRep WaitForExpectedCount(resource_batch_count, 1); } -TEST_P(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { +TEST_P(GcsClientTest, TestGetAllAvailableResources) { // Subscribe batched state of all nodes from GCS. std::atomic resource_batch_count(0); auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { @@ -826,8 +826,7 @@ TEST_P(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0); } -TEST_P(ServiceBasedGcsClientTest, - TestGetAllAvailableResourcesWithLightResourceUsageReport) { +TEST_P(GcsClientTest, TestGetAllAvailableResourcesWithLightResourceUsageReport) { // Subscribe batched state of all nodes from GCS. std::atomic resource_batch_count(0); auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { @@ -871,7 +870,7 @@ TEST_P(ServiceBasedGcsClientTest, EXPECT_EQ((*resources1[0].mutable_resources_available())["GPU"], 10.0); } -TEST_P(ServiceBasedGcsClientTest, TestTaskInfo) { +TEST_P(GcsClientTest, TestTaskInfo) { JobID job_id = JobID::FromInt(1); AddJob(job_id); TaskID task_id = TaskID::ForDriverTask(job_id); @@ -916,7 +915,7 @@ TEST_P(ServiceBasedGcsClientTest, TestTaskInfo) { ASSERT_TRUE(AttemptTaskReconstruction(task_reconstruction_data)); } -TEST_P(ServiceBasedGcsClientTest, TestObjectInfo) { +TEST_P(GcsClientTest, TestObjectInfo) { ObjectID object_id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); @@ -962,14 +961,14 @@ TEST_P(ServiceBasedGcsClientTest, TestObjectInfo) { ASSERT_EQ(object_add_count, 1); } -TEST_P(ServiceBasedGcsClientTest, TestStats) { +TEST_P(GcsClientTest, TestStats) { // Add profile data to GCS. NodeID node_id = NodeID::FromRandom(); auto profile_table_data = Mocker::GenProfileTableData(node_id); ASSERT_TRUE(AddProfileData(profile_table_data)); } -TEST_P(ServiceBasedGcsClientTest, TestWorkerInfo) { +TEST_P(GcsClientTest, TestWorkerInfo) { // Subscribe to all unexpected failure of workers from GCS. std::atomic worker_failure_count(0); auto on_subscribe = [&worker_failure_count](const rpc::WorkerDeltaData &result) { @@ -991,14 +990,14 @@ TEST_P(ServiceBasedGcsClientTest, TestWorkerInfo) { WaitForExpectedCount(worker_failure_count, 2); } -TEST_P(ServiceBasedGcsClientTest, TestErrorInfo) { +TEST_P(GcsClientTest, TestErrorInfo) { // Report a job error to GCS. JobID job_id = JobID::FromInt(1); auto error_table_data = Mocker::GenErrorTableData(job_id); ASSERT_TRUE(ReportJobError(error_table_data)); } -TEST_P(ServiceBasedGcsClientTest, TestJobTableResubscribe) { +TEST_P(GcsClientTest, TestJobTableResubscribe) { // Test that subscription of the job table can still work when GCS server restarts. JobID job_id = JobID::FromInt(1); auto job_table_data = Mocker::GenJobTableData(job_id); @@ -1022,7 +1021,7 @@ TEST_P(ServiceBasedGcsClientTest, TestJobTableResubscribe) { WaitForExpectedCount(job_update_count, 3); } -TEST_P(ServiceBasedGcsClientTest, TestActorTableResubscribe) { +TEST_P(GcsClientTest, TestActorTableResubscribe) { // Test that subscription of the actor table can still work when GCS server restarts. JobID job_id = JobID::FromInt(1); AddJob(job_id); @@ -1115,7 +1114,7 @@ TEST_P(ServiceBasedGcsClientTest, TestActorTableResubscribe) { EXPECT_TRUE(WaitForCondition(condition_subscribe_all_restart, timeout_ms_.count())); } -TEST_P(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { +TEST_P(GcsClientTest, TestObjectTableResubscribe) { ObjectID object1_id = ObjectID::FromRandom(); ObjectID object2_id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); @@ -1163,7 +1162,7 @@ TEST_P(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { WaitForExpectedCount(object2_change_count, 3); } -TEST_P(ServiceBasedGcsClientTest, TestNodeTableResubscribe) { +TEST_P(GcsClientTest, TestNodeTableResubscribe) { // Test that subscription of the node table can still work when GCS server restarts. // Subscribe to node addition and removal events from GCS and cache those information. std::atomic node_change_count(0); @@ -1215,7 +1214,7 @@ TEST_P(ServiceBasedGcsClientTest, TestNodeTableResubscribe) { WaitForExpectedCount(batch_resource_usage_count, 2); } -TEST_P(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { +TEST_P(GcsClientTest, TestTaskTableResubscribe) { JobID job_id = JobID::FromInt(6); AddJob(job_id); TaskID task_id = TaskID::ForDriverTask(job_id); @@ -1246,7 +1245,7 @@ TEST_P(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { WaitForExpectedCount(task_lease_count, 3); } -TEST_P(ServiceBasedGcsClientTest, TestWorkerTableResubscribe) { +TEST_P(GcsClientTest, TestWorkerTableResubscribe) { // Subscribe to all unexpected failure of workers from GCS. std::atomic worker_failure_count(0); auto on_subscribe = [&worker_failure_count](const rpc::WorkerDeltaData &result) { @@ -1267,7 +1266,7 @@ TEST_P(ServiceBasedGcsClientTest, TestWorkerTableResubscribe) { WaitForExpectedCount(worker_failure_count, 1); } -TEST_P(ServiceBasedGcsClientTest, TestGcsTableReload) { +TEST_P(GcsClientTest, TestGcsTableReload) { ObjectID object_id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); @@ -1291,7 +1290,7 @@ TEST_P(ServiceBasedGcsClientTest, TestGcsTableReload) { ASSERT_EQ(locations.back().manager(), node_id.Binary()); } -TEST_P(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) { +TEST_P(GcsClientTest, TestGcsRedisFailureDetector) { // Stop redis. TestSetupUtil::ShutDownRedisServers(); @@ -1304,7 +1303,7 @@ TEST_P(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) { RAY_CHECK(gcs_server_->IsStopped()); } -TEST_P(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) { +TEST_P(GcsClientTest, TestMultiThreadSubAndUnsub) { auto sub_finished_count = std::make_shared>(0); int size = 5; std::vector> threads; @@ -1350,7 +1349,7 @@ TEST_P(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) { // This UT is only used to test the query actor info performance. // We disable it by default. -TEST_P(ServiceBasedGcsClientTest, DISABLED_TestGetActorPerf) { +TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) { // Register actors. JobID job_id = JobID::FromInt(1); AddJob(job_id); @@ -1379,7 +1378,7 @@ TEST_P(ServiceBasedGcsClientTest, DISABLED_TestGetActorPerf) { << actor_count << " actors."; } -TEST_P(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) { +TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) { // Register actors and the actors will be destroyed. JobID job_id = JobID::FromInt(1); AddJob(job_id); @@ -1415,7 +1414,7 @@ TEST_P(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) { } } -TEST_P(ServiceBasedGcsClientTest, TestEvictExpiredDeadNodes) { +TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) { // Simulate the scenario of node dead. int node_count = RayConfig::instance().maximum_gcs_dead_node_cached_count(); RegisterNodeAndMarkDead(node_count); diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 0aaa39bef..ecb5ff490 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -54,7 +54,7 @@ class GlobalStateAccessorTest : public ::testing::Test { // Create GCS client. gcs::GcsClientOptions options(config.redis_address, config.redis_port, config.redis_password); - gcs_client_.reset(new gcs::ServiceBasedGcsClient(options)); + gcs_client_.reset(new gcs::GcsClient(options)); RAY_CHECK_OK(gcs_client_->Connect(*io_service_)); // Create global state. diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index bea9bf793..7782d5808 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -22,7 +22,6 @@ #include "ray/common/id.h" #include "ray/common/task/task_execution_spec.h" #include "ray/common/task/task_spec.h" -#include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/raylet_client/raylet_client.h" diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h index e9b01ba59..eb10d5d57 100644 --- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h +++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h @@ -19,7 +19,6 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/asio/periodical_runner.h" #include "ray/common/id.h" -#include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/rpc/client_call.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index a359ea57b..00040e659 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -17,7 +17,6 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "ray/common/id.h" -#include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index c4a552969..7bc150265 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -17,7 +17,6 @@ #include "absl/container/flat_hash_set.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/id.h" -#include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_resource_scheduler.h" diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index a62d66642..7de781ac0 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -19,7 +19,6 @@ #include "ray/common/asio/periodical_runner.h" #include "ray/common/id.h" #include "ray/common/task/scheduling_resources.h" -#include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index abf88d351..8b9e5c4b1 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -56,7 +56,7 @@ class GcsJobManager; class GcsWorkerManager; class GcsPlacementGroupManager; -/// The GcsServer will take over all requests from ServiceBasedGcsClient and transparent +/// The GcsServer will take over all requests from GcsClient and transparent /// transmit the command to the backend reliable storage for the time being. /// In the future, GCS server's main responsibility is to manage meta data /// and the management of actor creation. diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index a36bea4ac..15ed784d7 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -21,6 +21,7 @@ #include "ray/common/task/task.h" #include "ray/common/task/task_util.h" #include "ray/common/test_util.h" +#include "ray/gcs/gcs_client/accessor.h" #include "ray/gcs/gcs_server/gcs_actor_distribution.h" #include "ray/gcs/gcs_server/gcs_actor_manager.h" #include "ray/gcs/gcs_server/gcs_actor_scheduler.h" diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index b773456f2..1a37d732d 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -23,7 +23,7 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/id.h" #include "ray/common/status.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/object_manager/common.h" namespace ray { diff --git a/src/ray/object_manager/ownership_based_object_directory.h b/src/ray/object_manager/ownership_based_object_directory.h index 687b116ab..e8408c5e1 100644 --- a/src/ray/object_manager/ownership_based_object_directory.h +++ b/src/ray/object_manager/ownership_based_object_directory.h @@ -24,7 +24,7 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/id.h" #include "ray/common/status.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/object_manager/object_directory.h" #include "ray/pubsub/subscriber.h" #include "ray/rpc/worker/core_worker_client.h" diff --git a/src/ray/object_manager/test/ownership_based_object_directory_test.cc b/src/ray/object_manager/test/ownership_based_object_directory_test.cc index 44effe7d6..f02694714 100644 --- a/src/ray/object_manager/test/ownership_based_object_directory_test.cc +++ b/src/ray/object_manager/test/ownership_based_object_directory_test.cc @@ -16,15 +16,14 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" - #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/status.h" -#include "ray/gcs/gcs_client/service_based_accessor.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/accessor.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/pubsub/mock_pubsub.h" // clang-format off -#include "mock/ray/gcs/accessor.h" +#include "mock/ray/gcs/gcs_client/accessor.h" // clang-format on namespace ray { diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 83c2b5612..e55a99826 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -20,7 +20,7 @@ #include "ray/common/id.h" #include "ray/common/ray_object.h" -#include "ray/gcs/accessor.h" +#include "ray/gcs/gcs_client/accessor.h" #include "ray/object_manager/common.h" #include "ray/pubsub/subscriber.h" #include "ray/raylet/worker_pool.h" diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 2cb648b57..4d6c1eb7f 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -20,7 +20,7 @@ #include "ray/common/ray_config.h" #include "ray/common/status.h" #include "ray/common/task/task_common.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/raylet/raylet.h" #include "ray/stats/stats.h" #include "ray/util/event.h" @@ -128,7 +128,7 @@ int main(int argc, char *argv[]) { /*enable_async_conn=*/false, /*enable_subscribe_conn=*/true); std::shared_ptr gcs_client; - gcs_client = std::make_shared(client_options); + gcs_client = std::make_shared(client_options); RAY_CHECK_OK(gcs_client->Connect(main_service)); std::unique_ptr raylet(nullptr); diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index 76331a141..fbd097276 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -22,7 +22,7 @@ #include "ray/common/id.h" #include "ray/common/task/scheduling_resources.h" #include "ray/gcs/test/gcs_test_util.h" -#include "mock/ray/gcs/gcs_client.h" +#include "mock/ray/gcs/gcs_client/gcs_client.h" // clang-format on namespace ray { diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 7a96768e2..0c8eb19fc 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -23,8 +23,8 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "ray/common/task/scheduling_resources.h" -#include "ray/gcs/accessor.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/accessor.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/raylet/scheduling/cluster_resource_data.h" #include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h" #include "ray/raylet/scheduling/fixed_point.h" diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index a672957c2..59e36bb11 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -22,7 +22,7 @@ #include "ray/common/ray_config.h" #include "ray/common/task/scheduling_resources.h" #include "ray/raylet/scheduling/scheduling_ids.h" -#include "mock/ray/gcs/gcs_client.h" +#include "mock/ray/gcs/gcs_client/gcs_client.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION #include diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 4e967071e..b7c89964d 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -29,7 +29,7 @@ #include "ray/raylet/scheduling/cluster_resource_scheduler.h" #include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/raylet/test/util.h" -#include "mock/ray/gcs/gcs_client.h" +#include "mock/ray/gcs/gcs_client/gcs_client.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION #include diff --git a/src/ray/raylet/scheduling/scheduling_policy.h b/src/ray/raylet/scheduling/scheduling_policy.h index 6e07d1feb..c377faf48 100644 --- a/src/ray/raylet/scheduling/scheduling_policy.h +++ b/src/ray/raylet/scheduling/scheduling_policy.h @@ -15,7 +15,7 @@ #include #include "ray/common/ray_config.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/raylet/scheduling/cluster_resource_data.h" namespace ray { diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 7f54597f8..7b2cfb37c 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -18,7 +18,7 @@ #include "gtest/gtest.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/id.h" -#include "ray/gcs/accessor.h" +#include "ray/gcs/gcs_client/accessor.h" #include "ray/pubsub/subscriber.h" #include "ray/raylet/test/util.h" #include "ray/raylet/worker_pool.h" diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 2c0933980..203504702 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -30,7 +30,7 @@ #include "ray/common/client_connection.h" #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" -#include "ray/gcs/gcs_client.h" +#include "ray/gcs/gcs_client/gcs_client.h" #include "ray/raylet/agent_manager.h" #include "ray/raylet/worker.h"