From eed0ffc6ff71c3c1950ea758f90e8e188a00b8b4 Mon Sep 17 00:00:00 2001 From: Tao Wang Date: Mon, 12 Jul 2021 17:42:13 +0800 Subject: [PATCH] =?UTF-8?q?[Core]Align=20storage=20of=20session=5Fdir=20in?= =?UTF-8?q?=20java/python=20so=20it=20can=20be=20accessed=20u=E2=80=A6=20(?= =?UTF-8?q?#16958)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Align storage of session_dir in java/python so they can be accessed using internal kv manager * align cpp --- cpp/src/ray/util/process_helper.cc | 2 +- .../java/io/ray/runtime/RayNativeRuntime.java | 2 +- python/ray/node.py | 23 +++++++++++++++++-- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index 92bfd86dd..9fb5443fa 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -17,7 +17,7 @@ static std::string GetSessionDir(std::string redis_ip, int port, std::string pas RAY_CHECK(auth_reply->type != REDIS_REPLY_ERROR); freeReplyObject(auth_reply); } - auto reply = (redisReply *)redisCommand(context, "GET session_dir"); + auto reply = (redisReply *)redisCommand(context, "HGET session_dir value"); RAY_CHECK(reply->type != REDIS_REPLY_ERROR); std::string session_dir(reply->str); freeReplyObject(reply); diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 66ee168e7..dd79f20b8 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -51,7 +51,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { if (rayConfig.workerMode == WorkerType.DRIVER) { // Fetch session dir from GCS if this is a driver. RedisClient client = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); - final String sessionDir = client.get("session_dir", null); + final String sessionDir = client.get("session_dir", "value"); Preconditions.checkNotNull(sessionDir); rayConfig.setSessionDir(sessionDir); } diff --git a/python/ray/node.py b/python/ray/node.py index 186f70461..9ed0110ec 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -50,6 +50,24 @@ def _get_with_retry(redis_client, key, num_retries=NUM_REDIS_GET_RETRIES): return result +def _hget_with_retry(redis_client, + key, + field, + num_retries=NUM_REDIS_GET_RETRIES): + result = None + for i in range(num_retries): + result = redis_client.hget(key, field) + if result is not None: + break + else: + logger.debug(f"Fetched {key}=None from redis. Retrying.") + time.sleep(2) + if not result: + raise RuntimeError(f"Could not read '{key}' from GCS (redis). " + "Has redis started correctly on the head node?") + return result + + class Node: """An encapsulation of the Ray processes on a single node. @@ -230,7 +248,7 @@ class Node: self.start_head_processes() redis_client = self.create_redis_client() redis_client.set("session_name", self.session_name) - redis_client.set("session_dir", self._session_dir) + redis_client.hset("session_dir", "value", self._session_dir) redis_client.set("temp_dir", self._temp_dir) # Add tracing_startup_hook to redis / internal kv manually # since internal kv is not yet initialized. @@ -288,7 +306,8 @@ class Node: if self.head: self._session_dir = os.path.join(self._temp_dir, self.session_name) else: - session_dir = _get_with_retry(redis_client, "session_dir") + session_dir = _hget_with_retry(redis_client, "session_dir", + "value") self._session_dir = ray._private.utils.decode(session_dir) session_symlink = os.path.join(self._temp_dir, SESSION_LATEST)