[Core] Use global variable to eliminate force thread termination in plasma (#8912)

* use global variable to eliminate force thread termination
This commit is contained in:
Siyuan (Ryans) Zhuang 2020-06-12 14:20:53 -07:00 committed by GitHub
parent 58efec0f2b
commit ed77c8b16c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 217 additions and 190 deletions

View file

@ -381,6 +381,7 @@ cc_library(
"src/ray/object_manager/plasma/plasma_allocator.cc",
"src/ray/object_manager/plasma/quota_aware_policy.cc",
"src/ray/object_manager/plasma/store.cc",
"src/ray/object_manager/plasma/store_runner.cc",
],
hdrs = [
"src/ray/object_manager/plasma/events.h",
@ -389,6 +390,7 @@ cc_library(
"src/ray/object_manager/plasma/plasma_allocator.h",
"src/ray/object_manager/plasma/quota_aware_policy.h",
"src/ray/object_manager/plasma/store.h",
"src/ray/object_manager/plasma/store_runner.h",
"src/ray/thirdparty/dlmalloc.c",
],
copts = PLASMA_COPTS,

View file

@ -28,11 +28,12 @@ namespace ray {
ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config) {
if (config.object_store_memory > 0) {
plasma_store_.reset(new plasma::PlasmaStoreRunner(
plasma::plasma_store_runner.reset(new plasma::PlasmaStoreRunner(
config.store_socket_name, config.object_store_memory, config.huge_pages,
config.plasma_directory, ""));
// Initialize object store.
store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start, plasma_store_.get());
store_thread_ =
std::thread(&plasma::PlasmaStoreRunner::Start, plasma::plasma_store_runner.get());
// Sleep for sometime until the store is working. This can suppress some
// connection warnings.
std::this_thread::sleep_for(std::chrono::microseconds(500));
@ -40,10 +41,10 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config) {
}
ObjectStoreRunner::~ObjectStoreRunner() {
if (plasma_store_ != nullptr) {
plasma_store_->Stop();
if (plasma::plasma_store_runner != nullptr) {
plasma::plasma_store_runner->Stop();
store_thread_.join();
plasma_store_.reset();
plasma::plasma_store_runner.reset();
}
}

View file

@ -37,7 +37,7 @@
#include "ray/object_manager/object_directory.h"
#include "ray/object_manager/object_store_notification_manager.h"
#include "ray/object_manager/plasma/client.h"
#include "ray/object_manager/plasma/store.h"
#include "ray/object_manager/plasma/store_runner.h"
#include "ray/rpc/object_manager/object_manager_client.h"
#include "ray/rpc/object_manager/object_manager_server.h"
@ -85,7 +85,6 @@ class ObjectStoreRunner {
~ObjectStoreRunner();
private:
std::unique_ptr<plasma::PlasmaStoreRunner> plasma_store_;
std::thread store_thread_;
};

View file

@ -28,16 +28,9 @@
#include "ray/object_manager/plasma/store.h"
#include <assert.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/statvfs.h>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <deque>
@ -66,15 +59,10 @@ using arrow::cuda::CudaContext;
using arrow::cuda::CudaDeviceManager;
#endif
using arrow::util::ArrowLog;
using arrow::util::ArrowLogLevel;
namespace fb = plasma::flatbuf;
namespace plasma {
void SetMallocGranularity(int value);
struct GetRequest {
GetRequest(Client* client, const std::vector<ObjectID>& object_ids);
/// The client that called get.
@ -1134,152 +1122,4 @@ Status PlasmaStore::ProcessMessage(Client* client) {
return Status::OK();
}
void HandleSignal(int signal) {
if (signal == SIGTERM) {
ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
#ifdef _WIN32
ExitThread(0);
#else
pthread_exit(0);
#endif
}
}
PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
bool hugepages_enabled, std::string plasma_directory,
const std::string external_store_endpoint):
hugepages_enabled_(hugepages_enabled), external_store_endpoint_(external_store_endpoint) {
ArrowLog::StartArrowLog("plasma_store", ArrowLogLevel::ARROW_INFO);
ArrowLog::InstallFailureSignalHandler();
// Sanity check.
if (socket_name.empty()) {
ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
}
socket_name_ = socket_name;
if (system_memory == -1) {
ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
}
// Set system memory capacity
plasma::PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000
<< "GB of memory.";
if (hugepages_enabled && plasma_directory.empty()) {
ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages "
"filesystem with -d";
}
if (plasma_directory.empty()) {
#ifdef __linux__
plasma_directory = "/dev/shm";
#else
plasma_directory = "/tmp";
#endif
}
ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
<< " and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");
#ifdef __linux__
if (!hugepages_enabled) {
// On Linux, check that the amount of memory available in /dev/shm is large
// enough to accommodate the request. If it isn't, then fail.
int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
struct statvfs shm_vfs_stats;
fstatvfs(shm_fd, &shm_vfs_stats);
// The value shm_vfs_stats.f_bsize is the block size, and the value
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
// Keep some safety margin for allocator fragmentation.
shm_mem_avail = 9 * shm_mem_avail / 10;
if (system_memory > shm_mem_avail) {
ARROW_LOG(WARNING)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass an argument with the flag '--shm-size' to 'docker run'.";
system_memory = shm_mem_avail;
}
} else {
plasma::SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
}
#endif
system_memory_ = system_memory;
plasma_directory_ = plasma_directory;
}
void PlasmaStoreRunner::Start() {
#ifdef _WINSOCKAPI_
WSADATA wsadata;
WSAStartup(MAKEWORD(2, 2), &wsadata);
#endif
// Get external store
std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
if (!external_store_endpoint_.empty()) {
std::string name;
ARROW_CHECK_OK(
plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name));
external_store = plasma::ExternalStores::GetStore(name);
if (external_store == nullptr) {
ARROW_LOG(FATAL) << "No such external store \"" << name << "\"";
}
ARROW_LOG(DEBUG) << "connecting to external store...";
ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_));
}
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name_;
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
#ifndef _WIN32 // TODO(mehrdadn): Is there an equivalent of this we need for Windows?
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
#endif
signal(SIGTERM, HandleSignal);
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(new PlasmaStore(loop_.get(), plasma_directory_, hugepages_enabled_,
socket_name_, external_store));
plasma_config = store_->GetPlasmaStoreInfo();
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = plasma::PlasmaAllocator::Memalign(
kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
ARROW_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
plasma::PlasmaAllocator::Free(
pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
int socket = ConnectOrListenIpcSock(socket_name_, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);
loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
this->store_->ConnectClient(socket);
});
loop_->Start();
Shutdown();
#ifdef _WINSOCKAPI_
WSACleanup();
#endif
ArrowLog::UninstallSignalAction();
ArrowLog::ShutDownArrowLog();
}
void PlasmaStoreRunner::Stop() { loop_->Stop(); }
void PlasmaStoreRunner::Shutdown() {
loop_->Shutdown();
loop_ = nullptr;
store_ = nullptr;
}
} // namespace plasma

View file

@ -245,25 +245,6 @@ class PlasmaStore {
#endif
};
class PlasmaStoreRunner {
public:
PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
bool hugepages_enabled, std::string plasma_directory,
const std::string external_store_endpoint);
void Start();
void Stop();
void Shutdown();
private:
std::string socket_name_;
int64_t system_memory_;
bool hugepages_enabled_;
std::string plasma_directory_;
std::string external_store_endpoint_;
std::unique_ptr<EventLoop> loop_;
std::unique_ptr<PlasmaStore> store_;
};
} // namespace plasma
#endif // PLASMA_STORE_H

View file

@ -0,0 +1,164 @@
#include "ray/object_manager/plasma/store_runner.h"
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <sys/statvfs.h>
#include <sys/types.h>
#include <unistd.h>
#include "ray/object_manager/plasma/io.h"
#include "ray/object_manager/plasma/plasma_allocator.h"
namespace plasma {
using arrow::util::ArrowLog;
using arrow::util::ArrowLogLevel;
void SetMallocGranularity(int value);
void HandleSignal(int signal) {
if (signal == SIGTERM) {
ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
plasma_store_runner->Stop();
}
}
PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
bool hugepages_enabled, std::string plasma_directory,
const std::string external_store_endpoint):
hugepages_enabled_(hugepages_enabled), external_store_endpoint_(external_store_endpoint) {
ArrowLog::StartArrowLog("plasma_store", ArrowLogLevel::ARROW_INFO);
ArrowLog::InstallFailureSignalHandler();
// Sanity check.
if (socket_name.empty()) {
ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
}
socket_name_ = socket_name;
if (system_memory == -1) {
ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
}
// Set system memory capacity
PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000
<< "GB of memory.";
if (hugepages_enabled && plasma_directory.empty()) {
ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages "
"filesystem with -d";
}
if (plasma_directory.empty()) {
#ifdef __linux__
plasma_directory = "/dev/shm";
#else
plasma_directory = "/tmp";
#endif
}
ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
<< " and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");
#ifdef __linux__
if (!hugepages_enabled) {
// On Linux, check that the amount of memory available in /dev/shm is large
// enough to accommodate the request. If it isn't, then fail.
int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
struct statvfs shm_vfs_stats;
fstatvfs(shm_fd, &shm_vfs_stats);
// The value shm_vfs_stats.f_bsize is the block size, and the value
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
// Keep some safety margin for allocator fragmentation.
shm_mem_avail = 9 * shm_mem_avail / 10;
if (system_memory > shm_mem_avail) {
ARROW_LOG(WARNING)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass an argument with the flag '--shm-size' to 'docker run'.";
system_memory = shm_mem_avail;
}
} else {
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
}
#endif
system_memory_ = system_memory;
plasma_directory_ = plasma_directory;
}
void PlasmaStoreRunner::Start() {
#ifdef _WINSOCKAPI_
WSADATA wsadata;
WSAStartup(MAKEWORD(2, 2), &wsadata);
#endif
// Get external store
std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
if (!external_store_endpoint_.empty()) {
std::string name;
ARROW_CHECK_OK(
plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name));
external_store = plasma::ExternalStores::GetStore(name);
if (external_store == nullptr) {
ARROW_LOG(FATAL) << "No such external store \"" << name << "\"";
}
ARROW_LOG(DEBUG) << "connecting to external store...";
ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_));
}
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name_;
#ifndef _WIN32 // TODO(mehrdadn): Is there an equivalent of this we need for Windows?
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
#endif
signal(SIGTERM, HandleSignal);
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(new PlasmaStore(loop_.get(), plasma_directory_, hugepages_enabled_,
socket_name_, external_store));
plasma_config = store_->GetPlasmaStoreInfo();
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = PlasmaAllocator::Memalign(
kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
ARROW_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
PlasmaAllocator::Free(
pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
int socket = ConnectOrListenIpcSock(socket_name_, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);
loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
this->store_->ConnectClient(socket);
});
loop_->Start();
Shutdown();
#ifdef _WINSOCKAPI_
WSACleanup();
#endif
ArrowLog::UninstallSignalAction();
ArrowLog::ShutDownArrowLog();
}
void PlasmaStoreRunner::Stop() { loop_->Stop(); }
void PlasmaStoreRunner::Shutdown() {
loop_->Shutdown();
loop_ = nullptr;
store_ = nullptr;
}
std::unique_ptr<PlasmaStoreRunner> plasma_store_runner;
} // namespace plasma

View file

@ -0,0 +1,38 @@
#ifndef PLASMA_STORE_RUNNER_H
#define PLASMA_STORE_RUNNER_H
#include <memory>
#include "ray/object_manager/plasma/store.h"
namespace plasma {
class PlasmaStoreRunner {
public:
PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
bool hugepages_enabled, std::string plasma_directory,
const std::string external_store_endpoint);
void Start();
void Stop();
void Shutdown();
private:
std::string socket_name_;
int64_t system_memory_;
bool hugepages_enabled_;
std::string plasma_directory_;
std::string external_store_endpoint_;
std::unique_ptr<EventLoop> loop_;
std::unique_ptr<PlasmaStore> store_;
};
// We use a global variable for Plasma Store instance here because:
// 1) There is only one plasma store thread in Raylet or the Plasma Store process.
// 2) The thirdparty dlmalloc library cannot be contained in a local variable,
// so even we use a local variable for plasma store, it does not provide
// better isolation.
extern std::unique_ptr<PlasmaStoreRunner> plasma_store_runner;
} // namespace plasma
#endif // PLASMA_STORE_RUNNER_H

View file

@ -4,7 +4,7 @@
#include <chrono>
#include <thread>
#include "ray/object_manager/plasma/store.h"
#include "ray/object_manager/plasma/store_runner.h"
// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
// and get rid of the next three lines:
#ifndef __STDC_FORMAT_MACROS
@ -50,9 +50,11 @@ int main(int argc, char *argv[]) {
}
if (!keep_idle) {
plasma::PlasmaStoreRunner runner(socket_name, system_memory, hugepages_enabled,
plasma_directory, external_store_endpoint);
runner.Start();
plasma::plasma_store_runner.reset(
new plasma::PlasmaStoreRunner(socket_name, system_memory, hugepages_enabled,
plasma_directory, external_store_endpoint));
plasma::plasma_store_runner->Start();
plasma::plasma_store_runner.reset();
} else {
printf(
"The Plasma Store is started with the '-z' flag, "