Optimize python task execution (#6024)

Authored by Philipp Moritz on 2019-10-27 00:43:34 -07:00; committed by GitHub
parent e706cb63cc
commit 80c01617a3
7 changed files with 60 additions and 40 deletions


@@ -481,20 +481,24 @@ cdef execute_task(
         c_vector[shared_ptr[CRayObject]] *returns):
     worker = ray.worker.global_worker
     manager = worker.function_actor_manager
-    actor_id = worker.core_worker.get_actor_id()
-    job_id = worker.core_worker.get_current_job_id()
-    task_id = worker.core_worker.get_current_task_id()
+    cdef:
+        dict execution_infos = manager.execution_infos
+        CoreWorker core_worker = worker.core_worker
+        JobID job_id = core_worker.get_current_job_id()
+        CTaskID task_id = core_worker.core_worker.get().GetCurrentTaskId()
     # Automatically restrict the GPUs available to this task.
     ray.utils.set_cuda_visible_devices(ray.get_gpu_ids())
-    function_descriptor = FunctionDescriptor.from_bytes_list(
-        ray_function.GetFunctionDescriptor())
+    descriptor = tuple(ray_function.GetFunctionDescriptor())
     if <int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK:
-        actor_class = worker.function_actor_manager.load_actor_class(
-            job_id, function_descriptor)
+        function_descriptor = FunctionDescriptor.from_bytes_list(
+            ray_function.GetFunctionDescriptor())
+        actor_class = manager.load_actor_class(job_id, function_descriptor)
+        actor_id = core_worker.get_actor_id()
         worker.actors[actor_id] = actor_class.__new__(actor_class)
         worker.actor_checkpoint_info[actor_id] = (
             ray.worker.ActorCheckpointInfo(
@@ -502,17 +506,23 @@ cdef execute_task(
                 last_checkpoint_timestamp=int(1000 * time.time()),
                 checkpoint_ids=[]))
-    execution_info = worker.function_actor_manager.get_execution_info(
-        job_id, function_descriptor)
+    execution_info = execution_infos.get(descriptor)
+    if not execution_info:
+        function_descriptor = FunctionDescriptor.from_bytes_list(
+            ray_function.GetFunctionDescriptor())
+        execution_info = manager.get_execution_info(job_id, function_descriptor)
+        execution_infos[descriptor] = execution_info
     function_name = execution_info.function_name
-    extra_data = {"name": function_name, "task_id": task_id.hex()}
+    extra_data = (b'{"name": ' + function_name.encode("ascii") +
+                  b' "task_id": ' + task_id.Hex() + b'}')
     if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
         title = "ray_worker:{}()".format(function_name)
         next_title = "ray_worker"
         function_executor = execution_info.function
     else:
-        actor = worker.actors[actor_id]
+        actor = worker.actors[core_worker.get_actor_id()]
         class_name = actor.__class__.__name__
         title = "ray_{}:{}()".format(class_name, function_name)
         next_title = "ray_{}".format(class_name)
@@ -533,7 +543,7 @@ cdef execute_task(
             return execution_info.function(actor, *arguments, **kwarguments)
     return_ids = VectorToObjectIDs(c_return_ids)
-    with profiling.profile("task", extra_data=extra_data):
+    with core_worker.profile_event(b"task", extra_data=extra_data):
         try:
             task_exception = False
             if not (<int>task_type == <int>TASK_TYPE_ACTOR_TASK
@@ -541,12 +551,12 @@ cdef execute_task(
                 worker.reraise_actor_init_error()
                 worker.memory_monitor.raise_if_low_memory()
-            with profiling.profile("task:deserialize_arguments"):
+            with core_worker.profile_event(b"task:deserialize_arguments"):
                 args, kwargs = deserialize_args(c_args, c_arg_reference_ids)
             # Execute the task.
             with ray.worker._changeproctitle(title, next_title):
-                with profiling.profile("task:execute"):
+                with core_worker.profile_event(b"task:execute"):
                     task_exception = True
                     outputs = function_executor(*args, **kwargs)
                     task_exception = False
@@ -554,7 +564,7 @@ cdef execute_task(
                         outputs = (outputs,)
             # Store the outputs in the object store.
-            with profiling.profile("task:store_outputs"):
+            with core_worker.profile_event(b"task:store_outputs"):
                 _store_task_outputs(worker, return_ids, outputs)
         except Exception as error:
             if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
@@ -588,18 +598,20 @@ cdef execute_task(
     # all past signals.
     ray_signal.reset()
-    # Reset the state of the worker for the next task to execute.
-    # Increase the task execution counter.
-    worker.function_actor_manager.increase_task_counter(
-        job_id, function_descriptor)
+    if execution_info.max_calls != 0:
+        function_descriptor = FunctionDescriptor.from_bytes_list(
+            ray_function.GetFunctionDescriptor())
+        # Reset the state of the worker for the next task to execute.
+        # Increase the task execution counter.
+        manager.increase_task_counter(job_id, function_descriptor)
+        # If we've reached the max number of executions for this worker, exit.
+        task_counter = manager.get_task_counter(job_id, function_descriptor)
+        if task_counter == execution_info.max_calls:
+            worker.core_worker.disconnect()
+            sys.exit(0)
-    # If we've reached the max number of executions for this worker, exit.
-    reached_max_executions = (
-        worker.function_actor_manager.get_task_counter(
-            job_id, function_descriptor) == execution_info.max_calls)
-    if reached_max_executions:
-        worker.core_worker.disconnect()
-        sys.exit(0)

 cdef CRayStatus task_execution_handler(
         CTaskType task_type,
@@ -870,7 +882,7 @@ cdef class CoreWorker:
             c_vector[CTaskArg] args_vector
             c_vector[CObjectID] return_ids
-        with self.profile_event("submit_task"):
+        with self.profile_event(b"submit_task"):
             prepare_resources(resources, &c_resources)
             task_options = CTaskOptions(num_return_vals, c_resources)
             ray_function = CRayFunction(
@@ -897,7 +909,7 @@ cdef class CoreWorker:
             unordered_map[c_string, double] c_placement_resources
             CActorID c_actor_id
-        with profiling.profile("submit_task"):
+        with self.profile_event(b"submit_task"):
             prepare_resources(resources, &c_resources)
             prepare_resources(placement_resources, &c_placement_resources)
             ray_function = CRayFunction(
@@ -929,7 +941,7 @@ cdef class CoreWorker:
             c_vector[CTaskArg] args_vector
             c_vector[CObjectID] return_ids
-        with self.profile_event("submit_task"):
+        with self.profile_event(b"submit_task"):
             prepare_resources(resources, &c_resources)
             task_options = CTaskOptions(num_return_vals, c_resources)
             ray_function = CRayFunction(
@@ -966,12 +978,9 @@ cdef class CoreWorker:
         return resources_dict
-    def profile_event(self, event_type, object extra_data=None):
-        cdef:
-            c_string c_event_type = event_type.encode("ascii")
+    def profile_event(self, c_string event_type, object extra_data=None):
         return ProfileEvent.make(
-            self.core_worker.get().CreateProfileEvent(c_event_type),
+            self.core_worker.get().CreateProfileEvent(event_type),
             extra_data)

     def deserialize_and_register_actor_handle(self, const c_string &bytes):
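
The execute_task changes above remove two per-task costs: building a FunctionDescriptor from the raw byte list and re-querying the function manager for execution info on every call. Instead, the raw descriptor is turned into a plain tuple and used as a cache key, so the expensive work happens only the first time a given function runs on this worker. Below is a minimal, self-contained sketch of that caching pattern with hypothetical names; it is an illustration, not the commit's actual API.

    # Illustrative sketch only; the real logic lives in execute_task and the
    # execution_infos dict on the function manager.
    class ExecutionInfoCache:
        def __init__(self, load_fn):
            self._load_fn = load_fn  # expensive lookup (descriptor parse + manager query)
            self._infos = {}         # raw descriptor tuple -> cached execution info

        def get(self, raw_descriptor):
            key = tuple(raw_descriptor)   # cheap, hashable key; no descriptor object built
            info = self._infos.get(key)
            if info is None:              # pay the full cost only on the first call
                info = self._load_fn(raw_descriptor)
                self._infos[key] = info
            return info

    # Example: the loader runs once per distinct descriptor.
    cache = ExecutionInfoCache(lambda desc: {"function_name": desc[-1]})
    first = cache.get([b"mod", b"Cls", b"method"])
    assert first is cache.get([b"mod", b"Cls", b"method"])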


@@ -301,6 +301,7 @@ class FunctionActorManager(object):
         self.imported_actor_classes = set()
         self._loaded_actor_classes = {}
         self.lock = threading.Lock()
+        self.execution_infos = {}

     def increase_task_counter(self, job_id, function_descriptor):
         function_id = function_descriptor.function_id


@@ -186,7 +186,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
         CRayFunction(CLanguage language,
                      const c_vector[c_string] function_descriptor)
         CLanguage GetLanguage()
-        c_vector[c_string] GetFunctionDescriptor()
+        const c_vector[c_string]& GetFunctionDescriptor()

     cdef cppclass CTaskArg "ray::TaskArg":
         @staticmethod


@@ -35,8 +35,13 @@ cdef class ProfileEvent:
         elif self.extra_data is not None:
             extra_data = self.extra_data
-        self.inner.get().SetExtraData(
-            json.dumps(extra_data).encode("ascii") if extra_data else b"{}")
+        if not extra_data:
+            self.inner.get().SetExtraData(b"{}")
+        elif isinstance(extra_data, dict):
+            self.inner.get().SetExtraData(
+                json.dumps(extra_data).encode("ascii"))
+        else:
+            self.inner.get().SetExtraData(extra_data)

         # Deleting the CProfileEvent will add it to a queue to be pushed to
         # the driver.
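
With this change, the profile event's extra data may be either a dict, which is JSON-encoded as before, or an already-encoded byte string, which is passed through untouched; the byte-string path is what the new execute_task code uses to avoid building a dict and calling json.dumps for every task. A small standalone sketch of the same normalization, using a hypothetical helper name:

    import json

    def encode_extra_data(extra_data):
        # Mirrors the three SetExtraData branches above (illustrative only).
        if not extra_data:
            return b"{}"                                   # nothing to record
        if isinstance(extra_data, dict):
            return json.dumps(extra_data).encode("ascii")  # dicts become JSON bytes
        return extra_data                                  # assume pre-encoded bytes

    assert encode_extra_data(None) == b"{}"
    assert encode_extra_data({"name": "f"}) == b'{"name": "f"}'
    assert encode_extra_data(b'{"x": 1}') == b'{"x": 1}'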


@@ -51,4 +51,5 @@ def profile(event_type, extra_data=None):
     worker = ray.worker.global_worker
     if worker.mode == ray.worker.LOCAL_MODE:
         return NULL_LOG_SPAN
-    return worker.core_worker.profile_event(event_type, extra_data)
+    return worker.core_worker.profile_event(
+        event_type.encode("ascii"), extra_data)
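
The wrapper in the profiling module now encodes the event name to ASCII before crossing into Cython, since CoreWorker.profile_event takes a c_string directly; the hot paths inside the Cython code skip the conversion entirely by passing byte literals such as b"task:execute". A rough sketch of the resulting calling convention, using stand-in classes rather than Ray's real ones:

    class FakeCoreWorker:
        # Stand-in for the Cython CoreWorker: profile_event expects bytes.
        def profile_event(self, event_type, extra_data=None):
            assert isinstance(event_type, bytes)
            return (event_type, extra_data)  # a real worker returns a profiling event

    def profile(core_worker, event_type, extra_data=None):
        # Convenience wrapper for str event names: encode once, here.
        return core_worker.profile_event(event_type.encode("ascii"), extra_data)

    worker = FakeCoreWorker()
    profile(worker, "custom_event")        # user code passes ordinary strings
    worker.profile_event(b"task:execute")  # hot paths pass byte literals directly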


@@ -292,7 +292,9 @@ def test_incorrect_method_calls(ray_start_regular):
 def test_worker_raising_exception(ray_start_regular):
     @ray.remote
     def f():
-        ray.worker.global_worker.function_actor_manager = None
+        # This is the only reasonable variable we can set here that makes the
+        # execute_task function fail after the task got executed.
+        ray.experimental.signal.reset = None

     # Running this task should cause the worker to raise an exception after
     # the task has successfully completed.


@@ -27,7 +27,9 @@ class RayFunction {
   Language GetLanguage() const { return language_; }
-  std::vector<std::string> GetFunctionDescriptor() const { return function_descriptor_; }
+  const std::vector<std::string> &GetFunctionDescriptor() const {
+    return function_descriptor_;
+  }

  private:
   Language language_;