From a3d58607bf76b7a47e748f9d195f04b533fbd37e Mon Sep 17 00:00:00 2001
From: Alexey Tumanov
Date: Tue, 21 Mar 2017 16:17:35 -0700
Subject: [PATCH] parallelize numbuf memcpy and plasma object hash construction
 (#366)

* parallelizing memcopy and object hash construction in numbuf/plasma

* clang format

* whitespace

* refactoring compute object hash: get rid of the prefix chunk

* clang format

* Document performance optimization.

* Remove check for 64-byte alignment, since it may not be guaranteed.
---
 src/numbuf/CMakeLists.txt               |   8 +-
 src/numbuf/python/src/pynumbuf/memory.h | 103 +++++++++++++++++++++++-
 src/plasma/CMakeLists.txt               |   6 +-
 src/plasma/plasma.h                     |   3 +
 src/plasma/plasma_client.cc             |  82 ++++++++++++++++---
 src/plasma/plasma_store.cc              |  12 ++-
 6 files changed, 193 insertions(+), 21 deletions(-)

diff --git a/src/numbuf/CMakeLists.txt b/src/numbuf/CMakeLists.txt
index 297463461..d843914cf 100644
--- a/src/numbuf/CMakeLists.txt
+++ b/src/numbuf/CMakeLists.txt
@@ -68,15 +68,15 @@ if(APPLE)
 endif(APPLE)

 if(APPLE)
-  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})
+  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
 else()
-  target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})
+  target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
 endif()

 if(HAS_PLASMA)
-  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB})
+  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread)
 else()
-  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})
+  target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
 endif()

 install(TARGETS numbuf DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/)
diff --git a/src/numbuf/python/src/pynumbuf/memory.h b/src/numbuf/python/src/pynumbuf/memory.h
index ed3c13a63..8b1c612a4 100644
--- a/src/numbuf/python/src/pynumbuf/memory.h
+++ b/src/numbuf/python/src/pynumbuf/memory.h
@@ -3,15 +3,113 @@

 #include 

+/* C++ includes */
+#include <sys/time.h>
+#include <thread>
+#include <vector>
+
+#define THREADPOOL_SIZE 8
+#define MEMCOPY_BLOCK_SIZE 64
+#define BYTES_IN_MB (1 << 20)
+
+using namespace std;
+
 namespace numbuf {

+class ParallelMemcopy {
+ public:
+  explicit ParallelMemcopy(uint64_t block_size, bool timeit, int threadpool_size)
+      : timeit_(timeit),
+        block_size_(block_size),
+        threadpool_size_(threadpool_size),
+        threadpool_(threadpool_size) {}
+
+  void memcopy(uint8_t* dst, const uint8_t* src, uint64_t nbytes) {
+    struct timeval tv1, tv2;
+    if (timeit_) {
+      // Start the timer.
+      gettimeofday(&tv1, NULL);
+    }
+    if (nbytes >= BYTES_IN_MB) {
+      memcopy_aligned(dst, src, nbytes, block_size_);
+    } else {
+      memcpy(dst, src, nbytes);
+    }
+    if (timeit_) {
+      // Stop the timer and log the measured time.
+      gettimeofday(&tv2, NULL);
+      double elapsed =
+          ((tv2.tv_sec - tv1.tv_sec) * 1000000 + (tv2.tv_usec - tv1.tv_usec)) / 1000000.0;
+      // TODO: replace this with ARROW_LOG(ARROW_INFO) or better equivalent.
+ printf("Copied %llu bytes in time = %8.4f MBps = %8.4f\n", nbytes, elapsed, + nbytes / (BYTES_IN_MB * elapsed)); + } + } + + ~ParallelMemcopy() { + // Join threadpool threads just in case they are still running. + for (auto& t : threadpool_) { + if (t.joinable()) { t.join(); } + } + } + + private: + /** Controls whether the memcopy operations are timed. */ + bool timeit_; + /** Specifies the desired alignment in bytes, as a power of 2. */ + uint64_t block_size_; + /** Number of threads to be used for parallel memcopy operations. */ + int threadpool_size_; + /** Internal threadpool to be used in the fork/join pattern. */ + std::vector threadpool_; + + void memcopy_aligned( + uint8_t* dst, const uint8_t* src, uint64_t nbytes, uint64_t block_size) { + uint64_t num_threads = threadpool_size_; + uint64_t dst_address = reinterpret_cast(dst); + uint64_t src_address = reinterpret_cast(src); + uint64_t left_address = (src_address + block_size - 1) & ~(block_size - 1); + uint64_t right_address = (src_address + nbytes) & ~(block_size - 1); + uint64_t num_blocks = (right_address - left_address) / block_size; + // Update right address + right_address = right_address - (num_blocks % num_threads) * block_size; + // Now we divide these blocks between available threads. The remainder is + // handled on the main thread. + + uint64_t chunk_size = (right_address - left_address) / num_threads; + uint64_t prefix = left_address - src_address; + uint64_t suffix = src_address + nbytes - right_address; + // Now the data layout is | prefix | k * num_threads * block_size | suffix |. + // We have chunk_size = k * block_size, therefore the data layout is + // | prefix | num_threads * chunk_size | suffix |. + // Each thread gets a "chunk" of k blocks. + + // Start all threads first and handle leftovers while threads run. 
diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt
index 86e489d5c..e6a2eb533 100644
--- a/src/plasma/CMakeLists.txt
+++ b/src/plasma/CMakeLists.txt
@@ -57,9 +57,9 @@ if(APPLE)
 endif(APPLE)

 if(APPLE)
-  target_link_libraries(plasma -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB})
+  target_link_libraries(plasma -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB} -lpthread)
 else(APPLE)
-  target_link_libraries(plasma -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB})
+  target_link_libraries(plasma -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB} -lpthread)
 endif(APPLE)

 include_directories("${FLATBUFFERS_INCLUDE_DIR}")
@@ -87,7 +87,7 @@ add_library(plasma_lib STATIC
   fling.c
   thirdparty/xxhash.c)

-target_link_libraries(plasma_lib common ${FLATBUFFERS_STATIC_LIB})
+target_link_libraries(plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread)

 add_dependencies(plasma_lib gen_plasma_fbs)
 add_dependencies(plasma protocol_fbs)
diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h
index f482d4e26..5329789a7 100644
--- a/src/plasma/plasma.h
+++ b/src/plasma/plasma.h
@@ -16,6 +16,9 @@
 #include "utarray.h"
 #include "uthash.h"

+/** Allocation granularity used in plasma for object allocation. */
+#define BLOCK_SIZE 64
+
 /**
  * Object request data structure. Used in the plasma_wait_for_objects()
  * argument.
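Both memcopy_aligned and compute_object_hash_parallel rely on BLOCK_SIZE being a power of two, which turns rounding to a block boundary into a simple mask operation. Here is a minimal sketch of the idiom; the helper names round_down and round_up are illustrative, not from the patch:

#include <cstdint>
#include <cstdio>

#define BLOCK_SIZE 64  // a power of 2, so BLOCK_SIZE - 1 is a mask of the low bits

static uint64_t round_down(uint64_t address) {
  return address & ~uint64_t(BLOCK_SIZE - 1);
}

static uint64_t round_up(uint64_t address) {
  return (address + BLOCK_SIZE - 1) & ~uint64_t(BLOCK_SIZE - 1);
}

int main() {
  // 1000 is not a multiple of 64; the surrounding boundaries are 960 and 1024.
  printf("%llu %llu\n", (unsigned long long) round_down(1000),
      (unsigned long long) round_up(1000));  // prints "960 1024"
  return 0;
}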
diff --git a/src/plasma/plasma_client.cc b/src/plasma/plasma_client.cc
index 97ac1a2ed..8fc72fecd 100644
--- a/src/plasma/plasma_client.cc
+++ b/src/plasma/plasma_client.cc
@@ -28,6 +28,10 @@
 #include "uthash.h"
 #include "utlist.h"

+/* C++ includes */
+#include <thread>
+#include <vector>
+
 extern "C" {
 #include "sha256.h"
 #include "fling.h"
@@ -38,6 +42,10 @@ extern "C" {
 #define XXH64_DEFAULT_SEED 0
 }

+#define THREADPOOL_SIZE 8
+#define BYTES_IN_MB (1 << 20)
+static std::vector<std::thread> threadpool_(THREADPOOL_SIZE);
+
 typedef struct {
   /** Key that uniquely identifies the memory mapped file. In practice, we
    * take the numerical value of the file descriptor in the object store. */
@@ -465,6 +473,66 @@ void plasma_contains(PlasmaConnection *conn, ObjectID obj_id, int *has_object) {
   }
 }

+static void compute_block_hash(const unsigned char *data,
+                               int64_t nbytes,
+                               uint64_t *hash) {
+  XXH64_state_t hash_state;
+  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
+  XXH64_update(&hash_state, data, nbytes);
+  *hash = XXH64_digest(&hash_state);
+}
+
+static inline bool compute_object_hash_parallel(XXH64_state_t *hash_state,
+                                                const unsigned char *data,
+                                                int64_t nbytes) {
+  /* Note that this function will likely be faster if the address of data is
+   * aligned on a 64-byte boundary. */
+  const uint64_t num_threads = THREADPOOL_SIZE;
+  uint64_t threadhash[num_threads + 1];
+  const uint64_t data_address = reinterpret_cast<uint64_t>(data);
+  const uint64_t num_blocks = nbytes / BLOCK_SIZE;
+  const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
+  const uint64_t right_address = data_address + chunk_size * num_threads;
+  const uint64_t suffix = (data_address + nbytes) - right_address;
+  /* Now the data layout is | k * num_threads * block_size | suffix | ==
+   * | num_threads * chunk_size | suffix |, where chunk_size = k * block_size.
+   * Each thread gets a "chunk" of k blocks, except the suffix thread. */

+  for (int i = 0; i < num_threads; i++) {
+    threadpool_[i] =
+        std::thread(compute_block_hash,
+                    reinterpret_cast<unsigned char *>(data_address) + i * chunk_size,
+                    chunk_size, &threadhash[i]);
+  }
+  compute_block_hash(reinterpret_cast<unsigned char *>(right_address), suffix,
+                     &threadhash[num_threads]);
+
+  /* Join the threads. */
+  for (auto &t : threadpool_) {
+    if (t.joinable()) {
+      t.join();
+    }
+  }
+
+  XXH64_update(hash_state, (unsigned char *) threadhash, sizeof(threadhash));
+  return true;
+}
+
+static uint64_t compute_object_hash(const ObjectBuffer &obj_buffer) {
+  XXH64_state_t hash_state;
+  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
+  if (obj_buffer.data_size >= BYTES_IN_MB) {
+    compute_object_hash_parallel(&hash_state, (unsigned char *) obj_buffer.data,
+                                 obj_buffer.data_size);
+  } else {
+    XXH64_update(&hash_state, (unsigned char *) obj_buffer.data,
+                 obj_buffer.data_size);
+  }
+  XXH64_update(&hash_state, (unsigned char *) obj_buffer.metadata,
+               obj_buffer.metadata_size);
+  return XXH64_digest(&hash_state);
+}
+
 bool plasma_compute_object_hash(PlasmaConnection *conn,
                                 ObjectID obj_id,
                                 unsigned char *digest) {
@@ -472,21 +540,15 @@
  * the operation should timeout immediately. */
   ObjectBuffer obj_buffer;
   ObjectID obj_id_array[1] = {obj_id};
+  uint64_t hash;
+
   plasma_get(conn, obj_id_array, 1, 0, &obj_buffer);
   /* If the object was not retrieved, return false. */
   if (obj_buffer.data_size == -1) {
     return false;
   }
   /* Compute the hash. */
-  XXH64_state_t hash_state;
-  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
-  XXH64_update(&hash_state, (unsigned char *) obj_buffer.data,
-               obj_buffer.data_size);
-  XXH64_update(&hash_state, (unsigned char *) obj_buffer.metadata,
-               obj_buffer.metadata_size);
-  uint64_t hash = XXH64_digest(&hash_state);
-  DCHECK(DIGEST_SIZE >= sizeof(hash));
-  memset(digest, 0, DIGEST_SIZE);
+  hash = compute_object_hash(obj_buffer);
   memcpy(digest, &hash, sizeof(hash));
   /* Release the plasma object. */
   plasma_release(conn, obj_id);
@@ -505,7 +567,7 @@ void plasma_seal(PlasmaConnection *conn, ObjectID object_id) {
          "Plasma client called seal an already sealed object");
   object_entry->is_sealed = true;
   /* Send the seal request to Plasma. */
-  unsigned char digest[DIGEST_SIZE];
+  static unsigned char digest[DIGEST_SIZE];
   CHECK(plasma_compute_object_hash(conn, object_id, &digest[0]));
   CHECK(plasma_send_SealRequest(conn->store_conn, conn->builder, object_id,
                                 &digest[0]) >= 0);
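Note the design choice in compute_object_hash_parallel: each thread digests its chunk independently, and the array of per-chunk digests (plus the suffix digest) is folded into the caller's hash_state, producing a hash of hashes. The result intentionally differs from a serial XXH64 over the same bytes; plasma only needs a deterministic fingerprint of the object, which this remains. Below is a minimal sketch of the same fork/join pattern, assuming the bundled thirdparty xxhash.h is on the include path and, for simplicity, a buffer size that divides evenly so no suffix chunk is needed; the one-shot XXH64 call stands in for the streaming state used in the patch.

#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

#include "xxhash.h"  // bundled in src/plasma/thirdparty

int main() {
  std::vector<unsigned char> data(1 << 22, 42);  // 4 MB; divides evenly by 8
  const int num_threads = 8;
  const size_t chunk_size = data.size() / num_threads;

  uint64_t threadhash[num_threads];
  std::vector<std::thread> pool;
  for (int i = 0; i < num_threads; i++) {
    pool.emplace_back([&, i] {
      // Each thread hashes its own disjoint chunk.
      threadhash[i] = XXH64(data.data() + i * chunk_size, chunk_size, 0);
    });
  }
  for (auto &t : pool) {
    t.join();
  }
  // Combine by hashing the array of per-chunk digests.
  uint64_t combined = XXH64(threadhash, sizeof(threadhash), 0);
  printf("combined hash = %016llx\n", (unsigned long long) combined);
  return 0;
}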
diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc
index 03a84d71e..7582fc0ac 100644
--- a/src/plasma/plasma_store.cc
+++ b/src/plasma/plasma_store.cc
@@ -40,6 +40,7 @@ extern "C" {
 #include "fling.h"
 #include "malloc.h"
 void *dlmalloc(size_t);
+void *dlmemalign(size_t alignment, size_t bytes);
 void dlfree(void *);
 }

@@ -230,8 +231,15 @@ int create_object(Client *client_context,
   if (!success) {
     return PlasmaError_OutOfMemory;
   }
-  /* Allocate space for the new object */
-  uint8_t *pointer = (uint8_t *) dlmalloc(data_size + metadata_size);
+  /* Allocate space for the new object. We use dlmemalign instead of dlmalloc
+   * in order to align the allocated region to a 64-byte boundary. This is not
+   * strictly necessary, but it is an optimization that could speed up the
+   * computation of a hash of the data (see compute_object_hash_parallel in
+   * plasma_client.cc). Note that even though this pointer is 64-byte aligned,
+   * it is not guaranteed that the corresponding pointer in the client will be
+   * 64-byte aligned, but in practice it often will be. */
+  uint8_t *pointer =
+      (uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size);
   int fd;
   int64_t map_size;
   ptrdiff_t offset;
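The dlmemalign change can be sanity-checked without linking dlmalloc. In the sketch below, POSIX posix_memalign stands in for dlmemalign (an assumption made purely for illustration); it provides the same guarantee the store now relies on, namely that the returned region starts on a BLOCK_SIZE boundary. As the comment in the patch notes, the client sees the object through its own mapping, so client-side alignment is likely but not guaranteed, which is why the commit removes the 64-byte alignment check rather than asserting on it.

#include <cstdint>
#include <cstdio>
#include <cstdlib>

#define BLOCK_SIZE 64

int main() {
  void *pointer = NULL;
  // posix_memalign stands in for dlmemalign(BLOCK_SIZE, size) here.
  if (posix_memalign(&pointer, BLOCK_SIZE, 1 << 20) != 0) {
    return 1;
  }
  // The allocation starts on a 64-byte boundary.
  printf("aligned: %d\n", (int) ((uintptr_t) pointer % BLOCK_SIZE == 0));  // prints "aligned: 1"
  free(pointer);
  return 0;
}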