mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
Improve the error message for failed task/actor imports on workers (#18792)
This commit is contained in:
parent
698b4eeed3
commit
cf0bd00cc2
4 changed files with 106 additions and 86 deletions
|
@ -23,7 +23,6 @@ from ray._private.utils import (
|
||||||
decode,
|
decode,
|
||||||
ensure_str,
|
ensure_str,
|
||||||
format_error_message,
|
format_error_message,
|
||||||
push_error_to_driver,
|
|
||||||
)
|
)
|
||||||
from ray.util.inspect import (
|
from ray.util.inspect import (
|
||||||
is_function_or_method,
|
is_function_or_method,
|
||||||
|
@ -193,9 +192,17 @@ class FunctionActorManager:
|
||||||
function = pickle.loads(serialized_function)
|
function = pickle.loads(serialized_function)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|
||||||
|
# If an exception was thrown when the remote function was
|
||||||
|
# imported, we record the traceback and notify the scheduler
|
||||||
|
# of the failure.
|
||||||
|
traceback_str = format_error_message(traceback.format_exc())
|
||||||
|
|
||||||
def f(*args, **kwargs):
|
def f(*args, **kwargs):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"This function was not imported properly.")
|
"The remote function failed to import on the "
|
||||||
|
"worker. This may be because needed library "
|
||||||
|
"dependencies are not installed in the worker "
|
||||||
|
"environment:\n\n{}".format(traceback_str))
|
||||||
|
|
||||||
# Use a placeholder function when unpickling the function failed.
|
# Use a placeholder function when unpickling the function failed.
|
||||||
self._function_execution_info[function_id] = (
|
self._function_execution_info[function_id] = (
|
||||||
|
@ -203,11 +210,10 @@ class FunctionActorManager:
|
||||||
function=f,
|
function=f,
|
||||||
function_name=function_name,
|
function_name=function_name,
|
||||||
max_calls=max_calls))
|
max_calls=max_calls))
|
||||||
# If an exception was thrown when the remote function was
|
|
||||||
# imported, we record the traceback and notify the scheduler
|
# Log the error message. Log at DEBUG level to avoid overly
|
||||||
# of the failure.
|
# spamming the log on import failure. The user gets the error
|
||||||
traceback_str = format_error_message(traceback.format_exc())
|
# via the RuntimeError message above.
|
||||||
# Log the error message.
|
|
||||||
logger.debug("Failed to unpickle the remote function "
|
logger.debug("Failed to unpickle the remote function "
|
||||||
f"'{function_name}' with "
|
f"'{function_name}' with "
|
||||||
f"function ID {function_id.hex()}. "
|
f"function ID {function_id.hex()}. "
|
||||||
|
@ -471,14 +477,17 @@ class FunctionActorManager:
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _create_fake_actor_class(self, actor_class_name, actor_method_names):
|
def _create_fake_actor_class(self, actor_class_name, actor_method_names,
|
||||||
|
traceback_str):
|
||||||
class TemporaryActor:
|
class TemporaryActor:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def temporary_actor_method(*args, **kwargs):
|
def temporary_actor_method(*args, **kwargs):
|
||||||
raise RuntimeError(f"The actor with name {actor_class_name} "
|
raise RuntimeError(
|
||||||
"failed to be imported, "
|
f"The actor with name {actor_class_name} "
|
||||||
"and so cannot execute this method.")
|
"failed to import on the worker. This may be because "
|
||||||
|
"needed library dependencies are not installed in the "
|
||||||
|
f"worker environment:\n\n{traceback_str}")
|
||||||
|
|
||||||
for method in actor_method_names:
|
for method in actor_method_names:
|
||||||
setattr(TemporaryActor, method, temporary_actor_method)
|
setattr(TemporaryActor, method, temporary_actor_method)
|
||||||
|
@ -521,27 +530,15 @@ class FunctionActorManager:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
actor_class = pickle.loads(pickled_class)
|
actor_class = pickle.loads(pickled_class)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to load actor class %s.", class_name)
|
logger.debug("Failed to load actor class %s.", class_name)
|
||||||
|
# If an exception was thrown when the actor was imported, we record
|
||||||
|
# the traceback and notify the scheduler of the failure.
|
||||||
|
traceback_str = format_error_message(traceback.format_exc())
|
||||||
# The actor class failed to be unpickled, create a fake actor
|
# The actor class failed to be unpickled, create a fake actor
|
||||||
# class instead (just to produce error messages and to prevent
|
# class instead (just to produce error messages and to prevent
|
||||||
# the driver from hanging).
|
# the driver from hanging).
|
||||||
actor_class = self._create_fake_actor_class(
|
actor_class = self._create_fake_actor_class(
|
||||||
class_name, actor_method_names)
|
class_name, actor_method_names, traceback_str)
|
||||||
# If an exception was thrown when the actor was imported, we record
|
|
||||||
# the traceback and notify the scheduler of the failure.
|
|
||||||
traceback_str = ray._private.utils.format_error_message(
|
|
||||||
traceback.format_exc())
|
|
||||||
# Log the error message.
|
|
||||||
push_error_to_driver(
|
|
||||||
self._worker,
|
|
||||||
ray_constants.REGISTER_ACTOR_PUSH_ERROR,
|
|
||||||
f"Failed to unpickle actor class '{class_name}' "
|
|
||||||
f"for actor ID {self._worker.actor_id.hex()}. "
|
|
||||||
f"Traceback:\n{traceback_str}",
|
|
||||||
job_id=job_id)
|
|
||||||
# TODO(rkn): In the future, it might make sense to have the worker
|
|
||||||
# exit here. However, currently that would lead to hanging if
|
|
||||||
# someone calls ray.get on a method invoked on the actor.
|
|
||||||
|
|
||||||
# The below line is necessary. Because in the driver process,
|
# The below line is necessary. Because in the driver process,
|
||||||
# if the function is defined in the file where the python script
|
# if the function is defined in the file where the python script
|
||||||
|
|
|
@ -132,7 +132,6 @@ REGISTER_REMOTE_FUNCTION_PUSH_ERROR = "register_remote_function"
|
||||||
FUNCTION_TO_RUN_PUSH_ERROR = "function_to_run"
|
FUNCTION_TO_RUN_PUSH_ERROR = "function_to_run"
|
||||||
VERSION_MISMATCH_PUSH_ERROR = "version_mismatch"
|
VERSION_MISMATCH_PUSH_ERROR = "version_mismatch"
|
||||||
CHECKPOINT_PUSH_ERROR = "checkpoint"
|
CHECKPOINT_PUSH_ERROR = "checkpoint"
|
||||||
REGISTER_ACTOR_PUSH_ERROR = "register_actor"
|
|
||||||
WORKER_CRASH_PUSH_ERROR = "worker_crash"
|
WORKER_CRASH_PUSH_ERROR = "worker_crash"
|
||||||
WORKER_DIED_PUSH_ERROR = "worker_died"
|
WORKER_DIED_PUSH_ERROR = "worker_died"
|
||||||
WORKER_POOL_LARGE_ERROR = "worker_pool_large"
|
WORKER_POOL_LARGE_ERROR = "worker_pool_large"
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -129,62 +128,6 @@ def test_failed_function_to_run(ray_start_2_cpus, error_pubsub):
|
||||||
assert "Function to run failed." in errors[1].error_message
|
assert "Function to run failed." in errors[1].error_message
|
||||||
|
|
||||||
|
|
||||||
def test_fail_importing_actor(ray_start_regular, error_pubsub):
|
|
||||||
p = error_pubsub
|
|
||||||
# Create the contents of a temporary Python file.
|
|
||||||
temporary_python_file = """
|
|
||||||
def temporary_helper_function():
|
|
||||||
return 1
|
|
||||||
"""
|
|
||||||
|
|
||||||
f = tempfile.NamedTemporaryFile(suffix=".py")
|
|
||||||
f.write(temporary_python_file.encode("ascii"))
|
|
||||||
f.flush()
|
|
||||||
directory = os.path.dirname(f.name)
|
|
||||||
# Get the module name and strip ".py" from the end.
|
|
||||||
module_name = os.path.basename(f.name)[:-3]
|
|
||||||
sys.path.append(directory)
|
|
||||||
module = __import__(module_name)
|
|
||||||
|
|
||||||
# Define an actor that closes over this temporary module. This should
|
|
||||||
# fail when it is unpickled.
|
|
||||||
@ray.remote
|
|
||||||
class Foo:
|
|
||||||
def __init__(self, arg1, arg2=3):
|
|
||||||
self.x = module.temporary_python_file()
|
|
||||||
|
|
||||||
def get_val(self, arg1, arg2=3):
|
|
||||||
return 1
|
|
||||||
|
|
||||||
# There should be no errors yet.
|
|
||||||
errors = get_error_message(p, 2)
|
|
||||||
assert len(errors) == 0
|
|
||||||
# Create an actor.
|
|
||||||
foo = Foo.remote(3, arg2=0)
|
|
||||||
|
|
||||||
errors = get_error_message(p, 2)
|
|
||||||
assert len(errors) == 2
|
|
||||||
|
|
||||||
for error in errors:
|
|
||||||
# Wait for the error to arrive.
|
|
||||||
if error.type == ray_constants.REGISTER_ACTOR_PUSH_ERROR:
|
|
||||||
assert "No module named" in error.error_message
|
|
||||||
else:
|
|
||||||
# Wait for the error from when the __init__ tries to run.
|
|
||||||
assert ("failed to be imported, and so cannot execute this method"
|
|
||||||
in error.error_message)
|
|
||||||
|
|
||||||
# Check that if we try to get the function it throws an exception and
|
|
||||||
# does not hang.
|
|
||||||
with pytest.raises(Exception, match="failed to be imported"):
|
|
||||||
ray.get(foo.get_val.remote(1, arg2=2))
|
|
||||||
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
# Clean up the junk we added to sys.path.
|
|
||||||
sys.path.pop(-1)
|
|
||||||
|
|
||||||
|
|
||||||
def test_failed_actor_init(ray_start_regular, error_pubsub):
|
def test_failed_actor_init(ray_start_regular, error_pubsub):
|
||||||
p = error_pubsub
|
p = error_pubsub
|
||||||
error_message1 = "actor constructor failed"
|
error_message1 = "actor constructor failed"
|
||||||
|
|
|
@ -8,6 +8,87 @@ import ray
|
||||||
from ray._private.test_utils import run_string_as_driver_nonblocking
|
from ray._private.test_utils import run_string_as_driver_nonblocking
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||||
|
def test_fail_importing_actor(ray_start_regular, error_pubsub):
|
||||||
|
script = """
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import ray
|
||||||
|
|
||||||
|
ray.init()
|
||||||
|
temporary_python_file = '''
|
||||||
|
def temporary_helper_function():
|
||||||
|
return 1
|
||||||
|
'''
|
||||||
|
|
||||||
|
f = tempfile.NamedTemporaryFile(suffix=".py")
|
||||||
|
f.write(temporary_python_file.encode("ascii"))
|
||||||
|
f.flush()
|
||||||
|
directory = os.path.dirname(f.name)
|
||||||
|
# Get the module name and strip ".py" from the end.
|
||||||
|
module_name = os.path.basename(f.name)[:-3]
|
||||||
|
sys.path.append(directory)
|
||||||
|
module = __import__(module_name)
|
||||||
|
|
||||||
|
# Define an actor that closes over this temporary module. This should
|
||||||
|
# fail when it is unpickled.
|
||||||
|
@ray.remote
|
||||||
|
class Foo:
|
||||||
|
def __init__(self):
|
||||||
|
self.x = module.temporary_python_file()
|
||||||
|
|
||||||
|
a = Foo.remote()
|
||||||
|
"""
|
||||||
|
proc = run_string_as_driver_nonblocking(script)
|
||||||
|
out_str = proc.stdout.read().decode("ascii")
|
||||||
|
err_str = proc.stderr.read().decode("ascii")
|
||||||
|
print(out_str)
|
||||||
|
print(err_str)
|
||||||
|
assert "ModuleNotFoundError: No module named" in err_str
|
||||||
|
assert "RuntimeError: The actor with name Foo failed to import" in err_str
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||||
|
def test_fail_importing_task(ray_start_regular, error_pubsub):
|
||||||
|
script = """
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import ray
|
||||||
|
|
||||||
|
ray.init()
|
||||||
|
temporary_python_file = '''
|
||||||
|
def temporary_helper_function():
|
||||||
|
return 1
|
||||||
|
'''
|
||||||
|
|
||||||
|
f = tempfile.NamedTemporaryFile(suffix=".py")
|
||||||
|
f.write(temporary_python_file.encode("ascii"))
|
||||||
|
f.flush()
|
||||||
|
directory = os.path.dirname(f.name)
|
||||||
|
# Get the module name and strip ".py" from the end.
|
||||||
|
module_name = os.path.basename(f.name)[:-3]
|
||||||
|
sys.path.append(directory)
|
||||||
|
module = __import__(module_name)
|
||||||
|
|
||||||
|
# Define a remote function that closes over this temporary module. This should
|
||||||
|
# fail when it is unpickled.
|
||||||
|
@ray.remote
|
||||||
|
def foo():
|
||||||
|
return module.temporary_python_file()
|
||||||
|
|
||||||
|
ray.get(foo.remote())
|
||||||
|
"""
|
||||||
|
proc = run_string_as_driver_nonblocking(script)
|
||||||
|
out_str = proc.stdout.read().decode("ascii")
|
||||||
|
err_str = proc.stderr.read().decode("ascii")
|
||||||
|
print(out_str)
|
||||||
|
print(err_str)
|
||||||
|
assert "ModuleNotFoundError: No module named" in err_str
|
||||||
|
assert "RuntimeError: The remote function failed to import" in err_str
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||||
def test_worker_stdout():
|
def test_worker_stdout():
|
||||||
script = """
|
script = """
|
||||||
|
|
Loading…
Add table
Reference in a new issue