diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 199892201..e2695f158 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -172,6 +172,55 @@ print("success") assert "success" in out +def test_drivers_named_actors(call_ray_start): + # This test will create some drivers that submit some tasks to the same + # named actor. + address = call_ray_start + + ray.init(address=address) + + # Define a driver that creates a named actor then sleeps for a while. + driver_script1 = """ +import ray +import time +ray.init(address="{}") +@ray.remote +class Counter(object): + def __init__(self): + self.count = 0 + def increment(self): + self.count += 1 + return self.count +counter = Counter.remote() +ray.experimental.register_actor("Counter", counter) +time.sleep(100) +""".format(address) + + # Define a driver that submits to the named actor and exits. + driver_script2 = """ +import ray +import time +ray.init(address="{}") +while True: + try: + counter = ray.experimental.get_actor("Counter") + break + except ValueError: + time.sleep(1) +assert ray.get(counter.increment.remote()) == {} +print("success") +""".format(address, "{}") + + process_handle = run_string_as_driver_nonblocking(driver_script1) + + for i in range(3): + driver_script = driver_script2.format(i + 1) + out = run_string_as_driver(driver_script) + assert "success" in out + + process_handle.kill() + + def test_receive_late_worker_logs(): # Make sure that log messages from tasks appear in the stdout even if the # script exits quickly. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b4fbe4fbd..7b3927efe 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -130,6 +130,7 @@ CoreWorker::CoreWorker( data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage()); RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr)); worker_context_.SetCurrentTaskId(task_id); + SetCurrentTaskId(task_id); } direct_actor_submitter_ = std::unique_ptr(