From 296792f9635b4e3bde93ab390d9349741bb02bb8 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 19 Feb 2021 11:58:17 -0800 Subject: [PATCH] Revert "[core] Fix bugs in admission control (#14157)" (#14217) This reverts commit 94a819d00e22da3f0f48edb8f072a92ffde8201e. --- python/ray/state.py | 2 +- python/ray/tests/test_advanced_3.py | 38 ---------- src/ray/object_manager/object_buffer_pool.cc | 31 +++----- src/ray/object_manager/object_buffer_pool.h | 8 +- src/ray/object_manager/object_manager.cc | 73 ++++++++----------- src/ray/object_manager/object_manager.h | 24 ++---- src/ray/object_manager/plasma/store.cc | 17 +---- src/ray/object_manager/plasma/store.h | 22 +----- src/ray/object_manager/pull_manager.cc | 49 +++---------- src/ray/object_manager/pull_manager.h | 21 ++---- .../object_manager/test/pull_manager_test.cc | 59 +++------------ 11 files changed, 83 insertions(+), 261 deletions(-) diff --git a/python/ray/state.py b/python/ray/state.py index 523a7c4d4..7524ea124 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -641,7 +641,7 @@ class GlobalState: object_ref, remote_node_id, _, _ = event["extra_data"] elif event["event_type"] == "transfer_receive": - object_ref, remote_node_id, _ = event["extra_data"] + object_ref, remote_node_id, _, _ = event["extra_data"] elif event["event_type"] == "receive_pull_request": object_ref, remote_node_id = event["extra_data"] diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 520c40f2b..5a2b57e2c 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -123,44 +123,6 @@ def test_load_balancing_with_dependencies(ray_start_cluster, fast): attempt_to_load_balance(f, [x], 100, num_nodes, 25) -def test_load_balancing_under_constrained_memory(ray_start_cluster): - # This test ensures that tasks are being assigned to all raylets in a - # roughly equal manner even when the tasks have dependencies. - cluster = ray_start_cluster - num_nodes = 3 - num_cpus = 4 - object_size = 4e7 - num_tasks = 100 - for _ in range(num_nodes): - cluster.add_node( - num_cpus=num_cpus, - memory=(num_cpus - 2) * object_size, - object_store_memory=(num_cpus - 2) * object_size) - cluster.add_node( - num_cpus=0, - resources={"custom": 1}, - memory=(num_tasks + 1) * object_size, - object_store_memory=(num_tasks + 1) * object_size) - ray.init(address=cluster.address) - - @ray.remote(num_cpus=0, resources={"custom": 1}) - def create_object(): - return np.zeros(int(object_size), dtype=np.uint8) - - @ray.remote - def f(i, x): - time.sleep(0.1) - print(i, ray.worker.global_worker.node.unique_id) - return ray.worker.global_worker.node.unique_id - - # TODO(swang): Actually test load balancing. - deps = [create_object.remote() for _ in range(num_tasks)] - tasks = [f.remote(i, dep) for i, dep in enumerate(deps)] - for i, dep in enumerate(deps): - print(i, dep) - ray.get(tasks) - - def test_locality_aware_leasing(ray_start_cluster): # This test ensures that a task will run where its task dependencies are # located. We run an initial non_local() task that is pinned to a diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 63b17f80d..63dabcb41 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -134,7 +134,7 @@ std::pair ObjectBufferPool::Cr // There can be only one reference to this chunk at any given time. 
return std::pair( errored_chunk_, - ray::Status::IOError("Chunk already received by a different thread.")); + ray::Status::IOError("Chunk already referenced by another thread.")); } create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED; return std::pair( @@ -164,34 +164,23 @@ void ObjectBufferPool::AbortCreateChunk(const ObjectID &object_id, void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk_index) { std::lock_guard lock(pool_mutex_); - auto it = create_buffer_state_.find(object_id); - if (it == create_buffer_state_.end() || - it->second.chunk_state[chunk_index] != CreateChunkState::REFERENCED) { - RAY_LOG(DEBUG) << "Object " << object_id << " aborted due to OOM before chunk " - << chunk_index << " could be sealed"; - return; - } - it->second.chunk_state[chunk_index] = CreateChunkState::SEALED; - it->second.num_seals_remaining--; - if (it->second.num_seals_remaining == 0) { + RAY_CHECK(create_buffer_state_[object_id].chunk_state[chunk_index] == + CreateChunkState::REFERENCED); + create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED; + create_buffer_state_[object_id].num_seals_remaining--; + if (create_buffer_state_[object_id].num_seals_remaining == 0) { RAY_CHECK_OK(store_client_.Seal(object_id)); RAY_CHECK_OK(store_client_.Release(object_id)); - create_buffer_state_.erase(it); + create_buffer_state_.erase(object_id); RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id << ", last chunk index: " << chunk_index; } } void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { - std::lock_guard lock(pool_mutex_); - auto it = create_buffer_state_.find(object_id); - if (it != create_buffer_state_.end()) { - RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id - << ", aborting"; - RAY_CHECK_OK(store_client_.Release(object_id)); - RAY_CHECK_OK(store_client_.Abort(object_id)); - create_buffer_state_.erase(object_id); - } + RAY_CHECK_OK(store_client_.Release(object_id)); + RAY_CHECK_OK(store_client_.Abort(object_id)); + create_buffer_state_.erase(object_id); } std::vector ObjectBufferPool::BuildChunks( diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index ad812da11..ee013563f 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -137,16 +137,16 @@ class ObjectBufferPool { /// \return Void. void FreeObjects(const std::vector &object_ids); - /// Abort the create operation associated with an object. This destroys the buffer - /// state, including create operations in progress for all chunks of the object. - void AbortCreate(const ObjectID &object_id); - /// Returns debug string for class. /// /// \return string. std::string DebugString() const; private: + /// Abort the create operation associated with an object. This destroys the buffer + /// state, including create operations in progress for all chunks of the object. + void AbortCreate(const ObjectID &object_id); + /// Abort the get operation associated with an object. 
void AbortGet(const ObjectID &object_id); diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 05883723f..89479e6ab 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -94,20 +94,14 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_ const NodeID &client_id) { SendPullRequest(object_id, client_id); }; - const auto &cancel_pull_request = [this](const ObjectID &object_id) { - // We must abort this object because it may have only been partially - // created and will cause a leak if we never receive the rest of the - // object. This is a no-op if the object is already sealed or evicted. - buffer_pool_.AbortCreate(object_id); - }; const auto &get_time = []() { return absl::GetCurrentTimeNanos() / 1e9; }; int64_t available_memory = config.object_store_memory; if (available_memory < 0) { available_memory = 0; } pull_manager_.reset(new PullManager( - self_node_id_, object_is_local, send_pull_request, cancel_pull_request, - restore_spilled_object_, get_time, config.pull_timeout_ms, available_memory, + self_node_id_, object_is_local, send_pull_request, restore_spilled_object_, + get_time, config.pull_timeout_ms, available_memory, [spill_objects_callback, object_store_full_callback]() { // TODO(swang): This copies the out-of-memory handling in the // CreateRequestQueue. It would be nice to unify these. @@ -316,15 +310,22 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id, const NodeID & void ObjectManager::HandleReceiveFinished(const ObjectID &object_id, const NodeID &node_id, uint64_t chunk_index, - double start_time, double end_time) { + double start_time, double end_time, + ray::Status status) { + if (!status.ok()) { + // TODO(rkn): What do we want to do if the send failed? + } + rpc::ProfileTableData::ProfileEvent profile_event; profile_event.set_event_type("transfer_receive"); profile_event.set_start_time(start_time); profile_event.set_end_time(end_time); - // Encode the object ID, node ID, chunk index as a json list, + // Encode the object ID, node ID, chunk index, and status as a json list, // which will be parsed by the reader of the profile table. 
+ profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"," + - std::to_string(chunk_index) + "]"); + std::to_string(chunk_index) + ",\"" + status.ToString() + + "\"]"); std::lock_guard lock(profile_mutex_); profile_events_.push_back(profile_event); @@ -665,57 +666,43 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply * const std::string &data = request.data(); double start_time = absl::GetCurrentTimeNanos() / 1e9; - bool success = ReceiveObjectChunk(node_id, object_id, owner_address, data_size, - metadata_size, chunk_index, data); - if (!success) { - num_chunks_received_failed_++; - RAY_LOG(INFO) << "Received duplicate or cancelled chunk at index " << chunk_index - << " of object " << object_id << ": overall " - << num_chunks_received_failed_ << "/" << num_chunks_received_total_ - << " failed"; - } + auto status = ReceiveObjectChunk(node_id, object_id, owner_address, data_size, + metadata_size, chunk_index, data); double end_time = absl::GetCurrentTimeNanos() / 1e9; - HandleReceiveFinished(object_id, node_id, chunk_index, start_time, end_time); - send_reply_callback(Status::OK(), nullptr, nullptr); + HandleReceiveFinished(object_id, node_id, chunk_index, start_time, end_time, status); + send_reply_callback(status, nullptr, nullptr); } -bool ObjectManager::ReceiveObjectChunk(const NodeID &node_id, const ObjectID &object_id, - const rpc::Address &owner_address, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, const std::string &data) { - num_chunks_received_total_++; +ray::Status ObjectManager::ReceiveObjectChunk(const NodeID &node_id, + const ObjectID &object_id, + const rpc::Address &owner_address, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, + const std::string &data) { RAY_LOG(DEBUG) << "ReceiveObjectChunk on " << self_node_id_ << " from " << node_id << " of object " << object_id << " chunk index: " << chunk_index << ", chunk data size: " << data.size() << ", object size: " << data_size; - if (!pull_manager_->IsObjectActive(object_id)) { - // This object is no longer being actively pulled. Do not create the object. - return false; - } std::pair chunk_status = buffer_pool_.CreateChunk(object_id, owner_address, data_size, metadata_size, chunk_index); - if (!pull_manager_->IsObjectActive(object_id)) { - // This object is no longer being actively pulled. Abort the object. We - // have to check again here because the pull manager runs in a different - // thread and the object may have been deactivated right before creating - // the chunk. - buffer_pool_.AbortCreate(object_id); - return false; - } - + ray::Status status; ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first; + num_chunks_received_total_++; if (chunk_status.second.ok()) { // Avoid handling this chunk if it's already being handled by another process. 
std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length); buffer_pool_.SealChunk(object_id, chunk_index); - return true; } else { - RAY_LOG(INFO) << "Error receiving chunk:" << chunk_status.second.message(); - return false; + num_chunks_received_failed_++; + RAY_LOG(INFO) << "ReceiveObjectChunk index " << chunk_index << " of object " + << object_id << " failed: " << chunk_status.second.message() + << ", overall " << num_chunks_received_failed_ << "/" + << num_chunks_received_total_ << " failed"; } + return status; } void ObjectManager::HandlePull(const rpc::PullRequest &request, rpc::PullReply *reply, diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 43e5f4510..000730122 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -159,15 +159,7 @@ class ObjectManager : public ObjectManagerInterface, std::shared_ptr rpc_client, std::function on_complete); - /// Receive an object chunk from a remote object manager. Small object may - /// fit in one chunk. - /// - /// If this is the last remaining chunk for an object, then the object will - /// be sealed. Else, we will keep the plasma buffer open until the remaining - /// chunks are received. - /// - /// If the object is no longer being actively pulled, the object will not be - /// created. + /// Receive object chunk from remote object manager, small object may contain one chunk /// /// \param node_id Node id of remote object manager which sends this chunk /// \param object_id Object id @@ -176,13 +168,10 @@ class ObjectManager : public ObjectManagerInterface, /// \param metadata_size Metadata size /// \param chunk_index Chunk index /// \param data Chunk data - /// \return Whether the chunk was successfully written into the local object - /// store. This can fail if the chunk was already received in the past, or if - /// the object is no longer being actively pulled. - bool ReceiveObjectChunk(const NodeID &node_id, const ObjectID &object_id, - const rpc::Address &owner_address, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - const std::string &data); + ray::Status ReceiveObjectChunk(const NodeID &node_id, const ObjectID &object_id, + const rpc::Address &owner_address, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + const std::string &data); /// Send pull request /// @@ -404,10 +393,11 @@ class ObjectManager : public ObjectManagerInterface, /// chunk. /// \param end_time_us The time when the object manager finished receiving the /// chunk. + /// \param status The status of the receive (e.g., did it succeed or fail). /// \return Void. void HandleReceiveFinished(const ObjectID &object_id, const NodeID &node_id, uint64_t chunk_index, double start_time_us, - double end_time_us); + double end_time_us, ray::Status status); /// Handle Push task timeout. void HandlePushTaskTimeout(const ObjectID &object_id, const NodeID &node_id); diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 1bfd80615..642d84204 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -166,8 +166,6 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, ObjectTableEnt } // Increase reference count. entry->ref_count++; - RAY_LOG(DEBUG) << "Object " << object_id << " in use by client" - << ", num bytes in use is now " << num_bytes_in_use_; // Add object id to the list of object ids that this client is using. 
client->object_ids.insert(object_id); @@ -327,8 +325,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID &object_id, eviction_policy_.ObjectCreated(object_id, client.get(), true); // Record that this client is using this object. AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client); - num_objects_unsealed_++; - num_bytes_unsealed_ += data_size + metadata_size; return PlasmaError::OK; } @@ -542,14 +538,12 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id, client->object_ids.erase(it); // Decrease reference count. entry->ref_count--; - RAY_LOG(DEBUG) << "Object " << object_id << " no longer in use by client"; // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. if (entry->ref_count == 0) { num_bytes_in_use_ -= entry->data_size + entry->metadata_size; - RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id - << ", num bytes in use is now " << num_bytes_in_use_; + RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id; if (deletion_cache_.count(object_id) == 0) { // Tell the eviction policy that this object is no longer being used. eviction_policy_.EndObjectAccess(object_id); @@ -574,15 +568,9 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID &object_id) { if (object->device_num == 0) { PlasmaAllocator::Free(object->pointer, buff_size); } - if (object->state == ObjectState::PLASMA_CREATED) { - num_bytes_unsealed_ -= object->data_size + object->metadata_size; - num_objects_unsealed_--; - } if (object->ref_count > 0) { // A client was using this object. num_bytes_in_use_ -= object->data_size + object->metadata_size; - RAY_LOG(DEBUG) << "Erasing object " << object_id << " with nonzero ref count" - << object_id << ", num bytes in use is now " << num_bytes_in_use_; } store_info_.objects.erase(object_id); } @@ -626,9 +614,6 @@ void PlasmaStore::SealObjects(const std::vector &object_ids) { object_info.owner_worker_id = entry->owner_worker_id.Binary(); object_info.metadata_size = entry->metadata_size; infos.push_back(object_info); - - num_objects_unsealed_--; - num_bytes_unsealed_ -= entry->data_size + entry->metadata_size; } PushNotifications(infos); diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 27a0451d8..c6561bf65 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -210,18 +210,8 @@ class PlasmaStore { /// Process queued requests to create an object. void ProcessCreateRequests(); - /// Get the available memory for new objects to be created. This includes - /// memory that is currently being used for created but unsealed objects. void GetAvailableMemory(std::function callback) const { - RAY_CHECK((num_bytes_unsealed_ > 0 && num_objects_unsealed_ > 0) || - (num_bytes_unsealed_ == 0 && num_objects_unsealed_ == 0)) - << "Tracking for available memory in the plasma store has gone out of sync. " - "Please file a GitHub issue."; - RAY_CHECK(num_bytes_in_use_ >= num_bytes_unsealed_); - // We do not count unsealed objects as in use because these may have been - // created by the object manager. 
- int64_t num_bytes_in_use = - static_cast(num_bytes_in_use_ - num_bytes_unsealed_); + int64_t num_bytes_in_use = static_cast(num_bytes_in_use_); RAY_CHECK(PlasmaAllocator::GetFootprintLimit() >= num_bytes_in_use); size_t available = PlasmaAllocator::GetFootprintLimit() - num_bytes_in_use; callback(available); @@ -324,18 +314,8 @@ class PlasmaStore { /// mutex if it is not absolutely necessary. std::recursive_mutex mutex_; - /// Total number of bytes allocated to objects that are in use by any client. - /// This includes objects that are being created and objects that a client - /// called get on. size_t num_bytes_in_use_ = 0; - /// Total number of bytes allocated to objects that are created but not yet - /// sealed. - size_t num_bytes_unsealed_ = 0; - - /// Number of objects that are created but not sealed. - size_t num_objects_unsealed_ = 0; - /// Total plasma object bytes that are consumed by core workers. int64_t total_consumed_bytes_ = 0; }; diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index f8f055be6..1ce460b81 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -7,14 +7,12 @@ namespace ray { PullManager::PullManager( NodeID &self_node_id, const std::function object_is_local, const std::function send_pull_request, - const std::function cancel_pull_request, const RestoreSpilledObjectCallback restore_spilled_object, const std::function get_time, int pull_timeout_ms, size_t num_bytes_available, std::function object_store_full_callback) : self_node_id_(self_node_id), object_is_local_(object_is_local), send_pull_request_(send_pull_request), - cancel_pull_request_(cancel_pull_request), restore_spilled_object_(restore_spilled_object), get_time_(get_time), pull_timeout_ms_(pull_timeout_ms), @@ -74,7 +72,6 @@ bool PullManager::ActivateNextPullBundleRequest( // Activate the bundle. for (const auto &ref : next_request_it->second) { - absl::MutexLock lock(&active_objects_mu_); auto obj_id = ObjectRefToId(ref); bool start_pull = active_object_pull_requests_.count(obj_id) == 0; active_object_pull_requests_[obj_id].insert(next_request_it->first); @@ -86,8 +83,6 @@ bool PullManager::ActivateNextPullBundleRequest( RAY_CHECK(it != object_pull_requests_.end()); num_bytes_being_pulled_ += it->second.object_size; objects_to_pull->push_back(obj_id); - - ResetRetryTimer(obj_id); } } @@ -101,7 +96,6 @@ void PullManager::DeactivatePullBundleRequest( const std::map>::iterator &request_it, std::unordered_set *objects_to_cancel) { for (const auto &ref : request_it->second) { - absl::MutexLock lock(&active_objects_mu_); auto obj_id = ObjectRefToId(ref); RAY_CHECK(active_object_pull_requests_[obj_id].erase(request_it->first)); if (active_object_pull_requests_[obj_id].empty()) { @@ -110,7 +104,10 @@ void PullManager::DeactivatePullBundleRequest( RAY_CHECK(it != object_pull_requests_.end()); num_bytes_being_pulled_ -= it->second.object_size; active_object_pull_requests_.erase(obj_id); - objects_to_cancel->insert(obj_id); + + if (objects_to_cancel) { + objects_to_cancel->insert(obj_id); + } } } @@ -173,19 +170,12 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available) RAY_CHECK(last_request_it != pull_request_bundles_.end()); DeactivatePullBundleRequest(last_request_it, &object_ids_to_cancel); } - for (const auto &obj_id : object_ids_to_cancel) { - // Call the cancellation callback outside of the lock. 
- cancel_pull_request_(obj_id); - } TriggerOutOfMemoryHandlingIfNeeded(); - { - absl::MutexLock lock(&active_objects_mu_); - for (const auto &obj_id : objects_to_pull) { - if (object_ids_to_cancel.count(obj_id) == 0) { - TryToMakeObjectLocal(obj_id); - } + for (const auto &obj_id : objects_to_pull) { + if (object_ids_to_cancel.count(obj_id) == 0) { + TryToMakeObjectLocal(obj_id); } } } @@ -245,16 +235,11 @@ std::vector PullManager::CancelPull(uint64_t request_id) { // If the pull request was being actively pulled, deactivate it now. if (bundle_it->first <= highest_req_id_being_pulled_) { - std::unordered_set object_ids_to_cancel; - DeactivatePullBundleRequest(bundle_it, &object_ids_to_cancel); - for (const auto &obj_id : object_ids_to_cancel) { - // Call the cancellation callback outside of the lock. - cancel_pull_request_(obj_id); - } + DeactivatePullBundleRequest(bundle_it); } // Erase this pull request. - std::vector object_ids_to_cancel_subscription; + std::vector object_ids_to_cancel; for (const auto &ref : bundle_it->second) { auto obj_id = ObjectRefToId(ref); auto it = object_pull_requests_.find(obj_id); @@ -262,7 +247,7 @@ std::vector PullManager::CancelPull(uint64_t request_id) { RAY_CHECK(it->second.bundle_request_ids.erase(bundle_it->first)); if (it->second.bundle_request_ids.empty()) { object_pull_requests_.erase(it); - object_ids_to_cancel_subscription.push_back(obj_id); + object_ids_to_cancel.push_back(obj_id); } } pull_request_bundles_.erase(bundle_it); @@ -272,7 +257,7 @@ std::vector PullManager::CancelPull(uint64_t request_id) { // request to avoid reactivating it again. UpdatePullsBasedOnAvailableMemory(num_bytes_available_); - return object_ids_to_cancel_subscription; + return object_ids_to_cancel; } void PullManager::OnLocationChange(const ObjectID &object_id, @@ -307,10 +292,7 @@ void PullManager::OnLocationChange(const ObjectID &object_id, RAY_LOG(DEBUG) << "OnLocationChange " << spilled_url << " num clients " << client_ids.size(); - { - absl::MutexLock lock(&active_objects_mu_); - TryToMakeObjectLocal(object_id); - } + TryToMakeObjectLocal(object_id); } void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) { @@ -440,7 +422,6 @@ void PullManager::UpdateRetryTimer(ObjectPullRequest &request) { } void PullManager::Tick() { - absl::MutexLock lock(&active_objects_mu_); for (auto &pair : active_object_pull_requests_) { const auto &object_id = pair.first; TryToMakeObjectLocal(object_id); @@ -449,13 +430,7 @@ void PullManager::Tick() { int PullManager::NumActiveRequests() const { return object_pull_requests_.size(); } -bool PullManager::IsObjectActive(const ObjectID &object_id) const { - absl::MutexLock lock(&active_objects_mu_); - return active_object_pull_requests_.count(object_id) == 1; -} - std::string PullManager::DebugString() const { - absl::MutexLock lock(&active_objects_mu_); std::stringstream result; result << "PullManager:"; result << "\n- num bytes available for pulled objects: " << num_bytes_available_; diff --git a/src/ray/object_manager/pull_manager.h b/src/ray/object_manager/pull_manager.h index c8820458d..b0c80e338 100644 --- a/src/ray/object_manager/pull_manager.h +++ b/src/ray/object_manager/pull_manager.h @@ -15,6 +15,7 @@ #include "ray/object_manager/common.h" #include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/notification/object_store_notification_manager_ipc.h" +#include "ray/object_manager/object_buffer_pool.h" #include "ray/object_manager/object_directory.h" #include 
"ray/object_manager/ownership_based_object_directory.h" #include "ray/object_manager/plasma/store_runner.h" @@ -31,17 +32,13 @@ class PullManager { /// /// \param self_node_id the current node /// \param object_is_local A callback which should return true if a given object is - /// already on the local node. - /// \param send_pull_request A callback which should send a + /// already on the local node. \param send_pull_request A callback which should send a /// pull request to the specified node. - /// \param cancel_pull_request A callback which should - /// cancel pulling an object. /// \param restore_spilled_object A callback which should /// retrieve an spilled object from the external store. PullManager( NodeID &self_node_id, const std::function object_is_local, const std::function send_pull_request, - const std::function cancel_pull_request, const RestoreSpilledObjectCallback restore_spilled_object, const std::function get_time, int pull_timeout_ms, size_t num_bytes_available, std::function object_store_full_callback); @@ -103,8 +100,6 @@ class PullManager { /// The number of ongoing object pulls. int NumActiveRequests() const; - bool IsObjectActive(const ObjectID &object_id) const; - std::string DebugString() const; private: @@ -134,8 +129,7 @@ class PullManager { /// locations. This does nothing if the object is not needed by any pull /// request or if it is already local. This also sets a timeout for when to /// make the next attempt to make the object local. - void TryToMakeObjectLocal(const ObjectID &object_id) - EXCLUSIVE_LOCKS_REQUIRED(active_objects_mu_); + void TryToMakeObjectLocal(const ObjectID &object_id); /// Try to Pull an object from one of its expected client locations. If there /// are more client locations to try after this attempt, then this method @@ -161,7 +155,7 @@ class PullManager { /// operations for the object. void DeactivatePullBundleRequest( const std::map>::iterator &request_it, - std::unordered_set *objects_to_cancel); + std::unordered_set *objects_to_cancel = nullptr); /// Trigger out-of-memory handling if the first request in the queue needs /// more space than the bytes available. This is needed to make room for the @@ -172,7 +166,6 @@ class PullManager { NodeID self_node_id_; const std::function object_is_local_; const std::function send_pull_request_; - const std::function cancel_pull_request_; const RestoreSpilledObjectCallback restore_spilled_object_; const std::function get_time_; uint64_t pull_timeout_ms_; @@ -214,16 +207,12 @@ class PullManager { /// object managers. std::unordered_map object_pull_requests_; - // Protects state that is shared by the threads used to receive object - // chunks. - mutable absl::Mutex active_objects_mu_; - /// The objects that we are currently fetching. This is a subset of the /// objects that we have been asked to fetch. The total size of these objects /// is the number of bytes that we are currently pulling, and it must be less /// than the bytes available. absl::flat_hash_map> - active_object_pull_requests_ GUARDED_BY(active_objects_mu_); + active_object_pull_requests_; /// Internally maintained random number generator. 
std::mt19937_64 gen_; diff --git a/src/ray/object_manager/test/pull_manager_test.cc b/src/ray/object_manager/test/pull_manager_test.cc index 74eee6e35..ecdaa0619 100644 --- a/src/ray/object_manager/test/pull_manager_test.cc +++ b/src/ray/object_manager/test/pull_manager_test.cc @@ -19,24 +19,22 @@ class PullManagerTestWithCapacity { num_restore_spilled_object_calls_(0), num_object_store_full_calls_(0), fake_time_(0), - pull_manager_( - self_node_id_, [this](const ObjectID &object_id) { return object_is_local_; }, - [this](const ObjectID &object_id, const NodeID &node_id) { - num_send_pull_request_calls_++; - }, - [this](const ObjectID &object_id) { num_abort_calls_[object_id]++; }, - [this](const ObjectID &, const std::string &, const NodeID &, - std::function callback) { - num_restore_spilled_object_calls_++; - restore_object_callback_ = callback; - }, - [this]() { return fake_time_; }, 10000, num_available_bytes, - [this]() { num_object_store_full_calls_++; }) {} + pull_manager_(self_node_id_, + [this](const ObjectID &object_id) { return object_is_local_; }, + [this](const ObjectID &object_id, const NodeID &node_id) { + num_send_pull_request_calls_++; + }, + [this](const ObjectID &, const std::string &, const NodeID &, + std::function callback) { + num_restore_spilled_object_calls_++; + restore_object_callback_ = callback; + }, + [this]() { return fake_time_; }, 10000, num_available_bytes, + [this]() { num_object_store_full_calls_++; }) {} void AssertNoLeaks() { ASSERT_TRUE(pull_manager_.pull_request_bundles_.empty()); ASSERT_TRUE(pull_manager_.object_pull_requests_.empty()); - absl::MutexLock lock(&pull_manager_.active_objects_mu_); ASSERT_TRUE(pull_manager_.active_object_pull_requests_.empty()); // Most tests should not throw OOM. ASSERT_EQ(num_object_store_full_calls_, 0); @@ -50,7 +48,6 @@ class PullManagerTestWithCapacity { std::function restore_object_callback_; double fake_time_; PullManager pull_manager_; - std::unordered_map num_abort_calls_; }; class PullManagerTest : public PullManagerTestWithCapacity, public ::testing::Test { @@ -58,7 +55,6 @@ class PullManagerTest : public PullManagerTestWithCapacity, public ::testing::Te PullManagerTest() : PullManagerTestWithCapacity(1) {} void AssertNumActiveRequestsEquals(size_t num_requests) { - absl::MutexLock lock(&pull_manager_.active_objects_mu_); ASSERT_EQ(pull_manager_.object_pull_requests_.size(), num_requests); ASSERT_EQ(pull_manager_.active_object_pull_requests_.size(), num_requests); } @@ -70,7 +66,6 @@ class PullManagerWithAdmissionControlTest : public PullManagerTestWithCapacity, PullManagerWithAdmissionControlTest() : PullManagerTestWithCapacity(10) {} void AssertNumActiveRequestsEquals(size_t num_requests) { - absl::MutexLock lock(&pull_manager_.active_objects_mu_); ASSERT_EQ(pull_manager_.active_object_pull_requests_.size(), num_requests); } @@ -105,7 +100,6 @@ TEST_F(PullManagerTest, TestStaleSubscription) { // There are no client ids to pull from. ASSERT_EQ(num_send_pull_request_calls_, 0); ASSERT_EQ(num_restore_spilled_object_calls_, 0); - ASSERT_TRUE(num_abort_calls_.empty()); auto objects_to_cancel = pull_manager_.CancelPull(req_id); ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs)); @@ -120,7 +114,6 @@ TEST_F(PullManagerTest, TestStaleSubscription) { // Now we're getting a notification about an object that was already cancelled. 
ASSERT_EQ(num_send_pull_request_calls_, 0); ASSERT_EQ(num_restore_spilled_object_calls_, 0); - ASSERT_EQ(num_abort_calls_[oid], 1); AssertNoLeaks(); } @@ -169,10 +162,8 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) { NodeID::FromRandom(), 0); ASSERT_EQ(num_restore_spilled_object_calls_, 0); - ASSERT_TRUE(num_abort_calls_.empty()); auto objects_to_cancel = pull_manager_.CancelPull(req_id); ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs)); - ASSERT_EQ(num_abort_calls_[obj1], 1); AssertNoLeaks(); } @@ -225,9 +216,7 @@ TEST_F(PullManagerTest, TestRestoreObjectFailed) { ASSERT_EQ(num_send_pull_request_calls_, 1); ASSERT_EQ(num_restore_spilled_object_calls_, 2); - ASSERT_TRUE(num_abort_calls_.empty()); auto objects_to_cancel = pull_manager_.CancelPull(req_id); - ASSERT_EQ(num_abort_calls_[obj1], 1); AssertNoLeaks(); } @@ -257,7 +246,6 @@ TEST_F(PullManagerTest, TestLoadBalancingRestorationRequest) { // Make sure the restore request wasn't sent since there are nodes that have a copied // object. ASSERT_EQ(num_restore_spilled_object_calls_, 0); - ASSERT_TRUE(num_abort_calls_.empty()); } TEST_F(PullManagerTest, TestManyUpdates) { @@ -281,10 +269,8 @@ TEST_F(PullManagerTest, TestManyUpdates) { ASSERT_EQ(num_send_pull_request_calls_, 1); ASSERT_EQ(num_restore_spilled_object_calls_, 0); - ASSERT_TRUE(num_abort_calls_.empty()); auto objects_to_cancel = pull_manager_.CancelPull(req_id); ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs)); - ASSERT_EQ(num_abort_calls_[obj1], 1); AssertNoLeaks(); } @@ -330,10 +316,8 @@ TEST_F(PullManagerTest, TestRetryTimer) { } ASSERT_EQ(num_send_pull_request_calls_, 0); - ASSERT_TRUE(num_abort_calls_.empty()); auto objects_to_cancel = pull_manager_.CancelPull(req_id); ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs)); - ASSERT_EQ(num_abort_calls_[obj1], 1); AssertNoLeaks(); } @@ -364,13 +348,9 @@ TEST_F(PullManagerTest, TestBasic) { } ASSERT_EQ(num_send_pull_request_calls_, 0); - ASSERT_TRUE(num_abort_calls_.empty()); auto objects_to_cancel = pull_manager_.CancelPull(req_id); ASSERT_EQ(objects_to_cancel, oids); AssertNumActiveRequestsEquals(0); - for (auto &oid : oids) { - ASSERT_EQ(num_abort_calls_[oid], 1); - } // Don't pull a remote object if we've canceled. object_is_local_ = false; @@ -407,7 +387,6 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) { // Cancel one request. auto objects_to_cancel = pull_manager_.CancelPull(req_id1); - ASSERT_TRUE(num_abort_calls_.empty()); ASSERT_TRUE(objects_to_cancel.empty()); // Objects should still be pulled because the other request is still open. AssertNumActiveRequestsEquals(oids.size()); @@ -421,13 +400,9 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) { } // Cancel the other request. - ASSERT_TRUE(num_abort_calls_.empty()); objects_to_cancel = pull_manager_.CancelPull(req_id2); ASSERT_EQ(objects_to_cancel, oids); AssertNumActiveRequestsEquals(0); - for (auto &oid : oids) { - ASSERT_EQ(num_abort_calls_[oid], 1); - } // Don't pull a remote object if we've canceled. object_is_local_ = false; @@ -463,14 +438,10 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) { ASSERT_TRUE(IsUnderCapacity(oids.size() * object_size)); // Reduce the available memory. - ASSERT_TRUE(num_abort_calls_.empty()); ASSERT_EQ(num_object_store_full_calls_, 0); pull_manager_.UpdatePullsBasedOnAvailableMemory(oids.size() * object_size - 1); AssertNumActiveRequestsEquals(0); ASSERT_EQ(num_object_store_full_calls_, 1); - for (auto &oid : oids) { - ASSERT_EQ(num_abort_calls_[oid], 1); - } // No new pull requests after the next tick. 
fake_time_ += 10; auto prev_pull_requests = num_send_pull_request_calls_; @@ -489,14 +460,8 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) { // OOM was not triggered a second time. ASSERT_EQ(num_object_store_full_calls_, 1); num_object_store_full_calls_ = 0; - for (auto &oid : oids) { - ASSERT_EQ(num_abort_calls_[oid], 1); - } pull_manager_.CancelPull(req_id); - for (auto &oid : oids) { - ASSERT_EQ(num_abort_calls_[oid], 2); - } AssertNoLeaks(); }
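
The hunks above undo the admission-control path in which the pull manager cancels a deactivated pull by aborting its partially created plasma buffer: the removed `cancel_pull_request` callback that `ObjectManager` wired to `ObjectBufferPool::AbortCreate`, and the `IsObjectActive` checks around chunk creation. A minimal, self-contained sketch of that general pattern follows; `SimplePullManager`, `SimpleBufferPool`, and `ObjectId` are hypothetical stand-ins for illustration only and do not reflect Ray's real class signatures.

// Sketch only: simplified model of "deactivate a pull under memory pressure and
// abort its partially created buffer". Names here are hypothetical stand-ins.
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>

using ObjectId = std::string;

// Tracks partially created objects (chunks referenced but not yet sealed).
class SimpleBufferPool {
 public:
  void StartCreate(const ObjectId &id) {
    std::lock_guard<std::mutex> lock(mu_);
    unsealed_.insert(id);
  }
  // Abort a partially created object; a no-op if it was never started or already sealed.
  void AbortCreate(const ObjectId &id) {
    std::lock_guard<std::mutex> lock(mu_);
    if (unsealed_.erase(id) > 0) {
      std::cout << "Aborted partial object " << id << "\n";
    }
  }

 private:
  std::mutex mu_;
  std::unordered_set<ObjectId> unsealed_;
};

// Activates queued pulls while they fit in the memory budget and cancels active
// pulls (via the injected callback) when the budget shrinks.
class SimplePullManager {
 public:
  SimplePullManager(int64_t bytes_available,
                    std::function<void(const ObjectId &)> cancel_pull)
      : bytes_available_(bytes_available), cancel_pull_(std::move(cancel_pull)) {}

  void Pull(const ObjectId &id, int64_t size) {
    queued_[id] = size;
    Update();
  }

  void SetAvailableMemory(int64_t bytes) {
    bytes_available_ = bytes;
    Update();
  }

 private:
  void Update() {
    // Activate queued pulls while they fit in the budget.
    for (const auto &[id, size] : queued_) {
      if (!active_.count(id) && bytes_active_ + size <= bytes_available_) {
        active_.insert(id);
        bytes_active_ += size;
      }
    }
    // Deactivate pulls that no longer fit, cancelling any partial creation so
    // half-received chunks do not leak object-store memory. Deactivated requests
    // stay queued and may be reactivated if memory becomes available again.
    for (auto it = active_.begin(); it != active_.end();) {
      if (bytes_active_ > bytes_available_) {
        bytes_active_ -= queued_[*it];
        cancel_pull_(*it);
        it = active_.erase(it);
      } else {
        ++it;
      }
    }
  }

  int64_t bytes_available_;
  int64_t bytes_active_ = 0;
  std::unordered_map<ObjectId, int64_t> queued_;
  std::unordered_set<ObjectId> active_;
  std::function<void(const ObjectId &)> cancel_pull_;
};

int main() {
  SimpleBufferPool pool;
  SimplePullManager pulls(/*bytes_available=*/100,
                          [&pool](const ObjectId &id) { pool.AbortCreate(id); });
  pulls.Pull("obj-a", 60);       // fits in the budget, becomes active
  pool.StartCreate("obj-a");     // some chunks of obj-a have been received
  pulls.SetAvailableMemory(40);  // memory pressure: obj-a no longer fits
  // Expected output: "Aborted partial object obj-a"
  return 0;
}

With the revert applied, `AbortCreate` returns to being a private member of `ObjectBufferPool` and deactivating a pull request no longer cancels in-flight chunk creation, matching the pre-#14157 behavior that this patch restores.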