[direct call] Assign resource ids for direct call tasks (#6364)

Eric Liang 2019-12-05 10:16:04 -08:00 committed by GitHub
parent 4c6739476b
commit 6223d2ed0b
20 changed files with 149 additions and 66 deletions


@ -141,7 +141,7 @@ script:
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi
# py bazel tests, run using local strategy since PY2 breaks with sandbox
- ./ci/keep_alive bazel test --spawn_strategy=local --python_version=$BAZEL_PYTHON_VERSION --incompatible_allow_python_version_transitions=false --incompatible_py3_is_default=false --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/...
- ./ci/keep_alive bazel test --spawn_strategy=local --python_version=$BAZEL_PYTHON_VERSION --nocache_test_results --test_verbose_timeout_warnings --incompatible_allow_python_version_transitions=false --incompatible_py3_is_default=false --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/...
deploy:
- provider: s3


@ -16,15 +16,23 @@ py_test(
py_test(
name = "test_actor_resources",
size = "large",
size = "medium",
srcs = ["test_actor_resources.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_resources_direct",
size = "medium",
srcs = ["test_actor_resources_direct.py", "test_actor_resources.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_failures",
size = "large",
size = "medium",
srcs = ["test_actor_failures.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -80,7 +88,7 @@ py_test(
py_test(
name = "test_stress",
size = "large",
size = "medium",
srcs = ["test_stress.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -88,7 +96,7 @@ py_test(
py_test(
name = "test_stress_sharded",
size = "large",
size = "medium",
srcs = ["test_stress_sharded.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -167,7 +175,7 @@ py_test(
py_test(
name = "test_garbage_collection",
size = "medium",
size = "small",
srcs = ["test_garbage_collection.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -175,7 +183,7 @@ py_test(
py_test(
name = "test_global_state",
size = "medium",
size = "small",
srcs = ["test_global_state.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -183,7 +191,7 @@ py_test(
py_test(
name = "test_logical_graph",
size = "medium",
size = "small",
srcs = ["test_logical_graph.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -215,7 +223,7 @@ py_test(
py_test(
name = "test_microbenchmarks",
size = "medium",
size = "small",
srcs = ["test_microbenchmarks.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -230,7 +238,7 @@ py_test(
py_test(
name = "test_monitors",
size = "medium",
size = "small",
srcs = ["test_monitors.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -285,7 +293,7 @@ py_test(
py_test(
name = "test_ray_init",
size = "medium",
size = "small",
srcs = ["test_ray_init.py"],
deps = ["//:ray_lib"],
)
@ -308,7 +316,7 @@ py_test(
py_test(
name = "test_tensorflow",
size = "medium",
size = "small",
srcs = ["test_tensorflow.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
@ -324,7 +332,7 @@ py_test(
py_test(
name = "test_webui",
size = "medium",
size = "small",
srcs = ["test_webui.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],


@ -1152,7 +1152,6 @@ def setup_queue_actor():
ray.shutdown()
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_fork(setup_queue_actor):
queue = setup_queue_actor
@ -1171,7 +1170,6 @@ def test_fork(setup_queue_actor):
assert filtered_items == list(range(1))
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_fork_consistency(setup_queue_actor):
queue = setup_queue_actor
@ -1203,7 +1201,6 @@ def test_fork_consistency(setup_queue_actor):
assert filtered_items == list(range(num_items_per_fork))
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_pickled_handle_consistency(setup_queue_actor):
queue = setup_queue_actor
@ -1237,7 +1234,6 @@ def test_pickled_handle_consistency(setup_queue_actor):
assert filtered_items == list(range(num_items_per_fork))
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_nested_fork(setup_queue_actor):
queue = setup_queue_actor


@ -17,6 +17,8 @@ import ray
import ray.test_utils
import ray.cluster_utils
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
def test_actor_deletion_with_gpus(shutdown_only):
ray.init(
@ -84,6 +86,7 @@ def test_actor_class_methods(ray_start_regular):
assert ray.get(a.g.remote(2)) == 4
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor method resources")
def test_resource_assignment(shutdown_only):
"""Test to make sure that we assign resource to actors at instantiation."""
# This test will create 16 actors. Declaring this many CPUs initially will


@ -0,0 +1,17 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main([
"-v",
os.path.join(os.path.dirname(__file__), "test_actor_resources.py")
]))
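For local runs outside Bazel, the same effect should be obtainable by exporting RAY_FORCE_DIRECT=1 before invoking pytest on test_actor_resources.py directly; the wrapper mainly exists so the new test_actor_resources_direct Bazel target above picks up the flag.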


@ -5,7 +5,6 @@ from __future__ import print_function
import collections
import io
import os
import json
import logging
import re
@ -24,8 +23,6 @@ import ray.test_utils
logger = logging.getLogger(__name__)
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
def test_simple_serialization(ray_start_regular):
primitive_objects = [
@ -94,7 +91,6 @@ def test_simple_serialization(ray_start_regular):
assert type(obj) == type(new_obj_2)
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="resource shape not implemented")
def test_fair_queueing(shutdown_only):
ray.init(
num_cpus=1, _internal_config=json.dumps({
@ -1065,7 +1061,6 @@ def test_redefining_remote_functions(shutdown_only):
assert ray.get(ray.get(h.remote(i))) == i
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented")
def test_submit_api(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})
@ -1124,7 +1119,6 @@ def test_submit_api(shutdown_only):
assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented")
def test_many_fractional_resources(shutdown_only):
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})


@ -30,7 +30,7 @@ py_test(
py_test(
name = "test_commands",
size = "medium",
size = "small",
srcs = ["tests/test_commands.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
@ -98,7 +98,7 @@ py_test(
py_test(
name = "test_var",
size = "medium",
size = "small",
srcs = ["tests/test_var.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
@ -138,7 +138,7 @@ py_test(
py_test(
name = "test_tune_save_restore",
size = "large",
size = "small",
srcs = ["tests/test_tune_save_restore.py"],
deps = [":tune_lib"],
tags = ["exclusive"],


@ -9,7 +9,8 @@
namespace ray {
typedef std::function<void(const std::shared_ptr<void>, const std::string &, int)>
typedef std::function<void(const std::shared_ptr<void>, const std::string &, int,
const ResourceIdSet &)>
DispatchTaskCallback;
/// Arguments are the raylet ID to spill back to, the raylet's
/// address and the raylet's port.


@ -82,6 +82,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
reference_counter_(std::make_shared<ReferenceCounter>()),
task_execution_service_work_(task_execution_service_),
task_execution_callback_(task_execution_callback),
resource_ids_(new ResourceMappingType()),
grpc_service_(io_service_, *this) {
// Initialize logging if log_dir is passed. Otherwise, it must be initialized
// and cleaned up by the caller.
@ -811,9 +812,11 @@ Status CoreWorker::AllocateReturnObjects(
}
Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
const ResourceMappingType &resource_ids,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects) {
resource_ids_ = resource_ids;
if (resource_ids != nullptr) {
resource_ids_ = resource_ids;
}
worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId());


@ -340,7 +340,7 @@ class CoreWorker {
const ActorID &GetActorId() const { return actor_id_; }
// Get the resource IDs available to this worker (as assigned by the raylet).
const ResourceMappingType GetResourceIDs() const { return resource_ids_; }
const ResourceMappingType GetResourceIDs() const { return *resource_ids_; }
/// Create a profile event with a reference to the core worker's profiler.
std::unique_ptr<worker::ProfileEvent> CreateProfileEvent(const std::string &event_type);
@ -443,12 +443,13 @@ class CoreWorker {
/// Execute a task.
///
/// \param spec[in] Task specification.
/// \param spec[in] Resource IDs of resources assigned to this worker.
/// \param spec[in] Resource IDs of resources assigned to this worker. If nullptr,
/// reuse the previously assigned resources.
/// \param results[out] Result objects that should be returned by value (not via
/// plasma).
/// \return Status.
Status ExecuteTask(const TaskSpecification &task_spec,
const ResourceMappingType &resource_ids,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects);
/// Build arguments for task executor. This would loop through all the arguments
@ -585,8 +586,8 @@ class CoreWorker {
/// A map from resource name to the resource IDs that are currently reserved
/// for this worker. Each pair consists of the resource ID and the fraction
/// of that resource allocated for this worker.
ResourceMappingType resource_ids_;
/// of that resource allocated for this worker. This is set on task assignment.
std::shared_ptr<ResourceMappingType> resource_ids_;
// Interface that receives tasks from the raylet.
std::unique_ptr<CoreWorkerRayletTaskReceiver> raylet_task_receiver_;
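For orientation, here is a minimal sketch (separate from the diff) of the contract the new shared pointer implements, assuming ResourceMappingType is a map from resource name to (resource id, fraction) pairs, which matches how it is indexed elsewhere in this commit; the helper name is illustrative only.

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Assumed shape of ResourceMappingType, consistent with its usage in this diff.
using ResourceMappingType =
    std::unordered_map<std::string, std::vector<std::pair<int64_t, double>>>;

// Hypothetical helper mirroring the ExecuteTask behavior: a non-null mapping
// replaces the worker's current assignment, while a null mapping keeps the
// previously assigned resources (as actor tasks do).
void ApplyAssignedResources(std::shared_ptr<ResourceMappingType> *current,
                            const std::shared_ptr<ResourceMappingType> &incoming) {
  if (incoming != nullptr) {
    *current = incoming;
  }
}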


@ -203,9 +203,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
SetActorAsAsync();
}
// TODO(ekl) resolving object dependencies is expensive and requires an IPC to
// the raylet, which is a central bottleneck. In the future, we should inline
// dependencies that are small and already known to be local to the client.
std::vector<ObjectID> dependencies;
for (size_t i = 0; i < task_spec.NumArgs(); ++i) {
int count = task_spec.ArgIdCount(i);
@ -223,21 +220,32 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
it = result.first;
}
auto accept_callback = [this, reply, send_reply_callback, task_spec]() {
// Only assign resources for non-actor tasks. Actor tasks inherit the resources
// assigned at initial actor creation time.
std::shared_ptr<ResourceMappingType> resource_ids;
if (!task_spec.IsActorTask()) {
resource_ids.reset(new ResourceMappingType());
for (const auto &mapping : request.resource_mapping()) {
std::vector<std::pair<int64_t, double>> rids;
for (const auto &ids : mapping.resource_ids()) {
rids.push_back(std::make_pair(ids.index(), ids.quantity()));
}
(*resource_ids)[mapping.name()] = rids;
}
}
auto accept_callback = [this, reply, send_reply_callback, task_spec, resource_ids]() {
// We have posted an exit task onto the main event loop,
// so shouldn't bother executing any further work.
if (exiting_) return;
auto num_returns = task_spec.NumReturns();
RAY_CHECK(num_returns > 0);
if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) {
// Decrease to account for the dummy object id.
num_returns--;
}
RAY_CHECK(num_returns >= 0);
// TODO(edoakes): resource IDs are currently kept track of in the
// raylet, need to come up with a solution for this.
ResourceMappingType resource_ids;
std::vector<std::shared_ptr<RayObject>> return_objects;
auto status = task_handler_(task_spec, resource_ids, &return_objects);
bool objects_valid = return_objects.size() == num_returns;


@ -420,9 +420,10 @@ class SchedulingQueue {
class CoreWorkerDirectTaskReceiver {
public:
using TaskHandler = std::function<Status(
const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects)>;
using TaskHandler =
std::function<Status(const TaskSpecification &task_spec,
const std::shared_ptr<ResourceMappingType> resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects)>;
CoreWorkerDirectTaskReceiver(WorkerContext &worker_context,
boost::asio::io_service &main_io_service,


@ -36,9 +36,9 @@ void CoreWorkerDirectTaskSubmitter::AddWorkerLeaseClient(
std::make_pair(std::move(lease_client), expiration));
}
void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr,
const SchedulingKey &scheduling_key,
bool was_error) {
void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(
const rpc::WorkerAddress &addr, const SchedulingKey &scheduling_key, bool was_error,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
auto lease_entry = worker_to_lease_client_[addr];
auto queue_entry = task_queues_.find(scheduling_key);
// Return the worker if there was an error executing the previous task,
@ -49,7 +49,8 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr,
worker_to_lease_client_.erase(addr);
} else {
auto &client = *client_cache_[addr];
PushNormalTask(addr, client, scheduling_key, queue_entry->second.front());
PushNormalTask(addr, client, scheduling_key, queue_entry->second.front(),
assigned_resources);
queue_entry->second.pop_front();
// Delete the queue if it's now empty. Note that the queue cannot already be empty
// because this is the only place tasks are removed from it.
@ -112,7 +113,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
rpc::WorkerAddress addr(reply.worker_address().ip_address(),
reply.worker_address().port());
AddWorkerLeaseClient(addr, std::move(lease_client));
OnWorkerIdle(addr, scheduling_key, /*error=*/false);
auto resources_copy = reply.resource_mapping();
OnWorkerIdle(addr, scheduling_key, /*error=*/false, resources_copy);
} else {
// The raylet redirected us to a different raylet to retry at.
RequestNewWorkerIfNeeded(scheduling_key, &reply.retry_at_raylet_address());
@ -138,10 +140,10 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
pending_lease_requests_.insert(scheduling_key);
}
void CoreWorkerDirectTaskSubmitter::PushNormalTask(const rpc::WorkerAddress &addr,
rpc::CoreWorkerClientInterface &client,
const SchedulingKey &scheduling_key,
const TaskSpecification &task_spec) {
void CoreWorkerDirectTaskSubmitter::PushNormalTask(
const rpc::WorkerAddress &addr, rpc::CoreWorkerClientInterface &client,
const SchedulingKey &scheduling_key, const TaskSpecification &task_spec,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
auto task_id = task_spec.TaskId();
auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
RAY_LOG(DEBUG) << "Pushing normal task " << task_spec.TaskId();
@ -149,12 +151,13 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(const rpc::WorkerAddress &add
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->mutable_resource_mapping()->CopyFrom(assigned_resources);
RAY_CHECK_OK(client.PushNormalTask(
std::move(request), [this, task_id, scheduling_key, addr](
std::move(request), [this, task_id, scheduling_key, addr, assigned_resources](
Status status, const rpc::PushTaskReply &reply) {
{
absl::MutexLock lock(&mu_);
OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok());
OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources);
}
if (!status.ok()) {
// TODO: It'd be nice to differentiate here between process vs node


@ -1,6 +1,8 @@
#ifndef RAY_CORE_WORKER_DIRECT_TASK_H
#define RAY_CORE_WORKER_DIRECT_TASK_H
#include <google/protobuf/repeated_field.h>
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
@ -52,8 +54,15 @@ class CoreWorkerDirectTaskSubmitter {
/// Schedule more work onto an idle worker or return it back to the raylet if
/// no more tasks are queued for submission. If an error was encountered
/// processing the worker, we don't attempt to re-use the worker.
void OnWorkerIdle(const rpc::WorkerAddress &addr, const SchedulingKey &task_queue_key,
bool was_error) EXCLUSIVE_LOCKS_REQUIRED(mu_);
///
/// \param[in] addr The address of the worker.
/// \param[in] task_queue_key The scheduling class of the worker.
/// \param[in] was_error Whether the task failed to be submitted.
/// \param[in] assigned_resources Resource ids previously assigned to the worker.
void OnWorkerIdle(
const rpc::WorkerAddress &addr, const SchedulingKey &task_queue_key, bool was_error,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Get an existing lease client or connect a new one. If a raylet_address is
/// provided, this connects to a remote raylet. Else, this connects to the
@ -78,7 +87,9 @@ class CoreWorkerDirectTaskSubmitter {
void PushNormalTask(const rpc::WorkerAddress &addr,
rpc::CoreWorkerClientInterface &client,
const SchedulingKey &task_queue_key,
const TaskSpecification &task_spec);
const TaskSpecification &task_spec,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry>
&assigned_resources);
// Client that can be used to lease and return workers from the local raylet.
std::shared_ptr<WorkerLeaseInterface> local_lease_client_;


@ -36,14 +36,14 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask(
// Set the resource IDs for this task.
// TODO: convert the resource map to protobuf and change this.
ResourceMappingType resource_ids;
auto resource_ids = std::make_shared<ResourceMappingType>();
auto resource_infos =
flatbuffers::GetRoot<protocol::ResourceIdSetInfos>(request.resource_ids().data())
->resource_infos();
for (size_t i = 0; i < resource_infos->size(); ++i) {
auto const &fractional_resource_ids = resource_infos->Get(i);
auto &acquired_resources =
resource_ids[string_from_flatbuf(*fractional_resource_ids->resource_name())];
(*resource_ids)[string_from_flatbuf(*fractional_resource_ids->resource_name())];
size_t num_resource_ids = fractional_resource_ids->resource_ids()->size();
size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size();


@ -11,9 +11,10 @@ namespace ray {
class CoreWorkerRayletTaskReceiver {
public:
using TaskHandler = std::function<Status(
const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects)>;
using TaskHandler =
std::function<Status(const TaskSpecification &task_spec,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects)>;
CoreWorkerRayletTaskReceiver(const WorkerID &worker_id,
std::shared_ptr<RayletClient> &raylet_client,


@ -143,3 +143,19 @@ message Task {
TaskSpec task_spec = 1;
TaskExecutionSpec task_execution_spec = 2;
}
// Represents a resource id.
message ResourceId {
// The index of the resource (e.g., CPU #3).
int64 index = 1;
// The quantity of the resource assigned (e.g., 0.5 CPU).
double quantity = 2;
}
// Represents a set of resource ids.
message ResourceMapEntry {
// The name of the resource (e.g., "CPU").
string name = 1;
// The set of resource ids assigned.
repeated ResourceId resource_ids = 2;
}
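As a hedged illustration of how these messages compose (assuming, like the other Ray protos, they are generated into the ray::rpc C++ namespace; the include path below is a guess), a worker granted whole CPU ids 0 and 1 plus half of GPU id 3 could be encoded as:

#include <cstdint>
// #include "ray/protobuf/common.pb.h"  // assumed path of the generated header

ray::rpc::ResourceMapEntry MakeCpuEntry() {
  ray::rpc::ResourceMapEntry entry;
  entry.set_name("CPU");
  for (int64_t index : {0, 1}) {
    auto *rid = entry.add_resource_ids();
    rid->set_index(index);   // whole resource ids carry quantity 1.0
    rid->set_quantity(1.0);
  }
  return entry;
}

ray::rpc::ResourceMapEntry MakeGpuEntry() {
  ray::rpc::ResourceMapEntry entry;
  entry.set_name("GPU");
  auto *rid = entry.add_resource_ids();
  rid->set_index(3);         // fractional assignment: half of GPU #3
  rid->set_quantity(0.5);
  return entry;
}

These entries are what the raylet writes into the WorkerLeaseReply resource_mapping field below and what PushTaskRequest forwards to the executing worker.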


@ -77,6 +77,8 @@ message PushTaskRequest {
// to cancel any PushTaskRequests with seqno <= this value, rather than
// waiting for the server to time out waiting for missing messages.
int64 client_processed_up_to = 3;
// Resource mapping ids assigned to the worker executing the task.
repeated ResourceMapEntry resource_mapping = 4;
}
message PushTaskReply {


@ -16,6 +16,8 @@ message WorkerLeaseReply {
Address worker_address = 1;
// Address of the raylet to spill back to, if any.
Address retry_at_raylet_address = 2;
// Resource mapping ids acquired by the leased worker.
repeated ResourceMapEntry resource_mapping = 3;
}
// Release a worker back to its raylet.


@ -1584,12 +1584,27 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques
TaskID task_id = task.GetTaskSpecification().TaskId();
task.OnDispatchInstead(
[this, task_id, reply, send_reply_callback](const std::shared_ptr<void> granted,
const std::string &address, int port) {
const std::string &address, int port,
const ResourceIdSet &resource_ids) {
RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id;
reply->mutable_worker_address()->set_ip_address(address);
reply->mutable_worker_address()->set_port(port);
reply->mutable_worker_address()->set_raylet_id(
gcs_client_->client_table().GetLocalClientId().Binary());
for (const auto &mapping : resource_ids.AvailableResources()) {
auto resource = reply->add_resource_mapping();
resource->set_name(mapping.first);
for (const auto &id : mapping.second.WholeIds()) {
auto rid = resource->add_resource_ids();
rid->set_index(id);
rid->set_quantity(1.0);
}
for (const auto &id : mapping.second.FractionalIds()) {
auto rid = resource->add_resource_ids();
rid->set_index(id.first);
rid->set_quantity(id.second.ToDouble());
}
}
send_reply_callback(Status::OK(), nullptr, nullptr);
// TODO(swang): Kill worker if other end hangs up.
@ -2258,7 +2273,8 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
auto task_id = spec.TaskId();
if (task.OnDispatch() != nullptr) {
task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port());
task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(),
worker->GetTaskResourceIds());
post_assign_callbacks->push_back([this, worker, task_id]() {
FinishAssignTask(worker, task_id, /*success=*/true);
});