Force OMP_NUM_THREADS=1 if unset (#6998)

* force omp

* update

* set

* workers

* link
This commit is contained in:
Eric Liang 2020-02-01 11:46:11 -08:00 committed by GitHub
parent 3c60caa448
commit 8b4b49662b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 7 deletions

View file

@ -1,7 +1,11 @@
import os
import logging
import multiprocessing
from os.path import dirname
import sys
logger = logging.getLogger(__name__)
# MUST add pickle5 to the import path because it will be imported by some
# raylet modules.
@ -10,6 +14,12 @@ if "pickle5" in sys.modules:
"requires a specific version of pickle5 (which is "
"packaged along with Ray).")
if "OMP_NUM_THREADS" not in os.environ and multiprocessing.cpu_count() > 8:
logger.warning("[ray] Forcing OMP_NUM_THREADS=1 to avoid performance "
"degradation with many workers (issue #6998). You can "
"override this by explicitly setting OMP_NUM_THREADS.")
os.environ["OMP_NUM_THREADS"] = "1"
# Add the directory containing pickle5 to the Python path so that we find the
# pickle5 version packaged with ray and not a pre-existing pickle5.
pickle5_path = os.path.join(

View file

@ -29,7 +29,7 @@ KUBECTL_RSYNC = os.path.join(
def with_interactive(cmd):
force_interactive = ("true && source ~/.bashrc && "
"export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ")
"export PYTHONWARNINGS=ignore && ")
return ["bash", "--login", "-c", "-i", quote(force_interactive + cmd)]

View file

@ -68,6 +68,12 @@ try:
except ImportError:
setproctitle = None
# Whether we should warn about slow put performance.
if os.environ.get("OMP_NUM_THREADS") == "1":
should_warn_of_slow_puts = True
else:
should_warn_of_slow_puts = False
class ActorCheckpointInfo:
"""Information used to maintain actor checkpoints."""
@ -272,10 +278,22 @@ class Worker:
"do this, you can wrap the ray.ObjectID in a list and "
"call 'put' on it (or return it).")
global should_warn_of_slow_puts
if should_warn_of_slow_puts:
start = time.perf_counter()
serialized_value = self.get_serialization_context().serialize(value)
return self.core_worker.put_serialized_object(
result = self.core_worker.put_serialized_object(
serialized_value, object_id=object_id, pin_object=pin_object)
if should_warn_of_slow_puts:
delta = time.perf_counter() - start
if delta > 0.1:
logger.warning("OMP_NUM_THREADS=1 is set, this may slow down "
"ray.put() for large objects (issue #6998).")
should_warn_of_slow_puts = False
return result
def deserialize_objects(self,
data_metadata_pairs,
object_ids,
@ -672,11 +690,6 @@ def init(address=None,
else:
driver_mode = SCRIPT_MODE
if "OMP_NUM_THREADS" in os.environ:
logger.warning("OMP_NUM_THREADS={} is set, this may impact "
"object transfer performance.".format(
os.environ["OMP_NUM_THREADS"]))
if setproctitle is None:
logger.warning(
"WARNING: Not updating worker name since `setproctitle` is not "