From 76450c8d4777f12188203e8303d7fd7eb499210c Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Thu, 28 May 2020 18:16:46 -0700 Subject: [PATCH] [autoscaler] Honor separate head and worker node subnet IDs (#8374) --- ci/travis/install-dependencies.sh | 2 +- python/ray/autoscaler/autoscaler.py | 2 + python/ray/autoscaler/aws/config.py | 231 ++++++++++++------ .../ray/autoscaler/aws/example-subnets.yaml | 30 +++ python/ray/autoscaler/aws/utils.py | 23 ++ python/ray/tests/BUILD | 8 + python/ray/tests/aws/conftest.py | 21 ++ python/ray/tests/aws/test_autoscaler_aws.py | 76 ++++++ python/ray/tests/aws/utils/constants.py | 129 ++++++++++ python/ray/tests/aws/utils/helpers.py | 28 +++ python/ray/tests/aws/utils/mocks.py | 7 + python/ray/tests/aws/utils/stubs.py | 104 ++++++++ python/setup.py | 2 + 13 files changed, 594 insertions(+), 69 deletions(-) create mode 100644 python/ray/autoscaler/aws/example-subnets.yaml create mode 100644 python/ray/autoscaler/aws/utils.py create mode 100644 python/ray/tests/aws/conftest.py create mode 100644 python/ray/tests/aws/test_autoscaler_aws.py create mode 100644 python/ray/tests/aws/utils/constants.py create mode 100644 python/ray/tests/aws/utils/helpers.py create mode 100644 python/ray/tests/aws/utils/mocks.py create mode 100644 python/ray/tests/aws/utils/stubs.py diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index c0bbb5621..3f56b448c 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -222,7 +222,7 @@ install_dependencies() { opencv-python-headless pyyaml pandas==0.24.2 requests feather-format lxml openpyxl xlrd \ py-spy pytest pytest-timeout networkx tabulate aiohttp uvicorn dataclasses pygments werkzeug \ kubernetes flask grpcio pytest-sugar pytest-rerunfailures pytest-asyncio scikit-learn==0.22.2 numba \ - Pillow prometheus_client) + Pillow prometheus_client boto3) if [ "${OSTYPE}" != msys ]; then # These packages aren't Windows-compatible pip_packages+=(blist) # https://github.com/DanielStutzbach/blist/issues/81#issue-391460716 diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index efdd86663..a7a83cc07 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -35,6 +35,8 @@ REQUIRED, OPTIONAL = True, False RAY_SCHEMA_PATH = os.path.join( os.path.dirname(ray.autoscaler.__file__), "ray-schema.json") +NODE_TYPE_CONFIG_KEYS = {"workers": "worker_nodes", "head": "head_node"} + class LoadMetrics: """Container for cluster load metrics. diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index 6af48d7fc..96f93a667 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -1,5 +1,7 @@ from distutils.version import StrictVersion +from functools import lru_cache from functools import partial +import itertools import json import os import time @@ -10,6 +12,8 @@ from botocore.config import Config import botocore from ray.ray_constants import BOTO_MAX_RETRIES +from ray.autoscaler.autoscaler import NODE_TYPE_CONFIG_KEYS +from ray.autoscaler.aws.utils import LazyDefaultDict logger = logging.getLogger(__name__) @@ -34,6 +38,7 @@ DEFAULT_AMI = { "sa-east-1": "ami-0da2c49fe75e7e5ed", # SA (Sao Paulo) } + assert StrictVersion(boto3.__version__) >= StrictVersion("1.4.8"), \ "Boto3 version >= 1.4.8 required, try `pip install -U boto3`" @@ -232,61 +237,29 @@ def _configure_subnet(config): def _configure_security_group(config): - if "SecurityGroupIds" in config["head_node"] and \ - "SecurityGroupIds" in config["worker_nodes"]: + node_types_to_configure = [ + node_type for node_type, config_key in NODE_TYPE_CONFIG_KEYS.items() + if "SecurityGroupIds" not in config[NODE_TYPE_CONFIG_KEYS[node_type]] + ] + if not node_types_to_configure: return config # have user-defined groups - group_name = SECURITY_GROUP_TEMPLATE.format(config["cluster_name"]) - vpc_id = _get_vpc_id_or_die(config, config["worker_nodes"]["SubnetIds"][0]) - security_group = _get_security_group(config, vpc_id, group_name) + security_groups = _upsert_security_groups(config, node_types_to_configure) - if security_group is None: + if "head" in node_types_to_configure: + head_sg = security_groups["head"] + logger.info( + "_configure_security_group: " + "SecurityGroupIds not specified for head node, using {} ({})" + .format(head_sg.group_name, head_sg.id)) + config["head_node"]["SecurityGroupIds"] = [head_sg.id] + + if "workers" in node_types_to_configure: + workers_sg = security_groups["workers"] logger.info("_configure_security_group: " - "Creating new security group {}".format(group_name)) - client = _client("ec2", config) - client.create_security_group( - Description="Auto-created security group for Ray workers", - GroupName=group_name, - VpcId=vpc_id) - security_group = _get_security_group(config, vpc_id, group_name) - assert security_group, "Failed to create security group" - - if not security_group.ip_permissions: - IpPermissions = [{ - "FromPort": -1, - "ToPort": -1, - "IpProtocol": "-1", - "UserIdGroupPairs": [{ - "GroupId": security_group.id - }] - }, { - "FromPort": 22, - "ToPort": 22, - "IpProtocol": "TCP", - "IpRanges": [{ - "CidrIp": "0.0.0.0/0" - }] - }] - - additional_IpPermissions = config["provider"].get( - "security_group", {}).get("IpPermissions", []) - IpPermissions.extend(additional_IpPermissions) - - security_group.authorize_ingress(IpPermissions=IpPermissions) - - if "SecurityGroupIds" not in config["head_node"]: - logger.info( - "_configure_security_group: " - "SecurityGroupIds not specified for head node, using {}".format( - security_group.group_name)) - config["head_node"]["SecurityGroupIds"] = [security_group.id] - - if "SecurityGroupIds" not in config["worker_nodes"]: - logger.info( - "_configure_security_group: " - "SecurityGroupIds not specified for workers, using {}".format( - security_group.group_name)) - config["worker_nodes"]["SecurityGroupIds"] = [security_group.id] + "SecurityGroupIds not specified for workers, using {} ({})" + .format(workers_sg.group_name, workers_sg.id)) + config["worker_nodes"]["SecurityGroupIds"] = [workers_sg.id] return config @@ -319,28 +292,149 @@ def _check_ami(config): region=region)) -def _get_vpc_id_or_die(config, subnet_id): - ec2 = _resource("ec2", config) +def _upsert_security_groups(config, node_types): + security_groups = _get_or_create_vpc_security_groups(config, node_types) + _upsert_security_group_rules(config, security_groups) + + return security_groups + + +def _get_or_create_vpc_security_groups(conf, node_types): + # Figure out which VPC each node_type is in... + ec2 = _resource("ec2", conf) + node_type_to_vpc = { + node_type: _get_vpc_id_or_die( + ec2, + conf[NODE_TYPE_CONFIG_KEYS[node_type]]["SubnetIds"][0], + ) + for node_type in node_types + } + + # Generate the name of the security group we're looking for... + expected_sg_name = SECURITY_GROUP_TEMPLATE.format(conf["cluster_name"]) + + # Figure out which security groups with this name exist for each VPC... + vpc_to_existing_sg = { + sg.vpc_id: sg + for sg in _get_security_groups( + conf, + node_type_to_vpc.values(), + [expected_sg_name], + ) + } + + # Lazily create any security group we're missing for each VPC... + vpc_to_sg = LazyDefaultDict( + partial(_create_security_group, conf, group_name=expected_sg_name), + vpc_to_existing_sg, + ) + + # Then return a mapping from each node_type to its security group... + return { + node_type: vpc_to_sg[vpc_id] + for node_type, vpc_id in node_type_to_vpc.items() + } + + +@lru_cache() +def _get_vpc_id_or_die(ec2, subnet_id): subnet = list( ec2.subnets.filter(Filters=[{ "Name": "subnet-id", "Values": [subnet_id] }])) - assert len(subnet) == 1, "Subnet not found" + assert len(subnet) == 1, "Subnet ID not found: {}".format(subnet_id) subnet = subnet[0] return subnet.vpc_id def _get_security_group(config, vpc_id, group_name): + security_group = _get_security_groups(config, [vpc_id], [group_name]) + return None if not security_group else security_group[0] + + +def _get_security_groups(config, vpc_ids, group_names): + unique_vpc_ids = list(set(vpc_ids)) + unique_group_names = set(group_names) + ec2 = _resource("ec2", config) existing_groups = list( ec2.security_groups.filter(Filters=[{ "Name": "vpc-id", - "Values": [vpc_id] + "Values": unique_vpc_ids }])) - for sg in existing_groups: - if sg.group_name == group_name: - return sg + filtered_groups = [ + sg for sg in existing_groups if sg.group_name in unique_group_names + ] + return filtered_groups + + +def _create_security_group(config, vpc_id, group_name): + client = _client("ec2", config) + client.create_security_group( + Description="Auto-created security group for Ray workers", + GroupName=group_name, + VpcId=vpc_id) + security_group = _get_security_group(config, vpc_id, group_name) + logger.info("_create_security_group: Created new security group {} ({})" + .format(security_group.group_name, security_group.id)) + assert security_group, "Failed to create security group" + return security_group + + +def _upsert_security_group_rules(conf, security_groups): + sgids = {sg.id for sg in security_groups.values()} + # sort security group items for deterministic inbound rule config order + # (mainly supports more precise stub-based boto3 unit testing) + for node_type, sg in sorted(security_groups.items()): + sg = security_groups[node_type] + if not sg.ip_permissions: + _update_inbound_rules(sg, sgids, conf) + + +def _update_inbound_rules(target_security_group, sgids, config): + extended_rules = config["provider"] \ + .get("security_group", {}) \ + .get("IpPermissions", []) + ip_permissions = _create_default_inbound_rules(sgids, extended_rules) + target_security_group.authorize_ingress(IpPermissions=ip_permissions) + + +def _create_default_inbound_rules(sgids, extended_rules=[]): + intracluster_rules = _create_default_instracluster_inbound_rules(sgids) + ssh_rules = _create_default_ssh_inbound_rules() + merged_rules = itertools.chain( + intracluster_rules, + ssh_rules, + extended_rules, + ) + return list(merged_rules) + + +def _create_default_instracluster_inbound_rules(intracluster_sgids): + return [{ + "FromPort": -1, + "ToPort": -1, + "IpProtocol": "-1", + "UserIdGroupPairs": [ + { + "GroupId": security_group_id + } for security_group_id in sorted(intracluster_sgids) + # sort security group IDs for deterministic IpPermission models + # (mainly supports more precise stub-based boto3 unit testing) + ] + }] + + +def _create_default_ssh_inbound_rules(): + return [{ + "FromPort": 22, + "ToPort": 22, + "IpProtocol": "tcp", + "IpRanges": [{ + "CidrIp": "0.0.0.0/0" + }] + }] def _get_role(role_name, config): @@ -380,20 +474,21 @@ def _get_key(key_name, config): def _client(name, config): - boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES}) - aws_credentials = config["provider"].get("aws_credentials", {}) - return boto3.client( - name, - config["provider"]["region"], - config=boto_config, - **aws_credentials) + return _resource(name, config).meta.client def _resource(name, config): - boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES}) + region = config["provider"]["region"] aws_credentials = config["provider"].get("aws_credentials", {}) + return _resource_cache(name, region, **aws_credentials) + + +@lru_cache() +def _resource_cache(name, region, **kwargs): + boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES}) return boto3.resource( name, - config["provider"]["region"], + region, config=boto_config, - **aws_credentials) + **kwargs, + ) diff --git a/python/ray/autoscaler/aws/example-subnets.yaml b/python/ray/autoscaler/aws/example-subnets.yaml new file mode 100644 index 000000000..b4601add8 --- /dev/null +++ b/python/ray/autoscaler/aws/example-subnets.yaml @@ -0,0 +1,30 @@ +cluster_name: subnets + +max_workers: 1 + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + +auth: + ssh_user: ubuntu + +# If required, head and worker nodes can exist on subnets in different VPCs and +# communicate via VPC peering. + +# VPC peering overview: https://docs.aws.amazon.com/vpc/latest/userguide/vpc-peering.html. +# Setup VPC peering: https://docs.aws.amazon.com/vpc/latest/peering/create-vpc-peering-connection.html. +# Configure VPC peering route tables: https://docs.aws.amazon.com/vpc/latest/peering/vpc-peering-routing.html. + +# To enable external SSH connectivity, you should also ensure that your VPC +# is configured to assign public IPv4 addresses to every EC2 instance +# assigned to it. +head_node: + SubnetIds: + - subnet-0000000 # Replace with your actual Head Node Subnet ID. + +worker_nodes: + SubnetIds: + - subnet-fffffff # Replace with your actual Worker Node Subnet ID. + diff --git a/python/ray/autoscaler/aws/utils.py b/python/ray/autoscaler/aws/utils.py new file mode 100644 index 000000000..ba29101db --- /dev/null +++ b/python/ray/autoscaler/aws/utils.py @@ -0,0 +1,23 @@ +from collections import defaultdict + + +class LazyDefaultDict(defaultdict): + """ + LazyDefaultDict(default_factory[, ...]) --> dict with default factory + + The default factory is call with the key argument to produce + a new value when a key is not present, in __getitem__ only. + A LazyDefaultDict compares equal to a dict with the same items. + All remaining arguments are treated the same as if they were + passed to the dict constructor, including keyword arguments. + """ + + def __missing__(self, key): + """ + __missing__(key) # Called by __getitem__ for missing key; pseudo-code: + if self.default_factory is None: raise KeyError((key,)) + self[key] = value = self.default_factory(key) + return value + """ + self[key] = self.default_factory(key) + return self[key] diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 234a7d71e..70111a1bd 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -190,6 +190,14 @@ py_test( deps = ["//:ray_lib"], ) + +py_test( + name = "test_autoscaler_aws", + size = "small", + srcs = ["aws/test_autoscaler_aws.py"], + deps = ["//:ray_lib"], +) + py_test( name = "test_autoscaler_yaml", size = "small", diff --git a/python/ray/tests/aws/conftest.py b/python/ray/tests/aws/conftest.py new file mode 100644 index 000000000..91762a3d4 --- /dev/null +++ b/python/ray/tests/aws/conftest.py @@ -0,0 +1,21 @@ +import pytest + +from ray.autoscaler.aws.config import _resource_cache + +from botocore.stub import Stubber + + +@pytest.fixture() +def iam_client_stub(): + resource = _resource_cache("iam", "us-west-2") + with Stubber(resource.meta.client) as stubber: + yield stubber + stubber.assert_no_pending_responses() + + +@pytest.fixture() +def ec2_client_stub(): + resource = _resource_cache("ec2", "us-west-2") + with Stubber(resource.meta.client) as stubber: + yield stubber + stubber.assert_no_pending_responses() diff --git a/python/ray/tests/aws/test_autoscaler_aws.py b/python/ray/tests/aws/test_autoscaler_aws.py new file mode 100644 index 000000000..b8ad6f31e --- /dev/null +++ b/python/ray/tests/aws/test_autoscaler_aws.py @@ -0,0 +1,76 @@ +import pytest + +import ray.tests.aws.utils.stubs as stubs +import ray.tests.aws.utils.helpers as helpers +from ray.tests.aws.utils.constants import AUX_SUBNET, DEFAULT_SUBNET, \ + DEFAULT_SG_AUX_SUBNET, DEFAULT_SG, DEFAULT_SG_DUAL_GROUP_RULES, \ + DEFAULT_SG_WITH_RULES_AUX_SUBNET, DEFAULT_SG_WITH_RULES, AUX_SG + + +def test_create_sg_different_vpc_same_rules(iam_client_stub, ec2_client_stub): + # use default stubs to skip ahead to security group configuration + stubs.skip_to_configure_sg(ec2_client_stub, iam_client_stub) + + # given head and worker nodes with custom subnets defined... + # expect to first describe the worker subnet ID + stubs.describe_subnets_echo(ec2_client_stub, AUX_SUBNET) + # expect to second describe the head subnet ID + stubs.describe_subnets_echo(ec2_client_stub, DEFAULT_SUBNET) + # given no existing security groups within the VPC... + stubs.describe_no_security_groups(ec2_client_stub) + # expect to first create a security group on the worker node VPC + stubs.create_sg_echo(ec2_client_stub, DEFAULT_SG_AUX_SUBNET) + # expect new worker security group details to be retrieved after creation + stubs.describe_sgs_on_vpc( + ec2_client_stub, + [AUX_SUBNET["VpcId"]], + [DEFAULT_SG_AUX_SUBNET], + ) + # expect to second create a security group on the head node VPC + stubs.create_sg_echo(ec2_client_stub, DEFAULT_SG) + # expect new head security group details to be retrieved after creation + stubs.describe_sgs_on_vpc( + ec2_client_stub, + [DEFAULT_SUBNET["VpcId"]], + [DEFAULT_SG], + ) + + # given no existing default head security group inbound rules... + # expect to authorize all default head inbound rules + stubs.authorize_sg_ingress( + ec2_client_stub, + DEFAULT_SG_DUAL_GROUP_RULES, + ) + # given no existing default worker security group inbound rules... + # expect to authorize all default worker inbound rules + stubs.authorize_sg_ingress( + ec2_client_stub, + DEFAULT_SG_WITH_RULES_AUX_SUBNET, + ) + + # given the prior modification to the head security group... + # expect the next read of a head security group property to reload it + stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES) + # given the prior modification to the worker security group... + # expect the next read of a worker security group property to reload it + stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES_AUX_SUBNET) + + # given our mocks and an example config file as input... + # expect the config to be loaded, validated, and bootstrapped successfully + config = helpers.bootstrap_aws_example_config_file("example-subnets.yaml") + + # expect the bootstrapped config to show different head and worker security + # groups residing on different subnets + assert config["head_node"]["SecurityGroupIds"] == [DEFAULT_SG["GroupId"]] + assert config["head_node"]["SubnetIds"] == [DEFAULT_SUBNET["SubnetId"]] + assert config["worker_nodes"]["SecurityGroupIds"] == [AUX_SG["GroupId"]] + assert config["worker_nodes"]["SubnetIds"] == [AUX_SUBNET["SubnetId"]] + + # expect no pending responses left in IAM or EC2 client stub queues + iam_client_stub.assert_no_pending_responses() + ec2_client_stub.assert_no_pending_responses() + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/aws/utils/constants.py b/python/ray/tests/aws/utils/constants.py new file mode 100644 index 000000000..4827288a0 --- /dev/null +++ b/python/ray/tests/aws/utils/constants.py @@ -0,0 +1,129 @@ +import copy +import ray +from datetime import datetime + +# Override global constants used in AWS autoscaler config artifact names. +# This helps ensure that any unmocked test doesn't alter non-test artifacts. +ray.autoscaler.aws.config.RAY = \ + "ray-autoscaler-aws-test" +ray.autoscaler.aws.config.DEFAULT_RAY_INSTANCE_PROFILE = \ + ray.autoscaler.aws.config.RAY + "-v1" +ray.autoscaler.aws.config.DEFAULT_RAY_IAM_ROLE = \ + ray.autoscaler.aws.config.RAY + "-v1" +ray.autoscaler.aws.config.SECURITY_GROUP_TEMPLATE = \ + ray.autoscaler.aws.config.RAY + "-{}" + +# Default IAM instance profile to expose to tests. +DEFAULT_INSTANCE_PROFILE = { + "Arn": "arn:aws:iam::336924118301:instance-profile/ExampleInstanceProfile", + "CreateDate": datetime(2013, 6, 12, 23, 52, 2, 2), + "InstanceProfileId": "AIPA0000000000EXAMPLE", + "InstanceProfileName": "ExampleInstanceProfile", + "Path": "/", + "Roles": [ + { + "Arn": "arn:aws:iam::123456789012:role/Test-Role", + "AssumeRolePolicyDocument": "ExampleAssumeRolePolicyDocument", + "CreateDate": datetime(2013, 1, 9, 6, 33, 26, 2), + "Path": "/", + "RoleId": "AROA0000000000EXAMPLE", + "RoleName": "Test-Role", + }, + ] +} + +# Default EC2 key pair to expose to tests. +DEFAULT_KEY_PAIR = { + "KeyFingerprint": "00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00", + "KeyName": ray.autoscaler.aws.config.RAY + "_us-west-2", +} + +# Primary EC2 subnet to expose to tests. +DEFAULT_SUBNET = { + "AvailabilityZone": "us-west-2a", + "AvailableIpAddressCount": 251, + "CidrBlock": "10.0.1.0/24", + "DefaultForAz": False, + "MapPublicIpOnLaunch": True, + "State": "available", + "SubnetId": "subnet-0000000", + "VpcId": "vpc-0000000", +} + +# Secondary EC2 subnet to expose to tests as required. +AUX_SUBNET = { + "AvailabilityZone": "us-west-2a", + "AvailableIpAddressCount": 251, + "CidrBlock": "192.168.1.0/24", + "DefaultForAz": False, + "MapPublicIpOnLaunch": True, + "State": "available", + "SubnetId": "subnet-fffffff", + "VpcId": "vpc-fffffff", +} + +# Default cluster name to expose to tests. +DEFAULT_CLUSTER_NAME = "test-cluster-name" + +# Default security group settings immediately after creation +# (prior to inbound rule configuration). +DEFAULT_SG = { + "Description": "Auto-created security group for Ray workers", + "GroupName": ray.autoscaler.aws.config.RAY + "-" + DEFAULT_CLUSTER_NAME, + "OwnerId": "test-owner", + "GroupId": "sg-1234abcd", + "VpcId": DEFAULT_SUBNET["VpcId"], + "IpPermissions": [], + "IpPermissionsEgress": [{ + "FromPort": -1, + "ToPort": -1, + "IpProtocol": "-1", + "IpRanges": [{ + "CidrIp": "0.0.0.0/0" + }] + }], + "Tags": [] +} + +# Secondary security group settings after creation +# (prior to inbound rule configuration). +AUX_SG = copy.deepcopy(DEFAULT_SG) +AUX_SG["GroupName"] += "-aux" +AUX_SG["GroupId"] = "sg-dcba4321" + +# Default security group settings immediately after creation on aux subnet +# (prior to inbound rule configuration). +DEFAULT_SG_AUX_SUBNET = copy.deepcopy(DEFAULT_SG) +DEFAULT_SG_AUX_SUBNET["VpcId"] = AUX_SUBNET["VpcId"] +DEFAULT_SG_AUX_SUBNET["GroupId"] = AUX_SG["GroupId"] + +# Default security group settings once default inbound rules are applied +# (if used by both head and worker nodes) +DEFAULT_SG_WITH_RULES = copy.deepcopy(DEFAULT_SG) +DEFAULT_SG_WITH_RULES["IpPermissions"] = [{ + "FromPort": -1, + "ToPort": -1, + "IpProtocol": "-1", + "UserIdGroupPairs": [{ + "GroupId": DEFAULT_SG["GroupId"] + }] +}, { + "FromPort": 22, + "ToPort": 22, + "IpProtocol": "tcp", + "IpRanges": [{ + "CidrIp": "0.0.0.0/0" + }] +}] + +# Default security group once default inbound rules are applied +# (if using separate security groups for head and worker nodes). +DEFAULT_SG_DUAL_GROUP_RULES = copy.deepcopy(DEFAULT_SG_WITH_RULES) +DEFAULT_SG_DUAL_GROUP_RULES["IpPermissions"][0]["UserIdGroupPairs"].append({ + "GroupId": AUX_SG["GroupId"] +}) + +# Default security group on aux subnet once default inbound rules are applied. +DEFAULT_SG_WITH_RULES_AUX_SUBNET = copy.deepcopy(DEFAULT_SG_DUAL_GROUP_RULES) +DEFAULT_SG_WITH_RULES_AUX_SUBNET["VpcId"] = AUX_SUBNET["VpcId"] +DEFAULT_SG_WITH_RULES_AUX_SUBNET["GroupId"] = AUX_SG["GroupId"] diff --git a/python/ray/tests/aws/utils/helpers.py b/python/ray/tests/aws/utils/helpers.py new file mode 100644 index 000000000..dbbfb0a23 --- /dev/null +++ b/python/ray/tests/aws/utils/helpers.py @@ -0,0 +1,28 @@ +import os +import yaml +import ray + +from ray.autoscaler.commands import fillout_defaults, validate_config +from ray.tests.aws.utils.constants import DEFAULT_CLUSTER_NAME + + +def get_aws_example_config_file_path(file_name): + return os.path.join( + os.path.dirname(ray.autoscaler.aws.__file__), file_name) + + +def load_aws_example_config_file(file_name): + config_file_path = get_aws_example_config_file_path(file_name) + return yaml.safe_load(open(config_file_path).read()) + + +def bootstrap_aws_config(config): + config = fillout_defaults(config) + validate_config(config) + config["cluster_name"] = DEFAULT_CLUSTER_NAME + return ray.autoscaler.aws.config.bootstrap_aws(config) + + +def bootstrap_aws_example_config_file(file_name): + config = load_aws_example_config_file(file_name) + return bootstrap_aws_config(config) diff --git a/python/ray/tests/aws/utils/mocks.py b/python/ray/tests/aws/utils/mocks.py new file mode 100644 index 000000000..2114cf5d0 --- /dev/null +++ b/python/ray/tests/aws/utils/mocks.py @@ -0,0 +1,7 @@ +from ray.autoscaler.aws.config import key_pair +from ray.tests.aws.utils.constants import DEFAULT_KEY_PAIR + + +def mock_path_exists_key_pair(path): + key_name, key_path = key_pair(0, "us-west-2", DEFAULT_KEY_PAIR["KeyName"]) + return path == key_path diff --git a/python/ray/tests/aws/utils/stubs.py b/python/ray/tests/aws/utils/stubs.py new file mode 100644 index 000000000..e11b8a351 --- /dev/null +++ b/python/ray/tests/aws/utils/stubs.py @@ -0,0 +1,104 @@ +import ray +from ray.tests.aws.utils.mocks import mock_path_exists_key_pair +from ray.tests.aws.utils.constants import DEFAULT_INSTANCE_PROFILE, \ + DEFAULT_KEY_PAIR, DEFAULT_SUBNET + +from unittest import mock + +from botocore.stub import ANY + + +def configure_iam_role_default(iam_client_stub): + iam_client_stub.add_response( + "get_instance_profile", + expected_params={ + "InstanceProfileName": ray.autoscaler.aws.config. + DEFAULT_RAY_INSTANCE_PROFILE + }, + service_response={"InstanceProfile": DEFAULT_INSTANCE_PROFILE}) + + +def configure_key_pair_default(ec2_client_stub): + patcher = mock.patch("os.path.exists") + os_path_exists_mock = patcher.start() + os_path_exists_mock.side_effect = mock_path_exists_key_pair + + ec2_client_stub.add_response( + "describe_key_pairs", + expected_params={ + "Filters": [{ + "Name": "key-name", + "Values": [DEFAULT_KEY_PAIR["KeyName"]] + }] + }, + service_response={"KeyPairs": [DEFAULT_KEY_PAIR]}) + + +def configure_subnet_default(ec2_client_stub): + ec2_client_stub.add_response( + "describe_subnets", + expected_params={}, + service_response={"Subnets": [DEFAULT_SUBNET]}) + + +def skip_to_configure_sg(ec2_client_stub, iam_client_stub): + configure_iam_role_default(iam_client_stub) + configure_key_pair_default(ec2_client_stub) + configure_subnet_default(ec2_client_stub) + + +def describe_subnets_echo(ec2_client_stub, subnet): + ec2_client_stub.add_response( + "describe_subnets", + expected_params={ + "Filters": [{ + "Name": "subnet-id", + "Values": [subnet["SubnetId"]] + }] + }, + service_response={"Subnets": [subnet]}) + + +def describe_no_security_groups(ec2_client_stub): + ec2_client_stub.add_response( + "describe_security_groups", + expected_params={"Filters": ANY}, + service_response={}) + + +def create_sg_echo(ec2_client_stub, security_group): + ec2_client_stub.add_response( + "create_security_group", + expected_params={ + "Description": security_group["Description"], + "GroupName": security_group["GroupName"], + "VpcId": security_group["VpcId"] + }, + service_response={"GroupId": security_group["GroupId"]}) + + +def describe_sgs_on_vpc(ec2_client_stub, vpc_ids, security_groups): + ec2_client_stub.add_response( + "describe_security_groups", + expected_params={"Filters": [{ + "Name": "vpc-id", + "Values": vpc_ids + }]}, + service_response={"SecurityGroups": security_groups}) + + +def authorize_sg_ingress(ec2_client_stub, security_group): + ec2_client_stub.add_response( + "authorize_security_group_ingress", + expected_params={ + "GroupId": security_group["GroupId"], + "IpPermissions": security_group["IpPermissions"] + }, + service_response={}) + + +def describe_sg_echo(ec2_client_stub, security_group): + ec2_client_stub.add_response( + "describe_security_groups", + expected_params={"GroupIds": [security_group["GroupId"]]}, + service_response={"SecurityGroups": [security_group]}) diff --git a/python/setup.py b/python/setup.py index a75b481fa..65902a515 100644 --- a/python/setup.py +++ b/python/setup.py @@ -188,6 +188,8 @@ def find_version(*filepath): requires = [ "aiohttp", + "boto3", + "botocore", "click >= 7.0", "colorama", "filelock",