ray/thirdparty/patches/opencensus-cpp-shutdown-api.patch

169 lines
5.4 KiB
Diff

diff --git opencensus/stats/internal/delta_producer.cc opencensus/stats/internal/delta_producer.cc
index c61b4d9..b3e4ef2 100644
--- opencensus/stats/internal/delta_producer.cc
+++ opencensus/stats/internal/delta_producer.cc
@@ -75,6 +75,20 @@ DeltaProducer* DeltaProducer::Get() {
return global_delta_producer;
}
+void DeltaProducer::Shutdown() {
+  {
+    absl::MutexLock lock(&mu_);
+    // Nothing to stop if the harvester is not marked as running.
+    if (!thread_started_) return;
+    // RunHarvesterLoop() checks this flag every iteration and exits on false.
+    thread_started_ = false;
+  }
+  // Block until the harvester thread has left its loop.
+  if (harvester_thread_.joinable()) {
+    harvester_thread_.join();
+  }
+}
+
void DeltaProducer::AddMeasure() {
delta_mu_.Lock();
absl::MutexLock harvester_lock(&harvester_mu_);
@@ -115,7 +129,10 @@ void DeltaProducer::Flush() {
}
DeltaProducer::DeltaProducer()
- : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {}
+    : thread_started_(true) {
+  // Start the thread last: mu_/thread_started_ are declared after it.
+  harvester_thread_ = std::thread(&DeltaProducer::RunHarvesterLoop, this);
+}
void DeltaProducer::SwapDeltas() {
ABSL_ASSERT(last_delta_.delta().empty() && "Last delta was not consumed.");
@@ -131,11 +148,19 @@ void DeltaProducer::RunHarvesterLoop() {
absl::Time next_harvest_time = absl::Now() + harvest_interval_;
while (true) {
const absl::Time now = absl::Now();
- absl::SleepFor(next_harvest_time - now);
+ absl::SleepFor(absl::Seconds(0.1));
// Account for the possibility that the last harvest took longer than
// harvest_interval_ and we are already past next_harvest_time.
- next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
- Flush();
+ if (absl::Now() > next_harvest_time) {
+ next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
+ Flush();
+ }
+ {
+ absl::MutexLock l(&mu_);
+ if (!thread_started_) {
+ break;
+ }
+ }
}
}
diff --git opencensus/stats/internal/delta_producer.h opencensus/stats/internal/delta_producer.h
index 2cff522..c8e2e95 100644
--- opencensus/stats/internal/delta_producer.h
+++ opencensus/stats/internal/delta_producer.h
@@ -71,6 +71,8 @@ class DeltaProducer final {
// Returns a pointer to the singleton DeltaProducer.
static DeltaProducer* Get();
+ void Shutdown();
+
// Adds a new Measure.
void AddMeasure();
@@ -122,6 +124,9 @@ class DeltaProducer final {
// thread when calling a flush during harvesting.
Delta last_delta_ GUARDED_BY(harvester_mu_);
std::thread harvester_thread_ GUARDED_BY(harvester_mu_);
+
+ mutable absl::Mutex mu_;
+ bool thread_started_ GUARDED_BY(mu_) = false;
};
} // namespace stats
diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal/stats_exporter.cc
index 43ddbc7..37b4ae1 100644
--- opencensus/stats/internal/stats_exporter.cc
+++ opencensus/stats/internal/stats_exporter.cc
@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() {
}
void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
thread_started_ = true;
+ t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);  // Flag set first.
+}
+
+void StatsExporterImpl::Shutdown() {
+  {
+    absl::MutexLock lock(&mu_);
+    // Nothing to stop if the export thread is not marked as running.
+    if (!thread_started_) return;
+    // RunWorkerLoop() checks this flag every iteration and exits on false.
+    thread_started_ = false;
+  }
+  // Block until the worker thread has left its loop.
+  if (t_.joinable()) {
+    t_.join();
+  }
}
void StatsExporterImpl::RunWorkerLoop() {
absl::Time next_export_time = GetNextExportTime();
while (true) {
// SleepFor() returns immediately when given a negative duration.
- absl::SleepFor(next_export_time - absl::Now());
+    absl::SleepFor(absl::Seconds(0.1));
// In case the last export took longer than the export interval, we
// calculate the next time from now.
- next_export_time = GetNextExportTime();
- Export();
+    if (next_export_time < absl::Now()) {
+      next_export_time = GetNextExportTime();
+      Export();
+    }
+    // Exit as soon as Shutdown() has cleared the flag; the lock is released
+    // at the end of the iteration (or on break).
+    absl::MutexLock lock(&mu_);
+    if (!thread_started_) {
+      break;
+    }
}
}
// StatsExporter
// -------------
+void StatsExporter::Shutdown() {  // static
+ StatsExporterImpl::Get()->Shutdown();  // Stops and joins the export thread.
+ StatsExporterImpl::Get()->ClearHandlersForTesting();
+}
+
// static
void StatsExporter::SetInterval(absl::Duration interval) {
StatsExporterImpl::Get()->SetInterval(interval);
diff --git opencensus/stats/internal/stats_exporter_impl.h opencensus/stats/internal/stats_exporter_impl.h
index 11ae3c4..ebe9c4d 100644
--- opencensus/stats/internal/stats_exporter_impl.h
+++ opencensus/stats/internal/stats_exporter_impl.h
@@ -34,6 +34,7 @@ class StatsExporterImpl {
public:
static StatsExporterImpl* Get();
void SetInterval(absl::Duration interval);
+ void Shutdown();
absl::Time GetNextExportTime() const;
void AddView(const ViewDescriptor& view);
void RemoveView(absl::string_view name);
diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h
index 6756858..65e0262 100644
--- opencensus/stats/stats_exporter.h
+++ opencensus/stats/stats_exporter.h
@@ -45,6 +45,8 @@ class StatsExporter final {
// Removes the view with 'name' from the registry, if one is registered.
static void RemoveView(absl::string_view name);
+ static void Shutdown();
+
// StatsExporter::Handler is the interface for push exporters that export
// recorded data for registered views. The exporter should provide a static
// Register() method that takes any arguments needed by the exporter (e.g. a