[Core][actor out-of-order execution 1/n] Move CoreWorkerDirectActorTaskSubmitter into a separate file

This is the stack of PRs that supports out-of-order execution for threaded/async actors. Next PR: #20149
At a high level, threaded and async actors already don't guarantee execution order, and the current "sequential" order implementation has caused some confusion and inconvenience. Please refer to #19822 for the detailed discussion.

The major changes of this stack are:

Introduce OutOfOrderActorSubmitQueue ([Core][actor out-of-order execution 3/n] Introducing out-of-order actor submit queue #20150)
Specifically, we have a per-client task submit queue, which guarantees the sequential order of task submission. In [Core][actor out-of-order execution 3/n] Introducing out-of-order actor submit queue #20150 we introduce OutOfOrderActorSubmitQueue, which relaxes that guarantee; it sends a task over the network as soon as its dependencies are resolved.
-- two PRs ([Core][actor out-of-order execution 1/n] Move CoreWorkerDirectActorTaskSubmitter into a separate file #20148 and [Core][actor out-of-order execution 2/n] create abstraction for the queuing logic on the client/actor submission. #20149) precede this PR; they refactor the actor submission logic to make the abstraction possible. A sketch of the two queueing policies is given below.
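
The following is a minimal sketch of the client-side queueing policies, using hypothetical names (ActorSubmitQueue, SequentialSubmitQueue, OutOfOrderSubmitQueue); the real interface is defined in #20149/#20150. The sequential queue only releases the task whose sequence number matches the next send position and whose dependencies are resolved, while the out-of-order queue releases any task whose dependencies are resolved.

// Hypothetical sketch only; names and signatures are illustrative.
#include <cstdint>
#include <map>
#include <optional>
#include <utility>

struct FakeTaskSpec {};  // stand-in for TaskSpecification

class ActorSubmitQueue {
 public:
  virtual ~ActorSubmitQueue() = default;
  // Queue a task under its caller-assigned sequence number.
  virtual void Emplace(uint64_t seq_no, FakeTaskSpec spec) = 0;
  // Mark a queued task's dependencies as resolved.
  virtual void MarkDependencyResolved(uint64_t seq_no) = 0;
  // Return the next task that may be pushed over the network, if any.
  virtual std::optional<FakeTaskSpec> PopNextTaskToSend() = 0;
};

// Sequential policy: release a task only when its sequence number matches
// next_send_position_ and its dependencies are resolved.
class SequentialSubmitQueue : public ActorSubmitQueue {
 public:
  void Emplace(uint64_t seq_no, FakeTaskSpec spec) override {
    requests_.emplace(seq_no, std::make_pair(std::move(spec), false));
  }
  void MarkDependencyResolved(uint64_t seq_no) override {
    requests_.at(seq_no).second = true;
  }
  std::optional<FakeTaskSpec> PopNextTaskToSend() override {
    auto it = requests_.find(next_send_position_);
    if (it == requests_.end() || !it->second.second) return std::nullopt;
    FakeTaskSpec spec = std::move(it->second.first);
    requests_.erase(it);
    ++next_send_position_;
    return spec;
  }
 private:
  std::map<uint64_t, std::pair<FakeTaskSpec, bool>> requests_;
  uint64_t next_send_position_ = 0;
};

// Out-of-order policy: any task whose dependencies are resolved may be sent.
class OutOfOrderSubmitQueue : public ActorSubmitQueue {
 public:
  void Emplace(uint64_t seq_no, FakeTaskSpec spec) override {
    pending_.emplace(seq_no, std::move(spec));
  }
  void MarkDependencyResolved(uint64_t seq_no) override {
    auto it = pending_.find(seq_no);
    if (it != pending_.end()) {
      ready_.emplace(it->first, std::move(it->second));
      pending_.erase(it);
    }
  }
  std::optional<FakeTaskSpec> PopNextTaskToSend() override {
    if (ready_.empty()) return std::nullopt;
    auto it = ready_.begin();
    FakeTaskSpec spec = std::move(it->second);
    ready_.erase(it);
    return spec;
  }
 private:
  std::map<uint64_t, FakeTaskSpec> pending_;
  std::map<uint64_t, FakeTaskSpec> ready_;
};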

OutOfOrderActorSchedulingQueue ([Core][actor out-of-order execution 5/n] implement out-of-order scheduling queue #20176)
Similarly, we also have a per-client task scheduling queue on the actor to ensure tasks are executed according to the submission order (sequence_no). OutOfOrderActorSchedulingQueue relaxes that guarantee by enqueuing tasks for execution as soon as their dependencies are resolved.
-- one PR ([Core][actor out-of-order execution 4/n] refactor the actor receiver code #20160) precedes this PR; it refactors the actor scheduling logic to make the code easier to read (this one is optional). A sketch of the actor-side policies follows below.
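
For intuition, here is a similar hypothetical sketch of the actor-side policies (names are illustrative, not the code from #20160/#20176): the sequential scheduling queue only runs the request whose sequence_no is the next expected one, so a gap stalls execution, while the out-of-order variant runs any request as soon as it is ready.

// Hypothetical sketch only.
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Sequential policy: execute strictly in sequence_no order.
class SequentialSchedulingQueue {
 public:
  void Add(uint64_t sequence_no, std::function<void()> execute) {
    pending_.emplace(sequence_no, std::move(execute));
    // Drain all consecutive requests starting from next_seq_no_; a missing
    // sequence number blocks everything behind it.
    while (!pending_.empty() && pending_.begin()->first == next_seq_no_) {
      pending_.begin()->second();
      pending_.erase(pending_.begin());
      ++next_seq_no_;
    }
  }
 private:
  std::map<uint64_t, std::function<void()>> pending_;
  uint64_t next_seq_no_ = 0;
};

// Out-of-order policy: execute a request as soon as it arrives (i.e. once its
// dependencies have been fetched), regardless of the sequence number.
class OutOfOrderSchedulingQueue {
 public:
  void Add(uint64_t /*sequence_no*/, std::function<void()> execute) { execute(); }
};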

Plumbing PR ([Core][actor out-of-order execution 6/n] plumbing work to make it work e2e #20177)
This PR enables out-of-order execution by introducing options(execute_out_of_order=True), and creates the actor client/server components according to that configuration; see the sketch below.
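
As a rough idea of the plumbing (a hypothetical sketch, not the actual change in #20177), the submitter could pick its queue implementation from the actor's execute_out_of_order option; the receiver would do the analogous thing for its scheduling queue.

// Hypothetical factory; ActorSubmitQueue and the two policies are the classes
// from the sketch above, and execute_out_of_order would come from the actor's
// options.
#include <memory>

std::unique_ptr<ActorSubmitQueue> MakeSubmitQueue(bool execute_out_of_order) {
  if (execute_out_of_order) {
    return std::make_unique<OutOfOrderSubmitQueue>();
  }
  return std::make_unique<SequentialSubmitQueue>();
}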

One thing this stack hasn't touched upon is the restart/retry guarantees, which might need some discussion.

In this PR we separate the client code from the server code for actor task submission. This makes the follow-up changes easier.
Chen Shen 2021-11-12 09:28:58 -08:00 committed by GitHub
parent 3b62388a9a
commit a617cb8813
4 changed files with 787 additions and 719 deletions


@@ -0,0 +1,496 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/core_worker/transport/direct_actor_task_submitter.h"
#include <thread>
#include "ray/common/task/task.h"
using ray::rpc::ActorTableData;
namespace ray {
namespace core {
void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists(
const ActorID &actor_id) {
absl::MutexLock lock(&mu_);
// No need to check whether the insert was successful, since it is possible
// for this worker to have multiple references to the same actor.
client_queues_.emplace(actor_id, ClientQueue());
}
void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
bool force_kill, bool no_restart) {
absl::MutexLock lock(&mu_);
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);
auto it = client_queues_.find(actor_id);
// The language frontend can only kill actors that it has a reference to.
RAY_CHECK(it != client_queues_.end());
if (!it->second.pending_force_kill) {
it->second.pending_force_kill = request;
} else if (force_kill) {
// Overwrite the previous request to kill the actor if the new request is a
// force kill.
it->second.pending_force_kill->set_force_kill(true);
if (no_restart) {
// Overwrite the previous request to disable restart if the new request's
// no_restart flag is set to true.
it->second.pending_force_kill->set_no_restart(true);
}
}
SendPendingTasks(actor_id);
}
Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
auto task_id = task_spec.TaskId();
auto actor_id = task_spec.ActorId();
RAY_LOG(DEBUG) << "Submitting task " << task_id;
RAY_CHECK(task_spec.IsActorTask());
bool task_queued = false;
uint64_t send_pos = 0;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (queue->second.state != rpc::ActorTableData::DEAD) {
// We must fix the send order prior to resolving dependencies, which may
// complete out of order. This ensures that we will not deadlock due to
// backpressure. The receiving actor will execute the tasks according to
// this sequence number.
send_pos = task_spec.ActorCounter();
auto inserted =
queue->second.requests.emplace(send_pos, std::make_pair(task_spec, false));
RAY_CHECK(inserted.second);
task_queued = true;
}
}
if (task_queued) {
// We must release the lock before resolving the task dependencies since
// the callback may get called in the same call stack.
resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id](Status status) {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
auto it = queue->second.requests.find(send_pos);
// Only dispatch tasks if the submitted task is still queued. The task
// may have been dequeued if the actor has since failed.
if (it != queue->second.requests.end()) {
if (status.ok()) {
it->second.second = true;
SendPendingTasks(actor_id);
} else {
auto task_id = it->second.first.TaskId();
queue->second.requests.erase(it);
task_finisher_.PendingTaskFailed(
task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
}
}
});
} else {
// Do not hold the lock while calling into task_finisher_.
task_finisher_.MarkTaskCanceled(task_id);
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(task_spec.ActorId());
creation_task_exception = queue->second.creation_task_exception;
}
auto status = Status::IOError("cancelling task of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED,
&status, creation_task_exception));
}
// If the task submission subsequently fails, then the client will receive
// the error in a callback.
return Status::OK();
}
void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) {
queue.rpc_client = nullptr;
core_worker_client_pool_.Disconnect(WorkerID::FromBinary(queue.worker_id));
queue.worker_id.clear();
queue.pending_force_kill.reset();
}
void CoreWorkerDirectActorTaskSubmitter::FailInflightTasks(
const std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
&inflight_task_callbacks) {
// NOTE(kfstorm): We invoke the callbacks with a bad status to act like there's a
// network issue. We don't call `task_finisher_.PendingTaskFailed` directly because
// there's much more work to do in the callback.
auto status = Status::IOError("Fail all inflight tasks due to actor state change.");
rpc::PushTaskReply reply;
for (const auto &entry : inflight_task_callbacks) {
entry.second(status, reply);
}
}
void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
const rpc::Address &address,
int64_t num_restarts) {
RAY_LOG(DEBUG) << "Connecting to actor " << actor_id << " at worker "
<< WorkerID::FromBinary(address.worker_id());
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (num_restarts < queue->second.num_restarts) {
// This message is about an old version of the actor and the actor has
// already restarted since then. Skip the connection.
RAY_LOG(INFO) << "Skip actor connection that has already been restarted, actor_id="
<< actor_id;
return;
}
if (queue->second.rpc_client &&
queue->second.rpc_client->Addr().ip_address() == address.ip_address() &&
queue->second.rpc_client->Addr().port() == address.port()) {
RAY_LOG(DEBUG) << "Skip actor that has already been connected, actor_id="
<< actor_id;
return;
}
if (queue->second.state == rpc::ActorTableData::DEAD) {
// This message is about an old version of the actor and the actor has
// already died since then. Skip the connection.
return;
}
queue->second.num_restarts = num_restarts;
if (queue->second.rpc_client) {
// Clear the client to the old version of the actor.
DisconnectRpcClient(queue->second);
inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks);
queue->second.inflight_task_callbacks.clear();
}
queue->second.state = rpc::ActorTableData::ALIVE;
// Update the mapping so new RPCs go out with the right intended worker id.
queue->second.worker_id = address.worker_id();
// Create a new connection to the actor.
queue->second.rpc_client = core_worker_client_pool_.GetOrConnect(address);
// This assumes that all replies from the previous incarnation
// of the actor have been received. This assumption should be OK
// because we fail all inflight tasks in `DisconnectRpcClient`.
RAY_LOG(DEBUG) << "Resetting caller starts at for actor " << actor_id << " from "
<< queue->second.caller_starts_at << " to "
<< queue->second.next_task_reply_position;
queue->second.caller_starts_at = queue->second.next_task_reply_position;
RAY_LOG(INFO) << "Connecting to actor " << actor_id << " at worker "
<< WorkerID::FromBinary(address.worker_id());
ResendOutOfOrderTasks(actor_id);
SendPendingTasks(actor_id);
}
// NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks.
FailInflightTasks(inflight_task_callbacks);
}
void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id;
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (!dead) {
RAY_CHECK(num_restarts > 0);
}
if (num_restarts <= queue->second.num_restarts && !dead) {
// This message is about an old version of the actor that has already been
// restarted successfully. Skip the message handling.
RAY_LOG(INFO)
<< "Skip actor disconnection that has already been restarted, actor_id="
<< actor_id;
return;
}
// The actor failed, so erase the client for now. Either the actor is
// permanently dead or the new client will be inserted once the actor is
// restarted.
DisconnectRpcClient(queue->second);
inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks);
queue->second.inflight_task_callbacks.clear();
if (dead) {
queue->second.state = rpc::ActorTableData::DEAD;
queue->second.creation_task_exception = creation_task_exception;
// If there are pending requests, treat the pending tasks as failed.
RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id
<< " because the actor is already dead.";
auto &requests = queue->second.requests;
auto head = requests.begin();
auto status = Status::IOError("cancelling all pending tasks of dead actor");
while (head != requests.end()) {
const auto &task_spec = head->second.first;
task_finisher_.MarkTaskCanceled(task_spec.TaskId());
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED, &status,
creation_task_exception));
head = requests.erase(head);
}
auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks;
RAY_LOG(INFO) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(
net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception));
}
// No need to clean up tasks that have been sent and are waiting for
// replies. They will be treated as failed once the connection dies.
// We retain the sequencing information so that we can properly fail
// any tasks submitted after the actor death.
} else if (queue->second.state != rpc::ActorTableData::DEAD) {
// Only update the actor's state if it is not permanently dead. The actor
// will eventually get restarted or marked as permanently dead.
queue->second.state = rpc::ActorTableData::RESTARTING;
queue->second.num_restarts = num_restarts;
}
}
// NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks.
FailInflightTasks(inflight_task_callbacks);
}
void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
absl::MutexLock lock(&mu_);
for (auto &queue_pair : client_queues_) {
auto &queue = queue_pair.second;
auto deque_itr = queue.wait_for_death_info_tasks.begin();
while (deque_itr != queue.wait_for_death_info_tasks.end() &&
/*timeout timestamp*/ deque_itr->first < current_time_ms()) {
auto task_spec = deque_itr->second;
task_finisher_.MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
}
}
}
void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
if (!it->second.rpc_client) {
return;
}
auto &client_queue = it->second;
// Check if there is a pending force kill. If there is, send it and disconnect the
// client.
if (client_queue.pending_force_kill) {
RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id;
// It's okay if this fails because this means the worker is already dead.
client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr);
client_queue.pending_force_kill.reset();
}
// Submit all pending requests.
auto &requests = client_queue.requests;
auto head = requests.begin();
while (head != requests.end() &&
(/*seqno*/ head->first <= client_queue.next_send_position) &&
(/*dependencies_resolved*/ head->second.second)) {
// If the task has been sent before, skip the other tasks in the send
// queue.
bool skip_queue = head->first < client_queue.next_send_position;
auto task_spec = std::move(head->second.first);
head = requests.erase(head);
RAY_CHECK(!client_queue.worker_id.empty());
PushActorTask(client_queue, task_spec, skip_queue);
client_queue.next_send_position++;
}
}
void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &actor_id) {
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
if (!it->second.rpc_client) {
return;
}
auto &client_queue = it->second;
RAY_CHECK(!client_queue.worker_id.empty());
for (const auto &completed_task : client_queue.out_of_order_completed_tasks) {
// Making a copy here because we are flipping a flag and the original value is
// const.
auto task_spec = completed_task.second;
task_spec.GetMutableMessage().set_skip_execution(true);
PushActorTask(client_queue, task_spec, /*skip_queue=*/true);
}
client_queue.out_of_order_completed_tasks.clear();
}
void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
const TaskSpecification &task_spec,
bool skip_queue) {
auto request = std::make_unique<rpc::PushTaskRequest>();
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->set_intended_worker_id(queue.worker_id);
RAY_CHECK(task_spec.ActorCounter() >= queue.caller_starts_at)
<< "actor counter " << task_spec.ActorCounter() << " " << queue.caller_starts_at;
request->set_sequence_number(task_spec.ActorCounter() - queue.caller_starts_at);
const auto task_id = task_spec.TaskId();
const auto actor_id = task_spec.ActorId();
const auto actor_counter = task_spec.ActorCounter();
const auto task_skipped = task_spec.GetMessage().skip_execution();
const auto num_queued =
request->sequence_number() - queue.rpc_client->ClientProcessedUpToSeqno();
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id
<< " actor counter " << actor_counter << " seq no "
<< request->sequence_number() << " num queued " << num_queued;
if (num_queued >= next_queueing_warn_threshold_) {
// TODO(ekl) add more debug info about the actor name, etc.
warn_excess_queueing_(actor_id, num_queued);
next_queueing_warn_threshold_ *= 2;
}
rpc::Address addr(queue.rpc_client->Addr());
rpc::ClientCallback<rpc::PushTaskReply> reply_callback =
[this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped](
const Status &status, const rpc::PushTaskReply &reply) {
bool increment_completed_tasks = true;
if (task_skipped) {
// NOTE(simon): Increment the task counter regardless of the status because the
// reply is for a previously completed task. We are not calling CompletePendingTask
// because the tasks are pushed directly to the actor, not placed on any queues
// in task_finisher_.
} else if (status.ok()) {
task_finisher_.CompletePendingTask(task_id, reply, addr);
} else {
// The push task failed due to a network error. For example, the actor is dead
// and there is no process to respond to the push.
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD);
bool will_retry = task_finisher_.PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_DIED, &status, queue.creation_task_exception,
immediately_mark_object_fail);
if (will_retry) {
increment_completed_tasks = false;
} else if (!immediately_mark_object_fail) {
// put it to wait_for_death_info_tasks and wait for Death info
int64_t death_info_timeout_ts =
current_time_ms() +
RayConfig::instance().timeout_ms_task_wait_for_death_info();
queue.wait_for_death_info_tasks.emplace_back(death_info_timeout_ts,
task_spec);
RAY_LOG(INFO)
<< "PushActorTask failed because of network error, this task "
"will be stashed away and waiting for Death info from GCS, task_id="
<< task_spec.TaskId()
<< ", wait queue size=" << queue.wait_for_death_info_tasks.size();
}
}
if (increment_completed_tasks) {
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
// Try to increment queue.next_task_reply_position consecutively until we
// cannot. In the case of tasks not received in order, the following block
// ensures queue.next_task_reply_position is incremented to the max possible
// value.
queue.out_of_order_completed_tasks.insert({actor_counter, task_spec});
auto min_completed_task = queue.out_of_order_completed_tasks.begin();
while (min_completed_task != queue.out_of_order_completed_tasks.end()) {
if (min_completed_task->first == queue.next_task_reply_position) {
queue.next_task_reply_position++;
// increment the iterator and erase the old value
queue.out_of_order_completed_tasks.erase(min_completed_task++);
} else {
break;
}
}
RAY_LOG(DEBUG) << "Got PushTaskReply for actor " << actor_id
<< " with actor_counter " << actor_counter
<< " new queue.next_task_reply_position is "
<< queue.next_task_reply_position
<< " and size of out_of_order_tasks set is "
<< queue.out_of_order_completed_tasks.size();
}
};
queue.inflight_task_callbacks.emplace(task_id, std::move(reply_callback));
rpc::ClientCallback<rpc::PushTaskReply> wrapped_callback =
[this, task_id, actor_id](const Status &status, const rpc::PushTaskReply &reply) {
rpc::ClientCallback<rpc::PushTaskReply> reply_callback;
{
absl::MutexLock lock(&mu_);
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
auto &queue = it->second;
auto callback_it = queue.inflight_task_callbacks.find(task_id);
if (callback_it == queue.inflight_task_callbacks.end()) {
RAY_LOG(DEBUG) << "The task " << task_id
<< " has already been marked as failed. Ingore the reply.";
return;
}
reply_callback = std::move(callback_it->second);
queue.inflight_task_callbacks.erase(callback_it);
}
reply_callback(status, reply);
};
queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback);
}
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
auto iter = client_queues_.find(actor_id);
return (iter != client_queues_.end() && iter->second.rpc_client);
}
} // namespace core
} // namespace ray


@@ -0,0 +1,290 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <boost/asio/thread_pool.hpp>
#include <boost/thread.hpp>
#include <list>
#include <queue>
#include <set>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/common/ray_object.h"
#include "ray/core_worker/actor_creator.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/transport/dependency_resolver.h"
#include "ray/rpc/worker/core_worker_client.h"
namespace ray {
namespace core {
/// In direct actor call task submitter and receiver, a task is directly submitted
/// to the actor that will execute it.
// Interface for testing.
class CoreWorkerDirectActorTaskSubmitterInterface {
public:
virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0;
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts) = 0;
virtual void DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr) = 0;
virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0;
virtual void CheckTimeoutTasks() = 0;
virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {}
};
// This class is thread-safe.
class CoreWorkerDirectActorTaskSubmitter
: public CoreWorkerDirectActorTaskSubmitterInterface {
public:
CoreWorkerDirectActorTaskSubmitter(
rpc::CoreWorkerClientPool &core_worker_client_pool, CoreWorkerMemoryStore &store,
TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator,
std::function<void(const ActorID &, int64_t)> warn_excess_queueing)
: core_worker_client_pool_(core_worker_client_pool),
resolver_(store, task_finisher, actor_creator),
task_finisher_(task_finisher),
warn_excess_queueing_(warn_excess_queueing) {
next_queueing_warn_threshold_ =
::RayConfig::instance().actor_excess_queueing_warn_threshold();
}
/// Add an actor queue. This should be called whenever a reference to an
/// actor is created in the language frontend.
/// TODO(swang): Remove the actor queue once it is sure that this worker will
/// not receive another reference to the same actor.
///
/// \param[in] actor_id The actor for whom to add a queue.
void AddActorQueueIfNotExists(const ActorID &actor_id);
/// Submit a task to an actor for execution.
///
/// \param[in] task The task spec to submit.
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);
/// Tell this actor to exit immediately.
///
/// \param[in] actor_id The actor_id of the actor to kill.
/// \param[in] force_kill Whether to force kill the actor, or let the actor
/// try a clean exit.
/// \param[in] no_restart If set to true, the killed actor will not be
/// restarted anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
/// Create connection to actor and send all pending tasks.
///
/// \param[in] actor_id Actor ID.
/// \param[in] address The new address of the actor.
/// \param[in] num_restarts How many times this actor has been restarted
/// before. If we've already seen a later incarnation of the actor, we will
/// ignore the command to connect.
void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts);
/// Disconnect from a failed actor.
///
/// \param[in] actor_id Actor ID.
/// \param[in] num_restarts How many times this actor has been restarted
/// before. If we've already seen a later incarnation of the actor, we will
/// ignore the command to connect.
/// \param[in] dead Whether the actor is permanently dead. In this case, all
/// pending tasks for the actor should be failed.
/// \param[in] creation_task_exception Reason why the actor is dead, only applies when
/// dead = true. If this arg is set, it means this actor died because of an exception
/// thrown in creation task.
void DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr);
/// Set the timestamp for the caller.
void SetCallerCreationTimestamp(int64_t timestamp);
/// Check timeout tasks that are waiting for Death info.
void CheckTimeoutTasks();
private:
struct ClientQueue {
/// The current state of the actor. If this is ALIVE, then we should have
/// an RPC client to the actor. If this is DEAD, then all tasks in the
/// queue will be marked failed and all other ClientQueue state is ignored.
rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY;
/// Only applies when state=DEAD.
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
/// How many times this actor has been restarted before. Starts at -1 to
/// indicate that the actor is not yet created. This is used to drop stale
/// messages from the GCS.
int64_t num_restarts = -1;
/// The RPC client. We use shared_ptr to enable shared_from_this for
/// pending client callbacks.
std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client = nullptr;
/// The intended worker ID of the actor.
std::string worker_id = "";
/// The actor's pending requests, ordered by the task number (see below
/// diagram) in the request. The bool indicates whether the dependencies
/// for that task have been resolved yet. A task will be sent after its
/// dependencies have been resolved and its task number matches
/// next_send_position.
std::map<uint64_t, std::pair<TaskSpecification, bool>> requests;
/// Diagram of the sequence numbers assigned to actor tasks during actor
/// crash and restart:
///
/// The actor starts, and 10 tasks are submitted. We have sent 6 tasks
/// (0-5) so far, and have received a successful reply for 4 tasks (0-3).
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// Suppose the actor crashes and recovers. Then, caller_starts_at is reset
/// to the current next_task_reply_position. caller_starts_at is then subtracted
/// from each task's counter, so the recovered actor will receive the
/// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively.
/// Therefore, the recovered actor will restart execution from task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// New actor tasks will continue to be sent even while tasks are being
/// resubmitted, but the receiving worker will only execute them after the
/// resent tasks. For example, this diagram shows what happens if task 6 is
/// sent for the first time, tasks 4 and 5 have been resent, and we have
/// received a successful reply for task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// The send position of the next task to send to this actor. This sequence
/// number increases monotonically.
uint64_t next_send_position = 0;
/// The offset at which the actor should start its counter for this
/// caller. This is used for actors that can be restarted, so that the new
/// instance of the actor knows from which task to start executing.
uint64_t caller_starts_at = 0;
/// Out of the tasks sent by this worker to the actor, the number of tasks
/// that we will never send to the actor again. This is used to reset
/// caller_starts_at if the actor dies and is restarted. We only include
/// tasks that will not be sent again, to support automatic task retry on
/// actor failure. This value only tracks consecutive tasks that are completed.
/// Tasks completed out of order will be cached in out_of_order_completed_tasks first.
uint64_t next_task_reply_position = 0;
/// The temporary container for tasks completed out of order. This can happen in
/// async or threaded actor mode. This map stores the seqno and task
/// spec so that we can (1) increment next_task_reply_position later when the in-order
/// tasks are returned, and (2) resend the tasks to a restarted actor so retried tasks
/// can maintain their ordering.
// NOTE(simon): consider absl::btree_set for performance, but it requires updating
// abseil.
std::map<uint64_t, TaskSpecification> out_of_order_completed_tasks;
/// Tasks that can't be sent because (1) the callee actor is dead or (2) a network
/// error occurred. For (1), the task will wait for the DEAD state notification, then be
/// marked as failed using the death_info in the notification. For (2), we'll never
/// receive a DEAD notification; in this case we'll wait for a fixed timeout value and
/// then mark it as failed.
/// pair key: timestamp in ms when this task should be considered as timeout.
/// pair value: task specification
std::deque<std::pair<int64_t, TaskSpecification>> wait_for_death_info_tasks;
/// A force-kill request that should be sent to the actor once an RPC
/// client to the actor is available.
absl::optional<rpc::KillActorRequest> pending_force_kill;
/// Stores all callbacks of inflight tasks. Note that this doesn't include tasks
/// without replies.
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
};
/// Push a task to a remote actor via the given client.
/// Note, this function doesn't return any error status code. If an error occurs while
/// sending the request, this task will be treated as failed.
///
/// \param[in] queue The actor queue. Contains the RPC client state.
/// \param[in] task_spec The task to send.
/// \param[in] skip_queue Whether to skip the task queue. This will send the
/// task for execution immediately.
/// \return Void.
void PushActorTask(ClientQueue &queue, const TaskSpecification &task_spec,
bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Send all pending tasks for an actor.
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Resend all tasks that previously completed out of order for an actor.
/// When sending these tasks, the tasks will have the flag skip_execution=true.
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void ResendOutOfOrderTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Disconnect the RPC client for an actor.
void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Fail all in-flight tasks.
void FailInflightTasks(
const std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
&inflight_task_callbacks) LOCKS_EXCLUDED(mu_);
/// Whether the specified actor is alive.
///
/// \param[in] actor_id The actor ID.
/// \return Whether this actor is alive.
bool IsActorAlive(const ActorID &actor_id) const;
/// Pool for producing new core worker clients.
rpc::CoreWorkerClientPool &core_worker_client_pool_;
/// Mutex to protect the various maps below.
mutable absl::Mutex mu_;
absl::flat_hash_map<ActorID, ClientQueue> client_queues_ GUARDED_BY(mu_);
/// Resolve direct call object dependencies.
LocalDependencyResolver resolver_;
/// Used to complete tasks.
TaskFinisherInterface &task_finisher_;
/// Used to warn of excessive queueing.
std::function<void(const ActorID &, int64_t num_queued)> warn_excess_queueing_;
/// Warn the next time the number of queued task submissions to an actor
/// exceeds this quantity. This threshold is doubled each time it is hit.
int64_t next_queueing_warn_threshold_;
friend class CoreWorkerTest;
};
} // namespace core
} // namespace ray


@@ -23,475 +23,6 @@ using ray::rpc::ActorTableData;
namespace ray {
namespace core {
void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists(
const ActorID &actor_id) {
absl::MutexLock lock(&mu_);
// No need to check whether the insert was successful, since it is possible
// for this worker to have multiple references to the same actor.
client_queues_.emplace(actor_id, ClientQueue());
}
void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
bool force_kill, bool no_restart) {
absl::MutexLock lock(&mu_);
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);
auto it = client_queues_.find(actor_id);
// The language frontend can only kill actors that it has a reference to.
RAY_CHECK(it != client_queues_.end());
if (!it->second.pending_force_kill) {
it->second.pending_force_kill = request;
} else if (force_kill) {
// Overwrite the previous request to kill the actor if the new request is a
// force kill.
it->second.pending_force_kill->set_force_kill(true);
if (no_restart) {
// Overwrite the previous request to disable restart if the new request's
// no_restart flag is set to true.
it->second.pending_force_kill->set_no_restart(true);
}
}
SendPendingTasks(actor_id);
}
Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
auto task_id = task_spec.TaskId();
auto actor_id = task_spec.ActorId();
RAY_LOG(DEBUG) << "Submitting task " << task_id;
RAY_CHECK(task_spec.IsActorTask());
bool task_queued = false;
uint64_t send_pos = 0;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (queue->second.state != rpc::ActorTableData::DEAD) {
// We must fix the send order prior to resolving dependencies, which may
// complete out of order. This ensures that we will not deadlock due to
// backpressure. The receiving actor will execute the tasks according to
// this sequence number.
send_pos = task_spec.ActorCounter();
auto inserted =
queue->second.requests.emplace(send_pos, std::make_pair(task_spec, false));
RAY_CHECK(inserted.second);
task_queued = true;
}
}
if (task_queued) {
// We must release the lock before resolving the task dependencies since
// the callback may get called in the same call stack.
resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id](Status status) {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
auto it = queue->second.requests.find(send_pos);
// Only dispatch tasks if the submitted task is still queued. The task
// may have been dequeued if the actor has since failed.
if (it != queue->second.requests.end()) {
if (status.ok()) {
it->second.second = true;
SendPendingTasks(actor_id);
} else {
auto task_id = it->second.first.TaskId();
queue->second.requests.erase(it);
task_finisher_.PendingTaskFailed(
task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
}
}
});
} else {
// Do not hold the lock while calling into task_finisher_.
task_finisher_.MarkTaskCanceled(task_id);
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(task_spec.ActorId());
creation_task_exception = queue->second.creation_task_exception;
}
auto status = Status::IOError("cancelling task of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED,
&status, creation_task_exception));
}
// If the task submission subsequently fails, then the client will receive
// the error in a callback.
return Status::OK();
}
void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) {
queue.rpc_client = nullptr;
core_worker_client_pool_.Disconnect(WorkerID::FromBinary(queue.worker_id));
queue.worker_id.clear();
queue.pending_force_kill.reset();
}
void CoreWorkerDirectActorTaskSubmitter::FailInflightTasks(
const std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
&inflight_task_callbacks) {
// NOTE(kfstorm): We invoke the callbacks with a bad status to act like there's a
// network issue. We don't call `task_finisher_.PendingTaskFailed` directly because
// there's much more work to do in the callback.
auto status = Status::IOError("Fail all inflight tasks due to actor state change.");
rpc::PushTaskReply reply;
for (const auto &entry : inflight_task_callbacks) {
entry.second(status, reply);
}
}
void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
const rpc::Address &address,
int64_t num_restarts) {
RAY_LOG(DEBUG) << "Connecting to actor " << actor_id << " at worker "
<< WorkerID::FromBinary(address.worker_id());
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (num_restarts < queue->second.num_restarts) {
// This message is about an old version of the actor and the actor has
// already restarted since then. Skip the connection.
RAY_LOG(INFO) << "Skip actor connection that has already been restarted, actor_id="
<< actor_id;
return;
}
if (queue->second.rpc_client &&
queue->second.rpc_client->Addr().ip_address() == address.ip_address() &&
queue->second.rpc_client->Addr().port() == address.port()) {
RAY_LOG(DEBUG) << "Skip actor that has already been connected, actor_id="
<< actor_id;
return;
}
if (queue->second.state == rpc::ActorTableData::DEAD) {
// This message is about an old version of the actor and the actor has
// already died since then. Skip the connection.
return;
}
queue->second.num_restarts = num_restarts;
if (queue->second.rpc_client) {
// Clear the client to the old version of the actor.
DisconnectRpcClient(queue->second);
inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks);
queue->second.inflight_task_callbacks.clear();
}
queue->second.state = rpc::ActorTableData::ALIVE;
// Update the mapping so new RPCs go out with the right intended worker id.
queue->second.worker_id = address.worker_id();
// Create a new connection to the actor.
queue->second.rpc_client = core_worker_client_pool_.GetOrConnect(address);
// This assumes that all replies from the previous incarnation
// of the actor have been received. This assumption should be OK
// because we fail all inflight tasks in `DisconnectRpcClient`.
RAY_LOG(DEBUG) << "Resetting caller starts at for actor " << actor_id << " from "
<< queue->second.caller_starts_at << " to "
<< queue->second.next_task_reply_position;
queue->second.caller_starts_at = queue->second.next_task_reply_position;
RAY_LOG(INFO) << "Connecting to actor " << actor_id << " at worker "
<< WorkerID::FromBinary(address.worker_id());
ResendOutOfOrderTasks(actor_id);
SendPendingTasks(actor_id);
}
// NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks.
FailInflightTasks(inflight_task_callbacks);
}
void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id;
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (!dead) {
RAY_CHECK(num_restarts > 0);
}
if (num_restarts <= queue->second.num_restarts && !dead) {
// This message is about an old version of the actor that has already been
// restarted successfully. Skip the message handling.
RAY_LOG(INFO)
<< "Skip actor disconnection that has already been restarted, actor_id="
<< actor_id;
return;
}
// The actor failed, so erase the client for now. Either the actor is
// permanently dead or the new client will be inserted once the actor is
// restarted.
DisconnectRpcClient(queue->second);
inflight_task_callbacks = std::move(queue->second.inflight_task_callbacks);
queue->second.inflight_task_callbacks.clear();
if (dead) {
queue->second.state = rpc::ActorTableData::DEAD;
queue->second.creation_task_exception = creation_task_exception;
// If there are pending requests, treat the pending tasks as failed.
RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id
<< " because the actor is already dead.";
auto &requests = queue->second.requests;
auto head = requests.begin();
auto status = Status::IOError("cancelling all pending tasks of dead actor");
while (head != requests.end()) {
const auto &task_spec = head->second.first;
task_finisher_.MarkTaskCanceled(task_spec.TaskId());
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED, &status,
creation_task_exception));
head = requests.erase(head);
}
auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks;
RAY_LOG(INFO) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(
net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception));
}
// No need to clean up tasks that have been sent and are waiting for
// replies. They will be treated as failed once the connection dies.
// We retain the sequencing information so that we can properly fail
// any tasks submitted after the actor death.
} else if (queue->second.state != rpc::ActorTableData::DEAD) {
// Only update the actor's state if it is not permanently dead. The actor
// will eventually get restarted or marked as permanently dead.
queue->second.state = rpc::ActorTableData::RESTARTING;
queue->second.num_restarts = num_restarts;
}
}
// NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks.
FailInflightTasks(inflight_task_callbacks);
}
void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
absl::MutexLock lock(&mu_);
for (auto &queue_pair : client_queues_) {
auto &queue = queue_pair.second;
auto deque_itr = queue.wait_for_death_info_tasks.begin();
while (deque_itr != queue.wait_for_death_info_tasks.end() &&
/*timeout timestamp*/ deque_itr->first < current_time_ms()) {
auto task_spec = deque_itr->second;
task_finisher_.MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
}
}
}
void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
if (!it->second.rpc_client) {
return;
}
auto &client_queue = it->second;
// Check if there is a pending force kill. If there is, send it and disconnect the
// client.
if (client_queue.pending_force_kill) {
RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id;
// It's okay if this fails because this means the worker is already dead.
client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr);
client_queue.pending_force_kill.reset();
}
// Submit all pending requests.
auto &requests = client_queue.requests;
auto head = requests.begin();
while (head != requests.end() &&
(/*seqno*/ head->first <= client_queue.next_send_position) &&
(/*dependencies_resolved*/ head->second.second)) {
// If the task has been sent before, skip the other tasks in the send
// queue.
bool skip_queue = head->first < client_queue.next_send_position;
auto task_spec = std::move(head->second.first);
head = requests.erase(head);
RAY_CHECK(!client_queue.worker_id.empty());
PushActorTask(client_queue, task_spec, skip_queue);
client_queue.next_send_position++;
}
}
void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &actor_id) {
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
if (!it->second.rpc_client) {
return;
}
auto &client_queue = it->second;
RAY_CHECK(!client_queue.worker_id.empty());
for (const auto &completed_task : client_queue.out_of_order_completed_tasks) {
// Making a copy here because we are flipping a flag and the original value is
// const.
auto task_spec = completed_task.second;
task_spec.GetMutableMessage().set_skip_execution(true);
PushActorTask(client_queue, task_spec, /*skip_queue=*/true);
}
client_queue.out_of_order_completed_tasks.clear();
}
void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
const TaskSpecification &task_spec,
bool skip_queue) {
auto request = std::make_unique<rpc::PushTaskRequest>();
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->set_intended_worker_id(queue.worker_id);
RAY_CHECK(task_spec.ActorCounter() >= queue.caller_starts_at)
<< "actor counter " << task_spec.ActorCounter() << " " << queue.caller_starts_at;
request->set_sequence_number(task_spec.ActorCounter() - queue.caller_starts_at);
const auto task_id = task_spec.TaskId();
const auto actor_id = task_spec.ActorId();
const auto actor_counter = task_spec.ActorCounter();
const auto task_skipped = task_spec.GetMessage().skip_execution();
const auto num_queued =
request->sequence_number() - queue.rpc_client->ClientProcessedUpToSeqno();
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id
<< " actor counter " << actor_counter << " seq no "
<< request->sequence_number() << " num queued " << num_queued;
if (num_queued >= next_queueing_warn_threshold_) {
// TODO(ekl) add more debug info about the actor name, etc.
warn_excess_queueing_(actor_id, num_queued);
next_queueing_warn_threshold_ *= 2;
}
rpc::Address addr(queue.rpc_client->Addr());
rpc::ClientCallback<rpc::PushTaskReply> reply_callback =
[this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped](
const Status &status, const rpc::PushTaskReply &reply) {
bool increment_completed_tasks = true;
if (task_skipped) {
// NOTE(simon):Increment the task counter regardless of the status because the
// reply for a previously completed task. We are not calling CompletePendingTask
// because the tasks are pushed directly to the actor, not placed on any queues
// in task_finisher_.
} else if (status.ok()) {
task_finisher_.CompletePendingTask(task_id, reply, addr);
} else {
// push task failed due to network error. For example, actor is dead
// and no process response for the push task.
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD);
bool will_retry = task_finisher_.PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_DIED, &status, queue.creation_task_exception,
immediately_mark_object_fail);
if (will_retry) {
increment_completed_tasks = false;
} else if (!immediately_mark_object_fail) {
// put it to wait_for_death_info_tasks and wait for Death info
int64_t death_info_timeout_ts =
current_time_ms() +
RayConfig::instance().timeout_ms_task_wait_for_death_info();
queue.wait_for_death_info_tasks.emplace_back(death_info_timeout_ts,
task_spec);
RAY_LOG(INFO)
<< "PushActorTask failed because of network error, this task "
"will be stashed away and waiting for Death info from GCS, task_id="
<< task_spec.TaskId()
<< ", wait queue size=" << queue.wait_for_death_info_tasks.size();
}
}
if (increment_completed_tasks) {
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
// Try to increment queue.next_task_reply_position consecutively until we
// cannot. In the case of tasks not received in order, the following block
// ensure queue.next_task_reply_position are incremented to the max possible
// value.
queue.out_of_order_completed_tasks.insert({actor_counter, task_spec});
auto min_completed_task = queue.out_of_order_completed_tasks.begin();
while (min_completed_task != queue.out_of_order_completed_tasks.end()) {
if (min_completed_task->first == queue.next_task_reply_position) {
queue.next_task_reply_position++;
// increment the iterator and erase the old value
queue.out_of_order_completed_tasks.erase(min_completed_task++);
} else {
break;
}
}
RAY_LOG(DEBUG) << "Got PushTaskReply for actor " << actor_id
<< " with actor_counter " << actor_counter
<< " new queue.next_task_reply_position is "
<< queue.next_task_reply_position
<< " and size of out_of_order_tasks set is "
<< queue.out_of_order_completed_tasks.size();
}
};
queue.inflight_task_callbacks.emplace(task_id, std::move(reply_callback));
rpc::ClientCallback<rpc::PushTaskReply> wrapped_callback =
[this, task_id, actor_id](const Status &status, const rpc::PushTaskReply &reply) {
rpc::ClientCallback<rpc::PushTaskReply> reply_callback;
{
absl::MutexLock lock(&mu_);
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
auto &queue = it->second;
auto callback_it = queue.inflight_task_callbacks.find(task_id);
if (callback_it == queue.inflight_task_callbacks.end()) {
RAY_LOG(DEBUG) << "The task " << task_id
<< " has already been marked as failed. Ingore the reply.";
return;
}
reply_callback = std::move(callback_it->second);
queue.inflight_task_callbacks.erase(callback_it);
}
reply_callback(status, reply);
};
queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback);
}
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
auto iter = client_queues_.find(actor_id);
return (iter != client_queues_.end() && iter->second.rpc_client);
}
void CoreWorkerDirectTaskReceiver::Init(
std::shared_ptr<rpc::CoreWorkerClientPool> client_pool, rpc::Address rpc_address,
std::shared_ptr<DependencyWaiter> dependency_waiter) {


@@ -34,6 +34,7 @@
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/task_manager.h"
#include "ray/core_worker/transport/dependency_resolver.h"
#include "ray/core_worker/transport/direct_actor_task_submitter.h"
#include "ray/rpc/grpc_server.h"
#include "ray/rpc/worker/core_worker_client.h"
@@ -43,256 +44,6 @@ namespace core {
/// The max time to wait for out-of-order tasks.
const int kMaxReorderWaitSeconds = 30;
/// In direct actor call task submitter and receiver, a task is directly submitted
/// to the actor that will execute it.
// Interface for testing.
class CoreWorkerDirectActorTaskSubmitterInterface {
public:
virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0;
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts) = 0;
virtual void DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr) = 0;
virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0;
virtual void CheckTimeoutTasks() = 0;
virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {}
};
// This class is thread-safe.
class CoreWorkerDirectActorTaskSubmitter
: public CoreWorkerDirectActorTaskSubmitterInterface {
public:
CoreWorkerDirectActorTaskSubmitter(
rpc::CoreWorkerClientPool &core_worker_client_pool, CoreWorkerMemoryStore &store,
TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator,
std::function<void(const ActorID &, int64_t)> warn_excess_queueing)
: core_worker_client_pool_(core_worker_client_pool),
resolver_(store, task_finisher, actor_creator),
task_finisher_(task_finisher),
warn_excess_queueing_(warn_excess_queueing) {
next_queueing_warn_threshold_ =
::RayConfig::instance().actor_excess_queueing_warn_threshold();
}
/// Add an actor queue. This should be called whenever a reference to an
/// actor is created in the language frontend.
/// TODO(swang): Remove the actor queue once it is sure that this worker will
/// not receive another reference to the same actor.
///
/// \param[in] actor_id The actor for whom to add a queue.
void AddActorQueueIfNotExists(const ActorID &actor_id);
/// Submit a task to an actor for execution.
///
/// \param[in] task The task spec to submit.
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);
/// Tell this actor to exit immediately.
///
/// \param[in] actor_id The actor_id of the actor to kill.
/// \param[in] force_kill Whether to force kill the actor, or let the actor
/// try a clean exit.
/// \param[in] no_restart If set to true, the killed actor will not be
/// restarted anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
/// Create connection to actor and send all pending tasks.
///
/// \param[in] actor_id Actor ID.
/// \param[in] address The new address of the actor.
/// \param[in] num_restarts How many times this actor has been restarted
/// before. If we've already seen a later incarnation of the actor, we will
/// ignore the command to connect.
void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts);
/// Disconnect from a failed actor.
///
/// \param[in] actor_id Actor ID.
/// \param[in] num_restarts How many times this actor has been restarted
/// before. If we've already seen a later incarnation of the actor, we will
/// ignore the command to connect.
/// \param[in] dead Whether the actor is permanently dead. In this case, all
/// pending tasks for the actor should be failed.
/// \param[in] creation_task_exception Reason why the actor is dead, only applies when
/// dead = true. If this arg is set, it means this actor died because of an exception
/// thrown in creation task.
void DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr);
/// Set the timerstamp for the caller.
void SetCallerCreationTimestamp(int64_t timestamp);
/// Check timeout tasks that are waiting for Death info.
void CheckTimeoutTasks();
private:
struct ClientQueue {
/// The current state of the actor. If this is ALIVE, then we should have
/// an RPC client to the actor. If this is DEAD, then all tasks in the
/// queue will be marked failed and all other ClientQueue state is ignored.
rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY;
/// Only applies when state=DEAD.
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
/// How many times this actor has been restarted before. Starts at -1 to
/// indicate that the actor is not yet created. This is used to drop stale
/// messages from the GCS.
int64_t num_restarts = -1;
/// The RPC client. We use shared_ptr to enable shared_from_this for
/// pending client callbacks.
std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client = nullptr;
/// The intended worker ID of the actor.
std::string worker_id = "";
/// The actor's pending requests, ordered by the task number (see below
/// diagram) in the request. The bool indicates whether the dependencies
/// for that task have been resolved yet. A task will be sent after its
/// dependencies have been resolved and its task number matches
/// next_send_position.
std::map<uint64_t, std::pair<TaskSpecification, bool>> requests;
/// Diagram of the sequence numbers assigned to actor tasks during actor
/// crash and restart:
///
/// The actor starts, and 10 tasks are submitted. We have sent 6 tasks
/// (0-5) so far, and have received a successful reply for 4 tasks (0-3).
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// Suppose the actor crashes and recovers. Then, caller_starts_at is reset
/// to the current next_task_reply_position. caller_starts_at is then subtracted
/// from each task's counter, so the recovered actor will receive the
/// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively.
/// Therefore, the recovered actor will restart execution from task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// New actor tasks will continue to be sent even while tasks are being
/// resubmitted, but the receiving worker will only execute them after the
/// resent tasks. For example, this diagram shows what happens if task 6 is
/// sent for the first time, tasks 4 and 5 have been resent, and we have
/// received a successful reply for task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// The send position of the next task to send to this actor. This sequence
/// number increases monotonically.
uint64_t next_send_position = 0;
/// The offset at which the the actor should start its counter for this
/// caller. This is used for actors that can be restarted, so that the new
/// instance of the actor knows from which task to start executing.
uint64_t caller_starts_at = 0;
/// Out of the tasks sent by this worker to the actor, the number of tasks
/// that we will never send to the actor again. This is used to reset
/// caller_starts_at if the actor dies and is restarted. We only include
/// tasks that will not be sent again, to support automatic task retry on
/// actor failure. This value only tracks consecutive tasks that are completed.
/// Tasks completed out of order will be cached in out_of_completed_tasks first.
uint64_t next_task_reply_position = 0;
/// The temporary container for tasks completed out of order. It can happen in
/// async or threaded actor mode. This map is used to store the seqno and task
/// spec for (1) increment next_task_reply_position later when the in order tasks are
/// returned (2) resend the tasks to restarted actor so retried tasks can maintain
/// ordering.
// NOTE(simon): consider absl::btree_set for performance, but it requires updating
// abseil.
std::map<uint64_t, TaskSpecification> out_of_order_completed_tasks;
/// Tasks that can't be sent because 1) the callee actor is dead. 2) network error.
/// For 1) the task will wait for the DEAD state notification, then mark task as
/// failed using the death_info in notification. For 2) we'll never receive a DEAD
/// notification, in this case we'll wait for a fixed timeout value and then mark it
/// as failed.
/// pair key: timestamp in ms when this task should be considered as timeout.
/// pair value: task specification
std::deque<std::pair<int64_t, TaskSpecification>> wait_for_death_info_tasks;
/// A force-kill request that should be sent to the actor once an RPC
/// client to the actor is available.
absl::optional<rpc::KillActorRequest> pending_force_kill;
/// Stores all callbacks of inflight tasks. Note that this doesn't include tasks
/// without replies.
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
};
/// Push a task to a remote actor via the given client.
/// Note, this function doesn't return any error status code. If an error occurs while
/// sending the request, this task will be treated as failed.
///
/// \param[in] queue The actor queue. Contains the RPC client state.
/// \param[in] task_spec The task to send.
/// \param[in] skip_queue Whether to skip the task queue. This will send the
/// task for execution immediately.
/// \return Void.
void PushActorTask(ClientQueue &queue, const TaskSpecification &task_spec,
bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Send all pending tasks for an actor.
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Resend all previously-received, out-of-order, received tasks for an actor.
/// When sending these tasks, the tasks will have the flag skip_execution=true.
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void ResendOutOfOrderTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Disconnect the RPC client for an actor.
void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Fail all in-flight tasks.
void FailInflightTasks(
const std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
&inflight_task_callbacks) LOCKS_EXCLUDED(mu_);
/// Whether the specified actor is alive.
///
/// \param[in] actor_id The actor ID.
/// \return Whether this actor is alive.
bool IsActorAlive(const ActorID &actor_id) const;
/// Pool for producing new core worker clients.
rpc::CoreWorkerClientPool &core_worker_client_pool_;
/// Mutex to protect the various maps below.
mutable absl::Mutex mu_;
absl::flat_hash_map<ActorID, ClientQueue> client_queues_ GUARDED_BY(mu_);
/// Resolve direct call object dependencies.
LocalDependencyResolver resolver_;
/// Used to complete tasks.
TaskFinisherInterface &task_finisher_;
/// Used to warn of excessive queueing.
std::function<void(const ActorID &, int64_t num_queued)> warn_excess_queueing_;
/// Warn the next time the number of queued task submissions to an actor
/// exceeds this quantity. This threshold is doubled each time it is hit.
int64_t next_queueing_warn_threshold_;
friend class CoreWorkerTest;
};
/// The class that manages fiber states for Python asyncio actors.
///
/// We'll create one fiber state for every concurrency group. And