[core] RuntimeEnv GC in gcs (#14833)

This commit is contained in:
Yi Cheng 2021-05-06 09:31:33 -07:00 committed by GitHub
parent 513b0ed796
commit d5379ba99e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 146 additions and 39 deletions

View file

@ -8,6 +8,7 @@ from pathlib import Path
import ray
from ray.test_utils import (run_string_as_driver,
run_string_as_driver_nonblocking)
import ray.experimental.internal_kv as kv
from time import sleep
driver_script = """
from time import sleep
@ -257,6 +258,7 @@ def test_single_node(ray_start_cluster_head, working_dir, client_mode):
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000"
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@ -272,6 +274,7 @@ def test_two_node(two_node_cluster, working_dir, client_mode):
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000"
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@ -307,6 +310,7 @@ print(sum([int(v) for v in vals]))
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000"
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@ -448,6 +452,9 @@ def test_two_node_uri(two_node_cluster, working_dir, client_mode):
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000"
assert len(list(Path(PKG_DIR).iterdir())) == 1
# pinned uri will not be deleted
print(list(kv._internal_kv_list("")))
assert len(kv._internal_kv_list("pingcs://")) == 1
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@ -465,6 +472,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000)))
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000"
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@ -482,6 +490,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000)))
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000"
# It's a detached actor, so it should still be there
assert len(kv._internal_kv_list("gcs://")) == 1
assert len(list(Path(PKG_DIR).iterdir())) == 2
pkg_dir = [f for f in Path(PKG_DIR).glob("*") if f.is_dir()][0]
import sys
@ -492,6 +501,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000)))
from time import sleep
sleep(5)
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")

View file

@ -2,19 +2,35 @@
#include "ray/util/logging.h"
namespace ray {
void RuntimeEnvManager::AddUriReference(const std::string &hex_id,
void RuntimeEnvManager::AddURIReference(const std::string &hex_id,
const rpc::RuntimeEnv &runtime_env) {
const auto &uris = runtime_env.uris();
for (const auto &uri : uris) {
if (unused_uris_.count(uri)) {
unused_uris_.erase(uri);
}
uri_reference_[uri]++;
id_to_uris_[hex_id].push_back(uri);
AddURIReference(hex_id, uri);
}
}
void RuntimeEnvManager::RemoveUriReference(const std::string &hex_id) {
/// Record that `hex_id` holds a reference to `uri`.
///
/// The URI is removed from the unused set if it was there, its reference
/// count is incremented, and it is appended to the URI list kept for `hex_id`.
///
/// \param[in] hex_id The id holding the reference.
/// \param[in] uri The URI being referenced.
void RuntimeEnvManager::AddURIReference(const std::string &hex_id,
                                        const std::string &uri) {
  // Erasing by key does nothing when the URI is not in the set, so no
  // membership check is needed first.
  unused_uris_.erase(uri);
  ++uri_reference_[uri];
  auto &uris_of_id = id_to_uris_[hex_id];
  uris_of_id.push_back(uri);
}
/// Look up the URIs recorded for `hex_id`.
///
/// \param[in] hex_id The id to look up.
/// \return The URIs recorded for the id, or a reference to a shared empty
/// vector when the id has no entry in `id_to_uris_`.
const std::vector<std::string> &RuntimeEnvManager::GetReferences(
    const std::string &hex_id) const {
  // Handed out by reference for unknown ids, hence static storage.
  static const std::vector<std::string> kNoUris;
  const auto found = id_to_uris_.find(hex_id);
  if (found == id_to_uris_.end()) {
    return kNoUris;
  }
  return found->second;
}
void RuntimeEnvManager::RemoveURIReference(const std::string &hex_id) {
if (!id_to_uris_.count(hex_id)) {
return;
}
for (const auto &uri : id_to_uris_[hex_id]) {
--uri_reference_[uri];
auto ref_count = uri_reference_[uri];

View file

@ -24,30 +24,42 @@ namespace ray {
/// 2) local node, where runtime env is fetched
/// We only track the job and detached actor for runtime env. In summary,
/// runtime env will be cleaned up when there is no job or detached actor is
/// using it. The resouce is tracked in uri level. User need to provider
/// using it. The resource is tracked at the URI level. Users need to provide
/// a delete handler.
class RuntimeEnvManager {
public:
using DeleteFunc = std::function<void(const std::string &, std::function<void(bool)>)>;
explicit RuntimeEnvManager(DeleteFunc deleter) : deleter_(deleter) {}
/// Increase the reference of uri by job_id and runtime_env.
/// Increase the reference of URI by job_id and runtime_env.
///
/// \param[in] hex_id The id of the runtime env. It can be an actor or job id.
/// \param[in] runtime_env The runtime env used by the id.
void AddUriReference(const std::string &hex_id, const rpc::RuntimeEnv &runtime_env);
void AddURIReference(const std::string &hex_id, const rpc::RuntimeEnv &runtime_env);
/// Decrease the reference of uri by job_id
/// Increase the reference of a single URI by id.
///
/// \param[in] hex_id The id of the runtime env. It can be an actor or job id.
/// \param[in] uri The URI referenced by the id.
void AddURIReference(const std::string &hex_id, const std::string &uri);
/// Get the reference of URIs by id.
///
/// \param[in] hex_id The id to look up.
/// \return The URIs referenced by the id.
const std::vector<std::string> &GetReferences(const std::string &hex_id) const;
/// Decrease the reference of URI by job_id
/// \param[in] hex_id The id of the runtime env.
void RemoveUriReference(const std::string &hex_id);
void RemoveURIReference(const std::string &hex_id);
private:
DeleteFunc deleter_;
/// Reference counting of a uri.
/// Reference counting of a URI.
std::unordered_map<std::string, int64_t> uri_reference_;
/// A map between hex_id and uri.
/// A map between hex_id and URI.
std::unordered_map<std::string, std::vector<std::string>> id_to_uris_;
/// A set of unused uris
/// A set of unused URIs
std::unordered_set<std::string> unused_uris_;
};
} // namespace ray

View file

@ -87,14 +87,15 @@ rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_
GcsActorManager::GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub, RuntimeEnvManager &runtime_env_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
worker_client_factory_(worker_client_factory),
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed) {
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed),
runtime_env_manager_(runtime_env_manager) {
RAY_CHECK(worker_client_factory_);
RAY_CHECK(destroy_owned_placement_group_if_needed_);
}
@ -284,6 +285,14 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
// This actor is owned. Send a long polling request to the actor's
// owner to determine when the actor should be removed.
PollOwnerForActorOutOfScope(actor);
} else {
// If it's a detached actor, we need to register the runtime env it used to GC
auto job_id = JobID::FromBinary(request.task_spec().job_id());
const auto &uris = runtime_env_manager_.GetReferences(job_id.Hex());
auto actor_id_hex = actor->GetActorID().Hex();
for (const auto &uri : uris) {
runtime_env_manager_.AddURIReference(actor_id_hex, uri);
}
}
// The backend storage is supposed to be reliable, so the status must be ok.
@ -445,11 +454,14 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms());
AddDestroyedActorToCache(it->second);
const auto actor = std::move(it->second);
registered_actors_.erase(it);
// Clean up the client to the actor's owner, if necessary.
if (!actor->IsDetached()) {
RemoveActorFromOwner(actor);
} else {
runtime_env_manager_.RemoveURIReference(actor->GetActorID().Hex());
}
// Remove actor from `named_actors_` if its name is not empty.

View file

@ -18,6 +18,7 @@
#include "absl/container/flat_hash_map.h"
#include "ray/common/id.h"
#include "ray/common/runtime_env_manager.h"
#include "ray/common/task/task_execution_spec.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
@ -164,7 +165,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub, RuntimeEnvManager &runtime_env_manager,
std::function<void(const ActorID &)> destroy_ownded_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
@ -442,6 +443,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// actor destroy process.
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed_;
RuntimeEnvManager &runtime_env_manager_;
// Debug info.
enum CountType {
REGISTER_ACTOR_REQUEST = 0,

View file

@ -33,6 +33,10 @@ void GcsJobManager::HandleAddJob(const rpc::AddJobRequest &request,
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(),
request.data().SerializeAsString(), nullptr));
if (request.data().config().has_runtime_env()) {
runtime_env_manager_.AddURIReference(job_id.Hex(),
request.data().config().runtime_env());
}
RAY_LOG(INFO) << "Finished adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
}
@ -59,6 +63,7 @@ void GcsJobManager::HandleMarkJobFinished(const rpc::MarkJobFinishedRequest &req
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(),
job_table_data->SerializeAsString(), nullptr));
runtime_env_manager_.RemoveURIReference(job_id.Hex());
ClearJobInfos(job_id);
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
}

View file

@ -14,6 +14,7 @@
#pragma once
#include "ray/common/runtime_env_manager.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
@ -26,9 +27,11 @@ namespace gcs {
class GcsJobManager : public rpc::JobInfoHandler {
public:
explicit GcsJobManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub)
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
RuntimeEnvManager &runtime_env_manager)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)) {}
gcs_pub_sub_(std::move(gcs_pub_sub)),
runtime_env_manager_(runtime_env_manager) {}
void HandleAddJob(const rpc::AddJobRequest &request, rpc::AddJobReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
@ -54,7 +57,7 @@ class GcsJobManager : public rpc::JobInfoHandler {
/// Listeners which monitors the finish of jobs.
std::vector<std::function<void(std::shared_ptr<JobID>)>> job_finished_listeners_;
ray::RuntimeEnvManager &runtime_env_manager_;
void ClearJobInfos(const JobID &job_id);
};

View file

@ -34,12 +34,17 @@ void GcsInternalKVManager::HandleInternalKVPut(
void GcsInternalKVManager::HandleInternalKVDel(
const rpc::InternalKVDelRequest &request, rpc::InternalKVDelReply *reply,
rpc::SendReplyCallback send_reply_callback) {
std::vector<std::string> cmd = {"HDEL", request.key(), "value"};
InternalKVDelAsync(request.key(), [reply, send_reply_callback](int deleted_num) {
reply->set_deleted_num(deleted_num);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
});
}
void GcsInternalKVManager::InternalKVDelAsync(const std::string &key,
std::function<void(int)> cb) {
std::vector<std::string> cmd = {"HDEL", key, "value"};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [reply, send_reply_callback](auto redis_reply) {
reply->set_deleted_num(redis_reply->ReadAsInteger());
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}));
cmd, [cb](auto redis_reply) { cb(redis_reply->ReadAsInteger()); }));
}
void GcsInternalKVManager::HandleInternalKVExists(

View file

@ -39,6 +39,8 @@ class GcsInternalKVManager : public rpc::InternalKVHandler {
rpc::InternalKVDelReply *reply,
rpc::SendReplyCallback send_reply_callback);
void InternalKVDelAsync(const std::string &key, std::function<void(int)> cb);
void HandleInternalKVExists(const rpc::InternalKVExistsRequest &request,
rpc::InternalKVExistsReply *reply,
rpc::SendReplyCallback send_reply_callback);

View file

@ -80,6 +80,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs heartbeat manager.
InitGcsHeartbeatManager(gcs_init_data);
// Init KV Manager
InitKVManager();
// Init RuntimeEnv manager
InitRuntimeEnvManager();
// Init gcs job manager.
InitGcsJobManager();
@ -101,9 +107,6 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init stats handler.
InitStatsHandler();
// Init KV Manager
InitKVManager();
// Init resource report polling.
InitResourceReportPolling(gcs_init_data);
@ -206,9 +209,11 @@ void GcsServer::InitGcsResourceScheduler() {
void GcsServer::InitGcsJobManager() {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
gcs_job_manager_ = std::make_unique<GcsJobManager>(gcs_table_storage_, gcs_pub_sub_,
*runtime_env_manager_);
// Register service.
job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_));
job_info_service_ =
std::make_unique<rpc::JobInfoGrpcService>(main_service_, *gcs_job_manager_);
rpc_server_.RegisterService(*job_info_service_);
}
@ -236,7 +241,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_actor_manager_ = std::make_shared<GcsActorManager>(
scheduler, gcs_table_storage_, gcs_pub_sub_,
scheduler, gcs_table_storage_, gcs_pub_sub_, *runtime_env_manager_,
[this](const ActorID &actor_id) {
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
},
@ -347,6 +352,32 @@ void GcsServer::InitKVManager() {
rpc_server_.RegisterService(*kv_service_);
}
void GcsServer::InitRuntimeEnvManager() {
runtime_env_manager_ =
std::make_unique<RuntimeEnvManager>([this](const std::string &uri, auto cb) {
std::string sep = "://";
auto pos = uri.find(sep);
if (pos == std::string::npos || pos + sep.size() == uri.size()) {
RAY_LOG(ERROR) << "Invalid uri: " << uri;
cb(false);
} else {
auto scheme = uri.substr(0, pos);
if (scheme != "gcs") {
// Skip other uri
cb(true);
} else {
this->kv_manager_->InternalKVDelAsync(uri, [cb](int deleted_num) {
if (deleted_num == 0) {
cb(false);
} else {
cb(true);
}
});
}
}
});
}
void GcsServer::InitGcsWorkerManager() {
gcs_worker_manager_ =
std::make_unique<GcsWorkerManager>(gcs_table_storage_, gcs_pub_sub_);

View file

@ -15,6 +15,7 @@
#pragma once
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/runtime_env_manager.h"
#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_kv_manager.h"
@ -120,6 +121,9 @@ class GcsServer {
/// Initialize KV manager.
void InitKVManager();
/// Initialize RuntimeEnv manager.
void InitRuntimeEnvManager();
/// Initialize resource report polling.
void InitResourceReportPolling(const GcsInitData &gcs_init_data);
@ -212,6 +216,7 @@ class GcsServer {
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// The gcs table storage.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::unique_ptr<ray::RuntimeEnvManager> runtime_env_manager_;
/// Gcs service state flag, which is used for ut.
bool is_started_ = false;
bool is_stopped_ = false;

View file

@ -95,12 +95,13 @@ class GcsActorManagerTest : public ::testing::Test {
}));
promise.get_future().get();
worker_client_ = std::make_shared<MockWorkerClient>(io_service_);
runtime_env_mgr_ =
std::make_unique<ray::RuntimeEnvManager>([](auto, auto f) { f(true); });
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_actor_manager_.reset(new gcs::GcsActorManager(
mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_,
mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_, *runtime_env_mgr_,
[](const ActorID &actor_id) {},
[this](const rpc::Address &addr) { return worker_client_; }));
}
@ -185,7 +186,7 @@ class GcsActorManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::unique_ptr<ray::RuntimeEnvManager> runtime_env_mgr_;
const std::chrono::milliseconds timeout_ms_{2000};
};

View file

@ -149,6 +149,9 @@ message ActorTableData {
// The exception thrown in creation task. This field is set if this actor died because
// of exception thrown in creation task. Only applies when state=DEAD.
RayException creation_task_exception = 18;
// Runtime required to run this actor
// It'll only be set if it's a detached actor and the original job has this field
RuntimeEnv runtime_env = 19;
}
message ErrorTableData {

View file

@ -480,7 +480,7 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_
RAY_CHECK(!job_data.is_dead());
worker_pool_.HandleJobStarted(job_id, job_data.config());
runtime_env_manager_.AddUriReference(job_id.Hex(), job_data.config().runtime_env());
runtime_env_manager_.AddURIReference(job_id.Hex(), job_data.config().runtime_env());
// Tasks of this job may already arrived but failed to pop a worker because the job
// config is not local yet. So we trigger dispatching again here to try to
// reschedule these tasks.
@ -506,7 +506,7 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
KillWorker(worker);
}
}
runtime_env_manager_.RemoveUriReference(job_id.Hex());
runtime_env_manager_.RemoveURIReference(job_id.Hex());
}
void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
@ -1206,7 +1206,7 @@ void NodeManager::DisconnectClient(
}
if (worker->IsDetachedActor()) {
runtime_env_manager_.RemoveUriReference(actor_id.Hex());
runtime_env_manager_.RemoveURIReference(actor_id.Hex());
}
if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
@ -1795,7 +1795,7 @@ void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker,
auto job_id = task.GetTaskSpecification().JobId();
auto job_config = worker_pool_.GetJobConfig(job_id);
RAY_CHECK(job_config);
runtime_env_manager_.AddUriReference(actor_id.Hex(), job_config->runtime_env());
runtime_env_manager_.AddURIReference(actor_id.Hex(), job_config->runtime_env());
;
}
}