mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Drop profiling events if the GCS becomes backlogged (#15726)
This commit is contained in:
parent
4d8ed6dd5c
commit
cb59d30917
4 changed files with 24 additions and 3 deletions
|
@ -89,8 +89,6 @@
|
|||
|
||||
- label: ":python: (Flaky tests)"
|
||||
conditions: ["RAY_CI_PYTHON_AFFECTED", "RAY_CI_SERVE_AFFECTED", "RAY_CI_RLLIB_AFFECTED", "RAY_CI_TUNE_AFFECTED"]
|
||||
soft_fail:
|
||||
- exit_status: 1
|
||||
commands:
|
||||
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
|
||||
- RLLIB_TESTING=1 TF_VERSION=2.1.0 TFP_VERSION=0.8 TORCH_VERSION=1.6 ./ci/travis/install-dependencies.sh
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import ray
|
||||
import os
|
||||
|
||||
|
||||
class _NullLogSpan:
|
||||
|
@ -11,6 +12,7 @@ class _NullLogSpan:
|
|||
pass
|
||||
|
||||
|
||||
PROFILING_ENABLED = "RAY_DISABLE_PROFILING" not in os.environ
|
||||
NULL_LOG_SPAN = _NullLogSpan()
|
||||
|
||||
|
||||
|
@ -44,6 +46,8 @@ def profile(event_type, extra_data=None):
|
|||
Returns:
|
||||
An object that can profile a span of time via a "with" statement.
|
||||
"""
|
||||
if not PROFILING_ENABLED:
|
||||
return NULL_LOG_SPAN
|
||||
worker = ray.worker.global_worker
|
||||
if worker.mode == ray.worker.LOCAL_MODE:
|
||||
return NULL_LOG_SPAN
|
||||
|
|
|
@ -59,8 +59,24 @@ void Profiler::FlushEvents() {
|
|||
}
|
||||
}
|
||||
|
||||
auto on_complete = [this](const Status &status) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
profile_flush_active_ = false;
|
||||
};
|
||||
|
||||
if (cur_profile_data->profile_events_size() != 0) {
|
||||
if (!gcs_client_->Stats().AsyncAddProfileData(cur_profile_data, nullptr).ok()) {
|
||||
// Check if we're backlogged first.
|
||||
{
|
||||
absl::MutexLock lock(&mutex_);
|
||||
if (profile_flush_active_) {
|
||||
RAY_LOG(WARNING) << "The GCS is backlogged processing profiling data. "
|
||||
"Some events may be dropped.";
|
||||
return; // Drop the events; we're behind.
|
||||
} else {
|
||||
profile_flush_active_ = true;
|
||||
}
|
||||
}
|
||||
if (!gcs_client_->Stats().AsyncAddProfileData(cur_profile_data, on_complete).ok()) {
|
||||
RAY_LOG(WARNING)
|
||||
<< "Failed to push profile events to GCS. This won't affect core Ray, but you "
|
||||
"might lose profile data, and ray timeline might not work as expected.";
|
||||
|
|
|
@ -52,6 +52,9 @@ class Profiler {
|
|||
// until they are flushed.
|
||||
std::shared_ptr<rpc::ProfileTableData> rpc_profile_data_ GUARDED_BY(mutex_);
|
||||
|
||||
/// Whether a profile flush is already in progress.
|
||||
bool profile_flush_active_ GUARDED_BY(mutex_) = false;
|
||||
|
||||
// Client to the GCS used to push profile events to it.
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client_;
|
||||
};
|
||||
|
|
Loading…
Add table
Reference in a new issue