diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index dcd1135c9..d826d4fa2 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -426,15 +426,34 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ << rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID() << ", raylet " << local_raylet_id; + // Begin to get gcs server address from raylet. + gcs_server_address_updater_ = std::make_unique( + options_.raylet_ip_address, options_.node_manager_port, + [this](std::string ip, int port) { + absl::MutexLock lock(&gcs_server_address_mutex_); + gcs_server_address_.first = ip; + gcs_server_address_.second = port; + }); + // Initialize gcs client. - // As asynchronous context of redis client is not used in this gcs client. We would not - // open connection for it by setting `enable_async_conn` as false. + // As the synchronous and the asynchronous context of redis client is not used in this + // gcs client. We would not open connection for it by setting `enable_sync_conn` and + // `enable_async_conn` as false. gcs::GcsClientOptions gcs_options = gcs::GcsClientOptions( options_.gcs_options.server_ip_, options_.gcs_options.server_port_, options_.gcs_options.password_, - /*enable_sync_conn=*/true, /*enable_async_conn=*/false, + /*enable_sync_conn=*/false, /*enable_async_conn=*/false, /*enable_subscribe_conn=*/true); - gcs_client_ = std::make_shared(gcs_options); + gcs_client_ = std::make_shared( + gcs_options, [this](std::pair *address) { + absl::MutexLock lock(&gcs_server_address_mutex_); + if (gcs_server_address_.second != 0) { + address->first = gcs_server_address_.first; + address->second = gcs_server_address_.second; + return true; + } + return false; + }); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); RegisterToGcs(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 73c6312ff..7f534782c 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -24,6 +24,7 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/future_resolver.h" +#include "ray/core_worker/gcs_server_address_updater.h" #include "ray/core_worker/lease_policy.h" #include "ray/core_worker/object_recovery_manager.h" #include "ray/core_worker/profiling.h" @@ -1184,6 +1185,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Whether or not this worker is connected to the raylet and GCS. bool connected_ = false; + std::pair gcs_server_address_ GUARDED_BY(gcs_server_address_mutex_) = + std::make_pair("", 0); + /// To protect accessing the `gcs_server_address_`. + absl::Mutex gcs_server_address_mutex_; + std::unique_ptr gcs_server_address_updater_; + // Client to the GCS shared by core worker interfaces. std::shared_ptr gcs_client_; diff --git a/src/ray/core_worker/gcs_server_address_updater.cc b/src/ray/core_worker/gcs_server_address_updater.cc new file mode 100644 index 000000000..6ff142208 --- /dev/null +++ b/src/ray/core_worker/gcs_server_address_updater.cc @@ -0,0 +1,64 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/gcs_server_address_updater.h" + +namespace ray { + +GcsServerAddressUpdater::GcsServerAddressUpdater( + const std::string raylet_ip_address, const int port, + std::function update_func) + : update_func_(update_func) { + // Init updater thread and run its io service. + updater_thread_.reset(new std::thread([this] { + SetThreadName("gcs_address_updater"); + /// The asio work to keep io_service_ alive. + boost::asio::io_service::work io_service_work_(updater_io_service_); + updater_io_service_.run(); + })); + client_call_manager_.reset(new rpc::ClientCallManager(updater_io_service_)); + auto grpc_client = + rpc::NodeManagerWorkerClient::make(raylet_ip_address, port, *client_call_manager_); + raylet_client_ = std::make_shared(grpc_client); + // Init updater runner. + updater_runner_.reset(new PeriodicalRunner(updater_io_service_)); + // Start updating gcs server address. + updater_runner_->RunFnPeriodically( + [this] { UpdateGcsServerAddress(); }, + RayConfig::instance().gcs_service_address_check_interval_milliseconds()); +} + +GcsServerAddressUpdater::~GcsServerAddressUpdater() { + updater_runner_.reset(); + raylet_client_.reset(); + updater_io_service_.stop(); + if (updater_thread_->joinable()) { + updater_thread_->join(); + } + updater_thread_.reset(); +} + +void GcsServerAddressUpdater::UpdateGcsServerAddress() { + RAY_LOG(DEBUG) << "Getting gcs server address from raylet."; + raylet_client_->GetGcsServerAddress( + [this](const Status &status, const rpc::GetGcsServerAddressReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Failed to get gcs server address from Raylet: " << status; + } else { + update_func_(reply.ip(), reply.port()); + } + }); +} + +} // namespace ray diff --git a/src/ray/core_worker/gcs_server_address_updater.h b/src/ray/core_worker/gcs_server_address_updater.h new file mode 100644 index 000000000..b1c85fa60 --- /dev/null +++ b/src/ray/core_worker/gcs_server_address_updater.h @@ -0,0 +1,48 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/asio/periodical_runner.h" +#include "ray/raylet_client/raylet_client.h" + +namespace ray { + +class GcsServerAddressUpdater { + public: + /// Create a updater for gcs server address. + /// + /// \param raylet_ip_address Raylet ip address. + /// \param port Port to connect raylet. + /// \param address to store gcs server address. + GcsServerAddressUpdater(const std::string raylet_ip_address, const int port, + std::function update_func); + + ~GcsServerAddressUpdater(); + + private: + /// Update gcs server address. + void UpdateGcsServerAddress(); + + std::unique_ptr client_call_manager_; + /// A client connection to the raylet. + std::shared_ptr raylet_client_; + std::function update_func_; + instrumented_io_context updater_io_service_; + std::unique_ptr updater_thread_; + std::unique_ptr updater_runner_; +}; + +} // namespace ray diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client.h index 93cd58304..0c8ceeac8 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client.h @@ -81,6 +81,10 @@ class GcsClient : public std::enable_shared_from_this { /// Disconnect with GCS Service. Non-thread safe. virtual void Disconnect() = 0; + virtual std::pair GetGcsServerAddress() { + return std::make_pair("", 0); + } + /// Return client information for debug. virtual std::string DebugString() const { return ""; } diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index fa5fa604d..c4308b27b 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -24,8 +24,11 @@ extern "C" { namespace ray { namespace gcs { -ServiceBasedGcsClient::ServiceBasedGcsClient(const GcsClientOptions &options) +ServiceBasedGcsClient::ServiceBasedGcsClient( + const GcsClientOptions &options, + std::function *)> get_gcs_server_address_func) : GcsClient(options), + get_server_address_func_(get_gcs_server_address_func), last_reconnect_timestamp_ms_(0), last_reconnect_address_(std::make_pair("", -1)) {} @@ -51,15 +54,26 @@ Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) { gcs_pub_sub_.reset(new GcsPubSub(redis_client_)); // Get gcs service address. - get_server_address_func_ = [this](std::pair *address) { - return GetGcsServerAddressFromRedis( - redis_client_->GetPrimaryContext()->sync_context(), address); - }; - std::pair address; - RAY_CHECK(GetGcsServerAddressFromRedis( - redis_client_->GetPrimaryContext()->sync_context(), &address, - RayConfig::instance().gcs_service_connect_retries())) - << "Failed to get gcs server address when init gcs client."; + if (get_server_address_func_) { + get_server_address_func_(¤t_gcs_server_address_); + int i = 0; + while (current_gcs_server_address_.first.empty() && + i < RayConfig::instance().gcs_service_connect_retries()) { + std::this_thread::sleep_for(std::chrono::milliseconds( + RayConfig::instance().internal_gcs_service_connect_wait_milliseconds())); + get_server_address_func_(¤t_gcs_server_address_); + i++; + } + } else { + get_server_address_func_ = [this](std::pair *address) { + return GetGcsServerAddressFromRedis( + redis_client_->GetPrimaryContext()->sync_context(), address); + }; + RAY_CHECK(GetGcsServerAddressFromRedis( + redis_client_->GetPrimaryContext()->sync_context(), ¤t_gcs_server_address_, + RayConfig::instance().gcs_service_connect_retries())) + << "Failed to get gcs server address when init gcs client."; + } resubscribe_func_ = [this](bool is_pubsub_server_restarted) { job_accessor_->AsyncResubscribe(is_pubsub_server_restarted); @@ -74,7 +88,8 @@ Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) { // Connect to gcs service. client_call_manager_.reset(new rpc::ClientCallManager(io_service)); gcs_rpc_client_.reset(new rpc::GcsRpcClient( - address.first, address.second, *client_call_manager_, + current_gcs_server_address_.first, current_gcs_server_address_.second, + *client_call_manager_, [this](rpc::GcsServiceFailureType type) { GcsServiceFailureDetected(type); })); job_accessor_.reset(new ServiceBasedJobInfoAccessor(this)); actor_accessor_.reset(new ServiceBasedActorInfoAccessor(this)); @@ -110,6 +125,10 @@ void ServiceBasedGcsClient::Disconnect() { RAY_LOG(DEBUG) << "ServiceBasedGcsClient Disconnected."; } +std::pair ServiceBasedGcsClient::GetGcsServerAddress() { + return current_gcs_server_address_; +} + bool ServiceBasedGcsClient::GetGcsServerAddressFromRedis( redisContext *context, std::pair *address, int max_attempts) { // Get gcs server address. diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.h b/src/ray/gcs/gcs_client/service_based_gcs_client.h index 84968bde9..714a94174 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.h +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.h @@ -26,7 +26,9 @@ namespace gcs { class RAY_EXPORT ServiceBasedGcsClient : public GcsClient { public: - explicit ServiceBasedGcsClient(const GcsClientOptions &options); + explicit ServiceBasedGcsClient(const GcsClientOptions &options, + std::function *)> + get_gcs_server_address_func = {}); Status Connect(instrumented_io_context &io_service) override; @@ -36,6 +38,8 @@ class RAY_EXPORT ServiceBasedGcsClient : public GcsClient { rpc::GcsRpcClient &GetGcsRpcClient() { return *gcs_rpc_client_; } + std::pair GetGcsServerAddress() override; + private: /// Get gcs server address from redis. /// This address is set by GcsServer::StoreGcsServerAddressInRedis function. diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 61c1877d2..a971682bf 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -240,6 +240,10 @@ struct GcsServerMocker { void GetSystemConfig(const ray::rpc::ClientCallback &callback) override {} + void GetGcsServerAddress( + const ray::rpc::ClientCallback &callback) + override {} + /// ResourceUsageInterface void RequestResourceReport( const rpc::ClientCallback &callback) override { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index e9b89445d..87fb09a83 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -222,6 +222,14 @@ message UpdateResourceUsageRequest { message UpdateResourceUsageReply { } +message GetGcsServerAddressRequest { +} + +message GetGcsServerAddressReply { + string ip = 1; + int32 port = 2; +} + // Service for inter-node-manager communication. service NodeManagerService { // Update the node's view of the cluster resource usage @@ -273,4 +281,6 @@ service NodeManagerService { returns (ReleaseUnusedBundlesReply); // Get the system config. rpc GetSystemConfig(GetSystemConfigRequest) returns (GetSystemConfigReply); + // Get gcs server address. + rpc GetGcsServerAddress(GetGcsServerAddressRequest) returns (GetGcsServerAddressReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 5022b54bf..db9823ae9 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2025,6 +2025,15 @@ void NodeManager::HandleGetSystemConfig(const rpc::GetSystemConfigRequest &reque send_reply_callback(Status::OK(), nullptr, nullptr); } +void NodeManager::HandleGetGcsServerAddress( + const rpc::GetGcsServerAddressRequest &request, rpc::GetGcsServerAddressReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto address = gcs_client_->GetGcsServerAddress(); + reply->set_ip(address.first); + reply->set_port(address.second); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 394899b12..285ceba68 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -553,6 +553,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler { rpc::GetSystemConfigReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `GetGcsServerAddress` request. + void HandleGetGcsServerAddress(const rpc::GetGcsServerAddressRequest &request, + rpc::GetGcsServerAddressReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Trigger local GC on each worker of this raylet. void DoLocalGC(); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 15cf4be46..1d9360bc5 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -455,4 +455,10 @@ void raylet::RayletClient::GetSystemConfig( grpc_client_->GetSystemConfig(request, callback); } +void raylet::RayletClient::GetGcsServerAddress( + const rpc::ClientCallback &callback) { + rpc::GetGcsServerAddressRequest request; + grpc_client_->GetGcsServerAddress(request, callback); +} + } // namespace ray diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index c0dbbb686..9eaee1648 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -166,6 +166,9 @@ class RayletClientInterface : public PinObjectsInterface, /// \param callback Callback that will be called after raylet replied the system config. virtual void GetSystemConfig( const rpc::ClientCallback &callback) = 0; + + virtual void GetGcsServerAddress( + const rpc::ClientCallback &callback) = 0; }; namespace raylet { @@ -406,6 +409,9 @@ class RayletClient : public RayletClientInterface { void GetSystemConfig( const rpc::ClientCallback &callback) override; + void GetGcsServerAddress( + const rpc::ClientCallback &callback) override; + void GlobalGC(const rpc::ClientCallback &callback); void UpdateResourceUsage( diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index 5e16550c3..341613a84 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -112,6 +112,9 @@ class NodeManagerWorkerClient /// Get the system config from Raylet. VOID_RPC_CLIENT_METHOD(NodeManagerService, GetSystemConfig, grpc_client_, ) + /// Get gcs server address. + VOID_RPC_CLIENT_METHOD(NodeManagerService, GetGcsServerAddress, grpc_client_, ) + private: /// Constructor. /// diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 332e3d093..a7660b900 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -40,7 +40,8 @@ namespace rpc { RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \ RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) \ RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles) \ - RPC_SERVICE_HANDLER(NodeManagerService, GetSystemConfig) + RPC_SERVICE_HANDLER(NodeManagerService, GetSystemConfig) \ + RPC_SERVICE_HANDLER(NodeManagerService, GetGcsServerAddress) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. class NodeManagerServiceHandler { @@ -122,6 +123,10 @@ class NodeManagerServiceHandler { virtual void HandleGetSystemConfig(const GetSystemConfigRequest &request, GetSystemConfigReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetGcsServerAddress(const GetGcsServerAddressRequest &request, + GetGcsServerAddressReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`.