Remove CoreWorkerTaskExecutionInterface (#6009)

Edward Oakes 2019-10-25 16:33:44 -07:00 committed by GitHub
parent e6141a0b8b
commit d4055d70e3
14 changed files with 375 additions and 509 deletions
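The change collapses the task-execution machinery into CoreWorker itself: frontends no longer go through a nested CoreWorkerTaskExecutionInterface, they call the worker directly. A minimal sketch of the call-site change (C++; "core_worker" is assumed to be an already-constructed ray::CoreWorker owned by the language frontend, as in the Cython, JNI, and mock-worker hunks below):

// Before this commit, the task loop lived on a nested interface:
//   core_worker.Execution().Run();
// After this commit, CoreWorker owns the task-execution loop directly.
void RunWorkerTaskLoop(ray::CoreWorker &core_worker) {
  core_worker.StartExecutingTasks();  // blocks until the loop is stopped
}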


@@ -665,7 +665,7 @@ cdef class CoreWorker:
def run_task_loop(self):
with nogil:
self.core_worker.get().Execution().Run()
self.core_worker.get().StartExecutingTasks()
def get_current_task_id(self):
return TaskID(self.core_worker.get().GetCurrentTaskId().Binary())


@@ -35,11 +35,6 @@ from ray.includes.libraylet cimport CRayletClient
ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \
ResourceMappingType
cdef extern from "ray/core_worker/task_execution.h" namespace "ray" nogil:
cdef cppclass CTaskExecutionInterface "CoreWorkerTaskExecutionInterface":
void Run()
void Stop()
cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfiler "ray::worker::Profiler":
void Start()
@@ -95,7 +90,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
CObjectInterface &Objects()
CTaskExecutionInterface &Execution()
void StartExecutingTasks()
CRayStatus SubmitTask(
const CRayFunction &function, const c_vector[CTaskArg] &args,


@@ -35,7 +35,8 @@ cdef class ProfileEvent:
elif self.extra_data is not None:
extra_data = self.extra_data
self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii") if extra_data else b"{}")
self.inner.get().SetExtraData(
json.dumps(extra_data).encode("ascii") if extra_data else b"{}")
# Deleting the CProfileEvent will add it to a queue to be pushed to
# the driver.


@@ -2,6 +2,7 @@
#include "ray/common/ray_config.h"
#include "ray/common/task/task_util.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/transport/raylet_transport.h"
namespace {
@@ -44,16 +45,20 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id, const gcs::GcsClientOptions &gcs_options,
const std::string &log_dir, const std::string &node_ip_address,
const CoreWorkerTaskExecutionInterface::TaskExecutionCallback
&task_execution_callback,
const TaskExecutionCallback &task_execution_callback,
std::function<Status()> check_signals, bool use_memory_store)
: worker_type_(worker_type),
language_(language),
raylet_socket_(raylet_socket),
log_dir_(log_dir),
worker_context_(worker_type, job_id),
io_work_(io_service_),
heartbeat_timer_(io_service_) {
heartbeat_timer_(io_service_),
worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */),
gcs_client_(gcs_options),
object_interface_(worker_context_, raylet_client_, store_socket, use_memory_store,
check_signals),
task_execution_service_work_(task_execution_service_),
task_execution_callback_(task_execution_callback) {
// Initialize logging if log_dir is passed. Otherwise, it must be initialized
// and cleaned up by the caller.
if (log_dir_ != "") {
@@ -65,37 +70,41 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
}
// Initialize gcs client.
gcs_client_ =
std::unique_ptr<gcs::RedisGcsClient>(new gcs::RedisGcsClient(gcs_options));
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RAY_CHECK_OK(gcs_client_.Connect(io_service_));
// Initialize profiler.
profiler_ = std::make_shared<worker::Profiler>(worker_context_, node_ip_address,
io_service_, gcs_client_);
object_interface_ = std::unique_ptr<CoreWorkerObjectInterface>(
new CoreWorkerObjectInterface(worker_context_, raylet_client_, store_socket,
use_memory_store, check_signals));
// Initialize task execution.
int rpc_server_port = 0;
if (worker_type_ == WorkerType::WORKER) {
task_execution_interface_ = std::unique_ptr<CoreWorkerTaskExecutionInterface>(
new CoreWorkerTaskExecutionInterface(*this, worker_context_, raylet_client_,
*object_interface_, profiler_,
task_execution_callback));
rpc_server_port = task_execution_interface_->worker_server_.GetPort();
RAY_CHECK(task_execution_callback_ != nullptr);
// Initialize task receivers.
auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
raylet_task_receiver_ =
std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
worker_context_, raylet_client_, object_interface_, task_execution_service_,
worker_server_, execute_task));
direct_actor_task_receiver_ = std::unique_ptr<CoreWorkerDirectActorTaskReceiver>(
new CoreWorkerDirectActorTaskReceiver(worker_context_, object_interface_,
task_execution_service_, worker_server_,
execute_task));
}
// Start RPC server after all the task receivers are properly initialized.
worker_server_.Run();
// Initialize raylet client.
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this can be changed later
// so that the worker (java/python etc.) can retrieve and handle the error
// instead of crashing.
raylet_client_ = std::unique_ptr<RayletClient>(new RayletClient(
raylet_socket_, WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()),
raylet_socket, WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()),
(worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentJobID(),
language_, rpc_server_port));
language_, worker_server_.GetPort()));
// Set timer to periodically send heartbeats containing active object IDs to the raylet.
// If the heartbeat timeout is < 0, the heartbeats are disabled.
@@ -105,7 +114,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
heartbeat_timer_.async_wait(boost::bind(&CoreWorker::ReportActiveObjectIDs, this));
}
io_thread_ = std::thread(&CoreWorker::StartIOService, this);
io_thread_ = std::thread(&CoreWorker::RunIOService, this);
// Create an entry for the driver task in the task table. This task is
// added immediately with status RUNNING. This allows us to push errors
@@ -125,14 +134,55 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
std::shared_ptr<gcs::TaskTableData> data = std::make_shared<gcs::TaskTableData>();
data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage());
RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr));
RAY_CHECK_OK(gcs_client_.raylet_task_table().Add(job_id, task_id, data, nullptr));
SetCurrentTaskId(task_id);
}
direct_actor_submitter_ = std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(
io_service_,
object_interface_->CreateStoreProvider(StoreProviderType::MEMORY)));
io_service_, object_interface_.CreateStoreProvider(StoreProviderType::MEMORY)));
}
CoreWorker::~CoreWorker() {
io_service_.stop();
io_thread_.join();
if (worker_type_ == WorkerType::WORKER) {
task_execution_service_.stop();
}
if (log_dir_ != "") {
RayLog::ShutDownRayLog();
}
}
void CoreWorker::Disconnect() {
io_service_.stop();
gcs_client_.Disconnect();
if (raylet_client_) {
RAY_IGNORE_EXPR(raylet_client_->Disconnect());
}
}
void CoreWorker::RunIOService() {
// Block SIGINT and SIGTERM so they will be handled by the main thread.
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
pthread_sigmask(SIG_BLOCK, &mask, NULL);
io_service_.run();
}
void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
main_thread_task_id_ = task_id;
// Clear all actor handles at the end of each non-actor task.
if (actor_id_.IsNil() && task_id.IsNil()) {
for (const auto &handle : actor_handles_) {
RAY_CHECK_OK(gcs_client_.Actors().AsyncUnsubscribe(handle.first, nullptr));
}
actor_handles_.clear();
}
}
void CoreWorker::AddActiveObjectID(const ObjectID &object_id) {
@@ -175,56 +225,6 @@ void CoreWorker::ReportActiveObjectIDs() {
active_object_ids_updated_ = false;
}
CoreWorker::~CoreWorker() {
io_service_.stop();
io_thread_.join();
if (task_execution_interface_) {
task_execution_interface_->Stop();
}
if (log_dir_ != "") {
RayLog::ShutDownRayLog();
}
}
void CoreWorker::Disconnect() {
io_service_.stop();
if (gcs_client_) {
gcs_client_->Disconnect();
}
if (raylet_client_) {
RAY_IGNORE_EXPR(raylet_client_->Disconnect());
}
}
void CoreWorker::StartIOService() {
// Block SIGINT and SIGTERM so they will be handled by the main thread.
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
pthread_sigmask(SIG_BLOCK, &mask, NULL);
io_service_.run();
}
std::unique_ptr<worker::ProfileEvent> CoreWorker::CreateProfileEvent(
const std::string &event_type) {
return std::unique_ptr<worker::ProfileEvent>(
new worker::ProfileEvent(profiler_, event_type));
}
void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
main_thread_task_id_ = task_id;
// Clear all actor handles at the end of each non-actor task.
if (actor_id_.IsNil() && task_id.IsNil()) {
for (const auto &handle : actor_handles_) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr));
}
actor_handles_.clear();
}
}
TaskID CoreWorker::GetCallerId() const {
TaskID caller_id;
ActorID actor_id = GetActorId();
@@ -236,54 +236,6 @@ TaskID CoreWorker::GetCallerId() const {
return caller_id;
}
bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
const auto &actor_id = actor_handle->GetActorID();
auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second;
if (inserted) {
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
if (it->second->IsDirectCallActor()) {
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
// TODO: Remove the check for direct calls. We do not reset for the
// raylet codepath because it tries to replay all tasks since the
// last actor checkpoint.
it->second->Reset();
}
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id, nullptr));
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors.
}
direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data);
RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
<< ", ip address: " << actor_data.ip_address()
<< ", port: " << actor_data.port();
};
RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe(
actor_id, actor_notification_callback, nullptr));
}
return inserted;
}
Status CoreWorker::GetActorHandle(const ActorID &actor_id,
ActorHandle **actor_handle) const {
auto it = actor_handles_.find(actor_id);
if (it == actor_handles_.end()) {
return Status::Invalid("Handle for actor does not exist");
}
*actor_handle = it->second.get();
return Status::OK();
}
Status CoreWorker::SubmitTask(const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
@@ -388,12 +340,156 @@ Status CoreWorker::SerializeActorHandle(const ActorID &actor_id,
return status;
}
const ResourceMappingType CoreWorker::GetResourceIDs() const {
if (worker_type_ == WorkerType::DRIVER) {
ResourceMappingType empty;
return empty;
bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
const auto &actor_id = actor_handle->GetActorID();
auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second;
if (inserted) {
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
if (it->second->IsDirectCallActor()) {
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
// TODO: Remove the check for direct calls. We do not reset for the
// raylet codepath because it tries to replay all tasks since the
// last actor checkpoint.
it->second->Reset();
}
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
RAY_CHECK_OK(gcs_client_.Actors().AsyncUnsubscribe(actor_id, nullptr));
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors.
}
direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data);
RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
<< ", ip address: " << actor_data.ip_address()
<< ", port: " << actor_data.port();
};
RAY_CHECK_OK(gcs_client_.Actors().AsyncSubscribe(
actor_id, actor_notification_callback, nullptr));
}
return task_execution_interface_->GetResourceIDs();
return inserted;
}
Status CoreWorker::GetActorHandle(const ActorID &actor_id,
ActorHandle **actor_handle) const {
auto it = actor_handles_.find(actor_id);
if (it == actor_handles_.end()) {
return Status::Invalid("Handle for actor does not exist");
}
*actor_handle = it->second.get();
return Status::OK();
}
std::unique_ptr<worker::ProfileEvent> CoreWorker::CreateProfileEvent(
const std::string &event_type) {
return std::unique_ptr<worker::ProfileEvent>(
new worker::ProfileEvent(profiler_, event_type));
}
void CoreWorker::StartExecutingTasks() {
idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle"));
task_execution_service_.run();
}
Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results) {
idle_profile_event_.reset();
RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId();
resource_ids_ = resource_ids;
worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId());
RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};
std::vector<std::shared_ptr<RayObject>> args;
std::vector<ObjectID> arg_reference_ids;
RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids));
std::vector<ObjectID> return_ids;
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
return_ids.push_back(task_spec.ReturnId(i));
}
Status status;
TaskType task_type = TaskType::NORMAL_TASK;
if (task_spec.IsActorCreationTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
task_type = TaskType::ACTOR_CREATION_TASK;
SetActorId(task_spec.ActorCreationId());
} else if (task_spec.IsActorTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
task_type = TaskType::ACTOR_TASK;
}
status = task_execution_callback_(task_type, func,
task_spec.GetRequiredResources().GetResourceMap(),
args, arg_reference_ids, return_ids, results);
SetCurrentTaskId(TaskID::Nil());
worker_context_.ResetCurrentTask(task_spec);
// TODO(zhijunfu):
// 1. Check and handle failure.
// 2. Save or load checkpoint.
idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle"));
return status;
}
Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids) {
auto num_args = task.NumArgs();
args->resize(num_args);
arg_reference_ids->resize(num_args);
std::vector<ObjectID> object_ids_to_fetch;
std::vector<int> indices;
for (size_t i = 0; i < task.NumArgs(); ++i) {
int count = task.ArgIdCount(i);
if (count > 0) {
// pass by reference.
RAY_CHECK(count == 1);
object_ids_to_fetch.push_back(task.ArgId(i, 0));
indices.push_back(i);
arg_reference_ids->at(i) = task.ArgId(i, 0);
} else {
// pass by value.
std::shared_ptr<LocalMemoryBuffer> data = nullptr;
if (task.ArgDataSize(i)) {
data = std::make_shared<LocalMemoryBuffer>(const_cast<uint8_t *>(task.ArgData(i)),
task.ArgDataSize(i));
}
std::shared_ptr<LocalMemoryBuffer> metadata = nullptr;
if (task.ArgMetadataSize(i)) {
metadata = std::make_shared<LocalMemoryBuffer>(
const_cast<uint8_t *>(task.ArgMetadata(i)), task.ArgMetadataSize(i));
}
args->at(i) = std::make_shared<RayObject>(data, metadata);
arg_reference_ids->at(i) = ObjectID::Nil();
}
}
std::vector<std::shared_ptr<RayObject>> results;
auto status = object_interface_.Get(object_ids_to_fetch, -1, &results);
if (status.ok()) {
for (size_t i = 0; i < results.size(); i++) {
args->at(indices[i]) = results[i];
}
}
return status;
}
} // namespace ray
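After this refactor CoreWorker owns two boost::asio event loops: io_service_, pumped on the background io_thread_ via RunIOService(), and task_execution_service_, pumped by whichever frontend thread calls StartExecutingTasks(); the destructor stops the first, joins the thread, and (for workers) stops the second. A self-contained sketch of that two-loop pattern, with illustrative names rather than Ray code:

#include <boost/asio.hpp>
#include <iostream>
#include <thread>

// Illustrative two-loop layout mirroring CoreWorker above: one io_service for
// background IO, one for task execution, each kept alive by a work guard so
// run() does not return while idle.
class TwoLoopWorker {
 public:
  TwoLoopWorker()
      : io_work_(io_service_),
        task_work_(task_service_),
        io_thread_([this] { io_service_.run(); }) {}

  ~TwoLoopWorker() {
    // Mirrors CoreWorker::~CoreWorker(): stop the IO loop, join its thread,
    // then stop the task loop.
    io_service_.stop();
    io_thread_.join();
    task_service_.stop();
  }

  // Mirrors StartExecutingTasks(): blocks the calling thread and processes
  // whatever handlers the task receivers post.
  void StartExecutingTasks() { task_service_.run(); }

  // A task receiver would post work like this from the RPC / IO side.
  void PostTask() {
    task_service_.post([] { std::cout << "executing task\n"; });
  }

 private:
  boost::asio::io_service io_service_;
  boost::asio::io_service task_service_;
  boost::asio::io_service::work io_work_;
  boost::asio::io_service::work task_work_;
  std::thread io_thread_;
};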


@@ -7,10 +7,12 @@
#include "ray/core_worker/context.h"
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/profiling.h"
#include "ray/core_worker/task_execution.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/raylet_transport.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/raylet/raylet_client.h"
#include "ray/rpc/worker/worker_client.h"
#include "ray/rpc/worker/worker_server.h"
namespace ray {
@@ -18,6 +20,16 @@ namespace ray {
/// of the worker. This class is supposed to be used to implement app-language (Java,
/// Python, etc) workers.
class CoreWorker {
// Callback that must be implemented and provided by the language-specific worker
// frontend to execute tasks and return their results.
using TaskExecutionCallback = std::function<Status(
TaskType task_type, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids,
std::vector<std::shared_ptr<RayObject>> *results)>;
public:
/// Construct a CoreWorker instance.
///
@@ -45,8 +57,7 @@ class CoreWorker {
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id, const gcs::GcsClientOptions &gcs_options,
const std::string &log_dir, const std::string &node_ip_address,
const CoreWorkerTaskExecutionInterface::TaskExecutionCallback
&task_execution_callback,
const TaskExecutionCallback &task_execution_callback,
std::function<Status()> check_signals = nullptr,
bool use_memory_store = true);
@@ -54,32 +65,15 @@
void Disconnect();
/// Type of this worker.
WorkerType GetWorkerType() const { return worker_type_; }
/// Language of this worker.
Language GetLanguage() const { return language_; }
WorkerContext &GetWorkerContext() { return worker_context_; }
RayletClient &GetRayletClient() { return *raylet_client_; }
/// Return the `CoreWorkerObjectInterface` that contains methods related to object
/// store.
CoreWorkerObjectInterface &Objects() { return *object_interface_; }
/// Create a profile event with a reference to the core worker's profiler.
std::unique_ptr<worker::ProfileEvent> CreateProfileEvent(const std::string &event_type);
/// Return the `CoreWorkerTaskExecutionInterface` that contains methods related to
/// task execution.
CoreWorkerTaskExecutionInterface &Execution() {
RAY_CHECK(task_execution_interface_ != nullptr);
return *task_execution_interface_;
}
// Get the resource IDs available to this worker (as assigned by the raylet).
const ResourceMappingType GetResourceIDs() const;
CoreWorkerObjectInterface &Objects() { return object_interface_; }
const TaskID &GetCurrentTaskId() const { return worker_context_.GetCurrentTaskID(); }
@@ -92,7 +86,15 @@ class CoreWorker {
actor_id_ = actor_id;
}
const ActorID &GetActorId() const { return actor_id_; }
// Add this object ID to the set of active object IDs that is sent to the raylet
// in the heartbeat message.
void AddActiveObjectID(const ObjectID &object_id);
// Remove this object ID from the set of active object IDs that is sent to the raylet
// in the heartbeat message.
void RemoveActiveObjectID(const ObjectID &object_id);
/* Public methods related to task submission. */
/// Get the caller ID used to submit tasks from this worker to an actor.
///
@@ -102,8 +104,6 @@
/// of the bytes zeroed out.
TaskID GetCallerId() const;
/* Methods related to task submission. */
/// Submit a normal task.
///
/// \param[in] function The remote function to execute.
@@ -164,15 +164,29 @@
/// \return Status::Invalid if we don't have the specified handle.
Status SerializeActorHandle(const ActorID &actor_id, std::string *output) const;
// Add this object ID to the set of active object IDs that is sent to the raylet
// in the heartbeat message.
void AddActiveObjectID(const ObjectID &object_id);
/* Public methods related to task execution. Should not be used by driver processes. */
// Remove this object ID from the set of active object IDs that is sent to the raylet
// in the heartbeat message.
void RemoveActiveObjectID(const ObjectID &object_id);
const ActorID &GetActorId() const { return actor_id_; }
// Get the resource IDs available to this worker (as assigned by the raylet).
const ResourceMappingType GetResourceIDs() const { return resource_ids_; }
/// Create a profile event with a reference to the core worker's profiler.
std::unique_ptr<worker::ProfileEvent> CreateProfileEvent(const std::string &event_type);
/// Start receiving and executing tasks.
/// \return void.
void StartExecutingTasks();
private:
/// Run the io_service_ event loop. This should be called in a background thread.
void RunIOService();
/// Send the list of active object IDs to the raylet.
void ReportActiveObjectIDs();
/* Private methods related to task submission. */
/// Give this worker a handle to an actor.
///
/// This handle will remain as long as the current actor or task is
@@ -193,50 +207,76 @@
/// \return Status::Invalid if we don't have this actor handle.
Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const;
void StartIOService();
/* Private methods related to task execution. Should not be used by driver processes. */
void ReportActiveObjectIDs();
/// Execute a task.
///
/// \param spec[in] Task specification.
/// \param spec[in] Resource IDs of resources assigned to this worker.
/// \param results[out] Results for task execution.
/// \return Status.
Status ExecuteTask(const TaskSpecification &task_spec,
const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results);
/// Build arguments for task executor. This would loop through all the arguments
/// in task spec, and for each of them that's passed by reference (ObjectID),
/// fetch its content from the store; for arguments that are passed by value,
/// just copy their content.
///
/// \param spec[in] Task specification.
/// \param args[out] Argument data as RayObjects.
/// \param args[out] ObjectIDs corresponding to each by reference argument. The length
/// of this vector will be the same as args, and by value arguments
/// will have ObjectID::Nil().
/// // TODO(edoakes): this is a bit of a hack that's necessary because
/// we have separate serialization paths for by-value and by-reference
/// arguments in Python. This should ideally be handled better there.
/// \return The arguments for passing to task executor.
Status BuildArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids);
/// Type of this worker (i.e., DRIVER or WORKER).
const WorkerType worker_type_;
/// Application language of this worker (i.e., PYTHON or JAVA).
const Language language_;
const std::string raylet_socket_;
/// Directory where log files are written.
const std::string log_dir_;
/// Shared state of the worker. Includes process-level and thread-level state.
/// TODO(edoakes): we should move process-level state into this class and make
/// this a ThreadContext.
WorkerContext worker_context_;
/// The ID of the current task being executed by the main thread. If there
/// are multiple threads, they will have a thread-local task ID stored in the
/// worker context.
TaskID main_thread_task_id_;
/// Our actor ID. If this is nil, then we execute only stateless tasks.
ActorID actor_id_;
/// Event loop where the IO events are handled. e.g. async GCS operations.
boost::asio::io_service io_service_;
/// Keeps the io_service_ alive.
boost::asio::io_service::work io_work_;
/// Timer used to periodically send heartbeat containing active object IDs to the
/// raylet.
boost::asio::steady_timer heartbeat_timer_;
// Thread that runs a boost::asio service to process IO events.
std::thread io_thread_;
/// RPC server used to receive tasks to execute.
rpc::GrpcServer worker_server_;
// Client to the GCS shared by core worker interfaces.
std::unique_ptr<gcs::RedisGcsClient> gcs_client_;
gcs::RedisGcsClient gcs_client_;
// Client to the raylet shared by core worker interfaces.
std::unique_ptr<RayletClient> raylet_client_;
// Interface to submit tasks directly to other actors.
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> direct_actor_submitter_;
// Interface for storing and retrieving shared objects.
std::unique_ptr<CoreWorkerObjectInterface> object_interface_;
// Profiler including a background thread that pushes profiling events to the GCS.
std::shared_ptr<worker::Profiler> profiler_;
/// Map from actor ID to a handle to that actor.
std::unordered_map<ActorID, std::unique_ptr<ActorHandle> > actor_handles_;
// Thread that runs a boost::asio service to process IO events.
std::thread io_thread_;
/// Set of object IDs that are in scope in the language worker.
std::unordered_set<ObjectID> active_object_ids_;
@@ -245,8 +285,48 @@ class CoreWorker {
/// last time it was sent to the raylet.
bool active_object_ids_updated_ = false;
/// Only available if it's not a driver.
std::unique_ptr<CoreWorkerTaskExecutionInterface> task_execution_interface_;
// Interface for storing and retrieving shared objects.
CoreWorkerObjectInterface object_interface_;
/* Fields related to task submission. */
// Interface to submit tasks directly to other actors.
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> direct_actor_submitter_;
/// Map from actor ID to a handle to that actor.
std::unordered_map<ActorID, std::unique_ptr<ActorHandle>> actor_handles_;
/* Fields related to task execution. */
/// Our actor ID. If this is nil, then we execute only stateless tasks.
ActorID actor_id_;
/// Event loop where tasks are processed.
boost::asio::io_service task_execution_service_;
/// The asio work to keep task_execution_service_ alive.
boost::asio::io_service::work task_execution_service_work_;
// Profiler including a background thread that pushes profiling events to the GCS.
std::shared_ptr<worker::Profiler> profiler_;
// Profile event for when the worker is idle. Should be reset when the worker
// enters and exits an idle period.
std::unique_ptr<worker::ProfileEvent> idle_profile_event_;
// Task execution callback.
TaskExecutionCallback task_execution_callback_;
/// A map from resource name to the resource IDs that are currently reserved
/// for this worker. Each pair consists of the resource ID and the fraction
/// of that resource allocated for this worker.
ResourceMappingType resource_ids_;
// Interface that receives tasks from the raylet.
std::unique_ptr<CoreWorkerRayletTaskReceiver> raylet_task_receiver_;
// Interface that receives tasks from direct actor calls.
std::unique_ptr<CoreWorkerDirectActorTaskReceiver> direct_actor_task_receiver_;
friend class CoreWorkerTest;
};
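With the interface gone, the TaskExecutionCallback declared at the top of this class is what a language frontend hands to the CoreWorker constructor (mock_worker.cc further down binds a member function for this). A hedged sketch of a minimal callback matching that signature; the ray:: qualification and surrounding setup are assumptions, and a real frontend would run the user function and fill the results:

auto task_execution_callback =
    [](ray::TaskType task_type, const ray::RayFunction &ray_function,
       const std::unordered_map<std::string, double> &required_resources,
       const std::vector<std::shared_ptr<ray::RayObject>> &args,
       const std::vector<ray::ObjectID> &arg_reference_ids,
       const std::vector<ray::ObjectID> &return_ids,
       std::vector<std::shared_ptr<ray::RayObject>> *results) -> ray::Status {
      // A real frontend would execute the function named by ray_function here
      // and append one RayObject to *results per entry in return_ids; this
      // placeholder just reports success.
      return ray::Status::OK();
    };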


@@ -95,7 +95,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecut
local_env = env;
local_java_task_executor = javaTaskExecutor;
auto core_worker = reinterpret_cast<ray::CoreWorker *>(nativeCoreWorkerPointer);
core_worker->Execution().Run();
core_worker->StartExecutingTasks();
local_env = nullptr;
local_java_task_executor = nullptr;
}


@@ -14,8 +14,7 @@ ProfileEvent::ProfileEvent(const std::shared_ptr<Profiler> profiler,
}
Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_address,
boost::asio::io_service &io_service,
std::unique_ptr<gcs::RedisGcsClient> &gcs_client)
boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client)
: io_service_(io_service),
timer_(io_service_, boost::asio::chrono::seconds(1)),
gcs_client_(gcs_client) {
@@ -36,7 +35,7 @@ void Profiler::FlushEvents() {
if (rpc_profile_data_.profile_events_size() != 0) {
// TODO(edoakes): this should be migrated to use the new GCS client interface
// instead of the raw table interface once it's ready.
if (!gcs_client_->profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) {
if (!gcs_client_.profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) {
RAY_LOG(WARNING) << "Failed to push profile events to GCS.";
} else {
RAY_LOG(DEBUG) << "Pushed " << rpc_profile_data_.profile_events_size()


@@ -14,8 +14,7 @@ namespace worker {
class Profiler {
public:
Profiler(WorkerContext &worker_context, const std::string &node_ip_address,
boost::asio::io_service &io_service,
std::unique_ptr<gcs::RedisGcsClient> &gcs_client);
boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client);
// Add an event to the queue to be flushed periodically.
void AddEvent(const rpc::ProfileTableData::ProfileEvent &event);
@@ -34,7 +33,7 @@ class Profiler {
// until they are flushed.
rpc::ProfileTableData rpc_profile_data_ GUARDED_BY(mu_);
std::unique_ptr<gcs::RedisGcsClient> &gcs_client_;
gcs::RedisGcsClient &gcs_client_;
absl::Mutex mu_;
};


@@ -1,150 +0,0 @@
#include "ray/core_worker/task_execution.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/raylet_transport.h"
namespace ray {
CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(
CoreWorker &core_worker, WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface,
const std::shared_ptr<worker::Profiler> profiler,
const TaskExecutionCallback &task_execution_callback)
: core_worker_(core_worker),
worker_context_(worker_context),
object_interface_(object_interface),
profiler_(profiler),
task_execution_callback_(task_execution_callback),
worker_server_("Worker", 0 /* let grpc choose port */),
main_service_(std::make_shared<boost::asio::io_service>()),
main_work_(*main_service_) {
RAY_CHECK(task_execution_callback_ != nullptr);
auto func =
std::bind(&CoreWorkerTaskExecutionInterface::ExecuteTask, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
task_receivers_.emplace(
TaskTransportType::RAYLET,
std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
worker_context_, raylet_client, object_interface_, *main_service_,
worker_server_, func)));
task_receivers_.emplace(
TaskTransportType::DIRECT_ACTOR,
std::unique_ptr<CoreWorkerDirectActorTaskReceiver>(
new CoreWorkerDirectActorTaskReceiver(worker_context_, object_interface_,
*main_service_, worker_server_, func)));
// Start RPC server after all the task receivers are properly initialized.
worker_server_.Run();
}
Status CoreWorkerTaskExecutionInterface::ExecuteTask(
const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results) {
idle_profile_event_.reset();
RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId();
resource_ids_ = resource_ids;
worker_context_.SetCurrentTask(task_spec);
core_worker_.SetCurrentTaskId(task_spec.TaskId());
RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};
std::vector<std::shared_ptr<RayObject>> args;
std::vector<ObjectID> arg_reference_ids;
RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids));
std::vector<ObjectID> return_ids;
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
return_ids.push_back(task_spec.ReturnId(i));
}
Status status;
TaskType task_type = TaskType::NORMAL_TASK;
if (task_spec.IsActorCreationTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
task_type = TaskType::ACTOR_CREATION_TASK;
core_worker_.SetActorId(task_spec.ActorCreationId());
} else if (task_spec.IsActorTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
task_type = TaskType::ACTOR_TASK;
}
status = task_execution_callback_(task_type, func,
task_spec.GetRequiredResources().GetResourceMap(),
args, arg_reference_ids, return_ids, results);
core_worker_.SetCurrentTaskId(TaskID::Nil());
worker_context_.ResetCurrentTask(task_spec);
// TODO(zhijunfu):
// 1. Check and handle failure.
// 2. Save or load checkpoint.
idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle"));
return status;
}
void CoreWorkerTaskExecutionInterface::Run() {
idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle"));
main_service_->run();
}
void CoreWorkerTaskExecutionInterface::Stop() {
// Stop main IO service.
std::shared_ptr<boost::asio::io_service> main_service = main_service_;
// Delay the execution of io_service::stop() to avoid deadlock if
// CoreWorkerTaskExecutionInterface::Stop is called inside a task.
idle_profile_event_.reset();
main_service_->post([main_service]() { main_service->stop(); });
}
Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor(
const TaskSpecification &task, std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids) {
auto num_args = task.NumArgs();
args->resize(num_args);
arg_reference_ids->resize(num_args);
std::vector<ObjectID> object_ids_to_fetch;
std::vector<int> indices;
for (size_t i = 0; i < task.NumArgs(); ++i) {
int count = task.ArgIdCount(i);
if (count > 0) {
// pass by reference.
RAY_CHECK(count == 1);
object_ids_to_fetch.push_back(task.ArgId(i, 0));
indices.push_back(i);
arg_reference_ids->at(i) = task.ArgId(i, 0);
} else {
// pass by value.
std::shared_ptr<LocalMemoryBuffer> data = nullptr;
if (task.ArgDataSize(i)) {
data = std::make_shared<LocalMemoryBuffer>(const_cast<uint8_t *>(task.ArgData(i)),
task.ArgDataSize(i));
}
std::shared_ptr<LocalMemoryBuffer> metadata = nullptr;
if (task.ArgMetadataSize(i)) {
metadata = std::make_shared<LocalMemoryBuffer>(
const_cast<uint8_t *>(task.ArgMetadata(i)), task.ArgMetadataSize(i));
}
args->at(i) = std::make_shared<RayObject>(data, metadata);
arg_reference_ids->at(i) = ObjectID::Nil();
}
}
std::vector<std::shared_ptr<RayObject>> results;
auto status = object_interface_.Get(object_ids_to_fetch, -1, &results);
if (status.ok()) {
for (size_t i = 0; i < results.size(); i++) {
args->at(indices[i]) = results[i];
}
}
return status;
}
} // namespace ray


@@ -1,128 +0,0 @@
#ifndef RAY_CORE_WORKER_TASK_EXECUTION_H
#define RAY_CORE_WORKER_TASK_EXECUTION_H
#include "ray/common/buffer.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/profiling.h"
#include "ray/core_worker/transport/transport.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/worker/worker_client.h"
#include "ray/rpc/worker/worker_server.h"
namespace ray {
class CoreWorker;
namespace raylet {
class TaskSpecification;
}
/// The interface that contains all `CoreWorker` methods that are related to task
/// execution.
class CoreWorkerTaskExecutionInterface {
public:
// Callback that must be implemented and provided by the language-specific worker
// frontend to execute tasks and return their results.
using TaskExecutionCallback = std::function<Status(
TaskType task_type, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids,
std::vector<std::shared_ptr<RayObject>> *results)>;
CoreWorkerTaskExecutionInterface(CoreWorker &core_worker, WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface,
const std::shared_ptr<worker::Profiler> profiler,
const TaskExecutionCallback &task_execution_callback);
// Get the resource IDs available to this worker (as assigned by the raylet).
const ResourceMappingType &GetResourceIDs() const { return resource_ids_; }
/// Start receiving and executing tasks.
/// \return void.
void Run();
/// Stop receiving and executing tasks.
/// \return void.
void Stop();
private:
/// Build arguments for task executor. This would loop through all the arguments
/// in task spec, and for each of them that's passed by reference (ObjectID),
/// fetch its content from store and; for arguments that are passed by value,
/// just copy their content.
///
/// \param spec[in] Task specification.
/// \param args[out] Argument data as RayObjects.
/// \param args[out] ObjectIDs corresponding to each by reference argument. The length
/// of this vector will be the same as args, and by value arguments
/// will have ObjectID::Nil().
/// // TODO(edoakes): this is a bit of a hack that's necessary because
/// we have separate serialization paths for by-value and by-reference
/// arguments in Python. This should ideally be handled better there.
/// \return The arguments for passing to task executor.
Status BuildArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids);
/// Execute a task.
///
/// \param spec[in] Task specification.
/// \param spec[in] Resource IDs of resources assigned to this worker.
/// \param results[out] Results for task execution.
/// \return Status.
Status ExecuteTask(const TaskSpecification &task_spec,
const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results);
/// Reference to the parent CoreWorker.
/// TODO(edoakes) this is very ugly, but unfortunately necessary so that we
/// can clear the ActorHandle state when we start executing a task. Two
/// possible solutions are to either move the ActorHandle state into the
/// WorkerContext or to remove this interface entirely.
CoreWorker &core_worker_;
/// Reference to the parent CoreWorker's context.
WorkerContext &worker_context_;
/// Reference to the parent CoreWorker's objects interface.
CoreWorkerObjectInterface &object_interface_;
// Reference to the parent CoreWorker's profiler.
const std::shared_ptr<worker::Profiler> profiler_;
// Task execution callback.
TaskExecutionCallback task_execution_callback_;
/// All the task task receivers supported.
EnumUnorderedMap<TaskTransportType, std::unique_ptr<CoreWorkerTaskReceiver>>
task_receivers_;
/// The RPC server.
rpc::GrpcServer worker_server_;
/// Event loop where tasks are processed.
std::shared_ptr<boost::asio::io_service> main_service_;
/// The asio work to keep main_service_ alive.
boost::asio::io_service::work main_work_;
/// A map from resource name to the resource IDs that are currently reserved
/// for this worker. Each pair consists of the resource ID and the fraction
/// of that resource allocated for this worker.
ResourceMappingType resource_ids_;
// Profile event for when the worker is idle. Should be reset when the worker
// enters and exits an idle period.
std::unique_ptr<worker::ProfileEvent> idle_profile_event_;
friend class CoreWorker;
};
} // namespace ray
#endif // RAY_CORE_WORKER_TASK_EXECUTION_H


@@ -2,7 +2,6 @@
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/store_provider/store_provider.h"
#include "ray/core_worker/task_execution.h"
#include "src/ray/util/test_util.h"
using namespace std::placeholders;
@@ -27,10 +26,7 @@ class MockWorker {
/*node_ip_address=*/"127.0.0.1",
std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7)) {}
void Run() {
// Start executing tasks.
worker_.Execution().Run();
}
void StartExecutingTasks() { worker_.StartExecutingTasks(); }
private:
Status ExecuteTask(TaskType task_type, const RayFunction &ray_function,
@@ -82,6 +78,6 @@ int main(int argc, char **argv) {
ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, "");
ray::MockWorker worker(store_socket, raylet_socket, gcs_options);
worker.Run();
worker.StartExecutingTasks();
return 0;
}


@@ -7,7 +7,6 @@
#include "ray/common/id.h"
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/transport/transport.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/worker/direct_actor_client.h"
#include "ray/rpc/worker/direct_actor_server.h"
@@ -196,9 +195,12 @@ class SchedulingQueue {
friend class SchedulingQueueTest;
};
class CoreWorkerDirectActorTaskReceiver : public CoreWorkerTaskReceiver,
public rpc::DirectActorHandler {
class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler {
public:
using TaskHandler = std::function<Status(
const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results)>;
CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context,
CoreWorkerObjectInterface &object_interface,
boost::asio::io_service &io_service,


@@ -4,15 +4,17 @@
#include <list>
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/transport/transport.h"
#include "ray/raylet/raylet_client.h"
#include "ray/rpc/worker/worker_server.h"
namespace ray {
class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver,
public rpc::WorkerTaskHandler {
class CoreWorkerRayletTaskReceiver : public rpc::WorkerTaskHandler {
public:
using TaskHandler = std::function<Status(
const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results)>;
CoreWorkerRayletTaskReceiver(WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface,


@@ -1,27 +0,0 @@
#ifndef RAY_CORE_WORKER_TRANSPORT_H
#define RAY_CORE_WORKER_TRANSPORT_H
#include <list>
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/common/task/task_spec.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/store_provider/store_provider.h"
namespace ray {
/// This class receives tasks for execution.
class CoreWorkerTaskReceiver {
public:
using TaskHandler = std::function<Status(
const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results)>;
virtual ~CoreWorkerTaskReceiver() {}
};
} // namespace ray
#endif // RAY_CORE_WORKER_TRANSPORT_H