[Core] Support back pressure for actor tasks. (#20894)

Resubmits PR https://github.com/ray-project/ray/pull/19936

I've figured out that the test case `//rllib:tests/test_gpus::test_gpus_in_local_mode` failed due to a deadlock in local mode.
In local mode, if the user code submits another task during the execution of the current task, `CoreWorker::actor_task_mutex_` may cause a deadlock.
The solution is quite simple: release the lock before executing the task in local mode.
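
In miniature, the failing pattern looks like this (a condensed Python sketch of the `test_local_mode_deadlock` case added below; the `Inner`/`Outer` names are illustrative):

```python
import ray

ray.init(local_mode=True)

@ray.remote
class Inner:
    def ping(self):
        return 1

@ray.remote
class Outer:
    def call_inner(self, inner):
        # In local mode this nested submission re-enters
        # CoreWorker::SubmitActorTask on the same thread; if
        # actor_task_mutex_ were still held, it would deadlock.
        return ray.get(inner.ping.remote())

inner = Inner.remote()
outer = Outer.remote()
# Before the fix this call hangs; with the lock released before
# local execution it returns normally.
assert ray.get(outer.call_inner.remote(inner)) == 1
```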

In the commit 7c2f61c76c:
1. Released the lock in local mode to fix the bug. @scv119
2. Added `test_local_mode_deadlock` to cover the case. @rkooo567
3. Made a trivial change in `rllib/tests/test_gpus.py` so that `RAY_CI_RLLIB_DIRECTLY_AFFECTED` takes effect.
Commit 72bd2d7e09 (parent f5dfe6c158) by WanXing Wang, 2021-12-14 15:56:07 +08:00, committed by GitHub.
37 changed files with 633 additions and 147 deletions
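
For context, the user-facing behavior this commit adds looks roughly as follows. This is a minimal Python sketch based on the `max_pending_calls` option and the `PendingCallsLimitExceeded` exception introduced in this diff; the `SlowActor` class is illustrative:

```python
import time
import ray
from ray.exceptions import PendingCallsLimitExceeded

ray.init()

@ray.remote
class SlowActor:
    def work(self):
        time.sleep(60)

# Allow at most 10 pending calls on this handle; -1 (the default)
# means unlimited.
actor = SlowActor.options(max_pending_calls=10).remote()

refs = [actor.work.remote() for _ in range(10)]  # fills the queue
try:
    actor.work.remote()  # the 11th call exceeds the limit
except PendingCallsLimitExceeded:
    print("back pressure: too many pending calls on this handle")
```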

View file

@ -46,10 +46,13 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
TaskOptions options{};
options.name = call_options.name;
options.resources = call_options.resources;
std::vector<rpc::ObjectReference> return_refs;
std::optional<std::vector<rpc::ObjectReference>> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
return_refs = core_worker.SubmitActorTask(
invocation.actor_id, BuildRayFunction(invocation), invocation.args, options);
if (!return_refs.has_value()) {
return ObjectID::Nil();
}
} else {
BundleID bundle_id = GetBundleID(call_options);
rpc::SchedulingStrategy scheduling_strategy;
@ -67,7 +70,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
options, 1, false, scheduling_strategy, "");
}
std::vector<ObjectID> return_ids;
for (const auto &ref : return_refs) {
for (const auto &ref : return_refs.value()) {
return_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}
return return_ids[0];

View file

@ -73,10 +73,10 @@ public class BaseActorCreator<T extends BaseActorCreator> {
/**
* Set the max number of concurrent calls to allow for this actor.
*
* <p>The max concurrency defaults to 1 for threaded execution. Note that the execution order is
* not guaranteed when {@code max_concurrency > 1}.
* <p>The maximum concurrency defaults to 1 for threaded execution. Note that the execution order
* is not guaranteed when {@code max_concurrency > 1}.
*
* @param maxConcurrency The max number of concurrent calls to allow for this actor.
* @param maxConcurrency The maximum number of concurrent calls to allow for this actor.
* @return self
* @see ActorCreationOptions.Builder#setMaxConcurrency(int)
*/
@ -85,6 +85,19 @@ public class BaseActorCreator<T extends BaseActorCreator> {
return self();
}
/**
* Set the max number of pending calls allowed on the actor handle. When this value is exceeded,
* io.ray.runtime.exception.PendingCallsLimitExceededException will be thrown for further tasks.
* Note that this limit is counted per handle. -1 means that the number of pending calls is unlimited.
*
* @param maxPendingCalls The maximum number of pending calls for this actor.
* @return self
*/
public T setMaxPendingCalls(int maxPendingCalls) {
builder.setMaxPendingCalls(maxPendingCalls);
return self();
}
/**
* Set the placement group to place this actor in.
*

View file

@ -17,6 +17,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public final PlacementGroup group;
public final int bundleIndex;
public final List<ConcurrencyGroup> concurrencyGroups;
public final int maxPendingCalls;
private ActorCreationOptions(
String name,
@ -26,7 +27,8 @@ public class ActorCreationOptions extends BaseTaskOptions {
int maxConcurrency,
PlacementGroup group,
int bundleIndex,
List<ConcurrencyGroup> concurrencyGroups) {
List<ConcurrencyGroup> concurrencyGroups,
int maxPendingCalls) {
super(resources);
this.name = name;
this.maxRestarts = maxRestarts;
@ -35,6 +37,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
this.group = group;
this.bundleIndex = bundleIndex;
this.concurrencyGroups = concurrencyGroups;
this.maxPendingCalls = maxPendingCalls;
}
/** The inner class for building ActorCreationOptions. */
@ -47,6 +50,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
private PlacementGroup group;
private int bundleIndex;
private List<ConcurrencyGroup> concurrencyGroups = new ArrayList<>();
private int maxPendingCalls = -1;
/**
* Set the actor name of a named actor. This named actor is accessible in this namespace by this
@ -132,6 +136,24 @@ public class ActorCreationOptions extends BaseTaskOptions {
return this;
}
/**
* Set the max number of pending calls allowed on the actor handle. When this value is exceeded,
* io.ray.runtime.exception.PendingCallsLimitExceededException will be thrown for further tasks.
* Note that this limit is counted per handle. -1 means that the number of pending calls is unlimited.
*
* @param maxPendingCalls The maximum number of pending calls for this actor.
* @return self
*/
public Builder setMaxPendingCalls(int maxPendingCalls) {
if (maxPendingCalls < -1 || maxPendingCalls == 0) {
throw new IllegalArgumentException(
"maxPendingCalls must be greater than 0, or -1 to disable.");
}
this.maxPendingCalls = maxPendingCalls;
return this;
}
/**
* Set the placement group to place this actor in.
*
@ -154,7 +176,8 @@ public class ActorCreationOptions extends BaseTaskOptions {
maxConcurrency,
group,
bundleIndex,
concurrencyGroups);
concurrencyGroups,
maxPendingCalls);
}
/** Set the concurrency groups for this actor. */

View file

@ -0,0 +1,17 @@
package io.ray.runtime.exception;
import io.ray.api.id.ActorId;
/**
* Indicates that back pressure occurred when submitting an actor task.
*
* <p>This exception typically happens because the caller submits calls to the actor too frequently.
*/
public class PendingCallsLimitExceededException extends RayException {
public ActorId actorId;
public PendingCallsLimitExceededException(String message) {
super(message);
}
}

View file

@ -295,7 +295,8 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
ActorCreationTaskSpec.Builder actorCreationTaskSpecBuilder =
ActorCreationTaskSpec.newBuilder()
.setActorId(ByteString.copyFrom(actorId.toByteBuffer()))
.setMaxConcurrency(options.maxConcurrency);
.setMaxConcurrency(options.maxConcurrency)
.setMaxPendingCalls(options.maxPendingCalls);
appendConcurrencyGroupsBuilder(actorCreationTaskSpecBuilder, options);
TaskSpec taskSpec =
getTaskSpecBuilder(TaskType.ACTOR_CREATION_TASK, functionDescriptor, args)

View file

@ -0,0 +1,60 @@
package io.ray.test;
import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.api.id.ObjectId;
import io.ray.runtime.exception.PendingCallsLimitExceededException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = {"cluster"})
public class BackPressureTest extends BaseTest {
private static final Logger LOGGER = LoggerFactory.getLogger(BackPressureTest.class);
@BeforeClass
public void setupJobConfig() {}
private static final ObjectId objectId = ObjectId.fromRandom();
public static String unblockSignalActor(ActorHandle<SignalActor> signal) {
signal.task(SignalActor::sendSignal).remote().get();
return null;
}
public void testBackPressure() {
/// Set max concurrency to 11: 10 threads for executing waitSignal and 1 thread
/// for executing sendSignal.
ActorHandle<SignalActor> signalActor =
Ray.actor(SignalActor::new).setMaxConcurrency(11).setMaxPendingCalls(10).remote();
/// Ping the actor to ensure that it is already alive.
signalActor.task(SignalActor::ping).remote().get();
for (int i = 0; i < 10; i++) {
LOGGER.info("call waitSignal");
Assert.assertNotNull(signalActor.task(SignalActor::waitSignal).remote());
}
// Check that back pressure occurs.
boolean backPressure = false;
try {
LOGGER.info("call waitSignal");
signalActor.task(SignalActor::waitSignal).remote();
} catch (PendingCallsLimitExceededException e) {
LOGGER.info(e.toString());
backPressure = true;
} finally {
Assert.assertTrue(backPressure);
}
// Unblock the signal actor so that all back-pressured calls get executed.
for (int i = 0; i < 10; i++) {
Ray.task(BackPressureTest::unblockSignalActor, signalActor).remote().get();
}
// Check that subsequent calls to the actor work normally.
signalActor.task(SignalActor::ping).remote().get();
}
}

View file

@ -25,4 +25,8 @@ public class SignalActor {
public static ActorHandle<SignalActor> create() {
return Ray.actor(SignalActor::new).setMaxConcurrency(2).remote();
}
public int ping() {
return 1;
}
}

View file

@ -102,6 +102,10 @@ from ray.includes.gcs_client cimport CGcsClient
from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.optional cimport (
optional
)
import ray
from ray.exceptions import (
RayActorError,
@ -112,6 +116,7 @@ from ray.exceptions import (
GetTimeoutError,
TaskCancelledError,
AsyncioActorExit,
PendingCallsLimitExceeded,
)
from ray import external_storage
from ray.util.scheduling_strategies import (
@ -1522,6 +1527,7 @@ cdef class CoreWorker:
c_string extension_data,
c_string serialized_runtime_env,
concurrency_groups_dict,
int32_t max_pending_calls,
scheduling_strategy,
):
cdef:
@ -1561,7 +1567,8 @@ cdef class CoreWorker:
c_concurrency_groups,
# execute out of order for
# async or threaded actors.
is_asyncio or max_concurrency > 1),
is_asyncio or max_concurrency > 1,
max_pending_calls),
extension_data,
&c_actor_id))
@ -1642,7 +1649,7 @@ cdef class CoreWorker:
unordered_map[c_string, double] c_resources
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectReference] return_refs
optional[c_vector[CObjectReference]] return_refs
with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
@ -1659,8 +1666,25 @@ cdef class CoreWorker:
c_actor_id,
ray_function,
args_vector, CTaskOptions(name, num_returns, c_resources))
return VectorToObjectRefs(return_refs)
if return_refs.has_value():
return VectorToObjectRefs(return_refs.value())
else:
actor = self.get_actor_handle(actor_id)
actor_handle = (CCoreWorkerProcess.GetCoreWorker()
.GetActorHandle(c_actor_id))
raise PendingCallsLimitExceeded("The task {} could not be "
"submitted to {} because more "
"than {} tasks are queued on "
"the actor. This limit "
"can be adjusted with the "
"`max_pending_calls` actor "
"option.".format(
function_descriptor
.function_name,
repr(actor),
(dereference(actor_handle)
.MaxPendingCalls())
))
def kill_actor(self, ActorID actor_id, c_bool no_restart):
cdef:

View file

@ -489,6 +489,7 @@ class ActorClass:
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None,
runtime_env=None,
max_pending_calls=-1,
scheduling_strategy: SchedulingStrategyT = None):
"""Configures and overrides the actor instantiation parameters.
@ -547,6 +548,7 @@ class ActorClass:
placement_group_capture_child_tasks=(
placement_group_capture_child_tasks),
runtime_env=new_runtime_env,
max_pending_calls=max_pending_calls,
scheduling_strategy=scheduling_strategy)
return ActorOptionWrapper()
@ -571,6 +573,7 @@ class ActorClass:
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None,
runtime_env=None,
max_pending_calls=-1,
scheduling_strategy: SchedulingStrategyT = None):
"""Create an actor.
@ -624,6 +627,11 @@ class ActorClass:
this actor or task and its children (see
:ref:`runtime-environments` for details). This API is in beta
and may change before becoming stable.
max_pending_calls (int): Set the max number of pending calls
allowed on the actor handle. When this value is exceeded,
PendingCallsLimitExceeded will be raised for further tasks.
Note that this limit is counted per handle. -1 means that the
number of pending calls is unlimited.
scheduling_strategy: Strategy about how to schedule this actor.
Returns:
@ -671,6 +679,7 @@ class ActorClass:
placement_group_capture_child_tasks=(
placement_group_capture_child_tasks),
runtime_env=runtime_env,
max_pending_calls=max_pending_calls,
scheduling_strategy=scheduling_strategy)
worker = ray.worker.global_worker
@ -857,6 +866,7 @@ class ActorClass:
extension_data=str(actor_method_cpu),
serialized_runtime_env=new_runtime_env or "{}",
concurrency_groups_dict=concurrency_groups_dict or dict(),
max_pending_calls=max_pending_calls,
scheduling_strategy=scheduling_strategy)
actor_handle = ActorHandle(

View file

@ -454,6 +454,15 @@ class RuntimeEnvSetupError(RayError):
return "The runtime_env failed to be set up."
class PendingCallsLimitExceeded(RayError):
"""Raised when the pending actor calls exceeds `max_pending_calls` option.
This exception could happen probably because the caller calls the callee
too frequently.
"""
pass
RAY_EXCEPTION_TYPES = [
PlasmaObjectNotAvailable,
RayError,
@ -471,5 +480,6 @@ RAY_EXCEPTION_TYPES = [
GetTimeoutError,
AsyncioActorExit,
RuntimeEnvSetupError,
PendingCallsLimitExceeded,
LocalRayletDiedError,
]
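
A caller that hits this limit typically backs off and retries. A sketch of that pattern, assuming an actor handle created with a `max_pending_calls` limit and a hypothetical `work` method:

```python
import time
from ray.exceptions import PendingCallsLimitExceeded

def submit_with_backoff(actor, *args, retries=5, base_delay_s=0.1):
    """Retry an actor call while the handle is back pressured."""
    for attempt in range(retries):
        try:
            return actor.work.remote(*args)
        except PendingCallsLimitExceeded:
            # Exponential backoff before trying again.
            time.sleep(base_delay_s * (2 ** attempt))
    raise RuntimeError("actor stayed back pressured after retries")
```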

View file

@ -269,7 +269,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
const CSchedulingStrategy &scheduling_strategy,
c_string serialized_runtime_env,
const c_vector[CConcurrencyGroup] &concurrency_groups,
c_bool execute_out_of_order)
c_bool execute_out_of_order,
int32_t max_pending_calls)
cdef cppclass CPlacementGroupCreationOptions \
"ray::core::PlacementGroupCreationOptions":

View file

@ -50,6 +50,10 @@ from ray.includes.function_descriptor cimport (
CFunctionDescriptor,
)
from ray.includes.optional cimport (
optional
)
ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \
ResourceMappingType
@ -96,6 +100,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CLanguage ActorLanguage() const
CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const
c_string ExtensionData() const
int MaxPendingCalls() const
cdef cppclass CCoreWorker "ray::core::CoreWorker":
void ConnectToRaylet()
@ -122,7 +127,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CPlacementGroupID &placement_group_id)
CRayStatus WaitPlacementGroupReady(
const CPlacementGroupID &placement_group_id, int timeout_seconds)
c_vector[CObjectReference] SubmitActorTask(
optional[c_vector[CObjectReference]] SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options)

View file

@ -0,0 +1,36 @@
# Currently Cython does not support std::optional.
# See: https://github.com/cython/cython/pull/3294
from libcpp cimport bool
cdef extern from "<optional>" namespace "std" nogil:
cdef cppclass nullopt_t:
nullopt_t()
cdef nullopt_t nullopt
cdef cppclass optional[T]:
ctypedef T value_type
optional()
optional(nullopt_t)
optional(optional&) except +
optional(T&) except +
bool has_value()
T& value()
T& value_or[U](U& default_value)
void swap(optional&)
void reset()
T& emplace(...)
T& operator*()
# T* operator->() # Not Supported
optional& operator=(optional&)
optional& operator=[U](U&)
bool operator bool()
bool operator!()
bool operator==[U](optional&, U&)
bool operator!=[U](optional&, U&)
bool operator<[U](optional&, U&)
bool operator>[U](optional&, U&)
bool operator<=[U](optional&, U&)
bool operator>=[U](optional&, U&)
optional[T] make_optional[T](...) except +

View file

@ -2,6 +2,7 @@ import pytest
import ray
import subprocess
import sys
from ray._private.test_utils import Semaphore
@pytest.fixture
@ -96,6 +97,66 @@ def test_jemalloc_env_var_propagate():
assert actual == expected
def test_back_pressure(shutdown_only_with_initialization_check):
ray.init()
signal_actor = Semaphore.options(max_pending_calls=10).remote(value=0)
try:
for i in range(10):
signal_actor.acquire.remote()
except ray.exceptions.PendingCallsLimitExceeded:
assert False
with pytest.raises(ray.exceptions.PendingCallsLimitExceeded):
signal_actor.acquire.remote()
@ray.remote
def release(signal_actor):
ray.get(signal_actor.release.remote())
return 1
# Release the signal actor through a normal (non-actor) task,
# because actor tasks will be back pressured
for i in range(10):
ray.get(release.remote(signal_actor))
# Check whether we can call the remote actor normally after
# the back pressure is released.
try:
signal_actor.acquire.remote()
except ray.exceptions.PendingCallsLimitExceeded:
assert False
ray.shutdown()
def test_local_mode_deadlock(shutdown_only_with_initialization_check):
ray.init(local_mode=True)
@ray.remote
class Foo:
def __init__(self):
pass
def ping_actor(self, actor):
actor.ping.remote()
return 3
@ray.remote
class Bar:
def __init__(self):
pass
def ping(self):
return 1
foo = Foo.remote()
bar = Bar.remote()
# Expect ping_actor call returns normally without deadlock.
assert ray.get(foo.ping_actor.remote(bar)) == 3
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

View file

@ -36,6 +36,7 @@ options = {
"placement_group_bundle_index": (),
"placement_group_capture_child_tasks": (),
"runtime_env": (),
"max_pending_calls": (),
"scheduling_strategy": (),
}

View file

@ -98,6 +98,7 @@ class TestGPUs(unittest.TestCase):
print("via ray.tune.run()")
tune.run(
"PG", config=config, stop={"training_iteration": 0})
ray.shutdown()

View file

@ -17,7 +17,8 @@ namespace core {
class MockCoreWorkerDirectActorTaskSubmitterInterface
: public CoreWorkerDirectActorTaskSubmitterInterface {
public:
MOCK_METHOD(void, AddActorQueueIfNotExists, (const ActorID &actor_id), (override));
MOCK_METHOD(void, AddActorQueueIfNotExists,
(const ActorID &actor_id, int32_t max_pending_calls), (override));
MOCK_METHOD(void, ConnectActor,
(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts),

View file

@ -25,7 +25,8 @@ rpc::ActorHandle CreateInnerActorHandle(
const ObjectID &initial_cursor, const Language actor_language,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries, const std::string &name,
const std::string &ray_namespace, bool execute_out_of_order) {
const std::string &ray_namespace, int32_t max_pending_calls,
bool execute_out_of_order) {
rpc::ActorHandle inner;
inner.set_actor_id(actor_id.Data(), actor_id.Size());
inner.set_owner_id(owner_id.Binary());
@ -40,6 +41,7 @@ rpc::ActorHandle CreateInnerActorHandle(
inner.set_name(name);
inner.set_ray_namespace(ray_namespace);
inner.set_execute_out_of_order(execute_out_of_order);
inner.set_max_pending_calls(max_pending_calls);
return inner;
}
@ -70,6 +72,8 @@ rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
actor_table_data.task_spec().actor_creation_task_spec().ray_namespace());
inner.set_execute_out_of_order(
actor_table_data.task_spec().actor_creation_task_spec().execute_out_of_order());
inner.set_max_pending_calls(
actor_table_data.task_spec().actor_creation_task_spec().max_pending_calls());
return inner;
}
} // namespace
@ -80,11 +84,12 @@ ActorHandle::ActorHandle(
const ObjectID &initial_cursor, const Language actor_language,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries, const std::string &name,
const std::string &ray_namespace, bool execute_out_of_order)
const std::string &ray_namespace, int32_t max_pending_calls,
bool execute_out_of_order)
: ActorHandle(CreateInnerActorHandle(
actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language,
actor_creation_task_function_descriptor, extension_data, max_task_retries, name,
ray_namespace, execute_out_of_order)) {}
ray_namespace, max_pending_calls, execute_out_of_order)) {}
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}

View file

@ -38,7 +38,7 @@ class ActorHandle {
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries,
const std::string &name, const std::string &ray_namespace,
bool execute_out_of_order = false);
int32_t max_pending_calls, bool execute_out_of_order = false);
/// Constructs an ActorHandle from a serialized string.
explicit ActorHandle(const std::string &serialized);
@ -89,6 +89,8 @@ class ActorHandle {
std::string GetNamespace() const;
int32_t MaxPendingCalls() const { return inner_.max_pending_calls(); }
bool ExecuteOutOfOrder() const { return inner_.execute_out_of_order(); }
private:

View file

@ -148,8 +148,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
const ObjectID &actor_creation_return_id,
bool is_self) {
reference_counter_->AddLocalReference(actor_creation_return_id, call_site);
direct_actor_submitter_->AddActorQueueIfNotExists(actor_id,
actor_handle->ExecuteOutOfOrder());
direct_actor_submitter_->AddActorQueueIfNotExists(
actor_id, actor_handle->MaxPendingCalls(), actor_handle->ExecuteOutOfOrder());
bool inserted;
{
absl::MutexLock lock(&mutex_);

View file

@ -91,7 +91,7 @@ struct ActorCreationOptions {
const rpc::SchedulingStrategy &scheduling_strategy,
const std::string &serialized_runtime_env = "{}",
const std::vector<ConcurrencyGroup> &concurrency_groups = {},
bool execute_out_of_order = false)
bool execute_out_of_order = false, int32_t max_pending_calls = -1)
: max_restarts(max_restarts),
max_task_retries(max_task_retries),
max_concurrency(max_concurrency),
@ -105,6 +105,7 @@ struct ActorCreationOptions {
serialized_runtime_env(serialized_runtime_env),
concurrency_groups(concurrency_groups.begin(), concurrency_groups.end()),
execute_out_of_order(execute_out_of_order),
max_pending_calls(max_pending_calls),
scheduling_strategy(scheduling_strategy){};
/// Maximum number of times that the actor should be restarted if it dies
@ -144,6 +145,8 @@ struct ActorCreationOptions {
const std::vector<ConcurrencyGroup> concurrency_groups;
/// Whether the actor executes tasks out of order.
const bool execute_out_of_order = false;
/// The maximum number of pending actor calls.
const int max_pending_calls = -1;
// The strategy about how to schedule this actor.
rpc::SchedulingStrategy scheduling_strategy;
};

View file

@ -354,7 +354,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
direct_actor_submitter_ = std::shared_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(*core_worker_client_pool_, *memory_store_,
*task_manager_, *actor_creator_,
on_excess_queueing));
on_excess_queueing, io_service_));
auto node_addr_factory = [this](const NodeID &node_id) {
absl::optional<rpc::Address> addr;
@ -1602,6 +1602,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
/*actor_cursor=*/ObjectID::FromIndex(actor_creation_task_id, 1),
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data,
actor_creation_options.max_task_retries, actor_name, ray_namespace,
actor_creation_options.max_pending_calls,
actor_creation_options.execute_out_of_order);
std::string serialized_actor_handle;
actor_handle->Serialize(&serialized_actor_handle);
@ -1760,9 +1761,17 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro
return status_future.get();
}
std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args, const TaskOptions &task_options) {
absl::ReleasableMutexLock lock(&actor_task_mutex_);
/// Check at the very beginning of task submission whether back pressure would occur.
if (direct_actor_submitter_->PendingTasksFull(actor_id)) {
RAY_LOG(DEBUG) << "Back pressure occurred while submitting the task to " << actor_id
<< ". " << direct_actor_submitter_->DebugString(actor_id);
return std::nullopt;
}
auto actor_handle = actor_manager_->GetActorHandle(actor_id);
// Add one for actor cursor object id for tasks.
@ -1801,17 +1810,18 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
RAY_LOG(DEBUG) << "Submitting actor task " << task_spec.DebugString();
std::vector<rpc::ObjectReference> returned_refs;
if (options_.is_local_mode) {
/// NOTE: The lock should be released in local mode. The user code may
/// submit another task while executing the current task locally, which
/// causes a deadlock. The call chain is:
/// SubmitActorTask -> python user code -> actor.xx.remote() -> SubmitActorTask
lock.Release();
returned_refs = ExecuteTaskLocalMode(task_spec, actor_id);
} else {
returned_refs = task_manager_->AddPendingTask(
rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries());
io_service_.post(
[this, task_spec]() {
RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitActorTask");
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec));
}
return returned_refs;
return {std::move(returned_refs)};
}
Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill,
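
Note that non-local submission is now posted onto the core worker's `io_service_` instead of calling `SubmitTask` inline, so the actual send happens on the event loop thread. A Python `asyncio` analogue of this hand-off (illustrative only, not Ray code):

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()

    def submit_task(spec):
        # Stands in for direct_actor_submitter_->SubmitTask(task_spec).
        print("submitting", spec)

    # Like io_service_.post(...): schedule the work on the event loop
    # thread instead of running it inline on the caller's stack.
    loop.call_soon(submit_task, "task_spec")
    await asyncio.sleep(0)  # let the posted callback run

asyncio.run(main())
```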

View file

@ -476,7 +476,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \return ObjectRefs returned by this task.
std::vector<rpc::ObjectReference> SubmitActorTask(
std::optional<std::vector<rpc::ObjectReference>> SubmitActorTask(
const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args, const TaskOptions &task_options);
@ -1206,6 +1206,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}
};
TaskCounter task_counter_;
/// Used to guarantee that submitting actor tasks is thread safe.
/// NOTE(MissiontoMars,scv119): In particular, without this mutex,
/// checking and incrementing the back pressure pending calls counter
/// is not atomic, which may lead to undercounting or overcounting.
absl::Mutex actor_task_mutex_;
};
} // namespace core
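
The NOTE above describes a classic check-then-act race. A minimal Python model of why the pending-calls check and increment must happen under one lock (a toy illustration, not Ray code):

```python
import threading

class BoundedSubmitter:
    """Toy model of the pending-calls check guarded by actor_task_mutex_."""

    def __init__(self, max_pending_calls):
        self._lock = threading.Lock()
        self._pending = 0
        self._max = max_pending_calls

    def try_submit(self):
        # Without the lock, two threads could both observe
        # _pending < _max and both increment, overshooting the limit.
        with self._lock:
            if self._max != -1 and self._pending >= self._max:
                return False  # back pressure: the caller should raise/retry
            self._pending += 1
            return True

    def on_reply(self):
        with self._lock:
            self._pending -= 1
```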

View file

@ -150,6 +150,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
uint64_t max_concurrency = 1;
auto placement_options = std::make_pair(PlacementGroupID::Nil(), -1);
std::vector<ConcurrencyGroup> concurrency_groups;
int32_t max_pending_calls = -1;
if (actorCreationOptions) {
auto java_name = (jstring)env->GetObjectField(actorCreationOptions,
@ -214,6 +215,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
return ray::ConcurrencyGroup{concurrency_group_name, max_concurrency,
native_func_descriptors};
});
max_pending_calls = static_cast<int32_t>(env->GetIntField(
actorCreationOptions, java_actor_creation_options_max_pending_calls));
}
// TODO(suquark): support passing namespace for Java. Currently
@ -243,7 +246,9 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
/*is_asyncio=*/false,
/*scheduling_strategy=*/scheduling_strategy,
/*serialized_runtime_env=*/"{}",
concurrency_groups};
concurrency_groups,
/*execute_out_of_order*/ false,
max_pending_calls};
return actor_creation_options;
}
@ -369,8 +374,21 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, ray_function, task_args, task_options);
if (!return_refs.has_value()) {
std::stringstream ss;
ss << "The task " << ray_function.GetFunctionDescriptor()->ToString()
<< " could not be submitted to " << actor_id;
ss << " because more than "
<< CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id)->MaxPendingCalls();
ss << " tasks are queued on the actor. This limit can be adjusted with the "
"`setMaxPendingCalls` actor option.";
env->ThrowNew(java_ray_pending_calls_limit_exceeded_exception_class,
ss.str().c_str());
return nullptr;
}
std::vector<ObjectID> return_ids;
for (const auto &ref : return_refs) {
for (const auto &ref : return_refs.value()) {
return_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}

View file

@ -61,6 +61,8 @@ jclass java_ray_exception_class;
jclass java_ray_intentional_system_exit_exception_class;
jclass java_ray_timeout_exception_class;
jclass java_ray_pending_calls_limit_exceeded_exception_class;
jclass java_ray_actor_exception_class;
jmethodID java_ray_exception_to_bytes;
@ -102,6 +104,7 @@ jfieldID java_actor_creation_options_max_concurrency;
jfieldID java_actor_creation_options_group;
jfieldID java_actor_creation_options_bundle_index;
jfieldID java_actor_creation_options_concurrency_groups;
jfieldID java_actor_creation_options_max_pending_calls;
jclass java_placement_group_creation_options_class;
jclass java_placement_group_creation_options_strategy_class;
@ -218,6 +221,9 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_ray_actor_exception_class =
LoadClass(env, "io/ray/runtime/exception/RayActorException");
java_ray_pending_calls_limit_exceeded_exception_class =
LoadClass(env, "io/ray/runtime/exception/PendingCallsLimitExceededException");
java_ray_exception_to_bytes =
env->GetMethodID(java_ray_exception_class, "toBytes", "()[B");
@ -304,6 +310,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
env->GetFieldID(java_actor_creation_options_class, "bundleIndex", "I");
java_actor_creation_options_concurrency_groups = env->GetFieldID(
java_actor_creation_options_class, "concurrencyGroups", "Ljava/util/List;");
java_actor_creation_options_max_pending_calls =
env->GetFieldID(java_actor_creation_options_class, "maxPendingCalls", "I");
java_concurrency_group_impl_class =
LoadClass(env, "io/ray/runtime/ConcurrencyGroupImpl");
java_concurrency_group_impl_get_function_descriptors = env->GetMethodID(

View file

@ -104,6 +104,9 @@ extern jmethodID java_system_gc;
/// RayException class
extern jclass java_ray_exception_class;
/// PendingCallsLimitExceededException class
extern jclass java_ray_pending_calls_limit_exceeded_exception_class;
/// RayIntentionalSystemExitException class
extern jclass java_ray_intentional_system_exit_exception_class;
@ -184,6 +187,8 @@ extern jfieldID java_actor_creation_options_group;
extern jfieldID java_actor_creation_options_bundle_index;
/// concurrencyGroups field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_concurrency_groups;
/// maxPendingCalls field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_pending_calls;
/// ConcurrencyGroupImpl class
extern jclass java_concurrency_group_impl_class;

View file

@ -72,12 +72,13 @@ class MockGcsClient : public gcs::GcsClient {
class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterface {
public:
MockDirectActorSubmitter() : CoreWorkerDirectActorTaskSubmitterInterface() {}
void AddActorQueueIfNotExists(const ActorID &actor_id,
void AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls,
bool execute_out_of_order = false) override {
AddActorQueueIfNotExists_(actor_id, execute_out_of_order);
AddActorQueueIfNotExists_(actor_id, max_pending_calls, execute_out_of_order);
}
MOCK_METHOD2(AddActorQueueIfNotExists_,
void(const ActorID &actor_id, bool execute_out_of_order));
MOCK_METHOD3(AddActorQueueIfNotExists_,
void(const ActorID &actor_id, int32_t max_pending_calls,
bool execute_out_of_order));
MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts));
MOCK_METHOD4(DisconnectActor, void(const ActorID &actor_id, int64_t num_restarts,
@ -146,7 +147,8 @@ class ActorManagerTest : public ::testing::Test {
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1,
false);
EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _))
.WillRepeatedly(testing::Return(true));
actor_manager_->AddNewActorHandle(move(actor_handle), call_site, caller_address,
@ -172,7 +174,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, false);
EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _))
.WillRepeatedly(testing::Return(true));
@ -185,7 +187,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
auto actor_handle2 = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, false);
// Make sure that adding the same actor id returns false.
ASSERT_FALSE(actor_manager_->AddNewActorHandle(move(actor_handle2), call_site,
caller_address, false));
@ -225,7 +227,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) {
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, false);
EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _))
.WillRepeatedly(testing::Return(true));
ObjectID outer_object_id = ObjectID::Nil();

View file

@ -216,8 +216,9 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id,
RayFunction func{Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")};
auto return_ids = ObjectRefsToIds(
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options));
auto return_ids = ObjectRefsToIds(CoreWorkerProcess::GetCoreWorker()
.SubmitActorTask(actor_id, func, args, options)
.value());
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results));
@ -304,7 +305,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
"MergeInputArgsAsOutput", "", "", ""));
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<RayObject>> results;
@ -347,7 +348,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
ASSERT_EQ(return_ids.size(), 1);
@ -410,7 +411,7 @@ void CoreWorkerTest::TestActorRestart(
"MergeInputArgsAsOutput", "", "", ""));
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
ASSERT_EQ(return_ids.size(), 1);
// Verify if it's expected data.
std::vector<std::shared_ptr<RayObject>> results;
@ -453,7 +454,7 @@ void CoreWorkerTest::TestActorFailure(
"MergeInputArgsAsOutput", "", "", ""));
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
ASSERT_EQ(return_ids.size(), 1);
all_results.emplace_back(std::make_pair(return_ids[0], buffer1));
@ -527,7 +528,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "",
0, "", "");
0, "", "", -1);
// Manually create `num_tasks` task specs, and for each of them create a
// `PushTaskRequest`, this is to batch performance of TaskSpec
@ -589,7 +590,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
"MergeInputArgsAsOutput", "", "", ""));
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value());
ASSERT_EQ(return_ids.size(), 1);
object_ids.emplace_back(return_ids[0]);
}
@ -647,7 +648,7 @@ TEST_F(ZeroNodeTest, TestActorHandle) {
ActorHandle original(
ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(),
rpc::Address(), job_id, ObjectID::FromRandom(), Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0, "", "");
FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0, "", "", -1);
std::string output;
original.Serialize(&output);
ActorHandle deserialized(output);

View file

@ -26,6 +26,8 @@ namespace core {
using namespace ::testing;
class DirectTaskTransportTest : public ::testing::Test {
public:
DirectTaskTransportTest() : io_work(io_context) {}
void SetUp() override {
gcs_client = std::make_shared<ray::gcs::MockGcsClient>();
actor_creator = std::make_unique<DefaultActorCreator>(gcs_client);
@ -35,7 +37,7 @@ class DirectTaskTransportTest : public ::testing::Test {
[&](const rpc::Address &) { return nullptr; });
memory_store = std::make_unique<CoreWorkerMemoryStore>();
actor_task_submitter = std::make_unique<CoreWorkerDirectActorTaskSubmitter>(
*client_pool, *memory_store, *task_finisher, *actor_creator, nullptr);
*client_pool, *memory_store, *task_finisher, *actor_creator, nullptr, io_context);
}
TaskSpecification GetActorTaskSpec(const ActorID &actor_id) {
@ -57,6 +59,15 @@ class DirectTaskTransportTest : public ::testing::Test {
return TaskSpecification(task_spec);
}
protected:
bool CheckSubmitTask(TaskSpecification task) {
EXPECT_TRUE(actor_task_submitter->SubmitTask(task).ok());
return 1 == io_context.poll_one();
}
protected:
instrumented_io_context io_context;
boost::asio::io_service::work io_work;
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> actor_task_submitter;
std::shared_ptr<rpc::CoreWorkerClientPool> client_pool;
std::unique_ptr<CoreWorkerMemoryStore> memory_store;
@ -81,8 +92,8 @@ TEST_F(DirectTaskTransportTest, ActorRegisterFailure) {
::testing::Return(Status::OK())));
ASSERT_TRUE(actor_creator->AsyncRegisterActor(creation_task_spec, nullptr).ok());
ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id));
actor_task_submitter->AddActorQueueIfNotExists(actor_id);
ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok());
actor_task_submitter->AddActorQueueIfNotExists(actor_id, -1);
ASSERT_TRUE(CheckSubmitTask(task_spec));
EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(
task_spec.TaskId(),
rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _));
@ -105,8 +116,8 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) {
::testing::Return(Status::OK())));
ASSERT_TRUE(actor_creator->AsyncRegisterActor(creation_task_spec, nullptr).ok());
ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id));
actor_task_submitter->AddActorQueueIfNotExists(actor_id);
ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok());
actor_task_submitter->AddActorQueueIfNotExists(actor_id, -1);
ASSERT_TRUE(CheckSubmitTask(task_spec));
EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _)).Times(0);
register_cb(Status::OK());
}

View file

@ -104,10 +104,15 @@ class DirectActorSubmitterTest : public ::testing::TestWithParam<bool> {
worker_client_(std::make_shared<MockWorkerClient>()),
store_(std::make_shared<CoreWorkerMemoryStore>()),
task_finisher_(std::make_shared<MockTaskFinisherInterface>()),
submitter_(*client_pool_, *store_, *task_finisher_, actor_creator_,
[this](const ActorID &actor_id, int64_t num_queued) {
last_queue_warning_ = num_queued;
}) {}
io_work(io_context),
submitter_(
*client_pool_, *store_, *task_finisher_, actor_creator_,
[this](const ActorID &actor_id, int64_t num_queued) {
last_queue_warning_ = num_queued;
},
io_context) {}
void TearDown() override { io_context.stop(); }
int num_clients_connected_ = 0;
int64_t last_queue_warning_ = 0;
@ -116,7 +121,15 @@ class DirectActorSubmitterTest : public ::testing::TestWithParam<bool> {
std::shared_ptr<MockWorkerClient> worker_client_;
std::shared_ptr<CoreWorkerMemoryStore> store_;
std::shared_ptr<MockTaskFinisherInterface> task_finisher_;
instrumented_io_context io_context;
boost::asio::io_service::work io_work;
CoreWorkerDirectActorTaskSubmitter submitter_;
protected:
bool CheckSubmitTask(TaskSpecification task) {
EXPECT_TRUE(submitter_.SubmitTask(task).ok());
return 1 == io_context.poll_one();
}
};
TEST_P(DirectActorSubmitterTest, TestSubmitTask) {
@ -125,17 +138,17 @@ TEST_P(DirectActorSubmitterTest, TestSubmitTask) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
auto task = CreateActorTaskHelper(actor_id, worker_id, 0);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
ASSERT_EQ(worker_client_->callbacks.size(), 0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 1);
task = CreateActorTaskHelper(actor_id, worker_id, 1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
ASSERT_EQ(worker_client_->callbacks.size(), 2);
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _))
@ -159,26 +172,26 @@ TEST_P(DirectActorSubmitterTest, TestQueueingWarning) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
submitter_.ConnectActor(actor_id, addr, 0);
for (int i = 0; i < 7500; i++) {
auto task = CreateActorTaskHelper(actor_id, worker_id, i);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
worker_client_->acked_seqno = i;
}
ASSERT_EQ(last_queue_warning_, 0);
for (int i = 7500; i < 15000; i++) {
auto task = CreateActorTaskHelper(actor_id, worker_id, i);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
/* no ack */
}
ASSERT_EQ(last_queue_warning_, 5000);
for (int i = 15000; i < 35000; i++) {
auto task = CreateActorTaskHelper(actor_id, worker_id, i);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
/* no ack */
}
ASSERT_EQ(last_queue_warning_, 20000);
@ -190,7 +203,7 @@ TEST_P(DirectActorSubmitterTest, TestDependencies) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -206,8 +219,8 @@ TEST_P(DirectActorSubmitterTest, TestDependencies) {
// Neither task can be submitted yet because they are still waiting on
// dependencies.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Put the dependencies in the store in the same order as task submission.
@ -225,7 +238,7 @@ TEST_P(DirectActorSubmitterTest, TestOutOfOrderDependencies) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -241,8 +254,8 @@ TEST_P(DirectActorSubmitterTest, TestOutOfOrderDependencies) {
// Neither task can be submitted yet because they are still waiting on
// dependencies.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_EQ(worker_client_->callbacks.size(), 0);
if (execute_out_of_order) {
@ -275,7 +288,7 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -284,8 +297,8 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) {
ObjectID obj = ObjectID::FromRandom();
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
task2.GetMutableMessage().add_args()->mutable_object_ref()->set_object_id(obj.Binary());
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_EQ(worker_client_->callbacks.size(), 1);
// Simulate the actor dying. All in-flight tasks should get failed.
@ -310,7 +323,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -321,9 +334,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) {
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2);
auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3);
// Submit three tasks.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_TRUE(CheckSubmitTask(task3));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
@ -344,7 +357,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) {
// Actor gets restarted.
addr.set_port(1);
submitter_.ConnectActor(actor_id, addr, 1);
ASSERT_TRUE(submitter_.SubmitTask(task4).ok());
ASSERT_TRUE(CheckSubmitTask(task4));
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
ASSERT_TRUE(worker_client_->callbacks.empty());
if (execute_out_of_order) {
@ -362,7 +375,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -373,9 +386,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) {
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2);
auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3);
// Submit three tasks.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_TRUE(CheckSubmitTask(task3));
// All tasks will eventually finish.
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(4);
@ -399,10 +412,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) {
addr.set_port(1);
submitter_.ConnectActor(actor_id, addr, 1);
// A new task is submitted.
ASSERT_TRUE(submitter_.SubmitTask(task4).ok());
ASSERT_TRUE(CheckSubmitTask(task4));
// Tasks 2 and 3 get retried.
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_TRUE(CheckSubmitTask(task3));
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
}
@ -422,7 +435,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -432,9 +445,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2);
// Submit three tasks.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_TRUE(CheckSubmitTask(task3));
// All tasks will eventually finish.
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(3);
@ -456,7 +469,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
if (execute_out_of_order) {
// Upon re-connect, task 2 (failed) should be retried.
// Retry task 2 manually (simulating task_finisher and SendPendingTask's behavior)
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(CheckSubmitTask(task2));
// Only task2 should be submitted.
ASSERT_EQ(worker_client_->callbacks.size(), 1);
@ -464,7 +477,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
// Upon re-connect, task 2 (failed) and 3 (completed) should be both retried.
// Retry task 2 manually (simulating task_finisher and SendPendingTask's behavior)
// Retry task 3 should happen via event loop
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(CheckSubmitTask(task2));
// Both task2 and task3 should be submitted.
ASSERT_EQ(worker_client_->callbacks.size(), 2);
@ -482,7 +495,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -491,7 +504,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
// Create four tasks for the actor.
auto task = CreateActorTaskHelper(actor_id, worker_id, 0);
// Submit a task.
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
@ -501,7 +514,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
@ -510,7 +523,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 2);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
@ -519,7 +532,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 3);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_TRUE(CheckSubmitTask(task));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(0);
ASSERT_FALSE(worker_client_->ReplyPushTask(Status::OK()));
@ -544,7 +557,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
task = CreateActorTaskHelper(actor_id, worker_id, 4);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _))
.Times(1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_FALSE(CheckSubmitTask(task));
}
TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
@ -553,7 +566,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order);
submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order);
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
@ -564,13 +577,13 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 1);
// Submit a task.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(CheckSubmitTask(task1));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
// Submit 2 tasks.
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
ASSERT_TRUE(CheckSubmitTask(task2));
ASSERT_TRUE(CheckSubmitTask(task3));
// Actor failed, but the task replies are delayed (or in some scenarios, lost).
// We should still be able to fail the inflight tasks.
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
@ -593,6 +606,46 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
}
TEST_P(DirectActorSubmitterTest, TestPendingTasks) {
auto execute_out_of_order = GetParam();
int32_t max_pending_calls = 10;
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id, max_pending_calls, execute_out_of_order);
addr.set_port(0);
// Submitting `max_pending_calls` tasks should succeed.
for (int32_t i = 0; i < max_pending_calls; i++) {
ASSERT_FALSE(submitter_.PendingTasksFull(actor_id));
auto task = CreateActorTaskHelper(actor_id, worker_id, i);
ASSERT_TRUE(CheckSubmitTask(task));
}
// Then the queue should be full.
ASSERT_TRUE(submitter_.PendingTasksFull(actor_id));
ASSERT_EQ(worker_client_->callbacks.size(), 0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 10);
// After the reply for task 0 arrives, the queue is no longer full.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), 0));
ASSERT_FALSE(submitter_.PendingTasksFull(actor_id));
// We can submit task 10, but after that the queue is full.
auto task = CreateActorTaskHelper(actor_id, worker_id, 10);
ASSERT_TRUE(CheckSubmitTask(task));
ASSERT_TRUE(submitter_.PendingTasksFull(actor_id));
// Once all the replies arrive, the queue should be empty.
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask());
}
ASSERT_FALSE(submitter_.PendingTasksFull(actor_id));
}
INSTANTIATE_TEST_SUITE_P(ExecuteOutOfOrder, DirectActorSubmitterTest,
::testing::Values(true, false));

View file

@ -26,11 +26,14 @@ namespace ray {
namespace core {
void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists(
const ActorID &actor_id, bool execute_out_of_order) {
const ActorID &actor_id, int32_t max_pending_calls, bool execute_out_of_order) {
absl::MutexLock lock(&mu_);
// No need to check whether the insert was successful, since it is possible
// for this worker to have multiple references to the same actor.
client_queues_.emplace(actor_id, ClientQueue(actor_id, execute_out_of_order));
RAY_LOG(INFO) << "Set max pending calls to " << max_pending_calls << " for actor "
<< actor_id;
client_queues_.emplace(actor_id,
ClientQueue(actor_id, execute_out_of_order, max_pending_calls));
}
void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
@ -80,32 +83,39 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
// this sequence number.
send_pos = task_spec.ActorCounter();
RAY_CHECK(queue->second.actor_submit_queue->Emplace(send_pos, task_spec));
queue->second.cur_pending_calls++;
task_queued = true;
}
}
if (task_queued) {
// We must release the lock before resolving the task dependencies since
// the callback may get called in the same call stack.
resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id](Status status) {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
auto &actor_submit_queue = queue->second.actor_submit_queue;
// Only dispatch tasks if the submitted task is still queued. The task
// may have been dequeued if the actor has since failed.
if (actor_submit_queue->Contains(send_pos)) {
if (status.ok()) {
actor_submit_queue->MarkDependencyResolved(send_pos);
SendPendingTasks(actor_id);
} else {
auto task_id = actor_submit_queue->Get(send_pos).first.TaskId();
actor_submit_queue->MarkDependencyFailed(send_pos);
task_finisher_.FailOrRetryPendingTask(
task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
}
}
});
io_service_.post(
[task_spec, send_pos, this]() mutable {
// We must release the lock before resolving the task dependencies since
// the callback may get called in the same call stack.
auto actor_id = task_spec.ActorId();
resolver_.ResolveDependencies(
task_spec, [this, send_pos, actor_id](Status status) {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
auto &actor_submit_queue = queue->second.actor_submit_queue;
// Only dispatch tasks if the submitted task is still queued. The task
// may have been dequeued if the actor has since failed.
if (actor_submit_queue->Contains(send_pos)) {
if (status.ok()) {
actor_submit_queue->MarkDependencyResolved(send_pos);
SendPendingTasks(actor_id);
} else {
auto task_id = actor_submit_queue->Get(send_pos).first.TaskId();
actor_submit_queue->MarkDependencyFailed(send_pos);
task_finisher_.FailOrRetryPendingTask(
task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
}
}
});
},
"CoreWorkerDirectActorTaskSubmitter::SubmitTask");
} else {
// Do not hold the lock while calling into task_finisher_.
task_finisher_.MarkTaskCanceled(task_id);
@ -291,17 +301,26 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
}
void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
absl::MutexLock lock(&mu_);
for (auto &queue_pair : client_queues_) {
auto &queue = queue_pair.second;
auto deque_itr = queue.wait_for_death_info_tasks.begin();
while (deque_itr != queue.wait_for_death_info_tasks.end() &&
/*timeout timestamp*/ deque_itr->first < current_time_ms()) {
auto task_spec = deque_itr->second;
task_finisher_.MarkTaskReturnObjectsFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
std::vector<TaskSpecification> task_specs;
{
absl::MutexLock lock(&mu_);
for (auto &queue_pair : client_queues_) {
auto &queue = queue_pair.second;
auto deque_itr = queue.wait_for_death_info_tasks.begin();
while (deque_itr != queue.wait_for_death_info_tasks.end() &&
/*timeout timestamp*/ deque_itr->first < current_time_ms()) {
auto &task_spec = deque_itr->second;
task_specs.push_back(task_spec);
deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
}
}
}
// Do not hold mu_ here, because MarkTaskReturnObjectsFailed may call into Python
// from C++ and can deadlock with a SubmitActorTask thread when acquiring the GIL.
for (auto &task_spec : task_specs) {
task_finisher_.MarkTaskReturnObjectsFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
}
}
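The refactor above follows a common pattern: drain the expired work while holding the mutex, then run the failure path after releasing it. A minimal standalone sketch of that pattern, with illustrative names rather than Ray's actual types:
#include <functional>
#include <mutex>
#include <vector>
// Sketch: gather items under the lock, invoke callbacks lock-free, so a
// callback that re-enters this class (or grabs the Python GIL) cannot
// deadlock against mu_.
class TimeoutChecker {
 public:
  void Add(std::function<void()> cb) {
    std::lock_guard<std::mutex> lock(mu_);
    pending_.push_back(std::move(cb));
  }
  void CheckAll() {
    std::vector<std::function<void()>> ready;
    {
      std::lock_guard<std::mutex> lock(mu_);
      ready.swap(pending_);  // move the work out while locked
    }
    for (auto &cb : ready) cb();  // run callbacks without holding mu_
  }
 private:
  std::mutex mu_;
  std::vector<std::function<void()>> pending_;
};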
void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
@ -427,14 +446,15 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
<< ", wait queue size=" << queue.wait_for_death_info_tasks.size();
}
}
if (!will_retry) {
// If we don't need to retry, mark the task as completed.
{
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
queue.actor_submit_queue->MarkTaskCompleted(actor_counter, task_spec);
if (!will_retry) {
queue.actor_submit_queue->MarkTaskCompleted(actor_counter, task_spec);
}
queue.cur_pending_calls--;
}
};
@ -468,5 +488,25 @@ bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) c
auto iter = client_queues_.find(actor_id);
return (iter != client_queues_.end() && iter->second.rpc_client);
}
bool CoreWorkerDirectActorTaskSubmitter::PendingTasksFull(const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
return it->second.max_pending_calls > 0 &&
it->second.cur_pending_calls >= it->second.max_pending_calls;
}
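PendingTasksFull is what surfaces back pressure to callers; a hedged sketch of how a submit path might consult it and return std::nullopt, matching the std::optional handling in NativeTaskSubmitter above. This assumes the surrounding Ray headers; BuildReturnRefs is a hypothetical helper, not part of the actual change:
// Illustrative only: reject submission when the actor's queue is full.
std::optional<std::vector<rpc::ObjectReference>> SubmitActorTaskSketch(
    CoreWorkerDirectActorTaskSubmitter &submitter, const ActorID &actor_id,
    TaskSpecification task_spec) {
  if (submitter.PendingTasksFull(actor_id)) {
    // Callers translate this into PendingCallsLimitExceededException.
    return std::nullopt;
  }
  RAY_CHECK_OK(submitter.SubmitTask(task_spec));
  return BuildReturnRefs(task_spec);  // hypothetical helper
}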
std::string CoreWorkerDirectActorTaskSubmitter::DebugString(
const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
std::ostringstream stream;
stream << "Submitter debug string for actor " << actor_id << " "
<< it->second.DebugString();
return stream.str();
}
} // namespace core
} // namespace ray

View file

@ -46,6 +46,7 @@ namespace core {
class CoreWorkerDirectActorTaskSubmitterInterface {
public:
virtual void AddActorQueueIfNotExists(const ActorID &actor_id,
int32_t max_pending_calls,
bool execute_out_of_order = false) = 0;
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts) = 0;
@ -65,11 +66,13 @@ class CoreWorkerDirectActorTaskSubmitter
CoreWorkerDirectActorTaskSubmitter(
rpc::CoreWorkerClientPool &core_worker_client_pool, CoreWorkerMemoryStore &store,
TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator,
std::function<void(const ActorID &, int64_t)> warn_excess_queueing)
std::function<void(const ActorID &, int64_t)> warn_excess_queueing,
instrumented_io_context &io_service)
: core_worker_client_pool_(core_worker_client_pool),
resolver_(store, task_finisher, actor_creator),
task_finisher_(task_finisher),
warn_excess_queueing_(warn_excess_queueing) {
warn_excess_queueing_(warn_excess_queueing),
io_service_(io_service) {
next_queueing_warn_threshold_ =
::RayConfig::instance().actor_excess_queueing_warn_threshold();
}
@ -80,12 +83,14 @@ class CoreWorkerDirectActorTaskSubmitter
/// not receive another reference to the same actor.
///
/// \param[in] actor_id The actor for whom to add a queue.
void AddActorQueueIfNotExists(const ActorID &actor_id,
/// \param[in] max_pending_calls The maximum number of pending calls allowed for the actor.
void AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls,
bool execute_out_of_order = false);
/// Submit a task to an actor for execution.
///
/// \param[in] task The task spec to submit.
/// \param[in] task_spec The task spec to submit.
///
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);
@ -126,9 +131,23 @@ class CoreWorkerDirectActorTaskSubmitter
/// Check timeout tasks that are waiting for Death info.
void CheckTimeoutTasks();
/// Check whether the number of pending calls has reached max_pending_calls.
///
/// \param[in] actor_id Actor id.
/// \return Whether the corresponding client queue is full.
bool PendingTasksFull(const ActorID &actor_id) const;
/// Returns a debug string for this class.
///
/// \param[in] actor_id The actor whose debug string to return.
/// \return Debug string.
std::string DebugString(const ActorID &actor_id) const;
private:
struct ClientQueue {
ClientQueue(ActorID actor_id, bool execute_out_of_order) {
ClientQueue(ActorID actor_id, bool execute_out_of_order, int32_t max_pending_calls)
: max_pending_calls(max_pending_calls) {
if (execute_out_of_order) {
actor_submit_queue = std::make_unique<OutofOrderActorSubmitQueue>(actor_id);
} else {
@ -172,6 +191,24 @@ class CoreWorkerDirectActorTaskSubmitter
/// without replies.
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
/// The maximum number of pending calls allowed, used for back pressure.
/// Once the number of in-flight requests reaches max_pending_calls, no
/// further tasks can be pushed into this ClientQueue.
const int32_t max_pending_calls;
/// The current number of pending calls in this client queue.
int32_t cur_pending_calls = 0;
/// Returns a debug string for this queue.
///
/// \return Debug string.
std::string DebugString() const {
std::ostringstream stream;
stream << "max_pending_calls=" << max_pending_calls
<< " cur_pending_calls=" << cur_pending_calls;
return stream.str();
}
};
/// Push a task to a remote actor via the given client.
@ -234,6 +271,9 @@ class CoreWorkerDirectActorTaskSubmitter
/// exceeds this quantity. This threshold is doubled each time it is hit.
int64_t next_queueing_warn_threshold_;
/// The event loop where the actor task events are handled.
instrumented_io_context &io_service_;
friend class CoreWorkerTest;
};
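Since the constructor now takes the event loop by reference, call sites need a live instrumented_io_context. A minimal wiring sketch; the local variable names are placeholders for the real collaborators owned by CoreWorker:
instrumented_io_context io_service;
CoreWorkerDirectActorTaskSubmitter submitter(
    client_pool, memory_store, task_finisher, actor_creator,
    /*warn_excess_queueing=*/[](const ActorID &actor_id, int64_t num_queued) {
      RAY_LOG(WARNING) << "Actor " << actor_id << " has " << num_queued
                       << " tasks queued.";
    },
    io_service);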

View file

@ -29,6 +29,7 @@
#include "ray/common/id.h"
#include "ray/common/ray_object.h"
#include "ray/core_worker/actor_creator.h"
#include "ray/core_worker/actor_handle.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/fiber.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"

View file

@ -373,6 +373,8 @@ message ActorCreationTaskSpec {
repeated ConcurrencyGroup concurrency_groups = 13;
// Whether to enable out of order execution.
bool execute_out_of_order = 14;
// The max number of pending actor calls.
int32 max_pending_calls = 15;
}
// Task spec of an actor task.
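The new field travels on both the creation spec (field 15 above) and the ActorHandle message below (field 13), so every deserialized handle learns the limit. A small sketch with the generated C++ protobuf accessors; as a proto3 int32, the field defaults to 0 (no back pressure) when unset:
// Sketch using the generated accessors for the message above.
rpc::ActorCreationTaskSpec spec;
spec.set_max_pending_calls(10);            // enable back pressure at 10 calls
int32_t limit = spec.max_pending_calls();  // -1 would mean unlimited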

View file

@ -64,6 +64,9 @@ message ActorHandle {
// Whether the actor supports out of order execution.
bool execute_out_of_order = 12;
// The max number of pending actor calls.
int32 max_pending_calls = 13;
}
message ReturnObject {

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "ray/streaming/streaming.h"
#include "ray/core_worker/core_worker.h"
namespace ray {
@ -44,8 +45,12 @@ std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
std::move(buffer), meta, std::vector<rpc::ObjectReference>(), true)));
std::vector<std::shared_ptr<RayObject>> results;
return CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
options);
auto result = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
peer_actor_id, function, args, options);
if (!result.has_value()) {
RAY_CHECK(false) << "Back pressure should not be enabled.";
}
return result.value();
}
} // namespace streaming
} // namespace ray

View file

@ -117,7 +117,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
"", "", "check_current_test_status", "")};
auto return_refs = driver.SubmitActorTask(actor_id, func, args, options);
auto return_ids = ObjectRefsToIds(return_refs);
auto return_ids = ObjectRefsToIds(return_refs.value());
std::vector<bool> wait_results;
std::vector<std::shared_ptr<RayObject>> results;