diff --git a/.travis.yml b/.travis.yml
index 4e14cbbc5..fd1767a50 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -141,7 +141,7 @@ script:
   - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi
   # py bazel tests, run using local strategy since PY2 breaks with sandbox
-  - ./ci/keep_alive bazel test --spawn_strategy=local --python_version=$BAZEL_PYTHON_VERSION --incompatible_allow_python_version_transitions=false --incompatible_py3_is_default=false --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/...
+  - ./ci/keep_alive bazel test --spawn_strategy=local --python_version=$BAZEL_PYTHON_VERSION --nocache_test_results --test_verbose_timeout_warnings --incompatible_allow_python_version_transitions=false --incompatible_py3_is_default=false --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/...
 
 deploy:
   - provider: s3
diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 40059cc77..43888409c 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -16,15 +16,23 @@ py_test(
 py_test(
     name = "test_actor_resources",
-    size = "large",
+    size = "medium",
     srcs = ["test_actor_resources.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
 
+py_test(
+    name = "test_actor_resources_direct",
+    size = "medium",
+    srcs = ["test_actor_resources_direct.py", "test_actor_resources.py"],
+    tags = ["exclusive"],
+    deps = ["//:ray_lib"],
+)
+
 py_test(
     name = "test_actor_failures",
-    size = "large",
+    size = "medium",
     srcs = ["test_actor_failures.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -80,7 +88,7 @@ py_test(
 py_test(
     name = "test_stress",
-    size = "large",
+    size = "medium",
     srcs = ["test_stress.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -88,7 +96,7 @@ py_test(
 py_test(
     name = "test_stress_sharded",
-    size = "large",
+    size = "medium",
     srcs = ["test_stress_sharded.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -167,7 +175,7 @@ py_test(
 py_test(
     name = "test_garbage_collection",
-    size = "medium",
+    size = "small",
     srcs = ["test_garbage_collection.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -175,7 +183,7 @@ py_test(
 py_test(
     name = "test_global_state",
-    size = "medium",
+    size = "small",
     srcs = ["test_global_state.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -183,7 +191,7 @@ py_test(
 py_test(
     name = "test_logical_graph",
-    size = "medium",
+    size = "small",
     srcs = ["test_logical_graph.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -215,7 +223,7 @@ py_test(
 py_test(
     name = "test_microbenchmarks",
-    size = "medium",
+    size = "small",
     srcs = ["test_microbenchmarks.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -230,7 +238,7 @@ py_test(
 py_test(
     name = "test_monitors",
-    size = "medium",
+    size = "small",
     srcs = ["test_monitors.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -285,7 +293,7 @@ py_test(
 py_test(
     name = "test_ray_init",
-    size = "medium",
+    size = "small",
     srcs = ["test_ray_init.py"],
     deps = ["//:ray_lib"],
 )
@@ -308,7 +316,7 @@ py_test(
 py_test(
     name = "test_tensorflow",
-    size = "medium",
+    size = "small",
     srcs = ["test_tensorflow.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
@@ -324,7 +332,7 @@ py_test(
 py_test(
     name = "test_webui",
-    size = "medium",
+    size = "small",
     srcs = ["test_webui.py"],
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py
index 1d5713ff9..934b6e103 100644
--- a/python/ray/tests/test_actor.py
+++ b/python/ray/tests/test_actor.py
@@ -1152,7 +1152,6 @@ def setup_queue_actor():
     ray.shutdown()
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
 def test_fork(setup_queue_actor):
     queue = setup_queue_actor
 
@@ -1171,7 +1170,6 @@ def test_fork(setup_queue_actor):
     assert filtered_items == list(range(1))
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
 def test_fork_consistency(setup_queue_actor):
     queue = setup_queue_actor
 
@@ -1203,7 +1201,6 @@ def test_fork_consistency(setup_queue_actor):
     assert filtered_items == list(range(num_items_per_fork))
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
 def test_pickled_handle_consistency(setup_queue_actor):
     queue = setup_queue_actor
 
@@ -1237,7 +1234,6 @@ def test_pickled_handle_consistency(setup_queue_actor):
     assert filtered_items == list(range(num_items_per_fork))
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
 def test_nested_fork(setup_queue_actor):
     queue = setup_queue_actor
diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py
index 51cf34ec4..fa5421d0d 100644
--- a/python/ray/tests/test_actor_resources.py
+++ b/python/ray/tests/test_actor_resources.py
@@ -17,6 +17,8 @@ import ray
 import ray.test_utils
 import ray.cluster_utils
 
+RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
+
 
 def test_actor_deletion_with_gpus(shutdown_only):
     ray.init(
@@ -84,6 +86,7 @@ def test_actor_class_methods(ray_start_regular):
     assert ray.get(a.g.remote(2)) == 4
 
 
+@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor method resources")
 def test_resource_assignment(shutdown_only):
     """Test to make sure that we assign resource to actors at instantiation."""
     # This test will create 16 actors. Declaring this many CPUs initially will
diff --git a/python/ray/tests/test_actor_resources_direct.py b/python/ray/tests/test_actor_resources_direct.py
new file mode 100644
index 000000000..63263afc6
--- /dev/null
+++ b/python/ray/tests/test_actor_resources_direct.py
@@ -0,0 +1,17 @@
+"""Wrapper script that sets RAY_FORCE_DIRECT."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import pytest
+import sys
+import os
+
+if __name__ == "__main__":
+    os.environ["RAY_FORCE_DIRECT"] = "1"
+    sys.exit(
+        pytest.main([
+            "-v",
+            os.path.join(os.path.dirname(__file__), "test_actor_resources.py")
+        ]))
diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index 3a929a677..8a875209b 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -5,7 +5,6 @@ from __future__ import print_function
 
 import collections
 import io
-import os
 import json
 import logging
 import re
@@ -24,8 +23,6 @@ import ray.test_utils
 
 logger = logging.getLogger(__name__)
 
-RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
-
 
 def test_simple_serialization(ray_start_regular):
     primitive_objects = [
@@ -94,7 +91,6 @@ def test_simple_serialization(ray_start_regular):
     assert type(obj) == type(new_obj_2)
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="resource shape not implemented")
 def test_fair_queueing(shutdown_only):
     ray.init(
         num_cpus=1, _internal_config=json.dumps({
@@ -1065,7 +1061,6 @@ def test_redefining_remote_functions(shutdown_only):
     assert ray.get(ray.get(h.remote(i))) == i
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented")
 def test_submit_api(shutdown_only):
     ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})
 
@@ -1124,7 +1119,6 @@ def test_submit_api(shutdown_only):
     assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]
 
 
-@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented")
 def test_many_fractional_resources(shutdown_only):
     ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})
diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD
index 09fe9f12b..397561351 100644
--- a/python/ray/tune/BUILD
+++ b/python/ray/tune/BUILD
@@ -30,7 +30,7 @@ py_test(
 
 py_test(
     name = "test_commands",
-    size = "medium",
+    size = "small",
     srcs = ["tests/test_commands.py"],
     deps = [":tune_lib"],
     tags = ["exclusive"],
@@ -98,7 +98,7 @@ py_test(
 
 py_test(
     name = "test_var",
-    size = "medium",
+    size = "small",
     srcs = ["tests/test_var.py"],
     deps = [":tune_lib"],
     tags = ["exclusive"],
@@ -138,7 +138,7 @@ py_test(
 
 py_test(
     name = "test_tune_save_restore",
-    size = "large",
+    size = "small",
     srcs = ["tests/test_tune_save_restore.py"],
     deps = [":tune_lib"],
     tags = ["exclusive"],
diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h
index 9b6418f51..41f37655d 100644
--- a/src/ray/common/task/task.h
+++ b/src/ray/common/task/task.h
@@ -9,7 +9,8 @@
 
 namespace ray {
 
-typedef std::function<void(const std::shared_ptr<void>, const std::string &, int)>
+typedef std::function<void(const std::shared_ptr<void>, const std::string &, int,
+                           const ResourceIdSet &)>
    DispatchTaskCallback;
 /// Arguments are the raylet ID to spill back to, the raylet's
 /// address and the raylet's port.
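(Editorial note, not part of the patch.) The task.h change above threads the worker's assigned ResourceIdSet through DispatchTaskCallback, so whoever registers a dispatch hook now receives the granted resources alongside the worker's address and port. Below is a minimal sketch of a callback written against the new signature; only the parameter list comes from the typedef, while the handler itself, its name, and the log message are illustrative assumptions. AvailableResources() is the same accessor node_manager.cc uses later in this diff.

// Sketch only: a handler matching the new DispatchTaskCallback signature.
DispatchTaskCallback example_dispatch =
    [](const std::shared_ptr<void> granted_worker, const std::string &address, int port,
       const ResourceIdSet &resource_ids) {
      // Log each resource name that was assigned to the leased worker.
      for (const auto &mapping : resource_ids.AvailableResources()) {
        RAY_LOG(DEBUG) << "Dispatching to " << address << ":" << port
                       << " with resource " << mapping.first;
      }
    };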
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index e04bf7355..ce9851546 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -82,6 +82,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
       reference_counter_(std::make_shared<ReferenceCounter>()),
       task_execution_service_work_(task_execution_service_),
       task_execution_callback_(task_execution_callback),
+      resource_ids_(new ResourceMappingType()),
       grpc_service_(io_service_, *this) {
   // Initialize logging if log_dir is passed. Otherwise, it must be initialized
   // and cleaned up by the caller.
@@ -811,9 +812,11 @@ Status CoreWorker::AllocateReturnObjects(
 }
 
 Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
-                               const ResourceMappingType &resource_ids,
+                               const std::shared_ptr<ResourceMappingType> &resource_ids,
                                std::vector<std::shared_ptr<RayObject>> *return_objects) {
-  resource_ids_ = resource_ids;
+  if (resource_ids != nullptr) {
+    resource_ids_ = resource_ids;
+  }
   worker_context_.SetCurrentTask(task_spec);
   SetCurrentTaskId(task_spec.TaskId());
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 7bb11a3fa..c9e03967a 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -340,7 +340,7 @@ class CoreWorker {
   const ActorID &GetActorId() const { return actor_id_; }
 
   // Get the resource IDs available to this worker (as assigned by the raylet).
-  const ResourceMappingType GetResourceIDs() const { return resource_ids_; }
+  const ResourceMappingType GetResourceIDs() const { return *resource_ids_; }
 
   /// Create a profile event with a reference to the core worker's profiler.
   std::unique_ptr<worker::ProfileEvent> CreateProfileEvent(const std::string &event_type);
@@ -443,12 +443,13 @@ class CoreWorker {
   /// Execute a task.
   ///
   /// \param spec[in] Task specification.
-  /// \param spec[in] Resource IDs of resources assigned to this worker.
+  /// \param spec[in] Resource IDs of resources assigned to this worker. If nullptr,
+  ///                 reuse the previously assigned resources.
   /// \param results[out] Result objects that should be returned by value (not via
   /// plasma).
   /// \return Status.
   Status ExecuteTask(const TaskSpecification &task_spec,
-                     const ResourceMappingType &resource_ids,
+                     const std::shared_ptr<ResourceMappingType> &resource_ids,
                      std::vector<std::shared_ptr<RayObject>> *return_objects);
 
   /// Build arguments for task executor. This would loop through all the arguments
@@ -585,8 +586,8 @@ class CoreWorker {
 
   /// A map from resource name to the resource IDs that are currently reserved
   /// for this worker. Each pair consists of the resource ID and the fraction
-  /// of that resource allocated for this worker.
-  ResourceMappingType resource_ids_;
+  /// of that resource allocated for this worker. This is set on task assignment.
+  std::shared_ptr<ResourceMappingType> resource_ids_;
 
   // Interface that receives tasks from the raylet.
   std::unique_ptr<CoreWorkerRayletTaskReceiver> raylet_task_receiver_;
diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc
index aa475d04b..07c89c6e4 100644
--- a/src/ray/core_worker/transport/direct_actor_transport.cc
+++ b/src/ray/core_worker/transport/direct_actor_transport.cc
@@ -203,9 +203,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
     SetActorAsAsync();
   }
 
-  // TODO(ekl) resolving object dependencies is expensive and requires an IPC to
-  // the raylet, which is a central bottleneck. In the future, we should inline
-  // dependencies that are small and already known to be local to the client.
   std::vector<ObjectID> dependencies;
   for (size_t i = 0; i < task_spec.NumArgs(); ++i) {
     int count = task_spec.ArgIdCount(i);
@@ -223,21 +220,32 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
     it = result.first;
   }
 
-  auto accept_callback = [this, reply, send_reply_callback, task_spec]() {
+  // Only assign resources for non-actor tasks. Actor tasks inherit the resources
+  // assigned at initial actor creation time.
+  std::shared_ptr<ResourceMappingType> resource_ids;
+  if (!task_spec.IsActorTask()) {
+    resource_ids.reset(new ResourceMappingType());
+    for (const auto &mapping : request.resource_mapping()) {
+      std::vector<std::pair<int64_t, double>> rids;
+      for (const auto &ids : mapping.resource_ids()) {
+        rids.push_back(std::make_pair(ids.index(), ids.quantity()));
+      }
+      (*resource_ids)[mapping.name()] = rids;
+    }
+  }
+
+  auto accept_callback = [this, reply, send_reply_callback, task_spec, resource_ids]() {
     // We have posted an exit task onto the main event loop,
     // so shouldn't bother executing any further work.
     if (exiting_) return;
 
     auto num_returns = task_spec.NumReturns();
-    RAY_CHECK(num_returns > 0);
     if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) {
       // Decrease to account for the dummy object id.
       num_returns--;
     }
+    RAY_CHECK(num_returns >= 0);
 
-    // TODO(edoakes): resource IDs are currently kept track of in the
-    // raylet, need to come up with a solution for this.
-    ResourceMappingType resource_ids;
     std::vector<std::shared_ptr<RayObject>> return_objects;
     auto status = task_handler_(task_spec, resource_ids, &return_objects);
     bool objects_valid = return_objects.size() == num_returns;
diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h
index 33c9dc172..134c5691c 100644
--- a/src/ray/core_worker/transport/direct_actor_transport.h
+++ b/src/ray/core_worker/transport/direct_actor_transport.h
@@ -420,9 +420,10 @@ class SchedulingQueue {
 
 class CoreWorkerDirectTaskReceiver {
  public:
-  using TaskHandler = std::function<Status(
-      const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
-      std::vector<std::shared_ptr<RayObject>> *return_objects)>;
+  using TaskHandler =
+      std::function<Status(const TaskSpecification &task_spec,
+                           const std::shared_ptr<ResourceMappingType> resource_ids,
+                           std::vector<std::shared_ptr<RayObject>> *return_objects)>;
 
   CoreWorkerDirectTaskReceiver(WorkerContext &worker_context,
                                boost::asio::io_service &main_io_service,
diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc
index 54760be6c..506ec609e 100644
--- a/src/ray/core_worker/transport/direct_task_transport.cc
+++ b/src/ray/core_worker/transport/direct_task_transport.cc
@@ -36,9 +36,9 @@ void CoreWorkerDirectTaskSubmitter::AddWorkerLeaseClient(
       std::make_pair(std::move(lease_client), expiration));
 }
 
-void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr,
-                                                 const SchedulingKey &scheduling_key,
-                                                 bool was_error) {
+void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(
+    const rpc::WorkerAddress &addr, const SchedulingKey &scheduling_key, bool was_error,
+    const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
   auto lease_entry = worker_to_lease_client_[addr];
   auto queue_entry = task_queues_.find(scheduling_key);
   // Return the worker if there was an error executing the previous task,
@@ -49,7 +49,8 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr,
     worker_to_lease_client_.erase(addr);
   } else {
     auto &client = *client_cache_[addr];
-    PushNormalTask(addr, client, scheduling_key, queue_entry->second.front());
+    PushNormalTask(addr, client, scheduling_key, queue_entry->second.front(),
+                   assigned_resources);
     queue_entry->second.pop_front();
     // Delete the queue if it's now empty. Note that the queue cannot already be empty
     // because this is the only place tasks are removed from it.
@@ -112,7 +113,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
           rpc::WorkerAddress addr(reply.worker_address().ip_address(),
                                   reply.worker_address().port());
           AddWorkerLeaseClient(addr, std::move(lease_client));
-          OnWorkerIdle(addr, scheduling_key, /*error=*/false);
+          auto resources_copy = reply.resource_mapping();
+          OnWorkerIdle(addr, scheduling_key, /*error=*/false, resources_copy);
         } else {
           // The raylet redirected us to a different raylet to retry at.
           RequestNewWorkerIfNeeded(scheduling_key, &reply.retry_at_raylet_address());
@@ -138,10 +140,10 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
   pending_lease_requests_.insert(scheduling_key);
 }
 
-void CoreWorkerDirectTaskSubmitter::PushNormalTask(const rpc::WorkerAddress &addr,
-                                                   rpc::CoreWorkerClientInterface &client,
-                                                   const SchedulingKey &scheduling_key,
-                                                   const TaskSpecification &task_spec) {
+void CoreWorkerDirectTaskSubmitter::PushNormalTask(
+    const rpc::WorkerAddress &addr, rpc::CoreWorkerClientInterface &client,
+    const SchedulingKey &scheduling_key, const TaskSpecification &task_spec,
+    const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
   auto task_id = task_spec.TaskId();
   auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
   RAY_LOG(DEBUG) << "Pushing normal task " << task_spec.TaskId();
@@ -149,12 +151,13 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(const rpc::WorkerAddress &add
   // fails, then the task data will be gone when the TaskManager attempts to
   // access the task.
   request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
+  request->mutable_resource_mapping()->CopyFrom(assigned_resources);
   RAY_CHECK_OK(client.PushNormalTask(
-      std::move(request), [this, task_id, scheduling_key, addr](
+      std::move(request), [this, task_id, scheduling_key, addr, assigned_resources](
                               Status status, const rpc::PushTaskReply &reply) {
         {
           absl::MutexLock lock(&mu_);
-          OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok());
+          OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources);
         }
         if (!status.ok()) {
           // TODO: It'd be nice to differentiate here between process vs node
diff --git a/src/ray/core_worker/transport/direct_task_transport.h b/src/ray/core_worker/transport/direct_task_transport.h
index d8bdcc23a..1623d2358 100644
--- a/src/ray/core_worker/transport/direct_task_transport.h
+++ b/src/ray/core_worker/transport/direct_task_transport.h
@@ -1,6 +1,8 @@
 #ifndef RAY_CORE_WORKER_DIRECT_TASK_H
 #define RAY_CORE_WORKER_DIRECT_TASK_H
 
+#include <google/protobuf/repeated_field.h>
+
 #include "absl/base/thread_annotations.h"
 #include "absl/synchronization/mutex.h"
 
@@ -52,8 +54,15 @@ class CoreWorkerDirectTaskSubmitter {
   /// Schedule more work onto an idle worker or return it back to the raylet if
   /// no more tasks are queued for submission. If an error was encountered
   /// processing the worker, we don't attempt to re-use the worker.
-  void OnWorkerIdle(const rpc::WorkerAddress &addr, const SchedulingKey &task_queue_key,
-                    bool was_error) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+  ///
+  /// \param[in] addr The address of the worker.
+  /// \param[in] task_queue_key The scheduling class of the worker.
+  /// \param[in] was_error Whether the task failed to be submitted.
+  /// \param[in] assigned_resources Resource ids previously assigned to the worker.
+  void OnWorkerIdle(
+      const rpc::WorkerAddress &addr, const SchedulingKey &task_queue_key, bool was_error,
+      const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources)
+      EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   /// Get an existing lease client or connect a new one. If a raylet_address is
   /// provided, this connects to a remote raylet. Else, this connects to the
@@ -78,7 +87,9 @@ class CoreWorkerDirectTaskSubmitter {
   void PushNormalTask(const rpc::WorkerAddress &addr,
                       rpc::CoreWorkerClientInterface &client,
                       const SchedulingKey &task_queue_key,
-                      const TaskSpecification &task_spec);
+                      const TaskSpecification &task_spec,
+                      const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry>
+                          &assigned_resources);
 
   // Client that can be used to lease and return workers from the local raylet.
   std::shared_ptr<WorkerLeaseInterface> local_lease_client_;
diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc
index 5ac3e0a68..3c9982260 100644
--- a/src/ray/core_worker/transport/raylet_transport.cc
+++ b/src/ray/core_worker/transport/raylet_transport.cc
@@ -36,14 +36,14 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask(
 
   // Set the resource IDs for this task.
   // TODO: convert the resource map to protobuf and change this.
-  ResourceMappingType resource_ids;
+  auto resource_ids = std::make_shared<ResourceMappingType>();
   auto resource_infos =
       flatbuffers::GetRoot<protocol::ResourceIdSetInfos>(request.resource_ids().data())
           ->resource_infos();
   for (size_t i = 0; i < resource_infos->size(); ++i) {
     auto const &fractional_resource_ids = resource_infos->Get(i);
     auto &acquired_resources =
-        resource_ids[string_from_flatbuf(*fractional_resource_ids->resource_name())];
+        (*resource_ids)[string_from_flatbuf(*fractional_resource_ids->resource_name())];
     size_t num_resource_ids = fractional_resource_ids->resource_ids()->size();
     size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size();
diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h
index da36951b3..0ced28c6e 100644
--- a/src/ray/core_worker/transport/raylet_transport.h
+++ b/src/ray/core_worker/transport/raylet_transport.h
@@ -11,9 +11,10 @@ namespace ray {
 
 class CoreWorkerRayletTaskReceiver {
  public:
-  using TaskHandler = std::function<Status(
-      const TaskSpecification &task_spec, const ResourceMappingType &resource_ids,
-      std::vector<std::shared_ptr<RayObject>> *return_objects)>;
+  using TaskHandler =
+      std::function<Status(const TaskSpecification &task_spec,
+                           const std::shared_ptr<ResourceMappingType> &resource_ids,
+                           std::vector<std::shared_ptr<RayObject>> *return_objects)>;
 
   CoreWorkerRayletTaskReceiver(const WorkerID &worker_id,
                                std::shared_ptr<RayletClient> &raylet_client,
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 2c3e41cf2..9bfd72463 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -143,3 +143,19 @@ message Task {
   TaskSpec task_spec = 1;
   TaskExecutionSpec task_execution_spec = 2;
 }
+
+// Represents a resource id.
+message ResourceId {
+  // The index of the resource (e.g., CPU #3).
+  int64 index = 1;
+  // The quantity of the resource assigned (e.g., 0.5 CPU).
+  double quantity = 2;
+}
+
+// Represents a set of resource ids.
+message ResourceMapEntry {
+  // The name of the resource (e.g., "CPU").
+  string name = 1;
+  // The set of resource ids assigned.
+  repeated ResourceId resource_ids = 2;
+}
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto
index 47f6a2dea..f2964d6e9 100644
--- a/src/ray/protobuf/core_worker.proto
+++ b/src/ray/protobuf/core_worker.proto
@@ -77,6 +77,8 @@ message PushTaskRequest {
   // to cancel any PushTaskRequests with seqno <= this value, rather than
   // waiting for the server to time out waiting for missing messages.
   int64 client_processed_up_to = 3;
+  // Resource mapping ids assigned to the worker executing the task.
+  repeated ResourceMapEntry resource_mapping = 4;
 }
 
 message PushTaskReply {
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index c6dd3b23e..56bc99814 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -16,6 +16,8 @@ message WorkerLeaseReply {
   Address worker_address = 1;
   // Address of the raylet to spill back to, if any.
   Address retry_at_raylet_address = 2;
+  // Resource mapping ids acquired by the leased worker.
+  repeated ResourceMapEntry resource_mapping = 3;
 }
 
 // Release a worker back to its raylet.
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 26eb8360a..e74438014 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1584,12 +1584,27 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques
   TaskID task_id = task.GetTaskSpecification().TaskId();
   task.OnDispatchInstead(
       [this, task_id, reply, send_reply_callback](const std::shared_ptr<void> granted,
-                                                  const std::string &address, int port) {
+                                                  const std::string &address, int port,
+                                                  const ResourceIdSet &resource_ids) {
         RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id;
         reply->mutable_worker_address()->set_ip_address(address);
         reply->mutable_worker_address()->set_port(port);
         reply->mutable_worker_address()->set_raylet_id(
             gcs_client_->client_table().GetLocalClientId().Binary());
+        for (const auto &mapping : resource_ids.AvailableResources()) {
+          auto resource = reply->add_resource_mapping();
+          resource->set_name(mapping.first);
+          for (const auto &id : mapping.second.WholeIds()) {
+            auto rid = resource->add_resource_ids();
+            rid->set_index(id);
+            rid->set_quantity(1.0);
+          }
+          for (const auto &id : mapping.second.FractionalIds()) {
+            auto rid = resource->add_resource_ids();
+            rid->set_index(id.first);
+            rid->set_quantity(id.second.ToDouble());
+          }
+        }
         send_reply_callback(Status::OK(), nullptr, nullptr);
 
         // TODO(swang): Kill worker if other end hangs up.
@@ -2258,7 +2273,8 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
   auto task_id = spec.TaskId();
 
   if (task.OnDispatch() != nullptr) {
-    task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port());
+    task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(),
+                      worker->GetTaskResourceIds());
     post_assign_callbacks->push_back([this, worker, task_id]() {
       FinishAssignTask(worker, task_id, /*success=*/true);
     });
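(Editorial note, not part of the patch.) Under the new messages, a lease that grants half of CPU slot 0 would travel as a single ResourceMapEntry with name "CPU" containing one ResourceId with index 0 and quantity 0.5; whole resources are encoded with quantity 1.0, as in the node_manager.cc loop above. The sketch below shows the decoding direction as a standalone helper, mirroring the loop added to CoreWorkerDirectTaskReceiver::HandlePushTask earlier in this diff. The helper name DecodeResourceMapping is an assumption, and the snippet presumes the Ray headers that define ResourceMappingType and the generated rpc::ResourceMapEntry type.

// Sketch only: rebuild the worker-side resource map from the wire representation.
ResourceMappingType DecodeResourceMapping(
    const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &mapping) {
  ResourceMappingType resource_ids;
  for (const auto &entry : mapping) {
    std::vector<std::pair<int64_t, double>> rids;
    for (const auto &id : entry.resource_ids()) {
      // Each ResourceId carries the slot index (e.g., GPU #1) and the fraction assigned.
      rids.push_back(std::make_pair(id.index(), id.quantity()));
    }
    resource_ids[entry.name()] = rids;
  }
  return resource_ids;
}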