[autoscaler] Consolidate CloudWatch agent/dashboard/alarm support; Add unit tests for AWS autoscaler CloudWatch integration (#22070)

This PR mainly adds two improvements:

We have introduced three CloudWatch Config support in previous PRs: Agent, Dashboard and Alarm. In this PR, we generalize the logic of all three config types by using enum CloudwatchConfigType.
Adds unit tests to ensure the correctness of Ray autoscaler CloudWatch integration behavior.
This commit is contained in:
Zyiqin-Miranda 2022-08-08 11:45:07 -07:00 committed by GitHub
parent 5087511c46
commit b3f06d97b2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 876 additions and 68 deletions

View file

@ -4,7 +4,8 @@ import json
import logging
import os
import time
from typing import Any, Dict, List, Tuple, Union
from enum import Enum
from typing import Any, Callable, Dict, List, Union
import botocore
@ -21,6 +22,12 @@ CLOUDWATCH_AGENT_INSTALLED_TAG = "cloudwatch-agent-installed"
CLOUDWATCH_CONFIG_HASH_TAG_BASE = "cloudwatch-config-hash"
class CloudwatchConfigType(str, Enum):
AGENT = "agent"
DASHBOARD = "dashboard"
ALARM = "alarm"
class CloudwatchHelper:
def __init__(
self, provider_config: Dict[str, Any], node_id: str, cluster_name: str
@ -34,6 +41,22 @@ class CloudwatchHelper:
self.ssm_client = client_cache("ssm", region)
cloudwatch_resource = resource_cache("cloudwatch", region)
self.cloudwatch_client = cloudwatch_resource.meta.client
self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC: Dict[
str, Callable
] = {
CloudwatchConfigType.AGENT.value: self._replace_cwa_config_vars,
CloudwatchConfigType.DASHBOARD.value: self._replace_dashboard_config_vars,
CloudwatchConfigType.ALARM.value: self._load_config_file,
}
self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_HEAD_NODE: Dict[str, Callable] = {
CloudwatchConfigType.AGENT.value: self._restart_cloudwatch_agent,
CloudwatchConfigType.DASHBOARD.value: self._put_cloudwatch_dashboard,
CloudwatchConfigType.ALARM.value: self._put_cloudwatch_alarm,
}
self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_WORKER_NODE: Dict[str, Callable] = {
CloudwatchConfigType.AGENT.value: self._restart_cloudwatch_agent,
CloudwatchConfigType.ALARM.value: self._put_cloudwatch_alarm,
}
def update_from_config(self, is_head_node: bool) -> None:
"""Discovers and applies CloudWatch config updates as required.
@ -41,12 +64,11 @@ class CloudwatchHelper:
Args:
is_head_node: whether this node is the head node.
"""
if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "agent"):
self._update_cloudwatch_config(is_head_node, "agent")
if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "dashboard"):
self._update_cloudwatch_config(is_head_node, "dashboard")
if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "alarm"):
self._update_cloudwatch_config(is_head_node, "alarm")
for config_type in CloudwatchConfigType:
if CloudwatchHelper.cloudwatch_config_exists(
self.provider_config, config_type.value
):
self._update_cloudwatch_config(config_type.value, is_head_node)
def _ec2_health_check_waiter(self, node_id: str) -> None:
# wait for all EC2 instance checks to complete
@ -66,14 +88,10 @@ class CloudwatchHelper:
)
raise e
def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> None:
"""Update remote CloudWatch configs at Parameter Store,
update hash tag value on node and perform associated operations
at CloudWatch console if local CloudWatch configs change.
Args:
is_head_node: whether this node is the head node.
config_type: CloudWatch config file type.
def _update_cloudwatch_config(self, config_type: str, is_head_node: bool) -> None:
"""
check whether update operations are needed in
cloudwatch related configs
"""
cwa_installed = self._setup_cwa()
param_name = self._get_ssm_param_name(config_type)
@ -84,19 +102,16 @@ class CloudwatchHelper:
)
cur_cw_config_hash = self._sha1_hash_file(config_type)
ssm_cw_config_hash = self._sha1_hash_json(cw_config_ssm)
# check if user updated Unified Cloudwatch Agent config file.
# check if user updated cloudwatch related config files.
# if so, perform corresponding actions.
if cur_cw_config_hash != ssm_cw_config_hash:
logger.info(
"Cloudwatch {} config file has changed.".format(config_type)
)
self._upload_config_to_ssm_and_set_hash_tag(config_type)
if config_type == "agent":
self._restart_cloudwatch_agent()
elif config_type == "dashboard":
self._put_cloudwatch_dashboard()
elif config_type == "alarm":
self._put_cloudwatch_alarm()
self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_HEAD_NODE.get(
config_type
)()
else:
head_node_hash = self._get_head_node_config_hash(config_type)
cur_node_hash = self._get_cur_node_config_hash(config_type)
@ -104,10 +119,13 @@ class CloudwatchHelper:
logger.info(
"Cloudwatch {} config file has changed.".format(config_type)
)
if config_type == "agent":
self._restart_cloudwatch_agent()
if config_type == "alarm":
self._put_cloudwatch_alarm()
update_func = (
self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_WORKER_NODE.get(
config_type
)
)
if update_func:
update_func()
self._update_cloudwatch_hash_tag_value(
self.node_id, head_node_hash, config_type
)
@ -120,7 +138,9 @@ class CloudwatchHelper:
dashboard_name_cluster = dashboard_config.get("name", self.cluster_name)
dashboard_name = self.cluster_name + "-" + dashboard_name_cluster
widgets = self._replace_dashboard_config_variables()
widgets = self._replace_dashboard_config_vars(
CloudwatchConfigType.DASHBOARD.value
)
response = self.cloudwatch_client.put_dashboard(
DashboardName=dashboard_name, DashboardBody=json.dumps({"widgets": widgets})
@ -144,7 +164,7 @@ class CloudwatchHelper:
def _put_cloudwatch_alarm(self) -> None:
"""put CloudWatch metric alarms read from config"""
param_name = self._get_ssm_param_name("alarm")
param_name = self._get_ssm_param_name(CloudwatchConfigType.ALARM.value)
data = json.loads(self._get_ssm_param(param_name))
for item in data:
item_out = copy.deepcopy(item)
@ -158,7 +178,7 @@ class CloudwatchHelper:
logger.info("Successfully put alarms to CloudWatch console")
def _send_command_to_node(
self, document_name: str, parameters: List[str], node_id: str
self, document_name: str, parameters: Dict[str, List[str]], node_id: str
) -> Dict[str, Any]:
"""send SSM command to the given nodes"""
logger.debug(
@ -177,10 +197,10 @@ class CloudwatchHelper:
def _ssm_command_waiter(
self,
document_name: str,
parameters: List[str],
parameters: Dict[str, List[str]],
node_id: str,
retry_failed: bool = True,
) -> bool:
) -> Dict[str, Any]:
"""wait for SSM command to complete on all cluster nodes"""
# This waiter differs from the built-in SSM.Waiter by
@ -192,7 +212,9 @@ class CloudwatchHelper:
command_id = response["Command"]["CommandId"]
cloudwatch_config = self.provider_config["cloudwatch"]
agent_retryer_config = cloudwatch_config.get("agent").get("retryer", {})
agent_retryer_config = cloudwatch_config.get(
CloudwatchConfigType.AGENT.value
).get("retryer", {})
max_attempts = agent_retryer_config.get("max_attempts", 120)
delay_seconds = agent_retryer_config.get("delay_seconds", 30)
num_attempts = 0
@ -283,19 +305,19 @@ class CloudwatchHelper:
def _replace_all_config_variables(
self,
collection: Union[dict, list],
collection: Union[Dict[str, Any], str],
node_id: str,
cluster_name: str,
region: str,
) -> Tuple[(Union[dict, list], int)]:
) -> Union[str, Dict[str, Any]]:
"""
Replace known config variable occurrences in the input collection.
The input collection must be either a dict or list.
Returns a tuple consisting of the output collection and the number of
modified strings in the collection (which is not necessarily equal to
the number of variables replaced).
"""
for key in collection:
if type(collection) is dict:
value = collection.get(key)
@ -303,6 +325,12 @@ class CloudwatchHelper:
elif type(collection) is list:
value = key
index_key = collection.index(key)
else:
raise ValueError(
f"Can't replace CloudWatch config variables "
f"in unsupported collection type: {type(collection)}."
f"Please check your CloudWatch JSON config files."
)
if type(value) is str:
collection[index_key] = self._replace_config_variables(
value, node_id, cluster_name, region
@ -344,8 +372,8 @@ class CloudwatchHelper:
return self._get_default_empty_config_file_hash()
else:
logger.info(
"Failed to fetch CloudWatch {} config from SSM "
"parameter store.".format(config_type)
"Failed to fetch Unified CloudWatch Agent config from SSM "
"parameter store."
)
logger.error(e)
raise e
@ -368,31 +396,25 @@ class CloudwatchHelper:
def _sha1_hash_json(self, value: str) -> str:
"""calculate the json string sha1 hash"""
hash = hashlib.new("sha1")
sha1_hash = hashlib.new("sha1")
binary_value = value.encode("ascii")
hash.update(binary_value)
sha1_res = hash.hexdigest()
sha1_hash.update(binary_value)
sha1_res = sha1_hash.hexdigest()
return sha1_res
def _sha1_hash_file(self, config_type: str) -> str:
"""calculate the config file sha1 hash"""
if config_type == "agent":
config = self._replace_cwa_config_variables()
if config_type == "dashboard":
config = self._replace_dashboard_config_variables()
if config_type == "alarm":
config = self._load_config_file("alarm")
config = self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get(
config_type
)(config_type)
value = json.dumps(config)
sha1_res = self._sha1_hash_json(value)
return sha1_res
def _upload_config_to_ssm_and_set_hash_tag(self, config_type: str):
if config_type == "agent":
data = self._replace_cwa_config_variables()
if config_type == "dashboard":
data = self._replace_dashboard_config_variables()
if config_type == "alarm":
data = self._load_config_file("alarm")
data = self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get(
config_type
)(config_type)
sha1_hash_value = self._sha1_hash_file(config_type)
self._upload_config_to_ssm(data, config_type)
self._update_cloudwatch_hash_tag_value(
@ -405,7 +427,7 @@ class CloudwatchHelper:
Tags=[{"Key": CLOUDWATCH_AGENT_INSTALLED_TAG, "Value": "True"}],
)
logger.info(
"Successfully add Unified Cloudwatch Agent installed "
"Successfully add Unified CloudWatch Agent installed "
"tag on {}".format(node_id)
)
@ -444,12 +466,12 @@ class CloudwatchHelper:
param_name = self._get_ssm_param_name(config_type)
self._put_ssm_param(param, param_name)
def _replace_cwa_config_variables(self) -> Dict[str, Any]:
def _replace_cwa_config_vars(self, config_type: str) -> Dict[str, Any]:
"""
replace {instance_id}, {region}, {cluster_name}
variable occurrences in Unified Cloudwatch Agent config file
"""
cwa_config = self._load_config_file("agent")
cwa_config = self._load_config_file(config_type)
self._replace_all_config_variables(
cwa_config,
self.node_id,
@ -458,11 +480,11 @@ class CloudwatchHelper:
)
return cwa_config
def _replace_dashboard_config_variables(self) -> List[Dict[str, Any]]:
def _replace_dashboard_config_vars(self, config_type: str) -> List[str]:
"""
replace known variable occurrences in CloudWatch Dashboard config file
"""
data = self._load_config_file("dashboard")
data = self._load_config_file(config_type)
widgets = []
for item in data:
item_out = self._replace_all_config_variables(
@ -471,16 +493,15 @@ class CloudwatchHelper:
self.cluster_name,
self.provider_config["region"],
)
item_out = copy.deepcopy(item)
widgets.append(item_out)
return widgets
def _replace_alarm_config_variables(self) -> List[Dict[str, Any]]:
def _replace_alarm_config_vars(self, config_type: str) -> List[str]:
"""
replace {instance_id}, {region}, {cluster_name}
variable occurrences in cloudwatch alarm config file
"""
data = self._load_config_file("alarm")
data = self._load_config_file(config_type)
param_data = []
for item in data:
item_out = copy.deepcopy(item)
@ -494,11 +515,11 @@ class CloudwatchHelper:
return param_data
def _restart_cloudwatch_agent(self) -> None:
"""restart Unified Cloudwatch Agent"""
cwa_param_name = self._get_ssm_param_name("agent")
"""restart Unified CloudWatch Agent"""
cwa_param_name = self._get_ssm_param_name(CloudwatchConfigType.AGENT.value)
logger.info(
"Restarting Unified Cloudwatch Agent package on {} node(s).".format(
(self.node_id)
"Restarting Unified CloudWatch Agent package on node {}.".format(
self.node_id
)
)
self._stop_cloudwatch_agent()
@ -691,7 +712,9 @@ class CloudwatchHelper:
default ray instance profile name if cloudwatch config file
doesn't exist.
"""
cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent")
cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(
config, CloudwatchConfigType.AGENT.value
)
return (
CLOUDWATCH_RAY_INSTANCE_PROFILE
if cwa_cfg_exists
@ -712,7 +735,9 @@ class CloudwatchHelper:
default cloudwatch iam role name if cloudwatch config file exists.
default ray iam role name if cloudwatch config file doesn't exist.
"""
cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent")
cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(
config, CloudwatchConfigType.AGENT.value
)
return CLOUDWATCH_RAY_IAM_ROLE if cwa_cfg_exists else default_iam_role_name
@staticmethod
@ -731,7 +756,9 @@ class CloudwatchHelper:
related operations if cloudwatch agent config is specifed in
cluster config file.
"""
cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent")
cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(
config, CloudwatchConfigType.AGENT.value
)
if cwa_cfg_exists:
cloudwatch_managed_policy = {
"Version": "2012-10-17",

View file

@ -1,7 +1,7 @@
import pytest
from ray.autoscaler._private.constants import BOTO_MAX_RETRIES
from ray.autoscaler._private.aws.utils import resource_cache
from ray.autoscaler._private.aws.utils import resource_cache, client_cache
from botocore.stub import Stubber
@ -38,3 +38,19 @@ def ec2_client_stub_max_retries():
with Stubber(resource.meta.client) as stubber:
yield stubber
stubber.assert_no_pending_responses()
@pytest.fixture()
def cloudwatch_client_stub():
resource = resource_cache("cloudwatch", "us-west-2")
with Stubber(resource.meta.client) as stubber:
yield stubber
stubber.assert_no_pending_responses()
@pytest.fixture()
def ssm_client_stub():
client = client_cache("ssm", "us-west-2")
with Stubber(client) as stubber:
yield stubber
stubber.assert_no_pending_responses()

View file

@ -794,6 +794,366 @@ def test_use_subnets_ordered_by_az(ec2_client_stub):
assert set(offsets[10:15]) == {0}, "Last 5 should be in us-west-2a"
def test_cloudwatch_dashboard_creation(cloudwatch_client_stub, ssm_client_stub):
# create test cluster node IDs and an associated cloudwatch helper
node_id = "i-abc"
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to create a cluster CloudWatch Dashboard...
# expect to make a call to create a dashboard for each node in the cluster
stubs.put_cluster_dashboard_success(
cloudwatch_client_stub,
cloudwatch_helper,
)
# given our mocks and the example CloudWatch Dashboard config as input...
# expect a cluster CloudWatch Dashboard to be created successfully
cloudwatch_helper._put_cloudwatch_dashboard()
# expect no pending responses left in the CloudWatch client stub queue
cloudwatch_client_stub.assert_no_pending_responses()
def test_cloudwatch_alarm_creation(cloudwatch_client_stub, ssm_client_stub):
# create test cluster node IDs and an associated cloudwatch helper
node_id = "i-abc"
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to update a cluster CloudWatch Alarm Config without any
# change...
# expect the stored the CloudWatch Alarm Config is same as local config
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "alarm"
)
stubs.get_param_ssm_same(
ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "alarm"
)
# given a directive to create cluster CloudWatch alarms...
# expect to make a call to create alarms for each node in the cluster
stubs.put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper)
# given our mocks and the example CloudWatch Alarm config as input...
# expect cluster alarms to be created successfully
cloudwatch_helper._put_cloudwatch_alarm()
# expect no pending responses left in the CloudWatch client stub queue
cloudwatch_client_stub.assert_no_pending_responses()
def test_cloudwatch_agent_update_without_change_head_node(
ssm_client_stub, ec2_client_stub
):
# create test cluster head node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = True
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Agent Config without any
# change...
# expect the stored the CloudWatch Agent Config is same as local config
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "agent"
)
stubs.get_param_ssm_same(
ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "agent"
)
# given our mocks and the same cloudwatch agent config as input...
# expect no update performed on CloudWatch Agent Config
cloudwatch_helper._update_cloudwatch_config("agent", is_head_node)
def test_cloudwatch_agent_update_with_change_head_node(
ec2_client_stub, ssm_client_stub
):
# create test cluster head node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = True
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Agent Config with new
# changes...
# expect the stored the CloudWatch Agent Config is different from local
# config
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "agent"
)
stubs.get_param_ssm_different(ssm_client_stub, cw_ssm_param_name)
# given an updated CloudWatch Agent Config file...
# expect to store the new CloudWatch Agent config as an SSM parameter
cmd_id = stubs.put_parameter_cloudwatch_config(
ssm_client_stub, cloudwatch_helper.cluster_name, "agent"
)
# given an updated CloudWatch Agent Config file...
# expect to update the node tag equal to updated config file sha1 hash
# to reflect the changes in config file
stubs.update_hash_tag_success(ec2_client_stub, node_id, "agent", cloudwatch_helper)
# given that updated CloudWatch Agent Config is put to Parameter Store...
# expect to send an SSM command to restart CloudWatch Agent on all nodes
cmd_id = stubs.send_command_stop_cwa(ssm_client_stub, node_id)
# given a SSM command to stop CloudWatch Agent sent to all nodes...
# expect to wait for the command to complete successfully on every node
stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id)
cmd_id = stubs.send_command_start_cwa(ssm_client_stub, node_id, cw_ssm_param_name)
# given a SSM command to start CloudWatch Agent sent to all nodes...
# expect to wait for the command to complete successfully on every node
stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id)
# given our mocks and the example CloudWatch Agent config as input...
# expect CloudWatch Agent configured to use updated file on each cluster
# node successfully
cloudwatch_helper._update_cloudwatch_config("agent", is_head_node)
# expect no pending responses left in client stub queues
ec2_client_stub.assert_no_pending_responses()
ssm_client_stub.assert_no_pending_responses()
def test_cloudwatch_agent_update_with_change_worker_node(
ec2_client_stub, ssm_client_stub
):
# create test cluster worker node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = False
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Agent Config with new
# changes...
# expect the stored the CloudWatch Agent Config is different from local
# config
stubs.get_head_node_config_hash_different(
ec2_client_stub, "agent", cloudwatch_helper, node_id
)
stubs.get_cur_node_config_hash_different(ec2_client_stub, "agent", node_id)
# given an updated CloudWatch Agent Config file...
# expect to update the node tag equal to updated config file sha1 hash
# to reflect the changes in config file
stubs.update_hash_tag_success(ec2_client_stub, node_id, "agent", cloudwatch_helper)
# given that updated CloudWatch Agent Config is put to Parameter Store...
# expect to send an SSM command to restart CloudWatch Agent on all nodes
cmd_id = stubs.send_command_stop_cwa(ssm_client_stub, node_id)
# given a SSM command to stop CloudWatch Agent sent to all nodes...
# expect to wait for the command to complete successfully on every node
stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id)
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "agent"
)
cmd_id = stubs.send_command_start_cwa(ssm_client_stub, node_id, cw_ssm_param_name)
# given a SSM command to start CloudWatch Agent sent to all nodes...
# expect to wait for the command to complete successfully on every node
stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id)
# given our mocks and the example CloudWatch Agent config as input...
# expect CloudWatch Agent configured to use updated file on each cluster
# node successfully
cloudwatch_helper._update_cloudwatch_config("agent", is_head_node)
# expect no pending responses left in client stub queues
ec2_client_stub.assert_no_pending_responses()
ssm_client_stub.assert_no_pending_responses()
def test_cloudwatch_dashboard_update_head_node(
ec2_client_stub, ssm_client_stub, cloudwatch_client_stub
):
# create test cluster head node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = True
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Dashboard Config
# with new changes...
# expect the stored the CloudWatch Dashboard Config is different from local
# config
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "dashboard"
)
stubs.get_param_ssm_different(ssm_client_stub, cw_ssm_param_name)
# given an updated CloudWatch Dashboard Config file...
# expect to store the new CloudWatch Dashboard config as an SSM parameter
stubs.put_parameter_cloudwatch_config(
ssm_client_stub, cloudwatch_helper.cluster_name, "dashboard"
)
# given an updated CloudWatch Dashboard Config file...
# expect to update the node tag equal to updated config file sha1 hash
# to reflect the changes in config file
stubs.update_hash_tag_success(
ec2_client_stub, node_id, "dashboard", cloudwatch_helper
)
# given a directive to create a cluster CloudWatch dashboard...
# expect to make a call to create a dashboard for each node in the cluster
stubs.put_cluster_dashboard_success(
cloudwatch_client_stub,
cloudwatch_helper,
)
# given our mocks and the example CloudWatch Dashboard config as input...
# expect CloudWatch Dashboard configured to use updated file
# on each cluster node successfully
cloudwatch_helper._update_cloudwatch_config("dashboard", is_head_node)
# expect no pending responses left in client stub queues
ec2_client_stub.assert_no_pending_responses()
ssm_client_stub.assert_no_pending_responses()
def test_cloudwatch_dashboard_update_worker_node(
ec2_client_stub, ssm_client_stub, cloudwatch_client_stub
):
# create test cluster worker node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = False
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Dashboard Config
# with new changes...
# expect the stored the CloudWatch Dashboard Config is different from local
# config
stubs.get_head_node_config_hash_different(
ec2_client_stub, "dashboard", cloudwatch_helper, node_id
)
stubs.get_cur_node_config_hash_different(ec2_client_stub, "dashboard", node_id)
# given an updated CloudWatch Dashboard Config file...
# expect to update the node tag equal to updated config file sha1 hash
# to reflect the changes in config file
stubs.update_hash_tag_success(
ec2_client_stub, node_id, "dashboard", cloudwatch_helper
)
# given our mocks and the example CloudWatch Dashboard config as input...
# expect CloudWatch Dashboard configured to use updated file
# on each cluster node successfully
cloudwatch_helper._update_cloudwatch_config("dashboard", is_head_node)
# expect no pending responses left in client stub queues
ec2_client_stub.assert_no_pending_responses()
ssm_client_stub.assert_no_pending_responses()
def test_cloudwatch_alarm_update_head_node(
ec2_client_stub, ssm_client_stub, cloudwatch_client_stub
):
# create test cluster head node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = True
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Alarm Config with new
# changes...
# expect the stored the CloudWatch Alarm Config is different from local
# config
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "alarm"
)
stubs.get_param_ssm_different(ssm_client_stub, cw_ssm_param_name)
# given an updated CloudWatch Alarm Config file...
# expect to store the new CloudWatch Alarm config as an SSM parameter
stubs.put_parameter_cloudwatch_config(
ssm_client_stub, cloudwatch_helper.cluster_name, "alarm"
)
# given an updated CloudWatch Alarm Config file...
# expect to update the node tag equal to updated config file sha1 hash
# to reflect the changes in config file
stubs.update_hash_tag_success(ec2_client_stub, node_id, "alarm", cloudwatch_helper)
stubs.get_param_ssm_same(
ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "alarm"
)
# given a directive to create cluster CloudWatch Alarms...
# expect to make a call to create alarms for each node in the cluster
stubs.put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper)
# given our mocks and the example CloudWatch Alarm config as input...
# expect CloudWatch Alarm configured to use updated file on each cluster
# node successfully
cloudwatch_helper._update_cloudwatch_config("alarm", is_head_node)
# expect no pending responses left in client stub queues
ec2_client_stub.assert_no_pending_responses()
ssm_client_stub.assert_no_pending_responses()
def test_cloudwatch_alarm_update_worker_node(
ec2_client_stub, ssm_client_stub, cloudwatch_client_stub
):
# create test cluster worker node ID and an associated cloudwatch helper
node_id = "i-abc"
is_head_node = False
cloudwatch_helper = helpers.get_cloudwatch_helper(node_id)
# given a directive to check for the Unified CloudWatch Agent status...
# expect CloudWatch Agent is installed
stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id)
# given a directive to update a cluster CloudWatch Alarm Config with new
# changes...
# expect the stored the CloudWatch Alarm Config is different from local
# config
cw_ssm_param_name = helpers.get_ssm_param_name(
cloudwatch_helper.cluster_name, "alarm"
)
# given a directive to update a cluster CloudWatch Alarm Config with new
# changes...
# expect the stored the CloudWatch Alarm Config is different from local
# config
stubs.get_head_node_config_hash_different(
ec2_client_stub, "alarm", cloudwatch_helper, node_id
)
stubs.get_cur_node_config_hash_different(ec2_client_stub, "alarm", node_id)
# given an updated CloudWatch Alarm Config file...
# expect to update the node tag equal to updated config file sha1 hash
# to reflect the changes in config file
stubs.update_hash_tag_success(ec2_client_stub, node_id, "alarm", cloudwatch_helper)
stubs.get_param_ssm_same(
ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "alarm"
)
# given a directive to create cluster CloudWatch Alarms...
# expect to make a call to create alarms for each node in the cluster
stubs.put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper)
# given our mocks and the example CloudWatch Alarm config as input...
# expect CloudWatch Alarm configured to use updated file on each cluster
# node successfully
cloudwatch_helper._update_cloudwatch_config("alarm", is_head_node)
# expect no pending responses left in client stub queues
ec2_client_stub.assert_no_pending_responses()
ssm_client_stub.assert_no_pending_responses()
if __name__ == "__main__":
import sys

View file

@ -17,6 +17,7 @@ from ray.tests.aws.utils.constants import (
DEFAULT_CLUSTER_NAME,
DEFAULT_NODE_PROVIDER_INSTANCE_TAGS,
)
from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import CloudwatchHelper
def get_aws_example_config_file_path(file_name):
@ -100,3 +101,47 @@ def apply_node_provider_config_updates(config, node_cfg, node_type_name, max_cou
node_cfg.update(node_provider_cfg_updates)
# merge node provider tag specs with user overrides
AWSNodeProvider._merge_tag_specs(tag_specs, user_tag_specs)
def get_cloudwatch_agent_config_file_path():
return get_aws_example_config_file_path(
"cloudwatch/example-cloudwatch-agent-config.json"
)
def get_cloudwatch_dashboard_config_file_path():
return get_aws_example_config_file_path(
"cloudwatch/example-cloudwatch-dashboard-config.json"
)
def get_cloudwatch_alarm_config_file_path():
return get_aws_example_config_file_path(
"cloudwatch/example-cloudwatch-alarm-config.json"
)
def load_cloudwatch_example_config_file():
config = load_aws_example_config_file("example-cloudwatch.yaml")
cw_cfg = config["provider"]["cloudwatch"]
cw_cfg["agent"]["config"] = get_cloudwatch_agent_config_file_path()
cw_cfg["dashboard"]["config"] = get_cloudwatch_dashboard_config_file_path()
cw_cfg["alarm"]["config"] = get_cloudwatch_alarm_config_file_path()
return config
def get_cloudwatch_helper(node_ids):
config = load_cloudwatch_example_config_file()
config["cluster_name"] = DEFAULT_CLUSTER_NAME
return CloudwatchHelper(
config["provider"],
node_ids,
config["cluster_name"],
)
def get_ssm_param_name(cluster_name, config_type):
ssm_config_param_name = "AmazonCloudWatch-" + "ray_{}_config_{}".format(
config_type, cluster_name
)
return ssm_config_param_name

View file

@ -1,7 +1,9 @@
from typing import Dict, List
import ray
import copy
import json
from uuid import uuid4
from ray.tests.aws.utils import helpers
from ray.tests.aws.utils.constants import (
DEFAULT_INSTANCE_PROFILE,
@ -12,6 +14,15 @@ from ray.tests.aws.utils.constants import (
TWENTY_SUBNETS_IN_DIFFERENT_AZS,
)
from ray.autoscaler._private.aws.config import key_pair
from ray.tests.aws.utils.helpers import (
get_cloudwatch_dashboard_config_file_path,
get_cloudwatch_alarm_config_file_path,
)
from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import (
CLOUDWATCH_AGENT_INSTALLED_TAG,
CLOUDWATCH_CONFIG_HASH_TAG_BASE,
)
from ray.autoscaler.tags import NODE_KIND_HEAD, TAG_RAY_NODE_KIND
from unittest import mock
@ -235,3 +246,352 @@ def describe_launch_template_versions_by_name_default(ec2_client_stub, versions)
},
service_response={"LaunchTemplateVersions": [DEFAULT_LT]},
)
def describe_instance_status_ok(ec2_client_stub, instance_ids):
ec2_client_stub.add_response(
"describe_instance_status",
expected_params={"InstanceIds": instance_ids},
service_response={
"InstanceStatuses": [
{
"InstanceId": instance_id,
"InstanceState": {"Code": 16, "Name": "running"},
"AvailabilityZone": "us-west-2",
"SystemStatus": {
"Status": "ok",
"Details": [{"Status": "passed", "Name": "reachability"}],
},
"InstanceStatus": {
"Status": "ok",
"Details": [{"Status": "passed", "Name": "reachability"}],
},
}
]
for instance_id in instance_ids
},
)
def get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id):
ec2_client_stub.add_response(
"describe_instances",
expected_params={"InstanceIds": [node_id]},
service_response={
"Reservations": [
{
"Instances": [
{
"InstanceId": node_id,
"Tags": [
{
"Key": CLOUDWATCH_AGENT_INSTALLED_TAG,
"Value": "True",
},
],
}
]
}
]
},
)
def update_hash_tag_success(ec2_client_stub, node_id, config_type, cloudwatch_helper):
hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type])
cur_hash_value = get_sha1_hash_of_cloudwatch_config_file(
config_type, cloudwatch_helper
)
ec2_client_stub.add_response(
"create_tags",
expected_params={
"Resources": [node_id],
"Tags": [{"Key": hash_key_value, "Value": cur_hash_value}],
},
service_response={"ResponseMetadata": {"HTTPStatusCode": 200}},
)
def add_cwa_installed_tag_response(ec2_client_stub, node_id):
ec2_client_stub.add_response(
"create_tags",
expected_params={
"Resources": node_id,
"Tags": [{"Key": CLOUDWATCH_AGENT_INSTALLED_TAG, "Value": "True"}],
},
service_response={"ResponseMetadata": {"HTTPStatusCode": 200}},
)
def get_head_node_config_hash_different(ec2_client_stub, config_type, cwh, node_id):
hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type])
cur_hash_value = get_sha1_hash_of_cloudwatch_config_file(config_type, cwh)
filters = cwh._get_current_cluster_session_nodes(cwh.cluster_name)
filters.append(
{
"Name": "tag:{}".format(TAG_RAY_NODE_KIND),
"Values": [NODE_KIND_HEAD],
}
)
ec2_client_stub.add_response(
"describe_instances",
expected_params={"Filters": filters},
service_response={
"Reservations": [
{
"Instances": [
{
"InstanceId": node_id,
"Tags": [
{"Key": hash_key_value, "Value": cur_hash_value},
],
}
]
}
]
},
)
def get_cur_node_config_hash_different(ec2_client_stub, config_type, node_id):
hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type])
ec2_client_stub.add_response(
"describe_instances",
expected_params={"InstanceIds": [node_id]},
service_response={
"Reservations": [
{
"Instances": [
{
"InstanceId": node_id,
"Tags": [
{"Key": hash_key_value, "Value": str(uuid4())},
],
}
]
}
]
},
)
def send_command_cwa_install(ssm_client_stub, node_id):
command_id = str(uuid4())
ssm_client_stub.add_response(
"send_command",
expected_params={
"DocumentName": "AWS-ConfigureAWSPackage",
"InstanceIds": node_id,
"MaxConcurrency": "1",
"MaxErrors": "0",
"Parameters": {
"action": ["Install"],
"name": ["AmazonCloudWatchAgent"],
"version": ["latest"],
},
},
service_response={
"Command": {
"CommandId": command_id,
"DocumentName": "AWS-ConfigureAWSPackage",
}
},
)
return command_id
def list_command_invocations_status(ssm_client_stub, node_id, cmd_id, status):
ssm_client_stub.add_response(
"list_command_invocations",
expected_params={"CommandId": cmd_id, "InstanceId": node_id},
service_response={"CommandInvocations": [{"Status": status}]},
)
def list_command_invocations_failed(ssm_client_stub, node_id, cmd_id):
status = "Failed"
list_command_invocations_status(ssm_client_stub, node_id, cmd_id, status)
def list_command_invocations_success(ssm_client_stub, node_id, cmd_id):
status = "Success"
list_command_invocations_status(ssm_client_stub, node_id, cmd_id, status)
def put_parameter_cloudwatch_config(ssm_client_stub, cluster_name, section_name):
ssm_config_param_name = helpers.get_ssm_param_name(cluster_name, section_name)
ssm_client_stub.add_response(
"put_parameter",
expected_params={
"Name": ssm_config_param_name,
"Type": "String",
"Value": ANY,
"Overwrite": True,
"Tier": ANY,
},
service_response={},
)
def send_command_cwa_collectd_init(ssm_client_stub, node_id):
command_id = str(uuid4())
ssm_client_stub.add_response(
"send_command",
expected_params={
"DocumentName": "AWS-RunShellScript",
"InstanceIds": [node_id],
"MaxConcurrency": "1",
"MaxErrors": "0",
"Parameters": {
"commands": [
"mkdir -p /usr/share/collectd/",
"touch /usr/share/collectd/types.db",
],
},
},
service_response={"Command": {"CommandId": command_id}},
)
return command_id
def send_command_start_cwa(ssm_client_stub, node_id, parameter_name):
command_id = str(uuid4())
ssm_client_stub.add_response(
"send_command",
expected_params={
"DocumentName": "AmazonCloudWatch-ManageAgent",
"InstanceIds": [node_id],
"MaxConcurrency": "1",
"MaxErrors": "0",
"Parameters": {
"action": ["configure"],
"mode": ["ec2"],
"optionalConfigurationSource": ["ssm"],
"optionalConfigurationLocation": [parameter_name],
"optionalRestart": ["yes"],
},
},
service_response={"Command": {"CommandId": command_id}},
)
return command_id
def send_command_stop_cwa(ssm_client_stub, node_id):
command_id = str(uuid4())
ssm_client_stub.add_response(
"send_command",
expected_params={
"DocumentName": "AmazonCloudWatch-ManageAgent",
"InstanceIds": [node_id],
"MaxConcurrency": "1",
"MaxErrors": "0",
"Parameters": {
"action": ["stop"],
"mode": ["ec2"],
},
},
service_response={"Command": {"CommandId": command_id}},
)
return command_id
def get_param_ssm_same(ssm_client_stub, ssm_param_name, cloudwatch_helper, config_type):
command_id = str(uuid4())
cw_value_json = (
cloudwatch_helper.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get(
config_type
)(config_type)
)
ssm_client_stub.add_response(
"get_parameter",
expected_params={"Name": ssm_param_name},
service_response={"Parameter": {"Value": json.dumps(cw_value_json)}},
)
return command_id
def get_sha1_hash_of_cloudwatch_config_file(config_type, cloudwatch_helper):
cw_value_file = cloudwatch_helper._sha1_hash_file(config_type)
return cw_value_file
def get_param_ssm_different(ssm_client_stub, ssm_param_name):
command_id = str(uuid4())
ssm_client_stub.add_response(
"get_parameter",
expected_params={"Name": ssm_param_name},
service_response={"Parameter": {"Value": "value"}},
)
return command_id
def get_param_ssm_exception(ssm_client_stub, ssm_param_name):
command_id = str(uuid4())
ssm_client_stub.add_client_error(
"get_parameter",
"ParameterNotFound",
expected_params={"Name": ssm_param_name},
response_meta={"Error": {"Code": "ParameterNotFound"}},
)
return command_id
def put_cluster_dashboard_success(cloudwatch_client_stub, cloudwatch_helper):
widgets = []
json_config_path = get_cloudwatch_dashboard_config_file_path()
with open(json_config_path) as f:
dashboard_config = json.load(f)
for item in dashboard_config:
item_out = cloudwatch_helper._replace_all_config_variables(
item,
cloudwatch_helper.node_id,
cloudwatch_helper.cluster_name,
cloudwatch_helper.provider_config["region"],
)
widgets.append(item_out)
dashboard_name = cloudwatch_helper.cluster_name + "-" + "example-dashboard-name"
cloudwatch_client_stub.add_response(
"put_dashboard",
expected_params={
"DashboardName": dashboard_name,
"DashboardBody": json.dumps({"widgets": widgets}),
},
service_response={"ResponseMetadata": {"HTTPStatusCode": 200}},
)
def put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper):
json_config_path = get_cloudwatch_alarm_config_file_path()
with open(json_config_path) as f:
data = json.load(f)
for item in data:
item_out = copy.deepcopy(item)
cloudwatch_helper._replace_all_config_variables(
item_out,
cloudwatch_helper.node_id,
cloudwatch_helper.cluster_name,
cloudwatch_helper.provider_config["region"],
)
cloudwatch_client_stub.add_response(
"put_metric_alarm",
expected_params=item_out,
service_response={"ResponseMetadata": {"HTTPStatusCode": 200}},
)
def get_metric_alarm(cloudwatch_client_stub):
cloudwatch_client_stub.add_response(
"describe_alarms",
expected_params={},
service_response={"MetricAlarms": [{"AlarmName": "myalarm"}]},
)
def delete_metric_alarms(cloudwatch_client_stub):
cloudwatch_client_stub.add_response(
"delete_alarms",
expected_params={"AlarmNames": ["myalarm"]},
service_response={"ResponseMetadata": {"HTTPStatusCode": 200}},
)