diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 1ceba5b82..174e01e90 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -8,6 +8,7 @@ from pathlib import Path import ray from ray.test_utils import (run_string_as_driver, run_string_as_driver_nonblocking) +import ray.experimental.internal_kv as kv from time import sleep driver_script = """ from time import sleep @@ -257,6 +258,7 @@ def test_single_node(ray_start_cluster_head, working_dir, client_mode): out = run_string_as_driver(script, env) assert out.strip().split()[-1] == "1000" assert len(list(Path(PKG_DIR).iterdir())) == 1 + assert len(kv._internal_kv_list("gcs://")) == 0 @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") @@ -272,6 +274,7 @@ def test_two_node(two_node_cluster, working_dir, client_mode): out = run_string_as_driver(script, env) assert out.strip().split()[-1] == "1000" assert len(list(Path(PKG_DIR).iterdir())) == 1 + assert len(kv._internal_kv_list("gcs://")) == 0 @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") @@ -307,6 +310,7 @@ print(sum([int(v) for v in vals])) out = run_string_as_driver(script, env) assert out.strip().split()[-1] == "1000" assert len(list(Path(PKG_DIR).iterdir())) == 1 + assert len(kv._internal_kv_list("gcs://")) == 0 @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") @@ -448,6 +452,9 @@ def test_two_node_uri(two_node_cluster, working_dir, client_mode): out = run_string_as_driver(script, env) assert out.strip().split()[-1] == "1000" assert len(list(Path(PKG_DIR).iterdir())) == 1 + # pinned uri will not be deleted + print(list(kv._internal_kv_list(""))) + assert len(kv._internal_kv_list("pingcs://")) == 1 @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") @@ -465,6 +472,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000))) out = run_string_as_driver(script, env) assert out.strip().split()[-1] == "1000" assert len(list(Path(PKG_DIR).iterdir())) == 1 + assert len(kv._internal_kv_list("gcs://")) == 0 @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") @@ -482,6 +490,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000))) out = run_string_as_driver(script, env) assert out.strip().split()[-1] == "1000" # It's a detached actors, so it should still be there + assert len(kv._internal_kv_list("gcs://")) == 1 assert len(list(Path(PKG_DIR).iterdir())) == 2 pkg_dir = [f for f in Path(PKG_DIR).glob("*") if f.is_dir()][0] import sys @@ -492,6 +501,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000))) from time import sleep sleep(5) assert len(list(Path(PKG_DIR).iterdir())) == 1 + assert len(kv._internal_kv_list("gcs://")) == 0 @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") diff --git a/src/ray/common/runtime_env_manager.cc b/src/ray/common/runtime_env_manager.cc index fdd81d472..658e9c47b 100644 --- a/src/ray/common/runtime_env_manager.cc +++ b/src/ray/common/runtime_env_manager.cc @@ -2,19 +2,35 @@ #include "ray/util/logging.h" namespace ray { -void RuntimeEnvManager::AddUriReference(const std::string &hex_id, +void RuntimeEnvManager::AddURIReference(const std::string &hex_id, const rpc::RuntimeEnv &runtime_env) { const auto &uris = runtime_env.uris(); for (const auto &uri : uris) { - if (unused_uris_.count(uri)) { - unused_uris_.erase(uri); - } - uri_reference_[uri]++; - id_to_uris_[hex_id].push_back(uri); + AddURIReference(hex_id, uri); } } -void RuntimeEnvManager::RemoveUriReference(const std::string &hex_id) { +void RuntimeEnvManager::AddURIReference(const std::string &hex_id, + const std::string &uri) { + if (unused_uris_.count(uri)) { + unused_uris_.erase(uri); + } + uri_reference_[uri]++; + id_to_uris_[hex_id].push_back(uri); +} + +const std::vector &RuntimeEnvManager::GetReferences( + const std::string &hex_id) const { + static const std::vector _default; + auto it = id_to_uris_.find(hex_id); + return it == id_to_uris_.end() ? _default : it->second; +} + +void RuntimeEnvManager::RemoveURIReference(const std::string &hex_id) { + if (!id_to_uris_.count(hex_id)) { + return; + } + for (const auto &uri : id_to_uris_[hex_id]) { --uri_reference_[uri]; auto ref_count = uri_reference_[uri]; diff --git a/src/ray/common/runtime_env_manager.h b/src/ray/common/runtime_env_manager.h index 59d0a536c..510aa5fe5 100644 --- a/src/ray/common/runtime_env_manager.h +++ b/src/ray/common/runtime_env_manager.h @@ -24,30 +24,42 @@ namespace ray { /// 2) local node, where runtime env is fetched /// We only track the job and detached actor for runtime env. In summary, /// runtime env will be cleaned up when there is no job or detached actor is -/// using it. The resouce is tracked in uri level. User need to provider +/// using it. The resouce is tracked in URI level. User need to provider /// a delete handler. class RuntimeEnvManager { public: using DeleteFunc = std::function)>; explicit RuntimeEnvManager(DeleteFunc deleter) : deleter_(deleter) {} - /// Increase the reference of uri by job_id and runtime_env. + /// Increase the reference of URI by job_id and runtime_env. /// /// \param[in] hex_id The id of the runtime env. It can be an actor or job id. /// \param[in] runtime_env The runtime env used by the id. - void AddUriReference(const std::string &hex_id, const rpc::RuntimeEnv &runtime_env); + void AddURIReference(const std::string &hex_id, const rpc::RuntimeEnv &runtime_env); - /// Decrease the reference of uri by job_id + /// Increase the reference of URI by URI and runtime_env. + /// + /// \param[in] hex_id The id of the runtime env. It can be an actor or job id. + /// \param[in] uri The URI referenced by the id. + void AddURIReference(const std::string &hex_id, const std::string &uri); + + /// Get the reference of URIs by id. + /// + /// \param[in] hex_id The id of to look. + /// \return The URIs referenced by the id. + const std::vector &GetReferences(const std::string &hex_id) const; + + /// Decrease the reference of URI by job_id /// \param[in] hex_id The id of the runtime env. - void RemoveUriReference(const std::string &hex_id); + void RemoveURIReference(const std::string &hex_id); private: DeleteFunc deleter_; - /// Reference counting of a uri. + /// Reference counting of a URI. std::unordered_map uri_reference_; - /// A map between hex_id and uri. + /// A map between hex_id and URI. std::unordered_map> id_to_uris_; - /// A set of unused uris + /// A set of unused URIs std::unordered_set unused_uris_; }; } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index a225ccafd..956527648 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -87,14 +87,15 @@ rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_ GcsActorManager::GcsActorManager( std::shared_ptr scheduler, std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, + std::shared_ptr gcs_pub_sub, RuntimeEnvManager &runtime_env_manager, std::function destroy_owned_placement_group_if_needed, const rpc::ClientFactoryFn &worker_client_factory) : gcs_actor_scheduler_(std::move(scheduler)), gcs_table_storage_(std::move(gcs_table_storage)), gcs_pub_sub_(std::move(gcs_pub_sub)), worker_client_factory_(worker_client_factory), - destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed) { + destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed), + runtime_env_manager_(runtime_env_manager) { RAY_CHECK(worker_client_factory_); RAY_CHECK(destroy_owned_placement_group_if_needed_); } @@ -284,6 +285,14 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ // This actor is owned. Send a long polling request to the actor's // owner to determine when the actor should be removed. PollOwnerForActorOutOfScope(actor); + } else { + // If it's a detached actor, we need to register the runtime env it used to GC + auto job_id = JobID::FromBinary(request.task_spec().job_id()); + const auto &uris = runtime_env_manager_.GetReferences(job_id.Hex()); + auto actor_id_hex = actor->GetActorID().Hex(); + for (const auto &uri : uris) { + runtime_env_manager_.AddURIReference(actor_id_hex, uri); + } } // The backend storage is supposed to be reliable, so the status must be ok. @@ -445,11 +454,14 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms()); AddDestroyedActorToCache(it->second); const auto actor = std::move(it->second); + registered_actors_.erase(it); // Clean up the client to the actor's owner, if necessary. if (!actor->IsDetached()) { RemoveActorFromOwner(actor); + } else { + runtime_env_manager_.RemoveURIReference(actor->GetActorID().Hex()); } // Remove actor from `named_actors_` if its name is not empty. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 78d0908d1..770aed0ca 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -18,6 +18,7 @@ #include "absl/container/flat_hash_map.h" #include "ray/common/id.h" +#include "ray/common/runtime_env_manager.h" #include "ray/common/task/task_execution_spec.h" #include "ray/common/task/task_spec.h" #include "ray/gcs/gcs_server/gcs_actor_scheduler.h" @@ -164,7 +165,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { GcsActorManager( std::shared_ptr scheduler, std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, + std::shared_ptr gcs_pub_sub, RuntimeEnvManager &runtime_env_manager, std::function destroy_ownded_placement_group_if_needed, const rpc::ClientFactoryFn &worker_client_factory = nullptr); @@ -442,6 +443,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// actor destroy process. std::function destroy_owned_placement_group_if_needed_; + RuntimeEnvManager &runtime_env_manager_; // Debug info. enum CountType { REGISTER_ACTOR_REQUEST = 0, diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index c9ab057a0..6275b52a4 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -33,6 +33,10 @@ void GcsJobManager::HandleAddJob(const rpc::AddJobRequest &request, } else { RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(), request.data().SerializeAsString(), nullptr)); + if (request.data().config().has_runtime_env()) { + runtime_env_manager_.AddURIReference(job_id.Hex(), + request.data().config().runtime_env()); + } RAY_LOG(INFO) << "Finished adding job, job id = " << job_id << ", driver pid = " << request.data().driver_pid(); } @@ -59,6 +63,7 @@ void GcsJobManager::HandleMarkJobFinished(const rpc::MarkJobFinishedRequest &req } else { RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(), job_table_data->SerializeAsString(), nullptr)); + runtime_env_manager_.RemoveURIReference(job_id.Hex()); ClearJobInfos(job_id); RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id; } diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.h b/src/ray/gcs/gcs_server/gcs_job_manager.h index 31ba0f9c8..23e1d5d2c 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.h +++ b/src/ray/gcs/gcs_server/gcs_job_manager.h @@ -14,6 +14,7 @@ #pragma once +#include "ray/common/runtime_env_manager.h" #include "ray/gcs/gcs_server/gcs_object_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" @@ -26,9 +27,11 @@ namespace gcs { class GcsJobManager : public rpc::JobInfoHandler { public: explicit GcsJobManager(std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub) + std::shared_ptr gcs_pub_sub, + RuntimeEnvManager &runtime_env_manager) : gcs_table_storage_(std::move(gcs_table_storage)), - gcs_pub_sub_(std::move(gcs_pub_sub)) {} + gcs_pub_sub_(std::move(gcs_pub_sub)), + runtime_env_manager_(runtime_env_manager) {} void HandleAddJob(const rpc::AddJobRequest &request, rpc::AddJobReply *reply, rpc::SendReplyCallback send_reply_callback) override; @@ -54,7 +57,7 @@ class GcsJobManager : public rpc::JobInfoHandler { /// Listeners which monitors the finish of jobs. std::vector)>> job_finished_listeners_; - + ray::RuntimeEnvManager &runtime_env_manager_; void ClearJobInfos(const JobID &job_id); }; diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.cc b/src/ray/gcs/gcs_server/gcs_kv_manager.cc index f819f59cc..faf441543 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.cc @@ -34,12 +34,17 @@ void GcsInternalKVManager::HandleInternalKVPut( void GcsInternalKVManager::HandleInternalKVDel( const rpc::InternalKVDelRequest &request, rpc::InternalKVDelReply *reply, rpc::SendReplyCallback send_reply_callback) { - std::vector cmd = {"HDEL", request.key(), "value"}; + InternalKVDelAsync(request.key(), [reply, send_reply_callback](int deleted_num) { + reply->set_deleted_num(deleted_num); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }); +} + +void GcsInternalKVManager::InternalKVDelAsync(const std::string &key, + std::function cb) { + std::vector cmd = {"HDEL", key, "value"}; RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync( - cmd, [reply, send_reply_callback](auto redis_reply) { - reply->set_deleted_num(redis_reply->ReadAsInteger()); - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - })); + cmd, [cb](auto redis_reply) { cb(redis_reply->ReadAsInteger()); })); } void GcsInternalKVManager::HandleInternalKVExists( diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.h b/src/ray/gcs/gcs_server/gcs_kv_manager.h index bd8fb0d42..6205d0ecf 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.h +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.h @@ -39,6 +39,8 @@ class GcsInternalKVManager : public rpc::InternalKVHandler { rpc::InternalKVDelReply *reply, rpc::SendReplyCallback send_reply_callback); + void InternalKVDelAsync(const std::string &key, std::function cb); + void HandleInternalKVExists(const rpc::InternalKVExistsRequest &request, rpc::InternalKVExistsReply *reply, rpc::SendReplyCallback send_reply_callback); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index bfff62e32..0b1edf73f 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -80,6 +80,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Init gcs heartbeat manager. InitGcsHeartbeatManager(gcs_init_data); + // Init KV Manager + InitKVManager(); + + // Init RuntimeENv manager + InitRuntimeEnvManager(); + // Init gcs job manager. InitGcsJobManager(); @@ -101,9 +107,6 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Init stats handler. InitStatsHandler(); - // Init KV Manager - InitKVManager(); - // Init resource report polling. InitResourceReportPolling(gcs_init_data); @@ -206,9 +209,11 @@ void GcsServer::InitGcsResourceScheduler() { void GcsServer::InitGcsJobManager() { RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); - gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_)); + gcs_job_manager_ = std::make_unique(gcs_table_storage_, gcs_pub_sub_, + *runtime_env_manager_); // Register service. - job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_)); + job_info_service_ = + std::make_unique(main_service_, *gcs_job_manager_); rpc_server_.RegisterService(*job_info_service_); } @@ -236,7 +241,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { return std::make_shared(address, client_call_manager_); }); gcs_actor_manager_ = std::make_shared( - scheduler, gcs_table_storage_, gcs_pub_sub_, + scheduler, gcs_table_storage_, gcs_pub_sub_, *runtime_env_manager_, [this](const ActorID &actor_id) { gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id); }, @@ -347,6 +352,32 @@ void GcsServer::InitKVManager() { rpc_server_.RegisterService(*kv_service_); } +void GcsServer::InitRuntimeEnvManager() { + runtime_env_manager_ = + std::make_unique([this](const std::string &uri, auto cb) { + std::string sep = "://"; + auto pos = uri.find(sep); + if (pos == std::string::npos || pos + sep.size() == uri.size()) { + RAY_LOG(ERROR) << "Invalid uri: " << uri; + cb(false); + } else { + auto scheme = uri.substr(0, pos); + if (scheme != "gcs") { + // Skip other uri + cb(true); + } else { + this->kv_manager_->InternalKVDelAsync(uri, [cb](int deleted_num) { + if (deleted_num == 0) { + cb(false); + } else { + cb(true); + } + }); + } + } + }); +} + void GcsServer::InitGcsWorkerManager() { gcs_worker_manager_ = std::make_unique(gcs_table_storage_, gcs_pub_sub_); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 14b77060d..92c942aa7 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -15,6 +15,7 @@ #pragma once #include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/runtime_env_manager.h" #include "ray/gcs/gcs_server/gcs_heartbeat_manager.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_kv_manager.h" @@ -120,6 +121,9 @@ class GcsServer { /// Initialize KV manager. void InitKVManager(); + // Init RuntimeENv manager + void InitRuntimeEnvManager(); + /// Initialize resource report polling. void InitResourceReportPolling(const GcsInitData &gcs_init_data); @@ -212,6 +216,7 @@ class GcsServer { std::shared_ptr gcs_pub_sub_; /// The gcs table storage. std::shared_ptr gcs_table_storage_; + std::unique_ptr runtime_env_manager_; /// Gcs service state flag, which is used for ut. bool is_started_ = false; bool is_stopped_ = false; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index b0a0f7ff7..664783f40 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -95,12 +95,13 @@ class GcsActorManagerTest : public ::testing::Test { })); promise.get_future().get(); worker_client_ = std::make_shared(io_service_); - + runtime_env_mgr_ = + std::make_unique([](auto, auto f) { f(true); }); gcs_pub_sub_ = std::make_shared(redis_client_); store_client_ = std::make_shared(io_service_); gcs_table_storage_ = std::make_shared(io_service_); gcs_actor_manager_.reset(new gcs::GcsActorManager( - mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_, + mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_, *runtime_env_mgr_, [](const ActorID &actor_id) {}, [this](const rpc::Address &addr) { return worker_client_; })); } @@ -185,7 +186,7 @@ class GcsActorManagerTest : public ::testing::Test { std::unique_ptr gcs_actor_manager_; std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; - + std::unique_ptr runtime_env_mgr_; const std::chrono::milliseconds timeout_ms_{2000}; }; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index bff0b63e0..6f46b25db 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -149,6 +149,9 @@ message ActorTableData { // The exception thrown in creation task. This field is set if this actor died because // of exception thrown in creation task. Only applies when state=DEAD. RayException creation_task_exception = 18; + // Runtime required to run this actor + // It'll only be set if it's a detached actor and the original job has this field + RuntimeEnv runtime_env = 19; } message ErrorTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9738a8d36..5022b54bf 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -480,7 +480,7 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_ RAY_CHECK(!job_data.is_dead()); worker_pool_.HandleJobStarted(job_id, job_data.config()); - runtime_env_manager_.AddUriReference(job_id.Hex(), job_data.config().runtime_env()); + runtime_env_manager_.AddURIReference(job_id.Hex(), job_data.config().runtime_env()); // Tasks of this job may already arrived but failed to pop a worker because the job // config is not local yet. So we trigger dispatching again here to try to // reschedule these tasks. @@ -506,7 +506,7 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job KillWorker(worker); } } - runtime_env_manager_.RemoveUriReference(job_id.Hex()); + runtime_env_manager_.RemoveURIReference(job_id.Hex()); } void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) { @@ -1206,7 +1206,7 @@ void NodeManager::DisconnectClient( } if (worker->IsDetachedActor()) { - runtime_env_manager_.RemoveUriReference(actor_id.Hex()); + runtime_env_manager_.RemoveURIReference(actor_id.Hex()); } if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) { @@ -1795,7 +1795,7 @@ void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker, auto job_id = task.GetTaskSpecification().JobId(); auto job_config = worker_pool_.GetJobConfig(job_id); RAY_CHECK(job_config); - runtime_env_manager_.AddUriReference(actor_id.Hex(), job_config->runtime_env()); + runtime_env_manager_.AddURIReference(actor_id.Hex(), job_config->runtime_env()); ; } }