From a09e49ee94dc33118381224adb58df2f6600d151 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 9 Nov 2020 11:22:50 -0800 Subject: [PATCH] [core] Add retry for reading session name (#11844) --- python/ray/node.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/python/ray/node.py b/python/ray/node.py index 9c49895cc..882dcb42a 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -26,7 +26,23 @@ from ray.utils import try_to_create_directory, try_to_symlink, open_log logger = logging.getLogger(__name__) SESSION_LATEST = "session_latest" -NUMBER_OF_PORT_RETRIES = 40 +NUM_PORT_RETRIES = 40 +NUM_REDIS_GET_RETRIES = 20 + + +def _get_with_retry(redis_client, key, num_retries=NUM_REDIS_GET_RETRIES): + result = None + for i in range(num_retries): + result = redis_client.get(key) + if result is not None: + break + else: + logger.debug(f"Fetched {key}=None from redis. Retrying.") + time.sleep(2) + if not result: + raise RuntimeError(f"Could not read '{key}' from GCS (redis). " + "Has redis started correctly on the head node?") + return result class Node: @@ -139,8 +155,8 @@ class Node: self.session_name = f"session_{date_str}_{os.getpid()}" else: redis_client = self.create_redis_client() - self.session_name = ray.utils.decode( - redis_client.get("session_name")) + session_name = _get_with_retry(redis_client, "session_name") + self.session_name = ray.utils.decode(session_name) self._init_temp(redis_client) @@ -230,15 +246,16 @@ class Node: if self.head: self._temp_dir = self._ray_params.temp_dir else: - self._temp_dir = ray.utils.decode(redis_client.get("temp_dir")) + temp_dir = _get_with_retry(redis_client, "temp_dir") + self._temp_dir = ray.utils.decode(temp_dir) try_to_create_directory(self._temp_dir) if self.head: self._session_dir = os.path.join(self._temp_dir, self.session_name) else: - self._session_dir = ray.utils.decode( - redis_client.get("session_dir")) + session_dir = _get_with_retry(redis_client, "session_dir") + self._session_dir = ray.utils.decode(session_dir) session_symlink = os.path.join(self._temp_dir, SESSION_LATEST) # Send a warning message if the session exists. @@ -494,7 +511,7 @@ class Node: # Try to generate a port that is far above the 'next available' one. # This solves issue #8254 where GRPC fails because the port assigned # from this method has been used by a different process. - for _ in range(NUMBER_OF_PORT_RETRIES): + for _ in range(NUM_PORT_RETRIES): new_port = random.randint(port, 65535) new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: