[Core] Account for spilled objects when reporting object store memory usage (#23425)

mwtian authored on 2022-03-23 22:25:22 -07:00; committed by GitHub
parent 63d6884509
commit 26f1a7ef7d
6 changed files with 232 additions and 37 deletions


@@ -1,18 +1,35 @@
import pytest
import platform
import numpy as np
import re
import ray
from ray._private.test_utils import wait_for_condition
from ray.cluster_utils import AutoscalingCluster
# Triggers the addition of a worker node.
@ray.remote(num_cpus=1)
class Actor:
def __init__(self):
self.data = []
def f(self):
pass
def recv(self, obj):
pass
def create(self, size):
return np.zeros(size)
# Tests that we scale down even if secondary copies of objects are present on
# idle nodes: https://github.com/ray-project/ray/issues/21870
@pytest.mark.skipif(platform.system() == "Windows", reason="Failing on Windows.")
def test_scaledown_shared_objects(shutdown_only):
cluster = AutoscalingCluster(
-head_resources={"CPU": 1},
+head_resources={"CPU": 0},
worker_node_types={
"cpu_node": {
"resources": {
@@ -21,7 +38,7 @@ def test_scaledown_shared_objects(shutdown_only):
},
"node_config": {},
"min_workers": 0,
-"max_workers": 4,
+"max_workers": 5,
},
},
idle_timeout_minutes=0.05,
@@ -31,15 +48,6 @@ def test_scaledown_shared_objects(shutdown_only):
cluster.start(_system_config={"scheduler_report_pinned_bytes_only": True})
ray.init("auto")
-# Triggers the addition of a GPU node.
-@ray.remote(num_cpus=1)
-class Actor:
-def f(self):
-pass
-def recv(self, obj):
-pass
actors = [Actor.remote() for _ in range(5)]
ray.get([a.f.remote() for a in actors])
print("All five nodes launched")
@@ -47,7 +55,7 @@ def test_scaledown_shared_objects(shutdown_only):
# Verify scale-up.
wait_for_condition(lambda: ray.cluster_resources().get("CPU", 0) == 5)
-data = ray.put(np.zeros(1024 * 1024 * 5))
+data = actors[0].create.remote(1024 * 1024 * 5)
ray.get([a.recv.remote(data) for a in actors])
print("Data broadcast successfully, deleting actors.")
del actors
@@ -60,6 +68,120 @@ def test_scaledown_shared_objects(shutdown_only):
cluster.shutdown()
def check_memory(local_objs, num_spilled_objects=None, num_plasma_objects=None):
def ok():
s = ray.internal.internal_api.memory_summary()
print(f"\n\nMemory Summary:\n{s}\n")
actual_objs = re.findall(r"LOCAL_REFERENCE[\s|\|]+([0-9a-f]+)", s)
if sorted(actual_objs) != sorted(local_objs):
raise RuntimeError(
f"Expect local objects={local_objs}, actual={actual_objs}"
)
if num_spilled_objects is not None:
m = re.search(r"Spilled (\d+) MiB, (\d+) objects", s)
if m is not None:
actual_spilled_objects = int(m.group(2))
if actual_spilled_objects < num_spilled_objects:
raise RuntimeError(
f"Expected spilled objects={num_spilled_objects} "
f"greater than actual={actual_spilled_objects}"
)
if num_plasma_objects is not None:
m = re.search(r"Plasma memory usage (\d+) MiB, (\d+) objects", s)
if m is None:
raise RuntimeError(
"Memory summary does not contain Plasma memory objects count"
)
actual_plasma_objects = int(m.group(2))
if actual_plasma_objects != num_plasma_objects:
raise RuntimeError(
f"Expected plasma objects={num_plasma_objects} not equal "
f"to actual={actual_plasma_objects}"
)
return True
wait_for_condition(ok, timeout=30, retry_interval_ms=5000)
# Tests that node with live spilled object does not get scaled down.
@pytest.mark.skipif(platform.system() == "Windows", reason="Failing on Windows.")
def test_no_scaledown_with_spilled_objects(shutdown_only):
cluster = AutoscalingCluster(
head_resources={"CPU": 0},
worker_node_types={
"cpu_node": {
"resources": {
"CPU": 1,
"object_store_memory": 75 * 1024 * 1024,
},
"node_config": {},
"min_workers": 0,
"max_workers": 2,
},
},
idle_timeout_minutes=0.05,
)
try:
cluster.start(
_system_config={
"scheduler_report_pinned_bytes_only": True,
"min_spilling_size": 0,
}
)
ray.init("auto")
actors = [Actor.remote() for _ in range(2)]
ray.get([a.f.remote() for a in actors])
# Verify scale-up.
wait_for_condition(lambda: ray.cluster_resources().get("CPU", 0) == 2)
print("All nodes launched")
# Put 10 x 80MiB objects into the object store with 75MiB memory limit.
obj_size = 10 * 1024 * 1024
objs = []
for i in range(10):
obj = actors[0].create.remote(obj_size)
ray.get(actors[1].recv.remote(obj))
objs.append(obj)
print(f"obj {i}={obj.hex()}")
del obj
# At least 9 out of the 10 objects should have spilled.
check_memory([obj.hex() for obj in objs], num_spilled_objects=9)
print("Objects spilled, deleting actors and object references.")
# Assume the 1st object always gets spilled.
spilled_obj = objs[0]
del objs
del actors
# Verify scale-down to 1 node.
def scaledown_to_one():
cpu = ray.cluster_resources().get("CPU", 0)
assert cpu > 0, "Scale-down should keep at least 1 node"
return cpu == 1
wait_for_condition(scaledown_to_one, timeout=30)
# Verify the spilled object still exists, and there is no object in the
# plasma store.
check_memory([spilled_obj.hex()], num_plasma_objects=0)
# Delete the spilled object, the remaining worker node should be scaled
# down.
del spilled_obj
wait_for_condition(lambda: ray.cluster_resources().get("CPU", 0) == 0)
check_memory([], num_plasma_objects=0)
finally:
cluster.shutdown()
if __name__ == "__main__":
import sys
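As a side note, check_memory above keys off the plain-text layout of ray.internal.internal_api.memory_summary(). The following self-contained sketch shows how those regexes pull out local references and the spilled/plasma object counts; the sample string is hand-written for illustration and is not real cluster output.

import re

# Hand-written excerpt in the shape the regexes expect (illustrative only).
sample = (
    "Plasma memory usage 0 MiB, 0 objects, 0.0% full\n"
    "Spilled 400 MiB, 9 objects, avg write throughput 200 MiB/s\n"
    "192.168.1.2 | 1234 | Worker | LOCAL_REFERENCE | "
    "ffffffffffffffffffffffffffffffffffffffff0100000001000000\n"
)

local_refs = re.findall(r"LOCAL_REFERENCE[\s|\|]+([0-9a-f]+)", sample)
spilled = re.search(r"Spilled (\d+) MiB, (\d+) objects", sample)
plasma = re.search(r"Plasma memory usage (\d+) MiB, (\d+) objects", sample)
assert local_refs == ["ffffffffffffffffffffffffffffffffffffffff0100000001000000"]
assert int(spilled.group(2)) == 9
assert int(plasma.group(2)) == 0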


@@ -140,7 +140,7 @@ RAY_CONFIG(float, scheduler_spread_threshold, 0.5);
/// Whether to only report the usage of pinned copies of objects in the
/// object_store_memory resource. This means nodes holding secondary copies only
/// will become eligible for removal in the autoscaler.
-RAY_CONFIG(bool, scheduler_report_pinned_bytes_only, false)
+RAY_CONFIG(bool, scheduler_report_pinned_bytes_only, true)
// The max allowed size in bytes of a return object from direct actor calls.
// Objects larger than this size will be spilled/promoted to plasma.
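Since the hunk above flips scheduler_report_pinned_bytes_only to true by default, a deployment that wants the previous accounting (total object store usage, including secondary copies) can override it through the same _system_config hook the tests use; a minimal single-node sketch, purely illustrative:

import ray

# Opt back into the pre-change behavior: report total object store usage
# rather than only pinned (primary) copies plus the spilled-object sentinel.
ray.init(_system_config={"scheduler_report_pinned_bytes_only": False})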


@@ -592,6 +592,16 @@ void LocalObjectManager::RecordMetrics() const {
"Restored");
}
int64_t LocalObjectManager::GetPinnedBytes() const {
if (pinned_objects_size_ > 0) {
return pinned_objects_size_;
}
// Report non-zero usage when there are spilled / spill-pending live objects, to
// prevent this node from being drained. Note that the value reported here is also used
// for scheduling.
return (spilled_objects_url_.empty() && objects_pending_spill_.empty()) ? 0 : 1;
}
std::string LocalObjectManager::DebugString() const {
std::stringstream result;
result << "LocalObjectManager:\n";
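Put differently, GetPinnedBytes() above returns the pinned byte count whenever anything is pinned, and otherwise a 1-byte sentinel as long as spilled or spill-pending objects are still live, so the node keeps reporting non-zero object store usage and will not be drained. A short Python paraphrase of that rule, purely illustrative and not the actual implementation:

def pinned_bytes_for_report(pinned_bytes, spilled_urls, objects_pending_spill):
    """Mirror of the reporting rule above (illustrative only)."""
    if pinned_bytes > 0:
        return pinned_bytes
    # No pinned copies: report a 1-byte sentinel while live objects still
    # exist on disk (or are being spilled), so the node is not scaled down.
    return 0 if not spilled_urls and not objects_pending_spill else 1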


@@ -146,8 +146,9 @@ class LocalObjectManager {
/// In that case, the URL is supposed to be obtained by the object directory.
std::string GetLocalSpilledObjectURL(const ObjectID &object_id);
-/// Get the current pinned object store memory usage.
-int64_t GetPinnedBytes() const { return pinned_objects_size_; }
+/// Get the current pinned object store memory usage to help node scale down decisions.
+/// A node can only be safely drained when this function reports zero.
+int64_t GetPinnedBytes() const;
std::string DebugString() const;


@@ -329,25 +329,23 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
RAY_CHECK(RayConfig::instance().raylet_heartbeat_period_milliseconds() > 0);
SchedulingResources local_resources(config.resource_config);
-cluster_resource_scheduler_ =
-std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
-scheduling::NodeID(self_node_id_.Binary()),
-local_resources.GetTotalResources().GetResourceMap(),
-/*is_node_available_fn*/
-[this](scheduling::NodeID node_id) {
-return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) !=
-nullptr;
-},
-/*get_used_object_store_memory*/
-[this]() {
-if (RayConfig::instance().scheduler_report_pinned_bytes_only()) {
-return local_object_manager_.GetPinnedBytes();
-} else {
-return object_manager_.GetUsedMemory();
-}
-},
-/*get_pull_manager_at_capacity*/
-[this]() { return object_manager_.PullManagerHasPullsQueued(); }));
+cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
+scheduling::NodeID(self_node_id_.Binary()),
+local_resources.GetTotalResources().GetResourceMap(),
+/*is_node_available_fn*/
+[this](scheduling::NodeID node_id) {
+return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != nullptr;
+},
+/*get_used_object_store_memory*/
+[this]() {
+if (RayConfig::instance().scheduler_report_pinned_bytes_only()) {
+return local_object_manager_.GetPinnedBytes();
+} else {
+return object_manager_.GetUsedMemory();
+}
+},
+/*get_pull_manager_at_capacity*/
+[this]() { return object_manager_.PullManagerHasPullsQueued(); });
auto get_node_info_func = [this](const NodeID &node_id) {
return gcs_client_->Nodes().Get(node_id);
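The get_used_object_store_memory callback above is what the raylet folds into its object_store_memory resource report, so the change is observable from Python: with scheduler_report_pinned_bytes_only enabled, available object_store_memory shrinks only for pinned primary copies. A rough sketch, assuming a fresh single-node cluster; exact numbers and report timing vary:

import time
import numpy as np
import ray

ray.init(object_store_memory=200 * 1024 * 1024)
before = ray.available_resources().get("object_store_memory", 0)
# Pin ~50 MiB in plasma; while this reference is live it counts as used.
ref = ray.put(np.zeros(50 * 1024 * 1024, dtype=np.uint8))
time.sleep(2)  # resource reports are periodic; give the raylet a moment
after = ray.available_resources().get("object_store_memory", 0)
print(f"available object_store_memory dropped by ~{(before - after) / 1024 ** 2:.0f} MiB")
del ref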


@@ -666,14 +666,12 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
-int64_t total_size = 0;
int64_t object_size = 1000;
const ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
unevictable_objects_.emplace(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
-total_size += object_size;
auto object = std::make_unique<RayObject>(
data_buffer, nullptr, std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
@@ -918,7 +916,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
ASSERT_TRUE(subscriber_->PublishObjectEviction());
}
-// // Make sure all spilled objects are deleted.
+// Make sure all spilled objects are deleted.
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size());
@@ -1299,6 +1297,72 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
AssertNoLeaks();
}
TEST_F(LocalObjectManagerTest, TestPinBytes) {
// Prepare data for objects.
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
}
std::vector<std::unique_ptr<RayObject>> objects;
for (size_t i = 0; i < free_objects_batch_size; i++) {
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto object = std::make_unique<RayObject>(
nullptr, meta_buffer, std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
// There is no pinned object yet.
ASSERT_EQ(manager.GetPinnedBytes(), 0);
// Pin objects.
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Pinned object memory should be reported.
ASSERT_GT(manager.GetPinnedBytes(), 0);
// Spill all objects.
bool spilled = false;
manager.SpillObjects(object_ids, [&](const Status &status) {
RAY_CHECK(status.ok());
spilled = true;
});
ASSERT_FALSE(spilled);
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}
ASSERT_TRUE(spilled);
// With all objects spilled, the pinned bytes would be 1.
ASSERT_EQ(manager.GetPinnedBytes(), 1);
// Delete all (spilled) objects.
for (size_t i = 0; i < free_objects_batch_size; i++) {
EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary()));
ASSERT_TRUE(subscriber_->PublishObjectEviction());
}
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
ASSERT_EQ(deleted_urls_size, object_ids.size());
// With no pinned or spilled object, the pinned bytes should be 0.
ASSERT_EQ(manager.GetPinnedBytes(), 0);
AssertNoLeaks();
}
} // namespace raylet
} // namespace ray