[scheduler][autoscaler] Report placement resources for actor creation tasks (#26813)

This change makes us report placement resources for actor creation tasks. Essentially, the resource model here treats an actor creation task (the request that consumes the placement resources) as a task that runs very quickly.

Closes #26806
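
For context, a minimal sketch (not part of this diff) of the resource model being described: a default actor needs 1 CPU to be placed, but holds 0 CPUs once it is running, so its demand looks like a short-lived 1-CPU task.

import ray

ray.init(num_cpus=1)

@ray.remote  # defaults: 1 CPU for placement/scheduling, 0 CPUs while running
class A:
    def ping(self):
        return "pong"

a = A.remote()
ray.get(a.ping.remote())

# The CPU was only consumed while the actor was being created; once the actor
# is up it holds no CPUs, so roughly 1 CPU should show as available again
# (resource reporting is asynchronous, so the snapshot may lag slightly).
print(ray.available_resources())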

Co-authored-by: Alex <alex@anyscale.com>
Alex Wu 2022-08-05 22:02:44 -07:00 committed by GitHub
parent f017fcd826
commit 50e278f58b
7 changed files with 119 additions and 4 deletions


@@ -55,6 +55,37 @@ def test_fake_autoscaler_basic_e2e(shutdown_only):
    # __example_end__


def test_zero_cpu_default_actor():
    cluster = AutoscalingCluster(
        head_resources={"CPU": 0},
        worker_node_types={
            "cpu_node": {
                "resources": {
                    "CPU": 1,
                },
                "node_config": {},
                "min_workers": 0,
                "max_workers": 1,
            },
        },
    )
    try:
        cluster.start()
        ray.init("auto")

        @ray.remote
        class Actor:
            def ping(self):
                pass

        actor = Actor.remote()
        ray.get(actor.ping.remote())
        ray.shutdown()
    finally:
        cluster.shutdown()


if __name__ == "__main__":
    import os
    import sys


@@ -31,7 +31,7 @@ def test_warning_for_too_many_actors(shutdown_only):
    p = init_error_pubsub()

-   @ray.remote
+   @ray.remote(num_cpus=0)
    class Foo:
        def __init__(self):
            time.sleep(1000)


@@ -378,6 +378,54 @@ def test_backlog_report(shutdown_only):
    global_state_accessor.disconnect()


def test_default_load_reports(shutdown_only):
    """Despite the fact that default actors release their CPU after being
    placed, they should still require 1 CPU for load reporting purposes.

    https://github.com/ray-project/ray/issues/26806
    """
    cluster = ray.init(
        num_cpus=0,
    )
    global_state_accessor = make_global_state_accessor(cluster)

    @ray.remote
    def foo():
        return None

    @ray.remote
    class Foo:
        pass

    def actor_and_task_queued_together():
        message = global_state_accessor.get_all_resource_usage()
        if message is None:
            return False

        resource_usage = gcs_utils.ResourceUsageBatchData.FromString(message)
        aggregate_resource_load = resource_usage.resource_load_by_shape.resource_demands
        print(f"Num shapes {len(aggregate_resource_load)}")
        if len(aggregate_resource_load) == 1:
            num_infeasible = aggregate_resource_load[0].num_infeasible_requests_queued
            print(f"num in shape {num_infeasible}")
            # Ideally we'd want to assert backlog_size == 8, but guaranteeing
            # the order in which submissions occur is too hard/flaky.
            return num_infeasible == 2
        return False

    # Assign to variables to keep the ref counter happy.
    handle = Foo.remote()
    ref = foo.remote()

    wait_for_condition(actor_and_task_queued_together, timeout=2)
    global_state_accessor.disconnect()

    # Do something with the variables so lint is happy.
    del handle
    del ref


def test_heartbeat_ip(shutdown_only):
    cluster = ray.init(num_cpus=1)
    global_state_accessor = make_global_state_accessor(cluster)

@@ -92,6 +92,13 @@ RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)
/// handler is drifting.
RAY_CONFIG(uint64_t, num_resource_report_periods_warning, 5)

/// Whether to report placement or regular resource usage for an actor.
/// Reporting placement resources may cause the autoscaler to overestimate the
/// resources required by the cluster, but reporting regular resources may lead
/// to no autoscaling when an actor can't be placed.
/// https://github.com/ray-project/ray/issues/26806
RAY_CONFIG(bool, report_actor_placement_resources, true)

/// Whether to record the creation sites of object references. This adds more
/// information to `ray memory`, but introduces a little extra overhead when
/// creating object references (e.g. 5~10 microsec per call in Python).
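
If the overestimation described in the new config comment ever becomes a problem, the flag can presumably be flipped back to the old behavior through Ray's internal system config; a sketch, assuming the usual `_system_config` override path:

import ray

# Revert to reporting the actor's regular (post-placement) resources instead
# of its placement resources, i.e. the pre-#26813 load-reporting behavior.
ray.init(_system_config={"report_actor_placement_resources": False})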


@@ -113,7 +113,13 @@ void TaskSpecification::ComputeResources() {
  if (!IsActorTask()) {
    // There is no need to compute `SchedulingClass` for actor tasks since
    // the actor tasks need not be scheduled.
-   const auto &resource_set = GetRequiredResources();
+   const bool is_actor_creation_task = IsActorCreationTask();
+   const bool should_report_placement_resources =
+       RayConfig::instance().report_actor_placement_resources();
+   const auto &resource_set =
+       (is_actor_creation_task && should_report_placement_resources)
+           ? GetRequiredPlacementResources()
+           : GetRequiredResources();
    const auto &function_descriptor = FunctionDescriptor();
    auto depth = GetDepth();
    auto sched_cls_desc = SchedulingClassDescriptor(


@@ -100,6 +100,29 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
            TaskSpecification::GetSchedulingClass(descriptor9));
}

TEST(TaskSpecTest, TestActorSchedulingClass) {
  // This test ensures that an actor's lease request's scheduling class is
  // determined by the placement resources, not the regular resources.
  const std::unordered_map<std::string, double> one_cpu = {{"CPU", 1}};

  rpc::TaskSpec actor_task_spec_proto;
  actor_task_spec_proto.set_type(TaskType::ACTOR_CREATION_TASK);
  actor_task_spec_proto.mutable_required_placement_resources()->insert(one_cpu.begin(),
                                                                       one_cpu.end());
  TaskSpecification actor_task(actor_task_spec_proto);

  rpc::TaskSpec regular_task_spec_proto;
  regular_task_spec_proto.set_type(TaskType::NORMAL_TASK);
  regular_task_spec_proto.mutable_required_resources()->insert(one_cpu.begin(),
                                                               one_cpu.end());
  TaskSpecification regular_task(regular_task_spec_proto);

  ASSERT_EQ(regular_task.GetSchedulingClass(), actor_task.GetSchedulingClass());
}

TEST(TaskSpecTest, TestTaskSpecification) {
  rpc::SchedulingStrategy scheduling_strategy;
  NodeID node_id = NodeID::FromRandom();

@@ -118,4 +141,4 @@ TEST(TaskSpecTest, TestTaskSpecification) {
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
-}
+}


@@ -73,7 +73,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
                                 rpc::GetAllAvailableResourcesReply *reply,
                                 rpc::SendReplyCallback send_reply_callback) override;

- /// Handle report resource usage rpc come from raylet.
+ /// Handle report resource usage rpc from a raylet.
  void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request,
                                 rpc::ReportResourceUsageReply *reply,
                                 rpc::SendReplyCallback send_reply_callback) override;