Fix bogus warning about excess queuing for async actors (#22386)

#17581 introduced a warning about excess queuing for actors. Unfortunately since Ray 1.10.0, the metric used became wrong for async actors, resulting in bogus warnings when they are called more than 5000 times, even though there are not 5000 pending tasks.

The difference between 1.9.2 and 1.10.0 is that async actors tasks skip the queue in CoreWorkerClient::PushActorTask. However CoreWorkerClient::ClientProcessedUpToSeqno uses max_finished_seq_no_ which is never updated when the queue is skipped.

I think that a better metric for the amount of tasks that are pending submissions is the size of the internal queue CoreWorkerDirectActorTaskSubmitter::inflight_task_callbacks.
This commit is contained in:
Jonathan Giannuzzi 2022-04-30 01:19:43 +01:00 committed by GitHub
parent ba7cc1803a
commit 9f88031d4f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 22 deletions

View file

@ -503,17 +503,26 @@ def test_export_large_objects(ray_start_regular, error_pubsub):
assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR
def test_warning_many_actor_tasks_queued(shutdown_only):
@pytest.mark.parametrize("sync", [True, False])
def test_warning_many_actor_tasks_queued(shutdown_only, sync: bool):
ray.init(num_cpus=1)
p = init_error_pubsub()
@ray.remote(num_cpus=1)
class Foo:
class SyncFoo:
def f(self):
import time
time.sleep(1)
@ray.remote(num_cpus=1)
class AsyncFoo:
async def f(self):
import asyncio
await asyncio.sleep(1)
Foo = SyncFoo if sync else AsyncFoo
a = Foo.remote()
[a.f.remote() for _ in range(50000)]
errors = get_error_message(p, 4, ray_constants.EXCESS_QUEUEING_WARNING)
@ -524,6 +533,29 @@ def test_warning_many_actor_tasks_queued(shutdown_only):
assert "Warning: More than 40000 tasks are pending submission to actor" in msgs[3]
@pytest.mark.parametrize("sync", [True, False])
def test_no_warning_many_actor_tasks_queued_when_sequential(shutdown_only, sync: bool):
ray.init(num_cpus=1)
p = init_error_pubsub()
@ray.remote(num_cpus=1)
class SyncFoo:
def f(self):
return 1
@ray.remote(num_cpus=1)
class AsyncFoo:
async def f(self):
return 1
Foo = SyncFoo if sync else AsyncFoo
a = Foo.remote()
for _ in range(10000):
assert ray.get(a.f.remote()) == 1
errors = get_error_message(p, 1, ray_constants.EXCESS_QUEUEING_WARNING, timeout=1)
assert len(errors) == 0
@pytest.mark.parametrize(
"ray_start_cluster_head",
[

View file

@ -164,7 +164,7 @@ RAY_CONFIG(uint64_t, actor_creation_min_retries, 3)
/// Warn if more than this many tasks are queued for submission to an actor.
/// It likely indicates a bug in the user code.
RAY_CONFIG(int64_t, actor_excess_queueing_warn_threshold, 5000)
RAY_CONFIG(uint64_t, actor_excess_queueing_warn_threshold, 5000)
/// When trying to resolve an object, the initial period that the raylet will
/// wait before contacting the object's owner to check if the object is still

View file

@ -351,7 +351,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
new raylet::RayletClient(std::move(grpc_client)));
};
auto on_excess_queueing = [this](const ActorID &actor_id, int64_t num_queued) {
auto on_excess_queueing = [this](const ActorID &actor_id, uint64_t num_queued) {
auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();

View file

@ -190,7 +190,7 @@ TEST_P(DirectActorSubmitterTest, TestQueueingWarning) {
for (int i = 0; i < 7500; i++) {
auto task = CreateActorTaskHelper(actor_id, worker_id, i);
ASSERT_TRUE(CheckSubmitTask(task));
worker_client_->acked_seqno = i;
ASSERT_TRUE(worker_client_->ReplyPushTask());
}
ASSERT_EQ(last_queue_warning_, 0);
@ -199,24 +199,14 @@ TEST_P(DirectActorSubmitterTest, TestQueueingWarning) {
ASSERT_TRUE(CheckSubmitTask(task));
/* no ack */
}
// TODO(ekl) support warning when true.
// https://github.com/ray-project/ray/issues/22278
if (execute_out_of_order) {
ASSERT_EQ(last_queue_warning_, 0);
} else {
ASSERT_EQ(last_queue_warning_, 5000);
}
ASSERT_EQ(last_queue_warning_, 5000);
for (int i = 15000; i < 35000; i++) {
auto task = CreateActorTaskHelper(actor_id, worker_id, i);
ASSERT_TRUE(CheckSubmitTask(task));
/* no ack */
}
if (execute_out_of_order) {
ASSERT_EQ(last_queue_warning_, 0);
} else {
ASSERT_EQ(last_queue_warning_, 20000);
}
ASSERT_EQ(last_queue_warning_, 20000);
}
TEST_P(DirectActorSubmitterTest, TestDependencies) {

View file

@ -391,12 +391,11 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
const auto actor_id = task_spec.ActorId();
const auto actor_counter = task_spec.ActorCounter();
const auto task_skipped = task_spec.GetMessage().skip_execution();
const auto num_queued =
request->sequence_number() - queue.rpc_client->ClientProcessedUpToSeqno();
const auto num_queued = queue.inflight_task_callbacks.size();
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id
<< " actor counter " << actor_counter << " seq no "
<< request->sequence_number() << " num queued " << num_queued;
if (num_queued >= next_queueing_warn_threshold_ && !skip_queue) {
if (num_queued >= next_queueing_warn_threshold_) {
// TODO(ekl) add more debug info about the actor name, etc.
warn_excess_queueing_(actor_id, num_queued);
next_queueing_warn_threshold_ *= 2;

View file

@ -276,11 +276,11 @@ class CoreWorkerDirectActorTaskSubmitter
TaskFinisherInterface &task_finisher_;
/// Used to warn of excessive queueing.
std::function<void(const ActorID &, int64_t num_queued)> warn_excess_queueing_;
std::function<void(const ActorID &, uint64_t num_queued)> warn_excess_queueing_;
/// Warn the next time the number of queued task submissions to an actor
/// exceeds this quantity. This threshold is doubled each time it is hit.
int64_t next_queueing_warn_threshold_;
uint64_t next_queueing_warn_threshold_;
/// The event loop where the actor task events are handled.
instrumented_io_context &io_service_;