This reverts commit c17e171f92.

Co-authored-by: 刘宝 <po.lb@antfin.com>
parent bda1a37e93
commit 8dfd471823
14 changed files with 721 additions and 3 deletions

@@ -1957,6 +1957,7 @@ filegroup(
         "//src/ray/protobuf:agent_manager_py_proto",
         "//src/ray/protobuf:common_py_proto",
         "//src/ray/protobuf:core_worker_py_proto",
+        "//src/ray/protobuf:event_py_proto",
         "//src/ray/protobuf:gcs_py_proto",
         "//src/ray/protobuf:gcs_service_py_proto",
         "//src/ray/protobuf:job_agent_py_proto",
@@ -45,6 +45,8 @@ class DataSource:
     job_actors = Dict()
     # {worker id(str): core worker stats}
     core_worker_stats = Dict()
+    # {job id hex(str): {event id(str): event dict}}
+    events = Dict()
     # {node ip (str): log entries by pid
     # (dict from pid to list of latest log entries)}
     ip_and_pid_to_logs = Dict()

dashboard/modules/event/__init__.py (new file, empty)

dashboard/modules/event/event_agent.py (new file, 90 lines)
@@ -0,0 +1,90 @@
import os
import asyncio
import logging
from typing import Union
from grpc.experimental import aio as aiogrpc

import ray.new_dashboard.utils as dashboard_utils
import ray.new_dashboard.consts as dashboard_consts
from ray.ray_constants import env_bool
from ray.new_dashboard.utils import async_loop_forever, create_task
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import monitor_events
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable


@dashboard_utils.dashboard_module(
    enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
class EventAgent(dashboard_utils.DashboardAgentModule):
    def __init__(self, dashboard_agent):
        super().__init__(dashboard_agent)
        self._event_dir = os.path.join(self._dashboard_agent.log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        self._monitor: Union[asyncio.Task, None] = None
        self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None
        self._cached_events = asyncio.Queue(
            event_consts.EVENT_AGENT_CACHE_SIZE)
        logger.info("Event agent cache buffer size: %s",
                    self._cached_events.maxsize)

    async def _connect_to_dashboard(self):
        """Connect to the dashboard. If the dashboard is not started, then
        this method will never return.

        Returns:
            The ReportEventServiceStub object.
        """
        while True:
            try:
                aioredis = self._dashboard_agent.aioredis_client
                dashboard_rpc_address = await aioredis.get(
                    dashboard_consts.REDIS_KEY_DASHBOARD_RPC)
                if dashboard_rpc_address:
                    logger.info("Report events to %s", dashboard_rpc_address)
                    options = (("grpc.enable_http_proxy", 0), )
                    channel = aiogrpc.insecure_channel(
                        dashboard_rpc_address, options=options)
                    return event_pb2_grpc.ReportEventServiceStub(channel)
            except Exception:
                logger.exception("Connect to dashboard failed.")
            await asyncio.sleep(
                event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS)

    @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
    async def report_events(self):
        """Report events from the cached events queue. Reconnect to the
        dashboard if reporting fails; log an error after
        EVENT_AGENT_RETRY_TIMES retries.

        This method will never return.
        """
        data = await self._cached_events.get()
        for _ in range(event_consts.EVENT_AGENT_RETRY_TIMES):
            try:
                logger.info("Report %s events.", len(data))
                request = event_pb2.ReportEventsRequest(event_strings=data)
                await self._stub.ReportEvents(request)
                break
            except Exception:
                logger.exception("Report event failed, reconnect to the "
                                 "dashboard.")
                self._stub = await self._connect_to_dashboard()
        else:
            data_str = str(data)
            limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
            logger.error("Report event failed: %s",
                         data_str[:limit] + (data_str[limit:] and "..."))

    async def run(self, server):
        # Connect to dashboard.
        self._stub = await self._connect_to_dashboard()
        # Start monitor task.
        self._monitor = monitor_events(
            self._event_dir,
            lambda data: create_task(self._cached_events.put(data)),
            source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES)
        # Start reporting events.
        await self.report_events()
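
To see the report loop above in isolation, here is a minimal, self-contained sketch of the same pattern (pull a batch from an asyncio.Queue, retry a bounded number of times, reconnect on failure). FlakyStub, connect() and report_once() are hypothetical stand-ins for the gRPC stub, _connect_to_dashboard() and one iteration of report_events(); they are not part of the Ray code.

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("report_loop_sketch")

RETRY_TIMES = 3  # stand-in for event_consts.EVENT_AGENT_RETRY_TIMES


class FlakyStub:
    """Hypothetical stub whose first call fails, then succeeds."""

    def __init__(self):
        self.calls = 0

    async def ReportEvents(self, batch):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("simulated transient failure")
        logger.info("sent %d event(s)", len(batch))


_STUB = FlakyStub()


async def connect():
    # Stand-in for EventAgent._connect_to_dashboard(); reuses one fake stub.
    return _STUB


async def report_once(queue, stub):
    # One iteration of the report loop: bounded retries, reconnect on failure.
    data = await queue.get()
    for _ in range(RETRY_TIMES):
        try:
            await stub.ReportEvents(data)
            break
        except Exception:
            logger.exception("report failed, reconnecting")
            stub = await connect()
    else:
        logger.error("dropping batch after %d retries: %r", RETRY_TIMES, data)
    return stub


async def main():
    queue = asyncio.Queue(maxsize=16)
    await queue.put(['{"event_id": "abc", "message": "hello"}'])
    stub = await connect()
    await report_once(queue, stub)


asyncio.run(main())
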
dashboard/modules/event/event_consts.py (new file, 26 lines)
@@ -0,0 +1,26 @@
from ray.ray_constants import env_integer
from ray.core.generated import event_pb2

EVENT_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_EVENT"
LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000
RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2
# Monitor events
SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer(
    "SCAN_EVENT_DIR_INTERVAL_SECONDS", 2)
SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60
CONCURRENT_READ_LIMIT = 50
EVENT_READ_LINE_COUNT_LIMIT = 200
EVENT_READ_LINE_LENGTH_LIMIT = env_integer("EVENT_READ_LINE_LENGTH_LIMIT",
                                           2 * 1024 * 1024)  # 2MB
# Report events
EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1
EVENT_AGENT_RETRY_TIMES = 10
EVENT_AGENT_CACHE_SIZE = 10240
# Event sources
EVENT_HEAD_MONITOR_SOURCE_TYPES = [
    event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
]
EVENT_AGENT_MONITOR_SOURCE_TYPES = list(
    set(event_pb2.Event.SourceType.keys()) -
    set(EVENT_HEAD_MONITOR_SOURCE_TYPES))
EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys()
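
The head/agent split above is a plain set difference over the source-type names. A small sketch with a hypothetical stand-in list (the real names come from event_pb2.Event.SourceType.keys()) shows how the two constants partition the sources:

# Hypothetical stand-in for event_pb2.Event.SourceType.keys(); the real list
# is defined by the protobuf enum.
ALL_SOURCE_TYPES = ["COMMON", "CORE_WORKER", "GCS", "RAYLET"]

# The dashboard head tails the GCS event logs itself ...
HEAD_SOURCE_TYPES = ["GCS"]
# ... while each agent tails every other source and reports it over gRPC.
AGENT_SOURCE_TYPES = list(set(ALL_SOURCE_TYPES) - set(HEAD_SOURCE_TYPES))

assert sorted(HEAD_SOURCE_TYPES + AGENT_SOURCE_TYPES) == sorted(ALL_SOURCE_TYPES)
print(sorted(AGENT_SOURCE_TYPES))  # ['COMMON', 'CORE_WORKER', 'RAYLET']
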
dashboard/modules/event/event_head.py (new file, 89 lines)
@@ -0,0 +1,89 @@
import os
import asyncio
import logging
from typing import Union
from collections import OrderedDict, defaultdict

import aiohttp.web

import ray.new_dashboard.utils as dashboard_utils
from ray.ray_constants import env_bool
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import (
    parse_event_strings,
    monitor_events,
)
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc
from ray.new_dashboard.datacenter import DataSource

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable

JobEvents = OrderedDict
dashboard_utils._json_compatible_types.add(JobEvents)


@dashboard_utils.dashboard_module(
    enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
class EventHead(dashboard_utils.DashboardHeadModule,
                event_pb2_grpc.ReportEventServiceServicer):
    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        self._event_dir = os.path.join(self._dashboard_head.log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        self._monitor: Union[asyncio.Task, None] = None

    @staticmethod
    def _update_events(event_list):
        # {job_id: {event_id: event}}
        all_job_events = defaultdict(JobEvents)
        for event in event_list:
            event_id = event["event_id"]
            custom_fields = event.get("custom_fields")
            system_event = False
            if custom_fields:
                job_id = custom_fields.get("job_id", "global") or "global"
            else:
                job_id = "global"
            if system_event is False:
                all_job_events[job_id][event_id] = event
        # TODO(fyrestone): Limit the event count per job.
        for job_id, new_job_events in all_job_events.items():
            job_events = DataSource.events.get(job_id, JobEvents())
            job_events.update(new_job_events)
            DataSource.events[job_id] = job_events

    async def ReportEvents(self, request, context):
        received_events = []
        if request.event_strings:
            received_events.extend(parse_event_strings(request.event_strings))
        logger.info("Received %d events", len(received_events))
        self._update_events(received_events)
        return event_pb2.ReportEventsReply(send_success=True)

    @routes.get("/events")
    @dashboard_utils.aiohttp_cache(2)
    async def get_event(self, req) -> aiohttp.web.Response:
        job_id = req.query.get("job_id")
        if job_id is None:
            all_events = {
                job_id: list(job_events.values())
                for job_id, job_events in DataSource.events.items()
            }
            return dashboard_utils.rest_response(
                success=True, message="All events fetched.", events=all_events)

        job_events = DataSource.events.get(job_id, {})
        return dashboard_utils.rest_response(
            success=True,
            message="Job events fetched.",
            job_id=job_id,
            events=list(job_events.values()))

    async def run(self, server):
        event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server)
        self._monitor = monitor_events(
            self._event_dir,
            lambda data: self._update_events(parse_event_strings(data)),
            source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES)
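
A standalone sketch of the merge performed by EventHead._update_events, using plain dicts in place of the dashboard's DataSource and Dict types (the sample events are made up):

from collections import OrderedDict, defaultdict

# Stand-in for DataSource.events: {job id hex(str): {event id(str): event dict}}.
events_store = {}


def update_events(event_list):
    # Group incoming events by job id, then merge each group into the store.
    all_job_events = defaultdict(OrderedDict)
    for event in event_list:
        job_id = (event.get("custom_fields") or {}).get("job_id") or "global"
        all_job_events[job_id][event["event_id"]] = event
    for job_id, new_job_events in all_job_events.items():
        job_events = events_store.get(job_id, OrderedDict())
        job_events.update(new_job_events)
        events_store[job_id] = job_events


update_events([
    {"event_id": "e1", "message": "a", "custom_fields": {"job_id": "01000000"}},
    {"event_id": "e2", "message": "b", "custom_fields": {}},  # no job_id -> "global"
])
print({job: list(evts) for job, evts in events_store.items()})
# {'01000000': ['e1'], 'global': ['e2']}
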
dashboard/modules/event/event_utils.py (new file, 190 lines)
@@ -0,0 +1,190 @@
import os
import time
import mmap
import json
import fnmatch
import asyncio
import itertools
import collections
import logging.handlers

from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.utils import async_loop_forever, create_task

logger = logging.getLogger(__name__)


def _get_source_files(event_dir, source_types=None, event_file_filter=None):
    event_log_names = os.listdir(event_dir)
    source_files = {}
    all_source_types = set(event_consts.EVENT_SOURCE_ALL)
    for source_type in source_types or event_consts.EVENT_SOURCE_ALL:
        assert source_type in all_source_types, \
            f"Invalid source type: {source_type}"
        files = []
        for n in event_log_names:
            if fnmatch.fnmatch(n, f"*{source_type}*"):
                f = os.path.join(event_dir, n)
                if event_file_filter is not None and not event_file_filter(f):
                    continue
                files.append(f)
        if files:
            source_files[source_type] = files
    return source_files


def _restore_newline(event_dict):
    try:
        event_dict["message"] = event_dict["message"]\
            .replace("\\n", "\n")\
            .replace("\\r", "\n")
    except Exception:
        logger.exception("Restore newline for event failed: %s", event_dict)
    return event_dict


def _parse_line(event_str):
    return _restore_newline(json.loads(event_str))


def parse_event_strings(event_string_list):
    events = []
    for data in event_string_list:
        if not data:
            continue
        try:
            event = _parse_line(data)
            events.append(event)
        except Exception:
            logger.exception("Parse event line failed: %s", repr(data))
    return events


ReadFileResult = collections.namedtuple(
    "ReadFileResult", ["fid", "size", "mtime", "position", "lines"])


def _read_file(file,
               pos,
               n_lines=event_consts.EVENT_READ_LINE_COUNT_LIMIT,
               closefd=True):
    with open(file, "rb", closefd=closefd) as f:
        # The ino may be 0 on Windows.
        stat = os.stat(f.fileno())
        fid = stat.st_ino or file
        lines = []
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            start = pos
            for _ in range(n_lines):
                sep = mm.find(b"\n", start)
                if sep == -1:
                    break
                if sep - start <= event_consts.EVENT_READ_LINE_LENGTH_LIMIT:
                    lines.append(mm[start:sep].decode("utf-8"))
                else:
                    truncated_size = min(
                        100, event_consts.EVENT_READ_LINE_LENGTH_LIMIT)
                    logger.warning(
                        "Ignored long string: %s...(%s chars)",
                        mm[start:start + truncated_size].decode("utf-8"),
                        sep - start)
                start = sep + 1
    return ReadFileResult(fid, stat.st_size, stat.st_mtime, start, lines)


def monitor_events(
        event_dir,
        callback,
        scan_interval_seconds=event_consts.SCAN_EVENT_DIR_INTERVAL_SECONDS,
        start_mtime=time.time() + event_consts.SCAN_EVENT_START_OFFSET_SECONDS,
        monitor_files=None,
        source_types=None):
    """Monitor events in a directory. New events are read and passed to the
    callback.

    Args:
        event_dir (str): The event log directory.
        callback (Callable[[List[str]], None]): A callback that accepts a
            list of event strings.
        scan_interval_seconds (float): The interval in seconds between two
            scans.
        start_mtime (float): Only the event log files whose last modification
            time is greater than start_mtime are monitored.
        monitor_files (Dict[int, MonitorFile]): The map from event log file id
            to MonitorFile object. Monitor all files from the beginning
            if the value is None.
        source_types (List[str]): A list of source type names from
            event_pb2.Event.SourceType.keys(). Monitor all source types if
            the value is None.
    """
    loop = asyncio.get_event_loop()
    if monitor_files is None:
        monitor_files = {}

    logger.info(
        "Monitor events logs modified after %s on %s, "
        "the source types are %s.", start_mtime, event_dir, "all"
        if source_types is None else source_types)

    MonitorFile = collections.namedtuple("MonitorFile",
                                         ["size", "mtime", "position"])

    def _source_file_filter(source_file):
        stat = os.stat(source_file)
        return stat.st_mtime > start_mtime

    def _read_monitor_file(file, pos):
        assert isinstance(file, str), \
            f"File should be a str, but a {type(file)}({file}) found"
        fd = os.open(file, os.O_RDONLY)
        try:
            stat = os.stat(fd)
            # Check the file size to avoid raising the exception
            # ValueError: cannot mmap an empty file
            if stat.st_size <= 0:
                return []
            fid = stat.st_ino or file
            monitor_file = monitor_files.get(fid)
            if monitor_file:
                if (monitor_file.position == monitor_file.size
                        and monitor_file.size == stat.st_size
                        and monitor_file.mtime == stat.st_mtime):
                    logger.debug(
                        "Skip reading the file because "
                        "there is no change: %s", file)
                    return []
                position = monitor_file.position
            else:
                logger.info("Found new event log file: %s", file)
                position = pos
            # Close the fd in finally.
            r = _read_file(fd, position, closefd=False)
            # It should be fine to update the dict in executor thread.
            monitor_files[r.fid] = MonitorFile(r.size, r.mtime, r.position)
            loop.call_soon_threadsafe(callback, r.lines)
        except Exception as e:
            raise Exception(f"Read event file failed: {file}") from e
        finally:
            os.close(fd)

    @async_loop_forever(scan_interval_seconds, cancellable=True)
    async def _scan_event_log_files():
        # Scan event files.
        source_files = await loop.run_in_executor(None, _get_source_files,
                                                  event_dir, source_types,
                                                  _source_file_filter)

        # Limit concurrent read to avoid fd exhaustion.
        semaphore = asyncio.Semaphore(event_consts.CONCURRENT_READ_LIMIT)

        async def _concurrent_coro(filename):
            async with semaphore:
                return await loop.run_in_executor(None, _read_monitor_file,
                                                  filename, 0)

        # Read files.
        await asyncio.gather(*[
            _concurrent_coro(filename)
            for filename in list(itertools.chain(*source_files.values()))
        ])

    return create_task(_scan_event_log_files())
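
The core idea behind _read_file and _read_monitor_file is incremental reading: remember a byte position per file and only parse newline-terminated lines appended since the last scan. A self-contained sketch of that idea, using plain file I/O instead of mmap and a made-up log file name:

import json
import os
import tempfile

positions = {}  # {file path: byte offset already consumed}


def read_new_lines(path, line_length_limit=1024):
    """Return complete lines appended since the previous call."""
    pos = positions.get(path, 0)
    lines = []
    with open(path, "rb") as f:
        f.seek(pos)
        data = f.read()
    end = data.rfind(b"\n")
    if end == -1:
        return lines  # no complete line yet; keep the old position
    for raw in data[:end].split(b"\n"):
        if len(raw) <= line_length_limit:  # skip over-long lines, like _read_file
            lines.append(raw.decode("utf-8"))
    positions[path] = pos + end + 1
    return lines


with tempfile.TemporaryDirectory() as d:
    log = os.path.join(d, "event_GCS.log")
    with open(log, "a") as f:
        f.write(json.dumps({"event_id": "e1", "message": "first"}) + "\n")
    print(read_new_lines(log))  # ['{"event_id": "e1", "message": "first"}']
    with open(log, "a") as f:
        f.write(json.dumps({"event_id": "e2", "message": "second"}) + "\n")
    print(read_new_lines(log))  # only the newly appended line
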
dashboard/modules/event/tests/test_event.py (new file, 264 lines)
@@ -0,0 +1,264 @@
import os
import sys
import time
import json
import copy
import logging
import requests
import asyncio
import random
import tempfile

import pytest
import numpy as np

import ray
from ray._private.utils import binary_to_hex
from ray.new_dashboard.tests.conftest import *  # noqa
from ray.new_dashboard.modules.event import event_consts
from ray.core.generated import event_pb2
from ray.test_utils import (
    format_web_url,
    wait_until_server_available,
    wait_for_condition,
)
from ray.new_dashboard.modules.event.event_utils import (
    monitor_events, )

logger = logging.getLogger(__name__)


def _get_event(msg="empty message", job_id=None, source_type=None):
    return {
        "event_id": binary_to_hex(np.random.bytes(18)),
        "source_type": random.choice(event_pb2.Event.SourceType.keys())
        if source_type is None else source_type,
        "host_name": "po-dev.inc.alipay.net",
        "pid": random.randint(1, 65536),
        "label": "",
        "message": msg,
        "time_stamp": time.time(),
        "severity": "INFO",
        "custom_fields": {
            "job_id": ray.JobID.from_int(random.randint(1, 100)).hex()
            if job_id is None else job_id,
            "node_id": "",
            "task_id": "",
        }
    }


def _test_logger(name, log_file, max_bytes, backup_count):
    handler = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=max_bytes, backupCount=backup_count)
    formatter = logging.Formatter("%(message)s")
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.propagate = False
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

    return logger


def test_event_basic(enable_event_module, disable_aiohttp_cache,
                     ray_start_with_dashboard):
    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
    session_dir = ray_start_with_dashboard["session_dir"]
    event_dir = os.path.join(session_dir, "logs", "events")
    job_id = ray.JobID.from_int(100).hex()

    source_type_gcs = event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
    source_type_raylet = event_pb2.Event.SourceType.Name(
        event_pb2.Event.RAYLET)
    test_count = 20

    for source_type in [source_type_gcs, source_type_raylet]:
        test_log_file = os.path.join(event_dir, f"event_{source_type}.log")
        test_logger = _test_logger(
            __name__ + str(random.random()),
            test_log_file,
            max_bytes=2000,
            backup_count=1000)
        for i in range(test_count):
            sample_event = _get_event(
                str(i), job_id=job_id, source_type=source_type)
            test_logger.info("%s", json.dumps(sample_event))

    def _check_events():
        try:
            resp = requests.get(f"{webui_url}/events")
            resp.raise_for_status()
            result = resp.json()
            all_events = result["data"]["events"]
            job_events = all_events[job_id]
            assert len(job_events) >= test_count * 2
            source_messages = {}
            for e in job_events:
                source_type = e["sourceType"]
                message = e["message"]
                source_messages.setdefault(source_type, set()).add(message)
            assert len(source_messages[source_type_gcs]) >= test_count
            assert len(source_messages[source_type_raylet]) >= test_count
            data = {str(i) for i in range(test_count)}
            assert data & source_messages[source_type_gcs] == data
            assert data & source_messages[source_type_raylet] == data
            return True
        except Exception as ex:
            logger.exception(ex)
            return False

    wait_for_condition(_check_events, timeout=15)


def test_event_message_limit(enable_event_module, small_event_line_limit,
                             disable_aiohttp_cache, ray_start_with_dashboard):
    event_read_line_length_limit = small_event_line_limit
    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
    session_dir = ray_start_with_dashboard["session_dir"]
    event_dir = os.path.join(session_dir, "logs", "events")
    job_id = ray.JobID.from_int(100).hex()
    events = []
    # Sample events whose serialized length equals the limit.
    sample_event = _get_event("", job_id=job_id)
    message_len = event_read_line_length_limit - len(json.dumps(sample_event))
    for i in range(10):
        sample_event = copy.deepcopy(sample_event)
        sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
        sample_event["message"] = str(i) * message_len
        assert len(json.dumps(sample_event)) == event_read_line_length_limit
        events.append(sample_event)
    # A sample event longer than the limit.
    sample_event = copy.deepcopy(sample_event)
    sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
    sample_event["message"] = "2" * (message_len + 1)
    assert len(json.dumps(sample_event)) > event_read_line_length_limit
    events.append(sample_event)

    for i in range(event_consts.EVENT_READ_LINE_COUNT_LIMIT):
        events.append(_get_event(str(i), job_id=job_id))

    with open(os.path.join(event_dir, "tmp.log"), "w") as f:
        f.writelines([(json.dumps(e) + "\n") for e in events])

    try:
        os.remove(os.path.join(event_dir, "event_GCS.log"))
    except Exception:
        pass
    os.rename(
        os.path.join(event_dir, "tmp.log"),
        os.path.join(event_dir, "event_GCS.log"))

    def _check_events():
        try:
            resp = requests.get(f"{webui_url}/events")
            resp.raise_for_status()
            result = resp.json()
            all_events = result["data"]["events"]
            assert len(all_events[job_id]
                       ) >= event_consts.EVENT_READ_LINE_COUNT_LIMIT + 10
            messages = [e["message"] for e in all_events[job_id]]
            for i in range(10):
                assert str(i) * message_len in messages
            assert "2" * (message_len + 1) not in messages
            assert str(event_consts.EVENT_READ_LINE_COUNT_LIMIT -
                       1) in messages
            return True
        except Exception as ex:
            logger.exception(ex)
            return False

    wait_for_condition(_check_events, timeout=15)


@pytest.mark.asyncio
async def test_monitor_events(enable_event_module):
    with tempfile.TemporaryDirectory() as temp_dir:
        common = event_pb2.Event.SourceType.Name(event_pb2.Event.COMMON)
        common_log = os.path.join(temp_dir, f"event_{common}.log")
        test_logger = _test_logger(
            __name__ + str(random.random()),
            common_log,
            max_bytes=10,
            backup_count=10)
        test_events1 = []
        monitor_task = monitor_events(
            temp_dir,
            lambda x: test_events1.extend(x),
            scan_interval_seconds=0.01)
        assert not monitor_task.done()
        count = 10

        async def _writer(*args, read_events, spin=True):
            for x in range(*args):
                test_logger.info("%s", x)
                if spin:
                    while str(x) not in read_events:
                        await asyncio.sleep(0.01)

        async def _check_events(expect_events, read_events, timeout=10):
            start_time = time.time()
            while True:
                sorted_events = sorted(int(i) for i in read_events)
                sorted_events = [str(i) for i in sorted_events]
                if time.time() - start_time > timeout:
                    raise TimeoutError(
                        f"Timeout, read events: {sorted_events}, "
                        f"expect events: {expect_events}")
                if len(sorted_events) == len(expect_events):
                    if sorted_events == expect_events:
                        break
                await asyncio.sleep(1)

        await asyncio.gather(
            _writer(count, read_events=test_events1),
            _check_events(
                [str(i) for i in range(count)], read_events=test_events1))

        monitor_task.cancel()
        test_events2 = []
        monitor_task = monitor_events(
            temp_dir,
            lambda x: test_events2.extend(x),
            scan_interval_seconds=0.1)

        await _check_events(
            [str(i) for i in range(count)], read_events=test_events2)

        await _writer(count, count * 2, read_events=test_events2)
        await _check_events(
            [str(i) for i in range(count * 2)], read_events=test_events2)

        log_file_count = len(os.listdir(temp_dir))

        test_logger = _test_logger(
            __name__ + str(random.random()),
            common_log,
            max_bytes=1000,
            backup_count=10)
        assert len(os.listdir(temp_dir)) == log_file_count

        await _writer(
            count * 2, count * 3, spin=False, read_events=test_events2)
        await _check_events(
            [str(i) for i in range(count * 3)], read_events=test_events2)
        await _writer(
            count * 3, count * 4, spin=False, read_events=test_events2)
        await _check_events(
            [str(i) for i in range(count * 4)], read_events=test_events2)

        # Test cancel monitor task.
        monitor_task.cancel()
        with pytest.raises(asyncio.CancelledError):
            await monitor_task
        assert monitor_task.done()

        assert len(
            os.listdir(temp_dir)) > 1, "Event log should have rollovers."


if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))

@@ -294,8 +294,9 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
             if match:
                 pid = match.group(1)
                 ip = match.group(2)
-                errs_for_ip = DataSource.ip_and_pid_to_errors.get(ip, {})
-                pid_errors = errs_for_ip.get(pid, [])
+                errs_for_ip = dict(
+                    DataSource.ip_and_pid_to_errors.get(ip, {}))
+                pid_errors = list(errs_for_ip.get(pid, []))
                 pid_errors.append({
                     "message": message,
                     "timestamp": error_data.timestamp,

@@ -10,6 +10,13 @@ def enable_test_module():
     os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None)
 
 
+@pytest.fixture
+def enable_event_module():
+    os.environ["RAY_DASHBOARD_MODULE_EVENT"] = "true"
+    yield
+    os.environ.pop("RAY_DASHBOARD_MODULE_EVENT", None)
+
+
 @pytest.fixture
 def disable_aiohttp_cache():
     os.environ["RAY_DASHBOARD_NO_CACHE"] = "true"

@@ -38,3 +45,10 @@ def set_http_proxy():
         os.environ["https_proxy"] = https_proxy
     else:
         del os.environ["https_proxy"]
+
+
+@pytest.fixture
+def small_event_line_limit():
+    os.environ["EVENT_READ_LINE_LENGTH_LIMIT"] = "1024"
+    yield 1024
+    os.environ.pop("EVENT_READ_LINE_LENGTH_LIMIT", None)

@@ -320,6 +320,22 @@ def test_async_loop_forever():
     loop.run_forever()
     assert counter[0] > 2
 
+    counter2 = [0]
+    task = None
+
+    @dashboard_utils.async_loop_forever(interval_seconds=0.1, cancellable=True)
+    async def bar():
+        nonlocal task
+        counter2[0] += 1
+        if counter2[0] > 2:
+            task.cancel()
+
+    loop = asyncio.new_event_loop()
+    task = loop.create_task(bar())
+    with pytest.raises(asyncio.CancelledError):
+        loop.run_until_complete(task)
+    assert counter2[0] == 3
+
 
 def test_dashboard_module_decorator(enable_test_module):
     head_cls_list = dashboard_utils.get_all_modules(

@@ -669,13 +669,21 @@ async def get_aioredis_client(redis_address, redis_password,
         address=redis_address, password=redis_password)
 
 
-def async_loop_forever(interval_seconds):
+def async_loop_forever(interval_seconds, cancellable=False):
     def _wrapper(coro):
         @functools.wraps(coro)
         async def _looper(*args, **kwargs):
             while True:
                 try:
                     await coro(*args, **kwargs)
+                except asyncio.CancelledError as ex:
+                    if cancellable:
+                        logger.info(f"An async loop forever coroutine "
+                                    f"is cancelled {coro}.")
+                        raise ex
+                    else:
+                        logger.exception(f"Can not cancel the async loop "
+                                         f"forever coroutine {coro}.")
                 except Exception:
                     logger.exception(f"Error looping coroutine {coro}.")
                 await asyncio.sleep(interval_seconds)
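
A minimal, self-contained sketch of how a cancellable looping decorator like this behaves; it mirrors the structure above but swaps the dashboard logger for print() so it runs on its own (the three-iteration cutoff is arbitrary):

import asyncio
import functools


def async_loop_forever(interval_seconds, cancellable=False):
    """Run the decorated coroutine forever, sleeping between iterations."""

    def _wrapper(coro):
        @functools.wraps(coro)
        async def _looper(*args, **kwargs):
            while True:
                try:
                    await coro(*args, **kwargs)
                except asyncio.CancelledError:
                    if cancellable:
                        raise  # let the caller observe the cancellation
                    print(f"ignoring cancellation of {coro.__name__}")
                except Exception as e:
                    print(f"iteration failed: {e!r}")
                await asyncio.sleep(interval_seconds)

        return _looper

    return _wrapper


async def main():
    ticks = []

    @async_loop_forever(0.01, cancellable=True)
    async def tick():
        ticks.append(len(ticks))
        if len(ticks) >= 3:
            task.cancel()

    task = asyncio.get_running_loop().create_task(tick())
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert ticks == [0, 1, 2]


asyncio.run(main())
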
@@ -145,6 +145,11 @@ cc_proto_library(
     deps = [":event_proto"],
 )
 
+python_grpc_compile(
+    name = "event_py_proto",
+    deps = [":event_proto"],
+)
+
 # Job agent.
 proto_library(
     name = "job_agent_proto",

@@ -40,3 +40,15 @@ message Event {
   // store custom key such as node_id, job_id, task_id
   map<string, string> custom_fields = 9;
 }
+
+message ReportEventsReply {
+  bool send_success = 1;
+}
+
+message ReportEventsRequest {
+  repeated string event_strings = 1;
+}
+
+service ReportEventService {
+  rpc ReportEvents(ReportEventsRequest) returns (ReportEventsReply);
+}
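
For reference, a client of this service can be sketched with the generated stubs the diff already uses (event_pb2, event_pb2_grpc); the address and the example event string are placeholders, so treat this as an illustrative snippet rather than a supported entry point:

import asyncio

from grpc.experimental import aio as aiogrpc

from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc


async def report(address, event_strings):
    # `address` is a placeholder for the dashboard head's gRPC endpoint
    # (the agent looks it up under dashboard_consts.REDIS_KEY_DASHBOARD_RPC).
    channel = aiogrpc.insecure_channel(address)
    stub = event_pb2_grpc.ReportEventServiceStub(channel)
    reply = await stub.ReportEvents(
        event_pb2.ReportEventsRequest(event_strings=event_strings))
    return reply.send_success


if __name__ == "__main__":
    ok = asyncio.run(
        report("127.0.0.1:50051", ['{"event_id": "abc", "message": "hi"}']))
    print("send_success:", ok)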