mirror of https://github.com/vale981/ray (synced 2025-03-06 02:21:39 -05:00)

[autoscaler] Honor separate head and worker node subnet IDs (#8374)

parent ad1c05c8b0
commit 76450c8d47

13 changed files with 594 additions and 69 deletions
@@ -222,7 +222,7 @@ install_dependencies() {
       opencv-python-headless pyyaml pandas==0.24.2 requests feather-format lxml openpyxl xlrd \
       py-spy pytest pytest-timeout networkx tabulate aiohttp uvicorn dataclasses pygments werkzeug \
       kubernetes flask grpcio pytest-sugar pytest-rerunfailures pytest-asyncio scikit-learn==0.22.2 numba \
-      Pillow prometheus_client)
+      Pillow prometheus_client boto3)
   if [ "${OSTYPE}" != msys ]; then
     # These packages aren't Windows-compatible
     pip_packages+=(blist)  # https://github.com/DanielStutzbach/blist/issues/81#issue-391460716
@@ -35,6 +35,8 @@ REQUIRED, OPTIONAL = True, False
 RAY_SCHEMA_PATH = os.path.join(
     os.path.dirname(ray.autoscaler.__file__), "ray-schema.json")
 
+NODE_TYPE_CONFIG_KEYS = {"workers": "worker_nodes", "head": "head_node"}
+
 
 class LoadMetrics:
     """Container for cluster load metrics.
@@ -1,5 +1,7 @@
 from distutils.version import StrictVersion
+from functools import lru_cache
+from functools import partial
 import itertools
 import json
 import os
 import time
@@ -10,6 +12,8 @@ from botocore.config import Config
 import botocore
 
 from ray.ray_constants import BOTO_MAX_RETRIES
+from ray.autoscaler.autoscaler import NODE_TYPE_CONFIG_KEYS
+from ray.autoscaler.aws.utils import LazyDefaultDict
 
 logger = logging.getLogger(__name__)
 
@@ -34,6 +38,7 @@ DEFAULT_AMI = {
     "sa-east-1": "ami-0da2c49fe75e7e5ed",  # SA (Sao Paulo)
 }
 
+
 assert StrictVersion(boto3.__version__) >= StrictVersion("1.4.8"), \
     "Boto3 version >= 1.4.8 required, try `pip install -U boto3`"
 
@@ -232,61 +237,29 @@ def _configure_security_group(config):
 
 
 def _configure_security_group(config):
-    if "SecurityGroupIds" in config["head_node"] and \
-            "SecurityGroupIds" in config["worker_nodes"]:
+    node_types_to_configure = [
+        node_type for node_type, config_key in NODE_TYPE_CONFIG_KEYS.items()
+        if "SecurityGroupIds" not in config[NODE_TYPE_CONFIG_KEYS[node_type]]
+    ]
+    if not node_types_to_configure:
         return config  # have user-defined groups
 
-    group_name = SECURITY_GROUP_TEMPLATE.format(config["cluster_name"])
-    vpc_id = _get_vpc_id_or_die(config, config["worker_nodes"]["SubnetIds"][0])
-    security_group = _get_security_group(config, vpc_id, group_name)
-
-    if security_group is None:
-        logger.info("_configure_security_group: "
-                    "Creating new security group {}".format(group_name))
-        client = _client("ec2", config)
-        client.create_security_group(
-            Description="Auto-created security group for Ray workers",
-            GroupName=group_name,
-            VpcId=vpc_id)
-        security_group = _get_security_group(config, vpc_id, group_name)
-        assert security_group, "Failed to create security group"
-
-    if not security_group.ip_permissions:
-        IpPermissions = [{
-            "FromPort": -1,
-            "ToPort": -1,
-            "IpProtocol": "-1",
-            "UserIdGroupPairs": [{
-                "GroupId": security_group.id
-            }]
-        }, {
-            "FromPort": 22,
-            "ToPort": 22,
-            "IpProtocol": "TCP",
-            "IpRanges": [{
-                "CidrIp": "0.0.0.0/0"
-            }]
-        }]
-
-        additional_IpPermissions = config["provider"].get(
-            "security_group", {}).get("IpPermissions", [])
-        IpPermissions.extend(additional_IpPermissions)
-
-        security_group.authorize_ingress(IpPermissions=IpPermissions)
-
-    if "SecurityGroupIds" not in config["head_node"]:
-        logger.info(
-            "_configure_security_group: "
-            "SecurityGroupIds not specified for head node, using {}".format(
-                security_group.group_name))
-        config["head_node"]["SecurityGroupIds"] = [security_group.id]
-
-    if "SecurityGroupIds" not in config["worker_nodes"]:
-        logger.info(
-            "_configure_security_group: "
-            "SecurityGroupIds not specified for workers, using {}".format(
-                security_group.group_name))
-        config["worker_nodes"]["SecurityGroupIds"] = [security_group.id]
+    security_groups = _upsert_security_groups(config, node_types_to_configure)
+
+    if "head" in node_types_to_configure:
+        head_sg = security_groups["head"]
+        logger.info(
+            "_configure_security_group: "
+            "SecurityGroupIds not specified for head node, using {} ({})"
+            .format(head_sg.group_name, head_sg.id))
+        config["head_node"]["SecurityGroupIds"] = [head_sg.id]
+
+    if "workers" in node_types_to_configure:
+        workers_sg = security_groups["workers"]
+        logger.info(
+            "_configure_security_group: "
+            "SecurityGroupIds not specified for workers, using {} ({})"
+            .format(workers_sg.group_name, workers_sg.id))
+        config["worker_nodes"]["SecurityGroupIds"] = [workers_sg.id]
 
     return config
@@ -319,28 +292,149 @@ def _check_ami(config):
             region=region))
 
 
-def _get_vpc_id_or_die(config, subnet_id):
-    ec2 = _resource("ec2", config)
+def _upsert_security_groups(config, node_types):
+    security_groups = _get_or_create_vpc_security_groups(config, node_types)
+    _upsert_security_group_rules(config, security_groups)
+
+    return security_groups
+
+
+def _get_or_create_vpc_security_groups(conf, node_types):
+    # Figure out which VPC each node_type is in...
+    ec2 = _resource("ec2", conf)
+    node_type_to_vpc = {
+        node_type: _get_vpc_id_or_die(
+            ec2,
+            conf[NODE_TYPE_CONFIG_KEYS[node_type]]["SubnetIds"][0],
+        )
+        for node_type in node_types
+    }
+
+    # Generate the name of the security group we're looking for...
+    expected_sg_name = SECURITY_GROUP_TEMPLATE.format(conf["cluster_name"])
+
+    # Figure out which security groups with this name exist for each VPC...
+    vpc_to_existing_sg = {
+        sg.vpc_id: sg
+        for sg in _get_security_groups(
+            conf,
+            node_type_to_vpc.values(),
+            [expected_sg_name],
+        )
+    }
+
+    # Lazily create any security group we're missing for each VPC...
+    vpc_to_sg = LazyDefaultDict(
+        partial(_create_security_group, conf, group_name=expected_sg_name),
+        vpc_to_existing_sg,
+    )
+
+    # Then return a mapping from each node_type to its security group...
+    return {
+        node_type: vpc_to_sg[vpc_id]
+        for node_type, vpc_id in node_type_to_vpc.items()
+    }
+
+
+@lru_cache()
+def _get_vpc_id_or_die(ec2, subnet_id):
     subnet = list(
         ec2.subnets.filter(Filters=[{
             "Name": "subnet-id",
             "Values": [subnet_id]
         }]))
-    assert len(subnet) == 1, "Subnet not found"
+    assert len(subnet) == 1, "Subnet ID not found: {}".format(subnet_id)
     subnet = subnet[0]
     return subnet.vpc_id
 
 
 def _get_security_group(config, vpc_id, group_name):
+    security_group = _get_security_groups(config, [vpc_id], [group_name])
+    return None if not security_group else security_group[0]
+
+
+def _get_security_groups(config, vpc_ids, group_names):
+    unique_vpc_ids = list(set(vpc_ids))
+    unique_group_names = set(group_names)
+
     ec2 = _resource("ec2", config)
     existing_groups = list(
         ec2.security_groups.filter(Filters=[{
             "Name": "vpc-id",
-            "Values": [vpc_id]
+            "Values": unique_vpc_ids
         }]))
-    for sg in existing_groups:
-        if sg.group_name == group_name:
-            return sg
+    filtered_groups = [
+        sg for sg in existing_groups if sg.group_name in unique_group_names
+    ]
+    return filtered_groups
+
+
+def _create_security_group(config, vpc_id, group_name):
+    client = _client("ec2", config)
+    client.create_security_group(
+        Description="Auto-created security group for Ray workers",
+        GroupName=group_name,
+        VpcId=vpc_id)
+    security_group = _get_security_group(config, vpc_id, group_name)
+    logger.info("_create_security_group: Created new security group {} ({})"
+                .format(security_group.group_name, security_group.id))
+    assert security_group, "Failed to create security group"
+    return security_group
+
+
+def _upsert_security_group_rules(conf, security_groups):
+    sgids = {sg.id for sg in security_groups.values()}
+    # sort security group items for deterministic inbound rule config order
+    # (mainly supports more precise stub-based boto3 unit testing)
+    for node_type, sg in sorted(security_groups.items()):
+        sg = security_groups[node_type]
+        if not sg.ip_permissions:
+            _update_inbound_rules(sg, sgids, conf)
+
+
+def _update_inbound_rules(target_security_group, sgids, config):
+    extended_rules = config["provider"] \
+        .get("security_group", {}) \
+        .get("IpPermissions", [])
+    ip_permissions = _create_default_inbound_rules(sgids, extended_rules)
+    target_security_group.authorize_ingress(IpPermissions=ip_permissions)
+
+
+def _create_default_inbound_rules(sgids, extended_rules=[]):
+    intracluster_rules = _create_default_instracluster_inbound_rules(sgids)
+    ssh_rules = _create_default_ssh_inbound_rules()
+    merged_rules = itertools.chain(
+        intracluster_rules,
+        ssh_rules,
+        extended_rules,
+    )
+    return list(merged_rules)
+
+
+def _create_default_instracluster_inbound_rules(intracluster_sgids):
+    return [{
+        "FromPort": -1,
+        "ToPort": -1,
+        "IpProtocol": "-1",
+        "UserIdGroupPairs": [
+            {
+                "GroupId": security_group_id
+            } for security_group_id in sorted(intracluster_sgids)
+            # sort security group IDs for deterministic IpPermission models
+            # (mainly supports more precise stub-based boto3 unit testing)
+        ]
+    }]
+
+
+def _create_default_ssh_inbound_rules():
+    return [{
+        "FromPort": 22,
+        "ToPort": 22,
+        "IpProtocol": "tcp",
+        "IpRanges": [{
+            "CidrIp": "0.0.0.0/0"
+        }]
+    }]
 
 
 def _get_role(role_name, config):
@@ -380,20 +474,21 @@ def _get_key(key_name, config):
 
 
 def _client(name, config):
-    boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES})
-    aws_credentials = config["provider"].get("aws_credentials", {})
-    return boto3.client(
-        name,
-        config["provider"]["region"],
-        config=boto_config,
-        **aws_credentials)
+    return _resource(name, config).meta.client
 
 
 def _resource(name, config):
-    boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES})
+    region = config["provider"]["region"]
     aws_credentials = config["provider"].get("aws_credentials", {})
+    return _resource_cache(name, region, **aws_credentials)
+
+
+@lru_cache()
+def _resource_cache(name, region, **kwargs):
+    boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES})
     return boto3.resource(
         name,
-        config["provider"]["region"],
+        region,
         config=boto_config,
-        **aws_credentials)
+        **kwargs,
+    )
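An illustrative note (not part of the commit): the lru_cache on _resource_cache means repeated calls return the identical boto3 resource, so a botocore Stubber attached to its client (as in the conftest.py fixtures added below) also intercepts the autoscaler's own AWS calls. A minimal sketch:

    from botocore.stub import Stubber
    from ray.autoscaler.aws.config import _resource_cache

    ec2 = _resource_cache("ec2", "us-west-2")
    assert ec2 is _resource_cache("ec2", "us-west-2")  # lru_cache: same instance

    with Stubber(ec2.meta.client) as stub:
        # a queued response is consumed by any code path using this client,
        # including _client("ec2", ...), instead of a real AWS call
        stub.add_response(
            "describe_subnets",
            expected_params={},
            service_response={"Subnets": []})
        assert ec2.meta.client.describe_subnets()["Subnets"] == []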
python/ray/autoscaler/aws/example-subnets.yaml (new file, 30 lines)

@@ -0,0 +1,30 @@
cluster_name: subnets

max_workers: 1

provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a

auth:
    ssh_user: ubuntu

# If required, head and worker nodes can exist on subnets in different VPCs and
# communicate via VPC peering.

# VPC peering overview: https://docs.aws.amazon.com/vpc/latest/userguide/vpc-peering.html.
# Setup VPC peering: https://docs.aws.amazon.com/vpc/latest/peering/create-vpc-peering-connection.html.
# Configure VPC peering route tables: https://docs.aws.amazon.com/vpc/latest/peering/vpc-peering-routing.html.

# To enable external SSH connectivity, you should also ensure that your VPC
# is configured to assign public IPv4 addresses to every EC2 instance
# assigned to it.
head_node:
    SubnetIds:
        - subnet-0000000  # Replace with your actual Head Node Subnet ID.

worker_nodes:
    SubnetIds:
        - subnet-fffffff  # Replace with your actual Worker Node Subnet ID.
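For orientation, a minimal sketch (not part of the commit) of how this example flows through config bootstrapping, mirroring the test helpers added below; it assumes a ray installation with this change and valid AWS credentials:

    import yaml

    import ray
    from ray.autoscaler.commands import fillout_defaults, validate_config

    config = yaml.safe_load(open("example-subnets.yaml").read())
    config = fillout_defaults(config)  # merge provider defaults
    validate_config(config)  # check against ray-schema.json
    config = ray.autoscaler.aws.config.bootstrap_aws(config)

    # head and worker nodes keep their separate subnet and security group IDs
    print(config["head_node"]["SubnetIds"], config["head_node"]["SecurityGroupIds"])
    print(config["worker_nodes"]["SubnetIds"], config["worker_nodes"]["SecurityGroupIds"])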
python/ray/autoscaler/aws/utils.py (new file, 23 lines)

@@ -0,0 +1,23 @@
from collections import defaultdict


class LazyDefaultDict(defaultdict):
    """
    LazyDefaultDict(default_factory[, ...]) --> dict with default factory

    The default factory is called with the key argument to produce
    a new value when a key is not present, in __getitem__ only.
    A LazyDefaultDict compares equal to a dict with the same items.
    All remaining arguments are treated the same as if they were
    passed to the dict constructor, including keyword arguments.
    """

    def __missing__(self, key):
        """
        __missing__(key) # Called by __getitem__ for missing key; pseudo-code:
        if self.default_factory is None: raise KeyError((key,))
        self[key] = value = self.default_factory(key)
        return value
        """
        self[key] = self.default_factory(key)
        return self[key]
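An illustrative sketch (not part of the commit) of the difference from a plain collections.defaultdict: the factory receives the missing key itself, which is what lets config.py seed it with partial(_create_security_group, conf, group_name=...) so a security group is created per missing VPC ID on first access:

    from ray.autoscaler.aws.utils import LazyDefaultDict

    d = LazyDefaultDict(lambda key: key.upper(), {"vpc-1": "sg-existing"})
    assert d["vpc-1"] == "sg-existing"  # preset items are returned as-is
    assert d["vpc-2"] == "VPC-2"  # factory is called with the missing key
    assert dict(d) == {"vpc-1": "sg-existing", "vpc-2": "VPC-2"}  # cached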
@@ -190,6 +190,8 @@ py_test(
     deps = ["//:ray_lib"],
 )
 
+py_test(
+    name = "test_autoscaler_aws",
+    size = "small",
+    srcs = ["aws/test_autoscaler_aws.py"],
+    deps = ["//:ray_lib"],
+)
+
 py_test(
     name = "test_autoscaler_yaml",
     size = "small",
python/ray/tests/aws/conftest.py (new file, 21 lines)

@@ -0,0 +1,21 @@
import pytest

from ray.autoscaler.aws.config import _resource_cache

from botocore.stub import Stubber


@pytest.fixture()
def iam_client_stub():
    resource = _resource_cache("iam", "us-west-2")
    with Stubber(resource.meta.client) as stubber:
        yield stubber
        stubber.assert_no_pending_responses()


@pytest.fixture()
def ec2_client_stub():
    resource = _resource_cache("ec2", "us-west-2")
    with Stubber(resource.meta.client) as stubber:
        yield stubber
        stubber.assert_no_pending_responses()
python/ray/tests/aws/test_autoscaler_aws.py (new file, 76 lines)

@@ -0,0 +1,76 @@
import pytest

import ray.tests.aws.utils.stubs as stubs
import ray.tests.aws.utils.helpers as helpers
from ray.tests.aws.utils.constants import AUX_SUBNET, DEFAULT_SUBNET, \
    DEFAULT_SG_AUX_SUBNET, DEFAULT_SG, DEFAULT_SG_DUAL_GROUP_RULES, \
    DEFAULT_SG_WITH_RULES_AUX_SUBNET, DEFAULT_SG_WITH_RULES, AUX_SG


def test_create_sg_different_vpc_same_rules(iam_client_stub, ec2_client_stub):
    # use default stubs to skip ahead to security group configuration
    stubs.skip_to_configure_sg(ec2_client_stub, iam_client_stub)

    # given head and worker nodes with custom subnets defined...
    # expect to first describe the worker subnet ID
    stubs.describe_subnets_echo(ec2_client_stub, AUX_SUBNET)
    # expect to second describe the head subnet ID
    stubs.describe_subnets_echo(ec2_client_stub, DEFAULT_SUBNET)
    # given no existing security groups within the VPC...
    stubs.describe_no_security_groups(ec2_client_stub)
    # expect to first create a security group on the worker node VPC
    stubs.create_sg_echo(ec2_client_stub, DEFAULT_SG_AUX_SUBNET)
    # expect new worker security group details to be retrieved after creation
    stubs.describe_sgs_on_vpc(
        ec2_client_stub,
        [AUX_SUBNET["VpcId"]],
        [DEFAULT_SG_AUX_SUBNET],
    )
    # expect to second create a security group on the head node VPC
    stubs.create_sg_echo(ec2_client_stub, DEFAULT_SG)
    # expect new head security group details to be retrieved after creation
    stubs.describe_sgs_on_vpc(
        ec2_client_stub,
        [DEFAULT_SUBNET["VpcId"]],
        [DEFAULT_SG],
    )

    # given no existing default head security group inbound rules...
    # expect to authorize all default head inbound rules
    stubs.authorize_sg_ingress(
        ec2_client_stub,
        DEFAULT_SG_DUAL_GROUP_RULES,
    )
    # given no existing default worker security group inbound rules...
    # expect to authorize all default worker inbound rules
    stubs.authorize_sg_ingress(
        ec2_client_stub,
        DEFAULT_SG_WITH_RULES_AUX_SUBNET,
    )

    # given the prior modification to the head security group...
    # expect the next read of a head security group property to reload it
    stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES)
    # given the prior modification to the worker security group...
    # expect the next read of a worker security group property to reload it
    stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES_AUX_SUBNET)

    # given our mocks and an example config file as input...
    # expect the config to be loaded, validated, and bootstrapped successfully
    config = helpers.bootstrap_aws_example_config_file("example-subnets.yaml")

    # expect the bootstrapped config to show different head and worker security
    # groups residing on different subnets
    assert config["head_node"]["SecurityGroupIds"] == [DEFAULT_SG["GroupId"]]
    assert config["head_node"]["SubnetIds"] == [DEFAULT_SUBNET["SubnetId"]]
    assert config["worker_nodes"]["SecurityGroupIds"] == [AUX_SG["GroupId"]]
    assert config["worker_nodes"]["SubnetIds"] == [AUX_SUBNET["SubnetId"]]

    # expect no pending responses left in IAM or EC2 client stub queues
    iam_client_stub.assert_no_pending_responses()
    ec2_client_stub.assert_no_pending_responses()


if __name__ == "__main__":
    import sys
    sys.exit(pytest.main(["-v", __file__]))
python/ray/tests/aws/utils/constants.py (new file, 129 lines)

@@ -0,0 +1,129 @@
import copy
import ray
from datetime import datetime

# Override global constants used in AWS autoscaler config artifact names.
# This helps ensure that any unmocked test doesn't alter non-test artifacts.
ray.autoscaler.aws.config.RAY = \
    "ray-autoscaler-aws-test"
ray.autoscaler.aws.config.DEFAULT_RAY_INSTANCE_PROFILE = \
    ray.autoscaler.aws.config.RAY + "-v1"
ray.autoscaler.aws.config.DEFAULT_RAY_IAM_ROLE = \
    ray.autoscaler.aws.config.RAY + "-v1"
ray.autoscaler.aws.config.SECURITY_GROUP_TEMPLATE = \
    ray.autoscaler.aws.config.RAY + "-{}"

# Default IAM instance profile to expose to tests.
DEFAULT_INSTANCE_PROFILE = {
    "Arn": "arn:aws:iam::336924118301:instance-profile/ExampleInstanceProfile",
    "CreateDate": datetime(2013, 6, 12, 23, 52, 2, 2),
    "InstanceProfileId": "AIPA0000000000EXAMPLE",
    "InstanceProfileName": "ExampleInstanceProfile",
    "Path": "/",
    "Roles": [
        {
            "Arn": "arn:aws:iam::123456789012:role/Test-Role",
            "AssumeRolePolicyDocument": "ExampleAssumeRolePolicyDocument",
            "CreateDate": datetime(2013, 1, 9, 6, 33, 26, 2),
            "Path": "/",
            "RoleId": "AROA0000000000EXAMPLE",
            "RoleName": "Test-Role",
        },
    ]
}

# Default EC2 key pair to expose to tests.
DEFAULT_KEY_PAIR = {
    "KeyFingerprint": "00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00",
    "KeyName": ray.autoscaler.aws.config.RAY + "_us-west-2",
}

# Primary EC2 subnet to expose to tests.
DEFAULT_SUBNET = {
    "AvailabilityZone": "us-west-2a",
    "AvailableIpAddressCount": 251,
    "CidrBlock": "10.0.1.0/24",
    "DefaultForAz": False,
    "MapPublicIpOnLaunch": True,
    "State": "available",
    "SubnetId": "subnet-0000000",
    "VpcId": "vpc-0000000",
}

# Secondary EC2 subnet to expose to tests as required.
AUX_SUBNET = {
    "AvailabilityZone": "us-west-2a",
    "AvailableIpAddressCount": 251,
    "CidrBlock": "192.168.1.0/24",
    "DefaultForAz": False,
    "MapPublicIpOnLaunch": True,
    "State": "available",
    "SubnetId": "subnet-fffffff",
    "VpcId": "vpc-fffffff",
}

# Default cluster name to expose to tests.
DEFAULT_CLUSTER_NAME = "test-cluster-name"

# Default security group settings immediately after creation
# (prior to inbound rule configuration).
DEFAULT_SG = {
    "Description": "Auto-created security group for Ray workers",
    "GroupName": ray.autoscaler.aws.config.RAY + "-" + DEFAULT_CLUSTER_NAME,
    "OwnerId": "test-owner",
    "GroupId": "sg-1234abcd",
    "VpcId": DEFAULT_SUBNET["VpcId"],
    "IpPermissions": [],
    "IpPermissionsEgress": [{
        "FromPort": -1,
        "ToPort": -1,
        "IpProtocol": "-1",
        "IpRanges": [{
            "CidrIp": "0.0.0.0/0"
        }]
    }],
    "Tags": []
}

# Secondary security group settings after creation
# (prior to inbound rule configuration).
AUX_SG = copy.deepcopy(DEFAULT_SG)
AUX_SG["GroupName"] += "-aux"
AUX_SG["GroupId"] = "sg-dcba4321"

# Default security group settings immediately after creation on aux subnet
# (prior to inbound rule configuration).
DEFAULT_SG_AUX_SUBNET = copy.deepcopy(DEFAULT_SG)
DEFAULT_SG_AUX_SUBNET["VpcId"] = AUX_SUBNET["VpcId"]
DEFAULT_SG_AUX_SUBNET["GroupId"] = AUX_SG["GroupId"]

# Default security group settings once default inbound rules are applied
# (if used by both head and worker nodes)
DEFAULT_SG_WITH_RULES = copy.deepcopy(DEFAULT_SG)
DEFAULT_SG_WITH_RULES["IpPermissions"] = [{
    "FromPort": -1,
    "ToPort": -1,
    "IpProtocol": "-1",
    "UserIdGroupPairs": [{
        "GroupId": DEFAULT_SG["GroupId"]
    }]
}, {
    "FromPort": 22,
    "ToPort": 22,
    "IpProtocol": "tcp",
    "IpRanges": [{
        "CidrIp": "0.0.0.0/0"
    }]
}]

# Default security group once default inbound rules are applied
# (if using separate security groups for head and worker nodes).
DEFAULT_SG_DUAL_GROUP_RULES = copy.deepcopy(DEFAULT_SG_WITH_RULES)
DEFAULT_SG_DUAL_GROUP_RULES["IpPermissions"][0]["UserIdGroupPairs"].append({
    "GroupId": AUX_SG["GroupId"]
})

# Default security group on aux subnet once default inbound rules are applied.
DEFAULT_SG_WITH_RULES_AUX_SUBNET = copy.deepcopy(DEFAULT_SG_DUAL_GROUP_RULES)
DEFAULT_SG_WITH_RULES_AUX_SUBNET["VpcId"] = AUX_SUBNET["VpcId"]
DEFAULT_SG_WITH_RULES_AUX_SUBNET["GroupId"] = AUX_SG["GroupId"]
python/ray/tests/aws/utils/helpers.py (new file, 28 lines)

@@ -0,0 +1,28 @@
import os
import yaml
import ray

from ray.autoscaler.commands import fillout_defaults, validate_config
from ray.tests.aws.utils.constants import DEFAULT_CLUSTER_NAME


def get_aws_example_config_file_path(file_name):
    return os.path.join(
        os.path.dirname(ray.autoscaler.aws.__file__), file_name)


def load_aws_example_config_file(file_name):
    config_file_path = get_aws_example_config_file_path(file_name)
    return yaml.safe_load(open(config_file_path).read())


def bootstrap_aws_config(config):
    config = fillout_defaults(config)
    validate_config(config)
    config["cluster_name"] = DEFAULT_CLUSTER_NAME
    return ray.autoscaler.aws.config.bootstrap_aws(config)


def bootstrap_aws_example_config_file(file_name):
    config = load_aws_example_config_file(file_name)
    return bootstrap_aws_config(config)
python/ray/tests/aws/utils/mocks.py (new file, 7 lines)

@@ -0,0 +1,7 @@
from ray.autoscaler.aws.config import key_pair
from ray.tests.aws.utils.constants import DEFAULT_KEY_PAIR


def mock_path_exists_key_pair(path):
    key_name, key_path = key_pair(0, "us-west-2", DEFAULT_KEY_PAIR["KeyName"])
    return path == key_path
python/ray/tests/aws/utils/stubs.py (new file, 104 lines)

@@ -0,0 +1,104 @@
import ray
from ray.tests.aws.utils.mocks import mock_path_exists_key_pair
from ray.tests.aws.utils.constants import DEFAULT_INSTANCE_PROFILE, \
    DEFAULT_KEY_PAIR, DEFAULT_SUBNET

from unittest import mock

from botocore.stub import ANY


def configure_iam_role_default(iam_client_stub):
    iam_client_stub.add_response(
        "get_instance_profile",
        expected_params={
            "InstanceProfileName": ray.autoscaler.aws.config.
            DEFAULT_RAY_INSTANCE_PROFILE
        },
        service_response={"InstanceProfile": DEFAULT_INSTANCE_PROFILE})


def configure_key_pair_default(ec2_client_stub):
    patcher = mock.patch("os.path.exists")
    os_path_exists_mock = patcher.start()
    os_path_exists_mock.side_effect = mock_path_exists_key_pair

    ec2_client_stub.add_response(
        "describe_key_pairs",
        expected_params={
            "Filters": [{
                "Name": "key-name",
                "Values": [DEFAULT_KEY_PAIR["KeyName"]]
            }]
        },
        service_response={"KeyPairs": [DEFAULT_KEY_PAIR]})


def configure_subnet_default(ec2_client_stub):
    ec2_client_stub.add_response(
        "describe_subnets",
        expected_params={},
        service_response={"Subnets": [DEFAULT_SUBNET]})


def skip_to_configure_sg(ec2_client_stub, iam_client_stub):
    configure_iam_role_default(iam_client_stub)
    configure_key_pair_default(ec2_client_stub)
    configure_subnet_default(ec2_client_stub)


def describe_subnets_echo(ec2_client_stub, subnet):
    ec2_client_stub.add_response(
        "describe_subnets",
        expected_params={
            "Filters": [{
                "Name": "subnet-id",
                "Values": [subnet["SubnetId"]]
            }]
        },
        service_response={"Subnets": [subnet]})


def describe_no_security_groups(ec2_client_stub):
    ec2_client_stub.add_response(
        "describe_security_groups",
        expected_params={"Filters": ANY},
        service_response={})


def create_sg_echo(ec2_client_stub, security_group):
    ec2_client_stub.add_response(
        "create_security_group",
        expected_params={
            "Description": security_group["Description"],
            "GroupName": security_group["GroupName"],
            "VpcId": security_group["VpcId"]
        },
        service_response={"GroupId": security_group["GroupId"]})


def describe_sgs_on_vpc(ec2_client_stub, vpc_ids, security_groups):
    ec2_client_stub.add_response(
        "describe_security_groups",
        expected_params={"Filters": [{
            "Name": "vpc-id",
            "Values": vpc_ids
        }]},
        service_response={"SecurityGroups": security_groups})


def authorize_sg_ingress(ec2_client_stub, security_group):
    ec2_client_stub.add_response(
        "authorize_security_group_ingress",
        expected_params={
            "GroupId": security_group["GroupId"],
            "IpPermissions": security_group["IpPermissions"]
        },
        service_response={})


def describe_sg_echo(ec2_client_stub, security_group):
    ec2_client_stub.add_response(
        "describe_security_groups",
        expected_params={"GroupIds": [security_group["GroupId"]]},
        service_response={"SecurityGroups": [security_group]})
@@ -188,6 +188,8 @@ def find_version(*filepath):
 
 requires = [
     "aiohttp",
+    "boto3",
+    "botocore",
     "click >= 7.0",
     "colorama",
     "filelock",