diff --git a/python/ray/actor.py b/python/ray/actor.py
index 36845216b..ab0fbf5c5 100644
--- a/python/ray/actor.py
+++ b/python/ray/actor.py
@@ -12,8 +12,8 @@ import traceback
 import ray.local_scheduler
 import ray.signature as signature
 import ray.worker
-from ray.utils import (FunctionProperties, random_string,
-                       select_local_scheduler)
+from ray.utils import (binary_to_hex, FunctionProperties, random_string,
+                       release_gpus_in_use, select_local_scheduler)
 
 
 def random_actor_id():
@@ -112,7 +112,6 @@ def fetch_and_register_actor(actor_class_key, worker):
 
     try:
         unpickled_class = pickle.loads(pickled_class)
-        worker.actor_class = unpickled_class
     except Exception:
         # If an exception was thrown when the actor was imported, we record the
         # traceback and notify the scheduler of the failure.
@@ -120,6 +119,9 @@
         # Log the error message.
         worker.push_error_to_driver(driver_id, "register_actor", traceback_str,
                                     data={"actor_id": actor_id_str})
+        # TODO(rkn): In the future, it might make sense to have the worker exit
+        # here. However, currently that would lead to hanging if someone calls
+        # ray.get on a method invoked on the actor.
     else:
         # TODO(pcm): Why is the below line necessary?
         unpickled_class.__module__ = module
@@ -133,6 +135,13 @@
         # because we currently do need the actor worker to submit new tasks
         # for the actor.
 
+    # Store some extra information that will be used when the actor exits
+    # to release GPU resources.
+    worker.driver_id = binary_to_hex(driver_id)
+    local_scheduler_id = worker.redis_client.hget(
+        b"Actor:" + actor_id_str, "local_scheduler_id")
+    worker.local_scheduler_id = binary_to_hex(local_scheduler_id)
+
 
 def export_actor_class(class_id, Class, actor_method_names,
                        checkpoint_interval, worker):
@@ -214,7 +223,14 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
             # remove the actor key from Redis here.
             ray.worker.global_worker.redis_client.hset(b"Actor:" + actor_id,
                                                        "removed", True)
-            # Disconnect the worker from he local scheduler. The point of this
+            # Release the GPUs that this worker was using.
+            if len(ray.get_gpu_ids()) > 0:
+                release_gpus_in_use(
+                    ray.worker.global_worker.driver_id,
+                    ray.worker.global_worker.local_scheduler_id,
+                    ray.get_gpu_ids(),
+                    ray.worker.global_worker.redis_client)
+            # Disconnect the worker from the local scheduler. The point of this
             # is so that when the worker kills itself below, the local
             # scheduler won't push an error message to the driver.
             ray.worker.global_worker.local_scheduler_client.disconnect()
diff --git a/python/ray/utils.py b/python/ray/utils.py
index 2a5bbb4e3..e6e9ff697 100644
--- a/python/ray/utils.py
+++ b/python/ray/utils.py
@@ -136,6 +136,54 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler,
     return success
 
 
+def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client):
+    """Release the GPUs that a given worker was using.
+
+    Note that this does not affect the local scheduler's bookkeeping. It only
+    affects the GPU allocations which are recorded in the primary Redis shard,
+    which are redundant with the local scheduler bookkeeping.
+
+    Args:
+        driver_id: The ID of the driver that is releasing some GPUs.
+        local_scheduler_id: The ID of the local scheduler that owns the GPUs
+            being released.
+        gpu_ids: The IDs of the GPUs being released.
+        redis_client: A client for the primary Redis shard.
+    """
+    # Attempt to release GPU IDs atomically.
+    with redis_client.pipeline() as pipe:
+        while True:
+            try:
+                # If this key is changed before the transaction below (the
+                # multi/exec block), then the transaction will not take place.
+                pipe.watch(local_scheduler_id)
+
+                # Figure out which GPUs are currently in use.
+                result = redis_client.hget(local_scheduler_id, "gpus_in_use")
+                gpus_in_use = dict() if result is None else json.loads(
+                    result.decode("ascii"))
+
+                assert driver_id in gpus_in_use
+                assert gpus_in_use[driver_id] >= len(gpu_ids)
+
+                gpus_in_use[driver_id] -= len(gpu_ids)
+
+                pipe.multi()
+
+                pipe.hset(local_scheduler_id, "gpus_in_use",
+                          json.dumps(gpus_in_use))
+
+                pipe.execute()
+                # If a WatchError is not raised, then the operations should
+                # have gone through atomically.
+                break
+            except redis.WatchError:
+                # Another client must have changed the watched key between the
+                # time we started WATCHing it and the pipeline's execution. We
+                # should just retry.
+                continue
+
+
 def select_local_scheduler(driver_id, local_schedulers, num_gpus,
                            redis_client):
     """Select a local scheduler to assign this actor to.
diff --git a/test/actor_test.py b/test/actor_test.py
index df4a1ae5a..37d674b57 100644
--- a/test/actor_test.py
+++ b/test/actor_test.py
@@ -315,6 +315,33 @@ class ActorMethods(unittest.TestCase):
 
         ray.worker.cleanup()
 
+    def testActorDeletionWithGPUs(self):
+        ray.init(num_workers=0, num_gpus=1)
+
+        # When an actor that uses a GPU exits, make sure that the GPU resources
+        # are released.
+
+        @ray.remote(num_gpus=1)
+        class Actor(object):
+            def getpid(self):
+                return os.getpid()
+
+        for _ in range(5):
+            # If we can successfully create an actor, that means that enough
+            # GPU resources are available.
+            a = Actor.remote()
+            pid = ray.get(a.getpid.remote())
+
+            # Make sure that we can't create another actor.
+            with self.assertRaises(Exception):
+                Actor.remote()
+
+            # Let the actor go out of scope, and wait for it to exit.
+            a = None
+            ray.test.test_utils.wait_for_pid_to_exit(pid)
+
+        ray.worker.cleanup()
+
     def testActorState(self):
         ray.init()
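
Note on the `release_gpus_in_use` helper added in `python/ray/utils.py`: it relies on redis-py's optimistic locking (WATCH/MULTI/EXEC) so that the read-modify-write of the `gpus_in_use` hash cannot race with another client. The following is a minimal standalone sketch of that same retry loop, decrementing an integer field of a Redis hash; the function name `decrement_hash_field` and the `gpu_counts`/`driver_0` key and field are made up for illustration and are not part of this change.

```python
import redis


def decrement_hash_field(redis_client, key, field, amount):
    """Atomically decrement an integer field of a Redis hash.

    Illustrative sketch of the WATCH/MULTI/EXEC retry loop used by
    release_gpus_in_use: read the current value while watching the key,
    queue the write, and start over if another client touched the key
    before EXEC.
    """
    with redis_client.pipeline() as pipe:
        while True:
            try:
                # If `key` changes before execute(), the transaction is
                # discarded and execute() raises redis.WatchError.
                pipe.watch(key)
                # After watch(), the pipeline runs commands immediately
                # instead of buffering them, so this read sees the current
                # value.
                current = pipe.hget(key, field)
                new_value = (0 if current is None else int(current)) - amount
                # Switch back to buffered (transactional) mode.
                pipe.multi()
                pipe.hset(key, field, new_value)
                pipe.execute()
                return new_value
            except redis.WatchError:
                # Another client modified `key`; retry the whole
                # read-modify-write cycle.
                continue


if __name__ == "__main__":
    # Assumes a local Redis server; key and field names are hypothetical.
    client = redis.StrictRedis(host="localhost", port=6379)
    client.hset("gpu_counts", "driver_0", 4)
    print(decrement_hash_field(client, "gpu_counts", "driver_0", 1))  # 3
```

The essential behavior is that `execute()` raises `redis.WatchError` if the watched key changed after `watch()` was called, so the loop simply re-reads and retries until the transaction goes through.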