/* ray/src/plasma/plasma_extension.c
 *
 * Python C extension that exposes the plasma client API as the "libplasma"
 * module. */

#include <Python.h>
#include "common.h"
#include "io.h"
#include "plasma_client.h"
#include "object_info.h"
/* Converter for the "O&" format of PyArg_ParseTuple: unwraps the
 * plasma_connection pointer held by a capsule named "plasma".
 *
 * Returns 1 and stores the pointer in *conn on success. If the object is not
 * a valid "plasma" capsule, sets a TypeError and returns 0. */
static int PyObjectToPlasmaConnection(PyObject *object,
                                      plasma_connection **conn) {
  if (!PyCapsule_IsValid(object, "plasma")) {
    PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
    return 0;
  }
  *conn = (plasma_connection *) PyCapsule_GetPointer(object, "plasma");
  return 1;
}
/* Converter for the "O&" format of PyArg_ParseTuple: copies the bytes of a
 * Python string into an object_id.
 *
 * The string must be exactly UNIQUE_ID_SIZE bytes long. The length is checked
 * before copying so that a shorter string cannot cause a read past the end of
 * the string's buffer.
 *
 * Returns 1 on success. On a type or length mismatch, sets a TypeError and
 * returns 0. */
static int PyObjectToUniqueID(PyObject *object, object_id *object_id) {
  if (!PyString_Check(object) || PyString_Size(object) != UNIQUE_ID_SIZE) {
    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
    return 0;
  }
  memcpy(&object_id->id[0], PyString_AsString(object), UNIQUE_ID_SIZE);
  return 1;
}
/* Python binding: connect(store_socket_name, manager_socket_name,
 * release_delay).
 *
 * Calls plasma_connect and wraps the resulting connection in a capsule named
 * "plasma". An empty manager socket name is forwarded to plasma_connect as
 * NULL. */
PyObject *PyPlasma_connect(PyObject *self, PyObject *args) {
  const char *store_socket_name;
  const char *manager_socket_name;
  int release_delay;
  if (!PyArg_ParseTuple(args, "ssi", &store_socket_name, &manager_socket_name,
                        &release_delay)) {
    return NULL;
  }
  /* Translate the empty string into NULL for the manager socket. */
  const char *manager_name =
      (manager_socket_name[0] == '\0') ? NULL : manager_socket_name;
  plasma_connection *conn =
      plasma_connect(store_socket_name, manager_name, release_delay);
  return PyCapsule_New(conn, "plasma", NULL);
}
/* Python binding: disconnect(conn).
 *
 * Passes the connection held by the "plasma" capsule to plasma_disconnect and
 * returns None. */
PyObject *PyPlasma_disconnect(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  int parsed =
      PyArg_ParseTuple(args, "O&", PyObjectToPlasmaConnection, &connection);
  if (!parsed) {
    return NULL;
  }
  plasma_disconnect(connection);
  Py_RETURN_NONE;
}
/* Python binding: create(conn, object_id, size, metadata).
 *
 * Calls plasma_create for an object of `size` data bytes with the given
 * metadata (which must be a bytearray) and returns a read-write buffer object
 * over the data pointer that plasma_create hands back.
 *
 * Raises TypeError if metadata is not a bytearray and RuntimeError if
 * plasma_create reports failure. */
PyObject *PyPlasma_create(PyObject *self, PyObject *args) {
  plasma_connection *conn;
  object_id id;
  long long size;
  PyObject *metadata;
  if (!PyArg_ParseTuple(args, "O&O&LO", PyObjectToPlasmaConnection, &conn,
                        PyObjectToUniqueID, &id, &size, &metadata)) {
    return NULL;
  }
  if (!PyByteArray_Check(metadata)) {
    PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
    return NULL;
  }
  uint8_t *metadata_bytes = (uint8_t *) PyByteArray_AsString(metadata);
  Py_ssize_t metadata_length = PyByteArray_Size(metadata);
  uint8_t *data;
  bool created =
      plasma_create(conn, id, size, metadata_bytes, metadata_length, &data);
  if (created) {
    return PyBuffer_FromReadWriteMemory((void *) data, (Py_ssize_t) size);
  }
  PyErr_SetString(PyExc_RuntimeError,
                  "an object with this ID could not be created");
  return NULL;
}
/* Python binding: hash(conn, object_id).
 *
 * Calls plasma_compute_object_hash and returns the DIGEST_SIZE-byte digest as
 * a Python string, or None if the hash computation reports failure. */
PyObject *PyPlasma_hash(PyObject *self, PyObject *args) {
  plasma_connection *conn;
  object_id object_id;
  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn,
                        PyObjectToUniqueID, &object_id)) {
    return NULL;
  }
  unsigned char digest[DIGEST_SIZE];
  bool success = plasma_compute_object_hash(conn, object_id, digest);
  if (!success) {
    Py_RETURN_NONE;
  }
  /* PyString_FromStringAndSize takes a const char *; cast explicitly instead
   * of relying on an implicit unsigned char * -> char * conversion. */
  return PyString_FromStringAndSize((const char *) digest, DIGEST_SIZE);
}
/* Python binding: seal(conn, object_id).
 *
 * Forwards the connection and object ID to plasma_seal and returns None. */
PyObject *PyPlasma_seal(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  object_id id;
  int parsed = PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection,
                                &connection, PyObjectToUniqueID, &id);
  if (!parsed) {
    return NULL;
  }
  plasma_seal(connection, id);
  Py_RETURN_NONE;
}
/* Python binding: release(conn, object_id).
 *
 * Forwards the connection and object ID to plasma_release and returns None. */
PyObject *PyPlasma_release(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  object_id id;
  int parsed = PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection,
                                &connection, PyObjectToUniqueID, &id);
  if (!parsed) {
    return NULL;
  }
  plasma_release(connection, id);
  Py_RETURN_NONE;
}
/* Python binding: get(conn, object_id).
 *
 * Calls plasma_get with the global interpreter lock released, then returns a
 * 2-tuple (data, metadata): data is a read-only buffer object over the data
 * pointer returned by plasma_get, and metadata is a bytearray copy of the
 * metadata bytes.
 *
 * Returns NULL with a Python exception set if argument parsing fails or if
 * any of the result objects cannot be allocated. */
PyObject *PyPlasma_get(PyObject *self, PyObject *args) {
  plasma_connection *conn;
  object_id object_id;
  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn,
                        PyObjectToUniqueID, &object_id)) {
    return NULL;
  }
  int64_t size;
  uint8_t *data;
  int64_t metadata_size;
  uint8_t *metadata;
  /* Drop the global interpreter lock around the plasma_get call so other
   * Python threads can run in the meantime. */
  Py_BEGIN_ALLOW_THREADS;
  plasma_get(conn, object_id, &size, &data, &metadata_size, &metadata);
  Py_END_ALLOW_THREADS;

  PyObject *data_buffer = PyBuffer_FromMemory((void *) data, (Py_ssize_t) size);
  PyObject *metadata_array = PyByteArray_FromStringAndSize(
      (const char *) metadata, (Py_ssize_t) metadata_size);
  PyObject *t = PyTuple_New(2);
  if (data_buffer == NULL || metadata_array == NULL || t == NULL) {
    /* One of the allocations failed and has already set an exception; drop
     * whatever was created and propagate the error. */
    Py_XDECREF(data_buffer);
    Py_XDECREF(metadata_array);
    Py_XDECREF(t);
    return NULL;
  }
  /* PyTuple_SetItem steals the references to the items. */
  PyTuple_SetItem(t, 0, data_buffer);
  PyTuple_SetItem(t, 1, metadata_array);
  return t;
}
/* Python binding: contains(conn, object_id).
 *
 * Queries plasma_contains and returns True if it reports a nonzero
 * has_object flag, False otherwise. */
PyObject *PyPlasma_contains(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  object_id id;
  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &connection,
                        PyObjectToUniqueID, &id)) {
    return NULL;
  }
  int present;
  plasma_contains(connection, id, &present);
  if (!present) {
    Py_RETURN_FALSE;
  }
  Py_RETURN_TRUE;
}
/* Python binding: fetch(conn, object_ids).
 *
 * Converts a Python list of object ID strings into an array of object_id and
 * passes it to plasma_fetch. Returns None on success.
 *
 * Raises RuntimeError if the connection has no plasma manager, TypeError if
 * object_ids is not a list or contains an element that is not a valid object
 * ID, and MemoryError if the temporary array cannot be allocated. */
PyObject *PyPlasma_fetch(PyObject *self, PyObject *args) {
  plasma_connection *conn;
  PyObject *object_id_list;
  if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaConnection, &conn,
                        &object_id_list)) {
    return NULL;
  }
  if (!plasma_manager_is_connected(conn)) {
    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
    return NULL;
  }
  /* PyList_Size returns -1 for non-lists, which would otherwise be used as an
   * allocation size below. */
  if (!PyList_Check(object_id_list)) {
    PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
    return NULL;
  }
  Py_ssize_t n = PyList_Size(object_id_list);
  object_id *object_ids = malloc(sizeof(*object_ids) * n);
  if (object_ids == NULL && n > 0) {
    return PyErr_NoMemory();
  }
  for (Py_ssize_t i = 0; i < n; ++i) {
    /* PyObjectToUniqueID sets the exception on failure; propagate it instead
     * of sending a partially initialized array to plasma_fetch. */
    if (!PyObjectToUniqueID(PyList_GetItem(object_id_list, i),
                            &object_ids[i])) {
      free(object_ids);
      return NULL;
    }
  }
  plasma_fetch(conn, (int) n, object_ids);
  free(object_ids);
  Py_RETURN_NONE;
}
/* Python binding: wait(conn, object_ids, timeout, num_returns).
 *
 * Builds one object_request of type PLASMA_QUERY_ANYWHERE per ID in the
 * object_ids list, calls plasma_wait with the global interpreter lock
 * released, and returns a 2-tuple (ready_ids, waiting_ids):
 *   - ready_ids: list of at most num_returns IDs whose request status is
 *     PLASMA_OBJECT_LOCAL or PLASMA_OBJECT_REMOTE, in input order;
 *   - waiting_ids: set of the input IDs minus those placed in ready_ids.
 *
 * Raises RuntimeError if there is no plasma manager connection, if
 * num_returns is negative or exceeds len(object_ids), or if timeout exceeds
 * 2 ** 30. Raises TypeError if object_ids is not a list or contains an
 * invalid object ID, and MemoryError if the request array cannot be
 * allocated. */
PyObject *PyPlasma_wait(PyObject *self, PyObject *args) {
  plasma_connection *conn;
  PyObject *object_id_list;
  long long timeout;
  int num_returns;
  if (!PyArg_ParseTuple(args, "O&OLi", PyObjectToPlasmaConnection, &conn,
                        &object_id_list, &timeout, &num_returns)) {
    return NULL;
  }
  /* PyList_Size returns -1 for non-lists, which would otherwise be used as an
   * allocation size below. */
  if (!PyList_Check(object_id_list)) {
    PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
    return NULL;
  }
  Py_ssize_t n = PyList_Size(object_id_list);
  if (!plasma_manager_is_connected(conn)) {
    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
    return NULL;
  }
  if (num_returns < 0) {
    PyErr_SetString(PyExc_RuntimeError,
                    "The argument num_returns cannot be less than zero.");
    return NULL;
  }
  if (num_returns > n) {
    PyErr_SetString(
        PyExc_RuntimeError,
        "The argument num_returns cannot be greater than len(object_ids)");
    return NULL;
  }
  int64_t threshold = 1 << 30;
  if (timeout > threshold) {
    PyErr_SetString(PyExc_RuntimeError,
                    "The argument timeout cannot be greater than 2 ** 30.");
    return NULL;
  }

  object_request *object_requests = malloc(sizeof(*object_requests) * n);
  if (object_requests == NULL && n > 0) {
    return PyErr_NoMemory();
  }
  for (Py_ssize_t i = 0; i < n; ++i) {
    /* PyObjectToUniqueID sets the exception on failure; propagate it. */
    if (!PyObjectToUniqueID(PyList_GetItem(object_id_list, i),
                            &object_requests[i].object_id)) {
      free(object_requests);
      return NULL;
    }
    object_requests[i].type = PLASMA_QUERY_ANYWHERE;
  }

  /* Drop the global interpreter lock while we are waiting, so other threads can
   * run. */
  int num_return_objects;
  Py_BEGIN_ALLOW_THREADS;
  num_return_objects = plasma_wait(conn, (int) n, object_requests, num_returns,
                                   (uint64_t) timeout);
  Py_END_ALLOW_THREADS;

  int num_to_return = MIN(num_return_objects, num_returns);
  PyObject *ready_ids = PyList_New(num_to_return);
  PyObject *waiting_ids = PySet_New(object_id_list);
  if (ready_ids == NULL || waiting_ids == NULL) {
    Py_XDECREF(ready_ids);
    Py_XDECREF(waiting_ids);
    free(object_requests);
    return NULL;
  }

  int num_returned = 0;
  for (Py_ssize_t i = 0; i < n; ++i) {
    if (num_returned == num_to_return) {
      break;
    }
    if (object_requests[i].status == PLASMA_OBJECT_LOCAL ||
        object_requests[i].status == PLASMA_OBJECT_REMOTE) {
      PyObject *ready =
          PyString_FromStringAndSize((char *) object_requests[i].object_id.id,
                                     sizeof(object_requests[i].object_id));
      if (ready == NULL) {
        Py_DECREF(ready_ids);
        Py_DECREF(waiting_ids);
        free(object_requests);
        return NULL;
      }
      /* PyList_SetItem steals the reference to `ready`; the list keeps it
       * alive for the PySet_Discard call that follows. */
      PyList_SetItem(ready_ids, num_returned, ready);
      PySet_Discard(waiting_ids, ready);
      num_returned += 1;
    } else {
      CHECK(object_requests[i].status == PLASMA_OBJECT_NONEXISTENT);
    }
  }
  /* The request array is no longer needed; it was previously leaked. */
  free(object_requests);
  CHECK(num_returned == num_to_return);

  /* Return both the ready IDs and the remaining IDs. */
  PyObject *t = PyTuple_New(2);
  if (t == NULL) {
    Py_DECREF(ready_ids);
    Py_DECREF(waiting_ids);
    return NULL;
  }
  PyTuple_SetItem(t, 0, ready_ids);
  PyTuple_SetItem(t, 1, waiting_ids);
  return t;
}
/* Python binding: evict(conn, num_bytes).
 *
 * Passes num_bytes to plasma_evict and returns its result as a Python int. */
PyObject *PyPlasma_evict(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  long long requested_bytes;
  int parsed = PyArg_ParseTuple(args, "O&L", PyObjectToPlasmaConnection,
                                &connection, &requested_bytes);
  if (!parsed) {
    return NULL;
  }
  int64_t freed_bytes = plasma_evict(connection, (int64_t) requested_bytes);
  return PyInt_FromLong((long) freed_bytes);
}
/* Python binding: delete(conn, object_id).
 *
 * Forwards the connection and object ID to plasma_delete and returns None. */
PyObject *PyPlasma_delete(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  object_id id;
  int parsed = PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection,
                                &connection, PyObjectToUniqueID, &id);
  if (!parsed) {
    return NULL;
  }
  plasma_delete(connection, id);
  Py_RETURN_NONE;
}
/* Python binding: transfer(conn, object_id, addr, port).
 *
 * Requires a plasma manager connection; raises RuntimeError otherwise. Calls
 * plasma_transfer with the address, port and object ID, then returns None. */
PyObject *PyPlasma_transfer(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  object_id id;
  const char *address;
  int port_number;
  int parsed =
      PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaConnection, &connection,
                       PyObjectToUniqueID, &id, &address, &port_number);
  if (!parsed) {
    return NULL;
  }
  if (plasma_manager_is_connected(connection)) {
    /* Note the argument order: address and port precede the object ID. */
    plasma_transfer(connection, address, port_number, id);
    Py_RETURN_NONE;
  }
  PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
  return NULL;
}
/* Python binding: subscribe(conn).
 *
 * Calls plasma_subscribe and returns the integer it yields (stored here as a
 * socket descriptor) as a Python int. */
PyObject *PyPlasma_subscribe(PyObject *self, PyObject *args) {
  plasma_connection *connection;
  int parsed =
      PyArg_ParseTuple(args, "O&", PyObjectToPlasmaConnection, &connection);
  if (!parsed) {
    return NULL;
  }
  int notification_sock = plasma_subscribe(connection);
  return PyInt_FromLong(notification_sock);
}
/* Python binding: receive_notification(plasma_sock).
 *
 * Reads one object_info record from the given socket with read_bytes and
 * returns its fields as a 3-tuple (object_id, data_size, metadata_size).
 *
 * Raises RuntimeError if read_bytes returns a negative value. */
PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) {
  int plasma_sock;
  if (!PyArg_ParseTuple(args, "i", &plasma_sock)) {
    return NULL;
  }
  /* Pull one fixed-size notification record off the socket. */
  object_info info;
  int nbytes = read_bytes(plasma_sock, (uint8_t *) &info, sizeof(info));
  if (nbytes < 0) {
    PyErr_SetString(PyExc_RuntimeError,
                    "Failed to read object notification from Plasma socket");
    return NULL;
  }
  /* Pack the record's fields into the result tuple. */
  PyObject *result = PyTuple_New(3);
  PyObject *id_string =
      PyString_FromStringAndSize((char *) info.obj_id.id, UNIQUE_ID_SIZE);
  PyTuple_SetItem(result, 0, id_string);
  PyObject *data_size = PyInt_FromLong(info.data_size);
  PyTuple_SetItem(result, 1, data_size);
  PyObject *metadata_size = PyInt_FromLong(info.metadata_size);
  PyTuple_SetItem(result, 2, metadata_size);
  return result;
}
static PyMethodDef plasma_methods[] = {
{"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
{"disconnect", PyPlasma_disconnect, METH_VARARGS,
"Disconnect from plasma."},
{"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
{"hash", PyPlasma_hash, METH_VARARGS,
"Compute the hash of a plasma object."},
{"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
{"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
{"contains", PyPlasma_contains, METH_VARARGS,
"Does the plasma store contain this plasma object?"},
{"fetch", PyPlasma_fetch, METH_VARARGS,
"Fetch the object from another plasma manager instance."},
{"wait", PyPlasma_wait, METH_VARARGS,
"Wait until num_returns objects in object_ids are ready."},
{"evict", PyPlasma_evict, METH_VARARGS,
"Evict some objects until we recover some number of bytes."},
{"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
{"delete", PyPlasma_delete, METH_VARARGS, "Deleta a plasma object."},
{"transfer", PyPlasma_transfer, METH_VARARGS,
"Transfer object to another plasma manager."},
{"subscribe", PyPlasma_subscribe, METH_VARARGS,
"Subscribe to the plasma notification socket."},
{"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
"Receive next notification from plasma notification socket."},
{NULL} /* Sentinel */
};
/* Fall back to a plain void return type when the Python headers do not
 * provide PyMODINIT_FUNC (declarations for DLL import/export). */
#ifndef PyMODINIT_FUNC
#define PyMODINIT_FUNC void
#endif

/* Module initialization entry point: registers the method table under the
 * module name "libplasma". */
PyMODINIT_FUNC initlibplasma(void) {
  (void) Py_InitModule3("libplasma", plasma_methods,
                        "A Python client library for plasma");
}