Improved Pickle5 pickling (#5841)

* object copy optimization

* see if we can reuse the Arrow parallel_memcopy

* remove unused function

* restore the original code, since later experiments show that it has little impact on performance.

* lint
Si-Yuan authored 2019-10-03 15:14:32 -07:00, committed by GitHub
parent fa1214c44a
commit 3a42780cb8
2 changed files with 5 additions and 43 deletions


@@ -1,7 +1,5 @@
 from libc.stdlib cimport malloc, free
-from libc.string cimport memcpy
 from libc.stdint cimport uintptr_t, uint64_t, INT32_MAX
-from cython.parallel cimport prange
 
 # This is the default alignment value for len(buffer) < 2048.
 DEF kMinorBufferAlign = 8
@@ -12,6 +10,9 @@ DEF kMajorBufferSize = 2048
 DEF kMemcopyDefaultBlocksize = 64
 DEF kMemcopyDefaultThreshold = 1024 * 1024
 
+cdef extern from "arrow/util/memory.h" namespace "arrow::internal" nogil:
+    void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
+                          uintptr_t block_size, int num_threads)
 
 cdef extern from "google/protobuf/repeated_field.h" nogil:
     cdef cppclass RepeatedField[Element]:
@@ -67,10 +68,6 @@ cdef int64_t padded_length_u64(uint64_t offset, uint64_t alignment):
     return ((offset + alignment - 1) // alignment) * alignment
 
 
-cdef uint8_t* pointer_logical_and(const uint8_t *address, uintptr_t bits):
-    return <uint8_t*> ((<uintptr_t> address) & bits)
-
-
 cdef class SubBuffer:
     cdef:
         void *buf
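
Both helpers in this hunk are plain integer arithmetic: padded_length_u64 rounds an offset up to the next multiple of a power-of-two alignment, and the removed pointer_logical_and masks the low bits off an address to snap it down to a boundary. A plain-Python restatement of the same arithmetic, with hypothetical names of our choosing:

    def padded_length(offset, alignment):
        # Round offset up to the next multiple of alignment,
        # same formula as padded_length_u64 above.
        return ((offset + alignment - 1) // alignment) * alignment

    def align_down(address, alignment):
        # Same bit trick as pointer_logical_and(address, ~(alignment - 1)):
        # clearing the low bits snaps the address down to a boundary.
        return address & ~(alignment - 1)

    assert padded_length(2049, 64) == 2112  # next 64-byte multiple after 2049
    assert align_down(0x1007, 8) == 0x1000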
@@ -141,41 +138,6 @@ cdef class SubBuffer:
         return self.size
 
 
-cdef void parallel_memcopy(uint8_t *dst, const uint8_t *src, int64_t nbytes,
-                           uintptr_t block_size, int num_threads):
-    cdef:
-        uint8_t *left = pointer_logical_and(src + block_size - 1,
-                                            ~(block_size - 1))
-        uint8_t *right = pointer_logical_and(src + nbytes, ~(block_size - 1))
-        int64_t num_blocks = (right - left) // block_size
-        size_t chunk_size
-        int64_t prefix, suffix
-        int i
-
-    # Update right address
-    right = right - (num_blocks % num_threads) * block_size
-
-    # Now we divide these blocks between available threads. The remainder is
-    # handled separately.
-    chunk_size = (right - left) // num_threads
-    prefix = left - src
-    suffix = src + nbytes - right
-
-    # Now the data layout is
-    # | prefix | k * num_threads * block_size | suffix |.
-    #
-    # We have chunk_size = k * block_size, therefore the data layout is
-    # | prefix | num_threads * chunk_size | suffix |.
-    # Each thread gets a "chunk" of k blocks.
-    for i in prange(num_threads, nogil=True, num_threads=num_threads):
-        memcpy(dst + prefix + i * chunk_size, left + i * chunk_size,
-               chunk_size)
-        if i == 0:
-            memcpy(dst, src, prefix)
-        if i == num_threads - 1:
-            memcpy(dst + prefix + num_threads * chunk_size, right, suffix)
-
-
 # See 'serialization.proto' for the memory layout in the Plasma buffer.
 def unpack_pickle5_buffers(Buffer buf):
     cdef:
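
For reference, the deleted Cython parallel_memcopy (superseded by arrow::internal::parallel_memcopy, declared in the extern block added above) carved the source range into an unaligned prefix, num_threads equal block-aligned chunks, and a suffix absorbing the leftover blocks, then copied the chunks concurrently with prange. A plain-Python sketch of just the partitioning arithmetic, with addresses modeled as integers and the function name ours for illustration:

    def partition_copy(src, nbytes, block_size=64, num_threads=8):
        mask = ~(block_size - 1)
        left = (src + block_size - 1) & mask   # round up to a block boundary
        right = (src + nbytes) & mask          # round down to a block boundary
        num_blocks = (right - left) // block_size
        # Blocks that do not divide evenly among the threads are folded
        # into the suffix, so every thread copies an identical chunk.
        right -= (num_blocks % num_threads) * block_size
        chunk_size = (right - left) // num_threads
        prefix = left - src
        suffix = src + nbytes - right
        # Layout restated from the deleted comment:
        # | prefix | num_threads * chunk_size | suffix |
        assert prefix + num_threads * chunk_size + suffix == nbytes
        return prefix, chunk_size, suffix

    # A 1 MiB copy starting 13 bytes past a 64-byte boundary:
    print(partition_copy(0x1000 + 13, 1 << 20))  # -> (51, 131008, 461)

The kMemcopyDefaultThreshold constant above (1 MiB) suggests copies below that size bypassed the parallel path in favor of a single memcpy.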


@@ -434,9 +434,9 @@ class Worker(object):
                     value, object_id, memcopy_threads=self.memcopy_threads)
             else:
                 writer = Pickle5Writer()
-                meta = pickle.dumps(
+                inband = pickle.dumps(
                     value, protocol=5, buffer_callback=writer.buffer_callback)
-                self.core_worker.put_pickle5_buffers(object_id, meta, writer,
+                self.core_worker.put_pickle5_buffers(object_id, inband, writer,
                                                      self.memcopy_threads)
         except pyarrow.plasma.PlasmaObjectExists:
             # The object already exists in the object store, so there is no
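
For context, the meta -> inband rename reflects what that first value actually is: the in-band pickle stream, while the large payloads are handed out-of-band to Pickle5Writer through buffer_callback (PEP 574 / pickle protocol 5). A minimal standalone sketch of that mechanism on Python 3.8+ (or with the pickle5 backport), with a plain list standing in for the writer and numpy used only as a realistic payload:

    import pickle
    import numpy as np

    value = np.zeros(1 << 20, dtype=np.uint8)

    buffers = []  # stands in for Pickle5Writer
    inband = pickle.dumps(value, protocol=5,
                          buffer_callback=buffers.append)
    # inband holds only opcodes and array metadata; the 1 MiB payload
    # travels out-of-band as pickle.PickleBuffer views in buffers.
    restored = pickle.loads(inband, buffers=buffers)
    assert (restored == value).all()

pickle.loads(inband, buffers=...) reverses the split, mirroring what unpack_pickle5_buffers prepares for on the read path.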