From ed77c8b16c1bb2a7e5939d26f9d1f447f5ef17c9 Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Fri, 12 Jun 2020 14:20:53 -0700 Subject: [PATCH] [Core] Use global variable to eliminate force thread termination in plasma (#8912) * use global variable to eliminate force thread termination --- BUILD.bazel | 2 + src/ray/object_manager/object_manager.cc | 11 +- src/ray/object_manager/object_manager.h | 3 +- src/ray/object_manager/plasma/store.cc | 160 ----------------- src/ray/object_manager/plasma/store.h | 19 -- src/ray/object_manager/plasma/store_runner.cc | 164 ++++++++++++++++++ src/ray/object_manager/plasma/store_runner.h | 38 ++++ src/ray/plasma/store_exec.cc | 10 +- 8 files changed, 217 insertions(+), 190 deletions(-) create mode 100644 src/ray/object_manager/plasma/store_runner.cc create mode 100644 src/ray/object_manager/plasma/store_runner.h diff --git a/BUILD.bazel b/BUILD.bazel index b23e8816f..eea740288 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -381,6 +381,7 @@ cc_library( "src/ray/object_manager/plasma/plasma_allocator.cc", "src/ray/object_manager/plasma/quota_aware_policy.cc", "src/ray/object_manager/plasma/store.cc", + "src/ray/object_manager/plasma/store_runner.cc", ], hdrs = [ "src/ray/object_manager/plasma/events.h", @@ -389,6 +390,7 @@ cc_library( "src/ray/object_manager/plasma/plasma_allocator.h", "src/ray/object_manager/plasma/quota_aware_policy.h", "src/ray/object_manager/plasma/store.h", + "src/ray/object_manager/plasma/store_runner.h", "src/ray/thirdparty/dlmalloc.c", ], copts = PLASMA_COPTS, diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 60d0dd3bd..927c24893 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -28,11 +28,12 @@ namespace ray { ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config) { if (config.object_store_memory > 0) { - plasma_store_.reset(new plasma::PlasmaStoreRunner( + plasma::plasma_store_runner.reset(new plasma::PlasmaStoreRunner( config.store_socket_name, config.object_store_memory, config.huge_pages, config.plasma_directory, "")); // Initialize object store. - store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start, plasma_store_.get()); + store_thread_ = + std::thread(&plasma::PlasmaStoreRunner::Start, plasma::plasma_store_runner.get()); // Sleep for sometime until the store is working. This can suppress some // connection warnings. std::this_thread::sleep_for(std::chrono::microseconds(500)); @@ -40,10 +41,10 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config) { } ObjectStoreRunner::~ObjectStoreRunner() { - if (plasma_store_ != nullptr) { - plasma_store_->Stop(); + if (plasma::plasma_store_runner != nullptr) { + plasma::plasma_store_runner->Stop(); store_thread_.join(); - plasma_store_.reset(); + plasma::plasma_store_runner.reset(); } } diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 5adb3b856..f4bc3f9c1 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -37,7 +37,7 @@ #include "ray/object_manager/object_directory.h" #include "ray/object_manager/object_store_notification_manager.h" #include "ray/object_manager/plasma/client.h" -#include "ray/object_manager/plasma/store.h" +#include "ray/object_manager/plasma/store_runner.h" #include "ray/rpc/object_manager/object_manager_client.h" #include "ray/rpc/object_manager/object_manager_server.h" @@ -85,7 +85,6 @@ class ObjectStoreRunner { ~ObjectStoreRunner(); private: - std::unique_ptr plasma_store_; std::thread store_thread_; }; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 72d4f6d52..c06655ee2 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -28,16 +28,9 @@ #include "ray/object_manager/plasma/store.h" -#include -#include #include -#include -#include #include #include -#include -#include -#include #include #include @@ -66,15 +59,10 @@ using arrow::cuda::CudaContext; using arrow::cuda::CudaDeviceManager; #endif -using arrow::util::ArrowLog; -using arrow::util::ArrowLogLevel; - namespace fb = plasma::flatbuf; namespace plasma { -void SetMallocGranularity(int value); - struct GetRequest { GetRequest(Client* client, const std::vector& object_ids); /// The client that called get. @@ -1134,152 +1122,4 @@ Status PlasmaStore::ProcessMessage(Client* client) { return Status::OK(); } -void HandleSignal(int signal) { - if (signal == SIGTERM) { - ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server..."; -#ifdef _WIN32 - ExitThread(0); -#else - pthread_exit(0); -#endif - } -} - -PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory, - bool hugepages_enabled, std::string plasma_directory, - const std::string external_store_endpoint): - hugepages_enabled_(hugepages_enabled), external_store_endpoint_(external_store_endpoint) { - ArrowLog::StartArrowLog("plasma_store", ArrowLogLevel::ARROW_INFO); - ArrowLog::InstallFailureSignalHandler(); - - // Sanity check. - if (socket_name.empty()) { - ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; - } - socket_name_ = socket_name; - if (system_memory == -1) { - ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; - } - // Set system memory capacity - plasma::PlasmaAllocator::SetFootprintLimit(static_cast(system_memory)); - ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " - << static_cast(system_memory) / 1000000000 - << "GB of memory."; - if (hugepages_enabled && plasma_directory.empty()) { - ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " - "filesystem with -d"; - } - if (plasma_directory.empty()) { -#ifdef __linux__ - plasma_directory = "/dev/shm"; -#else - plasma_directory = "/tmp"; -#endif - } - ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory - << " and huge page support " - << (hugepages_enabled ? "enabled" : "disabled"); -#ifdef __linux__ - if (!hugepages_enabled) { - // On Linux, check that the amount of memory available in /dev/shm is large - // enough to accommodate the request. If it isn't, then fail. - int shm_fd = open(plasma_directory.c_str(), O_RDONLY); - struct statvfs shm_vfs_stats; - fstatvfs(shm_fd, &shm_vfs_stats); - // The value shm_vfs_stats.f_bsize is the block size, and the value - // shm_vfs_stats.f_bavail is the number of available blocks. - int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; - close(shm_fd); - // Keep some safety margin for allocator fragmentation. - shm_mem_avail = 9 * shm_mem_avail / 10; - if (system_memory > shm_mem_avail) { - ARROW_LOG(WARNING) - << "System memory request exceeds memory available in " << plasma_directory - << ". The request is for " << system_memory - << " bytes, and the amount available is " << shm_mem_avail - << " bytes. You may be able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, you may need to " - "pass an argument with the flag '--shm-size' to 'docker run'."; - system_memory = shm_mem_avail; - } - } else { - plasma::SetMallocGranularity(1024 * 1024 * 1024); // 1 GB - } -#endif - system_memory_ = system_memory; - plasma_directory_ = plasma_directory; -} - -void PlasmaStoreRunner::Start() { -#ifdef _WINSOCKAPI_ - WSADATA wsadata; - WSAStartup(MAKEWORD(2, 2), &wsadata); -#endif - // Get external store - std::shared_ptr external_store{nullptr}; - if (!external_store_endpoint_.empty()) { - std::string name; - ARROW_CHECK_OK( - plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name)); - external_store = plasma::ExternalStores::GetStore(name); - if (external_store == nullptr) { - ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; - } - ARROW_LOG(DEBUG) << "connecting to external store..."; - ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_)); - } - ARROW_LOG(DEBUG) << "starting server listening on " << socket_name_; - - // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write - // to a client that has already died, the store could die. -#ifndef _WIN32 // TODO(mehrdadn): Is there an equivalent of this we need for Windows? - // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write - // to a client that has already died, the store could die. - signal(SIGPIPE, SIG_IGN); -#endif - signal(SIGTERM, HandleSignal); - - // Create the event loop. - loop_.reset(new EventLoop); - store_.reset(new PlasmaStore(loop_.get(), plasma_directory_, hugepages_enabled_, - socket_name_, external_store)); - plasma_config = store_->GetPlasmaStoreInfo(); - - // We are using a single memory-mapped file by mallocing and freeing a single - // large amount of space up front. According to the documentation, - // dlmalloc might need up to 128*sizeof(size_t) bytes for internal - // bookkeeping. - void* pointer = plasma::PlasmaAllocator::Memalign( - kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); - ARROW_CHECK(pointer != nullptr); - // This will unmap the file, but the next one created will be as large - // as this one (this is an implementation detail of dlmalloc). - plasma::PlasmaAllocator::Free( - pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); - - int socket = ConnectOrListenIpcSock(socket_name_, true); - // TODO(pcm): Check return value. - ARROW_CHECK(socket >= 0); - - loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { - this->store_->ConnectClient(socket); - }); - loop_->Start(); - - Shutdown(); -#ifdef _WINSOCKAPI_ - WSACleanup(); -#endif - ArrowLog::UninstallSignalAction(); - ArrowLog::ShutDownArrowLog(); -} - -void PlasmaStoreRunner::Stop() { loop_->Stop(); } - -void PlasmaStoreRunner::Shutdown() { - loop_->Shutdown(); - loop_ = nullptr; - store_ = nullptr; -} - } // namespace plasma diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 14aeafb52..8eab7677c 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -245,25 +245,6 @@ class PlasmaStore { #endif }; -class PlasmaStoreRunner { - public: - PlasmaStoreRunner(std::string socket_name, int64_t system_memory, - bool hugepages_enabled, std::string plasma_directory, - const std::string external_store_endpoint); - void Start(); - void Stop(); - void Shutdown(); - - private: - std::string socket_name_; - int64_t system_memory_; - bool hugepages_enabled_; - std::string plasma_directory_; - std::string external_store_endpoint_; - std::unique_ptr loop_; - std::unique_ptr store_; -}; - } // namespace plasma #endif // PLASMA_STORE_H diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc new file mode 100644 index 000000000..492bffb94 --- /dev/null +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -0,0 +1,164 @@ +#include "ray/object_manager/plasma/store_runner.h" + +#include +#include +#include +#include +#include +#include + +#include "ray/object_manager/plasma/io.h" +#include "ray/object_manager/plasma/plasma_allocator.h" + +namespace plasma { + +using arrow::util::ArrowLog; +using arrow::util::ArrowLogLevel; + +void SetMallocGranularity(int value); + +void HandleSignal(int signal) { + if (signal == SIGTERM) { + ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server..."; + plasma_store_runner->Stop(); + } +} + +PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory, + bool hugepages_enabled, std::string plasma_directory, + const std::string external_store_endpoint): + hugepages_enabled_(hugepages_enabled), external_store_endpoint_(external_store_endpoint) { + ArrowLog::StartArrowLog("plasma_store", ArrowLogLevel::ARROW_INFO); + ArrowLog::InstallFailureSignalHandler(); + + // Sanity check. + if (socket_name.empty()) { + ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; + } + socket_name_ = socket_name; + if (system_memory == -1) { + ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; + } + // Set system memory capacity + PlasmaAllocator::SetFootprintLimit(static_cast(system_memory)); + ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " + << static_cast(system_memory) / 1000000000 + << "GB of memory."; + if (hugepages_enabled && plasma_directory.empty()) { + ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " + "filesystem with -d"; + } + if (plasma_directory.empty()) { +#ifdef __linux__ + plasma_directory = "/dev/shm"; +#else + plasma_directory = "/tmp"; +#endif + } + ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory + << " and huge page support " + << (hugepages_enabled ? "enabled" : "disabled"); +#ifdef __linux__ + if (!hugepages_enabled) { + // On Linux, check that the amount of memory available in /dev/shm is large + // enough to accommodate the request. If it isn't, then fail. + int shm_fd = open(plasma_directory.c_str(), O_RDONLY); + struct statvfs shm_vfs_stats; + fstatvfs(shm_fd, &shm_vfs_stats); + // The value shm_vfs_stats.f_bsize is the block size, and the value + // shm_vfs_stats.f_bavail is the number of available blocks. + int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; + close(shm_fd); + // Keep some safety margin for allocator fragmentation. + shm_mem_avail = 9 * shm_mem_avail / 10; + if (system_memory > shm_mem_avail) { + ARROW_LOG(WARNING) + << "System memory request exceeds memory available in " << plasma_directory + << ". The request is for " << system_memory + << " bytes, and the amount available is " << shm_mem_avail + << " bytes. You may be able to free up space by deleting files in " + "/dev/shm. If you are inside a Docker container, you may need to " + "pass an argument with the flag '--shm-size' to 'docker run'."; + system_memory = shm_mem_avail; + } + } else { + SetMallocGranularity(1024 * 1024 * 1024); // 1 GB + } +#endif + system_memory_ = system_memory; + plasma_directory_ = plasma_directory; +} + +void PlasmaStoreRunner::Start() { +#ifdef _WINSOCKAPI_ + WSADATA wsadata; + WSAStartup(MAKEWORD(2, 2), &wsadata); +#endif + // Get external store + std::shared_ptr external_store{nullptr}; + if (!external_store_endpoint_.empty()) { + std::string name; + ARROW_CHECK_OK( + plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name)); + external_store = plasma::ExternalStores::GetStore(name); + if (external_store == nullptr) { + ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; + } + ARROW_LOG(DEBUG) << "connecting to external store..."; + ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_)); + } + ARROW_LOG(DEBUG) << "starting server listening on " << socket_name_; + +#ifndef _WIN32 // TODO(mehrdadn): Is there an equivalent of this we need for Windows? + // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write + // to a client that has already died, the store could die. + signal(SIGPIPE, SIG_IGN); +#endif + signal(SIGTERM, HandleSignal); + + // Create the event loop. + loop_.reset(new EventLoop); + store_.reset(new PlasmaStore(loop_.get(), plasma_directory_, hugepages_enabled_, + socket_name_, external_store)); + plasma_config = store_->GetPlasmaStoreInfo(); + + // We are using a single memory-mapped file by mallocing and freeing a single + // large amount of space up front. According to the documentation, + // dlmalloc might need up to 128*sizeof(size_t) bytes for internal + // bookkeeping. + void* pointer = PlasmaAllocator::Memalign( + kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); + ARROW_CHECK(pointer != nullptr); + // This will unmap the file, but the next one created will be as large + // as this one (this is an implementation detail of dlmalloc). + PlasmaAllocator::Free( + pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); + + int socket = ConnectOrListenIpcSock(socket_name_, true); + // TODO(pcm): Check return value. + ARROW_CHECK(socket >= 0); + + loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { + this->store_->ConnectClient(socket); + }); + loop_->Start(); + + Shutdown(); +#ifdef _WINSOCKAPI_ + WSACleanup(); +#endif + ArrowLog::UninstallSignalAction(); + ArrowLog::ShutDownArrowLog(); +} + +void PlasmaStoreRunner::Stop() { loop_->Stop(); } + +void PlasmaStoreRunner::Shutdown() { + loop_->Shutdown(); + loop_ = nullptr; + store_ = nullptr; +} + +std::unique_ptr plasma_store_runner; + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h new file mode 100644 index 000000000..89d52be63 --- /dev/null +++ b/src/ray/object_manager/plasma/store_runner.h @@ -0,0 +1,38 @@ +#ifndef PLASMA_STORE_RUNNER_H +#define PLASMA_STORE_RUNNER_H + +#include + +#include "ray/object_manager/plasma/store.h" + +namespace plasma { + +class PlasmaStoreRunner { + public: + PlasmaStoreRunner(std::string socket_name, int64_t system_memory, + bool hugepages_enabled, std::string plasma_directory, + const std::string external_store_endpoint); + void Start(); + void Stop(); + void Shutdown(); + + private: + std::string socket_name_; + int64_t system_memory_; + bool hugepages_enabled_; + std::string plasma_directory_; + std::string external_store_endpoint_; + std::unique_ptr loop_; + std::unique_ptr store_; +}; + +// We use a global variable for Plasma Store instance here because: +// 1) There is only one plasma store thread in Raylet or the Plasma Store process. +// 2) The thirdparty dlmalloc library cannot be contained in a local variable, +// so even we use a local variable for plasma store, it does not provide +// better isolation. +extern std::unique_ptr plasma_store_runner; + +} // namespace plasma + +#endif // PLASMA_STORE_RUNNER_H diff --git a/src/ray/plasma/store_exec.cc b/src/ray/plasma/store_exec.cc index 1d62e8b65..6c687dd0d 100644 --- a/src/ray/plasma/store_exec.cc +++ b/src/ray/plasma/store_exec.cc @@ -4,7 +4,7 @@ #include #include -#include "ray/object_manager/plasma/store.h" +#include "ray/object_manager/plasma/store_runner.h" // TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++ // and get rid of the next three lines: #ifndef __STDC_FORMAT_MACROS @@ -50,9 +50,11 @@ int main(int argc, char *argv[]) { } if (!keep_idle) { - plasma::PlasmaStoreRunner runner(socket_name, system_memory, hugepages_enabled, - plasma_directory, external_store_endpoint); - runner.Start(); + plasma::plasma_store_runner.reset( + new plasma::PlasmaStoreRunner(socket_name, system_memory, hugepages_enabled, + plasma_directory, external_store_endpoint)); + plasma::plasma_store_runner->Start(); + plasma::plasma_store_runner.reset(); } else { printf( "The Plasma Store is started with the '-z' flag, "