mirror of
https://github.com/vale981/ray
synced 2025-03-05 18:11:42 -05:00
Fix releasing CPUs incorrectly when actor creation task blocked. (#5271)
* Fix * Remove useless log * Address * Fix typo * sleep
This commit is contained in:
parent
5ea859dc73
commit
1465a30ea9
2 changed files with 37 additions and 4 deletions
|
@ -569,3 +569,39 @@ def test_dynamic_res_creation_stress(ray_start_cluster):
|
|||
all_resources_created.append(str(i) in resources)
|
||||
success = all(all_resources_created)
|
||||
assert success
|
||||
|
||||
|
||||
def test_release_cpus_when_actor_creation_task_blocking(shutdown_only):
|
||||
ray.init(num_cpus=2)
|
||||
|
||||
@ray.remote(num_cpus=1)
|
||||
def get_100():
|
||||
time.sleep(1)
|
||||
return 100
|
||||
|
||||
@ray.remote(num_cpus=1)
|
||||
class A(object):
|
||||
def __init__(self):
|
||||
self.num = ray.get(get_100.remote())
|
||||
|
||||
def get_num(self):
|
||||
return self.num
|
||||
|
||||
a = A.remote()
|
||||
assert 100 == ray.get(a.get_num.remote())
|
||||
|
||||
def wait_until(condition, timeout_ms):
|
||||
SLEEP_DURATION_MS = 100
|
||||
time_elapsed = 0
|
||||
while time_elapsed <= timeout_ms:
|
||||
if condition():
|
||||
return True
|
||||
time.sleep(SLEEP_DURATION_MS)
|
||||
time_elapsed += SLEEP_DURATION_MS
|
||||
return False
|
||||
|
||||
def assert_available_resources():
|
||||
return 1 == ray.available_resources()["CPU"]
|
||||
|
||||
result = wait_until(assert_available_resources, 1000)
|
||||
assert result is True
|
||||
|
|
|
@ -1576,14 +1576,11 @@ void NodeManager::HandleTaskBlocked(const WorkerID &worker_id,
|
|||
RAY_CHECK(local_queues_.RemoveTask(current_task_id, &task));
|
||||
local_queues_.QueueTasks({task}, TaskState::RUNNING);
|
||||
// Get the CPU resources required by the running task.
|
||||
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
|
||||
const ResourceSet cpu_resources = required_resources.GetNumCpus();
|
||||
|
||||
// Release the CPU resources.
|
||||
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
|
||||
local_available_resources_.Release(cpu_resource_ids);
|
||||
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
|
||||
cpu_resources);
|
||||
cpu_resource_ids.ToResourceSet());
|
||||
worker->MarkBlocked();
|
||||
|
||||
// Try dispatching tasks since we may have released some resources.
|
||||
|
|
Loading…
Add table
Reference in a new issue