From 5ae8d5b8af43702abb2a7922a335931a8d26995e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 4 Feb 2022 14:50:23 -0800 Subject: [PATCH] Revert "Revert "[client] Fix ray client object ref releasing in wrong context."" (#22091) Reverts ray-project/ray#22090 --- python/ray/_raylet.pxd | 2 ++ python/ray/includes/object_ref.pxi | 28 +++++++++++++++++----------- python/ray/includes/unique_ids.pxi | 26 ++++++++++++++++---------- python/ray/tests/test_client.py | 22 ++++++++++++++++++++++ python/ray/util/client/__init__.py | 3 +++ 5 files changed, 60 insertions(+), 21 deletions(-) diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 7715063ea..31389bf61 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -93,6 +93,7 @@ cdef class ObjectRef(BaseID): cdef class ClientObjectRef(ObjectRef): cdef object _mutex cdef object _id_future + cdef object _worker cdef _set_id(self, id) cdef inline _wait_for_id(self, timeout=None) @@ -107,6 +108,7 @@ cdef class ActorID(BaseID): cdef class ClientActorRef(ActorID): cdef object _mutex cdef object _id_future + cdef object _worker cdef _set_id(self, id) cdef inline _wait_for_id(self, timeout=None) diff --git a/python/ray/includes/object_ref.pxi b/python/ray/includes/object_ref.pxi index 860f2fe28..1c5cad74c 100644 --- a/python/ray/includes/object_ref.pxi +++ b/python/ray/includes/object_ref.pxi @@ -154,6 +154,7 @@ cdef class ClientObjectRef(ObjectRef): def __init__(self, id: Union[bytes, concurrent.futures.Future]): self.in_core_worker = False self._mutex = threading.Lock() + self._worker = client.ray.get_context().client_worker if isinstance(id, bytes): self._set_id(id) elif isinstance(id, concurrent.futures.Future): @@ -162,14 +163,19 @@ cdef class ClientObjectRef(ObjectRef): raise TypeError("Unexpected type for id {}".format(id)) def __dealloc__(self): - if client is None or client.ray is None: - # Similar issue as mentioned in ObjectRef.__dealloc__ above. The - # client package or client.ray object might be set - # to None when the script exits. Should be safe to skip - # call_release in this case, since the client should have already - # disconnected at this point. - return - if client.ray.is_connected(): + # This function is necessary, since within `__dealloc__`, we are not + # supposed to do any python related operations. It's for C related + # resource cleanup. + # Besides, we can't use `__del__` here, since cython won't call this + # unless it's > 3.0. + def _connected(): + try: + return client is not None and self._worker is not None \ + and self._worker.is_connected() + except Exception: + return False + + if _connected(): try: self._wait_for_id() # cython would suppress this exception as well, but it tries to @@ -182,7 +188,7 @@ cdef class ClientObjectRef(ObjectRef): "a method on the actor reference before its destructor " "is run.") if not self.data.IsNil(): - client.ray.call_release(self.id) + self._worker.call_release(self.id) cdef CObjectID native(self): self._wait_for_id() @@ -252,12 +258,12 @@ cdef class ClientObjectRef(ObjectRef): py_callback(data) - client.ray._register_callback(self, deserialize_obj) + self._worker.register_callback(self, deserialize_obj) cdef _set_id(self, id): check_id(id) self.data = CObjectID.FromBinary(id) - client.ray.call_retain(id) + self._worker.call_retain(id) cdef inline _wait_for_id(self, timeout=None): if self._id_future: diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 93822f570..d89b3f856 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -325,6 +325,7 @@ cdef class ClientActorRef(ActorID): def __init__(self, id: Union[bytes, concurrent.futures.Future]): self._mutex = threading.Lock() + self._worker = client.ray.get_context().client_worker if isinstance(id, bytes): self._set_id(id) elif isinstance(id, Future): @@ -333,13 +334,19 @@ cdef class ClientActorRef(ActorID): raise TypeError("Unexpected type for id {}".format(id)) def __dealloc__(self): - if client is None or client.ray is None: - # The client package or client.ray object might be set - # to None when the script exits. Should be safe to skip - # call_release in this case, since the client should have already - # disconnected at this point. - return - if client.ray.is_connected(): + # This function is necessary, since within `__dealloc__`, we are not + # supposed to do any python related operations. It's for C related + # resource cleanup. + # Besides, we can't use `__del__` here, since cython won't call this + # unless it's > 3.0. + def _connected(): + try: + return client is not None and \ + self._worker is not None and self._worker.is_connected() + except Exception: + return False + + if _connected(): try: self._wait_for_id() # cython would suppress this exception as well, but it tries to @@ -352,7 +359,7 @@ cdef class ClientActorRef(ActorID): "a method on the actor reference before its destructor " "is run.") if not self.data.IsNil(): - client.ray.call_release(self.id) + self._worker.call_release(self.id) def binary(self): self._wait_for_id() @@ -381,7 +388,7 @@ cdef class ClientActorRef(ActorID): cdef _set_id(self, id): check_id(id, CActorID.Size()) self.data = CActorID.FromBinary(id) - client.ray.call_retain(id) + self._worker.call_retain(id) cdef _wait_for_id(self, timeout=None): if self._id_future: @@ -390,7 +397,6 @@ cdef class ClientActorRef(ActorID): self._set_id(self._id_future.result(timeout=timeout)) self._id_future = None - cdef class FunctionID(UniqueID): def __init__(self, id): diff --git a/python/ray/tests/test_client.py b/python/ray/tests/test_client.py index d02edeed5..3a7068df2 100644 --- a/python/ray/tests/test_client.py +++ b/python/ray/tests/test_client.py @@ -758,5 +758,27 @@ def test_init_requires_no_resources(call_ray_start, use_client): ray.get(f.remote()) +@pytest.mark.parametrize( + "call_ray_start", + ["ray start --head --ray-client-server-port 25553 --num-cpus 1"], + indirect=True, +) +def test_object_ref_release(call_ray_start): + import ray + + ray.init("ray://localhost:25553") + + a = ray.put("Hello") + + ray.shutdown() + ray.init("ray://localhost:25553") + + del a + + with disable_client_hook(): + ref_cnt = ray.util.client.ray.get_context().client_worker.reference_count + assert all(v > 0 for v in ref_cnt.values()) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/client/__init__.py b/python/ray/util/client/__init__.py index 2f55135e6..12cde3b78 100644 --- a/python/ray/util/client/__init__.py +++ b/python/ray/util/client/__init__.py @@ -138,8 +138,11 @@ class _ClientContext: def disconnect(self): """Disconnect the Ray Client.""" + from ray.util.client.api import ClientAPI + if self.client_worker is not None: self.client_worker.close() + self.api = ClientAPI() self.client_worker = None # remote can be called outside of a connection, which is why it