merge actor info handler into actor manager (#8682)

This commit is contained in:
fangfengbin 2020-05-30 21:56:29 +08:00 committed by GitHub
parent ebea5c4111
commit 10c87063be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 329 additions and 406 deletions

View file

@ -1,289 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "actor_info_handler_impl.h"
#include "ray/util/logging.h"
namespace ray {
namespace rpc {
/// Handles a `CreateActor` RPC by handing the request to the actor manager's
/// `RegisterActor`.
///
/// \param request Request carrying the actor creation task spec.
/// \param reply Reply message that is filled in and sent back.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleCreateActor(
    const ray::rpc::CreateActorRequest &request, ray::rpc::CreateActorReply *reply,
    ray::rpc::SendReplyCallback send_reply_callback) {
  const auto &task_spec = request.task_spec();
  // Only actor creation tasks may be submitted through this RPC.
  RAY_CHECK(task_spec.type() == TaskType::ACTOR_CREATION_TASK);
  const auto actor_id =
      ActorID::FromBinary(task_spec.actor_creation_task_spec().actor_id());
  RAY_LOG(INFO) << "Registering actor, actor id = " << actor_id;

  // Callback handed to RegisterActor: logs and answers the RPC with OK.
  auto on_registered = [reply, send_reply_callback,
                        actor_id](const std::shared_ptr<gcs::GcsActor> &actor) {
    RAY_LOG(INFO) << "Registered actor, actor id = " << actor_id;
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  };

  const Status register_status =
      gcs_actor_manager_.RegisterActor(request, on_registered);
  if (register_status.ok()) {
    return;
  }
  // Registration was rejected up front: report the failure to the caller.
  RAY_LOG(ERROR) << "Failed to create actor: " << register_status.ToString();
  GCS_RPC_SEND_REPLY(send_reply_callback, reply, register_status);
}
/// Handles a `GetActorInfo` RPC by looking the actor up in the actor table.
///
/// \param request Request carrying the binary actor id.
/// \param reply Reply message; receives the actor table data when a record is found.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleGetActorInfo(
    const rpc::GetActorInfoRequest &request, rpc::GetActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const ActorID actor_id = ActorID::FromBinary(request.actor_id());
  RAY_LOG(DEBUG) << "Getting actor info"
                 << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;

  auto reply_with_actor = [actor_id, reply, send_reply_callback](
                              const Status &lookup_status,
                              const boost::optional<ActorTableData> &actor_data) {
    if (actor_data) {
      reply->mutable_actor_table_data()->CopyFrom(*actor_data);
    }
    RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
                   << ", actor id = " << actor_id << ", status = " << lookup_status;
    // The lookup status is only logged; the RPC is always answered with OK.
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  };

  // Look up the actor_id in the GCS.
  const Status dispatch_status =
      gcs_table_storage_->ActorTable().Get(actor_id, reply_with_actor);
  if (!dispatch_status.ok()) {
    // The lookup could not be issued, so answer right away without data.
    reply_with_actor(dispatch_status, boost::none);
  }
}
/// Handles a `GetAllActorInfo` RPC by returning every record of the actor table.
///
/// \param request Unused by this handler.
/// \param reply Reply message; one `actor_table_data` entry is added per record.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleGetAllActorInfo(
    const rpc::GetAllActorInfoRequest &request, rpc::GetAllActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  RAY_LOG(DEBUG) << "Getting all actor info.";

  auto reply_with_actors =
      [reply, send_reply_callback](
          const std::unordered_map<ActorID, ActorTableData> &actors) {
        for (auto entry = actors.begin(); entry != actors.end(); ++entry) {
          reply->add_actor_table_data()->CopyFrom(entry->second);
        }
        RAY_LOG(DEBUG) << "Finished getting all actor info.";
        GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
      };

  const Status dispatch_status =
      gcs_table_storage_->ActorTable().GetAll(reply_with_actors);
  if (!dispatch_status.ok()) {
    // The scan could not be issued: answer with an empty result set.
    reply_with_actors(std::unordered_map<ActorID, ActorTableData>());
  }
}
/// Handles a `GetNamedActorInfo` RPC: resolves the name to an actor id through
/// the actor manager and then fetches that actor's record from the actor table.
///
/// \param request Request carrying the actor name.
/// \param reply Reply message; receives the actor table data when a record is found.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleGetNamedActorInfo(
    const rpc::GetNamedActorInfoRequest &request, rpc::GetNamedActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const std::string &name = request.name();
  RAY_LOG(DEBUG) << "Getting actor info"
                 << ", name = " << name;

  // Fills in the reply (or logs the failure) and answers the RPC with `status`.
  auto on_done = [name, reply, send_reply_callback](
                     const Status &status,
                     const boost::optional<ActorTableData> &result) {
    if (status.ok()) {
      if (result) {
        reply->mutable_actor_table_data()->CopyFrom(*result);
      }
    } else {
      RAY_LOG(ERROR) << "Failed to get actor info: " << status.ToString()
                     << ", name = " << name;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
  };

  // Try to look up the actor ID for the named actor.
  ActorID actor_id = gcs_actor_manager_.GetActorIDByName(name);
  if (actor_id.IsNil()) {
    // The named actor was not found.
    std::stringstream stream;
    stream << "Actor with name '" << name << "' was not found.";
    on_done(Status::NotFound(stream.str()), boost::none);
    return;
  }

  // Emit the "finished" log when the lookup actually completes successfully,
  // instead of immediately after the asynchronous request has been issued.
  auto on_lookup_done = [actor_id, on_done](
                            const Status &status,
                            const boost::optional<ActorTableData> &result) {
    if (status.ok()) {
      RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
                     << ", actor id = " << actor_id;
    }
    on_done(status, result);
  };

  // Look up the actor_id in the GCS.
  Status status = gcs_table_storage_->ActorTable().Get(actor_id, on_lookup_done);
  if (!status.ok()) {
    on_lookup_done(status, boost::none);
  }
}
/// Handles a `RegisterActorInfo` RPC: writes the supplied actor table data to the
/// actor table and, when the write succeeds, publishes it on the actor channel.
///
/// \param request Request carrying the actor table data to store.
/// \param reply Reply message whose status mirrors the outcome of the write.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleRegisterActorInfo(
    const rpc::RegisterActorInfoRequest &request, rpc::RegisterActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const auto &actor_table_data = request.actor_table_data();
  const ActorID actor_id = ActorID::FromBinary(actor_table_data.actor_id());
  RAY_LOG(DEBUG) << "Registering actor info, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id;

  // `actor_table_data` is captured by value, so the lambda holds its own copy.
  auto on_stored = [this, actor_id, actor_table_data, reply,
                    send_reply_callback](const Status &put_status) {
    if (put_status.ok()) {
      RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
                                         actor_table_data.SerializeAsString(), nullptr));
      RAY_LOG(DEBUG) << "Finished registering actor info, job id = " << actor_id.JobId()
                     << ", actor id = " << actor_id;
    } else {
      RAY_LOG(ERROR) << "Failed to register actor info: " << put_status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, put_status);
  };

  const Status dispatch_status =
      gcs_table_storage_->ActorTable().Put(actor_id, actor_table_data, on_stored);
  if (!dispatch_status.ok()) {
    // The write could not be issued: report the failure right away.
    on_stored(dispatch_status);
  }
}
/// Handles an `UpdateActorInfo` RPC: overwrites the actor's record in the actor
/// table and, when the write succeeds, publishes the new data on the actor channel.
///
/// \param request Request carrying the actor id and the new actor table data.
/// \param reply Reply message whose status mirrors the outcome of the write.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleUpdateActorInfo(
    const rpc::UpdateActorInfoRequest &request, rpc::UpdateActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  // The key comes from the request's own actor_id field, not from the table data.
  const ActorID actor_id = ActorID::FromBinary(request.actor_id());
  RAY_LOG(DEBUG) << "Updating actor info, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id;
  const auto &actor_table_data = request.actor_table_data();

  // `actor_table_data` is captured by value, so the lambda holds its own copy.
  auto on_stored = [this, actor_id, actor_table_data, reply,
                    send_reply_callback](const Status &put_status) {
    if (put_status.ok()) {
      RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
                                         actor_table_data.SerializeAsString(), nullptr));
      RAY_LOG(DEBUG) << "Finished updating actor info, job id = " << actor_id.JobId()
                     << ", actor id = " << actor_id;
    } else {
      RAY_LOG(ERROR) << "Failed to update actor info: " << put_status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, put_status);
  };

  const Status dispatch_status =
      gcs_table_storage_->ActorTable().Put(actor_id, actor_table_data, on_stored);
  if (!dispatch_status.ok()) {
    // The write could not be issued: report the failure right away.
    on_stored(dispatch_status);
  }
}
/// Handles an `AddActorCheckpoint` RPC in three chained storage steps:
///   1. store the checkpoint data in the checkpoint table,
///   2. read the actor's current checkpoint-id list,
///   3. append the new checkpoint id (with a timestamp) and write the list back.
/// The reply is sent when step 3 completes, or as soon as step 1 fails.
///
/// \param request Request carrying the checkpoint data.
/// \param reply Reply message whose status reports the outcome.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void DefaultActorInfoHandler::HandleAddActorCheckpoint(
    const AddActorCheckpointRequest &request, AddActorCheckpointReply *reply,
    SendReplyCallback send_reply_callback) {
  ActorID actor_id = ActorID::FromBinary(request.checkpoint_data().actor_id());
  ActorCheckpointID checkpoint_id =
      ActorCheckpointID::FromBinary(request.checkpoint_data().checkpoint_id());
  RAY_LOG(DEBUG) << "Adding actor checkpoint, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id << ", checkpoint id = " << checkpoint_id;

  auto on_done = [this, actor_id, checkpoint_id, reply,
                  send_reply_callback](const Status &status) {
    if (!status.ok()) {
      RAY_LOG(ERROR) << "Failed to add actor checkpoint: " << status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id
                     << ", checkpoint id = " << checkpoint_id;
      // Answer the RPC on failure as well; otherwise the caller never gets a reply.
      GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
      return;
    }

    auto on_get_done = [this, actor_id, checkpoint_id, reply, send_reply_callback](
                           const Status &status,
                           const boost::optional<ActorCheckpointIdData> &result) {
      // Start from the stored id list if there is one, else from a fresh record.
      // NOTE(review): `status` of the read is not inspected here — confirm that a
      // failed read should be treated the same as "no record yet".
      ActorCheckpointIdData actor_checkpoint_id;
      if (result) {
        actor_checkpoint_id.CopyFrom(*result);
      } else {
        actor_checkpoint_id.set_actor_id(actor_id.Binary());
      }
      actor_checkpoint_id.add_checkpoint_ids(checkpoint_id.Binary());
      // Nanoseconds divided by 1e6, i.e. a millisecond timestamp.
      actor_checkpoint_id.add_timestamps(absl::GetCurrentTimeNanos() / 1000000);

      auto on_put_done = [actor_id, checkpoint_id, reply,
                          send_reply_callback](const Status &status) {
        RAY_LOG(DEBUG) << "Finished adding actor checkpoint, job id = "
                       << actor_id.JobId() << ", actor id = " << actor_id
                       << ", checkpoint id = " << checkpoint_id;
        GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
      };
      RAY_CHECK_OK(gcs_table_storage_->ActorCheckpointIdTable().Put(
          actor_id, actor_checkpoint_id, on_put_done));
    };
    RAY_CHECK_OK(
        gcs_table_storage_->ActorCheckpointIdTable().Get(actor_id, on_get_done));
  };

  Status status = gcs_table_storage_->ActorCheckpointTable().Put(
      checkpoint_id, request.checkpoint_data(), on_done);
  if (!status.ok()) {
    on_done(status);
  }
}
void DefaultActorInfoHandler::HandleGetActorCheckpoint(
const GetActorCheckpointRequest &request, GetActorCheckpointReply *reply,
SendReplyCallback send_reply_callback) {
ActorID actor_id = ActorID::FromBinary(request.actor_id());
ActorCheckpointID checkpoint_id =
ActorCheckpointID::FromBinary(request.checkpoint_id());
RAY_LOG(DEBUG) << "Getting actor checkpoint, job id = " << actor_id.JobId()
<< ", checkpoint id = " << checkpoint_id;
auto on_done = [actor_id, checkpoint_id, reply, send_reply_callback](
const Status &status,
const boost::optional<ActorCheckpointData> &result) {
if (status.ok()) {
if (result) {
reply->mutable_checkpoint_data()->CopyFrom(*result);
}
RAY_LOG(DEBUG) << "Finished getting actor checkpoint, job id = " << actor_id.JobId()
<< ", checkpoint id = " << checkpoint_id;
} else {
RAY_LOG(ERROR) << "Failed to get actor checkpoint: " << status.ToString()
<< ", job id = " << actor_id.JobId()
<< ", checkpoint id = " << checkpoint_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_table_storage_->ActorCheckpointTable().Get(checkpoint_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
}
}
void DefaultActorInfoHandler::HandleGetActorCheckpointID(
const GetActorCheckpointIDRequest &request, GetActorCheckpointIDReply *reply,
SendReplyCallback send_reply_callback) {
ActorID actor_id = ActorID::FromBinary(request.actor_id());
RAY_LOG(DEBUG) << "Getting actor checkpoint id, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
auto on_done = [actor_id, reply, send_reply_callback](
const Status &status,
const boost::optional<ActorCheckpointIdData> &result) {
if (status.ok()) {
if (result) {
reply->mutable_checkpoint_id_data()->CopyFrom(*result);
}
RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, job id = "
<< actor_id.JobId() << ", actor id = " << actor_id;
} else {
RAY_LOG(ERROR) << "Failed to get actor checkpoint id: " << status.ToString()
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_table_storage_->ActorCheckpointIdTable().Get(actor_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
}
}
} // namespace rpc
} // namespace ray

View file

@ -1,80 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RAY_GCS_ACTOR_INFO_HANDLER_IMPL_H
#define RAY_GCS_ACTOR_INFO_HANDLER_IMPL_H
#include "gcs_actor_manager.h"
#include "gcs_table_storage.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
namespace ray {
namespace rpc {
/// The default implementation of `ActorInfoHandler`.
///
/// Holds handles to the table storage, the actor manager and the publisher that
/// the handler methods operate on.
class DefaultActorInfoHandler : public rpc::ActorInfoHandler {
 public:
  /// Create a DefaultActorInfoHandler.
  ///
  /// \param gcs_table_storage Storage handle; shared ownership is kept.
  /// \param gcs_actor_manager Actor manager, held by reference — the caller must
  /// keep it alive for as long as this handler is used.
  /// \param gcs_pub_sub Publisher handle; shared ownership is kept. Taken by value
  /// so that lvalues, temporaries and const handles are all accepted.
  explicit DefaultActorInfoHandler(
      std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
      gcs::GcsActorManager &gcs_actor_manager,
      std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub)
      : gcs_table_storage_(std::move(gcs_table_storage)),
        gcs_actor_manager_(gcs_actor_manager),
        gcs_pub_sub_(std::move(gcs_pub_sub)) {}

  /// Handle a request to create (register) an actor.
  void HandleCreateActor(const CreateActorRequest &request, CreateActorReply *reply,
                         SendReplyCallback send_reply_callback) override;

  /// Handle a request for one actor's table data, looked up by actor id.
  void HandleGetActorInfo(const GetActorInfoRequest &request, GetActorInfoReply *reply,
                          SendReplyCallback send_reply_callback) override;

  /// Handle a request for one actor's table data, looked up by name.
  void HandleGetNamedActorInfo(const GetNamedActorInfoRequest &request,
                               GetNamedActorInfoReply *reply,
                               SendReplyCallback send_reply_callback) override;

  /// Handle a request for the table data of all actors.
  void HandleGetAllActorInfo(const GetAllActorInfoRequest &request,
                             GetAllActorInfoReply *reply,
                             SendReplyCallback send_reply_callback) override;

  /// Handle a request to store actor table data.
  void HandleRegisterActorInfo(const RegisterActorInfoRequest &request,
                               RegisterActorInfoReply *reply,
                               SendReplyCallback send_reply_callback) override;

  /// Handle a request to overwrite an actor's table data.
  void HandleUpdateActorInfo(const UpdateActorInfoRequest &request,
                             UpdateActorInfoReply *reply,
                             SendReplyCallback send_reply_callback) override;

  /// Handle a request to add an actor checkpoint.
  void HandleAddActorCheckpoint(const AddActorCheckpointRequest &request,
                                AddActorCheckpointReply *reply,
                                SendReplyCallback send_reply_callback) override;

  /// Handle a request for one actor checkpoint.
  void HandleGetActorCheckpoint(const GetActorCheckpointRequest &request,
                                GetActorCheckpointReply *reply,
                                SendReplyCallback send_reply_callback) override;

  /// Handle a request for an actor's checkpoint id data.
  void HandleGetActorCheckpointID(const GetActorCheckpointIDRequest &request,
                                  GetActorCheckpointIDReply *reply,
                                  SendReplyCallback send_reply_callback) override;

 private:
  /// Table storage handle.
  std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
  /// Actor manager; not owned.
  gcs::GcsActorManager &gcs_actor_manager_;
  /// Publisher handle.
  std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};
} // namespace rpc
} // namespace ray
#endif // RAY_GCS_ACTOR_INFO_HANDLER_IMPL_H

View file

@ -88,14 +88,281 @@ rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_
/////////////////////////////////////////////////////////////////////////////////////////
/// Constructs the actor manager by storing the scheduler, storage handle(s),
/// publisher and worker-client factory supplied by the caller in the
/// corresponding members.
/// NOTE(review): this span comes from a diff view and appears to list both the
/// old `gcs_actor_table` and the new `gcs_table_storage` parameter/initializer —
/// confirm which lines belong to the final signature.
GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
gcs::GcsActorTable &gcs_actor_table,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_actor_table_(gcs_actor_table),
gcs_table_storage_(gcs_table_storage),
gcs_pub_sub_(std::move(gcs_pub_sub)),
worker_client_factory_(worker_client_factory) {}
/// Handles a `CreateActor` RPC by passing the request to `RegisterActor`.
///
/// \param request Request carrying the actor creation task spec.
/// \param reply Reply message that is filled in and sent back.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request,
                                        rpc::CreateActorReply *reply,
                                        rpc::SendReplyCallback send_reply_callback) {
  const auto &task_spec = request.task_spec();
  // Only actor creation tasks may be submitted through this RPC.
  RAY_CHECK(task_spec.type() == TaskType::ACTOR_CREATION_TASK);
  const auto actor_id =
      ActorID::FromBinary(task_spec.actor_creation_task_spec().actor_id());
  RAY_LOG(INFO) << "Registering actor, actor id = " << actor_id;

  // Callback handed to RegisterActor: logs and answers the RPC with OK.
  auto on_registered = [reply, send_reply_callback,
                        actor_id](const std::shared_ptr<gcs::GcsActor> &actor) {
    RAY_LOG(INFO) << "Registered actor, actor id = " << actor_id;
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  };

  const Status register_status = RegisterActor(request, on_registered);
  if (register_status.ok()) {
    return;
  }
  // Registration was rejected up front: report the failure to the caller.
  RAY_LOG(ERROR) << "Failed to create actor: " << register_status.ToString();
  GCS_RPC_SEND_REPLY(send_reply_callback, reply, register_status);
}
/// Handles a `GetActorInfo` RPC by looking the actor up in the actor table.
///
/// \param request Request carrying the binary actor id.
/// \param reply Reply message; receives the actor table data when a record is found.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleGetActorInfo(const rpc::GetActorInfoRequest &request,
                                         rpc::GetActorInfoReply *reply,
                                         rpc::SendReplyCallback send_reply_callback) {
  const ActorID actor_id = ActorID::FromBinary(request.actor_id());
  RAY_LOG(DEBUG) << "Getting actor info"
                 << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;

  auto reply_with_actor = [actor_id, reply, send_reply_callback](
                              const Status &lookup_status,
                              const boost::optional<ActorTableData> &actor_data) {
    if (actor_data) {
      reply->mutable_actor_table_data()->CopyFrom(*actor_data);
    }
    RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
                   << ", actor id = " << actor_id << ", status = " << lookup_status;
    // The lookup status is only logged; the RPC is always answered with OK.
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  };

  // Look up the actor_id in the GCS.
  const Status dispatch_status =
      gcs_table_storage_->ActorTable().Get(actor_id, reply_with_actor);
  if (!dispatch_status.ok()) {
    // The lookup could not be issued, so answer right away without data.
    reply_with_actor(dispatch_status, boost::none);
  }
}
/// Handles a `GetAllActorInfo` RPC by returning every record of the actor table.
///
/// \param request Unused by this handler.
/// \param reply Reply message; one `actor_table_data` entry is added per record.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleGetAllActorInfo(const rpc::GetAllActorInfoRequest &request,
                                            rpc::GetAllActorInfoReply *reply,
                                            rpc::SendReplyCallback send_reply_callback) {
  RAY_LOG(DEBUG) << "Getting all actor info.";

  auto reply_with_actors =
      [reply, send_reply_callback](
          const std::unordered_map<ActorID, ActorTableData> &actors) {
        for (auto entry = actors.begin(); entry != actors.end(); ++entry) {
          reply->add_actor_table_data()->CopyFrom(entry->second);
        }
        RAY_LOG(DEBUG) << "Finished getting all actor info.";
        GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
      };

  const Status dispatch_status =
      gcs_table_storage_->ActorTable().GetAll(reply_with_actors);
  if (!dispatch_status.ok()) {
    // The scan could not be issued: answer with an empty result set.
    reply_with_actors(std::unordered_map<ActorID, ActorTableData>());
  }
}
/// Handles a `GetNamedActorInfo` RPC: resolves the name to an actor id with
/// `GetActorIDByName` and then fetches that actor's record from the actor table.
///
/// \param request Request carrying the actor name.
/// \param reply Reply message; receives the actor table data when a record is found.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleGetNamedActorInfo(
    const rpc::GetNamedActorInfoRequest &request, rpc::GetNamedActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const std::string &name = request.name();
  RAY_LOG(DEBUG) << "Getting actor info"
                 << ", name = " << name;

  // Fills in the reply (or logs the failure) and answers the RPC with `status`.
  auto on_done = [name, reply, send_reply_callback](
                     const Status &status,
                     const boost::optional<ActorTableData> &result) {
    if (status.ok()) {
      if (result) {
        reply->mutable_actor_table_data()->CopyFrom(*result);
      }
    } else {
      RAY_LOG(ERROR) << "Failed to get actor info: " << status.ToString()
                     << ", name = " << name;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
  };

  // Try to look up the actor ID for the named actor.
  ActorID actor_id = GetActorIDByName(name);
  if (actor_id.IsNil()) {
    // The named actor was not found.
    std::stringstream stream;
    stream << "Actor with name '" << name << "' was not found.";
    on_done(Status::NotFound(stream.str()), boost::none);
    return;
  }

  // Emit the "finished" log when the lookup actually completes successfully,
  // instead of immediately after the asynchronous request has been issued.
  auto on_lookup_done = [actor_id, on_done](
                            const Status &status,
                            const boost::optional<ActorTableData> &result) {
    if (status.ok()) {
      RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
                     << ", actor id = " << actor_id;
    }
    on_done(status, result);
  };

  // Look up the actor_id in the GCS.
  Status status = gcs_table_storage_->ActorTable().Get(actor_id, on_lookup_done);
  if (!status.ok()) {
    on_lookup_done(status, boost::none);
  }
}
/// Handles a `RegisterActorInfo` RPC: writes the supplied actor table data to the
/// actor table and, when the write succeeds, publishes it on the actor channel.
///
/// \param request Request carrying the actor table data to store.
/// \param reply Reply message whose status mirrors the outcome of the write.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleRegisterActorInfo(
    const rpc::RegisterActorInfoRequest &request, rpc::RegisterActorInfoReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const auto &actor_table_data = request.actor_table_data();
  const ActorID actor_id = ActorID::FromBinary(actor_table_data.actor_id());
  RAY_LOG(DEBUG) << "Registering actor info, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id;

  // `actor_table_data` is captured by value, so the lambda holds its own copy.
  auto on_stored = [this, actor_id, actor_table_data, reply,
                    send_reply_callback](const Status &put_status) {
    if (put_status.ok()) {
      RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
                                         actor_table_data.SerializeAsString(), nullptr));
      RAY_LOG(DEBUG) << "Finished registering actor info, job id = " << actor_id.JobId()
                     << ", actor id = " << actor_id;
    } else {
      RAY_LOG(ERROR) << "Failed to register actor info: " << put_status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, put_status);
  };

  const Status dispatch_status =
      gcs_table_storage_->ActorTable().Put(actor_id, actor_table_data, on_stored);
  if (!dispatch_status.ok()) {
    // The write could not be issued: report the failure right away.
    on_stored(dispatch_status);
  }
}
/// Handles an `UpdateActorInfo` RPC: overwrites the actor's record in the actor
/// table and, when the write succeeds, publishes the new data on the actor channel.
///
/// \param request Request carrying the actor id and the new actor table data.
/// \param reply Reply message whose status mirrors the outcome of the write.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &request,
                                            rpc::UpdateActorInfoReply *reply,
                                            rpc::SendReplyCallback send_reply_callback) {
  // The key comes from the request's own actor_id field, not from the table data.
  const ActorID actor_id = ActorID::FromBinary(request.actor_id());
  RAY_LOG(DEBUG) << "Updating actor info, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id;
  const auto &actor_table_data = request.actor_table_data();

  // `actor_table_data` is captured by value, so the lambda holds its own copy.
  auto on_stored = [this, actor_id, actor_table_data, reply,
                    send_reply_callback](const Status &put_status) {
    if (put_status.ok()) {
      RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
                                         actor_table_data.SerializeAsString(), nullptr));
      RAY_LOG(DEBUG) << "Finished updating actor info, job id = " << actor_id.JobId()
                     << ", actor id = " << actor_id;
    } else {
      RAY_LOG(ERROR) << "Failed to update actor info: " << put_status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, put_status);
  };

  const Status dispatch_status =
      gcs_table_storage_->ActorTable().Put(actor_id, actor_table_data, on_stored);
  if (!dispatch_status.ok()) {
    // The write could not be issued: report the failure right away.
    on_stored(dispatch_status);
  }
}
/// Handles an `AddActorCheckpoint` RPC in three chained storage steps:
///   1. store the checkpoint data in the checkpoint table,
///   2. read the actor's current checkpoint-id list,
///   3. append the new checkpoint id (with a timestamp) and write the list back.
/// The reply is sent when step 3 completes, or as soon as step 1 fails.
///
/// \param request Request carrying the checkpoint data.
/// \param reply Reply message whose status reports the outcome.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleAddActorCheckpoint(
    const rpc::AddActorCheckpointRequest &request, rpc::AddActorCheckpointReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  ActorID actor_id = ActorID::FromBinary(request.checkpoint_data().actor_id());
  ActorCheckpointID checkpoint_id =
      ActorCheckpointID::FromBinary(request.checkpoint_data().checkpoint_id());
  RAY_LOG(DEBUG) << "Adding actor checkpoint, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id << ", checkpoint id = " << checkpoint_id;

  auto on_done = [this, actor_id, checkpoint_id, reply,
                  send_reply_callback](const Status &status) {
    if (!status.ok()) {
      RAY_LOG(ERROR) << "Failed to add actor checkpoint: " << status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id
                     << ", checkpoint id = " << checkpoint_id;
      // Answer the RPC on failure as well; otherwise the caller never gets a reply.
      GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
      return;
    }

    auto on_get_done = [this, actor_id, checkpoint_id, reply, send_reply_callback](
                           const Status &status,
                           const boost::optional<ActorCheckpointIdData> &result) {
      // Start from the stored id list if there is one, else from a fresh record.
      // NOTE(review): `status` of the read is not inspected here — confirm that a
      // failed read should be treated the same as "no record yet".
      ActorCheckpointIdData actor_checkpoint_id;
      if (result) {
        actor_checkpoint_id.CopyFrom(*result);
      } else {
        actor_checkpoint_id.set_actor_id(actor_id.Binary());
      }
      actor_checkpoint_id.add_checkpoint_ids(checkpoint_id.Binary());
      // Nanoseconds divided by 1e6, i.e. a millisecond timestamp.
      actor_checkpoint_id.add_timestamps(absl::GetCurrentTimeNanos() / 1000000);

      auto on_put_done = [actor_id, checkpoint_id, reply,
                          send_reply_callback](const Status &status) {
        RAY_LOG(DEBUG) << "Finished adding actor checkpoint, job id = "
                       << actor_id.JobId() << ", actor id = " << actor_id
                       << ", checkpoint id = " << checkpoint_id;
        GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
      };
      RAY_CHECK_OK(gcs_table_storage_->ActorCheckpointIdTable().Put(
          actor_id, actor_checkpoint_id, on_put_done));
    };
    RAY_CHECK_OK(
        gcs_table_storage_->ActorCheckpointIdTable().Get(actor_id, on_get_done));
  };

  Status status = gcs_table_storage_->ActorCheckpointTable().Put(
      checkpoint_id, request.checkpoint_data(), on_done);
  if (!status.ok()) {
    on_done(status);
  }
}
/// Handles a `GetActorCheckpoint` RPC by fetching one checkpoint record from the
/// checkpoint table.
///
/// \param request Request carrying the actor id and the checkpoint id.
/// \param reply Reply message; receives the checkpoint data when a record is found.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleGetActorCheckpoint(
    const rpc::GetActorCheckpointRequest &request, rpc::GetActorCheckpointReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  // The actor id is used for logging only; the table is keyed by checkpoint id.
  const ActorID actor_id = ActorID::FromBinary(request.actor_id());
  const ActorCheckpointID checkpoint_id =
      ActorCheckpointID::FromBinary(request.checkpoint_id());
  RAY_LOG(DEBUG) << "Getting actor checkpoint, job id = " << actor_id.JobId()
                 << ", checkpoint id = " << checkpoint_id;

  auto reply_with_checkpoint =
      [actor_id, checkpoint_id, reply, send_reply_callback](
          const Status &get_status,
          const boost::optional<ActorCheckpointData> &checkpoint) {
        if (!get_status.ok()) {
          RAY_LOG(ERROR) << "Failed to get actor checkpoint: " << get_status.ToString()
                         << ", job id = " << actor_id.JobId()
                         << ", checkpoint id = " << checkpoint_id;
        } else {
          if (checkpoint) {
            reply->mutable_checkpoint_data()->CopyFrom(*checkpoint);
          }
          RAY_LOG(DEBUG) << "Finished getting actor checkpoint, job id = " << actor_id.JobId()
                         << ", checkpoint id = " << checkpoint_id;
        }
        GCS_RPC_SEND_REPLY(send_reply_callback, reply, get_status);
      };

  const Status dispatch_status = gcs_table_storage_->ActorCheckpointTable().Get(
      checkpoint_id, reply_with_checkpoint);
  if (!dispatch_status.ok()) {
    // The read could not be issued: report the failure right away.
    reply_with_checkpoint(dispatch_status, boost::none);
  }
}
/// Handles a `GetActorCheckpointID` RPC by fetching the actor's checkpoint-id
/// record from the checkpoint-id table.
///
/// \param request Request carrying the binary actor id.
/// \param reply Reply message; receives the checkpoint id data when a record is found.
/// \param send_reply_callback Callback used by GCS_RPC_SEND_REPLY to send the reply.
void GcsActorManager::HandleGetActorCheckpointID(
    const rpc::GetActorCheckpointIDRequest &request,
    rpc::GetActorCheckpointIDReply *reply, rpc::SendReplyCallback send_reply_callback) {
  const ActorID actor_id = ActorID::FromBinary(request.actor_id());
  RAY_LOG(DEBUG) << "Getting actor checkpoint id, job id = " << actor_id.JobId()
                 << ", actor id = " << actor_id;

  auto reply_with_ids = [actor_id, reply, send_reply_callback](
                            const Status &get_status,
                            const boost::optional<ActorCheckpointIdData> &id_data) {
    if (!get_status.ok()) {
      RAY_LOG(ERROR) << "Failed to get actor checkpoint id: " << get_status.ToString()
                     << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
    } else {
      if (id_data) {
        reply->mutable_checkpoint_id_data()->CopyFrom(*id_data);
      }
      RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, job id = "
                     << actor_id.JobId() << ", actor id = " << actor_id;
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, get_status);
  };

  const Status dispatch_status =
      gcs_table_storage_->ActorCheckpointIdTable().Get(actor_id, reply_with_ids);
  if (!dispatch_status.ok()) {
    // The read could not be issued: report the failure right away.
    reply_with_ids(dispatch_status, boost::none);
  }
}
Status GcsActorManager::RegisterActor(
const ray::rpc::CreateActorRequest &request,
std::function<void(std::shared_ptr<GcsActor>)> callback) {
@ -273,13 +540,13 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_actor_table_.Put(actor->GetActorID(), *actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
}));
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(), *actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
}));
}
void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
@ -381,7 +648,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
mutable_actor_table_data->set_num_restarts(++num_restarts);
mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_actor_table_.Put(
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor_id, mutable_actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
@ -392,7 +659,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
} else {
mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_actor_table_.Put(
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor_id, mutable_actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
@ -418,7 +685,7 @@ void GcsActorManager::OnActorCreationSuccess(std::shared_ptr<GcsActor> actor) {
actor->UpdateState(rpc::ActorTableData::ALIVE);
auto actor_table_data = actor->GetActorTableData();
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_actor_table_.Put(
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(),

View file

@ -26,6 +26,8 @@
#include "gcs_actor_scheduler.h"
#include "gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
namespace ray {
namespace gcs {
@ -112,20 +114,56 @@ class GcsActor {
using RegisterActorCallback = std::function<void(std::shared_ptr<GcsActor>)>;
/// GcsActorManager is responsible for managing the lifecycle of all actors.
/// This class is not thread-safe.
class GcsActorManager {
class GcsActorManager : public rpc::ActorInfoHandler {
public:
/// Create a GcsActorManager
///
/// \param scheduler Used to schedule actor creation tasks.
/// \param gcs_actor_table Used to flush actor data to storage.
/// \param gcs_table_storage Used to flush actor data to storage.
/// \param gcs_pub_sub Used to publish gcs message.
GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
gcs::GcsActorTable &gcs_actor_table,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
~GcsActorManager() = default;
void HandleCreateActor(const rpc::CreateActorRequest &request,
rpc::CreateActorReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetActorInfo(const rpc::GetActorInfoRequest &request,
rpc::GetActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetNamedActorInfo(const rpc::GetNamedActorInfoRequest &request,
rpc::GetNamedActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetAllActorInfo(const rpc::GetAllActorInfoRequest &request,
rpc::GetAllActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleRegisterActorInfo(const rpc::RegisterActorInfoRequest &request,
rpc::RegisterActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &request,
rpc::UpdateActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleAddActorCheckpoint(const rpc::AddActorCheckpointRequest &request,
rpc::AddActorCheckpointReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetActorCheckpoint(const rpc::GetActorCheckpointRequest &request,
rpc::GetActorCheckpointReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetActorCheckpointID(const rpc::GetActorCheckpointIDRequest &request,
rpc::GetActorCheckpointIDReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Register actor asynchronously.
///
/// \param request Contains the meta info to create the actor.
@ -234,8 +272,8 @@ class GcsActorManager {
/// The scheduler to schedule all registered actors.
std::shared_ptr<gcs::GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Actor table. Used to update actor information upon creation, deletion, etc.
gcs::GcsActorTable &gcs_actor_table_;
/// Used to update actor information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Factory to produce clients to workers. This is used to communicate with

View file

@ -14,7 +14,6 @@
#include "gcs_server.h"
#include "actor_info_handler_impl.h"
#include "error_info_handler_impl.h"
#include "gcs_actor_manager.h"
#include "gcs_node_manager.h"
@ -64,9 +63,8 @@ void GcsServer::Start() {
job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *job_info_handler_));
rpc_server_.RegisterService(*job_info_service_);
actor_info_handler_ = InitActorInfoHandler();
actor_info_service_.reset(
new rpc::ActorInfoGrpcService(main_service_, *actor_info_handler_));
new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_));
rpc_server_.RegisterService(*actor_info_service_);
node_info_service_.reset(
@ -166,8 +164,7 @@ void GcsServer::InitGcsActorManager() {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_actor_manager_ = std::make_shared<GcsActorManager>(
scheduler, gcs_table_storage_->ActorTable(), gcs_pub_sub_,
[this](const rpc::Address &address) {
scheduler, gcs_table_storage_, gcs_pub_sub_, [this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_node_manager_->AddNodeAddedListener(
@ -201,11 +198,6 @@ std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {
new rpc::DefaultJobInfoHandler(gcs_table_storage_, gcs_pub_sub_));
}
std::unique_ptr<rpc::ActorInfoHandler> GcsServer::InitActorInfoHandler() {
return std::unique_ptr<rpc::DefaultActorInfoHandler>(new rpc::DefaultActorInfoHandler(
gcs_table_storage_, *gcs_actor_manager_, gcs_pub_sub_));
}
std::unique_ptr<rpc::ObjectInfoHandler> GcsServer::InitObjectInfoHandler() {
return std::unique_ptr<GcsObjectManager>(
new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_));

View file

@ -76,15 +76,12 @@ class GcsServer {
/// cluster.
virtual void InitGcsNodeManager();
/// Initialize the gcs node manager.
/// Initialize the gcs actor manager.
virtual void InitGcsActorManager();
/// The job info handler
virtual std::unique_ptr<rpc::JobInfoHandler> InitJobInfoHandler();
/// The actor info handler
virtual std::unique_ptr<rpc::ActorInfoHandler> InitActorInfoHandler();
/// The object info handler
virtual std::unique_ptr<rpc::ObjectInfoHandler> InitObjectInfoHandler();
@ -125,8 +122,7 @@ class GcsServer {
/// Job info handler and service
std::unique_ptr<rpc::JobInfoHandler> job_info_handler_;
std::unique_ptr<rpc::JobInfoGrpcService> job_info_service_;
/// Actor info handler and service
std::unique_ptr<rpc::ActorInfoHandler> actor_info_handler_;
/// Actor info service
std::unique_ptr<rpc::ActorInfoGrpcService> actor_info_service_;
/// Node info handler and service
std::unique_ptr<rpc::NodeInfoGrpcService> node_info_service_;

View file

@ -74,16 +74,15 @@ class GcsActorManagerTest : public ::testing::Test {
worker_client_(new MockWorkerClient()) {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_actor_manager_.reset(new gcs::GcsActorManager(
mock_actor_scheduler_, *gcs_actor_table_, gcs_pub_sub_,
mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_,
[&](const rpc::Address &addr) { return worker_client_; }));
}
boost::asio::io_service io_service_;
std::shared_ptr<gcs::StoreClient> store_client_;
std::shared_ptr<GcsServerMocker::MockedGcsActorTable> gcs_actor_table_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<MockActorScheduler> mock_actor_scheduler_;
std::shared_ptr<MockWorkerClient> worker_client_;
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;