From a617cb88130d0ca4d68c112bfa9bb1bf285a851e Mon Sep 17 00:00:00 2001 From: Chen Shen Date: Fri, 12 Nov 2021 09:28:58 -0800 Subject: [PATCH] [Core][actor out-of-order execution 1/n] Move CoreWorkerDirectActorTaskSubmitter into a separate file This is the stack of PRs that supports out-of-order execution for threaded/async actors. Next PR: #20149. At a high level, threaded and async actors already don't guarantee execution order, and the current "sequential" order implementation has caused some confusion and inconvenience. Please refer to #19822 for the detailed discussion. The major changes of this stack are: OutOfOrderActorSubmitQueue ([Core][actor out-of-order execution 3/n] Introducing out-of-order actor submit queue #20150). Specifically, we currently have a per-client task submit queue, which guarantees the sequential order of task submission. In [Core][actor out-of-order execution 3/n] Introducing out-of-order actor submit queue #20150 we introduce OutOfOrderActorSubmitQueue, which relaxes that guarantee; it sends each task over the network as soon as the task's dependencies are resolved. -- Two PRs ([Core][actor out-of-order execution 1/n] Move CoreWorkerDirectActorTaskSubmitter into a separate file #20148 and [Core][actor out-of-order execution 2/n] create abstraction for the queuing logic on the client/actor submission. #20149) precede it; they refactor the actor submission logic to make that abstraction possible. OutOfOrderActorSchedulingQueue ([Core][actor out-of-order execution 5/n] implement out-of-order scheduling queue #20176). Similarly, we also have a per-client task scheduling queue on the actor, which ensures tasks are executed according to the submission order (sequence_no). OutOfOrderActorSchedulingQueue relaxes that guarantee by enqueuing each task for execution as soon as all of its dependencies are resolved. -- One PR ([Core][actor out-of-order execution 4/n] refactor the actor receiver code #20160) precedes it; it refactors the actor scheduling logic to make the code easier to read, although that one is optional. Plumbing PR ([Core][actor out-of-order execution 6/n] plumbing work to make it work e2e #20177). This PR enables out-of-order execution by introducing options(execute_out_of_order=True) and creates the actor client/server components according to that configuration. One thing the stack hasn't touched on yet is the restart/retry guarantees, which might need some discussion. In this PR we separate the client code from the server code for actor task submission, which makes the follow-up changes easier. --- .../transport/direct_actor_task_submitter.cc | 496 ++++++++++++++++++ .../transport/direct_actor_task_submitter.h | 290 ++++ .../transport/direct_actor_transport.cc | 469 ----------------- .../transport/direct_actor_transport.h | 251 +-------- 4 files changed, 787 insertions(+), 719 deletions(-) create mode 100644 src/ray/core_worker/transport/direct_actor_task_submitter.cc create mode 100644 src/ray/core_worker/transport/direct_actor_task_submitter.h diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc new file mode 100644 index 000000000..a8a87bbb3 --- /dev/null +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc @@ -0,0 +1,496 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
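As a usage illustration for the option mentioned in the description above: the following is a minimal sketch of how a caller would opt an actor into out-of-order execution, assuming the options(execute_out_of_order=True) spelling quoted for the plumbing PR (#20177). That flag lands in the plumbing PR rather than this refactoring PR, so treat the option name and defaults as an assumption, not the shipped API.

import ray

ray.init()

# A threaded actor (max_concurrency > 1); such actors already execute calls
# without an ordering guarantee, which is what this PR stack formalizes.
@ray.remote(max_concurrency=4)
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, x):
        self.value += x
        return self.value

# Hypothetical opt-in described for the plumbing PR (#20177): also relax the
# client-side submission order, so each call is sent as soon as its
# dependencies resolve instead of waiting for earlier sequence numbers.
counter = Counter.options(execute_out_of_order=True).remote()
refs = [counter.add.remote(1) for _ in range(10)]
# Results are a permutation of 1..10 if the actor ran the calls out of order.
print(ray.get(refs))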
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/transport/direct_actor_task_submitter.h" + +#include + +#include "ray/common/task/task.h" + +using ray::rpc::ActorTableData; + +namespace ray { +namespace core { + +void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists( + const ActorID &actor_id) { + absl::MutexLock lock(&mu_); + // No need to check whether the insert was successful, since it is possible + // for this worker to have multiple references to the same actor. + client_queues_.emplace(actor_id, ClientQueue()); +} + +void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id, + bool force_kill, bool no_restart) { + absl::MutexLock lock(&mu_); + rpc::KillActorRequest request; + request.set_intended_actor_id(actor_id.Binary()); + request.set_force_kill(force_kill); + request.set_no_restart(no_restart); + + auto it = client_queues_.find(actor_id); + // The language frontend can only kill actors that it has a reference to. + RAY_CHECK(it != client_queues_.end()); + + if (!it->second.pending_force_kill) { + it->second.pending_force_kill = request; + } else if (force_kill) { + // Overwrite the previous request to kill the actor if the new request is a + // force kill. + it->second.pending_force_kill->set_force_kill(true); + if (no_restart) { + // Overwrite the previous request to disable restart if the new request's + // no_restart flag is set to true. + it->second.pending_force_kill->set_no_restart(true); + } + } + + SendPendingTasks(actor_id); +} + +Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { + auto task_id = task_spec.TaskId(); + auto actor_id = task_spec.ActorId(); + RAY_LOG(DEBUG) << "Submitting task " << task_id; + RAY_CHECK(task_spec.IsActorTask()); + + bool task_queued = false; + uint64_t send_pos = 0; + { + absl::MutexLock lock(&mu_); + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + if (queue->second.state != rpc::ActorTableData::DEAD) { + // We must fix the send order prior to resolving dependencies, which may + // complete out of order. This ensures that we will not deadlock due to + // backpressure. The receiving actor will execute the tasks according to + // this sequence number. + send_pos = task_spec.ActorCounter(); + auto inserted = + queue->second.requests.emplace(send_pos, std::make_pair(task_spec, false)); + RAY_CHECK(inserted.second); + task_queued = true; + } + } + + if (task_queued) { + // We must release the lock before resolving the task dependencies since + // the callback may get called in the same call stack. + resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id](Status status) { + absl::MutexLock lock(&mu_); + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + auto it = queue->second.requests.find(send_pos); + // Only dispatch tasks if the submitted task is still queued. The task + // may have been dequeued if the actor has since failed. 
+ if (it != queue->second.requests.end()) { + if (status.ok()) { + it->second.second = true; + SendPendingTasks(actor_id); + } else { + auto task_id = it->second.first.TaskId(); + queue->second.requests.erase(it); + task_finisher_.PendingTaskFailed( + task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status); + } + } + }); + } else { + // Do not hold the lock while calling into task_finisher_. + task_finisher_.MarkTaskCanceled(task_id); + std::shared_ptr creation_task_exception = nullptr; + { + absl::MutexLock lock(&mu_); + auto queue = client_queues_.find(task_spec.ActorId()); + creation_task_exception = queue->second.creation_task_exception; + } + auto status = Status::IOError("cancelling task of dead actor"); + // No need to increment the number of completed tasks since the actor is + // dead. + RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, + &status, creation_task_exception)); + } + + // If the task submission subsequently fails, then the client will receive + // the error in a callback. + return Status::OK(); +} + +void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) { + queue.rpc_client = nullptr; + core_worker_client_pool_.Disconnect(WorkerID::FromBinary(queue.worker_id)); + queue.worker_id.clear(); + queue.pending_force_kill.reset(); +} + +void CoreWorkerDirectActorTaskSubmitter::FailInflightTasks( + const std::unordered_map> + &inflight_task_callbacks) { + // NOTE(kfstorm): We invoke the callbacks with a bad status to act like there's a + // network issue. We don't call `task_finisher_.PendingTaskFailed` directly because + // there's much more work to do in the callback. + auto status = Status::IOError("Fail all inflight tasks due to actor state change."); + rpc::PushTaskReply reply; + for (const auto &entry : inflight_task_callbacks) { + entry.second(status, reply); + } +} + +void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, + const rpc::Address &address, + int64_t num_restarts) { + RAY_LOG(DEBUG) << "Connecting to actor " << actor_id << " at worker " + << WorkerID::FromBinary(address.worker_id()); + + std::unordered_map> + inflight_task_callbacks; + + { + absl::MutexLock lock(&mu_); + + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + if (num_restarts < queue->second.num_restarts) { + // This message is about an old version of the actor and the actor has + // already restarted since then. Skip the connection. + RAY_LOG(INFO) << "Skip actor connection that has already been restarted, actor_id=" + << actor_id; + return; + } + + if (queue->second.rpc_client && + queue->second.rpc_client->Addr().ip_address() == address.ip_address() && + queue->second.rpc_client->Addr().port() == address.port()) { + RAY_LOG(DEBUG) << "Skip actor that has already been connected, actor_id=" + << actor_id; + return; + } + + if (queue->second.state == rpc::ActorTableData::DEAD) { + // This message is about an old version of the actor and the actor has + // already died since then. Skip the connection. + return; + } + + queue->second.num_restarts = num_restarts; + if (queue->second.rpc_client) { + // Clear the client to the old version of the actor. + DisconnectRpcClient(queue->second); + inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks); + queue->second.inflight_task_callbacks.clear(); + } + + queue->second.state = rpc::ActorTableData::ALIVE; + // Update the mapping so new RPCs go out with the right intended worker id. 
+ queue->second.worker_id = address.worker_id(); + // Create a new connection to the actor. + queue->second.rpc_client = core_worker_client_pool_.GetOrConnect(address); + // This assumes that all replies from the previous incarnation + // of the actor have been received. This assumption should be OK + // because we fail all inflight tasks in `DisconnectRpcClient`. + RAY_LOG(DEBUG) << "Resetting caller starts at for actor " << actor_id << " from " + << queue->second.caller_starts_at << " to " + << queue->second.next_task_reply_position; + queue->second.caller_starts_at = queue->second.next_task_reply_position; + + RAY_LOG(INFO) << "Connecting to actor " << actor_id << " at worker " + << WorkerID::FromBinary(address.worker_id()); + ResendOutOfOrderTasks(actor_id); + SendPendingTasks(actor_id); + } + + // NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks. + FailInflightTasks(inflight_task_callbacks); +} + +void CoreWorkerDirectActorTaskSubmitter::DisconnectActor( + const ActorID &actor_id, int64_t num_restarts, bool dead, + const std::shared_ptr &creation_task_exception) { + RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id; + + std::unordered_map> + inflight_task_callbacks; + + { + absl::MutexLock lock(&mu_); + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + if (!dead) { + RAY_CHECK(num_restarts > 0); + } + if (num_restarts <= queue->second.num_restarts && !dead) { + // This message is about an old version of the actor that has already been + // restarted successfully. Skip the message handling. + RAY_LOG(INFO) + << "Skip actor disconnection that has already been restarted, actor_id=" + << actor_id; + return; + } + + // The actor failed, so erase the client for now. Either the actor is + // permanently dead or the new client will be inserted once the actor is + // restarted. + DisconnectRpcClient(queue->second); + inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks); + queue->second.inflight_task_callbacks.clear(); + + if (dead) { + queue->second.state = rpc::ActorTableData::DEAD; + queue->second.creation_task_exception = creation_task_exception; + // If there are pending requests, treat the pending tasks as failed. + RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id + << " because the actor is already dead."; + auto &requests = queue->second.requests; + auto head = requests.begin(); + + auto status = Status::IOError("cancelling all pending tasks of dead actor"); + while (head != requests.end()) { + const auto &task_spec = head->second.first; + task_finisher_.MarkTaskCanceled(task_spec.TaskId()); + // No need to increment the number of completed tasks since the actor is + // dead. + RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_spec.TaskId(), + rpc::ErrorType::ACTOR_DIED, &status, + creation_task_exception)); + head = requests.erase(head); + } + + auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks; + + RAY_LOG(INFO) << "Failing tasks waiting for death info, size=" + << wait_for_death_info_tasks.size() << ", actor_id=" << actor_id; + for (auto &net_err_task : wait_for_death_info_tasks) { + RAY_UNUSED(task_finisher_.MarkPendingTaskFailed( + net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception)); + } + + // No need to clean up tasks that have been sent and are waiting for + // replies. They will be treated as failed once the connection dies. 
+ // We retain the sequencing information so that we can properly fail + // any tasks submitted after the actor death. + } else if (queue->second.state != rpc::ActorTableData::DEAD) { + // Only update the actor's state if it is not permanently dead. The actor + // will eventually get restarted or marked as permanently dead. + queue->second.state = rpc::ActorTableData::RESTARTING; + queue->second.num_restarts = num_restarts; + } + } + + // NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks. + FailInflightTasks(inflight_task_callbacks); +} + +void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() { + absl::MutexLock lock(&mu_); + for (auto &queue_pair : client_queues_) { + auto &queue = queue_pair.second; + auto deque_itr = queue.wait_for_death_info_tasks.begin(); + while (deque_itr != queue.wait_for_death_info_tasks.end() && + /*timeout timestamp*/ deque_itr->first < current_time_ms()) { + auto task_spec = deque_itr->second; + task_finisher_.MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED); + deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr); + } + } +} + +void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { + auto it = client_queues_.find(actor_id); + RAY_CHECK(it != client_queues_.end()); + if (!it->second.rpc_client) { + return; + } + auto &client_queue = it->second; + + // Check if there is a pending force kill. If there is, send it and disconnect the + // client. + if (client_queue.pending_force_kill) { + RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id; + // It's okay if this fails because this means the worker is already dead. + client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr); + client_queue.pending_force_kill.reset(); + } + + // Submit all pending requests. + auto &requests = client_queue.requests; + auto head = requests.begin(); + while (head != requests.end() && + (/*seqno*/ head->first <= client_queue.next_send_position) && + (/*dependencies_resolved*/ head->second.second)) { + // If the task has been sent before, skip the other tasks in the send + // queue. + bool skip_queue = head->first < client_queue.next_send_position; + auto task_spec = std::move(head->second.first); + head = requests.erase(head); + + RAY_CHECK(!client_queue.worker_id.empty()); + PushActorTask(client_queue, task_spec, skip_queue); + client_queue.next_send_position++; + } +} + +void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &actor_id) { + auto it = client_queues_.find(actor_id); + RAY_CHECK(it != client_queues_.end()); + if (!it->second.rpc_client) { + return; + } + auto &client_queue = it->second; + RAY_CHECK(!client_queue.worker_id.empty()); + + for (const auto &completed_task : client_queue.out_of_order_completed_tasks) { + // Making a copy here because we are flipping a flag and the original value is + // const. + auto task_spec = completed_task.second; + task_spec.GetMutableMessage().set_skip_execution(true); + PushActorTask(client_queue, task_spec, /*skip_queue=*/true); + } + client_queue.out_of_order_completed_tasks.clear(); +} + +void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue, + const TaskSpecification &task_spec, + bool skip_queue) { + auto request = std::make_unique(); + // NOTE(swang): CopyFrom is needed because if we use Swap here and the task + // fails, then the task data will be gone when the TaskManager attempts to + // access the task. 
+ request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); + + request->set_intended_worker_id(queue.worker_id); + RAY_CHECK(task_spec.ActorCounter() >= queue.caller_starts_at) + << "actor counter " << task_spec.ActorCounter() << " " << queue.caller_starts_at; + request->set_sequence_number(task_spec.ActorCounter() - queue.caller_starts_at); + + const auto task_id = task_spec.TaskId(); + const auto actor_id = task_spec.ActorId(); + const auto actor_counter = task_spec.ActorCounter(); + const auto task_skipped = task_spec.GetMessage().skip_execution(); + const auto num_queued = + request->sequence_number() - queue.rpc_client->ClientProcessedUpToSeqno(); + RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id + << " actor counter " << actor_counter << " seq no " + << request->sequence_number() << " num queued " << num_queued; + if (num_queued >= next_queueing_warn_threshold_) { + // TODO(ekl) add more debug info about the actor name, etc. + warn_excess_queueing_(actor_id, num_queued); + next_queueing_warn_threshold_ *= 2; + } + + rpc::Address addr(queue.rpc_client->Addr()); + rpc::ClientCallback reply_callback = + [this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped]( + const Status &status, const rpc::PushTaskReply &reply) { + bool increment_completed_tasks = true; + + if (task_skipped) { + // NOTE(simon):Increment the task counter regardless of the status because the + // reply for a previously completed task. We are not calling CompletePendingTask + // because the tasks are pushed directly to the actor, not placed on any queues + // in task_finisher_. + } else if (status.ok()) { + task_finisher_.CompletePendingTask(task_id, reply, addr); + } else { + // push task failed due to network error. For example, actor is dead + // and no process response for the push task. + absl::MutexLock lock(&mu_); + auto queue_pair = client_queues_.find(actor_id); + RAY_CHECK(queue_pair != client_queues_.end()); + auto &queue = queue_pair->second; + + bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD); + bool will_retry = task_finisher_.PendingTaskFailed( + task_id, rpc::ErrorType::ACTOR_DIED, &status, queue.creation_task_exception, + immediately_mark_object_fail); + if (will_retry) { + increment_completed_tasks = false; + } else if (!immediately_mark_object_fail) { + // put it to wait_for_death_info_tasks and wait for Death info + int64_t death_info_timeout_ts = + current_time_ms() + + RayConfig::instance().timeout_ms_task_wait_for_death_info(); + queue.wait_for_death_info_tasks.emplace_back(death_info_timeout_ts, + task_spec); + RAY_LOG(INFO) + << "PushActorTask failed because of network error, this task " + "will be stashed away and waiting for Death info from GCS, task_id=" + << task_spec.TaskId() + << ", wait queue size=" << queue.wait_for_death_info_tasks.size(); + } + } + + if (increment_completed_tasks) { + absl::MutexLock lock(&mu_); + auto queue_pair = client_queues_.find(actor_id); + RAY_CHECK(queue_pair != client_queues_.end()); + auto &queue = queue_pair->second; + + // Try to increment queue.next_task_reply_position consecutively until we + // cannot. In the case of tasks not received in order, the following block + // ensure queue.next_task_reply_position are incremented to the max possible + // value. 
+ queue.out_of_order_completed_tasks.insert({actor_counter, task_spec}); + auto min_completed_task = queue.out_of_order_completed_tasks.begin(); + while (min_completed_task != queue.out_of_order_completed_tasks.end()) { + if (min_completed_task->first == queue.next_task_reply_position) { + queue.next_task_reply_position++; + // increment the iterator and erase the old value + queue.out_of_order_completed_tasks.erase(min_completed_task++); + } else { + break; + } + } + + RAY_LOG(DEBUG) << "Got PushTaskReply for actor " << actor_id + << " with actor_counter " << actor_counter + << " new queue.next_task_reply_position is " + << queue.next_task_reply_position + << " and size of out_of_order_tasks set is " + << queue.out_of_order_completed_tasks.size(); + } + }; + + queue.inflight_task_callbacks.emplace(task_id, std::move(reply_callback)); + rpc::ClientCallback wrapped_callback = + [this, task_id, actor_id](const Status &status, const rpc::PushTaskReply &reply) { + rpc::ClientCallback reply_callback; + { + absl::MutexLock lock(&mu_); + auto it = client_queues_.find(actor_id); + RAY_CHECK(it != client_queues_.end()); + auto &queue = it->second; + auto callback_it = queue.inflight_task_callbacks.find(task_id); + if (callback_it == queue.inflight_task_callbacks.end()) { + RAY_LOG(DEBUG) << "The task " << task_id + << " has already been marked as failed. Ingore the reply."; + return; + } + reply_callback = std::move(callback_it->second); + queue.inflight_task_callbacks.erase(callback_it); + } + reply_callback(status, reply); + }; + + queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback); +} + +bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { + absl::MutexLock lock(&mu_); + + auto iter = client_queues_.find(actor_id); + return (iter != client_queues_.end() && iter->second.rpc_client); +} + +} // namespace core +} // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h new file mode 100644 index 000000000..6c9dfa157 --- /dev/null +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h @@ -0,0 +1,290 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "absl/synchronization/mutex.h" +#include "ray/common/id.h" +#include "ray/common/ray_object.h" +#include "ray/core_worker/actor_creator.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/store_provider/memory_store/memory_store.h" +#include "ray/core_worker/transport/dependency_resolver.h" +#include "ray/rpc/worker/core_worker_client.h" + +namespace ray { +namespace core { + +/// In direct actor call task submitter and receiver, a task is directly submitted +/// to the actor that will execute it. 
+ +// Interface for testing. +class CoreWorkerDirectActorTaskSubmitterInterface { + public: + virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0; + virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address, + int64_t num_restarts) = 0; + virtual void DisconnectActor( + const ActorID &actor_id, int64_t num_restarts, bool dead, + const std::shared_ptr &creation_task_exception = nullptr) = 0; + virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0; + + virtual void CheckTimeoutTasks() = 0; + + virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {} +}; + +// This class is thread-safe. +class CoreWorkerDirectActorTaskSubmitter + : public CoreWorkerDirectActorTaskSubmitterInterface { + public: + CoreWorkerDirectActorTaskSubmitter( + rpc::CoreWorkerClientPool &core_worker_client_pool, CoreWorkerMemoryStore &store, + TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator, + std::function warn_excess_queueing) + : core_worker_client_pool_(core_worker_client_pool), + resolver_(store, task_finisher, actor_creator), + task_finisher_(task_finisher), + warn_excess_queueing_(warn_excess_queueing) { + next_queueing_warn_threshold_ = + ::RayConfig::instance().actor_excess_queueing_warn_threshold(); + } + + /// Add an actor queue. This should be called whenever a reference to an + /// actor is created in the language frontend. + /// TODO(swang): Remove the actor queue once it is sure that this worker will + /// not receive another reference to the same actor. + /// + /// \param[in] actor_id The actor for whom to add a queue. + void AddActorQueueIfNotExists(const ActorID &actor_id); + + /// Submit a task to an actor for execution. + /// + /// \param[in] task The task spec to submit. + /// \return Status::Invalid if the task is not yet supported. + Status SubmitTask(TaskSpecification task_spec); + + /// Tell this actor to exit immediately. + /// + /// \param[in] actor_id The actor_id of the actor to kill. + /// \param[in] force_kill Whether to force kill the actor, or let the actor + /// try a clean exit. + /// \param[in] no_restart If set to true, the killed actor will not be + /// restarted anymore. + void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart); + + /// Create connection to actor and send all pending tasks. + /// + /// \param[in] actor_id Actor ID. + /// \param[in] address The new address of the actor. + /// \param[in] num_restarts How many times this actor has been restarted + /// before. If we've already seen a later incarnation of the actor, we will + /// ignore the command to connect. + void ConnectActor(const ActorID &actor_id, const rpc::Address &address, + int64_t num_restarts); + + /// Disconnect from a failed actor. + /// + /// \param[in] actor_id Actor ID. + /// \param[in] num_restarts How many times this actor has been restarted + /// before. If we've already seen a later incarnation of the actor, we will + /// ignore the command to connect. + /// \param[in] dead Whether the actor is permanently dead. In this case, all + /// pending tasks for the actor should be failed. + /// \param[in] creation_task_exception Reason why the actor is dead, only applies when + /// dead = true. If this arg is set, it means this actor died because of an exception + /// thrown in creation task. + void DisconnectActor( + const ActorID &actor_id, int64_t num_restarts, bool dead, + const std::shared_ptr &creation_task_exception = nullptr); + + /// Set the timerstamp for the caller. 
+ void SetCallerCreationTimestamp(int64_t timestamp); + + /// Check timeout tasks that are waiting for Death info. + void CheckTimeoutTasks(); + + private: + struct ClientQueue { + /// The current state of the actor. If this is ALIVE, then we should have + /// an RPC client to the actor. If this is DEAD, then all tasks in the + /// queue will be marked failed and all other ClientQueue state is ignored. + rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY; + /// Only applies when state=DEAD. + std::shared_ptr creation_task_exception = nullptr; + /// How many times this actor has been restarted before. Starts at -1 to + /// indicate that the actor is not yet created. This is used to drop stale + /// messages from the GCS. + int64_t num_restarts = -1; + /// The RPC client. We use shared_ptr to enable shared_from_this for + /// pending client callbacks. + std::shared_ptr rpc_client = nullptr; + /// The intended worker ID of the actor. + std::string worker_id = ""; + + /// The actor's pending requests, ordered by the task number (see below + /// diagram) in the request. The bool indicates whether the dependencies + /// for that task have been resolved yet. A task will be sent after its + /// dependencies have been resolved and its task number matches + /// next_send_position. + std::map> requests; + + /// Diagram of the sequence numbers assigned to actor tasks during actor + /// crash and restart: + /// + /// The actor starts, and 10 tasks are submitted. We have sent 6 tasks + /// (0-5) so far, and have received a successful reply for 4 tasks (0-3). + /// 0 1 2 3 4 5 6 7 8 9 + /// ^ next_send_position + /// ^ next_task_reply_position + /// ^ caller_starts_at + /// + /// Suppose the actor crashes and recovers. Then, caller_starts_at is reset + /// to the current next_task_reply_position. caller_starts_at is then subtracted + /// from each task's counter, so the recovered actor will receive the + /// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively. + /// Therefore, the recovered actor will restart execution from task 4. + /// 0 1 2 3 4 5 6 7 8 9 + /// ^ next_send_position + /// ^ next_task_reply_position + /// ^ caller_starts_at + /// + /// New actor tasks will continue to be sent even while tasks are being + /// resubmitted, but the receiving worker will only execute them after the + /// resent tasks. For example, this diagram shows what happens if task 6 is + /// sent for the first time, tasks 4 and 5 have been resent, and we have + /// received a successful reply for task 4. + /// 0 1 2 3 4 5 6 7 8 9 + /// ^ next_send_position + /// ^ next_task_reply_position + /// ^ caller_starts_at + /// + /// The send position of the next task to send to this actor. This sequence + /// number increases monotonically. + uint64_t next_send_position = 0; + /// The offset at which the the actor should start its counter for this + /// caller. This is used for actors that can be restarted, so that the new + /// instance of the actor knows from which task to start executing. + uint64_t caller_starts_at = 0; + /// Out of the tasks sent by this worker to the actor, the number of tasks + /// that we will never send to the actor again. This is used to reset + /// caller_starts_at if the actor dies and is restarted. We only include + /// tasks that will not be sent again, to support automatic task retry on + /// actor failure. This value only tracks consecutive tasks that are completed. + /// Tasks completed out of order will be cached in out_of_completed_tasks first. 
+ uint64_t next_task_reply_position = 0; + + /// The temporary container for tasks completed out of order. It can happen in + /// async or threaded actor mode. This map is used to store the seqno and task + /// spec for (1) increment next_task_reply_position later when the in order tasks are + /// returned (2) resend the tasks to restarted actor so retried tasks can maintain + /// ordering. + // NOTE(simon): consider absl::btree_set for performance, but it requires updating + // abseil. + std::map out_of_order_completed_tasks; + + /// Tasks that can't be sent because 1) the callee actor is dead. 2) network error. + /// For 1) the task will wait for the DEAD state notification, then mark task as + /// failed using the death_info in notification. For 2) we'll never receive a DEAD + /// notification, in this case we'll wait for a fixed timeout value and then mark it + /// as failed. + /// pair key: timestamp in ms when this task should be considered as timeout. + /// pair value: task specification + std::deque> wait_for_death_info_tasks; + + /// A force-kill request that should be sent to the actor once an RPC + /// client to the actor is available. + absl::optional pending_force_kill; + + /// Stores all callbacks of inflight tasks. Note that this doesn't include tasks + /// without replies. + std::unordered_map> + inflight_task_callbacks; + }; + + /// Push a task to a remote actor via the given client. + /// Note, this function doesn't return any error status code. If an error occurs while + /// sending the request, this task will be treated as failed. + /// + /// \param[in] queue The actor queue. Contains the RPC client state. + /// \param[in] task_spec The task to send. + /// \param[in] skip_queue Whether to skip the task queue. This will send the + /// task for execution immediately. + /// \return Void. + void PushActorTask(ClientQueue &queue, const TaskSpecification &task_spec, + bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Send all pending tasks for an actor. + /// + /// \param[in] actor_id Actor ID. + /// \return Void. + void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Resend all previously-received, out-of-order, received tasks for an actor. + /// When sending these tasks, the tasks will have the flag skip_execution=true. + /// + /// \param[in] actor_id Actor ID. + /// \return Void. + void ResendOutOfOrderTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Disconnect the RPC client for an actor. + void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Fail all in-flight tasks. + void FailInflightTasks( + const std::unordered_map> + &inflight_task_callbacks) LOCKS_EXCLUDED(mu_); + + /// Whether the specified actor is alive. + /// + /// \param[in] actor_id The actor ID. + /// \return Whether this actor is alive. + bool IsActorAlive(const ActorID &actor_id) const; + + /// Pool for producing new core worker clients. + rpc::CoreWorkerClientPool &core_worker_client_pool_; + + /// Mutex to protect the various maps below. + mutable absl::Mutex mu_; + + absl::flat_hash_map client_queues_ GUARDED_BY(mu_); + + /// Resolve direct call object dependencies. + LocalDependencyResolver resolver_; + + /// Used to complete tasks. + TaskFinisherInterface &task_finisher_; + + /// Used to warn of excessive queueing. + std::function warn_excess_queueing_; + + /// Warn the next time the number of queued task submissions to an actor + /// exceeds this quantity. This threshold is doubled each time it is hit. 
+ int64_t next_queueing_warn_threshold_; + + friend class CoreWorkerTest; +}; + +} // namespace core +} // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index d08588472..cbd29d454 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -23,475 +23,6 @@ using ray::rpc::ActorTableData; namespace ray { namespace core { -void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists( - const ActorID &actor_id) { - absl::MutexLock lock(&mu_); - // No need to check whether the insert was successful, since it is possible - // for this worker to have multiple references to the same actor. - client_queues_.emplace(actor_id, ClientQueue()); -} - -void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id, - bool force_kill, bool no_restart) { - absl::MutexLock lock(&mu_); - rpc::KillActorRequest request; - request.set_intended_actor_id(actor_id.Binary()); - request.set_force_kill(force_kill); - request.set_no_restart(no_restart); - - auto it = client_queues_.find(actor_id); - // The language frontend can only kill actors that it has a reference to. - RAY_CHECK(it != client_queues_.end()); - - if (!it->second.pending_force_kill) { - it->second.pending_force_kill = request; - } else if (force_kill) { - // Overwrite the previous request to kill the actor if the new request is a - // force kill. - it->second.pending_force_kill->set_force_kill(true); - if (no_restart) { - // Overwrite the previous request to disable restart if the new request's - // no_restart flag is set to true. - it->second.pending_force_kill->set_no_restart(true); - } - } - - SendPendingTasks(actor_id); -} - -Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { - auto task_id = task_spec.TaskId(); - auto actor_id = task_spec.ActorId(); - RAY_LOG(DEBUG) << "Submitting task " << task_id; - RAY_CHECK(task_spec.IsActorTask()); - - bool task_queued = false; - uint64_t send_pos = 0; - { - absl::MutexLock lock(&mu_); - auto queue = client_queues_.find(actor_id); - RAY_CHECK(queue != client_queues_.end()); - if (queue->second.state != rpc::ActorTableData::DEAD) { - // We must fix the send order prior to resolving dependencies, which may - // complete out of order. This ensures that we will not deadlock due to - // backpressure. The receiving actor will execute the tasks according to - // this sequence number. - send_pos = task_spec.ActorCounter(); - auto inserted = - queue->second.requests.emplace(send_pos, std::make_pair(task_spec, false)); - RAY_CHECK(inserted.second); - task_queued = true; - } - } - - if (task_queued) { - // We must release the lock before resolving the task dependencies since - // the callback may get called in the same call stack. - resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id](Status status) { - absl::MutexLock lock(&mu_); - auto queue = client_queues_.find(actor_id); - RAY_CHECK(queue != client_queues_.end()); - auto it = queue->second.requests.find(send_pos); - // Only dispatch tasks if the submitted task is still queued. The task - // may have been dequeued if the actor has since failed. 
- if (it != queue->second.requests.end()) { - if (status.ok()) { - it->second.second = true; - SendPendingTasks(actor_id); - } else { - auto task_id = it->second.first.TaskId(); - queue->second.requests.erase(it); - task_finisher_.PendingTaskFailed( - task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status); - } - } - }); - } else { - // Do not hold the lock while calling into task_finisher_. - task_finisher_.MarkTaskCanceled(task_id); - std::shared_ptr creation_task_exception = nullptr; - { - absl::MutexLock lock(&mu_); - auto queue = client_queues_.find(task_spec.ActorId()); - creation_task_exception = queue->second.creation_task_exception; - } - auto status = Status::IOError("cancelling task of dead actor"); - // No need to increment the number of completed tasks since the actor is - // dead. - RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, - &status, creation_task_exception)); - } - - // If the task submission subsequently fails, then the client will receive - // the error in a callback. - return Status::OK(); -} - -void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) { - queue.rpc_client = nullptr; - core_worker_client_pool_.Disconnect(WorkerID::FromBinary(queue.worker_id)); - queue.worker_id.clear(); - queue.pending_force_kill.reset(); -} - -void CoreWorkerDirectActorTaskSubmitter::FailInflightTasks( - const std::unordered_map> - &inflight_task_callbacks) { - // NOTE(kfstorm): We invoke the callbacks with a bad status to act like there's a - // network issue. We don't call `task_finisher_.PendingTaskFailed` directly because - // there's much more work to do in the callback. - auto status = Status::IOError("Fail all inflight tasks due to actor state change."); - rpc::PushTaskReply reply; - for (const auto &entry : inflight_task_callbacks) { - entry.second(status, reply); - } -} - -void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, - const rpc::Address &address, - int64_t num_restarts) { - RAY_LOG(DEBUG) << "Connecting to actor " << actor_id << " at worker " - << WorkerID::FromBinary(address.worker_id()); - - std::unordered_map> - inflight_task_callbacks; - - { - absl::MutexLock lock(&mu_); - - auto queue = client_queues_.find(actor_id); - RAY_CHECK(queue != client_queues_.end()); - if (num_restarts < queue->second.num_restarts) { - // This message is about an old version of the actor and the actor has - // already restarted since then. Skip the connection. - RAY_LOG(INFO) << "Skip actor connection that has already been restarted, actor_id=" - << actor_id; - return; - } - - if (queue->second.rpc_client && - queue->second.rpc_client->Addr().ip_address() == address.ip_address() && - queue->second.rpc_client->Addr().port() == address.port()) { - RAY_LOG(DEBUG) << "Skip actor that has already been connected, actor_id=" - << actor_id; - return; - } - - if (queue->second.state == rpc::ActorTableData::DEAD) { - // This message is about an old version of the actor and the actor has - // already died since then. Skip the connection. - return; - } - - queue->second.num_restarts = num_restarts; - if (queue->second.rpc_client) { - // Clear the client to the old version of the actor. - DisconnectRpcClient(queue->second); - inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks); - queue->second.inflight_task_callbacks.clear(); - } - - queue->second.state = rpc::ActorTableData::ALIVE; - // Update the mapping so new RPCs go out with the right intended worker id. 
- queue->second.worker_id = address.worker_id(); - // Create a new connection to the actor. - queue->second.rpc_client = core_worker_client_pool_.GetOrConnect(address); - // This assumes that all replies from the previous incarnation - // of the actor have been received. This assumption should be OK - // because we fail all inflight tasks in `DisconnectRpcClient`. - RAY_LOG(DEBUG) << "Resetting caller starts at for actor " << actor_id << " from " - << queue->second.caller_starts_at << " to " - << queue->second.next_task_reply_position; - queue->second.caller_starts_at = queue->second.next_task_reply_position; - - RAY_LOG(INFO) << "Connecting to actor " << actor_id << " at worker " - << WorkerID::FromBinary(address.worker_id()); - ResendOutOfOrderTasks(actor_id); - SendPendingTasks(actor_id); - } - - // NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks. - FailInflightTasks(inflight_task_callbacks); -} - -void CoreWorkerDirectActorTaskSubmitter::DisconnectActor( - const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception) { - RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id; - - std::unordered_map> - inflight_task_callbacks; - - { - absl::MutexLock lock(&mu_); - auto queue = client_queues_.find(actor_id); - RAY_CHECK(queue != client_queues_.end()); - if (!dead) { - RAY_CHECK(num_restarts > 0); - } - if (num_restarts <= queue->second.num_restarts && !dead) { - // This message is about an old version of the actor that has already been - // restarted successfully. Skip the message handling. - RAY_LOG(INFO) - << "Skip actor disconnection that has already been restarted, actor_id=" - << actor_id; - return; - } - - // The actor failed, so erase the client for now. Either the actor is - // permanently dead or the new client will be inserted once the actor is - // restarted. - DisconnectRpcClient(queue->second); - inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks); - queue->second.inflight_task_callbacks.clear(); - - if (dead) { - queue->second.state = rpc::ActorTableData::DEAD; - queue->second.creation_task_exception = creation_task_exception; - // If there are pending requests, treat the pending tasks as failed. - RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id - << " because the actor is already dead."; - auto &requests = queue->second.requests; - auto head = requests.begin(); - - auto status = Status::IOError("cancelling all pending tasks of dead actor"); - while (head != requests.end()) { - const auto &task_spec = head->second.first; - task_finisher_.MarkTaskCanceled(task_spec.TaskId()); - // No need to increment the number of completed tasks since the actor is - // dead. - RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_spec.TaskId(), - rpc::ErrorType::ACTOR_DIED, &status, - creation_task_exception)); - head = requests.erase(head); - } - - auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks; - - RAY_LOG(INFO) << "Failing tasks waiting for death info, size=" - << wait_for_death_info_tasks.size() << ", actor_id=" << actor_id; - for (auto &net_err_task : wait_for_death_info_tasks) { - RAY_UNUSED(task_finisher_.MarkPendingTaskFailed( - net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception)); - } - - // No need to clean up tasks that have been sent and are waiting for - // replies. They will be treated as failed once the connection dies. 
- // We retain the sequencing information so that we can properly fail - // any tasks submitted after the actor death. - } else if (queue->second.state != rpc::ActorTableData::DEAD) { - // Only update the actor's state if it is not permanently dead. The actor - // will eventually get restarted or marked as permanently dead. - queue->second.state = rpc::ActorTableData::RESTARTING; - queue->second.num_restarts = num_restarts; - } - } - - // NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks. - FailInflightTasks(inflight_task_callbacks); -} - -void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() { - absl::MutexLock lock(&mu_); - for (auto &queue_pair : client_queues_) { - auto &queue = queue_pair.second; - auto deque_itr = queue.wait_for_death_info_tasks.begin(); - while (deque_itr != queue.wait_for_death_info_tasks.end() && - /*timeout timestamp*/ deque_itr->first < current_time_ms()) { - auto task_spec = deque_itr->second; - task_finisher_.MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED); - deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr); - } - } -} - -void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { - auto it = client_queues_.find(actor_id); - RAY_CHECK(it != client_queues_.end()); - if (!it->second.rpc_client) { - return; - } - auto &client_queue = it->second; - - // Check if there is a pending force kill. If there is, send it and disconnect the - // client. - if (client_queue.pending_force_kill) { - RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id; - // It's okay if this fails because this means the worker is already dead. - client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr); - client_queue.pending_force_kill.reset(); - } - - // Submit all pending requests. - auto &requests = client_queue.requests; - auto head = requests.begin(); - while (head != requests.end() && - (/*seqno*/ head->first <= client_queue.next_send_position) && - (/*dependencies_resolved*/ head->second.second)) { - // If the task has been sent before, skip the other tasks in the send - // queue. - bool skip_queue = head->first < client_queue.next_send_position; - auto task_spec = std::move(head->second.first); - head = requests.erase(head); - - RAY_CHECK(!client_queue.worker_id.empty()); - PushActorTask(client_queue, task_spec, skip_queue); - client_queue.next_send_position++; - } -} - -void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &actor_id) { - auto it = client_queues_.find(actor_id); - RAY_CHECK(it != client_queues_.end()); - if (!it->second.rpc_client) { - return; - } - auto &client_queue = it->second; - RAY_CHECK(!client_queue.worker_id.empty()); - - for (const auto &completed_task : client_queue.out_of_order_completed_tasks) { - // Making a copy here because we are flipping a flag and the original value is - // const. - auto task_spec = completed_task.second; - task_spec.GetMutableMessage().set_skip_execution(true); - PushActorTask(client_queue, task_spec, /*skip_queue=*/true); - } - client_queue.out_of_order_completed_tasks.clear(); -} - -void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue, - const TaskSpecification &task_spec, - bool skip_queue) { - auto request = std::make_unique(); - // NOTE(swang): CopyFrom is needed because if we use Swap here and the task - // fails, then the task data will be gone when the TaskManager attempts to - // access the task. 
- request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); - - request->set_intended_worker_id(queue.worker_id); - RAY_CHECK(task_spec.ActorCounter() >= queue.caller_starts_at) - << "actor counter " << task_spec.ActorCounter() << " " << queue.caller_starts_at; - request->set_sequence_number(task_spec.ActorCounter() - queue.caller_starts_at); - - const auto task_id = task_spec.TaskId(); - const auto actor_id = task_spec.ActorId(); - const auto actor_counter = task_spec.ActorCounter(); - const auto task_skipped = task_spec.GetMessage().skip_execution(); - const auto num_queued = - request->sequence_number() - queue.rpc_client->ClientProcessedUpToSeqno(); - RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id - << " actor counter " << actor_counter << " seq no " - << request->sequence_number() << " num queued " << num_queued; - if (num_queued >= next_queueing_warn_threshold_) { - // TODO(ekl) add more debug info about the actor name, etc. - warn_excess_queueing_(actor_id, num_queued); - next_queueing_warn_threshold_ *= 2; - } - - rpc::Address addr(queue.rpc_client->Addr()); - rpc::ClientCallback reply_callback = - [this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped]( - const Status &status, const rpc::PushTaskReply &reply) { - bool increment_completed_tasks = true; - - if (task_skipped) { - // NOTE(simon):Increment the task counter regardless of the status because the - // reply for a previously completed task. We are not calling CompletePendingTask - // because the tasks are pushed directly to the actor, not placed on any queues - // in task_finisher_. - } else if (status.ok()) { - task_finisher_.CompletePendingTask(task_id, reply, addr); - } else { - // push task failed due to network error. For example, actor is dead - // and no process response for the push task. - absl::MutexLock lock(&mu_); - auto queue_pair = client_queues_.find(actor_id); - RAY_CHECK(queue_pair != client_queues_.end()); - auto &queue = queue_pair->second; - - bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD); - bool will_retry = task_finisher_.PendingTaskFailed( - task_id, rpc::ErrorType::ACTOR_DIED, &status, queue.creation_task_exception, - immediately_mark_object_fail); - if (will_retry) { - increment_completed_tasks = false; - } else if (!immediately_mark_object_fail) { - // put it to wait_for_death_info_tasks and wait for Death info - int64_t death_info_timeout_ts = - current_time_ms() + - RayConfig::instance().timeout_ms_task_wait_for_death_info(); - queue.wait_for_death_info_tasks.emplace_back(death_info_timeout_ts, - task_spec); - RAY_LOG(INFO) - << "PushActorTask failed because of network error, this task " - "will be stashed away and waiting for Death info from GCS, task_id=" - << task_spec.TaskId() - << ", wait queue size=" << queue.wait_for_death_info_tasks.size(); - } - } - - if (increment_completed_tasks) { - absl::MutexLock lock(&mu_); - auto queue_pair = client_queues_.find(actor_id); - RAY_CHECK(queue_pair != client_queues_.end()); - auto &queue = queue_pair->second; - - // Try to increment queue.next_task_reply_position consecutively until we - // cannot. In the case of tasks not received in order, the following block - // ensure queue.next_task_reply_position are incremented to the max possible - // value. 
- queue.out_of_order_completed_tasks.insert({actor_counter, task_spec}); - auto min_completed_task = queue.out_of_order_completed_tasks.begin(); - while (min_completed_task != queue.out_of_order_completed_tasks.end()) { - if (min_completed_task->first == queue.next_task_reply_position) { - queue.next_task_reply_position++; - // increment the iterator and erase the old value - queue.out_of_order_completed_tasks.erase(min_completed_task++); - } else { - break; - } - } - - RAY_LOG(DEBUG) << "Got PushTaskReply for actor " << actor_id - << " with actor_counter " << actor_counter - << " new queue.next_task_reply_position is " - << queue.next_task_reply_position - << " and size of out_of_order_tasks set is " - << queue.out_of_order_completed_tasks.size(); - } - }; - - queue.inflight_task_callbacks.emplace(task_id, std::move(reply_callback)); - rpc::ClientCallback wrapped_callback = - [this, task_id, actor_id](const Status &status, const rpc::PushTaskReply &reply) { - rpc::ClientCallback reply_callback; - { - absl::MutexLock lock(&mu_); - auto it = client_queues_.find(actor_id); - RAY_CHECK(it != client_queues_.end()); - auto &queue = it->second; - auto callback_it = queue.inflight_task_callbacks.find(task_id); - if (callback_it == queue.inflight_task_callbacks.end()) { - RAY_LOG(DEBUG) << "The task " << task_id - << " has already been marked as failed. Ingore the reply."; - return; - } - reply_callback = std::move(callback_it->second); - queue.inflight_task_callbacks.erase(callback_it); - } - reply_callback(status, reply); - }; - - queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback); -} - -bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { - absl::MutexLock lock(&mu_); - - auto iter = client_queues_.find(actor_id); - return (iter != client_queues_.end() && iter->second.rpc_client); -} - void CoreWorkerDirectTaskReceiver::Init( std::shared_ptr client_pool, rpc::Address rpc_address, std::shared_ptr dependency_waiter) { diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 162e9dd52..8f7f21dcc 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -34,6 +34,7 @@ #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/core_worker/task_manager.h" #include "ray/core_worker/transport/dependency_resolver.h" +#include "ray/core_worker/transport/direct_actor_task_submitter.h" #include "ray/rpc/grpc_server.h" #include "ray/rpc/worker/core_worker_client.h" @@ -43,256 +44,6 @@ namespace core { /// The max time to wait for out-of-order tasks. const int kMaxReorderWaitSeconds = 30; -/// In direct actor call task submitter and receiver, a task is directly submitted -/// to the actor that will execute it. - -// Interface for testing. -class CoreWorkerDirectActorTaskSubmitterInterface { - public: - virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0; - virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address, - int64_t num_restarts) = 0; - virtual void DisconnectActor( - const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception = nullptr) = 0; - virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0; - - virtual void CheckTimeoutTasks() = 0; - - virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {} -}; - -// This class is thread-safe. 
-class CoreWorkerDirectActorTaskSubmitter - : public CoreWorkerDirectActorTaskSubmitterInterface { - public: - CoreWorkerDirectActorTaskSubmitter( - rpc::CoreWorkerClientPool &core_worker_client_pool, CoreWorkerMemoryStore &store, - TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator, - std::function warn_excess_queueing) - : core_worker_client_pool_(core_worker_client_pool), - resolver_(store, task_finisher, actor_creator), - task_finisher_(task_finisher), - warn_excess_queueing_(warn_excess_queueing) { - next_queueing_warn_threshold_ = - ::RayConfig::instance().actor_excess_queueing_warn_threshold(); - } - - /// Add an actor queue. This should be called whenever a reference to an - /// actor is created in the language frontend. - /// TODO(swang): Remove the actor queue once it is sure that this worker will - /// not receive another reference to the same actor. - /// - /// \param[in] actor_id The actor for whom to add a queue. - void AddActorQueueIfNotExists(const ActorID &actor_id); - - /// Submit a task to an actor for execution. - /// - /// \param[in] task The task spec to submit. - /// \return Status::Invalid if the task is not yet supported. - Status SubmitTask(TaskSpecification task_spec); - - /// Tell this actor to exit immediately. - /// - /// \param[in] actor_id The actor_id of the actor to kill. - /// \param[in] force_kill Whether to force kill the actor, or let the actor - /// try a clean exit. - /// \param[in] no_restart If set to true, the killed actor will not be - /// restarted anymore. - void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart); - - /// Create connection to actor and send all pending tasks. - /// - /// \param[in] actor_id Actor ID. - /// \param[in] address The new address of the actor. - /// \param[in] num_restarts How many times this actor has been restarted - /// before. If we've already seen a later incarnation of the actor, we will - /// ignore the command to connect. - void ConnectActor(const ActorID &actor_id, const rpc::Address &address, - int64_t num_restarts); - - /// Disconnect from a failed actor. - /// - /// \param[in] actor_id Actor ID. - /// \param[in] num_restarts How many times this actor has been restarted - /// before. If we've already seen a later incarnation of the actor, we will - /// ignore the command to connect. - /// \param[in] dead Whether the actor is permanently dead. In this case, all - /// pending tasks for the actor should be failed. - /// \param[in] creation_task_exception Reason why the actor is dead, only applies when - /// dead = true. If this arg is set, it means this actor died because of an exception - /// thrown in creation task. - void DisconnectActor( - const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception = nullptr); - - /// Set the timerstamp for the caller. - void SetCallerCreationTimestamp(int64_t timestamp); - - /// Check timeout tasks that are waiting for Death info. - void CheckTimeoutTasks(); - - private: - struct ClientQueue { - /// The current state of the actor. If this is ALIVE, then we should have - /// an RPC client to the actor. If this is DEAD, then all tasks in the - /// queue will be marked failed and all other ClientQueue state is ignored. - rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY; - /// Only applies when state=DEAD. - std::shared_ptr creation_task_exception = nullptr; - /// How many times this actor has been restarted before. 
- private:
-  struct ClientQueue {
-    /// The current state of the actor. If this is ALIVE, then we should have
-    /// an RPC client to the actor. If this is DEAD, then all tasks in the
-    /// queue will be marked failed and all other ClientQueue state is ignored.
-    rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY;
-    /// Only applies when state=DEAD.
-    std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
-    /// How many times this actor has been restarted before. Starts at -1 to
-    /// indicate that the actor is not yet created. This is used to drop stale
-    /// messages from the GCS.
-    int64_t num_restarts = -1;
-    /// The RPC client. We use shared_ptr to enable shared_from_this for
-    /// pending client callbacks.
-    std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client = nullptr;
-    /// The intended worker ID of the actor.
-    std::string worker_id = "";
-
-    /// The actor's pending requests, ordered by the task number (see the diagram
-    /// below) in the request. The bool indicates whether the dependencies
-    /// for that task have been resolved yet. A task will be sent after its
-    /// dependencies have been resolved and its task number matches
-    /// next_send_position.
-    std::map<uint64_t, std::pair<TaskSpecification, bool>> requests;
-
-    /// Diagram of the sequence numbers assigned to actor tasks during actor
-    /// crash and restart:
-    ///
-    /// The actor starts, and 10 tasks are submitted. We have sent 6 tasks
-    /// (0-5) so far, and have received a successful reply for 4 tasks (0-3).
-    /// 0 1 2 3 4 5 6 7 8 9
-    ///             ^ next_send_position
-    ///         ^ next_task_reply_position
-    /// ^ caller_starts_at
-    ///
-    /// Suppose the actor crashes and recovers. Then, caller_starts_at is reset
-    /// to the current next_task_reply_position. caller_starts_at is then subtracted
-    /// from each task's counter, so the recovered actor will receive the
-    /// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively.
-    /// Therefore, the recovered actor will restart execution from task 4.
-    /// 0 1 2 3 4 5 6 7 8 9
-    ///             ^ next_send_position
-    ///         ^ next_task_reply_position
-    ///         ^ caller_starts_at
-    ///
-    /// New actor tasks will continue to be sent even while tasks are being
-    /// resubmitted, but the receiving worker will only execute them after the
-    /// resent tasks. For example, this diagram shows what happens if task 6 is
-    /// sent for the first time, tasks 4 and 5 have been resent, and we have
-    /// received a successful reply for task 4.
-    /// 0 1 2 3 4 5 6 7 8 9
-    ///               ^ next_send_position
-    ///           ^ next_task_reply_position
-    ///         ^ caller_starts_at
-    ///
-    /// The send position of the next task to send to this actor. This sequence
-    /// number increases monotonically.
-    uint64_t next_send_position = 0;
-    /// The offset at which the actor should start its counter for this
-    /// caller. This is used for actors that can be restarted, so that the new
-    /// instance of the actor knows from which task to start executing.
-    uint64_t caller_starts_at = 0;
-    /// Out of the tasks sent by this worker to the actor, the number of tasks
-    /// that we will never send to the actor again. This is used to reset
-    /// caller_starts_at if the actor dies and is restarted. We only include
-    /// tasks that will not be sent again, to support automatic task retry on
-    /// actor failure. This value only tracks consecutive tasks that are completed.
-    /// Tasks completed out of order will be cached in out_of_order_completed_tasks
-    /// first.
-    uint64_t next_task_reply_position = 0;
-
-    /// The temporary container for tasks completed out of order. This can happen in
-    /// async or threaded actor mode. This map stores the sequence number and task
-    /// spec so that we can (1) increment next_task_reply_position once the preceding
-    /// in-order tasks have returned, and (2) resend the tasks to a restarted actor so
-    /// that retried tasks keep their original ordering.
-    // NOTE(simon): consider absl::btree_set for performance, but it requires updating
-    // abseil.
-    std::map<uint64_t, TaskSpecification> out_of_order_completed_tasks;
-
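// Illustrative sketch (editor's addition, not part of this change): the
// out_of_order_completed_tasks map above, together with next_task_reply_position,
// implements the bookkeeping described in the diagram: replies that arrive out of
// order are parked, and the in-order cursor only advances across a contiguous
// prefix of completed sequence numbers (mirroring the while loop in the
// PushActorTask reply handler earlier in this diff). A minimal, self-contained
// version, tracking bare sequence numbers instead of TaskSpecifications:

#include <cstdint>
#include <iostream>
#include <set>

struct ReplyTracker {
  uint64_t next_task_reply_position = 0;
  std::set<uint64_t> out_of_order_completed;

  // Record a completed sequence number, then advance the in-order cursor as far
  // as the contiguous prefix of completed numbers allows.
  void OnReply(uint64_t seq_no) {
    out_of_order_completed.insert(seq_no);
    auto it = out_of_order_completed.begin();
    while (it != out_of_order_completed.end() && *it == next_task_reply_position) {
      next_task_reply_position++;
      out_of_order_completed.erase(it++);
    }
  }
};

int main() {
  ReplyTracker tracker;
  for (uint64_t seq : {1, 2, 0}) {  // replies arrive out of order
    tracker.OnReply(seq);
    std::cout << "next_task_reply_position=" << tracker.next_task_reply_position << "\n";
  }
  // Prints 0, 0, 3: the cursor does not move until the gap at 0 is filled, then
  // jumps past the whole contiguous prefix. This is also the value that
  // caller_starts_at is reset to if the actor restarts.
}
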
-    /// Tasks that can't be sent because (1) the callee actor is dead or (2) a
-    /// network error occurred.
-    /// For (1) the task will wait for the DEAD state notification, then mark the task
-    /// as failed using the death_info in the notification. For (2) we'll never receive
-    /// a DEAD notification; in this case we'll wait for a fixed timeout value and then
-    /// mark it as failed.
-    /// pair key: timestamp in ms when this task should be considered as timed out.
-    /// pair value: task specification
-    std::deque<std::pair<int64_t, TaskSpecification>> wait_for_death_info_tasks;
-
-    /// A force-kill request that should be sent to the actor once an RPC
-    /// client to the actor is available.
-    absl::optional<rpc::KillActorRequest> pending_force_kill;
-
-    /// Stores the callbacks of all in-flight tasks. Note that this doesn't include
-    /// tasks without replies.
-    std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
-        inflight_task_callbacks;
-  };
-
-  /// Push a task to a remote actor via the given client.
-  /// Note that this function doesn't return any error status code. If an error occurs
-  /// while sending the request, this task will be treated as failed.
-  ///
-  /// \param[in] queue The actor queue. Contains the RPC client state.
-  /// \param[in] task_spec The task to send.
-  /// \param[in] skip_queue Whether to skip the task queue. This will send the
-  /// task for execution immediately.
-  /// \return Void.
-  void PushActorTask(ClientQueue &queue, const TaskSpecification &task_spec,
-                     bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
-  /// Send all pending tasks for an actor.
-  ///
-  /// \param[in] actor_id Actor ID.
-  /// \return Void.
-  void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
-  /// Resend all previously received, out-of-order tasks for an actor.
-  /// When sending these tasks, the tasks will have the flag skip_execution=true.
-  ///
-  /// \param[in] actor_id Actor ID.
-  /// \return Void.
-  void ResendOutOfOrderTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
-  /// Disconnect the RPC client for an actor.
-  void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
-  /// Fail all in-flight tasks.
-  void FailInflightTasks(
-      const std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
-          &inflight_task_callbacks) LOCKS_EXCLUDED(mu_);
-
-  /// Whether the specified actor is alive.
-  ///
-  /// \param[in] actor_id The actor ID.
-  /// \return Whether this actor is alive.
-  bool IsActorAlive(const ActorID &actor_id) const;
-
-  /// Pool for producing new core worker clients.
-  rpc::CoreWorkerClientPool &core_worker_client_pool_;
-
-  /// Mutex to protect the various maps below.
-  mutable absl::Mutex mu_;
-
-  absl::flat_hash_map<ActorID, ClientQueue> client_queues_ GUARDED_BY(mu_);
-
-  /// Resolve direct call object dependencies.
-  LocalDependencyResolver resolver_;
-
-  /// Used to complete tasks.
-  TaskFinisherInterface &task_finisher_;
-
-  /// Used to warn of excessive queueing.
-  std::function<void(const ActorID &, int64_t)> warn_excess_queueing_;
-
-  /// Warn the next time the number of queued task submissions to an actor
-  /// exceeds this quantity. This threshold is doubled each time it is hit.
-  int64_t next_queueing_warn_threshold_;
-
-  friend class CoreWorkerTest;
-};
-
 /// The class that manages fiber states for Python asyncio actors.
 ///
 /// We'll create one fiber state for every concurrency group. And