ray/src/local_scheduler/local_scheduler_extension.cc
Robert Nishihara a4d8e13094 Suppress excess warning messages related to intentional actor deaths. (#627)
* Don't submit the actor destructor tasks when the job is exiting.

* Don't propagate error messages to the driver when an actor exits intentionally.
2017-06-01 20:10:40 +00:00

281 lines
10 KiB
C++

#include <Python.h>
#include "common_extension.h"
#include "local_scheduler_client.h"
#include "task.h"
/* Exception object created during module initialization (via
 * PyErr_NewException) and exposed on the module under the attribute name
 * "local_scheduler_error". */
PyObject *LocalSchedulerError;
// clang-format off
/* Python object wrapping a connection to the local scheduler. */
typedef struct {
PyObject_HEAD
/* Connection handle set by PyLocalSchedulerClient_init and released in
 * PyLocalSchedulerClient_dealloc. It is NULL if argument parsing failed in
 * the init function. */
LocalSchedulerConnection *local_scheduler_connection;
} PyLocalSchedulerClient;
// clang-format on
/**
 * tp_init slot for LocalSchedulerClient: parse the constructor arguments and
 * connect to the local scheduler.
 *
 * Positional arguments, in order (format "sO&O&Oi"): the socket name as a
 * string, the client ID and the actor ID (each converted with
 * PyStringToUniqueID), an object whose truthiness says whether this client is
 * a worker, and the number of GPUs as an int. Keyword arguments are not used.
 *
 * @param self The object being initialized.
 * @param args Positional argument tuple.
 * @param kwds Keyword arguments (ignored).
 * @return 0 on success, -1 with a Python exception set on failure. On
 *         failure the connection pointer is left NULL so that the dealloc
 *         function does not try to free it.
 */
static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self,
                                       PyObject *args,
                                       PyObject *kwds) {
  char *socket_name;
  UniqueID client_id;
  ActorID actor_id;
  PyObject *is_worker;
  int num_gpus;
  if (!PyArg_ParseTuple(args, "sO&O&Oi", &socket_name, PyStringToUniqueID,
                        &client_id, PyStringToUniqueID, &actor_id, &is_worker,
                        &num_gpus)) {
    self->local_scheduler_connection = NULL;
    return -1;
  }
  /* PyObject_IsTrue returns -1 (with an exception set) when evaluating the
   * truth value fails. Casting that directly to bool would silently treat the
   * error as "true", so check for it explicitly. */
  int is_worker_flag = PyObject_IsTrue(is_worker);
  if (is_worker_flag < 0) {
    self->local_scheduler_connection = NULL;
    return -1;
  }
  /* Connect to the local scheduler. */
  self->local_scheduler_connection = LocalSchedulerConnection_init(
      socket_name, client_id, actor_id, is_worker_flag != 0, num_gpus);
  return 0;
}
/* tp_dealloc slot: release the scheduler connection, if there is one, and
 * then hand the object's memory back through the type's tp_free. */
static void PyLocalSchedulerClient_dealloc(PyLocalSchedulerClient *self) {
  LocalSchedulerConnection *conn = self->local_scheduler_connection;
  if (conn != NULL) {
    LocalSchedulerConnection_free(conn);
  }
  PyObject *object = (PyObject *) self;
  Py_TYPE(self)->tp_free(object);
}
/* Python method "disconnect": call local_scheduler_disconnect_client on this
 * client's connection. Always returns None. */
static PyObject *PyLocalSchedulerClient_disconnect(PyObject *self) {
  PyLocalSchedulerClient *client = (PyLocalSchedulerClient *) self;
  local_scheduler_disconnect_client(client->local_scheduler_connection);
  Py_RETURN_NONE;
}
/**
 * Python method "submit": pass a task's spec and size to
 * local_scheduler_submit on this client's connection.
 *
 * @param self The LocalSchedulerClient object.
 * @param args Argument tuple holding exactly one Task object.
 * @return None on success, or NULL with TypeError set when the argument is
 *         not an instance of PyTaskType.
 */
static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) {
  PyObject *py_task;
  /* "O!" makes the parser verify the argument's type. Without that check an
   * arbitrary Python object would be reinterpreted as a PyTask below and its
   * memory read as a spec pointer and size. */
  if (!PyArg_ParseTuple(args, "O!", &PyTaskType, &py_task)) {
    return NULL;
  }
  PyTask *task = (PyTask *) py_task;
  local_scheduler_submit(
      ((PyLocalSchedulerClient *) self)->local_scheduler_connection,
      task->spec, task->size);
  Py_RETURN_NONE;
}
// clang-format off
/* Python method "get_task": fetch a task with local_scheduler_get_task and
 * wrap the resulting spec and size with PyTask_make. */
static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) {
  LocalSchedulerConnection *conn =
      ((PyLocalSchedulerClient *) self)->local_scheduler_connection;
  int64_t spec_size;
  TaskSpec *spec;
  /* local_scheduler_get_task may block for a long time, so the global
   * interpreter lock is released for the duration of the call. */
  Py_BEGIN_ALLOW_THREADS
  spec = local_scheduler_get_task(conn, &spec_size);
  Py_END_ALLOW_THREADS
  return PyTask_make(spec, spec_size);
}
// clang-format on
/* Python method "reconstruct_object": convert the single argument to an
 * object ID with PyStringToUniqueID and pass it to
 * local_scheduler_reconstruct_object. Returns None, or NULL if argument
 * parsing fails. */
static PyObject *PyLocalSchedulerClient_reconstruct_object(PyObject *self,
                                                           PyObject *args) {
  ObjectID target_id;
  int parsed = PyArg_ParseTuple(args, "O&", PyStringToUniqueID, &target_id);
  if (!parsed) {
    return NULL;
  }
  PyLocalSchedulerClient *client = (PyLocalSchedulerClient *) self;
  local_scheduler_reconstruct_object(client->local_scheduler_connection,
                                     target_id);
  Py_RETURN_NONE;
}
/* Python method "log_event": take a key and a value (each parsed with the
 * "s#" format, i.e. a buffer plus its length) and forward both to
 * local_scheduler_log_event. Returns None, or NULL if argument parsing
 * fails. */
static PyObject *PyLocalSchedulerClient_log_event(PyObject *self,
                                                  PyObject *args) {
  const char *key_data;
  int key_size;
  const char *value_data;
  int value_size;
  if (!PyArg_ParseTuple(args, "s#s#", &key_data, &key_size, &value_data,
                        &value_size)) {
    return NULL;
  }
  PyLocalSchedulerClient *client = (PyLocalSchedulerClient *) self;
  uint8_t *key_bytes = (uint8_t *) key_data;
  uint8_t *value_bytes = (uint8_t *) value_data;
  local_scheduler_log_event(client->local_scheduler_connection, key_bytes,
                            key_size, value_bytes, value_size);
  Py_RETURN_NONE;
}
/* Python method "notify_unblocked": call local_scheduler_notify_unblocked on
 * this client's connection. Always returns None. */
static PyObject *PyLocalSchedulerClient_notify_unblocked(PyObject *self) {
  PyLocalSchedulerClient *client = (PyLocalSchedulerClient *) self;
  local_scheduler_notify_unblocked(client->local_scheduler_connection);
  Py_RETURN_NONE;
}
/* Python method "compute_put_id": given a task ID (converted with
 * PyObjectToUniqueID) and an integer put index, compute the put's object ID
 * with task_compute_put_id, report it through local_scheduler_put_object and
 * return it wrapped by PyObjectID_make. Returns NULL if argument parsing
 * fails. */
static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self,
                                                       PyObject *args) {
  TaskID owner_task_id;
  int index;
  int parsed =
      PyArg_ParseTuple(args, "O&i", &PyObjectToUniqueID, &owner_task_id, &index);
  if (!parsed) {
    return NULL;
  }
  PyLocalSchedulerClient *client = (PyLocalSchedulerClient *) self;
  ObjectID result_id = task_compute_put_id(owner_task_id, index);
  local_scheduler_put_object(client->local_scheduler_connection, owner_task_id,
                             result_id);
  return PyObjectID_make(result_id);
}
/**
 * Python method "gpu_ids": build a Python list holding the integers stored in
 * the connection's gpu_ids vector.
 *
 * @param self The LocalSchedulerClient object.
 * @return A new list of Python ints, or NULL with an exception set if
 *         allocating the list or one of its items fails.
 */
static PyObject *PyLocalSchedulerClient_gpu_ids(PyObject *self) {
  /* Bind by const reference; the vector is only read, so no copy is needed. */
  const std::vector<int> &gpu_ids =
      ((PyLocalSchedulerClient *) self)->local_scheduler_connection->gpu_ids;
  Py_ssize_t num_gpu_ids = (Py_ssize_t) gpu_ids.size();
  PyObject *gpu_ids_list = PyList_New(num_gpu_ids);
  if (gpu_ids_list == NULL) {
    return NULL;
  }
  for (Py_ssize_t i = 0; i < num_gpu_ids; ++i) {
    PyObject *gpu_id = PyLong_FromLong(gpu_ids[i]);
    if (gpu_id == NULL) {
      /* Drop the partially filled list; unset slots are NULL and are skipped
       * when the list is destroyed. */
      Py_DECREF(gpu_ids_list);
      return NULL;
    }
    /* PyList_SetItem takes ownership of the gpu_id reference. */
    PyList_SetItem(gpu_ids_list, i, gpu_id);
  }
  return gpu_ids_list;
}
/* Method table for the LocalSchedulerClient type. Each entry gives the Python
 * method name, the C function implementing it, its calling convention
 * (METH_NOARGS or METH_VARARGS) and its docstring. The table ends with a
 * {NULL} sentinel entry. */
static PyMethodDef PyLocalSchedulerClient_methods[] = {
{"disconnect", (PyCFunction) PyLocalSchedulerClient_disconnect, METH_NOARGS,
"Notify the local scheduler that this client is exiting gracefully."},
{"submit", (PyCFunction) PyLocalSchedulerClient_submit, METH_VARARGS,
"Submit a task to the local scheduler."},
{"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_NOARGS,
"Get a task from the local scheduler."},
{"reconstruct_object",
(PyCFunction) PyLocalSchedulerClient_reconstruct_object, METH_VARARGS,
"Ask the local scheduler to reconstruct an object."},
{"log_event", (PyCFunction) PyLocalSchedulerClient_log_event, METH_VARARGS,
"Log an event to the event log through the local scheduler."},
{"notify_unblocked", (PyCFunction) PyLocalSchedulerClient_notify_unblocked,
METH_NOARGS, "Notify the local scheduler that we are unblocked."},
{"compute_put_id", (PyCFunction) PyLocalSchedulerClient_compute_put_id,
METH_VARARGS, "Return the object ID for a put call within a task."},
{"gpu_ids", (PyCFunction) PyLocalSchedulerClient_gpu_ids, METH_NOARGS,
"Get the IDs of the GPUs that are reserved for this client."},
{NULL} /* Sentinel */
};
/* Type object for LocalSchedulerClient. The initializer is positional, so the
 * entries must stay in the order of the PyTypeObject slots named in the
 * trailing comments. Only the name, basic size, dealloc, flags, doc string,
 * method table, init and new slots are populated; every other slot is 0. */
static PyTypeObject PyLocalSchedulerClientType = {
PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */
"local_scheduler.LocalSchedulerClient", /* tp_name */
sizeof(PyLocalSchedulerClient), /* tp_basicsize */
0, /* tp_itemsize */
(destructor) PyLocalSchedulerClient_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"LocalSchedulerClient object", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
PyLocalSchedulerClient_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc) PyLocalSchedulerClient_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};
/* Module-level function table. The functions referenced here
 * (check_simple_value, PyTask_from_string, PyTask_to_string) are not defined
 * in this file. The table ends with a {NULL} sentinel entry. */
static PyMethodDef local_scheduler_methods[] = {
{"check_simple_value", check_simple_value, METH_VARARGS,
"Should the object be passed by value?"},
{"task_from_string", PyTask_from_string, METH_VARARGS,
"Creates a Python PyTask object from a string representation of "
"TaskSpec."},
{"task_to_string", PyTask_to_string, METH_VARARGS,
"Translates a PyTask python object to a byte string."},
{NULL} /* Sentinel */
};
/* Module definition used by PyModule_Create when building against Python 3.
 * The Python 2 build passes the method table and doc string to
 * Py_InitModule3 instead, so this struct is compiled only for Python 3. */
#if PY_MAJOR_VERSION >= 3
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"liblocal_scheduler", /* m_name */
"A module for the local scheduler.", /* m_doc */
0, /* m_size */
local_scheduler_methods, /* m_methods */
NULL, /* m_reload */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
};
#endif
/* INITERROR is the statement used to bail out of the module init function:
 * it returns NULL on Python 3 and returns nothing on Python 2. */
#if PY_MAJOR_VERSION >= 3
#define INITERROR return NULL
#else
#define INITERROR return
#endif
/* Fallback definition in case the Python headers do not provide
 * PyMODINIT_FUNC. */
#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
#define PyMODINIT_FUNC void
#endif
/* MOD_INIT(name) expands to the signature of the module init function:
 * PyInit_<name> on Python 3 and init<name> on Python 2. */
#if PY_MAJOR_VERSION >= 3
#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
#else
#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
#endif
/**
 * Module initialization function for liblocal_scheduler_library.
 *
 * Readies the Task, ObjectID and LocalSchedulerClient types, creates the
 * module, calls init_pickle_module, registers the three types on the module,
 * sets up the global task builder and creates the LocalSchedulerError
 * exception, exposed as "local_scheduler_error".
 *
 * On Python 3 the new module is returned, or NULL on failure. On Python 2
 * the function returns nothing and simply stops early on failure.
 */
MOD_INIT(liblocal_scheduler_library) {
  if (PyType_Ready(&PyTaskType) < 0) {
    INITERROR;
  }

  if (PyType_Ready(&PyObjectIDType) < 0) {
    INITERROR;
  }

  if (PyType_Ready(&PyLocalSchedulerClientType) < 0) {
    INITERROR;
  }

#if PY_MAJOR_VERSION >= 3
  PyObject *m = PyModule_Create(&moduledef);
#else
  PyObject *m =
      Py_InitModule3("liblocal_scheduler_library", local_scheduler_methods,
                     "A module for the local scheduler.");
#endif
  /* Module creation can fail; every call below needs a valid module. */
  if (m == NULL) {
    INITERROR;
  }

  init_pickle_module();

  Py_INCREF(&PyTaskType);
  PyModule_AddObject(m, "Task", (PyObject *) &PyTaskType);

  Py_INCREF(&PyObjectIDType);
  PyModule_AddObject(m, "ObjectID", (PyObject *) &PyObjectIDType);

  Py_INCREF(&PyLocalSchedulerClientType);
  PyModule_AddObject(m, "LocalSchedulerClient",
                     (PyObject *) &PyLocalSchedulerClientType);

  g_task_builder = make_task_builder();

  char local_scheduler_error[] = "local_scheduler.error";
  LocalSchedulerError = PyErr_NewException(local_scheduler_error, NULL, NULL);
  /* PyErr_NewException returns NULL on failure; Py_INCREF on NULL would
   * crash, so stop here instead. */
  if (LocalSchedulerError == NULL) {
#if PY_MAJOR_VERSION >= 3
    /* PyModule_Create returned a new reference that we still own. */
    Py_DECREF(m);
#endif
    INITERROR;
  }
  Py_INCREF(LocalSchedulerError);
  PyModule_AddObject(m, "local_scheduler_error", LocalSchedulerError);

#if PY_MAJOR_VERSION >= 3
  return m;
#endif
}