diff --git a/python/ray/tests/test_autoscaler_fake_scaledown.py b/python/ray/tests/test_autoscaler_fake_scaledown.py
index 8b8e71c10..34bb2ca25 100644
--- a/python/ray/tests/test_autoscaler_fake_scaledown.py
+++ b/python/ray/tests/test_autoscaler_fake_scaledown.py
@@ -1,18 +1,35 @@
 import pytest
 import platform
 import numpy as np
+import re
 
 import ray
 from ray._private.test_utils import wait_for_condition
 from ray.cluster_utils import AutoscalingCluster
 
 
+# Triggers the addition of a worker node.
+@ray.remote(num_cpus=1)
+class Actor:
+    def __init__(self):
+        self.data = []
+
+    def f(self):
+        pass
+
+    def recv(self, obj):
+        pass
+
+    def create(self, size):
+        return np.zeros(size)
+
+
 # Tests that we scale down even if secondary copies of objects are present on
 # idle nodes: https://github.com/ray-project/ray/issues/21870
 @pytest.mark.skipif(platform.system() == "Windows", reason="Failing on Windows.")
 def test_scaledown_shared_objects(shutdown_only):
     cluster = AutoscalingCluster(
-        head_resources={"CPU": 1},
+        head_resources={"CPU": 0},
         worker_node_types={
             "cpu_node": {
                 "resources": {
@@ -21,7 +38,7 @@ def test_scaledown_shared_objects(shutdown_only):
                 },
                 "node_config": {},
                 "min_workers": 0,
-                "max_workers": 4,
+                "max_workers": 5,
             },
         },
         idle_timeout_minutes=0.05,
@@ -31,15 +48,6 @@ def test_scaledown_shared_objects(shutdown_only):
         cluster.start(_system_config={"scheduler_report_pinned_bytes_only": True})
         ray.init("auto")
 
-        # Triggers the addition of a GPU node.
-        @ray.remote(num_cpus=1)
-        class Actor:
-            def f(self):
-                pass
-
-            def recv(self, obj):
-                pass
-
         actors = [Actor.remote() for _ in range(5)]
         ray.get([a.f.remote() for a in actors])
         print("All five nodes launched")
@@ -47,7 +55,7 @@ def test_scaledown_shared_objects(shutdown_only):
         # Verify scale-up.
         wait_for_condition(lambda: ray.cluster_resources().get("CPU", 0) == 5)
 
-        data = ray.put(np.zeros(1024 * 1024 * 5))
+        data = actors[0].create.remote(1024 * 1024 * 5)
         ray.get([a.recv.remote(data) for a in actors])
         print("Data broadcast successfully, deleting actors.")
         del actors
@@ -60,6 +68,120 @@ def test_scaledown_shared_objects(shutdown_only):
         cluster.shutdown()
 
 
+def check_memory(local_objs, num_spilled_objects=None, num_plasma_objects=None):
+    def ok():
+        s = ray.internal.internal_api.memory_summary()
+        print(f"\n\nMemory Summary:\n{s}\n")
+
+        actual_objs = re.findall(r"LOCAL_REFERENCE[\s|\|]+([0-9a-f]+)", s)
+        if sorted(actual_objs) != sorted(local_objs):
+            raise RuntimeError(
+                f"Expect local objects={local_objs}, actual={actual_objs}"
+            )
+
+        if num_spilled_objects is not None:
+            m = re.search(r"Spilled (\d+) MiB, (\d+) objects", s)
+            if m is not None:
+                actual_spilled_objects = int(m.group(2))
+                if actual_spilled_objects < num_spilled_objects:
+                    raise RuntimeError(
+                        f"Expected spilled objects={num_spilled_objects} "
+                        f"greater than actual={actual_spilled_objects}"
+                    )
+
+        if num_plasma_objects is not None:
+            m = re.search(r"Plasma memory usage (\d+) MiB, (\d+) objects", s)
+            if m is None:
+                raise RuntimeError(
+                    "Memory summary does not contain Plasma memory objects count"
+                )
+            actual_plasma_objects = int(m.group(2))
+            if actual_plasma_objects != num_plasma_objects:
+                raise RuntimeError(
+                    f"Expected plasma objects={num_plasma_objects} not equal "
+                    f"to actual={actual_plasma_objects}"
+                )
+
+        return True
+
+    wait_for_condition(ok, timeout=30, retry_interval_ms=5000)
+
+
+# Tests that a node with a live spilled object does not get scaled down.
+@pytest.mark.skipif(platform.system() == "Windows", reason="Failing on Windows.")
+def test_no_scaledown_with_spilled_objects(shutdown_only):
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 0},
+        worker_node_types={
+            "cpu_node": {
+                "resources": {
+                    "CPU": 1,
+                    "object_store_memory": 75 * 1024 * 1024,
+                },
+                "node_config": {},
+                "min_workers": 0,
+                "max_workers": 2,
+            },
+        },
+        idle_timeout_minutes=0.05,
+    )
+
+    try:
+        cluster.start(
+            _system_config={
+                "scheduler_report_pinned_bytes_only": True,
+                "min_spilling_size": 0,
+            }
+        )
+        ray.init("auto")
+
+        actors = [Actor.remote() for _ in range(2)]
+        ray.get([a.f.remote() for a in actors])
+
+        # Verify scale-up.
+        wait_for_condition(lambda: ray.cluster_resources().get("CPU", 0) == 2)
+        print("All nodes launched")
+
+        # Put 10 x 80MiB objects into the object store with 75MiB memory limit.
+        obj_size = 10 * 1024 * 1024
+        objs = []
+        for i in range(10):
+            obj = actors[0].create.remote(obj_size)
+            ray.get(actors[1].recv.remote(obj))
+            objs.append(obj)
+            print(f"obj {i}={obj.hex()}")
+            del obj
+
+        # At least 9 out of the 10 objects should have spilled.
+        check_memory([obj.hex() for obj in objs], num_spilled_objects=9)
+        print("Objects spilled, deleting actors and object references.")
+
+        # Assume the 1st object always gets spilled.
+        spilled_obj = objs[0]
+        del objs
+        del actors
+
+        # Verify scale-down to 1 node.
+        def scaledown_to_one():
+            cpu = ray.cluster_resources().get("CPU", 0)
+            assert cpu > 0, "Scale-down should keep at least 1 node"
+            return cpu == 1
+
+        wait_for_condition(scaledown_to_one, timeout=30)
+
+        # Verify the spilled object still exists, and there is no object in the
+        # plasma store.
+        check_memory([spilled_obj.hex()], num_plasma_objects=0)
+
+        # Delete the spilled object, the remaining worker node should be scaled
+        # down.
+        del spilled_obj
+        wait_for_condition(lambda: ray.cluster_resources().get("CPU", 0) == 0)
+        check_memory([], num_plasma_objects=0)
+    finally:
+        cluster.shutdown()
+
+
 if __name__ == "__main__":
     import sys
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 32067a281..e5b5c5487 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -140,7 +140,7 @@ RAY_CONFIG(float, scheduler_spread_threshold, 0.5);
 /// Whether to only report the usage of pinned copies of objects in the
 /// object_store_memory resource. This means nodes holding secondary copies only
 /// will become eligible for removal in the autoscaler.
-RAY_CONFIG(bool, scheduler_report_pinned_bytes_only, false)
+RAY_CONFIG(bool, scheduler_report_pinned_bytes_only, true)
 
 // The max allowed size in bytes of a return object from direct actor calls.
 // Objects larger than this size will be spilled/promoted to plasma.
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index 8663011bd..33a357647 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -592,6 +592,16 @@ void LocalObjectManager::RecordMetrics() const {
                                                  "Restored");
 }
 
+int64_t LocalObjectManager::GetPinnedBytes() const {
+  if (pinned_objects_size_ > 0) {
+    return pinned_objects_size_;
+  }
+  // Report non-zero usage when there are spilled / spill-pending live objects, to
+  // prevent this node from being drained. Note that the value reported here is also used
+  // for scheduling.
+  return (spilled_objects_url_.empty() && objects_pending_spill_.empty()) ? 0 : 1;
+}
+
 std::string LocalObjectManager::DebugString() const {
   std::stringstream result;
   result << "LocalObjectManager:\n";
diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h
index 89c2cd263..6c9116544 100644
--- a/src/ray/raylet/local_object_manager.h
+++ b/src/ray/raylet/local_object_manager.h
@@ -146,8 +146,9 @@ class LocalObjectManager {
   /// In that case, the URL is supposed to be obtained by the object directory.
   std::string GetLocalSpilledObjectURL(const ObjectID &object_id);
 
-  /// Get the current pinned object store memory usage.
-  int64_t GetPinnedBytes() const { return pinned_objects_size_; }
+  /// Get the current pinned object store memory usage to help with node scale-down decisions.
+  /// A node can only be safely drained when this function reports zero.
+  int64_t GetPinnedBytes() const;
 
   std::string DebugString() const;
 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 55ef4f959..a634cc08e 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -329,25 +329,23 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
   RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
   RAY_CHECK(RayConfig::instance().raylet_heartbeat_period_milliseconds() > 0);
   SchedulingResources local_resources(config.resource_config);
-  cluster_resource_scheduler_ =
-      std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
-          scheduling::NodeID(self_node_id_.Binary()),
-          local_resources.GetTotalResources().GetResourceMap(),
-          /*is_node_available_fn*/
-          [this](scheduling::NodeID node_id) {
-            return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) !=
-                   nullptr;
-          },
-          /*get_used_object_store_memory*/
-          [this]() {
-            if (RayConfig::instance().scheduler_report_pinned_bytes_only()) {
-              return local_object_manager_.GetPinnedBytes();
-            } else {
-              return object_manager_.GetUsedMemory();
-            }
-          },
-          /*get_pull_manager_at_capacity*/
-          [this]() { return object_manager_.PullManagerHasPullsQueued(); }));
+  cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
+      scheduling::NodeID(self_node_id_.Binary()),
+      local_resources.GetTotalResources().GetResourceMap(),
+      /*is_node_available_fn*/
+      [this](scheduling::NodeID node_id) {
+        return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != nullptr;
+      },
+      /*get_used_object_store_memory*/
+      [this]() {
+        if (RayConfig::instance().scheduler_report_pinned_bytes_only()) {
+          return local_object_manager_.GetPinnedBytes();
+        } else {
+          return object_manager_.GetUsedMemory();
+        }
+      },
+      /*get_pull_manager_at_capacity*/
+      [this]() { return object_manager_.PullManagerHasPullsQueued(); });
 
   auto get_node_info_func = [this](const NodeID &node_id) {
     return gcs_client_->Nodes().Get(node_id);
diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc
index a980b3ee9..d25c71ed8 100644
--- a/src/ray/raylet/test/local_object_manager_test.cc
+++ b/src/ray/raylet/test/local_object_manager_test.cc
@@ -666,14 +666,12 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
   std::vector<ObjectID> object_ids;
   std::vector<std::unique_ptr<RayObject>> objects;
-  int64_t total_size = 0;
   int64_t object_size = 1000;
 
   const ObjectID object_id = ObjectID::FromRandom();
   object_ids.push_back(object_id);
   unevictable_objects_.emplace(object_id);
   auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
-  total_size += object_size;
   auto object = std::make_unique<RayObject>(
       data_buffer, nullptr, std::vector<rpc::ObjectReference>());
   objects.push_back(std::move(object));
@@ -918,7 +916,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
     ASSERT_TRUE(subscriber_->PublishObjectEviction());
   }
 
-  // // Make sure all spilled objects are deleted.
+  // Make sure all spilled objects are deleted.
   manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
   int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
   ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size());
@@ -1299,6 +1297,72 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
   AssertNoLeaks();
 }
 
+TEST_F(LocalObjectManagerTest, TestPinBytes) {
+  // Prepare data for objects.
+  rpc::Address owner_address;
+  owner_address.set_worker_id(WorkerID::FromRandom().Binary());
+
+  std::vector<ObjectID> object_ids;
+  for (size_t i = 0; i < free_objects_batch_size; i++) {
+    ObjectID object_id = ObjectID::FromRandom();
+    object_ids.push_back(object_id);
+  }
+
+  std::vector<std::unique_ptr<RayObject>> objects;
+  for (size_t i = 0; i < free_objects_batch_size; i++) {
+    std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
+    auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
+    auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
+    auto object = std::make_unique<RayObject>(
+        nullptr, meta_buffer, std::vector<rpc::ObjectReference>());
+    objects.push_back(std::move(object));
+  }
+
+  // There is no pinned object yet.
+  ASSERT_EQ(manager.GetPinnedBytes(), 0);
+
+  // Pin objects.
+  manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
+
+  // Pinned object memory should be reported.
+  ASSERT_GT(manager.GetPinnedBytes(), 0);
+
+  // Spill all objects.
+  bool spilled = false;
+  manager.SpillObjects(object_ids, [&](const Status &status) {
+    RAY_CHECK(status.ok());
+    spilled = true;
+  });
+  ASSERT_FALSE(spilled);
+  ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
+  std::vector<std::string> urls;
+  for (size_t i = 0; i < object_ids.size(); i++) {
+    urls.push_back(BuildURL("url" + std::to_string(i)));
+  }
+  ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
+  for (size_t i = 0; i < object_ids.size(); i++) {
+    ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
+  }
+  ASSERT_TRUE(spilled);
+
+  // With all objects spilled, the pinned bytes should be 1.
+  ASSERT_EQ(manager.GetPinnedBytes(), 1);
+
+  // Delete all (spilled) objects.
+  for (size_t i = 0; i < free_objects_batch_size; i++) {
+    EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary()));
+    ASSERT_TRUE(subscriber_->PublishObjectEviction());
+  }
+  manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
+  int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
+  ASSERT_EQ(deleted_urls_size, object_ids.size());
+
+  // With no pinned or spilled object, the pinned bytes should be 0.
+  ASSERT_EQ(manager.GetPinnedBytes(), 0);
+
+  AssertNoLeaks();
+}
+
 }  // namespace raylet
 
 }  // namespace ray