From c17e171f92d3964cf4ae87e0e06b01d190bcddbd Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 13 Jul 2021 23:18:43 -0700
Subject: [PATCH] Revert "[Dashboard][event] Basic event module (#16985)"
 (#17068)

This reverts commit f1faa79a0428b5df9adc693d9350749674035db5.
---
 BUILD.bazel                                  |   1 -
 dashboard/datacenter.py                      |   2 -
 dashboard/modules/event/__init__.py          |   0
 dashboard/modules/event/event_agent.py       |  90 -------
 dashboard/modules/event/event_consts.py      |  26 --
 dashboard/modules/event/event_head.py        |  89 -------
 dashboard/modules/event/event_utils.py       | 190 --------------
 dashboard/modules/event/tests/test_event.py  | 264 --------------------
 dashboard/modules/node/node_head.py          |   5 +-
 dashboard/tests/conftest.py                  |  14 --
 dashboard/tests/test_dashboard.py            |  16 --
 dashboard/utils.py                           |  10 +-
 src/ray/protobuf/BUILD                       |   5 -
 src/ray/protobuf/event.proto                 |  12 -
 14 files changed, 3 insertions(+), 721 deletions(-)
 delete mode 100644 dashboard/modules/event/__init__.py
 delete mode 100644 dashboard/modules/event/event_agent.py
 delete mode 100644 dashboard/modules/event/event_consts.py
 delete mode 100644 dashboard/modules/event/event_head.py
 delete mode 100644 dashboard/modules/event/event_utils.py
 delete mode 100644 dashboard/modules/event/tests/test_event.py

diff --git a/BUILD.bazel b/BUILD.bazel
index 31faa1c3f..4098d15de 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1957,7 +1957,6 @@ filegroup(
         "//src/ray/protobuf:agent_manager_py_proto",
         "//src/ray/protobuf:common_py_proto",
         "//src/ray/protobuf:core_worker_py_proto",
-        "//src/ray/protobuf:event_py_proto",
         "//src/ray/protobuf:gcs_py_proto",
         "//src/ray/protobuf:gcs_service_py_proto",
         "//src/ray/protobuf:job_agent_py_proto",
diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py
index 7b4528176..b15e67891 100644
--- a/dashboard/datacenter.py
+++ b/dashboard/datacenter.py
@@ -45,8 +45,6 @@ class DataSource:
     job_actors = Dict()
     # {worker id(str): core worker stats}
     core_worker_stats = Dict()
-    # {job id hex(str): {event id(str): event dict}}
-    events = Dict()
     # {node ip (str): log entries by pid
     # (dict from pid to list of latest log entries)}
     ip_and_pid_to_logs = Dict()
diff --git a/dashboard/modules/event/__init__.py b/dashboard/modules/event/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/dashboard/modules/event/event_agent.py b/dashboard/modules/event/event_agent.py
deleted file mode 100644
index bc4651777..000000000
--- a/dashboard/modules/event/event_agent.py
+++ /dev/null
@@ -1,90 +0,0 @@
-import os
-import asyncio
-import logging
-from typing import Union
-from grpc.experimental import aio as aiogrpc
-
-import ray.new_dashboard.utils as dashboard_utils
-import ray.new_dashboard.consts as dashboard_consts
-from ray.ray_constants import env_bool
-from ray.new_dashboard.utils import async_loop_forever, create_task
-from ray.new_dashboard.modules.event import event_consts
-from ray.new_dashboard.modules.event.event_utils import monitor_events
-from ray.core.generated import event_pb2
-from ray.core.generated import event_pb2_grpc
-
-logger = logging.getLogger(__name__)
-routes = dashboard_utils.ClassMethodRouteTable
-
-
-@dashboard_utils.dashboard_module(
-    enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
-class EventAgent(dashboard_utils.DashboardAgentModule):
-    def __init__(self, dashboard_agent):
-        super().__init__(dashboard_agent)
-        self._event_dir = os.path.join(self._dashboard_agent.log_dir, "events")
-        os.makedirs(self._event_dir, exist_ok=True)
-        self._monitor: Union[asyncio.Task, None] = None
-        self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None
-        self._cached_events = asyncio.Queue(
-            event_consts.EVENT_AGENT_CACHE_SIZE)
-        logger.info("Event agent cache buffer size: %s",
-                    self._cached_events.maxsize)
-
-    async def _connect_to_dashboard(self):
-        """ Connect to the dashboard. If the dashboard is not started,
-        this method never returns.
-
-        Returns:
-            The ReportEventServiceStub object.
-        """
-        while True:
-            try:
-                aioredis = self._dashboard_agent.aioredis_client
-                dashboard_rpc_address = await aioredis.get(
-                    dashboard_consts.REDIS_KEY_DASHBOARD_RPC)
-                if dashboard_rpc_address:
-                    logger.info("Report events to %s", dashboard_rpc_address)
-                    options = (("grpc.enable_http_proxy", 0), )
-                    channel = aiogrpc.insecure_channel(
-                        dashboard_rpc_address, options=options)
-                    return event_pb2_grpc.ReportEventServiceStub(channel)
-            except Exception:
-                logger.exception("Connect to dashboard failed.")
-            await asyncio.sleep(
-                event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS)
-
-    @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
-    async def report_events(self):
-        """ Report events from the cached events queue. Reconnect to the
-        dashboard if reporting fails. Log an error after
-        EVENT_AGENT_RETRY_TIMES retries.
-
-        This method never returns.
-        """
-        data = await self._cached_events.get()
-        for _ in range(event_consts.EVENT_AGENT_RETRY_TIMES):
-            try:
-                logger.info("Report %s events.", len(data))
-                request = event_pb2.ReportEventsRequest(event_strings=data)
-                await self._stub.ReportEvents(request)
-                break
-            except Exception:
-                logger.exception("Report event failed, reconnect to the "
-                                 "dashboard.")
-                self._stub = await self._connect_to_dashboard()
-        else:
-            data_str = str(data)
-            limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
-            logger.error("Report event failed: %s",
-                         data_str[:limit] + (data_str[limit:] and "..."))
-
-    async def run(self, server):
-        # Connect to dashboard.
-        self._stub = await self._connect_to_dashboard()
-        # Start monitor task.
-        self._monitor = monitor_events(
-            self._event_dir,
-            lambda data: create_task(self._cached_events.put(data)),
-            source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES)
-        # Start reporting events.
-        await self.report_events()
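
For orientation: the agent being deleted above is essentially a bounded queue plus a bounded retry loop around a single RPC. A minimal, self-contained sketch of that pattern; the send/reconnect callables stand in for the gRPC stub and _connect_to_dashboard and are hypothetical, not part of this patch:

    import asyncio

    RETRY_TIMES = 3  # stands in for event_consts.EVENT_AGENT_RETRY_TIMES

    async def report_forever(queue: asyncio.Queue, send, reconnect):
        # Mirrors EventAgent.report_events: take one batch of event
        # strings off the queue, retry the RPC a bounded number of times,
        # reconnecting between attempts; give up on the batch afterwards.
        while True:
            batch = await queue.get()
            for _ in range(RETRY_TIMES):
                try:
                    await send(batch)
                    break
                except Exception:
                    send = await reconnect()
            else:
                print("Dropped a batch of %d events" % len(batch))
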
diff --git a/dashboard/modules/event/event_consts.py b/dashboard/modules/event/event_consts.py
deleted file mode 100644
index c20579e71..000000000
--- a/dashboard/modules/event/event_consts.py
+++ /dev/null
@@ -1,26 +0,0 @@
-from ray.ray_constants import env_integer
-from ray.core.generated import event_pb2
-
-EVENT_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_EVENT"
-LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000
-RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2
-# Monitor events
-SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer(
-    "SCAN_EVENT_DIR_INTERVAL_SECONDS", 2)
-SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60
-CONCURRENT_READ_LIMIT = 50
-EVENT_READ_LINE_COUNT_LIMIT = 200
-EVENT_READ_LINE_LENGTH_LIMIT = env_integer("EVENT_READ_LINE_LENGTH_LIMIT",
-                                           2 * 1024 * 1024)  # 2MB
-# Report events
-EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1
-EVENT_AGENT_RETRY_TIMES = 10
-EVENT_AGENT_CACHE_SIZE = 10240
-# Event sources
-EVENT_HEAD_MONITOR_SOURCE_TYPES = [
-    event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
-]
-EVENT_AGENT_MONITOR_SOURCE_TYPES = list(
-    set(event_pb2.Event.SourceType.keys()) -
-    set(EVENT_HEAD_MONITOR_SOURCE_TYPES))
-EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys()
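
Concretely, every line in an event_<SOURCE_TYPE>.log file is a single JSON object. A sketch of one such line, with the field set used by _get_event in the tests further down (all values illustrative):

    import json
    import time

    event = {
        "event_id": "ab" * 18,   # hex string
        "source_type": "GCS",    # one of event_pb2.Event.SourceType.keys()
        "host_name": "host-1",
        "pid": 1234,
        "label": "",
        "message": "sample message",
        "time_stamp": time.time(),
        "severity": "INFO",
        "custom_fields": {"job_id": "0100", "node_id": "", "task_id": ""},
    }
    line = json.dumps(event)          # one event per line in the log file
    assert json.loads(line) == event  # parse_event_strings reverses this
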
diff --git a/dashboard/modules/event/event_head.py b/dashboard/modules/event/event_head.py
deleted file mode 100644
index 1e6eff0df..000000000
--- a/dashboard/modules/event/event_head.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import os
-import asyncio
-import logging
-from typing import Union
-from collections import OrderedDict, defaultdict
-
-import aiohttp.web
-
-import ray.new_dashboard.utils as dashboard_utils
-from ray.ray_constants import env_bool
-from ray.new_dashboard.modules.event import event_consts
-from ray.new_dashboard.modules.event.event_utils import (
-    parse_event_strings,
-    monitor_events,
-)
-from ray.core.generated import event_pb2
-from ray.core.generated import event_pb2_grpc
-from ray.new_dashboard.datacenter import DataSource
-
-logger = logging.getLogger(__name__)
-routes = dashboard_utils.ClassMethodRouteTable
-
-JobEvents = OrderedDict
-dashboard_utils._json_compatible_types.add(JobEvents)
-
-
-@dashboard_utils.dashboard_module(
-    enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
-class EventHead(dashboard_utils.DashboardHeadModule,
-                event_pb2_grpc.ReportEventServiceServicer):
-    def __init__(self, dashboard_head):
-        super().__init__(dashboard_head)
-        self._event_dir = os.path.join(self._dashboard_head.log_dir, "events")
-        os.makedirs(self._event_dir, exist_ok=True)
-        self._monitor: Union[asyncio.Task, None] = None
-
-    @staticmethod
-    def _update_events(event_list):
-        # {job_id: {event_id: event}}
-        all_job_events = defaultdict(JobEvents)
-        for event in event_list:
-            event_id = event["event_id"]
-            custom_fields = event.get("custom_fields")
-            system_event = False
-            if custom_fields:
-                job_id = custom_fields.get("job_id", "global") or "global"
-            else:
-                job_id = "global"
-            if system_event is False:
-                all_job_events[job_id][event_id] = event
-        # TODO(fyrestone): Limit the event count per job.
-        for job_id, new_job_events in all_job_events.items():
-            job_events = DataSource.events.get(job_id, JobEvents())
-            job_events.update(new_job_events)
-            DataSource.events[job_id] = job_events
-
-    async def ReportEvents(self, request, context):
-        received_events = []
-        if request.event_strings:
-            received_events.extend(parse_event_strings(request.event_strings))
-        logger.info("Received %d events", len(received_events))
-        self._update_events(received_events)
-        return event_pb2.ReportEventsReply(send_success=True)
-
-    @routes.get("/events")
-    @dashboard_utils.aiohttp_cache(2)
-    async def get_event(self, req) -> aiohttp.web.Response:
-        job_id = req.query.get("job_id")
-        if job_id is None:
-            all_events = {
-                job_id: list(job_events.values())
-                for job_id, job_events in DataSource.events.items()
-            }
-            return dashboard_utils.rest_response(
-                success=True, message="All events fetched.", events=all_events)
-
-        job_events = DataSource.events.get(job_id, {})
-        return dashboard_utils.rest_response(
-            success=True,
-            message="Job events fetched.",
-            job_id=job_id,
-            events=list(job_events.values()))
-
-    async def run(self, server):
-        event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server)
-        self._monitor = monitor_events(
-            self._event_dir,
-            lambda data: self._update_events(parse_event_strings(data)),
-            source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES)
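
The get_event route above can be exercised over plain HTTP. A sketch assuming a local dashboard on the default port with the module enabled (RAY_DASHBOARD_MODULE_EVENT=true); the job id is hypothetical:

    import requests

    base = "http://127.0.0.1:8265"
    # All events, grouped by job id.
    all_events = requests.get(f"{base}/events").json()["data"]["events"]
    # Events for one job.
    resp = requests.get(f"{base}/events", params={"job_id": "0100"})
    print(resp.json()["data"]["events"])
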
diff --git a/dashboard/modules/event/event_utils.py b/dashboard/modules/event/event_utils.py
deleted file mode 100644
index ccd333d4d..000000000
--- a/dashboard/modules/event/event_utils.py
+++ /dev/null
@@ -1,190 +0,0 @@
-import os
-import time
-import mmap
-import json
-import fnmatch
-import asyncio
-import itertools
-import collections
-import logging.handlers
-
-from ray.new_dashboard.modules.event import event_consts
-from ray.new_dashboard.utils import async_loop_forever, create_task
-
-logger = logging.getLogger(__name__)
-
-
-def _get_source_files(event_dir, source_types=None, event_file_filter=None):
-    event_log_names = os.listdir(event_dir)
-    source_files = {}
-    all_source_types = set(event_consts.EVENT_SOURCE_ALL)
-    for source_type in source_types or event_consts.EVENT_SOURCE_ALL:
-        assert source_type in all_source_types, \
-            f"Invalid source type: {source_type}"
-        files = []
-        for n in event_log_names:
-            if fnmatch.fnmatch(n, f"*{source_type}*"):
-                f = os.path.join(event_dir, n)
-                if event_file_filter is not None and not event_file_filter(f):
-                    continue
-                files.append(f)
-        if files:
-            source_files[source_type] = files
-    return source_files
-
-
-def _restore_newline(event_dict):
-    try:
-        event_dict["message"] = event_dict["message"]\
-            .replace("\\n", "\n")\
-            .replace("\\r", "\n")
-    except Exception:
-        logger.exception("Restore newline for event failed: %s", event_dict)
-    return event_dict
-
-
-def _parse_line(event_str):
-    return _restore_newline(json.loads(event_str))
-
-
-def parse_event_strings(event_string_list):
-    events = []
-    for data in event_string_list:
-        if not data:
-            continue
-        try:
-            event = _parse_line(data)
-            events.append(event)
-        except Exception:
-            logger.exception("Parse event line failed: %s", repr(data))
-    return events
-
-
-ReadFileResult = collections.namedtuple(
-    "ReadFileResult", ["fid", "size", "mtime", "position", "lines"])
-
-
-def _read_file(file,
-               pos,
-               n_lines=event_consts.EVENT_READ_LINE_COUNT_LIMIT,
-               closefd=True):
-    with open(file, "rb", closefd=closefd) as f:
-        # The ino may be 0 on Windows.
-        stat = os.stat(f.fileno())
-        fid = stat.st_ino or file
-        lines = []
-        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
-            start = pos
-            for _ in range(n_lines):
-                sep = mm.find(b"\n", start)
-                if sep == -1:
-                    break
-                if sep - start <= event_consts.EVENT_READ_LINE_LENGTH_LIMIT:
-                    lines.append(mm[start:sep].decode("utf-8"))
-                else:
-                    truncated_size = min(
-                        100, event_consts.EVENT_READ_LINE_LENGTH_LIMIT)
-                    logger.warning(
-                        "Ignored long string: %s...(%s chars)",
-                        mm[start:start + truncated_size].decode("utf-8"),
-                        sep - start)
-                start = sep + 1
-        return ReadFileResult(fid, stat.st_size, stat.st_mtime, start, lines)
-
-
-def monitor_events(
-        event_dir,
-        callback,
-        scan_interval_seconds=event_consts.SCAN_EVENT_DIR_INTERVAL_SECONDS,
-        start_mtime=time.time() + event_consts.SCAN_EVENT_START_OFFSET_SECONDS,
-        monitor_files=None,
-        source_types=None):
-    """ Monitor events in a directory. New events are read and passed to the
-    callback.
-
-    Args:
-        event_dir (str): The event log directory.
-        callback (def callback(List[str]): pass): A callback that accepts a
-            list of event strings.
-        scan_interval_seconds (float): The interval in seconds between two
-            scans.
-        start_mtime (float): Only the event log files whose last modification
-            time is greater than start_mtime are monitored.
-        monitor_files (Dict[int, MonitorFile]): The map from event log file id
-            to MonitorFile object. Monitor all files from the beginning
-            if the value is None.
-        source_types (List[str]): A list of source type names from
-            event_pb2.Event.SourceType.keys(). Monitor all source types if the
-            value is None.
-    """
-    loop = asyncio.get_event_loop()
-    if monitor_files is None:
-        monitor_files = {}
-
-    logger.info(
-        "Monitor events logs modified after %s on %s, "
-        "the source types are %s.", start_mtime, event_dir, "all"
-        if source_types is None else source_types)
-
-    MonitorFile = collections.namedtuple("MonitorFile",
-                                         ["size", "mtime", "position"])
-
-    def _source_file_filter(source_file):
-        stat = os.stat(source_file)
-        return stat.st_mtime > start_mtime
-
-    def _read_monitor_file(file, pos):
-        assert isinstance(file, str), \
-            f"File should be a str, but a {type(file)}({file}) found"
-        fd = os.open(file, os.O_RDONLY)
-        try:
-            stat = os.stat(fd)
-            # Check the file size to avoid raising the exception
-            # ValueError: cannot mmap an empty file
-            if stat.st_size <= 0:
-                return []
-            fid = stat.st_ino or file
-            monitor_file = monitor_files.get(fid)
-            if monitor_file:
-                if (monitor_file.position == monitor_file.size
-                        and monitor_file.size == stat.st_size
-                        and monitor_file.mtime == stat.st_mtime):
-                    logger.debug(
-                        "Skip reading the file because "
-                        "there is no change: %s", file)
-                    return []
-                position = monitor_file.position
-            else:
-                logger.info("Found new event log file: %s", file)
-                position = pos
-            # Close the fd in finally.
-            r = _read_file(fd, position, closefd=False)
-            # It should be fine to update the dict in the executor thread.
-            monitor_files[r.fid] = MonitorFile(r.size, r.mtime, r.position)
-            loop.call_soon_threadsafe(callback, r.lines)
-        except Exception as e:
-            raise Exception(f"Read event file failed: {file}") from e
-        finally:
-            os.close(fd)
-
-    @async_loop_forever(scan_interval_seconds, cancellable=True)
-    async def _scan_event_log_files():
-        # Scan event files.
-        source_files = await loop.run_in_executor(None, _get_source_files,
                                                  event_dir, source_types,
                                                  _source_file_filter)
-
-        # Limit concurrent read to avoid fd exhaustion.
-        semaphore = asyncio.Semaphore(event_consts.CONCURRENT_READ_LIMIT)
-
-        async def _concurrent_coro(filename):
-            async with semaphore:
-                return await loop.run_in_executor(None, _read_monitor_file,
-                                                  filename, 0)
-
-        # Read files.
-        await asyncio.gather(*[
-            _concurrent_coro(filename)
-            for filename in list(itertools.chain(*source_files.values()))
-        ])
-
-    return create_task(_scan_event_log_files())
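
A usage sketch for monitor_events, assuming the module above is importable: it returns an asyncio task that keeps invoking the callback with lists of newly appended raw lines until it is cancelled. The file name must contain a valid source type so that _get_source_files picks it up:

    import asyncio
    import os
    import tempfile

    from ray.new_dashboard.modules.event.event_utils import monitor_events

    async def main():
        temp_dir = tempfile.mkdtemp()
        # The name must match *<SOURCE_TYPE>*, e.g. COMMON.
        with open(os.path.join(temp_dir, "event_COMMON.log"), "w") as f:
            f.write('{"event_id": "1", "message": "hello"}\n')
        received = []
        task = monitor_events(
            temp_dir,
            lambda lines: received.extend(lines),
            scan_interval_seconds=0.01,
            start_mtime=0)  # 0 = do not skip pre-existing files
        await asyncio.sleep(0.1)
        task.cancel()
        print(received)  # the raw JSON line shows up here

    asyncio.get_event_loop().run_until_complete(main())
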
diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py
deleted file mode 100644
index a36223ead..000000000
--- a/dashboard/modules/event/tests/test_event.py
+++ /dev/null
@@ -1,264 +0,0 @@
-import os
-import sys
-import time
-import json
-import copy
-import logging
-import requests
-import asyncio
-import random
-import tempfile
-
-import pytest
-import numpy as np
-
-import ray
-from ray._private.utils import binary_to_hex
-from ray.new_dashboard.tests.conftest import *  # noqa
-from ray.new_dashboard.modules.event import event_consts
-from ray.core.generated import event_pb2
-from ray.test_utils import (
-    format_web_url,
-    wait_until_server_available,
-    wait_for_condition,
-)
-from ray.new_dashboard.modules.event.event_utils import (
-    monitor_events, )
-
-logger = logging.getLogger(__name__)
-
-
-def _get_event(msg="empty message", job_id=None, source_type=None):
-    return {
-        "event_id": binary_to_hex(np.random.bytes(18)),
-        "source_type": random.choice(event_pb2.Event.SourceType.keys())
-        if source_type is None else source_type,
-        "host_name": "po-dev.inc.alipay.net",
-        "pid": random.randint(1, 65536),
-        "label": "",
-        "message": msg,
-        "time_stamp": time.time(),
-        "severity": "INFO",
-        "custom_fields": {
-            "job_id": ray.JobID.from_int(random.randint(1, 100)).hex()
-            if job_id is None else job_id,
-            "node_id": "",
-            "task_id": "",
-        }
-    }
-
-
-def _test_logger(name, log_file, max_bytes, backup_count):
-    handler = logging.handlers.RotatingFileHandler(
-        log_file, maxBytes=max_bytes, backupCount=backup_count)
-    formatter = logging.Formatter("%(message)s")
-    handler.setFormatter(formatter)
-
-    logger = logging.getLogger(name)
-    logger.propagate = False
-    logger.setLevel(logging.INFO)
-    logger.addHandler(handler)
-
-    return logger
-
-
-def test_event_basic(enable_event_module, disable_aiohttp_cache,
-                     ray_start_with_dashboard):
-    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
-    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
-    session_dir = ray_start_with_dashboard["session_dir"]
-    event_dir = os.path.join(session_dir, "logs", "events")
-    job_id = ray.JobID.from_int(100).hex()
-
-    source_type_gcs = event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
-    source_type_raylet = event_pb2.Event.SourceType.Name(
-        event_pb2.Event.RAYLET)
-    test_count = 20
-
-    for source_type in [source_type_gcs, source_type_raylet]:
-        test_log_file = os.path.join(event_dir, f"event_{source_type}.log")
-        test_logger = _test_logger(
-            __name__ + str(random.random()),
-            test_log_file,
-            max_bytes=2000,
-            backup_count=1000)
-        for i in range(test_count):
-            sample_event = _get_event(
-                str(i), job_id=job_id, source_type=source_type)
-            test_logger.info("%s", json.dumps(sample_event))
-
-    def _check_events():
-        try:
-            resp = requests.get(f"{webui_url}/events")
-            resp.raise_for_status()
-            result = resp.json()
-            all_events = result["data"]["events"]
-            job_events = all_events[job_id]
-            assert len(job_events) >= test_count * 2
-            source_messages = {}
-            for e in job_events:
-                source_type = e["sourceType"]
-                message = e["message"]
-                source_messages.setdefault(source_type, set()).add(message)
-            assert len(source_messages[source_type_gcs]) >= test_count
-            assert len(source_messages[source_type_raylet]) >= test_count
-            data = {str(i) for i in range(test_count)}
-            assert data & source_messages[source_type_gcs] == data
-            assert data & source_messages[source_type_raylet] == data
-            return True
-        except Exception as ex:
-            logger.exception(ex)
-            return False
-
-    wait_for_condition(_check_events, timeout=15)
-
-
-def test_event_message_limit(enable_event_module, small_event_line_limit,
-                             disable_aiohttp_cache, ray_start_with_dashboard):
-    event_read_line_length_limit = small_event_line_limit
-    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
-    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
-    session_dir = ray_start_with_dashboard["session_dir"]
-    event_dir = os.path.join(session_dir, "logs", "events")
-    job_id = ray.JobID.from_int(100).hex()
-    events = []
-    # A sample event whose serialized length equals the limit.
-    sample_event = _get_event("", job_id=job_id)
-    message_len = event_read_line_length_limit - len(json.dumps(sample_event))
-    for i in range(10):
-        sample_event = copy.deepcopy(sample_event)
-        sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
-        sample_event["message"] = str(i) * message_len
-        assert len(json.dumps(sample_event)) == event_read_line_length_limit
-        events.append(sample_event)
-    # A sample event longer than the limit.
-    sample_event = copy.deepcopy(sample_event)
-    sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
-    sample_event["message"] = "2" * (message_len + 1)
-    assert len(json.dumps(sample_event)) > event_read_line_length_limit
-    events.append(sample_event)
-
-    for i in range(event_consts.EVENT_READ_LINE_COUNT_LIMIT):
-        events.append(_get_event(str(i), job_id=job_id))
-
-    with open(os.path.join(event_dir, "tmp.log"), "w") as f:
-        f.writelines([(json.dumps(e) + "\n") for e in events])
-
-    try:
-        os.remove(os.path.join(event_dir, "event_GCS.log"))
-    except Exception:
-        pass
-    os.rename(
-        os.path.join(event_dir, "tmp.log"),
-        os.path.join(event_dir, "event_GCS.log"))
-
-    def _check_events():
-        try:
-            resp = requests.get(f"{webui_url}/events")
-            resp.raise_for_status()
-            result = resp.json()
-            all_events = result["data"]["events"]
-            assert len(all_events[job_id]
-                       ) >= event_consts.EVENT_READ_LINE_COUNT_LIMIT + 10
-            messages = [e["message"] for e in all_events[job_id]]
-            for i in range(10):
-                assert str(i) * message_len in messages
-            assert "2" * (message_len + 1) not in messages
-            assert str(event_consts.EVENT_READ_LINE_COUNT_LIMIT -
-                       1) in messages
-            return True
-        except Exception as ex:
-            logger.exception(ex)
-            return False
-
-    wait_for_condition(_check_events, timeout=15)
-
-
-@pytest.mark.asyncio
-async def test_monitor_events(enable_event_module):
-    with tempfile.TemporaryDirectory() as temp_dir:
-        common = event_pb2.Event.SourceType.Name(event_pb2.Event.COMMON)
-        common_log = os.path.join(temp_dir, f"event_{common}.log")
-        test_logger = _test_logger(
-            __name__ + str(random.random()),
-            common_log,
-            max_bytes=10,
-            backup_count=10)
-        test_events1 = []
-        monitor_task = monitor_events(
-            temp_dir,
-            lambda x: test_events1.extend(x),
-            scan_interval_seconds=0.01)
-        assert not monitor_task.done()
-        count = 10
-
-        async def _writer(*args, read_events, spin=True):
-            for x in range(*args):
-                test_logger.info("%s", x)
-                if spin:
-                    while str(x) not in read_events:
-                        await asyncio.sleep(0.01)
-
-        async def _check_events(expect_events, read_events, timeout=10):
-            start_time = time.time()
-            while True:
-                sorted_events = sorted(int(i) for i in read_events)
-                sorted_events = [str(i) for i in sorted_events]
-                if time.time() - start_time > timeout:
-                    raise TimeoutError(
-                        f"Timeout, read events: {sorted_events}, "
-                        f"expect events: {expect_events}")
-                if len(sorted_events) == len(expect_events):
-                    if sorted_events == expect_events:
-                        break
-                await asyncio.sleep(1)
-
-        await asyncio.gather(
-            _writer(count, read_events=test_events1),
-            _check_events(
-                [str(i) for i in range(count)], read_events=test_events1))
-
-        monitor_task.cancel()
-        test_events2 = []
-        monitor_task = monitor_events(
-            temp_dir,
-            lambda x: test_events2.extend(x),
-            scan_interval_seconds=0.1)
-
-        await _check_events(
-            [str(i) for i in range(count)], read_events=test_events2)
-
-        await _writer(count, count * 2, read_events=test_events2)
-        await _check_events(
-            [str(i) for i in range(count * 2)], read_events=test_events2)
-
-        log_file_count = len(os.listdir(temp_dir))
-
-        test_logger = _test_logger(
-            __name__ + str(random.random()),
-            common_log,
-            max_bytes=1000,
-            backup_count=10)
-        assert len(os.listdir(temp_dir)) == log_file_count
-
-        await _writer(
-            count * 2, count * 3, spin=False, read_events=test_events2)
-        await _check_events(
-            [str(i) for i in range(count * 3)], read_events=test_events2)
-        await _writer(
-            count * 3, count * 4, spin=False, read_events=test_events2)
-        await _check_events(
-            [str(i) for i in range(count * 4)], read_events=test_events2)
-
-        # Test canceling the monitor task.
-        monitor_task.cancel()
-        with pytest.raises(asyncio.CancelledError):
-            await monitor_task
-        assert monitor_task.done()
-
-        assert len(
-            os.listdir(temp_dir)) > 1, "Event log should have rollovers."
-
-
-if __name__ == "__main__":
-    sys.exit(pytest.main(["-v", __file__]))
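
The rollover assertions above lean on logging.handlers.RotatingFileHandler: on rollover it renames the live file and starts a new one (a new inode), and monitor_events keys read positions on st_ino, so rotated files are treated as newly discovered files. A standalone demonstration of just the rotation:

    import logging
    import logging.handlers
    import os
    import tempfile

    temp_dir = tempfile.mkdtemp()
    handler = logging.handlers.RotatingFileHandler(
        os.path.join(temp_dir, "event_COMMON.log"),
        maxBytes=10,   # tiny limit to force a rollover on every record
        backupCount=5)
    handler.setFormatter(logging.Formatter("%(message)s"))
    log = logging.getLogger("rollover-demo")
    log.propagate = False
    log.setLevel(logging.INFO)
    log.addHandler(handler)

    for i in range(10):
        log.info("%s", i)
    # Several files exist now: event_COMMON.log, event_COMMON.log.1, ...
    print(sorted(os.listdir(temp_dir)))
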
diff --git a/dashboard/modules/node/node_head.py b/dashboard/modules/node/node_head.py
index dbd244974..918ae2566 100644
--- a/dashboard/modules/node/node_head.py
+++ b/dashboard/modules/node/node_head.py
@@ -294,9 +294,8 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
             if match:
                 pid = match.group(1)
                 ip = match.group(2)
-                errs_for_ip = dict(
-                    DataSource.ip_and_pid_to_errors.get(ip, {}))
-                pid_errors = list(errs_for_ip.get(pid, []))
+                errs_for_ip = DataSource.ip_and_pid_to_errors.get(ip, {})
+                pid_errors = errs_for_ip.get(pid, [])
                 pid_errors.append({
                     "message": message,
                     "timestamp": error_data.timestamp,
diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py
index a82194257..ec893fbef 100644
--- a/dashboard/tests/conftest.py
+++ b/dashboard/tests/conftest.py
@@ -10,13 +10,6 @@ def enable_test_module():
     os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None)
 
 
-@pytest.fixture
-def enable_event_module():
-    os.environ["RAY_DASHBOARD_MODULE_EVENT"] = "true"
-    yield
-    os.environ.pop("RAY_DASHBOARD_MODULE_EVENT", None)
-
-
 @pytest.fixture
 def disable_aiohttp_cache():
     os.environ["RAY_DASHBOARD_NO_CACHE"] = "true"
@@ -45,10 +38,3 @@ def set_http_proxy():
         os.environ["https_proxy"] = https_proxy
     else:
         del os.environ["https_proxy"]
-
-
-@pytest.fixture
-def small_event_line_limit():
-    os.environ["EVENT_READ_LINE_LENGTH_LIMIT"] = "1024"
-    yield 1024
-    os.environ.pop("EVENT_READ_LINE_LENGTH_LIMIT", None)
diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py
index aca08f222..98c5bdcf1 100644
--- a/dashboard/tests/test_dashboard.py
+++ b/dashboard/tests/test_dashboard.py
@@ -320,22 +320,6 @@ def test_async_loop_forever():
     loop.run_forever()
     assert counter[0] > 2
 
-    counter2 = [0]
-    task = None
-
-    @dashboard_utils.async_loop_forever(interval_seconds=0.1, cancellable=True)
-    async def bar():
-        nonlocal task
-        counter2[0] += 1
-        if counter2[0] > 2:
-            task.cancel()
-
-    loop = asyncio.new_event_loop()
-    task = loop.create_task(bar())
-    with pytest.raises(asyncio.CancelledError):
-        loop.run_until_complete(task)
-    assert counter2[0] == 3
-
 
 def test_dashboard_module_decorator(enable_test_module):
     head_cls_list = dashboard_utils.get_all_modules(
diff --git a/dashboard/utils.py b/dashboard/utils.py
index 0acfee638..d8b4f20f0 100644
--- a/dashboard/utils.py
+++ b/dashboard/utils.py
@@ -669,21 +669,13 @@ async def get_aioredis_client(redis_address, redis_password,
         address=redis_address, password=redis_password)
 
 
-def async_loop_forever(interval_seconds, cancellable=False):
+def async_loop_forever(interval_seconds):
     def _wrapper(coro):
         @functools.wraps(coro)
         async def _looper(*args, **kwargs):
             while True:
                 try:
                     await coro(*args, **kwargs)
-                except asyncio.CancelledError as ex:
-                    if cancellable:
-                        logger.info(f"An async loop forever coroutine "
-                                    f"is cancelled {coro}.")
-                        raise ex
-                    else:
-                        logger.exception(f"Can not cancel the async loop "
-                                         f"forever coroutine {coro}.")
                 except Exception:
                     logger.exception(f"Error looping coroutine {coro}.")
                 await asyncio.sleep(interval_seconds)
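
After this revert, async_loop_forever is back to its simple form: run the coroutine, log any exception, sleep, repeat; cancellation is no longer treated specially. The resulting decorator for reference, reconstructed from the hunk above (the two return statements fall outside the hunk context and are inferred):

    import asyncio
    import functools
    import logging

    logger = logging.getLogger(__name__)

    def async_loop_forever(interval_seconds):
        def _wrapper(coro):
            @functools.wraps(coro)
            async def _looper(*args, **kwargs):
                while True:
                    try:
                        await coro(*args, **kwargs)
                    except Exception:
                        logger.exception(f"Error looping coroutine {coro}.")
                    await asyncio.sleep(interval_seconds)
            return _looper
        return _wrapper
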
diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD
index c31f2d675..3e639c5a0 100644
--- a/src/ray/protobuf/BUILD
+++ b/src/ray/protobuf/BUILD
@@ -145,11 +145,6 @@ cc_proto_library(
     deps = [":event_proto"],
 )
 
-python_grpc_compile(
-    name = "event_py_proto",
-    deps = [":event_proto"],
-)
-
 # Job agent.
 proto_library(
     name = "job_agent_proto",
diff --git a/src/ray/protobuf/event.proto b/src/ray/protobuf/event.proto
index 2edc20277..7dfd61c27 100644
--- a/src/ray/protobuf/event.proto
+++ b/src/ray/protobuf/event.proto
@@ -40,15 +40,3 @@ message Event {
   // store custom key such as node_id, job_id, task_id
   map<string, string> custom_fields = 9;
 }
-
-message ReportEventsReply {
-  bool send_success = 1;
-}
-
-message ReportEventsRequest {
-  repeated string event_strings = 1;
-}
-
-service ReportEventService {
-  rpc ReportEvents(ReportEventsRequest) returns (ReportEventsReply);
-}
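
For completeness, the service deleted above is a single-RPC push API: the agent sends events as JSON strings and the head replies with a success flag. A client-side sketch, assuming the generated event_pb2/event_pb2_grpc modules and a hypothetical server address:

    import json

    from grpc.experimental import aio as aiogrpc

    from ray.core.generated import event_pb2
    from ray.core.generated import event_pb2_grpc

    async def push_events(events, address="127.0.0.1:40000"):
        # Serialize each event dict to one JSON string, mirroring what
        # EventAgent.report_events sends to EventHead.ReportEvents.
        channel = aiogrpc.insecure_channel(address)
        stub = event_pb2_grpc.ReportEventServiceStub(channel)
        request = event_pb2.ReportEventsRequest(
            event_strings=[json.dumps(e) for e in events])
        reply = await stub.ReportEvents(request)
        return reply.send_success
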