mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[GCS]add callback for RegisterSelf api, make it done first (#12252)
This commit is contained in:
parent
e025b9e788
commit
5d47d02f81
10 changed files with 58 additions and 45 deletions
|
@ -435,11 +435,13 @@ class NodeInfoAccessor {
|
|||
public:
|
||||
virtual ~NodeInfoAccessor() = default;
|
||||
|
||||
/// Register local node to GCS synchronously.
|
||||
/// Register local node to GCS asynchronously.
|
||||
///
|
||||
/// \param node_info The information of node to register to GCS.
|
||||
/// \param callback Callback that will be called when registration is complete.
|
||||
/// \return Status
|
||||
virtual Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info) = 0;
|
||||
virtual Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info,
|
||||
const StatusCallback &callback) = 0;
|
||||
|
||||
/// Cancel registration of local node to GCS synchronously.
|
||||
///
|
||||
|
|
|
@ -418,7 +418,8 @@ ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor(
|
|||
ServiceBasedGcsClient *client_impl)
|
||||
: client_impl_(client_impl) {}
|
||||
|
||||
Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) {
|
||||
Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info,
|
||||
const StatusCallback &callback) {
|
||||
auto node_id = NodeID::FromBinary(local_node_info.node_id());
|
||||
RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id
|
||||
<< ", address is = " << local_node_info.node_manager_address();
|
||||
|
@ -427,22 +428,20 @@ Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_
|
|||
rpc::RegisterNodeRequest request;
|
||||
request.mutable_node_info()->CopyFrom(local_node_info);
|
||||
|
||||
auto operation = [this, request, local_node_info,
|
||||
node_id](const SequencerDoneCallback &done_callback) {
|
||||
client_impl_->GetGcsRpcClient().RegisterNode(
|
||||
request, [this, node_id, local_node_info, done_callback](
|
||||
const Status &status, const rpc::RegisterNodeReply &reply) {
|
||||
if (status.ok()) {
|
||||
local_node_info_.CopyFrom(local_node_info);
|
||||
local_node_id_ = NodeID::FromBinary(local_node_info.node_id());
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Finished registering node info, status = " << status
|
||||
<< ", node id = " << node_id;
|
||||
done_callback();
|
||||
});
|
||||
};
|
||||
client_impl_->GetGcsRpcClient().RegisterNode(
|
||||
request, [this, node_id, local_node_info, callback](
|
||||
const Status &status, const rpc::RegisterNodeReply &reply) {
|
||||
if (status.ok()) {
|
||||
local_node_info_.CopyFrom(local_node_info);
|
||||
local_node_id_ = NodeID::FromBinary(local_node_info.node_id());
|
||||
}
|
||||
if (callback) {
|
||||
callback(status);
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Finished registering node info, status = " << status
|
||||
<< ", node id = " << node_id;
|
||||
});
|
||||
|
||||
sequencer_.Post(node_id, operation);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
|
|||
|
||||
virtual ~ServiceBasedNodeInfoAccessor() = default;
|
||||
|
||||
Status RegisterSelf(const GcsNodeInfo &local_node_info) override;
|
||||
Status RegisterSelf(const GcsNodeInfo &local_node_info,
|
||||
const StatusCallback &callback) override;
|
||||
|
||||
Status UnregisterSelf() override;
|
||||
|
||||
|
|
|
@ -288,7 +288,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
|
|||
}
|
||||
|
||||
bool RegisterSelf(const rpc::GcsNodeInfo &local_node_info) {
|
||||
Status status = gcs_client_->Nodes().RegisterSelf(local_node_info);
|
||||
Status status = gcs_client_->Nodes().RegisterSelf(local_node_info, nullptr);
|
||||
return status.ok();
|
||||
}
|
||||
|
||||
|
|
|
@ -305,7 +305,8 @@ struct GcsServerMocker {
|
|||
|
||||
class MockedNodeInfoAccessor : public gcs::NodeInfoAccessor {
|
||||
public:
|
||||
Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info) override {
|
||||
Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info,
|
||||
const gcs::StatusCallback &callback) override {
|
||||
return Status::NotImplemented("");
|
||||
}
|
||||
|
||||
|
|
|
@ -497,9 +497,14 @@ RedisNodeInfoAccessor::RedisNodeInfoAccessor(RedisGcsClient *client_impl)
|
|||
resource_sub_executor_(client_impl_->resource_table()),
|
||||
heartbeat_batch_sub_executor_(client_impl->heartbeat_batch_table()) {}
|
||||
|
||||
Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) {
|
||||
Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info,
|
||||
const StatusCallback &callback) {
|
||||
ClientTable &client_table = client_impl_->client_table();
|
||||
return client_table.Connect(local_node_info);
|
||||
Status status = client_table.Connect(local_node_info);
|
||||
if (callback != nullptr) {
|
||||
callback(Status::OK());
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
Status RedisNodeInfoAccessor::UnregisterSelf() {
|
||||
|
|
|
@ -321,7 +321,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
|
|||
|
||||
virtual ~RedisNodeInfoAccessor() {}
|
||||
|
||||
Status RegisterSelf(const GcsNodeInfo &local_node_info) override;
|
||||
Status RegisterSelf(const GcsNodeInfo &local_node_info,
|
||||
const StatusCallback &callback) override;
|
||||
|
||||
Status UnregisterSelf() override;
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ class MockServer {
|
|||
node_info.set_node_manager_port(object_manager_port);
|
||||
node_info.set_object_manager_port(object_manager_port);
|
||||
|
||||
ray::Status status = gcs_client_->Nodes().RegisterSelf(node_info);
|
||||
ray::Status status = gcs_client_->Nodes().RegisterSelf(node_info, nullptr);
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ class MockServer {
|
|||
node_info.set_node_manager_port(object_manager_port);
|
||||
node_info.set_object_manager_port(object_manager_port);
|
||||
|
||||
ray::Status status = gcs_client_->Nodes().RegisterSelf(node_info);
|
||||
ray::Status status = gcs_client_->Nodes().RegisterSelf(node_info, nullptr);
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
@ -113,28 +113,32 @@ void Raylet::Stop() {
|
|||
}
|
||||
|
||||
ray::Status Raylet::RegisterGcs() {
|
||||
RAY_RETURN_NOT_OK(gcs_client_->Nodes().RegisterSelf(self_node_info_));
|
||||
auto register_callback = [this](const Status &status) {
|
||||
RAY_CHECK_OK(status);
|
||||
RAY_LOG(DEBUG) << "Node manager " << self_node_id_ << " started on "
|
||||
<< self_node_info_.node_manager_address() << ":"
|
||||
<< self_node_info_.node_manager_port() << " object manager at "
|
||||
<< self_node_info_.node_manager_address() << ":"
|
||||
<< self_node_info_.object_manager_port() << ", hostname "
|
||||
<< self_node_info_.node_manager_hostname();
|
||||
|
||||
RAY_LOG(DEBUG) << "Node manager " << self_node_id_ << " started on "
|
||||
<< self_node_info_.node_manager_address() << ":"
|
||||
<< self_node_info_.node_manager_port() << " object manager at "
|
||||
<< self_node_info_.node_manager_address() << ":"
|
||||
<< self_node_info_.object_manager_port() << ", hostname "
|
||||
<< self_node_info_.node_manager_hostname();
|
||||
// Add resource information.
|
||||
const NodeManagerConfig &node_manager_config = node_manager_.GetInitialConfig();
|
||||
std::unordered_map<std::string, std::shared_ptr<gcs::ResourceTableData>> resources;
|
||||
for (const auto &resource_pair :
|
||||
node_manager_config.resource_config.GetResourceMap()) {
|
||||
auto resource = std::make_shared<gcs::ResourceTableData>();
|
||||
resource->set_resource_capacity(resource_pair.second);
|
||||
resources.emplace(resource_pair.first, resource);
|
||||
}
|
||||
RAY_CHECK_OK(
|
||||
gcs_client_->Nodes().AsyncUpdateResources(self_node_id_, resources, nullptr));
|
||||
|
||||
RAY_CHECK_OK(node_manager_.RegisterGcs());
|
||||
};
|
||||
|
||||
// Add resource information.
|
||||
const NodeManagerConfig &node_manager_config = node_manager_.GetInitialConfig();
|
||||
std::unordered_map<std::string, std::shared_ptr<gcs::ResourceTableData>> resources;
|
||||
for (const auto &resource_pair : node_manager_config.resource_config.GetResourceMap()) {
|
||||
auto resource = std::make_shared<gcs::ResourceTableData>();
|
||||
resource->set_resource_capacity(resource_pair.second);
|
||||
resources.emplace(resource_pair.first, resource);
|
||||
}
|
||||
RAY_RETURN_NOT_OK(
|
||||
gcs_client_->Nodes().AsyncUpdateResources(self_node_id_, resources, nullptr));
|
||||
|
||||
RAY_RETURN_NOT_OK(node_manager_.RegisterGcs());
|
||||
|
||||
gcs_client_->Nodes().RegisterSelf(self_node_info_, register_callback));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue