From 8b4b49662b43b1e59c3ea71531cd8d9b1d85d224 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 1 Feb 2020 11:46:11 -0800 Subject: [PATCH] Force OMP_NUM_THREADS=1 if unset (#6998) * force omp * update * set * workers * link --- python/ray/__init__.py | 10 ++++++++++ python/ray/autoscaler/updater.py | 2 +- python/ray/worker.py | 25 +++++++++++++++++++------ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 42dc2e806..13ef3e4b0 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -1,7 +1,11 @@ import os +import logging +import multiprocessing from os.path import dirname import sys +logger = logging.getLogger(__name__) + # MUST add pickle5 to the import path because it will be imported by some # raylet modules. @@ -10,6 +14,12 @@ if "pickle5" in sys.modules: "requires a specific version of pickle5 (which is " "packaged along with Ray).") +if "OMP_NUM_THREADS" not in os.environ and multiprocessing.cpu_count() > 8: + logger.warning("[ray] Forcing OMP_NUM_THREADS=1 to avoid performance " + "degradation with many workers (issue #6998). You can " + "override this by explicitly setting OMP_NUM_THREADS.") + os.environ["OMP_NUM_THREADS"] = "1" + # Add the directory containing pickle5 to the Python path so that we find the # pickle5 version packaged with ray and not a pre-existing pickle5. pickle5_path = os.path.join( diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index b5bae21ae..eb320abb6 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -29,7 +29,7 @@ KUBECTL_RSYNC = os.path.join( def with_interactive(cmd): force_interactive = ("true && source ~/.bashrc && " - "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ") + "export PYTHONWARNINGS=ignore && ") return ["bash", "--login", "-c", "-i", quote(force_interactive + cmd)] diff --git a/python/ray/worker.py b/python/ray/worker.py index edbd312dd..03f1023b8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -68,6 +68,12 @@ try: except ImportError: setproctitle = None +# Whether we should warn about slow put performance. +if os.environ.get("OMP_NUM_THREADS") == "1": + should_warn_of_slow_puts = True +else: + should_warn_of_slow_puts = False + class ActorCheckpointInfo: """Information used to maintain actor checkpoints.""" @@ -272,10 +278,22 @@ class Worker: "do this, you can wrap the ray.ObjectID in a list and " "call 'put' on it (or return it).") + global should_warn_of_slow_puts + if should_warn_of_slow_puts: + start = time.perf_counter() + serialized_value = self.get_serialization_context().serialize(value) - return self.core_worker.put_serialized_object( + result = self.core_worker.put_serialized_object( serialized_value, object_id=object_id, pin_object=pin_object) + if should_warn_of_slow_puts: + delta = time.perf_counter() - start + if delta > 0.1: + logger.warning("OMP_NUM_THREADS=1 is set, this may slow down " + "ray.put() for large objects (issue #6998).") + should_warn_of_slow_puts = False + return result + def deserialize_objects(self, data_metadata_pairs, object_ids, @@ -672,11 +690,6 @@ def init(address=None, else: driver_mode = SCRIPT_MODE - if "OMP_NUM_THREADS" in os.environ: - logger.warning("OMP_NUM_THREADS={} is set, this may impact " - "object transfer performance.".format( - os.environ["OMP_NUM_THREADS"])) - if setproctitle is None: logger.warning( "WARNING: Not updating worker name since `setproctitle` is not "