mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Support multiple store providers in ObjectInterface (#5452)
This commit is contained in:
parent
52a7c1d673
commit
eab595777f
10 changed files with 296 additions and 64 deletions
|
@ -3,18 +3,49 @@
|
|||
#include "ray/common/ray_config.h"
|
||||
#include "ray/core_worker/object_interface.h"
|
||||
#include "ray/core_worker/store_provider/local_plasma_provider.h"
|
||||
#include "ray/core_worker/store_provider/memory_store_provider.h"
|
||||
#include "ray/core_worker/store_provider/plasma_store_provider.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
// Group object ids according the the corresponding store providers.
|
||||
void GroupObjectIdsByStoreProvider(
|
||||
const std::vector<ObjectID> &object_ids,
|
||||
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>> *results) {
|
||||
// There are two cases:
|
||||
// - for task return objects from direct actor call, use memory store provider;
|
||||
// - all the others use plasma store provider.
|
||||
for (const auto &object_id : object_ids) {
|
||||
auto type = StoreProviderType::PLASMA;
|
||||
// For raylet transport we always use plasma store provider, for direct actor call
|
||||
// there are a few cases:
|
||||
// - objects manually added to store by `ray.put`: for these objects they always use
|
||||
// plasma store provider;
|
||||
// - task arguments: these objects are passed by value, and are not put into store;
|
||||
// - task return objects: these are put into memory store of the task submitter
|
||||
// and are only used locally.
|
||||
// Thus we need to check whether this object is a task return object in additional
|
||||
// to whether it's from direct actor call before we can choose memory store provider.
|
||||
if (object_id.IsReturnObject() &&
|
||||
object_id.GetTransportType() ==
|
||||
static_cast<int>(TaskTransportType::DIRECT_ACTOR)) {
|
||||
type = StoreProviderType::MEMORY;
|
||||
}
|
||||
|
||||
(*results)[type].insert(object_id);
|
||||
}
|
||||
}
|
||||
|
||||
CoreWorkerObjectInterface::CoreWorkerObjectInterface(
|
||||
WorkerContext &worker_context, std::unique_ptr<RayletClient> &raylet_client,
|
||||
const std::string &store_socket)
|
||||
: worker_context_(worker_context),
|
||||
raylet_client_(raylet_client),
|
||||
store_socket_(store_socket) {
|
||||
store_socket_(store_socket),
|
||||
memory_store_(std::make_shared<CoreWorkerMemoryStore>()) {
|
||||
AddStoreProvider(StoreProviderType::LOCAL_PLASMA);
|
||||
AddStoreProvider(StoreProviderType::PLASMA);
|
||||
AddStoreProvider(StoreProviderType::MEMORY);
|
||||
}
|
||||
|
||||
Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) {
|
||||
|
@ -35,33 +66,49 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
|
|||
std::vector<std::shared_ptr<RayObject>> *results) {
|
||||
(*results).resize(ids.size(), nullptr);
|
||||
|
||||
// Divide the object ids into two groups: direct call return objects and the rest,
|
||||
// and de-duplicate for each group.
|
||||
std::unordered_set<ObjectID> direct_call_return_ids;
|
||||
std::unordered_set<ObjectID> other_ids;
|
||||
for (const auto &object_id : ids) {
|
||||
if (object_id.IsReturnObject() &&
|
||||
object_id.GetTransportType() ==
|
||||
static_cast<int>(TaskTransportType::DIRECT_ACTOR)) {
|
||||
direct_call_return_ids.insert(object_id);
|
||||
// Divide the object ids by store provider type. For each store provider,
|
||||
// maintain an unordered_set which does proper de-duplication, thus the
|
||||
// store provider could simply assume its object ids don't have duplicates.
|
||||
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
|
||||
object_ids_per_store_provider;
|
||||
GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider);
|
||||
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> objects;
|
||||
auto remaining_timeout_ms = timeout_ms;
|
||||
|
||||
// Re-order the list so that we always get from plasma store provider first,
|
||||
// since it uses a loop of `FetchOrReconstruct` and plasma `Get`, it's not
|
||||
// desirable if other store providers use up the timeout and leaves no time
|
||||
// for plasma provider to reconstruct the objects as necessary.
|
||||
std::list<
|
||||
std::pair<StoreProviderType, std::reference_wrapper<std::unordered_set<ObjectID>>>>
|
||||
ids_per_provider;
|
||||
for (auto &entry : object_ids_per_store_provider) {
|
||||
auto list_entry = std::make_pair(entry.first, std::ref(entry.second));
|
||||
if (entry.first == StoreProviderType::PLASMA) {
|
||||
ids_per_provider.emplace_front(list_entry);
|
||||
} else {
|
||||
other_ids.insert(object_id);
|
||||
ids_per_provider.emplace_back(list_entry);
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> objects;
|
||||
auto start_time = current_time_ms();
|
||||
// Fetch non-direct-call objects using `PLASMA` store provider.
|
||||
RAY_RETURN_NOT_OK(Get(StoreProviderType::PLASMA, other_ids, timeout_ms, &objects));
|
||||
int64_t duration = current_time_ms() - start_time;
|
||||
int64_t left_timeout_ms =
|
||||
(timeout_ms == -1) ? timeout_ms
|
||||
: std::max(static_cast<int64_t>(0), timeout_ms - duration);
|
||||
|
||||
// Fetch direct call return objects using `LOCAL_PLASMA` store provider.
|
||||
RAY_RETURN_NOT_OK(Get(StoreProviderType::LOCAL_PLASMA, direct_call_return_ids,
|
||||
left_timeout_ms, &objects));
|
||||
// Note that if one store provider uses up the timeout, we will still try the others
|
||||
// with a timeout of 0.
|
||||
for (const auto &entry : object_ids_per_store_provider) {
|
||||
auto start_time = current_time_ms();
|
||||
RAY_RETURN_NOT_OK(
|
||||
GetFromStoreProvider(entry.first, entry.second, remaining_timeout_ms, &objects));
|
||||
if (remaining_timeout_ms > 0) {
|
||||
int64_t duration = current_time_ms() - start_time;
|
||||
remaining_timeout_ms =
|
||||
std::max(static_cast<int64_t>(0), remaining_timeout_ms - duration);
|
||||
}
|
||||
}
|
||||
|
||||
// Loop through `ids` and fill each entry for the `results` vector,
|
||||
// this ensures that entries `results` have exactly the same order as
|
||||
// they are in `ids`. When there are duplicate object ids, all the entries
|
||||
// for the same id are filled in.
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
(*results)[i] = objects[ids[i]];
|
||||
}
|
||||
|
@ -69,7 +116,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CoreWorkerObjectInterface::Get(
|
||||
Status CoreWorkerObjectInterface::GetFromStoreProvider(
|
||||
StoreProviderType type, const std::unordered_set<ObjectID> &object_ids,
|
||||
int64_t timeout_ms,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) {
|
||||
|
@ -87,17 +134,116 @@ Status CoreWorkerObjectInterface::Get(
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &object_ids,
|
||||
int num_objects, int64_t timeout_ms,
|
||||
std::vector<bool> *results) {
|
||||
return store_providers_[StoreProviderType::PLASMA]->Wait(
|
||||
object_ids, num_objects, timeout_ms, worker_context_.GetCurrentTaskID(), results);
|
||||
Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &ids, int num_objects,
|
||||
int64_t timeout_ms, std::vector<bool> *results) {
|
||||
(*results).resize(ids.size(), false);
|
||||
|
||||
if (num_objects <= 0 || num_objects > static_cast<int>(ids.size())) {
|
||||
return Status::Invalid("num_objects value is not valid");
|
||||
}
|
||||
|
||||
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
|
||||
object_ids_per_store_provider;
|
||||
GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider);
|
||||
|
||||
// Wait from all the store providers with timeout set to 0. This is to avoid the case
|
||||
// where we might use up the entire timeout on trying to get objects from one store
|
||||
// provider before even trying another (which might have all of the objects available).
|
||||
RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider,
|
||||
/* timeout_ms= */ 0, &num_objects,
|
||||
results));
|
||||
|
||||
if (num_objects > 0) {
|
||||
// Wait from all the store providers with the specified timeout
|
||||
// if the required number of objects haven't been ready yet.
|
||||
RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider,
|
||||
/* timeout_ms= */ timeout_ms,
|
||||
&num_objects, results));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CoreWorkerObjectInterface::WaitFromMultipleStoreProviders(
|
||||
const std::vector<ObjectID> &ids,
|
||||
const EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
|
||||
&ids_per_provider,
|
||||
int64_t timeout_ms, int *num_objects, std::vector<bool> *results) {
|
||||
std::unordered_map<ObjectID, int> object_counts;
|
||||
for (const auto &entry : ids) {
|
||||
auto iter = object_counts.find(entry);
|
||||
if (iter == object_counts.end()) {
|
||||
object_counts.emplace(entry, 1);
|
||||
} else {
|
||||
iter->second++;
|
||||
}
|
||||
}
|
||||
|
||||
auto remaining_timeout_ms = timeout_ms;
|
||||
for (const auto &entry : ids_per_provider) {
|
||||
std::unordered_set<ObjectID> objects;
|
||||
auto start_time = current_time_ms();
|
||||
int required_objects = std::min(static_cast<int>(entry.second.size()), *num_objects);
|
||||
RAY_RETURN_NOT_OK(WaitFromStoreProvider(entry.first, entry.second, required_objects,
|
||||
remaining_timeout_ms, &objects));
|
||||
if (remaining_timeout_ms > 0) {
|
||||
int64_t duration = current_time_ms() - start_time;
|
||||
remaining_timeout_ms =
|
||||
std::max(static_cast<int64_t>(0), remaining_timeout_ms - duration);
|
||||
}
|
||||
for (const auto &entry : objects) {
|
||||
*num_objects -= object_counts[entry];
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
if (objects.count(ids[i]) > 0) {
|
||||
(*results)[i] = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (*num_objects <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
Status CoreWorkerObjectInterface::WaitFromStoreProvider(
|
||||
StoreProviderType type, const std::unordered_set<ObjectID> &object_ids,
|
||||
int num_objects, int64_t timeout_ms, std::unordered_set<ObjectID> *results) {
|
||||
std::vector<ObjectID> ids(object_ids.begin(), object_ids.end());
|
||||
if (!ids.empty()) {
|
||||
std::vector<bool> objects;
|
||||
RAY_RETURN_NOT_OK(store_providers_[type]->Wait(
|
||||
ids, num_objects, timeout_ms, worker_context_.GetCurrentTaskID(), &objects));
|
||||
RAY_CHECK(ids.size() == objects.size());
|
||||
for (size_t i = 0; i < objects.size(); i++) {
|
||||
if (objects[i]) {
|
||||
(*results).insert(ids[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CoreWorkerObjectInterface::Delete(const std::vector<ObjectID> &object_ids,
|
||||
bool local_only, bool delete_creating_tasks) {
|
||||
return store_providers_[StoreProviderType::PLASMA]->Delete(object_ids, local_only,
|
||||
delete_creating_tasks);
|
||||
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
|
||||
object_ids_per_store_provider;
|
||||
GroupObjectIdsByStoreProvider(object_ids, &object_ids_per_store_provider);
|
||||
|
||||
for (const auto &entry : object_ids_per_store_provider) {
|
||||
auto type = entry.first;
|
||||
bool is_plasma = (type == StoreProviderType::PLASMA);
|
||||
|
||||
std::vector<ObjectID> ids(entry.second.begin(), entry.second.end());
|
||||
RAY_RETURN_NOT_OK(store_providers_[type]->Delete(
|
||||
ids, is_plasma ? local_only : false, is_plasma ? delete_creating_tasks : false));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void CoreWorkerObjectInterface::AddStoreProvider(StoreProviderType type) {
|
||||
|
@ -113,6 +259,10 @@ std::unique_ptr<CoreWorkerStoreProvider> CoreWorkerObjectInterface::CreateStoreP
|
|||
case StoreProviderType::PLASMA:
|
||||
return std::unique_ptr<CoreWorkerStoreProvider>(
|
||||
new CoreWorkerPlasmaStoreProvider(store_socket_, raylet_client_));
|
||||
case StoreProviderType::MEMORY:
|
||||
return std::unique_ptr<CoreWorkerStoreProvider>(
|
||||
new CoreWorkerMemoryStoreProvider(memory_store_));
|
||||
break;
|
||||
default:
|
||||
RAY_LOG(FATAL) << "unknown store provider type " << static_cast<int>(type);
|
||||
return nullptr;
|
||||
|
|
|
@ -13,6 +13,7 @@ namespace ray {
|
|||
|
||||
class CoreWorker;
|
||||
class CoreWorkerStoreProvider;
|
||||
class CoreWorkerMemoryStore;
|
||||
|
||||
/// The interface that contains all `CoreWorker` methods that are related to object store.
|
||||
class CoreWorkerObjectInterface {
|
||||
|
@ -35,7 +36,7 @@ class CoreWorkerObjectInterface {
|
|||
/// \return Status.
|
||||
Status Put(const RayObject &object, const ObjectID &object_id);
|
||||
|
||||
/// Get a list of objects from the object store.
|
||||
/// Get a list of objects from the object store. Duplicate object ids are supported.
|
||||
///
|
||||
/// \param[in] ids IDs of the objects to get.
|
||||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
|
||||
|
@ -45,9 +46,13 @@ class CoreWorkerObjectInterface {
|
|||
std::vector<std::shared_ptr<RayObject>> *results);
|
||||
|
||||
/// Wait for a list of objects to appear in the object store.
|
||||
/// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this
|
||||
/// case.
|
||||
/// TODO(zhijunfu): it is probably more clear in semantics to just fail when there
|
||||
/// are duplicates, and require it to be handled at application level.
|
||||
///
|
||||
/// \param[in] IDs of the objects to wait for.
|
||||
/// \param[in] num_returns Number of objects that should appear.
|
||||
/// \param[in] num_objects Number of objects that should appear.
|
||||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
|
||||
/// \param[out] results A bitset that indicates each object has appeared or not.
|
||||
/// \return Status.
|
||||
|
@ -66,6 +71,21 @@ class CoreWorkerObjectInterface {
|
|||
bool delete_creating_tasks);
|
||||
|
||||
private:
|
||||
/// Helper function to get a list of objects from different store providers.
|
||||
///
|
||||
/// \param[in] object_ids IDs of the objects to get.
|
||||
/// \param[in] ids_per_provider A map from store provider type to the set of
|
||||
// object ids for that store provider.
|
||||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1.
|
||||
/// \param[in/out] num_objects Number of objects that should appear before returning.
|
||||
/// \param[out] results A bitset that indicates each object has appeared or not.
|
||||
/// \return Status.
|
||||
Status WaitFromMultipleStoreProviders(
|
||||
const std::vector<ObjectID> &object_ids,
|
||||
const EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
|
||||
&ids_per_provider,
|
||||
int64_t timeout_ms, int *num_objects, std::vector<bool> *results);
|
||||
|
||||
/// Helper function to get a list of objects from a specific store provider.
|
||||
///
|
||||
/// \param[in] type The type of store provider to use.
|
||||
|
@ -73,9 +93,23 @@ class CoreWorkerObjectInterface {
|
|||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1.
|
||||
/// \param[out] results Result list of objects data.
|
||||
/// \return Status.
|
||||
Status Get(StoreProviderType type, const std::unordered_set<ObjectID> &object_ids,
|
||||
int64_t timeout_ms,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results);
|
||||
Status GetFromStoreProvider(
|
||||
StoreProviderType type, const std::unordered_set<ObjectID> &object_ids,
|
||||
int64_t timeout_ms,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results);
|
||||
|
||||
/// Helper function to wait a list of objects from a specific store provider.
|
||||
///
|
||||
/// \param[in] type The type of store provider to use.
|
||||
/// \param[in] object_ids IDs of the objects to wait for.
|
||||
/// \param[in] num_objects Number of objects that should appear before returning.
|
||||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
|
||||
/// \param[out] results A bitset that indicates each object has appeared or not.
|
||||
/// \return Status.
|
||||
Status WaitFromStoreProvider(StoreProviderType type,
|
||||
const std::unordered_set<ObjectID> &object_ids,
|
||||
int num_objects, int64_t timeout_ms,
|
||||
std::unordered_set<ObjectID> *results);
|
||||
|
||||
/// Create a new store provider for the specified type on demand.
|
||||
std::unique_ptr<CoreWorkerStoreProvider> CreateStoreProvider(
|
||||
|
@ -92,6 +126,9 @@ class CoreWorkerObjectInterface {
|
|||
/// Store socket name.
|
||||
std::string store_socket_;
|
||||
|
||||
/// In-memory store for return objects. This is used for `MEMORY` store provider.
|
||||
std::shared_ptr<CoreWorkerMemoryStore> memory_store_;
|
||||
|
||||
/// All the store providers supported.
|
||||
EnumUnorderedMap<StoreProviderType, std::unique_ptr<CoreWorkerStoreProvider>>
|
||||
store_providers_;
|
||||
|
|
|
@ -10,7 +10,8 @@ namespace ray {
|
|||
/// A class that represents a `Get` request.
|
||||
class GetRequest {
|
||||
public:
|
||||
GetRequest(std::unordered_set<ObjectID> object_ids, bool remove_after_get);
|
||||
GetRequest(std::unordered_set<ObjectID> object_ids, size_t num_objects,
|
||||
bool remove_after_get);
|
||||
|
||||
const std::unordered_set<ObjectID> &ObjectIds() const;
|
||||
|
||||
|
@ -31,9 +32,11 @@ class GetRequest {
|
|||
void Wait();
|
||||
|
||||
/// The object IDs involved in this request.
|
||||
std::unordered_set<ObjectID> object_ids_;
|
||||
const std::unordered_set<ObjectID> object_ids_;
|
||||
/// The object information for the objects in this request.
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> objects_;
|
||||
/// Number of objects required.
|
||||
const size_t num_objects_;
|
||||
|
||||
// Whether the requested objects should be removed from store
|
||||
// after `get` returns.
|
||||
|
@ -44,8 +47,14 @@ class GetRequest {
|
|||
std::condition_variable cv_;
|
||||
};
|
||||
|
||||
GetRequest::GetRequest(std::unordered_set<ObjectID> object_ids, bool remove_after_get)
|
||||
: object_ids_(std::move(object_ids)), remove_after_get_(remove_after_get) {}
|
||||
GetRequest::GetRequest(std::unordered_set<ObjectID> object_ids, size_t num_objects,
|
||||
bool remove_after_get)
|
||||
: object_ids_(std::move(object_ids)),
|
||||
num_objects_(num_objects),
|
||||
remove_after_get_(remove_after_get),
|
||||
is_ready_(false) {
|
||||
RAY_CHECK(num_objects_ <= object_ids_.size());
|
||||
}
|
||||
|
||||
const std::unordered_set<ObjectID> &GetRequest::ObjectIds() const { return object_ids_; }
|
||||
|
||||
|
@ -80,7 +89,7 @@ void GetRequest::Wait() {
|
|||
void GetRequest::Set(const ObjectID &object_id, std::shared_ptr<RayObject> object) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
objects_.emplace(object_id, object);
|
||||
if (objects_.size() == object_ids_.size()) {
|
||||
if (objects_.size() == num_objects_) {
|
||||
is_ready_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
@ -128,7 +137,8 @@ Status CoreWorkerMemoryStore::Put(const ObjectID &object_id, const RayObject &ob
|
|||
}
|
||||
|
||||
Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
|
||||
int64_t timeout_ms, bool remove_after_get,
|
||||
int num_objects, int64_t timeout_ms,
|
||||
bool remove_after_get,
|
||||
std::vector<std::shared_ptr<RayObject>> *results) {
|
||||
(*results).resize(object_ids.size(), nullptr);
|
||||
|
||||
|
@ -164,9 +174,16 @@ Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
if (object_ids.size() - remaining_ids.size() >= static_cast<size_t>(num_objects)) {
|
||||
// Already get enough objects.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
size_t required_objects = num_objects - (object_ids.size() - remaining_ids.size());
|
||||
|
||||
// Otherwise, create a GetRequest to track remaining objects.
|
||||
get_request =
|
||||
std::make_shared<GetRequest>(std::move(remaining_ids), remove_after_get);
|
||||
get_request = std::make_shared<GetRequest>(std::move(remaining_ids), required_objects,
|
||||
remove_after_get);
|
||||
for (const auto &object_id : get_request->ObjectIds()) {
|
||||
object_get_requests_[object_id].push_back(get_request);
|
||||
}
|
||||
|
|
|
@ -28,13 +28,14 @@ class CoreWorkerMemoryStore {
|
|||
|
||||
/// Get a list of objects from the object store.
|
||||
///
|
||||
/// \param[in] object_ids IDs of the objects to get. Duplicates are allowed.
|
||||
/// \param[in] object_ids IDs of the objects to get. Duplicates are not allowed.
|
||||
/// \param[in] num_objects Number of objects that should appear.
|
||||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
|
||||
/// \param[in] remove_after_get When to remove the objects from store after `Get`
|
||||
/// finishes.
|
||||
/// \param[out] results Result list of objects data.
|
||||
/// \return Status.
|
||||
Status Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
Status Get(const std::vector<ObjectID> &object_ids, int num_objects, int64_t timeout_ms,
|
||||
bool remove_after_get, std::vector<std::shared_ptr<RayObject>> *results);
|
||||
|
||||
/// Delete a list of objects from the object store.
|
||||
|
|
|
@ -24,21 +24,17 @@ Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object,
|
|||
Status CoreWorkerMemoryStoreProvider::Get(
|
||||
const std::vector<ObjectID> &object_ids, int64_t timeout_ms, const TaskID &task_id,
|
||||
std::vector<std::shared_ptr<RayObject>> *results) {
|
||||
return store_->Get(object_ids, timeout_ms, true, results);
|
||||
return store_->Get(object_ids, object_ids.size(), timeout_ms, true, results);
|
||||
}
|
||||
|
||||
Status CoreWorkerMemoryStoreProvider::Wait(const std::vector<ObjectID> &object_ids,
|
||||
int num_objects, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::vector<bool> *results) {
|
||||
if (num_objects != static_cast<int>(object_ids.size())) {
|
||||
return Status::Invalid("num_objects should equal to number of items in object_ids");
|
||||
}
|
||||
|
||||
(*results).resize(object_ids.size(), false);
|
||||
|
||||
std::vector<std::shared_ptr<RayObject>> result_objects;
|
||||
auto status = store_->Get(object_ids, timeout_ms, false, &result_objects);
|
||||
auto status = store_->Get(object_ids, num_objects, timeout_ms, false, &result_objects);
|
||||
if (status.ok()) {
|
||||
RAY_CHECK(result_objects.size() == object_ids.size());
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
|
|
|
@ -89,7 +89,7 @@ class CoreWorkerStoreProvider {
|
|||
/// Wait for a list of objects to appear in the object store.
|
||||
///
|
||||
/// \param[in] IDs of the objects to wait for.
|
||||
/// \param[in] num_returns Number of objects that should appear.
|
||||
/// \param[in] num_objects Number of objects that should appear before returning.
|
||||
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
|
||||
/// \param[in] task_id ID for the current task.
|
||||
/// \param[out] results A bitset that indicates each object has appeared or not.
|
||||
|
|
|
@ -106,10 +106,12 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface(
|
|||
task_submitters_.emplace(TaskTransportType::RAYLET,
|
||||
std::unique_ptr<CoreWorkerRayletTaskSubmitter>(
|
||||
new CoreWorkerRayletTaskSubmitter(raylet_client)));
|
||||
task_submitters_.emplace(TaskTransportType::DIRECT_ACTOR,
|
||||
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
|
||||
new CoreWorkerDirectActorTaskSubmitter(
|
||||
io_service, gcs_client, object_interface)));
|
||||
task_submitters_.emplace(
|
||||
TaskTransportType::DIRECT_ACTOR,
|
||||
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
|
||||
new CoreWorkerDirectActorTaskSubmitter(
|
||||
io_service, gcs_client,
|
||||
object_interface.CreateStoreProvider(StoreProviderType::MEMORY))));
|
||||
}
|
||||
|
||||
void CoreWorkerTaskInterface::BuildCommonTaskSpec(
|
||||
|
|
|
@ -499,7 +499,7 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
|
|||
RAY_CHECK_OK(provider.Put(buffers[i], ids[i]));
|
||||
}
|
||||
|
||||
// Test Wait().
|
||||
// Test Wait() with duplicate object ids.
|
||||
std::vector<ObjectID> ids_with_duplicate;
|
||||
ids_with_duplicate.insert(ids_with_duplicate.end(), ids.begin(), ids.end());
|
||||
// add the same ids again to test `Get` with duplicate object ids.
|
||||
|
@ -514,6 +514,13 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
|
|||
ASSERT_EQ(wait_results.size(), 5);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, true, true, false}));
|
||||
|
||||
// Test Wait() with duplicate object ids, and the required `num_objects`
|
||||
// is less than size of `wait_ids`.
|
||||
wait_results.clear();
|
||||
RAY_CHECK_OK(provider.Wait(wait_ids, 4, -1, RandomTaskId(), &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 5);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, true, true, false}));
|
||||
|
||||
// Test Get().
|
||||
std::vector<std::shared_ptr<RayObject>> results;
|
||||
RAY_CHECK_OK(provider.Get(ids_with_duplicate, -1, RandomTaskId(), &results));
|
||||
|
@ -542,6 +549,29 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
|
|||
ASSERT_EQ(results.size(), 2);
|
||||
ASSERT_TRUE(!results[0]);
|
||||
ASSERT_TRUE(!results[1]);
|
||||
|
||||
// Test Wait() with objects which will become ready later.
|
||||
std::vector<ObjectID> unready_ids(buffers.size());
|
||||
for (size_t i = 0; i < unready_ids.size(); i++) {
|
||||
unready_ids[i] = ObjectID::FromRandom();
|
||||
}
|
||||
|
||||
auto thread_func = [&unready_ids, &provider, &buffers]() {
|
||||
sleep(1);
|
||||
|
||||
for (size_t i = 0; i < unready_ids.size(); i++) {
|
||||
RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i]));
|
||||
}
|
||||
};
|
||||
|
||||
std::thread async_thread(thread_func);
|
||||
|
||||
// wait for the objects to appear.
|
||||
wait_results.clear();
|
||||
RAY_CHECK_OK(
|
||||
provider.Wait(unready_ids, unready_ids.size(), -1, RandomTaskId(), &wait_results));
|
||||
// wait for the thread to finish.
|
||||
async_thread.join();
|
||||
}
|
||||
|
||||
class ZeroNodeTest : public CoreWorkerTest {
|
||||
|
|
|
@ -17,12 +17,11 @@ bool HasByReferenceArgs(const TaskSpecification &spec) {
|
|||
|
||||
CoreWorkerDirectActorTaskSubmitter::CoreWorkerDirectActorTaskSubmitter(
|
||||
boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client,
|
||||
CoreWorkerObjectInterface &object_interface)
|
||||
std::unique_ptr<CoreWorkerStoreProvider> store_provider)
|
||||
: io_service_(io_service),
|
||||
gcs_client_(gcs_client),
|
||||
client_call_manager_(io_service),
|
||||
store_provider_(
|
||||
object_interface.CreateStoreProvider(StoreProviderType::LOCAL_PLASMA)) {
|
||||
store_provider_(std::move(store_provider)) {
|
||||
RAY_CHECK_OK(SubscribeActorUpdates());
|
||||
}
|
||||
|
||||
|
|
|
@ -28,9 +28,9 @@ struct ActorStateData {
|
|||
|
||||
class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter {
|
||||
public:
|
||||
CoreWorkerDirectActorTaskSubmitter(boost::asio::io_service &io_service,
|
||||
gcs::RedisGcsClient &gcs_client,
|
||||
CoreWorkerObjectInterface &object_interface);
|
||||
CoreWorkerDirectActorTaskSubmitter(
|
||||
boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client,
|
||||
std::unique_ptr<CoreWorkerStoreProvider> store_provider);
|
||||
|
||||
/// Submit a task to an actor for execution.
|
||||
///
|
||||
|
|
Loading…
Add table
Reference in a new issue