mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
This reverts commit 94a819d00e
.
This commit is contained in:
parent
6a0b306221
commit
296792f963
11 changed files with 83 additions and 261 deletions
|
@ -641,7 +641,7 @@ class GlobalState:
|
|||
object_ref, remote_node_id, _, _ = event["extra_data"]
|
||||
|
||||
elif event["event_type"] == "transfer_receive":
|
||||
object_ref, remote_node_id, _ = event["extra_data"]
|
||||
object_ref, remote_node_id, _, _ = event["extra_data"]
|
||||
|
||||
elif event["event_type"] == "receive_pull_request":
|
||||
object_ref, remote_node_id = event["extra_data"]
|
||||
|
|
|
@ -123,44 +123,6 @@ def test_load_balancing_with_dependencies(ray_start_cluster, fast):
|
|||
attempt_to_load_balance(f, [x], 100, num_nodes, 25)
|
||||
|
||||
|
||||
def test_load_balancing_under_constrained_memory(ray_start_cluster):
|
||||
# This test ensures that tasks are being assigned to all raylets in a
|
||||
# roughly equal manner even when the tasks have dependencies.
|
||||
cluster = ray_start_cluster
|
||||
num_nodes = 3
|
||||
num_cpus = 4
|
||||
object_size = 4e7
|
||||
num_tasks = 100
|
||||
for _ in range(num_nodes):
|
||||
cluster.add_node(
|
||||
num_cpus=num_cpus,
|
||||
memory=(num_cpus - 2) * object_size,
|
||||
object_store_memory=(num_cpus - 2) * object_size)
|
||||
cluster.add_node(
|
||||
num_cpus=0,
|
||||
resources={"custom": 1},
|
||||
memory=(num_tasks + 1) * object_size,
|
||||
object_store_memory=(num_tasks + 1) * object_size)
|
||||
ray.init(address=cluster.address)
|
||||
|
||||
@ray.remote(num_cpus=0, resources={"custom": 1})
|
||||
def create_object():
|
||||
return np.zeros(int(object_size), dtype=np.uint8)
|
||||
|
||||
@ray.remote
|
||||
def f(i, x):
|
||||
time.sleep(0.1)
|
||||
print(i, ray.worker.global_worker.node.unique_id)
|
||||
return ray.worker.global_worker.node.unique_id
|
||||
|
||||
# TODO(swang): Actually test load balancing.
|
||||
deps = [create_object.remote() for _ in range(num_tasks)]
|
||||
tasks = [f.remote(i, dep) for i, dep in enumerate(deps)]
|
||||
for i, dep in enumerate(deps):
|
||||
print(i, dep)
|
||||
ray.get(tasks)
|
||||
|
||||
|
||||
def test_locality_aware_leasing(ray_start_cluster):
|
||||
# This test ensures that a task will run where its task dependencies are
|
||||
# located. We run an initial non_local() task that is pinned to a
|
||||
|
|
|
@ -134,7 +134,7 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Cr
|
|||
// There can be only one reference to this chunk at any given time.
|
||||
return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>(
|
||||
errored_chunk_,
|
||||
ray::Status::IOError("Chunk already received by a different thread."));
|
||||
ray::Status::IOError("Chunk already referenced by another thread."));
|
||||
}
|
||||
create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED;
|
||||
return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>(
|
||||
|
@ -164,34 +164,23 @@ void ObjectBufferPool::AbortCreateChunk(const ObjectID &object_id,
|
|||
|
||||
void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk_index) {
|
||||
std::lock_guard<std::mutex> lock(pool_mutex_);
|
||||
auto it = create_buffer_state_.find(object_id);
|
||||
if (it == create_buffer_state_.end() ||
|
||||
it->second.chunk_state[chunk_index] != CreateChunkState::REFERENCED) {
|
||||
RAY_LOG(DEBUG) << "Object " << object_id << " aborted due to OOM before chunk "
|
||||
<< chunk_index << " could be sealed";
|
||||
return;
|
||||
}
|
||||
it->second.chunk_state[chunk_index] = CreateChunkState::SEALED;
|
||||
it->second.num_seals_remaining--;
|
||||
if (it->second.num_seals_remaining == 0) {
|
||||
RAY_CHECK(create_buffer_state_[object_id].chunk_state[chunk_index] ==
|
||||
CreateChunkState::REFERENCED);
|
||||
create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED;
|
||||
create_buffer_state_[object_id].num_seals_remaining--;
|
||||
if (create_buffer_state_[object_id].num_seals_remaining == 0) {
|
||||
RAY_CHECK_OK(store_client_.Seal(object_id));
|
||||
RAY_CHECK_OK(store_client_.Release(object_id));
|
||||
create_buffer_state_.erase(it);
|
||||
create_buffer_state_.erase(object_id);
|
||||
RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id
|
||||
<< ", last chunk index: " << chunk_index;
|
||||
}
|
||||
}
|
||||
|
||||
void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
|
||||
std::lock_guard<std::mutex> lock(pool_mutex_);
|
||||
auto it = create_buffer_state_.find(object_id);
|
||||
if (it != create_buffer_state_.end()) {
|
||||
RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id
|
||||
<< ", aborting";
|
||||
RAY_CHECK_OK(store_client_.Release(object_id));
|
||||
RAY_CHECK_OK(store_client_.Abort(object_id));
|
||||
create_buffer_state_.erase(object_id);
|
||||
}
|
||||
RAY_CHECK_OK(store_client_.Release(object_id));
|
||||
RAY_CHECK_OK(store_client_.Abort(object_id));
|
||||
create_buffer_state_.erase(object_id);
|
||||
}
|
||||
|
||||
std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
|
||||
|
|
|
@ -137,16 +137,16 @@ class ObjectBufferPool {
|
|||
/// \return Void.
|
||||
void FreeObjects(const std::vector<ObjectID> &object_ids);
|
||||
|
||||
/// Abort the create operation associated with an object. This destroys the buffer
|
||||
/// state, including create operations in progress for all chunks of the object.
|
||||
void AbortCreate(const ObjectID &object_id);
|
||||
|
||||
/// Returns debug string for class.
|
||||
///
|
||||
/// \return string.
|
||||
std::string DebugString() const;
|
||||
|
||||
private:
|
||||
/// Abort the create operation associated with an object. This destroys the buffer
|
||||
/// state, including create operations in progress for all chunks of the object.
|
||||
void AbortCreate(const ObjectID &object_id);
|
||||
|
||||
/// Abort the get operation associated with an object.
|
||||
void AbortGet(const ObjectID &object_id);
|
||||
|
||||
|
|
|
@ -94,20 +94,14 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_
|
|||
const NodeID &client_id) {
|
||||
SendPullRequest(object_id, client_id);
|
||||
};
|
||||
const auto &cancel_pull_request = [this](const ObjectID &object_id) {
|
||||
// We must abort this object because it may have only been partially
|
||||
// created and will cause a leak if we never receive the rest of the
|
||||
// object. This is a no-op if the object is already sealed or evicted.
|
||||
buffer_pool_.AbortCreate(object_id);
|
||||
};
|
||||
const auto &get_time = []() { return absl::GetCurrentTimeNanos() / 1e9; };
|
||||
int64_t available_memory = config.object_store_memory;
|
||||
if (available_memory < 0) {
|
||||
available_memory = 0;
|
||||
}
|
||||
pull_manager_.reset(new PullManager(
|
||||
self_node_id_, object_is_local, send_pull_request, cancel_pull_request,
|
||||
restore_spilled_object_, get_time, config.pull_timeout_ms, available_memory,
|
||||
self_node_id_, object_is_local, send_pull_request, restore_spilled_object_,
|
||||
get_time, config.pull_timeout_ms, available_memory,
|
||||
[spill_objects_callback, object_store_full_callback]() {
|
||||
// TODO(swang): This copies the out-of-memory handling in the
|
||||
// CreateRequestQueue. It would be nice to unify these.
|
||||
|
@ -316,15 +310,22 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id, const NodeID &
|
|||
|
||||
void ObjectManager::HandleReceiveFinished(const ObjectID &object_id,
|
||||
const NodeID &node_id, uint64_t chunk_index,
|
||||
double start_time, double end_time) {
|
||||
double start_time, double end_time,
|
||||
ray::Status status) {
|
||||
if (!status.ok()) {
|
||||
// TODO(rkn): What do we want to do if the send failed?
|
||||
}
|
||||
|
||||
rpc::ProfileTableData::ProfileEvent profile_event;
|
||||
profile_event.set_event_type("transfer_receive");
|
||||
profile_event.set_start_time(start_time);
|
||||
profile_event.set_end_time(end_time);
|
||||
// Encode the object ID, node ID, chunk index as a json list,
|
||||
// Encode the object ID, node ID, chunk index, and status as a json list,
|
||||
// which will be parsed by the reader of the profile table.
|
||||
|
||||
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"," +
|
||||
std::to_string(chunk_index) + "]");
|
||||
std::to_string(chunk_index) + ",\"" + status.ToString() +
|
||||
"\"]");
|
||||
|
||||
std::lock_guard<std::mutex> lock(profile_mutex_);
|
||||
profile_events_.push_back(profile_event);
|
||||
|
@ -665,57 +666,43 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply *
|
|||
const std::string &data = request.data();
|
||||
|
||||
double start_time = absl::GetCurrentTimeNanos() / 1e9;
|
||||
bool success = ReceiveObjectChunk(node_id, object_id, owner_address, data_size,
|
||||
metadata_size, chunk_index, data);
|
||||
if (!success) {
|
||||
num_chunks_received_failed_++;
|
||||
RAY_LOG(INFO) << "Received duplicate or cancelled chunk at index " << chunk_index
|
||||
<< " of object " << object_id << ": overall "
|
||||
<< num_chunks_received_failed_ << "/" << num_chunks_received_total_
|
||||
<< " failed";
|
||||
}
|
||||
auto status = ReceiveObjectChunk(node_id, object_id, owner_address, data_size,
|
||||
metadata_size, chunk_index, data);
|
||||
double end_time = absl::GetCurrentTimeNanos() / 1e9;
|
||||
|
||||
HandleReceiveFinished(object_id, node_id, chunk_index, start_time, end_time);
|
||||
send_reply_callback(Status::OK(), nullptr, nullptr);
|
||||
HandleReceiveFinished(object_id, node_id, chunk_index, start_time, end_time, status);
|
||||
send_reply_callback(status, nullptr, nullptr);
|
||||
}
|
||||
|
||||
bool ObjectManager::ReceiveObjectChunk(const NodeID &node_id, const ObjectID &object_id,
|
||||
const rpc::Address &owner_address,
|
||||
uint64_t data_size, uint64_t metadata_size,
|
||||
uint64_t chunk_index, const std::string &data) {
|
||||
num_chunks_received_total_++;
|
||||
ray::Status ObjectManager::ReceiveObjectChunk(const NodeID &node_id,
|
||||
const ObjectID &object_id,
|
||||
const rpc::Address &owner_address,
|
||||
uint64_t data_size, uint64_t metadata_size,
|
||||
uint64_t chunk_index,
|
||||
const std::string &data) {
|
||||
RAY_LOG(DEBUG) << "ReceiveObjectChunk on " << self_node_id_ << " from " << node_id
|
||||
<< " of object " << object_id << " chunk index: " << chunk_index
|
||||
<< ", chunk data size: " << data.size()
|
||||
<< ", object size: " << data_size;
|
||||
|
||||
if (!pull_manager_->IsObjectActive(object_id)) {
|
||||
// This object is no longer being actively pulled. Do not create the object.
|
||||
return false;
|
||||
}
|
||||
std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> chunk_status =
|
||||
buffer_pool_.CreateChunk(object_id, owner_address, data_size, metadata_size,
|
||||
chunk_index);
|
||||
if (!pull_manager_->IsObjectActive(object_id)) {
|
||||
// This object is no longer being actively pulled. Abort the object. We
|
||||
// have to check again here because the pull manager runs in a different
|
||||
// thread and the object may have been deactivated right before creating
|
||||
// the chunk.
|
||||
buffer_pool_.AbortCreate(object_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
ray::Status status;
|
||||
ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first;
|
||||
num_chunks_received_total_++;
|
||||
if (chunk_status.second.ok()) {
|
||||
// Avoid handling this chunk if it's already being handled by another process.
|
||||
std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length);
|
||||
buffer_pool_.SealChunk(object_id, chunk_index);
|
||||
return true;
|
||||
} else {
|
||||
RAY_LOG(INFO) << "Error receiving chunk:" << chunk_status.second.message();
|
||||
return false;
|
||||
num_chunks_received_failed_++;
|
||||
RAY_LOG(INFO) << "ReceiveObjectChunk index " << chunk_index << " of object "
|
||||
<< object_id << " failed: " << chunk_status.second.message()
|
||||
<< ", overall " << num_chunks_received_failed_ << "/"
|
||||
<< num_chunks_received_total_ << " failed";
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
void ObjectManager::HandlePull(const rpc::PullRequest &request, rpc::PullReply *reply,
|
||||
|
|
|
@ -159,15 +159,7 @@ class ObjectManager : public ObjectManagerInterface,
|
|||
std::shared_ptr<rpc::ObjectManagerClient> rpc_client,
|
||||
std::function<void(const Status &)> on_complete);
|
||||
|
||||
/// Receive an object chunk from a remote object manager. Small object may
|
||||
/// fit in one chunk.
|
||||
///
|
||||
/// If this is the last remaining chunk for an object, then the object will
|
||||
/// be sealed. Else, we will keep the plasma buffer open until the remaining
|
||||
/// chunks are received.
|
||||
///
|
||||
/// If the object is no longer being actively pulled, the object will not be
|
||||
/// created.
|
||||
/// Receive object chunk from remote object manager, small object may contain one chunk
|
||||
///
|
||||
/// \param node_id Node id of remote object manager which sends this chunk
|
||||
/// \param object_id Object id
|
||||
|
@ -176,13 +168,10 @@ class ObjectManager : public ObjectManagerInterface,
|
|||
/// \param metadata_size Metadata size
|
||||
/// \param chunk_index Chunk index
|
||||
/// \param data Chunk data
|
||||
/// \return Whether the chunk was successfully written into the local object
|
||||
/// store. This can fail if the chunk was already received in the past, or if
|
||||
/// the object is no longer being actively pulled.
|
||||
bool ReceiveObjectChunk(const NodeID &node_id, const ObjectID &object_id,
|
||||
const rpc::Address &owner_address, uint64_t data_size,
|
||||
uint64_t metadata_size, uint64_t chunk_index,
|
||||
const std::string &data);
|
||||
ray::Status ReceiveObjectChunk(const NodeID &node_id, const ObjectID &object_id,
|
||||
const rpc::Address &owner_address, uint64_t data_size,
|
||||
uint64_t metadata_size, uint64_t chunk_index,
|
||||
const std::string &data);
|
||||
|
||||
/// Send pull request
|
||||
///
|
||||
|
@ -404,10 +393,11 @@ class ObjectManager : public ObjectManagerInterface,
|
|||
/// chunk.
|
||||
/// \param end_time_us The time when the object manager finished receiving the
|
||||
/// chunk.
|
||||
/// \param status The status of the receive (e.g., did it succeed or fail).
|
||||
/// \return Void.
|
||||
void HandleReceiveFinished(const ObjectID &object_id, const NodeID &node_id,
|
||||
uint64_t chunk_index, double start_time_us,
|
||||
double end_time_us);
|
||||
double end_time_us, ray::Status status);
|
||||
|
||||
/// Handle Push task timeout.
|
||||
void HandlePushTaskTimeout(const ObjectID &object_id, const NodeID &node_id);
|
||||
|
|
|
@ -166,8 +166,6 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, ObjectTableEnt
|
|||
}
|
||||
// Increase reference count.
|
||||
entry->ref_count++;
|
||||
RAY_LOG(DEBUG) << "Object " << object_id << " in use by client"
|
||||
<< ", num bytes in use is now " << num_bytes_in_use_;
|
||||
|
||||
// Add object id to the list of object ids that this client is using.
|
||||
client->object_ids.insert(object_id);
|
||||
|
@ -327,8 +325,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID &object_id,
|
|||
eviction_policy_.ObjectCreated(object_id, client.get(), true);
|
||||
// Record that this client is using this object.
|
||||
AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
|
||||
num_objects_unsealed_++;
|
||||
num_bytes_unsealed_ += data_size + metadata_size;
|
||||
return PlasmaError::OK;
|
||||
}
|
||||
|
||||
|
@ -542,14 +538,12 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id,
|
|||
client->object_ids.erase(it);
|
||||
// Decrease reference count.
|
||||
entry->ref_count--;
|
||||
RAY_LOG(DEBUG) << "Object " << object_id << " no longer in use by client";
|
||||
|
||||
// If no more clients are using this object, notify the eviction policy
|
||||
// that the object is no longer being used.
|
||||
if (entry->ref_count == 0) {
|
||||
num_bytes_in_use_ -= entry->data_size + entry->metadata_size;
|
||||
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id
|
||||
<< ", num bytes in use is now " << num_bytes_in_use_;
|
||||
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id;
|
||||
if (deletion_cache_.count(object_id) == 0) {
|
||||
// Tell the eviction policy that this object is no longer being used.
|
||||
eviction_policy_.EndObjectAccess(object_id);
|
||||
|
@ -574,15 +568,9 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID &object_id) {
|
|||
if (object->device_num == 0) {
|
||||
PlasmaAllocator::Free(object->pointer, buff_size);
|
||||
}
|
||||
if (object->state == ObjectState::PLASMA_CREATED) {
|
||||
num_bytes_unsealed_ -= object->data_size + object->metadata_size;
|
||||
num_objects_unsealed_--;
|
||||
}
|
||||
if (object->ref_count > 0) {
|
||||
// A client was using this object.
|
||||
num_bytes_in_use_ -= object->data_size + object->metadata_size;
|
||||
RAY_LOG(DEBUG) << "Erasing object " << object_id << " with nonzero ref count"
|
||||
<< object_id << ", num bytes in use is now " << num_bytes_in_use_;
|
||||
}
|
||||
store_info_.objects.erase(object_id);
|
||||
}
|
||||
|
@ -626,9 +614,6 @@ void PlasmaStore::SealObjects(const std::vector<ObjectID> &object_ids) {
|
|||
object_info.owner_worker_id = entry->owner_worker_id.Binary();
|
||||
object_info.metadata_size = entry->metadata_size;
|
||||
infos.push_back(object_info);
|
||||
|
||||
num_objects_unsealed_--;
|
||||
num_bytes_unsealed_ -= entry->data_size + entry->metadata_size;
|
||||
}
|
||||
|
||||
PushNotifications(infos);
|
||||
|
|
|
@ -210,18 +210,8 @@ class PlasmaStore {
|
|||
/// Process queued requests to create an object.
|
||||
void ProcessCreateRequests();
|
||||
|
||||
/// Get the available memory for new objects to be created. This includes
|
||||
/// memory that is currently being used for created but unsealed objects.
|
||||
void GetAvailableMemory(std::function<void(size_t)> callback) const {
|
||||
RAY_CHECK((num_bytes_unsealed_ > 0 && num_objects_unsealed_ > 0) ||
|
||||
(num_bytes_unsealed_ == 0 && num_objects_unsealed_ == 0))
|
||||
<< "Tracking for available memory in the plasma store has gone out of sync. "
|
||||
"Please file a GitHub issue.";
|
||||
RAY_CHECK(num_bytes_in_use_ >= num_bytes_unsealed_);
|
||||
// We do not count unsealed objects as in use because these may have been
|
||||
// created by the object manager.
|
||||
int64_t num_bytes_in_use =
|
||||
static_cast<int64_t>(num_bytes_in_use_ - num_bytes_unsealed_);
|
||||
int64_t num_bytes_in_use = static_cast<int64_t>(num_bytes_in_use_);
|
||||
RAY_CHECK(PlasmaAllocator::GetFootprintLimit() >= num_bytes_in_use);
|
||||
size_t available = PlasmaAllocator::GetFootprintLimit() - num_bytes_in_use;
|
||||
callback(available);
|
||||
|
@ -324,18 +314,8 @@ class PlasmaStore {
|
|||
/// mutex if it is not absolutely necessary.
|
||||
std::recursive_mutex mutex_;
|
||||
|
||||
/// Total number of bytes allocated to objects that are in use by any client.
|
||||
/// This includes objects that are being created and objects that a client
|
||||
/// called get on.
|
||||
size_t num_bytes_in_use_ = 0;
|
||||
|
||||
/// Total number of bytes allocated to objects that are created but not yet
|
||||
/// sealed.
|
||||
size_t num_bytes_unsealed_ = 0;
|
||||
|
||||
/// Number of objects that are created but not sealed.
|
||||
size_t num_objects_unsealed_ = 0;
|
||||
|
||||
/// Total plasma object bytes that are consumed by core workers.
|
||||
int64_t total_consumed_bytes_ = 0;
|
||||
};
|
||||
|
|
|
@ -7,14 +7,12 @@ namespace ray {
|
|||
PullManager::PullManager(
|
||||
NodeID &self_node_id, const std::function<bool(const ObjectID &)> object_is_local,
|
||||
const std::function<void(const ObjectID &, const NodeID &)> send_pull_request,
|
||||
const std::function<void(const ObjectID &)> cancel_pull_request,
|
||||
const RestoreSpilledObjectCallback restore_spilled_object,
|
||||
const std::function<double()> get_time, int pull_timeout_ms,
|
||||
size_t num_bytes_available, std::function<void()> object_store_full_callback)
|
||||
: self_node_id_(self_node_id),
|
||||
object_is_local_(object_is_local),
|
||||
send_pull_request_(send_pull_request),
|
||||
cancel_pull_request_(cancel_pull_request),
|
||||
restore_spilled_object_(restore_spilled_object),
|
||||
get_time_(get_time),
|
||||
pull_timeout_ms_(pull_timeout_ms),
|
||||
|
@ -74,7 +72,6 @@ bool PullManager::ActivateNextPullBundleRequest(
|
|||
|
||||
// Activate the bundle.
|
||||
for (const auto &ref : next_request_it->second) {
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
auto obj_id = ObjectRefToId(ref);
|
||||
bool start_pull = active_object_pull_requests_.count(obj_id) == 0;
|
||||
active_object_pull_requests_[obj_id].insert(next_request_it->first);
|
||||
|
@ -86,8 +83,6 @@ bool PullManager::ActivateNextPullBundleRequest(
|
|||
RAY_CHECK(it != object_pull_requests_.end());
|
||||
num_bytes_being_pulled_ += it->second.object_size;
|
||||
objects_to_pull->push_back(obj_id);
|
||||
|
||||
ResetRetryTimer(obj_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,7 +96,6 @@ void PullManager::DeactivatePullBundleRequest(
|
|||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator &request_it,
|
||||
std::unordered_set<ObjectID> *objects_to_cancel) {
|
||||
for (const auto &ref : request_it->second) {
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
auto obj_id = ObjectRefToId(ref);
|
||||
RAY_CHECK(active_object_pull_requests_[obj_id].erase(request_it->first));
|
||||
if (active_object_pull_requests_[obj_id].empty()) {
|
||||
|
@ -110,7 +104,10 @@ void PullManager::DeactivatePullBundleRequest(
|
|||
RAY_CHECK(it != object_pull_requests_.end());
|
||||
num_bytes_being_pulled_ -= it->second.object_size;
|
||||
active_object_pull_requests_.erase(obj_id);
|
||||
objects_to_cancel->insert(obj_id);
|
||||
|
||||
if (objects_to_cancel) {
|
||||
objects_to_cancel->insert(obj_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -173,19 +170,12 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available)
|
|||
RAY_CHECK(last_request_it != pull_request_bundles_.end());
|
||||
DeactivatePullBundleRequest(last_request_it, &object_ids_to_cancel);
|
||||
}
|
||||
for (const auto &obj_id : object_ids_to_cancel) {
|
||||
// Call the cancellation callback outside of the lock.
|
||||
cancel_pull_request_(obj_id);
|
||||
}
|
||||
|
||||
TriggerOutOfMemoryHandlingIfNeeded();
|
||||
|
||||
{
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
for (const auto &obj_id : objects_to_pull) {
|
||||
if (object_ids_to_cancel.count(obj_id) == 0) {
|
||||
TryToMakeObjectLocal(obj_id);
|
||||
}
|
||||
for (const auto &obj_id : objects_to_pull) {
|
||||
if (object_ids_to_cancel.count(obj_id) == 0) {
|
||||
TryToMakeObjectLocal(obj_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -245,16 +235,11 @@ std::vector<ObjectID> PullManager::CancelPull(uint64_t request_id) {
|
|||
|
||||
// If the pull request was being actively pulled, deactivate it now.
|
||||
if (bundle_it->first <= highest_req_id_being_pulled_) {
|
||||
std::unordered_set<ObjectID> object_ids_to_cancel;
|
||||
DeactivatePullBundleRequest(bundle_it, &object_ids_to_cancel);
|
||||
for (const auto &obj_id : object_ids_to_cancel) {
|
||||
// Call the cancellation callback outside of the lock.
|
||||
cancel_pull_request_(obj_id);
|
||||
}
|
||||
DeactivatePullBundleRequest(bundle_it);
|
||||
}
|
||||
|
||||
// Erase this pull request.
|
||||
std::vector<ObjectID> object_ids_to_cancel_subscription;
|
||||
std::vector<ObjectID> object_ids_to_cancel;
|
||||
for (const auto &ref : bundle_it->second) {
|
||||
auto obj_id = ObjectRefToId(ref);
|
||||
auto it = object_pull_requests_.find(obj_id);
|
||||
|
@ -262,7 +247,7 @@ std::vector<ObjectID> PullManager::CancelPull(uint64_t request_id) {
|
|||
RAY_CHECK(it->second.bundle_request_ids.erase(bundle_it->first));
|
||||
if (it->second.bundle_request_ids.empty()) {
|
||||
object_pull_requests_.erase(it);
|
||||
object_ids_to_cancel_subscription.push_back(obj_id);
|
||||
object_ids_to_cancel.push_back(obj_id);
|
||||
}
|
||||
}
|
||||
pull_request_bundles_.erase(bundle_it);
|
||||
|
@ -272,7 +257,7 @@ std::vector<ObjectID> PullManager::CancelPull(uint64_t request_id) {
|
|||
// request to avoid reactivating it again.
|
||||
UpdatePullsBasedOnAvailableMemory(num_bytes_available_);
|
||||
|
||||
return object_ids_to_cancel_subscription;
|
||||
return object_ids_to_cancel;
|
||||
}
|
||||
|
||||
void PullManager::OnLocationChange(const ObjectID &object_id,
|
||||
|
@ -307,10 +292,7 @@ void PullManager::OnLocationChange(const ObjectID &object_id,
|
|||
RAY_LOG(DEBUG) << "OnLocationChange " << spilled_url << " num clients "
|
||||
<< client_ids.size();
|
||||
|
||||
{
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
TryToMakeObjectLocal(object_id);
|
||||
}
|
||||
TryToMakeObjectLocal(object_id);
|
||||
}
|
||||
|
||||
void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) {
|
||||
|
@ -440,7 +422,6 @@ void PullManager::UpdateRetryTimer(ObjectPullRequest &request) {
|
|||
}
|
||||
|
||||
void PullManager::Tick() {
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
for (auto &pair : active_object_pull_requests_) {
|
||||
const auto &object_id = pair.first;
|
||||
TryToMakeObjectLocal(object_id);
|
||||
|
@ -449,13 +430,7 @@ void PullManager::Tick() {
|
|||
|
||||
int PullManager::NumActiveRequests() const { return object_pull_requests_.size(); }
|
||||
|
||||
bool PullManager::IsObjectActive(const ObjectID &object_id) const {
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
return active_object_pull_requests_.count(object_id) == 1;
|
||||
}
|
||||
|
||||
std::string PullManager::DebugString() const {
|
||||
absl::MutexLock lock(&active_objects_mu_);
|
||||
std::stringstream result;
|
||||
result << "PullManager:";
|
||||
result << "\n- num bytes available for pulled objects: " << num_bytes_available_;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include "ray/object_manager/common.h"
|
||||
#include "ray/object_manager/format/object_manager_generated.h"
|
||||
#include "ray/object_manager/notification/object_store_notification_manager_ipc.h"
|
||||
#include "ray/object_manager/object_buffer_pool.h"
|
||||
#include "ray/object_manager/object_directory.h"
|
||||
#include "ray/object_manager/ownership_based_object_directory.h"
|
||||
#include "ray/object_manager/plasma/store_runner.h"
|
||||
|
@ -31,17 +32,13 @@ class PullManager {
|
|||
///
|
||||
/// \param self_node_id the current node
|
||||
/// \param object_is_local A callback which should return true if a given object is
|
||||
/// already on the local node.
|
||||
/// \param send_pull_request A callback which should send a
|
||||
/// already on the local node. \param send_pull_request A callback which should send a
|
||||
/// pull request to the specified node.
|
||||
/// \param cancel_pull_request A callback which should
|
||||
/// cancel pulling an object.
|
||||
/// \param restore_spilled_object A callback which should
|
||||
/// retrieve an spilled object from the external store.
|
||||
PullManager(
|
||||
NodeID &self_node_id, const std::function<bool(const ObjectID &)> object_is_local,
|
||||
const std::function<void(const ObjectID &, const NodeID &)> send_pull_request,
|
||||
const std::function<void(const ObjectID &)> cancel_pull_request,
|
||||
const RestoreSpilledObjectCallback restore_spilled_object,
|
||||
const std::function<double()> get_time, int pull_timeout_ms,
|
||||
size_t num_bytes_available, std::function<void()> object_store_full_callback);
|
||||
|
@ -103,8 +100,6 @@ class PullManager {
|
|||
/// The number of ongoing object pulls.
|
||||
int NumActiveRequests() const;
|
||||
|
||||
bool IsObjectActive(const ObjectID &object_id) const;
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
private:
|
||||
|
@ -134,8 +129,7 @@ class PullManager {
|
|||
/// locations. This does nothing if the object is not needed by any pull
|
||||
/// request or if it is already local. This also sets a timeout for when to
|
||||
/// make the next attempt to make the object local.
|
||||
void TryToMakeObjectLocal(const ObjectID &object_id)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(active_objects_mu_);
|
||||
void TryToMakeObjectLocal(const ObjectID &object_id);
|
||||
|
||||
/// Try to Pull an object from one of its expected client locations. If there
|
||||
/// are more client locations to try after this attempt, then this method
|
||||
|
@ -161,7 +155,7 @@ class PullManager {
|
|||
/// operations for the object.
|
||||
void DeactivatePullBundleRequest(
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator &request_it,
|
||||
std::unordered_set<ObjectID> *objects_to_cancel);
|
||||
std::unordered_set<ObjectID> *objects_to_cancel = nullptr);
|
||||
|
||||
/// Trigger out-of-memory handling if the first request in the queue needs
|
||||
/// more space than the bytes available. This is needed to make room for the
|
||||
|
@ -172,7 +166,6 @@ class PullManager {
|
|||
NodeID self_node_id_;
|
||||
const std::function<bool(const ObjectID &)> object_is_local_;
|
||||
const std::function<void(const ObjectID &, const NodeID &)> send_pull_request_;
|
||||
const std::function<void(const ObjectID &)> cancel_pull_request_;
|
||||
const RestoreSpilledObjectCallback restore_spilled_object_;
|
||||
const std::function<double()> get_time_;
|
||||
uint64_t pull_timeout_ms_;
|
||||
|
@ -214,16 +207,12 @@ class PullManager {
|
|||
/// object managers.
|
||||
std::unordered_map<ObjectID, ObjectPullRequest> object_pull_requests_;
|
||||
|
||||
// Protects state that is shared by the threads used to receive object
|
||||
// chunks.
|
||||
mutable absl::Mutex active_objects_mu_;
|
||||
|
||||
/// The objects that we are currently fetching. This is a subset of the
|
||||
/// objects that we have been asked to fetch. The total size of these objects
|
||||
/// is the number of bytes that we are currently pulling, and it must be less
|
||||
/// than the bytes available.
|
||||
absl::flat_hash_map<ObjectID, absl::flat_hash_set<uint64_t>>
|
||||
active_object_pull_requests_ GUARDED_BY(active_objects_mu_);
|
||||
active_object_pull_requests_;
|
||||
|
||||
/// Internally maintained random number generator.
|
||||
std::mt19937_64 gen_;
|
||||
|
|
|
@ -19,24 +19,22 @@ class PullManagerTestWithCapacity {
|
|||
num_restore_spilled_object_calls_(0),
|
||||
num_object_store_full_calls_(0),
|
||||
fake_time_(0),
|
||||
pull_manager_(
|
||||
self_node_id_, [this](const ObjectID &object_id) { return object_is_local_; },
|
||||
[this](const ObjectID &object_id, const NodeID &node_id) {
|
||||
num_send_pull_request_calls_++;
|
||||
},
|
||||
[this](const ObjectID &object_id) { num_abort_calls_[object_id]++; },
|
||||
[this](const ObjectID &, const std::string &, const NodeID &,
|
||||
std::function<void(const ray::Status &)> callback) {
|
||||
num_restore_spilled_object_calls_++;
|
||||
restore_object_callback_ = callback;
|
||||
},
|
||||
[this]() { return fake_time_; }, 10000, num_available_bytes,
|
||||
[this]() { num_object_store_full_calls_++; }) {}
|
||||
pull_manager_(self_node_id_,
|
||||
[this](const ObjectID &object_id) { return object_is_local_; },
|
||||
[this](const ObjectID &object_id, const NodeID &node_id) {
|
||||
num_send_pull_request_calls_++;
|
||||
},
|
||||
[this](const ObjectID &, const std::string &, const NodeID &,
|
||||
std::function<void(const ray::Status &)> callback) {
|
||||
num_restore_spilled_object_calls_++;
|
||||
restore_object_callback_ = callback;
|
||||
},
|
||||
[this]() { return fake_time_; }, 10000, num_available_bytes,
|
||||
[this]() { num_object_store_full_calls_++; }) {}
|
||||
|
||||
void AssertNoLeaks() {
|
||||
ASSERT_TRUE(pull_manager_.pull_request_bundles_.empty());
|
||||
ASSERT_TRUE(pull_manager_.object_pull_requests_.empty());
|
||||
absl::MutexLock lock(&pull_manager_.active_objects_mu_);
|
||||
ASSERT_TRUE(pull_manager_.active_object_pull_requests_.empty());
|
||||
// Most tests should not throw OOM.
|
||||
ASSERT_EQ(num_object_store_full_calls_, 0);
|
||||
|
@ -50,7 +48,6 @@ class PullManagerTestWithCapacity {
|
|||
std::function<void(const ray::Status &)> restore_object_callback_;
|
||||
double fake_time_;
|
||||
PullManager pull_manager_;
|
||||
std::unordered_map<ObjectID, int> num_abort_calls_;
|
||||
};
|
||||
|
||||
class PullManagerTest : public PullManagerTestWithCapacity, public ::testing::Test {
|
||||
|
@ -58,7 +55,6 @@ class PullManagerTest : public PullManagerTestWithCapacity, public ::testing::Te
|
|||
PullManagerTest() : PullManagerTestWithCapacity(1) {}
|
||||
|
||||
void AssertNumActiveRequestsEquals(size_t num_requests) {
|
||||
absl::MutexLock lock(&pull_manager_.active_objects_mu_);
|
||||
ASSERT_EQ(pull_manager_.object_pull_requests_.size(), num_requests);
|
||||
ASSERT_EQ(pull_manager_.active_object_pull_requests_.size(), num_requests);
|
||||
}
|
||||
|
@ -70,7 +66,6 @@ class PullManagerWithAdmissionControlTest : public PullManagerTestWithCapacity,
|
|||
PullManagerWithAdmissionControlTest() : PullManagerTestWithCapacity(10) {}
|
||||
|
||||
void AssertNumActiveRequestsEquals(size_t num_requests) {
|
||||
absl::MutexLock lock(&pull_manager_.active_objects_mu_);
|
||||
ASSERT_EQ(pull_manager_.active_object_pull_requests_.size(), num_requests);
|
||||
}
|
||||
|
||||
|
@ -105,7 +100,6 @@ TEST_F(PullManagerTest, TestStaleSubscription) {
|
|||
// There are no client ids to pull from.
|
||||
ASSERT_EQ(num_send_pull_request_calls_, 0);
|
||||
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
|
||||
ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs));
|
||||
|
@ -120,7 +114,6 @@ TEST_F(PullManagerTest, TestStaleSubscription) {
|
|||
// Now we're getting a notification about an object that was already cancelled.
|
||||
ASSERT_EQ(num_send_pull_request_calls_, 0);
|
||||
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
|
||||
ASSERT_EQ(num_abort_calls_[oid], 1);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
@ -169,10 +162,8 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) {
|
|||
NodeID::FromRandom(), 0);
|
||||
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
|
||||
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
|
||||
ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs));
|
||||
ASSERT_EQ(num_abort_calls_[obj1], 1);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
@ -225,9 +216,7 @@ TEST_F(PullManagerTest, TestRestoreObjectFailed) {
|
|||
ASSERT_EQ(num_send_pull_request_calls_, 1);
|
||||
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
|
||||
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
|
||||
ASSERT_EQ(num_abort_calls_[obj1], 1);
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
|
@ -257,7 +246,6 @@ TEST_F(PullManagerTest, TestLoadBalancingRestorationRequest) {
|
|||
// Make sure the restore request wasn't sent since there are nodes that have a copied
|
||||
// object.
|
||||
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
}
|
||||
|
||||
TEST_F(PullManagerTest, TestManyUpdates) {
|
||||
|
@ -281,10 +269,8 @@ TEST_F(PullManagerTest, TestManyUpdates) {
|
|||
ASSERT_EQ(num_send_pull_request_calls_, 1);
|
||||
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
|
||||
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
|
||||
ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs));
|
||||
ASSERT_EQ(num_abort_calls_[obj1], 1);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
@ -330,10 +316,8 @@ TEST_F(PullManagerTest, TestRetryTimer) {
|
|||
}
|
||||
ASSERT_EQ(num_send_pull_request_calls_, 0);
|
||||
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
|
||||
ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs));
|
||||
ASSERT_EQ(num_abort_calls_[obj1], 1);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
@ -364,13 +348,9 @@ TEST_F(PullManagerTest, TestBasic) {
|
|||
}
|
||||
ASSERT_EQ(num_send_pull_request_calls_, 0);
|
||||
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
|
||||
ASSERT_EQ(objects_to_cancel, oids);
|
||||
AssertNumActiveRequestsEquals(0);
|
||||
for (auto &oid : oids) {
|
||||
ASSERT_EQ(num_abort_calls_[oid], 1);
|
||||
}
|
||||
|
||||
// Don't pull a remote object if we've canceled.
|
||||
object_is_local_ = false;
|
||||
|
@ -407,7 +387,6 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) {
|
|||
|
||||
// Cancel one request.
|
||||
auto objects_to_cancel = pull_manager_.CancelPull(req_id1);
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
ASSERT_TRUE(objects_to_cancel.empty());
|
||||
// Objects should still be pulled because the other request is still open.
|
||||
AssertNumActiveRequestsEquals(oids.size());
|
||||
|
@ -421,13 +400,9 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) {
|
|||
}
|
||||
|
||||
// Cancel the other request.
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
objects_to_cancel = pull_manager_.CancelPull(req_id2);
|
||||
ASSERT_EQ(objects_to_cancel, oids);
|
||||
AssertNumActiveRequestsEquals(0);
|
||||
for (auto &oid : oids) {
|
||||
ASSERT_EQ(num_abort_calls_[oid], 1);
|
||||
}
|
||||
|
||||
// Don't pull a remote object if we've canceled.
|
||||
object_is_local_ = false;
|
||||
|
@ -463,14 +438,10 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) {
|
|||
ASSERT_TRUE(IsUnderCapacity(oids.size() * object_size));
|
||||
|
||||
// Reduce the available memory.
|
||||
ASSERT_TRUE(num_abort_calls_.empty());
|
||||
ASSERT_EQ(num_object_store_full_calls_, 0);
|
||||
pull_manager_.UpdatePullsBasedOnAvailableMemory(oids.size() * object_size - 1);
|
||||
AssertNumActiveRequestsEquals(0);
|
||||
ASSERT_EQ(num_object_store_full_calls_, 1);
|
||||
for (auto &oid : oids) {
|
||||
ASSERT_EQ(num_abort_calls_[oid], 1);
|
||||
}
|
||||
// No new pull requests after the next tick.
|
||||
fake_time_ += 10;
|
||||
auto prev_pull_requests = num_send_pull_request_calls_;
|
||||
|
@ -489,14 +460,8 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) {
|
|||
// OOM was not triggered a second time.
|
||||
ASSERT_EQ(num_object_store_full_calls_, 1);
|
||||
num_object_store_full_calls_ = 0;
|
||||
for (auto &oid : oids) {
|
||||
ASSERT_EQ(num_abort_calls_[oid], 1);
|
||||
}
|
||||
|
||||
pull_manager_.CancelPull(req_id);
|
||||
for (auto &oid : oids) {
|
||||
ASSERT_EQ(num_abort_calls_[oid], 2);
|
||||
}
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue