From fbde8cad748f9e98b9f467fc324f8ab159bc4758 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 6 Jul 2018 13:18:56 -0700 Subject: [PATCH] Update apache arrow to include TensorFlow fix (#2345) --- src/plasma/plasma_manager.cc | 95 +++++++++++++----------- src/plasma/plasma_manager.h | 5 +- src/plasma/plasma_protocol.cc | 50 ++++++------- src/plasma/test/client_tests.cc | 26 +++---- src/plasma/test/manager_tests.cc | 12 +-- thirdparty/scripts/arrow-zero-fill.patch | 30 ++++++++ thirdparty/scripts/build_arrow.sh | 6 +- 7 files changed, 130 insertions(+), 94 deletions(-) create mode 100644 thirdparty/scripts/arrow-zero-fill.patch diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index a55cff177..f4eaa27f2 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -101,9 +101,9 @@ void process_status_request(ClientConnection *client_conn, ObjectID object_id); * @param context Client connection. * @return Status of object_id as defined in plasma.h */ -int request_status(ObjectID object_id, - const std::vector &manager_vector, - void *context); +ObjectStatus request_status(ObjectID object_id, + const std::vector &manager_vector, + void *context); /** * Send requested object_id back to the Plasma Manager identified @@ -316,12 +316,13 @@ bool ClientConnection_request_finished(ClientConnection *client_conn) { return client_conn->cursor == -1; } -std::unordered_map> & -object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) { +std::unordered_map> + &object_wait_requests_from_type(PlasmaManagerState *manager_state, + plasma::ObjectRequestType type) { /* We use different types of hash tables for different requests. */ - RAY_CHECK(type == plasma::PLASMA_QUERY_LOCAL || - type == plasma::PLASMA_QUERY_ANYWHERE); - if (type == plasma::PLASMA_QUERY_LOCAL) { + RAY_CHECK(type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL || + type == plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE); + if (type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL) { return manager_state->object_wait_requests_local; } else { return manager_state->object_wait_requests_remote; @@ -330,7 +331,7 @@ object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) { void add_wait_request_for_object(PlasmaManagerState *manager_state, ObjectID object_id, - int type, + plasma::ObjectRequestType type, WaitRequest *wait_req) { auto &object_wait_requests = object_wait_requests_from_type(manager_state, type); @@ -343,7 +344,7 @@ void add_wait_request_for_object(PlasmaManagerState *manager_state, void remove_wait_request_for_object(PlasmaManagerState *manager_state, ObjectID object_id, - int type, + plasma::ObjectRequestType type, WaitRequest *wait_req) { auto &object_wait_requests = object_wait_requests_from_type(manager_state, type); @@ -392,8 +393,8 @@ void return_from_wait(PlasmaManagerState *manager_state, void update_object_wait_requests(PlasmaManagerState *manager_state, ObjectID obj_id, - int type, - int status) { + plasma::ObjectRequestType type, + ObjectStatus status) { auto &object_wait_requests = object_wait_requests_from_type(manager_state, type); /* Update the in-progress wait requests in the specified table. */ @@ -417,7 +418,7 @@ void update_object_wait_requests(PlasmaManagerState *manager_state, /* Check that we found the object. */ RAY_CHECK(object_request != wait_req->object_requests.end()); /* Check that the object found was not previously known to us. */ - RAY_CHECK(object_request->second.status == ObjectStatus_Nonexistent); + RAY_CHECK(object_request->second.status == ObjectStatus::Nonexistent); /* Update the found object's status to a known status. */ object_request->second.status = status; @@ -608,13 +609,13 @@ void send_queued_request(event_loop *loop, PlasmaRequestBuffer *buf = conn->transfer_queue.front(); int err = 0; switch (buf->type) { - case MessageType_PlasmaDataRequest: + case MessageType::PlasmaDataRequest: err = handle_sigpipe( plasma::SendDataRequest(conn->fd, buf->object_id.to_plasma_id(), state->addr, state->port), conn->fd); break; - case MessageType_PlasmaDataReply: + case MessageType::PlasmaDataReply: RAY_LOG(DEBUG) << "Transferring object to manager"; if (ClientConnection_request_finished(conn)) { /* If the cursor is not set, we haven't sent any requests for this object @@ -635,7 +636,7 @@ void send_queued_request(event_loop *loop, /* If the other side hung up, stop sending to this manager. */ if (err != 0) { - if (buf->type == MessageType_PlasmaDataReply) { + if (buf->type == MessageType::PlasmaDataReply) { /* We errored while sending the object, so release it before removing the * connection. The corresponding call to plasma_get occurred in * process_transfer_request. */ @@ -646,7 +647,7 @@ void send_queued_request(event_loop *loop, ClientConnection_free(conn); } else if (ClientConnection_request_finished(conn)) { /* If we are done with this request, remove it from the transfer queue. */ - if (buf->type == MessageType_PlasmaDataReply) { + if (buf->type == MessageType::PlasmaDataReply) { /* We are done sending the object, so release it. The corresponding call * to plasma_get occurred in process_transfer_request. */ ARROW_CHECK_OK(conn->manager_state->plasma_conn->Release( @@ -827,7 +828,7 @@ void process_transfer_request(event_loop *loop, RAY_CHECK(object_buffer.metadata->data() == object_buffer.data->data() + object_buffer.data->size()); PlasmaRequestBuffer *buf = new PlasmaRequestBuffer(); - buf->type = MessageType_PlasmaDataReply; + buf->type = MessageType::PlasmaDataReply; buf->object_id = obj_id; /* We treat buf->data as a pointer to the concatenated data and metadata, so * we don't actually use buf->metadata. */ @@ -938,7 +939,7 @@ void request_transfer_from(PlasmaManagerState *manager_state, } PlasmaRequestBuffer *transfer_request = new PlasmaRequestBuffer(); - transfer_request->type = MessageType_PlasmaDataRequest; + transfer_request->type = MessageType::PlasmaDataRequest; transfer_request->object_id = fetch_req->object_id; if (manager_conn->transfer_queue.size() == 0) { @@ -1085,8 +1086,8 @@ void object_table_subscribe_callback(ObjectID object_id, } /* Run the callback for wait requests. */ update_object_wait_requests(manager_state, object_id, - plasma::PLASMA_QUERY_ANYWHERE, - ObjectStatus_Remote); + plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE, + ObjectStatus::Remote); } void process_fetch_requests(ClientConnection *client_conn, @@ -1167,7 +1168,7 @@ void process_wait_request(ClientConnection *client_conn, /* Check if this object is already present locally. If so, mark the object * as present. */ if (is_object_local(manager_state, obj_id)) { - object_request.status = ObjectStatus_Local; + object_request.status = ObjectStatus::Local; wait_req->num_satisfied += 1; continue; } @@ -1176,10 +1177,11 @@ void process_wait_request(ClientConnection *client_conn, add_wait_request_for_object(manager_state, obj_id, object_request.type, wait_req); - if (object_request.type == plasma::PLASMA_QUERY_LOCAL) { + if (object_request.type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL) { /* TODO(rkn): If desired, we could issue a fetch command here to retrieve * the object. */ - } else if (object_request.type == plasma::PLASMA_QUERY_ANYWHERE) { + } else if (object_request.type == + plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE) { /* Add this object ID to the list of object IDs to request notifications * for from the object table. */ object_ids_to_request[num_object_ids_to_request] = obj_id; @@ -1228,34 +1230,35 @@ void request_status_done(ObjectID object_id, const std::vector &manager_vector, void *context) { ClientConnection *client_conn = (ClientConnection *) context; - int status = request_status(object_id, manager_vector, context); + int status = + static_cast(request_status(object_id, manager_vector, context)); plasma::ObjectID object_id_copy = object_id.to_plasma_id(); handle_sigpipe( plasma::SendStatusReply(client_conn->fd, &object_id_copy, &status, 1), client_conn->fd); } -int request_status(ObjectID object_id, - const std::vector &manager_vector, - void *context) { +ObjectStatus request_status(ObjectID object_id, + const std::vector &manager_vector, + void *context) { ClientConnection *client_conn = (ClientConnection *) context; /* Return success immediately if we already have this object. */ if (is_object_local(client_conn->manager_state, object_id)) { - return ObjectStatus_Local; + return ObjectStatus::Local; } /* Since object is not stored at the local locally, manager_vector.size() > 0 * means that the object is stored at another remote object. Otherwise, if * manager_vector.size() == 0, the object is not stored anywhere. */ - return (manager_vector.size() > 0 ? ObjectStatus_Remote - : ObjectStatus_Nonexistent); + return manager_vector.size() > 0 ? ObjectStatus::Remote + : ObjectStatus::Nonexistent; } void object_table_lookup_fail_callback(ObjectID object_id, void *user_context, void *user_data) { - /* Fail for now. Later, we may want to send a ObjectStatus_Nonexistent to the + /* Fail for now. Later, we may want to send a ObjectStatus::Nonexistent to the * client. */ RAY_CHECK(0); } @@ -1264,7 +1267,7 @@ void process_status_request(ClientConnection *client_conn, plasma::ObjectID object_id) { /* Return success immediately if we already have this object. */ if (is_object_local(client_conn->manager_state, object_id)) { - int status = ObjectStatus_Local; + int status = static_cast(ObjectStatus::Local); handle_sigpipe( plasma::SendStatusReply(client_conn->fd, &object_id, &status, 1), client_conn->fd); @@ -1272,7 +1275,7 @@ void process_status_request(ClientConnection *client_conn, } if (client_conn->manager_state->db == NULL) { - int status = ObjectStatus_Nonexistent; + auto status = static_cast(ObjectStatus::Nonexistent); handle_sigpipe( plasma::SendStatusReply(client_conn->fd, &object_id, &status, 1), client_conn->fd); @@ -1369,10 +1372,12 @@ void process_add_object_notification(PlasmaManagerState *state, } /* Update the in-progress local and remote wait requests. */ - update_object_wait_requests(state, object_id, plasma::PLASMA_QUERY_LOCAL, - ObjectStatus_Local); - update_object_wait_requests(state, object_id, plasma::PLASMA_QUERY_ANYWHERE, - ObjectStatus_Local); + update_object_wait_requests(state, object_id, + plasma::ObjectRequestType::PLASMA_QUERY_LOCAL, + ObjectStatus::Local); + update_object_wait_requests(state, object_id, + plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE, + ObjectStatus::Local); } void process_object_notification(event_loop *loop, @@ -1473,8 +1478,8 @@ void process_message(event_loop *loop, uint8_t *data; read_message(client_sock, &type, &length, &data); - switch (type) { - case MessageType_PlasmaDataRequest: { + switch (static_cast(type)) { + case MessageType::PlasmaDataRequest: { RAY_LOG(DEBUG) << "Processing data request"; plasma::ObjectID object_id; char *address; @@ -1484,7 +1489,7 @@ void process_message(event_loop *loop, process_transfer_request(loop, object_id, address, port, conn); free(address); } break; - case MessageType_PlasmaDataReply: { + case MessageType::PlasmaDataReply: { RAY_LOG(DEBUG) << "Processing data reply"; plasma::ObjectID object_id; int64_t object_size; @@ -1494,7 +1499,7 @@ void process_message(event_loop *loop, process_data_request(loop, client_sock, object_id, object_size, metadata_size, conn); } break; - case MessageType_PlasmaFetchRequest: { + case MessageType::PlasmaFetchRequest: { RAY_LOG(DEBUG) << "Processing fetch remote"; std::vector object_ids_to_fetch; /* TODO(pcm): process_fetch_requests allocates an array of num_objects @@ -1503,7 +1508,7 @@ void process_message(event_loop *loop, process_fetch_requests(conn, object_ids_to_fetch.size(), object_ids_to_fetch.data()); } break; - case MessageType_PlasmaWaitRequest: { + case MessageType::PlasmaWaitRequest: { RAY_LOG(DEBUG) << "Processing wait"; plasma::ObjectRequestMap object_requests; int64_t timeout_ms; @@ -1513,13 +1518,13 @@ void process_message(event_loop *loop, process_wait_request(conn, std::move(object_requests), timeout_ms, num_ready_objects); } break; - case MessageType_PlasmaStatusRequest: { + case MessageType::PlasmaStatusRequest: { RAY_LOG(DEBUG) << "Processing status"; plasma::ObjectID object_id; ARROW_CHECK_OK(plasma::ReadStatusRequest(data, length, &object_id, 1)); process_status_request(conn, object_id); } break; - case static_cast(CommonMessageType::DISCONNECT_CLIENT): { + case static_cast(CommonMessageType::DISCONNECT_CLIENT): { RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock; event_loop_remove_file(loop, client_sock); ClientConnection_free(conn); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 0b3f374eb..527f19b8e 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -1,8 +1,6 @@ #ifndef PLASMA_MANAGER_H #define PLASMA_MANAGER_H -#include - #ifndef RAY_NUM_RETRIES #define NUM_RETRIES 5 #else @@ -11,6 +9,7 @@ typedef struct PlasmaManagerState PlasmaManagerState; typedef struct ClientConnection ClientConnection; +enum class MessageType : int64_t; /** * Initializes the plasma manager state. This connects the manager to the local @@ -156,7 +155,7 @@ ClientConnection *ClientConnection_listen(event_loop *loop, /* Buffer for requests between plasma managers. */ typedef struct PlasmaRequestBuffer { - int type; + MessageType type; ray::ObjectID object_id; uint8_t *data; int64_t data_size; diff --git a/src/plasma/plasma_protocol.cc b/src/plasma/plasma_protocol.cc index 42fa84cc3..b33ca5d34 100644 --- a/src/plasma/plasma_protocol.cc +++ b/src/plasma/plasma_protocol.cc @@ -37,7 +37,7 @@ Status SendCreateRequest(int sock, auto message = CreatePlasmaCreateRequest( fbb, fbb.CreateString(object_id.binary()), data_size, metadata_size); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaCreateRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaCreateRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -65,7 +65,7 @@ Status SendCreateReply(int sock, CreatePlasmaCreateReply(fbb, fbb.CreateString(object_id.binary()), &plasma_object, (PlasmaError) error_code); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaCreateReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaCreateReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -92,7 +92,7 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char *digest) { auto message = CreatePlasmaSealRequest( fbb, fbb.CreateString(object_id.binary()), digest_string); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaSealRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaSealRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -112,7 +112,7 @@ Status SendSealReply(int sock, ObjectID object_id, int error) { auto message = CreatePlasmaSealReply( fbb, fbb.CreateString(object_id.binary()), (PlasmaError) error); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaSealReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaSealReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -130,7 +130,7 @@ Status SendReleaseRequest(int sock, ObjectID object_id) { auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary())); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaReleaseRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaReleaseRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -146,7 +146,7 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) { auto message = CreatePlasmaReleaseReply( fbb, fbb.CreateString(object_id.binary()), (PlasmaError) error); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaReleaseReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaReleaseReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -164,7 +164,7 @@ Status SendDeleteRequest(int sock, ObjectID object_id) { auto message = CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary())); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaDeleteRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaDeleteRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -180,7 +180,7 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) { auto message = CreatePlasmaDeleteReply( fbb, fbb.CreateString(object_id.binary()), (PlasmaError) error); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaDeleteReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaDeleteReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -198,7 +198,7 @@ Status SendStatusRequest(int sock, ObjectID object_ids[], int64_t num_objects) { auto message = CreatePlasmaStatusRequest( fbb, to_flatbuffer(fbb, object_ids, num_objects)); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaStatusRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaStatusRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -215,14 +215,14 @@ Status ReadStatusRequest(uint8_t *data, Status SendStatusReply(int sock, ObjectID object_ids[], - int object_status[], + ObjectStatus object_status[], int64_t num_objects) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaStatusReply(fbb, to_flatbuffer(fbb, object_ids, num_objects), fbb.CreateVector(object_status, num_objects)); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaStatusReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaStatusReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -254,7 +254,7 @@ Status SendContainsRequest(int sock, ObjectID object_id) { auto message = CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary())); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaContainsRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaContainsRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -270,7 +270,7 @@ Status SendContainsReply(int sock, ObjectID object_id, int has_object) { auto message = CreatePlasmaContainsReply( fbb, fbb.CreateString(object_id.binary()), has_object); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaContainsReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaContainsReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -288,7 +288,7 @@ Status SendConnectRequest(int sock) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaConnectRequest(fbb); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaConnectRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaConnectRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -300,7 +300,7 @@ Status SendConnectReply(int sock, int64_t memory_capacity) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaConnectReply(fbb, memory_capacity); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaConnectReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaConnectReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -317,7 +317,7 @@ Status SendEvictRequest(int sock, int64_t num_bytes) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaEvictRequest(fbb, num_bytes); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaEvictRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaEvictRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -332,7 +332,7 @@ Status SendEvictReply(int sock, int64_t num_bytes) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaEvictReply(fbb, num_bytes); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaEvictReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaEvictReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -353,7 +353,7 @@ Status SendGetRequest(int sock, auto message = CreatePlasmaGetRequest( fbb, to_flatbuffer(fbb, object_ids, num_objects), timeout_ms); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaGetRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaGetRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -387,7 +387,7 @@ Status SendGetReply(int sock, fbb, to_flatbuffer(fbb, object_ids, num_objects), fbb.CreateVectorOfStructs(objects.data(), num_objects)); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaGetReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaGetReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -419,7 +419,7 @@ Status SendFetchRequest(int sock, ObjectID object_ids[], int64_t num_objects) { auto message = CreatePlasmaFetchRequest( fbb, to_flatbuffer(fbb, object_ids, num_objects)); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaFetchRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaFetchRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -453,7 +453,7 @@ Status SendWaitRequest(int sock, CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs), num_ready_objects, timeout_ms); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaWaitRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaWaitRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -494,7 +494,7 @@ Status SendWaitReply(int sock, fbb, fbb.CreateVector(object_replies.data(), num_ready_objects), num_ready_objects); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaWaitReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaWaitReply, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -519,7 +519,7 @@ Status SendSubscribeRequest(int sock) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaSubscribeRequest(fbb); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaSubscribeRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaSubscribeRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -534,7 +534,7 @@ Status SendDataRequest(int sock, auto message = CreatePlasmaDataRequest( fbb, fbb.CreateString(object_id.binary()), addr, port); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaDataRequest, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaDataRequest, fbb.GetSize(), fbb.GetBufferPointer()); } @@ -559,7 +559,7 @@ Status SendDataReply(int sock, auto message = CreatePlasmaDataReply( fbb, fbb.CreateString(object_id.binary()), object_size, metadata_size); fbb.Finish(message); - return WriteMessage(sock, MessageType_PlasmaDataReply, fbb.GetSize(), + return WriteMessage(sock, MessageType::PlasmaDataReply, fbb.GetSize(), fbb.GetBufferPointer()); } diff --git a/src/plasma/test/client_tests.cc b/src/plasma/test/client_tests.cc index b42e3481c..023515a6a 100644 --- a/src/plasma/test/client_tests.cc +++ b/src/plasma/test/client_tests.cc @@ -25,7 +25,7 @@ TEST plasma_status_tests(void) { /* Test for object non-existence. */ int status; ARROW_CHECK_OK(client1.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Nonexistent); + ASSERT(status == static_cast(ObjectStatus::Nonexistent)); /* Test for the object being in local Plasma store. */ /* First create object. */ @@ -40,11 +40,11 @@ TEST plasma_status_tests(void) { */ sleep(1); ARROW_CHECK_OK(client1.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Local); + ASSERT(status == static_cast(ObjectStatus::Local)); /* Test for object being remote. */ ARROW_CHECK_OK(client2.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Remote); + ASSERT(status == static_cast(ObjectStatus::Remote)); ARROW_CHECK_OK(client1.Disconnect()); ARROW_CHECK_OK(client2.Disconnect()); @@ -66,7 +66,7 @@ TEST plasma_fetch_tests(void) { /* No object in the system */ ARROW_CHECK_OK(client1.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Nonexistent); + ASSERT(status == static_cast(ObjectStatus::Nonexistent)); /* Test for the object being in local Plasma store. */ /* First create object. */ @@ -84,24 +84,24 @@ TEST plasma_fetch_tests(void) { ObjectID oid_array1[1] = {oid1}; ARROW_CHECK_OK(client1.Fetch(1, oid_array1)); ARROW_CHECK_OK(client1.Info(oid1, &status)); - ASSERT((status == ObjectStatus_Local) || - (status == ObjectStatus_Nonexistent)); + ASSERT(status == static_cast(ObjectStatus::Local) || + status == static_cast(ObjectStatus::Nonexistent)); /* Sleep to make sure Plasma Manager got the notification. */ sleep(1); ARROW_CHECK_OK(client1.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Local); + ASSERT(status == static_cast(ObjectStatus::Local)); /* Test for object being remote. */ ARROW_CHECK_OK(client2.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Remote); + ASSERT(status == static_cast(ObjectStatus::Remote)); /* Sleep to make sure the object has been fetched and it is now stored in the * local Plasma Store. */ ARROW_CHECK_OK(client2.Fetch(1, oid_array1)); sleep(1); ARROW_CHECK_OK(client2.Info(oid1, &status)); - ASSERT(status == ObjectStatus_Local); + ASSERT(status == static_cast(ObjectStatus::Local)); sleep(1); ARROW_CHECK_OK(client1.Disconnect()); @@ -174,9 +174,9 @@ TEST plasma_wait_for_objects_tests(void) { ObjectRequest obj_requests[NUM_OBJ_REQUEST]; obj_requests[0].object_id = oid1; - obj_requests[0].type = PLASMA_QUERY_ANYWHERE; + obj_requests[0].type = ObjectRequestType::PLASMA_QUERY_ANYWHERE; obj_requests[1].object_id = oid2; - obj_requests[1].type = PLASMA_QUERY_ANYWHERE; + obj_requests[1].type = ObjectRequestType::PLASMA_QUERY_ANYWHERE; struct timeval start, end; gettimeofday(&start, NULL); @@ -216,8 +216,8 @@ TEST plasma_wait_for_objects_tests(void) { WAIT_TIMEOUT_MS, &n)); ASSERT(n == 2); - obj_requests[0].type = PLASMA_QUERY_LOCAL; - obj_requests[1].type = PLASMA_QUERY_LOCAL; + obj_requests[0].type = ObjectRequestType::PLASMA_QUERY_LOCAL; + obj_requests[1].type = ObjectRequestType::PLASMA_QUERY_LOCAL; ARROW_CHECK_OK(client1.Wait(NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS, &n)); ASSERT(n == 1); diff --git a/src/plasma/test/manager_tests.cc b/src/plasma/test/manager_tests.cc index c3cf9fa1f..4bb1eacec 100644 --- a/src/plasma/test/manager_tests.cc +++ b/src/plasma/test/manager_tests.cc @@ -112,8 +112,8 @@ void destroy_plasma_mock(plasma_mock *mock) { * - Buffer a transfer request for the remote manager. * - Start and stop the event loop to make sure that we send the buffered * request. - * - Expect to see a MessageType_PlasmaDataRequest message on the remote manager - * with the correct object ID. + * - Expect to see a MessageType::PlasmaDataRequest message on the remote + * manager with the correct object ID. */ TEST request_transfer_test(void) { plasma_mock *local_mock = init_plasma_mock(NULL); @@ -128,7 +128,7 @@ TEST request_transfer_test(void) { event_loop_run(local_mock->loop); int read_fd = get_client_sock(remote_mock->read_conn); std::vector request_data; - ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType_PlasmaDataRequest, + ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType::PlasmaDataRequest, &request_data)); plasma::ObjectID object_id2; char *address; @@ -152,7 +152,7 @@ TEST request_transfer_test(void) { * - Buffer a transfer request for the remote managers. * - Start and stop the event loop after a timeout to make sure that we * trigger the timeout on the first manager. - * - Expect to see a MessageType_PlasmaDataRequest message on the second remote + * - Expect to see a MessageType::PlasmaDataRequest message on the second remote * manager with the correct object ID. */ TEST request_transfer_retry_test(void) { @@ -180,7 +180,7 @@ TEST request_transfer_retry_test(void) { int read_fd = get_client_sock(remote_mock2->read_conn); std::vector request_data; - ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType_PlasmaDataRequest, + ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType::PlasmaDataRequest, &request_data)); plasma::ObjectID object_id2; char *address; @@ -211,7 +211,7 @@ TEST read_write_object_chunk_test(void) { const int data_size = strlen(data) + 1; const int metadata_size = 0; PlasmaRequestBuffer remote_buf; - remote_buf.type = MessageType_PlasmaDataReply; + remote_buf.type = MessageType::PlasmaDataReply; remote_buf.object_id = object_id; remote_buf.data = (uint8_t *) data; remote_buf.data_size = data_size; diff --git a/thirdparty/scripts/arrow-zero-fill.patch b/thirdparty/scripts/arrow-zero-fill.patch new file mode 100644 index 000000000..77e0b080f --- /dev/null +++ b/thirdparty/scripts/arrow-zero-fill.patch @@ -0,0 +1,30 @@ +diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc +index 34bd600e..3ef32090 100644 +--- a/cpp/src/arrow/memory_pool.cc ++++ b/cpp/src/arrow/memory_pool.cc +@@ -53,6 +53,7 @@ Status AllocateAligned(int64_t size, uint8_t** out) { + ss << "malloc of size " << size << " failed"; + return Status::OutOfMemory(ss.str()); + } ++ memset(*out, 0, size); + #elif defined(ARROW_JEMALLOC) + *out = reinterpret_cast(mallocx( + std::max(static_cast(size), kAlignment), MALLOCX_ALIGN(kAlignment))); +@@ -75,6 +76,7 @@ Status AllocateAligned(int64_t size, uint8_t** out) { + ss << "invalid alignment parameter: " << kAlignment; + return Status::Invalid(ss.str()); + } ++ memset(*out, 0, size); + #endif + return Status::OK(); + } +@@ -124,6 +126,9 @@ class DefaultMemoryPool : public MemoryPool { + DCHECK(out); + // Copy contents and release old memory chunk + memcpy(out, *ptr, static_cast(std::min(new_size, old_size))); ++ if (new_size > old_size) { ++ memset(out + old_size, 0, static_cast(new_size - old_size)); ++ } + #ifdef _MSC_VER + _aligned_free(*ptr); + #else diff --git a/thirdparty/scripts/build_arrow.sh b/thirdparty/scripts/build_arrow.sh index fb6fd67a1..413dfdcce 100755 --- a/thirdparty/scripts/build_arrow.sh +++ b/thirdparty/scripts/build_arrow.sh @@ -68,10 +68,12 @@ if [[ ! -d $TP_DIR/../python/ray/pyarrow_files/pyarrow || \ pushd $TP_DIR/build/arrow git fetch origin master - # The PR for this commit is https://github.com/apache/arrow/pull/2065. We + # The PR for this commit is https://github.com/apache/arrow/pull/2201. We # include the link here to make it easier to find the right commit because # Arrow often rewrites git history and invalidates certain commits. - git checkout ce23c06469de9cf0c3e38e35cdb8d135f341b964 + git checkout d5d39f770047d671e4879369dd680c69afc370c3 + + git apply $TP_DIR/scripts/arrow-zero-fill.patch cd cpp if [ ! -d "build" ]; then