From b8dda0e3d09c8214db04b45852b07367615560fe Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Tue, 10 Nov 2020 23:44:16 -0800 Subject: [PATCH] [Serialization] Fix buffer alignment issues (#11888) * fix buffer alignment issues * remove unused fields * aligned memory allocation * windows compat * license. fix compiler warnings * fix compilation error * reinterpret_cast --- BUILD.bazel | 13 ++++ python/ray/_raylet.pyx | 1 - python/ray/includes/serialization.pxi | 87 +++++++++++++------------- python/ray/tests/test_serialization.py | 25 ++++++++ src/ray/common/buffer.h | 27 +++++--- src/ray/protobuf/serialization.proto | 34 +++++----- src/ray/thirdparty/aligned_alloc.c | 78 +++++++++++++++++++++++ src/ray/thirdparty/aligned_alloc.h | 43 +++++++++++++ 8 files changed, 238 insertions(+), 70 deletions(-) create mode 100644 src/ray/thirdparty/aligned_alloc.c create mode 100644 src/ray/thirdparty/aligned_alloc.h diff --git a/BUILD.bazel b/BUILD.bazel index e55b44dcf..0cc65ef8f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1367,6 +1367,7 @@ cc_library( strip_include_prefix = "src", visibility = ["//visibility:public"], deps = [ + ":aligned_alloc", ":sha256", "//src/ray/protobuf:event_cc_proto", "@boost//:asio", @@ -1425,6 +1426,18 @@ cc_library( strip_include_prefix = "src", ) +cc_library( + name = "aligned_alloc", + srcs = [ + "src/ray/thirdparty/aligned_alloc.c", + ], + hdrs = [ + "src/ray/thirdparty/aligned_alloc.h", + ], + copts = COPTS, + strip_include_prefix = "src", +) + alias( name = "hiredis", actual = "@com_github_redis_hiredis//:hiredis", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 75a5045e4..1df342930 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -54,7 +54,6 @@ from ray.includes.common cimport ( CTaskType, CPlacementStrategy, CRayFunction, - LocalMemoryBuffer, move, LANGUAGE_CPP, LANGUAGE_JAVA, diff --git a/python/ray/includes/serialization.pxi b/python/ray/includes/serialization.pxi index 1051ca56a..bf45275e9 100644 --- a/python/ray/includes/serialization.pxi +++ b/python/ray/includes/serialization.pxi @@ -48,12 +48,8 @@ cdef extern from "src/ray/protobuf/serialization.pb.h" nogil: int strides_size() cdef cppclass CPythonObject "ray::serialization::PythonObject": - uint64_t inband_data_offset() const - void set_inband_data_offset(uint64_t value) uint64_t inband_data_size() const void set_inband_data_size(uint64_t value) - uint64_t raw_buffers_offset() const - void set_raw_buffers_offset(uint64_t value) uint64_t raw_buffers_size() const void set_raw_buffers_size(uint64_t value) CPythonBuffer* add_buffer() @@ -69,8 +65,9 @@ cdef int64_t padded_length(int64_t offset, int64_t alignment): return ((offset + alignment - 1) // alignment) * alignment -cdef int64_t padded_length_u64(uint64_t offset, uint64_t alignment): - return ((offset + alignment - 1) // alignment) * alignment +cdef uint8_t* aligned_address(uint8_t* addr, uint64_t alignment) nogil: + cdef uintptr_t u_addr = addr + return (((u_addr + alignment - 1) // alignment) * alignment) cdef class SubBuffer: @@ -204,36 +201,41 @@ def split_buffer(Buffer buf): bufferview[kMessagePackOffset + msgpack_bytes_length:]) -# See 'serialization.proto' for the memory layout in the Plasma buffer. +# Note [Pickle5 serialization layout & alignment] +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# To ensure efficient data access, our serialize enforces alignment +# when writing data to a buffer. See 'serialization.proto' for +# the detail memory layout and alignment. + + @cython.boundscheck(False) @cython.wraparound(False) def unpack_pickle5_buffers(uint8_t[:] bufferview): cdef: const uint8_t *data = &bufferview[0] - size_t size = len(bufferview) CPythonObject python_object CPythonBuffer *buffer_meta - int64_t protobuf_offset + int inband_offset = sizeof(int64_t) * 2 + int64_t inband_size int64_t protobuf_size int32_t i const uint8_t *buffers_segment - protobuf_offset = (data)[0] - if protobuf_offset < 0: - raise ValueError("The protobuf data offset should be positive." + inband_size = (data)[0] + if inband_size < 0: + raise ValueError("The inband data size should be positive." "Got negative instead. " "Maybe the buffer has been corrupted.") protobuf_size = (data)[1] if protobuf_size > INT32_MAX or protobuf_size < 0: raise ValueError("Incorrect protobuf size. " "Maybe the buffer has been corrupted.") + inband_data = bufferview[inband_offset:inband_offset + inband_size] if not python_object.ParseFromArray( - data + protobuf_offset, protobuf_size): + data + inband_offset + inband_size, protobuf_size): raise ValueError("Protobuf object is corrupted.") - inband_data_offset = python_object.inband_data_offset() - inband_data = bufferview[ - inband_data_offset: - inband_data_offset + python_object.inband_data_size()] - buffers_segment = data + python_object.raw_buffers_offset() + buffers_segment = aligned_address( + data + inband_offset + inband_size + protobuf_size, + kMajorBufferAlign) pickled_buffers = [] # Now read buffer meta for i in range(python_object.buffer_size()): @@ -313,51 +315,52 @@ cdef class Pickle5Writer: cdef: size_t protobuf_bytes = 0 uint64_t inband_data_offset = sizeof(int64_t) * 2 - uint64_t raw_buffers_offset = padded_length_u64( - inband_data_offset + len(inband), kMajorBufferAlign) - self.python_object.set_inband_data_offset(inband_data_offset) self.python_object.set_inband_data_size(len(inband)) - self.python_object.set_raw_buffers_offset(raw_buffers_offset) self.python_object.set_raw_buffers_size(self._curr_buffer_addr) # Since calculating the output size is expensive, we will # reuse the cached size. - # So we MUST NOT change 'python_object' afterwards. - # This is because protobuf could change the output size - # according to different values. + # However, protobuf could change the output size according to + # different values, so we MUST NOT change 'python_object' afterwards. protobuf_bytes = self.python_object.ByteSizeLong() if protobuf_bytes > INT32_MAX: raise ValueError("Total buffer metadata size is bigger than %d. " "Consider reduce the number of buffers " "(number of numpy arrays, etc)." % INT32_MAX) - self._protobuf_offset = padded_length_u64( - raw_buffers_offset + self._curr_buffer_addr, kMinorBufferAlign) + self._protobuf_offset = inband_data_offset + len(inband) self._total_bytes = self._protobuf_offset + protobuf_bytes + if self._curr_buffer_addr > 0: + # reserve 'kMajorBufferAlign' bytes for possible buffer alignment + self._total_bytes += kMajorBufferAlign + self._curr_buffer_addr return self._total_bytes @cython.boundscheck(False) @cython.wraparound(False) cdef void write_to(self, const uint8_t[:] inband, uint8_t[:] data, int memcopy_threads) nogil: - cdef uint8_t *ptr = &data[0] - cdef int32_t protobuf_size - cdef uint64_t buffer_addr - cdef uint64_t buffer_len - cdef int i + cdef: + uint8_t *ptr = &data[0] + uint64_t buffer_addr + uint64_t buffer_len + int i + int64_t protobuf_size = self.python_object.GetCachedSize() if self._total_bytes < 0: raise ValueError("Must call 'get_total_bytes()' first " "to get the actual size") - # Write protobuf size for deserialization. - protobuf_size = self.python_object.GetCachedSize() - (ptr)[0] = self._protobuf_offset + # Write inband data & protobuf size for deserialization. + (ptr)[0] = len(inband) (ptr)[1] = protobuf_size - # Write protobuf data. - self.python_object.SerializeWithCachedSizesToArray( - ptr + self._protobuf_offset) # Write inband data. - memcpy(ptr + self.python_object.inband_data_offset(), - &inband[0], len(inband)) - # Write buffer data. - ptr += self.python_object.raw_buffers_offset() + ptr += sizeof(int64_t) * 2 + memcpy(ptr, &inband[0], len(inband)) + # Write protobuf data. + ptr += len(inband) + self.python_object.SerializeWithCachedSizesToArray(ptr) + ptr += protobuf_size + if self._curr_buffer_addr <= 0: + # End of serialization. Writing more stuff will corrupt the memory. + return + # aligned to 64 bytes + ptr = aligned_address(ptr, kMajorBufferAlign) for i in range(self.python_object.buffer_size()): buffer_addr = self.python_object.buffer(i).address() buffer_len = self.python_object.buffer(i).length() diff --git a/python/ray/tests/test_serialization.py b/python/ray/tests/test_serialization.py index 8004e694d..35b6e09fe 100644 --- a/python/ray/tests/test_serialization.py +++ b/python/ray/tests/test_serialization.py @@ -543,6 +543,31 @@ def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes): assert new_obj() is None +def test_buffer_alignment(): + # Deserialized large numpy arrays should be 64-byte aligned. + x = np.random.normal(size=(10, 20, 30)) + y = ray.get(ray.put(x)) + assert y.ctypes.data % 64 == 0 + + # Unlike PyArrow, Ray aligns small numpy arrays to 8 + # bytes to be memory efficient. + xs = [np.random.normal(size=i) for i in range(100)] + ys = ray.get(ray.put(xs)) + for y in ys: + assert y.ctypes.data % 8 == 0 + + xs = [np.random.normal(size=i * (1, )) for i in range(20)] + ys = ray.get(ray.put(xs)) + for y in ys: + assert y.ctypes.data % 8 == 0 + + xs = [np.random.normal(size=i * (5, )) for i in range(1, 8)] + xs = [xs[i][(i + 1) * (slice(1, 3), )] for i in range(len(xs))] + ys = ray.get(ray.put(xs)) + for y in ys: + assert y.ctypes.data % 8 == 0 + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/buffer.h b/src/ray/common/buffer.h index 501f86886..ccaa6c9fb 100644 --- a/src/ray/common/buffer.h +++ b/src/ray/common/buffer.h @@ -19,6 +19,9 @@ #include "arrow/buffer.h" #include "ray/common/status.h" +#include "ray/thirdparty/aligned_alloc.h" + +#define BUFFER_ALIGNMENT 64 namespace ray { @@ -48,6 +51,7 @@ class Buffer { }; /// Represents a byte buffer in local memory. +/// TODO(suquark): In C++17, we can use std::aligned_alloc class LocalMemoryBuffer : public Buffer { public: /// Constructor. @@ -65,10 +69,10 @@ class LocalMemoryBuffer : public Buffer { : has_data_copy_(copy_data) { if (copy_data) { RAY_CHECK(data != nullptr); - buffer_.resize(size); - std::copy(data, data + size, buffer_.begin()); - data_ = buffer_.data(); - size_ = buffer_.size(); + buffer_ = reinterpret_cast(aligned_malloc(size, BUFFER_ALIGNMENT)); + std::copy(data, data + size, buffer_); + data_ = buffer_; + size_ = size; } else { data_ = data; size_ = size; @@ -77,9 +81,9 @@ class LocalMemoryBuffer : public Buffer { /// Construct a LocalMemoryBuffer of all zeros of the given size. LocalMemoryBuffer(size_t size) : has_data_copy_(true) { - buffer_.resize(size, 0); - data_ = buffer_.data(); - size_ = buffer_.size(); + buffer_ = reinterpret_cast(aligned_malloc(size, BUFFER_ALIGNMENT)); + data_ = buffer_; + size_ = size; } uint8_t *Data() const override { return data_; } @@ -90,7 +94,12 @@ class LocalMemoryBuffer : public Buffer { bool IsPlasmaBuffer() const override { return false; } - ~LocalMemoryBuffer() { size_ = 0; } + ~LocalMemoryBuffer() { + size_ = 0; + if (buffer_ != NULL) { + aligned_free(buffer_); + } + } private: /// Disable copy constructor and assignment, as default copy will @@ -105,7 +114,7 @@ class LocalMemoryBuffer : public Buffer { /// Whether this buffer holds a copy of data. bool has_data_copy_; /// This is only valid when `should_copy` is true. - std::vector buffer_; + uint8_t *buffer_ = NULL; }; /// Represents a byte buffer for plasma object. This can be used to hold the diff --git a/src/ray/protobuf/serialization.proto b/src/ray/protobuf/serialization.proto index 7653aa968..e5fed8e4a 100644 --- a/src/ray/protobuf/serialization.proto +++ b/src/ray/protobuf/serialization.proto @@ -21,9 +21,9 @@ package ray.serialization; // ## About Pickle 5 Protocol // Pickle5 will create two things during serialization: // 1. Inband data. This is the framed pickle data for most objects. -// 2. Buffers. They are python buffers referring internal data of objects. -// They contain metadata of the buffer and a native pointer. -// Thus they provide interface for zero-copy serialization. +// 2. Buffers. They are python buffers wrapping internal data chunks of objects. +// They contain metadata of the buffer and a native pointer, which is +// intended for zero-copy serialization. // // ## Protobuf object // A PythonObject protobuf object will be created for each python object. @@ -34,19 +34,21 @@ package ray.serialization; // // ## Python object serialization memory layout // This section describes the memory layout in the Plasma store buffer. -// Unfortunately, no frame info is included in protobuf data, so we have to specify -// the length and offset of PythonObject. // --------------------- -// i64 offset(PythonObject): -// Offset of the PythonObject relative to the start of this buffer. +// i64 len(inband_data): +// Length of the inband data. // i64 len(PythonObject): // Length of the PythonObject. -// inband_data | pad(64) +// inband_data // Inband data, padded with 64 bytes for the alignment of buffers. -// buffers | pad(8) -// Raw data of buffers, padded with 8 bytes for the alignment of PythonObject. // PythonObject -// PythonObject is stored at the end because its size will be variable. +// PythonObject protobuf defined as below. +// buffers | aligned(64) +// Raw data of buffers, this section is 64-byte aligned. +// Inside the section, large buffers (>=2048 bytes) are aligned +// to 64 bytes for faster SIMD, small buffers (<2048 bytes) are +// aligned to 8 bytes. + // --------------------- // The message for metadata of python buffer objects. @@ -81,16 +83,12 @@ message PythonBuffer { // The message for pickle5 serialized python object. message PythonObject { - // The offset of the inband data section relative to the beginning of the Plasma buffer. - uint64 inband_data_offset = 1; // The size of the inband data section. - uint64 inband_data_size = 2; - // The offset of the raw buffers section relative to the beginning of the Plasma buffer. - uint64 raw_buffers_offset = 3; + uint64 inband_data_size = 1; // The size of the buffers section. It is not used in deserialization // because we already have the length and address of every buffer. However, it could // be useful for debugging or future adjustment, so we just keep it. - uint64 raw_buffers_size = 4; + uint64 raw_buffers_size = 2; // The metadata of python buffer objects. - repeated PythonBuffer buffer = 5; + repeated PythonBuffer buffer = 3; } diff --git a/src/ray/thirdparty/aligned_alloc.c b/src/ray/thirdparty/aligned_alloc.c new file mode 100644 index 000000000..09a75b9dd --- /dev/null +++ b/src/ray/thirdparty/aligned_alloc.c @@ -0,0 +1,78 @@ +/* +Adopted from https://github.com/NickStrupat/AlignedMalloc + +The MIT License (MIT) + +Copyright (c) 2015 Nick Strupat + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +#include "ray/thirdparty/aligned_alloc.h" + +#if defined(__APPLE__) || defined(__linux__) + +#include + +void *aligned_malloc(size_t size, size_t alignment) { + void *pointer = NULL; + int rv = posix_memalign(&pointer, alignment, size); + if (rv != 0) { + pointer = NULL; + } + return pointer; +} + +void aligned_free(void *pointer) { + free(pointer); +} + +#elif defined(_WIN32) + +#include + +void *aligned_malloc(size_t size, size_t alignment) { + return _aligned_malloc(size, alignment); +} + +void aligned_free(void *pointer) { + _aligned_free(pointer); +} + +#else + +// https://sites.google.com/site/ruslancray/lab/bookshelf/interview/ci/low-level/write-an-aligned-malloc-free-function +#include + +void *aligned_malloc(size_t size, size_t alignment) { + void *p1; // original block + void **p2; // aligned block + int offset = alignment - 1 + sizeof(void *); + if ((p1 = (void *)malloc(size + offset)) == NULL) + return NULL; + p2 = (void **)(((size_t)(p1) + offset) & ~(alignment - 1)); + p2[-1] = p1; + return p2; +} + +void aligned_free(void *pointer) { + free(((void **)pointer)[-1]); +} + +#endif diff --git a/src/ray/thirdparty/aligned_alloc.h b/src/ray/thirdparty/aligned_alloc.h new file mode 100644 index 000000000..5bd3274eb --- /dev/null +++ b/src/ray/thirdparty/aligned_alloc.h @@ -0,0 +1,43 @@ +/* +Adopted from https://github.com/NickStrupat/AlignedMalloc + +The MIT License (MIT) + +Copyright (c) 2015 Nick Strupat + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +#ifndef ALIGNED_ALLOC_H_INCLUDED +#define ALIGNED_ALLOC_H_INCLUDED + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +void * aligned_malloc(size_t size, size_t alignment); +void aligned_free(void * pointer); + +#ifdef __cplusplus +} +#endif + +#endif