diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index 7b991513e..f965a8a3e 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -66,7 +66,7 @@ DEFAULT_SCHEDULING_STRATEGY = "DEFAULT"
 DEFAULT_USE_POLARS = False

 # Whether to estimate in-memory decoding data size for data source.
-DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = False
+DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = True

 # Whether to automatically cast NumPy ndarray columns in Pandas DataFrames to tensor
 # extension columns.
diff --git a/python/ray/data/datasource/parquet_datasource.py b/python/ray/data/datasource/parquet_datasource.py
index 56ec9913b..1fc20c112 100644
--- a/python/ray/data/datasource/parquet_datasource.py
+++ b/python/ray/data/datasource/parquet_datasource.py
@@ -1,11 +1,9 @@
 import itertools
 import logging
-import time
 from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Union

 import numpy as np

-import ray
 from ray.data._internal.output_buffer import BlockOutputBuffer
 from ray.data._internal.progress_bar import ProgressBar
 from ray.data._internal.remote_fn import cached_remote_fn
@@ -292,7 +290,6 @@ class _ParquetDatasourceReader(Reader):
         # Launch tasks to sample multiple files remotely in parallel.
         # Evenly distributed to sample N rows in i-th row group in i-th file.
         # TODO(ekl/cheng) take into account column pruning.
-        start_time = time.perf_counter()
         num_files = len(self._pq_ds.pieces)
         num_samples = int(num_files * PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIO)
         min_num_samples = min(
@@ -321,15 +318,10 @@ class _ParquetDatasourceReader(Reader):
                     _SerializedPiece(sample), idx
                 )
             )
-        sample_ratios = ray.get(futures)
+        sample_bar = ProgressBar("Parquet Files Sample", len(futures))
+        sample_ratios = sample_bar.fetch_until_complete(futures)
+        sample_bar.close()
         ratio = np.mean(sample_ratios)
-
-        sampling_duration = time.perf_counter() - start_time
-        if sampling_duration > 5:
-            logger.info(
-                "Parquet input size estimation took "
-                f"{round(sampling_duration, 2)} seconds."
-            )
         logger.debug(f"Estimated Parquet encoding ratio from sampling is {ratio}.")
         return max(ratio, PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND)
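
For reference, a minimal sketch of the pattern the second hunk introduces: a bare ray.get() over the sampling futures is replaced by Ray Data's internal ProgressBar, which blocks until all futures finish while reporting progress. The remote function _sample_piece below is a hypothetical stand-in for the datasource's sampling task; ProgressBar and fetch_until_complete() are the same internal helpers already imported in parquet_datasource.py.

    import ray
    from ray.data._internal.progress_bar import ProgressBar

    @ray.remote
    def _sample_piece(idx: int) -> float:
        # Hypothetical stand-in: sample one Parquet piece and return an
        # estimated in-memory / on-disk encoding ratio for it.
        return 1.0 + idx * 0.01

    futures = [_sample_piece.remote(i) for i in range(8)]

    # fetch_until_complete() blocks like ray.get(), but advances the bar
    # as each future completes; close() tears the bar down when done.
    sample_bar = ProgressBar("Parquet Files Sample", len(futures))
    sample_ratios = sample_bar.fetch_until_complete(futures)
    sample_bar.close()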