diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index 7eead31e9..9b2f69d77 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -46,10 +46,13 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, TaskOptions options{}; options.name = call_options.name; options.resources = call_options.resources; - std::vector<rpc::ObjectReference> return_refs; + std::optional<std::vector<rpc::ObjectReference>> return_refs; if (invocation.task_type == TaskType::ACTOR_TASK) { return_refs = core_worker.SubmitActorTask( invocation.actor_id, BuildRayFunction(invocation), invocation.args, options); + if (!return_refs.has_value()) { + return ObjectID::Nil(); + } } else { BundleID bundle_id = GetBundleID(call_options); rpc::SchedulingStrategy scheduling_strategy; @@ -67,7 +70,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, options, 1, false, scheduling_strategy, ""); } std::vector<ObjectID> return_ids; - for (const auto &ref : return_refs) { + for (const auto &ref : return_refs.value()) { return_ids.push_back(ObjectID::FromBinary(ref.object_id())); } return return_ids[0]; diff --git a/java/api/src/main/java/io/ray/api/call/BaseActorCreator.java b/java/api/src/main/java/io/ray/api/call/BaseActorCreator.java index a3159571b..098b89e19 100644 --- a/java/api/src/main/java/io/ray/api/call/BaseActorCreator.java +++ b/java/api/src/main/java/io/ray/api/call/BaseActorCreator.java @@ -73,10 +73,10 @@ public class BaseActorCreator { /** * Set the max number of concurrent calls to allow for this actor. * - *

<p>The max concurrency defaults to 1 for threaded execution. Note that the execution order is - * not guaranteed when {@code max_concurrency > 1}. + * <p>The maximum concurrency defaults to 1 for threaded execution. Note that the execution order + * is not guaranteed when {@code max_concurrency > 1}. - * @param maxConcurrency The max number of concurrent calls to allow for this actor. + * @param maxConcurrency The maximum number of concurrent calls to allow for this actor. * @return self * @see ActorCreationOptions.Builder#setMaxConcurrency(int) */ @@ -85,6 +85,19 @@ public class BaseActorCreator { return self(); } + /** + * Set the max number of pending calls allowed on the actor handle. When this value is exceeded, + * io.ray.runtime.exception.PendingCallsLimitExceededException will be thrown for further tasks. + * Note that this limit is counted per handle. -1 means that the number of pending calls is + * unlimited. + * + * @param maxPendingCalls The maximum number of pending calls for this actor. + * @return self + */ + public T setMaxPendingCalls(int maxPendingCalls) { + builder.setMaxPendingCalls(maxPendingCalls); + return self(); + } + /** * Set the placement group to place this actor in. * diff --git a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java index 69410f780..f1e4f7ccf 100644 --- a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java +++ b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java @@ -17,6 +17,7 @@ public class ActorCreationOptions extends BaseTaskOptions { public final PlacementGroup group; public final int bundleIndex; public final List<ConcurrencyGroup> concurrencyGroups; + public final int maxPendingCalls; private ActorCreationOptions( String name, @@ -26,7 +27,8 @@ public class ActorCreationOptions extends BaseTaskOptions { int maxConcurrency, PlacementGroup group, int bundleIndex, - List<ConcurrencyGroup> concurrencyGroups) { + List<ConcurrencyGroup> concurrencyGroups, + int maxPendingCalls) { super(resources); this.name = name; this.maxRestarts = maxRestarts; @@ -35,6 +37,7 @@ public class ActorCreationOptions extends BaseTaskOptions { this.group = group; this.bundleIndex = bundleIndex; this.concurrencyGroups = concurrencyGroups; + this.maxPendingCalls = maxPendingCalls; } /** The inner class for building ActorCreationOptions. */ @@ -47,6 +50,7 @@ public class ActorCreationOptions extends BaseTaskOptions { private PlacementGroup group; private int bundleIndex; private List<ConcurrencyGroup> concurrencyGroups = new ArrayList<>(); + private int maxPendingCalls = -1; /** * Set the actor name of a named actor. This named actor is accessible in this namespace by this @@ -132,6 +136,24 @@ public class ActorCreationOptions extends BaseTaskOptions { return this; } + /** + * Set the max number of pending calls allowed on the actor handle. When this value is exceeded, + * io.ray.runtime.exception.PendingCallsLimitExceededException will be thrown for further tasks. + * Note that this limit is counted per handle. -1 means that the number of pending calls is + * unlimited. + * + * @param maxPendingCalls The maximum number of pending calls for this actor. + * @return self + */ + public Builder setMaxPendingCalls(int maxPendingCalls) { + if (maxPendingCalls < -1 || maxPendingCalls == 0) { + throw new IllegalArgumentException( + "maxPendingCalls must be greater than 0, or -1 to disable."); + } + + this.maxPendingCalls = maxPendingCalls; + return this; + }
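For orientation, this is how the new option is meant to be used from the Java API. The snippet below is an illustrative sketch rather than part of the patch; the `Counter` actor and its `increment` method are hypothetical:

```java
// Hypothetical actor class; only the usage of the new option matters here.
ActorHandle<Counter> counter =
    Ray.actor(Counter::new)
        .setMaxPendingCalls(100) // reject new calls once 100 are pending on this handle
        .remote();

try {
  counter.task(Counter::increment).remote();
} catch (PendingCallsLimitExceededException e) {
  // Back pressure: wait for earlier results (e.g. via ObjectRef.get()) and retry.
}
```

As enforced by the builder validation above, only positive limits or -1 (unlimited) are accepted; 0 and values below -1 throw an IllegalArgumentException.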
+ /** * Set the placement group to place this actor in. * @@ -154,7 +176,8 @@ public class ActorCreationOptions extends BaseTaskOptions { maxConcurrency, group, bundleIndex, - concurrencyGroups); + concurrencyGroups, + maxPendingCalls); } /** Set the concurrency groups for this actor. */ diff --git a/java/runtime/src/main/java/io/ray/runtime/exception/PendingCallsLimitExceededException.java b/java/runtime/src/main/java/io/ray/runtime/exception/PendingCallsLimitExceededException.java new file mode 100644 index 000000000..58f859b73 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/exception/PendingCallsLimitExceededException.java @@ -0,0 +1,17 @@ +package io.ray.runtime.exception; + +import io.ray.api.id.ActorId; + +/** + * Indicates that back pressure occurred when submitting an actor task. + * + * <p>This usually happens because the caller submits calls to the callee too frequently. + */ +public class PendingCallsLimitExceededException extends RayException { + + public ActorId actorId; + + public PendingCallsLimitExceededException(String message) { + super(message); + } +} diff --git a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java index 390f49712..f90bec390 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java @@ -295,7 +295,8 @@ public class LocalModeTaskSubmitter implements TaskSubmitter { ActorCreationTaskSpec.Builder actorCreationTaskSpecBuilder = ActorCreationTaskSpec.newBuilder() .setActorId(ByteString.copyFrom(actorId.toByteBuffer())) - .setMaxConcurrency(options.maxConcurrency); + .setMaxConcurrency(options.maxConcurrency) + .setMaxPendingCalls(options.maxPendingCalls); appendConcurrencyGroupsBuilder(actorCreationTaskSpecBuilder, options); TaskSpec taskSpec = getTaskSpecBuilder(TaskType.ACTOR_CREATION_TASK, functionDescriptor, args) diff --git a/java/test/src/main/java/io/ray/test/BackPressureTest.java b/java/test/src/main/java/io/ray/test/BackPressureTest.java new file mode 100644 index 000000000..953cf7afb --- /dev/null +++ b/java/test/src/main/java/io/ray/test/BackPressureTest.java @@ -0,0 +1,60 @@ +package io.ray.test; + +import io.ray.api.ActorHandle; +import io.ray.api.Ray; +import io.ray.api.id.ObjectId; +import io.ray.runtime.exception.PendingCallsLimitExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = {"cluster"}) +public class BackPressureTest extends BaseTest { + private static final Logger LOGGER = LoggerFactory.getLogger(BackPressureTest.class); + + @BeforeClass + public void setupJobConfig() {} + + private static final ObjectId objectId = ObjectId.fromRandom(); + + public static String unblockSignalActor(ActorHandle<SignalActor> signal) { + signal.task(SignalActor::sendSignal).remote().get(); + return null; + } + + public void testBackPressure() { + /// Set max concurrency to 11: 10 slots for executing waitSignal and 1 slot + /// for executing sendSignal. + ActorHandle<SignalActor> signalActor = + Ray.actor(SignalActor::new).setMaxConcurrency(11).setMaxPendingCalls(10).remote(); + /// Ping the actor to ensure it is already alive. + signalActor.task(SignalActor::ping).remote().get(); + + for (int i = 0; i < 10; i++) { + LOGGER.info("call waitSignal"); + Assert.assertNotNull(signalActor.task(SignalActor::waitSignal).remote()); + } + + // Check that back pressure occurs. + boolean backPressure = false; + try { + LOGGER.info("call waitSignal"); + signalActor.task(SignalActor::waitSignal).remote(); + } catch (PendingCallsLimitExceededException e) { + LOGGER.info(e.toString()); + backPressure = true; + } finally { + Assert.assertTrue(backPressure); + } + + // Unblock the signal actor so that all back-pressured calls get executed.
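+    // The pending-calls limit is counted per handle, so sending the signal through
+    // this saturated handle could itself be rejected. A normal task is used instead:
+    // the handle it receives is deserialized fresh and has its own pending-calls count.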
+ for (int i = 0; i < 10; i++) { + Ray.task(BackPressureTest::unblockSignalActor, signalActor).remote().get(); + } + + // Check that subsequent calls on the actor work normally. + signalActor.task(SignalActor::ping).remote().get(); + } +} diff --git a/java/test/src/main/java/io/ray/test/SignalActor.java b/java/test/src/main/java/io/ray/test/SignalActor.java index 0f74a0cfa..23ade2514 100644 --- a/java/test/src/main/java/io/ray/test/SignalActor.java +++ b/java/test/src/main/java/io/ray/test/SignalActor.java @@ -25,4 +25,8 @@ public class SignalActor { public static ActorHandle<SignalActor> create() { return Ray.actor(SignalActor::new).setMaxConcurrency(2).remote(); } + + public int ping() { + return 1; + } } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b53a1fee2..936be1e12 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -102,6 +102,10 @@ from ray.includes.gcs_client cimport CGcsClient from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor +from ray.includes.optional cimport ( + optional +) + import ray from ray.exceptions import ( RayActorError, @@ -112,6 +116,7 @@ from ray.exceptions import ( GetTimeoutError, TaskCancelledError, AsyncioActorExit, + PendingCallsLimitExceeded, ) from ray import external_storage from ray.util.scheduling_strategies import ( @@ -1522,6 +1527,7 @@ cdef class CoreWorker: c_string extension_data, c_string serialized_runtime_env, concurrency_groups_dict, + int32_t max_pending_calls, scheduling_strategy, ): cdef: @@ -1561,7 +1567,8 @@ cdef class CoreWorker: c_concurrency_groups, # execute out of order for # async or threaded actors. - is_asyncio or max_concurrency > 1), + is_asyncio or max_concurrency > 1, + max_pending_calls), extension_data, &c_actor_id)) @@ -1642,7 +1649,7 @@ cdef class CoreWorker: unordered_map[c_string, double] c_resources CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector - c_vector[CObjectReference] return_refs + optional[c_vector[CObjectReference]] return_refs with self.profile_event(b"submit_task"): if num_method_cpus > 0: @@ -1659,8 +1666,25 @@ cdef class CoreWorker: c_actor_id, ray_function, args_vector, CTaskOptions(name, num_returns, c_resources)) - - return VectorToObjectRefs(return_refs) + if return_refs.has_value(): + return VectorToObjectRefs(return_refs.value()) + else: + actor = self.get_actor_handle(actor_id) + actor_handle = (CCoreWorkerProcess.GetCoreWorker() + .GetActorHandle(c_actor_id)) + raise PendingCallsLimitExceeded("The task {} could not be " + "submitted to {} because more " + "than {} tasks are queued on " + "the actor. This limit " + "can be adjusted with the " + "`max_pending_calls` actor " + "option.".format( + function_descriptor + .function_name, + repr(actor), + (dereference(actor_handle) + .MaxPendingCalls()) + )) def kill_actor(self, ActorID actor_id, c_bool no_restart): cdef: diff --git a/python/ray/actor.py b/python/ray/actor.py index 65557ad3e..0e6dba757 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -489,6 +489,7 @@ class ActorClass: placement_group_bundle_index=-1, placement_group_capture_child_tasks=None, runtime_env=None, + max_pending_calls=-1, scheduling_strategy: SchedulingStrategyT = None): """Configures and overrides the actor instantiation parameters.
@@ -547,6 +548,7 @@ class ActorClass: placement_group_capture_child_tasks=( placement_group_capture_child_tasks), runtime_env=new_runtime_env, + max_pending_calls=max_pending_calls, scheduling_strategy=scheduling_strategy) return ActorOptionWrapper() @@ -571,6 +573,7 @@ class ActorClass: placement_group_bundle_index=-1, placement_group_capture_child_tasks=None, runtime_env=None, + max_pending_calls=-1, scheduling_strategy: SchedulingStrategyT = None): """Create an actor. @@ -624,6 +627,11 @@ class ActorClass: this actor or task and its children (see :ref:`runtime-environments` for details). This API is in beta and may change before becoming stable. + max_pending_calls (int): Set the max number of pending calls + allowed on the actor handle. When this value is exceeded, + PendingCallsLimitExceeded will be raised for further tasks. + Note that this limit is counted per handle. -1 means that the + number of pending calls is unlimited. scheduling_strategy: Strategy about how to schedule this actor. Returns: @@ -671,6 +679,7 @@ class ActorClass: placement_group_capture_child_tasks=( placement_group_capture_child_tasks), runtime_env=runtime_env, + max_pending_calls=max_pending_calls, scheduling_strategy=scheduling_strategy) worker = ray.worker.global_worker @@ -857,6 +866,7 @@ class ActorClass: extension_data=str(actor_method_cpu), serialized_runtime_env=new_runtime_env or "{}", concurrency_groups_dict=concurrency_groups_dict or dict(), + max_pending_calls=max_pending_calls, scheduling_strategy=scheduling_strategy) actor_handle = ActorHandle( diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 608cc17de..c6d95eb76 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -454,6 +454,15 @@ class RuntimeEnvSetupError(RayError): return "The runtime_env failed to be set up." +class PendingCallsLimitExceeded(RayError): + """Raised when the number of pending actor calls exceeds the `max_pending_calls` option. + + This usually happens because the caller submits calls to the callee + too frequently.
+ """ + pass + + RAY_EXCEPTION_TYPES = [ PlasmaObjectNotAvailable, RayError, @@ -471,5 +480,6 @@ RAY_EXCEPTION_TYPES = [ GetTimeoutError, AsyncioActorExit, RuntimeEnvSetupError, + PendingCallsLimitExceeded, LocalRayletDiedError, ] diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 9ece35e6a..a9c66e28c 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -269,7 +269,8 @@ cdef extern from "ray/core_worker/common.h" nogil: const CSchedulingStrategy &scheduling_strategy, c_string serialized_runtime_env, const c_vector[CConcurrencyGroup] &concurrency_groups, - c_bool execute_out_of_order) + c_bool execute_out_of_order, + int32_t max_pending_calls) cdef cppclass CPlacementGroupCreationOptions \ "ray::core::PlacementGroupCreationOptions": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index fc7cb5193..eb655a754 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -50,6 +50,10 @@ from ray.includes.function_descriptor cimport ( CFunctionDescriptor, ) +from ray.includes.optional cimport ( + optional +) + ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ ResourceMappingType @@ -96,6 +100,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CLanguage ActorLanguage() const CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const c_string ExtensionData() const + int MaxPendingCalls() const cdef cppclass CCoreWorker "ray::core::CoreWorker": void ConnectToRaylet() @@ -122,7 +127,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CPlacementGroupID &placement_group_id) CRayStatus WaitPlacementGroupReady( const CPlacementGroupID &placement_group_id, int timeout_seconds) - c_vector[CObjectReference] SubmitActorTask( + optional[c_vector[CObjectReference]] SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, const CTaskOptions &options) diff --git a/python/ray/includes/optional.pxd b/python/ray/includes/optional.pxd new file mode 100644 index 000000000..a3539824a --- /dev/null +++ b/python/ray/includes/optional.pxd @@ -0,0 +1,36 @@ +# Currently Cython does not support std::optional. +# See: https://github.com/cython/cython/pull/3294 +from libcpp cimport bool + +cdef extern from "<optional>" namespace "std" nogil: + cdef cppclass nullopt_t: + nullopt_t() + + cdef nullopt_t nullopt + + cdef cppclass optional[T]: + ctypedef T value_type + optional() + optional(nullopt_t) + optional(optional&) except + + optional(T&) except + + bool has_value() + T& value() + T& value_or[U](U& default_value) + void swap(optional&) + void reset() + T& emplace(...) + T& operator*() + # T* operator->() # Not Supported + optional& operator=(optional&) + optional& operator=[U](U&) + bool operator bool() + bool operator!() + bool operator==[U](optional&, U&) + bool operator!=[U](optional&, U&) + bool operator<[U](optional&, U&) + bool operator>[U](optional&, U&) + bool operator<=[U](optional&, U&) + bool operator>=[U](optional&, U&) + + optional[T] make_optional[T](...) except + diff --git a/python/ray/tests/test_advanced_4.py b/python/ray/tests/test_advanced_4.py index 0f19e1770..f688055e9 100644 --- a/python/ray/tests/test_advanced_4.py +++ b/python/ray/tests/test_advanced_4.py @@ -2,6 +2,7 @@ import pytest import ray import subprocess import sys +from ray._private.test_utils import Semaphore @pytest.fixture @@ -96,6 +97,66 @@ def test_jemalloc_env_var_propagate(): assert actual == expected +def test_back_pressure(shutdown_only_with_initialization_check): + ray.init() + + signal_actor = Semaphore.options(max_pending_calls=10).remote(value=0) + + try: + for i in range(10): + signal_actor.acquire.remote() + except ray.exceptions.PendingCallsLimitExceeded: + assert False + + with pytest.raises(ray.exceptions.PendingCallsLimitExceeded): + signal_actor.acquire.remote() + + @ray.remote + def release(signal_actor): + ray.get(signal_actor.release.remote()) + return 1 + + # Release the signal actor through a normal task, + # because actor tasks will be back pressured + for i in range(10): + ray.get(release.remote(signal_actor)) + + # Check whether we can call the remote actor normally after + # back pressure is released. + try: + signal_actor.acquire.remote() + except ray.exceptions.PendingCallsLimitExceeded: + assert False + + ray.shutdown() + + +def test_local_mode_deadlock(shutdown_only_with_initialization_check): + ray.init(local_mode=True) + + @ray.remote + class Foo: + def __init__(self): + pass + + def ping_actor(self, actor): + actor.ping.remote() + return 3 + + @ray.remote + class Bar: + def __init__(self): + pass + + def ping(self): + return 1 + + foo = Foo.remote() + bar = Bar.remote() + # Expect the ping_actor call to return normally without deadlock. + assert ray.get(foo.ping_actor.remote(bar)) == 3 + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/client/options.py b/python/ray/util/client/options.py index d84487ee0..e51da4d6a 100644 --- a/python/ray/util/client/options.py +++ b/python/ray/util/client/options.py @@ -36,6 +36,7 @@ options = { "placement_group_bundle_index": (), "placement_group_capture_child_tasks": (), "runtime_env": (), + "max_pending_calls": (), "scheduling_strategy": (), } diff --git a/rllib/tests/test_gpus.py b/rllib/tests/test_gpus.py index 8a1f24311..c5ecb08ef 100644 --- a/rllib/tests/test_gpus.py +++ b/rllib/tests/test_gpus.py @@ -98,6 +98,7 @@ class TestGPUs(unittest.TestCase): print("via ray.tune.run()") tune.run( "PG", config=config, stop={"training_iteration": 0}) + ray.shutdown() diff --git a/src/mock/ray/core_worker/transport/direct_actor_transport.h b/src/mock/ray/core_worker/transport/direct_actor_transport.h index d7b22f3f7..4fd262731 100644 --- a/src/mock/ray/core_worker/transport/direct_actor_transport.h +++ b/src/mock/ray/core_worker/transport/direct_actor_transport.h @@ -17,7 +17,8 @@ namespace core { class MockCoreWorkerDirectActorTaskSubmitterInterface : public CoreWorkerDirectActorTaskSubmitterInterface { public: - MOCK_METHOD(void, AddActorQueueIfNotExists, (const ActorID &actor_id), (override)); + MOCK_METHOD(void, AddActorQueueIfNotExists, (const ActorID &actor_id, int32_t max_pending_calls), (override)); MOCK_METHOD(void, ConnectActor, (const ActorID &actor_id, const rpc::Address &address, int64_t num_restarts), (override)) diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 665b64835..741108672 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -25,7 +25,8 @@ rpc::ActorHandle
CreateInnerActorHandle( const ObjectID &initial_cursor, const Language actor_language, const FunctionDescriptor &actor_creation_task_function_descriptor, const std::string &extension_data, int64_t max_task_retries, const std::string &name, - const std::string &ray_namespace, bool execute_out_of_order) { + const std::string &ray_namespace, int32_t max_pending_calls, + bool execute_out_of_order) { rpc::ActorHandle inner; inner.set_actor_id(actor_id.Data(), actor_id.Size()); inner.set_owner_id(owner_id.Binary()); @@ -40,6 +41,7 @@ rpc::ActorHandle CreateInnerActorHandle( inner.set_name(name); inner.set_ray_namespace(ray_namespace); inner.set_execute_out_of_order(execute_out_of_order); + inner.set_max_pending_calls(max_pending_calls); return inner; } @@ -70,6 +72,8 @@ rpc::ActorHandle CreateInnerActorHandleFromActorTableData( actor_table_data.task_spec().actor_creation_task_spec().ray_namespace()); inner.set_execute_out_of_order( actor_table_data.task_spec().actor_creation_task_spec().execute_out_of_order()); + inner.set_max_pending_calls( + actor_table_data.task_spec().actor_creation_task_spec().max_pending_calls()); return inner; } } // namespace @@ -80,11 +84,12 @@ ActorHandle::ActorHandle( const ObjectID &initial_cursor, const Language actor_language, const FunctionDescriptor &actor_creation_task_function_descriptor, const std::string &extension_data, int64_t max_task_retries, const std::string &name, - const std::string &ray_namespace, bool execute_out_of_order) + const std::string &ray_namespace, int32_t max_pending_calls, + bool execute_out_of_order) : ActorHandle(CreateInnerActorHandle( actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language, actor_creation_task_function_descriptor, extension_data, max_task_retries, name, - ray_namespace, execute_out_of_order)) {} + ray_namespace, max_pending_calls, execute_out_of_order)) {} ActorHandle::ActorHandle(const std::string &serialized) : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index 79fa12b0c..56ecd5b7e 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -38,7 +38,7 @@ class ActorHandle { const FunctionDescriptor &actor_creation_task_function_descriptor, const std::string &extension_data, int64_t max_task_retries, const std::string &name, const std::string &ray_namespace, - bool execute_out_of_order = false); + int32_t max_pending_calls, bool execute_out_of_order = false); /// Constructs an ActorHandle from a serialized string. 
explicit ActorHandle(const std::string &serialized); @@ -89,6 +89,8 @@ class ActorHandle { std::string GetNamespace() const; + int32_t MaxPendingCalls() const { return inner_.max_pending_calls(); } + bool ExecuteOutOfOrder() const { return inner_.execute_out_of_order(); } private: diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index c6cbd355a..ff18944b7 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -148,8 +148,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle, const ObjectID &actor_creation_return_id, bool is_self) { reference_counter_->AddLocalReference(actor_creation_return_id, call_site); - direct_actor_submitter_->AddActorQueueIfNotExists(actor_id, - actor_handle->ExecuteOutOfOrder()); + direct_actor_submitter_->AddActorQueueIfNotExists( + actor_id, actor_handle->MaxPendingCalls(), actor_handle->ExecuteOutOfOrder()); bool inserted; { absl::MutexLock lock(&mutex_); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index c37466b7b..db4f6eb7c 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -91,7 +91,7 @@ struct ActorCreationOptions { const rpc::SchedulingStrategy &scheduling_strategy, const std::string &serialized_runtime_env = "{}", const std::vector<ConcurrencyGroup> &concurrency_groups = {}, - bool execute_out_of_order = false) + bool execute_out_of_order = false, int32_t max_pending_calls = -1) : max_restarts(max_restarts), max_task_retries(max_task_retries), max_concurrency(max_concurrency), @@ -105,6 +105,7 @@ struct ActorCreationOptions { serialized_runtime_env(serialized_runtime_env), concurrency_groups(concurrency_groups.begin(), concurrency_groups.end()), execute_out_of_order(execute_out_of_order), + max_pending_calls(max_pending_calls), scheduling_strategy(scheduling_strategy){}; /// Maximum number of times that the actor should be restarted if it dies @@ -144,6 +145,8 @@ struct ActorCreationOptions { const std::vector<ConcurrencyGroup> concurrency_groups; /// Whether the actor executes tasks out of order. const bool execute_out_of_order = false; + /// The maximum number of pending actor calls. + const int max_pending_calls = -1; // The strategy about how to schedule this actor.
rpc::SchedulingStrategy scheduling_strategy; }; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8f0db3b2f..810ccea0a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -354,7 +354,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ direct_actor_submitter_ = std::shared_ptr<CoreWorkerDirectActorTaskSubmitter>( new CoreWorkerDirectActorTaskSubmitter(*core_worker_client_pool_, *memory_store_, *task_manager_, *actor_creator_, - on_excess_queueing)); + on_excess_queueing, io_service_)); auto node_addr_factory = [this](const NodeID &node_id) { absl::optional<rpc::Address> addr; @@ -1602,6 +1602,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, /*actor_cursor=*/ObjectID::FromIndex(actor_creation_task_id, 1), function.GetLanguage(), function.GetFunctionDescriptor(), extension_data, actor_creation_options.max_task_retries, actor_name, ray_namespace, + actor_creation_options.max_pending_calls, actor_creation_options.execute_out_of_order); std::string serialized_actor_handle; actor_handle->Serialize(&serialized_actor_handle); @@ -1760,9 +1761,17 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro return status_future.get(); } -std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask( +std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask( const ActorID &actor_id, const RayFunction &function, const std::vector<std::unique_ptr<TaskArg>> &args, const TaskOptions &task_options) { + absl::ReleasableMutexLock lock(&actor_task_mutex_); + /// Check for back pressure at the very beginning of task submission. + if (direct_actor_submitter_->PendingTasksFull(actor_id)) { + RAY_LOG(DEBUG) << "Back pressure occurred while submitting the task to " << actor_id + << ". " << direct_actor_submitter_->DebugString(actor_id); + return std::nullopt; + } + auto actor_handle = actor_manager_->GetActorHandle(actor_id); // Add one for actor cursor object id for tasks. @@ -1801,17 +1810,18 @@ std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask( RAY_LOG(DEBUG) << "Submitting actor task " << task_spec.DebugString(); std::vector<rpc::ObjectReference> returned_refs; if (options_.is_local_mode) { + /// NOTE: The lock should be released in local mode. The user code may + /// submit another task while executing the current task locally, which + /// would cause a deadlock. The call chain is: + /// SubmitActorTask -> python user code -> actor.xx.remote() -> SubmitActorTask + lock.Release(); returned_refs = ExecuteTaskLocalMode(task_spec, actor_id); } else { returned_refs = task_manager_->AddPendingTask( rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); - io_service_.post( - [this, task_spec]() { - RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec)); - }, - "CoreWorker.SubmitActorTask"); + RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec)); } - return returned_refs; + return {std::move(returned_refs)}; } Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 0cc63d744..879ecc908 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -476,7 +476,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. /// \return ObjectRefs returned by this task, or std::nullopt if the actor's pending-call queue is full (back pressure).
- std::vector<rpc::ObjectReference> SubmitActorTask( + std::optional<std::vector<rpc::ObjectReference>> SubmitActorTask( const ActorID &actor_id, const RayFunction &function, const std::vector<std::unique_ptr<TaskArg>> &args, const TaskOptions &task_options); @@ -1206,6 +1212,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } }; TaskCounter task_counter_; + + /// Used to guarantee that submitting actor tasks is thread safe. + /// NOTE(MissiontoMars,scv119): In particular, without this mutex, + /// the check and increment of the back pressure pending-calls counter + /// would not be atomic, which may lead to under- or over-counting. + absl::Mutex actor_task_mutex_; }; } // namespace core diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 29302f7a3..a2865a91d 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -150,6 +150,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env, uint64_t max_concurrency = 1; auto placement_options = std::make_pair(PlacementGroupID::Nil(), -1); std::vector<ConcurrencyGroup> concurrency_groups; + int32_t max_pending_calls = -1; if (actorCreationOptions) { auto java_name = (jstring)env->GetObjectField(actorCreationOptions, @@ -214,6 +215,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env, return ray::ConcurrencyGroup{concurrency_group_name, max_concurrency, native_func_descriptors}; }); + max_pending_calls = static_cast<int32_t>(env->GetIntField( + actorCreationOptions, java_actor_creation_options_max_pending_calls)); } // TODO(suquark): support passing namespace for Java. Currently @@ -243,7 +246,9 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env, /*is_asyncio=*/false, /*scheduling_strategy=*/scheduling_strategy, /*serialized_runtime_env=*/"{}", - concurrency_groups}; + concurrency_groups, + /*execute_out_of_order*/ false, + max_pending_calls}; return actor_creation_options; } @@ -369,8 +374,21 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, ray_function, task_args, task_options); + if (!return_refs.has_value()) { + std::stringstream ss; + ss << "The task " << ray_function.GetFunctionDescriptor()->ToString() + << " could not be submitted to " << actor_id; + ss << " because more than " + << CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id)->MaxPendingCalls(); + ss << " tasks are queued on the actor. This limit can be adjusted with the " + "`setMaxPendingCalls` actor option."; + env->ThrowNew(java_ray_pending_calls_limit_exceeded_exception_class, + ss.str().c_str()); + return nullptr; + } + std::vector<ObjectID> return_ids; - for (const auto &ref : return_refs) { + for (const auto &ref : return_refs.value()) { return_ids.push_back(ObjectID::FromBinary(ref.object_id())); } diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index 9e1600cf4..38b3697ff 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -61,6 +61,8 @@ jclass java_ray_exception_class; jclass java_ray_intentional_system_exit_exception_class; jclass java_ray_timeout_exception_class; +jclass java_ray_pending_calls_limit_exceeded_exception_class; + jclass java_ray_actor_exception_class; jmethodID java_ray_exception_to_bytes; @@ -102,6 +104,7 @@ jfieldID java_actor_creation_options_max_concurrency; jfieldID java_actor_creation_options_group; jfieldID java_actor_creation_options_bundle_index; jfieldID java_actor_creation_options_concurrency_groups; +jfieldID java_actor_creation_options_max_pending_calls; jclass java_placement_group_creation_options_class; jclass java_placement_group_creation_options_strategy_class; @@ -218,6 +221,9 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_ray_actor_exception_class = LoadClass(env, "io/ray/runtime/exception/RayActorException"); + java_ray_pending_calls_limit_exceeded_exception_class = + LoadClass(env, "io/ray/runtime/exception/PendingCallsLimitExceededException"); + java_ray_exception_to_bytes = env->GetMethodID(java_ray_exception_class, "toBytes", "()[B"); @@ -304,6 +310,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { env->GetFieldID(java_actor_creation_options_class, "bundleIndex", "I"); java_actor_creation_options_concurrency_groups = env->GetFieldID( java_actor_creation_options_class, "concurrencyGroups", "Ljava/util/List;"); + java_actor_creation_options_max_pending_calls = + env->GetFieldID(java_actor_creation_options_class, "maxPendingCalls", "I"); java_concurrency_group_impl_class = LoadClass(env, "io/ray/runtime/ConcurrencyGroupImpl"); java_concurrency_group_impl_get_function_descriptors = env->GetMethodID( diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 667896d93..0b2a14bd4 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -104,6 +104,9 @@ extern jmethodID java_system_gc; /// RayException class extern jclass java_ray_exception_class; +/// PendingCallsLimitExceededException class +extern jclass java_ray_pending_calls_limit_exceeded_exception_class; + /// RayIntentionalSystemExitException class extern jclass java_ray_intentional_system_exit_exception_class; @@ -184,6 +187,8 @@ extern jfieldID java_actor_creation_options_group; extern jfieldID java_actor_creation_options_bundle_index; /// concurrencyGroups field of ActorCreationOptions class extern jfieldID java_actor_creation_options_concurrency_groups; +/// maxPendingCalls field of ActorCreationOptions class +extern jfieldID java_actor_creation_options_max_pending_calls; /// ConcurrencyGroupImpl class extern jclass java_concurrency_group_impl_class; diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index 9465f41a0..61259833d 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -72,12 +72,13 @@ class
MockGcsClient : public gcs::GcsClient { class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterface { public: MockDirectActorSubmitter() : CoreWorkerDirectActorTaskSubmitterInterface() {} - void AddActorQueueIfNotExists(const ActorID &actor_id, + void AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls, bool execute_out_of_order = false) override { - AddActorQueueIfNotExists_(actor_id, execute_out_of_order); + AddActorQueueIfNotExists_(actor_id, max_pending_calls, execute_out_of_order); } - MOCK_METHOD2(AddActorQueueIfNotExists_, - void(const ActorID &actor_id, bool execute_out_of_order)); + MOCK_METHOD3(AddActorQueueIfNotExists_, + void(const ActorID &actor_id, int32_t max_pending_calls, + bool execute_out_of_order)); MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address, int64_t num_restarts)); MOCK_METHOD4(DisconnectActor, void(const ActorID &actor_id, int64_t num_restarts, @@ -146,7 +147,8 @@ class ActorManagerTest : public ::testing::Test { auto actor_handle = absl::make_unique<ActorHandle>( actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false); + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, + false); EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _)) .WillRepeatedly(testing::Return(true)); actor_manager_->AddNewActorHandle(move(actor_handle), call_site, caller_address, @@ -172,7 +174,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { FunctionDescriptorBuilder::BuildPython("", "", "", "")); auto actor_handle = absl::make_unique<ActorHandle>( actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false); + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, false); EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _)) .WillRepeatedly(testing::Return(true)); @@ -185,7 +187,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { auto actor_handle2 = absl::make_unique<ActorHandle>( actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false); + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, false); // Make sure adding the same actor id returns false.
ASSERT_FALSE(actor_manager_->AddNewActorHandle(move(actor_handle2), call_site, caller_address, false)); @@ -225,7 +227,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) { FunctionDescriptorBuilder::BuildPython("", "", "", "")); auto actor_handle = absl::make_unique<ActorHandle>( actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", false); + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "", -1, false); EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _)) .WillRepeatedly(testing::Return(true)); ObjectID outer_object_id = ObjectID::Nil(); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 8c01ad0c7..69c97b8f2 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -216,8 +216,9 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id, RayFunction func{Language::PYTHON, FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")}; - auto return_ids = ObjectRefsToIds( - CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options)); + auto return_ids = ObjectRefsToIds(CoreWorkerProcess::GetCoreWorker() + .SubmitActorTask(actor_id, func, args, options) + .value()); std::vector<std::shared_ptr<RayObject>> results; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results)); @@ -304,7 +305,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso "MergeInputArgsAsOutput", "", "", "")); auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); ASSERT_EQ(return_ids.size(), 1); std::vector<std::shared_ptr<RayObject>> results; @@ -347,7 +348,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); ASSERT_EQ(return_ids.size(), 1); @@ -410,7 +411,7 @@ void CoreWorkerTest::TestActorRestart( "MergeInputArgsAsOutput", "", "", "")); auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); ASSERT_EQ(return_ids.size(), 1); // Verify that it's the expected data.
std::vector<std::shared_ptr<RayObject>> results; @@ -453,7 +454,7 @@ void CoreWorkerTest::TestActorFailure( "MergeInputArgsAsOutput", "", "", "")); auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); ASSERT_EQ(return_ids.size(), 1); all_results.emplace_back(std::make_pair(return_ids[0], buffer1)); @@ -527,7 +528,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), function.GetLanguage(), function.GetFunctionDescriptor(), "", - 0, "", ""); + 0, "", "", -1); // Manually create `num_tasks` task specs, and for each of them create a // `PushTaskRequest`, this is to batch performance of TaskSpec @@ -589,7 +590,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { "MergeInputArgsAsOutput", "", "", "")); auto return_ids = - ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options).value()); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } @@ -647,7 +648,7 @@ TEST_F(ZeroNodeTest, TestActorHandle) { ActorHandle original( ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), Language::PYTHON, - FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0, "", ""); + FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0, "", "", -1); std::string output; original.Serialize(&output); ActorHandle deserialized(output); diff --git a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc index a6b02692f..bbcf86474 100644 --- a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc @@ -26,6 +26,8 @@ namespace core { using namespace ::testing; class DirectTaskTransportTest : public ::testing::Test { public: + DirectTaskTransportTest() : io_work(io_context) {} + void SetUp() override { gcs_client = std::make_shared<ray::gcs::MockGcsClient>(); actor_creator = std::make_unique<DefaultActorCreator>(gcs_client); @@ -35,7 +37,7 @@ class DirectTaskTransportTest : public ::testing::Test { [&](const rpc::Address &) { return nullptr; }); memory_store = std::make_unique<CoreWorkerMemoryStore>(); actor_task_submitter = std::make_unique<CoreWorkerDirectActorTaskSubmitter>( - *client_pool, *memory_store, *task_finisher, *actor_creator, nullptr); + *client_pool, *memory_store, *task_finisher, *actor_creator, nullptr, io_context); } TaskSpecification GetActorTaskSpec(const ActorID &actor_id) { @@ -57,6 +59,15 @@ class DirectTaskTransportTest : public ::testing::Test { return TaskSpecification(task_spec); } + protected: + bool CheckSubmitTask(TaskSpecification task) { + EXPECT_TRUE(actor_task_submitter->SubmitTask(task).ok()); + return 1 == io_context.poll_one(); + } + + protected: + instrumented_io_context io_context; + boost::asio::io_service::work io_work; std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> actor_task_submitter; std::shared_ptr<rpc::CoreWorkerClientPool> client_pool; std::unique_ptr<CoreWorkerMemoryStore> memory_store; @@ -81,8 +92,8 @@ TEST_F(DirectTaskTransportTest, ActorRegisterFailure) { ::testing::Return(Status::OK()))); ASSERT_TRUE(actor_creator->AsyncRegisterActor(creation_task_spec, nullptr).ok()); ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); - actor_task_submitter->AddActorQueueIfNotExists(actor_id); - ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok()); +
actor_task_submitter->AddActorQueueIfNotExists(actor_id, -1); + ASSERT_TRUE(CheckSubmitTask(task_spec)); EXPECT_CALL(*task_finisher, FailOrRetryPendingTask( task_spec.TaskId(), rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _)); @@ -105,8 +116,8 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) { ::testing::Return(Status::OK()))); ASSERT_TRUE(actor_creator->AsyncRegisterActor(creation_task_spec, nullptr).ok()); ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); - actor_task_submitter->AddActorQueueIfNotExists(actor_id); - ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok()); + actor_task_submitter->AddActorQueueIfNotExists(actor_id, -1); + ASSERT_TRUE(CheckSubmitTask(task_spec)); EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); register_cb(Status::OK()); } diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index ad9e4a745..e95bb8242 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -104,10 +104,15 @@ class DirectActorSubmitterTest : public ::testing::TestWithParam<bool> { worker_client_(std::make_shared<MockWorkerClient>()), store_(std::make_shared<CoreWorkerMemoryStore>()), task_finisher_(std::make_shared<MockTaskFinisher>()), - submitter_(*client_pool_, *store_, *task_finisher_, actor_creator_, - [this](const ActorID &actor_id, int64_t num_queued) { - last_queue_warning_ = num_queued; - }) {} + io_work(io_context), + submitter_( + *client_pool_, *store_, *task_finisher_, actor_creator_, + [this](const ActorID &actor_id, int64_t num_queued) { + last_queue_warning_ = num_queued; + }, + io_context) {} + + void TearDown() override { io_context.stop(); } int num_clients_connected_ = 0; int64_t last_queue_warning_ = 0; @@ -116,7 +121,15 @@ class DirectActorSubmitterTest : public ::testing::TestWithParam<bool> { std::shared_ptr<MockWorkerClient> worker_client_; std::shared_ptr<CoreWorkerMemoryStore> store_; std::shared_ptr<MockTaskFinisher> task_finisher_; + instrumented_io_context io_context; + boost::asio::io_service::work io_work; CoreWorkerDirectActorTaskSubmitter submitter_; + + protected: + bool CheckSubmitTask(TaskSpecification task) { + EXPECT_TRUE(submitter_.SubmitTask(task).ok()); + return 1 == io_context.poll_one(); + } }; TEST_P(DirectActorSubmitterTest, TestSubmitTask) { auto execute_out_of_order = GetParam(); rpc::Address addr; auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); auto task = CreateActorTaskHelper(actor_id, worker_id, 0); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); ASSERT_EQ(worker_client_->callbacks.size(), 0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 1); task = CreateActorTaskHelper(actor_id, worker_id, 1); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); ASSERT_EQ(worker_client_->callbacks.size(), 2); EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)) @@ -159,26 +172,26 @@ TEST_P(DirectActorSubmitterTest, TestQueueingWarning) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); submitter_.ConnectActor(actor_id, addr, 0); for (int i = 0; i < 7500; i++) { auto task = CreateActorTaskHelper(actor_id, worker_id, i); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); worker_client_->acked_seqno = i; } ASSERT_EQ(last_queue_warning_, 0); for (int i = 7500; i < 15000; i++) { auto task = CreateActorTaskHelper(actor_id, worker_id, i); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); /* no ack */ } ASSERT_EQ(last_queue_warning_, 5000); for (int i = 15000; i < 35000; i++) { auto task = CreateActorTaskHelper(actor_id, worker_id, i); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); /* no ack */ } ASSERT_EQ(last_queue_warning_, 20000); @@ -190,7 +203,7 @@ TEST_P(DirectActorSubmitterTest, TestDependencies) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -206,8 +219,8 @@ TEST_P(DirectActorSubmitterTest, TestDependencies) { // Neither task can be submitted yet because they are still waiting on // dependencies. - ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(CheckSubmitTask(task1)); + ASSERT_TRUE(CheckSubmitTask(task2)); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Put the dependencies in the store in the same order as task submission. @@ -225,7 +238,7 @@ TEST_P(DirectActorSubmitterTest, TestOutOfOrderDependencies) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -241,8 +254,8 @@ TEST_P(DirectActorSubmitterTest, TestOutOfOrderDependencies) { // Neither task can be submitted yet because they are still waiting on // dependencies.
- ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(CheckSubmitTask(task1)); + ASSERT_TRUE(CheckSubmitTask(task2)); ASSERT_EQ(worker_client_->callbacks.size(), 0); if (execute_out_of_order) { @@ -275,7 +288,7 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -284,8 +297,8 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) { ObjectID obj = ObjectID::FromRandom(); auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); task2.GetMutableMessage().add_args()->mutable_object_ref()->set_object_id(obj.Binary()); - ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(CheckSubmitTask(task1)); + ASSERT_TRUE(CheckSubmitTask(task2)); ASSERT_EQ(worker_client_->callbacks.size(), 1); // Simulate the actor dying. All in-flight tasks should get failed. @@ -310,7 +323,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -321,9 +334,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) { auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2); auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3); // Submit three tasks. - ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + ASSERT_TRUE(CheckSubmitTask(task1)); + ASSERT_TRUE(CheckSubmitTask(task2)); + ASSERT_TRUE(CheckSubmitTask(task3)); EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(1); EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) @@ -344,7 +357,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) { // Actor gets restarted. addr.set_port(1); submitter_.ConnectActor(actor_id, addr, 1); - ASSERT_TRUE(submitter_.SubmitTask(task4).ok()); + ASSERT_TRUE(CheckSubmitTask(task4)); ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); ASSERT_TRUE(worker_client_->callbacks.empty()); if (execute_out_of_order) { @@ -362,7 +375,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -373,9 +386,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) { auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2); auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3); // Submit three tasks. 
- ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + ASSERT_TRUE(CheckSubmitTask(task1)); + ASSERT_TRUE(CheckSubmitTask(task2)); + ASSERT_TRUE(CheckSubmitTask(task3)); // All tasks will eventually finish. EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(4); @@ -399,10 +412,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) { addr.set_port(1); submitter_.ConnectActor(actor_id, addr, 1); // A new task is submitted. - ASSERT_TRUE(submitter_.SubmitTask(task4).ok()); + ASSERT_TRUE(CheckSubmitTask(task4)); // Tasks 2 and 3 get retried. - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + ASSERT_TRUE(CheckSubmitTask(task2)); + ASSERT_TRUE(CheckSubmitTask(task3)); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); } @@ -422,7 +435,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -432,9 +445,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2); // Submit three tasks. - ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); - ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + ASSERT_TRUE(CheckSubmitTask(task1)); + ASSERT_TRUE(CheckSubmitTask(task2)); + ASSERT_TRUE(CheckSubmitTask(task3)); // All tasks will eventually finish. EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(3); @@ -456,7 +469,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { if (execute_out_of_order) { // Upon re-connect, task 2 (failed) should be retried. // Retry task 2 manually (simulating task_finisher and SendPendingTask's behavior) - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(CheckSubmitTask(task2)); // Only task2 should be submitted. ASSERT_EQ(worker_client_->callbacks.size(), 1); @@ -464,7 +477,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { // Upon re-connect, tasks 2 (failed) and 3 (completed) should both be retried. // Retry task 2 manually (simulating task_finisher and SendPendingTask's behavior) // Retry task 3 should happen via event loop - ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(CheckSubmitTask(task2)); // Both task2 and task3 should be submitted.
ASSERT_EQ(worker_client_->callbacks.size(), 2); @@ -482,7 +495,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -491,7 +504,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { // Create four tasks for the actor. auto task = CreateActorTaskHelper(actor_id, worker_id, 0); // Submit a task. - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1); ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); @@ -501,7 +514,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 1); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1); ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); @@ -510,7 +523,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 2); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1); ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); @@ -519,7 +532,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 3); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_TRUE(CheckSubmitTask(task)); EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(0); ASSERT_FALSE(worker_client_->ReplyPushTask(Status::OK())); @@ -544,7 +557,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { task = CreateActorTaskHelper(actor_id, worker_id, 4); EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _)) .Times(1); - ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + ASSERT_FALSE(CheckSubmitTask(task)); } TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) { @@ -553,7 +566,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) { auto worker_id = WorkerID::FromRandom(); addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - submitter_.AddActorQueueIfNotExists(actor_id, execute_out_of_order); + submitter_.AddActorQueueIfNotExists(actor_id, -1, execute_out_of_order); addr.set_port(0); submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); @@ -564,13 +577,13 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) { auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); auto task3 = CreateActorTaskHelper(actor_id, worker_id, 1); // Submit a task. 
-  ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
+  ASSERT_TRUE(CheckSubmitTask(task1));
   EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(1);
   ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
   // Submit 2 tasks.
-  ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
-  ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
+  ASSERT_TRUE(CheckSubmitTask(task2));
+  ASSERT_TRUE(CheckSubmitTask(task3));
   // Actor failed, but the task replies are delayed (or in some scenarios, lost).
   // We should still be able to fail the inflight tasks.
   EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
@@ -593,6 +606,46 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
   ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
 }

+TEST_P(DirectActorSubmitterTest, TestPendingTasks) {
+  auto execute_out_of_order = GetParam();
+  int32_t max_pending_calls = 10;
+  rpc::Address addr;
+  auto worker_id = WorkerID::FromRandom();
+  addr.set_worker_id(worker_id.Binary());
+  ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
+  submitter_.AddActorQueueIfNotExists(actor_id, max_pending_calls, execute_out_of_order);
+  addr.set_port(0);
+
+  // Submitting `max_pending_calls` tasks should succeed.
+  for (int32_t i = 0; i < max_pending_calls; i++) {
+    ASSERT_FALSE(submitter_.PendingTasksFull(actor_id));
+    auto task = CreateActorTaskHelper(actor_id, worker_id, i);
+    ASSERT_TRUE(CheckSubmitTask(task));
+  }
+
+  // Then the queue should be full.
+  ASSERT_TRUE(submitter_.PendingTasksFull(actor_id));
+
+  ASSERT_EQ(worker_client_->callbacks.size(), 0);
+  submitter_.ConnectActor(actor_id, addr, 0);
+  ASSERT_EQ(worker_client_->callbacks.size(), 10);
+
+  // After the reply for task 0 arrives, the queue is no longer full.
+  ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), 0));
+  ASSERT_FALSE(submitter_.PendingTasksFull(actor_id));
+
+  // We can submit task 10, but after that the queue is full again.
+  auto task = CreateActorTaskHelper(actor_id, worker_id, 10);
+  ASSERT_TRUE(CheckSubmitTask(task));
+  ASSERT_TRUE(submitter_.PendingTasksFull(actor_id));
+
+  // Once all replies have arrived, the queue should be empty.
+  while (!worker_client_->callbacks.empty()) {
+    ASSERT_TRUE(worker_client_->ReplyPushTask());
+  }
+  ASSERT_FALSE(submitter_.PendingTasksFull(actor_id));
+}
+
 INSTANTIATE_TEST_SUITE_P(ExecuteOutOfOrder, DirectActorSubmitterTest,
                          ::testing::Values(true, false));
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
index 460185e2d..ac3c8960d 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -26,11 +26,14 @@ namespace ray {
 namespace core {

 void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists(
-    const ActorID &actor_id, bool execute_out_of_order) {
+    const ActorID &actor_id, int32_t max_pending_calls, bool execute_out_of_order) {
   absl::MutexLock lock(&mu_);
   // No need to check whether the insert was successful, since it is possible
   // for this worker to have multiple references to the same actor.
-  client_queues_.emplace(actor_id, ClientQueue(actor_id, execute_out_of_order));
+  RAY_LOG(INFO) << "Set max pending calls to " << max_pending_calls << " for actor "
+                << actor_id;
+  client_queues_.emplace(actor_id,
+                         ClientQueue(actor_id, execute_out_of_order, max_pending_calls));
 }

 void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
@@ -80,32 +83,39 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
       // this sequence number.
       send_pos = task_spec.ActorCounter();
       RAY_CHECK(queue->second.actor_submit_queue->Emplace(send_pos, task_spec));
+      queue->second.cur_pending_calls++;
       task_queued = true;
     }
   }

   if (task_queued) {
-    // We must release the lock before resolving the task dependencies since
-    // the callback may get called in the same call stack.
-    resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id](Status status) {
-      absl::MutexLock lock(&mu_);
-      auto queue = client_queues_.find(actor_id);
-      RAY_CHECK(queue != client_queues_.end());
-      auto &actor_submit_queue = queue->second.actor_submit_queue;
-      // Only dispatch tasks if the submitted task is still queued. The task
-      // may have been dequeued if the actor has since failed.
-      if (actor_submit_queue->Contains(send_pos)) {
-        if (status.ok()) {
-          actor_submit_queue->MarkDependencyResolved(send_pos);
-          SendPendingTasks(actor_id);
-        } else {
-          auto task_id = actor_submit_queue->Get(send_pos).first.TaskId();
-          actor_submit_queue->MarkDependencyFailed(send_pos);
-          task_finisher_.FailOrRetryPendingTask(
-              task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
-        }
-      }
-    });
+    io_service_.post(
+        [task_spec, send_pos, this]() mutable {
+          // We must release the lock before resolving the task dependencies since
+          // the callback may get called in the same call stack.
+          auto actor_id = task_spec.ActorId();
+          resolver_.ResolveDependencies(
+              task_spec, [this, send_pos, actor_id](Status status) {
+                absl::MutexLock lock(&mu_);
+                auto queue = client_queues_.find(actor_id);
+                RAY_CHECK(queue != client_queues_.end());
+                auto &actor_submit_queue = queue->second.actor_submit_queue;
+                // Only dispatch tasks if the submitted task is still queued. The task
+                // may have been dequeued if the actor has since failed.
+                if (actor_submit_queue->Contains(send_pos)) {
+                  if (status.ok()) {
+                    actor_submit_queue->MarkDependencyResolved(send_pos);
+                    SendPendingTasks(actor_id);
+                  } else {
+                    auto task_id = actor_submit_queue->Get(send_pos).first.TaskId();
+                    actor_submit_queue->MarkDependencyFailed(send_pos);
+                    task_finisher_.FailOrRetryPendingTask(
+                        task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
+                  }
+                }
+              });
+        },
+        "CoreWorkerDirectActorTaskSubmitter::SubmitTask");
   } else {
     // Do not hold the lock while calling into task_finisher_.
     task_finisher_.MarkTaskCanceled(task_id);
@@ -291,17 +301,26 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
 }

 void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
-  absl::MutexLock lock(&mu_);
-  for (auto &queue_pair : client_queues_) {
-    auto &queue = queue_pair.second;
-    auto deque_itr = queue.wait_for_death_info_tasks.begin();
-    while (deque_itr != queue.wait_for_death_info_tasks.end() &&
-           /*timeout timestamp*/ deque_itr->first < current_time_ms()) {
-      auto task_spec = deque_itr->second;
-      task_finisher_.MarkTaskReturnObjectsFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
-      deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
+  std::vector<TaskSpecification> task_specs;
+  {
+    absl::MutexLock lock(&mu_);
+    for (auto &queue_pair : client_queues_) {
+      auto &queue = queue_pair.second;
+      auto deque_itr = queue.wait_for_death_info_tasks.begin();
+      while (deque_itr != queue.wait_for_death_info_tasks.end() &&
+             /*timeout timestamp*/ deque_itr->first < current_time_ms()) {
+        auto &task_spec = deque_itr->second;
+        task_specs.push_back(task_spec);
+        deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
+      }
     }
   }
+
+  // Do not hold mu_, because MarkTaskReturnObjectsFailed may call Python from C++
+  // and may deadlock with the SubmitActorTask thread when acquiring the GIL.
+  for (auto &task_spec : task_specs) {
+    task_finisher_.MarkTaskReturnObjectsFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
+  }
 }

 void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
@@ -427,14 +446,15 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
                      << ", wait queue size=" << queue.wait_for_death_info_tasks.size();
       }
     }
-
-    if (!will_retry) {
-      // If we don't need to retry, mark the task as completed.
+    {
       absl::MutexLock lock(&mu_);
       auto queue_pair = client_queues_.find(actor_id);
       RAY_CHECK(queue_pair != client_queues_.end());
       auto &queue = queue_pair->second;
-      queue.actor_submit_queue->MarkTaskCompleted(actor_counter, task_spec);
+      if (!will_retry) {
+        queue.actor_submit_queue->MarkTaskCompleted(actor_counter, task_spec);
+      }
+      queue.cur_pending_calls--;
     }
   };
@@ -468,5 +488,25 @@ bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) c
   auto iter = client_queues_.find(actor_id);
   return (iter != client_queues_.end() && iter->second.rpc_client);
 }
+
+bool CoreWorkerDirectActorTaskSubmitter::PendingTasksFull(const ActorID &actor_id) const {
+  absl::MutexLock lock(&mu_);
+  auto it = client_queues_.find(actor_id);
+  RAY_CHECK(it != client_queues_.end());
+  return it->second.max_pending_calls > 0 &&
+         it->second.cur_pending_calls >= it->second.max_pending_calls;
+}
+
+std::string CoreWorkerDirectActorTaskSubmitter::DebugString(
+    const ActorID &actor_id) const {
+  absl::MutexLock lock(&mu_);
+  auto it = client_queues_.find(actor_id);
+  RAY_CHECK(it != client_queues_.end());
+  std::ostringstream stream;
+  stream << "Submitter debug string for actor " << actor_id << " "
+         << it->second.DebugString();
+  return stream.str();
+}
+
 }  // namespace core
 }  // namespace ray
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h
index 55be94d2e..5e7f865c4 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.h
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h
@@ -46,6 +46,7 @@ namespace core {
 class CoreWorkerDirectActorTaskSubmitterInterface {
  public:
   virtual void AddActorQueueIfNotExists(const ActorID &actor_id,
+                                        int32_t max_pending_calls,
                                         bool execute_out_of_order = false) = 0;
   virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
                             int64_t num_restarts) = 0;
@@ -65,11 +66,13 @@ class CoreWorkerDirectActorTaskSubmitter
   CoreWorkerDirectActorTaskSubmitter(
       rpc::CoreWorkerClientPool &core_worker_client_pool, CoreWorkerMemoryStore &store,
       TaskFinisherInterface &task_finisher, ActorCreatorInterface &actor_creator,
-      std::function warn_excess_queueing)
+      std::function warn_excess_queueing,
+      instrumented_io_context &io_service)
       : core_worker_client_pool_(core_worker_client_pool),
         resolver_(store, task_finisher, actor_creator),
         task_finisher_(task_finisher),
-        warn_excess_queueing_(warn_excess_queueing) {
+        warn_excess_queueing_(warn_excess_queueing),
+        io_service_(io_service) {
     next_queueing_warn_threshold_ =
         ::RayConfig::instance().actor_excess_queueing_warn_threshold();
   }
@@ -80,12 +83,14 @@ class CoreWorkerDirectActorTaskSubmitter
   /// not receive another reference to the same actor.
   ///
   /// \param[in] actor_id The actor for whom to add a queue.
-  void AddActorQueueIfNotExists(const ActorID &actor_id,
+  /// \param[in] max_pending_calls The maximum number of pending calls for the actor.
+  void AddActorQueueIfNotExists(const ActorID &actor_id, int32_t max_pending_calls,
                                 bool execute_out_of_order = false);

   /// Submit a task to an actor for execution.
   ///
-  /// \param[in] task The task spec to submit.
+  /// \param[in] task_spec The task spec to submit.
+  ///
   /// \return Status::Invalid if the task is not yet supported.
   Status SubmitTask(TaskSpecification task_spec);
@@ -126,9 +131,23 @@ class CoreWorkerDirectActorTaskSubmitter
   /// Check timeout tasks that are waiting for Death info.
   void CheckTimeoutTasks();

+  /// Whether the number of queued tasks is greater than or equal to
+  /// max_pending_calls.
+  ///
+  /// \param[in] actor_id Actor id.
+  /// \return Whether the corresponding client queue is full or not.
+  bool PendingTasksFull(const ActorID &actor_id) const;
+
+  /// Returns debug string for class.
+  ///
+  /// \param[in] actor_id The actor whose debug string to return.
+  /// \return string.
+  std::string DebugString(const ActorID &actor_id) const;
+
  private:
   struct ClientQueue {
-    ClientQueue(ActorID actor_id, bool execute_out_of_order) {
+    ClientQueue(ActorID actor_id, bool execute_out_of_order, int32_t max_pending_calls)
+        : max_pending_calls(max_pending_calls) {
       if (execute_out_of_order) {
         actor_submit_queue = std::make_unique(actor_id);
       } else {
@@ -172,6 +191,24 @@ class CoreWorkerDirectActorTaskSubmitter
     /// without replies.
     std::unordered_map> inflight_task_callbacks;
+
+    /// The maximum number of pending calls allowed, used for back pressure.
+    /// Once the number of queued requests reaches max_pending_calls, no more
+    /// tasks can be pushed to this ClientQueue. -1 means unlimited.
+    const int32_t max_pending_calls;
+
+    /// The number of tasks currently pending in this client queue.
+    int32_t cur_pending_calls = 0;
+
+    /// Returns debug string for class.
+    ///
+    /// \return string.
+    std::string DebugString() const {
+      std::ostringstream stream;
+      stream << "max_pending_calls=" << max_pending_calls
+             << " cur_pending_calls=" << cur_pending_calls;
+      return stream.str();
+    }
   };

   /// Push a task to a remote actor via the given client.
@@ -234,6 +271,9 @@ class CoreWorkerDirectActorTaskSubmitter
   /// exceeds this quantity. This threshold is doubled each time it is hit.
   int64_t next_queueing_warn_threshold_;

+  /// The event loop where the actor task events are handled.
+  instrumented_io_context &io_service_;
+
   friend class CoreWorkerTest;
 };
diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h
index c368a21a6..ada01ab1e 100644
--- a/src/ray/core_worker/transport/direct_actor_transport.h
+++ b/src/ray/core_worker/transport/direct_actor_transport.h
@@ -29,6 +29,7 @@
 #include "ray/common/id.h"
 #include "ray/common/ray_object.h"
 #include "ray/core_worker/actor_creator.h"
+#include "ray/core_worker/actor_handle.h"
 #include "ray/core_worker/context.h"
 #include "ray/core_worker/fiber.h"
 #include "ray/core_worker/store_provider/memory_store/memory_store.h"
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 4f50062d2..e5447af3f 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -373,6 +373,8 @@ message ActorCreationTaskSpec {
   repeated ConcurrencyGroup concurrency_groups = 13;
   // Whether to enable out of order execution.
   bool execute_out_of_order = 14;
+  // The max number of pending actor calls.
+  int32 max_pending_calls = 15;
 }

 // Task spec of an actor task.
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto
index a5a59d9f4..5ab2bca65 100644
--- a/src/ray/protobuf/core_worker.proto
+++ b/src/ray/protobuf/core_worker.proto
@@ -64,6 +64,9 @@ message ActorHandle {

   // Whether the actor supports out of order execution.
   bool execute_out_of_order = 12;
+
+  // The max number of pending actor calls.
+  int32 max_pending_calls = 13;
 }

 message ReturnObject {
diff --git a/src/ray/streaming/streaming.cc b/src/ray/streaming/streaming.cc
index 2d54c57dc..a06ee36cf 100644
--- a/src/ray/streaming/streaming.cc
+++ b/src/ray/streaming/streaming.cc
@@ -13,6 +13,7 @@
 // limitations under the License.

 #include "ray/streaming/streaming.h"
+
 #include "ray/core_worker/core_worker.h"

 namespace ray {
@@ -44,8 +45,12 @@ std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
       std::move(buffer), meta, std::vector<rpc::ObjectReference>(), true)));

   std::vector<std::shared_ptr<RayObject>> results;
-  return CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
-                                                            options);
+  auto result = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
+      peer_actor_id, function, args, options);
+  if (!result.has_value()) {
+    RAY_CHECK(false) << "Back pressure should not be enabled.";
+  }
+  return result.value();
 }
 }  // namespace streaming
 }  // namespace ray
diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h
index b3fbbbd47..4aec671a3 100644
--- a/streaming/src/test/queue_tests_base.h
+++ b/streaming/src/test/queue_tests_base.h
@@ -117,7 +117,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam {
       "", "", "check_current_test_status", "")};

   auto return_refs = driver.SubmitActorTask(actor_id, func, args, options);
-  auto return_ids = ObjectRefsToIds(return_refs);
+  auto return_ids = ObjectRefsToIds(return_refs.value());

   std::vector<bool> wait_results;
   std::vector<std::shared_ptr<RayObject>> results;
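
Taken together, the ClientQueue changes above implement a small per-handle back-pressure counter: SubmitTask increments it, the PushTask reply callback decrements it, and PendingTasksFull reports whether the cap is reached. As a reading aid only, here is a self-contained C++ sketch of that contract; `ToyClientQueue`, `OnSubmit`, and `OnReply` are hypothetical names invented for illustration, while the two fields mirror `max_pending_calls` and `cur_pending_calls` from the diff.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the per-actor ClientQueue bookkeeping added in
// this patch; not the real Ray class.
struct ToyClientQueue {
  const int32_t max_pending_calls;  // -1 disables back pressure.
  int32_t cur_pending_calls = 0;

  bool PendingTasksFull() const {
    // Mirrors CoreWorkerDirectActorTaskSubmitter::PendingTasksFull: a
    // non-positive limit (-1) never reports full.
    return max_pending_calls > 0 && cur_pending_calls >= max_pending_calls;
  }
  void OnSubmit() { cur_pending_calls++; }  // SubmitTask path.
  void OnReply() { cur_pending_calls--; }   // PushActorTask reply callback.
};

int main() {
  ToyClientQueue q{/*max_pending_calls=*/2};
  q.OnSubmit();
  assert(!q.PendingTasksFull());
  q.OnSubmit();
  assert(q.PendingTasksFull());   // At the cap: callers should stop submitting.
  q.OnReply();
  assert(!q.PendingTasksFull());  // A reply frees one slot, as in TestPendingTasks.

  ToyClientQueue unlimited{/*max_pending_calls=*/-1};
  for (int i = 0; i < 1000; i++) unlimited.OnSubmit();
  assert(!unlimited.PendingTasksFull());  // -1 means no limit.
  return 0;
}
```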
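The CheckTimeoutTasks rewrite is an instance of a general idiom: snapshot the expired work under the lock, then invoke re-entrant callbacks only after releasing it. Below is a minimal standalone sketch of that idiom under stated assumptions: `Task`, `FailTask`, and `expired` are stub names standing in for TaskSpecification, MarkTaskReturnObjectsFailed, and wait_for_death_info_tasks, and a plain std::mutex replaces absl::Mutex.

```cpp
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-ins for TaskSpecification and the task-finisher call.
struct Task {
  std::string id;
};

// In the real code this is MarkTaskReturnObjectsFailed, which may re-enter
// Python and take the GIL; here it just prints.
void FailTask(const Task &t) { std::cout << "failing " << t.id << "\n"; }

std::mutex mu;
std::deque<Task> expired;  // Guarded by mu, like wait_for_death_info_tasks.

void CheckTimeouts() {
  std::vector<Task> to_fail;
  {
    // Phase 1: drain the expired entries while holding the lock.
    std::lock_guard<std::mutex> lock(mu);
    while (!expired.empty()) {
      to_fail.push_back(expired.front());
      expired.pop_front();
    }
  }
  // Phase 2: run the potentially re-entrant callback without the lock, so a
  // callee that needs mu (or the GIL) cannot deadlock against this thread.
  for (auto &t : to_fail) {
    FailTask(t);
  }
}

int main() {
  expired.push_back({"task-1"});
  expired.push_back({"task-2"});
  CheckTimeouts();
  return 0;
}
```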
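Similarly, the SubmitTask change defers dependency resolution through io_service_.post rather than resolving it in the caller's stack. The toy event loop below is an assumption-laden stand-in for instrumented_io_context (`ToyIoContext` is invented; only post and run are modeled, and the real loop runs on its own thread), meant only to show the control-flow consequence: the caller returns before the posted work executes.

```cpp
#include <functional>
#include <iostream>
#include <queue>
#include <string>

// Toy single-threaded event loop; a hypothetical stand-in, not Ray's API.
class ToyIoContext {
 public:
  void post(std::function<void()> fn, const std::string &name) {
    (void)name;  // The real API tags work items for instrumentation.
    work_.push(std::move(fn));
  }
  void run() {
    while (!work_.empty()) {
      auto fn = std::move(work_.front());
      work_.pop();
      fn();  // Deferred work runs here, outside the submitter's stack.
    }
  }

 private:
  std::queue<std::function<void()>> work_;
};

int main() {
  ToyIoContext io;
  // As in the patched SubmitTask: the heavy step (dependency resolution) is
  // queued, so it cannot run in the caller's stack under the caller's lock.
  io.post([] { std::cout << "resolve dependencies later\n"; }, "SubmitTask");
  std::cout << "submit returned immediately\n";
  io.run();  // The event-loop thread executes the deferred work.
  return 0;
}
```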