[direct call] Fix hang when caller id changes for actor task submission (#6338)

Eric Liang 2019-12-04 12:01:35 -08:00 committed by GitHub
parent 31113aeded
commit 1a3b83abf8
3 changed files with 65 additions and 29 deletions
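
Before the diffs, a short illustration of the failure mode this commit fixes. Previously the direct actor submitter keyed and gated its per-actor send queue on the caller-assigned counter taken from the `PushTaskRequest` (`GetRequestNumber` below). That counter is only monotonic per caller, so when the caller id changes (for example, when an actor handle is used from several different tasks) the ordered pending-request map can end up waiting for a counter value that never arrives. The sketch below is standalone, illustrative C++, not Ray code; the names are invented for the example:

```cpp
// Illustrative only: why gating sends on a per-caller counter can hang.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

int main() {
  std::map<int64_t, std::string> pending;  // old scheme: keyed by actor_counter
  int64_t next_to_send = 0;                // submitter expects this key next

  // Caller A submits its first task; its counter starts at 0.
  pending.emplace(0, "caller A, counter 0");
  while (!pending.empty() && pending.begin()->first == next_to_send) {
    std::cout << "send " << pending.begin()->second << "\n";
    pending.erase(pending.begin());
    ++next_to_send;  // submitter now expects key 1
  }

  // Caller B (a different caller id) submits its first task; its counter
  // also starts at 0, which the submitter has already moved past.
  pending.emplace(0, "caller B, counter 0");
  std::cout << "waiting for key " << next_to_send << ", but head key is "
            << pending.begin()->first << " -> nothing is ever sent\n";
  return 0;
}
```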


@@ -1253,6 +1253,36 @@ def test_direct_call_simple(ray_start_cluster):
range(1, 101))
# https://github.com/ray-project/ray/issues/6329
def test_call_actors_indirect_through_tasks(ray_start_regular):
    @ray.remote
    class Counter(object):
        def __init__(self, value):
            self.value = int(value)

        def increase(self, delta):
            self.value += int(delta)
            return self.value

    @ray.remote
    def foo(object):
        return ray.get(object.increase.remote(1))

    @ray.remote
    def bar(object):
        return ray.get(object.increase.remote(1))

    @ray.remote
    def zoo(object):
        return ray.get(object[0].increase.remote(1))

    c = Counter.remote(0)
    for _ in range(0, 100):
        ray.get(foo.remote(c))
        ray.get(bar.remote(c))
        ray.get(zoo.remote([c]))
def test_direct_call_refcount(ray_start_regular):
    @ray.remote
    def f(x):
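
For context on `test_call_actors_indirect_through_tasks` above: `foo`, `bar`, and `zoo` each run as separate tasks, so every call to `c.increase` is submitted to the actor under a different caller id and carries that caller's own counter. Before this change the submitter ordered sends by that counter, so indirect calls like these could hang (see the linked issue #6329); looping 100 times makes the hang easy to reproduce.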


@@ -7,15 +7,19 @@ using ray::rpc::ActorTableData;
namespace ray {
int64_t GetRequestNumber(const std::unique_ptr<rpc::PushTaskRequest> &request) {
return request->task_spec().actor_task_spec().actor_counter();
}
Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId();
RAY_CHECK(task_spec.IsActorTask());
resolver_.ResolveDependencies(task_spec, [this, task_spec]() mutable {
// We must fix the send order prior to resolving dependencies, which may complete
// out of order. This ensures we preserve the client-side send order.
int64_t send_pos = -1;
{
absl::MutexLock lock(&mu_);
send_pos = next_send_position_to_assign_[task_spec.ActorId()]++;
}
resolver_.ResolveDependencies(task_spec, [this, send_pos, task_spec]() mutable {
const auto &actor_id = task_spec.ActorId();
const auto task_id = task_spec.TaskId();
@@ -25,7 +29,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
std::unique_lock<std::mutex> guard(mutex_);
absl::MutexLock lock(&mu_);
auto iter = actor_states_.find(actor_id);
if (iter == actor_states_.end() ||
@@ -36,13 +40,11 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
// actor handle (e.g. from unpickling), in that case it might be desirable
// to have a timeout to mark it as invalid if it doesn't show up in the
// specified time.
auto inserted = pending_requests_[actor_id].emplace(GetRequestNumber(request),
std::move(request));
auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request));
RAY_CHECK(inserted.second);
RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created.";
} else if (iter->second.state_ == ActorTableData::ALIVE) {
auto inserted = pending_requests_[actor_id].emplace(GetRequestNumber(request),
std::move(request));
auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request));
RAY_CHECK(inserted.second);
SendPendingTasks(actor_id);
} else {
@@ -59,7 +61,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate(
const ActorID &actor_id, const ActorTableData &actor_data) {
std::unique_lock<std::mutex> guard(mutex_);
absl::MutexLock lock(&mu_);
actor_states_.erase(actor_id);
actor_states_.emplace(
actor_id, ActorStateData(actor_data.state(), actor_data.address().ip_address(),
@@ -93,7 +95,8 @@ void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate(
pending_requests_.erase(pending_it);
}
next_sequence_number_.erase(actor_id);
next_send_position_.erase(actor_id);
next_send_position_to_assign_.erase(actor_id);
// No need to clean up tasks that have been sent and are waiting for
// replies. They will be treated as failed once the connection dies.
@@ -106,7 +109,7 @@ void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_i
// Submit all pending requests.
auto &requests = pending_requests_[actor_id];
auto head = requests.begin();
while (head != requests.end() && head->first == next_sequence_number_[actor_id]) {
while (head != requests.end() && head->first == next_send_position_[actor_id]) {
auto request = std::move(head->second);
head = requests.erase(head);
@@ -120,11 +123,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(
rpc::CoreWorkerClientInterface &client, std::unique_ptr<rpc::PushTaskRequest> request,
const ActorID &actor_id, const TaskID &task_id, int num_returns) {
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id;
auto task_number = GetRequestNumber(request);
RAY_CHECK(next_sequence_number_[actor_id] == task_number)
<< "Counter was " << task_number << " expected " << next_sequence_number_[actor_id];
next_sequence_number_[actor_id]++;
next_send_position_[actor_id]++;
RAY_CHECK_OK(client.PushActorTask(
std::move(request),
@@ -138,7 +137,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(
}
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
std::unique_lock<std::mutex> guard(mutex_);
absl::MutexLock lock(&mu_);
auto iter = actor_states_.find(actor_id);
return (iter != actor_states_.end() && iter->second.state_ == ActorTableData::ALIVE);
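
To summarize the pattern the changes above implement: a process-local send position is reserved under `mu_` before dependency resolution starts, the request is queued under that position once resolution completes, and the queue is drained only while the head matches `next_send_position_`. The sketch below is a simplified, standalone rendering of that pattern, not Ray code; the `Submitter` type and the callback plumbing are invented for the example:

```cpp
// Simplified sketch: reserving the send position before async resolution
// preserves submission order even when resolution callbacks finish out of order.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <vector>

struct Submitter {
  std::mutex mu;
  std::map<int64_t, std::string> pending;    // keyed by send position
  int64_t next_send_position_to_assign = 0;  // next position to hand out
  int64_t next_send_position = 0;            // next position to actually send

  void Submit(const std::string &task,
              const std::function<void(std::function<void()>)> &resolve_async) {
    int64_t send_pos;
    {
      std::lock_guard<std::mutex> lock(mu);
      send_pos = next_send_position_to_assign++;  // fix the order up front
    }
    resolve_async([this, send_pos, task]() {
      std::lock_guard<std::mutex> lock(mu);
      pending.emplace(send_pos, task);
      // Drain in order: only send while the head matches the next position.
      auto head = pending.begin();
      while (head != pending.end() && head->first == next_send_position) {
        std::cout << "push " << head->second << "\n";
        head = pending.erase(head);
        ++next_send_position;
      }
    });
  }
};

int main() {
  Submitter s;
  std::vector<std::function<void()>> callbacks;
  // "Resolution" that completes later, possibly out of order.
  auto defer = [&](std::function<void()> cb) { callbacks.push_back(std::move(cb)); };

  s.Submit("task 0", defer);
  s.Submit("task 1", defer);
  // Task 1 resolves first, but it is held in the pending map until task 0
  // has been pushed, so the push order is still 0 then 1.
  callbacks[1]();
  callbacks[0]();
  return 0;
}
```

Because the position is fixed before the asynchronous step, out-of-order resolution can only delay a send; it can no longer reorder it or leave the queue stuck.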


@@ -79,7 +79,8 @@ class CoreWorkerDirectActorTaskSubmitter {
/// \return Void.
void PushActorTask(rpc::CoreWorkerClientInterface &client,
std::unique_ptr<rpc::PushTaskRequest> request,
const ActorID &actor_id, const TaskID &task_id, int num_returns);
const ActorID &actor_id, const TaskID &task_id, int num_returns)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Send all pending tasks for an actor.
/// Note that this function doesn't take the lock; the caller is expected to hold
@@ -87,7 +88,7 @@ class CoreWorkerDirectActorTaskSubmitter {
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void SendPendingTasks(const ActorID &actor_id);
void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Whether the specified actor is alive.
///
@@ -99,27 +100,32 @@ class CoreWorkerDirectActorTaskSubmitter {
rpc::ClientFactoryFn client_factory_;
/// Mutex to protect the various maps below.
mutable std::mutex mutex_;
mutable absl::Mutex mu_;
/// Map from actor id to actor state. This only includes actors that we send tasks to.
std::unordered_map<ActorID, ActorStateData> actor_states_;
absl::flat_hash_map<ActorID, ActorStateData> actor_states_ GUARDED_BY(mu_);
/// Map from actor id to rpc client. This only includes actors that we send tasks to.
/// We use shared_ptr to enable shared_from_this for pending client callbacks.
///
/// TODO(zhijunfu): this will be moved into `actor_states_` later when we can
/// subscribe updates for a specific actor.
std::unordered_map<ActorID, std::shared_ptr<rpc::CoreWorkerClientInterface>>
rpc_clients_;
absl::flat_hash_map<ActorID, std::shared_ptr<rpc::CoreWorkerClientInterface>>
rpc_clients_ GUARDED_BY(mu_);
/// Map from actor id to the actor's pending requests. Each actor's requests
/// are ordered by the task number in the request.
absl::flat_hash_map<ActorID, std::map<int64_t, std::unique_ptr<rpc::PushTaskRequest>>>
pending_requests_;
pending_requests_ GUARDED_BY(mu_);
/// Map from actor id to the sequence number of the next task to send to that
/// actor.
std::unordered_map<ActorID, int64_t> next_sequence_number_;
/// Map from actor id to the send position of the next task to queue for send
/// for that actor. This is always greater than or equal to next_send_position_.
absl::flat_hash_map<ActorID, int64_t> next_send_position_to_assign_ GUARDED_BY(mu_);
/// Map from actor id to the send position of the next task to send to that actor.
/// Note that this differs from the PushTaskRequest's sequence number in that it
/// increases monotonically in this process independently of CallerId changes.
absl::flat_hash_map<ActorID, int64_t> next_send_position_ GUARDED_BY(mu_);
/// Resolve direct call object dependencies.
LocalDependencyResolver resolver_;
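
The member declarations above pair the new `absl::Mutex` with `GUARDED_BY` / `EXCLUSIVE_LOCKS_REQUIRED` annotations, which is the main benefit of moving off `std::mutex`: clang's `-Wthread-safety` analysis can then flag any access to the guarded maps made without the lock held. A minimal sketch of that usage follows; where the unprefixed macro spellings come from (Abseil's `thread_annotations.h` or a project-local wrapper) is assumed here:

```cpp
// Minimal thread-safety-annotation sketch (not Ray code). Compile with clang
// and -Wthread-safety to have unguarded accesses reported at compile time.
#include <cstdint>
#include <map>

#include "absl/base/thread_annotations.h"  // assumed to provide GUARDED_BY etc.
#include "absl/synchronization/mutex.h"

class Submitter {
 public:
  void Submit(int64_t send_pos) {
    absl::MutexLock lock(&mu_);  // satisfies the requirements declared below
    pending_[send_pos] = true;
    Drain();
  }

 private:
  // Calling this without holding mu_ is reported by the analysis.
  void Drain() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    while (!pending_.empty() && pending_.begin()->first == next_send_position_) {
      pending_.erase(pending_.begin());
      ++next_send_position_;
    }
  }

  mutable absl::Mutex mu_;
  std::map<int64_t, bool> pending_ GUARDED_BY(mu_);
  int64_t next_send_position_ GUARDED_BY(mu_) = 0;
};
```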
@@ -307,6 +313,7 @@ class SchedulingQueue {
<< client_processed_up_to;
next_seq_no_ = client_processed_up_to + 1;
}
RAY_LOG(DEBUG) << "Enqueue " << seq_no << " cur seqno " << next_seq_no_;
pending_tasks_[seq_no] =
InboundRequest(accept_request, reject_request, dependencies.size() > 0);
if (dependencies.size() > 0) {
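
The `SchedulingQueue` hunk above is the receiver-side counterpart of the same idea: incoming requests are keyed by the caller's `seq_no`, `next_seq_no_` may fast-forward to `client_processed_up_to + 1`, and only the contiguous prefix of the map is executed. A standalone sketch of that behaviour, with simplified types and without the dependency and reject handling of the real queue:

```cpp
// Illustrative receiver-side sketch (not Ray code): execute requests in
// sequence-number order, skipping ahead when the caller reports progress.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

class SchedulingQueueSketch {
 public:
  void Add(int64_t seq_no, int64_t client_processed_up_to,
           std::function<void()> accept) {
    if (client_processed_up_to >= next_seq_no_) {
      // The caller has already moved past these numbers; skip ahead so we do
      // not wait forever for sequence numbers that will never arrive.
      next_seq_no_ = client_processed_up_to + 1;
    }
    pending_tasks_[seq_no] = std::move(accept);
    ScheduleRequests();
  }

 private:
  void ScheduleRequests() {
    // Execute only the contiguous prefix starting at next_seq_no_.
    auto it = pending_tasks_.begin();
    while (it != pending_tasks_.end() && it->first == next_seq_no_) {
      it->second();
      it = pending_tasks_.erase(it);
      ++next_seq_no_;
    }
  }

  std::map<int64_t, std::function<void()>> pending_tasks_;
  int64_t next_seq_no_ = 0;
};

int main() {
  SchedulingQueueSketch q;
  q.Add(0, -1, [] { std::cout << "execute 0\n"; });
  q.Add(2, -1, [] { std::cout << "execute 2\n"; });  // held until seq 1 arrives
  q.Add(1, -1, [] { std::cout << "execute 1\n"; });  // runs 1, then 2
  return 0;
}
```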