cleanup serialization code (#291)

This commit is contained in:
Philipp Moritz 2016-07-25 15:47:10 -07:00 committed by Robert Nishihara
parent 4a0f35b042
commit 5591aa4665
11 changed files with 7 additions and 215 deletions

6
.gitmodules vendored
View file

@ -4,6 +4,6 @@
[submodule "thirdparty/numbuf"]
path = thirdparty/numbuf
url = https://github.com/pcmoritz/numbuf.git
[submodule "thirdparty/numbuf-old"]
path = thirdparty/numbuf-old
url = https://github.com/amplab/numbuf
[submodule "thirdparty/arrow"]
path = thirdparty/arrow
url = https://github.com/pcmoritz/arrow.git

View file

@ -131,31 +131,10 @@ set(ARROW_LIB ${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/build/release/libarro
# Pass -fPIC to the compiler. add_definitions() is directory-scoped, so this is
# not limited to a single target.
add_definitions(-fPIC)
# pynumbuf is only declared on non-Apple platforms. It is a static library built
# from the numbuf-old C++ sources and its Python adapters, linked against the
# Arrow library in ARROW_LIB and the Python libraries.
if(NOT APPLE)
include_directories("${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/src/")
include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/")
include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/")
add_library(pynumbuf STATIC ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/tensor.cc
${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/types.cc
${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/metadata.cc
${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/dict.cc
${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/serialize.cc
${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/adapters/numpy.cc
${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/adapters/python.cc)
target_link_libraries(pynumbuf ${ARROW_LIB} ${PYTHON_LIBRARIES})
endif()
# Object store executable; additionally linked against Arrow and pynumbuf when
# not building on Apple.
add_executable(objstore src/objstore.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES})
if(NOT APPLE)
target_link_libraries(objstore ${ARROW_LIB} pynumbuf)
endif()
# Scheduler executable; no extra link libraries are added for it here.
add_executable(scheduler src/scheduler.cc src/computation_graph.cc src/utils.cc ${GENERATED_PROTOBUF_FILES})
# Shared library raylib built from the worker, IPC and utility sources.
add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES})
if(NOT APPLE)
target_link_libraries(raylib ${ARROW_LIB} pynumbuf)
else()
target_link_libraries(raylib ${PYTHON_LIBRARIES})
endif()
# NOTE(review): this unconditional call repeats what the else() branch above
# already does on Apple - confirm which of the two forms is meant to remain.
target_link_libraries(raylib ${PYTHON_LIBRARIES})
# Store only the file-name component of PYTHON_LIBRARIES in PYTHON_SHARED_LIBRARY.
get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME)
if(APPLE)

View file

@ -7,38 +7,10 @@
#include <stdlib.h>
#include "ray/ray.h"
#ifndef __APPLE__
using namespace arrow;
#endif
// Builds a handle by storing each argument in the member of the matching name:
// the segment id, the object size, the IPC pointer and the metadata offset.
ObjHandle::ObjHandle(SegmentId segmentid,
                     size_t size,
                     IpcPointer ipcpointer,
                     size_t metadata_offset)
    : segmentid_(segmentid),
      size_(size),
      ipcpointer_(ipcpointer),
      metadata_offset_(metadata_offset) {
}
#ifndef __APPLE__
// Copies nbytes bytes from data into the wrapped buffer, starting at byte
// offset position.
//
// Returns Status::Invalid, without touching the buffer, when position or nbytes
// is negative or when the range [position, position + nbytes) does not fit
// inside the capacity_ bytes this source was constructed with. Returns
// Status::OK() otherwise.
Status BufferMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
  // Written as a subtraction so that position + nbytes can never overflow.
  if (position < 0 || nbytes < 0 || position > capacity_ || nbytes > capacity_ - position) {
    return Status::Invalid("BufferMemorySource::Write: range is outside of the buffer");
  }
  // memcpy requires valid pointers even for a zero-length copy, so skip it then.
  if (nbytes > 0) {
    std::memcpy(data_ + position, data, nbytes);
  }
  return Status::OK();
}
// Exposes nbytes bytes of the wrapped buffer, starting at byte offset position,
// as a Buffer stored in *out. The Buffer is constructed directly over
// data_ + position.
//
// Returns Status::Invalid, leaving *out untouched, when position or nbytes is
// negative or when the range [position, position + nbytes) does not fit inside
// the capacity_ bytes this source was constructed with. Returns Status::OK()
// otherwise.
Status BufferMemorySource::ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
  // Written as a subtraction so that position + nbytes can never overflow.
  if (position < 0 || nbytes < 0 || position > capacity_ || nbytes > capacity_ - position) {
    return Status::Invalid("BufferMemorySource::ReadAt: range is outside of the buffer");
  }
  *out = std::make_shared<Buffer>(data_ + position, nbytes);
  return Status::OK();
}
// Performs no work on close and always reports success.
Status BufferMemorySource::Close()
{
  Status ok = Status::OK();
  return ok;
}
// Reports the current value of the size_ member.
int64_t BufferMemorySource::Size() const
{
  const int64_t current_size = size_;
  return current_size;
}
#endif
// Default constructor: the queue starts out with create_ set to false.
MessageQueue<>::MessageQueue()
    : create_(false)
{
}
MessageQueue<>::~MessageQueue() {
@ -158,7 +130,7 @@ ObjHandle MemorySegmentPool::allocate(size_t size) {
// TODO(pcm): at the moment, this always creates a new segment, this will be changed
SegmentId segmentid = segments_.size();
open_segment(segmentid, size);
objstore_memcheck(size);
objstore_memcheck(size);
void* ptr = segments_[segmentid].first->allocate(size);
auto handle = segments_[segmentid].first->get_handle_from_address(ptr);
return ObjHandle(segmentid, size, handle);

View file

@ -23,11 +23,6 @@ namespace boost {
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#ifndef __APPLE__
#include <arrow/api.h>
#include <arrow/ipc/memory.h>
#endif
#include "ray/ray.h"
namespace bip = boost::interprocess;
@ -110,23 +105,6 @@ private:
size_t metadata_offset_; // offset of the metadata that describes this object
};
#ifndef __APPLE__
// An arrow::ipc::MemorySource implemented on top of a caller-supplied raw byte
// buffer. The class stores only the pointer and the capacity it is given.
// NOTE(review): ownership and lifetime of the buffer are not visible in this
// declaration - presumably the caller keeps it alive; confirm.
class BufferMemorySource: public arrow::ipc::MemorySource {
public:
// Wraps the buffer starting at data; capacity is stored in capacity_ and
// size_ starts at 0.
BufferMemorySource(uint8_t* data, int64_t capacity) : data_(data), capacity_(capacity), size_(0) {}
// Produces, in *out, a Buffer for nbytes bytes starting at offset position.
virtual arrow::Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<arrow::Buffer>* out);
// Closes the source.
virtual arrow::Status Close();
// Writes nbytes bytes from data at offset position.
virtual arrow::Status Write(int64_t position, const uint8_t* data, int64_t nbytes);
// Returns the size of the source.
virtual int64_t Size() const;
private:
uint8_t* data_; // start of the wrapped buffer
int64_t capacity_; // capacity passed to the constructor
int64_t size_; // initialised to 0 by the constructor
};
#endif
// Memory segment pool: A collection of shared memory segments
// used in two modes:
// \item on the object store it is used with create = true, in this case the

View file

@ -6,9 +6,6 @@
#include <structmember.h>
#define PY_ARRAY_UNIQUE_SYMBOL RAYLIB_ARRAY_API
#include <numpy/arrayobject.h>
#ifndef __APPLE__
#include <arrow/api.h>
#endif
#include <iostream>
#include "types.pb.h"
@ -482,23 +479,6 @@ static PyObject* serialize_object(PyObject* self, PyObject* args) {
return t;
}
#ifndef __APPLE__
// Python entry point: put_arrow(worker, objref, value).
//
// Stores the numpy array `value` under `objref` by delegating to
// Worker::put_arrow and returns that call's result (None on success).
// Returns NULL with a Python exception set when argument parsing fails, when
// `value` is not a numpy array, or when a contiguous version of the array
// cannot be obtained.
static PyObject* put_arrow(PyObject* self, PyObject* args) {
  Worker* worker;
  ObjRef objref;
  PyObject* value;
  if (!PyArg_ParseTuple(args, "O&O&O", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref, &value)) {
    return NULL;
  }
  // PyArray_GETCONTIGUOUS reads array fields directly, so anything that is not
  // an ndarray has to be rejected before the cast below.
  if (!PyArray_Check(value)) {
    PyErr_SetString(PyExc_TypeError, "put_arrow: expected a numpy array");
    return NULL;
  }
  // The following is required, because numbuf expects contiguous arrays at the moment.
  // This is to make sure that we do not have to do reference counting inside numbuf, it is expected to change.
  PyArrayObject* array = PyArray_GETCONTIGUOUS((PyArrayObject*) value); // TODO(pcm): put that into numbuf
  if (array == NULL) {
    return NULL; // obtaining a contiguous array failed
  }
  // Hand the worker's result back to the caller instead of dropping it, so
  // that a failure is propagated and the returned reference is not leaked.
  PyObject* result = worker->put_arrow(objref, (PyObject*) array);
  Py_DECREF(array); // GETCONTIGUOUS from above returned a new reference
  return result;
}
#endif
static PyObject* allocate_buffer(PyObject* self, PyObject* args) {
Worker* worker;
ObjRef objref;
@ -536,7 +516,7 @@ static PyObject* get_buffer(PyObject* self, PyObject* args) {
return NULL;
}
void* address = reinterpret_cast<void*>(const_cast<char*>(worker->get_buffer(objref, size, segmentid, metadata_offset)));
std::vector<npy_intp> dim({size});
std::vector<npy_intp> dim({static_cast<npy_intp>(size)});
PyObject* t = PyTuple_New(3);
PyTuple_SetItem(t, 0, PyArray_SimpleNewFromData(1, dim.data(), NPY_BYTE, address));
PyTuple_SetItem(t, 1, PyInt_FromLong(segmentid));
@ -544,25 +524,6 @@ static PyObject* get_buffer(PyObject* self, PyObject* args) {
return t;
}
#ifndef __APPLE__
// Python entry point: get_arrow(worker, objref).
//
// Fetches the object stored under `objref` through Worker::get_arrow and
// returns a two-element list [value, segmentid], where segmentid is the id the
// worker reported for the segment holding the object.
// Returns NULL when argument parsing fails, when the worker returns NULL, or
// when the result objects cannot be allocated.
static PyObject* get_arrow(PyObject* self, PyObject* args) {
  Worker* worker;
  ObjRef objref;
  if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) {
    return NULL;
  }
  SegmentId segmentid;
  PyObject* value = worker->get_arrow(objref, segmentid);
  if (value == NULL) {
    // Never place a NULL item into the result list.
    return NULL;
  }
  PyObject* segmentid_obj = PyInt_FromLong(segmentid);
  PyObject* val_and_segmentid = PyList_New(2);
  if (segmentid_obj == NULL || val_and_segmentid == NULL) {
    Py_XDECREF(segmentid_obj);
    Py_XDECREF(val_and_segmentid);
    Py_DECREF(value);
    return NULL;
  }
  // PyList_SetItem takes over the references to value and segmentid_obj.
  PyList_SetItem(val_and_segmentid, 0, value);
  PyList_SetItem(val_and_segmentid, 1, segmentid_obj);
  return val_and_segmentid;
}
#endif
static PyObject* is_arrow(PyObject* self, PyObject* args) {
Worker* worker;
ObjRef objref;
@ -1009,15 +970,9 @@ static PyObject* kill_workers(PyObject* self, PyObject* args) {
static PyMethodDef RayLibMethods[] = {
{ "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" },
{ "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" },
#ifndef __APPLE__
{ "put_arrow", put_arrow, METH_VARARGS, "put an arrow array on the local object store"},
#endif
{ "allocate_buffer", allocate_buffer, METH_VARARGS, "Allocates and returns buffer for objref."},
{ "finish_buffer", finish_buffer, METH_VARARGS, "Makes the buffer immutable and closes memory segment of objref."},
{ "get_buffer", get_buffer, METH_VARARGS, "Gets buffer for objref"},
#ifndef __APPLE__
{ "get_arrow", get_arrow, METH_VARARGS, "get an arrow array from the local object store"},
#endif
{ "is_arrow", is_arrow, METH_VARARGS, "is the object in the local object store an arrow object?"},
{ "unmap_object", unmap_object, METH_VARARGS, "unmap the object from the client's shared memory pool"},
{ "serialize_task", serialize_task, METH_VARARGS, "serialize a task to protocol buffers" },

View file

@ -5,10 +5,6 @@
#include "utils.h"
#ifndef __APPLE__
#include <pynumbuf/serialize.h>
#endif
// File-local pointer to a Python object; `static` keeps it private to this
// translation unit.
// NOTE(review): presumably the Python exception object used for Ray errors -
// where it is created and assigned is not visible here; confirm.
extern "C" {
static PyObject *RayError;
}
@ -196,37 +192,6 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector<ObjRef> &cont
} \
} while (0);
#ifndef __APPLE__
// Serializes the Python object `value` with pynumbuf and stores it under
// objref.
//
// Sequence, as written below: assemble the payload and query its total size,
// send an ALLOC request of that size, receive the resulting handle, write the
// payload at the address obtained for that handle, unmap the segment, and
// finally send a WORKER_DONE request carrying the metadata offset.
// Returns None on the success path.
// NOTE(review): what RAY_CHECK and CHECK_ARROW_STATUS do on failure is defined
// outside this block - confirm before relying on a particular error path.
PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) {
RAY_CHECK(connected_, "Attempted to perform put_arrow but failed.");
ObjRequest request;
pynumbuf::PythonObjectWriter writer;
int64_t size;
CHECK_ARROW_STATUS(writer.AssemblePayload(value), "error during AssemblePayload: ");
CHECK_ARROW_STATUS(writer.GetTotalSize(&size), "error during GetTotalSize: ");
// First request: ask for `size` bytes to be allocated for objref.
request.workerid = workerid_;
request.type = ObjRequestType::ALLOC;
request.objref = objref;
request.size = size;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
ObjHandle result;
RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC");
int64_t metadata_offset;
uint8_t* address = segmentpool_->get_address(result);
// Wrap the address as a memory source with capacity `size`; the writer fills
// in metadata_offset.
auto source = std::make_shared<BufferMemorySource>(address, size);
CHECK_ARROW_STATUS(writer.Write(source.get(), &metadata_offset), "error during Write: ");
// We immediately unmap here; if the object is going to be accessed again, it will be mapped again;
// This is required because we do not have a mechanism to unmap the object later.
segmentpool_->unmap_segment(result.segmentid());
// Second request: reuse the same request object to report completion together
// with the metadata offset.
request.type = ObjRequestType::WORKER_DONE;
request.metadata_offset = metadata_offset;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
Py_RETURN_NONE;
}
#endif
const char* Worker::allocate_buffer(ObjRef objref, int64_t size, SegmentId& segmentid) {
RAY_CHECK(connected_, "Attempted to perform put_arrow but failed.");
ObjRequest request;
@ -269,29 +234,6 @@ const char* Worker::get_buffer(ObjRef objref, int64_t &size, SegmentId& segmenti
return address;
}
#ifndef __APPLE__
// Reads the Python object stored under objref.
//
// Sends a GET request for objref, receives the object's handle, and
// deserializes the value with pynumbuf::ReadPythonObjectFrom using the handle's
// metadata offset. Returns the deserialized object; the id of the segment in
// which the object is stored is written to the segmentid out-parameter.
// NOTE(review): what RAY_CHECK and CHECK_ARROW_STATUS do on failure is defined
// outside this block - confirm before relying on a particular error path.
PyObject* Worker::get_arrow(ObjRef objref, SegmentId& segmentid) {
RAY_CHECK(connected_, "Attempted to perform get_arrow but failed.");
ObjRequest request;
request.workerid = workerid_;
request.type = ObjRequestType::GET;
request.objref = objref;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
ObjHandle result;
RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC");
uint8_t* address = segmentpool_->get_address(result);
// Wrap the address as a memory source whose capacity is the handle's size.
auto source = std::make_shared<BufferMemorySource>(address, result.size());
segmentid = result.segmentid();
PyObject* value;
CHECK_ARROW_STATUS(pynumbuf::ReadPythonObjectFrom(source.get(), result.metadata_offset(), &value), "error during ReadPythonObjectFrom: ");
return value;
}
#endif
bool Worker::is_arrow(ObjRef objref) {
RAY_CHECK(connected_, "Attempted to perform is_arrow but failed.");
ObjRequest request;

View file

@ -67,20 +67,12 @@ class Worker {
void put_object(ObjRef objref, const Obj* obj, std::vector<ObjRef> &contained_objrefs);
// retrieve serialized object from local object store
slice get_object(ObjRef objref);
#ifndef __APPLE__
// stores an arrow object to the local object store
PyObject* put_arrow(ObjRef objref, PyObject* array);
#endif
// Allocates a buffer of the given size for objref
const char* allocate_buffer(ObjRef objref, int64_t size, SegmentId& segmentid);
// Finishes buffer with segmentid and an offset of metadata_offset
PyObject* finish_buffer(ObjRef objref, SegmentId segmentid, int64_t metadata_offset);
// Gets the buffer for objref
const char* get_buffer(ObjRef objref, int64_t& size, SegmentId& segmentid, int64_t& metadata_offset);
#ifndef __APPLE__
// gets an arrow object from the local object store
PyObject* get_arrow(ObjRef objref, SegmentId& segmentid);
#endif
// determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this?
bool is_arrow(ObjRef objref);
// unmap the segment containing an object from the local address space

1
thirdparty/arrow vendored Submodule

@ -0,0 +1 @@
Subproject commit 0707062979e99e136656bf7ebfda7f4eb722b5ce

View file

@ -26,15 +26,6 @@ cd $TP_DIR/arrow/cpp/build
# Configure a Release build with LIBARROW_LINKAGE=STATIC from the parent
# directory, then compile it with $PARALLEL jobs.
cmake -DLIBARROW_LINKAGE=STATIC -DCMAKE_BUILD_TYPE=Release ..
make VERBOSE=1 -j$PARALLEL
# TODO(pcm): Remove this
# Same configure-and-build sequence, run inside the arrow-old checkout after
# sourcing its setup_build_env.sh.
echo "building arrow (old version)"
cd $TP_DIR/arrow-old/cpp
source setup_build_env.sh
mkdir -p $TP_DIR/arrow-old/cpp/build
cd $TP_DIR/arrow-old/cpp/build
cmake -DLIBARROW_LINKAGE=STATIC -DCMAKE_BUILD_TYPE=Release ..
make VERBOSE=1 -j$PARALLEL
# numbuf is built in a build/ directory inside its checkout.
echo "building numbuf"
cd $TP_DIR/numbuf
mkdir -p build

View file

@ -5,23 +5,6 @@ set -e
# Absolute path of the directory that contains this script.
TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
# Clone the arrow fork and check out its "scratch" branch, unless an arrow/
# directory already exists.
# NOTE(review): the -d tests and cd calls below use paths relative to the
# current working directory, not $TP_DIR - confirm the script is run from there.
if [ ! -d arrow ]; then
echo "Fetching arrow"
git clone https://github.com/pcmoritz/arrow.git
cd arrow
git checkout scratch
cd ..
fi
# TODO(pcm): Remove this
# Second clone of the same repository into arrow-old/, on the "static" branch.
if [ ! -d arrow-old ]; then
echo "Fetching old version of arrow"
git clone https://github.com/pcmoritz/arrow.git arrow-old
cd arrow-old
git checkout static
cd ..
fi
# Initialise and update all git submodules, including nested ones.
git submodule update --init --recursive
# this seems to be necessary for building on Mac OS X

@ -1 +0,0 @@
Subproject commit 95f5fcfab9ce6d439ed9d5e3ca1800c368a80d5e