[gcs] Preserve job driver info for dashboard (#25880)

This PR ensures that GCS preserves a job's driver IP address and driver PID when the job is marked as finished (previously they were cleared), so that the dashboard can still use them to locate the job's logs after the job terminates.

@alanwguo will handle any dashboard work in a separate PR.

Co-authored-by: Alex Wu <alex@anyscale.com>
This commit is contained in:
Alex Wu 2022-06-17 09:03:20 -07:00 committed by GitHub
parent b24c736bb8
commit 187c21ce20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 18 deletions

View file

@ -1,11 +1,10 @@
import time
import os
import subprocess
import sys
import tempfile
import subprocess
import time
import ray
from ray.job_config import JobConfig
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
run_string_as_driver,
@ -13,6 +12,7 @@ from ray._private.test_utils import (
wait_for_condition,
wait_for_num_actors,
)
from ray.job_config import JobConfig
def test_job_isolation(call_ray_start):
@ -181,7 +181,7 @@ ray.get(_.value.remote())
assert ray.get(detached_actor.value.remote()) == 1
def test_job_timestamps(ray_start_regular):
def test_job_observability(ray_start_regular):
driver_template = """
import ray
from time import sleep
@ -235,6 +235,11 @@ ray.shutdown()
assert running["StartTime"] > 0
assert running["EndTime"] == 0
assert len(running["DriverIPAddress"]) > 0
assert running["DriverPid"] > 0
assert len(finished["DriverIPAddress"]) > 0
assert finished["DriverPid"] > 0
p.kill()
# Give the second job time to clean itself up.
time.sleep(1)
@ -253,6 +258,9 @@ ray.shutdown()
assert prev_running["EndTime"] > prev_running["StartTime"] > 0
assert len(prev_running["DriverIPAddress"]) > 0
assert prev_running["DriverPid"] > 0
def test_config_metadata(shutdown_only):
job_config = JobConfig(metadata={"abc": "xyz"})

View file

@ -76,8 +76,6 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
job_table_data.set_timestamp(time);
job_table_data.set_end_time(time);
job_table_data.set_is_dead(true);
job_table_data.set_driver_ip_address("");
job_table_data.set_driver_pid(-1);
auto on_done = [this, job_id, job_table_data, done_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;

View file

@ -33,15 +33,6 @@ class MockInMemoryStoreClient : public gcs::InMemoryStoreClient {
public:
explicit MockInMemoryStoreClient(instrumented_io_context &main_io_service)
: gcs::InMemoryStoreClient(main_io_service) {}
Status AsyncPut(const std::string &table_name,
const std::string &key,
const std::string &data,
bool overwrite,
std::function<void(bool)> callback) override {
callback(true);
return Status::OK();
}
};
class GcsJobManagerTest : public ::testing::Test {
@ -89,19 +80,29 @@ TEST_F(GcsJobManagerTest, TestGetJobConfig) {
auto job_id2 = JobID::FromInt(2);
gcs::GcsInitData gcs_init_data(gcs_table_storage_);
gcs_job_manager.Initialize(/*init_data=*/gcs_init_data);
auto add_job_request1 = Mocker::GenAddJobRequest(job_id1, "namespace_1");
rpc::AddJobReply empty_reply;
std::promise<bool> promise1;
std::promise<bool> promise2;
auto add_job_request1 = Mocker::GenAddJobRequest(job_id1, "namespace_1");
gcs_job_manager.HandleAddJob(
*add_job_request1,
&empty_reply,
[](Status, std::function<void()>, std::function<void()>) {});
[&promise1](Status, std::function<void()>, std::function<void()>) {
promise1.set_value(true);
});
promise1.get_future().get();
auto add_job_request2 = Mocker::GenAddJobRequest(job_id2, "namespace_2");
gcs_job_manager.HandleAddJob(
*add_job_request2,
&empty_reply,
[](Status, std::function<void()>, std::function<void()>) {});
[&promise2](Status, std::function<void()>, std::function<void()>) {
promise2.set_value(true);
});
promise2.get_future().get();
auto job_config1 = gcs_job_manager.GetJobConfig(job_id1);
ASSERT_EQ("namespace_1", job_config1->ray_namespace());
@ -109,6 +110,60 @@ TEST_F(GcsJobManagerTest, TestGetJobConfig) {
ASSERT_EQ("namespace_2", job_config2->ray_namespace());
}
// Checks that the driver IP address and PID attached to a job when it is added
// are still returned by GetAllJobInfo after the job has been marked finished.
TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) {
  gcs::GcsJobManager job_manager(
      gcs_table_storage_, gcs_publisher_, runtime_env_manager_, *function_manager_);
  gcs::GcsInitData init_data(gcs_table_storage_);
  job_manager.Initialize(/*init_data=*/init_data);

  // Step 1: add a job with explicit driver info so it can be compared later.
  auto job_id = JobID::FromInt(1);
  auto add_request = Mocker::GenAddJobRequest(job_id, "namespace");
  auto *job_data = add_request->mutable_data();
  job_data->set_driver_ip_address("10.0.0.1");
  job_data->set_driver_pid(8264);

  rpc::AddJobReply add_reply;
  std::promise<bool> add_done;
  job_manager.HandleAddJob(
      *add_request,
      &add_reply,
      [&add_done](Status, std::function<void()>, std::function<void()>) {
        add_done.set_value(true);
      });
  // Block until the reply callback has run before issuing the next request.
  add_done.get_future().get();

  // Step 2: mark the same job as finished.
  rpc::MarkJobFinishedRequest finish_request;
  finish_request.set_job_id(job_id.Binary());
  rpc::MarkJobFinishedReply finish_reply;
  std::promise<bool> finish_done;
  job_manager.HandleMarkJobFinished(
      finish_request,
      &finish_reply,
      [&finish_done](Status, std::function<void()>, std::function<void()>) {
        finish_done.set_value(true);
      });
  finish_done.get_future().get();

  // Step 3: read back all jobs and inspect the single recorded entry.
  rpc::GetAllJobInfoRequest list_request;
  rpc::GetAllJobInfoReply list_reply;
  std::promise<bool> list_done;
  job_manager.HandleGetAllJobInfo(
      list_request,
      &list_reply,
      [&list_done](Status, std::function<void()>, std::function<void()>) {
        list_done.set_value(true);
      });
  list_done.get_future().get();

  ASSERT_EQ(list_reply.job_info_list().size(), 1);

  // The driver info supplied in step 1 must be unchanged on the finished job.
  rpc::JobTableData finished_job = list_reply.job_info_list().Get(0);
  ASSERT_EQ(finished_job.driver_ip_address(), "10.0.0.1");
  ASSERT_EQ(finished_job.driver_pid(), 8264);
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();