Suppress object transfer requests when object is already being received. (#1430)

* added deterministic check for objects received in fetch_timeout_handler.

* use receive time, in case something goes wrong after object is received.

* increase timeout for removal.

* indentation fix.

* make log info log debug. clean up debug log.

* undo unecessary changes.

* changed description var.

* shorten line 949.

* incorporate feedback.

* linting; make is_object_received function consts.

* change semantics of received_objects to objects being received.
added checks to both points at which objects are re-requested.
updated object receive initialization accordingly.

* eliminate erase on receive init. check call to request_transfer_from instead of request_transfer.

* updated comments.

* added todo for multiple object transfers.

* linting.
This commit is contained in:
Melih Elibol 2018-02-01 22:45:31 -08:00 committed by Robert Nishihara
parent ed77a4c415
commit d8850eac4b
2 changed files with 51 additions and 4 deletions

1
.gitignore vendored
View file

@ -131,6 +131,7 @@ build
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
.idea
# Website
/site/Gemfile.lock

View file

@ -238,6 +238,16 @@ struct PlasmaManagerState {
/** The time (in milliseconds since the Unix epoch) when the most recent
* heartbeat was sent. */
int64_t previous_heartbeat_time;
/** This is the set of ObjectIDs currently being transferred to this manager.
* An ObjectID is added to this set if a shared buffer is
* successfully created for the corresponding object.
* The ObjectID is removed in process_add_object_notification, which is
* triggered by the corresponding notification from the plasma store.
* If an object transfer fails, only the ObjectID of the corresponding
* object is removed. If object transfers between managers is parallelized,
* then all objects being received from a remote manager will need to be
* removed if the connection to the remote manager fails. */
std::unordered_set<ObjectID, UniqueIDHasher> receives_in_progress;
};
PlasmaManagerState *g_manager_state = NULL;
@ -534,6 +544,12 @@ void PlasmaManagerState_free(PlasmaManagerState *state) {
delete state;
}
bool is_receiving_or_received(const PlasmaManagerState *state,
const ObjectID &object_id) {
return state->local_available_objects.count(object_id) > 0 ||
state->receives_in_progress.count(object_id) > 0;
}
event_loop *get_event_loop(PlasmaManagerState *state) {
return state->loop;
}
@ -546,7 +562,6 @@ void process_message(event_loop *loop,
int events);
int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
LOG_DEBUG("Writing data to fd %d", conn->fd);
ssize_t r, s;
/* Try to write one buf_size at a time. */
s = buf->data_size + buf->metadata_size - conn->cursor;
@ -642,8 +657,6 @@ void send_queued_request(event_loop *loop,
}
int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
LOG_DEBUG("Reading data from fd %d to %p", conn->fd,
buf->data + conn->cursor);
ssize_t r, s;
CHECK(buf != NULL);
/* Try to read one buf_size at a time. */
@ -680,6 +693,11 @@ void process_data_chunk(event_loop *loop,
int err = read_object_chunk(conn, buf);
auto plasma_conn = conn->manager_state->plasma_conn;
if (err != 0) {
// Remove the object from the receives_in_progress set so that
// retries are processed.
// TODO(hme): Remove all ObjectIDs associated with this manager if we
// allow parallel object transfers.
conn->manager_state->receives_in_progress.erase(buf->object_id);
/* Abort the object that we were trying to read from the remote plasma
* manager. */
ARROW_CHECK_OK(plasma_conn->Release(buf->object_id.to_plasma_id()));
@ -865,6 +883,14 @@ void process_data_request(event_loop *loop,
event_loop_remove_file(loop, client_sock);
event_loop_file_handler data_chunk_handler;
if (s.ok()) {
// Monitor objects that are in progress of being received.
// If a read fails while receiving this object, its
// ObjectID will be removed. If the object is successfully
// received, its ObjectID is removed by process_add_object_notification.
// If a shared buffer for the object cannot be created,
// then the receive is ignored, and the corresponding ObjectID
// is not inserted into receives_in_progress.
conn->manager_state->receives_in_progress.insert(object_id);
buf->data = data->mutable_data();
data_chunk_handler = process_data_chunk;
} else {
@ -946,6 +972,15 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
it != manager_state->fetch_requests.end(); it++) {
FetchRequest *fetch_req = it->second;
if (fetch_req->manager_vector.size() > 0) {
if (is_receiving_or_received(manager_state, fetch_req->object_id)) {
// Do nothing if the object transfer is in progress or if the object
// has already been received.
LOG_DEBUG("fetch_timeout_handler: Object in progress or received. %s",
fetch_req->object_id.hex().c_str());
continue;
}
LOG_DEBUG("fetch_timeout_handler: Object missing. %s",
fetch_req->object_id.hex().c_str());
request_transfer_from(manager_state, fetch_req);
/* If we've tried all of the managers that we know about for this object,
* add this object to the list to resend requests for. */
@ -1005,7 +1040,12 @@ void request_transfer(ObjectID object_id,
fetch_req->next_manager = 0;
/* Wait for the object data for the default number of retries, which timeout
* after a default interval. */
request_transfer_from(manager_state, fetch_req);
if (!is_receiving_or_received(manager_state, object_id)) {
// Request object if it's not already being received,
// or if it has not already been received.
request_transfer_from(manager_state, fetch_req);
}
}
/* This method is only called from the tests. */
@ -1037,6 +1077,7 @@ void object_table_subscribe_callback(ObjectID object_id,
db_client_table_get_ip_addresses(manager_state->db, manager_ids);
/* Run the callback for fetch requests if there is a fetch request. */
auto it = manager_state->fetch_requests.find(object_id);
if (it != manager_state->fetch_requests.end()) {
request_transfer(object_id, managers, context);
}
@ -1315,6 +1356,11 @@ void process_add_object_notification(PlasmaManagerState *state,
int64_t metadata_size,
unsigned char *digest) {
state->local_available_objects.insert(object_id);
if (state->receives_in_progress.count(object_id) > 0) {
// This object is now locally available, so remove it from the
// receives_in_progress set.
state->receives_in_progress.erase(object_id);
}
/* Add this object to the (redis) object table. */
if (state->db) {