Merge pull request #91 from amplab/datasets

Loading imagenet
This commit is contained in:
Robert Nishihara 2016-06-10 22:42:52 -07:00 committed by GitHub
commit 89ba954894
16 changed files with 209 additions and 6 deletions

View file

@ -15,3 +15,4 @@ script:
- cd test
- python runtest.py
- python arrays_test.py
- python datasets_test.py

View file

@ -3,6 +3,11 @@
Ray is an experimental distributed execution framework with a Python-like
programming model. It is under development and not ready for general use.
## Example Code
### Loading ImageNet
TODO: fill this out.
## Design Decisions
For a description of our design decisions, see

5
data/README.md Normal file
View file

@ -0,0 +1,5 @@
# Data for Ray
This folder contains data neccessary to run tests, etc. Only very small amounts
of data should be stored here and if a loader for a large dataset is tested, a
miniature version of this dataset should be created.

BIN
data/mini.tar Normal file

Binary file not shown.

View file

@ -0,0 +1,38 @@
import argparse
import boto3
import os
import numpy as np
import ray
import ray.services as services
import ray.datasets.imagenet as imagenet
import functions
parser = argparse.ArgumentParser(description="Parse information for data loading.")
parser.add_argument("--s3-bucket", type=str, help="Name of the bucket that contains the image data.")
parser.add_argument("--key-prefix", default="ILSVRC2012_img_train/n015", type=str, help="Prefix for files to fetch.")
parser.add_argument("--drop-ipython", default=False, type=bool, help="Drop into IPython at the end?")
if __name__ == "__main__":
args = parser.parse_args()
test_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "worker.py")
services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=5, worker_path=test_path)
s3 = boto3.resource("s3")
imagenet_bucket = s3.Bucket(args.s3_bucket)
objects = imagenet_bucket.objects.filter(Prefix=args.key_prefix)
images = [obj.key for obj in objects.all()]
x = imagenet.load_tarfiles_from_s3(args.s3_bucket, map(str, images), [256, 256]) # TODO(pcm): implement unicode serialization
mean_image = functions.compute_mean_image(x)
mean_image = ray.pull(mean_image)
print "The mean image is:"
print mean_image
if args.drop_ipython:
import IPython
IPython.embed()
services.cleanup()

View file

@ -0,0 +1,18 @@
import numpy as np
from typing import List
import ray
import ray.arrays.remote as ra
@ray.remote([List[ray.ObjRef]], [int])
def num_images(batches):
shape_refs = [ra.shape(batch) for batch in batches]
return sum([ray.pull(shape_ref)[0] for shape_ref in shape_refs])
@ray.remote([List[ray.ObjRef]], [np.ndarray])
def compute_mean_image(batches):
if len(batches) == 0:
raise Exception("No images were passed into `compute_mean_image`.")
sum_image_refs = [ra.sum(batch, axis=0) for batch in batches]
sum_images = [ray.pull(ref) for ref in sum_image_refs]
n_images = num_images(batches)
return np.sum(sum_images, axis=0).astype("float64") / ray.pull(n_images)

View file

@ -0,0 +1,33 @@
import sys
import argparse
import numpy as np
import ray.datasets.imagenet
import ray
import ray.services as services
import ray.worker as worker
import ray.arrays.remote as ra
import ray.arrays.distributed as da
import functions
parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.")
parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address")
parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address")
parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address")
if __name__ == "__main__":
args = parser.parse_args()
worker.connect(args.scheduler_address, args.objstore_address, args.worker_address)
ray.register_module(ray.datasets.imagenet)
ray.register_module(functions)
ray.register_module(ra)
ray.register_module(ra.random)
ray.register_module(ra.linalg)
ray.register_module(da)
ray.register_module(da.random)
ray.register_module(da.linalg)
worker.main_loop()

View file

@ -1,3 +1,4 @@
import libraylib as lib
import serialization
from worker import scheduler_info, register_module, connect, disconnect, pull, push, remote
from libraylib import ObjRef

View file

@ -176,7 +176,7 @@ def qr(a):
for r in range(i, a.num_blocks[0]):
y_ri = y_val.objrefs[r - i, 0]
W_rcs.append(qr_helper2(y_ri, a_work.objrefs[r, c]))
W_c = ra.sum(0, *W_rcs)
W_c = ra.linalg.sum_list(*W_rcs)
for r in range(i, a.num_blocks[0]):
y_ri = y_val.objrefs[r - i, 0]
A_rc = qr_helper1(a_work.objrefs[r, c], y_ri, t, W_c)

View file

@ -68,9 +68,9 @@ def add(x1, x2):
def subtract(x1, x2):
return np.subtract(x1, x2)
@ray.remote([int, np.ndarray], [np.ndarray])
def sum(axis, *xs):
return np.sum(xs, axis=axis)
@ray.remote([np.ndarray, int], [np.ndarray])
def sum(x, axis=-1):
return np.sum(x, axis=axis if axis != -1 else None)
@ray.remote([np.ndarray], [tuple])
def shape(a):

View file

@ -86,3 +86,7 @@ def matrix_rank(M):
@ray.remote([np.ndarray], [np.ndarray])
def multi_dot(*a):
raise NotImplementedError
@ray.remote([np.ndarray], [np.ndarray])
def sum_list(*xs):
return np.sum(xs, axis=0)

View file

@ -0,0 +1,71 @@
import tarfile, io
from typing import List
import PIL.Image
import numpy as np
import boto3
import ray
s3 = boto3.client("s3")
def load_chunk(tarfile, size=None):
"""Load a number of images from a single imagenet .tar file.
This function also converts the image from grayscale to RGB if neccessary.
Args:
tarfile (tarfile.TarFile): The archive from which the files get loaded.
size (Optional[Tuple[int, int]]): Resize the image to this size if provided.
Returns:
numpy.ndarray: Contains the image data in format [batch, w, h, c]
"""
result = []
for member in tarfile.getmembers():
filename = member.path
content = tarfile.extractfile(member)
img = PIL.Image.open(content)
rgbimg = PIL.Image.new("RGB", img.size)
rgbimg.paste(img)
if size != None:
rgbimg = rgbimg.resize(size, PIL.Image.ANTIALIAS)
result.append(np.array(rgbimg).reshape(1, rgbimg.size[0], rgbimg.size[1], 3))
return np.concatenate(result)
@ray.remote([str, str, List[int]], [np.ndarray])
def load_tarfile_from_s3(bucket, s3_key, size=[]):
"""Load an imagenet .tar file.
Args:
bucket (str): Bucket holding the imagenet .tar.
s3_key (str): s3 key from which the .tar file is loaded.
size (List[int]): Resize the image to this size if size != []; len(size) == 2 required.
Returns:
np.ndarray: The image data (see load_chunk).
"""
response = s3.get_object(Bucket=bucket, Key=s3_key)
output = io.BytesIO()
chunk = response["Body"].read(1024 * 8)
while chunk:
output.write(chunk)
chunk = response["Body"].read(1024 * 8)
output.seek(0) # go to the beginning of the .tar file
tar = tarfile.open(mode= "r", fileobj=output)
return load_chunk(tar, size=size if size != [] else None)
@ray.remote([str, List[str], List[int]], [List[ray.ObjRef]])
def load_tarfiles_from_s3(bucket, s3_keys, size=[]):
"""Load a number of imagenet .tar files.
Args:
bucket (str): Bucket holding the imagenet .tars.
s3_keys (List[str]): List of s3 keys from which the .tar files are being loaded.
size (List[int]): Resize the image to this size if size != []; len(size) == 2 required.
Returns:
np.ndarray: Contains object references to the chunks of the images (see load_chunk).
"""
return [load_tarfile_from_s3(bucket, s3_key, size) for s3_key in s3_keys]

View file

@ -1,3 +1,6 @@
typing
funcsigs
subprocess32
boto3
botocore
Pillow

View file

@ -28,9 +28,9 @@ fi
if [[ $platform == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y git cmake build-essential autoconf libtool python-dev python-numpy python-pip libboost-all-dev unzip
sudo apt-get install -y git cmake build-essential autoconf libtool python-dev python-numpy python-pip libboost-all-dev unzip libjpeg8-dev
elif [[ $platform == "macosx" ]]; then
brew install git cmake autoconf libtool boost
brew install git cmake autoconf libtool boost libjpeg
sudo easy_install pip
sudo pip install numpy
fi

23
test/datasets_test.py Normal file
View file

@ -0,0 +1,23 @@
import os
import tarfile
import unittest
import ray
import ray.services as services
import ray.datasets.imagenet as imagenet
class ImageNetTest(unittest.TestCase):
def testImageNetLoading(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "test_worker.py")
services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=5, worker_path=test_path)
chunk_name = os.path.join(test_dir, "..", "data", "mini.tar")
tar = tarfile.open(chunk_name, mode= "r")
chunk = imagenet.load_chunk(tar, size=(256, 256))
self.assertEqual(chunk.shape, (2, 256, 256, 3))
services.cleanup()
if __name__ == "__main__":
unittest.main()

View file

@ -5,6 +5,7 @@ import numpy as np
import test_functions
import ray.arrays.remote as ra
import ray.arrays.distributed as da
import ray.datasets.imagenet
import ray
import ray.services as services