[GCS]Random eviction of destroyed actors cached in GCS (#11189)

* add part code

* fix lint error

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin 2020-10-10 02:54:47 +08:00 committed by GitHub
parent ca36105d77
commit 3eb2b9e216
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 3 deletions

View file

@ -232,6 +232,8 @@ RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200)
RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200)
/// Duration to wait between retries for creating placement group in gcs server.
RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_interval_ms, 200)
/// Maximum number of destroyed actors in GCS server memory cache.
RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 10000)
/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.

View file

@ -28,7 +28,8 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
public:
ServiceBasedGcsClientTest() {
RayConfig::instance().initialize(
{{"ping_gcs_rpc_server_max_retries", std::to_string(60)}});
{{"ping_gcs_rpc_server_max_retries", std::to_string(60)},
{"maximum_gcs_destroyed_actor_cached_count", std::to_string(10)}});
TestSetupUtil::StartUpRedisServers(std::vector<int>());
}
@ -1310,6 +1311,23 @@ TEST_F(ServiceBasedGcsClientTest, DISABLED_TestGetActorPerf) {
<< actor_count << " actors.";
}
TEST_F(ServiceBasedGcsClientTest, TestRandomEvictDestroyedActors) {
// Register actors and the actors will be destroyed.
JobID job_id = JobID::FromInt(1);
int actor_count = 20;
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
}
// Get all actors.
auto condition = [this]() {
return GetAllActors().size() ==
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
// TODO(sang): Add tests after adding asyncAdd
} // namespace ray

View file

@ -562,7 +562,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_CHECK(it != registered_actors_.end())
<< "Tried to destroy actor that does not exist " << actor_id;
it->second->GetMutableActorTableData()->mutable_task_spec()->Clear();
destroyed_actors_.emplace(it->first, it->second);
AddDestroyedActorToCache(it->second);
const auto actor = std::move(it->second);
registered_actors_.erase(it);
@ -954,7 +954,7 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID());
}
} else {
destroyed_actors_.emplace(item.first, actor);
AddDestroyedActorToCache(actor);
}
}
@ -1098,5 +1098,13 @@ void GcsActorManager::KillActor(const std::shared_ptr<GcsActor> &actor) {
RAY_UNUSED(actor_client->KillActor(request, nullptr));
}
void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor) {
if (destroyed_actors_.size() >=
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) {
destroyed_actors_.erase(destroyed_actors_.begin());
}
destroyed_actors_.emplace(actor->GetActorID(), actor);
}
} // namespace gcs
} // namespace ray

View file

@ -353,6 +353,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param actor The actor to be killed.
void KillActor(const std::shared_ptr<GcsActor> &actor);
/// Add the destroyed actor to the cache. If the cache is full, one actor is randomly
/// evicted.
///
/// \param actor The actor to be killed.
void AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor);
/// Callbacks of pending `RegisterActor` requests.
/// Maps actor ID to actor registration callbacks, which is used to filter duplicated
/// messages from a driver/worker caused by some network problems.