[Workflow] Update tests (#17147)

* update workflow tests

* use conftest

* use conftest

* use conftest
Siyuan (Ryans) Zhuang 2021-07-16 09:25:40 -07:00 committed by GitHub
parent 40f1ee6e1b
commit fd3742bb63
11 changed files with 176 additions and 157 deletions
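
The recurring change across these files is dropping per-test `ray.init(namespace="workflow")` / `ray.shutdown()` pairs in favor of the shared cluster fixtures from `ray.tests.conftest`, parametrized indirectly so each test can pass its own `ray.init` arguments. Below is a minimal sketch of how an indirectly parametrized fixture of this kind works; it is a generic pytest illustration shaped roughly like `ray_start_regular`, not the actual fixture in `ray/tests/conftest.py`, which does more setup and teardown.

import pytest
import ray


@pytest.fixture
def ray_start_regular(request):
    # With indirect=True, the dict from @pytest.mark.parametrize
    # (e.g. {"namespace": "workflow"}) arrives here as request.param
    # and is forwarded to ray.init().
    init_kwargs = getattr(request, "param", {})
    info = ray.init(**init_kwargs)
    yield info
    # Teardown replaces the explicit ray.shutdown() calls the tests used to make.
    ray.shutdown()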


@@ -1,21 +0,0 @@
-import sys
-import time
-import ray
-from ray.experimental import workflow
-
-
-@workflow.step
-def foo(x):
-    print("Executing", x)
-    time.sleep(1)
-    if x < 20:
-        return foo.step(x + 1)
-    else:
-        return 20
-
-
-if __name__ == "__main__":
-    sleep_duration = float(sys.argv[1])
-    ray.init(address="auto", namespace="workflow")
-    wf = workflow.run(foo.step(0), workflow_id="driver_terminated")
-    time.sleep(sleep_duration)


@@ -1,7 +1,8 @@
 import time
+from ray.tests.conftest import *  # noqa
 import pytest
 import ray
 from ray.experimental import workflow
 from ray.experimental.workflow import workflow_access
@@ -78,9 +79,24 @@ def blocking():
     return 314


-def test_basic_workflows():
-    ray.init(namespace="workflow")
+@workflow.step
+def mul(a, b):
+    return a * b
+
+
+@workflow.step
+def factorial(n):
+    if n == 1:
+        return 1
+    else:
+        return mul.step(n, factorial.step(n - 1))
+
+
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_basic_workflows(ray_start_regular_shared):
     output = workflow.run(simple_sequential.step())
     assert ray.get(output) == "[source1][append1][append2]"
@@ -96,20 +112,21 @@ def test_basic_workflows():
     output = workflow.run(fork_join.step())
     assert ray.get(output) == "join([source1][append1], [source1][append2])"
-    ray.shutdown()
+    outputs = workflow.run(factorial.step(10))
+    assert ray.get(outputs) == 3628800


-def test_async_execution():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_async_execution(ray_start_regular_shared):
     start = time.time()
     output = workflow.run(blocking.step())
     duration = time.time() - start
     assert duration < 5  # workflow.run is not blocked
     assert ray.get(output) == 314
-    ray.shutdown()


 @ray.remote
 def deep_nested(x):
@@ -124,8 +141,11 @@ def _resolve_workflow_output(workflow_id: str, output: ray.ObjectRef):
     return output


-def test_workflow_output_resolving():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_workflow_output_resolving(ray_start_regular_shared):
     # deep nested workflow
     nested_ref = deep_nested.remote(30)
     original_func = workflow_access._resolve_workflow_output
@@ -139,11 +159,13 @@ def test_workflow_output_resolving():
     # restore the function
     workflow_access._resolve_workflow_output = original_func
     assert ray.get(ref) == 42
-    ray.shutdown()


-def test_run_or_resume_during_running():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_run_or_resume_during_running(ray_start_regular_shared):
     output = workflow.run(
         simple_sequential.step(), workflow_id="running_workflow")
@@ -153,4 +175,3 @@ def test_run_or_resume_during_running():
     workflow.resume(workflow_id="running_workflow")
     assert ray.get(output) == "[source1][append1][append2]"
-    ray.shutdown()


@@ -1,5 +1,9 @@
 import time
+from ray.tests.conftest import *  # noqa
+import ray
+import pytest
 import numpy as np
 from ray.experimental import workflow
@@ -26,14 +30,14 @@ def simple_large_intermediate():
     return average.step(y)


-def test_simple_large_intermediate():
-    import ray
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_simple_large_intermediate(ray_start_regular_shared):
     start = time.time()
     outputs = workflow.run(simple_large_intermediate.step())
     outputs = ray.get(outputs)
     print(f"duration = {time.time() - start}")
     assert np.isclose(outputs, 8388607.5)
-    ray.shutdown()


@@ -1,32 +1,47 @@
-import os
-import subprocess
-import sys
-import time
-
-import pytest
-
-import ray
-from ray.experimental import workflow
-
-
-@pytest.mark.skip(reason="Blocked by issue #16951, where the exiting of "
-                  "driver kills the task launched by a detached "
-                  "named actor.")
-def test_workflow_lifetime():
-    subprocess.run(["ray start --head"], shell=True)
-    time.sleep(1)
-    script = os.path.join(
-        os.path.abspath(os.path.dirname(__file__)), "driver_terminated.py")
-    sleep_duration = 5
-    proc = subprocess.Popen([sys.executable, script, str(sleep_duration)])
-    # TODO(suquark): also test killing the driver after fixing
-    # https://github.com/ray-project/ray/issues/16951
-    # now we only let the driver exit normally
-    proc.wait()
-    time.sleep(1)
-    # connect to the cluster
-    ray.init(address="auto", namespace="workflow")
-    output = workflow.get_output("driver_terminated")
-    assert ray.get(output) == 20
-    ray.shutdown()
-    subprocess.run(["ray stop"], shell=True)
+import ray
+import time
+from ray.test_utils import (run_string_as_driver_nonblocking,
+                            run_string_as_driver)
+from ray.tests.conftest import *  # noqa
+from ray.experimental import workflow
+
+driver_script = """
+import time
+import ray
+from ray.experimental import workflow
+
+
+@workflow.step
+def foo(x):
+    time.sleep(1)
+    if x < 20:
+        return foo.step(x + 1)
+    else:
+        return 20
+
+
+if __name__ == "__main__":
+    ray.init(address="auto", namespace="workflow")
+    wf = workflow.run(foo.step(0), workflow_id="driver_terminated")
+    time.sleep({})
+"""
+
+
+def test_workflow_lifetime_1(call_ray_start):
+    # Case 1: driver exits normally
+    run_string_as_driver(driver_script.format(5))
+    ray.init(address="auto", namespace="workflow")
+    output = workflow.get_output("driver_terminated")
+    assert ray.get(output) == 20
+
+
+def test_workflow_lifetime_2(call_ray_start):
+    # Case 2: driver terminated
+    proc = run_string_as_driver_nonblocking(driver_script.format(100))
+    time.sleep(10)
+    proc.kill()
+    time.sleep(1)
+    ray.init(address="auto", namespace="workflow")
+    output = workflow.get_output("driver_terminated")
+    assert ray.get(output) == 20
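
Both lifetime cases launch the workflow from a separate driver process built out of the `driver_script` string, using the `run_string_as_driver` helpers imported from `ray.test_utils`. A rough sketch of what such a helper can look like is below; this is an assumption for illustration only, not the actual `ray.test_utils` implementation, which also captures the driver's output.

import subprocess
import sys


def run_string_as_driver_nonblocking(script):
    # Start the script in a fresh Python interpreter and hand back the
    # process so the caller can wait for a normal exit or kill it mid-run.
    return subprocess.Popen([sys.executable, "-c", script],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)


def run_string_as_driver(script):
    # Blocking variant: fail loudly if the driver exits with a non-zero code.
    proc = run_string_as_driver_nonblocking(script)
    proc.wait()
    if proc.returncode != 0:
        raise RuntimeError("driver exited with code %d" % proc.returncode)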


@@ -1,5 +1,7 @@
 from typing import List, Dict
+from ray.tests.conftest import *  # noqa
 import pytest
 import numpy as np
@@ -68,19 +70,22 @@ def receive_data(data: np.ndarray):
 # TODO(suquark): Support ObjectRef checkpointing.


-def test_objectref_inputs_exception():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_objectref_inputs_exception(ray_start_regular_shared):
     with pytest.raises(ValueError):
         output = workflow.run(receive_data.step(ray.put([42])))
         assert ray.get(output)
-    ray.shutdown()


 @pytest.mark.skip(reason="no support for ObjectRef checkpointing yet")
-def test_objectref_inputs():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_objectref_inputs(ray_start_regular_shared):
     output = workflow.run(
         deref_check.step(
             ray.put(42), nested_ref.remote(), [nested_ref.remote()],
@@ -88,12 +93,13 @@ def test_objectref_inputs():
             "output": nested_workflow.step(7)
         }]))
     assert ray.get(output)
-    ray.shutdown()


-def test_object_deref():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_object_deref(ray_start_regular_shared):
     x = empty_list.step()
     output = workflow.run(deref_shared.step(x, x))
     assert ray.get(output)
@@ -111,5 +117,3 @@ def test_object_deref():
     obj = return_data.step()
     arr: np.ndarray = ray.get(workflow.run(receive_data.step(obj)))
     assert np.array_equal(arr, np.ones(4096))
-    ray.shutdown()


@@ -1,10 +1,11 @@
-import os
 import subprocess
-import sys
 import time
+from ray.tests.conftest import *  # noqa
 import pytest
 import ray
+from ray.test_utils import run_string_as_driver_nonblocking
 from ray.exceptions import RaySystemError, RayTaskError
 from ray.experimental import workflow
 from ray.experimental.workflow.tests import utils
@@ -58,8 +59,11 @@ def simple(x):
     return z


-def test_recovery_simple():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_recovery_simple(ray_start_regular):
     utils.unset_global_mark()
     workflow_id = "test_recovery_simple"
     with pytest.raises(RaySystemError):
@@ -73,11 +77,13 @@ def test_recovery_simple():
     # resume from workflow output checkpoint
     output = workflow.resume(workflow_id)
     assert ray.get(output) == "foo(x[append1])[append2]"
-    ray.shutdown()


-def test_recovery_complex():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_recovery_complex(ray_start_regular):
     utils.unset_global_mark()
     workflow_id = "test_recovery_complex"
     with pytest.raises(RaySystemError):
@@ -93,22 +99,44 @@ def test_recovery_complex():
     output = workflow.resume(workflow_id)
     r = "join(join(foo(x[append1]), [source1][append2]), join(x, [source1]))"
     assert ray.get(output) == r
-    ray.shutdown()


-def test_recovery_non_exists_workflow():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_recovery_non_exists_workflow(ray_start_regular):
     with pytest.raises(RayTaskError):
         ray.get(workflow.resume("this_workflow_id_does_not_exist"))
-    ray.shutdown()
+
+
+driver_script = """
+import time
+import ray
+from ray.experimental import workflow
+
+
+@workflow.step
+def foo(x):
+    print("Executing", x)
+    time.sleep(1)
+    if x < 20:
+        return foo.step(x + 1)
+    else:
+        return 20
+
+
+if __name__ == "__main__":
+    ray.init(address="auto", namespace="workflow")
+    wf = workflow.run(foo.step(0), workflow_id="cluster_failure")
+    assert ray.get(wf) == 20
+"""


 def test_recovery_cluster_failure():
     subprocess.run(["ray start --head"], shell=True)
     time.sleep(1)
-    script = os.path.join(
-        os.path.abspath(os.path.dirname(__file__)), "workflows_to_fail.py")
-    proc = subprocess.Popen([sys.executable, script])
+    proc = run_string_as_driver_nonblocking(driver_script)
     time.sleep(10)
     subprocess.run(["ray stop"], shell=True)
     proc.kill()
@@ -126,12 +154,14 @@ def recursive_chain(x):
         return 100


-def test_shortcut():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_shortcut(ray_start_regular):
     output = workflow.run(recursive_chain.step(0), workflow_id="shortcut")
     assert ray.get(output) == 100
     # the shortcut points to the step with output checkpoint
     store = workflow_storage.WorkflowStorage("shortcut")
     step_id = store.get_entrypoint_step_id()
     assert store.inspect_step(step_id).output_object_valid
-    ray.shutdown()


@@ -1,28 +0,0 @@
-from ray.experimental import workflow
-
-
-@workflow.step
-def mul(a, b):
-    return a * b
-
-
-@workflow.step
-def factorial(n):
-    if n == 1:
-        return 1
-    else:
-        return mul.step(n, factorial.step(n - 1))
-
-
-@workflow.step
-def recursion_factorial(n):
-    return factorial.step(n)
-
-
-def test_recursion_factorial():
-    import ray
-    ray.init(namespace="workflow")
-    outputs = workflow.run(recursion_factorial.step(10))
-    assert ray.get(outputs) == 3628800
-    ray.shutdown()


@@ -1,6 +1,6 @@
 import pytest
-import ray
+from ray.tests.conftest import *  # noqa
 from ray.experimental import workflow
@@ -9,9 +9,11 @@ def signature_check(a, b, c=1):
     pass


-def test_signature_check():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_signature_check(ray_start_regular):
     with pytest.raises(TypeError):
         signature_check(1, 2)
@@ -31,5 +33,3 @@ def test_signature_check():
     signature_check.step(1, 2, c=3)
     signature_check.step(1, b=2, c=3)
     signature_check.step(a=1, b=2, c=3)
-    ray.shutdown()


@@ -1,3 +1,6 @@
+import ray
+import pytest
+from ray.tests.conftest import *  # noqa
 from ray.experimental import workflow
@@ -20,10 +23,10 @@ def variable_mutable():
     return projection.step(a, b)


-def test_variable_mutable():
-    import ray
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_variable_mutable(ray_start_regular):
     outputs = workflow.run(variable_mutable.step())
     assert ray.get(outputs) == []
-    ray.shutdown()


@@ -1,9 +1,9 @@
 import time
 import pytest
 import ray
+from ray.tests.conftest import *  # noqa
 from ray.experimental import workflow
 from ray.experimental.workflow import virtual_actor
@@ -36,8 +36,14 @@ def init_virtual_actor(x):
     return x


-def test_readonly_actor():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular",
+    [{
+        "namespace": "workflow",
+        "num_cpus": 4  # We need more CPUs for concurrency
+    }],
+    indirect=True)
+def test_readonly_actor(ray_start_regular):
     actor = Counter.options(actor_id="Counter").create(42)
     ray.get(actor.ready())
     assert ray.get(actor.get.options(readonly=True).run()) == 42
@@ -54,7 +60,6 @@ def test_readonly_actor():
     ray.get([readonly_actor.workload.run() for _ in range(10)])
     end = time.time()
     assert end - start < 5
-    ray.shutdown()


 @workflow.actor
@@ -73,11 +78,16 @@ class SlowInit:
         self.x = state


-def test_actor_ready():
-    ray.init(namespace="workflow")
+@pytest.mark.parametrize(
+    "ray_start_regular",
+    [{
+        "namespace": "workflow",
+        "num_cpus": 4  # We need more CPUs, otherwise 'create()' blocks 'get()'
+    }],
+    indirect=True)
+def test_actor_ready(ray_start_regular):
     actor = SlowInit.options(actor_id="SlowInit").create(42)
     with pytest.raises(virtual_actor.VirtualActorNotInitializedError):
         ray.get(actor.get.options(readonly=True).run())
     ray.get(actor.ready())
     assert ray.get(actor.get.options(readonly=True).run()) == 42
-    ray.shutdown()


@@ -1,19 +0,0 @@
-import time
-import ray
-from ray.experimental import workflow
-
-
-@workflow.step
-def foo(x):
-    print("Executing", x)
-    time.sleep(1)
-    if x < 20:
-        return foo.step(x + 1)
-    else:
-        return 20
-
-
-if __name__ == "__main__":
-    ray.init(address="auto", namespace="workflow")
-    wf = workflow.run(foo.step(0), workflow_id="cluster_failure")
-    assert ray.get(wf) == 20