Revert "Revert "[Release 1.11.0][Core] avoid unnecessary work during event st… (#22144)" (#22284)

This reverts commit 6235b6d7e9.

Looks like windows://python/ray/tests:test_dataclient_disconnect has similar level of flakiness as before the revert. This seems unrelated and the test needs to be fixed in another way.
This commit is contained in:
mwtian 2022-02-15 00:20:28 -08:00 committed by GitHub
parent 6f5afcbce9
commit 59d9e20a4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 29 deletions

View file

@ -71,7 +71,7 @@ std::shared_ptr<StatsHandle> EventTracker::RecordStart(
ray::stats::STATS_operation_count.Record(curr_count, name);
ray::stats::STATS_operation_active_count.Record(curr_count, name);
return std::make_shared<StatsHandle>(
name, absl::GetCurrentTimeNanos() + expected_queueing_delay_ns, stats,
name, absl::GetCurrentTimeNanos() + expected_queueing_delay_ns, std::move(stats),
global_stats_);
}
@ -92,18 +92,18 @@ void EventTracker::RecordExecution(const std::function<void()> &fn,
// Update event-specific stats.
ray::stats::STATS_operation_run_time_ms.Record(execution_time_ns / 1000000,
handle->event_name);
int64_t curr_count;
{
auto &stats = handle->handler_stats;
absl::MutexLock lock(&(stats->mutex));
// Event-specific execution stats.
stats->stats.cum_execution_time += execution_time_ns;
// Event-specific current count.
stats->stats.curr_count--;
ray::stats::STATS_operation_active_count.Record(stats->stats.curr_count,
handle->event_name);
curr_count = --stats->stats.curr_count;
// Event-specific running count.
stats->stats.running_count--;
}
ray::stats::STATS_operation_active_count.Record(curr_count, handle->event_name);
// Update global stats.
const auto queue_time_ns = start_execution - handle->start_time;
ray::stats::STATS_operation_queue_time_ms.Record(queue_time_ns / 1000000,
@ -137,14 +137,7 @@ std::shared_ptr<GuardedEventStats> EventTracker::GetOrCreate(const std::string &
absl::WriterMutexLock lock(&mutex_);
const auto pair =
post_handler_stats_.try_emplace(name, std::make_shared<GuardedEventStats>());
if (pair.second) {
it = pair.first;
} else {
it = post_handler_stats_.find(name);
// If try_emplace failed to insert the item, the item is guaranteed to exist in
// the table.
RAY_CHECK(it != post_handler_stats_.end());
}
it = pair.first;
result = it->second;
} else {
result = it->second;

View file

@ -94,7 +94,7 @@ struct StatsHandle {
class EventTracker {
public:
/// Initializes the global stats struct after calling the base contructor.
/// Initializes the global stats struct after calling the base constructor.
EventTracker() : global_stats_(std::make_shared<GuardedGlobalStats>()) {}
/// Sets the queueing start time, increments the current and cumulative counts and

View file

@ -274,7 +274,7 @@ void RegisterViewWithTagList(const std::string &name, const std::string &descrip
RegisterViewWithTagList<Ts...>(name, description, tag_keys, buckets);
}
inline std::vector<opencensus::tags::TagKey> convertTags(
inline std::vector<opencensus::tags::TagKey> convert_tags(
const std::vector<std::string> &names) {
std::vector<opencensus::tags::TagKey> ret;
for (auto &n : names) {
@ -303,10 +303,10 @@ class Stats {
const std::vector<opencensus::tags::TagKey>,
const std::vector<double> &buckets)>
register_func)
: tag_keys_(tag_keys) {
: tag_keys_(convert_tags(tag_keys)) {
auto stats_init = [register_func, measure, description, buckets, this]() {
measure_ = std::make_unique<Measure>(Measure::Register(measure, description, ""));
register_func(measure, description, convertTags(tag_keys_), buckets);
register_func(measure, description, tag_keys_, buckets);
};
if (StatsConfig::instance().IsInitialized()) {
@ -326,8 +326,13 @@ class Stats {
/// this metric.
void Record(double val, std::string tag_val) {
RAY_CHECK(tag_keys_.size() == 1);
std::unordered_map<std::string, std::string> tags{{tag_keys_[0], std::move(tag_val)}};
Record(val, std::move(tags));
if (StatsConfig::instance().IsStatsDisabled() || !measure_) {
return;
}
TagsType combined_tags = StatsConfig::instance().GetGlobalTags();
CheckPrintableChar(tag_val);
combined_tags.emplace_back(tag_keys_[0], std::move(tag_val));
opencensus::stats::Record({{*measure_, val}}, std::move(combined_tags));
}
/// Record a value
@ -338,22 +343,26 @@ class Stats {
return;
}
TagsType combined_tags = StatsConfig::instance().GetGlobalTags();
// In case that tag containing non-printable chars we replace them to '?'
// It's important here because otherwise, the message will fail to be sent.
for (auto &t : tags) {
for (auto &c : t.second) {
if (!isprint(c)) {
c = '?';
}
}
combined_tags.emplace_back(TagKeyType::Register(t.first), std::move(t.second));
for (auto &[tag_key, tag_val] : tags) {
CheckPrintableChar(tag_val);
combined_tags.emplace_back(TagKeyType::Register(tag_key), std::move(tag_val));
}
opencensus::stats::Record({{*measure_, val}}, combined_tags);
opencensus::stats::Record({{*measure_, val}}, std::move(combined_tags));
}
private:
void CheckPrintableChar(const std::string &val) {
#ifndef NDEBUG
// In debug build, verify val is printable.
for (auto c : val) {
RAY_CHECK(isprint(c)) << "Found unprintable character code " << static_cast<int>(c)
<< " in " << val;
}
#endif // NDEBUG
}
const std::vector<opencensus::tags::TagKey> tag_keys_;
std::unique_ptr<opencensus::stats::Measure<double>> measure_;
std::vector<std::string> tag_keys_;
};
} // namespace internal