ray/test/runtest.py

440 lines
16 KiB
Python
Raw Normal View History

2016-02-22 13:55:06 -08:00
import unittest
2016-06-10 14:12:15 -07:00
import ray
import ray.serialization as serialization
import ray.services as services
import ray.worker as worker
2016-02-22 13:55:06 -08:00
import numpy as np
import time
2016-03-01 01:02:08 -08:00
import subprocess32 as subprocess
import os
import test_functions
2016-06-27 11:35:31 -07:00
import ray.array.remote as ra
import ray.array.distributed as da
RAY_TEST_OBJECTS = [[1, "hello", 3.0], 42, "hello world", 42.0,
(1.0, "hi"), None, (None, None), ("hello", None),
True, False, (True, False),
{True: "hello", False: "world"},
{"hello" : "world", 1: 42, 1.0: 45}, {},
np.int8(3), np.int32(4), np.int64(5),
np.uint8(3), np.uint32(4), np.uint64(5),
np.float32(1.0), np.float64(1.0)]
class UserDefinedType(object):
def __init__(self):
pass
def deserialize(self, primitives):
return "user defined type"
def serialize(self):
return "user defined type"
2016-03-10 12:35:31 -08:00
class SerializationTest(unittest.TestCase):
2016-06-27 11:57:22 -07:00
def roundTripTest(self, data):
serialized, _ = serialization.serialize(ray.worker.global_worker.handle, data)
result = serialization.deserialize(ray.worker.global_worker.handle, serialized)
2016-03-10 12:35:31 -08:00
self.assertEqual(data, result)
2016-06-27 11:57:22 -07:00
def numpyTypeTest(self, typ):
a = np.random.randint(0, 10, size=(100, 100)).astype(typ)
2016-06-27 11:57:22 -07:00
b, _ = serialization.serialize(ray.worker.global_worker.handle, a)
c = serialization.deserialize(ray.worker.global_worker.handle, b)
self.assertTrue((a == c).all())
a = np.array(0).astype(typ)
2016-06-27 11:57:22 -07:00
b, _ = serialization.serialize(ray.worker.global_worker.handle, a)
c = serialization.deserialize(ray.worker.global_worker.handle, b)
self.assertTrue((a == c).all())
a = np.empty((0,)).astype(typ)
2016-06-27 11:57:22 -07:00
b, _ = serialization.serialize(ray.worker.global_worker.handle, a)
c = serialization.deserialize(ray.worker.global_worker.handle, b)
self.assertTrue(a.dtype == c.dtype)
2016-03-10 12:35:31 -08:00
def testSerialize(self):
2016-06-27 11:57:22 -07:00
services.start_ray_local()
2016-03-10 12:35:31 -08:00
for val in RAY_TEST_OBJECTS:
2016-06-27 11:57:22 -07:00
self.roundTripTest(val)
2016-03-24 23:35:38 -07:00
2016-03-10 12:35:31 -08:00
a = np.zeros((100, 100))
2016-06-27 11:57:22 -07:00
res, _ = serialization.serialize(ray.worker.global_worker.handle, a)
b = serialization.deserialize(ray.worker.global_worker.handle, res)
2016-03-10 12:35:31 -08:00
self.assertTrue((a == b).all())
2016-03-08 16:14:02 -08:00
2016-06-27 11:57:22 -07:00
self.numpyTypeTest("int8")
self.numpyTypeTest("uint8")
self.numpyTypeTest("int16")
self.numpyTypeTest("uint16")
self.numpyTypeTest("int32")
self.numpyTypeTest("uint32")
self.numpyTypeTest("float32")
self.numpyTypeTest("float64")
2016-06-27 11:57:22 -07:00
ref0 = ray.put(0)
ref1 = ray.put(0)
ref2 = ray.put(0)
ref3 = ray.put(0)
a = np.array([[ref0, ref1], [ref2, ref3]])
2016-06-27 11:57:22 -07:00
capsule, _ = serialization.serialize(ray.worker.global_worker.handle, a)
result = serialization.deserialize(ray.worker.global_worker.handle, capsule)
self.assertTrue((a == result).all())
2016-06-27 11:57:22 -07:00
self.roundTripTest(ref0)
self.roundTripTest([ref0, ref1, ref2, ref3])
self.roundTripTest({"0": ref0, "1": ref1, "2": ref2, "3": ref3})
self.roundTripTest((ref0, 1))
services.cleanup()
2016-03-12 15:25:45 -08:00
2016-02-22 13:55:06 -08:00
class ObjStoreTest(unittest.TestCase):
# Test setting up object stores, transfering data between them and retrieving data to a client
2016-02-22 13:55:06 -08:00
def testObjStore(self):
2016-06-27 11:57:22 -07:00
[w1, w2] = services.start_services_local(return_drivers=True, num_objstores=2, num_workers_per_objstore=0)
2016-02-22 13:55:06 -08:00
2016-06-23 12:58:48 -07:00
# putting and getting an object shouldn't change it
2016-03-09 11:40:36 -08:00
for data in ["h", "h" * 10000, 0, 0.0]:
2016-06-23 12:58:48 -07:00
objref = ray.put(data, w1)
result = ray.get(objref, w1)
2016-03-09 11:40:36 -08:00
self.assertEqual(result, data)
2016-06-23 12:58:48 -07:00
# putting an object, shipping it to another worker, and getting it shouldn't change it
for data in ["h", "h" * 10000, 0, 0.0, [1, 2, 3, "a", (1, 2)], ("a", ("b", 3))]:
2016-06-23 12:58:48 -07:00
objref = worker.put(data, w1)
result = worker.get(objref, w2)
self.assertEqual(result, data)
2016-06-23 12:58:48 -07:00
# putting an array, shipping it to another worker, and getting it shouldn't change it
for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25])]:
2016-06-23 12:58:48 -07:00
objref = worker.put(data, w1)
result = worker.get(objref, w2)
self.assertTrue(np.alltrue(result == data))
"""
2016-06-23 12:58:48 -07:00
# getting multiple times shouldn't matter
for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]:
2016-06-23 12:58:48 -07:00
objref = worker.put(data, w1)
result = worker.get(objref, w2)
result = worker.get(objref, w2)
result = worker.get(objref, w2)
self.assertTrue(np.alltrue(result == data))
"""
# shipping a numpy array inside something else should be fine
data = ("a", np.random.normal(size=[10, 10]))
2016-06-23 12:58:48 -07:00
objref = worker.put(data, w1)
result = worker.get(objref, w2)
self.assertTrue(data[0] == result[0])
self.assertTrue(np.alltrue(data[1] == result[1]))
# shipping a numpy array inside something else should be fine
data = ["a", np.random.normal(size=[10, 10])]
2016-06-23 12:58:48 -07:00
objref = worker.put(data, w1)
result = worker.get(objref, w2)
self.assertTrue(data[0] == result[0])
self.assertTrue(np.alltrue(data[1] == result[1]))
2016-02-22 13:55:06 -08:00
2016-03-01 01:02:08 -08:00
services.cleanup()
2016-03-10 14:40:46 -08:00
class WorkerTest(unittest.TestCase):
2016-06-23 12:58:48 -07:00
def testPutGet(self):
2016-06-27 11:57:22 -07:00
services.start_ray_local()
2016-03-10 14:40:46 -08:00
for i in range(100):
value_before = i * 10 ** 6
2016-06-27 11:57:22 -07:00
objref = ray.put(value_before)
value_after = ray.get(objref)
2016-03-10 14:40:46 -08:00
self.assertEqual(value_before, value_after)
for i in range(100):
value_before = i * 10 ** 6 * 1.0
2016-06-27 11:57:22 -07:00
objref = ray.put(value_before)
value_after = ray.get(objref)
2016-03-10 14:40:46 -08:00
self.assertEqual(value_before, value_after)
for i in range(100):
value_before = "h" * i
2016-06-27 11:57:22 -07:00
objref = ray.put(value_before)
value_after = ray.get(objref)
2016-03-10 14:40:46 -08:00
self.assertEqual(value_before, value_after)
for i in range(100):
value_before = [1] * i
2016-06-27 11:57:22 -07:00
objref = ray.put(value_before)
value_after = ray.get(objref)
2016-03-10 14:40:46 -08:00
self.assertEqual(value_before, value_after)
services.cleanup()
2016-04-05 00:34:23 -07:00
class APITest(unittest.TestCase):
def testObjRefAliasing(self):
2016-06-19 22:01:31 -07:00
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=3, worker_path=worker_path)
2016-06-27 11:57:22 -07:00
ref = test_functions.test_alias_f()
self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5])))
ref = test_functions.test_alias_g()
self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5])))
ref = test_functions.test_alias_h()
self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5])))
2016-04-05 00:34:23 -07:00
services.cleanup()
2016-06-03 00:10:17 -07:00
def testKeywordArgs(self):
2016-06-19 22:01:31 -07:00
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=1, worker_path=worker_path)
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct1(1)
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "1 hello")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct1(1, "hi")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "1 hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct1(1, b="world")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "1 world")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct2(a="w", b="hi")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "w hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct2(b="hi", a="w")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "w hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct2(a="w")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "w world")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct2(b="hi")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "hello hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct2("w")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "w world")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct2("w", "hi")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "w hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct3(0, 1, c="w", d="hi")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "0 1 w hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct3(0, 1, d="hi", c="w")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "0 1 w hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct3(0, 1, c="w")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "0 1 w world")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct3(0, 1, d="hi")
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "0 1 hello hi")
2016-06-03 00:10:17 -07:00
x = test_functions.keyword_fct3(0, 1)
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "0 1 hello world")
2016-06-03 00:10:17 -07:00
services.cleanup()
def testVariableNumberOfArgs(self):
2016-06-19 22:01:31 -07:00
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=1, worker_path=worker_path)
x = test_functions.varargs_fct1(0, 1, 2)
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "0 1 2")
x = test_functions.varargs_fct2(0, 1, 2)
2016-06-23 12:58:48 -07:00
self.assertEqual(ray.get(x), "1 2")
self.assertTrue(test_functions.kwargs_exception_thrown)
self.assertTrue(test_functions.varargs_and_kwargs_exception_thrown)
services.cleanup()
def testNoArgs(self):
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
services.start_ray_local(num_workers=1, worker_path=worker_path, driver_mode=ray.WORKER_MODE)
test_functions.no_op()
time.sleep(0.2)
task_info = ray.task_info()
self.assertEqual(len(task_info["failed_tasks"]), 0)
self.assertEqual(len(task_info["running_tasks"]), 0)
self.assertEqual(task_info["num_succeeded"], 1)
test_functions.no_op_fail()
time.sleep(0.2)
task_info = ray.task_info()
self.assertEqual(len(task_info["failed_tasks"]), 1)
self.assertEqual(len(task_info["running_tasks"]), 0)
self.assertEqual(task_info["num_succeeded"], 1)
self.assertEqual(task_info["failed_tasks"][0].get("error_message"), "The @remote decorator for function test_functions.no_op_fail has 0 return values, but test_functions.no_op_fail returned more than 0 values.")
services.cleanup()
def testTypeChecking(self):
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
services.start_ray_local(num_workers=1, worker_path=worker_path, driver_mode=ray.WORKER_MODE)
# Make sure that these functions throw exceptions because there return
# values do not type check.
test_functions.test_return1()
test_functions.test_return2()
time.sleep(0.2)
task_info = ray.task_info()
self.assertEqual(len(task_info["failed_tasks"]), 2)
self.assertEqual(len(task_info["running_tasks"]), 0)
self.assertEqual(task_info["num_succeeded"], 0)
services.cleanup()
2016-06-18 01:01:48 +03:00
class TaskStatusTest(unittest.TestCase):
def testFailedTask(self):
2016-06-19 22:01:31 -07:00
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=3, worker_path=worker_path, driver_mode=ray.WORKER_MODE)
2016-06-18 01:01:48 +03:00
test_functions.test_alias_f()
2016-06-25 09:43:57 -07:00
test_functions.throw_exception_fct1()
test_functions.throw_exception_fct1()
2016-06-18 01:01:48 +03:00
time.sleep(1)
result = ray.task_info()
2016-06-24 19:43:24 -07:00
self.assertTrue(len(result["failed_tasks"]) == 2)
2016-06-18 01:01:48 +03:00
task_ids = set()
2016-06-24 19:43:24 -07:00
for task in result["failed_tasks"]:
self.assertTrue(task.has_key("worker_address"))
self.assertTrue(task.has_key("operationid"))
2016-06-25 09:43:57 -07:00
self.assertEqual(task.get("error_message"), "Test function 1 intentionally failed.")
2016-06-24 19:43:24 -07:00
self.assertTrue(task["operationid"] not in task_ids)
task_ids.add(task["operationid"])
2016-06-18 01:01:48 +03:00
2016-06-25 09:43:57 -07:00
x = test_functions.throw_exception_fct2()
try:
ray.get(x)
except Exception as e:
self.assertEqual(str(e), "The task that created this object reference failed with error message: Test function 2 intentionally failed.")
else:
self.assertTrue(False) # ray.get should throw an exception
x, y, z = test_functions.throw_exception_fct3(1.0)
for ref in [x, y, z]:
try:
ray.get(ref)
except Exception as e:
self.assertEqual(str(e), "The task that created this object reference failed with error message: Test function 3 intentionally failed.")
else:
self.assertTrue(False) # ray.get should throw an exception
2016-06-23 12:58:48 -07:00
def check_get_deallocated(data):
x = ray.put(data)
ray.get(x)
return x.val
2016-06-23 12:58:48 -07:00
def check_get_not_deallocated(data):
x = ray.put(data)
y = ray.get(x)
return y, x.val
class ReferenceCountingTest(unittest.TestCase):
def testDeallocation(self):
2016-06-19 22:01:31 -07:00
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=3, worker_path=worker_path)
x = test_functions.test_alias_f()
2016-06-23 12:58:48 -07:00
ray.get(x)
time.sleep(0.1)
objref_val = x.val
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val] == 1)
del x
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val] == -1) # -1 indicates deallocated
y = test_functions.test_alias_h()
2016-06-23 12:58:48 -07:00
ray.get(y)
time.sleep(0.1)
objref_val = y.val
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 0, 0])
del y
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1])
z = da.zeros([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE], "float")
time.sleep(0.1)
objref_val = z.val
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1])
del z
time.sleep(0.1)
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1])
x = ra.zeros([10, 10], "float")
y = ra.zeros([10, 10], "float")
z = ra.dot(x, y)
objref_val = x.val
time.sleep(0.1)
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1])
del x
time.sleep(0.1)
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, 1, 1])
del y
time.sleep(0.1)
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, 1])
del z
time.sleep(0.1)
2016-06-10 14:12:15 -07:00
self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1])
services.cleanup()
2016-06-23 12:58:48 -07:00
def testGet(self):
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=3, worker_path=worker_path)
for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]:
2016-06-23 12:58:48 -07:00
objref_val = check_get_deallocated(val)
self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], -1)
if not isinstance(val, bool) and not isinstance(val, np.generic) and val is not None:
2016-06-23 12:58:48 -07:00
x, objref_val = check_get_not_deallocated(val)
self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], 1)
# The following currently segfaults: The second "result = " closes the
# memory segment as soon as the assignment is done (and the first result
# goes out of scope).
"""
data = np.zeros([10, 20])
objref = ray.put(data)
result = worker.get(objref)
result = worker.get(objref)
self.assertTrue(np.alltrue(result == data))
"""
services.cleanup()
@unittest.expectedFailure
2016-06-23 12:58:48 -07:00
def testGetFailing(self):
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
2016-06-27 11:57:22 -07:00
services.start_ray_local(num_workers=3, worker_path=worker_path)
# This is failing, because for bool and None, we cannot track python
# refcounts and therefore cannot keep the refcount up
# (see 5281bd414f6b404f61e1fe25ec5f6651defee206).
# The resulting behavior is still correct however because True, False and
2016-06-23 12:58:48 -07:00
# None are returned by get "by value" and therefore can be reclaimed from
# the object store safely.
for val in [True, False, None]:
2016-06-23 12:58:48 -07:00
x, objref_val = check_get_not_deallocated(val)
self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], 1)
services.cleanup()
class PythonModeTest(unittest.TestCase):
def testObjRefAliasing(self):
2016-06-27 11:57:22 -07:00
services.start_ray_local(driver_mode=ray.PYTHON_MODE)
xref = test_functions.test_alias_h()
self.assertTrue(np.alltrue(xref == np.ones([3, 4, 5]))) # remote functions should return by value
self.assertTrue(np.alltrue(xref == ray.get(xref))) # ray.get should be the identity
y = np.random.normal(size=[11, 12])
self.assertTrue(np.alltrue(y == ray.put(y))) # ray.put should be the identity
# make sure objects are immutable, this example is why we need to copy
# arguments before passing them into remote functions in python mode
aref = test_functions.python_mode_f()
self.assertTrue(np.alltrue(aref == np.array([0, 0])))
bref = test_functions.python_mode_g(aref)
self.assertTrue(np.alltrue(aref == np.array([0, 0]))) # python_mode_g should not mutate aref
self.assertTrue(np.alltrue(bref == np.array([1, 0])))
services.cleanup()
2016-06-24 19:43:24 -07:00
if __name__ == "__main__":
2016-02-22 13:55:06 -08:00
unittest.main()