From 37dafa4d1455c65e02b87cfa2340904a858f54a1 Mon Sep 17 00:00:00 2001
From: Robert Nishihara
Date: Mon, 31 Jul 2017 17:57:48 -0700
Subject: [PATCH] Simplify put test and move it to failure tests. (#788)

---
 python/ray/services.py | 16 +++++--
 python/ray/worker.py   |  7 +++
 test/failure_test.py   | 96 ++++++++++++++++++++++++++++++++++++++++++++++
 test/stress_tests.py   | 91 ---------------------------------------
 4 files changed, 115 insertions(+), 95 deletions(-)

diff --git a/python/ray/services.py b/python/ray/services.py
index 4076ebfb4..1b3cfd66e 100644
--- a/python/ray/services.py
+++ b/python/ray/services.py
@@ -565,8 +565,8 @@ def start_local_scheduler(redis_address,
 def start_objstore(node_ip_address, redis_address, object_manager_port=None,
                    store_stdout_file=None, store_stderr_file=None,
                    manager_stdout_file=None,
-                   manager_stderr_file=None, cleanup=True,
-                   objstore_memory=None):
+                   manager_stderr_file=None, objstore_memory=None,
+                   cleanup=True):
     """This method starts an object store process.
 
     Args:
@@ -585,11 +585,11 @@ def start_objstore(node_ip_address, redis_address,
         manager_stderr_file: A file handle opened for writing to redirect
             stderr to. If no redirection should happen, then this should be
             None.
+        objstore_memory: The amount of memory (in bytes) to start the object
+            store with.
         cleanup (bool): True if using Ray in local mode. If cleanup is true,
             then this process will be killed by serices.cleanup() when the
             Python process that imported services exits.
-        objstore_memory: The amount of memory (in bytes) to start the object
-            store with.
 
     Return:
         A tuple of the Plasma store socket name, the Plasma manager socket
@@ -734,6 +734,7 @@ def start_ray_processes(address_info=None,
                         redis_port=None,
                         num_workers=None,
                         num_local_schedulers=1,
+                        object_store_memory=None,
                         num_redis_shards=1,
                         worker_path=None,
                         cleanup=True,
@@ -762,6 +763,8 @@
             stores until there are num_local_schedulers existing instances of
             each, including ones already registered with the given
             address_info.
+        object_store_memory: The amount of memory (in bytes) to start the
+            object store with.
         num_redis_shards: The number of Redis shards to start in addition to
             the primary Redis shard.
         worker_path (str): The path of the source code that will be run by the
@@ -895,6 +898,7 @@ def start_ray_processes(address_info=None,
             store_stderr_file=plasma_store_stderr_file,
             manager_stdout_file=plasma_manager_stdout_file,
             manager_stderr_file=plasma_manager_stderr_file,
+            objstore_memory=object_store_memory,
             cleanup=cleanup)
         object_store_addresses.append(object_store_address)
         time.sleep(0.1)
@@ -1026,6 +1030,7 @@ def start_ray_head(address_info=None,
                    redis_port=None,
                    num_workers=0,
                    num_local_schedulers=1,
+                   object_store_memory=None,
                    worker_path=None,
                    cleanup=True,
                    redirect_output=False,
@@ -1051,6 +1056,8 @@
             stores until there are at least num_local_schedulers existing
             instances of each, including ones already registered with the
             given address_info.
+        object_store_memory: The amount of memory (in bytes) to start the
+            object store with.
         worker_path (str): The path of the source code that will be run by the
             worker.
         cleanup (bool): If cleanup is true, then the processes started here
@@ -1077,6 +1084,7 @@ def start_ray_head(address_info=None,
         redis_port=redis_port,
         num_workers=num_workers,
         num_local_schedulers=num_local_schedulers,
+        object_store_memory=object_store_memory,
         worker_path=worker_path,
         cleanup=cleanup,
         redirect_output=redirect_output,
diff --git a/python/ray/worker.py b/python/ray/worker.py
index 5373530cd..9a1b8036e 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -814,6 +814,7 @@ def _init(address_info=None,
           object_id_seed=None,
           num_workers=None,
           num_local_schedulers=None,
+          object_store_memory=None,
           driver_mode=SCRIPT_MODE,
           redirect_output=False,
           start_workers_from_local_scheduler=True,
@@ -845,6 +846,8 @@
             provided if start_ray_local is True.
         num_local_schedulers (int): The number of local schedulers to start.
             This is only provided if start_ray_local is True.
+        object_store_memory: The amount of memory (in bytes) to start the
+            object store with.
         driver_mode (bool): The mode in which to start the driver. This should
             be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
         redirect_output (bool): True if stdout and stderr for all the processes
@@ -909,6 +912,7 @@ def _init(address_info=None,
             node_ip_address=node_ip_address,
             num_workers=num_workers,
             num_local_schedulers=num_local_schedulers,
+            object_store_memory=object_store_memory,
             redirect_output=redirect_output,
             start_workers_from_local_scheduler=(
                 start_workers_from_local_scheduler),
@@ -931,6 +935,9 @@ def _init(address_info=None,
         if num_redis_shards is not None:
             raise Exception("When connecting to an existing cluster, "
                             "num_redis_shards must not be provided.")
+        if object_store_memory is not None:
+            raise Exception("When connecting to an existing cluster, "
+                            "object_store_memory must not be provided.")
         # Get the node IP address if one is not provided.
         if node_ip_address is None:
             node_ip_address = services.get_node_ip_address(redis_address)
diff --git a/test/failure_test.py b/test/failure_test.py
index e336fa0be..8e55e825d 100644
--- a/test/failure_test.py
+++ b/test/failure_test.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
+import numpy as np
 import os
 import ray
 import sys
@@ -277,5 +278,100 @@ class WorkerDeath(unittest.TestCase):
         ray.worker.cleanup()
 
 
+class PutErrorTest(unittest.TestCase):
+
+    def testPutError1(self):
+        store_size = 10 ** 6
+        ray.worker._init(start_ray_local=True, driver_mode=ray.SILENT_MODE,
+                         object_store_memory=store_size)
+
+        num_objects = 3
+        object_size = 4 * 10 ** 5
+
+        # Define a task with a single dependency, a numpy array, that returns
+        # another array.
+        @ray.remote
+        def single_dependency(i, arg):
+            arg = np.copy(arg)
+            arg[0] = i
+            return arg
+
+        @ray.remote
+        def put_arg_task():
+            # Launch num_objects instances of the remote task, each dependent
+            # on the one before it. The result of the first task should get
+            # evicted.
+            args = []
+            arg = single_dependency.remote(0, np.zeros(object_size,
+                                                       dtype=np.uint8))
+            for i in range(num_objects):
+                arg = single_dependency.remote(i, arg)
+                args.append(arg)
+
+            # Get the last value to force all tasks to finish.
+            value = ray.get(args[-1])
+            assert value[0] == i
+
+            # Get the first value (which should have been evicted) to force
+            # reconstruction. Currently, since we're not able to reconstruct
+            # `ray.put` objects that were evicted and whose originating tasks
+            # are still running, this ray.get should hang and push an error to
+            # the driver.
+            ray.get(args[0])
+
+        put_arg_task.remote()
+
+        # Make sure we receive the correct error message.
+        wait_for_errors(b"put_reconstruction", 1)
+
+        ray.worker.cleanup()
+
+    def testPutError2(self):
+        # This is the same as the previous test, but it calls ray.put directly.
+        store_size = 10 ** 6
+        ray.worker._init(start_ray_local=True, driver_mode=ray.SILENT_MODE,
+                         object_store_memory=store_size)
+
+        num_objects = 3
+        object_size = 4 * 10 ** 5
+
+        # Define a task with a single dependency, a numpy array, that returns
+        # another array.
+        @ray.remote
+        def single_dependency(i, arg):
+            arg = np.copy(arg)
+            arg[0] = i
+            return arg
+
+        @ray.remote
+        def put_task():
+            # Launch num_objects instances of the remote task, each dependent
+            # on the one before it. The result of the first task should get
+            # evicted.
+            args = []
+            arg = ray.put(np.zeros(object_size, dtype=np.uint8))
+            for i in range(num_objects):
+                arg = single_dependency.remote(i, arg)
+                args.append(arg)
+
+            # Get the last value to force all tasks to finish.
+            value = ray.get(args[-1])
+            assert value[0] == i
+
+            # Get the first value (which should have been evicted) to force
+            # reconstruction. Currently, since we're not able to reconstruct
+            # `ray.put` objects that were evicted and whose originating tasks
+            # are still running, this ray.get should hang and push an error to
+            # the driver.
+            ray.get(args[0])
+
+        put_task.remote()
+
+        # Make sure we receive the correct error message.
+        wait_for_errors(b"put_reconstruction", 1)
+
+        ray.worker.cleanup()
+
+
 if __name__ == "__main__":
     unittest.main(verbosity=2)
diff --git a/test/stress_tests.py b/test/stress_tests.py
index 37f6b969e..b59f5e2fa 100644
--- a/test/stress_tests.py
+++ b/test/stress_tests.py
@@ -425,97 +425,6 @@ class ReconstructionTests(unittest.TestCase):
         self.assertTrue(all(error[b"data"] == b"__main__.foo"
                             for error in errors))
 
-    def testPutErrors(self):
-        # Define the size of one task's return argument so that the combined
-        # sum of all objects' sizes is at least twice the plasma stores'
-        # combined allotted memory.
-        num_objects = 1000
-        size = self.plasma_store_memory * 2 // (num_objects * 8)
-
-        # Define a task with a single dependency, a numpy array, that returns
-        # another array.
-        @ray.remote
-        def single_dependency(i, arg):
-            arg = np.copy(arg)
-            arg[0] = i
-            return arg
-
-        # Define a root task that calls `ray.put` to put an argument in the
-        # object store.
-        @ray.remote
-        def put_arg_task(size):
-            # Launch num_objects instances of the remote task, each dependent
-            # on the one before it. The first instance of the task takes a
-            # numpy array as an argument, which is put into the object store.
-            args = []
-            arg = single_dependency.remote(0, np.zeros(size))
-            for i in range(num_objects):
-                arg = single_dependency.remote(i, arg)
-                args.append(arg)
-
-            # Get each value to force each task to finish. After some number of
-            # gets, old values should be evicted.
-            for i in range(num_objects):
-                value = ray.get(args[i])
-                self.assertEqual(value[0], i)
-            # Get each value again to force reconstruction. Currently, since
-            # we're not able to reconstruct `ray.put` objects that were evicted
-            # and whose originating tasks are still running, this for-loop
-            # should hang on its first iteration and push an error to the
-            # driver.
-            for i in range(num_objects):
-                value = ray.get(args[i])
-                self.assertEqual(value[0], i)
-
-        # Define a root task that calls `ray.put` directly.
-        @ray.remote
-        def put_task(size):
-            # Launch num_objects instances of the remote task, each dependent
-            # on the one before it. The first instance of the task takes an
-            # object ID returned by ray.put.
-            args = []
-            arg = ray.put(np.zeros(size))
-            for i in range(num_objects):
-                arg = single_dependency.remote(i, arg)
-                args.append(arg)
-
-            # Get each value to force each task to finish. After some number of
-            # gets, old values should be evicted.
-            for i in range(num_objects):
-                value = ray.get(args[i])
-                self.assertEqual(value[0], i)
-            # Get each value again to force reconstruction. Currently, since
-            # we're not able to reconstruct `ray.put` objects that were evicted
-            # and whose originating tasks are still running, this for-loop
-            # should hang on its first iteration and push an error to the
-            # driver.
-            for i in range(num_objects):
-                value = ray.get(args[i])
-                self.assertEqual(value[0], i)
-
-        put_arg_task.remote(size)
-
-        def error_check(errors):
-            return len(errors) > 1
-        errors = self.wait_for_errors(error_check)
-        # Make sure all the errors have the correct type.
-        self.assertTrue(all(error[b"type"] == b"put_reconstruction"
-                            for error in errors))
-        self.assertTrue(all(error[b"data"] == b"__main__.put_arg_task"
-                            for error in errors))
-
-        put_task.remote(size)
-
-        def error_check(errors):
-            return any(error[b"data"] == b"__main__.put_task"
-                       for error in errors)
-        errors = self.wait_for_errors(error_check)
-        # Make sure all the errors have the correct type.
-        self.assertTrue(all(error[b"type"] == b"put_reconstruction"
-                            for error in errors))
-        self.assertTrue(any(error[b"data"] == b"__main__.put_task"
-                            for error in errors))
-
     def testDriverPutErrors(self):
         # Define the size of one task's return argument so that the combined
         # sum of all objects' sizes is at least twice the plasma stores'