diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1383f5edc..2a67364bf 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -665,7 +665,7 @@ cdef class CoreWorker: def run_task_loop(self): with nogil: - self.core_worker.get().Execution().Run() + self.core_worker.get().StartExecutingTasks() def get_current_task_id(self): return TaskID(self.core_worker.get().GetCurrentTaskId().Binary()) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 3e8754e68..505bf1982 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -35,11 +35,6 @@ from ray.includes.libraylet cimport CRayletClient ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ ResourceMappingType -cdef extern from "ray/core_worker/task_execution.h" namespace "ray" nogil: - cdef cppclass CTaskExecutionInterface "CoreWorkerTaskExecutionInterface": - void Run() - void Stop() - cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfiler "ray::worker::Profiler": void Start() @@ -95,7 +90,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CWorkerType &GetWorkerType() CLanguage &GetLanguage() CObjectInterface &Objects() - CTaskExecutionInterface &Execution() + + void StartExecutingTasks() CRayStatus SubmitTask( const CRayFunction &function, const c_vector[CTaskArg] &args, diff --git a/python/ray/includes/libcoreworker.pxi b/python/ray/includes/libcoreworker.pxi index 8e5a4d92b..1e920cdc7 100644 --- a/python/ray/includes/libcoreworker.pxi +++ b/python/ray/includes/libcoreworker.pxi @@ -35,7 +35,8 @@ cdef class ProfileEvent: elif self.extra_data is not None: extra_data = self.extra_data - self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii") if extra_data else b"{}") + self.inner.get().SetExtraData( + json.dumps(extra_data).encode("ascii") if extra_data else b"{}") # Deleting the CProfileEvent will add it to a queue to be pushed to # the driver. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 0db95165a..894118d6c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2,6 +2,7 @@ #include "ray/common/ray_config.h" #include "ray/common/task/task_util.h" #include "ray/core_worker/context.h" +#include "ray/core_worker/transport/raylet_transport.h" namespace { @@ -44,16 +45,20 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, - const CoreWorkerTaskExecutionInterface::TaskExecutionCallback - &task_execution_callback, + const TaskExecutionCallback &task_execution_callback, std::function check_signals, bool use_memory_store) : worker_type_(worker_type), language_(language), - raylet_socket_(raylet_socket), log_dir_(log_dir), worker_context_(worker_type, job_id), io_work_(io_service_), - heartbeat_timer_(io_service_) { + heartbeat_timer_(io_service_), + worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */), + gcs_client_(gcs_options), + object_interface_(worker_context_, raylet_client_, store_socket, use_memory_store, + check_signals), + task_execution_service_work_(task_execution_service_), + task_execution_callback_(task_execution_callback) { // Initialize logging if log_dir is passed. Otherwise, it must be initialized // and cleaned up by the caller. if (log_dir_ != "") { @@ -65,37 +70,41 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, } // Initialize gcs client. - gcs_client_ = - std::unique_ptr(new gcs::RedisGcsClient(gcs_options)); - RAY_CHECK_OK(gcs_client_->Connect(io_service_)); + RAY_CHECK_OK(gcs_client_.Connect(io_service_)); // Initialize profiler. profiler_ = std::make_shared(worker_context_, node_ip_address, io_service_, gcs_client_); - object_interface_ = std::unique_ptr( - new CoreWorkerObjectInterface(worker_context_, raylet_client_, store_socket, - use_memory_store, check_signals)); - // Initialize task execution. - int rpc_server_port = 0; if (worker_type_ == WorkerType::WORKER) { - task_execution_interface_ = std::unique_ptr( - new CoreWorkerTaskExecutionInterface(*this, worker_context_, raylet_client_, - *object_interface_, profiler_, - task_execution_callback)); - rpc_server_port = task_execution_interface_->worker_server_.GetPort(); + RAY_CHECK(task_execution_callback_ != nullptr); + + // Initialize task receivers. + auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3); + raylet_task_receiver_ = + std::unique_ptr(new CoreWorkerRayletTaskReceiver( + worker_context_, raylet_client_, object_interface_, task_execution_service_, + worker_server_, execute_task)); + direct_actor_task_receiver_ = std::unique_ptr( + new CoreWorkerDirectActorTaskReceiver(worker_context_, object_interface_, + task_execution_service_, worker_server_, + execute_task)); } + // Start RPC server after all the task receivers are properly initialized. + worker_server_.Run(); + // Initialize raylet client. // TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this can be changed later // so that the worker (java/python .etc) can retrieve and handle the error // instead of crashing. raylet_client_ = std::unique_ptr(new RayletClient( - raylet_socket_, WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()), + raylet_socket, WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()), (worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentJobID(), - language_, rpc_server_port)); + language_, worker_server_.GetPort())); // Set timer to periodically send heartbeats containing active object IDs to the raylet. // If the heartbeat timeout is < 0, the heartbeats are disabled. @@ -105,7 +114,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, heartbeat_timer_.async_wait(boost::bind(&CoreWorker::ReportActiveObjectIDs, this)); } - io_thread_ = std::thread(&CoreWorker::StartIOService, this); + io_thread_ = std::thread(&CoreWorker::RunIOService, this); // Create an entry for the driver task in the task table. This task is // added immediately with status RUNNING. This allows us to push errors @@ -125,14 +134,55 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, std::shared_ptr data = std::make_shared(); data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage()); - RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr)); + RAY_CHECK_OK(gcs_client_.raylet_task_table().Add(job_id, task_id, data, nullptr)); SetCurrentTaskId(task_id); } direct_actor_submitter_ = std::unique_ptr( new CoreWorkerDirectActorTaskSubmitter( - io_service_, - object_interface_->CreateStoreProvider(StoreProviderType::MEMORY))); + io_service_, object_interface_.CreateStoreProvider(StoreProviderType::MEMORY))); +} + +CoreWorker::~CoreWorker() { + io_service_.stop(); + io_thread_.join(); + if (worker_type_ == WorkerType::WORKER) { + task_execution_service_.stop(); + } + if (log_dir_ != "") { + RayLog::ShutDownRayLog(); + } +} + +void CoreWorker::Disconnect() { + io_service_.stop(); + gcs_client_.Disconnect(); + if (raylet_client_) { + RAY_IGNORE_EXPR(raylet_client_->Disconnect()); + } +} + +void CoreWorker::RunIOService() { + // Block SIGINT and SIGTERM so they will be handled by the main thread. + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGTERM); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + + io_service_.run(); +} + +void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { + worker_context_.SetCurrentTaskId(task_id); + main_thread_task_id_ = task_id; + // Clear all actor handles at the end of each non-actor task. + if (actor_id_.IsNil() && task_id.IsNil()) { + for (const auto &handle : actor_handles_) { + RAY_CHECK_OK(gcs_client_.Actors().AsyncUnsubscribe(handle.first, nullptr)); + } + actor_handles_.clear(); + } } void CoreWorker::AddActiveObjectID(const ObjectID &object_id) { @@ -175,56 +225,6 @@ void CoreWorker::ReportActiveObjectIDs() { active_object_ids_updated_ = false; } -CoreWorker::~CoreWorker() { - io_service_.stop(); - io_thread_.join(); - if (task_execution_interface_) { - task_execution_interface_->Stop(); - } - if (log_dir_ != "") { - RayLog::ShutDownRayLog(); - } -} - -void CoreWorker::Disconnect() { - io_service_.stop(); - if (gcs_client_) { - gcs_client_->Disconnect(); - } - if (raylet_client_) { - RAY_IGNORE_EXPR(raylet_client_->Disconnect()); - } -} - -void CoreWorker::StartIOService() { - // Block SIGINT and SIGTERM so they will be handled by the main thread. - sigset_t mask; - sigemptyset(&mask); - sigaddset(&mask, SIGINT); - sigaddset(&mask, SIGTERM); - pthread_sigmask(SIG_BLOCK, &mask, NULL); - - io_service_.run(); -} - -std::unique_ptr CoreWorker::CreateProfileEvent( - const std::string &event_type) { - return std::unique_ptr( - new worker::ProfileEvent(profiler_, event_type)); -} - -void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { - worker_context_.SetCurrentTaskId(task_id); - main_thread_task_id_ = task_id; - // Clear all actor handles at the end of each non-actor task. - if (actor_id_.IsNil() && task_id.IsNil()) { - for (const auto &handle : actor_handles_) { - RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr)); - } - actor_handles_.clear(); - } -} - TaskID CoreWorker::GetCallerId() const { TaskID caller_id; ActorID actor_id = GetActorId(); @@ -236,54 +236,6 @@ TaskID CoreWorker::GetCallerId() const { return caller_id; } -bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { - const auto &actor_id = actor_handle->GetActorID(); - auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second; - if (inserted) { - // Register a callback to handle actor notifications. - auto actor_notification_callback = [this](const ActorID &actor_id, - const gcs::ActorTableData &actor_data) { - if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { - auto it = actor_handles_.find(actor_id); - RAY_CHECK(it != actor_handles_.end()); - if (it->second->IsDirectCallActor()) { - // We have to reset the actor handle since the next instance of the - // actor will not have the last sequence number that we sent. - // TODO: Remove the check for direct calls. We do not reset for the - // raylet codepath because it tries to replay all tasks since the - // last actor checkpoint. - it->second->Reset(); - } - } else if (actor_data.state() == gcs::ActorTableData::DEAD) { - RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id, nullptr)); - // We cannot erase the actor handle here because clients can still - // submit tasks to dead actors. - } - - direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data); - - RAY_LOG(INFO) << "received notification on actor, state=" - << static_cast(actor_data.state()) << ", actor_id: " << actor_id - << ", ip address: " << actor_data.ip_address() - << ", port: " << actor_data.port(); - }; - - RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( - actor_id, actor_notification_callback, nullptr)); - } - return inserted; -} - -Status CoreWorker::GetActorHandle(const ActorID &actor_id, - ActorHandle **actor_handle) const { - auto it = actor_handles_.find(actor_id); - if (it == actor_handles_.end()) { - return Status::Invalid("Handle for actor does not exist"); - } - *actor_handle = it->second.get(); - return Status::OK(); -} - Status CoreWorker::SubmitTask(const RayFunction &function, const std::vector &args, const TaskOptions &task_options, @@ -388,12 +340,156 @@ Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, return status; } -const ResourceMappingType CoreWorker::GetResourceIDs() const { - if (worker_type_ == WorkerType::DRIVER) { - ResourceMappingType empty; - return empty; +bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { + const auto &actor_id = actor_handle->GetActorID(); + auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second; + if (inserted) { + // Register a callback to handle actor notifications. + auto actor_notification_callback = [this](const ActorID &actor_id, + const gcs::ActorTableData &actor_data) { + if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { + auto it = actor_handles_.find(actor_id); + RAY_CHECK(it != actor_handles_.end()); + if (it->second->IsDirectCallActor()) { + // We have to reset the actor handle since the next instance of the + // actor will not have the last sequence number that we sent. + // TODO: Remove the check for direct calls. We do not reset for the + // raylet codepath because it tries to replay all tasks since the + // last actor checkpoint. + it->second->Reset(); + } + } else if (actor_data.state() == gcs::ActorTableData::DEAD) { + RAY_CHECK_OK(gcs_client_.Actors().AsyncUnsubscribe(actor_id, nullptr)); + // We cannot erase the actor handle here because clients can still + // submit tasks to dead actors. + } + + direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data); + + RAY_LOG(INFO) << "received notification on actor, state=" + << static_cast(actor_data.state()) << ", actor_id: " << actor_id + << ", ip address: " << actor_data.ip_address() + << ", port: " << actor_data.port(); + }; + + RAY_CHECK_OK(gcs_client_.Actors().AsyncSubscribe( + actor_id, actor_notification_callback, nullptr)); } - return task_execution_interface_->GetResourceIDs(); + return inserted; +} + +Status CoreWorker::GetActorHandle(const ActorID &actor_id, + ActorHandle **actor_handle) const { + auto it = actor_handles_.find(actor_id); + if (it == actor_handles_.end()) { + return Status::Invalid("Handle for actor does not exist"); + } + *actor_handle = it->second.get(); + return Status::OK(); +} + +std::unique_ptr CoreWorker::CreateProfileEvent( + const std::string &event_type) { + return std::unique_ptr( + new worker::ProfileEvent(profiler_, event_type)); +} + +void CoreWorker::StartExecutingTasks() { + idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); + task_execution_service_.run(); +} + +Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, + const ResourceMappingType &resource_ids, + std::vector> *results) { + idle_profile_event_.reset(); + RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId(); + + resource_ids_ = resource_ids; + worker_context_.SetCurrentTask(task_spec); + SetCurrentTaskId(task_spec.TaskId()); + + RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()}; + + std::vector> args; + std::vector arg_reference_ids; + RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids)); + + std::vector return_ids; + for (size_t i = 0; i < task_spec.NumReturns(); i++) { + return_ids.push_back(task_spec.ReturnId(i)); + } + + Status status; + TaskType task_type = TaskType::NORMAL_TASK; + if (task_spec.IsActorCreationTask()) { + RAY_CHECK(return_ids.size() > 0); + return_ids.pop_back(); + task_type = TaskType::ACTOR_CREATION_TASK; + SetActorId(task_spec.ActorCreationId()); + } else if (task_spec.IsActorTask()) { + RAY_CHECK(return_ids.size() > 0); + return_ids.pop_back(); + task_type = TaskType::ACTOR_TASK; + } + status = task_execution_callback_(task_type, func, + task_spec.GetRequiredResources().GetResourceMap(), + args, arg_reference_ids, return_ids, results); + + SetCurrentTaskId(TaskID::Nil()); + worker_context_.ResetCurrentTask(task_spec); + + // TODO(zhijunfu): + // 1. Check and handle failure. + // 2. Save or load checkpoint. + idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); + return status; +} + +Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, + std::vector> *args, + std::vector *arg_reference_ids) { + auto num_args = task.NumArgs(); + args->resize(num_args); + arg_reference_ids->resize(num_args); + + std::vector object_ids_to_fetch; + std::vector indices; + + for (size_t i = 0; i < task.NumArgs(); ++i) { + int count = task.ArgIdCount(i); + if (count > 0) { + // pass by reference. + RAY_CHECK(count == 1); + object_ids_to_fetch.push_back(task.ArgId(i, 0)); + indices.push_back(i); + arg_reference_ids->at(i) = task.ArgId(i, 0); + } else { + // pass by value. + std::shared_ptr data = nullptr; + if (task.ArgDataSize(i)) { + data = std::make_shared(const_cast(task.ArgData(i)), + task.ArgDataSize(i)); + } + std::shared_ptr metadata = nullptr; + if (task.ArgMetadataSize(i)) { + metadata = std::make_shared( + const_cast(task.ArgMetadata(i)), task.ArgMetadataSize(i)); + } + args->at(i) = std::make_shared(data, metadata); + arg_reference_ids->at(i) = ObjectID::Nil(); + } + } + + std::vector> results; + auto status = object_interface_.Get(object_ids_to_fetch, -1, &results); + if (status.ok()) { + for (size_t i = 0; i < results.size(); i++) { + args->at(indices[i]) = results[i]; + } + } + + return status; } } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index f8ef88a65..a8a2b8715 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -7,10 +7,12 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/object_interface.h" #include "ray/core_worker/profiling.h" -#include "ray/core_worker/task_execution.h" #include "ray/core_worker/transport/direct_actor_transport.h" +#include "ray/core_worker/transport/raylet_transport.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/raylet/raylet_client.h" +#include "ray/rpc/worker/worker_client.h" +#include "ray/rpc/worker/worker_server.h" namespace ray { @@ -18,6 +20,16 @@ namespace ray { /// of the worker. This class is supposed to be used to implement app-language (Java, /// Python, etc) workers. class CoreWorker { + // Callback that must be implemented and provided by the language-specific worker + // frontend to execute tasks and return their results. + using TaskExecutionCallback = std::function &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, + std::vector> *results)>; + public: /// Construct a CoreWorker instance. /// @@ -45,8 +57,7 @@ class CoreWorker { const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, - const CoreWorkerTaskExecutionInterface::TaskExecutionCallback - &task_execution_callback, + const TaskExecutionCallback &task_execution_callback, std::function check_signals = nullptr, bool use_memory_store = true); @@ -54,32 +65,15 @@ class CoreWorker { void Disconnect(); - /// Type of this worker. WorkerType GetWorkerType() const { return worker_type_; } - /// Language of this worker. Language GetLanguage() const { return language_; } WorkerContext &GetWorkerContext() { return worker_context_; } RayletClient &GetRayletClient() { return *raylet_client_; } - /// Return the `CoreWorkerObjectInterface` that contains methods related to object - /// store. - CoreWorkerObjectInterface &Objects() { return *object_interface_; } - - /// Create a profile event with a reference to the core worker's profiler. - std::unique_ptr CreateProfileEvent(const std::string &event_type); - - /// Return the `CoreWorkerTaskExecutionInterface` that contains methods related to - /// task execution. - CoreWorkerTaskExecutionInterface &Execution() { - RAY_CHECK(task_execution_interface_ != nullptr); - return *task_execution_interface_; - } - - // Get the resource IDs available to this worker (as assigned by the raylet). - const ResourceMappingType GetResourceIDs() const; + CoreWorkerObjectInterface &Objects() { return object_interface_; } const TaskID &GetCurrentTaskId() const { return worker_context_.GetCurrentTaskID(); } @@ -92,7 +86,15 @@ class CoreWorker { actor_id_ = actor_id; } - const ActorID &GetActorId() const { return actor_id_; } + // Add this object ID to the set of active object IDs that is sent to the raylet + // in the heartbeat messsage. + void AddActiveObjectID(const ObjectID &object_id); + + // Remove this object ID from the set of active object IDs that is sent to the raylet + // in the heartbeat messsage. + void RemoveActiveObjectID(const ObjectID &object_id); + + /* Public methods related to task submission. */ /// Get the caller ID used to submit tasks from this worker to an actor. /// @@ -102,8 +104,6 @@ class CoreWorker { /// of the bytes zeroed out. TaskID GetCallerId() const; - /* Methods related to task submission. */ - /// Submit a normal task. /// /// \param[in] function The remote function to execute. @@ -164,15 +164,29 @@ class CoreWorker { /// \return Status::Invalid if we don't have the specified handle. Status SerializeActorHandle(const ActorID &actor_id, std::string *output) const; - // Add this object ID to the set of active object IDs that is sent to the raylet - // in the heartbeat messsage. - void AddActiveObjectID(const ObjectID &object_id); + /* Public methods related to task execution. Should not be used by driver processes. */ - // Remove this object ID from the set of active object IDs that is sent to the raylet - // in the heartbeat messsage. - void RemoveActiveObjectID(const ObjectID &object_id); + const ActorID &GetActorId() const { return actor_id_; } + + // Get the resource IDs available to this worker (as assigned by the raylet). + const ResourceMappingType GetResourceIDs() const { return resource_ids_; } + + /// Create a profile event with a reference to the core worker's profiler. + std::unique_ptr CreateProfileEvent(const std::string &event_type); + + /// Start receiving and executing tasks. + /// \return void. + void StartExecutingTasks(); private: + /// Run the io_service_ event loop. This should be called in a background thread. + void RunIOService(); + + /// Send the list of active object IDs to the raylet. + void ReportActiveObjectIDs(); + + /* Private methods related to task submission. */ + /// Give this worker a handle to an actor. /// /// This handle will remain as long as the current actor or task is @@ -193,50 +207,76 @@ class CoreWorker { /// \return Status::Invalid if we don't have this actor handle. Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const; - void StartIOService(); + /* Private methods related to task execution. Should not be used by driver processes. */ - void ReportActiveObjectIDs(); + /// Execute a task. + /// + /// \param spec[in] Task specification. + /// \param spec[in] Resource IDs of resources assigned to this worker. + /// \param results[out] Results for task execution. + /// \return Status. + Status ExecuteTask(const TaskSpecification &task_spec, + const ResourceMappingType &resource_ids, + std::vector> *results); + /// Build arguments for task executor. This would loop through all the arguments + /// in task spec, and for each of them that's passed by reference (ObjectID), + /// fetch its content from store and; for arguments that are passed by value, + /// just copy their content. + /// + /// \param spec[in] Task specification. + /// \param args[out] Argument data as RayObjects. + /// \param args[out] ObjectIDs corresponding to each by reference argument. The length + /// of this vector will be the same as args, and by value arguments + /// will have ObjectID::Nil(). + /// // TODO(edoakes): this is a bit of a hack that's necessary because + /// we have separate serialization paths for by-value and by-reference + /// arguments in Python. This should ideally be handled better there. + /// \return The arguments for passing to task executor. + Status BuildArgsForExecutor(const TaskSpecification &task, + std::vector> *args, + std::vector *arg_reference_ids); + + /// Type of this worker (i.e., DRIVER or WORKER). const WorkerType worker_type_; + + /// Application language of this worker (i.e., PYTHON or JAVA). const Language language_; - const std::string raylet_socket_; + + /// Directory where log files are written. const std::string log_dir_; + + /// Shared state of the worker. Includes process-level and thread-level state. + /// TODO(edoakes): we should move process-level state into this class and make + /// this a ThreadContext. WorkerContext worker_context_; + /// The ID of the current task being executed by the main thread. If there /// are multiple threads, they will have a thread-local task ID stored in the /// worker context. TaskID main_thread_task_id_; - /// Our actor ID. If this is nil, then we execute only stateless tasks. - ActorID actor_id_; /// Event loop where the IO events are handled. e.g. async GCS operations. boost::asio::io_service io_service_; + /// Keeps the io_service_ alive. boost::asio::io_service::work io_work_; + /// Timer used to periodically send heartbeat containing active object IDs to the /// raylet. boost::asio::steady_timer heartbeat_timer_; - // Thread that runs a boost::asio service to process IO events. - std::thread io_thread_; + /// RPC server used to receive tasks to execute. + rpc::GrpcServer worker_server_; // Client to the GCS shared by core worker interfaces. - std::unique_ptr gcs_client_; + gcs::RedisGcsClient gcs_client_; // Client to the raylet shared by core worker interfaces. std::unique_ptr raylet_client_; - // Interface to submit tasks directly to other actors. - std::unique_ptr direct_actor_submitter_; - - // Interface for storing and retrieving shared objects. - std::unique_ptr object_interface_; - - // Profiler including a background thread that pushes profiling events to the GCS. - std::shared_ptr profiler_; - - /// Map from actor ID to a handle to that actor. - std::unordered_map > actor_handles_; + // Thread that runs a boost::asio service to process IO events. + std::thread io_thread_; /// Set of object IDs that are in scope in the language worker. std::unordered_set active_object_ids_; @@ -245,8 +285,48 @@ class CoreWorker { /// last time it was sent to the raylet. bool active_object_ids_updated_ = false; - /// Only available if it's not a driver. - std::unique_ptr task_execution_interface_; + // Interface for storing and retrieving shared objects. + CoreWorkerObjectInterface object_interface_; + + /* Fields related to task submission. */ + + // Interface to submit tasks directly to other actors. + std::unique_ptr direct_actor_submitter_; + + /// Map from actor ID to a handle to that actor. + std::unordered_map> actor_handles_; + + /* Fields related to task execution. */ + + /// Our actor ID. If this is nil, then we execute only stateless tasks. + ActorID actor_id_; + + /// Event loop where tasks are processed. + boost::asio::io_service task_execution_service_; + + /// The asio work to keep task_execution_service_ alive. + boost::asio::io_service::work task_execution_service_work_; + + // Profiler including a background thread that pushes profiling events to the GCS. + std::shared_ptr profiler_; + + // Profile event for when the worker is idle. Should be reset when the worker + // enters and exits an idle period. + std::unique_ptr idle_profile_event_; + + // Task execution callback. + TaskExecutionCallback task_execution_callback_; + + /// A map from resource name to the resource IDs that are currently reserved + /// for this worker. Each pair consists of the resource ID and the fraction + /// of that resource allocated for this worker. + ResourceMappingType resource_ids_; + + // Interface that receives tasks from the raylet. + std::unique_ptr raylet_task_receiver_; + + // Interface that receives tasks from direct actor calls. + std::unique_ptr direct_actor_task_receiver_; friend class CoreWorkerTest; }; diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index 5465b0e97..e19cc9e5d 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -95,7 +95,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecut local_env = env; local_java_task_executor = javaTaskExecutor; auto core_worker = reinterpret_cast(nativeCoreWorkerPointer); - core_worker->Execution().Run(); + core_worker->StartExecutingTasks(); local_env = nullptr; local_java_task_executor = nullptr; } diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc index 0ce704d3b..81e682421 100644 --- a/src/ray/core_worker/profiling.cc +++ b/src/ray/core_worker/profiling.cc @@ -14,8 +14,7 @@ ProfileEvent::ProfileEvent(const std::shared_ptr profiler, } Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_address, - boost::asio::io_service &io_service, - std::unique_ptr &gcs_client) + boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client) : io_service_(io_service), timer_(io_service_, boost::asio::chrono::seconds(1)), gcs_client_(gcs_client) { @@ -36,7 +35,7 @@ void Profiler::FlushEvents() { if (rpc_profile_data_.profile_events_size() != 0) { // TODO(edoakes): this should be migrated to use the new GCS client interface // instead of the raw table interface once it's ready. - if (!gcs_client_->profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) { + if (!gcs_client_.profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) { RAY_LOG(WARNING) << "Failed to push profile events to GCS."; } else { RAY_LOG(DEBUG) << "Pushed " << rpc_profile_data_.profile_events_size() diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index 2d745a6b1..f523ca63f 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -14,8 +14,7 @@ namespace worker { class Profiler { public: Profiler(WorkerContext &worker_context, const std::string &node_ip_address, - boost::asio::io_service &io_service, - std::unique_ptr &gcs_client); + boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client); // Add an event to the queue to be flushed periodically. void AddEvent(const rpc::ProfileTableData::ProfileEvent &event); @@ -34,7 +33,7 @@ class Profiler { // until they are flushed. rpc::ProfileTableData rpc_profile_data_ GUARDED_BY(mu_); - std::unique_ptr &gcs_client_; + gcs::RedisGcsClient &gcs_client_; absl::Mutex mu_; }; diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc deleted file mode 100644 index bf2bee181..000000000 --- a/src/ray/core_worker/task_execution.cc +++ /dev/null @@ -1,150 +0,0 @@ -#include "ray/core_worker/task_execution.h" -#include "ray/core_worker/context.h" -#include "ray/core_worker/core_worker.h" -#include "ray/core_worker/transport/direct_actor_transport.h" -#include "ray/core_worker/transport/raylet_transport.h" - -namespace ray { - -CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( - CoreWorker &core_worker, WorkerContext &worker_context, - std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, - const std::shared_ptr profiler, - const TaskExecutionCallback &task_execution_callback) - : core_worker_(core_worker), - worker_context_(worker_context), - object_interface_(object_interface), - profiler_(profiler), - task_execution_callback_(task_execution_callback), - worker_server_("Worker", 0 /* let grpc choose port */), - main_service_(std::make_shared()), - main_work_(*main_service_) { - RAY_CHECK(task_execution_callback_ != nullptr); - - auto func = - std::bind(&CoreWorkerTaskExecutionInterface::ExecuteTask, this, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); - task_receivers_.emplace( - TaskTransportType::RAYLET, - std::unique_ptr(new CoreWorkerRayletTaskReceiver( - worker_context_, raylet_client, object_interface_, *main_service_, - worker_server_, func))); - task_receivers_.emplace( - TaskTransportType::DIRECT_ACTOR, - std::unique_ptr( - new CoreWorkerDirectActorTaskReceiver(worker_context_, object_interface_, - *main_service_, worker_server_, func))); - - // Start RPC server after all the task receivers are properly initialized. - worker_server_.Run(); -} - -Status CoreWorkerTaskExecutionInterface::ExecuteTask( - const TaskSpecification &task_spec, const ResourceMappingType &resource_ids, - std::vector> *results) { - idle_profile_event_.reset(); - RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId(); - - resource_ids_ = resource_ids; - worker_context_.SetCurrentTask(task_spec); - core_worker_.SetCurrentTaskId(task_spec.TaskId()); - - RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()}; - - std::vector> args; - std::vector arg_reference_ids; - RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids)); - - std::vector return_ids; - for (size_t i = 0; i < task_spec.NumReturns(); i++) { - return_ids.push_back(task_spec.ReturnId(i)); - } - - Status status; - TaskType task_type = TaskType::NORMAL_TASK; - if (task_spec.IsActorCreationTask()) { - RAY_CHECK(return_ids.size() > 0); - return_ids.pop_back(); - task_type = TaskType::ACTOR_CREATION_TASK; - core_worker_.SetActorId(task_spec.ActorCreationId()); - } else if (task_spec.IsActorTask()) { - RAY_CHECK(return_ids.size() > 0); - return_ids.pop_back(); - task_type = TaskType::ACTOR_TASK; - } - status = task_execution_callback_(task_type, func, - task_spec.GetRequiredResources().GetResourceMap(), - args, arg_reference_ids, return_ids, results); - - core_worker_.SetCurrentTaskId(TaskID::Nil()); - worker_context_.ResetCurrentTask(task_spec); - - // TODO(zhijunfu): - // 1. Check and handle failure. - // 2. Save or load checkpoint. - idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); - return status; -} - -void CoreWorkerTaskExecutionInterface::Run() { - idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); - main_service_->run(); -} - -void CoreWorkerTaskExecutionInterface::Stop() { - // Stop main IO service. - std::shared_ptr main_service = main_service_; - // Delay the execution of io_service::stop() to avoid deadlock if - // CoreWorkerTaskExecutionInterface::Stop is called inside a task. - idle_profile_event_.reset(); - main_service_->post([main_service]() { main_service->stop(); }); -} - -Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( - const TaskSpecification &task, std::vector> *args, - std::vector *arg_reference_ids) { - auto num_args = task.NumArgs(); - args->resize(num_args); - arg_reference_ids->resize(num_args); - - std::vector object_ids_to_fetch; - std::vector indices; - - for (size_t i = 0; i < task.NumArgs(); ++i) { - int count = task.ArgIdCount(i); - if (count > 0) { - // pass by reference. - RAY_CHECK(count == 1); - object_ids_to_fetch.push_back(task.ArgId(i, 0)); - indices.push_back(i); - arg_reference_ids->at(i) = task.ArgId(i, 0); - } else { - // pass by value. - std::shared_ptr data = nullptr; - if (task.ArgDataSize(i)) { - data = std::make_shared(const_cast(task.ArgData(i)), - task.ArgDataSize(i)); - } - std::shared_ptr metadata = nullptr; - if (task.ArgMetadataSize(i)) { - metadata = std::make_shared( - const_cast(task.ArgMetadata(i)), task.ArgMetadataSize(i)); - } - args->at(i) = std::make_shared(data, metadata); - arg_reference_ids->at(i) = ObjectID::Nil(); - } - } - - std::vector> results; - auto status = object_interface_.Get(object_ids_to_fetch, -1, &results); - if (status.ok()) { - for (size_t i = 0; i < results.size(); i++) { - args->at(indices[i]) = results[i]; - } - } - - return status; -} - -} // namespace ray diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h deleted file mode 100644 index ad07fdd61..000000000 --- a/src/ray/core_worker/task_execution.h +++ /dev/null @@ -1,128 +0,0 @@ -#ifndef RAY_CORE_WORKER_TASK_EXECUTION_H -#define RAY_CORE_WORKER_TASK_EXECUTION_H - -#include "ray/common/buffer.h" -#include "ray/common/status.h" -#include "ray/core_worker/common.h" -#include "ray/core_worker/context.h" -#include "ray/core_worker/object_interface.h" -#include "ray/core_worker/profiling.h" -#include "ray/core_worker/transport/transport.h" -#include "ray/rpc/client_call.h" -#include "ray/rpc/worker/worker_client.h" -#include "ray/rpc/worker/worker_server.h" - -namespace ray { - -class CoreWorker; - -namespace raylet { -class TaskSpecification; -} - -/// The interface that contains all `CoreWorker` methods that are related to task -/// execution. -class CoreWorkerTaskExecutionInterface { - public: - // Callback that must be implemented and provided by the language-specific worker - // frontend to execute tasks and return their results. - using TaskExecutionCallback = std::function &required_resources, - const std::vector> &args, - const std::vector &arg_reference_ids, - const std::vector &return_ids, - std::vector> *results)>; - - CoreWorkerTaskExecutionInterface(CoreWorker &core_worker, WorkerContext &worker_context, - std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, - const std::shared_ptr profiler, - const TaskExecutionCallback &task_execution_callback); - - // Get the resource IDs available to this worker (as assigned by the raylet). - const ResourceMappingType &GetResourceIDs() const { return resource_ids_; } - - /// Start receiving and executing tasks. - /// \return void. - void Run(); - - /// Stop receiving and executing tasks. - /// \return void. - void Stop(); - - private: - /// Build arguments for task executor. This would loop through all the arguments - /// in task spec, and for each of them that's passed by reference (ObjectID), - /// fetch its content from store and; for arguments that are passed by value, - /// just copy their content. - /// - /// \param spec[in] Task specification. - /// \param args[out] Argument data as RayObjects. - /// \param args[out] ObjectIDs corresponding to each by reference argument. The length - /// of this vector will be the same as args, and by value arguments - /// will have ObjectID::Nil(). - /// // TODO(edoakes): this is a bit of a hack that's necessary because - /// we have separate serialization paths for by-value and by-reference - /// arguments in Python. This should ideally be handled better there. - /// \return The arguments for passing to task executor. - Status BuildArgsForExecutor(const TaskSpecification &task, - std::vector> *args, - std::vector *arg_reference_ids); - - /// Execute a task. - /// - /// \param spec[in] Task specification. - /// \param spec[in] Resource IDs of resources assigned to this worker. - /// \param results[out] Results for task execution. - /// \return Status. - Status ExecuteTask(const TaskSpecification &task_spec, - const ResourceMappingType &resource_ids, - std::vector> *results); - - /// Reference to the parent CoreWorker. - /// TODO(edoakes) this is very ugly, but unfortunately necessary so that we - /// can clear the ActorHandle state when we start executing a task. Two - /// possible solutions are to either move the ActorHandle state into the - /// WorkerContext or to remove this interface entirely. - CoreWorker &core_worker_; - - /// Reference to the parent CoreWorker's context. - WorkerContext &worker_context_; - /// Reference to the parent CoreWorker's objects interface. - CoreWorkerObjectInterface &object_interface_; - - // Reference to the parent CoreWorker's profiler. - const std::shared_ptr profiler_; - - // Task execution callback. - TaskExecutionCallback task_execution_callback_; - - /// All the task task receivers supported. - EnumUnorderedMap> - task_receivers_; - - /// The RPC server. - rpc::GrpcServer worker_server_; - - /// Event loop where tasks are processed. - std::shared_ptr main_service_; - - /// The asio work to keep main_service_ alive. - boost::asio::io_service::work main_work_; - - /// A map from resource name to the resource IDs that are currently reserved - /// for this worker. Each pair consists of the resource ID and the fraction - /// of that resource allocated for this worker. - ResourceMappingType resource_ids_; - - // Profile event for when the worker is idle. Should be reset when the worker - // enters and exits an idle period. - std::unique_ptr idle_profile_event_; - - friend class CoreWorker; -}; - -} // namespace ray - -#endif // RAY_CORE_WORKER_TASK_EXECUTION_H diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index a88593b0d..74f4b5671 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -2,7 +2,6 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" #include "ray/core_worker/store_provider/store_provider.h" -#include "ray/core_worker/task_execution.h" #include "src/ray/util/test_util.h" using namespace std::placeholders; @@ -27,10 +26,7 @@ class MockWorker { /*node_id_address=*/"127.0.0.1", std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7)) {} - void Run() { - // Start executing tasks. - worker_.Execution().Run(); - } + void StartExecutingTasks() { worker_.StartExecutingTasks(); } private: Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, @@ -82,6 +78,6 @@ int main(int argc, char **argv) { ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, ""); ray::MockWorker worker(store_socket, raylet_socket, gcs_options); - worker.Run(); + worker.StartExecutingTasks(); return 0; } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 40c219eda..137dc9c19 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -7,7 +7,6 @@ #include "ray/common/id.h" #include "ray/core_worker/object_interface.h" -#include "ray/core_worker/transport/transport.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/worker/direct_actor_client.h" #include "ray/rpc/worker/direct_actor_server.h" @@ -196,9 +195,12 @@ class SchedulingQueue { friend class SchedulingQueueTest; }; -class CoreWorkerDirectActorTaskReceiver : public CoreWorkerTaskReceiver, - public rpc::DirectActorHandler { +class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { public: + using TaskHandler = std::function> *results)>; + CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context, CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 76e3dc8cf..21cd31bdb 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -4,15 +4,17 @@ #include #include "ray/core_worker/object_interface.h" -#include "ray/core_worker/transport/transport.h" #include "ray/raylet/raylet_client.h" #include "ray/rpc/worker/worker_server.h" namespace ray { -class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver, - public rpc::WorkerTaskHandler { +class CoreWorkerRayletTaskReceiver : public rpc::WorkerTaskHandler { public: + using TaskHandler = std::function> *results)>; + CoreWorkerRayletTaskReceiver(WorkerContext &worker_context, std::unique_ptr &raylet_client, CoreWorkerObjectInterface &object_interface, diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h deleted file mode 100644 index 1e54e09e8..000000000 --- a/src/ray/core_worker/transport/transport.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef RAY_CORE_WORKER_TRANSPORT_H -#define RAY_CORE_WORKER_TRANSPORT_H - -#include - -#include "ray/common/buffer.h" -#include "ray/common/id.h" -#include "ray/common/status.h" -#include "ray/common/task/task_spec.h" -#include "ray/core_worker/common.h" -#include "ray/core_worker/store_provider/store_provider.h" - -namespace ray { - -/// This class receives tasks for execution. -class CoreWorkerTaskReceiver { - public: - using TaskHandler = std::function> *results)>; - - virtual ~CoreWorkerTaskReceiver() {} -}; - -} // namespace ray - -#endif // RAY_CORE_WORKER_TRANSPORT_H