[data] Avoid under-parallelization regressions and add better testing for parallelism detection (#26543)

In the previous PR, #25883, a subtle regression was introduced for the case where data sizes blow up significantly after reading.

For example, suppose you're reading JPEG image files into a Dataset; these increase substantially in size on decompression. On a small cluster (e.g., 4 cores), reading a 1GiB dataset yields only 4-8 blocks of ~200MiB each, which can blow up and OOM the node when decompressed (e.g., at a 25x size increase).

Previously, the heuristic of using parallelism=200 avoided this small-node problem. This PR avoids the issue by (1) raising the min parallelism back to 200, and (2) as an optimization, introducing a min block size threshold, which allows using fewer blocks when the data size is really small (below the 1MiB-per-block target set by DEFAULT_TARGET_MIN_BLOCK_SIZE in the diff below).
Commit 40be6904a5 (parent e42dc7943e)
Eric Liang, 2022-07-14 13:02:52 -07:00, committed via GitHub
7 changed files with 238 additions and 82 deletions
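For concreteness, here is the arithmetic behind the new heuristic on the 4-core, 1GiB example above. This is a minimal standalone sketch using the defaults from this diff; the variable names are illustrative, not Ray APIs:

    # Hypothetical walk-through of the autodetection heuristic, not Ray API code.
    MiB = 1024 * 1024
    GiB = 1024 * MiB

    data_size = 1 * GiB
    avail_cpus = 4
    target_max_block_size = 512 * MiB  # DEFAULT_TARGET_MAX_BLOCK_SIZE
    target_min_block_size = 1 * MiB    # DEFAULT_TARGET_MIN_BLOCK_SIZE
    min_parallelism = 200              # raised back from 8 in this PR

    min_safe = max(1, data_size // target_max_block_size)        # 2: stay under 512MiB/block
    max_reasonable = max(1, data_size // target_min_block_size)  # 1024: avoid <1MiB blocks

    # Old behavior (min_parallelism=8): max(8, 2, 4 * 2) = 8 blocks of ~128MiB,
    # which can OOM a small node after a ~25x decompression blow-up.
    # New behavior: max(min(200, 1024), 2, 8) = 200 blocks of ~5MiB each.
    parallelism = max(min(min_parallelism, max_reasonable), min_safe, avail_cpus * 2)
    assert parallelism == 200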


@@ -1,6 +1,14 @@
import logging
from typing import Union
from typing import Union, Optional, TYPE_CHECKING
from types import ModuleType
import sys
import ray
from ray.data.context import DatasetContext
if TYPE_CHECKING:
from ray.data.datasource import Reader
from ray.util.placement_group import PlacementGroup
logger = logging.getLogger(__name__)
@@ -51,3 +59,96 @@ def _check_pyarrow_version():
)
else:
_VERSION_VALIDATED = True
def _autodetect_parallelism(
parallelism: int,
cur_pg: Optional["PlacementGroup"],
ctx: DatasetContext,
reader: Optional["Reader"] = None,
avail_cpus: Optional[int] = None,
) -> (int, int):
"""Returns parallelism to use and the min safe parallelism to avoid OOMs.
This detects parallelism using the following heuristics, applied in order:
1) We start with the default parallelism of 200.
2) Min block size. If the parallelism would make blocks smaller than this
threshold, the parallelism is reduced to avoid the overhead of tiny blocks.
3) Max block size. If the parallelism would make blocks larger than this
threshold, the parallelism is increased to avoid OOMs during processing.
4) Available CPUs. If the parallelism cannot make use of all the available
CPUs in the cluster, the parallelism is increased until it can.
Args:
parallelism: The user-requested parallelism, or -1 for auto-detection.
cur_pg: The current placement group, to be used for avail cpu calculation.
ctx: The current Dataset context to use for configs.
reader: The datasource reader, to be used for data size estimation.
avail_cpus: Override avail cpus detection (for testing only).
Returns:
Tuple of detected parallelism (only if -1 was specified), and the min safe
parallelism (which can be used to generate warnings about large blocks).
"""
min_safe_parallelism = 1
max_reasonable_parallelism = sys.maxsize
if reader:
mem_size = reader.estimate_inmemory_data_size()
if mem_size is not None:
min_safe_parallelism = max(1, int(mem_size / ctx.target_max_block_size))
max_reasonable_parallelism = max(
1, int(mem_size / ctx.target_min_block_size)
)
else:
mem_size = None
if parallelism < 0:
if parallelism != -1:
raise ValueError("`parallelism` must either be -1 or a positive integer.")
# Start with 2x the number of cores as a baseline, with a min floor.
avail_cpus = avail_cpus or _estimate_avail_cpus(cur_pg)
parallelism = max(
min(ctx.min_parallelism, max_reasonable_parallelism),
min_safe_parallelism,
avail_cpus * 2,
)
logger.debug(
f"Autodetected parallelism={parallelism} based on "
f"estimated_available_cpus={avail_cpus} and "
f"estimated_data_size={mem_size}."
)
return parallelism, min_safe_parallelism
def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int:
"""Estimates the available CPU parallelism for this Dataset in the cluster.
If we aren't in a placement group, this is trivially the number of CPUs in the
cluster. Otherwise, we try to calculate how large the placement group is relative
to the size of the cluster.
Args:
cur_pg: The current placement group, if any.
"""
cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
cluster_gpus = int(ray.cluster_resources().get("GPU", 0))
# If we're in a placement group, we shouldn't assume the entire cluster's
# resources are available for us to use. Estimate an upper bound on what's
# reasonable to assume is available for datasets to use.
if cur_pg:
pg_cpus = 0
for bundle in cur_pg.bundle_specs:
# Calculate the proportion of the cluster this placement group "takes up".
# Then scale our cluster_cpus proportionally to avoid over-parallelizing
# if there are many parallel Tune trials using the cluster.
cpu_fraction = bundle.get("CPU", 0) / max(1, cluster_cpus)
gpu_fraction = bundle.get("GPU", 0) / max(1, cluster_gpus)
max_fraction = max(cpu_fraction, gpu_fraction)
# Over-parallelize by up to a factor of 2, but no more than that. It's
# preferable to over-estimate than to under-estimate.
pg_cpus += 2 * int(max_fraction * cluster_cpus)
return min(cluster_cpus, pg_cpus)
return cluster_cpus
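To make the proportional scaling above concrete, here is a small standalone sketch with assumed numbers (a 16-CPU, 0-GPU cluster and plain dicts standing in for placement group bundles; not Ray API calls):

    # Suppose a Tune trial holds a placement group of 4 x {CPU: 1} bundles
    # on a 16-CPU cluster. Each bundle "takes up" 1/16 of the cluster.
    cluster_cpus = 16
    bundles = [{"CPU": 1}] * 4

    pg_cpus = 0
    for bundle in bundles:
        cpu_fraction = bundle.get("CPU", 0) / max(1, cluster_cpus)  # 1/16
        pg_cpus += 2 * int(cpu_fraction * cluster_cpus)             # 2 * 1 = 2

    # The trial assumes up to 2x its own CPU share (8 here), capped at the
    # cluster size, rather than all 16 CPUs.
    assert min(cluster_cpus, pg_cpus) == 8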


@@ -11,8 +11,13 @@ _default_context: "Optional[DatasetContext]" = None
_context_lock = threading.Lock()
# The max target block size in bytes for reads and transformations.
# We choose 512MiB as 8x less than the typical memory:core ratio of 4GiB.
DEFAULT_TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024
# Datasets will avoid creating blocks smaller than this size in bytes on read.
# This takes precedence over DEFAULT_MIN_PARALLELISM.
DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024
# Whether block splitting is on by default
DEFAULT_BLOCK_SPLITTING_ENABLED = False
@@ -33,8 +38,9 @@ DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True
# Whether to furthermore fuse prior map tasks with shuffle stages.
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True
# Minimum amount of parallelism to auto-detect for a dataset.
DEFAULT_MIN_PARALLELISM = 8
# Minimum amount of parallelism to auto-detect for a dataset. Note that the min
# block size config takes precedence over this.
DEFAULT_MIN_PARALLELISM = 200
# Whether to use the actor-based block prefetcher.
DEFAULT_ACTOR_PREFETCHER_ENABLED = True
@@ -64,6 +70,7 @@ class DatasetContext:
block_owner: ray.actor.ActorHandle,
block_splitting_enabled: bool,
target_max_block_size: int,
target_min_block_size: int,
enable_pandas_block: bool,
optimize_fuse_stages: bool,
optimize_fuse_read_stages: bool,
@@ -80,6 +87,7 @@ class DatasetContext:
self.block_owner = block_owner
self.block_splitting_enabled = block_splitting_enabled
self.target_max_block_size = target_max_block_size
self.target_min_block_size = target_min_block_size
self.enable_pandas_block = enable_pandas_block
self.optimize_fuse_stages = optimize_fuse_stages
self.optimize_fuse_read_stages = optimize_fuse_read_stages
@@ -110,6 +118,7 @@ class DatasetContext:
block_owner=None,
block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED,
target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE,
target_min_block_size=DEFAULT_TARGET_MIN_BLOCK_SIZE,
enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
optimize_fuse_stages=DEFAULT_OPTIMIZE_FUSE_STAGES,
optimize_fuse_read_stages=DEFAULT_OPTIMIZE_FUSE_READ_STAGES,
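Since these defaults changed, it's worth noting that both knobs remain plain attributes on the context and can be overridden at runtime. A minimal sketch, assuming the attribute names shown in the diff above:

    import ray
    from ray.data.context import DatasetContext

    ctx = DatasetContext.get_current()
    ctx.min_parallelism = 8                      # restore the old floor
    ctx.target_min_block_size = 4 * 1024 * 1024  # tolerate blocks down to 4MiB

    # Subsequent reads with parallelism=-1 (the default) pick up these values.
    ds = ray.data.range(1000)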


@@ -12,7 +12,10 @@ from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.stats import DatasetStats
from ray.data._internal.util import _lazy_import_pyarrow_dataset
from ray.data._internal.util import (
_lazy_import_pyarrow_dataset,
_autodetect_parallelism,
)
from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DatasetContext
from ray.data.dataset import Dataset
@@ -31,7 +34,6 @@ from ray.data.datasource import (
ParquetMetadataProvider,
PathPartitionFilter,
RangeDatasource,
Reader,
ReadTask,
)
from ray.data.datasource.file_based_datasource import (
@@ -79,7 +81,9 @@ def from_items(items: List[Any], *, parallelism: int = -1) -> Dataset[Any]:
"""
detected_parallelism, _ = _autodetect_parallelism(
parallelism, ray.util.get_current_placement_group()
parallelism,
ray.util.get_current_placement_group(),
DatasetContext.get_current(),
)
block_size = max(
1,
@@ -1122,7 +1126,7 @@ def _get_read_tasks(
DatasetContext._set_current(ctx)
reader = ds.create_reader(**kwargs)
requested_parallelism, min_safe_parallelism = _autodetect_parallelism(
parallelism, cur_pg, reader
parallelism, cur_pg, DatasetContext.get_current(), reader
)
return (
requested_parallelism,
@@ -1131,61 +1135,6 @@
)
def _autodetect_parallelism(
parallelism: int, cur_pg: Optional[PlacementGroup], reader: Optional[Reader] = None
) -> (int, int):
"""Returns parallelism to use and the min safe parallelism to avoid OOMs."""
# Autodetect parallelism requested. The heuristics here are that we should try
# to create as many blocks as needed to saturate available resources, and also keep
# block sizes below the target memory size, but no more. Creating too many
# blocks is inefficient.
min_safe_parallelism = 1
ctx = DatasetContext.get_current()
if reader:
mem_size = reader.estimate_inmemory_data_size()
if mem_size is not None:
min_safe_parallelism = max(1, int(mem_size / ctx.target_max_block_size))
else:
mem_size = None
if parallelism < 0:
if parallelism != -1:
raise ValueError("`parallelism` must either be -1 or a positive integer.")
# Start with 2x the number of cores as a baseline, with a min floor.
avail_cpus = _estimate_avail_cpus(cur_pg)
parallelism = max(ctx.min_parallelism, min_safe_parallelism, avail_cpus * 2)
logger.debug(
f"Autodetected parallelism={parallelism} based on "
f"estimated_available_cpus={avail_cpus} and "
f"estimated_data_size={mem_size}."
)
return parallelism, min_safe_parallelism
def _estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
cluster_gpus = int(ray.cluster_resources().get("GPU", 0))
# If we're in a placement group, we shouldn't assume the entire cluster's
# resources are available for us to use. Estimate an upper bound on what's
# reasonable to assume is available for datasets to use.
if cur_pg:
pg_cpus = 0
for bundle in cur_pg.bundle_specs:
# Calculate the proportion of the cluster this placement group "takes up".
# Then scale our cluster_cpus proportionally to avoid over-parallelizing
# if there are many parallel Tune trials using the cluster.
cpu_fraction = bundle.get("CPU", 0) / max(1, cluster_cpus)
gpu_fraction = bundle.get("GPU", 0) / max(1, cluster_gpus)
max_fraction = max(cpu_fraction, gpu_fraction)
# Over-parallelize by up to a factor of 2, but no more than that. It's
# preferable to over-estimate than to under-estimate.
pg_cpus += 2 * int(max_fraction * cluster_cpus)
return min(cluster_cpus, pg_cpus)
return cluster_cpus
def _resolve_parquet_args(
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
**arrow_parquet_args,


@@ -1,10 +1,100 @@
import pytest
from dataclasses import dataclass, astuple
import ray
from ray.data.context import DatasetContext
from ray.data._internal.util import _autodetect_parallelism
from ray.tests.conftest import * # noqa
@dataclass
class TestCase:
avail_cpus: int
data_size: int
expected_parallelism: int
MiB = 1024 * 1024
GiB = 1024 * MiB
TEST_CASES = [
TestCase(
avail_cpus=4,
data_size=1024,
expected_parallelism=8, # avail_cpus has precedence
),
TestCase(
avail_cpus=4,
data_size=10 * MiB,
expected_parallelism=10, # MIN_BLOCK_SIZE has precedence
),
TestCase(
avail_cpus=4,
data_size=20 * MiB,
expected_parallelism=20, # MIN_BLOCK_SIZE has precedence
),
TestCase(
avail_cpus=4,
data_size=100 * MiB,
expected_parallelism=100, # MIN_BLOCK_SIZE has precedence
),
TestCase(
avail_cpus=4,
data_size=1 * GiB,
expected_parallelism=200, # MIN_PARALLELISM has precedence
),
TestCase(
avail_cpus=4,
data_size=10 * GiB,
expected_parallelism=200, # MIN_PARALLELISM has precedence
),
TestCase(
avail_cpus=150,
data_size=10 * GiB,
expected_parallelism=300, # avail_cpus has precedence
),
TestCase(
avail_cpus=400,
data_size=10 * GiB,
expected_parallelism=800, # avail_cpus has precedence
),
TestCase(
avail_cpus=400,
data_size=1 * MiB,
expected_parallelism=800, # avail_cpus has precedence
),
TestCase(
avail_cpus=4,
data_size=1000 * GiB,
expected_parallelism=2000, # MAX_BLOCK_SIZE has precedence
),
TestCase(
avail_cpus=4,
data_size=10000 * GiB,
expected_parallelism=20000, # MAX_BLOCK_SIZE has precedence
),
]
@pytest.mark.parametrize(
"avail_cpus,data_size,expected",
[astuple(test) for test in TEST_CASES],
)
def test_autodetect_parallelism(avail_cpus, data_size, expected):
class MockReader:
def estimate_inmemory_data_size(self):
return data_size
result, _ = _autodetect_parallelism(
parallelism=-1,
cur_pg=None,
ctx=DatasetContext.get_current(),
reader=MockReader(),
avail_cpus=avail_cpus,
)
assert result == expected, (result, expected)
def test_auto_parallelism_basic(shutdown_only):
ray.init(num_cpus=8)
context = DatasetContext.get_current()


@@ -146,7 +146,7 @@ def test_avoid_placement_group_capture(shutdown_only, pipelined):
def test_callable_classes(shutdown_only):
ray.init(num_cpus=1)
ds = ray.data.range(10)
ds = ray.data.range(10, parallelism=10)
class StatefulFn:
def __init__(self):
@@ -329,8 +329,8 @@ def test_basic(ray_start_regular_shared, pipelined):
def test_zip(ray_start_regular_shared):
ds1 = ray.data.range(5)
ds2 = ray.data.range(5).map(lambda x: x + 1)
ds1 = ray.data.range(5, parallelism=5)
ds2 = ray.data.range(5, parallelism=5).map(lambda x: x + 1)
ds = ds1.zip(ds2)
assert ds.schema() == tuple
assert ds.take() == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]
@@ -447,7 +447,7 @@ def test_arrow_block_slice_copy_empty():
def test_range_table(ray_start_regular_shared):
ds = ray.data.range_table(10)
ds = ray.data.range_table(10, parallelism=10)
assert ds.num_blocks() == 10
assert ds.count() == 10
assert ds.take() == [{"value": i} for i in range(10)]
@@ -601,7 +601,7 @@ def test_tensor_array_boolean_slice_pandas_roundtrip(init_with_pandas, test_data
def test_tensors_basic(ray_start_regular_shared):
# Create directly.
tensor_shape = (3, 5)
ds = ray.data.range_tensor(6, shape=tensor_shape)
ds = ray.data.range_tensor(6, shape=tensor_shape, parallelism=6)
assert str(ds) == (
"Dataset(num_blocks=6, num_rows=6, "
"schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=int64>})"
@@ -792,7 +792,7 @@ def test_tensors_sort(ray_start_regular_shared):
def test_tensors_inferred_from_map(ray_start_regular_shared):
# Test map.
ds = ray.data.range(10).map(lambda _: np.ones((4, 4)))
ds = ray.data.range(10, parallelism=10).map(lambda _: np.ones((4, 4)))
assert str(ds) == (
"Dataset(num_blocks=10, num_rows=10, "
"schema={__value__: <ArrowTensorType: shape=(4, 4), dtype=double>})"
@@ -808,7 +808,9 @@ def test_tensors_inferred_from_map(ray_start_regular_shared):
)
# Test flat_map.
ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))])
ds = ray.data.range(10, parallelism=10).flat_map(
lambda _: [np.ones((4, 4)), np.ones((4, 4))]
)
assert str(ds) == (
"Dataset(num_blocks=10, num_rows=20, "
"schema={__value__: <ArrowTensorType: shape=(4, 4), dtype=double>})"
@@ -1288,8 +1290,8 @@ def test_empty_dataset(ray_start_regular_shared):
def test_schema(ray_start_regular_shared):
ds = ray.data.range(10)
ds2 = ray.data.range_table(10)
ds = ray.data.range(10, parallelism=10)
ds2 = ray.data.range_table(10, parallelism=10)
ds3 = ds2.repartition(5)
ds4 = ds3.map(lambda x: {"a": "hi", "b": 1.0}).limit(5).repartition(1)
assert str(ds) == "Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
@@ -4243,7 +4245,7 @@ def test_actor_pool_strategy_apply_interrupt(shutdown_only):
ray.init(include_dashboard=False, num_cpus=1)
cpus = ray.available_resources()["CPU"]
ds = ray.data.range(5)
ds = ray.data.range(5, parallelism=5)
aps = ray.data.ActorPoolStrategy(max_size=5)
blocks = ds._plan.execute()


@@ -268,7 +268,7 @@ def test_to_arrow_refs(ray_start_regular_shared):
def test_get_internal_block_refs(ray_start_regular_shared):
blocks = ray.data.range(10).get_internal_block_refs()
blocks = ray.data.range(10, parallelism=10).get_internal_block_refs()
assert len(blocks) == 10
out = []
for b in ray.get(blocks):
@@ -2517,6 +2517,7 @@ def test_csv_read_partitioned_with_filter_multikey(
data_path,
partition_filter=partition_path_filter,
filesystem=fs,
parallelism=100,
)
assert_base_partitioned_ds(ds, num_input_files=6, num_computed=6)
assert ray.get(kept_file_counter.get.remote()) == 6


@@ -178,7 +178,7 @@ def test_cannot_read_twice(ray_start_regular_shared):
def test_basic_pipeline(ray_start_regular_shared):
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
ds = ray.data.range(10)
ds = ray.data.range(10, parallelism=10)
pipe = ds.window(blocks_per_window=1)
assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)"
@@ -202,7 +202,7 @@ def test_basic_pipeline(ray_start_regular_shared):
def test_window(ray_start_regular_shared):
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
ds = ray.data.range(10)
ds = ray.data.range(10, parallelism=10)
pipe = ds.window(blocks_per_window=1)
assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)"
pipe = pipe.rewindow(blocks_per_window=3)
@@ -214,7 +214,7 @@ def test_window(ray_start_regular_shared):
assert datasets[2].take() == [6, 7, 8]
assert datasets[3].take() == [9]
ds = ray.data.range(10)
ds = ray.data.range(10, parallelism=10)
pipe = ds.window(blocks_per_window=5)
assert str(pipe) == "DatasetPipeline(num_windows=2, num_stages=2)"
pipe = pipe.rewindow(blocks_per_window=3)
@@ -230,7 +230,7 @@ def test_window(ray_start_regular_shared):
def test_repeat(ray_start_regular_shared):
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
ds = ray.data.range(5)
ds = ray.data.range(5, parallelism=5)
pipe = ds.window(blocks_per_window=1)
assert str(pipe) == "DatasetPipeline(num_windows=5, num_stages=2)"
pipe = pipe.repeat(2)
@@ -277,7 +277,7 @@ def test_repartition(ray_start_regular_shared):
def test_iter_batches_basic(ray_start_regular_shared):
pipe = ray.data.range(10).window(blocks_per_window=2)
pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2)
batches = list(pipe.iter_batches())
assert len(batches) == 10
assert all(len(e) == 1 for e in batches)
@@ -294,11 +294,11 @@ def test_iter_batches_batch_across_windows(ray_start_regular_shared):
def test_iter_datasets(ray_start_regular_shared):
pipe = ray.data.range(10).window(blocks_per_window=2)
pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2)
ds = list(pipe.iter_datasets())
assert len(ds) == 5
pipe = ray.data.range(10).window(blocks_per_window=5)
pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=5)
ds = list(pipe.iter_datasets())
assert len(ds) == 2
@@ -316,7 +316,7 @@ def test_schema(ray_start_regular_shared):
def test_schema_peek(ray_start_regular_shared):
# Multiple datasets
pipe = ray.data.range(6).window(blocks_per_window=2)
pipe = ray.data.range(6, parallelism=6).window(blocks_per_window=2)
assert pipe.schema() == int
assert pipe._first_dataset is not None
dss = list(pipe.iter_datasets())
@@ -334,7 +334,11 @@ def test_schema_peek(ray_start_regular_shared):
assert pipe.schema() == int
# Empty datasets
pipe = ray.data.range(6).filter(lambda x: x < 0).window(blocks_per_window=2)
pipe = (
ray.data.range(6, parallelism=6)
.filter(lambda x: x < 0)
.window(blocks_per_window=2)
)
assert pipe.schema() is None
assert pipe._first_dataset is not None
dss = list(pipe.iter_datasets())