Remove an unused profile event code from object manager. (#17529)

* Remove an unused profile event code from object manager.

* Addressed code review.

* Temporarily skip a test

* lint
SangBin Cho 2021-08-05 17:13:16 -07:00 committed by GitHub
parent d59d6ad653
commit 8bc9286296
5 changed files with 39 additions and 150 deletions

@@ -175,46 +175,47 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
# Wait for profiling information to be pushed to the profile table.
time.sleep(1)
transfer_events = ray.state.object_transfer_timeline()
# TODO(Sang): Re-enable it after event is introduced.
# transfer_events = ray.state.object_transfer_timeline()
# Make sure that each object was transferred a reasonable number of times.
for x_id in object_refs:
relevant_events = [
event for event in transfer_events if
event["cat"] == "transfer_send" and event["args"][0] == x_id.hex()
]
# # Make sure that each object was transferred a reasonable number of times. # noqa
# for x_id in object_refs:
# relevant_events = [
# event for event in transfer_events if
# event["cat"] == "transfer_send" and event["args"][0] == x_id.hex() # noqa
# ]
# NOTE: Each event currently appears twice because we duplicate the
# send and receive boxes to underline them with a box (black if it is a
# send and gray if it is a receive). So we need to remove these extra
# boxes here.
deduplicated_relevant_events = [
event for event in relevant_events if event["cname"] != "black"
]
assert len(deduplicated_relevant_events) * 2 == len(relevant_events)
relevant_events = deduplicated_relevant_events
# # NOTE: Each event currently appears twice because we duplicate the
# # send and receive boxes to underline them with a box (black if it is a # noqa
# # send and gray if it is a receive). So we need to remove these extra
# # boxes here.
# deduplicated_relevant_events = [
# event for event in relevant_events if event["cname"] != "black"
# ]
# assert len(deduplicated_relevant_events) * 2 == len(relevant_events)
# relevant_events = deduplicated_relevant_events
# Each object must have been broadcast to each remote machine.
assert len(relevant_events) >= num_nodes - 1
# If more object transfers than necessary have been done, print a
# warning.
if len(relevant_events) > num_nodes - 1:
warnings.warn("This object was transferred {} times, when only {} "
"transfers were required.".format(
len(relevant_events), num_nodes - 1))
# Each object should not have been broadcast more than once from every
# machine to every other machine. Also, a pair of machines should not
# both have sent the object to each other.
assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2
# # Each object must have been broadcast to each remote machine.
# assert len(relevant_events) >= num_nodes - 1
# # If more object transfers than necessary have been done, print a
# # warning.
# if len(relevant_events) > num_nodes - 1:
# warnings.warn("This object was transferred {} times, when only {} " # noqa
# "transfers were required.".format(
# len(relevant_events), num_nodes - 1))
# # Each object should not have been broadcast more than once from every # noqa
# # machine to every other machine. Also, a pair of machines should not
# # both have sent the object to each other.
# assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2
# Make sure that no object was sent multiple times between the same
# pair of object managers.
send_counts = defaultdict(int)
for event in relevant_events:
# The pid identifies the sender and the tid identifies the
# receiver.
send_counts[(event["pid"], event["tid"])] += 1
assert all(value == 1 for value in send_counts.values())
# # Make sure that no object was sent multiple times between the same
# # pair of object managers.
# send_counts = defaultdict(int)
# for event in relevant_events:
# # The pid identifies the sender and the tid identifies the
# # receiver.
# send_counts[(event["pid"], event["tid"])] += 1
# assert all(value == 1 for value in send_counts.values())
# The purpose of this test is to make sure we can transfer many objects. In the
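
For reference, the now-disabled upper-bound assertion allows at most one transfer per unordered pair of nodes:

    max sends = num_nodes * (num_nodes - 1) / 2, e.g. 4 nodes -> 4 * 3 / 2 = 6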

@@ -300,36 +300,9 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id, const NodeID &
<< ", status: " << status.ToString();
if (!status.ok()) {
// TODO(rkn): What do we want to do if the send failed?
RAY_LOG(DEBUG) << "Failed to send a push request for an object " << object_id
<< " to " << node_id << ". Chunk index: " << chunk_index;
}
rpc::ProfileTableData::ProfileEvent profile_event;
profile_event.set_event_type("transfer_send");
profile_event.set_start_time(start_time);
profile_event.set_end_time(end_time);
// Encode the object ID, node ID, chunk index, and status as a json list,
// which will be parsed by the reader of the profile table.
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"," +
std::to_string(chunk_index) + ",\"" + status.ToString() +
"\"]");
std::lock_guard<std::mutex> lock(profile_mutex_);
profile_events_.push_back(profile_event);
}
void ObjectManager::HandleReceiveFinished(const ObjectID &object_id,
const NodeID &node_id, uint64_t chunk_index,
double start_time, double end_time) {
rpc::ProfileTableData::ProfileEvent profile_event;
profile_event.set_event_type("transfer_receive");
profile_event.set_start_time(start_time);
profile_event.set_end_time(end_time);
// Encode the object ID, node ID, chunk index as a json list,
// which will be parsed by the reader of the profile table.
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"," +
std::to_string(chunk_index) + "]");
std::lock_guard<std::mutex> lock(profile_mutex_);
profile_events_.push_back(profile_event);
}
void ObjectManager::Push(const ObjectID &object_id, const NodeID &node_id) {
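
The two deleted handlers shared one pattern: build a ProfileEvent, hand-encode its extra_data as a JSON list via string concatenation, and append it to a buffer guarded by profile_mutex_, since that buffer is touched from both the main thread and the rpc thread. A minimal standalone sketch of the pattern (hypothetical names, not Ray's actual API):

#include <iostream>
#include <mutex>
#include <string>
#include <vector>

// Stand-in for rpc::ProfileTableData::ProfileEvent; only the encoding and
// locking below mirror the deleted code.
struct ProfileEvent {
  std::string event_type;
  double start_time = 0;
  double end_time = 0;
  std::string extra_data;  // JSON list, parsed by the profile-table reader.
};

std::mutex profile_mutex;                  // guards profile_events
std::vector<ProfileEvent> profile_events;  // filled from main and rpc threads

void RecordTransferSend(const std::string &object_hex, const std::string &node_hex,
                        int chunk_index, const std::string &status,
                        double start_time, double end_time) {
  ProfileEvent event;
  event.event_type = "transfer_send";
  event.start_time = start_time;
  event.end_time = end_time;
  // Same hand-rolled JSON encoding as the removed HandleSendFinished.
  event.extra_data = "[\"" + object_hex + "\",\"" + node_hex + "\"," +
                     std::to_string(chunk_index) + ",\"" + status + "\"]";
  std::lock_guard<std::mutex> lock(profile_mutex);
  profile_events.push_back(std::move(event));
}

int main() {
  RecordTransferSend("ffff", "abcd", 3, "OK", 1.0, 2.0);
  std::cout << profile_events[0].extra_data << std::endl;  // ["ffff","abcd",3,"OK"]
}
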
@@ -741,7 +714,6 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply *
const rpc::Address &owner_address = request.owner_address();
const std::string &data = request.data();
double start_time = absl::GetCurrentTimeNanos() / 1e9;
bool success = ReceiveObjectChunk(node_id, object_id, owner_address, data_size,
metadata_size, chunk_index, data);
num_chunks_received_total_++;
@@ -752,9 +724,7 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply *
<< num_chunks_received_total_failed_ << "/"
<< num_chunks_received_total_ << " failed";
}
double end_time = absl::GetCurrentTimeNanos() / 1e9;
HandleReceiveFinished(object_id, node_id, chunk_index, start_time, end_time);
send_reply_callback(Status::OK(), nullptr, nullptr);
}
@@ -802,16 +772,6 @@ void ObjectManager::HandlePull(const rpc::PullRequest &request, rpc::PullReply *
RAY_LOG(DEBUG) << "Received pull request from node " << node_id << " for object ["
<< object_id << "].";
rpc::ProfileTableData::ProfileEvent profile_event;
profile_event.set_event_type("receive_pull_request");
profile_event.set_start_time(absl::GetCurrentTimeNanos() / 1e9);
profile_event.set_end_time(profile_event.start_time());
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + node_id.Hex() + "\"]");
{
std::lock_guard<std::mutex> lock(profile_mutex_);
profile_events_.emplace_back(profile_event);
}
main_service_->post([this, object_id, node_id]() { Push(object_id, node_id); },
"ObjectManager.HandlePull");
send_reply_callback(Status::OK(), nullptr, nullptr);
@@ -889,22 +849,6 @@ std::shared_ptr<rpc::ObjectManagerClient> ObjectManager::GetRpcClient(
return it->second;
}
std::shared_ptr<rpc::ProfileTableData> ObjectManager::GetAndResetProfilingInfo() {
auto profile_info = std::make_shared<rpc::ProfileTableData>();
profile_info->set_component_type("object_manager");
profile_info->set_component_id(self_node_id_.Binary());
{
std::lock_guard<std::mutex> lock(profile_mutex_);
for (auto const &profile_event : profile_events_) {
profile_info->add_profile_events()->CopyFrom(profile_event);
}
profile_events_.clear();
}
return profile_info;
}
std::string ObjectManager::DebugString() const {
std::stringstream result;
result << "ObjectManager:";
@@ -912,7 +856,6 @@ std::string ObjectManager::DebugString() const {
result << "\n- num active wait requests: " << active_wait_requests_.size();
result << "\n- num unfulfilled push requests: " << unfulfilled_push_requests_.size();
result << "\n- num pull requests: " << pull_manager_->NumActiveRequests();
result << "\n- num buffered profile events: " << profile_events_.size();
result << "\n- num chunks received total: " << num_chunks_received_total_;
result << "\n- num chunks received failed (all): " << num_chunks_received_total_failed_;
result << "\n- num chunks received failed / cancelled: "

@@ -269,12 +269,6 @@ class ObjectManager : public ObjectManagerInterface,
/// or send it to all the object stores.
void FreeObjects(const std::vector<ObjectID> &object_ids, bool local_only);
/// Return profiling information and reset the profiling information.
///
/// \return All profiling information that has accumulated since the last call
/// to this method.
std::shared_ptr<rpc::ProfileTableData> GetAndResetProfilingInfo();
/// Returns debug string for class.
///
/// \return string.
@@ -424,21 +418,6 @@ class ObjectManager : public ObjectManagerInterface,
uint64_t chunk_index, double start_time_us, double end_time_us,
ray::Status status);
/// This is used to notify the main thread that the receiving of a chunk has
/// completed.
///
/// \param object_id The ID of the object that was received.
/// \param node_id The ID of the node that the chunk was received from.
/// \param chunk_index The index of the chunk.
/// \param start_time_us The time when the object manager began receiving the
/// chunk.
/// \param end_time_us The time when the object manager finished receiving the
/// chunk.
/// \return Void.
void HandleReceiveFinished(const ObjectID &object_id, const NodeID &node_id,
uint64_t chunk_index, double start_time_us,
double end_time_us);
/// Handle Push task timeout.
void HandlePushTaskTimeout(const ObjectID &object_id, const NodeID &node_id);
@@ -484,14 +463,6 @@ class ObjectManager : public ObjectManagerInterface,
ObjectID, std::unordered_map<NodeID, std::unique_ptr<boost::asio::deadline_timer>>>
unfulfilled_push_requests_;
/// Profiling events that are to be batched together and added to the profile
/// table in the GCS.
std::vector<rpc::ProfileTableData::ProfileEvent> profile_events_;
/// Mutex protecting profile_events_, which is accessed from both the main
/// thread and the rpc thread.
std::mutex profile_mutex_;
/// The gRPC server.
rpc::GrpcServer object_manager_server_;

@@ -473,12 +473,6 @@ ray::Status NodeManager::RegisterGcs() {
periodical_runner_.RunFnPeriodically(
[this] { ReportResourceUsage(); }, report_resources_period_ms_,
"NodeManager.deadline_timer.report_resource_usage");
// Start the timer that gets object manager profiling information and sends it
// to the GCS.
periodical_runner_.RunFnPeriodically(
[this] { GetObjectManagerProfileInfo(); },
RayConfig::instance().raylet_heartbeat_period_milliseconds(),
"NodeManager.deadline_timer.object_manager_profiling");
/// If periodic asio stats printing is enabled, print the stats periodically.
const auto event_stats_print_interval_ms =
@@ -782,21 +776,6 @@ void NodeManager::WarnResourceDeadlock() {
cluster_task_manager_->ScheduleAndDispatchTasks();
}
void NodeManager::GetObjectManagerProfileInfo() {
int64_t start_time_ms = current_time_ms();
auto profile_info = object_manager_.GetAndResetProfilingInfo();
if (profile_info->profile_events_size() > 0) {
RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_info, nullptr));
}
int64_t interval = current_time_ms() - start_time_ms;
if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
RAY_LOG(WARNING) << "GetObjectManagerProfileInfo handler took " << interval << " ms.";
}
}
void NodeManager::NodeAdded(const GcsNodeInfo &node_info) {
const NodeID node_id = NodeID::FromBinary(node_info.node_id());
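
On the raylet side, the removed code paired that drain with a periodic timer: every raylet_heartbeat_period_milliseconds it flushed the buffered events to the GCS and warned if the handler ran longer than handler_warning_timeout_ms. A sketch of one iteration of that loop, with the gather/send step left as a placeholder:

#include <chrono>
#include <cstdint>
#include <iostream>

// Time the flush handler and warn when it exceeds the configured timeout,
// mirroring the check in the removed GetObjectManagerProfileInfo.
void FlushProfileInfoOnce(int64_t warning_timeout_ms) {
  const auto start = std::chrono::steady_clock::now();
  // ... GetAndResetProfilingInfo() + async push to the GCS would go here ...
  const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                              std::chrono::steady_clock::now() - start)
                              .count();
  if (elapsed_ms > warning_timeout_ms) {
    std::cerr << "Profile flush took " << elapsed_ms << " ms." << std::endl;
  }
}

int main() {
  FlushProfileInfoOnce(1000);  // warn if a flush ever takes over a second
}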

@@ -259,11 +259,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// to eagerly evict all plasma copies of the object from the cluster.
void FlushObjectsToFree();
/// Get profiling information from the object manager and push it to the GCS.
///
/// \return Void.
void GetObjectManagerProfileInfo();
/// Handler for a resource usage notification from the GCS.
///
/// \param id The ID of the node manager that sent the resources data.