Python API for constructing tasks (#28)

* Python API for constructing tasks

* Fixes.
This commit is contained in:
Philipp Moritz 2016-10-04 15:56:24 -07:00 committed by GitHub
parent da5ec3b5e0
commit 8e044535e2
10 changed files with 484 additions and 2 deletions

View file

@ -3,4 +3,7 @@ DerivePointerAlignment: true
IndentCaseLabels: false
PointerAlignment: Right
SpaceAfterCStyleCast: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: false
AllowShortIfStatementsOnASingleLine: false

View file

@ -6,8 +6,16 @@ matrix:
include:
- os: linux
dist: trusty
python: "2.7"
- os: linux
dist: trusty
python: "3.5"
- os: osx
osx_image: xcode7
python: "2.7"
- os: osx
osx_image: xcode7
python: "3.5"
- os: linux
dist: trusty
env: LINT=1
@ -22,5 +30,12 @@ matrix:
- .travis/check-git-clang-format-output.sh
install:
- ./install-dependencies.sh
- make
- make test
- cd lib/python
- python setup.py install --user
- cd ../..
script:
- python test/test.py

21
install-dependencies.sh Executable file
View file

@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Installs the system packages needed to build the project.
# Supported platforms are Linux (apt-based) and macOS; anything else aborts
# with exit status 1.

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

# Install the apt packages required on Linux.
install_linux_packages() {
  sudo apt-get update
  sudo apt-get install -y git python-dev
}

platform="unknown"
unamestr="$(uname)"

# Map the kernel name reported by uname onto our platform label.
case "$unamestr" in
  Linux)
    echo "Platform is linux."
    platform="linux"
    ;;
  Darwin)
    echo "Platform is macosx."
    platform="macosx"
    ;;
  *)
    echo "Unrecognized platform."
    exit 1
    ;;
esac

# Only Linux needs extra packages at the moment.
if [[ $platform == "linux" ]]; then
  install_linux_packages
fi

91
lib/python/object_id.c Normal file
View file

@ -0,0 +1,91 @@
#include "types.h"
/**
 * Extract the object_id stored inside a Python ObjectID instance. The
 * signature matches what PyArg_ParseTuple expects for an "O&" converter.
 *
 * @param object The Python object to convert.
 * @param objectid Output location; written only on success.
 * @return 1 on success, 0 on failure with a Python exception set.
 */
int PyObjectToUniqueID(PyObject *object, object_id *objectid) {
  int is_object_id = PyObject_IsInstance(object, (PyObject *) &PyObjectIDType);
  if (is_object_id < 0) {
    /* The instance check itself failed and already set an exception; do not
     * treat the object as an ObjectID. */
    return 0;
  }
  if (is_object_id == 0) {
    PyErr_SetString(PyExc_TypeError, "must be an ObjectID");
    return 0;
  }
  *objectid = ((PyObjectID *) object)->object_id;
  return 1;
}
/**
 * tp_init for ObjectID: fill the wrapped object_id from a byte string.
 *
 * @param self The ObjectID instance being initialized.
 * @param args Positional arguments; a single string of UNIQUE_ID_SIZE bytes.
 * @param kwds Unused.
 * @return 0 on success, -1 with a Python exception set otherwise.
 */
static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) {
  const char *bytes;
  int num_bytes;
  if (!PyArg_ParseTuple(args, "s#", &bytes, &num_bytes)) {
    return -1;
  }
  if (num_bytes == UNIQUE_ID_SIZE) {
    memcpy(&self->object_id.id[0], bytes, UNIQUE_ID_SIZE);
    return 0;
  }
  /* Wrong length: report it through the module's own exception type. */
  PyErr_SetString(CommonError,
                  "ObjectID: object id string needs to have length 20");
  return -1;
}
/**
 * Create a Python ObjectID wrapping the given object_id (for use from C).
 *
 * @param object_id The ID to store in the new Python object.
 * @return A new reference to the ObjectID, or NULL with an exception set if
 *         the allocation failed.
 */
PyObject *PyObjectID_make(object_id object_id) {
  /* PyObject_New already initializes the object header, so no separate
   * PyObject_Init call is needed. */
  PyObjectID *result = PyObject_New(PyObjectID, &PyObjectIDType);
  if (result == NULL) {
    return NULL;
  }
  result->object_id = object_id;
  return (PyObject *) result;
}
/**
 * Python method ObjectID.id(): return the raw ID bytes as a Python string of
 * length UNIQUE_ID_SIZE.
 */
static PyObject *PyObjectID_id(PyObject *self) {
  PyObjectID *wrapper = (PyObjectID *) self;
  const char *raw_bytes = (const char *) &wrapper->object_id.id[0];
  return PyString_FromStringAndSize(raw_bytes, UNIQUE_ID_SIZE);
}
/* Method table for ObjectID; terminated by an all-zero sentinel entry. */
static PyMethodDef PyObjectID_methods[] = {
    {"id", (PyCFunction) PyObjectID_id, METH_NOARGS,
     "Return the hash associated with this ObjectID"},
    {NULL, NULL, 0, NULL} /* Sentinel */
};
/* ObjectID exposes no data members; the table holds only the sentinel. */
static PyMemberDef PyObjectID_members[] = {
    {NULL, 0, 0, 0, NULL} /* Sentinel */
};
/*
 * Type object for the Python class exposed as "common.ObjectID".
 * Instances are PyObjectID structs. Construction goes through
 * PyType_GenericNew followed by PyObjectID_init, and the only method is
 * "id" (see PyObjectID_methods). Slots set to 0 are not provided here.
 * The initializer is positional, so entries must stay in slot order.
 */
PyTypeObject PyObjectIDType = {
PyObject_HEAD_INIT(NULL) 0, /* ob_size */
"common.ObjectID", /* tp_name */
sizeof(PyObjectID), /* tp_basicsize */
0, /* tp_itemsize */
0, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"ObjectID object", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
PyObjectID_methods, /* tp_methods */
PyObjectID_members, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc) PyObjectID_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};

View file

@ -0,0 +1,80 @@
#include "types.h"
/* TODO(pcm): Add limit on total number of elements. */
/* A list, dict or tuple must have fewer than SIZE_LIMIT entries for
 * is_simple_value to consider it. */
#define SIZE_LIMIT 100
/* is_simple_value rejects a value once its running count (one per object
 * visited, plus the length of every string) reaches this bound. */
#define NUM_ELEMENTS_LIMIT 1000
/**
* This method checks if a Python object is sufficiently simple that it can be
* serialized and passed by value as an argument to a task (without being put in
* the object store). The details of which objects are sufficiently simple are
* defined by this method and are not particularly important. But for
* performance reasons, it is better to place "small" objects in the task itself
* and "large" objects in the object store.
*
* @param value The Python object in question.
* @param num_elements_contained If this method returns 1, then the number of
* objects recursively contained within this object will be added to the
* value at this address. This is used to make sure that we do not
* serialize objects that are too large.
* @return 0 if the object cannot be serialized in the task and 1 if it can.
*/
int is_simple_value(PyObject *value, int *num_elements_contained) {
*num_elements_contained += 1;
if (*num_elements_contained >= NUM_ELEMENTS_LIMIT) {
return 0;
}
if (PyInt_Check(value) || PyLong_Check(value) || value == Py_False ||
value == Py_True || PyFloat_Check(value) || value == Py_None) {
return 1;
}
if (PyString_CheckExact(value)) {
*num_elements_contained += PyString_Size(value);
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
}
if (PyUnicode_CheckExact(value)) {
*num_elements_contained += PyUnicode_GET_SIZE(value);
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
}
if (PyList_CheckExact(value) && PyList_Size(value) < SIZE_LIMIT) {
for (size_t i = 0; i < PyList_Size(value); ++i) {
if (!is_simple_value(PyList_GetItem(value, i), num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
}
if (PyDict_CheckExact(value) && PyDict_Size(value) < SIZE_LIMIT) {
PyObject *key, *val;
Py_ssize_t pos = 0;
while (PyDict_Next(value, &pos, &key, &val)) {
if (!is_simple_value(key, num_elements_contained) ||
!is_simple_value(val, num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
}
if (PyTuple_CheckExact(value) && PyTuple_Size(value) < SIZE_LIMIT) {
for (size_t i = 0; i < PyTuple_Size(value); ++i) {
if (!is_simple_value(PyTuple_GetItem(value, i), num_elements_contained)) {
return 0;
}
}
return (*num_elements_contained < NUM_ELEMENTS_LIMIT);
}
return 0;
}
/**
 * Python entry point common.check_simple_value(value).
 *
 * @param self Unused.
 * @param args Argument tuple holding the single value to inspect.
 * @return Py_True if is_simple_value accepts the value, Py_False otherwise,
 *         or NULL if the arguments could not be parsed.
 */
PyObject *check_simple_value(PyObject *self, PyObject *args) {
  PyObject *candidate;
  int element_count = 0;
  if (!PyArg_ParseTuple(args, "O", &candidate)) {
    return NULL;
  }
  PyObject *verdict =
      is_simple_value(candidate, &element_count) ? Py_True : Py_False;
  Py_INCREF(verdict);
  return verdict;
}

12
lib/python/setup.py Normal file
View file

@ -0,0 +1,12 @@
from setuptools import setup, find_packages, Extension
# Build description of the "common" C extension. It compiles the three
# wrapper sources and links them against the prebuilt static library.
_extension_options = dict(
    sources=["object_id.c", "serialization.c", "task.c"],
    include_dirs=["../../", "../../thirdparty"],
    extra_objects=["../../build/libcommon.a"],
    extra_compile_args=["--std=c99", "-Werror"],
)
common_module = Extension("common", **_extension_options)

# Package metadata plus the single extension module defined above.
_package_options = dict(
    name="Common",
    version="0.1",
    description="Common library for Ray",
    ext_modules=[common_module],
)
setup(**_package_options)

177
lib/python/task.c Normal file
View file

@ -0,0 +1,177 @@
#include <Python.h>
#include "node.h"
#include "types.h"
#include "task.h"
#include "utarray.h"
#include "utstring.h"
/* Exception object shared by the extension's source files (declared extern
 * in types.h); it is assigned in initcommon. */
PyObject *CommonError;
/* Version argument passed to PyMarshal_WriteObjectToString when serializing
 * pass-by-value task arguments. */
#define MARSHAL_VERSION 2
/**
 * tp_init for Task: build a task specification from Python arguments.
 *
 * Expected Python call: Task(function_id, arguments, num_returns), where
 * function_id is an ObjectID, arguments is a list whose items are either
 * ObjectIDs (passed by reference) or Python values (marshalled and passed by
 * value), and num_returns is an int.
 *
 * @param self The Task instance being initialized.
 * @param args Positional arguments as described above.
 * @param kwds Unused.
 * @return 0 on success, -1 with a Python exception set on failure. On failure
 *         self->spec is left untouched.
 */
static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
  function_id function_id;
  /* Arguments of the task (can be PyObjectIDs or Python values). */
  PyObject *arguments;
  int num_returns;
  /* "O!" makes a non-list raise TypeError here, so PyList_Size below cannot
   * return -1. Parsing happens before any allocation so that a parse failure
   * has nothing to release. */
  if (!PyArg_ParseTuple(args, "O&O!i", &PyObjectToUniqueID, &function_id,
                        &PyList_Type, &arguments, &num_returns)) {
    return -1;
  }
  size_t size = (size_t) PyList_Size(arguments);
  int status = -1;
  /* One entry per argument: the marshalled string for a pass-by-value
   * argument, or NULL for an argument passed by reference. */
  UT_array *val_repr_ptrs;
  utarray_new(val_repr_ptrs, &ut_ptr_icd);
  /* Determine the size of pass by value data in bytes. */
  size_t value_data_bytes = 0;
  for (size_t i = 0; i < size; ++i) {
    PyObject *arg = PyList_GetItem(arguments, i);
    PyObject *data = NULL;
    int is_ref = PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType);
    if (is_ref < 0) {
      goto cleanup;
    }
    if (!is_ref) {
      data = PyMarshal_WriteObjectToString(arg, MARSHAL_VERSION);
      if (data == NULL) {
        /* The value cannot be marshalled; the exception is already set. */
        goto cleanup;
      }
      value_data_bytes += PyString_Size(data);
    }
    utarray_push_back(val_repr_ptrs, &data);
  }
  /* Construct the task specification. */
  self->spec =
      alloc_task_spec(function_id, size, num_returns, value_data_bytes);
  for (size_t i = 0; i < size; ++i) {
    PyObject *data =
        *((PyObject **) utarray_eltptr(val_repr_ptrs, (unsigned) i));
    if (data == NULL) {
      PyObject *arg = PyList_GetItem(arguments, i);
      task_args_add_ref(self->spec, ((PyObjectID *) arg)->object_id);
    } else {
      task_args_add_val(self->spec, (uint8_t *) PyString_AS_STRING(data),
                        PyString_GET_SIZE(data));
    }
  }
  status = 0;
cleanup:
  /* Release every marshalled string collected so far, on success and on
   * failure alike. */
  for (unsigned j = 0; j < utarray_len(val_repr_ptrs); ++j) {
    Py_XDECREF(*((PyObject **) utarray_eltptr(val_repr_ptrs, j)));
  }
  utarray_free(val_repr_ptrs);
  return status;
}
/**
 * tp_dealloc for Task: release the task specification and then the object.
 *
 * @param self The Task instance being destroyed.
 */
static void PyTask_dealloc(PyTask *self) {
  /* spec is still NULL when __init__ failed before allocating it or never
   * ran, since PyType_GenericNew hands out zero-filled objects. */
  if (self->spec != NULL) {
    free_task_spec(self->spec);
    self->spec = NULL;
  }
  Py_TYPE(self)->tp_free((PyObject *) self);
}
/**
 * Python method Task.function_id(): return the task's function ID wrapped in
 * a new ObjectID.
 */
static PyObject *PyTask_function_id(PyObject *self) {
  task_spec *spec = ((PyTask *) self)->spec;
  return PyObjectID_make(*task_function(spec));
}
/**
 * Python method Task.arguments(i): return the i-th argument of the task.
 *
 * @param self The Task instance.
 * @param args Argument tuple holding the integer index.
 * @return A new ObjectID for an argument passed by reference, the unmarshalled
 *         Python value for an argument passed by value, or NULL with an
 *         exception set on failure.
 */
static PyObject *PyTask_arguments(PyObject *self, PyObject *args) {
  int arg_index;
  task_spec *spec = ((PyTask *) self)->spec;
  if (!PyArg_ParseTuple(args, "i", &arg_index)) {
    return NULL;
  }
  /* NOTE(review): arg_index is not range-checked here; confirm whether the
   * task_arg_* accessors validate it. */
  if (task_arg_type(spec, arg_index) == ARG_BY_REF) {
    object_id object_id = *task_arg_id(spec, arg_index);
    return PyObjectID_make(object_id);
  }
  /* Hand the deserialized value to the caller. A NULL result already carries
   * the exception raised by the unmarshalling. */
  return PyMarshal_ReadObjectFromString(
      (char *) task_arg_val(spec, arg_index),
      (Py_ssize_t) task_arg_length(spec, arg_index));
}
/**
 * Python method Task.returns(i): return the i-th return object ID of the
 * task wrapped in a new ObjectID.
 *
 * @param self The Task instance.
 * @param args Argument tuple holding the integer index.
 * @return A new ObjectID, or NULL if the arguments could not be parsed.
 */
static PyObject *PyTask_returns(PyObject *self, PyObject *args) {
  int ret_index;
  if (!PyArg_ParseTuple(args, "i", &ret_index)) {
    return NULL;
  }
  task_spec *spec = ((PyTask *) self)->spec;
  return PyObjectID_make(*task_return(spec, ret_index));
}
/* Method table for Task; terminated by an all-zero sentinel entry. */
static PyMethodDef PyTask_methods[] = {
    {"function_id", (PyCFunction) PyTask_function_id, METH_NOARGS,
     "Return the function id associated with this task."},
    {"arguments", (PyCFunction) PyTask_arguments, METH_VARARGS,
     "Return the i-th argument of the task."},
    {"returns", (PyCFunction) PyTask_returns, METH_VARARGS,
     "Return the i-th object reference of the task."},
    {NULL, NULL, 0, NULL} /* Sentinel */
};
/*
 * Type object for the Python class Task. Instances are PyTask structs.
 * Construction goes through PyType_GenericNew followed by PyTask_init, and
 * PyTask_dealloc releases the task specification. The dotted tp_name uses
 * the name of the module the type is registered in ("common"), matching
 * "common.ObjectID". The initializer is positional, so entries must stay in
 * slot order.
 */
static PyTypeObject PyTaskType = {
    PyObject_HEAD_INIT(NULL) 0,  /* ob_size */
    "common.Task",               /* tp_name */
    sizeof(PyTask),              /* tp_basicsize */
    0,                           /* tp_itemsize */
    (destructor) PyTask_dealloc, /* tp_dealloc */
    0,                           /* tp_print */
    0,                           /* tp_getattr */
    0,                           /* tp_setattr */
    0,                           /* tp_compare */
    0,                           /* tp_repr */
    0,                           /* tp_as_number */
    0,                           /* tp_as_sequence */
    0,                           /* tp_as_mapping */
    0,                           /* tp_hash */
    0,                           /* tp_call */
    0,                           /* tp_str */
    0,                           /* tp_getattro */
    0,                           /* tp_setattro */
    0,                           /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,          /* tp_flags */
    "Task object",               /* tp_doc */
    0,                           /* tp_traverse */
    0,                           /* tp_clear */
    0,                           /* tp_richcompare */
    0,                           /* tp_weaklistoffset */
    0,                           /* tp_iter */
    0,                           /* tp_iternext */
    PyTask_methods,              /* tp_methods */
    0,                           /* tp_members */
    0,                           /* tp_getset */
    0,                           /* tp_base */
    0,                           /* tp_dict */
    0,                           /* tp_descr_get */
    0,                           /* tp_descr_set */
    0,                           /* tp_dictoffset */
    (initproc) PyTask_init,      /* tp_init */
    0,                           /* tp_alloc */
    PyType_GenericNew,           /* tp_new */
};
/* Module-level functions of "common"; terminated by an all-zero sentinel. */
static PyMethodDef common_methods[] = {
    {"check_simple_value", check_simple_value, METH_VARARGS,
     "Should the object be passed by value?"},
    {NULL, NULL, 0, NULL} /* Sentinel */
};
#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
#define PyMODINIT_FUNC void
#endif
/**
 * Module initialization function for the "common" extension module.
 *
 * Readies the Task and ObjectID types, creates the module, registers both
 * types on it and creates the module's exception, which is stored in the
 * global CommonError and exposed as the attribute "common_error". Returns
 * early, leaving the Python exception set, if any step fails.
 */
PyMODINIT_FUNC initcommon(void) {
  PyObject *m;
  if (PyType_Ready(&PyTaskType) < 0) {
    return;
  }
  if (PyType_Ready(&PyObjectIDType) < 0) {
    return;
  }
  m = Py_InitModule3("common", common_methods,
                     "Example module that creates an extension type.");
  if (m == NULL) {
    return;
  }
  /* PyModule_AddObject steals a reference, so take one for the module. */
  Py_INCREF(&PyTaskType);
  PyModule_AddObject(m, "Task", (PyObject *) &PyTaskType);
  Py_INCREF(&PyObjectIDType);
  PyModule_AddObject(m, "ObjectID", (PyObject *) &PyObjectIDType);
  char common_error[] = "common.error";
  CommonError = PyErr_NewException(common_error, NULL, NULL);
  if (CommonError == NULL) {
    return;
  }
  /* One reference stays with the global, the other goes to the module. */
  Py_INCREF(CommonError);
  PyModule_AddObject(m, "common_error", CommonError);
}

33
lib/python/types.h Normal file
View file

@ -0,0 +1,33 @@
#ifndef TYPES_H
#define TYPES_H
#include <Python.h>
#include "marshal.h"
#include "structmember.h"
#include "common.h"
#include "task.h"
/* Exception object used by the extension to report its own errors; the
 * definition lives in task.c. */
extern PyObject *CommonError;
// clang-format off
/* Python object that wraps a single object_id, stored by value. */
typedef struct {
PyObject_HEAD
object_id object_id;
} PyObjectID;
/* Python object that wraps a pointer to a task specification. */
typedef struct {
PyObject_HEAD
task_spec *spec;
} PyTask;
// clang-format on
/* Type object of the Python ObjectID class (defined in object_id.c). */
extern PyTypeObject PyObjectIDType;
/* Copy the object_id out of a Python ObjectID into *objectid. Returns 1 on
 * success; otherwise sets TypeError and returns 0. */
int PyObjectToUniqueID(PyObject *object, object_id *objectid);
/* Create a new Python ObjectID holding the given object_id. */
PyObject *PyObjectID_make(object_id object_id);
/* Python-callable function taking one value and returning True if it is
 * simple enough to be passed by value in a task, False otherwise. */
PyObject *check_simple_value(PyObject *self, PyObject *args);
#endif /* TYPES_H */

1
task.h
View file

@ -13,7 +13,6 @@
#include "utstring.h"
typedef unique_id function_id;
typedef unique_id object_id;
/* The task ID is a deterministic hash of the function ID that
* the task executes and the argument IDs or argument values */

51
test/test.py Normal file
View file

@ -0,0 +1,51 @@
from __future__ import print_function
import unittest
import common
# Values that the test expects common.check_simple_value to accept.
# The 990-character strings are the largest entries in this list.
BASE_SIMPLE_OBJECTS = [
0, 1, 100000, 0L, 1L, 100000L, 1L << 100, 0.0, 0.5, 0.9, 100000.1, (), [], {},
"", 990 * "h", u"", 990 * u"h"
]
# The same values nested one level deep in a list, a tuple and a dict
# (keyed by the empty tuple); these are expected to be accepted as well.
LIST_SIMPLE_OBJECTS = [[obj] for obj in BASE_SIMPLE_OBJECTS]
TUPLE_SIMPLE_OBJECTS = [(obj,) for obj in BASE_SIMPLE_OBJECTS]
DICT_SIMPLE_OBJECTS = [{(): obj} for obj in BASE_SIMPLE_OBJECTS]
# Everything the test asserts to be simple.
SIMPLE_OBJECTS = (BASE_SIMPLE_OBJECTS +
LIST_SIMPLE_OBJECTS +
TUPLE_SIMPLE_OBJECTS +
DICT_SIMPLE_OBJECTS)
# Create some complex objects that cannot be serialized by value in tasks.
# A list that contains itself, i.e. an infinitely nested structure.
l = []
l.append(l)
class Foo(object):
    """Empty user-defined class; its instances serve as non-simple objects."""

    def __init__(self):
        """Create an instance without any attributes."""
# Values that the test expects common.check_simple_value to reject:
# 999-character strings, the self-referential list, an instance of a
# user-defined class, and a 10 x 10 x 10 nested list.
BASE_COMPLEX_OBJECTS = [999 * "h", 999 * u"h", l, Foo(), 10 * [10 * [10 * [1]]]]
# The same values nested one level deep in a list, a tuple and a dict
# (keyed by the empty tuple); these are expected to be rejected as well.
LIST_COMPLEX_OBJECTS = [[obj] for obj in BASE_COMPLEX_OBJECTS]
TUPLE_COMPLEX_OBJECTS = [(obj,) for obj in BASE_COMPLEX_OBJECTS]
DICT_COMPLEX_OBJECTS = [{(): obj} for obj in BASE_COMPLEX_OBJECTS]
# Everything the test asserts not to be simple.
COMPLEX_OBJECTS = (BASE_COMPLEX_OBJECTS +
LIST_COMPLEX_OBJECTS +
TUPLE_COMPLEX_OBJECTS +
DICT_COMPLEX_OBJECTS)
class TestPlasmaClient(unittest.TestCase):
    """Checks which Python values common.check_simple_value accepts."""

    def test_serialize_by_value(self):
        """Simple objects are accepted and complex objects are rejected."""
        for simple_obj in SIMPLE_OBJECTS:
            accepted = common.check_simple_value(simple_obj)
            self.assertTrue(accepted)
        for complex_obj in COMPLEX_OBJECTS:
            accepted = common.check_simple_value(complex_obj)
            self.assertFalse(accepted)
# Run the test suite with verbose output when executed as a script.
if "__main__" == __name__:
    unittest.main(verbosity=2)