[core] Fix bugs in worker cleanup on driver exit (#17049)

* unit test

* cleanup test

* Don't kill workers when job finishes

* better test

* lint

* lint

* comment

* check
Stephanie Wang 2021-07-15 12:53:51 -07:00 committed by GitHub
parent 02f58a5c6b
commit bdaa96bf43
6 changed files with 99 additions and 35 deletions
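
The gist of the fix: when a driver exits, its workers are no longer killed eagerly in HandleJobFinished; instead they disconnect on their own, and the idle-worker culling path always reclaims workers belonging to finished jobs, even when the pool is under the soft limit. The new regression test below verifies this by scanning live processes for idle Ray workers. A minimal standalone sketch of that style of check (the "ray::IDLE" name marker, the timeout, and the local wait_for_condition helper are stand-ins I am assuming here, not taken from this diff):

import time

import psutil

# Assumed process-name marker for idle Ray workers; the real test reads it
# from ray.ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER.
IDLE_WORKER_NAME = "ray::IDLE"


def all_workers_exited():
    """Return True once no idle Ray worker processes remain on this node."""
    for proc in psutil.process_iter():
        try:
            if IDLE_WORKER_NAME in proc.name():
                return False
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return True


def wait_for_condition(condition, timeout_s=30, interval_s=0.5):
    """Minimal stand-in for ray.test_utils.wait_for_condition."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if condition():
            return
        time.sleep(interval_s)
    raise TimeoutError("Idle workers were not cleaned up in time.")


if __name__ == "__main__":
    # Run this after a Ray driver has exited; with the fix, any workers the
    # driver left behind (even ones started late) should eventually be reaped.
    wait_for_condition(all_workers_exited)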

@@ -7,7 +7,8 @@ import pytest
 from ray.cluster_utils import Cluster
 import ray.ray_constants as ray_constants
-from ray.test_utils import (init_error_pubsub, get_error_message)
+from ray.test_utils import (init_error_pubsub, get_error_message,
+                            run_string_as_driver)
 
 
 def test_fill_object_store_exception(shutdown_only):
@@ -79,5 +80,56 @@ def test_connect_with_disconnected_node(shutdown_only):
     p.close()
+
+
+def test_detached_actor_ref(call_ray_start):
+    address = call_ray_start
+
+    driver_script = """
+import ray
+import time
+
+
+@ray.remote
+def foo(x):
+    return ray.put(42)
+
+
+@ray.remote
+class Actor:
+    def __init__(self):
+        self.ref = None
+
+    def invoke(self):
+        self.ref = foo.remote(0)
+        # Wait for the task to finish before exiting the driver.
+        ray.get(self.ref)
+
+    def get(self):
+        print("get", self.ref)
+        return self.ref
+
+
+if __name__ == "__main__":
+    ray.init(address="{}", namespace="default")
+    a = Actor.options(name="holder", lifetime="detached").remote()
+    # Wait for the task to finish before exiting the driver.
+    ray.get(a.invoke.remote())
+    print("success")
+""".format(address)
+
+    out = run_string_as_driver(driver_script)
+    assert "success" in out
+
+    import time
+    time.sleep(5)
+
+    # connect to the cluster
+    ray.init(address=address, namespace="default")
+    actor = ray.get_actor("holder")
+    x = actor.get.remote()
+    while isinstance(x, ray.ObjectRef):
+        x = ray.get(x)
+    assert x == 42
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))

@@ -1,13 +1,15 @@
 import os
 import pytest
+import psutil
 import sys
 import time
 
 import ray
+from ray import ray_constants
 from ray.test_utils import (RayTestTimeoutException, run_string_as_driver,
                             run_string_as_driver_nonblocking,
                             init_error_pubsub, get_error_message,
-                            object_memory_usage)
+                            object_memory_usage, wait_for_condition)
 
 
 def test_error_isolation(call_ray_start):
@@ -183,6 +185,9 @@ import time
 import ray
 import numpy as np
 from ray.test_utils import object_memory_usage
+import os
 
 ray.init(address="{}")
 object_refs = [ray.put(np.zeros(200 * 1024, dtype=np.uint8))
                for i in range(1000)]
@@ -192,10 +197,20 @@ while time.time() - start_time < 30:
         break
 else:
     raise Exception("Objects did not appear in object table.")
+
+
+@ray.remote
+def f():
+    time.sleep(1)
+
+
 print("success")
+
+# Submit some tasks without waiting for them to finish. Their workers should
+# still get cleaned up eventually, even if they get started after the driver
+# exits.
+[f.remote() for _ in range(10)]
 """.format(address)
-    run_string_as_driver(driver_script)
+
+    out = run_string_as_driver(driver_script)
+    assert "success" in out
 
     # Make sure the objects are removed from the object table.
     start_time = time.time()
@@ -205,6 +220,15 @@ print("success")
     else:
         raise Exception("Objects were not all removed from object table.")
 
+    def all_workers_exited():
+        for proc in psutil.process_iter():
+            if ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER in proc.name():
+                return False
+        return True
+
+    # Check that workers are eventually cleaned up.
+    wait_for_condition(all_workers_exited)
+
 
 def test_drivers_named_actors(call_ray_start):
     # This test will create some drivers that submit some tasks to the same

@@ -234,7 +234,7 @@ ray.shutdown()
     # wait for a while to let workers register
     time.sleep(2)
-    wait_for_condition(lambda: len(get_workers()) == before)
+    wait_for_condition(lambda: len(get_workers()) <= before)
 
 
 def test_not_killing_workers_that_own_objects(shutdown_only):

@@ -538,21 +538,6 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
   RAY_LOG(DEBUG) << "HandleJobFinished " << job_id;
   RAY_CHECK(job_data.is_dead());
   worker_pool_.HandleJobFinished(job_id);
-
-  auto workers = worker_pool_.GetWorkersRunningTasksForJob(job_id);
-  // Kill all the workers. The actual cleanup for these workers is done
-  // later when we receive the DisconnectClient message from them.
-  for (const auto &worker : workers) {
-    if (!worker->IsDetachedActor()) {
-      // Clean up any open ray.wait calls that the worker made.
-      dependency_manager_.CancelWaitRequest(worker->WorkerId());
-      // Mark the worker as dead so further messages from it are ignored
-      // (except DisconnectClient).
-      worker->MarkDead();
-      // Then kill the worker process.
-      KillWorker(worker);
-    }
-  }
   runtime_env_manager_.RemoveURIReference(job_id.Hex());
 }
@@ -1243,19 +1228,9 @@ void NodeManager::DisconnectClient(
     }
   }
   RAY_CHECK(!(is_worker && is_driver));
-  // If the client has any blocked tasks, mark them as unblocked. In
-  // particular, we are no longer waiting for their dependencies.
-  if (is_worker && worker->IsDead()) {
-    // If the worker was killed by us because the driver exited,
-    // treat it as intentionally disconnected.
-    // Don't need to unblock the client if it's a worker and is already dead.
-    // Because in this case, its task is already cleaned up.
-    RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
-  } else {
-    // Clean up any open ray.wait calls that the worker made.
-    dependency_manager_.CancelGetRequest(worker->WorkerId());
-    dependency_manager_.CancelWaitRequest(worker->WorkerId());
-  }
+  // Clean up any open ray.get or ray.wait calls that the worker made.
+  dependency_manager_.CancelGetRequest(worker->WorkerId());
+  dependency_manager_.CancelWaitRequest(worker->WorkerId());
 
   // Erase any lease metadata.
   leased_workers_.erase(worker->WorkerId());

@@ -443,6 +443,7 @@ void WorkerPool::HandleJobFinished(const JobID &job_id) {
   // Currently we don't erase the job from `all_jobs_` , as a workaround for
   // https://github.com/ray-project/ray/issues/11437.
   // unfinished_jobs_.erase(job_id);
+  finished_jobs_.insert(job_id);
 }
 
 boost::optional<const rpc::JobConfig &> WorkerPool::GetJobConfig(
@@ -715,11 +716,16 @@ void WorkerPool::TryKillingIdleWorkers() {
   running_size -= pending_exit_idle_workers_.size();
 
   // Kill idle workers in FIFO order.
   for (const auto &idle_pair : idle_of_all_languages_) {
+    const auto &idle_worker = idle_pair.first;
+    const auto &job_id = idle_worker->GetAssignedJobId();
     if (running_size <= static_cast<size_t>(num_workers_soft_limit_)) {
-      break;
+      if (!finished_jobs_.count(job_id)) {
+        // Ignore the soft limit for jobs that have already finished, as we
+        // should always clean up these workers.
+        break;
+      }
     }
-    const auto &idle_worker = idle_pair.first;
     if (pending_exit_idle_workers_.count(idle_worker->WorkerId())) {
       // If the worker is pending exit, just skip it.
       continue;
@@ -768,7 +774,11 @@ void WorkerPool::TryKillingIdleWorkers() {
           static_cast<size_t>(num_workers_soft_limit_)) {
       // A Java worker process may contain multiple workers. Killing more workers than we
       // expect may slow the job.
-      return;
+      if (!finished_jobs_.count(job_id)) {
+        // Ignore the soft limit for jobs that have already finished, as we
+        // should always clean up these workers.
+        return;
+      }
     }
 
     for (const auto &worker : workers_in_the_same_process) {
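
To restate the control-flow change in this hunk and the one above it: the soft limit on live workers no longer protects idle workers whose job has already finished; culling still stops at the soft limit for live jobs, but workers of finished jobs are always reclaimed. A simplified Python model of that rule (the names and the FIFO list below are illustrative stand-ins for the C++ structures, not Ray APIs):

from dataclasses import dataclass


@dataclass
class IdleWorker:
    worker_id: str
    job_id: str


def workers_to_kill(idle_fifo, running_size, soft_limit, finished_jobs):
    """Scan idle workers in FIFO order and pick the ones to reclaim."""
    victims = []
    for worker in idle_fifo:
        if (running_size <= soft_limit
                and worker.job_id not in finished_jobs):
            # Under the soft limit and the job is still alive: stop culling
            # here (mirrors the `break` in TryKillingIdleWorkers).
            break
        victims.append(worker)
        running_size -= 1
    return victims


if __name__ == "__main__":
    idle = [IdleWorker("w1", "job_b"), IdleWorker("w2", "job_a")]
    # job_b's driver has exited, so w1 is reclaimed even though the pool is
    # already under the soft limit; w2 belongs to a live job and is kept warm.
    print(workers_to_kill(idle, running_size=2, soft_limit=10,
                          finished_jobs={"job_b"}))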

@@ -553,6 +553,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
   /// This map tracks the latest infos of unfinished jobs.
   absl::flat_hash_map<JobID, rpc::JobConfig> all_jobs_;
 
+  /// Set of jobs whose drivers have exited.
+  absl::flat_hash_set<JobID> finished_jobs_;
+
   /// This map stores the same data as `idle_of_all_languages_`, but in a map structure
   /// for lookup performance.
   std::unordered_map<std::shared_ptr<WorkerInterface>, int64_t>