diff --git a/dashboard/client/src/api.ts b/dashboard/client/src/api.ts
index 42467df8e..e532d0829 100644
--- a/dashboard/client/src/api.ts
+++ b/dashboard/client/src/api.ts
@@ -1,4 +1,4 @@
-import { formatUrl } from "./service/requestHandlers";
+import { formatUrl, get as getV2 } from "./service/requestHandlers";
 
 type APIResponse = {
   result: boolean;
@@ -294,19 +294,18 @@ export const getErrors = (nodeIp: string, pid: number | null) =>
     pid: pid ?? "",
   });
 
-export type LogsResponse = {
-  logs: LogsByPid;
-};
-
-export type LogsByPid = {
-  [pid: string]: string[];
-};
-
-export const getLogs = (nodeIp: string, pid: number | null) =>
-  get<LogsResponse>("/node_logs", {
-    ip: nodeIp,
-    pid: pid ?? "",
+export const getLogs = async (nodeIp: string, pid: number) => {
+  const result = await getV2("/api/v0/logs/file", {
+    params: {
+      node_ip: nodeIp,
+      pid: pid,
+      lines: 15000,
+    },
   });
+  // Substring to get rid of initial "1" or "0" that represents successful stream.
+  // TODO(aguo): should we get rid of that?
+  return result.data.substring(1).split("\n");
+};
 
 export type LaunchProfilingResponse = string;
diff --git a/dashboard/client/src/pages/dashboard/node-info/NodeInfo.tsx b/dashboard/client/src/pages/dashboard/node-info/NodeInfo.tsx
index fd6eead10..b41ca94e5 100644
--- a/dashboard/client/src/pages/dashboard/node-info/NodeInfo.tsx
+++ b/dashboard/client/src/pages/dashboard/node-info/NodeInfo.tsx
@@ -147,6 +147,11 @@ const liveNodesSelector = (state: StoreState) =>
   );
 
 type DialogState = {
+  nodeIp: string;
+  pid: number;
+} | null;
+
+type ErrorDialogState = {
   nodeIp: string;
   pid: number | null;
 } | null;
@@ -169,7 +174,7 @@ const nodeInfoHeaders: HeaderInfo[] = [
 
 const NodeInfo: React.FC<{}> = () => {
   const [logDialog, setLogDialog] = useState<DialogState>(null);
-  const [errorDialog, setErrorDialog] = useState<DialogState>(null);
+  const [errorDialog, setErrorDialog] = useState<ErrorDialogState>(null);
   const [isGrouped, setIsGrouped] = useState(true);
   const [order, setOrder] = React.useState("asc");
   const toggleOrder = () => setOrder(order === "asc" ? "desc" : "asc");
diff --git a/dashboard/client/src/pages/dashboard/node-info/dialogs/logs/Logs.tsx b/dashboard/client/src/pages/dashboard/node-info/dialogs/logs/Logs.tsx
index 4651be36d..a6def8378 100644
--- a/dashboard/client/src/pages/dashboard/node-info/dialogs/logs/Logs.tsx
+++ b/dashboard/client/src/pages/dashboard/node-info/dialogs/logs/Logs.tsx
@@ -7,7 +7,7 @@ import {
   withStyles,
 } from "@material-ui/core";
 import React from "react";
-import { getLogs, LogsByPid } from "../../../../../api";
+import { getLogs } from "../../../../../api";
 import DialogWithTitle from "../../../../../common/DialogWithTitle";
 import NumberedLines from "../../../../../common/NumberedLines";
 
@@ -30,11 +30,11 @@ const styles = (theme: Theme) =>
 
 type Props = {
   clearLogDialog: () => void;
   nodeIp: string;
-  pid: number | null;
+  pid: number;
 };
 
 type State = {
-  result: LogsByPid | null;
+  result: string[] | null;
   error: string | null;
 };
 
@@ -48,7 +48,7 @@ class Logs extends React.Component<Props & WithStyles<typeof styles>, State> {
     try {
       const { nodeIp, pid } = this.props;
       const result = await getLogs(nodeIp, pid);
-      this.setState({ result: result.logs, error: null });
+      this.setState({ result, error: null });
     } catch (error) {
       this.setState({ result: null, error: error.toString() });
     }
@@ -65,20 +65,18 @@ class Logs extends React.Component<Props & WithStyles<typeof styles>, State> {
         ) : result === null ? (
           <Typography color="textSecondary">Loading...</Typography>
         ) : (
-          Object.entries(result).map(([pid, lines]) => (
-            <React.Fragment key={pid}>
-              <Typography className={classes.header}>
-                {nodeIp} (PID: {pid})
-              </Typography>
-              {lines.length > 0 ? (
-                <div className={classes.log}>
-                  <NumberedLines lines={lines} />
-                </div>
-              ) : (
-                <Typography color="textSecondary">No logs found.</Typography>
-              )}
-            </React.Fragment>
-          ))
+          <React.Fragment>
+            <Typography className={classes.header}>
+              {nodeIp} (PID: {this.props.pid})
+            </Typography>
+            {result.length > 0 ? (
+              <div className={classes.log}>
+                <NumberedLines lines={result} />
+              </div>
+            ) : (
+              <Typography color="textSecondary">No logs found.</Typography>
+            )}
+          </React.Fragment>
         )}
       </DialogWithTitle>
     );
diff --git a/dashboard/client/src/pages/dashboard/node-info/features/Logs.tsx b/dashboard/client/src/pages/dashboard/node-info/features/Logs.tsx
index 3985db534..f9163b3cc 100644
--- a/dashboard/client/src/pages/dashboard/node-info/features/Logs.tsx
+++ b/dashboard/client/src/pages/dashboard/node-info/features/Logs.tsx
@@ -25,30 +25,25 @@ const ClusterLogs: ClusterFeatureRenderFn = ({ nodes }) => {
   );
 };
 
-const makeNodeLogs =
-  (
-    setLogDialog: (nodeIp: string, pid: number | null) => void,
-  ): NodeFeatureRenderFn =>
-  ({ node }) => {
-    const logCount = node.logCount ?? 0;
-    return logCount === 0 ? (
-      <Typography color="textSecondary" component="span" variant="inherit">
-        No logs
-      </Typography>
-    ) : (
-      <SpanButton onClick={() => setLogDialog(node.ip, null)}>
-        View all logs ({logCount.toLocaleString()}{" "}
-        {node.logCount === 1 ? "line" : "lines"})
-      </SpanButton>
-    );
-  };
+const makeNodeLogs: NodeFeatureRenderFn = ({ node }) => {
+  const logCount = node.logCount ?? 0;
+  return logCount === 0 ? (
+    <Typography color="textSecondary" component="span" variant="inherit">
+      No logs
+    </Typography>
+  ) : (
+    <React.Fragment>
+      {logCount.toLocaleString()} {node.logCount === 1 ? "line" : "lines"}
+    </React.Fragment>
+  );
+};
 
 const nodeLogsAccessor: Accessor<NodeFeatureData> = ({ node }) =>
   node.logCount ? sum(Object.values(node.logCount)) : 0;
 
 const makeWorkerLogs =
   (
-    setLogDialog: (nodeIp: string, pid: number | null) => void,
+    setLogDialog: (nodeIp: string, pid: number) => void,
   ): WorkerFeatureRenderFn =>
   ({ worker, node }) => {
     const logCount = worker.logCount ?? 0;
@@ -68,12 +63,12 @@ const workerLogsAccessor: Accessor<WorkerFeatureData> = ({ worker }) =>
   worker.logCount ?? 0;
 
 const makeLogsFeature = (
-  setLogDialog: (nodeIp: string, pid: number | null) => void,
+  setLogDialog: (nodeIp: string, pid: number) => void,
 ): NodeInfoFeature => ({
   id: "logs",
   ClusterFeatureRenderFn: ClusterLogs,
   WorkerFeatureRenderFn: makeWorkerLogs(setLogDialog),
-  NodeFeatureRenderFn: makeNodeLogs(setLogDialog),
+  NodeFeatureRenderFn: makeNodeLogs,
   workerAccessor: workerLogsAccessor,
   nodeAccessor: nodeLogsAccessor,
 });
diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py
index 985908acd..2ba3655e0 100644
--- a/dashboard/datacenter.py
+++ b/dashboard/datacenter.py
@@ -48,9 +48,9 @@ class DataSource:
     core_worker_stats = Dict()
     # {job id hex(str): {event id(str): event dict}}
     events = Dict()
-    # {node ip (str): log entries by pid
-    #  (dict from pid to list of latest log entries)}
-    ip_and_pid_to_logs = Dict()
+    # {node ip (str): log counts by pid
+    #  (dict from pid to count of logs for that pid)}
+    ip_and_pid_to_log_counts = Dict()
     # {node ip (str): error entries by pid
     #  (dict from pid to list of latest err entries)}
     ip_and_pid_to_errors = Dict()
@@ -103,7 +103,7 @@ class DataOrganizer:
     async def get_node_workers(cls, node_id):
         workers = []
         node_ip = DataSource.node_id_to_ip[node_id]
-        node_logs = DataSource.ip_and_pid_to_logs.get(node_ip, {})
+        node_log_counts = DataSource.ip_and_pid_to_log_counts.get(node_ip, {})
         node_errs = DataSource.ip_and_pid_to_errors.get(node_ip, {})
         node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
         node_stats = DataSource.node_stats.get(node_id, {})
@@ -120,15 +120,15 @@ class DataOrganizer:
                 pid_to_job_id[pid] = core_worker_stats["jobId"]
 
         # Clean up logs from a dead pid.
-        dead_pids = set(node_logs.keys()) - pids_on_node
+        dead_pids = set(node_log_counts.keys()) - pids_on_node
         for dead_pid in dead_pids:
-            if dead_pid in node_logs:
-                node_logs.mutable().pop(dead_pid)
+            if dead_pid in node_log_counts:
+                node_log_counts.mutable().pop(dead_pid)
 
         for worker in node_physical_stats.get("workers", []):
             worker = dict(worker)
             pid = worker["pid"]
-            worker["logCount"] = len(node_logs.get(str(pid), []))
+            worker["logCount"] = node_log_counts.get(str(pid), 0)
             worker["errorCount"] = len(node_errs.get(str(pid), []))
             worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, [])
             worker["language"] = pid_to_language.get(
@@ -148,10 +148,10 @@ class DataOrganizer:
         node = DataSource.nodes.get(node_id, {})
         node_ip = DataSource.node_id_to_ip.get(node_id)
         # Merge node log count information into the payload
-        log_info = DataSource.ip_and_pid_to_logs.get(node_ip, {})
+        log_counts = DataSource.ip_and_pid_to_log_counts.get(node_ip, {})
         node_log_count = 0
-        for entries in log_info.values():
-            node_log_count += len(entries)
+        for entries in log_counts.values():
+            node_log_count += entries
         error_info = DataSource.ip_and_pid_to_errors.get(node_ip, {})
         node_err_count = 0
         for entries in error_info.values():
diff --git a/dashboard/modules/actor/actor_head.py b/dashboard/modules/actor/actor_head.py
index fa4edd842..113bb926d 100644
--- a/dashboard/modules/actor/actor_head.py
+++ b/dashboard/modules/actor/actor_head.py
@@ -1,5 +1,7 @@
 import asyncio
+from collections import deque
 import logging
+import os
 
 import aiohttp.web
 
@@ -25,10 +27,12 @@ try:
 except ImportError:
     from grpc.experimental import aio as aiogrpc
 
-
 logger = logging.getLogger(__name__)
 routes = dashboard_optional_utils.ClassMethodRouteTable
 
+MAX_ACTORS_TO_CACHE = int(os.environ.get("RAY_DASHBOARD_MAX_ACTORS_TO_CACHE", 1000))
+ACTOR_CLEANUP_FREQUENCY = 10  # seconds
+
 
 def actor_table_data_to_dict(message):
     orig_message = dashboard_utils.message_to_dict(
@@ -79,6 +83,8 @@ class ActorHead(dashboard_utils.DashboardHeadModule):
         self._stubs = {}
         # ActorInfoGcsService
         self._gcs_actor_info_stub = None
+        # A queue of dead actors in order of when they died
+        self.dead_actors_queue = deque()
         DataSource.nodes.signal.append(self._update_stubs)
 
     async def _update_stubs(self, change):
@@ -154,6 +160,8 @@ class ActorHead(dashboard_utils.DashboardHeadModule):
             actor_id = actor_table_data["actorId"]
             job_id = actor_table_data["jobId"]
             node_id = actor_table_data["address"]["rayletId"]
+            if actor_table_data["state"] == "DEAD":
+                self.dead_actors_queue.append(actor_id)
             # Update actors.
             DataSource.actors[actor_id] = actor_table_data
             # Update node actors (only when node_id is not Nil).
@@ -181,6 +189,30 @@ class ActorHead(dashboard_utils.DashboardHeadModule):
             except Exception:
                 logger.exception("Error processing actor info from GCS.")
 
+    async def _cleanup_actors(self):
+        while True:
+            try:
+                if len(DataSource.actors) > MAX_ACTORS_TO_CACHE:
+                    logger.debug("Cleaning up dead actors from GCS")
+                    while len(DataSource.actors) > MAX_ACTORS_TO_CACHE:
+                        if not self.dead_actors_queue:
+                            logger.warning(
+                                f"More than {MAX_ACTORS_TO_CACHE} "
+                                "live actors are cached"
+                            )
+                            break
+                        actor_id = self.dead_actors_queue.popleft()
+                        if actor_id in DataSource.actors:
+                            actor = DataSource.actors.pop(actor_id)
+                            job_id = actor["jobId"]
+                            del DataSource.job_actors[job_id][actor_id]
+                            node_id = actor["address"].get("rayletId")
+                            if node_id:
+                                del DataSource.node_actors[node_id][actor_id]
+                await asyncio.sleep(ACTOR_CLEANUP_FREQUENCY)
+            except Exception:
+                logger.exception("Error cleaning up actor info from GCS.")
+
     @routes.get("/logical/actor_groups")
     async def get_actor_groups(self, req) -> aiohttp.web.Response:
         actors = await DataOrganizer.get_all_actors()
@@ -236,6 +268,7 @@ class ActorHead(dashboard_utils.DashboardHeadModule):
             gcs_channel
         )
 
+        asyncio.get_event_loop().create_task(self._cleanup_actors())
         await asyncio.gather(self._update_actors())
 
     @staticmethod
diff --git a/dashboard/modules/actor/tests/test_actor.py b/dashboard/modules/actor/tests/test_actor.py
index 04a04a6fd..8cad11a58 100644
--- a/dashboard/modules/actor/tests/test_actor.py
+++ b/dashboard/modules/actor/tests/test_actor.py
@@ -342,5 +342,98 @@ def test_nil_node(enable_test_module, disable_aiohttp_cache, ray_start_with_dash
         raise Exception(f"Timed out while testing, {ex_stack}")
 
 
+def test_actor_cleanup(
+    disable_aiohttp_cache, reduce_actor_cache, ray_start_with_dashboard
+):
+    @ray.remote
+    class Foo:
+        def __init__(self, num):
+            self.num = num
+
+        def do_task(self):
+            return self.num
+
+    @ray.remote(num_gpus=1)
+    class InfeasibleActor:
+        pass
+
+    infeasible_actor = InfeasibleActor.remote()  # noqa
+
+    foo_actors = [
+        Foo.remote(1),
+        Foo.remote(2),
+        Foo.remote(3),
+        Foo.remote(4),
+        Foo.remote(5),
+        Foo.remote(6),
+    ]
+    results = [actor.do_task.remote() for actor in foo_actors]  # noqa
+    webui_url = ray_start_with_dashboard["webui_url"]
+    assert wait_until_server_available(webui_url)
+    webui_url = format_web_url(webui_url)
+
+    timeout_seconds = 8
+    start_time = time.time()
+    last_ex = None
+    while True:
+        time.sleep(1)
+        try:
+            resp = requests.get(f"{webui_url}/logical/actors")
+            resp_json = resp.json()
+            resp_data = resp_json["data"]
+            actors = resp_data["actors"]
+            # Although max cache is 3, there should be 7 actors
+            # because they are all still alive.
+            assert len(actors) == 7
+
+            break
+        except Exception as ex:
+            last_ex = ex
+        finally:
+            if time.time() > start_time + timeout_seconds:
+                ex_stack = (
+                    traceback.format_exception(
+                        type(last_ex), last_ex, last_ex.__traceback__
+                    )
+                    if last_ex
+                    else []
+                )
+                ex_stack = "".join(ex_stack)
+                raise Exception(f"Timed out while testing, {ex_stack}")
+
+    # kill
+    ray.kill(infeasible_actor)
+    [ray.kill(foo_actor) for foo_actor in foo_actors]
+    # Wait 5 seconds for cleanup to finish
+    time.sleep(5)
+
+    # Check only three remaining in cache
+    start_time = time.time()
+    while True:
+        time.sleep(1)
+        try:
+            resp = requests.get(f"{webui_url}/logical/actors")
+            resp_json = resp.json()
+            resp_data = resp_json["data"]
+            actors = resp_data["actors"]
+            # Max cache is 3 so only 3 actors should be left.
+            assert len(actors) == 3
+
+            break
+        except Exception as ex:
+            last_ex = ex
+        finally:
+            if time.time() > start_time + timeout_seconds:
+                ex_stack = (
+                    traceback.format_exception(
+                        type(last_ex), last_ex, last_ex.__traceback__
+                    )
+                    if last_ex
+                    else []
+                )
+                ex_stack = "".join(ex_stack)
+                raise Exception(f"Timed out while testing, {ex_stack}")
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/dashboard/modules/node/node_head.py b/dashboard/modules/node/node_head.py
index 8a4aa2eca..7804be9d1 100644
--- a/dashboard/modules/node/node_head.py
+++ b/dashboard/modules/node/node_head.py
@@ -220,17 +220,6 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
             success=True, message=f"Successfully set fetching to {should_fetch}"
         )
 
-    @routes.get("/node_logs")
-    async def get_logs(self, req) -> aiohttp.web.Response:
-        ip = req.query["ip"]
-        pid = str(req.query.get("pid", ""))
-        node_logs = DataSource.ip_and_pid_to_logs.get(ip, {})
-        if pid:
-            node_logs = {str(pid): node_logs.get(pid, [])}
-        return dashboard_optional_utils.rest_response(
-            success=True, message="Fetched logs.", logs=node_logs
-        )
-
     @routes.get("/node_errors")
     async def get_errors(self, req) -> aiohttp.web.Response:
         ip = req.query["ip"]
@@ -269,18 +258,13 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
             ip = log_batch["ip"]
             pid = str(log_batch["pid"])
             if pid != "autoscaler":
-                logs_for_ip = dict(DataSource.ip_and_pid_to_logs.get(ip, {}))
-                logs_for_pid = list(logs_for_ip.get(pid, []))
-                logs_for_pid.extend(log_batch["lines"])
-
-                # Only cache upto MAX_LOGS_TO_CACHE
-                logs_length = len(logs_for_pid)
-                if logs_length > MAX_LOGS_TO_CACHE * LOG_PRUNE_THREASHOLD:
-                    offset = logs_length - MAX_LOGS_TO_CACHE
-                    del logs_for_pid[:offset]
-
-                logs_for_ip[pid] = logs_for_pid
-                DataSource.ip_and_pid_to_logs[ip] = logs_for_ip
+                log_counts_for_ip = dict(
+                    DataSource.ip_and_pid_to_log_counts.get(ip, {})
+                )
+                log_counts_for_pid = log_counts_for_ip.get(pid, 0)
+                log_counts_for_pid += len(log_batch["lines"])
+                log_counts_for_ip[pid] = log_counts_for_pid
+                DataSource.ip_and_pid_to_log_counts[ip] = log_counts_for_ip
             logger.debug(f"Received a log for {ip} and {pid}")
 
         while True:
@@ -309,6 +293,13 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
                     "type": error_data.type,
                 }
             )
+
+            # Only cache up to MAX_LOGS_TO_CACHE
+            pid_errors_length = len(pid_errors)
+            if pid_errors_length > MAX_LOGS_TO_CACHE * LOG_PRUNE_THREASHOLD:
+                offset = pid_errors_length - MAX_LOGS_TO_CACHE
+                del pid_errors[:offset]
+
             errs_for_ip[pid] = pid_errors
             DataSource.ip_and_pid_to_errors[ip] = errs_for_ip
             logger.info(f"Received error entry for {ip} {pid}")
diff --git a/dashboard/modules/node/tests/test_node.py b/dashboard/modules/node/tests/test_node.py
index 6e33e5ec9..80c9ad270 100644
--- a/dashboard/modules/node/tests/test_node.py
+++ b/dashboard/modules/node/tests/test_node.py
@@ -10,10 +10,6 @@ import ray
 import threading
 from datetime import datetime, timedelta
 from ray.cluster_utils import Cluster
-from ray.dashboard.modules.node.node_consts import (
-    LOG_PRUNE_THREASHOLD,
-    MAX_LOGS_TO_CACHE,
-)
 from ray.dashboard.tests.conftest import *  # noqa
 from ray._private.test_utils import (
     format_web_url,
@@ -22,8 +18,6 @@ from ray._private.test_utils import (
     wait_until_succeeded_without_exception,
 )
 
-from unittest import mock
-
 logger = logging.getLogger(__name__)
 
 
@@ -327,204 +321,5 @@ def test_multi_node_churn(
     time.sleep(2)
 
 
-@pytest.fixture
-def disable_dashboard_log_info(request):
-    if request.param is False:
-        env_var_value = "0"
-    else:
-        env_var_value = "1"
-
-    with mock.patch.dict(
-        os.environ,
-        {
-            "RAY_DISABLE_DASHBOARD_LOG_INFO": env_var_value,
-        },
-    ):
-        yield request.param
-
-
-@pytest.mark.parametrize(
-    "ray_start_cluster_head", [{"include_dashboard": True}], indirect=True
-)
-@pytest.mark.parametrize("disable_dashboard_log_info", [False, True], indirect=True)
-def test_logs(
-    enable_test_module,
-    disable_aiohttp_cache,
-    disable_dashboard_log_info,
-    ray_start_cluster_head,
-):
-    cluster = ray_start_cluster_head
-    assert wait_until_server_available(cluster.webui_url) is True
-    webui_url = cluster.webui_url
-    webui_url = format_web_url(webui_url)
-    nodes = ray.nodes()
-    assert len(nodes) == 1
-    node_ip = nodes[0]["NodeManagerAddress"]
-
-    @ray.remote
-    class LoggingActor:
-        def go(self, n):
-            i = 0
-            while i < n:
-                print(f"On number {i}")
-                i += 1
-
-        def get_pid(self):
-            return os.getpid()
-
-    la = LoggingActor.remote()
-    la2 = LoggingActor.remote()
-    la_pid = str(ray.get(la.get_pid.remote()))
-    la2_pid = str(ray.get(la2.get_pid.remote()))
-    ray.get(la.go.remote(4))
-    ray.get(la2.go.remote(1))
-
-    def check_logs():
-        node_logs_response = requests.get(
-            f"{webui_url}/node_logs", params={"ip": node_ip}
-        )
-        node_logs_response.raise_for_status()
-        node_logs = node_logs_response.json()
-        assert node_logs["result"]
-        assert type(node_logs["data"]["logs"]) is dict
-
-        if disable_dashboard_log_info:
-            assert node_logs["data"]["logs"] == {}
-            return
-
-        assert all(pid in node_logs["data"]["logs"] for pid in (la_pid, la2_pid))
-        assert len(node_logs["data"]["logs"][la2_pid]) == 1
-
-        actor_one_logs_response = requests.get(
-            f"{webui_url}/node_logs", params={"ip": node_ip, "pid": str(la_pid)}
-        )
-        actor_one_logs_response.raise_for_status()
-        actor_one_logs = actor_one_logs_response.json()
-        assert actor_one_logs["result"]
-        assert type(actor_one_logs["data"]["logs"]) is dict
-        assert len(actor_one_logs["data"]["logs"][la_pid]) == 4
-
-    assert wait_until_succeeded_without_exception(
-        check_logs, (AssertionError,), timeout_ms=1000
-    )
-
-
-@pytest.mark.parametrize(
-    "ray_start_cluster_head", [{"include_dashboard": True}], indirect=True
-)
-def test_logs_clean_up(
-    enable_test_module, disable_aiohttp_cache, ray_start_cluster_head
-):
-    """Check if logs from the dead pids are GC'ed."""
-    cluster = ray_start_cluster_head
-    assert wait_until_server_available(cluster.webui_url) is True
-    webui_url = cluster.webui_url
-    webui_url = format_web_url(webui_url)
-    nodes = ray.nodes()
-    assert len(nodes) == 1
-    node_ip = nodes[0]["NodeManagerAddress"]
-
-    @ray.remote
-    class LoggingActor:
-        def go(self, n):
-            i = 0
-            while i < n:
-                print(f"On number {i}")
-                i += 1
-
-        def get_pid(self):
-            return os.getpid()
-
-    la = LoggingActor.remote()
-    la_pid = str(ray.get(la.get_pid.remote()))
-    ray.get(la.go.remote(1))
-
-    def check_logs():
-        node_logs_response = requests.get(
-            f"{webui_url}/node_logs", params={"ip": node_ip}
-        )
-        node_logs_response.raise_for_status()
-        node_logs = node_logs_response.json()
-        assert node_logs["result"]
-        assert la_pid in node_logs["data"]["logs"]
-
-    assert wait_until_succeeded_without_exception(
-        check_logs, (AssertionError,), timeout_ms=1000
-    )
-    ray.kill(la)
-
-    def check_logs_not_exist():
-        node_logs_response = requests.get(
-            f"{webui_url}/node_logs", params={"ip": node_ip}
-        )
-        node_logs_response.raise_for_status()
-        node_logs = node_logs_response.json()
-        assert node_logs["result"]
-        assert la_pid not in node_logs["data"]["logs"]
-
-    assert wait_until_succeeded_without_exception(
-        check_logs_not_exist, (AssertionError,), timeout_ms=10000
-    )
-
-
-@pytest.mark.parametrize(
-    "ray_start_cluster_head", [{"include_dashboard": True}], indirect=True
-)
-def test_logs_max_count(
-    enable_test_module, disable_aiohttp_cache, ray_start_cluster_head
-):
-    """Test that each Ray worker cannot cache more than 1000 logs at a time."""
-    cluster = ray_start_cluster_head
-    assert wait_until_server_available(cluster.webui_url) is True
-    webui_url = cluster.webui_url
-    webui_url = format_web_url(webui_url)
-    nodes = ray.nodes()
-    assert len(nodes) == 1
-    node_ip = nodes[0]["NodeManagerAddress"]
-
-    @ray.remote
-    class LoggingActor:
-        def go(self, n):
-            i = 0
-            while i < n:
-                print(f"On number {i}")
-                i += 1
-
-        def get_pid(self):
-            return os.getpid()
-
-    la = LoggingActor.remote()
-    la_pid = str(ray.get(la.get_pid.remote()))
-    ray.get(la.go.remote(MAX_LOGS_TO_CACHE * LOG_PRUNE_THREASHOLD))
-
-    def check_logs():
-        node_logs_response = requests.get(
-            f"{webui_url}/node_logs", params={"ip": node_ip}
-        )
-        node_logs_response.raise_for_status()
-        node_logs = node_logs_response.json()
-        assert node_logs["result"]
-        assert type(node_logs["data"]["logs"]) is dict
-        assert la_pid in node_logs["data"]["logs"]
-        log_lengths = len(node_logs["data"]["logs"][la_pid])
-        assert log_lengths >= MAX_LOGS_TO_CACHE
-        assert log_lengths <= MAX_LOGS_TO_CACHE * LOG_PRUNE_THREASHOLD
-
-        actor_one_logs_response = requests.get(
-            f"{webui_url}/node_logs", params={"ip": node_ip, "pid": str(la_pid)}
-        )
-        actor_one_logs_response.raise_for_status()
-        actor_one_logs = actor_one_logs_response.json()
-        assert actor_one_logs["result"]
-        assert type(actor_one_logs["data"]["logs"]) is dict
-        log_lengths = len(actor_one_logs["data"]["logs"][la_pid])
-        assert log_lengths >= MAX_LOGS_TO_CACHE
-        assert log_lengths <= MAX_LOGS_TO_CACHE * LOG_PRUNE_THREASHOLD
-
-    assert wait_until_succeeded_without_exception(
-        check_logs, (AssertionError,), timeout_ms=10000
-    )
-
-
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py
index b99884bdf..5a5a0a1e9 100644
--- a/dashboard/tests/conftest.py
+++ b/dashboard/tests/conftest.py
@@ -40,3 +40,10 @@ def fast_gcs_failure_detection():
     os.environ.pop("GCS_CHECK_ALIVE_INTERVAL_SECONDS", None)
     os.environ.pop("GCS_RETRY_CONNECT_INTERVAL_SECONDS", None)
     os.environ.pop("GCS_CHECK_ALIVE_RPC_TIMEOUT", None)
+
+
+@pytest.fixture
+def reduce_actor_cache():
+    os.environ["RAY_DASHBOARD_MAX_ACTORS_TO_CACHE"] = "3"
+    yield
+    os.environ.pop("RAY_DASHBOARD_MAX_ACTORS_TO_CACHE", None)