From 6c60dbb242bc83cba5ea36fea72187f87f7dd175 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 10 May 2022 13:12:22 -0700 Subject: [PATCH] [scheduler][6] Integrate ray with syncer. (#23660) The new syncer comes with the feature of long-polling and versioning. This PR integrates it with ray. --- .buildkite/pipeline.yml | 33 ++++- python/ray/includes/ray_config.pxd | 2 + python/ray/includes/ray_config.pxi | 4 + python/ray/tests/test_metrics_agent.py | 4 +- src/mock/ray/common/ray_syncer/ray_syncer.h | 2 +- src/mock/ray/raylet_client/raylet_client.h | 1 + src/ray/common/ray_config_def.h | 7 + src/ray/common/ray_syncer/ray_syncer-inl.h | 10 +- src/ray/common/ray_syncer/ray_syncer.cc | 54 ++++---- src/ray/common/ray_syncer/ray_syncer.h | 16 +-- src/ray/common/test/ray_syncer_test.cc | 73 +++++----- .../common/test/syncer_service_e2e_test.cc | 9 +- .../gcs_placement_group_scheduler.cc | 28 ++-- .../gcs_placement_group_scheduler.h | 4 +- .../gcs/gcs_server/gcs_resource_manager.cc | 23 +++- src/ray/gcs/gcs_server/gcs_resource_manager.h | 12 +- src/ray/gcs/gcs_server/gcs_server.cc | 83 +++++++++--- src/ray/gcs/gcs_server/gcs_server.h | 17 ++- .../test/gcs_actor_scheduler_test.cc | 4 +- .../gcs_placement_group_manager_mock_test.cc | 4 +- .../test/gcs_placement_group_manager_test.cc | 4 +- .../gcs_placement_group_scheduler_test.cc | 6 +- .../test/gcs_resource_manager_test.cc | 5 +- .../gcs_server/test/gcs_server_test_util.h | 2 + src/ray/protobuf/ray_syncer.proto | 10 +- src/ray/raylet/node_manager.cc | 128 ++++++++++++++---- src/ray/raylet/node_manager.h | 18 ++- .../scheduling/local_resource_manager.cc | 60 +++++++- .../scheduling/local_resource_manager.h | 9 +- src/ray/raylet_client/raylet_client.cc | 4 + src/ray/raylet_client/raylet_client.h | 4 + src/ray/rpc/grpc_client.h | 16 ++- src/ray/rpc/grpc_server.cc | 4 + src/ray/rpc/grpc_server.h | 1 + .../rpc/node_manager/node_manager_client.h | 4 + 35 files changed, 493 
insertions(+), 172 deletions(-) diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 4dc58644b..0ed2d7170 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -301,7 +301,6 @@ --build_tests_only --test_tag_filters=release_unit release/... - - label: ":python: (Small & Client)" conditions: ["RAY_CI_PYTHON_AFFECTED"] commands: @@ -354,6 +353,38 @@ --test_env=CONDA_DEFAULT_ENV python/ray/tests/... +- label: ":construction: :python: (syncer) (Small & Client)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=client_tests,small_size_python_tests + --test_env=RAY_use_ray_syncer=true + -- python/ray/tests/... +- label: ":construction: :python: (syncer) (Large)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + parallelism: 3 + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - RAY_use_ray_syncer=true . ./ci/ci.sh test_large +- label: ":construction: :python: (syncer) (Medium A-J)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j + --test_env=RAY_use_ray_syncer=true + python/ray/tests/... +- label: ":construction: :python: (syncer) (Medium K-Z)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z + --test_env=RAY_use_ray_syncer=true + python/ray/tests/... 
+ + # https://github.com/ray-project/ray/issues/22460 #- label: ":python: (Privileged test)" #conditions: ["RAY_CI_PYTHON_AFFECTED"] diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 32838a642..e9ac6d2cc 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -66,3 +66,5 @@ cdef extern from "ray/common/ray_config.h" nogil: c_bool gcs_grpc_based_pubsub() const c_bool start_python_importer_thread() const + + c_bool use_ray_syncer() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index bf048fc03..c65bfbc29 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -107,3 +107,7 @@ cdef class Config: @staticmethod def start_python_importer_thread(): return RayConfig.instance().start_python_importer_thread() + + @staticmethod + def use_ray_syncer(): + return RayConfig.instance().use_ray_syncer() diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 645d628ac..c8be6df8e 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -44,7 +44,6 @@ _METRICS = [ "ray_internal_num_spilled_tasks", # "ray_unintentional_worker_failures_total", # "ray_node_failure_total", - "ray_outbound_heartbeat_size_kb_sum", "ray_operation_count", "ray_operation_run_time_ms", "ray_operation_queue_time_ms", @@ -75,6 +74,9 @@ _METRICS = [ "ray_gcs_actors_count", ] +if not ray._raylet.Config.use_ray_syncer(): + _METRICS.append("ray_outbound_heartbeat_size_kb_sum") + # This list of metrics should be kept in sync with # ray/python/ray/autoscaler/_private/prom_metrics.py _AUTOSCALER_METRICS = [ diff --git a/src/mock/ray/common/ray_syncer/ray_syncer.h b/src/mock/ray/common/ray_syncer/ray_syncer.h index f18c9d4d3..0f768dab6 100644 --- a/src/mock/ray/common/ray_syncer/ray_syncer.h +++ b/src/mock/ray/common/ray_syncer/ray_syncer.h @@ -19,7 +19,7 @@ class 
MockReporterInterface : public ReporterInterface { public: MOCK_METHOD(std::optional, CreateSyncMessage, - (int64_t current_version, RayComponentId component_id), + (int64_t current_version, MessageType message_type), (const, override)); }; diff --git a/src/mock/ray/raylet_client/raylet_client.h b/src/mock/ray/raylet_client/raylet_client.h index 7b5e16a4d..0a1d716aa 100644 --- a/src/mock/ray/raylet_client/raylet_client.h +++ b/src/mock/ray/raylet_client/raylet_client.h @@ -132,6 +132,7 @@ class MockRayletClientInterface : public RayletClientInterface { WaitForDirectActorCallArgs, (const std::vector &references, int64_t tag), (override)); + MOCK_METHOD(std::shared_ptr, GetChannel, (), (const)); MOCK_METHOD(void, ReportWorkerBacklog, (const WorkerID &worker_id, diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 297e61d5b..12a4fe044 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -64,6 +64,10 @@ RAY_CONFIG(uint64_t, num_heartbeats_warning, 5) /// The duration between reporting resources sent by the raylets. RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100) + +/// The duration between raylet check memory pressure and send gc request +RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100) + /// For a raylet, if the last resource report was sent more than this many /// report periods ago, then a warning will be logged that the report /// handler is drifting. @@ -343,6 +347,9 @@ RAY_CONFIG(int32_t, gcs_rpc_server_reconnect_timeout_s, 60) /// Minimum interval between reconnecting gcs rpc server when gcs server restarts. RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000) +/// Feature flag to use the ray syncer for resource synchronization +RAY_CONFIG(bool, use_ray_syncer, false) + /// The interval at which the gcs client will check if the address of gcs service has /// changed. When the address changed, we will resubscribe again. 
RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index b7b2ed6cf..17948622f 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -25,7 +25,7 @@ class NodeState { /// Set the local component. /// - /// \param cid The component id. + /// \param message_type The type of the message for this component. /// \param reporter The reporter is defined to be the local module which wants to /// broadcast its internal status to the whole clsuter. When it's null, it means there /// is no reporter in the local node for this component. This is the place there @@ -36,16 +36,16 @@ class NodeState { /// received messages are consumed. /// /// \return true if set successfully. - bool SetComponent(RayComponentId cid, + bool SetComponent(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver); /// Get the snapshot of a component for a newer version. /// - /// \param cid The component id to take the snapshot. + /// \param message_type The component to take the snapshot. /// /// \return If a snapshot is taken, return the message, otherwise std::nullopt. - std::optional CreateSyncMessage(RayComponentId cid); + std::optional CreateSyncMessage(MessageType message_type); /// Consume a message. Receiver will consume this message if it doesn't have /// this message. @@ -127,7 +127,7 @@ class NodeSyncConnection { std::function)> message_processor_; /// Buffering all the updates. Sending will be done in an async way. 
- absl::flat_hash_map, + absl::flat_hash_map, std::shared_ptr> sending_buffer_; diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 3bb47098f..3972efd58 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -22,30 +22,30 @@ namespace syncer { NodeState::NodeState() { sync_message_versions_taken_.fill(-1); } -bool NodeState::SetComponent(RayComponentId cid, +bool NodeState::SetComponent(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver) { - if (cid < static_cast(kComponentArraySize) && - reporters_[cid] == nullptr && receivers_[cid] == nullptr) { - reporters_[cid] = reporter; - receivers_[cid] = receiver; + if (message_type < static_cast(kComponentArraySize) && + reporters_[message_type] == nullptr && receivers_[message_type] == nullptr) { + reporters_[message_type] = reporter; + receivers_[message_type] = receiver; return true; } else { - RAY_LOG(FATAL) << "Fail to set components, component_id:" << cid + RAY_LOG(FATAL) << "Fail to set components, message_type:" << message_type << ", reporter:" << reporter << ", receiver:" << receiver; return false; } } -std::optional NodeState::CreateSyncMessage(RayComponentId cid) { - if (reporters_[cid] == nullptr) { +std::optional NodeState::CreateSyncMessage(MessageType message_type) { + if (reporters_[message_type] == nullptr) { return std::nullopt; } - auto message = - reporters_[cid]->CreateSyncMessage(sync_message_versions_taken_[cid], cid); + auto message = reporters_[message_type]->CreateSyncMessage( + sync_message_versions_taken_[message_type], message_type); if (message != std::nullopt) { - sync_message_versions_taken_[cid] = message->version(); - RAY_LOG(DEBUG) << "Sync message taken: cid:" << cid + sync_message_versions_taken_[message_type] = message->version(); + RAY_LOG(DEBUG) << "Sync message taken: message_type:" << message_type << ", version:" << message->version() << ", node:" 
<< NodeID::FromBinary(message->node_id()); } @@ -53,7 +53,7 @@ std::optional NodeState::CreateSyncMessage(RayComponentId cid) { } bool NodeState::ConsumeSyncMessage(std::shared_ptr message) { - auto ¤t = cluster_view_[message->node_id()][message->component_id()]; + auto ¤t = cluster_view_[message->node_id()][message->message_type()]; RAY_LOG(DEBUG) << "ConsumeSyncMessage: " << (current ? current->version() : -1) << " message_version: " << message->version() @@ -64,7 +64,7 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr message } current = message; - auto receiver = receivers_[message->component_id()]; + auto receiver = receivers_[message->message_type()]; if (receiver != nullptr) { receiver->ConsumeSyncMessage(message); } @@ -83,11 +83,11 @@ void NodeSyncConnection::ReceiveUpdate(RaySyncMessages messages) { for (auto &message : *messages.mutable_sync_messages()) { auto &node_versions = GetNodeComponentVersions(message.node_id()); RAY_LOG(DEBUG) << "Receive update: " - << " component_id=" << message.component_id() + << " message_type=" << message.message_type() << ", message_version=" << message.version() - << ", local_message_version=" << node_versions[message.component_id()]; - if (node_versions[message.component_id()] < message.version()) { - node_versions[message.component_id()] = message.version(); + << ", local_message_version=" << node_versions[message.message_type()]; + if (node_versions[message.message_type()] < message.version()) { + node_versions[message.message_type()] = message.version(); message_processor_(std::make_shared(std::move(message))); } } @@ -104,9 +104,9 @@ bool NodeSyncConnection::PushToSendingQueue( } auto &node_versions = GetNodeComponentVersions(message->node_id()); - if (node_versions[message->component_id()] < message->version()) { - node_versions[message->component_id()] = message->version(); - sending_buffer_[std::make_pair(message->node_id(), message->component_id())] = + if (node_versions[message->message_type()] < 
message->version()) { + node_versions[message->message_type()] = message->version(); + sending_buffer_[std::make_pair(message->node_id(), message->message_type())] = message; return true; } @@ -323,30 +323,30 @@ void RaySyncer::Disconnect(const std::string &node_id) { "RaySyncerDisconnect"); } -bool RaySyncer::Register(RayComponentId component_id, +bool RaySyncer::Register(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver, int64_t pull_from_reporter_interval_ms) { - if (!node_state_->SetComponent(component_id, reporter, receiver)) { + if (!node_state_->SetComponent(message_type, reporter, receiver)) { return false; } // Set job to pull from reporter periodically if (reporter != nullptr && pull_from_reporter_interval_ms > 0) { timer_.RunFnPeriodically( - [this, component_id]() { OnDemandBroadcasting(component_id); }, + [this, message_type]() { OnDemandBroadcasting(message_type); }, pull_from_reporter_interval_ms); } RAY_LOG(DEBUG) << "Registered components: " - << "component_id:" << component_id << ", reporter:" << reporter + << "message_type:" << message_type << ", reporter:" << reporter << ", receiver:" << receiver << ", pull_from_reporter_interval_ms:" << pull_from_reporter_interval_ms; return true; } -bool RaySyncer::OnDemandBroadcasting(RayComponentId component_id) { - auto msg = node_state_->CreateSyncMessage(component_id); +bool RaySyncer::OnDemandBroadcasting(MessageType message_type) { + auto msg = node_state_->CreateSyncMessage(message_type); if (msg) { RAY_CHECK(msg->node_id() == GetLocalNodeID()); BroadcastMessage(std::make_shared(std::move(*msg))); diff --git a/src/ray/common/ray_syncer/ray_syncer.h b/src/ray/common/ray_syncer/ray_syncer.h index 7725b518c..42880200c 100644 --- a/src/ray/common/ray_syncer/ray_syncer.h +++ b/src/ray/common/ray_syncer/ray_syncer.h @@ -29,14 +29,14 @@ namespace syncer { using ray::rpc::syncer::DummyRequest; using ray::rpc::syncer::DummyResponse; -using ray::rpc::syncer::RayComponentId; 
+using ray::rpc::syncer::MessageType; using ray::rpc::syncer::RaySyncMessage; using ray::rpc::syncer::RaySyncMessages; using ray::rpc::syncer::StartSyncRequest; using ray::rpc::syncer::StartSyncResponse; static constexpr size_t kComponentArraySize = - static_cast(ray::rpc::syncer::RayComponentId_ARRAYSIZE); + static_cast(ray::rpc::syncer::MessageType_ARRAYSIZE); /// The interface for a reporter. Reporter is defined to be a local module which would /// like to let the other nodes know its state. For example, local cluster resource @@ -49,13 +49,13 @@ struct ReporterInterface { /// /// \param version_after Request message with version after `version_after`. If the /// reporter doesn't have the qualified one, just return std::nullopt - /// \param component_id The component id asked for. + /// \param message_type The message type asked for. /// /// \return std::nullopt if the reporter doesn't have such component or the current /// snapshot of the component is not newer the asked one. Otherwise, return the /// actual message. virtual std::optional CreateSyncMessage( - int64_t version_after, RayComponentId component_id) const = 0; + int64_t version_after, MessageType message_type) const = 0; virtual ~ReporterInterface() {} }; @@ -114,14 +114,14 @@ class RaySyncer { /// it'll have a global view of the cluster. /// /// - /// \param component_id The component to sync. + /// \param message_type The message type of the component. /// \param reporter The local component to be broadcasted. /// \param receiver The consumer of the sync message sent by the other nodes in the /// cluster. /// \param pull_from_reporter_interval_ms The frequence to pull a message. 0 means /// never pull a message in syncer. /// from reporter and push it to sending queue. 
- bool Register(RayComponentId component_id, + bool Register(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver, int64_t pull_from_reporter_interval_ms = 100); @@ -132,10 +132,10 @@ class RaySyncer { /// Request trigger a broadcasting for a specific component immediately instead of /// waiting for ray syncer to poll the message. /// - /// \param component_id The component to check. + /// \param message_type The component to check. /// \return true if a message is generated. If the component doesn't have a new /// version of message, false will be returned. - bool OnDemandBroadcasting(RayComponentId component_id); + bool OnDemandBroadcasting(MessageType message_type); private: /// Get the io_context used by RaySyncer. diff --git a/src/ray/common/test/ray_syncer_test.cc b/src/ray/common/test/ray_syncer_test.cc index 8d8ccb5bf..8ca793737 100644 --- a/src/ray/common/test/ray_syncer_test.cc +++ b/src/ray/common/test/ray_syncer_test.cc @@ -44,10 +44,10 @@ namespace syncer { constexpr size_t kTestComponents = 1; -RaySyncMessage MakeMessage(RayComponentId cid, int64_t version, const NodeID &id) { +RaySyncMessage MakeMessage(MessageType cid, int64_t version, const NodeID &id) { auto msg = RaySyncMessage(); msg.set_version(version); - msg.set_component_id(cid); + msg.set_message_type(cid); msg.set_node_id(id.Binary()); return msg; } @@ -66,7 +66,7 @@ class RaySyncerTest : public ::testing::Test { return std::nullopt; } else { auto msg = RaySyncMessage(); - msg.set_component_id(static_cast(cid)); + msg.set_message_type(static_cast(cid)); msg.set_version(++local_versions_[cid]); return std::make_optional(std::move(msg)); } @@ -82,15 +82,15 @@ class RaySyncerTest : public ::testing::Test { syncer_ = std::make_unique(io_context_, local_id_.Binary()); } - MockReporterInterface *GetReporter(RayComponentId cid) { + MockReporterInterface *GetReporter(MessageType cid) { return reporters_[static_cast(cid)].get(); } - MockReceiverInterface 
*GetReceiver(RayComponentId cid) { + MockReceiverInterface *GetReceiver(MessageType cid) { return receivers_[static_cast(cid)].get(); } - int64_t &LocalVersion(RayComponentId cid) { + int64_t &LocalVersion(MessageType cid) { return local_versions_[static_cast(cid)]; } @@ -114,32 +114,29 @@ class RaySyncerTest : public ::testing::Test { TEST_F(RaySyncerTest, NodeStateCreateSyncMessage) { auto node_status = std::make_unique(); - node_status->SetComponent(RayComponentId::RESOURCE_MANAGER, nullptr, nullptr); - ASSERT_EQ(std::nullopt, - node_status->CreateSyncMessage(RayComponentId::RESOURCE_MANAGER)); + node_status->SetComponent(MessageType::RESOURCE_VIEW, nullptr, nullptr); + ASSERT_EQ(std::nullopt, node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW)); auto reporter = std::make_unique(); - ASSERT_TRUE(node_status->SetComponent(RayComponentId::RESOURCE_MANAGER, - GetReporter(RayComponentId::RESOURCE_MANAGER), - nullptr)); + ASSERT_TRUE(node_status->SetComponent( + MessageType::RESOURCE_VIEW, GetReporter(MessageType::RESOURCE_VIEW), nullptr)); // Take a snapshot - auto msg = node_status->CreateSyncMessage(RayComponentId::RESOURCE_MANAGER); - ASSERT_EQ(LocalVersion(RayComponentId::RESOURCE_MANAGER), msg->version()); + auto msg = node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW); + ASSERT_EQ(LocalVersion(MessageType::RESOURCE_VIEW), msg->version()); // Revert one version back. 
- LocalVersion(RayComponentId::RESOURCE_MANAGER) -= 1; - msg = node_status->CreateSyncMessage(RayComponentId::RESOURCE_MANAGER); + LocalVersion(MessageType::RESOURCE_VIEW) -= 1; + msg = node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW); ASSERT_EQ(std::nullopt, msg); } TEST_F(RaySyncerTest, NodeStateConsume) { auto node_status = std::make_unique(); - node_status->SetComponent(RayComponentId::RESOURCE_MANAGER, - nullptr, - GetReceiver(RayComponentId::RESOURCE_MANAGER)); + node_status->SetComponent( + MessageType::RESOURCE_VIEW, nullptr, GetReceiver(MessageType::RESOURCE_VIEW)); auto from_node_id = NodeID::FromRandom(); // The first time receiver the message - auto msg = MakeMessage(RayComponentId::RESOURCE_MANAGER, 0, from_node_id); + auto msg = MakeMessage(MessageType::RESOURCE_VIEW, 0, from_node_id); ASSERT_TRUE(node_status->ConsumeSyncMessage(std::make_shared(msg))); ASSERT_FALSE(node_status->ConsumeSyncMessage(std::make_shared(msg))); @@ -156,7 +153,7 @@ TEST_F(RaySyncerTest, NodeSyncConnection) { node_id.Binary(), [](std::shared_ptr) {}); auto from_node_id = NodeID::FromRandom(); - auto msg = MakeMessage(RayComponentId::RESOURCE_MANAGER, 0, from_node_id); + auto msg = MakeMessage(MessageType::RESOURCE_VIEW, 0, from_node_id); // First push will succeed and the second one will be deduplicated. 
ASSERT_TRUE(sync_connection.PushToSendingQueue(std::make_shared(msg))); @@ -164,9 +161,9 @@ TEST_F(RaySyncerTest, NodeSyncConnection) { ASSERT_EQ(1, sync_connection.sending_buffer_.size()); ASSERT_EQ(0, sync_connection.sending_buffer_.begin()->second->version()); ASSERT_EQ(1, sync_connection.node_versions_.size()); - ASSERT_EQ(0, - sync_connection - .node_versions_[from_node_id.Binary()][RayComponentId::RESOURCE_MANAGER]); + ASSERT_EQ( + 0, + sync_connection.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_VIEW]); msg.set_version(2); ASSERT_TRUE(sync_connection.PushToSendingQueue(std::make_shared(msg))); @@ -175,9 +172,9 @@ TEST_F(RaySyncerTest, NodeSyncConnection) { ASSERT_EQ(1, sync_connection.sending_buffer_.size()); ASSERT_EQ(1, sync_connection.node_versions_.size()); ASSERT_EQ(2, sync_connection.sending_buffer_.begin()->second->version()); - ASSERT_EQ(2, - sync_connection - .node_versions_[from_node_id.Binary()][RayComponentId::RESOURCE_MANAGER]); + ASSERT_EQ( + 2, + sync_connection.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_VIEW]); } struct SyncerServerTest { @@ -208,7 +205,7 @@ struct SyncerServerTest { iter = received_versions.find(message->node_id()); } - received_versions[message->node_id()][message->component_id()] = + received_versions[message->node_id()][message->message_type()] = message->version(); message_consumed[message->node_id()]++; }; @@ -222,7 +219,7 @@ struct SyncerServerTest { return std::nullopt; } else { auto msg = RaySyncMessage(); - msg.set_component_id(static_cast(cid)); + msg.set_message_type(static_cast(cid)); msg.set_version(local_versions[cid]); msg.set_node_id(syncer->GetLocalNodeID()); snapshot_taken++; @@ -233,7 +230,7 @@ struct SyncerServerTest { EXPECT_CALL(*reporter, CreateSyncMessage(_, Eq(cid))) .WillRepeatedly(WithArg<0>(Invoke(take_snapshot))); syncer->Register( - static_cast(cid), reporter.get(), receivers[cid].get()); + static_cast(cid), reporter.get(), receivers[cid].get()); } thread = 
std::make_unique([this] { boost::asio::io_context::work work(io_context); @@ -376,10 +373,10 @@ TEST(SyncerTest, Test1To1) { auto s2 = SyncerServerTest("19991"); // Make sure the setup is correct - ASSERT_NE(nullptr, s1.receivers[RayComponentId::RESOURCE_MANAGER]); - ASSERT_NE(nullptr, s2.receivers[RayComponentId::RESOURCE_MANAGER]); - ASSERT_NE(nullptr, s1.reporters[RayComponentId::RESOURCE_MANAGER]); - ASSERT_NE(nullptr, s2.reporters[RayComponentId::RESOURCE_MANAGER]); + ASSERT_NE(nullptr, s1.receivers[MessageType::RESOURCE_VIEW]); + ASSERT_NE(nullptr, s2.receivers[MessageType::RESOURCE_VIEW]); + ASSERT_NE(nullptr, s1.reporters[MessageType::RESOURCE_VIEW]); + ASSERT_NE(nullptr, s2.reporters[MessageType::RESOURCE_VIEW]); auto channel_to_s2 = MakeChannel("19991"); @@ -420,7 +417,7 @@ TEST(SyncerTest, Test1To1) { // Make sure s2 send the new message to s1. ASSERT_TRUE(s1.WaitUntil( [&s1, node_id = s2.syncer->GetLocalNodeID()]() { - return s1.GetReceivedVersions(node_id)[RayComponentId::RESOURCE_MANAGER] == 1 && + return s1.GetReceivedVersions(node_id)[MessageType::RESOURCE_VIEW] == 1 && s1.GetNumConsumedMessages(node_id) == 2; }, 5)); @@ -633,11 +630,11 @@ bool TestCorrectness(std::function get_cluster_ for (size_t i = 0; i < 1000000; ++i) { auto server_idx = choose_server(gen); - auto component_id = choose_component(gen); + auto message_type = choose_component(gen); if (server_idx == 0) { - component_id = 0; + message_type = 0; } - servers[server_idx]->local_versions[component_id]++; + servers[server_idx]->local_versions[message_type]++; // expect to sleep for 100 times for the whole loop. 
if (rand_sleep(gen) < 100) { std::this_thread::sleep_for(100ms); diff --git a/src/ray/common/test/syncer_service_e2e_test.cc b/src/ray/common/test/syncer_service_e2e_test.cc index 8deb4718a..f8e399943 100644 --- a/src/ray/common/test/syncer_service_e2e_test.cc +++ b/src/ray/common/test/syncer_service_e2e_test.cc @@ -49,12 +49,12 @@ class LocalNode : public ReporterInterface { } std::optional CreateSyncMessage(int64_t current_version, - RayComponentId) const override { + MessageType) const override { if (current_version > version_) { return std::nullopt; } ray::rpc::syncer::RaySyncMessage msg; - msg.set_component_id(ray::rpc::syncer::RayComponentId::RESOURCE_MANAGER); + msg.set_message_type(ray::rpc::syncer::MessageType::RESOURCE_VIEW); msg.set_version(version_); msg.set_sync_message( std::string(reinterpret_cast(&state_), sizeof(state_))); @@ -103,9 +103,8 @@ int main(int argc, char *argv[]) { std::unique_ptr service; std::unique_ptr server; std::shared_ptr channel; - syncer.Register(ray::rpc::syncer::RayComponentId::RESOURCE_MANAGER, - local_node.get(), - remote_node.get()); + syncer.Register( + ray::rpc::syncer::MessageType::RESOURCE_VIEW, local_node.get(), remote_node.get()); if (server_port != ".") { RAY_LOG(INFO) << "Start server on port " << server_port; auto server_address = "0.0.0.0:" + server_port; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 5feb6ca2b..27862cfdf 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -27,7 +27,7 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( GcsResourceManager &gcs_resource_manager, ClusterResourceScheduler &cluster_resource_scheduler, std::shared_ptr raylet_client_pool, - gcs_syncer::RaySyncer &ray_syncer) + gcs_syncer::RaySyncer *ray_syncer) : return_timer_(io_context), gcs_table_storage_(std::move(gcs_table_storage)), 
gcs_node_manager_(gcs_node_manager), @@ -237,14 +237,18 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( RAY_LOG(DEBUG) << "Finished cancelling the resource reserved for bundle: " << bundle_spec->DebugString() << " at node " << node_id; std::vector resource_names; - rpc::NodeResourceChange node_resource_change; auto &resources = bundle_spec->GetFormattedResources(); for (const auto &iter : resources) { resource_names.push_back(iter.first); - node_resource_change.add_deleted_resources(iter.first); } - node_resource_change.set_node_id(node_id.Binary()); - ray_syncer_.Update(std::move(node_resource_change)); + if (ray_syncer_ != nullptr) { + rpc::NodeResourceChange node_resource_change; + for (const auto &iter : resources) { + node_resource_change.add_deleted_resources(iter.first); + } + node_resource_change.set_node_id(node_id.Binary()); + ray_syncer_->Update(std::move(node_resource_change)); + } gcs_resource_manager_.DeleteResources(node_id, std::move(resource_names)); }); } @@ -298,12 +302,14 @@ void GcsPlacementGroupScheduler::CommitAllBundles( auto &resources = bundle->GetFormattedResources(); gcs_resource_manager_.UpdateResources(node_id, resources); - // Push the message to syncer so that it can be broadcasted to all other nodes - rpc::NodeResourceChange node_resource_change; - node_resource_change.set_node_id(node_id.Binary()); - node_resource_change.mutable_updated_resources()->insert(resources.begin(), - resources.end()); - ray_syncer_.Update(std::move(node_resource_change)); + if (ray_syncer_ != nullptr) { + // Push the message to syncer so that it can be broadcasted to all other nodes + rpc::NodeResourceChange node_resource_change; + node_resource_change.set_node_id(node_id.Binary()); + node_resource_change.mutable_updated_resources()->insert(resources.begin(), + resources.end()); + ray_syncer_->Update(std::move(node_resource_change)); + } } if (lease_status_tracker->AllCommitRequestReturned()) { OnAllBundleCommitRequestReturned( diff --git 
a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 1401eac02..1afda3925 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -338,7 +338,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { GcsResourceManager &gcs_resource_manager, ClusterResourceScheduler &cluster_resource_scheduler, std::shared_ptr raylet_client_pool, - gcs_syncer::RaySyncer &ray_syncer); + gcs_syncer::RaySyncer *ray_syncer); virtual ~GcsPlacementGroupScheduler() = default; @@ -515,7 +515,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// The syncer of resource. This is used to report placement group updates. /// TODO (iycheng): Remove this one from pg once we finish the refactor - gcs_syncer::RaySyncer &ray_syncer_; + gcs_syncer::RaySyncer *ray_syncer_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 5af36b2de..6bf9a628b 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -21,13 +21,34 @@ namespace ray { namespace gcs { GcsResourceManager::GcsResourceManager( + instrumented_io_context &io_context, std::shared_ptr gcs_table_storage, ClusterResourceManager &cluster_resource_manager, scheduling::NodeID local_node_id) - : gcs_table_storage_(gcs_table_storage), + : io_context_(io_context), + gcs_table_storage_(gcs_table_storage), cluster_resource_manager_(cluster_resource_manager), local_node_id_(local_node_id) {} +void GcsResourceManager::ConsumeSyncMessage( + std::shared_ptr message) { + // ConsumeSyncMessage is called by ray_syncer which might not run + // in a dedicated thread for performance. + // GcsResourceManager is a module always run in the main thread, so we just + // delegate the work to the main thread for thread safety. 
+ // Ideally, all public api in GcsResourceManager need to be put into this + // io context for thread safety. + io_context_.dispatch( + [this, message]() { + rpc::ResourcesData resources; + resources.ParseFromString(message->sync_message()); + resources.set_node_id(message->node_id()); + RAY_CHECK(message->message_type() == syncer::MessageType::RESOURCE_VIEW); + UpdateFromResourceReport(resources); + }, + "GcsResourceManager::Update"); +} + void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request, rpc::GetResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index c00a6c08a..db191cff3 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -16,6 +16,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "ray/common/id.h" +#include "ray/common/ray_syncer/ray_syncer.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/raylet/scheduling/cluster_resource_data.h" @@ -44,18 +45,23 @@ namespace gcs { /// It is responsible for handing node resource related rpc requests and it is used for /// actor and placement group scheduling. It obtains the available resources of nodes /// through heartbeat reporting. Non-thread safe. -class GcsResourceManager : public rpc::NodeResourceInfoHandler { +class GcsResourceManager : public rpc::NodeResourceInfoHandler, + public syncer::ReceiverInterface { public: /// Create a GcsResourceManager. /// /// \param gcs_table_storage GCS table external storage accessor. 
explicit GcsResourceManager( + instrumented_io_context &io_context, std::shared_ptr gcs_table_storage, ClusterResourceManager &cluster_resource_manager, scheduling::NodeID local_node_id_ = scheduling::NodeID::Nil()); virtual ~GcsResourceManager() {} + /// Handle the resource update. + void ConsumeSyncMessage(std::shared_ptr message) override; + /// Handle get resource rpc request. void HandleGetResources(const rpc::GetResourcesRequest &request, rpc::GetResourcesReply *reply, @@ -140,6 +146,10 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { void UpdateResourceLoads(const rpc::ResourcesData &data); private: + /// io context. This is to ensure thread safety. Ideally, all public + /// function needs to post job to this io_context. + instrumented_io_context &io_context_; + /// Newest resource usage of all nodes. absl::flat_hash_map node_resource_usages_; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 55abbf8cf..f83868da2 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -49,6 +49,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, RayConfig::instance().gcs_server_rpc_client_thread_num()), raylet_client_pool_( std::make_shared(client_call_manager_)), + local_node_id_(NodeID::FromRandom()), pubsub_periodical_runner_(pubsub_io_service_), periodical_runner_(main_service), is_started_(false), @@ -205,8 +206,12 @@ void GcsServer::Stop() { // won't handle heartbeat calls anymore, some nodes will be marked as dead during this // time, causing many nodes die after GCS's failure. 
gcs_heartbeat_manager_->Stop(); - - ray_syncer_->Stop(); + if (RayConfig::instance().use_ray_syncer()) { + ray_syncer_io_context_.stop(); + ray_syncer_thread_->join(); + } else { + gcs_ray_syncer_->Stop(); + } // Shutdown the rpc server rpc_server_.Shutdown(); @@ -251,6 +256,7 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) { void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && cluster_resource_scheduler_); gcs_resource_manager_ = std::make_shared( + main_service_, gcs_table_storage_, cluster_resource_scheduler_->GetClusterResourceManager(), scheduling::NodeID(local_node_id_.Binary())); @@ -298,7 +304,6 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { } void GcsServer::InitClusterResourceScheduler() { - local_node_id_ = NodeID::FromRandom(); cluster_resource_scheduler_ = std::make_shared( scheduling::NodeID(local_node_id_.Binary()), NodeResources(), @@ -417,7 +422,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { *gcs_resource_manager_, *cluster_resource_scheduler_, raylet_client_pool_, - *ray_syncer_); + gcs_ray_syncer_.get()); gcs_placement_group_manager_ = std::make_shared( main_service_, @@ -468,15 +473,39 @@ void GcsServer::StoreGcsServerAddressInRedis() { } void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) { - /* - The current synchronization flow is: - raylet -> syncer::poller --> syncer::update -> gcs_resource_manager - gcs_placement_scheduler --/ - */ - ray_syncer_ = std::make_unique( - main_service_, raylet_client_pool_, *gcs_resource_manager_); - ray_syncer_->Initialize(gcs_init_data); - ray_syncer_->Start(); + if (RayConfig::instance().use_ray_syncer()) { + ray_syncer_ = std::make_unique(ray_syncer_io_context_, + local_node_id_.Binary()); + ray_syncer_->Register( + syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get()); + ray_syncer_thread_ = std::make_unique([this]() { + 
boost::asio::io_service::work work(ray_syncer_io_context_); + ray_syncer_io_context_.run(); + }); + + for (const auto &pair : gcs_init_data.Nodes()) { + if (pair.second.state() == + rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE) { + rpc::Address address; + address.set_raylet_id(pair.second.node_id()); + address.set_ip_address(pair.second.node_manager_address()); + address.set_port(pair.second.node_manager_port()); + + auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(address); + ray_syncer_->Connect(raylet_client->GetChannel()); + } + } + } else { + /* + The current synchronization flow is: + raylet -> syncer::poller --> syncer::update -> gcs_resource_manager + gcs_placement_scheduler --/ + */ + gcs_ray_syncer_ = std::make_unique( + main_service_, raylet_client_pool_, *gcs_resource_manager_); + gcs_ray_syncer_->Initialize(gcs_init_data); + gcs_ray_syncer_->Start(); + } } void GcsServer::InitStatsHandler() { @@ -564,12 +593,23 @@ void GcsServer::InstallEventListeners() { gcs_node_manager_->AddNodeAddedListener([this](std::shared_ptr node) { // Because a new node has been added, we need to try to schedule the pending // placement groups and the pending actors. 
+ auto node_id = NodeID::FromBinary(node->node_id()); gcs_resource_manager_->OnNodeAdd(*node); - gcs_placement_group_manager_->OnNodeAdd(NodeID::FromBinary(node->node_id())); + gcs_placement_group_manager_->OnNodeAdd(node_id); gcs_actor_manager_->SchedulePendingActors(); - cluster_task_manager_->ScheduleAndDispatchTasks(); gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id())); - ray_syncer_->AddNode(*node); + cluster_task_manager_->ScheduleAndDispatchTasks(); + if (RayConfig::instance().use_ray_syncer()) { + rpc::Address address; + address.set_raylet_id(node->node_id()); + address.set_ip_address(node->node_manager_address()); + address.set_port(node->node_manager_port()); + + auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(address); + ray_syncer_->Connect(raylet_client->GetChannel()); + } else { + gcs_ray_syncer_->AddNode(*node); + } }); gcs_node_manager_->AddNodeRemovedListener( [this](std::shared_ptr node) { @@ -581,7 +621,11 @@ void GcsServer::InstallEventListeners() { gcs_placement_group_manager_->OnNodeDead(node_id); gcs_actor_manager_->OnNodeDead(node_id, node_ip_address); raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id())); - ray_syncer_->RemoveNode(*node); + if (RayConfig::instance().use_ray_syncer()) { + ray_syncer_->Disconnect(node_id.Binary()); + } else { + gcs_ray_syncer_->RemoveNode(*node); + } }); // Install worker event listener. 
@@ -648,8 +692,9 @@ std::string GcsServer::GetDebugState() const { << gcs_placement_group_manager_->DebugString() << "\n\n" << gcs_publisher_->DebugString() << "\n\n" << runtime_env_manager_->DebugString() << "\n\n"; - - stream << ray_syncer_->DebugString(); + if (gcs_ray_syncer_) { + stream << gcs_ray_syncer_->DebugString(); + } return stream.str(); } diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index a8f68f5c4..51fbc8602 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -15,6 +15,7 @@ #pragma once #include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/runtime_env_manager.h" #include "ray/gcs/gcs_server/gcs_function_manager.h" #include "ray/gcs/gcs_server/gcs_heartbeat_manager.h" @@ -217,8 +218,20 @@ class GcsServer { /// Stats handler and service. std::unique_ptr stats_handler_; std::unique_ptr stats_service_; - // Synchronization service for ray. - std::unique_ptr ray_syncer_; + + /// Synchronization service for ray. + /// TODO(iycheng): Deprecate this gcs_ray_syncer_ one once we roll out + /// to ray_syncer_. + std::unique_ptr gcs_ray_syncer_; + + /// Ray Syncer related fields. + std::unique_ptr ray_syncer_; + std::unique_ptr ray_syncer_thread_; + instrumented_io_context ray_syncer_io_context_; + + /// The node id of GCS. + NodeID gcs_node_id_; + /// The gcs worker manager. std::unique_ptr gcs_worker_manager_; /// Worker info service. 
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 132ca4dc8..5d112600a 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -61,7 +61,9 @@ class GcsActorSchedulerTest : public ::testing::Test { /*local_task_manager=*/ nullptr); auto gcs_resource_manager = std::make_shared( - gcs_table_storage_, cluster_resource_scheduler->GetClusterResourceManager()); + io_service_, + gcs_table_storage_, + cluster_resource_scheduler->GetClusterResourceManager()); gcs_actor_scheduler_ = std::make_shared( io_service_, *gcs_actor_table_, diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc index 6ac65a60d..e5e24ebbe 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc @@ -37,8 +37,8 @@ class GcsPlacementGroupManagerMockTest : public Test { gcs_table_storage_ = std::make_shared(store_client_); gcs_placement_group_scheduler_ = std::make_shared(); - resource_manager_ = - std::make_shared(nullptr, cluster_resource_manager_); + resource_manager_ = std::make_shared( + io_context_, nullptr, cluster_resource_manager_); gcs_placement_group_manager_ = std::make_unique(io_context_, diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 7d8c87152..533f7652d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -84,8 +84,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { gcs_publisher_ = std::make_shared(std::make_unique()); gcs_table_storage_ = std::make_shared(io_service_); - gcs_resource_manager_ = - std::make_shared(nullptr, 
cluster_resource_manager_); + gcs_resource_manager_ = std::make_shared( + io_service_, nullptr, cluster_resource_manager_); gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager( io_service_, mock_placement_group_scheduler_, diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index ff4bcc67a..ece9bdbbb 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -53,7 +53,9 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { [](auto) { return true; }, /*is_local_node_with_raylet=*/false); gcs_resource_manager_ = std::make_shared( - gcs_table_storage_, cluster_resource_scheduler_->GetClusterResourceManager()); + io_service_, + gcs_table_storage_, + cluster_resource_scheduler_->GetClusterResourceManager()); ray_syncer_ = std::make_shared( io_service_, nullptr, *gcs_resource_manager_); store_client_ = std::make_shared(io_service_); @@ -68,7 +70,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { *gcs_resource_manager_, *cluster_resource_scheduler_, raylet_client_pool_, - *ray_syncer_); + ray_syncer_.get()); } void TearDown() override { diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc index b63fdfe46..82b2cb2d9 100644 --- a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc @@ -28,10 +28,11 @@ using ::testing::_; class GcsResourceManagerTest : public ::testing::Test { public: GcsResourceManagerTest() { - gcs_resource_manager_ = - std::make_shared(nullptr, cluster_resource_manager_); + gcs_resource_manager_ = std::make_shared( + io_service_, nullptr, cluster_resource_manager_); } + instrumented_io_context io_service_; ClusterResourceManager cluster_resource_manager_; std::shared_ptr 
gcs_resource_manager_; }; diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 0b7fda200..4be5676a9 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -72,6 +72,8 @@ struct GcsServerMocker { return Status::OK(); } + std::shared_ptr GetChannel() const override { return nullptr; } + void ReportWorkerBacklog( const WorkerID &worker_id, const std::vector &backlog_reports) override {} diff --git a/src/ray/protobuf/ray_syncer.proto b/src/ray/protobuf/ray_syncer.proto index 33c60dc96..f171665ef 100644 --- a/src/ray/protobuf/ray_syncer.proto +++ b/src/ray/protobuf/ray_syncer.proto @@ -16,16 +16,16 @@ syntax = "proto3"; option cc_enable_arenas = true; package ray.rpc.syncer; -enum RayComponentId { - RESOURCE_MANAGER = 0; - SCHEDULER = 1; +enum MessageType { + RESOURCE_VIEW = 0; + COMMANDS = 1; } message RaySyncMessage { // The version of the message. -1 means the version is not set. int64 version = 1; - // The component this message is for. - RayComponentId component_id = 2; + // The type of this message. + MessageType message_type = 2; // The actual payload. bytes sync_message = 3; // The node id which initially sent this message. 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index af2fc140f..ba4ac6d53 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -19,6 +19,7 @@ #include #include +#include "absl/time/clock.h" #include "boost/filesystem.hpp" #include "boost/system/error_code.hpp" #include "ray/common/asio/asio_util.h" @@ -329,7 +330,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service, global_gc_throttler_(RayConfig::instance().global_gc_min_interval_s() * 1e9), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), record_metrics_period_ms_(config.record_metrics_period_ms), - next_resource_seq_no_(0) { + next_resource_seq_no_(0), + ray_syncer_(io_service_, self_node_id_.Binary()), + ray_syncer_service_(ray_syncer_) { RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; RAY_CHECK(RayConfig::instance().raylet_heartbeat_period_milliseconds() > 0); cluster_resource_scheduler_ = std::make_shared( @@ -404,6 +407,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service, // Run the node manger rpc server. node_manager_server_.RegisterService(node_manager_service_); node_manager_server_.RegisterService(agent_manager_service_); + if (RayConfig::instance().use_ray_syncer()) { + node_manager_server_.RegisterService(ray_syncer_service_); + } node_manager_server_.Run(); worker_pool_.SetNodeManagerPort(GetServerPort()); @@ -541,6 +547,35 @@ ray::Status NodeManager::RegisterGcs() { "NodeManager.deadline_timer.print_event_loop_stats"); } + if (RayConfig::instance().use_ray_syncer()) { + // Register resource manager and scheduler + ray_syncer_.Register( + /* message_type */ syncer::MessageType::RESOURCE_VIEW, + /* reporter */ &cluster_resource_scheduler_->GetLocalResourceManager(), + /* receiver */ this, + /* pull_from_reporter_interval_ms */ + RayConfig::instance().raylet_report_resources_period_milliseconds()); + + // Register a commands channel. 
+ // It's only used for GC right now. + ray_syncer_.Register( + /* message_type */ syncer::MessageType::COMMANDS, + /* reporter */ this, + /* receiver */ this, + /* pull_from_reporter_interval_ms */ 0); + + periodical_runner_.RunFnPeriodically( + [this] { + auto triggered_by_global_gc = TryLocalGC(); + // If plasma store is under high pressure, we should try to schedule a global + // gc. + if (triggered_by_global_gc) { + ray_syncer_.OnDemandBroadcasting(syncer::MessageType::COMMANDS); + } + }, + RayConfig::instance().raylet_check_gc_period_milliseconds(), + "NodeManager.CheckGC"); + } return ray::Status::OK(); } @@ -618,30 +653,8 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) { if (RayConfig::instance().gcs_actor_scheduling_enabled()) { FillNormalTaskResourceUsage(resources_data); } - // If plasma store is under high pressure, we should try to schedule a global gc. - bool plasma_high_pressure = - object_manager_.GetUsedMemoryPercentage() > high_plasma_storage_usage_; - if (plasma_high_pressure && global_gc_throttler_.AbleToRun()) { - TriggerGlobalGC(); - } - // Set the global gc bit on the outgoing heartbeat message. - bool triggered_by_global_gc = false; - if (should_global_gc_) { - resources_data.set_should_global_gc(true); - triggered_by_global_gc = true; - should_global_gc_ = false; - global_gc_throttler_.RunNow(); - } - - // Trigger local GC if needed. This throttles the frequency of local GC calls - // to at most once per heartbeat interval. 
- if ((should_local_gc_ || - (absl::GetCurrentTimeNanos() - local_gc_run_time_ns_ > local_gc_interval_ns_)) && - local_gc_throttler_.AbleToRun()) { - DoLocalGC(triggered_by_global_gc); - should_local_gc_ = false; - } + resources_data.set_should_global_gc(TryLocalGC()); } void NodeManager::DoLocalGC(bool triggered_by_global_gc) { @@ -1799,6 +1812,10 @@ void NodeManager::HandleCommitBundleResources( RAY_LOG(DEBUG) << "Request to commit resources for bundles: " << GetDebugStringForBundles(bundle_specs); placement_group_resource_manager_->CommitBundles(bundle_specs); + if (RayConfig::instance().use_ray_syncer()) { + // To reduce the lag, we trigger a broadcasting immediately. + RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW)); + } send_reply_callback(Status::OK(), nullptr, nullptr); cluster_task_manager_->ScheduleAndDispatchTasks(); @@ -1836,6 +1853,10 @@ void NodeManager::HandleCancelResourceReserve( // Return bundle resources. placement_group_resource_manager_->ReturnBundle(bundle_spec); + if (RayConfig::instance().use_ray_syncer()) { + // To reduce the lag, we trigger a broadcasting immediately. + RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW)); + } cluster_task_manager_->ScheduleAndDispatchTasks(); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -2615,6 +2636,33 @@ void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request, TriggerGlobalGC(); } +bool NodeManager::TryLocalGC() { + // If plasma store is under high pressure, we should try to schedule a global gc. + bool plasma_high_pressure = + object_manager_.GetUsedMemoryPercentage() > high_plasma_storage_usage_; + if (plasma_high_pressure && global_gc_throttler_.AbleToRun()) { + TriggerGlobalGC(); + } + + // Set the global gc bit on the outgoing heartbeat message. 
+ bool triggered_by_global_gc = false; + if (should_global_gc_) { + triggered_by_global_gc = true; + should_global_gc_ = false; + global_gc_throttler_.RunNow(); + } + + // Trigger local GC if needed. This throttles the frequency of local GC calls + // to at most once per heartbeat interval. + if ((should_local_gc_ || + (absl::GetCurrentTimeNanos() - local_gc_run_time_ns_ > local_gc_interval_ns_)) && + local_gc_throttler_.AbleToRun()) { + DoLocalGC(triggered_by_global_gc); + should_local_gc_ = false; + } + return triggered_by_global_gc; +} + void NodeManager::TriggerGlobalGC() { should_global_gc_ = true; // We won't see our own request, so trigger local GC in the next heartbeat. @@ -2644,6 +2692,38 @@ void NodeManager::RecordMetrics() { object_directory_->RecordMetrics(duration_ms); } +void NodeManager::ConsumeSyncMessage( + std::shared_ptr message) { + if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) { + rpc::ResourcesData data; + data.ParseFromString(message->sync_message()); + NodeID node_id = NodeID::FromBinary(data.node_id()); + UpdateResourceUsage(node_id, data); + } else if (message->message_type() == syncer::MessageType::COMMANDS) { + rpc::ResourcesData data; + data.ParseFromString(message->sync_message()); + if (data.should_global_gc()) { + should_local_gc_ = true; + } + } +} + +std::optional NodeManager::CreateSyncMessage( + int64_t after_version, syncer::MessageType message_type) const { + RAY_CHECK(message_type == syncer::MessageType::COMMANDS); + + rpc::ResourcesData resources_data; + resources_data.set_should_global_gc(true); + syncer::RaySyncMessage msg; + msg.set_version(absl::GetCurrentTimeNanos()); + msg.set_node_id(self_node_id_.Binary()); + msg.set_message_type(syncer::MessageType::COMMANDS); + std::string serialized_msg; + RAY_CHECK(resources_data.SerializeToString(&serialized_msg)); + msg.set_sync_message(std::move(serialized_msg)); + return std::make_optional(std::move(msg)); +} + void 
NodeManager::PublishInfeasibleTaskError(const RayTask &task) const { bool suppress_warning = false; diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 8e88ad798..c7b0391cd 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -21,6 +21,7 @@ #include "ray/common/id.h" #include "ray/common/task/task.h" #include "ray/common/ray_object.h" +#include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/client_connection.h" #include "ray/common/task/task_common.h" #include "ray/common/task/scheduling_resources.h" @@ -138,7 +139,9 @@ class HeartbeatSender { uint64_t last_heartbeat_at_ms_; }; -class NodeManager : public rpc::NodeManagerServiceHandler { +class NodeManager : public rpc::NodeManagerServiceHandler, + public syncer::ReporterInterface, + public syncer::ReceiverInterface { public: /// Create a node manager. /// @@ -188,6 +191,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Get the port of the node manager rpc server. int GetServerPort() const { return node_manager_server_.GetPort(); } + void ConsumeSyncMessage(std::shared_ptr message) override; + + std::optional CreateSyncMessage( + int64_t after_version, syncer::MessageType message_type) const override; + int GetObjectManagerPort() const { return object_manager_.GetServerPort(); } LocalObjectManager &GetLocalObjectManager() { return local_object_manager_; } @@ -647,6 +655,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT, const rpc::RayException *creation_task_exception = nullptr); + bool TryLocalGC(); + /// ID of this node. NodeID self_node_id_; /// The user-given identifier or name of this node. @@ -797,6 +807,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Whether or not if the node draining process has already received. 
bool is_node_drained_ = false; + + /// Ray syncer for synchronization + syncer::RaySyncer ray_syncer_; + + /// RaySyncerService for gRPC + syncer::RaySyncerService ray_syncer_service_; }; } // namespace raylet diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index 64f1af181..a74d7dbd5 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -306,10 +306,13 @@ void LocalResourceManager::UpdateAvailableObjectStoreMemResource() { RAY_CHECK_EQ(total_instances.size(), 1u); const double used = get_used_object_store_memory_(); const double total = total_instances[0].Double(); - local_resources_.available.Set(ResourceID::ObjectStoreMemory(), - {FixedPoint(total >= used ? total - used : 0.0)}); - - OnResourceChanged(); + auto new_available = + std::vector{FixedPoint(total >= used ? total - used : 0.0)}; + if (new_available != local_resources_.available.Get(ResourceID::ObjectStoreMemory())) { + local_resources_.available.Set(ResourceID::ObjectStoreMemory(), + std::move(new_available)); + OnResourceChanged(); + } } void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data) { @@ -322,6 +325,7 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data) NodeResources node_resources = ResourceMapToNodeResources({{}}, {{}}); last_report_resources_.reset(new NodeResources(node_resources)); } + for (auto entry : resources.total.ToMap()) { auto resource_id = entry.first; auto label = ResourceID(resource_id).Binary(); @@ -361,6 +365,53 @@ double LocalResourceManager::GetLocalAvailableCpus() const { return local_resources_.available.Sum(ResourceID::CPU()).Double(); } +std::optional LocalResourceManager::CreateSyncMessage( + int64_t after_version, syncer::MessageType message_type) const { + RAY_CHECK(message_type == syncer::MessageType::RESOURCE_VIEW); + // We check the memory inside version, so version 
is not a const function. + // Ideally, we need to move the memory check somewhere else. + // TODO(iycheng): Make version as a const function. + const_cast(this)->UpdateAvailableObjectStoreMemResource(); + + if (version_ <= after_version) { + return std::nullopt; + } + + syncer::RaySyncMessage msg; + rpc::ResourcesData resources_data; + + resources_data.set_node_id(local_node_id_.Binary()); + + NodeResources resources = ToNodeResources(local_resources_); + + for (auto entry : resources.total.ToMap()) { + auto resource_id = entry.first; + auto label = ResourceID(resource_id).Binary(); + auto total = entry.second; + auto available = resources.available.Get(resource_id); + + resources_data.set_resources_available_changed(true); + (*resources_data.mutable_resources_available())[label] = available.Double(); + (*resources_data.mutable_resources_total())[label] = total.Double(); + } + + if (get_pull_manager_at_capacity_ != nullptr) { + resources.object_pulls_queued = get_pull_manager_at_capacity_(); + resources_data.set_object_pulls_queued(resources.object_pulls_queued); + resources_data.set_resources_available_changed(true); + } + + resources_data.set_resources_available_changed(true); + + msg.set_node_id(local_node_id_.Binary()); + msg.set_version(version_); + msg.set_message_type(message_type); + std::string serialized_msg; + RAY_CHECK(resources_data.SerializeToString(&serialized_msg)); + msg.set_sync_message(std::move(serialized_msg)); + return std::make_optional(std::move(msg)); +} + ray::gcs::NodeResourceInfoAccessor::ResourceMap LocalResourceManager::GetResourceTotals( const absl::flat_hash_map &resource_map_filter) const { ray::gcs::NodeResourceInfoAccessor::ResourceMap map; @@ -380,6 +431,7 @@ ray::gcs::NodeResourceInfoAccessor::ResourceMap LocalResourceManager::GetResourc } void LocalResourceManager::OnResourceChanged() { + ++version_; if (resource_change_subscriber_ == nullptr) { return; } diff --git a/src/ray/raylet/scheduling/local_resource_manager.h 
b/src/ray/raylet/scheduling/local_resource_manager.h index 07cc9270f..f857a8731 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -22,6 +22,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/task/scheduling_resources.h" #include "ray/gcs/gcs_client/accessor.h" #include "ray/gcs/gcs_client/gcs_client.h" @@ -37,7 +38,7 @@ namespace ray { /// it also supports creating a new resource or delete an existing resource. /// Whenever the resouce changes, it notifies the subscriber of the change. /// This class is not thread safe. -class LocalResourceManager { +class LocalResourceManager : public syncer::ReporterInterface { public: LocalResourceManager( scheduling::NodeID local_node_id, @@ -145,6 +146,9 @@ class LocalResourceManager { /// \return true, if exist. otherwise, false. bool ResourcesExist(scheduling::ResourceID resource_id) const; + std::optional CreateSyncMessage( + int64_t after_version, syncer::MessageType message_type) const override; + private: /// Notify the subscriber that the local resouces has changed. void OnResourceChanged(); @@ -251,6 +255,9 @@ class LocalResourceManager { // Specify custom resources that consists of unit-size instances. std::unordered_set custom_unit_instance_resources_{}; + // Version of this resource. It will be incremented by one whenever the state changes. 
+ int64_t version_ = 0; + FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest); FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest); FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstancesTest); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 5211e79aa..eb71b2fed 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -346,6 +346,10 @@ void RayletClient::RequestObjectSpillage( grpc_client_->RequestObjectSpillage(request, callback); } +std::shared_ptr RayletClient::GetChannel() const { + return grpc_client_->Channel(); +} + void RayletClient::ReportWorkerBacklog( const WorkerID &worker_id, const std::vector &backlog_reports) { diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index f0ab5ce7c..8063ed367 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -191,6 +191,8 @@ class RayletClientInterface : public PinObjectsInterface, const NodeID &node_id, bool graceful, const rpc::ClientCallback &callback) = 0; + + virtual std::shared_ptr GetChannel() const = 0; }; namespace raylet { @@ -433,6 +435,8 @@ class RayletClient : public RayletClientInterface { const ObjectID &object_id, const rpc::ClientCallback &callback); + std::shared_ptr GetChannel() const override; + /// Implements WorkerLeaseInterface. 
void RequestWorkerLease( const rpc::TaskSpec &resource_spec, diff --git a/src/ray/rpc/grpc_client.h b/src/ray/rpc/grpc_client.h index 0bc48fca8..301392799 100644 --- a/src/ray/rpc/grpc_client.h +++ b/src/ray/rpc/grpc_client.h @@ -88,7 +88,8 @@ class GrpcClient { ClientCallManager &call_manager, bool use_tls = false) : client_call_manager_(call_manager), use_tls_(use_tls) { - stub_ = GrpcService::NewStub(std::move(channel)); + channel_ = std::move(channel); + stub_ = GrpcService::NewStub(channel_); } GrpcClient(const std::string &address, @@ -97,8 +98,8 @@ class GrpcClient { bool use_tls = false) : client_call_manager_(call_manager), use_tls_(use_tls) { std::shared_ptr channel = BuildChannel(address, port); - - stub_ = GrpcService::NewStub(std::move(channel)); + channel_ = BuildChannel(address, port); + stub_ = GrpcService::NewStub(channel_); } GrpcClient(const std::string &address, @@ -116,9 +117,8 @@ class GrpcClient { argument.SetMaxSendMessageSize(::RayConfig::instance().max_grpc_message_size()); argument.SetMaxReceiveMessageSize(::RayConfig::instance().max_grpc_message_size()); - std::shared_ptr channel = BuildChannel(address, port, argument); - - stub_ = GrpcService::NewStub(std::move(channel)); + channel_ = BuildChannel(address, port, argument); + stub_ = GrpcService::NewStub(channel_); } /// Create a new `ClientCall` and send request. @@ -152,12 +152,16 @@ class GrpcClient { RAY_CHECK(call != nullptr); } + std::shared_ptr Channel() const { return channel_; } + private: ClientCallManager &client_call_manager_; /// The gRPC-generated stub. std::unique_ptr stub_; /// Whether to use TLS. bool use_tls_; + /// The channel of the stub. 
+ std::shared_ptr channel_; }; } // namespace rpc diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 820612f98..f3839f505 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -127,6 +127,10 @@ void GrpcServer::Run() { is_closed_ = false; } +void GrpcServer::RegisterService(grpc::Service &service) { + services_.emplace_back(service); +} + void GrpcServer::RegisterService(GrpcService &service) { services_.emplace_back(service.GetGrpcService()); diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index c03d87cda..b8dfdb064 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -107,6 +107,7 @@ class GrpcServer { /// /// \param[in] service A `GrpcService` to register to this server. void RegisterService(GrpcService &service); + void RegisterService(grpc::Service &service); protected: /// This function runs in a background thread. It keeps polling events from the diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index ecda8fd2f..12d25b63d 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -53,6 +53,8 @@ class NodeManagerClient { GetNodeStats(request, callback); } + std::shared_ptr Channel() const { return grpc_client_->Channel(); } + private: /// The RPC client. std::unique_ptr> grpc_client_; @@ -75,6 +77,8 @@ class NodeManagerWorkerClient return std::shared_ptr(instance); } + std::shared_ptr Channel() const { return grpc_client_->Channel(); } + /// Update cluster resource usage. VOID_RPC_CLIENT_METHOD(NodeManagerService, UpdateResourceUsage,