mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Core] Added ownership-based object directory metrics, fixed raylet metric bug. (#14855)
* Added ownership-based object directory metrics. * Updated OBOD metric descriptions. * Dump OBOD metrics in debug string. * Added e2e tests for metrics.
This commit is contained in:
parent
ade5857aa3
commit
ed46d8bf45
8 changed files with 153 additions and 5 deletions
|
@ -14,6 +14,39 @@ from ray._private.metrics_agent import PrometheusServiceDiscoveryWriter
|
|||
from ray.util.metrics import Count, Histogram, Gauge
|
||||
from ray.test_utils import wait_for_condition, SignalActor, fetch_prometheus
|
||||
|
||||
# This list of metrics should be kept in sync with src/ray/stats/metric_defs.h
|
||||
# NOTE: Commented out metrics are not available in this test.
|
||||
# TODO(Clark): Find ways to trigger commented out metrics in cluster setup.
|
||||
_METRICS = [
|
||||
"ray_gcs_latency_sum",
|
||||
# "ray_local_available_resource",
|
||||
# "ray_local_total_resource",
|
||||
# "ray_live_actors",
|
||||
# "ray_restarting_actors",
|
||||
"ray_object_store_available_memory",
|
||||
"ray_object_store_used_memory",
|
||||
"ray_object_store_num_local_objects",
|
||||
"ray_object_manager_num_pull_requests",
|
||||
"ray_object_directory_subscriptions",
|
||||
"ray_object_directory_updates",
|
||||
"ray_object_directory_lookups",
|
||||
"ray_object_directory_added_locations",
|
||||
"ray_object_directory_removed_locations",
|
||||
# "ray_num_infeasible_tasks",
|
||||
"ray_heartbeat_report_ms_sum",
|
||||
"ray_process_startup_time_ms_sum",
|
||||
"ray_avg_num_scheduled_tasks",
|
||||
"ray_avg_num_executed_tasks",
|
||||
"ray_avg_num_spilled_back_tasks",
|
||||
# "ray_object_spilling_bandwidth_mb",
|
||||
# "ray_object_restoration_bandwidth_mb",
|
||||
# "ray_unintentional_worker_failures_total",
|
||||
# "ray_node_failure_total",
|
||||
"ray_pending_actors",
|
||||
"ray_pending_placement_groups",
|
||||
"ray_outbound_heartbeat_size_kb_sum",
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _setup_cluster_for_test(ray_start_cluster):
|
||||
|
@ -96,8 +129,9 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test):
|
|||
]:
|
||||
assert any(metric_name in full_name for full_name in metric_names)
|
||||
|
||||
# Make sure GCS server metrics are recorded.
|
||||
assert "ray_outbound_heartbeat_size_kb_sum" in metric_names
|
||||
# Make sure metrics are recorded.
|
||||
for metric in _METRICS:
|
||||
assert metric in metric_names
|
||||
|
||||
# Make sure the numeric values are correct
|
||||
test_counter_sample = [
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
// limitations under the License.
|
||||
|
||||
#include "ray/object_manager/object_directory.h"
|
||||
#include "ray/stats/stats.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
|
@ -287,6 +288,10 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
|
|||
return status;
|
||||
}
|
||||
|
||||
void ObjectDirectory::RecordMetrics(uint64_t duration_ms) {
|
||||
stats::ObjectDirectoryLocationSubscriptions().Record(listeners_.size());
|
||||
}
|
||||
|
||||
std::string ObjectDirectory::DebugString() const {
|
||||
std::stringstream result;
|
||||
result << "ObjectDirectory:";
|
||||
|
|
|
@ -128,6 +128,9 @@ class ObjectDirectoryInterface {
|
|||
const ObjectID &object_id, const NodeID &node_id,
|
||||
const object_manager::protocol::ObjectInfoT &object_info) = 0;
|
||||
|
||||
/// Record metrics.
|
||||
virtual void RecordMetrics(uint64_t duration_ms) = 0;
|
||||
|
||||
/// Returns debug string for class.
|
||||
///
|
||||
/// \return string.
|
||||
|
@ -172,6 +175,8 @@ class ObjectDirectory : public ObjectDirectoryInterface {
|
|||
const ObjectID &object_id, const NodeID &node_id,
|
||||
const object_manager::protocol::ObjectInfoT &object_info) override;
|
||||
|
||||
void RecordMetrics(uint64_t duration_ms) override;
|
||||
|
||||
std::string DebugString() const override;
|
||||
|
||||
/// ObjectDirectory should not be copied.
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
// limitations under the License.
|
||||
|
||||
#include "ray/object_manager/ownership_based_object_directory.h"
|
||||
#include "ray/stats/stats.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
|
@ -136,6 +137,8 @@ ray::Status OwnershipBasedObjectDirectory::ReportObjectAdded(
|
|||
request.set_object_id(object_id.Binary());
|
||||
request.set_node_id(node_id.Binary());
|
||||
|
||||
metrics_num_object_locations_added_++;
|
||||
|
||||
rpc_client->AddObjectLocationOwner(
|
||||
request, [worker_id, object_id, node_id](
|
||||
Status status, const rpc::AddObjectLocationOwnerReply &reply) {
|
||||
|
@ -170,6 +173,8 @@ ray::Status OwnershipBasedObjectDirectory::ReportObjectRemoved(
|
|||
request.set_object_id(object_id.Binary());
|
||||
request.set_node_id(node_id.Binary());
|
||||
|
||||
metrics_num_object_locations_removed_++;
|
||||
|
||||
rpc_client->RemoveObjectLocationOwner(
|
||||
request, [worker_id, object_id, node_id](
|
||||
Status status, const rpc::RemoveObjectLocationOwnerReply &reply) {
|
||||
|
@ -207,6 +212,7 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback(
|
|||
<< " locations, spilled_url: " << it->second.spilled_url
|
||||
<< ", spilled node ID: " << it->second.spilled_node_id
|
||||
<< ", object size: " << it->second.object_size;
|
||||
metrics_num_object_location_updates_++;
|
||||
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
|
||||
// looping over the callbacks.
|
||||
auto callbacks = it->second.callbacks;
|
||||
|
@ -309,6 +315,7 @@ ray::Status OwnershipBasedObjectDirectory::UnsubscribeObjectLocations(
|
|||
ray::Status OwnershipBasedObjectDirectory::LookupLocations(
|
||||
const ObjectID &object_id, const rpc::Address &owner_address,
|
||||
const OnLocationsFound &callback) {
|
||||
metrics_num_object_location_lookups_++;
|
||||
auto it = listeners_.find(object_id);
|
||||
if (it != listeners_.end() && it->second.subscribed) {
|
||||
// If we have locations cached due to a concurrent SubscribeObjectLocations
|
||||
|
@ -377,10 +384,48 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
void OwnershipBasedObjectDirectory::RecordMetrics(uint64_t duration_ms) {
|
||||
stats::ObjectDirectoryLocationSubscriptions.Record(listeners_.size());
|
||||
|
||||
// Record number of object location updates per second.
|
||||
metrics_num_object_location_updates_per_second_ =
|
||||
(double)metrics_num_object_location_updates_ * (1000.0 / (double)duration_ms);
|
||||
stats::ObjectDirectoryLocationUpdates.Record(
|
||||
metrics_num_object_location_updates_per_second_);
|
||||
metrics_num_object_location_updates_ = 0;
|
||||
// Record number of object location lookups per second.
|
||||
metrics_num_object_location_lookups_per_second_ =
|
||||
(double)metrics_num_object_location_lookups_ * (1000.0 / (double)duration_ms);
|
||||
stats::ObjectDirectoryLocationLookups.Record(
|
||||
metrics_num_object_location_lookups_per_second_);
|
||||
metrics_num_object_location_lookups_ = 0;
|
||||
// Record number of object locations added per second.
|
||||
metrics_num_object_locations_added_per_second_ =
|
||||
(double)metrics_num_object_locations_added_ * (1000.0 / (double)duration_ms);
|
||||
stats::ObjectDirectoryAddedLocations.Record(
|
||||
metrics_num_object_locations_added_per_second_);
|
||||
metrics_num_object_locations_added_ = 0;
|
||||
// Record number of object locations removed per second.
|
||||
metrics_num_object_locations_removed_per_second_ =
|
||||
(double)metrics_num_object_locations_removed_ * (1000.0 / (double)duration_ms);
|
||||
stats::ObjectDirectoryRemovedLocations.Record(
|
||||
metrics_num_object_locations_removed_per_second_);
|
||||
metrics_num_object_locations_removed_ = 0;
|
||||
}
|
||||
|
||||
std::string OwnershipBasedObjectDirectory::DebugString() const {
|
||||
std::stringstream result;
|
||||
result << std::fixed << std::setprecision(3);
|
||||
result << "OwnershipBasedObjectDirectory:";
|
||||
result << "\n- num listeners: " << listeners_.size();
|
||||
result << "\n- num location updates per second: "
|
||||
<< metrics_num_object_location_updates_per_second_;
|
||||
result << "\n- num location lookups per second: "
|
||||
<< metrics_num_object_location_lookups_per_second_;
|
||||
result << "\n- num locations added per second: "
|
||||
<< metrics_num_object_locations_added_per_second_;
|
||||
result << "\n- num locations removed per second: "
|
||||
<< metrics_num_object_locations_removed_per_second_;
|
||||
return result.str();
|
||||
}
|
||||
|
||||
|
|
|
@ -64,6 +64,8 @@ class OwnershipBasedObjectDirectory : public ObjectDirectory {
|
|||
const ObjectID &object_id, const NodeID &node_id,
|
||||
const object_manager::protocol::ObjectInfoT &object_info) override;
|
||||
|
||||
void RecordMetrics(uint64_t duration_ms) override;
|
||||
|
||||
std::string DebugString() const override;
|
||||
|
||||
/// OwnershipBasedObjectDirectory should not be copied.
|
||||
|
@ -87,6 +89,24 @@ class OwnershipBasedObjectDirectory : public ObjectDirectory {
|
|||
/// Internal callback function used by SubscribeObjectLocations.
|
||||
void SubscriptionCallback(ObjectID object_id, WorkerID worker_id, Status status,
|
||||
const rpc::GetObjectLocationsOwnerReply &reply);
|
||||
|
||||
/// Metrics
|
||||
|
||||
/// Number of object locations added to this object directory.
|
||||
uint64_t metrics_num_object_locations_added_;
|
||||
double metrics_num_object_locations_added_per_second_;
|
||||
|
||||
/// Number of object locations removed from this object directory.
|
||||
uint64_t metrics_num_object_locations_removed_;
|
||||
double metrics_num_object_locations_removed_per_second_;
|
||||
|
||||
/// Number of object location lookups.
|
||||
uint64_t metrics_num_object_location_lookups_;
|
||||
double metrics_num_object_location_lookups_per_second_;
|
||||
|
||||
/// Number of object location updates.
|
||||
uint64_t metrics_num_object_location_updates_;
|
||||
double metrics_num_object_location_updates_per_second_;
|
||||
};
|
||||
|
||||
} // namespace ray
|
||||
|
|
|
@ -375,6 +375,8 @@ ray::Status NodeManager::RegisterGcs() {
|
|||
WarnResourceDeadlock();
|
||||
},
|
||||
RayConfig::instance().debug_dump_period_milliseconds());
|
||||
uint64_t now_ms = current_time_ms();
|
||||
last_metrics_recorded_at_ms_ = now_ms;
|
||||
periodical_runner_.RunFnPeriodically([this] { RecordMetrics(); },
|
||||
record_metrics_period_ms_);
|
||||
if (RayConfig::instance().free_objects_period_milliseconds() > 0) {
|
||||
|
@ -382,7 +384,7 @@ ray::Status NodeManager::RegisterGcs() {
|
|||
[this] { local_object_manager_.FlushFreeObjects(); },
|
||||
RayConfig::instance().free_objects_period_milliseconds());
|
||||
}
|
||||
last_resource_report_at_ms_ = current_time_ms();
|
||||
last_resource_report_at_ms_ = now_ms;
|
||||
periodical_runner_.RunFnPeriodically([this] { ReportResourceUsage(); },
|
||||
report_resources_period_ms_);
|
||||
// Start the timer that gets object manager profiling information and sends it
|
||||
|
@ -2254,7 +2256,7 @@ void NodeManager::RecordMetrics() {
|
|||
}
|
||||
// Last recorded time will be reset in the caller side.
|
||||
uint64_t current_time = current_time_ms();
|
||||
uint64_t duration_ms = current_time - metrics_last_recorded_time_ms_;
|
||||
uint64_t duration_ms = current_time - last_metrics_recorded_at_ms_;
|
||||
|
||||
// Record average number of tasks information per second.
|
||||
stats::AvgNumScheduledTasks.Record((double)metrics_num_task_scheduled_ *
|
||||
|
@ -2267,8 +2269,11 @@ void NodeManager::RecordMetrics() {
|
|||
(1000.0 / (double)duration_ms));
|
||||
metrics_num_task_spilled_back_ = 0;
|
||||
|
||||
object_directory_->RecordMetrics(duration_ms);
|
||||
object_manager_.RecordMetrics();
|
||||
local_object_manager_.RecordObjectSpillingStats();
|
||||
|
||||
last_metrics_recorded_at_ms_ = current_time;
|
||||
}
|
||||
|
||||
void NodeManager::PublishInfeasibleTaskError(const Task &task) const {
|
||||
|
|
|
@ -736,7 +736,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
|||
uint64_t record_metrics_period_ms_;
|
||||
|
||||
/// Last time metrics are recorded.
|
||||
uint64_t metrics_last_recorded_time_ms_;
|
||||
uint64_t last_metrics_recorded_at_ms_;
|
||||
|
||||
/// Number of tasks that are received and scheduled.
|
||||
uint64_t metrics_num_task_scheduled_;
|
||||
|
|
|
@ -24,6 +24,9 @@
|
|||
///
|
||||
/// You can follow these examples to define your metrics.
|
||||
|
||||
/// NOTE: When adding a new metric, add the metric name to the _METRICS list in
|
||||
/// python/ray/tests/test_metrics_agent.py to ensure that its existence is tested.
|
||||
|
||||
///
|
||||
/// Common
|
||||
///
|
||||
|
@ -64,6 +67,37 @@ static Gauge ObjectManagerPullRequests("object_manager_num_pull_requests",
|
|||
"Number of active pull requests for objects.",
|
||||
"requests");
|
||||
|
||||
static Gauge ObjectDirectoryLocationSubscriptions(
|
||||
"object_directory_subscriptions",
|
||||
"Number of object location subscriptions. If this is high, the raylet is attempting "
|
||||
"to pull a lot of objects.",
|
||||
"subscriptions");
|
||||
|
||||
static Gauge ObjectDirectoryLocationUpdates(
|
||||
"object_directory_updates",
|
||||
"Number of object location updates per second., If this is high, the raylet is "
|
||||
"attempting to pull a lot of objects and/or the locations for objects are frequently "
|
||||
"changing (e.g. due to many object copies or evictions).",
|
||||
"updates");
|
||||
|
||||
static Gauge ObjectDirectoryLocationLookups(
|
||||
"object_directory_lookups",
|
||||
"Number of object location lookups per second. If this is high, the raylet is "
|
||||
"waiting on a lot of objects.",
|
||||
"lookups");
|
||||
|
||||
static Gauge ObjectDirectoryAddedLocations(
|
||||
"object_directory_added_locations",
|
||||
"Number of object locations added per second., If this is high, a lot of objects "
|
||||
"have been added on this node.",
|
||||
"additions");
|
||||
|
||||
static Gauge ObjectDirectoryRemovedLocations(
|
||||
"object_directory_removed_locations",
|
||||
"Number of object locations removed per second. If this is high, a lot of objects "
|
||||
"have been removed from this node.",
|
||||
"removals");
|
||||
|
||||
static Gauge NumInfeasibleTasks(
|
||||
"num_infeasible_tasks",
|
||||
"The number of tasks in the scheduler that are in the 'infeasible' state.", "tasks");
|
||||
|
|
Loading…
Add table
Reference in a new issue