From 5591aa46653c1ee0e8decbaa77a413f1006f0f76 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 25 Jul 2016 15:47:10 -0700 Subject: [PATCH] cleanup serialization code (#291) --- .gitmodules | 6 ++-- CMakeLists.txt | 23 +----------- src/ipc.cc | 30 +--------------- src/ipc.h | 22 ------------ src/raylib.cc | 47 +------------------------ src/worker.cc | 58 ------------------------------- src/worker.h | 8 ----- thirdparty/arrow | 1 + thirdparty/build_thirdparty.sh | 9 ----- thirdparty/download_thirdparty.sh | 17 --------- thirdparty/numbuf-old | 1 - 11 files changed, 7 insertions(+), 215 deletions(-) create mode 160000 thirdparty/arrow delete mode 160000 thirdparty/numbuf-old diff --git a/.gitmodules b/.gitmodules index 65a586d5a..005d0f1ea 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,6 +4,6 @@ [submodule "thirdparty/numbuf"] path = thirdparty/numbuf url = https://github.com/pcmoritz/numbuf.git -[submodule "thirdparty/numbuf-old"] - path = thirdparty/numbuf-old - url = https://github.com/amplab/numbuf +[submodule "thirdparty/arrow"] + path = thirdparty/arrow + url = https://github.com/pcmoritz/arrow.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 501021c0d..921befe2d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -131,31 +131,10 @@ set(ARROW_LIB ${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/build/release/libarro add_definitions(-fPIC) -if(NOT APPLE) - include_directories("${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/src/") - include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/") - include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/") - add_library(pynumbuf STATIC ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/tensor.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/types.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/metadata.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/dict.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/serialize.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/adapters/numpy.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/adapters/python.cc) - target_link_libraries(pynumbuf ${ARROW_LIB} ${PYTHON_LIBRARIES}) -endif() - add_executable(objstore src/objstore.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -if(NOT APPLE) - target_link_libraries(objstore ${ARROW_LIB} pynumbuf) -endif() add_executable(scheduler src/scheduler.cc src/computation_graph.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -if(NOT APPLE) - target_link_libraries(raylib ${ARROW_LIB} pynumbuf) -else() - target_link_libraries(raylib ${PYTHON_LIBRARIES}) -endif() +target_link_libraries(raylib ${PYTHON_LIBRARIES}) get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) if(APPLE) diff --git a/src/ipc.cc b/src/ipc.cc index 8c2ad08ea..12ec411d2 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -7,38 +7,10 @@ #include #include "ray/ray.h" -#ifndef __APPLE__ - using namespace arrow; -#endif - ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer, size_t metadata_offset) : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer), metadata_offset_(metadata_offset) {} -#ifndef __APPLE__ - -Status BufferMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { - // TODO(pcm): error handling - std::memcpy(data_ + position, data, nbytes); - return Status::OK(); -} - -Status BufferMemorySource::ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) { - // TODO(pcm): error handling - *out = std::make_shared(data_ + position, nbytes); - return Status::OK(); -} - -Status BufferMemorySource::Close() { - return Status::OK(); -} - -int64_t BufferMemorySource::Size() const { - return size_; -} - -#endif - MessageQueue<>::MessageQueue() : create_(false) { } MessageQueue<>::~MessageQueue() { @@ -158,7 +130,7 @@ ObjHandle MemorySegmentPool::allocate(size_t size) { // TODO(pcm): at the moment, this always creates a new segment, this will be changed SegmentId segmentid = segments_.size(); open_segment(segmentid, size); - objstore_memcheck(size); + objstore_memcheck(size); void* ptr = segments_[segmentid].first->allocate(size); auto handle = segments_[segmentid].first->get_handle_from_address(ptr); return ObjHandle(segmentid, size, handle); diff --git a/src/ipc.h b/src/ipc.h index 8f9ee431a..0b82532a3 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -23,11 +23,6 @@ namespace boost { #include #include -#ifndef __APPLE__ - #include - #include -#endif - #include "ray/ray.h" namespace bip = boost::interprocess; @@ -110,23 +105,6 @@ private: size_t metadata_offset_; // offset of the metadata that describes this object }; -#ifndef __APPLE__ - -class BufferMemorySource: public arrow::ipc::MemorySource { -public: - BufferMemorySource(uint8_t* data, int64_t capacity) : data_(data), capacity_(capacity), size_(0) {} - virtual arrow::Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out); - virtual arrow::Status Close(); - virtual arrow::Status Write(int64_t position, const uint8_t* data, int64_t nbytes); - virtual int64_t Size() const; - private: - uint8_t* data_; - int64_t capacity_; - int64_t size_; -}; - -#endif - // Memory segment pool: A collection of shared memory segments // used in two modes: // \item on the object store it is used with create = true, in this case the diff --git a/src/raylib.cc b/src/raylib.cc index 5aee44d4c..43efe393e 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -6,9 +6,6 @@ #include #define PY_ARRAY_UNIQUE_SYMBOL RAYLIB_ARRAY_API #include -#ifndef __APPLE__ - #include -#endif #include #include "types.pb.h" @@ -482,23 +479,6 @@ static PyObject* serialize_object(PyObject* self, PyObject* args) { return t; } -#ifndef __APPLE__ -static PyObject* put_arrow(PyObject* self, PyObject* args) { - Worker* worker; - ObjRef objref; - PyObject* value; - if (!PyArg_ParseTuple(args, "O&O&O", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref, &value)) { - return NULL; - } - // The following is reqired, because numbuf expects contiguous arrays at the moment. - // This is to make sure that we do not have to do reference counting inside numbuf, it is expected to change. - PyArrayObject* array = PyArray_GETCONTIGUOUS((PyArrayObject*) value); // TODO(pcm): put that into numbuf - worker->put_arrow(objref, (PyObject*) array); - Py_XDECREF(array); // GETCONTIGUOUS from above returned a new reference - Py_RETURN_NONE; -} -#endif - static PyObject* allocate_buffer(PyObject* self, PyObject* args) { Worker* worker; ObjRef objref; @@ -536,7 +516,7 @@ static PyObject* get_buffer(PyObject* self, PyObject* args) { return NULL; } void* address = reinterpret_cast(const_cast(worker->get_buffer(objref, size, segmentid, metadata_offset))); - std::vector dim({size}); + std::vector dim({static_cast(size)}); PyObject* t = PyTuple_New(3); PyTuple_SetItem(t, 0, PyArray_SimpleNewFromData(1, dim.data(), NPY_BYTE, address)); PyTuple_SetItem(t, 1, PyInt_FromLong(segmentid)); @@ -544,25 +524,6 @@ static PyObject* get_buffer(PyObject* self, PyObject* args) { return t; } - -#ifndef __APPLE__ - -static PyObject* get_arrow(PyObject* self, PyObject* args) { - Worker* worker; - ObjRef objref; - if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { - return NULL; - } - SegmentId segmentid; - PyObject* value = worker->get_arrow(objref, segmentid); - PyObject* val_and_segmentid = PyList_New(2); - PyList_SetItem(val_and_segmentid, 0, value); - PyList_SetItem(val_and_segmentid, 1, PyInt_FromLong(segmentid)); - return val_and_segmentid; -} - -#endif - static PyObject* is_arrow(PyObject* self, PyObject* args) { Worker* worker; ObjRef objref; @@ -1009,15 +970,9 @@ static PyObject* kill_workers(PyObject* self, PyObject* args) { static PyMethodDef RayLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, -#ifndef __APPLE__ - { "put_arrow", put_arrow, METH_VARARGS, "put an arrow array on the local object store"}, -#endif { "allocate_buffer", allocate_buffer, METH_VARARGS, "Allocates and returns buffer for objref."}, { "finish_buffer", finish_buffer, METH_VARARGS, "Makes the buffer immutable and closes memory segment of objref."}, { "get_buffer", get_buffer, METH_VARARGS, "Gets buffer for objref"}, -#ifndef __APPLE__ - { "get_arrow", get_arrow, METH_VARARGS, "get an arrow array from the local object store"}, -#endif { "is_arrow", is_arrow, METH_VARARGS, "is the object in the local object store an arrow object?"}, { "unmap_object", unmap_object, METH_VARARGS, "unmap the object from the client's shared memory pool"}, { "serialize_task", serialize_task, METH_VARARGS, "serialize a task to protocol buffers" }, diff --git a/src/worker.cc b/src/worker.cc index 6dc37b243..3fa5e58e7 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -5,10 +5,6 @@ #include "utils.h" -#ifndef __APPLE__ - #include -#endif - extern "C" { static PyObject *RayError; } @@ -196,37 +192,6 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont } \ } while (0); -#ifndef __APPLE__ - -PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { - RAY_CHECK(connected_, "Attempted to perform put_arrow but failed."); - ObjRequest request; - pynumbuf::PythonObjectWriter writer; - int64_t size; - CHECK_ARROW_STATUS(writer.AssemblePayload(value), "error during AssemblePayload: "); - CHECK_ARROW_STATUS(writer.GetTotalSize(&size), "error during GetTotalSize: "); - request.workerid = workerid_; - request.type = ObjRequestType::ALLOC; - request.objref = objref; - request.size = size; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); - ObjHandle result; - RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC"); - int64_t metadata_offset; - uint8_t* address = segmentpool_->get_address(result); - auto source = std::make_shared(address, size); - CHECK_ARROW_STATUS(writer.Write(source.get(), &metadata_offset), "error during Write: "); - // We immediately unmap here; if the object is going to be accessed again, it will be mapped again; - // This is reqired because we do not have a mechanism to unmap the object later. - segmentpool_->unmap_segment(result.segmentid()); - request.type = ObjRequestType::WORKER_DONE; - request.metadata_offset = metadata_offset; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); - Py_RETURN_NONE; -} - -#endif - const char* Worker::allocate_buffer(ObjRef objref, int64_t size, SegmentId& segmentid) { RAY_CHECK(connected_, "Attempted to perform put_arrow but failed."); ObjRequest request; @@ -269,29 +234,6 @@ const char* Worker::get_buffer(ObjRef objref, int64_t &size, SegmentId& segmenti return address; } -#ifndef __APPLE__ - -// returns python list containing the value represented by objref and the -// segmentid in which the object is stored -PyObject* Worker::get_arrow(ObjRef objref, SegmentId& segmentid) { - RAY_CHECK(connected_, "Attempted to perform get_arrow but failed."); - ObjRequest request; - request.workerid = workerid_; - request.type = ObjRequestType::GET; - request.objref = objref; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); - ObjHandle result; - RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC"); - uint8_t* address = segmentpool_->get_address(result); - auto source = std::make_shared(address, result.size()); - segmentid = result.segmentid(); - PyObject* value; - CHECK_ARROW_STATUS(pynumbuf::ReadPythonObjectFrom(source.get(), result.metadata_offset(), &value), "error during ReadPythonObjectFrom: "); - return value; -} - -#endif - bool Worker::is_arrow(ObjRef objref) { RAY_CHECK(connected_, "Attempted to perform is_arrow but failed."); ObjRequest request; diff --git a/src/worker.h b/src/worker.h index a35c331ef..02cdd31d0 100644 --- a/src/worker.h +++ b/src/worker.h @@ -67,20 +67,12 @@ class Worker { void put_object(ObjRef objref, const Obj* obj, std::vector &contained_objrefs); // retrieve serialized object from local object store slice get_object(ObjRef objref); -#ifndef __APPLE__ - // stores an arrow object to the local object store - PyObject* put_arrow(ObjRef objref, PyObject* array); -#endif // Allocates buffer for objref with size of size const char* allocate_buffer(ObjRef objref, int64_t size, SegmentId& segmentid); // Finishes buffer with segmentid and an offset of metadata_ofset PyObject* finish_buffer(ObjRef objref, SegmentId segmentid, int64_t metadata_offset); // Gets the buffer for objref const char* get_buffer(ObjRef objref, int64_t& size, SegmentId& segmentid, int64_t& metadata_offset); -#ifndef __APPLE__ - // gets an arrow object from the local object store - PyObject* get_arrow(ObjRef objref, SegmentId& segmentid); -#endif // determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this? bool is_arrow(ObjRef objref); // unmap the segment containing an object from the local address space diff --git a/thirdparty/arrow b/thirdparty/arrow new file mode 160000 index 000000000..070706297 --- /dev/null +++ b/thirdparty/arrow @@ -0,0 +1 @@ +Subproject commit 0707062979e99e136656bf7ebfda7f4eb722b5ce diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh index a3d011619..c20c5f410 100755 --- a/thirdparty/build_thirdparty.sh +++ b/thirdparty/build_thirdparty.sh @@ -26,15 +26,6 @@ cd $TP_DIR/arrow/cpp/build cmake -DLIBARROW_LINKAGE=STATIC -DCMAKE_BUILD_TYPE=Release .. make VERBOSE=1 -j$PARALLEL -# TODO(pcm): Remove this -echo "building arrow (old version)" -cd $TP_DIR/arrow-old/cpp -source setup_build_env.sh -mkdir -p $TP_DIR/arrow-old/cpp/build -cd $TP_DIR/arrow-old/cpp/build -cmake -DLIBARROW_LINKAGE=STATIC -DCMAKE_BUILD_TYPE=Release .. -make VERBOSE=1 -j$PARALLEL - echo "building numbuf" cd $TP_DIR/numbuf mkdir -p build diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh index b913d173a..02fa10af3 100755 --- a/thirdparty/download_thirdparty.sh +++ b/thirdparty/download_thirdparty.sh @@ -5,23 +5,6 @@ set -e TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) -if [ ! -d arrow ]; then - echo "Fetching arrow" - git clone https://github.com/pcmoritz/arrow.git - cd arrow - git checkout scratch - cd .. -fi - -# TODO(pcm): Remove this -if [ ! -d arrow-old ]; then - echo "Fetching old version of arrow" - git clone https://github.com/pcmoritz/arrow.git arrow-old - cd arrow-old - git checkout static - cd .. -fi - git submodule update --init --recursive # this seems to be neeccessary for building on Mac OS X diff --git a/thirdparty/numbuf-old b/thirdparty/numbuf-old deleted file mode 160000 index 95f5fcfab..000000000 --- a/thirdparty/numbuf-old +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 95f5fcfab9ce6d439ed9d5e3ca1800c368a80d5e