mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[Dashboard] Fix missing actor pid (#13229)
This commit is contained in:
parent
0b22341bc9
commit
4853aa96cb
5 changed files with 13 additions and 34 deletions
|
@ -159,7 +159,7 @@ class DataOrganizer:
|
|||
# Merge GcsNodeInfo to node physical stats
|
||||
node_info["raylet"].update(node)
|
||||
# Merge actors to node physical stats
|
||||
node_info["actors"] = await cls.get_node_actors(node_id)
|
||||
node_info["actors"] = DataSource.node_actors.get(node_id, {})
|
||||
# Update workers to node physical stats
|
||||
node_info["workers"] = DataSource.node_workers.get(node_id, [])
|
||||
node_info["logCount"] = node_log_count
|
||||
|
@ -203,22 +203,6 @@ class DataOrganizer:
|
|||
for node_id in DataSource.nodes.keys()
|
||||
]
|
||||
|
||||
@classmethod
|
||||
async def get_node_actors(cls, node_id):
|
||||
node_actors = DataSource.node_actors.get(node_id, {})
|
||||
return {
|
||||
actor_id: await cls._get_actor(actor)
|
||||
for actor_id, actor in node_actors.items()
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def get_job_actors(cls, job_id):
|
||||
job_actors = DataSource.job_actors.get(job_id, {})
|
||||
return {
|
||||
actor_id: await cls._get_actor(actor)
|
||||
for actor_id, actor in job_actors.items()
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def get_all_actors(cls):
|
||||
return {
|
||||
|
|
|
@ -12,7 +12,6 @@ from ray.core.generated import gcs_service_pb2
|
|||
from ray.core.generated import gcs_service_pb2_grpc
|
||||
from ray.new_dashboard.datacenter import (
|
||||
DataSource,
|
||||
DataOrganizer,
|
||||
GlobalSignals,
|
||||
)
|
||||
|
||||
|
@ -53,7 +52,7 @@ class JobHead(dashboard_utils.DashboardHeadModule):
|
|||
if view is None:
|
||||
job_detail = {
|
||||
"jobInfo": DataSource.jobs.get(job_id, {}),
|
||||
"jobActors": await DataOrganizer.get_job_actors(job_id),
|
||||
"jobActors": DataSource.job_actors.get(job_id, {}),
|
||||
"jobWorkers": DataSource.job_workers.get(job_id, []),
|
||||
}
|
||||
await GlobalSignals.job_info_fetched.send(job_detail)
|
||||
|
@ -104,16 +103,10 @@ class JobHead(dashboard_utils.DashboardHeadModule):
|
|||
pubsub_message = ray.gcs_utils.PubSubMessage.FromString(data)
|
||||
message = ray.gcs_utils.JobTableData.FromString(
|
||||
pubsub_message.data)
|
||||
job_id = ray._raylet.JobID(message.job_id)
|
||||
if job_id.is_submitted_from_dashboard():
|
||||
job_table_data = job_table_data_to_dict(message)
|
||||
job_id = job_table_data["jobId"]
|
||||
# Update jobs.
|
||||
DataSource.jobs[job_id] = job_table_data
|
||||
else:
|
||||
logger.info(
|
||||
"Ignore job %s which is not submitted from dashboard.",
|
||||
job_id.hex())
|
||||
job_table_data = job_table_data_to_dict(message)
|
||||
job_id = job_table_data["jobId"]
|
||||
# Update jobs.
|
||||
DataSource.jobs[job_id] = job_table_data
|
||||
except Exception:
|
||||
logger.exception("Error receiving job info.")
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard):
|
|||
result = resp.json()
|
||||
assert result["result"] is True, resp.text
|
||||
job_summary = result["data"]["summary"]
|
||||
assert len(job_summary) == 1
|
||||
assert len(job_summary) == 1, resp.text
|
||||
one_job = job_summary[0]
|
||||
assert "jobId" in one_job
|
||||
job_id = one_job["jobId"]
|
||||
|
@ -67,7 +67,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard):
|
|||
assert len(one_job_summary_keys - job_detail["jobInfo"].keys()) == 0
|
||||
assert "jobActors" in job_detail
|
||||
job_actors = job_detail["jobActors"]
|
||||
assert len(job_actors) == 1
|
||||
assert len(job_actors) == 1, resp.text
|
||||
one_job_actor = job_actors[actor_id]
|
||||
assert "taskSpec" in one_job_actor
|
||||
assert type(one_job_actor["taskSpec"]) is dict
|
||||
|
@ -82,7 +82,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard):
|
|||
assert k in one_job_actor
|
||||
assert "jobWorkers" in job_detail
|
||||
job_workers = job_detail["jobWorkers"]
|
||||
assert len(job_workers) == 1
|
||||
assert len(job_workers) == 1, resp.text
|
||||
one_job_worker = job_workers[0]
|
||||
check_worker_keys = [
|
||||
"cmdline", "pid", "cpuTimes", "memoryInfo", "cpuPercent",
|
||||
|
@ -91,7 +91,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard):
|
|||
for k in check_worker_keys:
|
||||
assert k in one_job_worker
|
||||
|
||||
timeout_seconds = 5
|
||||
timeout_seconds = 10
|
||||
start_time = time.time()
|
||||
last_ex = None
|
||||
while True:
|
||||
|
|
|
@ -121,7 +121,7 @@ def test_actors(disable_aiohttp_cache, ray_start_with_dashboard):
|
|||
assert "name" in one_entry
|
||||
assert "numRestarts" in one_entry
|
||||
assert "pid" in one_entry
|
||||
all_pids = [entry["pid"] for entry in actors.values()]
|
||||
all_pids = {entry["pid"] for entry in actors.values()}
|
||||
assert 0 in all_pids # The infeasible actor
|
||||
assert len(all_pids) > 1
|
||||
break
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include "ray/raylet/scheduling/cluster_task_manager.h"
|
||||
|
||||
#include <google/protobuf/map.h>
|
||||
|
||||
#include <boost/range/join.hpp>
|
||||
|
||||
#include "ray/util/logging.h"
|
||||
|
@ -647,6 +648,7 @@ void ClusterTaskManager::Dispatch(
|
|||
const auto &task_spec = task.GetTaskSpecification();
|
||||
RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId();
|
||||
// Pass the contact info of the worker to use.
|
||||
reply->set_worker_pid(worker->GetProcess().GetId());
|
||||
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
|
||||
reply->mutable_worker_address()->set_port(worker->Port());
|
||||
reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());
|
||||
|
|
Loading…
Add table
Reference in a new issue