From 40be6904a5e089ea8ea9ef374969f710838f22b1 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Thu, 14 Jul 2022 13:02:52 -0700
Subject: [PATCH] [data] Avoid under-parallelization regressions and add better
 testing for parallelism detection (#26543)

The previous PR (#25883) introduced a subtle regression in cases where data
sizes blow up significantly in memory. For example, suppose you're reading
JPEG image files into a Dataset; the data increases substantially in size on
decompression. On a small-core cluster (e.g., 4 cores), reading a 1GiB
dataset yields 4-8 blocks of ~200MiB each, which can OOM the node once
decompressed (e.g., at a 25x size increase). The old heuristic of using
parallelism=200 avoided this small-node problem.

This PR avoids the regression by (1) raising the min parallelism back to 200,
and (2) as an optimization, introducing a min block size threshold, which
allows using fewer blocks when the data size is really small (<100KiB per
block).
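
To make the failure mode concrete, here is the block-size arithmetic as a
small illustrative sketch (not part of this change; the 25x factor is the
example figure from above):

    MiB = 1024 * 1024
    GiB = 1024 * MiB
    data_size = 1 * GiB
    decompression_factor = 25  # e.g., JPEG bytes -> decoded pixels

    # 2 * 4 cores = 8 blocks, vs. the parallelism=200 floor.
    for parallelism in (8, 200):
        block_mib = data_size / parallelism / MiB
        print(
            f"parallelism={parallelism}: ~{block_mib:.0f} MiB blocks, "
            f"~{block_mib * decompression_factor:.0f} MiB decompressed per block"
        )
    # parallelism=8: ~128 MiB blocks, ~3200 MiB decompressed per block
    # parallelism=200: ~5 MiB blocks, ~128 MiB decompressed per block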
+ """ + min_safe_parallelism = 1 + max_reasonable_parallelism = sys.maxsize + if reader: + mem_size = reader.estimate_inmemory_data_size() + if mem_size is not None: + min_safe_parallelism = max(1, int(mem_size / ctx.target_max_block_size)) + max_reasonable_parallelism = max( + 1, int(mem_size / ctx.target_min_block_size) + ) + else: + mem_size = None + if parallelism < 0: + if parallelism != -1: + raise ValueError("`parallelism` must either be -1 or a positive integer.") + # Start with 2x the number of cores as a baseline, with a min floor. + avail_cpus = avail_cpus or _estimate_avail_cpus(cur_pg) + parallelism = max( + min(ctx.min_parallelism, max_reasonable_parallelism), + min_safe_parallelism, + avail_cpus * 2, + ) + logger.debug( + f"Autodetected parallelism={parallelism} based on " + f"estimated_available_cpus={avail_cpus} and " + f"estimated_data_size={mem_size}." + ) + return parallelism, min_safe_parallelism + + +def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int: + """Estimates the available CPU parallelism for this Dataset in the cluster. + + If we aren't in a placement group, this is trivially the number of CPUs in the + cluster. Otherwise, we try to calculate how large the placement group is relative + to the size of the cluster. + + Args: + cur_pg: The current placement group, if any. + """ + cluster_cpus = int(ray.cluster_resources().get("CPU", 1)) + cluster_gpus = int(ray.cluster_resources().get("GPU", 0)) + + # If we're in a placement group, we shouldn't assume the entire cluster's + # resources are available for us to use. Estimate an upper bound on what's + # reasonable to assume is available for datasets to use. + if cur_pg: + pg_cpus = 0 + for bundle in cur_pg.bundle_specs: + # Calculate the proportion of the cluster this placement group "takes up". + # Then scale our cluster_cpus proportionally to avoid over-parallelizing + # if there are many parallel Tune trials using the cluster. + cpu_fraction = bundle.get("CPU", 0) / max(1, cluster_cpus) + gpu_fraction = bundle.get("GPU", 0) / max(1, cluster_gpus) + max_fraction = max(cpu_fraction, gpu_fraction) + # Over-parallelize by up to a factor of 2, but no more than that. It's + # preferrable to over-estimate than under-estimate. + pg_cpus += 2 * int(max_fraction * cluster_cpus) + + return min(cluster_cpus, pg_cpus) + + return cluster_cpus diff --git a/python/ray/data/context.py b/python/ray/data/context.py index 782121d67..afac73b46 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -11,8 +11,13 @@ _default_context: "Optional[DatasetContext]" = None _context_lock = threading.Lock() # The max target block size in bytes for reads and transformations. +# We choose 512MiB as 8x less than the typical memory:core ratio of 4:1. DEFAULT_TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024 +# Datasets will avoid creating blocks smaller than this size in bytes on read. +# This takes precedence over DEFAULT_MIN_PARALLELISM. +DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024 + # Whether block splitting is on by default DEFAULT_BLOCK_SPLITTING_ENABLED = False @@ -33,8 +38,9 @@ DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True # Whether to furthermore fuse prior map tasks with shuffle stages. DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True -# Minimum amount of parallelism to auto-detect for a dataset. -DEFAULT_MIN_PARALLELISM = 8 +# Minimum amount of parallelism to auto-detect for a dataset. Note that the min +# block size config takes precedence over this. 
diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index 782121d67..afac73b46 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -11,8 +11,13 @@ _default_context: "Optional[DatasetContext]" = None
 _context_lock = threading.Lock()

 # The max target block size in bytes for reads and transformations.
+# We choose 512MiB, 8x less than the typical memory:core ratio (~4GiB per core).
 DEFAULT_TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024

+# Datasets will avoid creating blocks smaller than this size in bytes on read.
+# This takes precedence over DEFAULT_MIN_PARALLELISM.
+DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024
+
 # Whether block splitting is on by default
 DEFAULT_BLOCK_SPLITTING_ENABLED = False
@@ -33,8 +38,9 @@ DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True
 # Whether to furthermore fuse prior map tasks with shuffle stages.
 DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True

-# Minimum amount of parallelism to auto-detect for a dataset.
-DEFAULT_MIN_PARALLELISM = 8
+# Minimum amount of parallelism to auto-detect for a dataset. Note that the min
+# block size config takes precedence over this.
+DEFAULT_MIN_PARALLELISM = 200

 # Whether to use actor-based block prefetcher.
 DEFAULT_ACTOR_PREFETCHER_ENABLED = True
@@ -64,6 +70,7 @@ class DatasetContext:
         block_owner: ray.actor.ActorHandle,
         block_splitting_enabled: bool,
         target_max_block_size: int,
+        target_min_block_size: int,
         enable_pandas_block: bool,
         optimize_fuse_stages: bool,
         optimize_fuse_read_stages: bool,
@@ -80,6 +87,7 @@ class DatasetContext:
         self.block_owner = block_owner
         self.block_splitting_enabled = block_splitting_enabled
         self.target_max_block_size = target_max_block_size
+        self.target_min_block_size = target_min_block_size
         self.enable_pandas_block = enable_pandas_block
         self.optimize_fuse_stages = optimize_fuse_stages
         self.optimize_fuse_read_stages = optimize_fuse_read_stages
@@ -110,6 +118,7 @@ class DatasetContext:
                 block_owner=None,
                 block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED,
                 target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE,
+                target_min_block_size=DEFAULT_TARGET_MIN_BLOCK_SIZE,
                 enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
                 optimize_fuse_stages=DEFAULT_OPTIMIZE_FUSE_STAGES,
                 optimize_fuse_read_stages=DEFAULT_OPTIMIZE_FUSE_READ_STAGES,
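
Since both thresholds are plain DatasetContext fields, they can be adjusted
at runtime if the defaults don't fit a workload. A minimal usage sketch (the
override values are arbitrary examples, not recommendations):

    import ray
    from ray.data.context import DatasetContext

    ctx = DatasetContext.get_current()
    ctx.target_min_block_size = 4 * 1024 * 1024  # tolerate fewer, larger blocks
    ctx.min_parallelism = 100                    # lower the default 200-block floor

    # Reads issued after this point pick up the adjusted thresholds.
    ds = ray.data.range(1000)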
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 3b6d5118a..b1350fbd0 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -12,7 +12,10 @@ from ray.data._internal.lazy_block_list import LazyBlockList
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.stats import DatasetStats
-from ray.data._internal.util import _lazy_import_pyarrow_dataset
+from ray.data._internal.util import (
+    _lazy_import_pyarrow_dataset,
+    _autodetect_parallelism,
+)
 from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
 from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DatasetContext
 from ray.data.dataset import Dataset
@@ -31,7 +34,6 @@ from ray.data.datasource import (
     ParquetMetadataProvider,
     PathPartitionFilter,
     RangeDatasource,
-    Reader,
     ReadTask,
 )
 from ray.data.datasource.file_based_datasource import (
@@ -79,7 +81,9 @@ def from_items(items: List[Any], *, parallelism: int = -1) -> Dataset[Any]:
     """
     detected_parallelism, _ = _autodetect_parallelism(
-        parallelism, ray.util.get_current_placement_group()
+        parallelism,
+        ray.util.get_current_placement_group(),
+        DatasetContext.get_current(),
     )
     block_size = max(
         1,
@@ -1122,7 +1126,7 @@ def _get_read_tasks(
     DatasetContext._set_current(ctx)
     reader = ds.create_reader(**kwargs)
     requested_parallelism, min_safe_parallelism = _autodetect_parallelism(
-        parallelism, cur_pg, reader
+        parallelism, cur_pg, DatasetContext.get_current(), reader
     )
     return (
         requested_parallelism,
         min_safe_parallelism,
         reader.get_read_tasks(requested_parallelism),
     )


-def _autodetect_parallelism(
-    parallelism: int, cur_pg: Optional[PlacementGroup], reader: Optional[Reader] = None
-) -> (int, int):
-    """Returns parallelism to use and the min safe parallelism to avoid OOMs."""
-    # Autodetect parallelism requested. The heuristic here are that we should try
-    # to create as many blocks needed to saturate available resources, and also keep
-    # block sizes below the target memory size, but no more. Creating too many
-    # blocks is inefficient.
-    min_safe_parallelism = 1
-    ctx = DatasetContext.get_current()
-    if reader:
-        mem_size = reader.estimate_inmemory_data_size()
-        if mem_size is not None:
-            min_safe_parallelism = max(1, int(mem_size / ctx.target_max_block_size))
-    else:
-        mem_size = None
-    if parallelism < 0:
-        if parallelism != -1:
-            raise ValueError("`parallelism` must either be -1 or a positive integer.")
-        # Start with 2x the number of cores as a baseline, with a min floor.
-        avail_cpus = _estimate_avail_cpus(cur_pg)
-        parallelism = max(ctx.min_parallelism, min_safe_parallelism, avail_cpus * 2)
-        logger.debug(
-            f"Autodetected parallelism={parallelism} based on "
-            f"estimated_available_cpus={avail_cpus} and "
-            f"estimated_data_size={mem_size}."
-        )
-    return parallelism, min_safe_parallelism
-
-
-def _estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
-    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
-    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))
-
-    # If we're in a placement group, we shouldn't assume the entire cluster's
-    # resources are available for us to use. Estimate an upper bound on what's
-    # reasonable to assume is available for datasets to use.
-    if cur_pg:
-        pg_cpus = 0
-        for bundle in cur_pg.bundle_specs:
-            # Calculate the proportion of the cluster this placement group "takes up".
-            # Then scale our cluster_cpus proportionally to avoid over-parallelizing
-            # if there are many parallel Tune trials using the cluster.
-            cpu_fraction = bundle.get("CPU", 0) / max(1, cluster_cpus)
-            gpu_fraction = bundle.get("GPU", 0) / max(1, cluster_gpus)
-            max_fraction = max(cpu_fraction, gpu_fraction)
-            # Over-parallelize by up to a factor of 2, but no more than that. It's
-            # preferrable to over-estimate than under-estimate.
-            pg_cpus += 2 * int(max_fraction * cluster_cpus)
-
-        return min(cluster_cpus, pg_cpus)
-
-    return cluster_cpus
-
-
 def _resolve_parquet_args(
     tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
     **arrow_parquet_args,
diff --git a/python/ray/data/tests/test_auto_parallelism.py b/python/ray/data/tests/test_auto_parallelism.py
index 99e6a576f..87729106d 100644
--- a/python/ray/data/tests/test_auto_parallelism.py
+++ b/python/ray/data/tests/test_auto_parallelism.py
@@ -1,10 +1,100 @@
 import pytest
+from dataclasses import dataclass, astuple

 import ray
 from ray.data.context import DatasetContext
+from ray.data._internal.util import _autodetect_parallelism
 from ray.tests.conftest import *  # noqa


+@dataclass
+class TestCase:
+    avail_cpus: int
+    data_size: int
+    expected_parallelism: int
+
+
+MiB = 1024 * 1024
+GiB = 1024 * MiB
+
+TEST_CASES = [
+    TestCase(
+        avail_cpus=4,
+        data_size=1024,
+        expected_parallelism=8,  # avail_cpus has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=10 * MiB,
+        expected_parallelism=10,  # MIN_BLOCK_SIZE has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=20 * MiB,
+        expected_parallelism=20,  # MIN_BLOCK_SIZE has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=100 * MiB,
+        expected_parallelism=100,  # MIN_BLOCK_SIZE has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=1 * GiB,
+        expected_parallelism=200,  # MIN_PARALLELISM has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=10 * GiB,
+        expected_parallelism=200,  # MIN_PARALLELISM has precedence
+    ),
+    TestCase(
+        avail_cpus=150,
+        data_size=10 * GiB,
+        expected_parallelism=300,  # avail_cpus has precedence
+    ),
+    TestCase(
+        avail_cpus=400,
+        data_size=10 * GiB,
+        expected_parallelism=800,  # avail_cpus has precedence
+    ),
+    TestCase(
+        avail_cpus=400,
+        data_size=1 * MiB,
+        expected_parallelism=800,  # avail_cpus has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=1000 * GiB,
+        expected_parallelism=2000,  # MAX_BLOCK_SIZE has precedence
+    ),
+    TestCase(
+        avail_cpus=4,
+        data_size=10000 * GiB,
+        expected_parallelism=20000,  # MAX_BLOCK_SIZE has precedence
+    ),
+]
+
+
+@pytest.mark.parametrize(
+    "avail_cpus,data_size,expected",
+    [astuple(test) for test in TEST_CASES],
+)
+def test_autodetect_parallelism(avail_cpus, data_size, expected):
+    class MockReader:
+        def estimate_inmemory_data_size(self):
+            return data_size
+
+    result, _ = _autodetect_parallelism(
+        parallelism=-1,
+        cur_pg=None,
+        ctx=DatasetContext.get_current(),
+        reader=MockReader(),
+        avail_cpus=avail_cpus,
+    )
+    assert result == expected, (result, expected)
+
+
 def test_auto_parallelism_basic(shutdown_only):
     ray.init(num_cpus=8)
     context = DatasetContext.get_current()
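
The remaining test diffs below share one pattern: tests whose assertions
depend on the block count now pin `parallelism` explicitly, since
auto-detection under the new min block size may pack a tiny dataset into
fewer blocks than rows. A minimal sketch of the pattern (illustrative, not
from the patch):

    import ray

    # Pinning parallelism makes the block count independent of the
    # auto-detection defaults, keeping block-count assertions stable.
    ds = ray.data.range(10, parallelism=10)
    assert ds.num_blocks() == 10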
diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py
index c5cd08ae4..6af867e33 100644
--- a/python/ray/data/tests/test_dataset.py
+++ b/python/ray/data/tests/test_dataset.py
@@ -146,7 +146,7 @@ def test_avoid_placement_group_capture(shutdown_only, pipelined):

 def test_callable_classes(shutdown_only):
     ray.init(num_cpus=1)
-    ds = ray.data.range(10)
+    ds = ray.data.range(10, parallelism=10)

     class StatefulFn:
         def __init__(self):
@@ -329,8 +329,8 @@ def test_basic(ray_start_regular_shared, pipelined):

 def test_zip(ray_start_regular_shared):
-    ds1 = ray.data.range(5)
-    ds2 = ray.data.range(5).map(lambda x: x + 1)
+    ds1 = ray.data.range(5, parallelism=5)
+    ds2 = ray.data.range(5, parallelism=5).map(lambda x: x + 1)
     ds = ds1.zip(ds2)
     assert ds.schema() == tuple
     assert ds.take() == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]
@@ -447,7 +447,7 @@ def test_arrow_block_slice_copy_empty():

 def test_range_table(ray_start_regular_shared):
-    ds = ray.data.range_table(10)
+    ds = ray.data.range_table(10, parallelism=10)
     assert ds.num_blocks() == 10
     assert ds.count() == 10
     assert ds.take() == [{"value": i} for i in range(10)]
@@ -601,7 +601,7 @@ def test_tensor_array_boolean_slice_pandas_roundtrip(init_with_pandas, test_data
 def test_tensors_basic(ray_start_regular_shared):
     # Create directly.
     tensor_shape = (3, 5)
-    ds = ray.data.range_tensor(6, shape=tensor_shape)
+    ds = ray.data.range_tensor(6, shape=tensor_shape, parallelism=6)
     assert str(ds) == (
         "Dataset(num_blocks=6, num_rows=6, "
         "schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=int64>})"
     )
@@ -792,7 +792,7 @@ def test_tensors_sort(ray_start_regular_shared):

 def test_tensors_inferred_from_map(ray_start_regular_shared):
     # Test map.
-    ds = ray.data.range(10).map(lambda _: np.ones((4, 4)))
+    ds = ray.data.range(10, parallelism=10).map(lambda _: np.ones((4, 4)))
     assert str(ds) == (
         "Dataset(num_blocks=10, num_rows=10, "
         "schema={__value__: <ArrowTensorType: shape=(4, 4), dtype=double>})"
     )
@@ -808,7 +808,9 @@ def test_tensors_inferred_from_map(ray_start_regular_shared):
     )

     # Test flat_map.
-    ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))])
+    ds = ray.data.range(10, parallelism=10).flat_map(
+        lambda _: [np.ones((4, 4)), np.ones((4, 4))]
+    )
     assert str(ds) == (
         "Dataset(num_blocks=10, num_rows=20, "
         "schema={__value__: <ArrowTensorType: shape=(4, 4), dtype=double>})"
     )
@@ -1288,8 +1290,8 @@ def test_empty_dataset(ray_start_regular_shared):

 def test_schema(ray_start_regular_shared):
-    ds = ray.data.range(10)
-    ds2 = ray.data.range_table(10)
+    ds = ray.data.range(10, parallelism=10)
+    ds2 = ray.data.range_table(10, parallelism=10)
     ds3 = ds2.repartition(5)
     ds4 = ds3.map(lambda x: {"a": "hi", "b": 1.0}).limit(5).repartition(1)
     assert str(ds) == "Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
@@ -4243,7 +4245,7 @@ def test_actor_pool_strategy_apply_interrupt(shutdown_only):
     ray.init(include_dashboard=False, num_cpus=1)

     cpus = ray.available_resources()["CPU"]
-    ds = ray.data.range(5)
+    ds = ray.data.range(5, parallelism=5)
     aps = ray.data.ActorPoolStrategy(max_size=5)
     blocks = ds._plan.execute()
diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py
index e986665cf..5b2553160 100644
--- a/python/ray/data/tests/test_dataset_formats.py
+++ b/python/ray/data/tests/test_dataset_formats.py
@@ -268,7 +268,7 @@ def test_to_arrow_refs(ray_start_regular_shared):

 def test_get_internal_block_refs(ray_start_regular_shared):
-    blocks = ray.data.range(10).get_internal_block_refs()
+    blocks = ray.data.range(10, parallelism=10).get_internal_block_refs()
     assert len(blocks) == 10
     out = []
     for b in ray.get(blocks):
@@ -2517,6 +2517,7 @@ def test_csv_read_partitioned_with_filter_multikey(
         data_path,
         partition_filter=partition_path_filter,
         filesystem=fs,
+        parallelism=100,
     )
     assert_base_partitioned_ds(ds, num_input_files=6, num_computed=6)
     assert ray.get(kept_file_counter.get.remote()) == 6
diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py
index c49f0ffd0..436aabccc 100644
--- a/python/ray/data/tests/test_dataset_pipeline.py
+++ b/python/ray/data/tests/test_dataset_pipeline.py
@@ -178,7 +178,7 @@ def test_cannot_read_twice(ray_start_regular_shared):
 def test_basic_pipeline(ray_start_regular_shared):
     context = DatasetContext.get_current()
     context.optimize_fuse_stages = True
-    ds = ray.data.range(10)
+    ds = ray.data.range(10, parallelism=10)

     pipe = ds.window(blocks_per_window=1)
     assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)"
@@ -202,7 +202,7 @@ def test_basic_pipeline(ray_start_regular_shared):
 def test_window(ray_start_regular_shared):
     context = DatasetContext.get_current()
     context.optimize_fuse_stages = True
-    ds = ray.data.range(10)
+    ds = ray.data.range(10, parallelism=10)
     pipe = ds.window(blocks_per_window=1)
     assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)"
     pipe = pipe.rewindow(blocks_per_window=3)
@@ -214,7 +214,7 @@ def test_window(ray_start_regular_shared):
     assert datasets[2].take() == [6, 7, 8]
     assert datasets[3].take() == [9]

-    ds = ray.data.range(10)
+    ds = ray.data.range(10, parallelism=10)
     pipe = ds.window(blocks_per_window=5)
     assert str(pipe) == "DatasetPipeline(num_windows=2, num_stages=2)"
     pipe = pipe.rewindow(blocks_per_window=3)
@@ -230,7 +230,7 @@ def test_repeat(ray_start_regular_shared):
     context = DatasetContext.get_current()
     context.optimize_fuse_stages = True
-    ds = ray.data.range(5)
+    ds = ray.data.range(5, parallelism=5)
     pipe = ds.window(blocks_per_window=1)
     assert str(pipe) == "DatasetPipeline(num_windows=5, num_stages=2)"
"DatasetPipeline(num_windows=5, num_stages=2)" pipe = pipe.repeat(2) @@ -277,7 +277,7 @@ def test_repartition(ray_start_regular_shared): def test_iter_batches_basic(ray_start_regular_shared): - pipe = ray.data.range(10).window(blocks_per_window=2) + pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2) batches = list(pipe.iter_batches()) assert len(batches) == 10 assert all(len(e) == 1 for e in batches) @@ -294,11 +294,11 @@ def test_iter_batches_batch_across_windows(ray_start_regular_shared): def test_iter_datasets(ray_start_regular_shared): - pipe = ray.data.range(10).window(blocks_per_window=2) + pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2) ds = list(pipe.iter_datasets()) assert len(ds) == 5 - pipe = ray.data.range(10).window(blocks_per_window=5) + pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=5) ds = list(pipe.iter_datasets()) assert len(ds) == 2 @@ -316,7 +316,7 @@ def test_schema(ray_start_regular_shared): def test_schema_peek(ray_start_regular_shared): # Multiple datasets - pipe = ray.data.range(6).window(blocks_per_window=2) + pipe = ray.data.range(6, parallelism=6).window(blocks_per_window=2) assert pipe.schema() == int assert pipe._first_dataset is not None dss = list(pipe.iter_datasets()) @@ -334,7 +334,11 @@ def test_schema_peek(ray_start_regular_shared): assert pipe.schema() == int # Empty datasets - pipe = ray.data.range(6).filter(lambda x: x < 0).window(blocks_per_window=2) + pipe = ( + ray.data.range(6, parallelism=6) + .filter(lambda x: x < 0) + .window(blocks_per_window=2) + ) assert pipe.schema() is None assert pipe._first_dataset is not None dss = list(pipe.iter_datasets())