diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index ac07aab2f..6ca561dad 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -89,8 +89,6 @@ - label: ":python: (Flaky tests)" conditions: ["RAY_CI_PYTHON_AFFECTED", "RAY_CI_SERVE_AFFECTED", "RAY_CI_RLLIB_AFFECTED", "RAY_CI_TUNE_AFFECTED"] - soft_fail: - - exit_status: 1 commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT - RLLIB_TESTING=1 TF_VERSION=2.1.0 TFP_VERSION=0.8 TORCH_VERSION=1.6 ./ci/travis/install-dependencies.sh diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 8a2bb0f75..2a982f7a4 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -1,4 +1,5 @@ import ray +import os class _NullLogSpan: @@ -11,6 +12,7 @@ class _NullLogSpan: pass +PROFILING_ENABLED = "RAY_DISABLE_PROFILING" not in os.environ NULL_LOG_SPAN = _NullLogSpan() @@ -44,6 +46,8 @@ def profile(event_type, extra_data=None): Returns: An object that can profile a span of time via a "with" statement. """ + if not PROFILING_ENABLED: + return NULL_LOG_SPAN worker = ray.worker.global_worker if worker.mode == ray.worker.LOCAL_MODE: return NULL_LOG_SPAN diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc index 903befb56..bd73600c5 100644 --- a/src/ray/core_worker/profiling.cc +++ b/src/ray/core_worker/profiling.cc @@ -59,8 +59,24 @@ void Profiler::FlushEvents() { } } + auto on_complete = [this](const Status &status) { + absl::MutexLock lock(&mutex_); + profile_flush_active_ = false; + }; + if (cur_profile_data->profile_events_size() != 0) { - if (!gcs_client_->Stats().AsyncAddProfileData(cur_profile_data, nullptr).ok()) { + // Check if we're backlogged first. + { + absl::MutexLock lock(&mutex_); + if (profile_flush_active_) { + RAY_LOG(WARNING) << "The GCS is backlogged processing profiling data. " + "Some events may be dropped."; + return; // Drop the events; we're behind. + } else { + profile_flush_active_ = true; + } + } + if (!gcs_client_->Stats().AsyncAddProfileData(cur_profile_data, on_complete).ok()) { RAY_LOG(WARNING) << "Failed to push profile events to GCS. This won't affect core Ray, but you " "might lose profile data, and ray timeline might not work as expected."; diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index 6d42e08a6..c186b0be3 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -52,6 +52,9 @@ class Profiler { // until they are flushed. std::shared_ptr rpc_profile_data_ GUARDED_BY(mutex_); + /// Whether a profile flush is already in progress. + bool profile_flush_active_ GUARDED_BY(mutex_) = false; + // Client to the GCS used to push profile events to it. std::shared_ptr gcs_client_; };