[dashboard] Add last_activity_at field to /api/component_activities (#27463)

Signed-off-by: Nikita Vemuri <nikitavemuri@gmail.com>
This commit is contained in:
Nikita Vemuri 2022-08-03 19:40:22 -07:00 committed by GitHub
parent b7540d46cd
commit f55f605cfd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 4 deletions

View file

@ -15,6 +15,9 @@
}, },
"timestamp": { "timestamp": {
"type": ["number"] "type": ["number"]
},
"last_activity_at": {
"type": ["number", "null"]
} }
}, },
"required": ["is_active"] "required": ["is_active"]

View file

@ -62,6 +62,14 @@ class RayActivityResponse(BaseModel, extra=Extra.allow):
"This is in the format of seconds since unix epoch." "This is in the format of seconds since unix epoch."
), ),
) )
last_activity_at: Optional[float] = Field(
None,
description=(
"Timestamp when last activity of this Ray component finished in format of "
"seconds since unix epoch. This field does not need to be populated "
"for Ray components where it is not meaningful."
),
)
@validator("reason", always=True) @validator("reason", always=True)
def reason_required(cls, v, values, **kwargs): def reason_required(cls, v, values, **kwargs):
@ -212,14 +220,31 @@ class APIHead(dashboard_utils.DashboardHeadModule):
) )
num_active_drivers = 0 num_active_drivers = 0
latest_job_end_time = 0
for job_table_entry in reply.job_info_list: for job_table_entry in reply.job_info_list:
is_dead = bool(job_table_entry.is_dead) is_dead = bool(job_table_entry.is_dead)
in_internal_namespace = job_table_entry.config.ray_namespace.startswith( in_internal_namespace = job_table_entry.config.ray_namespace.startswith(
"_ray_internal_" "_ray_internal_"
) )
latest_job_end_time = (
max(latest_job_end_time, job_table_entry.end_time)
if job_table_entry.end_time
else latest_job_end_time
)
if not is_dead and not in_internal_namespace: if not is_dead and not in_internal_namespace:
num_active_drivers += 1 num_active_drivers += 1
current_timestamp = datetime.now().timestamp()
# Latest job end time must be before or equal to the current timestamp.
# Job end times may be provided in epoch milliseconds. Check if this
# is true, and convert to seconds
if latest_job_end_time > current_timestamp:
latest_job_end_time = latest_job_end_time / 1000
assert current_timestamp >= latest_job_end_time, (
f"Most recent job end time {latest_job_end_time} must be "
f"before or equal to the current timestamp {current_timestamp}"
)
is_active = ( is_active = (
RayActivityStatus.ACTIVE RayActivityStatus.ACTIVE
if num_active_drivers > 0 if num_active_drivers > 0
@ -230,7 +255,10 @@ class APIHead(dashboard_utils.DashboardHeadModule):
reason=f"Number of active drivers: {num_active_drivers}" reason=f"Number of active drivers: {num_active_drivers}"
if num_active_drivers if num_active_drivers
else None, else None,
timestamp=datetime.now().timestamp(), timestamp=current_timestamp,
# If latest_job_end_time == 0, no jobs have finished yet so don't
# populate last_activity_at
last_activity_at=latest_job_end_time if latest_job_end_time else None,
) )
except Exception as e: except Exception as e:
logger.exception("Failed to get activity status of Ray drivers.") logger.exception("Failed to get activity status of Ray drivers.")

View file

@ -126,6 +126,9 @@ import ray
ray.init(address="auto", namespace="{namespace}") ray.init(address="auto", namespace="{namespace}")
""" """
run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace")) run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace"))
# Wait for above driver to start and finish
time.sleep(2)
run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace")) run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace"))
run_string_as_driver_nonblocking( run_string_as_driver_nonblocking(
driver_template.format(namespace="_ray_internal_job_info_id1") driver_template.format(namespace="_ray_internal_job_info_id1")
@ -135,7 +138,7 @@ ray.init(address="auto", namespace="{namespace}")
driver_template.format(namespace="_ray_internal_dashboard") driver_template.format(namespace="_ray_internal_dashboard")
) )
# Wait 1 sec for drivers to start # Wait 1.5 sec for drivers to start (but not finish)
time.sleep(1.5) time.sleep(1.5)
# Verify drivers are considered active after running script # Verify drivers are considered active after running script
@ -158,10 +161,22 @@ ray.init(address="auto", namespace="{namespace}")
assert driver_ray_activity_response.is_active == "ACTIVE" assert driver_ray_activity_response.is_active == "ACTIVE"
# Drivers with namespace starting with "_ray_internal" are not # Drivers with namespace starting with "_ray_internal" are not
# considered active drivers. Three active drivers are the two # considered active drivers. Two active drivers are the second one
# run with namespace "my_namespace" and the one started # run with namespace "my_namespace" and the one started
# from ray_start_with_dashboard # from ray_start_with_dashboard
assert driver_ray_activity_response.reason == "Number of active drivers: 3" assert driver_ray_activity_response.reason == "Number of active drivers: 2"
# Get expected_last_activity_at from the snapshot endpoint, which returns
# details about all jobs
jobs_snapshot_data = requests.get(f"{webui_url}/api/snapshot").json()["data"][
"snapshot"
]["jobs"]
# Divide endTime by 1000 to convert from milliseconds to seconds
expected_last_activity_at = max(
[job.get("endTime", 0) / 1000 for (job_id, job) in jobs_snapshot_data.items()]
)
assert driver_ray_activity_response.last_activity_at == expected_last_activity_at
def test_snapshot(ray_start_with_dashboard): def test_snapshot(ray_start_with_dashboard):