Merge numbuf.

This commit is contained in:
Philipp Moritz 2016-11-19 12:30:01 -08:00
commit 0b888c38f1
31 changed files with 2125 additions and 0 deletions

32
numbuf/.gitignore vendored Normal file
View file

@ -0,0 +1,32 @@
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app
# Build directory
build

3
numbuf/.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "thirdparty/arrow"]
path = thirdparty/arrow
url = https://github.com/pcmoritz/arrow.git

25
numbuf/.travis.yml Normal file
View file

@ -0,0 +1,25 @@
# Travis CI configuration: build and test numbuf on Linux and macOS,
# each with Python 2.7 and 3.5.
sudo: required
language: generic

matrix:
  include:
    - os: linux
      dist: trusty
      python: "2.7"
    - os: linux
      dist: trusty
      python: "3.5"
    - os: osx
      osx_image: xcode7
      python: "2.7"
    - os: osx
      osx_image: xcode7
      python: "3.5"

install:
  - ./install-dependencies.sh
  - python setup.py install --user

script:
  - python python/test/runtest.py

113
numbuf/CMakeLists.txt Normal file
View file

@ -0,0 +1,113 @@
cmake_minimum_required(VERSION 2.8)

project(numbuf)

list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/Modules)

# Make libnumbuf.so look for shared libraries in the folder libnumbuf.so is in.
set(CMAKE_INSTALL_RPATH "$ORIGIN/")
set(CMAKE_MACOSX_RPATH 1)

if(NOT APPLE)
  find_package(PythonInterp REQUIRED)
  find_package(PythonLibs REQUIRED)
  set(CUSTOM_PYTHON_EXECUTABLE ${PYTHON_EXECUTABLE})
else()
  # On macOS, probe the "python" on PATH directly so a Homebrew/Anaconda
  # interpreter is preferred over the system framework Python.
  find_program(CUSTOM_PYTHON_EXECUTABLE python)
  message("-- Found Python program: ${CUSTOM_PYTHON_EXECUTABLE}")
  # Use the print() function form so the probes work under both Python 2 and
  # Python 3; the former "print '...'" statement form is a SyntaxError on
  # Python 3 and silently left these variables empty.
  execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c
                  "import sys; print('python' + sys.version[0:3])"
                  OUTPUT_VARIABLE PYTHON_LIBRARY_NAME OUTPUT_STRIP_TRAILING_WHITESPACE)
  execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c
                  "import sys; print(sys.exec_prefix)"
                  OUTPUT_VARIABLE PYTHON_PREFIX OUTPUT_STRIP_TRAILING_WHITESPACE)
  find_library(PYTHON_LIBRARIES
               NAMES ${PYTHON_LIBRARY_NAME}
               HINTS "${PYTHON_PREFIX}"
               PATH_SUFFIXES "lib" "libs"
               NO_DEFAULT_PATH)
  execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c
                  "from distutils.sysconfig import *; print(get_python_inc())"
                  OUTPUT_VARIABLE PYTHON_INCLUDE_DIRS OUTPUT_STRIP_TRAILING_WHITESPACE)
  if(PYTHON_LIBRARIES AND PYTHON_INCLUDE_DIRS)
    set(PYTHONLIBS_FOUND TRUE)
    message("-- Found PythonLibs: " ${PYTHON_LIBRARIES})
    message("-- -- Used custom search path")
  else()
    # Fall back to the stock CMake search if the custom probe failed.
    find_package(PythonLibs REQUIRED)
    message("-- -- Used find_package(PythonLibs)")
  endif()
endif()

find_package(NumPy REQUIRED)

if(APPLE)
  # Python extension modules must end in .so on macOS as well (not .dylib).
  set(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
endif()

include_directories("${PYTHON_INCLUDE_DIRS}")
include_directories("${NUMPY_INCLUDE_DIR}")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")

if(UNIX AND NOT APPLE)
  # clock_gettime and friends live in librt on older glibc.
  link_libraries(rt)
endif()

set(ARROW_DIR "${CMAKE_SOURCE_DIR}/thirdparty/arrow/" CACHE STRING
    "Path of the arrow source directory")

# Paths to the prebuilt Arrow shared libraries (platform-dependent suffix).
if(APPLE)
  set(ARROW_LIB "${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow.dylib" CACHE STRING
      "Path to libarrow.dylib (needs to be changed if arrow is build in debug mode)")
  set(ARROW_IO_LIB "${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow_io.dylib" CACHE STRING
      "Path to libarrow_io.dylib (needs to be changed if arrow is build in debug mode)")
  set(ARROW_IPC_LIB "${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow_ipc.dylib" CACHE STRING
      "Path to libarrow_ipc.dylib (needs to be changed if arrow is build in debug mode)")
else()
  set(ARROW_LIB "${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow.so" CACHE STRING
      "Path to libarrow.so (needs to be changed if arrow is build in debug mode)")
  set(ARROW_IO_LIB "${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow_io.so" CACHE STRING
      "Path to libarrow_io.so (needs to be changed if arrow is build in debug mode)")
  set(ARROW_IPC_LIB "${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow_ipc.so" CACHE STRING
      "Path to libarrow_ipc.so (needs to be changed if arrow is build in debug mode)")
endif()

include_directories("${ARROW_DIR}/cpp/src/")
include_directories("cpp/src/")
include_directories("python/src/")

add_definitions(-fPIC)

add_library(numbuf SHARED
  cpp/src/numbuf/tensor.cc
  cpp/src/numbuf/dict.cc
  cpp/src/numbuf/sequence.cc
  python/src/pynumbuf/numbuf.cc
  python/src/pynumbuf/adapters/numpy.cc
  python/src/pynumbuf/adapters/python.cc)

get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME)

if(APPLE)
  # Rewrite the install names so libnumbuf.so resolves Python and the Arrow
  # libraries relative to itself instead of via absolute/rpath entries.
  add_custom_command(TARGET numbuf
                     POST_BUILD COMMAND
                     ${CMAKE_INSTALL_NAME_TOOL} -change ${PYTHON_SHARED_LIBRARY} ${PYTHON_LIBRARIES} libnumbuf.so)
  add_custom_command(TARGET numbuf
                     POST_BUILD COMMAND
                     ${CMAKE_INSTALL_NAME_TOOL} -change "@rpath/libarrow.dylib" "@loader_path/libarrow.dylib" libnumbuf.so)
  add_custom_command(TARGET numbuf
                     POST_BUILD COMMAND
                     ${CMAKE_INSTALL_NAME_TOOL} -change "@rpath/libarrow_io.dylib" "@loader_path/libarrow_io.dylib" libnumbuf.so)
  add_custom_command(TARGET numbuf
                     POST_BUILD COMMAND
                     ${CMAKE_INSTALL_NAME_TOOL} -change "@rpath/libarrow_ipc.dylib" "@loader_path/libarrow_ipc.dylib" libnumbuf.so)
endif()

target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})

install(TARGETS numbuf DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/)
install(FILES ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/)

201
numbuf/LICENSE Normal file
View file

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

2
numbuf/README.md Normal file
View file

@ -0,0 +1,2 @@
# numbuf
Serializing primitive Python types in Arrow

20
numbuf/build.sh Executable file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env bash

# Configure and build numbuf into ./build, installing the artifacts.
# Abort on the first failing command so that e.g. a failed cmake configure
# does not fall through to "make install".
set -e

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

# Determine how many parallel jobs to use for make based on the number of cores
unamestr="$(uname)"
if [[ "$unamestr" == "Linux" ]]; then
  PARALLEL=$(nproc)
elif [[ "$unamestr" == "Darwin" ]]; then
  PARALLEL=$(sysctl -n hw.ncpu)
else
  echo "Unrecognized platform."
  exit 1
fi

mkdir -p "$ROOT_DIR/build"
pushd "$ROOT_DIR/build"
  cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" ..
  # Quote $PARALLEL so an unexpected empty/whitespace value cannot split.
  make install -j"$PARALLEL"
popd

View file

@ -0,0 +1,54 @@
# - Find the NumPy libraries
# This module finds if NumPy is installed, and sets the following variables
# indicating where it is.
#
#
#  NUMPY_FOUND            - was NumPy found
#  NUMPY_VERSION          - the version of NumPy found as a string
#  NUMPY_VERSION_MAJOR    - the major version number of NumPy
#  NUMPY_VERSION_MINOR    - the minor version number of NumPy
#  NUMPY_VERSION_PATCH    - the patch version number of NumPy
#  NUMPY_VERSION_DECIMAL  - e.g. version 1.6.1 is 10601
#  NUMPY_INCLUDE_DIR      - path to the NumPy include files

unset(NUMPY_VERSION)
unset(NUMPY_INCLUDE_DIR)

if(NOT "${CUSTOM_PYTHON_EXECUTABLE}" STREQUAL "CUSTOM_PYTHON_EXECUTABLE-NOTFOUND")
  # Ask the interpreter for its NumPy version and include directory in a
  # single invocation; the two values come back on separate output lines.
  execute_process(COMMAND "${CUSTOM_PYTHON_EXECUTABLE}" "-c"
    "import numpy as n; print(n.__version__); print(n.get_include());"
    RESULT_VARIABLE __result
    OUTPUT_VARIABLE __output
    OUTPUT_STRIP_TRAILING_WHITESPACE)

  # Compare the exit status with EQUAL instead of MATCHES: MATCHES performs a
  # regex *search*, so any failing result that merely contains the character
  # "0" (e.g. exit code 10, or an error string) would be misread as success.
  if(__result EQUAL 0)
    # Protect embedded semicolons, then split the output into a list of lines.
    string(REGEX REPLACE ";" "\\\\;" __values ${__output})
    string(REGEX REPLACE "\r?\n" ";" __values ${__values})
    list(GET __values 0 NUMPY_VERSION)
    list(GET __values 1 NUMPY_INCLUDE_DIR)

    # Validate that the reported version looks like "major.minor.patch".
    string(REGEX MATCH "^([0-9])+\\.([0-9])+\\.([0-9])+" __ver_check "${NUMPY_VERSION}")
    if(NOT "${__ver_check}" STREQUAL "")
      set(NUMPY_VERSION_MAJOR ${CMAKE_MATCH_1})
      set(NUMPY_VERSION_MINOR ${CMAKE_MATCH_2})
      set(NUMPY_VERSION_PATCH ${CMAKE_MATCH_3})
      math(EXPR NUMPY_VERSION_DECIMAL
        "(${NUMPY_VERSION_MAJOR} * 10000) + (${NUMPY_VERSION_MINOR} * 100) + ${NUMPY_VERSION_PATCH}")
      # Normalize Windows backslashes to forward slashes.
      string(REGEX REPLACE "\\\\" "/" NUMPY_INCLUDE_DIR ${NUMPY_INCLUDE_DIR})
    else()
      unset(NUMPY_VERSION)
      unset(NUMPY_INCLUDE_DIR)
      message(STATUS "Requested NumPy version and include path, but got instead:\n${__output}\n")
    endif()
  endif()
else()
  message(STATUS "To find NumPy Python executable is required to be found.")
endif()

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(NumPy REQUIRED_VARS NUMPY_INCLUDE_DIR NUMPY_VERSION
                                  VERSION_VAR NUMPY_VERSION)

if(NUMPY_FOUND)
  message(STATUS "NumPy ver. ${NUMPY_VERSION} found (include: ${NUMPY_INCLUDE_DIR})")
endif()

View file

@ -0,0 +1,27 @@
#include "dict.h"

using namespace arrow;

namespace numbuf {

// Assemble the key and value sequences into a single Arrow StructArray
// with two equally long fields, "keys" and "vals".
Status DictBuilder::Finish(
    std::shared_ptr<Array> key_tuple_data,
    std::shared_ptr<Array> val_list_data,
    std::shared_ptr<Array> val_tuple_data,
    std::shared_ptr<Array> val_dict_data,
    std::shared_ptr<arrow::Array>* out) {
  // Python dict keys cannot themselves be lists or dicts, so the key
  // sequence only needs the nested-tuple data; lists/dicts are passed as
  // nullptr there.
  std::shared_ptr<Array> key_array;
  std::shared_ptr<Array> val_array;
  RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, nullptr, &key_array));
  RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &val_array));

  // Build the struct type {"keys": <key type>, "vals": <value type>}.
  auto key_field = std::make_shared<Field>("keys", key_array->type());
  auto val_field = std::make_shared<Field>("vals", val_array->type());
  auto struct_type =
      std::make_shared<StructType>(std::vector<FieldPtr>({key_field, val_field}));

  DCHECK(key_array->length() == val_array->length());
  std::vector<ArrayPtr> children({key_array, val_array});
  out->reset(new StructArray(struct_type, key_array->length(), children));
  return Status::OK();
}

}  // namespace numbuf

View file

@ -0,0 +1,50 @@
#ifndef NUMBUF_DICT_H
#define NUMBUF_DICT_H
#include <arrow/api.h>
#include "sequence.h"
namespace numbuf {
/*! Constructing dictionaries of key/value pairs. Sequences of
keys and values are built separately using a pair of
SequenceBuilders. The resulting Arrow representation
can be obtained via the Finish method.
*/
class DictBuilder {
public:
DictBuilder(arrow::MemoryPool* pool = nullptr)
: keys_(pool), vals_(pool) {}
//! Builder for the keys of the dictionary
SequenceBuilder& keys() { return keys_; }
//! Builder for the values of the dictionary
SequenceBuilder& vals() { return vals_; }
/*! Construct an Arrow StructArray representing the dictionary.
Contains a field "keys" for the keys and "vals" for the values.
\param key_tuple_data
Data from nested tuples in the key sequence of the dictionary.
(Lists and dicts cannot be dictionary keys in Python, so no
list or dict data is needed for the key sequence.)
\param val_list_data
Data from nested lists in the value sequence of the dictionary
\param val_tuple_data
Data from nested tuples in the value sequence of the dictionary
\param val_dict_data
Data from nested dictionaries in the value sequence of the dictionary
\param out
The resulting StructArray
*/
arrow::Status Finish(
std::shared_ptr<arrow::Array> key_tuple_data,
std::shared_ptr<arrow::Array> val_list_data,
std::shared_ptr<arrow::Array> val_tuple_data,
std::shared_ptr<arrow::Array> val_dict_data,
std::shared_ptr<arrow::Array>* out);
private:
// Builder collecting the dictionary keys
SequenceBuilder keys_;
// Builder collecting the dictionary values
SequenceBuilder vals_;
};
}  // namespace numbuf
#endif  // NUMBUF_DICT_H

View file

@ -0,0 +1,183 @@
#include "sequence.h"
using namespace arrow;
namespace numbuf {
// The sequence is represented as a dense Arrow union. Each child builder
// holds the values of one Python type; union type tags are assigned lazily,
// in the order each type is first appended (see the UPDATE macro below).
SequenceBuilder::SequenceBuilder(MemoryPool* pool)
: pool_(pool),
types_(pool, std::make_shared<Int8Type>()),
offsets_(pool, std::make_shared<Int32Type>()),
nones_(pool, std::make_shared<NullType>()),
bools_(pool, std::make_shared<BooleanType>()),
ints_(pool, std::make_shared<Int64Type>()),
bytes_(pool, std::make_shared<BinaryType>()),
strings_(pool, std::make_shared<StringType>()),
floats_(pool, std::make_shared<FloatType>()),
doubles_(pool, std::make_shared<DoubleType>()),
uint8_tensors_(std::make_shared<UInt8Type>(), pool),
int8_tensors_(std::make_shared<Int8Type>(), pool),
uint16_tensors_(std::make_shared<UInt16Type>(), pool),
int16_tensors_(std::make_shared<Int16Type>(), pool),
uint32_tensors_(std::make_shared<UInt32Type>(), pool),
int32_tensors_(std::make_shared<Int32Type>(), pool),
uint64_tensors_(std::make_shared<UInt64Type>(), pool),
int64_tensors_(std::make_shared<Int64Type>(), pool),
float_tensors_(std::make_shared<FloatType>(), pool),
double_tensors_(std::make_shared<DoubleType>(), pool),
// Offset vectors start with a leading 0 so that the i-th subsequence
// spans [offsets[i], offsets[i+1]).
list_offsets_({0}), tuple_offsets_({0}), dict_offsets_({0}) {}
// UPDATE(OFFSET, TAG): shared bookkeeping for every append. Lazily assigns
// the union type tag on first use of a type, records the element's offset
// within its child builder and its tag, and marks the element as valid
// (non-None) in the shared null bitmap.
#define UPDATE(OFFSET, TAG) \
if (TAG == -1) { \
TAG = num_tags; \
num_tags += 1; \
} \
RETURN_NOT_OK(offsets_.Append(OFFSET)); \
RETURN_NOT_OK(types_.Append(TAG)); \
RETURN_NOT_OK(nones_.AppendToBitmap(true));
// None is encoded purely through the null bitmap; the 0 tag/offset written
// here are placeholders for null entries.
Status SequenceBuilder::AppendNone() {
RETURN_NOT_OK(offsets_.Append(0));
RETURN_NOT_OK(types_.Append(0));
return nones_.AppendToBitmap(false);
}
Status SequenceBuilder::AppendBool(bool data) {
UPDATE(bools_.length(), bool_tag);
return bools_.Append(data);
}
Status SequenceBuilder::AppendInt64(int64_t data) {
UPDATE(ints_.length(), int_tag);
return ints_.Append(data);
}
// NOTE(review): unsigned values share the signed Int64 child with
// AppendInt64, so values above INT64_MAX would wrap — confirm callers
// range-check before appending.
Status SequenceBuilder::AppendUInt64(uint64_t data) {
UPDATE(ints_.length(), int_tag);
return ints_.Append(data);
}
Status SequenceBuilder::AppendBytes(const uint8_t* data, int32_t length) {
UPDATE(bytes_.length(), bytes_tag);
return bytes_.Append(data, length);
}
Status SequenceBuilder::AppendString(const char* data, int32_t length) {
UPDATE(strings_.length(), string_tag);
return strings_.Append(data, length);
}
Status SequenceBuilder::AppendFloat(float data) {
UPDATE(floats_.length(), float_tag);
return floats_.Append(data);
}
Status SequenceBuilder::AppendDouble(double data) {
UPDATE(doubles_.length(), double_tag);
return doubles_.Append(data);
}
// DEF_TENSOR_APPEND(NAME, TYPE, TAG): generates one AppendTensor overload per
// element type. The tensor child builder is started lazily the first time its
// type is appended (i.e. while its tag is still unassigned).
#define DEF_TENSOR_APPEND(NAME, TYPE, TAG) \
Status SequenceBuilder::AppendTensor(const std::vector<int64_t>& dims, TYPE* data) { \
if (TAG == -1) { \
NAME.Start(); \
} \
UPDATE(NAME.length(), TAG); \
return NAME.Append(dims, data); \
}
DEF_TENSOR_APPEND(uint8_tensors_, uint8_t, uint8_tensor_tag);
DEF_TENSOR_APPEND(int8_tensors_, int8_t, int8_tensor_tag);
DEF_TENSOR_APPEND(uint16_tensors_, uint16_t, uint16_tensor_tag);
DEF_TENSOR_APPEND(int16_tensors_, int16_t, int16_tensor_tag);
DEF_TENSOR_APPEND(uint32_tensors_, uint32_t, uint32_tensor_tag);
DEF_TENSOR_APPEND(int32_tensors_, int32_t, int32_tensor_tag);
DEF_TENSOR_APPEND(uint64_tensors_, uint64_t, uint64_tensor_tag);
DEF_TENSOR_APPEND(int64_tensors_, int64_t, int64_tensor_tag);
DEF_TENSOR_APPEND(float_tensors_, float, float_tensor_tag);
DEF_TENSOR_APPEND(double_tensors_, double, double_tensor_tag);
// Nested sequences only record their size here; the flattened contents are
// supplied later through the *_data arguments of Finish.
Status SequenceBuilder::AppendList(int32_t size) {
UPDATE(list_offsets_.size() - 1, list_tag);
list_offsets_.push_back(list_offsets_.back() + size);
return Status::OK();
}
Status SequenceBuilder::AppendTuple(int32_t size) {
UPDATE(tuple_offsets_.size() - 1, tuple_tag);
tuple_offsets_.push_back(tuple_offsets_.back() + size);
return Status::OK();
}
Status SequenceBuilder::AppendDict(int32_t size) {
UPDATE(dict_offsets_.size() - 1, dict_tag);
dict_offsets_.push_back(dict_offsets_.back() + size);
return Status::OK();
}
// ADD_ELEMENT(VARNAME, TAG): if the given type was ever appended, finish its
// child builder and register it as union child TAG.
// NOTE(review): the AppendToBitmap(true) here adds one extra bitmap entry per
// present child during Finish — verify this matches the expected null-bitmap
// length for the union.
#define ADD_ELEMENT(VARNAME, TAG) \
if (TAG != -1) { \
types[TAG] = std::make_shared<Field>("", VARNAME.type()); \
RETURN_NOT_OK(VARNAME.Finish(&children[TAG])); \
RETURN_NOT_OK(nones_.AppendToBitmap(true)); \
}
// ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME): wrap the flattened DATA
// of nested sequences into a struct child holding a single list field NAME,
// delimited by the recorded OFFSETS. When DATA is null, the type must never
// have been appended (OFFSETS still only holds the initial 0).
#define ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME) \
if (DATA) { \
DCHECK(DATA->length() == OFFSETS.back()); \
auto list_builder = std::make_shared<ListBuilder>(pool_, DATA); \
auto field = std::make_shared<Field>(NAME, list_builder->type()); \
auto type = std::make_shared<StructType>(std::vector<FieldPtr>({field})); \
auto lists = std::vector<std::shared_ptr<ArrayBuilder>>({list_builder}); \
StructBuilder builder(pool_, type, lists); \
OFFSETS.pop_back(); \
ARROW_CHECK_OK(list_builder->Append(OFFSETS.data(), OFFSETS.size())); \
builder.Append(); \
ADD_ELEMENT(builder, TAG); \
} else { \
DCHECK(OFFSETS.size() == 1); \
}
// Assemble all child builders into a dense UnionArray. The `types` and
// `children` vectors are indexed by the lazily assigned tags, so every used
// tag is < num_tags and each slot is filled by exactly one ADD_* below.
Status SequenceBuilder::Finish(
std::shared_ptr<Array> list_data,
std::shared_ptr<Array> tuple_data,
std::shared_ptr<Array> dict_data,
std::shared_ptr<Array>* out) {
std::vector<std::shared_ptr<Field>> types(num_tags);
std::vector<ArrayPtr> children(num_tags);
ADD_ELEMENT(bools_, bool_tag);
ADD_ELEMENT(ints_, int_tag);
ADD_ELEMENT(strings_, string_tag);
ADD_ELEMENT(bytes_, bytes_tag);
ADD_ELEMENT(floats_, float_tag);
ADD_ELEMENT(doubles_, double_tag);
ADD_ELEMENT(uint8_tensors_, uint8_tensor_tag);
ADD_ELEMENT(int8_tensors_, int8_tensor_tag);
ADD_ELEMENT(uint16_tensors_, uint16_tensor_tag);
ADD_ELEMENT(int16_tensors_, int16_tensor_tag);
ADD_ELEMENT(uint32_tensors_, uint32_tensor_tag);
ADD_ELEMENT(int32_tensors_, int32_tensor_tag);
ADD_ELEMENT(uint64_tensors_, uint64_tensor_tag);
ADD_ELEMENT(int64_tensors_, int64_tensor_tag);
ADD_ELEMENT(float_tensors_, float_tensor_tag);
ADD_ELEMENT(double_tensors_, double_tensor_tag);
ADD_SUBSEQUENCE(list_data, list_offsets_, list_builder, list_tag, "list");
ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple");
ADD_SUBSEQUENCE(dict_data, dict_offsets_, dict_builder, dict_tag, "dict");
// NOTE(review): type_ids is left empty — confirm this UnionType constructor
// derives the ids from the children in this Arrow version.
std::vector<uint8_t> type_ids = {};
TypePtr type = TypePtr(new UnionType(types, type_ids, UnionMode::DENSE));
out->reset(new UnionArray(type, types_.length(),
children, types_.data(), offsets_.data(),
nones_.null_count(), nones_.null_bitmap()));
return Status::OK();
}
}  // namespace numbuf

View file

@ -0,0 +1,144 @@
#ifndef NUMBUF_LIST_H
#define NUMBUF_LIST_H
#include <arrow/api.h>
#include <arrow/types/union.h>
#include "tensor.h"
namespace numbuf {
/*! A Sequence is a heterogeneous collections of elements. It can contain
scalar Python types, lists, tuples, dictionaries and tensors.
*/
class SequenceBuilder {
public:
SequenceBuilder(arrow::MemoryPool* pool = nullptr);
//! Appending a none to the sequence
arrow::Status AppendNone();
//! Appending a boolean to the sequence
arrow::Status AppendBool(bool data);
//! Appending an int64_t to the sequence
arrow::Status AppendInt64(int64_t data);
//! Appending an uint64_t to the sequence
arrow::Status AppendUInt64(uint64_t data);
//! Append a list of bytes to the sequence
arrow::Status AppendBytes(const uint8_t* data, int32_t length);
//! Appending a string to the sequence
arrow::Status AppendString(const char* data, int32_t length);
//! Appending a float to the sequence
arrow::Status AppendFloat(float data);
//! Appending a double to the sequence
arrow::Status AppendDouble(double data);
/*! Appending a tensor to the sequence
\param dims
A vector of dimensions
\param data
A pointer to the start of the data block. The length of the data block
will be the product of the dimensions
*/
arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint8_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, int8_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint16_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, int16_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint32_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, int32_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, uint64_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, int64_t* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, float* data);
arrow::Status AppendTensor(const std::vector<int64_t>& dims, double* data);
/*! Add a sublist to the sequenc. The data contained in the sublist will be
specified in the "Finish" method.
To construct l = [[11, 22], 33, [44, 55]] you would for example run
list = ListBuilder();
list.AppendList(2);
list.Append(33);
list.AppendList(2);
list.Finish([11, 22, 44, 55]);
list.Finish();
\param size
The size of the sublist
*/
arrow::Status AppendList(int32_t size);
arrow::Status AppendTuple(int32_t size);
arrow::Status AppendDict(int32_t size);
//! Finish building the sequence and return the result
arrow::Status Finish(
std::shared_ptr<arrow::Array> list_data,
std::shared_ptr<arrow::Array> tuple_data,
std::shared_ptr<arrow::Array> dict_data,
std::shared_ptr<arrow::Array>* out);
private:
arrow::MemoryPool* pool_;
arrow::Int8Builder types_;
arrow::Int32Builder offsets_;
arrow::NullArrayBuilder nones_;
arrow::BooleanBuilder bools_;
arrow::Int64Builder ints_;
arrow::BinaryBuilder bytes_;
arrow::StringBuilder strings_;
arrow::FloatBuilder floats_;
arrow::DoubleBuilder doubles_;
UInt8TensorBuilder uint8_tensors_;
Int8TensorBuilder int8_tensors_;
UInt16TensorBuilder uint16_tensors_;
Int16TensorBuilder int16_tensors_;
UInt32TensorBuilder uint32_tensors_;
Int32TensorBuilder int32_tensors_;
UInt64TensorBuilder uint64_tensors_;
Int64TensorBuilder int64_tensors_;
FloatTensorBuilder float_tensors_;
DoubleTensorBuilder double_tensors_;
std::vector<int32_t> list_offsets_;
std::vector<int32_t> tuple_offsets_;
std::vector<int32_t> dict_offsets_;
int8_t bool_tag = -1;
int8_t int_tag = -1;
int8_t string_tag = -1;
int8_t bytes_tag = -1;
int8_t float_tag = -1;
int8_t double_tag = -1;
int8_t uint8_tensor_tag = -1;
int8_t int8_tensor_tag = -1;
int8_t uint16_tensor_tag = -1;
int8_t int16_tensor_tag = -1;
int8_t uint32_tensor_tag = -1;
int8_t int32_tensor_tag = -1;
int8_t uint64_tensor_tag = -1;
int8_t int64_tensor_tag = -1;
int8_t float_tensor_tag = -1;
int8_t double_tensor_tag = -1;
int8_t list_tag = -1;
int8_t tuple_tag = -1;
int8_t dict_tag = -1;
int8_t num_tags = 0;
};
} // namespace numbuf
#endif // NUMBUF_LIST_H

View file

@ -0,0 +1,55 @@
#include "tensor.h"
using namespace arrow;
namespace numbuf {
template<typename T>
TensorBuilder<T>::TensorBuilder(const TypePtr& dtype, MemoryPool* pool)
: dtype_(dtype), pool_(pool) {}
template<typename T>
Status TensorBuilder<T>::Start() {
dim_data_ = std::make_shared<Int64Builder>(pool_, std::make_shared<Int64Type>());
dims_ = std::make_shared<ListBuilder>(pool_, dim_data_);
value_data_ = std::make_shared<PrimitiveBuilder<T>>(pool_, dtype_);
values_ = std::make_shared<ListBuilder>(pool_, value_data_);
auto dims_field = std::make_shared<Field>("dims", dims_->type());
auto values_field = std::make_shared<Field>("data", values_->type());
auto type = std::make_shared<StructType>(std::vector<FieldPtr>({dims_field, values_field}));
tensors_ = std::make_shared<StructBuilder>(pool_, type, std::vector<std::shared_ptr<ArrayBuilder>>({dims_, values_}));
return Status::OK();
}
template<typename T>
Status TensorBuilder<T>::Append(const std::vector<int64_t>& dims, const elem_type* data) {
DCHECK(tensors_);
RETURN_NOT_OK(tensors_->Append());
RETURN_NOT_OK(dims_->Append());
RETURN_NOT_OK(values_->Append());
int32_t size = 1;
for (auto dim : dims) {
size *= dim;
RETURN_NOT_OK(dim_data_->Append(dim));
}
RETURN_NOT_OK(value_data_->Append(data, size));
return Status::OK(); // tensors_->Append();
}
// Finalize the builder and return the accumulated tensors as a StructArray.
template<typename T>
Status TensorBuilder<T>::Finish(std::shared_ptr<Array>* out) {
  return tensors_->Finish(out);
}
// Explicit instantiations for every numeric element type supported by the
// serializer (keep in sync with the typedef list in tensor.h).
template class TensorBuilder<UInt8Type>;
template class TensorBuilder<Int8Type>;
template class TensorBuilder<UInt16Type>;
template class TensorBuilder<Int16Type>;
template class TensorBuilder<UInt32Type>;
template class TensorBuilder<Int32Type>;
template class TensorBuilder<UInt64Type>;
template class TensorBuilder<Int64Type>;
template class TensorBuilder<FloatType>;
template class TensorBuilder<DoubleType>;
}

View file

@ -0,0 +1,70 @@
#ifndef NUMBUF_TENSOR_H
#define NUMBUF_TENSOR_H
#include <memory>
#include <arrow/type.h>
#include <arrow/api.h>
namespace numbuf {
/*! This is a class for building a dataframe where each row corresponds to
a Tensor (= multidimensional array) of numerical data. There are two
columns, "dims" which contains an array of dimensions for each Tensor
and "data" which contains data buffer of the Tensor as a flattened array.
*/
template<typename T>
class TensorBuilder {
public:
  // C type of one tensor element (e.g. float for arrow::FloatType).
  typedef typename T::c_type elem_type;

  TensorBuilder(const arrow::TypePtr& dtype, arrow::MemoryPool* pool = nullptr);

  // Allocate the child builders; must be called before Append().
  arrow::Status Start();

  /*! Append a new tensor.

      \param dims
        The dimensions of the Tensor

      \param data
        Pointer to the beginning of the data buffer of the Tensor. The
        total length of the buffer is sizeof(elem_type) * product of dims[i] over i
  */
  arrow::Status Append(const std::vector<int64_t>& dims, const elem_type* data);

  //! Convert the tensors to an Arrow StructArray
  arrow::Status Finish(std::shared_ptr<arrow::Array>* out);

  //! Number of tensors in the column
  int32_t length() {
    return tensors_->length();
  }

  // Arrow type of the struct column built by this builder.
  const arrow::TypePtr& type() {
    return tensors_->type();
  }

private:
  arrow::TypePtr dtype_;                               // element type of the tensors
  arrow::MemoryPool* pool_;                            // pool for all child builders
  std::shared_ptr<arrow::Int64Builder> dim_data_;      // flat dimension values
  std::shared_ptr<arrow::ListBuilder> dims_;           // per-tensor list of dims
  std::shared_ptr<arrow::PrimitiveBuilder<T>> value_data_;  // flat element values
  std::shared_ptr<arrow::ListBuilder> values_;         // per-tensor list of values
  std::shared_ptr<arrow::StructBuilder> tensors_;      // {dims, data} rows
};
// Convenience aliases, one per supported element type (matches the explicit
// instantiations in tensor.cc).
typedef TensorBuilder<arrow::UInt8Type> UInt8TensorBuilder;
typedef TensorBuilder<arrow::Int8Type> Int8TensorBuilder;
typedef TensorBuilder<arrow::UInt16Type> UInt16TensorBuilder;
typedef TensorBuilder<arrow::Int16Type> Int16TensorBuilder;
typedef TensorBuilder<arrow::UInt32Type> UInt32TensorBuilder;
typedef TensorBuilder<arrow::Int32Type> Int32TensorBuilder;
typedef TensorBuilder<arrow::UInt64Type> UInt64TensorBuilder;
typedef TensorBuilder<arrow::Int64Type> Int64TensorBuilder;
typedef TensorBuilder<arrow::FloatType> FloatTensorBuilder;
typedef TensorBuilder<arrow::DoubleType> DoubleTensorBuilder;
}
#endif // NUMBUF_TENSOR_H

36
numbuf/install-dependencies.sh Executable file
View file

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Install the system packages numbuf needs to build, on Linux (apt) or
# macOS (Homebrew). Exits non-zero on unsupported platforms.

# Absolute directory containing this script (currently unused below).
ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

# Detect the host platform from uname.
platform="unknown"
unamestr="$(uname)"
if [[ "$unamestr" == "Linux" ]]; then
  echo "Platform is linux."
  platform="linux"
elif [[ "$unamestr" == "Darwin" ]]; then
  echo "Platform is macosx."
  platform="macosx"
else
  echo "Unrecognized platform."
  exit 1
fi

if [[ $platform == "linux" ]]; then
  # These commands must be kept in sync with the installation instructions.
  sudo apt-get update
  sudo apt-get install -y cmake build-essential autoconf libtool python-dev python-numpy libboost-all-dev
elif [[ $platform == "macosx" ]]; then
  # check that brew is installed
  which -s brew
  if [[ $? != 0 ]]; then
    echo "Could not find brew, please install brew (see http://brew.sh/)."
    exit 1
  else
    echo "Updating brew."
    brew update
  fi
  # These commands must be kept in sync with the installation instructions.
  brew install cmake automake autoconf libtool boost
  sudo easy_install pip
  sudo pip install numpy --ignore-installed six
fi

View file

@ -0,0 +1 @@
from libnumbuf import *

View file

@ -0,0 +1,132 @@
#include "numpy.h"
#include "python.h"
#include <sstream>
#include <numbuf/tensor.h>
using namespace arrow;
extern "C" {
extern PyObject *numbuf_serialize_callback;
extern PyObject *numbuf_deserialize_callback;
}
namespace numbuf {
// Map an arrow::Type enumerator to the corresponding NPY_* type id.
// NOTE(review): this macro is not used anywhere in this file -- confirm it is
// needed before keeping it.
#define ARROW_TYPE_TO_NUMPY_CASE(TYPE) \
  case Type::TYPE:                     \
    return NPY_##TYPE;

// One switch case of DeserializeArray: view the arrow buffer of `type`
// elements as a numpy array without copying. `base` (if not None) is
// installed as the array's base object so the backing memory stays alive.
#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type)                   \
  case Type::TYPE: {                                                    \
    auto values = std::dynamic_pointer_cast<ArrayType>(content->values()); \
    DCHECK(values);                                                     \
    type* data = const_cast<type*>(values->raw_data())                  \
                   + content->offset(offset);                           \
    *out = PyArray_SimpleNewFromData(num_dims, dim.data(), NPY_##TYPE,  \
             reinterpret_cast<void*>(data));                            \
    if (base != Py_None) {                                              \
      PyArray_SetBaseObject((PyArrayObject*) *out, base);               \
    }                                                                   \
    Py_XINCREF(base);                                                   \
  }                                                                     \
  return Status::OK();

// Reconstruct the numpy array stored at row `offset` of a tensor
// StructArray (field 0 = dims, field 1 = flattened data). The returned
// array shares memory with the arrow buffer; no copy is made.
Status DeserializeArray(std::shared_ptr<Array> array, int32_t offset, PyObject* base, PyObject** out) {
  DCHECK(array);
  auto tensor = std::dynamic_pointer_cast<StructArray>(array);
  DCHECK(tensor);
  auto dims = std::dynamic_pointer_cast<ListArray>(tensor->field(0));
  auto content = std::dynamic_pointer_cast<ListArray>(tensor->field(1));
  npy_intp num_dims = dims->value_length(offset);
  // Copy this tensor's dimensions out of the flat int64 dims array.
  std::vector<npy_intp> dim(num_dims);
  for (int i = dims->offset(offset); i < dims->offset(offset+1); ++i) {
    dim[i - dims->offset(offset)] =
      std::dynamic_pointer_cast<Int64Array>(dims->values())->Value(i);
  }
  switch (content->value_type()->type) {
    DESERIALIZE_ARRAY_CASE(INT8, Int8Array, int8_t)
    DESERIALIZE_ARRAY_CASE(INT16, Int16Array, int16_t)
    DESERIALIZE_ARRAY_CASE(INT32, Int32Array, int32_t)
    DESERIALIZE_ARRAY_CASE(INT64, Int64Array, int64_t)
    DESERIALIZE_ARRAY_CASE(UINT8, UInt8Array, uint8_t)
    DESERIALIZE_ARRAY_CASE(UINT16, UInt16Array, uint16_t)
    DESERIALIZE_ARRAY_CASE(UINT32, UInt32Array, uint32_t)
    DESERIALIZE_ARRAY_CASE(UINT64, UInt64Array, uint64_t)
    DESERIALIZE_ARRAY_CASE(FLOAT, FloatArray, float)
    DESERIALIZE_ARRAY_CASE(DOUBLE, DoubleArray, double)
    default:
      DCHECK(false) << "arrow type not recognized: " << content->value_type()->type;
  }
  return Status::OK();
}
// Serialize a numpy array into `builder`. Arrays of supported numeric dtypes
// are appended as tensors; anything else is routed through the user-supplied
// serialization callback (the resulting dict is queued in `subdicts` and its
// refcount is released later in SerializeDict in python.cc).
Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
    std::vector<PyObject*>& subdicts) {
  size_t ndim = PyArray_NDIM(array);
  int dtype = PyArray_TYPE(array);
  std::vector<int64_t> dims(ndim);
  for (size_t i = 0; i < ndim; ++i) {
    dims[i] = PyArray_DIM(array, i);
  }
  // TODO(pcm): Once we don't use builders any more below and directly share
  // the memory buffer, we need to be more careful about this and not
  // decrease the reference count of "contiguous" before the serialization
  // is finished
  auto contiguous = PyArray_GETCONTIGUOUS(array);  // new reference
  auto data = PyArray_DATA(contiguous);
  // Collect the status instead of returning early so that `contiguous` is
  // always released below (the original code leaked it whenever an
  // AppendTensor call failed).
  Status status = Status::OK();
  switch (dtype) {
    case NPY_UINT8:
      status = builder.AppendTensor(dims, reinterpret_cast<uint8_t*>(data));
      break;
    case NPY_INT8:
      status = builder.AppendTensor(dims, reinterpret_cast<int8_t*>(data));
      break;
    case NPY_UINT16:
      status = builder.AppendTensor(dims, reinterpret_cast<uint16_t*>(data));
      break;
    case NPY_INT16:
      status = builder.AppendTensor(dims, reinterpret_cast<int16_t*>(data));
      break;
    case NPY_UINT32:
      status = builder.AppendTensor(dims, reinterpret_cast<uint32_t*>(data));
      break;
    case NPY_INT32:
      status = builder.AppendTensor(dims, reinterpret_cast<int32_t*>(data));
      break;
    case NPY_UINT64:
      status = builder.AppendTensor(dims, reinterpret_cast<uint64_t*>(data));
      break;
    case NPY_INT64:
      status = builder.AppendTensor(dims, reinterpret_cast<int64_t*>(data));
      break;
    case NPY_FLOAT:
      status = builder.AppendTensor(dims, reinterpret_cast<float*>(data));
      break;
    case NPY_DOUBLE:
      status = builder.AppendTensor(dims, reinterpret_cast<double*>(data));
      break;
    default:
      if (!numbuf_serialize_callback) {
        std::stringstream stream;
        stream << "numpy data type not recognized: " << dtype;
        status = Status::NotImplemented(stream.str());
      } else {
        PyObject* arglist = Py_BuildValue("(O)", array);
        // The reference count of the result of the call to PyObject_CallObject
        // must be decremented. This is done in SerializeDict in python.cc.
        PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist);
        Py_XDECREF(arglist);
        if (!result) {
          status = Status::NotImplemented("python error"); // TODO(pcm): https://github.com/ray-project/numbuf/issues/10
        } else {
          builder.AppendDict(PyDict_Size(result));
          subdicts.push_back(result);
        }
      }
  }
  Py_XDECREF(contiguous);  // released on every path, success or failure
  return status;
}
}

View file

@ -0,0 +1,22 @@
#ifndef PYNUMBUF_NUMPY_H
#define PYNUMBUF_NUMPY_H
#include <arrow/api.h>
#include <Python.h>
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#define NO_IMPORT_ARRAY
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
#include <numpy/arrayobject.h>
#include <numbuf/tensor.h>
#include <numbuf/sequence.h>
namespace numbuf {

// Serialize the numpy array `array` into `builder`; unsupported dtypes are
// handed to the registered callback and queued in `subdicts` (see numpy.cc).
arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, std::vector<PyObject*>& subdicts);
// Reconstruct the numpy array stored at row `offset` of a tensor StructArray,
// sharing memory with the arrow buffer; `base` keeps the buffer alive.
arrow::Status DeserializeArray(std::shared_ptr<arrow::Array> array, int32_t offset, PyObject* base, PyObject** out);

}
#endif

View file

@ -0,0 +1,281 @@
#include "python.h"
#include <sstream>
#include "scalars.h"
using namespace arrow;
int32_t MAX_RECURSION_DEPTH = 100;
extern "C" {
extern PyObject* numbuf_serialize_callback;
extern PyObject* numbuf_deserialize_callback;
}
namespace numbuf {
// Convert the element at `index` of the arrow array `arr` (one child of the
// union produced by SequenceBuilder) back into a Python object. `base` is
// threaded through so zero-copy tensors can keep the backing buffer alive.
// NOTE(review): PyInt_FromLong / PyString_FromStringAndSize are Python-2-only
// APIs -- confirm the intended Python 3 story for this file.
Status get_value(ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) {
  switch (arr->type()->type) {
    case Type::BOOL:
      *result = PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
      return Status::OK();
    case Type::INT64:
      *result = PyInt_FromLong(std::static_pointer_cast<Int64Array>(arr)->Value(index));
      return Status::OK();
    case Type::BINARY: {
      int32_t nchars;
      const uint8_t* str = std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
      *result = PyString_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
      return Status::OK();
    }
    case Type::STRING: {
      int32_t nchars;
      const uint8_t* str = std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
      *result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
      return Status::OK();
    }
    case Type::FLOAT:
      *result = PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
      return Status::OK();
    case Type::DOUBLE:
      *result = PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
      return Status::OK();
    case Type::STRUCT: {
      // Struct children are either nested sequences (named "list"/"tuple"/
      // "dict" by their builders) or, failing those names, a tensor.
      auto s = std::static_pointer_cast<StructArray>(arr);
      auto l = std::static_pointer_cast<ListArray>(s->field(0));
      if (s->type()->child(0)->name == "list") {
        return DeserializeList(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
      } else if (s->type()->child(0)->name == "tuple") {
        return DeserializeTuple(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
      } else if (s->type()->child(0)->name == "dict") {
        return DeserializeDict(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
      } else {
        return DeserializeArray(arr, index, base, result);
      }
    }
    default:
      DCHECK(false) << "union tag not recognized " << type;
  }
  return Status::OK();
}
// Append one Python object to the union `builder`, dispatching on its type.
// Nested containers are not serialized here; they are queued in `sublists`/
// `subtuples`/`subdicts` and handled by the caller in a later pass.
Status append(PyObject* elem, SequenceBuilder& builder,
    std::vector<PyObject*>& sublists,
    std::vector<PyObject*>& subtuples,
    std::vector<PyObject*>& subdicts) {
  // The bool case must precede the int case (PyInt_Check passes for bools)
  if (PyBool_Check(elem)) {
    RETURN_NOT_OK(builder.AppendBool(elem == Py_True));
  } else if (PyFloat_Check(elem)) {
    RETURN_NOT_OK(builder.AppendDouble(PyFloat_AS_DOUBLE(elem)));
  } else if (PyLong_Check(elem)) {
    int overflow = 0;
    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
    // Check for overflow *before* appending, so a truncated value is never
    // stored in the builder (the original appended first and erred after).
    if (overflow) {
      return Status::NotImplemented("long overflow");
    }
    RETURN_NOT_OK(builder.AppendInt64(data));
  } else if (PyInt_Check(elem)) {
    RETURN_NOT_OK(builder.AppendInt64(static_cast<int64_t>(PyInt_AS_LONG(elem))));
  } else if (PyString_Check(elem)) {
    auto data = reinterpret_cast<uint8_t*>(PyString_AS_STRING(elem));
    auto size = PyString_GET_SIZE(elem);
    RETURN_NOT_OK(builder.AppendBytes(data, size));
  } else if (PyUnicode_Check(elem)) {
    Py_ssize_t size;
#if PY_MAJOR_VERSION >= 3
    // UTF-8 buffer is owned by the unicode object; nothing to release.
    char* data = PyUnicode_AsUTF8AndSize(elem, &size); // TODO(pcm): Check if this is correct
    Status s = builder.AppendString(data, size);
#else
    // On Python 2 the temporary bytes object must be released; the original
    // code declared `str` only in this branch but decref'd it unconditionally,
    // which does not compile under Python 3.
    PyObject* str = PyUnicode_AsUTF8String(elem);
    char* data = PyString_AS_STRING(str);
    size = PyString_GET_SIZE(str);
    Status s = builder.AppendString(data, size);
    Py_XDECREF(str);
#endif
    RETURN_NOT_OK(s);
  } else if (PyList_Check(elem)) {
    builder.AppendList(PyList_Size(elem));
    sublists.push_back(elem);
  } else if (PyDict_Check(elem)) {
    builder.AppendDict(PyDict_Size(elem));
    subdicts.push_back(elem);
  } else if (PyTuple_CheckExact(elem)) {
    builder.AppendTuple(PyTuple_Size(elem));
    subtuples.push_back(elem);
  } else if (PyArray_IsScalar(elem, Generic)) {
    RETURN_NOT_OK(AppendScalar(elem, builder));
  } else if (PyArray_Check(elem)) {
    RETURN_NOT_OK(SerializeArray((PyArrayObject*) elem, builder, subdicts));
  } else if (elem == Py_None) {
    RETURN_NOT_OK(builder.AppendNone());
  } else {
    // Unknown type: fall back to the user-registered serialization callback.
    if (!numbuf_serialize_callback) {
      std::stringstream ss;
      ss << "data type of " << PyString_AS_STRING(PyObject_Repr(elem))
         << " not recognized and custom serialization handler not registered";
      return Status::NotImplemented(ss.str());
    } else {
      PyObject* arglist = Py_BuildValue("(O)", elem);
      // The reference count of the result of the call to PyObject_CallObject
      // must be decremented. This is done in SerializeDict in this file.
      PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist);
      Py_XDECREF(arglist);
      if (!result) {
        return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/ray-project/numbuf/issues/10
      }
      builder.AppendDict(PyDict_Size(result));
      subdicts.push_back(result);
    }
  }
  return Status::OK();
}
// Serialize a batch of Python sequences (all at the same nesting depth) into
// a single arrow UnionArray. Nested lists/tuples/dicts collected during the
// first pass are serialized recursively and attached via builder.Finish.
Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth, std::shared_ptr<Array>* out) {
  DCHECK(out);
  if (recursion_depth >= MAX_RECURSION_DEPTH) {
    return Status::NotImplemented("This object exceeds the maximum recursion depth. It may contain itself recursively.");
  }
  SequenceBuilder builder(nullptr);
  std::vector<PyObject*> sublists, subtuples, subdicts;
  for (const auto& sequence : sequences) {
    PyObject* item;
    PyObject* iterator = PyObject_GetIter(sequence);
    if (!iterator) {
      // PyObject_GetIter returns NULL (with a Python exception set) for
      // non-iterable objects; the original code would have crashed inside
      // PyIter_Next. Propagate the Python error instead.
      return Status::NotImplemented("python error");
    }
    while ((item = PyIter_Next(iterator))) {
      Status s = append(item, builder, sublists, subtuples, subdicts);
      Py_DECREF(item);
      // if an error occurs, we need to decrement the reference counts before returning
      if (!s.ok()) {
        Py_DECREF(iterator);
        return s;
      }
    }
    Py_DECREF(iterator);
  }
  // Second pass: recursively serialize the nested containers found above.
  std::shared_ptr<Array> list;
  if (sublists.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list));
  }
  std::shared_ptr<Array> tuple;
  if (subtuples.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(subtuples, recursion_depth + 1, &tuple));
  }
  std::shared_ptr<Array> dict;
  if (subdicts.size() > 0) {
    RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict));
  }
  return builder.Finish(list, tuple, dict, out);
}
// Shared body of DeserializeList/DeserializeTuple: walk rows
// [start_idx, stop_idx) of the UnionArray, look up each element's child
// array via its type tag and per-row offset, convert it with get_value,
// and store it in a container created by CREATE / populated by SET_ITEM
// (both of which steal the element reference).
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM)                        \
  auto data = std::dynamic_pointer_cast<UnionArray>(array);           \
  int32_t size = array->length();                                     \
  PyObject* result = CREATE(stop_idx - start_idx);                    \
  auto types = std::make_shared<Int8Array>(size, data->types());      \
  auto offsets = std::make_shared<Int32Array>(size, data->offset_buf()); \
  for (size_t i = start_idx; i < stop_idx; ++i) {                     \
    if (data->IsNull(i)) {                                            \
      Py_INCREF(Py_None);                                             \
      SET_ITEM(result, i-start_idx, Py_None);                         \
    } else {                                                          \
      int32_t offset = offsets->Value(i);                             \
      int8_t type = types->Value(i);                                  \
      ArrayPtr arr = data->child(type);                               \
      PyObject* value;                                                \
      RETURN_NOT_OK(get_value(arr, offset, type, base, &value));      \
      SET_ITEM(result, i-start_idx, value);                           \
    }                                                                 \
  }                                                                   \
  *out = result;                                                      \
  return Status::OK();

// Rebuild a Python list from rows [start_idx, stop_idx) of the union array.
Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out) {
  DESERIALIZE_SEQUENCE(PyList_New, PyList_SetItem)
}

// Same as DeserializeList, but produces a Python tuple.
Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out) {
  DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SetItem)
}
// Serialize a batch of Python dicts into an arrow StructArray with parallel
// "keys" and "vals" columns. Dicts produced by the serialization callback
// (marked with "_pytype_") are released here, balancing the references taken
// in append() and SerializeArray().
Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr<Array>* out) {
  DictBuilder result;
  if (recursion_depth >= MAX_RECURSION_DEPTH) {
    return Status::NotImplemented("This object exceeds the maximum recursion depth. It may contain itself recursively.");
  }
  std::vector<PyObject*> key_tuples, val_lists, val_tuples, val_dicts, dummy;
  for (const auto& dict : dicts) {
    PyObject *key, *value;
    Py_ssize_t pos = 0;
    while (PyDict_Next(dict, &pos, &key, &value)) {
      // Keys may only be scalars or tuples, so list/dict children of a key
      // would land in `dummy`, which must stay empty.
      RETURN_NOT_OK(append(key, result.keys(), dummy, key_tuples, dummy));
      DCHECK(dummy.size() == 0);
      RETURN_NOT_OK(append(value, result.vals(), val_lists, val_tuples, val_dicts));
    }
  }
  // Recursively serialize the nested containers collected above.
  std::shared_ptr<Array> key_tuples_arr;
  if (key_tuples.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(key_tuples, recursion_depth + 1, &key_tuples_arr));
  }
  std::shared_ptr<Array> val_list_arr;
  if (val_lists.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr));
  }
  std::shared_ptr<Array> val_tuples_arr;
  if (val_tuples.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(val_tuples, recursion_depth + 1, &val_tuples_arr));
  }
  std::shared_ptr<Array> val_dict_arr;
  if (val_dicts.size() > 0) {
    RETURN_NOT_OK(SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr));
  }
  // The original code discarded this Status; propagate failures instead.
  RETURN_NOT_OK(result.Finish(key_tuples_arr, val_list_arr, val_tuples_arr, val_dict_arr, out));
  // This block is used to decrement the reference counts of the results
  // returned by the serialization callback, which is called in SerializeArray
  // in numpy.cc as well as in DeserializeDict and in append in this file.
  static PyObject* py_type = PyString_FromString("_pytype_");
  for (const auto& dict : dicts) {
    if (PyDict_Contains(dict, py_type)) {
      // If the dictionary contains the key "_pytype_", then the user has to
      // have registered a callback.
      ARROW_CHECK(numbuf_serialize_callback);
      Py_XDECREF(dict);
    }
  }
  return Status::OK();
}
// Rebuild a Python dict from rows [start_idx, stop_idx) of the keys/vals
// StructArray. If the result carries the "_pytype_" marker and a callback is
// registered, the callback reconstructs the original custom object.
Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out) {
  auto data = std::dynamic_pointer_cast<StructArray>(array);
  // TODO(pcm): error handling, get rid of the temporary copy of the list
  PyObject *keys, *vals;
  PyObject* result = PyDict_New();
  ARROW_RETURN_NOT_OK(DeserializeList(data->field(0), start_idx, stop_idx, base, &keys));
  ARROW_RETURN_NOT_OK(DeserializeList(data->field(1), start_idx, stop_idx, base, &vals));
  for (size_t i = start_idx; i < stop_idx; ++i) {
    PyDict_SetItem(result, PyList_GetItem(keys, i - start_idx), PyList_GetItem(vals, i - start_idx));
  }
  // PyList_GetItem returns *borrowed* references; PyDict_SetItem took its own
  // references to each key/value, so dropping the temporary lists is safe.
  Py_XDECREF(keys);
  Py_XDECREF(vals);
  static PyObject* py_type = PyString_FromString("_pytype_");
  if (PyDict_Contains(result, py_type) && numbuf_deserialize_callback) {
    PyObject* arglist = Py_BuildValue("(O)", result);
    // The result of the call to PyObject_CallObject will be passed to Python
    // and its reference count will be decremented by the interpreter.
    PyObject* callback_result = PyObject_CallObject(numbuf_deserialize_callback, arglist);
    Py_XDECREF(arglist);
    Py_XDECREF(result);
    result = callback_result;
    if (!callback_result) {
      return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/ray-project/numbuf/issues/10
    }
  }
  *out = result;
  return Status::OK();
}
}

View file

@ -0,0 +1,22 @@
#ifndef PYNUMBUF_PYTHON_H
#define PYNUMBUF_PYTHON_H
#include <Python.h>
#include <arrow/api.h>
#include <numbuf/dict.h>
#include <numbuf/sequence.h>
#include "numpy.h"
namespace numbuf {
// Serialize a batch of same-depth Python sequences into one arrow array.
arrow::Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth, std::shared_ptr<arrow::Array>* out);
// Serialize a batch of Python dicts into a keys/vals StructArray.
arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr<arrow::Array>* out);
// Rebuild a Python list / tuple / dict from rows [start_idx, stop_idx);
// `base` keeps zero-copy tensor buffers alive.
arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
}
#endif

View file

@ -0,0 +1,54 @@
#ifndef PYNUMBUF_SCALARS_H
#define PYNUMBUF_SCALARS_H
#include <arrow/api.h>
#include <Python.h>
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#define NO_IMPORT_ARRAY
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
#include <numpy/arrayobject.h>
#include <numpy/arrayscalars.h>
#include <numbuf/sequence.h>
namespace numbuf {
// Append a numpy scalar object to `builder`. Bool/float/double scalars keep
// their type; every integer width is widened to int64.
// `inline` added: this function is *defined* in a header, so without it any
// two translation units including scalars.h would violate the ODR.
inline arrow::Status AppendScalar(PyObject* obj, SequenceBuilder& builder) {
  if (PyArray_IsScalar(obj, Bool)) {
    return builder.AppendBool(((PyBoolScalarObject *)obj)->obval != 0);
  } else if (PyArray_IsScalar(obj, Float)) {
    return builder.AppendFloat(((PyFloatScalarObject *)obj)->obval);
  } else if (PyArray_IsScalar(obj, Double)) {
    return builder.AppendDouble(((PyDoubleScalarObject *)obj)->obval);
  }
  // All remaining supported scalars are integers of some width.
  int64_t value = 0;
  if (PyArray_IsScalar(obj, Byte)) {
    value = ((PyByteScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, UByte)) {
    value = ((PyUByteScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, Short)) {
    value = ((PyShortScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, UShort)) {
    value = ((PyUShortScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, Int)) {
    value = ((PyIntScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, UInt)) {
    value = ((PyUIntScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, Long)) {
    value = ((PyLongScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, ULong)) {
    value = ((PyULongScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, LongLong)) {
    value = ((PyLongLongScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, ULongLong)) {
    value = ((PyULongLongScalarObject *)obj)->obval;
  } else {
    DCHECK(false) << "scalar type not recognized";
  }
  return builder.AppendInt64(value);
}
} // namespace
#endif // PYNUMBUF_SCALARS_H

View file

@ -0,0 +1,68 @@
#ifndef PYNUMBUF_MEMORY_H
#define PYNUMBUF_MEMORY_H
#include <arrow/io/interfaces.h>
namespace numbuf {
// In-memory arrow I/O stream over a caller-owned, fixed-size buffer.
// Supports both writing (serialization into a pre-allocated region) and
// zero-copy reading. Does not own `data` and never grows it.
class FixedBufferStream : public arrow::io::OutputStream, public arrow::io::ReadableFileInterface {
 public:
  virtual ~FixedBufferStream() {}

  explicit FixedBufferStream(uint8_t* data, int64_t nbytes)
      : data_(data), position_(0), size_(nbytes) {}

  // Zero-copy read: the returned Buffer aliases the underlying memory.
  arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override {
    DCHECK(out);
    DCHECK(position_ + nbytes <= size_) << "position: " << position_ << " nbytes: " << nbytes << "size: " << size_;
    *out = std::make_shared<arrow::Buffer>(data_ + position_, nbytes);
    position_ += nbytes;
    return arrow::Status::OK();
  }

  // NOTE(review): this copying-read overload is unimplemented (assert(0))
  // and is not marked `override` -- confirm it is never reached.
  arrow::Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
    assert(0);
    return arrow::Status::OK();
  }

  arrow::Status Seek(int64_t position) override {
    position_ = position;
    return arrow::Status::OK();
  }

  arrow::Status Close() override {
    return arrow::Status::OK();
  }

  arrow::Status Tell(int64_t* position) override {
    *position = position_;
    return arrow::Status::OK();
  }

  // Copy `nbytes` into the buffer at the current position; the buffer must
  // have been sized large enough up front (DCHECK'd, not resized).
  arrow::Status Write(const uint8_t* data, int64_t nbytes) override {
    DCHECK(position_ >= 0 && position_ < size_);
    DCHECK(position_ + nbytes <= size_) << "position: " << position_ << " nbytes: " << nbytes << "size: " << size_;
    uint8_t* dst = data_ + position_;
    memcpy(dst, data, nbytes);
    position_ += nbytes;
    return arrow::Status::OK();
  }

  arrow::Status GetSize(int64_t *size) override {
    *size = size_;
    return arrow::Status::OK();
  }

  bool supports_zero_copy() const override {
    return true;
  }

 private:
  uint8_t* data_;     // caller-owned backing memory (not freed here)
  int64_t position_;  // current read/write cursor
  int64_t size_;      // total capacity of data_
};
} // namespace numbuf
#endif // PYNUMBUF_MEMORY_H

View file

@ -0,0 +1,195 @@
#include <Python.h>
#include <arrow/api.h>
#include <arrow/ipc/adapter.h>
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
#include <numpy/arrayobject.h>
#include <iostream>
#include <arrow/ipc/metadata.h>
#include "adapters/python.h"
#include "memory.h"
using namespace arrow;
using namespace numbuf;
std::shared_ptr<RecordBatch> make_row_batch(std::shared_ptr<Array> data) {
auto field = std::make_shared<Field>("list", data->type());
std::shared_ptr<Schema> schema(new Schema({field}));
return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
}
extern "C" {
static PyObject *NumbufError;
PyObject *numbuf_serialize_callback = NULL;
PyObject *numbuf_deserialize_callback = NULL;
// PyArg_ParseTuple "O&" converter: unwrap an "arrow" capsule into the
// heap-allocated shared_ptr<RecordBatch> it carries. Returns 1 on success,
// 0 (with a TypeError set) otherwise.
int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch> **result) {
  if (!PyCapsule_IsValid(object, "arrow")) {
    PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
    return 0;
  }
  *result = reinterpret_cast<std::shared_ptr<RecordBatch>*>(PyCapsule_GetPointer(object, "arrow"));
  return 1;
}
// Capsule destructor: frees the shared_ptr<RecordBatch> heap-allocated in
// serialize_list / read_from_buffer when the capsule is garbage-collected.
static void ArrowCapsule_Destructor(PyObject* capsule) {
  delete reinterpret_cast<std::shared_ptr<RecordBatch>*>(PyCapsule_GetPointer(capsule, "arrow"));
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* serialize_list(PyObject* self, PyObject* args) {
PyObject* value;
if (!PyArg_ParseTuple(args, "O", &value)) {
return NULL;
}
std::shared_ptr<Array> array;
if (PyList_Check(value)) {
int32_t recursion_depth = 0;
Status s = SerializeSequences(std::vector<PyObject*>({value}), recursion_depth, &array);
if (!s.ok()) {
// If this condition is true, there was an error in the callback that
// needs to be passed through
if (!PyErr_Occurred()) {
PyErr_SetString(NumbufError, s.ToString().c_str());
}
return NULL;
}
auto batch = new std::shared_ptr<RecordBatch>();
*batch = make_row_batch(array);
int64_t size = 0;
ARROW_CHECK_OK(arrow::ipc::GetRecordBatchSize(batch->get(), &size));
std::shared_ptr<Buffer> buffer;
ARROW_CHECK_OK(ipc::WriteSchema((*batch)->schema().get(), &buffer));
auto ptr = reinterpret_cast<const char*>(buffer->data());
PyObject* r = PyTuple_New(3);
PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, buffer->size()));
PyTuple_SetItem(r, 1, PyInt_FromLong(size));
PyTuple_SetItem(r, 2, PyCapsule_New(reinterpret_cast<void*>(batch),
"arrow", &ArrowCapsule_Destructor));
return r;
}
return NULL;
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
std::shared_ptr<RecordBatch>* batch;
PyObject* memoryview;
if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) {
return NULL;
}
if (!PyMemoryView_Check(memoryview)) {
return NULL;
}
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
auto target = std::make_shared<FixedBufferStream>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
int64_t body_end_offset;
int64_t header_end_offset;
ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(), target.get(), &body_end_offset, &header_end_offset));
return PyInt_FromLong(header_end_offset);
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
PyObject* memoryview;
PyObject* metadata;
int64_t metadata_offset;
if (!PyArg_ParseTuple(args, "OOL", &memoryview, &metadata, &metadata_offset)) {
return NULL;
}
auto ptr = reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata));
auto schema_buffer = std::make_shared<Buffer>(ptr, PyByteArray_Size(metadata));
std::shared_ptr<ipc::Message> message;
ARROW_CHECK_OK(ipc::Message::Open(schema_buffer, &message));
DCHECK_EQ(ipc::Message::SCHEMA, message->type());
std::shared_ptr<ipc::SchemaMessage> schema_msg = message->GetSchema();
std::shared_ptr<Schema> schema;
ARROW_CHECK_OK(schema_msg->GetSchema(&schema));
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
auto source = std::make_shared<FixedBufferStream>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
ARROW_CHECK_OK(arrow::ipc::RecordBatchReader::Open(source.get(), metadata_offset, &reader));
auto batch = new std::shared_ptr<arrow::RecordBatch>();
ARROW_CHECK_OK(reader->GetRecordBatch(schema, batch));
return PyCapsule_New(reinterpret_cast<void*>(batch),
"arrow", &ArrowCapsule_Destructor);
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* deserialize_list(PyObject* self, PyObject* args) {
std::shared_ptr<RecordBatch>* data;
PyObject* base = Py_None;
if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &data, &base)) {
return NULL;
}
PyObject* result;
Status s = DeserializeList((*data)->column(0), 0, (*data)->num_rows(), base, &result);
if (!s.ok()) {
// If this condition is true, there was an error in the callback that
// needs to be passed through
if (!PyErr_Occurred()) {
PyErr_SetString(NumbufError, s.ToString().c_str());
}
return NULL;
}
return result;
}
// Install the user-supplied serialization/deserialization hooks used for
// Python objects that numbuf cannot handle natively. Replaces (and releases)
// any previously registered pair. Returns None on success.
static PyObject* register_callbacks(PyObject* self, PyObject* args) {
  PyObject* serialize_callback;
  PyObject* deserialize_callback;
  if (!PyArg_ParseTuple(args, "OO:register_callbacks", &serialize_callback, &deserialize_callback)) {
    return NULL;
  }
  if (!PyCallable_Check(serialize_callback)) {
    PyErr_SetString(PyExc_TypeError, "serialize_callback must be callable");
    return NULL;
  }
  if (!PyCallable_Check(deserialize_callback)) {
    PyErr_SetString(PyExc_TypeError, "deserialize_callback must be callable");
    return NULL;
  }
  Py_XINCREF(serialize_callback);    // hold the new serialization callback
  Py_XINCREF(deserialize_callback);  // hold the new deserialization callback
  Py_XDECREF(numbuf_serialize_callback);    // drop any previous callback
  Py_XDECREF(numbuf_deserialize_callback);  // drop any previous callback
  numbuf_serialize_callback = serialize_callback;
  numbuf_deserialize_callback = deserialize_callback;
  Py_INCREF(Py_None);
  return Py_None;
}
// Method table for the libnumbuf extension module.
static PyMethodDef NumbufMethods[] = {
  { "serialize_list", serialize_list, METH_VARARGS, "serialize a Python list" },
  { "deserialize_list", deserialize_list, METH_VARARGS, "deserialize a Python list" },
  { "write_to_buffer", write_to_buffer, METH_VARARGS, "write serialized data to buffer"},
  { "read_from_buffer", read_from_buffer, METH_VARARGS, "read serialized data from buffer"},
  { "register_callbacks", register_callbacks, METH_VARARGS, "set serialization and deserialization callbacks"},
  { NULL, NULL, 0, NULL }
};
// Module initialization entry point.
// NOTE(review): Py_InitModule3 and this init name are Python-2-only; a
// Python 3 build would need PyModule_Create + PyInit_libnumbuf -- confirm
// how the Python 3 Travis configuration is expected to work.
PyMODINIT_FUNC initlibnumbuf(void) {
  PyObject* m;
  m = Py_InitModule3("libnumbuf", NumbufMethods, "Python C Extension for Numbuf");
  char numbuf_error[] = "numbuf.error";
  NumbufError = PyErr_NewException(numbuf_error, NULL, NULL);
  Py_INCREF(NumbufError);
  PyModule_AddObject(m, "numbuf_error", NumbufError);
  // Required before using the numpy C API from this module.
  import_array();
}
}

View file

@ -0,0 +1,114 @@
import unittest
import numbuf
import numpy as np
from numpy.testing import assert_equal
# Grab bag of values exercising every natively supported type: tuple-keyed
# dicts, mixed lists, ints beyond 32 bits (Python 2 long literals), unicode,
# None, bools, and numpy scalars.
TEST_OBJECTS = [{(1,2) : 1}, {() : 2}, [1, "hello", 3.0], 42, 43L, "hello world",
                u"x", u"\u262F", 42.0,
                1L << 62, (1.0, "hi"),
                None, (None, None), ("hello", None),
                True, False, (True, False), "hello",
                {True: "hello", False: "world"},
                {"hello" : "world", 1: 42, 1.0: 45}, {},
                np.int8(3), np.int32(4), np.int64(5),
                np.uint8(3), np.uint32(4), np.uint64(5),
                np.float32(1.0), np.float64(1.0)]
class SerializationTests(unittest.TestCase):
  """Round-trip serialization tests for the numbuf extension module."""

  def roundTripTest(self, data):
    # Serialize the list, deserialize it, and check nothing changed.
    schema, size, serialized = numbuf.serialize_list(data)
    result = numbuf.deserialize_list(serialized)
    assert_equal(data, result)

  def testSimple(self):
    self.roundTripTest([1, 2, 3])
    self.roundTripTest([1.0, 2.0, 3.0])
    self.roundTripTest(['hello', 'world'])
    self.roundTripTest([1, 'hello', 1.0])
    self.roundTripTest([{'hello': 1.0, 'world': 42}])
    self.roundTripTest([True, False])

  def testNone(self):
    self.roundTripTest([1, 2, None, 3])

  def testNested(self):
    self.roundTripTest([{"hello": {"world": (1, 2, 3)}}])
    self.roundTripTest([((1,), (1, 2, 3, (4, 5, 6), "string"))])
    self.roundTripTest([{"hello": [1, 2, 3]}])
    self.roundTripTest([{"hello": [1, [2, 3]]}])
    self.roundTripTest([{"hello": (None, 2, [3, 4])}])
    # Fixed: np.ndarray([1.0, 2.0, 3.0]) called the low-level ndarray
    # constructor with a *shape* of (1, 2, 3), producing an uninitialized
    # array; np.array builds the intended 3-element array of values.
    self.roundTripTest([{"hello": (None, 2, [3, 4], np.array([1.0, 2.0, 3.0]))}])

  def numpyTest(self, t):
    # Round-trip a random 100x100 array of the given dtype name.
    a = np.random.randint(0, 10, size=(100, 100)).astype(t)
    self.roundTripTest([a])

  def testArrays(self):
    for t in ["int8", "uint8", "int16", "uint16", "int32", "uint32", "float32", "float64"]:
      self.numpyTest(t)

  def testRay(self):
    for obj in TEST_OBJECTS:
      self.roundTripTest([obj])

  def testCallback(self):
    # Custom classes are serialized through user callbacks; the "_pytype_"
    # key tells the deserializer which class to reconstruct.
    class Foo(object):
      def __init__(self):
        self.x = 1

    class Bar(object):
      def __init__(self):
        self.foo = Foo()

    def serialize(obj):
      return dict(obj.__dict__, **{"_pytype_": type(obj).__name__})

    def deserialize(obj):
      if obj["_pytype_"] == "Foo":
        result = Foo()
      elif obj["_pytype_"] == "Bar":
        result = Bar()
      obj.pop("_pytype_", None)
      result.__dict__ = obj
      return result

    bar = Bar()
    bar.foo.x = 42
    numbuf.register_callbacks(serialize, deserialize)
    metadata, size, serialized = numbuf.serialize_list([bar])
    self.assertEqual(numbuf.deserialize_list(serialized)[0].foo.x, 42)

  def testObjectArray(self):
    # Object-dtype arrays are not natively supported; route them through
    # the callback mechanism as plain lists.
    x = np.array([1, 2, "hello"], dtype=object)
    y = np.array([[1, 2], [3, 4]], dtype=object)

    def myserialize(obj):
      return {"_pytype_": "numpy.array", "data": obj.tolist()}

    def mydeserialize(obj):
      if obj["_pytype_"] == "numpy.array":
        return np.array(obj["data"], dtype=object)

    numbuf.register_callbacks(myserialize, mydeserialize)
    metadata, size, serialized = numbuf.serialize_list([x, y])
    assert_equal(numbuf.deserialize_list(serialized), [x, y])

  def testBuffer(self):
    # Exercise the write_to_buffer / read_from_buffer path end to end.
    for (i, obj) in enumerate(TEST_OBJECTS):
      schema, size, batch = numbuf.serialize_list([obj])
      size = size + 4096  # INITIAL_METADATA_SIZE in arrow
      buff = np.zeros(size, dtype="uint8")
      metadata_offset = numbuf.write_to_buffer(batch, memoryview(buff))
      array = numbuf.read_from_buffer(memoryview(buff), schema, metadata_offset)
      result = numbuf.deserialize_list(array)
      assert_equal(result[0], obj)
# Run the full suite with per-test output when executed as a script.
if __name__ == "__main__":
  unittest.main(verbosity=2)

30
numbuf/setup.py Normal file
View file

@ -0,0 +1,30 @@
import subprocess
from setuptools import setup, find_packages, Extension
import setuptools.command.install as _install
from sys import platform
# Shared-library filename suffix for the arrow libraries listed in
# package_data below (.so on Linux, .dylib on macOS).
extension = ""
if platform == "linux" or platform == "linux2":
  extension = ".so"
elif platform == "darwin":
  extension = ".dylib"
# NOTE(review): on any other platform `extension` silently stays "" and the
# package_data entries below become nonexistent "libarrow" names — consider
# failing fast here instead.
# Because of relative paths, this must be run from inside numbuf/.
class install(_install.install):
  """Custom install command that builds the native extension first.

  Runs ./setup.sh (fetch/build third-party deps) and ./build.sh (compile
  libnumbuf) before performing the egg install.
  """
  def run(self):
    subprocess.check_call(["./setup.sh"])
    subprocess.check_call(["./build.sh"])
    # Calling _install.install.run(self) does not fetch required packages and
    # instead performs an old-style install. See command/install.py in
    # setuptools. So, calling do_egg_install() manually here.
    self.do_egg_install()
# Package metadata: ships the prebuilt shared libraries (libnumbuf plus the
# arrow libraries it links against) alongside the Python code.
setup(name="numbuf",
      version="0.0.1",
      packages=find_packages(),
      package_data={"numbuf": ["libnumbuf.so", "libarrow" + extension, "libarrow_io" + extension, "libarrow_ipc" + extension]},
      cmdclass={"install": install},
      setup_requires=["numpy"],
      include_package_data=True,
      # Shared libraries must exist as real files on disk, so no zip install.
      zip_safe=False)

21
numbuf/setup.sh Executable file
View file

@ -0,0 +1,21 @@
#!/usr/bin/env bash

# Fetch and build numbuf's third-party dependencies (the arrow submodule).
# FIX: abort on the first failing command; previously a failed download
# still proceeded to the build step. This also matches the sibling
# thirdparty scripts, which already use `set -e`.
set -e

# Absolute directory containing this script, regardless of invocation cwd.
ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

platform="unknown"
unamestr="$(uname)"
if [[ "$unamestr" == "Linux" ]]; then
  echo "Platform is linux."
  platform="linux"
elif [[ "$unamestr" == "Darwin" ]]; then
  echo "Platform is macosx."
  platform="macosx"
else
  echo "Unrecognized platform."
  exit 1
fi

pushd "$ROOT_DIR/thirdparty"
  ./download_thirdparty.sh
  ./build_thirdparty.sh
popd

1
numbuf/thirdparty/arrow vendored Submodule

@ -0,0 +1 @@
Subproject commit 58bd7bedc63d66d5898297bab25b54dfb67665db

26
numbuf/thirdparty/build_thirdparty.sh vendored Executable file
View file

@ -0,0 +1,26 @@
#!/bin/bash

# Build the vendored arrow C++ library (Release optimizations plus debug
# symbols). Safe to run from any working directory.
set -x
set -e

# Absolute path of this script's directory.
TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
# NOTE(review): PREFIX is not used below in this script — confirm whether a
# later install step relies on it.
PREFIX=$TP_DIR/installed

# Determine how many parallel jobs to use for make based on the number of cores
unamestr="$(uname)"
if [[ "$unamestr" == "Linux" ]]; then
  PARALLEL=$(nproc)
elif [[ "$unamestr" == "Darwin" ]]; then
  PARALLEL=$(sysctl -n hw.ncpu)
  echo "Platform is macosx."
else
  echo "Unrecognized platform."
  exit 1
fi

echo "building arrow"
# FIX: quote path expansions so the build works when the checkout lives in a
# directory whose path contains spaces.
cd "$TP_DIR/arrow/cpp"
mkdir -p "$TP_DIR/arrow/cpp/build"
cd "$TP_DIR/arrow/cpp/build"
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" ..
make VERBOSE=1 -j"$PARALLEL"

8
numbuf/thirdparty/download_thirdparty.sh vendored Executable file
View file

@ -0,0 +1,8 @@
#!/bin/bash

# Fetch the vendored arrow git submodule (echo commands, abort on failure).
set -x
set -e

# Absolute path of this script's directory.
TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

git submodule update --init --recursive -- "$TP_DIR/arrow"

View file

@ -0,0 +1,75 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Visual Studio (v140 toolset) project that builds the numbuf CPython
     extension as a DynamicLibrary, renamed lib$(MSBuildProjectName) with a
     .pyd extension so Python can import it. Expects the $(THIRD_PARTY)
     property to point at the directory containing the arrow checkout. -->
<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
  <ItemGroup Label="ProjectConfigurations">
    <ProjectConfiguration Include="Debug|Win32">
      <Configuration>Debug</Configuration>
      <Platform>Win32</Platform>
    </ProjectConfiguration>
    <ProjectConfiguration Include="Release|Win32">
      <Configuration>Release</Configuration>
      <Platform>Win32</Platform>
    </ProjectConfiguration>
    <ProjectConfiguration Include="Debug|x64">
      <Configuration>Debug</Configuration>
      <Platform>x64</Platform>
    </ProjectConfiguration>
    <ProjectConfiguration Include="Release|x64">
      <Configuration>Release</Configuration>
      <Platform>x64</Platform>
    </ProjectConfiguration>
  </ItemGroup>
  <PropertyGroup Label="Globals">
    <ProjectGuid>{609D1438-D42D-4CBA-80A5-A1398C3BCC85}</ProjectGuid>
    <Keyword>Win32Proj</Keyword>
  </PropertyGroup>
  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
  <PropertyGroup Label="Configuration">
    <PlatformToolset>v140</PlatformToolset>
    <ConfigurationType>DynamicLibrary</ConfigurationType>
  </PropertyGroup>
  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
  <ImportGroup Label="ExtensionSettings">
  </ImportGroup>
  <ImportGroup Label="Shared">
  </ImportGroup>
  <ImportGroup Label="PropertySheets">
    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
    <Import Project="$(SolutionDir)vsprojects\base.props" Condition="exists('$(SolutionDir)vsprojects\base.props')" />
  </ImportGroup>
  <PropertyGroup Label="UserMacros" />
  <PropertyGroup>
    <!-- Output libnumbuf.pyd: Python loads extension modules via .pyd. -->
    <TargetName>lib$(MSBuildProjectName)</TargetName>
    <TargetExt>.pyd</TargetExt>
  </PropertyGroup>
  <ItemDefinitionGroup>
    <ClCompile>
      <AdditionalIncludeDirectories>$(THIRD_PARTY)arrow\cpp\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
    </ClCompile>
    <Link>
      <SubSystem>Console</SubSystem>
    </Link>
  </ItemDefinitionGroup>
  <ItemGroup>
    <ClInclude Include="..\cpp\src\numbuf\dict.h" />
    <ClInclude Include="..\cpp\src\numbuf\sequence.h" />
    <ClInclude Include="..\cpp\src\numbuf\tensor.h" />
    <ClInclude Include="..\python\src\pynumbuf\adapters\numpy.h" />
    <ClInclude Include="..\python\src\pynumbuf\adapters\python.h" />
    <ClInclude Include="..\python\src\pynumbuf\adapters\scalars.h" />
    <ClInclude Include="..\python\src\pynumbuf\memory.h" />
  </ItemGroup>
  <ItemGroup>
    <ClCompile Include="..\cpp\src\numbuf\dict.cc" />
    <ClCompile Include="..\cpp\src\numbuf\sequence.cc" />
    <ClCompile Include="..\cpp\src\numbuf\tensor.cc" />
    <ClCompile Include="..\python\src\pynumbuf\adapters\numpy.cc" />
    <ClCompile Include="..\python\src\pynumbuf\adapters\python.cc" />
    <ClCompile Include="..\python\src\pynumbuf\numbuf.cc" />
  </ItemGroup>
  <ItemGroup>
    <!-- Build against the arrow project when its checkout is present. -->
    <ProjectReference Include="$(THIRD_PARTY)arrow\vsprojects\arrow.vcxproj" Condition="exists('$(THIRD_PARTY)arrow\vsprojects\arrow.vcxproj')">
      <Project>{10e7d8e8-0eeb-46ea-a58d-f9236b5960ad}</Project>
    </ProjectReference>
  </ItemGroup>
  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
</Project>

View file

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Solution Explorer filter file: groups the numbuf sources and headers
     into the standard "Source Files" / "Header Files" folders in Visual
     Studio. Purely cosmetic; has no effect on the build. -->
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
  <ItemGroup>
    <Filter Include="Source Files">
      <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
      <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
    </Filter>
    <Filter Include="Header Files">
      <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
      <Extensions>h;hh;hpp;hxx;hm;inl;inc;xsd</Extensions>
    </Filter>
    <Filter Include="Resource Files">
      <UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
      <Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
    </Filter>
  </ItemGroup>
  <ItemGroup>
    <ClCompile Include="..\cpp\src\numbuf\dict.cc">
      <Filter>Source Files</Filter>
    </ClCompile>
    <ClCompile Include="..\cpp\src\numbuf\sequence.cc">
      <Filter>Source Files</Filter>
    </ClCompile>
    <ClCompile Include="..\cpp\src\numbuf\tensor.cc">
      <Filter>Source Files</Filter>
    </ClCompile>
    <ClCompile Include="..\python\src\pynumbuf\adapters\numpy.cc">
      <Filter>Source Files</Filter>
    </ClCompile>
    <ClCompile Include="..\python\src\pynumbuf\adapters\python.cc">
      <Filter>Source Files</Filter>
    </ClCompile>
    <ClCompile Include="..\python\src\pynumbuf\numbuf.cc">
      <Filter>Source Files</Filter>
    </ClCompile>
  </ItemGroup>
  <ItemGroup>
    <ClInclude Include="..\cpp\src\numbuf\dict.h">
      <Filter>Header Files</Filter>
    </ClInclude>
    <ClInclude Include="..\cpp\src\numbuf\sequence.h">
      <Filter>Header Files</Filter>
    </ClInclude>
    <ClInclude Include="..\cpp\src\numbuf\tensor.h">
      <Filter>Header Files</Filter>
    </ClInclude>
    <ClInclude Include="..\python\src\pynumbuf\adapters\numpy.h">
      <Filter>Header Files</Filter>
    </ClInclude>
    <ClInclude Include="..\python\src\pynumbuf\adapters\python.h">
      <Filter>Header Files</Filter>
    </ClInclude>
    <ClInclude Include="..\python\src\pynumbuf\adapters\scalars.h">
      <Filter>Header Files</Filter>
    </ClInclude>
    <ClInclude Include="..\python\src\pynumbuf\memory.h">
      <Filter>Header Files</Filter>
    </ClInclude>
  </ItemGroup>
</Project>