Allow remote functions to specify max executions and kill the worker once the limit is reached. (#660)

* Implement restarting workers after a certain number of task executions.

* Clean up python code.

* Don't start a new worker when an actor disconnects.

* Move wait_for_pid_to_exit to test_utils.py.

* Add test.

* Fix linting errors.

* Fix linting.

* Fix typo.
Authored by Philipp Moritz on 2017-06-13 07:34:58 +00:00; committed by Robert Nishihara
parent 4374ad1453
commit 54925996ca
9 changed files with 214 additions and 116 deletions
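
To make the change concrete, here is a minimal usage sketch of the new max_calls option, put together from the @ray.remote docstring and the testMaxCallTasks test added at the bottom of this diff (it assumes a Ray build that includes this commit):

    import os
    import ray
    from ray.test.test_utils import wait_for_pid_to_exit

    ray.init(num_cpus=1)

    # A worker that executes this function exits after a single call; the
    # local scheduler then starts a replacement worker (see the
    # MessageType_DisconnectClient handling further down).
    @ray.remote(max_calls=1)
    def f():
        return os.getpid()

    pid = ray.get(f.remote())
    wait_for_pid_to_exit(pid)  # test helper added to ray.test.test_utils below
    ray.worker.cleanup()

The default is max_calls=0, which the worker-side counter never matches, so by default workers are not recycled.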


@@ -13,7 +13,8 @@ import traceback
import ray.local_scheduler
import ray.signature as signature
import ray.worker
from ray.utils import random_string, binary_to_hex, hex_to_binary
from ray.utils import (FunctionProperties, binary_to_hex, hex_to_binary,
random_string)
def random_actor_id():
@@ -70,6 +71,12 @@ def fetch_and_register_actor(actor_class_key, worker):
function_id = get_actor_method_function_id(actor_method_name).id()
worker.functions[driver_id][function_id] = (actor_method_name,
temporary_actor_method)
worker.function_properties[driver_id][function_id] = FunctionProperties(
num_return_vals=1,
num_cpus=1,
num_gpus=0,
max_calls=0)
worker.num_task_executions[driver_id][function_id] = 0
try:
unpickled_class = pickle.loads(pickled_class)
@@ -236,7 +243,11 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,
# TODO(rkn): When we create a second actor, we are probably overwriting
# the values from the first actor here. This may or may not be a problem.
function_id = get_actor_method_function_id(actor_method_name).id()
worker.function_properties[driver_id][function_id] = (1, num_cpus, 0)
worker.function_properties[driver_id][function_id] = FunctionProperties(
num_return_vals=1,
num_cpus=1,
num_gpus=0,
max_calls=0)
# Get a list of the local schedulers from the client table.
client_table = ray.global_state.client_table()


@@ -3,6 +3,8 @@ from __future__ import division
from __future__ import print_function
import json
import os
import psutil
import redis
import time
@@ -99,3 +101,33 @@ def _wait_for_event(event_name, redis_address, extra_buffer=0):
time.sleep(extra_buffer)
return events[event_name]
time.sleep(0.1)
def _pid_alive(pid):
"""Check if the process with this PID is alive or not.
Args:
pid: The pid to check.
Returns:
This returns false if the process is dead or defunct. Otherwise, it returns
true.
"""
try:
os.kill(pid, 0)
except OSError:
return False
else:
if psutil.Process(pid).status() == psutil.STATUS_ZOMBIE:
return False
else:
return True
def wait_for_pid_to_exit(pid, timeout=20):
start_time = time.time()
while time.time() - start_time < timeout:
if not _pid_alive(pid):
return
time.sleep(0.1)
raise Exception("Timed out while waiting for process to exit.")
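
These helpers are moved here from the multi-node test further down so that the unit tests (for example testMaxCallTasks at the end of this diff) can reuse them. As a standalone sketch of the behavior, assuming psutil is installed and a Unix-like system, with a throwaway subprocess standing in for a Ray worker:

    import subprocess
    from ray.test.test_utils import wait_for_pid_to_exit

    proc = subprocess.Popen(["sleep", "1"])
    # Returns once the process is dead or a zombie; raises after the timeout.
    wait_for_pid_to_exit(proc.pid, timeout=5)
    proc.wait()  # reap the child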


@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import binascii
import collections
import numpy as np
import sys
@@ -55,3 +56,11 @@ def binary_to_hex(identifier):
def hex_to_binary(hex_identifier):
return binascii.unhexlify(hex_identifier)
FunctionProperties = collections.namedtuple("FunctionProperties",
["num_return_vals",
"num_cpus",
"num_gpus",
"max_calls"])
"""FunctionProperties: A named tuple storing remote functions information."""


@@ -27,7 +27,7 @@ import ray.signature as signature
import ray.numbuf
import ray.local_scheduler
import ray.plasma
from ray.utils import random_string
from ray.utils import FunctionProperties, random_string
SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -226,6 +226,13 @@ class Worker(object):
# and the number of GPUs required by that function). This is used when
# submitting a function (which can be done both on workers and on drivers).
self.function_properties = collections.defaultdict(lambda: {})
# This is a dictionary mapping driver ID to a dictionary that maps remote
# function IDs for that driver to a counter of the number of times that
# remote function has been executed on this worker. The counter is
# incremented every time the function is executed on this worker. When the
# counter reaches the maximum number of executions allowed for a particular
# function, the worker is killed.
self.num_task_executions = collections.defaultdict(lambda: {})
self.connected = False
self.mode = None
self.cached_remote_functions = []
@@ -454,7 +461,7 @@ class Worker(object):
args_for_local_scheduler.append(put(arg))
# Look up the various function properties.
num_return_vals, num_cpus, num_gpus = self.function_properties[
function_properties = self.function_properties[
self.task_driver_id.id()][function_id.id()]
# Submit the task to local scheduler.
@@ -462,11 +469,11 @@
self.task_driver_id,
ray.local_scheduler.ObjectID(function_id.id()),
args_for_local_scheduler,
num_return_vals,
function_properties.num_return_vals,
self.current_task_id,
self.task_index,
actor_id, self.actor_counters[actor_id],
[num_cpus, num_gpus])
[function_properties.num_cpus, function_properties.num_gpus])
# Increment the worker's task index to track how many tasks have been
# submitted by the current task so far.
self.task_index += 1
@@ -1058,8 +1065,9 @@ If this driver is hanging, start a new one with
def fetch_and_register_remote_function(key, worker=global_worker):
"""Import a remote function."""
(driver_id, function_id_str, function_name, serialized_function,
num_return_vals, module, num_cpus, num_gpus) = worker.redis_client.hmget(
(driver_id, function_id_str, function_name,
serialized_function, num_return_vals, module,
num_cpus, num_gpus, max_calls) = worker.redis_client.hmget(
key, ["driver_id",
"function_id",
"name",
@@ -1067,12 +1075,15 @@ def fetch_and_register_remote_function(key, worker=global_worker):
"num_return_vals",
"module",
"num_cpus",
"num_gpus"])
"num_gpus",
"max_calls"])
function_id = ray.local_scheduler.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
num_return_vals = int(num_return_vals)
num_cpus = int(num_cpus)
num_gpus = int(num_gpus)
function_properties = FunctionProperties(
num_return_vals=int(num_return_vals),
num_cpus=int(num_cpus),
num_gpus=int(num_gpus),
max_calls=int(max_calls))
module = module.decode("ascii")
# This is a placeholder in case the function can't be unpickled. This will be
@@ -1082,9 +1093,8 @@ def fetch_and_register_remote_function(key, worker=global_worker):
remote_f_placeholder = remote(function_id=function_id)(lambda *xs: f())
worker.functions[driver_id][function_id.id()] = (function_name,
remote_f_placeholder)
worker.function_properties[driver_id][function_id.id()] = (num_return_vals,
num_cpus,
num_gpus)
worker.function_properties[driver_id][function_id.id()] = function_properties
worker.num_task_executions[driver_id][function_id.id()] = 0
try:
function = pickle.loads(serialized_function)
@@ -1410,10 +1420,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.run_function_on_all_workers(function)
# Export cached remote functions to the workers.
for info in worker.cached_remote_functions:
(function_id, func_name, func,
func_invoker, num_return_vals, num_cpus, num_gpus) = info
(function_id, func_name, func, func_invoker, function_properties) = info
export_remote_function(function_id, func_name, func, func_invoker,
num_return_vals, num_cpus, num_gpus, worker)
function_properties, worker)
worker.cached_functions_to_run = None
worker.cached_remote_functions = None
@@ -1813,6 +1822,17 @@ def main_loop(worker=global_worker):
# Push all of the log events to the global state store.
flush_log()
# Increase the task execution counter.
worker.num_task_executions[task.driver_id().id()][function_id.id()] += 1
reached_max_executions = (
worker.num_task_executions[task.driver_id().id()][function_id.id()] ==
worker.function_properties[task.driver_id().id()]
[function_id.id()].max_calls)
if reached_max_executions:
ray.worker.global_worker.local_scheduler_client.disconnect()
os._exit(0)
def _submit_task(function_id, func_name, args, worker=global_worker):
"""This is a wrapper around worker.submit_task.
@@ -1837,14 +1857,13 @@ def _mode(worker=global_worker):
def export_remote_function(function_id, func_name, func, func_invoker,
num_return_vals, num_cpus, num_gpus,
worker=global_worker):
function_properties, worker=global_worker):
check_main_thread()
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("export_remote_function can only be called on a driver.")
worker.function_properties[worker.task_driver_id.id()][function_id.id()] = (
num_return_vals, num_cpus, num_gpus)
worker.function_properties[
worker.task_driver_id.id()][function_id.id()] = function_properties
task_driver_id = worker.task_driver_id
key = b"RemoteFunction:" + task_driver_id.id() + b":" + function_id.id()
@@ -1862,14 +1881,16 @@ def export_remote_function(function_id, func_name, func, func_invoker,
else:
del func.__globals__[func.__name__]
worker.redis_client.hmset(key, {"driver_id": worker.task_driver_id.id(),
worker.redis_client.hmset(key, {
"driver_id": worker.task_driver_id.id(),
"function_id": function_id.id(),
"name": func_name,
"module": func.__module__,
"function": pickled_func,
"num_return_vals": num_return_vals,
"num_cpus": num_cpus,
"num_gpus": num_gpus})
"num_return_vals": function_properties.num_return_vals,
"num_cpus": function_properties.num_cpus,
"num_gpus": function_properties.num_gpus,
"max_calls": function_properties.max_calls})
worker.redis_client.rpush("Exports", key)
@@ -1911,7 +1932,7 @@ def compute_function_id(func_name, func):
def remote(*args, **kwargs):
"""This decorator is used to create remote functions.
"""This decorator is used to define remote functions and to define actors.
Args:
num_return_vals (int): The number of object IDs that a call to this
@@ -1920,19 +1941,27 @@ def remote(*args, **kwargs):
should only be passed in when defining the remote function on the driver.
num_gpus (int): The number of GPUs needed to execute this function. This
should only be passed in when defining the remote function on the driver.
max_calls (int): The maximum number of tasks of this kind that can be
run on a worker before the worker needs to be restarted.
"""
worker = global_worker
def make_remote_decorator(num_return_vals, num_cpus, num_gpus, func_id=None):
def make_remote_decorator(num_return_vals, num_cpus, num_gpus,
max_calls, func_id=None):
def remote_decorator(func_or_class):
if inspect.isfunction(func_or_class):
return remote_function_decorator(func_or_class)
function_properties = FunctionProperties(
num_return_vals=num_return_vals,
num_cpus=num_cpus,
num_gpus=num_gpus,
max_calls=max_calls)
return remote_function_decorator(func_or_class, function_properties)
if inspect.isclass(func_or_class):
return worker.make_actor(func_or_class, num_cpus, num_gpus)
raise Exception("The @ray.remote decorator must be applied to either a "
"function or to a class.")
def remote_function_decorator(func):
def remote_function_decorator(func, function_properties):
func_name = "{}.{}".format(func.__module__, func.__name__)
if func_id is None:
function_id = compute_function_id(func_name, func)
@@ -1983,44 +2012,49 @@ def remote(*args, **kwargs):
# Everything ready - export the function
if worker.mode in [SCRIPT_MODE, SILENT_MODE]:
export_remote_function(function_id, func_name, func, func_invoker,
num_return_vals, num_cpus, num_gpus)
function_properties)
elif worker.mode is None:
worker.cached_remote_functions.append((function_id, func_name, func,
func_invoker, num_return_vals,
num_cpus, num_gpus))
func_invoker,
function_properties))
return func_invoker
return remote_decorator
num_return_vals = (kwargs["num_return_vals"] if "num_return_vals"
in kwargs.keys() else 1)
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs.keys() else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs.keys() else 0
in kwargs else 1)
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0
max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0
if _mode() == WORKER_MODE:
if "function_id" in kwargs:
function_id = kwargs["function_id"]
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
function_id)
max_calls, function_id)
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# This is the case where the decorator is just @ray.remote.
return make_remote_decorator(num_return_vals, num_cpus, num_gpus)(args[0])
return make_remote_decorator(num_return_vals, num_cpus,
num_gpus, max_calls)(args[0])
else:
# This is the case where the decorator is something like
# @ray.remote(num_return_vals=2).
error_string = ("The @ray.remote decorator must be applied either with no "
"arguments and no parentheses, for example '@ray.remote', "
"or it must be applied using some of the arguments "
"'num_return_vals', 'num_cpus', or 'num_gpus', like "
"'@ray.remote(num_return_vals=2)'.")
"'num_return_vals', 'num_cpus', 'num_gpus', or 'max_calls'"
", like '@ray.remote(num_return_vals=2)'.")
assert len(args) == 0 and ("num_return_vals" in kwargs or
"num_cpus" in kwargs or
"num_gpus" in kwargs), error_string
"num_gpus" in kwargs or
"max_calls" in kwargs), error_string
for key in kwargs:
assert key in ["num_return_vals", "num_cpus", "num_gpus"], error_string
assert key in ["num_return_vals", "num_cpus",
"num_gpus", "max_calls"], error_string
assert "function_id" not in kwargs
return make_remote_decorator(num_return_vals, num_cpus, num_gpus)
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
max_calls)
def get_arguments_for_execution(function_name, serialized_args,
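
Putting the worker.py changes together: export_remote_function now writes max_calls to Redis, fetch_and_register_remote_function reads it back into a FunctionProperties record, and main_loop increments a per-function counter after every execution, exiting the worker once the counter reaches max_calls. Below is a condensed, self-contained model of that counter logic; it is not Ray's actual bookkeeping, and the keys are illustrative (the real worker keys these dictionaries by driver ID and function ID):

    from collections import defaultdict

    from ray.utils import FunctionProperties

    function_properties = {"f": FunctionProperties(num_return_vals=1, num_cpus=1,
                                                   num_gpus=0, max_calls=3)}
    num_task_executions = defaultdict(int)

    def should_exit_after(function_name):
        # Mirror the check added to main_loop: count the execution, then compare
        # against max_calls (a max_calls of 0 is never reached, i.e. no limit).
        num_task_executions[function_name] += 1
        return (num_task_executions[function_name] ==
                function_properties[function_name].max_calls)

    assert not should_exit_after("f")  # first execution
    assert not should_exit_after("f")  # second execution
    assert should_exit_after("f")      # third execution: the worker would exit here

In the real worker this is where local_scheduler_client.disconnect() and os._exit(0) are called, and the local scheduler's new MessageType_DisconnectClient handling (next file) accounts for the finished task and starts a replacement worker.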


@@ -532,6 +532,33 @@ void assign_task_to_worker(LocalSchedulerState *state,
}
}
void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
if (worker->task_in_progress != NULL) {
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. TODO(rkn): We
* are currently ignoring resource bookkeeping for actor methods. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
CHECK(worker->cpus_in_use ==
TaskSpec_get_required_resource(spec, ResourceIndex_CPU));
CHECK(worker->gpus_in_use.size() ==
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
release_resources(state, worker, worker->cpus_in_use,
worker->gpus_in_use.size());
}
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
/* Update control state tables. */
Task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
/* The call to task_table_update takes ownership of the
* task_in_progress, so we set the pointer to NULL so it is not used. */
} else {
Task_free(worker->task_in_progress);
}
worker->task_in_progress = NULL;
}
}
void process_plasma_notification(event_loop *loop,
int client_sock,
void *context,
@@ -874,8 +901,14 @@ void process_message(event_loop *loop,
case MessageType_TaskDone: {
} break;
case MessageType_DisconnectClient: {
finish_task(state, worker);
CHECK(!worker->disconnected);
worker->disconnected = true;
/* If the disconnected worker was not an actor, start a new worker to make
* sure there are enough workers in the pool. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
start_worker(state, NIL_ACTOR_ID);
}
} break;
case MessageType_EventLogMessage: {
/* Parse the message. */
@@ -894,32 +927,8 @@ void process_message(event_loop *loop,
send_client_register_reply(state, worker);
} break;
case MessageType_GetTask: {
/* If this worker reports a completed task: account for resources. */
if (worker->task_in_progress != NULL) {
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. TODO(rkn): We
* are currently ignoring resource bookkeeping for actor methods. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
CHECK(worker->cpus_in_use ==
TaskSpec_get_required_resource(spec, ResourceIndex_CPU));
CHECK(worker->gpus_in_use.size() ==
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
release_resources(state, worker, worker->cpus_in_use,
worker->gpus_in_use.size());
}
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
/* Update control state tables. */
Task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
task_table_update(state->db, worker->task_in_progress, NULL, NULL,
NULL);
/* The call to task_table_update takes ownership of the
* task_in_progress, so we set the pointer to NULL so it is not used. */
} else {
Task_free(worker->task_in_progress);
}
worker->task_in_progress = NULL;
}
/* If this worker reports a completed task, account for resources. */
finish_task(state, worker);
/* Let the scheduling algorithm process the fact that there is an available
* worker. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {


@@ -48,6 +48,16 @@ void assign_task_to_worker(LocalSchedulerState *state,
int64_t task_spec_size,
LocalSchedulerClient *worker);
/*
* This function is called whenever a task has finished on one of the workers.
* It updates the resource accounting and the global state store.
*
* @param state The local scheduler state.
* @param worker The worker that finished the task.
* @return Void.
*/
void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker);
/**
* This is the callback that is used to process a notification from the Plasma
* store that an object has been sealed.


@@ -6,7 +6,7 @@ import os
import time
import ray
from ray.test.multi_node_tests import (_wait_for_nodes_to_join,
from ray.test.test_utils import (_wait_for_nodes_to_join,
_broadcast_event,
_wait_for_event)


@@ -3,13 +3,13 @@ from __future__ import division
from __future__ import print_function
import os
import psutil
import time
import ray
from ray.test.multi_node_tests import (_wait_for_nodes_to_join,
from ray.test.test_utils import (_wait_for_nodes_to_join,
_broadcast_event,
_wait_for_event)
_wait_for_event,
wait_for_pid_to_exit)
# This test should be run with 5 nodes, which have 0, 1, 2, 3, and 4 GPUs for a
# total of 10 GPUs. It should be run with 7 drivers. Drivers 2 through 6 must
@@ -18,27 +18,6 @@ from ray.test.multi_node_tests import (_wait_for_nodes_to_join,
total_num_nodes = 5
def pid_alive(pid):
"""Check if the process with this PID is alive or not.
Args:
pid: The pid to check.
Returns:
This returns false if the process is dead or defunct. Otherwise, it returns
true.
"""
try:
os.kill(pid, 0)
except OSError:
return False
else:
if psutil.Process(pid).status() == psutil.STATUS_ZOMBIE:
return False
else:
return True
def actor_event_name(driver_index, actor_index):
return "DRIVER_{}_ACTOR_{}_RUNNING".format(driver_index, actor_index)
@@ -229,14 +208,6 @@ def cleanup_driver(redis_address, driver_index):
actors_one_gpu.append(try_to_create_actor(Actor1, driver_index,
10 + 3 + i))
def wait_for_pid_to_exit(pid, timeout=20):
start_time = time.time()
while time.time() - start_time < timeout:
if not pid_alive(pid):
return
time.sleep(0.1)
raise Exception("Timed out while waiting for process to exit.")
removed_workers = 0
# Make sure that the PIDs for the long-running tasks from driver 0 and driver


@@ -14,6 +14,7 @@ import time
import unittest
import ray.test.test_functions as test_functions
import ray.test.test_utils
if sys.version_info >= (3, 0):
from importlib import reload
@@ -1315,6 +1316,27 @@ class WorkerPoolTests(unittest.TestCase):
ray.worker.cleanup()
def testMaxCallTasks(self):
ray.init(num_cpus=1)
@ray.remote(max_calls=1)
def f():
return os.getpid()
pid = ray.get(f.remote())
ray.test.test_utils.wait_for_pid_to_exit(pid)
@ray.remote(max_calls=2)
def f():
return os.getpid()
pid1 = ray.get(f.remote())
pid2 = ray.get(f.remote())
self.assertEqual(pid1, pid2)
ray.test.test_utils.wait_for_pid_to_exit(pid1)
ray.worker.cleanup()
class SchedulingAlgorithm(unittest.TestCase):