GCS adapts to actor table pub sub (#8347)

This commit is contained in:
fangfengbin 2020-05-11 13:53:53 +08:00 committed by GitHub
parent 501b936114
commit 8d0c1b5e06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 354 additions and 161 deletions

View file

@ -47,6 +47,12 @@ class ActorInfoAccessor {
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) = 0;
/// Get the specifications of all actors from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetAll(const MultiItemCallback<rpc::ActorTableData> &callback) = 0;
/// Get actor specification for a named actor from GCS asynchronously.
///
/// \param name The name of the detached actor to look up in the GCS.
@ -110,10 +116,8 @@ class ActorInfoAccessor {
/// Cancel subscription to an actor.
///
/// \param actor_id The ID of the actor to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) = 0;
virtual Status AsyncUnsubscribe(const ActorID &actor_id) = 0;
/// Add actor checkpoint data to GCS asynchronously.
///

View file

@ -78,9 +78,7 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeToFinishedJobs(
ServiceBasedActorInfoAccessor::ServiceBasedActorInfoAccessor(
ServiceBasedGcsClient *client_impl)
: subscribe_id_(ClientID::FromRandom()),
client_impl_(client_impl),
actor_sub_executor_(client_impl->GetRedisGcsClient().actor_table()) {}
: client_impl_(client_impl) {}
Status ServiceBasedActorInfoAccessor::GetAll(
std::vector<ActorTableData> *actor_table_data_list) {
@ -96,8 +94,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGet(
request,
[actor_id, callback](const Status &status, const rpc::GetActorInfoReply &reply) {
if (reply.has_actor_table_data()) {
rpc::ActorTableData actor_table_data(reply.actor_table_data());
callback(status, actor_table_data);
callback(status, reply.actor_table_data());
} else {
callback(status, boost::none);
}
@ -107,6 +104,19 @@ Status ServiceBasedActorInfoAccessor::AsyncGet(
return Status::OK();
}
/// Fetch the table data of every actor from the GCS service asynchronously.
///
/// \param callback Invoked with the RPC status and the actor entries converted from
/// the reply (via VectorFromProtobuf) once the GetAllActorInfo RPC completes.
/// \return Status::OK() unconditionally; the outcome of the RPC itself is reported
/// through `callback`.
Status ServiceBasedActorInfoAccessor::AsyncGetAll(
    const MultiItemCallback<rpc::ActorTableData> &callback) {
  RAY_LOG(DEBUG) << "Getting all actor info.";

  // Reply handler: hand the entries to the caller first, then log completion.
  auto on_reply = [callback](const Status &status,
                             const rpc::GetAllActorInfoReply &reply) {
    auto actor_entries = VectorFromProtobuf(reply.actor_table_data());
    callback(status, actor_entries);
    RAY_LOG(DEBUG) << "Finished getting all actor info, status = " << status;
  };

  rpc::GetAllActorInfoRequest get_all_request;
  client_impl_->GetGcsRpcClient().GetAllActorInfo(get_all_request, on_reply);
  return Status::OK();
}
Status ServiceBasedActorInfoAccessor::AsyncGetByName(
const std::string &name, const OptionalItemCallback<rpc::ActorTableData> &callback) {
RAY_LOG(DEBUG) << "Getting actor info, name = " << name;
@ -152,7 +162,7 @@ Status ServiceBasedActorInfoAccessor::AsyncRegister(
request.mutable_actor_table_data()->CopyFrom(*data_ptr);
auto operation = [this, request, actor_id,
callback](SequencerDoneCallback done_callback) {
callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().RegisterActorInfo(
request, [actor_id, callback, done_callback](
const Status &status, const rpc::RegisterActorInfoReply &reply) {
@ -178,7 +188,7 @@ Status ServiceBasedActorInfoAccessor::AsyncUpdate(
request.mutable_actor_table_data()->CopyFrom(*data_ptr);
auto operation = [this, request, actor_id,
callback](SequencerDoneCallback done_callback) {
callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().UpdateActorInfo(
request, [actor_id, callback, done_callback](
const Status &status, const rpc::UpdateActorInfoReply &reply) {
@ -200,7 +210,30 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll(
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing register or update operations of actors.";
RAY_CHECK(subscribe != nullptr);
auto status = actor_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
};
auto on_done = [this, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [subscribe, done](
const Status &status,
const std::vector<rpc::ActorTableData> &actor_info_list) {
for (auto &actor_info : actor_info_list) {
subscribe(ActorID::FromBinary(actor_info.actor_id()), actor_info);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(callback));
} else if (done) {
done(status);
}
};
auto status =
client_impl_->GetGcsPubSub().SubscribeAll(ACTOR_CHANNEL, on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing register or update operations of actors.";
return status;
}
@ -211,17 +244,38 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe actor, actor id = " << actor_id;
auto status =
actor_sub_executor_.AsyncSubscribe(subscribe_id_, actor_id, subscribe, done);
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
};
auto on_done = [this, actor_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [actor_id, subscribe, done](
const Status &status,
const boost::optional<rpc::ActorTableData> &result) {
if (result) {
subscribe(actor_id, *result);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGet(actor_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(ACTOR_CHANNEL, actor_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing update operations of actor, actor id = "
<< actor_id;
return status;
}
Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id;
auto status = actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, done);
auto status = client_impl_->GetGcsPubSub().Unsubscribe(ACTOR_CHANNEL, actor_id.Hex());
RAY_LOG(DEBUG) << "Finished cancelling subscription to an actor, actor id = "
<< actor_id;
return status;
@ -239,7 +293,7 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint(
request.mutable_checkpoint_data()->CopyFrom(*data_ptr);
auto operation = [this, request, actor_id, checkpoint_id,
callback](SequencerDoneCallback done_callback) {
callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().AddActorCheckpoint(
request, [actor_id, checkpoint_id, callback, done_callback](
const Status &status, const rpc::AddActorCheckpointReply &reply) {
@ -268,8 +322,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpoint(
request, [checkpoint_id, callback](const Status &status,
const rpc::GetActorCheckpointReply &reply) {
if (reply.has_checkpoint_data()) {
rpc::ActorCheckpointData checkpoint_data(reply.checkpoint_data());
callback(status, checkpoint_data);
callback(status, reply.checkpoint_data());
} else {
callback(status, boost::none);
}
@ -289,8 +342,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID(
request, [actor_id, callback](const Status &status,
const rpc::GetActorCheckpointIDReply &reply) {
if (reply.has_checkpoint_id_data()) {
rpc::ActorCheckpointIdData checkpoint_id_data(reply.checkpoint_id_data());
callback(status, checkpoint_id_data);
callback(status, reply.checkpoint_id_data());
} else {
callback(status, boost::none);
}

View file

@ -61,6 +61,8 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) override;
Status AsyncGetAll(const MultiItemCallback<rpc::ActorTableData> &callback) override;
Status AsyncGetByName(
const std::string &name,
const OptionalItemCallback<rpc::ActorTableData> &callback) override;
@ -83,7 +85,7 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id) override;
Status AsyncAddCheckpoint(const std::shared_ptr<rpc::ActorCheckpointData> &data_ptr,
const StatusCallback &callback) override;
@ -96,16 +98,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorCheckpointIdData> &callback) override;
protected:
ClientID subscribe_id_;
private:
ServiceBasedGcsClient *client_impl_;
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;
Sequencer<ActorID> sequencer_;
};

View file

@ -95,11 +95,8 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool UnsubscribeActor(const ActorID &actor_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(
actor_id, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
void UnsubscribeActor(const ActorID &actor_id) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id));
}
bool SubscribeAllActors(
@ -497,7 +494,7 @@ TEST_F(ServiceBasedGcsClientTest, TestActorInfo) {
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
// Cancel subscription to an actor.
ASSERT_TRUE(UnsubscribeActor(actor_id));
UnsubscribeActor(actor_id);
// Update dynamic states of actor in GCS.
actor_table_data->set_state(

View file

@ -46,16 +46,14 @@ void DefaultActorInfoHandler::HandleGetActorInfo(
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
auto on_done = [actor_id, reply, send_reply_callback](
Status status, const boost::optional<ActorTableData> &result) {
if (status.ok()) {
if (result) {
reply->mutable_actor_table_data()->CopyFrom(*result);
}
} else {
RAY_LOG(ERROR) << "Failed to get actor info: " << status.ToString()
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
const Status &status,
const boost::optional<ActorTableData> &result) {
if (result) {
reply->mutable_actor_table_data()->CopyFrom(*result);
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id << ", status = " << status;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
};
// Look up the actor_id in the GCS.
@ -63,9 +61,26 @@ void DefaultActorInfoHandler::HandleGetActorInfo(
if (!status.ok()) {
on_done(status, boost::none);
}
}
RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
/// Handle a GetAllActorInfo RPC: look up every actor through the GCS client's actor
/// accessor and copy each returned entry into the reply.
///
/// The RPC is always answered with Status::OK(), even when the lookup reports a
/// failure (in the synchronous-failure path the reply carries no entries). The
/// lookup status is therefore included in the completion log so that failures
/// remain visible.
///
/// \param request The request; none of its fields are read here.
/// \param reply Reply message that receives one `actor_table_data` entry per actor.
/// \param send_reply_callback Callback used to send the reply.
void DefaultActorInfoHandler::HandleGetAllActorInfo(
    const rpc::GetAllActorInfoRequest &request, rpc::GetAllActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  RAY_LOG(DEBUG) << "Getting all actor info.";

  auto on_done = [reply, send_reply_callback](const Status &status,
                                              const std::vector<ActorTableData> &result) {
    for (const auto &actor_table_data : result) {
      reply->add_actor_table_data()->CopyFrom(actor_table_data);
    }
    // Log the lookup status: it is not propagated to the RPC reply below.
    RAY_LOG(DEBUG) << "Finished getting all actor info, status = " << status;
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  };

  Status status = gcs_client_.Actors().AsyncGetAll(on_done);
  if (!status.ok()) {
    // The lookup could not be started; answer immediately with an empty result.
    on_done(status, std::vector<ActorTableData>());
  }
}
void DefaultActorInfoHandler::HandleGetNamedActorInfo(
@ -114,10 +129,16 @@ void DefaultActorInfoHandler::HandleRegisterActorInfo(
<< ", actor id = " << actor_id;
auto actor_table_data = std::make_shared<ActorTableData>();
actor_table_data->CopyFrom(request.actor_table_data());
auto on_done = [actor_id, reply, send_reply_callback](Status status) {
auto on_done = [this, actor_id, actor_table_data, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to register actor info: " << status.ToString()
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished registering actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -126,8 +147,6 @@ void DefaultActorInfoHandler::HandleRegisterActorInfo(
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished registering actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
}
void DefaultActorInfoHandler::HandleUpdateActorInfo(
@ -138,10 +157,16 @@ void DefaultActorInfoHandler::HandleUpdateActorInfo(
<< ", actor id = " << actor_id;
auto actor_table_data = std::make_shared<ActorTableData>();
actor_table_data->CopyFrom(request.actor_table_data());
auto on_done = [actor_id, reply, send_reply_callback](Status status) {
auto on_done = [this, actor_id, actor_table_data, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to update actor info: " << status.ToString()
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished updating actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -150,8 +175,6 @@ void DefaultActorInfoHandler::HandleUpdateActorInfo(
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished updating actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
}
void DefaultActorInfoHandler::HandleAddActorCheckpoint(
@ -169,6 +192,10 @@ void DefaultActorInfoHandler::HandleAddActorCheckpoint(
RAY_LOG(ERROR) << "Failed to add actor checkpoint: " << status.ToString()
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id
<< ", checkpoint id = " << checkpoint_id;
} else {
RAY_LOG(DEBUG) << "Finished adding actor checkpoint, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id
<< ", checkpoint id = " << checkpoint_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -177,8 +204,6 @@ void DefaultActorInfoHandler::HandleAddActorCheckpoint(
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished adding actor checkpoint, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id << ", checkpoint id = " << checkpoint_id;
}
void DefaultActorInfoHandler::HandleGetActorCheckpoint(
@ -190,10 +215,13 @@ void DefaultActorInfoHandler::HandleGetActorCheckpoint(
RAY_LOG(DEBUG) << "Getting actor checkpoint, job id = " << actor_id.JobId()
<< ", checkpoint id = " << checkpoint_id;
auto on_done = [actor_id, checkpoint_id, reply, send_reply_callback](
Status status, const boost::optional<ActorCheckpointData> &result) {
const Status &status,
const boost::optional<ActorCheckpointData> &result) {
if (status.ok()) {
RAY_DCHECK(result);
reply->mutable_checkpoint_data()->CopyFrom(*result);
RAY_LOG(DEBUG) << "Finished getting actor checkpoint, job id = " << actor_id.JobId()
<< ", checkpoint id = " << checkpoint_id;
} else {
RAY_LOG(ERROR) << "Failed to get actor checkpoint: " << status.ToString()
<< ", job id = " << actor_id.JobId()
@ -207,8 +235,6 @@ void DefaultActorInfoHandler::HandleGetActorCheckpoint(
if (!status.ok()) {
on_done(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting actor checkpoint, job id = " << actor_id.JobId()
<< ", checkpoint id = " << checkpoint_id;
}
void DefaultActorInfoHandler::HandleGetActorCheckpointID(
@ -218,11 +244,13 @@ void DefaultActorInfoHandler::HandleGetActorCheckpointID(
RAY_LOG(DEBUG) << "Getting actor checkpoint id, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
auto on_done = [actor_id, reply, send_reply_callback](
Status status,
const Status &status,
const boost::optional<ActorCheckpointIdData> &result) {
if (status.ok()) {
RAY_DCHECK(result);
reply->mutable_checkpoint_id_data()->CopyFrom(*result);
RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, job id = "
<< actor_id.JobId() << ", actor id = " << actor_id;
} else {
RAY_LOG(ERROR) << "Failed to get actor checkpoint id: " << status.ToString()
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
@ -234,8 +262,6 @@ void DefaultActorInfoHandler::HandleGetActorCheckpointID(
if (!status.ok()) {
on_done(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
}
} // namespace rpc

View file

@ -26,8 +26,11 @@ namespace rpc {
class DefaultActorInfoHandler : public rpc::ActorInfoHandler {
public:
explicit DefaultActorInfoHandler(gcs::RedisGcsClient &gcs_client,
gcs::GcsActorManager &gcs_actor_manager)
: gcs_client_(gcs_client), gcs_actor_manager_(gcs_actor_manager) {}
gcs::GcsActorManager &gcs_actor_manager,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_client_(gcs_client),
gcs_actor_manager_(gcs_actor_manager),
gcs_pub_sub_(gcs_pub_sub) {}
void HandleCreateActor(const CreateActorRequest &request, CreateActorReply *reply,
SendReplyCallback send_reply_callback) override;
@ -39,6 +42,10 @@ class DefaultActorInfoHandler : public rpc::ActorInfoHandler {
GetNamedActorInfoReply *reply,
SendReplyCallback send_reply_callback) override;
void HandleGetAllActorInfo(const GetAllActorInfoRequest &request,
GetAllActorInfoReply *reply,
SendReplyCallback send_reply_callback) override;
void HandleRegisterActorInfo(const RegisterActorInfoRequest &request,
RegisterActorInfoReply *reply,
SendReplyCallback send_reply_callback) override;
@ -62,6 +69,7 @@ class DefaultActorInfoHandler : public rpc::ActorInfoHandler {
private:
gcs::RedisGcsClient &gcs_client_;
gcs::GcsActorManager &gcs_actor_manager_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};
} // namespace rpc

View file

@ -88,9 +88,11 @@ rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_
/////////////////////////////////////////////////////////////////////////////////////////
/// Construct a GcsActorManager from its collaborators.
///
/// \param scheduler Actor scheduler; moved into `gcs_actor_scheduler_`.
/// \param actor_info_accessor Actor table accessor; bound to the reference member
/// `actor_info_accessor_`.
/// NOTE(review): held by reference, so the accessor presumably must outlive this
/// manager -- confirm against the owner (GcsServer).
/// \param gcs_pub_sub Pub-sub handle; moved into `gcs_pub_sub_`.
/// \param worker_client_factory Factory for worker clients; copied into
/// `worker_client_factory_`.
GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
gcs::ActorInfoAccessor &actor_info_accessor,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
actor_info_accessor_(actor_info_accessor),
gcs_pub_sub_(std::move(gcs_pub_sub)),
worker_client_factory_(worker_client_factory) {}
Status GcsActorManager::RegisterActor(
@ -270,8 +272,13 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(
actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr));
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor->GetActorID(), actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
}));
}
void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
@ -357,9 +364,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
// so that the actor will never be rescheduled.
auto remaining_reconstructions =
need_reschedule ? mutable_actor_table_data->remaining_reconstructions() : 0;
RAY_LOG(WARNING) << "Actor is failed " << actor->GetActorID() << " on worker "
<< worker_id << " at node " << node_id
<< ", need_reschedule = " << need_reschedule
RAY_LOG(WARNING) << "Actor is failed " << actor_id << " on worker " << worker_id
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", remaining_reconstructions = " << remaining_reconstructions;
if (remaining_reconstructions > 0) {
@ -368,16 +374,24 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(
actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr));
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
}));
gcs_actor_scheduler_->Schedule(actor);
} else {
mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD);
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(
actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr));
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
}));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
// once the owner fails or notifies us that the actor's handle has gone out
@ -398,7 +412,12 @@ void GcsActorManager::OnActorCreationSuccess(std::shared_ptr<GcsActor> actor) {
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(actor->GetActorTableData());
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor_id, actor_table_data, nullptr));
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
}));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from actor_to_register_callbacks_.

View file

@ -24,6 +24,7 @@
#include "absl/container/flat_hash_map.h"
#include "gcs_actor_scheduler.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
namespace ray {
namespace gcs {
@ -118,8 +119,10 @@ class GcsActorManager {
///
/// \param scheduler Used to schedule actor creation tasks.
/// \param actor_info_accessor Used to flush actor data to storage.
/// \param gcs_pub_sub Used to publish GCS messages.
GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
gcs::ActorInfoAccessor &actor_info_accessor,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
~GcsActorManager() = default;
@ -234,6 +237,8 @@ class GcsActorManager {
std::shared_ptr<gcs::GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Actor table. Used to update actor information upon creation, deletion, etc.
gcs::ActorInfoAccessor &actor_info_accessor_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;

View file

@ -24,12 +24,14 @@ namespace gcs {
GcsActorScheduler::GcsActorScheduler(
boost::asio::io_context &io_context, gcs::ActorInfoAccessor &actor_info_accessor,
const gcs::GcsNodeManager &gcs_node_manager,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler,
LeaseClientFactoryFn lease_client_factory, rpc::ClientFactoryFn client_factory)
: io_context_(io_context),
actor_info_accessor_(actor_info_accessor),
gcs_node_manager_(gcs_node_manager),
gcs_pub_sub_(std::move(gcs_pub_sub)),
schedule_failure_handler_(std::move(schedule_failure_handler)),
schedule_success_handler_(std::move(schedule_success_handler)),
lease_client_factory_(std::move(lease_client_factory)),
@ -72,17 +74,21 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(actor->GetActorTableData());
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data,
[this, actor](Status status) {
RAY_CHECK_OK(status);
// There is no promise that the node the
// actor tied to is still alive as the
// flush is asynchronously, so just
// invoke `Schedule` which will lease
// worker directly if the node is still
// available or select a new one if not.
Schedule(actor);
}));
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor->GetActorID(), actor_table_data,
[this, actor, actor_table_data](Status status) {
RAY_CHECK_OK(status);
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(),
actor_table_data->SerializeAsString(),
nullptr));
// There is no promise that the node the
// actor tied to is still alive as the
// flush is asynchronously, so just
// invoke `Schedule` which will lease
// worker directly if the node is still
// available or select a new one if not.
Schedule(actor);
}));
}
std::vector<ActorID> GcsActorScheduler::CancelOnNode(const ClientID &node_id) {
@ -226,11 +232,15 @@ void GcsActorScheduler::HandleWorkerLeasedReply(
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(actor->GetActorTableData());
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data,
[this, actor](Status status) {
RAY_CHECK_OK(status);
Schedule(actor);
}));
RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(
actor->GetActorID(), actor_table_data,
[this, actor, actor_table_data](Status status) {
RAY_CHECK_OK(status);
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(),
actor_table_data->SerializeAsString(),
nullptr));
Schedule(actor);
}));
} else {
// The worker is leased successfully from the specified node.
std::vector<rpc::ResourceMapEntry> resources;

View file

@ -79,7 +79,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// will be used if not set.
explicit GcsActorScheduler(
boost::asio::io_context &io_context, gcs::ActorInfoAccessor &actor_info_accessor,
const GcsNodeManager &gcs_node_manager,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler,
LeaseClientFactoryFn lease_client_factory = nullptr,
@ -249,6 +249,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
core_worker_clients_;
/// Reference of GcsNodeManager.
const GcsNodeManager &gcs_node_manager_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// The handler to handle the scheduling failures.
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler_;
/// The handler to handle the successful scheduling.

View file

@ -108,7 +108,7 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() {
GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service,
gcs::NodeInfoAccessor &node_info_accessor,
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub)
: node_info_accessor_(node_info_accessor),
error_info_accessor_(error_info_accessor),
node_failure_detector_(new NodeFailureDetector(

View file

@ -41,7 +41,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
explicit GcsNodeManager(boost::asio::io_service &io_service,
gcs::NodeInfoAccessor &node_info_accessor,
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub);
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub);
/// Handle register rpc request come from raylet.
void HandleRegisterNode(const rpc::RegisterNodeRequest &request,

View file

@ -136,7 +136,7 @@ void GcsServer::InitGcsNodeManager() {
void GcsServer::InitGcsActorManager() {
RAY_CHECK(redis_gcs_client_ != nullptr && gcs_node_manager_ != nullptr);
auto scheduler = std::make_shared<GcsActorScheduler>(
main_service_, redis_gcs_client_->Actors(), *gcs_node_manager_,
main_service_, redis_gcs_client_->Actors(), *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<GcsActor> actor) {
// When there are no available nodes to schedule the actor the
@ -161,7 +161,8 @@ void GcsServer::InitGcsActorManager() {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_actor_manager_ = std::make_shared<GcsActorManager>(
scheduler, redis_gcs_client_->Actors(), [this](const rpc::Address &address) {
scheduler, redis_gcs_client_->Actors(), gcs_pub_sub_,
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_node_manager_->AddNodeAddedListener(
@ -194,8 +195,8 @@ std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {
}
std::unique_ptr<rpc::ActorInfoHandler> GcsServer::InitActorInfoHandler() {
return std::unique_ptr<rpc::DefaultActorInfoHandler>(
new rpc::DefaultActorInfoHandler(*redis_gcs_client_, *gcs_actor_manager_));
return std::unique_ptr<rpc::DefaultActorInfoHandler>(new rpc::DefaultActorInfoHandler(
*redis_gcs_client_, *gcs_actor_manager_, gcs_pub_sub_));
}
std::unique_ptr<rpc::ObjectInfoHandler> GcsServer::InitObjectInfoHandler() {

View file

@ -70,21 +70,26 @@ class GcsActorManagerTest : public ::testing::Test {
public:
GcsActorManagerTest()
: mock_actor_scheduler_(new MockActorScheduler()),
worker_client_(new MockWorkerClient()),
gcs_actor_manager_(mock_actor_scheduler_, actor_info_accessor_,
[&](const rpc::Address &addr) { return worker_client_; }) {}
worker_client_(new MockWorkerClient()) {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_actor_manager_.reset(new gcs::GcsActorManager(
mock_actor_scheduler_, actor_info_accessor_, gcs_pub_sub_,
[&](const rpc::Address &addr) { return worker_client_; }));
}
GcsServerMocker::MockedActorInfoAccessor actor_info_accessor_;
std::shared_ptr<MockActorScheduler> mock_actor_scheduler_;
std::shared_ptr<MockWorkerClient> worker_client_;
gcs::GcsActorManager gcs_actor_manager_;
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
};
TEST_F(GcsActorManagerTest, TestBasic) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
Status status = gcs_actor_manager_.RegisterActor(
Status status = gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
});
@ -102,7 +107,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
ASSERT_TRUE(worker_client_->Reply());
@ -113,7 +118,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
@ -123,8 +128,8 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) {
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.clear();
gcs_actor_manager_.OnActorCreationFailed(actor);
gcs_actor_manager_.SchedulePendingActors();
gcs_actor_manager_->OnActorCreationFailed(actor);
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
mock_actor_scheduler_->actors.clear();
ASSERT_EQ(finished_actors.size(), 0);
@ -136,7 +141,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
}
@ -144,7 +149,7 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
@ -161,19 +166,19 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Killing another worker does not affect this actor.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(node_id, _));
gcs_actor_manager_.OnWorkerDead(node_id, WorkerID::FromRandom());
gcs_actor_manager_->OnWorkerDead(node_id, WorkerID::FromRandom());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
// Remove worker and then check that the actor is dead.
gcs_actor_manager_.OnWorkerDead(node_id, worker_id);
gcs_actor_manager_->OnWorkerDead(node_id, worker_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
// No more actors to schedule.
gcs_actor_manager_.SchedulePendingActors();
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
ASSERT_TRUE(worker_client_->Reply());
@ -183,7 +188,7 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
Status status = gcs_actor_manager_.RegisterActor(
Status status = gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
});
@ -201,20 +206,20 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Killing another node does not affect this actor.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(_));
gcs_actor_manager_.OnNodeDead(ClientID::FromRandom());
gcs_actor_manager_->OnNodeDead(ClientID::FromRandom());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
// Remove node and then check that the actor is dead.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_.OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
// No more actors to schedule.
gcs_actor_manager_.SchedulePendingActors();
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
ASSERT_TRUE(worker_client_->Reply());
@ -225,7 +230,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
auto create_actor_request = Mocker::GenCreateActorRequest(
job_id, /*max_reconstructions=*/1, /*detached=*/false);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
Status status = gcs_actor_manager_.RegisterActor(
Status status = gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
});
@ -243,38 +248,38 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Remove worker and then check that the actor is being restarted.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_.OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::RECONSTRUCTING);
// Add node and check that the actor is restarted.
gcs_actor_manager_.SchedulePendingActors();
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
mock_actor_scheduler_->actors.clear();
ASSERT_EQ(finished_actors.size(), 1);
auto node_id2 = ClientID::FromRandom();
address.set_raylet_id(node_id2.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
ASSERT_EQ(actor->GetNodeID(), node_id2);
// Killing another worker does not affect this actor.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(_));
gcs_actor_manager_.OnNodeDead(ClientID::FromRandom());
gcs_actor_manager_->OnNodeDead(ClientID::FromRandom());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
// Remove worker and then check that the actor is dead.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id2));
gcs_actor_manager_.OnNodeDead(node_id2);
gcs_actor_manager_->OnNodeDead(node_id2);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
// No more actors to schedule.
gcs_actor_manager_.SchedulePendingActors();
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
ASSERT_TRUE(worker_client_->Reply());
@ -285,7 +290,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
auto create_actor_request = Mocker::GenCreateActorRequest(
job_id, /*max_reconstructions=*/1, /*detached=*/false);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
@ -303,12 +308,12 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Remove the owner's node.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(owner_node_id));
gcs_actor_manager_.OnNodeDead(owner_node_id);
gcs_actor_manager_->OnNodeDead(owner_node_id);
// The child actor should be marked as dead.
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_EQ(worker_client_->killed_actors.size(), 1);
@ -317,9 +322,9 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
// Remove the actor's node and check that the actor is not restarted, since
// its owner has died.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_.OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
gcs_actor_manager_.SchedulePendingActors();
gcs_actor_manager_->SchedulePendingActors();
ASSERT_TRUE(mock_actor_scheduler_->actors.empty());
}
@ -328,7 +333,7 @@ TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) {
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/1, /*detached=*/true);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
@ -346,12 +351,12 @@ TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
gcs_actor_manager_->OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Remove the owner's node.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(owner_node_id));
gcs_actor_manager_.OnNodeDead(owner_node_id);
gcs_actor_manager_->OnNodeDead(owner_node_id);
// The child actor should not be marked as dead.
ASSERT_TRUE(worker_client_->killed_actors.empty());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
@ -363,39 +368,39 @@ TEST_F(GcsActorManagerTest, TestNamedActors) {
auto request1 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor1");
Status status = gcs_actor_manager_.RegisterActor(
Status status = gcs_actor_manager_->RegisterActor(
request1, [](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
ASSERT_EQ(gcs_actor_manager_.GetActorIDByName("actor1").Binary(),
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor1").Binary(),
request1.task_spec().actor_creation_task_spec().actor_id());
auto request2 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor2");
status = gcs_actor_manager_.RegisterActor(request2,
[](std::shared_ptr<gcs::GcsActor> actor) {});
status = gcs_actor_manager_->RegisterActor(request2,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
ASSERT_EQ(gcs_actor_manager_.GetActorIDByName("actor2").Binary(),
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor2").Binary(),
request2.task_spec().actor_creation_task_spec().actor_id());
// Check that looking up a non-existent name returns ActorID::Nil();
ASSERT_EQ(gcs_actor_manager_.GetActorIDByName("actor3"), ActorID::Nil());
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor3"), ActorID::Nil());
// Check that naming collisions return Status::Invalid.
auto request3 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor2");
status = gcs_actor_manager_.RegisterActor(request3,
[](std::shared_ptr<gcs::GcsActor> actor) {});
status = gcs_actor_manager_->RegisterActor(request3,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.IsInvalid());
ASSERT_EQ(gcs_actor_manager_.GetActorIDByName("actor2").Binary(),
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor2").Binary(),
request2.task_spec().actor_creation_task_spec().actor_id());
// Check that naming collisions are enforced across JobIDs.
auto request4 =
Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, /*name=*/"actor2");
status = gcs_actor_manager_.RegisterActor(request4,
[](std::shared_ptr<gcs::GcsActor> actor) {});
status = gcs_actor_manager_->RegisterActor(request4,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.IsInvalid());
ASSERT_EQ(gcs_actor_manager_.GetActorIDByName("actor2").Binary(),
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor2").Binary(),
request2.task_spec().actor_creation_task_spec().actor_id());
}

View file

@ -25,10 +25,11 @@ class GcsActorSchedulerTest : public ::testing::Test {
void SetUp() override {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
worker_client_ = std::make_shared<GcsServerMocker::MockWorkerClient>();
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_);
gcs_actor_scheduler_ = std::make_shared<GcsServerMocker::MockedGcsActorScheduler>(
io_service_, actor_info_accessor_, *gcs_node_manager_,
io_service_, actor_info_accessor_, *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
failure_actors_.emplace_back(std::move(actor));
@ -55,7 +56,8 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockedGcsActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
std::vector<std::shared_ptr<gcs::GcsActor>> failure_actors_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
};
TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) {
@ -223,7 +225,6 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) {
ASSERT_EQ(actor->GetActorID(), actor_ids.front());
ASSERT_EQ(1, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());
ASSERT_TRUE(actor->GetNodeID().IsNil());
// Grant a worker, which will influence nothing.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
@ -269,7 +270,6 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) {
ASSERT_EQ(1, actor_ids.size());
ASSERT_EQ(actor->GetActorID(), actor_ids.front());
ASSERT_EQ(1, worker_client_->callbacks.size());
ASSERT_TRUE(actor->GetNodeID().IsNil());
// Reply the actor creation request, which will influence nothing.
ASSERT_TRUE(worker_client_->ReplyPushTask());
@ -309,8 +309,6 @@ TEST_F(GcsActorSchedulerTest, TestWorkerFailedWhenCreating) {
ASSERT_EQ(actor->GetActorID(),
gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id));
ASSERT_EQ(1, worker_client_->callbacks.size());
ASSERT_TRUE(actor->GetNodeID().IsNil());
ASSERT_TRUE(actor->GetWorkerID().IsNil());
// Reply the actor creation request, which will influence nothing.
ASSERT_TRUE(worker_client_->ReplyPushTask());

View file

@ -179,6 +179,11 @@ struct GcsServerMocker {
return Status::NotImplemented("");
}
/// Stub for the all-actor lookup: this mock does not provide it.
/// Always returns NotImplemented; `callback` is never invoked.
Status AsyncGetAll(
const gcs::MultiItemCallback<rpc::ActorTableData> &callback) override {
return Status::NotImplemented("");
}
Status AsyncGetByName(
const std::string &name,
const gcs::OptionalItemCallback<rpc::ActorTableData> &callback) override {
@ -217,8 +222,7 @@ struct GcsServerMocker {
return Status::NotImplemented("");
}
Status AsyncUnsubscribe(const ActorID &actor_id,
const gcs::StatusCallback &done) override {
/// Stub for cancelling an actor subscription: this mock does not provide it.
/// Always returns NotImplemented regardless of `actor_id`.
Status AsyncUnsubscribe(const ActorID &actor_id) override {
return Status::NotImplemented("");
}
@ -356,6 +360,17 @@ struct GcsServerMocker {
return Status::OK();
}
};
/// Test double for gcs::GcsPubSub whose Publish() does nothing.
class MockGcsPubSub : public gcs::GcsPubSub {
 public:
  /// \param redis_client Forwarded unchanged to the GcsPubSub base class.
  ///
  /// Marked explicit so that a shared_ptr<RedisClient> is never silently
  /// converted into a MockGcsPubSub.
  explicit MockGcsPubSub(std::shared_ptr<gcs::RedisClient> redis_client)
      : GcsPubSub(redis_client) {}

  /// Drops the message and reports success.
  ///
  /// None of the arguments are used; in particular `done` is never invoked,
  /// so callers must not wait on it.
  ///
  /// \return Status::OK() unconditionally.
  Status Publish(const std::string &channel, const std::string &id,
                 const std::string &data, const gcs::StatusCallback &done) override {
    return Status::OK();
  }
};
};
} // namespace ray

View file

@ -28,6 +28,7 @@ namespace gcs {
#define JOB_CHANNEL "JOB"
#define NODE_CHANNEL "NODE"
#define NODE_RESOURCE_CHANNEL "NODE_RESOURCE"
#define ACTOR_CHANNEL "ACTOR"
#define WORKER_FAILURE_CHANNEL "WORKER_FAILURE"
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
@ -55,8 +56,8 @@ class GcsPubSub {
/// \param data The data of message to be published to redis.
/// \param done Callback that will be called when the message is published to redis.
/// \return Status
Status Publish(const std::string &channel, const std::string &id,
const std::string &data, const StatusCallback &done);
virtual Status Publish(const std::string &channel, const std::string &id,
const std::string &data, const StatusCallback &done);
/// Subscribe to messages with the specified ID under the specified channel.
///

View file

@ -143,9 +143,8 @@ Status RedisLogBasedActorInfoAccessor::AsyncSubscribe(
done);
}
Status RedisLogBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
return log_based_actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, done);
/// Cancels this accessor's subscription to `actor_id`.
///
/// The request is forwarded to the log-based subscription executor under this
/// accessor's subscribe ID, with an empty completion callback.
///
/// \param actor_id The ID of the actor to unsubscribe from.
/// \return The status returned by the executor.
Status RedisLogBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
  // No completion notification is requested from the executor.
  const StatusCallback no_done_callback = nullptr;
  return log_based_actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id,
                                                        no_done_callback);
}
Status RedisLogBasedActorInfoAccessor::AsyncAddCheckpoint(
@ -249,6 +248,36 @@ Status RedisActorInfoAccessor::AsyncGet(
return client_impl_->actor_table().Lookup(JobID::Nil(), actor_id, on_done, on_failure);
}
/// Looks up the table data of every actor ID returned by GetAllActorID() and
/// hands the collected entries to `callback` in a single batch.
///
/// One AsyncGet is issued per actor ID. The batch is delivered when the last
/// of those lookups reports back (this relies on AsyncGet invoking its
/// callback once per request). Lookups that yield no data are left out of the
/// batch, and the per-lookup status is not propagated: `callback` always
/// receives Status::OK().
///
/// \param callback Must not be null. Called immediately with an empty vector
/// when there are no actor IDs.
/// \return Status::OK(); a failure to issue a lookup aborts via RAY_CHECK_OK.
Status RedisActorInfoAccessor::AsyncGetAll(
    const MultiItemCallback<rpc::ActorTableData> &callback) {
  RAY_CHECK(callback != nullptr);
  const auto actor_id_list = GetAllActorID();
  if (actor_id_list.empty()) {
    callback(Status::OK(), std::vector<rpc::ActorTableData>());
    return Status::OK();
  }

  // Keep the counters in size_t so the vector size is neither narrowed nor
  // compared against a signed value.
  const size_t total = actor_id_list.size();
  auto finished_count = std::make_shared<size_t>(0);
  auto result = std::make_shared<std::vector<ActorTableData>>();
  result->reserve(total);

  for (const auto &actor_id : actor_id_list) {
    // The shared counter and result vector are captured by value so they
    // stay alive until the final lookup completes.
    auto on_done = [finished_count, total, result, callback](
                       const Status &status,
                       const boost::optional<ActorTableData> &data) {
      ++(*finished_count);
      if (data) {
        result->push_back(*data);
      }
      if (*finished_count == total) {
        callback(Status::OK(), *result);
      }
    };
    RAY_CHECK_OK(AsyncGet(actor_id, on_done));
  }

  return Status::OK();
}
Status RedisActorInfoAccessor::AsyncRegister(
const std::shared_ptr<ActorTableData> &data_ptr, const StatusCallback &callback) {
auto on_register_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
@ -289,9 +318,8 @@ Status RedisActorInfoAccessor::AsyncSubscribe(
return actor_sub_executor_.AsyncSubscribe(subscribe_id_, actor_id, subscribe, done);
}
Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
return actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, done);
/// Cancels this accessor's subscription to `actor_id`.
///
/// The request is forwarded to the actor subscription executor under this
/// accessor's subscribe ID, with an empty completion callback.
///
/// \param actor_id The ID of the actor to unsubscribe from.
/// \return The status returned by the executor.
Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
  // No completion notification is requested from the executor.
  const StatusCallback no_done_callback = nullptr;
  return actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, no_done_callback);
}
RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl)

View file

@ -42,6 +42,11 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor {
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) override;
/// Not supported by the log-based accessor.
///
/// `callback` is never invoked. Marked `override` like the other
/// ActorInfoAccessor methods in this class, so a signature drift from the
/// base interface becomes a compile error.
///
/// \return Status::NotImplemented unconditionally.
Status AsyncGetAll(const MultiItemCallback<rpc::ActorTableData> &callback) override {
  return Status::NotImplemented(
      "RedisLogBasedActorInfoAccessor does not support AsyncGetAll.");
}
Status AsyncGetByName(const std::string &name,
const OptionalItemCallback<ActorTableData> &callback) override {
return Status::NotImplemented(
@ -65,7 +70,7 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor {
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id) override;
Status AsyncAddCheckpoint(const std::shared_ptr<ActorCheckpointData> &data_ptr,
const StatusCallback &callback) override;
@ -120,6 +125,8 @@ class RedisActorInfoAccessor : public RedisLogBasedActorInfoAccessor {
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) override;
Status AsyncGetAll(const MultiItemCallback<rpc::ActorTableData> &callback) override;
Status AsyncGetByName(const std::string &name,
const OptionalItemCallback<ActorTableData> &callback) override {
return Status::NotImplemented(
@ -140,7 +147,7 @@ class RedisActorInfoAccessor : public RedisLogBasedActorInfoAccessor {
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id) override;
protected:
std::vector<ActorID> GetAllActorID() const override;

View file

@ -73,6 +73,15 @@ message GetNamedActorInfoReply {
ActorTableData actor_table_data = 2;
}
// Request message of the GetAllActorInfo RPC. It carries no fields.
message GetAllActorInfoRequest {
}
// Reply message of the GetAllActorInfo RPC.
message GetAllActorInfoReply {
GcsStatus status = 1;
// Data of all actors, one entry per actor.
repeated ActorTableData actor_table_data = 2;
}
message RegisterActorInfoRequest {
// Data of actor.
ActorTableData actor_table_data = 1;
@ -128,6 +137,8 @@ service ActorInfoGcsService {
rpc GetActorInfo(GetActorInfoRequest) returns (GetActorInfoReply);
// Get actor data from GCS Service by name.
rpc GetNamedActorInfo(GetNamedActorInfoRequest) returns (GetNamedActorInfoReply);
// Get information of all actors from GCS Service.
rpc GetAllActorInfo(GetAllActorInfoRequest) returns (GetAllActorInfoReply);
// Register an actor to GCS Service.
rpc RegisterActorInfo(RegisterActorInfoRequest) returns (RegisterActorInfoReply);
// Update actor info in GCS Service.

View file

@ -106,6 +106,10 @@ class GcsRpcClient {
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetNamedActorInfo,
actor_info_grpc_client_, )
/// Get all actor data from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetAllActorInfo,
actor_info_grpc_client_, )
/// Register an actor to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, RegisterActorInfo,
actor_info_grpc_client_, )

View file

@ -106,6 +106,10 @@ class ActorInfoGcsServiceHandler {
GetNamedActorInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetAllActorInfo(const GetAllActorInfoRequest &request,
GetAllActorInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleRegisterActorInfo(const RegisterActorInfoRequest &request,
RegisterActorInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
@ -146,6 +150,7 @@ class ActorInfoGrpcService : public GrpcService {
ACTOR_INFO_SERVICE_RPC_HANDLER(CreateActor);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetNamedActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetAllActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint);