diff --git a/python/ray/tests/test_job.py b/python/ray/tests/test_job.py index 466433235..a09dcdfcb 100644 --- a/python/ray/tests/test_job.py +++ b/python/ray/tests/test_job.py @@ -1,11 +1,10 @@ -import time import os +import subprocess import sys import tempfile -import subprocess +import time import ray -from ray.job_config import JobConfig import ray._private.gcs_utils as gcs_utils from ray._private.test_utils import ( run_string_as_driver, @@ -13,6 +12,7 @@ from ray._private.test_utils import ( wait_for_condition, wait_for_num_actors, ) +from ray.job_config import JobConfig def test_job_isolation(call_ray_start): @@ -181,7 +181,7 @@ ray.get(_.value.remote()) assert ray.get(detached_actor.value.remote()) == 1 -def test_job_timestamps(ray_start_regular): +def test_job_observability(ray_start_regular): driver_template = """ import ray from time import sleep @@ -235,6 +235,11 @@ ray.shutdown() assert running["StartTime"] > 0 assert running["EndTime"] == 0 + assert len(running["DriverIPAddress"]) > 0 + assert running["DriverPid"] > 0 + assert len(finished["DriverIPAddress"]) > 0 + assert finished["DriverPid"] > 0 + p.kill() # Give the second job time to clean itself up. time.sleep(1) @@ -253,6 +258,9 @@ ray.shutdown() assert prev_running["EndTime"] > prev_running["StartTime"] > 0 + assert len(prev_running["DriverIPAddress"]) > 0 + assert prev_running["DriverPid"] > 0 + def test_config_metadata(shutdown_only): job_config = JobConfig(metadata={"abc": "xyz"}) diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index 21c5bd9ea..16d777a27 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -76,8 +76,6 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data, job_table_data.set_timestamp(time); job_table_data.set_end_time(time); job_table_data.set_is_dead(true); - job_table_data.set_driver_ip_address(""); - job_table_data.set_driver_pid(-1); auto on_done = [this, job_id, job_table_data, done_callback](const Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id; diff --git a/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc index 506da34a1..af99351f2 100644 --- a/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc @@ -33,15 +33,6 @@ class MockInMemoryStoreClient : public gcs::InMemoryStoreClient { public: explicit MockInMemoryStoreClient(instrumented_io_context &main_io_service) : gcs::InMemoryStoreClient(main_io_service) {} - - Status AsyncPut(const std::string &table_name, - const std::string &key, - const std::string &data, - bool overwrite, - std::function callback) override { - callback(true); - return Status::OK(); - } }; class GcsJobManagerTest : public ::testing::Test { @@ -89,19 +80,29 @@ TEST_F(GcsJobManagerTest, TestGetJobConfig) { auto job_id2 = JobID::FromInt(2); gcs::GcsInitData gcs_init_data(gcs_table_storage_); gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); - auto add_job_request1 = Mocker::GenAddJobRequest(job_id1, "namespace_1"); rpc::AddJobReply empty_reply; + std::promise promise1; + std::promise promise2; + auto add_job_request1 = Mocker::GenAddJobRequest(job_id1, "namespace_1"); gcs_job_manager.HandleAddJob( *add_job_request1, &empty_reply, - [](Status, std::function, std::function) {}); + [&promise1](Status, std::function, std::function) { + promise1.set_value(true); + }); + promise1.get_future().get(); + auto add_job_request2 = Mocker::GenAddJobRequest(job_id2, "namespace_2"); gcs_job_manager.HandleAddJob( *add_job_request2, &empty_reply, - [](Status, std::function, std::function) {}); + [&promise2](Status, std::function, std::function) { + promise2.set_value(true); + }); + promise2.get_future().get(); + auto job_config1 = gcs_job_manager.GetJobConfig(job_id1); ASSERT_EQ("namespace_1", job_config1->ray_namespace()); @@ -109,6 +110,60 @@ TEST_F(GcsJobManagerTest, TestGetJobConfig) { ASSERT_EQ("namespace_2", job_config2->ray_namespace()); } +TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) { + gcs::GcsJobManager gcs_job_manager( + gcs_table_storage_, gcs_publisher_, runtime_env_manager_, *function_manager_); + + auto job_id = JobID::FromInt(1); + gcs::GcsInitData gcs_init_data(gcs_table_storage_); + gcs_job_manager.Initialize(/*init_data=*/gcs_init_data); + auto add_job_request = Mocker::GenAddJobRequest(job_id, "namespace"); + add_job_request->mutable_data()->set_driver_ip_address("10.0.0.1"); + add_job_request->mutable_data()->set_driver_pid(8264); + + rpc::AddJobReply empty_reply; + std::promise promise; + + gcs_job_manager.HandleAddJob( + *add_job_request, + &empty_reply, + [&promise](Status, std::function, std::function) { + promise.set_value(true); + }); + promise.get_future().get(); + + rpc::MarkJobFinishedRequest job_finished_request; + rpc::MarkJobFinishedReply job_finished_reply; + std::promise job_finished_promise; + + job_finished_request.set_job_id(JobID::FromInt(1).Binary()); + + gcs_job_manager.HandleMarkJobFinished( + job_finished_request, + &job_finished_reply, + [&job_finished_promise](Status, std::function, std::function) { + job_finished_promise.set_value(true); + }); + job_finished_promise.get_future().get(); + + rpc::GetAllJobInfoRequest all_job_info_request; + rpc::GetAllJobInfoReply all_job_info_reply; + std::promise all_job_info_promise; + + gcs_job_manager.HandleGetAllJobInfo( + all_job_info_request, + &all_job_info_reply, + [&all_job_info_promise](Status, std::function, std::function) { + all_job_info_promise.set_value(true); + }); + all_job_info_promise.get_future().get(); + + ASSERT_EQ(all_job_info_reply.job_info_list().size(), 1); + rpc::JobTableData data = all_job_info_reply.job_info_list().Get(0); + ASSERT_EQ(data.driver_ip_address(), "10.0.0.1"); + ASSERT_EQ(data.driver_pid(), 8264); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();