General attribute-based heterogeneity support with hard and soft constraints (#248)

* attribute-based heterogeneity-awareness in global scheduler and photon

* minor post-rebase fix

* photon: enforce dynamic capacity constraint on task dispatch

* globalsched: cap the number of times we try to schedule a task in round robin

* propagating ability to specify resource capacity to ray.init

* adding resources to remote function export and fetch/register

* globalsched: remove unused functions; update cached photon resource capacity (until next photon heartbeat)

* Add some integration tests.

* globalsched: cleanup + factor out constraint checking

* lots of style

* task_spec_required_resource: global refactor

* clang format

* clang format + comment update in photon

* clang format photon comment

* valgrind

* reduce verbosity for Travis

* Add test for scheduler load balancing.

* addressing comments

* refactoring global scheduler algorithm

* Minor cleanups.

* Linting.

* Fix array_test.py and linting.

* valgrind fix for photon tests

* Attempt to fix stress tests.

* fix hashmap free

* fix hashmap free comment

* memset photon resource vectors to 0 in case they get used before the first heartbeat

* More whitespace changes.

* Undo whitespace error I introduced.
Alexey Tumanov 2017-02-09 01:34:14 -08:00 committed by Robert Nishihara
parent 1a7e1c47cb
commit dfb6107b22
22 changed files with 1037 additions and 226 deletions
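In user-facing terms, this change lets a driver declare per-node resource capacities when Ray starts and per-task resource demands on the remote decorator. Below is a minimal sketch of the intended usage, based on the new ray.init and @ray.remote arguments in this diff; the values are illustrative, and at this point in the tree a decorated function is invoked directly rather than through a .remote() call.

import ray

# Configure the local scheduler started by ray.init with a static capacity
# of 4 CPUs and 2 GPUs (illustrative values).
ray.init(num_cpus=4, num_gpus=2)

# Declare that each invocation of this function needs 2 CPUs and 1 GPU.
# Omitting the arguments falls back to the defaults of 1 CPU and 0 GPUs.
@ray.remote(num_cpus=2, num_gpus=1)
def train(x):
    return 2 * x

# In this version of the API the decorated function is called directly and
# returns an object ID for the result.
result_id = train(21)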

View file

@ -21,6 +21,7 @@ from plasma.utils import random_object_id, generate_metadata, write_to_data_buff
USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
ID_SIZE = 20
NUM_CLUSTER_NODES = 2
# These constants must match the scheduling state enum in task.h.
TASK_STATUS_WAITING = 1
@ -43,13 +44,16 @@ def random_task_id():
def random_function_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
def random_object_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
def new_port():
return random.randint(10000, 65535)
class TestGlobalScheduler(unittest.TestCase):
def setUp(self):
# Start a Redis server.
# Start one Redis server plus one plasma store, plasma manager, and photon scheduler per cluster node.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so")
assert os.path.isfile(redis_path)
@ -61,29 +65,47 @@ class TestGlobalScheduler(unittest.TestCase):
time.sleep(0.1)
# Create a Redis client.
self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port)
# Start the global scheduler.
# Start one global scheduler.
self.p1 = global_scheduler.start_global_scheduler(redis_address, use_valgrind=USE_VALGRIND)
# Start the Plasma store.
plasma_store_name, self.p2 = plasma.start_plasma_store()
self.plasma_store_pids = []
self.plasma_manager_pids = []
self.local_scheduler_pids = []
self.plasma_clients = []
self.photon_clients = []
for i in range(NUM_CLUSTER_NODES):
# Start the Plasma store. Plasma store name is randomly generated.
plasma_store_name, p2 = plasma.start_plasma_store()
self.plasma_store_pids.append(p2)
# Start the Plasma manager.
plasma_manager_name, self.p3, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address)
self.plasma_address = "{}:{}".format(node_ip_address, plasma_manager_port)
self.plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name)
# Assumption: Plasma manager name and port are randomly generated by the plasma module.
plasma_manager_name, p3, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address)
self.plasma_manager_pids.append(p3)
plasma_address = "{}:{}".format(node_ip_address, plasma_manager_port)
plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name)
self.plasma_clients.append(plasma_client)
# Start the local scheduler.
local_scheduler_name, self.p4 = photon.start_local_scheduler(
local_scheduler_name, p4 = photon.start_local_scheduler(
plasma_store_name,
plasma_manager_name=plasma_manager_name,
plasma_address=self.plasma_address,
redis_address=redis_address)
plasma_address=plasma_address,
redis_address=redis_address,
static_resource_list=[None, 0])
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(local_scheduler_name)
photon_client = photon.PhotonClient(local_scheduler_name)
self.photon_clients.append(photon_client)
self.local_scheduler_pids.append(p4)
def tearDown(self):
# Check that the processes are still alive.
self.assertEqual(self.p1.poll(), None)
self.assertEqual(self.p2.poll(), None)
self.assertEqual(self.p3.poll(), None)
self.assertEqual(self.p4.poll(), None)
for p2 in self.plasma_store_pids:
self.assertEqual(p2.poll(), None)
for p3 in self.plasma_manager_pids:
self.assertEqual(p3.poll(), None)
for p4 in self.local_scheduler_pids:
self.assertEqual(p4.poll(), None)
self.assertEqual(self.redis_process.poll(), None)
# Kill the global scheduler.
@ -94,9 +116,10 @@ class TestGlobalScheduler(unittest.TestCase):
os._exit(-1)
else:
self.p1.kill()
self.p2.kill()
self.p3.kill()
self.p4.kill()
# Kill local schedulers, plasma managers, and plasma stores.
map(subprocess.Popen.kill, self.local_scheduler_pids)
map(subprocess.Popen.kill, self.plasma_manager_pids)
map(subprocess.Popen.kill, self.plasma_store_pids)
# Kill Redis. In the event that we are using valgrind, this needs to happen
# after we kill the global scheduler.
self.redis_process.kill()
@ -123,15 +146,21 @@ class TestGlobalScheduler(unittest.TestCase):
return db_client_id
def test_task_default_resources(self):
task1 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0)
self.assertEqual(task1.required_resources(), [1.0, 0.0])
task2 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, [1.0, 2.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0])
def test_redis_only_single_task(self):
"""
Tests global scheduler functionality by interacting with Redis and checking
task state transitions in Redis only. TODO(atumanov): implement.
"""
# Check precondition for this test:
# There should be three db clients, the global scheduler, the local
# scheduler, and the plasma manager.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
# There should be 2n+1 db clients: the global scheduler + one photon and one plasma per node.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))),
2 * NUM_CLUSTER_NODES + 1)
db_client_id = self.get_plasma_manager_id()
assert(db_client_id != None)
assert(db_client_id.startswith(b"CL:"))
@ -140,21 +169,23 @@ class TestGlobalScheduler(unittest.TestCase):
def test_integration_single_task(self):
# There should be 2n+1 db clients: the global scheduler + one photon and one
# plasma per node.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))),
2 * NUM_CLUSTER_NODES + 1)
num_return_vals = [0, 1, 2, 3, 5, 10]
# There should not be anything else in Redis yet.
self.assertEqual(len(self.redis_client.keys("*")), 3)
self.assertEqual(len(self.redis_client.keys("*")), 2 * NUM_CLUSTER_NODES + 1)
# Insert the object into Redis.
data_size = 0xf1f0
metadata_size = 0x40
object_dep, memory_buffer, metadata = create_object(self.plasma_client, data_size, metadata_size, seal=True)
plasma_client = self.plasma_clients[0]
object_dep, memory_buffer, metadata = create_object(plasma_client, data_size, metadata_size, seal=True)
# Sleep before submitting task to photon.
time.sleep(0.1)
# Submit a task to Redis.
task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.photon_client.submit(task)
self.photon_clients[0].submit(task)
time.sleep(0.1)
# There should now be a task in Redis, and it should get assigned to the
# local scheduler
@ -184,7 +215,8 @@ class TestGlobalScheduler(unittest.TestCase):
def integration_many_tasks_helper(self, timesync=True):
# There should be 2n+1 db clients: the global scheduler + one photon and one
# plasma per node.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))),
2 * NUM_CLUSTER_NODES + 1)
num_return_vals = [0, 1, 2, 3, 5, 10]
# Submit a bunch of tasks to Redis.
@ -193,12 +225,13 @@ class TestGlobalScheduler(unittest.TestCase):
# Create a new object for each task.
data_size = np.random.randint(1 << 20)
metadata_size = np.random.randint(1 << 10)
object_dep, memory_buffer, metadata = create_object(self.plasma_client, data_size, metadata_size, seal=True)
plasma_client = self.plasma_clients[0]
object_dep, memory_buffer, metadata = create_object(plasma_client, data_size, metadata_size, seal=True)
if timesync:
# Give 10ms for object info handler to fire (long enough to yield CPU).
time.sleep(0.010)
task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.photon_client.submit(task)
self.photon_clients[0].submit(task)
# Check that there are the correct number of tasks in Redis and that they
# all get assigned to the local scheduler.
num_retries = 10

View file

@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
import os
import random
import subprocess
@ -14,7 +15,7 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
worker_path=None, plasma_address=None,
node_ip_address="127.0.0.1", redis_address=None,
use_valgrind=False, use_profiler=False,
redirect_output=False):
redirect_output=False, static_resource_list=None):
"""Start a local scheduler process.
Args:
@ -37,6 +38,9 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
profiler. If this is True, use_valgrind must be False.
redirect_output (bool): True if stdout and stderr should be redirected to
/dev/null.
static_resource_list (list): A list of integers specifying the local
scheduler's resource capacities. The resources should appear in an order
matching the order defined in task.h.
Return:
A tuple of the name of the local scheduler socket and the process ID of the
@ -71,6 +75,24 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
command += ["-r", redis_address]
if plasma_address is not None:
command += ["-a", plasma_address]
# We want to be able to support independently setting capacity for each of the
# supported resource types. Thus, the list can be None or contain any number
# of None values.
if static_resource_list is None:
static_resource_list = [None, None]
if static_resource_list[0] is None:
# By default, use the number of hardware execution threads for the number of
# cores.
static_resource_list[0] = multiprocessing.cpu_count()
if static_resource_list[1] is None:
# By default, do not configure any GPUs on this node.
static_resource_list[1] = 0
# Pass the resource capacity string to the photon scheduler in all cases.
# Sanity check to make sure all resource capacities in the list are numeric
# (int or float).
assert(all([x == int or x == float for x in map(type, static_resource_list)]))
command += ["-c", ",".join(map(str, static_resource_list))]
with open(os.devnull, "w") as FNULL:
stdout = FNULL if redirect_output else None
stderr = FNULL if redirect_output else None
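To make the defaulting above concrete, here is a standalone sketch of the same normalization; the helper name normalize_static_resources is hypothetical, and only the behavior mirrors the code above: a None list becomes [None, None], a None CPU entry falls back to the machine's hardware thread count, and a None GPU entry falls back to 0.

import multiprocessing

def normalize_static_resources(static_resource_list=None):
    # Hypothetical helper mirroring the defaulting logic above.
    resources = (list(static_resource_list)
                 if static_resource_list is not None else [None, None])
    if resources[0] is None:
        # Default the CPU slot to the number of hardware execution threads.
        resources[0] = multiprocessing.cpu_count()
    if resources[1] is None:
        # Default to no GPUs configured on this node.
        resources[1] = 0
    return resources

# normalize_static_resources([None, 0]) -> [<cpu_count>, 0], which is what the
# global scheduler tests above pass for each local scheduler.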

View file

@ -100,6 +100,8 @@ class PlasmaClient(object):
store_socket_name (str): Name of the socket the plasma store is listening at.
manager_socket_name (str): Name of the socket the plasma manager is listening at.
"""
self.store_socket_name = store_socket_name
self.manager_socket_name = manager_socket_name
self.alive = True
if manager_socket_name is not None:

View file

@ -264,7 +264,8 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False):
def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
plasma_manager_name, worker_path, plasma_address=None,
cleanup=True, redirect_output=False):
cleanup=True, redirect_output=False,
static_resource_list=None):
"""Start a local scheduler process.
Args:
@ -281,6 +282,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
that imported services exits.
redirect_output (bool): True if stdout and stderr should be redirected to
/dev/null.
static_resource_list (list): An ordered list of the configured resource
capacities for this local scheduler.
Return:
The name of the local scheduler socket.
@ -292,7 +295,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
redis_address=redis_address,
plasma_address=plasma_address,
use_profiler=RUN_PHOTON_PROFILER,
redirect_output=redirect_output)
redirect_output=redirect_output,
static_resource_list=static_resource_list)
if cleanup:
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
return local_scheduler_name
@ -386,7 +390,9 @@ def start_ray_processes(address_info=None,
cleanup=True,
redirect_output=False,
include_global_scheduler=False,
include_redis=False):
include_redis=False,
num_cpus=None,
num_gpus=None):
"""Helper method to start Ray processes.
Args:
@ -411,11 +417,22 @@ def start_ray_processes(address_info=None,
start a global scheduler process.
include_redis (bool): If include_redis is True, then start a Redis server
process.
num_cpus: A list of length num_local_schedulers containing the number of
CPUs each local scheduler should be configured with.
num_gpus: A list of length num_local_schedulers containing the number of
GPUs each local scheduler should be configured with.
Returns:
A dictionary of the address information for the processes that were
started.
"""
if not isinstance(num_cpus, list):
num_cpus = num_local_schedulers * [num_cpus]
if not isinstance(num_gpus, list):
num_gpus = num_local_schedulers * [num_gpus]
assert len(num_cpus) == num_local_schedulers
assert len(num_gpus) == num_local_schedulers
if address_info is None:
address_info = {}
address_info["node_ip_address"] = node_ip_address
@ -486,7 +503,8 @@ def start_ray_processes(address_info=None,
worker_path,
plasma_address=plasma_address,
cleanup=cleanup,
redirect_output=redirect_output)
redirect_output=redirect_output,
static_resource_list=[num_cpus[i], num_gpus[i]])
local_scheduler_socket_names.append(local_scheduler_name)
time.sleep(0.1)
@ -517,7 +535,9 @@ def start_ray_node(node_ip_address,
num_local_schedulers=1,
worker_path=None,
cleanup=True,
redirect_output=False):
redirect_output=False,
num_cpus=None,
num_gpus=None):
"""Start the Ray processes for a single node.
This assumes that the Ray processes on some master node have already been
@ -550,7 +570,9 @@ def start_ray_node(node_ip_address,
num_local_schedulers=num_local_schedulers,
worker_path=worker_path,
cleanup=cleanup,
redirect_output=redirect_output)
redirect_output=redirect_output,
num_cpus=num_cpus,
num_gpus=num_gpus)
def start_ray_head(address_info=None,
node_ip_address="127.0.0.1",
@ -558,7 +580,9 @@ def start_ray_head(address_info=None,
num_local_schedulers=1,
worker_path=None,
cleanup=True,
redirect_output=False):
redirect_output=False,
num_cpus=None,
num_gpus=None):
"""Start Ray in local mode.
Args:
@ -579,6 +603,8 @@ def start_ray_head(address_info=None,
method exits.
redirect_output (bool): True if stdout and stderr should be redirected to
/dev/null.
num_cpus (int): number of cpus to configure the local scheduler with.
num_gpus (int): number of gpus to configure the local scheduler with.
Returns:
A dictionary of the address information for the processes that were
@ -592,4 +618,6 @@ def start_ray_head(address_info=None,
cleanup=cleanup,
redirect_output=redirect_output,
include_global_scheduler=True,
include_redis=True)
include_redis=True,
num_cpus=num_cpus,
num_gpus=num_gpus)
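The num_cpus and num_gpus arguments accepted here are either a single value applied to every local scheduler or a list with one entry per local scheduler. A small sketch of that broadcasting step follows; the helper name is hypothetical, while the logic matches the isinstance checks in start_ray_processes.

def broadcast_resource_arg(value, num_local_schedulers):
    # A scalar (or None) is replicated once per local scheduler; a list is
    # used as-is and must already have the right length.
    if not isinstance(value, list):
        value = num_local_schedulers * [value]
    assert len(value) == num_local_schedulers
    return value

# broadcast_resource_arg(4, 2)      -> [4, 4]
# broadcast_resource_arg([4, 8], 2) -> [4, 8]
# broadcast_resource_arg(None, 2)   -> [None, None]  (photon applies its defaults)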

View file

@ -479,7 +479,7 @@ class Worker(object):
assert final_results[i][0] == object_ids[i].id()
return [result[1][0] for result in final_results]
def submit_task(self, function_id, func_name, args):
def submit_task(self, function_id, func_name, args, num_cpus, num_gpus):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with name
@ -491,6 +491,8 @@ class Worker(object):
args (List[Any]): The arguments to pass into the function. Arguments can
be object IDs or they can be values. If they are values, they
must be serializable objecs.
num_cpus (int): The number of cpu cores this task requires to run.
num_gpus (int): The number of gpus this task requires to run.
"""
with log_span("ray:submit_task", worker=self):
check_main_thread()
@ -511,7 +513,8 @@ class Worker(object):
args_for_photon,
self.num_return_vals[function_id.id()],
self.current_task_id,
self.task_index)
self.task_index,
[num_cpus, num_gpus])
# Increment the worker's task index to track how many tasks have been
# submitted by the current task so far.
self.task_index += 1
@ -734,7 +737,7 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5):
def _init(address_info=None, start_ray_local=False, object_id_seed=None,
num_workers=None, num_local_schedulers=None,
driver_mode=SCRIPT_MODE):
driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@ -761,6 +764,10 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
only provided if start_ray_local is True.
driver_mode (bool): The mode in which to start the driver. This should be
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
num_cpus: A list containing the number of CPUs the local schedulers should
be configured with.
num_gpus: A list containing the number of GPUs the local schedulers should
be configured with.
Returns:
Address information about the started processes.
@ -807,7 +814,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
address_info = services.start_ray_head(address_info=address_info,
node_ip_address=node_ip_address,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers)
num_local_schedulers=num_local_schedulers,
num_cpus=num_cpus, num_gpus=num_gpus)
else:
if redis_address is None:
raise Exception("If start_ray_local=False, then redis_address must be provided.")
@ -815,6 +823,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
raise Exception("If start_ray_local=False, then num_workers must not be provided.")
if num_local_schedulers is not None:
raise Exception("If start_ray_local=False, then num_local_schedulers must not be provided.")
if num_cpus is not None or num_gpus is not None:
raise Exception("If start_ray_local=False, then num_cpus and num_gpus must not be provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
@ -839,7 +849,7 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
return address_info
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE):
num_workers=None, driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None):
"""Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@ -860,6 +870,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
redis_address is not provided.
driver_mode (bool): The mode in which to start the driver. This should be
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
num_cpus (int): Number of cpus the user wishes all local schedulers to be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to be configured with.
Returns:
Address information about the started processes.
@ -873,7 +885,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
"redis_address": redis_address,
}
return _init(address_info=info, start_ray_local=(redis_address is None),
num_workers=num_workers, driver_mode=driver_mode)
num_workers=num_workers, driver_mode=driver_mode,
num_cpus=num_cpus, num_gpus=num_gpus)
def cleanup(worker=global_worker):
"""Disconnect the driver, and terminate any processes started in init.
@ -964,10 +977,21 @@ If this driver is hanging, start a new one with
def fetch_and_register_remote_function(key, worker=global_worker):
"""Import a remote function."""
driver_id, function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter = worker.redis_client.hmget(key, ["driver_id", "function_id", "name", "function", "num_return_vals", "module", "function_export_counter"])
driver_id, function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter, num_cpus, num_gpus = \
worker.redis_client.hmget(key, ["driver_id",
"function_id",
"name",
"function",
"num_return_vals",
"module",
"function_export_counter",
"num_cpus",
"num_gpus"])
function_id = photon.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
num_return_vals = int(num_return_vals)
num_cpus = int(num_cpus)
num_gpus = int(num_gpus)
module = module.decode("ascii")
function_export_counter = int(function_export_counter)
@ -978,7 +1002,10 @@ def fetch_and_register_remote_function(key, worker=global_worker):
# overwritten if the function is unpickled successfully.
def f():
raise Exception("This function was not imported properly.")
worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(lambda *xs: f())
worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals,
function_id=function_id,
num_cpus=num_cpus,
num_gpus=num_gpus)(lambda *xs: f())
try:
function = pickling.loads(serialized_function)
@ -994,7 +1021,10 @@ def fetch_and_register_remote_function(key, worker=global_worker):
else:
# TODO(rkn): Why is the below line necessary?
function.__module__ = module
worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(function)
worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals,
function_id=function_id,
num_cpus=num_cpus,
num_gpus=num_gpus)(function)
# Add the function to the function table.
worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()), worker.worker_id)
@ -1207,8 +1237,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
for name, environment_variable in env._cached_environment_variables:
env.__setattr__(name, environment_variable)
# Export cached remote functions to the workers.
for function_id, func_name, func, num_return_vals in worker.cached_remote_functions:
export_remote_function(function_id, func_name, func, num_return_vals, worker)
for function_id, func_name, func, num_return_vals, num_cpus, num_gpus in worker.cached_remote_functions:
export_remote_function(function_id, func_name, func, num_return_vals, num_cpus, num_gpus, worker)
worker.cached_functions_to_run = None
worker.cached_remote_functions = None
env._cached_environment_variables = None
@ -1576,7 +1606,7 @@ def main_loop(worker=global_worker):
# Push all of the log events to the global state store.
flush_log()
def _submit_task(function_id, func_name, args, worker=global_worker):
def _submit_task(function_id, func_name, args, num_cpus, num_gpus, worker=global_worker):
"""This is a wrapper around worker.submit_task.
We use this wrapper so that in the remote decorator, we can call _submit_task
@ -1584,7 +1614,7 @@ def _submit_task(function_id, func_name, args, worker=global_worker):
serialize remote functions, we don't attempt to serialize the worker object,
which cannot be serialized.
"""
return worker.submit_task(function_id, func_name, args)
return worker.submit_task(function_id, func_name, args, num_cpus, num_gpus)
def _mode(worker=global_worker):
"""This is a wrapper around worker.mode.
@ -1626,7 +1656,7 @@ def _export_environment_variable(name, environment_variable, worker=global_worke
worker.redis_client.rpush("Exports", key)
worker.driver_export_counter += 1
def export_remote_function(function_id, func_name, func, num_return_vals, worker=global_worker):
def export_remote_function(function_id, func_name, func, num_return_vals, num_cpus, num_gpus, worker=global_worker):
check_main_thread()
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("export_remote_function can only be called on a driver.")
@ -1639,7 +1669,9 @@ def export_remote_function(function_id, func_name, func, num_return_vals, worker
"module": func.__module__,
"function": pickled_func,
"num_return_vals": num_return_vals,
"function_export_counter": worker.driver_export_counter})
"function_export_counter": worker.driver_export_counter,
"num_cpus": num_cpus,
"num_gpus": num_gpus})
worker.redis_client.rpush("Exports", key)
worker.driver_export_counter += 1
@ -1651,7 +1683,7 @@ def remote(*args, **kwargs):
should return.
"""
worker = global_worker
def make_remote_decorator(num_return_vals, func_id=None):
def make_remote_decorator(num_return_vals, num_cpus, num_gpus, func_id=None):
def remote_decorator(func):
func_name = "{}.{}".format(func.__module__, func.__name__)
if func_id is None:
@ -1678,7 +1710,7 @@ def remote(*args, **kwargs):
_env()._reinitialize()
_env()._running_remote_function_locally = False
return result
objectids = _submit_task(function_id, func_name, args)
objectids = _submit_task(function_id, func_name, args, num_cpus, num_gpus)
if len(objectids) == 1:
return objectids[0]
elif len(objectids) > 1:
@ -1722,37 +1754,44 @@ def remote(*args, **kwargs):
if func_name_global_valid: func.__globals__[func.__name__] = func_name_global_value
else: del func.__globals__[func.__name__]
if worker.mode in [SCRIPT_MODE, SILENT_MODE]:
export_remote_function(function_id, func_name, func, num_return_vals)
export_remote_function(function_id, func_name, func, num_return_vals, num_cpus, num_gpus)
elif worker.mode is None:
worker.cached_remote_functions.append((function_id, func_name, func, num_return_vals))
worker.cached_remote_functions.append((function_id, func_name, func, num_return_vals, num_cpus, num_gpus))
return func_invoker
return remote_decorator
num_return_vals = kwargs["num_return_vals"] if "num_return_vals" in kwargs.keys() else 1
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs.keys() else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs.keys() else 0
if _mode() == WORKER_MODE:
if "function_id" in kwargs:
num_return_vals = kwargs["num_return_vals"]
function_id = kwargs["function_id"]
return make_remote_decorator(num_return_vals, function_id)
return make_remote_decorator(num_return_vals, num_cpus, num_gpus, function_id)
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# This is the case where the decorator is just @ray.remote.
num_return_vals = 1
func = args[0]
return make_remote_decorator(num_return_vals)(func)
return make_remote_decorator(num_return_vals, num_cpus, num_gpus)(args[0])
else:
# This is the case where the decorator is something like
# @ray.remote(num_return_vals=2).
assert len(args) == 0 and "num_return_vals" in kwargs, "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'."
num_return_vals = kwargs["num_return_vals"]
error_string = ("The @ray.remote decorator must be applied either with no "
"arguments and no parentheses, for example '@ray.remote', "
"or it must be applied using some of the arguments "
"'num_return_vals', 'num_cpus', or 'num_gpus', like "
"'@ray.remote(num_return_vals=2)'.")
assert len(args) == 0 and ("num_return_vals" in kwargs or
"num_cpus" in kwargs or
"num_gpus" in kwargs), error_string
assert not "function_id" in kwargs
return make_remote_decorator(num_return_vals)
return make_remote_decorator(num_return_vals, num_cpus, num_gpus)
def check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, name):
"""Check if we support the signature of this function.
We currently do not allow remote functions to have **kwargs. We also do not
support keyword argumens in conjunction with a *args argument.
support keyword arguments in conjunction with a *args argument.
Args:
has_kwargs_param (bool): True if the function being checked has a **kwargs
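For reference, the remote-function record that export_remote_function writes to Redis (and that fetch_and_register_remote_function reads back with hmget) now carries the resource requirements alongside the pickled function. A sketch of the hash fields, with illustrative values standing in for real IDs and pickled data:

# Illustrative contents of the exported-function hash written above.
exported_remote_function = {
    "driver_id": b"<driver id>",
    "function_id": b"<function id>",
    "name": "my_module.train",
    "module": "my_module",
    "function": b"<pickled function>",
    "num_return_vals": 1,
    "function_export_counter": 0,
    "num_cpus": 2,
    "num_gpus": 1,
}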

View file

@ -275,10 +275,12 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
task_id parent_task_id;
/* The number of tasks that the parent task has called prior to this one. */
int parent_counter;
if (!PyArg_ParseTuple(args, "O&O&OiO&i", &PyObjectToUniqueID, &driver_id,
/* Resource vector of the required resources to execute this task. */
PyObject *resource_vector = NULL;
if (!PyArg_ParseTuple(args, "O&O&OiO&i|O", &PyObjectToUniqueID, &driver_id,
&PyObjectToUniqueID, &function_id, &arguments,
&num_returns, &PyObjectToUniqueID, &parent_task_id,
&parent_counter)) {
&parent_counter, &resource_vector)) {
return -1;
}
Py_ssize_t size = PyList_Size(arguments);
@ -317,6 +319,20 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
}
}
utarray_free(val_repr_ptrs);
/* Set the resource vector of the task. */
if (resource_vector != NULL) {
CHECK(PyList_Size(resource_vector) == MAX_RESOURCE_INDEX);
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
PyObject *resource_entry = PyList_GetItem(resource_vector, i);
task_spec_set_required_resource(self->spec, i,
PyFloat_AsDouble(resource_entry));
}
} else {
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
task_spec_set_required_resource(self->spec, i,
i == CPU_RESOURCE_INDEX ? 1.0 : 0.0);
}
}
/* Compute the task ID and the return object IDs. */
finish_construct_task_spec(self->spec);
return 0;
@ -367,6 +383,16 @@ static PyObject *PyTask_arguments(PyObject *self) {
return arg_list;
}
static PyObject *PyTask_required_resources(PyObject *self) {
task_spec *task = ((PyTask *) self)->spec;
PyObject *required_resources = PyList_New((Py_ssize_t) MAX_RESOURCE_INDEX);
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
double r = task_spec_get_required_resource(task, i);
PyList_SetItem(required_resources, i, PyFloat_FromDouble(r));
}
return required_resources;
}
static PyObject *PyTask_returns(PyObject *self) {
task_spec *task = ((PyTask *) self)->spec;
int64_t num_returns = task_num_returns(task);
@ -387,6 +413,8 @@ static PyMethodDef PyTask_methods[] = {
"Return the task ID for this task."},
{"arguments", (PyCFunction) PyTask_arguments, METH_NOARGS,
"Return the arguments for the task."},
{"required_resources", (PyCFunction) PyTask_required_resources, METH_NOARGS,
"Return the resource vector of the task."},
{"returns", (PyCFunction) PyTask_returns, METH_NOARGS,
"Return the object IDs for the return values of the task."},
{NULL} /* Sentinel */
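The extended binding above means a task constructed from Python can optionally carry a resource vector as its final constructor argument, and omitting it yields the default of 1 CPU and 0 GPUs. A short sketch; the ID variables are placeholders, and the test file earlier in this commit shows how they are generated.

# Task with an explicit resource vector: 1 CPU and 2 GPUs.
task = photon.Task(driver_id, function_id, [object_dep], num_returns,
                   parent_task_id, parent_counter, [1.0, 2.0])
task.required_resources()          # -> [1.0, 2.0]

# Omitting the vector falls back to the defaults set in PyTask_init.
default_task = photon.Task(driver_id, function_id, [object_dep], num_returns,
                           parent_task_id, parent_counter)
default_task.required_resources()  # -> [1.0, 0.0]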

View file

@ -3,6 +3,7 @@
#include "db.h"
#include "table.h"
#include "task.h"
/** This struct is sent with heartbeat messages from the local scheduler to the
* global scheduler, and it contains information about the load on the local
@ -14,6 +15,12 @@ typedef struct {
int task_queue_length;
/** The number of workers that are available and waiting for tasks. */
int available_workers;
/** The resource vector of resources generally available to this local
* scheduler. */
double static_resources[MAX_RESOURCE_INDEX];
/** The resource vector of resources currently available to this local
* scheduler. */
double dynamic_resources[MAX_RESOURCE_INDEX];
} local_scheduler_info;
/*

View file

@ -58,6 +58,12 @@ struct task_spec_impl {
* has been written so far, relative to &task_spec->args_and_returns[0] +
* (task_spec->num_args + task_spec->num_returns) * sizeof(task_arg) */
int64_t args_value_offset;
/** Resource vector for this task. A resource vector maps a resource index
* (like "cpu" or "gpu") to the number of units of that resource required.
* Note that this will allow us to support arbitrary attributes:
* For example, we can have a coloring of nodes and "red" can correspond
* to 0.0, "green" to 1.0 and "yellow" to 2.0. */
double required_resources[MAX_RESOURCE_INDEX];
/** Argument and return IDs as well as offsets for pass-by-value args. */
task_arg args_and_returns[0];
};
@ -259,6 +265,12 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length) {
return spec->arg_index++;
}
void task_spec_set_required_resource(task_spec *spec,
int64_t resource_index,
double value) {
spec->required_resources[resource_index] = value;
}
object_id task_return(task_spec *spec, int64_t return_index) {
/* Check that the task has been constructed. */
DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID));
@ -268,6 +280,11 @@ object_id task_return(task_spec *spec, int64_t return_index) {
return ret->obj_id;
}
double task_spec_get_required_resource(const task_spec *spec,
int64_t resource_index) {
return spec->required_resources[resource_index];
}
void free_task_spec(task_spec *spec) {
/* Check that the task has been constructed. */
DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID));

View file

@ -3,7 +3,7 @@
/**
* This API specifies the task data structures. It is in C so we can
* easily construct tasks from other languages like Python. The datastructures
* easily construct tasks from other languages like Python. The data structures
* are also defined in such a way that memory is contiguous and all pointers
* are relative, so that we can memcpy the datastructure and ship it over the
* network without serialization and deserialization. */
@ -227,6 +227,45 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length);
*/
object_id task_return(task_spec *spec, int64_t return_index);
/**
* Indices into resource vectors.
* A resource vector maps a resource index to the number
* of units of that resource required.
*
* The total length of the resource vector is NUM_RESOURCE_INDICES.
*/
typedef enum {
/** Index for number of cpus the task requires. */
CPU_RESOURCE_INDEX = 0,
/** Index for number of gpus the task requires. */
GPU_RESOURCE_INDEX,
/** Total number of different resources in the system. */
MAX_RESOURCE_INDEX
} resource_vector_index;
/**
* Set the value associated to a resource index.
*
* @param spec Task specification.
* @param resource_index Index of the resource in the resource vector.
* @param value Value for the resource. This can be a quantity of this resource
* this task needs or a value for an attribute this task requires.
* @return Void.
*/
void task_spec_set_required_resource(task_spec *spec,
int64_t resource_index,
double value);
/**
* Get the value associated to a resource index.
*
* @param spec Task specification.
* @param resource_index Index of the resource.
* @return How many of this resource the task needs to execute.
*/
double task_spec_get_required_resource(const task_spec *spec,
int64_t resource_index);
/**
* Compute the object id associated to a put call.
*
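The same layout, rendered in Python for readers following along from the driver side; the index constants mirror the enum above, and the vector itself is just a list of doubles of length MAX_RESOURCE_INDEX.

CPU_RESOURCE_INDEX = 0   # entry 0: CPUs the task requires
GPU_RESOURCE_INDEX = 1   # entry 1: GPUs the task requires
MAX_RESOURCE_INDEX = 2   # total length of the resource vector

required_resources = [0.0] * MAX_RESOURCE_INDEX
required_resources[CPU_RESOURCE_INDEX] = 1.0
required_resources[GPU_RESOURCE_INDEX] = 2.0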

View file

@ -18,10 +18,20 @@
* global_scheduler_state type. */
UT_icd local_scheduler_icd = {sizeof(local_scheduler), NULL, NULL, NULL};
/**
* Assign the given task to the local scheduler, update Redis and scheduler data
* structures.
*
* @param state Global scheduler state.
* @param task Task to be assigned to the local scheduler.
* @param local_scheduler_id DB client ID for the local scheduler.
* @return Void.
*/
void assign_task_to_local_scheduler(global_scheduler_state *state,
task *task,
db_client_id local_scheduler_id) {
char id_string[ID_STRING_SIZE];
task_spec *spec = task_task_spec(task);
LOG_DEBUG("assigning task to local_scheduler_id = %s",
object_id_to_string(local_scheduler_id, id_string, ID_STRING_SIZE));
task_set_state(task, TASK_STATUS_SCHEDULED);
@ -41,6 +51,14 @@ void assign_task_to_local_scheduler(global_scheduler_state *state,
get_local_scheduler(state, local_scheduler_id);
local_scheduler->num_tasks_sent += 1;
local_scheduler->num_recent_tasks_sent += 1;
/* Resource accounting update for this local scheduler. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
/* Subtract task's resource from the cached dynamic resource capacity for
* this local scheduler. This will be overwritten on the next heartbeat. */
local_scheduler->info.dynamic_resources[i] =
MAX(0, local_scheduler->info.dynamic_resources[i] -
task_spec_get_required_resource(spec, i));
}
}
global_scheduler_state *init_global_scheduler(event_loop *loop,
@ -63,13 +81,21 @@ void free_global_scheduler(global_scheduler_state *state) {
db_disconnect(state->db);
utarray_free(state->local_schedulers);
destroy_global_scheduler_policy(state->policy_state);
/* Delete the plasma 2 photon association map. */
HASH_ITER(hh, state->plasma_photon_map, entry, tmp) {
HASH_DELETE(hh, state->plasma_photon_map, entry);
/* Now deallocate hash table entry. */
/* Delete the plasma to photon association map. */
HASH_ITER(plasma_photon_hh, state->plasma_photon_map, entry, tmp) {
HASH_DELETE(plasma_photon_hh, state->plasma_photon_map, entry);
/* The hash entry is shared with the photon_plasma hashmap and will be freed
* there. */
free(entry->aux_address);
}
/* Delete the photon to plasma association map. */
HASH_ITER(photon_plasma_hh, state->photon_plasma_map, entry, tmp) {
HASH_DELETE(photon_plasma_hh, state->photon_plasma_map, entry);
/* Now free the shared hash entry -- no longer needed. */
free(entry);
}
/* Free the scheduler object info table. */
scheduler_object_info *object_entry, *tmp_entry;
HASH_ITER(hh, state->scheduler_object_info_table, object_entry, tmp_entry) {
@ -135,20 +161,29 @@ void process_new_db_client(db_client_id db_client_id,
calloc(1, sizeof(aux_address_entry));
plasma_photon_entry->aux_address = strdup(aux_address);
plasma_photon_entry->photon_db_client_id = db_client_id;
HASH_ADD_KEYPTR(
hh, state->plasma_photon_map, plasma_photon_entry->aux_address,
strlen(plasma_photon_entry->aux_address), plasma_photon_entry);
HASH_ADD_KEYPTR(plasma_photon_hh, state->plasma_photon_map,
plasma_photon_entry->aux_address,
strlen(plasma_photon_entry->aux_address),
plasma_photon_entry);
/* Add photon_db_client_id -> plasma_manager ip:port association to state.
*/
HASH_ADD(photon_plasma_hh, state->photon_plasma_map, photon_db_client_id,
sizeof(plasma_photon_entry->photon_db_client_id),
plasma_photon_entry);
#if (RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG)
{
/* Print the plasma to photon association map so far. */
aux_address_entry *entry, *tmp;
LOG_DEBUG("Plasma to Photon hash map so far:");
HASH_ITER(hh, state->plasma_photon_map, entry, tmp) {
HASH_ITER(plasma_photon_hh, state->plasma_photon_map, entry, tmp) {
LOG_DEBUG("%s -> %s", entry->aux_address,
object_id_to_string(entry->photon_db_client_id, id_string,
ID_STRING_SIZE));
}
}
#endif
/* Add new local scheduler to the state. */
local_scheduler local_scheduler;
@ -157,6 +192,10 @@ void process_new_db_client(db_client_id db_client_id,
local_scheduler.num_recent_tasks_sent = 0;
local_scheduler.info.task_queue_length = 0;
local_scheduler.info.available_workers = 0;
memset(local_scheduler.info.dynamic_resources, 0,
sizeof(local_scheduler.info.dynamic_resources));
memset(local_scheduler.info.static_resources, 0,
sizeof(local_scheduler.info.static_resources));
utarray_push_back(state->local_schedulers, &local_scheduler);
/* Allow the scheduling algorithm to process this event. */
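The cached-capacity update in assign_task_to_local_scheduler amounts to an element-wise, clamped subtraction that holds only until the next photon heartbeat refreshes the vector. A sketch of that accounting step, with a hypothetical helper name:

def account_task_assignment(dynamic_resources, required_resources):
    # Subtract the task's requirements from the cached dynamic capacity,
    # clamping at zero; the next heartbeat overwrites this estimate.
    return [max(0.0, have - need)
            for have, need in zip(dynamic_resources, required_resources)]

# account_task_assignment([4.0, 1.0], [1.0, 2.0]) -> [3.0, 0.0]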

View file

@ -39,12 +39,23 @@ typedef struct {
UT_hash_handle hh;
} scheduler_object_info;
/**
* A struct used for caching Photon to Plasma association.
*/
typedef struct {
char *aux_address; /* Key */
/** IP:port string for the plasma_manager. */
char *aux_address;
/** Photon db client id. */
db_client_id photon_db_client_id;
UT_hash_handle hh;
/** Plasma_manager ip:port -> photon_db_client_id. */
UT_hash_handle plasma_photon_hh;
/** Photon_db_client_id -> plasma_manager ip:port. */
UT_hash_handle photon_plasma_hh;
} aux_address_entry;
/**
* Global scheduler state structure.
*/
typedef struct {
/** The global scheduler event loop. */
event_loop *loop;
@ -56,7 +67,10 @@ typedef struct {
UT_array *local_schedulers;
/** The state managed by the scheduling policy. */
global_scheduler_policy_state *policy_state;
/** The plasma_manager ip:port -> photon_db_client_id association. */
aux_address_entry *plasma_photon_map;
/** The photon_db_client_id -> plasma_manager ip:port association. */
aux_address_entry *photon_plasma_map;
/** Objects cached by this global scheduler instance. */
scheduler_object_info *scheduler_object_info_table;
} global_scheduler_state;
@ -73,6 +87,15 @@ typedef struct {
local_scheduler *get_local_scheduler(global_scheduler_state *state,
db_client_id photon_id);
/**
* Assign the given task to the local scheduler, update Redis and scheduler data
* structures.
*
* @param state Global scheduler state.
* @param task Task to be assigned to the local scheduler.
* @param local_scheduler_id DB client ID for the local scheduler.
* @return Void.
*/
void assign_task_to_local_scheduler(global_scheduler_state *state,
task *task,
db_client_id local_scheduler_id);
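The two hash handles above let a single aux_address_entry be indexed both by plasma manager address and by photon DB client ID, so the entry (and its strdup'ed address) is freed exactly once. In Python terms the arrangement looks roughly like this, with illustrative values:

class AuxAddressEntry:
    def __init__(self, aux_address, photon_db_client_id):
        self.aux_address = aux_address                  # plasma manager ip:port
        self.photon_db_client_id = photon_db_client_id  # photon DB client ID

entry = AuxAddressEntry("127.0.0.1:23894", b"CL:photon-client-id")
plasma_photon_map = {entry.aux_address: entry}          # ip:port -> entry
photon_plasma_map = {entry.photon_db_client_id: entry}  # photon ID -> entry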

View file

@ -10,6 +10,16 @@ global_scheduler_policy_state *init_global_scheduler_policy(void) {
global_scheduler_policy_state *policy_state =
malloc(sizeof(global_scheduler_policy_state));
policy_state->round_robin_index = 0;
int num_weight_elem =
sizeof(policy_state->resource_attribute_weight) / sizeof(double);
for (int i = 0; i < num_weight_elem; i++) {
/* Weight distribution is subject to scheduling policy. Giving all weight
* to the last element of the vector (cached data) is equivalent to
* the transfer-aware policy. */
policy_state->resource_attribute_weight[i] = 1.0 / num_weight_elem;
}
return policy_state;
}
@ -18,6 +28,25 @@ void destroy_global_scheduler_policy(
free(policy_state);
}
/**
* Checks if the given local scheduler satisfies the task's hard constraints.
*
* @param scheduler Local scheduler.
* @param spec Task specification.
* @return True if all of the task's resource constraints are satisfied. False
* otherwise.
*/
bool constraints_satisfied_hard(const local_scheduler *scheduler,
const task_spec *spec) {
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
if (scheduler->info.static_resources[i] <
task_spec_get_required_resource(spec, i)) {
return false;
}
}
return true;
}
/**
* This is a helper method that assigns a task to the next local scheduler in a
* round robin fashion.
@ -26,43 +55,40 @@ void handle_task_round_robin(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
task *task) {
CHECKM(utarray_len(state->local_schedulers) > 0,
"No local schedulers. We currently don't handle this case.")
local_scheduler *scheduler = (local_scheduler *) utarray_eltptr(
state->local_schedulers, policy_state->round_robin_index);
policy_state->round_robin_index += 1;
policy_state->round_robin_index %= utarray_len(state->local_schedulers);
assign_task_to_local_scheduler(state, task, scheduler->id);
}
"No local schedulers. We currently don't handle this case.");
local_scheduler *scheduler = NULL;
task_spec *task_spec = task_task_spec(task);
int i;
int num_retries = 1;
bool task_satisfied = false;
/**
* This is a helper method that assigns a task to the local scheduler with the
* minimal load.
*/
void handle_task_minimum_load(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
task *task) {
CHECKM(utarray_len(state->local_schedulers) > 0,
"No local schedulers. We currently don't handle this case.")
int current_minimal_load_estimate = INT_MAX;
local_scheduler *current_local_scheduler_ptr = NULL;
for (int i = 0; i < utarray_len(state->local_schedulers); ++i) {
local_scheduler *local_scheduler_ptr =
(local_scheduler *) utarray_eltptr(state->local_schedulers, i);
int load_estimate = local_scheduler_ptr->info.task_queue_length +
local_scheduler_ptr->num_recent_tasks_sent;
if (load_estimate <= current_minimal_load_estimate) {
current_minimal_load_estimate = load_estimate;
current_local_scheduler_ptr = local_scheduler_ptr;
for (i = policy_state->round_robin_index; !task_satisfied && num_retries > 0;
i = (i + 1) % utarray_len(state->local_schedulers)) {
if (i == policy_state->round_robin_index) {
num_retries--;
}
scheduler = (local_scheduler *) utarray_eltptr(state->local_schedulers, i);
task_satisfied = constraints_satisfied_hard(scheduler, task_spec);
}
if (task_satisfied) {
/* Update next index to try and assign the task. Note that the counter i has
* been advanced. */
policy_state->round_robin_index = i;
assign_task_to_local_scheduler(state, task, scheduler->id);
} else {
/* TODO(atumanov): propagate the error to the driver, which submitted
* this impossible task and/or cache the task to consider when new
* local schedulers register. */
}
DCHECK(current_local_scheduler_ptr != NULL);
assign_task_to_local_scheduler(state, task, current_local_scheduler_ptr->id);
}
object_size_entry *create_object_size_hashmap(global_scheduler_state *state,
task_spec *task_spec,
bool *has_args_by_ref) {
bool *has_args_by_ref,
int64_t *task_data_size) {
object_size_entry *s = NULL, *object_size_table = NULL;
*task_data_size = 0;
for (int i = 0; i < task_num_args(task_spec); i++) {
/* Object ids are only available for args by references.
@ -88,6 +114,8 @@ object_size_entry *create_object_size_hashmap(global_scheduler_state *state,
obj_info_entry->data_size);
/* Object is known to the scheduler. For each of its locations, add size. */
int64_t object_size = obj_info_entry->data_size;
/* Add each object's size to task's size. */
*task_data_size += object_size;
char **p = NULL;
char id_string[ID_STRING_SIZE];
LOG_DEBUG("locations for an arg_by_ref obj_id = %s",
@ -134,7 +162,8 @@ db_client_id get_photon_id(global_scheduler_state *state,
if (plasma_location != NULL) {
LOG_DEBUG("max object size location found : %s", plasma_location);
/* Lookup association of plasma location to photon. */
HASH_FIND_STR(state->plasma_photon_map, plasma_location, aux_entry);
HASH_FIND(plasma_photon_hh, state->plasma_photon_map, plasma_location,
uthash_strlen(plasma_location), aux_entry);
if (aux_entry) {
LOG_DEBUG("found photon db client association for plasma ip:port = %s",
aux_entry->aux_address);
@ -164,66 +193,143 @@ db_client_id get_photon_id(global_scheduler_state *state,
return photon_id;
}
double inner_product(double a[], double b[], int size) {
double result = 0;
for (int i = 0; i < size; i++) {
result += a[i] * b[i];
}
return result;
}
double calculate_object_size_fraction(global_scheduler_state *state,
local_scheduler *scheduler,
object_size_entry *object_size_table,
int64_t total_task_object_size) {
/* Look up its cached object size in the hashmap, normalize by total object
* size for this task. */
/* Aggregate object size for this task. */
double object_size_fraction = 0;
if (total_task_object_size > 0) {
/* Does this node contribute anything to this task object size? */
/* Lookup scheduler->id in photon_plasma_map to get plasma aux address,
* which is used as the key for object_size_table.
* This uses the plasma aux address to locate the object_size this node
* contributes. */
aux_address_entry *photon_plasma_pair = NULL;
HASH_FIND(photon_plasma_hh, state->photon_plasma_map, &(scheduler->id),
sizeof(scheduler->id), photon_plasma_pair);
if (photon_plasma_pair != NULL) {
object_size_entry *s = NULL;
/* Found this node's photon to plasma mapping. Use the corresponding
* plasma key to see if this node has any cached objects for this task. */
HASH_FIND_STR(object_size_table, photon_plasma_pair->aux_address, s);
if (s != NULL) {
/* This node has some of this task's objects. Calculate what fraction.
*/
CHECK(strcmp(s->object_location, photon_plasma_pair->aux_address) == 0);
object_size_fraction =
MIN(1, (double) (s->total_object_size) / total_task_object_size);
}
}
}
return object_size_fraction;
}
double calculate_score_dynvec_normalized(global_scheduler_state *state,
local_scheduler *scheduler,
const task_spec *task_spec,
double object_size_fraction) {
/* The object size fraction is now calculated for this (task,node) pair. */
/* Construct the normalized dynamic resource attribute vector */
double normalized_dynvec[MAX_RESOURCE_INDEX + 1];
memset(&normalized_dynvec, 0, sizeof(normalized_dynvec));
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
double resreqval = task_spec_get_required_resource(task_spec, i);
if (resreqval <= 0) {
/* Skip and leave normalized dynvec value == 0. */
continue;
}
normalized_dynvec[i] =
MIN(1, scheduler->info.dynamic_resources[i] / resreqval);
}
normalized_dynvec[MAX_RESOURCE_INDEX] = object_size_fraction;
/* Finally, calculate the score. */
double score = inner_product(normalized_dynvec,
state->policy_state->resource_attribute_weight,
MAX_RESOURCE_INDEX + 1);
return score;
}
double calculate_cost_pending(const global_scheduler_state *state,
const local_scheduler *scheduler) {
/* TODO: make sure that num_recent_tasks_sent is reset on each heartbeat. */
return scheduler->num_recent_tasks_sent + scheduler->info.task_queue_length;
}
/**
* Main new task handling function in the global scheduler.
*
* @param state Global scheduler state.
* @param policy_state State specific to the scheduling policy.
* @param task New task to be scheduled.
* @return Void.
*/
void handle_task_waiting(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
task *task) {
task_spec *task_spec = task_task_spec(task);
CHECKM(task_spec != NULL,
"task wait handler encounted a task with NULL spec");
/* Local hash table to keep track of aggregate object sizes per local
* scheduler. */
object_size_entry *tmp, *s = NULL, *object_size_table = NULL;
object_size_entry *object_size_table = NULL;
bool has_args_by_ref = false;
bool task_feasible = false;
/* The total size of the task's data. */
int64_t task_object_size = 0;
object_size_table =
create_object_size_hashmap(state, task_spec, &has_args_by_ref);
object_size_table = create_object_size_hashmap(
state, task_spec, &has_args_by_ref, &task_object_size);
if (!object_size_table) {
char id_string[ID_STRING_SIZE];
if (has_args_by_ref) {
LOG_DEBUG(
"Using simple policy. Didn't find objects in GS cache for task = %s",
object_id_to_string(task_task_id(task), id_string, ID_STRING_SIZE));
/* TODO(future): wait for object notification and try again. */
} else {
LOG_DEBUG("Using simple policy. No arguments passed by reference.");
/* Go through all the nodes, calculate the score for each, pick max score. */
local_scheduler *scheduler = NULL;
double best_photon_score = INT32_MIN;
CHECKM(best_photon_score < 0, "We might have a floating point underflow");
db_client_id best_photon_id = NIL_ID; /* best node to send this task */
for (scheduler = (local_scheduler *) utarray_front(state->local_schedulers);
scheduler != NULL; scheduler = (local_scheduler *) utarray_next(
state->local_schedulers, scheduler)) {
/* For each local scheduler, calculate its score. Check hard constraints
* first. */
if (!constraints_satisfied_hard(scheduler, task_spec)) {
continue;
}
UNUSED(id_string);
handle_task_round_robin(state, policy_state, task);
task_feasible = true;
/* This node satisfies the hard capacity constraint. Calculate its score. */
double score = -1 * calculate_cost_pending(state, scheduler);
if (score > best_photon_score) {
best_photon_score = score;
best_photon_id = scheduler->id;
}
} /* For each local scheduler. */
free_object_size_hashmap(object_size_table);
if (!task_feasible) {
char id_string[ID_STRING_SIZE];
LOG_ERROR(
"Infeasible task. No nodes satisfy hard constraints for task = %s",
object_id_to_string(task_task_id(task), id_string, ID_STRING_SIZE));
/* TODO(atumanov): propagate this error to the task's driver and/or
* cache the task in case new local schedulers satisfy it in the future. */
return;
}
LOG_DEBUG("Using transfer-aware policy");
/* Pick maximum object_size and assign task to that scheduler. */
int64_t max_object_size = 0;
const char *max_object_location = NULL;
HASH_ITER(hh, object_size_table, s, tmp) {
if (s->total_object_size > max_object_size) {
max_object_size = s->total_object_size;
max_object_location = s->object_location;
}
}
db_client_id photon_id = get_photon_id(state, max_object_location);
CHECKM(!IS_NIL_ID(photon_id), "Failed to find an LS: num_args = %" PRId64
" num_returns = %" PRId64 "\n",
task_num_args(task_spec), task_num_returns(task_spec));
/* Get the local scheduler for this photon ID. */
local_scheduler *local_scheduler_ptr = get_local_scheduler(state, photon_id);
CHECK(local_scheduler_ptr != NULL);
/* If this local scheduler has enough capacity, assign the task to this local
* scheduler. Otherwise assign the task to the global scheduler with the
* minimal load. */
int64_t load_estimate = local_scheduler_ptr->info.task_queue_length +
local_scheduler_ptr->num_recent_tasks_sent;
if (local_scheduler_ptr->info.available_workers > 0 &&
load_estimate < local_scheduler_ptr->info.total_num_workers) {
assign_task_to_local_scheduler(state, task, photon_id);
} else {
handle_task_minimum_load(state, policy_state, task);
}
free_object_size_hashmap(object_size_table);
CHECKM(!IS_NIL_ID(best_photon_id),
"Task is feasible, but doesn't have a local scheduler assigned.");
/* A local scheduler ID was found, so assign the task. */
assign_task_to_local_scheduler(state, task, best_photon_id);
}
void handle_object_available(global_scheduler_state *state,
@ -232,12 +338,6 @@ void handle_object_available(global_scheduler_state *state,
/* Do nothing for now. */
}
void handle_local_scheduler_heartbeat(
global_scheduler_state *state,
global_scheduler_policy_state *policy_state) {
/* Do nothing for now. */
}
void handle_new_local_scheduler(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
db_client_id db_client_id) {
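Putting the pieces of this policy together: a node passes the hard constraint check only if its static resource vector covers the task's requirements, and the soft score is a weighted inner product over the normalized dynamic resources plus the fraction of the task's input data already cached on the node (uniform weights in this commit). A Python sketch of the two helpers introduced here, mirroring constraints_satisfied_hard and calculate_score_dynvec_normalized:

def constraints_satisfied_hard(static_resources, required_resources):
    # Hard constraint: every static capacity entry must cover the demand.
    return all(have >= need
               for have, need in zip(static_resources, required_resources))

def score_dynvec_normalized(dynamic_resources, required_resources,
                            object_size_fraction, weights):
    # Normalize each dynamic resource by the task's demand (capped at 1),
    # append the cached-object fraction, and take the weighted sum.
    normalized = [min(1.0, have / need) if need > 0 else 0.0
                  for have, need in zip(dynamic_resources, required_resources)]
    normalized.append(object_size_fraction)
    return sum(w * x for w, x in zip(weights, normalized))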

View file

@ -23,6 +23,7 @@ typedef enum {
struct global_scheduler_policy_state {
/** The index of the next local scheduler to assign a task to. */
int64_t round_robin_index;
double resource_attribute_weight[MAX_RESOURCE_INDEX + 1];
};
typedef struct {
@ -49,13 +50,11 @@ void destroy_global_scheduler_policy(
global_scheduler_policy_state *policy_state);
/**
* Assign the task to a local scheduler. At the moment, this simply assigns the
* task to the local schedulers in a round robin fashion. If there are no local
* schedulers it fails.
* Main new task handling function in the global scheduler.
*
* @param state The global scheduler state.
* @param policy_state The state managed by the scheduling policy.
* @param task The task that is waiting to be scheduled.
* @param state Global scheduler state.
* @param policy_state State specific to the scheduling policy.
* @param task New task to be scheduled.
* @return Void.
*/
void handle_task_waiting(global_scheduler_state *state,

View file

@ -74,6 +74,12 @@ typedef struct {
/** Input buffer, used for reading input in process_message to avoid
* allocation for each call to process_message. */
UT_array *input_buffer;
/** Vector of static attributes associated with the node owned by this local
* scheduler. */
double static_resources[MAX_RESOURCE_INDEX];
/** Vector of dynamic attributes associated with the node owned by this local
* scheduler. */
double dynamic_resources[MAX_RESOURCE_INDEX];
} local_scheduler_state;
/** Contains all information associated with a local scheduler client. */

View file

@ -9,6 +9,7 @@
#include "state/object_table.h"
#include "photon.h"
#include "photon_scheduler.h"
#include "common/task.h"
typedef struct task_queue_entry {
/** The task that is queued. */
@ -121,6 +122,11 @@ void provide_scheduler_info(local_scheduler_state *state,
info->task_queue_length =
waiting_task_queue_length + dispatch_task_queue_length;
info->available_workers = utarray_len(algorithm_state->available_workers);
/* Copy static and dynamic resource information. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
info->dynamic_resources[i] = state->dynamic_resources[i];
info->static_resources[i] = state->static_resources[i];
}
}
/**
@ -259,25 +265,43 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
*/
void dispatch_tasks(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state) {
/* Assign tasks while there are still tasks in the dispatch queue and
* available workers. */
while ((algorithm_state->dispatch_task_queue != NULL) &&
(utarray_len(algorithm_state->available_workers) > 0)) {
LOG_DEBUG("Dispatching task");
/* Pop a task from the dispatch queue. */
task_queue_entry *dispatched_task = algorithm_state->dispatch_task_queue;
DL_DELETE(algorithm_state->dispatch_task_queue, dispatched_task);
task_queue_entry *elt, *tmp;
/* Assign as many tasks as we can, while there are workers available. */
DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) {
if (utarray_len(algorithm_state->available_workers) <= 0) {
/* There are no more available workers, so we're done. */
break;
}
/* TODO(atumanov): as an optimization, we can also check if all dynamic
* capacity is zero and bail early. */
bool task_satisfied = true;
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
if (task_spec_get_required_resource(elt->spec, i) >
state->dynamic_resources[i]) {
/* Insufficient capacity for this task; proceed to the next one. */
task_satisfied = false;
break;
}
}
if (!task_satisfied) {
continue; /* Proceed to the next task. */
}
/* Dispatch this task to an available worker and dequeue the task. */
LOG_DEBUG("Dispatching task");
/* Get the last available worker in the available worker queue. */
local_scheduler_client **worker = (local_scheduler_client **) utarray_back(
algorithm_state->available_workers);
/* Tell the available worker to execute the task. */
assign_task_to_worker(state, dispatched_task->spec, *worker);
assign_task_to_worker(state, elt->spec, *worker);
/* Remove the available worker from the queue and free the struct. */
utarray_pop_back(algorithm_state->available_workers);
free_task_spec(dispatched_task->spec);
free(dispatched_task);
}
print_resource_info(state, elt->spec);
/* Dequeue the task. */
DL_DELETE(algorithm_state->dispatch_task_queue, elt);
free_task_spec(elt->spec);
free(elt);
} /* End for each task in the dispatch queue. */
}
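The rewritten loop above changes dispatch from strict head-of-queue popping to a full pass over the dispatch queue, so a task that cannot currently fit within the node's dynamic capacity no longer blocks the tasks behind it. Below is a hedged Python model of that behavior (illustrative names only, not the Photon implementation):

def dispatch(dispatch_queue, available_workers, dynamic_resources):
    remaining = []
    for task in dispatch_queue:
        if not available_workers:
            remaining.append(task)   # no free workers; keep the rest queued
            continue
        if any(req > avail for req, avail
               in zip(task["required"], dynamic_resources)):
            remaining.append(task)   # insufficient capacity; try the next task
            continue
        available_workers.pop()      # hand the task to a worker (not modeled further)
        for i, req in enumerate(task["required"]):
            dynamic_resources[i] -= req   # debit the dispatched task's demand
    return remaining

# Example: task "b" stays queued because it asks for 2 GPUs while only 1 is free.
queue = [{"name": "a", "required": [1.0, 0.0]},
         {"name": "b", "required": [0.0, 2.0]},
         {"name": "c", "required": [1.0, 1.0]}]
print([t["name"] for t in dispatch(queue, ["w1", "w2"], [4.0, 1.0])])  # ['b']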
/**
@ -420,18 +444,34 @@ void give_task_to_global_scheduler(local_scheduler_state *state,
NULL);
}
bool resource_constraints_satisfied(local_scheduler_state *state,
task_spec *spec) {
/* At the local scheduler, if the required resource vector exceeds either the
 * static or the dynamic resource vector, the resource constraint is not
 * satisfied. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
if (task_spec_get_required_resource(spec, i) > state->static_resources[i] ||
task_spec_get_required_resource(spec, i) >
state->dynamic_resources[i]) {
return false;
}
}
return true;
}
void handle_task_submitted(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec) {
/* If this task's dependencies are available locally, and if there is an
* available worker, then assign this task to an available worker. If we
* cannot assign the task to a worker immediately, we either queue the task in
* the local task queue or we pass the task to the global scheduler. For now,
* we pass the task along to the global scheduler if there is one. */
if (can_run(algorithm_state, spec) &&
(utarray_len(algorithm_state->available_workers) > 0)) {
/* Dependencies are ready and there is an available worker, so dispatch the
* task. */
/* TODO(atumanov): if the static constraint is satisfied and local objects are
 * ready, but dynamic resources are currently unavailable, consider queueing
 * the task locally and rechecking dynamic capacity next time. */
/* If this task's constraints are satisfied, dependencies are available
* locally, and there is an available worker, then enqueue the task in the
* dispatch queue and trigger task dispatch. Otherwise, pass the task along to
* the global scheduler if there is one. */
if (resource_constraints_satisfied(state, spec) &&
(utarray_len(algorithm_state->available_workers) > 0) &&
can_run(algorithm_state, spec)) {
queue_dispatch_task(state, algorithm_state, spec, false);
} else {
/* Give the task to the global scheduler to schedule, if it exists. */
@ -457,8 +497,8 @@ void handle_task_scheduled(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
task_spec *spec) {
/* This callback handles tasks that were assigned to this local scheduler by
* the global scheduler, so we can safely assert that there is a connection
* to the database. */
* the global scheduler, so we can safely assert that there is a connection to
* the database. */
DCHECK(state->db != NULL);
DCHECK(state->config.global_scheduler_exists);
/* Push the task to the appropriate queue. */

View file

@ -25,6 +25,34 @@ UT_icd workers_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL};
UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
/**
* A helper function for printing available and requested resource information.
*
* @param state Local scheduler state.
* @param spec Task specification object.
* @return Void.
*/
void print_resource_info(const local_scheduler_state *state,
const task_spec *spec) {
#if RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG
/* Print information about available and requested resources. */
char buftotal[256], bufavail[256], bufresreq[256];
snprintf(bufavail, sizeof(bufavail), "%8.4f %8.4f",
state->dynamic_resources[CPU_RESOURCE_INDEX],
state->dynamic_resources[GPU_RESOURCE_INDEX]);
snprintf(buftotal, sizeof(buftotal), "%8.4f %8.4f",
state->static_resources[CPU_RESOURCE_INDEX],
state->static_resources[GPU_RESOURCE_INDEX]);
if (spec) {
snprintf(bufresreq, sizeof(bufresreq), "%8.4f %8.4f",
task_spec_get_required_resource(spec, CPU_RESOURCE_INDEX),
task_spec_get_required_resource(spec, GPU_RESOURCE_INDEX));
}
LOG_DEBUG("Resources: [total=%s][available=%s][requested=%s]", buftotal,
bufavail, spec ? bufresreq : "n/a");
#endif
}
local_scheduler_state *init_local_scheduler(
const char *node_ip_address,
event_loop *loop,
@ -35,7 +63,8 @@ local_scheduler_state *init_local_scheduler(
const char *plasma_manager_socket_name,
const char *plasma_manager_address,
bool global_scheduler_exists,
const char *start_worker_command) {
const char *start_worker_command,
const double static_resource_conf[]) {
local_scheduler_state *state = malloc(sizeof(local_scheduler_state));
/* Set the configuration struct for the local scheduler. */
if (start_worker_command != NULL) {
@ -86,6 +115,14 @@ local_scheduler_state *init_local_scheduler(
/* Add the input buffer. This is used to read in messages from clients without
* having to reallocate a new buffer every time. */
utarray_new(state->input_buffer, &byte_icd);
/* Initialize resource vectors. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
state->static_resources[i] = state->dynamic_resources[i] =
static_resource_conf[i];
}
/* Print some debug information about resource configuration. */
print_resource_info(state, NULL);
return state;
};
@ -149,16 +186,28 @@ void assign_task_to_worker(local_scheduler_state *state,
LOG_FATAL("Failed to give task to client on fd %d.", worker->sock);
}
}
/* Update the global task table. */
if (state->db != NULL) {
task *task =
alloc_task(spec, TASK_STATUS_RUNNING, get_db_client_id(state->db));
task_table_update(state->db, task, (retry_info *) &photon_retry, NULL,
NULL);
/* Resource accounting:
* Update dynamic resource vector in the local scheduler state. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
state->dynamic_resources[i] -= task_spec_get_required_resource(spec, i);
CHECKM(state->dynamic_resources[i] >= 0,
"photon dynamic resources dropped to %8.4f\t%8.4f\n",
state->dynamic_resources[0], state->dynamic_resources[1]);
}
print_resource_info(state, spec);
task *task = alloc_task(spec, TASK_STATUS_RUNNING,
state->db ? get_db_client_id(state->db) : NIL_ID);
/* Record which task this worker is executing. This will be freed in
* process_message when the worker sends a GET_TASK message to the local
* scheduler. */
worker->task_in_progress = copy_task(task);
/* Update the global task table. */
if (state->db != NULL) {
task_table_update(state->db, task, (retry_info *) &photon_retry, NULL,
NULL);
} else {
free_task(task);
}
}
@ -302,16 +351,27 @@ void process_message(event_loop *loop,
free(value);
} break;
case GET_TASK: {
/* Update the task table with the completed task. */
if (state->db != NULL && worker->task_in_progress != NULL) {
/* If this worker reports a completed task: account for resources. */
if (worker->task_in_progress != NULL) {
task_spec *spec = task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
state->dynamic_resources[i] += task_spec_get_required_resource(spec, i);
/* Sanity-check resource vector boundary conditions. */
CHECK(state->dynamic_resources[i] <= state->static_resources[i]);
}
print_resource_info(state, spec);
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
/* Update control state tables. */
task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
task_table_update(state->db, worker->task_in_progress,
(retry_info *) &photon_retry, NULL, NULL);
/* The call to task_table_update takes ownership of the task_in_progress,
* so we set the pointer to NULL so it is not used. */
worker->task_in_progress = NULL;
} else if (worker->task_in_progress != NULL) {
/* The call to task_table_update takes ownership of the
* task_in_progress, so we set the pointer to NULL so it is not used. */
} else {
free_task(worker->task_in_progress);
}
worker->task_in_progress = NULL;
}
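Taken together with the debit performed in assign_task_to_worker above, this GET_TASK path credits a finished task's resources back, so the dynamic vector always stays within [0, static], which is what the CHECKM/CHECK assertions enforce. A short hedged Python sketch of that invariant, with illustrative names:

# Hedged sketch of the accounting invariant: debit on assignment, credit on
# completion, and the dynamic vector must never leave [0, static].
def acquire(dynamic, static, required):
    for i, req in enumerate(required):
        dynamic[i] -= req
        assert dynamic[i] >= 0, "dynamic resources dropped below zero"

def release(dynamic, static, required):
    for i, req in enumerate(required):
        dynamic[i] += req
        assert dynamic[i] <= static[i], "dynamic resources exceeded capacity"

static = [4.0, 1.0]
dynamic = list(static)
acquire(dynamic, static, [2.0, 1.0])   # task assigned: dynamic == [2.0, 0.0]
release(dynamic, static, [2.0, 1.0])   # task finished: dynamic == [4.0, 1.0]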
/* Let the scheduling algorithm process the fact that there is an available
@ -398,7 +458,8 @@ void start_server(const char *node_ip_address,
const char *plasma_manager_socket_name,
const char *plasma_manager_address,
bool global_scheduler_exists,
const char *start_worker_command) {
const char *start_worker_command,
const double static_resource_conf[]) {
/* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
* to a client that has already died, the local scheduler could die. */
signal(SIGPIPE, SIG_IGN);
@ -407,7 +468,8 @@ void start_server(const char *node_ip_address,
g_state = init_local_scheduler(
node_ip_address, loop, redis_addr, redis_port, socket_name,
plasma_store_socket_name, plasma_manager_socket_name,
plasma_manager_address, global_scheduler_exists, start_worker_command);
plasma_manager_address, global_scheduler_exists, start_worker_command,
static_resource_conf);
/* Register a callback for registering new clients. */
event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection,
g_state);
@ -458,9 +520,12 @@ int main(int argc, char *argv[]) {
char *node_ip_address = NULL;
/* The command to run when starting new workers. */
char *start_worker_command = NULL;
/* Comma-separated list of configured resource capabilities for this node. */
char *static_resource_list = NULL;
double static_resource_conf[MAX_RESOURCE_INDEX];
int c;
bool global_scheduler_exists = true;
while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:")) != -1) {
while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:c:")) != -1) {
switch (c) {
case 's':
scheduler_socket_name = optarg;
@ -486,10 +551,30 @@ int main(int argc, char *argv[]) {
case 'w':
start_worker_command = optarg;
break;
case 'c':
static_resource_list = optarg;
break;
default:
LOG_FATAL("unknown option %c", c);
}
}
if (!static_resource_list) {
/* Use defaults for this node's static resource configuration. */
memset(&static_resource_conf[0], 0, sizeof(static_resource_conf));
static_resource_conf[CPU_RESOURCE_INDEX] = DEFAULT_NUM_CPUS;
static_resource_conf[GPU_RESOURCE_INDEX] = DEFAULT_NUM_GPUS;
} else {
/* Tokenize the string. */
const char delim[2] = ",";
char *token;
int idx = 0; /* Index into the resource vector. */
token = strtok(static_resource_list, delim);
while (token != NULL && idx < MAX_RESOURCE_INDEX) {
static_resource_conf[idx++] = atoi(token);
/* Attempt to get the next token. */
token = strtok(NULL, delim);
}
}
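At the Python layer, these per-node capacities are what ray.init and ray.worker._init pass down; the calls below are the same ones used by the tests later in this commit, with arbitrary example values:

import ray

# Single node: one local scheduler with 10 CPUs and 2 GPUs.
ray.init(num_workers=4, num_cpus=10, num_gpus=2)
ray.worker.cleanup()

# Local test cluster: one capacity entry per local scheduler.
ray.worker._init(start_ray_local=True, num_local_schedulers=3,
                 num_cpus=[100, 5, 10], num_gpus=[0, 5, 1])
ray.worker.cleanup()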
if (!scheduler_socket_name) {
LOG_FATAL("please specify socket for incoming connections with -s switch");
}
@ -510,7 +595,8 @@ int main(int argc, char *argv[]) {
}
start_server(node_ip_address, scheduler_socket_name, NULL, -1,
plasma_store_socket_name, NULL, plasma_manager_address,
global_scheduler_exists, start_worker_command);
global_scheduler_exists, start_worker_command,
static_resource_conf);
} else {
/* Parse the Redis address into an IP address and a port. */
char redis_addr[16] = {0};
@ -530,7 +616,8 @@ int main(int argc, char *argv[]) {
start_server(node_ip_address, scheduler_socket_name, &redis_addr[0],
atoi(redis_port), plasma_store_socket_name,
plasma_manager_socket_name, plasma_manager_address,
global_scheduler_exists, start_worker_command);
global_scheduler_exists, start_worker_command,
static_resource_conf);
}
}
#endif

View file

@ -7,6 +7,9 @@
/* The duration between local scheduler heartbeats. */
#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100
#define DEFAULT_NUM_CPUS INT16_MAX
#define DEFAULT_NUM_GPUS 0
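These defaults mean an unconfigured node is treated as having effectively unlimited CPUs but no GPUs. A hedged Python illustration of the resulting static check (illustrative names, not Photon code):

INT16_MAX = 2 ** 15 - 1
static_default = [INT16_MAX, 0]                 # default [CPU, GPU] capacities
cpu_task, gpu_task = [1.0, 0.0], [0.0, 1.0]     # required resource vectors

def fits(required, capacity):
    return all(r <= c for r, c in zip(required, capacity))

print(fits(cpu_task, static_default))   # True: CPU-only tasks always pass
print(fits(gpu_task, static_default))   # False: GPU capacity must be configured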
/**
* Establish a connection to a new client.
*
@ -62,6 +65,8 @@ void process_plasma_notification(event_loop *loop,
*/
void reconstruct_object(local_scheduler_state *state, object_id object_id);
void print_resource_info(const local_scheduler_state *s, const task_spec *spec);
/** The following methods are for testing purposes only. */
#ifdef PHOTON_TEST
local_scheduler_state *init_local_scheduler(
@ -74,7 +79,8 @@ local_scheduler_state *init_local_scheduler(
const char *plasma_store_socket_name,
const char *plasma_manager_address,
bool global_scheduler_exists,
const char *worker_path);
const char *worker_path,
const double static_resource_vector[]);
void free_local_scheduler(local_scheduler_state *state);

View file

@ -48,10 +48,13 @@ typedef struct {
photon_mock *init_photon_mock(bool connect_to_redis) {
const char *redis_addr = NULL;
int redis_port = -1;
const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS,
DEFAULT_NUM_GPUS};
if (connect_to_redis) {
redis_addr = "127.0.0.1";
redis_port = 6379;
}
photon_mock *mock = malloc(sizeof(photon_mock));
memset(mock, 0, sizeof(photon_mock));
mock->loop = event_loop_create();
@ -67,7 +70,8 @@ photon_mock *init_photon_mock(bool connect_to_redis) {
mock->photon_state = init_local_scheduler(
"127.0.0.1", mock->loop, redis_addr, redis_port,
utstring_body(photon_socket_name), plasma_store_socket_name,
utstring_body(plasma_manager_socket_name), NULL, false, NULL);
utstring_body(plasma_manager_socket_name), NULL, false, NULL,
static_resource_conf);
/* Connect a Photon client. */
mock->conn = photon_connect(utstring_body(photon_socket_name));
new_client_connection(mock->loop, mock->photon_fd,
@ -87,7 +91,10 @@ void destroy_photon_mock(photon_mock *mock) {
}
void reset_worker(photon_mock *mock, local_scheduler_client *worker) {
if (worker->task_in_progress) {
free_task(worker->task_in_progress);
worker->task_in_progress = NULL;
}
}
/**

View file

@ -174,7 +174,7 @@ bool require_space(eviction_state *eviction_state,
objects_to_evict);
LOG_INFO(
"There is not enough space to create this object, so evicting "
"%" PRId64 " objects to free up %" PRId64 " bytes.\n",
"%" PRId64 " objects to free up %" PRId64 " bytes.",
*num_objects_to_evict, num_bytes_evicted);
} else {
num_bytes_evicted = 0;

View file

@ -66,7 +66,7 @@ class DistributedArrayTest(unittest.TestCase):
def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.worker._init(start_ray_local=True, num_workers=10, num_local_schedulers=2)
ray.worker._init(start_ray_local=True, num_workers=10, num_local_schedulers=2, num_cpus=[10, 10])
x = da.zeros.remote([9, 25, 51], "float")
assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51]))

View file

@ -291,7 +291,7 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testDefiningRemoteFunctions(self):
ray.init(num_workers=3)
ray.init(num_workers=3, num_cpus=3)
# Test that we can define a remote function in the shell.
@ray.remote
@ -503,7 +503,7 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testPassingInfoToAllWorkers(self):
ray.init(num_workers=10)
ray.init(num_workers=10, num_cpus=10)
def f(worker_info):
sys.path.append(worker_info)
@ -805,5 +805,289 @@ class UtilsTest(unittest.TestCase):
ray.worker.cleanup()
class ResourcesTest(unittest.TestCase):
def testResourceConstraints(self):
num_workers = 20
ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2)
# Attempt to wait for all of the workers to start up.
ray.worker.global_worker.run_function_on_all_workers(lambda worker_info: sys.path.append(worker_info["counter"]))
@ray.remote(num_cpus=0)
def get_worker_id():
time.sleep(1)
return sys.path[-1]
while True:
if len(set(ray.get([get_worker_id.remote() for _ in range(num_workers)]))) == num_workers:
break
time_buffer = 0.3
# At most 10 copies of this can run at once (10 CPUs available, 1 CPU per task).
@ray.remote(num_cpus=1)
def f(n):
time.sleep(n)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(10)])
duration = time.time() - start_time
self.assertLess(duration, 0.5 + time_buffer)
self.assertGreater(duration, 0.5)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(11)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
@ray.remote(num_cpus=3)
def f(n):
time.sleep(n)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(3)])
duration = time.time() - start_time
self.assertLess(duration, 0.5 + time_buffer)
self.assertGreater(duration, 0.5)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(4)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
@ray.remote(num_gpus=1)
def f(n):
time.sleep(n)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(2)])
duration = time.time() - start_time
self.assertLess(duration, 0.5 + time_buffer)
self.assertGreater(duration, 0.5)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(3)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(4)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
ray.worker.cleanup()
def testMultiResourceConstraints(self):
num_workers = 20
ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10)
# Attempt to wait for all of the workers to start up.
ray.worker.global_worker.run_function_on_all_workers(lambda worker_info: sys.path.append(worker_info["counter"]))
@ray.remote(num_cpus=0)
def get_worker_id():
time.sleep(1)
return sys.path[-1]
while True:
if len(set(ray.get([get_worker_id.remote() for _ in range(num_workers)]))) == num_workers:
break
@ray.remote(num_cpus=1, num_gpus=9)
def f(n):
time.sleep(n)
@ray.remote(num_cpus=9, num_gpus=1)
def g(n):
time.sleep(n)
time_buffer = 0.3
start_time = time.time()
ray.get([f.remote(0.5), g.remote(0.5)])
duration = time.time() - start_time
self.assertLess(duration, 0.5 + time_buffer)
self.assertGreater(duration, 0.5)
start_time = time.time()
ray.get([f.remote(0.5), f.remote(0.5)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
start_time = time.time()
ray.get([g.remote(0.5), g.remote(0.5)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
start_time = time.time()
ray.get([f.remote(0.5), f.remote(0.5), g.remote(0.5), g.remote(0.5)])
duration = time.time() - start_time
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
ray.worker.cleanup()
def testMultipleLocalSchedulers(self):
# This test defines a number of tasks that can only be assigned to specific
# local schedulers and checks that they end up on the correct ones.
address_info = ray.worker._init(start_ray_local=True,
num_local_schedulers=3,
num_cpus=[100, 5, 10],
num_gpus=[0, 5, 1])
# Define a bunch of remote functions that all return the socket name of the
# plasma store. Since there is a one-to-one correspondence between plasma
# stores and local schedulers (at least right now), this can be used to
# identify which local scheduler the task was assigned to.
# This must be run on the zeroth local scheduler, the only one with at least 11 CPUs.
@ray.remote(num_cpus=11)
def run_on_0():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the first local scheduler, the only one with at least 2 GPUs.
@ray.remote(num_gpus=2)
def run_on_1():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the second local scheduler, the only one with both 6 CPUs and a GPU.
@ray.remote(num_cpus=6, num_gpus=1)
def run_on_2():
return ray.worker.global_worker.plasma_client.store_socket_name
# This can be run anywhere.
@ray.remote(num_cpus=0, num_gpus=0)
def run_on_0_1_2():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the first or second local scheduler, the only ones with GPUs.
@ray.remote(num_gpus=1)
def run_on_1_2():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the zeroth or second local scheduler, the only ones with at least 8 CPUs.
@ray.remote(num_cpus=8)
def run_on_0_2():
return ray.worker.global_worker.plasma_client.store_socket_name
def run_lots_of_tasks():
names = []
results = []
for i in range(100):
index = np.random.randint(6)
if index == 0:
names.append("run_on_0")
results.append(run_on_0.remote())
elif index == 1:
names.append("run_on_1")
results.append(run_on_1.remote())
elif index == 2:
names.append("run_on_2")
results.append(run_on_2.remote())
elif index == 3:
names.append("run_on_0_1_2")
results.append(run_on_0_1_2.remote())
elif index == 4:
names.append("run_on_1_2")
results.append(run_on_1_2.remote())
elif index == 5:
names.append("run_on_0_2")
results.append(run_on_0_2.remote())
return names, results
store_names = [object_store_address.name for object_store_address in address_info["object_store_addresses"]]
def validate_names_and_results(names, results):
for name, result in zip(names, ray.get(results)):
if name == "run_on_0":
self.assertIn(result, [store_names[0]])
elif name == "run_on_1":
self.assertIn(result, [store_names[1]])
elif name == "run_on_2":
self.assertIn(result, [store_names[2]])
elif name == "run_on_0_1_2":
self.assertIn(result, [store_names[0], store_names[1], store_names[2]])
elif name == "run_on_1_2":
self.assertIn(result, [store_names[1], store_names[2]])
elif name == "run_on_0_2":
self.assertIn(result, [store_names[0], store_names[2]])
else:
raise Exception("This should be unreachable.")
self.assertEqual(set(ray.get(results)), set(store_names))
names, results = run_lots_of_tasks()
validate_names_and_results(names, results)
# Make sure the same thing works when this is nested inside of a task.
@ray.remote
def run_nested1():
names, results = run_lots_of_tasks()
return names, results
@ray.remote
def run_nested2():
names, results = ray.get(run_nested1.remote())
return names, results
names, results = ray.get(run_nested2.remote())
validate_names_and_results(names, results)
ray.worker.cleanup()
class SchedulingAlgorithm(unittest.TestCase):
def testLoadBalancing(self):
num_workers = 21
num_local_schedulers = 3
ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers)
@ray.remote
def f():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
locations = ray.get([f.remote() for _ in range(100)])
names = set(locations)
self.assertEqual(len(names), num_local_schedulers)
counts = [locations.count(name) for name in names]
for count in counts:
self.assertGreater(count, 30)
locations = ray.get([f.remote() for _ in range(1000)])
names = set(locations)
self.assertEqual(len(names), num_local_schedulers)
counts = [locations.count(name) for name in names]
for count in counts:
self.assertGreater(count, 200)
ray.worker.cleanup()
def testLoadBalancingWithDependencies(self):
num_workers = 3
num_local_schedulers = 3
ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers)
@ray.remote
def f(x):
return ray.worker.global_worker.plasma_client.store_socket_name
# This object will be local to one of the local schedulers. Make sure this
# doesn't prevent tasks from being scheduled on other local schedulers.
x = ray.put(np.zeros(1000000))
locations = ray.get([f.remote(x) for _ in range(100)])
names = set(locations)
self.assertEqual(len(names), num_local_schedulers)
counts = [locations.count(name) for name in names]
for count in counts:
self.assertGreater(count, 30)
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)

View file

@ -15,7 +15,8 @@ class TaskTests(unittest.TestCase):
for num_workers_per_scheduler in [4]:
num_workers = num_local_schedulers * num_workers_per_scheduler
ray.worker._init(start_ray_local=True, num_workers=num_workers,
num_local_schedulers=num_local_schedulers)
num_local_schedulers=num_local_schedulers,
num_cpus=100)
@ray.remote
def f(x):
@ -41,7 +42,8 @@ class TaskTests(unittest.TestCase):
for num_workers_per_scheduler in [4]:
num_workers = num_local_schedulers * num_workers_per_scheduler
ray.worker._init(start_ray_local=True, num_workers=num_workers,
num_local_schedulers=num_local_schedulers)
num_local_schedulers=num_local_schedulers,
num_cpus=100)
@ray.remote
def f(x):
@ -98,7 +100,8 @@ class TaskTests(unittest.TestCase):
for num_workers_per_scheduler in [4]:
num_workers = num_local_schedulers * num_workers_per_scheduler
ray.worker._init(start_ray_local=True, num_workers=num_workers,
num_local_schedulers=num_local_schedulers)
num_local_schedulers=num_local_schedulers,
num_cpus=100)
@ray.remote
def f(x):
@ -147,7 +150,9 @@ class ReconstructionTests(unittest.TestCase):
# Start the rest of the services in the Ray cluster.
ray.worker._init(address_info=address_info, start_ray_local=True,
num_workers=self.num_local_schedulers, num_local_schedulers=self.num_local_schedulers)
num_workers=self.num_local_schedulers,
num_local_schedulers=self.num_local_schedulers,
num_cpus=100)
def tearDown(self):
self.assertTrue(ray.services.all_processes_alive())