Revert "[GCS] refactor the GCS Client Task Interface (#5515)" (#6543)

This reverts commit f78583147c.
This commit is contained in:
Hao Chen 2019-12-19 17:11:36 +08:00 committed by GitHub
parent d807d0bab6
commit 7e2addb424
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 692 additions and 803 deletions

View file

@ -1,5 +1,5 @@
#include "ray/core_worker/actor_manager.h"
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_actor_info_accessor.h"
namespace ray {

View file

@ -206,7 +206,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
std::shared_ptr<gcs::TaskTableData> data = std::make_shared<gcs::TaskTableData>();
data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage());
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr));
RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr));
SetCurrentTaskId(task_id);
}

View file

@ -1,183 +0,0 @@
#ifndef RAY_GCS_ACCESSOR_H
#define RAY_GCS_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// \class ActorInfoAccessor
/// `ActorInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// actor information in the GCS.
class ActorInfoAccessor {
public:
virtual ~ActorInfoAccessor() = default;
/// Get actor specification from GCS asynchronously.
///
/// \param actor_id The ID of actor to look up in the GCS.
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) = 0;
/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
/// \param callback Callback that will be called after actor has been registered
/// to the GCS.
/// \return Status
virtual Status AsyncRegister(const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Update dynamic states of actor in GCS asynchronously.
///
/// \param actor_id ID of the actor to update.
/// \param data_ptr Data of the actor to update.
/// \param callback Callback that will be called after update finishes.
/// \return Status
/// TODO(micafan) Don't expose the whole `ActorTableData` and only allow
/// updating dynamic states.
virtual Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Subscribe to any register or update operations of actors.
///
/// \param subscribe Callback that will be called each time when an actor is registered
/// or updated.
/// \param done Callback that will be called when subscription is complete and we
/// are ready to receive notification.
/// \return Status
virtual Status AsyncSubscribeAll(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Subscribe to any update operations of an actor.
///
/// \param actor_id The ID of actor to be subscribed to.
/// \param subscribe Callback that will be called each time when the actor is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribe(
const ActorID &actor_id,
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to an actor.
///
/// \param actor_id The ID of the actor to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) = 0;
protected:
ActorInfoAccessor() = default;
};
/// \class JobInfoAccessor
/// `JobInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// job information in the GCS.
class JobInfoAccessor {
public:
virtual ~JobInfoAccessor() = default;
/// Add a job to GCS asynchronously.
///
/// \param data_ptr The job that will be add to GCS.
/// \param callback Callback that will be called after job has been added
/// to GCS.
/// \return Status
virtual Status AsyncAdd(const std::shared_ptr<rpc::JobTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Mark job as finished in GCS asynchronously.
///
/// \param job_id ID of the job that will be make finished to GCS.
/// \param callback Callback that will be called after update finished.
/// \return Status
virtual Status AsyncMarkFinished(const JobID &job_id,
const StatusCallback &callback) = 0;
/// Subscribe to finished jobs.
///
/// \param subscribe Callback that will be called each time when a job finishes.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
const StatusCallback &done) = 0;
protected:
JobInfoAccessor() = default;
};
/// \class TaskInfoAccessor
/// `TaskInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// task information in the GCS.
class TaskInfoAccessor {
public:
virtual ~TaskInfoAccessor() {}
/// Add a task to GCS asynchronously.
///
/// \param data_ptr The task that will be added to GCS.
/// \param callback Callback that will be called after task has been added
/// to GCS.
/// \return Status
virtual Status AsyncAdd(const std::shared_ptr<rpc::TaskTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Get task information from GCS asynchronously.
///
/// \param task_id The ID of the task to look up in GCS.
/// \param callback Callback that is called after lookup finished.
/// \return Status
virtual Status AsyncGet(const TaskID &task_id,
const OptionalItemCallback<rpc::TaskTableData> &callback) = 0;
/// Delete tasks from GCS asynchronously.
///
/// \param task_ids The vector of IDs to delete from GCS.
/// \param callback Callback that is called after delete finished.
/// \return Status
// TODO(micafan) Will support callback of batch deletion in the future.
// Currently this callback will never be called.
virtual Status AsyncDelete(const std::vector<TaskID> &task_ids,
const StatusCallback &callback) = 0;
/// Subscribe asynchronously to the event that the given task is added in GCS.
///
/// \param task_id The ID of the task to be subscribed to.
/// \param subscribe Callback that will be called each time when the task is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribe(
const TaskID &task_id,
const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to a task asynchronously.
/// This method is for node only (core worker shouldn't use this method).
///
/// \param task_id The ID of the task to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) = 0;
protected:
TaskInfoAccessor() = default;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_ACCESSOR_H

View file

@ -0,0 +1,87 @@
#ifndef RAY_GCS_ACTOR_INFO_ACCESSOR_H
#define RAY_GCS_ACTOR_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// \class ActorInfoAccessor
/// `ActorInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// actor information in the GCS.
class ActorInfoAccessor {
public:
virtual ~ActorInfoAccessor() = default;
/// Get actor specification from GCS asynchronously.
///
/// \param actor_id The ID of actor to look up in the GCS.
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) = 0;
/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
/// \param callback Callback that will be called after actor has been registered
/// to the GCS.
/// \return Status
virtual Status AsyncRegister(const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Update dynamic states of actor in GCS asynchronously.
///
/// \param actor_id ID of the actor to update.
/// \param data_ptr Data of the actor to update.
/// \param callback Callback that will be called after update finishes.
/// \return Status
/// TODO(micafan) Don't expose the whole `ActorTableData` and only allow
/// updating dynamic states.
virtual Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Subscribe to any register or update operations of actors.
///
/// \param subscribe Callback that will be called each time when an actor is registered
/// or updated.
/// \param done Callback that will be called when subscription is complete and we
/// are ready to receive notification.
/// \return Status
virtual Status AsyncSubscribeAll(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Subscribe to any update operations of an actor.
///
/// \param actor_id The ID of actor to be subscribed to.
/// \param subscribe Callback that will be called each time when the actor is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribe(
const ActorID &actor_id,
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to an actor.
///
/// \param actor_id The ID of the actor to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) = 0;
protected:
ActorInfoAccessor() = default;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_ACTOR_INFO_ACCESSOR_H

View file

@ -6,7 +6,8 @@
#include <string>
#include <vector>
#include "ray/common/status.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/actor_info_accessor.h"
#include "ray/gcs/job_info_accessor.h"
#include "ray/util/logging.h"
namespace ray {
@ -74,13 +75,6 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
return *job_accessor_;
}
/// Get the sub-interface for accessing task information in GCS.
/// This function is thread safe.
TaskInfoAccessor &Tasks() {
RAY_CHECK(task_accessor_ != nullptr);
return *task_accessor_;
}
protected:
/// Constructor of GcsClient.
///
@ -94,7 +88,6 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
std::unique_ptr<ActorInfoAccessor> actor_accessor_;
std::unique_ptr<JobInfoAccessor> job_accessor_;
std::unique_ptr<TaskInfoAccessor> task_accessor_;
};
} // namespace gcs

View file

@ -0,0 +1,54 @@
#ifndef RAY_GCS_JOB_INFO_ACCESSOR_H
#define RAY_GCS_JOB_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// \class JobInfoAccessor
/// `JobInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// job information in the GCS.
class JobInfoAccessor {
public:
virtual ~JobInfoAccessor() = default;
/// Add a job to GCS asynchronously.
///
/// \param data_ptr The job that will be add to GCS.
/// \param callback Callback that will be called after job has been added
/// to GCS.
/// \return Status
virtual Status AsyncAdd(const std::shared_ptr<rpc::JobTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Mark job as finished in GCS asynchronously.
///
/// \param job_id ID of the job that will be make finished to GCS.
/// \param callback Callback that will be called after update finished.
/// \return Status
virtual Status AsyncMarkFinished(const JobID &job_id,
const StatusCallback &callback) = 0;
/// Subscribe to finished jobs.
///
/// \param subscribe Callback that will be called each time when a job finishes.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, rpc::JobTableData> &subscribe,
const StatusCallback &done) = 0;
protected:
JobInfoAccessor() = default;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_JOB_INFO_ACCESSOR_H

View file

@ -1,139 +0,0 @@
#ifndef RAY_GCS_REDIS_ACCESSOR_H
#define RAY_GCS_REDIS_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/tables.h"
namespace ray {
namespace gcs {
class RedisGcsClient;
std::shared_ptr<gcs::ActorTableData> CreateActorTableData(
const TaskSpecification &task_spec, const rpc::Address &address,
gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions);
/// \class RedisActorInfoAccessor
/// `RedisActorInfoAccessor` is an implementation of `ActorInfoAccessor`
/// that uses Redis as the backend storage.
class RedisActorInfoAccessor : public ActorInfoAccessor {
public:
explicit RedisActorInfoAccessor(RedisGcsClient *client_impl);
virtual ~RedisActorInfoAccessor() {}
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) override;
Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncSubscribeAll(const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncSubscribe(const ActorID &actor_id,
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override;
private:
RedisGcsClient *client_impl_{nullptr};
// Use a random ClientID for actor subscription. Because:
// If we use ClientID::Nil, GCS will still send all actors' updates to this GCS Client.
// Even we can filter out irrelevant updates, but there will be extra overhead.
// And because the new GCS Client will no longer hold the local ClientID, so we use
// random ClientID instead.
// TODO(micafan): Remove this random id, once GCS becomes a service.
ClientID node_id_{ClientID::FromRandom()};
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;
};
/// \class RedisJobInfoAccessor
/// RedisJobInfoAccessor is an implementation of `JobInfoAccessor`
/// that uses Redis as the backend storage.
class RedisJobInfoAccessor : public JobInfoAccessor {
public:
explicit RedisJobInfoAccessor(RedisGcsClient *client_impl);
virtual ~RedisJobInfoAccessor() {}
Status AsyncAdd(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override;
Status AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe,
const StatusCallback &done) override;
private:
/// Append job information to GCS asynchronously.
///
/// \param data_ptr The job information that will be appended to GCS.
/// \param callback Callback that will be called after append done.
/// \return Status
Status DoAsyncAppend(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback);
RedisGcsClient *client_impl_{nullptr};
typedef SubscriptionExecutor<JobID, JobTableData, JobTable> JobSubscriptionExecutor;
JobSubscriptionExecutor job_sub_executor_;
};
/// \class RedisTaskInfoAccessor
/// `RedisTaskInfoAccessor` is an implementation of `TaskInfoAccessor`
/// that uses Redis as the backend storage.
class RedisTaskInfoAccessor : public TaskInfoAccessor {
public:
explicit RedisTaskInfoAccessor(RedisGcsClient *client_impl);
~RedisTaskInfoAccessor() {}
Status AsyncAdd(const std::shared_ptr<TaskTableData> &data_ptr,
const StatusCallback &callback);
Status AsyncGet(const TaskID &task_id,
const OptionalItemCallback<TaskTableData> &callback);
Status AsyncDelete(const std::vector<TaskID> &task_ids, const StatusCallback &callback);
Status AsyncSubscribe(const TaskID &task_id,
const SubscribeCallback<TaskID, TaskTableData> &subscribe,
const StatusCallback &done);
Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done);
private:
RedisGcsClient *client_impl_{nullptr};
// Use a random ClientID for task subscription. Because:
// If we use ClientID::Nil, GCS will still send all tasks' updates to this GCS Client.
// Even we can filter out irrelevant updates, but there will be extra overhead.
// And because the new GCS Client will no longer hold the local ClientID, so we use
// random ClientID instead.
// TODO(micafan): Remove this random id, once GCS becomes a service.
ClientID subscribe_id_{ClientID::FromRandom()};
typedef SubscriptionExecutor<TaskID, TaskTableData, raylet::TaskTable>
TaskSubscriptionExecutor;
TaskSubscriptionExecutor task_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_ACCESSOR_H

View file

@ -1,6 +1,5 @@
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_actor_info_accessor.h"
#include <boost/none.hpp>
#include "ray/gcs/pb_util.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/util/logging.h"
@ -44,10 +43,8 @@ Status RedisActorInfoAccessor::AsyncGet(
boost::optional<ActorTableData> result;
if (!data.empty()) {
result = data.back();
callback(Status::OK(), result);
} else {
callback(Status::Invalid("Actor not exist."), result);
}
callback(Status::OK(), result);
};
return client_impl_->actor_table().Lookup(JobID::Nil(), actor_id, on_done);
@ -135,100 +132,6 @@ Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
return actor_sub_executor_.AsyncUnsubscribe(node_id_, actor_id, done);
}
RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {}
Status RedisJobInfoAccessor::AsyncAdd(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) {
return DoAsyncAppend(data_ptr, callback);
}
Status RedisJobInfoAccessor::AsyncMarkFinished(const JobID &job_id,
const StatusCallback &callback) {
std::shared_ptr<JobTableData> data_ptr =
CreateJobTableData(job_id, /*is_dead*/ true, /*time_stamp*/ std::time(nullptr),
/*node_manager_address*/ "", /*driver_pid*/ -1);
return DoAsyncAppend(data_ptr, callback);
}
Status RedisJobInfoAccessor::DoAsyncAppend(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) {
JobTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const JobID &job_id,
const JobTableData &data) { callback(Status::OK()); };
}
JobID job_id = JobID::FromBinary(data_ptr->job_id());
return client_impl_->job_table().Append(job_id, job_id, data_ptr, on_done);
}
Status RedisJobInfoAccessor::AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) {
if (job_data.is_dead()) {
subscribe(job_id, job_data);
}
};
return job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
}
RedisTaskInfoAccessor::RedisTaskInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), task_sub_executor_(client_impl->raylet_task_table()) {}
Status RedisTaskInfoAccessor::AsyncAdd(const std::shared_ptr<TaskTableData> &data_ptr,
const StatusCallback &callback) {
raylet::TaskTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const TaskID &task_id,
const TaskTableData &data) { callback(Status::OK()); };
}
TaskID task_id = TaskID::FromBinary(data_ptr->task().task_spec().task_id());
raylet::TaskTable &task_table = client_impl_->raylet_task_table();
return task_table.Add(JobID::Nil(), task_id, data_ptr, on_done);
}
Status RedisTaskInfoAccessor::AsyncGet(
const TaskID &task_id, const OptionalItemCallback<TaskTableData> &callback) {
RAY_CHECK(callback != nullptr);
auto on_success = [callback](RedisGcsClient *client, const TaskID &task_id,
const TaskTableData &data) {
boost::optional<TaskTableData> result(data);
callback(Status::OK(), result);
};
auto on_failure = [callback](RedisGcsClient *client, const TaskID &task_id) {
boost::optional<TaskTableData> result;
callback(Status::Invalid("Task not exist."), result);
};
raylet::TaskTable &task_table = client_impl_->raylet_task_table();
return task_table.Lookup(JobID::Nil(), task_id, on_success, on_failure);
}
Status RedisTaskInfoAccessor::AsyncDelete(const std::vector<TaskID> &task_ids,
const StatusCallback &callback) {
raylet::TaskTable &task_table = client_impl_->raylet_task_table();
task_table.Delete(JobID::Nil(), task_ids);
// TODO(micafan) Always return OK here.
// Confirm if we need to handle the deletion failure and how to handle it.
return Status::OK();
}
Status RedisTaskInfoAccessor::AsyncSubscribe(
const TaskID &task_id, const SubscribeCallback<TaskID, TaskTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return task_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done);
}
Status RedisTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id,
const StatusCallback &done) {
return task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done);
}
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,68 @@
#ifndef RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H
#define RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/actor_info_accessor.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/tables.h"
namespace ray {
namespace gcs {
std::shared_ptr<gcs::ActorTableData> CreateActorTableData(
const TaskSpecification &task_spec, const rpc::Address &address,
gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions);
class RedisGcsClient;
/// \class RedisActorInfoAccessor
/// `RedisActorInfoAccessor` is an implementation of `ActorInfoAccessor`
/// that uses Redis as the backend storage.
class RedisActorInfoAccessor : public ActorInfoAccessor {
public:
explicit RedisActorInfoAccessor(RedisGcsClient *client_impl);
virtual ~RedisActorInfoAccessor() {}
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) override;
Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncSubscribeAll(const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncSubscribe(const ActorID &actor_id,
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override;
private:
RedisGcsClient *client_impl_{nullptr};
// Use a random ClientID for actor subscription. Because:
// If we use ClientID::Nil, GCS will still send all actors' updates to this GCS Client.
// Even we can filter out irrelevant updates, but there will be extra overhead.
// And because the new GCS Client will no longer hold the local ClientID, so we use
// random ClientID instead.
// TODO(micafan): Remove this random id, once GCS becomes a service.
ClientID node_id_{ClientID::FromRandom()};
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H

View file

@ -2,8 +2,9 @@
#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_actor_info_accessor.h"
#include "ray/gcs/redis_context.h"
#include "ray/gcs/redis_job_info_accessor.h"
static void GetRedisShards(redisContext *context, std::vector<std::string> &addresses,
std::vector<int> &ports) {
@ -72,8 +73,13 @@ namespace ray {
namespace gcs {
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options)
: GcsClient(options), command_type_(CommandType::kRegular) {}
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) {
#if RAY_USE_NEW_GCS
command_type_ = CommandType::kChain;
#else
command_type_ = CommandType::kRegular;
#endif
}
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options, CommandType command_type)
: GcsClient(options), command_type_(command_type) {}
@ -144,7 +150,6 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
actor_accessor_.reset(new RedisActorInfoAccessor(this));
job_accessor_.reset(new RedisJobInfoAccessor(this));
task_accessor_.reset(new RedisTaskInfoAccessor(this));
is_connected_ = true;

View file

@ -18,14 +18,12 @@ namespace gcs {
class RedisContext;
class RAY_EXPORT RedisGcsClient : public GcsClient {
// TODO(micafan) Will remove those friend class / method after we replace RedisGcsClient
// TODO(micafan) Will remove those friend class after we replace RedisGcsClient
// with interface class GcsClient in raylet.
friend class RedisActorInfoAccessor;
friend class RedisJobInfoAccessor;
friend class RedisTaskInfoAccessor;
friend class SubscriptionExecutorTest;
friend class LogSubscribeTestHelper;
friend class TaskTableTestHelper;
public:
/// Constructor of RedisGcsClient.
@ -57,6 +55,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
// TODO: Some API for getting the error on the driver
ObjectTable &object_table();
raylet::TaskTable &raylet_task_table();
TaskReconstructionLog &task_reconstruction_log();
TaskLeaseTable &task_lease_table();
ClientTable &client_table();
@ -97,8 +96,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
ActorTable &actor_table();
/// This method will be deprecated, use method Jobs() instead.
JobTable &job_table();
/// This method will be deprecated, use method Tasks() instead.
raylet::TaskTable &raylet_task_table();
// GCS command type. If CommandType::kChain, chain-replicated versions of the tables
// might be used, if available.

View file

@ -0,0 +1,50 @@
#include "ray/gcs/redis_job_info_accessor.h"
#include "ray/gcs/pb_util.h"
#include "ray/gcs/redis_gcs_client.h"
namespace ray {
namespace gcs {
RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {}
Status RedisJobInfoAccessor::AsyncAdd(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) {
return DoAsyncAppend(data_ptr, callback);
}
Status RedisJobInfoAccessor::AsyncMarkFinished(const JobID &job_id,
const StatusCallback &callback) {
std::shared_ptr<JobTableData> data_ptr =
CreateJobTableData(job_id, /*is_dead*/ true, /*time_stamp*/ std::time(nullptr),
/*node_manager_address*/ "", /*driver_pid*/ -1);
return DoAsyncAppend(data_ptr, callback);
}
Status RedisJobInfoAccessor::DoAsyncAppend(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) {
JobTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const JobID &job_id,
const JobTableData &data) { callback(Status::OK()); };
}
JobID job_id = JobID::FromBinary(data_ptr->job_id());
return client_impl_->job_table().Append(job_id, job_id, data_ptr, on_done);
}
Status RedisJobInfoAccessor::AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) {
if (job_data.is_dead()) {
subscribe(job_id, job_data);
}
};
return job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
}
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,53 @@
#ifndef RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H
#define RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/job_info_accessor.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/tables.h"
namespace ray {
namespace gcs {
class RedisGcsClient;
/// \class RedisJobInfoAccessor
/// RedisJobInfoAccessor is an implementation of `JobInfoAccessor`
/// that uses Redis as the backend storage.
class RedisJobInfoAccessor : public JobInfoAccessor {
public:
explicit RedisJobInfoAccessor(RedisGcsClient *client_impl);
virtual ~RedisJobInfoAccessor() {}
Status AsyncAdd(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override;
Status AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe,
const StatusCallback &done) override;
private:
/// Append job information to GCS asynchronously.
///
/// \param data_ptr The job information that will be appended to GCS.
/// \param callback Callback that will be called after append done.
/// \return Status
Status DoAsyncAppend(const std::shared_ptr<JobTableData> &data_ptr,
const StatusCallback &callback);
RedisGcsClient *client_impl_{nullptr};
typedef SubscriptionExecutor<JobID, JobTableData, JobTable> JobSubscriptionExecutor;
JobSubscriptionExecutor job_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H

View file

@ -189,7 +189,6 @@ Status SubscriptionExecutor<ID, Data, Table>::AsyncUnsubscribe(
template class SubscriptionExecutor<ActorID, ActorTableData, ActorTable>;
template class SubscriptionExecutor<ActorID, ActorTableData, DirectActorTable>;
template class SubscriptionExecutor<JobID, JobTableData, JobTable>;
template class SubscriptionExecutor<TaskID, TaskTableData, raylet::TaskTable>;
} // namespace gcs

View file

@ -306,13 +306,6 @@ Status Table<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id
done);
}
template <typename ID, typename Data>
Status Table<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id,
const Callback &subscribe,
const SubscriptionCallback &done) {
return Subscribe(job_id, client_id, subscribe, /*failure*/ nullptr, done);
}
template <typename ID, typename Data>
std::string Table<ID, Data>::DebugString() const {
std::stringstream result;

View file

@ -311,9 +311,6 @@ class Table : private Log<ID, Data>,
using Log<ID, Data>::RequestNotifications;
using Log<ID, Data>::CancelNotifications;
/// Expose this interface for use by subscription tools class SubscriptionExecutor.
/// In this way TaskTable() can also reuse class SubscriptionExecutor.
using Log<ID, Data>::Subscribe;
/// Add an entry to the table. This overwrites any existing data at the key.
///
@ -359,24 +356,6 @@ class Table : private Log<ID, Data>,
const Callback &subscribe, const FailureCallback &failure,
const SubscriptionCallback &done);
/// Subscribe to any Add operations to this table. The caller may choose to
/// subscribe to all Adds, or to subscribe only to keys that it requests
/// notifications for. This may only be called once per Table instance.
///
/// \param job_id The ID of the job.
/// \param client_id The type of update to listen to. If this is nil, then a
/// message for each Add to the table will be received. Else, only
/// messages for the given client will be received. In the latter
/// case, the client may request notifications on specific keys in the
/// table via `RequestNotifications`.
/// \param subscribe Callback that is called on each received message. If the
/// callback is called with an empty vector, then there was no data at the key.
/// \param done Callback that is called when subscription is complete and we
/// are ready to receive messages.
/// \return Status
Status Subscribe(const JobID &job_id, const ClientID &client_id,
const Callback &subscribe, const SubscriptionCallback &done);
void Delete(const JobID &job_id, const ID &id) { Log<ID, Data>::Delete(job_id, id); }
void Delete(const JobID &job_id, const std::vector<ID> &ids) {

View file

@ -7,7 +7,6 @@
#include <thread>
#include <vector>
#include "gtest/gtest.h"
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/util/test_util.h"

View file

@ -87,278 +87,69 @@ class TestGcsWithChainAsio : public TestGcsWithAsio {
TestGcsWithChainAsio() : TestGcsWithAsio(gcs::CommandType::kChain){};
};
class TaskTableTestHelper {
public:
/// A helper function that creates a GCS `TaskTableData` object.
static std::shared_ptr<TaskTableData> CreateTaskTableData(const TaskID &task_id,
uint64_t num_returns = 0) {
auto data = std::make_shared<TaskTableData>();
data->mutable_task()->mutable_task_spec()->set_task_id(task_id.Binary());
data->mutable_task()->mutable_task_spec()->set_num_returns(num_returns);
return data;
}
/// A helper function that creates a GCS `TaskTableData` object.
std::shared_ptr<TaskTableData> CreateTaskTableData(const TaskID &task_id,
uint64_t num_returns = 0) {
auto data = std::make_shared<TaskTableData>();
data->mutable_task()->mutable_task_spec()->set_task_id(task_id.Binary());
data->mutable_task()->mutable_task_spec()->set_num_returns(num_returns);
return data;
}
/// A helper function that compare whether 2 `TaskTableData` objects are equal.
/// Note, this function only compares fields set by `CreateTaskTableData`.
static bool TaskTableDataEqual(const TaskTableData &data1, const TaskTableData &data2) {
const auto &spec1 = data1.task().task_spec();
const auto &spec2 = data2.task().task_spec();
return (spec1.task_id() == spec2.task_id() &&
spec1.num_returns() == spec2.num_returns());
}
/// A helper function that compare whether 2 `TaskTableData` objects are equal.
/// Note, this function only compares fields set by `CreateTaskTableData`.
bool TaskTableDataEqual(const TaskTableData &data1, const TaskTableData &data2) {
const auto &spec1 = data1.task().task_spec();
const auto &spec2 = data2.task().task_spec();
return (spec1.task_id() == spec2.task_id() &&
spec1.num_returns() == spec2.num_returns());
}
static void TestTableLookup(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
const auto task_id = RandomTaskId();
const auto data = CreateTaskTableData(task_id);
void TestTableLookup(const JobID &job_id, std::shared_ptr<gcs::RedisGcsClient> client) {
const auto task_id = RandomTaskId();
const auto data = CreateTaskTableData(task_id);
// Check that we added the correct task.
auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) {
ASSERT_EQ(id, task_id);
ASSERT_TRUE(TaskTableDataEqual(*data, d));
};
// Check that we added the correct task.
auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) {
ASSERT_EQ(id, task_id);
ASSERT_TRUE(TaskTableDataEqual(*data, d));
};
// Check that the lookup returns the added task.
auto lookup_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) {
ASSERT_EQ(id, task_id);
ASSERT_TRUE(TaskTableDataEqual(*data, d));
test->Stop();
};
// Check that the lookup returns the added task.
auto lookup_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) {
ASSERT_EQ(id, task_id);
ASSERT_TRUE(TaskTableDataEqual(*data, d));
test->Stop();
};
// Check that the lookup does not return an empty entry.
auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) {
RAY_CHECK(false);
};
// Check that the lookup does not return an empty entry.
auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) {
RAY_CHECK(false);
};
// Add the task, then do a lookup.
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback,
failure_callback));
// Run the event loop. The loop will only stop if the Lookup callback is
// called (or an assertion failure).
test->Start();
}
static void TestTableLookupFailure(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
TaskID task_id = RandomTaskId();
// Check that the lookup does not return data.
auto lookup_callback = [](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) { RAY_CHECK(false); };
// Check that the lookup returns an empty entry.
auto failure_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id) {
ASSERT_EQ(id, task_id);
test->Stop();
};
// Lookup the task. We have not done any writes, so the key should be empty.
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback,
failure_callback));
// Run the event loop. The loop will only stop if the failure callback is
// called (or an assertion failure).
test->Start();
}
static void TestDeleteKeysFromTable(
const JobID &job_id, std::shared_ptr<gcs::RedisGcsClient> client,
std::vector<std::shared_ptr<TaskTableData>> &data_vector, bool stop_at_end) {
std::vector<TaskID> ids;
TaskID task_id;
for (auto &data : data_vector) {
task_id = RandomTaskId();
ids.push_back(task_id);
// Check that we added the correct object entries.
auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) {
ASSERT_EQ(id, task_id);
ASSERT_TRUE(TaskTableDataEqual(*data, d));
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
}
for (const auto &task_id : ids) {
auto task_lookup_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) {
ASSERT_EQ(id, task_id);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id,
task_lookup_callback, nullptr));
}
if (ids.size() == 1) {
client->raylet_task_table().Delete(job_id, ids[0]);
} else {
client->raylet_task_table().Delete(job_id, ids);
}
auto expected_failure_callback = [](RedisGcsClient *client, const TaskID &id) {
ASSERT_TRUE(true);
test->IncrementNumCallbacks();
};
auto undesired_callback = [](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) { ASSERT_TRUE(false); };
for (size_t i = 0; i < ids.size(); ++i) {
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, undesired_callback,
expected_failure_callback));
}
if (stop_at_end) {
auto stop_callback = [](RedisGcsClient *client, const TaskID &id) { test->Stop(); };
RAY_CHECK_OK(
client->raylet_task_table().Lookup(job_id, ids[0], nullptr, stop_callback));
}
}
static void TestTableSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
size_t num_modifications = 3;
// Add a table entry.
TaskID task_id1 = RandomTaskId();
// Add a table entry at a second key.
TaskID task_id2 = RandomTaskId();
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id2, num_modifications](
gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) {
// Check that we only get notifications for the requested key.
ASSERT_EQ(id, task_id2);
// Check that we get notifications in the same order as the writes.
ASSERT_TRUE(
TaskTableDataEqual(data, *CreateTaskTableData(task_id2, test->NumCallbacks())));
test->IncrementNumCallbacks();
if (test->NumCallbacks() == num_modifications) {
test->Stop();
}
};
// The failure callback should be called once since both keys start as empty.
bool failure_notification_received = false;
auto failure_callback = [task_id2, &failure_notification_received](
gcs::RedisGcsClient *client, const TaskID &id) {
ASSERT_EQ(id, task_id2);
// The failure notification should be the first notification received.
ASSERT_EQ(test->NumCallbacks(), 0);
failure_notification_received = true;
};
// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, task_id1, task_id2,
num_modifications](gcs::RedisGcsClient *client) {
// Request notifications for one of the keys.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id2, client->client_table().GetLocalClientId(), nullptr));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
for (uint64_t i = 0; i < num_modifications; i++) {
auto data = CreateTaskTableData(task_id1, i);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr));
}
for (uint64_t i = 0; i < num_modifications; i++) {
auto data = CreateTaskTableData(task_id2, i);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr));
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
failure_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that the failure callback was called since the key was initially
// empty.
ASSERT_TRUE(failure_notification_received);
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), num_modifications);
}
static void TestTableSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
// Add a table entry.
const auto task_id = RandomTaskId();
const int num_modifications = 3;
const auto data = CreateTaskTableData(task_id, 0);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
// The failure callback should not be called since all keys are non-empty
// when notifications are requested.
auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) {
RAY_CHECK(false);
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) {
ASSERT_EQ(id, task_id);
// Check that we only get notifications for the first and last writes,
// since notifications are canceled in between.
if (test->NumCallbacks() == 0) {
ASSERT_TRUE(TaskTableDataEqual(data, *CreateTaskTableData(task_id, 0)));
} else {
ASSERT_TRUE(TaskTableDataEqual(
data, *CreateTaskTableData(task_id, num_modifications - 1)));
}
test->IncrementNumCallbacks();
if (test->NumCallbacks() == num_modifications - 1) {
test->Stop();
}
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto subscribe_callback = [job_id, task_id](gcs::RedisGcsClient *client) {
// Request notifications, then cancel immediately. We should receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id, client->client_table().GetLocalClientId(), nullptr));
RAY_CHECK_OK(client->raylet_task_table().CancelNotifications(
job_id, task_id, client->client_table().GetLocalClientId(), nullptr));
// Write to the key. Since we canceled notifications, we should not receive
// a notification for these writes.
for (uint64_t i = 1; i < num_modifications; i++) {
auto data = CreateTaskTableData(task_id, i);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
}
// Request notifications again. We should receive a notification for the
// current value at the key.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id, client->client_table().GetLocalClientId(), nullptr));
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
failure_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received a notification callback for the first and least
// writes to the key, since notifications are canceled in between.
ASSERT_EQ(test->NumCallbacks(), 2);
}
};
// Add the task, then do a lookup.
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback,
failure_callback));
// Run the event loop. The loop will only stop if the Lookup callback is
// called (or an assertion failure).
test->Start();
}
// Convenient macro to test across {ae, asio} x {regular, chain} x {the tests}.
// Undefined at the end.
#define TEST_TASK_TABLE_MACRO(FIXTURE, TEST) \
TEST_F(FIXTURE, TEST) { \
test = this; \
TaskTableTestHelper::TEST(job_id_, client_); \
#define TEST_MACRO(FIXTURE, TEST) \
TEST_F(FIXTURE, TEST) { \
test = this; \
TEST(job_id_, client_); \
}
TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableLookup);
TEST_MACRO(TestGcsWithAsio, TestTableLookup);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAsio, TestTableLookup);
#endif
void TestLogLookup(const JobID &job_id, std::shared_ptr<gcs::RedisGcsClient> client) {
// Append some entries to the log at an object ID.
@ -405,7 +196,32 @@ TEST_F(TestGcsWithAsio, TestLogLookup) {
TestLogLookup(job_id_, client_);
}
TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableLookupFailure);
void TestTableLookupFailure(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
TaskID task_id = RandomTaskId();
// Check that the lookup does not return data.
auto lookup_callback = [](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) { RAY_CHECK(false); };
// Check that the lookup returns an empty entry.
auto failure_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id) {
ASSERT_EQ(id, task_id);
test->Stop();
};
// Lookup the task. We have not done any writes, so the key should be empty.
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback,
failure_callback));
// Run the event loop. The loop will only stop if the failure callback is
// called (or an assertion failure).
test->Start();
}
TEST_MACRO(TestGcsWithAsio, TestTableLookupFailure);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAsio, TestTableLookupFailure);
#endif
void TestLogAppendAt(const JobID &job_id, std::shared_ptr<gcs::RedisGcsClient> client) {
TaskID task_id = RandomTaskId();
@ -579,6 +395,55 @@ void TestDeleteKeysFromLog(
}
}
void TestDeleteKeysFromTable(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client,
std::vector<std::shared_ptr<TaskTableData>> &data_vector,
bool stop_at_end) {
std::vector<TaskID> ids;
TaskID task_id;
for (auto &data : data_vector) {
task_id = RandomTaskId();
ids.push_back(task_id);
// Check that we added the correct object entries.
auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &d) {
ASSERT_EQ(id, task_id);
ASSERT_TRUE(TaskTableDataEqual(*data, d));
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
}
for (const auto &task_id : ids) {
auto task_lookup_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) {
ASSERT_EQ(id, task_id);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, task_lookup_callback,
nullptr));
}
if (ids.size() == 1) {
client->raylet_task_table().Delete(job_id, ids[0]);
} else {
client->raylet_task_table().Delete(job_id, ids);
}
auto expected_failure_callback = [](RedisGcsClient *client, const TaskID &id) {
ASSERT_TRUE(true);
test->IncrementNumCallbacks();
};
auto undesired_callback = [](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) { ASSERT_TRUE(false); };
for (size_t i = 0; i < ids.size(); ++i) {
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, undesired_callback,
expected_failure_callback));
}
if (stop_at_end) {
auto stop_callback = [](RedisGcsClient *client, const TaskID &id) { test->Stop(); };
RAY_CHECK_OK(
client->raylet_task_table().Lookup(job_id, ids[0], nullptr, stop_callback));
}
}
void TestDeleteKeysFromSet(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client,
std::vector<std::shared_ptr<ObjectTableData>> &data_vector) {
@ -658,21 +523,21 @@ void TestDeleteKeys(const JobID &job_id, std::shared_ptr<gcs::RedisGcsClient> cl
std::vector<std::shared_ptr<TaskTableData>> task_vector;
auto AppendTaskData = [&task_vector](size_t add_count) {
for (size_t i = 0; i < add_count; ++i) {
task_vector.push_back(TaskTableTestHelper::CreateTaskTableData(RandomTaskId()));
task_vector.push_back(CreateTaskTableData(RandomTaskId()));
}
};
AppendTaskData(1);
ASSERT_EQ(task_vector.size(), 1);
TaskTableTestHelper::TestDeleteKeysFromTable(job_id, client, task_vector, false);
TestDeleteKeysFromTable(job_id, client, task_vector, false);
AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
ASSERT_GT(task_vector.size(), 1);
ASSERT_LT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size());
TaskTableTestHelper::TestDeleteKeysFromTable(job_id, client, task_vector, false);
TestDeleteKeysFromTable(job_id, client, task_vector, false);
AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
ASSERT_GT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size());
TaskTableTestHelper::TestDeleteKeysFromTable(job_id, client, task_vector, true);
TestDeleteKeysFromTable(job_id, client, task_vector, true);
test->Start();
ASSERT_GT(test->NumCallbacks(),
@ -976,7 +841,81 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeAll) {
TestSetSubscribeAll(job_id_, client_);
}
TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableSubscribeId);
void TestTableSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
size_t num_modifications = 3;
// Add a table entry.
TaskID task_id1 = RandomTaskId();
// Add a table entry at a second key.
TaskID task_id2 = RandomTaskId();
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id2, num_modifications](gcs::RedisGcsClient *client,
const TaskID &id,
const TaskTableData &data) {
// Check that we only get notifications for the requested key.
ASSERT_EQ(id, task_id2);
// Check that we get notifications in the same order as the writes.
ASSERT_TRUE(
TaskTableDataEqual(data, *CreateTaskTableData(task_id2, test->NumCallbacks())));
test->IncrementNumCallbacks();
if (test->NumCallbacks() == num_modifications) {
test->Stop();
}
};
// The failure callback should be called once since both keys start as empty.
bool failure_notification_received = false;
auto failure_callback = [task_id2, &failure_notification_received](
gcs::RedisGcsClient *client, const TaskID &id) {
ASSERT_EQ(id, task_id2);
// The failure notification should be the first notification received.
ASSERT_EQ(test->NumCallbacks(), 0);
failure_notification_received = true;
};
// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, task_id1, task_id2,
num_modifications](gcs::RedisGcsClient *client) {
// Request notifications for one of the keys.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id2, client->client_table().GetLocalClientId(), nullptr));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
for (uint64_t i = 0; i < num_modifications; i++) {
auto data = CreateTaskTableData(task_id1, i);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr));
}
for (uint64_t i = 0; i < num_modifications; i++) {
auto data = CreateTaskTableData(task_id2, i);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr));
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
failure_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that the failure callback was called since the key was initially
// empty.
ASSERT_TRUE(failure_notification_received);
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), num_modifications);
}
TEST_MACRO(TestGcsWithAsio, TestTableSubscribeId);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId);
#endif
TEST_F(TestGcsWithAsio, TestLogSubscribeId) {
test = this;
@ -1059,7 +998,77 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeId) {
TestSetSubscribeId(job_id_, client_);
}
TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableSubscribeCancel);
void TestTableSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::RedisGcsClient> client) {
// Add a table entry.
const auto task_id = RandomTaskId();
const int num_modifications = 3;
const auto data = CreateTaskTableData(task_id, 0);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
// The failure callback should not be called since all keys are non-empty
// when notifications are requested.
auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) {
RAY_CHECK(false);
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) {
ASSERT_EQ(id, task_id);
// Check that we only get notifications for the first and last writes,
// since notifications are canceled in between.
if (test->NumCallbacks() == 0) {
ASSERT_TRUE(TaskTableDataEqual(data, *CreateTaskTableData(task_id, 0)));
} else {
ASSERT_TRUE(
TaskTableDataEqual(data, *CreateTaskTableData(task_id, num_modifications - 1)));
}
test->IncrementNumCallbacks();
if (test->NumCallbacks() == num_modifications - 1) {
test->Stop();
}
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto subscribe_callback = [job_id, task_id](gcs::RedisGcsClient *client) {
// Request notifications, then cancel immediately. We should receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id, client->client_table().GetLocalClientId(), nullptr));
RAY_CHECK_OK(client->raylet_task_table().CancelNotifications(
job_id, task_id, client->client_table().GetLocalClientId(), nullptr));
// Write to the key. Since we canceled notifications, we should not receive
// a notification for these writes.
for (uint64_t i = 1; i < num_modifications; i++) {
auto data = CreateTaskTableData(task_id, i);
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
}
// Request notifications again. We should receive a notification for the
// current value at the key.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id, client->client_table().GetLocalClientId(), nullptr));
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
failure_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received a notification callback for the first and least
// writes to the key, since notifications are canceled in between.
ASSERT_EQ(test->NumCallbacks(), 2);
}
TEST_MACRO(TestGcsWithAsio, TestTableSubscribeCancel);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel);
#endif
TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) {
test = this;
@ -1422,7 +1431,7 @@ TEST_F(TestGcsWithAsio, TestHashTable) {
TestHashTable(job_id_, client_);
}
#undef TEST_TASK_TABLE_MACRO
#undef TEST_MACRO
} // namespace gcs
} // namespace ray

View file

@ -1,3 +1,4 @@
#include "ray/gcs/redis_job_info_accessor.h"
#include <memory>
#include "gtest/gtest.h"
#include "ray/gcs/pb_util.h"

View file

@ -1,8 +1,8 @@
#include "lineage_cache.h"
#include <sstream>
#include "ray/gcs/redis_gcs_client.h"
#include "ray/stats/stats.h"
#include <sstream>
namespace ray {
namespace raylet {
@ -152,15 +152,16 @@ const std::unordered_set<TaskID> &Lineage::GetChildren(const TaskID &task_id) co
}
}
LineageCache::LineageCache(std::shared_ptr<gcs::RedisGcsClient> gcs_client,
LineageCache::LineageCache(const ClientID &client_id,
gcs::TableInterface<TaskID, TaskTableData> &task_storage,
gcs::PubsubInterface<TaskID> &task_pubsub,
uint64_t max_lineage_size)
: gcs_client_(gcs_client) {}
: client_id_(client_id), task_storage_(task_storage), task_pubsub_(task_pubsub) {}
/// A helper function to add some uncommitted lineage to the local cache.
void LineageCache::AddUncommittedLineage(const TaskID &task_id,
const Lineage &uncommitted_lineage) {
RAY_LOG(DEBUG) << "Adding uncommitted task " << task_id << " on "
<< gcs_client_->client_table().GetLocalClientId();
RAY_LOG(DEBUG) << "Adding uncommitted task " << task_id << " on " << client_id_;
// If the entry is not found in the lineage to merge, then we stop since
// there is nothing to copy into the merged lineage.
auto entry = uncommitted_lineage.GetEntry(task_id);
@ -191,8 +192,7 @@ bool LineageCache::CommitTask(const Task &task) {
return true;
}
const TaskID task_id = task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "Committing task " << task_id << " on "
<< gcs_client_->client_table().GetLocalClientId();
RAY_LOG(DEBUG) << "Committing task " << task_id << " on " << client_id_;
if (lineage_.SetEntry(task, GcsStatus::UNCOMMITTED) ||
lineage_.GetEntry(task_id)->GetStatus() == GcsStatus::UNCOMMITTED) {
@ -275,17 +275,17 @@ void LineageCache::FlushTask(const TaskID &task_id) {
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() < GcsStatus::COMMITTING);
auto task_callback = [this, task_id](Status status) {
RAY_CHECK(status.ok());
HandleEntryCommitted(task_id);
};
gcs::raylet::TaskTable::WriteCallback task_callback =
[this](ray::gcs::RedisGcsClient *client, const TaskID &id,
const TaskTableData &data) { HandleEntryCommitted(id); };
auto task = lineage_.GetEntry(task_id);
auto task_data = std::make_shared<TaskTableData>();
task_data->mutable_task()->mutable_task_spec()->CopyFrom(
task->TaskData().GetTaskSpecification().GetMessage());
task_data->mutable_task()->mutable_task_execution_spec()->CopyFrom(
task->TaskData().GetTaskExecutionSpec().GetMessage());
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(task_data, task_callback));
RAY_CHECK_OK(task_storage_.Add(JobID(task->TaskData().GetTaskSpecification().JobId()),
task_id, task_data, task_callback));
// We successfully wrote the task, so mark it as committing.
// TODO(swang): Use a batched interface and write with all object entries.
@ -296,12 +296,10 @@ bool LineageCache::SubscribeTask(const TaskID &task_id) {
auto inserted = subscribed_tasks_.insert(task_id);
bool unsubscribed = inserted.second;
if (unsubscribed) {
auto subscribe = [this](const TaskID &task_id, const TaskTableData) {
HandleEntryCommitted(task_id);
};
// Subscribe to the task.
RAY_CHECK_OK(gcs_client_->Tasks().AsyncSubscribe(task_id, subscribe,
/*done*/ nullptr));
// Request notifications for the task if we haven't already requested
// notifications for it.
RAY_CHECK_OK(task_pubsub_.RequestNotifications(JobID::Nil(), task_id, client_id_,
/*done*/ nullptr));
}
// Return whether we were previously unsubscribed to this task and are now
// subscribed.
@ -312,8 +310,10 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
auto it = subscribed_tasks_.find(task_id);
bool subscribed = (it != subscribed_tasks_.end());
if (subscribed) {
// Cancel subscribe to the task.
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id, /*done*/ nullptr));
// Cancel notifications for the task if we previously requested
// notifications for it.
RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::Nil(), task_id, client_id_,
/*done*/ nullptr));
subscribed_tasks_.erase(it);
}
// Return whether we were previously subscribed to this task and are now
@ -339,8 +339,7 @@ void LineageCache::EvictTask(const TaskID &task_id) {
}
// Evict the task.
RAY_LOG(DEBUG) << "Evicting task " << task_id << " on "
<< gcs_client_->client_table().GetLocalClientId();
RAY_LOG(DEBUG) << "Evicting task " << task_id << " on " << client_id_;
lineage_.PopEntry(task_id);
// Try to evict the children of the evict task. These are the tasks that have
// a dependency on the evicted task.

View file

@ -12,7 +12,7 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/common/task/task.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/tables.h"
namespace ray {
@ -209,8 +209,9 @@ class LineageCache {
public:
/// Create a lineage cache for the given task storage system.
/// TODO(swang): Pass in the policy (interface?).
LineageCache(std::shared_ptr<gcs::RedisGcsClient> gcs_client,
uint64_t max_lineage_size);
LineageCache(const ClientID &client_id,
gcs::TableInterface<TaskID, TaskTableData> &task_storage,
gcs::PubsubInterface<TaskID> &task_pubsub, uint64_t max_lineage_size);
/// Asynchronously commit a task to the GCS.
///
@ -302,13 +303,19 @@ class LineageCache {
/// was successful (whether we were subscribed).
bool UnsubscribeTask(const TaskID &task_id);
/// A client connection to the GCS.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
/// The client ID, used to request notifications for specific tasks.
/// TODO(swang): Move the ClientID into the generic Table implementation.
ClientID client_id_;
/// The durable storage system for task information.
gcs::TableInterface<TaskID, TaskTableData> &task_storage_;
/// The pubsub storage system for task information. This can be used to
/// request notifications for the commit of a task entry.
gcs::PubsubInterface<TaskID> &task_pubsub_;
/// All tasks and objects that we are responsible for writing back to the
/// GCS, and the tasks and objects in their lineage.
Lineage lineage_;
/// The tasks that we've subscribed to.
/// We will receive a notification for these tasks on commit.
/// The tasks that we've subscribed to notifications for from the pubsub
/// storage system. We will receive a notification for these tasks on commit.
std::unordered_set<TaskID> subscribed_tasks_;
};

View file

@ -102,7 +102,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
gcs_client_->client_table().GetLocalClientId(),
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
gcs_client_->task_lease_table()),
lineage_cache_(gcs_client_, config.max_lineage_size),
lineage_cache_(gcs_client_->client_table().GetLocalClientId(),
gcs_client_->raylet_task_table(), gcs_client_->raylet_task_table(),
config.max_lineage_size),
actor_registry_(),
node_manager_server_("NodeManager", config.node_manager_port),
node_manager_service_(io_service, *this),
@ -138,6 +140,18 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
ray::Status NodeManager::RegisterGcs() {
object_manager_.RegisterGcs();
// Subscribe to task entry commits in the GCS. These notifications are
// forwarded to the lineage cache, which requests notifications about tasks
// that were executed remotely.
const auto task_committed_callback = [this](gcs::RedisGcsClient *client,
const TaskID &task_id,
const TaskTableData &task_data) {
lineage_cache_.HandleEntryCommitted(task_id);
};
RAY_RETURN_NOT_OK(gcs_client_->raylet_task_table().Subscribe(
JobID::Nil(), gcs_client_->client_table().GetLocalClientId(),
task_committed_callback, nullptr, nullptr));
const auto task_lease_notification_callback = [this](gcs::RedisGcsClient *client,
const TaskID &task_id,
const TaskLeaseData &task_lease) {
@ -970,7 +984,7 @@ void NodeManager::ProcessClientMessage(
for (const auto &object_id : object_ids) {
creating_task_ids.push_back(object_id.TaskId());
}
RAY_CHECK_OK(gcs_client_->Tasks().AsyncDelete(creating_task_ids, nullptr));
gcs_client_->raylet_task_table().Delete(JobID::Nil(), creating_task_ids);
}
} break;
case protocol::MessageType::PrepareActorCheckpointRequest: {
@ -2423,26 +2437,27 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
auto parent_task_id = task_spec.ParentTaskId();
int port = worker.Port();
RAY_CHECK_OK(
gcs_client_->Tasks().AsyncGet(
parent_task_id,
/*callback=*/
[this, task_spec, resumed_from_checkpoint, port, parent_task_id](
Status status, const boost::optional<TaskTableData> &parent_task_data) {
RAY_CHECK(status.ok());
if (parent_task_data) {
// The task was in the GCS task table. Use the stored task spec to
// get the parent actor id.
Task parent_task(parent_task_data->task());
ActorID parent_actor_id = ActorID::Nil();
if (parent_task.GetTaskSpecification().IsActorCreationTask()) {
parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId();
} else if (parent_task.GetTaskSpecification().IsActorTask()) {
parent_actor_id = parent_task.GetTaskSpecification().ActorId();
}
FinishAssignedActorCreationTask(parent_actor_id, task_spec,
resumed_from_checkpoint, port);
return;
gcs_client_->raylet_task_table().Lookup(
JobID::Nil(), parent_task_id,
/*success_callback=*/
[this, task_spec, resumed_from_checkpoint, port](
ray::gcs::RedisGcsClient *client, const TaskID &parent_task_id,
const TaskTableData &parent_task_data) {
// The task was in the GCS task table. Use the stored task spec to
// get the parent actor id.
Task parent_task(parent_task_data.task());
ActorID parent_actor_id = ActorID::Nil();
if (parent_task.GetTaskSpecification().IsActorCreationTask()) {
parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId();
} else if (parent_task.GetTaskSpecification().IsActorTask()) {
parent_actor_id = parent_task.GetTaskSpecification().ActorId();
}
FinishAssignedActorCreationTask(parent_actor_id, task_spec,
resumed_from_checkpoint, port);
},
/*failure_callback=*/
[this, task_spec, resumed_from_checkpoint, port](
ray::gcs::RedisGcsClient *client, const TaskID &parent_task_id) {
// The parent task was not in the GCS task table. It should most likely be
// in the lineage cache.
ActorID parent_actor_id = ActorID::Nil();
@ -2559,18 +2574,18 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
void NodeManager::HandleTaskReconstruction(const TaskID &task_id,
const ObjectID &required_object_id) {
// Retrieve the task spec in order to re-execute the task.
RAY_CHECK_OK(gcs_client_->Tasks().AsyncGet(
task_id,
/*callback=*/
[this, required_object_id, task_id](
Status status, const boost::optional<TaskTableData> &task_data) {
RAY_CHECK(status.ok());
if (task_data) {
// The task was in the GCS task table. Use the stored task spec to
// re-execute the task.
ResubmitTask(Task(task_data->task()), required_object_id);
return;
}
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
JobID::Nil(), task_id,
/*success_callback=*/
[this, required_object_id](ray::gcs::RedisGcsClient *client, const TaskID &task_id,
const TaskTableData &task_data) {
// The task was in the GCS task table. Use the stored task spec to
// re-execute the task.
ResubmitTask(Task(task_data.task()), required_object_id);
},
/*failure_callback=*/
[this, required_object_id](ray::gcs::RedisGcsClient *client,
const TaskID &task_id) {
// The task was not in the GCS task table. It must therefore be in the
// lineage cache.
if (lineage_cache_.ContainsTask(task_id)) {