import hashlib
import json
from typing import Any, Dict, List, Optional

import aiohttp.web

import ray
import ray.dashboard.utils as dashboard_utils
from ray.core.generated import gcs_pb2
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_service_pb2_grpc
from ray.dashboard.modules.job.common import (JobStatusStorageClient,
                                              JOB_ID_METADATA_KEY)
from ray.experimental.internal_kv import (_initialize_internal_kv,
                                          _internal_kv_initialized,
                                          _internal_kv_get, _internal_kv_list)

routes = dashboard_utils.ClassMethodRouteTable


class APIHead(dashboard_utils.DashboardHeadModule):
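    """Dashboard head module serving the REST API endpoints.

    Exposes "/api/actors/kill" and "/api/snapshot", aggregating job, actor,
    Serve deployment, and session information from the GCS and the internal
    KV store.
    """
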
    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        self._gcs_job_info_stub = None
        self._gcs_actor_info_stub = None
        self._dashboard_head = dashboard_head
        # Initialize the internal KV client so that Serve state can be read
        # from the KV store in get_serve_info() below.
        _initialize_internal_kv(dashboard_head.gcs_client)
        assert _internal_kv_initialized()
        self._job_status_client = JobStatusStorageClient()

    @routes.get("/api/actors/kill")
    async def kill_actor_gcs(self, req) -> aiohttp.web.Response:
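        """Kill an actor via the GCS.

        Expected usage (the actor ID here is hypothetical):
        GET /api/actors/kill?actor_id=f4ce02...&force_kill=true

        Query parameters:
        - actor_id (required): hex ID of the actor to kill.
        - force_kill: "true" to kill immediately rather than letting running
          tasks complete.
        - no_restart: "true" to prevent the actor from being restarted.
        """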
        actor_id = req.query.get("actor_id")
        force_kill = req.query.get("force_kill", False) in ("true", "True")
        no_restart = req.query.get("no_restart", False) in ("true", "True")
        if not actor_id:
            return dashboard_utils.rest_response(
                success=False, message="actor_id is required.")

        request = gcs_service_pb2.KillActorViaGcsRequest()
        request.actor_id = bytes.fromhex(actor_id)
        request.force_kill = force_kill
        request.no_restart = no_restart
        await self._gcs_actor_info_stub.KillActorViaGcs(request, timeout=5)

        message = (f"Force killed actor with id {actor_id}" if force_kill else
                   f"Requested actor with id {actor_id} to terminate. " +
                   "It will exit once running tasks complete")

        return dashboard_utils.rest_response(success=True, message=message)

    @routes.get("/api/snapshot")
    async def snapshot(self, req):
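        """Return a snapshot of cluster state.

        The response bundles jobs, actors, Serve deployments, the session
        name, and the Ray version/commit, as assembled below.
        """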
        job_data = await self.get_job_info()
        actor_data = await self.get_actor_info()
        serve_data = await self.get_serve_info()
        session_name = await self.get_session_name()
        snapshot = {
            "jobs": job_data,
            "actors": actor_data,
            "deployments": serve_data,
            "session_name": session_name,
            "ray_version": ray.__version__,
            "ray_commit": ray.__commit__
        }
        return dashboard_utils.rest_response(
            success=True, message="hello", snapshot=snapshot)

    def _get_job_status(self, metadata: Dict[str, str]) -> Optional[str]:
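        """Return the job's status string, if one was recorded.

        Only jobs whose metadata carries JOB_ID_METADATA_KEY (i.e. jobs that
        went through the job submission flow) have a stored status.
        """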
        status = None
        job_submission_id = metadata.get(JOB_ID_METADATA_KEY)
        # If a job submission ID has been added to a job, the status is
        # guaranteed to be returned.
        if job_submission_id is not None:
            status = str(
                self._job_status_client.get_status(job_submission_id))

        return status

    async def get_job_info(self):
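        """Fetch all job info from the GCS, keyed by hex job ID."""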
        request = gcs_service_pb2.GetAllJobInfoRequest()
        reply = await self._gcs_job_info_stub.GetAllJobInfo(request, timeout=5)

        jobs = {}
        for job_table_entry in reply.job_info_list:
            job_id = job_table_entry.job_id.hex()
            metadata = dict(job_table_entry.config.metadata)
            config = {
                "namespace": job_table_entry.config.ray_namespace,
                "metadata": metadata,
                "runtime_env": json.loads(job_table_entry.config.runtime_env
                                          .serialized_runtime_env),
            }
            entry = {
                "status": self._get_job_status(metadata),
                "is_dead": job_table_entry.is_dead,
                "start_time": job_table_entry.start_time,
                "end_time": job_table_entry.end_time,
                "config": config,
            }
            jobs[job_id] = entry

        return jobs

    async def get_actor_info(self):
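        """Fetch all actor info from the GCS, keyed by hex actor ID.

        Actors that back Serve deployment replicas additionally get Serve
        metadata (replica tag, deployment name, version) attached.
        """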
        # TODO (Alex): GCS still needs to return actors from dead jobs.
        request = gcs_service_pb2.GetAllActorInfoRequest()
        request.show_dead_jobs = True
        reply = await self._gcs_actor_info_stub.GetAllActorInfo(
            request, timeout=5)
        actors = {}
        for actor_table_entry in reply.actor_table_data:
            actor_id = actor_table_entry.actor_id.hex()
            runtime_env = json.loads(actor_table_entry.serialized_runtime_env)
            entry = {
                "job_id": actor_table_entry.job_id.hex(),
                "state": gcs_pb2.ActorTableData.ActorState.Name(
                    actor_table_entry.state),
                "name": actor_table_entry.name,
                "namespace": actor_table_entry.ray_namespace,
                "runtime_env": runtime_env,
                "start_time": actor_table_entry.start_time,
                "end_time": actor_table_entry.end_time,
                "is_detached": actor_table_entry.is_detached,
                "resources": dict(
                    actor_table_entry.task_spec.required_resources),
                "actor_class": actor_table_entry.class_name,
                "current_worker_id": actor_table_entry.address.worker_id.hex(),
                "current_raylet_id": actor_table_entry.address.raylet_id.hex(),
                "ip_address": actor_table_entry.address.ip_address,
                "port": actor_table_entry.address.port,
                "metadata": dict()
            }
            actors[actor_id] = entry

        deployments = await self.get_serve_info()
        for _, deployment_info in deployments.items():
            for replica_actor_id, actor_info in deployment_info[
                    "actors"].items():
                if replica_actor_id in actors:
                    serve_metadata = dict()
                    serve_metadata["replica_tag"] = actor_info["replica_tag"]
                    serve_metadata["deployment_name"] = deployment_info[
                        "name"]
                    serve_metadata["version"] = actor_info["version"]
                    actors[replica_actor_id]["metadata"][
                        "serve"] = serve_metadata
        return actors

    async def get_serve_info(self) -> Dict[str, Any]:
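        """Collect the deployment snapshots written by Serve controllers.

        Reads each controller's snapshot from the internal KV store and
        merges them into a single dict keyed by the SHA-1 hash of the
        deployment name.
        """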
        # Conditionally import serve to prevent ModuleNotFoundError from serve
        # dependencies when only ray[default] is installed (#17712)
        try:
            from ray.serve.controller import SNAPSHOT_KEY as SERVE_SNAPSHOT_KEY
            from ray.serve.constants import SERVE_CONTROLLER_NAME
        except Exception:
            return {}

        # Serve wraps Ray's internal KV store and specially formats the keys.
        # These are the keys we are interested in:
        # SERVE_CONTROLLER_NAME(+ optional random letters):SERVE_SNAPSHOT_KEY
        serve_keys = _internal_kv_list(SERVE_CONTROLLER_NAME)
        serve_snapshot_keys = filter(lambda k: SERVE_SNAPSHOT_KEY in str(k),
                                     serve_keys)

        deployments_per_controller: List[Dict[str, Any]] = []
        for key in serve_snapshot_keys:
            val_bytes = _internal_kv_get(key) or "{}".encode("utf-8")
            deployments_per_controller.append(
                json.loads(val_bytes.decode("utf-8")))
        # Merge the deployments dicts of all controllers.
        deployments: Dict[str, Any] = {
            k: v
            for d in deployments_per_controller for k, v in d.items()
        }
        # Replace the keys (deployment names) with their hashes to prevent
        # collisions caused by the automatic conversion to camelcase by the
        # dashboard agent.
        deployments = {
            hashlib.sha1(name.encode()).hexdigest(): info
            for name, info in deployments.items()
        }
        return deployments

    async def get_session_name(self):
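        """Return the cluster's session name, read from Redis."""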
        encoded_name = await self._dashboard_head.aioredis_client.get(
            "session_name")
        return encoded_name.decode()

    async def run(self, server):
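        """Create the gRPC stubs for the GCS job and actor info services."""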
        self._gcs_job_info_stub = gcs_service_pb2_grpc.JobInfoGcsServiceStub(
            self._dashboard_head.aiogrpc_gcs_channel)
        self._gcs_actor_info_stub = \
            gcs_service_pb2_grpc.ActorInfoGcsServiceStub(
                self._dashboard_head.aiogrpc_gcs_channel)