[Core]Align storage of session_dir in java/python so it can be accessed u… (#16958)

* Align storage of session_dir in java/python so they can be accessed using internal kv manager

* align cpp
This commit is contained in:
Tao Wang 2021-07-12 17:42:13 +08:00 committed by GitHub
parent 298d2afc35
commit eed0ffc6ff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 4 deletions

View file

@ -17,7 +17,7 @@ static std::string GetSessionDir(std::string redis_ip, int port, std::string pas
RAY_CHECK(auth_reply->type != REDIS_REPLY_ERROR);
freeReplyObject(auth_reply);
}
auto reply = (redisReply *)redisCommand(context, "GET session_dir");
auto reply = (redisReply *)redisCommand(context, "HGET session_dir value");
RAY_CHECK(reply->type != REDIS_REPLY_ERROR);
std::string session_dir(reply->str);
freeReplyObject(reply);

View file

@ -51,7 +51,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
if (rayConfig.workerMode == WorkerType.DRIVER) {
// Fetch session dir from GCS if this is a driver.
RedisClient client = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
final String sessionDir = client.get("session_dir", null);
final String sessionDir = client.get("session_dir", "value");
Preconditions.checkNotNull(sessionDir);
rayConfig.setSessionDir(sessionDir);
}

View file

@ -50,6 +50,24 @@ def _get_with_retry(redis_client, key, num_retries=NUM_REDIS_GET_RETRIES):
return result
def _hget_with_retry(redis_client,
key,
field,
num_retries=NUM_REDIS_GET_RETRIES):
result = None
for i in range(num_retries):
result = redis_client.hget(key, field)
if result is not None:
break
else:
logger.debug(f"Fetched {key}=None from redis. Retrying.")
time.sleep(2)
if not result:
raise RuntimeError(f"Could not read '{key}' from GCS (redis). "
"Has redis started correctly on the head node?")
return result
class Node:
"""An encapsulation of the Ray processes on a single node.
@ -230,7 +248,7 @@ class Node:
self.start_head_processes()
redis_client = self.create_redis_client()
redis_client.set("session_name", self.session_name)
redis_client.set("session_dir", self._session_dir)
redis_client.hset("session_dir", "value", self._session_dir)
redis_client.set("temp_dir", self._temp_dir)
# Add tracing_startup_hook to redis / internal kv manually
# since internal kv is not yet initialized.
@ -288,7 +306,8 @@ class Node:
if self.head:
self._session_dir = os.path.join(self._temp_dir, self.session_name)
else:
session_dir = _get_with_retry(redis_client, "session_dir")
session_dir = _hget_with_retry(redis_client, "session_dir",
"value")
self._session_dir = ray._private.utils.decode(session_dir)
session_symlink = os.path.join(self._temp_dir, SESSION_LATEST)