Signed-off-by: Clarence Ng <clarence.wyng@gmail.com>
This commit is contained in:
Clarence Ng 2022-07-25 23:35:14 -07:00
parent ddf2f639a8
commit 649b846a8f
9 changed files with 4 additions and 177 deletions

View file

@ -281,10 +281,6 @@ class SerializationContext:
return OutOfDiskError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("OUT_OF_MEMORY_ERROR"):
return OutOfDiskError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("OBJECT_DELETED"):
return ReferenceCountingAssertionError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()

View file

@ -1,9 +1,6 @@
import collections
import gc
import logging
import os
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union
import psutil
import ray
from ray.data._internal.block_list import BlockList
@ -445,11 +442,6 @@ def _map_block_nosplit(
*fn_args,
**fn_kwargs,
) -> Tuple[Block, BlockMetadata]:
gc.collect()
process = psutil.Process(os.getpid())
start_uss = int(process.memory_full_info().uss)
start_rss = int(process.memory_full_info().rss)
stats = BlockExecStats.builder()
builder = DelegatingBlockBuilder()
if fn is not None:
@ -458,22 +450,6 @@ def _map_block_nosplit(
builder.add_block(new_block)
new_block = builder.build()
accessor = BlockAccessor.for_block(new_block)
end_uss = int(process.memory_full_info().uss)
end_rss = int(process.memory_full_info().rss)
uss_delta = end_uss - start_uss
available = psutil.virtual_memory().available
print(f'delta {uss_delta} end {end_uss} end-rss {end_rss} av {available} pid {os.getpid()}')
# if block_fn.__name__ not in blockfn_stats:
# blockfn_stats[block_fn.__name__] = {}
# blockfn_stats[block_fn.__name__]["uss"] = []
# blockfn_stats[block_fn.__name__]["rss"] = []
# blockfn_stats[block_fn.__name__]["uss_delta"] = []
# blockfn_stats[block_fn.__name__]["uss"].append(end_uss)
# blockfn_stats[block_fn.__name__]["rss"].append(end_rss)
# blockfn_stats[block_fn.__name__]["uss_delta"].append(uss_delta)
return new_block, accessor.get_metadata(
input_files=input_files, exec_stats=stats.build()
)

View file

@ -162,8 +162,7 @@ class BlockExecStats:
# Max memory usage. May be an overestimate since we do not
# differentiate from previous tasks on the same worker.
self.max_rss_bytes: int = 0
self.end_uss_bytes: int = 0
@staticmethod
def builder() -> "_BlockExecStatsBuilder":
return _BlockExecStatsBuilder()
@ -174,8 +173,6 @@ class BlockExecStats:
"wall_time_s": self.wall_time_s,
"cpu_time_s": self.cpu_time_s,
"node_id": self.node_id,
"max_rss_bytes": self.max_rss_bytes,
"end_uss_bytes": self.end_uss_bytes,
}
)
@ -195,17 +192,16 @@ class _BlockExecStatsBuilder:
stats = BlockExecStats()
stats.wall_time_s = time.perf_counter() - self.start_time
stats.cpu_time_s = time.process_time() - self.start_cpu
process = psutil.Process(os.getpid())
if resource is None:
# NOTE(swang): resource package is not supported on Windows. This
# is only the memory usage at the end of the task, not the peak
# memory.
process = psutil.Process(os.getpid())
stats.max_rss_bytes = int(process.memory_info().rss)
else:
stats.max_rss_bytes = int(
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1e3
)
stats.end_uss_bytes = int(process.memory_full_info().uss)
return stats

View file

@ -163,7 +163,6 @@ class DatasetContext:
_default_context.block_splitting_enabled
and _default_context.block_owner is None
):
print("block_splitting_enabled true")
owner = _DesignatedBlockOwner.options(
scheduling_strategy=_default_context.scheduling_strategy
).remote()

View file

@ -1837,26 +1837,6 @@ class Dataset(Generic[T]):
return None
return sum(m.size_bytes for m in metadata)
def max_rss(self) -> int:
"""Return the max rss used across all blocks.
Time complexity: O(1)
Returns:
"""
metadata = self._plan.execute().get_metadata()
return max(m.exec_stats.max_rss_bytes if hasattr(m.exec_stats, 'max_rss_bytes') else 0 for m in metadata)
def max_uss(self) -> int:
"""Return the max uss used across all blocks.
Time complexity: O(1)
Returns:
"""
metadata = self._plan.execute().get_metadata()
return max(m.exec_stats.end_uss_bytes if hasattr(m.exec_stats, 'end_uss_bytes') else 0 for m in metadata)
def input_files(self) -> List[str]:
"""Return the list of input files for the dataset.

View file

@ -274,8 +274,6 @@ def read_datasource(
_wrap_and_register_arrow_serialization_workaround(read_args),
)
)
logger.warning(f'req parallel {requested_parallelism} {min_safe_parallelism} {len(read_tasks)}')
if read_tasks and len(read_tasks) < min_safe_parallelism * 0.7:
perc = 1 + round((min_safe_parallelism - len(read_tasks)) / len(read_tasks), 1)

View file

@ -1,4 +1,4 @@
// Copyright 2020 The Ray Authors.
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View file

@ -983,6 +983,7 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
int64_t now = get_time_();
idle_of_all_languages_.emplace_back(worker, now);
idle_of_all_languages_map_[worker] = now;
TryKillingIdleWorkers();
}
}

View file

@ -1,119 +0,0 @@
import traceback
import numpy as np
import pandas as pd
from io import BytesIO
from PIL import Image
import ray
from ray.air.util.tensor_extensions.pandas import TensorArray
from ray.train.torch import to_air_checkpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor
import time
from sqlalchemy import values
from torchvision import transforms
from torchvision.models import resnet18
def convert_to_pandas(byte_item_list):
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
images = [Image.open(BytesIO(byte_item)).convert('RGB') for byte_item in byte_item_list]
images = [preprocess(image) for image in images]
images = [np.array(image) for image in images]
return pd.DataFrame({"image": TensorArray(images)})
node_high_memory_usage_fraction = 0.95
node_high_memory_monitor_min_interval_ms = 100
params_use_oom_killer = [False, True]
params_num_cpus = [2,4,6,8,10]
params_num_blocks = [25, 50, 100, 200]
params_infinite_retries = [False, True]
params_object_store_memory = [10e9]
use_oom_killer = False
infinite_retries = True
num_cpus = 5
num_blocks = 400
object_store_memory = 10e9
reuse_worker = True
system_config = {}
if use_oom_killer:
system_config["node_high_memory_usage_fraction"] = node_high_memory_usage_fraction
system_config["node_high_memory_monitor_min_interval_ms"] = node_high_memory_monitor_min_interval_ms
if not reuse_worker:
system_config["idle_worker_killing_time_threshold_ms"] = 0
system_config["num_workers_soft_limit"] = 0
system_config["enable_worker_prestart"] = False
if len(system_config) > 0:
ray.init(
num_cpus=num_cpus,
object_store_memory=object_store_memory,
_metrics_export_port=8080,
_system_config=system_config
)
else:
ray.init(
num_cpus=num_cpus,
object_store_memory=object_store_memory,
_metrics_export_port=8080,
)
print(f'running for use oom? {use_oom_killer} num_cpus {num_cpus} blocks {num_blocks} inf retries {infinite_retries}')
for i in range(1):
start_time = time.time()
if infinite_retries:
dataset = ray.data.read_binary_files(
paths=["/home/ray/raydev_mnt/air-cuj-imagenet-20gb/"],
parallelism=num_blocks,
ray_remote_args={'max_retries': -1})
else:
dataset = ray.data.read_binary_files(
paths=["/home/ray/raydev_mnt/air-cuj-imagenet-20gb/"],
parallelism=num_blocks)
end_time = time.time()
print(f"The execution time of read_binary_files is: {end_time-start_time}")
# start_time = time.time()
# dataset = dataset.limit(int(dataset.count() / 4))
# end_time = time.time()
# print(dataset.is_fully_executed())
# print(f"The execution time of limit is: {end_time-start_time}")
start_time = time.time()
try:
if infinite_retries:
dataset = dataset.map_batches(convert_to_pandas, max_retries = -1)
else:
dataset = dataset.map_batches(convert_to_pandas)
except Exception as e:
print(e)
print(traceback.format_exc())
end_time = time.time()
print(f"The execution time is: {end_time-start_time}")
print(f"max task rss {dataset.max_rss()} uss {dataset.max_uss()}")
time.sleep(1000)
# start_time = time.time()
# try:
# dataset = dataset.map_batches(convert_to_pandas)
# except Exception as e:
# print(e)
# print(traceback.format_exc())
# end_time = time.time()
# print(f"The execution time of map_batches is: {end_time-start_time}")
# model = resnet18(pretrained=True)
# ckpt = to_air_checkpoint(model=model)
# predictor = BatchPredictor.from_checkpoint(ckpt, TorchPredictor)
# predictor.predict(dataset, num_gpus_per_worker=1)