Collect more usage stats data (#23167)

This commit is contained in:
Jiajun Yao 2022-03-17 19:33:27 -07:00 committed by GitHub
parent b4bc8809dc
commit 62a5404369
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 388 additions and 44 deletions

View file

@ -1,6 +1,8 @@
import os
import asyncio
import logging
import random
from concurrent.futures import ThreadPoolExecutor
import ray
@ -19,6 +21,9 @@ class UsageStatsHead(dashboard_utils.DashboardHeadModule):
ray.experimental.internal_kv.internal_kv_get_gcs_client(),
num_retries=20,
)
self.cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
os.path.expanduser("~/ray_bootstrap_config.yaml")
)
self.session_dir = dashboard_head.session_dir
self.client = ray_usage_lib.UsageReportClient()
# The total number of report succeeded.
@ -28,19 +33,20 @@ class UsageStatsHead(dashboard_utils.DashboardHeadModule):
# The seq number of report. It increments whenever a new report is sent.
self.seq_no = 0
async def _report_usage(self):
if not ray_usage_lib._usage_stats_enabled():
return
def _report_usage_sync(self):
"""
- Always write usage_stats.json regardless of report success/failure.
- If report fails, the error message should be written to usage_stats.json
- If file write fails, the error will just stay at dashboard.log.
usage_stats.json won't be written.
"""
if not ray_usage_lib._usage_stats_enabled():
return
try:
data = ray_usage_lib.generate_report_data(
self.cluster_metadata,
self.cluster_config_to_report,
self.total_success,
self.total_failed,
self.seq_no,
@ -48,7 +54,7 @@ class UsageStatsHead(dashboard_utils.DashboardHeadModule):
error = None
try:
await self.client.report_usage_data_async(
self.client.report_usage_data(
ray_usage_lib._usage_stats_report_url(), data
)
except Exception as e:
@ -61,15 +67,22 @@ class UsageStatsHead(dashboard_utils.DashboardHeadModule):
self.seq_no += 1
data = ray_usage_lib.generate_write_data(data, error)
await self.client.write_usage_data_async(data, self.session_dir)
self.client.write_usage_data(data, self.session_dir)
except Exception as e:
logger.exception(e)
logger.info(f"Usage report failed: {e}")
async def _report_usage_async(self):
if not ray_usage_lib._usage_stats_enabled():
return
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=1) as executor:
await loop.run_in_executor(executor, self._report_usage_sync)
@async_loop_forever(ray_usage_lib._usage_stats_report_interval_s())
async def periodically_report_usage(self):
await self._report_usage()
await self._report_usage_async()
async def run(self, server):
if not ray_usage_lib._usage_stats_enabled():
@ -77,7 +90,7 @@ class UsageStatsHead(dashboard_utils.DashboardHeadModule):
return
else:
logger.info("Usage reporting is enabled.")
await self._report_usage()
await self._report_usage_async()
# Add a random offset before the first report to remove sample bias.
await asyncio.sleep(
random.randint(0, ray_usage_lib._usage_stats_report_interval_s())

View file

@ -33,7 +33,7 @@ Or `RAY_USAGE_STATS_ENABLED=1 python [drivers with ray.init()]`.
"Ray API server (currently a dashboard server)" reports the usage data to https://usage-stats.ray.io/.
Data is reported ever hour by default.
Data is reported every hour by default.
Note that it is also possible to configure the interval using the environment variable,
`RAY_USAGE_STATS_REPORT_INTERVAL_S`.
@ -41,16 +41,16 @@ Note that it is also possible to configure the interval using the environment va
To see collected/reported data, see `usage_stats.json` inside a temp
folder (e.g., /tmp/ray/session_[id]/*).
"""
import asyncio
import os
import uuid
import sys
import json
import logging
import time
import yaml
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
from typing import Optional, List
from pathlib import Path
import ray
@ -66,6 +66,23 @@ logger = logging.getLogger(__name__)
#################
@dataclass(init=True)
class ClusterConfigToReport:
cloud_provider: Optional[str] = None
min_workers: Optional[int] = None
max_workers: Optional[int] = None
head_node_instance_type: Optional[str] = None
worker_node_instance_types: Optional[List[str]] = None
@dataclass(init=True)
class ClusterStatusToReport:
total_num_cpus: Optional[int] = None
total_num_gpus: Optional[int] = None
total_memory_gb: Optional[float] = None
total_object_store_memory_gb: Optional[float] = None
@dataclass(init=True)
class UsageStatsToReport:
"""Usage stats to report"""
@ -79,6 +96,15 @@ class UsageStatsToReport:
os: str
collect_timestamp_ms: int
session_start_timestamp_ms: int
cloud_provider: Optional[str]
min_workers: Optional[int]
max_workers: Optional[int]
head_node_instance_type: Optional[str]
worker_node_instance_types: Optional[List[str]]
total_num_cpus: Optional[int]
total_num_gpus: Optional[int]
total_memory_gb: Optional[float]
total_object_store_memory_gb: Optional[float]
# The total number of successful reports for the lifetime of the cluster.
total_success: int
# The total number of failed reports for the lifetime of the cluster.
@ -182,6 +208,131 @@ def put_cluster_metadata(gcs_client, num_retries) -> None:
return metadata
def get_cluster_status_to_report(gcs_client, num_retries) -> ClusterStatusToReport:
"""Get the current status of this cluster.
It is a blocking API.
Params:
gcs_client (GCSClient): The GCS client to perform KV operation GET.
num_retries (int): Max number of times to retry if GET fails.
Returns:
The current cluster status or empty if it fails to get that information.
"""
try:
cluster_status = ray._private.utils.internal_kv_get_with_retry(
gcs_client,
ray.ray_constants.DEBUG_AUTOSCALING_STATUS,
namespace=None,
num_retries=num_retries,
)
if not cluster_status:
return ClusterStatusToReport()
result = ClusterStatusToReport()
to_GiB = 1 / 2 ** 30
cluster_status = json.loads(cluster_status.decode("utf-8"))
if (
"load_metrics_report" not in cluster_status
or "usage" not in cluster_status["load_metrics_report"]
):
return ClusterStatusToReport()
usage = cluster_status["load_metrics_report"]["usage"]
# usage is a map from resource to (used, total) pair
if "CPU" in usage:
result.total_num_cpus = int(usage["CPU"][1])
if "GPU" in usage:
result.total_num_gpus = int(usage["GPU"][1])
if "memory" in usage:
result.total_memory_gb = usage["memory"][1] * to_GiB
if "object_store_memory" in usage:
result.total_object_store_memory_gb = (
usage["object_store_memory"][1] * to_GiB
)
return result
except Exception as e:
logger.info(f"Failed to get cluster status to report {e}")
return ClusterStatusToReport()
def get_cluster_config_to_report(cluster_config_file_path) -> ClusterConfigToReport:
"""Get the static cluster (autoscaler) config used to launch this cluster.
Params:
cluster_config_file_path (str): The file path to the cluster config file.
Returns:
The cluster (autoscaler) config or empty if it fails to get that information.
"""
def get_instance_type(node_config):
if not node_config:
return None
if "InstanceType" in node_config:
# aws
return node_config["InstanceType"]
if "machineType" in node_config:
# gcp
return node_config["machineType"]
if (
"azure_arm_parameters" in node_config
and "vmSize" in node_config["azure_arm_parameters"]
):
return node_config["azure_arm_parameters"]["vmSize"]
return None
try:
with open(cluster_config_file_path) as f:
config = yaml.safe_load(f)
result = ClusterConfigToReport()
if "min_workers" in config:
result.min_workers = config["min_workers"]
if "max_workers" in config:
result.max_workers = config["max_workers"]
if "provider" in config and "type" in config["provider"]:
result.cloud_provider = config["provider"]["type"]
if "head_node_type" not in config:
return result
if "available_node_types" not in config:
return result
head_node_type = config["head_node_type"]
available_node_types = config["available_node_types"]
for available_node_type in available_node_types:
if available_node_type == head_node_type:
head_node_instance_type = get_instance_type(
available_node_types[available_node_type].get("node_config")
)
if head_node_instance_type:
result.head_node_instance_type = head_node_instance_type
else:
worker_node_instance_type = get_instance_type(
available_node_types[available_node_type].get("node_config")
)
if worker_node_instance_type:
result.worker_node_instance_types = (
result.worker_node_instance_types or set()
)
result.worker_node_instance_types.add(worker_node_instance_type)
if result.worker_node_instance_types:
result.worker_node_instance_types = list(
result.worker_node_instance_types
)
return result
except FileNotFoundError:
# It's a manually started cluster or k8s cluster
result = ClusterConfigToReport()
if "KUBERNETES_SERVICE_HOST" in os.environ:
result.cloud_provider = "kubernetes"
return result
except Exception as e:
logger.info(f"Failed to get cluster config to report {e}")
return ClusterConfigToReport()
def get_cluster_metadata(gcs_client, num_retries) -> dict:
"""Get the cluster metadata from GCS.
@ -210,13 +361,19 @@ def get_cluster_metadata(gcs_client, num_retries) -> dict:
def generate_report_data(
cluster_metadata: dict, total_success: int, total_failed: int, seq_number: int
cluster_metadata: dict,
cluster_config_to_report: ClusterConfigToReport,
total_success: int,
total_failed: int,
seq_number: int,
) -> UsageStatsToReport:
"""Generate the report data.
Params:
cluster_metadata (dict): The cluster metadata of the system generated by
`_generate_cluster_metadata`.
cluster_config_to_report (ClusterConfigToReport): The cluster (autoscaler)
config generated by `get_cluster_config_to_report`.
total_success(int): The total number of successful report
for the lifetime of the cluster.
total_failed(int): The total number of failed report
@ -227,6 +384,10 @@ def generate_report_data(
Returns:
UsageStats
"""
cluster_status_to_report = get_cluster_status_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client(),
num_retries=20,
)
data = UsageStatsToReport(
ray_version=cluster_metadata["ray_version"],
python_version=cluster_metadata["python_version"],
@ -237,6 +398,15 @@ def generate_report_data(
os=cluster_metadata["os"],
collect_timestamp_ms=int(time.time() * 1000),
session_start_timestamp_ms=cluster_metadata["session_start_timestamp_ms"],
cloud_provider=cluster_config_to_report.cloud_provider,
min_workers=cluster_config_to_report.min_workers,
max_workers=cluster_config_to_report.max_workers,
head_node_instance_type=cluster_config_to_report.head_node_instance_type,
worker_node_instance_types=cluster_config_to_report.worker_node_instance_types,
total_num_cpus=cluster_status_to_report.total_num_cpus,
total_num_gpus=cluster_status_to_report.total_num_gpus,
total_memory_gb=cluster_status_to_report.total_memory_gb,
total_object_store_memory_gb=cluster_status_to_report.total_object_store_memory_gb, # noqa: E501
total_success=total_success,
total_failed=total_failed,
seq_number=seq_number,
@ -272,7 +442,7 @@ class UsageReportClient:
and report usage stats.
"""
def _write_usage_data(self, data: UsageStatsToWrite, dir_path: str) -> None:
def write_usage_data(self, data: UsageStatsToWrite, dir_path: str) -> None:
"""Write the usage data to the directory.
Params:
@ -294,7 +464,7 @@ class UsageReportClient:
destination.unlink(missing_ok=True)
temp.rename(destination)
def _report_usage_data(self, url: str, data: UsageStatsToReport) -> None:
def report_usage_data(self, url: str, data: UsageStatsToReport) -> None:
"""Report the usage data to the usage server.
Params:
@ -316,26 +486,3 @@ class UsageReportClient:
)
r.raise_for_status()
return r
async def write_usage_data_async(
self, data: UsageStatsToWrite, dir_path: str
) -> None:
"""Asynchronously write the data to the `dir_path`.
It uses a thread pool to implement asynchronous write.
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=1) as executor:
await loop.run_in_executor(executor, self._write_usage_data, data, dir_path)
async def report_usage_data_async(self, url: str, data: UsageStatsToReport) -> None:
"""Asynchronously report the data to the `url`.
It uses a thread pool to implement asynchronous write
instead of using dedicated library such as httpx
since that's too heavy dependency.
"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=1) as executor:
await loop.run_in_executor(executor, self._report_usage_data, url, data)

View file

@ -13,6 +13,7 @@ from jsonschema import validate
import ray._private.usage.usage_lib as ray_usage_lib
import ray._private.usage.usage_constants as usage_constants
from ray._private.usage.usage_lib import ClusterConfigToReport
from ray._private.test_utils import wait_for_condition
@ -29,6 +30,18 @@ schema = {
"python_version": {"type": "string"},
"collect_timestamp_ms": {"type": "integer"},
"session_start_timestamp_ms": {"type": "integer"},
"cloud_provider": {"type": ["null", "string"]},
"min_workers": {"type": ["null", "integer"]},
"max_workers": {"type": ["null", "integer"]},
"head_node_instance_type": {"type": ["null", "string"]},
"worker_node_instance_types": {
"type": ["null", "array"],
"items": {"type": "string"},
},
"total_num_cpus": {"type": ["null", "integer"]},
"total_num_gpus": {"type": ["null", "integer"]},
"total_memory_gb": {"type": ["null", "number"]},
"total_object_store_memory_gb": {"type": ["null", "number"]},
"total_success": {"type": "integer"},
"total_failed": {"type": "integer"},
"seq_number": {"type": "integer"},
@ -227,6 +240,133 @@ def test_usage_lib_cluster_metadata_generation_usage_disabled(shutdown_only):
assert len(meta) == 2
def test_usage_lib_get_cluster_status_to_report(shutdown_only):
ray.init(num_cpus=3, num_gpus=1, object_store_memory=2 ** 30)
# Wait for monitor.py to update cluster status
wait_for_condition(
lambda: ray_usage_lib.get_cluster_status_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client(),
num_retries=20,
).total_num_cpus
== 3,
timeout=10,
)
cluster_status_to_report = ray_usage_lib.get_cluster_status_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client(),
num_retries=20,
)
assert cluster_status_to_report.total_num_cpus == 3
assert cluster_status_to_report.total_num_gpus == 1
assert cluster_status_to_report.total_memory_gb > 0
assert cluster_status_to_report.total_object_store_memory_gb == 1.0
def test_usage_lib_get_cluster_config_to_report(monkeypatch, tmp_path):
cluster_config_file_path = tmp_path / "ray_bootstrap_config.yaml"
""" Test minimal cluster config"""
cluster_config_file_path.write_text(
"""
cluster_name: minimal
max_workers: 1
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
"""
)
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
cluster_config_file_path
)
assert cluster_config_to_report.cloud_provider == "aws"
assert cluster_config_to_report.min_workers is None
assert cluster_config_to_report.max_workers == 1
assert cluster_config_to_report.head_node_instance_type is None
assert cluster_config_to_report.worker_node_instance_types is None
cluster_config_file_path.write_text(
"""
cluster_name: full
min_workers: 1
provider:
type: gcp
head_node_type: head_node
available_node_types:
head_node:
node_config:
InstanceType: m5.large
min_workers: 0
max_workers: 0
aws_worker_node:
node_config:
InstanceType: m3.large
min_workers: 0
max_workers: 0
azure_worker_node:
node_config:
azure_arm_parameters:
vmSize: Standard_D2s_v3
gcp_worker_node:
node_config:
machineType: n1-standard-2
"""
)
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
cluster_config_file_path
)
assert cluster_config_to_report.cloud_provider == "gcp"
assert cluster_config_to_report.min_workers == 1
assert cluster_config_to_report.max_workers is None
assert cluster_config_to_report.head_node_instance_type == "m5.large"
assert cluster_config_to_report.worker_node_instance_types == list(
{"m3.large", "Standard_D2s_v3", "n1-standard-2"}
)
cluster_config_file_path.write_text(
"""
cluster_name: full
head_node_type: head_node
available_node_types:
worker_node_1:
node_config:
ImageId: xyz
worker_node_2:
resources: {}
worker_node_3:
node_config:
InstanceType: m5.large
"""
)
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
cluster_config_file_path
)
assert cluster_config_to_report.cloud_provider is None
assert cluster_config_to_report.min_workers is None
assert cluster_config_to_report.max_workers is None
assert cluster_config_to_report.head_node_instance_type is None
assert cluster_config_to_report.worker_node_instance_types == ["m5.large"]
cluster_config_file_path.write_text("[invalid")
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
cluster_config_file_path
)
assert cluster_config_to_report == ClusterConfigToReport()
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
tmp_path / "does_not_exist.yaml"
)
assert cluster_config_to_report == ClusterConfigToReport()
monkeypatch.setenv("KUBERNETES_SERVICE_HOST", "localhost")
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
tmp_path / "does_not_exist.yaml"
)
assert cluster_config_to_report.cloud_provider == "kubernetes"
assert cluster_config_to_report.min_workers is None
assert cluster_config_to_report.max_workers is None
assert cluster_config_to_report.head_node_instance_type is None
assert cluster_config_to_report.worker_node_instance_types is None
@pytest.mark.skipif(
sys.platform == "win32",
reason="Test depends on runtime env feature not supported on Windows.",
@ -242,7 +382,23 @@ def test_usage_lib_report_data(monkeypatch, shutdown_only, tmp_path):
cluster_metadata = ray_usage_lib.get_cluster_metadata(
ray.experimental.internal_kv.internal_kv_get_gcs_client(), num_retries=20
)
d = ray_usage_lib.generate_report_data(cluster_metadata, 2, 2, 2)
cluster_config_file_path = tmp_path / "ray_bootstrap_config.yaml"
cluster_config_file_path.write_text(
"""
cluster_name: minimal
max_workers: 1
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
"""
)
cluster_config_to_report = ray_usage_lib.get_cluster_config_to_report(
cluster_config_file_path
)
d = ray_usage_lib.generate_report_data(
cluster_metadata, cluster_config_to_report, 2, 2, 2
)
validate(instance=asdict(d), schema=schema)
"""
@ -250,7 +406,7 @@ def test_usage_lib_report_data(monkeypatch, shutdown_only, tmp_path):
"""
client = ray_usage_lib.UsageReportClient()
temp_dir = Path(tmp_path)
client._write_usage_data(d, temp_dir)
client.write_usage_data(d, temp_dir)
wait_for_condition(lambda: file_exists(temp_dir))
@ -286,7 +442,7 @@ def test_usage_lib_report_data(monkeypatch, shutdown_only, tmp_path):
ray.get(s.ready.remote())
# Query our endpoint over HTTP.
r = client._report_usage_data("http://127.0.0.1:8000/usage", d)
r = client.report_usage_data("http://127.0.0.1:8000/usage", d)
r.raise_for_status()
assert json.loads(r.text) is True
@ -295,15 +451,27 @@ def test_usage_lib_report_data(monkeypatch, shutdown_only, tmp_path):
sys.platform == "win32",
reason="Test depends on runtime env feature not supported on Windows.",
)
def test_usage_report_e2e(monkeypatch, shutdown_only):
def test_usage_report_e2e(monkeypatch, shutdown_only, tmp_path):
"""
Test usage report works e2e with env vars.
"""
cluster_config_file_path = tmp_path / "ray_bootstrap_config.yaml"
cluster_config_file_path.write_text(
"""
cluster_name: minimal
max_workers: 1
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
"""
)
with monkeypatch.context() as m:
m.setenv("HOME", str(tmp_path))
m.setenv("RAY_USAGE_STATS_ENABLED", "1")
m.setenv("RAY_USAGE_STATS_REPORT_URL", "http://127.0.0.1:8000/usage")
m.setenv("RAY_USAGE_STATS_REPORT_INTERVAL_S", "1")
ray.init(num_cpus=0)
ray.init(num_cpus=3)
@ray.remote(num_cpus=0)
class StatusReporter:
@ -361,7 +529,23 @@ def test_usage_report_e2e(monkeypatch, shutdown_only):
except Exception:
print_dashboard_log()
raise
validate(instance=ray.get(reporter.get_payload.remote()), schema=schema)
payload = ray.get(reporter.get_payload.remote())
ray_version, python_version = ray._private.utils.compute_version_info()
assert payload["ray_version"] == ray_version
assert payload["python_version"] == python_version
assert payload["schema_version"] == "0.1"
assert payload["os"] == sys.platform
assert payload["source"] == "OSS"
assert payload["cloud_provider"] == "aws"
assert payload["min_workers"] is None
assert payload["max_workers"] == 1
assert payload["head_node_instance_type"] is None
assert payload["worker_node_instance_types"] is None
assert payload["total_num_cpus"] == 3
assert payload["total_num_gpus"] is None
assert payload["total_memory_gb"] > 0
assert payload["total_object_store_memory_gb"] > 0
validate(instance=payload, schema=schema)
"""
Verify the usage_stats.json is updated.
"""