mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
Task profiles function and test (#647)
Expose some task profiling information through the global state API.
This commit is contained in:
parent
43bae46e47
commit
cc4990b543
2 changed files with 73 additions and 0 deletions
|
@ -2,6 +2,7 @@ from __future__ import absolute_import
|
|||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import json
|
||||
import pickle
|
||||
import redis
|
||||
|
||||
|
@ -332,3 +333,31 @@ class GlobalState(object):
|
|||
ip_filename_file[ip_addr][filename] = file_str
|
||||
|
||||
return ip_filename_file
|
||||
|
||||
def task_profiles(self):
    """Fetch and return the task profiles stored in the event log.

    Every Redis key matching ``event_log*`` is read as a list. Each list
    entry is decoded as ASCII JSON into a sequence of elements, and the
    entry is grouped by the ``"task_id"`` found in the fourth field
    (index 3) of its elements.

    Returns:
        A tuple of two elements. The first element is a dictionary mapping
            the task ID of a task to a list of the profiling information
            for all of the executions of that task. The second element is
            a list of profiling information for tasks where the events
            have no task ID.
    """
    results = dict()
    events = []
    for event_name in self.redis_client.keys("event_log*"):
        for event in self.redis_client.lrange(event_name, 0, -1):
            event_dict = json.loads(event.decode("ascii"))
            # The empty string marks "no task ID seen". When several
            # elements carry a task ID, the last one wins.
            task_id = ""
            for element in event_dict:
                if "task_id" in element[3]:
                    task_id = element[3]["task_id"]
            if task_id != "":
                results.setdefault(task_id, []).append(event_dict)
            else:
                events.append(event_dict)
    return results, events
|
||||
|
|
|
@ -1565,6 +1565,50 @@ class GlobalStateAPI(unittest.TestCase):
|
|||
time.sleep(0.1)
|
||||
|
||||
self.assertEqual(found_message, True)
|
||||
|
||||
ray.worker.cleanup()
|
||||
|
||||
def testTaskProfileAPI(self):
    """Check that task_profiles() reports one well-formed profile per task.

    Submits ``num_calls`` remote tasks, polls the global state for up to
    ten seconds until that many profiles appear, and then verifies that
    each task was executed exactly once and that its profile contains the
    execute, get_arguments and store_outputs events.
    """
    ray.init(redirect_output=True)
    try:
        @ray.remote
        def f():
            return 1

        num_calls = 5
        for _ in range(num_calls):
            f.remote()

        # Make sure the event log has the correct number of events.
        start_time = time.time()
        while time.time() - start_time < 10:
            profiles, events = ray.global_state.task_profiles()
            if len(profiles) == num_calls:
                break
            time.sleep(0.1)
        self.assertEqual(len(profiles), num_calls)
        self.assertEqual(len(events), 0)

        # Make sure that each entry is properly formatted.
        required_events = ("ray:task:execute",
                           "ray:task:get_arguments",
                           "ray:task:store_outputs")
        for task_id in profiles:
            executions = profiles[task_id]
            # Make sure that the task was not executed more than once.
            self.assertEqual(len(executions), 1)
            # Collect the event names once; an empty profile now fails
            # the assertions below instead of silently skipping them.
            seen_events = set(event[1] for event in executions[0])
            for name in required_events:
                self.assertIn(name, seen_events)
    finally:
        # Always shut the cluster down so a failure here does not leak
        # Ray processes into subsequent tests.
        ray.worker.cleanup()
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue