[Core] Remove digests in plasma (4x performance improvement) (#8980)

* remove digest in plasma

* totally remove list
This commit is contained in:
Siyuan (Ryans) Zhuang 2020-06-22 14:24:32 -07:00 committed by GitHub
parent 275da2e400
commit 7a110b9401
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 26 additions and 190 deletions

View file

@ -32,9 +32,6 @@ table ObjectInfo {
create_time: long; create_time: long;
// How long creation of this object took. // How long creation of this object took.
construct_duration: long; construct_duration: long;
// Hash of the object content. If the object is not sealed yet this is
// an empty string.
digest: string;
// Specifies if this object was deleted or added. // Specifies if this object was deleted or added.
is_deletion: bool; is_deletion: bool;
} }

View file

@ -261,8 +261,6 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status Contains(const ObjectID& object_id, bool* has_object); Status Contains(const ObjectID& object_id, bool* has_object);
Status List(ObjectTable* objects);
Status Abort(const ObjectID& object_id); Status Abort(const ObjectID& object_id);
Status Seal(const ObjectID& object_id); Status Seal(const ObjectID& object_id);
@ -508,15 +506,9 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
std::lock_guard<std::recursive_mutex> guard(client_mutex_); std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
// Compute the object hash.
static unsigned char digest[kDigestSize];
uint64_t hash = ComputeObjectHashCPU(
reinterpret_cast<const uint8_t*>(data.data()), data.size(),
reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size());
memcpy(&digest[0], &hash, sizeof(hash));
RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data, RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data,
metadata, digest)); metadata));
std::vector<uint8_t> buffer; std::vector<uint8_t> buffer;
RETURN_NOT_OK( RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, &buffer)); PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, &buffer));
@ -532,19 +524,8 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& objec
ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_; ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_;
std::vector<std::string> digests;
for (size_t i = 0; i < object_ids.size(); i++) {
// Compute the object hash.
std::string digest;
uint64_t hash = ComputeObjectHashCPU(
reinterpret_cast<const uint8_t*>(data.data()), data.size(),
reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size());
digest.assign(reinterpret_cast<char*>(&hash), sizeof(hash));
digests.push_back(digest);
}
RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full, RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full,
data, metadata, digests)); data, metadata));
std::vector<uint8_t> buffer; std::vector<uint8_t> buffer;
RETURN_NOT_OK( RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealBatchReply, &buffer)); PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealBatchReply, &buffer));
@ -780,14 +761,6 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
return Status::OK(); return Status::OK();
} }
Status PlasmaClient::Impl::List(ObjectTable* objects) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
RETURN_NOT_OK(SendListRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
return ReadListReply(buffer.data(), buffer.size(), objects);
}
static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) { static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
XXH64_state_t hash_state; XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED); XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
@ -876,10 +849,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
object_entry->second->is_sealed = true; object_entry->second->is_sealed = true;
/// Send the seal request to Plasma. /// Send the seal request to Plasma.
std::vector<uint8_t> digest(kDigestSize); RETURN_NOT_OK(SendSealRequest(store_conn_, object_id));
RETURN_NOT_OK(Hash(object_id, &digest[0]));
RETURN_NOT_OK(
SendSealRequest(store_conn_, object_id, std::string(digest.begin(), digest.end())));
std::vector<uint8_t> buffer; std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, &buffer)); RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, &buffer));
ObjectID sealed_id; ObjectID sealed_id;
@ -1200,8 +1170,6 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
return impl_->Contains(object_id, has_object); return impl_->Contains(object_id, has_object);
} }
Status PlasmaClient::List(ObjectTable* objects) { return impl_->List(objects); }
Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); } Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); }
Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); } Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }

View file

@ -178,21 +178,6 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status. /// \return The return status.
Status Contains(const ObjectID& object_id, bool* has_object); Status Contains(const ObjectID& object_id, bool* has_object);
/// List all the objects in the object store.
///
/// This API is experimental and might change in the future.
///
/// \param[out] objects ObjectTable of objects in the store. For each entry
/// in the map, the following fields are available:
/// - metadata_size: Size of the object metadata in bytes
/// - data_size: Size of the object data in bytes
/// - ref_count: Number of clients referencing the object buffer
/// - create_time: Unix timestamp of the object creation
/// - construct_duration: Object creation time in seconds
/// - state: Is the object still being created or already sealed?
/// \return The return status.
Status List(ObjectTable* objects);
/// Abort an unsealed object in the object store. If the abort succeeds, then /// Abort an unsealed object in the object store. If the abort succeeds, then
/// it will be as if the object was never created at all. The unsealed object /// it will be as if the object was never created at all. The unsealed object
/// must have only a single reference (the one that would have been removed by /// must have only a single reference (the one that would have been removed by

View file

@ -31,9 +31,6 @@ table ObjectInfo {
create_time: long; create_time: long;
// How long creation of this object took. // How long creation of this object took.
construct_duration: long; construct_duration: long;
// Hash of the object content. If the object is not sealed yet this is
// an empty string.
digest: string;
// Specifies if this object was deleted or added. // Specifies if this object was deleted or added.
is_deletion: bool; is_deletion: bool;
} }

View file

@ -45,9 +45,6 @@ enum MessageType:long {
// See if the store contains an object (will be deprecated). // See if the store contains an object (will be deprecated).
PlasmaContainsRequest, PlasmaContainsRequest,
PlasmaContainsReply, PlasmaContainsReply,
// List all objects in the store.
PlasmaListRequest,
PlasmaListReply,
// Get information for a newly connecting client. // Get information for a newly connecting client.
PlasmaConnectRequest, PlasmaConnectRequest,
PlasmaConnectReply, PlasmaConnectReply,
@ -179,8 +176,6 @@ table PlasmaCreateAndSealRequest {
data: string; data: string;
// The object's metadata. // The object's metadata.
metadata: string; metadata: string;
// Hash of the object data.
digest: string;
} }
table PlasmaCreateAndSealReply { table PlasmaCreateAndSealReply {
@ -194,7 +189,6 @@ table PlasmaCreateAndSealBatchRequest {
evict_if_full: bool; evict_if_full: bool;
data: [string]; data: [string];
metadata: [string]; metadata: [string];
digest: [string];
} }
table PlasmaCreateAndSealBatchReply { table PlasmaCreateAndSealBatchReply {
@ -215,8 +209,6 @@ table PlasmaAbortReply {
table PlasmaSealRequest { table PlasmaSealRequest {
// ID of the object to be sealed. // ID of the object to be sealed.
object_id: string; object_id: string;
// Hash of the object data.
digest: string;
} }
table PlasmaSealReply { table PlasmaSealReply {
@ -294,13 +286,6 @@ table PlasmaContainsReply {
has_object: int; has_object: int;
} }
table PlasmaListRequest {
}
table PlasmaListReply {
objects: [ObjectInfo];
}
// PlasmaConnect is used by a plasma client the first time it connects with the // PlasmaConnect is used by a plasma client the first time it connects with the
// store. This is not really necessary, but is used to get some information // store. This is not really necessary, but is used to get some information
// about the store such as its memory capacity. // about the store such as its memory capacity.

View file

@ -266,19 +266,17 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
} }
Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full, Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full,
const std::string& data, const std::string& metadata, const std::string& data, const std::string& metadata) {
unsigned char* digest) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
auto message = fb::CreatePlasmaCreateAndSealRequest( auto message = fb::CreatePlasmaCreateAndSealRequest(
fbb, fbb.CreateString(object_id.Binary()), evict_if_full, fbb.CreateString(data), fbb, fbb.CreateString(object_id.Binary()), evict_if_full, fbb.CreateString(data),
fbb.CreateString(metadata), digest_string); fbb.CreateString(metadata));
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message);
} }
Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
bool* evict_if_full, std::string* object_data, bool* evict_if_full, std::string* object_data,
std::string* metadata, std::string* digest) { std::string* metadata) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
@ -287,22 +285,18 @@ Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
*evict_if_full = message->evict_if_full(); *evict_if_full = message->evict_if_full();
*object_data = message->data()->str(); *object_data = message->data()->str();
*metadata = message->metadata()->str(); *metadata = message->metadata()->str();
ARROW_CHECK(message->digest()->size() == kDigestSize);
digest->assign(message->digest()->data(), kDigestSize);
return Status::OK(); return Status::OK();
} }
Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids, Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids,
bool evict_if_full, bool evict_if_full,
const std::vector<std::string>& data, const std::vector<std::string>& data,
const std::vector<std::string>& metadata, const std::vector<std::string>& metadata) {
const std::vector<std::string>& digests) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaCreateAndSealBatchRequest( auto message = fb::CreatePlasmaCreateAndSealBatchRequest(
fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()), evict_if_full, fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()), evict_if_full,
ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata), ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata));
ToFlatbuffer(&fbb, digests));
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchRequest, &fbb, message);
} }
@ -311,8 +305,7 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids, std::vector<ObjectID>* object_ids,
bool* evict_if_full, bool* evict_if_full,
std::vector<std::string>* object_data, std::vector<std::string>* object_data,
std::vector<std::string>* metadata, std::vector<std::string>* metadata) {
std::vector<std::string>* digests) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
@ -329,9 +322,6 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
ConvertToVector(message->metadata(), metadata, ConvertToVector(message->metadata(), metadata,
[](const flatbuffers::String& element) { return element.str(); }); [](const flatbuffers::String& element) { return element.str(); });
ConvertToVector(message->digest(), digests,
[](const flatbuffers::String& element) { return element.str(); });
return Status::OK(); return Status::OK();
} }
@ -392,21 +382,17 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
// Seal messages. // Seal messages.
Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) { Status SendSealRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.Binary()), auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.Binary()));
fbb.CreateString(digest));
return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message);
} }
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id) {
std::string* digest) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
ARROW_CHECK_EQ(message->digest()->size(), kDigestSize);
digest->assign(message->digest()->data(), kDigestSize);
return Status::OK(); return Status::OK();
} }
@ -545,55 +531,6 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
return Status::OK(); return Status::OK();
} }
// List messages.
Status SendListRequest(int sock) {
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaListRequest(fbb);
return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message);
}
Status ReadListRequest(uint8_t* data, size_t size) { return Status::OK(); }
Status SendListReply(int sock, const ObjectTable& objects) {
flatbuffers::FlatBufferBuilder fbb;
std::vector<flatbuffers::Offset<fb::ObjectInfo>> object_infos;
for (auto const& entry : objects) {
auto digest = entry.second->state == ObjectState::PLASMA_CREATED
? fbb.CreateString("")
: fbb.CreateString(reinterpret_cast<char*>(entry.second->digest),
kDigestSize);
auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.Binary()),
entry.second->data_size, entry.second->metadata_size,
entry.second->ref_count, entry.second->create_time,
entry.second->construct_duration, digest);
object_infos.push_back(info);
}
auto message = fb::CreatePlasmaListReply(
fbb, fbb.CreateVector(arrow::util::MakeNonNull(object_infos.data()),
object_infos.size()));
return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message);
}
Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
for (auto const& object : *message->objects()) {
ObjectID object_id = ObjectID::FromBinary(object->object_id()->str());
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
entry->data_size = object->data_size();
entry->metadata_size = object->metadata_size();
entry->ref_count = object->ref_count();
entry->create_time = object->create_time();
entry->construct_duration = object->construct_duration();
entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED
: ObjectState::PLASMA_SEALED;
(*objects)[object_id] = std::move(entry);
}
return Status::OK();
}
// Connect messages. // Connect messages.
Status SendConnectRequest(int sock) { Status SendConnectRequest(int sock) {

View file

@ -90,25 +90,22 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object, int* store_fd, int64_t* mmap_size); PlasmaObject* object, int* store_fd, int64_t* mmap_size);
Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full, Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full,
const std::string& data, const std::string& metadata, const std::string& data, const std::string& metadata);
unsigned char* digest);
Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
bool* evict_if_full, std::string* object_data, bool* evict_if_full, std::string* object_data,
std::string* metadata, std::string* digest); std::string* metadata);
Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids, Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids,
bool evict_if_full, bool evict_if_full,
const std::vector<std::string>& data, const std::vector<std::string>& data,
const std::vector<std::string>& metadata, const std::vector<std::string>& metadata);
const std::vector<std::string>& digests);
Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size, Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
std::vector<ObjectID>* object_id, std::vector<ObjectID>* object_id,
bool* evict_if_full, bool* evict_if_full,
std::vector<std::string>* object_data, std::vector<std::string>* object_data,
std::vector<std::string>* metadata, std::vector<std::string>* metadata);
std::vector<std::string>* digests);
Status SendCreateAndSealReply(int sock, PlasmaError error); Status SendCreateAndSealReply(int sock, PlasmaError error);
@ -128,10 +125,9 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id);
/* Plasma Seal message functions. */ /* Plasma Seal message functions. */
Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest); Status SendSealRequest(int sock, ObjectID object_id);
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id);
std::string* digest);
Status SendSealReply(int sock, ObjectID object_id, PlasmaError error); Status SendSealReply(int sock, ObjectID object_id, PlasmaError error);
@ -187,16 +183,6 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object); bool* has_object);
/* Plasma List message functions. */
Status SendListRequest(int sock);
Status ReadListRequest(uint8_t* data, size_t size);
Status SendListReply(int sock, const ObjectTable& objects);
Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects);
/* Plasma Connect message functions. */ /* Plasma Connect message functions. */
Status SendConnectRequest(int sock); Status SendConnectRequest(int sock);

View file

@ -476,7 +476,6 @@ void PlasmaStore::ProcessGetRequest(Client* client,
} }
if (!evicted_ids.empty()) { if (!evicted_ids.empty()) {
unsigned char digest[kDigestSize];
std::vector<std::shared_ptr<Buffer>> buffers; std::vector<std::shared_ptr<Buffer>> buffers;
for (size_t i = 0; i < evicted_ids.size(); ++i) { for (size_t i = 0; i < evicted_ids.size(); ++i) {
ARROW_CHECK(evicted_entries[i]->pointer != nullptr); ARROW_CHECK(evicted_entries[i]->pointer != nullptr);
@ -486,7 +485,6 @@ void PlasmaStore::ProcessGetRequest(Client* client,
if (external_store_->Get(evicted_ids, buffers).ok()) { if (external_store_->Get(evicted_ids, buffers).ok()) {
for (size_t i = 0; i < evicted_ids.size(); ++i) { for (size_t i = 0; i < evicted_ids.size(); ++i) {
evicted_entries[i]->state = ObjectState::PLASMA_SEALED; evicted_entries[i]->state = ObjectState::PLASMA_SEALED;
std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize);
evicted_entries[i]->construct_duration = evicted_entries[i]->construct_duration =
std::time(nullptr) - evicted_entries[i]->create_time; std::time(nullptr) - evicted_entries[i]->create_time;
PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]); PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]);
@ -574,8 +572,7 @@ ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) {
: ObjectStatus::OBJECT_NOT_FOUND; : ObjectStatus::OBJECT_NOT_FOUND;
} }
void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids, void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids) {
const std::vector<std::string>& digests) {
std::vector<ObjectInfoT> infos; std::vector<ObjectInfoT> infos;
ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects"; ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects";
@ -586,15 +583,12 @@ void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids,
ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED); ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED);
// Set the state of object to SEALED. // Set the state of object to SEALED.
entry->state = ObjectState::PLASMA_SEALED; entry->state = ObjectState::PLASMA_SEALED;
// Set the object digest.
std::memcpy(&entry->digest[0], digests[i].c_str(), kDigestSize);
// Set object construction duration. // Set object construction duration.
entry->construct_duration = std::time(nullptr) - entry->create_time; entry->construct_duration = std::time(nullptr) - entry->create_time;
object_info.object_id = object_ids[i].Binary(); object_info.object_id = object_ids[i].Binary();
object_info.data_size = entry->data_size; object_info.data_size = entry->data_size;
object_info.metadata_size = entry->metadata_size; object_info.metadata_size = entry->metadata_size;
object_info.digest = digests[i];
infos.push_back(object_info); infos.push_back(object_info);
} }
@ -901,8 +895,6 @@ void PlasmaStore::SubscribeToUpdates(Client* client) {
info.object_id = entry.first.Binary(); info.object_id = entry.first.Binary();
info.data_size = entry.second->data_size; info.data_size = entry.second->data_size;
info.metadata_size = entry.second->metadata_size; info.metadata_size = entry.second->metadata_size;
info.digest =
std::string(reinterpret_cast<char*>(&entry.second->digest[0]), kDigestSize);
PushNotification(&info, fd); PushNotification(&info, fd);
} }
} }
@ -948,10 +940,8 @@ Status PlasmaStore::ProcessMessage(Client* client) {
bool evict_if_full; bool evict_if_full;
std::string data; std::string data;
std::string metadata; std::string metadata;
std::string digest;
digest.reserve(kDigestSize);
RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id, RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id,
&evict_if_full, &data, &metadata, &digest)); &evict_if_full, &data, &metadata));
// CreateAndSeal currently only supports device_num = 0, which corresponds // CreateAndSeal currently only supports device_num = 0, which corresponds
// to the host. // to the host.
int device_num = 0; int device_num = 0;
@ -965,7 +955,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
// Write the inlined data and metadata into the allocated object. // Write the inlined data and metadata into the allocated object.
std::memcpy(entry->pointer, data.data(), data.size()); std::memcpy(entry->pointer, data.data(), data.size());
std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size()); std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size());
SealObjects({object_id}, {digest}); SealObjects({object_id});
// Remove the client from the object's array of clients because the // Remove the client from the object's array of clients because the
// object is not being used by any client. The client was added to the // object is not being used by any client. The client was added to the
// object's array of clients in CreateObject. This is analogous to the // object's array of clients in CreateObject. This is analogous to the
@ -981,10 +971,9 @@ Status PlasmaStore::ProcessMessage(Client* client) {
std::vector<ObjectID> object_ids; std::vector<ObjectID> object_ids;
std::vector<std::string> data; std::vector<std::string> data;
std::vector<std::string> metadata; std::vector<std::string> metadata;
std::vector<std::string> digests;
RETURN_NOT_OK(ReadCreateAndSealBatchRequest( RETURN_NOT_OK(ReadCreateAndSealBatchRequest(
input, input_size, &object_ids, &evict_if_full, &data, &metadata, &digests)); input, input_size, &object_ids, &evict_if_full, &data, &metadata));
// CreateAndSeal currently only supports device_num = 0, which corresponds // CreateAndSeal currently only supports device_num = 0, which corresponds
// to the host. // to the host.
@ -1011,7 +1000,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
metadata[i].size()); metadata[i].size());
} }
SealObjects(object_ids, digests); SealObjects(object_ids);
// Remove the client from the object's array of clients because the // Remove the client from the object's array of clients because the
// object is not being used by any client. The client was added to the // object is not being used by any client. The client was added to the
// object's array of clients in CreateObject. This is analogous to the // object's array of clients in CreateObject. This is analogous to the
@ -1063,14 +1052,9 @@ Status PlasmaStore::ProcessMessage(Client* client) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd); HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
} }
} break; } break;
case fb::MessageType::PlasmaListRequest: {
RETURN_NOT_OK(ReadListRequest(input, input_size));
HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd);
} break;
case fb::MessageType::PlasmaSealRequest: { case fb::MessageType::PlasmaSealRequest: {
std::string digest; RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id));
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest)); SealObjects({object_id});
SealObjects({object_id}, {digest});
HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd); HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd);
} break; } break;
case fb::MessageType::PlasmaEvictRequest: { case fb::MessageType::PlasmaEvictRequest: {

View file

@ -137,10 +137,7 @@ class PlasmaStore {
/// get. /// get.
/// ///
/// \param object_ids The vector of Object IDs of the objects to be sealed. /// \param object_ids The vector of Object IDs of the objects to be sealed.
/// \param digests The vector of digests of the objects. This is used to tell if two void SealObjects(const std::vector<ObjectID>& object_ids);
/// objects with the same object ID are the same.
void SealObjects(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& digests);
/// Check if the plasma store contains an object: /// Check if the plasma store contains an object:
/// ///