Gcs pull resource reports (#14336)

Alex Wu 2021-03-29 11:36:30 -07:00 committed by GitHub
parent 4f66309e19
commit 1f4d4dfeb0
15 changed files with 663 additions and 31 deletions

View file

@@ -1218,6 +1218,19 @@ cc_test(
],
)
cc_test(
name = "gcs_resource_report_poller_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_resource_report_poller_test.cc",
],
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "service_based_gcs_client_lib",
srcs = glob(
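Note: the new test target can be run on its own, e.g. with bazel test //:gcs_resource_report_poller_test (label assumes Ray's root BUILD file, as the package-relative ":" deps above suggest).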

View file

@@ -229,6 +229,13 @@ RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000)
/// The interval at which the gcs server will print debug info.
RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)
// The interval at which the gcs server will pull resource reports from raylets.
RAY_CONFIG(int, gcs_resource_report_poll_period_ms, 100)
// The maximum number of resource report polls the GCS may have in flight at once.
RAY_CONFIG(uint64_t, gcs_max_concurrent_resource_pulls, 100)
// Feature flag to turn on resource report polling. Polling and raylet pushing are
// mutually exclusive.
RAY_CONFIG(bool, pull_based_resource_reporting, true)
/// Duration to sleep after failing to put an object in plasma because it is full.
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
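Taken together, these settings bound the pull throughput: at most gcs_max_concurrent_resource_pulls RPCs are in flight at once, and each node is re-polled no sooner than gcs_resource_report_poll_period_ms after its previous pull. A back-of-envelope sketch of what the defaults can sustain (illustrative only; avg_rpc_ms is an assumed round-trip time, not a Ray config):

#include <cstdint>
#include <iostream>

int main() {
  const int64_t poll_period_ms = 100;        // gcs_resource_report_poll_period_ms
  const int64_t max_concurrent_pulls = 100;  // gcs_max_concurrent_resource_pulls
  const int64_t avg_rpc_ms = 10;             // assumed average RPC round-trip
  // Each in-flight slot completes ~poll_period_ms / avg_rpc_ms pulls per period,
  // so this is roughly the largest cluster pollable at the configured period.
  std::cout << max_concurrent_pulls * poll_period_ms / avg_rpc_ms
            << " nodes per poll period" << std::endl;  // prints: 1000
}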

View file

@@ -161,12 +161,10 @@ void GcsResourceManager::HandleGetAllAvailableResources(
++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
}
void GcsResourceManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.resources().node_id());
void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
resources_data->CopyFrom(data);
// We use `node_resource_usages_` to filter out the nodes that report resource
// information for the first time. `UpdateNodeResourceUsage` will modify
@@ -177,13 +175,19 @@ void GcsResourceManager::HandleReportResourceUsage(
SetAvailableResources(node_id, ResourceSet(resource_changed));
}
UpdateNodeResourceUsage(node_id, request);
UpdateNodeResourceUsage(node_id, data);
if (resources_data->should_global_gc() || resources_data->resources_total_size() > 0 ||
resources_data->resources_available_changed() ||
resources_data->resource_load_changed()) {
resources_buffer_[node_id] = *resources_data;
}
}
void GcsResourceManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
UpdateFromResourceReport(request.resources());
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
@@ -238,26 +242,24 @@ void GcsResourceManager::HandleGetAllResourceUsage(
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}
void GcsResourceManager::UpdateNodeResourceUsage(
const NodeID node_id, const rpc::ReportResourceUsageRequest &request) {
void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources) {
auto iter = node_resource_usages_.find(node_id);
if (iter == node_resource_usages_.end()) {
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
resources_data->CopyFrom(resources);
node_resource_usages_[node_id] = *resources_data;
} else {
if (request.resources().resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = request.resources().resources_total();
if (resources.resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = resources.resources_total();
}
if (request.resources().resources_available_changed()) {
(*iter->second.mutable_resources_available()) =
request.resources().resources_available();
if (resources.resources_available_changed()) {
(*iter->second.mutable_resources_available()) = resources.resources_available();
}
if (request.resources().resource_load_changed()) {
(*iter->second.mutable_resource_load()) = request.resources().resource_load();
if (resources.resource_load_changed()) {
(*iter->second.mutable_resource_load()) = resources.resource_load();
}
(*iter->second.mutable_resource_load_by_shape()) =
request.resources().resource_load_by_shape();
(*iter->second.mutable_resource_load_by_shape()) = resources.resource_load_by_shape();
}
}
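With the handler logic factored out, any transport can feed reports into the manager, not just the ReportResourceUsage RPC. A minimal sketch of a caller (hypothetical helper, not part of the patch):

#include "ray/gcs/gcs_server/gcs_resource_manager.h"

namespace ray {
// Hypothetical helper showing which fields make UpdateFromResourceReport both
// merge the report and buffer it for rebroadcast.
void ReportCpuAvailability(gcs::GcsResourceManager &manager,
                           const NodeID &node_id, double cpus_free) {
  rpc::ResourcesData data;
  data.set_node_id(node_id.Binary());
  (*data.mutable_resources_available())["CPU"] = cpus_free;
  // Without this flag, UpdateNodeResourceUsage ignores the available-resource
  // map above and the report is not added to resources_buffer_.
  data.set_resources_available_changed(true);
  manager.UpdateFromResourceReport(data);
}
}  // namespace ray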

View file

@@ -136,8 +136,12 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
///
/// \param node_id Node id.
/// \param resources Resource usage data reported by the node.
void UpdateNodeResourceUsage(const NodeID node_id,
const rpc::ReportResourceUsageRequest &request);
void UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources);
/// Process a new resource report from a node, independent of the rpc handler it came
/// from.
void UpdateFromResourceReport(const rpc::ResourcesData &data);
/// Update the placement group load information so that it will be reported through
/// heartbeat.

View file

@@ -0,0 +1,147 @@
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"
namespace ray {
namespace gcs {
GcsResourceReportPoller::GcsResourceReportPoller(
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::function<void(const rpc::ResourcesData &)> handle_resource_report,
std::function<int64_t(void)> get_current_time_milli,
std::function<void(
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>)>
request_report)
: ticker_(polling_service_),
max_concurrent_pulls_(RayConfig::instance().gcs_max_concurrent_resource_pulls()),
inflight_pulls_(0),
gcs_resource_manager_(gcs_resource_manager),
raylet_client_pool_(raylet_client_pool),
handle_resource_report_(handle_resource_report),
get_current_time_milli_(get_current_time_milli),
request_report_(request_report),
poll_period_ms_(RayConfig::instance().gcs_resource_report_poll_period_ms()) {}
GcsResourceReportPoller::~GcsResourceReportPoller() { Stop(); }
void GcsResourceReportPoller::Initialize(const GcsInitData &gcs_init_data) {
for (const auto &pair : gcs_init_data.Nodes()) {
HandleNodeAdded(pair.second);
}
}
void GcsResourceReportPoller::Start() {
polling_thread_.reset(new std::thread{[this]() {
SetThreadName("resource_report_poller");
boost::asio::io_service::work work(polling_service_);
polling_service_.run();
RAY_LOG(DEBUG) << "GCSResourceReportPoller has stopped. This should only happen if "
"the cluster has stopped";
}});
ticker_.RunFnPeriodically([this] { TryPullResourceReport(); }, 10);
}
void GcsResourceReportPoller::Stop() {
if (polling_thread_ != nullptr) {
// TODO (Alex): There's technically a race condition here if we start and stop the
// thread in rapid succession.
polling_service_.stop();
if (polling_thread_->joinable()) {
polling_thread_->join();
}
}
}
void GcsResourceReportPoller::HandleNodeAdded(const rpc::GcsNodeInfo &node_info) {
absl::MutexLock guard(&mutex_);
rpc::Address address;
address.set_raylet_id(node_info.node_id());
address.set_ip_address(node_info.node_manager_address());
address.set_port(node_info.node_manager_port());
auto state =
std::make_shared<PullState>(NodeID::FromBinary(node_info.node_id()),
std::move(address), -1, get_current_time_milli_());
const auto &node_id = state->node_id;
RAY_CHECK(!nodes_.count(node_id)) << "Node with id: " << node_id << " was added twice!";
nodes_[node_id] = state;
to_pull_queue_.push_front(state);
RAY_LOG(DEBUG) << "Node was added with id: " << node_id;
polling_service_.post([this]() { TryPullResourceReport(); });
}
void GcsResourceReportPoller::HandleNodeRemoved(const rpc::GcsNodeInfo &node_info) {
NodeID node_id = NodeID::FromBinary(node_info.node_id());
{
absl::MutexLock guard(&mutex_);
nodes_.erase(node_id);
RAY_CHECK(!nodes_.count(node_id));
RAY_LOG(DEBUG) << "Node removed (node_id: " << node_id
<< ")# of remaining nodes: " << nodes_.size();
}
}
void GcsResourceReportPoller::TryPullResourceReport() {
absl::MutexLock guard(&mutex_);
int64_t cur_time = get_current_time_milli_();
RAY_LOG(DEBUG) << "Trying to pull inflight_pulls " << inflight_pulls_ << "/"
<< max_concurrent_pulls_ << ", queue size: " << to_pull_queue_.size();
while (inflight_pulls_ < max_concurrent_pulls_ && !to_pull_queue_.empty()) {
auto to_pull = to_pull_queue_.front();
if (cur_time < to_pull->next_pull_time) {
break;
}
to_pull_queue_.pop_front();
if (!nodes_.count(to_pull->node_id)) {
RAY_LOG(DEBUG)
<< "Node was removed from the cluster before this pull could run. Skipping.";
continue;
}
PullResourceReport(to_pull);
}
}
void GcsResourceReportPoller::PullResourceReport(const std::shared_ptr<PullState> state) {
inflight_pulls_++;
request_report_(
state->address, raylet_client_pool_,
[this, state](const Status &status, const rpc::RequestResourceReportReply &reply) {
if (status.ok()) {
// TODO (Alex): This callback is always posted onto the main thread. Since most
// of the work is in the callback we should move this callback's execution to
// the polling thread. We will need to implement locking once we switch threads.
handle_resource_report_(reply.resources());
} else {
RAY_LOG(INFO) << "Couldn't get resource request from raylet " << state->node_id
<< ": " << status.ToString();
}
polling_service_.post([this, state]() { NodeResourceReportReceived(state); });
});
}
void GcsResourceReportPoller::NodeResourceReportReceived(
const std::shared_ptr<PullState> state) {
absl::MutexLock guard(&mutex_);
inflight_pulls_--;
// Schedule the next pull. The `TryPullResourceReport` loop will validate that
// this node is still in the cluster.
state->next_pull_time = get_current_time_milli_() + poll_period_ms_;
to_pull_queue_.push_back(state);
polling_service_.post([this] { TryPullResourceReport(); });
}
} // namespace gcs
} // namespace ray

View file

@@ -0,0 +1,143 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
namespace ray {
namespace gcs {
/// Polls raylets on a separate thread to update GCS's view of the cluster's resource
/// utilization. This class creates and manages a polling thread. All public methods are
/// thread safe.
class GcsResourceReportPoller {
/*
This class polls each node independently, subject to a global cap on the number
of concurrent pulls. The process for polling a single node is as follows:
A new node joins the cluster.
1. (Main thread) Begin tracking the node, and begin the polling process.
Main polling procedure.
2. (Polling thread) Enqueue the node to be pulled.
3. (Polling thread) The node is popped off the front of the queue and
RequestResourceReport is sent to the raylet.
4. (Main thread) The raylet responds and the resource manager is updated. This section
is _not_ thread safe (i.e. it should not modify the resource report poller's state).
5. (Polling thread) The RequestResourceReport continuation runs, scheduling the next
pull time.
6. (Polling thread) The next pull time arrives, and step 2 is repeated.
The node leaves the cluster.
7. Untrack the node. The next time the main polling procedure comes across the node,
it is dropped from the system.
*/
public:
GcsResourceReportPoller(
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::function<void(const rpc::ResourcesData &)> handle_resource_report,
/* Default values should only be changed for testing. */
std::function<int64_t(void)> get_current_time_milli =
[]() { return absl::GetCurrentTimeNanos() / (1000 * 1000); },
std::function<void(
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>)>
request_report =
[](const rpc::Address &address,
std::shared_ptr<rpc::NodeManagerClientPool> &raylet_client_pool,
std::function<void(const Status &,
const rpc::RequestResourceReportReply &)>
callback) {
auto raylet_client = raylet_client_pool->GetOrConnectByAddress(address);
raylet_client->RequestResourceReport(callback);
});
~GcsResourceReportPoller();
void Initialize(const GcsInitData &gcs_init_data);
/// Start a thread to poll for resource updates.
void Start();
/// Stop polling for resource updates.
void Stop();
/// Event handler when a new node joins the cluster.
void HandleNodeAdded(const rpc::GcsNodeInfo &node_info) LOCKS_EXCLUDED(mutex_);
/// Event handler when a node leaves the cluster.
void HandleNodeRemoved(const rpc::GcsNodeInfo &node_info) LOCKS_EXCLUDED(mutex_);
private:
// An asio service which does the polling work.
instrumented_io_context polling_service_;
// The associated thread it runs on.
std::unique_ptr<std::thread> polling_thread_;
// Timer tick to check when we should do another poll.
PeriodicalRunner ticker_;
// The maximum number of pulls that can occur at once.
const uint64_t max_concurrent_pulls_;
// The number of ongoing pulls.
uint64_t inflight_pulls_;
// The resource manager which maintains GCS's view of the cluster's resource
// utilization.
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
// The shared, thread-safe pool of raylet clients, which we use to minimize connections.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
// Handle receiving a resource report (e.g. update the resource manager).
// This function is guaranteed to be called on the main thread. It is not necessarily
// safe to call it from the polling thread.
std::function<void(const rpc::ResourcesData &)> handle_resource_report_;
// Returns the current time in milliseconds.
std::function<int64_t(void)> get_current_time_milli_;
// Send the `RequestResourceReport` RPC.
std::function<void(
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>)>
request_report_;
// The minimum delay between two pull requests to the same node.
const int64_t poll_period_ms_;
struct PullState {
NodeID node_id;
rpc::Address address;
int64_t last_pull_time;
int64_t next_pull_time;
PullState(NodeID _node_id, rpc::Address _address, int64_t _last_pull_time,
int64_t _next_pull_time)
: node_id(_node_id),
address(_address),
last_pull_time(_last_pull_time),
next_pull_time(_next_pull_time) {}
~PullState() {}
};
// A global lock for internal operations. This lock is shared between the main thread
// and polling thread, so we should be mindful about how long we hold it.
absl::Mutex mutex_;
// All the state regarding how to and when to send a new pull request to a raylet.
std::unordered_map<NodeID, std::shared_ptr<PullState>> nodes_ GUARDED_BY(mutex_);
// The set of all nodes which we are allowed to pull from. We can't necessarily pull
// from this list immediately because we limit the number of concurrent pulls. This
// queue should be sorted by time. The front should contain the first item to pull.
std::deque<std::shared_ptr<PullState>> to_pull_queue_ GUARDED_BY(mutex_);
/// Try to pull from the node. We may not be able to if it would exceed the max
/// concurrent pulls. This method is thread safe.
void TryPullResourceReport() LOCKS_EXCLUDED(mutex_);
/// Pull resource report without validation.
void PullResourceReport(const std::shared_ptr<PullState> state);
/// A resource report was successfully pulled (and the resource manager was already
/// updated). This method is thread safe.
void NodeResourceReportReceived(const std::shared_ptr<PullState> state)
LOCKS_EXCLUDED(mutex_);
friend class GcsResourceReportPollerTest;
};
} // namespace gcs
} // namespace ray
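To make the procedure described in the class comment concrete, here is a minimal sketch of driving the public API (it mirrors the GcsServer wiring further down; gcs_resource_manager, raylet_client_pool, gcs_init_data, and node_info are assumed to exist in the caller):

// Sketch only; assumes <memory> plus this header are included.
auto poller = std::make_unique<gcs::GcsResourceReportPoller>(
    gcs_resource_manager, raylet_client_pool,
    [&](const rpc::ResourcesData &report) {
      gcs_resource_manager->UpdateFromResourceReport(report);
    });
poller->Initialize(gcs_init_data);  // seed with nodes already in the cluster
poller->Start();                    // spawns the polling thread
// On membership events (e.g. from GcsNodeManager listeners):
poller->HandleNodeAdded(node_info);
poller->HandleNodeRemoved(node_info);
poller->Stop();  // joins the polling thread; also called by the destructor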

View file

@@ -101,6 +101,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init stats handler.
InitStatsHandler();
// Init resource report polling.
InitResourceReportPolling(gcs_init_data);
// Install event listeners.
InstallEventListeners();
@@ -132,6 +135,10 @@ void GcsServer::Stop() {
gcs_heartbeat_manager_->Stop();
if (config_.pull_based_resource_reporting) {
gcs_resource_report_poller_->Stop();
}
is_stopped_ = true;
RAY_LOG(INFO) << "GCS server stopped.";
}
@@ -284,6 +291,19 @@ void GcsServer::InitTaskInfoHandler() {
rpc_server_.RegisterService(*task_info_service_);
}
void GcsServer::InitResourceReportPolling(const GcsInitData &gcs_init_data) {
if (config_.pull_based_resource_reporting) {
gcs_resource_report_poller_.reset(new GcsResourceReportPoller(
gcs_resource_manager_, raylet_client_pool_,
[this](const rpc::ResourcesData &report) {
gcs_resource_manager_->UpdateFromResourceReport(report);
}));
gcs_resource_report_poller_->Initialize(gcs_init_data);
gcs_resource_report_poller_->Start();
}
}
void GcsServer::InitStatsHandler() {
RAY_CHECK(gcs_table_storage_);
stats_handler_.reset(new rpc::DefaultStatsHandler(gcs_table_storage_));
@@ -310,6 +330,9 @@ void GcsServer::InstallEventListeners() {
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
gcs_actor_manager_->SchedulePendingActors();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
if (config_.pull_based_resource_reporting) {
gcs_resource_report_poller_->HandleNodeAdded(*node);
}
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
@@ -320,6 +343,9 @@
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
if (config_.pull_based_resource_reporting) {
gcs_resource_report_poller_->HandleNodeRemoved(*node);
}
});
// Install worker event listener.

View file

@@ -20,6 +20,7 @@
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
@@ -41,6 +42,7 @@ struct GcsServerConfig {
bool retry_redis = true;
bool enable_sharding_conn = true;
std::string node_ip_address;
bool pull_based_resource_reporting;
};
class GcsNodeManager;
@@ -112,6 +114,9 @@ class GcsServer {
/// Initialize stats handler.
void InitStatsHandler();
/// Initialize resource report polling.
void InitResourceReportPolling(const GcsInitData &gcs_init_data);
/// Install event listeners.
void InstallEventListeners();
@@ -176,6 +181,8 @@ class GcsServer {
/// Stats handler and service.
std::unique_ptr<rpc::StatsHandler> stats_handler_;
std::unique_ptr<rpc::StatsGrpcService> stats_service_;
/// Resource report poller.
std::unique_ptr<GcsResourceReportPoller> gcs_resource_report_poller_;
/// The gcs worker manager.
std::unique_ptr<GcsWorkerManager> gcs_worker_manager_;
/// Worker info service.

View file

@@ -101,6 +101,8 @@ int main(int argc, char *argv[]) {
gcs_server_config.redis_password = redis_password;
gcs_server_config.retry_redis = retry_redis;
gcs_server_config.node_ip_address = node_ip_address;
gcs_server_config.pull_based_resource_reporting =
RayConfig::instance().pull_based_resource_reporting();
ray::gcs::GcsServer gcs_server(gcs_server_config, main_service);
// Destroy the GCS server on a SIGTERM. The pointer to main_service is

View file

@@ -72,7 +72,7 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) {
rpc::ReportResourceUsageRequest report_request;
(*report_request.mutable_resources()->mutable_resources_available())["CPU"] = 2;
(*report_request.mutable_resources()->mutable_resources_total())["CPU"] = 2;
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request);
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources());
gcs_resource_manager_->HandleGetAllResourceUsage(get_all_request, &get_all_reply,
send_reply_callback);

View file

@@ -0,0 +1,271 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"
#include "ray/gcs/test/gcs_test_util.h"
namespace ray {
namespace gcs {
using ::testing::_;
class GcsResourceReportPollerTest : public ::testing::Test {
public:
GcsResourceReportPollerTest()
: current_time_(0),
gcs_resource_report_poller_(
nullptr, nullptr, [](const rpc::ResourcesData &) {},
[this]() { return current_time_; },
[this](const rpc::Address &address,
std::shared_ptr<rpc::NodeManagerClientPool> &client_pool,
std::function<void(const Status &,
const rpc::RequestResourceReportReply &)>
callback) {
if (request_report_) {
request_report_(address, client_pool, callback);
}
}
) {}
void RunPollingService() {
RAY_LOG(ERROR) << "Running service...";
gcs_resource_report_poller_.polling_service_.run();
gcs_resource_report_poller_.polling_service_.restart();
RAY_LOG(ERROR) << "Done";
}
void Tick(int64_t inc) {
current_time_ += inc;
gcs_resource_report_poller_.TryPullResourceReport();
}
int64_t current_time_;
std::function<void(
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>)>
request_report_;
boost::asio::io_service io_service_;
gcs::GcsResourceReportPoller gcs_resource_report_poller_;
};
TEST_F(GcsResourceReportPollerTest, TestBasic) {
bool rpc_sent = false;
request_report_ =
[&rpc_sent](
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>
callback) {
RAY_LOG(ERROR) << "Requesting";
rpc_sent = true;
rpc::RequestResourceReportReply temp;
callback(Status::OK(), temp);
};
auto node_info = Mocker::GenNodeInfo();
gcs_resource_report_poller_.HandleNodeAdded(*node_info);
// We try to send the rpc from the polling service.
ASSERT_FALSE(rpc_sent);
RunPollingService();
ASSERT_TRUE(rpc_sent);
// Not enough time has passed to send another request.
rpc_sent = false;
Tick(1);
RunPollingService();
ASSERT_FALSE(rpc_sent);
// Now enough time has passed.
Tick(100);
RunPollingService();
ASSERT_TRUE(rpc_sent);
}
TEST_F(GcsResourceReportPollerTest, TestFailedRpc) {
bool rpc_sent = false;
request_report_ =
[&rpc_sent](
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>
callback) {
RAY_LOG(ERROR) << "Requesting";
rpc_sent = true;
rpc::RequestResourceReportReply temp;
callback(Status::TimedOut("error"), temp);
};
auto node_info = Mocker::GenNodeInfo();
gcs_resource_report_poller_.HandleNodeAdded(*node_info);
// We try to send the rpc from the polling service.
ASSERT_FALSE(rpc_sent);
RunPollingService();
ASSERT_TRUE(rpc_sent);
// Not enough time has passed to send another request.
rpc_sent = false;
Tick(1);
RunPollingService();
ASSERT_FALSE(rpc_sent);
// Now enough time has passed.
Tick(100);
RunPollingService();
ASSERT_TRUE(rpc_sent);
}
TEST_F(GcsResourceReportPollerTest, TestMaxInFlight) {
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> nodes;
for (int i = 0; i < 200; i++) {
nodes.emplace_back(Mocker::GenNodeInfo());
}
std::vector<
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>>
callbacks;
int num_rpcs_sent = 0;
request_report_ =
[&](const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>
callback) {
num_rpcs_sent++;
callbacks.emplace_back(callback);
};
for (const auto &node : nodes) {
gcs_resource_report_poller_.HandleNodeAdded(*node);
}
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
for (int i = 0; i < 100; i++) {
const auto &callback = callbacks[i];
rpc::RequestResourceReportReply temp;
callback(Status::OK(), temp);
}
RunPollingService();
// Requests are sent as soon as possible (don't wait for the next tick).
ASSERT_EQ(num_rpcs_sent, 200);
}
TEST_F(GcsResourceReportPollerTest, TestNodeRemoval) {
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> nodes;
for (int i = 0; i < 200; i++) {
nodes.emplace_back(Mocker::GenNodeInfo());
}
std::vector<
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>>
callbacks;
int num_rpcs_sent = 0;
request_report_ =
[&](const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>
callback) {
num_rpcs_sent++;
callbacks.emplace_back(callback);
};
for (const auto &node : nodes) {
gcs_resource_report_poller_.HandleNodeAdded(*node);
}
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
Tick(50);
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
// Remove the nodes while there are inflight rpcs.
// Since we added all the nodes at once, the request order is LIFO.
for (int i = 199; i >= 100; i--) {
gcs_resource_report_poller_.HandleNodeRemoved(*nodes[i]);
}
for (int i = 0; i < 100; i++) {
const auto &callback = callbacks[i];
rpc::RequestResourceReportReply temp;
callback(Status::TimedOut("Timed out"), temp);
}
RunPollingService();
// Requests are sent as soon as possible (don't wait for the next tick).
ASSERT_EQ(num_rpcs_sent, 200);
}
TEST_F(GcsResourceReportPollerTest, TestPrioritizeNewNodes) {
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> nodes;
for (int i = 0; i < 200; i++) {
nodes.emplace_back(Mocker::GenNodeInfo());
}
std::vector<
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>>
callbacks;
std::unordered_set<NodeID> nodes_requested;
int num_rpcs_sent = 0;
request_report_ =
[&](const rpc::Address &address, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>
callback) {
num_rpcs_sent++;
callbacks.emplace_back(callback);
auto node_id = NodeID::FromBinary(address.raylet_id());
nodes_requested.insert(node_id);
};
for (int i = 0; i < 100; i++) {
const auto &node = nodes[i];
gcs_resource_report_poller_.HandleNodeAdded(*node);
}
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
Tick(50);
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
ASSERT_EQ(nodes_requested.size(), 100);
for (int i = 0; i < 100; i++) {
const auto &callback = callbacks[i];
rpc::RequestResourceReportReply temp;
callback(Status::OK(), temp);
}
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
ASSERT_EQ(nodes_requested.size(), 100);
// The first wave of nodes should run at t=150.
Tick(50);
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 100);
ASSERT_EQ(nodes_requested.size(), 100);
// We should now request from the new nodes before repeating the first wave of nodes.
for (int i = 100; i < 200; i++) {
const auto &node = nodes[i];
gcs_resource_report_poller_.HandleNodeAdded(*node);
}
RunPollingService();
ASSERT_EQ(num_rpcs_sent, 200);
ASSERT_EQ(nodes_requested.size(), 200);
}
} // namespace gcs
} // namespace ray

View file

@@ -169,6 +169,8 @@ int main(int argc, char *argv[]) {
node_manager_config.min_worker_port = min_worker_port;
node_manager_config.max_worker_port = max_worker_port;
node_manager_config.worker_ports = worker_ports;
node_manager_config.pull_based_resource_reporting =
RayConfig::instance().pull_based_resource_reporting();
if (!python_worker_command.empty()) {
node_manager_config.worker_commands.emplace(

View file

@@ -467,9 +467,22 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
resources_data.set_should_global_gc(true);
should_global_gc_ = false;
}
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
auto now = absl::GetCurrentTimeNanos();
if ((should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) &&
now - last_local_gc_ns_ > local_gc_min_interval_ns_) {
DoLocalGC();
should_local_gc_ = false;
last_local_gc_ns_ = now;
}
}
void NodeManager::ReportResourceUsage() {
if (initial_config_.pull_based_resource_reporting) {
return;
}
uint64_t now_ms = current_time_ms();
uint64_t interval = now_ms - last_resource_report_at_ms_;
if (interval >
@@ -485,16 +498,6 @@ void NodeManager::ReportResourceUsage() {
auto resources_data = std::make_shared<rpc::ResourcesData>();
FillResourceReport(*resources_data);
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
auto now = absl::GetCurrentTimeNanos();
if ((should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) &&
now - last_local_gc_ns_ > local_gc_min_interval_ns_) {
DoLocalGC();
should_local_gc_ = false;
last_local_gc_ns_ = now;
}
if (resources_data->resources_total_size() > 0 ||
resources_data->resources_available_changed() ||
resources_data->resource_load_changed() || resources_data->should_global_gc()) {
@@ -1396,6 +1399,7 @@ void NodeManager::ProcessPushErrorRequestMessage(const uint8_t *message_data) {
void NodeManager::HandleRequestResourceReport(
const rpc::RequestResourceReportRequest &request,
rpc::RequestResourceReportReply *reply, rpc::SendReplyCallback send_reply_callback) {
auto resources_data = reply->mutable_resources();
FillResourceReport(*resources_data);

View file

@@ -103,6 +103,9 @@ struct NodeManagerConfig {
int max_io_workers;
// The minimum object size that can be spilled by each spill operation.
int64_t min_spilling_size;
// Whether the raylet should push resource reports to the GCS, or wait for the
// GCS to pull reports from it.
bool pull_based_resource_reporting;
};
class HeartbeatSender {

View file

@@ -202,6 +202,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
void SpillWaitingTasks();
const NodeID &self_node_id_;
/// Responsible for resource tracking/view of the cluster.
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
/// Class that makes task dependencies local.
TaskDependencyManagerInterface &task_dependency_manager_;