diff --git a/python/ray/tests/test_dynres.py b/python/ray/tests/test_dynres.py
index ea647adf1..34b104132 100644
--- a/python/ray/tests/test_dynres.py
+++ b/python/ray/tests/test_dynres.py
@@ -569,3 +569,53 @@ def test_dynamic_res_creation_stress(ray_start_cluster):
         all_resources_created.append(str(i) in resources)
     success = all(all_resources_created)
     assert success
+
+
+def test_release_cpus_when_actor_creation_task_blocking(shutdown_only):
+    """Check the CPU accounting after an actor's constructor blocks in ray.get.
+
+    The actor's ``__init__`` calls ``ray.get`` on another remote task. Once
+    the actor has answered a method call, the test polls
+    ``ray.available_resources()`` until exactly 1 of the 2 CPUs is reported
+    as available, and fails if that does not happen within the timeout.
+    """
+    ray.init(num_cpus=2)
+
+    @ray.remote(num_cpus=1)
+    def get_100():
+        # Takes about a second, so the ray.get() in A.__init__ has to wait.
+        time.sleep(1)
+        return 100
+
+    @ray.remote(num_cpus=1)
+    class A(object):
+        def __init__(self):
+            # Blocking ray.get issued from inside the constructor.
+            self.num = ray.get(get_100.remote())
+
+        def get_num(self):
+            return self.num
+
+    a = A.remote()
+    assert 100 == ray.get(a.get_num.remote())
+
+    def wait_until(condition, timeout_ms):
+        """Poll condition() every 100 ms; return True as soon as it holds.
+
+        Returns False on timeout. Only sleep time counts towards timeout_ms.
+        """
+        SLEEP_DURATION_MS = 100
+        time_elapsed = 0
+        while time_elapsed <= timeout_ms:
+            if condition():
+                return True
+            # time.sleep() takes seconds, so convert from milliseconds.
+            time.sleep(SLEEP_DURATION_MS / 1000.0)
+            time_elapsed += SLEEP_DURATION_MS
+        return False
+
+    def assert_available_resources():
+        return 1 == ray.available_resources()["CPU"]
+
+    result = wait_until(assert_available_resources, 1000)
+    assert result is True
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 50af61c9e..a1ac878c2 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1576,14 +1576,11 @@ void NodeManager::HandleTaskBlocked(const WorkerID &worker_id,
       RAY_CHECK(local_queues_.RemoveTask(current_task_id, &task));
       local_queues_.QueueTasks({task}, TaskState::RUNNING);
       // Get the CPU resources required by the running task.
-      const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
-      const ResourceSet cpu_resources = required_resources.GetNumCpus();
-
       // Release the CPU resources.
       auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
       local_available_resources_.Release(cpu_resource_ids);
       cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
-          cpu_resources);
+          cpu_resource_ids.ToResourceSet());
       worker->MarkBlocked();
 
       // Try dispatching tasks since we may have released some resources.