[GCS] Only publish fields used by sub clients in WorkerTableData (#13508)

This commit is contained in:
Tao Wang 2021-01-20 16:14:59 +08:00 committed by GitHub
parent 6c9088eb62
commit b2a6e55289
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 33 additions and 19 deletions

View file

@ -657,13 +657,14 @@ class WorkerInfoAccessor {
virtual ~WorkerInfoAccessor() = default;
/// Subscribe to all unexpected failure of workers from GCS asynchronously.
/// Note that this does not include workers that failed due to node failure.
/// Note that this does not include workers that failed due to node failure,
/// and only the fields in WorkerDeltaData are published.
///
/// \param subscribe Callback that will be called each time when a worker failed.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToWorkerFailures(
const ItemCallback<rpc::WorkerTableData> &subscribe,
const ItemCallback<rpc::WorkerDeltaData> &subscribe,
const StatusCallback &done) = 0;
/// Report a worker failure to GCS asynchronously.

View file

@ -1311,11 +1311,11 @@ ServiceBasedWorkerInfoAccessor::ServiceBasedWorkerInfoAccessor(
: client_impl_(client_impl) {}
Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures(
const ItemCallback<rpc::WorkerTableData> &subscribe, const StatusCallback &done) {
const ItemCallback<rpc::WorkerDeltaData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::WorkerTableData worker_failure_data;
rpc::WorkerDeltaData worker_failure_data;
worker_failure_data.ParseFromString(data);
subscribe(worker_failure_data);
};

View file

@ -407,7 +407,7 @@ class ServiceBasedWorkerInfoAccessor : public WorkerInfoAccessor {
virtual ~ServiceBasedWorkerInfoAccessor() = default;
Status AsyncSubscribeToWorkerFailures(
const ItemCallback<rpc::WorkerTableData> &subscribe,
const ItemCallback<rpc::WorkerDeltaData> &subscribe,
const StatusCallback &done) override;
Status AsyncReportWorkerFailure(const std::shared_ptr<rpc::WorkerTableData> &data_ptr,

View file

@ -495,7 +495,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
}
bool SubscribeToWorkerFailures(
const gcs::ItemCallback<rpc::WorkerTableData> &subscribe) {
const gcs::ItemCallback<rpc::WorkerDeltaData> &subscribe) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Workers().AsyncSubscribeToWorkerFailures(
subscribe, [&promise](Status status) { promise.set_value(status.ok()); }));
@ -922,7 +922,7 @@ TEST_F(ServiceBasedGcsClientTest, TestStats) {
TEST_F(ServiceBasedGcsClientTest, TestWorkerInfo) {
// Subscribe to all unexpected failure of workers from GCS.
std::atomic<int> worker_failure_count(0);
auto on_subscribe = [&worker_failure_count](const rpc::WorkerTableData &result) {
auto on_subscribe = [&worker_failure_count](const rpc::WorkerDeltaData &result) {
++worker_failure_count;
};
ASSERT_TRUE(SubscribeToWorkerFailures(on_subscribe));
@ -1168,7 +1168,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
TEST_F(ServiceBasedGcsClientTest, TestWorkerTableResubscribe) {
// Subscribe to all unexpected failure of workers from GCS.
std::atomic<int> worker_failure_count(0);
auto on_subscribe = [&worker_failure_count](const rpc::WorkerTableData &result) {
auto on_subscribe = [&worker_failure_count](const rpc::WorkerDeltaData &result) {
++worker_failure_count;
};
ASSERT_TRUE(SubscribeToWorkerFailures(on_subscribe));

View file

@ -52,8 +52,15 @@ void GcsWorkerManager::HandleReportWorkerFailure(
<< ", address = " << worker_address.ip_address();
} else {
stats::UnintentionalWorkerFailures.Record(1);
// Only publish worker_id and raylet_id in address as they are the only fields used
// by sub clients.
auto worker_failure_delta = std::make_shared<rpc::WorkerDeltaData>();
worker_failure_delta->set_worker_id(
worker_failure_data->worker_address().worker_id());
worker_failure_delta->set_raylet_id(
worker_failure_data->worker_address().raylet_id());
RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_CHANNEL, worker_id.Hex(),
worker_failure_data->SerializeAsString(),
worker_failure_delta->SerializeAsString(),
nullptr));
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);

View file

@ -393,6 +393,12 @@ message WorkerTableData {
map<string, bytes> worker_info = 6;
}
// Fields to publish when worker fails.
// This is the reduced payload sent on the worker failure channel: the GCS
// publisher fills in only the worker ID and raylet ID taken from the failed
// worker's address, because those are the only fields subscribers read.
message WorkerDeltaData {
// Raylet ID copied from the failed worker's address. Subscribers compare it
// against the ID of a failed raylet to find the affected workers.
bytes raylet_id = 1;
// Worker ID copied from the failed worker's address. It is left unset when
// the raylet builds this message for a node removal, where only raylet_id
// is filled in.
bytes worker_id = 2;
}
// Wrapper message around a map from string keys to ResourceTableData entries.
// NOTE(review): the keys are presumably resource names -- confirm against the
// code that populates `items`.
message ResourceMap {
map<string, ResourceTableData> items = 1;
}

View file

@ -299,8 +299,8 @@ ray::Status NodeManager::RegisterGcs() {
// node failure. These workers can be identified by comparing the raylet_id
// in their rpc::Address to the ID of a failed raylet.
const auto &worker_failure_handler =
[this](const rpc::WorkerTableData &worker_failure_data) {
HandleUnexpectedWorkerFailure(worker_failure_data.worker_address());
[this](const rpc::WorkerDeltaData &worker_failure_data) {
HandleUnexpectedWorkerFailure(worker_failure_data);
};
RAY_CHECK_OK(gcs_client_->Workers().AsyncSubscribeToWorkerFailures(
worker_failure_handler, /*done_callback=*/nullptr));
@ -716,14 +716,14 @@ void NodeManager::NodeRemoved(const NodeID &node_id) {
// Clean up workers that were owned by processes that were on the failed
// node.
rpc::Address address;
address.set_raylet_id(node_id.Binary());
HandleUnexpectedWorkerFailure(address);
rpc::WorkerDeltaData data;
data.set_raylet_id(node_id.Binary());
HandleUnexpectedWorkerFailure(data);
}
void NodeManager::HandleUnexpectedWorkerFailure(const rpc::Address &address) {
const WorkerID worker_id = WorkerID::FromBinary(address.worker_id());
const NodeID node_id = NodeID::FromBinary(address.raylet_id());
void NodeManager::HandleUnexpectedWorkerFailure(const rpc::WorkerDeltaData &data) {
const WorkerID worker_id = WorkerID::FromBinary(data.worker_id());
const NodeID node_id = NodeID::FromBinary(data.raylet_id());
if (!worker_id.IsNil()) {
RAY_LOG(DEBUG) << "Worker " << worker_id << " failed";
failed_workers_cache_.insert(worker_id);

View file

@ -172,8 +172,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Handle an unexpected failure notification from GCS pubsub.
///
/// \param worker_address The address of the worker that died.
void HandleUnexpectedWorkerFailure(const rpc::Address &worker_address);
/// \param data The data of the worker that died.
void HandleUnexpectedWorkerFailure(const rpc::WorkerDeltaData &data);
/// Handler for the addition of a new node.
///