[large scale]use proxy to track gcs server address in core worker (#15714)

This commit is contained in:
Tao Wang 2021-05-13 19:26:01 +08:00 committed by GitHub
parent c877da4c19
commit 19462e43d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 230 additions and 17 deletions

View file

@ -426,15 +426,34 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
<< rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID()
<< ", raylet " << local_raylet_id;
// Begin to get gcs server address from raylet.
gcs_server_address_updater_ = std::make_unique<GcsServerAddressUpdater>(
options_.raylet_ip_address, options_.node_manager_port,
[this](std::string ip, int port) {
absl::MutexLock lock(&gcs_server_address_mutex_);
gcs_server_address_.first = ip;
gcs_server_address_.second = port;
});
// Initialize gcs client.
// As asynchronous context of redis client is not used in this gcs client. We would not
// open connection for it by setting `enable_async_conn` as false.
// As the synchronous and the asynchronous context of redis client is not used in this
// gcs client. We would not open connection for it by setting `enable_sync_conn` and
// `enable_async_conn` as false.
gcs::GcsClientOptions gcs_options = gcs::GcsClientOptions(
options_.gcs_options.server_ip_, options_.gcs_options.server_port_,
options_.gcs_options.password_,
/*enable_sync_conn=*/true, /*enable_async_conn=*/false,
/*enable_sync_conn=*/false, /*enable_async_conn=*/false,
/*enable_subscribe_conn=*/true);
gcs_client_ = std::make_shared<ray::gcs::ServiceBasedGcsClient>(gcs_options);
gcs_client_ = std::make_shared<ray::gcs::ServiceBasedGcsClient>(
gcs_options, [this](std::pair<std::string, int> *address) {
absl::MutexLock lock(&gcs_server_address_mutex_);
if (gcs_server_address_.second != 0) {
address->first = gcs_server_address_.first;
address->second = gcs_server_address_.second;
return true;
}
return false;
});
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RegisterToGcs();

View file

@ -24,6 +24,7 @@
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/future_resolver.h"
#include "ray/core_worker/gcs_server_address_updater.h"
#include "ray/core_worker/lease_policy.h"
#include "ray/core_worker/object_recovery_manager.h"
#include "ray/core_worker/profiling.h"
@ -1184,6 +1185,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Whether or not this worker is connected to the raylet and GCS.
bool connected_ = false;
std::pair<std::string, int> gcs_server_address_ GUARDED_BY(gcs_server_address_mutex_) =
std::make_pair<std::string, int>("", 0);
/// To protect accessing the `gcs_server_address_`.
absl::Mutex gcs_server_address_mutex_;
std::unique_ptr<GcsServerAddressUpdater> gcs_server_address_updater_;
// Client to the GCS shared by core worker interfaces.
std::shared_ptr<gcs::GcsClient> gcs_client_;

View file

@ -0,0 +1,64 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/core_worker/gcs_server_address_updater.h"
namespace ray {
GcsServerAddressUpdater::GcsServerAddressUpdater(
const std::string raylet_ip_address, const int port,
std::function<void(std::string, int)> update_func)
: update_func_(update_func) {
// Init updater thread and run its io service.
updater_thread_.reset(new std::thread([this] {
SetThreadName("gcs_address_updater");
/// The asio work to keep io_service_ alive.
boost::asio::io_service::work io_service_work_(updater_io_service_);
updater_io_service_.run();
}));
client_call_manager_.reset(new rpc::ClientCallManager(updater_io_service_));
auto grpc_client =
rpc::NodeManagerWorkerClient::make(raylet_ip_address, port, *client_call_manager_);
raylet_client_ = std::make_shared<raylet::RayletClient>(grpc_client);
// Init updater runner.
updater_runner_.reset(new PeriodicalRunner(updater_io_service_));
// Start updating gcs server address.
updater_runner_->RunFnPeriodically(
[this] { UpdateGcsServerAddress(); },
RayConfig::instance().gcs_service_address_check_interval_milliseconds());
}
GcsServerAddressUpdater::~GcsServerAddressUpdater() {
updater_runner_.reset();
raylet_client_.reset();
updater_io_service_.stop();
if (updater_thread_->joinable()) {
updater_thread_->join();
}
updater_thread_.reset();
}
void GcsServerAddressUpdater::UpdateGcsServerAddress() {
RAY_LOG(DEBUG) << "Getting gcs server address from raylet.";
raylet_client_->GetGcsServerAddress(
[this](const Status &status, const rpc::GetGcsServerAddressReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to get gcs server address from Raylet: " << status;
} else {
update_func_(reply.ip(), reply.port());
}
});
}
} // namespace ray

View file

@ -0,0 +1,48 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/raylet_client/raylet_client.h"
namespace ray {
class GcsServerAddressUpdater {
public:
/// Create a updater for gcs server address.
///
/// \param raylet_ip_address Raylet ip address.
/// \param port Port to connect raylet.
/// \param address to store gcs server address.
GcsServerAddressUpdater(const std::string raylet_ip_address, const int port,
std::function<void(std::string, int)> update_func);
~GcsServerAddressUpdater();
private:
/// Update gcs server address.
void UpdateGcsServerAddress();
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
/// A client connection to the raylet.
std::shared_ptr<raylet::RayletClient> raylet_client_;
std::function<void(std::string, int)> update_func_;
instrumented_io_context updater_io_service_;
std::unique_ptr<std::thread> updater_thread_;
std::unique_ptr<PeriodicalRunner> updater_runner_;
};
} // namespace ray

View file

@ -81,6 +81,10 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
/// Disconnect with GCS Service. Non-thread safe.
virtual void Disconnect() = 0;
virtual std::pair<std::string, int> GetGcsServerAddress() {
return std::make_pair("", 0);
}
/// Return client information for debug.
virtual std::string DebugString() const { return ""; }

View file

@ -24,8 +24,11 @@ extern "C" {
namespace ray {
namespace gcs {
ServiceBasedGcsClient::ServiceBasedGcsClient(const GcsClientOptions &options)
ServiceBasedGcsClient::ServiceBasedGcsClient(
const GcsClientOptions &options,
std::function<bool(std::pair<std::string, int> *)> get_gcs_server_address_func)
: GcsClient(options),
get_server_address_func_(get_gcs_server_address_func),
last_reconnect_timestamp_ms_(0),
last_reconnect_address_(std::make_pair("", -1)) {}
@ -51,15 +54,26 @@ Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) {
gcs_pub_sub_.reset(new GcsPubSub(redis_client_));
// Get gcs service address.
if (get_server_address_func_) {
get_server_address_func_(&current_gcs_server_address_);
int i = 0;
while (current_gcs_server_address_.first.empty() &&
i < RayConfig::instance().gcs_service_connect_retries()) {
std::this_thread::sleep_for(std::chrono::milliseconds(
RayConfig::instance().internal_gcs_service_connect_wait_milliseconds()));
get_server_address_func_(&current_gcs_server_address_);
i++;
}
} else {
get_server_address_func_ = [this](std::pair<std::string, int> *address) {
return GetGcsServerAddressFromRedis(
redis_client_->GetPrimaryContext()->sync_context(), address);
};
std::pair<std::string, int> address;
RAY_CHECK(GetGcsServerAddressFromRedis(
redis_client_->GetPrimaryContext()->sync_context(), &address,
redis_client_->GetPrimaryContext()->sync_context(), &current_gcs_server_address_,
RayConfig::instance().gcs_service_connect_retries()))
<< "Failed to get gcs server address when init gcs client.";
}
resubscribe_func_ = [this](bool is_pubsub_server_restarted) {
job_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
@ -74,7 +88,8 @@ Status ServiceBasedGcsClient::Connect(instrumented_io_context &io_service) {
// Connect to gcs service.
client_call_manager_.reset(new rpc::ClientCallManager(io_service));
gcs_rpc_client_.reset(new rpc::GcsRpcClient(
address.first, address.second, *client_call_manager_,
current_gcs_server_address_.first, current_gcs_server_address_.second,
*client_call_manager_,
[this](rpc::GcsServiceFailureType type) { GcsServiceFailureDetected(type); }));
job_accessor_.reset(new ServiceBasedJobInfoAccessor(this));
actor_accessor_.reset(new ServiceBasedActorInfoAccessor(this));
@ -110,6 +125,10 @@ void ServiceBasedGcsClient::Disconnect() {
RAY_LOG(DEBUG) << "ServiceBasedGcsClient Disconnected.";
}
std::pair<std::string, int> ServiceBasedGcsClient::GetGcsServerAddress() {
return current_gcs_server_address_;
}
bool ServiceBasedGcsClient::GetGcsServerAddressFromRedis(
redisContext *context, std::pair<std::string, int> *address, int max_attempts) {
// Get gcs server address.

View file

@ -26,7 +26,9 @@ namespace gcs {
class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
public:
explicit ServiceBasedGcsClient(const GcsClientOptions &options);
explicit ServiceBasedGcsClient(const GcsClientOptions &options,
std::function<bool(std::pair<std::string, int> *)>
get_gcs_server_address_func = {});
Status Connect(instrumented_io_context &io_service) override;
@ -36,6 +38,8 @@ class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
rpc::GcsRpcClient &GetGcsRpcClient() { return *gcs_rpc_client_; }
std::pair<std::string, int> GetGcsServerAddress() override;
private:
/// Get gcs server address from redis.
/// This address is set by GcsServer::StoreGcsServerAddressInRedis function.

View file

@ -240,6 +240,10 @@ struct GcsServerMocker {
void GetSystemConfig(const ray::rpc::ClientCallback<ray::rpc::GetSystemConfigReply>
&callback) override {}
void GetGcsServerAddress(
const ray::rpc::ClientCallback<ray::rpc::GetGcsServerAddressReply> &callback)
override {}
/// ResourceUsageInterface
void RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) override {

View file

@ -222,6 +222,14 @@ message UpdateResourceUsageRequest {
message UpdateResourceUsageReply {
}
message GetGcsServerAddressRequest {
}
message GetGcsServerAddressReply {
string ip = 1;
int32 port = 2;
}
// Service for inter-node-manager communication.
service NodeManagerService {
// Update the node's view of the cluster resource usage
@ -273,4 +281,6 @@ service NodeManagerService {
returns (ReleaseUnusedBundlesReply);
// Get the system config.
rpc GetSystemConfig(GetSystemConfigRequest) returns (GetSystemConfigReply);
// Get gcs server address.
rpc GetGcsServerAddress(GetGcsServerAddressRequest) returns (GetGcsServerAddressReply);
}

View file

@ -2025,6 +2025,15 @@ void NodeManager::HandleGetSystemConfig(const rpc::GetSystemConfigRequest &reque
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleGetGcsServerAddress(
const rpc::GetGcsServerAddressRequest &request, rpc::GetGcsServerAddressReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto address = gcs_client_->GetGcsServerAddress();
reply->set_ip(address.first);
reply->set_port(address.second);
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {

View file

@ -553,6 +553,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::GetSystemConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `GetGcsServerAddress` request.
void HandleGetGcsServerAddress(const rpc::GetGcsServerAddressRequest &request,
rpc::GetGcsServerAddressReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Trigger local GC on each worker of this raylet.
void DoLocalGC();

View file

@ -455,4 +455,10 @@ void raylet::RayletClient::GetSystemConfig(
grpc_client_->GetSystemConfig(request, callback);
}
void raylet::RayletClient::GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) {
rpc::GetGcsServerAddressRequest request;
grpc_client_->GetGcsServerAddress(request, callback);
}
} // namespace ray

View file

@ -166,6 +166,9 @@ class RayletClientInterface : public PinObjectsInterface,
/// \param callback Callback that will be called after raylet replied the system config.
virtual void GetSystemConfig(
const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback) = 0;
virtual void GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) = 0;
};
namespace raylet {
@ -406,6 +409,9 @@ class RayletClient : public RayletClientInterface {
void GetSystemConfig(
const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback) override;
void GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) override;
void GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback);
void UpdateResourceUsage(

View file

@ -112,6 +112,9 @@ class NodeManagerWorkerClient
/// Get the system config from Raylet.
VOID_RPC_CLIENT_METHOD(NodeManagerService, GetSystemConfig, grpc_client_, )
/// Get gcs server address.
VOID_RPC_CLIENT_METHOD(NodeManagerService, GetGcsServerAddress, grpc_client_, )
private:
/// Constructor.
///

View file

@ -40,7 +40,8 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \
RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles) \
RPC_SERVICE_HANDLER(NodeManagerService, GetSystemConfig)
RPC_SERVICE_HANDLER(NodeManagerService, GetSystemConfig) \
RPC_SERVICE_HANDLER(NodeManagerService, GetGcsServerAddress)
/// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
class NodeManagerServiceHandler {
@ -122,6 +123,10 @@ class NodeManagerServiceHandler {
virtual void HandleGetSystemConfig(const GetSystemConfigRequest &request,
GetSystemConfigReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetGcsServerAddress(const GetGcsServerAddressRequest &request,
GetGcsServerAddressReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `NodeManagerService`.