mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
169 lines
5.4 KiB
Diff
169 lines
5.4 KiB
Diff
diff --git opencensus/stats/internal/delta_producer.cc opencensus/stats/internal/delta_producer.cc
|
|
index c61b4d9..b3e4ef2 100644
|
|
--- opencensus/stats/internal/delta_producer.cc
|
|
+++ opencensus/stats/internal/delta_producer.cc
|
|
@@ -75,6 +75,20 @@ DeltaProducer* DeltaProducer::Get() {
|
|
return global_delta_producer;
|
|
}
|
|
|
|
+void DeltaProducer::Shutdown() {
|
|
+ {
|
|
+ absl::MutexLock l(&mu_);
|
|
+ if (!thread_started_) {
|
|
+ return;
|
|
+ }
|
|
+ thread_started_ = false;
|
|
+ }
|
|
+ // Join loop thread when shutdown.
|
|
+ if (harvester_thread_.joinable()) {
|
|
+ harvester_thread_.join();
|
|
+ }
|
|
+}
|
|
+
|
|
void DeltaProducer::AddMeasure() {
|
|
delta_mu_.Lock();
|
|
absl::MutexLock harvester_lock(&harvester_mu_);
|
|
@@ -115,7 +129,10 @@ void DeltaProducer::Flush() {
|
|
}
|
|
|
|
DeltaProducer::DeltaProducer()
|
|
- : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {}
|
|
+ : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {
|
|
+ absl::MutexLock l(&mu_);
|
|
+ thread_started_ = true;
|
|
+}
|
|
|
|
void DeltaProducer::SwapDeltas() {
|
|
ABSL_ASSERT(last_delta_.delta().empty() && "Last delta was not consumed.");
|
|
@@ -131,11 +148,19 @@ void DeltaProducer::RunHarvesterLoop() {
|
|
absl::Time next_harvest_time = absl::Now() + harvest_interval_;
|
|
while (true) {
|
|
const absl::Time now = absl::Now();
|
|
- absl::SleepFor(next_harvest_time - now);
|
|
+ absl::SleepFor(absl::Seconds(0.1));
|
|
// Account for the possibility that the last harvest took longer than
|
|
// harvest_interval_ and we are already past next_harvest_time.
|
|
- next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
|
|
- Flush();
|
|
+ if (absl::Now() > next_harvest_time) {
|
|
+ next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
|
|
+ Flush();
|
|
+ }
|
|
+ {
|
|
+ absl::MutexLock l(&mu_);
|
|
+ if (!thread_started_) {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
}
|
|
}
|
|
|
|
diff --git opencensus/stats/internal/delta_producer.h opencensus/stats/internal/delta_producer.h
|
|
index 2cff522..c8e2e95 100644
|
|
--- opencensus/stats/internal/delta_producer.h
|
|
+++ opencensus/stats/internal/delta_producer.h
|
|
@@ -71,6 +71,8 @@ class DeltaProducer final {
|
|
// Returns a pointer to the singleton DeltaProducer.
|
|
static DeltaProducer* Get();
|
|
|
|
+ void Shutdown();
|
|
+
|
|
// Adds a new Measure.
|
|
void AddMeasure();
|
|
|
|
@@ -122,6 +124,9 @@ class DeltaProducer final {
|
|
// thread when calling a flush during harvesting.
|
|
Delta last_delta_ GUARDED_BY(harvester_mu_);
|
|
std::thread harvester_thread_ GUARDED_BY(harvester_mu_);
|
|
+
|
|
+ mutable absl::Mutex mu_;
|
|
+ bool thread_started_ GUARDED_BY(mu_) = false;
|
|
};
|
|
|
|
} // namespace stats
|
|
diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal/stats_exporter.cc
|
|
index 43ddbc7..37b4ae1 100644
|
|
--- opencensus/stats/internal/stats_exporter.cc
|
|
+++ opencensus/stats/internal/stats_exporter.cc
|
|
@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() {
|
|
}
|
|
|
|
void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
|
|
- t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
|
|
thread_started_ = true;
|
|
+ t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
|
|
+}
|
|
+
|
|
+void StatsExporterImpl::Shutdown() {
|
|
+ {
|
|
+ absl::MutexLock l(&mu_);
|
|
+ if (!thread_started_) {
|
|
+ return;
|
|
+ }
|
|
+ thread_started_ = false;
|
|
+ }
|
|
+ // Join loop thread when shutdown.
|
|
+ if (t_.joinable()) {
|
|
+ t_.join();
|
|
+ }
|
|
}
|
|
|
|
void StatsExporterImpl::RunWorkerLoop() {
|
|
absl::Time next_export_time = GetNextExportTime();
|
|
while (true) {
|
|
// SleepFor() returns immediately when given a negative duration.
|
|
- absl::SleepFor(next_export_time - absl::Now());
|
|
+ absl::SleepFor(absl::Seconds(0.1));
|
|
// In case the last export took longer than the export interval, we
|
|
// calculate the next time from now.
|
|
- next_export_time = GetNextExportTime();
|
|
- Export();
|
|
+ if (absl::Now() > next_export_time) {
|
|
+ next_export_time = GetNextExportTime();
|
|
+ Export();
|
|
+ }
|
|
+ {
|
|
+ absl::MutexLock l(&mu_);
|
|
+ if (!thread_started_) {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
}
|
|
}
|
|
|
|
// StatsExporter
|
|
// -------------
|
|
|
|
+void StatsExporter::Shutdown() {
|
|
+ StatsExporterImpl::Get()->Shutdown();
|
|
+ StatsExporterImpl::Get()->ClearHandlersForTesting();
|
|
+}
|
|
+
|
|
// static
|
|
void StatsExporter::SetInterval(absl::Duration interval) {
|
|
StatsExporterImpl::Get()->SetInterval(interval);
|
|
diff --git opencensus/stats/internal/stats_exporter_impl.h opencensus/stats/internal/stats_exporter_impl.h
|
|
index 11ae3c4..ebe9c4d 100644
|
|
--- opencensus/stats/internal/stats_exporter_impl.h
|
|
+++ opencensus/stats/internal/stats_exporter_impl.h
|
|
@@ -34,6 +34,7 @@ class StatsExporterImpl {
|
|
public:
|
|
static StatsExporterImpl* Get();
|
|
void SetInterval(absl::Duration interval);
|
|
+ void Shutdown();
|
|
absl::Time GetNextExportTime() const;
|
|
void AddView(const ViewDescriptor& view);
|
|
void RemoveView(absl::string_view name);
|
|
diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h
|
|
index 6756858..65e0262 100644
|
|
--- opencensus/stats/stats_exporter.h
|
|
+++ opencensus/stats/stats_exporter.h
|
|
@@ -45,6 +45,8 @@ class StatsExporter final {
|
|
// Removes the view with 'name' from the registry, if one is registered.
|
|
static void RemoveView(absl::string_view name);
|
|
|
|
+ static void Shutdown();
|
|
+
|
|
// StatsExporter::Handler is the interface for push exporters that export
|
|
// recorded data for registered views. The exporter should provide a static
|
|
// Register() method that takes any arguments needed by the exporter (e.g. a
|