Raylet request resource report endpoint (#14291)

* .

* done?

* raylet side done?

* .

* .

* .

* client

* .

* fix tests

* make ci happy

* lint

* cleanup

* clang sucks

Co-authored-by: Alex Wu <alex@anyscale.com>
This commit is contained in:
Alex Wu 2021-03-09 09:50:50 -08:00 committed by GitHub
parent 9a7fbd3cdf
commit ba6cebe30f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 130 additions and 55 deletions

View file

@ -240,6 +240,12 @@ struct GcsServerMocker {
void GetSystemConfig(const ray::rpc::ClientCallback<ray::rpc::GetSystemConfigReply>
&callback) override {}
/// ResourceRequestInterface
void RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) override {
RAY_CHECK(false) << "Unused";
};
~MockRayletClient() {}
int num_workers_requested = 0;

View file

@ -40,7 +40,10 @@ python_grpc_compile(
proto_library(
name = "node_manager_proto",
srcs = ["node_manager.proto"],
deps = [":common_proto"],
deps = [
":common_proto",
":gcs_proto",
],
)
cc_proto_library(

View file

@ -17,6 +17,7 @@ syntax = "proto3";
package ray.rpc;
import "src/ray/protobuf/common.proto";
import "src/ray/protobuf/gcs.proto";
// Request a worker from the raylet with the specified resources.
message RequestWorkerLeaseRequest {
@ -216,8 +217,18 @@ message GetSystemConfigReply {
string system_config = 1;
}
message RequestResourceReportRequest {
}
message RequestResourceReportReply {
ResourcesData resources = 1;
}
// Service for inter-node-manager communication.
service NodeManagerService {
// Request the current resource usage from this raylet
rpc RequestResourceReport(RequestResourceReportRequest)
returns (RequestResourceReportReply);
// Request a worker from the raylet.
rpc RequestWorkerLease(RequestWorkerLeaseRequest) returns (RequestWorkerLeaseReply);
// Release a worker back to its raylet.

View file

@ -448,11 +448,10 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
}
}
void NodeManager::ReportResourceUsage() {
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->set_node_id(self_node_id_.Binary());
resources_data->set_node_manager_address(initial_config_.node_manager_address);
// Update local chche from gcs remote cache, this is needed when gcs restart.
void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
resources_data.set_node_id(self_node_id_.Binary());
resources_data.set_node_manager_address(initial_config_.node_manager_address);
// Update local cache from gcs remote cache, this is needed when gcs restart.
// We should always keep the cache view consistent.
cluster_resource_scheduler_->UpdateLastResourceUsage(
gcs_client_->NodeResources().GetLastResourceUsage());
@ -461,9 +460,15 @@ void NodeManager::ReportResourceUsage() {
// Set the global gc bit on the outgoing heartbeat message.
if (should_global_gc_) {
resources_data->set_should_global_gc(true);
resources_data.set_should_global_gc(true);
resources_data.set_should_global_gc(true);
should_global_gc_ = false;
}
}
void NodeManager::ReportResourceUsage() {
auto resources_data = std::make_shared<rpc::ResourcesData>();
FillResourceReport(*resources_data);
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
@ -1314,6 +1319,15 @@ void NodeManager::ProcessPushErrorRequestMessage(const uint8_t *message_data) {
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
}
void NodeManager::HandleRequestResourceReport(
const rpc::RequestResourceReportRequest &request,
rpc::RequestResourceReportReply *reply, rpc::SendReplyCallback send_reply_callback) {
auto resources_data = reply->mutable_resources();
FillResourceReport(*resources_data);
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {

View file

@ -229,14 +229,15 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void ResourceDeleted(const NodeID &node_id,
const std::vector<std::string> &resource_names);
/// Send heartbeats to the GCS.
void Heartbeat();
/// Evaluates the local infeasible queue to check if any tasks can be scheduled.
/// This is called whenever there's an update to the resources on the local node.
/// \return Void.
void TryLocalInfeasibleTaskScheduling();
/// Fill out the resource report. This can be called by either method to transport the
/// report to GCS.
void FillResourceReport(rpc::ResourcesData &resources_data);
/// Report resource usage to the GCS.
void ReportResourceUsage();
@ -476,6 +477,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Status indicating whether setup was successful.
ray::Status SetupPlasmaSubscription();
/// Handle a `RequestResourceReport` request.
void HandleRequestResourceReport(const rpc::RequestResourceReportRequest &request,
rpc::RequestResourceReportReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `PrepareBundleResources` request.
void HandlePrepareBundleResources(const rpc::PrepareBundleResourcesRequest &request,
rpc::PrepareBundleResourcesReply *reply,
@ -590,6 +596,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
bool GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results);
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
void FillResourceUsage(rpc::ResourcesData &data);
/// Disconnect a client.
///
/// \param client The client that sent the message.

View file

@ -979,8 +979,7 @@ void ClusterResourceScheduler::UpdateLastResourceUsage(
last_report_resources_.reset(new NodeResources(node_resources));
}
void ClusterResourceScheduler::FillResourceUsage(
std::shared_ptr<rpc::ResourcesData> resources_data) {
void ClusterResourceScheduler::FillResourceUsage(rpc::ResourcesData &resources_data) {
NodeResources resources;
RAY_CHECK(GetNodeResources(local_node_id_, &resources))
@ -1019,12 +1018,12 @@ void ClusterResourceScheduler::FillResourceUsage(
const auto &last_capacity = last_report_resources_->predefined_resources[i];
// Note: available may be negative, but only report positive to GCS.
if (capacity.available != last_capacity.available && capacity.available > 0) {
resources_data->set_resources_available_changed(true);
(*resources_data->mutable_resources_available())[label] =
resources_data.set_resources_available_changed(true);
(*resources_data.mutable_resources_available())[label] =
capacity.available.Double();
}
if (capacity.total != last_capacity.total) {
(*resources_data->mutable_resources_total())[label] = capacity.total.Double();
(*resources_data.mutable_resources_total())[label] = capacity.total.Double();
}
}
for (const auto &it : resources.custom_resources) {
@ -1034,12 +1033,12 @@ void ClusterResourceScheduler::FillResourceUsage(
const auto &label = string_to_int_map_.Get(custom_id);
// Note: available may be negative, but only report positive to GCS.
if (capacity.available != last_capacity.available && capacity.available > 0) {
resources_data->set_resources_available_changed(true);
(*resources_data->mutable_resources_available())[label] =
resources_data.set_resources_available_changed(true);
(*resources_data.mutable_resources_available())[label] =
capacity.available.Double();
}
if (capacity.total != last_capacity.total) {
(*resources_data->mutable_resources_total())[label] = capacity.total.Double();
(*resources_data.mutable_resources_total())[label] = capacity.total.Double();
}
}
if (resources != *last_report_resources_.get()) {

View file

@ -385,7 +385,7 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
///
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> resources_data) override;
void FillResourceUsage(rpc::ResourcesData &resources_data) override;
/// Update last report resources local cache from gcs cache,
/// this is needed when gcs fo.

View file

@ -62,7 +62,7 @@ class ClusterResourceSchedulerInterface {
///
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
virtual void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) = 0;
virtual void FillResourceUsage(rpc::ResourcesData &data) = 0;
/// Return local resources in human-readable string form.
virtual std::string GetLocalResourceViewString() const = 0;

View file

@ -1052,11 +1052,11 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) {
resource_scheduler.AddOrUpdateNode(12345, other_node_resources);
{ // Cluster is idle.
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.FillResourceUsage(data);
auto available = data->resources_available();
auto total = data->resources_total();
auto available = data.resources_available();
auto total = data.resources_total();
ASSERT_EQ(available[kCPU_ResourceLabel], 1);
ASSERT_EQ(available[kGPU_ResourceLabel], 2);
@ -1090,12 +1090,12 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) {
{"1", 0.1},
});
resource_scheduler.AllocateLocalTaskResources(allocation_map, allocations);
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.UpdateLastResourceUsage(std::make_shared<SchedulingResources>());
resource_scheduler.FillResourceUsage(data);
auto available = data->resources_available();
auto total = data->resources_total();
auto available = data.resources_available();
auto total = data.resources_total();
ASSERT_EQ(available[kCPU_ResourceLabel], 0.9);
ASSERT_EQ(available[kGPU_ResourceLabel], 2);
@ -1133,38 +1133,38 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
resource_scheduler.AddOrUpdateNode(12345, other_node_resources);
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.FillResourceUsage(data);
auto available = data->resources_available();
auto total = data->resources_total();
auto available = data.resources_available();
auto total = data.resources_total();
ASSERT_EQ(available["object_store_memory"], 750 * 1024 * 1024);
ASSERT_EQ(total["object_store_memory"], 1000 * 1024 * 1024);
}
used_object_store_memory = 450 * 1024 * 1024;
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.FillResourceUsage(data);
auto available = data->resources_available();
auto total = data->resources_total();
auto available = data.resources_available();
auto total = data.resources_total();
ASSERT_EQ(available["object_store_memory"], 550 * 1024 * 1024);
}
used_object_store_memory = 0;
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.FillResourceUsage(data);
auto available = data->resources_available();
auto total = data->resources_total();
auto available = data.resources_available();
auto total = data.resources_total();
ASSERT_EQ(available["object_store_memory"], 1000 * 1024 * 1024);
}
used_object_store_memory = 9999999999;
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.FillResourceUsage(data);
auto available = data->resources_available();
auto total = data->resources_total();
auto available = data.resources_available();
auto total = data.resources_total();
ASSERT_EQ(available["object_store_memory"], 0);
}
}
@ -1183,7 +1183,7 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
task_allocation = std::make_shared<TaskResourceInstances>();
ASSERT_FALSE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation));
// View of local resources is not affected by resource usage report.
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
resource_scheduler.FillResourceUsage(data);
ASSERT_FALSE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation));
@ -1191,7 +1191,7 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
// Remote node reports updated resource availability.
resource_scheduler.AddOrUpdateNode("remote", {{"CPU", 2.}},
{{"CPU", num_slots_available}});
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
int64_t t;
bool is_infeasible;
for (int i = 0; i < 3; i++) {

View file

@ -463,16 +463,16 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con
}
}
void ClusterTaskManager::FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) {
void ClusterTaskManager::FillResourceUsage(rpc::ResourcesData &data) {
if (max_resource_shapes_per_load_report_ == 0) {
return;
}
// TODO (WangTao): Find a way to check if load changed and combine it with light
// heartbeat. Now we just report it every time.
data->set_resource_load_changed(true);
auto resource_loads = data->mutable_resource_load();
data.set_resource_load_changed(true);
auto resource_loads = data.mutable_resource_load();
auto resource_load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
data.mutable_resource_load_by_shape()->mutable_resource_demands();
int num_reported = 0;

View file

@ -118,7 +118,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) override;
void FillResourceUsage(rpc::ResourcesData &data) override;
/// Return if any tasks are pending resource acquisition.
///

View file

@ -61,7 +61,7 @@ class ClusterTaskManagerInterface {
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
virtual void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) = 0;
virtual void FillResourceUsage(rpc::ResourcesData &data) = 0;
/// Populate the list of pending or infeasible actor tasks for node stats.
///

View file

@ -576,11 +576,11 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
}
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
task_manager_.FillResourceUsage(data);
auto load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
data.mutable_resource_load_by_shape()->mutable_resource_demands();
ASSERT_EQ(load_by_shape->size(), 3);
std::vector<std::vector<unsigned int>> expected = {
@ -655,9 +655,9 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
ASSERT_EQ(node_info_calls_, 0);
{ // No tasks can run because the worker pool is empty.
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
auto resource_load_by_shape = data.resource_load_by_shape();
auto shape1 = resource_load_by_shape.resource_demands()[0];
ASSERT_EQ(shape1.backlog_size(), 55);
@ -672,9 +672,9 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
task_manager_.ScheduleAndDispatchTasks();
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
auto resource_load_by_shape = data.resource_load_by_shape();
auto shape1 = resource_load_by_shape.resource_demands()[0];
ASSERT_TRUE(callback_occurred);
@ -690,9 +690,9 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
RAY_LOG(ERROR) << "Finished cancelling tasks";
{
auto data = std::make_shared<rpc::ResourcesData>();
rpc::ResourcesData data;
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
auto resource_load_by_shape = data.resource_load_by_shape();
ASSERT_EQ(resource_load_by_shape.resource_demands().size(), 0);
while (!leased_workers_.empty()) {

View file

@ -415,6 +415,12 @@ void raylet::RayletClient::GlobalGC(
grpc_client_->GlobalGC(request, callback);
}
void raylet::RayletClient::RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) {
rpc::RequestResourceReportRequest request;
grpc_client_->RequestResourceReport(request, callback);
}
void raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id,
const rpc::Address &owner_address) {
flatbuffers::FlatBufferBuilder fbb;

View file

@ -139,10 +139,20 @@ class DependencyWaiterInterface {
virtual ~DependencyWaiterInterface(){};
};
/// Inteface for getting resource reports.
class ResourceRequestInterface {
public:
virtual void RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) = 0;
virtual ~ResourceRequestInterface(){};
};
class RayletClientInterface : public PinObjectsInterface,
public WorkerLeaseInterface,
public DependencyWaiterInterface,
public ResourceReserveInterface {
public ResourceReserveInterface,
public ResourceRequestInterface {
public:
virtual ~RayletClientInterface(){};
@ -399,6 +409,9 @@ class RayletClient : public RayletClientInterface {
void GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback);
void RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) override;
// Subscribe to receive notification on plasma object
void SubscribeToPlasma(const ObjectID &object_id, const rpc::Address &owner_address);

View file

@ -70,6 +70,9 @@ class NodeManagerWorkerClient
return std::shared_ptr<NodeManagerWorkerClient>(instance);
}
/// Request a resource report.
VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestResourceReport, grpc_client_, )
/// Request a worker lease.
VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestWorkerLease, grpc_client_, )

View file

@ -24,6 +24,7 @@ namespace rpc {
/// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler.
#define RAY_NODE_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(NodeManagerService, RequestResourceReport) \
RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease) \
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedWorkers) \
@ -54,6 +55,11 @@ class NodeManagerServiceHandler {
/// \param[out] reply The reply message.
/// \param[in] send_reply_callback The callback to be called when the request is done.
virtual void HandleRequestResourceReport(
const rpc::RequestResourceReportRequest &request,
rpc::RequestResourceReportReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;
virtual void HandleRequestWorkerLease(const RequestWorkerLeaseRequest &request,
RequestWorkerLeaseReply *reply,
SendReplyCallback send_reply_callback) = 0;