[core] Remove push based resource report code path (#17825)

Alex Wu 2021-08-16 12:03:38 -07:00 committed by GitHub
parent b349c6bc4f
commit 1209a87ead
8 changed files with 9 additions and 63 deletions
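
For orientation: the push path removed here had each raylet periodically sending its resource report to the GCS; with this commit, the GCS-side poller (guarded until now by the pull_based_resource_reporting flag) becomes the only code path, so the flag disappears everywhere. A minimal sketch of the pull model, with illustrative names (Report, DemoRaylet, PollOnce are stand-ins, not Ray's API):

#include <iostream>
#include <string>
#include <vector>

struct Report {
  std::string node_id;
  double cpus_available;
};

// Stand-in for a raylet reachable over RPC. GetResourceReport is a plain
// call here, where Ray would issue an asynchronous resource-report RPC.
struct DemoRaylet {
  std::string node_id;
  double cpus_available;
  Report GetResourceReport() const { return {node_id, cpus_available}; }
};

// One poll round: the GCS pulls a report from every registered raylet and
// folds it into its cluster view.
void PollOnce(const std::vector<DemoRaylet> &raylets) {
  for (const auto &raylet : raylets) {
    Report r = raylet.GetResourceReport();
    std::cout << r.node_id << ": " << r.cpus_available << " CPUs available\n";
  }
}

int main() {
  std::vector<DemoRaylet> raylets = {{"node-1", 4.0}, {"node-2", 8.0}};
  PollOnce(raylets);  // the GCS repeats this every poll period
}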


@@ -241,9 +241,6 @@ RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)
 RAY_CONFIG(int, gcs_resource_report_poll_period_ms, 100)
 // The number of concurrent polls to GCS.
 RAY_CONFIG(uint64_t, gcs_max_concurrent_resource_pulls, 100)
-// Feature flag to turn on resource report polling. Polling and raylet pushing are
-// mutually exclusive.
-RAY_CONFIG(bool, pull_based_resource_reporting, true)
 // Feature flag to use grpc instead of redis for resource broadcast.
 // TODO(ekl) broken as of https://github.com/ray-project/ray/issues/16858
 RAY_CONFIG(bool, grpc_based_resource_broadcast, false)
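
The hunk above edits Ray's config-definition header. As a rough sketch of the RAY_CONFIG pattern it uses, assuming a simplified expansion (Ray's real macro also wires up environment-variable and JSON overrides):

#include <cstdint>
#include <iostream>

// Illustrative only: a minimal config registry in the spirit of
// RAY_CONFIG(type, name, default). Not Ray's actual expansion.
#define DEMO_CONFIG(type, name, default_value) \
 public:                                       \
  type name() const { return name##_; }        \
 private:                                      \
  type name##_ = default_value;

class DemoConfig {
  DEMO_CONFIG(int, gcs_resource_report_poll_period_ms, 100)
  DEMO_CONFIG(uint64_t, gcs_max_concurrent_resource_pulls, 100)
  DEMO_CONFIG(bool, grpc_based_resource_broadcast, false)

 public:
  static DemoConfig &instance() {
    static DemoConfig config;
    return config;
  }
};

int main() {
  std::cout << DemoConfig::instance().gcs_resource_report_poll_period_ms()
            << " ms poll period\n";
}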


@@ -51,7 +51,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
     config_.redis_port = TEST_REDIS_SERVER_PORTS.front();
     // Tests legacy code paths. The poller and broadcaster have their own dedicated unit
     // test targets.
-    config_.pull_based_resource_reporting = false;
     config_.grpc_based_resource_broadcast = false;
     client_io_service_.reset(new instrumented_io_context());
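
The fixture above pins feature flags so the legacy redis-based broadcast path stays covered by its own test. A hedged GoogleTest sketch of the same idea, with a placeholder config type (DemoGcsConfig is not Ray's struct):

#include <gtest/gtest.h>

struct DemoGcsConfig {
  bool grpc_based_resource_broadcast = false;
};

class LegacyPathTest : public ::testing::Test {
 protected:
  void SetUp() override {
    // Exercise the redis-based broadcast path; the grpc path has its own
    // dedicated test target.
    config_.grpc_based_resource_broadcast = false;
  }
  DemoGcsConfig config_;
};

TEST_F(LegacyPathTest, FlagIsPinned) {
  EXPECT_FALSE(config_.grpc_based_resource_broadcast);
}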


@@ -160,9 +160,7 @@ void GcsServer::Stop() {
   // time, causing many nodes to die after GCS's failure.
   gcs_heartbeat_manager_->Stop();
-  if (config_.pull_based_resource_reporting) {
   gcs_resource_report_poller_->Stop();
-  }
   if (config_.grpc_based_resource_broadcast) {
     grpc_based_resource_broadcaster_->Stop();
@@ -342,7 +340,6 @@ void GcsServer::InitTaskInfoHandler() {
 }
 void GcsServer::InitResourceReportPolling(const GcsInitData &gcs_init_data) {
-  if (config_.pull_based_resource_reporting) {
   gcs_resource_report_poller_.reset(new GcsResourceReportPoller(
       raylet_client_pool_, [this](const rpc::ResourcesData &report) {
         gcs_resource_manager_->UpdateFromResourceReport(report);
@@ -350,7 +347,6 @@ void GcsServer::InitResourceReportPolling(const GcsInitData &gcs_init_data) {
   gcs_resource_report_poller_->Initialize(gcs_init_data);
   gcs_resource_report_poller_->Start();
-  }
 }
void GcsServer::InitResourceReportBroadcasting(const GcsInitData &gcs_init_data) {
@@ -427,9 +423,7 @@ void GcsServer::InstallEventListeners() {
         gcs_placement_group_manager_->SchedulePendingPlacementGroups();
         gcs_actor_manager_->SchedulePendingActors();
         gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
-        if (config_.pull_based_resource_reporting) {
         gcs_resource_report_poller_->HandleNodeAdded(*node);
-        }
         if (config_.grpc_based_resource_broadcast) {
           grpc_based_resource_broadcaster_->HandleNodeAdded(*node);
         }
@@ -443,9 +437,7 @@ void GcsServer::InstallEventListeners() {
         gcs_placement_group_manager_->OnNodeDead(node_id);
         gcs_actor_manager_->OnNodeDead(node_id);
         raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
-        if (config_.pull_based_resource_reporting) {
         gcs_resource_report_poller_->HandleNodeRemoved(*node);
-        }
         if (config_.grpc_based_resource_broadcast) {
           grpc_based_resource_broadcaster_->HandleNodeRemoved(*node);
         }
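
With the flag gone, the node-added/node-removed listeners above drive the poller's membership unconditionally. A hedged sketch of that lifecycle with placeholder types (Ray's GcsResourceReportPoller is asynchronous and RPC-driven; this is a synchronous stand-in):

#include <string>
#include <unordered_set>
#include <vector>

struct NodeInfo {
  std::string node_id;
};

class DemoResourceReportPoller {
 public:
  // Seed with nodes that were alive before this GCS started; Initialize in
  // the diff takes GcsInitData for the same reason.
  void Initialize(const std::vector<NodeInfo> &existing_nodes) {
    for (const auto &node : existing_nodes) nodes_.insert(node.node_id);
  }
  void Start() { running_ = true; }
  void Stop() { running_ = false; }
  // Called from the server's node event listeners, as in the hunks above.
  void HandleNodeAdded(const NodeInfo &node) { nodes_.insert(node.node_id); }
  void HandleNodeRemoved(const NodeInfo &node) { nodes_.erase(node.node_id); }

 private:
  bool running_ = false;
  std::unordered_set<std::string> nodes_;
};

int main() {
  DemoResourceReportPoller poller;
  poller.Initialize({{"node-1"}});
  poller.Start();
  poller.HandleNodeAdded({"node-2"});
  poller.HandleNodeRemoved({"node-1"});
  poller.Stop();
}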


@@ -46,7 +46,6 @@ struct GcsServerConfig {
   bool retry_redis = true;
   bool enable_sharding_conn = true;
   std::string node_ip_address;
-  bool pull_based_resource_reporting;
   bool grpc_based_resource_broadcast;
   bool grpc_pubsub_enabled;
 };


@@ -103,8 +103,6 @@ int main(int argc, char *argv[]) {
   gcs_server_config.redis_password = redis_password;
   gcs_server_config.retry_redis = retry_redis;
   gcs_server_config.node_ip_address = node_ip_address;
-  gcs_server_config.pull_based_resource_reporting =
-      RayConfig::instance().pull_based_resource_reporting();
   gcs_server_config.grpc_based_resource_broadcast =
       RayConfig::instance().grpc_based_resource_broadcast();
   gcs_server_config.grpc_pubsub_enabled = RayConfig::instance().gcs_grpc_based_pubsub();


@@ -171,8 +171,6 @@ int main(int argc, char *argv[]) {
   node_manager_config.min_worker_port = min_worker_port;
   node_manager_config.max_worker_port = max_worker_port;
   node_manager_config.worker_ports = worker_ports;
-  node_manager_config.pull_based_resource_reporting =
-      RayConfig::instance().pull_based_resource_reporting();
   if (!python_worker_command.empty()) {
     node_manager_config.worker_commands.emplace(


@@ -466,10 +466,6 @@ ray::Status NodeManager::RegisterGcs() {
         "NodeManager.deadline_timer.flush_free_objects");
   }
-  last_resource_report_at_ms_ = now_ms;
-  periodical_runner_.RunFnPeriodically(
-      [this] { ReportResourceUsage(); }, report_resources_period_ms_,
-      "NodeManager.deadline_timer.report_resource_usage");
   /// If periodic asio stats print is enabled, it will print it.
   const auto event_stats_print_interval_ms =
       RayConfig::instance().event_stats_print_interval_ms();
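
The deleted RunFnPeriodically call is what used to schedule the push-side report timer. A sketch of that style of periodic scheduling on a Boost.Asio steady_timer (assumed pattern; Ray's PeriodicalRunner layers instrumentation on the same idea):

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>

// Re-arm a timer after each tick; each invocation schedules the next one.
void RunFnPeriodically(boost::asio::io_context &io, std::function<void()> fn,
                       std::chrono::milliseconds period) {
  auto timer = std::make_shared<boost::asio::steady_timer>(io, period);
  timer->async_wait(
      [&io, timer, fn, period](const boost::system::error_code &ec) {
        if (ec) return;  // timer cancelled or io_context stopped
        fn();
        RunFnPeriodically(io, fn, period);  // reschedule the next tick
      });
}

int main() {
  boost::asio::io_context io;
  int ticks = 0;
  RunFnPeriodically(io, [&] { std::cout << "tick " << ++ticks << "\n"; },
                    std::chrono::milliseconds(100));
  // Run for ~350 ms, then return: prints three ticks.
  io.run_for(std::chrono::milliseconds(350));
}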
@@ -589,33 +585,6 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
   }
 }
-void NodeManager::ReportResourceUsage() {
-  if (initial_config_.pull_based_resource_reporting) {
-    return;
-  }
-  uint64_t now_ms = current_time_ms();
-  uint64_t interval = now_ms - last_resource_report_at_ms_;
-  if (interval >
-      RayConfig::instance().num_resource_report_periods_warning() *
-          RayConfig::instance().raylet_report_resources_period_milliseconds()) {
-    RAY_LOG(WARNING)
-        << "Last resource report was sent " << interval
-        << " ms ago. There might be resource pressure on this node. If "
-           "resource reports keep lagging, scheduling decisions of other nodes "
-           "may become stale";
-  }
-  last_resource_report_at_ms_ = now_ms;
-  auto resources_data = std::make_shared<rpc::ResourcesData>();
-  FillResourceReport(*resources_data);
-  if (resources_data->resources_total_size() > 0 ||
-      resources_data->resources_available_changed() ||
-      resources_data->resource_load_changed() || resources_data->should_global_gc()) {
-    RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(resources_data,
-                                                                       /*done*/ nullptr));
-  }
-}
 void NodeManager::DoLocalGC() {
   auto all_workers = worker_pool_.GetAllRegisteredWorkers();
   for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
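
The deleted ReportResourceUsage shows the push path's two safeguards: a warning when reports lag behind the configured period, and a change-detection guard so an RPC is sent only when the report carries new information. A hedged sketch of that guard, with DemoResourcesData standing in for rpc::ResourcesData:

#include <iostream>

struct DemoResourcesData {
  int resources_total_size = 0;
  bool resources_available_changed = false;
  bool resource_load_changed = false;
  bool should_global_gc = false;
};

// Mirrors the deleted condition: skip the RPC entirely when the report
// carries no new information.
bool ShouldSendReport(const DemoResourcesData &d) {
  return d.resources_total_size > 0 || d.resources_available_changed ||
         d.resource_load_changed || d.should_global_gc;
}

int main() {
  DemoResourcesData unchanged;
  DemoResourcesData changed;
  changed.resource_load_changed = true;
  std::cout << ShouldSendReport(unchanged) << "\n";  // 0: nothing to send
  std::cout << ShouldSendReport(changed) << "\n";    // 1: send the report
}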


@@ -103,9 +103,6 @@ struct NodeManagerConfig {
   int max_io_workers;
   // The minimum object size that can be spilled by each spill operation.
   int64_t min_spilling_size;
-  // Whether the raylet should push resource reports to GCS or wait for GCS to pull the
-  // reports from raylets.
-  bool pull_based_resource_reporting;
 };
 class HeartbeatSender {
@@ -249,9 +246,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// report to GCS.
   void FillResourceReport(rpc::ResourcesData &resources_data);
-  /// Report resource usage to the GCS.
-  void ReportResourceUsage();
   /// Write out debug state to a file.
   void DumpDebugState() const;
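
With the push path gone, the tuning knobs that remain are the pull-side ones from the first hunk: gcs_resource_report_poll_period_ms and gcs_max_concurrent_resource_pulls. A hedged, synchronous sketch of bounding in-flight pulls the way the latter knob suggests (illustrative; Ray's poller does this over asynchronous RPC):

#include <cstdint>
#include <deque>
#include <iostream>
#include <string>

class DemoPullScheduler {
 public:
  explicit DemoPullScheduler(uint64_t max_in_flight)
      : max_in_flight_(max_in_flight) {}

  void QueuePull(const std::string &node_id) { queue_.push_back(node_id); }

  // Start as many pulls as the concurrency budget allows.
  void PumpPulls() {
    while (in_flight_ < max_in_flight_ && !queue_.empty()) {
      std::string node = queue_.front();
      queue_.pop_front();
      ++in_flight_;
      std::cout << "pulling report from " << node << "\n";
    }
  }

  // Called when a pull's reply arrives; frees budget for the next node.
  void OnPullDone() {
    --in_flight_;
    PumpPulls();
  }

 private:
  uint64_t max_in_flight_;
  uint64_t in_flight_ = 0;
  std::deque<std::string> queue_;
};

int main() {
  DemoPullScheduler scheduler(/*max_in_flight=*/1);
  scheduler.QueuePull("node-1");
  scheduler.QueuePull("node-2");
  scheduler.PumpPulls();   // starts node-1 only
  scheduler.OnPullDone();  // node-1 done; starts node-2
  scheduler.OnPullDone();
}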