Implement a first pass at actors in the API. (#242)

* Implement actor field for tasks

* Implement actor management in local scheduler.

* initial python frontend for actors

* import actors on worker

* IPython code completion and tests

* prepare creating actors through local schedulers

* add actor id to PyTask

* submit actor calls to local scheduler

* starting to integrate

* simple fix

* Fixes from rebasing.

* more work on python actors

* Improve local scheduler actor handlers.

* Pass actor ID to local scheduler when connecting a client.

* first working version of actors

* fixing actors

* fix creating two copies of the same actor

* fix actors

* remove sleep

* get rid of export synchronization

* update

* insert actor methods into the queue in the right order

* remove print statements

* make it compile again after rebase

* Minor updates.

* fix python actor ids

* Pass actor_id to start_worker.

* add test

* Minor changes.

* Update actor tests.

* Temporary plan for import counter.

* Temporarily fix import counters.

* Fix some tests.

* Fixes.

* Make actor creation non-blocking.

* Fix test?

* Fix actors on Python 2.

* fix rare case.

* Fix python 2 test.

* More tests.

* Small fixes.

* Linting.

* Revert tensorflow version to 0.12.0 temporarily.

* Small fix.

* Enhance inheritance test.
Authored by Philipp Moritz on 2017-02-15 00:10:05 -08:00; committed by Robert Nishihara.
parent 072eadd57f
commit 12a68e84d2
32 changed files with 1812 additions and 117 deletions
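For reference, the user-facing API that this commit introduces looks roughly as follows. This is a minimal sketch based on python/ray/actor.py below; the Counter class and the bare ray.init() are illustrative assumptions, not part of the commit:

import ray

ray.init()

@ray.actor
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

c = Counter()              # exports the class and schedules __init__ on a worker
object_id = c.increment()  # actor method calls return object IDs, like remote functions
print(ray.get(object_id))  # 1

Each method call is submitted as a task to the local scheduler responsible for the actor, so results are retrieved with ray.get just like remote function results.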


@ -74,6 +74,7 @@ script:
- python test/runtest.py
- python test/array_test.py
- python test/actor_test.py
- python test/tensorflow_test.py
- python test/failure_test.py
- python test/microbenchmarks.py


@ -20,7 +20,8 @@ fi
if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip
sudo pip install cloudpickle funcsigs colorama psutil redis tensorflow
sudo pip install cloudpickle funcsigs colorama psutil redis
sudo pip install tensorflow==0.12.0
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y cmake python-dev python-numpy build-essential autoconf curl libtool libboost-all-dev unzip
@ -28,7 +29,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow
pip install numpy cloudpickle funcsigs colorama psutil redis
pip install tensorflow==0.12.0
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
which -s brew
@ -41,7 +43,8 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
fi
brew install cmake automake autoconf libtool boost
sudo easy_install pip
sudo pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow --ignore-installed six
sudo pip install numpy cloudpickle funcsigs colorama psutil redis --ignore-installed six
sudo pip install tensorflow==0.12.0 --ignore-installed six
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
which -s brew
@ -57,7 +60,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
wget https://repo.continuum.io/miniconda/Miniconda3-latest-MacOSX-x86_64.sh -O miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow
pip install numpy cloudpickle funcsigs colorama psutil redis
pip install tensorflow==0.12.0
else
echo "Unrecognized environment."
exit 1


@ -23,6 +23,8 @@ PLASMA_STORE_MEMORY = 1000000000
ID_SIZE = 20
NUM_CLUSTER_NODES = 2
NIL_ACTOR_ID = 20 * b"\xff"
# These constants must match the scheduling state enum in task.h.
TASK_STATUS_WAITING = 1
TASK_STATUS_SCHEDULED = 2
@ -92,7 +94,7 @@ class TestGlobalScheduler(unittest.TestCase):
redis_address=redis_address,
static_resource_list=[10, 0])
# Connect to the scheduler.
photon_client = photon.PhotonClient(local_scheduler_name)
photon_client = photon.PhotonClient(local_scheduler_name, NIL_ACTOR_ID)
self.photon_clients.append(photon_client)
self.local_scheduler_pids.append(p4)
@ -149,7 +151,9 @@ class TestGlobalScheduler(unittest.TestCase):
def test_task_default_resources(self):
task1 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0)
self.assertEqual(task1.required_resources(), [1.0, 0.0])
task2 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, [1.0, 2.0])
task2 = photon.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(), 0,
photon.ObjectID(NIL_ACTOR_ID), 0, [1.0, 2.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0])
def test_redis_only_single_task(self):


@ -18,6 +18,8 @@ import plasma
USE_VALGRIND = False
ID_SIZE = 20
NIL_ACTOR_ID = 20 * b"\xff"
def random_object_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
@ -39,7 +41,7 @@ class TestPhotonClient(unittest.TestCase):
# Start a local scheduler.
scheduler_name, self.p2 = photon.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND)
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(scheduler_name)
self.photon_client = photon.PhotonClient(scheduler_name, NIL_ACTOR_ID)
def tearDown(self):
# Check that the processes are still alive.


@ -16,5 +16,6 @@ if hasattr(ctypes, "windll"):
import ray.experimental
import ray.serialization
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log
from ray.actor import actor
from ray.worker import EnvironmentVariable, env
from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE

python/ray/actor.py (new file, 141 lines)

@ -0,0 +1,141 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import hashlib
import inspect
import numpy as np
import photon
import random
import ray.pickling as pickling
import ray.worker
import ray.experimental.state as state
def random_string():
return np.random.bytes(20)
def random_actor_id():
return photon.ObjectID(random_string())
def get_actor_method_function_id(attr):
"""Get the function ID corresponding to an actor method.
Args:
attr (str): The attribute name of the method.
Returns:
Function ID corresponding to the method.
"""
function_id = hashlib.sha1()
function_id.update(attr.encode("ascii"))
return photon.ObjectID(function_id.digest())
def fetch_and_register_actor(key, worker):
"""Import an actor."""
driver_id, actor_id_str, actor_name, module, pickled_class, class_export_counter = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "class_export_counter"])
actor_id = photon.ObjectID(actor_id_str)
actor_name = actor_name.decode("ascii")
module = module.decode("ascii")
class_export_counter = int(class_export_counter)
try:
unpickled_class = pickling.loads(pickled_class)
except:
raise NotImplementedError("TODO(pcm)")
else:
# TODO(pcm): Why is the below line necessary?
unpickled_class.__module__ = module
worker.actors[actor_id_str] = unpickled_class.__new__(unpickled_class)
for (k, v) in inspect.getmembers(unpickled_class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x))):
function_id = get_actor_method_function_id(k).id()
worker.function_names[function_id] = k
worker.functions[function_id] = v
def export_actor(actor_id, Class, worker):
"""Export an actor to redis.
Args:
actor_id: The ID of the actor.
Class: The class to be exported as an actor.
worker: The worker object.
"""
ray.worker.check_main_thread()
if worker.mode is None:
raise NotImplementedError("TODO(pcm): Cache actors")
key = "Actor:{}".format(actor_id.id())
pickled_class = pickling.dumps(Class)
# Select a local scheduler for the actor.
local_schedulers = state.get_local_schedulers()
local_scheduler_id = random.choice(local_schedulers)
worker.redis_client.publish("actor_notifications", actor_id.id() + local_scheduler_id)
# The export counter is computed differently depending on whether we are
# currently in a driver or a worker.
if worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE]:
export_counter = worker.driver_export_counter
elif worker.mode == ray.WORKER_MODE:
# We don't actually need export counters for actors.
export_counter = 0
d = {"driver_id": worker.task_driver_id.id(),
"actor_id": actor_id.id(),
"name": Class.__name__,
"module": Class.__module__,
"class": pickled_class,
"class_export_counter": export_counter}
worker.redis_client.hmset(key, d)
worker.redis_client.rpush("Exports", key)
worker.driver_export_counter += 1
def actor(Class):
# The function actor_method_call gets called if somebody tries to call a
# method on their local actor stub object.
def actor_method_call(actor_id, attr, *args, **kwargs):
ray.worker.check_connected()
ray.worker.check_main_thread()
args = list(args)
if len(kwargs) > 0:
raise Exception("Actors currently do not support **kwargs.")
function_id = get_actor_method_function_id(attr)
# TODO(pcm): Extend args with keyword args.
# For now, actor methods should not require resources beyond the resources
# used by the actor.
num_cpus = 0
num_gpus = 0
object_ids = ray.worker.global_worker.submit_task(function_id, "", args,
num_cpus, num_gpus,
actor_id=actor_id)
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids
class NewClass(object):
def __init__(self, *args, **kwargs):
self._ray_actor_id = random_actor_id()
self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))}
export_actor(self._ray_actor_id, Class, ray.worker.global_worker)
# Call __init__ as a remote function.
if "__init__" in self._ray_actor_methods.keys():
actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs)
else:
print("WARNING: this object has no __init__ method.")
# Make tab completion work.
def __dir__(self):
return self._ray_actor_methods
def __getattribute__(self, attr):
# The following is needed so we can still access self.actor_methods.
if attr in ["_ray_actor_id", "_ray_actor_methods"]:
return super(NewClass, self).__getattribute__(attr)
if attr in self._ray_actor_methods.keys():
return lambda *args, **kwargs: actor_method_call(self._ray_actor_id, attr, *args, **kwargs)
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'".format(Class, attr))
def __repr__(self):
return "Actor(" + self._ray_actor_id.hex() + ")"
return NewClass
ray.worker.global_worker.fetch_and_register["Actor"] = fetch_and_register_actor
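One detail worth calling out: actor method function IDs are content-addressed. The driver and the actor worker both derive the ID by hashing the method name with SHA-1, so the two sides agree without any registration handshake. A standalone check (plain Python, no Ray required; method_function_id mirrors get_actor_method_function_id above):

import hashlib

def method_function_id(attr):
    # Same derivation as get_actor_method_function_id above.
    return hashlib.sha1(attr.encode("ascii")).digest()

# The same method name hashes to the same 20-byte ID in any process.
assert method_function_id("increment") == method_function_id("increment")
assert len(method_function_id("increment")) == 20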


@ -0,0 +1,13 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray.worker
def get_local_schedulers():
local_schedulers = []
for client in ray.worker.global_worker.redis_client.keys("CL:*"):
client_type, ray_client_id = ray.worker.global_worker.redis_client.hmget(client, "client_type", "ray_client_id")
if client_type == b"photon":
local_schedulers.append(ray_client_id)
return local_schedulers
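export_actor in actor.py uses this helper to choose a placement for a new actor uniformly at random. A sketch of that call sequence, assuming a cluster has already been started:

import random
import ray
import ray.experimental.state as state

ray.init()
local_schedulers = state.get_local_schedulers()  # raw photon db client IDs
local_scheduler_id = random.choice(local_schedulers)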


@ -9,6 +9,7 @@ import sys
import time
import traceback
import copy
import collections
import funcsigs
import numpy as np
import colorama
@ -39,6 +40,9 @@ ERROR_KEY_PREFIX = b"Error:"
DRIVER_ID_LENGTH = 20
ERROR_ID_LENGTH = 20
# This must match the definition of NIL_ACTOR_ID in task.h.
NIL_ACTOR_ID = 20 * b"\xff"
# When performing ray.get, wait 1 second before attempting to reconstruct and
# fetch the object again.
GET_TIMEOUT_MILLISECONDS = 1000
@ -378,15 +382,58 @@ class Worker(object):
def __init__(self):
"""Initialize a Worker object."""
self.functions = {}
self.num_return_vals = {}
# Use a defaultdict for the number of return values. If this is accessed
# with a missing key, the default value of 1 is returned, and that key value
# pair is added to the dict.
self.num_return_vals = collections.defaultdict(lambda: 1)
self.function_names = {}
self.function_export_counters = {}
self.connected = False
self.mode = None
self.cached_remote_functions = []
self.cached_functions_to_run = []
# The driver_export_counter and worker_import_counter are used to make sure
# that no task executes before everything it needs is present. For example,
# if we define a remote function f, a worker cannot execute a task for f
# until the worker has imported the function f.
# - When a remote function, a reusable variable, or a function to run is
# exported, the driver_export_counter is incremented. These exports must
# take place from the driver.
# - When an actor is created, the driver_export_counter is NOT
# incremented. Note that an actor can be created from a driver or from
# any worker.
# - When a worker imports a remote function, a reusable variable, or a
# function to run, its worker_import_counter is incremented.
# - Notably, when an actor is imported, its worker_import_counter is NOT
# incremented.
# - Whenever a remote function is DEFINED on the driver, it records the
# value of the driver_export_counter and a worker will not execute that
# remote function until it has imported that many exports (excluding
# actors).
# - When an actor is defined:
# a) If the actor is created on a driver, it records the
# driver_export_counter.
# b) If the actor is created inside a task on a regular worker, it
# records the driver_export_counter associated with the function in
# task creating the actor.
# c) If the actor is created inside a task on an actor worker, it
# records
# The worker that ultimately runs the actor will not execute any tasks
# until it has imported that many imports.
#
# TODO(rkn): These counters must be tracked separately for each driver.
# TODO(rkn): Maybe none of these counters are necessary? When executing a
# regular task, workers can just wait until the function ID is present. When
# executing an actor task, the actor worker can just wait until the actor
# has been defined.
self.driver_export_counter = 0
self.worker_import_counter = 0
self.fetch_and_register = {}
self.actors = {}
# Use a defaultdict for the actor counts. If this is accessed with a missing
# key, the default value of 0 is returned, and that key value pair is added
# to the dict.
self.actor_counters = collections.defaultdict(lambda: 0)
def set_mode(self, mode):
"""Set the mode of the worker.
@ -479,7 +526,7 @@ class Worker(object):
assert final_results[i][0] == object_ids[i].id()
return [result[1][0] for result in final_results]
def submit_task(self, function_id, func_name, args, num_cpus, num_gpus):
def submit_task(self, function_id, func_name, args, num_cpus, num_gpus, actor_id=photon.ObjectID(NIL_ACTOR_ID)):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with name
@ -514,10 +561,12 @@ class Worker(object):
self.num_return_vals[function_id.id()],
self.current_task_id,
self.task_index,
actor_id, self.actor_counters[actor_id],
[num_cpus, num_gpus])
# Increment the worker's task index to track how many tasks have been
# submitted by the current task so far.
self.task_index += 1
self.actor_counters[actor_id] += 1
self.photon_client.submit(task)
return task.returns()
@ -856,7 +905,7 @@ def _init(address_info=None,
"manager_socket_name": address_info["object_store_addresses"][0].manager_name,
"local_scheduler_socket_name": address_info["local_scheduler_socket_names"][0],
}
connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker)
connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker, actor_id=NIL_ACTOR_ID)
return address_info
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
@ -1086,6 +1135,9 @@ def import_thread(worker):
worker_info_key = "WorkerInfo:{}".format(worker.worker_id)
worker.redis_client.hset(worker_info_key, "export_counter", 0)
worker.worker_import_counter = 0
# The number of imports is similar to the worker_import_counter except that it
# also counts actors.
num_imported = 0
# Get the exports that occurred before the call to psubscribe.
with worker.lock:
@ -1097,10 +1149,19 @@ def import_thread(worker):
fetch_and_register_environment_variable(key, worker=worker)
elif key.startswith(b"FunctionsToRun"):
fetch_and_execute_function_to_run(key, worker=worker)
elif key.startswith(b"Actor"):
# Only get the actor if the actor ID matches the actor ID of this
# worker.
actor_id, = worker.redis_client.hmget(key, "actor_id")
if worker.actor_id == actor_id:
worker.fetch_and_register["Actor"](key, worker)
else:
raise Exception("This code should be unreachable.")
# Actors do not contribute to the import counter.
if not key.startswith(b"Actor"):
worker.redis_client.hincrby(worker_info_key, "export_counter", 1)
worker.worker_import_counter += 1
num_imported += 1
for msg in worker.import_pubsub_client.listen():
with worker.lock:
@ -1108,8 +1169,8 @@ def import_thread(worker):
continue
assert msg["data"] == b"rpush"
num_imports = worker.redis_client.llen("Exports")
assert num_imports >= worker.worker_import_counter
for i in range(worker.worker_import_counter, num_imports):
assert num_imports >= num_imported
for i in range(num_imported, num_imports):
key = worker.redis_client.lindex("Exports", i)
if key.startswith(b"RemoteFunction"):
with log_span("ray:import_remote_function", worker=worker):
@ -1120,12 +1181,21 @@ def import_thread(worker):
elif key.startswith(b"FunctionsToRun"):
with log_span("ray:import_function_to_run", worker=worker):
fetch_and_execute_function_to_run(key, worker=worker)
elif key.startswith(b"Actor"):
# Only get the actor if the actor ID matches the actor ID of this
# worker.
actor_id, = worker.redis_client.hmget(key, "actor_id")
if worker.actor_id == actor_id:
worker.fetch_and_register["Actor"](key, worker)
else:
raise Exception("This code should be unreachable.")
# Actors do not contribute to the import counter.
if not key.startswith(b"Actor"):
worker.redis_client.hincrby(worker_info_key, "export_counter", 1)
worker.worker_import_counter += 1
num_imported += 1
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, actor_id=NIL_ACTOR_ID):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
Args:
@ -1143,6 +1213,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
assert env._cached_environment_variables is not None, error_message
# Initialize some fields.
worker.worker_id = random_string()
worker.actor_id = actor_id
worker.connected = True
worker.set_mode(mode)
# The worker.events field is used to aggregate logging information and display
@ -1163,7 +1234,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
# Create an object store client.
worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
# Create the local scheduler client.
worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"])
worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"], worker.actor_id)
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
# The concept of a driver is the same as the concept of a "job". Register
# the driver/job with Redis here.
@ -1458,7 +1530,11 @@ def wait_for_valid_import_counter(function_id, driver_id, timeout=5, worker=glob
may indicate a problem somewhere and we will push an error message to the
user.
If this worker is an actor, then this will wait until the actor has been
defined.
Args:
function_id (str): The ID of the function that we want to execute.
driver_id (str): The ID of the driver to push the error message to if this
times out.
@ -1469,7 +1545,9 @@ def wait_for_valid_import_counter(function_id, driver_id, timeout=5, worker=glob
num_warnings_sent = 0
while True:
with worker.lock:
if function_id.id() in worker.functions and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter):
if worker.actor_id == NIL_ACTOR_ID and function_id.id() in worker.functions and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter):
break
elif worker.actor_id != NIL_ACTOR_ID and worker.actor_id in worker.actors:
break
if time.time() - start_time > timeout * (num_warnings_sent + 1):
if function_id.id() not in worker.functions:
@ -1530,6 +1608,7 @@ def main_loop(worker=global_worker):
# correct driver.
worker.task_driver_id = task.driver_id()
worker.current_task_id = task.task_id()
worker.current_function_id = task.function_id().id()
worker.task_index = 0
worker.put_index = 0
function_id = task.function_id()
@ -1543,7 +1622,10 @@ def main_loop(worker=global_worker):
# Execute the task.
with log_span("ray:task:execute", worker=worker):
outputs = worker.functions[function_id.id()].executor(arguments)
if task.actor_id().id() == NIL_ACTOR_ID:
outputs = worker.functions[task.function_id().id()].executor(arguments)
else:
outputs = worker.functions[task.function_id().id()](worker.actors[task.actor_id().id()], *arguments)
# Store the outputs in the local object store.
with log_span("ray:task:store_outputs", worker=worker):
@ -1557,8 +1639,12 @@ def main_loop(worker=global_worker):
# occurred, we format the error message differently.
# whether the variables "arguments" and "outputs" are defined.
if "arguments" in locals() and "outputs" not in locals():
if task.actor_id().id() == NIL_ACTOR_ID:
# The error occurred during the task execution.
traceback_str = format_error_message(traceback.format_exc(), task_exception=True)
else:
# The error occurred during the execution of an actor task.
traceback_str = format_error_message(traceback.format_exc())
elif "arguments" in locals() and "outputs" in locals():
# The error occurred after the task executed.
traceback_str = format_error_message(traceback.format_exc())
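The per-actor counters above (worker.actor_counters on the submission side, the task_counter field of local_actor_info in photon_algorithm.c below) are what guarantee that actor methods execute in submission order even if tasks arrive at the scheduler out of order. A simplified model of the protocol; all names here are illustrative, not the actual implementation:

import collections
import heapq

# Driver side: a monotonically increasing counter per actor
# (cf. worker.actor_counters).
submit_counters = collections.defaultdict(int)

def submit(actor_id, task):
    counter = submit_counters[actor_id]
    submit_counters[actor_id] += 1
    return (counter, task)

# Scheduler side: only dispatch the task whose counter equals the number
# of tasks executed so far (cf. dispatch_actor_task below).
class ActorQueue(object):
    def __init__(self):
        self.task_counter = 0  # number of tasks already dispatched
        self.queue = []        # (actor_counter, task), kept ordered

    def add(self, counter, task):
        heapq.heappush(self.queue, (counter, task))

    def dispatch(self):
        if self.queue and self.queue[0][0] == self.task_counter:
            self.task_counter += 1
            return heapq.heappop(self.queue)[1]
        return None  # the next task in order has not arrived yet

q = ActorQueue()
first = submit("a", "task0")   # (0, "task0")
second = submit("a", "task1")  # (1, "task1")
q.add(*second)                 # suppose task1 arrives first
assert q.dispatch() is None    # task1 must not run before task0
q.add(*first)
assert q.dispatch() == "task0"
assert q.dispatch() == "task1"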


@ -6,6 +6,8 @@ import argparse
import numpy as np
import redis
import traceback
import sys
import binascii
import ray
@ -15,6 +17,7 @@ parser.add_argument("--redis-address", required=True, type=str, help="the addres
parser.add_argument("--object-store-name", required=True, type=str, help="the object store's name")
parser.add_argument("--object-store-manager-name", required=True, type=str, help="the object store manager's name")
parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name")
parser.add_argument("--actor-id", required=False, type=str, help="the actor ID of this worker")
def random_string():
return np.random.bytes(20)
@ -26,7 +29,10 @@ if __name__ == "__main__":
"store_socket_name": args.object_store_name,
"manager_socket_name": args.object_store_manager_name,
"local_scheduler_socket_name": args.local_scheduler_name}
ray.worker.connect(info, ray.WORKER_MODE)
actor_id = binascii.unhexlify(args.actor_id) if args.actor_id is not None else ray.worker.NIL_ACTOR_ID
ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id)
error_explanation = """
This error is unexpected and should not have happened. Somehow a worker crashed


@ -25,6 +25,7 @@ add_library(common STATIC
state/object_table.c
state/task_table.c
state/db_client_table.c
state/actor_notification_table.c
state/local_scheduler_table.c
thirdparty/ae/ae.c
thirdparty/sha256.c)


@ -263,7 +263,13 @@ PyTypeObject PyObjectIDType = {
/* Define the PyTask class. */
static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
/* ID of the driver that this task originates from. */
unique_id driver_id;
/* ID of the actor this task should run on. */
unique_id actor_id = NIL_ACTOR_ID;
/* How many tasks have been launched on the actor so far? */
int actor_counter = 0;
/* ID of the function this task executes. */
function_id function_id;
/* Arguments of the task (can be PyObjectIDs or Python values). */
PyObject *arguments;
@ -277,10 +283,11 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
int parent_counter;
/* Resource vector of the required resources to execute this task. */
PyObject *resource_vector = NULL;
if (!PyArg_ParseTuple(args, "O&O&OiO&i|O", &PyObjectToUniqueID, &driver_id,
if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&iO", &PyObjectToUniqueID, &driver_id,
&PyObjectToUniqueID, &function_id, &arguments,
&num_returns, &PyObjectToUniqueID, &parent_task_id,
&parent_counter, &resource_vector)) {
&parent_counter, &PyObjectToUniqueID, &actor_id,
&actor_counter, &resource_vector)) {
return -1;
}
Py_ssize_t size = PyList_Size(arguments);
@ -299,9 +306,9 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
}
/* Construct the task specification. */
int val_repr_index = 0;
self->spec = start_construct_task_spec(driver_id, parent_task_id,
parent_counter, function_id, size,
num_returns, value_data_bytes);
self->spec = start_construct_task_spec(
driver_id, parent_task_id, parent_counter, actor_id, actor_counter,
function_id, size, num_returns, value_data_bytes);
/* Add the task arguments. */
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *arg = PyList_GetItem(arguments, i);
@ -350,6 +357,11 @@ static PyObject *PyTask_function_id(PyObject *self) {
return PyObjectID_make(function_id);
}
static PyObject *PyTask_actor_id(PyObject *self) {
actor_id actor_id = task_spec_actor_id(((PyTask *) self)->spec);
return PyObjectID_make(actor_id);
}
static PyObject *PyTask_driver_id(PyObject *self) {
unique_id driver_id = task_spec_driver_id(((PyTask *) self)->spec);
return PyObjectID_make(driver_id);
@ -407,6 +419,8 @@ static PyObject *PyTask_returns(PyObject *self) {
static PyMethodDef PyTask_methods[] = {
{"function_id", (PyCFunction) PyTask_function_id, METH_NOARGS,
"Return the function ID for this task."},
{"actor_id", (PyCFunction) PyTask_actor_id, METH_NOARGS,
"Return the actor ID for this task."},
{"driver_id", (PyCFunction) PyTask_driver_id, METH_NOARGS,
"Return the driver ID for this task."},
{"task_id", (PyCFunction) PyTask_task_id, METH_NOARGS,


@ -0,0 +1,16 @@
#include "actor_notification_table.h"
#include "redis.h"
void actor_notification_table_subscribe(
db_handle *db_handle,
actor_notification_table_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry) {
actor_notification_table_subscribe_data *sub_data =
malloc(sizeof(actor_notification_table_subscribe_data));
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, NIL_ID, __func__, sub_data, retry, NULL,
redis_actor_notification_table_subscribe, NULL);
}


@ -0,0 +1,47 @@
#ifndef ACTOR_NOTIFICATION_TABLE_H
#define ACTOR_NOTIFICATION_TABLE_H
#include "task.h"
#include "db.h"
#include "table.h"
typedef struct {
/** The ID of the actor. */
actor_id actor_id;
/** The ID of the local scheduler that is responsible for the actor. */
db_client_id local_scheduler_id;
} actor_info;
/*
* ==== Subscribing to the actor notification table ====
*/
/* Callback for subscribing to the actor notification table. */
typedef void (*actor_notification_table_subscribe_callback)(actor_info info,
void *user_context);
/**
* Register a callback to process actor notification events.
*
* @param db_handle Database handle.
* @param subscribe_callback Callback that will be called when an actor
* notification event happens.
* @param subscribe_context Context that will be passed into the
* subscribe_callback.
* @param retry Information about retrying the request to the database.
* @return Void.
*/
void actor_notification_table_subscribe(
db_handle *db_handle,
actor_notification_table_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry);
/* Data that is needed to register actor notification table subscribe
* callbacks with the state database. */
typedef struct {
actor_notification_table_subscribe_callback subscribe_callback;
void *subscribe_context;
} actor_notification_table_subscribe_data;
#endif /* ACTOR_NOTIFICATION_TABLE_H */
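The notification payload itself is deliberately simple: export_actor in actor.py publishes the raw concatenation of the two 20-byte IDs on the "actor_notifications" channel, and redis_actor_notification_table_subscribe_callback (in redis.c below) splits it back apart with two memcpy calls. A sketch of the publishing side, assuming a Redis server on localhost and placeholder IDs:

import redis

actor_id = b"\x01" * 20            # placeholder 20-byte actor ID
local_scheduler_id = b"\x02" * 20  # placeholder 20-byte db_client_id
payload = actor_id + local_scheduler_id
assert len(payload) == 40          # length checked by the C callback
redis.StrictRedis().publish("actor_notifications", payload)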


@ -70,7 +70,7 @@ void local_scheduler_table_send_info(db_handle *db_handle,
local_scheduler_info *info,
retry_info *retry);
/* Data that is needed to publish local scheduer heartbeats to the local
/* Data that is needed to publish local scheduler heartbeats to the local
* scheduler table. */
typedef struct {
local_scheduler_info info;


@ -12,6 +12,7 @@
#include "common.h"
#include "db.h"
#include "db_client_table.h"
#include "actor_notification_table.h"
#include "local_scheduler_table.h"
#include "object_table.h"
#include "object_info.h"
@ -1063,7 +1064,7 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c,
CHECK(reply->type == REDIS_REPLY_ARRAY);
CHECK(reply->elements == 3);
redisReply *message_type = reply->element[0];
LOG_DEBUG("Local scheduer table subscribe callback, message %s",
LOG_DEBUG("Local scheduler table subscribe callback, message %s",
message_type->str);
if (strcmp(message_type->str, "message") == 0) {
@ -1130,6 +1131,57 @@ void redis_local_scheduler_table_send_info(table_callback_data *callback_data) {
}
}
void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
redisReply *reply = r;
CHECK(reply->type == REDIS_REPLY_ARRAY);
CHECK(reply->elements == 3);
redisReply *message_type = reply->element[0];
LOG_DEBUG("Local scheduler table subscribe callback, message %s",
message_type->str);
if (strcmp(message_type->str, "message") == 0) {
/* Handle an actor notification message. Parse the payload and call the
* subscribe callback. */
redisReply *payload = reply->element[2];
actor_notification_table_subscribe_data *data = callback_data->data;
actor_info info;
/* The payload should be the concatenation of these two structs. */
CHECK(sizeof(info.actor_id) + sizeof(info.local_scheduler_id) ==
payload->len);
memcpy(&info.actor_id, payload->str, sizeof(info.actor_id));
memcpy(&info.local_scheduler_id, payload->str + sizeof(info.actor_id),
sizeof(info.local_scheduler_id));
if (data->subscribe_callback) {
data->subscribe_callback(info, data->subscribe_context);
}
} else if (strcmp(message_type->str, "subscribe") == 0) {
/* The reply for the initial SUBSCRIBE command. */
CHECK(callback_data->done_callback == NULL);
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
event_loop_remove_timer(db->loop, callback_data->timer_id);
} else {
LOG_FATAL("Unexpected reply type from actor notification subscribe.");
}
}
void redis_actor_notification_table_subscribe(
table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
int status = redisAsyncCommand(
db->sub_context, redis_actor_notification_table_subscribe_callback,
(void *) callback_data->timer_id, "SUBSCRIBE actor_notifications");
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_DEBUG(db->sub_context,
"error in redis_actor_notification_table_subscribe");
}
}
void redis_object_info_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {


@ -245,6 +245,16 @@ void redis_local_scheduler_table_subscribe(table_callback_data *callback_data);
*/
void redis_local_scheduler_table_send_info(table_callback_data *callback_data);
/**
* Subscribe to updates about newly created actors.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_actor_notification_table_subscribe(
table_callback_data *callback_data);
void redis_object_info_subscribe(table_callback_data *callback_data);
#endif /* REDIS_H */


@ -44,6 +44,11 @@ struct task_spec_impl {
/** A count of the number of tasks submitted by the parent task before this
* one. */
int64_t parent_counter;
/** Actor ID of the task. This is the actor that this task is executed on
* or NIL_ACTOR_ID if the task is just a normal task. */
actor_id actor_id;
/** Number of tasks that have been submitted to this actor so far. */
int64_t actor_counter;
/** Function ID of the task. */
function_id function_id;
/** Total number of arguments. */
@ -81,6 +86,10 @@ bool task_id_is_nil(task_id id) {
return task_ids_equal(id, NIL_TASK_ID);
}
bool actor_ids_equal(actor_id first_id, actor_id second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
bool function_ids_equal(function_id first_id, function_id second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
@ -147,6 +156,8 @@ object_id task_compute_put_id(task_id task_id, int64_t put_index) {
task_spec *start_construct_task_spec(unique_id driver_id,
task_id parent_task_id,
int64_t parent_counter,
actor_id actor_id,
int64_t actor_counter,
function_id function_id,
int64_t num_args,
int64_t num_returns,
@ -158,6 +169,8 @@ task_spec *start_construct_task_spec(unique_id driver_id,
task->task_id = NIL_TASK_ID;
task->parent_task_id = parent_task_id;
task->parent_counter = parent_counter;
task->actor_id = actor_id;
task->actor_counter = actor_counter;
task->function_id = function_id;
task->num_args = num_args;
task->arg_index = 0;
@ -190,6 +203,18 @@ function_id task_function(task_spec *spec) {
return spec->function_id;
}
actor_id task_spec_actor_id(task_spec *spec) {
/* Check that the task has been constructed. */
DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID));
return spec->actor_id;
}
int64_t task_spec_actor_counter(task_spec *spec) {
/* Check that the task has been constructed. */
DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID));
return spec->actor_counter;
}
unique_id task_spec_driver_id(task_spec *spec) {
/* Check that the task has been constructed. */
DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID));


@ -15,6 +15,7 @@
#include "utstring.h"
#define NIL_TASK_ID NIL_ID
#define NIL_ACTOR_ID NIL_ID
#define NIL_FUNCTION_ID NIL_ID
typedef unique_id function_id;
@ -23,6 +24,10 @@ typedef unique_id function_id;
* executes and the argument IDs or argument values. */
typedef unique_id task_id;
/** The actor ID is the ID of the actor that a task must run on. If the task is
* not run on an actor, then NIL_ACTOR_ID should be used. */
typedef unique_id actor_id;
/** The task instance ID is a generated, globally unique ID that identifies
 * this particular execution of the task. */
typedef unique_id task_iid;
@ -55,6 +60,15 @@ bool task_ids_equal(task_id first_id, task_id second_id);
*/
bool task_id_is_nil(task_id id);
/**
* Compare two actor IDs.
*
* @param first_id The first actor ID to compare.
* @param second_id The second actor ID to compare.
* @return True if the actor IDs are the same and false otherwise.
*/
bool actor_ids_equal(actor_id first_id, actor_id second_id);
/**
* Compare two function IDs.
*
@ -83,6 +97,8 @@ bool function_id_is_nil(function_id id);
* @param parent_task_id The task ID of the task that submitted this task.
* @param parent_counter A counter indicating how many tasks were submitted by
* the parent task prior to this one.
* @param actor_id The ID of the actor this task belongs to.
* @param actor_counter Number of tasks that have been executed on this actor.
* @param function_id The function ID of the function to execute in this task.
* @param num_args The number of arguments that this task has.
* @param num_returns The number of return values that this task has.
@ -93,6 +109,8 @@ bool function_id_is_nil(function_id id);
task_spec *start_construct_task_spec(unique_id driver_id,
task_id parent_task_id,
int64_t parent_counter,
unique_id actor_id,
int64_t actor_counter,
function_id function_id,
int64_t num_args,
int64_t num_returns,
@ -124,6 +142,23 @@ int64_t task_spec_size(task_spec *spec);
*/
function_id task_function(task_spec *spec);
/**
* Return the actor ID of the task.
*
* @param spec The task_spec in question.
* @return The actor ID of the actor the task is part of.
*/
unique_id task_spec_actor_id(task_spec *spec);
/**
* Return the actor counter of the task. This starts at 0 and increments by 1
* every time a new task is submitted to run on the actor.
*
* @param spec The task_spec in question.
* @return The actor counter of the task.
*/
int64_t task_spec_actor_counter(task_spec *spec);
/**
* Return the driver ID of the task.
*


@ -14,8 +14,8 @@ SUITE(task_tests);
TEST task_test(void) {
task_id parent_task_id = globally_unique_id();
function_id func_id = globally_unique_id();
task_spec *spec =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 4, 2, 10);
task_spec *spec = start_construct_task_spec(
NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 4, 2, 10);
ASSERT(task_num_args(spec) == 4);
ASSERT(task_num_returns(spec) == 2);
@ -52,15 +52,15 @@ TEST deterministic_ids_test(void) {
uint8_t *arg2 = (uint8_t *) "hello world";
/* Construct a first task. */
task_spec *spec1 =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11);
task_spec *spec1 = start_construct_task_spec(
NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11);
task_args_add_ref(spec1, arg1);
task_args_add_val(spec1, arg2, 11);
finish_construct_task_spec(spec1);
/* Construct a second identical task. */
task_spec *spec2 =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11);
task_spec *spec2 = start_construct_task_spec(
NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11);
task_args_add_ref(spec2, arg1);
task_args_add_val(spec2, arg2, 11);
finish_construct_task_spec(spec2);
@ -78,36 +78,37 @@ TEST deterministic_ids_test(void) {
/* Create more tasks that are only mildly different. */
/* Construct a task with a different parent task ID. */
task_spec *spec3 = start_construct_task_spec(NIL_ID, globally_unique_id(), 0,
func_id, 2, 3, 11);
task_spec *spec3 = start_construct_task_spec(
NIL_ID, globally_unique_id(), 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11);
task_args_add_ref(spec3, arg1);
task_args_add_val(spec3, arg2, 11);
finish_construct_task_spec(spec3);
/* Construct a task with a different parent counter. */
task_spec *spec4 =
start_construct_task_spec(NIL_ID, parent_task_id, 1, func_id, 2, 3, 11);
task_spec *spec4 = start_construct_task_spec(
NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, 0, func_id, 2, 3, 11);
task_args_add_ref(spec4, arg1);
task_args_add_val(spec4, arg2, 11);
finish_construct_task_spec(spec4);
/* Construct a task with a different function ID. */
task_spec *spec5 = start_construct_task_spec(NIL_ID, parent_task_id, 0,
task_spec *spec5 =
start_construct_task_spec(NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0,
globally_unique_id(), 2, 3, 11);
task_args_add_ref(spec5, arg1);
task_args_add_val(spec5, arg2, 11);
finish_construct_task_spec(spec5);
/* Construct a task with a different object ID argument. */
task_spec *spec6 =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11);
task_spec *spec6 = start_construct_task_spec(
NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11);
task_args_add_ref(spec6, globally_unique_id());
task_args_add_val(spec6, arg2, 11);
finish_construct_task_spec(spec6);
/* Construct a task with a different value argument. */
task_spec *spec7 =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11);
task_spec *spec7 = start_construct_task_spec(
NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11);
task_args_add_ref(spec7, arg1);
task_args_add_val(spec7, (uint8_t *) "hello_world", 11);
finish_construct_task_spec(spec7);
@ -148,8 +149,8 @@ TEST deterministic_ids_test(void) {
TEST send_task(void) {
task_id parent_task_id = globally_unique_id();
function_id func_id = globally_unique_id();
task_spec *spec =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 4, 2, 10);
task_spec *spec = start_construct_task_spec(
NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 4, 2, 10);
task_args_add_ref(spec, globally_unique_id());
task_args_add_val(spec, (uint8_t *) "Hello", 5);
task_args_add_val(spec, (uint8_t *) "World", 5);


@ -22,8 +22,8 @@ static inline task_spec *example_task_spec_with_args(int64_t num_args,
task_id parent_task_id = globally_unique_id();
function_id func_id = globally_unique_id();
task_spec *task =
start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, num_args,
num_returns, arg_value_size);
start_construct_task_spec(NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0,
func_id, num_args, num_returns, arg_value_size);
for (int64_t i = 0; i < num_args; ++i) {
object_id arg_id;
if (arg_ids == NULL) {


@ -27,8 +27,9 @@ enum photon_message_type {
RECONSTRUCT_OBJECT,
/** Log a message to the event table. */
EVENT_LOG_MESSAGE,
/** Register a worker's process ID with the local scheduler. */
REGISTER_PID,
/** Send an initial connection message to the local scheduler.
* This contains the worker's process ID and actor ID. */
REGISTER_WORKER_INFO
};
/* These are needed to define the UT_arrays. */
@ -36,6 +37,27 @@ UT_icd task_ptr_icd;
UT_icd workers_icd;
UT_icd pid_t_icd;
/** This struct is used to register a new worker with the local scheduler.
* It is shipped as part of photon_connect. */
typedef struct {
/** The ID of the actor. This is NIL_ACTOR_ID if the worker is not an actor.
*/
actor_id actor_id;
/** The process ID of this worker. */
pid_t worker_pid;
} register_worker_info;
/** This struct is used to maintain a mapping from actor IDs to the ID of the
* local scheduler that is responsible for the actor. */
typedef struct {
/** The ID of the actor. This is used as a key in the hash table. */
actor_id actor_id;
/** The ID of the local scheduler that is responsible for the actor. */
db_client_id local_scheduler_id;
/** Handle for the hash table. */
UT_hash_handle hh;
} actor_map_entry;
/** Internal state of the scheduling algorithm. */
typedef struct scheduling_algorithm_state scheduling_algorithm_state;
@ -62,6 +84,9 @@ typedef struct {
/** List of the process IDs for child processes (workers) started by the
* local scheduler that have not sent a REGISTER_PID message yet. */
UT_array *child_pids;
/** A hash table mapping actor IDs to the db_client_id of the local scheduler
* that is responsible for the actor. */
actor_map_entry *actor_mapping;
/** The handle to the database. */
db_handle *db;
/** The Plasma client. */
@ -92,6 +117,9 @@ typedef struct {
pid_t pid;
/** Whether the client is a child process of the local scheduler. */
bool is_child;
/** The ID of the actor on this worker. If there is no actor running on this
* worker, this should be NIL_ACTOR_ID. */
actor_id actor_id;
/** A pointer to the local scheduler state. */
local_scheduler_state *local_scheduler_state;
} local_scheduler_client;
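On the Python side, the actor ID is supplied when the client connects, which is what ships the register_worker_info message described above. A sketch (the socket name is an assumption):

import photon

NIL_ACTOR_ID = 20 * b"\xff"

# A regular worker connects with the nil actor ID; an actor worker passes
# its actual 20-byte actor ID instead.
client = photon.PhotonClient("/tmp/scheduler_socket", NIL_ACTOR_ID)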


@ -11,6 +11,10 @@
#include "photon_scheduler.h"
#include "common/task.h"
/* Declared for convenience. */
void remove_actor(scheduling_algorithm_state *algorithm_state,
actor_id actor_id);
typedef struct task_queue_entry {
/** The task that is queued. */
task_spec *spec;
@ -34,9 +38,34 @@ typedef struct {
UT_icd task_queue_entry_icd = {sizeof(task_queue_entry *), NULL, NULL, NULL};
/** This is used to define the queue of actor task specs for which the
* corresponding local scheduler is unknown. */
UT_icd task_spec_icd = {sizeof(task_spec *), NULL, NULL, NULL};
/** This is used to define the queue of available workers. */
UT_icd worker_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL};
/** This struct contains information about a specific actor. This struct will be
* used inside of a hash table. */
typedef struct {
/** The ID of the actor. This is used as a key in the hash table. */
actor_id actor_id;
/** The number of tasks that have been executed on this actor so far. This is
* used to guarantee the in-order execution of tasks on actors (in the order
* that the tasks were submitted). This is currently meaningful because we
* restrict the submission of tasks on actors to the process that created the
* actor. */
int64_t task_counter;
/** A queue of tasks to be executed on this actor. The tasks will be sorted by
* the order of their actor counters. */
task_queue_entry *task_queue;
/** The worker that the actor is running on. */
local_scheduler_client *worker;
/** True if the worker is available and false otherwise. */
bool worker_available;
/** Handle for the uthash table. */
UT_hash_handle hh;
} local_actor_info;
/** Part of the photon state that is maintained by the scheduling algorithm. */
struct scheduling_algorithm_state {
/** An array of pointers to tasks that are waiting for dependencies. */
@ -44,6 +73,16 @@ struct scheduling_algorithm_state {
/** An array of pointers to tasks whose dependencies are ready but that are
* waiting to be assigned to a worker. */
task_queue_entry *dispatch_task_queue;
/** This is a hash table from actor ID to information about that actor. In
* particular, a queue of tasks that are waiting to execute on that actor.
* This is only used for actors that exist locally. */
local_actor_info *local_actor_infos;
/** An array of actor tasks that have been submitted but this local scheduler
* doesn't know which local scheduler is responsible for them, so cannot
* assign them to the correct local scheduler yet. Whenever a notification
* about a new local scheduler arrives, we will resubmit all of these tasks
* locally. */
UT_array *cached_submitted_actor_tasks;
/** An array of worker indices corresponding to clients that are
* waiting for tasks. */
UT_array *available_workers;
@ -69,6 +108,8 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) {
algorithm_state->waiting_task_queue = NULL;
algorithm_state->dispatch_task_queue = NULL;
utarray_new(algorithm_state->available_workers, &worker_icd);
utarray_new(algorithm_state->cached_submitted_actor_tasks, &task_spec_icd);
algorithm_state->local_actor_infos = NULL;
return algorithm_state;
}
@ -87,6 +128,22 @@ void free_scheduling_algorithm_state(
free_task_spec(elt->spec);
free(elt);
}
/* Remove all of the remaining actors. */
local_actor_info *actor_entry, *tmp_actor_entry;
HASH_ITER(hh, algorithm_state->local_actor_infos, actor_entry,
tmp_actor_entry) {
/* We do not call HASH_DELETE here because it will be called inside of
* remove_actor. */
remove_actor(algorithm_state, actor_entry->actor_id);
}
/* Free the list of cached actor task specs and the task specs themselves. */
for (int i = 0;
i < utarray_len(algorithm_state->cached_submitted_actor_tasks); ++i) {
task_spec **spec = (task_spec **) utarray_eltptr(
algorithm_state->cached_submitted_actor_tasks, i);
free(*spec);
}
utarray_free(algorithm_state->cached_submitted_actor_tasks);
/* Free the list of available workers. */
utarray_free(algorithm_state->available_workers);
/* Free the cached information about which objects are present locally. */
@ -129,6 +186,236 @@ void provide_scheduler_info(local_scheduler_state *state,
}
}
/**
* Create the local_actor_info struct for an actor worker that this local
* scheduler is responsible for. For a given actor, this will either be done
* when the first task for that actor arrives or when the worker running that
* actor connects to the local scheduler.
*
* @param algorithm_state The state of the scheduling algorithm.
* @param actor_id The actor ID of the actor being created.
* @param worker The worker struct for the worker that is running this actor.
* If the worker struct has not been created yet (meaning that the worker
* that is running this actor has not registered with the local scheduler
* yet, and so create_actor is being called because a task for that actor
* has arrived), then this should be NULL.
* @return Void.
*/
void create_actor(scheduling_algorithm_state *algorithm_state,
actor_id actor_id,
local_scheduler_client *worker) {
/* This will be freed when the actor is removed in remove_actor. */
local_actor_info *entry = malloc(sizeof(local_actor_info));
entry->actor_id = actor_id;
entry->task_counter = 0;
/* Initialize the doubly-linked list to NULL. */
entry->task_queue = NULL;
entry->worker = worker;
entry->worker_available = false;
HASH_ADD(hh, algorithm_state->local_actor_infos, actor_id, sizeof(actor_id),
entry);
/* Log some useful information about the actor that we created. */
char id_string[ID_STRING_SIZE];
LOG_DEBUG("Creating actor with ID %s.",
object_id_to_string(actor_id, id_string, ID_STRING_SIZE));
UNUSED(id_string);
}
void remove_actor(scheduling_algorithm_state *algorithm_state,
actor_id actor_id) {
local_actor_info *entry;
HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
entry);
/* Make sure the actor actually exists. */
CHECK(entry != NULL);
/* Log some useful information about the actor that we're removing. */
char id_string[ID_STRING_SIZE];
task_queue_entry *elt;
int count;
DL_COUNT(entry->task_queue, elt, count);
if (count > 0) {
LOG_WARN("Removing actor with ID %s and %d remaining tasks.",
object_id_to_string(actor_id, id_string, ID_STRING_SIZE), count);
}
UNUSED(id_string);
/* Free all remaining tasks in the actor queue. */
task_queue_entry *task_queue_elt, *tmp;
DL_FOREACH_SAFE(entry->task_queue, task_queue_elt, tmp) {
DL_DELETE(entry->task_queue, task_queue_elt);
free_task_spec(task_queue_elt->spec);
free(task_queue_elt);
}
/* Remove the entry from the hash table and free it. */
HASH_DELETE(hh, algorithm_state->local_actor_infos, entry);
free(entry);
}
void handle_actor_worker_connect(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id,
local_scheduler_client *worker) {
local_actor_info *entry;
HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
entry);
if (entry == NULL) {
create_actor(algorithm_state, actor_id, worker);
} else {
/* In this case, the local_actor_info struct was already created by the
* first call to add_task_to_actor_queue. However, the worker field was not
* filled out, so fill out the correct worker field now. */
entry->worker = worker;
}
}
void handle_actor_worker_disconnect(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id) {
remove_actor(algorithm_state, actor_id);
}
/**
* This will add a task to the task queue for an actor. If this is the first
* task being processed for this actor, it is possible that the local_actor_info
* struct has not yet been created by create_actor (which happens when the
* actor worker connects to the local scheduler), so in that case this method
* will call create_actor.
*
* This method will also update the task table. TODO(rkn): Should we also update
* the task table in the case where the tasks are cached locally?
*
* @param state The state of the local scheduler.
* @param algorithm_state The state of the scheduling algorithm.
* @param spec The task spec to add.
* @param from_global_scheduler True if the task was assigned to this local
* scheduler by the global scheduler and false if it was submitted
* locally by a worker.
* @return Void.
*/
void add_task_to_actor_queue(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec,
bool from_global_scheduler) {
actor_id actor_id = task_spec_actor_id(spec);
char tmp[ID_STRING_SIZE];
object_id_to_string(actor_id, tmp, ID_STRING_SIZE);
DCHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID));
/* Get the local actor entry for this actor. */
local_actor_info *entry;
HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
entry);
/* Handle the case in which there is no local_actor_info struct yet. */
if (entry == NULL) {
/* Create the actor struct with a NULL worker because the worker struct has
* not been created yet. The correct worker struct will be inserted when the
* actor worker connects to the local scheduler. */
create_actor(algorithm_state, actor_id, NULL);
HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id,
sizeof(actor_id), entry);
CHECK(entry != NULL);
}
int64_t task_counter = task_spec_actor_counter(spec);
/* As a sanity check, the counter of the new task should be greater than the
* number of tasks that have executed on this actor so far (since we are
* guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This
* check will fail if the fault-tolerance mechanism resubmits a task on an
* actor. */
CHECK(task_counter >= entry->task_counter);
/* Create a new task queue entry. */
task_queue_entry *elt = malloc(sizeof(task_queue_entry));
elt->spec = (task_spec *) malloc(task_spec_size(spec));
memcpy(elt->spec, spec, task_spec_size(spec));
/* Add the task spec to the actor's task queue in a manner that preserves the
* order of the actor task counters. Iterate from the beginning of the queue
* to find the right place to insert the task queue entry. TODO(pcm): This
* makes submitting multiple actor tasks take quadratic time, which needs to
* be optimized. */
task_queue_entry *current_entry = entry->task_queue;
while (current_entry != NULL && current_entry->next != NULL &&
task_counter > task_spec_actor_counter(current_entry->spec)) {
current_entry = current_entry->next;
}
DL_APPEND_ELEM(entry->task_queue, current_entry, elt);
/* Update the task table. */
if (state->db != NULL) {
task *task =
alloc_task(spec, TASK_STATUS_QUEUED, get_db_client_id(state->db));
if (from_global_scheduler) {
/* If the task is from the global scheduler, it's already been added to
* the task table, so just update the entry. */
task_table_update(state->db, task, (retry_info *) &photon_retry, NULL,
NULL);
} else {
/* Otherwise, this is the first time the task has been seen in the system
* (unless it's a resubmission of a previous task), so add the entry. */
task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL,
NULL);
}
}
}
/**
* Dispatch a task to an actor if possible.
*
* @param state The state of the local scheduler.
* @param algorithm_state The state of the scheduling algorithm.
* @param actor_id The ID of the actor corresponding to the worker.
* @return True if a task was dispatched to the actor and false otherwise.
*/
bool dispatch_actor_task(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id) {
/* Make sure this worker actually is an actor. */
CHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID));
/* Make sure this actor belongs to this local scheduler. */
actor_map_entry *actor_entry;
HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), actor_entry);
CHECK(actor_entry != NULL);
CHECK(db_client_ids_equal(actor_entry->local_scheduler_id,
get_db_client_id(state->db)));
/* Get the local actor entry for this actor. */
local_actor_info *entry;
HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
entry);
CHECK(entry != NULL);
if (entry->task_queue == NULL) {
/* There are no queued tasks for this actor, so we cannot dispatch a task to
* the actor. */
return false;
}
int64_t next_task_counter = task_spec_actor_counter(entry->task_queue->spec);
if (next_task_counter != entry->task_counter) {
/* We cannot execute the next task on this actor without violating the
* in-order execution guarantee for actor tasks. */
CHECK(next_task_counter > entry->task_counter);
return false;
}
/* If the worker is not available, we cannot assign a task to it. */
if (!entry->worker_available) {
return false;
}
/* Assign the first task in the task queue to the worker and mark the worker
* as unavailable. */
task_queue_entry *first_task = entry->task_queue;
entry->task_counter += 1;
assign_task_to_worker(state, first_task->spec, entry->worker);
entry->worker_available = false;
/* Remove the task from the actor's task queue. */
DL_DELETE(entry->task_queue, first_task);
/* Free the task spec and the task queue entry. */
free_task_spec(first_task->spec);
free(first_task);
return true;
}
/**
* Fetch a queued task's missing object dependency. The fetch request will be
* retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until the object is
@ -420,6 +707,31 @@ void queue_task_locally(local_scheduler_state *state,
}
}
/**
* Give a task directly to another local scheduler. This is currently only used
* for assigning actor tasks to the local scheduler responsible for that actor.
*
* @param state The scheduler state.
* @param algorithm_state The scheduling algorithm state.
* @param spec The task specification to schedule.
* @param local_scheduler_id The ID of the local scheduler to give the task to.
* @return Void.
*/
void give_task_to_local_scheduler(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec,
db_client_id local_scheduler_id) {
if (db_client_ids_equal(local_scheduler_id, get_db_client_id(state->db))) {
LOG_WARN("Local scheduler is trying to assign a task to itself.");
}
CHECK(state->db != NULL);
/* Assign the task to the relevant local scheduler. */
DCHECK(state->config.global_scheduler_exists);
task *task = alloc_task(spec, TASK_STATUS_SCHEDULED, local_scheduler_id);
task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL,
NULL);
}
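Note that the hand-off happens entirely through the task table: the task is written with status SCHEDULED under the target scheduler's ID, and the target picks it up through the same task-table subscription used by the global scheduler. A toy Python sketch of that shape, with made-up scheduler IDs:

task_table = {}  # scheduler id -> tasks delivered to that scheduler

def task_table_add(scheduler_id, task):
    # Stands in for the Redis-backed task table plus its subscription.
    task_table.setdefault(scheduler_id, []).append(task)

def give_task_to_local_scheduler(my_id, target_id, task):
    if target_id == my_id:
        print("warning: local scheduler assigning a task to itself")
    task_table_add(target_id, task)

give_task_to_local_scheduler("sched-1", "sched-2", "actor task 0")
assert task_table["sched-2"] == ["actor task 0"]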
/**
* Give a task to the global scheduler to schedule.
*
@ -458,6 +770,25 @@ bool resource_constraints_satisfied(local_scheduler_state *state,
return true;
}
/**
* Update the result table, which holds mappings of object ID -> ID of the
* task that created it.
*
* @param state The scheduler state.
* @param spec The task spec in question.
* @return Void.
*/
void update_result_table(local_scheduler_state *state, task_spec *spec) {
if (state->db != NULL) {
task_id task_id = task_spec_id(spec);
for (int64_t i = 0; i < task_num_returns(spec); ++i) {
object_id return_id = task_return(spec, i);
result_table_add(state->db, return_id, task_id,
(retry_info *) &photon_retry, NULL, NULL);
}
}
}
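The result table is what lets any component map an object back to the task that produces it; reconstruction in particular relies on this to decide which task to resubmit. A minimal sketch with a plain dict standing in for the Redis-backed table:

result_table = {}  # object id -> id of the task that creates that object

def update_result_table(task_id, return_ids):
    for return_id in return_ids:
        result_table[return_id] = task_id

update_result_table("task-42", ["obj-a", "obj-b"])
assert result_table["obj-a"] == "task-42"  # reconstruct by resubmitting it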
void handle_task_submitted(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec) {
@ -483,14 +814,64 @@ void handle_task_submitted(local_scheduler_state *state,
/* Update the result table, which holds mappings of object ID -> ID of the
* task that created it. */
update_result_table(state, spec);
}
void handle_actor_task_submitted(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec) {
actor_id actor_id = task_spec_actor_id(spec);
CHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID));
/* Find the local scheduler responsible for this actor. */
actor_map_entry *entry;
HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
if (entry == NULL) {
/* Add this task to a queue of tasks that have been submitted but whose
* responsible local scheduler is not yet known. These tasks will be
* resubmitted (internally by the local scheduler) whenever a new actor
* notification arrives. */
utarray_push_back(algorithm_state->cached_submitted_actor_tasks, &spec);
return;
}
if (db_client_ids_equal(entry->local_scheduler_id,
get_db_client_id(state->db))) {
/* This local scheduler is responsible for the actor, so handle the task
* locally. */
add_task_to_actor_queue(state, algorithm_state, spec, false);
/* Attempt to dispatch tasks to this actor. */
dispatch_actor_task(state, algorithm_state, actor_id);
} else {
/* This local scheduler is not responsible for the task, so hand the task
* off to the local scheduler that is responsible for this actor. */
give_task_to_local_scheduler(state, algorithm_state, spec,
entry->local_scheduler_id);
}
/* Update the result table, which holds mappings of object ID -> ID of the
* task that created it. */
update_result_table(state, spec);
}
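The routing above has three outcomes: cache the task when the actor's location is unknown, queue and dispatch locally when this scheduler owns the actor, and forward otherwise. A condensed Python sketch of that decision, with hypothetical names:

class ActorTaskRouter:
    def __init__(self, my_id, actor_mapping):
        self.my_id = my_id
        self.actor_mapping = actor_mapping  # actor id -> owning scheduler id
        self.cached = []       # tasks whose actor has no known owner yet
        self.local_queue = []  # tasks for actors this scheduler owns
        self.forwarded = []    # (task, target scheduler id) pairs

    def submit(self, actor_id, task):
        owner = self.actor_mapping.get(actor_id)
        if owner is None:
            self.cached.append((actor_id, task))  # retry on notification
        elif owner == self.my_id:
            self.local_queue.append(task)
        else:
            self.forwarded.append((task, owner))

router = ActorTaskRouter("sched-1", {"actor-a": "sched-2"})
router.submit("actor-a", "method 0")  # forwarded to sched-2
router.submit("actor-b", "method 0")  # owner unknown, cached for later
assert router.forwarded == [("method 0", "sched-2")]
assert len(router.cached) == 1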
void handle_actor_creation_notification(
local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id) {
int num_cached_actor_tasks =
utarray_len(algorithm_state->cached_submitted_actor_tasks);
for (int i = 0; i < num_cached_actor_tasks; ++i) {
task_spec **spec = (task_spec **) utarray_eltptr(
algorithm_state->cached_submitted_actor_tasks, i);
/* Note that handle_actor_task_submitted may append the spec to the end of
* the cached_submitted_actor_tasks array. */
handle_actor_task_submitted(state, algorithm_state, *spec);
}
/* Remove all the tasks that were resubmitted. This does not erase the tasks
* that were newly appended to the cached_submitted_actor_tasks array. */
utarray_erase(algorithm_state->cached_submitted_actor_tasks, 0,
num_cached_actor_tasks);
}
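The length snapshot in this loop is the important detail: resubmitting a cached task can re-append it to the same array (its actor may still be unknown), so only the entries that were present when the loop started are erased. The same pattern in a few lines of Python:

def flush_cached_tasks(cached, resubmit):
    num_cached = len(cached)  # snapshot; resubmit() may append to `cached`
    for i in range(num_cached):
        resubmit(cached[i])
    del cached[:num_cached]   # keep only entries appended during the loop

cached = ["t1", "t2"]
# Pretend t2's actor is still unknown, so resubmission re-caches it.
flush_cached_tasks(cached, lambda t: cached.append(t) if t == "t2" else None)
assert cached == ["t2"]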
void handle_task_scheduled(local_scheduler_state *state,
@ -506,6 +887,38 @@ void handle_task_scheduled(local_scheduler_state *state,
dispatch_tasks(state, algorithm_state);
}
void handle_actor_task_scheduled(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec) {
/* This callback handles tasks that were assigned to this local scheduler by
* the global scheduler or by other workers, so we can safely assert that
* there is a connection to the database. */
DCHECK(state->db != NULL);
DCHECK(state->config.global_scheduler_exists);
/* Check that the task is meant to run on an actor that this local scheduler
* is responsible for. */
actor_id actor_id = task_spec_actor_id(spec);
DCHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID));
actor_map_entry *entry;
HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
if (entry != NULL) {
/* The creation notification for this actor has already been processed, so
* this local scheduler should be the one responsible for it. */
DCHECK(db_client_ids_equal(entry->local_scheduler_id,
get_db_client_id(state->db)));
} else {
/* This means that an actor has been assigned to this local scheduler, and a
* task for that actor has been received here, but this local scheduler has
* not yet processed the notification about the actor creation. This is
* possible but should be very uncommon, and if it does happen, it's ok. */
LOG_INFO(
"handle_actor_task_scheduled called on local scheduler but the "
"corresponding actor_map_entry is not present. This should be rare.");
}
/* Push the task to the appropriate queue. */
add_task_to_actor_queue(state, algorithm_state, spec, true);
dispatch_actor_task(state, algorithm_state, actor_id);
}
void handle_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker) {
@ -524,6 +937,23 @@ void handle_worker_available(local_scheduler_state *state,
dispatch_tasks(state, algorithm_state);
}
void handle_actor_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker) {
actor_id actor_id = worker->actor_id;
CHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID));
/* Get the actor info for this worker. */
local_actor_info *entry;
HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id),
entry);
CHECK(entry != NULL);
CHECK(worker == entry->worker);
CHECK(!entry->worker_available);
entry->worker_available = true;
/* Assign a task to this actor if possible. */
dispatch_actor_task(state, algorithm_state, actor_id);
}
void handle_object_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
object_id object_id) {


@ -61,6 +61,34 @@ void handle_task_submitted(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec);
/**
* This version of handle_task_submitted is used when the task being submitted
* is a method of an actor.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param spec Specification of the task submitted by the worker.
* @return Void.
*/
void handle_actor_task_submitted(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec);
/**
* This function will be called when the local scheduler receives a notification
* about the creation of a new actor. This can be used by the scheduling
* algorithm to resubmit cached actor tasks.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param actor_id The ID of the actor being created.
* @return Void.
*/
void handle_actor_creation_notification(
local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id);
/**
* This function will be called when a task is assigned by the global scheduler
* for execution on this local scheduler.
@ -74,6 +102,20 @@ void handle_task_scheduled(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec);
/**
* This function will be called when an actor task is assigned by the global
* scheduler or by another local scheduler for execution on this local
* scheduler.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param spec Specification of the task that was assigned.
* @return Void.
*/
void handle_actor_task_scheduled(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec);
/**
* This function is called if a new object becomes available in the local
* plasma store.
@ -108,6 +150,45 @@ void handle_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker);
/**
* This version of handle_worker_available is called whenever the worker that is
* available is running an actor.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param worker The worker that is available.
* @return Void.
*/
void handle_actor_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker);
/**
* Handle the fact that a new worker is available for running an actor.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param actor_id The ID of the actor running on the worker.
* @param worker The worker that was connected.
* @return Void.
*/
void handle_actor_worker_connect(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id,
local_scheduler_client *worker);
/**
* Handle the fact that a worker running an actor has disconnected.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param actor_id The ID of the actor running on the worker.
* @return Void.
*/
void handle_actor_worker_disconnect(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id);
/**
* This function fetches queued tasks' missing object dependencies. It is
* called every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS.


@ -4,13 +4,16 @@
#include "common/task.h"
#include <stdlib.h>
photon_conn *photon_connect(const char *photon_socket, actor_id actor_id) {
photon_conn *result = malloc(sizeof(photon_conn));
result->conn = connect_ipc_sock(photon_socket);
register_worker_info info;
memset(&info, 0, sizeof(info));
/* Register the process ID with the local scheduler. */
info.worker_pid = getpid();
info.actor_id = actor_id;
int success = write_message(result->conn, REGISTER_WORKER_INFO, sizeof(info),
(uint8_t *) &info);
CHECKM(success == 0, "Unable to register worker with local scheduler");
return result;
}
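The registration message is a single fixed-size struct carrying the process ID and the actor ID. A Python sketch of the same framing, assuming a packed layout of a 4-byte pid followed by a 20-byte actor ID (the real C struct may include padding):

import struct

def pack_register_worker_info(worker_pid, actor_id_bytes):
    # Hypothetical layout mirroring struct { pid_t worker_pid; actor_id id; }.
    assert len(actor_id_bytes) == 20
    return struct.pack("<i20s", worker_pid, actor_id_bytes)

payload = pack_register_worker_info(4242, b"\x00" * 20)
assert len(payload) == 24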


@ -14,9 +14,11 @@ typedef struct {
*
* @param photon_socket The name of the socket to use to connect to the local
* scheduler.
* @param actor_id The ID of the actor running on this worker. If no actor is
* running on this worker, this should be NIL_ACTOR_ID.
* @return The connection information.
*/
photon_conn *photon_connect(const char *photon_socket, actor_id actor_id);
/**
* Disconnect from the local scheduler.


@ -17,11 +17,13 @@ static int PyPhotonClient_init(PyPhotonClient *self,
PyObject *args,
PyObject *kwds) {
char *socket_name;
actor_id actor_id;
if (!PyArg_ParseTuple(args, "sO&", &socket_name, PyStringToUniqueID,
&actor_id)) {
return -1;
}
/* Connect to the Photon scheduler. */
self->photon_connection = photon_connect(socket_name, actor_id);
return 0;
}


@ -15,6 +15,7 @@
#include "photon.h"
#include "photon_scheduler.h"
#include "photon_algorithm.h"
#include "state/actor_notification_table.h"
#include "state/db.h"
#include "state/task_table.h"
#include "state/object_table.h"
@ -156,6 +157,15 @@ void free_local_scheduler(local_scheduler_state *state) {
utarray_free(state->workers);
state->workers = NULL;
/* Free the mapping from the actor ID to the ID of the local scheduler
* responsible for that actor. */
actor_map_entry *current_actor_map_entry, *temp_actor_map_entry;
HASH_ITER(hh, state->actor_mapping, current_actor_map_entry,
temp_actor_map_entry) {
HASH_DEL(state->actor_mapping, current_actor_map_entry);
free(current_actor_map_entry);
}
/* Free the algorithm state. */
free_scheduling_algorithm_state(state->algorithm_state);
state->algorithm_state = NULL;
@ -175,7 +185,7 @@ void free_local_scheduler(local_scheduler_state *state) {
* @param state The state of the local scheduler.
* @return Void.
*/
void start_worker(local_scheduler_state *state, actor_id actor_id) {
/* We can't start a worker if we don't have the path to the worker script. */
CHECK(state->config.start_worker_command != NULL);
/* Launch the process to create the worker. */
@ -186,9 +196,24 @@ void start_worker(local_scheduler_state *state) {
return;
}
char id_string[ID_STRING_SIZE];
object_id_to_string(actor_id, id_string, ID_STRING_SIZE);
/* Figure out how many arguments there are in the start_worker_command. */
int num_args = 0;
for (; state->config.start_worker_command[num_args] != NULL; ++num_args) {
}
const char **start_actor_worker_command =
malloc((num_args + 3) * sizeof(const char *));
for (int i = 0; i < num_args; ++i) {
start_actor_worker_command[i] = state->config.start_worker_command[i];
}
start_actor_worker_command[num_args] = "--actor-id";
start_actor_worker_command[num_args + 1] = (const char *) id_string;
start_actor_worker_command[num_args + 2] = NULL;
/* Try to execute the worker command. Exit if we're not successful. */
execvp(start_actor_worker_command[0],
(char *const *) start_actor_worker_command);
free(start_actor_worker_command);
free_local_scheduler(state);
LOG_FATAL("Failed to start worker");
}
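In other words, an actor's worker is launched with the ordinary worker command plus a trailing --actor-id flag carrying the hex-printed ID. A Python sketch of the argv construction (the flag name comes from the diff; the example command is made up):

def build_actor_worker_command(start_worker_command, actor_id_hex):
    # Copy the configured command and append the actor ID flag.
    return list(start_worker_command) + ["--actor-id", actor_id_hex]

argv = build_actor_worker_command(
    ["python", "default_worker.py", "--redis-address", "127.0.0.1:6379"],
    "0123456789abcdef0123456789abcdef01234567")  # a 20-byte ID in hex
assert argv[-2:] == ["--actor-id", "0123456789abcdef0123456789abcdef01234567"]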
@ -259,6 +284,9 @@ local_scheduler_state *init_local_scheduler(
state->loop = loop;
/* Initialize the list of workers. */
utarray_new(state->workers, &workers_icd);
/* Initialize the hash table mapping actor ID to the ID of the local scheduler
* that is responsible for that actor. */
state->actor_mapping = NULL;
/* Connect to Redis if a Redis address is provided. */
if (redis_addr != NULL) {
int num_args;
@ -309,11 +337,11 @@ local_scheduler_state *init_local_scheduler(
/* Start the initial set of workers. */
utarray_new(state->child_pids, &pid_t_icd);
for (int i = 0; i < num_workers; ++i) {
start_worker(state, NIL_ACTOR_ID);
}
return state;
}
void assign_task_to_worker(local_scheduler_state *state,
task_spec *spec,
@ -393,8 +421,11 @@ void reconstruct_task_update_callback(task *task, void *user_context) {
* to ensure that reconstruction will happen. */
local_scheduler_state *state = user_context;
task_spec *spec = task_task_spec(task);
/* If the task is an actor task, then we currently do not reconstruct it.
* TODO(rkn): Handle this better. */
CHECK(actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID));
/* Resubmit the task. */
handle_task_submitted(state, state->algorithm_state, spec);
/* Recursively reconstruct the task's inputs, if necessary. */
for (int64_t i = 0; i < task_num_args(spec); ++i) {
if (task_arg_type(spec, i) == ARG_BY_REF) {
@ -467,7 +498,12 @@ void process_message(event_loop *loop,
switch (type) {
case SUBMIT_TASK: {
task_spec *spec = (task_spec *) utarray_front(state->input_buffer);
if (actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID)) {
handle_task_submitted(state, state->algorithm_state, spec);
} else {
handle_actor_task_submitted(state, state->algorithm_state, spec);
}
} break;
case TASK_DONE: {
} break;
@ -495,6 +531,50 @@ void process_message(event_loop *loop,
free(key);
free(value);
} break;
case REGISTER_WORKER_INFO: {
/* Update the actor mapping with the actor ID of the worker (if an actor is
* running on the worker). */
register_worker_info *info =
(register_worker_info *) utarray_front(state->input_buffer);
if (!actor_ids_equal(info->actor_id, NIL_ACTOR_ID)) {
/* Make sure that the local scheduler is aware that it is responsible for
* this actor. */
actor_map_entry *entry;
HASH_FIND(hh, state->actor_mapping, &info->actor_id,
sizeof(info->actor_id), entry);
CHECK(entry != NULL);
CHECK(db_client_ids_equal(entry->local_scheduler_id,
get_db_client_id(state->db)));
/* Update the worker struct with this actor ID. */
CHECK(actor_ids_equal(worker->actor_id, NIL_ACTOR_ID));
worker->actor_id = info->actor_id;
/* Let the scheduling algorithm process the presence of this new
* worker. */
handle_actor_worker_connect(state, state->algorithm_state, info->actor_id,
worker);
}
/* Register the worker's process ID with the scheduler. */
worker->pid = info->worker_pid;
/* Determine if this worker is one of our child processes. */
LOG_DEBUG("PID is %d", info->worker_pid);
pid_t *child_pid;
int index = 0;
for (child_pid = (pid_t *) utarray_front(state->child_pids);
child_pid != NULL;
child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) {
if (*child_pid == info->worker_pid) {
/* If this worker is one of our child processes, mark it as a child so
* that we know that we can wait for the process to exit during
* cleanup. */
worker->is_child = true;
utarray_erase(state->child_pids, index, 1);
LOG_DEBUG("Found matching child pid %d", info->worker_pid);
break;
}
++index;
}
} break;
case GET_TASK: {
/* If this worker reports a completed task: account for resources. */
if (worker->task_in_progress != NULL) {
@ -521,7 +601,11 @@ void process_message(event_loop *loop,
}
/* Let the scheduling algorithm process the fact that there is an available
* worker. */
if (actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)) {
handle_worker_available(state, state->algorithm_state, worker);
} else {
handle_actor_worker_available(state, state->algorithm_state, worker);
}
} break;
case RECONSTRUCT_OBJECT: {
object_id *obj_id = (object_id *) utarray_front(state->input_buffer);
@ -530,32 +614,14 @@ void process_message(event_loop *loop,
case DISCONNECT_CLIENT: {
LOG_INFO("Disconnecting client on fd %d", client_sock);
kill_worker(worker, false);
if (!actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)) {
/* Let the scheduling algorithm process the absence of this worker. */
handle_actor_worker_disconnect(state, state->algorithm_state,
worker->actor_id);
}
} break;
case LOG_MESSAGE: {
} break;
default:
/* This code should be unreachable. */
CHECK(0);
@ -575,6 +641,7 @@ void new_client_connection(event_loop *loop,
worker->task_in_progress = NULL;
worker->pid = 0;
worker->is_child = false;
worker->actor_id = NIL_ACTOR_ID;
worker->local_scheduler_state = state;
utarray_push_back(state->workers, &worker);
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
@ -597,8 +664,54 @@ void signal_handler(int signal) {
/* End of the cleanup code. */
void handle_task_scheduled_callback(task *original_task, void *user_context) {
task_spec *spec = task_task_spec(original_task);
if (actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID)) {
/* This task does not involve an actor. Handle it normally. */
handle_task_scheduled(g_state, g_state->algorithm_state, spec);
} else {
/* This task involves an actor. Call the scheduling algorithm's actor
* handler. */
handle_actor_task_scheduled(g_state, g_state->algorithm_state, spec);
}
}
/**
* Process a notification about the creation of a new actor. Use this to update
* the mapping from actor ID to the local scheduler ID of the local scheduler
* that is responsible for the actor. If this local scheduler is responsible for
* the actor, then launch a new worker process to create that actor.
*
* @param info The notification, containing the ID of the actor being created
*        and the ID of the local scheduler responsible for creating it.
* @param context The local scheduler state.
* @return Void.
*/
void handle_actor_creation_callback(actor_info info, void *context) {
actor_id actor_id = info.actor_id;
db_client_id local_scheduler_id = info.local_scheduler_id;
local_scheduler_state *state = context;
/* Make sure the actor entry is not already present in the actor map table.
* TODO(rkn): We will need to remove this check to handle the case where the
* corresponding publish is retried and the case in which a task that creates
* an actor is resubmitted due to fault tolerance. */
actor_map_entry *entry;
HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
CHECK(entry == NULL);
/* Create a new entry and add it to the actor mapping table. TODO(rkn):
* Currently this is never removed (except when the local scheduler state is
* deleted). */
entry = malloc(sizeof(actor_map_entry));
entry->actor_id = actor_id;
entry->local_scheduler_id = local_scheduler_id;
HASH_ADD(hh, state->actor_mapping, actor_id, sizeof(entry->actor_id), entry);
/* If this local scheduler is responsible for the actor, then start a new
* worker for the actor. */
if (db_client_ids_equal(local_scheduler_id, get_db_client_id(state->db))) {
start_worker(state, actor_id);
}
/* Let the scheduling algorithm process the fact that a new actor has been
* created. */
handle_actor_creation_notification(state, state->algorithm_state, actor_id);
}
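Putting the callback's steps together: record the actor-to-scheduler mapping, fork a dedicated worker if this scheduler is the owner, then let the scheduling algorithm flush its cached tasks. A compact, self-contained Python sketch under the same assumptions as the earlier snippets:

class CreationHandler:
    def __init__(self, my_id):
        self.my_id = my_id
        self.actor_mapping = {}    # actor id -> owning scheduler id
        self.started_workers = []  # actors given a dedicated local worker
        self.flushes = 0           # stands in for resubmitting cached tasks

    def on_actor_created(self, actor_id, owner_id):
        assert actor_id not in self.actor_mapping  # publish not retried yet
        self.actor_mapping[actor_id] = owner_id
        if owner_id == self.my_id:
            self.started_workers.append(actor_id)  # fork the actor's worker
        self.flushes += 1

handler = CreationHandler("sched-1")
handler.on_actor_created("actor-a", "sched-1")
handler.on_actor_created("actor-b", "sched-2")
assert handler.started_workers == ["actor-a"]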
int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
@ -638,9 +751,9 @@ void start_server(const char *node_ip_address,
event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection,
g_state);
/* Subscribe to receive notifications about tasks that are assigned to this
* local scheduler by the global scheduler or by other local schedulers.
* TODO(rkn): we also need to get any tasks that were assigned to this local
* scheduler before the call to subscribe. */
retry_info retry;
memset(&retry, 0, sizeof(retry));
retry.num_retries = 0;
@ -651,6 +764,11 @@ void start_server(const char *node_ip_address,
TASK_STATUS_SCHEDULED, handle_task_scheduled_callback,
NULL, &retry, NULL, NULL);
}
/* Subscribe to notifications about newly created actors. */
if (g_state->db != NULL) {
actor_notification_table_subscribe(
g_state->db, handle_actor_creation_callback, g_state, &retry);
}
/* Create a timer for publishing information about the load on the local
* scheduler to the local scheduler table. This message also serves as a
* heartbeat. */
@ -796,7 +914,6 @@ int main(int argc, char *argv[]) {
}
}
LOG_INFO("Start worker command is %s", start_worker_command);
start_server(node_ip_address, scheduler_socket_name, redis_addr, redis_port,
plasma_store_socket_name, plasma_manager_socket_name,
plasma_manager_address, global_scheduler_exists,


@ -94,7 +94,15 @@ void process_message(event_loop *loop,
void kill_worker(local_scheduler_client *worker, bool wait);
/**
* Start a new worker by forking.
*
* @param state The local scheduler state.
* @param actor_id The ID of the actor for this worker. If this worker is not an
* actor, then NIL_ACTOR_ID should be used.
* @return Void.
*/
void start_worker(local_scheduler_state *state, actor_id actor_id);
#endif /* PHOTON_SCHEDULER_H */


@ -101,7 +101,8 @@ photon_mock *init_photon_mock(bool connect_to_redis,
mock->num_photon_conns = num_mock_workers;
mock->conns = malloc(sizeof(photon_conn *) * num_mock_workers);
for (int i = 0; i < num_mock_workers; ++i) {
mock->conns[i] =
photon_connect(utstring_body(photon_socket_name), NIL_ACTOR_ID);
new_client_connection(mock->loop, mock->photon_fd,
(void *) mock->photon_state, 0);
}
@ -555,7 +556,7 @@ TEST start_kill_workers_test(void) {
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1);
/* Start a worker after the local scheduler has been initialized. */
start_worker(photon->photon_state, NIL_ACTOR_ID);
/* Accept the workers as clients to the plasma manager. */
int new_worker_fd = accept_client(photon->plasma_manager_fd);
/* The new worker should register its process ID. */

test/actor_test.py (new file, 475 lines)

@ -0,0 +1,475 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import numpy as np
import time
import ray
class ActorAPI(unittest.TestCase):
def testKeywordArgs(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@ray.actor
class Actor(object):
def __init__(self, arg0, arg1=1, arg2="a"):
self.arg0 = arg0
self.arg1 = arg1
self.arg2 = arg2
def get_values(self, arg0, arg1=2, arg2="b"):
return self.arg0 + arg0, self.arg1 + arg1, self.arg2 + arg2
actor = Actor(0)
self.assertEqual(ray.get(actor.get_values(1)), (1, 3, "ab"))
actor = Actor(1, 2)
self.assertEqual(ray.get(actor.get_values(2, 3)), (3, 5, "ab"))
actor = Actor(1, 2, "c")
self.assertEqual(ray.get(actor.get_values(2, 3, "d")), (3, 5, "cd"))
# Make sure we get an exception if the constructor is called incorrectly.
actor = Actor()
with self.assertRaises(Exception):
ray.get(actor.get_values(1))
with self.assertRaises(Exception):
ray.get(actor.get_values())
# Make sure we get an exception if the method is called incorrectly.
actor = Actor(1)
with self.assertRaises(Exception):
ray.get(actor.get_values())
ray.worker.cleanup()
def testVariableNumberOfArgs(self):
ray.init(num_workers=0)
@ray.actor
class Actor(object):
def __init__(self, arg0, arg1=1, *args):
self.arg0 = arg0
self.arg1 = arg1
self.args = args
def get_values(self, arg0, arg1=2, *args):
return self.arg0 + arg0, self.arg1 + arg1, self.args, args
actor = Actor(0)
self.assertEqual(ray.get(actor.get_values(1)), (1, 3, (), ()))
actor = Actor(1, 2)
self.assertEqual(ray.get(actor.get_values(2, 3)), (3, 5, (), ()))
actor = Actor(1, 2, "c")
self.assertEqual(ray.get(actor.get_values(2, 3, "d")), (3, 5, ("c",), ("d",)))
actor = Actor(1, 2, "a", "b", "c", "d")
self.assertEqual(ray.get(actor.get_values(2, 3, 1, 2, 3, 4)), (3, 5, ("a", "b", "c", "d"), (1, 2, 3, 4)))
ray.worker.cleanup()
def testNoArgs(self):
ray.init(num_workers=0)
@ray.actor
class Actor(object):
def __init__(self):
pass
def get_values(self):
pass
actor = Actor()
self.assertEqual(ray.get(actor.get_values()), None)
ray.worker.cleanup()
def testNoConstructor(self):
# If no __init__ method is provided, that should not be a problem.
ray.init(num_workers=0)
@ray.actor
class Actor(object):
def get_values(self):
pass
actor = Actor()
self.assertEqual(ray.get(actor.get_values()), None)
ray.worker.cleanup()
def testCustomClasses(self):
ray.init(num_workers=0)
class Foo(object):
def __init__(self, x):
self.x = x
ray.register_class(Foo)
@ray.actor
class Actor(object):
def __init__(self, f2):
self.f1 = Foo(1)
self.f2 = f2
def get_values1(self):
return self.f1, self.f2
def get_values2(self, f3):
return self.f1, self.f2, f3
actor = Actor(Foo(2))
results1 = ray.get(actor.get_values1())
self.assertEqual(results1[0].x, 1)
self.assertEqual(results1[1].x, 2)
results2 = ray.get(actor.get_values2(Foo(3)))
self.assertEqual(results2[0].x, 1)
self.assertEqual(results2[1].x, 2)
self.assertEqual(results2[2].x, 3)
ray.worker.cleanup()
# def testCachingActors(self):
# # TODO(rkn): Implement this.
# pass
class ActorMethods(unittest.TestCase):
def testDefineActor(self):
ray.init()
@ray.actor
class Test(object):
def __init__(self, x):
self.x = x
def f(self, y):
return self.x + y
t = Test(2)
self.assertEqual(ray.get(t.f(1)), 3)
ray.worker.cleanup()
def testActorState(self):
ray.init()
@ray.actor
class Counter(object):
def __init__(self):
self.value = 0
def increase(self):
self.value += 1
def value(self):
return self.value
c1 = Counter()
c1.increase()
self.assertEqual(ray.get(c1.value()), 1)
c2 = Counter()
c2.increase()
c2.increase()
self.assertEqual(ray.get(c2.value()), 2)
ray.worker.cleanup()
def testMultipleActors(self):
# Create a bunch of actors and call a bunch of methods on all of them.
ray.init(num_workers=0)
@ray.actor
class Counter(object):
def __init__(self, value):
self.value = value
def increase(self):
self.value += 1
return self.value
def reset(self):
self.value = 0
num_actors = 20
num_increases = 50
# Create multiple actors.
actors = [Counter(i) for i in range(num_actors)]
results = []
# Call each actor's method a bunch of times.
for i in range(num_actors):
results += [actors[i].increase() for _ in range(num_increases)]
result_values = ray.get(results)
for i in range(num_actors):
self.assertEqual(result_values[(num_increases * i):(num_increases * (i + 1))], list(range(i + 1, num_increases + i + 1)))
# Reset the actor values.
[actor.reset() for actor in actors]
# Interweave the method calls on the different actors.
results = []
for j in range(num_increases):
results += [actor.increase() for actor in actors]
result_values = ray.get(results)
for j in range(num_increases):
self.assertEqual(result_values[(num_actors * j):(num_actors * (j + 1))], num_actors * [j + 1])
ray.worker.cleanup()
class ActorNesting(unittest.TestCase):
def testRemoteFunctionWithinActor(self):
# Make sure we can use remote functions within actors.
ray.init(num_cpus=100)
# Create some values to close over.
val1 = 1
val2 = 2
@ray.remote
def f(x):
return val1 + x
@ray.remote
def g(x):
return ray.get(f.remote(x))
@ray.actor
class Actor(object):
def __init__(self, x):
self.x = x
self.y = val2
self.object_ids = [f.remote(i) for i in range(5)]
self.values2 = ray.get([f.remote(i) for i in range(5)])
def get_values(self):
return self.x, self.y, self.object_ids, self.values2
def f(self):
return [f.remote(i) for i in range(5)]
def g(self):
return ray.get([g.remote(i) for i in range(5)])
def h(self, object_ids):
return ray.get(object_ids)
actor = Actor(1)
values = ray.get(actor.get_values())
self.assertEqual(values[0], 1)
self.assertEqual(values[1], val2)
self.assertEqual(ray.get(values[2]), list(range(1, 6)))
self.assertEqual(values[3], list(range(1, 6)))
self.assertEqual(ray.get(ray.get(actor.f())), list(range(1, 6)))
self.assertEqual(ray.get(actor.g()), list(range(1, 6)))
self.assertEqual(ray.get(actor.h([f.remote(i) for i in range(5)])), list(range(1, 6)))
ray.worker.cleanup()
def testDefineActorWithinActor(self):
# Make sure we can define actors within actors.
ray.init()
@ray.actor
class Actor1(object):
def __init__(self, x):
self.x = x
def new_actor(self, z):
@ray.actor
class Actor2(object):
def __init__(self, x):
self.x = x
def get_value(self):
return self.x
self.actor2 = Actor2(z)
def get_values(self, z):
self.new_actor(z)
return self.x, ray.get(self.actor2.get_value())
actor1 = Actor1(3)
self.assertEqual(ray.get(actor1.get_values(5)), (3, 5))
ray.worker.cleanup()
# TODO(rkn): The test testUseActorWithinActor currently fails with a pickling
# error.
# def testUseActorWithinActor(self):
# # Make sure we can use actors within actors.
# ray.init()
#
# @ray.actor
# class Actor1(object):
# def __init__(self, x):
# self.x = x
# def get_val(self):
# return self.x
#
# @ray.actor
# class Actor2(object):
# def __init__(self, x, y):
# self.x = x
# self.actor1 = Actor1(y)
#
# def get_values(self, z):
# return self.x, ray.get(self.actor1.get_val())
#
# actor2 = Actor2(3, 4)
# self.assertEqual(ray.get(actor2.get_values(5)), (3, 4))
#
# ray.worker.cleanup()
def testDefineActorWithinRemoteFunction(self):
# Make sure we can define actors within remote functions.
ray.init()
@ray.remote
def f(x, n):
@ray.actor
class Actor1(object):
def __init__(self, x):
self.x = x
def get_value(self):
return self.x
actor = Actor1(x)
return ray.get([actor.get_value() for _ in range(n)])
self.assertEqual(ray.get(f.remote(3, 1)), [3])
self.assertEqual(ray.get([f.remote(i, 20) for i in range(10)]), [20 * [i] for i in range(10)])
ray.worker.cleanup()
# This test currently fails with a pickling error.
# def testUseActorWithinRemoteFunction(self):
# # Make sure we can create and use actors within remote functions.
# ray.init()
#
# @ray.actor
# class Actor1(object):
# def __init__(self, x):
# self.x = x
# def get_values(self):
# return self.x
#
# @ray.remote
# def f(x):
# actor = Actor1(x)
# return ray.get(actor.get_values())
#
# self.assertEqual(ray.get(f.remote(3)), 3)
#
# ray.worker.cleanup()
def testActorImportCounter(self):
# This is mostly a test of the export counters to make sure that when an
# actor is imported, all of the necessary remote functions have been
# imported.
ray.init()
# Export a bunch of remote functions.
num_remote_functions = 50
for i in range(num_remote_functions):
@ray.remote
def f():
return i
@ray.remote
def g():
@ray.actor
class Actor(object):
def __init__(self):
# This should use the last version of f.
self.x = ray.get(f.remote())
def get_val(self):
return self.x
actor = Actor()
return ray.get(actor.get_val())
self.assertEqual(ray.get(g.remote()), num_remote_functions - 1)
ray.worker.cleanup()
class ActorInheritance(unittest.TestCase):
def testInheritActorFromClass(self):
# Make sure we can define an actor by inheriting from a regular class. Note
# that actors cannot inherit from other actors.
ray.init()
class Foo(object):
def __init__(self, x):
self.x = x
def f(self):
return self.x
def g(self, y):
return self.x + y
@ray.actor
class Actor(Foo):
def __init__(self, x):
Foo.__init__(self, x)
def get_value(self):
return self.f()
actor = Actor(1)
self.assertEqual(ray.get(actor.get_value()), 1)
self.assertEqual(ray.get(actor.g(5)), 6)
ray.worker.cleanup()
class ActorSchedulingProperties(unittest.TestCase):
def testRemoteFunctionsNotScheduledOnActors(self):
# Make sure that regular remote functions are not scheduled on actors.
ray.init(num_workers=0)
@ray.actor
class Actor(object):
def __init__(self):
pass
actor = Actor()
@ray.remote
def f():
return 1
# Make sure that f cannot be scheduled on the worker created for the actor.
# The wait call should time out.
ready_ids, remaining_ids = ray.wait([f.remote() for _ in range(10)], timeout=3000)
self.assertEqual(ready_ids, [])
self.assertEqual(len(remaining_ids), 10)
ray.worker.cleanup()
class ActorsOnMultipleNodes(unittest.TestCase):
def testActorLoadBalancing(self):
num_local_schedulers = 3
ray.worker._init(start_ray_local=True, num_workers=0, num_local_schedulers=num_local_schedulers)
@ray.actor
class Actor1(object):
def __init__(self):
pass
def get_location(self):
return ray.worker.global_worker.plasma_client.store_socket_name
# Create a bunch of actors.
num_actors = 30
actors = [Actor1() for _ in range(num_actors)]
# Make sure that actors are spread between the local schedulers.
locations = ray.get([actor.get_location() for actor in actors])
names = set(locations)
self.assertEqual(len(names), num_local_schedulers)
self.assertTrue(all([locations.count(name) > 5 for name in names]))
# Make sure we can get the results of a bunch of tasks.
results = []
for _ in range(1000):
index = np.random.randint(num_actors)
results.append(actors[index].get_location())
ray.get(results)
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)


@ -165,5 +165,94 @@ class TaskStatusTest(unittest.TestCase):
ray.worker.cleanup()
class ActorTest(unittest.TestCase):
def testFailedActorInit(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
error_message1 = "actor constructor failed"
error_message2 = "actor method failed"
@ray.actor
class FailedActor(object):
def __init__(self):
raise Exception(error_message1)
def get_val(self):
return 1
def fail_method(self):
raise Exception(error_message2)
a = FailedActor()
# Make sure that we get errors from a failed constructor.
wait_for_errors(b"task", 1)
self.assertEqual(len(ray.error_info()), 1)
self.assertIn(error_message1, ray.error_info()[0][b"message"].decode("ascii"))
# Make sure that we get errors from a failed method.
a.fail_method()
wait_for_errors(b"task", 2)
self.assertEqual(len(ray.error_info()), 2)
self.assertIn(error_message2, ray.error_info()[1][b"message"].decode("ascii"))
ray.worker.cleanup()
def testIncorrectMethodCalls(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@ray.actor
class Actor(object):
def __init__(self, missing_variable_name):
pass
def get_val(self, x):
pass
# Make sure that we get errors if we call the constructor incorrectly.
# TODO(rkn): These errors should instead be thrown when the method is
# called.
# Create an actor with too few arguments.
a = Actor()
wait_for_errors(b"task", 1)
self.assertEqual(len(ray.error_info()), 1)
if sys.version_info >= (3, 0):
self.assertIn("missing 1 required", ray.error_info()[0][b"message"].decode("ascii"))
else:
self.assertIn("takes exactly 2 arguments", ray.error_info()[0][b"message"].decode("ascii"))
# Create an actor with too many arguments.
a = Actor(1, 2)
wait_for_errors(b"task", 2)
self.assertEqual(len(ray.error_info()), 2)
if sys.version_info >= (3, 0):
self.assertIn("but 3 were given", ray.error_info()[1][b"message"].decode("ascii"))
else:
self.assertIn("takes exactly 2 arguments", ray.error_info()[1][b"message"].decode("ascii"))
# Create an actor with the correct number of arguments.
a = Actor(1)
# Call a method with too few arguments.
a.get_val()
wait_for_errors(b"task", 3)
self.assertEqual(len(ray.error_info()), 3)
if sys.version_info >= (3, 0):
self.assertIn("missing 1 required", ray.error_info()[2][b"message"].decode("ascii"))
else:
self.assertIn("takes exactly 2 arguments", ray.error_info()[2][b"message"].decode("ascii"))
# Call a method with too many arguments.
a.get_val(1, 2)
wait_for_errors(b"task", 4)
self.assertEqual(len(ray.error_info()), 4)
if sys.version_info >= (3, 0):
self.assertIn("but 3 were given", ray.error_info()[3][b"message"].decode("ascii"))
else:
self.assertIn("takes exactly 2 arguments", ray.error_info()[3][b"message"].decode("ascii"))
# Call a method that doesn't exist.
with self.assertRaises(AttributeError):
a.nonexistent_method()
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)


@ -550,7 +550,7 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testLoggingAPI(self):
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
def events():
# This is a hack for getting the event log. It is not part of the API.