[GCS] refactor the GCS Client Job Interface (#5503)

This commit is contained in:
micafan 2019-12-12 16:57:32 +08:00 committed by Hao Chen
parent 40211bed4b
commit 8c1520d18e
23 changed files with 588 additions and 318 deletions

View file

@ -717,6 +717,7 @@ cc_library(
),
hdrs = glob([
"src/ray/gcs/*.h",
"src/ray/gcs/test/*.h",
]),
copts = COPTS,
deps = [
@ -735,7 +736,7 @@ cc_library(
cc_binary(
name = "redis_gcs_client_test",
testonly = 1,
srcs = ["src/ray/gcs/redis_gcs_client_test.cc"],
srcs = ["src/ray/gcs/test/redis_gcs_client_test.cc"],
copts = COPTS,
deps = [
":gcs",
@ -746,7 +747,7 @@ cc_binary(
cc_binary(
name = "redis_actor_info_accessor_test",
testonly = 1,
srcs = ["src/ray/gcs/redis_actor_info_accessor_test.cc"],
srcs = ["src/ray/gcs/test/redis_actor_info_accessor_test.cc"],
copts = COPTS,
deps = [
":gcs",
@ -757,7 +758,18 @@ cc_binary(
cc_binary(
name = "subscription_executor_test",
testonly = 1,
srcs = ["src/ray/gcs/subscription_executor_test.cc"],
srcs = ["src/ray/gcs/test/subscription_executor_test.cc"],
copts = COPTS,
deps = [
":gcs",
"@com_google_googletest//:gtest_main",
],
)
cc_binary(
name = "redis_job_info_accessor_test",
testonly = 1,
srcs = ["src/ray/gcs/test/redis_job_info_accessor_test.cc"],
copts = COPTS,
deps = [
":gcs",
@ -768,7 +780,7 @@ cc_binary(
cc_binary(
name = "asio_test",
testonly = 1,
srcs = ["src/ray/gcs/asio_test.cc"],
srcs = ["src/ray/gcs/test/asio_test.cc"],
copts = COPTS,
deps = [
":gcs",

View file

@ -3,7 +3,7 @@
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
@ -23,7 +23,7 @@ class ActorInfoAccessor {
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) = 0;
const OptionalItemCallback<rpc::ActorTableData> &callback) = 0;
/// Register an actor to GCS asynchronously.
///
@ -31,7 +31,7 @@ class ActorInfoAccessor {
/// \param callback Callback that will be called after actor has been registered
/// to the GCS.
/// \return Status
virtual Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
virtual Status AsyncRegister(const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Update dynamic states of actor in GCS asynchronously.
@ -43,7 +43,7 @@ class ActorInfoAccessor {
/// TODO(micafan) Don't expose the whole `ActorTableData` and only allow
/// updating dynamic states.
virtual Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Subscribe to any register or update operations of actors.
@ -54,7 +54,7 @@ class ActorInfoAccessor {
/// are ready to receive notification.
/// \return Status
virtual Status AsyncSubscribeAll(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Subscribe to any update operations of an actor.
@ -65,7 +65,7 @@ class ActorInfoAccessor {
/// \return Status
virtual Status AsyncSubscribe(
const ActorID &actor_id,
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to an actor.

View file

@ -7,6 +7,7 @@
#include <vector>
#include "ray/common/status.h"
#include "ray/gcs/actor_info_accessor.h"
#include "ray/gcs/job_info_accessor.h"
#include "ray/util/logging.h"
namespace ray {
@ -29,24 +30,7 @@ class GcsClientOptions {
: server_ip_(ip),
server_port_(port),
password_(password),
is_test_client_(is_test_client) {
#if RAY_USE_NEW_GCS
command_type_ = CommandType::kChain;
#else
command_type_ = CommandType::kRegular;
#endif
}
/// This constructor is only used for testing (RedisGcsClient's test).
///
/// \param ip GCS service ip
/// \param port GCS service port
/// \param command_type Command type of RedisGcsClient
GcsClientOptions(const std::string &ip, int port, CommandType command_type)
: server_ip_(ip),
server_port_(port),
command_type_(command_type),
is_test_client_(true) {}
is_test_client_(is_test_client) {}
// GCS server address
std::string server_ip_;
@ -54,9 +38,6 @@ class GcsClientOptions {
// Password of GCS server.
std::string password_;
// GCS command type. If CommandType::kChain, chain-replicated versions of the tables
// might be used, if available.
CommandType command_type_ = CommandType::kUnknown;
// Whether this client is used for tests.
bool is_test_client_{false};
@ -80,13 +61,20 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
/// Disconnect with GCS Service. Non-thread safe.
virtual void Disconnect() = 0;
/// Get ActorInfoAccessor for reading or writing or subscribing to
/// actors. This function is thread safe.
/// Get the sub-interface for accessing actor information in GCS.
/// This function is thread safe.
ActorInfoAccessor &Actors() {
RAY_CHECK(actor_accessor_ != nullptr);
return *actor_accessor_;
}
/// Get the sub-interface for accessing job information in GCS.
/// This function is thread safe.
JobInfoAccessor &Jobs() {
RAY_CHECK(job_accessor_ != nullptr);
return *job_accessor_;
}
protected:
/// Constructor of GcsClient.
///
@ -99,6 +87,7 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
bool is_connected_{false};
std::unique_ptr<ActorInfoAccessor> actor_accessor_;
std::unique_ptr<JobInfoAccessor> job_accessor_;
};
} // namespace gcs

View file

@ -0,0 +1,54 @@
#ifndef RAY_GCS_JOB_INFO_ACCESSOR_H
#define RAY_GCS_JOB_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// \class JobInfoAccessor
/// `JobInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// job information in the GCS.
class JobInfoAccessor {
public:
virtual ~JobInfoAccessor() = default;
/// Add a job to GCS asynchronously.
///
/// \param data_ptr The job that will be add to GCS.
/// \param callback Callback that will be called after job has been added
/// to GCS.
/// \return Status
virtual Status AsyncAdd(const std::shared_ptr<rpc::JobTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Mark job as finished in GCS asynchronously.
///
/// \param job_id ID of the job that will be make finished to GCS.
/// \param callback Callback that will be called after update finished.
/// \return Status
virtual Status AsyncMarkFinished(const JobID &job_id,
const StatusCallback &callback) = 0;
/// Subscribe to finished jobs.
///
/// \param subscribe Callback that will be called each time when a job finishes.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
const StatusCallback &done) = 0;
protected:
JobInfoAccessor() = default;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_JOB_INFO_ACCESSOR_H

36
src/ray/gcs/pb_util.h Normal file
View file

@ -0,0 +1,36 @@
#ifndef RAY_GCS_PB_UTIL_H
#define RAY_GCS_PB_UTIL_H
#include <memory>
#include "ray/common/id.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// Helper function to produce job table data (for newly created job or updated job).
///
/// \param job_id The ID of job that need to be registered or updated.
/// \param is_dead Whether the driver of this job is dead.
/// \param timestamp The UNIX timestamp of corresponding to this event.
/// \param node_manager_address Address of the node this job was started on.
/// \param driver_pid Process ID of the driver running this job.
/// \return The job table data created by this method.
inline std::shared_ptr<ray::rpc::JobTableData> CreateJobTableData(
const ray::JobID &job_id, bool is_dead, int64_t timestamp,
const std::string &node_manager_address, int64_t driver_pid) {
auto job_info_ptr = std::make_shared<ray::rpc::JobTableData>();
job_info_ptr->set_job_id(job_id.Binary());
job_info_ptr->set_is_dead(is_dead);
job_info_ptr->set_timestamp(timestamp);
job_info_ptr->set_node_manager_address(node_manager_address);
job_info_ptr->set_driver_pid(driver_pid);
return job_info_ptr;
}
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_PB_UTIL_H

View file

@ -92,7 +92,7 @@ Status RedisActorInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribe(ClientID::Nil(), subscribe, done);
return actor_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
}
Status RedisActorInfoAccessor::AsyncSubscribe(

View file

@ -1,10 +1,10 @@
#include "ray/gcs/redis_gcs_client.h"
#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_actor_info_accessor.h"
#include "ray/gcs/redis_context.h"
#include <unistd.h>
#include "ray/gcs/redis_job_info_accessor.h"
static void GetRedisShards(redisContext *context, std::vector<std::string> &addresses,
std::vector<int> &ports) {
@ -73,7 +73,16 @@ namespace ray {
namespace gcs {
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) {}
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) {
#if RAY_USE_NEW_GCS
command_type_ = CommandType::kChain;
#else
command_type_ = CommandType::kRegular;
#endif
}
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options, CommandType command_type)
: GcsClient(options), command_type_(command_type) {}
Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
RAY_CHECK(!is_connected_);
@ -114,6 +123,8 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
/*password=*/options_.password_));
}
Attach(io_service);
actor_table_.reset(new ActorTable({primary_context_}, this));
// TODO(micafan) Modify ClientTable' Constructor(remove ClientID) in future.
@ -127,8 +138,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context_}, this));
// Tables below would be sharded.
object_table_.reset(new ObjectTable(shard_contexts_, this));
raylet_task_table_.reset(
new raylet::TaskTable(shard_contexts_, this, options_.command_type_));
raylet_task_table_.reset(new raylet::TaskTable(shard_contexts_, this, command_type_));
task_reconstruction_log_.reset(new TaskReconstructionLog(shard_contexts_, this));
task_lease_table_.reset(new TaskLeaseTable(shard_contexts_, this));
heartbeat_table_.reset(new HeartbeatTable(shard_contexts_, this));
@ -138,14 +148,13 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
resource_table_.reset(new DynamicResourceTable({primary_context_}, this));
actor_accessor_.reset(new RedisActorInfoAccessor(this));
job_accessor_.reset(new RedisJobInfoAccessor(this));
Status status = Attach(io_service);
is_connected_ = status.ok();
is_connected_ = true;
// TODO(micafan): Synchronously register node and look up existing nodes here
// for this client is Raylet.
RAY_LOG(INFO) << "RedisGcsClient::Connect finished with status " << status;
return status;
RAY_LOG(INFO) << "RedisGcsClient Connected.";
return Status::OK();
}
void RedisGcsClient::Disconnect() {
@ -155,7 +164,7 @@ void RedisGcsClient::Disconnect() {
// TODO(micafan): Synchronously unregister node if this client is Raylet.
}
Status RedisGcsClient::Attach(boost::asio::io_service &io_service) {
void RedisGcsClient::Attach(boost::asio::io_service &io_service) {
// Take care of sharding contexts.
RAY_CHECK(shard_asio_async_clients_.empty()) << "Attach shall be called only once";
for (std::shared_ptr<RedisContext> context : shard_contexts_) {
@ -168,7 +177,6 @@ Status RedisGcsClient::Attach(boost::asio::io_service &io_service) {
new RedisAsioClient(io_service, primary_context_->async_context()));
asio_subscribe_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->subscribe_context()));
return Status::OK();
}
std::string RedisGcsClient::DebugString() const {

View file

@ -21,7 +21,9 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
// TODO(micafan) Will remove those friend class after we replace RedisGcsClient
// with interface class GcsClient in raylet.
friend class RedisActorInfoAccessor;
friend class RedisJobInfoAccessor;
friend class SubscriptionExecutorTest;
friend class LogSubscribeTestHelper;
public:
/// Constructor of RedisGcsClient.
@ -29,9 +31,16 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
/// TODO(micafan) To read and write from the GCS tables requires a further
/// call to Connect() to the client table. Will fix this in next pr.
///
/// \param GcsClientOptions Options of client, e.g. server address, is test client ...
/// \param options Options of this client, e.g. server address, password and so on.
RedisGcsClient(const GcsClientOptions &options);
/// This constructor is only used for testing.
/// Connect() must be called(and return ok) before you call any other methods.
///
/// \param options Options of this client, e.g. server address, password and so on.
/// \param command_type The commands issued type.
RedisGcsClient(const GcsClientOptions &options, CommandType command_type);
/// Connect to GCS Service. Non-thread safe.
/// Call this function before calling other functions.
///
@ -53,7 +62,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
HeartbeatTable &heartbeat_table();
HeartbeatBatchTable &heartbeat_batch_table();
ErrorTable &error_table();
JobTable &job_table();
ProfileTable &profile_table();
ActorCheckpointTable &actor_checkpoint_table();
ActorCheckpointIdTable &actor_checkpoint_id_table();
@ -78,10 +86,16 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
private:
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
Status Attach(boost::asio::io_service &io_service);
void Attach(boost::asio::io_service &io_service);
/// Use method Actors() instead
/// This method will be deprecated, use method Actors() instead.
ActorTable &actor_table();
/// This method will be deprecated, use method Jobs() instead.
JobTable &job_table();
// GCS command type. If CommandType::kChain, chain-replicated versions of the tables
// might be used, if available.
CommandType command_type_{CommandType::kUnknown};
std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<raylet::TaskTable> raylet_task_table_;

View file

@ -0,0 +1,50 @@
#include "ray/gcs/redis_job_info_accessor.h"
#include "ray/gcs/pb_util.h"
#include "ray/gcs/redis_gcs_client.h"
namespace ray {
namespace gcs {
RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {}
Status RedisJobInfoAccessor::AsyncAdd(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) {
return DoAsyncAppend(data_ptr, callback);
}
Status RedisJobInfoAccessor::AsyncMarkFinished(const JobID &job_id,
const StatusCallback &callback) {
std::shared_ptr<JobTableData> data_ptr =
CreateJobTableData(job_id, /*is_dead*/ true, /*time_stamp*/ std::time(nullptr),
/*node_manager_address*/ "", /*driver_pid*/ -1);
return DoAsyncAppend(data_ptr, callback);
}
Status RedisJobInfoAccessor::DoAsyncAppend(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) {
JobTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const JobID &job_id,
const JobTableData &data) { callback(Status::OK()); };
}
JobID job_id = JobID::FromBinary(data_ptr->job_id());
return client_impl_->job_table().Append(job_id, job_id, data_ptr, on_done);
}
Status RedisJobInfoAccessor::AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) {
if (job_data.is_dead()) {
subscribe(job_id, job_data);
}
};
return job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
}
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,53 @@
#ifndef RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H
#define RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/job_info_accessor.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/tables.h"
namespace ray {
namespace gcs {
class RedisGcsClient;
/// \class RedisJobInfoAccessor
/// RedisJobInfoAccessor is an implementation of `JobInfoAccessor`
/// that uses Redis as the backend storage.
class RedisJobInfoAccessor : public JobInfoAccessor {
public:
explicit RedisJobInfoAccessor(RedisGcsClient *client_impl);
virtual ~RedisJobInfoAccessor() {}
Status AsyncAdd(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override;
Status AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe,
const StatusCallback &done) override;
private:
/// Append job information to GCS asynchronously.
///
/// \param data_ptr The job information that will be appended to GCS.
/// \param callback Callback that will be called after append done.
/// \return Status
Status DoAsyncAppend(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback);
RedisGcsClient *client_impl_{nullptr};
typedef SubscriptionExecutor<JobID, JobTableData, JobTable> JobSubscriptionExecutor;
JobSubscriptionExecutor job_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H

View file

@ -5,7 +5,7 @@ namespace ray {
namespace gcs {
template <typename ID, typename Data, typename Table>
Status SubscriptionExecutor<ID, Data, Table>::AsyncSubscribe(
Status SubscriptionExecutor<ID, Data, Table>::AsyncSubscribeAll(
const ClientID &client_id, const SubscribeCallback<ID, Data> &subscribe,
const StatusCallback &done) {
// TODO(micafan) Optimize the lock when necessary.
@ -138,7 +138,7 @@ Status SubscriptionExecutor<ID, Data, Table>::AsyncSubscribe(
id_to_callback_map_[id] = subscribe;
}
auto status = AsyncSubscribe(client_id, nullptr, on_subscribe_done);
auto status = AsyncSubscribeAll(client_id, nullptr, on_subscribe_done);
if (!status.ok()) {
std::unique_lock<std::mutex> lock(mutex_);
id_to_callback_map_.erase(id);
@ -187,6 +187,7 @@ Status SubscriptionExecutor<ID, Data, Table>::AsyncUnsubscribe(
}
template class SubscriptionExecutor<ActorID, ActorTableData, ActorTable>;
template class SubscriptionExecutor<JobID, JobTableData, JobTable>;
} // namespace gcs

View file

@ -32,9 +32,9 @@ class SubscriptionExecutor {
/// is registered or updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
Status AsyncSubscribe(const ClientID &client_id,
const SubscribeCallback<ID, Data> &subscribe,
const StatusCallback &done);
Status AsyncSubscribeAll(const ClientID &client_id,
const SubscribeCallback<ID, Data> &subscribe,
const StatusCallback &done);
/// Subscribe to operations of an element.
/// Repeated subscription to an element will return a failure.

View file

@ -503,18 +503,6 @@ std::string ProfileTable::DebugString() const {
return Log<UniqueID, ProfileTableData>::DebugString();
}
Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
const std::string &node_manager_address,
int64_t driver_pid) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id.Binary());
data->set_is_dead(is_dead);
data->set_timestamp(timestamp);
data->set_node_manager_address(node_manager_address);
data->set_driver_pid(driver_pid);
return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr);
}
void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callback) {
client_added_callback_ = callback;
// Call the callback for any added clients that are cached.

View file

@ -654,17 +654,6 @@ class JobTable : public Log<JobID, JobTableData> {
};
virtual ~JobTable() {}
/// Appends job data to the job table.
///
/// \param job_id The job id.
/// \param is_dead Whether the job is dead.
/// \param timestamp The UNIX timestamp when the driver was started/stopped.
/// \param node_manager_address IP address of the node the driver is running on.
/// \param driver_pid Process ID of the driver process.
/// \return The return status.
Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
const std::string &node_manager_address, int64_t driver_pid);
};
/// Actor table starts with an ALIVE entry, which represents the first time the actor

View file

@ -1,7 +1,7 @@
#include <iostream>
#include "asio.h"
#include "gtest/gtest.h"
#include "ray/gcs/asio.h"
#include "ray/util/logging.h"
extern "C" {

View file

@ -4,8 +4,8 @@
#include <thread>
#include <vector>
#include "gtest/gtest.h"
#include "ray/gcs/accessor_test_base.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/test/accessor_test_base.h"
#include "ray/util/test_util.h"
namespace ray {

View file

@ -6,6 +6,7 @@ extern "C" {
}
#include "ray/common/ray_config.h"
#include "ray/gcs/pb_util.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/tables.h"
#include "ray/util/test_util.h"
@ -30,8 +31,8 @@ inline JobID NextJobID() {
class TestGcs : public ::testing::Test {
public:
TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) {
GcsClientOptions options("127.0.0.1", 6379, command_type_);
client_ = std::make_shared<gcs::RedisGcsClient>(options);
GcsClientOptions options("127.0.0.1", 6379, "", true);
client_ = std::make_shared<gcs::RedisGcsClient>(options, command_type_);
job_id_ = NextJobID();
}
@ -575,52 +576,193 @@ TEST_F(TestGcsWithAsio, TestDeleteKey) {
TestDeleteKeys(job_id_, client_);
}
void TestLogSubscribeAll(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
std::vector<JobID> job_ids;
for (int i = 0; i < 3; i++) {
job_ids.emplace_back(NextJobID());
/// A helper class for Log Subscribe testing.
class LogSubscribeTestHelper {
public:
static void TestLogSubscribeAll(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
std::vector<JobID> job_ids;
for (int i = 0; i < 3; i++) {
job_ids.emplace_back(NextJobID());
}
// Callback for a notification.
auto notification_callback = [job_ids](gcs::RedisGcsClient *client, const JobID &id,
const std::vector<JobTableData> data) {
ASSERT_EQ(id, job_ids[test->NumCallbacks()]);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.job_id(), job_ids[test->NumCallbacks()].Binary());
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == job_ids.size()) {
test->Stop();
}
};
// Callback for subscription success. We are guaranteed to receive
// notifications after this is called.
auto subscribe_callback = [job_ids](gcs::RedisGcsClient *client) {
// We have subscribed. Do the writes to the table.
for (size_t i = 0; i < job_ids.size(); i++) {
auto job_info_ptr = CreateJobTableData(job_ids[i], false, 0, "localhost", 1);
RAY_CHECK_OK(
client->job_table().Append(job_ids[i], job_ids[i], job_info_ptr, nullptr));
}
};
// Subscribe to all driver table notifications. Once we have successfully
// subscribed, we will append to the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->job_table().Subscribe(
job_id, ClientID::Nil(), notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
// Check that we received one notification callback for each write.
ASSERT_EQ(test->NumCallbacks(), job_ids.size());
}
// Callback for a notification.
auto notification_callback = [job_ids](gcs::RedisGcsClient *client, const JobID &id,
const std::vector<JobTableData> data) {
ASSERT_EQ(id, job_ids[test->NumCallbacks()]);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.job_id(), job_ids[test->NumCallbacks()].Binary());
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == job_ids.size()) {
test->Stop();
}
};
// Callback for subscription success. We are guaranteed to receive
// notifications after this is called.
auto subscribe_callback = [job_ids](gcs::RedisGcsClient *client) {
// We have subscribed. Do the writes to the table.
for (size_t i = 0; i < job_ids.size(); i++) {
RAY_CHECK_OK(
client->job_table().AppendJobData(job_ids[i], false, 0, "localhost", 1));
}
};
static void TestLogSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
// Add a log entry.
JobID job_id1 = NextJobID();
std::vector<std::string> job_ids1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<JobTableData>();
data1->set_job_id(job_ids1[0]);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data1, nullptr));
// Subscribe to all driver table notifications. Once we have successfully
// subscribed, we will append to the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->job_table().Subscribe(job_id, ClientID::Nil(),
notification_callback, subscribe_callback));
// Add a log entry at a second key.
JobID job_id2 = NextJobID();
std::vector<std::string> job_ids2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<JobTableData>();
data2->set_job_id(job_ids2[0]);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data2, nullptr));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
// Check that we received one notification callback for each write.
ASSERT_EQ(test->NumCallbacks(), job_ids.size());
}
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [job_id2, job_ids2](
gcs::RedisGcsClient *client, const JobID &id,
const std::vector<JobTableData> &data) {
// Check that we only get notifications for the requested key.
ASSERT_EQ(id, job_id2);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.job_id(), job_ids2[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == job_ids2.size()) {
test->Stop();
}
};
// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, job_id1, job_id2, job_ids1,
job_ids2](gcs::RedisGcsClient *client) {
// Request notifications for one of the keys.
RAY_CHECK_OK(client->job_table().RequestNotifications(
job_id, job_id2, client->client_table().GetLocalClientId(), nullptr));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
auto remaining = std::vector<std::string>(++job_ids1.begin(), job_ids1.end());
for (const auto &job_id_it : remaining) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id_it);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data, nullptr));
}
remaining = std::vector<std::string>(++job_ids2.begin(), job_ids2.end());
for (const auto &job_id_it : remaining) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id_it);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data, nullptr));
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(
client->job_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), job_ids2.size());
}
static void TestLogSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
// Add a log entry.
JobID random_job_id = NextJobID();
std::vector<std::string> job_ids = {"jkl", "mno", "pqr"};
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_ids[0]);
RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr));
// The callback for a notification from the object table. This should only be
// received for the object that we requested notifications for.
auto notification_callback = [random_job_id, job_ids](
gcs::RedisGcsClient *client, const JobID &id,
const std::vector<JobTableData> &data) {
ASSERT_EQ(id, random_job_id);
// Check that we get a duplicate notification for the first write. We get a
// duplicate notification because the log is append-only and notifications
// are canceled after the first write, then requested again.
auto job_ids_copy = job_ids;
job_ids_copy.insert(job_ids_copy.begin(), job_ids_copy.front());
for (const auto &entry : data) {
ASSERT_EQ(entry.job_id(), job_ids_copy[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == job_ids_copy.size()) {
test->Stop();
}
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto subscribe_callback = [job_id, random_job_id,
job_ids](gcs::RedisGcsClient *client) {
// Request notifications, then cancel immediately. We should receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->job_table().RequestNotifications(
job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr));
RAY_CHECK_OK(client->job_table().CancelNotifications(
job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr));
// Append to the key. Since we canceled notifications, we should not
// receive a notification for these writes.
auto remaining = std::vector<std::string>(++job_ids.begin(), job_ids.end());
for (const auto &remaining_job_id : remaining) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(remaining_job_id);
RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr));
}
// Request notifications again. We should receive a notification for the
// current values at the key.
RAY_CHECK_OK(client->job_table().RequestNotifications(
job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr));
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(
client->job_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received a notification callback for the first append to the
// key, then a notification for all of the appends, because we cancel
// notifications in between.
ASSERT_EQ(test->NumCallbacks(), job_ids.size() + 1);
}
};
TEST_F(TestGcsWithAsio, TestLogSubscribeAll) {
test = this;
TestLogSubscribeAll(job_id_, client_);
LogSubscribeTestHelper::TestLogSubscribeAll(job_id_, client_);
}
void TestSetSubscribeAll(const JobID &job_id,
@ -775,78 +917,9 @@ TEST_MACRO(TestGcsWithAsio, TestTableSubscribeId);
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId);
#endif
void TestLogSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
// Add a log entry.
JobID job_id1 = NextJobID();
std::vector<std::string> job_ids1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<JobTableData>();
data1->set_job_id(job_ids1[0]);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data1, nullptr));
// Add a log entry at a second key.
JobID job_id2 = NextJobID();
std::vector<std::string> job_ids2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<JobTableData>();
data2->set_job_id(job_ids2[0]);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data2, nullptr));
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [job_id2, job_ids2](
gcs::RedisGcsClient *client, const JobID &id,
const std::vector<JobTableData> &data) {
// Check that we only get notifications for the requested key.
ASSERT_EQ(id, job_id2);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.job_id(), job_ids2[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == job_ids2.size()) {
test->Stop();
}
};
// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, job_id1, job_id2, job_ids1,
job_ids2](gcs::RedisGcsClient *client) {
// Request notifications for one of the keys.
RAY_CHECK_OK(client->job_table().RequestNotifications(
job_id, job_id2, client->client_table().GetLocalClientId(), nullptr));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
auto remaining = std::vector<std::string>(++job_ids1.begin(), job_ids1.end());
for (const auto &job_id_it : remaining) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id_it);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id1, data, nullptr));
}
remaining = std::vector<std::string>(++job_ids2.begin(), job_ids2.end());
for (const auto &job_id_it : remaining) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id_it);
RAY_CHECK_OK(client->job_table().Append(job_id, job_id2, data, nullptr));
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->job_table().Subscribe(job_id,
client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), job_ids2.size());
}
TEST_F(TestGcsWithAsio, TestLogSubscribeId) {
test = this;
TestLogSubscribeId(job_id_, client_);
LogSubscribeTestHelper::TestLogSubscribeId(job_id_, client_);
}
void TestSetSubscribeId(const JobID &job_id,
@ -997,76 +1070,9 @@ TEST_MACRO(TestGcsWithAsio, TestTableSubscribeCancel);
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel);
#endif
void TestLogSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
// Add a log entry.
JobID random_job_id = NextJobID();
std::vector<std::string> job_ids = {"jkl", "mno", "pqr"};
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_ids[0]);
RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr));
// The callback for a notification from the object table. This should only be
// received for the object that we requested notifications for.
auto notification_callback = [random_job_id, job_ids](
gcs::RedisGcsClient *client, const JobID &id,
const std::vector<JobTableData> &data) {
ASSERT_EQ(id, random_job_id);
// Check that we get a duplicate notification for the first write. We get a
// duplicate notification because the log is append-only and notifications
// are canceled after the first write, then requested again.
auto job_ids_copy = job_ids;
job_ids_copy.insert(job_ids_copy.begin(), job_ids_copy.front());
for (const auto &entry : data) {
ASSERT_EQ(entry.job_id(), job_ids_copy[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == job_ids_copy.size()) {
test->Stop();
}
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto subscribe_callback = [job_id, random_job_id,
job_ids](gcs::RedisGcsClient *client) {
// Request notifications, then cancel immediately. We should receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->job_table().RequestNotifications(
job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr));
RAY_CHECK_OK(client->job_table().CancelNotifications(
job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr));
// Append to the key. Since we canceled notifications, we should not
// receive a notification for these writes.
auto remaining = std::vector<std::string>(++job_ids.begin(), job_ids.end());
for (const auto &remaining_job_id : remaining) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(remaining_job_id);
RAY_CHECK_OK(client->job_table().Append(job_id, random_job_id, data, nullptr));
}
// Request notifications again. We should receive a notification for the
// current values at the key.
RAY_CHECK_OK(client->job_table().RequestNotifications(
job_id, random_job_id, client->client_table().GetLocalClientId(), nullptr));
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->job_table().Subscribe(job_id,
client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received a notification callback for the first append to the
// key, then a notification for all of the appends, because we cancel
// notifications in between.
ASSERT_EQ(test->NumCallbacks(), job_ids.size() + 1);
}
TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) {
test = this;
TestLogSubscribeCancel(job_id_, client_);
LogSubscribeTestHelper::TestLogSubscribeCancel(job_id_, client_);
}
void TestSetSubscribeCancel(const JobID &job_id,

View file

@ -0,0 +1,75 @@
#include "ray/gcs/redis_job_info_accessor.h"
#include <memory>
#include "gtest/gtest.h"
#include "ray/gcs/pb_util.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/test/accessor_test_base.h"
#include "ray/util/test_util.h"
namespace ray {
namespace gcs {
class RedisJobInfoAccessorTest : public AccessorTestBase<JobID, JobTableData> {
protected:
virtual void GenTestData() {
for (size_t i = 0; i < total_job_number_; ++i) {
JobID job_id = JobID::FromInt(i);
std::shared_ptr<JobTableData> job_data_ptr =
CreateJobTableData(job_id, /*is_dead*/ false, /*timestamp*/ 1,
/*node_manager_address*/ "", /*driver_pid*/ i);
id_to_data_[job_id] = job_data_ptr;
}
}
std::atomic<int> subscribe_pending_count_{0};
size_t total_job_number_{100};
};
TEST_F(RedisJobInfoAccessorTest, AddAndSubscribe) {
JobInfoAccessor &job_accessor = gcs_client_->Jobs();
// SubscribeAll
auto on_subscribe = [this](const JobID &job_id, const JobTableData &data) {
const auto it = id_to_data_.find(job_id);
RAY_CHECK(it != id_to_data_.end());
ASSERT_TRUE(data.is_dead());
--subscribe_pending_count_;
};
auto on_done = [this](Status status) {
RAY_CHECK_OK(status);
--pending_count_;
};
++pending_count_;
RAY_CHECK_OK(job_accessor.AsyncSubscribeToFinishedJobs(on_subscribe, on_done));
WaitPendingDone(wait_pending_timeout_);
WaitPendingDone(subscribe_pending_count_, wait_pending_timeout_);
// Register
for (const auto &item : id_to_data_) {
++pending_count_;
RAY_CHECK_OK(job_accessor.AsyncAdd(item.second, [this](Status status) {
RAY_CHECK_OK(status);
--pending_count_;
}));
}
WaitPendingDone(wait_pending_timeout_);
WaitPendingDone(subscribe_pending_count_, wait_pending_timeout_);
// Update
for (auto &item : id_to_data_) {
++pending_count_;
++subscribe_pending_count_;
RAY_CHECK_OK(job_accessor.AsyncMarkFinished(item.first, [this](Status status) {
RAY_CHECK_OK(status);
--pending_count_;
}));
}
WaitPendingDone(wait_pending_timeout_);
WaitPendingDone(subscribe_pending_count_, wait_pending_timeout_);
}
} // namespace gcs
} // namespace ray

View file

@ -1,7 +1,8 @@
#include "ray/gcs/subscription_executor.h"
#include "gtest/gtest.h"
#include "ray/gcs/accessor_test_base.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/test/accessor_test_base.h"
namespace ray {
@ -42,7 +43,7 @@ class SubscriptionExecutorTest : public AccessorTestBase<ActorID, ActorTableData
protected:
virtual void GenTestData() {
for (size_t i = 0; i < 2; ++i) {
for (size_t i = 0; i < 100; ++i) {
std::shared_ptr<ActorTableData> actor = std::make_shared<ActorTableData>();
actor->set_max_reconstructions(1);
actor->set_remaining_reconstructions(1);
@ -85,12 +86,12 @@ class SubscriptionExecutorTest : public AccessorTestBase<ActorID, ActorTableData
TEST_F(SubscriptionExecutorTest, SubscribeAllTest) {
++do_sub_pending_count_;
Status status =
actor_sub_executor_->AsyncSubscribe(ClientID::Nil(), subscribe_, sub_done_);
actor_sub_executor_->AsyncSubscribeAll(ClientID::Nil(), subscribe_, sub_done_);
WaitPendingDone(do_sub_pending_count_, wait_pending_timeout_);
ASSERT_TRUE(status.ok());
sub_pending_count_ = id_to_data_.size();
AsyncRegisterActorToGcs();
status = actor_sub_executor_->AsyncSubscribe(ClientID::Nil(), subscribe_, sub_done_);
status = actor_sub_executor_->AsyncSubscribeAll(ClientID::Nil(), subscribe_, sub_done_);
ASSERT_TRUE(status.IsInvalid());
WaitPendingDone(sub_pending_count_, wait_pending_timeout_);
}
@ -128,7 +129,7 @@ TEST_F(SubscriptionExecutorTest, SubscribeOneAfterActorRegistrationWithClientIDT
TEST_F(SubscriptionExecutorTest, SubscribeAllAndSubscribeOneTest) {
++do_sub_pending_count_;
Status status =
actor_sub_executor_->AsyncSubscribe(ClientID::Nil(), subscribe_, sub_done_);
actor_sub_executor_->AsyncSubscribeAll(ClientID::Nil(), subscribe_, sub_done_);
ASSERT_TRUE(status.ok());
WaitPendingDone(do_sub_pending_count_, wait_pending_timeout_);
for (const auto &item : id_to_data_) {

View file

@ -4,10 +4,10 @@
#include <fstream>
#include <memory>
#include "ray/common/status.h"
#include "ray/common/common_protocol.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/gcs/pb_util.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/stats/stats.h"
#include "ray/util/sample.h"
@ -235,13 +235,13 @@ ray::Status NodeManager::RegisterGcs() {
/*subscribe_callback=*/nullptr,
/*done_callback=*/nullptr));
// Subscribe to driver table updates.
const auto job_table_handler = [this](gcs::RedisGcsClient *client, const JobID &job_id,
const std::vector<JobTableData> &job_data) {
HandleJobTableUpdate(job_id, job_data);
// Subscribe to job updates.
const auto job_subscribe_handler = [this](const JobID &job_id,
const JobTableData &job_data) {
HandleJobFinished(job_id, job_data);
};
RAY_RETURN_NOT_OK(gcs_client_->job_table().Subscribe(JobID::Nil(), ClientID::Nil(),
job_table_handler, nullptr));
RAY_RETURN_NOT_OK(
gcs_client_->Jobs().AsyncSubscribeToFinishedJobs(job_subscribe_handler, nullptr));
// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
@ -272,40 +272,33 @@ void NodeManager::KillWorker(std::shared_ptr<Worker> worker) {
});
}
void NodeManager::HandleJobTableUpdate(const JobID &id,
const std::vector<JobTableData> &job_data) {
for (const auto &entry : job_data) {
RAY_LOG(DEBUG) << "HandleJobTableUpdate " << JobID::FromBinary(entry.job_id()) << " "
<< entry.is_dead();
if (entry.is_dead()) {
auto job_id = JobID::FromBinary(entry.job_id());
auto workers = worker_pool_.GetWorkersRunningTasksForJob(job_id);
// Kill all the workers. The actual cleanup for these workers is done
// later when we receive the DisconnectClient message from them.
for (const auto &worker : workers) {
if (!worker->IsDetachedActor()) {
// Clean up any open ray.wait calls that the worker made.
task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
// Mark the worker as dead so further messages from it are ignored
// (except DisconnectClient).
worker->MarkDead();
// Then kill the worker process.
KillWorker(worker);
}
}
// Remove all tasks for this job from the scheduling queues, mark
// the results for these tasks as not required, cancel any attempts
// at reconstruction. Note that at this time the workers are likely
// alive because of the delay in killing workers.
auto tasks_to_remove = local_queues_.GetTaskIdsForJob(job_id);
task_dependency_manager_.RemoveTasksAndRelatedObjects(tasks_to_remove);
// NOTE(swang): SchedulingQueue::RemoveTasks modifies its argument so we must
// call it last.
local_queues_.RemoveTasks(tasks_to_remove);
void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) {
RAY_LOG(DEBUG) << "HandleJobFinished " << job_id;
RAY_CHECK(job_data.is_dead());
auto workers = worker_pool_.GetWorkersRunningTasksForJob(job_id);
// Kill all the workers. The actual cleanup for these workers is done
// later when we receive the DisconnectClient message from them.
for (const auto &worker : workers) {
if (!worker->IsDetachedActor()) {
// Clean up any open ray.wait calls that the worker made.
task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
// Mark the worker as dead so further messages from it are ignored
// (except DisconnectClient).
worker->MarkDead();
// Then kill the worker process.
KillWorker(worker);
}
}
// Remove all tasks for this job from the scheduling queues, mark
// the results for these tasks as not required, cancel any attempts
// at reconstruction. Note that at this time the workers are likely
// alive because of the delay in killing workers.
auto tasks_to_remove = local_queues_.GetTaskIdsForJob(job_id);
task_dependency_manager_.RemoveTasksAndRelatedObjects(tasks_to_remove);
// NOTE(swang): SchedulingQueue::RemoveTasks modifies its argument so we must
// call it last.
local_queues_.RemoveTasks(tasks_to_remove);
}
void NodeManager::Heartbeat() {
@ -1048,9 +1041,10 @@ void NodeManager::ProcessRegisterClientRequestMessage(
status = worker_pool_.RegisterDriver(std::move(worker));
if (status.ok()) {
local_queues_.AddDriverTaskId(driver_task_id);
RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(
job_id, /*is_dead=*/false, std::time(nullptr),
initial_config_.node_manager_address, message->worker_pid()));
auto job_data_ptr = gcs::CreateJobTableData(
job_id, /*is_dead*/ false, std::time(nullptr),
initial_config_.node_manager_address, message->worker_pid());
RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr));
}
}
}
@ -1255,11 +1249,9 @@ void NodeManager::ProcessDisconnectClientMessage(
} else if (is_driver) {
// The client is a driver.
const auto job_id = worker->GetAssignedJobId();
const auto driver_id = ComputeDriverIdFromJob(job_id);
RAY_CHECK(!job_id.IsNil());
RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(
job_id, /*is_dead=*/true, std::time(nullptr),
initial_config_.node_manager_address, worker->Pid()));
RAY_CHECK_OK(gcs_client_->Jobs().AsyncMarkFinished(job_id, nullptr));
const auto driver_id = ComputeDriverIdFromJob(job_id);
local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id));
worker_pool_.DisconnectDriver(worker);

View file

@ -390,12 +390,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void.
void HandleObjectMissing(const ObjectID &object_id);
/// Handles updates to job table.
/// Handles the event that a job is finished.
///
/// \param id An unused value. TODO(rkn): Should this be removed?
/// \param job_data Data associated with a job table event.
/// \param job_id ID of the finished job.
/// \param job_data Data associated with the finished job.
/// \return Void.
void HandleJobTableUpdate(const JobID &id, const std::vector<JobTableData> &job_data);
void HandleJobFinished(const JobID &job_id, const JobTableData &job_data);
/// Check if certain invariants associated with the task dependency manager
/// and the local queues are satisfied. This is only used for debugging

View file

@ -6,7 +6,8 @@
set -e
set -x
bazel build "//:redis_gcs_client_test" "//:redis_actor_info_accessor_test" "//:subscription_executor_test" "//:asio_test" "//:libray_redis_module.so"
bazel build "//:redis_gcs_client_test" "//:subscription_executor_test" "//:asio_test" "//:libray_redis_module.so"
bazel build "//:redis_actor_info_accessor_test" "//:redis_job_info_accessor_test"
# Start Redis.
./bazel-bin/redis-server \
@ -16,9 +17,10 @@ bazel build "//:redis_gcs_client_test" "//:redis_actor_info_accessor_test" "//:
sleep 1s
./bazel-bin/redis_gcs_client_test
./bazel-bin/redis_actor_info_accessor_test
./bazel-bin/subscription_executor_test
./bazel-bin/asio_test
./bazel-bin/redis_actor_info_accessor_test
./bazel-bin/redis_job_info_accessor_test
./bazel-bin/redis-cli -p 6379 shutdown
sleep 1s