Patch summary: start a raylet monitor for the core worker tests.

core_worker_test.cc
  * Adds the file-level `raylet_monitor_executable`, read from argv[4]; the mock
    worker path moves to argv[5] and the test binary now requires argc == 6.
  * `StartRayletMonitor` launches the monitor in the background through system()
    and records its PID in a randomly named /tmp/raylet_monitor<hex>.pid file.
    `StopRayletMonitor` kills that PID with `kill -9` and then removes the PID
    file. The fixture calls the former before starting the raylets and the
    latter after the stores are stopped.
  * `MetadataToString` renders an object's metadata buffer as a string; it is used
    in the failure message of the new HasMetadata() assertion in TestActorTask.

run_core_worker_tests.sh
  * Builds //:raylet_monitor and passes ./bazel-bin/raylet_monitor as the fourth
    argument of core_worker_test.

NOTE(review): line breaks, indentation, blank context lines and the template
arguments (`std::shared_ptr<RayObject>`, `reinterpret_cast<const char *>`,
`std::vector<std::string>`) were reconstructed from a flattened copy of this
patch. Confirm context whitespace against the target tree before applying.
NOTE(review): the PID-file removal in StopRayletMonitor was added on top of the
recorded change, so the post-image blob id on the first "index" line no longer
matches the resulting file.

diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc
index 91b81b1ec..69a41302d 100644
--- a/src/ray/core_worker/test/core_worker_test.cc
+++ b/src/ray/core_worker/test/core_worker_test.cc
@@ -30,6 +30,7 @@ namespace {
 std::string store_executable;
 std::string raylet_executable;
 int node_manager_port = 0;
+std::string raylet_monitor_executable;
 std::string mock_worker_executable;
 
 }  // namespace
@@ -67,6 +68,11 @@ ActorID CreateActorHelper(CoreWorker &worker,
   return actor_id;
 }
 
+std::string MetadataToString(std::shared_ptr<RayObject> obj) {
+  auto metadata = obj->GetMetadata();
+  return std::string(reinterpret_cast<const char *>(metadata->Data()), metadata->Size());
+}
+
 class CoreWorkerTest : public ::testing::Test {
  public:
   CoreWorkerTest(int num_nodes) : gcs_options_("127.0.0.1", 6379, "") {
@@ -84,6 +90,10 @@ class CoreWorkerTest : public ::testing::Test {
       store_socket = StartStore();
     }
 
+    // core worker test relies on node resources. It's important that one raylet can
+    // receive the heartbeat from another. So starting raylet monitor is required here.
+    raylet_monitor_pid_ = StartRayletMonitor("127.0.0.1");
+
     // start raylet on each node. Assign each node with different resources so that
     // a task can be scheduled to the desired node.
     for (int i = 0; i < num_nodes; i++) {
@@ -101,6 +111,10 @@ class CoreWorkerTest : public ::testing::Test {
     for (const auto &store_socket : raylet_store_socket_names_) {
       StopStore(store_socket);
     }
+
+    if (!raylet_monitor_pid_.empty()) {
+      StopRayletMonitor(raylet_monitor_pid_);
+    }
   }
 
   JobID NextJobId() const {
@@ -163,6 +177,28 @@ class CoreWorkerTest : public ::testing::Test {
     ASSERT_TRUE(system(("rm -rf " + raylet_socket_name + ".pid").c_str()) == 0);
   }
 
+  std::string StartRayletMonitor(std::string redis_address) {
+    std::string raylet_monitor_pid =
+        "/tmp/raylet_monitor" + ObjectID::FromRandom().Hex() + ".pid";
+    std::string raylet_monitor_start_cmd = raylet_monitor_executable;
+    raylet_monitor_start_cmd.append(" --redis_address=" + redis_address)
+        .append(" --redis_port=6379")
+        .append(" & echo $! > " + raylet_monitor_pid);
+
+    RAY_LOG(DEBUG) << "Raylet monitor Start command: " << raylet_monitor_start_cmd;
+    RAY_CHECK(system(raylet_monitor_start_cmd.c_str()) == 0);
+    usleep(200 * 1000);
+    return raylet_monitor_pid;
+  }
+
+  void StopRayletMonitor(std::string raylet_monitor_pid) {
+    std::string kill_9 = "kill -9 `cat " + raylet_monitor_pid + "`";
+    RAY_LOG(DEBUG) << kill_9;
+    ASSERT_TRUE(system(kill_9.c_str()) == 0);
+    // Remove the PID file so that repeated runs do not leave files behind in /tmp.
+    ASSERT_TRUE(system(("rm -rf " + raylet_monitor_pid).c_str()) == 0);
+  }
+
   void SetUp() {}
 
   void TearDown() {}
@@ -197,6 +233,7 @@ class CoreWorkerTest : public ::testing::Test {
 
   std::vector<std::string> raylet_socket_names_;
   std::vector<std::string> raylet_store_socket_names_;
+  std::string raylet_monitor_pid_;
   gcs::GcsClientOptions gcs_options_;
 };
 
@@ -314,6 +351,9 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
       RAY_CHECK_OK(driver.Get(return_ids, -1, &results));
 
       ASSERT_EQ(results.size(), 1);
+      ASSERT_TRUE(!results[0]->HasMetadata())
+          << "metadata: " << MetadataToString(results[0])
+          << ", object ID: " << return_ids[0];
       ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size());
       ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()),
                 0);
@@ -981,10 +1021,11 @@ TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesFailure) {
 
 int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  RAY_CHECK(argc == 5);
+  RAY_CHECK(argc == 6);
   store_executable = std::string(argv[1]);
   raylet_executable = std::string(argv[2]);
   node_manager_port = std::stoi(std::string(argv[3]));
-  mock_worker_executable = std::string(argv[4]);
+  raylet_monitor_executable = std::string(argv[4]);
+  mock_worker_executable = std::string(argv[5]);
   return RUN_ALL_TESTS();
 }
diff --git a/src/ray/test/run_core_worker_tests.sh b/src/ray/test/run_core_worker_tests.sh
index 146090feb..709d30daa 100644
--- a/src/ray/test/run_core_worker_tests.sh
+++ b/src/ray/test/run_core_worker_tests.sh
@@ -22,7 +22,7 @@ fi
 set -e
 set -x
 
-bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
+bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:raylet_monitor" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
 
 # Get the directory in which this script is executing.
 SCRIPT_DIR="`dirname \"$0\"`"
@@ -42,6 +42,7 @@ REDIS_MODULE="./bazel-bin/libray_redis_module.so"
 LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}"
 STORE_EXEC="./bazel-bin/external/plasma/plasma_store_server"
 RAYLET_EXEC="./bazel-bin/raylet"
+RAYLET_MONITOR_EXEC="./bazel-bin/raylet_monitor"
 MOCK_WORKER_EXEC="./bazel-bin/mock_worker"
 
 # Allow cleanup commands to fail.
@@ -54,7 +55,7 @@ sleep 2s
 bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 &
 sleep 2s
 # Run tests.
-./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $MOCK_WORKER_EXEC
+./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $RAYLET_MONITOR_EXEC $MOCK_WORKER_EXEC
 sleep 1s
 bazel run //:redis-cli -- -p 6379 shutdown
 bazel run //:redis-cli -- -p 6380 shutdown