from __future__ import absolute_import, division, print_function import os import subprocess import sys import tempfile import time import unittest import ray from ray.test.test_utils import run_and_get_output import pytest def run_string_as_driver(driver_script): """Run a driver as a separate process. Args: driver_script: A string to run as a Python script. Returns: The scripts output. """ # Save the driver script as a file so we can call it using subprocess. with tempfile.NamedTemporaryFile() as f: f.write(driver_script.encode("ascii")) f.flush() out = ray.utils.decode( subprocess.check_output([sys.executable, f.name])) return out class MultiNodeTest(unittest.TestCase): def setUp(self): out = run_and_get_output(["ray", "start", "--head"]) # Get the redis address from the output. redis_substring_prefix = "redis_address=\"" redis_address_location = ( out.find(redis_substring_prefix) + len(redis_substring_prefix)) redis_address = out[redis_address_location:] self.redis_address = redis_address.split("\"")[0] def tearDown(self): ray.shutdown() # Kill the Ray cluster. subprocess.Popen(["ray", "stop"]).wait() def testErrorIsolation(self): # Connect a driver to the Ray cluster. ray.init(redis_address=self.redis_address, driver_mode=ray.SILENT_MODE) # There shouldn't be any errors yet. assert len(ray.error_info()) == 0 error_string1 = "error_string1" error_string2 = "error_string2" @ray.remote def f(): raise Exception(error_string1) # Run a remote function that throws an error. with pytest.raises(Exception): ray.get(f.remote()) # Wait for the error to appear in Redis. while len(ray.error_info()) != 1: time.sleep(0.1) print("Waiting for error to appear.") # Make sure we got the error. assert len(ray.error_info()) == 1 assert error_string1 in ray.error_info()[0]["message"] # Start another driver and make sure that it does not receive this # error. Make the other driver throw an error, and make sure it # receives that error. driver_script = """ import ray import time ray.init(redis_address="{}") time.sleep(1) assert len(ray.error_info()) == 0 @ray.remote def f(): raise Exception("{}") try: ray.get(f.remote()) except Exception as e: pass while len(ray.error_info()) != 1: print(len(ray.error_info())) time.sleep(0.1) assert len(ray.error_info()) == 1 assert "{}" in ray.error_info()[0]["message"] print("success") """.format(self.redis_address, error_string2, error_string2) out = run_string_as_driver(driver_script) # Make sure the other driver succeeded. assert "success" in out # Make sure that the other error message doesn't show up for this # driver. assert len(ray.error_info()) == 1 assert error_string1 in ray.error_info()[0]["message"] def testRemoteFunctionIsolation(self): # This test will run multiple remote functions with the same names in # two different drivers. Connect a driver to the Ray cluster. ray.init(redis_address=self.redis_address, driver_mode=ray.SILENT_MODE) # Start another driver and make sure that it can define and call its # own commands with the same names. driver_script = """ import ray import time ray.init(redis_address="{}") @ray.remote def f(): return 3 @ray.remote def g(x, y): return 4 for _ in range(10000): result = ray.get([f.remote(), g.remote(0, 0)]) assert result == [3, 4] print("success") """.format(self.redis_address) out = run_string_as_driver(driver_script) @ray.remote def f(): return 1 @ray.remote def g(x): return 2 for _ in range(10000): result = ray.get([f.remote(), g.remote(0)]) assert result == [1, 2] # Make sure the other driver succeeded. assert "success" in out @unittest.skipIf( os.environ.get("RAY_USE_XRAY") == "1", "This test does not work with xray yet.") def testDriverExitingQuickly(self): # This test will create some drivers that submit some tasks and then # exit without waiting for the tasks to complete. ray.init(redis_address=self.redis_address, driver_mode=ray.SILENT_MODE) # Define a driver that creates an actor and exits. driver_script1 = """ import ray ray.init(redis_address="{}") @ray.remote class Foo(object): def __init__(self): pass Foo.remote() print("success") """.format(self.redis_address) # Define a driver that creates some tasks and exits. driver_script2 = """ import ray ray.init(redis_address="{}") @ray.remote def f(): return 1 f.remote() print("success") """.format(self.redis_address) # Create some drivers and let them exit and make sure everything is # still alive. for _ in range(3): out = run_string_as_driver(driver_script1) # Make sure the first driver ran to completion. assert "success" in out out = run_string_as_driver(driver_script2) # Make sure the first driver ran to completion. assert "success" in out assert ray.services.all_processes_alive() class StartRayScriptTest(unittest.TestCase): def testCallingStartRayHead(self): # Test that we can call start-ray.sh with various command line # parameters. TODO(rkn): This test only tests the --head code path. We # should also test the non-head node code path. # Test starting Ray with no arguments. run_and_get_output(["ray", "start", "--head"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with a number of workers specified. run_and_get_output(["ray", "start", "--head", "--num-workers", "20"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with a redis port specified. run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with a node IP address specified. run_and_get_output( ["ray", "start", "--head", "--node-ip-address", "127.0.0.1"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with an object manager port specified. run_and_get_output( ["ray", "start", "--head", "--object-manager-port", "12345"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the number of CPUs specified. run_and_get_output(["ray", "start", "--head", "--num-cpus", "2"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the number of GPUs specified. run_and_get_output(["ray", "start", "--head", "--num-gpus", "100"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the max redis clients specified. run_and_get_output( ["ray", "start", "--head", "--redis-max-clients", "100"]) subprocess.Popen(["ray", "stop"]).wait() if "RAY_USE_NEW_GCS" not in os.environ: # Test starting Ray with redis shard ports specified. run_and_get_output([ "ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382" ]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with all arguments specified. run_and_get_output([ "ray", "start", "--head", "--num-workers", "2", "--redis-port", "6379", "--redis-shard-ports", "6380,6381,6382", "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" ]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with invalid arguments. with pytest.raises(Exception): run_and_get_output([ "ray", "start", "--head", "--redis-address", "127.0.0.1:6379" ]) subprocess.Popen(["ray", "stop"]).wait() def testUsingHostnames(self): # Start the Ray processes on this machine. run_and_get_output([ "ray", "start", "--head", "--node-ip-address=localhost", "--redis-port=6379" ]) ray.init(node_ip_address="localhost", redis_address="localhost:6379") @ray.remote def f(): return 1 assert ray.get(f.remote()) == 1 # Kill the Ray cluster. subprocess.Popen(["ray", "stop"]).wait() class MiscellaneousTest(unittest.TestCase): def tearDown(self): ray.shutdown() def testConnectingInLocalCase(self): address_info = ray.init(num_cpus=0) # Define a driver that just connects to Redis. driver_script = """ import ray ray.init(redis_address="{}") print("success") """.format(address_info["redis_address"]) out = run_string_as_driver(driver_script) # Make sure the other driver succeeded. assert "success" in out class RunDriverForMultipleTimesTest(unittest.TestCase): def tearDown(self): ray.shutdown() def testRunDriverForTwice(self): # We used to have issue 2165 and 2288: # https://github.com/ray-project/ray/issues/2165 # https://github.com/ray-project/ray/issues/2288 # both complain that driver will hang when run for the second time. # This test is used to verify the fix for above issue, it will run the # same driver for twice and verify whether both of them succeed. address_info = ray.init() driver_script = """ import ray import ray.tune as tune import os import time def train_func(config, reporter): # add a reporter arg for i in range(2): time.sleep(0.1) reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics ray.init(redis_address="{}", driver_mode=ray.SILENT_MODE) ray.tune.register_trainable("train_func", train_func) tune.run_experiments({{ "my_experiment": {{ "run": "train_func", "stop": {{"mean_accuracy": 99}}, "config": {{ "layer1": {{ "class_name": tune.grid_search(["a"]), "config": {{"lr": tune.grid_search([1, 2])}} }}, }}, "local_dir": os.path.expanduser("~/tmp") }} }}) print("success") """.format(address_info["redis_address"]) for i in range(2): out = run_string_as_driver(driver_script) assert "success" in out if __name__ == "__main__": unittest.main(verbosity=2)