commit d48fa5c972 (parent be4e7a7d89)
Author: Clarence Ng <clarence.wyng@gmail.com>
Date:   2022-08-15 20:47:17 -07:00
Signed-off-by: Clarence Ng <clarence.wyng@gmail.com>

6 changed files with 11 additions and 14 deletions


@@ -66,7 +66,7 @@ DEFAULT_SCHEDULING_STRATEGY = "DEFAULT"
 DEFAULT_USE_POLARS = False
 
 # Whether to estimate in-memory decoding data size for data source.
-DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = False
+DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = True
 
 # Whether to automatically cast NumPy ndarray columns in Pandas DataFrames to tensor
 # extension columns.
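
The default flips from False to True, so decoding-size estimation becomes opt-out rather than opt-in. A minimal sketch of a per-process opt-out, assuming this module-level constant seeds a `decoding_size_estimation` attribute on the current DatasetContext (the attribute name is inferred from the constant, not shown in this hunk):

    from ray.data.context import DatasetContext

    ctx = DatasetContext.get_current()
    # Restore the old behavior for this process only; Datasets created
    # afterwards skip in-memory decoding size estimation.
    ctx.decoding_size_estimation = False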


@@ -61,8 +61,8 @@ PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIO = 0.01
 # Parquet encoding ratio.
 # This is to restrict `PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIO` within the
 # proper boundary.
-PARQUET_ENCODING_RATIO_ESTIMATE_MIN_NUM_SAMPLES = 2
-PARQUET_ENCODING_RATIO_ESTIMATE_MAX_NUM_SAMPLES = 10
+PARQUET_ENCODING_RATIO_ESTIMATE_MIN_NUM_SAMPLES = 4
+PARQUET_ENCODING_RATIO_ESTIMATE_MAX_NUM_SAMPLES = 20
 
 # The number of rows to read from each file for sampling. Try to keep it low to avoid
 # reading too much data into memory.
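
These two constants clamp how many file samples the 1% sampling ratio yields, and this commit doubles both bounds. The derivation itself is not part of this hunk, so the helper below is an assumed reconstruction of how the three constants combine:

    SAMPLING_RATIO = 0.01    # PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIO
    MIN_NUM_SAMPLES = 4
    MAX_NUM_SAMPLES = 20

    def num_samples(num_files: int) -> int:
        # Sample about 1% of the files, but never fewer than 4 or more than 20.
        return max(MIN_NUM_SAMPLES, min(MAX_NUM_SAMPLES, int(num_files * SAMPLING_RATIO)))

    assert num_samples(50) == 4       # the floor kicks in for small datasets
    assert num_samples(1000) == 10    # 1% of 1,000 files
    assert num_samples(10000) == 20   # the ceiling caps huge datasets
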
@@ -316,11 +316,7 @@ class _ParquetDatasourceReader(Reader):
             # Sample i-th row group in i-th file.
             # Use SPREAD scheduling strategy to avoid packing many sampling tasks on
             # same machine to cause OOM issue, as sampling can be memory-intensive.
-            futures.append(
-                sample_piece.options(scheduling_strategy="SPREAD").remote(
-                    _SerializedPiece(sample), idx
-                )
-            )
+            futures.append(sample_piece.remote(_SerializedPiece(sample), idx))
         sample_ratios = ray.get(futures)
         ratio = np.mean(sample_ratios)
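
Note that the hunk removes only the `.options(scheduling_strategy="SPREAD")` call; the comment describing SPREAD survives as unchanged context. In Ray, that option asks the scheduler to distribute tasks across nodes instead of packing them onto one. A standalone sketch of the two behaviors with a toy task, not the datasource's real sample_piece:

    import ray

    @ray.remote
    def sample_task(idx: int) -> int:
        return idx

    # Default scheduling: tasks may all land on the same node.
    packed = [sample_task.remote(i) for i in range(8)]

    # "SPREAD": best-effort distribution across nodes, trading data locality
    # for lower per-node memory pressure.
    spread = [
        sample_task.options(scheduling_strategy="SPREAD").remote(i)
        for i in range(8)
    ]
    ray.get(packed + spread)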


@@ -1,7 +1,7 @@
 cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
 region: us-west-2
-max_workers: 19
+max_workers: 2
 head_node_type:
   name: head_node
@@ -10,6 +10,6 @@ head_node_type:
 worker_node_types:
 - name: worker_node
   instance_type: m5.4xlarge
-  max_workers: 19
-  min_workers: 19
+  max_workers: 2
+  min_workers: 2
   use_spot: false
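
Setting min_workers equal to max_workers pins the cluster at a fixed size, here exactly two m5.4xlarge workers instead of the 19 it previously requested. A back-of-the-envelope capacity check (16 vCPUs per m5.4xlarge is AWS's advertised figure; linking it to the 20 consuming actors below is an inference, not stated in the diff):

    VCPUS_PER_NODE = 16    # AWS m5.4xlarge
    WORKER_NODES = 2       # min_workers == max_workers == 2

    total_cpus = WORKER_NODES * VCPUS_PER_NODE
    print(total_cpus)      # 32, enough for 20 actors at one CPU apiece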


@@ -13,7 +13,7 @@ import pandas as pd
 GiB = 1024 * 1024 * 1024
 
 
-@ray.remote(num_cpus=0.5)
+@ray.remote
 class ConsumingActor:
     def __init__(self, rank):
         self._rank = rank
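
Dropping num_cpus=0.5 restores Ray's default of one CPU per actor, halving how many ConsumingActors can share a node. A toy sketch of what the fractional reservation buys, with an invented actor body:

    import ray

    # At num_cpus=0.5, two replicas share one CPU slot; the default
    # (num_cpus=1) reserves a whole CPU per replica.
    @ray.remote(num_cpus=0.5)
    class PackedActor:
        def ping(self) -> str:
            return "ok"

    # Four replicas fit into two CPUs' worth of cluster resources.
    actors = [PackedActor.remote() for _ in range(4)]
    print(ray.get([a.ping.remote() for a in actors]))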


@@ -43,6 +43,7 @@ class MinimalClusterManager(ClusterManager):
             paging_token = result.metadata.next_paging_token
             for res in result.results:
+                print(res.__dict__)
                 if res.name == self.cluster_env_name:
                     self.cluster_env_id = res.id
                     logger.info(
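
The added print(res.__dict__) is a debugging aid inside a token-paginated scan for the cluster environment's id. A minimal sketch of that pagination pattern in isolation, with a hypothetical list_cluster_environments call standing in for the real SDK method:

    def find_cluster_env_id(client, name: str):
        """Walk token-paginated pages until a cluster env matches `name`."""
        token = None
        while True:
            result = client.list_cluster_environments(paging_token=token)
            for res in result.results:
                if res.name == name:
                    return res.id
            token = result.metadata.next_paging_token
            if token is None:
                return None  # exhausted every page without a match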


@@ -171,7 +171,7 @@
     script: python workloads/data_benchmark.py --dataset-size-gb=200 --num-workers=20
     wait_for_nodes:
-      num_nodes: 20
+      num_nodes: 2
   type: sdk_command
   file_manager: job
@@ -4226,7 +4226,7 @@
   cluster_compute: data_ingest_benchmark_compute.yaml
   run:
-    timeout: 300
+    timeout: 900
     script: python data_ingest_benchmark.py --dataset-size-gb=200 --num-workers=20 --streaming
     wait_for_nodes:
       num_nodes: 20
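
Here wait_for_nodes gates the benchmark until the expected cluster size is up, while run.timeout bounds the whole test; raising it from 300 to 900 seconds gives the 20-node streaming ingest more headroom. A rough sketch of what such a gate does, built on Ray's public ray.nodes() API (the helper itself is an assumption, not the release tooling's actual code, and it expects ray.init to have run already):

    import time
    import ray

    def wait_for_nodes(expected: int, timeout_s: float = 900.0) -> None:
        """Block until `expected` nodes report alive, or raise on timeout."""
        deadline = time.monotonic() + timeout_s
        while sum(1 for n in ray.nodes() if n["Alive"]) < expected:
            if time.monotonic() > deadline:
                raise TimeoutError(f"cluster never reached {expected} nodes")
            time.sleep(5)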