Revert "Revert "[client] Fix ray client object ref releasing in wrong context."" (#22091)

Reverts ray-project/ray#22090
This commit is contained in:
Yi Cheng 2022-02-04 14:50:23 -08:00 committed by GitHub
parent 182dbfbfdb
commit 5ae8d5b8af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 21 deletions

View file

@ -93,6 +93,7 @@ cdef class ObjectRef(BaseID):
cdef class ClientObjectRef(ObjectRef):
cdef object _mutex
cdef object _id_future
cdef object _worker
cdef _set_id(self, id)
cdef inline _wait_for_id(self, timeout=None)
@ -107,6 +108,7 @@ cdef class ActorID(BaseID):
cdef class ClientActorRef(ActorID):
cdef object _mutex
cdef object _id_future
cdef object _worker
cdef _set_id(self, id)
cdef inline _wait_for_id(self, timeout=None)

View file

@ -154,6 +154,7 @@ cdef class ClientObjectRef(ObjectRef):
def __init__(self, id: Union[bytes, concurrent.futures.Future]):
self.in_core_worker = False
self._mutex = threading.Lock()
self._worker = client.ray.get_context().client_worker
if isinstance(id, bytes):
self._set_id(id)
elif isinstance(id, concurrent.futures.Future):
@ -162,14 +163,19 @@ cdef class ClientObjectRef(ObjectRef):
raise TypeError("Unexpected type for id {}".format(id))
def __dealloc__(self):
if client is None or client.ray is None:
# Similar issue as mentioned in ObjectRef.__dealloc__ above. The
# client package or client.ray object might be set
# to None when the script exits. Should be safe to skip
# call_release in this case, since the client should have already
# disconnected at this point.
return
if client.ray.is_connected():
# This function is necessary, since within `__dealloc__`, we are not
# supposed to do any python related operations. It's for C related
# resource cleanup.
# Besides, we can't use `__del__` here, since cython won't call this
# unless it's > 3.0.
def _connected():
try:
return client is not None and self._worker is not None \
and self._worker.is_connected()
except Exception:
return False
if _connected():
try:
self._wait_for_id()
# cython would suppress this exception as well, but it tries to
@ -182,7 +188,7 @@ cdef class ClientObjectRef(ObjectRef):
"a method on the actor reference before its destructor "
"is run.")
if not self.data.IsNil():
client.ray.call_release(self.id)
self._worker.call_release(self.id)
cdef CObjectID native(self):
self._wait_for_id()
@ -252,12 +258,12 @@ cdef class ClientObjectRef(ObjectRef):
py_callback(data)
client.ray._register_callback(self, deserialize_obj)
self._worker.register_callback(self, deserialize_obj)
cdef _set_id(self, id):
check_id(id)
self.data = CObjectID.FromBinary(<c_string>id)
client.ray.call_retain(id)
self._worker.call_retain(id)
cdef inline _wait_for_id(self, timeout=None):
if self._id_future:

View file

@ -325,6 +325,7 @@ cdef class ClientActorRef(ActorID):
def __init__(self, id: Union[bytes, concurrent.futures.Future]):
self._mutex = threading.Lock()
self._worker = client.ray.get_context().client_worker
if isinstance(id, bytes):
self._set_id(id)
elif isinstance(id, Future):
@ -333,13 +334,19 @@ cdef class ClientActorRef(ActorID):
raise TypeError("Unexpected type for id {}".format(id))
def __dealloc__(self):
if client is None or client.ray is None:
# The client package or client.ray object might be set
# to None when the script exits. Should be safe to skip
# call_release in this case, since the client should have already
# disconnected at this point.
return
if client.ray.is_connected():
# This function is necessary, since within `__dealloc__`, we are not
# supposed to do any python related operations. It's for C related
# resource cleanup.
# Besides, we can't use `__del__` here, since cython won't call this
# unless it's > 3.0.
def _connected():
try:
return client is not None and \
self._worker is not None and self._worker.is_connected()
except Exception:
return False
if _connected():
try:
self._wait_for_id()
# cython would suppress this exception as well, but it tries to
@ -352,7 +359,7 @@ cdef class ClientActorRef(ActorID):
"a method on the actor reference before its destructor "
"is run.")
if not self.data.IsNil():
client.ray.call_release(self.id)
self._worker.call_release(self.id)
def binary(self):
self._wait_for_id()
@ -381,7 +388,7 @@ cdef class ClientActorRef(ActorID):
cdef _set_id(self, id):
check_id(id, CActorID.Size())
self.data = CActorID.FromBinary(<c_string>id)
client.ray.call_retain(id)
self._worker.call_retain(id)
cdef _wait_for_id(self, timeout=None):
if self._id_future:
@ -390,7 +397,6 @@ cdef class ClientActorRef(ActorID):
self._set_id(self._id_future.result(timeout=timeout))
self._id_future = None
cdef class FunctionID(UniqueID):
def __init__(self, id):

View file

@ -758,5 +758,27 @@ def test_init_requires_no_resources(call_ray_start, use_client):
ray.get(f.remote())
@pytest.mark.parametrize(
"call_ray_start",
["ray start --head --ray-client-server-port 25553 --num-cpus 1"],
indirect=True,
)
def test_object_ref_release(call_ray_start):
import ray
ray.init("ray://localhost:25553")
a = ray.put("Hello")
ray.shutdown()
ray.init("ray://localhost:25553")
del a
with disable_client_hook():
ref_cnt = ray.util.client.ray.get_context().client_worker.reference_count
assert all(v > 0 for v in ref_cnt.values())
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -138,8 +138,11 @@ class _ClientContext:
def disconnect(self):
"""Disconnect the Ray Client."""
from ray.util.client.api import ClientAPI
if self.client_worker is not None:
self.client_worker.close()
self.api = ClientAPI()
self.client_worker = None
# remote can be called outside of a connection, which is why it