Update arrow with better dataframe serialization and get rid of custom dataframe serializers (#1413)

* Update arrow with better dataframe serialization and get rid of custom dataframe serializers.

* Update plasma client API.

* Fix potential bug.

* Bug fix.

* Update arrow to use deduplicated file descriptors and mutable buffers.

* Fix tests.

* Update commit.

* Update commit.

* Update commit.

* Update commit.

* Update commit.

* Update commit back to arrow codebase.
This commit is contained in:
Robert Nishihara 2018-01-24 10:03:29 -08:00 committed by Philipp Moritz
parent f1303291b4
commit 5acc98e629
5 changed files with 30 additions and 29 deletions

View file

@ -6,10 +6,5 @@ from .dataframe import DataFrame
from .dataframe import from_pandas
from .dataframe import to_pandas
from .series import Series
import ray
import pandas as pd
__all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"]
ray.register_custom_serializer(pd.DataFrame, use_pickle=True)
ray.register_custom_serializer(pd.core.indexes.base.Index, use_pickle=True)

View file

@ -392,7 +392,7 @@ void finish_killed_task(LocalSchedulerState *state,
int64_t num_returns = TaskSpec_num_returns(spec);
for (int i = 0; i < num_returns; i++) {
ObjectID object_id = TaskSpec_return(spec, i);
uint8_t *data = NULL;
std::shared_ptr<MutableBuffer> data;
// TODO(ekl): this writes an invalid arrow object, which is sufficient to
// signal that the worker failed, but it would be nice to return more
// detailed failure metadata in the future.

View file

@ -798,14 +798,14 @@ void process_transfer_request(event_loop *loop,
}
}
DCHECK(object_buffer.metadata ==
object_buffer.data + object_buffer.data_size);
CHECK(object_buffer.metadata->data() ==
object_buffer.data->data() + object_buffer.data_size);
PlasmaRequestBuffer *buf = new PlasmaRequestBuffer();
buf->type = MessageType_PlasmaDataReply;
buf->object_id = obj_id;
/* We treat buf->data as a pointer to the concatenated data and metadata, so
* we don't actually use buf->metadata. */
buf->data = object_buffer.data;
buf->data = const_cast<uint8_t *>(object_buffer.data->data());
buf->data_size = object_buffer.data_size;
buf->metadata_size = object_buffer.metadata_size;
@ -839,8 +839,10 @@ void process_data_request(event_loop *loop,
/* The corresponding call to plasma_release should happen in
* process_data_chunk. */
std::shared_ptr<MutableBuffer> data;
Status s = conn->manager_state->plasma_conn->Create(
object_id.to_plasma_id(), data_size, NULL, metadata_size, &(buf->data));
object_id.to_plasma_id(), data_size, NULL, metadata_size, &data);
/* If s.ok(), a new object has been created.
* Otherwise the object creation has failed, possibly
* due to an object with the same ID already existing in the Plasma Store. */
@ -857,6 +859,7 @@ void process_data_request(event_loop *loop,
event_loop_remove_file(loop, client_sock);
event_loop_file_handler data_chunk_handler;
if (s.ok()) {
buf->data = data->mutable_data();
data_chunk_handler = process_data_chunk;
} else {
/* Since plasma_create() has failed, we ignore the data transfer. We will

View file

@ -32,7 +32,7 @@ TEST plasma_status_tests(void) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
uint8_t *data;
std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(
client1.Create(oid1, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client1.Seal(oid1));
@ -73,7 +73,7 @@ TEST plasma_fetch_tests(void) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
uint8_t *data;
std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(
client1.Create(oid1, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client1.Seal(oid1));
@ -116,7 +116,9 @@ void init_data_123(uint8_t *data, uint64_t size, uint8_t base) {
}
}
bool is_equal_data_123(uint8_t *data1, uint8_t *data2, uint64_t size) {
bool is_equal_data_123(const uint8_t *data1,
const uint8_t *data2,
uint64_t size) {
for (size_t i = 0; i < size; i++) {
if (data1[i] != data2[i]) {
return false;
@ -142,14 +144,15 @@ TEST plasma_nonblocking_get_tests(void) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
uint8_t *data;
std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(client.Create(oid, data_size, metadata, metadata_size, &data));
init_data_123(data, data_size, 0);
init_data_123(data->mutable_data(), data_size, 0);
ARROW_CHECK_OK(client.Seal(oid));
sleep(1);
ARROW_CHECK_OK(client.Get(oid_array, 1, 0, &obj_buffer));
ASSERT(is_equal_data_123(data, obj_buffer.data, data_size) == true);
ASSERT(is_equal_data_123(data->data(), obj_buffer.data->data(), data_size) ==
true);
sleep(1);
ARROW_CHECK_OK(client.Disconnect());
@ -191,7 +194,7 @@ TEST plasma_wait_for_objects_tests(void) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
uint8_t *data;
std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(
client1.Create(oid1, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client1.Seal(oid1));
@ -245,23 +248,23 @@ TEST plasma_get_tests(void) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
uint8_t *data;
std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(
client1.Create(oid1, data_size, metadata, metadata_size, &data));
init_data_123(data, data_size, 1);
init_data_123(data->mutable_data(), data_size, 1);
ARROW_CHECK_OK(client1.Seal(oid1));
ARROW_CHECK_OK(client1.Get(oid_array1, 1, -1, &obj_buffer));
ASSERT(data[0] == obj_buffer.data[0]);
ASSERT(data->data()[0] == obj_buffer.data->data()[0]);
ARROW_CHECK_OK(
client2.Create(oid2, data_size, metadata, metadata_size, &data));
init_data_123(data, data_size, 2);
init_data_123(data->mutable_data(), data_size, 2);
ARROW_CHECK_OK(client2.Seal(oid2));
ARROW_CHECK_OK(client1.Fetch(1, oid_array2));
ARROW_CHECK_OK(client1.Get(oid_array2, 1, -1, &obj_buffer));
ASSERT(data[0] == obj_buffer.data[0]);
ASSERT(data->data()[0] == obj_buffer.data->data()[0]);
sleep(1);
ARROW_CHECK_OK(client1.Disconnect());
@ -288,25 +291,25 @@ TEST plasma_get_multiple_tests(void) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
uint8_t *data;
std::shared_ptr<MutableBuffer> data;
ARROW_CHECK_OK(
client1.Create(oid1, data_size, metadata, metadata_size, &data));
init_data_123(data, data_size, obj1_first);
init_data_123(data->mutable_data(), data_size, obj1_first);
ARROW_CHECK_OK(client1.Seal(oid1));
/* This only waits for oid1. */
ARROW_CHECK_OK(client1.Get(obj_ids, 1, -1, obj_buffer));
ASSERT(data[0] == obj_buffer[0].data[0]);
ASSERT(data->data()[0] == obj_buffer[0].data->data()[0]);
ARROW_CHECK_OK(
client2.Create(oid2, data_size, metadata, metadata_size, &data));
init_data_123(data, data_size, obj2_first);
init_data_123(data->mutable_data(), data_size, obj2_first);
ARROW_CHECK_OK(client2.Seal(oid2));
ARROW_CHECK_OK(client1.Fetch(2, obj_ids));
ARROW_CHECK_OK(client1.Get(obj_ids, 2, -1, obj_buffer));
ASSERT(obj1_first == obj_buffer[0].data[0]);
ASSERT(obj2_first == obj_buffer[1].data[0]);
ASSERT(obj1_first == obj_buffer[0].data->data()[0]);
ASSERT(obj2_first == obj_buffer[1].data->data()[0]);
sleep(1);
ARROW_CHECK_OK(client1.Disconnect());

View file

@ -13,4 +13,4 @@ fi
cd $TP_DIR/arrow
git fetch origin master
git checkout 16c79cc94e2440321bcad1ebbef53ea1266b94e8
git checkout d135974a0d3dd9a9fbbb10da4c5dbc65f9324234