Revert "[Dashboard][event] Basic event module (#16985)" (#17068)

This reverts commit f1faa79a04.
Amog Kamsetty 2021-07-13 23:18:43 -07:00 committed by GitHub
parent 7ec18f671a
commit c17e171f92
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 3 additions and 721 deletions

View file

@@ -1957,7 +1957,6 @@ filegroup(
"//src/ray/protobuf:agent_manager_py_proto",
"//src/ray/protobuf:common_py_proto",
"//src/ray/protobuf:core_worker_py_proto",
"//src/ray/protobuf:event_py_proto",
"//src/ray/protobuf:gcs_py_proto",
"//src/ray/protobuf:gcs_service_py_proto",
"//src/ray/protobuf:job_agent_py_proto",

View file

@@ -45,8 +45,6 @@ class DataSource:
job_actors = Dict()
# {worker id(str): core worker stats}
core_worker_stats = Dict()
# {job id hex(str): {event id(str): event dict}}
events = Dict()
# {node ip (str): log entries by pid
# (dict from pid to list of latest log entries)}
ip_and_pid_to_logs = Dict()

View file

@@ -1,90 +0,0 @@
import os
import asyncio
import logging
from typing import Union
from grpc.experimental import aio as aiogrpc
import ray.new_dashboard.utils as dashboard_utils
import ray.new_dashboard.consts as dashboard_consts
from ray.ray_constants import env_bool
from ray.new_dashboard.utils import async_loop_forever, create_task
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import monitor_events
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc
logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
@dashboard_utils.dashboard_module(
enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
class EventAgent(dashboard_utils.DashboardAgentModule):
def __init__(self, dashboard_agent):
super().__init__(dashboard_agent)
self._event_dir = os.path.join(self._dashboard_agent.log_dir, "events")
os.makedirs(self._event_dir, exist_ok=True)
self._monitor: Union[asyncio.Task, None] = None
self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None
self._cached_events = asyncio.Queue(
event_consts.EVENT_AGENT_CACHE_SIZE)
logger.info("Event agent cache buffer size: %s",
self._cached_events.maxsize)
async def _connect_to_dashboard(self):
""" Connect to the dashboard. If the dashboard is not started, then
this method will never return.
Returns:
The ReportEventServiceStub object.
"""
while True:
try:
aioredis = self._dashboard_agent.aioredis_client
dashboard_rpc_address = await aioredis.get(
dashboard_consts.REDIS_KEY_DASHBOARD_RPC)
if dashboard_rpc_address:
logger.info("Report events to %s", dashboard_rpc_address)
options = (("grpc.enable_http_proxy", 0), )
channel = aiogrpc.insecure_channel(
dashboard_rpc_address, options=options)
return event_pb2_grpc.ReportEventServiceStub(channel)
except Exception:
logger.exception("Connect to dashboard failed.")
await asyncio.sleep(
event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS)
@async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
async def report_events(self):
""" Report events from cached events queue. Reconnect to dashboard if
report failed. Log error after retry EVENT_AGENT_RETRY_TIMES.
This method will never returns.
"""
data = await self._cached_events.get()
for _ in range(event_consts.EVENT_AGENT_RETRY_TIMES):
try:
logger.info("Report %s events.", len(data))
request = event_pb2.ReportEventsRequest(event_strings=data)
await self._stub.ReportEvents(request)
break
except Exception:
logger.exception("Report event failed, reconnect to the "
"dashboard.")
self._stub = await self._connect_to_dashboard()
else:
data_str = str(data)
limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
logger.error("Report event failed: %s",
data_str[:limit] + (data_str[limit:] and "..."))
async def run(self, server):
# Connect to dashboard.
self._stub = await self._connect_to_dashboard()
# Start monitor task.
self._monitor = monitor_events(
self._event_dir,
lambda data: create_task(self._cached_events.put(data)),
source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES)
# Start reporting events.
await self.report_events()
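
For orientation, the agent above follows a drain-retry-reconnect pattern: take one batch from the cache queue, retry the gRPC send a bounded number of times, and reconnect on failure. Below is a minimal, self-contained sketch of that pattern; the names report_loop, send_batch, reconnect, RETRY_TIMES, and REPORT_INTERVAL are hypothetical stand-ins, not part of the module.

import asyncio
import logging

logger = logging.getLogger(__name__)

RETRY_TIMES = 10        # stands in for EVENT_AGENT_RETRY_TIMES
REPORT_INTERVAL = 0.1   # stands in for EVENT_AGENT_REPORT_INTERVAL_SECONDS


async def report_loop(queue: asyncio.Queue, send_batch, reconnect):
    """Drain one batch per tick; retry the send and reconnect on failure."""
    while True:
        batch = await queue.get()
        for _ in range(RETRY_TIMES):
            try:
                await send_batch(batch)
                break
            except Exception:
                logger.exception("Send failed, reconnecting.")
                send_batch = await reconnect()
        else:
            # All retries failed; drop the batch, like the agent does.
            logger.error("Dropping batch after %d retries.", RETRY_TIMES)
        await asyncio.sleep(REPORT_INTERVAL)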

View file

@@ -1,26 +0,0 @@
from ray.ray_constants import env_integer
from ray.core.generated import event_pb2
EVENT_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_EVENT"
LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000
RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2
# Monitor events
SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer(
"SCAN_EVENT_DIR_INTERVAL_SECONDS", 2)
SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60
CONCURRENT_READ_LIMIT = 50
EVENT_READ_LINE_COUNT_LIMIT = 200
EVENT_READ_LINE_LENGTH_LIMIT = env_integer("EVENT_READ_LINE_LENGTH_LIMIT",
2 * 1024 * 1024) # 2MB
# Report events
EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1
EVENT_AGENT_RETRY_TIMES = 10
EVENT_AGENT_CACHE_SIZE = 10240
# Event sources
EVENT_HEAD_MONITOR_SOURCE_TYPES = [
event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
]
EVENT_AGENT_MONITOR_SOURCE_TYPES = list(
set(event_pb2.Event.SourceType.keys()) -
set(EVENT_HEAD_MONITOR_SOURCE_TYPES))
EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys()
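
As a rough illustration of the head/agent split encoded above: the head tails only GCS-sourced event logs, and agents monitor every other source type. The enum values below are a hypothetical subset of event_pb2.Event.SourceType.keys(), used for illustration only.

# Hypothetical subset of event_pb2.Event.SourceType.keys().
ALL_SOURCE_TYPES = ["COMMON", "CORE_WORKER", "GCS", "RAYLET"]
HEAD_MONITOR_SOURCE_TYPES = ["GCS"]
AGENT_MONITOR_SOURCE_TYPES = sorted(
    set(ALL_SOURCE_TYPES) - set(HEAD_MONITOR_SOURCE_TYPES))
assert "GCS" not in AGENT_MONITOR_SOURCE_TYPES  # the split is disjoint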

View file

@@ -1,89 +0,0 @@
import os
import asyncio
import logging
from typing import Union
from collections import OrderedDict, defaultdict
import aiohttp.web
import ray.new_dashboard.utils as dashboard_utils
from ray.ray_constants import env_bool
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import (
parse_event_strings,
monitor_events,
)
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc
from ray.new_dashboard.datacenter import DataSource
logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
JobEvents = OrderedDict
dashboard_utils._json_compatible_types.add(JobEvents)
@dashboard_utils.dashboard_module(
enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
class EventHead(dashboard_utils.DashboardHeadModule,
event_pb2_grpc.ReportEventServiceServicer):
def __init__(self, dashboard_head):
super().__init__(dashboard_head)
self._event_dir = os.path.join(self._dashboard_head.log_dir, "events")
os.makedirs(self._event_dir, exist_ok=True)
self._monitor: Union[asyncio.Task, None] = None
@staticmethod
def _update_events(event_list):
# {job_id: {event_id: event}}
all_job_events = defaultdict(JobEvents)
for event in event_list:
event_id = event["event_id"]
custom_fields = event.get("custom_fields")
system_event = False
if custom_fields:
job_id = custom_fields.get("job_id", "global") or "global"
else:
job_id = "global"
if system_event is False:
all_job_events[job_id][event_id] = event
# TODO(fyrestone): Limit the event count per job.
for job_id, new_job_events in all_job_events.items():
job_events = DataSource.events.get(job_id, JobEvents())
job_events.update(new_job_events)
DataSource.events[job_id] = job_events
async def ReportEvents(self, request, context):
received_events = []
if request.event_strings:
received_events.extend(parse_event_strings(request.event_strings))
logger.info("Received %d events", len(received_events))
self._update_events(received_events)
return event_pb2.ReportEventsReply(send_success=True)
@routes.get("/events")
@dashboard_utils.aiohttp_cache(2)
async def get_event(self, req) -> aiohttp.web.Response:
job_id = req.query.get("job_id")
if job_id is None:
all_events = {
job_id: list(job_events.values())
for job_id, job_events in DataSource.events.items()
}
return dashboard_utils.rest_response(
success=True, message="All events fetched.", events=all_events)
job_events = DataSource.events.get(job_id, {})
return dashboard_utils.rest_response(
success=True,
message="Job events fetched.",
job_id=job_id,
events=list(job_events.values()))
async def run(self, server):
event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server)
self._monitor = monitor_events(
self._event_dir,
lambda data: self._update_events(parse_event_strings(data)),
source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES)
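
A hedged sketch of querying the /events endpoint registered above, mirroring the assertions in the tests later in this diff. The host/port is a placeholder for a locally running dashboard, and the job id is illustrative.

import requests

DASHBOARD_URL = "http://127.0.0.1:8265"  # placeholder; use the actual webui_url

# All events, grouped by job id.
resp = requests.get(f"{DASHBOARD_URL}/events")
resp.raise_for_status()
all_events = resp.json()["data"]["events"]

# Events for a single (hypothetical) job id.
resp = requests.get(f"{DASHBOARD_URL}/events", params={"job_id": "01000000"})
resp.raise_for_status()
job_events = resp.json()["data"]["events"]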

View file

@@ -1,190 +0,0 @@
import os
import time
import mmap
import json
import fnmatch
import asyncio
import itertools
import collections
import logging.handlers
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.utils import async_loop_forever, create_task
logger = logging.getLogger(__name__)
def _get_source_files(event_dir, source_types=None, event_file_filter=None):
event_log_names = os.listdir(event_dir)
source_files = {}
all_source_types = set(event_consts.EVENT_SOURCE_ALL)
for source_type in source_types or event_consts.EVENT_SOURCE_ALL:
assert source_type in all_source_types, \
f"Invalid source type: {source_type}"
files = []
for n in event_log_names:
if fnmatch.fnmatch(n, f"*{source_type}*"):
f = os.path.join(event_dir, n)
if event_file_filter is not None and not event_file_filter(f):
continue
files.append(f)
if files:
source_files[source_type] = files
return source_files
def _restore_newline(event_dict):
try:
event_dict["message"] = event_dict["message"]\
.replace("\\n", "\n")\
.replace("\\r", "\n")
except Exception:
logger.exception("Restore newline for event failed: %s", event_dict)
return event_dict
def _parse_line(event_str):
return _restore_newline(json.loads(event_str))
def parse_event_strings(event_string_list):
events = []
for data in event_string_list:
if not data:
continue
try:
event = _parse_line(data)
events.append(event)
except Exception:
logger.exception("Parse event line failed: %s", repr(data))
return events
ReadFileResult = collections.namedtuple(
"ReadFileResult", ["fid", "size", "mtime", "position", "lines"])
def _read_file(file,
pos,
n_lines=event_consts.EVENT_READ_LINE_COUNT_LIMIT,
closefd=True):
with open(file, "rb", closefd=closefd) as f:
# The ino may be 0 on Windows.
stat = os.stat(f.fileno())
fid = stat.st_ino or file
lines = []
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
start = pos
for _ in range(n_lines):
sep = mm.find(b"\n", start)
if sep == -1:
break
if sep - start <= event_consts.EVENT_READ_LINE_LENGTH_LIMIT:
lines.append(mm[start:sep].decode("utf-8"))
else:
truncated_size = min(
100, event_consts.EVENT_READ_LINE_LENGTH_LIMIT)
logger.warning(
"Ignored long string: %s...(%s chars)",
mm[start:start + truncated_size].decode("utf-8"),
sep - start)
start = sep + 1
return ReadFileResult(fid, stat.st_size, stat.st_mtime, start, lines)
def monitor_events(
event_dir,
callback,
scan_interval_seconds=event_consts.SCAN_EVENT_DIR_INTERVAL_SECONDS,
start_mtime=time.time() + event_consts.SCAN_EVENT_START_OFFSET_SECONDS,
monitor_files=None,
source_types=None):
""" Monitor events in directory. New events will be read and passed to the
callback.
Args:
event_dir (str): The event log directory.
callback (def callback(List[str]): pass): A callback accepts a list of
event strings.
scan_interval_seconds (float): An interval seconds between two scans.
start_mtime (float): Only the event log files whose last modification
time is greater than start_mtime are monitored.
monitor_files (Dict[int, MonitorFile]): The map from event log file id
to MonitorFile object. Monitor all files start from the beginning
if the value is None.
source_types (List[str]): A list of source type name from
event_pb2.Event.SourceType.keys(). Monitor all source types if the
value is None.
"""
loop = asyncio.get_event_loop()
if monitor_files is None:
monitor_files = {}
logger.info(
"Monitor events logs modified after %s on %s, "
"the source types are %s.", start_mtime, event_dir, "all"
if source_types is None else source_types)
MonitorFile = collections.namedtuple("MonitorFile",
["size", "mtime", "position"])
def _source_file_filter(source_file):
stat = os.stat(source_file)
return stat.st_mtime > start_mtime
def _read_monitor_file(file, pos):
assert isinstance(file, str), \
f"File should be a str, but a {type(file)}({file}) found"
fd = os.open(file, os.O_RDONLY)
try:
stat = os.stat(fd)
# Check the file size to avoid raising the exception
# ValueError: cannot mmap an empty file
if stat.st_size <= 0:
return []
fid = stat.st_ino or file
monitor_file = monitor_files.get(fid)
if monitor_file:
if (monitor_file.position == monitor_file.size
and monitor_file.size == stat.st_size
and monitor_file.mtime == stat.st_mtime):
logger.debug(
"Skip reading the file because "
"there is no change: %s", file)
return []
position = monitor_file.position
else:
logger.info("Found new event log file: %s", file)
position = pos
# Close the fd in finally.
r = _read_file(fd, position, closefd=False)
# It should be fine to update the dict in executor thread.
monitor_files[r.fid] = MonitorFile(r.size, r.mtime, r.position)
loop.call_soon_threadsafe(callback, r.lines)
except Exception as e:
raise Exception(f"Read event file failed: {file}") from e
finally:
os.close(fd)
@async_loop_forever(scan_interval_seconds, cancellable=True)
async def _scan_event_log_files():
# Scan event files.
source_files = await loop.run_in_executor(None, _get_source_files,
event_dir, source_types,
_source_file_filter)
# Limit concurrent read to avoid fd exhaustion.
semaphore = asyncio.Semaphore(event_consts.CONCURRENT_READ_LIMIT)
async def _concurrent_coro(filename):
async with semaphore:
return await loop.run_in_executor(None, _read_monitor_file,
filename, 0)
# Read files.
await asyncio.gather(*[
_concurrent_coro(filename)
for filename in list(itertools.chain(*source_files.values()))
])
return create_task(_scan_event_log_files())
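
A minimal usage sketch of monitor_events, assuming the module is importable (i.e. before this revert); the temporary directory and callback are illustrative and follow the same pattern as test_monitor_events below.

import asyncio
import tempfile

from ray.new_dashboard.modules.event.event_utils import monitor_events


async def main():
    received = []
    with tempfile.TemporaryDirectory() as event_dir:
        # The callback receives a list of raw event strings on each scan.
        task = monitor_events(
            event_dir,
            lambda lines: received.extend(lines),
            scan_interval_seconds=0.1)
        await asyncio.sleep(1)  # let a few scans run
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    print(f"Read {len(received)} event lines")


asyncio.run(main())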

View file

@@ -1,264 +0,0 @@
import os
import sys
import time
import json
import copy
import logging
import requests
import asyncio
import random
import tempfile
import pytest
import numpy as np
import ray
from ray._private.utils import binary_to_hex
from ray.new_dashboard.tests.conftest import * # noqa
from ray.new_dashboard.modules.event import event_consts
from ray.core.generated import event_pb2
from ray.test_utils import (
format_web_url,
wait_until_server_available,
wait_for_condition,
)
from ray.new_dashboard.modules.event.event_utils import (
monitor_events, )
logger = logging.getLogger(__name__)
def _get_event(msg="empty message", job_id=None, source_type=None):
return {
"event_id": binary_to_hex(np.random.bytes(18)),
"source_type": random.choice(event_pb2.Event.SourceType.keys())
if source_type is None else source_type,
"host_name": "po-dev.inc.alipay.net",
"pid": random.randint(1, 65536),
"label": "",
"message": msg,
"time_stamp": time.time(),
"severity": "INFO",
"custom_fields": {
"job_id": ray.JobID.from_int(random.randint(1, 100)).hex()
if job_id is None else job_id,
"node_id": "",
"task_id": "",
}
}
def _test_logger(name, log_file, max_bytes, backup_count):
handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=max_bytes, backupCount=backup_count)
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.propagate = False
logger.setLevel(logging.INFO)
logger.addHandler(handler)
return logger
def test_event_basic(enable_event_module, disable_aiohttp_cache,
ray_start_with_dashboard):
assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
session_dir = ray_start_with_dashboard["session_dir"]
event_dir = os.path.join(session_dir, "logs", "events")
job_id = ray.JobID.from_int(100).hex()
source_type_gcs = event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
source_type_raylet = event_pb2.Event.SourceType.Name(
event_pb2.Event.RAYLET)
test_count = 20
for source_type in [source_type_gcs, source_type_raylet]:
test_log_file = os.path.join(event_dir, f"event_{source_type}.log")
test_logger = _test_logger(
__name__ + str(random.random()),
test_log_file,
max_bytes=2000,
backup_count=1000)
for i in range(test_count):
sample_event = _get_event(
str(i), job_id=job_id, source_type=source_type)
test_logger.info("%s", json.dumps(sample_event))
def _check_events():
try:
resp = requests.get(f"{webui_url}/events")
resp.raise_for_status()
result = resp.json()
all_events = result["data"]["events"]
job_events = all_events[job_id]
assert len(job_events) >= test_count * 2
source_messages = {}
for e in job_events:
source_type = e["sourceType"]
message = e["message"]
source_messages.setdefault(source_type, set()).add(message)
assert len(source_messages[source_type_gcs]) >= test_count
assert len(source_messages[source_type_raylet]) >= test_count
data = {str(i) for i in range(test_count)}
assert data & source_messages[source_type_gcs] == data
assert data & source_messages[source_type_raylet] == data
return True
except Exception as ex:
logger.exception(ex)
return False
wait_for_condition(_check_events, timeout=15)
def test_event_message_limit(enable_event_module, small_event_line_limit,
disable_aiohttp_cache, ray_start_with_dashboard):
event_read_line_length_limit = small_event_line_limit
assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
session_dir = ray_start_with_dashboard["session_dir"]
event_dir = os.path.join(session_dir, "logs", "events")
job_id = ray.JobID.from_int(100).hex()
events = []
# Sample event whose length equals the limit.
sample_event = _get_event("", job_id=job_id)
message_len = event_read_line_length_limit - len(json.dumps(sample_event))
for i in range(10):
sample_event = copy.deepcopy(sample_event)
sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
sample_event["message"] = str(i) * message_len
assert len(json.dumps(sample_event)) == event_read_line_length_limit
events.append(sample_event)
# Sample event longer than limit.
sample_event = copy.deepcopy(sample_event)
sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
sample_event["message"] = "2" * (message_len + 1)
assert len(json.dumps(sample_event)) > event_read_line_length_limit
events.append(sample_event)
for i in range(event_consts.EVENT_READ_LINE_COUNT_LIMIT):
events.append(_get_event(str(i), job_id=job_id))
with open(os.path.join(event_dir, "tmp.log"), "w") as f:
f.writelines([(json.dumps(e) + "\n") for e in events])
try:
os.remove(os.path.join(event_dir, "event_GCS.log"))
except Exception:
pass
os.rename(
os.path.join(event_dir, "tmp.log"),
os.path.join(event_dir, "event_GCS.log"))
def _check_events():
try:
resp = requests.get(f"{webui_url}/events")
resp.raise_for_status()
result = resp.json()
all_events = result["data"]["events"]
assert len(all_events[job_id]
) >= event_consts.EVENT_READ_LINE_COUNT_LIMIT + 10
messages = [e["message"] for e in all_events[job_id]]
for i in range(10):
assert str(i) * message_len in messages
assert "2" * (message_len + 1) not in messages
assert str(event_consts.EVENT_READ_LINE_COUNT_LIMIT -
1) in messages
return True
except Exception as ex:
logger.exception(ex)
return False
wait_for_condition(_check_events, timeout=15)
@pytest.mark.asyncio
async def test_monitor_events(enable_event_module):
with tempfile.TemporaryDirectory() as temp_dir:
common = event_pb2.Event.SourceType.Name(event_pb2.Event.COMMON)
common_log = os.path.join(temp_dir, f"event_{common}.log")
test_logger = _test_logger(
__name__ + str(random.random()),
common_log,
max_bytes=10,
backup_count=10)
test_events1 = []
monitor_task = monitor_events(
temp_dir,
lambda x: test_events1.extend(x),
scan_interval_seconds=0.01)
assert not monitor_task.done()
count = 10
async def _writer(*args, read_events, spin=True):
for x in range(*args):
test_logger.info("%s", x)
if spin:
while str(x) not in read_events:
await asyncio.sleep(0.01)
async def _check_events(expect_events, read_events, timeout=10):
start_time = time.time()
while True:
sorted_events = sorted(int(i) for i in read_events)
sorted_events = [str(i) for i in sorted_events]
if time.time() - start_time > timeout:
raise TimeoutError(
f"Timeout, read events: {sorted_events}, "
f"expect events: {expect_events}")
if len(sorted_events) == len(expect_events):
if sorted_events == expect_events:
break
await asyncio.sleep(1)
await asyncio.gather(
_writer(count, read_events=test_events1),
_check_events(
[str(i) for i in range(count)], read_events=test_events1))
monitor_task.cancel()
test_events2 = []
monitor_task = monitor_events(
temp_dir,
lambda x: test_events2.extend(x),
scan_interval_seconds=0.1)
await _check_events(
[str(i) for i in range(count)], read_events=test_events2)
await _writer(count, count * 2, read_events=test_events2)
await _check_events(
[str(i) for i in range(count * 2)], read_events=test_events2)
log_file_count = len(os.listdir(temp_dir))
test_logger = _test_logger(
__name__ + str(random.random()),
common_log,
max_bytes=1000,
backup_count=10)
assert len(os.listdir(temp_dir)) == log_file_count
await _writer(
count * 2, count * 3, spin=False, read_events=test_events2)
await _check_events(
[str(i) for i in range(count * 3)], read_events=test_events2)
await _writer(
count * 3, count * 4, spin=False, read_events=test_events2)
await _check_events(
[str(i) for i in range(count * 4)], read_events=test_events2)
# Test cancel monitor task.
monitor_task.cancel()
with pytest.raises(asyncio.CancelledError):
await monitor_task
assert monitor_task.done()
assert len(
os.listdir(temp_dir)) > 1, "Event log should have rollovers."
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@@ -294,9 +294,8 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
if match:
pid = match.group(1)
ip = match.group(2)
errs_for_ip = dict(
DataSource.ip_and_pid_to_errors.get(ip, {}))
pid_errors = list(errs_for_ip.get(pid, []))
errs_for_ip = DataSource.ip_and_pid_to_errors.get(ip, {})
pid_errors = errs_for_ip.get(pid, [])
pid_errors.append({
"message": message,
"timestamp": error_data.timestamp,

View file

@@ -10,13 +10,6 @@ def enable_test_module():
os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None)
@pytest.fixture
def enable_event_module():
os.environ["RAY_DASHBOARD_MODULE_EVENT"] = "true"
yield
os.environ.pop("RAY_DASHBOARD_MODULE_EVENT", None)
@pytest.fixture
def disable_aiohttp_cache():
os.environ["RAY_DASHBOARD_NO_CACHE"] = "true"
@@ -45,10 +38,3 @@ def set_http_proxy():
os.environ["https_proxy"] = https_proxy
else:
del os.environ["https_proxy"]
@pytest.fixture
def small_event_line_limit():
os.environ["EVENT_READ_LINE_LENGTH_LIMIT"] = "1024"
yield 1024
os.environ.pop("EVENT_READ_LINE_LENGTH_LIMIT", None)

View file

@@ -320,22 +320,6 @@ def test_async_loop_forever():
loop.run_forever()
assert counter[0] > 2
counter2 = [0]
task = None
@dashboard_utils.async_loop_forever(interval_seconds=0.1, cancellable=True)
async def bar():
nonlocal task
counter2[0] += 1
if counter2[0] > 2:
task.cancel()
loop = asyncio.new_event_loop()
task = loop.create_task(bar())
with pytest.raises(asyncio.CancelledError):
loop.run_until_complete(task)
assert counter2[0] == 3
def test_dashboard_module_decorator(enable_test_module):
head_cls_list = dashboard_utils.get_all_modules(

View file

@@ -669,21 +669,13 @@ async def get_aioredis_client(redis_address, redis_password,
address=redis_address, password=redis_password)
def async_loop_forever(interval_seconds, cancellable=False):
def async_loop_forever(interval_seconds):
def _wrapper(coro):
@functools.wraps(coro)
async def _looper(*args, **kwargs):
while True:
try:
await coro(*args, **kwargs)
except asyncio.CancelledError as ex:
if cancellable:
logger.info(f"An async loop forever coroutine "
f"is cancelled {coro}.")
raise ex
else:
logger.exception(f"Can not cancel the async loop "
f"forever coroutine {coro}.")
except Exception:
logger.exception(f"Error looping coroutine {coro}.")
await asyncio.sleep(interval_seconds)

View file

@@ -145,11 +145,6 @@ cc_proto_library(
deps = [":event_proto"],
)
python_grpc_compile(
name = "event_py_proto",
deps = [":event_proto"],
)
# Job agent.
proto_library(
name = "job_agent_proto",

View file

@@ -40,15 +40,3 @@ message Event {
// store custom key such as node_id, job_id, task_id
map<string, string> custom_fields = 9;
}
message ReportEventsReply {
bool send_success = 1;
}
message ReportEventsRequest {
repeated string event_strings = 1;
}
service ReportEventService {
rpc ReportEvents(ReportEventsRequest) returns (ReportEventsReply);
}
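
For completeness, a minimal sketch of calling this service from a synchronous Python client. The address is a placeholder; the generated modules event_pb2 and event_pb2_grpc are the same ones the agent code above uses.

import grpc

from ray.core.generated import event_pb2, event_pb2_grpc

channel = grpc.insecure_channel("127.0.0.1:50051")  # placeholder RPC address
stub = event_pb2_grpc.ReportEventServiceStub(channel)
reply = stub.ReportEvents(
    event_pb2.ReportEventsRequest(event_strings=['{"event_id": "abc"}']))
print("send_success:", reply.send_success)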