diff --git a/.gitignore b/.gitignore
index 52f03c375..26dbcda73 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,4 @@
 # The build output should clearly not be checked in
-*test-output.xml
 /bazel-*
 /python/ray/core
 /python/ray/pickle5_files/
@@ -12,7 +11,7 @@
 /thirdparty/pkg/
 /build/java
 .jar
-/dashboard/client/build
+
 # Files generated by flatc should be ignored
 /src/ray/gcs/format/*_generated.h
 /src/ray/object_manager/format/*_generated.h
diff --git a/ci/travis/test-wheels.sh b/ci/travis/test-wheels.sh
index e763bb076..7fd671332 100755
--- a/ci/travis/test-wheels.sh
+++ b/ci/travis/test-wheels.sh
@@ -22,6 +22,7 @@ if [ -z "${BUILD_DIR}" ]; then
 fi
 TEST_DIR="${BUILD_DIR}/python/ray/tests"
 TEST_SCRIPTS=("$TEST_DIR/test_microbenchmarks.py" "$TEST_DIR/test_basic.py")
+UI_TEST_SCRIPT="${BUILD_DIR}/python/ray/tests/test_webui.py"
 
 function retry {
   local n=1
@@ -76,6 +77,9 @@ if [[ "$platform" == "linux" ]]; then
     for SCRIPT in "${TEST_SCRIPTS[@]}"; do
       retry "$PYTHON_EXE" "$SCRIPT"
     done
+
+    # Run the UI test to make sure that the packaged UI works.
+    retry "$PYTHON_EXE" "$UI_TEST_SCRIPT"
   done
 
   # Check that the other wheels are present.
@@ -114,6 +118,12 @@ elif [[ "$platform" == "macosx" ]]; then
     for SCRIPT in "${TEST_SCRIPTS[@]}"; do
       retry "$PYTHON_EXE" "$SCRIPT"
     done
+
+    if (( $(echo "$PY_MM >= 3.0" | bc) )); then
+      # Run the UI test to make sure that the packaged UI works.
+      retry "$PYTHON_EXE" "$UI_TEST_SCRIPT"
+    fi
+
   done
 elif [ "${platform}" = windows ]; then
   echo "WARNING: Wheel testing not yet implemented for Windows."
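Note: the test-wheels.sh hunk above routes the new UI test through the script's existing `retry` helper, and the macOS branch only runs it for Python 3 wheels (the `bc` comparison on `$PY_MM`). Below is a rough Python sketch of that retry-and-gate pattern; the helper's body is not visible in the hunk, so the attempt count and delay are assumptions, and the script path is illustrative only.

```python
import subprocess
import sys
import time


def retry(cmd, max_attempts=3, delay_s=1):
    # Re-run a flaky command a few times before giving up, mirroring the
    # shell `retry` helper in test-wheels.sh (attempt count and delay are
    # assumed; the hunk does not show the helper's body).
    for attempt in range(1, max_attempts + 1):
        if subprocess.call(cmd) == 0:
            return
        if attempt < max_attempts:
            print("Command failed. Attempt {}/{}:".format(
                attempt + 1, max_attempts))
            time.sleep(delay_s)
    raise RuntimeError(
        "Command failed after {} attempts.".format(max_attempts))


# Run the packaged-UI test only on Python 3, like the macOS branch above.
if sys.version_info[0] >= 3:
    retry([sys.executable, "python/ray/tests/test_webui.py"])
```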
diff --git a/dashboard/agent.py b/dashboard/agent.py
index 110cb9a1e..73a1bd94f 100644
--- a/dashboard/agent.py
+++ b/dashboard/agent.py
@@ -38,7 +38,6 @@ aiogrpc.init_grpc_aio()
 class DashboardAgent(object):
     def __init__(self,
                  redis_address,
-                 dashboard_agent_port,
                  redis_password=None,
                  temp_dir=None,
                  log_dir=None,
@@ -52,7 +51,6 @@ class DashboardAgent(object):
         self.redis_password = redis_password
         self.temp_dir = temp_dir
         self.log_dir = log_dir
-        self.dashboard_agent_port = dashboard_agent_port
         self.metrics_export_port = metrics_export_port
         self.node_manager_port = node_manager_port
         self.object_store_name = object_store_name
@@ -61,8 +59,7 @@ class DashboardAgent(object):
         assert self.node_id, "Empty node id (RAY_NODE_ID)."
         self.ip = ray._private.services.get_node_ip_address()
         self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), ))
-        self.grpc_port = self.server.add_insecure_port(
-            f"[::]:{self.dashboard_agent_port}")
+        self.grpc_port = self.server.add_insecure_port("[::]:0")
         logger.info("Dashboard agent grpc address: %s:%s", self.ip,
                     self.grpc_port)
         self.aioredis_client = None
@@ -189,11 +186,6 @@ if __name__ == "__main__":
         required=True,
         type=int,
         help="The port to expose metrics through Prometheus.")
-    parser.add_argument(
-        "--dashboard-agent-port",
-        required=True,
-        type=int,
-        help="The port on which the dashboard agent will receive GRPCs.")
     parser.add_argument(
         "--node-manager-port",
         required=True,
@@ -296,7 +288,6 @@ if __name__ == "__main__":
 
         agent = DashboardAgent(
             args.redis_address,
-            args.dashboard_agent_port,
             redis_password=args.redis_password,
             temp_dir=temp_dir,
             log_dir=log_dir,
diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py
index 1e35cb66a..f8b4b3c5f 100644
--- a/dashboard/dashboard.py
+++ b/dashboard/dashboard.py
@@ -3,6 +3,7 @@ try:
 except ImportError:
     print("The dashboard requires aiohttp to run.")
     import sys
+    sys.exit(1)
 
 import argparse
diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py
index 109bdc13e..23a239f29 100644
--- a/dashboard/datacenter.py
+++ b/dashboard/datacenter.py
@@ -111,13 +111,15 @@ class DataOrganizer:
         node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
         node_stats = DataSource.node_stats.get(node_id, {})
         node = DataSource.nodes.get(node_id, {})
-        node_ip = DataSource.node_id_to_ip.get(node_id)
+
         # Merge node log count information into the payload
-        log_info = DataSource.ip_and_pid_to_logs.get(node_ip, {})
+        log_info = DataSource.ip_and_pid_to_logs.get(node_physical_stats["ip"],
+                                                     {})
         node_log_count = 0
         for entries in log_info.values():
             node_log_count += len(entries)
-        error_info = DataSource.ip_and_pid_to_errors.get(node_ip, {})
+        error_info = DataSource.ip_and_pid_to_errors.get(
+            node_physical_stats["ip"], {})
         node_err_count = 0
         for entries in error_info.values():
             node_err_count += len(entries)
diff --git a/dashboard/modules/logical_view/test_logical_view_head.py b/dashboard/modules/logical_view/test_logical_view_head.py
index f9ffebfdb..f4118da51 100644
--- a/dashboard/modules/logical_view/test_logical_view_head.py
+++ b/dashboard/modules/logical_view/test_logical_view_head.py
@@ -33,8 +33,9 @@ def test_actor_groups(ray_start_with_dashboard):
     foo_actors = [Foo.remote(4), Foo.remote(5)]
     infeasible_actor = InfeasibleActor.remote()  # noqa
     results = [actor.do_task.remote() for actor in foo_actors]  # noqa
+    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"])
+            is True)
     webui_url = ray_start_with_dashboard["webui_url"]
-    assert wait_until_server_available(webui_url)
     webui_url = format_web_url(webui_url)
 
     timeout_seconds = 5
@@ -74,66 +75,5 @@ def test_actor_groups(ray_start_with_dashboard):
             raise Exception(f"Timed out while testing, {ex_stack}")
 
 
-def test_kill_actor(ray_start_with_dashboard):
-    @ray.remote
-    class Actor:
-        def __init__(self):
-            pass
-
-        def f(self):
-            ray.show_in_dashboard("test")
-            return os.getpid()
-
-    a = Actor.remote()
-    worker_pid = ray.get(a.f.remote())  # noqa
-
-    webui_url = ray_start_with_dashboard["webui_url"]
-    assert wait_until_server_available(webui_url)
-    webui_url = format_web_url(webui_url)
-
-    def actor_killed(pid):
-        """Check For the existence of a unix pid."""
-        try:
-            os.kill(pid, 0)
-        except OSError:
-            return True
-        else:
-            return False
-
-    def get_actor():
-        resp = requests.get(f"{webui_url}/logical/actor_groups")
-        resp.raise_for_status()
-        actor_groups_resp = resp.json()
-        assert actor_groups_resp["result"] is True, actor_groups_resp["msg"]
-        actor_groups = actor_groups_resp["data"]["actorGroups"]
-        actor = actor_groups["Actor"]["entries"][0]
-        return actor
-
-    def kill_actor_using_dashboard(actor):
-        resp = requests.get(
-            webui_url + "/logical/kill_actor",
-            params={
-                "actorId": actor["actorId"],
-                "ipAddress": actor["ipAddress"],
-                "port": actor["port"]
-            })
-        resp.raise_for_status()
-        resp_json = resp.json()
-        assert resp_json["result"] is True, "msg" in resp_json
-
-    start = time.time()
-    last_exc = None
-    while time.time() - start <= 10:
-        try:
-            actor = get_actor()
-            kill_actor_using_dashboard(actor)
-            last_exc = None
-            break
-        except (KeyError, AssertionError) as e:
-            last_exc = e
-            time.sleep(.1)
-    assert last_exc is None
-
-
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py
index 01d17a801..d1e53b644 100644
--- a/dashboard/modules/reporter/reporter_agent.py
+++ b/dashboard/modules/reporter/reporter_agent.py
@@ -94,15 +94,23 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
         return reporter_pb2.GetProfilingStatsReply(
             profiling_stats=profiling_stats, std_out=stdout, std_err=stderr)
 
-    async def ReportOCMetrics(self, request, context):
-        # This function receives a GRPC containing OpenCensus (OC) metrics
-        # from a Ray process, then exposes those metrics to Prometheus.
+    async def ReportMetrics(self, request, context):
+        # NOTE: Exceptions are not propagated properly
+        # when we don't catch them here.
         try:
-            self._metrics_agent.record_metric_points_from_protobuf(
-                request.metrics)
-        except Exception:
+            metrcs_description_required = (
+                self._metrics_agent.record_metrics_points(
+                    request.metrics_points))
+        except Exception as e:
+            logger.error(e)
             logger.error(traceback.format_exc())
-        return reporter_pb2.ReportOCMetricsReply()
+
+        # If the metrics description is missing, notify the C++ processes
+        # that we need it; they will then re-report it. We need this when
+        # (1) a new metric is reported (an application metric), or
+        # (2) a reporter goes down and is restarted (currently not
+        # implemented).
+        return reporter_pb2.ReportMetricsReply(
+            metrcs_description_required=metrcs_description_required)
 
     @staticmethod
     def _get_cpu_percent():
@@ -117,7 +125,8 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
         try:
             gpus = gpustat.new_query().gpus
         except Exception as e:
-            logger.debug(f"gpustat failed to retrieve GPU information: {e}")
+            logger.debug(
+                "gpustat failed to retrieve GPU information: {}".format(e))
         for gpu in gpus:
             # Note the keys in this dict have periods which throws
             # off javascript so we change .s to _s
@@ -224,8 +233,12 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
             "cmdline": self._get_raylet_cmdline(),
         }
 
-    async def _perform_iteration(self, aioredis_client):
+    async def _perform_iteration(self):
         """Get any changes to the log files and push updates to Redis."""
+        aioredis_client = await aioredis.create_redis_pool(
+            address=self._dashboard_agent.redis_address,
+            password=self._dashboard_agent.redis_password)
+
         while True:
             try:
                 stats = self._get_all_stats()
@@ -236,8 +249,5 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule,
                 reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)
 
     async def run(self, server):
-        aioredis_client = await aioredis.create_redis_pool(
-            address=self._dashboard_agent.redis_address,
-            password=self._dashboard_agent.redis_password)
         reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)
-        await self._perform_iteration(aioredis_client)
+        await self._perform_iteration()
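Note: the restored `ReportMetrics` handler above implements a small handshake: `record_metrics_points` returns whether any metric point arrived before its description, and the reply field `metrcs_description_required` (the spelling matches the pre-existing protobuf field, so it is kept as-is) carries that flag back so the reporting process re-sends descriptions. A toy model of that handshake follows; all names here are hypothetical, for illustration only, and are not Ray APIs.

```python
from dataclasses import dataclass


@dataclass
class MetricPoint:
    # Hypothetical stand-in for a protobuf metric point.
    metric_name: str
    value: float


class MetricsAgentSketch:
    """Toy model of the description handshake in ReportMetrics above."""

    def __init__(self):
        self.descriptions = {}  # metric name -> description text
        self.points = []

    def record_metrics_points(self, metrics_points):
        # Return True when a point arrives for a metric whose description
        # has not been seen yet; the gRPC reply carries this flag back so
        # the sender knows to re-send metric descriptions.
        description_required = False
        for point in metrics_points:
            if point.metric_name not in self.descriptions:
                description_required = True
            self.points.append(point)
        return description_required


agent = MetricsAgentSketch()
print(agent.record_metrics_points([MetricPoint("task_count", 1.0)]))  # True
agent.descriptions["task_count"] = "Number of tasks"
print(agent.record_metrics_points([MetricPoint("task_count", 2.0)]))  # False
```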
diff --git a/dashboard/modules/tune/tune_head.py b/dashboard/modules/tune/tune_head.py
index 5d9736b22..3f10e5df6 100644
--- a/dashboard/modules/tune/tune_head.py
+++ b/dashboard/modules/tune/tune_head.py
@@ -130,7 +130,7 @@ class TuneController(dashboard_utils.DashboardHeadModule):
 
         # search through all the sub_directories in log directory
         analysis = Analysis(str(self._logdir))
-        df = analysis.dataframe(metric=None, mode=None)
+        df = analysis.dataframe(metric="episode_reward_mean", mode="max")
 
         if len(df) == 0 or "trial_id" not in df.columns:
             return
diff --git a/python/build-wheel-macos.sh b/python/build-wheel-macos.sh
index a60b1d5d6..04fefa1f8 100755
--- a/python/build-wheel-macos.sh
+++ b/python/build-wheel-macos.sh
@@ -39,8 +39,7 @@ source "$HOME"/.nvm/nvm.sh
 nvm use node
 
 # Build the dashboard so its static assets can be included in the wheel.
-# TODO(mfitton): switch this back when deleting old dashboard code.
-pushd python/ray/new_dashboard/client
+pushd python/ray/dashboard/client
 npm ci
 npm run build
 popd
diff --git a/python/build-wheel-manylinux1.sh b/python/build-wheel-manylinux1.sh
index 5972d0101..4855b5830 100755
--- a/python/build-wheel-manylinux1.sh
+++ b/python/build-wheel-manylinux1.sh
@@ -35,8 +35,7 @@ nvm install node
 nvm use node
 
 # Build the dashboard so its static assets can be included in the wheel.
-# TODO(mfitton): switch this back when deleting old dashboard code.
-pushd python/ray/new_dashboard/client
+pushd python/ray/dashboard/client
 npm ci
 npm run build
 popd
diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py
index 17651be6f..10b9803de 100644
--- a/python/ray/_private/services.py
+++ b/python/ray/_private/services.py
@@ -495,18 +495,10 @@ def start_ray_process(command,
             process.kill()
             raise
 
-    def _get_stream_name(stream):
-        if stream is not None:
-            try:
-                return stream.name
-            except AttributeError:
-                return str(stream)
-        return None
-
     return ProcessInfo(
         process=process,
-        stdout_file=_get_stream_name(stdout_file),
-        stderr_file=_get_stream_name(stderr_file),
+        stdout_file=stdout_file.name if stdout_file is not None else None,
+        stderr_file=stderr_file.name if stderr_file is not None else None,
         use_valgrind=use_valgrind,
         use_gdb=use_gdb,
         use_valgrind_profiler=use_valgrind_profiler,
@@ -1045,7 +1037,12 @@ def start_dashboard(require_dashboard,
             raise ValueError(
                 f"The given dashboard port {port} is already in use")
 
-    dashboard_dir = "new_dashboard"
+    if "RAY_USE_NEW_DASHBOARD" in os.environ:
+        dashboard_dir = "new_dashboard"
+    else:
+        dashboard_dir = "dashboard"
+        logdir = None
+
     dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")
     command = [
         sys.executable,
@@ -1061,12 +1058,12 @@ def start_dashboard(require_dashboard,
     if redis_password:
         command += ["--redis-password", redis_password]
 
-    dashboard_dependencies_present = True
+    webui_dependencies_present = True
     try:
         import aiohttp  # noqa: F401
         import grpc  # noqa: F401
     except ImportError:
-        dashboard_dependencies_present = False
+        webui_dependencies_present = False
         warning_message = (
             "Failed to start the dashboard. The dashboard requires Python 3 "
             "as well as 'pip install aiohttp grpcio'.")
@@ -1074,7 +1071,8 @@ def start_dashboard(require_dashboard,
             raise ImportError(warning_message)
         else:
             logger.warning(warning_message)
-    if dashboard_dependencies_present:
+
+    if webui_dependencies_present:
         process_info = start_ray_process(
             command,
             ray_constants.PROCESS_TYPE_DASHBOARD,
@@ -1321,13 +1319,12 @@ def start_raylet(redis_address,
         sys.executable,
         "-u",
         os.path.join(RAY_PATH, "new_dashboard/agent.py"),
-        f"--redis-address={redis_address}",
-        f"--metrics-export-port={metrics_export_port}",
-        f"--dashboard-agent-port={metrics_agent_port}",
-        f"--node-manager-port={node_manager_port}",
-        f"--object-store-name={plasma_store_name}",
-        f"--raylet-name={raylet_name}",
-        f"--temp-dir={temp_dir}",
+        "--redis-address={}".format(redis_address),
+        "--metrics-export-port={}".format(metrics_export_port),
+        "--node-manager-port={}".format(node_manager_port),
+        "--object-store-name={}".format(plasma_store_name),
+        "--raylet-name={}".format(raylet_name),
+        "--temp-dir={}".format(temp_dir),
     ]
 
     if redis_password is not None and len(redis_password) != 0:
@@ -1360,8 +1357,9 @@ def start_raylet(redis_address,
     if start_initial_python_workers_for_first_job:
         command.append("--num_initial_python_workers_for_first_job={}".format(
             resource_spec.num_cpus))
-    command.append("--agent_command={}".format(
-        subprocess.list2cmdline(agent_command)))
+    if "RAY_USE_NEW_DASHBOARD" in os.environ:
+        command.append("--agent_command={}".format(
+            subprocess.list2cmdline(agent_command)))
     if config.get("plasma_store_as_thread"):
         # command related to the plasma store
         command += [
diff --git a/python/ray/node.py b/python/ray/node.py
index ff3dd4d5b..51dc143bd 100644
--- a/python/ray/node.py
+++ b/python/ray/node.py
@@ -626,14 +626,19 @@ class Node:
             if we fail to start the dashboard.
             Otherwise it will print a warning if we fail to start the
             dashboard.
         """
+        if "RAY_USE_NEW_DASHBOARD" in os.environ:
+            stdout_file, stderr_file = None, None
+        else:
+            stdout_file, stderr_file = self.get_log_file_handles(
+                "dashboard", unique=True)
+
         self._webui_url, process_info = ray._private.services.start_dashboard(
             require_dashboard,
             self._ray_params.dashboard_host,
             self.redis_address,
             self._temp_dir,
             self._logs_dir,
-            stdout_file=subprocess.DEVNULL,  # Avoid hang(fd inherit)
-            stderr_file=subprocess.DEVNULL,  # Avoid hang(fd inherit)
+            stdout_file=stdout_file,
+            stderr_file=stderr_file,
             redis_password=self._ray_params.redis_password,
             fate_share=self.kernel_fate_share,
             port=self._ray_params.dashboard_port)
@@ -823,6 +828,9 @@ class Node:
         )
         self.start_plasma_store(plasma_directory, object_store_memory)
         self.start_raylet(plasma_directory, object_store_memory)
+        if "RAY_USE_NEW_DASHBOARD" not in os.environ:
+            self.start_reporter()
+
         if self._ray_params.include_log_monitor:
             self.start_log_monitor()
diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 2739caebe..9fc69cfd2 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -93,6 +93,7 @@ py_test_module_list(
         "test_queue.py",
         "test_ray_init.py",
         "test_tempfile.py",
+        "test_webui.py",
     ],
     size = "small",
     extra_srcs = SRCS,
diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py
index 5b97adc93..e17c77ed0 100644
--- a/python/ray/tests/test_metrics.py
+++ b/python/ray/tests/test_metrics.py
@@ -1,19 +1,27 @@
 import os
+import json
 import grpc
+import pytest
 import requests
 import time
+import numpy as np
 
 import ray
 
 from ray.core.generated import node_manager_pb2
 from ray.core.generated import node_manager_pb2_grpc
+from ray.core.generated import reporter_pb2
+from ray.core.generated import reporter_pb2_grpc
+from ray.dashboard.memory import (ReferenceType, decode_object_ref_if_needed,
+                                  MemoryTableEntry, MemoryTable, SortingType)
 from ray.test_utils import (RayTestTimeoutException,
-                            wait_until_succeeded_without_exception)
+                            wait_until_succeeded_without_exception,
+                            wait_until_server_available, wait_for_condition)
 
 import psutil  # We must import psutil after ray because we bundle it with ray.
 
 
 def test_worker_stats(shutdown_only):
-    ray.init(num_cpus=1, include_dashboard=True)
+    addresses = ray.init(num_cpus=1, include_dashboard=True)
     raylet = ray.nodes()[0]
     num_cpus = raylet["Resources"]["CPU"]
     raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
@@ -96,6 +104,8 @@ def test_worker_stats(shutdown_only):
 
     # Check that the rest of the processes are workers, 1 for each CPU.
     assert len(reply.workers_stats) == num_cpus + 1
+    views = [view.view_name for view in reply.view_data]
+    assert "local_available_resource" in views
     # Check that all processes are Python.
     pids = [worker.pid for worker in reply.workers_stats]
     processes = [
@@ -109,6 +119,248 @@ def test_worker_stats(shutdown_only):
                 or "runner" in process or "ray" in process)
             break
 
+    # Test kill_actor.
+    def actor_killed(PID):
+        """Check for the existence of a Unix pid."""
+        try:
+            os.kill(PID, 0)
+        except OSError:
+            return True
+        else:
+            return False
+
+    assert (wait_until_server_available(addresses["webui_url"]) is True)
+
+    webui_url = addresses["webui_url"]
+    webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1")
+    for worker in reply.workers_stats:
+        if worker.is_driver:
+            continue
+        requests.get(
+            webui_url + "/api/kill_actor",
+            params={
+                "actor_id": ray.utils.binary_to_hex(
+                    worker.core_worker_stats.actor_id),
+                "ip_address": worker.core_worker_stats.ip_address,
+                "port": worker.core_worker_stats.port
+            })
+    timeout_seconds = 20
+    start_time = time.time()
+    while True:
+        if time.time() - start_time > timeout_seconds:
+            raise RayTestTimeoutException("Timed out while killing actors")
+        if all(
+                actor_killed(worker.pid) for worker in reply.workers_stats
+                if not worker.is_driver):
+            break
+
+
+def test_raylet_info_endpoint(shutdown_only):
+    addresses = ray.init(include_dashboard=True, num_cpus=6)
+
+    @ray.remote
+    def f():
+        return "test"
+
+    @ray.remote(num_cpus=1)
+    class ActorA:
+        def __init__(self):
+            pass
+
+    @ray.remote(resources={"CustomResource": 1})
+    class ActorB:
+        def __init__(self):
+            pass
+
+    @ray.remote(num_cpus=2)
+    class ActorC:
+        def __init__(self):
+            self.children = [ActorA.remote(), ActorB.remote()]
+
+        def local_store(self):
+            self.local_storage = [f.remote() for _ in range(10)]
+
+        def remote_store(self):
+            self.remote_storage = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
+
+        def getpid(self):
+            return os.getpid()
+
+    c = ActorC.remote()
+    actor_pid = ray.get(c.getpid.remote())
+    c.local_store.remote()
+    c.remote_store.remote()
+
+    assert (wait_until_server_available(addresses["webui_url"]) is True)
+
+    start_time = time.time()
+    while True:
+        time.sleep(1)
+        try:
+            webui_url = addresses["webui_url"]
+            webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1")
+            response = requests.get(webui_url + "/api/raylet_info")
+            response.raise_for_status()
+            try:
+                raylet_info = response.json()
+            except Exception as ex:
+                print("failed response: {}".format(response.text))
+                raise ex
+            actor_groups = raylet_info["result"]["actorGroups"]
+            try:
+                assert len(actor_groups.keys()) == 3
+                c_actor_info = actor_groups["ActorC"]["entries"][0]
+                assert c_actor_info["numObjectRefsInScope"] == 13
+                assert c_actor_info["numLocalObjects"] == 10
+                break
+            except AssertionError:
+                if time.time() > start_time + 30:
+                    raise Exception("Timed out while waiting for actor info \
+                        or object store info update.")
+        except requests.exceptions.ConnectionError:
+            if time.time() > start_time + 30:
+                raise Exception(
+                    "Timed out while waiting for dashboard to start.")
+
+    def cpu_resources(actor_info):
+        cpu_resources = 0
+        for slot in actor_info["usedResources"]["CPU"]["resourceSlots"]:
+            cpu_resources += slot["allocation"]
+        return cpu_resources
+
+    assert cpu_resources(c_actor_info) == 2
+    assert c_actor_info["numExecutedTasks"] == 4
+
+    profiling_id = requests.get(
+        webui_url + "/api/launch_profiling",
+        params={
+            "node_id": ray.nodes()[0]["NodeID"],
+            "pid": actor_pid,
+            "duration": 5
+        }).json()["result"]
+    start_time = time.time()
+    while True:
+        # Sometimes some startup time is required
+        if time.time() - start_time > 30:
+            raise RayTestTimeoutException(
+                "Timed out while collecting profiling stats.")
+        profiling_info = requests.get(
+            webui_url + "/api/check_profiling_status",
+            params={
+                "profiling_id": profiling_id,
+            }).json()
+        status = profiling_info["result"]["status"]
+        assert status in ("finished", "pending", "error")
+        if status in ("finished", "error"):
+            break
+        time.sleep(1)
+
+
+def test_raylet_infeasible_tasks(shutdown_only):
+    """
+    This test creates an actor that requires 5 GPUs
+    but a ray cluster only has 3 GPUs. As a result,
+    the new actor should be an infeasible actor.
+    """
+    addresses = ray.init(num_gpus=3)
+
+    @ray.remote(num_gpus=5)
+    class ActorRequiringGPU:
+        def __init__(self):
+            pass
+
+    ActorRequiringGPU.remote()
+
+    def test_infeasible_actor(ray_addresses):
+        assert (wait_until_server_available(addresses["webui_url"]) is True)
+        webui_url = ray_addresses["webui_url"].replace("127.0.0.1",
+                                                       "http://127.0.0.1")
+        raylet_info = requests.get(webui_url + "/api/raylet_info").json()
+        actor_info = raylet_info["result"]["actorGroups"]
+        assert len(actor_info) == 1
+
+        _, infeasible_actor_info = actor_info.popitem()
+        assert infeasible_actor_info["entries"][0]["state"] == -2
+
+    assert (wait_until_succeeded_without_exception(
+        test_infeasible_actor,
+        (AssertionError, requests.exceptions.ConnectionError),
+        addresses,
+        timeout_ms=30000,
+        retry_interval_ms=1000) is True)
+
+
+def test_raylet_pending_tasks(shutdown_only):
+    # Make sure to specify num_cpus. Otherwise, the test can be broken
+    # when the number of cores is less than the number of spawned actors.
+    addresses = ray.init(num_gpus=3, num_cpus=4)
+
+    @ray.remote(num_gpus=1)
+    class ActorRequiringGPU:
+        def __init__(self):
+            pass
+
+    @ray.remote
+    class ParentActor:
+        def __init__(self):
+            self.a = [ActorRequiringGPU.remote() for i in range(4)]
+
+    # If we do not keep a handle to ParentActor, the reference counter
+    # will terminate ParentActor.
+    parent_actor = ParentActor.remote()
+    assert parent_actor is not None
+
+    def test_pending_actor(ray_addresses):
+        assert (wait_until_server_available(addresses["webui_url"]) is True)
+        webui_url = ray_addresses["webui_url"].replace("127.0.0.1",
+                                                       "http://127.0.0.1")
+        raylet_info = requests.get(webui_url + "/api/raylet_info").json()
+        actor_info = raylet_info["result"]["actors"]
+        assert len(actor_info) == 1
+        _, infeasible_actor_info = actor_info.popitem()
+
+    wait_until_succeeded_without_exception(
+        test_pending_actor,
+        (AssertionError, requests.exceptions.ConnectionError),
+        addresses,
+        timeout_ms=30000,
+        retry_interval_ms=1000)
+
+
+@pytest.mark.skipif(
+    os.environ.get("TRAVIS") is None,
+    reason="This test requires password-less sudo due to py-spy requirement.")
+def test_profiling_info_endpoint(shutdown_only):
+    ray.init(num_cpus=1)
+
+    redis_client = ray.worker.global_worker.redis_client
+
+    node_ip = ray.nodes()[0]["NodeManagerAddress"]
+
+    while True:
+        reporter_port = redis_client.get("REPORTER_PORT:{}".format(node_ip))
+        if reporter_port:
+            break
+
+    reporter_channel = grpc.insecure_channel("{}:{}".format(
+        node_ip, int(reporter_port)))
+    reporter_stub = reporter_pb2_grpc.ReporterServiceStub(reporter_channel)
+
+    @ray.remote(num_cpus=1)
+    class ActorA:
+        def __init__(self):
+            pass
+
+        def getpid(self):
+            return os.getpid()
+
+    a = ActorA.remote()
+    actor_pid = ray.get(a.getpid.remote())
+
+    reply = reporter_stub.GetProfilingStats(
+        reporter_pb2.GetProfilingStatsRequest(pid=actor_pid, duration=10))
+    profiling_stats = json.loads(reply.profiling_stats)
+    assert profiling_stats is not None
+
 
 def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
     NUM_NODES = 3
@@ -138,7 +390,438 @@ def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
         test_prometheus_endpoint,
         (requests.exceptions.ConnectionError, ))
+
+
+# This variable is used inside test_memory_dashboard.
+# It is defined as a global variable to be used across all nested test
+# functions. We use it because the memory table is updated every second,
+# and we need a way to verify that each test runs against a fresh memory
+# table.
+prev_memory_table = MemoryTable([]).__dict__()["group"]
+
+
+def test_memory_dashboard(shutdown_only):
+    """Test the memory table.
+
+    These tests verify the examples in this document:
+    https://docs.ray.io/en/master/memory-management.html#debugging-using-ray-memory
+    """
+    addresses = ray.init(num_cpus=2)
+    webui_url = addresses["webui_url"].replace("127.0.0.1", "http://127.0.0.1")
+    assert (wait_until_server_available(addresses["webui_url"]) is True)
+
+    def get_memory_table():
+        memory_table = requests.get(webui_url + "/api/memory_table").json()
+        return memory_table["result"]
+
+    def memory_table_ready():
+        """Wait until a fresh memory table is ready."""
+        global prev_memory_table
+        memory_table = get_memory_table()
+        is_ready = memory_table["group"] != prev_memory_table
+        prev_memory_table = memory_table["group"]
+        return is_ready
+
+    def stop_memory_table():
+        requests.get(webui_url + "/api/stop_memory_table").json()
+
+    def test_local_reference():
+        @ray.remote
+        def f(arg):
+            return arg
+
+        # a and b are local references.
+        a = ray.put(None)  # Noqa F841
+        b = f.remote(None)  # Noqa F841
+
+        wait_for_condition(memory_table_ready)
+        memory_table = get_memory_table()
+        summary = memory_table["summary"]
+        group = memory_table["group"]
+        assert summary["total_captured_in_objects"] == 0
+        assert summary["total_pinned_in_memory"] == 0
+        assert summary["total_used_by_pending_task"] == 0
+        assert summary["total_local_ref_count"] == 2
+        for table in group.values():
+            for entry in table["entries"]:
+                assert (
+                    entry["reference_type"] == ReferenceType.LOCAL_REFERENCE)
+        stop_memory_table()
+        return True
+
+    def test_object_pinned_in_memory():
+        a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
+        b = ray.get(a)  # Noqa F841
+        del a
+
+        wait_for_condition(memory_table_ready)
+        memory_table = get_memory_table()
+        summary = memory_table["summary"]
+        group = memory_table["group"]
+        assert summary["total_captured_in_objects"] == 0
+        assert summary["total_pinned_in_memory"] == 1
+        assert summary["total_used_by_pending_task"] == 0
+        assert summary["total_local_ref_count"] == 0
+        for table in group.values():
+            for entry in table["entries"]:
+                assert (
+                    entry["reference_type"] == ReferenceType.PINNED_IN_MEMORY)
+        stop_memory_table()
+        return True
+
+    def test_pending_task_references():
+        @ray.remote
+        def f(arg):
+            time.sleep(1)
+
+        a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
+        b = f.remote(a)
+
+        wait_for_condition(memory_table_ready)
+        memory_table = get_memory_table()
+        summary = memory_table["summary"]
+        assert summary["total_captured_in_objects"] == 0
+        assert summary["total_pinned_in_memory"] == 1
+        assert summary["total_used_by_pending_task"] == 1
+        assert summary["total_local_ref_count"] == 1
+        # Make sure the function f is done before going to the next test.
+        # Otherwise, the memory table will be corrupted because the
+        # task f won't be done when the next test is running.
+        ray.get(b)
+        stop_memory_table()
+        return True
+
+    def test_serialized_object_ref_reference():
+        @ray.remote
+        def f(arg):
+            time.sleep(1)
+
+        a = ray.put(None)
+        b = f.remote([a])  # Noqa F841
+
+        wait_for_condition(memory_table_ready)
+        memory_table = get_memory_table()
+        summary = memory_table["summary"]
+        assert summary["total_captured_in_objects"] == 0
+        assert summary["total_pinned_in_memory"] == 0
+        assert summary["total_used_by_pending_task"] == 1
+        assert summary["total_local_ref_count"] == 2
+        # Make sure the function f is done before going to the next test.
+        # Otherwise, the memory table will be corrupted because the
+        # task f won't be done when the next test is running.
+        ray.get(b)
+        stop_memory_table()
+        return True
+
+    def test_captured_object_ref_reference():
+        a = ray.put(None)
+        b = ray.put([a])  # Noqa F841
+        del a
+
+        wait_for_condition(memory_table_ready)
+        memory_table = get_memory_table()
+        summary = memory_table["summary"]
+        assert summary["total_captured_in_objects"] == 1
+        assert summary["total_pinned_in_memory"] == 0
+        assert summary["total_used_by_pending_task"] == 0
+        assert summary["total_local_ref_count"] == 1
+        stop_memory_table()
+        return True
+
+    def test_actor_handle_reference():
+        @ray.remote
+        class Actor:
+            pass
+
+        a = Actor.remote()  # Noqa F841
+        b = Actor.remote()  # Noqa F841
+        c = Actor.remote()  # Noqa F841
+
+        wait_for_condition(memory_table_ready)
+        memory_table = get_memory_table()
+        summary = memory_table["summary"]
+        group = memory_table["group"]
+        assert summary["total_captured_in_objects"] == 0
+        assert summary["total_pinned_in_memory"] == 0
+        assert summary["total_used_by_pending_task"] == 0
+        assert summary["total_local_ref_count"] == 0
+        assert summary["total_actor_handles"] == 3
+        for table in group.values():
+            for entry in table["entries"]:
+                assert (entry["reference_type"] == ReferenceType.ACTOR_HANDLE)
+        stop_memory_table()
+        return True
+
+    # These tests should be retried because it takes at least one second
+    # to get a fresh memory table: the table is updated whenever raylet and
+    # node info is renewed, which happens once per second (see the polling
+    # sketch after this function).
+    wait_for_condition(
+        test_local_reference, timeout=30000, retry_interval_ms=1000)
+
+    wait_for_condition(
+        test_object_pinned_in_memory, timeout=30000, retry_interval_ms=1000)
+
+    wait_for_condition(
+        test_pending_task_references, timeout=30000, retry_interval_ms=1000)
+
+    wait_for_condition(
+        test_serialized_object_ref_reference,
+        timeout=30000,
+        retry_interval_ms=1000)
+
+    wait_for_condition(
+        test_captured_object_ref_reference,
+        timeout=30000,
+        retry_interval_ms=1000)
+
+    wait_for_condition(
+        test_actor_handle_reference, timeout=30000, retry_interval_ms=1000)
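Note: the calls above lean on `wait_for_condition` to absorb the memory table's one-second refresh cadence. A minimal sketch of what such a poll-until-true helper looks like follows, assuming the timeout is in milliseconds as the call sites suggest; the real helper in `ray.test_utils` may differ in signature, default values, and the exception it raises.

```python
import time


def wait_for_condition_sketch(condition_predictor, timeout=30000,
                              retry_interval_ms=1000):
    # Poll the predicate until it returns True or the timeout (ms) expires.
    deadline = time.time() + timeout / 1000.0
    while time.time() < deadline:
        if condition_predictor():
            return True
        time.sleep(retry_interval_ms / 1000.0)
    raise RuntimeError("The condition was not met before the timeout.")
```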
+
+
+"""Memory Table Unit Test"""
+
+NODE_ADDRESS = "127.0.0.1"
+IS_DRIVER = True
+PID = 1
+OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA="
+ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000"
+DECODED_ID = decode_object_ref_if_needed(OBJECT_ID)
+OBJECT_SIZE = 100
+
+
+def build_memory_entry(*,
+                       local_ref_count,
+                       pinned_in_memory,
+                       submitted_task_reference_count,
+                       contained_in_owned,
+                       object_size,
+                       pid,
+                       object_id=OBJECT_ID,
+                       node_address=NODE_ADDRESS):
+    object_ref = {
+        "objectId": object_id,
+        "callSite": "(task call) /Users:458",
+        "objectSize": object_size,
+        "localRefCount": local_ref_count,
+        "pinnedInMemory": pinned_in_memory,
+        "submittedTaskRefCount": submitted_task_reference_count,
+        "containedInOwned": contained_in_owned
+    }
+    return MemoryTableEntry(
+        object_ref=object_ref,
+        node_address=node_address,
+        is_driver=IS_DRIVER,
+        pid=pid)
+
+
+def build_local_reference_entry(object_size=OBJECT_SIZE,
+                                pid=PID,
+                                node_address=NODE_ADDRESS):
+    return build_memory_entry(
+        local_ref_count=1,
+        pinned_in_memory=False,
+        submitted_task_reference_count=0,
+        contained_in_owned=[],
+        object_size=object_size,
+        pid=pid,
+        node_address=node_address)
+
+
+def build_used_by_pending_task_entry(object_size=OBJECT_SIZE,
+                                     pid=PID,
+                                     node_address=NODE_ADDRESS):
+    return build_memory_entry(
+        local_ref_count=0,
+        pinned_in_memory=False,
+        submitted_task_reference_count=2,
+        contained_in_owned=[],
+        object_size=object_size,
+        pid=pid,
+        node_address=node_address)
+
+
+def build_captured_in_object_entry(object_size=OBJECT_SIZE,
+                                   pid=PID,
+                                   node_address=NODE_ADDRESS):
+    return build_memory_entry(
+        local_ref_count=0,
+        pinned_in_memory=False,
+        submitted_task_reference_count=0,
+        contained_in_owned=[OBJECT_ID],
+        object_size=object_size,
+        pid=pid,
+        node_address=node_address)
+
+
+def build_actor_handle_entry(object_size=OBJECT_SIZE,
+                             pid=PID,
+                             node_address=NODE_ADDRESS):
+    return build_memory_entry(
+        local_ref_count=1,
+        pinned_in_memory=False,
+        submitted_task_reference_count=0,
+        contained_in_owned=[],
+        object_size=object_size,
+        pid=pid,
+        node_address=node_address,
+        object_id=ACTOR_ID)
+
+
+def build_pinned_in_memory_entry(object_size=OBJECT_SIZE,
+                                 pid=PID,
+                                 node_address=NODE_ADDRESS):
+    return build_memory_entry(
+        local_ref_count=0,
+        pinned_in_memory=True,
+        submitted_task_reference_count=0,
+        contained_in_owned=[],
+        object_size=object_size,
+        pid=pid,
+        node_address=node_address)
+
+
+def build_entry(object_size=OBJECT_SIZE,
+                pid=PID,
+                node_address=NODE_ADDRESS,
+                reference_type=ReferenceType.PINNED_IN_MEMORY):
+    if reference_type == ReferenceType.USED_BY_PENDING_TASK:
+        return build_used_by_pending_task_entry(
+            pid=pid, object_size=object_size, node_address=node_address)
+    elif reference_type == ReferenceType.LOCAL_REFERENCE:
+        return build_local_reference_entry(
+            pid=pid, object_size=object_size, node_address=node_address)
+    elif reference_type == ReferenceType.PINNED_IN_MEMORY:
+        return build_pinned_in_memory_entry(
+            pid=pid, object_size=object_size, node_address=node_address)
+    elif reference_type == ReferenceType.ACTOR_HANDLE:
+        return build_actor_handle_entry(
+            pid=pid, object_size=object_size, node_address=node_address)
+    elif reference_type == ReferenceType.CAPTURED_IN_OBJECT:
+        return build_captured_in_object_entry(
+            pid=pid, object_size=object_size, node_address=node_address)
+
+
+def test_invalid_memory_entry():
+    memory_entry = build_memory_entry(
+        local_ref_count=0,
+        pinned_in_memory=False,
+        submitted_task_reference_count=0,
+        contained_in_owned=[],
+        object_size=OBJECT_SIZE,
+        pid=PID)
+    assert memory_entry.is_valid() is False
+    memory_entry = build_memory_entry(
+        local_ref_count=0,
+        pinned_in_memory=False,
+        submitted_task_reference_count=0,
+        contained_in_owned=[],
+        object_size=-1,
+        pid=PID)
+    assert memory_entry.is_valid() is False
+
+
+def test_valid_reference_memory_entry():
+    memory_entry = build_local_reference_entry()
+    assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE
+    assert memory_entry.object_ref == ray.ObjectRef(
+        decode_object_ref_if_needed(OBJECT_ID))
+    assert memory_entry.is_valid() is True
+
+
+def test_reference_type():
+    # pinned in memory
+    memory_entry = build_pinned_in_memory_entry()
+    assert memory_entry.reference_type == ReferenceType.PINNED_IN_MEMORY
+
+    # used by pending task
+    memory_entry = build_used_by_pending_task_entry()
+    assert memory_entry.reference_type == ReferenceType.USED_BY_PENDING_TASK
+
+    # captured in object
+    memory_entry = build_captured_in_object_entry()
+    assert memory_entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT
+
+    # actor handle
+    memory_entry = build_actor_handle_entry()
+    assert memory_entry.reference_type == ReferenceType.ACTOR_HANDLE
+
+
+def test_memory_table_summary():
+    entries = [
+        build_pinned_in_memory_entry(),
+        build_used_by_pending_task_entry(),
+        build_captured_in_object_entry(),
+        build_actor_handle_entry(),
+        build_local_reference_entry(),
+        build_local_reference_entry()
+    ]
+    memory_table = MemoryTable(entries)
+    assert len(memory_table.group) == 1
+    assert memory_table.summary["total_actor_handles"] == 1
+    assert memory_table.summary["total_captured_in_objects"] == 1
+    assert memory_table.summary["total_local_ref_count"] == 2
+    assert memory_table.summary[
+        "total_object_size"] == len(entries) * OBJECT_SIZE
+    assert memory_table.summary["total_pinned_in_memory"] == 1
+    assert memory_table.summary["total_used_by_pending_task"] == 1
+
+
+def test_memory_table_sort_by_pid():
+    unsort = [1, 3, 2]
+    entries = [build_entry(pid=pid) for pid in unsort]
+    memory_table = MemoryTable(entries, sort_by_type=SortingType.PID)
+    sort = sorted(unsort)
+    for pid, entry in zip(sort, memory_table.table):
+        assert pid == entry.pid
+
+
+def test_memory_table_sort_by_reference_type():
+    unsort = [
+        ReferenceType.USED_BY_PENDING_TASK, ReferenceType.LOCAL_REFERENCE,
+        ReferenceType.LOCAL_REFERENCE, ReferenceType.PINNED_IN_MEMORY
+    ]
+    entries = [
+        build_entry(reference_type=reference_type) for reference_type in unsort
+    ]
+    memory_table = MemoryTable(
+        entries, sort_by_type=SortingType.REFERENCE_TYPE)
+    sort = sorted(unsort)
+    for reference_type, entry in zip(sort, memory_table.table):
+        assert reference_type == entry.reference_type
+
+
+def test_memory_table_sort_by_object_size():
+    unsort = [312, 214, -1, 1244, 642]
+    entries = [build_entry(object_size=object_size) for object_size in unsort]
+    memory_table = MemoryTable(entries, sort_by_type=SortingType.OBJECT_SIZE)
+    sort = sorted(unsort)
+    for object_size, entry in zip(sort, memory_table.table):
+        assert object_size == entry.object_size
+
+
+def test_group_by():
+    node_second = "127.0.0.2"
+    node_first = "127.0.0.1"
+    entries = [
+        build_entry(node_address=node_second, pid=2),
+        build_entry(node_address=node_second, pid=1),
+        build_entry(node_address=node_first, pid=2),
+        build_entry(node_address=node_first, pid=1)
+    ]
+    memory_table = MemoryTable(entries)
+
+    # Make sure entries are correctly grouped by node address.
+    assert node_first in memory_table.group
+    assert node_second in memory_table.group
+
+    # Make sure pids are sorted in ascending order within each group.
+    for group_key, group_memory_table in memory_table.group.items():
+        pid = 1
+        for entry in group_memory_table.table:
+            assert pid == entry.pid
+            pid += 1
+
+
 if __name__ == "__main__":
-    import sys
     import pytest
+    import sys
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/tests/test_webui.py b/python/ray/tests/test_webui.py
new file mode 100644
index 000000000..011993af3
--- /dev/null
+++ b/python/ray/tests/test_webui.py
@@ -0,0 +1,50 @@
+import re
+import sys
+import time
+
+import pytest
+import requests
+
+import ray
+
+
+@pytest.mark.skipif(
+    sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher")
+def test_get_webui(shutdown_only):
+    addresses = ray.init(include_dashboard=True, num_cpus=1)
+    webui_url = addresses["webui_url"]
+    assert ray.get_dashboard_url() == webui_url
+
+    assert re.match(r"^(localhost|\d+\.\d+\.\d+\.\d+):\d+$", webui_url)
+
+    start_time = time.time()
+    while True:
+        try:
+            node_info = requests.get("http://" + webui_url +
+                                     "/api/node_info").json()
+            break
+        except requests.exceptions.ConnectionError:
+            if time.time() > start_time + 30:
+                error_log = None
+                out_log = None
+                with open(
+                        "{}/logs/dashboard.out".format(
+                            addresses["session_dir"]), "r") as f:
+                    out_log = f.read()
+                with open(
+                        "{}/logs/dashboard.err".format(
+                            addresses["session_dir"]), "r") as f:
+                    error_log = f.read()
+                raise Exception(
+                    "Timed out while waiting for dashboard to start. "
+                    "Dashboard output log: {}\n"
+                    "Dashboard error log: {}\n".format(out_log, error_log))
+    assert node_info["error"] is None
+    assert node_info["result"] is not None
+    assert isinstance(node_info["timestamp"], float)
+
+
+if __name__ == "__main__":
+    import pytest
+    import sys
+    sys.exit(pytest.main(["-v", __file__]))
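Note: test_webui.py above establishes the polling pattern the dashboard tests in this patch rely on: request an HTTP endpoint repeatedly until the server answers or a deadline passes. A generic version of that loop follows as a hedged sketch; the helper name, timeout values, and dashboard address are illustrative and not part of the patch.

```python
import time

import requests


def poll_json_endpoint(url, timeout_s=30, interval_s=1):
    # Keep requesting `url` until the server answers, as test_get_webui
    # does for /api/node_info, then return the decoded JSON payload.
    start = time.time()
    while True:
        try:
            return requests.get(url).json()
        except requests.exceptions.ConnectionError:
            if time.time() - start > timeout_s:
                raise Exception(
                    "Timed out while waiting for {}".format(url))
            time.sleep(interval_s)


# Example usage against a locally running dashboard (address assumed):
# node_info = poll_json_endpoint("http://127.0.0.1:8265/api/node_info")
# assert node_info["error"] is None
```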