From 853d650e29c8ddf29321eb6bf48a8f4cffab4625 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Wed, 26 May 2021 14:48:24 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"[Object=20spilling]=20Avoid?= =?UTF-8?q?=20worker=20crash=20when=20an=20object=20is=20spille=E2=80=A6?= =?UTF-8?q?=20(#15964)"=20(#16012)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 29aa336a4de501a95cfe762bae4d94d6730525e7. --- python/ray/tests/test_object_spilling.py | 33 +++++++++++++++++++----- src/ray/core_worker/core_worker.cc | 9 ++++--- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 51bb4d0e9..dc2d01c9f 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -390,7 +390,10 @@ def test_spill_stats(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_spill_during_get(object_spilling_config, shutdown_only): +@pytest.mark.asyncio +@pytest.mark.parametrize("is_async", [False, True]) +async def test_spill_during_get(object_spilling_config, shutdown_only, + is_async): object_spilling_config, _ = object_spilling_config address = ray.init( num_cpus=4, @@ -404,20 +407,38 @@ def test_spill_during_get(object_spilling_config, shutdown_only): }, ) - @ray.remote - def f(): - return np.zeros(10 * 1024 * 1024) + if is_async: + @ray.remote + class Actor: + async def f(self): + return np.zeros(10 * 1024 * 1024) + else: + + @ray.remote + def f(): + return np.zeros(10 * 1024 * 1024) + + if is_async: + a = Actor.remote() ids = [] for i in range(10): - x = f.remote() + if is_async: + x = a.f.remote() + else: + x = f.remote() print(i, x) ids.append(x) # Concurrent gets, which require restoring from external storage, while # objects are being created. for x in ids: - print(ray.get(x).shape) + if is_async: + obj = await x + else: + obj = ray.get(x) + print(obj.shape) + del obj assert_no_thrashing(address["redis_address"]) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a9d07cfd7..42072cb6d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2862,10 +2862,11 @@ void CoreWorker::PlasmaCallback(SetResultCallback success, bool object_is_local = false; if (Contains(object_id, &object_is_local).ok() && object_is_local) { std::vector> vec; - RAY_CHECK_OK(Get(std::vector{object_id}, 0, &vec)); - RAY_CHECK(vec.size() > 0) - << "Failed to get local object but Raylet notified object is local."; - return success(vec.front(), object_id, py_future); + if (Get(std::vector{object_id}, 0, &vec).ok()) { + RAY_CHECK(vec.size() > 0) + << "Failed to get local object but Raylet notified object is local."; + return success(vec.front(), object_id, py_future); + } } // Object is not available locally. We now add the callback to listener queue.