ray/release/ray_release/buildkite/concurrency.py
Kai Fricke 04ea180dfb
[ci/release] Add "tiny" concurrency group, change limits (#23065)
E.g. long running tests run on small clusters (often 8 CPUs) but block other jobs for a long time. We should thus add more granularity to the concurrency groups.
Additionally, limits have been slightly adjusted to make more sense (e.g. 8 GPUs are now small-gpu, 9+ GPUs large-gpu, instead of 7 for small-gpu and 8 for large-gpu).
2022-03-11 10:19:38 -08:00

133 lines
4.2 KiB
Python

import csv
import os
from collections import namedtuple
from typing import Tuple, Optional, Dict
from ray_release.config import Test, RELEASE_PACKAGE_DIR, load_test_cluster_compute
from ray_release.logger import logger
# Keep 10% for the buffer.
# NOTE(review): 15784 is presumably the total concurrency quota of the CI
# fleet — confirm. `limit` is not referenced elsewhere in this module as far
# as visible here; it may be imported by other modules.
limit = int(15784 * 0.9)
# Maximum number of jobs that may run concurrently in each concurrency group.
# NOTE(review): "CONCURRENY" is a typo for "CONCURRENCY", but the name is
# module-level (and likely imported elsewhere), so it is kept as-is.
CONCURRENY_GROUPS = {
"tiny": 32,
"small": 16,
"medium": 4,
"large": 2,
"small-gpu": 8,
"large-gpu": 4,
}
# A matching rule: a test whose total GPU/CPU counts fall within the
# inclusive [min, max] ranges is assigned to `group`. A bound of -1 means
# "unbounded" (see parse_condition).
Condition = namedtuple(
"Condition", ["min_gpu", "max_gpu", "min_cpu", "max_cpu", "group"]
)
# Rules mapping (gpu, cpu) totals to a concurrency group. Evaluated in
# order; the first matching condition wins. -1 means "unbounded".
#
# Fixes: `max_cpu=-128` was a typo for the -1 sentinel (it only worked
# because parse_condition treats any value <= -1 as unbounded), and
# `max_gpu=9` on the small-gpu rule overlapped large-gpu's `min_gpu=9`
# (only rule ordering resolved the ambiguity). Intended split: 1-8 GPUs
# -> small-gpu, 9+ GPUs -> large-gpu. Behavior is unchanged.
gpu_cpu_to_concurrency_groups = [
    # 9+ GPUs -> large-gpu (checked first, so it always takes precedence).
    Condition(min_gpu=9, max_gpu=-1, min_cpu=0, max_cpu=-1, group="large-gpu"),
    # 1-8 GPUs -> small-gpu, regardless of CPU count.
    Condition(min_gpu=1, max_gpu=8, min_cpu=0, max_cpu=-1, group="small-gpu"),
    # CPU-only groups below.
    Condition(min_gpu=0, max_gpu=0, min_cpu=513, max_cpu=-1, group="large"),
    Condition(min_gpu=0, max_gpu=0, min_cpu=129, max_cpu=512, group="medium"),
    # tiny must come before small: both CPU ranges start at 0.
    Condition(min_gpu=0, max_gpu=0, min_cpu=0, max_cpu=32, group="tiny"),
    Condition(min_gpu=0, max_gpu=0, min_cpu=0, max_cpu=128, group="small"),
]
# Obtained from https://cloud.google.com/compute/docs/accelerator-optimized-machines
# Maps GCP GPU machine type -> (num_cpus, num_gpus). These are looked up
# explicitly because their resource counts cannot be derived from the
# machine-type string like regular GCP instances (see
# parse_instance_resources).
gcp_gpu_instances = {
"a2-highgpu-1g": (12, 1),
"a2-highgpu-2g": (24, 2),
"a2-highgpu-4g": (48, 4),
"a2-highgpu-8g": (96, 8),
"a2-megagpu-16g": (96, 16),
}
def load_instance_types(path: Optional[str] = None) -> Dict[str, Tuple[int, int]]:
    """Load the AWS instance type table from a CSV file.

    The CSV has columns ``instance``, ``cpus`` and ``gpus``; the result maps
    each instance name to a ``(cpus, gpus)`` tuple.

    Args:
        path: CSV file to read. Falls back to the bundled
            ``aws_instance_types.csv`` if not given.
    """
    if not path:
        path = os.path.join(
            RELEASE_PACKAGE_DIR, "ray_release", "buildkite", "aws_instance_types.csv"
        )
    with open(path, "rt") as fp:
        return {
            row["instance"]: (int(row["cpus"]), int(row["gpus"]))
            for row in csv.DictReader(fp)
        }
def parse_instance_resources(instance: str) -> Tuple[int, int]:
    """Derive ``(num_cpus, num_gpus)`` from a (GCP) machine-type string.

    The CPU count is the numeric suffix after the last ``-`` (e.g.
    ``n2-standard-16`` -> 16 CPUs). GPU instances are assumed to have been
    resolved already via the lookup tables, so the GPU count is always 0.
    """
    _, _, cpu_suffix = instance.rpartition("-")
    return int(cpu_suffix), 0
def parse_condition(cond: int, limit: float = float("inf")) -> float:
    """Resolve a condition bound: any value of -1 or below means
    "unbounded" and is replaced by ``limit``; real bounds pass through."""
    if cond > -1:
        return cond
    return limit
def get_concurrency_group(test: Test) -> Tuple[str, int]:
    """Pick the Buildkite concurrency group for a test.

    The test's total cluster CPUs/GPUs are matched against
    ``gpu_cpu_to_concurrency_groups`` in order; the first matching rule
    determines the group. If resources cannot be determined or no rule
    matches, falls back to the "small" group.
    """
    fallback = ("small", CONCURRENY_GROUPS["small"])

    try:
        num_cpus, num_gpus = get_test_resources(test)
    except Exception as e:
        logger.warning(f"Couldn't get test resources for test {test['name']}: {e}")
        return fallback

    for cond in gpu_cpu_to_concurrency_groups:
        gpu_ok = (
            parse_condition(cond.min_gpu, float("-inf"))
            <= num_gpus
            <= parse_condition(cond.max_gpu, float("inf"))
        )
        cpu_ok = (
            parse_condition(cond.min_cpu, float("-inf"))
            <= num_cpus
            <= parse_condition(cond.max_cpu, float("inf"))
        )
        if gpu_ok and cpu_ok:
            return cond.group, CONCURRENY_GROUPS[cond.group]

    # No rule matched; fall back to the default group.
    logger.warning(
        f"Could not find concurrency group for test {test['name']} "
        f"based on used resources."
    )
    return fallback
def get_test_resources(test: Test) -> Tuple[int, int]:
    """Return the total ``(cpus, gpus)`` of the test's cluster compute config.

    Loads the cluster compute definition via ``load_test_cluster_compute``
    and delegates the resource summation to
    ``get_test_resources_from_cluster_compute``.
    """
    cluster_compute = load_test_cluster_compute(test)
    return get_test_resources_from_cluster_compute(cluster_compute)
def get_test_resources_from_cluster_compute(cluster_compute: Dict) -> Tuple[int, int]:
    """Sum the CPUs and GPUs of all nodes in a cluster compute config.

    Counts one head node plus each worker node type at its ``max_workers``
    count (falling back to ``min_workers``, then 1), i.e. the maximum
    cluster size. Per-instance resources come from the AWS CSV table, the
    GCP GPU table, or are parsed from the machine-type string, in that
    order of preference.
    """
    aws_instance_types = load_instance_types()

    # (instance type, node count) pairs: head node first, then workers.
    nodes = [(cluster_compute["head_node_type"]["instance_type"], 1)]
    for worker in cluster_compute["worker_node_types"]:
        count = worker.get("max_workers", worker.get("min_workers", 1))
        nodes.append((worker["instance_type"], count))

    total_cpus = 0
    total_gpus = 0
    for instance_type, count in nodes:
        if instance_type in aws_instance_types:
            cpus, gpus = aws_instance_types[instance_type]
        elif instance_type in gcp_gpu_instances:
            cpus, gpus = gcp_gpu_instances[instance_type]
        else:
            cpus, gpus = parse_instance_resources(instance_type)
        total_cpus += cpus * count
        total_gpus += gpus * count
    return total_cpus, total_gpus