[gcs] Remove rpush for storage in py (#20410)

## Why are these changes needed?
In Python, Redis `rpush` is used to both broadcast and store the exported keys. In this PR, we use the GCS KV store for the keys. Pub/sub still uses Redis, which needs to be removed later.

The protocol before this PR:
- The worker subscribes to the Redis key space.
- The worker writes the key of the function/actor to `(export:sqn, key)`.
- Other workers are then notified and start loading the data by checking `export:sqn`.

This depends on Redis for both KV and pub/sub; this PR fixes the KV part. A rough sketch of the old flow is below.
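For reference, a minimal sketch of the old Redis-based flow, assuming a running Redis with keyspace notifications enabled (e.g. `notify-keyspace-events Kl`); `process_key` is a hypothetical callback standing in for the import thread's `_process_key`:

```python
import redis

def legacy_export(redis_client: redis.Redis, key: bytes) -> None:
    # Exporter: rpush both stores the key and fires a keyspace event.
    redis_client.rpush("Exports", key)

def legacy_import_loop(redis_client: redis.Redis, process_key) -> None:
    # Importer: follow the "Exports" list via keyspace notifications.
    pubsub = redis_client.pubsub()
    pubsub.subscribe("__keyspace@0__:Exports")
    # Catch up on exports that happened before the subscription.
    num_imported = 0
    for key in redis_client.lrange("Exports", 0, -1):
        num_imported += 1
        process_key(key)
    for msg in pubsub.listen():
        if msg["type"] == "subscribe":
            continue
        # One event per rpush; read any list entries we haven't seen yet.
        num_imports = redis_client.llen("Exports")
        for i in range(num_imported, num_imports):
            process_key(redis_client.lindex("Exports", i))
            num_imported += 1
```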

After this PR:
- The worker subscribes to the Redis key space.
- For exporting:
  - The worker finds the first key that is not yet held. This is guaranteed by the internal KV, which is currently a single-threaded, atomic DB: the worker checks keys until it finds one that does not exist, then writes to it in a single operation. One optimization is to use the import counter as the starting offset, since that counter implies all earlier keys have already been used (see the reservation sketch after this list).
  - The worker then writes a dummy key to the Redis key space for broadcasting.
- For importing:
  - This works as before, but instead of reading from Redis, it reads from the GCS KV (see the polling sketch after this list).
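A minimal sketch of the new reservation loop, assuming a put-if-absent store; `kv_put` is a hypothetical stand-in for `gcs_client.internal_kv_put(key, value, overwrite, namespace)`, which returns the number of keys newly added:

```python
def make_export_key(pos: int) -> bytes:
    # Big-endian so the binary keys sort in numeric order.
    return b"Exports:" + pos.to_bytes(8, "big")

def reserve_export_slot(kv_put, key: bytes, num_exported: int,
                        num_imported: int) -> int:
    # Start from the import counter: every slot up to it is known taken.
    pos = max(num_exported, num_imported)
    while True:
        pos += 1
        # Put-if-absent (overwrite=False) is a single atomic operation in
        # the internal KV, so exactly one worker can claim each slot.
        if kv_put(make_export_key(pos), key, False) > 0:
            return pos
```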


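And the importer side, polling consecutive slots out of the KV until it hits a missing one; `kv_get` is a hypothetical stand-in for `gcs_client.internal_kv_get(key, namespace)`, and `make_export_key` is reused from the sketch above:

```python
def drain_exports(kv_get, process_key, num_imported: int = 0) -> int:
    # Read consecutive export slots; stop at the first one not yet written.
    while True:
        key = kv_get(make_export_key(num_imported + 1))
        if key is None:
            # Next slot is empty: everything available has been imported;
            # wait for the dummy-key broadcast before polling again.
            return num_imported
        process_key(key)
        num_imported += 1
```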
This is part of the Redis removal project.

## Related issue number
https://github.com/ray-project/ray/issues/19443

@@ -37,6 +37,11 @@ FunctionExecutionInfo = namedtuple("FunctionExecutionInfo",
logger = logging.getLogger(__name__)
def make_export_key(pos):
# big-endian for ordering in binary
return b"Exports:" + pos.to_bytes(8, "big")
class FunctionActorManager:
"""A class used to export/load remote functions and actors.
Attributes:
@@ -76,7 +81,13 @@ class FunctionActorManager:
# So, the lock should be a reentrant lock.
self.lock = threading.RLock()
self.cv = threading.Condition(lock=self.lock)
self.execution_infos = {}
# This is the counter to keep track of how many keys have already
# been exported so that we can find the next key more quickly.
self._num_exported = 0
# This lock protects self._num_exported during exporting.
self._export_lock = threading.Lock()
def increase_task_counter(self, function_descriptor):
function_id = function_descriptor.function_id
@@ -127,6 +138,27 @@ class FunctionActorManager:
except Exception:
return None
def export_key(self, key):
"""Export a key so it can be imported by other workers"""
# Check the keys one by one until we reserve a key that does
# not yet exist in the cluster.
# One optimization: start from the importer counter, since all
# keys before that counter are guaranteed to have been allocated.
with self._export_lock:
self._num_exported = max(self._num_exported,
self._worker.import_thread.num_imported)
while True:
self._num_exported += 1
holder = make_export_key(self._num_exported)
# This step is atomic since the internal KV is a single-threaded,
# atomic DB.
if self._worker.gcs_client.internal_kv_put(
holder, key, False, KV_NAMESPACE_FUNCTION_TABLE) > 0:
break
# TODO(yic) Use gcs pubsub
self._worker.redis_client.lpush("Exports", "a")
def export(self, remote_function):
"""Pickle a remote function and export it to redis.
Args:
@@ -167,7 +199,7 @@
})
self._worker.gcs_client.internal_kv_put(key, val, True,
KV_NAMESPACE_FUNCTION_TABLE)
self._worker.redis_client.rpush("Exports", key)
self.export_key(key)
def fetch_and_register_remote_function(self, key):
"""Import a remote function."""
@@ -245,10 +277,6 @@
function=function,
function_name=function_name,
max_calls=max_calls))
# Add the function to the function table.
self._worker.redis_client.rpush(
b"FunctionTable:" + function_id.binary(),
self._worker.worker_id)
def get_execution_info(self, job_id, function_descriptor):
"""Get the FunctionExecutionInfo of a remote function.
@@ -367,7 +395,7 @@
pickle.dumps(actor_class_info),
True,
KV_NAMESPACE_FUNCTION_TABLE)
self._worker.redis_client.rpush("Exports", key)
self.export_key(key)
def export_actor_class(self, Class, actor_creation_function_descriptor,
actor_method_names):


@@ -279,7 +279,7 @@ class GcsClient:
@_auto_reconnect
def internal_kv_keys(self, prefix: bytes,
namespace: Optional[str] = None) -> List[bytes]:
namespace: Optional[str]) -> List[bytes]:
logger.debug(f"internal_kv_keys {prefix} {namespace}")
prefix = _make_key(namespace, prefix)
req = gcs_service_pb2.InternalKVKeysRequest(prefix=prefix)


@@ -37,6 +37,8 @@ class ImportThread:
self.gcs_client = worker.gcs_client
self.threads_stopped = threads_stopped
self.imported_collision_identifiers = defaultdict(int)
# Keep track of the number of export keys that we've imported.
self.num_imported = 0
def start(self):
"""Start the import thread."""
@@ -56,16 +58,9 @@
# import_pubsub_client.subscribe and before the call to
# import_pubsub_client.listen will still be processed in the loop.
import_pubsub_client.subscribe("__keyspace@0__:Exports")
# Keep track of the number of imports that we've imported.
num_imported = 0
try:
# Get the exports that occurred before the call to subscribe.
export_keys = self.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
num_imported += 1
self._process_key(key)
self._do_importing()
while True:
# Exit if we received a signal that we should stop.
if self.threads_stopped.is_set():
@@ -75,22 +70,27 @@
if msg is None:
self.threads_stopped.wait(timeout=0.01)
continue
if msg["type"] == "subscribe":
continue
assert msg["data"] == b"rpush"
num_imports = self.redis_client.llen("Exports")
assert num_imports >= num_imported
for i in range(num_imported, num_imports):
num_imported += 1
key = self.redis_client.lindex("Exports", i)
self._process_key(key)
self._do_importing()
except (OSError, redis.exceptions.ConnectionError, grpc.RpcError) as e:
logger.error(f"ImportThread: {e}")
finally:
# Close the pubsub client to avoid leaking file descriptors.
import_pubsub_client.close()
def _do_importing(self):
while True:
export_key = ray._private.function_manager.make_export_key(
self.num_imported + 1)
key = self.gcs_client.internal_kv_get(
export_key, ray_constants.KV_NAMESPACE_FUNCTION_TABLE)
if key is not None:
self._process_key(key)
self.num_imported += 1
else:
break
def _get_import_info_for_collision_detection(self, key):
"""Retrieve the collision identifier, type, and name of the import."""
if key.startswith(b"RemoteFunction"):


@@ -413,7 +413,7 @@ class Worker:
"function_id": function_to_run_id,
"function": pickled_function,
}), True, ray_constants.KV_NAMESPACE_FUNCTION_TABLE)
self.redis_client.rpush("Exports", key)
self.function_actor_manager.export_key(key)
# TODO(rkn): If the worker fails after it calls setnx and before it
# successfully completes the hset and rpush, then the program will
# most likely hang. This could be fixed by making these three
@@ -1018,6 +1018,8 @@ def shutdown(_exiting_interpreter: bool = False):
if hasattr(global_worker, "core_worker"):
global_worker.core_worker.shutdown()
del global_worker.core_worker
# We need to reset the function actor manager to clear the context
global_worker.function_actor_manager = FunctionActorManager(global_worker)
# Disconnect global state from GCS.
ray.state.state.disconnect()