[Pubsub] Wrap Redis-based publisher in GCS to allow incrementally switching to the GCS-based publisher (#19600)

## Why are these changes needed?
The most significant change of the PR is the `GcsPublisher` wrapper added to `src/ray/gcs/pubsub/gcs_pub_sub.h`. It forwards publishing to the underlying `GcsPubSub` (Redis-based) or `pubsub::Publisher` (GCS-based) depending on the migration status, so it allows incremental migration by channel.
   -  Since we decided to use typed IDs and messages for GCS-based publishing, each member function of `GcsPublisher` accepts a typed message.

Most of the modified files are from migrating publishing logic in GCS to use `GcsPublisher` instead of `GcsPubSub`.

Later on, `GcsPublisher` member functions will be migrated to use GCS-based publishing.

This change should make no functional difference. If this looks OK, a similar change will be made for subscribers in the GCS client.

## Related issue number
This commit is contained in:
mwtian 2021-10-22 10:52:36 -07:00 committed by GitHub
parent 0760fe869d
commit 530f2d7c5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 347 additions and 236 deletions

View file

@ -485,20 +485,17 @@ cc_binary(
cc_library(
name = "gcs_pub_sub_lib",
srcs = glob(
[
srcs = [
"src/ray/gcs/pubsub/gcs_pub_sub.cc",
],
),
hdrs = glob(
[
hdrs = [
"src/ray/gcs/pubsub/gcs_pub_sub.h",
],
),
copts = COPTS,
strip_include_prefix = "src",
deps = [
":gcs",
":pubsub_lib",
":ray_common",
":redis_client",
],

View file

@ -34,8 +34,8 @@ const ResourceSet &GcsActorWorkerAssignment::GetResources() const {
bool GcsActorWorkerAssignment::IsShared() const { return is_shared_; }
GcsBasedActorScheduler::GcsBasedActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
@ -43,7 +43,7 @@ GcsBasedActorScheduler::GcsBasedActorScheduler(
schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory)
: GcsActorScheduler(io_context, gcs_actor_table, gcs_node_manager, gcs_pub_sub,
: GcsActorScheduler(io_context, gcs_actor_table, gcs_node_manager,
schedule_failure_handler, schedule_success_handler,
raylet_client_pool, client_factory),
gcs_resource_manager_(std::move(gcs_resource_manager)),

View file

@ -81,8 +81,8 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
/// \param client_factory Factory to create remote core worker client, default factor
/// will be used if not set.
explicit GcsBasedActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,

View file

@ -109,8 +109,8 @@ void GcsActor::SetActorWorkerAssignment(
GcsActorManager::GcsActorManager(
boost::asio::io_context &io_context,
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub, RuntimeEnvManager &runtime_env_manager,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher, RuntimeEnvManager &runtime_env_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
std::function<std::string(const JobID &)> get_ray_namespace,
std::function<void(std::function<void(void)>, boost::posix_time::milliseconds)>
@ -119,7 +119,7 @@ GcsActorManager::GcsActorManager(
: io_context_(io_context),
gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
gcs_publisher_(std::move(gcs_publisher)),
worker_client_factory_(worker_client_factory),
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed),
get_ray_namespace_(get_ray_namespace),
@ -373,8 +373,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
absl::GetCurrentTimeNanos(), job_id);
RAY_LOG(WARNING) << error_data_ptr->SerializeAsString();
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, job_id.Hex(),
error_data_ptr->SerializeAsString(), nullptr));
RAY_CHECK_OK(
gcs_publisher_->PublishError(job_id.Hex(), *error_data_ptr, nullptr));
}
actors_in_namespace.emplace(actor->GetName(), actor->GetActorID());
@ -420,9 +420,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
// the actor state to DEAD to avoid race condition.
return;
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(),
actor->GetActorTableData().SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(actor->GetActorID(),
actor->GetActorTableData(), nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_register_callbacks_.
@ -491,8 +490,7 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION);
const auto &actor_table_data = actor->GetActorTableData();
// Pub this state for dashboard showing.
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(actor_id, actor_table_data, nullptr));
RemoveUnresolvedActor(actor);
// Update the registered actor as its creation task specification may have changed due
@ -668,9 +666,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(), *actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*actor_table_data)->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr));
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
}));
@ -867,10 +864,8 @@ void GcsActorManager::ReconstructActor(
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor_id, mutable_actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
}));
gcs_actor_scheduler_->Schedule(actor);
} else {
@ -907,10 +902,8 @@ void GcsActorManager::ReconstructActor(
if (actor->IsDetached()) {
DestroyActor(actor_id);
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
}));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
@ -958,9 +951,8 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &ac
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, actor_table_data,
[this, actor_id, actor_table_data, actor, reply](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(actor_table_data)->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_create_callbacks_.

View file

@ -193,12 +193,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
///
/// \param scheduler Used to schedule actor creation tasks.
/// \param gcs_table_storage Used to flush actor data to storage.
/// \param gcs_pub_sub Used to publish gcs message.
/// \param gcs_publisher Used to publish gcs message.
GcsActorManager(
boost::asio::io_context &io_context,
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub, RuntimeEnvManager &runtime_env_manager,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher, RuntimeEnvManager &runtime_env_manager,
std::function<void(const ActorID &)> destroy_ownded_placement_group_if_needed,
std::function<std::string(const JobID &)> get_ray_namespace,
std::function<void(std::function<void(void)>, boost::posix_time::milliseconds)>
@ -494,11 +494,11 @@ class GcsActorManager : public rpc::ActorInfoHandler {
boost::asio::io_context &io_context_;
/// The scheduler to schedule all registered actors.
std::shared_ptr<gcs::GcsActorSchedulerInterface> gcs_actor_scheduler_;
std::shared_ptr<GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Used to update actor information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;

View file

@ -24,9 +24,8 @@ namespace ray {
namespace gcs {
GcsActorScheduler::GcsActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const gcs::GcsNodeManager &gcs_node_manager,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>, const rpc::PushTaskReply &reply)>
schedule_success_handler,
@ -35,7 +34,6 @@ GcsActorScheduler::GcsActorScheduler(
: io_context_(io_context),
gcs_actor_table_(gcs_actor_table),
gcs_node_manager_(gcs_node_manager),
gcs_pub_sub_(std::move(gcs_pub_sub)),
schedule_failure_handler_(std::move(schedule_failure_handler)),
schedule_success_handler_(std::move(schedule_success_handler)),
raylet_client_pool_(raylet_client_pool),

View file

@ -100,8 +100,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \param client_factory Factory to create remote core worker client, default factor
/// will be used if not set.
explicit GcsActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>, const rpc::PushTaskReply &reply)>
schedule_success_handler,
@ -305,8 +305,6 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
node_to_workers_when_creating_;
/// Reference of GcsNodeManager.
const GcsNodeManager &gcs_node_manager_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// The handler to handle the scheduling failures.
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler_;
/// The handler to handle the successful scheduling.

View file

@ -46,9 +46,7 @@ void GcsJobManager::HandleAddJob(const rpc::AddJobRequest &request,
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(),
mutable_job_table_data.SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, mutable_job_table_data, nullptr));
if (mutable_job_table_data.config().has_runtime_env()) {
runtime_env_manager_.AddURIReference(
job_id.Hex(), mutable_job_table_data.config().runtime_env());
@ -81,8 +79,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(),
job_table_data.SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr));
runtime_env_manager_.RemoveURIReference(job_id.Hex());
ClearJobInfos(job_id);
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
@ -159,8 +156,7 @@ void GcsJobManager::HandleReportJobError(const rpc::ReportJobErrorRequest &reque
rpc::ReportJobErrorReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto job_id = JobID::FromBinary(request.job_error().job_id());
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, job_id.Hex(),
request.job_error().SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishError(job_id.Hex(), request.job_error(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

View file

@ -26,11 +26,11 @@ namespace gcs {
/// This implementation class of `JobInfoHandler`.
class GcsJobManager : public rpc::JobInfoHandler {
public:
explicit GcsJobManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
explicit GcsJobManager(std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
RuntimeEnvManager &runtime_env_manager)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
gcs_publisher_(std::move(gcs_publisher)),
runtime_env_manager_(runtime_env_manager) {}
void Initialize(const GcsInitData &gcs_init_data);
@ -60,8 +60,8 @@ class GcsJobManager : public rpc::JobInfoHandler {
std::string GetRayNamespace(const JobID &job_id) const;
private:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Listeners which monitors the finish of jobs.
std::vector<std::function<void(std::shared_ptr<JobID>)>> job_finished_listeners_;

View file

@ -25,9 +25,9 @@ namespace ray {
namespace gcs {
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage) {}
GcsNodeManager::GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<GcsTableStorage> gcs_table_storage)
: gcs_publisher_(gcs_publisher), gcs_table_storage_(gcs_table_storage) {}
void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
rpc::RegisterNodeReply *reply,
@ -40,8 +40,7 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Finished registering node info, node id = " << node_id
<< ", address = " << request.node_info().node_manager_address();
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
request.node_info().SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, request.node_info(), nullptr));
AddNode(std::make_shared<rpc::GcsNodeInfo>(request.node_info()));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -68,8 +67,7 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ
send_reply_callback](const Status &status) {
auto on_done = [this, node_id, node_info_delta, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
};
@ -159,14 +157,13 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
<< " has been marked dead because the detector"
<< " has missed too many heartbeats from it. This can happen when a "
"raylet crashes unexpectedly or has lagging heartbeats.";
auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
RAY_EVENT(ERROR, EL_RAY_NODE_REMOVED)
.WithField("node_id", node_id.Hex())
.WithField("ip", removed_node->node_manager_address())
<< error_message.str();
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, node_id.Hex(),
error_data_ptr->SerializeAsString(), nullptr));
auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
RAY_CHECK_OK(gcs_publisher_->PublishError(node_id.Hex(), *error_data_ptr, nullptr));
}
// Notify all listeners.
@ -189,8 +186,7 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id) {
auto on_done = [this, node_id, node_info_delta](const Status &status) {
auto on_done = [this, node_id, node_info_delta](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
};
RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};

View file

@ -36,10 +36,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
public:
/// Create a GcsNodeManager.
///
/// \param gcs_pub_sub GCS message publisher.
/// \param gcs_publisher GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsNodeManager(std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
explicit GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<GcsTableStorage> gcs_table_storage);
/// Handle register rpc request come from raylet.
void HandleRegisterNode(const rpc::RegisterNodeRequest &request,
@ -138,9 +138,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
node_removed_listeners_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
// Debug info.
enum CountType {

View file

@ -96,8 +96,7 @@ void GcsObjectManager::HandleAddObjectLocation(
notification.set_spilled_node_id(spilled_node_id.Binary());
}
notification.set_size(size);
RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(),
notification.SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishObject(object_id, notification, nullptr));
RAY_LOG(DEBUG) << "Finished adding object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id << ", task id = " << object_id.TaskId()
@ -136,9 +135,8 @@ void GcsObjectManager::HandleRemoveObjectLocation(
auto on_done = [this, object_id, node_id, reply,
send_reply_callback](const Status &status) {
if (status.ok()) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
OBJECT_CHANNEL, object_id.Hex(),
gcs::CreateObjectLocationChange(node_id, false)->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishObject(
object_id, *gcs::CreateObjectLocationChange(node_id, false), nullptr));
RAY_LOG(DEBUG) << "Finished removing object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id;

View file

@ -25,10 +25,10 @@ namespace gcs {
class GcsObjectManager : public rpc::ObjectInfoHandler {
public:
explicit GcsObjectManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub,
explicit GcsObjectManager(std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> &gcs_publisher,
gcs::GcsNodeManager &gcs_node_manager)
: gcs_table_storage_(std::move(gcs_table_storage)), gcs_pub_sub_(gcs_pub_sub) {
: gcs_table_storage_(std::move(gcs_table_storage)), gcs_publisher_(gcs_publisher) {
gcs_node_manager.AddNodeRemovedListener(
[this](const std::shared_ptr<rpc::GcsNodeInfo> &node) {
// All of the related actors should be reconstructed when a node is removed from
@ -141,7 +141,7 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
absl::flat_hash_map<NodeID, ObjectSet> node_to_objects_ GUARDED_BY(mutex_);
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
// Debug info.
enum CountType {

View file

@ -21,10 +21,10 @@ namespace ray {
namespace gcs {
GcsResourceManager::GcsResourceManager(
instrumented_io_context &main_io_service, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &main_io_service, std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage, bool redis_broadcast_enabled)
: periodical_runner_(main_io_service),
gcs_pub_sub_(gcs_pub_sub),
gcs_publisher_(gcs_publisher),
gcs_table_storage_(gcs_table_storage),
redis_broadcast_enabled_(redis_broadcast_enabled),
max_broadcasting_batch_size_(
@ -89,9 +89,8 @@ void GcsResourceManager::HandleUpdateResources(
node_resource_change.mutable_updated_resources()->insert(changed_resources->begin(),
changed_resources->end());
if (redis_broadcast_enabled_) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(),
node_resource_change.SerializeAsString(),
nullptr));
RAY_CHECK_OK(
gcs_publisher_->PublishNodeResource(node_id, node_resource_change, nullptr));
} else {
absl::MutexLock guard(&resource_buffer_mutex_);
resources_buffer_proto_.add_batch()->mutable_change()->Swap(
@ -145,9 +144,8 @@ void GcsResourceManager::HandleDeleteResources(
node_resource_change.add_deleted_resources(resource_name);
}
if (redis_broadcast_enabled_) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(),
node_resource_change.SerializeAsString(),
nullptr));
RAY_CHECK_OK(
gcs_publisher_->PublishNodeResource(node_id, node_resource_change, nullptr));
} else {
absl::MutexLock guard(&resource_buffer_mutex_);
resources_buffer_proto_.add_batch()->mutable_change()->Swap(
@ -421,8 +419,7 @@ void GcsResourceManager::SendBatchedResourceUsage() {
rpc::ResourceUsageBatchData batch;
GetResourceUsageBatchForBroadcast_Locked(batch);
if (batch.ByteSizeLong() > 0) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(RESOURCES_BATCH_CHANNEL, "",
batch.SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishResourceBatch(batch, nullptr));
stats::OutboundHeartbeatSizeKB.Record(batch.ByteSizeLong() / 1024.0);
}
}

View file

@ -40,10 +40,10 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
/// Create a GcsResourceManager.
///
/// \param main_io_service The main event loop.
/// \param gcs_pub_sub GCS message publisher.
/// \param gcs_publisher GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsResourceManager(instrumented_io_context &main_io_service,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
bool redis_broadcast_enabled);
@ -199,7 +199,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
GUARDED_BY(resource_buffer_mutex_);
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Whether or not to broadcast resource usage via redis.

View file

@ -26,6 +26,7 @@
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/gcs/gcs_server/stats_handler_impl.h"
#include "ray/gcs/gcs_server/task_info_handler_impl.h"
#include "ray/pubsub/publisher.h"
namespace ray {
namespace gcs {
@ -60,12 +61,12 @@ void GcsServer::Start() {
main_service_, redis_client_->GetPrimaryContext(), [this]() { Stop(); });
gcs_redis_failure_detector_->Start();
// Init gcs pub sub instance.
gcs_pub_sub_ = std::make_shared<gcs::GcsPubSub>(redis_client_);
// Init GCS publisher instance.
std::unique_ptr<pubsub::Publisher> inner_publisher;
if (config_.grpc_pubsub_enabled) {
// Init grpc based pubsub
grpc_pubsub_publisher_ = std::make_shared<pubsub::Publisher>(
// Init grpc based pubsub on GCS.
// TODO: Move this into GcsPublisher.
inner_publisher = std::make_unique<pubsub::Publisher>(
/*channels=*/std::vector<
rpc::ChannelType>{rpc::ChannelType::WORKER_OBJECT_EVICTION,
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL,
@ -75,6 +76,8 @@ void GcsServer::Start() {
/*subscriber_timeout_ms=*/RayConfig::instance().subscriber_timeout_ms(),
/*publish_batch_size_=*/RayConfig::instance().publish_batch_size());
}
gcs_publisher_ =
std::make_shared<GcsPublisher>(redis_client_, std::move(inner_publisher));
// Init gcs table storage.
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
@ -181,8 +184,9 @@ void GcsServer::Stop() {
}
void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(redis_client_ && gcs_table_storage_ && gcs_pub_sub_);
gcs_node_manager_ = std::make_shared<GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
RAY_CHECK(redis_client_ && gcs_table_storage_ && gcs_publisher_);
gcs_node_manager_ =
std::make_shared<GcsNodeManager>(gcs_publisher_, gcs_table_storage_);
// Initialize by gcs tables data.
gcs_node_manager_->Initialize(gcs_init_data);
// Register service.
@ -209,9 +213,9 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_resource_manager_ = std::make_shared<GcsResourceManager>(
main_service_, gcs_pub_sub_, gcs_table_storage_,
main_service_, gcs_publisher_, gcs_table_storage_,
!config_.grpc_based_resource_broadcast);
// Initialize by gcs tables data.
gcs_resource_manager_->Initialize(gcs_init_data);
@ -228,8 +232,8 @@ void GcsServer::InitGcsResourceScheduler() {
}
void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
gcs_job_manager_ = std::make_unique<GcsJobManager>(gcs_table_storage_, gcs_pub_sub_,
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_job_manager_ = std::make_unique<GcsJobManager>(gcs_table_storage_, gcs_publisher_,
*runtime_env_manager_);
gcs_job_manager_->Initialize(gcs_init_data);
@ -240,7 +244,7 @@ void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_);
RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_);
std::unique_ptr<GcsActorSchedulerInterface> scheduler;
auto schedule_failure_handler = [this](std::shared_ptr<GcsActor> actor) {
// When there are no available nodes to schedule the actor the
@ -260,17 +264,17 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
RAY_CHECK(gcs_resource_manager_ && gcs_resource_scheduler_);
scheduler = std::make_unique<GcsBasedActorScheduler>(
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_,
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_,
gcs_resource_manager_, gcs_resource_scheduler_, schedule_failure_handler,
schedule_success_handler, raylet_client_pool_, client_factory);
} else {
scheduler = std::make_unique<RayletBasedActorScheduler>(
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_,
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_,
schedule_failure_handler, schedule_success_handler, raylet_client_pool_,
client_factory);
}
gcs_actor_manager_ = std::make_shared<GcsActorManager>(
main_service_, std::move(scheduler), gcs_table_storage_, gcs_pub_sub_,
main_service_, std::move(scheduler), gcs_table_storage_, gcs_publisher_,
*runtime_env_manager_,
[this](const ActorID &actor_id) {
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
@ -320,9 +324,9 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitObjectManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_);
RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_);
gcs_object_manager_.reset(
new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_));
new GcsObjectManager(gcs_table_storage_, gcs_publisher_, *gcs_node_manager_));
// Initialize by gcs tables data.
gcs_object_manager_->Initialize(gcs_init_data);
// Register service.
@ -347,9 +351,9 @@ void GcsServer::StoreGcsServerAddressInRedis() {
}
void GcsServer::InitTaskInfoHandler() {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
task_info_handler_.reset(
new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_pub_sub_));
new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_publisher_));
// Register service.
task_info_service_.reset(
new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_));
@ -424,7 +428,7 @@ void GcsServer::InitRuntimeEnvManager() {
void GcsServer::InitGcsWorkerManager() {
gcs_worker_manager_ =
std::make_unique<GcsWorkerManager>(gcs_table_storage_, gcs_pub_sub_);
std::make_unique<GcsWorkerManager>(gcs_table_storage_, gcs_publisher_);
// Register service.
worker_info_service_.reset(
new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_));
@ -508,7 +512,7 @@ void GcsServer::PrintDebugInfo() {
<< gcs_actor_manager_->DebugString() << "\n"
<< gcs_object_manager_->DebugString() << "\n"
<< gcs_placement_group_manager_->DebugString() << "\n"
<< gcs_pub_sub_->DebugString() << "\n"
<< gcs_publisher_->DebugString() << "\n"
<< ((rpc::DefaultTaskInfoHandler *)task_info_handler_.get())->DebugString();
if (config_.grpc_based_resource_broadcast) {

View file

@ -28,7 +28,6 @@
#include "ray/gcs/gcs_server/grpc_based_resource_broadcaster.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_client.h"
#include "ray/pubsub/publisher.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
@ -214,9 +213,7 @@ class GcsServer {
/// Backend client.
std::shared_ptr<RedisClient> redis_client_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Grpc based pubsub.
std::shared_ptr<pubsub::Publisher> grpc_pubsub_publisher_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Grpc based pubsub's periodical runner.
PeriodicalRunner pubsub_periodical_runner_;
/// The gcs table storage.

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/stats/stats.h"
namespace ray {
@ -57,14 +58,11 @@ void GcsWorkerManager::HandleReportWorkerFailure(
stats::UnintentionalWorkerFailures.Record(1);
// Only publish worker_id and raylet_id in address as they are the only fields used
// by sub clients.
auto worker_failure_delta = std::make_shared<rpc::WorkerDeltaData>();
worker_failure_delta->set_worker_id(
worker_failure_data->worker_address().worker_id());
worker_failure_delta->set_raylet_id(
worker_failure_data->worker_address().raylet_id());
RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_CHANNEL, worker_id.Hex(),
worker_failure_delta->SerializeAsString(),
nullptr));
rpc::WorkerDeltaData worker_failure;
worker_failure.set_worker_id(worker_failure_data->worker_address().worker_id());
worker_failure.set_raylet_id(worker_failure_data->worker_address().raylet_id());
RAY_CHECK_OK(
gcs_publisher_->PublishWorkerFailure(worker_id, worker_failure, nullptr));
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};

View file

@ -25,8 +25,8 @@ namespace gcs {
class GcsWorkerManager : public rpc::WorkerInfoHandler {
public:
explicit GcsWorkerManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_table_storage_(gcs_table_storage), gcs_pub_sub_(gcs_pub_sub) {}
std::shared_ptr<GcsPublisher> &gcs_publisher)
: gcs_table_storage_(gcs_table_storage), gcs_publisher_(gcs_publisher) {}
void HandleReportWorkerFailure(const rpc::ReportWorkerFailureRequest &request,
rpc::ReportWorkerFailureReply *reply,
@ -49,7 +49,7 @@ class GcsWorkerManager : public rpc::WorkerInfoHandler {
private:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
std::vector<std::function<void(std::shared_ptr<WorkerTableData>)>>
worker_dead_listeners_;
};

View file

@ -78,9 +78,8 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
RAY_LOG(ERROR) << "Failed to add task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_LEASE_CHANNEL, task_id.Hex(),
request.task_lease_data().SerializeAsString(),
nullptr));
RAY_CHECK_OK(
gcs_publisher_->PublishTaskLease(task_id, request.task_lease_data(), nullptr));
RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id;
}

View file

@ -25,8 +25,8 @@ namespace rpc {
class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
public:
explicit DefaultTaskInfoHandler(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_table_storage_(gcs_table_storage), gcs_pub_sub_(gcs_pub_sub) {}
std::shared_ptr<gcs::GcsPublisher> &gcs_publisher)
: gcs_table_storage_(gcs_table_storage), gcs_publisher_(gcs_publisher) {}
void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply,
SendReplyCallback send_reply_callback) override;
@ -48,7 +48,7 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
private:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> &gcs_publisher_;
// Debug info.
enum CountType {

View file

@ -102,11 +102,12 @@ class GcsActorManagerTest : public ::testing::Test {
worker_client_ = std::make_shared<MockWorkerClient>(io_service_);
runtime_env_mgr_ =
std::make_unique<ray::RuntimeEnvManager>([](auto, auto f) { f(true); });
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_actor_manager_.reset(new gcs::GcsActorManager(
io_service_, mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_,
io_service_, mock_actor_scheduler_, gcs_table_storage_, gcs_publisher_,
*runtime_env_mgr_, [](const ActorID &actor_id) {},
[this](const JobID &job_id) { return job_namespace_table_[job_id]; },
[this](std::function<void(void)> fn, boost::posix_time::milliseconds delay) {
@ -204,7 +205,7 @@ class GcsActorManagerTest : public ::testing::Test {
std::shared_ptr<MockWorkerClient> worker_client_;
std::unordered_map<JobID, std::string> job_namespace_table_;
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::unique_ptr<ray::RuntimeEnvManager> runtime_env_mgr_;
const std::chrono::milliseconds timeout_ms_{2000};

View file

@ -21,7 +21,6 @@
#include "mock/ray/gcs/gcs_server/gcs_node_manager.h"
#include "mock/ray/raylet_client/raylet_client.h"
#include "mock/ray/pubsub/subscriber.h"
#include "mock/ray/gcs/pubsub/gcs_pub_sub.h"
#include "mock/ray/rpc/worker/core_worker_client.h"
// clang-format on
using namespace ::testing;
@ -39,13 +38,12 @@ class GcsActorSchedulerTest : public Test {
store_client = std::make_shared<MockStoreClient>();
actor_table = std::make_unique<GcsActorTable>(store_client);
gcs_node_manager = std::make_unique<MockGcsNodeManager>();
pub_sub = std::make_shared<MockGcsPubSub>();
raylet_client = std::make_shared<MockRayletClientInterface>();
core_worker_client = std::make_shared<rpc::MockCoreWorkerClientInterface>();
client_pool = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &) { return raylet_client; });
actor_scheduler = std::make_unique<RayletBasedActorScheduler>(
io_context, *actor_table, *gcs_node_manager, pub_sub,
io_context, *actor_table, *gcs_node_manager,
[this](auto a) { schedule_failure_handler(a); },
[this](auto a, const rpc::PushTaskReply) { schedule_success_handler(a); },
client_pool, [this](const rpc::Address &) { return core_worker_client; });
@ -62,7 +60,6 @@ class GcsActorSchedulerTest : public Test {
std::unique_ptr<GcsActorTable> actor_table;
std::unique_ptr<GcsActorScheduler> actor_scheduler;
std::unique_ptr<MockGcsNodeManager> gcs_node_manager;
std::shared_ptr<MockGcsPubSub> pub_sub;
std::shared_ptr<rpc::MockCoreWorkerClientInterface> core_worker_client;
std::shared_ptr<rpc::NodeManagerClientPool> client_pool;
MockCallback schedule_failure_handler;

View file

@ -29,23 +29,24 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
void SetUp() override {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
worker_client_ = std::make_shared<GcsServerMocker::MockWorkerClient>();
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
std::make_shared<gcs::GcsNodeManager>(gcs_publisher_, gcs_table_storage_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
raylet_client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &addr) { return raylet_client_; });
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, gcs_pub_sub_, gcs_table_storage_, true);
io_service_, gcs_publisher_, gcs_table_storage_, true);
auto resource_scheduler =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
gcs_actor_scheduler_ =
std::make_shared<GcsServerMocker::MockedGcsBasedActorScheduler>(
io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_pub_sub_,
gcs_resource_manager_, resource_scheduler,
io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_resource_manager_,
resource_scheduler,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
failure_actors_.emplace_back(std::move(actor));
@ -99,7 +100,7 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockedGcsBasedActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
std::vector<std::shared_ptr<gcs::GcsActor>> failure_actors_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;

View file

@ -22,17 +22,18 @@ namespace ray {
class GcsNodeManagerTest : public ::testing::Test {
public:
GcsNodeManagerTest() {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
}
protected:
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
};
TEST_F(GcsNodeManagerTest, TestManagement) {
gcs::GcsNodeManager node_manager(gcs_pub_sub_, gcs_table_storage_);
gcs::GcsNodeManager node_manager(gcs_publisher_, gcs_table_storage_);
// Test Add/Get/Remove functionality.
auto node = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node->node_id());
@ -45,7 +46,7 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
}
TEST_F(GcsNodeManagerTest, TestListener) {
gcs::GcsNodeManager node_manager(gcs_pub_sub_, gcs_table_storage_);
gcs::GcsNodeManager node_manager(gcs_publisher_, gcs_table_storage_);
// Test AddNodeAddedListener.
int node_count = 1000;
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> added_nodes;

View file

@ -24,9 +24,9 @@ namespace ray {
class MockedGcsObjectManager : public gcs::GcsObjectManager {
public:
explicit MockedGcsObjectManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub,
std::shared_ptr<gcs::GcsPublisher> &gcs_publisher,
gcs::GcsNodeManager &gcs_node_manager)
: gcs::GcsObjectManager(gcs_table_storage, gcs_pub_sub, gcs_node_manager) {}
: gcs::GcsObjectManager(gcs_table_storage, gcs_publisher, gcs_node_manager) {}
public:
void AddObjectsLocation(const NodeID &node_id,
@ -56,9 +56,9 @@ class GcsObjectManagerTest : public ::testing::Test {
void SetUp() override {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
std::make_shared<gcs::GcsNodeManager>(gcs_publisher_, gcs_table_storage_);
gcs_object_manager_ = std::make_shared<MockedGcsObjectManager>(
gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_);
gcs_table_storage_, gcs_publisher_, *gcs_node_manager_);
GenTestData();
}
@ -85,7 +85,7 @@ class GcsObjectManagerTest : public ::testing::Test {
protected:
instrumented_io_context io_service_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<MockedGcsObjectManager> gcs_object_manager_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;

View file

@ -71,7 +71,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
public:
GcsPlacementGroupManagerTest()
: mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_publisher_ = std::make_shared<GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr, true);
@ -147,7 +148,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
instrumented_io_context io_service_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<gcs::RedisClient> redis_client_;
};

View file

@ -39,13 +39,14 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
raylet_clients_.push_back(std::make_shared<GcsServerMocker::MockRayletClient>());
}
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr, true);
gcs_resource_scheduler_ =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
std::make_shared<gcs::GcsNodeManager>(gcs_publisher_, gcs_table_storage_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
raylet_client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
@ -219,7 +220,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
GUARDED_BY(placement_group_requests_mutex_);
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> failure_placement_groups_
GUARDED_BY(placement_group_requests_mutex_);
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;

View file

@ -451,7 +451,7 @@ struct GcsServerMocker {
MockGcsPubSub(std::shared_ptr<gcs::RedisClient> redis_client)
: GcsPubSub(redis_client) {}
Status Publish(const std::string &channel, const std::string &id,
Status Publish(std::string_view channel, const std::string &id,
const std::string &data, const gcs::StatusCallback &done) override {
return Status::OK();
}

View file

@ -25,10 +25,11 @@ class RayletBasedActorSchedulerTest : public ::testing::Test {
void SetUp() override {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
worker_client_ = std::make_shared<GcsServerMocker::MockWorkerClient>();
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
std::make_shared<gcs::GcsNodeManager>(gcs_publisher_, gcs_table_storage_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
@ -36,7 +37,7 @@ class RayletBasedActorSchedulerTest : public ::testing::Test {
[this](const rpc::Address &addr) { return raylet_client_; });
gcs_actor_scheduler_ =
std::make_shared<GcsServerMocker::MockedRayletBasedActorScheduler>(
io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_pub_sub_,
io_service_, *gcs_actor_table_, *gcs_node_manager_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
failure_actors_.emplace_back(std::move(actor));
@ -61,7 +62,7 @@ class RayletBasedActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockedRayletBasedActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
std::vector<std::shared_ptr<gcs::GcsActor>> failure_actors_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;

View file

@ -14,10 +14,12 @@
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "absl/strings/str_cat.h"
namespace ray {
namespace gcs {
Status GcsPubSub::Publish(const std::string &channel, const std::string &id,
Status GcsPubSub::Publish(std::string_view channel, const std::string &id,
const std::string &data, const StatusCallback &done) {
rpc::PubSubMessage message;
message.set_id(id);
@ -30,22 +32,20 @@ Status GcsPubSub::Publish(const std::string &channel, const std::string &id,
};
return redis_client_->GetPrimaryContext()->PublishAsync(
GenChannelPattern(channel, boost::optional<std::string>(id)),
message.SerializeAsString(), on_done);
GenChannelPattern(channel, id), message.SerializeAsString(), on_done);
}
Status GcsPubSub::Subscribe(const std::string &channel, const std::string &id,
Status GcsPubSub::Subscribe(std::string_view channel, const std::string &id,
const Callback &subscribe, const StatusCallback &done) {
return SubscribeInternal(channel, subscribe, done, false,
boost::optional<std::string>(id));
return SubscribeInternal(channel, subscribe, done, id);
}
Status GcsPubSub::SubscribeAll(const std::string &channel, const Callback &subscribe,
Status GcsPubSub::SubscribeAll(std::string_view channel, const Callback &subscribe,
const StatusCallback &done) {
return SubscribeInternal(channel, subscribe, done, true);
return SubscribeInternal(channel, subscribe, done, std::nullopt);
}
Status GcsPubSub::Unsubscribe(const std::string &channel_name, const std::string &id) {
Status GcsPubSub::Unsubscribe(std::string_view channel_name, const std::string &id) {
std::string pattern = GenChannelPattern(channel_name, id);
absl::MutexLock lock(&mutex_);
@ -59,10 +59,9 @@ Status GcsPubSub::Unsubscribe(const std::string &channel_name, const std::string
return ExecuteCommandIfPossible(channel->first, channel->second);
}
Status GcsPubSub::SubscribeInternal(const std::string &channel_name,
Status GcsPubSub::SubscribeInternal(std::string_view channel_name,
const Callback &subscribe, const StatusCallback &done,
bool is_sub_or_unsub_all,
const boost::optional<std::string> &id) {
const std::optional<std::string_view> &id) {
std::string pattern = GenChannelPattern(channel_name, id);
absl::MutexLock lock(&mutex_);
@ -74,7 +73,8 @@ Status GcsPubSub::SubscribeInternal(const std::string &channel_name,
}
// Add the SUBSCRIBE command to the queue.
channel->second.command_queue.push_back(Command(subscribe, done, is_sub_or_unsub_all));
channel->second.command_queue.push_back(
Command(subscribe, done, /*is_sub_or_unsub_all=*/!id.has_value()));
total_commands_queued_++;
// Process the first command on the queue, if possible.
@ -178,19 +178,18 @@ Status GcsPubSub::ExecuteCommandIfPossible(const std::string &channel_key,
return status;
}
std::string GcsPubSub::GenChannelPattern(const std::string &channel,
const boost::optional<std::string> &id) {
std::stringstream pattern;
pattern << channel << ":";
std::string GcsPubSub::GenChannelPattern(std::string_view channel,
const std::optional<std::string_view> &id) {
std::string pattern = absl::StrCat(channel, ":");
if (id) {
pattern << *id;
absl::StrAppend(&pattern, *id);
} else {
pattern << "*";
absl::StrAppend(&pattern, "*");
}
return pattern.str();
return pattern;
}
bool GcsPubSub::IsUnsubscribed(const std::string &channel, const std::string &id) {
bool GcsPubSub::IsUnsubscribed(std::string_view channel, const std::string &id) {
std::string pattern = GenChannelPattern(channel, id);
absl::MutexLock lock(&mutex_);
@ -206,5 +205,63 @@ std::string GcsPubSub::DebugString() const {
return stream.str();
}
/// Publishes an object location change on OBJECT_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The object the message is about; its hex string is used as the message id.
/// \param message The location change to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishObject(const ObjectID &id,
                                   const rpc::ObjectLocationChange &message,
                                   const StatusCallback &done) {
  const std::string object_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(OBJECT_CHANNEL, object_key, payload, done);
}
/// Publishes actor table data on ACTOR_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The actor the message is about; its hex string is used as the message id.
/// \param message The actor data to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishActor(const ActorID &id, const rpc::ActorTableData &message,
                                  const StatusCallback &done) {
  const std::string actor_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(ACTOR_CHANNEL, actor_key, payload, done);
}
/// Publishes job table data on JOB_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The job the message is about; its hex string is used as the message id.
/// \param message The job data to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishJob(const JobID &id, const rpc::JobTableData &message,
                                const StatusCallback &done) {
  const std::string job_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(JOB_CHANNEL, job_key, payload, done);
}
/// Publishes node info on NODE_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The node the message is about; its hex string is used as the message id.
/// \param message The node info to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishNodeInfo(const NodeID &id, const rpc::GcsNodeInfo &message,
                                     const StatusCallback &done) {
  const std::string node_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(NODE_CHANNEL, node_key, payload, done);
}
/// Publishes a node resource change on NODE_RESOURCE_CHANNEL through the wrapped
/// GcsPubSub.
///
/// \param id The node the message is about; its hex string is used as the message id.
/// \param message The resource change to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishNodeResource(const NodeID &id,
                                         const rpc::NodeResourceChange &message,
                                         const StatusCallback &done) {
  const std::string node_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(NODE_RESOURCE_CHANNEL, node_key, payload, done);
}
/// Publishes a resource usage batch on RESOURCES_BATCH_CHANNEL through the wrapped
/// GcsPubSub. Unlike the per-entity channels, the batch is published with an empty
/// message id.
///
/// \param message The resource usage batch to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishResourceBatch(const rpc::ResourceUsageBatchData &message,
                                          const StatusCallback &done) {
  const std::string batch_key = "";
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(RESOURCES_BATCH_CHANNEL, batch_key, payload, done);
}
/// Publishes worker failure data on WORKER_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The worker the message is about; its hex string is used as the message id.
/// \param message The worker data to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishWorkerFailure(const WorkerID &id,
                                          const rpc::WorkerDeltaData &message,
                                          const StatusCallback &done) {
  const std::string worker_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(WORKER_CHANNEL, worker_key, payload, done);
}
/// Publishes task lease data on TASK_LEASE_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The task the message is about; its hex string is used as the message id.
/// \param message The task lease data to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishTaskLease(const TaskID &id, const rpc::TaskLeaseData &message,
                                      const StatusCallback &done) {
  const std::string task_key = id.Hex();
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(TASK_LEASE_CHANNEL, task_key, payload, done);
}
/// Publishes error table data on ERROR_INFO_CHANNEL through the wrapped GcsPubSub.
///
/// \param id The message id, passed to GcsPubSub::Publish() as given (no hex encoding).
/// \param message The error data to publish; it is sent in serialized form.
/// \param done Callback handed unchanged to GcsPubSub::Publish().
/// \return The Status returned by GcsPubSub::Publish().
Status GcsPublisher::PublishError(const std::string &id,
                                  const rpc::ErrorTableData &message,
                                  const StatusCallback &done) {
  const std::string payload = message.SerializeAsString();
  return pubsub_->Publish(ERROR_INFO_CHANNEL, id, payload, done);
}
/// Returns debugging info for the publisher.
///
/// \return The wrapped GcsPubSub's DebugString() when one is held, otherwise the
/// placeholder string "GcsPublisher {}".
std::string GcsPublisher::DebugString() const {
  // Without a Redis-based pubsub there is nothing to report yet.
  if (pubsub_ == nullptr) {
    return "GcsPublisher {}";
  }
  return pubsub_->DebugString();
}
} // namespace gcs
} // namespace ray

View file

@ -14,26 +14,32 @@
#pragma once
#include <optional>
#include <string>
#include <string_view>
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/redis_client.h"
#include "ray/gcs/redis_context.h"
#include "ray/pubsub/publisher.h"
#include "src/ray/protobuf/gcs.pb.h"
#include "src/ray/protobuf/gcs_service.pb.h"
namespace ray {
namespace gcs {
#define JOB_CHANNEL "JOB"
#define NODE_CHANNEL "NODE"
#define NODE_RESOURCE_CHANNEL "NODE_RESOURCE"
#define ACTOR_CHANNEL "ACTOR"
#define WORKER_CHANNEL "WORKER"
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
#define TASK_LEASE_CHANNEL "TASK_LEASE"
#define RESOURCES_BATCH_CHANNEL "RESOURCES_BATCH"
#define ERROR_INFO_CHANNEL "ERROR_INFO"
inline constexpr std::string_view JOB_CHANNEL = "JOB";
inline constexpr std::string_view NODE_CHANNEL = "NODE";
inline constexpr std::string_view NODE_RESOURCE_CHANNEL = "NODE_RESOURCE";
inline constexpr std::string_view ACTOR_CHANNEL = "ACTOR";
inline constexpr std::string_view WORKER_CHANNEL = "WORKER";
inline constexpr std::string_view OBJECT_CHANNEL = "OBJECT";
inline constexpr std::string_view TASK_CHANNEL = "TASK";
inline constexpr std::string_view TASK_LEASE_CHANNEL = "TASK_LEASE";
inline constexpr std::string_view RESOURCES_BATCH_CHANNEL = "RESOURCES_BATCH";
inline constexpr std::string_view ERROR_INFO_CHANNEL = "ERROR_INFO";
/// \class GcsPubSub
///
@ -44,7 +50,7 @@ class GcsPubSub {
/// The callback is called when a subscription message is received.
using Callback = std::function<void(const std::string &id, const std::string &data)>;
explicit GcsPubSub(std::shared_ptr<RedisClient> redis_client)
explicit GcsPubSub(const std::shared_ptr<RedisClient> &redis_client)
: redis_client_(redis_client), total_commands_queued_(0) {}
virtual ~GcsPubSub() = default;
@ -56,7 +62,7 @@ class GcsPubSub {
/// \param data The data of message to be published to redis.
/// \param done Callback that will be called when the message is published to redis.
/// \return Status
virtual Status Publish(const std::string &channel, const std::string &id,
virtual Status Publish(std::string_view channel, const std::string &id,
const std::string &data, const StatusCallback &done);
/// Subscribe to messages with the specified ID under the specified channel.
@ -67,7 +73,7 @@ class GcsPubSub {
/// received.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
Status Subscribe(const std::string &channel, const std::string &id,
Status Subscribe(std::string_view channel, const std::string &id,
const Callback &subscribe, const StatusCallback &done);
/// Subscribe to messages with the specified channel.
@ -77,7 +83,7 @@ class GcsPubSub {
/// received.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
Status SubscribeAll(const std::string &channel, const Callback &subscribe,
Status SubscribeAll(std::string_view channel, const Callback &subscribe,
const StatusCallback &done);
/// Unsubscribe to messages with the specified ID under the specified channel.
@ -85,14 +91,14 @@ class GcsPubSub {
/// \param channel The channel to unsubscribe from redis.
/// \param id The id of message to be unsubscribed from redis.
/// \return Status
Status Unsubscribe(const std::string &channel, const std::string &id);
Status Unsubscribe(std::string_view channel, const std::string &id);
/// Check if the specified ID under the specified channel is unsubscribed.
///
/// \param channel The channel to unsubscribe from redis.
/// \param id The id of message to be unsubscribed from redis.
/// \return Whether the specified ID under the specified channel is unsubscribed.
bool IsUnsubscribed(const std::string &channel, const std::string &id);
bool IsUnsubscribed(std::string_view channel, const std::string &id);
std::string DebugString() const;
@ -155,12 +161,12 @@ class GcsPubSub {
GcsPubSub::Channel &channel)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status SubscribeInternal(const std::string &channel_name, const Callback &subscribe,
const StatusCallback &done, bool is_sub_or_unsub_all,
const boost::optional<std::string> &id = boost::none);
Status SubscribeInternal(std::string_view channel_name, const Callback &subscribe,
const StatusCallback &done,
const std::optional<std::string_view> &id);
std::string GenChannelPattern(const std::string &channel,
const boost::optional<std::string> &id);
std::string GenChannelPattern(std::string_view channel,
const std::optional<std::string_view> &id);
std::shared_ptr<RedisClient> redis_client_;
@ -172,5 +178,80 @@ class GcsPubSub {
size_t total_commands_queued_ GUARDED_BY(mutex_);
};
/// \class GcsPublisher
///
/// Supports publishing per-entity data and errors from GCS. Thread safe.
class GcsPublisher {
public:
/// Initializes GcsPublisher with both Redis and GCS based publishers.
/// Publish*() member functions below would be incrementally converted to use the GCS
/// based publisher, if available.
GcsPublisher(const std::shared_ptr<RedisClient> &redis_client,
std::unique_ptr<pubsub::Publisher> publisher)
: pubsub_(std::make_unique<GcsPubSub>(redis_client)),
publisher_(std::move(publisher)) {}
/// Test only.
/// Initializes GcsPublisher with GcsPubSub, usually a mock.
/// TODO: remove this constructor and inject mock / fake from the other constructor.
explicit GcsPublisher(std::unique_ptr<GcsPubSub> pubsub) : pubsub_(std::move(pubsub)) {}
/// Each publishing method below publishes to a different "channel".
/// ID is the entity which the message is associated with, e.g. ActorID for Actor data.
/// Subscribers receive typed messages for the ID that they subscribe to.
///
/// The full stream of NodeResource and Error channels are needed by its subscribers.
/// But for other channels, subscribers should only need the latest data.
///
/// TODO: Verify GCS pubsub satisfies the streaming semantics.
/// TODO: Implement optimization for channels where only latest data per ID is useful.
/// Uses Redis pubsub.
Status PublishActor(const ActorID &id, const rpc::ActorTableData &message,
const StatusCallback &done);
/// Uses Redis pubsub.
Status PublishJob(const JobID &id, const rpc::JobTableData &message,
const StatusCallback &done);
/// Uses Redis pubsub.
Status PublishNodeInfo(const NodeID &id, const rpc::GcsNodeInfo &message,
const StatusCallback &done);
/// Uses Redis pubsub.
Status PublishNodeResource(const NodeID &id, const rpc::NodeResourceChange &message,
const StatusCallback &done);
/// Actually rpc::WorkerDeltaData is not a delta message.
/// Uses Redis pubsub.
Status PublishWorkerFailure(const WorkerID &id, const rpc::WorkerDeltaData &message,
const StatusCallback &done);
/// Uses Redis pubsub.
Status PublishTaskLease(const TaskID &id, const rpc::TaskLeaseData &message,
const StatusCallback &done);
/// Uses Redis pubsub.
Status PublishError(const std::string &id, const rpc::ErrorTableData &message,
const StatusCallback &done);
/// TODO: remove once it is converted to GRPC-based push broadcasting.
/// Uses Redis pubsub.
Status PublishResourceBatch(const rpc::ResourceUsageBatchData &message,
const StatusCallback &done);
/// TODO: This belongs to a deprecated codepath. Remove this and its callsites.
/// Uses Redis pubsub.
Status PublishObject(const ObjectID &id, const rpc::ObjectLocationChange &message,
const StatusCallback &done);
/// Prints debugging info for the publisher.
std::string DebugString() const;
private:
const std::unique_ptr<GcsPubSub> pubsub_;
const std::unique_ptr<pubsub::Publisher> publisher_;
};
} // namespace gcs
} // namespace ray

View file

@ -76,12 +76,12 @@ class GcsPubSubTest : public ::testing::Test {
absl::MutexLock lock(&vector_mutex_);
result.push_back(std::make_pair(id, data));
};
RAY_CHECK_OK((pub_sub_->SubscribeAll(channel, subscribe, done)));
RAY_CHECK_OK(pub_sub_->SubscribeAll(channel, subscribe, done));
WaitReady(promise.get_future(), timeout_ms_);
}
bool Unsubscribe(const std::string &channel, const std::string &id) {
return pub_sub_->Unsubscribe(channel, id).ok();
void Unsubscribe(const std::string &channel, const std::string &id) {
RAY_CHECK_OK(pub_sub_->Unsubscribe(channel, id));
}
bool Publish(const std::string &channel, const std::string &id,

View file

@ -156,11 +156,11 @@ class PublisherInterface {
///
/// \param channel_type The type of the channel.
/// \param subscriber_id The node id of the subscriber.
/// \param key_id_binary The key_id that the subscriber is subscribing to.
/// \param key_id The key_id that the subscriber is subscribing to.
/// \return True if registration is new. False otherwise.
virtual bool RegisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) = 0;
const std::string &key_id) = 0;
/// Publish the given object id to subscribers.
///
@ -172,20 +172,20 @@ class PublisherInterface {
/// It will invoke the failure callback on the subscriber side.
///
/// \param channel_type The type of the channel.
/// \param key_id_binary The message id to publish.
/// \param key_id The message id to publish.
virtual void PublishFailure(const rpc::ChannelType channel_type,
const std::string &key_id_binary) = 0;
const std::string &key_id) = 0;
/// Unregister subscription. It means the given object id won't be published to the
/// subscriber anymore.
///
/// \param channel_type The type of the channel.
/// \param subscriber_id The node id of the subscriber.
/// \param key_id_binary The key_id of the subscriber.
/// \param key_id The key_id of the subscriber.
/// \return True if erased. False otherwise.
virtual bool UnregisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) = 0;
const std::string &key_id) = 0;
};
/// Protocol detail
@ -246,11 +246,11 @@ class Publisher : public PublisherInterface {
///
/// \param channel_type The type of the channel.
/// \param subscriber_id The node id of the subscriber.
/// \param key_id_binary The key_id that the subscriber is subscribing to.
/// \param key_id The key_id that the subscriber is subscribing to.
/// \return True if the registration is new. False otherwise.
bool RegisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) override;
const std::string &key_id) override;
/// Publish the given object id to subscribers.
///
@ -262,20 +262,20 @@ class Publisher : public PublisherInterface {
/// It will invoke the failure callback on the subscriber side.
///
/// \param channel_type The type of the channel.
/// \param key_id_binary The message id to publish.
/// \param key_id The message id to publish.
void PublishFailure(const rpc::ChannelType channel_type,
const std::string &key_id_binary) override;
const std::string &key_id) override;
/// Unregister subscription. It means the given object id won't be published to the
/// subscriber anymore.
///
/// \param channel_type The type of the channel.
/// \param subscriber_id The node id of the subscriber.
/// \param key_id_binary The key_id of the subscriber.
/// \param key_id The key_id of the subscriber.
/// \return True if erased. False otherwise.
bool UnregisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) override;
const std::string &key_id) override;
/// Remove the subscriber. Once the subscriber is removed, messages won't be published
/// to it anymore.