[runtime env] Enable reference counting for URIs for actors (#20165)

architkulkarni 2021-11-10 10:52:03 -08:00 committed by GitHub
parent 790e22f9ad
commit 923131ba37
5 changed files with 62 additions and 26 deletions


@@ -124,5 +124,35 @@ def test_detached_actor_gc(start_cluster, field, spec_format, tmp_path):
     wait_for_condition(lambda: check_local_files_gced(cluster), timeout=30)
 
 
+@pytest.mark.skipif(
+    os.environ.get("CI") and sys.platform != "linux",
+    reason="Requires PR wheels built in CI, so only run on linux CI machines.")
+@pytest.mark.parametrize("field", ["conda", "pip"])
+@pytest.mark.parametrize("spec_format", ["file", "python_object"])
+def test_actor_level_gc(start_cluster, field, spec_format, tmp_path):
+    """Tests that actor-level working_dir is GC'd when the actor exits."""
+    cluster, address = start_cluster
+    ray.init(address)
+
+    runtime_env = generate_runtime_env_dict(field, spec_format, tmp_path)
+
+    @ray.remote
+    class A:
+        def test_import(self):
+            import pip_install_test  # noqa: F401
+            return True
+
+    NUM_ACTORS = 5
+    actors = [
+        A.options(runtime_env=runtime_env).remote() for _ in range(NUM_ACTORS)
+    ]
+    ray.get([a.test_import.remote() for a in actors])
+    for i in range(5):
+        assert not check_local_files_gced(cluster)
+        ray.kill(actors[i])
+
+    wait_for_condition(lambda: check_local_files_gced(cluster))
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
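For context on what is being reference-counted in the new test above, the runtime_env dicts produced by generate_runtime_env_dict plausibly look like the sketch below. This is an illustrative guess, not the helper's actual output: the shapes follow Ray's documented pip/conda runtime_env fields, and the package name is inferred from the import pip_install_test check in the test body.

# Illustrative sketch only: plausible runtime_env dicts for the
# "python_object" spec_format. The real values come from
# generate_runtime_env_dict in Ray's test utilities.
pip_env = {"pip": ["pip-install-test"]}

conda_env = {
    "conda": {"dependencies": ["pip", {"pip": ["pip-install-test"]}]}
}

# The "file" spec_format presumably passes the same specs as paths to a
# requirements.txt / environment.yml written under tmp_path.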


@@ -314,7 +314,10 @@ def test_s3_uri(start_cluster, option, per_task_actor):
     "source", [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
 def test_multi_node(start_cluster, option: str, source: str):
     """Tests that the working_dir is propagated across multi-node clusters."""
-    NUM_NODES = 3
+    # TODO(architkulkarni): Currently all nodes in cluster_utils share the same
+    # session directory, which isn't the case for real world clusters. Once
+    # this is fixed, we should test GC with NUM_NODES > 1 here.
+    NUM_NODES = 1
     cluster, address = start_cluster
     for _ in range(NUM_NODES - 1):  # Head node already added.
         cluster.add_node(num_cpus=1)
@@ -363,7 +366,10 @@ def check_local_files_gced(cluster):
     "source", [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
 def test_job_level_gc(start_cluster, option: str, source: str):
     """Tests that job-level working_dir is GC'd when the job exits."""
-    NUM_NODES = 3
+    # TODO(architkulkarni): Currently all nodes in cluster_utils share the same
+    # session directory, which isn't the case for real world clusters. Once
+    # this is fixed, we should test GC with NUM_NODES > 1 here.
+    NUM_NODES = 1
     cluster, address = start_cluster
     for _ in range(NUM_NODES - 1):  # Head node already added.
         cluster.add_node(num_cpus=1)
@@ -407,13 +413,14 @@ def test_job_level_gc(start_cluster, option: str, source: str):
     wait_for_condition(lambda: check_local_files_gced(cluster))
 
 
-# TODO(edoakes): fix this bug and enable test.
-@pytest.mark.skip("Currently failing.")
 @pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
 @pytest.mark.parametrize("option", ["working_dir", "py_modules"])
 def test_actor_level_gc(start_cluster, option: str):
     """Tests that actor-level working_dir is GC'd when the actor exits."""
-    NUM_NODES = 3
+    # TODO(architkulkarni): Currently all nodes in cluster_utils share the same
+    # session directory, which isn't the case for real world clusters. Once
+    # this is fixed, we should test GC with NUM_NODES > 1 here.
+    NUM_NODES = 1
     cluster, address = start_cluster
     for _ in range(NUM_NODES - 1):  # Head node already added.
         cluster.add_node(num_cpus=1)
@@ -423,20 +430,20 @@ def test_actor_level_gc(start_cluster, option: str):
 
     @ray.remote
     class A:
         def check(self):
             assert "test_module" in os.listdir()
             import test_module
             test_module.one()
 
     if option == "working_dir":
         A = A.options(runtime_env={"working_dir": S3_PACKAGE_URI})
     else:
         A = A.options(runtime_env={"py_modules": [S3_PACKAGE_URI]})
 
-    actors = [A.remote() for _ in range(5)]
+    NUM_ACTORS = 5
+    actors = [A.remote() for _ in range(NUM_ACTORS)]
     ray.get([a.check.remote() for a in actors])
 
-    assert not check_local_files_gced(cluster)
-    [ray.kill(a) for a in actors]
+    for i in range(5):
+        assert not check_local_files_gced(cluster)
+        ray.kill(actors[i])
 
     wait_for_condition(lambda: check_local_files_gced(cluster))


@@ -23,9 +23,9 @@ namespace ray {
 /// here. There are two places where runtime env need be managed
 /// 1) central storage, like GCS or global KV storage
 /// 2) local node, where runtime env is fetched
-/// We only track the job and detached actor for runtime env. In summary,
-/// runtime env will be cleaned up when there is no job or detached actor is
-/// using it. The resouce is tracked in URI level. User need to provider
+/// We only track references from jobs and actors for runtime env. In summary,
+/// runtime env will be cleaned up when there is no job or actor is
+/// using it. The resource is tracked at the URI level. User needs to provide
 /// a delete handler.
 class RuntimeEnvManager {
  public:
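The updated header comment above is the entire contract: references are tracked per URI, and the manager invokes a caller-supplied delete handler once a URI's count drops to zero. A minimal Python sketch of that scheme, with illustrative names rather than the real C++ API:

class UriRefCounter:
    """Minimal sketch of per-URI reference counting (not Ray's API)."""

    def __init__(self, delete_handler):
        self._counts = {}         # URI -> number of live references
        self._uris_by_owner = {}  # owner hex id (job or actor) -> its URIs
        self._delete_handler = delete_handler

    def add_uri_reference(self, owner_id, uris):
        for uri in uris:
            self._counts[uri] = self._counts.get(uri, 0) + 1
        self._uris_by_owner.setdefault(owner_id, []).extend(uris)

    def remove_uri_reference(self, owner_id):
        # Drop every reference held by this owner; delete URIs that hit zero.
        for uri in self._uris_by_owner.pop(owner_id, []):
            self._counts[uri] -= 1
            if self._counts[uri] == 0:
                del self._counts[uri]
                self._delete_handler(uri)

This mirrors the AddURIReference/RemoveURIReference pairs in the GCS and raylet hunks below: owners are job or actor IDs in hex, and the delete handler is where local files or the central store get cleaned up.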


@@ -397,12 +397,11 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
     // This actor is owned. Send a long polling request to the actor's
     // owner to determine when the actor should be removed.
     PollOwnerForActorOutOfScope(actor);
-  } else {
-    // If it's a detached actor, we need to register the runtime env it used to GC.
-    runtime_env_manager_.AddURIReference(actor->GetActorID().Hex(),
-                                         request.task_spec().runtime_env());
   }
+  runtime_env_manager_.AddURIReference(actor->GetActorID().Hex(),
+                                       request.task_spec().runtime_env());
 
   // The backend storage is supposed to be reliable, so the status must be ok.
   RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
       actor->GetActorID(), *actor->GetMutableActorTableData(),
@@ -628,11 +627,12 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
   // Clean up the client to the actor's owner, if necessary.
   if (!actor->IsDetached()) {
     RemoveActorFromOwner(actor);
-  } else {
-    runtime_env_manager_.RemoveURIReference(actor->GetActorID().Hex());
   }
+  runtime_env_manager_.RemoveURIReference(actor->GetActorID().Hex());
+
   RemoveActorNameFromRegistry(actor);
 
   // The actor is already dead, most likely due to process or node failure.
   if (actor->GetState() == rpc::ActorTableData::DEAD) {
     RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << "has been dead,"
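The two GCS hunks above carry the behavioral change of this commit: AddURIReference and RemoveURIReference no longer branch on IsDetached(), so every actor, not just detached ones, pins its runtime env URIs from registration to destruction. A condensed before/after sketch in Python pseudocode (hypothetical names, following the C++ above):

# Before: only detached actors held a URI reference; envs used by
# non-detached actors were covered only by job-level tracking.
def register_actor_old(actor, runtime_env_manager):
    if actor.is_detached:
        runtime_env_manager.add_uri_reference(actor.actor_id_hex,
                                              actor.runtime_env_uris)

# After: every actor holds a reference while alive and releases it in
# DestroyActor, so an env can be GC'd as soon as the last actor using it
# exits, without waiting for job teardown (what test_actor_level_gc asserts).
def register_actor_new(actor, runtime_env_manager):
    runtime_env_manager.add_uri_reference(actor.actor_id_hex,
                                          actor.runtime_env_uris)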


@@ -1256,9 +1256,7 @@ void NodeManager::DisconnectClient(
       cluster_task_manager_->TaskFinished(worker, &task);
     }
 
-    if (worker->IsDetachedActor()) {
-      runtime_env_manager_.RemoveURIReference(actor_id.Hex());
-    }
+    runtime_env_manager_.RemoveURIReference(actor_id.Hex());
 
     if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
       // Push the error to driver.
@@ -1950,9 +1948,10 @@ void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker,
     auto job_id = task.GetTaskSpecification().JobId();
     auto job_config = worker_pool_.GetJobConfig(job_id);
     RAY_CHECK(job_config);
-    runtime_env_manager_.AddURIReference(actor_id.Hex(),
-                                         task.GetTaskSpecification().RuntimeEnv());
   }
+
+  runtime_env_manager_.AddURIReference(actor_id.Hex(),
+                                       task.GetTaskSpecification().RuntimeEnv());
 }
 
 void NodeManager::HandleObjectLocal(const ObjectInfo &object_info) {