Fix c++ gcs test bug (#15063)

* fix ut bug

* fix bug

Co-authored-by: 灵洵 <fengbin.ffb@antgroup.com>
This commit is contained in:
fangfengbin 2021-04-02 00:19:24 +08:00 committed by GitHub
parent 005cff0092
commit 18728b2b7e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 11 deletions

View file

@ -1246,10 +1246,10 @@ TEST_F(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) {
std::vector<std::unique_ptr<std::thread>> threads;
threads.resize(size);
// The number of times each thread executes subscribe & resubscribe & unsubscribe.
// The number of times each thread executes subscribe & unsubscribe.
int sub_and_unsub_loop_count = 20;
// Multithreading subscribe/resubscribe/unsubscribe actors.
// Multithreading subscribe/unsubscribe actors.
auto job_id = JobID::FromInt(1);
for (int index = 0; index < size; ++index) {
threads[index].reset(new std::thread([this, sub_and_unsub_loop_count, job_id] {
@ -1257,7 +1257,6 @@ TEST_F(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) {
auto actor_id = ActorID::Of(job_id, RandomTaskId(), 0);
ASSERT_TRUE(SubscribeActor(
actor_id, [](const ActorID &id, const rpc::ActorTableData &result) {}));
gcs_client_->Actors().AsyncResubscribe(false);
UnsubscribeActor(actor_id);
}
}));
@ -1267,7 +1266,7 @@ TEST_F(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) {
thread.reset();
}
// Multithreading subscribe/resubscribe/unsubscribe objects.
// Multithreading subscribe/unsubscribe objects.
for (int index = 0; index < size; ++index) {
threads[index].reset(new std::thread([this, sub_and_unsub_loop_count] {
for (int index = 0; index < sub_and_unsub_loop_count; ++index) {
@ -1275,7 +1274,6 @@ TEST_F(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) {
ASSERT_TRUE(SubscribeToLocations(
object_id, [](const ObjectID &id,
const std::vector<rpc::ObjectLocationChange> &result) {}));
gcs_client_->Objects().AsyncResubscribe(false);
UnsubscribeToLocations(object_id);
}
}));
@ -1319,16 +1317,17 @@ TEST_F(ServiceBasedGcsClientTest, DISABLED_TestGetActorPerf) {
TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) {
// Register actors and the actors will be destroyed.
JobID job_id = JobID::FromInt(1);
absl::flat_hash_set<ActorID> actor_ids;
int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}
// Restart GCS.
RestartGcsServer();
absl::flat_hash_set<ActorID> actor_ids;
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);

View file

@ -996,14 +996,16 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill,
void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor) {
if (destroyed_actors_.size() >=
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) {
const auto &actor_id = sorted_destroyed_actor_list_.begin()->first;
const auto &actor_id = sorted_destroyed_actor_list_.front().first;
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Delete(actor_id, nullptr));
destroyed_actors_.erase(actor_id);
sorted_destroyed_actor_list_.erase(sorted_destroyed_actor_list_.begin());
sorted_destroyed_actor_list_.pop_front();
}
if (destroyed_actors_.emplace(actor->GetActorID(), actor).second) {
sorted_destroyed_actor_list_.emplace_back(
actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp());
}
destroyed_actors_.emplace(actor->GetActorID(), actor);
sorted_destroyed_actor_list_.emplace_back(
actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp());
}
void GcsActorManager::CancelActorInScheduling(const std::shared_ptr<GcsActor> &actor,