[core] refactor disconnect message processing and enrich WorkExitType (#13527)

* [core] refactor disconnect message processing and enrich WorkExitType

add changes from refactor pr

fix type typo

fix typo

fix

* address comments

* also update WorkerTableData

* fix tests
This commit is contained in:
Keqiu Hu 2021-01-19 22:09:46 -08:00 committed by GitHub
parent e544c008df
commit 6c9088eb62
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 93 additions and 58 deletions

View file

@ -281,10 +281,11 @@ void ServerConnection::DoAsyncWrites() {
std::shared_ptr<ClientConnection> ClientConnection::Create(
ClientHandler &client_handler, MessageHandler &message_handler,
local_stream_socket &&socket, const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type) {
std::shared_ptr<ClientConnection> self(
new ClientConnection(message_handler, std::move(socket), debug_label,
message_type_enum_names, error_message_type));
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type,
const std::vector<uint8_t> &error_message_data) {
std::shared_ptr<ClientConnection> self(new ClientConnection(
message_handler, std::move(socket), debug_label, message_type_enum_names,
error_message_type, error_message_data));
// Let our manager process our new connection.
client_handler(*self);
return self;
@ -293,13 +294,15 @@ std::shared_ptr<ClientConnection> ClientConnection::Create(
ClientConnection::ClientConnection(
MessageHandler &message_handler, local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type)
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type,
const std::vector<uint8_t> &error_message_data)
: ServerConnection(std::move(socket)),
registered_(false),
message_handler_(message_handler),
debug_label_(debug_label),
message_type_enum_names_(message_type_enum_names),
error_message_type_(error_message_type) {}
error_message_type_(error_message_type),
error_message_data_(error_message_data) {}
void ClientConnection::Register() {
RAY_CHECK(!registered_);
@ -324,6 +327,7 @@ void ClientConnection::ProcessMessageHeader(const boost::system::error_code &err
if (error) {
// If there was an error, disconnect the client.
read_type_ = error_message_type_;
read_message_ = error_message_data_;
read_length_ = 0;
ProcessMessage(error);
return;

View file

@ -170,6 +170,7 @@ class ClientConnection;
using ClientHandler = std::function<void(ClientConnection &)>;
using MessageHandler = std::function<void(std::shared_ptr<ClientConnection>, int64_t,
const std::vector<uint8_t> &)>;
static std::vector<uint8_t> _dummy_error_message_data;
/// \typename ClientConnection
///
@ -189,12 +190,14 @@ class ClientConnection : public ServerConnection {
/// the type of client.
/// \param message_type_enum_names A table of printable enum names for the
/// message types received from this client, used for debug messages.
/// \param error_message_type the type of error message
/// \param error_message_data the companion data to the error message type.
/// \return std::shared_ptr<ClientConnection>.
static std::shared_ptr<ClientConnection> Create(
ClientHandler &new_client_handler, MessageHandler &message_handler,
local_stream_socket &&socket, const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type);
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type,
const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);
std::shared_ptr<ClientConnection> shared_ClientConnection_from_this() {
return std::static_pointer_cast<ClientConnection>(shared_from_this());
@ -210,10 +213,11 @@ class ClientConnection : public ServerConnection {
protected:
/// A protected constructor for a node client connection.
ClientConnection(MessageHandler &message_handler, local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type);
ClientConnection(
MessageHandler &message_handler, local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type,
const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);
/// Process an error from the last operation, then process the message
/// header from the client.
void ProcessMessageHeader(const boost::system::error_code &error);
@ -243,6 +247,8 @@ class ClientConnection : public ServerConnection {
const std::vector<std::string> message_type_enum_names_;
/// The value for disconnect client message.
int64_t error_message_type_;
/// The data for disconnect client message.
std::vector<uint8_t> error_message_data_;
/// Buffers for the current message being read from the client.
int64_t read_cookie_;
int64_t read_type_;

View file

@ -542,7 +542,8 @@ void GcsActorManager::CollectStats() const {
void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
const ray::WorkerID &worker_id,
bool intentional_exit) {
const rpc::WorkerExitType disconnect_type) {
bool intentional_exit = disconnect_type == rpc::WorkerExitType::INTENDED_EXIT;
if (intentional_exit) {
RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id
<< " intentional exit.";

View file

@ -238,10 +238,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
///
/// \param node_id ID of the node where the dead worker was located.
/// \param worker_id ID of the dead worker.
/// \param intentional_exit Whether the death was intentional. If yes and the
/// worker was an actor, we should not attempt to restart the actor.
void OnWorkerDead(const NodeID &node_id, const WorkerID &worker_id,
bool intentional_exit = false);
/// \param exit_type exit reason of the dead worker.
void OnWorkerDead(
const NodeID &node_id, const WorkerID &worker_id,
const rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
/// Handle actor creation task failure. This should be called when scheduling
/// an actor creation task is infeasible.

View file

@ -326,7 +326,7 @@ void GcsServer::InstallEventListeners() {
auto worker_id = WorkerID::FromBinary(worker_address.worker_id());
auto node_id = NodeID::FromBinary(worker_address.raylet_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data->intentional_disconnect());
worker_failure_data->exit_type());
});
// Install job event listeners.

View file

@ -28,7 +28,7 @@ void GcsWorkerManager::HandleReportWorkerFailure(
log_stream << "Reporting worker failure, worker id = " << worker_id
<< ", node id = " << node_id
<< ", address = " << worker_address.ip_address();
if (request.worker_failure().intentional_disconnect()) {
if (request.worker_failure().exit_type() == rpc::WorkerExitType::INTENDED_EXIT) {
RAY_LOG(INFO) << log_stream.str();
} else {
RAY_LOG(WARNING) << log_stream.str()

View file

@ -737,7 +737,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
actor->UpdateAddress(address);
const auto actor_id = actor->GetActorID();
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id));
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
}
TEST_F(GcsActorManagerTest, TestRegisterActor) {
@ -860,10 +860,10 @@ TEST_F(GcsActorManagerTest, TestOwnerAndChildDiedAtTheSameTimeRaceCondition) {
const auto child_worker_id = actor->GetWorkerID();
const auto actor_id = actor->GetActorID();
// Make worker & owner fail at the same time, but owner's failure comes first.
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(child_node_id, child_worker_id))
.WillOnce(Return(actor_id));
gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id, false);
gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id);
}
} // namespace ray

View file

@ -87,14 +87,14 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const NodeID &raylet_id, const WorkerID &worker_id, const std::string &address,
int32_t port, int64_t timestamp = std::time(nullptr),
bool intentional_disconnect = false) {
rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
auto worker_failure_info_ptr = std::make_shared<ray::rpc::WorkerTableData>();
worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(raylet_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_ip_address(address);
worker_failure_info_ptr->mutable_worker_address()->set_port(port);
worker_failure_info_ptr->set_timestamp(timestamp);
worker_failure_info_ptr->set_intentional_disconnect(intentional_disconnect);
worker_failure_info_ptr->set_exit_type(disconnect_type);
return worker_failure_info_ptr;
}

View file

@ -442,3 +442,15 @@ message MetricPoint {
// [Optional] Unit of the metric.
string units = 6;
}
// Type of a worker exit.
enum WorkerExitType {
// Worker exit due to system level failures (i.e. worker crash).
SYSTEM_ERROR_EXIT = 0;
// Intended, initiated worker exit via raylet API.
INTENDED_EXIT = 1;
// Worker exit due to resource bundle release.
UNUSED_RESOURCE_RELEASED = 2;
// Worker exit due to placement group removal.
PLACEMENT_GROUP_REMOVED = 3;
}

View file

@ -386,7 +386,7 @@ message WorkerTableData {
// The UNIX timestamp at which this worker's state was updated.
int64 timestamp = 3;
// Whether it's an intentional disconnect, only applies then `is_alive` is false.
bool intentional_disconnect = 4;
WorkerExitType exit_type = 4;
// Type of this worker.
WorkerType worker_type = 5;
// This is for AddWorker.

View file

@ -35,12 +35,9 @@ enum MessageType:int {
RegisterClientReply,
// Send the worker's gRPC port to the raylet.
AnnounceWorkerPort,
// Notify the raylet that this client is disconnecting unexpectedly.
// Notify the raylet that this client is disconnecting.
// This is sent from a worker to a raylet.
DisconnectClient,
// Notify the raylet that this client is disconnecting gracefully.
// This is sent from a worker to a raylet.
IntentionalDisconnectClient,
// Tell a worker to execute a task. This is sent from a raylet to a
// worker.
ExecuteTask,
@ -111,6 +108,7 @@ table ResourceIdSetInfo {
// This message is sent from a worker to the node manager.
table DisconnectClient {
disconnect_type: int;
}
table ResourceIdSetInfos {

View file

@ -350,11 +350,12 @@ void NodeManager::KillWorker(std::shared_ptr<WorkerInterface> worker) {
});
}
void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker) {
void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
rpc::WorkerExitType disconnect_type) {
// We should disconnect the client first. Otherwise, we'll remove bundle resources
// before actual resources are returned. Subsequent disconnect request that comes
// due to worker dead will be ignored.
ProcessDisconnectClientMessage(worker->Connection(), /* intentional exit */ true);
DisconnectClient(worker->Connection(), disconnect_type);
worker->MarkDead();
KillWorker(worker);
}
@ -544,7 +545,7 @@ void NodeManager::HandleReleaseUnusedBundles(
<< ", task id: " << worker->GetAssignedTaskId()
<< ", actor id: " << worker->GetActorId()
<< ", worker id: " << worker->WorkerId();
DestroyWorker(worker);
DestroyWorker(worker, rpc::WorkerExitType::UNUSED_RESOURCE_RELEASED);
}
// Return unused bundle resources.
@ -940,8 +941,7 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
if (registered_worker && registered_worker->IsDead()) {
// For a worker that is marked as dead (because the job has died already),
// all the messages are ignored except DisconnectClient.
if ((message_type_value != protocol::MessageType::DisconnectClient) &&
(message_type_value != protocol::MessageType::IntentionalDisconnectClient)) {
if (message_type_value != protocol::MessageType::DisconnectClient) {
// Listen for more messages.
client->ProcessMessages();
return;
@ -959,13 +959,7 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
HandleWorkerAvailable(client);
} break;
case protocol::MessageType::DisconnectClient: {
ProcessDisconnectClientMessage(client);
// We don't need to receive future messages from this client,
// because it's already disconnected.
return;
} break;
case protocol::MessageType::IntentionalDisconnectClient: {
ProcessDisconnectClientMessage(client, /* intentional_disconnect = */ true);
ProcessDisconnectClientMessage(client, message_data);
// We don't need to receive future messages from this client,
// because it's already disconnected.
return;
@ -1068,7 +1062,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply), fbb.GetSize(),
fbb.GetBufferPointer(), [this, client](const ray::Status &status) {
if (!status.ok()) {
ProcessDisconnectClientMessage(client);
DisconnectClient(client);
}
});
};
@ -1163,8 +1157,8 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
cluster_task_manager_->ScheduleAndDispatchTasks();
}
void NodeManager::ProcessDisconnectClientMessage(
const std::shared_ptr<ClientConnection> &client, bool intentional_disconnect) {
void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
rpc::WorkerExitType disconnect_type) {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
bool is_worker = false, is_driver = false;
if (worker) {
@ -1187,7 +1181,6 @@ void NodeManager::ProcessDisconnectClientMessage(
if (is_worker && worker->IsDead()) {
// If the worker was killed by us because the driver exited,
// treat it as intentionally disconnected.
intentional_disconnect = true;
// Don't need to unblock the client if it's a worker and is already dead.
// Because in this case, its task is already cleaned up.
RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
@ -1209,7 +1202,7 @@ void NodeManager::ProcessDisconnectClientMessage(
// Publish the worker failure.
auto worker_failure_data_ptr =
gcs::CreateWorkerFailureData(self_node_id_, worker->WorkerId(), worker->IpAddress(),
worker->Port(), time(nullptr), intentional_disconnect);
worker->Port(), time(nullptr), disconnect_type);
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
@ -1225,7 +1218,7 @@ void NodeManager::ProcessDisconnectClientMessage(
static_cast<void>(local_queues_.RemoveTask(task_id, &task));
}
if (!intentional_disconnect) {
if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) {
// Push the error to driver.
const JobID &job_id = worker->GetAssignedJobId();
// TODO(rkn): Define this constant somewhere else.
@ -1268,6 +1261,13 @@ void NodeManager::ProcessDisconnectClientMessage(
// these can be leaked.
}
void NodeManager::ProcessDisconnectClientMessage(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
auto message = flatbuffers::GetRoot<protocol::DisconnectClient>(message_data);
auto disconnect_type = static_cast<rpc::WorkerExitType>(message->disconnect_type());
DisconnectClient(client, disconnect_type);
}
void NodeManager::ProcessFetchOrReconstructMessage(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
auto message = flatbuffers::GetRoot<protocol::FetchOrReconstruct>(message_data);
@ -1356,7 +1356,7 @@ void NodeManager::ProcessWaitRequestMessage(
}
} else {
// We failed to write to the client, so disconnect the client.
ProcessDisconnectClientMessage(client);
DisconnectClient(client);
}
});
RAY_CHECK_OK(status);
@ -1505,7 +1505,7 @@ void NodeManager::HandleCancelResourceReserve(
<< ", task id: " << worker->GetAssignedTaskId()
<< ", actor id: " << worker->GetActorId()
<< ", worker id: " << worker->WorkerId();
DestroyWorker(worker);
DestroyWorker(worker, rpc::WorkerExitType::PLACEMENT_GROUP_REMOVED);
}
// Return bundle resources.
@ -1527,7 +1527,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
if (worker) {
if (request.disconnect_worker()) {
ProcessDisconnectClientMessage(worker->Connection());
DisconnectClient(worker->Connection());
} else {
// Handle the edge case where the worker was returned before we got the
// unblock RPC by unblocking it immediately (unblock is idempotent).
@ -2190,7 +2190,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<WorkerInterface> &worke
} else {
RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client";
// We failed to send the task to the worker, so disconnect the worker.
ProcessDisconnectClientMessage(worker->Connection());
DisconnectClient(worker->Connection());
// Queue this task for future assignment. We need to do this since
// DispatchTasks() removed it from the ready queue. The task will be
// assigned to a worker once one becomes available.

View file

@ -393,7 +393,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
///
/// \param worker The worker to destroy.
/// \return Void.
void DestroyWorker(std::shared_ptr<WorkerInterface> worker);
void DestroyWorker(
std::shared_ptr<WorkerInterface> worker,
rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
/// When a job finished, loop over all of the queued tasks for that job and
/// treat them as failed.
@ -476,10 +478,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// client.
///
/// \param client The client that sent the message.
/// \param intentional_disconnect Whether the client was intentionally disconnected.
/// \param message_data A pointer to the message data.
/// \return Void.
void ProcessDisconnectClientMessage(const std::shared_ptr<ClientConnection> &client,
bool intentional_disconnect = false);
const uint8_t *message_data);
/// Process client message of FetchOrReconstruct
///
@ -740,7 +742,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// object.
void OnObjectMissing(const ObjectID &object_id,
const std::vector<TaskID> &waiting_task_ids) override;
/// Disconnect a client.
///
/// \param client The client that sent the message.
/// \param disconnect_type The reason to disconnect the specified client.
/// \return Void.
void DisconnectClient(
const std::shared_ptr<ClientConnection> &client,
rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
/// The helper to dump the debug state of the cluster task manater.
std::string DebugStr() const override;

View file

@ -173,11 +173,15 @@ void Raylet::HandleAccept(const boost::system::error_code &error) {
const std::vector<uint8_t> &message) {
node_manager_.ProcessClientMessage(client, message_type, message.data());
};
flatbuffers::FlatBufferBuilder fbb;
fbb.Finish(protocol::CreateDisconnectClient(fbb));
std::vector<uint8_t> message_data(fbb.GetBufferPointer(),
fbb.GetBufferPointer() + fbb.GetSize());
// Accept a new local client and dispatch it to the node manager.
auto new_connection = ClientConnection::Create(
client_handler, message_handler, std::move(socket_), "worker",
node_manager_message_enum,
static_cast<int64_t>(protocol::MessageType::DisconnectClient));
static_cast<int64_t>(protocol::MessageType::DisconnectClient), message_data);
}
// We're ready to accept another client.
DoAccept();

View file

@ -137,9 +137,10 @@ raylet::RayletClient::RayletClient(
Status raylet::RayletClient::Disconnect() {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateDisconnectClient(fbb);
auto message = protocol::CreateDisconnectClient(
fbb, static_cast<int>(rpc::WorkerExitType::INTENDED_EXIT));
fbb.Finish(message);
auto status = conn_->WriteMessage(MessageType::IntentionalDisconnectClient, &fbb);
auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb);
// Don't be too strict for disconnection errors.
// Just create logs and prevent it from crash.
if (!status.ok()) {