Properly mock ray submodules when building documentation. (#337)

Robert Nishihara 2017-03-04 23:02:56 -08:00 committed by Philipp Moritz
parent 0a233b7144
commit a7ddac6fb1
8 changed files with 74 additions and 59 deletions

View file

@@ -31,7 +31,6 @@ matrix:
# Try generating Sphinx documentation. To do this, we need to install
# Ray first.
- ./.travis/install-dependencies.sh
- ./.travis/install-ray.sh
- export PATH="$HOME/miniconda/bin:$PATH"
- cd doc
- pip install -r requirements-doc.txt

View file

@@ -64,8 +64,6 @@ elif [[ "$LINT" == "1" ]]; then
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install numpy cloudpickle funcsigs colorama psutil redis
else
echo "Unrecognized environment."
exit 1

View file

@@ -16,6 +16,12 @@ import sys
import os
import shlex
# These lines added to enable Sphinx to work without installing Ray.
import mock
MOCK_MODULES = ["ray.numbuf", "ray.local_scheduler", "ray.plasma"]
for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock()
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
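
A minimal standalone illustration of the pattern the Sphinx configuration hunk above adds: placing Mock objects in sys.modules for the compiled submodules so that autodoc can import the pure-Python ray modules without building the C extensions. The package names below are hypothetical stand-ins; the real list is the MOCK_MODULES shown above, and conf.py uses the standalone `mock` package rather than `unittest.mock`.

```python
import sys
from unittest import mock  # conf.py uses the third-party `mock` package; unittest.mock behaves the same here

# Hypothetical names standing in for ray and its compiled submodules.
MOCK_MODULES = ["fakepkg", "fakepkg.native_ext"]
for mod_name in MOCK_MODULES:
    sys.modules[mod_name] = mock.Mock()

# The import statement now resolves straight from sys.modules, so no compiled
# extension (or even a real package on disk) has to be present.
import fakepkg.native_ext

# Any attribute access or call on a Mock just returns another Mock, so call
# sites such as ObjectID(...) evaluate without error during the docs build.
obj = fakepkg.native_ext.ObjectID(b"\x00" * 20)
print(type(obj))  # <class 'unittest.mock.Mock'>
```

Recent Sphinx releases expose autodoc_mock_imports for the same purpose; the conf.py hunk above achieves it manually.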

View file

@@ -13,8 +13,6 @@ if hasattr(ctypes, "windll"):
# This is done by associating all child processes with a "job" object that imposes this behavior.
(lambda kernel32: (lambda job: (lambda n: kernel32.SetInformationJobObject(job, 9, "\0" * 17 + chr(0x8 | 0x4 | 0x20) + "\0" * (n - 18), n))(0x90 if ctypes.sizeof(ctypes.c_void_p) > ctypes.sizeof(ctypes.c_int) else 0x70) and kernel32.AssignProcessToJobObject(job, ctypes.c_void_p(kernel32.GetCurrentProcess())))(ctypes.c_void_p(kernel32.CreateJobObjectW(None, None))) if kernel32 is not None else None)(ctypes.windll.kernel32)
import ray.experimental
import ray.serialization
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log
from ray.actor import actor
from ray.actor import get_gpu_ids

View file

@@ -9,7 +9,7 @@ import numpy as np
import random
import traceback
import ray.local_scheduler as local_scheduler
import ray.local_scheduler
import ray.pickling as pickling
import ray.worker
import ray.experimental.state as state
@@ -30,7 +30,7 @@ def random_string():
return np.random.bytes(20)
def random_actor_id():
return local_scheduler.ObjectID(random_string())
return ray.local_scheduler.ObjectID(random_string())
def get_actor_method_function_id(attr):
"""Get the function ID corresponding to an actor method.
@@ -45,13 +45,13 @@ def get_actor_method_function_id(attr):
function_id_hash.update(attr.encode("ascii"))
function_id = function_id_hash.digest()
assert len(function_id) == 20
return local_scheduler.ObjectID(function_id)
return ray.local_scheduler.ObjectID(function_id)
def fetch_and_register_actor(key, worker):
"""Import an actor."""
driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids, actor_method_names = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids", "actor_method_names"])
actor_id = local_scheduler.ObjectID(actor_id_str)
actor_id = ray.local_scheduler.ObjectID(actor_id_str)
actor_name = actor_name.decode("ascii")
module = module.decode("ascii")
actor_method_names = json.loads(actor_method_names.decode("ascii"))
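
The actor.py hunk above shows only the tail of get_actor_method_function_id: the method name is hashed into a deterministic 20-byte function ID. A hedged reconstruction follows; the hashlib.sha1 choice is an assumption inferred from the 20-byte digest asserted above, only the update() call visible in the hunk is reproduced (the real function may seed the hash with more context), and the raw digest is returned instead of being wrapped in ray.local_scheduler.ObjectID so the sketch runs without the compiled extension.

```python
import hashlib

def actor_method_function_id(attr):
    # A 20-byte digest suggests SHA-1 (assumption); the real code asserts the length.
    function_id_hash = hashlib.sha1()
    function_id_hash.update(attr.encode("ascii"))
    function_id = function_id_hash.digest()
    assert len(function_id) == 20
    # The real code returns ray.local_scheduler.ObjectID(function_id).
    return function_id

# The same method name hashes to the same ID on every worker.
print(actor_method_function_id("step").hex())
```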

View file

@@ -4,7 +4,7 @@ from __future__ import print_function
import numpy as np
import ray.numbuf as numbuf
import ray.numbuf
import ray.pickling as pickling
def check_serializable(cls):
@@ -139,5 +139,11 @@ def deserialize(serialized_obj):
obj.__dict__.update(serialized_obj)
return obj
# Register the callbacks with numbuf.
numbuf.register_callbacks(serialize, deserialize)
def set_callbacks():
"""Register the custom callbacks with numbuf.
The serialize callback is used to serialize objects that numbuf does not know
how to serialize (for example custom Python classes). The deserialize callback
is used to deserialize objects that were serialized by the serialize callback.
"""
ray.numbuf.register_callbacks(serialize, deserialize)
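
The serialization.py hunk replaces a module-level numbuf.register_callbacks(...) call with a set_callbacks() function, so nothing touches ray.numbuf merely because the module was imported; the worker.py hunk below calls ray.serialization.set_callbacks() from initialize_numbuf instead. A generic sketch of that deferral pattern, with a hypothetical stand-in for the numbuf extension:

```python
class _FakeNumbuf:
    """Hypothetical stand-in for the compiled ray.numbuf module."""
    def register_callbacks(self, serialize, deserialize):
        print("registered:", serialize.__name__, deserialize.__name__)

fake_numbuf = _FakeNumbuf()

def serialize(obj):
    # Real code: produce something numbuf can store (e.g. the instance __dict__).
    return obj.__dict__

def deserialize(serialized_obj):
    # Real code: rebuild the object and update its __dict__.
    return serialized_obj

# Before: registration ran as an import-time side effect, which requires the
# extension to be real. After: importing this module does nothing; a caller
# (worker initialization) opts in explicitly.
def set_callbacks():
    fake_numbuf.register_callbacks(serialize, deserialize)

set_callbacks()
```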

View file

@@ -17,8 +17,8 @@ import time
import threading
# Ray modules
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
import ray.local_scheduler
import ray.plasma
import ray.global_scheduler as global_scheduler
PROCESS_TYPE_MONITOR = "monitor"
@@ -417,7 +417,7 @@ def start_local_scheduler(redis_address,
if num_gpus is None:
# By default, assume this node has no GPUs.
num_gpus = 0
local_scheduler_name, p = local_scheduler.start_local_scheduler(
local_scheduler_name, p = ray.local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name,
worker_path=worker_path,
@@ -489,28 +489,31 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None,
else:
objstore_memory = int(system_memory * 0.8)
# Start the Plasma store.
plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory,
use_profiler=RUN_PLASMA_STORE_PROFILER,
stdout_file=store_stdout_file,
stderr_file=store_stderr_file)
plasma_store_name, p1 = ray.plasma.start_plasma_store(
plasma_store_memory=objstore_memory,
use_profiler=RUN_PLASMA_STORE_PROFILER,
stdout_file=store_stdout_file,
stderr_file=store_stderr_file)
# Start the plasma manager.
if object_manager_port is not None:
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name,
redis_address,
plasma_manager_port=object_manager_port,
node_ip_address=node_ip_address,
num_retries=1,
run_profiler=RUN_PLASMA_MANAGER_PROFILER,
stdout_file=manager_stdout_file,
stderr_file=manager_stderr_file)
plasma_manager_name, p2, plasma_manager_port = ray.plasma.start_plasma_manager(
plasma_store_name,
redis_address,
plasma_manager_port=object_manager_port,
node_ip_address=node_ip_address,
num_retries=1,
run_profiler=RUN_PLASMA_MANAGER_PROFILER,
stdout_file=manager_stdout_file,
stderr_file=manager_stderr_file)
assert plasma_manager_port == object_manager_port
else:
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name,
redis_address,
node_ip_address=node_ip_address,
run_profiler=RUN_PLASMA_MANAGER_PROFILER,
stdout_file=manager_stdout_file,
stderr_file=manager_stderr_file)
plasma_manager_name, p2, plasma_manager_port = ray.plasma.start_plasma_manager(
plasma_store_name,
redis_address,
node_ip_address=node_ip_address,
run_profiler=RUN_PLASMA_MANAGER_PROFILER,
stdout_file=manager_stdout_file,
stderr_file=manager_stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1)
all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2)
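
The start_objstore hunk above also shows the default sizing rule: with no explicit size, the Plasma store gets roughly 80% of system memory. A small worked sketch of that computation; obtaining system_memory via psutil is an assumption (psutil is installed in the Travis hunk above, but the assignment itself lies outside this diff):

```python
import psutil  # assumption: the system_memory lookup is not shown in this diff

system_memory = psutil.virtual_memory().total
objstore_memory = int(system_memory * 0.8)  # default from the hunk above
print("object store capacity: %.1f GiB" % (objstore_memory / 2.0 ** 30))
```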

View file

@@ -25,9 +25,9 @@ import traceback
import ray.pickling as pickling
import ray.serialization as serialization
import ray.services as services
import ray.numbuf as numbuf
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
import ray.numbuf
import ray.local_scheduler
import ray.plasma
SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -53,7 +53,7 @@ def random_string():
return np.random.bytes(20)
def random_object_id():
return local_scheduler.ObjectID(random_string())
return ray.local_scheduler.ObjectID(random_string())
class FunctionID(object):
def __init__(self, function_id):
@@ -78,7 +78,7 @@ def numbuf_serialize(value):
The serialized object.
"""
assert len(contained_objectids) == 0, "This should be unreachable."
return numbuf.serialize_list([value])
return ray.numbuf.serialize_list([value])
class RayTaskError(Exception):
"""An object used internally to represent a task that threw an exception.
@@ -439,8 +439,8 @@ class Worker(object):
"""
# Serialize and put the object in the object store.
try:
numbuf.store_list(objectid.id(), self.plasma_client.conn, [value])
except numbuf.numbuf_plasma_object_exists_error as e:
ray.numbuf.store_list(objectid.id(), self.plasma_client.conn, [value])
except ray.numbuf.numbuf_plasma_object_exists_error as e:
# The object already exists in the object store, so there is no need to
# add it again. TODO(rkn): We need to compare the hashes and make sure
# that the objects are in fact the same. We also should return an error
@@ -465,7 +465,7 @@ class Worker(object):
self.plasma_client.fetch([object_id.id() for object_id in object_ids])
# Get the objects. We initially try to get the objects immediately.
final_results = numbuf.retrieve_list(
final_results = ray.numbuf.retrieve_list(
[object_id.id() for object_id in object_ids],
self.plasma_client.conn,
0)
@@ -482,7 +482,7 @@ class Worker(object):
# Do another fetch for objects that aren't available locally yet, in case
# they were evicted since the last fetch.
self.plasma_client.fetch(list(unready_ids.keys()))
results = numbuf.retrieve_list(list(unready_ids.keys()),
results = ray.numbuf.retrieve_list(list(unready_ids.keys()),
self.plasma_client.conn,
GET_TIMEOUT_MILLISECONDS)
# Remove any entries for objects we received during this iteration so we
@@ -504,7 +504,7 @@ class Worker(object):
assert final_results[i][0] == object_ids[i].id()
return [result[1][0] for result in final_results]
def submit_task(self, function_id, func_name, args, actor_id=local_scheduler.ObjectID(NIL_ACTOR_ID)):
def submit_task(self, function_id, func_name, args, actor_id=None):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with name
@@ -519,13 +519,14 @@ class Worker(object):
"""
with log_span("ray:submit_task", worker=self):
check_main_thread()
actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) if actor_id is None else actor_id
# Put large or complex arguments that are passed by value in the object
# store first.
args_for_local_scheduler = []
for arg in args:
if isinstance(arg, local_scheduler.ObjectID):
if isinstance(arg, ray.local_scheduler.ObjectID):
args_for_local_scheduler.append(arg)
elif local_scheduler.check_simple_value(arg):
elif ray.local_scheduler.check_simple_value(arg):
args_for_local_scheduler.append(arg)
else:
args_for_local_scheduler.append(put(arg))
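
The submit_task change above is the one spot in this file where the extension module would otherwise be exercised while worker.py is merely being imported: the old signature evaluated local_scheduler.ObjectID(NIL_ACTOR_ID) as a default argument at definition time, whereas the new code defaults to None and builds the ID inside the call. A small self-contained illustration of the difference; ObjectID and NIL_ACTOR_ID below are stand-ins, not the real extension type or constant:

```python
class ObjectID(object):
    """Stand-in for ray.local_scheduler.ObjectID."""
    def __init__(self, binary):
        assert isinstance(binary, bytes) and len(binary) == 20
        self.binary = binary

NIL_ACTOR_ID = b"\xff" * 20

# Old style: the default expression runs once, at import time, so importing the
# module already calls into the extension (or into the Sphinx mock).
def submit_task_old(function_id, args, actor_id=ObjectID(NIL_ACTOR_ID)):
    return actor_id

# New style: defer construction until the function is actually called.
def submit_task_new(function_id, args, actor_id=None):
    actor_id = ObjectID(NIL_ACTOR_ID) if actor_id is None else actor_id
    return actor_id

print(submit_task_new("f", []).binary == NIL_ACTOR_ID)  # True
```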
@@ -534,9 +535,9 @@ class Worker(object):
num_return_vals, num_cpus, num_gpus = self.function_properties[self.task_driver_id.id()][function_id.id()]
# Submit the task to local scheduler.
task = local_scheduler.Task(
task = ray.local_scheduler.Task(
self.task_driver_id,
local_scheduler.ObjectID(function_id.id()),
ray.local_scheduler.ObjectID(function_id.id()),
args_for_local_scheduler,
num_return_vals,
self.current_task_id,
@@ -686,14 +687,18 @@ def initialize_numbuf(worker=global_worker):
This defines a custom serializer for object IDs and also tells numbuf to
serialize several exception classes that we define for error handling.
"""
ray.serialization.set_callbacks()
# Define a custom serializer and deserializer for handling Object IDs.
def objectid_custom_serializer(obj):
class_identifier = serialization.class_identifier(type(obj))
contained_objectids.append(obj)
return obj.id()
def objectid_custom_deserializer(serialized_obj):
return local_scheduler.ObjectID(serialized_obj)
serialization.add_class_to_whitelist(local_scheduler.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer)
return ray.local_scheduler.ObjectID(serialized_obj)
serialization.add_class_to_whitelist(ray.local_scheduler.ObjectID,
pickle=False,
custom_serializer=objectid_custom_serializer,
custom_deserializer=objectid_custom_deserializer)
if worker.mode in [SCRIPT_MODE, SILENT_MODE]:
# These should only be called on the driver because register_class will
@@ -1045,7 +1050,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
"module",
"num_cpus",
"num_gpus"])
function_id = local_scheduler.ObjectID(function_id_str)
function_id = ray.local_scheduler.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
num_return_vals = int(num_return_vals)
num_cpus = int(num_cpus)
@@ -1211,9 +1216,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
worker.redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port))
worker.lock = threading.Lock()
# Create an object store client.
worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
# Create the local scheduler client.
worker.local_scheduler_client = local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id)
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id)
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
# The concept of a driver is the same as the concept of a "job". Register
@@ -1249,12 +1254,12 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
else:
# Try to use true randomness.
np.random.seed(None)
worker.current_task_id = local_scheduler.ObjectID(np.random.bytes(20))
worker.current_task_id = ray.local_scheduler.ObjectID(np.random.bytes(20))
# When tasks are executed on remote workers in the context of multiple
# drivers, the task driver ID is used to keep track of which driver is
# responsible for the task so that error messages will be propagated to the
# correct driver.
worker.task_driver_id = local_scheduler.ObjectID(worker.worker_id)
worker.task_driver_id = ray.local_scheduler.ObjectID(worker.worker_id)
# Reset the state of the numpy random number generator.
np.random.set_state(numpy_state)
# Set other fields needed for computing task IDs.
@@ -1471,7 +1476,7 @@ def put(value, worker=global_worker):
if worker.mode == PYTHON_MODE:
# In PYTHON_MODE, ray.put is the identity operation
return value
object_id = local_scheduler.compute_put_id(worker.current_task_id, worker.put_index)
object_id = ray.local_scheduler.compute_put_id(worker.current_task_id, worker.put_index)
worker.put_object(object_id, value)
worker.put_index += 1
return object_id
@@ -1504,8 +1509,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
object_id_strs = [object_id.id() for object_id in object_ids]
timeout = timeout if timeout is not None else 2 ** 30
ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns)
ready_ids = [local_scheduler.ObjectID(object_id) for object_id in ready_ids]
remaining_ids = [local_scheduler.ObjectID(object_id) for object_id in remaining_ids]
ready_ids = [ray.local_scheduler.ObjectID(object_id) for object_id in ready_ids]
remaining_ids = [ray.local_scheduler.ObjectID(object_id) for object_id in remaining_ids]
return ready_ids, remaining_ids
def wait_for_function(function_id, driver_id, timeout=5, worker=global_worker):
@@ -1932,7 +1937,7 @@ def get_arguments_for_execution(function_name, serialized_args, worker=global_wo
"""
arguments = []
for (i, arg) in enumerate(serialized_args):
if isinstance(arg, local_scheduler.ObjectID):
if isinstance(arg, ray.local_scheduler.ObjectID):
# get the object from the local object store
argument = worker.get_object([arg])[0]
if isinstance(argument, RayTaskError):
@@ -1966,7 +1971,7 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker):
function.
"""
for i in range(len(objectids)):
if isinstance(outputs[i], local_scheduler.ObjectID):
if isinstance(outputs[i], ray.local_scheduler.ObjectID):
raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i))
for i in range(len(objectids)):
worker.put_object(objectids[i], outputs[i])
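
For reference, the get_object hunks earlier in this file outline a two-phase retrieval: an initial retrieve with a zero timeout, then a loop that re-fetches whatever is still missing and retries with GET_TIMEOUT_MILLISECONDS until everything has arrived, preserving input order at the end. A compressed sketch of that control flow; fetch and retrieve below are hypothetical in-memory stand-ins for the plasma client's fetch and ray.numbuf.retrieve_list:

```python
GET_TIMEOUT_MILLISECONDS = 1000

_store = {}  # toy object store so the sketch runs

def fetch(object_ids):
    # Stand-in for plasma_client.fetch: request transfer of missing objects.
    for oid in object_ids:
        _store.setdefault(oid, "value-for-" + oid)

def retrieve(object_ids, timeout_ms):
    # Stand-in for ray.numbuf.retrieve_list: returns (id, value-or-None) pairs;
    # the real call blocks for up to timeout_ms.
    return [(oid, _store.get(oid)) for oid in object_ids]

def get_objects(object_ids):
    fetch(object_ids)
    # First pass: try to get everything immediately (timeout 0).
    results = dict(retrieve(object_ids, 0))
    unready = [oid for oid in object_ids if results[oid] is None]
    while unready:
        # Re-fetch in case objects were evicted since the last fetch, then
        # retry with a real timeout and record whatever arrived.
        fetch(unready)
        for oid, value in retrieve(unready, GET_TIMEOUT_MILLISECONDS):
            if value is not None:
                results[oid] = value
        unready = [oid for oid in unready if results[oid] is None]
    # Return values in input order, as the real code asserts before returning.
    return [results[oid] for oid in object_ids]

print(get_objects(["a", "b"]))  # ['value-for-a', 'value-for-b']
```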