Optimize gcs server resubscribe (#8896)

This commit is contained in:
fangfengbin 2020-06-17 20:05:50 +08:00 committed by GitHub
parent c6ee3cdff4
commit c295284370
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 403 additions and 302 deletions

View file

@ -1171,8 +1171,6 @@ cc_test(
"//:redis-cli", "//:redis-cli",
"//:redis-server", "//:redis-server",
], ],
# TODO(swang): Enable again once pubsub client supports GCS server restart.
tags = ["manual"],
deps = [ deps = [
":gcs_server_lib", ":gcs_server_lib",
":gcs_test_util_lib", ":gcs_test_util_lib",

View file

@ -152,9 +152,12 @@ class ActorInfoAccessor {
/// Reestablish subscription. /// Reestablish subscription.
/// This should be called when GCS server restarts from a failure. /// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
/// ///
/// \return Status /// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual Status AsyncReSubscribe() = 0; virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
protected: protected:
ActorInfoAccessor() = default; ActorInfoAccessor() = default;
@ -202,9 +205,12 @@ class JobInfoAccessor {
/// Reestablish subscription. /// Reestablish subscription.
/// This should be called when GCS server restarts from a failure. /// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
/// ///
/// \return Status /// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual Status AsyncReSubscribe() = 0; virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
protected: protected:
JobInfoAccessor() = default; JobInfoAccessor() = default;
@ -310,9 +316,12 @@ class TaskInfoAccessor {
/// Reestablish subscription. /// Reestablish subscription.
/// This should be called when GCS server restarts from a failure. /// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
/// ///
/// \return Status /// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual Status AsyncReSubscribe() = 0; virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
protected: protected:
TaskInfoAccessor() = default; TaskInfoAccessor() = default;
@ -379,9 +388,12 @@ class ObjectInfoAccessor {
/// Reestablish subscription. /// Reestablish subscription.
/// This should be called when GCS server restarts from a failure. /// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
/// ///
/// \return Status /// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual Status AsyncReSubscribe() = 0; virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
protected: protected:
ObjectInfoAccessor() = default; ObjectInfoAccessor() = default;
@ -555,9 +567,12 @@ class NodeInfoAccessor {
/// Reestablish subscription. /// Reestablish subscription.
/// This should be called when GCS server restarts from a failure. /// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
/// ///
/// \return Status /// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual Status AsyncReSubscribe() = 0; virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
protected: protected:
NodeInfoAccessor() = default; NodeInfoAccessor() = default;
@ -657,9 +672,12 @@ class WorkerInfoAccessor {
/// Reestablish subscription. /// Reestablish subscription.
/// This should be called when GCS server restarts from a failure. /// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
/// ///
/// \return Status /// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual Status AsyncReSubscribe() = 0; virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
protected: protected:
WorkerInfoAccessor() = default; WorkerInfoAccessor() = default;

View file

@ -64,7 +64,6 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) { const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr); RAY_CHECK(subscribe != nullptr);
subscribe_operation_ = [this, subscribe](const StatusCallback &done) { subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing finished job.";
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
JobTableData job_data; JobTableData job_data;
job_data.ParseFromString(data); job_data.ParseFromString(data);
@ -72,20 +71,17 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeToFinishedJobs(
subscribe(JobID::FromBinary(id), job_data); subscribe(JobID::FromBinary(id), job_data);
} }
}; };
Status status = return client_impl_->GetGcsPubSub().SubscribeAll(JOB_CHANNEL, on_subscribe, done);
client_impl_->GetGcsPubSub().SubscribeAll(JOB_CHANNEL, on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing finished job.";
return status;
}; };
return subscribe_operation_(done); return subscribe_operation_(done);
} }
Status ServiceBasedJobInfoAccessor::AsyncReSubscribe() { void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for job info."; RAY_LOG(INFO) << "Reestablishing subscription for job info.";
if (subscribe_operation_ != nullptr) { // If the pub-sub server has restarted, we need to resubscribe to the pub-sub server.
return subscribe_operation_(nullptr); if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) {
RAY_CHECK_OK(subscribe_operation_(nullptr));
} }
return Status::OK();
} }
Status ServiceBasedJobInfoAccessor::AsyncGetAll( Status ServiceBasedJobInfoAccessor::AsyncGetAll(
@ -152,8 +148,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetByName(
request, request,
[name, callback](const Status &status, const rpc::GetNamedActorInfoReply &reply) { [name, callback](const Status &status, const rpc::GetNamedActorInfoReply &reply) {
if (reply.has_actor_table_data()) { if (reply.has_actor_table_data()) {
rpc::ActorTableData actor_table_data(reply.actor_table_data()); callback(status, reply.actor_table_data());
callback(status, actor_table_data);
} else { } else {
callback(status, boost::none); callback(status, boost::none);
} }
@ -234,16 +229,8 @@ Status ServiceBasedActorInfoAccessor::AsyncUpdate(
Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll( Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe, const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing register or update operations of actors.";
RAY_CHECK(subscribe != nullptr); RAY_CHECK(subscribe != nullptr);
subscribe_all_operation_ = [this, subscribe](const StatusCallback &done) { fetch_all_data_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
};
auto on_done = [this, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [subscribe, done]( auto callback = [subscribe, done](
const Status &status, const Status &status,
const std::vector<rpc::ActorTableData> &actor_info_list) { const std::vector<rpc::ActorTableData> &actor_info_list) {
@ -255,16 +242,19 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll(
} }
}; };
RAY_CHECK_OK(AsyncGetAll(callback)); RAY_CHECK_OK(AsyncGetAll(callback));
} else if (done) {
done(status);
}
}; };
auto status =
client_impl_->GetGcsPubSub().SubscribeAll(ACTOR_CHANNEL, on_subscribe, on_done); subscribe_all_operation_ = [this, subscribe](const StatusCallback &done) {
RAY_LOG(DEBUG) << "Finished subscribing register or update operations of actors."; auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
return status; ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
}; };
return subscribe_all_operation_(done); return client_impl_->GetGcsPubSub().SubscribeAll(ACTOR_CHANNEL, on_subscribe, done);
};
return subscribe_all_operation_(
[this, done](const Status &status) { fetch_all_data_operation_(done); });
} }
Status ServiceBasedActorInfoAccessor::AsyncSubscribe( Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
@ -273,43 +263,44 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id; RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe actor, actor id = " << actor_id; RAY_CHECK(subscribe != nullptr) << "Failed to subscribe actor, actor id = " << actor_id;
auto subscribe_operation = [this, actor_id, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { auto fetch_data_operation = [this, actor_id,
ActorTableData actor_data; subscribe](const StatusCallback &fetch_done) {
actor_data.ParseFromString(data); auto callback = [actor_id, subscribe, fetch_done](
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
};
auto on_done = [this, actor_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [actor_id, subscribe, done](
const Status &status, const Status &status,
const boost::optional<rpc::ActorTableData> &result) { const boost::optional<rpc::ActorTableData> &result) {
if (result) { if (result) {
subscribe(actor_id, *result); subscribe(actor_id, *result);
} }
if (done) { if (fetch_done) {
done(status); fetch_done(status);
} }
}; };
RAY_CHECK_OK(AsyncGet(actor_id, callback)); RAY_CHECK_OK(AsyncGet(actor_id, callback));
} else if (done) {
done(status);
}
}; };
auto status = client_impl_->GetGcsPubSub().Subscribe(ACTOR_CHANNEL, actor_id.Hex(),
on_subscribe, on_done); auto subscribe_operation = [this, actor_id,
RAY_LOG(DEBUG) << "Finished subscribing update operations of actor, actor id = " subscribe](const StatusCallback &subscribe_done) {
<< actor_id; auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
return status; ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
}; };
return client_impl_->GetGcsPubSub().Subscribe(ACTOR_CHANNEL, actor_id.Hex(),
on_subscribe, subscribe_done);
};
subscribe_operations_[actor_id] = subscribe_operation; subscribe_operations_[actor_id] = subscribe_operation;
return subscribe_operation(done); fetch_data_operations_[actor_id] = fetch_data_operation;
return subscribe_operation(
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
} }
Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) { Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id; RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id;
auto status = client_impl_->GetGcsPubSub().Unsubscribe(ACTOR_CHANNEL, actor_id.Hex()); auto status = client_impl_->GetGcsPubSub().Unsubscribe(ACTOR_CHANNEL, actor_id.Hex());
subscribe_operations_.erase(actor_id); subscribe_operations_.erase(actor_id);
fetch_data_operations_.erase(actor_id);
RAY_LOG(DEBUG) << "Finished cancelling subscription to an actor, actor id = " RAY_LOG(DEBUG) << "Finished cancelling subscription to an actor, actor id = "
<< actor_id; << actor_id;
return status; return status;
@ -386,15 +377,30 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID(
return Status::OK(); return Status::OK();
} }
Status ServiceBasedActorInfoAccessor::AsyncReSubscribe() { void ServiceBasedActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for actor info."; RAY_LOG(INFO) << "Reestablishing subscription for actor info.";
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
if (subscribe_all_operation_ != nullptr) { if (subscribe_all_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_all_operation_(nullptr)); RAY_CHECK_OK(subscribe_all_operation_(
[this](const Status &status) { fetch_all_data_operation_(nullptr); }));
} }
for (auto &item : subscribe_operations_) { for (auto &item : subscribe_operations_) {
RAY_CHECK_OK(item.second(nullptr)); auto &actor_id = item.first;
RAY_CHECK_OK(item.second([this, actor_id](const Status &status) {
fetch_data_operations_[actor_id](nullptr);
}));
}
} else {
if (fetch_all_data_operation_ != nullptr) {
fetch_all_data_operation_(nullptr);
}
for (auto &item : fetch_data_operations_) {
item.second(nullptr);
}
} }
return Status::OK();
} }
ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor( ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor(
@ -411,7 +417,7 @@ Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_
request.mutable_node_info()->CopyFrom(local_node_info); request.mutable_node_info()->CopyFrom(local_node_info);
auto operation = [this, request, local_node_info, auto operation = [this, request, local_node_info,
node_id](SequencerDoneCallback done_callback) { node_id](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().RegisterNode( client_impl_->GetGcsRpcClient().RegisterNode(
request, [this, node_id, local_node_info, done_callback]( request, [this, node_id, local_node_info, done_callback](
const Status &status, const rpc::RegisterNodeReply &reply) { const Status &status, const rpc::RegisterNodeReply &reply) {
@ -510,23 +516,12 @@ Status ServiceBasedNodeInfoAccessor::AsyncGetAll(
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange( Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
const SubscribeCallback<ClientID, GcsNodeInfo> &subscribe, const SubscribeCallback<ClientID, GcsNodeInfo> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing node change.";
RAY_CHECK(subscribe != nullptr); RAY_CHECK(subscribe != nullptr);
RAY_CHECK(node_change_callback_ == nullptr); RAY_CHECK(node_change_callback_ == nullptr);
node_change_callback_ = subscribe; node_change_callback_ = subscribe;
RAY_CHECK(subscribe != nullptr); fetch_node_data_operation_ = [this](const StatusCallback &done) {
subscribe_node_operation_ = [this, subscribe](const StatusCallback &done) { auto callback = [this, done](const Status &status,
auto on_subscribe = [this](const std::string &id, const std::string &data) {
GcsNodeInfo node_info;
node_info.ParseFromString(data);
HandleNotification(node_info);
};
auto on_done = [this, subscribe, done](const Status &status) {
// Get nodes from GCS Service.
auto callback = [this, subscribe, done](
const Status &status,
const std::vector<GcsNodeInfo> &node_info_list) { const std::vector<GcsNodeInfo> &node_info_list) {
for (auto &node_info : node_info_list) { for (auto &node_info : node_info_list) {
HandleNotification(node_info); HandleNotification(node_info);
@ -538,12 +533,18 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
RAY_CHECK_OK(AsyncGetAll(callback)); RAY_CHECK_OK(AsyncGetAll(callback));
}; };
auto status = subscribe_node_operation_ = [this](const StatusCallback &done) {
client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done); auto on_subscribe = [this](const std::string &id, const std::string &data) {
RAY_LOG(DEBUG) << "Finished subscribing node change."; GcsNodeInfo node_info;
return status; node_info.ParseFromString(data);
HandleNotification(node_info);
}; };
return subscribe_node_operation_(done); return client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, done);
};
return subscribe_node_operation_([this, subscribe, done](const Status &status) {
fetch_node_data_operation_(done);
});
} }
boost::optional<GcsNodeInfo> ServiceBasedNodeInfoAccessor::Get( boost::optional<GcsNodeInfo> ServiceBasedNodeInfoAccessor::Get(
@ -596,7 +597,7 @@ Status ServiceBasedNodeInfoAccessor::AsyncUpdateResources(
} }
auto operation = [this, request, node_id, auto operation = [this, request, node_id,
callback](SequencerDoneCallback done_callback) { callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().UpdateResources( client_impl_->GetGcsRpcClient().UpdateResources(
request, [node_id, callback, done_callback]( request, [node_id, callback, done_callback](
const Status &status, const rpc::UpdateResourcesReply &reply) { const Status &status, const rpc::UpdateResourcesReply &reply) {
@ -624,7 +625,7 @@ Status ServiceBasedNodeInfoAccessor::AsyncDeleteResources(
} }
auto operation = [this, request, node_id, auto operation = [this, request, node_id,
callback](SequencerDoneCallback done_callback) { callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().DeleteResources( client_impl_->GetGcsRpcClient().DeleteResources(
request, [node_id, callback, done_callback]( request, [node_id, callback, done_callback](
const Status &status, const rpc::DeleteResourcesReply &reply) { const Status &status, const rpc::DeleteResourcesReply &reply) {
@ -643,20 +644,15 @@ Status ServiceBasedNodeInfoAccessor::AsyncDeleteResources(
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources( Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources(
const ItemCallback<rpc::NodeResourceChange> &subscribe, const StatusCallback &done) { const ItemCallback<rpc::NodeResourceChange> &subscribe, const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing node resources change.";
RAY_CHECK(subscribe != nullptr); RAY_CHECK(subscribe != nullptr);
subscribe_resource_operation_ = [this, subscribe](const StatusCallback &done) { subscribe_resource_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::NodeResourceChange node_resource_change; rpc::NodeResourceChange node_resource_change;
node_resource_change.ParseFromString(data); node_resource_change.ParseFromString(data);
subscribe(node_resource_change); subscribe(node_resource_change);
}; };
return client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL, on_subscribe,
auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL, done);
on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing node resources change.";
return status;
}; };
return subscribe_resource_operation_(done); return subscribe_resource_operation_(done);
} }
@ -696,19 +692,15 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportBatchHeartbeat(
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe, const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing batch heartbeat.";
RAY_CHECK(subscribe != nullptr); RAY_CHECK(subscribe != nullptr);
subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) { subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::HeartbeatBatchTableData heartbeat_batch_table_data; rpc::HeartbeatBatchTableData heartbeat_batch_table_data;
heartbeat_batch_table_data.ParseFromString(data); heartbeat_batch_table_data.ParseFromString(data);
subscribe(heartbeat_batch_table_data); subscribe(heartbeat_batch_table_data);
}; };
auto status = client_impl_->GetGcsPubSub().Subscribe(HEARTBEAT_BATCH_CHANNEL, "", return client_impl_->GetGcsPubSub().Subscribe(HEARTBEAT_BATCH_CHANNEL, "",
on_subscribe, done); on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing batch heartbeat.";
return status;
}; };
return subscribe_batch_heartbeat_operation_(done); return subscribe_batch_heartbeat_operation_(done);
} }
@ -752,18 +744,27 @@ void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_in
} }
} }
Status ServiceBasedNodeInfoAccessor::AsyncReSubscribe() { void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for node info."; RAY_LOG(INFO) << "Reestablishing subscription for node info.";
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
if (subscribe_node_operation_ != nullptr) { if (subscribe_node_operation_ != nullptr) {
return subscribe_node_operation_(nullptr); RAY_CHECK_OK(subscribe_node_operation_(
[this](const Status &status) { fetch_node_data_operation_(nullptr); }));
} }
if (subscribe_resource_operation_ != nullptr) { if (subscribe_resource_operation_ != nullptr) {
return subscribe_resource_operation_(nullptr); RAY_CHECK_OK(subscribe_resource_operation_(nullptr));
} }
if (subscribe_batch_heartbeat_operation_ != nullptr) { if (subscribe_batch_heartbeat_operation_ != nullptr) {
return subscribe_batch_heartbeat_operation_(nullptr); RAY_CHECK_OK(subscribe_batch_heartbeat_operation_(nullptr));
}
} else {
if (fetch_node_data_operation_ != nullptr) {
fetch_node_data_operation_(nullptr);
}
} }
return Status::OK();
} }
ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
@ -829,46 +830,46 @@ Status ServiceBasedTaskInfoAccessor::AsyncDelete(const std::vector<TaskID> &task
Status ServiceBasedTaskInfoAccessor::AsyncSubscribe( Status ServiceBasedTaskInfoAccessor::AsyncSubscribe(
const TaskID &task_id, const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe, const TaskID &task_id, const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing task, task id = " << task_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id; RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id;
auto subscribe_operation = [this, task_id, subscribe](const StatusCallback &done) { auto fetch_data_operation = [this, task_id,
subscribe](const StatusCallback &fetch_done) {
auto callback = [task_id, subscribe, fetch_done](
const Status &status,
const boost::optional<rpc::TaskTableData> &result) {
if (result) {
subscribe(task_id, *result);
}
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGet(task_id, callback));
};
auto subscribe_operation = [this, task_id,
subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [task_id, subscribe](const std::string &id, auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) { const std::string &data) {
TaskTableData task_data; TaskTableData task_data;
task_data.ParseFromString(data); task_data.ParseFromString(data);
subscribe(task_id, task_data); subscribe(task_id, task_data);
}; };
auto on_done = [this, task_id, subscribe, done](const Status &status) { return client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(),
if (status.ok()) { on_subscribe, subscribe_done);
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskTableData> &result) {
if (result) {
subscribe(task_id, *result);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGet(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task, task id = " << task_id;
return status;
}; };
subscribe_task_operations_[task_id] = subscribe_operation; subscribe_task_operations_[task_id] = subscribe_operation;
return subscribe_operation(done); fetch_task_data_operations_[task_id] = fetch_data_operation;
return subscribe_operation(
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
} }
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) { Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id; RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id;
auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex()); auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex());
subscribe_task_operations_.erase(task_id); subscribe_task_operations_.erase(task_id);
fetch_task_data_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id; RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id;
return status; return status;
} }
@ -916,39 +917,38 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
const TaskID &task_id, const TaskID &task_id,
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe, const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing task lease, task id = " << task_id;
RAY_CHECK(subscribe != nullptr) RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe task lease, task id = " << task_id; << "Failed to subscribe task lease, task id = " << task_id;
auto subscribe_operation = [this, task_id, subscribe](const StatusCallback &done) { auto fetch_data_operation = [this, task_id,
subscribe](const StatusCallback &fetch_done) {
auto callback = [task_id, subscribe, fetch_done](
const Status &status,
const boost::optional<rpc::TaskLeaseData> &result) {
subscribe(task_id, result);
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGetTaskLease(task_id, callback));
};
auto subscribe_operation = [this, task_id,
subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [task_id, subscribe](const std::string &id, auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) { const std::string &data) {
TaskLeaseData task_lease_data; TaskLeaseData task_lease_data;
task_lease_data.ParseFromString(data); task_lease_data.ParseFromString(data);
subscribe(task_id, task_lease_data); subscribe(task_id, task_lease_data);
}; };
auto on_done = [this, task_id, subscribe, done](const Status &status) { return client_impl_->GetGcsPubSub().Subscribe(TASK_LEASE_CHANNEL, task_id.Hex(),
if (status.ok()) { on_subscribe, subscribe_done);
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskLeaseData> &result) {
subscribe(task_id, result);
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetTaskLease(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(
TASK_LEASE_CHANNEL, task_id.Hex(), on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task lease, task id = " << task_id;
return status;
}; };
subscribe_task_lease_operations_[task_id] = subscribe_operation; subscribe_task_lease_operations_[task_id] = subscribe_operation;
return subscribe_operation(done); fetch_task_lease_data_operations_[task_id] = fetch_data_operation;
return subscribe_operation(
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
} }
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) { Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
@ -956,6 +956,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &tas
auto status = auto status =
client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex()); client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex());
subscribe_task_lease_operations_.erase(task_id); subscribe_task_lease_operations_.erase(task_id);
fetch_task_lease_data_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id; RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id;
return status; return status;
} }
@ -982,15 +983,32 @@ Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction(
return Status::OK(); return Status::OK();
} }
Status ServiceBasedTaskInfoAccessor::AsyncReSubscribe() { void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for task info."; RAY_LOG(INFO) << "Reestablishing subscription for task info.";
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
for (auto &item : subscribe_task_operations_) { for (auto &item : subscribe_task_operations_) {
RAY_CHECK_OK(item.second(nullptr)); auto &task_id = item.first;
RAY_CHECK_OK(item.second([this, task_id](const Status &status) {
fetch_task_data_operations_[task_id](nullptr);
}));
} }
for (auto &item : subscribe_task_lease_operations_) { for (auto &item : subscribe_task_lease_operations_) {
RAY_CHECK_OK(item.second(nullptr)); auto &task_id = item.first;
RAY_CHECK_OK(item.second([this, task_id](const Status &status) {
fetch_task_lease_data_operations_[task_id](nullptr);
}));
}
} else {
for (auto &item : fetch_task_data_operations_) {
item.second(nullptr);
}
for (auto &item : fetch_task_lease_data_operations_) {
item.second(nullptr);
}
} }
return Status::OK();
} }
ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor( ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor(
@ -1093,10 +1111,28 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
const ObjectID &object_id, const ObjectID &object_id,
const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe, const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing object location, object id = " << object_id;
RAY_CHECK(subscribe != nullptr) RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe object location, object id = " << object_id; << "Failed to subscribe object location, object id = " << object_id;
auto subscribe_operation = [this, object_id, subscribe](const StatusCallback &done) {
auto fetch_data_operation = [this, object_id,
subscribe](const StatusCallback &fetch_done) {
auto callback = [object_id, subscribe, fetch_done](
const Status &status,
const std::vector<rpc::ObjectTableData> &result) {
if (status.ok()) {
gcs::ObjectChangeNotification notification(rpc::GcsChangeMode::APPEND_OR_ADD,
result);
subscribe(object_id, notification);
}
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGetLocations(object_id, callback));
};
auto subscribe_operation = [this, object_id,
subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [object_id, subscribe](const std::string &id, auto on_subscribe = [object_id, subscribe](const std::string &id,
const std::string &data) { const std::string &data) {
rpc::ObjectLocationChange object_location_change; rpc::ObjectLocationChange object_location_change;
@ -1109,40 +1145,32 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
gcs::ObjectChangeNotification notification(change_mode, object_data_vector); gcs::ObjectChangeNotification notification(change_mode, object_data_vector);
subscribe(object_id, notification); subscribe(object_id, notification);
}; };
auto on_done = [this, object_id, subscribe, done](const Status &status) { return client_impl_->GetGcsPubSub().Subscribe(OBJECT_CHANNEL, object_id.Hex(),
if (status.ok()) { on_subscribe, subscribe_done);
auto callback = [object_id, subscribe, done](
const Status &status,
const std::vector<rpc::ObjectTableData> &result) {
if (status.ok()) {
gcs::ObjectChangeNotification notification(rpc::GcsChangeMode::APPEND_OR_ADD,
result);
subscribe(object_id, notification);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetLocations(object_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(OBJECT_CHANNEL, object_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing object location, object id = " << object_id;
return status;
}; };
subscribe_object_operations_[object_id] = subscribe_operation; subscribe_object_operations_[object_id] = subscribe_operation;
return subscribe_operation(done); fetch_object_data_operations_[object_id] = fetch_data_operation;
return subscribe_operation(
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
} }
Status ServiceBasedObjectInfoAccessor::AsyncReSubscribe() { void ServiceBasedObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for object locations."; RAY_LOG(INFO) << "Reestablishing subscription for object locations.";
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
for (auto &item : subscribe_object_operations_) { for (auto &item : subscribe_object_operations_) {
RAY_CHECK_OK(item.second(nullptr)); RAY_CHECK_OK(item.second([this, item](const Status &status) {
fetch_object_data_operations_[item.first](nullptr);
}));
}
} else {
for (auto &item : fetch_object_data_operations_) {
item.second(nullptr);
}
} }
return Status::OK();
} }
Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations( Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations(
@ -1150,6 +1178,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations(
RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id; RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id;
auto status = client_impl_->GetGcsPubSub().Unsubscribe(OBJECT_CHANNEL, object_id.Hex()); auto status = client_impl_->GetGcsPubSub().Unsubscribe(OBJECT_CHANNEL, object_id.Hex());
subscribe_object_operations_.erase(object_id); subscribe_object_operations_.erase(object_id);
fetch_object_data_operations_.erase(object_id);
RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id; RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id;
return status; return status;
} }
@ -1225,7 +1254,6 @@ ServiceBasedWorkerInfoAccessor::ServiceBasedWorkerInfoAccessor(
Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures( Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures(
const SubscribeCallback<WorkerID, rpc::WorkerFailureData> &subscribe, const SubscribeCallback<WorkerID, rpc::WorkerFailureData> &subscribe,
const StatusCallback &done) { const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing worker failures.";
RAY_CHECK(subscribe != nullptr); RAY_CHECK(subscribe != nullptr);
subscribe_operation_ = [this, subscribe](const StatusCallback &done) { subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
@ -1233,20 +1261,18 @@ Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures(
worker_failure_data.ParseFromString(data); worker_failure_data.ParseFromString(data);
subscribe(WorkerID::FromBinary(id), worker_failure_data); subscribe(WorkerID::FromBinary(id), worker_failure_data);
}; };
auto status = client_impl_->GetGcsPubSub().SubscribeAll(WORKER_FAILURE_CHANNEL, return client_impl_->GetGcsPubSub().SubscribeAll(WORKER_FAILURE_CHANNEL, on_subscribe,
on_subscribe, done); done);
RAY_LOG(DEBUG) << "Finished subscribing worker failures.";
return status;
}; };
return subscribe_operation_(done); return subscribe_operation_(done);
} }
Status ServiceBasedWorkerInfoAccessor::AsyncReSubscribe() { void ServiceBasedWorkerInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for worker failures."; RAY_LOG(INFO) << "Reestablishing subscription for worker failures.";
if (subscribe_operation_ != nullptr) { // If the pub-sub server has restarted, we need to resubscribe to the pub-sub server.
return subscribe_operation_(nullptr); if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) {
RAY_CHECK_OK(subscribe_operation_(nullptr));
} }
return Status::OK();
} }
Status ServiceBasedWorkerInfoAccessor::AsyncReportWorkerFailure( Status ServiceBasedWorkerInfoAccessor::AsyncReportWorkerFailure(

View file

@ -24,6 +24,8 @@ namespace gcs {
using SubscribeOperation = std::function<Status(const StatusCallback &done)>; using SubscribeOperation = std::function<Status(const StatusCallback &done)>;
using FetchDataOperation = std::function<void(const StatusCallback &done)>;
class ServiceBasedGcsClient; class ServiceBasedGcsClient;
/// \class ServiceBasedJobInfoAccessor /// \class ServiceBasedJobInfoAccessor
@ -46,11 +48,11 @@ class ServiceBasedJobInfoAccessor : public JobInfoAccessor {
Status AsyncGetAll(const MultiItemCallback<rpc::JobTableData> &callback) override; Status AsyncGetAll(const MultiItemCallback<rpc::JobTableData> &callback) override;
Status AsyncReSubscribe() override; void AsyncResubscribe(bool is_pubsub_server_restarted) override;
private: private:
/// Save the subscribe operation in this function, so we can call it again when GCS /// Save the subscribe operation in this function, so we can call it again when PubSub
/// restarts from a failure. /// server restarts from a failure.
SubscribeOperation subscribe_operation_; SubscribeOperation subscribe_operation_;
ServiceBasedGcsClient *client_impl_; ServiceBasedGcsClient *client_impl_;
@ -107,15 +109,23 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
const ActorID &actor_id, const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorCheckpointIdData> &callback) override; const OptionalItemCallback<rpc::ActorCheckpointIdData> &callback) override;
Status AsyncReSubscribe() override; void AsyncResubscribe(bool is_pubsub_server_restarted) override;
private: private:
/// Save the subscribe operation in this function, so we can call it again when GCS /// Save the subscribe operation in this function, so we can call it again when PubSub
/// restarts from a failure. /// server restarts from a failure.
SubscribeOperation subscribe_all_operation_; SubscribeOperation subscribe_all_operation_;
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
FetchDataOperation fetch_all_data_operation_;
/// Save the subscribe operation of actors. /// Save the subscribe operation of actors.
std::unordered_map<ActorID, SubscribeOperation> subscribe_operations_; std::unordered_map<ActorID, SubscribeOperation> subscribe_operations_;
/// Save the fetch data operation of actors.
std::unordered_map<ActorID, FetchDataOperation> fetch_data_operations_;
ServiceBasedGcsClient *client_impl_; ServiceBasedGcsClient *client_impl_;
Sequencer<ActorID> sequencer_; Sequencer<ActorID> sequencer_;
@ -184,15 +194,19 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe, const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
const StatusCallback &done) override; const StatusCallback &done) override;
Status AsyncReSubscribe() override; void AsyncResubscribe(bool is_pubsub_server_restarted) override;
private: private:
/// Save the subscribe operation in this function, so we can call it again when GCS /// Save the subscribe operation in this function, so we can call it again when PubSub
/// restarts from a failure. /// server restarts from a failure.
SubscribeOperation subscribe_node_operation_; SubscribeOperation subscribe_node_operation_;
SubscribeOperation subscribe_resource_operation_; SubscribeOperation subscribe_resource_operation_;
SubscribeOperation subscribe_batch_heartbeat_operation_; SubscribeOperation subscribe_batch_heartbeat_operation_;
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
FetchDataOperation fetch_node_data_operation_;
void HandleNotification(const GcsNodeInfo &node_info); void HandleNotification(const GcsNodeInfo &node_info);
ServiceBasedGcsClient *client_impl_; ServiceBasedGcsClient *client_impl_;
@ -256,14 +270,19 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr, const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
const StatusCallback &callback) override; const StatusCallback &callback) override;
Status AsyncReSubscribe() override; void AsyncResubscribe(bool is_pubsub_server_restarted) override;
private: private:
/// Save the subscribe operation in this function, so we can call it again when GCS /// Save the subscribe operations, so we can call them again when PubSub
/// restarts from a failure. /// server restarts from a failure.
std::unordered_map<TaskID, SubscribeOperation> subscribe_task_operations_; std::unordered_map<TaskID, SubscribeOperation> subscribe_task_operations_;
std::unordered_map<TaskID, SubscribeOperation> subscribe_task_lease_operations_; std::unordered_map<TaskID, SubscribeOperation> subscribe_task_lease_operations_;
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
std::unordered_map<TaskID, FetchDataOperation> fetch_task_data_operations_;
std::unordered_map<TaskID, FetchDataOperation> fetch_task_lease_data_operations_;
ServiceBasedGcsClient *client_impl_; ServiceBasedGcsClient *client_impl_;
}; };
@ -295,13 +314,17 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override; Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override;
Status AsyncReSubscribe() override; void AsyncResubscribe(bool is_pubsub_server_restarted) override;
private: private:
/// Save the subscribe operation in this function, so we can call it again when GCS /// Save the subscribe operations, so we can call them again when PubSub
/// restarts from a failure. /// server restarts from a failure.
std::unordered_map<ObjectID, SubscribeOperation> subscribe_object_operations_; std::unordered_map<ObjectID, SubscribeOperation> subscribe_object_operations_;
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
std::unordered_map<ObjectID, FetchDataOperation> fetch_object_data_operations_;
ServiceBasedGcsClient *client_impl_; ServiceBasedGcsClient *client_impl_;
Sequencer<ObjectID> sequencer_; Sequencer<ObjectID> sequencer_;
@ -362,7 +385,7 @@ class ServiceBasedWorkerInfoAccessor : public WorkerInfoAccessor {
const std::unordered_map<std::string, std::string> &worker_info, const std::unordered_map<std::string, std::string> &worker_info,
const StatusCallback &callback) override; const StatusCallback &callback) override;
Status AsyncReSubscribe() override; void AsyncResubscribe(bool is_pubsub_server_restarted) override;
private: private:
/// Save the subscribe operation in this function, so we can call it again when GCS /// Save the subscribe operation in this function, so we can call it again when GCS

View file

@ -47,13 +47,13 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
}; };
std::pair<std::string, int> address = get_server_address(); std::pair<std::string, int> address = get_server_address();
auto re_subscribe = [this]() { auto re_subscribe = [this](bool is_pubsub_server_restarted) {
RAY_CHECK_OK(job_accessor_->AsyncReSubscribe()); job_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
RAY_CHECK_OK(actor_accessor_->AsyncReSubscribe()); actor_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
RAY_CHECK_OK(node_accessor_->AsyncReSubscribe()); node_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
RAY_CHECK_OK(task_accessor_->AsyncReSubscribe()); task_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
RAY_CHECK_OK(object_accessor_->AsyncReSubscribe()); object_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
RAY_CHECK_OK(worker_accessor_->AsyncReSubscribe()); worker_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
}; };
// Connect to gcs service. // Connect to gcs service.

View file

@ -483,6 +483,11 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
} }
void CheckActorData(const gcs::ActorTableData &actor,
rpc::ActorTableData_ActorState expected_state) {
ASSERT_TRUE(actor.state() == expected_state);
}
// GCS server. // GCS server.
gcs::GcsServerConfig config_; gcs::GcsServerConfig config_;
std::unique_ptr<gcs::GcsServer> gcs_server_; std::unique_ptr<gcs::GcsServer> gcs_server_;
@ -835,7 +840,7 @@ TEST_F(ServiceBasedGcsClientTest, TestErrorInfo) {
ASSERT_TRUE(ReportJobError(error_table_data)); ASSERT_TRUE(ReportJobError(error_table_data));
} }
TEST_F(ServiceBasedGcsClientTest, TestJobTableReSubscribe) { TEST_F(ServiceBasedGcsClientTest, TestJobTableResubscribe) {
// Test that subscription of the job table can still work when GCS server restarts. // Test that subscription of the job table can still work when GCS server restarts.
JobID job_id = JobID::FromInt(1); JobID job_id = JobID::FromInt(1);
auto job_table_data = Mocker::GenJobTableData(job_id); auto job_table_data = Mocker::GenJobTableData(job_id);
@ -854,53 +859,79 @@ TEST_F(ServiceBasedGcsClientTest, TestJobTableReSubscribe) {
WaitPendingDone(job_update_count, 1); WaitPendingDone(job_update_count, 1);
} }
TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) { TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) {
// Test that subscription of the actor table can still work when GCS server restarts. // Test that subscription of the actor table can still work when GCS server restarts.
JobID job_id = JobID::FromInt(1); JobID job_id = JobID::FromInt(1);
auto actor1_table_data = Mocker::GenActorTableData(job_id); auto actor_table_data = Mocker::GenActorTableData(job_id);
auto actor1_id = ActorID::FromBinary(actor1_table_data->actor_id()); auto actor_id = ActorID::FromBinary(actor_table_data->actor_id());
auto actor2_table_data = Mocker::GenActorTableData(job_id);
auto actor2_id = ActorID::FromBinary(actor2_table_data->actor_id());
// Subscribe to any register or update operations of actors. // Number of notifications for the following `SubscribeAllActors` operation.
std::atomic<int> actors_update_count(0); std::atomic<int> num_subscribe_all_notifications(0);
auto subscribe_all = [&actors_update_count](const ActorID &id, // All the notifications for the following `SubscribeAllActors` operation.
const rpc::ActorTableData &result) { std::vector<gcs::ActorTableData> subscribe_all_notifications;
++actors_update_count; auto subscribe_all = [&num_subscribe_all_notifications, &subscribe_all_notifications](
const ActorID &id, const rpc::ActorTableData &data) {
subscribe_all_notifications.emplace_back(data);
++num_subscribe_all_notifications;
}; };
// Subscribe to updates of all actors.
ASSERT_TRUE(SubscribeAllActors(subscribe_all)); ASSERT_TRUE(SubscribeAllActors(subscribe_all));
// Subscribe to any update operations of actor1. // Number of notifications for the following `SubscribeActor` operation.
std::atomic<int> actor1_update_count(0); std::atomic<int> num_subscribe_one_notifications(0);
auto actor1_subscribe = [&actor1_update_count](const ActorID &actor_id, // All the notifications for the following `SubscribeActor` operation.
const gcs::ActorTableData &data) { std::vector<gcs::ActorTableData> subscribe_one_notifications;
++actor1_update_count; auto actor_subscribe = [&num_subscribe_one_notifications, &subscribe_one_notifications](
const ActorID &actor_id, const gcs::ActorTableData &data) {
subscribe_one_notifications.emplace_back(data);
++num_subscribe_one_notifications;
}; };
ASSERT_TRUE(SubscribeActor(actor1_id, actor1_subscribe)); // Subscribe to updates for this actor.
ASSERT_TRUE(SubscribeActor(actor_id, actor_subscribe));
// Subscribe to any update operations of actor2. ASSERT_TRUE(RegisterActor(actor_table_data));
std::atomic<int> actor2_update_count(0);
auto actor2_subscribe = [&actor2_update_count](const ActorID &actor_id,
const gcs::ActorTableData &data) {
++actor2_update_count;
};
ASSERT_TRUE(SubscribeActor(actor2_id, actor2_subscribe));
ASSERT_TRUE(RegisterActor(actor1_table_data)); // We should receive a new ALIVE notification from the subscribe channel.
ASSERT_TRUE(RegisterActor(actor2_table_data)); WaitPendingDone(num_subscribe_all_notifications, 1);
WaitPendingDone(actor2_update_count, 1); WaitPendingDone(num_subscribe_one_notifications, 1);
UnsubscribeActor(actor2_id); CheckActorData(subscribe_all_notifications[0],
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
CheckActorData(subscribe_one_notifications[0],
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
// Restart GCS server.
RestartGcsServer(); RestartGcsServer();
ASSERT_TRUE(UpdateActor(actor1_id, actor1_table_data)); // We need to send a RPC to detect GCS server restart. Then GCS client will
ASSERT_TRUE(UpdateActor(actor2_id, actor2_table_data)); // reconnect to GCS server and resubscribe.
WaitPendingDone(actor1_update_count, 3); ASSERT_TRUE(GetActor(actor_id).state() ==
WaitPendingDone(actor2_update_count, 1); rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
UnsubscribeActor(actor1_id);
// When GCS client detects that GCS server has restarted, but the pub-sub server
// didn't restart, it will fetch data again from the GCS server. So we'll receive
// another notification of ALIVE state.
WaitPendingDone(num_subscribe_all_notifications, 2);
WaitPendingDone(num_subscribe_one_notifications, 2);
CheckActorData(subscribe_all_notifications[1],
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
CheckActorData(subscribe_one_notifications[1],
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
// Update the actor state to DEAD.
actor_table_data->set_state(
rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD);
ASSERT_TRUE(UpdateActor(actor_id, actor_table_data));
// We should receive a new DEAD notification from the subscribe channel.
WaitPendingDone(num_subscribe_all_notifications, 3);
WaitPendingDone(num_subscribe_one_notifications, 3);
CheckActorData(subscribe_all_notifications[2],
rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD);
CheckActorData(subscribe_one_notifications[2],
rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD);
} }
TEST_F(ServiceBasedGcsClientTest, TestObjectTableReSubscribe) { TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) {
ObjectID object1_id = ObjectID::FromRandom(); ObjectID object1_id = ObjectID::FromRandom();
ObjectID object2_id = ObjectID::FromRandom(); ObjectID object2_id = ObjectID::FromRandom();
ClientID node_id = ClientID::FromRandom(); ClientID node_id = ClientID::FromRandom();
@ -942,7 +973,7 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectTableReSubscribe) {
WaitPendingDone(object2_change_count, 2); WaitPendingDone(object2_change_count, 2);
} }
TEST_F(ServiceBasedGcsClientTest, TestNodeTableReSubscribe) { TEST_F(ServiceBasedGcsClientTest, TestNodeTableResubscribe) {
// Test that subscription of the node table can still work when GCS server restarts. // Test that subscription of the node table can still work when GCS server restarts.
// Subscribe to node addition and removal events from GCS and cache those information. // Subscribe to node addition and removal events from GCS and cache those information.
std::atomic<int> node_change_count(0); std::atomic<int> node_change_count(0);
@ -968,8 +999,6 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableReSubscribe) {
}; };
ASSERT_TRUE(SubscribeBatchHeartbeat(batch_heartbeat_subscribe)); ASSERT_TRUE(SubscribeBatchHeartbeat(batch_heartbeat_subscribe));
RestartGcsServer();
auto node_info = Mocker::GenNodeInfo(1); auto node_info = Mocker::GenNodeInfo(1);
ASSERT_TRUE(RegisterNode(*node_info)); ASSERT_TRUE(RegisterNode(*node_info));
ClientID node_id = ClientID::FromBinary(node_info->node_id()); ClientID node_id = ClientID::FromBinary(node_info->node_id());
@ -978,13 +1007,23 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableReSubscribe) {
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>(); auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
heartbeat->set_client_id(node_info->node_id()); heartbeat->set_client_id(node_info->node_id());
ASSERT_TRUE(ReportHeartbeat(heartbeat)); ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitPendingDone(node_change_count, 1);
WaitPendingDone(resource_change_count, 1);
WaitPendingDone(batch_heartbeat_count, 1); WaitPendingDone(batch_heartbeat_count, 1);
RestartGcsServer();
node_info = Mocker::GenNodeInfo(1);
ASSERT_TRUE(RegisterNode(*node_info));
node_id = ClientID::FromBinary(node_info->node_id());
ASSERT_TRUE(UpdateResources(node_id, key));
heartbeat->set_client_id(node_info->node_id());
ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitPendingDone(node_change_count, 2);
WaitPendingDone(resource_change_count, 2);
WaitPendingDone(batch_heartbeat_count, 2);
} }
TEST_F(ServiceBasedGcsClientTest, TestTaskTableReSubscribe) { TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
JobID job_id = JobID::FromInt(6); JobID job_id = JobID::FromInt(6);
TaskID task_id = TaskID::ForDriverTask(job_id); TaskID task_id = TaskID::ForDriverTask(job_id);
auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary()); auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary());
@ -1023,7 +1062,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableReSubscribe) {
WaitPendingDone(task_count, 1); WaitPendingDone(task_count, 1);
} }
TEST_F(ServiceBasedGcsClientTest, TestWorkerTableReSubscribe) { TEST_F(ServiceBasedGcsClientTest, TestWorkerTableResubscribe) {
// Subscribe to all unexpected failure of workers from GCS. // Subscribe to all unexpected failure of workers from GCS.
std::atomic<int> worker_failure_count(0); std::atomic<int> worker_failure_count(0);
auto on_subscribe = [&worker_failure_count](const WorkerID &worker_id, auto on_subscribe = [&worker_failure_count](const WorkerID &worker_id,

View file

@ -289,7 +289,7 @@ struct GcsServerMocker {
return Status::NotImplemented(""); return Status::NotImplemented("");
} }
Status AsyncReSubscribe() override { return Status::NotImplemented(""); } void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
}; };
class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor { class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor {

View file

@ -82,7 +82,7 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor {
const ActorID &actor_id, const ActorID &actor_id,
const OptionalItemCallback<ActorCheckpointIdData> &callback) override; const OptionalItemCallback<ActorCheckpointIdData> &callback) override;
Status AsyncReSubscribe() override { return Status::NotImplemented(""); } void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
protected: protected:
virtual std::vector<ActorID> GetAllActorID() const; virtual std::vector<ActorID> GetAllActorID() const;
@ -182,9 +182,7 @@ class RedisJobInfoAccessor : public JobInfoAccessor {
return Status::NotImplemented("AsyncGetAll not implemented"); return Status::NotImplemented("AsyncGetAll not implemented");
} }
Status AsyncReSubscribe() override { void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
return Status::NotImplemented("AsyncReSubscribe not implemented");
}
private: private:
/// Append job information to GCS asynchronously. /// Append job information to GCS asynchronously.
@ -242,9 +240,7 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor {
const std::shared_ptr<TaskReconstructionData> &data_ptr, const std::shared_ptr<TaskReconstructionData> &data_ptr,
const StatusCallback &callback) override; const StatusCallback &callback) override;
Status AsyncReSubscribe() override { void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
return Status::NotImplemented("AsyncReSubscribe not implemented");
}
private: private:
RedisGcsClient *client_impl_{nullptr}; RedisGcsClient *client_impl_{nullptr};
@ -295,7 +291,7 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor {
Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override; Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override;
Status AsyncReSubscribe() override { return Status::NotImplemented(""); } void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
private: private:
RedisGcsClient *client_impl_{nullptr}; RedisGcsClient *client_impl_{nullptr};
@ -376,9 +372,7 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
const ItemCallback<HeartbeatBatchTableData> &subscribe, const ItemCallback<HeartbeatBatchTableData> &subscribe,
const StatusCallback &done) override; const StatusCallback &done) override;
Status AsyncReSubscribe() override { void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
return Status::NotImplemented("AsyncReSubscribe not implemented");
}
private: private:
RedisGcsClient *client_impl_{nullptr}; RedisGcsClient *client_impl_{nullptr};
@ -453,7 +447,7 @@ class RedisWorkerInfoAccessor : public WorkerInfoAccessor {
const std::unordered_map<std::string, std::string> &worker_info, const std::unordered_map<std::string, std::string> &worker_info,
const StatusCallback &callback) override; const StatusCallback &callback) override;
Status AsyncReSubscribe() override { return Status::NotImplemented(""); } void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
private: private:
RedisGcsClient *client_impl_{nullptr}; RedisGcsClient *client_impl_{nullptr};

View file

@ -87,7 +87,7 @@ class GcsRpcClient {
GcsRpcClient(const std::string &address, const int port, GcsRpcClient(const std::string &address, const int port,
ClientCallManager &client_call_manager, ClientCallManager &client_call_manager,
std::function<std::pair<std::string, int>()> get_server_address = nullptr, std::function<std::pair<std::string, int>()> get_server_address = nullptr,
std::function<void()> reconnected_callback = nullptr) std::function<void(bool)> reconnected_callback = nullptr)
: client_call_manager_(client_call_manager), : client_call_manager_(client_call_manager),
get_server_address_(std::move(get_server_address)), get_server_address_(std::move(get_server_address)),
reconnected_callback_(std::move(reconnected_callback)) { reconnected_callback_(std::move(reconnected_callback)) {
@ -255,7 +255,10 @@ class GcsRpcClient {
if (index < RayConfig::instance().ping_gcs_rpc_server_max_retries()) { if (index < RayConfig::instance().ping_gcs_rpc_server_max_retries()) {
Init(address.first, address.second, client_call_manager_); Init(address.first, address.second, client_call_manager_);
if (reconnected_callback_) { if (reconnected_callback_) {
reconnected_callback_(); // TODO(ffbin): Once we separate the pubsub server and storage addresses, we can
// judge whether pubsub server is restarted. Currently, we only support the
// scenario where pubsub server does not restart.
reconnected_callback_(false);
} }
} else { } else {
RAY_LOG(FATAL) << "Couldn't reconnect to GCS server. The last attempted GCS " RAY_LOG(FATAL) << "Couldn't reconnect to GCS server. The last attempted GCS "
@ -275,7 +278,7 @@ class GcsRpcClient {
/// Note, we use ping to detect whether the reconnection is successful. If the ping /// Note, we use ping to detect whether the reconnection is successful. If the ping
/// succeeds but the RPC connection fails, this function might be called called again. /// succeeds but the RPC connection fails, this function might be called called again.
/// So it needs to be idempotent. /// So it needs to be idempotent.
std::function<void()> reconnected_callback_; std::function<void(bool)> reconnected_callback_;
/// The gRPC-generated stub. /// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_; std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;