Make putting large objects work. (#411)

* putting large objects

* add more checks

* support large objects

* fix test

* fix linting

* upgrade to latest arrow version

* check malloc return code

* print mmap file sizes

* printing

* revert to dlmalloc

* add prints

* more prints

* add printing

* printing

* fix

* update

* fix

* update

* print

* initialization

* temp

* fix

* update

* fix linting

* comment out object_store_full tests

* fix test

* fix test

* evict objects if dlmalloc fails

* fix stresstests

* Fix linting.

* Uncomment large-memory tests.

* Increase memory for docker image for jenkins tests.

* Reduce large memory tests.

* Further reduce large memory tests.
Philipp Moritz, 2017-04-05 01:04:05 -07:00 (committed by Robert Nishihara)
parent 1e84747e13
commit 4043769ba2
11 changed files with 131 additions and 121 deletions
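
The net effect of the squashed commits above, from the Python API side, is that ray.put no longer fails for objects whose serialized size exceeds the old 32-bit limits in the numbuf serialization path and the plasma store allocation path. A minimal usage sketch, mirroring the large_memory_test.py added in this commit (the 2 ** 31 + 1 size comes from that test):

import numpy as np
from numpy.testing import assert_almost_equal
import ray

ray.init()
# An int8 array just over 2 GB; before this change, putting it would fail
# in the serialization or plasma allocation path.
A = np.zeros(2 ** 31 + 1, dtype="int8")
a = ray.put(A)
assert_almost_equal(ray.get(a), A)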

View file

@@ -40,7 +40,8 @@ class TestLocalSchedulerClient(unittest.TestCase):
def setUp(self):
# Start Plasma store.
plasma_store_name, self.p1 = plasma.start_plasma_store()
self.plasma_client = plasma.PlasmaClient(plasma_store_name)
self.plasma_client = plasma.PlasmaClient(plasma_store_name,
release_delay=0)
# Start a local scheduler.
scheduler_name, self.p2 = local_scheduler.start_local_scheduler(
plasma_store_name, use_valgrind=USE_VALGRIND)
@@ -169,19 +170,15 @@ class TestLocalSchedulerClient(unittest.TestCase):
t.start()
# Make one of the dependencies available.
self.plasma_client.create(object_id1.id(), 1)
buf = self.plasma_client.create(object_id1.id(), 1)
self.plasma_client.seal(object_id1.id())
# Release the object.
del buf
# Check that the thread is still waiting for a task.
time.sleep(0.1)
self.assertTrue(t.is_alive())
# Force eviction of the first dependency.
num_objects = 4
object_size = plasma.DEFAULT_PLASMA_STORE_MEMORY // num_objects
for i in range(num_objects + 1):
object_id = random_object_id()
self.plasma_client.create(object_id.id(), object_size)
self.plasma_client.seal(object_id.id())
self.plasma_client.evict(plasma.DEFAULT_PLASMA_STORE_MEMORY)
# Check that the thread is still waiting for a task.
time.sleep(0.1)
self.assertTrue(t.is_alive())
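
Instead of filling the store with filler objects to force eviction, the updated test releases its buffer and asks the store to evict directly. A condensed sketch of the client-side life cycle the new version relies on (client and object_id1 as in the test above; release_delay=0 makes the release take effect as soon as the buffer is deleted):

buf = self.plasma_client.create(object_id1.id(), 1)   # writable buffer for the object
self.plasma_client.seal(object_id1.id())              # seal it so it becomes readable
del buf                                                # drop our reference so it can be evicted
self.plasma_client.evict(plasma.DEFAULT_PLASMA_STORE_MEMORY)  # evict up to this many bytes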

View file

@@ -165,58 +165,26 @@ class TestPlasmaClient(unittest.TestCase):
# Create a list to keep some of the buffers in scope.
memory_buffers = []
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 8, 0)
_, memory_buffer, _ = create_object(self.plasma_client, 5 * 10 ** 8, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 8. Make sure that we can't create an object of
# size 10 ** 8 + 1, but we can create one of size 10 ** 8.
assert_create_raises_plasma_full(self, 10 ** 8 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
# Remaining space is 5 * 10 ** 8. Make sure that we can't create an object
# of size 5 * 10 ** 8 + 1, but we can create one of size 2 * 10 ** 8.
assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
del memory_buffer
_, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
_, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
del memory_buffer
assert_create_raises_plasma_full(self, 10 ** 8 + 1)
assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 7, 0)
_, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 7.
assert_create_raises_plasma_full(self, 10 ** 7 + 1)
# Remaining space is 3 * 10 ** 8.
assert_create_raises_plasma_full(self, 3 * 10 ** 8 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 6, 0)
_, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 6.
assert_create_raises_plasma_full(self, 10 ** 6 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 5, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 5.
assert_create_raises_plasma_full(self, 10 ** 5 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 4, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 4.
assert_create_raises_plasma_full(self, 10 ** 4 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 3, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 3.
assert_create_raises_plasma_full(self, 10 ** 3 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 2, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 2.
assert_create_raises_plasma_full(self, 10 ** 2 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 1, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 1.
assert_create_raises_plasma_full(self, 10 ** 1 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 0, 0)
memory_buffers.append(memory_buffer)
# Remaining space is 10 ** 0.
assert_create_raises_plasma_full(self, 10 ** 0 + 1)
_, memory_buffer, _ = create_object(self.plasma_client, 1, 0)
# Remaining space is 2 * 10 ** 8.
assert_create_raises_plasma_full(self, 2 * 10 ** 8 + 1)
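
The bookkeeping behind the rewritten test, assuming a default store capacity of 10 ** 9 bytes (the same figure implied by the old version, where using 9 * 10 ** 8 left 10 ** 8 free): only buffers that are still held count against capacity, since released ones can now be evicted on demand.

capacity = 10 ** 9            # assumed DEFAULT_PLASMA_STORE_MEMORY
held = 5 * 10 ** 8            # first object, kept in memory_buffers
# The two 2 * 10 ** 8 objects that are created and immediately del'ed are
# released, so the store can evict them when space is needed.
held += 2 * 10 ** 8           # second kept object
held += 10 ** 8               # third kept object
# Remaining space: 10 ** 9 - 8 * 10 ** 8 = 2 * 10 ** 8, matching the final
# assert_create_raises_plasma_full(self, 2 * 10 ** 8 + 1).
assert capacity - held == 2 * 10 ** 8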
def test_contains(self):
fake_object_ids = [random_object_id() for _ in range(100)]

View file

@@ -8,7 +8,6 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
: pool_(pool),
types_(pool, std::make_shared<Int8Type>()),
offsets_(pool, std::make_shared<Int32Type>()),
total_num_bytes_(0),
nones_(pool, std::make_shared<NullType>()),
bools_(pool, std::make_shared<BooleanType>()),
ints_(pool, std::make_shared<Int64Type>()),
@@ -30,16 +29,7 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
tuple_offsets_({0}),
dict_offsets_({0}) {}
/* We need to ensure that the number of bytes allocated by arrow
* does not exceed 2**31 - 1. To make sure that is the case, allocation needs
* to be capped at 2**29 - 1, because arrow calculates the next power of two
* for allocations (see arrow::ArrayBuilder::Reserve). TODO(rkn): The (1 << 28)
* below should really be (1 << 29), but there seems to be a bug.
*/
#define UPDATE(OFFSET, TAG) \
if (total_num_bytes_ >= (1 << 28) - 1) { \
return Status::NotImplemented("Sequence contains too many elements"); \
} \
if (TAG == -1) { \
TAG = num_tags; \
num_tags += 1; \
@@ -49,50 +39,42 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
RETURN_NOT_OK(nones_.AppendToBitmap(true));
Status SequenceBuilder::AppendNone() {
total_num_bytes_ += sizeof(int32_t);
RETURN_NOT_OK(offsets_.Append(0));
RETURN_NOT_OK(types_.Append(0));
return nones_.AppendToBitmap(false);
}
Status SequenceBuilder::AppendBool(bool data) {
total_num_bytes_ += sizeof(bool);
UPDATE(bools_.length(), bool_tag);
return bools_.Append(data);
}
Status SequenceBuilder::AppendInt64(int64_t data) {
total_num_bytes_ += sizeof(int64_t);
UPDATE(ints_.length(), int_tag);
return ints_.Append(data);
}
Status SequenceBuilder::AppendUInt64(uint64_t data) {
total_num_bytes_ += sizeof(uint64_t);
UPDATE(ints_.length(), int_tag);
return ints_.Append(data);
}
Status SequenceBuilder::AppendBytes(const uint8_t* data, int32_t length) {
total_num_bytes_ += length * sizeof(uint8_t);
UPDATE(bytes_.length(), bytes_tag);
return bytes_.Append(data, length);
}
Status SequenceBuilder::AppendString(const char* data, int32_t length) {
total_num_bytes_ += length * sizeof(char);
UPDATE(strings_.length(), string_tag);
return strings_.Append(data, length);
}
Status SequenceBuilder::AppendFloat(float data) {
total_num_bytes_ += sizeof(float);
UPDATE(floats_.length(), float_tag);
return floats_.Append(data);
}
Status SequenceBuilder::AppendDouble(double data) {
total_num_bytes_ += sizeof(double);
UPDATE(doubles_.length(), double_tag);
return doubles_.Append(data);
}
@@ -104,7 +86,6 @@ Status SequenceBuilder::AppendDouble(double data) {
for (auto dim : dims) { \
size *= dim; \
} \
total_num_bytes_ += size * sizeof(TYPE); \
UPDATE(NAME.length(), TAG); \
return NAME.Append(dims, data); \
}
@@ -121,27 +102,18 @@ DEF_TENSOR_APPEND(float_tensors_, float, float_tensor_tag);
DEF_TENSOR_APPEND(double_tensors_, double, double_tensor_tag);
Status SequenceBuilder::AppendList(int32_t size) {
// Increase number of bytes to account for offsets
// (types and bitmaps are smaller).
total_num_bytes_ += size * sizeof(int32_t);
UPDATE(list_offsets_.size() - 1, list_tag);
list_offsets_.push_back(list_offsets_.back() + size);
return Status::OK();
}
Status SequenceBuilder::AppendTuple(int32_t size) {
// Increase number of bytes to account for offsets
// (types and bitmaps are smaller).
total_num_bytes_ += size * sizeof(int32_t);
UPDATE(tuple_offsets_.size() - 1, tuple_tag);
tuple_offsets_.push_back(tuple_offsets_.back() + size);
return Status::OK();
}
Status SequenceBuilder::AppendDict(int32_t size) {
// Increase number of bytes to account for offsets
// (types and bitmaps are smaller).
total_num_bytes_ += size * sizeof(int32_t);
UPDATE(dict_offsets_.size() - 1, dict_tag);
dict_offsets_.push_back(dict_offsets_.back() + size);
return Status::OK();

View file

@@ -42,16 +42,15 @@ std::shared_ptr<RecordBatch> make_batch(std::shared_ptr<Array> data) {
return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
}
int64_t get_batch_size(std::shared_ptr<RecordBatch> batch) {
Status get_batch_size(std::shared_ptr<RecordBatch> batch, int64_t* size) {
// Determine the size of the file by writing to a mock file.
auto mock = std::make_shared<MockBufferStream>();
std::shared_ptr<arrow::ipc::FileWriter> writer;
ipc::FileWriter::Open(mock.get(), batch->schema(), &writer);
writer->WriteRecordBatch(*batch);
writer->Close();
int64_t size;
ARROW_CHECK_OK(mock->Tell(&size));
return size;
RETURN_NOT_OK(ipc::FileWriter::Open(mock.get(), batch->schema(), &writer));
RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true));
RETURN_NOT_OK(writer->Close());
RETURN_NOT_OK(mock->Tell(size));
return Status::OK();
}
Status read_batch(uint8_t* data, int64_t size, std::shared_ptr<RecordBatch>* batch_out) {
@@ -59,8 +58,8 @@ Status read_batch(uint8_t* data, int64_t size, std::shared_ptr<RecordBatch>* bat
auto source = std::make_shared<FixedBufferStream>(
LENGTH_PREFIX_SIZE + data, size - LENGTH_PREFIX_SIZE);
int64_t data_size = *((int64_t*)data);
arrow::ipc::FileReader::Open(source, data_size, &reader);
reader->GetRecordBatch(0, batch_out);
RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, data_size, &reader));
RETURN_NOT_OK(reader->GetRecordBatch(0, batch_out));
return Status::OK();
}
@@ -112,7 +111,8 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
auto batch = new std::shared_ptr<RecordBatch>();
*batch = make_batch(array);
int64_t size = get_batch_size(*batch);
int64_t size;
ARROW_CHECK_OK(get_batch_size(*batch, &size));
PyObject* r = PyTuple_New(2);
PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + size));
@@ -136,9 +136,9 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
LENGTH_PREFIX_SIZE + reinterpret_cast<uint8_t*>(buffer->buf),
buffer->len - LENGTH_PREFIX_SIZE);
std::shared_ptr<arrow::ipc::FileWriter> writer;
ipc::FileWriter::Open(target.get(), (*batch)->schema(), &writer);
writer->WriteRecordBatch(*(*batch));
writer->Close();
ARROW_CHECK_OK(ipc::FileWriter::Open(target.get(), (*batch)->schema(), &writer));
ARROW_CHECK_OK(writer->WriteRecordBatch(*(*batch), true));
ARROW_CHECK_OK(writer->Close());
*((int64_t*)buffer->buf) = buffer->len - LENGTH_PREFIX_SIZE;
Py_RETURN_NONE;
}
@@ -251,7 +251,8 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
CHECK_SERIALIZATION_ERROR(s);
std::shared_ptr<RecordBatch> batch = make_batch(array);
int64_t size = get_batch_size(batch);
int64_t size;
ARROW_CHECK_OK(get_batch_size(batch, &size));
uint8_t* data;
/* The arrow schema is stored as the metadata of the plasma object and
@@ -277,7 +278,7 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
auto target = std::make_shared<FixedBufferStream>(LENGTH_PREFIX_SIZE + data, size);
std::shared_ptr<arrow::ipc::FileWriter> writer;
ipc::FileWriter::Open(target.get(), batch->schema(), &writer);
writer->WriteRecordBatch(*batch);
writer->WriteRecordBatch(*batch, true);
writer->Close();
*((int64_t*)data) = size;

View file

@@ -11,4 +11,4 @@ if [ ! -d $TP_DIR/arrow ]; then
git clone https://github.com/apache/arrow/ "$TP_DIR/arrow"
fi
cd $TP_DIR/arrow
git checkout 98a52b4823f3cd0880eaef066dc932f533170292
git checkout 067cd4ebfbd9be9b607658a2a249017cc6db84f9

View file

@@ -45,6 +45,7 @@ extern "C" {
void *dlmalloc(size_t);
void *dlmemalign(size_t alignment, size_t bytes);
void dlfree(void *);
size_t dlmalloc_set_footprint_limit(size_t bytes);
}
namespace std {
@@ -233,27 +234,33 @@ int create_object(Client *client_context,
* ignore this request. */
return PlasmaError_ObjectExists;
}
/* Tell the eviction policy how much space we need to create this object. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
bool success = EvictionState_require_space(
plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict, &objects_to_evict);
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);
/* Return an error to the client if not enough space could be freed to create
* the object. */
if (!success) {
return PlasmaError_OutOfMemory;
}
/* Allocate space for the new object. We use dlmemalign instead of dlmalloc in
* order to align the allocated region to a 64-byte boundary. This is not
* strictly necessary, but it is an optimization that could speed up the
* computation of a hash of the data (see compute_object_hash_parallel in
* plasma_client.cc). Note that even though this pointer is 64-byte aligned,
* it is not guaranteed that the corresponding pointer in the client will be
* 64-byte aligned, but in practice it often will be. */
uint8_t *pointer =
(uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size);
/* Try to evict objects until there is enough space. */
uint8_t *pointer;
do {
/* Allocate space for the new object. We use dlmemalign instead of dlmalloc
* in order to align the allocated region to a 64-byte boundary. This is not
* strictly necessary, but it is an optimization that could speed up the
* computation of a hash of the data (see compute_object_hash_parallel in
* plasma_client.cc). Note that even though this pointer is 64-byte aligned,
* it is not guaranteed that the corresponding pointer in the client will be
* 64-byte aligned, but in practice it often will be. */
pointer = (uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size);
if (pointer == NULL) {
/* Tell the eviction policy how much space we need to create this object.
*/
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
bool success = EvictionState_require_space(
plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict, &objects_to_evict);
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);
/* Return an error to the client if not enough space could be freed to
* create the object. */
if (!success) {
return PlasmaError_OutOfMemory;
}
}
} while (pointer == NULL);
int fd;
int64_t map_size;
ptrdiff_t offset;
@@ -895,6 +902,9 @@ int main(int argc, char *argv[]) {
system_memory, shm_mem_avail);
}
#endif
/* Make it so dlmalloc fails if we try to request more memory than is
* available. */
dlmalloc_set_footprint_limit((size_t) system_memory);
LOG_DEBUG("starting server listening on %s", socket_name);
start_server(socket_name, system_memory);
}
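
The client-visible consequence of the two changes to this file (the dlmalloc footprint limit set to the configured capacity, and the allocate-then-evict retry loop) is that creates beyond the available space trigger eviction of released objects rather than failing outright; PlasmaError_OutOfMemory is only returned when eviction cannot free enough space. A rough client-side illustration with hypothetical sizes, assuming a store of capacity 10 ** 9 and the plasma test helpers used above:

for _ in range(15):
    object_id = random_object_id()
    buf = client.create(object_id.id(), 10 ** 8)  # 15 * 10 ** 8 exceeds capacity
    client.seal(object_id.id())
    del buf                                       # released, hence evictable
# The later creates succeed because a failed dlmalloc now evicts the earlier,
# released objects; only when nothing evictable remains does create return an
# out-of-memory error to the client.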

View file

@@ -5350,6 +5350,7 @@ size_t dlmalloc_footprint_limit(void) {
}
size_t dlmalloc_set_footprint_limit(size_t bytes) {
ensure_initialization();
size_t result; /* invert sense of 0 */
if (bytes == 0)
result = granularity_align(1); /* Use minimal size */

View file

@@ -0,0 +1,50 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from numpy.testing import assert_almost_equal
import ray
if __name__ == "__main__":
ray.init()
A = np.zeros(2 ** 31 + 1, dtype="int8")
a = ray.put(A)
assert_almost_equal(ray.get(a), A)
del A
del a
print("Successfully put A.")
# B = {"hello": np.zeros(2 ** 30 + 1),
# "world": np.ones(2 ** 30 + 1)}
# b = ray.put(B)
# assert_almost_equal(ray.get(b)["hello"], B["hello"])
# assert_almost_equal(ray.get(b)["world"], B["world"])
# del B
# del b
# print("Successfully put B.")
# C = [np.ones(2 ** 30 + 1), 42.0 * np.ones(2 ** 30 + 1)]
# c = ray.put(C)
# assert_almost_equal(ray.get(c)[0], C[0])
# assert_almost_equal(ray.get(c)[1], C[1])
# del C
# del c
# print("Successfully put C.")
# D = (2 ** 30 + 1) * ["h"]
# d = ray.put(D)
# assert ray.get(d) == D
# del D
# del d
# print("Successfully put D.")
# E = (2 ** 30 + 1) * ("i",)
# e = ray.put(E)
# assert ray.get(e) == E
# del E
# del e
# print("Successfully put E.")

View file

@@ -1,3 +1,7 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time

View file

@@ -12,3 +12,10 @@ python $ROOT_DIR/multi_node_docker_test.py \
--docker-image=$DOCKER_SHA \
--num-nodes=5 \
--test-script=/ray/test/jenkins_tests/multi_node_tests/test_0.py
python $ROOT_DIR/multi_node_docker_test.py \
--docker-image=$DOCKER_SHA \
--num-nodes=1 \
--mem-size=60G \
--shm-size=60G \
--test-script=/ray/test/jenkins_tests/multi_node_tests/large_memory_test.py

View file

@@ -190,7 +190,7 @@ class ReconstructionTests(unittest.TestCase):
# all objects' sizes is at least twice the plasma stores' combined allotted
# memory.
num_objects = 1000
size = self.plasma_store_memory * 2 // (num_objects * 8)
size = int(self.plasma_store_memory * 1.5 / (num_objects * 8))
# Define a remote task with no dependencies, which returns a numpy array of
# the given size.
@@ -226,7 +226,7 @@ class ReconstructionTests(unittest.TestCase):
# all objects' sizes is at least twice the plasma stores' combined allotted
# memory.
num_objects = 1000
size = self.plasma_store_memory * 2 // (num_objects * 8)
size = int(self.plasma_store_memory * 1.5 / (num_objects * 8))
# Define a root task with no dependencies, which returns a numpy array of
# the given size.
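
For a sense of scale in the stress test change above, assuming a per-store allotment of 10 ** 9 bytes (illustrative only; the actual value is set elsewhere in stress_tests.py) and 8-byte numpy array elements:

plasma_store_memory = 10 ** 9                               # assumed for illustration
num_objects = 1000
size = int(plasma_store_memory * 1.5 / (num_objects * 8))   # elements per object
total_bytes = num_objects * size * 8                        # about 1.5x the allotment,
                                                            # enough to force eviction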