[Core] Use Ray ObjectID in Plasma (#8852)

* Use Ray ObjectIDs instead

* remove unused code
This commit is contained in:
Siyuan (Ryans) Zhuang 2020-06-09 10:10:49 -07:00 committed by GitHub
parent b8cc9a1cbb
commit 3d473600a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 101 additions and 260 deletions

View file

@ -295,14 +295,19 @@ cc_library(
"@bazel_tools//src/conditions:windows": PROPAGATED_WINDOWS_DEFINES, "@bazel_tools//src/conditions:windows": PROPAGATED_WINDOWS_DEFINES,
"//conditions:default": [], "//conditions:default": [],
}), }),
includes = [
"src",
],
linkopts = PLASMA_LINKOPTS, linkopts = PLASMA_LINKOPTS,
strip_include_prefix = "src",
deps = [ deps = [
":common_fbs", ":common_fbs",
":plasma_fbs", ":plasma_fbs",
":platform_shims", ":platform_shims",
":ray_common",
":ray_util",
"@arrow", "@arrow",
"@com_github_google_glog//:glog", "@com_github_google_glog//:glog",
"@msgpack",
], ],
) )
@ -389,8 +394,10 @@ cc_library(
"src/ray/thirdparty/dlmalloc.c", "src/ray/thirdparty/dlmalloc.c",
], ],
copts = PLASMA_COPTS, copts = PLASMA_COPTS,
includes = [
"src",
],
linkopts = PLASMA_LINKOPTS, linkopts = PLASMA_LINKOPTS,
strip_include_prefix = "src",
deps = [ deps = [
":ae", ":ae",
":plasma_client", ":plasma_client",
@ -459,8 +466,8 @@ cc_library(
":common_cc_proto", ":common_cc_proto",
":gcs_cc_proto", ":gcs_cc_proto",
":node_manager_fbs", ":node_manager_fbs",
":plasma_client",
":ray_util", ":ray_util",
"@arrow",
"@boost//:asio", "@boost//:asio",
"@com_github_grpc_grpc//:grpc++", "@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_map",
@ -1270,7 +1277,6 @@ cc_library(
], ],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
":plasma_client",
":sha256", ":sha256",
"@boost//:asio", "@boost//:asio",
"@com_github_google_glog//:glog", "@com_github_google_glog//:glog",

View file

@ -18,12 +18,8 @@
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include "arrow/buffer.h"
#include "ray/common/status.h" #include "ray/common/status.h"
#include "ray/object_manager/plasma/client.h"
namespace arrow {
class Buffer;
}
namespace ray { namespace ray {

View file

@ -21,9 +21,6 @@
/// Length of Ray full-length IDs in bytes. /// Length of Ray full-length IDs in bytes.
constexpr size_t kUniqueIDSize = 20; constexpr size_t kUniqueIDSize = 20;
/// Length of plasma ID in bytes.
constexpr size_t kPlasmaIdSize = 20;
/// An ObjectID's bytes are split into the task ID itself and the index of the /// An ObjectID's bytes are split into the task ID itself and the index of the
/// object's creation. This is the maximum width of the object index in bits. /// object's creation. This is the maximum width of the object index in bits.
constexpr int kObjectIdIndexSize = 32; constexpr int kObjectIdIndexSize = 32;

View file

@ -108,32 +108,12 @@ WorkerID ComputeDriverIdFromJob(const JobID &job_id) {
std::string(reinterpret_cast<const char *>(data.data()), data.size())); std::string(reinterpret_cast<const char *>(data.data()), data.size()));
} }
ObjectID ObjectID::FromPlasmaIdBinary(const std::string &from) {
RAY_CHECK(from.size() == kPlasmaIdSize);
return ObjectID::FromBinary(from.substr(0, ObjectID::kLength));
}
plasma::UniqueID ObjectID::ToPlasmaId() const {
static_assert(ObjectID::kLength <= kPlasmaIdSize,
"Currently length of ObjectID must be shorter than plasma's.");
plasma::UniqueID result;
std::memcpy(result.mutable_data(), Data(), ObjectID::Size());
std::fill_n(result.mutable_data() + ObjectID::Size(), kPlasmaIdSize - ObjectID::kLength,
0xFF);
return result;
}
ObjectID::ObjectID(const plasma::UniqueID &from) {
RAY_CHECK(from.size() <= static_cast<int64_t>(ObjectID::Size())) << "Out of size.";
std::memcpy(this->MutableData(), from.data(), ObjectID::Size());
}
ObjectIDFlagsType ObjectID::GetFlags() const { ObjectIDFlagsType ObjectID::GetFlags() const {
ObjectIDFlagsType flags; ObjectIDFlagsType flags;
std::memcpy(&flags, id_ + TaskID::kLength, sizeof(flags)); std::memcpy(&flags, id_ + TaskID::kLength, sizeof(flags));
return flags; return flags;
} }
bool ObjectID::CreatedByTask() const { return ::ray::CreatedByTask(this->GetFlags()); } bool ObjectID::CreatedByTask() const { return ::ray::CreatedByTask(this->GetFlags()); }
bool ObjectID::IsPutObject() const { bool ObjectID::IsPutObject() const {

View file

@ -26,7 +26,6 @@
#include <string> #include <string>
#include "ray/common/constants.h" #include "ray/common/constants.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/util/logging.h" #include "ray/util/logging.h"
#include "ray/util/util.h" #include "ray/util/util.h"
#include "ray/util/visibility.h" #include "ray/util/visibility.h"
@ -274,16 +273,6 @@ class ObjectID : public BaseID<ObjectID> {
static size_t Size() { return kLength; } static size_t Size() { return kLength; }
/// Generate ObjectID by the given binary string of a plasma id.
///
/// \param from The binary string of the given plasma id.
/// \return The ObjectID converted from a binary string of the plasma id.
static ObjectID FromPlasmaIdBinary(const std::string &from);
plasma::ObjectID ToPlasmaId() const;
ObjectID(const plasma::UniqueID &from);
/// Get the index of this object in the task that created it. /// Get the index of this object in the task that created it.
/// ///
/// \return The index of object creation according to the task that created /// \return The index of object creation according to the task that created

View file

@ -92,7 +92,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
std::shared_ptr<arrow::Buffer> arrow_buffer; std::shared_ptr<arrow::Buffer> arrow_buffer;
{ {
std::lock_guard<std::mutex> guard(store_client_mutex_); std::lock_guard<std::mutex> guard(store_client_mutex_);
plasma_status = store_client_.Create(object_id.ToPlasmaId(), data_size, plasma_status = store_client_.Create(object_id, data_size,
metadata ? metadata->Data() : nullptr, metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &arrow_buffer, metadata ? metadata->Size() : 0, &arrow_buffer,
/*device_num=*/0, evict_if_full); /*device_num=*/0, evict_if_full);
@ -136,19 +136,17 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
} }
Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) { Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) {
auto plasma_id = object_id.ToPlasmaId();
{ {
std::lock_guard<std::mutex> guard(store_client_mutex_); std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id)); RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(object_id));
} }
return Status::OK(); return Status::OK();
} }
Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) { Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) {
auto plasma_id = object_id.ToPlasmaId();
{ {
std::lock_guard<std::mutex> guard(store_client_mutex_); std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id)); RAY_ARROW_RETURN_NOT_OK(store_client_.Release(object_id));
} }
return Status::OK(); return Status::OK();
} }
@ -161,16 +159,10 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
RAY_RETURN_NOT_OK(raylet_client_->FetchOrReconstruct( RAY_RETURN_NOT_OK(raylet_client_->FetchOrReconstruct(
batch_ids, fetch_only, /*mark_worker_blocked*/ !in_direct_call, task_id)); batch_ids, fetch_only, /*mark_worker_blocked*/ !in_direct_call, task_id));
std::vector<plasma::ObjectID> plasma_batch_ids;
plasma_batch_ids.reserve(batch_ids.size());
for (size_t i = 0; i < batch_ids.size(); i++) {
plasma_batch_ids.push_back(batch_ids[i].ToPlasmaId());
}
std::vector<plasma::ObjectBuffer> plasma_results; std::vector<plasma::ObjectBuffer> plasma_results;
{ {
std::lock_guard<std::mutex> guard(store_client_mutex_); std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK( RAY_ARROW_RETURN_NOT_OK(store_client_.Get(batch_ids, timeout_ms, &plasma_results));
store_client_.Get(plasma_batch_ids, timeout_ms, &plasma_results));
} }
// Add successfully retrieved objects to the result map and remove them from // Add successfully retrieved objects to the result map and remove them from
@ -319,7 +311,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(
Status CoreWorkerPlasmaStoreProvider::Contains(const ObjectID &object_id, Status CoreWorkerPlasmaStoreProvider::Contains(const ObjectID &object_id,
bool *has_object) { bool *has_object) {
std::lock_guard<std::mutex> guard(store_client_mutex_); std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(store_client_.Contains(object_id.ToPlasmaId(), has_object)); RAY_ARROW_RETURN_NOT_OK(store_client_.Contains(object_id, has_object));
return Status::OK(); return Status::OK();
} }

View file

@ -57,8 +57,7 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
std::lock_guard<std::mutex> lock(pool_mutex_); std::lock_guard<std::mutex> lock(pool_mutex_);
if (get_buffer_state_.count(object_id) == 0) { if (get_buffer_state_.count(object_id) == 0) {
plasma::ObjectBuffer object_buffer; plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(store_client_.Get(&object_id, 1, 0, &object_buffer));
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data == nullptr) { if (object_buffer.data == nullptr) {
RAY_LOG(ERROR) << "Failed to get object"; RAY_LOG(ERROR) << "Failed to get object";
return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>( return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>(
@ -86,14 +85,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk
GetBufferState &buffer_state = get_buffer_state_[object_id]; GetBufferState &buffer_state = get_buffer_state_[object_id];
buffer_state.references--; buffer_state.references--;
if (buffer_state.references == 0) { if (buffer_state.references == 0) {
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId())); RAY_ARROW_CHECK_OK(store_client_.Release(object_id));
get_buffer_state_.erase(object_id); get_buffer_state_.erase(object_id);
} }
} }
void ObjectBufferPool::AbortGet(const ObjectID &object_id) { void ObjectBufferPool::AbortGet(const ObjectID &object_id) {
std::lock_guard<std::mutex> lock(pool_mutex_); std::lock_guard<std::mutex> lock(pool_mutex_);
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId())); RAY_ARROW_CHECK_OK(store_client_.Release(object_id));
get_buffer_state_.erase(object_id); get_buffer_state_.erase(object_id);
} }
@ -102,12 +101,11 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Cr
uint64_t chunk_index) { uint64_t chunk_index) {
std::lock_guard<std::mutex> lock(pool_mutex_); std::lock_guard<std::mutex> lock(pool_mutex_);
if (create_buffer_state_.count(object_id) == 0) { if (create_buffer_state_.count(object_id) == 0) {
const plasma::ObjectID plasma_id = object_id.ToPlasmaId();
int64_t object_size = data_size - metadata_size; int64_t object_size = data_size - metadata_size;
// Try to create shared buffer. // Try to create shared buffer.
std::shared_ptr<Buffer> data; std::shared_ptr<Buffer> data;
arrow::Status s = arrow::Status s =
store_client_.Create(plasma_id, object_size, NULL, metadata_size, &data); store_client_.Create(object_id, object_size, NULL, metadata_size, &data);
std::vector<boost::asio::mutable_buffer> buffer; std::vector<boost::asio::mutable_buffer> buffer;
if (!s.ok()) { if (!s.ok()) {
// Create failed. The object may already exist locally. If something else went // Create failed. The object may already exist locally. If something else went
@ -167,9 +165,8 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED; create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED;
create_buffer_state_[object_id].num_seals_remaining--; create_buffer_state_[object_id].num_seals_remaining--;
if (create_buffer_state_[object_id].num_seals_remaining == 0) { if (create_buffer_state_[object_id].num_seals_remaining == 0) {
const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(store_client_.Seal(object_id));
RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id)); RAY_ARROW_CHECK_OK(store_client_.Release(object_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
create_buffer_state_.erase(object_id); create_buffer_state_.erase(object_id);
RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id
<< ", last chunk index: " << chunk_index; << ", last chunk index: " << chunk_index;
@ -177,9 +174,8 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
} }
void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(store_client_.Release(object_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); RAY_ARROW_CHECK_OK(store_client_.Abort(object_id));
RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id));
create_buffer_state_.erase(object_id); create_buffer_state_.erase(object_id);
} }
@ -202,13 +198,8 @@ std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
} }
void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) { void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
std::vector<plasma::ObjectID> plasma_ids;
plasma_ids.reserve(object_ids.size());
for (const auto &id : object_ids) {
plasma_ids.push_back(id.ToPlasmaId());
}
std::lock_guard<std::mutex> lock(pool_mutex_); std::lock_guard<std::mutex> lock(pool_mutex_);
RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); RAY_ARROW_CHECK_OK(store_client_.Delete(object_ids));
} }
std::string ObjectBufferPool::DebugString() const { std::string ObjectBufferPool::DebugString() const {

View file

@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
#include "ray/object_manager/object_manager.h" #include "ray/object_manager/object_manager.h"
#include "ray/common/common_protocol.h" #include "ray/common/common_protocol.h"
#include "ray/stats/stats.h" #include "ray/stats/stats.h"
#include "ray/util/util.h" #include "ray/util/util.h"
@ -74,7 +75,7 @@ void ObjectManager::StopRpcService() {
void ObjectManager::HandleObjectAdded( void ObjectManager::HandleObjectAdded(
const object_manager::protocol::ObjectInfoT &object_info) { const object_manager::protocol::ObjectInfoT &object_info) {
// Notify the object directory that the object has been added to this node. // Notify the object directory that the object has been added to this node.
ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); ObjectID object_id = ObjectID::FromBinary(object_info.object_id);
RAY_LOG(DEBUG) << "Object added " << object_id; RAY_LOG(DEBUG) << "Object added " << object_id;
RAY_CHECK(local_objects_.count(object_id) == 0); RAY_CHECK(local_objects_.count(object_id) == 0);
local_objects_[object_id].object_info = object_info; local_objects_[object_id].object_info = object_info;

View file

@ -12,17 +12,16 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <future> #include "ray/object_manager/object_store_notification_manager.h"
#include <iostream>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
#include <future>
#include "ray/common/status.h" #include <iostream>
#include "ray/common/common_protocol.h" #include "ray/common/common_protocol.h"
#include "ray/object_manager/object_store_notification_manager.h" #include "ray/common/status.h"
#include "ray/util/util.h" #include "ray/util/util.h"
#ifdef _WIN32 #ifdef _WIN32
@ -141,8 +140,7 @@ void ObjectStoreNotificationManager::ProcessStoreNotification(
notification_.data()); notification_.data());
for (size_t i = 0; i < object_notification->object_info()->size(); ++i) { for (size_t i = 0; i < object_notification->object_info()->size(); ++i) {
auto object_info = object_notification->object_info()->Get(i); auto object_info = object_notification->object_info()->Get(i);
const ObjectID object_id = const ObjectID object_id = ObjectID::FromBinary(object_info->object_id()->str());
ObjectID::FromPlasmaIdBinary(object_info->object_id()->str());
if (object_info->is_deletion()) { if (object_info->is_deletion()) {
ProcessStoreRemove(object_id); ProcessStoreRemove(object_id);
} else { } else {

View file

@ -23,8 +23,6 @@
#include <boost/asio/error.hpp> #include <boost/asio/error.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include "ray/object_manager/plasma/client.h"
#include "ray/common/client_connection.h" #include "ray/common/client_connection.h"
#include "ray/common/id.h" #include "ray/common/id.h"
#include "ray/common/status.h" #include "ray/common/status.h"

View file

@ -1069,7 +1069,7 @@ Status PlasmaClient::Impl::DecodeNotifications(const uint8_t* buffer,
for (size_t i = 0; i < object_info->object_info()->size(); ++i) { for (size_t i = 0; i < object_info->object_info()->size(); ++i) {
auto info = object_info->object_info()->Get(i); auto info = object_info->object_info()->Get(i);
ObjectID id = ObjectID::from_binary(info->object_id()->str()); ObjectID id = ObjectID::FromBinary(info->object_id()->str());
object_ids->push_back(id); object_ids->push_back(id);
if (info->is_deletion()) { if (info->is_deletion()) {
data_sizes->push_back(-1); data_sizes->push_back(-1);

View file

@ -110,86 +110,6 @@ bool IsPlasmaStoreFull(const arrow::Status& status) {
return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaStoreFull); return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaStoreFull);
} }
UniqueID UniqueID::from_binary(const std::string& binary) {
UniqueID id;
std::memcpy(&id, binary.data(), sizeof(id));
return id;
}
const uint8_t* UniqueID::data() const { return id_; }
uint8_t* UniqueID::mutable_data() { return id_; }
std::string UniqueID::binary() const {
return std::string(reinterpret_cast<const char*>(id_), kUniqueIDSize);
}
std::string UniqueID::hex() const {
constexpr char hex[] = "0123456789abcdef";
std::string result;
for (int i = 0; i < kUniqueIDSize; i++) {
unsigned int val = id_[i];
result.push_back(hex[val >> 4]);
result.push_back(hex[val & 0xf]);
}
return result;
}
// This code is from https://sites.google.com/site/murmurhash/
// and is public domain.
uint64_t MurmurHash64A(const void* key, int len, unsigned int seed) {
const uint64_t m = 0xc6a4a7935bd1e995;
const int r = 47;
uint64_t h = seed ^ (len * m);
const uint64_t* data = reinterpret_cast<const uint64_t*>(key);
const uint64_t* end = data + (len / 8);
while (data != end) {
uint64_t k = arrow::util::SafeLoad(data++);
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
}
const unsigned char* data2 = reinterpret_cast<const unsigned char*>(data);
switch (len & 7) {
case 7:
h ^= uint64_t(data2[6]) << 48; // fall through
case 6:
h ^= uint64_t(data2[5]) << 40; // fall through
case 5:
h ^= uint64_t(data2[4]) << 32; // fall through
case 4:
h ^= uint64_t(data2[3]) << 24; // fall through
case 3:
h ^= uint64_t(data2[2]) << 16; // fall through
case 2:
h ^= uint64_t(data2[1]) << 8; // fall through
case 1:
h ^= uint64_t(data2[0]);
h *= m;
}
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
size_t UniqueID::hash() const { return MurmurHash64A(&id_[0], kUniqueIDSize, 0); }
bool UniqueID::operator==(const UniqueID& rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}
const PlasmaStoreInfo* plasma_config; const PlasmaStoreInfo* plasma_config;
} // namespace plasma } // namespace plasma

View file

@ -24,6 +24,7 @@
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "ray/common/id.h"
#include "ray/object_manager/plasma/compat.h" #include "ray/object_manager/plasma/compat.h"
#include "arrow/status.h" #include "arrow/status.h"
@ -33,6 +34,8 @@
namespace plasma { namespace plasma {
using ray::ObjectID;
enum class ObjectLocation : int32_t { Local, Remote, Nonexistent }; enum class ObjectLocation : int32_t { Local, Remote, Nonexistent };
enum class PlasmaErrorCode : int8_t { enum class PlasmaErrorCode : int8_t {
@ -52,27 +55,6 @@ ARROW_EXPORT bool IsPlasmaObjectAlreadySealed(const arrow::Status& status);
/// Return true iff the status indicates the Plasma store reached its capacity limit. /// Return true iff the status indicates the Plasma store reached its capacity limit.
ARROW_EXPORT bool IsPlasmaStoreFull(const arrow::Status& status); ARROW_EXPORT bool IsPlasmaStoreFull(const arrow::Status& status);
constexpr int64_t kUniqueIDSize = 20;
class ARROW_EXPORT UniqueID {
public:
static UniqueID from_binary(const std::string& binary);
bool operator==(const UniqueID& rhs) const;
const uint8_t* data() const;
uint8_t* mutable_data();
std::string binary() const;
std::string hex() const;
size_t hash() const;
static int64_t size() { return kUniqueIDSize; }
private:
uint8_t id_[kUniqueIDSize];
};
static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
typedef UniqueID ObjectID;
/// Size of object hash digests. /// Size of object hash digests.
constexpr int64_t kDigestSize = sizeof(uint64_t); constexpr int64_t kDigestSize = sizeof(uint64_t);
@ -142,11 +124,4 @@ struct PlasmaStoreInfo;
extern const PlasmaStoreInfo* plasma_config; extern const PlasmaStoreInfo* plasma_config;
} // namespace plasma } // namespace plasma
namespace std {
template <>
struct hash<::plasma::UniqueID> {
size_t operator()(const ::plasma::UniqueID& id) const { return id.hash(); }
};
} // namespace std
#endif // PLASMA_COMMON_H #endif // PLASMA_COMMON_H

View file

@ -114,7 +114,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create(
if (plasma::IsPlasmaObjectExists(s)) { if (plasma::IsPlasmaObjectExists(s)) {
jclass exceptionClass = jclass exceptionClass =
env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException"); env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException");
env->ThrowNew(exceptionClass, oid.hex().c_str()); env->ThrowNew(exceptionClass, oid.Hex().c_str());
return nullptr; return nullptr;
} }
if (plasma::IsPlasmaStoreFull(s)) { if (plasma::IsPlasmaStoreFull(s)) {

View file

@ -48,7 +48,7 @@ ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
int64_t num_objects) { int64_t num_objects) {
std::vector<flatbuffers::Offset<flatbuffers::String>> results; std::vector<flatbuffers::Offset<flatbuffers::String>> results;
for (int64_t i = 0; i < num_objects; i++) { for (int64_t i = 0; i < num_objects; i++) {
results.push_back(fbb->CreateString(object_ids[i].binary())); results.push_back(fbb->CreateString(object_ids[i].Binary()));
} }
return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size()); return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size());
} }
@ -187,7 +187,7 @@ Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full,
int64_t data_size, int64_t metadata_size, int device_num) { int64_t data_size, int64_t metadata_size, int device_num) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = auto message =
fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()), fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.Binary()),
evict_if_full, data_size, metadata_size, device_num); evict_if_full, data_size, metadata_size, device_num);
return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message);
} }
@ -201,7 +201,7 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
*evict_if_full = message->evict_if_full(); *evict_if_full = message->evict_if_full();
*data_size = message->data_size(); *data_size = message->data_size();
*metadata_size = message->metadata_size(); *metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
*device_num = message->device_num(); *device_num = message->device_num();
return Status::OK(); return Status::OK();
} }
@ -212,7 +212,7 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size, PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size,
object->metadata_offset, object->metadata_size, object->metadata_offset, object->metadata_size,
object->device_num); object->device_num);
auto object_string = fbb.CreateString(object_id.binary()); auto object_string = fbb.CreateString(object_id.Binary());
#ifdef PLASMA_CUDA #ifdef PLASMA_CUDA
flatbuffers::Offset<fb::CudaHandle> ipc_handle; flatbuffers::Offset<fb::CudaHandle> ipc_handle;
if (object->device_num != 0) { if (object->device_num != 0) {
@ -244,7 +244,7 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
object->store_fd = message->plasma_object()->segment_index(); object->store_fd = message->plasma_object()->segment_index();
object->data_offset = message->plasma_object()->data_offset(); object->data_offset = message->plasma_object()->data_offset();
object->data_size = message->plasma_object()->data_size(); object->data_size = message->plasma_object()->data_size();
@ -271,7 +271,7 @@ Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize); auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
auto message = fb::CreatePlasmaCreateAndSealRequest( auto message = fb::CreatePlasmaCreateAndSealRequest(
fbb, fbb.CreateString(object_id.binary()), evict_if_full, fbb.CreateString(data), fbb, fbb.CreateString(object_id.Binary()), evict_if_full, fbb.CreateString(data),
fbb.CreateString(metadata), digest_string); fbb.CreateString(metadata), digest_string);
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message);
} }
@ -283,7 +283,7 @@ Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
*evict_if_full = message->evict_if_full(); *evict_if_full = message->evict_if_full();
*object_data = message->data()->str(); *object_data = message->data()->str();
*metadata = message->metadata()->str(); *metadata = message->metadata()->str();
@ -320,7 +320,7 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
*evict_if_full = message->evict_if_full(); *evict_if_full = message->evict_if_full();
ConvertToVector(message->object_ids(), object_ids, ConvertToVector(message->object_ids(), object_ids,
[](const flatbuffers::String& element) { [](const flatbuffers::String& element) {
return ObjectID::from_binary(element.str()); return ObjectID::FromBinary(element.str());
}); });
ConvertToVector(message->data(), object_data, ConvertToVector(message->data(), object_data,
@ -364,7 +364,7 @@ Status ReadCreateAndSealBatchReply(uint8_t* data, size_t size) {
Status SendAbortRequest(int sock, ObjectID object_id) { Status SendAbortRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.binary())); auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.Binary()));
return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message);
} }
@ -372,13 +372,13 @@ Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK(); return Status::OK();
} }
Status SendAbortReply(int sock, ObjectID object_id) { Status SendAbortReply(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.binary())); auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.Binary()));
return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message);
} }
@ -386,7 +386,7 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK(); return Status::OK();
} }
@ -394,7 +394,7 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) { Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.Binary()),
fbb.CreateString(digest)); fbb.CreateString(digest));
return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message);
} }
@ -404,7 +404,7 @@ Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
ARROW_CHECK_EQ(message->digest()->size(), kDigestSize); ARROW_CHECK_EQ(message->digest()->size(), kDigestSize);
digest->assign(message->digest()->data(), kDigestSize); digest->assign(message->digest()->data(), kDigestSize);
return Status::OK(); return Status::OK();
@ -413,7 +413,7 @@ Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) { Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = auto message =
fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), error); fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.Binary()), error);
return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message);
} }
@ -421,7 +421,7 @@ Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
return PlasmaErrorStatus(message->error()); return PlasmaErrorStatus(message->error());
} }
@ -430,7 +430,7 @@ Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status SendReleaseRequest(int sock, ObjectID object_id) { Status SendReleaseRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = auto message =
fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary())); fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.Binary()));
return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message);
} }
@ -438,14 +438,14 @@ Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK(); return Status::OK();
} }
Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) { Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = auto message =
fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), error); fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.Binary()), error);
return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message);
} }
@ -453,7 +453,7 @@ Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
return PlasmaErrorStatus(message->error()); return PlasmaErrorStatus(message->error());
} }
@ -475,7 +475,7 @@ Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* obje
auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data); auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) { ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
return ObjectID::from_binary(request.object_ids()->Get(i)->str()); return ObjectID::FromBinary(request.object_ids()->Get(i)->str());
}); });
return Status::OK(); return Status::OK();
} }
@ -503,7 +503,7 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data); auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) { ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
return ObjectID::from_binary(request.object_ids()->Get(i)->str()); return ObjectID::FromBinary(request.object_ids()->Get(i)->str());
}); });
ToVector(*message, errors, [](const PlasmaDeleteReply& request, int i) { ToVector(*message, errors, [](const PlasmaDeleteReply& request, int i) {
return static_cast<PlasmaError>(request.errors()->data()[i]); return static_cast<PlasmaError>(request.errors()->data()[i]);
@ -516,7 +516,7 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object
Status SendContainsRequest(int sock, ObjectID object_id) { Status SendContainsRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = auto message =
fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary())); fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.Binary()));
return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message);
} }
@ -524,13 +524,13 @@ Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK(); return Status::OK();
} }
Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.Binary()),
has_object); has_object);
return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message);
} }
@ -540,7 +540,7 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
*has_object = message->has_object(); *has_object = message->has_object();
return Status::OK(); return Status::OK();
} }
@ -563,7 +563,7 @@ Status SendListReply(int sock, const ObjectTable& objects) {
? fbb.CreateString("") ? fbb.CreateString("")
: fbb.CreateString(reinterpret_cast<char*>(entry.second->digest), : fbb.CreateString(reinterpret_cast<char*>(entry.second->digest),
kDigestSize); kDigestSize);
auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()), auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.Binary()),
entry.second->data_size, entry.second->metadata_size, entry.second->data_size, entry.second->metadata_size,
entry.second->ref_count, entry.second->create_time, entry.second->ref_count, entry.second->create_time,
entry.second->construct_duration, digest); entry.second->construct_duration, digest);
@ -580,7 +580,7 @@ Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) {
auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
for (auto const& object : *message->objects()) { for (auto const& object : *message->objects()) {
ObjectID object_id = ObjectID::from_binary(object->object_id()->str()); ObjectID object_id = ObjectID::FromBinary(object->object_id()->str());
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
entry->data_size = object->data_size(); entry->data_size = object->data_size();
entry->metadata_size = object->metadata_size(); entry->metadata_size = object->metadata_size();
@ -665,7 +665,7 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str(); auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::from_binary(object_id)); object_ids.push_back(ObjectID::FromBinary(object_id));
} }
*timeout_ms = message->timeout_ms(); *timeout_ms = message->timeout_ms();
return Status::OK(); return Status::OK();
@ -712,7 +712,7 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
#endif #endif
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) { for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); object_ids[i] = ObjectID::FromBinary(message->object_ids()->Get(i)->str());
} }
for (uoffset_t i = 0; i < num_objects; ++i) { for (uoffset_t i = 0; i < num_objects; ++i) {
const PlasmaObjectSpec* object = message->plasma_objects()->Get(i); const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
@ -753,7 +753,7 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto addr = fbb.CreateString(address, strlen(address)); auto addr = fbb.CreateString(address, strlen(address));
auto message = auto message =
fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port); fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.Binary()), addr, port);
return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message);
} }
@ -763,7 +763,7 @@ Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** a
auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data); auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
DCHECK(message->object_id()->size() == sizeof(ObjectID)); DCHECK(message->object_id()->size() == sizeof(ObjectID));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
#ifdef _WIN32 #ifdef _WIN32
*address = _strdup(message->address()->c_str()); *address = _strdup(message->address()->c_str());
#else #else
@ -776,7 +776,7 @@ Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** a
Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
int64_t metadata_size) { int64_t metadata_size) {
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()), auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.Binary()),
object_size, metadata_size); object_size, metadata_size);
return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message); return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message);
} }
@ -786,7 +786,7 @@ Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
DCHECK(data); DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data); auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str()); *object_id = ObjectID::FromBinary(message->object_id()->str());
*object_size = static_cast<int64_t>(message->object_size()); *object_size = static_cast<int64_t>(message->object_size());
*metadata_size = static_cast<int64_t>(message->metadata_size()); *metadata_size = static_cast<int64_t>(message->metadata_size());
return Status::OK(); return Status::OK();
@ -810,7 +810,7 @@ Status ReadRefreshLRURequest(uint8_t* data, size_t size,
DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str(); auto object_id = message->object_ids()->Get(i)->str();
object_ids->push_back(ObjectID::from_binary(object_id)); object_ids->push_back(ObjectID::FromBinary(object_id));
} }
return Status::OK(); return Status::OK();
} }

View file

@ -221,7 +221,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f
int64_t data_size, int64_t metadata_size, int64_t data_size, int64_t metadata_size,
int device_num, Client* client, int device_num, Client* client,
PlasmaObject* result) { PlasmaObject* result) {
ARROW_LOG(DEBUG) << "creating object " << object_id.hex(); ARROW_LOG(DEBUG) << "creating object " << object_id.Hex();
auto entry = GetObjectTableEntry(&store_info_, object_id); auto entry = GetObjectTableEntry(&store_info_, object_id);
if (entry != nullptr) { if (entry != nullptr) {
@ -240,7 +240,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f
pointer = pointer =
AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true); AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true);
if (!pointer) { if (!pointer) {
ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex()
<< ", data_size=" << data_size << ", data_size=" << data_size
<< ", metadata_size=" << metadata_size << ", metadata_size=" << metadata_size
<< ", will send a reply of PlasmaError::OutOfMemory"; << ", will send a reply of PlasmaError::OutOfMemory";
@ -603,7 +603,7 @@ void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids,
// Set object construction duration. // Set object construction duration.
entry->construct_duration = std::time(nullptr) - entry->create_time; entry->construct_duration = std::time(nullptr) - entry->create_time;
object_info.object_id = object_ids[i].binary(); object_info.object_id = object_ids[i].Binary();
object_info.data_size = entry->data_size; object_info.data_size = entry->data_size;
object_info.metadata_size = entry->metadata_size; object_info.metadata_size = entry->metadata_size;
object_info.digest = digests[i]; object_info.digest = digests[i];
@ -663,7 +663,7 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) {
EraseFromObjectTable(object_id); EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted. // Inform all subscribers that the object has been deleted.
fb::ObjectInfoT notification; fb::ObjectInfoT notification;
notification.object_id = object_id.binary(); notification.object_id = object_id.Binary();
notification.is_deletion = true; notification.is_deletion = true;
PushNotification(&notification); PushNotification(&notification);
@ -678,7 +678,7 @@ void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data; std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data;
std::vector<ObjectTableEntry*> evicted_entries; std::vector<ObjectTableEntry*> evicted_entries;
for (const auto& object_id : object_ids) { for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "evicting object " << object_id.hex(); ARROW_LOG(DEBUG) << "evicting object " << object_id.Hex();
auto entry = GetObjectTableEntry(&store_info_, object_id); auto entry = GetObjectTableEntry(&store_info_, object_id);
// TODO(rkn): This should probably not fail, but should instead throw an // TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been // error. Maybe we should also support deleting objects that have been
@ -702,7 +702,7 @@ void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
EraseFromObjectTable(object_id); EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted. // Inform all subscribers that the object has been deleted.
fb::ObjectInfoT notification; fb::ObjectInfoT notification;
notification.object_id = object_id.binary(); notification.object_id = object_id.Binary();
notification.is_deletion = true; notification.is_deletion = true;
PushNotification(&notification); PushNotification(&notification);
} }
@ -910,7 +910,7 @@ void PlasmaStore::SubscribeToUpdates(Client* client) {
for (const auto& entry : store_info_.objects) { for (const auto& entry : store_info_.objects) {
if (entry.second->state == ObjectState::PLASMA_SEALED) { if (entry.second->state == ObjectState::PLASMA_SEALED) {
ObjectInfoT info; ObjectInfoT info;
info.object_id = entry.first.binary(); info.object_id = entry.first.Binary();
info.data_size = entry.second->data_size; info.data_size = entry.second->data_size;
info.metadata_size = entry.second->metadata_size; info.metadata_size = entry.second->metadata_size;
info.digest = info.digest =

View file

@ -150,8 +150,8 @@ class TestObjectManagerBase : public ::testing::Test {
int64_t metadata_size = sizeof(metadata); int64_t metadata_size = sizeof(metadata);
std::shared_ptr<arrow::Buffer> data; std::shared_ptr<arrow::Buffer> data;
RAY_ARROW_CHECK_OK( RAY_ARROW_CHECK_OK(
client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); client.Create(object_id, data_size, metadata, metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); RAY_ARROW_CHECK_OK(client.Seal(object_id));
return object_id; return object_id;
} }
@ -270,7 +270,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) { plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) {
plasma::ObjectBuffer object_buffer; plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.ToPlasmaId(); plasma::ObjectID plasma_id = object_id;
RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer)); RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer));
return object_buffer; return object_buffer;
} }
@ -278,7 +278,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) { static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) {
const int64_t size = sizeof(uint64_t); const int64_t size = sizeof(uint64_t);
static unsigned char digest_1[size]; static unsigned char digest_1[size];
RAY_ARROW_CHECK_OK(client.Hash(object_id.ToPlasmaId(), &digest_1[0])); RAY_ARROW_CHECK_OK(client.Hash(object_id, &digest_1[0]));
return digest_1; return digest_1;
} }

View file

@ -149,8 +149,8 @@ class TestObjectManagerBase : public ::testing::Test {
int64_t metadata_size = sizeof(metadata); int64_t metadata_size = sizeof(metadata);
std::shared_ptr<arrow::Buffer> data; std::shared_ptr<arrow::Buffer> data;
RAY_ARROW_CHECK_OK( RAY_ARROW_CHECK_OK(
client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); client.Create(object_id, data_size, metadata, metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); RAY_ARROW_CHECK_OK(client.Seal(object_id));
return object_id; return object_id;
} }

View file

@ -171,7 +171,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
RAY_CHECK_OK(object_manager_.SubscribeObjAdded( RAY_CHECK_OK(object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) { [this](const object_manager::protocol::ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); ObjectID object_id = ObjectID::FromBinary(object_info.object_id);
HandleObjectLocal(object_id); HandleObjectLocal(object_id);
})); }));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
@ -2086,9 +2086,9 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
num_returns -= 1; num_returns -= 1;
} }
// Determine which IDs should be marked as failed. // Determine which IDs should be marked as failed.
std::vector<plasma::ObjectID> objects_to_fail; std::vector<ObjectID> objects_to_fail;
for (int64_t i = 0; i < num_returns; i++) { for (int64_t i = 0; i < num_returns; i++) {
objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId()); objects_to_fail.push_back(spec.ReturnId(i));
} }
const JobID job_id = task.GetTaskSpecification().JobId(); const JobID job_id = task.GetTaskSpecification().JobId();
MarkObjectsAsFailed(error_type, objects_to_fail, job_id); MarkObjectsAsFailed(error_type, objects_to_fail, job_id);
@ -2102,7 +2102,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
} }
void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type,
const std::vector<plasma::ObjectID> objects_to_fail, const std::vector<ObjectID> objects_to_fail,
const JobID &job_id) { const JobID &job_id) {
const std::string meta = std::to_string(static_cast<int>(error_type)); const std::string meta = std::to_string(static_cast<int>(error_type));
for (const auto &object_id : objects_to_fail) { for (const auto &object_id : objects_to_fail) {
@ -2887,8 +2887,8 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id,
<< "by the redis LRU configuration. Consider increasing the memory " << "by the redis LRU configuration. Consider increasing the memory "
"allocation via " "allocation via "
<< "ray.init(redis_max_memory=<max_memory_bytes>)."; << "ray.init(redis_max_memory=<max_memory_bytes>).";
MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id},
{required_object_id.ToPlasmaId()}, JobID::Nil()); JobID::Nil());
} }
})); }));
} }
@ -2925,8 +2925,7 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms(), gcs::CreateErrorTableData(type, error_message.str(), current_time_ms(),
task.GetTaskSpecification().JobId()); task.GetTaskSpecification().JobId());
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id},
{required_object_id.ToPlasmaId()},
task.GetTaskSpecification().JobId()); task.GetTaskSpecification().JobId());
return; return;
} }
@ -3286,7 +3285,7 @@ void NodeManager::ProcessSubscribePlasmaReady(
ray::Status NodeManager::SetupPlasmaSubscription() { ray::Status NodeManager::SetupPlasmaSubscription() {
return object_manager_.SubscribeObjAdded( return object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) { [this](const object_manager::protocol::ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); ObjectID object_id = ObjectID::FromBinary(object_info.object_id);
auto waiting_workers = absl::flat_hash_set<std::shared_ptr<Worker>>(); auto waiting_workers = absl::flat_hash_set<std::shared_ptr<Worker>>();
{ {
absl::MutexLock guard(&plasma_object_notification_lock_); absl::MutexLock guard(&plasma_object_notification_lock_);
@ -3396,10 +3395,10 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
// the returned buffer. // the returned buffer.
// NOTE: the caller must ensure that the objects already exist in plasma before // NOTE: the caller must ensure that the objects already exist in plasma before
// sending a PinObjectIDs request. // sending a PinObjectIDs request.
std::vector<plasma::ObjectID> plasma_ids; std::vector<ObjectID> object_ids;
plasma_ids.reserve(request.object_ids_size()); object_ids.reserve(request.object_ids_size());
for (const auto &object_id_binary : request.object_ids()) { for (const auto &object_id_binary : request.object_ids()) {
plasma_ids.push_back(plasma::ObjectID::from_binary(object_id_binary)); object_ids.push_back(ObjectID::FromBinary(object_id_binary));
} }
std::vector<plasma::ObjectBuffer> plasma_results; std::vector<plasma::ObjectBuffer> plasma_results;
// TODO(swang): This `Get` has a timeout of 0, so the plasma store will not // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not
@ -3407,7 +3406,7 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
// heavy load, then this request can still block the NodeManager event loop // heavy load, then this request can still block the NodeManager event loop
// since we must wait for the plasma store's reply. We should consider using // since we must wait for the plasma store's reply. We should consider using
// an `AsyncGet` instead. // an `AsyncGet` instead.
if (!store_client_.Get(plasma_ids, /*timeout_ms=*/0, &plasma_results).ok()) { if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) {
RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store."; RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store.";
send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr);
return; return;

View file

@ -231,8 +231,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \param object_ids The object ids to store error messages into. /// \param object_ids The object ids to store error messages into.
/// \param job_id The optional job to push errors to if the writes fail. /// \param job_id The optional job to push errors to if the writes fail.
void MarkObjectsAsFailed(const ErrorType &error_type, void MarkObjectsAsFailed(const ErrorType &error_type,
const std::vector<plasma::ObjectID> object_ids, const std::vector<ObjectID> object_ids, const JobID &job_id);
const JobID &job_id);
/// This is similar to TreatTaskAsFailed, but it will only mark the task as /// This is similar to TreatTaskAsFailed, but it will only mark the task as
/// failed if at least one of the task's return values is lost. A return /// failed if at least one of the task's return values is lost. A return
/// value is lost if it has been created before, but no longer exists on any /// value is lost if it has been created before, but no longer exists on any

View file

@ -112,8 +112,8 @@ class TestObjectManagerBase : public ::testing::Test {
int64_t metadata_size = sizeof(metadata); int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data; std::shared_ptr<Buffer> data;
RAY_ARROW_CHECK_OK( RAY_ARROW_CHECK_OK(
client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); client.Create(object_id, data_size, metadata, metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); RAY_ARROW_CHECK_OK(client.Seal(object_id));
return object_id; return object_id;
} }