[Dataset] imagenet nightly test (#17069)

Alex Wu 2021-07-16 14:15:49 -07:00 committed by GitHub
parent c27f43d9b8
commit 93c16346bf
8 changed files with 257 additions and 2 deletions


@@ -1,10 +1,14 @@
-from typing import Any, Union, Optional, Tuple, TYPE_CHECKING
+import logging
+from typing import Any, List, Union, Optional, Tuple, TYPE_CHECKING
+import urllib

 from ray.experimental.data.datasource import _S3FileSystemWrapper

 if TYPE_CHECKING:
     import pyarrow

+logger = logging.getLogger(__name__)
+

 def read_file(path: str,
               include_paths: bool = False,
@@ -21,9 +25,66 @@ def read_file(path: str,
             filesystem = filesystem.unwrap()
         contents = filesystem.open_input_stream(path).readall()
     else:
-        contents = open(path, "rb").read()
+        parsed = urllib.parse.urlparse(path)
+        if parsed.scheme == "s3":
+            contents = download_single_s3_file(parsed.netloc,
+                                                parsed.path.strip("/"))
+        else:
+            contents = open(path, "rb").read()
     if include_paths:
         return path, contents
     else:
         return contents
+
+
+def list_objects(path: str) -> List[str]:
+    parsed = urllib.parse.urlparse(path)
+    if parsed.scheme != "s3":
+        # TODO (Alex): Implement this after porting it to the datasource.
+        raise NotImplementedError(
+            "Binary files can only be enumerated for S3 buckets.")
+
+    # S3 specific dependencies should only be imported if we're downloading a
+    # file from s3.
+    import boto3
+    path = (parsed.netloc + parsed.path).strip("/")
+    split = path.split("/", 1)
+    bucket = split[0]
+    key = split[1] if len(split) > 1 else ""
+    s3 = boto3.resource("s3")
+    s3_objects = s3.Bucket(bucket).objects.filter(Prefix=key).all()
+    paths = [f"s3://{obj.bucket_name}/{obj.key}" for obj in s3_objects]
+    return paths
+
+
+download_initialized = False
+http_session = None
+
+
+def download_single_s3_file(bucket: str, key: str) -> bytes:
+    import requests
+    global download_initialized, http_session
+    if not download_initialized:
+        http_session = requests.Session()
+        download_initialized = True
+
+    url = f"https://{bucket}.s3.amazonaws.com/{key}"
+    # Retry download if it fails.
+    for _ in range(3):
+        result = http_session.get(url)
+        if result.status_code == 200:
+            break
+        logger.warning(
+            f"Failed to download {url} with error: {result.content}. Retrying."
+        )
+    else:
+        raise ValueError(
+            f"({result.status_code}) {url} is not a valid s3 url. "
+            f"{result.content}")
+
+    return result.content
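Together, list_objects and download_single_s3_file let read_file enumerate and fetch S3 objects without a pyarrow filesystem: the prefix is listed with boto3 and each object is fetched over plain HTTPS through a shared requests session. A minimal sketch of how the two helpers compose, assuming AWS credentials are available for the listing step; the bucket and prefix are placeholders, not values used by this change:

    # Hypothetical usage of the helpers added above; bucket/prefix are placeholders.
    paths = list_objects("s3://some-public-bucket/some-prefix/")
    parsed = urllib.parse.urlparse(paths[0])
    blob = download_single_s3_file(parsed.netloc, parsed.path.strip("/"))
    assert isinstance(blob, bytes)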


@@ -321,9 +321,13 @@ def read_binary_files(
     """
+    import pyarrow as pa
+
+    if isinstance(paths, str):
+        paths = _reader.list_objects(paths)
     dataset = from_items(paths, parallelism=parallelism)
+    if isinstance(filesystem, pa.fs.S3FileSystem):
+        filesystem = _S3FileSystemWrapper(filesystem)
     return dataset.map(
         lambda path: _reader.read_file(
             path,
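The hunk above adds two behaviors to read_binary_files: a plain string path is expanded into individual object paths via _reader.list_objects, and an explicit pyarrow S3FileSystem is wrapped in _S3FileSystemWrapper so it can be serialized to the workers that run read_file. A rough sketch of both call patterns; the bucket names, keys, and region below are placeholders, not part of this change:

    import pyarrow as pa
    import ray

    ray.init()

    # 1) Plain string prefix: objects are enumerated via _reader.list_objects().
    ds = ray.experimental.data.read_binary_files("s3://some-bucket/some-prefix/")

    # 2) Explicit paths plus a pyarrow S3FileSystem; the filesystem is wrapped in
    #    _S3FileSystemWrapper so it can be pickled and shipped to remote tasks.
    fs = pa.fs.S3FileSystem(region="us-west-2")
    ds = ray.experimental.data.read_binary_files(
        ["some-bucket/some-prefix/0.dat"], filesystem=fs)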


@@ -1,5 +1,6 @@
 import os
 import random
+import requests
 import shutil
 import time
@@ -409,6 +410,16 @@ def test_read_binary_files_with_fs(ray_start_regular_shared):
         assert expected == item


+def test_read_binary_files_s3(ray_start_regular_shared):
+    ds = ray.experimental.data.read_binary_files(
+        ["s3://anyscale-data/small-files/0.dat"])
+    item = ds.take(1).pop()
+    expected = requests.get(
+        "https://anyscale-data.s3.us-west-2.amazonaws.com/small-files/0.dat"
+    ).content
+    assert item == expected
+
+
 def test_iter_batches_basic(ray_start_regular_shared):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]})
     df2 = pd.DataFrame({"one": [4, 5, 6], "two": [5, 6, 7]})


@@ -73,6 +73,9 @@ CORE_NIGHTLY_TESTS = {
         "single_node",
         "object_store",
     ],
+    "~/ray/release/nightly_tests/dataset/dataset_test.yaml": [
+        "inference",
+    ]
 }

 NIGHTLY_TESTS = {


@@ -0,0 +1,12 @@
base_image: "anyscale/ray-ml:pinned-nightly-py37-gpu"
env_vars: {}

python:
  pip_packages: []
  conda_packages: []

post_build_cmds:
  - pip uninstall -y numpy ray || true
  - sudo rm -rf /home/ray/anaconda3/lib/python3.7/site-packages/numpy
  - pip3 install boto3 pyarrow tqdm
  - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}


@@ -0,0 +1,14 @@
- name: inference
  owner:
    mail: "core@anyscale.com"
    slack: "@Alex Wu"

  cluster:
    app_config: app_config.yaml
    compute_template: inference.yaml

  run:
    timeout: 600
    prepare: sleep 0
    script: python inference.py


@ -0,0 +1,118 @@
from io import BytesIO
from PIL import Image
import torch
from torchvision import transforms
from torchvision.models import resnet50
import ray
import boto3
import json
import time
import os
from tqdm import tqdm
import numpy as np
class Preprocessor:
def __init__(self):
self.torch_transform = transforms.Compose([
transforms.Resize(224),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Lambda(lambda t: t[:3, ...]), # remove alpha channel
transforms.Normalize(
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def __call__(self, img_bytes):
try:
img = Image.open(BytesIO(img_bytes)).convert("RGB")
tensor = self.torch_transform(img)
return tensor
except Exception as e:
raise e
class ImageModel:
def __init__(self):
self.model = resnet50(pretrained=True).eval().half().cuda()
def __call__(self, input_tensor_np):
input_tensor = torch.from_numpy(input_tensor_np).half().cuda()
with torch.no_grad():
output_tensor = self.model(input_tensor)
result = torch.argmax(output_tensor, dim=1).cpu()
return result.numpy()
def get_paths(bucket, path, max_files=100 * 1000):
s3 = boto3.resource("s3")
s3_objects = s3.Bucket(bucket).objects.filter(
Prefix=path).limit(max_files).all()
materialized = [(obj.bucket_name, obj.key) for obj in tqdm(s3_objects)]
return materialized
def preprocess(batch):
preprocessor = Preprocessor()
return preprocessor(batch)
infer_initialized = False
model_fn = None
def infer(batch):
global infer_initialized, model_fn
if not infer_initialized:
infer_initialized = True
model_fn = ImageModel()
ndarr_obj = batch.values
input_tensor_np = np.array([img.numpy() for img in ndarr_obj.reshape(-1)])
return list(model_fn(input_tensor_np))
ray.init()
while ray.cluster_resources().get("GPU", 0) != 2:
    print("Waiting for GPUs {}/2".format(
        ray.cluster_resources().get("GPU", 0)))
    time.sleep(5)
start_time = time.time()

print("Downloading...")
ds = ray.experimental.data.read_binary_files(
    "s3://anyscale-data/small-images/", parallelism=400)
ds = ds.limit(100 * 1000)
end_download_time = time.time()

print("Preprocessing...")
ds = ds.map(preprocess)
end_preprocess_time = time.time()

print("Inferring...")
ds = ds.map_batches(infer, num_gpus=0.25)
end_time = time.time()

download_time = end_download_time - start_time
preprocess_time = end_preprocess_time - end_download_time
infer_time = end_time - end_preprocess_time
total_time = end_time - start_time

print("Download time", download_time)
print("Preprocess time", preprocess_time)
print("Infer time", infer_time)
print("Total time", total_time)
if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
results = {
"download_time": download_time,
"preprocess_time": preprocess_time,
"inference_time": infer_time,
"total_time": total_time,
}
json.dump(results, out_file)
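The release harness presumably reads these metrics back from the file named by TEST_OUTPUT_JSON. A minimal sketch of consuming that output, assuming only the keys written above; the fallback path is a placeholder:

    # Read back the metrics emitted by the script; keys match the dict above.
    import json
    import os

    with open(os.environ.get("TEST_OUTPUT_JSON", "/tmp/release_test_out.json")) as f:
        results = json.load(f)
    print(results["download_time"], results["inference_time"], results["total_time"])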


@ -0,0 +1,32 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2
max_workers: 999
aws:
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 500
head_node_type:
name: head_node
instance_type: g4dn.16xlarge
worker_node_types:
- name: worker_node
instance_type: m5.8xlarge
min_workers: 0
max_workers: 0
use_spot: false
resources:
cpu: 32
- name: gpu_node
instance_type: g4dn.16xlarge
min_workers: 1
max_workers: 1
use_spot: false
resources:
cpu: 64
gpu: 1
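A back-of-the-envelope check on how this compute template lines up with inference.py (an estimate, not something the test asserts): the head node and the single gpu_node worker are both g4dn.16xlarge, which is why the script waits for exactly 2 GPUs, and with num_gpus=0.25 per map_batches task Ray can schedule up to 4 inference tasks per GPU.

    # Rough concurrency estimate implied by the config above (not part of the test).
    gpus = 2               # head g4dn.16xlarge + one gpu_node worker
    gpus_per_task = 0.25   # num_gpus passed to ds.map_batches(infer, ...)
    max_concurrent_infer_tasks = int(gpus / gpus_per_task)  # -> 8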