[core] Add retry for reading session name (#11844)

This commit is contained in:
Richard Liaw 2020-11-09 11:22:50 -08:00 committed by GitHub
parent 88be1ea20b
commit a09e49ee94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -26,7 +26,23 @@ from ray.utils import try_to_create_directory, try_to_symlink, open_log
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Basename joined onto the temp dir to build the session symlink path.
SESSION_LATEST = "session_latest"
# Bound on how many random port candidates the port-selection loop tries.
# NOTE(review): both names below hold the same value and both appear as that
# loop's bound in this view; presumably NUM_PORT_RETRIES supersedes
# NUMBER_OF_PORT_RETRIES -- confirm before relying on either.
NUMBER_OF_PORT_RETRIES = 40
NUM_PORT_RETRIES = 40
# Default number of attempts made by _get_with_retry before giving up.
NUM_REDIS_GET_RETRIES = 20


def _get_with_retry(redis_client, key, num_retries=NUM_REDIS_GET_RETRIES):
    """Read ``key`` from redis, retrying while the key is still missing.

    Args:
        redis_client: Object exposing ``get(key)``; a ``None`` return is
            treated as "not available yet".
        key: The key to fetch.
        num_retries: Maximum number of ``get`` attempts. A two second pause
            separates consecutive attempts.

    Returns:
        The value returned by ``redis_client.get(key)``.

    Raises:
        RuntimeError: If no attempt produced a value, or the value that was
            fetched is empty (falsy).
    """
    result = None
    for attempt in range(num_retries):
        result = redis_client.get(key)
        if result is not None:
            break
        # Only log and wait when another attempt will actually follow;
        # sleeping after the last attempt would just delay the error below.
        if attempt < num_retries - 1:
            logger.debug(f"Fetched {key}=None from redis. Retrying.")
            time.sleep(2)
    # An empty value is rejected as well as a missing one.
    if not result:
        raise RuntimeError(f"Could not read '{key}' from GCS (redis). "
                           "Has redis started correctly on the head node?")
    return result
class Node:
@ -139,8 +155,8 @@ class Node:
self.session_name = f"session_{date_str}_{os.getpid()}"
else:
redis_client = self.create_redis_client()
self.session_name = ray.utils.decode(
redis_client.get("session_name"))
session_name = _get_with_retry(redis_client, "session_name")
self.session_name = ray.utils.decode(session_name)
self._init_temp(redis_client)
@ -230,15 +246,16 @@ class Node:
if self.head:
self._temp_dir = self._ray_params.temp_dir
else:
self._temp_dir = ray.utils.decode(redis_client.get("temp_dir"))
temp_dir = _get_with_retry(redis_client, "temp_dir")
self._temp_dir = ray.utils.decode(temp_dir)
try_to_create_directory(self._temp_dir)
if self.head:
self._session_dir = os.path.join(self._temp_dir, self.session_name)
else:
self._session_dir = ray.utils.decode(
redis_client.get("session_dir"))
session_dir = _get_with_retry(redis_client, "session_dir")
self._session_dir = ray.utils.decode(session_dir)
session_symlink = os.path.join(self._temp_dir, SESSION_LATEST)
# Send a warning message if the session exists.
@ -494,7 +511,7 @@ class Node:
# Try to generate a port that is far above the 'next available' one.
# This solves issue #8254 where GRPC fails because the port assigned
# from this method has been used by a different process.
for _ in range(NUMBER_OF_PORT_RETRIES):
for _ in range(NUM_PORT_RETRIES):
new_port = random.randint(port, 65535)
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: