Move profiling code to a new file and fix thread safety (#2397)

This commit is contained in:
Hao Chen 2018-07-16 09:09:52 +08:00 committed by Robert Nishihara
parent bbea73155a
commit 8a3e180156
4 changed files with 301 additions and 258 deletions

View file

@ -47,9 +47,9 @@ except ImportError as e:
raise
from ray.local_scheduler import ObjectID, _config # noqa: E402
from ray.profiling import profile # noqa: E402
from ray.worker import (error_info, init, connect, disconnect, get, put, wait,
remote, profile, flush_profile_data, get_gpu_ids,
get_resource_ids, get_webui_url,
remote, get_gpu_ids, get_resource_ids, get_webui_url,
register_custom_serializer, shutdown) # noqa: E402
from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE,
SILENT_MODE) # noqa: E402
@ -65,11 +65,10 @@ __version__ = "0.5.0"
__all__ = [
"error_info", "init", "connect", "disconnect", "get", "put", "wait",
"remote", "profile", "flush_profile_data", "actor", "method",
"get_gpu_ids", "get_resource_ids", "get_webui_url",
"register_custom_serializer", "shutdown", "SCRIPT_MODE", "WORKER_MODE",
"LOCAL_MODE", "SILENT_MODE", "global_state", "ObjectID", "_config",
"__version__"
"remote", "profile", "actor", "method", "get_gpu_ids", "get_resource_ids",
"get_webui_url", "register_custom_serializer", "shutdown", "SCRIPT_MODE",
"WORKER_MODE", "LOCAL_MODE", "SILENT_MODE", "global_state", "ObjectID",
"_config", "__version__"
]
import ctypes # noqa: E402

View file

@ -10,6 +10,7 @@ import redis
import ray
from ray import ray_constants
from ray import cloudpickle as pickle
from ray import profiling
from ray import utils
@ -74,21 +75,23 @@ class ImportThread(object):
def _process_key(self, key):
"""Process the given export key from redis."""
from ray.worker import profile, WORKER_MODE
# Handle the driver case first.
if self.mode != WORKER_MODE:
if self.mode != ray.WORKER_MODE:
if key.startswith(b"FunctionsToRun"):
with profile("fetch_and_run_function", worker=self.worker):
with profiling.profile(
"fetch_and_run_function", worker=self.worker):
self.fetch_and_execute_function_to_run(key)
# Return because FunctionsToRun are the only things that
# the driver should import.
return
if key.startswith(b"RemoteFunction"):
with profile("register_remote_function", worker=self.worker):
with profiling.profile(
"register_remote_function", worker=self.worker):
self.fetch_and_register_remote_function(key)
elif key.startswith(b"FunctionsToRun"):
with profile("fetch_and_run_function", worker=self.worker):
with profiling.profile(
"fetch_and_run_function", worker=self.worker):
self.fetch_and_execute_function_to_run(key)
elif key.startswith(b"ActorClass"):
# Keep track of the fact that this actor class has been
@ -154,11 +157,10 @@ class ImportThread(object):
def fetch_and_execute_function_to_run(self, key):
"""Run on arbitrary function on the worker."""
from ray.worker import SCRIPT_MODE, SILENT_MODE
driver_id, serialized_function = self.redis_client.hmget(
key, ["driver_id", "function"])
if (self.worker.mode in [SCRIPT_MODE, SILENT_MODE]
if (self.worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE]
and driver_id != self.worker.task_driver_id.id()):
# This export was from a different driver and there's no need for
# this driver to import it.

271
python/ray/profiling.py Normal file
View file

@ -0,0 +1,271 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import time
import threading
import traceback
import ray
LOG_POINT = 0
LOG_SPAN_START = 1
LOG_SPAN_END = 2
class _NullLogSpan(object):
"""A log span context manager that does nothing"""
def __enter__(self):
pass
def __exit__(self, type, value, tb):
pass
NULL_LOG_SPAN = _NullLogSpan()
def profile(event_type, extra_data=None, worker=None):
"""Profile a span of time so that it appears in the timeline visualization.
Note that this only works in the raylet code path.
This function can be used as follows (both on the driver or within a task).
.. code-block:: python
with ray.profile("custom event", extra_data={'key': 'value'}):
# Do some computation here.
Optionally, a dictionary can be passed as the "extra_data" argument, and
it can have keys "name" and "cname" if you want to override the default
timeline display text and box color. Other values will appear at the bottom
of the chrome tracing GUI when you click on the box corresponding to this
profile span.
Args:
event_type: A string describing the type of the event.
extra_data: This must be a dictionary mapping strings to strings. This
data will be added to the json objects that are used to populate
the timeline, so if you want to set a particular color, you can
simply set the "cname" attribute to an appropriate color.
Similarly, if you set the "name" attribute, then that will set the
text displayed on the box in the timeline.
Returns:
An object that can profile a span of time via a "with" statement.
"""
if worker is None:
worker = ray.worker.global_worker
if not worker.use_raylet:
# Log the event if this is a worker and not a driver, since the
# driver's event log never gets flushed.
if worker.mode == ray.WORKER_MODE:
return RayLogSpanNonRaylet(
worker.profiler, event_type, contents=extra_data)
else:
return NULL_LOG_SPAN
else:
return RayLogSpanRaylet(
worker.profiler, event_type, extra_data=extra_data)
class Profiler(object):
"""A class that holds the profiling states.
Attributes:
worker: the worker to profile.
events: the buffer of events.
lock: the lock to protect access of events.
"""
def __init__(self, worker):
self.worker = worker
self.events = []
self.lock = threading.Lock()
def start_flush_thread(self):
t = threading.Thread(target=self._periodically_flush_profile_events)
# Making the thread a daemon causes it to exit when the main thread
# exits.
t.daemon = True
t.start()
def _periodically_flush_profile_events(self):
"""Drivers run this as a thread to flush profile data in the
background."""
# Note(rkn): This is run on a background thread in the driver. It uses
# the local scheduler client. This should be ok because it doesn't read
# from the local scheduler client and we have the GIL here. However,
# if either of those things changes, then we could run into issues.
try:
while True:
time.sleep(1)
self.flush_profile_data()
except AttributeError:
# This is to suppress errors that occur at shutdown.
pass
# TODO(rkn): Support calling this function in the middle of a task, and
# also call this periodically in the background from the driver.
def flush_profile_data(self):
"""Push the logged profiling data to the global control store.
By default, profiling information for a given task won't appear in the
timeline until after the task has completed. For very long-running
tasks, we may want profiling information to appear more quickly.
In such cases, this function can be called. Note that as an
aalternative, we could start thread in the background on workers that
calls this automatically.
"""
with self.lock:
events = self.events
self.events = []
if not self.worker.use_raylet:
event_log_key = b"event_log:" + self.worker.worker_id
event_log_value = json.dumps(events)
self.worker.local_scheduler_client.log_event(
event_log_key, event_log_value, time.time())
else:
if self.worker.mode == ray.WORKER_MODE:
component_type = "worker"
else:
component_type = "driver"
self.worker.local_scheduler_client.push_profile_events(
component_type, ray.ObjectID(self.worker.worker_id),
self.worker.node_ip_address, events)
def add_event(self, event):
with self.lock:
self.events.append(event)
class RayLogSpanNonRaylet(object):
"""An object used to enable logging a span of events with a with statement.
Attributes:
event_type (str): The type of the event being logged.
contents: Additional information to log.
"""
def __init__(self, profiler, event_type, contents=None):
"""Initialize a RayLogSpanNonRaylet object."""
self.profiler = profiler
self.event_type = event_type
self.contents = contents
def _log(self, event_type, kind, contents=None):
"""Log an event to the global state store.
This adds the event to a buffer of events locally. The buffer can be
flushed and written to the global state store by calling
flush_profile_data().
Args:
event_type (str): The type of the event.
contents: More general data to store with the event.
kind (int): Either LOG_POINT, LOG_SPAN_START, or LOG_SPAN_END. This
is LOG_POINT if the event being logged happens at a single
point in time. It is LOG_SPAN_START if we are starting to log a
span of time, and it is LOG_SPAN_END if we are finishing
logging a span of time.
"""
# TODO(rkn): This code currently takes around half a microsecond. Since
# we call it tens of times per task, this adds up. We will need to redo
# the logging code, perhaps in C.
contents = {} if contents is None else contents
assert isinstance(contents, dict)
# Make sure all of the keys and values in the dictionary are strings.
contents = {str(k): str(v) for k, v in contents.items()}
self.profiler.add_event((time.time(), event_type, kind, contents))
def __enter__(self):
"""Log the beginning of a span event."""
self._log(
event_type=self.event_type,
contents=self.contents,
kind=LOG_SPAN_START)
def __exit__(self, type, value, tb):
"""Log the end of a span event. Log any exception that occurred."""
if type is None:
self._log(event_type=self.event_type, kind=LOG_SPAN_END)
else:
self._log(
event_type=self.event_type,
contents={
"type": str(type),
"value": value,
"traceback": traceback.format_exc()
},
kind=LOG_SPAN_END)
class RayLogSpanRaylet(object):
"""An object used to enable logging a span of events with a with statement.
Attributes:
event_type (str): The type of the event being logged.
contents: Additional information to log.
"""
def __init__(self, profiler, event_type, extra_data=None):
"""Initialize a RayLogSpanRaylet object."""
self.profiler = profiler
self.event_type = event_type
self.extra_data = extra_data if extra_data is not None else {}
def set_attribute(self, key, value):
"""Add a key-value pair to the extra_data dict.
This can be used to add attributes that are not available when
ray.profile was called.
Args:
key: The attribute name.
value: The attribute value.
"""
if not isinstance(key, str) or not isinstance(value, str):
raise ValueError("The extra_data argument must be a "
"dictionary mapping strings to strings.")
self.extra_data[key] = value
def __enter__(self):
"""Log the beginning of a span event.
Returns:
The object itself is returned so that if the block is opened using
"with ray.profile(...) as prof:", we can call
"prof.set_attribute" inside the block.
"""
self.start_time = time.time()
return self
def __exit__(self, type, value, tb):
"""Log the end of a span event. Log any exception that occurred."""
for key, value in self.extra_data.items():
if not isinstance(key, str) or not isinstance(value, str):
raise ValueError("The extra_data argument must be a "
"dictionary mapping strings to strings.")
if type is not None:
extra_data = json.dumps({
"type": str(type),
"value": str(value),
"traceback": str(traceback.format_exc()),
})
else:
extra_data = json.dumps(self.extra_data)
event = {
"event_type": self.event_type,
"start_time": self.start_time,
"end_time": time.time(),
"extra_data": extra_data,
}
self.profiler.add_event(event)

View file

@ -7,7 +7,6 @@ import collections
import colorama
import hashlib
import inspect
import json
import numpy as np
import os
import redis
@ -32,6 +31,7 @@ import ray.local_scheduler
import ray.plasma
import ray.ray_constants as ray_constants
from ray import import_thread
from ray import profiling
from ray.utils import (
binary_to_hex,
check_oversized_pickle,
@ -44,10 +44,6 @@ WORKER_MODE = 1
LOCAL_MODE = 2
SILENT_MODE = 3
LOG_POINT = 0
LOG_SPAN_START = 1
LOG_SPAN_END = 2
ERROR_KEY_PREFIX = b"Error:"
DRIVER_ID_LENGTH = 20
ERROR_ID_LENGTH = 20
@ -203,6 +199,7 @@ class Worker(object):
that connect has been called already.
cached_functions_to_run (List): A list of functions to run on all of
the workers that should be exported as soon as connect is called.
profiler: the profiler used to aggregate profiling information.
"""
def __init__(self):
@ -238,6 +235,7 @@ class Worker(object):
# When the worker is constructed. Record the original value of the
# CUDA_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray.utils.get_cuda_visible_devices()
self.profiler = profiling.Profiler(self)
def check_connected(self):
"""Check if the worker is connected.
@ -563,7 +561,7 @@ class Worker(object):
Returns:
The return object IDs for this task.
"""
with profile("submit_task", worker=self):
with profiling.profile("submit_task", worker=self):
check_main_thread()
if actor_id is None:
assert actor_handle_id is None
@ -868,7 +866,7 @@ class Worker(object):
# Get task arguments from the object store.
try:
with profile("task:deserialize_arguments", worker=self):
with profiling.profile("task:deserialize_arguments", worker=self):
arguments = self._get_arguments_for_execution(
function_name, args)
except (RayGetError, RayGetArgumentError) as e:
@ -883,7 +881,7 @@ class Worker(object):
# Execute the task.
try:
with profile("task:execute", worker=self):
with profiling.profile("task:execute", worker=self):
if task.actor_id().id() == NIL_ACTOR_ID:
outputs = function_executor(*arguments)
else:
@ -902,7 +900,7 @@ class Worker(object):
# Store the outputs in the local object store.
try:
with profile("task:store_outputs", worker=self):
with profiling.profile("task:store_outputs", worker=self):
# If this is an actor task, then the last object ID returned by
# the task is a dummy output, not returned by the function
# itself. Decrement to get the correct number of return values.
@ -977,7 +975,7 @@ class Worker(object):
# Wait until the function to be executed has actually been registered
# on this worker. We will push warnings to the user if we spend too
# long in this loop.
with profile("wait_for_function", worker=self):
with profiling.profile("wait_for_function", worker=self):
self._wait_for_function(function_id, driver_id)
# Execute the task.
@ -1000,11 +998,11 @@ class Worker(object):
"name": function_name,
"task_id": task.task_id().hex()
}
with profile("task", extra_data=extra_data, worker=self):
with profiling.profile("task", extra_data=extra_data, worker=self):
self._process_task(task)
# Push all of the log events to the global state store.
flush_profile_data()
self.profiler.flush_profile_data()
# Increase the task execution counter.
self.num_task_executions[driver_id][function_id.id()] += 1
@ -1022,7 +1020,7 @@ class Worker(object):
Returns:
A task from the local scheduler.
"""
with profile("get_task", worker=self):
with profiling.profile("get_task", worker=self):
task = self.local_scheduler_client.get_task()
# Automatically restrict the GPUs available to this task.
@ -1847,21 +1845,6 @@ def custom_excepthook(type, value, tb):
sys.excepthook = custom_excepthook
def _flush_profile_events(worker):
"""Drivers run this as a thread to flush profile data in the background."""
# Note(rkn): This is run on a background thread in the driver. It uses the
# local scheduler client. This should be ok because it doesn't read from
# the local scheduler client and we have the GIL here. However, if either
# of those things changes, then we could run into issues.
try:
while True:
time.sleep(1)
flush_profile_data(worker=worker)
except AttributeError:
# This is to suppress errors that occur at shutdown.
pass
def print_error_messages_raylet(worker):
"""Print error messages in the background on the driver.
@ -2024,11 +2007,6 @@ def connect(info,
worker.set_mode(mode)
worker.use_raylet = use_raylet
# The worker.events field is used to aggregate logging information and
# display it in the web UI. Note that Python lists protected by the GIL,
# which is important because we will append to this field from multiple
# threads.
worker.events = []
# If running Ray in LOCAL_MODE, there is no need to create call
# create_worker or to start the worker service.
if mode == LOCAL_MODE:
@ -2214,11 +2192,7 @@ def connect(info,
t.start()
if mode in [SCRIPT_MODE, SILENT_MODE] and worker.use_raylet:
t = threading.Thread(target=_flush_profile_events, args=(worker, ))
# Making the thread a daemon causes it to exit when the main thread
# exits.
t.daemon = True
t.start()
worker.profiler.start_flush_thread()
if mode in [SCRIPT_MODE, SILENT_MODE]:
# Add the directory containing the script that is running to the Python
@ -2397,209 +2371,6 @@ def register_custom_serializer(cls,
register_class_for_serialization({"worker": worker})
class RayLogSpan(object):
"""An object used to enable logging a span of events with a with statement.
Attributes:
event_type (str): The type of the event being logged.
contents: Additional information to log.
"""
def __init__(self, event_type, contents=None, worker=global_worker):
"""Initialize a RayLogSpan object."""
self.event_type = event_type
self.contents = contents
self.worker = worker
def __enter__(self):
"""Log the beginning of a span event."""
_log(
event_type=self.event_type,
contents=self.contents,
kind=LOG_SPAN_START,
worker=self.worker)
def __exit__(self, type, value, tb):
"""Log the end of a span event. Log any exception that occurred."""
if type is None:
_log(
event_type=self.event_type,
kind=LOG_SPAN_END,
worker=self.worker)
else:
_log(
event_type=self.event_type,
contents={
"type": str(type),
"value": value,
"traceback": traceback.format_exc()
},
kind=LOG_SPAN_END,
worker=self.worker)
class RayLogSpanRaylet(object):
"""An object used to enable logging a span of events with a with statement.
Attributes:
event_type (str): The type of the event being logged.
contents: Additional information to log.
"""
def __init__(self, event_type, extra_data=None, worker=global_worker):
"""Initialize a RayLogSpan object."""
self.event_type = event_type
self.extra_data = extra_data if extra_data is not None else {}
self.worker = worker
def set_attribute(self, key, value):
"""Add a key-value pair to the extra_data dict.
This can be used to add attributes that are not available when
ray.profile was called.
Args:
key: The attribute name.
value: The attribute value.
"""
if not isinstance(key, str) or not isinstance(value, str):
raise ValueError("The extra_data argument must be a "
"dictionary mapping strings to strings.")
self.extra_data[key] = value
def __enter__(self):
"""Log the beginning of a span event.
Returns:
The object itself is returned so that if the block is opened using
"with ray.profile(...) as prof:", we can call
"prof.set_attribute" inside the block.
"""
self.start_time = time.time()
return self
def __exit__(self, type, value, tb):
"""Log the end of a span event. Log any exception that occurred."""
for key, value in self.extra_data.items():
if not isinstance(key, str) or not isinstance(value, str):
raise ValueError("The extra_data argument must be a "
"dictionary mapping strings to strings.")
event = {
"event_type": self.event_type,
"start_time": self.start_time,
"end_time": time.time(),
"extra_data": json.dumps(self.extra_data),
}
if type is not None:
event["extra_data"] = json.dumps({
"type": str(type),
"value": str(value),
"traceback": str(traceback.format_exc()),
})
self.worker.events.append(event)
def profile(event_type, extra_data=None, worker=global_worker):
"""Profile a span of time so that it appears in the timeline visualization.
Note that this only works in the raylet code path.
This function can be used as follows (both on the driver or within a task).
.. code-block:: python
with ray.profile("custom event", extra_data={'key': 'value'}):
# Do some computation here.
Optionally, a dictionary can be passed as the "extra_data" argument, and
it can have keys "name" and "cname" if you want to override the default
timeline display text and box color. Other values will appear at the bottom
of the chrome tracing GUI when you click on the box corresponding to this
profile span.
Args:
event_type: A string describing the type of the event.
extra_data: This must be a dictionary mapping strings to strings. This
data will be added to the json objects that are used to populate
the timeline, so if you want to set a particular color, you can
simply set the "cname" attribute to an appropriate color.
Similarly, if you set the "name" attribute, then that will set the
text displayed on the box in the timeline.
Returns:
An object that can profile a span of time via a "with" statement.
"""
if not worker.use_raylet:
return RayLogSpan(event_type, contents=extra_data, worker=worker)
else:
return RayLogSpanRaylet(
event_type, extra_data=extra_data, worker=worker)
def _log(event_type, kind, contents=None, worker=global_worker):
"""Log an event to the global state store.
This adds the event to a buffer of events locally. The buffer can be
flushed and written to the global state store by calling
flush_profile_data().
Args:
event_type (str): The type of the event.
contents: More general data to store with the event.
kind (int): Either LOG_POINT, LOG_SPAN_START, or LOG_SPAN_END. This is
LOG_POINT if the event being logged happens at a single point in
time. It is LOG_SPAN_START if we are starting to log a span of
time, and it is LOG_SPAN_END if we are finishing logging a span of
time.
"""
if worker.use_raylet:
raise Exception(
"This method is not supported in the raylet code path.")
# TODO(rkn): This code currently takes around half a microsecond. Since we
# call it tens of times per task, this adds up. We will need to redo the
# logging code, perhaps in C.
contents = {} if contents is None else contents
assert isinstance(contents, dict)
# Make sure all of the keys and values in the dictionary are strings.
contents = {str(k): str(v) for k, v in contents.items()}
# Log the event if this is a worker and not a driver, since the driver's
# event log never gets flushed.
if worker.mode == WORKER_MODE:
worker.events.append((time.time(), event_type, kind, contents))
# TODO(rkn): Support calling this function in the middle of a task, and also
# call this periodically in the background from the driver.
def flush_profile_data(worker=global_worker):
"""Push the logged profiling data to the global control store.
By default, profiling information for a given task won't appear in the
timeline until after the task has completed. For very long-running tasks,
we may want profiling information to appear more quickly. In such cases,
this function can be called. Note that as an alternative, we could start
a thread in the background on workers that calls this automatically.
"""
if not worker.use_raylet:
event_log_key = b"event_log:" + worker.worker_id
event_log_value = json.dumps(worker.events)
worker.local_scheduler_client.log_event(event_log_key, event_log_value,
time.time())
else:
if worker.mode == WORKER_MODE:
component_type = "worker"
else:
component_type = "driver"
worker.local_scheduler_client.push_profile_events(
component_type, ray.ObjectID(worker.worker_id),
worker.node_ip_address, worker.events)
worker.events = []
def get(object_ids, worker=global_worker):
"""Get a remote object or a list of remote objects from the object store.
@ -2621,7 +2392,7 @@ def get(object_ids, worker=global_worker):
or that created one of the objects raised an exception.
"""
worker.check_connected()
with profile("ray.get", worker=worker):
with profiling.profile("ray.get", worker=worker):
check_main_thread()
if worker.mode == LOCAL_MODE:
@ -2654,7 +2425,7 @@ def put(value, worker=global_worker):
The object ID assigned to this value.
"""
worker.check_connected()
with profile("ray.put", worker=worker):
with profiling.profile("ray.put", worker=worker):
check_main_thread()
if worker.mode == LOCAL_MODE:
@ -2713,7 +2484,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
type(object_id)))
worker.check_connected()
with profile("ray.wait", worker=worker):
with profiling.profile("ray.wait", worker=worker):
check_main_thread()
# When Ray is run in LOCAL_MODE, all functions are run immediately,