2016-12-11 12:25:31 -08:00
|
|
|
from __future__ import absolute_import
|
|
|
|
from __future__ import division
|
|
|
|
from __future__ import print_function
|
|
|
|
|
2017-07-31 17:57:48 -07:00
|
|
|
import numpy as np
|
2017-02-16 11:30:35 -08:00
|
|
|
import os
|
2016-08-15 11:02:54 -07:00
|
|
|
import ray
|
2016-12-13 17:37:22 -08:00
|
|
|
import sys
|
2017-02-16 11:30:35 -08:00
|
|
|
import tempfile
|
2016-08-15 11:02:54 -07:00
|
|
|
import time
|
2017-02-16 11:30:35 -08:00
|
|
|
import unittest
|
2016-08-15 11:02:54 -07:00
|
|
|
|
2017-03-21 12:57:54 -07:00
|
|
|
import ray.test.test_functions as test_functions
|
|
|
|
|
2016-12-13 17:37:22 -08:00
|
|
|
if sys.version_info >= (3, 0):
|
2017-07-13 14:53:57 -07:00
|
|
|
from importlib import reload
|
2016-12-13 17:37:22 -08:00
|
|
|
|
2016-08-15 11:02:54 -07:00
|
|
|
|
2017-01-25 22:53:48 -08:00
|
|
|
def relevant_errors(error_type):
    """Return the entries of ``ray.error_info()`` that have the given type.

    Args:
        error_type: Value compared for equality against each entry's
            ``b"type"`` field.

    Returns:
        A list with the matching entries, in the order in which
        ``ray.error_info()`` yielded them.
    """
    matching = []
    for entry in ray.error_info():
        if entry[b"type"] == error_type:
            matching.append(entry)
    return matching
|
2017-01-25 22:53:48 -08:00
|
|
|
|
2017-03-21 12:57:54 -07:00
|
|
|
|
2016-11-02 00:39:35 -07:00
|
|
|
def wait_for_errors(error_type, num_errors, timeout=10):
    """Block until enough errors of one type have been reported.

    Polls ``relevant_errors(error_type)`` every 0.1 seconds.

    Args:
        error_type: Error type passed through to ``relevant_errors``.
        num_errors: Minimum number of matching errors to wait for.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        None, as soon as at least ``num_errors`` matching errors are present.

    Raises:
        RuntimeError: If the timeout elapses first. Previously this case only
            printed a message and returned normally, so a caller that did not
            re-check the errors afterwards could not tell a timeout from
            success.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if len(relevant_errors(error_type)) >= num_errors:
            return
        time.sleep(0.1)
    raise RuntimeError(
        "Timing out of wait. Did not see {} error(s) of type {!r} within {} "
        "seconds.".format(num_errors, error_type, timeout))
|
2016-11-02 00:39:35 -07:00
|
|
|
|
2017-03-21 12:57:54 -07:00
|
|
|
|
2016-08-15 11:02:54 -07:00
|
|
|
class TaskStatusTest(unittest.TestCase):
    """Check that failures on workers are reported back to the driver.

    Every test starts Ray itself with ``ray.init`` and then inspects the
    errors available through ``ray.error_info``.
    """

    def tearDown(self):
        # Runs after every test, pass or fail. The tests used to call
        # ray.worker.cleanup() as their last statement, so a failing assertion
        # skipped it (presumably leaving Ray running for the next test).
        ray.worker.cleanup()

    def _import_temporary_module(self):
        """Write a throwaway module to a temporary file and import it.

        The directory of the file is appended to ``sys.path`` so that the
        import succeeds in this process. Closing the file and removing the
        ``sys.path`` entry are registered as test cleanups, so they happen
        even when the calling test fails.

        Returns:
            The imported module, which defines ``temporary_helper_function``.
        """
        # Create the contents of a temporary Python file.
        temporary_python_file = """
def temporary_helper_function():
    return 1
"""
        f = tempfile.NamedTemporaryFile(suffix=".py")
        self.addCleanup(f.close)
        f.write(temporary_python_file.encode("ascii"))
        f.flush()
        directory = os.path.dirname(f.name)
        # Get the module name and strip ".py" from the end.
        module_name = os.path.basename(f.name)[:-3]
        sys.path.append(directory)
        # Clean up the junk we added to sys.path. Removing by value does not
        # depend on our entry still being the last one.
        self.addCleanup(sys.path.remove, directory)
        return __import__(module_name)

    def testFailedTask(self):
        """Failing remote functions report errors and make ray.get raise."""
        reload(test_functions)
        ray.init(num_workers=3, driver_mode=ray.SILENT_MODE)

        test_functions.throw_exception_fct1.remote()
        test_functions.throw_exception_fct1.remote()
        wait_for_errors(b"task", 2)
        task_errors = relevant_errors(b"task")
        self.assertEqual(len(task_errors), 2)
        for task in task_errors:
            self.assertIn(b"Test function 1 intentionally failed.",
                          task.get(b"message"))

        # ray.get should throw an exception.
        x = test_functions.throw_exception_fct2.remote()
        with self.assertRaises(Exception) as context:
            ray.get(x)
        self.assertIn("Test function 2 intentionally failed.",
                      str(context.exception))

        # Every one of the three return values should throw on ray.get.
        x, y, z = test_functions.throw_exception_fct3.remote(1.0)
        for ref in [x, y, z]:
            with self.assertRaises(Exception) as context:
                ray.get(ref)
            self.assertIn("Test function 3 intentionally failed.",
                          str(context.exception))

    def testFailImportingRemoteFunction(self):
        """A remote function that cannot be imported reports errors."""
        ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

        module = self._import_temporary_module()

        # Define a function that closes over this temporary module. This should
        # fail when it is unpickled.
        @ray.remote
        def g():
            # Call the function the temporary module really defines. The
            # original referenced "temporary_python_file", which is the name
            # of the source string, not an attribute of the module.
            return module.temporary_helper_function()

        wait_for_errors(b"register_remote_function", 2)
        self.assertIn(b"No module named", ray.error_info()[0][b"message"])
        self.assertIn(b"No module named", ray.error_info()[1][b"message"])

        # Check that if we try to call the function it throws an exception and
        # does not hang.
        for _ in range(10):
            self.assertRaises(Exception, lambda: ray.get(g.remote()))

    def testFailedFunctionToRun(self):
        """A failing function run on all workers reports one error each."""
        ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

        def f(worker):
            # Only raise when the executing process is in worker mode.
            if ray.worker.global_worker.mode == ray.WORKER_MODE:
                raise Exception("Function to run failed.")
        ray.worker.global_worker.run_function_on_all_workers(f)
        wait_for_errors(b"function_to_run", 2)
        # Check that the error message is in the task info.
        self.assertEqual(len(ray.error_info()), 2)
        self.assertIn(b"Function to run failed.",
                      ray.error_info()[0][b"message"])
        self.assertIn(b"Function to run failed.",
                      ray.error_info()[1][b"message"])

    def testFailImportingActor(self):
        """An actor class that cannot be imported reports errors."""
        ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

        module = self._import_temporary_module()

        # Define an actor that closes over this temporary module. This should
        # fail when it is unpickled.
        @ray.remote
        class Foo(object):
            def __init__(self):
                # See testFailImportingRemoteFunction: use the name that the
                # temporary module actually defines.
                self.x = module.temporary_helper_function()

            def get_val(self):
                return 1

        # There should be no errors yet.
        self.assertEqual(len(ray.error_info()), 0)

        # Create an actor.
        foo = Foo.remote()

        # Wait for the error to arrive.
        wait_for_errors(b"register_actor", 1)
        self.assertIn(b"No module named", ray.error_info()[0][b"message"])

        # Wait for the error from when the __init__ tries to run.
        wait_for_errors(b"task", 1)
        self.assertIn(
            b"failed to be imported, and so cannot execute this method",
            ray.error_info()[1][b"message"])

        # Check that if we try to get the function it throws an exception and
        # does not hang.
        with self.assertRaises(Exception):
            ray.get(foo.get_val.remote())

        # Wait for the error from the call to get_val.
        wait_for_errors(b"task", 2)
        self.assertIn(
            b"failed to be imported, and so cannot execute this method",
            ray.error_info()[2][b"message"])
|
2017-02-22 15:15:45 -08:00
|
|
|
|
2017-03-21 12:57:54 -07:00
|
|
|
|
2017-02-15 00:10:05 -08:00
|
|
|
class ActorTest(unittest.TestCase):
    """Check error reporting for failing actors and invalid actor calls."""

    def tearDown(self):
        # Runs after every test, pass or fail. The tests used to call
        # ray.worker.cleanup() as their last statement, so a failing assertion
        # skipped it (presumably leaving Ray running for the next test).
        ray.worker.cleanup()

    def testFailedActorInit(self):
        """Exceptions in an actor's constructor and methods are reported."""
        ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)

        error_message1 = "actor constructor failed"
        error_message2 = "actor method failed"

        @ray.remote
        class FailedActor(object):
            def __init__(self):
                raise Exception(error_message1)

            def get_val(self):
                return 1

            def fail_method(self):
                raise Exception(error_message2)

        a = FailedActor.remote()

        # Make sure that we get errors from a failed constructor.
        wait_for_errors(b"task", 1)
        self.assertEqual(len(ray.error_info()), 1)
        self.assertIn(error_message1,
                      ray.error_info()[0][b"message"].decode("ascii"))

        # Make sure that we get errors from a failed method.
        a.fail_method.remote()
        wait_for_errors(b"task", 2)
        self.assertEqual(len(ray.error_info()), 2)
        self.assertIn(error_message2,
                      ray.error_info()[1][b"message"].decode("ascii"))

    def testIncorrectMethodCalls(self):
        """Calls with a wrong argument count or unknown method name raise."""
        ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)

        @ray.remote
        class Actor(object):
            def __init__(self, missing_variable_name):
                pass

            def get_val(self, x):
                pass

        # Make sure that we get errors if we call the constructor incorrectly.

        # Create an actor with too few arguments.
        with self.assertRaises(Exception):
            Actor.remote()

        # Create an actor with too many arguments.
        with self.assertRaises(Exception):
            Actor.remote(1, 2)

        # Create an actor with the correct number of arguments.
        a = Actor.remote(1)

        # Call a method with too few arguments.
        with self.assertRaises(Exception):
            a.get_val.remote()

        # Call a method with too many arguments.
        with self.assertRaises(Exception):
            a.get_val.remote(1, 2)
        # Call a method that doesn't exist.
        with self.assertRaises(AttributeError):
            a.nonexistent_method()
        with self.assertRaises(AttributeError):
            a.nonexistent_method.remote()
|
2017-02-15 00:10:05 -08:00
|
|
|
|
2017-03-21 12:57:54 -07:00
|
|
|
|
2017-04-06 00:02:39 -07:00
|
|
|
class WorkerDeath(unittest.TestCase):
    """Check that the death of a worker is reported to the driver."""

    def tearDown(self):
        # Runs after every test, pass or fail. The test used to call
        # ray.worker.cleanup() as its last statement, so a failing assertion
        # skipped it (presumably leaving Ray running for the next test).
        ray.worker.cleanup()

    def testWorkerDying(self):
        """A task that exits its worker produces a "worker_died" error."""
        ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)

        # Define a remote function that will kill the worker that runs it.
        @ray.remote
        def f():
            # NOTE(review): exit() is invoked through eval rather than called
            # directly; the reason is not visible here -- confirm before
            # simplifying.
            eval("exit()")

        f.remote()

        wait_for_errors(b"worker_died", 1)

        self.assertEqual(len(ray.error_info()), 1)
        self.assertIn("A worker died or was killed while executing a task.",
                      ray.error_info()[0][b"message"].decode("ascii"))
|
2017-04-06 00:02:39 -07:00
|
|
|
|
|
|
|
|
2017-07-31 17:57:48 -07:00
|
|
|
class PutErrorTest(unittest.TestCase):
    """Check that a "put_reconstruction" error is pushed to the driver.

    Both tests start Ray with a 10 ** 6 byte object store and chain tasks
    over arrays of 4 * 10 ** 5 bytes, expecting the first result to be
    evicted from the store.
    """

    def tearDown(self):
        # Runs after every test, pass or fail. The tests used to call
        # ray.worker.cleanup() as their last statement, so a failure skipped
        # it (presumably leaving Ray running for the next test).
        ray.worker.cleanup()

    def testPutError1(self):
        """The evicted object was passed by value as a task argument."""
        store_size = 10 ** 6
        ray.worker._init(start_ray_local=True, driver_mode=ray.SILENT_MODE,
                         object_store_memory=store_size)

        num_objects = 3
        object_size = 4 * 10 ** 5

        # Define a task with a single dependency, a numpy array, that returns
        # another array.
        @ray.remote
        def single_dependency(i, arg):
            arg = np.copy(arg)
            arg[0] = i
            return arg

        @ray.remote
        def put_arg_task():
            # Launch num_objects instances of the remote task, each dependent
            # on the one before it. The result of the first task should get
            # evicted.
            args = []
            arg = single_dependency.remote(0, np.zeros(object_size,
                                                       dtype=np.uint8))
            for i in range(num_objects):
                arg = single_dependency.remote(i, arg)
                args.append(arg)

            # Get the last value to force all tasks to finish.
            value = ray.get(args[-1])
            assert value[0] == i

            # Get the first value (which should have been evicted) to force
            # reconstruction. Currently, since we're not able to reconstruct
            # `ray.put` objects that were evicted and whose originating tasks
            # are still running, this call should hang and push an error to
            # the driver.
            ray.get(args[0])

        put_arg_task.remote()

        # Make sure we receive the correct error message. The explicit
        # assertion matters: without it the test contains no check of its
        # own and depends entirely on how wait_for_errors treats a timeout.
        wait_for_errors(b"put_reconstruction", 1)
        self.assertGreaterEqual(
            len(relevant_errors(b"put_reconstruction")), 1)

    def testPutError2(self):
        """Same as testPutError1, but the task calls ray.put directly."""
        store_size = 10 ** 6
        ray.worker._init(start_ray_local=True, driver_mode=ray.SILENT_MODE,
                         object_store_memory=store_size)

        num_objects = 3
        object_size = 4 * 10 ** 5

        # Define a task with a single dependency, a numpy array, that returns
        # another array.
        @ray.remote
        def single_dependency(i, arg):
            arg = np.copy(arg)
            arg[0] = i
            return arg

        @ray.remote
        def put_task():
            # Launch num_objects instances of the remote task, each dependent
            # on the one before it. The result of the first task should get
            # evicted.
            args = []
            arg = ray.put(np.zeros(object_size, dtype=np.uint8))
            for i in range(num_objects):
                arg = single_dependency.remote(i, arg)
                args.append(arg)

            # Get the last value to force all tasks to finish.
            value = ray.get(args[-1])
            assert value[0] == i

            # Get the first value (which should have been evicted) to force
            # reconstruction. Currently, since we're not able to reconstruct
            # `ray.put` objects that were evicted and whose originating tasks
            # are still running, this call should hang and push an error to
            # the driver.
            ray.get(args[0])

        put_task.remote()

        # Make sure we receive the correct error message. The explicit
        # assertion matters: without it the test contains no check of its
        # own and depends entirely on how wait_for_errors treats a timeout.
        wait_for_errors(b"put_reconstruction", 1)
        self.assertGreaterEqual(
            len(relevant_errors(b"put_reconstruction")), 1)
|
|
|
|
|
|
|
|
|
2016-08-15 11:02:54 -07:00
|
|
|
# When this file is executed as a script, run all test cases defined above
# with verbose (per-test) output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
|