[core] Nested tasks on by default (#20800)

This PR turns worker capping on by default. Doing so uncovered a couple of faulty tests, which are fixed here as well.
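If the new default gets in the way while debugging, the flag can still be flipped back off. A minimal sketch, assuming Ray's internal `_system_config` override accepted by `ray.init` (the flag name comes from the `ray_config_def.h` hunk below); this is a debugging aid, not a supported public API:

import ray

# Sketch only: turn the new worker-capping default back off for a local run.
# `worker_cap_enabled` is the RAY_CONFIG flag changed in this PR.
ray.init(_system_config={"worker_cap_enabled": False})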

Co-authored-by: Alex Wu <alex@anyscale.com>
Alex Wu 2022-01-06 15:00:03 -08:00 committed by GitHub
parent 39f8072eac
commit 8cf4071759
3 changed files with 24 additions and 19 deletions

@@ -8,7 +8,7 @@ import pytest
import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster, cluster_not_supported
from ray._private.test_utils import RayTestTimeoutException, get_other_nodes
from ray._private.test_utils import get_other_nodes, Semaphore
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@@ -21,7 +21,9 @@ def ray_start_workers_separate_multinode(request):
# Start the Ray processes.
cluster = Cluster()
for _ in range(num_nodes):
cluster.add_node(num_cpus=num_initial_workers)
cluster.add_node(
num_cpus=num_initial_workers,
resources={"custom": num_initial_workers})
ray.init(address=cluster.address)
yield num_nodes, num_initial_workers
@@ -34,23 +36,24 @@ def ray_start_workers_separate_multinode(request):
def test_worker_failed(ray_start_workers_separate_multinode):
num_nodes, num_initial_workers = (ray_start_workers_separate_multinode)
@ray.remote
block_worker = Semaphore.remote(0)
block_driver = Semaphore.remote(0)
ray.get([block_worker.locked.remote(), block_driver.locked.remote()])
# Acquire a custom resource that isn't released on `ray.get` to make sure
# this task gets spread across all the nodes.
@ray.remote(num_cpus=1, resources={"custom": 1})
def get_pids():
time.sleep(0.25)
ray.get(block_driver.release.remote())
ray.get(block_worker.acquire.remote())
return os.getpid()
start_time = time.time()
pids = set()
while len(pids) < num_nodes * num_initial_workers:
new_pids = ray.get([
get_pids.remote()
for _ in range(2 * num_nodes * num_initial_workers)
])
for pid in new_pids:
pids.add(pid)
if time.time() - start_time > 60:
raise RayTestTimeoutException(
"Timed out while waiting to get worker PIDs.")
total_num_workers = num_nodes * num_initial_workers
pid_refs = [get_pids.remote() for _ in range(total_num_workers)]
ray.get([block_driver.acquire.remote() for _ in range(total_num_workers)])
ray.get([block_worker.release.remote() for _ in range(total_num_workers)])
pids = set(ray.get(pid_refs))
@ray.remote
def f(x):

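For context on the rewritten test above: instead of sleeping and re-submitting tasks until enough distinct PIDs show up, the driver now parks every `get_pids` task on a `Semaphore` actor until all of them are confirmed to be running, so each PID is guaranteed to come from a different worker. A minimal sketch of such an actor, assuming an asyncio-backed implementation along the lines of the `Semaphore` the test imports from `ray._private.test_utils`:

import asyncio

import ray


@ray.remote(num_cpus=0)
class Semaphore:
    # Sketch of the helper assumed by the test; the real class lives in
    # ray._private.test_utils and may differ in detail.
    def __init__(self, value=1):
        self._sema = asyncio.Semaphore(value=value)

    async def acquire(self):
        await self._sema.acquire()

    async def release(self):
        self._sema.release()

    async def locked(self):
        return self._sema.locked()

Each task releases `block_driver` (signaling that it is running) and then blocks on `block_worker`; the driver waits for `total_num_workers` releases before releasing the workers, which is what lets the test drop the old 60-second polling loop.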
@@ -48,7 +48,9 @@ def init_virtual_actor(x):
@pytest.mark.parametrize(
"workflow_start_regular",
[{
"num_cpus": 4 # We need more CPUs, otherwise 'create()' blocks 'get()'
# We need more CPUs, otherwise 'create()' blocks 'get()', and task
# execution suffers from worker capping.
"num_cpus": 16
}],
indirect=True)
def test_readonly_actor(workflow_start_regular):
@@ -73,7 +75,7 @@ def test_readonly_actor(workflow_start_regular):
start = time.time()
ray.get([readonly_actor.readonly_workload.run_async() for _ in range(10)])
end = time.time()
assert end - start < 5
assert end - start < 5, f"Took too long, {end-start}"
@workflow.virtual_actor

@@ -108,7 +108,7 @@ RAY_CONFIG(bool, preallocate_plasma_memory, false)
// If true, we place a soft cap on the number of scheduling classes, see
// `worker_cap_initial_backoff_delay_ms`.
RAY_CONFIG(bool, worker_cap_enabled, false)
RAY_CONFIG(bool, worker_cap_enabled, true)
/// We place a soft cap on the number of tasks of a given scheduling class that
/// can run at once to limit the total number of worker processes. After the