Revert "Revert "[Object spilling] Avoid worker crash when an object is spille… (#15964)" (#16012)

This reverts commit 29aa336a4d.
This commit is contained in:
Kai Yang 2021-05-26 14:48:24 +08:00 committed by GitHub
parent 3dbdd4eb46
commit 853d650e29
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 10 deletions

View file

@ -390,7 +390,10 @@ def test_spill_stats(object_spilling_config, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_during_get(object_spilling_config, shutdown_only):
@pytest.mark.asyncio
@pytest.mark.parametrize("is_async", [False, True])
async def test_spill_during_get(object_spilling_config, shutdown_only,
is_async):
object_spilling_config, _ = object_spilling_config
address = ray.init(
num_cpus=4,
@ -404,20 +407,38 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
},
)
@ray.remote
def f():
return np.zeros(10 * 1024 * 1024)
if is_async:
@ray.remote
class Actor:
async def f(self):
return np.zeros(10 * 1024 * 1024)
else:
@ray.remote
def f():
return np.zeros(10 * 1024 * 1024)
if is_async:
a = Actor.remote()
ids = []
for i in range(10):
x = f.remote()
if is_async:
x = a.f.remote()
else:
x = f.remote()
print(i, x)
ids.append(x)
# Concurrent gets, which require restoring from external storage, while
# objects are being created.
for x in ids:
print(ray.get(x).shape)
if is_async:
obj = await x
else:
obj = ray.get(x)
print(obj.shape)
del obj
assert_no_thrashing(address["redis_address"])

View file

@ -2862,10 +2862,11 @@ void CoreWorker::PlasmaCallback(SetResultCallback success,
bool object_is_local = false;
if (Contains(object_id, &object_is_local).ok() && object_is_local) {
std::vector<std::shared_ptr<RayObject>> vec;
RAY_CHECK_OK(Get(std::vector<ObjectID>{object_id}, 0, &vec));
RAY_CHECK(vec.size() > 0)
<< "Failed to get local object but Raylet notified object is local.";
return success(vec.front(), object_id, py_future);
if (Get(std::vector<ObjectID>{object_id}, 0, &vec).ok()) {
RAY_CHECK(vec.size() > 0)
<< "Failed to get local object but Raylet notified object is local.";
return success(vec.front(), object_id, py_future);
}
}
// Object is not available locally. We now add the callback to listener queue.