mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[core] Use function_actor_manager.lock when deserializing (#16278)
* use function_actor_manager.lock when deserializing * add comment and todo * better comment * fix comment
This commit is contained in:
parent
c2a2a6f7c3
commit
c8e3ed9eec
1 changed files with 9 additions and 4 deletions
|
@ -289,16 +289,21 @@ class Worker:
|
|||
serialized_value, object_ref=object_ref))
|
||||
|
||||
def raise_errors(self, data_metadata_pairs, object_refs):
|
||||
context = self.get_serialization_context()
|
||||
out = context.deserialize_objects(data_metadata_pairs, object_refs)
|
||||
out = self.deserialize_objects(data_metadata_pairs, object_refs)
|
||||
if "RAY_IGNORE_UNHANDLED_ERRORS" in os.environ:
|
||||
return
|
||||
for e in out:
|
||||
_unhandled_error_handler(e)
|
||||
|
||||
def deserialize_objects(self, data_metadata_pairs, object_refs):
|
||||
context = self.get_serialization_context()
|
||||
return context.deserialize_objects(data_metadata_pairs, object_refs)
|
||||
# Function actor manager or the import thread may call pickle.loads
|
||||
# at the same time which can lead to failed imports
|
||||
# TODO: We may be better off locking on all imports or injecting a lock
|
||||
# into pickle.loads (https://github.com/ray-project/ray/issues/16304)
|
||||
with self.function_actor_manager.lock:
|
||||
context = self.get_serialization_context()
|
||||
return context.deserialize_objects(data_metadata_pairs,
|
||||
object_refs)
|
||||
|
||||
def get_objects(self, object_refs, timeout=None):
|
||||
"""Get the values in the object store associated with the IDs.
|
||||
|
|
Loading…
Add table
Reference in a new issue