Use our own implementation of parallel_memcopy (#7254)

Edward Oakes, 2020-02-21 11:03:50 -08:00 (committed by GitHub)
parent cbc808bc6b
commit d190e73727
7 changed files with 69 additions and 27 deletions


@@ -1075,6 +1075,7 @@ pyx_library(
     ),
     deps = [
         "//:core_worker_lib",
+        "//:ray_util",
         "//:raylet_lib",
         "//:serialization_cc_proto",
         "//:src/ray/ray_exported_symbols.lds",


@@ -13,12 +13,6 @@ if "pickle5" in sys.modules:
                       "requires a specific version of pickle5 (which is "
                       "packaged along with Ray).")

-if "OMP_NUM_THREADS" not in os.environ:
-    logger.debug("[ray] Forcing OMP_NUM_THREADS=1 to avoid performance "
-                 "degradation with many workers (issue #6998). You can "
-                 "override this by explicitly setting OMP_NUM_THREADS.")
-    os.environ["OMP_NUM_THREADS"] = "1"
-
 # Add the directory containing pickle5 to the Python path so that we find the
 # pickle5 version packaged with ray and not a pre-existing pickle5.
 pickle5_path = os.path.join(


@@ -112,7 +112,7 @@ include "includes/libcoreworker.pxi"
 logger = logging.getLogger(__name__)

-MEMCOPY_THREADS = 12
+MEMCOPY_THREADS = 6


 def set_internal_config(dict options):


@@ -10,7 +10,7 @@ DEF kMajorBufferSize = 2048
 DEF kMemcopyDefaultBlocksize = 64
 DEF kMemcopyDefaultThreshold = 1024 * 1024

-cdef extern from "arrow/util/memory.h" namespace "arrow::internal" nogil:
+cdef extern from "ray/util/memory.h" namespace "ray" nogil:
     void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
                           uintptr_t block_size, int num_threads)
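The two DEF constants above hint at the policy around this declaration: copies below roughly 1 MiB are not worth spinning up threads for, and the 64-byte block size keeps each thread's chunk aligned to whole cache lines. The C++ sketch below illustrates that policy only; it is not part of this diff, and the wrapper name, the duplicated constants, and the thread count of 6 (mirroring MEMCOPY_THREADS) are assumptions for illustration.

// Hypothetical caller-side sketch (not from the commit): fall back to a plain
// memcpy below the threshold and use the multi-threaded copy above it.
#include <cstring>
#include "ray/util/memory.h"

constexpr int64_t kBlocksize = 64;           // mirrors kMemcopyDefaultBlocksize
constexpr int64_t kThreshold = 1024 * 1024;  // mirrors kMemcopyDefaultThreshold
constexpr int kThreads = 6;                  // mirrors MEMCOPY_THREADS (assumed)

void copy_buffer(uint8_t *dst, const uint8_t *src, int64_t nbytes) {
  if (nbytes < kThreshold) {
    // Thread startup and join overhead dominates for small buffers.
    std::memcpy(dst, src, nbytes);
  } else {
    ray::parallel_memcopy(dst, src, nbytes, kBlocksize, kThreads);
  }
}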


@@ -66,12 +66,6 @@ ERROR_KEY_PREFIX = b"Error:"
 # entry/init points.
 logger = logging.getLogger(__name__)

-# Whether we should warn about slow put performance.
-if os.environ.get("OMP_NUM_THREADS") == "1":
-    should_warn_of_slow_puts = True
-else:
-    should_warn_of_slow_puts = False
-

 class ActorCheckpointInfo:
     """Information used to maintain actor checkpoints."""
@@ -275,22 +269,10 @@ class Worker:
                 "do this, you can wrap the ray.ObjectID in a list and "
                 "call 'put' on it (or return it).")

-        global should_warn_of_slow_puts
-        if should_warn_of_slow_puts:
-            start = time.perf_counter()
-
         serialized_value = self.get_serialization_context().serialize(value)
-        result = self.core_worker.put_serialized_object(
+        return self.core_worker.put_serialized_object(
             serialized_value, object_id=object_id, pin_object=pin_object)
-        if should_warn_of_slow_puts:
-            delta = time.perf_counter() - start
-            if delta > 0.1:
-                logger.warning("OMP_NUM_THREADS=1 is set, this may slow down "
-                               "ray.put() for large objects (issue #6998).")
-                should_warn_of_slow_puts = False
-        return result

     def deserialize_objects(self, data_metadata_pairs, object_ids):
         context = self.get_serialization_context()
         return context.deserialize_objects(data_metadata_pairs, object_ids)

src/ray/util/memory.cc (new file, 50 lines)

@@ -0,0 +1,50 @@
#include "ray/util/memory.h"

#include <cstring>
#include <thread>
#include <vector>

namespace ray {

uint8_t *pointer_logical_and(const uint8_t *address, uintptr_t bits) {
  uintptr_t value = reinterpret_cast<uintptr_t>(address);
  return reinterpret_cast<uint8_t *>(value & bits);
}

void parallel_memcopy(uint8_t *dst, const uint8_t *src, int64_t nbytes,
                      uintptr_t block_size, int num_threads) {
  std::vector<std::thread> threadpool(num_threads);
  uint8_t *left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
  uint8_t *right = pointer_logical_and(src + nbytes, ~(block_size - 1));
  int64_t num_blocks = (right - left) / block_size;

  // Update right address
  right = right - (num_blocks % num_threads) * block_size;

  // Now we divide these blocks between available threads. The remainder is
  // handled on the main thread.
  int64_t chunk_size = (right - left) / num_threads;
  int64_t prefix = left - src;
  int64_t suffix = src + nbytes - right;

  // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
  // We have chunk_size = k * block_size, therefore the data layout is
  // | prefix | num_threads * chunk_size | suffix |.
  // Each thread gets a "chunk" of k blocks.

  // Start all threads first and handle leftovers while threads run.
  for (int i = 0; i < num_threads; i++) {
    threadpool[i] = std::thread(std::memcpy, dst + prefix + i * chunk_size,
                                left + i * chunk_size, chunk_size);
  }

  std::memcpy(dst, src, prefix);
  std::memcpy(dst + prefix + num_threads * chunk_size, right, suffix);

  for (auto &t : threadpool) {
    if (t.joinable()) {
      t.join();
    }
  }
}

}  // namespace ray
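For reference, a minimal usage sketch of the function above (illustrative only, not part of the commit): with a 4 MiB source buffer, block_size = 64, and num_threads = 6, the 64-byte-aligned middle of the buffer is split into six equal chunks of whole blocks copied on worker threads, while the unaligned prefix, the leftover blocks, and the suffix are copied on the calling thread. The buffer size and thread count here are assumed for illustration.

// Illustrative usage of ray::parallel_memcopy (assumes linking against the
// library target that provides src/ray/util/memory.cc).
#include <cassert>
#include <cstring>
#include <vector>

#include "ray/util/memory.h"

int main() {
  const int64_t nbytes = 4 * 1024 * 1024;  // 4 MiB, well above a 1 MiB threshold
  std::vector<uint8_t> src(nbytes), dst(nbytes);
  for (int64_t i = 0; i < nbytes; i++) {
    src[i] = static_cast<uint8_t>(i);
  }
  // 64-byte blocks; 6 threads is illustrative, matching the Cython-side constant.
  ray::parallel_memcopy(dst.data(), src.data(), nbytes,
                        /*block_size=*/64, /*num_threads=*/6);
  assert(std::memcmp(dst.data(), src.data(), nbytes) == 0);
  return 0;
}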

src/ray/util/memory.h (new file, 15 lines)

@@ -0,0 +1,15 @@
#ifndef RAY_UTIL_MEMORY_H
#define RAY_UTIL_MEMORY_H

#include <stdint.h>

namespace ray {

// A helper function for doing memcpy with multiple threads. This is required
// to saturate the memory bandwidth of modern cpus.
void parallel_memcopy(uint8_t *dst, const uint8_t *src, int64_t nbytes,
                      uintptr_t block_size, int num_threads);

}  // namespace ray

#endif  // RAY_UTIL_MEMORY_H