mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
fix core worker test hanging due to heartbeat is not working (#6416)
This commit is contained in:
parent
8c1520d18e
commit
3adbe29450
2 changed files with 44 additions and 4 deletions
|
@ -30,6 +30,7 @@ namespace {
|
|||
std::string store_executable;
|
||||
std::string raylet_executable;
|
||||
int node_manager_port = 0;
|
||||
std::string raylet_monitor_executable;
|
||||
std::string mock_worker_executable;
|
||||
|
||||
} // namespace
|
||||
|
@ -67,6 +68,11 @@ ActorID CreateActorHelper(CoreWorker &worker,
|
|||
return actor_id;
|
||||
}
|
||||
|
||||
std::string MetadataToString(std::shared_ptr<RayObject> obj) {
|
||||
auto metadata = obj->GetMetadata();
|
||||
return std::string(reinterpret_cast<const char *>(metadata->Data()), metadata->Size());
|
||||
}
|
||||
|
||||
class CoreWorkerTest : public ::testing::Test {
|
||||
public:
|
||||
CoreWorkerTest(int num_nodes) : gcs_options_("127.0.0.1", 6379, "") {
|
||||
|
@ -84,6 +90,10 @@ class CoreWorkerTest : public ::testing::Test {
|
|||
store_socket = StartStore();
|
||||
}
|
||||
|
||||
// core worker test relies on node resources. It's important that one raylet can
|
||||
// receive the heartbeat from another. So starting raylet monitor is required here.
|
||||
raylet_monitor_pid_ = StartRayletMonitor("127.0.0.1");
|
||||
|
||||
// start raylet on each node. Assign each node with different resources so that
|
||||
// a task can be scheduled to the desired node.
|
||||
for (int i = 0; i < num_nodes; i++) {
|
||||
|
@ -101,6 +111,10 @@ class CoreWorkerTest : public ::testing::Test {
|
|||
for (const auto &store_socket : raylet_store_socket_names_) {
|
||||
StopStore(store_socket);
|
||||
}
|
||||
|
||||
if (!raylet_monitor_pid_.empty()) {
|
||||
StopRayletMonitor(raylet_monitor_pid_);
|
||||
}
|
||||
}
|
||||
|
||||
JobID NextJobId() const {
|
||||
|
@ -163,6 +177,26 @@ class CoreWorkerTest : public ::testing::Test {
|
|||
ASSERT_TRUE(system(("rm -rf " + raylet_socket_name + ".pid").c_str()) == 0);
|
||||
}
|
||||
|
||||
std::string StartRayletMonitor(std::string redis_address) {
|
||||
std::string raylet_monitor_pid =
|
||||
"/tmp/raylet_monitor" + ObjectID::FromRandom().Hex() + ".pid";
|
||||
std::string raylet_monitor_start_cmd = raylet_monitor_executable;
|
||||
raylet_monitor_start_cmd.append(" --redis_address=" + redis_address)
|
||||
.append(" --redis_port=6379")
|
||||
.append(" & echo $! > " + raylet_monitor_pid);
|
||||
|
||||
RAY_LOG(DEBUG) << "Raylet monitor Start command: " << raylet_monitor_start_cmd;
|
||||
RAY_CHECK(system(raylet_monitor_start_cmd.c_str()) == 0);
|
||||
usleep(200 * 1000);
|
||||
return raylet_monitor_pid;
|
||||
}
|
||||
|
||||
void StopRayletMonitor(std::string raylet_monitor_pid) {
|
||||
std::string kill_9 = "kill -9 `cat " + raylet_monitor_pid + "`";
|
||||
RAY_LOG(DEBUG) << kill_9;
|
||||
ASSERT_TRUE(system(kill_9.c_str()) == 0);
|
||||
}
|
||||
|
||||
void SetUp() {}
|
||||
|
||||
void TearDown() {}
|
||||
|
@ -197,6 +231,7 @@ class CoreWorkerTest : public ::testing::Test {
|
|||
|
||||
std::vector<std::string> raylet_socket_names_;
|
||||
std::vector<std::string> raylet_store_socket_names_;
|
||||
std::string raylet_monitor_pid_;
|
||||
gcs::GcsClientOptions gcs_options_;
|
||||
};
|
||||
|
||||
|
@ -314,6 +349,9 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
|
|||
RAY_CHECK_OK(driver.Get(return_ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 1);
|
||||
ASSERT_TRUE(!results[0]->HasMetadata())
|
||||
<< "metadata: " << MetadataToString(results[0])
|
||||
<< ", object ID: " << return_ids[0];
|
||||
ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size());
|
||||
ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()),
|
||||
0);
|
||||
|
@ -981,10 +1019,11 @@ TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesFailure) {
|
|||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
RAY_CHECK(argc == 5);
|
||||
RAY_CHECK(argc == 6);
|
||||
store_executable = std::string(argv[1]);
|
||||
raylet_executable = std::string(argv[2]);
|
||||
node_manager_port = std::stoi(std::string(argv[3]));
|
||||
mock_worker_executable = std::string(argv[4]);
|
||||
raylet_monitor_executable = std::string(argv[4]);
|
||||
mock_worker_executable = std::string(argv[5]);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ fi
|
|||
set -e
|
||||
set -x
|
||||
|
||||
bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
|
||||
bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:raylet_monitor" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
|
||||
|
||||
# Get the directory in which this script is executing.
|
||||
SCRIPT_DIR="`dirname \"$0\"`"
|
||||
|
@ -42,6 +42,7 @@ REDIS_MODULE="./bazel-bin/libray_redis_module.so"
|
|||
LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}"
|
||||
STORE_EXEC="./bazel-bin/external/plasma/plasma_store_server"
|
||||
RAYLET_EXEC="./bazel-bin/raylet"
|
||||
RAYLET_MONITOR_EXEC="./bazel-bin/raylet_monitor"
|
||||
MOCK_WORKER_EXEC="./bazel-bin/mock_worker"
|
||||
|
||||
# Allow cleanup commands to fail.
|
||||
|
@ -54,7 +55,7 @@ sleep 2s
|
|||
bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 &
|
||||
sleep 2s
|
||||
# Run tests.
|
||||
./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $MOCK_WORKER_EXEC
|
||||
./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $RAYLET_MONITOR_EXEC $MOCK_WORKER_EXEC
|
||||
sleep 1s
|
||||
bazel run //:redis-cli -- -p 6379 shutdown
|
||||
bazel run //:redis-cli -- -p 6380 shutdown
|
||||
|
|
Loading…
Add table
Reference in a new issue