Support concurrent Actor calls in Ray (#6053)

Commit 8485304e83 (parent fbad6f543b), authored by Eric Liang on 2019-11-04 01:14:35 -08:00 and committed via GitHub.
21 changed files with 287 additions and 86 deletions


@ -6,6 +6,7 @@
from cpython.exc cimport PyErr_CheckSignals
import numpy
import threading
import time
import logging
import os
@ -647,28 +648,34 @@ cdef CRayStatus task_execution_handler(
with gil:
try:
# The call to execute_task should never raise an exception. If it
# does, that indicates that there was an unexpected internal error.
execute_task(task_type, ray_function, c_resources, c_args,
c_arg_reference_ids, c_return_ids,
return_results_directly, returns)
except Exception:
traceback_str = traceback.format_exc() + (
"An unexpected internal error occurred while the worker was"
"executing a task.")
ray.utils.push_error_to_driver(
ray.worker.global_worker,
"worker_crash",
traceback_str,
job_id=None)
# TODO(rkn): Note that if the worker was in the middle of executing
# a task, then any worker or driver that is blocking in a get call
# and waiting for the output of that task will hang. We need to
# address this.
sys.exit(1)
try:
# The call to execute_task should never raise an exception. If
# it does, that indicates that there was an internal error.
execute_task(task_type, ray_function, c_resources, c_args,
c_arg_reference_ids, c_return_ids,
return_results_directly, returns)
except Exception:
traceback_str = traceback.format_exc() + (
"An unexpected internal error occurred while the worker "
"was executing a task.")
ray.utils.push_error_to_driver(
ray.worker.global_worker,
"worker_crash",
traceback_str,
job_id=None)
sys.exit(1)
except SystemExit:
if isinstance(threading.current_thread(), threading._MainThread):
raise
else:
# We cannot exit from a non-main thread, so return a special
# status that tells the core worker to call sys.exit() on the
# main thread instead. This only applies to direct actor calls.
return CRayStatus.SystemExit()
return CRayStatus.OK()
cdef CRayStatus check_signals() nogil:
with gil:
try:
@ -678,6 +685,11 @@ cdef CRayStatus check_signals() nogil:
return CRayStatus.OK()
cdef void exit_handler() nogil:
with gil:
sys.exit(0)
cdef void push_objects_into_return_vector(
py_objects,
c_vector[shared_ptr[CRayObject]] *returns):
@ -733,7 +745,7 @@ cdef class CoreWorker:
raylet_socket.encode("ascii"), job_id.native(),
gcs_options.native()[0], log_dir.encode("utf-8"),
node_ip_address.encode("utf-8"), task_execution_handler,
check_signals))
check_signals, exit_handler))
def disconnect(self):
with nogil:
@ -966,6 +978,7 @@ cdef class CoreWorker:
resources,
placement_resources,
c_bool is_direct_call,
int32_t max_concurrency,
c_bool is_detached):
cdef:
CRayFunction ray_function
@ -986,9 +999,9 @@ cdef class CoreWorker:
check_status(self.core_worker.get().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, is_direct_call, c_resources,
c_placement_resources, dynamic_worker_options,
is_detached),
max_reconstructions, is_direct_call, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached),
&c_actor_id))
return ActorID(c_actor_id.Binary())
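
The SystemExit branch in task_execution_handler above is the core of this change on the Python side: sys.exit() raised on a pool thread cannot terminate the process, so the handler returns CRayStatus.SystemExit() and lets the core worker re-run the exit on the main thread. A minimal pure-Python sketch of the same pattern (run_task and exit_requests are illustrative names, not Ray APIs):

    import sys
    import threading
    import queue

    # Exits requested by worker threads are deferred to the main thread.
    exit_requests = queue.Queue()

    def run_task(task):
        try:
            task()
        except SystemExit:
            if threading.current_thread() is threading.main_thread():
                raise  # Safe to exit directly from the main thread.
            exit_requests.put(True)  # Defer instead of swallowing the exit.

    worker = threading.Thread(target=run_task, args=(lambda: sys.exit(0),))
    worker.start()
    worker.join()

    if not exit_requests.empty():
        sys.exit(0)  # The main thread performs the actual process exit.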


@ -326,6 +326,26 @@ class ActorClass(object):
"""
return self._remote(args=args, kwargs=kwargs)
def options(self, **options):
"""Convenience method for creating an actor with options.
Takes the same arguments as Actor._remote(), but returns a wrapped
actor class on which a plain .remote() can be called.
Examples:
# The following two calls are equivalent.
>>> Actor._remote(num_cpus=4, max_concurrency=8, args=[x, y])
>>> Actor.options(num_cpus=4, max_concurrency=8).remote(x, y)
"""
actor_cls = self
class ActorOptionWrapper(object):
def remote(self, *args, **kwargs):
return actor_cls._remote(args=args, kwargs=kwargs, **options)
return ActorOptionWrapper()
def _remote(self,
args=None,
kwargs=None,
@ -335,6 +355,7 @@ class ActorClass(object):
object_store_memory=None,
resources=None,
is_direct_call=None,
max_concurrency=None,
name=None,
detached=False):
"""Create an actor.
@ -354,6 +375,8 @@ class ActorClass(object):
resources: The custom resources required by the actor creation
task.
is_direct_call: Use direct actor calls.
max_concurrency: The max number of concurrent calls to allow for
this actor. This only works with direct actor calls.
name: The globally unique name for the actor.
detached: Whether the actor should be kept alive after the driver
exits.
@ -365,6 +388,16 @@ class ActorClass(object):
args = []
if kwargs is None:
kwargs = {}
if is_direct_call is None:
is_direct_call = False
if max_concurrency is None:
max_concurrency = 1
if max_concurrency > 1 and not is_direct_call:
raise ValueError(
"setting max_concurrency requires is_direct_call=True")
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")
worker = ray.worker.get_global_worker()
if worker.mode is None:
@ -452,7 +485,8 @@ class ActorClass(object):
actor_id = worker.core_worker.create_actor(
function_descriptor.get_function_descriptor_list(),
creation_args, meta.max_reconstructions, resources,
actor_placement_resources, is_direct_call, detached)
actor_placement_resources, is_direct_call, max_concurrency,
detached)
actor_handle = ActorHandle(
actor_id,
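
With the options() wrapper and the new max_concurrency argument in place, a concurrent actor is created like this (a usage sketch against the API added in this diff; the Counter class is illustrative):

    import ray

    ray.init()

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.value = 0

        def get(self):
            return self.value

    # Up to 8 calls may execute on this actor at once. Per the validation
    # above, max_concurrency > 1 requires is_direct_call=True.
    counter = Counter.options(is_direct_call=True, max_concurrency=8).remote()
    print(ray.get([counter.get.remote() for _ in range(100)]))

Note that with max_concurrency > 1, methods of the same actor can run simultaneously, so any mutable actor state needs its own synchronization.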


@ -2,7 +2,7 @@ from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t, uint64_t, int64_t
from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector as c_vector
@ -76,6 +76,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
@staticmethod
CRayStatus Interrupted(const c_string &msg)
@staticmethod
CRayStatus SystemExit()
c_bool ok()
c_bool IsOutOfMemory()
c_bool IsKeyError()
@ -205,6 +208,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
CActorCreationOptions()
CActorCreationOptions(
uint64_t max_reconstructions, c_bool is_direct_call,
int32_t max_concurrency,
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options,


@ -64,7 +64,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[CObjectID] &return_ids,
c_bool is_direct_call,
c_vector[shared_ptr[CRayObject]] *returns) nogil,
CRayStatus() nogil)
CRayStatus() nogil,
void () nogil)
void Disconnect()
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()


@ -142,14 +142,28 @@ def main():
def actor_sync():
ray.get(a.small_value.remote())
timeit("single client actor calls sync", actor_sync)
timeit("1:1 actor calls sync", actor_sync)
a = Actor.remote()
def actor_async():
ray.get([a.small_value.remote() for _ in range(1000)])
timeit("single client actor calls async", actor_async, 1000)
timeit("1:1 actor calls async", actor_async, 1000)
a = Actor.options(is_direct_call=True).remote()
def actor_concurrent():
ray.get([a.small_value.remote() for _ in range(1000)])
timeit("1:1 direct actor calls async", actor_concurrent, 1000)
a = Actor.options(is_direct_call=True, max_concurrency=16).remote()
def actor_concurrent():
ray.get([a.small_value.remote() for _ in range(1000)])
timeit("1:1 direct actor calls concurrent", actor_concurrent, 1000)
n_cpu = multiprocessing.cpu_count() // 2
a = [Actor.remote() for _ in range(n_cpu)]
@ -161,7 +175,7 @@ def main():
def actor_multi2():
ray.get([work.remote(a) for _ in range(m)])
timeit("multi client actor calls async", actor_multi2, m * n)
timeit("n:n actor calls async", actor_multi2, m * n)
n = 5000
n_cpu = multiprocessing.cpu_count() // 2
@ -171,15 +185,24 @@ def main():
def actor_async_direct():
ray.get(client.small_value_batch.remote(n))
timeit("single client direct actor calls async", actor_async_direct,
n * len(actors))
timeit("1:n direct actor calls async", actor_async_direct, n * len(actors))
clients = [Client.remote(a) for a in actors]
def actor_multi2_direct():
ray.get([c.small_value_batch.remote(n) for c in clients])
timeit("multi client direct actor calls async", actor_multi2_direct,
timeit("n:n direct actor calls async", actor_multi2_direct,
n * len(clients))
n = 1000
actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)]
clients = [Client.remote(a) for a in actors]
def actor_multi2_direct_arg():
ray.get([c.small_value_batch_arg.remote(n) for c in clients])
timeit("n:n direct actor calls with arg async", actor_multi2_direct_arg,
n * len(clients))
n = 1000
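
All of these benchmarks go through a small timeit(name, fn, multiplier) helper defined elsewhere in this file. A plausible minimal version, to make the measurements concrete (this sketch is an assumption about the harness, not the actual implementation):

    import time

    def timeit(name, fn, multiplier=1):
        fn()  # Warm up once before measuring.
        start = time.time()
        count = 0
        while time.time() - start < 1:
            fn()
            count += 1
        elapsed = time.time() - start
        print(name, "per second", round(count * multiplier / elapsed, 2))

The multiplier credits batched calls: a fn that issues 1000 actor calls per invocation counts as 1000 operations.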


@ -110,6 +110,26 @@ class RemoteFunction(object):
num_gpus=num_gpus,
resources=resources)
def options(self, **options):
"""Convenience method for executing a task with options.
Takes the same arguments as func._remote(), but returns a wrapped
function on which a plain .remote() can be called.
Examples:
# The following two calls are equivalent.
>>> func._remote(num_cpus=4, args=[x, y])
>>> func.options(num_cpus=4).remote(x, y)
"""
func_cls = self
class FuncWrapper(object):
def remote(self, *args, **kwargs):
return func_cls._remote(args=args, kwargs=kwargs, **options)
return FuncWrapper()
def _remote(self,
args=None,
kwargs=None,


@ -1318,6 +1318,32 @@ def test_direct_actor_recursive(ray_start_regular):
assert result == [x * 2 for x in range(100)]
def test_direct_actor_concurrent(ray_start_regular):
@ray.remote
class Batcher(object):
def __init__(self):
self.batch = []
self.event = threading.Event()
def add(self, x):
self.batch.append(x)
if len(self.batch) >= 3:
self.event.set()
else:
self.event.wait()
return sorted(self.batch)
a = Batcher.options(is_direct_call=True, max_concurrency=3).remote()
x1 = a.add.remote(1)
x2 = a.add.remote(2)
x3 = a.add.remote(3)
r1 = ray.get(x1)
r2 = ray.get(x2)
r3 = ray.get(x3)
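# All three add() calls must execute concurrently for this test to return:
# each call blocks on event.wait() until three values have arrived, which
# would deadlock with max_concurrency=1.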
assert r1 == [1, 2, 3]
assert r1 == r2 == r3
def test_wait(ray_start_regular):
@ray.remote
def f(delay):
@ -1516,7 +1542,6 @@ def test_profiling_api(ray_start_2_cpus):
profile_data = ray.timeline()
event_types = {event["cat"] for event in profile_data}
expected_types = [
"worker_idle",
"task",
"task:deserialize_arguments",
"task:execute",


@ -80,6 +80,7 @@ enum class StatusCode : char {
NotImplemented = 10,
RedisError = 11,
Interrupted = 12,
SystemExit = 13,
};
#if defined(__clang__)
@ -147,6 +148,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::Interrupted, msg);
}
static Status SystemExit() {
return Status(StatusCode::SystemExit, "process requested exit");
}
// Returns true iff the status indicates success.
bool ok() const { return (state_ == NULL); }
@ -161,6 +166,7 @@ class RAY_EXPORT Status {
bool IsNotImplemented() const { return code() == StatusCode::NotImplemented; }
bool IsRedisError() const { return code() == StatusCode::RedisError; }
bool IsInterrupted() const { return code() == StatusCode::Interrupted; }
bool IsSystemExit() const { return code() == StatusCode::SystemExit; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.


@ -189,6 +189,11 @@ bool TaskSpecification::IsDirectCall() const {
return message_->actor_creation_task_spec().is_direct_call();
}
int TaskSpecification::MaxActorConcurrency() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().max_concurrency();
}
bool TaskSpecification::IsDetachedActor() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().is_detached();


@ -142,6 +142,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
bool IsDirectCall() const;
int MaxActorConcurrency() const;
bool IsDetachedActor() const;
ObjectID ActorDummyObject() const;


@ -93,7 +93,7 @@ class TaskSpecBuilder {
TaskSpecBuilder &SetActorCreationTaskSpec(
const ActorID &actor_id, uint64_t max_reconstructions = 0,
const std::vector<std::string> &dynamic_worker_options = {},
bool is_direct_call = false, bool is_detached = false) {
bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false) {
message_->set_type(TaskType::ACTOR_CREATION_TASK);
auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
actor_creation_spec->set_actor_id(actor_id.Binary());
@ -102,6 +102,7 @@ class TaskSpecBuilder {
actor_creation_spec->add_dynamic_worker_options(option);
}
actor_creation_spec->set_is_direct_call(is_direct_call);
actor_creation_spec->set_max_concurrency(max_concurrency);
actor_creation_spec->set_is_detached(is_detached);
return *this;
}


@ -97,12 +97,14 @@ struct TaskOptions {
struct ActorCreationOptions {
ActorCreationOptions() {}
ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call,
int max_concurrency,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options,
bool is_detached)
: max_reconstructions(max_reconstructions),
is_direct_call(is_direct_call),
max_concurrency(max_concurrency),
resources(resources),
placement_resources(placement_resources),
dynamic_worker_options(dynamic_worker_options),
@ -114,6 +116,8 @@ struct ActorCreationOptions {
/// Whether to use direct actor call. If this is set to true, callers will submit
/// tasks directly to the created actor without going through raylet.
const bool is_direct_call = false;
/// The max number of concurrent tasks to run on this direct call actor.
const int max_concurrency = 1;
/// Resources required by the whole lifetime of this actor.
const std::unordered_map<std::string, double> resources;
/// Resources required to place this actor.


@ -21,7 +21,6 @@ struct WorkerThreadContext {
void SetCurrentTaskId(const TaskID &task_id) { current_task_id_ = task_id; }
void SetCurrentTask(const TaskSpecification &task_spec) {
RAY_CHECK(current_task_id_.IsNil());
RAY_CHECK(task_index_ == 0);
RAY_CHECK(put_index_ == 0);
SetCurrentTaskId(task_spec.TaskId());
@ -97,6 +96,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
RAY_CHECK(current_actor_id_.IsNil());
current_actor_id_ = task_spec.ActorCreationId();
current_actor_use_direct_call_ = task_spec.IsDirectCall();
current_actor_max_concurrency_ = task_spec.MaxActorConcurrency();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(current_job_id_ == task_spec.JobId());
RAY_CHECK(current_actor_id_ == task_spec.ActorId());
@ -122,21 +122,13 @@ bool WorkerContext::CurrentActorUseDirectCall() const {
return current_actor_use_direct_call_;
}
WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) {
// Flag used to ensure that we only print a warning about multithreading once per
// process.
static bool multithreading_warning_printed = false;
int WorkerContext::CurrentActorMaxConcurrency() const {
return current_actor_max_concurrency_;
}
WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) {
if (thread_context_ == nullptr) {
thread_context_ = std::unique_ptr<WorkerThreadContext>(new WorkerThreadContext());
if (!for_main_thread && !multithreading_warning_printed) {
std::cout << "WARNING: "
<< "Calling ray.get or ray.wait in a separate thread "
<< "may lead to deadlock if the main thread blocks on "
<< "this thread and there are not enough resources to "
<< "execute more tasks." << std::endl;
multithreading_warning_printed = true;
}
}
return *thread_context_;


@ -36,6 +36,8 @@ class WorkerContext {
bool CurrentActorUseDirectCall() const;
int CurrentActorMaxConcurrency() const;
int GetNextTaskIndex();
int GetNextPutIndex();
@ -46,6 +48,7 @@ class WorkerContext {
JobID current_job_id_;
ActorID current_actor_id_;
bool current_actor_use_direct_call_;
int current_actor_max_concurrency_;
private:
static WorkerThreadContext &GetThreadContext(bool for_main_thread = false);


@ -73,7 +73,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
const JobID &job_id, const gcs::GcsClientOptions &gcs_options,
const std::string &log_dir, const std::string &node_ip_address,
const TaskExecutionCallback &task_execution_callback,
std::function<Status()> check_signals)
std::function<Status()> check_signals,
const std::function<void()> exit_handler)
: worker_type_(worker_type),
language_(language),
log_dir_(log_dir),
@ -116,7 +117,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
execute_task));
direct_actor_task_receiver_ = std::unique_ptr<CoreWorkerDirectActorTaskReceiver>(
new CoreWorkerDirectActorTaskReceiver(worker_context_, task_execution_service_,
worker_server_, execute_task));
worker_server_, execute_task,
exit_handler));
}
// Start RPC server after all the task receivers are properly initialized.
@ -483,6 +485,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions,
actor_creation_options.dynamic_worker_options,
actor_creation_options.is_direct_call,
actor_creation_options.max_concurrency,
actor_creation_options.is_detached);
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(
@ -607,17 +610,11 @@ std::unique_ptr<worker::ProfileEvent> CoreWorker::CreateProfileEvent(
new worker::ProfileEvent(profiler_, event_type));
}
void CoreWorker::StartExecutingTasks() {
idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle"));
task_execution_service_.run();
}
void CoreWorker::StartExecutingTasks() { task_execution_service_.run(); }
Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *results) {
idle_profile_event_.reset();
RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId();
resource_ids_ = resource_ids;
worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId());
@ -670,11 +667,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
}
}
}
// TODO(zhijunfu):
// 1. Check and handle failure.
// 2. Save or load checkpoint.
idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle"));
return status;
}


@ -52,6 +52,8 @@ class CoreWorker {
/// \param[in] check_signals Language worker function to check for signals and handle
/// them. If the function returns anything but StatusOK, any long-running
/// operations in the core worker will short circuit and return that status.
/// \param[in] exit_handler Language worker function to shut down the worker in an
/// orderly fashion. We guarantee this will be run on the main thread of the worker.
///
/// NOTE(zhijunfu): the constructor will throw if a failure happens.
CoreWorker(const WorkerType worker_type, const Language language,
@ -59,7 +61,8 @@ class CoreWorker {
const JobID &job_id, const gcs::GcsClientOptions &gcs_options,
const std::string &log_dir, const std::string &node_ip_address,
const TaskExecutionCallback &task_execution_callback,
std::function<Status()> check_signals = nullptr);
std::function<Status()> check_signals = nullptr,
std::function<void()> exit_handler = nullptr);
~CoreWorker();
@ -438,10 +441,6 @@ class CoreWorker {
/// Profiler including a background thread that pushes profiling events to the GCS.
std::shared_ptr<worker::Profiler> profiler_;
/// Profile event for when the worker is idle. Should be reset when the worker
/// enters and exits an idle period.
std::unique_ptr<worker::ProfileEvent> idle_profile_event_;
/// Task execution callback.
TaskExecutionCallback task_execution_callback_;


@ -62,9 +62,9 @@ ActorID CreateActorHelper(CoreWorker &worker,
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
ActorCreationOptions actor_options{
max_reconstructions, is_direct_call, resources, resources, {},
/*is_detached*/ false};
ActorCreationOptions actor_options{max_reconstructions, is_direct_call,
/*max_concurrency*/ 1, resources, resources, {},
/*is_detached*/ false};
// Create an actor.
ActorID actor_id;
@ -492,7 +492,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
std::unordered_map<std::string, double> resources;
ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, resources,
ActorCreationOptions actor_options{0, /*is_direct_call*/ true, 1, resources, resources,
{}, /*is_detached*/ false};
const auto job_id = NextJobId();
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id,
@ -592,7 +592,6 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
auto thread_func = [&context]() {
// Verify that task_index, put_index are thread-local.
ASSERT_TRUE(!context.GetCurrentTaskID().IsNil());
ASSERT_EQ(context.GetNextTaskIndex(), 1);
ASSERT_EQ(context.GetNextPutIndex(), 1);
};


@ -23,7 +23,7 @@ class MockWaiter : public DependencyWaiter {
TEST(SchedulingQueueTest, TestInOrder) {
boost::asio::io_service io_service;
MockWaiter waiter;
SchedulingQueue queue(io_service, waiter, 0);
SchedulingQueue queue(io_service, waiter, nullptr, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@ -43,7 +43,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) {
ObjectID obj3 = ObjectID::FromRandom();
boost::asio::io_service io_service;
MockWaiter waiter;
SchedulingQueue queue(io_service, waiter, 0);
SchedulingQueue queue(io_service, waiter, nullptr, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@ -68,7 +68,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) {
ObjectID obj1 = ObjectID::FromRandom();
boost::asio::io_service io_service;
MockWaiter waiter;
SchedulingQueue queue(io_service, waiter, 0);
SchedulingQueue queue(io_service, waiter, nullptr, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@ -85,7 +85,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) {
TEST(SchedulingQueueTest, TestOutOfOrder) {
boost::asio::io_service io_service;
MockWaiter waiter;
SchedulingQueue queue(io_service, waiter, 0);
SchedulingQueue queue(io_service, waiter, nullptr, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@ -102,7 +102,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) {
TEST(SchedulingQueueTest, TestSeqWaitTimeout) {
boost::asio::io_service io_service;
MockWaiter waiter;
SchedulingQueue queue(io_service, waiter, 0);
SchedulingQueue queue(io_service, waiter, nullptr, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@ -124,7 +124,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) {
TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
boost::asio::io_service io_service;
MockWaiter waiter;
SchedulingQueue queue(io_service, waiter, 0);
SchedulingQueue queue(io_service, waiter, nullptr, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };


@ -146,7 +146,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushTask(
if (!status.ok()) {
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
RAY_LOG(DEBUG) << "Task failed with error: " << status;
RAY_LOG(INFO) << "Task failed with error: " << status;
TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED);
return;
}
@ -200,10 +200,12 @@ bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) c
CoreWorkerDirectActorTaskReceiver::CoreWorkerDirectActorTaskReceiver(
WorkerContext &worker_context, boost::asio::io_service &main_io_service,
rpc::GrpcServer &server, const TaskHandler &task_handler)
rpc::GrpcServer &server, const TaskHandler &task_handler,
const std::function<void()> &exit_handler)
: worker_context_(worker_context),
task_service_(main_io_service, *this),
task_handler_(task_handler),
exit_handler_(exit_handler),
task_main_io_service_(main_io_service) {
server.RegisterService(task_service_);
}
@ -212,6 +214,15 @@ void CoreWorkerDirectActorTaskReceiver::Init(RayletClient &raylet_client) {
waiter_.reset(new DependencyWaiterImpl(raylet_client));
}
void CoreWorkerDirectActorTaskReceiver::SetMaxActorConcurrency(int max_concurrency) {
if (max_concurrency != max_concurrency_) {
RAY_LOG(INFO) << "Creating new thread pool of size " << max_concurrency;
RAY_CHECK(pool_ == nullptr) << "Cannot change max concurrency at runtime.";
pool_.reset(new BoundedExecutor(max_concurrency));
max_concurrency_ = max_concurrency;
}
}
void CoreWorkerDirectActorTaskReceiver::HandlePushTask(
const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) {
@ -223,6 +234,7 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask(
nullptr, nullptr);
return;
}
SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency());
// TODO(ekl) resolving object dependencies is expensive and requires an IPC to
// the raylet, which is a central bottleneck. In the future, we should inline
@ -238,8 +250,8 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask(
auto it = scheduling_queue_.find(task_spec.CallerId());
if (it == scheduling_queue_.end()) {
auto result = scheduling_queue_.emplace(
task_spec.CallerId(), std::unique_ptr<SchedulingQueue>(
new SchedulingQueue(task_main_io_service_, *waiter_)));
task_spec.CallerId(), std::unique_ptr<SchedulingQueue>(new SchedulingQueue(
task_main_io_service_, *waiter_, pool_)));
it = result.first;
}
it->second->Add(
@ -256,6 +268,13 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask(
ResourceMappingType resource_ids;
std::vector<std::shared_ptr<RayObject>> results;
auto status = task_handler_(task_spec, resource_ids, &results);
if (status.IsSystemExit()) {
// In Python, sys.exit() only terminates the process when raised on the main
// thread. To work around this when tasks execute on worker threads, we
// re-post the exit event explicitly to the main thread.
task_main_io_service_.post([this]() { exit_handler_(); });
return;
}
RAY_CHECK(results.size() == num_returns) << results.size() << " " << num_returns;
for (size_t i = 0; i < results.size(); i++) {
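
The re-post above is the C++ half of the exit protocol: the worker thread returns Status::SystemExit from the task handler, and the receiver posts exit_handler_ onto the main io service, where the language worker can exit safely. A hypothetical asyncio analogue of handing work back to the main thread (illustrative only, not Ray code):

    import asyncio
    import threading

    def worker_body(loop, exit_handler):
        # The task requested an exit; hand the shutdown back to the
        # main thread's event loop instead of exiting from here.
        loop.call_soon_threadsafe(exit_handler)

    async def main():
        loop = asyncio.get_running_loop()
        done = asyncio.Event()
        threading.Thread(target=worker_body, args=(loop, done.set)).start()
        await done.wait()  # The "exit handler" runs here, on the main thread.
        print("exit handler ran on the main thread")

    asyncio.run(main())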


@ -1,11 +1,14 @@
#ifndef RAY_CORE_WORKER_DIRECT_ACTOR_TRANSPORT_H
#define RAY_CORE_WORKER_DIRECT_ACTOR_TRANSPORT_H
#include <boost/asio/thread_pool.hpp>
#include <boost/thread.hpp>
#include <list>
#include <set>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/common/ray_object.h"
#include "ray/core_worker/context.h"
@ -180,16 +183,52 @@ class DependencyWaiterImpl : public DependencyWaiter {
RayletClient &raylet_client_;
};
/// Wraps a thread-pool to block posts until the pool has free slots. This is used
/// by the SchedulingQueue to provide backpressure to clients.
class BoundedExecutor {
public:
BoundedExecutor(int max_concurrency)
: num_running_(0), max_concurrency_(max_concurrency), pool_(max_concurrency){};
/// Posts work to the pool, blocking if no free threads are available.
void PostBlocking(std::function<void()> fn) {
mu_.LockWhen(absl::Condition(this, &BoundedExecutor::ThreadsAvailable));
num_running_ += 1;
mu_.Unlock();
boost::asio::post(pool_, [this, fn]() {
fn();
absl::MutexLock lock(&mu_);
num_running_ -= 1;
});
}
private:
bool ThreadsAvailable() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return num_running_ < max_concurrency_;
}
/// Protects access to the counters below.
absl::Mutex mu_;
/// The number of currently running tasks.
int num_running_ GUARDED_BY(mu_);
/// The max number of concurrently running tasks allowed.
const int max_concurrency_;
/// The underlying thread pool for running tasks.
boost::asio::thread_pool pool_;
};
/// Used to ensure serial order of task execution per actor handle.
/// See direct_actor.proto for a description of the ordering protocol.
class SchedulingQueue {
public:
SchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter,
std::shared_ptr<BoundedExecutor> pool = nullptr,
int64_t reorder_wait_seconds = kMaxReorderWaitSeconds)
: wait_timer_(main_io_service),
waiter_(waiter),
reorder_wait_seconds_(reorder_wait_seconds),
main_thread_id_(boost::this_thread::get_id()) {}
main_thread_id_(boost::this_thread::get_id()),
pool_(pool) {}
void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request, std::function<void()> reject_request,
@ -229,7 +268,12 @@ class SchedulingQueue {
while (!pending_tasks_.empty() && pending_tasks_.begin()->first == next_seq_no_ &&
pending_tasks_.begin()->second.CanExecute()) {
auto head = pending_tasks_.begin();
head->second.Accept();
auto request = head->second;
if (pool_ != nullptr) {
pool_->PostBlocking([request]() mutable { request.Accept(); });
} else {
request.Accept();
}
pending_tasks_.erase(head);
next_seq_no_++;
}
@ -270,12 +314,15 @@ class SchedulingQueue {
std::map<int64_t, InboundRequest> pending_tasks_;
/// The next sequence number we are waiting for to arrive.
int64_t next_seq_no_ = 0;
/// Timer for waiting on dependencies.
/// Timer for waiting on dependencies. Note that this is set on the task main
/// io service, which is fine since it only ever fires if no tasks are running.
boost::asio::deadline_timer wait_timer_;
/// The id of the thread that constructed this scheduling queue.
boost::thread::id main_thread_id_;
/// Reference to the waiter owned by the task receiver.
DependencyWaiter &waiter_;
/// If concurrent calls are allowed, holds the pool for executing these tasks.
std::shared_ptr<BoundedExecutor> pool_;
friend class SchedulingQueueTest;
};
@ -289,7 +336,8 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler {
CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context,
boost::asio::io_service &main_io_service,
rpc::GrpcServer &server,
const TaskHandler &task_handler);
const TaskHandler &task_handler,
const std::function<void()> &exit_handler);
/// Initialize this receiver. This must be called prior to use.
void Init(RayletClient &client);
@ -312,6 +360,9 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler {
rpc::DirectActorCallArgWaitCompleteReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Set the max concurrency. This is set lazily when the first task arrives
/// and cannot be changed afterwards.
void SetMaxActorConcurrency(int max_concurrency);
private:
// Worker context.
WorkerContext &worker_context_;
@ -319,6 +370,8 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler {
rpc::DirectActorGrpcService task_service_;
/// The callback function to process a task.
TaskHandler task_handler_;
/// The callback function to exit the worker.
std::function<void()> exit_handler_;
/// The IO event loop for running tasks on.
boost::asio::io_service &task_main_io_service_;
/// Shared waiter for dependencies required by incoming tasks.
@ -326,6 +379,10 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler {
/// Queue of pending requests per actor handle.
/// TODO(ekl) GC these queues once the handle is no longer active.
std::unordered_map<TaskID, std::unique_ptr<SchedulingQueue>> scheduling_queue_;
/// The max number of concurrent calls to allow.
int max_concurrency_ = 1;
/// If concurrent calls are allowed, holds the pool for executing these tasks.
std::shared_ptr<BoundedExecutor> pool_;
};
} // namespace ray
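
BoundedExecutor above is what actually caps concurrency: PostBlocking() blocks the posting thread until a pool slot frees up, so excess requests queue up and exert backpressure rather than spawning unbounded work. A rough Python analogue of the same idea (a sketch, not a Ray API):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    class BoundedExecutor(object):
        def __init__(self, max_concurrency):
            self.sem = threading.Semaphore(max_concurrency)
            self.pool = ThreadPoolExecutor(max_workers=max_concurrency)

        def post_blocking(self, fn):
            # Block here while max_concurrency tasks are already running.
            self.sem.acquire()
            def run():
                try:
                    fn()
                finally:
                    self.sem.release()
            return self.pool.submit(run)

    executor = BoundedExecutor(2)
    futures = [executor.post_blocking(lambda: None) for _ in range(10)]

SchedulingQueue still dispatches in sequence-number order; only the execution of already-ordered requests fans out across the pool.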


@ -95,8 +95,10 @@ message ActorCreationTaskSpec {
repeated string dynamic_worker_options = 4;
// Whether direct actor call is used.
bool is_direct_call = 5;
// The max number of concurrent calls for direct call actors.
int32 max_concurrency = 6;
// Whether the actor is persistent
bool is_detached = 6;
bool is_detached = 7;
}
// Task spec of an actor task.