Release GPU resources as soon as an actor exits. (#1088)

* Release GPU resources as soon as an actor exits.

* Add a test.

* Store local_scheduler_id and driver_id in the worker object instead of the actor object.
Robert Nishihara 2017-10-06 17:58:19 -07:00 committed by Philipp Moritz
parent aebe9f9374
commit 4669c59fa8
3 changed files with 95 additions and 4 deletions
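The diff below revolves around the "gpus_in_use" bookkeeping that each local scheduler's entry in the primary Redis shard carries: a JSON dict mapping driver IDs to the number of GPUs that driver has in use on that local scheduler. The following is a minimal illustrative sketch of that layout, not part of the commit; the hash key, the driver ID, and the local Redis connection are assumptions, and only the "gpus_in_use" field name and its JSON shape are taken from the diff below.

```python
# Illustrative sketch of the bookkeeping this commit manipulates. The key
# and driver ID are made up; only the "gpus_in_use" field name and its
# JSON-dict-of-counts shape come from the diff. Assumes a local Redis server.
import json

import redis

client = redis.StrictRedis(host="127.0.0.1", port=6379)
local_scheduler_key = "example_local_scheduler_id"  # hypothetical key

# Record that a driver currently holds one GPU on this local scheduler.
client.hset(local_scheduler_key, "gpus_in_use",
            json.dumps({"example_driver_id": 1}))

# Releasing a GPU (what release_gpus_in_use does, minus the WATCH/MULTI
# retry loop shown later in the diff) amounts to decrementing that count.
gpus_in_use = json.loads(
    client.hget(local_scheduler_key, "gpus_in_use").decode("ascii"))
gpus_in_use["example_driver_id"] -= 1
client.hset(local_scheduler_key, "gpus_in_use", json.dumps(gpus_in_use))
```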

View file

@@ -12,8 +12,8 @@ import traceback
 import ray.local_scheduler
 import ray.signature as signature
 import ray.worker
-from ray.utils import (FunctionProperties, random_string,
-                       select_local_scheduler)
+from ray.utils import (binary_to_hex, FunctionProperties, random_string,
+                       release_gpus_in_use, select_local_scheduler)


 def random_actor_id():
@@ -112,7 +112,6 @@ def fetch_and_register_actor(actor_class_key, worker):
     try:
         unpickled_class = pickle.loads(pickled_class)
-        worker.actor_class = unpickled_class
     except Exception:
         # If an exception was thrown when the actor was imported, we record the
         # traceback and notify the scheduler of the failure.
@@ -120,6 +120,9 @@ def fetch_and_register_actor(actor_class_key, worker):
         # Log the error message.
         worker.push_error_to_driver(driver_id, "register_actor", traceback_str,
                                     data={"actor_id": actor_id_str})
+        # TODO(rkn): In the future, it might make sense to have the worker exit
+        # here. However, currently that would lead to hanging if someone calls
+        # ray.get on a method invoked on the actor.
     else:
         # TODO(pcm): Why is the below line necessary?
         unpickled_class.__module__ = module
@@ -133,6 +133,13 @@ def fetch_and_register_actor(actor_class_key, worker):
         # because we currently do need the actor worker to submit new tasks
         # for the actor.
+
+        # Store some extra information that will be used when the actor exits
+        # to release GPU resources.
+        worker.driver_id = binary_to_hex(driver_id)
+        local_scheduler_id = worker.redis_client.hget(
+            b"Actor:" + actor_id_str, "local_scheduler_id")
+        worker.local_scheduler_id = binary_to_hex(local_scheduler_id)


 def export_actor_class(class_id, Class, actor_method_names,
                        checkpoint_interval, worker):
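For context, a rough sketch of the lookup the new lines above perform: the actor's entry in the primary Redis shard (a hash keyed by b"Actor:" plus the binary actor ID) records which local scheduler the actor was assigned to, and binary_to_hex converts the raw binary ID into the hex string used elsewhere. The helper below is illustrative rather than the code in ray.utils; the Redis connection and the inlined hex conversion are assumptions, while the key prefix and field name come from the diff above.

```python
# Illustrative only: look up the local scheduler recorded for an actor.
# The b"Actor:" key prefix and the "local_scheduler_id" field come from the
# diff above; the Redis connection and the hex helper are assumptions.
import binascii

import redis

redis_client = redis.StrictRedis(host="127.0.0.1", port=6379)


def binary_to_hex(identifier):
    """Hex-encode a binary ID (mirrors what ray.utils provides)."""
    return binascii.hexlify(identifier).decode()


def local_scheduler_for_actor(actor_id):
    """Return the hex local scheduler ID stored for a binary actor ID."""
    raw = redis_client.hget(b"Actor:" + actor_id, "local_scheduler_id")
    return None if raw is None else binary_to_hex(raw)
```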
@@ -214,7 +214,14 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
             # remove the actor key from Redis here.
             ray.worker.global_worker.redis_client.hset(b"Actor:" + actor_id,
                                                        "removed", True)
-            # Disconnect the worker from he local scheduler. The point of this
+            # Release the GPUs that this worker was using.
+            if len(ray.get_gpu_ids()) > 0:
+                release_gpus_in_use(
+                    ray.worker.global_worker.driver_id,
+                    ray.worker.global_worker.local_scheduler_id,
+                    ray.get_gpu_ids(),
+                    ray.worker.global_worker.redis_client)
+            # Disconnect the worker from the local scheduler. The point of this
             # is so that when the worker kills itself below, the local
             # scheduler won't push an error message to the driver.
             ray.worker.global_worker.local_scheduler_client.disconnect()
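Since the exit path above keys off ray.get_gpu_ids(), here is a hedged usage sketch of how an actor typically consumes the GPU IDs it was allocated. The actor class and the CUDA_VISIBLE_DEVICES handling below are illustrative, not part of this commit; only ray.get_gpu_ids() and the @ray.remote(num_gpus=1) class decorator appear in the diff itself.

```python
# Illustrative GPU actor: ray.get_gpu_ids() returns the GPU IDs reserved
# for this worker; restricting CUDA_VISIBLE_DEVICES to them is a common
# user-side pattern, not something this commit adds.
import os

import ray


@ray.remote(num_gpus=1)
class GPUActor(object):
    def __init__(self):
        gpu_ids = ray.get_gpu_ids()
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in gpu_ids)

    def gpu_ids(self):
        return ray.get_gpu_ids()


# Example usage (assumes a Ray cluster with at least one GPU):
# ray.init(num_gpus=1)
# actor = GPUActor.remote()
# print(ray.get(actor.gpu_ids.remote()))
```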

View file

@@ -136,6 +136,54 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler,
     return success


+def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client):
+    """Release the GPUs that a given worker was using.
+
+    Note that this does not affect the local scheduler's bookkeeping. It only
+    affects the GPU allocations which are recorded in the primary Redis shard,
+    which are redundant with the local scheduler bookkeeping.
+
+    Args:
+        driver_id: The ID of the driver that is releasing some GPUs.
+        local_scheduler_id: The ID of the local scheduler that owns the GPUs
+            being released.
+        gpu_ids: The IDs of the GPUs being released.
+        redis_client: A client for the primary Redis shard.
+    """
+    # Attempt to release GPU IDs atomically.
+    with redis_client.pipeline() as pipe:
+        while True:
+            try:
+                # If this key is changed before the transaction below (the
+                # multi/exec block), then the transaction will not take place.
+                pipe.watch(local_scheduler_id)
+
+                # Figure out which GPUs are currently in use.
+                result = redis_client.hget(local_scheduler_id, "gpus_in_use")
+                gpus_in_use = dict() if result is None else json.loads(
+                    result.decode("ascii"))
+                assert driver_id in gpus_in_use
+                assert gpus_in_use[driver_id] >= len(gpu_ids)
+                gpus_in_use[driver_id] -= len(gpu_ids)
+
+                pipe.multi()
+                pipe.hset(local_scheduler_id, "gpus_in_use",
+                          json.dumps(gpus_in_use))
+                pipe.execute()
+                # If a WatchError is not raised, then the operations should
+                # have gone through atomically.
+                break
+            except redis.WatchError:
+                # Another client must have changed the watched key between the
+                # time we started WATCHing it and the pipeline's execution. We
+                # should just retry.
+                continue


 def select_local_scheduler(driver_id, local_schedulers, num_gpus,
                            redis_client):
     """Select a local scheduler to assign this actor to.

View file

@@ -315,6 +315,33 @@ class ActorMethods(unittest.TestCase):
         ray.worker.cleanup()

+    def testActorDeletionWithGPUs(self):
+        ray.init(num_workers=0, num_gpus=1)
+
+        # When an actor that uses a GPU exits, make sure that the GPU resources
+        # are released.
+        @ray.remote(num_gpus=1)
+        class Actor(object):
+            def getpid(self):
+                return os.getpid()
+
+        for _ in range(5):
+            # If we can successfully create an actor, that means that enough
+            # GPU resources are available.
+            a = Actor.remote()
+            pid = ray.get(a.getpid.remote())
+
+            # Make sure that we can't create another actor.
+            with self.assertRaises(Exception):
+                Actor.remote()
+
+            # Let the actor go out of scope, and wait for it to exit.
+            a = None
+            ray.test.test_utils.wait_for_pid_to_exit(pid)
+
+        ray.worker.cleanup()
+
     def testActorState(self):
         ray.init()
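The test above uses ray.test.test_utils.wait_for_pid_to_exit to confirm that the actor's worker process actually died before creating the next actor. A hedged sketch of what such a helper can look like is shown below; the real helper lives in ray.test.test_utils and its signature, timeout, and error handling may differ.

```python
# Hedged sketch of a wait_for_pid_to_exit-style helper; the version in
# ray.test.test_utils may differ in signature and timeout behavior.
import errno
import os
import time


def wait_for_pid_to_exit(pid, timeout=20):
    """Poll until the process with the given PID no longer exists."""
    start = time.time()
    while time.time() - start < timeout:
        try:
            os.kill(pid, 0)  # Signal 0 only checks that the process exists.
        except OSError as e:
            if e.errno == errno.ESRCH:
                return  # The process no longer exists.
            raise
        time.sleep(0.1)
    raise RuntimeError(
        "Process {} did not exit within {} seconds.".format(pid, timeout))
```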