From 7a110b9401a4433e89edb99f36827033da8cc5cc Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Mon, 22 Jun 2020 14:24:32 -0700 Subject: [PATCH] [Core] Remove digests in plasma (4x performance improvement) (#8980) * remove digest in plasma * totally remove list --- .../object_manager/format/object_manager.fbs | 3 - src/ray/object_manager/plasma/client.cc | 38 +-------- src/ray/object_manager/plasma/client.h | 15 ---- src/ray/object_manager/plasma/common.fbs | 3 - src/ray/object_manager/plasma/plasma.fbs | 15 ---- src/ray/object_manager/plasma/protocol.cc | 81 +++---------------- src/ray/object_manager/plasma/protocol.h | 26 ++---- src/ray/object_manager/plasma/store.cc | 30 ++----- src/ray/object_manager/plasma/store.h | 5 +- 9 files changed, 26 insertions(+), 190 deletions(-) diff --git a/src/ray/object_manager/format/object_manager.fbs b/src/ray/object_manager/format/object_manager.fbs index 7889e1a97..50ed3d341 100644 --- a/src/ray/object_manager/format/object_manager.fbs +++ b/src/ray/object_manager/format/object_manager.fbs @@ -32,9 +32,6 @@ table ObjectInfo { create_time: long; // How long creation of this object took. construct_duration: long; - // Hash of the object content. If the object is not sealed yet this is - // an empty string. - digest: string; // Specifies if this object was deleted or added. is_deletion: bool; } diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 6a0d6d3e3..7aa18e807 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -261,8 +261,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this guard(client_mutex_); ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; - // Compute the object hash. - static unsigned char digest[kDigestSize]; - uint64_t hash = ComputeObjectHashCPU( - reinterpret_cast(data.data()), data.size(), - reinterpret_cast(metadata.data()), metadata.size()); - memcpy(&digest[0], &hash, sizeof(hash)); RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data, - metadata, digest)); + metadata)); std::vector buffer; RETURN_NOT_OK( PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, &buffer)); @@ -532,19 +524,8 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector& objec ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_; - std::vector digests; - for (size_t i = 0; i < object_ids.size(); i++) { - // Compute the object hash. - std::string digest; - uint64_t hash = ComputeObjectHashCPU( - reinterpret_cast(data.data()), data.size(), - reinterpret_cast(metadata.data()), metadata.size()); - digest.assign(reinterpret_cast(&hash), sizeof(hash)); - digests.push_back(digest); - } - RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full, - data, metadata, digests)); + data, metadata)); std::vector buffer; RETURN_NOT_OK( PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealBatchReply, &buffer)); @@ -780,14 +761,6 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) return Status::OK(); } -Status PlasmaClient::Impl::List(ObjectTable* objects) { - std::lock_guard guard(client_mutex_); - RETURN_NOT_OK(SendListRequest(store_conn_)); - std::vector buffer; - RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer)); - return ReadListReply(buffer.data(), buffer.size(), objects); -} - static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) { XXH64_state_t hash_state; XXH64_reset(&hash_state, XXH64_DEFAULT_SEED); @@ -876,10 +849,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { object_entry->second->is_sealed = true; /// Send the seal request to Plasma. - std::vector digest(kDigestSize); - RETURN_NOT_OK(Hash(object_id, &digest[0])); - RETURN_NOT_OK( - SendSealRequest(store_conn_, object_id, std::string(digest.begin(), digest.end()))); + RETURN_NOT_OK(SendSealRequest(store_conn_, object_id)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, &buffer)); ObjectID sealed_id; @@ -1200,8 +1170,6 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { return impl_->Contains(object_id, has_object); } -Status PlasmaClient::List(ObjectTable* objects) { return impl_->List(objects); } - Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); } Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); } diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index 27c57d636..64025b730 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -178,21 +178,6 @@ class ARROW_EXPORT PlasmaClient { /// \return The return status. Status Contains(const ObjectID& object_id, bool* has_object); - /// List all the objects in the object store. - /// - /// This API is experimental and might change in the future. - /// - /// \param[out] objects ObjectTable of objects in the store. For each entry - /// in the map, the following fields are available: - /// - metadata_size: Size of the object metadata in bytes - /// - data_size: Size of the object data in bytes - /// - ref_count: Number of clients referencing the object buffer - /// - create_time: Unix timestamp of the object creation - /// - construct_duration: Object creation time in seconds - /// - state: Is the object still being created or already sealed? - /// \return The return status. - Status List(ObjectTable* objects); - /// Abort an unsealed object in the object store. If the abort succeeds, then /// it will be as if the object was never created at all. The unsealed object /// must have only a single reference (the one that would have been removed by diff --git a/src/ray/object_manager/plasma/common.fbs b/src/ray/object_manager/plasma/common.fbs index 818827a7e..299282774 100644 --- a/src/ray/object_manager/plasma/common.fbs +++ b/src/ray/object_manager/plasma/common.fbs @@ -31,9 +31,6 @@ table ObjectInfo { create_time: long; // How long creation of this object took. construct_duration: long; - // Hash of the object content. If the object is not sealed yet this is - // an empty string. - digest: string; // Specifies if this object was deleted or added. is_deletion: bool; } diff --git a/src/ray/object_manager/plasma/plasma.fbs b/src/ray/object_manager/plasma/plasma.fbs index 40556b45a..10475364b 100644 --- a/src/ray/object_manager/plasma/plasma.fbs +++ b/src/ray/object_manager/plasma/plasma.fbs @@ -45,9 +45,6 @@ enum MessageType:long { // See if the store contains an object (will be deprecated). PlasmaContainsRequest, PlasmaContainsReply, - // List all objects in the store. - PlasmaListRequest, - PlasmaListReply, // Get information for a newly connecting client. PlasmaConnectRequest, PlasmaConnectReply, @@ -179,8 +176,6 @@ table PlasmaCreateAndSealRequest { data: string; // The object's metadata. metadata: string; - // Hash of the object data. - digest: string; } table PlasmaCreateAndSealReply { @@ -194,7 +189,6 @@ table PlasmaCreateAndSealBatchRequest { evict_if_full: bool; data: [string]; metadata: [string]; - digest: [string]; } table PlasmaCreateAndSealBatchReply { @@ -215,8 +209,6 @@ table PlasmaAbortReply { table PlasmaSealRequest { // ID of the object to be sealed. object_id: string; - // Hash of the object data. - digest: string; } table PlasmaSealReply { @@ -294,13 +286,6 @@ table PlasmaContainsReply { has_object: int; } -table PlasmaListRequest { -} - -table PlasmaListReply { - objects: [ObjectInfo]; -} - // PlasmaConnect is used by a plasma client the first time it connects with the // store. This is not really necessary, but is used to get some information // about the store such as its memory capacity. diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 94c79ebb9..c308e7b77 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -266,19 +266,17 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, } Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full, - const std::string& data, const std::string& metadata, - unsigned char* digest) { + const std::string& data, const std::string& metadata) { flatbuffers::FlatBufferBuilder fbb; - auto digest_string = fbb.CreateString(reinterpret_cast(digest), kDigestSize); auto message = fb::CreatePlasmaCreateAndSealRequest( fbb, fbb.CreateString(object_id.Binary()), evict_if_full, fbb.CreateString(data), - fbb.CreateString(metadata), digest_string); + fbb.CreateString(metadata)); return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message); } Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, bool* evict_if_full, std::string* object_data, - std::string* metadata, std::string* digest) { + std::string* metadata) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); @@ -287,22 +285,18 @@ Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, *evict_if_full = message->evict_if_full(); *object_data = message->data()->str(); *metadata = message->metadata()->str(); - ARROW_CHECK(message->digest()->size() == kDigestSize); - digest->assign(message->digest()->data(), kDigestSize); return Status::OK(); } Status SendCreateAndSealBatchRequest(int sock, const std::vector& object_ids, bool evict_if_full, const std::vector& data, - const std::vector& metadata, - const std::vector& digests) { + const std::vector& metadata) { flatbuffers::FlatBufferBuilder fbb; auto message = fb::CreatePlasmaCreateAndSealBatchRequest( fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()), evict_if_full, - ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata), - ToFlatbuffer(&fbb, digests)); + ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata)); return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchRequest, &fbb, message); } @@ -311,8 +305,7 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size, std::vector* object_ids, bool* evict_if_full, std::vector* object_data, - std::vector* metadata, - std::vector* digests) { + std::vector* metadata) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); @@ -329,9 +322,6 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size, ConvertToVector(message->metadata(), metadata, [](const flatbuffers::String& element) { return element.str(); }); - ConvertToVector(message->digest(), digests, - [](const flatbuffers::String& element) { return element.str(); }); - return Status::OK(); } @@ -392,21 +382,17 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) { // Seal messages. -Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) { +Status SendSealRequest(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.Binary()), - fbb.CreateString(digest)); + auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.Binary())); return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message); } -Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, - std::string* digest) { +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); - ARROW_CHECK_EQ(message->digest()->size(), kDigestSize); - digest->assign(message->digest()->data(), kDigestSize); return Status::OK(); } @@ -545,55 +531,6 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, return Status::OK(); } -// List messages. - -Status SendListRequest(int sock) { - flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaListRequest(fbb); - return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message); -} - -Status ReadListRequest(uint8_t* data, size_t size) { return Status::OK(); } - -Status SendListReply(int sock, const ObjectTable& objects) { - flatbuffers::FlatBufferBuilder fbb; - std::vector> object_infos; - for (auto const& entry : objects) { - auto digest = entry.second->state == ObjectState::PLASMA_CREATED - ? fbb.CreateString("") - : fbb.CreateString(reinterpret_cast(entry.second->digest), - kDigestSize); - auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.Binary()), - entry.second->data_size, entry.second->metadata_size, - entry.second->ref_count, entry.second->create_time, - entry.second->construct_duration, digest); - object_infos.push_back(info); - } - auto message = fb::CreatePlasmaListReply( - fbb, fbb.CreateVector(arrow::util::MakeNonNull(object_infos.data()), - object_infos.size())); - return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message); -} - -Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) { - DCHECK(data); - auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); - for (auto const& object : *message->objects()) { - ObjectID object_id = ObjectID::FromBinary(object->object_id()->str()); - auto entry = std::unique_ptr(new ObjectTableEntry()); - entry->data_size = object->data_size(); - entry->metadata_size = object->metadata_size(); - entry->ref_count = object->ref_count(); - entry->create_time = object->create_time(); - entry->construct_duration = object->construct_duration(); - entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED - : ObjectState::PLASMA_SEALED; - (*objects)[object_id] = std::move(entry); - } - return Status::OK(); -} - // Connect messages. Status SendConnectRequest(int sock) { diff --git a/src/ray/object_manager/plasma/protocol.h b/src/ray/object_manager/plasma/protocol.h index d48a509b8..fd010144f 100644 --- a/src/ray/object_manager/plasma/protocol.h +++ b/src/ray/object_manager/plasma/protocol.h @@ -90,25 +90,22 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object, int* store_fd, int64_t* mmap_size); Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full, - const std::string& data, const std::string& metadata, - unsigned char* digest); + const std::string& data, const std::string& metadata); Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, bool* evict_if_full, std::string* object_data, - std::string* metadata, std::string* digest); + std::string* metadata); Status SendCreateAndSealBatchRequest(int sock, const std::vector& object_ids, bool evict_if_full, const std::vector& data, - const std::vector& metadata, - const std::vector& digests); + const std::vector& metadata); Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size, std::vector* object_id, bool* evict_if_full, std::vector* object_data, - std::vector* metadata, - std::vector* digests); + std::vector* metadata); Status SendCreateAndSealReply(int sock, PlasmaError error); @@ -128,10 +125,9 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id); /* Plasma Seal message functions. */ -Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest); +Status SendSealRequest(int sock, ObjectID object_id); -Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, - std::string* digest); +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendSealReply(int sock, ObjectID object_id, PlasmaError error); @@ -187,16 +183,6 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object); Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object); -/* Plasma List message functions. */ - -Status SendListRequest(int sock); - -Status ReadListRequest(uint8_t* data, size_t size); - -Status SendListReply(int sock, const ObjectTable& objects); - -Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects); - /* Plasma Connect message functions. */ Status SendConnectRequest(int sock); diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index c06655ee2..d564242c5 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -476,7 +476,6 @@ void PlasmaStore::ProcessGetRequest(Client* client, } if (!evicted_ids.empty()) { - unsigned char digest[kDigestSize]; std::vector> buffers; for (size_t i = 0; i < evicted_ids.size(); ++i) { ARROW_CHECK(evicted_entries[i]->pointer != nullptr); @@ -486,7 +485,6 @@ void PlasmaStore::ProcessGetRequest(Client* client, if (external_store_->Get(evicted_ids, buffers).ok()) { for (size_t i = 0; i < evicted_ids.size(); ++i) { evicted_entries[i]->state = ObjectState::PLASMA_SEALED; - std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize); evicted_entries[i]->construct_duration = std::time(nullptr) - evicted_entries[i]->create_time; PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]); @@ -574,8 +572,7 @@ ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) { : ObjectStatus::OBJECT_NOT_FOUND; } -void PlasmaStore::SealObjects(const std::vector& object_ids, - const std::vector& digests) { +void PlasmaStore::SealObjects(const std::vector& object_ids) { std::vector infos; ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects"; @@ -586,15 +583,12 @@ void PlasmaStore::SealObjects(const std::vector& object_ids, ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED); // Set the state of object to SEALED. entry->state = ObjectState::PLASMA_SEALED; - // Set the object digest. - std::memcpy(&entry->digest[0], digests[i].c_str(), kDigestSize); // Set object construction duration. entry->construct_duration = std::time(nullptr) - entry->create_time; object_info.object_id = object_ids[i].Binary(); object_info.data_size = entry->data_size; object_info.metadata_size = entry->metadata_size; - object_info.digest = digests[i]; infos.push_back(object_info); } @@ -901,8 +895,6 @@ void PlasmaStore::SubscribeToUpdates(Client* client) { info.object_id = entry.first.Binary(); info.data_size = entry.second->data_size; info.metadata_size = entry.second->metadata_size; - info.digest = - std::string(reinterpret_cast(&entry.second->digest[0]), kDigestSize); PushNotification(&info, fd); } } @@ -948,10 +940,8 @@ Status PlasmaStore::ProcessMessage(Client* client) { bool evict_if_full; std::string data; std::string metadata; - std::string digest; - digest.reserve(kDigestSize); RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id, - &evict_if_full, &data, &metadata, &digest)); + &evict_if_full, &data, &metadata)); // CreateAndSeal currently only supports device_num = 0, which corresponds // to the host. int device_num = 0; @@ -965,7 +955,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { // Write the inlined data and metadata into the allocated object. std::memcpy(entry->pointer, data.data(), data.size()); std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size()); - SealObjects({object_id}, {digest}); + SealObjects({object_id}); // Remove the client from the object's array of clients because the // object is not being used by any client. The client was added to the // object's array of clients in CreateObject. This is analogous to the @@ -981,10 +971,9 @@ Status PlasmaStore::ProcessMessage(Client* client) { std::vector object_ids; std::vector data; std::vector metadata; - std::vector digests; RETURN_NOT_OK(ReadCreateAndSealBatchRequest( - input, input_size, &object_ids, &evict_if_full, &data, &metadata, &digests)); + input, input_size, &object_ids, &evict_if_full, &data, &metadata)); // CreateAndSeal currently only supports device_num = 0, which corresponds // to the host. @@ -1011,7 +1000,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { metadata[i].size()); } - SealObjects(object_ids, digests); + SealObjects(object_ids); // Remove the client from the object's array of clients because the // object is not being used by any client. The client was added to the // object's array of clients in CreateObject. This is analogous to the @@ -1063,14 +1052,9 @@ Status PlasmaStore::ProcessMessage(Client* client) { HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd); } } break; - case fb::MessageType::PlasmaListRequest: { - RETURN_NOT_OK(ReadListRequest(input, input_size)); - HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd); - } break; case fb::MessageType::PlasmaSealRequest: { - std::string digest; - RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest)); - SealObjects({object_id}, {digest}); + RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id)); + SealObjects({object_id}); HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd); } break; case fb::MessageType::PlasmaEvictRequest: { diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index f1a24788d..cbc5f33cd 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -137,10 +137,7 @@ class PlasmaStore { /// get. /// /// \param object_ids The vector of Object IDs of the objects to be sealed. - /// \param digests The vector of digests of the objects. This is used to tell if two - /// objects with the same object ID are the same. - void SealObjects(const std::vector& object_ids, - const std::vector& digests); + void SealObjects(const std::vector& object_ids); /// Check if the plasma store contains an object: ///