[Dashboard] Turn on New Dashboard by Default (#11321)

Max Fitton 2020-10-19 13:31:11 -04:00 committed by GitHub
parent 202b1859ef
commit f500292d41
15 changed files with 123 additions and 814 deletions

.gitignore (3 changes)

@@ -1,4 +1,5 @@
# The build output should clearly not be checked in
*test-output.xml
/bazel-*
/python/ray/core
/python/ray/pickle5_files/
@@ -11,7 +12,7 @@
/thirdparty/pkg/
/build/java
.jar
/dashboard/client/build
# Files generated by flatc should be ignored
/src/ray/gcs/format/*_generated.h
/src/ray/object_manager/format/*_generated.h


@@ -22,7 +22,6 @@ if [ -z "${BUILD_DIR}" ]; then
fi
TEST_DIR="${BUILD_DIR}/python/ray/tests"
TEST_SCRIPTS=("$TEST_DIR/test_microbenchmarks.py" "$TEST_DIR/test_basic.py")
UI_TEST_SCRIPT="${BUILD_DIR}/python/ray/tests/test_webui.py"
function retry {
local n=1
@@ -77,9 +76,6 @@ if [[ "$platform" == "linux" ]]; then
for SCRIPT in "${TEST_SCRIPTS[@]}"; do
retry "$PYTHON_EXE" "$SCRIPT"
done
# Run the UI test to make sure that the packaged UI works.
retry "$PYTHON_EXE" "$UI_TEST_SCRIPT"
done
# Check that the other wheels are present.
@@ -118,12 +114,6 @@ elif [[ "$platform" == "macosx" ]]; then
for SCRIPT in "${TEST_SCRIPTS[@]}"; do
retry "$PYTHON_EXE" "$SCRIPT"
done
if (( $(echo "$PY_MM >= 3.0" | bc) )); then
# Run the UI test to make sure that the packaged UI works.
retry "$PYTHON_EXE" "$UI_TEST_SCRIPT"
fi
done
elif [ "${platform}" = windows ]; then
echo "WARNING: Wheel testing not yet implemented for Windows."


@@ -38,6 +38,7 @@ aiogrpc.init_grpc_aio()
class DashboardAgent(object):
def __init__(self,
redis_address,
dashboard_agent_port,
redis_password=None,
temp_dir=None,
log_dir=None,
@@ -51,6 +52,7 @@ class DashboardAgent(object):
self.redis_password = redis_password
self.temp_dir = temp_dir
self.log_dir = log_dir
self.dashboard_agent_port = dashboard_agent_port
self.metrics_export_port = metrics_export_port
self.node_manager_port = node_manager_port
self.object_store_name = object_store_name
@@ -59,7 +61,8 @@
assert self.node_id, "Empty node id (RAY_NODE_ID)."
self.ip = ray._private.services.get_node_ip_address()
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), ))
self.grpc_port = self.server.add_insecure_port("[::]:0")
self.grpc_port = self.server.add_insecure_port(
f"[::]:{self.dashboard_agent_port}")
logger.info("Dashboard agent grpc address: %s:%s", self.ip,
self.grpc_port)
self.aioredis_client = None
@@ -186,6 +189,11 @@ if __name__ == "__main__":
required=True,
type=int,
help="The port to expose metrics through Prometheus.")
parser.add_argument(
"--dashboard-agent-port",
required=True,
type=int,
help="The port on which the dashboard agent will receive GRPCs.")
parser.add_argument(
"--node-manager-port",
required=True,
@@ -288,6 +296,7 @@ if __name__ == "__main__":
agent = DashboardAgent(
args.redis_address,
args.dashboard_agent_port,
redis_password=args.redis_password,
temp_dir=temp_dir,
log_dir=log_dir,
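With this change the agent binds its gRPC server to a caller-supplied --dashboard-agent-port instead of letting the OS pick an ephemeral port ("[::]:0"). A minimal sketch of that pattern, written against grpc.aio (grpcio's asyncio API) rather than the aiogrpc shim used above; the port value and function names are illustrative, not Ray's:

import asyncio
from grpc import aio


async def serve(port: int) -> None:
    server = aio.server(options=(("grpc.so_reuseport", 0),))
    # add_insecure_port returns the port actually bound; it returns 0
    # when the requested port is unavailable.
    bound = server.add_insecure_port(f"[::]:{port}")
    if bound == 0:
        raise RuntimeError(f"port {port} is unavailable")
    await server.start()
    await server.wait_for_termination()  # serves until interrupted


asyncio.run(serve(50051))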


@@ -3,7 +3,6 @@ try:
except ImportError:
print("The dashboard requires aiohttp to run.")
import sys
sys.exit(1)
import argparse
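The head process keeps the guarded import shown above so a missing optional dependency fails fast with a readable message. The pattern in isolation, as a sketch rather than Ray's exact code:

import sys

try:
    import aiohttp  # noqa: F401
except ImportError:
    # Exit with a clear message instead of a traceback from deep
    # inside dashboard startup.
    print("The dashboard requires aiohttp to run.")
    sys.exit(1)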


@@ -111,15 +111,13 @@ class DataOrganizer:
node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
node_stats = DataSource.node_stats.get(node_id, {})
node = DataSource.nodes.get(node_id, {})
node_ip = DataSource.node_id_to_ip.get(node_id)
# Merge node log count information into the payload
log_info = DataSource.ip_and_pid_to_logs.get(node_physical_stats["ip"],
{})
log_info = DataSource.ip_and_pid_to_logs.get(node_ip, {})
node_log_count = 0
for entries in log_info.values():
node_log_count += len(entries)
error_info = DataSource.ip_and_pid_to_errors.get(
node_physical_stats["ip"], {})
error_info = DataSource.ip_and_pid_to_errors.get(node_ip, {})
node_err_count = 0
for entries in error_info.values():
node_err_count += len(entries)
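The counting loops above can be collapsed into generator expressions; a sketch assuming, as the .values() iteration implies, that log_info and error_info map a pid to a list of entries (the sample data is made up):

log_info = {"1234": ["line a", "line b"], "5678": ["line c"]}
error_info = {"1234": ["err a"]}

# Sum the entry-list lengths per node; equivalent to the explicit loops.
node_log_count = sum(len(entries) for entries in log_info.values())
node_err_count = sum(len(entries) for entries in error_info.values())
assert (node_log_count, node_err_count) == (3, 1)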


@@ -33,9 +33,8 @@ def test_actor_groups(ray_start_with_dashboard):
foo_actors = [Foo.remote(4), Foo.remote(5)]
infeasible_actor = InfeasibleActor.remote() # noqa
results = [actor.do_task.remote() for actor in foo_actors] # noqa
assert (wait_until_server_available(ray_start_with_dashboard["webui_url"])
is True)
webui_url = ray_start_with_dashboard["webui_url"]
assert wait_until_server_available(webui_url)
webui_url = format_web_url(webui_url)
timeout_seconds = 5
@@ -75,5 +74,66 @@ def test_actor_groups(ray_start_with_dashboard):
raise Exception(f"Timed out while testing, {ex_stack}")
def test_kill_actor(ray_start_with_dashboard):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self):
ray.show_in_dashboard("test")
return os.getpid()
a = Actor.remote()
worker_pid = ray.get(a.f.remote()) # noqa
webui_url = ray_start_with_dashboard["webui_url"]
assert wait_until_server_available(webui_url)
webui_url = format_web_url(webui_url)
def actor_killed(pid):
"""Check For the existence of a unix pid."""
try:
os.kill(pid, 0)
except OSError:
return True
else:
return False
def get_actor():
resp = requests.get(f"{webui_url}/logical/actor_groups")
resp.raise_for_status()
actor_groups_resp = resp.json()
assert actor_groups_resp["result"] is True, actor_groups_resp["msg"]
actor_groups = actor_groups_resp["data"]["actorGroups"]
actor = actor_groups["Actor"]["entries"][0]
return actor
def kill_actor_using_dashboard(actor):
resp = requests.get(
webui_url + "/logical/kill_actor",
params={
"actorId": actor["actorId"],
"ipAddress": actor["ipAddress"],
"port": actor["port"]
})
resp.raise_for_status()
resp_json = resp.json()
assert resp_json["result"] is True, resp_json.get("msg")
start = time.time()
last_exc = None
while time.time() - start <= 10:
try:
actor = get_actor()
kill_actor_using_dashboard(actor)
last_exc = None
break
except (KeyError, AssertionError) as e:
last_exc = e
time.sleep(.1)
assert last_exc is None
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
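test_kill_actor polls the actor_groups endpoint until the kill round-trip succeeds, retrying on KeyError and AssertionError. That retry shape generalizes; a sketch of a reusable helper (hypothetical, not part of ray.test_utils):

import time


def retry_until(fn, timeout_s=10, interval_s=0.1,
                retry_on=(AssertionError, KeyError)):
    # Call fn until it stops raising the retryable exceptions or the
    # timeout expires; re-raise the last failure on timeout.
    deadline = time.time() + timeout_s
    last_exc = None
    while time.time() < deadline:
        try:
            return fn()
        except retry_on as e:
            last_exc = e
            time.sleep(interval_s)
    if last_exc is not None:
        raise last_exc
    raise TimeoutError(f"retry_until timed out after {timeout_s}s")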


@@ -94,23 +94,15 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
return reporter_pb2.GetProfilingStatsReply(
profiling_stats=profiling_stats, std_out=stdout, std_err=stderr)
async def ReportMetrics(self, request, context):
# NOTE: Exceptions are not propagated properly
# when we don't catch them here.
async def ReportOCMetrics(self, request, context):
# This function receives a GRPC containing OpenCensus (OC) metrics
# from a Ray process, then exposes those metrics to Prometheus.
try:
metrcs_description_required = (
self._metrics_agent.record_metrics_points(
request.metrics_points))
except Exception as e:
logger.error(e)
self._metrics_agent.record_metric_points_from_protobuf(
request.metrics)
except Exception:
logger.error(traceback.format_exc())
# If metrics description is missing, we should notify cpp processes
# that we need them. Cpp processes will then report them to here.
# We need it when (1) a new metric is reported (application metric)
# (2) a reporter goes down and restarted (currently not implemented).
return reporter_pb2.ReportMetricsReply(
metrcs_description_required=metrcs_description_required)
return reporter_pb2.ReportOCMetricsReply()
@staticmethod
def _get_cpu_percent():
@@ -125,8 +117,7 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
try:
gpus = gpustat.new_query().gpus
except Exception as e:
logger.debug(
"gpustat failed to retrieve GPU information: {}".format(e))
logger.debug(f"gpustat failed to retrieve GPU information: {e}")
for gpu in gpus:
# Note the keys in this dict have periods which throws
# off javascript so we change .s to _s
@@ -233,12 +224,8 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
"cmdline": self._get_raylet_cmdline(),
}
async def _perform_iteration(self):
async def _perform_iteration(self, aioredis_client):
"""Get any changes to the log files and push updates to Redis."""
aioredis_client = await aioredis.create_redis_pool(
address=self._dashboard_agent.redis_address,
password=self._dashboard_agent.redis_password)
while True:
try:
stats = self._get_all_stats()
@@ -249,5 +236,8 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)
async def run(self, server):
aioredis_client = await aioredis.create_redis_pool(
address=self._dashboard_agent.redis_address,
password=self._dashboard_agent.redis_password)
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)
await self._perform_iteration()
await self._perform_iteration(aioredis_client)
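The last hunk moves Redis connection setup out of the polling loop and into run(), so _perform_iteration receives its client as a parameter. A self-contained sketch of that dependency-injection shape; FakeRedis is a stand-in for illustration, not the aioredis API:

import asyncio
import json


class FakeRedis:
    # Stand-in for the aioredis client created in run().
    async def publish(self, channel, message):
        print(f"{channel}: {message}")


async def perform_iteration(client, iterations=3):
    # The loop no longer owns connection setup, which keeps lifecycle
    # management in one place and makes the loop easy to test.
    for i in range(iterations):
        stats = {"iteration": i}  # placeholder for _get_all_stats()
        await client.publish("RAY_REPORTER", json.dumps(stats))
        await asyncio.sleep(0.1)


async def run():
    client = FakeRedis()  # created once, where run() owns the lifecycle
    await perform_iteration(client)


asyncio.run(run())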


@@ -130,7 +130,7 @@ class TuneController(dashboard_utils.DashboardHeadModule):
# search through all the sub_directories in log directory
analysis = Analysis(str(self._logdir))
df = analysis.dataframe(metric="episode_reward_mean", mode="max")
df = analysis.dataframe(metric=None, mode=None)
if len(df) == 0 or "trial_id" not in df.columns:
return


@@ -39,7 +39,8 @@ source "$HOME"/.nvm/nvm.sh
nvm use node
# Build the dashboard so its static assets can be included in the wheel.
pushd python/ray/dashboard/client
# TODO(mfitton): switch this back when deleting old dashboard code.
pushd python/ray/new_dashboard/client
npm ci
npm run build
popd


@@ -35,7 +35,8 @@ nvm install node
nvm use node
# Build the dashboard so its static assets can be included in the wheel.
pushd python/ray/dashboard/client
# TODO(mfitton): switch this back when deleting old dashboard code.
pushd python/ray/new_dashboard/client
npm ci
npm run build
popd


@@ -495,10 +495,18 @@ def start_ray_process(command,
process.kill()
raise
def _get_stream_name(stream):
if stream is not None:
try:
return stream.name
except AttributeError:
return str(stream)
return None
return ProcessInfo(
process=process,
stdout_file=stdout_file.name if stdout_file is not None else None,
stderr_file=stderr_file.name if stderr_file is not None else None,
stdout_file=_get_stream_name(stdout_file),
stderr_file=_get_stream_name(stderr_file),
use_valgrind=use_valgrind,
use_gdb=use_gdb,
use_valgrind_profiler=use_valgrind_profiler,
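The _get_stream_name helper added in the hunk above exists because stdout_file and stderr_file may now be subprocess.DEVNULL, a plain int constant, rather than file objects, so an unconditional .name access would raise AttributeError. A quick demonstration of the three cases:

import subprocess
import tempfile


def get_stream_name(stream):
    if stream is None:
        return None
    try:
        return stream.name
    except AttributeError:
        # e.g. subprocess.DEVNULL is the plain int -3.
        return str(stream)


with tempfile.NamedTemporaryFile() as f:
    print(get_stream_name(f))               # the temp file's path
print(get_stream_name(subprocess.DEVNULL))  # "-3"
print(get_stream_name(None))                # None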
@@ -1037,12 +1045,7 @@ def start_dashboard(require_dashboard,
raise ValueError(
f"The given dashboard port {port} is already in use")
if "RAY_USE_NEW_DASHBOARD" in os.environ:
dashboard_dir = "new_dashboard"
else:
dashboard_dir = "dashboard"
logdir = None
dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")
command = [
sys.executable,
@@ -1058,12 +1061,12 @@ def start_dashboard(require_dashboard,
if redis_password:
command += ["--redis-password", redis_password]
webui_dependencies_present = True
dashboard_dependencies_present = True
try:
import aiohttp # noqa: F401
import grpc # noqa: F401
except ImportError:
webui_dependencies_present = False
dashboard_dependencies_present = False
warning_message = (
"Failed to start the dashboard. The dashboard requires Python 3 "
"as well as 'pip install aiohttp grpcio'.")
@@ -1071,8 +1074,7 @@ def start_dashboard(require_dashboard,
raise ImportError(warning_message)
else:
logger.warning(warning_message)
if webui_dependencies_present:
if dashboard_dependencies_present:
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_DASHBOARD,
@@ -1319,12 +1321,13 @@ def start_raylet(redis_address,
sys.executable,
"-u",
os.path.join(RAY_PATH, "new_dashboard/agent.py"),
"--redis-address={}".format(redis_address),
"--metrics-export-port={}".format(metrics_export_port),
"--node-manager-port={}".format(node_manager_port),
"--object-store-name={}".format(plasma_store_name),
"--raylet-name={}".format(raylet_name),
"--temp-dir={}".format(temp_dir),
f"--redis-address={redis_address}",
f"--metrics-export-port={metrics_export_port}",
f"--dashboard-agent-port={metrics_agent_port}",
f"--node-manager-port={node_manager_port}",
f"--object-store-name={plasma_store_name}",
f"--raylet-name={raylet_name}",
f"--temp-dir={temp_dir}",
]
if redis_password is not None and len(redis_password) != 0:
@@ -1357,7 +1360,6 @@ def start_raylet(redis_address,
if start_initial_python_workers_for_first_job:
command.append("--num_initial_python_workers_for_first_job={}".format(
resource_spec.num_cpus))
if "RAY_USE_NEW_DASHBOARD" in os.environ:
command.append("--agent_command={}".format(
subprocess.list2cmdline(agent_command)))
if config.get("plasma_store_as_thread"):
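With the RAY_USE_NEW_DASHBOARD guard removed, the agent command is now passed to the raylet unconditionally, serialized with subprocess.list2cmdline. A small illustration of that helper; the flag values below are placeholders, not real addresses:

import subprocess
import sys

agent_command = [
    sys.executable,
    "-u",
    "new_dashboard/agent.py",
    "--redis-address=127.0.0.1:6379",
    "--dashboard-agent-port=50051",
]
# list2cmdline joins the argv list into a single command-line string
# that the raylet can store and later re-split to launch the agent.
print(subprocess.list2cmdline(agent_command))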


@@ -626,19 +626,14 @@ class Node:
if we fail to start the dashboard. Otherwise it will print
a warning if we fail to start the dashboard.
"""
if "RAY_USE_NEW_DASHBOARD" in os.environ:
stdout_file, stderr_file = None, None
else:
stdout_file, stderr_file = self.get_log_file_handles(
"dashboard", unique=True)
self._webui_url, process_info = ray._private.services.start_dashboard(
require_dashboard,
self._ray_params.dashboard_host,
self.redis_address,
self._temp_dir,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
stdout_file=subprocess.DEVNULL, # Avoid hang(fd inherit)
stderr_file=subprocess.DEVNULL, # Avoid hang(fd inherit)
redis_password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
port=self._ray_params.dashboard_port)
@@ -828,9 +823,6 @@ class Node:
)
self.start_plasma_store(plasma_directory, object_store_memory)
self.start_raylet(plasma_directory, object_store_memory)
if "RAY_USE_NEW_DASHBOARD" not in os.environ:
self.start_reporter()
if self._ray_params.include_log_monitor:
self.start_log_monitor()
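The dashboard's stdout and stderr are now pointed at DEVNULL, per the "Avoid hang(fd inherit)" comments above: a child that inherits the parent's pipe file descriptors can keep them open and block a reader after the parent exits. A minimal sketch of the launch pattern:

import subprocess
import sys

# With DEVNULL the child inherits no pipes from us, so nothing blocks
# on unread output and no inherited fd keeps the parent's pipes alive.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('dashboard stand-in')"],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)
proc.wait()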


@@ -93,7 +93,6 @@ py_test_module_list(
"test_queue.py",
"test_ray_init.py",
"test_tempfile.py",
"test_webui.py",
],
size = "small",
extra_srcs = SRCS,


@@ -1,27 +1,19 @@
import os
import json
import grpc
import pytest
import requests
import time
import numpy as np
import ray
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.dashboard.memory import (ReferenceType, decode_object_ref_if_needed,
MemoryTableEntry, MemoryTable, SortingType)
from ray.test_utils import (RayTestTimeoutException,
wait_until_succeeded_without_exception,
wait_until_server_available, wait_for_condition)
wait_until_succeeded_without_exception)
import psutil # We must import psutil after ray because we bundle it with ray.
def test_worker_stats(shutdown_only):
addresses = ray.init(num_cpus=1, include_dashboard=True)
ray.init(num_cpus=1, include_dashboard=True)
raylet = ray.nodes()[0]
num_cpus = raylet["Resources"]["CPU"]
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
@@ -104,8 +96,6 @@ def test_worker_stats(shutdown_only):
# Check that the rest of the processes are workers, 1 for each CPU.
assert len(reply.workers_stats) == num_cpus + 1
views = [view.view_name for view in reply.view_data]
assert "local_available_resource" in views
# Check that all processes are Python.
pids = [worker.pid for worker in reply.workers_stats]
processes = [
@@ -119,248 +109,6 @@ def test_worker_stats(shutdown_only):
or "runner" in process or "ray" in process)
break
# Test kill_actor.
def actor_killed(PID):
"""Check For the existence of a unix pid."""
try:
os.kill(PID, 0)
except OSError:
return True
else:
return False
assert (wait_until_server_available(addresses["webui_url"]) is True)
webui_url = addresses["webui_url"]
webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1")
for worker in reply.workers_stats:
if worker.is_driver:
continue
requests.get(
webui_url + "/api/kill_actor",
params={
"actor_id": ray.utils.binary_to_hex(
worker.core_worker_stats.actor_id),
"ip_address": worker.core_worker_stats.ip_address,
"port": worker.core_worker_stats.port
})
timeout_seconds = 20
start_time = time.time()
while True:
if time.time() - start_time > timeout_seconds:
raise RayTestTimeoutException("Timed out while killing actors")
if all(
actor_killed(worker.pid) for worker in reply.workers_stats
if not worker.is_driver):
break
def test_raylet_info_endpoint(shutdown_only):
addresses = ray.init(include_dashboard=True, num_cpus=6)
@ray.remote
def f():
return "test"
@ray.remote(num_cpus=1)
class ActorA:
def __init__(self):
pass
@ray.remote(resources={"CustomResource": 1})
class ActorB:
def __init__(self):
pass
@ray.remote(num_cpus=2)
class ActorC:
def __init__(self):
self.children = [ActorA.remote(), ActorB.remote()]
def local_store(self):
self.local_storage = [f.remote() for _ in range(10)]
def remote_store(self):
self.remote_storage = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
def getpid(self):
return os.getpid()
c = ActorC.remote()
actor_pid = ray.get(c.getpid.remote())
c.local_store.remote()
c.remote_store.remote()
assert (wait_until_server_available(addresses["webui_url"]) is True)
start_time = time.time()
while True:
time.sleep(1)
try:
webui_url = addresses["webui_url"]
webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1")
response = requests.get(webui_url + "/api/raylet_info")
response.raise_for_status()
try:
raylet_info = response.json()
except Exception as ex:
print("failed response: {}".format(response.text))
raise ex
actor_groups = raylet_info["result"]["actorGroups"]
try:
assert len(actor_groups.keys()) == 3
c_actor_info = actor_groups["ActorC"]["entries"][0]
assert c_actor_info["numObjectRefsInScope"] == 13
assert c_actor_info["numLocalObjects"] == 10
break
except AssertionError:
if time.time() > start_time + 30:
raise Exception("Timed out while waiting for actor info \
or object store info update.")
except requests.exceptions.ConnectionError:
if time.time() > start_time + 30:
raise Exception(
"Timed out while waiting for dashboard to start.")
def cpu_resources(actor_info):
cpu_resources = 0
for slot in actor_info["usedResources"]["CPU"]["resourceSlots"]:
cpu_resources += slot["allocation"]
return cpu_resources
assert cpu_resources(c_actor_info) == 2
assert c_actor_info["numExecutedTasks"] == 4
profiling_id = requests.get(
webui_url + "/api/launch_profiling",
params={
"node_id": ray.nodes()[0]["NodeID"],
"pid": actor_pid,
"duration": 5
}).json()["result"]
start_time = time.time()
while True:
# Sometimes some startup time is required
if time.time() - start_time > 30:
raise RayTestTimeoutException(
"Timed out while collecting profiling stats.")
profiling_info = requests.get(
webui_url + "/api/check_profiling_status",
params={
"profiling_id": profiling_id,
}).json()
status = profiling_info["result"]["status"]
assert status in ("finished", "pending", "error")
if status in ("finished", "error"):
break
time.sleep(1)
def test_raylet_infeasible_tasks(shutdown_only):
"""
This test creates an actor that requires 5 GPUs
but a ray cluster only has 3 GPUs. As a result,
the new actor should be an infeasible actor.
"""
addresses = ray.init(num_gpus=3)
@ray.remote(num_gpus=5)
class ActorRequiringGPU:
def __init__(self):
pass
ActorRequiringGPU.remote()
def test_infeasible_actor(ray_addresses):
assert (wait_until_server_available(addresses["webui_url"]) is True)
webui_url = ray_addresses["webui_url"].replace("127.0.0.1",
"http://127.0.0.1")
raylet_info = requests.get(webui_url + "/api/raylet_info").json()
actor_info = raylet_info["result"]["actorGroups"]
assert len(actor_info) == 1
_, infeasible_actor_info = actor_info.popitem()
assert infeasible_actor_info["entries"][0]["state"] == -2
assert (wait_until_succeeded_without_exception(
test_infeasible_actor,
(AssertionError, requests.exceptions.ConnectionError),
addresses,
timeout_ms=30000,
retry_interval_ms=1000) is True)
def test_raylet_pending_tasks(shutdown_only):
# Make sure to specify num_cpus. Otherwise, the test can be broken
# when the number of cores is less than the number of spawned actors.
addresses = ray.init(num_gpus=3, num_cpus=4)
@ray.remote(num_gpus=1)
class ActorRequiringGPU:
def __init__(self):
pass
@ray.remote
class ParentActor:
def __init__(self):
self.a = [ActorRequiringGPU.remote() for i in range(4)]
# If we do not get ParentActor actor handler, reference counter will
# terminate ParentActor.
parent_actor = ParentActor.remote()
assert parent_actor is not None
def test_pending_actor(ray_addresses):
assert (wait_until_server_available(addresses["webui_url"]) is True)
webui_url = ray_addresses["webui_url"].replace("127.0.0.1",
"http://127.0.0.1")
raylet_info = requests.get(webui_url + "/api/raylet_info").json()
actor_info = raylet_info["result"]["actors"]
assert len(actor_info) == 1
_, infeasible_actor_info = actor_info.popitem()
wait_until_succeeded_without_exception(
test_pending_actor,
(AssertionError, requests.exceptions.ConnectionError),
addresses,
timeout_ms=30000,
retry_interval_ms=1000)
@pytest.mark.skipif(
os.environ.get("TRAVIS") is None,
reason="This test requires password-less sudo due to py-spy requirement.")
def test_profiling_info_endpoint(shutdown_only):
ray.init(num_cpus=1)
redis_client = ray.worker.global_worker.redis_client
node_ip = ray.nodes()[0]["NodeManagerAddress"]
while True:
reporter_port = redis_client.get("REPORTER_PORT:{}".format(node_ip))
if reporter_port:
break
reporter_channel = grpc.insecure_channel("{}:{}".format(
node_ip, int(reporter_port)))
reporter_stub = reporter_pb2_grpc.ReporterServiceStub(reporter_channel)
@ray.remote(num_cpus=1)
class ActorA:
def __init__(self):
pass
def getpid(self):
return os.getpid()
a = ActorA.remote()
actor_pid = ray.get(a.getpid.remote())
reply = reporter_stub.GetProfilingStats(
reporter_pb2.GetProfilingStatsRequest(pid=actor_pid, duration=10))
profiling_stats = json.loads(reply.profiling_stats)
assert profiling_stats is not None
def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
NUM_NODES = 3
@@ -390,438 +138,7 @@ def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
test_prometheus_endpoint, (requests.exceptions.ConnectionError, ))
# This variable is used inside test_memory_dashboard.
# It is defined as a global variable to be used across all nested test
# functions. We use it because memory table is updated every one second,
# and we need to have a way to verify if the test is running with a fresh
# new memory table.
prev_memory_table = MemoryTable([]).__dict__()["group"]
def test_memory_dashboard(shutdown_only):
"""Test Memory table.
These tests verify examples in this document.
https://docs.ray.io/en/master/memory-management.html#debugging-using-ray-memory
"""
addresses = ray.init(num_cpus=2)
webui_url = addresses["webui_url"].replace("127.0.0.1", "http://127.0.0.1")
assert (wait_until_server_available(addresses["webui_url"]) is True)
def get_memory_table():
memory_table = requests.get(webui_url + "/api/memory_table").json()
return memory_table["result"]
def memory_table_ready():
"""Wait until the new fresh memory table is ready."""
global prev_memory_table
memory_table = get_memory_table()
is_ready = memory_table["group"] != prev_memory_table
prev_memory_table = memory_table["group"]
return is_ready
def stop_memory_table():
requests.get(webui_url + "/api/stop_memory_table").json()
def test_local_reference():
@ray.remote
def f(arg):
return arg
# a and b are local references.
a = ray.put(None) # Noqa F841
b = f.remote(None) # Noqa F841
wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
summary = memory_table["summary"]
group = memory_table["group"]
assert summary["total_captured_in_objects"] == 0
assert summary["total_pinned_in_memory"] == 0
assert summary["total_used_by_pending_task"] == 0
assert summary["total_local_ref_count"] == 2
for table in group.values():
for entry in table["entries"]:
assert (
entry["reference_type"] == ReferenceType.LOCAL_REFERENCE)
stop_memory_table()
return True
def test_object_pinned_in_memory():
a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
b = ray.get(a) # Noqa F841
del a
wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
summary = memory_table["summary"]
group = memory_table["group"]
assert summary["total_captured_in_objects"] == 0
assert summary["total_pinned_in_memory"] == 1
assert summary["total_used_by_pending_task"] == 0
assert summary["total_local_ref_count"] == 0
for table in group.values():
for entry in table["entries"]:
assert (
entry["reference_type"] == ReferenceType.PINNED_IN_MEMORY)
stop_memory_table()
return True
def test_pending_task_references():
@ray.remote
def f(arg):
time.sleep(1)
a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
b = f.remote(a)
wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
summary = memory_table["summary"]
assert summary["total_captured_in_objects"] == 0
assert summary["total_pinned_in_memory"] == 1
assert summary["total_used_by_pending_task"] == 1
assert summary["total_local_ref_count"] == 1
# Make sure the function f is done before going to the next test.
# Otherwise, the memory table will be corrupted because the
# task f won't be done when the next test is running.
ray.get(b)
stop_memory_table()
return True
def test_serialized_object_ref_reference():
@ray.remote
def f(arg):
time.sleep(1)
a = ray.put(None)
b = f.remote([a]) # Noqa F841
wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
summary = memory_table["summary"]
assert summary["total_captured_in_objects"] == 0
assert summary["total_pinned_in_memory"] == 0
assert summary["total_used_by_pending_task"] == 1
assert summary["total_local_ref_count"] == 2
# Make sure the function f is done before going to the next test.
# Otherwise, the memory table will be corrupted because the
# task f won't be done when the next test is running.
ray.get(b)
stop_memory_table()
return True
def test_captured_object_ref_reference():
a = ray.put(None)
b = ray.put([a]) # Noqa F841
del a
wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
summary = memory_table["summary"]
assert summary["total_captured_in_objects"] == 1
assert summary["total_pinned_in_memory"] == 0
assert summary["total_used_by_pending_task"] == 0
assert summary["total_local_ref_count"] == 1
stop_memory_table()
return True
def test_actor_handle_reference():
@ray.remote
class Actor:
pass
a = Actor.remote() # Noqa F841
b = Actor.remote() # Noqa F841
c = Actor.remote() # Noqa F841
wait_for_condition(memory_table_ready)
memory_table = get_memory_table()
summary = memory_table["summary"]
group = memory_table["group"]
assert summary["total_captured_in_objects"] == 0
assert summary["total_pinned_in_memory"] == 0
assert summary["total_used_by_pending_task"] == 0
assert summary["total_local_ref_count"] == 0
assert summary["total_actor_handles"] == 3
for table in group.values():
for entry in table["entries"]:
assert (entry["reference_type"] == ReferenceType.ACTOR_HANDLE)
stop_memory_table()
return True
# These tests should be retried because it takes at least one second
# to get a fresh memory table: the table is updated whenever raylet
# and node info is renewed, which happens once per second.
wait_for_condition(
test_local_reference, timeout=30000, retry_interval_ms=1000)
wait_for_condition(
test_object_pinned_in_memory, timeout=30000, retry_interval_ms=1000)
wait_for_condition(
test_pending_task_references, timeout=30000, retry_interval_ms=1000)
wait_for_condition(
test_serialized_object_ref_reference,
timeout=30000,
retry_interval_ms=1000)
wait_for_condition(
test_captured_object_ref_reference,
timeout=30000,
retry_interval_ms=1000)
wait_for_condition(
test_actor_handle_reference, timeout=30000, retry_interval_ms=1000)
"""Memory Table Unit Test"""
NODE_ADDRESS = "127.0.0.1"
IS_DRIVER = True
PID = 1
OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA="
ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000"
DECODED_ID = decode_object_ref_if_needed(OBJECT_ID)
OBJECT_SIZE = 100
def build_memory_entry(*,
local_ref_count,
pinned_in_memory,
submitted_task_reference_count,
contained_in_owned,
object_size,
pid,
object_id=OBJECT_ID,
node_address=NODE_ADDRESS):
object_ref = {
"objectId": object_id,
"callSite": "(task call) /Users:458",
"objectSize": object_size,
"localRefCount": local_ref_count,
"pinnedInMemory": pinned_in_memory,
"submittedTaskRefCount": submitted_task_reference_count,
"containedInOwned": contained_in_owned
}
return MemoryTableEntry(
object_ref=object_ref,
node_address=node_address,
is_driver=IS_DRIVER,
pid=pid)
def build_local_reference_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=1,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_used_by_pending_task_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=2,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_captured_in_object_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[OBJECT_ID],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_actor_handle_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=1,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address,
object_id=ACTOR_ID)
def build_pinned_in_memory_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS):
return build_memory_entry(
local_ref_count=0,
pinned_in_memory=True,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=object_size,
pid=pid,
node_address=node_address)
def build_entry(object_size=OBJECT_SIZE,
pid=PID,
node_address=NODE_ADDRESS,
reference_type=ReferenceType.PINNED_IN_MEMORY):
if reference_type == ReferenceType.USED_BY_PENDING_TASK:
return build_used_by_pending_task_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.LOCAL_REFERENCE:
return build_local_reference_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.PINNED_IN_MEMORY:
return build_pinned_in_memory_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.ACTOR_HANDLE:
return build_actor_handle_entry(
pid=pid, object_size=object_size, node_address=node_address)
elif reference_type == ReferenceType.CAPTURED_IN_OBJECT:
return build_captured_in_object_entry(
pid=pid, object_size=object_size, node_address=node_address)
def test_invalid_memory_entry():
memory_entry = build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=OBJECT_SIZE,
pid=PID)
assert memory_entry.is_valid() is False
memory_entry = build_memory_entry(
local_ref_count=0,
pinned_in_memory=False,
submitted_task_reference_count=0,
contained_in_owned=[],
object_size=-1,
pid=PID)
assert memory_entry.is_valid() is False
def test_valid_reference_memory_entry():
memory_entry = build_local_reference_entry()
assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE
assert memory_entry.object_ref == ray.ObjectRef(
decode_object_ref_if_needed(OBJECT_ID))
assert memory_entry.is_valid() is True
def test_reference_type():
# pinned in memory
memory_entry = build_pinned_in_memory_entry()
assert memory_entry.reference_type == ReferenceType.PINNED_IN_MEMORY
# used by pending task
memory_entry = build_used_by_pending_task_entry()
assert memory_entry.reference_type == ReferenceType.USED_BY_PENDING_TASK
# captured in object
memory_entry = build_captured_in_object_entry()
assert memory_entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT
# actor handle
memory_entry = build_actor_handle_entry()
assert memory_entry.reference_type == ReferenceType.ACTOR_HANDLE
def test_memory_table_summary():
entries = [
build_pinned_in_memory_entry(),
build_used_by_pending_task_entry(),
build_captured_in_object_entry(),
build_actor_handle_entry(),
build_local_reference_entry(),
build_local_reference_entry()
]
memory_table = MemoryTable(entries)
assert len(memory_table.group) == 1
assert memory_table.summary["total_actor_handles"] == 1
assert memory_table.summary["total_captured_in_objects"] == 1
assert memory_table.summary["total_local_ref_count"] == 2
assert memory_table.summary[
"total_object_size"] == len(entries) * OBJECT_SIZE
assert memory_table.summary["total_pinned_in_memory"] == 1
assert memory_table.summary["total_used_by_pending_task"] == 1
def test_memory_table_sort_by_pid():
unsort = [1, 3, 2]
entries = [build_entry(pid=pid) for pid in unsort]
memory_table = MemoryTable(entries, sort_by_type=SortingType.PID)
sort = sorted(unsort)
for pid, entry in zip(sort, memory_table.table):
assert pid == entry.pid
def test_memory_table_sort_by_reference_type():
unsort = [
ReferenceType.USED_BY_PENDING_TASK, ReferenceType.LOCAL_REFERENCE,
ReferenceType.LOCAL_REFERENCE, ReferenceType.PINNED_IN_MEMORY
]
entries = [
build_entry(reference_type=reference_type) for reference_type in unsort
]
memory_table = MemoryTable(
entries, sort_by_type=SortingType.REFERENCE_TYPE)
sort = sorted(unsort)
for reference_type, entry in zip(sort, memory_table.table):
assert reference_type == entry.reference_type
def test_memory_table_sort_by_object_size():
unsort = [312, 214, -1, 1244, 642]
entries = [build_entry(object_size=object_size) for object_size in unsort]
memory_table = MemoryTable(entries, sort_by_type=SortingType.OBJECT_SIZE)
sort = sorted(unsort)
for object_size, entry in zip(sort, memory_table.table):
assert object_size == entry.object_size
def test_group_by():
node_second = "127.0.0.2"
node_first = "127.0.0.1"
entries = [
build_entry(node_address=node_second, pid=2),
build_entry(node_address=node_second, pid=1),
build_entry(node_address=node_first, pid=2),
build_entry(node_address=node_first, pid=1)
]
memory_table = MemoryTable(entries)
# Make sure it is correctly grouped
assert node_first in memory_table.group
assert node_second in memory_table.group
# make sure pid is sorted in the right order.
for group_key, group_memory_table in memory_table.group.items():
pid = 1
for entry in group_memory_table.table:
assert pid == entry.pid
pid += 1
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))


@@ -1,50 +0,0 @@
import re
import sys
import time
import pytest
import requests
import ray
@pytest.mark.skipif(
sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher")
def test_get_webui(shutdown_only):
addresses = ray.init(include_dashboard=True, num_cpus=1)
webui_url = addresses["webui_url"]
assert ray.get_dashboard_url() == webui_url
assert re.match(r"^(localhost|\d+\.\d+\.\d+\.\d+):\d+$", webui_url)
start_time = time.time()
while True:
try:
node_info = requests.get("http://" + webui_url +
"/api/node_info").json()
break
except requests.exceptions.ConnectionError:
if time.time() > start_time + 30:
error_log = None
out_log = None
with open(
"{}/logs/dashboard.out".format(
addresses["session_dir"]), "r") as f:
out_log = f.read()
with open(
"{}/logs/dashboard.err".format(
addresses["session_dir"]), "r") as f:
error_log = f.read()
raise Exception(
"Timed out while waiting for dashboard to start. "
"Dashboard output log: {}\n"
"Dashboard error log: {}\n".format(out_log, error_log))
assert node_info["error"] is None
assert node_info["result"] is not None
assert isinstance(node_info["timestamp"], float)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))