diff --git a/python/ray/actor.py b/python/ray/actor.py index 1785621c3..7967d17de 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import cloudpickle as pickle +import copy import hashlib import inspect import json @@ -314,17 +315,25 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): self._actor_method_invokers[k] = ActorMethod( self, k, self._ray_method_signatures[k]) - # Export the actor class if it has not been exported yet. - if len(exported) == 0: - export_actor_class(class_id, Class, - self._ray_actor_methods.keys(), - checkpoint_interval, - ray.worker.global_worker) - exported.append(0) - # Export the actor. - export_actor(self._ray_actor_id, class_id, - self._ray_actor_methods.keys(), num_cpus, num_gpus, - ray.worker.global_worker) + # Do not export the actor class or the actor if run in PYTHON_MODE + # Instead, instantiate the actor locally and add it to + # global_worker's dictionary + if ray.worker.global_worker.mode == ray.PYTHON_MODE: + ray.worker.global_worker.actors[self._ray_actor_id] = ( + Class.__new__(Class)) + else: + # Export the actor class if it has not been exported yet. + if len(exported) == 0: + export_actor_class(class_id, Class, + self._ray_actor_methods.keys(), + checkpoint_interval, + ray.worker.global_worker) + exported.append(0) + # Export the actor. + export_actor(self._ray_actor_id, class_id, + self._ray_actor_methods.keys(), num_cpus, + num_gpus, ray.worker.global_worker) + # Call __init__ as a remote function. if "__init__" in self._ray_actor_methods.keys(): self._actor_method_call( @@ -340,6 +349,14 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): ray.worker.check_connected() ray.worker.check_main_thread() args = signature.extend_args(function_signature, args, kwargs) + + # Execute functions locally if Ray is run in PYTHON_MODE + # Copy args to prevent the function from mutating them. + if ray.worker.global_worker.mode == ray.PYTHON_MODE: + return getattr( + ray.worker.global_worker.actors[self._ray_actor_id], + attr)(*copy.deepcopy(args)) + # Add the current actor cursor, a dummy object returned by the most # recent method invocation, as a dependency for the next method # invocation. diff --git a/python/ray/worker.py b/python/ray/worker.py index 81c355fc7..a24f1b9af 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -2067,6 +2067,12 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): check_connected(worker) with log_span("ray:wait", worker=worker): check_main_thread() + + # When Ray is run in PYTHON_MODE, all functions are run immediately, + # so all objects in object_id are ready. + if worker.mode == PYTHON_MODE: + return object_ids[:num_returns], object_ids[num_returns:] + object_id_strs = [plasma.ObjectID(object_id.id()) for object_id in object_ids] timeout = timeout if timeout is not None else 2 ** 30 diff --git a/test/runtest.py b/test/runtest.py index 361416e60..684832524 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -907,6 +907,44 @@ class PythonModeTest(unittest.TestCase): assert_equal(aref, np.array([0, 0])) assert_equal(bref, np.array([1, 0])) + # wait should return the first num_returns values passed in as the + # first list and the remaining values as the second list + num_returns = 5 + object_ids = [ray.put(i) for i in range(20)] + ready, remaining = ray.wait(object_ids, num_returns=num_returns, + timeout=None) + assert_equal(ready, object_ids[:num_returns]) + assert_equal(remaining, object_ids[num_returns:]) + + # Test actors in PYTHON_MODE. + + @ray.remote + class PythonModeTestClass(object): + def __init__(self, array): + self.array = array + + def set_array(self, array): + self.array = array + + def get_array(self): + return self.array + + def modify_and_set_array(self, array): + array[0] = -1 + self.array = array + + test_actor = PythonModeTestClass.remote(np.arange(10)) + # Remote actor functions should return by value + assert_equal(test_actor.get_array.remote(), np.arange(10)) + + test_array = np.arange(10) + # Remote actor functions should not mutate arguments + test_actor.modify_and_set_array.remote(test_array) + assert_equal(test_array, np.arange(10)) + # Remote actor functions should keep state + test_array[0] = -1 + assert_equal(test_array, test_actor.get_array.remote()) + ray.worker.cleanup()