[datasets] Use generators for merge stage in push-based shuffle (#25907)
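
Before this change, the merge stage materialized every merged block in a Python list and returned the list plus a single metadata object. The merge task is now a generator: it yields each merged block as soon as it is produced, then yields one BlockMetadata object aggregating the row count, byte size, and schema of all blocks. To make the effect observable, block execution stats now also record max RSS, and the dataset stats output gains a "Peak heap memory usage (MiB)" line.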

Stephanie Wang 2022-06-18 19:33:54 -04:00 committed by GitHub
parent 97582a802d
commit 3de4657cae
5 changed files with 90 additions and 28 deletions

python/ray/data/_internal/push_based_shuffle.py

@@ -555,16 +555,29 @@ class PushBasedShufflePlan(ShuffleOp):
             len({len(mapper_outputs) for mapper_outputs in all_mapper_outputs}) == 1
         ), "Received different number of map inputs"
         stats = BlockExecStats.builder()
-        merged_outputs = []
         if not reduce_args:
             reduce_args = []
-        for mapper_outputs in zip(*all_mapper_outputs):
+
+        num_rows = 0
+        size_bytes = 0
+        schema = None
+        for i, mapper_outputs in enumerate(zip(*all_mapper_outputs)):
             block, meta = reduce_fn(*reduce_args, *mapper_outputs)
-            merged_outputs.append(block)
-        meta = BlockAccessor.for_block(block).get_metadata(
-            input_files=None, exec_stats=stats.build()
-        )
-        return merged_outputs + [meta]
+            yield block
+
+            block = BlockAccessor.for_block(block)
+            num_rows += block.num_rows()
+            size_bytes += block.size_bytes()
+            schema = block.schema()
+            del block
+
+        yield BlockMetadata(
+            num_rows=num_rows,
+            size_bytes=size_bytes,
+            schema=schema,
+            input_files=None,
+            exec_stats=stats.build(),
+        )
 
     @staticmethod
     def _compute_shuffle_schedule(
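
The yield-based merge relies on Ray's support for generator tasks with a static num_returns, which this commit depends on. Below is a minimal, self-contained sketch of that pattern; the merge function and its inputs are made up for illustration and are not the Datasets merge task itself. Each yielded value is put into the object store as soon as it is produced, so the worker holds at most one output block on its heap at a time:

    import ray

    ray.init()

    # Declared with num_returns=4, the generator must yield exactly 4
    # values: here, 3 "blocks" followed by 1 trailing metadata object,
    # mirroring the diff's blocks-then-BlockMetadata ordering.
    @ray.remote(num_returns=4)
    def merge(*parts):
        num_rows = 0
        for part in parts:
            num_rows += len(part)
            yield sorted(part)  # one merged block at a time
        yield {"num_rows": num_rows}  # aggregated metadata last

    *blocks, meta = merge.remote([3, 1], [2], [5, 4])
    print(ray.get(blocks))  # [[1, 3], [2], [4, 5]]
    print(ray.get(meta))    # {'num_rows': 5}

This is also why the loop above drops its BlockAccessor reference (del block) before the next iteration: releasing the last local reference lets the already-yielded block be freed from the worker heap while the remaining inputs are merged.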

python/ray/data/_internal/stats.py

@@ -1,13 +1,14 @@
-from contextlib import contextmanager
-from typing import List, Optional, Set, Dict, Tuple, Union
-import time
 import collections
+import time
+from contextlib import contextmanager
+from typing import Dict, List, Optional, Set, Tuple, Union
 
 import numpy as np
 
 import ray
+from ray.data._internal.block_list import BlockList
 from ray.data.block import BlockMetadata
 from ray.data.context import DatasetContext
-from ray.data._internal.block_list import BlockList
 
 
 def fmt(seconds: float) -> str:
@@ -315,6 +316,14 @@ class DatasetStats:
                 fmt(sum([e.cpu_time_s for e in exec_stats])),
             )
 
+            out += indent
+            memory_stats = [round(e.max_rss_bytes / (1024 * 1024), 2) for e in exec_stats]
+            out += "* Peak heap memory usage (MiB): {} min, {} max, {} mean\n".format(
+                min(memory_stats),
+                max(memory_stats),
+                int(np.mean(memory_stats)),
+            )
+
         output_num_rows = [m.num_rows for m in blocks if m.num_rows is not None]
         if output_num_rows:
             out += indent
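
A tiny standalone sketch of the conversion and formatting used by the new line, with made-up RSS values. Note that b / (1024 * 1024) converts bytes to MiB, whereas b / 1024 * 1024 would evaluate left to right and leave the value in bytes:

    import numpy as np

    # Hypothetical per-task peak RSS values, in bytes.
    max_rss_bytes = [310_000_000, 350_000_000, 330_000_000]

    memory_stats = [round(b / (1024 * 1024), 2) for b in max_rss_bytes]
    print(
        "* Peak heap memory usage (MiB): {} min, {} max, {} mean".format(
            min(memory_stats), max(memory_stats), int(np.mean(memory_stats))
        )
    )
    # -> * Peak heap memory usage (MiB): 295.64 min, 333.79 max, 314 mean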

python/ray/data/block.py

@@ -1,32 +1,43 @@
-from dataclasses import dataclass
+import os
 import time
+from dataclasses import dataclass
 from typing import (
-    TypeVar,
-    List,
+    TYPE_CHECKING,
+    Any,
+    Callable,
     Dict,
     Generic,
     Iterator,
-    Tuple,
-    Any,
-    Union,
+    List,
     Optional,
-    Callable,
-    TYPE_CHECKING,
+    Tuple,
+    TypeVar,
+    Union,
 )
 
 import numpy as np
 
+import ray
+from ray.data._internal.util import _check_pyarrow_version
+from ray.types import ObjectRef
+from ray.util.annotations import DeveloperAPI
+
+import psutil
+
+try:
+    import resource
+except ImportError:
+    resource = None
+
 if TYPE_CHECKING:
     import pandas
     import pyarrow
+
+    from ray.data import Dataset
     from ray.data._internal.block_builder import BlockBuilder
     from ray.data.aggregate import AggregateFn
-    from ray.data import Dataset
-
-import ray
-from ray.types import ObjectRef
-from ray.util.annotations import DeveloperAPI
-from ray.data._internal.util import _check_pyarrow_version
 
 T = TypeVar("T")
 U = TypeVar("U")
@@ -116,6 +127,9 @@ class BlockExecStats:
         self.wall_time_s: Optional[float] = None
         self.cpu_time_s: Optional[float] = None
         self.node_id = ray.runtime_context.get_runtime_context().node_id.hex()
+        # Max memory usage. May be an overestimate since we do not
+        # differentiate from previous tasks on the same worker.
+        self.max_rss_bytes: int = 0
 
     @staticmethod
     def builder() -> "_BlockExecStatsBuilder":
@@ -146,6 +160,16 @@ class _BlockExecStatsBuilder:
         stats = BlockExecStats()
         stats.wall_time_s = time.perf_counter() - self.start_time
         stats.cpu_time_s = time.process_time() - self.start_cpu
+        if resource is None:
+            # NOTE(swang): resource package is not supported on Windows. This
+            # is only the memory usage at the end of the task, not the peak
+            # memory.
+            process = psutil.Process(os.getpid())
+            stats.max_rss_bytes = int(process.memory_info().rss)
+        else:
+            stats.max_rss_bytes = int(
+                resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1e3
+            )
         return stats
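
The RSS probe added above can be exercised outside Ray; here is a minimal standalone version (the helper name is ours) with the same platform fallback:

    import os

    import psutil

    try:
        import resource  # POSIX-only; unavailable on Windows
    except ImportError:
        resource = None

    def max_rss_bytes() -> int:
        if resource is None:
            # Windows fallback: psutil reports the *current* RSS, a
            # point-in-time value rather than the true peak.
            return int(psutil.Process(os.getpid()).memory_info().rss)
        # On Linux, ru_maxrss is reported in kilobytes; multiplying by 1e3
        # approximates bytes (on macOS, ru_maxrss is already in bytes).
        return int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1e3)

    print(max_rss_bytes())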
@@ -304,8 +328,8 @@ class BlockAccessor(Generic[T]):
     def for_block(block: Block) -> "BlockAccessor[T]":
         """Create a block accessor for the given block."""
         _check_pyarrow_version()
-        import pyarrow
         import pandas
+        import pyarrow
 
         if isinstance(block, pyarrow.Table):
             from ray.data._internal.arrow_block import ArrowBlockAccessor

python/ray/data/tests/test_stats.py

@@ -1,6 +1,7 @@
-import pytest
 import re
+
+import pytest
 
 import ray
 from ray.data.context import DatasetContext
 from ray.tests.conftest import *  # noqa
@@ -32,6 +33,7 @@ def test_dataset_stats_basic(ray_start_regular_shared):
         == """Stage N read->map_batches: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -39,6 +41,7 @@ def test_dataset_stats_basic(ray_start_regular_shared):
 Stage N map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -66,6 +69,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
     Substage Z read->random_shuffle_map: N/N blocks executed
     * Remote wall time: T min, T max, T mean, T total
     * Remote cpu time: T min, T max, T mean, T total
+    * Peak heap memory usage (MiB): N min, N max, N mean
     * Output num rows: N min, N max, N mean, N total
     * Output size bytes: N min, N max, N mean, N total
     * Tasks per node: N min, N max, N mean; N nodes used
@@ -73,6 +77,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
     Substage N random_shuffle_reduce: N/N blocks executed
     * Remote wall time: T min, T max, T mean, T total
     * Remote cpu time: T min, T max, T mean, T total
+    * Peak heap memory usage (MiB): N min, N max, N mean
     * Output num rows: N min, N max, N mean, N total
     * Output size bytes: N min, N max, N mean, N total
     * Tasks per node: N min, N max, N mean; N nodes used
@@ -82,6 +87,7 @@ Stage N repartition: executed in T
     Substage Z repartition_map: N/N blocks executed
     * Remote wall time: T min, T max, T mean, T total
     * Remote cpu time: T min, T max, T mean, T total
+    * Peak heap memory usage (MiB): N min, N max, N mean
     * Output num rows: N min, N max, N mean, N total
     * Output size bytes: N min, N max, N mean, N total
     * Tasks per node: N min, N max, N mean; N nodes used
@@ -89,6 +95,7 @@ Stage N repartition: executed in T
     Substage N repartition_reduce: N/N blocks executed
     * Remote wall time: T min, T max, T mean, T total
     * Remote cpu time: T min, T max, T mean, T total
+    * Peak heap memory usage (MiB): N min, N max, N mean
     * Output num rows: N min, N max, N mean, N total
     * Output size bytes: N min, N max, N mean, N total
     * Tasks per node: N min, N max, N mean; N nodes used
@@ -143,6 +150,7 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path):
         == """Stage N read->map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -161,6 +169,7 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path):
         == """Stage N read->map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -172,6 +181,7 @@ Stage N split: N/N blocks split from parent in T
 Stage N map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -195,6 +205,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared):
 Stage N read->map_batches: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -202,6 +213,7 @@ Stage N read->map_batches: N/N blocks executed in T
 Stage N map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -212,6 +224,7 @@ Stage N read->map_batches: [execution cached]
 Stage N map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -222,6 +235,7 @@ Stage N read->map_batches: [execution cached]
 Stage N map: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -281,6 +295,7 @@ def test_dataset_pipeline_split_stats_basic(ray_start_regular_shared):
 Stage N read: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
@@ -289,6 +304,7 @@ Stage N read: N/N blocks executed in T
 Stage N read: N/N blocks executed in T
 * Remote wall time: T min, T max, T mean, T total
 * Remote cpu time: T min, T max, T mean, T total
+* Peak heap memory usage (MiB): N min, N max, N mean
 * Output num rows: N min, N max, N mean, N total
 * Output size bytes: N min, N max, N mean, N total
 * Tasks per node: N min, N max, N mean; N nodes used
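
The expected strings above come from Dataset.stats(); a minimal usage sketch (real output contains concrete numbers and timings, which the tests normalize to N and T before comparing):

    import ray

    ds = ray.data.range(100).map_batches(lambda batch: batch)
    print(ds.stats())  # per-stage wall/cpu time, peak heap memory, rows, ...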


@@ -4,17 +4,17 @@ import logging
 import os
 import sys
 import time
 
 import pytest
 
+import ray
 import ray.cluster_utils
 from ray._private.test_utils import (
-    wait_for_pid_to_exit,
     client_test_enabled,
     run_string_as_driver,
+    wait_for_pid_to_exit,
 )
-import ray
 
 logger = logging.getLogger(__name__)