diff --git a/.bazelrc b/.bazelrc index 6c70b2e95..0897dc6a0 100644 --- a/.bazelrc +++ b/.bazelrc @@ -190,3 +190,5 @@ build:ubsan --linkopt -fno-sanitize-recover=all # Import local specific llvm config options, which can be generated by # ci/travis/install-llvm-dependencies.sh try-import %workspace%/.llvm-local.bazelrc + +test:ci --test_env=RAY_lineage_pinning_enabled=1 diff --git a/python/ray/workflow/tests/test_basic_workflows_2.py b/python/ray/workflow/tests/test_basic_workflows_2.py index 3bd71edb8..214368614 100644 --- a/python/ray/workflow/tests/test_basic_workflows_2.py +++ b/python/ray/workflow/tests/test_basic_workflows_2.py @@ -3,7 +3,6 @@ import pytest import ray import re from filelock import FileLock -from pathlib import Path from ray._private.test_utils import run_string_as_driver, SignalActor from ray import workflow from ray.tests.conftest import * # noqa @@ -268,93 +267,6 @@ def test_no_init(shutdown_only): workflow.get_actor("wf") -def test_wf_run(workflow_start_regular, tmp_path): - counter = tmp_path / "counter" - counter.write_text("0") - - @workflow.step - def f(): - v = int(counter.read_text()) + 1 - counter.write_text(str(v)) - - f.step().run("abc") - assert counter.read_text() == "1" - # This will not rerun the job from beginning - f.step().run("abc") - assert counter.read_text() == "1" - - -def test_wf_no_run(): - @workflow.step - def f1(): - pass - - f1.step() - - @workflow.step - def f2(*w): - pass - - f = f2.step(*[f1.step() for _ in range(10)]) - - with pytest.raises(Exception): - f.run() - - -def test_dedupe_indirect(workflow_start_regular, tmp_path): - counter = Path(tmp_path) / "counter.txt" - lock = Path(tmp_path) / "lock.txt" - counter.write_text("0") - - @workflow.step - def incr(): - with FileLock(str(lock)): - c = int(counter.read_text()) - c += 1 - counter.write_text(f"{c}") - - @workflow.step - def identity(a): - return a - - @workflow.step - def join(*a): - return counter.read_text() - - # Here a is passed to two steps and we 
need to ensure - # it's only executed once - a = incr.step() - i1 = identity.step(a) - i2 = identity.step(a) - assert "1" == join.step(i1, i2).run() - assert "2" == join.step(i1, i2).run() - # pass a multiple times - assert "3" == join.step(a, a, a, a).run() - assert "4" == join.step(a, a, a, a).run() - - -def test_run_off_main_thread(workflow_start_regular): - @workflow.step - def fake_data(num: int): - return list(range(num)) - - succ = False - - # Start new thread here ⚠️ - def run(): - global succ - # Setup the workflow. - data = fake_data.step(10) - assert data.run(workflow_id="run") == list(range(10)) - - import threading - - t = threading.Thread(target=run) - t.start() - t.join() - assert workflow.get_status("run") == workflow.SUCCESSFUL - - if __name__ == "__main__": import sys diff --git a/python/ray/workflow/tests/test_basic_workflows_3.py b/python/ray/workflow/tests/test_basic_workflows_3.py new file mode 100644 index 000000000..d1f6303ce --- /dev/null +++ b/python/ray/workflow/tests/test_basic_workflows_3.py @@ -0,0 +1,98 @@ +import pytest +from filelock import FileLock +from pathlib import Path +from ray import workflow +from ray.tests.conftest import * # noqa + + +def test_wf_run(workflow_start_regular, tmp_path): + counter = tmp_path / "counter" + counter.write_text("0") + + @workflow.step + def f(): + v = int(counter.read_text()) + 1 + counter.write_text(str(v)) + + f.step().run("abc") + assert counter.read_text() == "1" + # This will not rerun the job from beginning + f.step().run("abc") + assert counter.read_text() == "1" + + +def test_wf_no_run(): + @workflow.step + def f1(): + pass + + f1.step() + + @workflow.step + def f2(*w): + pass + + f = f2.step(*[f1.step() for _ in range(10)]) + + with pytest.raises(Exception): + f.run() + + +def test_dedupe_indirect(workflow_start_regular, tmp_path): + counter = Path(tmp_path) / "counter.txt" + lock = Path(tmp_path) / "lock.txt" + counter.write_text("0") + + @workflow.step + def incr(): + with 
FileLock(str(lock)): + c = int(counter.read_text()) + c += 1 + counter.write_text(f"{c}") + + @workflow.step + def identity(a): + return a + + @workflow.step + def join(*a): + return counter.read_text() + + # Here a is passed to two steps and we need to ensure + # it's only executed once + a = incr.step() + i1 = identity.step(a) + i2 = identity.step(a) + assert "1" == join.step(i1, i2).run() + assert "2" == join.step(i1, i2).run() + # pass a multiple times + assert "3" == join.step(a, a, a, a).run() + assert "4" == join.step(a, a, a, a).run() + + +def test_run_off_main_thread(workflow_start_regular): + @workflow.step + def fake_data(num: int): + return list(range(num)) + + succ = False + + # Start new thread here ⚠️ + def run(): + global succ + # Setup the workflow. + data = fake_data.step(10) + assert data.run(workflow_id="run") == list(range(10)) + + import threading + + t = threading.Thread(target=run) + t.start() + t.join() + assert workflow.get_status("run") == workflow.SUCCESSFUL + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/workflow/tests/test_virtual_actor_2.py b/python/ray/workflow/tests/test_virtual_actor_2.py index fa7b92e95..d63b4ec12 100644 --- a/python/ray/workflow/tests/test_virtual_actor_2.py +++ b/python/ray/workflow/tests/test_virtual_actor_2.py @@ -185,86 +185,6 @@ def test_wf_in_actor(workflow_start_regular, tmp_path): assert ray.get(ret) == "UP" -@pytest.mark.parametrize( - "workflow_start_regular", - [ - { - "num_cpus": 4 - # We need more CPUs, otherwise 'create()' blocks 'get()' - } - ], - indirect=True, -) -@pytest.mark.repeat(5) -def test_wf_in_actor_chain(workflow_start_regular, tmp_path): - file_lock = [str(tmp_path / str(i)) for i in range(5)] - fail_flag = tmp_path / "fail" - - @workflow.virtual_actor - class Counter: - def __init__(self): - self._counter = 0 - - def incr(self, n): - with FileLock(file_lock[n]): - self._counter += 1 - if fail_flag.exists(): - raise 
Exception() - - if n == 0: - return self._counter - else: - return self.incr.step(n - 1) - - @workflow.virtual_actor.readonly - def val(self): - return self._counter - - def __getstate__(self): - return self._counter - - def __setstate__(self, v): - self._counter = v - - locks = [FileLock(f) for f in file_lock] - for lock in locks: - lock.acquire() - - c = Counter.get_or_create("counter") - ray.get(c.ready()) - final_ret = c.incr.run_async(len(file_lock) - 1) - for i in range(0, len(file_lock) - 2): - locks[-i - 1].release() - val = c.val.run() - for _ in range(0, 60): - if val == i + 1: - break - val = c.val.run() - time.sleep(1) - assert val == i + 1 - - fail_flag.touch() - locks[1 - len(file_lock)].release() - # Fail the pipeline - with pytest.raises(Exception): - ray.get(final_ret) - - fail_flag.unlink() - workflow.resume("counter") - # After resume, it'll start form the place where it failed - for i in range(len(file_lock) - 1, len(file_lock)): - locks[-i - 1].release() - val = c.val.run() - for _ in range(0, 60): - if val == i + 1: - break - val = c.val.run() - time.sleep(1) - assert val == i + 1 - - assert c.val.run() == 5 - - @pytest.mark.parametrize( "workflow_start_regular", [ diff --git a/python/ray/workflow/tests/test_virtual_actor_3.py b/python/ray/workflow/tests/test_virtual_actor_3.py index 9dfc622fa..8242ee398 100644 --- a/python/ray/workflow/tests/test_virtual_actor_3.py +++ b/python/ray/workflow/tests/test_virtual_actor_3.py @@ -1,8 +1,11 @@ import pytest +from filelock import FileLock +import time +from typing import Optional, Dict, Tuple, List import ray from ray import workflow -from typing import Optional, Dict, Tuple, List +from ray.tests.conftest import * # noqa @ray.workflow.virtual_actor @@ -123,6 +126,86 @@ def test_writer_actor_pressure_test(workflow_start_regular): assert user.goods_value.run() == (5, []) +@pytest.mark.parametrize( + "workflow_start_regular", + [ + { + "num_cpus": 4 + # We need more CPUs, otherwise 'create()' blocks 
'get()' + } + ], + indirect=True, +) +@pytest.mark.repeat(5) +def test_wf_in_actor_chain(workflow_start_regular, tmp_path): + file_lock = [str(tmp_path / str(i)) for i in range(5)] + fail_flag = tmp_path / "fail" + + @workflow.virtual_actor + class Counter: + def __init__(self): + self._counter = 0 + + def incr(self, n): + with FileLock(file_lock[n]): + self._counter += 1 + if fail_flag.exists(): + raise Exception() + + if n == 0: + return self._counter + else: + return self.incr.step(n - 1) + + @workflow.virtual_actor.readonly + def val(self): + return self._counter + + def __getstate__(self): + return self._counter + + def __setstate__(self, v): + self._counter = v + + locks = [FileLock(f) for f in file_lock] + for lock in locks: + lock.acquire() + + c = Counter.get_or_create("counter") + ray.get(c.ready()) + final_ret = c.incr.run_async(len(file_lock) - 1) + for i in range(0, len(file_lock) - 2): + locks[-i - 1].release() + val = c.val.run() + for _ in range(0, 60): + if val == i + 1: + break + val = c.val.run() + time.sleep(1) + assert val == i + 1 + + fail_flag.touch() + locks[1 - len(file_lock)].release() + # Fail the pipeline + with pytest.raises(Exception): + ray.get(final_ret) + + fail_flag.unlink() + workflow.resume("counter") + # After resume, it'll start from the place where it failed + for i in range(len(file_lock) - 1, len(file_lock)): + locks[-i - 1].release() + val = c.val.run() + for _ in range(0, 60): + if val == i + 1: + break + val = c.val.run() + time.sleep(1) + assert val == i + 1 + + assert c.val.run() == 5 + + if __name__ == "__main__": import sys diff --git a/release/e2e.py b/release/e2e.py index 5c9aabe64..b5e5e2e5e 100644 --- a/release/e2e.py +++ b/release/e2e.py @@ -1485,12 +1485,16 @@ def run_test_config( if state_json is None: state_json = "/tmp/release_test_state.json" + custom_env_vars = { + "RAY_lineage_pinning_enabled": "1", + } env_vars = { "RAY_ADDRESS": os.environ.get("RAY_ADDRESS", "auto"), "TEST_OUTPUT_JSON": results_json, 
"TEST_STATE_JSON": state_json, "IS_SMOKE_TEST": "1" if smoke_test else "0", } + env_vars.update(custom_env_vars) with open(os.path.join(local_dir, ".anyscale.yaml"), "wt") as f: f.write(f"project_id: {project_id}")