mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
run_function_on_all_workers
only once in the driver (#15203)
This commit is contained in:
parent
eaa3ce3f40
commit
4de1f35b3e
2 changed files with 9 additions and 4 deletions
|
@ -159,10 +159,10 @@ class ImportThread:
|
|||
run_on_other_drivers) = self.redis_client.hmget(
|
||||
key, ["job_id", "function", "run_on_other_drivers"])
|
||||
|
||||
if (ray._private.utils.decode(run_on_other_drivers) == "False"
|
||||
and self.worker.mode == ray.SCRIPT_MODE
|
||||
and job_id != self.worker.current_job_id.binary()):
|
||||
return
|
||||
if self.worker.mode == ray.SCRIPT_MODE:
|
||||
if (run_on_other_drivers == b"False"
|
||||
or job_id == self.worker.current_job_id.binary()):
|
||||
return
|
||||
|
||||
try:
|
||||
# FunctionActorManager may call pickle.loads at the same time.
|
||||
|
|
|
@ -145,6 +145,11 @@ def test_running_function_on_all_workers(ray_start_regular):
|
|||
|
||||
assert "fake_directory" == ray.get(get_path1.remote())[-1]
|
||||
|
||||
# the function should only run on the current driver once.
|
||||
assert sys.path[-1] == "fake_directory"
|
||||
if len(sys.path) > 1:
|
||||
assert sys.path[-2] != "fake_directory"
|
||||
|
||||
def f(worker_info):
|
||||
sys.path.pop(-1)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue