[Placement Group] Capture child tasks by default. (#11025)

* In progress.

* Finished up.

* Improve comment.

* Addressed code review.

* Fix test failure.

* Fix ci failures.

* Fix CI issues.
SangBin Cho, committed 2020-09-27 19:33:00 -07:00 via GitHub
commit 1e39c40370 (parent f0787a63da)
28 changed files with 282 additions and 47 deletions
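What this changes for users: a task or actor created inside a placement group is now scheduled into that same placement group by default, and the new placement_group_capture_child_tasks option (accepted by @ray.remote, .options(), and the creation paths below) opts out. A minimal sketch of the behavior, distilled from the tests in this diff (resource shapes and names are illustrative, not from the commit):

import ray
from ray.util.placement_group import placement_group, get_current_placement_group

ray.init(num_cpus=4)
pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="PACK")
ray.get(pg.ready())

@ray.remote(num_cpus=1)
def child():
    # Captured into the caller's placement group by default after this change.
    return get_current_placement_group()

@ray.remote(num_cpus=1)
def parent():
    return ray.get(child.remote())

# The parent is placed in pg explicitly; its child is captured implicitly.
assert ray.get(parent.options(placement_group=pg).remote()) is not None

# With capture disabled, the child escapes the group.
assert ray.get(parent.options(
    placement_group=pg,
    placement_group_capture_child_tasks=False).remote()) is None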


@@ -44,7 +44,7 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) {
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources,
-PlacementGroupID::Nil());
+PlacementGroupID::Nil(), true);
if (invocation.task_type == TaskType::NORMAL_TASK) {
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
builder.SetActorCreationTaskSpec(invocation.actor_id);


@@ -37,7 +37,7 @@ ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation) {
&return_ids);
} else {
core_worker.SubmitTask(ray_function, args, TaskOptions(), &return_ids, 1,
-std::make_pair(PlacementGroupID::Nil(), -1));
+std::make_pair(PlacementGroupID::Nil(), -1), true);
}
return return_ids[0];
}


@@ -796,6 +796,10 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker()
.GetCurrentPlacementGroupId().Binary())
+def should_capture_child_tasks_in_placement_group(self):
+return CCoreWorkerProcess.GetCoreWorker(
+).ShouldCaptureChildTasksInPlacementGroup()
def set_webui_display(self, key, message):
CCoreWorkerProcess.GetCoreWorker().SetWebuiDisplay(key, message)

@@ -1003,7 +1007,8 @@ cdef class CoreWorker:
resources,
int max_retries,
PlacementGroupID placement_group_id,
-int64_t placement_group_bundle_index):
+int64_t placement_group_bundle_index,
+c_bool placement_group_capture_child_tasks):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options

@@ -1025,7 +1030,8 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().SubmitTask(
ray_function, args_vector, task_options, &return_ids,
max_retries, c_pair[CPlacementGroupID, int64_t](
-c_placement_group_id, placement_group_bundle_index))
+c_placement_group_id, placement_group_bundle_index),
+placement_group_capture_child_tasks)
return VectorToObjectRefs(return_ids)

@@ -1043,6 +1049,7 @@ cdef class CoreWorker:
c_bool is_asyncio,
PlacementGroupID placement_group_id,
int64_t placement_group_bundle_index,
+c_bool placement_group_capture_child_tasks,
c_string extension_data
):
cdef:

@@ -1071,7 +1078,8 @@ cdef class CoreWorker:
dynamic_worker_options, is_detached, name, is_asyncio,
c_pair[CPlacementGroupID, int64_t](
c_placement_group_id,
-placement_group_bundle_index)),
+placement_group_bundle_index),
+placement_group_capture_child_tasks),
extension_data,
&c_actor_id))


@@ -6,8 +6,8 @@ import ray.ray_constants as ray_constants
import ray._raylet
import ray.signature as signature
import ray.worker
-from ray.util.placement_group import PlacementGroup, \
-check_placement_group_index
+from ray.util.placement_group import (
+PlacementGroup, check_placement_group_index, get_current_placement_group)
from ray import ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor

@@ -417,7 +417,8 @@ class ActorClass:
name=None,
lifetime=None,
placement_group=None,
-placement_group_bundle_index=-1):
+placement_group_bundle_index=-1,
+placement_group_capture_child_tasks=None):
"""Configures and overrides the actor instantiation parameters.
The arguments are the same as those that can be passed

@@ -455,7 +456,9 @@ class ActorClass:
name=name,
lifetime=lifetime,
placement_group=placement_group,
-placement_group_bundle_index=placement_group_bundle_index)
+placement_group_bundle_index=placement_group_bundle_index,
+placement_group_capture_child_tasks=(
+placement_group_capture_child_tasks))
return ActorOptionWrapper()

@@ -474,7 +477,8 @@ class ActorClass:
name=None,
lifetime=None,
placement_group=None,
-placement_group_bundle_index=-1):
+placement_group_bundle_index=-1,
+placement_group_capture_child_tasks=None):
"""Create an actor.
This method allows more flexibility than the remote method because

@@ -508,6 +512,9 @@ class ActorClass:
placement_group_bundle_index: the index of the bundle
if the actor belongs to a placement group, which may be -1 to
specify any available bundle.
+placement_group_capture_child_tasks: Whether or not children tasks
+of this actor should implicitly use the same placement group
+as its parent. It is True by default.
Returns:
A handle to the newly created actor.

@@ -565,7 +572,15 @@ class ActorClass:
else:
raise ValueError("lifetime must be either `None` or 'detached'")
+if placement_group_capture_child_tasks is None:
+placement_group_capture_child_tasks = (
+worker.should_capture_child_tasks_in_placement_group)
if placement_group is None:
+if placement_group_capture_child_tasks:
+placement_group = get_current_placement_group()
+if not placement_group:
placement_group = PlacementGroup.empty()
check_placement_group_index(placement_group,

@@ -644,6 +659,7 @@ class ActorClass:
is_asyncio,
placement_group.id,
placement_group_bundle_index,
+placement_group_capture_child_tasks,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
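The same default applies to actors. A short sketch against the options added above (assumes the pg handle from the earlier example; the class names are illustrative):

@ray.remote(num_cpus=1)
class Child:
    def ping(self):
        return True

@ray.remote(num_cpus=1)
class Parent:
    def spawn(self):
        # By default this child actor lands in Parent's placement group.
        child = Child.remote()
        return ray.get(child.ping.remote())

# Children captured into pg (the default):
a = Parent.options(placement_group=pg).remote()

# Children scheduled outside pg:
b = Parent.options(
    placement_group=pg,
    placement_group_capture_child_tasks=False).remote()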


@@ -78,7 +78,8 @@ def java_function(class_name, function_name):
None, # max_retries
placement_group=None,
# TODO(ekl) set default to -1 once we support -1 as "any index"
-placement_group_bundle_index=0)
+placement_group_bundle_index=0,
+placement_group_capture_child_tasks=None)
def java_actor_class(class_name):


@@ -254,7 +254,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached, c_string &name, c_bool is_asyncio,
-c_pair[CPlacementGroupID, int64_t] placement_options)
+c_pair[CPlacementGroupID, int64_t] placement_options,
+c_bool placement_group_capture_child_tasks)
cdef cppclass CPlacementGroupCreationOptions \
"ray::PlacementGroupCreationOptions":


@@ -89,7 +89,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids,
int max_retries,
-c_pair[CPlacementGroupID, int64_t] placement_options)
+c_pair[CPlacementGroupID, int64_t] placement_options,
+c_bool placement_group_capture_child_tasks)
CRayStatus CreateActor(
const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,

@@ -123,6 +124,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CTaskID GetCurrentTaskId()
CNodeID GetCurrentNodeId()
CPlacementGroupID GetCurrentPlacementGroupId()
+c_bool ShouldCaptureChildTasksInPlacementGroup()
const CActorID &GetActorId()
void SetActorTitle(const c_string &title)
void SetWebuiDisplay(const c_string &key, const c_string &message)


@@ -4,8 +4,11 @@ from functools import wraps
from ray import cloudpickle as pickle
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language, Language
-from ray.util.placement_group import PlacementGroup, \
-check_placement_group_index
+from ray.util.placement_group import (
+PlacementGroup,
+check_placement_group_index,
+get_current_placement_group,
+)
import ray.signature
# Default parameters for remote functions.

@@ -63,7 +66,8 @@ class RemoteFunction:
def __init__(self, language, function, function_descriptor, num_cpus,
num_gpus, memory, object_store_memory, resources,
accelerator_type, num_returns, max_calls, max_retries,
-placement_group, placement_group_bundle_index):
+placement_group, placement_group_bundle_index,
+placement_group_capture_child_tasks):
self._language = language
self._function = function
self._function_name = (

@@ -135,6 +139,7 @@ class RemoteFunction:
max_retries=None,
placement_group=None,
placement_group_bundle_index=-1,
+placement_group_capture_child_tasks=None,
name=""):
"""Configures and overrides the task invocation parameters.

@@ -168,6 +173,8 @@ class RemoteFunction:
max_retries=max_retries,
placement_group=placement_group,
placement_group_bundle_index=placement_group_bundle_index,
+placement_group_capture_child_tasks=(
+placement_group_capture_child_tasks),
name=name)
return FuncWrapper()

@@ -185,6 +192,7 @@ class RemoteFunction:
max_retries=None,
placement_group=None,
placement_group_bundle_index=-1,
+placement_group_capture_child_tasks=None,
name=""):
"""Submit the remote function for execution."""
worker = ray.worker.global_worker

@@ -220,7 +228,15 @@ class RemoteFunction:
if max_retries is None:
max_retries = self._max_retries
+if placement_group_capture_child_tasks is None:
+placement_group_capture_child_tasks = (
+worker.should_capture_child_tasks_in_placement_group)
if placement_group is None:
+if placement_group_capture_child_tasks:
+placement_group = get_current_placement_group()
+if not placement_group:
placement_group = PlacementGroup.empty()
check_placement_group_index(placement_group,

@@ -248,7 +264,8 @@ class RemoteFunction:
object_refs = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args, name,
num_returns, resources, max_retries, placement_group.id,
-placement_group_bundle_index)
+placement_group_bundle_index,
+placement_group_capture_child_tasks)
if len(object_refs) == 1:
return object_refs[0]
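The resolution order implemented above can be restated as a standalone helper (a sketch only; resolve_placement_group is not a function in this diff, but the names it calls are the ones the diff adds):

def resolve_placement_group(placement_group,
                            placement_group_capture_child_tasks,
                            worker):
    # 1. The per-call option wins; otherwise fall back to the submitting
    #    worker's own capture setting.
    if placement_group_capture_child_tasks is None:
        placement_group_capture_child_tasks = (
            worker.should_capture_child_tasks_in_placement_group)
    # 2. No explicit group and capture enabled: inherit the caller's group.
    if placement_group is None and placement_group_capture_child_tasks:
        placement_group = get_current_placement_group()
    # 3. Still nothing: schedule outside any group.
    if not placement_group:
        placement_group = PlacementGroup.empty()
    return placement_group, placement_group_capture_child_tasks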


@@ -54,6 +54,18 @@ class RuntimeContext(object):
"""
return self.worker.placement_group_id
+@property
+def should_capture_child_tasks_in_placement_group(self):
+"""Get if the current task should capture parent's placement group.
+This returns True if it is called inside a driver.
+Returns:
+Return True if the current task should implicitly
+capture the parent placement group.
+"""
+return self.worker.should_capture_child_tasks_in_placement_group
_runtime_context = None
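The new property is reachable from user code through the runtime context accessor; a brief sketch (get_runtime_context is the accessor this module already uses):

import ray

ray.init()
ctx = ray.runtime_context.get_runtime_context()
# True inside a driver; in a worker it reflects the submitter's setting.
print(ctx.should_capture_child_tasks_in_placement_group)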


@@ -52,7 +52,6 @@ py_test_module_list(
"test_multiprocessing.py",
"test_object_manager.py",
"test_output.py",
-"test_placement_group.py",
"test_reconstruction.py",
"test_reference_counting_2.py",
"test_reference_counting.py",

@@ -104,7 +103,8 @@
py_test_module_list(
files = [
"test_stress_failure.py",
-"test_failure.py"
+"test_failure.py",
+"test_placement_group.py",
],
size = "large",
extra_srcs = SRCS,


@@ -793,7 +793,7 @@ def test_mini_integration(ray_start_cluster):
assert all(ray.get([a.ping.remote() for a in actors]))
-def test_capture_child_tasks(ray_start_cluster):
+def test_capture_child_actors(ray_start_cluster):
cluster = ray_start_cluster
total_num_actors = 4
for _ in range(2):

@@ -806,12 +806,13 @@
}, {
"CPU": 2
}], strategy="STRICT_PACK")
-ray.get(pg.ready(), timeout=5)
+ray.get(pg.ready())
# If get_current_placement_group is used when the current worker/driver
# doesn't belong to any of placement group, it should return None.
assert get_current_placement_group() is None
+# Test actors first.
@ray.remote(num_cpus=1)
class NestedActor:
def ready(self):

@@ -826,8 +827,16 @@
return True
def schedule_nested_actor(self):
-actor = NestedActor.options(
-placement_group=get_current_placement_group()).remote()
+# Make sure we can capture the current placement group.
+assert get_current_placement_group() is not None
+# Actors should be implicitly captured.
+actor = NestedActor.remote()
ray.get(actor.ready.remote())
self.actors.append(actor)
+def schedule_nested_actor_outside_pg(self):
+# Don't use placement group.
+actor = NestedActor.options(placement_group=None).remote()
+ray.get(actor.ready.remote())
+self.actors.append(actor)

@@ -846,6 +855,104 @@
# Since all node id should be identical, set should be equal to 1.
assert len(node_id_set) == 1
+# Kill an actor and wait until it is killed.
+ray.kill(a)
+with pytest.raises(ray.exceptions.RayActorError):
+ray.get(a.ready.remote())
+# Now create an actor, but do not capture the current tasks
+a = Actor.options(
+placement_group=pg,
+placement_group_capture_child_tasks=False).remote()
+ray.get(a.ready.remote())
+# 1 top level actor + 3 children.
+for _ in range(total_num_actors - 1):
+ray.get(a.schedule_nested_actor.remote())
+# Make sure all the actors are not scheduled on the same node.
+# It is because the child tasks are not scheduled on the same
+# placement group.
+node_id_set = set()
+for actor_info in ray.actors().values():
+node_id = actor_info["Address"]["NodeID"]
+node_id_set.add(node_id)
+assert len(node_id_set) == 2
+# Kill an actor and wait until it is killed.
+ray.kill(a)
+with pytest.raises(ray.exceptions.RayActorError):
+ray.get(a.ready.remote())
+# Lastly, make sure when None is specified, actors are not scheduled
+# on the same placement group.
+a = Actor.options(placement_group=pg).remote()
+ray.get(a.ready.remote())
+# 1 top level actor + 3 children.
+for _ in range(total_num_actors - 1):
+ray.get(a.schedule_nested_actor_outside_pg.remote())
+# Make sure all the actors are not scheduled on the same node.
+# It is because the child tasks are not scheduled on the same
+# placement group.
+node_id_set = set()
+for actor_info in ray.actors().values():
+node_id = actor_info["Address"]["NodeID"]
+node_id_set.add(node_id)
+assert len(node_id_set) == 2
+def test_capture_child_tasks(ray_start_cluster):
+cluster = ray_start_cluster
+total_num_tasks = 4
+for _ in range(2):
+cluster.add_node(num_cpus=total_num_tasks, num_gpus=total_num_tasks)
+ray.init(address=cluster.address)
+pg = ray.util.placement_group(
+[{
+"CPU": 2,
+"GPU": 2,
+}, {
+"CPU": 2,
+"GPU": 2,
+}],
+strategy="STRICT_PACK")
+ray.get(pg.ready())
+# If get_current_placement_group is used when the current worker/driver
+# doesn't belong to any of placement group, it should return None.
+assert get_current_placement_group() is None
+# Test if tasks capture child tasks.
+@ray.remote
+def task():
+return get_current_placement_group()
+@ray.remote
+def create_nested_task(child_cpu, child_gpu):
+assert get_current_placement_group() is not None
+return ray.get([
+task.options(num_cpus=child_cpu, num_gpus=child_gpu).remote()
+for _ in range(3)
+])
+t = create_nested_task.options(
+num_cpus=1, num_gpus=0, placement_group=pg).remote(1, 0)
+pgs = ray.get(t)
+# Every task should have current placement group because they
+# should be implicitly captured by default.
+assert None not in pgs
+# Test if tasks don't capture child tasks when the option is off.
+t2 = create_nested_task.options(
+num_cpus=0,
+num_gpus=1,
+placement_group=pg,
+placement_group_capture_child_tasks=False).remote(0, 1)
+pgs = ray.get(t2)
+# All placement group should be None because we don't capture child tasks.
+assert not all(pgs)
def test_ready_warning_suppressed(ray_start_regular, error_pubsub):
p = error_pubsub


@@ -209,8 +209,9 @@ def get_current_placement_group() -> Optional[PlacementGroup]:
None if the current task or actor wasn't
created with any placement group.
"""
-pg_id = ray.runtime_context.get_runtime_context(
-).current_placement_group_id
+worker = ray.worker.global_worker
+worker.check_connected()
+pg_id = worker.placement_group_id
if pg_id.is_nil():
return None
return PlacementGroup(pg_id)
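Typical use of get_current_placement_group inside a task, mirroring the tests above:

from ray.util.placement_group import get_current_placement_group

@ray.remote(num_cpus=1)
def where_am_i():
    # None when this task runs outside any placement group; otherwise a
    # PlacementGroup handle for the group that captured it.
    return get_current_placement_group()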


@@ -159,6 +159,10 @@ class Worker:
def placement_group_id(self):
return self.core_worker.get_placement_group_id()
+@property
+def should_capture_child_tasks_in_placement_group(self):
+return self.core_worker.should_capture_child_tasks_in_placement_group()
@property
def current_session_and_job(self):
"""Get the current session index and job id as pair."""

@@ -1671,7 +1675,8 @@ def make_decorator(num_returns=None,
max_task_retries=None,
worker=None,
placement_group=None,
-placement_group_bundle_index=-1):
+placement_group_bundle_index=-1,
+placement_group_capture_child_tasks=True):
def decorator(function_or_class):
if (inspect.isfunction(function_or_class)
or is_cython(function_or_class)):

@@ -1701,7 +1706,8 @@
Language.PYTHON, function_or_class, None, num_cpus, num_gpus,
memory, object_store_memory, resources, accelerator_type,
num_returns, max_calls, max_retries, placement_group,
-placement_group_bundle_index)
+placement_group_bundle_index,
+placement_group_capture_child_tasks)
if inspect.isclass(function_or_class):
if num_returns is not None:

@@ -1831,6 +1837,9 @@
placement_group_bundle_index (int): The index of the bundle
if the task belongs to a placement group, which may be
-1 to indicate any available bundle.
+placement_group_capture_child_tasks (bool): Default True.
+If True, all the child tasks (including actor creation)
+are scheduled in the same placement group.
"""
worker = global_worker

@@ -1864,6 +1873,7 @@
"max_retries",
"placement_group",
"placement_group_bundle_index",
+"placement_group_capture_child_tasks",
], error_string
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None
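Because make_decorator now threads the flag through, capture can also be fixed at declaration time rather than per call; a sketch (the task and child names are hypothetical, not from the diff):

@ray.remote(num_cpus=1, placement_group_capture_child_tasks=False)
def spawner():
    # Children of this task escape the caller's placement group by default,
    # unless the setting is overridden again via .options(...).
    return child.remote()  # `child` is a hypothetical remote task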


@@ -47,6 +47,10 @@ const PlacementGroupID TaskSpecification::PlacementGroupId() const {
return PlacementGroupID::FromBinary(message_->placement_group_id());
}
+bool TaskSpecification::PlacementGroupCaptureChildTasks() const {
+return message_->placement_group_capture_child_tasks();
+}
void TaskSpecification::ComputeResources() {
auto required_resources = MapFromProtobuf(message_->required_resources());
auto required_placement_resources =


@@ -195,6 +195,9 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
// Placement Group ID that this task or actor creation is associated with.
const PlacementGroupID PlacementGroupId() const;
+// Whether or not we should capture parent's placement group implicitly.
+bool PlacementGroupCaptureChildTasks() const;
private:
void ComputeResources();


@@ -86,7 +86,8 @@ class TaskSpecBuilder {
const rpc::Address &caller_address, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
-const PlacementGroupID &placement_group_id) {
+const PlacementGroupID &placement_group_id,
+bool placement_group_capture_child_tasks) {
message_->set_type(TaskType::NORMAL_TASK);
message_->set_name(name);
message_->set_language(language);

@@ -103,6 +104,8 @@
message_->mutable_required_placement_resources()->insert(
required_placement_resources.begin(), required_placement_resources.end());
message_->set_placement_group_id(placement_group_id.Binary());
+message_->set_placement_group_capture_child_tasks(
+placement_group_capture_child_tasks);
return *this;
}


@@ -75,7 +75,8 @@ struct ActorCreationOptions {
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options, bool is_detached,
std::string &name, bool is_asyncio,
-PlacementOptions placement_options = std::make_pair(PlacementGroupID::Nil(), -1))
+PlacementOptions placement_options = std::make_pair(PlacementGroupID::Nil(), -1),
+bool placement_group_capture_child_tasks = true)
: max_restarts(max_restarts),
max_task_retries(max_task_retries),
max_concurrency(max_concurrency),

@@ -85,7 +86,8 @@
is_detached(is_detached),
name(name),
is_asyncio(is_asyncio),
-placement_options(placement_options){};
+placement_options(placement_options),
+placement_group_capture_child_tasks(placement_group_capture_child_tasks){};
/// Maximum number of times that the actor should be restarted if it dies
/// unexpectedly. A value of -1 indicates infinite restarts. If it's 0, the

@@ -117,6 +119,9 @@
/// If the actor doesn't belong to a placement group, the placement_group_id will be
/// nil, and the bundle_index will be -1.
PlacementOptions placement_options;
+/// When true, the child task will always scheduled on the same placement group
+/// specified in the PlacementOptions.
+bool placement_group_capture_child_tasks = true;
};
using PlacementStrategy = rpc::PlacementStrategy;


@@ -56,11 +56,20 @@ struct WorkerThreadContext {
current_placement_group_id_ = placement_group_id;
}
+void SetPlacementGroupCaptureChildTasks(bool placement_group_capture_child_tasks) {
+placement_group_capture_child_tasks_ = placement_group_capture_child_tasks;
+}
+bool PlacementGroupCaptureChildTasks() const {
+return placement_group_capture_child_tasks_;
+}
void SetCurrentTask(const TaskSpecification &task_spec) {
RAY_CHECK(task_index_ == 0);
RAY_CHECK(put_counter_ == 0);
SetCurrentTaskId(task_spec.TaskId());
SetCurrentPlacementGroupId(task_spec.PlacementGroupId());
+SetPlacementGroupCaptureChildTasks(task_spec.PlacementGroupCaptureChildTasks());
current_task_ = std::make_shared<const TaskSpecification>(task_spec);
}

@@ -90,6 +99,9 @@
/// thread local placement group id for tasks, and the process placement group id for
/// actors.
PlacementGroupID current_placement_group_id_;
+/// Whether or not child tasks are captured in the parent's placement group implicitly.
+bool placement_group_capture_child_tasks_ = true;
};
thread_local std::unique_ptr<WorkerThreadContext> WorkerContext::thread_context_ =

@@ -102,6 +114,7 @@ WorkerContext::WorkerContext(WorkerType worker_type, const WorkerID &worker_id,
current_job_id_(worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil()),
current_actor_id_(ActorID::Nil()),
current_actor_placement_group_id_(PlacementGroupID::Nil()),
+placement_group_capture_child_tasks_(true),
main_thread_id_(boost::this_thread::get_id()) {
// For worker main thread which initializes the WorkerContext,
// set task_id according to whether current worker is a driver.

@@ -134,6 +147,15 @@
}
}
+bool WorkerContext::ShouldCaptureChildTasksInPlacementGroup() const {
+// If the worker is an actor, we should return the actor's placement group id.
+if (current_actor_id_ != ActorID::Nil()) {
+return placement_group_capture_child_tasks_;
+} else {
+return GetThreadContext().PlacementGroupCaptureChildTasks();
+}
+}
void WorkerContext::SetCurrentJobId(const JobID &job_id) { current_job_id_ = job_id; }
void WorkerContext::SetCurrentTaskId(const TaskID &task_id) {

@@ -155,6 +177,7 @@
current_actor_max_concurrency_ = task_spec.MaxActorConcurrency();
current_actor_is_asyncio_ = task_spec.IsAsyncioActor();
current_actor_placement_group_id_ = task_spec.PlacementGroupId();
+placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(current_job_id_ == task_spec.JobId());
RAY_CHECK(current_actor_id_ == task_spec.ActorId());


@@ -37,6 +37,8 @@ class WorkerContext {
const PlacementGroupID &GetCurrentPlacementGroupId() const;
+bool ShouldCaptureChildTasksInPlacementGroup() const;
// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentJobId(const JobID &job_id);

@@ -88,6 +90,8 @@
bool current_actor_is_asyncio_ = false;
// The placement group id that the current actor belongs to.
PlacementGroupID current_actor_placement_group_id_;
+// Whether or not we should implicitly capture parent's placement group.
+bool placement_group_capture_child_tasks_;
/// The id of the (main) thread that constructed this worker context.
boost::thread::id main_thread_id_;


@@ -38,12 +38,14 @@ void BuildCommonTaskSpec(
const std::vector<std::unique_ptr<ray::TaskArg>> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
-std::vector<ObjectID> *return_ids, const ray::PlacementGroupID &placement_group_id) {
+std::vector<ObjectID> *return_ids, const ray::PlacementGroupID &placement_group_id,
+bool placement_group_capture_child_tasks) {
// Build common task spec.
-builder.SetCommonTaskSpec(
-task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
-current_task_id, task_index, caller_id, address, num_returns, required_resources,
-required_placement_resources, placement_group_id);
+builder.SetCommonTaskSpec(task_id, name, function.GetLanguage(),
+function.GetFunctionDescriptor(), job_id, current_task_id,
+task_index, caller_id, address, num_returns,
+required_resources, required_placement_resources,
+placement_group_id, placement_group_capture_child_tasks);
// Set task arguments.
for (const auto &arg : args) {
builder.AddArg(*arg);

@@ -1268,7 +1270,8 @@ void CoreWorker::SubmitTask(const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids, int max_retries,
-PlacementOptions placement_options) {
+PlacementOptions placement_options,
+bool placement_group_capture_child_tasks) {
TaskSpecBuilder builder;
const int next_task_index = worker_context_.GetNextTaskIndex();
const auto task_id =

@@ -1286,7 +1289,7 @@
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, task_options.num_returns,
constrained_resources, required_resources, return_ids,
-placement_options.first);
+placement_options.first, placement_group_capture_child_tasks);
TaskSpecification task_spec = builder.Build();
if (options_.is_local_mode) {
ExecuteTaskLocalMode(task_spec);

@@ -1332,7 +1335,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, new_resource,
new_placement_resources, &return_ids,
-actor_creation_options.placement_options.first);
+actor_creation_options.placement_options.first,
+actor_creation_options.placement_group_capture_child_tasks);
builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_restarts,
actor_creation_options.dynamic_worker_options,
actor_creation_options.max_concurrency,

@@ -1436,7 +1440,10 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun
BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, num_returns, task_options.resources,
-required_resources, return_ids, PlacementGroupID::Nil());
+required_resources, return_ids, PlacementGroupID::Nil(),
+true /* placement_group_capture_child_tasks */);
+// NOTE: placement_group_capture_child_tasks will be ignored in the actor because
+// we should always follow actor's option.
const ObjectID new_cursor = return_ids->back();
actor_handle->SetActorTaskSpec(builder, new_cursor);
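The NOTE above means the capture flag for actor tasks is fixed when the actor is created; there is no per-method override. In Python terms (reusing the hypothetical Parent class from the earlier sketch):

a = Parent.options(
    placement_group=pg,
    placement_group_capture_child_tasks=False).remote()
# Every child spawned from a's methods follows the creation-time setting;
# actor method calls themselves carry no capture flag.
ray.get(a.spawn.remote())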


@@ -356,6 +356,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
return worker_context_.GetCurrentPlacementGroupId();
}
+bool ShouldCaptureChildTasksInPlacementGroup() const {
+return worker_context_.ShouldCaptureChildTasksInPlacementGroup();
+}
void SetWebuiDisplay(const std::string &key, const std::string &message);
void SetActorTitle(const std::string &title);

@@ -639,10 +643,15 @@
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \param[out] return_ids Ids of the return objects.
+/// \param[in] max_retires max number of retry when the task fails.
+/// \param[in] placement_options placement group options.
+/// \param[in] placement_group_capture_child_tasks whether or not the submitted task
+/// should capture parent's placement group implicilty.
void SubmitTask(const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options, std::vector<ObjectID> *return_ids,
-int max_retries, PlacementOptions placement_options);
+int max_retries, PlacementOptions placement_options,
+bool placement_group_capture_child_tasks);
/// Create an actor.
///


@@ -221,7 +221,7 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub
ray_function, task_args, task_options, &return_ids,
/*max_retries=*/0,
/*placement_options=*/
-std::pair<ray::PlacementGroupID, int64_t>(ray::PlacementGroupID::Nil(), 0));
+std::pair<ray::PlacementGroupID, int64_t>(ray::PlacementGroupID::Nil(), 0), true);
// This is to avoid creating an empty java list and boost performance.
if (return_ids.empty()) {


@@ -257,7 +257,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
TaskOptions options;
std::vector<ObjectID> return_ids;
driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0,
-std::make_pair(PlacementGroupID::Nil(), -1));
+std::make_pair(PlacementGroupID::Nil(), -1), true);
ASSERT_EQ(return_ids.size(), 1);

@@ -533,7 +533,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
builder.SetCommonTaskSpec(RandomTaskId(), options.name, function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0,
RandomTaskId(), address, num_returns, resources, resources,
-PlacementGroupID::Nil());
+PlacementGroupID::Nil(), true);
// Set task arguments.
for (const auto &arg : args) {
builder.AddArg(*arg);


@@ -327,7 +327,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map<std::string, double> &r
builder.SetCommonTaskSpec(TaskID::Nil(), "dummy_task", Language::PYTHON,
function_descriptor, JobID::Nil(), TaskID::Nil(), 0,
TaskID::Nil(), empty_address, 1, resources, resources,
-PlacementGroupID::Nil());
+PlacementGroupID::Nil(), true);
return builder.Build();
}


@@ -41,7 +41,7 @@ struct Mocker {
builder.SetCommonTaskSpec(task_id, name + ":" + empty_descriptor->CallString(),
Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(),
0, TaskID::Nil(), owner_address, 1, resource, resource,
-PlacementGroupID::Nil());
+PlacementGroupID::Nil(), true);
builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name);
return builder.Build();
}


@@ -188,6 +188,8 @@ message TaskSpec {
int32 max_retries = 17;
// Placement group that is associated with this task.
bytes placement_group_id = 18;
+// Whether or not this task should capture parent's placement group automatically.
+bool placement_group_capture_child_tasks = 19;
}
message Bundle {


@@ -267,7 +267,7 @@ Task CreateTask(const std::unordered_map<std::string, double> &required_resource
spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0,
-required_resources, {}, PlacementGroupID::Nil());
+required_resources, {}, PlacementGroupID::Nil(), true);
for (int i = 0; i < num_args; i++) {
ObjectID put_id = ObjectID::FromIndex(TaskID::Nil(), /*index=*/i + 1);


@@ -68,7 +68,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
builder.SetCommonTaskSpec(RandomTaskId(), "example_task", Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address,
-num_returns, {}, {}, PlacementGroupID::Nil());
+num_returns, {}, {}, PlacementGroupID::Nil(), true);
builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, {}, 1, false, "", false);
for (const auto &arg : arguments) {
builder.AddArg(TaskArgByReference(arg, rpc::Address()));