[scheduler][6] Integrate ray with syncer. (#23660)

The new syncer comes with the feature of long-polling and versioning. This PR integrates it with ray.
This commit is contained in:
Yi Cheng 2022-05-10 13:12:22 -07:00 committed by GitHub
parent 04e16f70a3
commit 6c60dbb242
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 493 additions and 172 deletions

View file

@ -301,7 +301,6 @@
--build_tests_only
--test_tag_filters=release_unit
release/...
- label: ":python: (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
@ -354,6 +353,38 @@
--test_env=CONDA_DEFAULT_ENV
python/ray/tests/...
- label: ":construction: :python: (syncer) (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=client_tests,small_size_python_tests
--test_env=RAY_use_ray_syncer=true
-- python/ray/tests/...
- label: ":construction: :python: (syncer) (Large)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
parallelism: 3
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- RAY_use_ray_syncer=true . ./ci/ci.sh test_large
- label: ":construction: :python: (syncer) (Medium A-J)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j
--test_env=RAY_use_ray_syncer=true
python/ray/tests/...
- label: ":construction: :python: (syncer) (Medium K-Z)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z
--test_env=RAY_use_ray_syncer=true
python/ray/tests/...
# https://github.com/ray-project/ray/issues/22460
#- label: ":python: (Privileged test)"
#conditions: ["RAY_CI_PYTHON_AFFECTED"]

View file

@ -66,3 +66,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_bool gcs_grpc_based_pubsub() const
c_bool start_python_importer_thread() const
c_bool use_ray_syncer() const

View file

@ -107,3 +107,7 @@ cdef class Config:
@staticmethod
def start_python_importer_thread():
return RayConfig.instance().start_python_importer_thread()
@staticmethod
def use_ray_syncer():
return RayConfig.instance().use_ray_syncer()

View file

@ -44,7 +44,6 @@ _METRICS = [
"ray_internal_num_spilled_tasks",
# "ray_unintentional_worker_failures_total",
# "ray_node_failure_total",
"ray_outbound_heartbeat_size_kb_sum",
"ray_operation_count",
"ray_operation_run_time_ms",
"ray_operation_queue_time_ms",
@ -75,6 +74,9 @@ _METRICS = [
"ray_gcs_actors_count",
]
if not ray._raylet.Config.use_ray_syncer():
_METRICS.append("ray_outbound_heartbeat_size_kb_sum")
# This list of metrics should be kept in sync with
# ray/python/ray/autoscaler/_private/prom_metrics.py
_AUTOSCALER_METRICS = [

View file

@ -19,7 +19,7 @@ class MockReporterInterface : public ReporterInterface {
public:
MOCK_METHOD(std::optional<RaySyncMessage>,
CreateSyncMessage,
(int64_t current_version, RayComponentId component_id),
(int64_t current_version, MessageType message_type),
(const, override));
};

View file

@ -132,6 +132,7 @@ class MockRayletClientInterface : public RayletClientInterface {
WaitForDirectActorCallArgs,
(const std::vector<rpc::ObjectReference> &references, int64_t tag),
(override));
MOCK_METHOD(std::shared_ptr<grpc::Channel>, GetChannel, (), (const));
MOCK_METHOD(void,
ReportWorkerBacklog,
(const WorkerID &worker_id,

View file

@ -64,6 +64,10 @@ RAY_CONFIG(uint64_t, num_heartbeats_warning, 5)
/// The duration between reporting resources sent by the raylets.
RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)
/// The duration between raylet check memory pressure and send gc request
RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100)
/// For a raylet, if the last resource report was sent more than this many
/// report periods ago, then a warning will be logged that the report
/// handler is drifting.
@ -343,6 +347,9 @@ RAY_CONFIG(int32_t, gcs_rpc_server_reconnect_timeout_s, 60)
/// Minimum interval between reconnecting gcs rpc server when gcs server restarts.
RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000)
/// Feature flag to use the ray syncer for resource synchronization
RAY_CONFIG(bool, use_ray_syncer, false)
/// The interval at which the gcs client will check if the address of gcs service has
/// changed. When the address changed, we will resubscribe again.
RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000)

View file

@ -25,7 +25,7 @@ class NodeState {
/// Set the local component.
///
/// \param cid The component id.
/// \param message_type The type of the message for this component.
/// \param reporter The reporter is defined to be the local module which wants to
/// broadcast its internal status to the whole clsuter. When it's null, it means there
/// is no reporter in the local node for this component. This is the place there
@ -36,16 +36,16 @@ class NodeState {
/// received messages are consumed.
///
/// \return true if set successfully.
bool SetComponent(RayComponentId cid,
bool SetComponent(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver);
/// Get the snapshot of a component for a newer version.
///
/// \param cid The component id to take the snapshot.
/// \param message_type The component to take the snapshot.
///
/// \return If a snapshot is taken, return the message, otherwise std::nullopt.
std::optional<RaySyncMessage> CreateSyncMessage(RayComponentId cid);
std::optional<RaySyncMessage> CreateSyncMessage(MessageType message_type);
/// Consume a message. Receiver will consume this message if it doesn't have
/// this message.
@ -127,7 +127,7 @@ class NodeSyncConnection {
std::function<void(std::shared_ptr<RaySyncMessage>)> message_processor_;
/// Buffering all the updates. Sending will be done in an async way.
absl::flat_hash_map<std::pair<std::string, RayComponentId>,
absl::flat_hash_map<std::pair<std::string, MessageType>,
std::shared_ptr<const RaySyncMessage>>
sending_buffer_;

View file

@ -22,30 +22,30 @@ namespace syncer {
NodeState::NodeState() { sync_message_versions_taken_.fill(-1); }
bool NodeState::SetComponent(RayComponentId cid,
bool NodeState::SetComponent(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver) {
if (cid < static_cast<RayComponentId>(kComponentArraySize) &&
reporters_[cid] == nullptr && receivers_[cid] == nullptr) {
reporters_[cid] = reporter;
receivers_[cid] = receiver;
if (message_type < static_cast<MessageType>(kComponentArraySize) &&
reporters_[message_type] == nullptr && receivers_[message_type] == nullptr) {
reporters_[message_type] = reporter;
receivers_[message_type] = receiver;
return true;
} else {
RAY_LOG(FATAL) << "Fail to set components, component_id:" << cid
RAY_LOG(FATAL) << "Fail to set components, message_type:" << message_type
<< ", reporter:" << reporter << ", receiver:" << receiver;
return false;
}
}
std::optional<RaySyncMessage> NodeState::CreateSyncMessage(RayComponentId cid) {
if (reporters_[cid] == nullptr) {
std::optional<RaySyncMessage> NodeState::CreateSyncMessage(MessageType message_type) {
if (reporters_[message_type] == nullptr) {
return std::nullopt;
}
auto message =
reporters_[cid]->CreateSyncMessage(sync_message_versions_taken_[cid], cid);
auto message = reporters_[message_type]->CreateSyncMessage(
sync_message_versions_taken_[message_type], message_type);
if (message != std::nullopt) {
sync_message_versions_taken_[cid] = message->version();
RAY_LOG(DEBUG) << "Sync message taken: cid:" << cid
sync_message_versions_taken_[message_type] = message->version();
RAY_LOG(DEBUG) << "Sync message taken: message_type:" << message_type
<< ", version:" << message->version()
<< ", node:" << NodeID::FromBinary(message->node_id());
}
@ -53,7 +53,7 @@ std::optional<RaySyncMessage> NodeState::CreateSyncMessage(RayComponentId cid) {
}
bool NodeState::ConsumeSyncMessage(std::shared_ptr<const RaySyncMessage> message) {
auto &current = cluster_view_[message->node_id()][message->component_id()];
auto &current = cluster_view_[message->node_id()][message->message_type()];
RAY_LOG(DEBUG) << "ConsumeSyncMessage: " << (current ? current->version() : -1)
<< " message_version: " << message->version()
@ -64,7 +64,7 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr<const RaySyncMessage> message
}
current = message;
auto receiver = receivers_[message->component_id()];
auto receiver = receivers_[message->message_type()];
if (receiver != nullptr) {
receiver->ConsumeSyncMessage(message);
}
@ -83,11 +83,11 @@ void NodeSyncConnection::ReceiveUpdate(RaySyncMessages messages) {
for (auto &message : *messages.mutable_sync_messages()) {
auto &node_versions = GetNodeComponentVersions(message.node_id());
RAY_LOG(DEBUG) << "Receive update: "
<< " component_id=" << message.component_id()
<< " message_type=" << message.message_type()
<< ", message_version=" << message.version()
<< ", local_message_version=" << node_versions[message.component_id()];
if (node_versions[message.component_id()] < message.version()) {
node_versions[message.component_id()] = message.version();
<< ", local_message_version=" << node_versions[message.message_type()];
if (node_versions[message.message_type()] < message.version()) {
node_versions[message.message_type()] = message.version();
message_processor_(std::make_shared<RaySyncMessage>(std::move(message)));
}
}
@ -104,9 +104,9 @@ bool NodeSyncConnection::PushToSendingQueue(
}
auto &node_versions = GetNodeComponentVersions(message->node_id());
if (node_versions[message->component_id()] < message->version()) {
node_versions[message->component_id()] = message->version();
sending_buffer_[std::make_pair(message->node_id(), message->component_id())] =
if (node_versions[message->message_type()] < message->version()) {
node_versions[message->message_type()] = message->version();
sending_buffer_[std::make_pair(message->node_id(), message->message_type())] =
message;
return true;
}
@ -323,30 +323,30 @@ void RaySyncer::Disconnect(const std::string &node_id) {
"RaySyncerDisconnect");
}
bool RaySyncer::Register(RayComponentId component_id,
bool RaySyncer::Register(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver,
int64_t pull_from_reporter_interval_ms) {
if (!node_state_->SetComponent(component_id, reporter, receiver)) {
if (!node_state_->SetComponent(message_type, reporter, receiver)) {
return false;
}
// Set job to pull from reporter periodically
if (reporter != nullptr && pull_from_reporter_interval_ms > 0) {
timer_.RunFnPeriodically(
[this, component_id]() { OnDemandBroadcasting(component_id); },
[this, message_type]() { OnDemandBroadcasting(message_type); },
pull_from_reporter_interval_ms);
}
RAY_LOG(DEBUG) << "Registered components: "
<< "component_id:" << component_id << ", reporter:" << reporter
<< "message_type:" << message_type << ", reporter:" << reporter
<< ", receiver:" << receiver
<< ", pull_from_reporter_interval_ms:" << pull_from_reporter_interval_ms;
return true;
}
bool RaySyncer::OnDemandBroadcasting(RayComponentId component_id) {
auto msg = node_state_->CreateSyncMessage(component_id);
bool RaySyncer::OnDemandBroadcasting(MessageType message_type) {
auto msg = node_state_->CreateSyncMessage(message_type);
if (msg) {
RAY_CHECK(msg->node_id() == GetLocalNodeID());
BroadcastMessage(std::make_shared<RaySyncMessage>(std::move(*msg)));

View file

@ -29,14 +29,14 @@ namespace syncer {
using ray::rpc::syncer::DummyRequest;
using ray::rpc::syncer::DummyResponse;
using ray::rpc::syncer::RayComponentId;
using ray::rpc::syncer::MessageType;
using ray::rpc::syncer::RaySyncMessage;
using ray::rpc::syncer::RaySyncMessages;
using ray::rpc::syncer::StartSyncRequest;
using ray::rpc::syncer::StartSyncResponse;
static constexpr size_t kComponentArraySize =
static_cast<size_t>(ray::rpc::syncer::RayComponentId_ARRAYSIZE);
static_cast<size_t>(ray::rpc::syncer::MessageType_ARRAYSIZE);
/// The interface for a reporter. Reporter is defined to be a local module which would
/// like to let the other nodes know its state. For example, local cluster resource
@ -49,13 +49,13 @@ struct ReporterInterface {
///
/// \param version_after Request message with version after `version_after`. If the
/// reporter doesn't have the qualified one, just return std::nullopt
/// \param component_id The component id asked for.
/// \param message_type The message type asked for.
///
/// \return std::nullopt if the reporter doesn't have such component or the current
/// snapshot of the component is not newer the asked one. Otherwise, return the
/// actual message.
virtual std::optional<RaySyncMessage> CreateSyncMessage(
int64_t version_after, RayComponentId component_id) const = 0;
int64_t version_after, MessageType message_type) const = 0;
virtual ~ReporterInterface() {}
};
@ -114,14 +114,14 @@ class RaySyncer {
/// it'll have a global view of the cluster.
///
///
/// \param component_id The component to sync.
/// \param message_type The message type of the component.
/// \param reporter The local component to be broadcasted.
/// \param receiver The consumer of the sync message sent by the other nodes in the
/// cluster.
/// \param pull_from_reporter_interval_ms The frequence to pull a message. 0 means
/// never pull a message in syncer.
/// from reporter and push it to sending queue.
bool Register(RayComponentId component_id,
bool Register(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver,
int64_t pull_from_reporter_interval_ms = 100);
@ -132,10 +132,10 @@ class RaySyncer {
/// Request trigger a broadcasting for a specific component immediately instead of
/// waiting for ray syncer to poll the message.
///
/// \param component_id The component to check.
/// \param message_type The component to check.
/// \return true if a message is generated. If the component doesn't have a new
/// version of message, false will be returned.
bool OnDemandBroadcasting(RayComponentId component_id);
bool OnDemandBroadcasting(MessageType message_type);
private:
/// Get the io_context used by RaySyncer.

View file

@ -44,10 +44,10 @@ namespace syncer {
constexpr size_t kTestComponents = 1;
RaySyncMessage MakeMessage(RayComponentId cid, int64_t version, const NodeID &id) {
RaySyncMessage MakeMessage(MessageType cid, int64_t version, const NodeID &id) {
auto msg = RaySyncMessage();
msg.set_version(version);
msg.set_component_id(cid);
msg.set_message_type(cid);
msg.set_node_id(id.Binary());
return msg;
}
@ -66,7 +66,7 @@ class RaySyncerTest : public ::testing::Test {
return std::nullopt;
} else {
auto msg = RaySyncMessage();
msg.set_component_id(static_cast<RayComponentId>(cid));
msg.set_message_type(static_cast<MessageType>(cid));
msg.set_version(++local_versions_[cid]);
return std::make_optional(std::move(msg));
}
@ -82,15 +82,15 @@ class RaySyncerTest : public ::testing::Test {
syncer_ = std::make_unique<RaySyncer>(io_context_, local_id_.Binary());
}
MockReporterInterface *GetReporter(RayComponentId cid) {
MockReporterInterface *GetReporter(MessageType cid) {
return reporters_[static_cast<size_t>(cid)].get();
}
MockReceiverInterface *GetReceiver(RayComponentId cid) {
MockReceiverInterface *GetReceiver(MessageType cid) {
return receivers_[static_cast<size_t>(cid)].get();
}
int64_t &LocalVersion(RayComponentId cid) {
int64_t &LocalVersion(MessageType cid) {
return local_versions_[static_cast<size_t>(cid)];
}
@ -114,32 +114,29 @@ class RaySyncerTest : public ::testing::Test {
TEST_F(RaySyncerTest, NodeStateCreateSyncMessage) {
auto node_status = std::make_unique<NodeState>();
node_status->SetComponent(RayComponentId::RESOURCE_MANAGER, nullptr, nullptr);
ASSERT_EQ(std::nullopt,
node_status->CreateSyncMessage(RayComponentId::RESOURCE_MANAGER));
node_status->SetComponent(MessageType::RESOURCE_VIEW, nullptr, nullptr);
ASSERT_EQ(std::nullopt, node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW));
auto reporter = std::make_unique<MockReporterInterface>();
ASSERT_TRUE(node_status->SetComponent(RayComponentId::RESOURCE_MANAGER,
GetReporter(RayComponentId::RESOURCE_MANAGER),
nullptr));
ASSERT_TRUE(node_status->SetComponent(
MessageType::RESOURCE_VIEW, GetReporter(MessageType::RESOURCE_VIEW), nullptr));
// Take a snapshot
auto msg = node_status->CreateSyncMessage(RayComponentId::RESOURCE_MANAGER);
ASSERT_EQ(LocalVersion(RayComponentId::RESOURCE_MANAGER), msg->version());
auto msg = node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW);
ASSERT_EQ(LocalVersion(MessageType::RESOURCE_VIEW), msg->version());
// Revert one version back.
LocalVersion(RayComponentId::RESOURCE_MANAGER) -= 1;
msg = node_status->CreateSyncMessage(RayComponentId::RESOURCE_MANAGER);
LocalVersion(MessageType::RESOURCE_VIEW) -= 1;
msg = node_status->CreateSyncMessage(MessageType::RESOURCE_VIEW);
ASSERT_EQ(std::nullopt, msg);
}
TEST_F(RaySyncerTest, NodeStateConsume) {
auto node_status = std::make_unique<NodeState>();
node_status->SetComponent(RayComponentId::RESOURCE_MANAGER,
nullptr,
GetReceiver(RayComponentId::RESOURCE_MANAGER));
node_status->SetComponent(
MessageType::RESOURCE_VIEW, nullptr, GetReceiver(MessageType::RESOURCE_VIEW));
auto from_node_id = NodeID::FromRandom();
// The first time receiver the message
auto msg = MakeMessage(RayComponentId::RESOURCE_MANAGER, 0, from_node_id);
auto msg = MakeMessage(MessageType::RESOURCE_VIEW, 0, from_node_id);
ASSERT_TRUE(node_status->ConsumeSyncMessage(std::make_shared<RaySyncMessage>(msg)));
ASSERT_FALSE(node_status->ConsumeSyncMessage(std::make_shared<RaySyncMessage>(msg)));
@ -156,7 +153,7 @@ TEST_F(RaySyncerTest, NodeSyncConnection) {
node_id.Binary(),
[](std::shared_ptr<ray::rpc::syncer::RaySyncMessage>) {});
auto from_node_id = NodeID::FromRandom();
auto msg = MakeMessage(RayComponentId::RESOURCE_MANAGER, 0, from_node_id);
auto msg = MakeMessage(MessageType::RESOURCE_VIEW, 0, from_node_id);
// First push will succeed and the second one will be deduplicated.
ASSERT_TRUE(sync_connection.PushToSendingQueue(std::make_shared<RaySyncMessage>(msg)));
@ -164,9 +161,9 @@ TEST_F(RaySyncerTest, NodeSyncConnection) {
ASSERT_EQ(1, sync_connection.sending_buffer_.size());
ASSERT_EQ(0, sync_connection.sending_buffer_.begin()->second->version());
ASSERT_EQ(1, sync_connection.node_versions_.size());
ASSERT_EQ(0,
sync_connection
.node_versions_[from_node_id.Binary()][RayComponentId::RESOURCE_MANAGER]);
ASSERT_EQ(
0,
sync_connection.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_VIEW]);
msg.set_version(2);
ASSERT_TRUE(sync_connection.PushToSendingQueue(std::make_shared<RaySyncMessage>(msg)));
@ -175,9 +172,9 @@ TEST_F(RaySyncerTest, NodeSyncConnection) {
ASSERT_EQ(1, sync_connection.sending_buffer_.size());
ASSERT_EQ(1, sync_connection.node_versions_.size());
ASSERT_EQ(2, sync_connection.sending_buffer_.begin()->second->version());
ASSERT_EQ(2,
sync_connection
.node_versions_[from_node_id.Binary()][RayComponentId::RESOURCE_MANAGER]);
ASSERT_EQ(
2,
sync_connection.node_versions_[from_node_id.Binary()][MessageType::RESOURCE_VIEW]);
}
struct SyncerServerTest {
@ -208,7 +205,7 @@ struct SyncerServerTest {
iter = received_versions.find(message->node_id());
}
received_versions[message->node_id()][message->component_id()] =
received_versions[message->node_id()][message->message_type()] =
message->version();
message_consumed[message->node_id()]++;
};
@ -222,7 +219,7 @@ struct SyncerServerTest {
return std::nullopt;
} else {
auto msg = RaySyncMessage();
msg.set_component_id(static_cast<RayComponentId>(cid));
msg.set_message_type(static_cast<MessageType>(cid));
msg.set_version(local_versions[cid]);
msg.set_node_id(syncer->GetLocalNodeID());
snapshot_taken++;
@ -233,7 +230,7 @@ struct SyncerServerTest {
EXPECT_CALL(*reporter, CreateSyncMessage(_, Eq(cid)))
.WillRepeatedly(WithArg<0>(Invoke(take_snapshot)));
syncer->Register(
static_cast<RayComponentId>(cid), reporter.get(), receivers[cid].get());
static_cast<MessageType>(cid), reporter.get(), receivers[cid].get());
}
thread = std::make_unique<std::thread>([this] {
boost::asio::io_context::work work(io_context);
@ -376,10 +373,10 @@ TEST(SyncerTest, Test1To1) {
auto s2 = SyncerServerTest("19991");
// Make sure the setup is correct
ASSERT_NE(nullptr, s1.receivers[RayComponentId::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s2.receivers[RayComponentId::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s1.reporters[RayComponentId::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s2.reporters[RayComponentId::RESOURCE_MANAGER]);
ASSERT_NE(nullptr, s1.receivers[MessageType::RESOURCE_VIEW]);
ASSERT_NE(nullptr, s2.receivers[MessageType::RESOURCE_VIEW]);
ASSERT_NE(nullptr, s1.reporters[MessageType::RESOURCE_VIEW]);
ASSERT_NE(nullptr, s2.reporters[MessageType::RESOURCE_VIEW]);
auto channel_to_s2 = MakeChannel("19991");
@ -420,7 +417,7 @@ TEST(SyncerTest, Test1To1) {
// Make sure s2 send the new message to s1.
ASSERT_TRUE(s1.WaitUntil(
[&s1, node_id = s2.syncer->GetLocalNodeID()]() {
return s1.GetReceivedVersions(node_id)[RayComponentId::RESOURCE_MANAGER] == 1 &&
return s1.GetReceivedVersions(node_id)[MessageType::RESOURCE_VIEW] == 1 &&
s1.GetNumConsumedMessages(node_id) == 2;
},
5));
@ -633,11 +630,11 @@ bool TestCorrectness(std::function<TClusterView(RaySyncer &syncer)> get_cluster_
for (size_t i = 0; i < 1000000; ++i) {
auto server_idx = choose_server(gen);
auto component_id = choose_component(gen);
auto message_type = choose_component(gen);
if (server_idx == 0) {
component_id = 0;
message_type = 0;
}
servers[server_idx]->local_versions[component_id]++;
servers[server_idx]->local_versions[message_type]++;
// expect to sleep for 100 times for the whole loop.
if (rand_sleep(gen) < 100) {
std::this_thread::sleep_for(100ms);

View file

@ -49,12 +49,12 @@ class LocalNode : public ReporterInterface {
}
std::optional<RaySyncMessage> CreateSyncMessage(int64_t current_version,
RayComponentId) const override {
MessageType) const override {
if (current_version > version_) {
return std::nullopt;
}
ray::rpc::syncer::RaySyncMessage msg;
msg.set_component_id(ray::rpc::syncer::RayComponentId::RESOURCE_MANAGER);
msg.set_message_type(ray::rpc::syncer::MessageType::RESOURCE_VIEW);
msg.set_version(version_);
msg.set_sync_message(
std::string(reinterpret_cast<const char *>(&state_), sizeof(state_)));
@ -103,9 +103,8 @@ int main(int argc, char *argv[]) {
std::unique_ptr<RaySyncerService> service;
std::unique_ptr<grpc::Server> server;
std::shared_ptr<grpc::Channel> channel;
syncer.Register(ray::rpc::syncer::RayComponentId::RESOURCE_MANAGER,
local_node.get(),
remote_node.get());
syncer.Register(
ray::rpc::syncer::MessageType::RESOURCE_VIEW, local_node.get(), remote_node.get());
if (server_port != ".") {
RAY_LOG(INFO) << "Start server on port " << server_port;
auto server_address = "0.0.0.0:" + server_port;

View file

@ -27,7 +27,7 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
GcsResourceManager &gcs_resource_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
gcs_syncer::RaySyncer &ray_syncer)
gcs_syncer::RaySyncer *ray_syncer)
: return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_node_manager_(gcs_node_manager),
@ -237,14 +237,18 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
RAY_LOG(DEBUG) << "Finished cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
std::vector<std::string> resource_names;
rpc::NodeResourceChange node_resource_change;
auto &resources = bundle_spec->GetFormattedResources();
for (const auto &iter : resources) {
resource_names.push_back(iter.first);
node_resource_change.add_deleted_resources(iter.first);
}
node_resource_change.set_node_id(node_id.Binary());
ray_syncer_.Update(std::move(node_resource_change));
if (ray_syncer_ != nullptr) {
rpc::NodeResourceChange node_resource_change;
for (const auto &iter : resources) {
node_resource_change.add_deleted_resources(iter.first);
}
node_resource_change.set_node_id(node_id.Binary());
ray_syncer_->Update(std::move(node_resource_change));
}
gcs_resource_manager_.DeleteResources(node_id, std::move(resource_names));
});
}
@ -298,12 +302,14 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
auto &resources = bundle->GetFormattedResources();
gcs_resource_manager_.UpdateResources(node_id, resources);
// Push the message to syncer so that it can be broadcasted to all other nodes
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(node_id.Binary());
node_resource_change.mutable_updated_resources()->insert(resources.begin(),
resources.end());
ray_syncer_.Update(std::move(node_resource_change));
if (ray_syncer_ != nullptr) {
// Push the message to syncer so that it can be broadcasted to all other nodes
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(node_id.Binary());
node_resource_change.mutable_updated_resources()->insert(resources.begin(),
resources.end());
ray_syncer_->Update(std::move(node_resource_change));
}
}
if (lease_status_tracker->AllCommitRequestReturned()) {
OnAllBundleCommitRequestReturned(

View file

@ -338,7 +338,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
GcsResourceManager &gcs_resource_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
gcs_syncer::RaySyncer &ray_syncer);
gcs_syncer::RaySyncer *ray_syncer);
virtual ~GcsPlacementGroupScheduler() = default;
@ -515,7 +515,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// The syncer of resource. This is used to report placement group updates.
/// TODO (iycheng): Remove this one from pg once we finish the refactor
gcs_syncer::RaySyncer &ray_syncer_;
gcs_syncer::RaySyncer *ray_syncer_;
};
} // namespace gcs

View file

@ -21,13 +21,34 @@ namespace ray {
namespace gcs {
GcsResourceManager::GcsResourceManager(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
ClusterResourceManager &cluster_resource_manager,
scheduling::NodeID local_node_id)
: gcs_table_storage_(gcs_table_storage),
: io_context_(io_context),
gcs_table_storage_(gcs_table_storage),
cluster_resource_manager_(cluster_resource_manager),
local_node_id_(local_node_id) {}
void GcsResourceManager::ConsumeSyncMessage(
std::shared_ptr<const syncer::RaySyncMessage> message) {
// ConsumeSyncMessage is called by ray_syncer which might not run
// in a dedicated thread for performance.
// GcsResourceManager is a module always run in the main thread, so we just
// delegate the work to the main thread for thread safety.
// Ideally, all public api in GcsResourceManager need to be put into this
// io context for thread safety.
io_context_.dispatch(
[this, message]() {
rpc::ResourcesData resources;
resources.ParseFromString(message->sync_message());
resources.set_node_id(message->node_id());
RAY_CHECK(message->message_type() == syncer::MessageType::RESOURCE_VIEW);
UpdateFromResourceReport(resources);
},
"GcsResourceManager::Update");
}
void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request,
rpc::GetResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {

View file

@ -16,6 +16,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/id.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet/scheduling/cluster_resource_data.h"
@ -44,18 +45,23 @@ namespace gcs {
/// It is responsible for handing node resource related rpc requests and it is used for
/// actor and placement group scheduling. It obtains the available resources of nodes
/// through heartbeat reporting. Non-thread safe.
class GcsResourceManager : public rpc::NodeResourceInfoHandler {
class GcsResourceManager : public rpc::NodeResourceInfoHandler,
public syncer::ReceiverInterface {
public:
/// Create a GcsResourceManager.
///
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsResourceManager(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
ClusterResourceManager &cluster_resource_manager,
scheduling::NodeID local_node_id_ = scheduling::NodeID::Nil());
virtual ~GcsResourceManager() {}
/// Handle the resource update.
void ConsumeSyncMessage(std::shared_ptr<const syncer::RaySyncMessage> message) override;
/// Handle get resource rpc request.
void HandleGetResources(const rpc::GetResourcesRequest &request,
rpc::GetResourcesReply *reply,
@ -140,6 +146,10 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
void UpdateResourceLoads(const rpc::ResourcesData &data);
private:
/// io context. This is to ensure thread safety. Ideally, all public
/// funciton needs to post job to this io_context.
instrumented_io_context &io_context_;
/// Newest resource usage of all nodes.
absl::flat_hash_map<NodeID, rpc::ResourcesData> node_resource_usages_;

View file

@ -49,6 +49,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
RayConfig::instance().gcs_server_rpc_client_thread_num()),
raylet_client_pool_(
std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
local_node_id_(NodeID::FromRandom()),
pubsub_periodical_runner_(pubsub_io_service_),
periodical_runner_(main_service),
is_started_(false),
@ -205,8 +206,12 @@ void GcsServer::Stop() {
// won't handle heartbeat calls anymore, some nodes will be marked as dead during this
// time, causing many nodes die after GCS's failure.
gcs_heartbeat_manager_->Stop();
ray_syncer_->Stop();
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_io_context_.stop();
ray_syncer_thread_->join();
} else {
gcs_ray_syncer_->Stop();
}
// Shutdown the rpc server
rpc_server_.Shutdown();
@ -251,6 +256,7 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && cluster_resource_scheduler_);
gcs_resource_manager_ = std::make_shared<GcsResourceManager>(
main_service_,
gcs_table_storage_,
cluster_resource_scheduler_->GetClusterResourceManager(),
scheduling::NodeID(local_node_id_.Binary()));
@ -298,7 +304,6 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitClusterResourceScheduler() {
local_node_id_ = NodeID::FromRandom();
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID(local_node_id_.Binary()),
NodeResources(),
@ -417,7 +422,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
*gcs_resource_manager_,
*cluster_resource_scheduler_,
raylet_client_pool_,
*ray_syncer_);
gcs_ray_syncer_.get());
gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_,
@ -468,15 +473,39 @@ void GcsServer::StoreGcsServerAddressInRedis() {
}
void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
/*
The current synchronization flow is:
raylet -> syncer::poller --> syncer::update -> gcs_resource_manager
gcs_placement_scheduler --/
*/
ray_syncer_ = std::make_unique<gcs_syncer::RaySyncer>(
main_service_, raylet_client_pool_, *gcs_resource_manager_);
ray_syncer_->Initialize(gcs_init_data);
ray_syncer_->Start();
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_ = std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_,
local_node_id_.Binary());
ray_syncer_->Register(
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_thread_ = std::make_unique<std::thread>([this]() {
boost::asio::io_service::work work(ray_syncer_io_context_);
ray_syncer_io_context_.run();
});
for (const auto &pair : gcs_init_data.Nodes()) {
if (pair.second.state() ==
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE) {
rpc::Address address;
address.set_raylet_id(pair.second.node_id());
address.set_ip_address(pair.second.node_manager_address());
address.set_port(pair.second.node_manager_port());
auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(address);
ray_syncer_->Connect(raylet_client->GetChannel());
}
}
} else {
/*
The current synchronization flow is:
raylet -> syncer::poller --> syncer::update -> gcs_resource_manager
gcs_placement_scheduler --/
*/
gcs_ray_syncer_ = std::make_unique<gcs_syncer::RaySyncer>(
main_service_, raylet_client_pool_, *gcs_resource_manager_);
gcs_ray_syncer_->Initialize(gcs_init_data);
gcs_ray_syncer_->Start();
}
}
void GcsServer::InitStatsHandler() {
@ -564,12 +593,23 @@ void GcsServer::InstallEventListeners() {
gcs_node_manager_->AddNodeAddedListener([this](std::shared_ptr<rpc::GcsNodeInfo> node) {
// Because a new node has been added, we need to try to schedule the pending
// placement groups and the pending actors.
auto node_id = NodeID::FromBinary(node->node_id());
gcs_resource_manager_->OnNodeAdd(*node);
gcs_placement_group_manager_->OnNodeAdd(NodeID::FromBinary(node->node_id()));
gcs_placement_group_manager_->OnNodeAdd(node_id);
gcs_actor_manager_->SchedulePendingActors();
cluster_task_manager_->ScheduleAndDispatchTasks();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
ray_syncer_->AddNode(*node);
cluster_task_manager_->ScheduleAndDispatchTasks();
if (RayConfig::instance().use_ray_syncer()) {
rpc::Address address;
address.set_raylet_id(node->node_id());
address.set_ip_address(node->node_manager_address());
address.set_port(node->node_manager_port());
auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(address);
ray_syncer_->Connect(raylet_client->GetChannel());
} else {
gcs_ray_syncer_->AddNode(*node);
}
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
@ -581,7 +621,11 @@ void GcsServer::InstallEventListeners() {
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id, node_ip_address);
raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
ray_syncer_->RemoveNode(*node);
if (RayConfig::instance().use_ray_syncer()) {
ray_syncer_->Disconnect(node_id.Binary());
} else {
gcs_ray_syncer_->RemoveNode(*node);
}
});
// Install worker event listener.
@ -648,8 +692,9 @@ std::string GcsServer::GetDebugState() const {
<< gcs_placement_group_manager_->DebugString() << "\n\n"
<< gcs_publisher_->DebugString() << "\n\n"
<< runtime_env_manager_->DebugString() << "\n\n";
stream << ray_syncer_->DebugString();
if (gcs_ray_syncer_) {
stream << gcs_ray_syncer_->DebugString();
}
return stream.str();
}

View file

@ -15,6 +15,7 @@
#pragma once
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/runtime_env_manager.h"
#include "ray/gcs/gcs_server/gcs_function_manager.h"
#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h"
@ -217,8 +218,20 @@ class GcsServer {
/// Stats handler and service.
std::unique_ptr<rpc::StatsHandler> stats_handler_;
std::unique_ptr<rpc::StatsGrpcService> stats_service_;
// Synchronization service for ray.
std::unique_ptr<gcs_syncer::RaySyncer> ray_syncer_;
/// Synchronization service for ray.
/// TODO(iycheng): Deprecate this gcs_ray_syncer_ one once we roll out
/// to ray_syncer_.
std::unique_ptr<gcs_syncer::RaySyncer> gcs_ray_syncer_;
/// Ray Syncer realted fields.
std::unique_ptr<syncer::RaySyncer> ray_syncer_;
std::unique_ptr<std::thread> ray_syncer_thread_;
instrumented_io_context ray_syncer_io_context_;
/// The node id of GCS.
NodeID gcs_node_id_;
/// The gcs worker manager.
std::unique_ptr<GcsWorkerManager> gcs_worker_manager_;
/// Worker info service.

View file

@ -61,7 +61,9 @@ class GcsActorSchedulerTest : public ::testing::Test {
/*local_task_manager=*/
nullptr);
auto gcs_resource_manager = std::make_shared<gcs::GcsResourceManager>(
gcs_table_storage_, cluster_resource_scheduler->GetClusterResourceManager());
io_service_,
gcs_table_storage_,
cluster_resource_scheduler->GetClusterResourceManager());
gcs_actor_scheduler_ = std::make_shared<GcsServerMocker::MockedGcsActorScheduler>(
io_service_,
*gcs_actor_table_,

View file

@ -37,8 +37,8 @@ class GcsPlacementGroupManagerMockTest : public Test {
gcs_table_storage_ = std::make_shared<GcsTableStorage>(store_client_);
gcs_placement_group_scheduler_ =
std::make_shared<MockGcsPlacementGroupSchedulerInterface>();
resource_manager_ =
std::make_shared<MockGcsResourceManager>(nullptr, cluster_resource_manager_);
resource_manager_ = std::make_shared<MockGcsResourceManager>(
io_context_, nullptr, cluster_resource_manager_);
gcs_placement_group_manager_ =
std::make_unique<GcsPlacementGroupManager>(io_context_,

View file

@ -84,8 +84,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
gcs_publisher_ =
std::make_shared<GcsPublisher>(std::make_unique<ray::pubsub::MockPublisher>());
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(nullptr, cluster_resource_manager_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, nullptr, cluster_resource_manager_);
gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager(
io_service_,
mock_placement_group_scheduler_,

View file

@ -53,7 +53,9 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
[](auto) { return true; },
/*is_local_node_with_raylet=*/false);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
gcs_table_storage_, cluster_resource_scheduler_->GetClusterResourceManager());
io_service_,
gcs_table_storage_,
cluster_resource_scheduler_->GetClusterResourceManager());
ray_syncer_ = std::make_shared<ray::gcs_syncer::RaySyncer>(
io_service_, nullptr, *gcs_resource_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
@ -68,7 +70,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
*gcs_resource_manager_,
*cluster_resource_scheduler_,
raylet_client_pool_,
*ray_syncer_);
ray_syncer_.get());
}
void TearDown() override {

View file

@ -28,10 +28,11 @@ using ::testing::_;
class GcsResourceManagerTest : public ::testing::Test {
public:
GcsResourceManagerTest() {
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(nullptr, cluster_resource_manager_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, nullptr, cluster_resource_manager_);
}
instrumented_io_context io_service_;
ClusterResourceManager cluster_resource_manager_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
};

View file

@ -72,6 +72,8 @@ struct GcsServerMocker {
return Status::OK();
}
std::shared_ptr<grpc::Channel> GetChannel() const override { return nullptr; }
void ReportWorkerBacklog(
const WorkerID &worker_id,
const std::vector<rpc::WorkerBacklogReport> &backlog_reports) override {}

View file

@ -16,16 +16,16 @@ syntax = "proto3";
option cc_enable_arenas = true;
package ray.rpc.syncer;
enum RayComponentId {
RESOURCE_MANAGER = 0;
SCHEDULER = 1;
enum MessageType {
RESOURCE_VIEW = 0;
COMMANDS = 1;
}
message RaySyncMessage {
// The version of the message. -1 means the version is not set.
int64 version = 1;
// The component this message is for.
RayComponentId component_id = 2;
// The type of this message.
MessageType message_type = 2;
// The actual payload.
bytes sync_message = 3;
// The node id which initially sent this message.

View file

@ -19,6 +19,7 @@
#include <fstream>
#include <memory>
#include "absl/time/clock.h"
#include "boost/filesystem.hpp"
#include "boost/system/error_code.hpp"
#include "ray/common/asio/asio_util.h"
@ -329,7 +330,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
global_gc_throttler_(RayConfig::instance().global_gc_min_interval_s() * 1e9),
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
record_metrics_period_ms_(config.record_metrics_period_ms),
next_resource_seq_no_(0) {
next_resource_seq_no_(0),
ray_syncer_(io_service_, self_node_id_.Binary()),
ray_syncer_service_(ray_syncer_) {
RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
RAY_CHECK(RayConfig::instance().raylet_heartbeat_period_milliseconds() > 0);
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
@ -404,6 +407,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
// Run the node manger rpc server.
node_manager_server_.RegisterService(node_manager_service_);
node_manager_server_.RegisterService(agent_manager_service_);
if (RayConfig::instance().use_ray_syncer()) {
node_manager_server_.RegisterService(ray_syncer_service_);
}
node_manager_server_.Run();
worker_pool_.SetNodeManagerPort(GetServerPort());
@ -541,6 +547,35 @@ ray::Status NodeManager::RegisterGcs() {
"NodeManager.deadline_timer.print_event_loop_stats");
}
if (RayConfig::instance().use_ray_syncer()) {
// Register resource manager and scheduler
ray_syncer_.Register(
/* message_type */ syncer::MessageType::RESOURCE_VIEW,
/* reporter */ &cluster_resource_scheduler_->GetLocalResourceManager(),
/* receiver */ this,
/* pull_from_reporter_interval_ms */
RayConfig::instance().raylet_report_resources_period_milliseconds());
// Register a commands channel.
// It's only used for GC right now.
ray_syncer_.Register(
/* message_type */ syncer::MessageType::COMMANDS,
/* reporter */ this,
/* receiver */ this,
/* pull_from_reporter_interval_ms */ 0);
periodical_runner_.RunFnPeriodically(
[this] {
auto triggered_by_global_gc = TryLocalGC();
// If plasma store is under high pressure, we should try to schedule a global
// gc.
if (triggered_by_global_gc) {
ray_syncer_.OnDemandBroadcasting(syncer::MessageType::COMMANDS);
}
},
RayConfig::instance().raylet_check_gc_period_milliseconds(),
"NodeManager.CheckGC");
}
return ray::Status::OK();
}
@ -618,30 +653,8 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
FillNormalTaskResourceUsage(resources_data);
}
// If plasma store is under high pressure, we should try to schedule a global gc.
bool plasma_high_pressure =
object_manager_.GetUsedMemoryPercentage() > high_plasma_storage_usage_;
if (plasma_high_pressure && global_gc_throttler_.AbleToRun()) {
TriggerGlobalGC();
}
// Set the global gc bit on the outgoing heartbeat message.
bool triggered_by_global_gc = false;
if (should_global_gc_) {
resources_data.set_should_global_gc(true);
triggered_by_global_gc = true;
should_global_gc_ = false;
global_gc_throttler_.RunNow();
}
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
if ((should_local_gc_ ||
(absl::GetCurrentTimeNanos() - local_gc_run_time_ns_ > local_gc_interval_ns_)) &&
local_gc_throttler_.AbleToRun()) {
DoLocalGC(triggered_by_global_gc);
should_local_gc_ = false;
}
resources_data.set_should_global_gc(TryLocalGC());
}
void NodeManager::DoLocalGC(bool triggered_by_global_gc) {
@ -1799,6 +1812,10 @@ void NodeManager::HandleCommitBundleResources(
RAY_LOG(DEBUG) << "Request to commit resources for bundles: "
<< GetDebugStringForBundles(bundle_specs);
placement_group_resource_manager_->CommitBundles(bundle_specs);
if (RayConfig::instance().use_ray_syncer()) {
// To reduce the lag, we trigger a broadcasting immediately.
RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW));
}
send_reply_callback(Status::OK(), nullptr, nullptr);
cluster_task_manager_->ScheduleAndDispatchTasks();
@ -1836,6 +1853,10 @@ void NodeManager::HandleCancelResourceReserve(
// Return bundle resources.
placement_group_resource_manager_->ReturnBundle(bundle_spec);
if (RayConfig::instance().use_ray_syncer()) {
// To reduce the lag, we trigger a broadcasting immediately.
RAY_CHECK(ray_syncer_.OnDemandBroadcasting(syncer::MessageType::RESOURCE_VIEW));
}
cluster_task_manager_->ScheduleAndDispatchTasks();
send_reply_callback(Status::OK(), nullptr, nullptr);
}
@ -2615,6 +2636,33 @@ void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request,
TriggerGlobalGC();
}
bool NodeManager::TryLocalGC() {
// If plasma store is under high pressure, we should try to schedule a global gc.
bool plasma_high_pressure =
object_manager_.GetUsedMemoryPercentage() > high_plasma_storage_usage_;
if (plasma_high_pressure && global_gc_throttler_.AbleToRun()) {
TriggerGlobalGC();
}
// Set the global gc bit on the outgoing heartbeat message.
bool triggered_by_global_gc = false;
if (should_global_gc_) {
triggered_by_global_gc = true;
should_global_gc_ = false;
global_gc_throttler_.RunNow();
}
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
if ((should_local_gc_ ||
(absl::GetCurrentTimeNanos() - local_gc_run_time_ns_ > local_gc_interval_ns_)) &&
local_gc_throttler_.AbleToRun()) {
DoLocalGC(triggered_by_global_gc);
should_local_gc_ = false;
}
return triggered_by_global_gc;
}
void NodeManager::TriggerGlobalGC() {
should_global_gc_ = true;
// We won't see our own request, so trigger local GC in the next heartbeat.
@ -2644,6 +2692,38 @@ void NodeManager::RecordMetrics() {
object_directory_->RecordMetrics(duration_ms);
}
void NodeManager::ConsumeSyncMessage(
std::shared_ptr<const syncer::RaySyncMessage> message) {
if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
rpc::ResourcesData data;
data.ParseFromString(message->sync_message());
NodeID node_id = NodeID::FromBinary(data.node_id());
UpdateResourceUsage(node_id, data);
} else if (message->message_type() == syncer::MessageType::COMMANDS) {
rpc::ResourcesData data;
data.ParseFromString(message->sync_message());
if (data.should_global_gc()) {
should_local_gc_ = true;
}
}
}
std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
int64_t after_version, syncer::MessageType message_type) const {
RAY_CHECK(message_type == syncer::MessageType::COMMANDS);
rpc::ResourcesData resources_data;
resources_data.set_should_global_gc(true);
syncer::RaySyncMessage msg;
msg.set_version(absl::GetCurrentTimeNanos());
msg.set_node_id(self_node_id_.Binary());
msg.set_message_type(syncer::MessageType::COMMANDS);
std::string serialized_msg;
RAY_CHECK(resources_data.SerializeToString(&serialized_msg));
msg.set_sync_message(std::move(serialized_msg));
return std::make_optional(std::move(msg));
}
void NodeManager::PublishInfeasibleTaskError(const RayTask &task) const {
bool suppress_warning = false;

View file

@ -21,6 +21,7 @@
#include "ray/common/id.h"
#include "ray/common/task/task.h"
#include "ray/common/ray_object.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/client_connection.h"
#include "ray/common/task/task_common.h"
#include "ray/common/task/scheduling_resources.h"
@ -138,7 +139,9 @@ class HeartbeatSender {
uint64_t last_heartbeat_at_ms_;
};
class NodeManager : public rpc::NodeManagerServiceHandler {
class NodeManager : public rpc::NodeManagerServiceHandler,
public syncer::ReporterInterface,
public syncer::ReceiverInterface {
public:
/// Create a node manager.
///
@ -188,6 +191,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Get the port of the node manager rpc server.
int GetServerPort() const { return node_manager_server_.GetPort(); }
void ConsumeSyncMessage(std::shared_ptr<const syncer::RaySyncMessage> message) override;
std::optional<syncer::RaySyncMessage> CreateSyncMessage(
int64_t after_version, syncer::MessageType message_type) const override;
int GetObjectManagerPort() const { return object_manager_.GetServerPort(); }
LocalObjectManager &GetLocalObjectManager() { return local_object_manager_; }
@ -647,6 +655,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT,
const rpc::RayException *creation_task_exception = nullptr);
bool TryLocalGC();
/// ID of this node.
NodeID self_node_id_;
/// The user-given identifier or name of this node.
@ -797,6 +807,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Whether or not if the node draining process has already received.
bool is_node_drained_ = false;
/// Ray syncer for synchronization
syncer::RaySyncer ray_syncer_;
/// RaySyncerService for gRPC
syncer::RaySyncerService ray_syncer_service_;
};
} // namespace raylet

View file

@ -306,10 +306,13 @@ void LocalResourceManager::UpdateAvailableObjectStoreMemResource() {
RAY_CHECK_EQ(total_instances.size(), 1u);
const double used = get_used_object_store_memory_();
const double total = total_instances[0].Double();
local_resources_.available.Set(ResourceID::ObjectStoreMemory(),
{FixedPoint(total >= used ? total - used : 0.0)});
OnResourceChanged();
auto new_available =
std::vector<FixedPoint>{FixedPoint(total >= used ? total - used : 0.0)};
if (new_available != local_resources_.available.Get(ResourceID::ObjectStoreMemory())) {
local_resources_.available.Set(ResourceID::ObjectStoreMemory(),
std::move(new_available));
OnResourceChanged();
}
}
void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data) {
@ -322,6 +325,7 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data)
NodeResources node_resources = ResourceMapToNodeResources({{}}, {{}});
last_report_resources_.reset(new NodeResources(node_resources));
}
for (auto entry : resources.total.ToMap()) {
auto resource_id = entry.first;
auto label = ResourceID(resource_id).Binary();
@ -361,6 +365,53 @@ double LocalResourceManager::GetLocalAvailableCpus() const {
return local_resources_.available.Sum(ResourceID::CPU()).Double();
}
std::optional<syncer::RaySyncMessage> LocalResourceManager::CreateSyncMessage(
int64_t after_version, syncer::MessageType message_type) const {
RAY_CHECK(message_type == syncer::MessageType::RESOURCE_VIEW);
// We check the memory inside version, so version is not a const function.
// Ideally, we need to move the memory check somewhere else.
// TODO(iycheng): Make version as a const function.
const_cast<LocalResourceManager *>(this)->UpdateAvailableObjectStoreMemResource();
if (version_ <= after_version) {
return std::nullopt;
}
syncer::RaySyncMessage msg;
rpc::ResourcesData resources_data;
resources_data.set_node_id(local_node_id_.Binary());
NodeResources resources = ToNodeResources(local_resources_);
for (auto entry : resources.total.ToMap()) {
auto resource_id = entry.first;
auto label = ResourceID(resource_id).Binary();
auto total = entry.second;
auto available = resources.available.Get(resource_id);
resources_data.set_resources_available_changed(true);
(*resources_data.mutable_resources_available())[label] = available.Double();
(*resources_data.mutable_resources_total())[label] = total.Double();
}
if (get_pull_manager_at_capacity_ != nullptr) {
resources.object_pulls_queued = get_pull_manager_at_capacity_();
resources_data.set_object_pulls_queued(resources.object_pulls_queued);
resources_data.set_resources_available_changed(true);
}
resources_data.set_resources_available_changed(true);
msg.set_node_id(local_node_id_.Binary());
msg.set_version(version_);
msg.set_message_type(message_type);
std::string serialized_msg;
RAY_CHECK(resources_data.SerializeToString(&serialized_msg));
msg.set_sync_message(std::move(serialized_msg));
return std::make_optional(std::move(msg));
}
ray::gcs::NodeResourceInfoAccessor::ResourceMap LocalResourceManager::GetResourceTotals(
const absl::flat_hash_map<std::string, double> &resource_map_filter) const {
ray::gcs::NodeResourceInfoAccessor::ResourceMap map;
@ -380,6 +431,7 @@ ray::gcs::NodeResourceInfoAccessor::ResourceMap LocalResourceManager::GetResourc
}
void LocalResourceManager::OnResourceChanged() {
++version_;
if (resource_change_subscriber_ == nullptr) {
return;
}

View file

@ -22,6 +22,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/gcs/gcs_client/accessor.h"
#include "ray/gcs/gcs_client/gcs_client.h"
@ -37,7 +38,7 @@ namespace ray {
/// it also supports creating a new resource or delete an existing resource.
/// Whenever the resouce changes, it notifies the subscriber of the change.
/// This class is not thread safe.
class LocalResourceManager {
class LocalResourceManager : public syncer::ReporterInterface {
public:
LocalResourceManager(
scheduling::NodeID local_node_id,
@ -145,6 +146,9 @@ class LocalResourceManager {
/// \return true, if exist. otherwise, false.
bool ResourcesExist(scheduling::ResourceID resource_id) const;
std::optional<syncer::RaySyncMessage> CreateSyncMessage(
int64_t after_version, syncer::MessageType message_type) const override;
private:
/// Notify the subscriber that the local resouces has changed.
void OnResourceChanged();
@ -251,6 +255,9 @@ class LocalResourceManager {
// Specify custom resources that consists of unit-size instances.
std::unordered_set<int64_t> custom_unit_instance_resources_{};
// Version of this resource. It will incr by one whenever the state changed.
int64_t version_ = 0;
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest);
FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstancesTest);

View file

@ -346,6 +346,10 @@ void RayletClient::RequestObjectSpillage(
grpc_client_->RequestObjectSpillage(request, callback);
}
std::shared_ptr<grpc::Channel> RayletClient::GetChannel() const {
return grpc_client_->Channel();
}
void RayletClient::ReportWorkerBacklog(
const WorkerID &worker_id,
const std::vector<rpc::WorkerBacklogReport> &backlog_reports) {

View file

@ -191,6 +191,8 @@ class RayletClientInterface : public PinObjectsInterface,
const NodeID &node_id,
bool graceful,
const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback) = 0;
virtual std::shared_ptr<grpc::Channel> GetChannel() const = 0;
};
namespace raylet {
@ -433,6 +435,8 @@ class RayletClient : public RayletClientInterface {
const ObjectID &object_id,
const rpc::ClientCallback<rpc::RequestObjectSpillageReply> &callback);
std::shared_ptr<grpc::Channel> GetChannel() const override;
/// Implements WorkerLeaseInterface.
void RequestWorkerLease(
const rpc::TaskSpec &resource_spec,

View file

@ -88,7 +88,8 @@ class GrpcClient {
ClientCallManager &call_manager,
bool use_tls = false)
: client_call_manager_(call_manager), use_tls_(use_tls) {
stub_ = GrpcService::NewStub(std::move(channel));
channel_ = std::move(channel);
stub_ = GrpcService::NewStub(channel_);
}
GrpcClient(const std::string &address,
@ -97,8 +98,8 @@ class GrpcClient {
bool use_tls = false)
: client_call_manager_(call_manager), use_tls_(use_tls) {
std::shared_ptr<grpc::Channel> channel = BuildChannel(address, port);
stub_ = GrpcService::NewStub(std::move(channel));
channel_ = BuildChannel(address, port);
stub_ = GrpcService::NewStub(channel_);
}
GrpcClient(const std::string &address,
@ -116,9 +117,8 @@ class GrpcClient {
argument.SetMaxSendMessageSize(::RayConfig::instance().max_grpc_message_size());
argument.SetMaxReceiveMessageSize(::RayConfig::instance().max_grpc_message_size());
std::shared_ptr<grpc::Channel> channel = BuildChannel(address, port, argument);
stub_ = GrpcService::NewStub(std::move(channel));
channel_ = BuildChannel(address, port, argument);
stub_ = GrpcService::NewStub(channel_);
}
/// Create a new `ClientCall` and send request.
@ -152,12 +152,16 @@ class GrpcClient {
RAY_CHECK(call != nullptr);
}
std::shared_ptr<grpc::Channel> Channel() const { return channel_; }
private:
ClientCallManager &client_call_manager_;
/// The gRPC-generated stub.
std::unique_ptr<typename GrpcService::Stub> stub_;
/// Whether to use TLS.
bool use_tls_;
/// The channel of the stub.
std::shared_ptr<grpc::Channel> channel_;
};
} // namespace rpc

View file

@ -127,6 +127,10 @@ void GrpcServer::Run() {
is_closed_ = false;
}
void GrpcServer::RegisterService(grpc::Service &service) {
services_.emplace_back(service);
}
void GrpcServer::RegisterService(GrpcService &service) {
services_.emplace_back(service.GetGrpcService());

View file

@ -107,6 +107,7 @@ class GrpcServer {
///
/// \param[in] service A `GrpcService` to register to this server.
void RegisterService(GrpcService &service);
void RegisterService(grpc::Service &service);
protected:
/// This function runs in a background thread. It keeps polling events from the

View file

@ -53,6 +53,8 @@ class NodeManagerClient {
GetNodeStats(request, callback);
}
std::shared_ptr<grpc::Channel> Channel() const { return grpc_client_->Channel(); }
private:
/// The RPC client.
std::unique_ptr<GrpcClient<NodeManagerService>> grpc_client_;
@ -75,6 +77,8 @@ class NodeManagerWorkerClient
return std::shared_ptr<NodeManagerWorkerClient>(instance);
}
std::shared_ptr<grpc::Channel> Channel() const { return grpc_client_->Channel(); }
/// Update cluster resource usage.
VOID_RPC_CLIENT_METHOD(NodeManagerService,
UpdateResourceUsage,