From 7030ef366f3d3a842a19cd8e114e7e5084a28b7d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 4 Sep 2017 22:58:49 -0700 Subject: [PATCH] Rebase Ray on latest arrow (remove numbuf from Ray). (#910) * remove some stuff * put get roundtrip working * fixes * more fixes * cleanup * fix tests * latest arrow * fixes * fix tests * fix linting * rebase * fixes * fix bug * bring back libgcc error * fix linting * use official arrow repo * fixes --- .gitignore | 1 - .travis.yml | 4 +- CMakeLists.txt | 1 - doc/source/conf.py | 1 - doc/source/installation-troubleshooting.rst | 12 +- python/ray/__init__.py | 30 + python/ray/local_scheduler/test/test.py | 4 +- python/ray/numbuf/__init__.py | 41 -- python/ray/plasma/test/test.py | 10 +- python/ray/serialization.py | 150 ----- python/ray/worker.py | 100 ++-- python/setup.py | 1 - src/global_scheduler/global_scheduler.cc | 10 +- .../local_scheduler_algorithm.cc | 4 +- src/numbuf/.clang-format | 65 --- src/numbuf/CMakeLists.txt | 70 --- src/numbuf/cmake/Modules/FindNumPy.cmake | 54 -- src/numbuf/cpp/src/numbuf/dict.cc | 25 - src/numbuf/cpp/src/numbuf/dict.h | 47 -- src/numbuf/cpp/src/numbuf/sequence.cc | 149 ----- src/numbuf/cpp/src/numbuf/sequence.h | 125 ----- .../python/src/pynumbuf/adapters/numpy.cc | 67 --- .../python/src/pynumbuf/adapters/numpy.h | 23 - .../python/src/pynumbuf/adapters/python.cc | 340 ----------- .../python/src/pynumbuf/adapters/python.h | 30 - .../python/src/pynumbuf/adapters/scalars.h | 54 -- src/numbuf/python/src/pynumbuf/numbuf.cc | 529 ------------------ src/numbuf/python/test/runtest.py | 174 ------ src/numbuf/vsprojects/numbuf.vcxproj | 75 --- src/numbuf/vsprojects/numbuf.vcxproj.filters | 60 -- src/thirdparty/download_thirdparty.sh | 2 +- 31 files changed, 89 insertions(+), 2169 deletions(-) delete mode 100644 python/ray/numbuf/__init__.py delete mode 100644 src/numbuf/.clang-format delete mode 100644 src/numbuf/CMakeLists.txt delete mode 100644 src/numbuf/cmake/Modules/FindNumPy.cmake delete mode 100644 src/numbuf/cpp/src/numbuf/dict.cc delete mode 100644 src/numbuf/cpp/src/numbuf/dict.h delete mode 100644 src/numbuf/cpp/src/numbuf/sequence.cc delete mode 100644 src/numbuf/cpp/src/numbuf/sequence.h delete mode 100644 src/numbuf/python/src/pynumbuf/adapters/numpy.cc delete mode 100644 src/numbuf/python/src/pynumbuf/adapters/numpy.h delete mode 100644 src/numbuf/python/src/pynumbuf/adapters/python.cc delete mode 100644 src/numbuf/python/src/pynumbuf/adapters/python.h delete mode 100644 src/numbuf/python/src/pynumbuf/adapters/scalars.h delete mode 100644 src/numbuf/python/src/pynumbuf/numbuf.cc delete mode 100644 src/numbuf/python/test/runtest.py delete mode 100644 src/numbuf/vsprojects/numbuf.vcxproj delete mode 100644 src/numbuf/vsprojects/numbuf.vcxproj.filters diff --git a/.gitignore b/.gitignore index 945233491..aa0b53d10 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,6 @@ /python/build /python/dist /src/common/thirdparty/redis -/src/numbuf/thirdparty/arrow /src/thirdparty/arrow /flatbuffers-1.7.1/ /src/thirdparty/boost/ diff --git a/.travis.yml b/.travis.yml index d766fd478..3a3f7aac3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,7 +41,7 @@ matrix: - sphinx-build -W -b html -d _build/doctrees source _build/html - cd .. # Run Python linting. 
- - flake8 --exclude=python/ray/core/src/common/flatbuffers_ep-prefix/,python/ray/core/generated/,src/numbuf/thirdparty/,src/common/format/,doc/source/conf.py + - flake8 --exclude=python/ray/core/src/common/flatbuffers_ep-prefix/,python/ray/core/generated/,src/common/format/,doc/source/conf.py - os: linux dist: trusty @@ -101,8 +101,6 @@ install: script: - export PATH="$HOME/miniconda/bin:$PATH" - - python src/numbuf/python/test/runtest.py - - python python/ray/common/test/test.py - python python/ray/common/redis_module/runtest.py - python python/ray/plasma/test/test.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 401a03583..546e58974 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,4 +11,3 @@ add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/common/) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/plasma/) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/local_scheduler/) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/global_scheduler/) -add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/numbuf/) diff --git a/doc/source/conf.py b/doc/source/conf.py index c3e5a0866..b84b8229c 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -20,7 +20,6 @@ import shlex import mock MOCK_MODULES = ["pyarrow", "pyarrow.plasma", - "ray.numbuf", "ray.local_scheduler", "ray.plasma", "ray.core.generated.TaskInfo", diff --git a/doc/source/installation-troubleshooting.rst b/doc/source/installation-troubleshooting.rst index e0a033250..f3b4ea7d4 100644 --- a/doc/source/installation-troubleshooting.rst +++ b/doc/source/installation-troubleshooting.rst @@ -1,26 +1,24 @@ Installation Troubleshooting ============================ -Trouble installing Numbuf +Trouble installing Arrow ------------------------- -If the installation of Numbuf fails, chances are there was a problem building -Arrow. Some candidate possibilities. +Some candidate possibilities. You have a different version of Flatbuffers installed ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Arrow pulls and builds its own copy of Flatbuffers, but if you already have Flatbuffers installed, Arrow may find the wrong version. If a directory like -``/usr/local/include/flatbuffers`` shows up in the output when installing -Numbuf, this may be the problem. To solve it, get rid of the old version of -flatbuffers. +``/usr/local/include/flatbuffers`` shows up in the output, this may be the +problem. To solve it, get rid of the old version of flatbuffers. There is some problem with Boost ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ If a message like ``Unable to find the requested Boost libraries`` appears when -installing Numbuf, there may be a problem with Boost. This can happen if you +installing Arrow, there may be a problem with Boost. This can happen if you installed Boost using MacPorts. This is sometimes solved by using Brew instead. Trouble installing or running Ray diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 2efb5739d..8e16b1c1d 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -10,6 +10,36 @@ pyarrow_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "pyarrow_files") sys.path.insert(0, pyarrow_path) +# See https://github.com/ray-project/ray/issues/131. +helpful_message = """ + +If you are using Anaconda, try fixing this problem by running: + + conda install libgcc +""" + +try: + import pyarrow # noqa: F401 +except ImportError as e: + if ((hasattr(e, "msg") and isinstance(e.msg, str) and + ("libstdc++" in e.msg or "CXX" in e.msg))): + # This code path should be taken with Python 3. 
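+ # On Python 3, ImportError stores its message in the mutable "msg"
+ # attribute checked above, so the hint can be appended to it in place.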
+ e.msg += helpful_message + elif (hasattr(e, "message") and isinstance(e.message, str) and + ("libstdc++" in e.message or "CXX" in e.message)): + # This code path should be taken with Python 2. + condition = (hasattr(e, "args") and isinstance(e.args, tuple) and + len(e.args) == 1 and isinstance(e.args[0], str)) + if condition: + e.args = (e.args[0] + helpful_message,) + else: + if not hasattr(e, "args"): + e.args = () + elif not isinstance(e.args, tuple): + e.args = (e.args,) + e.args += (helpful_message,) + raise + from ray.worker import (register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log, get_gpu_ids) # noqa: E402 diff --git a/python/ray/local_scheduler/test/test.py b/python/ray/local_scheduler/test/test.py index dd9f45f05..d098b1dea 100644 --- a/python/ray/local_scheduler/test/test.py +++ b/python/ray/local_scheduler/test/test.py @@ -188,8 +188,8 @@ class TestLocalSchedulerClient(unittest.TestCase): time.sleep(0.1) self.assertTrue(t.is_alive()) # Check that the first object dependency was evicted. - object1 = self.plasma_client.get([pa.plasma.ObjectID(object_id1.id())], - timeout_ms=0) + object1 = self.plasma_client.get_buffers( + [pa.plasma.ObjectID(object_id1.id())], timeout_ms=0) self.assertEqual(object1, [None]) # Check that the thread is still waiting for a task. time.sleep(0.1) diff --git a/python/ray/numbuf/__init__.py b/python/ray/numbuf/__init__.py deleted file mode 100644 index fdca8de3e..000000000 --- a/python/ray/numbuf/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -# See https://github.com/ray-project/ray/issues/131. -helpful_message = """ - -If you are using Anaconda, try fixing this problem by running: - - conda install libgcc -""" - -__all__ = ["deserialize_list", "numbuf_error", - "numbuf_plasma_object_exists_error", "read_from_buffer", - "register_callbacks", "retrieve_list", "serialize_list", - "store_list", "write_to_buffer"] - -try: - from ray.core.src.numbuf.libnumbuf import ( - deserialize_list, numbuf_error, numbuf_plasma_object_exists_error, - read_from_buffer, register_callbacks, retrieve_list, serialize_list, - store_list, write_to_buffer) -except ImportError as e: - if ((hasattr(e, "msg") and isinstance(e.msg, str) and - ("libstdc++" in e.msg or "CXX" in e.msg))): - # This code path should be taken with Python 3. - e.msg += helpful_message - elif (hasattr(e, "message") and isinstance(e.message, str) and - ("libstdc++" in e.message or "CXX" in e.message)): - # This code path should be taken with Python 2. 
- condition = (hasattr(e, "args") and isinstance(e.args, tuple) and - len(e.args) == 1 and isinstance(e.args[0], str)) - if condition: - e.args = (e.args[0] + helpful_message,) - else: - if not hasattr(e, "args"): - e.args = () - elif not isinstance(e.args, tuple): - e.args = (e.args,) - e.args += (helpful_message,) - raise diff --git a/python/ray/plasma/test/test.py b/python/ray/plasma/test/test.py index c5292d9df..7cb9e4bae 100644 --- a/python/ray/plasma/test/test.py +++ b/python/ray/plasma/test/test.py @@ -32,8 +32,8 @@ def random_name(): def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None): - client1_buff = client1.get([object_id])[0] - client2_buff = client2.get([object_id])[0] + client1_buff = client1.get_buffers([object_id])[0] + client2_buff = client2.get_buffers([object_id])[0] client1_metadata = client1.get_metadata([object_id])[0] client2_metadata = client2.get_metadata([object_id])[0] unit_test.assertEqual(len(client1_buff), len(client2_buff)) @@ -371,7 +371,8 @@ class TestPlasmaManager(unittest.TestCase): # trying until the object appears on the second Plasma store. for i in range(num_attempts): self.client1.transfer("127.0.0.1", self.port2, object_id1) - buff = self.client2.get([object_id1], timeout_ms=100)[0] + buff = self.client2.get_buffers( + [object_id1], timeout_ms=100)[0] if buff is not None: break self.assertNotEqual(buff, None) @@ -397,7 +398,8 @@ class TestPlasmaManager(unittest.TestCase): # trying until the object appears on the second Plasma store. for i in range(num_attempts): self.client2.transfer("127.0.0.1", self.port1, object_id2) - buff = self.client1.get([object_id2], timeout_ms=100)[0] + buff = self.client1.get_buffers( + [object_id2], timeout_ms=100)[0] if buff is not None: break self.assertNotEqual(buff, None) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 5a99bda67..87c678f23 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -2,22 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import cloudpickle as pickle - -import ray.numbuf - - -class RaySerializationException(Exception): - def __init__(self, message, example_object): - Exception.__init__(self, message) - self.example_object = example_object - - -class RayDeserializationException(Exception): - def __init__(self, message, class_id): - Exception.__init__(self, message) - self.class_id = class_id - class RayNotDictionarySerializable(Exception): pass @@ -65,15 +49,6 @@ def check_serializable(cls): "it efficiently.".format(cls)) -# This field keeps track of a whitelisted set of classes that Ray will -# serialize. -type_to_class_id = dict() -whitelisted_classes = dict() -classes_to_pickle = set() -custom_serializers = dict() -custom_deserializers = dict() - - def is_named_tuple(cls): """Return True if cls is a namedtuple and False otherwise.""" b = cls.__bases__ @@ -83,128 +58,3 @@ def is_named_tuple(cls): if not isinstance(f, tuple): return False return all(type(n) == str for n in f) - - -def add_class_to_whitelist(cls, class_id, pickle=False, custom_serializer=None, - custom_deserializer=None): - """Add cls to the list of classes that we can serialize. - - Args: - cls (type): The class that we can serialize. - class_id: A string of bytes used to identify the class. - pickle (bool): True if the serialization should be done with pickle. - False if it should be done efficiently with Ray. 
- custom_serializer: This argument is optional, but can be provided to - serialize objects of the class in a particular way. - custom_deserializer: This argument is optional, but can be provided to - deserialize objects of the class in a particular way. - """ - type_to_class_id[cls] = class_id - whitelisted_classes[class_id] = cls - if pickle: - classes_to_pickle.add(class_id) - if custom_serializer is not None: - custom_serializers[class_id] = custom_serializer - custom_deserializers[class_id] = custom_deserializer - - -def serialize(obj): - """This is the callback that will be used by numbuf. - - If numbuf does not know how to serialize an object, it will call this - method. - - Args: - obj (object): A Python object. - - Returns: - A dictionary that has the key "_pyttype_" to identify the class, and - contains all information needed to reconstruct the object. - """ - if type(obj) not in type_to_class_id: - raise RaySerializationException("Ray does not know how to serialize " - "objects of type {}." - .format(type(obj)), - obj) - class_id = type_to_class_id[type(obj)] - - if class_id in classes_to_pickle: - serialized_obj = {"data": pickle.dumps(obj), - "pickle": True} - elif class_id in custom_serializers: - serialized_obj = {"data": custom_serializers[class_id](obj)} - else: - # Handle the namedtuple case. - if is_named_tuple(type(obj)): - serialized_obj = {} - serialized_obj["_ray_getnewargs_"] = obj.__getnewargs__() - elif hasattr(obj, "__dict__"): - serialized_obj = obj.__dict__ - else: - raise RaySerializationException("We do not know how to serialize " - "the object '{}'".format(obj), obj) - result = dict(serialized_obj, **{"_pytype_": class_id}) - return result - - -def deserialize(serialized_obj): - """This is the callback that will be used by numbuf. - - If numbuf encounters a dictionary that contains the key "_pytype_" during - deserialization, it will ask this callback to deserialize the object. - - Args: - serialized_obj (object): A dictionary that contains the key "_pytype_". - - Returns: - A Python object. - - Raises: - An exception is raised if we do not know how to deserialize the object. - """ - class_id = serialized_obj["_pytype_"] - - if "pickle" in serialized_obj: - # The object was pickled, so unpickle it. - obj = pickle.loads(serialized_obj["data"]) - else: - assert class_id not in classes_to_pickle - if class_id not in whitelisted_classes: - # If this happens, that means that the call to _register_class, - # which should have added the class to the list of whitelisted - # classes, has not yet propagated to this worker. It should happen - # if we wait a little longer. - raise RayDeserializationException("The class {} is not one of the " - "whitelisted classes." - .format(class_id), class_id) - cls = whitelisted_classes[class_id] - if class_id in custom_deserializers: - obj = custom_deserializers[class_id](serialized_obj["data"]) - else: - # In this case, serialized_obj should just be the __dict__ field. - if "_ray_getnewargs_" in serialized_obj: - obj = cls.__new__(cls, *serialized_obj["_ray_getnewargs_"]) - else: - obj = cls.__new__(cls) - serialized_obj.pop("_pytype_") - obj.__dict__.update(serialized_obj) - return obj - - -def set_callbacks(): - """Register the custom callbacks with numbuf. - - The serialize callback is used to serialize objects that numbuf does not - know how to serialize (for example custom Python classes). The deserialize - callback is used to serialize objects that were serialized by the serialize - callback. 
- """ - ray.numbuf.register_callbacks(serialize, deserialize) - - -def clear_state(): - type_to_class_id.clear() - whitelisted_classes.clear() - classes_to_pickle.clear() - custom_serializers.clear() - custom_deserializers.clear() diff --git a/python/ray/worker.py b/python/ray/worker.py index 77b2e476c..303f0255a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -20,12 +20,12 @@ import time import traceback # Ray modules +import pyarrow import pyarrow.plasma as plasma import ray.experimental.state as state import ray.serialization as serialization import ray.services as services import ray.signature as signature -import ray.numbuf import ray.local_scheduler import ray.plasma from ray.utils import FunctionProperties, random_string, binary_to_hex @@ -70,27 +70,6 @@ class FunctionID(object): return self.function_id -contained_objectids = [] - - -def numbuf_serialize(value): - """This serializes a value and tracks the object IDs inside the value. - - We also define a custom ObjectID serializer which also closes over the - global variable contained_objectids, and whenever the custom serializer is - called, it adds the releevant ObjectID to the list contained_objectids. The - list contained_objectids should be reset between calls to numbuf_serialize. - - Args: - value: A Python object that will be serialized. - - Returns: - The serialized object. - """ - assert len(contained_objectids) == 0, "This should be unreachable." - return ray.numbuf.serialize_list([value]) - - class RayTaskError(Exception): """An object used internally to represent a task that threw an exception. @@ -300,11 +279,10 @@ class Worker(object): "type {}.".format(type(value))) counter += 1 try: - ray.numbuf.store_list(object_id.id(), - self.plasma_client.to_capsule(), - [value]) + self.plasma_client.put(value, pyarrow.plasma.ObjectID( + object_id.id()), self.serialization_context) break - except serialization.RaySerializationException as e: + except pyarrow.SerializationCallbackError as e: try: _register_class(type(e.example_object)) warning_message = ("WARNING: Serializing objects of type " @@ -349,7 +327,7 @@ class Worker(object): # Serialize and put the object in the object store. try: self.store_and_register(object_id, value) - except ray.numbuf.numbuf_plasma_object_exists_error as e: + except pyarrow.PlasmaObjectExists as e: # The object already exists in the object store, so there is no # need to add it again. TODO(rkn): We need to compare the hashes # and make sure that the objects are in fact the same. We also @@ -357,10 +335,6 @@ class Worker(object): # message. print("This object already exists in the object store.") - global contained_objectids - # Optionally do something with the contained_objectids here. - contained_objectids = [] - def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10): start_time = time.time() # Only send the warning once. @@ -374,12 +348,12 @@ class Worker(object): results = [] get_request_size = 10000 for i in range(0, len(object_ids), get_request_size): - results += ray.numbuf.retrieve_list( + results += self.plasma_client.get( object_ids[i:(i + get_request_size)], - self.plasma_client.to_capsule(), - timeout) + timeout, + self.serialization_context) return results - except serialization.RayDeserializationException as e: + except pyarrow.DeserializationCallbackError as e: # Wait a little bit for the import thread to import the class. # If we currently have the worker lock, we need to release it # so that the import thread can acquire it. 
@@ -428,12 +402,12 @@ class Worker(object): plain_object_ids[i:(i + fetch_request_size)]) # Get the objects. We initially try to get the objects immediately. - final_results = self.retrieve_and_deserialize( - [object_id.id() for object_id in object_ids], 0) + final_results = self.retrieve_and_deserialize(plain_object_ids, 0) # Construct a dictionary mapping object IDs that we haven't gotten yet # to their original index in the object_ids argument. - unready_ids = dict((object_id, i) for (i, (object_id, val)) in - enumerate(final_results) if val is None) + unready_ids = dict((plain_object_ids[i].binary(), i) for (i, val) in + enumerate(final_results) + if val is plasma.ObjectNotAvailable) was_blocked = (len(unready_ids) > 0) # Try reconstructing any objects we haven't gotten yet. Try to get them # until at least GET_TIMEOUT_MILLISECONDS milliseconds passes, then @@ -451,14 +425,15 @@ class Worker(object): self.plasma_client.fetch( object_ids_to_fetch[i:(i + fetch_request_size)]) results = self.retrieve_and_deserialize( - list(unready_ids.keys()), + object_ids_to_fetch, max([GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids))])) # Remove any entries for objects we received during this iteration # so we don't retrieve the same object twice. - for object_id, val in results: - if val is not None: + for i, val in enumerate(results): + if val is not plasma.ObjectNotAvailable: + object_id = object_ids_to_fetch[i].binary() index = unready_ids[object_id] - final_results[index] = (object_id, val) + final_results[index] = val unready_ids.pop(object_id) # If there were objects that we weren't able to get locally, let the @@ -466,11 +441,8 @@ class Worker(object): if was_blocked: self.local_scheduler_client.notify_unblocked() - # Unwrap the object from the list (it was wrapped put_object). assert len(final_results) == len(object_ids) - for i in range(len(final_results)): - assert final_results[i][0] == object_ids[i].id() - return [result[1][0] for result in final_results] + return final_results def submit_task(self, function_id, args, actor_id=None): """Submit a remote task to the scheduler. @@ -556,7 +528,7 @@ class Worker(object): # counter starts at 0. counter = self.redis_client.hincrby(self.node_ip_address, key, 1) - 1 - function({"counter": counter}) + function({"counter": counter, "worker": self}) # Run the function on all workers. self.redis_client.hmset(key, {"driver_id": self.task_driver_id.id(), @@ -991,23 +963,22 @@ def error_info(worker=global_worker): return errors -def initialize_numbuf(worker=global_worker): +def _initialize_serialization(worker=global_worker): """Initialize the serialization library. - This defines a custom serializer for object IDs and also tells numbuf to + This defines a custom serializer for object IDs and also tells ray to serialize several exception classes that we define for error handling. """ - ray.serialization.set_callbacks() + worker.serialization_context = pyarrow.SerializationContext() # Define a custom serializer and deserializer for handling Object IDs. 
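+ # The serializer below flattens an ObjectID into its raw 20-byte id
+ # string, which pyarrow can store; the deserializer rebuilds the
+ # ObjectID from those bytes when the value is read back.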
def objectid_custom_serializer(obj): - contained_objectids.append(obj) return obj.id() def objectid_custom_deserializer(serialized_obj): return ray.local_scheduler.ObjectID(serialized_obj) - serialization.add_class_to_whitelist( + worker.serialization_context.register_type( ray.local_scheduler.ObjectID, 20 * b"\x00", pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer) @@ -1020,7 +991,7 @@ def initialize_numbuf(worker=global_worker): def array_custom_deserializer(serialized_obj): return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1])) - serialization.add_class_to_whitelist( + worker.serialization_context.register_type( np.ndarray, 20 * b"\x01", pickle=False, custom_serializer=array_custom_serializer, custom_deserializer=array_custom_deserializer) @@ -1503,7 +1474,7 @@ def fetch_and_execute_function_to_run(key, worker=global_worker): # Deserialize the function. function = pickle.loads(serialized_function) # Run the function. - function({"counter": counter}) + function({"counter": counter, "worker": worker}) except: # If an exception was thrown when the function was run, we record the # traceback and notify the scheduler of the failure. @@ -1770,6 +1741,10 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, class_id = worker.redis_client.hget(actor_key, "class_id") worker.class_id = class_id + # Initialize the serialization library. This registers some classes, and so + # it must be run before we export all of the cached remote functions. + _initialize_serialization() + # Start a thread to import exports from the driver or from other workers. # Note that the driver also has an import thread, which is used only to # import custom class definitions from calls to _register_class that happen @@ -1791,9 +1766,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, # exits. t.daemon = True t.start() - # Initialize the serialization library. This registers some classes, and so - # it must be run before we export all of the cached remote functions. - initialize_numbuf() + if mode in [SCRIPT_MODE, SILENT_MODE]: # Add the directory containing the script that is running to the Python # paths of the workers. Also add the current directory. Note that this @@ -1835,7 +1808,7 @@ def disconnect(worker=global_worker): worker.connected = False worker.cached_functions_to_run = [] worker.cached_remote_functions = [] - serialization.clear_state() + worker.serialization_context = pyarrow.SerializationContext() def register_class(cls, pickle=False, worker=global_worker): @@ -1847,11 +1820,11 @@ def _register_class(cls, pickle=False, worker=global_worker): """Enable serialization and deserialization for a particular class. This method runs the register_class function defined below on every worker, - which will enable numbuf to properly serialize and deserialize objects of + which will enable ray to properly serialize and deserialize objects of this class. Args: - cls (type): The class that numbuf should serialize. + cls (type): The class that ray should serialize. pickle (bool): If False then objects of this class will be serialized by turning their __dict__ fields into a dictionary. If True, then objects of this class will be serialized using pickle. 
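The class id used below is a random 20-byte string, and registration happens
through pyarrow's SerializationContext.register_type. As a standalone
illustration (a hedged sketch: Foo and the id bytes are made up here, and
pyarrow.serialize / pyarrow.deserialize stand in for the plasma round trip
that Ray actually performs, assuming those helpers exist in the pinned
pyarrow version):

    import pyarrow

    class Foo(object):
        def __init__(self, x):
            self.x = x

    context = pyarrow.SerializationContext()
    # pickle=False: instances are reduced to their __dict__, as the
    # docstring above describes.
    context.register_type(Foo, 20 * b"\x02", pickle=False)

    buf = pyarrow.serialize(Foo(42), context).to_buffer()
    assert pyarrow.deserialize(buf, context).x == 42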
@@ -1863,7 +1836,8 @@ def _register_class(cls, pickle=False, worker=global_worker): class_id = random_string() def register_class_for_serialization(worker_info): - serialization.add_class_to_whitelist(cls, class_id, pickle=pickle) + worker_info["worker"].serialization_context.register_type( + cls, class_id, pickle=pickle) if not pickle: # Raise an exception if cls cannot be serialized efficiently by Ray. @@ -1872,7 +1846,7 @@ def _register_class(cls, pickle=False, worker=global_worker): else: # Since we are pickling objects of this class, we don't actually need # to ship the class definition. - register_class_for_serialization({}) + register_class_for_serialization({"worker": worker}) class RayLogSpan(object): diff --git a/python/setup.py b/python/setup.py index 942fae75a..2ff65d6a1 100644 --- a/python/setup.py +++ b/python/setup.py @@ -22,7 +22,6 @@ ray_files = [ "ray/core/src/plasma/plasma_manager", "ray/core/src/local_scheduler/local_scheduler", "ray/core/src/local_scheduler/liblocal_scheduler_library.so", - "ray/core/src/numbuf/libnumbuf.so", "ray/core/src/global_scheduler/global_scheduler", "ray/WebUI.ipynb" ] diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index e6127f7d5..061753288 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -72,7 +72,7 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state, Task_set_local_scheduler(task, local_scheduler_id); LOG_DEBUG("Issuing a task table update for task = %s", ObjectID_to_string(Task_task_id(task), id_string, ID_STRING_SIZE)); - UNUSED(id_string); + ARROW_UNUSED(id_string); auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. .timeout = 0, // This value is unused. @@ -246,7 +246,7 @@ void process_new_db_client(DBClient *db_client, void *user_context) { char id_string[ID_STRING_SIZE]; LOG_DEBUG("db client table callback for db client = %s", ObjectID_to_string(db_client->id, id_string, ID_STRING_SIZE)); - UNUSED(id_string); + ARROW_UNUSED(id_string); if (strncmp(db_client->client_type, "local_scheduler", strlen("local_scheduler")) == 0) { bool local_scheduler_present = @@ -288,7 +288,7 @@ void object_table_subscribe_callback(ObjectID object_id, char id_string[ID_STRING_SIZE]; LOG_DEBUG("object table subscribe callback for OBJECT = %s", ObjectID_to_string(object_id, id_string, ID_STRING_SIZE)); - UNUSED(id_string); + ARROW_UNUSED(id_string); LOG_DEBUG("\tManagers<%d>:", manager_count); for (int i = 0; i < manager_count; i++) { LOG_DEBUG("\t\t%s", manager_vector[i]); @@ -324,12 +324,12 @@ void local_scheduler_table_handler(DBClientID client_id, void *user_context) { /* Extract global scheduler state from the callback context. 
*/ GlobalSchedulerState *state = (GlobalSchedulerState *) user_context; - UNUSED(state); + ARROW_UNUSED(state); char id_string[ID_STRING_SIZE]; LOG_DEBUG( "Local scheduler heartbeat from db_client_id %s", ObjectID_to_string((ObjectID) client_id, id_string, ID_STRING_SIZE)); - UNUSED(id_string); + ARROW_UNUSED(id_string); LOG_DEBUG( "total workers = %d, task queue length = %d, available workers = %d", info.total_num_workers, info.task_queue_length, info.available_workers); diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 95c9b85af..3f1d08260 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -233,7 +233,7 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, char id_string[ID_STRING_SIZE]; LOG_DEBUG("Creating actor with ID %s.", ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE)); - UNUSED(id_string); + ARROW_UNUSED(id_string); } void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { @@ -249,7 +249,7 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE), (long long) count); } - UNUSED(id_string); + ARROW_UNUSED(id_string); /* Free all remaining tasks in the actor queue. */ for (auto &task : *entry.task_queue) { diff --git a/src/numbuf/.clang-format b/src/numbuf/.clang-format deleted file mode 100644 index 7d5b3cf30..000000000 --- a/src/numbuf/.clang-format +++ /dev/null @@ -1,65 +0,0 @@ ---- -Language: Cpp -# BasedOnStyle: Google -AccessModifierOffset: -1 -AlignAfterOpenBracket: false -AlignConsecutiveAssignments: false -AlignEscapedNewlinesLeft: true -AlignOperands: true -AlignTrailingComments: true -AllowAllParametersOfDeclarationOnNextLine: true -AllowShortBlocksOnASingleLine: true -AllowShortCaseLabelsOnASingleLine: false -AllowShortFunctionsOnASingleLine: Inline -AllowShortIfStatementsOnASingleLine: true -AllowShortLoopsOnASingleLine: false -AlwaysBreakAfterDefinitionReturnType: None -AlwaysBreakBeforeMultilineStrings: true -AlwaysBreakTemplateDeclarations: true -BinPackArguments: true -BinPackParameters: true -BreakBeforeBinaryOperators: None -BreakBeforeBraces: Attach -BreakBeforeTernaryOperators: true -BreakConstructorInitializersBeforeComma: false -ColumnLimit: 90 -CommentPragmas: '^ IWYU pragma:' -ConstructorInitializerAllOnOneLineOrOnePerLine: true -ConstructorInitializerIndentWidth: 4 -ContinuationIndentWidth: 4 -Cpp11BracedListStyle: true -DerivePointerAlignment: false -DisableFormat: false -ExperimentalAutoDetectBinPacking: false -ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] -IndentCaseLabels: true -IndentWidth: 2 -IndentWrappedFunctionNames: false -KeepEmptyLinesAtTheStartOfBlocks: false -MacroBlockBegin: '' -MacroBlockEnd: '' -MaxEmptyLinesToKeep: 1 -NamespaceIndentation: None -ObjCBlockIndentWidth: 2 -ObjCSpaceAfterProperty: false -ObjCSpaceBeforeProtocolList: false -PenaltyBreakBeforeFirstCallParameter: 1000 -PenaltyBreakComment: 300 -PenaltyBreakFirstLessLess: 120 -PenaltyBreakString: 1000 -PenaltyExcessCharacter: 1000000 -PenaltyReturnTypeOnItsOwnLine: 200 -PointerAlignment: Left -SpaceAfterCStyleCast: false -SpaceBeforeAssignmentOperators: true -SpaceBeforeParens: ControlStatements -SpaceInEmptyParentheses: false -SpacesBeforeTrailingComments: 2 -SpacesInAngles: false -SpacesInContainerLiterals: true -SpacesInCStyleCastParentheses: false -SpacesInParentheses: false -SpacesInSquareBrackets: false 
-Standard: Cpp11 -TabWidth: 8 -UseTab: Never diff --git a/src/numbuf/CMakeLists.txt b/src/numbuf/CMakeLists.txt deleted file mode 100644 index 2666bf0a2..000000000 --- a/src/numbuf/CMakeLists.txt +++ /dev/null @@ -1,70 +0,0 @@ -cmake_minimum_required(VERSION 3.4) - -project(numbuf) - -include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) - -list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/Modules) - -# Include plasma -list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/../thirdparty/arrow/python/cmake_modules) - -find_package(Plasma) -include_directories(SYSTEM ${PLASMA_INCLUDE_DIR}) - -option(HAS_PLASMA - "Are we linking with the plasma object store? Recommended if numbuf is used as part of ray." - ON) - -if(HAS_PLASMA) - add_definitions(-DHAS_PLASMA) -endif() - -find_package(NumPy REQUIRED) - -if(APPLE) - set(CMAKE_SHARED_LIBRARY_SUFFIX ".so") -endif(APPLE) - -include_directories("${PYTHON_INCLUDE_DIRS}") -include_directories("${NUMPY_INCLUDE_DIR}") - -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -O3") - -if(UNIX AND NOT APPLE) - link_libraries(rt) -endif() - -set(ARROW_LIB "${ARROW_DIR}/cpp/build/release/libarrow.a" - CACHE STRING "Path to libarrow.a (needs to be changed if arrow is build in debug mode)") -set(ARROW_PYTHON_LIB "${ARROW_DIR}/cpp/build/release/libarrow_python.a" - CACHE STRING "Path to libarrow_python.a (needs to be changed if arrow is build in debug mode)") - -# include_directories("${ARROW_DIR}/cpp/src/") -include_directories("cpp/src/") -include_directories("python/src/") - -add_definitions(-fPIC) - -add_library(numbuf SHARED - cpp/src/numbuf/dict.cc - cpp/src/numbuf/sequence.cc - python/src/pynumbuf/numbuf.cc - python/src/pynumbuf/adapters/numpy.cc - python/src/pynumbuf/adapters/python.cc) - - -if(APPLE) - target_link_libraries(numbuf "-undefined dynamic_lookup" ${ARROW_LIB} ${ARROW_PYTHON_LIB} -lpthread) -else() - set(Boost_USE_STATIC_LIBS ON) - find_package(Boost 1.60.0 COMPONENTS filesystem system) - message(STATUS "Using Boost_LIBRARIES: ${Boost_LIBRARIES}") - target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_PYTHON_LIB} -lpthread ${Boost_LIBRARIES}) -endif() - -if(HAS_PLASMA) - target_link_libraries(numbuf ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a common) -endif() - -install(TARGETS numbuf DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/) diff --git a/src/numbuf/cmake/Modules/FindNumPy.cmake b/src/numbuf/cmake/Modules/FindNumPy.cmake deleted file mode 100644 index 037dbdb5f..000000000 --- a/src/numbuf/cmake/Modules/FindNumPy.cmake +++ /dev/null @@ -1,54 +0,0 @@ -# - Find the NumPy libraries -# This module finds if NumPy is installed, and sets the following variables -# indicating where it is. -# -# -# NUMPY_FOUND - was NumPy found -# NUMPY_VERSION - the version of NumPy found as a string -# NUMPY_VERSION_MAJOR - the major version number of NumPy -# NUMPY_VERSION_MINOR - the minor version number of NumPy -# NUMPY_VERSION_PATCH - the patch version number of NumPy -# NUMPY_VERSION_DECIMAL - e.g. 
version 1.6.1 is 10601 -# NUMPY_INCLUDE_DIR - path to the NumPy include files - -unset(NUMPY_VERSION) -unset(NUMPY_INCLUDE_DIR) - -if(PYTHONINTERP_FOUND) - execute_process(COMMAND "${PYTHON_EXECUTABLE}" "-c" - "import numpy as n; print(n.__version__); print(n.get_include());" - RESULT_VARIABLE __result - OUTPUT_VARIABLE __output - OUTPUT_STRIP_TRAILING_WHITESPACE) - - if(__result MATCHES 0) - string(REGEX REPLACE ";" "\\\\;" __values ${__output}) - string(REGEX REPLACE "\r?\n" ";" __values ${__values}) - list(GET __values 0 NUMPY_VERSION) - list(GET __values 1 NUMPY_INCLUDE_DIR) - - string(REGEX MATCH "^([0-9])+\\.([0-9])+\\.([0-9])+" __ver_check "${NUMPY_VERSION}") - if(NOT "${__ver_check}" STREQUAL "") - set(NUMPY_VERSION_MAJOR ${CMAKE_MATCH_1}) - set(NUMPY_VERSION_MINOR ${CMAKE_MATCH_2}) - set(NUMPY_VERSION_PATCH ${CMAKE_MATCH_3}) - math(EXPR NUMPY_VERSION_DECIMAL - "(${NUMPY_VERSION_MAJOR} * 10000) + (${NUMPY_VERSION_MINOR} * 100) + ${NUMPY_VERSION_PATCH}") - string(REGEX REPLACE "\\\\" "/" NUMPY_INCLUDE_DIR ${NUMPY_INCLUDE_DIR}) - else() - unset(NUMPY_VERSION) - unset(NUMPY_INCLUDE_DIR) - message(STATUS "Requested NumPy version and include path, but got instead:\n${__output}\n") - endif() - endif() -else() - message(STATUS "To find NumPy Python interpreter is required to be found.") -endif() - -include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(NumPy REQUIRED_VARS NUMPY_INCLUDE_DIR NUMPY_VERSION - VERSION_VAR NUMPY_VERSION) - -if(NUMPY_FOUND) - message(STATUS "NumPy ver. ${NUMPY_VERSION} found (include: ${NUMPY_INCLUDE_DIR})") -endif() diff --git a/src/numbuf/cpp/src/numbuf/dict.cc b/src/numbuf/cpp/src/numbuf/dict.cc deleted file mode 100644 index 832e4bc86..000000000 --- a/src/numbuf/cpp/src/numbuf/dict.cc +++ /dev/null @@ -1,25 +0,0 @@ -#include "dict.h" - -using namespace arrow; - -namespace numbuf { - -Status DictBuilder::Finish(std::shared_ptr key_tuple_data, - std::shared_ptr key_dict_data, std::shared_ptr val_list_data, - std::shared_ptr val_tuple_data, std::shared_ptr val_dict_data, - std::shared_ptr* out) { - // lists and dicts can't be keys of dicts in Python, that is why for - // the keys we do not need to collect sublists - std::shared_ptr keys, vals; - RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, &keys)); - RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals)); - auto keys_field = std::make_shared("keys", keys->type()); - auto vals_field = std::make_shared("vals", vals->type()); - auto type = - std::make_shared(std::vector({keys_field, vals_field})); - std::vector> field_arrays({keys, vals}); - DCHECK(keys->length() == vals->length()); - out->reset(new StructArray(type, keys->length(), field_arrays)); - return Status::OK(); -} -} diff --git a/src/numbuf/cpp/src/numbuf/dict.h b/src/numbuf/cpp/src/numbuf/dict.h deleted file mode 100644 index 708d36747..000000000 --- a/src/numbuf/cpp/src/numbuf/dict.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef NUMBUF_DICT_H -#define NUMBUF_DICT_H - -#include - -#include "sequence.h" - -namespace numbuf { - -/*! Constructing dictionaries of key/value pairs. Sequences of - keys and values are built separately using a pair of - SequenceBuilders. The resulting Arrow representation - can be obtained via the Finish method. -*/ -class DictBuilder { - public: - DictBuilder(arrow::MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {} - - //! Builder for the keys of the dictionary - SequenceBuilder& keys() { return keys_; } - //! 
Builder for the values of the dictionary - SequenceBuilder& vals() { return vals_; } - - /*! Construct an Arrow StructArray representing the dictionary. - Contains a field "keys" for the keys and "vals" for the values. - - \param list_data - List containing the data from nested lists in the value - list of the dictionary - - \param dict_data - List containing the data from nested dictionaries in the - value list of the dictionary - */ - arrow::Status Finish(std::shared_ptr key_tuple_data, - std::shared_ptr key_dict_data, - std::shared_ptr val_list_data, - std::shared_ptr val_tuple_data, - std::shared_ptr val_dict_data, std::shared_ptr* out); - - private: - SequenceBuilder keys_; - SequenceBuilder vals_; -}; -} - -#endif diff --git a/src/numbuf/cpp/src/numbuf/sequence.cc b/src/numbuf/cpp/src/numbuf/sequence.cc deleted file mode 100644 index 694887374..000000000 --- a/src/numbuf/cpp/src/numbuf/sequence.cc +++ /dev/null @@ -1,149 +0,0 @@ -#include "sequence.h" - -using namespace arrow; - -namespace numbuf { - -SequenceBuilder::SequenceBuilder(MemoryPool* pool) - : pool_(pool), - types_(pool, std::make_shared()), - offsets_(pool, std::make_shared()), - nones_(pool, std::make_shared()), - bools_(pool, std::make_shared()), - ints_(pool, std::make_shared()), - bytes_(pool, std::make_shared()), - strings_(pool), - floats_(pool, std::make_shared()), - doubles_(pool, std::make_shared()), - tensor_indices_(pool, std::make_shared()), - list_offsets_({0}), - tuple_offsets_({0}), - dict_offsets_({0}) {} - -#define UPDATE(OFFSET, TAG) \ - if (TAG == -1) { \ - TAG = num_tags; \ - num_tags += 1; \ - } \ - RETURN_NOT_OK(offsets_.Append(OFFSET)); \ - RETURN_NOT_OK(types_.Append(TAG)); \ - RETURN_NOT_OK(nones_.AppendToBitmap(true)); - -Status SequenceBuilder::AppendNone() { - RETURN_NOT_OK(offsets_.Append(0)); - RETURN_NOT_OK(types_.Append(0)); - return nones_.AppendToBitmap(false); -} - -Status SequenceBuilder::AppendBool(bool data) { - UPDATE(bools_.length(), bool_tag); - return bools_.Append(data); -} - -Status SequenceBuilder::AppendInt64(int64_t data) { - UPDATE(ints_.length(), int_tag); - return ints_.Append(data); -} - -Status SequenceBuilder::AppendUInt64(uint64_t data) { - UPDATE(ints_.length(), int_tag); - return ints_.Append(data); -} - -Status SequenceBuilder::AppendBytes(const uint8_t* data, int32_t length) { - UPDATE(bytes_.length(), bytes_tag); - return bytes_.Append(data, length); -} - -Status SequenceBuilder::AppendString(const char* data, int32_t length) { - UPDATE(strings_.length(), string_tag); - return strings_.Append(data, length); -} - -Status SequenceBuilder::AppendFloat(float data) { - UPDATE(floats_.length(), float_tag); - return floats_.Append(data); -} - -Status SequenceBuilder::AppendDouble(double data) { - UPDATE(doubles_.length(), double_tag); - return doubles_.Append(data); -} - -Status SequenceBuilder::AppendTensor(int32_t tensor_index) { - UPDATE(tensor_indices_.length(), tensor_tag); - return tensor_indices_.Append(tensor_index); -} - -Status SequenceBuilder::AppendList(int32_t size) { - UPDATE(list_offsets_.size() - 1, list_tag); - list_offsets_.push_back(list_offsets_.back() + size); - return Status::OK(); -} - -Status SequenceBuilder::AppendTuple(int32_t size) { - UPDATE(tuple_offsets_.size() - 1, tuple_tag); - tuple_offsets_.push_back(tuple_offsets_.back() + size); - return Status::OK(); -} - -Status SequenceBuilder::AppendDict(int32_t size) { - UPDATE(dict_offsets_.size() - 1, dict_tag); - dict_offsets_.push_back(dict_offsets_.back() + size); - return Status::OK(); -} - 
-#define ADD_ELEMENT(VARNAME, TAG) \ - if (TAG != -1) { \ - types[TAG] = std::make_shared("", VARNAME.type()); \ - RETURN_NOT_OK(VARNAME.Finish(&children[TAG])); \ - RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ - type_ids.push_back(TAG); \ - } - -#define ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME) \ - if (DATA) { \ - DCHECK(DATA->length() == OFFSETS.back()); \ - std::shared_ptr offset_array; \ - Int32Builder builder(pool_, std::make_shared()); \ - RETURN_NOT_OK(builder.Append(OFFSETS.data(), OFFSETS.size())); \ - RETURN_NOT_OK(builder.Finish(&offset_array)); \ - std::shared_ptr list_array; \ - ListArray::FromArrays(*offset_array, *DATA, pool_, &list_array); \ - auto field = std::make_shared(NAME, list_array->type()); \ - auto type = std::make_shared(std::vector({field})); \ - types[TAG] = std::make_shared("", type); \ - children[TAG] = std::shared_ptr( \ - new StructArray(type, list_array->length(), {list_array})); \ - RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ - type_ids.push_back(TAG); \ - } else { \ - DCHECK(OFFSETS.size() == 1); \ - } - -Status SequenceBuilder::Finish(std::shared_ptr list_data, - std::shared_ptr tuple_data, std::shared_ptr dict_data, - std::shared_ptr* out) { - std::vector> types(num_tags); - std::vector> children(num_tags); - std::vector type_ids; - - ADD_ELEMENT(bools_, bool_tag); - ADD_ELEMENT(ints_, int_tag); - ADD_ELEMENT(strings_, string_tag); - ADD_ELEMENT(bytes_, bytes_tag); - ADD_ELEMENT(floats_, float_tag); - ADD_ELEMENT(doubles_, double_tag); - - ADD_ELEMENT(tensor_indices_, tensor_tag); - - ADD_SUBSEQUENCE(list_data, list_offsets_, list_builder, list_tag, "list"); - ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple"); - ADD_SUBSEQUENCE(dict_data, dict_offsets_, dict_builder, dict_tag, "dict"); - - TypePtr type = TypePtr(new UnionType(types, type_ids, UnionMode::DENSE)); - out->reset(new UnionArray(type, types_.length(), children, types_.data(), - offsets_.data(), nones_.null_bitmap(), nones_.null_count())); - return Status::OK(); -} -} diff --git a/src/numbuf/cpp/src/numbuf/sequence.h b/src/numbuf/cpp/src/numbuf/sequence.h deleted file mode 100644 index fea1d21cd..000000000 --- a/src/numbuf/cpp/src/numbuf/sequence.h +++ /dev/null @@ -1,125 +0,0 @@ -#ifndef NUMBUF_LIST_H -#define NUMBUF_LIST_H - -#include -#include - -namespace numbuf { - -class NullArrayBuilder : public arrow::ArrayBuilder { - public: - explicit NullArrayBuilder(arrow::MemoryPool* pool, const arrow::TypePtr& type) - : arrow::ArrayBuilder(pool, type) {} - virtual ~NullArrayBuilder(){}; - arrow::Status Finish(std::shared_ptr* out) override { - return arrow::Status::OK(); - } -}; - -/*! A Sequence is a heterogeneous collections of elements. It can contain - scalar Python types, lists, tuples, dictionaries and tensors. -*/ -class SequenceBuilder { - public: - SequenceBuilder(arrow::MemoryPool* pool = nullptr); - - //! Appending a none to the sequence - arrow::Status AppendNone(); - - //! Appending a boolean to the sequence - arrow::Status AppendBool(bool data); - - //! Appending an int64_t to the sequence - arrow::Status AppendInt64(int64_t data); - - //! Appending an uint64_t to the sequence - arrow::Status AppendUInt64(uint64_t data); - - //! Append a list of bytes to the sequence - arrow::Status AppendBytes(const uint8_t* data, int32_t length); - - //! Appending a string to the sequence - arrow::Status AppendString(const char* data, int32_t length); - - //! Appending a float to the sequence - arrow::Status AppendFloat(float data); - - //! 
Appending a double to the sequence - arrow::Status AppendDouble(double data); - - /*! Appending a tensor to the sequence - - \param tensor_index Index of the tensor in the object. - */ - arrow::Status AppendTensor(int32_t tensor_index); - - /*! Add a sublist to the sequence. The data contained in the sublist will be - specified in the "Finish" method. - - To construct l = [[11, 22], 33, [44, 55]] you would for example run - list = ListBuilder(); - list.AppendList(2); - list.Append(33); - list.AppendList(2); - list.Finish([11, 22, 44, 55]); - list.Finish(); - - \param size - The size of the sublist - */ - arrow::Status AppendList(int32_t size); - - arrow::Status AppendTuple(int32_t size); - - arrow::Status AppendDict(int32_t size); - - //! Finish building the sequence and return the result - arrow::Status Finish(std::shared_ptr list_data, - std::shared_ptr tuple_data, std::shared_ptr dict_data, - std::shared_ptr* out); - - private: - arrow::MemoryPool* pool_; - - arrow::Int8Builder types_; - arrow::Int32Builder offsets_; - - /* Total number of bytes needed to represent this sequence. */ - int64_t total_num_bytes_; - - NullArrayBuilder nones_; - arrow::BooleanBuilder bools_; - arrow::Int64Builder ints_; - arrow::BinaryBuilder bytes_; - arrow::StringBuilder strings_; - arrow::FloatBuilder floats_; - arrow::DoubleBuilder doubles_; - - // We use an Int32Builder here to distinguish the tensor indices from - // the ints_ above (see the case Type::INT32 in get_value in python.cc). - // TODO(pcm): Replace this by using the union tags to distinguish between - // these two cases. - arrow::Int32Builder tensor_indices_; - - std::vector list_offsets_; - std::vector tuple_offsets_; - std::vector dict_offsets_; - - int8_t bool_tag = -1; - int8_t int_tag = -1; - int8_t string_tag = -1; - int8_t bytes_tag = -1; - int8_t float_tag = -1; - int8_t double_tag = -1; - - int8_t tensor_tag = -1; - int8_t list_tag = -1; - int8_t tuple_tag = -1; - int8_t dict_tag = -1; - - int8_t num_tags = 0; -}; - -} // namespace numbuf - -#endif // NUMBUF_LIST_H diff --git a/src/numbuf/python/src/pynumbuf/adapters/numpy.cc b/src/numbuf/python/src/pynumbuf/adapters/numpy.cc deleted file mode 100644 index 92fc16aee..000000000 --- a/src/numbuf/python/src/pynumbuf/adapters/numpy.cc +++ /dev/null @@ -1,67 +0,0 @@ -#include "numpy.h" -#include "python.h" - -#include - -#include - -using namespace arrow; - -extern "C" { -extern PyObject* numbuf_serialize_callback; -extern PyObject* numbuf_deserialize_callback; -} - -namespace numbuf { - -Status DeserializeArray(std::shared_ptr array, int32_t offset, PyObject* base, - const std::vector>& tensors, PyObject** out) { - DCHECK(array); - int32_t index = std::static_pointer_cast(array)->Value(offset); - RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out)); - /* Mark the array as immutable. 
*/ - PyObject* flags = PyObject_GetAttrString(*out, "flags"); - DCHECK(flags != NULL) << "Could not mark Numpy array immutable"; - int flag_set = PyObject_SetAttrString(flags, "writeable", Py_False); - DCHECK(flag_set == 0) << "Could not mark Numpy array immutable"; - Py_XDECREF(flags); - return Status::OK(); -} - -Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, - std::vector& subdicts, std::vector& tensors_out) { - int dtype = PyArray_TYPE(array); - switch (dtype) { - case NPY_BOOL: - case NPY_UINT8: - case NPY_INT8: - case NPY_UINT16: - case NPY_INT16: - case NPY_UINT32: - case NPY_INT32: - case NPY_UINT64: - case NPY_INT64: - case NPY_FLOAT: - case NPY_DOUBLE: { - RETURN_NOT_OK(builder.AppendTensor(tensors_out.size())); - tensors_out.push_back(reinterpret_cast(array)); - } break; - default: - if (!numbuf_serialize_callback) { - std::stringstream stream; - stream << "numpy data type not recognized: " << dtype; - return Status::NotImplemented(stream.str()); - } else { - PyObject* arglist = Py_BuildValue("(O)", array); - // The reference count of the result of the call to PyObject_CallObject - // must be decremented. This is done in SerializeDict in python.cc. - PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist); - Py_XDECREF(arglist); - if (!result) { return Status::NotImplemented("python error"); } - builder.AppendDict(PyDict_Size(result)); - subdicts.push_back(result); - } - } - return Status::OK(); -} -} diff --git a/src/numbuf/python/src/pynumbuf/adapters/numpy.h b/src/numbuf/python/src/pynumbuf/adapters/numpy.h deleted file mode 100644 index 1d2bf887e..000000000 --- a/src/numbuf/python/src/pynumbuf/adapters/numpy.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef PYNUMBUF_NUMPY_H -#define PYNUMBUF_NUMPY_H - -#include -#include - -#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION -#define NO_IMPORT_ARRAY -#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API -#include - -#include - -namespace numbuf { - -arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, - std::vector& subdicts, std::vector& tensors_out); -arrow::Status DeserializeArray(std::shared_ptr array, int32_t offset, - PyObject* base, const std::vector>& tensors, - PyObject** out); -} - -#endif diff --git a/src/numbuf/python/src/pynumbuf/adapters/python.cc b/src/numbuf/python/src/pynumbuf/adapters/python.cc deleted file mode 100644 index 262c3bb7c..000000000 --- a/src/numbuf/python/src/pynumbuf/adapters/python.cc +++ /dev/null @@ -1,340 +0,0 @@ -#include "python.h" - -#include - -#include "scalars.h" - -using namespace arrow; - -int32_t MAX_RECURSION_DEPTH = 100; - -extern "C" { - -extern PyObject* numbuf_serialize_callback; -extern PyObject* numbuf_deserialize_callback; -} - -namespace numbuf { - -#if PY_MAJOR_VERSION >= 3 -#define PyInt_FromLong PyLong_FromLong -#endif - -Status get_value(std::shared_ptr arr, int32_t index, int32_t type, PyObject* base, - const std::vector>& tensors, PyObject** result) { - switch (arr->type()->id()) { - case Type::BOOL: - *result = - PyBool_FromLong(std::static_pointer_cast(arr)->Value(index)); - return Status::OK(); - case Type::INT64: - *result = PyInt_FromLong(std::static_pointer_cast(arr)->Value(index)); - return Status::OK(); - case Type::BINARY: { - int32_t nchars; - const uint8_t* str = - std::static_pointer_cast(arr)->GetValue(index, &nchars); - *result = PyBytes_FromStringAndSize(reinterpret_cast(str), nchars); - return Status::OK(); - } - case Type::STRING: { - int32_t nchars; - const uint8_t* str = - 
std::static_pointer_cast(arr)->GetValue(index, &nchars); - *result = PyUnicode_FromStringAndSize(reinterpret_cast(str), nchars); - return Status::OK(); - } - case Type::FLOAT: - *result = - PyFloat_FromDouble(std::static_pointer_cast(arr)->Value(index)); - return Status::OK(); - case Type::DOUBLE: - *result = - PyFloat_FromDouble(std::static_pointer_cast(arr)->Value(index)); - return Status::OK(); - case Type::STRUCT: { - auto s = std::static_pointer_cast(arr); - auto l = std::static_pointer_cast(s->field(0)); - if (s->type()->child(0)->name() == "list") { - return DeserializeList(l->values(), l->value_offset(index), - l->value_offset(index + 1), base, tensors, result); - } else if (s->type()->child(0)->name() == "tuple") { - return DeserializeTuple(l->values(), l->value_offset(index), - l->value_offset(index + 1), base, tensors, result); - } else if (s->type()->child(0)->name() == "dict") { - return DeserializeDict(l->values(), l->value_offset(index), - l->value_offset(index + 1), base, tensors, result); - } else { - DCHECK(false) << "error"; - } - } - // We use an Int32Builder here to distinguish the tensor indices from - // the Type::INT64 above (see tensor_indices_ in sequence.h). - case Type::INT32: { - int32_t val = std::static_pointer_cast(arr)->Value(index); - return DeserializeArray(arr, index, base, tensors, result); - } - default: - DCHECK(false) << "union tag not recognized " << type; - } - return Status::OK(); -} - -Status CallCustomSerializationCallback(PyObject* elem, PyObject** serialized_object) { - *serialized_object = NULL; - if (!numbuf_serialize_callback) { - std::stringstream ss; - ss << "data type of " << PyBytes_AS_STRING(PyObject_Repr(elem)) - << " not recognized and custom serialization handler not registered"; - return Status::NotImplemented(ss.str()); - } else { - PyObject* arglist = Py_BuildValue("(O)", elem); - // The reference count of the result of the call to PyObject_CallObject - // must be decremented. This is done in SerializeDict in this file. - PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist); - Py_XDECREF(arglist); - if (!result) { return Status::NotImplemented("python error"); } - *serialized_object = result; - } - return Status::OK(); -} - -Status append(PyObject* elem, SequenceBuilder& builder, std::vector& sublists, - std::vector& subtuples, std::vector& subdicts, - std::vector& tensors_out) { - // The bool case must precede the int case (PyInt_Check passes for bools) - if (PyBool_Check(elem)) { - RETURN_NOT_OK(builder.AppendBool(elem == Py_True)); - } else if (PyFloat_Check(elem)) { - RETURN_NOT_OK(builder.AppendDouble(PyFloat_AS_DOUBLE(elem))); - } else if (PyLong_Check(elem)) { - int overflow = 0; - int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow); - if (!overflow) { - RETURN_NOT_OK(builder.AppendInt64(data)); - } else { - // Attempt to serialize the object using the custom callback. - PyObject* serialized_object; - // The reference count of serialized_object is incremented in the function - // CallCustomSerializationCallback (if the call is successful), and it will - // be decremented in SerializeDict in this file. 
- RETURN_NOT_OK(CallCustomSerializationCallback(elem, &serialized_object)); - RETURN_NOT_OK(builder.AppendDict(PyDict_Size(serialized_object))); - subdicts.push_back(serialized_object); - } -#if PY_MAJOR_VERSION < 3 - } else if (PyInt_Check(elem)) { - RETURN_NOT_OK(builder.AppendInt64(static_cast(PyInt_AS_LONG(elem)))); -#endif - } else if (PyBytes_Check(elem)) { - auto data = reinterpret_cast(PyBytes_AS_STRING(elem)); - auto size = PyBytes_GET_SIZE(elem); - RETURN_NOT_OK(builder.AppendBytes(data, size)); - } else if (PyUnicode_Check(elem)) { - Py_ssize_t size; -#if PY_MAJOR_VERSION >= 3 - char* data = PyUnicode_AsUTF8AndSize(elem, &size); - Status s = builder.AppendString(data, size); -#else - PyObject* str = PyUnicode_AsUTF8String(elem); - char* data = PyString_AS_STRING(str); - size = PyString_GET_SIZE(str); - Status s = builder.AppendString(data, size); - Py_XDECREF(str); -#endif - RETURN_NOT_OK(s); - } else if (PyList_Check(elem)) { - RETURN_NOT_OK(builder.AppendList(PyList_Size(elem))); - sublists.push_back(elem); - } else if (PyDict_Check(elem)) { - RETURN_NOT_OK(builder.AppendDict(PyDict_Size(elem))); - subdicts.push_back(elem); - } else if (PyTuple_CheckExact(elem)) { - RETURN_NOT_OK(builder.AppendTuple(PyTuple_Size(elem))); - subtuples.push_back(elem); - } else if (PyArray_IsScalar(elem, Generic)) { - RETURN_NOT_OK(AppendScalar(elem, builder)); - } else if (PyArray_Check(elem)) { - RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts, tensors_out)); - } else if (elem == Py_None) { - RETURN_NOT_OK(builder.AppendNone()); - } else { - // Attempt to serialize the object using the custom callback. - PyObject* serialized_object; - // The reference count of serialized_object is incremented in the function - // CallCustomSerializationCallback (if the call is successful), and it will - // be decremented in SerializeDict in this file. - RETURN_NOT_OK(CallCustomSerializationCallback(elem, &serialized_object)); - RETURN_NOT_OK(builder.AppendDict(PyDict_Size(serialized_object))); - subdicts.push_back(serialized_object); - } - return Status::OK(); -} - -Status SerializeSequences(std::vector sequences, int32_t recursion_depth, - std::shared_ptr* out, std::vector& tensors_out) { - DCHECK(out); - if (recursion_depth >= MAX_RECURSION_DEPTH) { - return Status::NotImplemented( - "This object exceeds the maximum recursion depth. 
It may contain itself " - "recursively."); - } - SequenceBuilder builder(nullptr); - std::vector sublists, subtuples, subdicts; - for (const auto& sequence : sequences) { - PyObject* item; - PyObject* iterator = PyObject_GetIter(sequence); - while ((item = PyIter_Next(iterator))) { - Status s = append(item, builder, sublists, subtuples, subdicts, tensors_out); - Py_DECREF(item); - // if an error occurs, we need to decrement the reference counts before returning - if (!s.ok()) { - Py_DECREF(iterator); - return s; - } - } - Py_DECREF(iterator); - } - std::shared_ptr list; - if (sublists.size() > 0) { - RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list, tensors_out)); - } - std::shared_ptr tuple; - if (subtuples.size() > 0) { - RETURN_NOT_OK( - SerializeSequences(subtuples, recursion_depth + 1, &tuple, tensors_out)); - } - std::shared_ptr dict; - if (subdicts.size() > 0) { - RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out)); - } - return builder.Finish(list, tuple, dict, out); -} - -#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \ - auto data = std::dynamic_pointer_cast(array); \ - int32_t size = array->length(); \ - PyObject* result = CREATE(stop_idx - start_idx); \ - auto types = std::make_shared(size, data->type_ids()); \ - auto offsets = std::make_shared(size, data->value_offsets()); \ - for (size_t i = start_idx; i < stop_idx; ++i) { \ - if (data->IsNull(i)) { \ - Py_INCREF(Py_None); \ - SET_ITEM(result, i - start_idx, Py_None); \ - } else { \ - int32_t offset = offsets->Value(i); \ - int8_t type = types->Value(i); \ - std::shared_ptr arr = data->child(type); \ - PyObject* value; \ - RETURN_NOT_OK(get_value(arr, offset, type, base, tensors, &value)); \ - SET_ITEM(result, i - start_idx, value); \ - } \ - } \ - *out = result; \ - return Status::OK(); - -Status DeserializeList(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, - PyObject* base, const std::vector>& tensors, PyObject** out) { - DESERIALIZE_SEQUENCE(PyList_New, PyList_SetItem) -} - -Status DeserializeTuple(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, - PyObject* base, const std::vector>& tensors, PyObject** out) { - DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SetItem) -} - -Status SerializeDict(std::vector dicts, int32_t recursion_depth, - std::shared_ptr* out, std::vector& tensors_out) { - DictBuilder result; - if (recursion_depth >= MAX_RECURSION_DEPTH) { - return Status::NotImplemented( - "This object exceeds the maximum recursion depth. 
It may contain itself " - "recursively."); - } - std::vector key_tuples, key_dicts, val_lists, val_tuples, val_dicts, dummy; - for (const auto& dict : dicts) { - PyObject *key, *value; - Py_ssize_t pos = 0; - while (PyDict_Next(dict, &pos, &key, &value)) { - RETURN_NOT_OK( - append(key, result.keys(), dummy, key_tuples, key_dicts, tensors_out)); - DCHECK(dummy.size() == 0); - RETURN_NOT_OK( - append(value, result.vals(), val_lists, val_tuples, val_dicts, tensors_out)); - } - } - std::shared_ptr key_tuples_arr; - if (key_tuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences( - key_tuples, recursion_depth + 1, &key_tuples_arr, tensors_out)); - } - std::shared_ptr key_dicts_arr; - if (key_dicts.size() > 0) { - RETURN_NOT_OK( - SerializeDict(key_dicts, recursion_depth + 1, &key_dicts_arr, tensors_out)); - } - std::shared_ptr val_list_arr; - if (val_lists.size() > 0) { - RETURN_NOT_OK( - SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr, tensors_out)); - } - std::shared_ptr val_tuples_arr; - if (val_tuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences( - val_tuples, recursion_depth + 1, &val_tuples_arr, tensors_out)); - } - std::shared_ptr val_dict_arr; - if (val_dicts.size() > 0) { - RETURN_NOT_OK( - SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out)); - } - RETURN_NOT_OK(result.Finish( - key_tuples_arr, key_dicts_arr, val_list_arr, val_tuples_arr, val_dict_arr, out)); - - // This block is used to decrement the reference counts of the results - // returned by the serialization callback, which is called in SerializeArray - // in numpy.cc as well as in DeserializeDict and in append in this file. - static PyObject* py_type = PyUnicode_FromString("_pytype_"); - for (const auto& dict : dicts) { - if (PyDict_Contains(dict, py_type)) { - // If the dictionary contains the key "_pytype_", then the user has to - // have registered a callback. - ARROW_CHECK(numbuf_serialize_callback); - Py_XDECREF(dict); - } - } - - return Status::OK(); -} - -Status DeserializeDict(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, - PyObject* base, const std::vector>& tensors, PyObject** out) { - auto data = std::dynamic_pointer_cast(array); - // TODO(pcm): error handling, get rid of the temporary copy of the list - PyObject *keys, *vals; - PyObject* result = PyDict_New(); - ARROW_RETURN_NOT_OK( - DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, &keys)); - ARROW_RETURN_NOT_OK( - DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, &vals)); - for (size_t i = start_idx; i < stop_idx; ++i) { - PyDict_SetItem( - result, PyList_GetItem(keys, i - start_idx), PyList_GetItem(vals, i - start_idx)); - } - Py_XDECREF(keys); // PyList_GetItem(keys, ...) incremented the reference count - Py_XDECREF(vals); // PyList_GetItem(vals, ...) incremented the reference count - static PyObject* py_type = PyUnicode_FromString("_pytype_"); - if (PyDict_Contains(result, py_type) && numbuf_deserialize_callback) { - PyObject* arglist = Py_BuildValue("(O)", result); - // The result of the call to PyObject_CallObject will be passed to Python - // and its reference count will be decremented by the interpreter. 
diff --git a/src/numbuf/python/src/pynumbuf/adapters/python.h b/src/numbuf/python/src/pynumbuf/adapters/python.h
deleted file mode 100644
index d66aec109..000000000
--- a/src/numbuf/python/src/pynumbuf/adapters/python.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#ifndef PYNUMBUF_PYTHON_H
-#define PYNUMBUF_PYTHON_H
-
-#include <Python.h>
-
-#include <arrow/api.h>
-#include <memory>
-#include <vector>
-
-#include "numpy.h"
-
-namespace numbuf {
-
-arrow::Status SerializeSequences(std::vector<PyObject*> sequences,
-    int32_t recursion_depth, std::shared_ptr<arrow::Array>* out,
-    std::vector<PyObject*>& tensors_out);
-arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
-    std::shared_ptr<arrow::Array>* out, std::vector<PyObject*>& tensors_out);
-arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start_idx,
-    int32_t stop_idx, PyObject* base,
-    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out);
-arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx,
-    int32_t stop_idx, PyObject* base,
-    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out);
-arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx,
-    int32_t stop_idx, PyObject* base,
-    const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out);
-}
-
-#endif
diff --git a/src/numbuf/python/src/pynumbuf/adapters/scalars.h b/src/numbuf/python/src/pynumbuf/adapters/scalars.h
deleted file mode 100644
index c7c65dce1..000000000
--- a/src/numbuf/python/src/pynumbuf/adapters/scalars.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef PYNUMBUF_SCALARS_H
-#define PYNUMBUF_SCALARS_H
-
-#include <arrow/api.h>
-
-#include <Python.h>
-#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
-#define NO_IMPORT_ARRAY
-#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API
-#include <numpy/arrayobject.h>
-#include <numpy/arrayscalars.h>
-
-#include <numbuf/sequence.h>
-
-namespace numbuf {
-
-arrow::Status AppendScalar(PyObject* obj, SequenceBuilder& builder) {
-  if (PyArray_IsScalar(obj, Bool)) {
-    return builder.AppendBool(((PyBoolScalarObject*)obj)->obval != 0);
-  } else if (PyArray_IsScalar(obj, Float)) {
-    return builder.AppendFloat(((PyFloatScalarObject*)obj)->obval);
-  } else if (PyArray_IsScalar(obj, Double)) {
-    return builder.AppendDouble(((PyDoubleScalarObject*)obj)->obval);
-  }
-  int64_t value = 0;
-  if (PyArray_IsScalar(obj, Byte)) {
-    value = ((PyByteScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, UByte)) {
-    value = ((PyUByteScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, Short)) {
-    value = ((PyShortScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, UShort)) {
-    value = ((PyUShortScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, Int)) {
-    value = ((PyIntScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, UInt)) {
-    value = ((PyUIntScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, Long)) {
-    value = ((PyLongScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, ULong)) {
-    value = ((PyULongScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, LongLong)) {
-    value = ((PyLongLongScalarObject*)obj)->obval;
-  } else if (PyArray_IsScalar(obj, ULongLong)) {
-    value = ((PyULongLongScalarObject*)obj)->obval;
-  } else {
-    DCHECK(false) << "scalar type not recognized";
-  }
-  return builder.AppendInt64(value);
-}
-
-}  // namespace
-
-#endif  // PYNUMBUF_SCALARS_H
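Note that AppendScalar widens every NumPy integer scalar to int64 before appending, so scalar dtypes are not preserved across a round trip; float32/float64 likewise come back as plain Python floats. A minimal sketch of the observable behavior, assuming a built ray.numbuf extension module:

    import numpy as np
    import ray.numbuf as numbuf

    # Integer scalars of every width are appended as int64 and deserialize
    # as plain Python ints; Float/Double deserialize as Python floats.
    size, capsule = numbuf.serialize_list([np.int8(3), np.uint64(5), np.float32(1.0)])
    print(numbuf.deserialize_list(capsule))  # [3, 5, 1.0]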
diff --git a/src/numbuf/python/src/pynumbuf/numbuf.cc b/src/numbuf/python/src/pynumbuf/numbuf.cc
deleted file mode 100644
index 63b947d82..000000000
--- a/src/numbuf/python/src/pynumbuf/numbuf.cc
+++ /dev/null
@@ -1,529 +0,0 @@
-#include <Python.h>
-#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
-#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API
-#include <numpy/arrayobject.h>
-
-#include "bytesobject.h"
-
-#include <iostream>
-
-#ifdef HAS_PLASMA
-// This needs to be included before plasma_protocol. We cannot include it in
-// plasma_protocol, because that file is used both with the store and the
-// manager; the store uses the ObjectID from plasma_common.h and the
-// manager uses the ObjectID from common.h.
-#include "plasma/common.h"
-
-#include "plasma/client.h"
-#include "plasma/protocol.h"
-
-extern "C" {
-PyObject* NumbufPlasmaOutOfMemoryError;
-PyObject* NumbufPlasmaObjectExistsError;
-}
-
-using namespace plasma;
-
-#endif
-
-#include <arrow/api.h>
-#include <arrow/io/memory.h>
-#include <arrow/ipc/api.h>
-#include <arrow/python/numpy_convert.h>
-#include <sstream>
-
-#include "adapters/python.h"
-
-using namespace arrow;
-using namespace numbuf;
-
-struct RayObject {
-  std::shared_ptr<RecordBatch> batch;
-  std::vector<PyObject*> arrays;
-  std::vector<std::shared_ptr<Tensor>> tensors;
-};
-
-// Each arrow object is stored in the format
-// | length of the object in bytes | object data |.
-// LENGTH_PREFIX_SIZE is the number of bytes occupied by the
-// object length field.
-constexpr int64_t LENGTH_PREFIX_SIZE = sizeof(int64_t);
-
-std::shared_ptr<RecordBatch> make_batch(std::shared_ptr<Array> data) {
-  auto field = std::make_shared<Field>("list", data->type());
-  std::shared_ptr<Schema> schema(new Schema({field}));
-  return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
-}
-
-Status write_batch_and_tensors(io::OutputStream* stream,
-    std::shared_ptr<RecordBatch> batch, const std::vector<PyObject*>& tensors,
-    int64_t* batch_size, int64_t* total_size) {
-  std::shared_ptr<ipc::FileWriter> writer;
-  RETURN_NOT_OK(ipc::FileWriter::Open(stream, batch->schema(), &writer));
-  RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true));
-  RETURN_NOT_OK(writer->Close());
-  RETURN_NOT_OK(stream->Tell(batch_size));
-  for (auto array : tensors) {
-    int32_t metadata_length;
-    int64_t body_length;
-    std::shared_ptr<Tensor> tensor;
-    auto contiguous = (PyObject*)PyArray_GETCONTIGUOUS((PyArrayObject*)array);
-    RETURN_NOT_OK(py::NdarrayToTensor(NULL, contiguous, &tensor));
-    RETURN_NOT_OK(ipc::WriteTensor(*tensor, stream, &metadata_length, &body_length));
-    Py_XDECREF(contiguous);
-  }
-  RETURN_NOT_OK(stream->Tell(total_size));
-  return Status::OK();
-}
-
-Status read_batch_and_tensors(uint8_t* data, int64_t size,
-    std::shared_ptr<RecordBatch>* batch_out,
-    std::vector<std::shared_ptr<Tensor>>& tensors_out) {
-  std::shared_ptr<ipc::FileReader> reader;
-  int64_t batch_size = *((int64_t*)data);
-  auto source = std::make_shared<io::BufferReader>(
-      LENGTH_PREFIX_SIZE + data, size - LENGTH_PREFIX_SIZE);
-  RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, batch_size, &reader));
-  RETURN_NOT_OK(reader->ReadRecordBatch(0, batch_out));
-  int64_t offset = batch_size;
-  while (true) {
-    std::shared_ptr<Tensor> tensor;
-    Status s = ipc::ReadTensor(offset, source.get(), &tensor);
-    if (!s.ok()) { break; }
-    tensors_out.push_back(tensor);
-    RETURN_NOT_OK(source->Tell(&offset));
-  }
-  return Status::OK();
-}
-
-extern "C" {
-
-#define CHECK_SERIALIZATION_ERROR(STATUS)                                             \
-  do {                                                                                \
-    Status _s = (STATUS);                                                             \
-    if (!_s.ok()) {                                                                   \
-      /* If this condition is true, there was an error in the callback that          \
-       * needs to be passed through */                                                \
-      if (!PyErr_Occurred()) { PyErr_SetString(NumbufError, _s.ToString().c_str()); } \
-      return NULL;                                                                    \
-    }                                                                                 \
-  } while (0)
-
-static PyObject* NumbufError;
-
-PyObject* numbuf_serialize_callback = NULL;
-PyObject* numbuf_deserialize_callback = NULL;
-
-int PyObjectToArrow(PyObject* object, RayObject** result) {
-  if (PyCapsule_IsValid(object, "arrow")) {
-    *result = reinterpret_cast<RayObject*>(PyCapsule_GetPointer(object, "arrow"));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
-    return 0;
-  }
-}
-
-static void ArrowCapsule_Destructor(PyObject* capsule) {
-  delete reinterpret_cast<RayObject*>(PyCapsule_GetPointer(capsule, "arrow"));
-}
-
-static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
-  if (PyCapsule_IsValid(object, "plasma")) {
-    *client = reinterpret_cast<PlasmaClient*>(PyCapsule_GetPointer(object, "plasma"));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
-    return 0;
-  }
-}
-
-int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
-  if (PyBytes_Check(object)) {
-    memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
-    return 0;
-  }
-}
-
-/* Documented in doc/numbuf.rst in ray-core */
-static PyObject* serialize_list(PyObject* self, PyObject* args) {
-  PyObject* value;
-  if (!PyArg_ParseTuple(args, "O", &value)) { return NULL; }
-  std::shared_ptr<Array> array;
-  if (PyList_Check(value)) {
-    RayObject* object = new RayObject();
-    int32_t recursion_depth = 0;
-    Status s = SerializeSequences(
-        std::vector<PyObject*>({value}), recursion_depth, &array, object->arrays);
-    CHECK_SERIALIZATION_ERROR(s);
-
-    for (auto array : object->arrays) {
-      int32_t metadata_length;
-      int64_t body_length;
-      std::shared_ptr<Tensor> tensor;
-      ARROW_CHECK_OK(py::NdarrayToTensor(NULL, array, &tensor));
-      object->tensors.push_back(tensor);
-    }
-
-    object->batch = make_batch(array);
-
-    int64_t data_size, total_size;
-    auto mock = std::make_shared<io::MockOutputStream>();
-    write_batch_and_tensors(
-        mock.get(), object->batch, object->arrays, &data_size, &total_size);
-
-    PyObject* r = PyTuple_New(2);
-    PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + total_size));
-    PyTuple_SetItem(r, 1, PyCapsule_New(reinterpret_cast<void*>(object), "arrow",
-                              &ArrowCapsule_Destructor));
-    return r;
-  }
-  return NULL;
-}
-
-/* Documented in doc/numbuf.rst in ray-core */
-static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
-  RayObject* object;
-  PyObject* memoryview;
-  if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &object, &memoryview)) {
-    return NULL;
-  }
-  if (!PyMemoryView_Check(memoryview)) { return NULL; }
-  Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
-  auto buf = std::make_shared<MutableBuffer>(
-      LENGTH_PREFIX_SIZE + reinterpret_cast<uint8_t*>(buffer->buf),
-      buffer->len - LENGTH_PREFIX_SIZE);
-  auto target = std::make_shared<io::FixedSizeBufferWriter>(buf);
-  target->set_memcopy_threads(8);
-  int64_t batch_size, total_size;
-  ARROW_CHECK_OK(write_batch_and_tensors(
-      target.get(), object->batch, object->arrays, &batch_size, &total_size));
-  *((int64_t*)buffer->buf) = buffer->len - LENGTH_PREFIX_SIZE;
-  Py_RETURN_NONE;
-}
-
-/* Documented in doc/numbuf.rst in ray-core */
-static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
-  PyObject* data_memoryview;
-  if (!PyArg_ParseTuple(args, "O", &data_memoryview)) { return NULL; }
-
-  Py_buffer* data_buffer = PyMemoryView_GET_BUFFER(data_memoryview);
-
-  RayObject* object = new RayObject();
-  ARROW_CHECK_OK(read_batch_and_tensors(reinterpret_cast<uint8_t*>(data_buffer->buf),
-      data_buffer->len, &object->batch, object->tensors));
-
-  return PyCapsule_New(
-      reinterpret_cast<void*>(object), "arrow", &ArrowCapsule_Destructor);
-}
-
-/* Documented in doc/numbuf.rst in ray-core */
-static PyObject* deserialize_list(PyObject* self, PyObject* args) {
-  RayObject* object;
-  PyObject* base = Py_None;
-  if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &object, &base)) { return NULL; }
-  PyObject* result;
-  Status s = DeserializeList(object->batch->column(0), 0, object->batch->num_rows(), base,
-      object->tensors, &result);
-  CHECK_SERIALIZATION_ERROR(s);
-  return result;
-}
-
-static PyObject* register_callbacks(PyObject* self, PyObject* args) {
-  PyObject* result = NULL;
-  PyObject* serialize_callback;
-  PyObject* deserialize_callback;
-  if (PyArg_ParseTuple(
-          args, "OO:register_callbacks", &serialize_callback, &deserialize_callback)) {
-    if (!PyCallable_Check(serialize_callback)) {
-      PyErr_SetString(PyExc_TypeError, "serialize_callback must be callable");
-      return NULL;
-    }
-    if (!PyCallable_Check(deserialize_callback)) {
-      PyErr_SetString(PyExc_TypeError, "deserialize_callback must be callable");
-      return NULL;
-    }
-    Py_XINCREF(serialize_callback);    // Add a reference to new serialization callback
-    Py_XINCREF(deserialize_callback);  // Add a reference to new deserialization callback
-    Py_XDECREF(numbuf_serialize_callback);    // Dispose of old serialization callback
-    Py_XDECREF(numbuf_deserialize_callback);  // Dispose of old deserialization callback
-    numbuf_serialize_callback = serialize_callback;
-    numbuf_deserialize_callback = deserialize_callback;
-    Py_INCREF(Py_None);
-    result = Py_None;
-  }
-  return result;
-}
-
-#ifdef HAS_PLASMA
-
-/**
- * Release the object when its associated PyCapsule goes out of scope.
- *
- * The PyCapsule is used as the base object for the Python object that
- * is stored with store_list and retrieved with retrieve_list. The base
- * object ensures that the reference count of the capsule is non-zero
- * during the lifetime of the Python object returned by retrieve_list.
- *
- * @param capsule The capsule that went out of scope.
- * @return Void.
- */
-static void BufferCapsule_Destructor(PyObject* capsule) {
-  plasma::ObjectID* id =
-      reinterpret_cast<plasma::ObjectID*>(PyCapsule_GetPointer(capsule, "buffer"));
-  auto context = reinterpret_cast<PyObject*>(PyCapsule_GetContext(capsule));
-  /* We use the context of the connection capsule to indicate if the connection
-   * is still active (if the context is NULL) or if it is closed (if the context
-   * is (void*) 0x1). This is necessary because the primary pointer of the
-   * capsule cannot be NULL. */
-  if (PyCapsule_GetContext(context) == NULL) {
-    plasma::PlasmaClient* client;
-    ARROW_CHECK(PyObjectToPlasmaClient(context, &client));
-    ARROW_CHECK_OK(client->Release(*id));
-  }
-  Py_XDECREF(context);
-  delete id;
-}
-
-/**
- * Store a PyList in the plasma store.
- *
- * This function converts the PyList into an arrow RecordBatch, constructs the
- * metadata (schema) of the PyList, creates a new plasma object, puts the data
- * into the plasma buffer and the schema into the plasma metadata. This raises
- * an exception if an object with the same ID already exists in the store or
- * if the store is out of memory.
- *
- * @param args Contains the object ID the list is stored under, the
- *        connection to the plasma store and the PyList we want to store.
- * @return None.
- */
-static PyObject* store_list(PyObject* self, PyObject* args) {
-  ObjectID obj_id;
-  plasma::PlasmaClient* client;
-  PyObject* value;
-  if (!PyArg_ParseTuple(args, "O&O&O", PyStringToUniqueID, &obj_id,
-          PyObjectToPlasmaClient, &client, &value)) {
-    return NULL;
-  }
-  if (!PyList_Check(value)) { return NULL; }
-
-  std::shared_ptr<Array> array;
-  int32_t recursion_depth = 0;
-  std::vector<PyObject*> tensors;
-  Status s = SerializeSequences(
-      std::vector<PyObject*>({value}), recursion_depth, &array, tensors);
-  CHECK_SERIALIZATION_ERROR(s);
-
-  std::shared_ptr<RecordBatch> batch = make_batch(array);
-
-  int64_t data_size, total_size;
-  auto mock = std::make_shared<io::MockOutputStream>();
-  write_batch_and_tensors(mock.get(), batch, tensors, &data_size, &total_size);
-
-  uint8_t* data;
-  /* The arrow schema is stored as the metadata of the plasma object and
-   * both the arrow data and the header end offset are
-   * stored in the plasma data buffer. The header end offset is stored in
-   * the first LENGTH_PREFIX_SIZE bytes of the data buffer. The RecordBatch
-   * data is stored after that. */
-  s = client->Create(obj_id, LENGTH_PREFIX_SIZE + total_size, NULL, 0, &data);
-  if (s.IsPlasmaObjectExists()) {
-    PyErr_SetString(NumbufPlasmaObjectExistsError,
-        "An object with this ID already exists in the plasma "
-        "store.");
-    return NULL;
-  }
-  if (s.IsPlasmaStoreFull()) {
-    PyErr_SetString(NumbufPlasmaOutOfMemoryError,
-        "The plasma store ran out of memory and could not create "
-        "this object.");
-    return NULL;
-  }
-  ARROW_CHECK_OK(s);
-
-  auto buf =
-      std::make_shared<MutableBuffer>(LENGTH_PREFIX_SIZE + data, total_size);
-  auto target = std::make_shared<io::FixedSizeBufferWriter>(buf);
-  target->set_memcopy_threads(8);
-  write_batch_and_tensors(target.get(), batch, tensors, &data_size, &total_size);
-  *((int64_t*)data) = data_size;
-
-  /* Do the plasma_release corresponding to the call to plasma_create. */
-  ARROW_CHECK_OK(client->Release(obj_id));
-  /* Seal the object. */
-  ARROW_CHECK_OK(client->Seal(obj_id));
-  Py_RETURN_NONE;
-}
-
-/**
- * Retrieve a PyList from the plasma store.
- *
- * This reads the arrow schema from the plasma metadata, constructs
- * Python objects from the plasma data according to the schema and
- * returns the object.
- *
- * @param args The arguments are, in order:
- *        1) A list of object IDs of the lists to be retrieved.
- *        2) The connection to the plasma store.
- *        3) A timeout in milliseconds that the call should return by. This is
- *           -1 if the call should block forever, or 0 if the call should
- *           return immediately.
- * @return A list of tuples, where the first element in the tuple is the object
- *         ID (appearing in the same order as in the argument to the method),
- *         and the second element in the tuple is the retrieved list (or None
- *         if no value was retrieved).
- */
-static PyObject* retrieve_list(PyObject* self, PyObject* args) {
-  PyObject* object_id_list;
-  PyObject* plasma_client;
-  long long timeout_ms;
-  if (!PyArg_ParseTuple(args, "OOL", &object_id_list, &plasma_client, &timeout_ms)) {
-    return NULL;
-  }
-  plasma::PlasmaClient* client;
-  if (!PyObjectToPlasmaClient(plasma_client, &client)) { return NULL; }
-
-  Py_ssize_t num_object_ids = PyList_Size(object_id_list);
-  ObjectID* object_ids = new ObjectID[num_object_ids];
-  ObjectBuffer* object_buffers = new ObjectBuffer[num_object_ids];
-
-  for (int i = 0; i < num_object_ids; ++i) {
-    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
-  }
-
-  Py_BEGIN_ALLOW_THREADS;
-  ARROW_CHECK_OK(client->Get(object_ids, num_object_ids, timeout_ms, object_buffers));
-  Py_END_ALLOW_THREADS;
-
-  PyObject* returns = PyList_New(num_object_ids);
-  for (int i = 0; i < num_object_ids; ++i) {
-    PyObject* obj_id = PyList_GetItem(object_id_list, i);
-    PyObject* t = PyTuple_New(2);
-    Py_XINCREF(obj_id);
-    PyTuple_SetItem(t, 0, obj_id);
-
-    if (object_buffers[i].data_size != -1) {
-      /* The object was retrieved, so return the object. */
-      ObjectID* buffer_obj_id = new ObjectID(object_ids[i]);
-      /* This keeps a Plasma buffer in scope as long as an object that is backed by that
-       * buffer is in scope. This prevents memory in the object store from getting
-       * released while it is still being used to back a Python object. */
-      PyObject* base = PyCapsule_New(buffer_obj_id, "buffer", BufferCapsule_Destructor);
-      PyCapsule_SetContext(base, plasma_client);
-      Py_XINCREF(plasma_client);
-
-      auto batch = std::shared_ptr<RecordBatch>();
-      std::vector<std::shared_ptr<Tensor>> tensors;
-      ARROW_CHECK_OK(read_batch_and_tensors(
-          object_buffers[i].data, object_buffers[i].data_size, &batch, tensors));
-
-      PyObject* result;
-      Status s =
-          DeserializeList(batch->column(0), 0, batch->num_rows(), base, tensors, &result);
-      CHECK_SERIALIZATION_ERROR(s);
-      Py_XDECREF(base);
-
-      PyTuple_SetItem(t, 1, result);
-    } else {
-      /* The object was not retrieved, so just add None to the list of return
-       * values. */
-      Py_XINCREF(Py_None);
-      PyTuple_SetItem(t, 1, Py_None);
-    }
-
-    PyList_SetItem(returns, i, t);
-  }
-
-  delete[] object_ids;
-  delete[] object_buffers;
-
-  return returns;
-}
-
-#endif  // HAS_PLASMA
-
-static PyMethodDef NumbufMethods[] = {
-    {"serialize_list", serialize_list, METH_VARARGS, "serialize a Python list"},
-    {"deserialize_list", deserialize_list, METH_VARARGS, "deserialize a Python list"},
-    {"write_to_buffer", write_to_buffer, METH_VARARGS, "write serialized data to buffer"},
-    {"read_from_buffer", read_from_buffer, METH_VARARGS,
-        "read serialized data from buffer"},
-    {"register_callbacks", register_callbacks, METH_VARARGS,
-        "set serialization and deserialization callbacks"},
-#ifdef HAS_PLASMA
-    {"store_list", store_list, METH_VARARGS, "store a Python list in plasma"},
-    {"retrieve_list", retrieve_list, METH_VARARGS, "retrieve a Python list from plasma"},
-#endif
-    {NULL, NULL, 0, NULL}};
-
-// clang-format off
-#if PY_MAJOR_VERSION >= 3
-static struct PyModuleDef moduledef = {
-    PyModuleDef_HEAD_INIT,
-    "libnumbuf",                     /* m_name */
-    "Python C Extension for Numbuf", /* m_doc */
-    0,                               /* m_size */
-    NumbufMethods,                   /* m_methods */
-    NULL,                            /* m_reload */
-    NULL,                            /* m_traverse */
-    NULL,                            /* m_clear */
-    NULL,                            /* m_free */
-};
-#endif
-// clang-format on
-
-#if PY_MAJOR_VERSION >= 3
-#define INITERROR return NULL
-#else
-#define INITERROR return
-#endif
-
-#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
-#define PyMODINIT_FUNC void
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
-#else
-#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
-#endif
-
-MOD_INIT(libnumbuf) {
-#if PY_MAJOR_VERSION >= 3
-  PyObject* m = PyModule_Create(&moduledef);
-#else
-  PyObject* m =
-      Py_InitModule3("libnumbuf", NumbufMethods, "Python C Extension for Numbuf");
-#endif
-
-#if HAS_PLASMA
-  /* Create a custom exception for when an object ID is reused. */
-  char numbuf_plasma_object_exists_error[] = "numbuf_plasma_object_exists.error";
-  NumbufPlasmaObjectExistsError =
-      PyErr_NewException(numbuf_plasma_object_exists_error, NULL, NULL);
-  Py_INCREF(NumbufPlasmaObjectExistsError);
-  PyModule_AddObject(
-      m, "numbuf_plasma_object_exists_error", NumbufPlasmaObjectExistsError);
-  /* Create a custom exception for when the plasma store is out of memory. */
-  char numbuf_plasma_out_of_memory_error[] = "numbuf_plasma_out_of_memory.error";
-  NumbufPlasmaOutOfMemoryError =
-      PyErr_NewException(numbuf_plasma_out_of_memory_error, NULL, NULL);
-  Py_INCREF(NumbufPlasmaOutOfMemoryError);
-  PyModule_AddObject(
-      m, "numbuf_plasma_out_of_memory_error", NumbufPlasmaOutOfMemoryError);
-#endif
-
-  char numbuf_error[] = "numbuf.error";
-  NumbufError = PyErr_NewException(numbuf_error, NULL, NULL);
-  Py_INCREF(NumbufError);
-  PyModule_AddObject(m, "numbuf_error", NumbufError);
-  import_array();
-
-#if PY_MAJOR_VERSION >= 3
-  return m;
-#endif
-}
-}
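store_list and retrieve_list were the plasma-backed variants of the same round trip. A hedged sketch of their calling convention follows; `conn` stands for a "plasma" connection capsule obtained from Ray's plasma client wrapper (how that capsule is created is outside this patch), and object IDs are raw 20-byte strings, as PyStringToUniqueID above requires:

    import ray.numbuf as numbuf

    object_id = 20 * b"\x00"  # illustrative ID; must be exactly 20 bytes

    # store_list raises numbuf_plasma_object_exists_error if the ID is taken
    # and numbuf_plasma_out_of_memory_error if the store cannot allocate it.
    numbuf.store_list(object_id, conn, [1, 2, "hello"])

    # retrieve_list blocks for at most timeout_ms (-1 blocks forever, 0
    # returns immediately) and yields [(object_id, value_or_None), ...]
    # in the same order as the input IDs.
    [(oid, value)] = numbuf.retrieve_list([object_id], conn, 1000)
    assert value == [1, 2, "hello"]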
diff --git a/src/numbuf/python/test/runtest.py b/src/numbuf/python/test/runtest.py
deleted file mode 100644
index 9ed234af9..000000000
--- a/src/numbuf/python/test/runtest.py
+++ /dev/null
@@ -1,174 +0,0 @@
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import unittest
-import ray.numbuf as numbuf
-import numpy as np
-from numpy.testing import assert_equal
-import os
-import sys
-
-TEST_OBJECTS = [{(1, 2): 1}, {(): 2}, [1, "hello", 3.0], 42, 43,
-                "hello world",
-                u"x", u"\u262F", 42.0,
-                1 << 62, (1.0, "hi"),
-                None, (None, None), ("hello", None),
-                True, False, (True, False), "hello",
-                {True: "hello", False: "world"},
-                {"hello": "world", 1: 42, 2.5: 45}, {},
-                np.int8(3), np.int32(4), np.int64(5),
-                np.uint8(3), np.uint32(4), np.uint64(5),
-                np.float32(1.0), np.float64(1.0)]
-
-if sys.version_info < (3, 0):
-    TEST_OBJECTS += [long(42), long(1 << 62)]  # noqa: F821
-
-
-class SerializationTests(unittest.TestCase):
-
-    def roundTripTest(self, data):
-        size, serialized = numbuf.serialize_list(data)
-        result = numbuf.deserialize_list(serialized)
-        assert_equal(data, result)
-
-    def testSimple(self):
-        self.roundTripTest([1, 2, 3])
-        self.roundTripTest([1.0, 2.0, 3.0])
-        self.roundTripTest(['hello', 'world'])
-        self.roundTripTest([1, 'hello', 1.0])
-        self.roundTripTest([{'hello': 1.0, 'world': 42}])
-        self.roundTripTest([True, False])
-
-    def testNone(self):
-        self.roundTripTest([1, 2, None, 3])
-
-    def testNested(self):
-        self.roundTripTest([{"hello": {"world": (1, 2, 3)}}])
-        self.roundTripTest([((1,), (1, 2, 3, (4, 5, 6), "string"))])
-        self.roundTripTest([{"hello": [1, 2, 3]}])
-        self.roundTripTest([{"hello": [1, [2, 3]]}])
-        self.roundTripTest([{"hello": (None, 2, [3, 4])}])
-        self.roundTripTest(
-            [{"hello": (None, 2, [3, 4], np.array([1.0, 2.0, 3.0]))}])
-
-    def numpyTest(self, t):
-        a = np.random.randint(0, 10, size=(100, 100)).astype(t)
-        self.roundTripTest([a])
-
-    def testArrays(self):
-        for t in ["int8", "uint8", "int16", "uint16", "int32", "uint32",
-                  "float32", "float64"]:
-            self.numpyTest(t)
-
-    def testRay(self):
-        for obj in TEST_OBJECTS:
-            self.roundTripTest([obj])
-
-    def testCallback(self):
-
-        class Foo(object):
-            def __init__(self):
-                self.x = 1
-
-        class Bar(object):
-            def __init__(self):
-                self.foo = Foo()
-
-        def serialize(obj):
-            return dict(obj.__dict__, **{"_pytype_": type(obj).__name__})
-
-        def deserialize(obj):
-            if obj["_pytype_"] == "Foo":
-                result = Foo()
-            elif obj["_pytype_"] == "Bar":
-                result = Bar()
-
-            obj.pop("_pytype_", None)
-            result.__dict__ = obj
-            return result
-
-        bar = Bar()
-        bar.foo.x = 42
-
-        numbuf.register_callbacks(serialize, deserialize)
-
-        size, serialized = numbuf.serialize_list([bar])
-        self.assertEqual(numbuf.deserialize_list(serialized)[0].foo.x, 42)
-
-    def testObjectArray(self):
-        x = np.array([1, 2, "hello"], dtype=object)
-        y = np.array([[1, 2], [3, 4]], dtype=object)
-
-        def myserialize(obj):
-            return {"_pytype_": "numpy.array", "data": obj.tolist()}
-
-        def mydeserialize(obj):
-            if obj["_pytype_"] == "numpy.array":
-                return np.array(obj["data"], dtype=object)
-
-        numbuf.register_callbacks(myserialize, mydeserialize)
-
-        size, serialized = numbuf.serialize_list([x, y])
-
-        assert_equal(numbuf.deserialize_list(serialized), [x, y])
-
-    def testBuffer(self):
-        for (i, obj) in enumerate(TEST_OBJECTS):
-            size, batch = numbuf.serialize_list([obj])
-            buff = np.zeros(size, dtype="uint8")
-            numbuf.write_to_buffer(batch, memoryview(buff))
-            array = numbuf.read_from_buffer(memoryview(buff))
-            result = numbuf.deserialize_list(array)
-            assert_equal(result[0], obj)
-
-    def testObjectArrayImmutable(self):
-        obj = np.zeros([10])
-        size, serialized = numbuf.serialize_list([obj])
-        result = numbuf.deserialize_list(serialized)
-        assert_equal(result[0], obj)
-        with self.assertRaises(ValueError):
-            result[0][0] = 1
-
-    def testArrowLimits(self):
-        # Test that objects that are too large for Arrow throw a Python
-        # exception. These tests give out of memory errors on Travis and need
-        # to be run on a machine with lots of RAM.
-        if os.getenv("TRAVIS") is None:
-            l = 2 ** 29 * [1.0]
-            with self.assertRaises(numbuf.numbuf_error):
-                self.roundTripTest(l)
-                self.roundTripTest([l])
-            del l
-            l = 2 ** 29 * ["s"]
-            with self.assertRaises(numbuf.numbuf_error):
-                self.roundTripTest(l)
-                self.roundTripTest([l])
-            del l
-            l = 2 ** 29 * [["1"], 2, 3, [{"s": 4}]]
-            with self.assertRaises(numbuf.numbuf_error):
-                self.roundTripTest(l)
-                self.roundTripTest([l])
-            del l
-            with self.assertRaises(numbuf.numbuf_error):
-                l = 2 ** 29 * [{"s": 1}] + 2 ** 29 * [1.0]
-                self.roundTripTest(l)
-                self.roundTripTest([l])
-            del l
-            with self.assertRaises(numbuf.numbuf_error):
-                l = np.zeros(2 ** 25)
-                self.roundTripTest([l])
-            del l
-            with self.assertRaises(numbuf.numbuf_error):
-                l = [np.zeros(2 ** 18) for _ in range(2 ** 7)]
-                self.roundTripTest(l)
-                self.roundTripTest([l])
-            del l
-        else:
-            print("Not running testArrowLimits on Travis because of the "
-                  "test's memory requirements.")
-
-
-if __name__ == "__main__":
-    unittest.main(verbosity=2)
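The length-prefix layout documented in numbuf.cc can be observed directly from Python: write_to_buffer fills the first LENGTH_PREFIX_SIZE (eight) bytes with an int64 length field and writes the Arrow file-format payload after it. A minimal sketch, assuming native byte order and a built ray.numbuf module:

    import struct

    import numpy as np
    import ray.numbuf as numbuf

    size, capsule = numbuf.serialize_list([1, 2, 3])
    buff = np.zeros(size, dtype="uint8")
    numbuf.write_to_buffer(capsule, memoryview(buff))

    # write_to_buffer stores buffer->len - LENGTH_PREFIX_SIZE in the prefix.
    (prefix,) = struct.unpack("q", buff[:8].tobytes())
    assert prefix == size - 8

    # read_from_buffer reparses the same bytes into a fresh capsule.
    result = numbuf.deserialize_list(numbuf.read_from_buffer(memoryview(buff)))
    assert result == [1, 2, 3]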
diff --git a/src/numbuf/vsprojects/numbuf.vcxproj b/src/numbuf/vsprojects/numbuf.vcxproj
deleted file mode 100644
index b762afa79..000000000
--- a/src/numbuf/vsprojects/numbuf.vcxproj
+++ /dev/null
@@ -1,75 +0,0 @@
[MSBuild XML not recoverable from this copy of the patch. The deleted project (GUID {609D1438-D42D-4CBA-80A5-A1398C3BCC85}, toolset v140) built libnumbuf as a DynamicLibrary with a .pyd extension for Debug/Release on Win32 and x64, added $(THIRD_PARTY)arrow\cpp\src to the include path, used the Console subsystem, and depended on project {10e7d8e8-0eeb-46ea-a58d-f9236b5960ad}.]
diff --git a/src/numbuf/vsprojects/numbuf.vcxproj.filters b/src/numbuf/vsprojects/numbuf.vcxproj.filters
deleted file mode 100644
index c20d75f12..000000000
--- a/src/numbuf/vsprojects/numbuf.vcxproj.filters
+++ /dev/null
@@ -1,60 +0,0 @@
[Filter XML not recoverable from this copy of the patch. The deleted filters file grouped the project's files into the standard "Source Files" (cpp;c;cc;cxx;...), "Header Files" (h;hh;hpp;hxx;...), and "Resource Files" groups.]
diff --git a/src/thirdparty/download_thirdparty.sh b/src/thirdparty/download_thirdparty.sh
index b9eead06a..bb82a656e 100755
--- a/src/thirdparty/download_thirdparty.sh
+++ b/src/thirdparty/download_thirdparty.sh
@@ -13,4 +13,4 @@
 fi
 cd $TP_DIR/arrow
 git fetch origin master
-git checkout dca5d96c7a029c079183e2903db425e486e2deb9
+git checkout b1e56a2f5d3fef3d04093fcfd4f279290f597d06