mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
Added error handling and cleanup for objstore overflowing (#275)
This commit is contained in:
parent
ad9cc57485
commit
6c96a05ab4
2 changed files with 22 additions and 2 deletions
22
src/ipc.cc
22
src/ipc.cc
|
@ -1,7 +1,10 @@
|
||||||
#include "ipc.h"
|
#include "ipc.h"
|
||||||
|
|
||||||
#include <stdlib.h>
|
#if defined(__unix__) || defined(__linux__)
|
||||||
|
#include <sys/statvfs.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
#include "ray/ray.h"
|
#include "ray/ray.h"
|
||||||
|
|
||||||
using namespace arrow;
|
using namespace arrow;
|
||||||
|
@ -149,6 +152,7 @@ ObjHandle MemorySegmentPool::allocate(size_t size) {
|
||||||
// TODO(pcm): at the moment, this always creates a new segment, this will be changed
|
// TODO(pcm): at the moment, this always creates a new segment, this will be changed
|
||||||
SegmentId segmentid = segments_.size();
|
SegmentId segmentid = segments_.size();
|
||||||
open_segment(segmentid, size);
|
open_segment(segmentid, size);
|
||||||
|
objstore_memcheck(size);
|
||||||
void* ptr = segments_[segmentid].first->allocate(size);
|
void* ptr = segments_[segmentid].first->allocate(size);
|
||||||
auto handle = segments_[segmentid].first->get_handle_from_address(ptr);
|
auto handle = segments_[segmentid].first->get_handle_from_address(ptr);
|
||||||
return ObjHandle(segmentid, size, handle);
|
return ObjHandle(segmentid, size, handle);
|
||||||
|
@ -178,13 +182,27 @@ std::string MemorySegmentPool::get_segment_name(SegmentId segmentid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
MemorySegmentPool::~MemorySegmentPool() {
|
MemorySegmentPool::~MemorySegmentPool() {
|
||||||
|
destroy_segments();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MemorySegmentPool::objstore_memcheck(int64_t size) {
|
||||||
|
#if defined(__unix__) || defined(__linux__)
|
||||||
|
struct statvfs buffer;
|
||||||
|
statvfs("/dev/shm/", &buffer);
|
||||||
|
if (size + 100 > buffer.f_bsize * buffer.f_bavail) {
|
||||||
|
MemorySegmentPool::destroy_segments();
|
||||||
|
RAY_LOG(RAY_FATAL, "Not enough memory for allocating object in objectstore.");
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
void MemorySegmentPool::destroy_segments() {
|
||||||
for (size_t segmentid = 0; segmentid < segments_.size(); ++segmentid) {
|
for (size_t segmentid = 0; segmentid < segments_.size(); ++segmentid) {
|
||||||
std::string segment_name = get_segment_name(segmentid);
|
std::string segment_name = get_segment_name(segmentid);
|
||||||
segments_[segmentid].first.reset();
|
segments_[segmentid].first.reset();
|
||||||
bip::shared_memory_object::remove(segment_name.c_str());
|
bip::shared_memory_object::remove(segment_name.c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined(WIN32) || defined(_WIN32)
|
#if defined(WIN32) || defined(_WIN32)
|
||||||
namespace boost {
|
namespace boost {
|
||||||
namespace interprocess {
|
namespace interprocess {
|
||||||
|
|
|
@ -140,6 +140,8 @@ public:
|
||||||
uint8_t* get_address(ObjHandle pointer); // get address of shared object
|
uint8_t* get_address(ObjHandle pointer); // get address of shared object
|
||||||
std::string get_segment_name(SegmentId segmentid); // get the name of a segment
|
std::string get_segment_name(SegmentId segmentid); // get the name of a segment
|
||||||
void unmap_segment(SegmentId segmentid); // unmap a memory segment from a client (only to be called by clients)
|
void unmap_segment(SegmentId segmentid); // unmap a memory segment from a client (only to be called by clients)
|
||||||
|
void destroy_segments();
|
||||||
|
void objstore_memcheck(int64_t size);
|
||||||
private:
|
private:
|
||||||
void open_segment(SegmentId segmentid, size_t size = 0); // create a segment or map an existing one into memory
|
void open_segment(SegmentId segmentid, size_t size = 0); // create a segment or map an existing one into memory
|
||||||
void close_segment(SegmentId segmentid); // close a segment
|
void close_segment(SegmentId segmentid); // close a segment
|
||||||
|
|
Loading…
Add table
Reference in a new issue