[core] Pin arguments during task execution (#13737)

* tmp

* Pin task args

* unit tests

* update

* test

* Fix
Stephanie Wang 2021-01-28 19:07:10 -08:00 committed by GitHub
parent 813a7ab0e2
commit 42d501d747
11 changed files with 222 additions and 99 deletions

View file

@ -296,9 +296,6 @@ def test_pull_request_retry(shutdown_only):
ray.get(driver.remote())
@pytest.mark.skip(
reason="This hangs due to a deadlock between a worker getting its "
"arguments and the node pulling arguments for the next task queued.")
@pytest.mark.timeout(30)
def test_pull_bundles_admission_control(shutdown_only):
cluster = Cluster()
@ -333,9 +330,6 @@ def test_pull_bundles_admission_control(shutdown_only):
ray.get(tasks)
@pytest.mark.skip(
reason="This hangs due to a deadlock between a worker getting its "
"arguments and the node pulling arguments for the next task queued.")
@pytest.mark.timeout(30)
def test_pull_bundles_admission_control_dynamic(shutdown_only):
# This test is the same as test_pull_bundles_admission_control, except that
@ -358,11 +352,13 @@ def test_pull_bundles_admission_control_dynamic(shutdown_only):
cluster.wait_for_nodes()
@ray.remote
def foo(*args):
def foo(i, *args):
print("foo", i)
return
@ray.remote
def allocate(*args):
def allocate(i):
print("allocate", i)
return np.zeros(object_size, dtype=np.uint8)
args = []
@ -373,8 +369,8 @@ def test_pull_bundles_admission_control_dynamic(shutdown_only):
]
args.append(task_args)
tasks = [foo.remote(*task_args) for task_args in args]
allocated = [allocate.remote() for _ in range(num_objects)]
tasks = [foo.remote(i, *task_args) for i, task_args in enumerate(args)]
allocated = [allocate.remote(i) for i in range(num_objects)]
ray.get(tasks)
del allocated

View file

@ -618,9 +618,6 @@ def test_release_during_plasma_fetch(object_spilling_config, shutdown_only):
do_test_release_resource(object_spilling_config, expect_released=True)
@pytest.mark.skip(
reason="This hangs due to a deadlock between a worker getting its "
"arguments and the node pulling arguments for the next task queued.")
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.timeout(30)

View file

@ -185,12 +185,6 @@ bool DependencyManager::RequestTaskDependencies(
return task_entry.num_missing_dependencies == 0;
}
bool DependencyManager::IsTaskReady(const TaskID &task_id) const {
auto task_entry = queued_task_requests_.find(task_id);
RAY_CHECK(task_entry != queued_task_requests_.end());
return task_entry->second.num_missing_dependencies == 0;
}
void DependencyManager::RemoveTaskDependencies(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Removing dependencies for task " << task_id;
auto task_entry = queued_task_requests_.find(task_id);

View file

@ -37,7 +37,6 @@ class TaskDependencyManagerInterface {
virtual bool RequestTaskDependencies(
const TaskID &task_id,
const std::vector<rpc::ObjectReference> &required_objects) = 0;
virtual bool IsTaskReady(const TaskID &task_id) const = 0;
virtual void RemoveTaskDependencies(const TaskID &task_id) = 0;
virtual ~TaskDependencyManagerInterface(){};
};
@ -131,14 +130,6 @@ class DependencyManager : public TaskDependencyManagerInterface {
bool RequestTaskDependencies(const TaskID &task_id,
const std::vector<rpc::ObjectReference> &required_objects);
/// Check whether a task is ready to run. The task ID must have been
/// previously added by the caller.
///
/// \param task_id The ID of the task to check.
/// \return Whether all of the dependencies for the task are
/// local.
bool IsTaskReady(const TaskID &task_id) const;
/// Cancel a task's dependencies. We will no longer attempt to fetch any
/// remote dependencies, if no other task or worker requires them.
///

View file

@ -89,7 +89,6 @@ TEST_F(DependencyManagerTest, TestSimpleTask) {
dependency_manager_.RequestTaskDependencies(task_id, ObjectIdsToRefs(arguments));
ASSERT_FALSE(ready);
ASSERT_EQ(object_manager_mock_.active_requests.size(), 1);
ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id));
// For each argument, tell the task dependency manager that the argument is
// local. All arguments should be canceled as they become available locally.
@ -98,15 +97,12 @@ TEST_F(DependencyManagerTest, TestSimpleTask) {
}
auto ready_task_ids = dependency_manager_.HandleObjectLocal(arguments[0]);
ASSERT_TRUE(ready_task_ids.empty());
ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id));
ready_task_ids = dependency_manager_.HandleObjectLocal(arguments[1]);
ASSERT_TRUE(ready_task_ids.empty());
ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id));
// The task is ready to run.
ready_task_ids = dependency_manager_.HandleObjectLocal(arguments[2]);
ASSERT_EQ(ready_task_ids.size(), 1);
ASSERT_EQ(ready_task_ids.front(), task_id);
ASSERT_TRUE(dependency_manager_.IsTaskReady(task_id));
// Remove the task.
dependency_manager_.RemoveTaskDependencies(task_id);
@ -127,7 +123,6 @@ TEST_F(DependencyManagerTest, TestMultipleTasks) {
bool ready = dependency_manager_.RequestTaskDependencies(
task_id, ObjectIdsToRefs({argument_id}));
ASSERT_FALSE(ready);
ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id));
// The object should be requested from the object manager once for each task.
ASSERT_EQ(object_manager_mock_.active_requests.size(), i + 1);
}
@ -139,7 +134,6 @@ TEST_F(DependencyManagerTest, TestMultipleTasks) {
std::unordered_set<TaskID> added_tasks(dependent_tasks.begin(), dependent_tasks.end());
for (auto &id : ready_task_ids) {
ASSERT_TRUE(added_tasks.erase(id));
ASSERT_TRUE(dependency_manager_.IsTaskReady(id));
}
ASSERT_TRUE(added_tasks.empty());
@ -166,7 +160,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) {
bool ready =
dependency_manager_.RequestTaskDependencies(task_id, ObjectIdsToRefs(arguments));
ASSERT_FALSE(ready);
ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id));
// Tell the task dependency manager that each of the arguments is now
// available.
@ -183,7 +176,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) {
ASSERT_TRUE(ready_tasks.empty());
}
}
ASSERT_TRUE(dependency_manager_.IsTaskReady(task_id));
// Simulate each of the arguments getting evicted. Each object should now be
// considered remote.
@ -203,7 +195,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) {
// the waiting state.
ASSERT_TRUE(waiting_tasks.empty());
}
ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id));
}
// Tell the task dependency manager that each of the arguments is available
@ -221,7 +212,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) {
ASSERT_TRUE(ready_tasks.empty());
}
}
ASSERT_TRUE(dependency_manager_.IsTaskReady(task_id));
dependency_manager_.RemoveTaskDependencies(task_id);
AssertNoLeaks();

View file

@ -222,7 +222,11 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
self_node_id_,
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
dependency_manager_, is_owner_alive, get_node_info_func, announce_infeasible_task,
worker_pool_, leased_workers_));
worker_pool_, leased_workers_,
[this](const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
return GetObjectsFromPlasma(object_ids, results);
}));
placement_group_resource_manager_ =
std::make_shared<NewPlacementGroupResourceManager>(
std::dynamic_pointer_cast<ClusterResourceScheduler>(
@ -1242,8 +1246,9 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) {
// If the worker was an actor, it'll be cleaned by GCS.
if (actor_id.IsNil()) {
// Return the resources that were being used by this worker.
Task task;
static_cast<void>(local_queues_.RemoveTask(task_id, &task));
cluster_task_manager_->TaskFinished(worker, &task);
}
if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
@ -2365,6 +2370,33 @@ std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view,
return result.str();
}
bool NodeManager::GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
// Pin the objects in plasma by getting them and holding a reference to
// the returned buffer.
// NOTE: the caller must ensure that the objects already exist in plasma before
// sending a PinObjectIDs request.
std::vector<plasma::ObjectBuffer> plasma_results;
// TODO(swang): This `Get` has a timeout of 0, so the plasma store will not
// block when serving the request. However, if the plasma store is under
// heavy load, then this request can still block the NodeManager event loop
// since we must wait for the plasma store's reply. We should consider using
// an `AsyncGet` instead.
if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) {
return false;
}
for (const auto &plasma_result : plasma_results) {
if (plasma_result.data == nullptr) {
results->push_back(nullptr);
} else {
results->emplace_back(std::unique_ptr<RayObject>(
new RayObject(plasma_result.data, plasma_result.metadata, {})));
}
}
return true;
}
void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
@ -2374,33 +2406,16 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
object_ids.push_back(ObjectID::FromBinary(object_id_binary));
}
if (object_pinning_enabled_) {
// Pin the objects in plasma by getting them and holding a reference to
// the returned buffer.
// NOTE: the caller must ensure that the objects already exist in plasma before
// sending a PinObjectIDs request.
std::vector<plasma::ObjectBuffer> plasma_results;
// TODO(swang): This `Get` has a timeout of 0, so the plasma store will not
// block when serving the request. However, if the plasma store is under
// heavy load, then this request can still block the NodeManager event loop
// since we must wait for the plasma store's reply. We should consider using
// an `AsyncGet` instead.
if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) {
RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store.";
std::vector<std::unique_ptr<RayObject>> results;
if (!GetObjectsFromPlasma(object_ids, &results)) {
RAY_LOG(WARNING)
<< "Failed to get objects that should have been in the object store. These "
"objects may have been evicted while there are still references in scope.";
// TODO(suquark): Maybe "Status::ObjectNotFound" is more accurate here.
send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr);
return;
}
std::vector<std::unique_ptr<RayObject>> objects;
for (int64_t i = 0; i < request.object_ids().size(); i++) {
if (plasma_results[i].data == nullptr) {
objects.push_back(nullptr);
} else {
objects.emplace_back(std::unique_ptr<RayObject>(
new RayObject(plasma_results[i].data, plasma_results[i].metadata, {})));
}
}
local_object_manager_.PinObjects(object_ids, std::move(objects));
local_object_manager_.PinObjects(object_ids, std::move(results));
}
// Wait for the object to be freed by the owner, which keeps the ref count.
local_object_manager_.WaitForObjectFree(request.owner_address(), object_ids);
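
GetObjectsFromPlasma pins objects simply by getting them from the plasma store and holding the returned buffers: as long as a RayObject referencing the buffer stays alive, the object cannot be reclaimed. A minimal standalone sketch of that pin-by-reference idea follows; FakePlasmaStore and the type aliases are illustrative names, not the real plasma client.

// Standalone sketch of the pin-by-reference idea: holding the buffer returned
// by a Get keeps the object alive even if the store later drops its own copy.
// FakePlasmaStore and the aliases below are illustrative, not the real client.
#include <cassert>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

using ObjectId = std::string;
using Buffer = std::shared_ptr<std::string>;

class FakePlasmaStore {
 public:
  void Put(const ObjectId &id, std::string data) {
    objects_[id] = std::make_shared<std::string>(std::move(data));
  }
  // Like a zero-timeout Get: fills nullptr instead of blocking on missing objects.
  void Get(const std::vector<ObjectId> &ids, std::vector<Buffer> *results) {
    for (const auto &id : ids) {
      auto it = objects_.find(id);
      results->push_back(it == objects_.end() ? nullptr : it->second);
    }
  }
  // Eviction drops the store's reference; pinned copies remain valid.
  void Evict(const ObjectId &id) { objects_.erase(id); }

 private:
  std::unordered_map<ObjectId, Buffer> objects_;
};

int main() {
  FakePlasmaStore store;
  store.Put("arg1", "task argument payload");

  // "Pin" the object by getting it and holding on to the returned reference.
  std::vector<Buffer> pinned;
  store.Get({"arg1"}, &pinned);
  assert(pinned[0] != nullptr);

  // The store evicts its copy, but the pinned buffer is still readable.
  store.Evict("arg1");
  std::cout << "still readable: " << *pinned[0] << std::endl;

  // Dropping the last reference finally releases the data.
  pinned.clear();
  return 0;
}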

View file

@ -647,6 +647,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
std::unordered_map<SchedulingClass, ordered_set<TaskID>> MakeTasksByClass(
const std::vector<Task> &tasks) const;
/// Get pointers to objects stored in plasma. They will be
/// released once the returned references go out of scope.
///
/// \param[in] object_ids The objects to get.
/// \param[out] results The pointers to objects stored in
/// plasma.
/// \return Whether the request was successful.
bool GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results);
///////////////////////////////////////////////////////////////////////////////////////
//////////////////// Begin of the override methods of ClusterTaskManager //////////////
// The following methods are defined in node_manager.task.cc instead of node_manager.cc

View file

@ -20,7 +20,10 @@ ClusterTaskManager::ClusterTaskManager(
NodeInfoGetter get_node_info,
std::function<void(const Task &)> announce_infeasible_task,
WorkerPoolInterface &worker_pool,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers)
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
std::function<bool(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results)>
pin_task_arguments)
: self_node_id_(self_node_id),
cluster_resource_scheduler_(cluster_resource_scheduler),
task_dependency_manager_(task_dependency_manager),
@ -31,7 +34,8 @@ ClusterTaskManager::ClusterTaskManager(
RayConfig::instance().max_resource_shapes_per_load_report()),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
worker_pool_(worker_pool),
leased_workers_(leased_workers) {}
leased_workers_(leased_workers),
pin_task_arguments_(pin_task_arguments) {}
bool ClusterTaskManager::SchedulePendingTasks() {
// Always try to schedule infeasible tasks in case they are now feasible.
@ -144,11 +148,36 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
auto &task = std::get<0>(work);
auto &spec = task.GetTaskSpecification();
std::vector<std::unique_ptr<RayObject>> args;
bool success = true;
const auto &deps = spec.GetDependencyIds();
if (!deps.empty()) {
// This gets refs to the arguments stored in plasma. The refs should be
// deleted once we no longer need to pin the arguments.
success = pin_task_arguments_(deps, &args);
if (!success) {
RAY_LOG(WARNING) << "Error getting task arguments from plasma store";
}
for (size_t i = 0; i < deps.size(); i++) {
if (args[i] == nullptr) {
// This can happen if the task's arguments were all local at some
// point, but then at least one was evicted before the task could
// be dispatched to a worker.
RAY_LOG(INFO)
<< "Task " << spec.TaskId() << " argument " << deps[i]
<< " was evicted before the task could be dispatched. This can happen "
"when there are many objects needed on this node. The task will be "
"scheduled once all of its dependencies are local.";
success = false;
break;
}
}
}
// An argument was evicted since this task was added to the dispatch
// queue. Move it back to the waiting queue. The caller is responsible
// for notifying us when the task is unblocked again.
if (!spec.GetDependencies().empty() &&
!task_dependency_manager_.IsTaskReady(spec.TaskId())) {
if (!success) {
waiting_tasks_[spec.TaskId()] = std::move(*work_it);
work_it = dispatch_queue.erase(work_it);
continue;
@ -177,6 +206,12 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
bool worker_leased;
bool remove = AttemptDispatchWork(*work_it, worker, &worker_leased);
if (worker_leased) {
// Pin the arguments while the lease is active. These will be erased
// once the lease is returned.
num_pinned_task_arguments_ += args.size();
RAY_CHECK(pinned_task_arguments_.emplace(spec.TaskId(), std::move(args)).second)
<< spec.TaskId();
auto reply = std::get<1>(*work_it);
auto callback = std::get<2>(*work_it);
Dispatch(worker, leased_workers_, task, reply, callback);
@ -295,6 +330,10 @@ void ClusterTaskManager::TaskFinished(std::shared_ptr<WorkerInterface> worker,
Task *task) {
RAY_CHECK(worker != nullptr && task != nullptr);
*task = worker->GetAssignedTask();
auto it = pinned_task_arguments_.find(task->GetTaskSpecification().TaskId());
RAY_CHECK(it != pinned_task_arguments_.end());
num_pinned_task_arguments_ -= it->second.size();
pinned_task_arguments_.erase(it);
if (worker->GetAllocatedInstances() != nullptr) {
ReleaseWorkerResources(worker);
}
@ -633,6 +672,8 @@ std::string ClusterTaskManager::DebugStr() const {
buffer << "Schedule queue length: " << num_tasks_to_schedule << "\n";
buffer << "Dispatch queue length: " << num_tasks_to_dispatch << "\n";
buffer << "Waiting tasks size: " << waiting_tasks_.size() << "\n";
buffer << "Number of executing tasks: " << pinned_task_arguments_.size() << "\n";
buffer << "Number of pinned task arguments: " << num_pinned_task_arguments_ << "\n";
buffer << "cluster_resource_scheduler state: "
<< cluster_resource_scheduler_->DebugString() << "\n";
buffer << "==================================================";

View file

@ -2,6 +2,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/ray_object.h"
#include "ray/common/task/task.h"
#include "ray/common/task/task_common.h"
#include "ray/raylet/dependency_manager.h"
@ -60,7 +61,10 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
NodeInfoGetter get_node_info,
std::function<void(const Task &)> announce_infeasible_task,
WorkerPoolInterface &worker_pool,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers);
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
std::function<bool(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results)>
pin_task_arguments);
/// (Step 1) Queue tasks and schedule.
/// Queue task and schedule. This happens when processing the worker lease request.
@ -248,6 +252,22 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
WorkerPoolInterface &worker_pool_;
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_;
/// Callback to get references to task arguments. These will be pinned while
/// the task is running.
std::function<bool(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results)>
pin_task_arguments_;
/// Arguments needed by currently granted lease requests. These should be
/// pinned before the lease is granted to ensure that the arguments are not
/// evicted before the task(s) start running.
std::unordered_map<TaskID, std::vector<std::unique_ptr<RayObject>>>
pinned_task_arguments_;
/// The total number of arguments pinned for running tasks.
/// Used for debug purposes.
size_t num_pinned_task_arguments_ = 0;
/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
///

View file

@ -85,7 +85,7 @@ Task CreateTask(const std::unordered_map<std::string, double> &required_resource
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
for (int i = 0; i < num_args; i++) {
ObjectID put_id = ObjectID::FromIndex(TaskID::Nil(), /*index=*/i + 1);
ObjectID put_id = ObjectID::FromIndex(RandomTaskId(), /*index=*/i + 1);
spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address()));
}
@ -96,20 +96,25 @@ Task CreateTask(const std::unordered_map<std::string, double> &required_resource
class MockTaskDependencyManager : public TaskDependencyManagerInterface {
public:
MockTaskDependencyManager(std::unordered_set<ObjectID> &missing_objects)
: missing_objects_(missing_objects) {}
bool RequestTaskDependencies(
const TaskID &task_id, const std::vector<rpc::ObjectReference> &required_objects) {
RAY_CHECK(subscribed_tasks.insert(task_id).second);
return task_ready_;
for (auto &obj_ref : required_objects) {
if (missing_objects_.count(ObjectRefToId(obj_ref))) {
return false;
}
}
return true;
}
void RemoveTaskDependencies(const TaskID &task_id) {
RAY_CHECK(subscribed_tasks.erase(task_id));
}
bool IsTaskReady(const TaskID &task_id) const { return task_ready_; }
bool task_ready_ = true;
std::unordered_set<ObjectID> &missing_objects_;
std::unordered_set<TaskID> subscribed_tasks;
};
@ -121,16 +126,34 @@ class ClusterTaskManagerTest : public ::testing::Test {
is_owner_alive_(true),
node_info_calls_(0),
announce_infeasible_task_calls_(0),
task_manager_(id_, scheduler_, dependency_manager_,
[this](const WorkerID &worker_id, const NodeID &node_id) {
return is_owner_alive_;
},
[this](const NodeID &node_id) {
node_info_calls_++;
return node_info_[node_id];
},
[this](const Task &task) { announce_infeasible_task_calls_++; },
pool_, leased_workers_) {}
dependency_manager_(missing_objects_),
task_manager_(
id_, scheduler_, dependency_manager_,
[this](const WorkerID &worker_id, const NodeID &node_id) {
return is_owner_alive_;
},
[this](const NodeID &node_id) {
node_info_calls_++;
return node_info_[node_id];
},
[this](const Task &task) { announce_infeasible_task_calls_++; }, pool_,
leased_workers_,
[this](const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
for (auto &obj_id : object_ids) {
if (missing_objects_.count(obj_id) == 0) {
std::string meta = "metadata";
auto metadata = const_cast<uint8_t *>(
reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer =
std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
results->emplace_back(new RayObject(nullptr, meta_buffer, {}));
} else {
results->emplace_back(nullptr);
}
}
return true;
}) {}
void SetUp() {}
@ -153,13 +176,25 @@ class ClusterTaskManagerTest : public ::testing::Test {
ASSERT_TRUE(task_manager_.tasks_to_dispatch_.empty());
ASSERT_TRUE(task_manager_.waiting_tasks_.empty());
ASSERT_TRUE(task_manager_.infeasible_tasks_.empty());
ASSERT_TRUE(task_manager_.pinned_task_arguments_.empty());
ASSERT_EQ(task_manager_.num_pinned_task_arguments_, 0);
ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty());
}
void AssertPinnedTaskArgumentsEquals(const TaskID &task_id, size_t num_args_expected) {
ASSERT_EQ(task_manager_.pinned_task_arguments_[task_id].size(), num_args_expected);
size_t num_args = 0;
for (auto &args : task_manager_.pinned_task_arguments_) {
num_args += args.second.size();
}
ASSERT_EQ(task_manager_.num_pinned_task_arguments_, num_args);
}
NodeID id_;
std::shared_ptr<ClusterResourceScheduler> scheduler_;
MockWorkerPool pool_;
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> leased_workers_;
std::unordered_set<ObjectID> missing_objects_;
bool is_owner_alive_;
@ -203,6 +238,11 @@ TEST_F(ClusterTaskManagerTest, BasicTest) {
ASSERT_EQ(pool_.workers.size(), 0);
ASSERT_EQ(node_info_calls_, 0);
Task finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(),
task.GetTaskSpecification().TaskId());
AssertNoLeaks();
}
@ -252,8 +292,9 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
};
/* Blocked on dependencies */
dependency_manager_.task_ready_ = false;
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1);
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2);
auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0];
missing_objects_.insert(missing_arg);
std::unordered_set<TaskID> expected_subscribed_tasks = {
task.GetTaskSpecification().TaskId()};
task_manager_.QueueAndScheduleTask(task, &reply, callback);
@ -264,36 +305,42 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
ASSERT_EQ(pool_.workers.size(), 2);
/* This task can run */
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1);
task_manager_.QueueAndScheduleTask(task2, &reply, callback);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
AssertPinnedTaskArgumentsEquals(task2.GetTaskSpecification().TaskId(), 1);
ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 1);
/* First task is unblocked now, but resources are no longer available */
dependency_manager_.task_ready_ = true;
missing_objects_.erase(missing_arg);
auto id = task.GetTaskSpecification().TaskId();
std::vector<TaskID> unblocked = {id};
task_manager_.TasksUnblocked(unblocked);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
AssertPinnedTaskArgumentsEquals(task2.GetTaskSpecification().TaskId(), 1);
ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 1);
/* Second task finishes, making space for the original task */
Task finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
leased_workers_.clear();
task_manager_.ReleaseWorkerResources(worker);
task_manager_.ScheduleAndDispatchTasks();
ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty());
// Task2 is now done so task can run.
AssertPinnedTaskArgumentsEquals(task.GetTaskSpecification().TaskId(), 2);
ASSERT_EQ(num_callbacks, 2);
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 0);
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
AssertNoLeaks();
}
@ -342,6 +389,12 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
// The second task was spilled.
ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(),
remote_node_id.Binary());
Task finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(),
task.GetTaskSpecification().TaskId());
AssertNoLeaks();
}
@ -385,6 +438,12 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
ASSERT_FALSE(callback_called);
ASSERT_EQ(pool_.workers.size(), 0);
ASSERT_EQ(leased_workers_.size(), 1);
Task finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(),
task.GetTaskSpecification().TaskId());
AssertNoLeaks();
}
@ -615,6 +674,12 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
ASSERT_EQ(resource_load_by_shape.resource_demands().size(), 0);
while (!leased_workers_.empty()) {
Task finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
leased_workers_.erase(leased_workers_.begin());
}
AssertNoLeaks();
}
}
@ -785,8 +850,9 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
};
/* Blocked on dependencies */
dependency_manager_.task_ready_ = false;
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2);
auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0];
missing_objects_.insert(missing_arg);
std::unordered_set<TaskID> expected_subscribed_tasks = {
task.GetTaskSpecification().TaskId()};
task_manager_.QueueAndScheduleTask(task, &reply, callback);
@ -795,7 +861,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
ASSERT_EQ(leased_workers_.size(), 0);
/* Task is unblocked now */
dependency_manager_.task_ready_ = true;
missing_objects_.erase(missing_arg);
pool_.workers.clear();
auto id = task.GetTaskSpecification().TaskId();
task_manager_.TasksUnblocked({id});
@ -804,7 +870,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
ASSERT_EQ(leased_workers_.size(), 0);
/* Task argument gets evicted */
dependency_manager_.task_ready_ = false;
missing_objects_.insert(missing_arg);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.ScheduleAndDispatchTasks();
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
@ -812,10 +878,16 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
ASSERT_EQ(leased_workers_.size(), 0);
/* Worker available and arguments available */
dependency_manager_.task_ready_ = true;
missing_objects_.erase(missing_arg);
task_manager_.TasksUnblocked({id});
ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1);
Task finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(),
task.GetTaskSpecification().TaskId());
AssertNoLeaks();
}
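
The unit test injects a pin callback that consults a missing_objects_ set: available arguments come back as metadata-only RayObjects, while evicted ones come back as nullptr entries, which is what drives tasks such as the one in ArgumentEvicted back to the waiting queue. A small standalone sketch of that mocking pattern follows; the names are illustrative, not the actual test fixtures.

// Standalone sketch of the test's mocking pattern: a pin callback that reports
// "missing" (evicted) objects as nullptr entries. Names are illustrative only.
#include <cassert>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

using ObjectId = std::string;
using Arg = std::shared_ptr<std::string>;

int main() {
  std::unordered_set<ObjectId> missing;

  // Mirrors the lambda injected into the task manager in the test above:
  // available objects get a placeholder payload, missing ones get nullptr.
  auto pin = [&missing](const std::vector<ObjectId> &ids, std::vector<Arg> *out) {
    for (const auto &id : ids) {
      if (missing.count(id) == 0) {
        out->push_back(std::make_shared<std::string>("metadata"));
      } else {
        out->push_back(nullptr);
      }
    }
    return true;
  };

  // All arguments local: every entry is non-null, so the task could dispatch.
  std::vector<Arg> args;
  pin({"a", "b"}, &args);
  assert(args[0] && args[1]);

  // Simulate eviction of one argument: the dispatcher would see the nullptr
  // entry and move the task back to the waiting queue.
  missing.insert("b");
  args.clear();
  pin({"a", "b"}, &args);
  assert(args[0] && !args[1]);
  return 0;
}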

View file

@ -33,7 +33,7 @@ class MockWorker : public WorkerInterface {
void AssignTaskId(const TaskID &task_id) {}
void SetAssignedTask(const Task &assigned_task) {}
void SetAssignedTask(const Task &assigned_task) { task_ = assigned_task; }
const std::string IpAddress() const { return address_.ip_address(); }
@ -162,11 +162,7 @@ class MockWorker : public WorkerInterface {
void SetBundleId(const BundleID &bundle_id) { bundle_id_ = bundle_id; }
Task &GetAssignedTask() {
RAY_CHECK(false) << "Method unused";
auto *t = new Task();
return *t;
}
Task &GetAssignedTask() { return task_; }
bool IsRegistered() {
RAY_CHECK(false) << "Method unused";
@ -188,6 +184,7 @@ class MockWorker : public WorkerInterface {
bool is_detached_actor_;
BundleID bundle_id_;
bool blocked_ = false;
Task task_;
};
} // namespace raylet