Mirror of https://github.com/vale981/ray, synced 2025-03-06 10:31:39 -05:00
Switch memory units to bytes (#14433)

parent 5fc761c562
commit 3fab5e2ada

9 changed files with 44 additions and 43 deletions
@@ -289,7 +289,9 @@ class LoadMetrics:
     def format_resource(key, value):
         if key in ["object_store_memory", "memory"]:
             return "{} GiB".format(
-                round(value * MEMORY_RESOURCE_UNIT_BYTES / 1e9, 2))
+                round(
+                    value * MEMORY_RESOURCE_UNIT_BYTES /
+                    (1024 * 1024 * 1024), 2))
         else:
             return round(value, 2)
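Note: the old formatter divided by 1e9 (decimal gigabytes) while labeling the result GiB; with byte-denominated values and a 1024^3 divisor the label is finally accurate. A minimal standalone sketch of the new path, assuming MEMORY_RESOURCE_UNIT_BYTES == 1 as set later in this commit:

    MEMORY_RESOURCE_UNIT_BYTES = 1

    def format_resource(key, value):
        # Memory-like resources are raw byte counts; render them as GiB.
        if key in ["object_store_memory", "memory"]:
            return "{} GiB".format(
                round(value * MEMORY_RESOURCE_UNIT_BYTES / (1024 * 1024 * 1024), 2))
        return round(value, 2)

    print(format_resource("memory", 1.05 * 1024**3))  # "1.05 GiB"
    print(format_resource("CPU", 2))                  # 2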
@@ -27,7 +27,7 @@ DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 200 * 10**9
 # The default proportion of available memory allocated to the object store
 DEFAULT_OBJECT_STORE_MEMORY_PROPORTION = 0.3
 # The smallest cap on the memory used by the object store that we allow.
-# This must be greater than MEMORY_RESOURCE_UNIT_BYTES * 0.7
+# This must be greater than MEMORY_RESOURCE_UNIT_BYTES
 OBJECT_STORE_MINIMUM_MEMORY_BYTES = 75 * 1024 * 1024
 # The default maximum number of bytes that the non-primary Redis shards are
 # allowed to use unless overridden by the user.
@@ -64,10 +64,10 @@ DUPLICATE_REMOTE_FUNCTION_THRESHOLD = 100
 # The maximum resource quantity that is allowed. TODO(rkn): This could be
 # relaxed, but the current implementation of the node manager will be slower
 # for large resource quantities due to bookkeeping of specific resource IDs.
-MAX_RESOURCE_QUANTITY = 100000
+MAX_RESOURCE_QUANTITY = 100e12

 # Each memory "resource" counts as this many bytes of memory.
-MEMORY_RESOURCE_UNIT_BYTES = 50 * 1024 * 1024
+MEMORY_RESOURCE_UNIT_BYTES = 1

 # Number of units 1 resource can be subdivided into.
 MIN_RESOURCE_GRANULARITY = 0.0001
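Note: the cap has to grow because quantities are now denominated in bytes rather than 50 MiB units. Plain arithmetic showing the scale change, using the values from this diff:

    # Old scheme: at most 100000 units * 50 MiB/unit ~= 4.77 TiB of memory.
    old_cap_bytes = 100000 * 50 * 1024 * 1024  # 5_242_880_000_000
    # New scheme: the cap is expressed directly in bytes; 100e12 B = 100 TB.
    new_cap_bytes = 100e12
    assert new_cap_bytes > old_cap_bytes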
@@ -328,13 +328,14 @@ class LoadMetricsTest(unittest.TestCase):
         lm = LoadMetrics()
         lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
         lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
-        lm.update("3.3.3.3", {
-            "memory": 20,
-            "object_store_memory": 40
-        }, {
-            "memory": 0,
-            "object_store_memory": 20
-        }, {})
+        lm.update(
+            "3.3.3.3", {
+                "memory": 1.05 * 1024 * 1024 * 1024,
+                "object_store_memory": 2.1 * 1024 * 1024 * 1024,
+            }, {
+                "memory": 0,
+                "object_store_memory": 1.05 * 1024 * 1024 * 1024,
+            }, {})
         debug = lm.info_string()
         assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, "
                 "1.05 GiB/1.05 GiB memory, "
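Note: the asserted string can stay "1.05 GiB/1.05 GiB memory" because the old inputs happened to round to the same figure under the old decimal divisor: 20 units * 50 MiB = 1,048,576,000 bytes, and dividing by 1e9 also rounds to 1.05. A quick check of that equivalence:

    old = round(20 * 50 * 1024 * 1024 / 1e9, 2)            # old units, decimal divisor
    new = round(1.05 * 1024**3 / (1024 * 1024 * 1024), 2)  # bytes, binary divisor
    assert old == new == 1.05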
@@ -7,6 +7,11 @@ from ray.test_utils import wait_for_condition
 MB = 1024 * 1024


+def object_store_memory(a, delta=MB):
+    b = ray.available_resources()["object_store_memory"]
+    return abs(a - b) < delta
+
+
 @ray.remote(memory=100 * MB)
 class Actor:
     def __init__(self):
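Note: the helper replaces exact equality with a 1 MiB tolerance, which matters once the resource is a raw byte count subject to small bookkeeping drift. A standalone illustration of the same predicate, with the drift values invented for the example:

    MB = 1024 * 1024

    def close(expected, reported, delta=MB):
        # Same shape as object_store_memory() above, minus the ray call.
        return abs(expected - reported) < delta

    assert close(500 * MB, 500 * MB + 4096)  # a few KiB of drift passes
    assert not close(500 * MB, 350 * MB)     # a 150 MiB gap is a real difference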
@@ -36,18 +41,14 @@ def test_memory_request():
 def test_object_store_memory_reporting():
     try:
         ray.init(num_cpus=1, object_store_memory=500 * MB)
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 10.0)
+        wait_for_condition(lambda: object_store_memory(500 * MB))
         x1 = ray.put(np.zeros(150 * 1024 * 1024, dtype=np.uint8))
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 7.0)
+        wait_for_condition(lambda: object_store_memory(350 * MB))
         x2 = ray.put(np.zeros(75 * 1024 * 1024, dtype=np.uint8))
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 5.5)
+        wait_for_condition(lambda: object_store_memory(275 * MB))
         del x1
         del x2
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 10.0)
+        wait_for_condition(lambda: object_store_memory(500 * MB))
     finally:
         ray.shutdown()
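Note: the old magic numbers were 50 MiB unit counts; the new arguments are the same quantities in bytes. The correspondence, as plain arithmetic:

    MB = 1024 * 1024
    UNIT = 50 * MB                              # old memory resource unit
    assert 500 * MB / UNIT == 10.0              # initial object store size
    assert (500 - 150) * MB / UNIT == 7.0       # after the 150 MiB put
    assert (500 - 150 - 75) * MB / UNIT == 5.5  # after the further 75 MiB put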
@@ -59,14 +60,11 @@ def test_object_store_memory_reporting_task():

     try:
         ray.init(num_cpus=1, object_store_memory=500 * MB)
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 10.0)
+        wait_for_condition(lambda: object_store_memory(500 * MB))
         x1 = f.remote(np.zeros(150 * 1024 * 1024, dtype=np.uint8))
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 7.0)
+        wait_for_condition(lambda: object_store_memory(350 * MB))
         ray.cancel(x1, force=True)
-        wait_for_condition(
-            lambda: ray.available_resources()["object_store_memory"] == 10.0)
+        wait_for_condition(lambda: object_store_memory(500 * MB))
     finally:
         ray.shutdown()
@@ -1227,13 +1227,13 @@ class LoadMetricsTest(unittest.TestCase):
             "1.1.1.1",
             {
                 "CPU": 64,
-                "memory": 20,  # 1000 MiB
-                "object_store_memory": 40  # 2000 MiB
+                "memory": 1000 * 1024 * 1024,
+                "object_store_memory": 2000 * 1024 * 1024,
             },
             {
                 "CPU": 2,
-                "memory": 10,  # 500 MiB
-                "object_store_memory": 20  # 1000 MiB
+                "memory": 500 * 1024 * 1024,  # 500 MiB
+                "object_store_memory": 1000 * 1024 * 1024,
             },
             {})
         lm.update("1.1.1.2", {
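Note: the deleted inline comments existed only to decode the opaque unit counts; the byte literals are self-describing. The old comments and the new values agree:

    # 20 units * 50 MiB/unit == 1000 MiB, exactly what the old comment recorded.
    assert 20 * 50 * 1024 * 1024 == 1000 * 1024 * 1024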
@@ -247,8 +247,7 @@ const ResourceSet ResourceSet::GetNumCpus() const {

 const std::string format_resource(std::string resource_name, double quantity) {
   if (resource_name == "object_store_memory" || resource_name == "memory") {
-    // Convert to 50MiB chunks and then to GiB
-    return std::to_string(quantity * (50 * 1024 * 1024) / (1024 * 1024 * 1024)) + " GiB";
+    return std::to_string(quantity / (1024 * 1024 * 1024)) + " GiB";
   }
   return std::to_string(quantity);
 }
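Note: this is the C++ mirror of the Python formatter change above. A Python cross-check that the old and new expressions yield the same GiB figure for the same physical amount (2.1 GiB picked to match the load-metrics test):

    GiB = 1024 * 1024 * 1024
    amount_bytes = 2.1 * GiB
    old_quantity = amount_bytes / (50 * 1024 * 1024)  # old 50 MiB unit count
    assert old_quantity * (50 * 1024 * 1024) / GiB == amount_bytes / GiB  # both 2.1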
@@ -233,8 +233,7 @@ std::string NodeResources::DebugString(StringIdMap string_to_in_map) const {

 const std::string format_resource(std::string resource_name, double quantity) {
   if (resource_name == "object_store_memory" || resource_name == "memory") {
-    // Convert to 50MiB chunks and then to GiB
-    return std::to_string(quantity * (50 * 1024 * 1024) / (1024 * 1024 * 1024)) + " GiB";
+    return std::to_string(quantity / (1024 * 1024 * 1024)) + " GiB";
   }
   return std::to_string(quantity);
 }
@@ -1009,8 +1009,7 @@ void ClusterResourceScheduler::FillResourceUsage(
   // it in last_report_resources_.
   if (get_used_object_store_memory_ != nullptr) {
     auto &capacity = resources.predefined_resources[OBJECT_STORE_MEM];
-    // Convert to 50MiB memory units.
-    double used = get_used_object_store_memory_() / (50. * 1024 * 1024);
+    double used = get_used_object_store_memory_();
     capacity.available = FixedPoint(capacity.total.Double() - used);
   }
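Note: with byte units the callback's return value is already in the scheduler's denomination, so the division drops out. A sketch of the resulting bookkeeping using the values from the scheduler test below, with the FixedPoint wrapping omitted:

    total_bytes = 1000 * 1024 * 1024       # object_store_memory total in the test
    used_bytes = 250 * 1024 * 1024         # returned by the callback, already bytes
    available = total_bytes - used_bytes
    assert available == 750 * 1024 * 1024  # matches the first ASSERT_EQ below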
@@ -1117,8 +1117,11 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
   vector<int64_t> cust_ids{1};
   NodeResources node_resources;
   std::unordered_map<std::string, double> initial_resources(
-      {{"CPU", 1}, {"GPU", 2}, {"memory", 3}, {"object_store_memory", 10}});
-  int64_t used_object_store_memory = 125 * 1024 * 1024;
+      {{"CPU", 1},
+       {"GPU", 2},
+       {"memory", 3},
+       {"object_store_memory", 1000 * 1024 * 1024}});
+  int64_t used_object_store_memory = 250 * 1024 * 1024;
   int64_t *ptr = &used_object_store_memory;
   ClusterResourceScheduler resource_scheduler("0", initial_resources,
                                               [&] { return *ptr; });
@@ -1134,17 +1137,17 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
     resource_scheduler.FillResourceUsage(data);
     auto available = data->resources_available();
     auto total = data->resources_total();
-    ASSERT_EQ(available["object_store_memory"], 7.5);
-    ASSERT_EQ(total["object_store_memory"], 10.0);
+    ASSERT_EQ(available["object_store_memory"], 750 * 1024 * 1024);
+    ASSERT_EQ(total["object_store_memory"], 1000 * 1024 * 1024);
   }

-  used_object_store_memory = 225 * 1024 * 1024;
+  used_object_store_memory = 450 * 1024 * 1024;
   {
     auto data = std::make_shared<rpc::ResourcesData>();
     resource_scheduler.FillResourceUsage(data);
     auto available = data->resources_available();
     auto total = data->resources_total();
-    ASSERT_EQ(available["object_store_memory"], 5.5);
+    ASSERT_EQ(available["object_store_memory"], 550 * 1024 * 1024);
   }

   used_object_store_memory = 0;
@@ -1153,7 +1156,7 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
     resource_scheduler.FillResourceUsage(data);
     auto available = data->resources_available();
     auto total = data->resources_total();
-    ASSERT_EQ(available["object_store_memory"], 10.0);
+    ASSERT_EQ(available["object_store_memory"], 1000 * 1024 * 1024);
   }

   used_object_store_memory = 9999999999;
@@ -1162,7 +1165,7 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
     resource_scheduler.FillResourceUsage(data);
     auto available = data->resources_available();
     auto total = data->resources_total();
-    ASSERT_EQ(available["object_store_memory"], 0.0);
+    ASSERT_EQ(available["object_store_memory"], 0);
   }
 }