From f55f605cfda1f416afc713711abf760e94440a0f Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 3 Aug 2022 19:40:22 -0700 Subject: [PATCH] [dashboard] Add last_activity_at field to /api/component_activities (#27463) Signed-off-by: Nikita Vemuri --- .../snapshot/component_activities_schema.json | 3 ++ dashboard/modules/snapshot/snapshot_head.py | 30 ++++++++++++++++++- .../modules/snapshot/tests/test_snapshot.py | 21 +++++++++++-- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/dashboard/modules/snapshot/component_activities_schema.json b/dashboard/modules/snapshot/component_activities_schema.json index c02338d58..9366cee63 100644 --- a/dashboard/modules/snapshot/component_activities_schema.json +++ b/dashboard/modules/snapshot/component_activities_schema.json @@ -15,6 +15,9 @@ }, "timestamp": { "type": ["number"] + }, + "last_activity_at": { + "type": ["number", "null"] } }, "required": ["is_active"] diff --git a/dashboard/modules/snapshot/snapshot_head.py b/dashboard/modules/snapshot/snapshot_head.py index a66ad2545..a7ee5cfd1 100644 --- a/dashboard/modules/snapshot/snapshot_head.py +++ b/dashboard/modules/snapshot/snapshot_head.py @@ -62,6 +62,14 @@ class RayActivityResponse(BaseModel, extra=Extra.allow): "This is in the format of seconds since unix epoch." ), ) + last_activity_at: Optional[float] = Field( + None, + description=( + "Timestamp when last actvity of this Ray component finished in format of " + "seconds since unix epoch. This field does not need to be populated " + "for Ray components where it is not meaningful." + ), + ) @validator("reason", always=True) def reason_required(cls, v, values, **kwargs): @@ -212,14 +220,31 @@ class APIHead(dashboard_utils.DashboardHeadModule): ) num_active_drivers = 0 + latest_job_end_time = 0 for job_table_entry in reply.job_info_list: is_dead = bool(job_table_entry.is_dead) in_internal_namespace = job_table_entry.config.ray_namespace.startswith( "_ray_internal_" ) + latest_job_end_time = ( + max(latest_job_end_time, job_table_entry.end_time) + if job_table_entry.end_time + else latest_job_end_time + ) if not is_dead and not in_internal_namespace: num_active_drivers += 1 + current_timestamp = datetime.now().timestamp() + # Latest job end time must be before or equal to the current timestamp. + # Job end times may be provided in epoch milliseconds. Check if this + # is true, and convert to seconds + if latest_job_end_time > current_timestamp: + latest_job_end_time = latest_job_end_time / 1000 + assert current_timestamp >= latest_job_end_time, ( + f"Most recent job end time {latest_job_end_time} must be " + f"before or equal to the current timestamp {current_timestamp}" + ) + is_active = ( RayActivityStatus.ACTIVE if num_active_drivers > 0 @@ -230,7 +255,10 @@ class APIHead(dashboard_utils.DashboardHeadModule): reason=f"Number of active drivers: {num_active_drivers}" if num_active_drivers else None, - timestamp=datetime.now().timestamp(), + timestamp=current_timestamp, + # If latest_job_end_time == 0, no jobs have finished yet so don't + # populate last_activity_at + last_activity_at=latest_job_end_time if latest_job_end_time else None, ) except Exception as e: logger.exception("Failed to get activity status of Ray drivers.") diff --git a/dashboard/modules/snapshot/tests/test_snapshot.py b/dashboard/modules/snapshot/tests/test_snapshot.py index 56966c4da..1bc84a0ea 100644 --- a/dashboard/modules/snapshot/tests/test_snapshot.py +++ b/dashboard/modules/snapshot/tests/test_snapshot.py @@ -126,6 +126,9 @@ import ray ray.init(address="auto", namespace="{namespace}") """ run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace")) + # Wait for above driver to start and finish + time.sleep(2) + run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace")) run_string_as_driver_nonblocking( driver_template.format(namespace="_ray_internal_job_info_id1") @@ -135,7 +138,7 @@ ray.init(address="auto", namespace="{namespace}") driver_template.format(namespace="_ray_internal_dashboard") ) - # Wait 1 sec for drivers to start + # Wait 1.5 sec for drivers to start (but not finish) time.sleep(1.5) # Verify drivers are considered active after running script @@ -158,10 +161,22 @@ ray.init(address="auto", namespace="{namespace}") assert driver_ray_activity_response.is_active == "ACTIVE" # Drivers with namespace starting with "_ray_internal" are not - # considered active drivers. Three active drivers are the two + # considered active drivers. Two active drivers are the second one # run with namespace "my_namespace" and the one started # from ray_start_with_dashboard - assert driver_ray_activity_response.reason == "Number of active drivers: 3" + assert driver_ray_activity_response.reason == "Number of active drivers: 2" + + # Get expected_last_activity at from snapshot endpoint which returns details + # about all jobs + jobs_snapshot_data = requests.get(f"{webui_url}/api/snapshot").json()["data"][ + "snapshot" + ]["jobs"] + # Divide endTime by 1000 to convert from milliseconds to seconds + expected_last_activity_at = max( + [job.get("endTime", 0) / 1000 for (job_id, job) in jobs_snapshot_data.items()] + ) + + assert driver_ray_activity_response.last_activity_at == expected_last_activity_at def test_snapshot(ray_start_with_dashboard):