[function table] Make sure FunctionsToRun are executed properly on all workers (#21867)

This PR fix the issue that sometimes FunctionsToRun is not executed. We isolated the Functions/Actors in function table, but not the RunctionsToRun. So when doing importing, sometimes, some functions will be missed.
This PR fixed this.
This commit is contained in:
Yi Cheng 2022-01-26 21:58:43 -08:00 committed by GitHub
parent 3560211ab5
commit e6bbafc17a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 33 deletions

View file

@ -6,6 +6,7 @@ import json
import logging
import sys
import time
from typing import Optional
import threading
import traceback
from collections import (
@ -37,13 +38,22 @@ FunctionExecutionInfo = namedtuple("FunctionExecutionInfo",
logger = logging.getLogger(__name__)
def make_function_table_key(key_type: bytes, job_id: JobID,
key: Optional[bytes]):
if key is None:
return b":".join([key_type, job_id.hex().encode()])
else:
return b":".join([key_type, job_id.hex().encode(), key])
def make_exports_prefix(job_id: JobID) -> bytes:
return b"IsolatedExports:" + job_id.hex().encode()
return make_function_table_key(b"IsolatedExports", job_id)
def make_export_key(pos: int, job_id: JobID) -> bytes:
# big-endian for ordering in binary
return make_exports_prefix(job_id) + b":" + pos.to_bytes(8, "big")
return make_function_table_key(b"IsolatedExports", job_id,
pos.to_bytes(8, "big"))
class FunctionActorManager:
@ -192,9 +202,9 @@ class FunctionActorManager:
check_oversized_function(pickled_function,
remote_function._function_name,
"remote function", self._worker)
key = (
b"RemoteFunction:" + self._worker.current_job_id.hex().encode() +
b":" + remote_function._function_descriptor.function_id.binary())
key = make_function_table_key(
b"RemoteFunction", self._worker.current_job_id,
remote_function._function_descriptor.function_id.binary())
if self._worker.gcs_client.internal_kv_exists(
key, KV_NAMESPACE_FUNCTION_TABLE):
return
@ -423,8 +433,9 @@ class FunctionActorManager:
"task, please make sure the thread finishes before the "
"task finishes.")
job_id = self._worker.current_job_id
key = (b"ActorClass:" + job_id.hex().encode() + b":" +
actor_creation_function_descriptor.function_id.binary())
key = make_function_table_key(
b"ActorClass", job_id,
actor_creation_function_descriptor.function_id.binary())
try:
serialized_actor_class = pickle.dumps(Class)
except TypeError as e:
@ -555,8 +566,9 @@ class FunctionActorManager:
def _load_actor_class_from_gcs(self, job_id,
actor_creation_function_descriptor):
"""Load actor class from GCS."""
key = (b"ActorClass:" + job_id.hex().encode() + b":" +
actor_creation_function_descriptor.function_id.binary())
key = make_function_table_key(
b"ActorClass", job_id,
actor_creation_function_descriptor.function_id.binary())
# Only wait for the actor class if it was exported from the same job.
# It will hang if the job id mismatches, since we isolate actor class
# exports from the import thread. It's important to wait since this

View file

@ -142,7 +142,7 @@ class ImportThread:
"more discussion.", import_type, name,
ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD)
if key.startswith(b"RemoteFunction"):
if key.startswith(b"RemoteFunction:"):
# TODO (Alex): There's a race condition here if the worker is
# shutdown before the function finished registering (because core
# worker's global worker is unset before shutdown and is needed
@ -150,10 +150,10 @@ class ImportThread:
# with profiling.profile("register_remote_function"):
(self.worker.function_actor_manager.
fetch_and_register_remote_function(key))
elif key.startswith(b"FunctionsToRun"):
elif key.startswith(b"FunctionsToRun:"):
with profiling.profile("fetch_and_run_function"):
self.fetch_and_execute_function_to_run(key)
elif key.startswith(b"ActorClass"):
elif key.startswith(b"ActorClass:"):
# Keep track of the fact that this actor class has been
# exported so that we know it is safe to turn this worker
# into an actor of that class.
@ -172,7 +172,6 @@ class ImportThread:
"""Run on arbitrary function on the worker."""
(job_id, serialized_function) = self._internal_kv_multiget(
key, ["job_id", "function"])
if self.worker.mode == ray.SCRIPT_MODE:
return

View file

@ -176,6 +176,8 @@ def function_entry_num(job_id):
len(_internal_kv_list(b"RemoteFunction:" + job_id,
namespace=KV_NAMESPACE_FUNCTION_TABLE)) + \
len(_internal_kv_list(b"ActorClass:" + job_id,
namespace=KV_NAMESPACE_FUNCTION_TABLE)) + \
len(_internal_kv_list(b"FunctionsToRun:" + job_id,
namespace=KV_NAMESPACE_FUNCTION_TABLE))

View file

@ -7,7 +7,8 @@ import time
import pytest
import ray.cluster_utils
from ray._private.test_utils import wait_for_pid_to_exit, client_test_enabled
from ray._private.test_utils import (wait_for_pid_to_exit, client_test_enabled,
run_string_as_driver)
import ray
@ -118,5 +119,41 @@ def test_internal_kv(ray_start_regular):
kv._internal_kv_list("@namespace_abc", namespace="n")
def test_run_on_all_workers(ray_start_regular):
# This test is to ensure run_function_on_all_workers are executed
# on all workers.
@ray.remote
class Actor:
def __init__(self):
self.jobs = []
def record(self, job_id=None):
if job_id is not None:
self.jobs.append(job_id)
return self.jobs
a = Actor.options(name="recorder", namespace="n").remote() # noqa: F841
driver_script = """
import ray
from pathlib import Path
def init_func(worker_info):
a = ray.get_actor("recorder", namespace="n")
a.record.remote(worker_info['worker'].worker_id)
ray.worker.global_worker.run_function_on_all_workers(init_func)
ray.init(address='auto')
@ray.remote
def ready():
a = ray.get_actor("recorder", namespace="n")
assert ray.worker.global_worker.worker_id in ray.get(a.record.remote())
ray.get(ready.remote())
"""
run_string_as_driver(driver_script)
run_string_as_driver(driver_script)
run_string_as_driver(driver_script)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -54,7 +54,8 @@ from ray.exceptions import (
RayTaskError,
ObjectStoreFullError,
)
from ray._private.function_manager import FunctionActorManager
from ray._private.function_manager import FunctionActorManager, \
make_function_table_key
from ray._private.ray_logging import setup_logger
from ray._private.ray_logging import global_worker_stdstream_dispatcher
from ray._private.utils import check_oversized_function
@ -388,31 +389,24 @@ class Worker:
function_to_run_id = hashlib.shake_128(pickled_function).digest(
ray_constants.ID_SIZE)
key = b"FunctionsToRun:" + function_to_run_id
key = make_function_table_key(
b"FunctionsToRun", self.current_job_id, function_to_run_id)
# First run the function on the driver.
# We always run the task locally.
function({"worker": self})
# Check if the function has already been put into redis.
function_exported = self.gcs_client.internal_kv_put(
b"Lock:" + key, b"1", False,
ray_constants.KV_NAMESPACE_FUNCTION_TABLE) == 0
if function_exported is True:
# In this case, the function has already been exported, so
# we don't need to export it again.
return
check_oversized_function(pickled_function, function.__name__,
"function", self)
# Run the function on all workers.
self.gcs_client.internal_kv_put(
key,
pickle.dumps({
"job_id": self.current_job_id.binary(),
"function_id": function_to_run_id,
"function": pickled_function,
}), True, ray_constants.KV_NAMESPACE_FUNCTION_TABLE)
self.function_actor_manager.export_key(key)
if self.gcs_client.internal_kv_put(
key,
pickle.dumps({
"job_id": self.current_job_id.binary(),
"function_id": function_to_run_id,
"function": pickled_function,
}), True, ray_constants.KV_NAMESPACE_FUNCTION_TABLE) != 0:
self.function_actor_manager.export_key(key)
# TODO(rkn): If the worker fails after it calls setnx and before it
# successfully completes the hset and rpush, then the program will
# most likely hang. This could be fixed by making these three

View file

@ -48,6 +48,7 @@ class GcsFunctionManager {
kv_.Del("fun", "IsolatedExports:" + job_id_hex + ":", true, nullptr);
kv_.Del("fun", "RemoteFunction:" + job_id_hex + ":", true, nullptr);
kv_.Del("fun", "ActorClass:" + job_id_hex + ":", true, nullptr);
kv_.Del("fun", "FunctionsToRun:" + job_id_hex + ":", true, nullptr);
}
// Handler for internal KV

View file

@ -43,6 +43,8 @@ TEST_F(GcsFunctionManagerTest, TestFunctionManagerGC) {
.WillOnce(InvokeWithoutArgs(f));
EXPECT_CALL(*kv, Del(StrEq("fun"), StartsWith("ActorClass:"), true, _))
.WillOnce(InvokeWithoutArgs(f));
EXPECT_CALL(*kv, Del(StrEq("fun"), StartsWith("FunctionsToRun:"), true, _))
.WillOnce(InvokeWithoutArgs(f));
function_manager->AddJobReference(job_id);
EXPECT_EQ(0, num_del_called);
function_manager->AddJobReference(job_id);
@ -54,5 +56,5 @@ TEST_F(GcsFunctionManagerTest, TestFunctionManagerGC) {
function_manager->RemoveJobReference(job_id);
EXPECT_EQ(0, num_del_called);
function_manager->RemoveJobReference(job_id);
EXPECT_EQ(3, num_del_called);
EXPECT_EQ(4, num_del_called);
}