diff --git a/BUILD.bazel b/BUILD.bazel index 73e60b334..afa1ef3d1 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -717,6 +717,7 @@ cc_library( ), hdrs = glob([ "src/ray/gcs/*.h", + "src/ray/gcs/test/*.h", ]), copts = COPTS, deps = [ @@ -735,7 +736,7 @@ cc_library( cc_binary( name = "redis_gcs_client_test", testonly = 1, - srcs = ["src/ray/gcs/redis_gcs_client_test.cc"], + srcs = ["src/ray/gcs/test/redis_gcs_client_test.cc"], copts = COPTS, deps = [ ":gcs", @@ -746,7 +747,7 @@ cc_binary( cc_binary( name = "redis_actor_info_accessor_test", testonly = 1, - srcs = ["src/ray/gcs/redis_actor_info_accessor_test.cc"], + srcs = ["src/ray/gcs/test/redis_actor_info_accessor_test.cc"], copts = COPTS, deps = [ ":gcs", @@ -757,7 +758,18 @@ cc_binary( cc_binary( name = "subscription_executor_test", testonly = 1, - srcs = ["src/ray/gcs/subscription_executor_test.cc"], + srcs = ["src/ray/gcs/test/subscription_executor_test.cc"], + copts = COPTS, + deps = [ + ":gcs", + "@com_google_googletest//:gtest_main", + ], +) + +cc_binary( + name = "redis_job_info_accessor_test", + testonly = 1, + srcs = ["src/ray/gcs/test/redis_job_info_accessor_test.cc"], copts = COPTS, deps = [ ":gcs", @@ -768,7 +780,7 @@ cc_binary( cc_binary( name = "asio_test", testonly = 1, - srcs = ["src/ray/gcs/asio_test.cc"], + srcs = ["src/ray/gcs/test/asio_test.cc"], copts = COPTS, deps = [ ":gcs", diff --git a/src/ray/gcs/actor_info_accessor.h b/src/ray/gcs/actor_info_accessor.h index 0f2589658..4a3a3109c 100644 --- a/src/ray/gcs/actor_info_accessor.h +++ b/src/ray/gcs/actor_info_accessor.h @@ -3,7 +3,7 @@ #include "ray/common/id.h" #include "ray/gcs/callback.h" -#include "ray/gcs/subscription_executor.h" +#include "ray/protobuf/gcs.pb.h" namespace ray { @@ -23,7 +23,7 @@ class ActorInfoAccessor { /// \param callback Callback that will be called after lookup finishes. /// \return Status virtual Status AsyncGet(const ActorID &actor_id, - const OptionalItemCallback &callback) = 0; + const OptionalItemCallback &callback) = 0; /// Register an actor to GCS asynchronously. /// @@ -31,7 +31,7 @@ class ActorInfoAccessor { /// \param callback Callback that will be called after actor has been registered /// to the GCS. /// \return Status - virtual Status AsyncRegister(const std::shared_ptr &data_ptr, + virtual Status AsyncRegister(const std::shared_ptr &data_ptr, const StatusCallback &callback) = 0; /// Update dynamic states of actor in GCS asynchronously. @@ -43,7 +43,7 @@ class ActorInfoAccessor { /// TODO(micafan) Don't expose the whole `ActorTableData` and only allow /// updating dynamic states. virtual Status AsyncUpdate(const ActorID &actor_id, - const std::shared_ptr &data_ptr, + const std::shared_ptr &data_ptr, const StatusCallback &callback) = 0; /// Subscribe to any register or update operations of actors. @@ -54,7 +54,7 @@ class ActorInfoAccessor { /// are ready to receive notification. /// \return Status virtual Status AsyncSubscribeAll( - const SubscribeCallback &subscribe, + const SubscribeCallback &subscribe, const StatusCallback &done) = 0; /// Subscribe to any update operations of an actor. @@ -65,7 +65,7 @@ class ActorInfoAccessor { /// \return Status virtual Status AsyncSubscribe( const ActorID &actor_id, - const SubscribeCallback &subscribe, + const SubscribeCallback &subscribe, const StatusCallback &done) = 0; /// Cancel subscription to an actor. diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client.h index df7fa3dbf..c5ebd8e35 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client.h @@ -7,6 +7,7 @@ #include #include "ray/common/status.h" #include "ray/gcs/actor_info_accessor.h" +#include "ray/gcs/job_info_accessor.h" #include "ray/util/logging.h" namespace ray { @@ -29,24 +30,7 @@ class GcsClientOptions { : server_ip_(ip), server_port_(port), password_(password), - is_test_client_(is_test_client) { -#if RAY_USE_NEW_GCS - command_type_ = CommandType::kChain; -#else - command_type_ = CommandType::kRegular; -#endif - } - - /// This constructor is only used for testing (RedisGcsClient's test). - /// - /// \param ip GCS service ip - /// \param port GCS service port - /// \param command_type Command type of RedisGcsClient - GcsClientOptions(const std::string &ip, int port, CommandType command_type) - : server_ip_(ip), - server_port_(port), - command_type_(command_type), - is_test_client_(true) {} + is_test_client_(is_test_client) {} // GCS server address std::string server_ip_; @@ -54,9 +38,6 @@ class GcsClientOptions { // Password of GCS server. std::string password_; - // GCS command type. If CommandType::kChain, chain-replicated versions of the tables - // might be used, if available. - CommandType command_type_ = CommandType::kUnknown; // Whether this client is used for tests. bool is_test_client_{false}; @@ -80,13 +61,20 @@ class GcsClient : public std::enable_shared_from_this { /// Disconnect with GCS Service. Non-thread safe. virtual void Disconnect() = 0; - /// Get ActorInfoAccessor for reading or writing or subscribing to - /// actors. This function is thread safe. + /// Get the sub-interface for accessing actor information in GCS. + /// This function is thread safe. ActorInfoAccessor &Actors() { RAY_CHECK(actor_accessor_ != nullptr); return *actor_accessor_; } + /// Get the sub-interface for accessing job information in GCS. + /// This function is thread safe. + JobInfoAccessor &Jobs() { + RAY_CHECK(job_accessor_ != nullptr); + return *job_accessor_; + } + protected: /// Constructor of GcsClient. /// @@ -99,6 +87,7 @@ class GcsClient : public std::enable_shared_from_this { bool is_connected_{false}; std::unique_ptr actor_accessor_; + std::unique_ptr job_accessor_; }; } // namespace gcs diff --git a/src/ray/gcs/job_info_accessor.h b/src/ray/gcs/job_info_accessor.h new file mode 100644 index 000000000..e24a582e9 --- /dev/null +++ b/src/ray/gcs/job_info_accessor.h @@ -0,0 +1,54 @@ +#ifndef RAY_GCS_JOB_INFO_ACCESSOR_H +#define RAY_GCS_JOB_INFO_ACCESSOR_H + +#include "ray/common/id.h" +#include "ray/gcs/callback.h" +#include "ray/protobuf/gcs.pb.h" + +namespace ray { + +namespace gcs { + +/// \class JobInfoAccessor +/// `JobInfoAccessor` is a sub-interface of `GcsClient`. +/// This class includes all the methods that are related to accessing +/// job information in the GCS. +class JobInfoAccessor { + public: + virtual ~JobInfoAccessor() = default; + + /// Add a job to GCS asynchronously. + /// + /// \param data_ptr The job that will be add to GCS. + /// \param callback Callback that will be called after job has been added + /// to GCS. + /// \return Status + virtual Status AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Mark job as finished in GCS asynchronously. + /// + /// \param job_id ID of the job that will be make finished to GCS. + /// \param callback Callback that will be called after update finished. + /// \return Status + virtual Status AsyncMarkFinished(const JobID &job_id, + const StatusCallback &callback) = 0; + + /// Subscribe to finished jobs. + /// + /// \param subscribe Callback that will be called each time when a job finishes. + /// \param done Callback that will be called when subscription is complete. + /// \return Status + virtual Status AsyncSubscribeToFinishedJobs( + const SubscribeCallback &subscribe, + const StatusCallback &done) = 0; + + protected: + JobInfoAccessor() = default; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_JOB_INFO_ACCESSOR_H diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h new file mode 100644 index 000000000..ddc5179fe --- /dev/null +++ b/src/ray/gcs/pb_util.h @@ -0,0 +1,36 @@ +#ifndef RAY_GCS_PB_UTIL_H +#define RAY_GCS_PB_UTIL_H + +#include +#include "ray/common/id.h" +#include "ray/protobuf/gcs.pb.h" + +namespace ray { + +namespace gcs { + +/// Helper function to produce job table data (for newly created job or updated job). +/// +/// \param job_id The ID of job that need to be registered or updated. +/// \param is_dead Whether the driver of this job is dead. +/// \param timestamp The UNIX timestamp of corresponding to this event. +/// \param node_manager_address Address of the node this job was started on. +/// \param driver_pid Process ID of the driver running this job. +/// \return The job table data created by this method. +inline std::shared_ptr CreateJobTableData( + const ray::JobID &job_id, bool is_dead, int64_t timestamp, + const std::string &node_manager_address, int64_t driver_pid) { + auto job_info_ptr = std::make_shared(); + job_info_ptr->set_job_id(job_id.Binary()); + job_info_ptr->set_is_dead(is_dead); + job_info_ptr->set_timestamp(timestamp); + job_info_ptr->set_node_manager_address(node_manager_address); + job_info_ptr->set_driver_pid(driver_pid); + return job_info_ptr; +} + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_PB_UTIL_H diff --git a/src/ray/gcs/redis_actor_info_accessor.cc b/src/ray/gcs/redis_actor_info_accessor.cc index f346efddb..c0018c8a4 100644 --- a/src/ray/gcs/redis_actor_info_accessor.cc +++ b/src/ray/gcs/redis_actor_info_accessor.cc @@ -92,7 +92,7 @@ Status RedisActorInfoAccessor::AsyncSubscribeAll( const SubscribeCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); - return actor_sub_executor_.AsyncSubscribe(ClientID::Nil(), subscribe, done); + return actor_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done); } Status RedisActorInfoAccessor::AsyncSubscribe( diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 161fab042..339751c82 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -1,10 +1,10 @@ #include "ray/gcs/redis_gcs_client.h" +#include #include "ray/common/ray_config.h" #include "ray/gcs/redis_actor_info_accessor.h" #include "ray/gcs/redis_context.h" - -#include +#include "ray/gcs/redis_job_info_accessor.h" static void GetRedisShards(redisContext *context, std::vector &addresses, std::vector &ports) { @@ -73,7 +73,16 @@ namespace ray { namespace gcs { -RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) {} +RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) { +#if RAY_USE_NEW_GCS + command_type_ = CommandType::kChain; +#else + command_type_ = CommandType::kRegular; +#endif +} + +RedisGcsClient::RedisGcsClient(const GcsClientOptions &options, CommandType command_type) + : GcsClient(options), command_type_(command_type) {} Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { RAY_CHECK(!is_connected_); @@ -114,6 +123,8 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { /*password=*/options_.password_)); } + Attach(io_service); + actor_table_.reset(new ActorTable({primary_context_}, this)); // TODO(micafan) Modify ClientTable' Constructor(remove ClientID) in future. @@ -127,8 +138,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context_}, this)); // Tables below would be sharded. object_table_.reset(new ObjectTable(shard_contexts_, this)); - raylet_task_table_.reset( - new raylet::TaskTable(shard_contexts_, this, options_.command_type_)); + raylet_task_table_.reset(new raylet::TaskTable(shard_contexts_, this, command_type_)); task_reconstruction_log_.reset(new TaskReconstructionLog(shard_contexts_, this)); task_lease_table_.reset(new TaskLeaseTable(shard_contexts_, this)); heartbeat_table_.reset(new HeartbeatTable(shard_contexts_, this)); @@ -138,14 +148,13 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { resource_table_.reset(new DynamicResourceTable({primary_context_}, this)); actor_accessor_.reset(new RedisActorInfoAccessor(this)); + job_accessor_.reset(new RedisJobInfoAccessor(this)); - Status status = Attach(io_service); - is_connected_ = status.ok(); + is_connected_ = true; - // TODO(micafan): Synchronously register node and look up existing nodes here - // for this client is Raylet. - RAY_LOG(INFO) << "RedisGcsClient::Connect finished with status " << status; - return status; + RAY_LOG(INFO) << "RedisGcsClient Connected."; + + return Status::OK(); } void RedisGcsClient::Disconnect() { @@ -155,7 +164,7 @@ void RedisGcsClient::Disconnect() { // TODO(micafan): Synchronously unregister node if this client is Raylet. } -Status RedisGcsClient::Attach(boost::asio::io_service &io_service) { +void RedisGcsClient::Attach(boost::asio::io_service &io_service) { // Take care of sharding contexts. RAY_CHECK(shard_asio_async_clients_.empty()) << "Attach shall be called only once"; for (std::shared_ptr context : shard_contexts_) { @@ -168,7 +177,6 @@ Status RedisGcsClient::Attach(boost::asio::io_service &io_service) { new RedisAsioClient(io_service, primary_context_->async_context())); asio_subscribe_auxiliary_client_.reset( new RedisAsioClient(io_service, primary_context_->subscribe_context())); - return Status::OK(); } std::string RedisGcsClient::DebugString() const { diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index fbb8d72f1..146d80f60 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -21,7 +21,9 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { // TODO(micafan) Will remove those friend class after we replace RedisGcsClient // with interface class GcsClient in raylet. friend class RedisActorInfoAccessor; + friend class RedisJobInfoAccessor; friend class SubscriptionExecutorTest; + friend class LogSubscribeTestHelper; public: /// Constructor of RedisGcsClient. @@ -29,9 +31,16 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { /// TODO(micafan) To read and write from the GCS tables requires a further /// call to Connect() to the client table. Will fix this in next pr. /// - /// \param GcsClientOptions Options of client, e.g. server address, is test client ... + /// \param options Options of this client, e.g. server address, password and so on. RedisGcsClient(const GcsClientOptions &options); + /// This constructor is only used for testing. + /// Connect() must be called(and return ok) before you call any other methods. + /// + /// \param options Options of this client, e.g. server address, password and so on. + /// \param command_type The commands issued type. + RedisGcsClient(const GcsClientOptions &options, CommandType command_type); + /// Connect to GCS Service. Non-thread safe. /// Call this function before calling other functions. /// @@ -53,7 +62,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { HeartbeatTable &heartbeat_table(); HeartbeatBatchTable &heartbeat_batch_table(); ErrorTable &error_table(); - JobTable &job_table(); ProfileTable &profile_table(); ActorCheckpointTable &actor_checkpoint_table(); ActorCheckpointIdTable &actor_checkpoint_id_table(); @@ -78,10 +86,16 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { private: /// Attach this client to an asio event loop. Note that only /// one event loop should be attached at a time. - Status Attach(boost::asio::io_service &io_service); + void Attach(boost::asio::io_service &io_service); - /// Use method Actors() instead + /// This method will be deprecated, use method Actors() instead. ActorTable &actor_table(); + /// This method will be deprecated, use method Jobs() instead. + JobTable &job_table(); + + // GCS command type. If CommandType::kChain, chain-replicated versions of the tables + // might be used, if available. + CommandType command_type_{CommandType::kUnknown}; std::unique_ptr object_table_; std::unique_ptr raylet_task_table_; diff --git a/src/ray/gcs/redis_job_info_accessor.cc b/src/ray/gcs/redis_job_info_accessor.cc new file mode 100644 index 000000000..d7028c163 --- /dev/null +++ b/src/ray/gcs/redis_job_info_accessor.cc @@ -0,0 +1,50 @@ +#include "ray/gcs/redis_job_info_accessor.h" +#include "ray/gcs/pb_util.h" +#include "ray/gcs/redis_gcs_client.h" + +namespace ray { + +namespace gcs { + +RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl) + : client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {} + +Status RedisJobInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { + return DoAsyncAppend(data_ptr, callback); +} + +Status RedisJobInfoAccessor::AsyncMarkFinished(const JobID &job_id, + const StatusCallback &callback) { + std::shared_ptr data_ptr = + CreateJobTableData(job_id, /*is_dead*/ true, /*time_stamp*/ std::time(nullptr), + /*node_manager_address*/ "", /*driver_pid*/ -1); + return DoAsyncAppend(data_ptr, callback); +} + +Status RedisJobInfoAccessor::DoAsyncAppend(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { + JobTable::WriteCallback on_done = nullptr; + if (callback != nullptr) { + on_done = [callback](RedisGcsClient *client, const JobID &job_id, + const JobTableData &data) { callback(Status::OK()); }; + } + + JobID job_id = JobID::FromBinary(data_ptr->job_id()); + return client_impl_->job_table().Append(job_id, job_id, data_ptr, on_done); +} + +Status RedisJobInfoAccessor::AsyncSubscribeToFinishedJobs( + const SubscribeCallback &subscribe, const StatusCallback &done) { + RAY_CHECK(subscribe != nullptr); + auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) { + if (job_data.is_dead()) { + subscribe(job_id, job_data); + } + }; + return job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done); +} + +} // namespace gcs + +} // namespace ray diff --git a/src/ray/gcs/redis_job_info_accessor.h b/src/ray/gcs/redis_job_info_accessor.h new file mode 100644 index 000000000..2fb8ebc75 --- /dev/null +++ b/src/ray/gcs/redis_job_info_accessor.h @@ -0,0 +1,53 @@ +#ifndef RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H +#define RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H + +#include "ray/common/id.h" +#include "ray/gcs/callback.h" +#include "ray/gcs/job_info_accessor.h" +#include "ray/gcs/subscription_executor.h" +#include "ray/gcs/tables.h" + +namespace ray { + +namespace gcs { + +class RedisGcsClient; + +/// \class RedisJobInfoAccessor +/// RedisJobInfoAccessor is an implementation of `JobInfoAccessor` +/// that uses Redis as the backend storage. +class RedisJobInfoAccessor : public JobInfoAccessor { + public: + explicit RedisJobInfoAccessor(RedisGcsClient *client_impl); + + virtual ~RedisJobInfoAccessor() {} + + Status AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override; + + Status AsyncSubscribeToFinishedJobs( + const SubscribeCallback &subscribe, + const StatusCallback &done) override; + + private: + /// Append job information to GCS asynchronously. + /// + /// \param data_ptr The job information that will be appended to GCS. + /// \param callback Callback that will be called after append done. + /// \return Status + Status DoAsyncAppend(const std::shared_ptr &data_ptr, + const StatusCallback &callback); + + RedisGcsClient *client_impl_{nullptr}; + + typedef SubscriptionExecutor JobSubscriptionExecutor; + JobSubscriptionExecutor job_sub_executor_; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H diff --git a/src/ray/gcs/subscription_executor.cc b/src/ray/gcs/subscription_executor.cc index f6a9540b8..40f0ead29 100644 --- a/src/ray/gcs/subscription_executor.cc +++ b/src/ray/gcs/subscription_executor.cc @@ -5,7 +5,7 @@ namespace ray { namespace gcs { template -Status SubscriptionExecutor::AsyncSubscribe( +Status SubscriptionExecutor::AsyncSubscribeAll( const ClientID &client_id, const SubscribeCallback &subscribe, const StatusCallback &done) { // TODO(micafan) Optimize the lock when necessary. @@ -138,7 +138,7 @@ Status SubscriptionExecutor::AsyncSubscribe( id_to_callback_map_[id] = subscribe; } - auto status = AsyncSubscribe(client_id, nullptr, on_subscribe_done); + auto status = AsyncSubscribeAll(client_id, nullptr, on_subscribe_done); if (!status.ok()) { std::unique_lock lock(mutex_); id_to_callback_map_.erase(id); @@ -187,6 +187,7 @@ Status SubscriptionExecutor::AsyncUnsubscribe( } template class SubscriptionExecutor; +template class SubscriptionExecutor; } // namespace gcs diff --git a/src/ray/gcs/subscription_executor.h b/src/ray/gcs/subscription_executor.h index 1221b70e0..83921d52b 100644 --- a/src/ray/gcs/subscription_executor.h +++ b/src/ray/gcs/subscription_executor.h @@ -32,9 +32,9 @@ class SubscriptionExecutor { /// is registered or updated. /// \param done Callback that will be called when subscription is complete. /// \return Status - Status AsyncSubscribe(const ClientID &client_id, - const SubscribeCallback &subscribe, - const StatusCallback &done); + Status AsyncSubscribeAll(const ClientID &client_id, + const SubscribeCallback &subscribe, + const StatusCallback &done); /// Subscribe to operations of an element. /// Repeated subscription to an element will return a failure. diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index c891dbeef..ca7b8e7c5 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -503,18 +503,6 @@ std::string ProfileTable::DebugString() const { return Log::DebugString(); } -Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, - const std::string &node_manager_address, - int64_t driver_pid) { - auto data = std::make_shared(); - data->set_job_id(job_id.Binary()); - data->set_is_dead(is_dead); - data->set_timestamp(timestamp); - data->set_node_manager_address(node_manager_address); - data->set_driver_pid(driver_pid); - return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr); -} - void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callback) { client_added_callback_ = callback; // Call the callback for any added clients that are cached. diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index f02bc453c..eb2240017 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -654,17 +654,6 @@ class JobTable : public Log { }; virtual ~JobTable() {} - - /// Appends job data to the job table. - /// - /// \param job_id The job id. - /// \param is_dead Whether the job is dead. - /// \param timestamp The UNIX timestamp when the driver was started/stopped. - /// \param node_manager_address IP address of the node the driver is running on. - /// \param driver_pid Process ID of the driver process. - /// \return The return status. - Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, - const std::string &node_manager_address, int64_t driver_pid); }; /// Actor table starts with an ALIVE entry, which represents the first time the actor diff --git a/src/ray/gcs/accessor_test_base.h b/src/ray/gcs/test/accessor_test_base.h similarity index 100% rename from src/ray/gcs/accessor_test_base.h rename to src/ray/gcs/test/accessor_test_base.h diff --git a/src/ray/gcs/asio_test.cc b/src/ray/gcs/test/asio_test.cc similarity index 97% rename from src/ray/gcs/asio_test.cc rename to src/ray/gcs/test/asio_test.cc index 21d5e37a5..71850618e 100644 --- a/src/ray/gcs/asio_test.cc +++ b/src/ray/gcs/test/asio_test.cc @@ -1,7 +1,7 @@ #include -#include "asio.h" #include "gtest/gtest.h" +#include "ray/gcs/asio.h" #include "ray/util/logging.h" extern "C" { diff --git a/src/ray/gcs/redis_actor_info_accessor_test.cc b/src/ray/gcs/test/redis_actor_info_accessor_test.cc similarity index 98% rename from src/ray/gcs/redis_actor_info_accessor_test.cc rename to src/ray/gcs/test/redis_actor_info_accessor_test.cc index ebaa76c50..c1b7eed43 100644 --- a/src/ray/gcs/redis_actor_info_accessor_test.cc +++ b/src/ray/gcs/test/redis_actor_info_accessor_test.cc @@ -4,8 +4,8 @@ #include #include #include "gtest/gtest.h" -#include "ray/gcs/accessor_test_base.h" #include "ray/gcs/redis_gcs_client.h" +#include "ray/gcs/test/accessor_test_base.h" #include "ray/util/test_util.h" namespace ray { diff --git a/src/ray/gcs/redis_gcs_client_test.cc b/src/ray/gcs/test/redis_gcs_client_test.cc similarity index 85% rename from src/ray/gcs/redis_gcs_client_test.cc rename to src/ray/gcs/test/redis_gcs_client_test.cc index cee839550..76dbdcbf3 100644 --- a/src/ray/gcs/redis_gcs_client_test.cc +++ b/src/ray/gcs/test/redis_gcs_client_test.cc @@ -6,6 +6,7 @@ extern "C" { } #include "ray/common/ray_config.h" +#include "ray/gcs/pb_util.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/gcs/tables.h" #include "ray/util/test_util.h" @@ -30,8 +31,8 @@ inline JobID NextJobID() { class TestGcs : public ::testing::Test { public: TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) { - GcsClientOptions options("127.0.0.1", 6379, command_type_); - client_ = std::make_shared(options); + GcsClientOptions options("127.0.0.1", 6379, "", true); + client_ = std::make_shared(options, command_type_); job_id_ = NextJobID(); } @@ -575,52 +576,193 @@ TEST_F(TestGcsWithAsio, TestDeleteKey) { TestDeleteKeys(job_id_, client_); } -void TestLogSubscribeAll(const JobID &job_id, - std::shared_ptr client) { - std::vector job_ids; - for (int i = 0; i < 3; i++) { - job_ids.emplace_back(NextJobID()); +/// A helper class for Log Subscribe testing. +class LogSubscribeTestHelper { + public: + static void TestLogSubscribeAll(const JobID &job_id, + std::shared_ptr client) { + std::vector job_ids; + for (int i = 0; i < 3; i++) { + job_ids.emplace_back(NextJobID()); + } + // Callback for a notification. + auto notification_callback = [job_ids](gcs::RedisGcsClient *client, const JobID &id, + const std::vector data) { + ASSERT_EQ(id, job_ids[test->NumCallbacks()]); + // Check that we get notifications in the same order as the writes. + for (const auto &entry : data) { + ASSERT_EQ(entry.job_id(), job_ids[test->NumCallbacks()].Binary()); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == job_ids.size()) { + test->Stop(); + } + }; + + // Callback for subscription success. We are guaranteed to receive + // notifications after this is called. + auto subscribe_callback = [job_ids](gcs::RedisGcsClient *client) { + // We have subscribed. Do the writes to the table. + for (size_t i = 0; i < job_ids.size(); i++) { + auto job_info_ptr = CreateJobTableData(job_ids[i], false, 0, "localhost", 1); + RAY_CHECK_OK( + client->job_table().Append(job_ids[i], job_ids[i], job_info_ptr, nullptr)); + } + }; + + // Subscribe to all driver table notifications. Once we have successfully + // subscribed, we will append to the key several times and check that we get + // notified for each. + RAY_CHECK_OK(client->job_table().Subscribe( + job_id, ClientID::Nil(), notification_callback, subscribe_callback)); + + // Run the event loop. The loop will only stop if the registered subscription + // callback is called (or an assertion failure). + test->Start(); + // Check that we received one notification callback for each write. + ASSERT_EQ(test->NumCallbacks(), job_ids.size()); } - // Callback for a notification. - auto notification_callback = [job_ids](gcs::RedisGcsClient *client, const JobID &id, - const std::vector data) { - ASSERT_EQ(id, job_ids[test->NumCallbacks()]); - // Check that we get notifications in the same order as the writes. - for (const auto &entry : data) { - ASSERT_EQ(entry.job_id(), job_ids[test->NumCallbacks()].Binary()); - test->IncrementNumCallbacks(); - } - if (test->NumCallbacks() == job_ids.size()) { - test->Stop(); - } - }; - // Callback for subscription success. We are guaranteed to receive - // notifications after this is called. - auto subscribe_callback = [job_ids](gcs::RedisGcsClient *client) { - // We have subscribed. Do the writes to the table. - for (size_t i = 0; i < job_ids.size(); i++) { - RAY_CHECK_OK( - client->job_table().AppendJobData(job_ids[i], false, 0, "localhost", 1)); - } - }; + static void TestLogSubscribeId(const JobID &job_id, + std::shared_ptr client) { + // Add a log entry. + JobID job_id1 = NextJobID(); + std::vector job_ids1 = {"abc", "def", "ghi"}; + auto data1 = std::make_shared(); + data1->set_job_id(job_ids1[0]); + RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data1, nullptr)); - // Subscribe to all driver table notifications. Once we have successfully - // subscribed, we will append to the key several times and check that we get - // notified for each. - RAY_CHECK_OK(client->job_table().Subscribe(job_id, ClientID::Nil(), - notification_callback, subscribe_callback)); + // Add a log entry at a second key. + JobID job_id2 = NextJobID(); + std::vector job_ids2 = {"jkl", "mno", "pqr"}; + auto data2 = std::make_shared(); + data2->set_job_id(job_ids2[0]); + RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data2, nullptr)); - // Run the event loop. The loop will only stop if the registered subscription - // callback is called (or an assertion failure). - test->Start(); - // Check that we received one notification callback for each write. - ASSERT_EQ(test->NumCallbacks(), job_ids.size()); -} + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto notification_callback = [job_id2, job_ids2]( + gcs::RedisGcsClient *client, const JobID &id, + const std::vector &data) { + // Check that we only get notifications for the requested key. + ASSERT_EQ(id, job_id2); + // Check that we get notifications in the same order as the writes. + for (const auto &entry : data) { + ASSERT_EQ(entry.job_id(), job_ids2[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == job_ids2.size()) { + test->Stop(); + } + }; + + // The callback for subscription success. Once we've subscribed, request + // notifications for only one of the keys, then write to both keys. + auto subscribe_callback = [job_id, job_id1, job_id2, job_ids1, + job_ids2](gcs::RedisGcsClient *client) { + // Request notifications for one of the keys. + RAY_CHECK_OK(client->job_table().RequestNotifications( + job_id, job_id2, client->client_table().GetLocalClientId(), nullptr)); + // Write both keys. We should only receive notifications for the key that + // we requested them for. + auto remaining = std::vector(++job_ids1.begin(), job_ids1.end()); + for (const auto &job_id_it : remaining) { + auto data = std::make_shared(); + data->set_job_id(job_id_it); + RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data, nullptr)); + } + remaining = std::vector(++job_ids2.begin(), job_ids2.end()); + for (const auto &job_id_it : remaining) { + auto data = std::make_shared(); + data->set_job_id(job_id_it); + RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data, nullptr)); + } + }; + + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK( + client->job_table().Subscribe(job_id, client->client_table().GetLocalClientId(), + notification_callback, subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called for the requested key. + test->Start(); + // Check that we received one notification callback for each write to the + // requested key. + ASSERT_EQ(test->NumCallbacks(), job_ids2.size()); + } + + static void TestLogSubscribeCancel(const JobID &job_id, + std::shared_ptr client) { + // Add a log entry. + JobID random_job_id = NextJobID(); + std::vector job_ids = {"jkl", "mno", "pqr"}; + auto data = std::make_shared(); + data->set_job_id(job_ids[0]); + RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr)); + + // The callback for a notification from the object table. This should only be + // received for the object that we requested notifications for. + auto notification_callback = [random_job_id, job_ids]( + gcs::RedisGcsClient *client, const JobID &id, + const std::vector &data) { + ASSERT_EQ(id, random_job_id); + // Check that we get a duplicate notification for the first write. We get a + // duplicate notification because the log is append-only and notifications + // are canceled after the first write, then requested again. + auto job_ids_copy = job_ids; + job_ids_copy.insert(job_ids_copy.begin(), job_ids_copy.front()); + for (const auto &entry : data) { + ASSERT_EQ(entry.job_id(), job_ids_copy[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == job_ids_copy.size()) { + test->Stop(); + } + }; + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto subscribe_callback = [job_id, random_job_id, + job_ids](gcs::RedisGcsClient *client) { + // Request notifications, then cancel immediately. We should receive a + // notification for the current value at the key. + RAY_CHECK_OK(client->job_table().RequestNotifications( + job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr)); + RAY_CHECK_OK(client->job_table().CancelNotifications( + job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr)); + // Append to the key. Since we canceled notifications, we should not + // receive a notification for these writes. + auto remaining = std::vector(++job_ids.begin(), job_ids.end()); + for (const auto &remaining_job_id : remaining) { + auto data = std::make_shared(); + data->set_job_id(remaining_job_id); + RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr)); + } + // Request notifications again. We should receive a notification for the + // current values at the key. + RAY_CHECK_OK(client->job_table().RequestNotifications( + job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr)); + }; + + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK( + client->job_table().Subscribe(job_id, client->client_table().GetLocalClientId(), + notification_callback, subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called for the requested key. + test->Start(); + // Check that we received a notification callback for the first append to the + // key, then a notification for all of the appends, because we cancel + // notifications in between. + ASSERT_EQ(test->NumCallbacks(), job_ids.size() + 1); + } +}; TEST_F(TestGcsWithAsio, TestLogSubscribeAll) { test = this; - TestLogSubscribeAll(job_id_, client_); + LogSubscribeTestHelper::TestLogSubscribeAll(job_id_, client_); } void TestSetSubscribeAll(const JobID &job_id, @@ -775,78 +917,9 @@ TEST_MACRO(TestGcsWithAsio, TestTableSubscribeId); TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId); #endif -void TestLogSubscribeId(const JobID &job_id, - std::shared_ptr client) { - // Add a log entry. - JobID job_id1 = NextJobID(); - std::vector job_ids1 = {"abc", "def", "ghi"}; - auto data1 = std::make_shared(); - data1->set_job_id(job_ids1[0]); - RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data1, nullptr)); - - // Add a log entry at a second key. - JobID job_id2 = NextJobID(); - std::vector job_ids2 = {"jkl", "mno", "pqr"}; - auto data2 = std::make_shared(); - data2->set_job_id(job_ids2[0]); - RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data2, nullptr)); - - // The callback for a notification from the table. This should only be - // received for keys that we requested notifications for. - auto notification_callback = [job_id2, job_ids2]( - gcs::RedisGcsClient *client, const JobID &id, - const std::vector &data) { - // Check that we only get notifications for the requested key. - ASSERT_EQ(id, job_id2); - // Check that we get notifications in the same order as the writes. - for (const auto &entry : data) { - ASSERT_EQ(entry.job_id(), job_ids2[test->NumCallbacks()]); - test->IncrementNumCallbacks(); - } - if (test->NumCallbacks() == job_ids2.size()) { - test->Stop(); - } - }; - - // The callback for subscription success. Once we've subscribed, request - // notifications for only one of the keys, then write to both keys. - auto subscribe_callback = [job_id, job_id1, job_id2, job_ids1, - job_ids2](gcs::RedisGcsClient *client) { - // Request notifications for one of the keys. - RAY_CHECK_OK(client->job_table().RequestNotifications( - job_id, job_id2, client->client_table().GetLocalClientId(), nullptr)); - // Write both keys. We should only receive notifications for the key that - // we requested them for. - auto remaining = std::vector(++job_ids1.begin(), job_ids1.end()); - for (const auto &job_id_it : remaining) { - auto data = std::make_shared(); - data->set_job_id(job_id_it); - RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data, nullptr)); - } - remaining = std::vector(++job_ids2.begin(), job_ids2.end()); - for (const auto &job_id_it : remaining) { - auto data = std::make_shared(); - data->set_job_id(job_id_it); - RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data, nullptr)); - } - }; - - // Subscribe to notifications for this client. This allows us to request and - // receive notifications for specific keys. - RAY_CHECK_OK(client->job_table().Subscribe(job_id, - client->client_table().GetLocalClientId(), - notification_callback, subscribe_callback)); - // Run the event loop. The loop will only stop if the registered subscription - // callback is called for the requested key. - test->Start(); - // Check that we received one notification callback for each write to the - // requested key. - ASSERT_EQ(test->NumCallbacks(), job_ids2.size()); -} - TEST_F(TestGcsWithAsio, TestLogSubscribeId) { test = this; - TestLogSubscribeId(job_id_, client_); + LogSubscribeTestHelper::TestLogSubscribeId(job_id_, client_); } void TestSetSubscribeId(const JobID &job_id, @@ -997,76 +1070,9 @@ TEST_MACRO(TestGcsWithAsio, TestTableSubscribeCancel); TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel); #endif -void TestLogSubscribeCancel(const JobID &job_id, - std::shared_ptr client) { - // Add a log entry. - JobID random_job_id = NextJobID(); - std::vector job_ids = {"jkl", "mno", "pqr"}; - auto data = std::make_shared(); - data->set_job_id(job_ids[0]); - RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr)); - - // The callback for a notification from the object table. This should only be - // received for the object that we requested notifications for. - auto notification_callback = [random_job_id, job_ids]( - gcs::RedisGcsClient *client, const JobID &id, - const std::vector &data) { - ASSERT_EQ(id, random_job_id); - // Check that we get a duplicate notification for the first write. We get a - // duplicate notification because the log is append-only and notifications - // are canceled after the first write, then requested again. - auto job_ids_copy = job_ids; - job_ids_copy.insert(job_ids_copy.begin(), job_ids_copy.front()); - for (const auto &entry : data) { - ASSERT_EQ(entry.job_id(), job_ids_copy[test->NumCallbacks()]); - test->IncrementNumCallbacks(); - } - if (test->NumCallbacks() == job_ids_copy.size()) { - test->Stop(); - } - }; - - // The callback for a notification from the table. This should only be - // received for keys that we requested notifications for. - auto subscribe_callback = [job_id, random_job_id, - job_ids](gcs::RedisGcsClient *client) { - // Request notifications, then cancel immediately. We should receive a - // notification for the current value at the key. - RAY_CHECK_OK(client->job_table().RequestNotifications( - job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr)); - RAY_CHECK_OK(client->job_table().CancelNotifications( - job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr)); - // Append to the key. Since we canceled notifications, we should not - // receive a notification for these writes. - auto remaining = std::vector(++job_ids.begin(), job_ids.end()); - for (const auto &remaining_job_id : remaining) { - auto data = std::make_shared(); - data->set_job_id(remaining_job_id); - RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr)); - } - // Request notifications again. We should receive a notification for the - // current values at the key. - RAY_CHECK_OK(client->job_table().RequestNotifications( - job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr)); - }; - - // Subscribe to notifications for this client. This allows us to request and - // receive notifications for specific keys. - RAY_CHECK_OK(client->job_table().Subscribe(job_id, - client->client_table().GetLocalClientId(), - notification_callback, subscribe_callback)); - // Run the event loop. The loop will only stop if the registered subscription - // callback is called for the requested key. - test->Start(); - // Check that we received a notification callback for the first append to the - // key, then a notification for all of the appends, because we cancel - // notifications in between. - ASSERT_EQ(test->NumCallbacks(), job_ids.size() + 1); -} - TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) { test = this; - TestLogSubscribeCancel(job_id_, client_); + LogSubscribeTestHelper::TestLogSubscribeCancel(job_id_, client_); } void TestSetSubscribeCancel(const JobID &job_id, diff --git a/src/ray/gcs/test/redis_job_info_accessor_test.cc b/src/ray/gcs/test/redis_job_info_accessor_test.cc new file mode 100644 index 000000000..d93f7c40d --- /dev/null +++ b/src/ray/gcs/test/redis_job_info_accessor_test.cc @@ -0,0 +1,75 @@ +#include "ray/gcs/redis_job_info_accessor.h" +#include +#include "gtest/gtest.h" +#include "ray/gcs/pb_util.h" +#include "ray/gcs/redis_gcs_client.h" +#include "ray/gcs/test/accessor_test_base.h" +#include "ray/util/test_util.h" + +namespace ray { + +namespace gcs { + +class RedisJobInfoAccessorTest : public AccessorTestBase { + protected: + virtual void GenTestData() { + for (size_t i = 0; i < total_job_number_; ++i) { + JobID job_id = JobID::FromInt(i); + std::shared_ptr job_data_ptr = + CreateJobTableData(job_id, /*is_dead*/ false, /*timestamp*/ 1, + /*node_manager_address*/ "", /*driver_pid*/ i); + id_to_data_[job_id] = job_data_ptr; + } + } + std::atomic subscribe_pending_count_{0}; + size_t total_job_number_{100}; +}; + +TEST_F(RedisJobInfoAccessorTest, AddAndSubscribe) { + JobInfoAccessor &job_accessor = gcs_client_->Jobs(); + // SubscribeAll + auto on_subscribe = [this](const JobID &job_id, const JobTableData &data) { + const auto it = id_to_data_.find(job_id); + RAY_CHECK(it != id_to_data_.end()); + ASSERT_TRUE(data.is_dead()); + --subscribe_pending_count_; + }; + + auto on_done = [this](Status status) { + RAY_CHECK_OK(status); + --pending_count_; + }; + + ++pending_count_; + RAY_CHECK_OK(job_accessor.AsyncSubscribeToFinishedJobs(on_subscribe, on_done)); + + WaitPendingDone(wait_pending_timeout_); + WaitPendingDone(subscribe_pending_count_, wait_pending_timeout_); + + // Register + for (const auto &item : id_to_data_) { + ++pending_count_; + RAY_CHECK_OK(job_accessor.AsyncAdd(item.second, [this](Status status) { + RAY_CHECK_OK(status); + --pending_count_; + })); + } + WaitPendingDone(wait_pending_timeout_); + WaitPendingDone(subscribe_pending_count_, wait_pending_timeout_); + + // Update + for (auto &item : id_to_data_) { + ++pending_count_; + ++subscribe_pending_count_; + RAY_CHECK_OK(job_accessor.AsyncMarkFinished(item.first, [this](Status status) { + RAY_CHECK_OK(status); + --pending_count_; + })); + } + WaitPendingDone(wait_pending_timeout_); + WaitPendingDone(subscribe_pending_count_, wait_pending_timeout_); +} + +} // namespace gcs + +} // namespace ray diff --git a/src/ray/gcs/subscription_executor_test.cc b/src/ray/gcs/test/subscription_executor_test.cc similarity index 94% rename from src/ray/gcs/subscription_executor_test.cc rename to src/ray/gcs/test/subscription_executor_test.cc index bc89c48df..8d04b0e3a 100644 --- a/src/ray/gcs/subscription_executor_test.cc +++ b/src/ray/gcs/test/subscription_executor_test.cc @@ -1,7 +1,8 @@ +#include "ray/gcs/subscription_executor.h" #include "gtest/gtest.h" -#include "ray/gcs/accessor_test_base.h" #include "ray/gcs/callback.h" #include "ray/gcs/redis_gcs_client.h" +#include "ray/gcs/test/accessor_test_base.h" namespace ray { @@ -42,7 +43,7 @@ class SubscriptionExecutorTest : public AccessorTestBase actor = std::make_shared(); actor->set_max_reconstructions(1); actor->set_remaining_reconstructions(1); @@ -85,12 +86,12 @@ class SubscriptionExecutorTest : public AccessorTestBaseAsyncSubscribe(ClientID::Nil(), subscribe_, sub_done_); + actor_sub_executor_->AsyncSubscribeAll(ClientID::Nil(), subscribe_, sub_done_); WaitPendingDone(do_sub_pending_count_, wait_pending_timeout_); ASSERT_TRUE(status.ok()); sub_pending_count_ = id_to_data_.size(); AsyncRegisterActorToGcs(); - status = actor_sub_executor_->AsyncSubscribe(ClientID::Nil(), subscribe_, sub_done_); + status = actor_sub_executor_->AsyncSubscribeAll(ClientID::Nil(), subscribe_, sub_done_); ASSERT_TRUE(status.IsInvalid()); WaitPendingDone(sub_pending_count_, wait_pending_timeout_); } @@ -128,7 +129,7 @@ TEST_F(SubscriptionExecutorTest, SubscribeOneAfterActorRegistrationWithClientIDT TEST_F(SubscriptionExecutorTest, SubscribeAllAndSubscribeOneTest) { ++do_sub_pending_count_; Status status = - actor_sub_executor_->AsyncSubscribe(ClientID::Nil(), subscribe_, sub_done_); + actor_sub_executor_->AsyncSubscribeAll(ClientID::Nil(), subscribe_, sub_done_); ASSERT_TRUE(status.ok()); WaitPendingDone(do_sub_pending_count_, wait_pending_timeout_); for (const auto &item : id_to_data_) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 12b674a29..867c65006 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -4,10 +4,10 @@ #include #include -#include "ray/common/status.h" - #include "ray/common/common_protocol.h" #include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/gcs/pb_util.h" #include "ray/raylet/format/node_manager_generated.h" #include "ray/stats/stats.h" #include "ray/util/sample.h" @@ -235,13 +235,13 @@ ray::Status NodeManager::RegisterGcs() { /*subscribe_callback=*/nullptr, /*done_callback=*/nullptr)); - // Subscribe to driver table updates. - const auto job_table_handler = [this](gcs::RedisGcsClient *client, const JobID &job_id, - const std::vector &job_data) { - HandleJobTableUpdate(job_id, job_data); + // Subscribe to job updates. + const auto job_subscribe_handler = [this](const JobID &job_id, + const JobTableData &job_data) { + HandleJobFinished(job_id, job_data); }; - RAY_RETURN_NOT_OK(gcs_client_->job_table().Subscribe(JobID::Nil(), ClientID::Nil(), - job_table_handler, nullptr)); + RAY_RETURN_NOT_OK( + gcs_client_->Jobs().AsyncSubscribeToFinishedJobs(job_subscribe_handler, nullptr)); // Start sending heartbeats to the GCS. last_heartbeat_at_ms_ = current_time_ms(); @@ -272,40 +272,33 @@ void NodeManager::KillWorker(std::shared_ptr worker) { }); } -void NodeManager::HandleJobTableUpdate(const JobID &id, - const std::vector &job_data) { - for (const auto &entry : job_data) { - RAY_LOG(DEBUG) << "HandleJobTableUpdate " << JobID::FromBinary(entry.job_id()) << " " - << entry.is_dead(); - if (entry.is_dead()) { - auto job_id = JobID::FromBinary(entry.job_id()); - auto workers = worker_pool_.GetWorkersRunningTasksForJob(job_id); - - // Kill all the workers. The actual cleanup for these workers is done - // later when we receive the DisconnectClient message from them. - for (const auto &worker : workers) { - if (!worker->IsDetachedActor()) { - // Clean up any open ray.wait calls that the worker made. - task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId()); - // Mark the worker as dead so further messages from it are ignored - // (except DisconnectClient). - worker->MarkDead(); - // Then kill the worker process. - KillWorker(worker); - } - } - - // Remove all tasks for this job from the scheduling queues, mark - // the results for these tasks as not required, cancel any attempts - // at reconstruction. Note that at this time the workers are likely - // alive because of the delay in killing workers. - auto tasks_to_remove = local_queues_.GetTaskIdsForJob(job_id); - task_dependency_manager_.RemoveTasksAndRelatedObjects(tasks_to_remove); - // NOTE(swang): SchedulingQueue::RemoveTasks modifies its argument so we must - // call it last. - local_queues_.RemoveTasks(tasks_to_remove); +void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) { + RAY_LOG(DEBUG) << "HandleJobFinished " << job_id; + RAY_CHECK(job_data.is_dead()); + auto workers = worker_pool_.GetWorkersRunningTasksForJob(job_id); + // Kill all the workers. The actual cleanup for these workers is done + // later when we receive the DisconnectClient message from them. + for (const auto &worker : workers) { + if (!worker->IsDetachedActor()) { + // Clean up any open ray.wait calls that the worker made. + task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId()); + // Mark the worker as dead so further messages from it are ignored + // (except DisconnectClient). + worker->MarkDead(); + // Then kill the worker process. + KillWorker(worker); } } + + // Remove all tasks for this job from the scheduling queues, mark + // the results for these tasks as not required, cancel any attempts + // at reconstruction. Note that at this time the workers are likely + // alive because of the delay in killing workers. + auto tasks_to_remove = local_queues_.GetTaskIdsForJob(job_id); + task_dependency_manager_.RemoveTasksAndRelatedObjects(tasks_to_remove); + // NOTE(swang): SchedulingQueue::RemoveTasks modifies its argument so we must + // call it last. + local_queues_.RemoveTasks(tasks_to_remove); } void NodeManager::Heartbeat() { @@ -1048,9 +1041,10 @@ void NodeManager::ProcessRegisterClientRequestMessage( status = worker_pool_.RegisterDriver(std::move(worker)); if (status.ok()) { local_queues_.AddDriverTaskId(driver_task_id); - RAY_CHECK_OK(gcs_client_->job_table().AppendJobData( - job_id, /*is_dead=*/false, std::time(nullptr), - initial_config_.node_manager_address, message->worker_pid())); + auto job_data_ptr = gcs::CreateJobTableData( + job_id, /*is_dead*/ false, std::time(nullptr), + initial_config_.node_manager_address, message->worker_pid()); + RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr)); } } } @@ -1255,11 +1249,9 @@ void NodeManager::ProcessDisconnectClientMessage( } else if (is_driver) { // The client is a driver. const auto job_id = worker->GetAssignedJobId(); - const auto driver_id = ComputeDriverIdFromJob(job_id); RAY_CHECK(!job_id.IsNil()); - RAY_CHECK_OK(gcs_client_->job_table().AppendJobData( - job_id, /*is_dead=*/true, std::time(nullptr), - initial_config_.node_manager_address, worker->Pid())); + RAY_CHECK_OK(gcs_client_->Jobs().AsyncMarkFinished(job_id, nullptr)); + const auto driver_id = ComputeDriverIdFromJob(job_id); local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id)); worker_pool_.DisconnectDriver(worker); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f2e41358e..0c017daeb 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -390,12 +390,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void HandleObjectMissing(const ObjectID &object_id); - /// Handles updates to job table. + /// Handles the event that a job is finished. /// - /// \param id An unused value. TODO(rkn): Should this be removed? - /// \param job_data Data associated with a job table event. + /// \param job_id ID of the finished job. + /// \param job_data Data associated with the finished job. /// \return Void. - void HandleJobTableUpdate(const JobID &id, const std::vector &job_data); + void HandleJobFinished(const JobID &job_id, const JobTableData &job_data); /// Check if certain invariants associated with the task dependency manager /// and the local queues are satisfied. This is only used for debugging diff --git a/src/ray/test/run_gcs_tests.sh b/src/ray/test/run_gcs_tests.sh index 228754895..0a67533ba 100644 --- a/src/ray/test/run_gcs_tests.sh +++ b/src/ray/test/run_gcs_tests.sh @@ -6,7 +6,8 @@ set -e set -x -bazel build "//:redis_gcs_client_test" "//:redis_actor_info_accessor_test" "//:subscription_executor_test" "//:asio_test" "//:libray_redis_module.so" +bazel build "//:redis_gcs_client_test" "//:subscription_executor_test" "//:asio_test" "//:libray_redis_module.so" +bazel build "//:redis_actor_info_accessor_test" "//:redis_job_info_accessor_test" # Start Redis. ./bazel-bin/redis-server \ @@ -16,9 +17,10 @@ bazel build "//:redis_gcs_client_test" "//:redis_actor_info_accessor_test" "//: sleep 1s ./bazel-bin/redis_gcs_client_test -./bazel-bin/redis_actor_info_accessor_test ./bazel-bin/subscription_executor_test ./bazel-bin/asio_test +./bazel-bin/redis_actor_info_accessor_test +./bazel-bin/redis_job_info_accessor_test ./bazel-bin/redis-cli -p 6379 shutdown sleep 1s