mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[GCS]Fix TestActorTableResubscribe bug (#11830)
* fix compile bug * [GCS]Fix TestActorTableResubscribe bug * rm unused code * fix lint error * fix review comment * fix ut bug Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
parent
64ca30c060
commit
407a212816
1 changed files with 44 additions and 19 deletions
|
@ -188,10 +188,18 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
|
|||
.AsyncRegisterActor(task_spec, [](Status status) {})
|
||||
.ok();
|
||||
}
|
||||
std::promise<bool> promise;
|
||||
RAY_CHECK_OK(gcs_client_->Actors().AsyncRegisterActor(
|
||||
task_spec, [&promise](Status status) { promise.set_value(status.ok()); }));
|
||||
return WaitReady(promise.get_future(), timeout_ms_);
|
||||
|
||||
// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
|
||||
// client will register the actor again and promise may be set twice.
|
||||
auto promise = std::make_shared<std::promise<bool>>();
|
||||
RAY_CHECK_OK(
|
||||
gcs_client_->Actors().AsyncRegisterActor(task_spec, [promise](Status status) {
|
||||
try {
|
||||
promise->set_value(status.ok());
|
||||
} catch (...) {
|
||||
}
|
||||
}));
|
||||
return WaitReady(promise->get_future(), timeout_ms_);
|
||||
}
|
||||
|
||||
rpc::ActorTableData GetActor(const ActorID &actor_id) {
|
||||
|
@ -208,14 +216,22 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
|
|||
return actor_table_data;
|
||||
}
|
||||
|
||||
std::vector<rpc::ActorTableData> GetAllActors() {
|
||||
std::vector<rpc::ActorTableData> GetAllActors(bool filter_non_dead_actor = false) {
|
||||
std::promise<bool> promise;
|
||||
std::vector<rpc::ActorTableData> actors;
|
||||
RAY_CHECK_OK(gcs_client_->Actors().AsyncGetAll(
|
||||
[&actors, &promise](Status status,
|
||||
const std::vector<rpc::ActorTableData> &result) {
|
||||
[filter_non_dead_actor, &actors, &promise](
|
||||
Status status, const std::vector<rpc::ActorTableData> &result) {
|
||||
if (!result.empty()) {
|
||||
actors.assign(result.begin(), result.end());
|
||||
if (filter_non_dead_actor) {
|
||||
for (auto &iter : result) {
|
||||
if (iter.state() == gcs::ActorTableData::DEAD) {
|
||||
actors.emplace_back(iter);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
actors.assign(result.begin(), result.end());
|
||||
}
|
||||
}
|
||||
promise.set_value(true);
|
||||
}));
|
||||
|
@ -1041,15 +1057,22 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) {
|
|||
// didn't restart, it will fetch data again from the GCS server. The GCS will destroy
|
||||
// the actor because it finds that the actor is out of scope, so we'll receive another
|
||||
// notification of DEAD state.
|
||||
WaitForExpectedCount(num_subscribe_all_notifications, 3);
|
||||
WaitForExpectedCount(num_subscribe_one_notifications, 3);
|
||||
/// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
|
||||
/// client will register the actor again. When an actor is registered, the status in GCS
|
||||
/// is `DEPENDENCIES_UNREADY`. When GCS finds that the owner of an actor is nil, it will
|
||||
/// destroy the actor and the status of the actor will change to `DEAD`. The GCS client
|
||||
/// fetch actor info from the GCS server, and the status of the actor may be
|
||||
/// `DEPENDENCIES_UNREADY` or `DEAD`, so we do not assert the actor status here any
|
||||
/// more.
|
||||
|
||||
// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
|
||||
// client will register the actor again. When an actor is registered, the status in GCS
|
||||
// is `DEPENDENCIES_UNREADY`. When GCS finds that the owner of an actor is nil, it will
|
||||
// destroy the actor and the status of the actor will change to `DEAD`. The GCS client
|
||||
// fetch actor info from the GCS server, and the status of the actor may be
|
||||
// `DEPENDENCIES_UNREADY` or `DEAD`, so we do not assert the actor status here any
|
||||
// more.
|
||||
// If the status of the actor is `DEPENDENCIES_UNREADY`, we will fetch two records, so
|
||||
// `num_subscribe_all_notifications` will be 4. If the status of the actor is `DEAD`, we
|
||||
// will fetch one record, so `num_subscribe_all_notifications` will be 3.
|
||||
auto condition = [&num_subscribe_all_notifications]() {
|
||||
return num_subscribe_all_notifications == 3 || num_subscribe_all_notifications == 4;
|
||||
};
|
||||
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
|
||||
}
|
||||
|
||||
TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) {
|
||||
|
@ -1346,14 +1369,16 @@ TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) {
|
|||
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
|
||||
}
|
||||
|
||||
// Get all actors.
|
||||
// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
|
||||
// client will register the actor again and the status of the actor may be
|
||||
// `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors.
|
||||
auto condition = [this]() {
|
||||
return GetAllActors().size() ==
|
||||
return GetAllActors(true).size() ==
|
||||
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
|
||||
};
|
||||
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
|
||||
|
||||
auto actors = GetAllActors();
|
||||
auto actors = GetAllActors(true);
|
||||
for (const auto &actor : actors) {
|
||||
EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue