mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Remove unused function definition caching (#6042)
This commit is contained in:
parent
56f3e96887
commit
e9e78871b9
2 changed files with 7 additions and 50 deletions
|
@ -315,46 +315,17 @@ class FunctionActorManager(object):
|
|||
job_id = ray.JobID.nil()
|
||||
return self._num_task_executions[job_id][function_id]
|
||||
|
||||
def export_cached(self):
|
||||
"""Export cached remote functions
|
||||
|
||||
Note: this should be called only once when worker is connected.
|
||||
"""
|
||||
for remote_function in self._functions_to_export:
|
||||
self._do_export(remote_function)
|
||||
self._functions_to_export = None
|
||||
for info in self._actors_to_export:
|
||||
(key, actor_class_info) = info
|
||||
self._publish_actor_class_to_key(key, actor_class_info)
|
||||
|
||||
def reset_cache(self):
|
||||
self._functions_to_export = []
|
||||
self._actors_to_export = []
|
||||
|
||||
def export(self, remote_function):
|
||||
"""Export a remote function.
|
||||
|
||||
Args:
|
||||
remote_function: the RemoteFunction object.
|
||||
"""
|
||||
if self._worker.mode is None:
|
||||
# If the worker isn't connected, cache the function
|
||||
# and export it later.
|
||||
self._functions_to_export.append(remote_function)
|
||||
return
|
||||
if self._worker.mode == ray.worker.LOCAL_MODE:
|
||||
# Don't need to export if the worker is not a driver.
|
||||
return
|
||||
self._do_export(remote_function)
|
||||
|
||||
def _do_export(self, remote_function):
|
||||
"""Pickle a remote function and export it to redis.
|
||||
|
||||
Args:
|
||||
remote_function: the RemoteFunction object.
|
||||
"""
|
||||
if self._worker.mode == ray.worker.LOCAL_MODE:
|
||||
return
|
||||
if self._worker.load_code_from_local:
|
||||
return
|
||||
|
||||
function = remote_function._function
|
||||
pickled_function = pickle.dumps(function)
|
||||
|
||||
|
@ -585,21 +556,10 @@ class FunctionActorManager(object):
|
|||
actor_class_info["class_name"], "actor",
|
||||
self._worker)
|
||||
|
||||
if self._worker.mode is None:
|
||||
# This means that 'ray.init()' has not been called yet and so we
|
||||
# must cache the actor class definition and export it when
|
||||
# 'ray.init()' is called.
|
||||
assert self._actors_to_export is not None
|
||||
self._actors_to_export.append((key, actor_class_info))
|
||||
# This caching code path is currently not used because we only
|
||||
# export actor class definitions lazily when we instantiate the
|
||||
# actor for the first time.
|
||||
assert False, "This should be unreachable."
|
||||
else:
|
||||
self._publish_actor_class_to_key(key, actor_class_info)
|
||||
# TODO(rkn): Currently we allow actor classes to be defined
|
||||
# within tasks. I tried to disable this, but it may be necessary
|
||||
# because of https://github.com/ray-project/ray/issues/1146.
|
||||
self._publish_actor_class_to_key(key, actor_class_info)
|
||||
# TODO(rkn): Currently we allow actor classes to be defined
|
||||
# within tasks. I tried to disable this, but it may be necessary
|
||||
# because of https://github.com/ray-project/ray/issues/1146.
|
||||
|
||||
def load_actor_class(self, job_id, function_descriptor):
|
||||
"""Load the actor class.
|
||||
|
|
|
@ -1578,8 +1578,6 @@ def connect(node,
|
|||
# Export cached functions_to_run.
|
||||
for function in worker.cached_functions_to_run:
|
||||
worker.run_function_on_all_workers(function)
|
||||
# Export cached remote functions and actors to the workers.
|
||||
worker.function_actor_manager.export_cached()
|
||||
worker.cached_functions_to_run = None
|
||||
|
||||
|
||||
|
@ -1608,7 +1606,6 @@ def disconnect(exiting_interpreter=False):
|
|||
|
||||
worker.node = None # Disconnect the worker from the node.
|
||||
worker.cached_functions_to_run = []
|
||||
worker.function_actor_manager.reset_cache()
|
||||
worker.serialization_context_map.clear()
|
||||
|
||||
# We need to destruct the core worker here because after this function,
|
||||
|
|
Loading…
Add table
Reference in a new issue