Remove dependencies from TaskExecutionSpecification (#5166)

This commit is contained in:
Edward Oakes 2019-07-15 18:15:21 -07:00 committed by Philipp Moritz
parent fd71ffde2f
commit e5be5fd46d
33 changed files with 136 additions and 194 deletions

View file

@ -244,7 +244,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
@Override
public RayObject call(RayFunc func, Object[] args, CallOptions options) {
TaskSpec spec = createTaskSpec(func, null, RayActorImpl.NIL, args, false, options);
TaskSpec spec = createTaskSpec(func, null, RayActorImpl.NIL, args, false, false, options);
rayletClient.submitTask(spec);
return new RayObjectImpl(spec.returnIds[0]);
}
@ -257,8 +257,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
RayActorImpl<?> actorImpl = (RayActorImpl) actor;
TaskSpec spec;
synchronized (actor) {
spec = createTaskSpec(func, null, actorImpl, args, false, null);
spec.getExecutionDependencies().add(((RayActorImpl) actor).getTaskCursor());
spec = createTaskSpec(func, null, actorImpl, args, false, true, null);
actorImpl.setTaskCursor(spec.returnIds[1]);
actorImpl.clearNewActorHandles();
}
@ -271,7 +270,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
public <T> RayActor<T> createActor(RayFunc actorFactoryFunc,
Object[] args, ActorCreationOptions options) {
TaskSpec spec = createTaskSpec(actorFactoryFunc, null, RayActorImpl.NIL,
args, true, options);
args, true, false, options);
RayActorImpl<?> actor = new RayActorImpl(new UniqueId(spec.returnIds[0].getBytes()));
actor.increaseTaskCounter();
actor.setTaskCursor(spec.returnIds[0]);
@ -293,7 +292,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
CallOptions options) {
checkPyArguments(args);
PyFunctionDescriptor desc = new PyFunctionDescriptor(moduleName, "", functionName);
TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, false, options);
TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, false, false, options);
rayletClient.submitTask(spec);
return new RayObjectImpl(spec.returnIds[0]);
}
@ -306,8 +305,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
RayPyActorImpl actorImpl = (RayPyActorImpl) pyActor;
TaskSpec spec;
synchronized (pyActor) {
spec = createTaskSpec(null, desc, actorImpl, args, false, null);
spec.getExecutionDependencies().add(actorImpl.getTaskCursor());
spec = createTaskSpec(null, desc, actorImpl, args, false, true, null);
actorImpl.setTaskCursor(spec.returnIds[1]);
actorImpl.clearNewActorHandles();
}
@ -320,7 +318,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
ActorCreationOptions options) {
checkPyArguments(args);
PyFunctionDescriptor desc = new PyFunctionDescriptor(moduleName, className, "__init__");
TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, true, options);
TaskSpec spec = createTaskSpec(null, desc, RayPyActorImpl.NIL, args, true, false, options);
RayPyActorImpl actor = new RayPyActorImpl(spec.actorCreationId, moduleName, className);
actor.increaseTaskCounter();
actor.setTaskCursor(spec.returnIds[0]);
@ -337,11 +335,12 @@ public abstract class AbstractRayRuntime implements RayRuntime {
* @param actor The actor handle. If the task is not an actor task, actor id must be NIL.
* @param args The arguments for the remote function.
* @param isActorCreationTask Whether this task is an actor creation task.
* @param isActorTask Whether this task is an actor task.
* @return A TaskSpec object.
*/
private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDescriptor,
RayActorImpl<?> actor, Object[] args,
boolean isActorCreationTask, BaseTaskOptions taskOptions) {
boolean isActorCreationTask, boolean isActorTask, BaseTaskOptions taskOptions) {
Preconditions.checkArgument((func == null) != (pyFunctionDescriptor == null));
TaskId taskId = rayletClient.generateTaskId(workerContext.getCurrentJobId(),
@ -382,6 +381,11 @@ public abstract class AbstractRayRuntime implements RayRuntime {
functionDescriptor = pyFunctionDescriptor;
}
ObjectId previousActorTaskDummyObjectId = ObjectId.NIL;
if (isActorTask) {
previousActorTaskDummyObjectId = actor.getTaskCursor();
}
return new TaskSpec(
workerContext.getCurrentJobId(),
taskId,
@ -392,6 +396,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
actor.getId(),
actor.getHandleId(),
actor.increaseTaskCounter(),
previousActorTaskDummyObjectId,
actor.getNewActorHandles().toArray(new UniqueId[0]),
ArgumentsBuilder.wrap(args, language == TaskLanguage.PYTHON),
numReturns,

View file

@ -138,10 +138,9 @@ public class MockRayletClient implements RayletClient {
}
}
}
// Check whether task dependencies are ready.
for (ObjectId id : spec.getExecutionDependencies()) {
if (!store.isObjectReady(id)) {
unreadyObjects.add(id);
if (spec.isActorTask()) {
if (!store.isObjectReady(spec.previousActorTaskDummyObjectId)) {
unreadyObjects.add(spec.previousActorTaskDummyObjectId);
}
}
return unreadyObjects;

View file

@ -81,12 +81,7 @@ public class RayletClientImpl implements RayletClient {
Preconditions.checkState(!spec.jobId.isNil());
byte[] taskSpec = convertTaskSpecToProtobuf(spec);
byte[] cursorId = null;
if (!spec.getExecutionDependencies().isEmpty()) {
//TODO(hchen): handle more than one dependencies.
cursorId = spec.getExecutionDependencies().get(0).getBytes();
}
nativeSubmitTask(client, cursorId, taskSpec);
nativeSubmitTask(client, taskSpec);
}
@Override
@ -195,6 +190,7 @@ public class RayletClientImpl implements RayletClient {
// Parse ActorTaskSpec.
UniqueId actorId = UniqueId.NIL;
UniqueId actorHandleId = UniqueId.NIL;
ObjectId previousActorTaskDummyObjectId = ObjectId.NIL;
int actorCounter = 0;
if (taskSpec.getType() == Common.TaskType.ACTOR_TASK) {
Common.ActorTaskSpec actorTaskSpec = taskSpec.getActorTaskSpec();
@ -202,14 +198,17 @@ public class RayletClientImpl implements RayletClient {
actorHandleId = UniqueId
.fromByteBuffer(actorTaskSpec.getActorHandleId().asReadOnlyByteBuffer());
actorCounter = (int) actorTaskSpec.getActorCounter();
previousActorTaskDummyObjectId = ObjectId.fromByteBuffer(
actorTaskSpec.getPreviousActorTaskDummyObjectId().asReadOnlyByteBuffer());
newActorHandles = actorTaskSpec.getNewActorHandlesList().stream()
.map(byteString -> UniqueId.fromByteBuffer(byteString.asReadOnlyByteBuffer()))
.toArray(UniqueId[]::new);
}
return new TaskSpec(jobId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions);
maxActorReconstructions, actorId, actorHandleId, actorCounter,
previousActorTaskDummyObjectId, newActorHandles, args, numReturns, resources,
TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions);
}
/**
@ -275,6 +274,8 @@ public class RayletClientImpl implements RayletClient {
.setActorId(ByteString.copyFrom(task.actorId.getBytes()))
.setActorHandleId(ByteString.copyFrom(task.actorHandleId.getBytes()))
.setActorCreationDummyObjectId(ByteString.copyFrom(task.actorId.getBytes()))
.setPreviousActorTaskDummyObjectId(
ByteString.copyFrom(task.previousActorTaskDummyObjectId.getBytes()))
.setActorCounter(task.actorCounter)
.addAllNewActorHandles(newHandles)
);
@ -310,7 +311,7 @@ public class RayletClientImpl implements RayletClient {
private static native long nativeInit(String localSchedulerSocket, byte[] workerId,
boolean isWorker, byte[] driverTaskId);
private static native void nativeSubmitTask(long client, byte[] cursorId, byte[] taskSpec)
private static native void nativeSubmitTask(long client, byte[] taskSpec)
throws RayException;
private static native byte[] nativeGetTask(long client) throws RayException;

View file

@ -46,6 +46,9 @@ public class TaskSpec {
// Number of tasks that have been submitted to this actor so far.
public final int actorCounter;
// Object id returned by the previous task submitted to the same actor.
public final ObjectId previousActorTaskDummyObjectId;
// Task arguments.
public final UniqueId[] newActorHandles;
@ -55,7 +58,7 @@ public class TaskSpec {
// number of return objects.
public final int numReturns;
// returns ids.
// Return ids.
public final ObjectId[] returnIds;
// The task's resource demands.
@ -71,8 +74,6 @@ public class TaskSpec {
// is Python, the type is PyFunctionDescriptor.
private final FunctionDescriptor functionDescriptor;
private List<ObjectId> executionDependencies;
public boolean isActorTask() {
return !actorId.isNil();
}
@ -91,6 +92,7 @@ public class TaskSpec {
UniqueId actorId,
UniqueId actorHandleId,
int actorCounter,
ObjectId previousActorTaskDummyObjectId,
UniqueId[] newActorHandles,
FunctionArg[] args,
int numReturns,
@ -107,6 +109,7 @@ public class TaskSpec {
this.actorId = actorId;
this.actorHandleId = actorHandleId;
this.actorCounter = actorCounter;
this.previousActorTaskDummyObjectId = previousActorTaskDummyObjectId;
this.newActorHandles = newActorHandles;
this.args = args;
this.numReturns = numReturns;
@ -128,7 +131,6 @@ public class TaskSpec {
Preconditions.checkArgument(false, "Unknown task language: {}.", language);
}
this.functionDescriptor = functionDescriptor;
this.executionDependencies = new ArrayList<>();
}
public JavaFunctionDescriptor getJavaFunctionDescriptor() {
@ -141,10 +143,6 @@ public class TaskSpec {
return (PyFunctionDescriptor) functionDescriptor;
}
public List<ObjectId> getExecutionDependencies() {
return executionDependencies;
}
@Override
public String toString() {
return "TaskSpec{" +
@ -157,14 +155,14 @@ public class TaskSpec {
", actorId=" + actorId +
", actorHandleId=" + actorHandleId +
", actorCounter=" + actorCounter +
", previousActorTaskDummyObjectId=" + previousActorTaskDummyObjectId +
", newActorHandles=" + Arrays.toString(newActorHandles) +
", args=" + Arrays.toString(args) +
", numReturns=" + numReturns +
", resources=" + resources +
", language=" + language +
", functionDescriptor=" + functionDescriptor +
", dynamicWorkerOptions=" + dynamicWorkerOptions +
", executionDependencies=" + executionDependencies +
", dynamicWorkerOptions=" + dynamicWorkerOptions +
'}';
}
}

View file

@ -232,14 +232,11 @@ cdef class RayletClient:
def disconnect(self):
check_status(self.client.get().Disconnect())
def submit_task(self, TaskSpec task_spec, execution_dependencies):
def submit_task(self, TaskSpec task_spec):
cdef:
CObjectID c_id
c_vector[CObjectID] c_dependencies
for dep in execution_dependencies:
c_dependencies.push_back((<ObjectID>dep).native())
check_status(self.client.get().SubmitTask(
c_dependencies, task_spec.task_spec.get()[0]))
task_spec.task_spec.get()[0]))
def get_task(self):
cdef:

View file

@ -547,7 +547,7 @@ class ActorHandle(object):
actor_counter=self._ray_actor_counter,
actor_creation_dummy_object_id=(
self._ray_actor_creation_dummy_object_id),
execution_dependencies=[self._ray_actor_cursor],
previous_actor_task_dummy_object_id=self._ray_actor_cursor,
new_actor_handles=self._ray_new_actor_handles,
# We add one for the dummy return ID.
num_return_vals=num_return_vals + 1,

View file

@ -50,9 +50,7 @@ cdef extern from "ray/raylet/raylet_client.h" nogil:
c_bool is_worker, const CJobID &job_id,
const CLanguage &language)
CRayStatus Disconnect()
CRayStatus SubmitTask(
const c_vector[CObjectID] &execution_dependencies,
const CTaskSpec &task_spec)
CRayStatus SubmitTask(const CTaskSpec &task_spec)
CRayStatus GetTask(unique_ptr[CTaskSpec] *task_spec)
CRayStatus TaskDone()
CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids,

View file

@ -66,6 +66,7 @@ cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
c_bool IsActorTask() const
CActorID ActorCreationId() const
CObjectID ActorCreationDummyObjectId() const
CObjectID PreviousActorTaskDummyObjectId() const
uint64_t MaxActorReconstructions() const
CActorID ActorId() const
CActorHandleID ActorHandleId() const
@ -92,8 +93,10 @@ cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:
TaskSpecBuilder &SetActorTaskSpec(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids)
const CObjectID &actor_creation_dummy_object_id,
const CObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids);
RpcTaskSpec GetMessage()

View file

@ -23,6 +23,7 @@ cdef class TaskSpec:
int num_returns, TaskID parent_task_id, int parent_counter,
ActorID actor_creation_id,
ObjectID actor_creation_dummy_object_id,
ObjectID previous_actor_task_dummy_object_id,
int32_t max_actor_reconstructions, ActorID actor_id,
ActorHandleID actor_handle_id, int actor_counter,
new_actor_handles, resource_map, placement_resource_map):
@ -85,6 +86,7 @@ cdef class TaskSpec:
actor_id.native(),
actor_handle_id.native(),
actor_creation_dummy_object_id.native(),
previous_actor_task_dummy_object_id.native(),
actor_counter,
c_new_actor_handles,
)
@ -229,6 +231,13 @@ cdef class TaskSpec:
return ObjectID(
self.task_spec.get().ActorCreationDummyObjectId().Binary())
def previous_actor_task_dummy_object_id(self):
"""Return the object ID of the previously executed actor task."""
if not self.is_actor_task():
return ObjectID.nil()
return ObjectID(
self.task_spec.get().PreviousActorTaskDummyObjectId().Binary())
def actor_id(self):
"""Return the actor ID for this task."""
if not self.is_actor_task():
@ -247,13 +256,10 @@ cdef class TaskExecutionSpec:
cdef:
unique_ptr[CTaskExecutionSpec] c_spec
def __init__(self, execution_dependencies):
def __init__(self):
cdef:
RpcTaskExecutionSpec message;
for dependency in execution_dependencies:
message.add_dependencies(
(<ObjectID?>dependency).binary())
self.c_spec.reset(new CTaskExecutionSpec(message))
@staticmethod
@ -264,16 +270,6 @@ cdef class TaskExecutionSpec:
self.c_spec.reset(new CTaskExecutionSpec(string))
return self
def dependencies(self):
cdef:
CObjectID c_id
c_vector[CObjectID] dependencies = (
self.c_spec.get().ExecutionDependencies())
ret = []
for c_id in dependencies:
ret.append(ObjectID(c_id.Binary()))
return ret
def num_forwards(self):
return self.c_spec.get().NumForwards()

View file

@ -341,6 +341,8 @@ class GlobalState(object):
"ActorCreationID": task.actor_creation_id().hex(),
"ActorCreationDummyObjectID": (
task.actor_creation_dummy_object_id().hex()),
"PreviousActorTaskDummyObjectID": (
task.previous_actor_task_dummy_object_id().hex()),
"ActorCounter": task.actor_counter(),
"Args": task.arguments(),
"ReturnObjectIDs": task.returns(),
@ -356,7 +358,6 @@ class GlobalState(object):
task_table_data.task.task_execution_spec.SerializeToString())
return {
"ExecutionSpec": {
"Dependencies": execution_spec.dependencies(),
"NumForwards": execution_spec.num_forwards(),
},
"TaskSpec": task_spec_info

View file

@ -586,8 +586,8 @@ class Worker(object):
actor_counter=0,
actor_creation_id=None,
actor_creation_dummy_object_id=None,
previous_actor_task_dummy_object_id=None,
max_actor_reconstructions=0,
execution_dependencies=None,
new_actor_handles=None,
num_return_vals=None,
resources=None,
@ -611,7 +611,9 @@ class Worker(object):
actor_creation_dummy_object_id: If this task is an actor method,
then this argument is the dummy object ID associated with the
actor creation task for the corresponding actor.
execution_dependencies: The execution dependencies for this task.
previous_actor_task_dummy_object_id: If this task is an actor,
then this argument is the dummy object ID associated with the
task previously submitted to the corresponding actor.
num_return_vals: The number of return values this function should
have.
resources: The resource requirements for this task.
@ -652,10 +654,6 @@ class Worker(object):
else:
args_for_raylet.append(put(arg))
# By default, there are no execution dependencies.
if execution_dependencies is None:
execution_dependencies = []
if new_actor_handles is None:
new_actor_handles = []
@ -705,6 +703,7 @@ class Worker(object):
self.task_context.task_index,
actor_creation_id,
actor_creation_dummy_object_id,
previous_actor_task_dummy_object_id,
max_actor_reconstructions,
actor_id,
actor_handle_id,
@ -713,7 +712,7 @@ class Worker(object):
resources,
placement_resources,
)
self.raylet_client.submit_task(task, execution_dependencies)
self.raylet_client.submit_task(task)
return task.returns()
@ -1887,6 +1886,7 @@ def connect(node,
0, # parent_counter.
ActorID.nil(), # actor_creation_id.
ObjectID.nil(), # actor_creation_dummy_object_id.
ObjectID.nil(), # previous_actor_task_dummy_object_id.
0, # max_actor_reconstructions.
ActorID.nil(), # actor_id.
ActorHandleID.nil(), # actor_handle_id.

View file

@ -22,16 +22,13 @@ void Task::ComputeDependencies() {
dependencies_.push_back(task_spec_.ArgId(i, j));
}
}
// TODO(atumanov): why not just return a const reference to ExecutionDependencies() and
// avoid a copy.
auto execution_dependencies = task_execution_spec_.ExecutionDependencies();
dependencies_.insert(dependencies_.end(), execution_dependencies.begin(),
execution_dependencies.end());
if (task_spec_.IsActorTask()) {
dependencies_.push_back(task_spec_.PreviousActorTaskDummyObjectId());
}
}
void Task::CopyTaskExecutionSpec(const Task &task) {
task_execution_spec_ = task.task_execution_spec_;
ComputeDependencies();
}
std::string Task::DebugString() const {

View file

@ -72,7 +72,7 @@ class Task {
/// dependencies, etc.
TaskSpecification task_spec_;
/// Task execution specification, consisting of all dynamic/mutable
/// information about this task determined at execution time..
/// information about this task determined at execution time.
TaskExecutionSpecification task_execution_spec_;
/// A cached copy of the task's object dependencies, including arguments from
/// the TaskSpecification and execution dependencies from the

View file

@ -4,10 +4,6 @@
namespace ray {
const std::vector<ObjectID> TaskExecutionSpecification::ExecutionDependencies() const {
return IdVectorFromProtobuf<ObjectID>(message_.dependencies());
}
size_t TaskExecutionSpecification::NumForwards() const { return message_.num_forwards(); }
void TaskExecutionSpecification::IncrementNumForwards() {
@ -16,8 +12,7 @@ void TaskExecutionSpecification::IncrementNumForwards() {
std::string TaskExecutionSpecification::DebugString() const {
std::ostringstream stream;
stream << "num_dependencies=" << message_.dependencies_size()
<< ", num_forwards=" << message_.num_forwards();
stream << "num_forwards=" << message_.num_forwards();
return stream.str();
}

View file

@ -29,12 +29,6 @@ class TaskExecutionSpecification : public MessageWrapper<rpc::TaskExecutionSpec>
explicit TaskExecutionSpecification(const std::string &serialized_binary)
: MessageWrapper(serialized_binary) {}
/// Get the task's execution dependencies.
///
/// \return A vector of object IDs representing this task's execution
/// dependencies.
const std::vector<ObjectID> ExecutionDependencies() const;
/// Get the number of times this task has been forwarded.
///
/// \return The number of times this task has been forwarded.

View file

@ -128,6 +128,12 @@ ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
message_.actor_task_spec().actor_creation_dummy_object_id());
}
ObjectID TaskSpecification::PreviousActorTaskDummyObjectId() const {
RAY_CHECK(IsActorTask());
return ObjectID::FromBinary(
message_.actor_task_spec().previous_actor_task_dummy_object_id());
}
ObjectID TaskSpecification::ActorDummyObject() const {
RAY_CHECK(IsActorTask() || IsActorCreationTask());
return ReturnId(NumReturns() - 1);

View file

@ -115,6 +115,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
ObjectID ActorCreationDummyObjectId() const;
ObjectID PreviousActorTaskDummyObjectId() const;
std::vector<ActorHandleID> NewActorHandles() const;
ObjectID ActorDummyObject() const;

View file

@ -93,7 +93,8 @@ class TaskSpecBuilder {
/// \return Reference to the builder object itself.
TaskSpecBuilder &SetActorTaskSpec(
const ActorID &actor_id, const ActorHandleID &actor_handle_id,
const ObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const ObjectID &actor_creation_dummy_object_id,
const ObjectID &previous_actor_task_dummy_object_id, uint64_t actor_counter,
const std::vector<ActorHandleID> &new_handle_ids = {}) {
message_.set_type(TaskType::ACTOR_TASK);
auto actor_spec = message_.mutable_actor_task_spec();
@ -101,6 +102,8 @@ class TaskSpecBuilder {
actor_spec->set_actor_handle_id(actor_handle_id.Binary());
actor_spec->set_actor_creation_dummy_object_id(
actor_creation_dummy_object_id.Binary());
actor_spec->set_previous_actor_task_dummy_object_id(
previous_actor_task_dummy_object_id.Binary());
actor_spec->set_actor_counter(actor_counter);
for (const auto &id : new_handle_ids) {
actor_spec->add_new_actor_handles(id.Binary());

View file

@ -75,30 +75,6 @@ struct TaskInfo {
const TaskType task_type;
};
/// Task specification, which includes the immutable information about the task
/// which are determined at the submission time.
/// TODO(zhijunfu): this can be removed after everything is moved to protobuf.
class TaskSpec {
public:
TaskSpec(const TaskSpecification &task_spec, const std::vector<ObjectID> &dependencies)
: task_spec_(task_spec), dependencies_(dependencies) {}
TaskSpec(const TaskSpecification &&task_spec,
const std::vector<ObjectID> &&dependencies)
: task_spec_(task_spec), dependencies_(dependencies) {}
const TaskSpecification &GetTaskSpecification() const { return task_spec_; }
const std::vector<ObjectID> &GetDependencies() const { return dependencies_; }
private:
/// Raylet task specification.
TaskSpecification task_spec_;
/// Dependencies.
std::vector<ObjectID> dependencies_;
};
enum class StoreProviderType { PLASMA };
enum class TaskTransportType { RAYLET };

View file

@ -20,7 +20,9 @@ struct WorkerThreadContext {
put_index = 0;
}
void SetCurrentTask(const TaskSpecification &spec) { SetCurrentTask(spec.TaskId()); }
void SetCurrentTask(const TaskSpecification &task_spec) {
SetCurrentTask(task_spec.TaskId());
}
private:
/// The task ID for current task.
@ -62,9 +64,9 @@ const TaskID &WorkerContext::GetCurrentTaskID() const {
return GetThreadContext().GetCurrentTaskID();
}
void WorkerContext::SetCurrentTask(const TaskSpecification &spec) {
current_job_id = spec.JobId();
GetThreadContext().SetCurrentTask(spec);
void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
current_job_id = task_spec.JobId();
GetThreadContext().SetCurrentTask(task_spec);
}
WorkerThreadContext &WorkerContext::GetThreadContext() {

View file

@ -20,7 +20,7 @@ class WorkerContext {
const TaskID &GetCurrentTaskID() const;
void SetCurrentTask(const TaskSpecification &spec);
void SetCurrentTask(const TaskSpecification &task_spec);
int GetNextTaskIndex();

View file

@ -26,27 +26,27 @@ CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(
worker_server_.Run();
}
Status CoreWorkerTaskExecutionInterface::ExecuteTask(const TaskSpecification &spec) {
worker_context_.SetCurrentTask(spec);
Status CoreWorkerTaskExecutionInterface::ExecuteTask(const TaskSpecification &task_spec) {
worker_context_.SetCurrentTask(task_spec);
RayFunction func{spec.GetLanguage(), spec.FunctionDescriptor()};
RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};
std::vector<std::shared_ptr<RayObject>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));
RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args));
TaskType task_type;
if (spec.IsActorCreationTask()) {
if (task_spec.IsActorCreationTask()) {
task_type = TaskType::ACTOR_CREATION_TASK;
} else if (spec.IsActorTask()) {
} else if (task_spec.IsActorTask()) {
task_type = TaskType::ACTOR_TASK;
} else {
task_type = TaskType::NORMAL_TASK;
}
TaskInfo task_info{spec.TaskId(), spec.JobId(), task_type};
TaskInfo task_info{task_spec.TaskId(), task_spec.JobId(), task_type};
auto num_returns = spec.NumReturns();
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
auto num_returns = task_spec.NumReturns();
if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) {
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;
@ -68,25 +68,25 @@ void CoreWorkerTaskExecutionInterface::Run() {
}
Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor(
const TaskSpecification &spec, std::vector<std::shared_ptr<RayObject>> *args) {
auto num_args = spec.NumArgs();
const TaskSpecification &task, std::vector<std::shared_ptr<RayObject>> *args) {
auto num_args = task.NumArgs();
(*args).resize(num_args);
std::vector<ObjectID> object_ids_to_fetch;
std::vector<int> indices;
for (int i = 0; i < spec.NumArgs(); ++i) {
int count = spec.ArgIdCount(i);
for (int i = 0; i < task.NumArgs(); ++i) {
int count = task.ArgIdCount(i);
if (count > 0) {
// pass by reference.
RAY_CHECK(count == 1);
object_ids_to_fetch.push_back(spec.ArgId(i, 0));
object_ids_to_fetch.push_back(task.ArgId(i, 0));
indices.push_back(i);
} else {
// pass by value.
(*args)[i] = std::make_shared<RayObject>(
std::make_shared<LocalMemoryBuffer>(const_cast<uint8_t *>(spec.ArgVal(i)),
spec.ArgValLength(i)),
std::make_shared<LocalMemoryBuffer>(const_cast<uint8_t *>(task.ArgVal(i)),
task.ArgValLength(i)),
nullptr);
}
}

View file

@ -134,8 +134,8 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
std::vector<ObjectID> *return_ids) {
auto builder = BuildCommonTaskSpec(function, args, task_options.num_returns,
task_options.resources, {}, return_ids);
TaskSpec task(builder.Build(), {});
return task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(task);
return task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(
builder.Build());
}
Status CoreWorkerTaskInterface::CreateActor(
@ -155,8 +155,8 @@ Status CoreWorkerTaskInterface::CreateActor(
(*actor_handle)->IncreaseTaskCounter();
(*actor_handle)->SetActorCursor(return_ids[0]);
const TaskSpec task(builder.Build(), {});
return task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(task);
return task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(
builder.Build());
}
Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
@ -175,12 +175,11 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
// Build actor task spec.
const auto actor_creation_dummy_object_id =
ObjectID::FromBinary(actor_handle.ActorID().Binary());
builder.SetActorTaskSpec(actor_handle.ActorID(), actor_handle.ActorHandleID(),
actor_creation_dummy_object_id,
actor_handle.IncreaseTaskCounter(),
actor_handle.NewActorHandles());
const TaskSpec task(builder.Build(), {actor_handle.ActorCursor()});
builder.SetActorTaskSpec(
actor_handle.ActorID(), actor_handle.ActorHandleID(),
actor_creation_dummy_object_id,
/*previous_actor_task_dummy_object_id=*/actor_handle.ActorCursor(),
actor_handle.IncreaseTaskCounter(), actor_handle.NewActorHandles());
// Manipulate actor handle state.
auto actor_cursor = (*return_ids).back();
@ -189,8 +188,8 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
guard.unlock();
// Submit task.
auto status =
task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(task);
auto status = task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(
builder.Build());
// Remove cursor from return ids.
(*return_ids).pop_back();

View file

@ -8,9 +8,9 @@ CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(
std::unique_ptr<RayletClient> &raylet_client)
: raylet_client_(raylet_client) {}
Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) {
Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpecification &task) {
RAY_CHECK(raylet_client_ != nullptr);
return raylet_client_->SubmitTask(task.GetDependencies(), task.GetTaskSpecification());
return raylet_client_->SubmitTask(task);
}
CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(
@ -26,8 +26,8 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask(
const rpc::AssignTaskRequest &request, rpc::AssignTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const Task task(request.task());
const auto &spec = task.GetTaskSpecification();
auto status = task_handler_(spec);
const auto &task_spec = task.GetTaskSpecification();
auto status = task_handler_(task_spec);
// Notify raylet that current task is done via a `TaskDone` message. This is to
// ensure that the task is marked as finished by raylet only after previous
// raylet client calls are completed. For example, if the worker sends a

View file

@ -21,7 +21,7 @@ class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter {
///
/// \param[in] task The task spec to submit.
/// \return Status.
virtual Status SubmitTask(const TaskSpec &task) override;
virtual Status SubmitTask(const TaskSpecification &task_spec) override;
private:
/// Raylet client.

View file

@ -26,7 +26,7 @@ class CoreWorkerTaskSubmitter {
///
/// \param[in] task The task spec to submit.
/// \return Status.
virtual Status SubmitTask(const TaskSpec &task) = 0;
virtual Status SubmitTask(const TaskSpecification &task_spec) = 0;
};
/// This class receives tasks for execution.

View file

@ -86,30 +86,26 @@ message ActorCreationTaskSpec {
// Task spec of an actor task.
message ActorTaskSpec {
// Actor ID of the task. This is the actor that this task is executed on
// or NIL_ACTOR_ID if the task is just a normal task.
// Actor ID of the actor that this task is executed on.
bytes actor_id = 2;
// The ID of the handle that was used to submit the task. This should be
// unique across handles with the same actor_id.
bytes actor_handle_id = 3;
// The dummy object ID of the actor creation task if this is an actor method.
// The dummy object ID of the actor creation task.
bytes actor_creation_dummy_object_id = 4;
// Number of tasks that have been submitted to this actor so far.
uint64 actor_counter = 5;
// If this is an actor task, then this will be populated with all of the new
// actor handles that were forked from this handle since the last task on
// this handle was submitted.
// Note that this is a long string that concatenate all of the new_actor_handle IDs.
// This will be populated with all of the new actor handles that were forked
// from this handle since the last task on this handle was submitted.
repeated bytes new_actor_handles = 6;
// The dummy object ID of the previous actor task.
bytes previous_actor_task_dummy_object_id = 7;
}
// The task execution specification encapsulates all mutable information about
// the task. These fields may change at execution time, converse to the
// `TaskSpec` is determined at submission time.
message TaskExecutionSpec {
// A list of object IDs representing the dependencies of this task that may
// change at execution time.
repeated bytes dependencies = 1;
// The last time this task was received for scheduling.
double last_timestamp = 2;
// The number of times this task was spilled back by raylets.

View file

@ -94,7 +94,6 @@ table Task {
}
table SubmitTaskRequest {
execution_dependencies: [string];
task_spec: string;
}

View file

@ -59,15 +59,9 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit(
* Signature: (J[BLjava/nio/ByteBuffer;II)V
*/
JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmitTask(
JNIEnv *env, jclass, jlong client, jbyteArray cursorId, jbyteArray taskSpec) {
JNIEnv *env, jclass, jlong client, jbyteArray taskSpec) {
auto raylet_client = reinterpret_cast<RayletClient *>(client);
std::vector<ObjectID> execution_dependencies;
if (cursorId != nullptr) {
UniqueIdFromJByteArray<ObjectID> cursor_id(env, cursorId);
execution_dependencies.push_back(cursor_id.GetId());
}
jbyte *data = env->GetByteArrayElements(taskSpec, NULL);
jsize size = env->GetArrayLength(taskSpec);
ray::rpc::TaskSpec task_spec_message;
@ -75,7 +69,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit
env->ReleaseByteArrayElements(taskSpec, data, JNI_ABORT);
ray::TaskSpecification task_spec(task_spec_message);
auto status = raylet_client->SubmitTask(execution_dependencies, task_spec);
auto status = raylet_client->SubmitTask(task_spec);
ThrowRayExceptionIfNotOK(env, status);
}

View file

@ -7,8 +7,6 @@
#ifdef __cplusplus
extern "C" {
#endif
#undef org_ray_runtime_raylet_RayletClientImpl_TASK_SPEC_BUFFER_SIZE
#define org_ray_runtime_raylet_RayletClientImpl_TASK_SPEC_BUFFER_SIZE 2097152L
/*
* Class: org_ray_runtime_raylet_RayletClientImpl
* Method: nativeInit
@ -20,10 +18,10 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit(
/*
* Class: org_ray_runtime_raylet_RayletClientImpl
* Method: nativeSubmitTask
* Signature: (J[B[B)V
* Signature: (J[B)V
*/
JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmitTask(
JNIEnv *, jclass, jlong, jbyteArray, jbyteArray);
JNIEnv *, jclass, jlong, jbyteArray);
/*
* Class: org_ray_runtime_raylet_RayletClientImpl

View file

@ -1083,10 +1083,6 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
rpc::Task task_message;
RAY_CHECK(task_message.mutable_task_spec()->ParseFromArray(
fbs_message->task_spec()->data(), fbs_message->task_spec()->size()));
for (const auto &dependency :
string_vec_from_flatbuf(*fbs_message->execution_dependencies())) {
task_message.mutable_task_execution_spec()->add_dependencies(dependency);
}
// Submit the task to the raylet. Since the task was submitted
// locally, there is no uncommitted lineage.
@ -2351,20 +2347,11 @@ void NodeManager::FinishAssignTask(const TaskID &task_id, Worker &worker, bool s
// returned by the previous task, so the dependency will not be
// released until this first task is submitted.
for (auto &new_handle_id : spec.NewActorHandles()) {
// Get the execution dependency for the first task submitted on the new
// actor handle. Since the new actor handle was created after this task
// began and before this task finished, it must have the same execution
// dependency.
const auto &execution_dependencies =
assigned_task.GetTaskExecutionSpec().ExecutionDependencies();
// TODO(swang): We expect this task to have exactly 1 execution dependency,
// the dummy object returned by the previous actor task. However, this
// leaks information about the TaskExecutionSpecification implementation.
RAY_CHECK(execution_dependencies.size() == 1);
const ObjectID &execution_dependency = execution_dependencies.front();
const auto prev_actor_task_id = spec.PreviousActorTaskDummyObjectId();
RAY_CHECK(!prev_actor_task_id.IsNil());
// Add the new handle and give it a reference to the finished task's
// execution dependency.
actor_entry->second.AddHandle(new_handle_id, execution_dependency);
actor_entry->second.AddHandle(new_handle_id, prev_actor_task_id);
}
// TODO(swang): For actors with multiple actor handles, to

View file

@ -223,12 +223,10 @@ RayletClient::RayletClient(const std::string &raylet_socket, const ClientID &cli
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet.");
}
ray::Status RayletClient::SubmitTask(const std::vector<ObjectID> &execution_dependencies,
const ray::TaskSpecification &task_spec) {
ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) {
flatbuffers::FlatBufferBuilder fbb;
auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies);
auto message = ray::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies_message, fbb.CreateString(task_spec.Serialize()));
fbb, fbb.CreateString(task_spec.Serialize()));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::SubmitTask, &fbb);
}

View file

@ -80,11 +80,9 @@ class RayletClient {
/// Submit a task using the raylet code path.
///
/// \param The execution dependencies.
/// \param The task specification.
/// \return ray::Status.
ray::Status SubmitTask(const std::vector<ObjectID> &execution_dependencies,
const ray::TaskSpecification &task_spec);
ray::Status SubmitTask(const ray::TaskSpecification &task_spec);
/// Get next task for this client. This will block until the scheduler assigns
/// a task to this worker. The caller takes ownership of the returned task