diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index dc582a0c4..5493c58c6 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -421,12 +421,11 @@ class Worker(object): size = size + 4096 * 4 + 8 # The last 8 bytes are for the metadata offset. This is temporary. try: buff = self.plasma_client.create(objectid.id(), size, bytearray(schema)) - except RuntimeError as e: - if e.args != ("an object with this ID could not be created",): - raise + except plasma.plasma_object_exists_error as e: # The object already exists in the object store, so there is no need to # add it again. TODO(rkn): We need to compare the hashes and make sure - # that the objects are in fact the same. + # that the objects are in fact the same. We also should return an error + # code to the caller instead of printing a message. print("This object already exists in the object store.") return data = np.frombuffer(buff.buffer, dtype="byte")[8:] diff --git a/src/plasma/eviction_policy.c b/src/plasma/eviction_policy.c index deb8e4043..ec9af2c1f 100644 --- a/src/plasma/eviction_policy.c +++ b/src/plasma/eviction_policy.c @@ -154,7 +154,7 @@ void object_created(eviction_state *eviction_state, add_object_to_lru_cache(eviction_state, obj_id); } -void require_space(eviction_state *eviction_state, +bool require_space(eviction_state *eviction_state, plasma_store_info *plasma_store_info, int64_t size, int64_t *num_objects_to_evict, @@ -162,13 +162,14 @@ void require_space(eviction_state *eviction_state, /* Check if there is enough space to create the object. */ int64_t required_space = eviction_state->memory_used + size - plasma_store_info->memory_capacity; + int64_t num_bytes_evicted; if (required_space > 0) { /* Try to free up at least as much space as we need right now but ideally * up to 20% of the total capacity. 
*/ int64_t space_to_free = MAX(size, plasma_store_info->memory_capacity / 5); LOG_DEBUG("not enough space to create this object, so evicting objects"); /* Choose some objects to evict, and update the return pointers. */ - int64_t num_bytes_evicted = choose_objects_to_evict( + num_bytes_evicted = choose_objects_to_evict( eviction_state, plasma_store_info, space_to_free, num_objects_to_evict, objects_to_evict); printf("Evicted %" PRId64 " bytes.\n", num_bytes_evicted); @@ -176,12 +177,17 @@ void require_space(eviction_state *eviction_state, "There is not enough space to create this object, so evicting " "%" PRId64 " objects to free up %" PRId64 " bytes.\n", *num_objects_to_evict, num_bytes_evicted); - CHECK(num_bytes_evicted >= required_space); } else { + num_bytes_evicted = 0; *num_objects_to_evict = 0; *objects_to_evict = NULL; } - eviction_state->memory_used += size; + if (num_bytes_evicted >= required_space) { + /* We only increment the space used if there is enough space to create the + * object. */ + eviction_state->memory_used += size; + } + return num_bytes_evicted >= required_space; } void begin_object_access(eviction_state *eviction_state, diff --git a/src/plasma/eviction_policy.h b/src/plasma/eviction_policy.h index cc6497271..09e17e768 100644 --- a/src/plasma/eviction_policy.h +++ b/src/plasma/eviction_policy.h @@ -63,9 +63,9 @@ void object_created(eviction_state *eviction_state, * stored at this address. If the number of objects chosen is greater * than 0, then the caller needs to free that array. If it equals 0, then * the array will be NULL. - * @return Void. + * @return True if enough space can be freed and false otherwise. 
*/ -void require_space(eviction_state *eviction_state, +bool require_space(eviction_state *eviction_state, plasma_store_info *plasma_store_info, int64_t size, int64_t *num_objects_to_evict, diff --git a/src/plasma/format/plasma.fbs b/src/plasma/format/plasma.fbs index 1e7120550..91ff13f29 100644 --- a/src/plasma/format/plasma.fbs +++ b/src/plasma/format/plasma.fbs @@ -51,7 +51,9 @@ enum PlasmaError:int { // Trying to create an object that already exists. ObjectExists, // Trying to access an object that doesn't exist. - ObjectNonexistent + ObjectNonexistent, + // Trying to create an object but there isn't enough space in the store. + OutOfMemory } // Plasma store messages diff --git a/src/plasma/plasma/plasma.py b/src/plasma/plasma/plasma.py index f65b088cc..1dfbe9557 100644 --- a/src/plasma/plasma/plasma.py +++ b/src/plasma/plasma/plasma.py @@ -9,6 +9,8 @@ import sys import time from . import libplasma +from .libplasma import plasma_object_exists_error +from .libplasma import plasma_out_of_memory_error PLASMA_ID_SIZE = 20 PLASMA_WAIT_TIMEOUT = 2 ** 30 @@ -128,7 +130,12 @@ class PlasmaClient(object): wishes to encode. Raises: - Exception: An exception is raised if the object could not be created. + plasma_object_exists_error: This exception is raised if the object could + not be created because there already is an object with the same ID in + the plasma store. + plasma_out_of_memory_error: This exception is raised if the object could + not be created because the plasma store is unable to evict enough + objects to create room for it. """ # Turn the metadata into the right type. 
metadata = bytearray(b"") if metadata is None else metadata diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index ada0d7df0..04268a346 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -216,12 +216,12 @@ void increment_object_count(plasma_connection *conn, object_entry->count += 1; } -bool plasma_create(plasma_connection *conn, - object_id obj_id, - int64_t data_size, - uint8_t *metadata, - int64_t metadata_size, - uint8_t **data) { +int plasma_create(plasma_connection *conn, + object_id obj_id, + int64_t data_size, + uint8_t *metadata, + int64_t metadata_size, + uint8_t **data) { LOG_DEBUG("called plasma_create on conn %d with size %" PRId64 " and metadata size %" PRId64, conn->store_conn, data_size, metadata_size); @@ -234,10 +234,14 @@ bool plasma_create(plasma_connection *conn, plasma_object object; plasma_read_CreateReply(reply_data, &id, &object, &error); free(reply_data); - if (error == PlasmaError_ObjectExists) { + if (error != PlasmaError_OK) { LOG_DEBUG("returned from plasma_create with error %d", error); - return false; + CHECK(error == PlasmaError_OutOfMemory || + error == PlasmaError_ObjectExists); + return error; } + /* If the CreateReply included an error, then the store will not send a file + * descriptor. */ int fd = recv_fd(conn->store_conn); CHECKM(fd >= 0, "recv not successful"); CHECK(object.data_size == data_size); @@ -263,7 +267,7 @@ bool plasma_create(plasma_connection *conn, * returned by plasma_create goes out of scope, the object does not get * released before the call to plasma_seal happens. */ increment_object_count(conn, obj_id, &object, false); - return true; + return PlasmaError_OK; } /* This method is used to get both the data and the metadata. 
*/ diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index 0e446cd5f..d54a2c942 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -80,15 +80,21 @@ int plasma_manager_connect(const char *addr, int port); * @param metadata_size The size in bytes of the metadata. If there is no metadata, this should be 0. * @param data The address of the newly created object will be written here. - * @return True, if object was created, false, otherwise (e.g., if object has - * been already created). + * @return One of the following error codes: + * - PlasmaError_OK, if the object was created successfully. + * - PlasmaError_ObjectExists, if an object with this ID is already + * present in the store. In this case, the client should not call + * plasma_release. + * - PlasmaError_OutOfMemory, if the store is out of memory and cannot + * create the object. In this case, the client should not call + * plasma_release. */ -bool plasma_create(plasma_connection *conn, - object_id object_id, - int64_t size, - uint8_t *metadata, - int64_t metadata_size, - uint8_t **data); +int plasma_create(plasma_connection *conn, + object_id object_id, + int64_t size, + uint8_t *metadata, + int64_t metadata_size, + uint8_t **data); /** * Get an object from the Plasma Store. 
This function will block until the diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.c index c0253d0f0..74800671f 100644 --- a/src/plasma/plasma_extension.c +++ b/src/plasma/plasma_extension.c @@ -7,6 +7,9 @@ #include "plasma_client.h" #include "object_info.h" +PyObject *PlasmaOutOfMemoryError; +PyObject *PlasmaObjectExistsError; + static int PyObjectToPlasmaConnection(PyObject *object, plasma_connection **conn) { if (PyCapsule_IsValid(object, "plasma")) { @@ -69,14 +72,22 @@ PyObject *PyPlasma_create(PyObject *self, PyObject *args) { return NULL; } uint8_t *data; - bool created = plasma_create(conn, object_id, size, - (uint8_t *) PyByteArray_AsString(metadata), - PyByteArray_Size(metadata), &data); - if (!created) { - PyErr_SetString(PyExc_RuntimeError, - "an object with this ID could not be created"); + int error_code = plasma_create(conn, object_id, size, + (uint8_t *) PyByteArray_AsString(metadata), + PyByteArray_Size(metadata), &data); + if (error_code == PlasmaError_ObjectExists) { + PyErr_SetString(PlasmaObjectExistsError, + "An object with this ID already exists in the plasma " + "store."); return NULL; } + if (error_code == PlasmaError_OutOfMemory) { + PyErr_SetString(PlasmaOutOfMemoryError, + "The plasma store ran out of memory and could not create " + "this object."); + return NULL; + } + CHECK(error_code == PlasmaError_OK); #if PY_MAJOR_VERSION >= 3 return PyMemoryView_FromMemory((void *) data, (Py_ssize_t) size, PyBUF_WRITE); @@ -422,6 +433,19 @@ MOD_INIT(libplasma) { "A Python client library for plasma."); #endif + /* Create a custom exception for when an object ID is reused. */ + char plasma_object_exists_error[] = "plasma_object_exists.error"; + PlasmaObjectExistsError = + PyErr_NewException(plasma_object_exists_error, NULL, NULL); + Py_INCREF(PlasmaObjectExistsError); + PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError); + /* Create a custom exception for when the plasma store is out of memory. 
*/ + char plasma_out_of_memory_error[] = "plasma_out_of_memory.error"; + PlasmaOutOfMemoryError = + PyErr_NewException(plasma_out_of_memory_error, NULL, NULL); + Py_INCREF(PlasmaOutOfMemoryError); + PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError); + #if PY_MAJOR_VERSION >= 3 return m; #endif diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 33b25a650..876ec489a 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -792,13 +792,12 @@ void process_data_request(event_loop *loop, /* The corresponding call to plasma_release should happen in * process_data_chunk. */ - bool success_create = - plasma_create(conn->manager_state->plasma_conn, object_id, data_size, - NULL, metadata_size, &(buf->data)); + int error_code = plasma_create(conn->manager_state->plasma_conn, object_id, + data_size, NULL, metadata_size, &(buf->data)); - /* If success_create == true, a new object has been created. - * If success_create == false the object creation has failed, possibly - * due to an object with the same ID already existing in the Plasma Store. */ + /* If error_code == PlasmaError_OK, a new object has been created. + * Otherwise the object creation has failed, e.g. because an object with the + * same ID already exists in the Plasma Store or the store is full. */ - if (success_create) { + if (error_code == PlasmaError_OK) { /* Add buffer where the fetched data is to be stored to * conn->transfer_queue. */ LL_APPEND(conn->transfer_queue, buf); @@ -808,7 +807,7 @@ void process_data_request(event_loop *loop, /* Switch to reading the data from this socket, instead of listening for * other requests. */ event_loop_remove_file(loop, client_sock); - if (success_create) { + if (error_code == PlasmaError_OK) { event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk, conn); } else { diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index ccddacfd7..d8f3e90f2 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -139,11 +139,11 @@ void add_client_to_object_clients(object_table_entry *entry, } /* Create a new object buffer in the hash table. 
*/ -bool create_object(client *client_context, - object_id obj_id, - int64_t data_size, - int64_t metadata_size, - plasma_object *result) { +int create_object(client *client_context, + object_id obj_id, + int64_t data_size, + int64_t metadata_size, + plasma_object *result) { LOG_DEBUG("creating object"); /* TODO(pcm): add object_id here */ plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; @@ -153,15 +153,20 @@ bool create_object(client *client_context, if (entry != NULL) { /* There is already an object with the same ID in the Plasma Store, so - * ignore this requst. */ + * ignore this request. */ - return false; + return PlasmaError_ObjectExists; } /* Tell the eviction policy how much space we need to create this object. */ int64_t num_objects_to_evict; object_id *objects_to_evict; - require_space(plasma_state->eviction_state, plasma_state->plasma_store_info, - data_size + metadata_size, &num_objects_to_evict, - &objects_to_evict); + bool success = require_space( + plasma_state->eviction_state, plasma_state->plasma_store_info, + data_size + metadata_size, &num_objects_to_evict, &objects_to_evict); remove_objects(plasma_state, num_objects_to_evict, objects_to_evict); + /* Return an error to the client if not enough space could be freed to create + * the object. */ + if (!success) { + return PlasmaError_OutOfMemory; + } /* Allocate space for the new object */ uint8_t *pointer = dlmalloc(data_size + metadata_size); int fd; @@ -198,7 +203,7 @@ bool create_object(client *client_context, obj_id); /* Record that this client is using this object. */ add_client_to_object_clients(entry, client_context); - return true; + return PlasmaError_OK; } /* Get an object from the hash table. 
*/ @@ -506,15 +511,11 @@ void process_message(event_loop *loop, int64_t metadata_size; plasma_read_CreateRequest(input, &object_ids[0], &data_size, &metadata_size); - if (create_object(client_context, object_ids[0], data_size, metadata_size, - &objects[0])) { - error = PlasmaError_OK; - } else { - error = PlasmaError_ObjectExists; - } + int error_code = create_object(client_context, object_ids[0], data_size, + metadata_size, &objects[0]); CHECK(plasma_send_CreateReply(client_sock, state->builder, object_ids[0], - &objects[0], error) >= 0); - if (error == PlasmaError_OK) { + &objects[0], error_code) >= 0); + if (error_code == PlasmaError_OK) { CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0); } } break; diff --git a/src/plasma/plasma_store.h b/src/plasma/plasma_store.h index ebd5d757a..4342d03b7 100644 --- a/src/plasma/plasma_store.h +++ b/src/plasma/plasma_store.h @@ -15,13 +15,20 @@ typedef struct plasma_store_state plasma_store_state; * @param object_id Object ID of the object to be created. * @param data_size Size in bytes of the object to be created. * @param metadata_size Size in bytes of the object metadata. - * @return False if the object already exists, otherwise true. + * @return One of the following error codes: + * - PlasmaError_OK, if the object was created successfully. + * - PlasmaError_ObjectExists, if an object with this ID is already + * present in the store. In this case, the client should not call + * plasma_release. + * - PlasmaError_OutOfMemory, if the store is out of memory and cannot + * create the object. In this case, the client should not call + * plasma_release. */ -bool create_object(client *client_context, - object_id object_id, - int64_t data_size, - int64_t metadata_size, - plasma_object *result); +int create_object(client *client_context, + object_id object_id, + int64_t data_size, + int64_t metadata_size, + plasma_object *result); /** * Get an object. 
This method assumes that we currently have or will eventually diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 2513413ad..041ad0994 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -107,8 +107,78 @@ class TestPlasmaClient(unittest.TestCase): self.plasma_client.create(object_id, length, generate_metadata(length)) try: val = self.plasma_client.create(object_id, length, generate_metadata(length)) - except Exception: + except plasma.plasma_object_exists_error as e: pass + else: + self.assertTrue(False) + + def test_store_full(self): + # The store is started with 1GB, so make sure that create throws an + # exception when it is full. + def assert_create_raises_plasma_full(unit_test, size): + partial_size = np.random.randint(size) + try: + _, memory_buffer, _ = create_object(unit_test.plasma_client, partial_size, size - partial_size) + except plasma.plasma_out_of_memory_error as e: + pass + else: + # For some reason the above didn't throw an exception, so fail. + unit_test.assertTrue(False) + + # Create a list to keep some of the buffers in scope. + memory_buffers = [] + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 8, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 8. Make sure that we can't create an object of + # size 10 ** 8 + 1, but we can create one of size 10 ** 8. + assert_create_raises_plasma_full(self, 10 ** 8 + 1) + _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0) + del memory_buffer + _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0) + del memory_buffer + assert_create_raises_plasma_full(self, 10 ** 8 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 7, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 7. 
+ assert_create_raises_plasma_full(self, 10 ** 7 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 6, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 6. + assert_create_raises_plasma_full(self, 10 ** 6 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 5, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 5. + assert_create_raises_plasma_full(self, 10 ** 5 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 4, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 4. + assert_create_raises_plasma_full(self, 10 ** 4 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 3, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 3. + assert_create_raises_plasma_full(self, 10 ** 3 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 2, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 2. + assert_create_raises_plasma_full(self, 10 ** 2 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 1, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 1. + assert_create_raises_plasma_full(self, 10 ** 1 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 0, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 10 ** 0. + assert_create_raises_plasma_full(self, 10 ** 0 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 1, 0) def test_contains(self): fake_object_ids = [random_object_id() for _ in range(100)]