mirror of
https://github.com/vale981/ray
synced 2025-03-08 19:41:38 -05:00
Fix bug with reused file descriptors (#471)
* Fix bug with reused file descriptors * Remove client connection if write_object_chunk fails * Handle ECONNRESET on unsuccessful write * lint * Back to lowercase * fix compilation * fix linting
This commit is contained in:
parent
2bbfc5da8d
commit
e50a23b820
6 changed files with 53 additions and 29 deletions
|
@ -20,7 +20,7 @@ void event_loop_destroy(event_loop *loop) {
|
|||
aeDeleteEventLoop(loop);
|
||||
}
|
||||
|
||||
void event_loop_add_file(event_loop *loop,
|
||||
bool event_loop_add_file(event_loop *loop,
|
||||
int fd,
|
||||
int events,
|
||||
event_loop_file_handler handler,
|
||||
|
@ -30,11 +30,13 @@ void event_loop_add_file(event_loop *loop,
|
|||
/* If it cannot be added, increase the size of the event loop. */
|
||||
if (err == AE_ERR && errno == ERANGE) {
|
||||
err = aeResizeSetSize(loop, 3 * aeGetSetSize(loop) / 2);
|
||||
CHECK(err == AE_OK);
|
||||
if (err != AE_OK) {
|
||||
return false;
|
||||
}
|
||||
err = aeCreateFileEvent(loop, fd, events, handler, context);
|
||||
}
|
||||
/* In any case, test if there were errors. */
|
||||
CHECK(err == AE_OK);
|
||||
return (err == AE_OK);
|
||||
}
|
||||
|
||||
void event_loop_remove_file(event_loop *loop, int fd) {
|
||||
|
|
|
@ -60,7 +60,7 @@ void event_loop_destroy(event_loop *loop);
|
|||
* argument to the handler. Currently there can only be one handler per file.
|
||||
* The events parameter specifies which events we listen to: EVENT_LOOP_READ
|
||||
* or EVENT_LOOP_WRITE. */
|
||||
void event_loop_add_file(event_loop *loop,
|
||||
bool event_loop_add_file(event_loop *loop,
|
||||
int fd,
|
||||
int events,
|
||||
event_loop_file_handler handler,
|
||||
|
|
|
@ -7,16 +7,17 @@
|
|||
|
||||
#include "plasma_protocol.h"
|
||||
|
||||
bool warn_if_sigpipe(int status, int client_sock) {
|
||||
int warn_if_sigpipe(int status, int client_sock) {
|
||||
if (status >= 0) {
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
|
||||
LOG_WARN(
|
||||
"Received SIGPIPE or BAD FILE DESCRIPTOR when sending a message to "
|
||||
"client on fd %d. The client on the other end may have hung up.",
|
||||
"Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
|
||||
"sending a message to client on fd %d. The client on the other end may "
|
||||
"have hung up.",
|
||||
client_sock);
|
||||
return true;
|
||||
return errno;
|
||||
}
|
||||
LOG_FATAL("Failed to write message to client on fd %d.", client_sock);
|
||||
}
|
||||
|
|
|
@ -148,15 +148,16 @@ ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info,
|
|||
* situations where the store writes to a client file descriptor, and the client
|
||||
* may already have disconnected. If we have processed the disconnection and
|
||||
* closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
|
||||
* have not, then we should get a SIGPIPE.
|
||||
* have not, then we should get a SIGPIPE. If we write to a TCP socket that
|
||||
* isn't connected yet, then we should get an ECONNRESET.
|
||||
*
|
||||
* @param status The status to check. If it is less less than zero, we will
|
||||
* print a warning.
|
||||
* @param client_sock The client socket. This is just used to print some extra
|
||||
* information.
|
||||
* @return Void.
|
||||
* @return The errno set.
|
||||
*/
|
||||
bool warn_if_sigpipe(int status, int client_sock);
|
||||
int warn_if_sigpipe(int status, int client_sock);
|
||||
|
||||
uint8_t *create_object_info_buffer(ObjectInfoT *object_info);
|
||||
|
||||
|
|
|
@ -598,7 +598,7 @@ void process_message(event_loop *loop,
|
|||
void *context,
|
||||
int events);
|
||||
|
||||
void write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
|
||||
int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
|
||||
LOG_DEBUG("Writing data to fd %d", conn->fd);
|
||||
ssize_t r, s;
|
||||
/* Try to write one BUFSIZE at a time. */
|
||||
|
@ -608,12 +608,11 @@ void write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
|
|||
r = write(conn->fd, buf->data + conn->cursor, s);
|
||||
|
||||
if (r != s) {
|
||||
LOG_ERROR("write failed, errno was %d", errno);
|
||||
if (r > 0) {
|
||||
LOG_ERROR("partial write on fd %d", conn->fd);
|
||||
} else {
|
||||
/* TODO(swang): This should not be a fatal error, since connections can
|
||||
* close at any time. */
|
||||
LOG_FATAL("write error");
|
||||
return errno;
|
||||
}
|
||||
} else {
|
||||
conn->cursor += r;
|
||||
|
@ -626,6 +625,8 @@ void write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
|
|||
* plasma_get occurred in process_transfer_request. */
|
||||
plasma_release(conn->manager_state->plasma_conn, buf->object_id);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void send_queued_request(event_loop *loop,
|
||||
|
@ -644,10 +645,10 @@ void send_queued_request(event_loop *loop,
|
|||
}
|
||||
|
||||
PlasmaRequestBuffer *buf = conn->transfer_queue;
|
||||
bool sigpipe = false;
|
||||
int err = 0;
|
||||
switch (buf->type) {
|
||||
case MessageType_PlasmaDataRequest:
|
||||
sigpipe = warn_if_sigpipe(
|
||||
err = warn_if_sigpipe(
|
||||
plasma_send_DataRequest(conn->fd, state->builder, buf->object_id,
|
||||
state->addr, state->port),
|
||||
conn->fd);
|
||||
|
@ -657,19 +658,28 @@ void send_queued_request(event_loop *loop,
|
|||
if (conn->cursor == 0) {
|
||||
/* If the cursor is zero, we haven't sent any requests for this object
|
||||
* yet, so send the initial data request. */
|
||||
sigpipe = warn_if_sigpipe(
|
||||
err = warn_if_sigpipe(
|
||||
plasma_send_DataReply(conn->fd, state->builder, buf->object_id,
|
||||
buf->data_size, buf->metadata_size),
|
||||
conn->fd);
|
||||
}
|
||||
write_object_chunk(conn, buf);
|
||||
if (err == 0) {
|
||||
err = write_object_chunk(conn, buf);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG_FATAL("Buffered request has unknown type.");
|
||||
}
|
||||
|
||||
/* If there was a SIGPIPE, stop sending to this manager. */
|
||||
if (sigpipe) {
|
||||
if (err != 0) {
|
||||
/* If there was an ECONNRESET, this means that we haven't finished
|
||||
* connecting to this manager yet. Resend the request when the socket is
|
||||
* ready for a write again. */
|
||||
if (err == ECONNRESET) {
|
||||
return;
|
||||
}
|
||||
event_loop_remove_file(loop, conn->fd);
|
||||
ClientConnection_free(conn);
|
||||
return;
|
||||
}
|
||||
|
@ -827,8 +837,12 @@ void process_transfer_request(event_loop *loop,
|
|||
/* If we already have a connection to this manager and its inactive,
|
||||
* (re)register it with the event loop again. */
|
||||
if (manager_conn->transfer_queue == NULL) {
|
||||
event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
|
||||
send_queued_request, manager_conn);
|
||||
bool success = event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
|
||||
send_queued_request, manager_conn);
|
||||
if (!success) {
|
||||
ClientConnection_free(manager_conn);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
DCHECK(object_buffer.metadata ==
|
||||
|
@ -891,8 +905,11 @@ void process_data_request(event_loop *loop,
|
|||
* other requests. */
|
||||
event_loop_remove_file(loop, client_sock);
|
||||
if (error_code == PlasmaError_OK) {
|
||||
event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk,
|
||||
conn);
|
||||
bool success = event_loop_add_file(loop, client_sock, EVENT_LOOP_READ,
|
||||
process_data_chunk, conn);
|
||||
if (!success) {
|
||||
ClientConnection_free(conn);
|
||||
}
|
||||
} else {
|
||||
/* Since plasma_create() has failed, we ignore the data transfer. We will
|
||||
* receive this transfer in g_ignore_buf and then drop it. Allocate memory
|
||||
|
@ -900,8 +917,11 @@ void process_data_request(event_loop *loop,
|
|||
* buf/g_ignore_buf will be freed in ignore_data_chunkc(). */
|
||||
conn->ignore_buffer = buf;
|
||||
buf->data = (uint8_t *) malloc(buf->data_size + buf->metadata_size);
|
||||
event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, ignore_data_chunk,
|
||||
conn);
|
||||
bool success = event_loop_add_file(loop, client_sock, EVENT_LOOP_READ,
|
||||
ignore_data_chunk, conn);
|
||||
if (!success) {
|
||||
ClientConnection_free(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -255,9 +255,9 @@ int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf);
|
|||
*
|
||||
* @param conn The connection to the client who's receiving the data.
|
||||
* @param buf The buffer to read data from.
|
||||
* @return Void.
|
||||
* @return The errno set, if the write wasn't successful.
|
||||
*/
|
||||
void write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf);
|
||||
int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf);
|
||||
|
||||
/**
|
||||
* Get the event loop of the given plasma manager state.
|
||||
|
|
Loading…
Add table
Reference in a new issue