diff --git a/python/ray/experimental/workflow/tests/driver_terminated.py b/python/ray/experimental/workflow/tests/driver_terminated.py deleted file mode 100644 index ebd86d2c2..000000000 --- a/python/ray/experimental/workflow/tests/driver_terminated.py +++ /dev/null @@ -1,21 +0,0 @@ -import sys -import time -import ray -from ray.experimental import workflow - - -@workflow.step -def foo(x): - print("Executing", x) - time.sleep(1) - if x < 20: - return foo.step(x + 1) - else: - return 20 - - -if __name__ == "__main__": - sleep_duration = float(sys.argv[1]) - ray.init(address="auto", namespace="workflow") - wf = workflow.run(foo.step(0), workflow_id="driver_terminated") - time.sleep(sleep_duration) diff --git a/python/ray/experimental/workflow/tests/test_basic_workflows.py b/python/ray/experimental/workflow/tests/test_basic_workflows.py index 601cb9f36..9c16faf55 100644 --- a/python/ray/experimental/workflow/tests/test_basic_workflows.py +++ b/python/ray/experimental/workflow/tests/test_basic_workflows.py @@ -1,7 +1,8 @@ import time -import pytest +from ray.tests.conftest import * # noqa +import pytest import ray from ray.experimental import workflow from ray.experimental.workflow import workflow_access @@ -78,9 +79,24 @@ def blocking(): return 314 -def test_basic_workflows(): - ray.init(namespace="workflow") +@workflow.step +def mul(a, b): + return a * b + +@workflow.step +def factorial(n): + if n == 1: + return 1 + else: + return mul.step(n, factorial.step(n - 1)) + + +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_basic_workflows(ray_start_regular_shared): output = workflow.run(simple_sequential.step()) assert ray.get(output) == "[source1][append1][append2]" @@ -96,20 +112,21 @@ def test_basic_workflows(): output = workflow.run(fork_join.step()) assert ray.get(output) == "join([source1][append1], [source1][append2])" - ray.shutdown() + outputs = workflow.run(factorial.step(10)) + assert ray.get(outputs) == 3628800 -def test_async_execution(): - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_async_execution(ray_start_regular_shared): start = time.time() output = workflow.run(blocking.step()) duration = time.time() - start assert duration < 5 # workflow.run is not blocked assert ray.get(output) == 314 - ray.shutdown() - @ray.remote def deep_nested(x): @@ -124,8 +141,11 @@ def _resolve_workflow_output(workflow_id: str, output: ray.ObjectRef): return output -def test_workflow_output_resolving(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_workflow_output_resolving(ray_start_regular_shared): # deep nested workflow nested_ref = deep_nested.remote(30) original_func = workflow_access._resolve_workflow_output @@ -139,11 +159,13 @@ def test_workflow_output_resolving(): # restore the function workflow_access._resolve_workflow_output = original_func assert ray.get(ref) == 42 - ray.shutdown() -def test_run_or_resume_during_running(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_run_or_resume_during_running(ray_start_regular_shared): output = workflow.run( simple_sequential.step(), workflow_id="running_workflow") @@ -153,4 +175,3 @@ def test_run_or_resume_during_running(): workflow.resume(workflow_id="running_workflow") assert ray.get(output) == "[source1][append1][append2]" - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_large_intermediate.py b/python/ray/experimental/workflow/tests/test_large_intermediate.py index e549644b9..258c56965 100644 --- a/python/ray/experimental/workflow/tests/test_large_intermediate.py +++ b/python/ray/experimental/workflow/tests/test_large_intermediate.py @@ -1,5 +1,9 @@ import time +from ray.tests.conftest import * # noqa + +import ray +import pytest import numpy as np from ray.experimental import workflow @@ -26,14 +30,14 @@ def simple_large_intermediate(): return average.step(y) -def test_simple_large_intermediate(): - import ray - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_simple_large_intermediate(ray_start_regular_shared): start = time.time() outputs = workflow.run(simple_large_intermediate.step()) outputs = ray.get(outputs) print(f"duration = {time.time() - start}") assert np.isclose(outputs, 8388607.5) - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_lifetime.py b/python/ray/experimental/workflow/tests/test_lifetime.py index 3657277d3..ffec01461 100644 --- a/python/ray/experimental/workflow/tests/test_lifetime.py +++ b/python/ray/experimental/workflow/tests/test_lifetime.py @@ -1,32 +1,47 @@ -import os -import subprocess -import sys +import ray import time -import pytest +from ray.test_utils import (run_string_as_driver_nonblocking, + run_string_as_driver) +from ray.tests.conftest import * # noqa +from ray.experimental import workflow + +driver_script = """ +import time import ray from ray.experimental import workflow -@pytest.mark.skip(reason="Blocked by issue #16951, where the exiting of " - "driver kills the task launched by a detached " - "named actor.") -def test_workflow_lifetime(): - subprocess.run(["ray start --head"], shell=True) +@workflow.step +def foo(x): time.sleep(1) - script = os.path.join( - os.path.abspath(os.path.dirname(__file__)), "driver_terminated.py") - sleep_duration = 5 - proc = subprocess.Popen([sys.executable, script, str(sleep_duration)]) - # TODO(suquark): also test killing the driver after fixing - # https://github.com/ray-project/ray/issues/16951 - # now we only let the driver exit normally - proc.wait() - time.sleep(1) - # connect to the cluster + if x < 20: + return foo.step(x + 1) + else: + return 20 + + +if __name__ == "__main__": + ray.init(address="auto", namespace="workflow") + wf = workflow.run(foo.step(0), workflow_id="driver_terminated") + time.sleep({}) +""" + + +def test_workflow_lifetime_1(call_ray_start): + # Case 1: driver exits normally + run_string_as_driver(driver_script.format(5)) ray.init(address="auto", namespace="workflow") output = workflow.get_output("driver_terminated") assert ray.get(output) == 20 - ray.shutdown() - subprocess.run(["ray stop"], shell=True) + + +def test_workflow_lifetime_2(call_ray_start): + # Case 2: driver terminated + proc = run_string_as_driver_nonblocking(driver_script.format(100)) + time.sleep(10) + proc.kill() time.sleep(1) + ray.init(address="auto", namespace="workflow") + output = workflow.get_output("driver_terminated") + assert ray.get(output) == 20 diff --git a/python/ray/experimental/workflow/tests/test_object_deref.py b/python/ray/experimental/workflow/tests/test_object_deref.py index 3d137cfef..551f12d6e 100644 --- a/python/ray/experimental/workflow/tests/test_object_deref.py +++ b/python/ray/experimental/workflow/tests/test_object_deref.py @@ -1,5 +1,7 @@ from typing import List, Dict +from ray.tests.conftest import * # noqa + import pytest import numpy as np @@ -68,19 +70,22 @@ def receive_data(data: np.ndarray): # TODO(suquark): Support ObjectRef checkpointing. -def test_objectref_inputs_exception(): - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_objectref_inputs_exception(ray_start_regular_shared): with pytest.raises(ValueError): output = workflow.run(receive_data.step(ray.put([42]))) assert ray.get(output) - ray.shutdown() @pytest.mark.skip(reason="no support for ObjectRef checkpointing yet") -def test_objectref_inputs(): - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_objectref_inputs(ray_start_regular_shared): output = workflow.run( deref_check.step( ray.put(42), nested_ref.remote(), [nested_ref.remote()], @@ -88,12 +93,13 @@ def test_objectref_inputs(): "output": nested_workflow.step(7) }])) assert ray.get(output) - ray.shutdown() -def test_object_deref(): - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular_shared", [{ + "namespace": "workflow" + }], indirect=True) +def test_object_deref(ray_start_regular_shared): x = empty_list.step() output = workflow.run(deref_shared.step(x, x)) assert ray.get(output) @@ -111,5 +117,3 @@ def test_object_deref(): obj = return_data.step() arr: np.ndarray = ray.get(workflow.run(receive_data.step(obj))) assert np.array_equal(arr, np.ones(4096)) - - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_recovery.py b/python/ray/experimental/workflow/tests/test_recovery.py index f9f6fc24a..aa9300e8f 100644 --- a/python/ray/experimental/workflow/tests/test_recovery.py +++ b/python/ray/experimental/workflow/tests/test_recovery.py @@ -1,10 +1,11 @@ -import os import subprocess -import sys import time + +from ray.tests.conftest import * # noqa import pytest import ray +from ray.test_utils import run_string_as_driver_nonblocking from ray.exceptions import RaySystemError, RayTaskError from ray.experimental import workflow from ray.experimental.workflow.tests import utils @@ -58,8 +59,11 @@ def simple(x): return z -def test_recovery_simple(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular", [{ + "namespace": "workflow" + }], indirect=True) +def test_recovery_simple(ray_start_regular): utils.unset_global_mark() workflow_id = "test_recovery_simple" with pytest.raises(RaySystemError): @@ -73,11 +77,13 @@ def test_recovery_simple(): # resume from workflow output checkpoint output = workflow.resume(workflow_id) assert ray.get(output) == "foo(x[append1])[append2]" - ray.shutdown() -def test_recovery_complex(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular", [{ + "namespace": "workflow" + }], indirect=True) +def test_recovery_complex(ray_start_regular): utils.unset_global_mark() workflow_id = "test_recovery_complex" with pytest.raises(RaySystemError): @@ -93,22 +99,44 @@ def test_recovery_complex(): output = workflow.resume(workflow_id) r = "join(join(foo(x[append1]), [source1][append2]), join(x, [source1]))" assert ray.get(output) == r - ray.shutdown() -def test_recovery_non_exists_workflow(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular", [{ + "namespace": "workflow" + }], indirect=True) +def test_recovery_non_exists_workflow(ray_start_regular): with pytest.raises(RayTaskError): ray.get(workflow.resume("this_workflow_id_does_not_exist")) - ray.shutdown() + + +driver_script = """ +import time +import ray +from ray.experimental import workflow + + +@workflow.step +def foo(x): + print("Executing", x) + time.sleep(1) + if x < 20: + return foo.step(x + 1) + else: + return 20 + + +if __name__ == "__main__": + ray.init(address="auto", namespace="workflow") + wf = workflow.run(foo.step(0), workflow_id="cluster_failure") + assert ray.get(wf) == 20 +""" def test_recovery_cluster_failure(): subprocess.run(["ray start --head"], shell=True) time.sleep(1) - script = os.path.join( - os.path.abspath(os.path.dirname(__file__)), "workflows_to_fail.py") - proc = subprocess.Popen([sys.executable, script]) + proc = run_string_as_driver_nonblocking(driver_script) time.sleep(10) subprocess.run(["ray stop"], shell=True) proc.kill() @@ -126,12 +154,14 @@ def recursive_chain(x): return 100 -def test_shortcut(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular", [{ + "namespace": "workflow" + }], indirect=True) +def test_shortcut(ray_start_regular): output = workflow.run(recursive_chain.step(0), workflow_id="shortcut") assert ray.get(output) == 100 # the shortcut points to the step with output checkpoint store = workflow_storage.WorkflowStorage("shortcut") step_id = store.get_entrypoint_step_id() assert store.inspect_step(step_id).output_object_valid - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_recursion_factorial.py b/python/ray/experimental/workflow/tests/test_recursion_factorial.py deleted file mode 100644 index c8f241338..000000000 --- a/python/ray/experimental/workflow/tests/test_recursion_factorial.py +++ /dev/null @@ -1,28 +0,0 @@ -from ray.experimental import workflow - - -@workflow.step -def mul(a, b): - return a * b - - -@workflow.step -def factorial(n): - if n == 1: - return 1 - else: - return mul.step(n, factorial.step(n - 1)) - - -@workflow.step -def recursion_factorial(n): - return factorial.step(n) - - -def test_recursion_factorial(): - import ray - ray.init(namespace="workflow") - - outputs = workflow.run(recursion_factorial.step(10)) - assert ray.get(outputs) == 3628800 - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_signature_check.py b/python/ray/experimental/workflow/tests/test_signature_check.py index 71d3ce52b..ed7737d2b 100644 --- a/python/ray/experimental/workflow/tests/test_signature_check.py +++ b/python/ray/experimental/workflow/tests/test_signature_check.py @@ -1,6 +1,6 @@ import pytest -import ray +from ray.tests.conftest import * # noqa from ray.experimental import workflow @@ -9,9 +9,11 @@ def signature_check(a, b, c=1): pass -def test_signature_check(): - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular", [{ + "namespace": "workflow" + }], indirect=True) +def test_signature_check(ray_start_regular): with pytest.raises(TypeError): signature_check(1, 2) @@ -31,5 +33,3 @@ def test_signature_check(): signature_check.step(1, 2, c=3) signature_check.step(1, b=2, c=3) signature_check.step(a=1, b=2, c=3) - - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_variable_mutable.py b/python/ray/experimental/workflow/tests/test_variable_mutable.py index 430bdce2a..cb5f8e4a0 100644 --- a/python/ray/experimental/workflow/tests/test_variable_mutable.py +++ b/python/ray/experimental/workflow/tests/test_variable_mutable.py @@ -1,3 +1,6 @@ +import ray +import pytest +from ray.tests.conftest import * # noqa from ray.experimental import workflow @@ -20,10 +23,10 @@ def variable_mutable(): return projection.step(a, b) -def test_variable_mutable(): - import ray - ray.init(namespace="workflow") - +@pytest.mark.parametrize( + "ray_start_regular", [{ + "namespace": "workflow" + }], indirect=True) +def test_variable_mutable(ray_start_regular): outputs = workflow.run(variable_mutable.step()) assert ray.get(outputs) == [] - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/test_virtual_actor.py b/python/ray/experimental/workflow/tests/test_virtual_actor.py index d070b1344..d7ef4b82e 100644 --- a/python/ray/experimental/workflow/tests/test_virtual_actor.py +++ b/python/ray/experimental/workflow/tests/test_virtual_actor.py @@ -1,9 +1,9 @@ import time import pytest - import ray +from ray.tests.conftest import * # noqa from ray.experimental import workflow from ray.experimental.workflow import virtual_actor @@ -36,8 +36,14 @@ def init_virtual_actor(x): return x -def test_readonly_actor(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular", + [{ + "namespace": "workflow", + "num_cpus": 4 # We need more CPUs for concurrency + }], + indirect=True) +def test_readonly_actor(ray_start_regular): actor = Counter.options(actor_id="Counter").create(42) ray.get(actor.ready()) assert ray.get(actor.get.options(readonly=True).run()) == 42 @@ -54,7 +60,6 @@ def test_readonly_actor(): ray.get([readonly_actor.workload.run() for _ in range(10)]) end = time.time() assert end - start < 5 - ray.shutdown() @workflow.actor @@ -73,11 +78,16 @@ class SlowInit: self.x = state -def test_actor_ready(): - ray.init(namespace="workflow") +@pytest.mark.parametrize( + "ray_start_regular", + [{ + "namespace": "workflow", + "num_cpus": 4 # We need more CPUs, otherwise 'create()' blocks 'get()' + }], + indirect=True) +def test_actor_ready(ray_start_regular): actor = SlowInit.options(actor_id="SlowInit").create(42) with pytest.raises(virtual_actor.VirtualActorNotInitializedError): ray.get(actor.get.options(readonly=True).run()) ray.get(actor.ready()) assert ray.get(actor.get.options(readonly=True).run()) == 42 - ray.shutdown() diff --git a/python/ray/experimental/workflow/tests/workflows_to_fail.py b/python/ray/experimental/workflow/tests/workflows_to_fail.py deleted file mode 100644 index f0f0b4cc7..000000000 --- a/python/ray/experimental/workflow/tests/workflows_to_fail.py +++ /dev/null @@ -1,19 +0,0 @@ -import time -import ray -from ray.experimental import workflow - - -@workflow.step -def foo(x): - print("Executing", x) - time.sleep(1) - if x < 20: - return foo.step(x + 1) - else: - return 20 - - -if __name__ == "__main__": - ray.init(address="auto", namespace="workflow") - wf = workflow.run(foo.step(0), workflow_id="cluster_failure") - assert ray.get(wf) == 20