diff --git a/python/ray/_private/function_manager.py b/python/ray/_private/function_manager.py index 969ebad73..ad3dc183d 100644 --- a/python/ray/_private/function_manager.py +++ b/python/ray/_private/function_manager.py @@ -23,7 +23,6 @@ from ray._private.utils import ( decode, ensure_str, format_error_message, - push_error_to_driver, ) from ray.util.inspect import ( is_function_or_method, @@ -193,9 +192,17 @@ class FunctionActorManager: function = pickle.loads(serialized_function) except Exception: + # If an exception was thrown when the remote function was + # imported, we record the traceback and notify the scheduler + # of the failure. + traceback_str = format_error_message(traceback.format_exc()) + def f(*args, **kwargs): raise RuntimeError( - "This function was not imported properly.") + "The remote function failed to import on the " + "worker. This may be because needed library " + "dependencies are not installed in the worker " + "environment:\n\n{}".format(traceback_str)) # Use a placeholder method when function pickled failed self._function_execution_info[function_id] = ( @@ -203,11 +210,10 @@ class FunctionActorManager: function=f, function_name=function_name, max_calls=max_calls)) - # If an exception was thrown when the remote function was - # imported, we record the traceback and notify the scheduler - # of the failure. - traceback_str = format_error_message(traceback.format_exc()) - # Log the error message. + + # Log the error message. Log at DEBUG level to avoid overly + # spamming the log on import failure. The user gets the error + # via the RuntimeError message above. logger.debug("Failed to unpickle the remote function " f"'{function_name}' with " f"function ID {function_id.hex()}. " @@ -471,14 +477,17 @@ class FunctionActorManager: else: return None - def _create_fake_actor_class(self, actor_class_name, actor_method_names): + def _create_fake_actor_class(self, actor_class_name, actor_method_names, + traceback_str): class TemporaryActor: pass def temporary_actor_method(*args, **kwargs): - raise RuntimeError(f"The actor with name {actor_class_name} " - "failed to be imported, " - "and so cannot execute this method.") + raise RuntimeError( + f"The actor with name {actor_class_name} " + "failed to import on the worker. This may be because " + "needed library dependencies are not installed in the " + f"worker environment:\n\n{traceback_str}") for method in actor_method_names: setattr(TemporaryActor, method, temporary_actor_method) @@ -521,27 +530,15 @@ class FunctionActorManager: with self.lock: actor_class = pickle.loads(pickled_class) except Exception: - logger.exception("Failed to load actor class %s.", class_name) + logger.debug("Failed to load actor class %s.", class_name) + # If an exception was thrown when the actor was imported, we record + # the traceback and notify the scheduler of the failure. + traceback_str = format_error_message(traceback.format_exc()) # The actor class failed to be unpickled, create a fake actor # class instead (just to produce error messages and to prevent # the driver from hanging). actor_class = self._create_fake_actor_class( - class_name, actor_method_names) - # If an exception was thrown when the actor was imported, we record - # the traceback and notify the scheduler of the failure. - traceback_str = ray._private.utils.format_error_message( - traceback.format_exc()) - # Log the error message. - push_error_to_driver( - self._worker, - ray_constants.REGISTER_ACTOR_PUSH_ERROR, - f"Failed to unpickle actor class '{class_name}' " - f"for actor ID {self._worker.actor_id.hex()}. " - f"Traceback:\n{traceback_str}", - job_id=job_id) - # TODO(rkn): In the future, it might make sense to have the worker - # exit here. However, currently that would lead to hanging if - # someone calls ray.get on a method invoked on the actor. + class_name, actor_method_names, traceback_str) # The below line is necessary. Because in the driver process, # if the function is defined in the file where the python script diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 7ea819f8a..c4ef87f87 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -132,7 +132,6 @@ REGISTER_REMOTE_FUNCTION_PUSH_ERROR = "register_remote_function" FUNCTION_TO_RUN_PUSH_ERROR = "function_to_run" VERSION_MISMATCH_PUSH_ERROR = "version_mismatch" CHECKPOINT_PUSH_ERROR = "checkpoint" -REGISTER_ACTOR_PUSH_ERROR = "register_actor" WORKER_CRASH_PUSH_ERROR = "worker_crash" WORKER_DIED_PUSH_ERROR = "worker_died" WORKER_POOL_LARGE_ERROR = "worker_pool_large" diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 18553f40f..e118fe72b 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1,6 +1,5 @@ import os import sys -import tempfile import time import numpy as np @@ -129,62 +128,6 @@ def test_failed_function_to_run(ray_start_2_cpus, error_pubsub): assert "Function to run failed." in errors[1].error_message -def test_fail_importing_actor(ray_start_regular, error_pubsub): - p = error_pubsub - # Create the contents of a temporary Python file. - temporary_python_file = """ -def temporary_helper_function(): - return 1 -""" - - f = tempfile.NamedTemporaryFile(suffix=".py") - f.write(temporary_python_file.encode("ascii")) - f.flush() - directory = os.path.dirname(f.name) - # Get the module name and strip ".py" from the end. - module_name = os.path.basename(f.name)[:-3] - sys.path.append(directory) - module = __import__(module_name) - - # Define an actor that closes over this temporary module. This should - # fail when it is unpickled. - @ray.remote - class Foo: - def __init__(self, arg1, arg2=3): - self.x = module.temporary_python_file() - - def get_val(self, arg1, arg2=3): - return 1 - - # There should be no errors yet. - errors = get_error_message(p, 2) - assert len(errors) == 0 - # Create an actor. - foo = Foo.remote(3, arg2=0) - - errors = get_error_message(p, 2) - assert len(errors) == 2 - - for error in errors: - # Wait for the error to arrive. - if error.type == ray_constants.REGISTER_ACTOR_PUSH_ERROR: - assert "No module named" in error.error_message - else: - # Wait for the error from when the __init__ tries to run. - assert ("failed to be imported, and so cannot execute this method" - in error.error_message) - - # Check that if we try to get the function it throws an exception and - # does not hang. - with pytest.raises(Exception, match="failed to be imported"): - ray.get(foo.get_val.remote(1, arg2=2)) - - f.close() - - # Clean up the junk we added to sys.path. - sys.path.pop(-1) - - def test_failed_actor_init(ray_start_regular, error_pubsub): p = error_pubsub error_message1 = "actor constructor failed" diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index f32905852..f084e0552 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -8,6 +8,87 @@ import ray from ray._private.test_utils import run_string_as_driver_nonblocking +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") +def test_fail_importing_actor(ray_start_regular, error_pubsub): + script = """ +import os +import sys +import tempfile +import ray + +ray.init() +temporary_python_file = ''' +def temporary_helper_function(): + return 1 +''' + +f = tempfile.NamedTemporaryFile(suffix=".py") +f.write(temporary_python_file.encode("ascii")) +f.flush() +directory = os.path.dirname(f.name) +# Get the module name and strip ".py" from the end. +module_name = os.path.basename(f.name)[:-3] +sys.path.append(directory) +module = __import__(module_name) + +# Define an actor that closes over this temporary module. This should +# fail when it is unpickled. +@ray.remote +class Foo: + def __init__(self): + self.x = module.temporary_python_file() + +a = Foo.remote() +""" + proc = run_string_as_driver_nonblocking(script) + out_str = proc.stdout.read().decode("ascii") + err_str = proc.stderr.read().decode("ascii") + print(out_str) + print(err_str) + assert "ModuleNotFoundError: No module named" in err_str + assert "RuntimeError: The actor with name Foo failed to import" in err_str + + +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") +def test_fail_importing_task(ray_start_regular, error_pubsub): + script = """ +import os +import sys +import tempfile +import ray + +ray.init() +temporary_python_file = ''' +def temporary_helper_function(): + return 1 +''' + +f = tempfile.NamedTemporaryFile(suffix=".py") +f.write(temporary_python_file.encode("ascii")) +f.flush() +directory = os.path.dirname(f.name) +# Get the module name and strip ".py" from the end. +module_name = os.path.basename(f.name)[:-3] +sys.path.append(directory) +module = __import__(module_name) + +# Define an actor that closes over this temporary module. This should +# fail when it is unpickled. +@ray.remote +def foo(): + return module.temporary_python_file() + +ray.get(foo.remote()) +""" + proc = run_string_as_driver_nonblocking(script) + out_str = proc.stdout.read().decode("ascii") + err_str = proc.stderr.read().decode("ascii") + print(out_str) + print(err_str) + assert "ModuleNotFoundError: No module named" in err_str + assert "RuntimeError: The remote function failed to import" in err_str + + @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_worker_stdout(): script = """