Use Arrow Tensors for serializing numpy arrays and get rid of extra memcpy. (#436)
* Use Arrow Tensors for serializing numpy arrays and get rid of extra memcpy
* fix nondeterminism problem
* mark array as immutable
* make arrays contiguous
* fix serialize_list and deserialize_list
* fix numbuf tests
* linting
* add optimization flags
* fixes
* roll back arrow
parent c9d66555e2
commit 6ffc849d23
15 changed files with 197 additions and 389 deletions
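In outline: instead of copying every numpy array element-by-element into Arrow builders, the commit wraps each array as an arrow::Tensor over the existing buffer and streams it after the RecordBatch. A minimal sketch of the write side, using only calls that appear in the hunks below (NdarrayToTensor, WriteTensor); `array` and `stream` are placeholder names:

    // Sketch: the numpy buffer is wrapped, not copied, which is what
    // removes the extra memcpy on the serialization path.
    std::shared_ptr<arrow::Tensor> tensor;
    ARROW_CHECK_OK(arrow::py::NdarrayToTensor(NULL, array, &tensor));
    int32_t metadata_length;
    int64_t body_length;
    // WriteTensor streams tensor metadata plus body straight from that buffer.
    ARROW_CHECK_OK(arrow::ipc::WriteTensor(*tensor, stream, &metadata_length, &body_length));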
@@ -23,7 +23,7 @@ endif(APPLE)
 include_directories("${PYTHON_INCLUDE_DIRS}")
 include_directories("${NUMPY_INCLUDE_DIR}")
 
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -march=native -mtune=native -O3")
 
 if(UNIX AND NOT APPLE)
   link_libraries(rt)
@@ -38,6 +38,8 @@ set(ARROW_IO_LIB "${CMAKE_CURRENT_LIST_DIR}/thirdparty/arrow/cpp/build/release/l
   CACHE STRING "Path to libarrow_io.a (needs to be changed if arrow is build in debug mode)")
 set(ARROW_IPC_LIB "${CMAKE_CURRENT_LIST_DIR}/thirdparty/arrow/cpp/build/release/libarrow_ipc.a"
   CACHE STRING "Path to libarrow_ipc.a (needs to be changed if arrow is build in debug mode)")
+set(ARROW_PYTHON_LIB "${CMAKE_CURRENT_LIST_DIR}/thirdparty/arrow/cpp/build/release/libarrow_python.a"
+  CACHE STRING "Path to libarrow_python.a (needs to be changed if arrow is build in debug mode)")
 
 include_directories("${ARROW_DIR}/cpp/src/")
 include_directories("cpp/src/")
@@ -53,7 +55,6 @@ endif()
 add_definitions(-fPIC)
 
 add_library(numbuf SHARED
-  cpp/src/numbuf/tensor.cc
   cpp/src/numbuf/dict.cc
   cpp/src/numbuf/sequence.cc
   python/src/pynumbuf/numbuf.cc
@@ -68,15 +69,15 @@ if(APPLE)
 endif(APPLE)
 
 if(APPLE)
-  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
+  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} -lpthread)
 else()
-  target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
+  target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} -lpthread)
 endif()
 
 if(HAS_PLASMA)
-  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread)
+  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread)
 else()
-  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
+  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} -lpthread)
 endif()
 
 install(TARGETS numbuf DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/)
@@ -15,27 +15,18 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
       strings_(pool),
       floats_(pool, std::make_shared<FloatType>()),
       doubles_(pool, std::make_shared<DoubleType>()),
-      uint8_tensors_(std::make_shared<UInt8Type>(), pool),
-      int8_tensors_(std::make_shared<Int8Type>(), pool),
-      uint16_tensors_(std::make_shared<UInt16Type>(), pool),
-      int16_tensors_(std::make_shared<Int16Type>(), pool),
-      uint32_tensors_(std::make_shared<UInt32Type>(), pool),
-      int32_tensors_(std::make_shared<Int32Type>(), pool),
-      uint64_tensors_(std::make_shared<UInt64Type>(), pool),
-      int64_tensors_(std::make_shared<Int64Type>(), pool),
-      float_tensors_(std::make_shared<FloatType>(), pool),
-      double_tensors_(std::make_shared<DoubleType>(), pool),
+      tensor_indices_(pool, std::make_shared<Int32Type>()),
       list_offsets_({0}),
       tuple_offsets_({0}),
       dict_offsets_({0}) {}
 
 #define UPDATE(OFFSET, TAG)                    \
   if (TAG == -1) {                             \
     TAG = num_tags;                            \
     num_tags += 1;                             \
   }                                            \
   RETURN_NOT_OK(offsets_.Append(OFFSET));      \
   RETURN_NOT_OK(types_.Append(TAG));           \
   RETURN_NOT_OK(nones_.AppendToBitmap(true));
 
 Status SequenceBuilder::AppendNone() {
@@ -79,27 +70,10 @@ Status SequenceBuilder::AppendDouble(double data) {
   return doubles_.Append(data);
 }
 
-#define DEF_TENSOR_APPEND(NAME, TYPE, TAG)                                              \
-  Status SequenceBuilder::AppendTensor(const std::vector<int64_t>& dims, TYPE* data) {  \
-    if (TAG == -1) { NAME.Start(); }                                                    \
-    int64_t size = 1;                                                                   \
-    for (auto dim : dims) {                                                             \
-      size *= dim;                                                                      \
-    }                                                                                   \
-    UPDATE(NAME.length(), TAG);                                                         \
-    return NAME.Append(dims, data);                                                     \
-  }
-
-DEF_TENSOR_APPEND(uint8_tensors_, uint8_t, uint8_tensor_tag);
-DEF_TENSOR_APPEND(int8_tensors_, int8_t, int8_tensor_tag);
-DEF_TENSOR_APPEND(uint16_tensors_, uint16_t, uint16_tensor_tag);
-DEF_TENSOR_APPEND(int16_tensors_, int16_t, int16_tensor_tag);
-DEF_TENSOR_APPEND(uint32_tensors_, uint32_t, uint32_tensor_tag);
-DEF_TENSOR_APPEND(int32_tensors_, int32_t, int32_tensor_tag);
-DEF_TENSOR_APPEND(uint64_tensors_, uint64_t, uint64_tensor_tag);
-DEF_TENSOR_APPEND(int64_tensors_, int64_t, int64_tensor_tag);
-DEF_TENSOR_APPEND(float_tensors_, float, float_tensor_tag);
-DEF_TENSOR_APPEND(double_tensors_, double, double_tensor_tag);
+Status SequenceBuilder::AppendTensor(int32_t tensor_index) {
+  UPDATE(tensor_indices_.length(), tensor_tag);
+  return tensor_indices_.Append(tensor_index);
+}
 
 Status SequenceBuilder::AppendList(int32_t size) {
   UPDATE(list_offsets_.size() - 1, list_tag);
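With this hunk the union no longer carries tensor payloads at all: AppendTensor(int32_t) records only the position of the tensor in an out-of-band list. The calling convention, as a sketch lifted from the serialization code later in this diff (`array` stands for the numpy object being appended):

    // The union entry is just an index; the tensor itself travels out of band.
    RETURN_NOT_OK(builder.AppendTensor(tensors_out.size()));
    tensors_out.push_back(reinterpret_cast<PyObject*>(array));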
@@ -158,19 +132,7 @@ Status SequenceBuilder::Finish(std::shared_ptr<Array> list_data,
   ADD_ELEMENT(floats_, float_tag);
   ADD_ELEMENT(doubles_, double_tag);
 
-  ADD_ELEMENT(uint8_tensors_, uint8_tensor_tag);
-  ADD_ELEMENT(int8_tensors_, int8_tensor_tag);
-  ADD_ELEMENT(uint16_tensors_, uint16_tensor_tag);
-  ADD_ELEMENT(int16_tensors_, int16_tensor_tag);
-  ADD_ELEMENT(uint32_tensors_, uint32_tensor_tag);
-  ADD_ELEMENT(int32_tensors_, int32_tensor_tag);
-  ADD_ELEMENT(uint64_tensors_, uint64_tensor_tag);
-  ADD_ELEMENT(int64_tensors_, int64_tensor_tag);
-  ADD_ELEMENT(float_tensors_, float_tensor_tag);
-  ADD_ELEMENT(double_tensors_, double_tensor_tag);
+  ADD_ELEMENT(tensor_indices_, tensor_tag);
 
   ADD_SUBSEQUENCE(list_data, list_offsets_, list_builder, list_tag, "list");
   ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple");
@@ -1,8 +1,8 @@
 #ifndef NUMBUF_LIST_H
 #define NUMBUF_LIST_H
 
-#include "tensor.h"
 #include <arrow/api.h>
 #include <arrow/util/logging.h>
 
 namespace numbuf {
@@ -49,23 +49,9 @@ class SequenceBuilder {
 
   /*! Appending a tensor to the sequence
 
-    \param dims
-      A vector of dimensions
-
-    \param data
-      A pointer to the start of the data block. The length of the data block
-      will be the product of the dimensions
+    \param tensor_index Index of the tensor in the object.
   */
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint8_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, int8_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint16_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, int16_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint32_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, int32_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint64_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, int64_t* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, float* data);
-  arrow::Status AppendTensor(const std::vector<int64_t>& dims, double* data);
+  arrow::Status AppendTensor(int32_t tensor_index);
 
   /*! Add a sublist to the sequence. The data contained in the sublist will be
       specified in the "Finish" method.
@@ -109,16 +95,11 @@ class SequenceBuilder {
   arrow::FloatBuilder floats_;
   arrow::DoubleBuilder doubles_;
 
-  UInt8TensorBuilder uint8_tensors_;
-  Int8TensorBuilder int8_tensors_;
-  UInt16TensorBuilder uint16_tensors_;
-  Int16TensorBuilder int16_tensors_;
-  UInt32TensorBuilder uint32_tensors_;
-  Int32TensorBuilder int32_tensors_;
-  UInt64TensorBuilder uint64_tensors_;
-  Int64TensorBuilder int64_tensors_;
-  FloatTensorBuilder float_tensors_;
-  DoubleTensorBuilder double_tensors_;
+  // We use an Int32Builder here to distinguish the tensor indices from
+  // the ints_ above (see the case Type::INT32 in get_value in python.cc).
+  // TODO(pcm): Replace this by using the union tags to distinguish between
+  // these two cases.
+  arrow::Int32Builder tensor_indices_;
 
   std::vector<int32_t> list_offsets_;
   std::vector<int32_t> tuple_offsets_;
@@ -131,17 +112,7 @@ class SequenceBuilder {
   int8_t float_tag = -1;
   int8_t double_tag = -1;
 
-  int8_t uint8_tensor_tag = -1;
-  int8_t int8_tensor_tag = -1;
-  int8_t uint16_tensor_tag = -1;
-  int8_t int16_tensor_tag = -1;
-  int8_t uint32_tensor_tag = -1;
-  int8_t int32_tensor_tag = -1;
-  int8_t uint64_tensor_tag = -1;
-  int8_t int64_tensor_tag = -1;
-  int8_t float_tensor_tag = -1;
-  int8_t double_tensor_tag = -1;
-
+  int8_t tensor_tag = -1;
   int8_t list_tag = -1;
   int8_t tuple_tag = -1;
   int8_t dict_tag = -1;
@@ -1,56 +0,0 @@
-#include "tensor.h"
-
-using namespace arrow;
-
-namespace numbuf {
-
-template <typename T>
-TensorBuilder<T>::TensorBuilder(const TypePtr& dtype, MemoryPool* pool)
-    : dtype_(dtype), pool_(pool) {}
-
-template <typename T>
-Status TensorBuilder<T>::Start() {
-  dim_data_ = std::make_shared<Int64Builder>(pool_, std::make_shared<Int64Type>());
-  dims_ = std::make_shared<ListBuilder>(pool_, dim_data_);
-  value_data_ = std::make_shared<PrimitiveBuilder<T>>(pool_, dtype_);
-  values_ = std::make_shared<ListBuilder>(pool_, value_data_);
-  auto dims_field = std::make_shared<Field>("dims", dims_->type());
-  auto values_field = std::make_shared<Field>("data", values_->type());
-  auto type =
-      std::make_shared<StructType>(std::vector<FieldPtr>({dims_field, values_field}));
-  tensors_ = std::make_shared<StructBuilder>(
-      pool_, type, std::vector<std::shared_ptr<ArrayBuilder>>({dims_, values_}));
-  return Status::OK();
-}
-
-template <typename T>
-Status TensorBuilder<T>::Append(const std::vector<int64_t>& dims, const elem_type* data) {
-  DCHECK(tensors_);
-  RETURN_NOT_OK(tensors_->Append());
-  RETURN_NOT_OK(dims_->Append());
-  RETURN_NOT_OK(values_->Append());
-  int64_t size = 1;
-  for (auto dim : dims) {
-    size *= dim;
-    RETURN_NOT_OK(dim_data_->Append(dim));
-  }
-  RETURN_NOT_OK(value_data_->Append(data, size));
-  return Status::OK();  // tensors_->Append();
-}
-
-template <typename T>
-Status TensorBuilder<T>::Finish(std::shared_ptr<Array>* out) {
-  return tensors_->Finish(out);
-}
-
-template class TensorBuilder<UInt8Type>;
-template class TensorBuilder<Int8Type>;
-template class TensorBuilder<UInt16Type>;
-template class TensorBuilder<Int16Type>;
-template class TensorBuilder<UInt32Type>;
-template class TensorBuilder<Int32Type>;
-template class TensorBuilder<UInt64Type>;
-template class TensorBuilder<Int64Type>;
-template class TensorBuilder<FloatType>;
-template class TensorBuilder<DoubleType>;
-}
@@ -1,65 +0,0 @@
-#ifndef NUMBUF_TENSOR_H
-#define NUMBUF_TENSOR_H
-
-#include <arrow/api.h>
-#include <arrow/util/logging.h>
-#include <memory>
-
-namespace numbuf {
-
-/*! This is a class for building a dataframe where each row corresponds to
-    a Tensor (= multidimensional array) of numerical data. There are two
-    columns, "dims" which contains an array of dimensions for each Tensor
-    and "data" which contains data buffer of the Tensor as a flattened array.
-*/
-template <typename T>
-class TensorBuilder {
- public:
-  typedef typename T::c_type elem_type;
-
-  TensorBuilder(const arrow::TypePtr& dtype, arrow::MemoryPool* pool = nullptr);
-
-  arrow::Status Start();
-
-  /*! Append a new tensor.
-
-      \param dims
-        The dimensions of the Tensor
-
-      \param data
-        Pointer to the beginning of the data buffer of the Tensor. The
-        total length of the buffer is sizeof(elem_type) * product of dims[i] over i
-  */
-  arrow::Status Append(const std::vector<int64_t>& dims, const elem_type* data);
-
-  //! Convert the tensors to an Arrow StructArray
-  arrow::Status Finish(std::shared_ptr<arrow::Array>* out);
-
-  //! Number of tensors in the column
-  int32_t length() { return tensors_->length(); }
-
-  std::shared_ptr<arrow::DataType> type() { return tensors_->type(); }
-
- private:
-  arrow::TypePtr dtype_;
-  arrow::MemoryPool* pool_;
-  std::shared_ptr<arrow::Int64Builder> dim_data_;
-  std::shared_ptr<arrow::ListBuilder> dims_;
-  std::shared_ptr<arrow::PrimitiveBuilder<T>> value_data_;
-  std::shared_ptr<arrow::ListBuilder> values_;
-  std::shared_ptr<arrow::StructBuilder> tensors_;
-};
-
-typedef TensorBuilder<arrow::UInt8Type> UInt8TensorBuilder;
-typedef TensorBuilder<arrow::Int8Type> Int8TensorBuilder;
-typedef TensorBuilder<arrow::UInt16Type> UInt16TensorBuilder;
-typedef TensorBuilder<arrow::Int16Type> Int16TensorBuilder;
-typedef TensorBuilder<arrow::UInt32Type> UInt32TensorBuilder;
-typedef TensorBuilder<arrow::Int32Type> Int32TensorBuilder;
-typedef TensorBuilder<arrow::UInt64Type> UInt64TensorBuilder;
-typedef TensorBuilder<arrow::Int64Type> Int64TensorBuilder;
-typedef TensorBuilder<arrow::FloatType> FloatTensorBuilder;
-typedef TensorBuilder<arrow::DoubleType> DoubleTensorBuilder;
-}
-
-#endif  // NUMBUF_TENSOR_H
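tensor.cc and tensor.h are deleted outright: the dims/data struct-of-lists encoding that TensorBuilder maintained is subsumed by arrow::Tensor, which already carries a dtype, a shape, and a single contiguous buffer. A hedged sketch of the replacement, assuming the Arrow C++ API of this vintage (shape()/data() accessors):

    // What used to be TensorBuilder::Append(dims, data) becomes a zero-copy wrap:
    std::shared_ptr<arrow::Tensor> tensor;
    ARROW_CHECK_OK(arrow::py::NdarrayToTensor(NULL, ndarray, &tensor));
    // tensor->shape() and tensor->data() now describe the numpy array directly.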
@@ -3,7 +3,7 @@
 
 #include <sstream>
 
-#include <numbuf/tensor.h>
+#include <arrow/python/numpy_convert.h>
 
 using namespace arrow;
 
@@ -14,48 +14,11 @@ extern PyObject* numbuf_deserialize_callback;
 
 namespace numbuf {
 
-#define ARROW_TYPE_TO_NUMPY_CASE(TYPE) \
-  case Type::TYPE:                     \
-    return NPY_##TYPE;
-
-#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type)                                   \
-  case Type::TYPE: {                                                                    \
-    auto values = std::dynamic_pointer_cast<ArrayType>(content->values());              \
-    DCHECK(values);                                                                     \
-    type* data = const_cast<type*>(values->raw_data()) + content->value_offset(offset); \
-    *out = PyArray_SimpleNewFromData(                                                   \
-        num_dims, dim.data(), NPY_##TYPE, reinterpret_cast<void*>(data));               \
-    if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); }         \
-    Py_XINCREF(base);                                                                   \
-  } break;
-
-Status DeserializeArray(
-    std::shared_ptr<Array> array, int32_t offset, PyObject* base, PyObject** out) {
+Status DeserializeArray(std::shared_ptr<Array> array, int32_t offset, PyObject* base,
+    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out) {
   DCHECK(array);
-  auto tensor = std::dynamic_pointer_cast<StructArray>(array);
-  DCHECK(tensor);
-  auto dims = std::dynamic_pointer_cast<ListArray>(tensor->field(0));
-  auto content = std::dynamic_pointer_cast<ListArray>(tensor->field(1));
-  npy_intp num_dims = dims->value_length(offset);
-  std::vector<npy_intp> dim(num_dims);
-  for (int i = dims->value_offset(offset); i < dims->value_offset(offset + 1); ++i) {
-    dim[i - dims->value_offset(offset)] =
-        std::dynamic_pointer_cast<Int64Array>(dims->values())->Value(i);
-  }
-  switch (content->value_type()->type) {
-    DESERIALIZE_ARRAY_CASE(INT8, Int8Array, int8_t)
-    DESERIALIZE_ARRAY_CASE(INT16, Int16Array, int16_t)
-    DESERIALIZE_ARRAY_CASE(INT32, Int32Array, int32_t)
-    DESERIALIZE_ARRAY_CASE(INT64, Int64Array, int64_t)
-    DESERIALIZE_ARRAY_CASE(UINT8, UInt8Array, uint8_t)
-    DESERIALIZE_ARRAY_CASE(UINT16, UInt16Array, uint16_t)
-    DESERIALIZE_ARRAY_CASE(UINT32, UInt32Array, uint32_t)
-    DESERIALIZE_ARRAY_CASE(UINT64, UInt64Array, uint64_t)
-    DESERIALIZE_ARRAY_CASE(FLOAT, FloatArray, float)
-    DESERIALIZE_ARRAY_CASE(DOUBLE, DoubleArray, double)
-    default:
-      DCHECK(false) << "arrow type not recognized: " << content->value_type()->type;
-  }
+  int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset);
+  RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
+  /* Mark the array as immutable. */
+  PyObject* flags = PyObject_GetAttrString(*out, "flags");
+  DCHECK(flags != NULL) << "Could not mark Numpy array immutable";
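Deserialization becomes a lookup plus a zero-copy view: the Int32 stored in the union indexes into the tensor list, and py::TensorToNdarray builds a numpy array over the shared Arrow buffer. Because that memory may be shared, the array is marked read-only. The closing lines of the function fall outside the hunk; the final attribute write below is a guess at that elided context, not part of the diff:

    PyObject* flags = PyObject_GetAttrString(*out, "flags");
    DCHECK(flags != NULL) << "Could not mark Numpy array immutable";
    // Hypothetical continuation (not shown above): clear the writeable flag.
    // PyObject_SetAttrString(flags, "writeable", Py_False);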
@@ -65,51 +28,23 @@ Status DeserializeArray(
   return Status::OK();
 }
 
-Status SerializeArray(
-    PyArrayObject* array, SequenceBuilder& builder, std::vector<PyObject*>& subdicts) {
-  size_t ndim = PyArray_NDIM(array);
+Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
+    std::vector<PyObject*>& subdicts, std::vector<PyObject*>& tensors_out) {
   int dtype = PyArray_TYPE(array);
-  std::vector<int64_t> dims(ndim);
-  for (int i = 0; i < ndim; ++i) {
-    dims[i] = PyArray_DIM(array, i);
-  }
-  // TODO(pcm): Once we don't use builders any more below and directly share
-  // the memory buffer, we need to be more careful about this and not
-  // decrease the reference count of "contiguous" before the serialization
-  // is finished
-  auto contiguous = PyArray_GETCONTIGUOUS(array);
-  auto data = PyArray_DATA(contiguous);
   switch (dtype) {
     case NPY_UINT8:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<uint8_t*>(data)));
-      break;
     case NPY_INT8:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<int8_t*>(data)));
-      break;
     case NPY_UINT16:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<uint16_t*>(data)));
-      break;
     case NPY_INT16:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<int16_t*>(data)));
-      break;
     case NPY_UINT32:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<uint32_t*>(data)));
-      break;
     case NPY_INT32:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<int32_t*>(data)));
-      break;
     case NPY_UINT64:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<uint64_t*>(data)));
-      break;
     case NPY_INT64:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<int64_t*>(data)));
-      break;
     case NPY_FLOAT:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<float*>(data)));
-      break;
-    case NPY_DOUBLE:
-      RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast<double*>(data)));
-      break;
+    case NPY_DOUBLE: {
+      RETURN_NOT_OK(builder.AppendTensor(tensors_out.size()));
+      tensors_out.push_back(reinterpret_cast<PyObject*>(array));
+    } break;
     default:
       if (!numbuf_serialize_callback) {
         std::stringstream stream;
@@ -126,7 +61,6 @@ Status SerializeArray(
         subdicts.push_back(result);
       }
   }
-  Py_XDECREF(contiguous);
   return Status::OK();
 }
 }
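Note the shape of the new switch in SerializeArray: every supported numeric dtype now falls through to a single branch that defers the payload, and only unrecognized dtypes still reach the user serialization callback. Equivalent control flow, condensed into a sketch (not the literal file contents):

    switch (PyArray_TYPE(array)) {
      case NPY_UINT8:  case NPY_INT8:
      case NPY_UINT16: case NPY_INT16:
      case NPY_UINT32: case NPY_INT32:
      case NPY_UINT64: case NPY_INT64:
      case NPY_FLOAT:  case NPY_DOUBLE: {
        // One branch for all dtypes: stash the array, append only its index.
        RETURN_NOT_OK(builder.AppendTensor(tensors_out.size()));
        tensors_out.push_back(reinterpret_cast<PyObject*>(array));
      } break;
      default:
        // fall back to numbuf_serialize_callback, as before
        break;
    }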
@@ -6,18 +6,18 @@
 #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
 #define NO_IMPORT_ARRAY
-#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
+#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API
 #include <numpy/arrayobject.h>
 
 #include <numbuf/sequence.h>
-#include <numbuf/tensor.h>
 
 namespace numbuf {
 
-arrow::Status SerializeArray(
-    PyArrayObject* array, SequenceBuilder& builder, std::vector<PyObject*>& subdicts);
-arrow::Status DeserializeArray(
-    std::shared_ptr<arrow::Array> array, int32_t offset, PyObject* base, PyObject** out);
+arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
+    std::vector<PyObject*>& subdicts, std::vector<PyObject*>& tensors_out);
+arrow::Status DeserializeArray(std::shared_ptr<arrow::Array> array, int32_t offset,
+    PyObject* base, const std::vector<std::shared_ptr<arrow::Tensor>>& tensors,
+    PyObject** out);
 }
 
 #endif
@@ -21,7 +21,7 @@ namespace numbuf {
 #endif
 
 Status get_value(std::shared_ptr<Array> arr, int32_t index, int32_t type, PyObject* base,
-    PyObject** result) {
+    const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) {
   switch (arr->type()->type) {
     case Type::BOOL:
       *result =
@@ -57,17 +57,23 @@ Status get_value(std::shared_ptr<Array> arr, int32_t index, int32_t type, PyObje
       auto l = std::static_pointer_cast<ListArray>(s->field(0));
       if (s->type()->child(0)->name == "list") {
         return DeserializeList(l->values(), l->value_offset(index),
-            l->value_offset(index + 1), base, result);
+            l->value_offset(index + 1), base, tensors, result);
       } else if (s->type()->child(0)->name == "tuple") {
         return DeserializeTuple(l->values(), l->value_offset(index),
-            l->value_offset(index + 1), base, result);
+            l->value_offset(index + 1), base, tensors, result);
       } else if (s->type()->child(0)->name == "dict") {
         return DeserializeDict(l->values(), l->value_offset(index),
-            l->value_offset(index + 1), base, result);
+            l->value_offset(index + 1), base, tensors, result);
       } else {
-        return DeserializeArray(arr, index, base, result);
+        DCHECK(false) << "error";
       }
     }
+    // We use an Int32Builder here to distinguish the tensor indices from
+    // the Type::INT64 above (see tensor_indices_ in sequence.h).
+    case Type::INT32: {
+      int32_t val = std::static_pointer_cast<Int32Array>(arr)->Value(index);
+      return DeserializeArray(arr, index, base, tensors, result);
+    }
     default:
       DCHECK(false) << "union tag not recognized " << type;
   }
@@ -75,7 +81,8 @@ Status get_value(std::shared_ptr<Array> arr, int32_t index, int32_t type, PyObje
 }
 
 Status append(PyObject* elem, SequenceBuilder& builder, std::vector<PyObject*>& sublists,
-    std::vector<PyObject*>& subtuples, std::vector<PyObject*>& subdicts) {
+    std::vector<PyObject*>& subtuples, std::vector<PyObject*>& subdicts,
+    std::vector<PyObject*>& tensors_out) {
   // The bool case must precede the int case (PyInt_Check passes for bools)
   if (PyBool_Check(elem)) {
     RETURN_NOT_OK(builder.AppendBool(elem == Py_True));
@@ -119,7 +126,7 @@ Status append(PyObject* elem, SequenceBuilder& builder, std::vector<PyObject*>&
   } else if (PyArray_IsScalar(elem, Generic)) {
     RETURN_NOT_OK(AppendScalar(elem, builder));
   } else if (PyArray_Check(elem)) {
-    RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts));
+    RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts, tensors_out));
   } else if (elem == Py_None) {
     RETURN_NOT_OK(builder.AppendNone());
   } else {
@@ -143,7 +150,7 @@ Status append(PyObject* elem, SequenceBuilder& builder, std::vector<PyObject*>&
 }
 
 Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth,
-    std::shared_ptr<Array>* out) {
+    std::shared_ptr<Array>* out, std::vector<PyObject*>& tensors_out) {
   DCHECK(out);
   if (recursion_depth >= MAX_RECURSION_DEPTH) {
     return Status::NotImplemented(
@@ -156,7 +163,7 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
     PyObject* item;
     PyObject* iterator = PyObject_GetIter(sequence);
     while ((item = PyIter_Next(iterator))) {
-      Status s = append(item, builder, sublists, subtuples, subdicts);
+      Status s = append(item, builder, sublists, subtuples, subdicts, tensors_out);
       Py_DECREF(item);
      // if an error occurs, we need to decrement the reference counts before returning
       if (!s.ok()) {
@@ -168,15 +175,16 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
   }
   std::shared_ptr<Array> list;
   if (sublists.size() > 0) {
-    RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list));
+    RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list, tensors_out));
   }
   std::shared_ptr<Array> tuple;
   if (subtuples.size() > 0) {
-    RETURN_NOT_OK(SerializeSequences(subtuples, recursion_depth + 1, &tuple));
+    RETURN_NOT_OK(
+        SerializeSequences(subtuples, recursion_depth + 1, &tuple, tensors_out));
   }
   std::shared_ptr<Array> dict;
   if (subdicts.size() > 0) {
-    RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict));
+    RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out));
   }
   return builder.Finish(list, tuple, dict, out);
 }
@@ -196,7 +204,7 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
       int8_t type = types->Value(i);                                      \
       std::shared_ptr<Array> arr = data->child(type);                     \
       PyObject* value;                                                    \
-      RETURN_NOT_OK(get_value(arr, offset, type, base, &value));          \
+      RETURN_NOT_OK(get_value(arr, offset, type, base, tensors, &value)); \
       SET_ITEM(result, i - start_idx, value);                             \
     }                                                                     \
   }                                                                       \
@@ -204,17 +212,17 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
   return Status::OK();
 
 Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
-    PyObject* base, PyObject** out) {
+    PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out) {
   DESERIALIZE_SEQUENCE(PyList_New, PyList_SetItem)
 }
 
 Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
-    PyObject* base, PyObject** out) {
+    PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out) {
   DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SetItem)
 }
 
-Status SerializeDict(
-    std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr<Array>* out) {
+Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
+    std::shared_ptr<Array>* out, std::vector<PyObject*>& tensors_out) {
   DictBuilder result;
   if (recursion_depth >= MAX_RECURSION_DEPTH) {
     return Status::NotImplemented(
@@ -226,26 +234,31 @@ Status SerializeDict(
     PyObject *key, *value;
     Py_ssize_t pos = 0;
     while (PyDict_Next(dict, &pos, &key, &value)) {
-      RETURN_NOT_OK(append(key, result.keys(), dummy, key_tuples, dummy));
+      RETURN_NOT_OK(append(key, result.keys(), dummy, key_tuples, dummy, tensors_out));
       DCHECK(dummy.size() == 0);
-      RETURN_NOT_OK(append(value, result.vals(), val_lists, val_tuples, val_dicts));
+      RETURN_NOT_OK(
+          append(value, result.vals(), val_lists, val_tuples, val_dicts, tensors_out));
     }
   }
   std::shared_ptr<Array> key_tuples_arr;
   if (key_tuples.size() > 0) {
-    RETURN_NOT_OK(SerializeSequences(key_tuples, recursion_depth + 1, &key_tuples_arr));
+    RETURN_NOT_OK(SerializeSequences(
+        key_tuples, recursion_depth + 1, &key_tuples_arr, tensors_out));
   }
   std::shared_ptr<Array> val_list_arr;
   if (val_lists.size() > 0) {
-    RETURN_NOT_OK(SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr));
+    RETURN_NOT_OK(
+        SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr, tensors_out));
  }
   std::shared_ptr<Array> val_tuples_arr;
   if (val_tuples.size() > 0) {
-    RETURN_NOT_OK(SerializeSequences(val_tuples, recursion_depth + 1, &val_tuples_arr));
+    RETURN_NOT_OK(SerializeSequences(
+        val_tuples, recursion_depth + 1, &val_tuples_arr, tensors_out));
   }
   std::shared_ptr<Array> val_dict_arr;
   if (val_dicts.size() > 0) {
-    RETURN_NOT_OK(SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr));
+    RETURN_NOT_OK(
+        SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out));
   }
   result.Finish(key_tuples_arr, val_list_arr, val_tuples_arr, val_dict_arr, out);
@@ -266,13 +279,15 @@ Status SerializeDict(
 }
 
 Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
-    PyObject* base, PyObject** out) {
+    PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out) {
   auto data = std::dynamic_pointer_cast<StructArray>(array);
   // TODO(pcm): error handling, get rid of the temporary copy of the list
   PyObject *keys, *vals;
   PyObject* result = PyDict_New();
-  ARROW_RETURN_NOT_OK(DeserializeList(data->field(0), start_idx, stop_idx, base, &keys));
-  ARROW_RETURN_NOT_OK(DeserializeList(data->field(1), start_idx, stop_idx, base, &vals));
+  ARROW_RETURN_NOT_OK(
+      DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, &keys));
+  ARROW_RETURN_NOT_OK(
+      DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, &vals));
   for (size_t i = start_idx; i < stop_idx; ++i) {
     PyDict_SetItem(
         result, PyList_GetItem(keys, i - start_idx), PyList_GetItem(vals, i - start_idx));
@@ -12,15 +12,19 @@
 namespace numbuf {
 
 arrow::Status SerializeSequences(std::vector<PyObject*> sequences,
-    int32_t recursion_depth, std::shared_ptr<arrow::Array>* out);
+    int32_t recursion_depth, std::shared_ptr<arrow::Array>* out,
+    std::vector<PyObject*>& tensors_out);
 arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
-    std::shared_ptr<arrow::Array>* out);
+    std::shared_ptr<arrow::Array>* out, std::vector<PyObject*>& tensors_out);
 arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start_idx,
-    int32_t stop_idx, PyObject* base, PyObject** out);
+    int32_t stop_idx, PyObject* base,
+    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out);
 arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx,
-    int32_t stop_idx, PyObject* base, PyObject** out);
+    int32_t stop_idx, PyObject* base,
+    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out);
 arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx,
-    int32_t stop_idx, PyObject* base, PyObject** out);
+    int32_t stop_idx, PyObject* base,
+    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out);
 }
 
 #endif
@@ -6,7 +6,7 @@
 #include <Python.h>
 #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
 #define NO_IMPORT_ARRAY
-#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
+#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API
 #include <numpy/arrayobject.h>
 #include <numpy/arrayscalars.h>
 
@@ -113,8 +113,7 @@ class FixedBufferStream : public arrow::io::OutputStream,
 
   arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override {
     DCHECK(out);
-    DCHECK(position_ + nbytes <= size_) << "position: " << position_
-                                        << " nbytes: " << nbytes << "size: " << size_;
+    if (position_ + nbytes > size_) { return arrow::Status::IOError("EOF"); }
     *out = std::make_shared<arrow::Buffer>(data_ + position_, nbytes);
     position_ += nbytes;
     return arrow::Status::OK();
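Turning the out-of-bounds DCHECK into a recoverable Status::IOError is what lets the reader probe for trailing tensors: read_batch_and_tensors (in numbuf.cc below) calls ipc::ReadTensor in a loop and stops at the first error, i.e. at the end of the buffer. The consuming loop, quoted as a sketch:

    int64_t offset = batch_size;  // tensors start right after the RecordBatch
    while (true) {
      std::shared_ptr<Tensor> tensor;
      Status s = ipc::ReadTensor(offset, source.get(), &tensor);
      if (!s.ok()) { break; }  // IOError("EOF"): no more tensors
      tensors_out.push_back(tensor);
      RETURN_NOT_OK(source->Tell(&offset));
    }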
@@ -1,6 +1,6 @@
 #include <Python.h>
 #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
-#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
+#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API
 #include <numpy/arrayobject.h>
 
 #include "bytesobject.h"
@@ -9,6 +9,9 @@
 
 #include <arrow/api.h>
 #include <arrow/ipc/api.h>
+#include <arrow/ipc/writer.h>
+
+#include <arrow/python/numpy_convert.h>
 
 #include "adapters/python.h"
 #include "memory.h"
@@ -27,39 +30,65 @@ PyObject* NumbufPlasmaObjectExistsError;
 #endif
 
+using namespace arrow;
+using namespace numbuf;
+
+struct RayObject {
+  std::shared_ptr<RecordBatch> batch;
+  std::vector<PyObject*> arrays;
+  std::vector<std::shared_ptr<Tensor>> tensors;
+};
+
 // Each arrow object is stored in the format
 // | length of the object in bytes | object data |.
 // LENGTH_PREFIX_SIZE is the number of bytes occupied by the
 // object length field.
 constexpr int64_t LENGTH_PREFIX_SIZE = sizeof(int64_t);
 
-using namespace arrow;
-using namespace numbuf;
-
 std::shared_ptr<RecordBatch> make_batch(std::shared_ptr<Array> data) {
   auto field = std::make_shared<Field>("list", data->type());
   std::shared_ptr<Schema> schema(new Schema({field}));
   return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
 }
 
-Status get_batch_size(std::shared_ptr<RecordBatch> batch, int64_t* size) {
-  // Determine the size of the file by writing to a mock file.
-  auto mock = std::make_shared<MockBufferStream>();
+Status write_batch_and_tensors(io::OutputStream* stream,
+    std::shared_ptr<RecordBatch> batch, const std::vector<PyObject*>& tensors,
+    int64_t* batch_size, int64_t* total_size) {
   std::shared_ptr<arrow::ipc::FileWriter> writer;
-  RETURN_NOT_OK(ipc::FileWriter::Open(mock.get(), batch->schema(), &writer));
+  RETURN_NOT_OK(ipc::FileWriter::Open(stream, batch->schema(), &writer));
   RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true));
   RETURN_NOT_OK(writer->Close());
-  RETURN_NOT_OK(mock->Tell(size));
+  RETURN_NOT_OK(stream->Tell(batch_size));
+  for (auto array : tensors) {
+    int32_t metadata_length;
+    int64_t body_length;
+    std::shared_ptr<Tensor> tensor;
+    auto contiguous = (PyObject*)PyArray_GETCONTIGUOUS((PyArrayObject*)array);
+    RETURN_NOT_OK(py::NdarrayToTensor(NULL, contiguous, &tensor));
+    RETURN_NOT_OK(ipc::WriteTensor(*tensor, stream, &metadata_length, &body_length));
+    Py_XDECREF(contiguous);
+  }
+  RETURN_NOT_OK(stream->Tell(total_size));
   return Status::OK();
 }
 
-Status read_batch(uint8_t* data, int64_t size, std::shared_ptr<RecordBatch>* batch_out) {
+Status read_batch_and_tensors(uint8_t* data, int64_t size,
+    std::shared_ptr<RecordBatch>* batch_out,
+    std::vector<std::shared_ptr<Tensor>>& tensors_out) {
   std::shared_ptr<arrow::ipc::FileReader> reader;
+  int64_t batch_size = *((int64_t*)data);
   auto source = std::make_shared<FixedBufferStream>(
       LENGTH_PREFIX_SIZE + data, size - LENGTH_PREFIX_SIZE);
-  int64_t data_size = *((int64_t*)data);
-  RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, data_size, &reader));
+  RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, batch_size, &reader));
   RETURN_NOT_OK(reader->GetRecordBatch(0, batch_out));
+  int64_t offset = batch_size;
+  while (true) {
+    std::shared_ptr<Tensor> tensor;
+    Status s = ipc::ReadTensor(offset, source.get(), &tensor);
+    if (!s.ok()) { break; }
+    tensors_out.push_back(tensor);
+    RETURN_NOT_OK(source->Tell(&offset));
+  }
   return Status::OK();
 }
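Taken together, write_batch_and_tensors and read_batch_and_tensors define the object layout: an int64 prefix recording where the RecordBatch block ends, the RecordBatch in Arrow file format, then the tensors back to back until the end of the buffer. Schematically (a sketch consistent with the code above, not an authoritative spec):

    | int64 batch size | RecordBatch (Arrow file format) | Tensor 0 | Tensor 1 | ... |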
@@ -81,10 +110,9 @@ static PyObject* NumbufError;
 PyObject* numbuf_serialize_callback = NULL;
 PyObject* numbuf_deserialize_callback = NULL;
 
-int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch>** result) {
+int PyObjectToArrow(PyObject* object, RayObject** result) {
   if (PyCapsule_IsValid(object, "arrow")) {
-    *result = reinterpret_cast<std::shared_ptr<RecordBatch>*>(
-        PyCapsule_GetPointer(object, "arrow"));
+    *result = reinterpret_cast<RayObject*>(PyCapsule_GetPointer(object, "arrow"));
     return 1;
   } else {
     PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
@@ -93,8 +121,7 @@ int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch>** result) {
 }
 
 static void ArrowCapsule_Destructor(PyObject* capsule) {
-  delete reinterpret_cast<std::shared_ptr<RecordBatch>*>(
-      PyCapsule_GetPointer(capsule, "arrow"));
+  delete reinterpret_cast<RayObject*>(PyCapsule_GetPointer(capsule, "arrow"));
 }
 
 /* Documented in doc/numbuf.rst in ray-core */
@@ -103,21 +130,31 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
   if (!PyArg_ParseTuple(args, "O", &value)) { return NULL; }
   std::shared_ptr<Array> array;
   if (PyList_Check(value)) {
+    RayObject* object = new RayObject();
     int32_t recursion_depth = 0;
-    Status s =
-        SerializeSequences(std::vector<PyObject*>({value}), recursion_depth, &array);
+    Status s = SerializeSequences(
+        std::vector<PyObject*>({value}), recursion_depth, &array, object->arrays);
     CHECK_SERIALIZATION_ERROR(s);
 
-    auto batch = new std::shared_ptr<RecordBatch>();
-    *batch = make_batch(array);
+    for (auto array : object->arrays) {
+      int32_t metadata_length;
+      int64_t body_length;
+      std::shared_ptr<Tensor> tensor;
+      ARROW_CHECK_OK(py::NdarrayToTensor(NULL, array, &tensor));
+      object->tensors.push_back(tensor);
+    }
 
-    int64_t size;
-    ARROW_CHECK_OK(get_batch_size(*batch, &size));
+    object->batch = make_batch(array);
+
+    int64_t data_size, total_size;
+    auto mock = std::make_shared<MockBufferStream>();
+    write_batch_and_tensors(
+        mock.get(), object->batch, object->arrays, &data_size, &total_size);
 
     PyObject* r = PyTuple_New(2);
-    PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + size));
-    PyTuple_SetItem(r, 1,
-        PyCapsule_New(reinterpret_cast<void*>(batch), "arrow", &ArrowCapsule_Destructor));
+    PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + total_size));
+    PyTuple_SetItem(r, 1, PyCapsule_New(reinterpret_cast<void*>(object), "arrow",
+                              &ArrowCapsule_Destructor));
     return r;
   }
   return NULL;
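serialize_list now hands back a RayObject rather than a bare RecordBatch pointer: the capsule bundles the union-encoded batch, pointers to the original numpy arrays (so write_to_buffer can re-serialize them), and the arrow::Tensor wrappers built from them. A sketch of the ownership story, restating declarations from this diff:

    struct RayObject {
      std::shared_ptr<RecordBatch> batch;            // union-encoded sequence
      std::vector<PyObject*> arrays;                 // numpy arrays, referenced by index
      std::vector<std::shared_ptr<Tensor>> tensors;  // zero-copy wrappers
    };
    // The capsule returned to Python deletes the RayObject when collected:
    // PyCapsule_New(reinterpret_cast<void*>(object), "arrow", &ArrowCapsule_Destructor);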
@@ -125,9 +162,9 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
 
 /* Documented in doc/numbuf.rst in ray-core */
 static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
-  std::shared_ptr<RecordBatch>* batch;
+  RayObject* object;
   PyObject* memoryview;
-  if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) {
+  if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &object, &memoryview)) {
     return NULL;
   }
   if (!PyMemoryView_Check(memoryview)) { return NULL; }
@@ -135,10 +172,9 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
   auto target = std::make_shared<FixedBufferStream>(
       LENGTH_PREFIX_SIZE + reinterpret_cast<uint8_t*>(buffer->buf),
       buffer->len - LENGTH_PREFIX_SIZE);
-  std::shared_ptr<arrow::ipc::FileWriter> writer;
-  ARROW_CHECK_OK(ipc::FileWriter::Open(target.get(), (*batch)->schema(), &writer));
-  ARROW_CHECK_OK(writer->WriteRecordBatch(*(*batch), true));
-  ARROW_CHECK_OK(writer->Close());
+  int64_t batch_size, total_size;
+  ARROW_CHECK_OK(write_batch_and_tensors(
+      target.get(), object->batch, object->arrays, &batch_size, &total_size));
   *((int64_t*)buffer->buf) = buffer->len - LENGTH_PREFIX_SIZE;
   Py_RETURN_NONE;
 }
@@ -150,20 +186,22 @@ static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
 
   Py_buffer* data_buffer = PyMemoryView_GET_BUFFER(data_memoryview);
 
-  auto batch = new std::shared_ptr<arrow::RecordBatch>();
-  ARROW_CHECK_OK(
-      read_batch(reinterpret_cast<uint8_t*>(data_buffer->buf), data_buffer->len, batch));
+  RayObject* object = new RayObject();
+  ARROW_CHECK_OK(read_batch_and_tensors(reinterpret_cast<uint8_t*>(data_buffer->buf),
+      data_buffer->len, &object->batch, object->tensors));
 
-  return PyCapsule_New(reinterpret_cast<void*>(batch), "arrow", &ArrowCapsule_Destructor);
+  return PyCapsule_New(
+      reinterpret_cast<void*>(object), "arrow", &ArrowCapsule_Destructor);
 }
 
 /* Documented in doc/numbuf.rst in ray-core */
 static PyObject* deserialize_list(PyObject* self, PyObject* args) {
-  std::shared_ptr<RecordBatch>* data;
+  RayObject* object;
   PyObject* base = Py_None;
-  if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &data, &base)) { return NULL; }
+  if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &object, &base)) { return NULL; }
   PyObject* result;
-  Status s = DeserializeList((*data)->column(0), 0, (*data)->num_rows(), base, &result);
+  Status s = DeserializeList(object->batch->column(0), 0, object->batch->num_rows(), base,
+      object->tensors, &result);
   CHECK_SERIALIZATION_ERROR(s);
   return result;
 }
@@ -247,12 +285,16 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
 
   std::shared_ptr<Array> array;
   int32_t recursion_depth = 0;
-  Status s = SerializeSequences(std::vector<PyObject*>({value}), recursion_depth, &array);
+  std::vector<PyObject*> tensors;
+  Status s = SerializeSequences(
+      std::vector<PyObject*>({value}), recursion_depth, &array, tensors);
   CHECK_SERIALIZATION_ERROR(s);
 
   std::shared_ptr<RecordBatch> batch = make_batch(array);
-  int64_t size;
-  ARROW_CHECK_OK(get_batch_size(batch, &size));
+
+  int64_t data_size, total_size;
+  auto mock = std::make_shared<MockBufferStream>();
+  write_batch_and_tensors(mock.get(), batch, tensors, &data_size, &total_size);
 
   uint8_t* data;
   /* The arrow schema is stored as the metadata of the plasma object and
@@ -260,7 +302,8 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
    * stored in the plasma data buffer. The header end offset is stored in
    * the first LENGTH_PREFIX_SIZE bytes of the data buffer. The RecordBatch
    * data is stored after that. */
-  int error_code = plasma_create(conn, obj_id, LENGTH_PREFIX_SIZE + size, NULL, 0, &data);
+  int error_code =
+      plasma_create(conn, obj_id, LENGTH_PREFIX_SIZE + total_size, NULL, 0, &data);
   if (error_code == PlasmaError_ObjectExists) {
     PyErr_SetString(NumbufPlasmaObjectExistsError,
         "An object with this ID already exists in the plasma "
@@ -275,12 +318,10 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
   }
   CHECK(error_code == PlasmaError_OK);
 
-  auto target = std::make_shared<FixedBufferStream>(LENGTH_PREFIX_SIZE + data, size);
-  std::shared_ptr<arrow::ipc::FileWriter> writer;
-  ipc::FileWriter::Open(target.get(), batch->schema(), &writer);
-  writer->WriteRecordBatch(*batch, true);
-  writer->Close();
-  *((int64_t*)data) = size;
+  auto target =
+      std::make_shared<FixedBufferStream>(LENGTH_PREFIX_SIZE + data, total_size);
+  write_batch_and_tensors(target.get(), batch, tensors, &data_size, &total_size);
+  *((int64_t*)data) = data_size;
 
   /* Do the plasma_release corresponding to the call to plasma_create. */
   plasma_release(conn, obj_id);
@@ -347,11 +388,13 @@ static PyObject* retrieve_list(PyObject* self, PyObject* args) {
     Py_XINCREF(plasma_conn);
 
     auto batch = std::shared_ptr<RecordBatch>();
-    ARROW_CHECK_OK(
-        read_batch(object_buffers[i].data, object_buffers[i].data_size, &batch));
+    std::vector<std::shared_ptr<Tensor>> tensors;
+    ARROW_CHECK_OK(read_batch_and_tensors(
+        object_buffers[i].data, object_buffers[i].data_size, &batch, tensors));
 
     PyObject* result;
-    Status s = DeserializeList(batch->column(0), 0, batch->num_rows(), base, &result);
+    Status s =
+        DeserializeList(batch->column(0), 0, batch->num_rows(), base, tensors, &result);
     CHECK_SERIALIZATION_ERROR(s);
     Py_XDECREF(base);
src/numbuf/thirdparty/build_thirdparty.sh (vendored)
@@ -24,5 +24,5 @@ echo "building arrow"
 cd $TP_DIR/arrow/cpp
 mkdir -p $TP_DIR/arrow/cpp/build
 cd $TP_DIR/arrow/cpp/build
-cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" -DARROW_BUILD_TESTS=OFF ..
+cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g -O3 -march=native -mtune=native" -DCMAKE_CXX_FLAGS="-g -O3 -march=native -mtune=native" -DARROW_BUILD_TESTS=OFF -DARROW_PYTHON=on ..
 make VERBOSE=1 -j$PARALLEL
src/numbuf/thirdparty/download_thirdparty.sh (vendored)
@@ -11,4 +11,4 @@ if [ ! -d $TP_DIR/arrow ]; then
   git clone https://github.com/apache/arrow/ "$TP_DIR/arrow"
 fi
 cd $TP_DIR/arrow
-git checkout 067cd4ebfbd9be9b607658a2a249017cc6db84f9
+git checkout b0863cb63d62ae7c4a429164e5a2e350d3c1f21a
@@ -11,8 +11,8 @@ endif(APPLE)
 
 include_directories("${PYTHON_INCLUDE_DIRS}" thirdparty)
 
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -march=native -mtune=native -O3")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -march=native -mtune=native -O3")
 
 # Compile flatbuffers