Mirror of https://github.com/vale981/ray, synced 2025-03-05 10:01:43 -05:00
[Core] Add more accurate worker exit (#24468)
This PR adds precise reason details regarding worker failures. All of the information is available either through `ray list workers` or through the exceptions raised by actor failures.

Here's an example when the actor is killed by a SIGKILL (e.g., by the OOM killer):

```
RayActorError: The actor died unexpectedly before finishing this task.
    class_name: G
    actor_id: e818d2f0521a334daf03540701000000
    pid: 61251
    namespace: 674a49b2-5b9b-4fcc-b6e1-5a1d4b9400d2
    ip: 127.0.0.1
The actor is dead because its worker process has died.
Worker exit type: UNEXPECTED_SYSTEM_EXIT
Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
```

## Design

Worker failures are reported by the Raylet from two paths: (1) when the core worker calls `Disconnect`, and (2) when the worker is killed unexpectedly, the socket is closed and the raylet reports the worker failure. This PR ensures that all worker failures are reported through `Disconnect`, which now carries more detailed information in its metadata.

## Exit types

Previously, the worker exit types were arbitrary and not correctly categorized. This PR reduces the number of worker exit types and attaches a detail string to each exit so that users can easily figure out the root cause of worker crashes.

### Status quo

- SYSTEM ERROR EXIT
  - Failure from the connection (core worker dead)
  - Unexpected exception or exit with exit_code != 0 on the core worker
  - Direct call failure
- INTENDED EXIT
  - Shutdown driver
  - exit_actor
  - exit(0)
  - Actor kill request
  - Task cancel request
- UNUSED_RESOURCE_REMOVED
  - Upon GCS restart, bundles that are not registered to GCS are killed to synchronize the state
- PG_REMOVED
  - When a placement group is removed, all of its workers fate-share with it
- CREATION_TASK (INIT ERROR)
  - When the actor's init method raises an error
- IDLE
  - When a worker is idle and the number of workers exceeds the soft limit (by default, the number of CPUs)
- NODE DIED
  - Can only be detected when the node of the owner is dead (needs improvement)

### New proposal

Remove unnecessary states and add a "detail" field. We can categorize failures into four types:

- UNEXPECTED_SYSTEM_ERROR_EXIT: the worker crashed unexpectedly.
  - Failure from the connection (core worker dead)
  - Unexpected exception or exit with exit_code != 0 on the core worker
  - Node died
  - Direct call failure
- INTENDED_USER_EXIT: the worker was killed at the user's request. No new workflow is required; we just record the state correctly.
  - Shutdown driver
  - exit_actor
  - exit(0)
  - Actor kill request
  - Task cancel request
- INTENDED_SYSTEM_EXIT: the worker was killed by the system without an explicit user request.
  - Unused resource removed
  - Placement group removed
  - Idle
- ACTOR_INIT_FAILURE (CREATION_TASK_FAILED): when the actor's init method fails, the process fate-shares with the actor.
  - Actor init failed

## Limitation (Follow up)

Worker failures are not reported under the following circumstances:

- The worker fails before it registers its information to GCS (this usually stems from a critical system bug and is extremely uncommon).
- The node fails. In this case, we should track the Node ID -> Worker ID mapping at GCS and record the worker metadata when the node fails.

I will create issues to track these problems.
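To make the new taxonomy concrete, here is a minimal sketch of how the exit metadata surfaces to users. The `exit_type`/`exit_detail` fields and the `list_workers` state API are introduced by this PR (see the test file in this diff); the tiny actor and the single-node setup are illustrative only, and the failure report lands asynchronously, which is why the tests poll with `wait_for_condition`:

```python
import sys

import ray
from ray.experimental.state.api import list_workers

ray.init()


@ray.remote
class Exiter:
    def pid(self):
        import os

        return os.getpid()

    def exit(self):
        # A non-zero exit code is categorized as a SYSTEM_ERROR exit.
        sys.exit(4)


a = Exiter.remote()
pid = ray.get(a.pid.remote())
try:
    ray.get(a.exit.remote())
except ray.exceptions.RayActorError as e:
    # The RayActorError message now embeds the worker exit type and detail.
    print(e)

# The same information is queryable through the state API. Note that the
# failure is reported asynchronously, so real code should poll or retry.
for worker in list_workers().values():
    if worker["pid"] == pid:
        print(worker["exit_type"], worker["exit_detail"])
```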
This commit is contained in:
parent 0212f5e095
commit d89c8aa9f9
31 changed files with 655 additions and 183 deletions
@@ -102,7 +102,7 @@ std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
     return std::make_pair(ray::Status::OK(),
                           std::make_shared<msgpack::sbuffer>(std::move(result)));
   } catch (RayIntentionalSystemExitException &e) {
-    return std::make_pair(ray::Status::IntentionalSystemExit(), nullptr);
+    return std::make_pair(ray::Status::IntentionalSystemExit(""), nullptr);
   } catch (RayException &e) {
     return std::make_pair(ray::Status::NotFound(e.what()), nullptr);
   } catch (msgpack::type_error &e) {
@@ -251,7 +251,7 @@ Status TaskExecutor::ExecuteTask(
     RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
   } else {
     if (!status.ok()) {
-      return ray::Status::CreationTaskError();
+      return ray::Status::CreationTaskError("");
     }
   }
   return ray::Status::OK();
@@ -521,6 +521,7 @@ cdef execute_task(
                 if core_worker.current_actor_is_asyncio():
                     error = SystemExit(0)
                     error.is_ray_terminate = True
+                    error.ray_terminate_msg = "exit_actor() is called."
                     raise error

         function_descriptor = CFunctionDescriptorToPython(
@@ -770,6 +771,9 @@ cdef execute_task(
         if task_counter == execution_info.max_calls:
             exit = SystemExit(0)
             exit.is_ray_terminate = True
+            exit.ray_terminate_msg = (
+                "max_call has reached, "
+                f"max_calls: {execution_info.max_calls}")
             raise exit

 cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error):
@@ -815,6 +819,9 @@ cdef CRayStatus task_execution_handler(
                     (&creation_task_exception_pb_bytes)[0] = (
                         ray_error_to_memory_buf(e))
                     sys_exit.is_creation_task_error = True
+                    sys_exit.init_error_message = (
+                        "Exception raised from an actor init method. "
+                        f"Traceback: {str(e)}")
                 else:
                     traceback_str = traceback.format_exc() + (
                         "An unexpected internal error "
@@ -825,28 +832,32 @@ cdef CRayStatus task_execution_handler(
                         "worker_crash",
                         traceback_str,
                         job_id=None)
+                    sys_exit.unexpected_error_traceback = traceback_str
                 raise sys_exit
     except SystemExit as e:
         # Tell the core worker to exit as soon as the result objects
         # are processed.
         if hasattr(e, "is_ray_terminate"):
-            return CRayStatus.IntentionalSystemExit()
+            return CRayStatus.IntentionalSystemExit(e.ray_terminate_msg)
         elif hasattr(e, "is_creation_task_error"):
-            return CRayStatus.CreationTaskError()
-        elif e.code and e.code == 0:
+            return CRayStatus.CreationTaskError(e.init_error_message)
+        elif e.code is not None and e.code == 0:
            # This means the system exit was
            # normal based on the python convention.
            # https://docs.python.org/3/library/sys.html#sys.exit
-            return CRayStatus.IntentionalSystemExit()
+            return CRayStatus.IntentionalSystemExit(
+                f"Worker exits with an exit code {e.code}.")
         else:
-            msg = "SystemExit was raised from the worker."
+            msg = f"Worker exits with an exit code {e.code}."
             # In K8s, SIGTERM likely means we hit memory limits, so print
             # a more informative message there.
             if "KUBERNETES_SERVICE_HOST" in os.environ:
                 msg += (
                     " The worker may have exceeded K8s pod memory limits.")
+            if hasattr(e, "unexpected_error_traceback"):
+                msg += (f"\n {e.unexpected_error_traceback}")
             logger.exception(msg)
-            return CRayStatus.UnexpectedSystemExit()
+            return CRayStatus.UnexpectedSystemExit(msg)

     return CRayStatus.OK()
@@ -1295,6 +1295,7 @@ def exit_actor():
         # reduces log verbosity.
         exit = SystemExit(0)
         exit.is_ray_terminate = True
+        exit.ray_terminate_msg = "exit_actor() is called."
         raise exit
         assert False, "This process should have terminated."
     else:
@@ -58,6 +58,9 @@ class WorkerState:
     worker_id: str
     is_alive: str
     worker_type: str
+    exit_type: str
+    exit_detail: str
+    pid: str


 @dataclass(init=True)
@@ -88,13 +88,13 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
         CRayStatus Interrupted(const c_string &msg)

         @staticmethod
-        CRayStatus IntentionalSystemExit()
+        CRayStatus IntentionalSystemExit(const c_string &msg)

         @staticmethod
-        CRayStatus UnexpectedSystemExit()
+        CRayStatus UnexpectedSystemExit(const c_string &msg)

         @staticmethod
-        CRayStatus CreationTaskError()
+        CRayStatus CreationTaskError(const c_string &msg)

         @staticmethod
         CRayStatus NotFound()
@@ -225,6 +225,7 @@ py_test_module_list(
         "test_failure_3.py",
         "test_chaos.py",
         "test_reference_counting_2.py",
+        "test_exit_observability.py",
     ],
     size = "large",
     extra_srcs = SRCS,
python/ray/tests/test_exit_observability.py (new file, 311 lines)

@@ -0,0 +1,311 @@
import os
import sys
import signal

import ray

import pytest

from ray.experimental.state.api import list_workers
from ray._private.test_utils import wait_for_condition, run_string_as_driver
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


def get_worker_by_pid(pid):
    for w in list_workers().values():
        if w["pid"] == pid:
            return w
    assert False


@pytest.mark.skipif(
    sys.platform == "win32",
    reason="Failed on Windows",
)
def test_worker_exit_system_error(ray_start_cluster):
    """
    SYSTEM_ERROR
    - (tested) Failure from the connection. E.g., core worker dead.
    - (tested) Unexpected exception or exit with exit_code != 0 on core worker.
    - (tested for owner node death) Node died. Currently worker failure
        upon node death is not detected by Ray. TODO(sang): Fix it.
    - (Cannot test) Direct call failure.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)
    ray.init(address=cluster.address)
    cluster.add_node(num_cpus=1, resources={"worker": 1})

    @ray.remote
    class Actor:
        def pid(self):
            import os

            return os.getpid()

        def exit(self, exit_code):
            sys.exit(exit_code)

    """
    Failure from the connection
    """
    a = Actor.remote()
    pid = ray.get(a.pid.remote())
    print(pid)
    os.kill(pid, signal.SIGKILL)

    def verify_connection_failure():
        worker = get_worker_by_pid(pid)
        print(worker)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        # If the worker is killed by SIGKILL, it is highly likely by OOM, so
        # the error message should contain information.
        return type == "SYSTEM_ERROR" and "OOM" in detail

    wait_for_condition(verify_connection_failure)

    """
    Unexpected exception or exit with exit_code != 0 on core worker.
    """
    a = Actor.remote()
    pid = ray.get(a.pid.remote())
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(a.exit.remote(4))

    def verify_exit_failure():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        # If the worker exits with a non-zero exit code, the error
        # message should contain the exit code.
        return type == "SYSTEM_ERROR" and "exit code 4" in detail

    wait_for_condition(verify_exit_failure)


@pytest.mark.skipif(
    sys.platform == "win32",
    reason="Failed on Windows",
)
def test_worker_exit_intended_user_exit(ray_start_cluster):
    """
    INTENDED_USER_EXIT
    - (tested) Shutdown driver
    - (tested) exit_actor
    - (tested) exit(0)
    - (tested) Actor kill request
    - (tested) Task cancel request
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)
    ray.init(address=cluster.address)
    cluster.add_node(num_cpus=1, resources={"worker": 1})

    driver = """
import ray
import os
ray.init(address="{address}")
print(os.getpid())
ray.shutdown()
""".format(
        address=cluster.address
    )
    a = run_string_as_driver(driver)
    driver_pid = int(a.strip("\n"))

    def verify_worker_exit_by_shutdown():
        worker = get_worker_by_pid(driver_pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return type == "INTENDED_USER_EXIT" and "ray.shutdown()" in detail

    wait_for_condition(verify_worker_exit_by_shutdown)

    @ray.remote
    class A:
        def pid(self):
            return os.getpid()

        def exit(self):
            ray.actor.exit_actor()

        def exit_with_exit_code(self):
            sys.exit(0)

    a = A.remote()
    pid = ray.get(a.pid.remote())
    with pytest.raises(ray.exceptions.RayActorError, match="exit_actor"):
        ray.get(a.exit.remote())

    def verify_worker_exit_actor():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return type == "INTENDED_USER_EXIT" and "exit_actor" in detail

    wait_for_condition(verify_worker_exit_actor)

    a = A.remote()
    pid = ray.get(a.pid.remote())
    with pytest.raises(ray.exceptions.RayActorError, match="exit code 0"):
        ray.get(a.exit_with_exit_code.remote())

    def verify_exit_code_0():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return type == "INTENDED_USER_EXIT" and "exit code 0" in detail

    wait_for_condition(verify_exit_code_0)

    a = A.remote()
    pid = ray.get(a.pid.remote())
    ray.kill(a)
    with pytest.raises(ray.exceptions.RayActorError, match="ray.kill"):
        ray.get(a.exit_with_exit_code.remote())

    def verify_exit_by_ray_kill():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return type == "INTENDED_SYSTEM_EXIT" and "ray.kill" in detail

    wait_for_condition(verify_exit_by_ray_kill)

    @ray.remote
    class PidDB:
        def __init__(self):
            self.pid = None

        def record_pid(self, pid):
            self.pid = pid

        def get_pid(self):
            return self.pid

    p = PidDB.remote()

    @ray.remote
    def f():
        ray.get(p.record_pid.remote(os.getpid()))
        import time

        time.sleep(300)

    t = f.remote()
    wait_for_condition(lambda: ray.get(p.get_pid.remote()) is not None, timeout=300)
    ray.cancel(t, force=True)
    pid = ray.get(p.get_pid.remote())

    def verify_exit_by_ray_cancel():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return type == "INTENDED_USER_EXIT" and "ray.cancel" in detail

    wait_for_condition(verify_exit_by_ray_cancel)


@pytest.mark.skipif(
    sys.platform == "win32",
    reason="Failed on Windows",
)
def test_worker_exit_intended_system_exit_and_user_error(ray_start_cluster):
    """
    INTENDED_SYSTEM_EXIT
    - (not tested, hard to test) Unused resource removed
    - (tested) Pg removed
    - (tested) Idle
    USER_ERROR
    - (tested) Actor init failed
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1)
    ray.init(address=cluster.address)

    @ray.remote
    def f():
        return ray.get(g.remote())

    @ray.remote
    def g():
        return os.getpid()

    # Start a task that has a blocking call ray.get with g.remote.
    # g.remote will borrow the CPU and start a new worker.
    # The worker started for g.remote will exit by IDLE timeout.
    pid = ray.get(f.remote())

    def verify_exit_by_idle_timeout():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return type == "INTENDED_SYSTEM_EXIT" and "it was idle" in detail

    wait_for_condition(verify_exit_by_idle_timeout)

    @ray.remote
    class A:
        def getpid(self):
            return os.getpid()

    pg = ray.util.placement_group(bundles=[{"CPU": 1}])
    a = A.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote()
    pid = ray.get(a.getpid.remote())
    ray.util.remove_placement_group(pg)

    def verify_exit_by_pg_removed():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        return (
            type == "INTENDED_SYSTEM_EXIT" and "placement group was removed" in detail
        )

    wait_for_condition(verify_exit_by_pg_removed)

    @ray.remote
    class PidDB:
        def __init__(self):
            self.pid = None

        def record_pid(self, pid):
            self.pid = pid

        def get_pid(self):
            return self.pid

    p = PidDB.remote()

    @ray.remote
    class FaultyActor:
        def __init__(self):
            p.record_pid.remote(os.getpid())
            raise Exception

        def ready(self):
            pass

    a = FaultyActor.remote()
    wait_for_condition(lambda: ray.get(p.get_pid.remote()) is not None)
    pid = ray.get(p.get_pid.remote())

    def verify_exit_by_actor_init_failure():
        worker = get_worker_by_pid(pid)
        type = worker["exit_type"]
        detail = worker["exit_detail"]
        print(type, detail)
        return (
            type == "USER_ERROR" and "exception in the initialization method" in detail
        )

    wait_for_condition(verify_exit_by_actor_init_failure)


if __name__ == "__main__":
    import pytest

    sys.exit(pytest.main(["-v", __file__]))
@@ -8,7 +8,10 @@ import numpy as np
 import pytest
 import time

-from ray._private.test_utils import SignalActor, wait_for_pid_to_exit
+from ray._private.test_utils import (
+    SignalActor,
+    wait_for_pid_to_exit,
+)

 SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@@ -360,14 +360,12 @@ std::shared_ptr<ClientConnection> ClientConnection::Create(
     local_stream_socket &&socket,
     const std::string &debug_label,
     const std::vector<std::string> &message_type_enum_names,
-    int64_t error_message_type,
-    const std::vector<uint8_t> &error_message_data) {
+    int64_t error_message_type) {
   std::shared_ptr<ClientConnection> self(new ClientConnection(message_handler,
                                                               std::move(socket),
                                                               debug_label,
                                                               message_type_enum_names,
-                                                              error_message_type,
-                                                              error_message_data));
+                                                              error_message_type));
   // Let our manager process our new connection.
   client_handler(*self);
   return self;
@@ -378,15 +376,13 @@ ClientConnection::ClientConnection(
     local_stream_socket &&socket,
     const std::string &debug_label,
     const std::vector<std::string> &message_type_enum_names,
-    int64_t error_message_type,
-    const std::vector<uint8_t> &error_message_data)
+    int64_t error_message_type)
     : ServerConnection(std::move(socket)),
       registered_(false),
      message_handler_(message_handler),
      debug_label_(debug_label),
      message_type_enum_names_(message_type_enum_names),
-      error_message_type_(error_message_type),
-      error_message_data_(error_message_data) {}
+      error_message_type_(error_message_type) {}

 void ClientConnection::Register() {
   RAY_CHECK(!registered_);
@@ -427,9 +423,6 @@ void ClientConnection::ProcessMessages() {

 void ClientConnection::ProcessMessageHeader(const boost::system::error_code &error) {
   if (error) {
-    // If there was an error, disconnect the client.
-    read_type_ = error_message_type_;
-    read_message_ = error_message_data_;
     read_length_ = 0;
     ProcessMessage(error);
     return;
@@ -502,7 +495,24 @@ std::string ClientConnection::RemoteEndpointInfo() {

 void ClientConnection::ProcessMessage(const boost::system::error_code &error) {
   if (error) {
+    flatbuffers::FlatBufferBuilder fbb;
+    const auto &disconnect_detail = fbb.CreateString(absl::StrCat(
+        "Worker unexpectedly exits with a connection error code ",
+        error.value(),
+        ". ",
+        error.message(),
+        ". There are some potential root causes. (1) The process is killed by "
+        "SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is "
+        "called. (3) The worker is crashed unexpectedly due to SIGSEGV or other "
+        "unexpected errors."));
+    protocol::DisconnectClientBuilder builder(fbb);
+    builder.add_disconnect_type(static_cast<int>(ray::rpc::WorkerExitType::SYSTEM_ERROR));
+    builder.add_disconnect_detail(disconnect_detail);
+    fbb.Finish(builder.Finish());
+    std::vector<uint8_t> error_data(fbb.GetBufferPointer(),
+                                    fbb.GetBufferPointer() + fbb.GetSize());
+    read_type_ = error_message_type_;
+    read_message_ = error_data;
   }

   int64_t start_ms = current_time_ms();
@@ -22,8 +22,10 @@
 #include <memory>

 #include "ray/common/asio/instrumented_io_context.h"
+#include "ray/common/common_protocol.h"
 #include "ray/common/id.h"
 #include "ray/common/status.h"
+#include "ray/raylet/format/node_manager_generated.h"

 namespace ray {
@@ -175,7 +177,6 @@ class ClientConnection;
 using ClientHandler = std::function<void(ClientConnection &)>;
 using MessageHandler = std::function<void(
     std::shared_ptr<ClientConnection>, int64_t, const std::vector<uint8_t> &)>;
-static std::vector<uint8_t> _dummy_error_message_data;

 /// \typename ClientConnection
 ///
@@ -196,7 +197,6 @@ class ClientConnection : public ServerConnection {
   /// \param message_type_enum_names A table of printable enum names for the
   /// message types received from this client, used for debug messages.
   /// \param error_message_type the type of error message
-  /// \param error_message_data the companion data to the error message type.
   /// \return std::shared_ptr<ClientConnection>.
   static std::shared_ptr<ClientConnection> Create(
       ClientHandler &new_client_handler,
@@ -204,8 +204,7 @@ class ClientConnection : public ServerConnection {
       local_stream_socket &&socket,
       const std::string &debug_label,
       const std::vector<std::string> &message_type_enum_names,
-      int64_t error_message_type,
-      const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);
+      int64_t error_message_type);

   std::shared_ptr<ClientConnection> shared_ClientConnection_from_this() {
     return std::static_pointer_cast<ClientConnection>(shared_from_this());
@@ -221,13 +220,11 @@ class ClientConnection : public ServerConnection {

  protected:
   /// A protected constructor for a node client connection.
-  ClientConnection(
-      MessageHandler &message_handler,
-      local_stream_socket &&socket,
-      const std::string &debug_label,
-      const std::vector<std::string> &message_type_enum_names,
-      int64_t error_message_type,
-      const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);
+  ClientConnection(MessageHandler &message_handler,
+                   local_stream_socket &&socket,
+                   const std::string &debug_label,
+                   const std::vector<std::string> &message_type_enum_names,
+                   int64_t error_message_type);
   /// Process an error from the last operation, then process the message
   /// header from the client.
   void ProcessMessageHeader(const boost::system::error_code &error);
@@ -257,8 +254,6 @@ class ClientConnection : public ServerConnection {
   const std::vector<std::string> message_type_enum_names_;
   /// The value for disconnect client message.
   int64_t error_message_type_;
-  /// The data for disconnect client message.
-  std::vector<uint8_t> error_message_data_;
   /// Buffers for the current message being read from the client.
   int64_t read_cookie_;
   int64_t read_type_;
@@ -171,17 +171,16 @@ class RAY_EXPORT Status {
     return Status(StatusCode::Interrupted, msg);
   }

-  static Status IntentionalSystemExit() {
-    return Status(StatusCode::IntentionalSystemExit, "intentional system exit");
+  static Status IntentionalSystemExit(const std::string &msg) {
+    return Status(StatusCode::IntentionalSystemExit, msg);
   }

-  static Status UnexpectedSystemExit() {
-    return Status(StatusCode::UnexpectedSystemExit, "user code caused exit");
+  static Status UnexpectedSystemExit(const std::string &msg) {
+    return Status(StatusCode::UnexpectedSystemExit, msg);
   }

-  static Status CreationTaskError() {
-    return Status(StatusCode::CreationTaskError,
-                  "error raised in creation task, cause worker to exit");
+  static Status CreationTaskError(const std::string &msg) {
+    return Status(StatusCode::CreationTaskError, msg);
   }

   static Status NotFound(const std::string &msg) {
@@ -24,6 +24,7 @@
 #include "ray/core_worker/context.h"
 #include "ray/core_worker/transport/direct_actor_transport.h"
 #include "ray/gcs/gcs_client/gcs_client.h"
+#include "ray/gcs/pb_util.h"
 #include "ray/stats/stats.h"
 #include "ray/util/event.h"
 #include "ray/util/util.h"
@@ -136,7 +137,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
     // Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
     RAY_LOG(ERROR) << "Failed to register worker " << worker_id << " to Raylet. "
                    << raylet_client_status;
-    // Quit the process immediately.
     QuickExit();
   }

@@ -332,7 +332,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
                                  GetCallerId(),
                                  rpc_address_);
     // Drivers are never re-executed.
-    SetCurrentTaskId(task_id, /*attempt_number=*/0);
+    SetCurrentTaskId(task_id, /*attempt_number=*/0, "driver");
   }

   auto raylet_client_factory = [this](const std::string ip_address, int port) {
@@ -576,20 +576,22 @@ void CoreWorker::ConnectToRaylet() {
 }

 void CoreWorker::Disconnect(
-    rpc::WorkerExitType exit_type,
+    const rpc::WorkerExitType &exit_type,
+    const std::string &exit_detail,
     const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
   if (connected_) {
     RAY_LOG(INFO) << "Disconnecting to the raylet.";
     connected_ = false;
     if (local_raylet_client_) {
-      RAY_IGNORE_EXPR(
-          local_raylet_client_->Disconnect(exit_type, creation_task_exception_pb_bytes));
+      RAY_IGNORE_EXPR(local_raylet_client_->Disconnect(
+          exit_type, exit_detail, creation_task_exception_pb_bytes));
     }
   }
 }

 void CoreWorker::Exit(
-    rpc::WorkerExitType exit_type,
+    const rpc::WorkerExitType exit_type,
+    const std::string &detail,
     const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
   RAY_LOG(INFO) << "Exit signal received, this process will exit after all outstanding "
                    "tasks have finished"
@@ -604,20 +606,17 @@ void CoreWorker::Exit(
   /// otherwise the frontend code may not release its references and this worker will be
   /// leaked. See https://github.com/ray-project/ray/issues/19639.
   reference_counter_->ReleaseAllLocalReferences();

   // Callback to shutdown.
-  auto shutdown = [this, exit_type, creation_task_exception_pb_bytes]() {
+  auto shutdown = [this, exit_type, detail, creation_task_exception_pb_bytes]() {
     // To avoid problems, make sure shutdown is always called from the same
     // event loop each time.
     task_execution_service_.post(
-        [this, exit_type, creation_task_exception_pb_bytes]() {
-          if (exit_type == rpc::WorkerExitType::CREATION_TASK_ERROR ||
-              exit_type == rpc::WorkerExitType::INTENDED_EXIT ||
-              exit_type == rpc::WorkerExitType::IDLE_EXIT) {
-            // Notify the raylet about this exit.
-            // Only CREATION_TASK_ERROR and INTENDED_EXIT needs to disconnect
-            // manually.
-            Disconnect(exit_type, creation_task_exception_pb_bytes);
-          }
+        [this,
+         exit_type,
+         detail = std::move(detail),
+         creation_task_exception_pb_bytes]() {
+          Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
           Shutdown();
         },
         "CoreWorker.Shutdown");
@@ -656,6 +655,18 @@ void CoreWorker::Exit(

   task_manager_->DrainAndShutdown(drain_references_callback);
 }

+void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
+                           const std::string &detail) {
+  RAY_LOG(WARNING) << "Force exit the process. "
+                   << " Details: " << detail;
+  Disconnect(exit_type, detail);
+  // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
+  // `exit()` will destruct static objects in an incorrect order, which will lead to
+  // core dumps.
+  QuickExit();
+}
+
 void CoreWorker::RunIOService() {
 #ifndef _WIN32
   // Block SIGINT and SIGTERM so they will be handled by the main thread.
@@ -682,11 +693,14 @@ void CoreWorker::OnNodeRemoved(const NodeID &node_id) {

 const WorkerID &CoreWorker::GetWorkerID() const { return worker_context_.GetWorkerID(); }

-void CoreWorker::SetCurrentTaskId(const TaskID &task_id, uint64_t attempt_number) {
+void CoreWorker::SetCurrentTaskId(const TaskID &task_id,
+                                  uint64_t attempt_number,
+                                  const std::string &task_name) {
   worker_context_.SetCurrentTaskId(task_id, attempt_number);
   {
     absl::MutexLock lock(&mutex_);
     main_thread_task_id_ = task_id;
+    main_thread_task_name_ = task_name;
   }
 }

@@ -720,6 +734,7 @@ void CoreWorker::RegisterToGcs() {
   worker_data->set_worker_type(options_.worker_type);
   worker_data->mutable_worker_info()->insert(worker_info.begin(), worker_info.end());
   worker_data->set_is_alive(true);
+  worker_data->set_pid(getpid());

   RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
 }
@@ -2256,7 +2271,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,

   if (!options_.is_local_mode) {
     worker_context_.SetCurrentTask(task_spec);
-    SetCurrentTaskId(task_spec.TaskId(), task_spec.AttemptNumber());
+    SetCurrentTaskId(task_spec.TaskId(), task_spec.AttemptNumber(), task_spec.GetName());
   }
   {
     absl::MutexLock lock(&mutex_);
@@ -2358,7 +2373,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
   }

   if (!options_.is_local_mode) {
-    SetCurrentTaskId(TaskID::Nil(), /*attempt_number=*/0);
+    SetCurrentTaskId(TaskID::Nil(), /*attempt_number=*/0, "");
     worker_context_.ResetCurrentTask();
   }
   {
@@ -2380,12 +2395,24 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,

   RAY_LOG(DEBUG) << "Finished executing task " << task_spec.TaskId()
                  << ", status=" << status;

   std::ostringstream stream;
   if (status.IsCreationTaskError()) {
-    Exit(rpc::WorkerExitType::CREATION_TASK_ERROR, creation_task_exception_pb_bytes);
+    Exit(rpc::WorkerExitType::USER_ERROR,
+         absl::StrCat(
+             "Worker exits because there was an exception in the initialization method "
+             "(e.g., __init__). Fix the exceptions from the initialization to resolve "
+             "the issue. ",
+             status.message()),
+         creation_task_exception_pb_bytes);
   } else if (status.IsIntentionalSystemExit()) {
-    Exit(rpc::WorkerExitType::INTENDED_EXIT, creation_task_exception_pb_bytes);
+    Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
+         absl::StrCat("Worker exits by an user request. ", status.message()),
+         creation_task_exception_pb_bytes);
   } else if (status.IsUnexpectedSystemExit()) {
-    Exit(rpc::WorkerExitType::SYSTEM_ERROR_EXIT, creation_task_exception_pb_bytes);
+    Exit(rpc::WorkerExitType::SYSTEM_ERROR,
+         absl::StrCat("Worker exits unexpectedly. ", status.message()),
+         creation_task_exception_pb_bytes);
   } else if (!status.ok()) {
     RAY_LOG(FATAL) << "Unexpected task status type : " << status;
   }
@@ -3027,7 +3054,8 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request,

   // Try non-force kill
   if (requested_task_running && !request.force_kill()) {
-    RAY_LOG(INFO) << "Cancelling a running task " << main_thread_task_id_;
+    RAY_LOG(INFO) << "Cancelling a running task " << main_thread_task_name_
+                  << " thread id: " << main_thread_task_id_;
     success = options_.kill_main();
   } else if (!requested_task_running) {
     RAY_LOG(INFO) << "Cancelling a task " << task_id
@@ -3053,14 +3081,10 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request,

   // Do force kill after reply callback sent
   if (requested_task_running && request.force_kill()) {
-    RAY_LOG(INFO) << "A task " << main_thread_task_id_
-                  << " has received a force kill request after the cancellation. Killing "
-                     "a worker...";
-    Disconnect();
-    // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
-    // `exit()` will destruct static objects in an incorrect order, which will lead to
-    // core dumps.
-    QuickExit();
+    ForceExit(rpc::WorkerExitType::INTENDED_USER_EXIT,
+              absl::StrCat("The worker exits because the task ",
+                           main_thread_task_name_,
+                           " has received a force ray.cancel request."));
   }
 }

@@ -3073,23 +3097,25 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
     stream << "Mismatched ActorID: ignoring KillActor for previous actor "
            << intended_actor_id
            << ", current actor ID: " << worker_context_.GetCurrentActorID();
-    auto msg = stream.str();
+    const auto &msg = stream.str();
     RAY_LOG(ERROR) << msg;
     send_reply_callback(Status::Invalid(msg), nullptr, nullptr);
     return;
   }

+  const auto &kill_actor_reason =
+      gcs::GenErrorMessageFromDeathCause(request.death_cause());
+
   if (request.force_kill()) {
-    RAY_LOG(INFO) << "Force kill actor request has received. exiting immediately...";
-    if (request.no_restart()) {
-      Disconnect();
-    }
-    // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
-    // `exit()` will destruct static objects in an incorrect order, which will lead to
-    // core dumps.
-    QuickExit();
+    RAY_LOG(INFO) << "Force kill actor request has received. exiting immediately... "
+                  << kill_actor_reason;
+    // If we don't need to restart this actor, we notify raylet before force killing it.
+    ForceExit(
+        rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+        absl::StrCat("Worker exits because the actor is killed. ", kill_actor_reason));
   } else {
-    Exit(rpc::WorkerExitType::INTENDED_EXIT);
+    Exit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+         absl::StrCat("Worker exits because the actor is killed. ", kill_actor_reason));
   }
 }

@@ -3242,11 +3268,17 @@ void CoreWorker::HandleExit(const rpc::ExitRequest &request,
       [this, is_idle]() {
         // If the worker is idle, we exit.
         if (is_idle) {
-          Exit(rpc::WorkerExitType::IDLE_EXIT);
+          Exit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+               "Worker exits because it was idle (it doesn't have objects it owns while "
+               "no task or actor has been scheduled) for a long time.");
         }
       },
       // We need to kill it regardless if the RPC failed.
-      [this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); });
+      [this]() {
+        Exit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+             "Worker exits because it was idle (it doesn't have objects it owns while "
+             "no task or actor has been scheduled) for a long time.");
+      });
 }

 void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request,
@@ -93,11 +93,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   void ConnectToRaylet();

   /// Gracefully disconnect the worker from Raylet.
-  /// If this function is called during shutdown, Raylet will treat it as an intentional
-  /// disconnect.
+  /// Once the method is returned, it is guaranteed that raylet is
+  /// notified that this worker is disconnected from a raylet.
+  ///
+  /// \param exit_type The reason why this worker process is disconnected.
+  /// \param exit_detail The detailed reason for a given exit.
+  /// \param creation_task_exception_pb_bytes It is given when the worker is
+  /// disconnected because the actor is failed due to its exception in its init method.
   /// \return Void.
-  void Disconnect(rpc::WorkerExitType exit_type = rpc::WorkerExitType::INTENDED_EXIT,
+  void Disconnect(const rpc::WorkerExitType &exit_type,
+                  const std::string &exit_detail,
                   const std::shared_ptr<LocalMemoryBuffer>
                       &creation_task_exception_pb_bytes = nullptr);

@@ -848,19 +853,33 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
                     int64_t depth,
                     const std::string &serialized_runtime_env_info,
                     const std::string &concurrency_group_name = "");
-  void SetCurrentTaskId(const TaskID &task_id, uint64_t attempt_number);
+  void SetCurrentTaskId(const TaskID &task_id,
+                        uint64_t attempt_number,
+                        const std::string &task_name);

   void SetActorId(const ActorID &actor_id);

   /// Run the io_service_ event loop. This should be called in a background thread.
   void RunIOService();

-  /// (WORKER mode only) Exit the worker. This is the entrypoint used to shutdown a
-  /// worker.
-  void Exit(rpc::WorkerExitType exit_type,
+  /// (WORKER mode only) Gracefully exit the worker. `Graceful` means the worker will
+  /// exit when it drains all tasks and cleans all owned objects.
+  ///
+  /// \param exit_type The reason why this worker process is disconnected.
+  /// \param exit_detail The detailed reason for a given exit.
+  /// \param creation_task_exception_pb_bytes It is given when the worker is
+  /// disconnected because the actor is failed due to its exception in its init method.
+  void Exit(const rpc::WorkerExitType exit_type,
+            const std::string &detail,
             const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes =
                 nullptr);

+  /// Forcefully exit the worker. `Force` means it will exit actor without draining
+  /// or cleaning any resources.
+  /// \param exit_type The reason why this worker process is disconnected.
+  /// \param exit_detail The detailed reason for a given exit.
+  void ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail);
+
   /// Register this worker or driver to GCS.
   void RegisterToGcs();

@@ -1065,6 +1084,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// worker context.
   TaskID main_thread_task_id_ GUARDED_BY(mutex_);

+  std::string main_thread_task_name_ GUARDED_BY(mutex_);
+
   /// Event loop where the IO events are handled. e.g. async GCS operations.
   instrumented_io_context io_service_;

@@ -301,7 +301,8 @@ void CoreWorkerProcessImpl::ShutdownDriver() {
       << "The `Shutdown` interface is for driver only.";
   auto global_worker = GetGlobalWorker();
   RAY_CHECK(global_worker);
-  global_worker->Disconnect();
+  global_worker->Disconnect(/*exit_type*/ rpc::WorkerExitType::INTENDED_USER_EXIT,
+                            /*exit_detail*/ "Shutdown by ray.shutdown().");
   global_worker->Shutdown();
   RemoveWorker(global_worker);
 }
@@ -168,10 +168,10 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env,
     Status status_to_return = Status::OK();
     if (env->IsInstanceOf(throwable,
                           java_ray_intentional_system_exit_exception_class)) {
-      status_to_return = Status::IntentionalSystemExit();
+      status_to_return = Status::IntentionalSystemExit("");
     } else if (env->IsInstanceOf(throwable, java_ray_actor_exception_class)) {
       creation_task_exception_pb = SerializeActorCreationException(env, throwable);
-      status_to_return = Status::CreationTaskError();
+      status_to_return = Status::CreationTaskError("");
     } else {
       RAY_LOG(ERROR) << "Unkown java exception was thrown while executing tasks.";
     }
@@ -55,19 +55,23 @@ const ray::rpc::ActorDeathCause GenNodeDiedCause(const ray::gcs::GcsActor *actor
 const ray::rpc::ActorDeathCause GenWorkerDiedCause(
     const ray::gcs::GcsActor *actor,
     const std::string &ip_address,
-    const ray::rpc::WorkerExitType &disconnect_type) {
+    const ray::rpc::WorkerExitType &disconnect_type,
+    const std::string &disconnect_detail) {
   ray::rpc::ActorDeathCause death_cause;
   auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
   AddActorInfo(actor, actor_died_error_ctx);
   actor_died_error_ctx->set_error_message(absl::StrCat(
       "The actor is dead because its worker process has died. Worker exit type: ",
-      ray::rpc::WorkerExitType_Name(disconnect_type)));
+      ray::rpc::WorkerExitType_Name(disconnect_type),
+      " Worker exit detail: ",
+      disconnect_detail));
   return death_cause;
 }
 const ray::rpc::ActorDeathCause GenOwnerDiedCause(
     const ray::gcs::GcsActor *actor,
     const WorkerID &owner_id,
     const ray::rpc::WorkerExitType disconnect_type,
+    const std::string &disconnect_detail,
     const std::string owner_ip_address) {
   ray::rpc::ActorDeathCause death_cause;
   auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
@@ -78,7 +82,9 @@ const ray::rpc::ActorDeathCause GenOwnerDiedCause(
       " Owner Ip address: ",
       owner_ip_address,
       " Owner worker exit type: ",
-      ray::rpc::WorkerExitType_Name(disconnect_type)));
+      ray::rpc::WorkerExitType_Name(disconnect_type),
+      " Worker exit detail: ",
+      disconnect_detail));
   return death_cause;
 }

@@ -741,7 +747,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
   if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
     // The actor has already been created. Destroy the process by force-killing
     // it.
-    NotifyCoreWorkerToKillActor(actor, force_kill);
+    NotifyCoreWorkerToKillActor(actor, death_cause, force_kill);
     RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
     if (node_it->second.empty()) {
       created_actors_.erase(node_it);
@@ -806,13 +812,18 @@ absl::flat_hash_set<ActorID> GcsActorManager::GetUnresolvedActorsByOwnerWorker(

 void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
                                    const ray::WorkerID &worker_id) {
-  OnWorkerDead(node_id, worker_id, "", rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
+  OnWorkerDead(node_id,
+               worker_id,
+               "",
+               rpc::WorkerExitType::SYSTEM_ERROR,
+               "Worker exits unexpectedly.");
 }

 void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
                                    const ray::WorkerID &worker_id,
                                    const std::string &worker_ip,
                                    const rpc::WorkerExitType disconnect_type,
+                                   const std::string &disconnect_detail,
                                    const rpc::RayException *creation_task_exception) {
   std::string message = absl::StrCat("Worker ",
                                      worker_id.Hex(),
@@ -822,15 +833,15 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
                                      rpc::WorkerExitType_Name(disconnect_type),
                                      ", has creation_task_exception = ",
                                      (creation_task_exception != nullptr));
-  if (disconnect_type == rpc::WorkerExitType::INTENDED_EXIT ||
-      disconnect_type == rpc::WorkerExitType::IDLE_EXIT) {
+  if (disconnect_type == rpc::WorkerExitType::INTENDED_USER_EXIT ||
+      disconnect_type == rpc::WorkerExitType::INTENDED_SYSTEM_EXIT) {
     RAY_LOG(DEBUG) << message;
   } else {
     RAY_LOG(WARNING) << message;
   }

-  bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_EXIT &&
-                          disconnect_type != rpc::WorkerExitType::CREATION_TASK_ERROR;
+  bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT &&
+                          disconnect_type != rpc::WorkerExitType::USER_ERROR;
   // Destroy all actors that are owned by this worker.
   const auto it = owners_.find(node_id);
   if (it != owners_.end() && it->second.count(worker_id)) {
@@ -839,9 +850,12 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
     // list.
     const auto children_ids = owner->second.children_actor_ids;
     for (const auto &child_id : children_ids) {
-      DestroyActor(
-          child_id,
-          GenOwnerDiedCause(GetActor(child_id), worker_id, disconnect_type, worker_ip));
+      DestroyActor(child_id,
+                   GenOwnerDiedCause(GetActor(child_id),
+                                     worker_id,
+                                     disconnect_type,
+                                     "Owner's worker process has crashed.",
+                                     worker_ip));
     }
   }

@@ -851,9 +865,12 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
   auto unresolved_actors = GetUnresolvedActorsByOwnerWorker(node_id, worker_id);
   for (auto &actor_id : unresolved_actors) {
     if (registered_actors_.count(actor_id)) {
-      DestroyActor(
-          actor_id,
-          GenOwnerDiedCause(GetActor(actor_id), worker_id, disconnect_type, worker_ip));
+      DestroyActor(actor_id,
+                   GenOwnerDiedCause(GetActor(actor_id),
+                                     worker_id,
+                                     disconnect_type,
+                                     "Owner's worker process has crashed.",
+                                     worker_ip));
     }
   }

@@ -887,7 +904,8 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
     death_cause.mutable_creation_task_failure_context()->CopyFrom(
         *creation_task_exception);
   } else {
-    death_cause = GenWorkerDiedCause(GetActor(actor_id), worker_ip, disconnect_type);
+    death_cause = GenWorkerDiedCause(
+        GetActor(actor_id), worker_ip, disconnect_type, disconnect_detail);
   }
   // Otherwise, try to reconstruct the actor that was already created or in the creation
   // process.
@@ -911,7 +929,8 @@ void GcsActorManager::OnNodeDead(const NodeID &node_id,
       DestroyActor(child_id,
                    GenOwnerDiedCause(GetActor(child_id),
                                      owner_id,
-                                     rpc::WorkerExitType::NODE_DIED,
+                                     rpc::WorkerExitType::SYSTEM_ERROR,
+                                     "Owner's node has crashed.",
                                      node_ip_address));
     }
   }
@@ -949,7 +968,8 @@ void GcsActorManager::OnNodeDead(const NodeID &node_id,
       DestroyActor(actor_id,
                    GenOwnerDiedCause(GetActor(actor_id),
                                      owner_id,
-                                     rpc::WorkerExitType::NODE_DIED,
+                                     rpc::WorkerExitType::SYSTEM_ERROR,
+                                     "Owner's node has crashed.",
                                      node_ip_address));
     }
   }
@@ -1317,10 +1337,12 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
 }

 void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
+                                                  const rpc::ActorDeathCause &death_cause,
                                                   bool force_kill,
                                                   bool no_restart) {
   rpc::KillActorRequest request;
   request.set_intended_actor_id(actor->GetActorID().Binary());
+  request.mutable_death_cause()->CopyFrom(death_cause);
   request.set_force_kill(force_kill);
   request.set_no_restart(no_restart);
   auto actor_client = worker_client_factory_(actor->GetAddress());
@@ -1355,7 +1377,8 @@ void GcsActorManager::KillActor(const ActorID &actor_id,
   if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
     // The actor has already been created. Destroy the process by force-killing
     // it.
-    NotifyCoreWorkerToKillActor(actor, force_kill, no_restart);
+    NotifyCoreWorkerToKillActor(
+        actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill, no_restart);
   } else {
     const auto &task_id = actor->GetCreationTaskSpecification().TaskId();
     RAY_LOG(DEBUG) << "The actor " << actor->GetActorID()
@@ -329,6 +329,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
                     const WorkerID &worker_id,
                     const std::string &worker_ip,
                     const rpc::WorkerExitType disconnect_type,
+                    const std::string &disconnect_detail,
                     const rpc::RayException *creation_task_exception = nullptr);

   /// Testing only.
@@ -453,9 +454,11 @@ class GcsActorManager : public rpc::ActorInfoHandler {
   /// Notify CoreWorker to kill the specified actor.
   ///
   /// \param actor The actor to be killed.
+  /// \param death_cause Context about why this actor is dead.
   /// \param force_kill Whether to force kill an actor by killing the worker.
   /// \param no_restart If set to true, the killed actor will not be restarted anymore.
   void NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
+                                   const rpc::ActorDeathCause &death_cause,
                                    bool force_kill = true,
                                    bool no_restart = true);

@@ -636,6 +636,7 @@ void GcsServer::InstallEventListeners() {
                                      worker_id,
                                      worker_ip,
                                      worker_failure_data->exit_type(),
+                                     worker_failure_data->exit_detail(),
                                      creation_task_exception);
       });

@@ -35,9 +35,10 @@ void GcsWorkerManager::HandleReportWorkerFailure(
       worker_address.ip_address(),
       ", exit_type = ",
       rpc::WorkerExitType_Name(request.worker_failure().exit_type()),
-      request.worker_failure().has_creation_task_exception());
-  if (request.worker_failure().exit_type() == rpc::WorkerExitType::INTENDED_EXIT ||
-      request.worker_failure().exit_type() == rpc::WorkerExitType::IDLE_EXIT) {
+      ", exit_detail = ",
+      request.worker_failure().exit_detail());
+  if (request.worker_failure().exit_type() == rpc::WorkerExitType::INTENDED_USER_EXIT ||
+      request.worker_failure().exit_type() == rpc::WorkerExitType::INTENDED_SYSTEM_EXIT) {
     RAY_LOG(DEBUG) << message;
   } else {
     RAY_LOG(WARNING) << message
@@ -108,6 +108,8 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
     int32_t port,
     int64_t timestamp,
     rpc::WorkerExitType disconnect_type,
+    const std::string &disconnect_detail,
+    int pid,
     const rpc::RayException *creation_task_exception = nullptr) {
   auto worker_failure_info_ptr = std::make_shared<ray::rpc::WorkerTableData>();
   worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(raylet_id.Binary());
@@ -116,6 +118,8 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
   worker_failure_info_ptr->mutable_worker_address()->set_port(port);
   worker_failure_info_ptr->set_timestamp(timestamp);
   worker_failure_info_ptr->set_exit_type(disconnect_type);
+  worker_failure_info_ptr->set_exit_detail(disconnect_detail);
+  worker_failure_info_ptr->set_pid(pid);
   if (creation_task_exception != nullptr) {
     // this pointer will be freed by protobuf internal codes
     auto copied_data = new rpc::RayException(*creation_task_exception);
@@ -185,6 +189,21 @@ inline rpc::RayErrorInfo GetErrorInfoFromActorDeathCause(
   return error_info;
 }

+/// Generate an error message string from an ActorDeathCause.
+inline std::string GenErrorMessageFromDeathCause(
+    const rpc::ActorDeathCause &death_cause) {
+  if (death_cause.context_case() == ContextCase::kCreationTaskFailureContext) {
+    return death_cause.creation_task_failure_context().formatted_exception_string();
+  } else if (death_cause.context_case() == ContextCase::kRuntimeEnvFailedContext) {
+    return death_cause.runtime_env_failed_context().error_message();
+  } else if (death_cause.context_case() == ContextCase::kActorUnschedulableContext) {
+    return death_cause.actor_unschedulable_context().error_message();
+  } else {
+    RAY_CHECK(death_cause.context_case() == ContextCase::kActorDiedErrorContext);
+    return death_cause.actor_died_error_context().error_message();
+  }
+}
+
 }  // namespace gcs

 }  // namespace ray
@@ -632,19 +632,16 @@ message MetricPoint {
 // Type of a worker exit.
 enum WorkerExitType {
   // Worker exit due to system level failures (i.e. worker crash).
-  SYSTEM_ERROR_EXIT = 0;
-  // Intended, initiated worker exit via raylet API.
-  INTENDED_EXIT = 1;
-  // Worker exit due to resource bundle release.
-  UNUSED_RESOURCE_RELEASED = 2;
-  // Worker exit due to placement group removal.
-  PLACEMENT_GROUP_REMOVED = 3;
-  // Worker exit due to exceptions in creation task.
-  CREATION_TASK_ERROR = 4;
-  // Worker killed by raylet if it has been idle for too long.
-  IDLE_EXIT = 5;
-  // Worker killed because its node is dead.
-  NODE_DIED = 6;
+  SYSTEM_ERROR = 0;
+  // System-level exit that is intended. E.g.,
+  // workers that are killed because they are idle for a long time.
+  INTENDED_SYSTEM_EXIT = 1;
+  // Worker exits because of user error.
+  // E.g., exceptions from the actor initialization.
+  USER_ERROR = 2;
+  // Intended exit from users (e.g., users exit workers with exit code 0
+  // or exit initiated by Ray API such as ray.kill).
+  INTENDED_USER_EXIT = 3;
 }

 ///////////////////////////////////////////////////////////////////////////////
@@ -230,6 +230,8 @@ message KillActorRequest {
   bool force_kill = 2;
   // If set to true, the killed actor will not be restarted anymore.
   bool no_restart = 3;
+  // The precise reason why this actor receives a kill request.
+  ActorDeathCause death_cause = 4;
 }

 message KillActorReply {
@@ -304,8 +304,6 @@ message WorkerTableData {
   Address worker_address = 2;
   // The UNIX timestamp at which this worker's state was updated.
   int64 timestamp = 3;
-  // Whether it's an intentional disconnect, only applies when `is_alive` is false.
-  WorkerExitType exit_type = 4;
   // Type of this worker.
   WorkerType worker_type = 5;
   // This is for AddWorker.

@@ -313,6 +311,12 @@ message WorkerTableData {
   // The exception thrown in creation task. This field is set if this worker died because
   // of exception thrown in actor's creation task. Only applies when is_alive=false.
   RayException creation_task_exception = 18;
+  // Whether it's an intentional disconnect, only applies when `is_alive` is false.
+  optional WorkerExitType exit_type = 19;
+  // The detailed message about worker exit.
+  optional string exit_detail = 20;
+  // pid of the worker process.
+  uint32 pid = 21;
 }

 // Fields to publish when worker fails.
@@ -93,6 +93,7 @@ table ResourceIdSetInfo {
 // This message is sent from a worker to the node manager.
 table DisconnectClient {
   disconnect_type: int;
+  disconnect_detail: string;
   // Creation task exception serialized by protobuf.
   // Contains a RayException defined in common.pb
   creation_task_exception_pb: [ubyte];
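On the receiving side the raylet decodes this table with the flatc-generated accessors, exactly as `ProcessDisconnectClientMessage` does in the node-manager hunk further down. A compact sketch, with includes and connection handling omitted:

```
// Sketch: reading the new disconnect_detail field out of a raw message buffer.
void HandleDisconnect(const uint8_t *message_data) {
  auto message = flatbuffers::GetRoot<protocol::DisconnectClient>(message_data);
  auto disconnect_type = static_cast<rpc::WorkerExitType>(message->disconnect_type());
  // The free-form detail now travels with every exit and is stored in GCS.
  const std::string disconnect_detail = message->disconnect_detail()->str();
  RAY_LOG(INFO) << "Disconnect type=" << rpc::WorkerExitType_Name(disconnect_type)
                << ", detail=" << disconnect_detail;
}
```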
@@ -578,11 +578,12 @@ void NodeManager::KillWorker(std::shared_ptr<WorkerInterface> worker) {
 }

 void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
-                                rpc::WorkerExitType disconnect_type) {
+                                rpc::WorkerExitType disconnect_type,
+                                const std::string &disconnect_detail) {
   // We should disconnect the client first. Otherwise, we'll remove bundle resources
   // before actual resources are returned. Subsequent disconnect request that comes
   // due to worker dead will be ignored.
-  DisconnectClient(worker->Connection(), disconnect_type);
+  DisconnectClient(worker->Connection(), disconnect_type, disconnect_detail);
   worker->MarkDead();
   KillWorker(worker);
 }
@@ -709,7 +710,10 @@ void NodeManager::HandleReleaseUnusedBundles(
                   << ", task id: " << worker->GetAssignedTaskId()
                   << ", actor id: " << worker->GetActorId()
                   << ", worker id: " << worker->WorkerId();
-    DestroyWorker(worker, rpc::WorkerExitType::UNUSED_RESOURCE_RELEASED);
+    DestroyWorker(worker,
+                  rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+                  "Worker exits because it uses placement group bundles that are not "
+                  "registered to GCS. It can happen upon GCS restart.");
   }

   // Return unused bundle resources.
@@ -1247,7 +1251,10 @@ void NodeManager::ProcessRegisterClientRequestMessage(
       fbb.GetBufferPointer(),
       [this, client](const ray::Status &status) {
         if (!status.ok()) {
-          DisconnectClient(client);
+          DisconnectClient(client,
+                           rpc::WorkerExitType::SYSTEM_ERROR,
+                           "Worker failed because the raylet couldn't reply to the "
+                           "registration request.");
         }
       });
 };
@@ -1351,6 +1358,7 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &

 void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
                                    rpc::WorkerExitType disconnect_type,
+                                   const std::string &disconnect_detail,
                                    const rpc::RayException *creation_task_exception) {
   RAY_LOG(INFO) << "NodeManager::DisconnectClient, disconnect_type=" << disconnect_type
                 << ", has creation task exception = "
@@ -1385,13 +1393,16 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
                   << ", worker_id: " << worker->WorkerId();
     }
     // Publish the worker failure.
-    auto worker_failure_data_ptr = gcs::CreateWorkerFailureData(self_node_id_,
-                                                                worker->WorkerId(),
-                                                                worker->IpAddress(),
-                                                                worker->Port(),
-                                                                time(nullptr),
-                                                                disconnect_type,
-                                                                creation_task_exception);
+    auto worker_failure_data_ptr =
+        gcs::CreateWorkerFailureData(self_node_id_,
+                                     worker->WorkerId(),
+                                     worker->IpAddress(),
+                                     worker->Port(),
+                                     time(nullptr),
+                                     disconnect_type,
+                                     disconnect_detail,
+                                     worker->GetProcess().GetId(),
+                                     creation_task_exception);
     RAY_CHECK_OK(
         gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
@@ -1408,7 +1419,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
       local_task_manager_->TaskFinished(worker, &task);
     }

-    if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
+    if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR) {
       // Push the error to driver.
       const JobID &job_id = worker->GetAssignedJobId();
       // TODO(rkn): Define this constant somewhere else.
@@ -1422,7 +1433,10 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
                     << " Node ID: " << self_node_id_
                     << " Worker IP address: " << worker->IpAddress()
                     << " Worker port: " << worker->Port()
-                    << " Worker PID: " << worker->GetProcess().GetId();
+                    << " Worker PID: " << worker->GetProcess().GetId()
+                    << " Worker exit type: "
+                    << rpc::WorkerExitType_Name(disconnect_type)
+                    << " Worker exit detail: " << disconnect_detail;
       std::string error_message_str = error_message.str();
       RAY_EVENT(ERROR, EL_RAY_WORKER_FAILURE)
           .WithField("worker_id", worker->WorkerId().Hex())
@@ -1455,7 +1469,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
     RAY_LOG(INFO) << "Driver (pid=" << worker->GetProcess().GetId()
                   << ") is disconnected. "
                   << "job_id: " << worker->GetAssignedJobId();
-    if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
+    if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR) {
       RAY_EVENT(ERROR, EL_RAY_DRIVER_FAILURE)
           .WithField("node_id", self_node_id_.Hex())
           .WithField("job_id", worker->GetAssignedJobId().Hex())
@@ -1476,6 +1490,7 @@ void NodeManager::ProcessDisconnectClientMessage(
     const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
   auto message = flatbuffers::GetRoot<protocol::DisconnectClient>(message_data);
   auto disconnect_type = static_cast<rpc::WorkerExitType>(message->disconnect_type());
+  const auto &disconnect_detail = message->disconnect_detail()->str();
   const flatbuffers::Vector<uint8_t> *exception_pb =
       message->creation_task_exception_pb();
@@ -1485,7 +1500,8 @@ void NodeManager::ProcessDisconnectClientMessage(
     creation_task_exception->ParseFromString(std::string(
         reinterpret_cast<const char *>(exception_pb->data()), exception_pb->size()));
   }
-  DisconnectClient(client, disconnect_type, creation_task_exception.get());
+  DisconnectClient(
+      client, disconnect_type, disconnect_detail, creation_task_exception.get());
 }

 void NodeManager::ProcessFetchOrReconstructMessage(
@@ -1583,7 +1599,10 @@ void NodeManager::ProcessWaitRequestMessage(
           }
         } else {
           // We failed to write to the client, so disconnect the client.
-          DisconnectClient(client);
+          std::ostringstream stream;
+          stream << "Failed to write WaitReply to the client. Status " << status
+                 << ", message: " << status.message();
+          DisconnectClient(client, rpc::WorkerExitType::SYSTEM_ERROR, stream.str());
         }
       });
 }
@@ -1840,14 +1859,17 @@ void NodeManager::HandleCancelResourceReserve(
     }
   }
   for (const auto &worker : workers_associated_with_pg) {
-    RAY_LOG(DEBUG)
+    std::ostringstream stream;
+    stream
         << "Destroying worker since its placement group was removed. Placement group id: "
         << worker->GetBundleId().first
        << ", bundle index: " << bundle_spec.BundleId().second
         << ", task id: " << worker->GetAssignedTaskId()
         << ", actor id: " << worker->GetActorId()
         << ", worker id: " << worker->WorkerId();
-    DestroyWorker(worker, rpc::WorkerExitType::PLACEMENT_GROUP_REMOVED);
+    const auto &message = stream.str();
+    RAY_LOG(DEBUG) << message;
+    DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message);
   }

   // Return bundle resources.
@@ -1873,7 +1895,10 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
   if (worker) {
     if (request.disconnect_worker()) {
       // The worker should be destroyed.
-      DisconnectClient(worker->Connection());
+      DisconnectClient(worker->Connection(),
+                       rpc::WorkerExitType::SYSTEM_ERROR,
+                       "The leased worker has an unrecoverable failure. The worker "
+                       "is requested to be destroyed when it is returned.");
     } else {
       if (worker->IsBlocked()) {
         // Handle the edge case where the worker was returned before we got the
@@ -367,10 +367,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// We will disconnect the worker connection first and then kill the worker.
   ///
   /// \param worker The worker to destroy.
   /// \param disconnect_type The reason why this worker process is disconnected.
+  /// \param disconnect_detail The detailed reason for a given exit.
   /// \return Void.
-  void DestroyWorker(
-      std::shared_ptr<WorkerInterface> worker,
-      rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
+  void DestroyWorker(std::shared_ptr<WorkerInterface> worker,
+                     rpc::WorkerExitType disconnect_type,
+                     const std::string &disconnect_detail);

   /// When a job finished, loop over all of the queued tasks for that job and
   /// treat them as failed.
@@ -653,12 +655,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   ///
   /// \param client The client that sent the message.
   /// \param disconnect_type The reason to disconnect the specified client.
+  /// \param disconnect_detail Detailed disconnection information.
   /// \param client_error_message Extra error messages about this disconnection
   /// \return Void.
-  void DisconnectClient(
-      const std::shared_ptr<ClientConnection> &client,
-      rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT,
-      const rpc::RayException *creation_task_exception = nullptr);
+  void DisconnectClient(const std::shared_ptr<ClientConnection> &client,
+                        rpc::WorkerExitType disconnect_type,
+                        const std::string &disconnect_detail,
+                        const rpc::RayException *creation_task_exception = nullptr);

   bool TryLocalGC();
@@ -143,12 +143,6 @@ void Raylet::HandleAccept(const boost::system::error_code &error) {
                                 const std::vector<uint8_t> &message) {
       node_manager_.ProcessClientMessage(client, message_type, message.data());
     };
-    flatbuffers::FlatBufferBuilder fbb;
-    protocol::DisconnectClientBuilder builder(fbb);
-    builder.add_disconnect_type(static_cast<int>(rpc::WorkerExitType::SYSTEM_ERROR_EXIT));
-    fbb.Finish(builder.Finish());
-    std::vector<uint8_t> message_data(fbb.GetBufferPointer(),
-                                      fbb.GetBufferPointer() + fbb.GetSize());
     // Accept a new local client and dispatch it to the node manager.
     auto new_connection = ClientConnection::Create(
         client_handler,
@@ -156,8 +150,7 @@ void Raylet::HandleAccept(const boost::system::error_code &error) {
         std::move(socket_),
         "worker",
         node_manager_message_enum,
-        static_cast<int64_t>(protocol::MessageType::DisconnectClient),
-        message_data);
+        static_cast<int64_t>(protocol::MessageType::DisconnectClient));
   }
   // We're ready to accept another client.
   DoAccept();
@@ -1390,7 +1390,7 @@ void WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker
     }
   }
   RemoveWorker(state.idle, worker);
-  if (disconnect_type != rpc::WorkerExitType::INTENDED_EXIT) {
+  if (disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT) {
    // A Java worker process may have multiple workers. If one of them disconnects
    // unintentionally (which means that the worker process has died), we remove the
    // others from idle pool so that the failed actor will not be rescheduled on the same
@@ -612,7 +612,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
   ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0);
   for (const auto &worker : workers) {
     worker_pool_->DisconnectWorker(
-        worker, /*disconnect_type=*/rpc::WorkerExitType::INTENDED_EXIT);
+        worker, /*disconnect_type=*/rpc::WorkerExitType::INTENDED_USER_EXIT);
     // Check that we cannot lookup the worker after it's disconnected.
     ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr);
   }
@@ -628,7 +628,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
         worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
    worker->SetStartupToken(worker_pool_->GetStartupToken(proc));
    worker_pool_->DisconnectWorker(
-        worker, /*disconnect_type=*/rpc::WorkerExitType::INTENDED_EXIT);
+        worker, /*disconnect_type=*/rpc::WorkerExitType::INTENDED_USER_EXIT);
    ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0);
   }
 }
@@ -1527,10 +1527,12 @@ TEST_F(WorkerPoolTest, RuntimeEnvUriReferenceWorkerLevel) {
     auto popped_normal_worker = worker_pool_->PopWorkerSync(actor_creation_task_spec);
     ASSERT_EQ(GetReferenceCount(runtime_env_info.serialized_runtime_env()), 3);
     // Disconnect actor worker.
-    worker_pool_->DisconnectWorker(popped_actor_worker, rpc::WorkerExitType::IDLE_EXIT);
+    worker_pool_->DisconnectWorker(popped_actor_worker,
+                                   rpc::WorkerExitType::INTENDED_USER_EXIT);
     ASSERT_EQ(GetReferenceCount(runtime_env_info.serialized_runtime_env()), 2);
     // Disconnect task worker.
-    worker_pool_->DisconnectWorker(popped_normal_worker, rpc::WorkerExitType::IDLE_EXIT);
+    worker_pool_->DisconnectWorker(popped_normal_worker,
+                                   rpc::WorkerExitType::INTENDED_USER_EXIT);
     ASSERT_EQ(GetReferenceCount(runtime_env_info.serialized_runtime_env()), 1);
     // Finish the job.
     worker_pool_->HandleJobFinished(job_id);
@@ -1565,10 +1567,12 @@ TEST_F(WorkerPoolTest, RuntimeEnvUriReferenceWorkerLevel) {
     auto popped_normal_worker = worker_pool_->PopWorkerSync(actor_creation_task_spec);
     ASSERT_EQ(GetReferenceCount(runtime_env_info.serialized_runtime_env()), 2);
     // Disconnect actor worker.
-    worker_pool_->DisconnectWorker(popped_actor_worker, rpc::WorkerExitType::IDLE_EXIT);
+    worker_pool_->DisconnectWorker(popped_actor_worker,
+                                   rpc::WorkerExitType::INTENDED_USER_EXIT);
     ASSERT_EQ(GetReferenceCount(runtime_env_info.serialized_runtime_env()), 1);
     // Disconnect task worker.
-    worker_pool_->DisconnectWorker(popped_normal_worker, rpc::WorkerExitType::IDLE_EXIT);
+    worker_pool_->DisconnectWorker(popped_normal_worker,
+                                   rpc::WorkerExitType::INTENDED_USER_EXIT);
     ASSERT_EQ(GetReferenceCount(runtime_env_info.serialized_runtime_env()), 0);
     // Finish the job.
     worker_pool_->HandleJobFinished(job_id);
@@ -1818,8 +1822,8 @@ TEST_F(WorkerPoolTest, TestIOWorkerFailureAndSpawn) {
     RAY_CHECK_OK(
         worker_pool_->RegisterWorker(worker, proc.GetId(), token, [](Status, int) {}));
     // The worker failed before announcing the worker port (i.e. OnWorkerStarted)
-    worker_pool_->DisconnectWorker(
-        worker, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
+    worker_pool_->DisconnectWorker(worker,
+                                   /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR);
   }

   ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0);
@@ -1841,8 +1845,8 @@ TEST_F(WorkerPoolTest, TestIOWorkerFailureAndSpawn) {

   for (const auto &worker : spill_worker_set) {
     worker_pool_->PushSpillWorker(worker);
-    worker_pool_->DisconnectWorker(
-        worker, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
+    worker_pool_->DisconnectWorker(worker,
+                                   /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR);
   }
   spill_worker_set.clear();
@@ -1868,8 +1872,8 @@ TEST_F(WorkerPoolTest, TestIOWorkerFailureAndSpawn) {

   // This time, we mock worker failure before it returns to the worker pool.

-  worker_pool_->DisconnectWorker(
-      worker2, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
+  worker_pool_->DisconnectWorker(worker2,
+                                 /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR);
   worker_pool_->PushSpillWorker(worker2);
   spill_worker_set.clear();
@@ -157,10 +157,11 @@ raylet::RayletClient::RayletClient(
 }

 Status raylet::RayletClient::Disconnect(
-    rpc::WorkerExitType exit_type,
+    const rpc::WorkerExitType &exit_type,
+    const std::string &exit_detail,
     const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
   RAY_LOG(INFO) << "RayletClient::Disconnect, exit_type="
-                << rpc::WorkerExitType_Name(exit_type)
+                << rpc::WorkerExitType_Name(exit_type) << ", exit_detail=" << exit_detail
                 << ", has creation_task_exception_pb_bytes="
                 << (creation_task_exception_pb_bytes != nullptr);
   flatbuffers::FlatBufferBuilder fbb;
@@ -171,12 +172,14 @@ Status raylet::RayletClient::Disconnect(
         fbb.CreateVector(creation_task_exception_pb_bytes->Data(),
                          creation_task_exception_pb_bytes->Size());
   }
+  const auto &fb_exit_detail = fbb.CreateString(exit_detail);
   protocol::DisconnectClientBuilder builder(fbb);
   // Add to table builder here to avoid nested construction of flatbuffers
   if (creation_task_exception_pb_bytes != nullptr) {
     builder.add_creation_task_exception_pb(creation_task_exception_pb_bytes_fb_vector);
   }
   builder.add_disconnect_type(static_cast<int>(exit_type));
+  builder.add_disconnect_detail(fb_exit_detail);
   fbb.Finish(builder.Finish());
   auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb);
   // Don't be too strict for disconnection errors.
@@ -284,9 +284,14 @@ class RayletClient : public RayletClientInterface {
   /// is used by actors to exit gracefully so that the raylet doesn't
   /// propagate an error message to the driver.
   ///
+  /// It's a blocking call.
+  ///
+  /// \param disconnect_type The reason why this worker process is disconnected.
+  /// \param disconnect_detail The detailed reason for a given exit.
   /// \return ray::Status.
   ray::Status Disconnect(
-      rpc::WorkerExitType exit_type,
+      const rpc::WorkerExitType &exit_type,
+      const std::string &exit_detail,
       const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes);

   /// Tell the raylet which port this worker's gRPC server is listening on.
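Putting the client side together, here is a hedged sketch of how a worker might report a graceful, user-intended exit through this API; the call site and the detail string are illustrative:

```
// Sketch: a graceful exit reports INTENDED_USER_EXIT plus a human-readable
// detail; no creation-task exception accompanies an intended exit.
ray::Status status = raylet_client->Disconnect(
    rpc::WorkerExitType::INTENDED_USER_EXIT,
    "Worker exits because the user called exit_actor().",
    /*creation_task_exception_pb_bytes=*/nullptr);
// Disconnection errors are tolerated, mirroring the lenient handling above.
if (!status.ok()) {
  RAY_LOG(WARNING) << "Failed to send the disconnect message: " << status.ToString();
}
```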