[Core] Publish gcs server failure to drivers. (#11265)

* Done.

* Fixed.
This commit is contained in:
SangBin Cho 2020-10-08 08:59:31 -07:00 committed by GitHub
parent 37fa86f9a0
commit 174bef56d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 2 deletions

View file

@ -94,7 +94,8 @@ class LogMonitor:
try:
# Test if the worker process that generated the log file
# is still alive. Only applies to worker processes.
if file_info.worker_pid != "raylet":
if (file_info.worker_pid != "raylet"
and file_info.worker_pid != "gcs_server"):
os.kill(file_info.worker_pid, 0)
except OSError:
# The process is not alive any more, so move the log file
@ -121,7 +122,9 @@ class LogMonitor:
log_file_paths = glob.glob(f"{self.logs_dir}/worker*[.out|.err]")
# segfaults and other serious errors are logged here
raylet_err_paths = glob.glob(f"{self.logs_dir}/raylet*.err")
for file_path in log_file_paths + raylet_err_paths:
# If gcs server restarts, there can be multiple log files.
gcs_err_path = glob.glob(f"{self.logs_dir}/gcs_server*.err")
for file_path in log_file_paths + raylet_err_paths + gcs_err_path:
if os.path.isfile(
file_path) and file_path not in self.log_filenames:
job_match = JOB_LOG_PATTERN.match(file_path)
@ -240,6 +243,8 @@ class LogMonitor:
lines_to_publish = lines_to_publish[1:]
elif "/raylet" in file_info.filename:
file_info.worker_pid = "raylet"
elif "/gcs_server" in file_info.filename:
file_info.worker_pid = "gcs_server"
# Record the current position in the file.
file_info.file_position = file_info.file_handle.tell()

View file

@ -227,3 +227,13 @@ def error_pubsub():
p = init_error_pubsub()
yield p
p.close()
@pytest.fixture()
def log_pubsub():
p = ray.worker.global_worker.redis_client.pubsub(
ignore_subscribe_messages=True)
log_channel = ray.gcs_utils.LOG_FILE_CHANNEL
p.psubscribe(log_channel)
yield p
p.close()

View file

@ -1,5 +1,7 @@
import json
import logging
import os
import signal
import sys
import tempfile
import threading
@ -1246,6 +1248,34 @@ def test_fate_sharing(ray_start_cluster, use_actors, node_failure):
assert len(keys) <= 2, len(keys)
@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": {
"ping_gcs_rpc_server_max_retries": 100
}
}],
indirect=True)
def test_gcs_server_failiure_report(ray_start_regular, log_pubsub):
p = log_pubsub
# Get gcs server pid to send a signal.
all_processes = ray.worker._global_node.all_processes
gcs_server_process = all_processes["gcs_server"][0].process
gcs_server_pid = gcs_server_process.pid
os.kill(gcs_server_pid, signal.SIGBUS)
msg = None
cnt = 0
# wait for max 30 seconds.
while cnt < 3000 and not msg:
msg = p.get_message()
if msg is None:
time.sleep(0.01)
cnt += 1
continue
data = json.loads(ray.utils.decode(msg["data"]))
assert data["pid"] == "gcs_server"
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))