mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Update apache arrow to include TensorFlow fix (#2345)
This commit is contained in:
parent
4185aaed10
commit
fbde8cad74
7 changed files with 130 additions and 94 deletions
|
@ -101,9 +101,9 @@ void process_status_request(ClientConnection *client_conn, ObjectID object_id);
|
|||
* @param context Client connection.
|
||||
* @return Status of object_id as defined in plasma.h
|
||||
*/
|
||||
int request_status(ObjectID object_id,
|
||||
const std::vector<DBClientID> &manager_vector,
|
||||
void *context);
|
||||
ObjectStatus request_status(ObjectID object_id,
|
||||
const std::vector<DBClientID> &manager_vector,
|
||||
void *context);
|
||||
|
||||
/**
|
||||
* Send requested object_id back to the Plasma Manager identified
|
||||
|
@ -316,12 +316,13 @@ bool ClientConnection_request_finished(ClientConnection *client_conn) {
|
|||
return client_conn->cursor == -1;
|
||||
}
|
||||
|
||||
std::unordered_map<ObjectID, std::vector<WaitRequest *>> &
|
||||
object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) {
|
||||
std::unordered_map<ObjectID, std::vector<WaitRequest *>>
|
||||
&object_wait_requests_from_type(PlasmaManagerState *manager_state,
|
||||
plasma::ObjectRequestType type) {
|
||||
/* We use different types of hash tables for different requests. */
|
||||
RAY_CHECK(type == plasma::PLASMA_QUERY_LOCAL ||
|
||||
type == plasma::PLASMA_QUERY_ANYWHERE);
|
||||
if (type == plasma::PLASMA_QUERY_LOCAL) {
|
||||
RAY_CHECK(type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL ||
|
||||
type == plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE);
|
||||
if (type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL) {
|
||||
return manager_state->object_wait_requests_local;
|
||||
} else {
|
||||
return manager_state->object_wait_requests_remote;
|
||||
|
@ -330,7 +331,7 @@ object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) {
|
|||
|
||||
void add_wait_request_for_object(PlasmaManagerState *manager_state,
|
||||
ObjectID object_id,
|
||||
int type,
|
||||
plasma::ObjectRequestType type,
|
||||
WaitRequest *wait_req) {
|
||||
auto &object_wait_requests =
|
||||
object_wait_requests_from_type(manager_state, type);
|
||||
|
@ -343,7 +344,7 @@ void add_wait_request_for_object(PlasmaManagerState *manager_state,
|
|||
|
||||
void remove_wait_request_for_object(PlasmaManagerState *manager_state,
|
||||
ObjectID object_id,
|
||||
int type,
|
||||
plasma::ObjectRequestType type,
|
||||
WaitRequest *wait_req) {
|
||||
auto &object_wait_requests =
|
||||
object_wait_requests_from_type(manager_state, type);
|
||||
|
@ -392,8 +393,8 @@ void return_from_wait(PlasmaManagerState *manager_state,
|
|||
|
||||
void update_object_wait_requests(PlasmaManagerState *manager_state,
|
||||
ObjectID obj_id,
|
||||
int type,
|
||||
int status) {
|
||||
plasma::ObjectRequestType type,
|
||||
ObjectStatus status) {
|
||||
auto &object_wait_requests =
|
||||
object_wait_requests_from_type(manager_state, type);
|
||||
/* Update the in-progress wait requests in the specified table. */
|
||||
|
@ -417,7 +418,7 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
|
|||
/* Check that we found the object. */
|
||||
RAY_CHECK(object_request != wait_req->object_requests.end());
|
||||
/* Check that the object found was not previously known to us. */
|
||||
RAY_CHECK(object_request->second.status == ObjectStatus_Nonexistent);
|
||||
RAY_CHECK(object_request->second.status == ObjectStatus::Nonexistent);
|
||||
/* Update the found object's status to a known status. */
|
||||
object_request->second.status = status;
|
||||
|
||||
|
@ -608,13 +609,13 @@ void send_queued_request(event_loop *loop,
|
|||
PlasmaRequestBuffer *buf = conn->transfer_queue.front();
|
||||
int err = 0;
|
||||
switch (buf->type) {
|
||||
case MessageType_PlasmaDataRequest:
|
||||
case MessageType::PlasmaDataRequest:
|
||||
err = handle_sigpipe(
|
||||
plasma::SendDataRequest(conn->fd, buf->object_id.to_plasma_id(),
|
||||
state->addr, state->port),
|
||||
conn->fd);
|
||||
break;
|
||||
case MessageType_PlasmaDataReply:
|
||||
case MessageType::PlasmaDataReply:
|
||||
RAY_LOG(DEBUG) << "Transferring object to manager";
|
||||
if (ClientConnection_request_finished(conn)) {
|
||||
/* If the cursor is not set, we haven't sent any requests for this object
|
||||
|
@ -635,7 +636,7 @@ void send_queued_request(event_loop *loop,
|
|||
|
||||
/* If the other side hung up, stop sending to this manager. */
|
||||
if (err != 0) {
|
||||
if (buf->type == MessageType_PlasmaDataReply) {
|
||||
if (buf->type == MessageType::PlasmaDataReply) {
|
||||
/* We errored while sending the object, so release it before removing the
|
||||
* connection. The corresponding call to plasma_get occurred in
|
||||
* process_transfer_request. */
|
||||
|
@ -646,7 +647,7 @@ void send_queued_request(event_loop *loop,
|
|||
ClientConnection_free(conn);
|
||||
} else if (ClientConnection_request_finished(conn)) {
|
||||
/* If we are done with this request, remove it from the transfer queue. */
|
||||
if (buf->type == MessageType_PlasmaDataReply) {
|
||||
if (buf->type == MessageType::PlasmaDataReply) {
|
||||
/* We are done sending the object, so release it. The corresponding call
|
||||
* to plasma_get occurred in process_transfer_request. */
|
||||
ARROW_CHECK_OK(conn->manager_state->plasma_conn->Release(
|
||||
|
@ -827,7 +828,7 @@ void process_transfer_request(event_loop *loop,
|
|||
RAY_CHECK(object_buffer.metadata->data() ==
|
||||
object_buffer.data->data() + object_buffer.data->size());
|
||||
PlasmaRequestBuffer *buf = new PlasmaRequestBuffer();
|
||||
buf->type = MessageType_PlasmaDataReply;
|
||||
buf->type = MessageType::PlasmaDataReply;
|
||||
buf->object_id = obj_id;
|
||||
/* We treat buf->data as a pointer to the concatenated data and metadata, so
|
||||
* we don't actually use buf->metadata. */
|
||||
|
@ -938,7 +939,7 @@ void request_transfer_from(PlasmaManagerState *manager_state,
|
|||
}
|
||||
|
||||
PlasmaRequestBuffer *transfer_request = new PlasmaRequestBuffer();
|
||||
transfer_request->type = MessageType_PlasmaDataRequest;
|
||||
transfer_request->type = MessageType::PlasmaDataRequest;
|
||||
transfer_request->object_id = fetch_req->object_id;
|
||||
|
||||
if (manager_conn->transfer_queue.size() == 0) {
|
||||
|
@ -1085,8 +1086,8 @@ void object_table_subscribe_callback(ObjectID object_id,
|
|||
}
|
||||
/* Run the callback for wait requests. */
|
||||
update_object_wait_requests(manager_state, object_id,
|
||||
plasma::PLASMA_QUERY_ANYWHERE,
|
||||
ObjectStatus_Remote);
|
||||
plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE,
|
||||
ObjectStatus::Remote);
|
||||
}
|
||||
|
||||
void process_fetch_requests(ClientConnection *client_conn,
|
||||
|
@ -1167,7 +1168,7 @@ void process_wait_request(ClientConnection *client_conn,
|
|||
/* Check if this object is already present locally. If so, mark the object
|
||||
* as present. */
|
||||
if (is_object_local(manager_state, obj_id)) {
|
||||
object_request.status = ObjectStatus_Local;
|
||||
object_request.status = ObjectStatus::Local;
|
||||
wait_req->num_satisfied += 1;
|
||||
continue;
|
||||
}
|
||||
|
@ -1176,10 +1177,11 @@ void process_wait_request(ClientConnection *client_conn,
|
|||
add_wait_request_for_object(manager_state, obj_id, object_request.type,
|
||||
wait_req);
|
||||
|
||||
if (object_request.type == plasma::PLASMA_QUERY_LOCAL) {
|
||||
if (object_request.type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL) {
|
||||
/* TODO(rkn): If desired, we could issue a fetch command here to retrieve
|
||||
* the object. */
|
||||
} else if (object_request.type == plasma::PLASMA_QUERY_ANYWHERE) {
|
||||
} else if (object_request.type ==
|
||||
plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE) {
|
||||
/* Add this object ID to the list of object IDs to request notifications
|
||||
* for from the object table. */
|
||||
object_ids_to_request[num_object_ids_to_request] = obj_id;
|
||||
|
@ -1228,34 +1230,35 @@ void request_status_done(ObjectID object_id,
|
|||
const std::vector<DBClientID> &manager_vector,
|
||||
void *context) {
|
||||
ClientConnection *client_conn = (ClientConnection *) context;
|
||||
int status = request_status(object_id, manager_vector, context);
|
||||
int status =
|
||||
static_cast<int>(request_status(object_id, manager_vector, context));
|
||||
plasma::ObjectID object_id_copy = object_id.to_plasma_id();
|
||||
handle_sigpipe(
|
||||
plasma::SendStatusReply(client_conn->fd, &object_id_copy, &status, 1),
|
||||
client_conn->fd);
|
||||
}
|
||||
|
||||
int request_status(ObjectID object_id,
|
||||
const std::vector<DBClientID> &manager_vector,
|
||||
void *context) {
|
||||
ObjectStatus request_status(ObjectID object_id,
|
||||
const std::vector<DBClientID> &manager_vector,
|
||||
void *context) {
|
||||
ClientConnection *client_conn = (ClientConnection *) context;
|
||||
|
||||
/* Return success immediately if we already have this object. */
|
||||
if (is_object_local(client_conn->manager_state, object_id)) {
|
||||
return ObjectStatus_Local;
|
||||
return ObjectStatus::Local;
|
||||
}
|
||||
|
||||
/* Since object is not stored at the local locally, manager_vector.size() > 0
|
||||
* means that the object is stored at another remote object. Otherwise, if
|
||||
* manager_vector.size() == 0, the object is not stored anywhere. */
|
||||
return (manager_vector.size() > 0 ? ObjectStatus_Remote
|
||||
: ObjectStatus_Nonexistent);
|
||||
return manager_vector.size() > 0 ? ObjectStatus::Remote
|
||||
: ObjectStatus::Nonexistent;
|
||||
}
|
||||
|
||||
void object_table_lookup_fail_callback(ObjectID object_id,
|
||||
void *user_context,
|
||||
void *user_data) {
|
||||
/* Fail for now. Later, we may want to send a ObjectStatus_Nonexistent to the
|
||||
/* Fail for now. Later, we may want to send a ObjectStatus::Nonexistent to the
|
||||
* client. */
|
||||
RAY_CHECK(0);
|
||||
}
|
||||
|
@ -1264,7 +1267,7 @@ void process_status_request(ClientConnection *client_conn,
|
|||
plasma::ObjectID object_id) {
|
||||
/* Return success immediately if we already have this object. */
|
||||
if (is_object_local(client_conn->manager_state, object_id)) {
|
||||
int status = ObjectStatus_Local;
|
||||
int status = static_cast<int>(ObjectStatus::Local);
|
||||
handle_sigpipe(
|
||||
plasma::SendStatusReply(client_conn->fd, &object_id, &status, 1),
|
||||
client_conn->fd);
|
||||
|
@ -1272,7 +1275,7 @@ void process_status_request(ClientConnection *client_conn,
|
|||
}
|
||||
|
||||
if (client_conn->manager_state->db == NULL) {
|
||||
int status = ObjectStatus_Nonexistent;
|
||||
auto status = static_cast<int>(ObjectStatus::Nonexistent);
|
||||
handle_sigpipe(
|
||||
plasma::SendStatusReply(client_conn->fd, &object_id, &status, 1),
|
||||
client_conn->fd);
|
||||
|
@ -1369,10 +1372,12 @@ void process_add_object_notification(PlasmaManagerState *state,
|
|||
}
|
||||
|
||||
/* Update the in-progress local and remote wait requests. */
|
||||
update_object_wait_requests(state, object_id, plasma::PLASMA_QUERY_LOCAL,
|
||||
ObjectStatus_Local);
|
||||
update_object_wait_requests(state, object_id, plasma::PLASMA_QUERY_ANYWHERE,
|
||||
ObjectStatus_Local);
|
||||
update_object_wait_requests(state, object_id,
|
||||
plasma::ObjectRequestType::PLASMA_QUERY_LOCAL,
|
||||
ObjectStatus::Local);
|
||||
update_object_wait_requests(state, object_id,
|
||||
plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE,
|
||||
ObjectStatus::Local);
|
||||
}
|
||||
|
||||
void process_object_notification(event_loop *loop,
|
||||
|
@ -1473,8 +1478,8 @@ void process_message(event_loop *loop,
|
|||
uint8_t *data;
|
||||
read_message(client_sock, &type, &length, &data);
|
||||
|
||||
switch (type) {
|
||||
case MessageType_PlasmaDataRequest: {
|
||||
switch (static_cast<MessageType>(type)) {
|
||||
case MessageType::PlasmaDataRequest: {
|
||||
RAY_LOG(DEBUG) << "Processing data request";
|
||||
plasma::ObjectID object_id;
|
||||
char *address;
|
||||
|
@ -1484,7 +1489,7 @@ void process_message(event_loop *loop,
|
|||
process_transfer_request(loop, object_id, address, port, conn);
|
||||
free(address);
|
||||
} break;
|
||||
case MessageType_PlasmaDataReply: {
|
||||
case MessageType::PlasmaDataReply: {
|
||||
RAY_LOG(DEBUG) << "Processing data reply";
|
||||
plasma::ObjectID object_id;
|
||||
int64_t object_size;
|
||||
|
@ -1494,7 +1499,7 @@ void process_message(event_loop *loop,
|
|||
process_data_request(loop, client_sock, object_id, object_size,
|
||||
metadata_size, conn);
|
||||
} break;
|
||||
case MessageType_PlasmaFetchRequest: {
|
||||
case MessageType::PlasmaFetchRequest: {
|
||||
RAY_LOG(DEBUG) << "Processing fetch remote";
|
||||
std::vector<plasma::ObjectID> object_ids_to_fetch;
|
||||
/* TODO(pcm): process_fetch_requests allocates an array of num_objects
|
||||
|
@ -1503,7 +1508,7 @@ void process_message(event_loop *loop,
|
|||
process_fetch_requests(conn, object_ids_to_fetch.size(),
|
||||
object_ids_to_fetch.data());
|
||||
} break;
|
||||
case MessageType_PlasmaWaitRequest: {
|
||||
case MessageType::PlasmaWaitRequest: {
|
||||
RAY_LOG(DEBUG) << "Processing wait";
|
||||
plasma::ObjectRequestMap object_requests;
|
||||
int64_t timeout_ms;
|
||||
|
@ -1513,13 +1518,13 @@ void process_message(event_loop *loop,
|
|||
process_wait_request(conn, std::move(object_requests), timeout_ms,
|
||||
num_ready_objects);
|
||||
} break;
|
||||
case MessageType_PlasmaStatusRequest: {
|
||||
case MessageType::PlasmaStatusRequest: {
|
||||
RAY_LOG(DEBUG) << "Processing status";
|
||||
plasma::ObjectID object_id;
|
||||
ARROW_CHECK_OK(plasma::ReadStatusRequest(data, length, &object_id, 1));
|
||||
process_status_request(conn, object_id);
|
||||
} break;
|
||||
case static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT): {
|
||||
case static_cast<MessageType>(CommonMessageType::DISCONNECT_CLIENT): {
|
||||
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock;
|
||||
event_loop_remove_file(loop, client_sock);
|
||||
ClientConnection_free(conn);
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
#ifndef PLASMA_MANAGER_H
|
||||
#define PLASMA_MANAGER_H
|
||||
|
||||
#include <poll.h>
|
||||
|
||||
#ifndef RAY_NUM_RETRIES
|
||||
#define NUM_RETRIES 5
|
||||
#else
|
||||
|
@ -11,6 +9,7 @@
|
|||
|
||||
typedef struct PlasmaManagerState PlasmaManagerState;
|
||||
typedef struct ClientConnection ClientConnection;
|
||||
enum class MessageType : int64_t;
|
||||
|
||||
/**
|
||||
* Initializes the plasma manager state. This connects the manager to the local
|
||||
|
@ -156,7 +155,7 @@ ClientConnection *ClientConnection_listen(event_loop *loop,
|
|||
|
||||
/* Buffer for requests between plasma managers. */
|
||||
typedef struct PlasmaRequestBuffer {
|
||||
int type;
|
||||
MessageType type;
|
||||
ray::ObjectID object_id;
|
||||
uint8_t *data;
|
||||
int64_t data_size;
|
||||
|
|
|
@ -37,7 +37,7 @@ Status SendCreateRequest(int sock,
|
|||
auto message = CreatePlasmaCreateRequest(
|
||||
fbb, fbb.CreateString(object_id.binary()), data_size, metadata_size);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaCreateRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaCreateRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ Status SendCreateReply(int sock,
|
|||
CreatePlasmaCreateReply(fbb, fbb.CreateString(object_id.binary()),
|
||||
&plasma_object, (PlasmaError) error_code);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaCreateReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaCreateReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,7 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char *digest) {
|
|||
auto message = CreatePlasmaSealRequest(
|
||||
fbb, fbb.CreateString(object_id.binary()), digest_string);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaSealRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaSealRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -112,7 +112,7 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
|
|||
auto message = CreatePlasmaSealReply(
|
||||
fbb, fbb.CreateString(object_id.binary()), (PlasmaError) error);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaSealReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaSealReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -130,7 +130,7 @@ Status SendReleaseRequest(int sock, ObjectID object_id) {
|
|||
auto message =
|
||||
CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaReleaseRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaReleaseRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -146,7 +146,7 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
|
|||
auto message = CreatePlasmaReleaseReply(
|
||||
fbb, fbb.CreateString(object_id.binary()), (PlasmaError) error);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaReleaseReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaReleaseReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -164,7 +164,7 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
|
|||
auto message =
|
||||
CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary()));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaDeleteRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaDeleteRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
|
|||
auto message = CreatePlasmaDeleteReply(
|
||||
fbb, fbb.CreateString(object_id.binary()), (PlasmaError) error);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaDeleteReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaDeleteReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -198,7 +198,7 @@ Status SendStatusRequest(int sock, ObjectID object_ids[], int64_t num_objects) {
|
|||
auto message = CreatePlasmaStatusRequest(
|
||||
fbb, to_flatbuffer(fbb, object_ids, num_objects));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaStatusRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaStatusRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -215,14 +215,14 @@ Status ReadStatusRequest(uint8_t *data,
|
|||
|
||||
Status SendStatusReply(int sock,
|
||||
ObjectID object_ids[],
|
||||
int object_status[],
|
||||
ObjectStatus object_status[],
|
||||
int64_t num_objects) {
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message =
|
||||
CreatePlasmaStatusReply(fbb, to_flatbuffer(fbb, object_ids, num_objects),
|
||||
fbb.CreateVector(object_status, num_objects));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaStatusReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaStatusReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -254,7 +254,7 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
|
|||
auto message =
|
||||
CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary()));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaContainsRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaContainsRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -270,7 +270,7 @@ Status SendContainsReply(int sock, ObjectID object_id, int has_object) {
|
|||
auto message = CreatePlasmaContainsReply(
|
||||
fbb, fbb.CreateString(object_id.binary()), has_object);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaContainsReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaContainsReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -288,7 +288,7 @@ Status SendConnectRequest(int sock) {
|
|||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = CreatePlasmaConnectRequest(fbb);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaConnectRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaConnectRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -300,7 +300,7 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
|
|||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = CreatePlasmaConnectReply(fbb, memory_capacity);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaConnectReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaConnectReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -317,7 +317,7 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
|
|||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = CreatePlasmaEvictRequest(fbb, num_bytes);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaEvictRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaEvictRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -332,7 +332,7 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
|
|||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = CreatePlasmaEvictReply(fbb, num_bytes);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaEvictReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaEvictReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -353,7 +353,7 @@ Status SendGetRequest(int sock,
|
|||
auto message = CreatePlasmaGetRequest(
|
||||
fbb, to_flatbuffer(fbb, object_ids, num_objects), timeout_ms);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaGetRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaGetRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -387,7 +387,7 @@ Status SendGetReply(int sock,
|
|||
fbb, to_flatbuffer(fbb, object_ids, num_objects),
|
||||
fbb.CreateVectorOfStructs(objects.data(), num_objects));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaGetReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaGetReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -419,7 +419,7 @@ Status SendFetchRequest(int sock, ObjectID object_ids[], int64_t num_objects) {
|
|||
auto message = CreatePlasmaFetchRequest(
|
||||
fbb, to_flatbuffer(fbb, object_ids, num_objects));
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaFetchRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaFetchRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -453,7 +453,7 @@ Status SendWaitRequest(int sock,
|
|||
CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs),
|
||||
num_ready_objects, timeout_ms);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaWaitRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaWaitRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -494,7 +494,7 @@ Status SendWaitReply(int sock,
|
|||
fbb, fbb.CreateVector(object_replies.data(), num_ready_objects),
|
||||
num_ready_objects);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaWaitReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaWaitReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -519,7 +519,7 @@ Status SendSubscribeRequest(int sock) {
|
|||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = CreatePlasmaSubscribeRequest(fbb);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaSubscribeRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaSubscribeRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -534,7 +534,7 @@ Status SendDataRequest(int sock,
|
|||
auto message = CreatePlasmaDataRequest(
|
||||
fbb, fbb.CreateString(object_id.binary()), addr, port);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaDataRequest, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaDataRequest, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
@ -559,7 +559,7 @@ Status SendDataReply(int sock,
|
|||
auto message = CreatePlasmaDataReply(
|
||||
fbb, fbb.CreateString(object_id.binary()), object_size, metadata_size);
|
||||
fbb.Finish(message);
|
||||
return WriteMessage(sock, MessageType_PlasmaDataReply, fbb.GetSize(),
|
||||
return WriteMessage(sock, MessageType::PlasmaDataReply, fbb.GetSize(),
|
||||
fbb.GetBufferPointer());
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ TEST plasma_status_tests(void) {
|
|||
/* Test for object non-existence. */
|
||||
int status;
|
||||
ARROW_CHECK_OK(client1.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Nonexistent);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Nonexistent));
|
||||
|
||||
/* Test for the object being in local Plasma store. */
|
||||
/* First create object. */
|
||||
|
@ -40,11 +40,11 @@ TEST plasma_status_tests(void) {
|
|||
*/
|
||||
sleep(1);
|
||||
ARROW_CHECK_OK(client1.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Local);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Local));
|
||||
|
||||
/* Test for object being remote. */
|
||||
ARROW_CHECK_OK(client2.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Remote);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Remote));
|
||||
|
||||
ARROW_CHECK_OK(client1.Disconnect());
|
||||
ARROW_CHECK_OK(client2.Disconnect());
|
||||
|
@ -66,7 +66,7 @@ TEST plasma_fetch_tests(void) {
|
|||
|
||||
/* No object in the system */
|
||||
ARROW_CHECK_OK(client1.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Nonexistent);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Nonexistent));
|
||||
|
||||
/* Test for the object being in local Plasma store. */
|
||||
/* First create object. */
|
||||
|
@ -84,24 +84,24 @@ TEST plasma_fetch_tests(void) {
|
|||
ObjectID oid_array1[1] = {oid1};
|
||||
ARROW_CHECK_OK(client1.Fetch(1, oid_array1));
|
||||
ARROW_CHECK_OK(client1.Info(oid1, &status));
|
||||
ASSERT((status == ObjectStatus_Local) ||
|
||||
(status == ObjectStatus_Nonexistent));
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Local) ||
|
||||
status == static_cast<int>(ObjectStatus::Nonexistent));
|
||||
|
||||
/* Sleep to make sure Plasma Manager got the notification. */
|
||||
sleep(1);
|
||||
ARROW_CHECK_OK(client1.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Local);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Local));
|
||||
|
||||
/* Test for object being remote. */
|
||||
ARROW_CHECK_OK(client2.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Remote);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Remote));
|
||||
|
||||
/* Sleep to make sure the object has been fetched and it is now stored in the
|
||||
* local Plasma Store. */
|
||||
ARROW_CHECK_OK(client2.Fetch(1, oid_array1));
|
||||
sleep(1);
|
||||
ARROW_CHECK_OK(client2.Info(oid1, &status));
|
||||
ASSERT(status == ObjectStatus_Local);
|
||||
ASSERT(status == static_cast<int>(ObjectStatus::Local));
|
||||
|
||||
sleep(1);
|
||||
ARROW_CHECK_OK(client1.Disconnect());
|
||||
|
@ -174,9 +174,9 @@ TEST plasma_wait_for_objects_tests(void) {
|
|||
ObjectRequest obj_requests[NUM_OBJ_REQUEST];
|
||||
|
||||
obj_requests[0].object_id = oid1;
|
||||
obj_requests[0].type = PLASMA_QUERY_ANYWHERE;
|
||||
obj_requests[0].type = ObjectRequestType::PLASMA_QUERY_ANYWHERE;
|
||||
obj_requests[1].object_id = oid2;
|
||||
obj_requests[1].type = PLASMA_QUERY_ANYWHERE;
|
||||
obj_requests[1].type = ObjectRequestType::PLASMA_QUERY_ANYWHERE;
|
||||
|
||||
struct timeval start, end;
|
||||
gettimeofday(&start, NULL);
|
||||
|
@ -216,8 +216,8 @@ TEST plasma_wait_for_objects_tests(void) {
|
|||
WAIT_TIMEOUT_MS, &n));
|
||||
ASSERT(n == 2);
|
||||
|
||||
obj_requests[0].type = PLASMA_QUERY_LOCAL;
|
||||
obj_requests[1].type = PLASMA_QUERY_LOCAL;
|
||||
obj_requests[0].type = ObjectRequestType::PLASMA_QUERY_LOCAL;
|
||||
obj_requests[1].type = ObjectRequestType::PLASMA_QUERY_LOCAL;
|
||||
ARROW_CHECK_OK(client1.Wait(NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST,
|
||||
WAIT_TIMEOUT_MS, &n));
|
||||
ASSERT(n == 1);
|
||||
|
|
|
@ -112,8 +112,8 @@ void destroy_plasma_mock(plasma_mock *mock) {
|
|||
* - Buffer a transfer request for the remote manager.
|
||||
* - Start and stop the event loop to make sure that we send the buffered
|
||||
* request.
|
||||
* - Expect to see a MessageType_PlasmaDataRequest message on the remote manager
|
||||
* with the correct object ID.
|
||||
* - Expect to see a MessageType::PlasmaDataRequest message on the remote
|
||||
* manager with the correct object ID.
|
||||
*/
|
||||
TEST request_transfer_test(void) {
|
||||
plasma_mock *local_mock = init_plasma_mock(NULL);
|
||||
|
@ -128,7 +128,7 @@ TEST request_transfer_test(void) {
|
|||
event_loop_run(local_mock->loop);
|
||||
int read_fd = get_client_sock(remote_mock->read_conn);
|
||||
std::vector<uint8_t> request_data;
|
||||
ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType_PlasmaDataRequest,
|
||||
ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType::PlasmaDataRequest,
|
||||
&request_data));
|
||||
plasma::ObjectID object_id2;
|
||||
char *address;
|
||||
|
@ -152,7 +152,7 @@ TEST request_transfer_test(void) {
|
|||
* - Buffer a transfer request for the remote managers.
|
||||
* - Start and stop the event loop after a timeout to make sure that we
|
||||
* trigger the timeout on the first manager.
|
||||
* - Expect to see a MessageType_PlasmaDataRequest message on the second remote
|
||||
* - Expect to see a MessageType::PlasmaDataRequest message on the second remote
|
||||
* manager with the correct object ID.
|
||||
*/
|
||||
TEST request_transfer_retry_test(void) {
|
||||
|
@ -180,7 +180,7 @@ TEST request_transfer_retry_test(void) {
|
|||
|
||||
int read_fd = get_client_sock(remote_mock2->read_conn);
|
||||
std::vector<uint8_t> request_data;
|
||||
ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType_PlasmaDataRequest,
|
||||
ARROW_CHECK_OK(plasma::PlasmaReceive(read_fd, MessageType::PlasmaDataRequest,
|
||||
&request_data));
|
||||
plasma::ObjectID object_id2;
|
||||
char *address;
|
||||
|
@ -211,7 +211,7 @@ TEST read_write_object_chunk_test(void) {
|
|||
const int data_size = strlen(data) + 1;
|
||||
const int metadata_size = 0;
|
||||
PlasmaRequestBuffer remote_buf;
|
||||
remote_buf.type = MessageType_PlasmaDataReply;
|
||||
remote_buf.type = MessageType::PlasmaDataReply;
|
||||
remote_buf.object_id = object_id;
|
||||
remote_buf.data = (uint8_t *) data;
|
||||
remote_buf.data_size = data_size;
|
||||
|
|
30
thirdparty/scripts/arrow-zero-fill.patch
vendored
Normal file
30
thirdparty/scripts/arrow-zero-fill.patch
vendored
Normal file
|
@ -0,0 +1,30 @@
|
|||
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
|
||||
index 34bd600e..3ef32090 100644
|
||||
--- a/cpp/src/arrow/memory_pool.cc
|
||||
+++ b/cpp/src/arrow/memory_pool.cc
|
||||
@@ -53,6 +53,7 @@ Status AllocateAligned(int64_t size, uint8_t** out) {
|
||||
ss << "malloc of size " << size << " failed";
|
||||
return Status::OutOfMemory(ss.str());
|
||||
}
|
||||
+ memset(*out, 0, size);
|
||||
#elif defined(ARROW_JEMALLOC)
|
||||
*out = reinterpret_cast<uint8_t*>(mallocx(
|
||||
std::max(static_cast<size_t>(size), kAlignment), MALLOCX_ALIGN(kAlignment)));
|
||||
@@ -75,6 +76,7 @@ Status AllocateAligned(int64_t size, uint8_t** out) {
|
||||
ss << "invalid alignment parameter: " << kAlignment;
|
||||
return Status::Invalid(ss.str());
|
||||
}
|
||||
+ memset(*out, 0, size);
|
||||
#endif
|
||||
return Status::OK();
|
||||
}
|
||||
@@ -124,6 +126,9 @@ class DefaultMemoryPool : public MemoryPool {
|
||||
DCHECK(out);
|
||||
// Copy contents and release old memory chunk
|
||||
memcpy(out, *ptr, static_cast<size_t>(std::min(new_size, old_size)));
|
||||
+ if (new_size > old_size) {
|
||||
+ memset(out + old_size, 0, static_cast<size_t>(new_size - old_size));
|
||||
+ }
|
||||
#ifdef _MSC_VER
|
||||
_aligned_free(*ptr);
|
||||
#else
|
6
thirdparty/scripts/build_arrow.sh
vendored
6
thirdparty/scripts/build_arrow.sh
vendored
|
@ -68,10 +68,12 @@ if [[ ! -d $TP_DIR/../python/ray/pyarrow_files/pyarrow || \
|
|||
|
||||
pushd $TP_DIR/build/arrow
|
||||
git fetch origin master
|
||||
# The PR for this commit is https://github.com/apache/arrow/pull/2065. We
|
||||
# The PR for this commit is https://github.com/apache/arrow/pull/2201. We
|
||||
# include the link here to make it easier to find the right commit because
|
||||
# Arrow often rewrites git history and invalidates certain commits.
|
||||
git checkout ce23c06469de9cf0c3e38e35cdb8d135f341b964
|
||||
git checkout d5d39f770047d671e4879369dd680c69afc370c3
|
||||
|
||||
git apply $TP_DIR/scripts/arrow-zero-fill.patch
|
||||
|
||||
cd cpp
|
||||
if [ ! -d "build" ]; then
|
||||
|
|
Loading…
Add table
Reference in a new issue