[Datasets] Fix max number of actors for default actor pool strategy (#26266)

This commit is contained in:
Cheng Su 2022-07-03 14:40:24 -07:00 committed by GitHub
parent 818bb78542
commit 7360452d2a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 3 deletions

View file

@ -184,6 +184,8 @@ class ActorPoolStrategy(ComputeStrategy):
self.min_size = min_size
self.max_size = max_size or float("inf")
self.max_tasks_in_flight_per_actor = max_tasks_in_flight_per_actor
self.num_workers = 0
self.ready_to_total_workers_ratio = 0.8
def _apply(
self,
@ -265,7 +267,7 @@ class ActorPoolStrategy(ComputeStrategy):
block, block_fn, input_files, self.fn, *fn_args, **fn_kwargs
)
if not remote_args:
if "num_cpus" not in remote_args:
remote_args["num_cpus"] = 1
if "scheduling_strategy" not in remote_args:
@ -295,7 +297,8 @@ class ActorPoolStrategy(ComputeStrategy):
if not ready:
if (
len(workers) < self.max_size
and len(ready_workers) / len(workers) > 0.8
and len(ready_workers) / len(workers)
> self.ready_to_total_workers_ratio
):
w = BlockWorker.remote(
*fn_constructor_args, **fn_constructor_kwargs
@ -351,6 +354,7 @@ class ActorPoolStrategy(ComputeStrategy):
tasks_in_flight[worker] += 1
map_bar.close()
self.num_workers += len(workers)
new_blocks, new_metadata = [], []
# Put blocks in input order.
results.sort(key=block_indices.get)

View file

@ -4219,7 +4219,7 @@ def test_polars_lazy_import(shutdown_only):
ctx.use_polars = original_use_polars
def test_actorpoolstrategy_apply_interrupt():
def test_actor_pool_strategy_apply_interrupt(shutdown_only):
"""Test that _apply kills the actor pool if an interrupt is raised."""
ray.init(include_dashboard=False, num_cpus=1)
@ -4244,6 +4244,26 @@ def test_actorpoolstrategy_apply_interrupt():
wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) == cpus))
def test_actor_pool_strategy_default_num_actors(shutdown_only):
def f(x):
import time
time.sleep(1)
return x
num_cpus = 5
ray.init(num_cpus=num_cpus)
compute_strategy = ray.data.ActorPoolStrategy()
ray.data.range(10, parallelism=10).map_batches(f, compute=compute_strategy)
expected_max_num_workers = math.ceil(
num_cpus * (1 / compute_strategy.ready_to_total_workers_ratio)
)
assert (
compute_strategy.num_workers >= num_cpus
and compute_strategy.num_workers <= expected_max_num_workers
), "Number of actors is out of the expected bound"
if __name__ == "__main__":
import sys