mirror of
https://github.com/vale981/ray
synced 2025-03-05 18:11:42 -05:00
[GCS] Enhance error messages when failing to fetch the GCS address or to connect to GCS (#21396)
Some tests are flaky because the GCS client fails to be created, but there is not enough information to debug the failure. With this change, the exception message is printed after a GCS client creation failure. This PR also breaks GCS client creation down into two steps: reading the GCS address from Redis, and creating the GCS client, which should help locate the issue.
This commit is contained in:
parent
3a3d0a4a2b
commit
bbf23ec59f
2 changed files with 15 additions and 13 deletions
|
@ -169,6 +169,9 @@ class GcsChannel:
|
|||
self._aio = aio
|
||||
|
||||
def connect(self):
|
||||
# GCS server uses a cached port, so it should use the same port after
|
||||
# restarting, whether in Redis or GCS bootstrapping mode. This means
|
||||
# GCS address should stay the same for the lifetime of the Ray cluster.
|
||||
if self._gcs_address is None:
|
||||
assert self._redis_client is not None
|
||||
gcs_address = get_gcs_address_from_redis(self._redis_client)
|
||||
|
|
|
@ -13,6 +13,7 @@ import sys
|
|||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from typing import Optional, Dict
|
||||
from collections import defaultdict
|
||||
|
@ -559,20 +560,19 @@ class Node:
|
|||
|
||||
def get_gcs_client(self):
|
||||
if self._gcs_client is None:
|
||||
num_retries = NUM_REDIS_GET_RETRIES
|
||||
for i in range(num_retries):
|
||||
for _ in range(NUM_REDIS_GET_RETRIES):
|
||||
gcs_address = None
|
||||
last_ex = None
|
||||
try:
|
||||
if use_gcs_for_bootstrap():
|
||||
self._gcs_client = GcsClient(address=self.gcs_address)
|
||||
else:
|
||||
self._gcs_client = GcsClient.create_from_redis(
|
||||
self.create_redis_client())
|
||||
break
|
||||
except Exception as e:
|
||||
logger.debug(f"Connecting to GCS: {e}")
|
||||
gcs_address = self.gcs_address
|
||||
self._gcs_client = GcsClient(address=gcs_address)
|
||||
except Exception:
|
||||
last_ex = traceback.format_exc()
|
||||
logger.debug(f"Connecting to GCS: {last_ex}")
|
||||
time.sleep(1)
|
||||
assert self._gcs_client is not None, \
|
||||
f"Failed to connect to GCS at {self._gcs_address}"
|
||||
assert self._gcs_client is not None, (
|
||||
f"Failed to connect to GCS at address={gcs_address}. "
|
||||
f"Last exception: {last_ex}")
|
||||
ray.experimental.internal_kv._initialize_internal_kv(
|
||||
self._gcs_client)
|
||||
return self._gcs_client
|
||||
|
@ -1046,7 +1046,6 @@ class Node:
|
|||
|
||||
self.start_gcs_server()
|
||||
assert self._gcs_client is not None
|
||||
# import pdb; pdb.set_trace()
|
||||
self._write_cluster_info_to_kv()
|
||||
|
||||
if not self._ray_params.no_monitor:
|
||||
|
|
Loading…
Add table
Reference in a new issue