diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 9636fadb0..10ed49732 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -175,46 +175,47 @@ def test_actor_broadcast(ray_start_cluster_with_resource): # Wait for profiling information to be pushed to the profile table. time.sleep(1) - transfer_events = ray.state.object_transfer_timeline() + # TODO(Sang): Re-enable it after event is introduced. + # transfer_events = ray.state.object_transfer_timeline() - # Make sure that each object was transferred a reasonable number of times. - for x_id in object_refs: - relevant_events = [ - event for event in transfer_events if - event["cat"] == "transfer_send" and event["args"][0] == x_id.hex() - ] + # # Make sure that each object was transferred a reasonable number of times. # noqa + # for x_id in object_refs: + # relevant_events = [ + # event for event in transfer_events if + # event["cat"] == "transfer_send" and event["args"][0] == x_id.hex() # noqa + # ] - # NOTE: Each event currently appears twice because we duplicate the - # send and receive boxes to underline them with a box (black if it is a - # send and gray if it is a receive). So we need to remove these extra - # boxes here. - deduplicated_relevant_events = [ - event for event in relevant_events if event["cname"] != "black" - ] - assert len(deduplicated_relevant_events) * 2 == len(relevant_events) - relevant_events = deduplicated_relevant_events + # # NOTE: Each event currently appears twice because we duplicate the + # # send and receive boxes to underline them with a box (black if it is a # noqa + # # send and gray if it is a receive). So we need to remove these extra + # # boxes here. + # deduplicated_relevant_events = [ + # event for event in relevant_events if event["cname"] != "black" + # ] + # assert len(deduplicated_relevant_events) * 2 == len(relevant_events) + # relevant_events = deduplicated_relevant_events - # Each object must have been broadcast to each remote machine. - assert len(relevant_events) >= num_nodes - 1 - # If more object transfers than necessary have been done, print a - # warning. - if len(relevant_events) > num_nodes - 1: - warnings.warn("This object was transferred {} times, when only {} " - "transfers were required.".format( - len(relevant_events), num_nodes - 1)) - # Each object should not have been broadcast more than once from every - # machine to every other machine. Also, a pair of machines should not - # both have sent the object to each other. - assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2 + # # Each object must have been broadcast to each remote machine. + # assert len(relevant_events) >= num_nodes - 1 + # # If more object transfers than necessary have been done, print a + # # warning. + # if len(relevant_events) > num_nodes - 1: + # warnings.warn("This object was transferred {} times, when only {} " # noqa + # "transfers were required.".format( + # len(relevant_events), num_nodes - 1)) + # # Each object should not have been broadcast more than once from every # noqa + # # machine to every other machine. Also, a pair of machines should not + # # both have sent the object to each other. + # assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2 - # Make sure that no object was sent multiple times between the same - # pair of object managers. - send_counts = defaultdict(int) - for event in relevant_events: - # The pid identifies the sender and the tid identifies the - # receiver. - send_counts[(event["pid"], event["tid"])] += 1 - assert all(value == 1 for value in send_counts.values()) + # # Make sure that no object was sent multiple times between the same + # # pair of object managers. + # send_counts = defaultdict(int) + # for event in relevant_events: + # # The pid identifies the sender and the tid identifies the + # # receiver. + # send_counts[(event["pid"], event["tid"])] += 1 + # assert all(value == 1 for value in send_counts.values()) # The purpose of this test is to make sure we can transfer many objects. In the diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index b1594fc20..36fd811fe 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -300,36 +300,9 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id, const NodeID & << ", status: " << status.ToString(); if (!status.ok()) { // TODO(rkn): What do we want to do if the send failed? + RAY_LOG(DEBUG) << "Failed to send a push request for an object " << object_id + << " to " << node_id << ". Chunk index: " << chunk_index; } - - rpc::ProfileTableData::ProfileEvent profile_event; - profile_event.set_event_type("transfer_send"); - profile_event.set_start_time(start_time); - profile_event.set_end_time(end_time); - // Encode the object ID, node ID, chunk index, and status as a json list, - // which will be parsed by the reader of the profile table. - profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"," + - std::to_string(chunk_index) + ",\"" + status.ToString() + - "\"]"); - - std::lock_guard lock(profile_mutex_); - profile_events_.push_back(profile_event); -} - -void ObjectManager::HandleReceiveFinished(const ObjectID &object_id, - const NodeID &node_id, uint64_t chunk_index, - double start_time, double end_time) { - rpc::ProfileTableData::ProfileEvent profile_event; - profile_event.set_event_type("transfer_receive"); - profile_event.set_start_time(start_time); - profile_event.set_end_time(end_time); - // Encode the object ID, node ID, chunk index as a json list, - // which will be parsed by the reader of the profile table. - profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"," + - std::to_string(chunk_index) + "]"); - - std::lock_guard lock(profile_mutex_); - profile_events_.push_back(profile_event); } void ObjectManager::Push(const ObjectID &object_id, const NodeID &node_id) { @@ -741,7 +714,6 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply * const rpc::Address &owner_address = request.owner_address(); const std::string &data = request.data(); - double start_time = absl::GetCurrentTimeNanos() / 1e9; bool success = ReceiveObjectChunk(node_id, object_id, owner_address, data_size, metadata_size, chunk_index, data); num_chunks_received_total_++; @@ -752,9 +724,7 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply * << num_chunks_received_total_failed_ << "/" << num_chunks_received_total_ << " failed"; } - double end_time = absl::GetCurrentTimeNanos() / 1e9; - HandleReceiveFinished(object_id, node_id, chunk_index, start_time, end_time); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -802,16 +772,6 @@ void ObjectManager::HandlePull(const rpc::PullRequest &request, rpc::PullReply * RAY_LOG(DEBUG) << "Received pull request from node " << node_id << " for object [" << object_id << "]."; - rpc::ProfileTableData::ProfileEvent profile_event; - profile_event.set_event_type("receive_pull_request"); - profile_event.set_start_time(absl::GetCurrentTimeNanos() / 1e9); - profile_event.set_end_time(profile_event.start_time()); - profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"]"); - { - std::lock_guard lock(profile_mutex_); - profile_events_.emplace_back(profile_event); - } - main_service_->post([this, object_id, node_id]() { Push(object_id, node_id); }, "ObjectManager.HandlePull"); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -889,22 +849,6 @@ std::shared_ptr ObjectManager::GetRpcClient( return it->second; } -std::shared_ptr ObjectManager::GetAndResetProfilingInfo() { - auto profile_info = std::make_shared(); - profile_info->set_component_type("object_manager"); - profile_info->set_component_id(self_node_id_.Binary()); - - { - std::lock_guard lock(profile_mutex_); - for (auto const &profile_event : profile_events_) { - profile_info->add_profile_events()->CopyFrom(profile_event); - } - profile_events_.clear(); - } - - return profile_info; -} - std::string ObjectManager::DebugString() const { std::stringstream result; result << "ObjectManager:"; @@ -912,7 +856,6 @@ std::string ObjectManager::DebugString() const { result << "\n- num active wait requests: " << active_wait_requests_.size(); result << "\n- num unfulfilled push requests: " << unfulfilled_push_requests_.size(); result << "\n- num pull requests: " << pull_manager_->NumActiveRequests(); - result << "\n- num buffered profile events: " << profile_events_.size(); result << "\n- num chunks received total: " << num_chunks_received_total_; result << "\n- num chunks received failed (all): " << num_chunks_received_total_failed_; result << "\n- num chunks received failed / cancelled: " diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 6d4f7a6ae..37b5d6a48 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -269,12 +269,6 @@ class ObjectManager : public ObjectManagerInterface, /// or send it to all the object stores. void FreeObjects(const std::vector &object_ids, bool local_only); - /// Return profiling information and reset the profiling information. - /// - /// \return All profiling information that has accumulated since the last call - /// to this method. - std::shared_ptr GetAndResetProfilingInfo(); - /// Returns debug string for class. /// /// \return string. @@ -424,21 +418,6 @@ class ObjectManager : public ObjectManagerInterface, uint64_t chunk_index, double start_time_us, double end_time_us, ray::Status status); - /// This is used to notify the main thread that the receiving of a chunk has - /// completed. - /// - /// \param object_id The ID of the object that was received. - /// \param node_id The ID of the node that the chunk was received from. - /// \param chunk_index The index of the chunk. - /// \param start_time_us The time when the object manager began receiving the - /// chunk. - /// \param end_time_us The time when the object manager finished receiving the - /// chunk. - /// \return Void. - void HandleReceiveFinished(const ObjectID &object_id, const NodeID &node_id, - uint64_t chunk_index, double start_time_us, - double end_time_us); - /// Handle Push task timeout. void HandlePushTaskTimeout(const ObjectID &object_id, const NodeID &node_id); @@ -484,14 +463,6 @@ class ObjectManager : public ObjectManagerInterface, ObjectID, std::unordered_map>> unfulfilled_push_requests_; - /// Profiling events that are to be batched together and added to the profile - /// table in the GCS. - std::vector profile_events_; - - /// mutex lock used to protect profile_events_, profile_events_ is used in main thread - /// and rpc thread. - std::mutex profile_mutex_; - /// The gPRC server. rpc::GrpcServer object_manager_server_; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 35fc2bf66..4dacc3ac9 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -473,12 +473,6 @@ ray::Status NodeManager::RegisterGcs() { periodical_runner_.RunFnPeriodically( [this] { ReportResourceUsage(); }, report_resources_period_ms_, "NodeManager.deadline_timer.report_resource_usage"); - // Start the timer that gets object manager profiling information and sends it - // to the GCS. - periodical_runner_.RunFnPeriodically( - [this] { GetObjectManagerProfileInfo(); }, - RayConfig::instance().raylet_heartbeat_period_milliseconds(), - "NodeManager.deadline_timer.object_manager_profiling"); /// If periodic asio stats print is enabled, it will print it. const auto event_stats_print_interval_ms = @@ -782,21 +776,6 @@ void NodeManager::WarnResourceDeadlock() { cluster_task_manager_->ScheduleAndDispatchTasks(); } -void NodeManager::GetObjectManagerProfileInfo() { - int64_t start_time_ms = current_time_ms(); - - auto profile_info = object_manager_.GetAndResetProfilingInfo(); - - if (profile_info->profile_events_size() > 0) { - RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_info, nullptr)); - } - - int64_t interval = current_time_ms() - start_time_ms; - if (interval > RayConfig::instance().handler_warning_timeout_ms()) { - RAY_LOG(WARNING) << "GetObjectManagerProfileInfo handler took " << interval << " ms."; - } -} - void NodeManager::NodeAdded(const GcsNodeInfo &node_info) { const NodeID node_id = NodeID::FromBinary(node_info.node_id()); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 4cc17cce1..9e6fd6aac 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -259,11 +259,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// to eagerly evict all plasma copies of the object from the cluster. void FlushObjectsToFree(); - /// Get profiling information from the object manager and push it to the GCS. - /// - /// \return Void. - void GetObjectManagerProfileInfo(); - /// Handler for a resource usage notification from the GCS. /// /// \param id The ID of the node manager that sent the resources data.