From e5be5fd46d7cfb4d37f729e040838284cc2b8f6a Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Mon, 15 Jul 2019 18:15:21 -0700 Subject: [PATCH] Remove dependencies from TaskExecutionSpecification (#5166) --- .../org/ray/runtime/AbstractRayRuntime.java | 23 +++++++------ .../ray/runtime/raylet/MockRayletClient.java | 7 ++-- .../ray/runtime/raylet/RayletClientImpl.java | 19 +++++------ .../java/org/ray/runtime/task/TaskSpec.java | 18 +++++------ python/ray/_raylet.pyx | 7 ++-- python/ray/actor.py | 2 +- python/ray/includes/libraylet.pxd | 4 +-- python/ray/includes/task.pxd | 7 ++-- python/ray/includes/task.pxi | 24 ++++++-------- python/ray/state.py | 3 +- python/ray/worker.py | 14 ++++---- src/ray/common/task/task.cc | 9 ++---- src/ray/common/task/task.h | 2 +- src/ray/common/task/task_execution_spec.cc | 7 +--- src/ray/common/task/task_execution_spec.h | 6 ---- src/ray/common/task/task_spec.cc | 6 ++++ src/ray/common/task/task_spec.h | 2 ++ src/ray/common/task/task_util.h | 5 ++- src/ray/core_worker/common.h | 24 -------------- src/ray/core_worker/context.cc | 10 +++--- src/ray/core_worker/context.h | 2 +- src/ray/core_worker/task_execution.cc | 32 +++++++++---------- src/ray/core_worker/task_interface.cc | 23 +++++++------ .../core_worker/transport/raylet_transport.cc | 8 ++--- .../core_worker/transport/raylet_transport.h | 2 +- src/ray/core_worker/transport/transport.h | 2 +- src/ray/protobuf/common.proto | 16 ++++------ src/ray/raylet/format/node_manager.fbs | 1 - ...org_ray_runtime_raylet_RayletClientImpl.cc | 10 ++---- .../org_ray_runtime_raylet_RayletClientImpl.h | 6 ++-- src/ray/raylet/node_manager.cc | 19 ++--------- src/ray/raylet/raylet_client.cc | 6 ++-- src/ray/raylet/raylet_client.h | 4 +-- 33 files changed, 136 insertions(+), 194 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 88c281f79..2d51f113a 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -244,7 +244,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { @Override public RayObject call(RayFunc func, Object[] args, CallOptions options) { - TaskSpec spec = createTaskSpec(func, null, RayActorImpl.NIL, args, false, options); + TaskSpec spec = createTaskSpec(func, null, RayActorImpl.NIL, args, false, false, options); rayletClient.submitTask(spec); return new RayObjectImpl(spec.returnIds[0]); } @@ -257,8 +257,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { RayActorImpl actorImpl = (RayActorImpl) actor; TaskSpec spec; synchronized (actor) { - spec = createTaskSpec(func, null, actorImpl, args, false, null); - spec.getExecutionDependencies().add(((RayActorImpl) actor).getTaskCursor()); + spec = createTaskSpec(func, null, actorImpl, args, false, true, null); actorImpl.setTaskCursor(spec.returnIds[1]); actorImpl.clearNewActorHandles(); } @@ -271,7 +270,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { public RayActor createActor(RayFunc actorFactoryFunc, Object[] args, ActorCreationOptions options) { TaskSpec spec = createTaskSpec(actorFactoryFunc, null, RayActorImpl.NIL, - args, true, options); + args, true, false, options); RayActorImpl actor = new RayActorImpl(new UniqueId(spec.returnIds[0].getBytes())); actor.increaseTaskCounter(); actor.setTaskCursor(spec.returnIds[0]); @@ -293,7 +292,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { CallOptions options) { checkPyArguments(args); PyFunctionDescriptor desc = new PyFunctionDescriptor(moduleName, "", functionName); - TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, false, options); + TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, false, false, options); rayletClient.submitTask(spec); return new RayObjectImpl(spec.returnIds[0]); } @@ -306,8 +305,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { RayPyActorImpl actorImpl = (RayPyActorImpl) pyActor; TaskSpec spec; synchronized (pyActor) { - spec = createTaskSpec(null, desc, actorImpl, args, false, null); - spec.getExecutionDependencies().add(actorImpl.getTaskCursor()); + spec = createTaskSpec(null, desc, actorImpl, args, false, true, null); actorImpl.setTaskCursor(spec.returnIds[1]); actorImpl.clearNewActorHandles(); } @@ -320,7 +318,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { ActorCreationOptions options) { checkPyArguments(args); PyFunctionDescriptor desc = new PyFunctionDescriptor(moduleName, className, "__init__"); - TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, true, options); + TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, true, false, options); RayPyActorImpl actor = new RayPyActorImpl(spec.actorCreationId, moduleName, className); actor.increaseTaskCounter(); actor.setTaskCursor(spec.returnIds[0]); @@ -337,11 +335,12 @@ public abstract class AbstractRayRuntime implements RayRuntime { * @param actor The actor handle. If the task is not an actor task, actor id must be NIL. * @param args The arguments for the remote function. * @param isActorCreationTask Whether this task is an actor creation task. + * @param isActorTask Whether this task is an actor task. * @return A TaskSpec object. */ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDescriptor, RayActorImpl actor, Object[] args, - boolean isActorCreationTask, BaseTaskOptions taskOptions) { + boolean isActorCreationTask, boolean isActorTask, BaseTaskOptions taskOptions) { Preconditions.checkArgument((func == null) != (pyFunctionDescriptor == null)); TaskId taskId = rayletClient.generateTaskId(workerContext.getCurrentJobId(), @@ -382,6 +381,11 @@ public abstract class AbstractRayRuntime implements RayRuntime { functionDescriptor = pyFunctionDescriptor; } + ObjectId previousActorTaskDummyObjectId = ObjectId.NIL; + if (isActorTask) { + previousActorTaskDummyObjectId = actor.getTaskCursor(); + } + return new TaskSpec( workerContext.getCurrentJobId(), taskId, @@ -392,6 +396,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { actor.getId(), actor.getHandleId(), actor.increaseTaskCounter(), + previousActorTaskDummyObjectId, actor.getNewActorHandles().toArray(new UniqueId[0]), ArgumentsBuilder.wrap(args, language == TaskLanguage.PYTHON), numReturns, diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java index d329459e0..0dc8f4c9e 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java @@ -138,10 +138,9 @@ public class MockRayletClient implements RayletClient { } } } - // Check whether task dependencies are ready. - for (ObjectId id : spec.getExecutionDependencies()) { - if (!store.isObjectReady(id)) { - unreadyObjects.add(id); + if (spec.isActorTask()) { + if (!store.isObjectReady(spec.previousActorTaskDummyObjectId)) { + unreadyObjects.add(spec.previousActorTaskDummyObjectId); } } return unreadyObjects; diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index 9577cf2e2..059edbe67 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -81,12 +81,7 @@ public class RayletClientImpl implements RayletClient { Preconditions.checkState(!spec.jobId.isNil()); byte[] taskSpec = convertTaskSpecToProtobuf(spec); - byte[] cursorId = null; - if (!spec.getExecutionDependencies().isEmpty()) { - //TODO(hchen): handle more than one dependencies. - cursorId = spec.getExecutionDependencies().get(0).getBytes(); - } - nativeSubmitTask(client, cursorId, taskSpec); + nativeSubmitTask(client, taskSpec); } @Override @@ -195,6 +190,7 @@ public class RayletClientImpl implements RayletClient { // Parse ActorTaskSpec. UniqueId actorId = UniqueId.NIL; UniqueId actorHandleId = UniqueId.NIL; + ObjectId previousActorTaskDummyObjectId = ObjectId.NIL; int actorCounter = 0; if (taskSpec.getType() == Common.TaskType.ACTOR_TASK) { Common.ActorTaskSpec actorTaskSpec = taskSpec.getActorTaskSpec(); @@ -202,14 +198,17 @@ public class RayletClientImpl implements RayletClient { actorHandleId = UniqueId .fromByteBuffer(actorTaskSpec.getActorHandleId().asReadOnlyByteBuffer()); actorCounter = (int) actorTaskSpec.getActorCounter(); + previousActorTaskDummyObjectId = ObjectId.fromByteBuffer( + actorTaskSpec.getPreviousActorTaskDummyObjectId().asReadOnlyByteBuffer()); newActorHandles = actorTaskSpec.getNewActorHandlesList().stream() .map(byteString -> UniqueId.fromByteBuffer(byteString.asReadOnlyByteBuffer())) .toArray(UniqueId[]::new); } return new TaskSpec(jobId, taskId, parentTaskId, parentCounter, actorCreationId, - maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles, - args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions); + maxActorReconstructions, actorId, actorHandleId, actorCounter, + previousActorTaskDummyObjectId, newActorHandles, args, numReturns, resources, + TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions); } /** @@ -275,6 +274,8 @@ public class RayletClientImpl implements RayletClient { .setActorId(ByteString.copyFrom(task.actorId.getBytes())) .setActorHandleId(ByteString.copyFrom(task.actorHandleId.getBytes())) .setActorCreationDummyObjectId(ByteString.copyFrom(task.actorId.getBytes())) + .setPreviousActorTaskDummyObjectId( + ByteString.copyFrom(task.previousActorTaskDummyObjectId.getBytes())) .setActorCounter(task.actorCounter) .addAllNewActorHandles(newHandles) ); @@ -310,7 +311,7 @@ public class RayletClientImpl implements RayletClient { private static native long nativeInit(String localSchedulerSocket, byte[] workerId, boolean isWorker, byte[] driverTaskId); - private static native void nativeSubmitTask(long client, byte[] cursorId, byte[] taskSpec) + private static native void nativeSubmitTask(long client, byte[] taskSpec) throws RayException; private static native byte[] nativeGetTask(long client) throws RayException; diff --git a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java index 8d6353b46..f696b13ab 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java @@ -46,6 +46,9 @@ public class TaskSpec { // Number of tasks that have been submitted to this actor so far. public final int actorCounter; + // Object id returned by the previous task submitted to the same actor. + public final ObjectId previousActorTaskDummyObjectId; + // Task arguments. public final UniqueId[] newActorHandles; @@ -55,7 +58,7 @@ public class TaskSpec { // number of return objects. public final int numReturns; - // returns ids. + // Return ids. public final ObjectId[] returnIds; // The task's resource demands. @@ -71,8 +74,6 @@ public class TaskSpec { // is Python, the type is PyFunctionDescriptor. private final FunctionDescriptor functionDescriptor; - private List executionDependencies; - public boolean isActorTask() { return !actorId.isNil(); } @@ -91,6 +92,7 @@ public class TaskSpec { UniqueId actorId, UniqueId actorHandleId, int actorCounter, + ObjectId previousActorTaskDummyObjectId, UniqueId[] newActorHandles, FunctionArg[] args, int numReturns, @@ -107,6 +109,7 @@ public class TaskSpec { this.actorId = actorId; this.actorHandleId = actorHandleId; this.actorCounter = actorCounter; + this.previousActorTaskDummyObjectId = previousActorTaskDummyObjectId; this.newActorHandles = newActorHandles; this.args = args; this.numReturns = numReturns; @@ -128,7 +131,6 @@ public class TaskSpec { Preconditions.checkArgument(false, "Unknown task language: {}.", language); } this.functionDescriptor = functionDescriptor; - this.executionDependencies = new ArrayList<>(); } public JavaFunctionDescriptor getJavaFunctionDescriptor() { @@ -141,10 +143,6 @@ public class TaskSpec { return (PyFunctionDescriptor) functionDescriptor; } - public List getExecutionDependencies() { - return executionDependencies; - } - @Override public String toString() { return "TaskSpec{" + @@ -157,14 +155,14 @@ public class TaskSpec { ", actorId=" + actorId + ", actorHandleId=" + actorHandleId + ", actorCounter=" + actorCounter + + ", previousActorTaskDummyObjectId=" + previousActorTaskDummyObjectId + ", newActorHandles=" + Arrays.toString(newActorHandles) + ", args=" + Arrays.toString(args) + ", numReturns=" + numReturns + ", resources=" + resources + ", language=" + language + ", functionDescriptor=" + functionDescriptor + - ", dynamicWorkerOptions=" + dynamicWorkerOptions + - ", executionDependencies=" + executionDependencies + + ", dynamicWorkerOptions=" + dynamicWorkerOptions + '}'; } } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3e5586069..8005a4d04 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -232,14 +232,11 @@ cdef class RayletClient: def disconnect(self): check_status(self.client.get().Disconnect()) - def submit_task(self, TaskSpec task_spec, execution_dependencies): + def submit_task(self, TaskSpec task_spec): cdef: CObjectID c_id - c_vector[CObjectID] c_dependencies - for dep in execution_dependencies: - c_dependencies.push_back((dep).native()) check_status(self.client.get().SubmitTask( - c_dependencies, task_spec.task_spec.get()[0])) + task_spec.task_spec.get()[0])) def get_task(self): cdef: diff --git a/python/ray/actor.py b/python/ray/actor.py index 8ca4a90fc..edc803fd5 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -547,7 +547,7 @@ class ActorHandle(object): actor_counter=self._ray_actor_counter, actor_creation_dummy_object_id=( self._ray_actor_creation_dummy_object_id), - execution_dependencies=[self._ray_actor_cursor], + previous_actor_task_dummy_object_id=self._ray_actor_cursor, new_actor_handles=self._ray_new_actor_handles, # We add one for the dummy return ID. num_return_vals=num_return_vals + 1, diff --git a/python/ray/includes/libraylet.pxd b/python/ray/includes/libraylet.pxd index 7f6d03a5c..2372ec884 100644 --- a/python/ray/includes/libraylet.pxd +++ b/python/ray/includes/libraylet.pxd @@ -50,9 +50,7 @@ cdef extern from "ray/raylet/raylet_client.h" nogil: c_bool is_worker, const CJobID &job_id, const CLanguage &language) CRayStatus Disconnect() - CRayStatus SubmitTask( - const c_vector[CObjectID] &execution_dependencies, - const CTaskSpec &task_spec) + CRayStatus SubmitTask(const CTaskSpec &task_spec) CRayStatus GetTask(unique_ptr[CTaskSpec] *task_spec) CRayStatus TaskDone() CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids, diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index 95a1b06ce..d8436f491 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -66,6 +66,7 @@ cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil: c_bool IsActorTask() const CActorID ActorCreationId() const CObjectID ActorCreationDummyObjectId() const + CObjectID PreviousActorTaskDummyObjectId() const uint64_t MaxActorReconstructions() const CActorID ActorId() const CActorHandleID ActorHandleId() const @@ -92,8 +93,10 @@ cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil: TaskSpecBuilder &SetActorTaskSpec( const CActorID &actor_id, const CActorHandleID &actor_handle_id, - const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter, - const c_vector[CActorHandleID] &new_handle_ids) + const CObjectID &actor_creation_dummy_object_id, + const CObjectID &previous_actor_task_dummy_object_id, + uint64_t actor_counter, + const c_vector[CActorHandleID] &new_handle_ids); RpcTaskSpec GetMessage() diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index dfad99205..0aa328e32 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -23,6 +23,7 @@ cdef class TaskSpec: int num_returns, TaskID parent_task_id, int parent_counter, ActorID actor_creation_id, ObjectID actor_creation_dummy_object_id, + ObjectID previous_actor_task_dummy_object_id, int32_t max_actor_reconstructions, ActorID actor_id, ActorHandleID actor_handle_id, int actor_counter, new_actor_handles, resource_map, placement_resource_map): @@ -85,6 +86,7 @@ cdef class TaskSpec: actor_id.native(), actor_handle_id.native(), actor_creation_dummy_object_id.native(), + previous_actor_task_dummy_object_id.native(), actor_counter, c_new_actor_handles, ) @@ -229,6 +231,13 @@ cdef class TaskSpec: return ObjectID( self.task_spec.get().ActorCreationDummyObjectId().Binary()) + def previous_actor_task_dummy_object_id(self): + """Return the object ID of the previously executed actor task.""" + if not self.is_actor_task(): + return ObjectID.nil() + return ObjectID( + self.task_spec.get().PreviousActorTaskDummyObjectId().Binary()) + def actor_id(self): """Return the actor ID for this task.""" if not self.is_actor_task(): @@ -247,13 +256,10 @@ cdef class TaskExecutionSpec: cdef: unique_ptr[CTaskExecutionSpec] c_spec - def __init__(self, execution_dependencies): + def __init__(self): cdef: RpcTaskExecutionSpec message; - for dependency in execution_dependencies: - message.add_dependencies( - (dependency).binary()) self.c_spec.reset(new CTaskExecutionSpec(message)) @staticmethod @@ -264,16 +270,6 @@ cdef class TaskExecutionSpec: self.c_spec.reset(new CTaskExecutionSpec(string)) return self - def dependencies(self): - cdef: - CObjectID c_id - c_vector[CObjectID] dependencies = ( - self.c_spec.get().ExecutionDependencies()) - ret = [] - for c_id in dependencies: - ret.append(ObjectID(c_id.Binary())) - return ret - def num_forwards(self): return self.c_spec.get().NumForwards() diff --git a/python/ray/state.py b/python/ray/state.py index b60e0a1cb..d42f564a7 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -341,6 +341,8 @@ class GlobalState(object): "ActorCreationID": task.actor_creation_id().hex(), "ActorCreationDummyObjectID": ( task.actor_creation_dummy_object_id().hex()), + "PreviousActorTaskDummyObjectID": ( + task.previous_actor_task_dummy_object_id().hex()), "ActorCounter": task.actor_counter(), "Args": task.arguments(), "ReturnObjectIDs": task.returns(), @@ -356,7 +358,6 @@ class GlobalState(object): task_table_data.task.task_execution_spec.SerializeToString()) return { "ExecutionSpec": { - "Dependencies": execution_spec.dependencies(), "NumForwards": execution_spec.num_forwards(), }, "TaskSpec": task_spec_info diff --git a/python/ray/worker.py b/python/ray/worker.py index 2f26b800d..40bbd8c75 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -586,8 +586,8 @@ class Worker(object): actor_counter=0, actor_creation_id=None, actor_creation_dummy_object_id=None, + previous_actor_task_dummy_object_id=None, max_actor_reconstructions=0, - execution_dependencies=None, new_actor_handles=None, num_return_vals=None, resources=None, @@ -611,7 +611,9 @@ class Worker(object): actor_creation_dummy_object_id: If this task is an actor method, then this argument is the dummy object ID associated with the actor creation task for the corresponding actor. - execution_dependencies: The execution dependencies for this task. + previous_actor_task_dummy_object_id: If this task is an actor, + then this argument is the dummy object ID associated with the + task previously submitted to the corresponding actor. num_return_vals: The number of return values this function should have. resources: The resource requirements for this task. @@ -652,10 +654,6 @@ class Worker(object): else: args_for_raylet.append(put(arg)) - # By default, there are no execution dependencies. - if execution_dependencies is None: - execution_dependencies = [] - if new_actor_handles is None: new_actor_handles = [] @@ -705,6 +703,7 @@ class Worker(object): self.task_context.task_index, actor_creation_id, actor_creation_dummy_object_id, + previous_actor_task_dummy_object_id, max_actor_reconstructions, actor_id, actor_handle_id, @@ -713,7 +712,7 @@ class Worker(object): resources, placement_resources, ) - self.raylet_client.submit_task(task, execution_dependencies) + self.raylet_client.submit_task(task) return task.returns() @@ -1887,6 +1886,7 @@ def connect(node, 0, # parent_counter. ActorID.nil(), # actor_creation_id. ObjectID.nil(), # actor_creation_dummy_object_id. + ObjectID.nil(), # previous_actor_task_dummy_object_id. 0, # max_actor_reconstructions. ActorID.nil(), # actor_id. ActorHandleID.nil(), # actor_handle_id. diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index 3a603d72d..9c60e078a 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -22,16 +22,13 @@ void Task::ComputeDependencies() { dependencies_.push_back(task_spec_.ArgId(i, j)); } } - // TODO(atumanov): why not just return a const reference to ExecutionDependencies() and - // avoid a copy. - auto execution_dependencies = task_execution_spec_.ExecutionDependencies(); - dependencies_.insert(dependencies_.end(), execution_dependencies.begin(), - execution_dependencies.end()); + if (task_spec_.IsActorTask()) { + dependencies_.push_back(task_spec_.PreviousActorTaskDummyObjectId()); + } } void Task::CopyTaskExecutionSpec(const Task &task) { task_execution_spec_ = task.task_execution_spec_; - ComputeDependencies(); } std::string Task::DebugString() const { diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index 80e3f71eb..4c37ebdd4 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -72,7 +72,7 @@ class Task { /// dependencies, etc. TaskSpecification task_spec_; /// Task execution specification, consisting of all dynamic/mutable - /// information about this task determined at execution time.. + /// information about this task determined at execution time. TaskExecutionSpecification task_execution_spec_; /// A cached copy of the task's object dependencies, including arguments from /// the TaskSpecification and execution dependencies from the diff --git a/src/ray/common/task/task_execution_spec.cc b/src/ray/common/task/task_execution_spec.cc index 849414389..3f1af7551 100644 --- a/src/ray/common/task/task_execution_spec.cc +++ b/src/ray/common/task/task_execution_spec.cc @@ -4,10 +4,6 @@ namespace ray { -const std::vector TaskExecutionSpecification::ExecutionDependencies() const { - return IdVectorFromProtobuf(message_.dependencies()); -} - size_t TaskExecutionSpecification::NumForwards() const { return message_.num_forwards(); } void TaskExecutionSpecification::IncrementNumForwards() { @@ -16,8 +12,7 @@ void TaskExecutionSpecification::IncrementNumForwards() { std::string TaskExecutionSpecification::DebugString() const { std::ostringstream stream; - stream << "num_dependencies=" << message_.dependencies_size() - << ", num_forwards=" << message_.num_forwards(); + stream << "num_forwards=" << message_.num_forwards(); return stream.str(); } diff --git a/src/ray/common/task/task_execution_spec.h b/src/ray/common/task/task_execution_spec.h index 44813c275..ea35bc3de 100644 --- a/src/ray/common/task/task_execution_spec.h +++ b/src/ray/common/task/task_execution_spec.h @@ -29,12 +29,6 @@ class TaskExecutionSpecification : public MessageWrapper explicit TaskExecutionSpecification(const std::string &serialized_binary) : MessageWrapper(serialized_binary) {} - /// Get the task's execution dependencies. - /// - /// \return A vector of object IDs representing this task's execution - /// dependencies. - const std::vector ExecutionDependencies() const; - /// Get the number of times this task has been forwarded. /// /// \return The number of times this task has been forwarded. diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 1d5bcb7aa..b0c01b418 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -128,6 +128,12 @@ ObjectID TaskSpecification::ActorCreationDummyObjectId() const { message_.actor_task_spec().actor_creation_dummy_object_id()); } +ObjectID TaskSpecification::PreviousActorTaskDummyObjectId() const { + RAY_CHECK(IsActorTask()); + return ObjectID::FromBinary( + message_.actor_task_spec().previous_actor_task_dummy_object_id()); +} + ObjectID TaskSpecification::ActorDummyObject() const { RAY_CHECK(IsActorTask() || IsActorCreationTask()); return ReturnId(NumReturns() - 1); diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 376433df4..353abb7b1 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -115,6 +115,8 @@ class TaskSpecification : public MessageWrapper { ObjectID ActorCreationDummyObjectId() const; + ObjectID PreviousActorTaskDummyObjectId() const; + std::vector NewActorHandles() const; ObjectID ActorDummyObject() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 21c084ae5..d530b691e 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -93,7 +93,8 @@ class TaskSpecBuilder { /// \return Reference to the builder object itself. TaskSpecBuilder &SetActorTaskSpec( const ActorID &actor_id, const ActorHandleID &actor_handle_id, - const ObjectID &actor_creation_dummy_object_id, uint64_t actor_counter, + const ObjectID &actor_creation_dummy_object_id, + const ObjectID &previous_actor_task_dummy_object_id, uint64_t actor_counter, const std::vector &new_handle_ids = {}) { message_.set_type(TaskType::ACTOR_TASK); auto actor_spec = message_.mutable_actor_task_spec(); @@ -101,6 +102,8 @@ class TaskSpecBuilder { actor_spec->set_actor_handle_id(actor_handle_id.Binary()); actor_spec->set_actor_creation_dummy_object_id( actor_creation_dummy_object_id.Binary()); + actor_spec->set_previous_actor_task_dummy_object_id( + previous_actor_task_dummy_object_id.Binary()); actor_spec->set_actor_counter(actor_counter); for (const auto &id : new_handle_ids) { actor_spec->add_new_actor_handles(id.Binary()); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 2e32e6ecd..aabb3fa83 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -75,30 +75,6 @@ struct TaskInfo { const TaskType task_type; }; -/// Task specification, which includes the immutable information about the task -/// which are determined at the submission time. -/// TODO(zhijunfu): this can be removed after everything is moved to protobuf. -class TaskSpec { - public: - TaskSpec(const TaskSpecification &task_spec, const std::vector &dependencies) - : task_spec_(task_spec), dependencies_(dependencies) {} - - TaskSpec(const TaskSpecification &&task_spec, - const std::vector &&dependencies) - : task_spec_(task_spec), dependencies_(dependencies) {} - - const TaskSpecification &GetTaskSpecification() const { return task_spec_; } - - const std::vector &GetDependencies() const { return dependencies_; } - - private: - /// Raylet task specification. - TaskSpecification task_spec_; - - /// Dependencies. - std::vector dependencies_; -}; - enum class StoreProviderType { PLASMA }; enum class TaskTransportType { RAYLET }; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 9083fe2b9..c5d7e7857 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -20,7 +20,9 @@ struct WorkerThreadContext { put_index = 0; } - void SetCurrentTask(const TaskSpecification &spec) { SetCurrentTask(spec.TaskId()); } + void SetCurrentTask(const TaskSpecification &task_spec) { + SetCurrentTask(task_spec.TaskId()); + } private: /// The task ID for current task. @@ -62,9 +64,9 @@ const TaskID &WorkerContext::GetCurrentTaskID() const { return GetThreadContext().GetCurrentTaskID(); } -void WorkerContext::SetCurrentTask(const TaskSpecification &spec) { - current_job_id = spec.JobId(); - GetThreadContext().SetCurrentTask(spec); +void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { + current_job_id = task_spec.JobId(); + GetThreadContext().SetCurrentTask(task_spec); } WorkerThreadContext &WorkerContext::GetThreadContext() { diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index a8330055f..629249103 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -20,7 +20,7 @@ class WorkerContext { const TaskID &GetCurrentTaskID() const; - void SetCurrentTask(const TaskSpecification &spec); + void SetCurrentTask(const TaskSpecification &task_spec); int GetNextTaskIndex(); diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 813e789eb..5f939d3cf 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -26,27 +26,27 @@ CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( worker_server_.Run(); } -Status CoreWorkerTaskExecutionInterface::ExecuteTask(const TaskSpecification &spec) { - worker_context_.SetCurrentTask(spec); +Status CoreWorkerTaskExecutionInterface::ExecuteTask(const TaskSpecification &task_spec) { + worker_context_.SetCurrentTask(task_spec); - RayFunction func{spec.GetLanguage(), spec.FunctionDescriptor()}; + RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()}; std::vector> args; - RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); + RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args)); TaskType task_type; - if (spec.IsActorCreationTask()) { + if (task_spec.IsActorCreationTask()) { task_type = TaskType::ACTOR_CREATION_TASK; - } else if (spec.IsActorTask()) { + } else if (task_spec.IsActorTask()) { task_type = TaskType::ACTOR_TASK; } else { task_type = TaskType::NORMAL_TASK; } - TaskInfo task_info{spec.TaskId(), spec.JobId(), task_type}; + TaskInfo task_info{task_spec.TaskId(), task_spec.JobId(), task_type}; - auto num_returns = spec.NumReturns(); - if (spec.IsActorCreationTask() || spec.IsActorTask()) { + auto num_returns = task_spec.NumReturns(); + if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) { RAY_CHECK(num_returns > 0); // Decrease to account for the dummy object id. num_returns--; @@ -68,25 +68,25 @@ void CoreWorkerTaskExecutionInterface::Run() { } Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( - const TaskSpecification &spec, std::vector> *args) { - auto num_args = spec.NumArgs(); + const TaskSpecification &task, std::vector> *args) { + auto num_args = task.NumArgs(); (*args).resize(num_args); std::vector object_ids_to_fetch; std::vector indices; - for (int i = 0; i < spec.NumArgs(); ++i) { - int count = spec.ArgIdCount(i); + for (int i = 0; i < task.NumArgs(); ++i) { + int count = task.ArgIdCount(i); if (count > 0) { // pass by reference. RAY_CHECK(count == 1); - object_ids_to_fetch.push_back(spec.ArgId(i, 0)); + object_ids_to_fetch.push_back(task.ArgId(i, 0)); indices.push_back(i); } else { // pass by value. (*args)[i] = std::make_shared( - std::make_shared(const_cast(spec.ArgVal(i)), - spec.ArgValLength(i)), + std::make_shared(const_cast(task.ArgVal(i)), + task.ArgValLength(i)), nullptr); } } diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 6086e341f..c40b2c05d 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -134,8 +134,8 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, std::vector *return_ids) { auto builder = BuildCommonTaskSpec(function, args, task_options.num_returns, task_options.resources, {}, return_ids); - TaskSpec task(builder.Build(), {}); - return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); + return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask( + builder.Build()); } Status CoreWorkerTaskInterface::CreateActor( @@ -155,8 +155,8 @@ Status CoreWorkerTaskInterface::CreateActor( (*actor_handle)->IncreaseTaskCounter(); (*actor_handle)->SetActorCursor(return_ids[0]); - const TaskSpec task(builder.Build(), {}); - return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); + return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask( + builder.Build()); } Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, @@ -175,12 +175,11 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, // Build actor task spec. const auto actor_creation_dummy_object_id = ObjectID::FromBinary(actor_handle.ActorID().Binary()); - builder.SetActorTaskSpec(actor_handle.ActorID(), actor_handle.ActorHandleID(), - actor_creation_dummy_object_id, - actor_handle.IncreaseTaskCounter(), - actor_handle.NewActorHandles()); - - const TaskSpec task(builder.Build(), {actor_handle.ActorCursor()}); + builder.SetActorTaskSpec( + actor_handle.ActorID(), actor_handle.ActorHandleID(), + actor_creation_dummy_object_id, + /*previous_actor_task_dummy_object_id=*/actor_handle.ActorCursor(), + actor_handle.IncreaseTaskCounter(), actor_handle.NewActorHandles()); // Manipulate actor handle state. auto actor_cursor = (*return_ids).back(); @@ -189,8 +188,8 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, guard.unlock(); // Submit task. - auto status = - task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); + auto status = task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask( + builder.Build()); // Remove cursor from return ids. (*return_ids).pop_back(); diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index c260c6d51..4698d4363 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -8,9 +8,9 @@ CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter( std::unique_ptr &raylet_client) : raylet_client_(raylet_client) {} -Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { +Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpecification &task) { RAY_CHECK(raylet_client_ != nullptr); - return raylet_client_->SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); + return raylet_client_->SubmitTask(task); } CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( @@ -26,8 +26,8 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( const rpc::AssignTaskRequest &request, rpc::AssignTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { const Task task(request.task()); - const auto &spec = task.GetTaskSpecification(); - auto status = task_handler_(spec); + const auto &task_spec = task.GetTaskSpecification(); + auto status = task_handler_(task_spec); // Notify raylet that current task is done via a `TaskDone` message. This is to // ensure that the task is marked as finished by raylet only after previous // raylet client calls are completed. For example, if the worker sends a diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 1b375305d..72b0f5ffc 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -21,7 +21,7 @@ class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { /// /// \param[in] task The task spec to submit. /// \return Status. - virtual Status SubmitTask(const TaskSpec &task) override; + virtual Status SubmitTask(const TaskSpecification &task_spec) override; private: /// Raylet client. diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index 5376817cf..e103fa130 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -26,7 +26,7 @@ class CoreWorkerTaskSubmitter { /// /// \param[in] task The task spec to submit. /// \return Status. - virtual Status SubmitTask(const TaskSpec &task) = 0; + virtual Status SubmitTask(const TaskSpecification &task_spec) = 0; }; /// This class receives tasks for execution. diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 4bee921c0..8d14c004a 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -86,30 +86,26 @@ message ActorCreationTaskSpec { // Task spec of an actor task. message ActorTaskSpec { - // Actor ID of the task. This is the actor that this task is executed on - // or NIL_ACTOR_ID if the task is just a normal task. + // Actor ID of the actor that this task is executed on. bytes actor_id = 2; // The ID of the handle that was used to submit the task. This should be // unique across handles with the same actor_id. bytes actor_handle_id = 3; - // The dummy object ID of the actor creation task if this is an actor method. + // The dummy object ID of the actor creation task. bytes actor_creation_dummy_object_id = 4; // Number of tasks that have been submitted to this actor so far. uint64 actor_counter = 5; - // If this is an actor task, then this will be populated with all of the new - // actor handles that were forked from this handle since the last task on - // this handle was submitted. - // Note that this is a long string that concatenate all of the new_actor_handle IDs. + // This will be populated with all of the new actor handles that were forked + // from this handle since the last task on this handle was submitted. repeated bytes new_actor_handles = 6; + // The dummy object ID of the previous actor task. + bytes previous_actor_task_dummy_object_id = 7; } // The task execution specification encapsulates all mutable information about // the task. These fields may change at execution time, converse to the // `TaskSpec` is determined at submission time. message TaskExecutionSpec { - // A list of object IDs representing the dependencies of this task that may - // change at execution time. - repeated bytes dependencies = 1; // The last time this task was received for scheduling. double last_timestamp = 2; // The number of times this task was spilled back by raylets. diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index dad8c8248..705a9fdba 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -94,7 +94,6 @@ table Task { } table SubmitTaskRequest { - execution_dependencies: [string]; task_spec: string; } diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc index 5080f2edf..ac6d33b9d 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc @@ -59,15 +59,9 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit( * Signature: (J[BLjava/nio/ByteBuffer;II)V */ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmitTask( - JNIEnv *env, jclass, jlong client, jbyteArray cursorId, jbyteArray taskSpec) { + JNIEnv *env, jclass, jlong client, jbyteArray taskSpec) { auto raylet_client = reinterpret_cast(client); - std::vector execution_dependencies; - if (cursorId != nullptr) { - UniqueIdFromJByteArray cursor_id(env, cursorId); - execution_dependencies.push_back(cursor_id.GetId()); - } - jbyte *data = env->GetByteArrayElements(taskSpec, NULL); jsize size = env->GetArrayLength(taskSpec); ray::rpc::TaskSpec task_spec_message; @@ -75,7 +69,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit env->ReleaseByteArrayElements(taskSpec, data, JNI_ABORT); ray::TaskSpecification task_spec(task_spec_message); - auto status = raylet_client->SubmitTask(execution_dependencies, task_spec); + auto status = raylet_client->SubmitTask(task_spec); ThrowRayExceptionIfNotOK(env, status); } diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h index 414f916d1..d2538654a 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h @@ -7,8 +7,6 @@ #ifdef __cplusplus extern "C" { #endif -#undef org_ray_runtime_raylet_RayletClientImpl_TASK_SPEC_BUFFER_SIZE -#define org_ray_runtime_raylet_RayletClientImpl_TASK_SPEC_BUFFER_SIZE 2097152L /* * Class: org_ray_runtime_raylet_RayletClientImpl * Method: nativeInit @@ -20,10 +18,10 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit( /* * Class: org_ray_runtime_raylet_RayletClientImpl * Method: nativeSubmitTask - * Signature: (J[B[B)V + * Signature: (J[B)V */ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmitTask( - JNIEnv *, jclass, jlong, jbyteArray, jbyteArray); + JNIEnv *, jclass, jlong, jbyteArray); /* * Class: org_ray_runtime_raylet_RayletClientImpl diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4eb8a8938..b08f6e35d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1083,10 +1083,6 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) { rpc::Task task_message; RAY_CHECK(task_message.mutable_task_spec()->ParseFromArray( fbs_message->task_spec()->data(), fbs_message->task_spec()->size())); - for (const auto &dependency : - string_vec_from_flatbuf(*fbs_message->execution_dependencies())) { - task_message.mutable_task_execution_spec()->add_dependencies(dependency); - } // Submit the task to the raylet. Since the task was submitted // locally, there is no uncommitted lineage. @@ -2351,20 +2347,11 @@ void NodeManager::FinishAssignTask(const TaskID &task_id, Worker &worker, bool s // returned by the previous task, so the dependency will not be // released until this first task is submitted. for (auto &new_handle_id : spec.NewActorHandles()) { - // Get the execution dependency for the first task submitted on the new - // actor handle. Since the new actor handle was created after this task - // began and before this task finished, it must have the same execution - // dependency. - const auto &execution_dependencies = - assigned_task.GetTaskExecutionSpec().ExecutionDependencies(); - // TODO(swang): We expect this task to have exactly 1 execution dependency, - // the dummy object returned by the previous actor task. However, this - // leaks information about the TaskExecutionSpecification implementation. - RAY_CHECK(execution_dependencies.size() == 1); - const ObjectID &execution_dependency = execution_dependencies.front(); + const auto prev_actor_task_id = spec.PreviousActorTaskDummyObjectId(); + RAY_CHECK(!prev_actor_task_id.IsNil()); // Add the new handle and give it a reference to the finished task's // execution dependency. - actor_entry->second.AddHandle(new_handle_id, execution_dependency); + actor_entry->second.AddHandle(new_handle_id, prev_actor_task_id); } // TODO(swang): For actors with multiple actor handles, to diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index d16a88ed5..87a8af4fb 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -223,12 +223,10 @@ RayletClient::RayletClient(const std::string &raylet_socket, const ClientID &cli RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet."); } -ray::Status RayletClient::SubmitTask(const std::vector &execution_dependencies, - const ray::TaskSpecification &task_spec) { +ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) { flatbuffers::FlatBufferBuilder fbb; - auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); auto message = ray::protocol::CreateSubmitTaskRequest( - fbb, execution_dependencies_message, fbb.CreateString(task_spec.Serialize())); + fbb, fbb.CreateString(task_spec.Serialize())); fbb.Finish(message); return conn_->WriteMessage(MessageType::SubmitTask, &fbb); } diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index fb96a6fd8..c27b325df 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -80,11 +80,9 @@ class RayletClient { /// Submit a task using the raylet code path. /// - /// \param The execution dependencies. /// \param The task specification. /// \return ray::Status. - ray::Status SubmitTask(const std::vector &execution_dependencies, - const ray::TaskSpecification &task_spec); + ray::Status SubmitTask(const ray::TaskSpecification &task_spec); /// Get next task for this client. This will block until the scheduler assigns /// a task to this worker. The caller takes ownership of the returned task