Move all config constants into a single file. (#1192)

* Initial pass at factoring out C++ configuration into a single file.

* Expose config through Python.

* Forward declarations.

* Fixes with Python extensions.

* Remove old code.

* Consistent naming for constants.

* Fixes

* Fix linting.

* More linting.

* Whitespace.

* Rename config -> _config.

* Move config inside a class.

* Update naming convention.

* Fix linting.

* More linting.

* More linting.

* Add in some more constants.

* Fix linting.
Robert Nishihara 2017-11-08 11:10:38 -08:00 committed by Philipp Moritz
parent a8032b9ca1
commit 1c6b30b5e2
27 changed files with 659 additions and 178 deletions
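
The net effect of the commit is easiest to see in isolation: each compile-time #define (heartbeat intervals, retry counts, fetch batch sizes, and so on) becomes a private member of a process-wide RayConfig singleton with a const accessor, and the same values are re-exported to Python as ray._config. The toy class below is a stripped-down sketch of that pattern, not Ray's actual header (which appears later in this diff); the field names mirror two of the real ones.

#include <cstdint>
#include <iostream>

// Minimal sketch of the singleton-with-accessors pattern introduced here.
// Values are fixed in the private constructor and read through const
// accessors, replacing scattered #define constants.
class DemoConfig {
 public:
  // Meyers singleton: the function-local static is constructed once, on
  // first use, and its initialization is thread-safe under C++11.
  static DemoConfig &instance() {
    static DemoConfig config;
    return config;
  }

  int64_t heartbeat_timeout_milliseconds() const {
    return heartbeat_timeout_milliseconds_;
  }
  int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; }

 private:
  DemoConfig()
      : heartbeat_timeout_milliseconds_(100), num_heartbeats_timeout_(100) {}

  int64_t heartbeat_timeout_milliseconds_;
  int64_t num_heartbeats_timeout_;
};

int main() {
  // Call sites read configuration through the singleton instead of macros.
  std::cout << DemoConfig::instance().heartbeat_timeout_milliseconds() << "\n";
  return 0;
}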

View file

@ -40,6 +40,7 @@ except ImportError as e:
e.args += (helpful_message,)
raise
from ray.local_scheduler import _config # noqa: E402
from ray.worker import (error_info, init, connect, disconnect,
get, put, wait, remote, log_event, log_span,
flush_log, get_gpu_ids, get_webui_url,
@ -59,7 +60,7 @@ __all__ = ["error_info", "init", "connect", "disconnect", "get", "put", "wait",
"remote", "log_event", "log_span", "flush_log", "actor",
"get_gpu_ids", "get_webui_url", "register_custom_serializer",
"SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE",
"global_state", "__version__"]
"global_state", "_config", "__version__"]
import ctypes # noqa: E402
# Windows only

View file

@ -4,8 +4,9 @@ from __future__ import print_function
from ray.core.src.local_scheduler.liblocal_scheduler_library import (
Task, LocalSchedulerClient, ObjectID, check_simple_value, task_from_string,
task_to_string)
task_to_string, _config)
from .local_scheduler_services import start_local_scheduler
__all__ = ["Task", "LocalSchedulerClient", "ObjectID", "check_simple_value",
"task_from_string", "task_to_string", "start_local_scheduler"]
"task_from_string", "task_to_string", "start_local_scheduler",
"_config"]

View file

@ -22,8 +22,6 @@ from ray.worker import NIL_ACTOR_ID
# These variables must be kept in sync with the C codebase.
# common/common.h
HEARTBEAT_TIMEOUT_MILLISECONDS = 100
NUM_HEARTBEATS_TIMEOUT = 100
DB_CLIENT_ID_SIZE = 20
NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE
@ -580,7 +578,7 @@ class Monitor(object):
plasma_manager_ids = list(self.live_plasma_managers.keys())
for plasma_manager_id in plasma_manager_ids:
if ((self.live_plasma_managers[plasma_manager_id]) >=
NUM_HEARTBEATS_TIMEOUT):
ray._config.num_heartbeats_timeout()):
log.warn("Timed out {}".format(PLASMA_MANAGER_CLIENT_TYPE))
# Remove the plasma manager from the managers whose
# heartbeats we're tracking.
@ -599,7 +597,7 @@ class Monitor(object):
# Wait for a heartbeat interval before processing the next round of
# messages.
time.sleep(HEARTBEAT_TIMEOUT_MILLISECONDS * 1e-3)
time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3)
if __name__ == "__main__":

View file

@ -49,10 +49,6 @@ NIL_LOCAL_SCHEDULER_ID = NIL_ID
NIL_FUNCTION_ID = NIL_ID
NIL_ACTOR_ID = NIL_ID
# When performing ray.get, wait 1 second before attemping to reconstruct and
# fetch the object again.
GET_TIMEOUT_MILLISECONDS = 1000
# This must be kept in sync with the `error_types` array in
# common/state/error_table.h.
OBJECT_HASH_MISMATCH_ERROR_TYPE = b"object_hash_mismatch"
@ -372,10 +368,11 @@ class Worker(object):
# long time, if the store is blocked, it can block the manager
# as well as a consequence.
results = []
get_request_size = 10000
for i in range(0, len(object_ids), get_request_size):
for i in range(0, len(object_ids),
ray._config.worker_get_request_size()):
results += self.plasma_client.get(
object_ids[i:(i + get_request_size)],
object_ids[i:(i +
ray._config.worker_get_request_size())],
timeout,
self.serialization_context)
return results
@ -420,12 +417,13 @@ class Worker(object):
# Do an initial fetch for remote objects. We divide the fetch into
# smaller fetches so as to not block the manager for a prolonged period
# of time in a single call.
fetch_request_size = 10000
plain_object_ids = [plasma.ObjectID(object_id.id())
for object_id in object_ids]
for i in range(0, len(object_ids), fetch_request_size):
for i in range(0, len(object_ids),
ray._config.worker_fetch_request_size()):
self.plasma_client.fetch(
plain_object_ids[i:(i + fetch_request_size)])
plain_object_ids[i:(i +
ray._config.worker_fetch_request_size())])
# Get the objects. We initially try to get the objects immediately.
final_results = self.retrieve_and_deserialize(plain_object_ids, 0)
@ -436,7 +434,7 @@ class Worker(object):
if val is plasma.ObjectNotAvailable)
was_blocked = (len(unready_ids) > 0)
# Try reconstructing any objects we haven't gotten yet. Try to get them
# until at least GET_TIMEOUT_MILLISECONDS milliseconds passes, then
# until at least get_timeout_milliseconds milliseconds passes, then
# repeat.
while len(unready_ids) > 0:
for unready_id in unready_ids:
@ -447,12 +445,15 @@ class Worker(object):
# prolonged period of time in a single call.
object_ids_to_fetch = list(map(
plasma.ObjectID, unready_ids.keys()))
for i in range(0, len(object_ids_to_fetch), fetch_request_size):
for i in range(0, len(object_ids_to_fetch),
ray._config.worker_fetch_request_size()):
self.plasma_client.fetch(
object_ids_to_fetch[i:(i + fetch_request_size)])
object_ids_to_fetch[i:(
i + ray._config.worker_fetch_request_size())])
results = self.retrieve_and_deserialize(
object_ids_to_fetch,
max([GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids))]))
max([ray._config.get_timeout_milliseconds(),
int(0.01 * len(unready_ids))]))
# Remove any entries for objects we received during this iteration
# so we don't retrieve the same object twice.
for i, val in enumerate(results):
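
The comments above motivate splitting large plasma get/fetch calls into bounded batches (worker_get_request_size / worker_fetch_request_size) so that a single request cannot block the manager for long. The C++ snippet below is a generic sketch of that chunking pattern, with a hypothetical fetch_batch callback standing in for the plasma client.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

// Issue one bounded request per batch so that no single call holds the
// manager for a prolonged period; request_size plays the role of
// worker_fetch_request_size().
void chunked_fetch(
    const std::vector<int64_t> &object_ids, int64_t request_size,
    const std::function<void(const std::vector<int64_t> &)> &fetch_batch) {
  for (size_t i = 0; i < object_ids.size();
       i += static_cast<size_t>(request_size)) {
    size_t end =
        std::min(object_ids.size(), i + static_cast<size_t>(request_size));
    fetch_batch(std::vector<int64_t>(object_ids.begin() + i,
                                     object_ids.begin() + end));
  }
}

int main() {
  std::vector<int64_t> ids(25000);
  // With a batch size of 10000 this issues three requests: 10000, 10000, 5000.
  chunked_fetch(ids, 10000, [](const std::vector<int64_t> &batch) {
    std::printf("requesting %zu objects\n", batch.size());
  });
  return 0;
}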

View file

@ -25,13 +25,7 @@ extern "C" {
#include "plasma/common.h"
#include "arrow/util/macros.h"
/** The duration between heartbeats. These are sent by the plasma manager and
* local scheduler. */
#define HEARTBEAT_TIMEOUT_MILLISECONDS 100
/** If a component has not sent a heartbeat in the last NUM_HEARTBEATS_TIMEOUT
* heartbeat intervals, the global scheduler or monitor process will report it
* as dead to the db_client table. */
#define NUM_HEARTBEATS_TIMEOUT 100
#include "state/ray_config.h"
/** Definitions for Ray logging levels. */
#define RAY_COMMON_DEBUG 0

View file

@ -102,10 +102,10 @@ int connect_ipc_sock_retry(const char *socket_pathname,
int64_t timeout) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = NUM_CONNECT_ATTEMPTS;
num_retries = RayConfig::instance().num_connect_attempts();
}
if (timeout < 0) {
timeout = CONNECT_TIMEOUT_MS;
timeout = RayConfig::instance().connect_timeout_milliseconds();
}
CHECK(socket_pathname);
@ -163,10 +163,10 @@ int connect_inet_sock_retry(const char *ip_addr,
int64_t timeout) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = NUM_CONNECT_ATTEMPTS;
num_retries = RayConfig::instance().num_connect_attempts();
}
if (timeout < 0) {
timeout = CONNECT_TIMEOUT_MS;
timeout = RayConfig::instance().connect_timeout_milliseconds();
}
CHECK(ip_addr);
@ -251,7 +251,7 @@ int write_bytes(int fd, uint8_t *cursor, size_t length) {
}
int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) {
int64_t version = RAY_PROTOCOL_VERSION;
int64_t version = RayConfig::instance().ray_protocol_version();
int closed;
closed = write_bytes(fd, (uint8_t *) &version, sizeof(version));
if (closed) {
@ -302,7 +302,7 @@ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
CHECK(version == RayConfig::instance().ray_protocol_version());
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
goto disconnected;
@ -359,7 +359,7 @@ int64_t read_vector(int fd, int64_t *type, std::vector<uint8_t> &buffer) {
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
CHECK(version == RayConfig::instance().ray_protocol_version());
int64_t length;
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
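
The hunks above keep io.cc's convention that a negative num_retries or timeout means "use the configured default", now read from RayConfig instead of macros. Below is a self-contained sketch of that convention around a generic retry loop; try_connect is a placeholder, and the literals mirror the defaults num_connect_attempts = 50 and connect_timeout_milliseconds = 100.

#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Retry a connection attempt, substituting configured defaults when the
// caller passes -1, as documented for connect_*_sock_retry above.
int connect_with_retry(const std::function<int()> &try_connect,
                       int64_t num_retries, int64_t timeout_ms) {
  if (num_retries < 0) {
    num_retries = 50;  // Stands in for num_connect_attempts().
  }
  if (timeout_ms < 0) {
    timeout_ms = 100;  // Stands in for connect_timeout_milliseconds().
  }
  for (int64_t attempt = 0; attempt < num_retries; ++attempt) {
    int fd = try_connect();
    if (fd >= 0) {
      return fd;  // Success: hand back the file descriptor.
    }
    // Wait between attempts before retrying.
    std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms));
  }
  return -1;  // Every attempt failed.
}

int main() {
  int attempts = 0;
  // A fake connector that succeeds on its third try.
  int fd =
      connect_with_retry([&]() { return ++attempts == 3 ? 7 : -1; }, -1, 1);
  return fd == 7 ? 0 : 1;
}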

View file

@ -6,16 +6,6 @@
#include <vector>
#define RAY_PROTOCOL_VERSION 0x0000000000000000
/* Number of times we try binding to a socket. */
#define NUM_BIND_ATTEMPTS 5
#define BIND_TIMEOUT_MS 100
/* Number of times we try connecting to a socket. */
#define NUM_CONNECT_ATTEMPTS 50
#define CONNECT_TIMEOUT_MS 100
struct aeEventLoop;
typedef aeEventLoop event_loop;
@ -74,9 +64,10 @@ int connect_ipc_sock(const char *socket_pathname);
* @param socket_pathname The pathname for the socket.
* @param num_retries The number of times to retry the connection
* before exiting. If -1 is provided, then this defaults to
* NUM_CONNECT_ATTEMPTS.
* num_connect_attempts.
* @param timeout The number of milliseconds to wait in between
* retries. If -1 is provided, then this defaults to CONNECT_TIMEOUT_MS.
* retries. If -1 is provided, then this defaults to
* connect_timeout_milliseconds.
* @return A file descriptor for the socket, or -1 if an error occurred.
*/
int connect_ipc_sock_retry(const char *socket_pathname,
@ -102,9 +93,10 @@ int connect_inet_sock(const char *ip_addr, int port);
* @param port The port number to connect to.
* @param num_retries The number of times to retry the connection
* before exiting. If -1 is provided, then this defaults to
* NUM_CONNECT_ATTEMPTS.
* num_connect_attempts.
* @param timeout The number of milliseconds to wait in between
* retries. If -1 is provided, then this defaults to CONNECT_TIMEOUT_MS.
* retries. If -1 is provided, then this defaults to
* connect_timeout_milliseconds.
* @return A file descriptor for the socket, or -1 if an error occurred.
*/
int connect_inet_sock_retry(const char *ip_addr,

View file

@ -507,9 +507,6 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) {
/* Define the methods for the module. */
#define SIZE_LIMIT 100
#define NUM_ELEMENTS_LIMIT 1000
#if PY_MAJOR_VERSION >= 3
#define PyInt_Check PyLong_Check
#endif
@ -531,7 +528,7 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) {
*/
int is_simple_value(PyObject *value, int *num_elements_contained) {
*num_elements_contained += 1;
if (*num_elements_contained >= NUM_ELEMENTS_LIMIT) {
if (*num_elements_contained >= RayConfig::instance().num_elements_limit()) {
return 0;
}
if (PyInt_Check(value) || PyLong_Check(value) || value == Py_False ||
@ -540,21 +537,26 @@ int is_simple_value(PyObject *value, int *num_elements_contained) {
}
if (PyBytes_CheckExact(value)) {
*num_elements_contained += PyBytes_Size(value);
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyUnicode_CheckExact(value)) {
*num_elements_contained += PyUnicode_GET_SIZE(value);
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyList_CheckExact(value) && PyList_Size(value) < SIZE_LIMIT) {
if (PyList_CheckExact(value) &&
PyList_Size(value) < RayConfig::instance().size_limit()) {
for (Py_ssize_t i = 0; i < PyList_Size(value); ++i) {
if (!is_simple_value(PyList_GetItem(value, i), num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyDict_CheckExact(value) && PyDict_Size(value) < SIZE_LIMIT) {
if (PyDict_CheckExact(value) &&
PyDict_Size(value) < RayConfig::instance().size_limit()) {
PyObject *key, *val;
Py_ssize_t pos = 0;
while (PyDict_Next(value, &pos, &key, &val)) {
@ -563,15 +565,18 @@ int is_simple_value(PyObject *value, int *num_elements_contained) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
if (PyTuple_CheckExact(value) && PyTuple_Size(value) < SIZE_LIMIT) {
if (PyTuple_CheckExact(value) &&
PyTuple_Size(value) < RayConfig::instance().size_limit()) {
for (Py_ssize_t i = 0; i < PyTuple_Size(value); ++i) {
if (!is_simple_value(PyTuple_GetItem(value, i), num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
return (*num_elements_contained <
RayConfig::instance().num_elements_limit());
}
return 0;
}

View file

@ -49,8 +49,6 @@ PyObject *check_simple_value(PyObject *self, PyObject *args);
PyObject *PyTask_to_string(PyObject *, PyObject *args);
PyObject *PyTask_from_string(PyObject *, PyObject *args);
PyObject *compute_put_id(PyObject *self, PyObject *args);
PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size);
#endif /* COMMON_EXTENSION_H */

View file

@ -0,0 +1,242 @@
#include <Python.h>
#include "bytesobject.h"
#include "state/ray_config.h"
#include "config_extension.h"
PyObject *PyRayConfig_make() {
PyRayConfig *result = PyObject_New(PyRayConfig, &PyRayConfigType);
result = (PyRayConfig *) PyObject_Init((PyObject *) result, &PyRayConfigType);
return (PyObject *) result;
}
PyObject *PyRayConfig_ray_protocol_version(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().ray_protocol_version());
}
PyObject *PyRayConfig_heartbeat_timeout_milliseconds(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().heartbeat_timeout_milliseconds());
}
PyObject *PyRayConfig_num_heartbeats_timeout(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().num_heartbeats_timeout());
}
PyObject *PyRayConfig_get_timeout_milliseconds(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().get_timeout_milliseconds());
}
PyObject *PyRayConfig_worker_get_request_size(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().worker_get_request_size());
}
PyObject *PyRayConfig_worker_fetch_request_size(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().worker_fetch_request_size());
}
PyObject *PyRayConfig_num_connect_attempts(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().num_connect_attempts());
}
PyObject *PyRayConfig_connect_timeout_milliseconds(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().connect_timeout_milliseconds());
}
PyObject *PyRayConfig_local_scheduler_fetch_timeout_milliseconds(
PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().local_scheduler_fetch_timeout_milliseconds());
}
PyObject *PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds(
PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance()
.local_scheduler_reconstruction_timeout_milliseconds());
}
PyObject *PyRayConfig_max_num_to_reconstruct(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().max_num_to_reconstruct());
}
PyObject *PyRayConfig_local_scheduler_fetch_request_size(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().local_scheduler_fetch_request_size());
}
PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().kill_worker_timeout_milliseconds());
}
PyObject *PyRayConfig_default_num_CPUs(PyObject *self) {
return PyFloat_FromDouble(RayConfig::instance().default_num_CPUs());
}
PyObject *PyRayConfig_default_num_GPUs(PyObject *self) {
return PyFloat_FromDouble(RayConfig::instance().default_num_GPUs());
}
PyObject *PyRayConfig_default_num_custom_resource(PyObject *self) {
return PyFloat_FromDouble(
RayConfig::instance().default_num_custom_resource());
}
PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().manager_timeout_milliseconds());
}
PyObject *PyRayConfig_buf_size(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().buf_size());
}
PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().max_time_for_handler_milliseconds());
}
PyObject *PyRayConfig_size_limit(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().size_limit());
}
PyObject *PyRayConfig_num_elements_limit(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().num_elements_limit());
}
PyObject *PyRayConfig_max_time_for_loop(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().max_time_for_loop());
}
PyObject *PyRayConfig_redis_db_connect_retries(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().redis_db_connect_retries());
}
PyObject *PyRayConfig_redis_db_connect_wait_milliseconds(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().redis_db_connect_wait_milliseconds());
}
PyObject *PyRayConfig_plasma_default_release_delay(PyObject *self) {
return PyLong_FromLongLong(
RayConfig::instance().plasma_default_release_delay());
}
PyObject *PyRayConfig_L3_cache_size_bytes(PyObject *self) {
return PyLong_FromLongLong(RayConfig::instance().L3_cache_size_bytes());
}
static PyMethodDef PyRayConfig_methods[] = {
{"ray_protocol_version", (PyCFunction) PyRayConfig_ray_protocol_version,
METH_NOARGS, "Return ray_protocol_version"},
{"heartbeat_timeout_milliseconds",
(PyCFunction) PyRayConfig_heartbeat_timeout_milliseconds, METH_NOARGS,
"Return heartbeat_timeout_milliseconds"},
{"num_heartbeats_timeout", (PyCFunction) PyRayConfig_num_heartbeats_timeout,
METH_NOARGS, "Return num_heartbeats_timeout"},
{"get_timeout_milliseconds",
(PyCFunction) PyRayConfig_get_timeout_milliseconds, METH_NOARGS,
"Return get_timeout_milliseconds"},
{"worker_get_request_size",
(PyCFunction) PyRayConfig_worker_get_request_size, METH_NOARGS,
"Return worker_get_request_size"},
{"worker_fetch_request_size",
(PyCFunction) PyRayConfig_worker_fetch_request_size, METH_NOARGS,
"Return worker_fetch_request_size"},
{"num_connect_attempts", (PyCFunction) PyRayConfig_num_connect_attempts,
METH_NOARGS, "Return num_connect_attempts"},
{"connect_timeout_milliseconds",
(PyCFunction) PyRayConfig_connect_timeout_milliseconds, METH_NOARGS,
"Return connect_timeout_milliseconds"},
{"local_scheduler_fetch_timeout_milliseconds",
(PyCFunction) PyRayConfig_local_scheduler_fetch_timeout_milliseconds,
METH_NOARGS, "Return local_scheduler_fetch_timeout_milliseconds"},
{"local_scheduler_reconstruction_timeout_milliseconds",
(PyCFunction)
PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds,
METH_NOARGS, "Return local_scheduler_reconstruction_timeout_milliseconds"},
{"max_num_to_reconstruct", (PyCFunction) PyRayConfig_max_num_to_reconstruct,
METH_NOARGS, "Return max_num_to_reconstruct"},
{"local_scheduler_fetch_request_size",
(PyCFunction) PyRayConfig_local_scheduler_fetch_request_size, METH_NOARGS,
"Return local_scheduler_fetch_request_size"},
{"kill_worker_timeout_milliseconds",
(PyCFunction) PyRayConfig_kill_worker_timeout_milliseconds, METH_NOARGS,
"Return kill_worker_timeout_milliseconds"},
{"default_num_CPUs", (PyCFunction) PyRayConfig_default_num_CPUs,
METH_NOARGS, "Return default_num_CPUs"},
{"default_num_GPUs", (PyCFunction) PyRayConfig_default_num_GPUs,
METH_NOARGS, "Return default_num_GPUs"},
{"default_num_custom_resource",
(PyCFunction) PyRayConfig_default_num_custom_resource, METH_NOARGS,
"Return default_num_custom_resource"},
{"manager_timeout_milliseconds",
(PyCFunction) PyRayConfig_manager_timeout_milliseconds, METH_NOARGS,
"Return manager_timeout_milliseconds"},
{"buf_size", (PyCFunction) PyRayConfig_buf_size, METH_NOARGS,
"Return buf_size"},
{"max_time_for_handler_milliseconds",
(PyCFunction) PyRayConfig_max_time_for_handler_milliseconds, METH_NOARGS,
"Return max_time_for_handler_milliseconds"},
{"size_limit", (PyCFunction) PyRayConfig_size_limit, METH_NOARGS,
"Return size_limit"},
{"num_elements_limit", (PyCFunction) PyRayConfig_num_elements_limit,
METH_NOARGS, "Return num_elements_limit"},
{"max_time_for_loop", (PyCFunction) PyRayConfig_max_time_for_loop,
METH_NOARGS, "Return max_time_for_loop"},
{"redis_db_connect_retries",
(PyCFunction) PyRayConfig_redis_db_connect_retries, METH_NOARGS,
"Return redis_db_connect_retries"},
{"redis_db_connect_wait_milliseconds",
(PyCFunction) PyRayConfig_redis_db_connect_wait_milliseconds, METH_NOARGS,
"Return redis_db_connect_wait_milliseconds"},
{"plasma_default_release_delay",
(PyCFunction) PyRayConfig_plasma_default_release_delay, METH_NOARGS,
"Return plasma_default_release_delay"},
{"L3_cache_size_bytes", (PyCFunction) PyRayConfig_L3_cache_size_bytes,
METH_NOARGS, "Return L3_cache_size_bytes"},
{NULL} /* Sentinel */
};
PyTypeObject PyRayConfigType = {
PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */
"common.RayConfig", /* tp_name */
sizeof(PyRayConfig), /* tp_basicsize */
0, /* tp_itemsize */
0, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"RayConfig object", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
PyRayConfig_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
0, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};
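
config_extension.cc follows one mechanical recipe per field: a C function that calls the RayConfig accessor and boxes the result (PyLong_FromLongLong or PyFloat_FromDouble), registered as a METH_NOARGS entry in the method table of the PyRayConfig type. The module below is a distilled, hypothetical version of that recipe for a single value, exposing it as a module-level function and using only the Python 3 C API; all names here are illustrative, not Ray's.

#include <Python.h>
#include <cstdint>

// A single configuration value, standing in for a RayConfig accessor.
static int64_t demo_heartbeat_timeout_milliseconds() { return 100; }

// METH_NOARGS getter: ignores its arguments and boxes the value as a
// Python int, mirroring the PyRayConfig_* functions above.
static PyObject *heartbeat_timeout_milliseconds(PyObject *self,
                                                PyObject *args) {
  return PyLong_FromLongLong(demo_heartbeat_timeout_milliseconds());
}

static PyMethodDef DemoConfigMethods[] = {
    {"heartbeat_timeout_milliseconds", heartbeat_timeout_milliseconds,
     METH_NOARGS, "Return heartbeat_timeout_milliseconds"},
    {NULL, NULL, 0, NULL} /* Sentinel */
};

static struct PyModuleDef demo_config_module = {
    PyModuleDef_HEAD_INIT, "demo_config", "Sketch of the config bindings.", -1,
    DemoConfigMethods};

PyMODINIT_FUNC PyInit_demo_config(void) {
  return PyModule_Create(&demo_config_module);
}

The real extension instead hangs these methods off the PyRayConfig type and adds a ready-made instance to the module as _config (see local_scheduler_extension.cc later in this diff), which is what lets Python code call ray._config.heartbeat_timeout_milliseconds().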

View file

@ -0,0 +1,48 @@
#ifndef CONFIG_EXTENSION_H
#define CONFIG_EXTENSION_H
#include <Python.h>
#include "common.h"
// clang-format off
typedef struct {
PyObject_HEAD
} PyRayConfig;
// clang-format on
extern PyTypeObject PyRayConfigType;
/* Create a PyRayConfig from C++. */
PyObject *PyRayConfig_make();
PyObject *PyRayConfig_ray_protocol_version(PyObject *self);
PyObject *PyRayConfig_heartbeat_timeout_milliseconds(PyObject *self);
PyObject *PyRayConfig_num_heartbeats_timeout(PyObject *self);
PyObject *PyRayConfig_get_timeout_milliseconds(PyObject *self);
PyObject *PyRayConfig_worker_get_request_size(PyObject *self);
PyObject *PyRayConfig_worker_fetch_request_size(PyObject *self);
PyObject *PyRayConfig_num_connect_attempts(PyObject *self);
PyObject *PyRayConfig_connect_timeout_milliseconds(PyObject *self);
PyObject *PyRayConfig_local_scheduler_fetch_timeout_milliseconds(
PyObject *self);
PyObject *PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds(
PyObject *self);
PyObject *PyRayConfig_max_num_to_reconstruct(PyObject *self);
PyObject *PyRayConfig_local_scheduler_fetch_request_size(PyObject *self);
PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self);
PyObject *PyRayConfig_default_num_CPUs(PyObject *self);
PyObject *PyRayConfig_default_num_GPUs(PyObject *self);
PyObject *PyRayConfig_default_num_custom_resource(PyObject *self);
PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self);
PyObject *PyRayConfig_buf_size(PyObject *self);
PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self);
PyObject *PyRayConfig_size_limit(PyObject *self);
PyObject *PyRayConfig_num_elements_limit(PyObject *self);
PyObject *PyRayConfig_max_time_for_loop(PyObject *self);
PyObject *PyRayConfig_redis_db_connect_retries(PyObject *self);
PyObject *PyRayConfig_redis_db_connect_wait_milliseconds(PyObject *self);
PyObject *PyRayConfig_plasma_default_release_delay(PyObject *self);
PyObject *PyRayConfig_L3_cache_size_bytes(PyObject *self);
#endif /* CONFIG_EXTENSION_H */

View file

@ -31,7 +31,8 @@ void db_client_table_subscribe(
void plasma_manager_send_heartbeat(DBHandle *db_handle) {
RetryInfo heartbeat_retry;
heartbeat_retry.num_retries = 0;
heartbeat_retry.timeout = HEARTBEAT_TIMEOUT_MILLISECONDS;
heartbeat_retry.timeout =
RayConfig::instance().heartbeat_timeout_milliseconds();
heartbeat_retry.fail_callback = NULL;
init_table_callback(db_handle, NIL_ID, __func__, NULL,

View file

@ -84,7 +84,7 @@ typedef struct {
* Start sending heartbeats to the plasma_managers channel. Each
* heartbeat contains this database client's ID. Heartbeats can be subscribed
* to through the plasma_managers channel. Once called, this "retries" the
* heartbeat operation forever, every HEARTBEAT_TIMEOUT_MILLISECONDS
* heartbeat operation forever, every heartbeat_timeout_milliseconds
* milliseconds.
*
* @param db_handle Database handle.

View file

@ -0,0 +1,195 @@
#ifndef RAY_CONFIG_H
#define RAY_CONFIG_H
#include <math.h>
#include <stdint.h>
class RayConfig {
public:
static RayConfig &instance() {
static RayConfig config;
return config;
}
int64_t ray_protocol_version() const { return ray_protocol_version_; }
int64_t heartbeat_timeout_milliseconds() const {
return heartbeat_timeout_milliseconds_;
}
int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; }
int64_t get_timeout_milliseconds() const { return get_timeout_milliseconds_; }
int64_t worker_get_request_size() const { return worker_get_request_size_; }
int64_t worker_fetch_request_size() const {
return worker_fetch_request_size_;
}
int64_t num_connect_attempts() const { return num_connect_attempts_; }
int64_t connect_timeout_milliseconds() const {
return connect_timeout_milliseconds_;
}
int64_t local_scheduler_fetch_timeout_milliseconds() const {
return local_scheduler_fetch_timeout_milliseconds_;
}
int64_t local_scheduler_reconstruction_timeout_milliseconds() const {
return local_scheduler_reconstruction_timeout_milliseconds_;
}
int64_t max_num_to_reconstruct() const { return max_num_to_reconstruct_; }
int64_t local_scheduler_fetch_request_size() const {
return local_scheduler_fetch_request_size_;
}
int64_t kill_worker_timeout_milliseconds() const {
return kill_worker_timeout_milliseconds_;
}
double default_num_CPUs() const { return default_num_CPUs_; }
double default_num_GPUs() const { return default_num_GPUs_; }
double default_num_custom_resource() const {
return default_num_custom_resource_;
}
int64_t manager_timeout_milliseconds() const {
return manager_timeout_milliseconds_;
}
int64_t buf_size() const { return buf_size_; }
int64_t max_time_for_handler_milliseconds() const {
return max_time_for_handler_milliseconds_;
}
int64_t size_limit() const { return size_limit_; }
int64_t num_elements_limit() const { return num_elements_limit_; }
int64_t max_time_for_loop() const { return max_time_for_loop_; }
int64_t redis_db_connect_retries() const { return redis_db_connect_retries_; }
int64_t redis_db_connect_wait_milliseconds() const {
return redis_db_connect_wait_milliseconds_;
};
int64_t plasma_default_release_delay() const {
return plasma_default_release_delay_;
}
int64_t L3_cache_size_bytes() const { return L3_cache_size_bytes_; }
private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
heartbeat_timeout_milliseconds_(100),
num_heartbeats_timeout_(100),
get_timeout_milliseconds_(1000),
worker_get_request_size_(10000),
worker_fetch_request_size_(10000),
num_connect_attempts_(50),
connect_timeout_milliseconds_(100),
local_scheduler_fetch_timeout_milliseconds_(1000),
local_scheduler_reconstruction_timeout_milliseconds_(1000),
max_num_to_reconstruct_(10000),
local_scheduler_fetch_request_size_(10000),
kill_worker_timeout_milliseconds_(100),
default_num_CPUs_(INT16_MAX),
default_num_GPUs_(0),
default_num_custom_resource_(INFINITY),
manager_timeout_milliseconds_(1000),
buf_size_(4096),
max_time_for_handler_milliseconds_(1000),
size_limit_(100),
num_elements_limit_(1000),
max_time_for_loop_(1000),
redis_db_connect_retries_(50),
redis_db_connect_wait_milliseconds_(100),
plasma_default_release_delay_(64),
L3_cache_size_bytes_(100000000) {}
~RayConfig() {}
/// In theory, this is used to detect Ray version mismatches.
int64_t ray_protocol_version_;
/// The duration between heartbeats. These are sent by the plasma manager and
/// local scheduler.
int64_t heartbeat_timeout_milliseconds_;
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
/// heartbeat intervals, the global scheduler or monitor process will report
/// it as dead to the db_client table.
int64_t num_heartbeats_timeout_;
/// These are used by the worker to set timeouts and to batch requests when
/// getting objects.
int64_t get_timeout_milliseconds_;
int64_t worker_get_request_size_;
int64_t worker_fetch_request_size_;
/// Number of times we try connecting to a socket.
int64_t num_connect_attempts_;
int64_t connect_timeout_milliseconds_;
/// The duration that the local scheduler will wait before reinitiating a
/// fetch request for a missing task dependency. This time may adapt based on
/// the number of missing task dependencies.
int64_t local_scheduler_fetch_timeout_milliseconds_;
/// The duration that the local scheduler will wait between initiating
/// reconstruction calls for missing task dependencies. If there are many
/// missing task dependencies, we will only initiate reconstruction calls for
/// some of them each time.
int64_t local_scheduler_reconstruction_timeout_milliseconds_;
/// The maximum number of objects that the local scheduler will issue
/// reconstruct calls for in a single pass through the reconstruct object
/// timeout handler.
int64_t max_num_to_reconstruct_;
/// The maximum number of objects to include in a single fetch request in the
/// regular local scheduler fetch timeout handler.
int64_t local_scheduler_fetch_request_size_;
/// The duration that we wait after sending a worker SIGTERM before sending
/// the worker SIGKILL.
int64_t kill_worker_timeout_milliseconds_;
/// These are used to determine the local scheduler's behavior with respect to
/// different types of resources.
double default_num_CPUs_;
double default_num_GPUs_;
double default_num_custom_resource_;
/// These are used by the plasma manager.
int64_t manager_timeout_milliseconds_;
int64_t buf_size_;
/// This is a timeout used to cause failures in the plasma manager and local
/// scheduler when certain event loop handlers take too long.
int64_t max_time_for_handler_milliseconds_;
/// This is used by the Python extension when serializing objects as part of
/// a task spec.
int64_t size_limit_;
int64_t num_elements_limit_;
/// This is used to cause failures when a certain loop in redis.cc which
/// synchronously looks up object manager addresses in redis is slow.
int64_t max_time_for_loop_;
/// Allow up to 5 seconds for connecting to Redis.
int64_t redis_db_connect_retries_;
int64_t redis_db_connect_wait_milliseconds_;
/// TODO(rkn): These constants are currently unused.
int64_t plasma_default_release_delay_;
int64_t L3_cache_size_bytes_;
};
#endif // RAY_CONFIG_H
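
Two of the comments above combine into a single liveness rule: a component is reported dead once it has been silent for num_heartbeats_timeout heartbeat intervals, i.e. 100 x 100 ms = 10 seconds with these defaults. The heartbeat handler hunks elsewhere in this diff apply the same product as a self-check before LOG_FATAL; a small sketch of that check:

#include <cstdint>
#include <cstdio>

// True once the gap since the last heartbeat exceeds the allowed budget of
// num_heartbeats_timeout * heartbeat_timeout_milliseconds.
bool missed_too_many_heartbeats(int64_t last_heartbeat_ms, int64_t now_ms,
                                int64_t heartbeat_timeout_ms,
                                int64_t num_heartbeats_timeout) {
  return (now_ms - last_heartbeat_ms) >
         num_heartbeats_timeout * heartbeat_timeout_ms;
}

int main() {
  // With the defaults above, 10,001 ms of silence exceeds the 10,000 ms budget.
  std::printf("%d\n", missed_too_many_heartbeats(0, 10001, 100, 100));  // 1
  std::printf("%d\n", missed_too_many_heartbeats(0, 9999, 100, 100));   // 0
  return 0;
}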

View file

@ -98,7 +98,7 @@ void get_redis_shards(redisContext *context,
/* Get the total number of Redis shards in the system. */
int num_attempts = 0;
redisReply *reply = NULL;
while (num_attempts < REDIS_DB_CONNECT_RETRIES) {
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
/* Try to read the number of Redis shards from the primary shard. If the
* entry is present, exit. */
reply = (redisReply *) redisCommand(context, "GET NumRedisShards");
@ -108,11 +108,11 @@ void get_redis_shards(redisContext *context,
/* Sleep for a little, and try again if the entry isn't there yet. */
freeReplyObject(reply);
usleep(REDIS_DB_CONNECT_WAIT_MS * 1000);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
continue;
}
CHECKM(num_attempts < REDIS_DB_CONNECT_RETRIES,
CHECKM(num_attempts < RayConfig::instance().redis_db_connect_retries(),
"No entry found for NumRedisShards");
CHECKM(reply->type == REDIS_REPLY_STRING,
"Expected string, found Redis type %d for NumRedisShards",
@ -124,7 +124,7 @@ void get_redis_shards(redisContext *context,
/* Get the addresses of all of the Redis shards. */
num_attempts = 0;
while (num_attempts < REDIS_DB_CONNECT_RETRIES) {
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
/* Try to read the Redis shard locations from the primary shard. If we find
* that all of them are present, exit. */
reply = (redisReply *) redisCommand(context, "LRANGE RedisShards 0 -1");
@ -135,11 +135,11 @@ void get_redis_shards(redisContext *context,
/* Sleep for a little, and try again if not all Redis shard addresses have
* been added yet. */
freeReplyObject(reply);
usleep(REDIS_DB_CONNECT_WAIT_MS * 1000);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
continue;
}
CHECKM(num_attempts < REDIS_DB_CONNECT_RETRIES,
CHECKM(num_attempts < RayConfig::instance().redis_db_connect_retries(),
"Expected %d Redis shard addresses, found %d", num_redis_shards,
(int) reply->elements);
@ -173,12 +173,13 @@ void db_connect_shard(const std::string &db_address,
int connection_attempts = 0;
redisContext *sync_context = redisConnect(db_address.c_str(), db_port);
while (sync_context == NULL || sync_context->err) {
if (connection_attempts >= REDIS_DB_CONNECT_RETRIES) {
if (connection_attempts >=
RayConfig::instance().redis_db_connect_retries()) {
break;
}
LOG_WARN("Failed to connect to Redis, retrying.");
/* Sleep for a little. */
usleep(REDIS_DB_CONNECT_WAIT_MS * 1000);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
sync_context = redisConnect(db_address.c_str(), db_port);
connection_attempts += 1;
}
@ -643,8 +644,7 @@ const std::vector<std::string> redis_get_cached_db_clients(
}
int64_t end_time = current_time_ms();
int64_t max_time_for_loop = 1000;
if (end_time - start_time > max_time_for_loop) {
if (end_time - start_time > RayConfig::instance().max_time_for_loop()) {
LOG_WARN(
"calling redis_get_cached_db_client in a loop in with %zu manager IDs "
"took %" PRId64 " milliseconds.",
@ -1515,7 +1515,7 @@ void redis_plasma_manager_send_heartbeat(TableCallbackData *callback_data) {
DBHandle *db = callback_data->db_handle;
/* NOTE(swang): We purposefully do not provide a callback, leaving the table
* operation and timer active. This allows us to send a new heartbeat every
* HEARTBEAT_TIMEOUT_MILLISECONDS without having to allocate and deallocate
* heartbeat_timeout_milliseconds without having to allocate and deallocate
* memory for callback data each time. */
int status = redisAsyncCommand(
db->context, NULL, (void *) callback_data->timer_id,

View file

@ -10,10 +10,6 @@
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
/* Allow up to 5 seconds for connecting to Redis. */
#define REDIS_DB_CONNECT_RETRIES 50
#define REDIS_DB_CONNECT_WAIT_MS 100
#define LOG_REDIS_ERROR(context, M, ...) \
LOG_ERROR("Redis error %d %s; %s", context->err, context->errstr, M)

View file

@ -348,7 +348,8 @@ void local_scheduler_table_handler(DBClientID client_id,
/* The local scheduler is exiting. Increase the number of heartbeats
* missed to the timeout threshold. This will trigger removal of the
* local scheduler the next time the timeout handler fires. */
it->second.num_heartbeats_missed = NUM_HEARTBEATS_TIMEOUT;
it->second.num_heartbeats_missed =
RayConfig::instance().num_heartbeats_timeout();
} else {
/* Reset the number of tasks sent since the last heartbeat. */
LocalScheduler &local_scheduler = it->second;
@ -392,7 +393,8 @@ int heartbeat_timeout_handler(event_loop *loop, timer_id id, void *context) {
* clean up its state and exit upon receiving this notification. */
auto it = state->local_schedulers.begin();
while (it != state->local_schedulers.end()) {
if (it->second.num_heartbeats_missed >= NUM_HEARTBEATS_TIMEOUT) {
if (it->second.num_heartbeats_missed >=
RayConfig::instance().num_heartbeats_timeout()) {
LOG_WARN(
"Missed too many heartbeats from local scheduler, marking as dead.");
/* Notify others by updating the global state. */
@ -408,7 +410,7 @@ int heartbeat_timeout_handler(event_loop *loop, timer_id id, void *context) {
}
/* Reset the timer. */
return HEARTBEAT_TIMEOUT_MILLISECONDS;
return RayConfig::instance().heartbeat_timeout_milliseconds();
}
void start_server(const char *node_ip_address,
@ -446,7 +448,8 @@ void start_server(const char *node_ip_address,
* timer should notice and schedule the task. */
event_loop_add_timer(loop, GLOBAL_SCHEDULER_TASK_CLEANUP_MILLISECONDS,
task_cleanup_handler, g_state);
event_loop_add_timer(loop, HEARTBEAT_TIMEOUT_MILLISECONDS,
event_loop_add_timer(loop,
RayConfig::instance().heartbeat_timeout_milliseconds(),
heartbeat_timeout_handler, g_state);
/* Start the event loop. */
event_loop_run(loop);
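
Both the global scheduler and the local scheduler re-arm their heartbeat timers through the handler's return value: the handler does its work and returns RayConfig::instance().heartbeat_timeout_milliseconds(), which the event loop uses as the delay before the next invocation. The toy loop below imitates that contract; it is not Ray's event_loop API, just a stand-in to show why the handlers above end by returning the timeout.

#include <chrono>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <thread>

// Run a periodic handler a fixed number of times. The handler's return
// value is the delay, in milliseconds, before it runs again, mimicking the
// "return heartbeat_timeout_milliseconds" pattern in the handlers above.
void run_timer(const std::function<int64_t()> &handler, int ticks) {
  for (int i = 0; i < ticks; ++i) {
    int64_t next_ms = handler();
    std::this_thread::sleep_for(std::chrono::milliseconds(next_ms));
  }
}

int main() {
  run_timer(
      []() -> int64_t {
        std::puts("heartbeat");
        return 100;  // Re-arm the timer for another 100 ms.
      },
      3);
  return 0;
}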

View file

@ -53,7 +53,8 @@ add_dependencies(gen_local_scheduler_fbs flatbuffers_ep)
add_library(local_scheduler_library SHARED
local_scheduler_extension.cc
../common/lib/python/common_extension.cc)
../common/lib/python/common_extension.cc
../common/lib/python/config_extension.cc)
add_library(local_scheduler_client STATIC local_scheduler_client.cc)

View file

@ -102,8 +102,9 @@ void kill_worker(LocalSchedulerState *state,
* up its state before force killing. The client socket will be closed
* and the worker struct will be freed after the timeout. */
kill(worker->pid, SIGTERM);
event_loop_add_timer(state->loop, KILL_WORKER_TIMEOUT_MILLISECONDS,
force_kill_worker, (void *) worker);
event_loop_add_timer(
state->loop, RayConfig::instance().kill_worker_timeout_milliseconds(),
force_kill_worker, (void *) worker);
free_worker = false;
}
LOG_DEBUG("Killed worker with pid %d", worker->pid);
@ -1063,8 +1064,8 @@ void process_message(event_loop *loop,
/* Print a warning if this method took too long. */
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
if (end_time - start_time >
RayConfig::instance().max_time_for_handler_milliseconds()) {
LOG_WARN("process_message of type %" PRId64 " took %" PRId64
" milliseconds.",
type, end_time - start_time);
@ -1221,7 +1222,8 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
int64_t current_time = current_time_ms();
CHECK(current_time >= state->previous_heartbeat_time);
if (current_time - state->previous_heartbeat_time >
NUM_HEARTBEATS_TIMEOUT * HEARTBEAT_TIMEOUT_MILLISECONDS) {
RayConfig::instance().num_heartbeats_timeout() *
RayConfig::instance().heartbeat_timeout_milliseconds()) {
LOG_FATAL("The last heartbeat was sent %" PRId64 " milliseconds ago.",
current_time - state->previous_heartbeat_time);
}
@ -1233,7 +1235,7 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
/* Publish the heartbeat to all subscribers of the local scheduler table. */
local_scheduler_table_send_info(state->db, &info, NULL);
/* Reset the timer. */
return HEARTBEAT_TIMEOUT_MILLISECONDS;
return RayConfig::instance().heartbeat_timeout_milliseconds();
}
void start_server(const char *node_ip_address,
@ -1286,16 +1288,20 @@ void start_server(const char *node_ip_address,
* scheduler to the local scheduler table. This message also serves as a
* heartbeat. */
if (g_state->db != NULL) {
event_loop_add_timer(loop, HEARTBEAT_TIMEOUT_MILLISECONDS,
event_loop_add_timer(loop,
RayConfig::instance().heartbeat_timeout_milliseconds(),
heartbeat_handler, g_state);
}
/* Create a timer for fetching queued tasks' missing object dependencies. */
event_loop_add_timer(loop, kLocalSchedulerFetchTimeoutMilliseconds,
fetch_object_timeout_handler, g_state);
event_loop_add_timer(
loop, RayConfig::instance().local_scheduler_fetch_timeout_milliseconds(),
fetch_object_timeout_handler, g_state);
/* Create a timer for initiating the reconstruction of tasks' missing object
* dependencies. */
event_loop_add_timer(loop, kLocalSchedulerReconstructionTimeoutMilliseconds,
reconstruct_object_timeout_handler, g_state);
event_loop_add_timer(
loop, RayConfig::instance()
.local_scheduler_reconstruction_timeout_milliseconds(),
reconstruct_object_timeout_handler, g_state);
/* Run event loop. */
event_loop_run(loop);
}
@ -1368,10 +1374,12 @@ int main(int argc, char *argv[]) {
memset(&static_resource_conf[0], 0, sizeof(static_resource_conf));
/* TODO(atumanov): Define a default vector and replace individual
* constants. */
static_resource_conf[ResourceIndex_CPU] = kDefaultNumCPUs;
static_resource_conf[ResourceIndex_GPU] = kDefaultNumGPUs;
static_resource_conf[ResourceIndex_CPU] =
RayConfig::instance().default_num_CPUs();
static_resource_conf[ResourceIndex_GPU] =
RayConfig::instance().default_num_GPUs();
static_resource_conf[ResourceIndex_CustomResource] =
kDefaultNumCustomResource;
RayConfig::instance().default_num_custom_resource();
} else {
/* TODO(atumanov): Switch this tokenizer to reading from ifstream. */
/* Tokenize the string. */
@ -1388,7 +1396,7 @@ int main(int argc, char *argv[]) {
/* Interpret negative values for the custom resource as deferring to the
* default system configuration. */
static_resource_conf[ResourceIndex_CustomResource] =
kDefaultNumCustomResource;
RayConfig::instance().default_num_custom_resource();
}
}
if (!scheduler_socket_name) {

View file

@ -1,18 +1,9 @@
#ifndef LOCAL_SCHEDULER_H
#define LOCAL_SCHEDULER_H
#include <math.h>
#include "task.h"
#include "event_loop.h"
/* The duration that we wait after sending a worker SIGTERM before sending the
* worker SIGKILL. */
#define KILL_WORKER_TIMEOUT_MILLISECONDS 100
constexpr double kDefaultNumCPUs = INT16_MAX;
constexpr double kDefaultNumGPUs = 0;
constexpr double kDefaultNumCustomResource = INFINITY;
/**
* Establish a connection to a new client.
*

View file

@ -122,7 +122,7 @@ struct SchedulingAlgorithmState {
std::unordered_map<ObjectID, ObjectEntry, UniqueIDHasher> local_objects;
/** A hash map of the objects that are not available locally. These are
* currently being fetched by this local scheduler. The key is the object
* ID. Every kLocalSchedulerFetchTimeoutMilliseconds, a Plasma fetch
* ID. Every local_scheduler_fetch_timeout_milliseconds, a Plasma fetch
* request will be sent for the object IDs in this table. Each entry also holds
* an array of queued tasks that are dependent on it. */
std::unordered_map<ObjectID, ObjectEntry, UniqueIDHasher> remote_objects;
@ -516,7 +516,7 @@ void queue_actor_task(LocalSchedulerState *state,
/**
* Fetch a queued task's missing object dependency. The fetch request will be
* retried every kLocalSchedulerFetchTimeoutMilliseconds until the object is
* retried every local_scheduler_fetch_timeout_milliseconds until the object is
* available locally.
*
* @param state The scheduler state.
@ -567,7 +567,7 @@ void fetch_missing_dependency(LocalSchedulerState *state,
/**
* Fetch a queued task's missing object dependencies. The fetch requests will
* be retried every kLocalSchedulerFetchTimeoutMilliseconds until all
* be retried every local_scheduler_fetch_timeout_milliseconds until all
* objects are available locally.
*
* @param state The scheduler state.
@ -629,7 +629,7 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
/* Only try the fetches if we are connected to the object store manager. */
if (state->plasma_conn->get_manager_fd() == -1) {
LOG_INFO("Local scheduler is not connected to a object store manager");
return kLocalSchedulerFetchTimeoutMilliseconds;
return RayConfig::instance().local_scheduler_fetch_timeout_milliseconds();
}
std::vector<ObjectID> object_id_vec;
@ -644,10 +644,13 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
/* Divide very large fetch requests into smaller fetch requests so that a
* single fetch request doesn't block the plasma manager for a long time. */
int64_t fetch_request_size = 10000;
for (int64_t j = 0; j < num_object_ids; j += fetch_request_size) {
for (int64_t j = 0; j < num_object_ids;
j += RayConfig::instance().local_scheduler_fetch_request_size()) {
int num_objects_in_request =
std::min(num_object_ids, j + fetch_request_size) - j;
std::min(
num_object_ids,
j + RayConfig::instance().local_scheduler_fetch_request_size()) -
j;
auto arrow_status = state->plasma_conn->Fetch(
num_objects_in_request,
reinterpret_cast<plasma::ObjectID *>(&object_ids[j]));
@ -662,18 +665,19 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
/* Print a warning if this method took too long. */
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
if (end_time - start_time >
RayConfig::instance().max_time_for_handler_milliseconds()) {
LOG_WARN("fetch_object_timeout_handler took %" PRId64 " milliseconds.",
end_time - start_time);
}
/* Wait at least kLocalSchedulerFetchTimeoutMilliseconds before running
/* Wait at least local_scheduler_fetch_timeout_milliseconds before running
* this timeout handler again. But if we're waiting for a large number of
* objects, wait longer (e.g., 10 seconds for one million objects) so that we
* don't overwhelm the plasma manager. */
return std::max(kLocalSchedulerFetchTimeoutMilliseconds,
int64_t(0.01 * num_object_ids));
return std::max(
RayConfig::instance().local_scheduler_fetch_timeout_milliseconds(),
int64_t(0.01 * num_object_ids));
}
/* TODO(swang): This method is not covered by any valgrind tests. */
@ -687,8 +691,8 @@ int reconstruct_object_timeout_handler(event_loop *loop,
/* This vector is used to track which object IDs to reconstruct next. If the
* vector is empty, we repopulate it with all of the keys of the remote object
* table. During every pass through this handler, we call reconstruct on up to
* 10000 elements of the vector (after first checking that the object IDs are
* still missing). */
* max_num_to_reconstruct elements of the vector (after first checking that
* the object IDs are still missing). */
static std::vector<ObjectID> object_ids_to_reconstruct;
/* If the set is empty, repopulate it. */
@ -698,7 +702,6 @@ int reconstruct_object_timeout_handler(event_loop *loop,
}
}
int64_t max_num_to_reconstruct = 10000;
int64_t num_reconstructed = 0;
for (size_t i = 0; i < object_ids_to_reconstruct.size(); i++) {
ObjectID object_id = object_ids_to_reconstruct[i];
@ -708,7 +711,7 @@ int reconstruct_object_timeout_handler(event_loop *loop,
reconstruct_object(state, object_id);
}
num_reconstructed++;
if (num_reconstructed == max_num_to_reconstruct) {
if (num_reconstructed == RayConfig::instance().max_num_to_reconstruct()) {
break;
}
}
@ -718,14 +721,15 @@ int reconstruct_object_timeout_handler(event_loop *loop,
/* Print a warning if this method took too long. */
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
if (end_time - start_time >
RayConfig::instance().max_time_for_handler_milliseconds()) {
LOG_WARN("reconstruct_object_timeout_handler took %" PRId64
" milliseconds.",
end_time - start_time);
}
return kLocalSchedulerReconstructionTimeoutMilliseconds;
return RayConfig::instance()
.local_scheduler_reconstruction_timeout_milliseconds();
}
/**

View file

@ -5,16 +5,6 @@
#include "common/task.h"
#include "state/local_scheduler_table.h"
/* The duration that the local scheduler will wait before reinitiating a fetch
* request for a missing task dependency. This time may adapt based on the
* number of missing task dependencies. */
constexpr int64_t kLocalSchedulerFetchTimeoutMilliseconds = 1000;
/* The duration that the local scheduler will wait between initiating
* reconstruction calls for missing task dependencies. If there are many missing
* task dependencies, we will only iniate reconstruction calls for some of them
* each time. */
constexpr int64_t kLocalSchedulerReconstructionTimeoutMilliseconds = 1000;
/* ==== The scheduling algorithm ====
*
* This file contains declaration for all functions and data structures
@ -282,7 +272,7 @@ void handle_driver_removed(LocalSchedulerState *state,
/**
* This function fetches queued task's missing object dependencies. It is
* called every kLocalSchedulerFetchTimeoutMilliseconds.
* called every local_scheduler_fetch_timeout_milliseconds.
*
* @param loop The local scheduler's event loop.
* @param id The ID of the timer that triggers this function.
@ -295,7 +285,7 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context);
/**
* This function initiates reconstruction for task's missing object
* dependencies. It is called every
* kLocalSchedulerReconstructionTimeoutMilliseconds, but it may not initiate
* local_scheduler_reconstruction_timeout_milliseconds, but it may not initiate
* reconstruction for every missing object.
*
* @param loop The local scheduler's event loop.

View file

@ -1,6 +1,7 @@
#include <Python.h>
#include "common_extension.h"
#include "config_extension.h"
#include "local_scheduler_client.h"
#include "task.h"
@ -260,6 +261,10 @@ MOD_INIT(liblocal_scheduler_library) {
INITERROR;
}
if (PyType_Ready(&PyRayConfigType) < 0) {
INITERROR;
}
#if PY_MAJOR_VERSION >= 3
PyObject *m = PyModule_Create(&moduledef);
#else
@ -287,6 +292,14 @@ MOD_INIT(liblocal_scheduler_library) {
Py_INCREF(LocalSchedulerError);
PyModule_AddObject(m, "local_scheduler_error", LocalSchedulerError);
Py_INCREF(&PyRayConfigType);
PyModule_AddObject(m, "RayConfig", (PyObject *) &PyRayConfigType);
/* Create the global config object. */
PyObject *config = PyRayConfig_make();
/* TODO(rkn): Do we need Py_INCREF(config)? */
PyModule_AddObject(m, "_config", config);
#if PY_MAJOR_VERSION >= 3
return m;
#endif

View file

@ -75,8 +75,9 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers,
const char *node_ip_address = "127.0.0.1";
const char *redis_addr = node_ip_address;
int redis_port = 6379;
const double static_resource_conf[ResourceIndex_MAX] = {kDefaultNumCPUs,
kDefaultNumGPUs};
const double static_resource_conf[ResourceIndex_MAX] = {
RayConfig::instance().default_num_CPUs(),
RayConfig::instance().default_num_GPUs()};
LocalSchedulerMock *mock =
(LocalSchedulerMock *) malloc(sizeof(LocalSchedulerMock));
memset(mock, 0, sizeof(LocalSchedulerMock));

View file

@ -540,10 +540,10 @@ void process_message(event_loop *loop,
int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
LOG_DEBUG("Writing data to fd %d", conn->fd);
ssize_t r, s;
/* Try to write one BUFSIZE at a time. */
/* Try to write one buf_size at a time. */
s = buf->data_size + buf->metadata_size - conn->cursor;
if (s > BUFSIZE)
s = BUFSIZE;
if (s > RayConfig::instance().buf_size())
s = RayConfig::instance().buf_size();
r = write(conn->fd, buf->data + conn->cursor, s);
if (r != s) {
@ -641,10 +641,10 @@ int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
buf->data + conn->cursor);
ssize_t r, s;
CHECK(buf != NULL);
/* Try to read one BUFSIZE at a time. */
/* Try to read one buf_size at a time. */
s = buf->data_size + buf->metadata_size - conn->cursor;
if (s > BUFSIZE) {
s = BUFSIZE;
if (s > RayConfig::instance().buf_size()) {
s = RayConfig::instance().buf_size();
}
r = read(conn->fd, buf->data + conn->cursor, s);
@ -941,12 +941,13 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
}
free(object_ids_to_request);
/* Wait at least MANAGER_TIMEOUT before running this timeout handler again.
* But if we're waiting for a large number of objects, wait longer (e.g., 10
* seconds for one million objects) so that we don't overwhelm other
* components like Redis with too many requests (and so that we don't
* overwhelm this manager with responses). */
return std::max(MANAGER_TIMEOUT, int(0.01 * num_object_ids));
/* Wait at least manager_timeout_milliseconds before running this timeout
* handler again. But if we're waiting for a large number of objects, wait
* longer (e.g., 10 seconds for one million objects) so that we don't
* overwhelm other components like Redis with too many requests (and so that
* we don't overwhelm this manager with responses). */
return std::max(RayConfig::instance().manager_timeout_milliseconds(),
int64_t(0.01 * num_object_ids));
}
bool is_object_local(PlasmaManagerState *state, ObjectID object_id) {
@ -1466,8 +1467,8 @@ void process_message(event_loop *loop,
/* Print a warning if this method took too long. */
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
if (end_time - start_time >
RayConfig::instance().max_time_for_handler_milliseconds()) {
LOG_WARN("process_message of type %" PRId64 " took %" PRId64
" milliseconds.",
type, end_time - start_time);
@ -1481,14 +1482,15 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
int64_t current_time = current_time_ms();
CHECK(current_time >= state->previous_heartbeat_time);
if (current_time - state->previous_heartbeat_time >
NUM_HEARTBEATS_TIMEOUT * HEARTBEAT_TIMEOUT_MILLISECONDS) {
RayConfig::instance().num_heartbeats_timeout() *
RayConfig::instance().heartbeat_timeout_milliseconds()) {
LOG_FATAL("The last heartbeat was sent %" PRId64 " milliseconds ago.",
current_time - state->previous_heartbeat_time);
}
state->previous_heartbeat_time = current_time;
plasma_manager_send_heartbeat(state->db);
return HEARTBEAT_TIMEOUT_MILLISECONDS;
return RayConfig::instance().heartbeat_timeout_milliseconds();
}
void start_server(const char *store_socket_name,
@ -1532,10 +1534,12 @@ void start_server(const char *store_socket_name,
g_manager_state, NULL, NULL, NULL);
/* Set up a recurring timer that will loop through the outstanding fetch
* requests and reissue requests for transfers of those objects. */
event_loop_add_timer(g_manager_state->loop, MANAGER_TIMEOUT,
event_loop_add_timer(g_manager_state->loop,
RayConfig::instance().manager_timeout_milliseconds(),
fetch_timeout_handler, g_manager_state);
/* Publish the heartbeats to all subscribers of the plasma manager table. */
event_loop_add_timer(g_manager_state->loop, HEARTBEAT_TIMEOUT_MILLISECONDS,
event_loop_add_timer(g_manager_state->loop,
RayConfig::instance().heartbeat_timeout_milliseconds(),
heartbeat_handler, g_manager_state);
/* Run the event loop. */
event_loop_run(g_manager_state->loop);
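
fetch_timeout_handler and its local-scheduler counterpart both stretch their re-arm interval with the number of outstanding objects: max(base timeout, 0.01 ms per object), which is where the "10 seconds for one million objects" figure in the comments comes from (0.01 x 1,000,000 = 10,000 ms). A short sketch of that adaptive-timeout formula:

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Wait at least base_timeout_ms, but back off linearly (0.01 ms per object)
// when many objects are outstanding, so Redis and the managers are not
// flooded with requests.
int64_t next_timeout_ms(int64_t base_timeout_ms, int64_t num_object_ids) {
  return std::max(base_timeout_ms, int64_t(0.01 * num_object_ids));
}

int main() {
  std::printf("%lld\n", (long long) next_timeout_ms(1000, 1000000));  // 10000
  std::printf("%lld\n", (long long) next_timeout_ms(1000, 100));      // 1000
  return 0;
}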

View file

@ -9,14 +9,6 @@
#define NUM_RETRIES RAY_NUM_RETRIES
#endif
/* Timeouts are in milliseconds. */
#define MANAGER_TIMEOUT 1000
#define NUM_HEARTBEATS_TIMEOUT 100
/* The buffer size in bytes. Data will get transfered in multiples of this */
#define BUFSIZE 4096
typedef struct PlasmaManagerState PlasmaManagerState;
typedef struct ClientConnection ClientConnection;
@ -188,10 +180,9 @@ void call_request_transfer(ObjectID object_id,
void *context);
/*
* This runs periodically (every MANAGER_TIMEOUT milliseconds) and reissues
* transfer requests for all outstanding fetch requests. This is only exposed so
* that it can be called from the tests.
*
* This runs periodically (every manager_timeout_milliseconds milliseconds) and
* reissues transfer requests for all outstanding fetch requests. This is only
* exposed so that it can be called from the tests.
*/
int fetch_timeout_handler(event_loop *loop, timer_id id, void *context);

View file

@ -122,8 +122,9 @@ TEST request_transfer_test(void) {
manager_vector.push_back(std::string("127.0.0.1:") +
std::to_string(remote_mock->port));
call_request_transfer(object_id, manager_vector, local_mock->state);
event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, test_done_handler,
local_mock->state);
event_loop_add_timer(local_mock->loop,
RayConfig::instance().manager_timeout_milliseconds(),
test_done_handler, local_mock->state);
event_loop_run(local_mock->loop);
int read_fd = get_client_sock(remote_mock->read_conn);
std::vector<uint8_t> request_data;
@ -166,13 +167,15 @@ TEST request_transfer_retry_test(void) {
std::to_string(remote_mock2->port));
call_request_transfer(object_id, manager_vector, local_mock->state);
event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler,
local_mock->state);
event_loop_add_timer(local_mock->loop,
RayConfig::instance().manager_timeout_milliseconds() * 2,
test_done_handler, local_mock->state);
/* Register the fetch timeout handler. This is normally done when the plasma
* manager is started. It is needed here so that retries will happen when
* fetch requests time out. */
event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, fetch_timeout_handler,
local_mock->state);
event_loop_add_timer(local_mock->loop,
RayConfig::instance().manager_timeout_milliseconds(),
fetch_timeout_handler, local_mock->state);
event_loop_run(local_mock->loop);
int read_fd = get_client_sock(remote_mock2->read_conn);