mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Remote function returning non-serializable type no longer shuts worker down (#384)
* Moved put_objects in main_loop to inside of try block * Added test for failed serialization * Fixed naming * Minor
This commit is contained in:
parent
43452b9ab3
commit
420bcc0477
3 changed files with 21 additions and 2 deletions
|
@ -964,6 +964,7 @@ def main_loop(worker=global_worker):
|
|||
outputs = worker.functions[function_name].executor(arguments) # execute the function
|
||||
if len(return_objectids) == 1:
|
||||
outputs = (outputs,)
|
||||
store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store
|
||||
except Exception as e:
|
||||
# If the task threw an exception, then record the traceback. We determine
|
||||
# whether the exception was thrown in the task execution by whether the
|
||||
|
@ -975,8 +976,6 @@ def main_loop(worker=global_worker):
|
|||
# Notify the scheduler that the task failed.
|
||||
raylib.notify_failure(worker.handle, function_name, str(failure_object), raylib.FailedTask)
|
||||
_logger().info("While running function {}, worker threw exception with message: \n\n{}\n".format(function_name, str(failure_object)))
|
||||
else:
|
||||
store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store
|
||||
# Notify the scheduler that the task is done. This happens regardless of
|
||||
# whether the task succeeded or failed.
|
||||
raylib.ready_for_new_task(worker.handle)
|
||||
|
|
|
@ -34,6 +34,18 @@ class FailureTest(unittest.TestCase):
|
|||
|
||||
ray.worker.cleanup()
|
||||
|
||||
def testUnknownSerialization(self):
|
||||
reload(test_functions)
|
||||
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
|
||||
|
||||
test_functions.test_unknown_type.remote()
|
||||
time.sleep(0.2)
|
||||
task_info = ray.task_info()
|
||||
self.assertEqual(len(task_info["failed_tasks"]), 1)
|
||||
self.assertEqual(len(task_info["running_tasks"]), 0)
|
||||
|
||||
ray.worker.cleanup()
|
||||
|
||||
class TaskStatusTest(unittest.TestCase):
|
||||
def testFailedTask(self):
|
||||
reload(test_functions)
|
||||
|
|
|
@ -116,3 +116,11 @@ def test_return1():
|
|||
@ray.remote([], [int, float])
|
||||
def test_return2():
|
||||
return 2.0, 3.0
|
||||
|
||||
class TestClass(object):
|
||||
def __init__(self):
|
||||
self.a = 5
|
||||
|
||||
@ray.remote([], [TestClass])
|
||||
def test_unknown_type():
|
||||
return TestClass()
|
||||
|
|
Loading…
Add table
Reference in a new issue