From 71fae21e8e86c75bc58b53dccae563d15691610f Mon Sep 17 00:00:00 2001
From: Zyiqin-Miranda <79943187+Zyiqin-Miranda@users.noreply.github.com>
Date: Mon, 10 Jan 2022 10:18:53 -0800
Subject: [PATCH] [autoscaler] AWS Autoscaler CloudWatch Dashboard support
 (#20266)

These changes add a set of improvements that enable automatic creation and
update of CloudWatch dashboards when provisioning AWS Autoscaling clusters.
These improvements allow AWS Autoscaler users to:

1. Get rapid insights into their cluster state via CloudWatch dashboards.
2. Update their CloudWatch dashboard JSON configuration files at `ray up`
   execution time.

Notes:
1. This PR is a follow-up to #18619 and adds dashboard support.
2. Usage sketches of the new agent/dashboard config flow are appended after
   the diff.
---
 .../aws/cloudwatch/cloudwatch_helper.py       | 409 ++++++++++--------
 .../autoscaler/_private/aws/node_provider.py  |   2 +-
 python/ray/autoscaler/_private/updater.py     |   2 +-
 .../example-cloudwatch-agent-config.json      | 126 +++---
 .../example-cloudwatch-dashboard-config.json  | 238 ++++++++++
 .../autoscaler/aws/example-cloudwatch.yaml    |  16 +-
 python/ray/autoscaler/ray-schema.json         |  10 +
 7 files changed, 563 insertions(+), 240 deletions(-)
 create mode 100644 python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-dashboard-config.json

diff --git a/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py b/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py
index c493da3d9..6dcb3dfde 100644
--- a/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py
+++ b/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py
@@ -21,10 +21,9 @@ CLOUDWATCH_CONFIG_HASH_TAG_BASE = "cloudwatch-config-hash"
 
 
 class CloudwatchHelper:
-    def __init__(self, provider_config: Dict[str, Any], node_ids: List[str],
+    def __init__(self, provider_config: Dict[str, Any], node_id: str,
                  cluster_name: str) -> None:
-        # dedupe and sort node IDs to support deterministic unit test stubs
-        self.node_ids = sorted(set(node_ids))
+        self.node_id = node_id
         self.cluster_name = cluster_name
         self.provider_config = provider_config
         region = provider_config["region"]
@@ -41,10 +40,13 @@ class CloudwatchHelper:
             is_head_node: whether this node is the head node.
         """
         if CloudwatchHelper.cloudwatch_config_exists(self.provider_config,
-                                                     "config"):
-            self._update_cloudwatch_agent_config(is_head_node)
+                                                     "agent"):
+            self._update_cloudwatch_config(is_head_node, "agent")
+        if CloudwatchHelper.cloudwatch_config_exists(self.provider_config,
+                                                     "dashboard"):
+            self._update_cloudwatch_config(is_head_node, "dashboard")
 
-    def _ec2_health_check_waiter(self, node_ids: List[str]) -> None:
+    def _ec2_health_check_waiter(self, node_id: str) -> None:
         # wait for all EC2 instance checks to complete
         try:
             logger.info(
@@ -52,59 +54,101 @@ class CloudwatchHelper:
                 "configuring Unified Cloudwatch Agent. This may take a few "
                 "minutes...")
             waiter = self.ec2_client.get_waiter("instance_status_ok")
-            waiter.wait(InstanceIds=node_ids)
+            waiter.wait(InstanceIds=[node_id])
         except botocore.exceptions.WaiterError as e:
             logger.error(
                 "Failed while waiting for EC2 instance checks to complete: {}".
                 format(e.message))
             raise e
 
-    def _update_cloudwatch_agent_config(self, is_head_node: bool) -> None:
-        """ check whether update operations are needed.
+    def _update_cloudwatch_config(self, is_head_node: bool,
+                                  config_type: str) -> None:
+        """Update remote CloudWatch configs in Parameter Store, update the
+        hash tag value on the node, and perform the associated operations
+        against the CloudWatch console if the local CloudWatch configs change.
+
+        Args:
+            is_head_node: whether this node is the head node.
+            config_type: CloudWatch config file type.
         """
         cwa_installed = self._setup_cwa()
-        param_name = self._get_ssm_param_name()
+        param_name = self._get_ssm_param_name(config_type)
         if cwa_installed:
             if is_head_node:
                 cw_config_ssm = self._set_cloudwatch_ssm_config_param(
-                    param_name)
-                cur_cw_config_hash = self._sha1_hash_file()
+                    param_name, config_type)
+                cur_cw_config_hash = self._sha1_hash_file(config_type)
                 ssm_cw_config_hash = self._sha1_hash_json(cw_config_ssm)
                 # check if the user updated this CloudWatch config file.
                 # if so, perform the corresponding actions.
                 if cur_cw_config_hash != ssm_cw_config_hash:
                     logger.info(
-                        "Unified Cloudwatch Agent config file has changed.")
-                    self._upload_config_to_ssm_and_set_hash_tag()
-                    self._restart_cloudwatch_agent()
+                        "Cloudwatch {} config file has changed.".format(
+                            config_type))
+                    self._upload_config_to_ssm_and_set_hash_tag(config_type)
+                    if config_type == "agent":
+                        self._restart_cloudwatch_agent()
+                    elif config_type == "dashboard":
+                        self._put_cloudwatch_dashboard()
             else:
-                head_node_hash = self._get_head_node_config_hash()
-                cur_node_hash = self._get_cur_node_config_hash()
+                head_node_hash = self._get_head_node_config_hash(config_type)
+                cur_node_hash = self._get_cur_node_config_hash(config_type)
                 if head_node_hash != cur_node_hash:
                     logger.info(
-                        "Unified Cloudwatch Agent config file has changed.")
-                    self._restart_cloudwatch_agent()
+                        "Cloudwatch {} config file has changed.".format(
+                            config_type))
+                    if config_type == "agent":
+                        self._restart_cloudwatch_agent()
                     self._update_cloudwatch_hash_tag_value(
-                        self.node_ids, head_node_hash)
+                        self.node_id, head_node_hash, config_type)
 
-    def _send_command_to_nodes(self, document_name: str, parameters: List[str],
-                               node_ids: List[str]) -> Dict[str, Any]:
-        """ send SSM command to the given nodes """
-        logger.debug("Sending SSM command to {} node(s). Document name: {}. "
" - "Parameters: {}.".format( - len(node_ids), document_name, parameters)) + "Parameters: {}.".format(node_id, document_name, + parameters)) response = self.ssm_client.send_command( - InstanceIds=node_ids, + InstanceIds=[node_id], DocumentName=document_name, Parameters=parameters, - MaxConcurrency=str(min(len(node_ids), 100)), + MaxConcurrency=str(min(len(node_id), 1)), MaxErrors="0") return response def _ssm_command_waiter(self, document_name: str, parameters: List[str], - node_ids: List[str], + node_id: str, retry_failed: bool = True) -> bool: """ wait for SSM command to complete on all cluster nodes """ @@ -113,76 +157,72 @@ class CloudwatchHelper: # exist instead of failing immediately, and by resubmitting # any failed command until all retry attempts are exhausted # by default. - response = self._send_command_to_nodes(document_name, parameters, - node_ids) + response = self._send_command_to_node(document_name, parameters, + node_id) command_id = response["Command"]["CommandId"] cloudwatch_config = self.provider_config["cloudwatch"] agent_retryer_config = cloudwatch_config \ - .get("agent", {}) \ + .get("agent") \ .get("retryer", {}) max_attempts = agent_retryer_config.get("max_attempts", 120) delay_seconds = agent_retryer_config.get("delay_seconds", 30) num_attempts = 0 cmd_invocation_res = {} - for node_id in node_ids: - while True: - num_attempts += 1 - logger.debug("Listing SSM command ID {} invocations on node {}" - .format(command_id, node_id)) - response = self.ssm_client.list_command_invocations( - CommandId=command_id, - InstanceId=node_id, - ) - cmd_invocations = response["CommandInvocations"] - if not cmd_invocations: - logger.debug( - "SSM Command ID {} invocation does not exist. If " - "the command was just started, it may take a " - "few seconds to register.".format(command_id)) - else: - if len(cmd_invocations) > 1: - logger.warning( - "Expected to find 1 SSM command invocation with " - "ID {} on node {} but found {}: {}".format( - command_id, - node_id, - len(cmd_invocations), - cmd_invocations, - )) - cmd_invocation = cmd_invocations[0] - if cmd_invocation["Status"] == "Success": + while True: + num_attempts += 1 + logger.debug("Listing SSM command ID {} invocations on node {}" + .format(command_id, node_id)) + response = self.ssm_client.list_command_invocations( + CommandId=command_id, + InstanceId=node_id, + ) + cmd_invocations = response["CommandInvocations"] + if not cmd_invocations: + logger.debug("SSM Command ID {} invocation does not exist. If " + "the command was just started, it may take a " + "few seconds to register.".format(command_id)) + else: + if len(cmd_invocations) > 1: + logger.warning( + "Expected to find 1 SSM command invocation with " + "ID {} on node {} but found {}: {}".format( + command_id, + node_id, + len(cmd_invocations), + cmd_invocations, + )) + cmd_invocation = cmd_invocations[0] + if cmd_invocation["Status"] == "Success": + logger.debug("SSM Command ID {} completed successfully." 
+ .format(command_id)) + cmd_invocation_res[node_id] = True + break + if num_attempts >= max_attempts: + logger.error( + "Max attempts for command {} exceeded on node {}" + .format(command_id, node_id)) + raise botocore.exceptions.WaiterError( + name="ssm_waiter", + reason="Max attempts exceeded", + last_response=cmd_invocation, + ) + if cmd_invocation["Status"] == "Failed": + logger.debug(f"SSM Command ID {command_id} failed.") + if retry_failed: + logger.debug(f"Retrying in {delay_seconds} seconds.") + response = self._send_command_to_node( + document_name, parameters, node_id) + command_id = response["Command"]["CommandId"] + logger.debug("Sent SSM command ID {} to node {}" + .format(command_id, node_id)) + else: logger.debug( - "SSM Command ID {} completed successfully." - .format(command_id)) - cmd_invocation_res[node_id] = True + f"Ignoring Command ID {command_id} failure.") + cmd_invocation_res[node_id] = False break - if num_attempts >= max_attempts: - logger.error( - "Max attempts for command {} exceeded on node {}" - .format(command_id, node_id)) - raise botocore.exceptions.WaiterError( - name="ssm_waiter", - reason="Max attempts exceeded", - last_response=cmd_invocation, - ) - if cmd_invocation["Status"] == "Failed": - logger.debug(f"SSM Command ID {command_id} failed.") - if retry_failed: - logger.debug( - f"Retrying in {delay_seconds} seconds.") - response = self._send_command_to_nodes( - document_name, parameters, [node_id]) - command_id = response["Command"]["CommandId"] - logger.debug("Sent SSM command ID {} to node {}" - .format(command_id, node_id)) - else: - logger.debug( - f"Ignoring Command ID {command_id} failure.") - cmd_invocation_res[node_id] = False - break + time.sleep(delay_seconds) - time.sleep(delay_seconds) return cmd_invocation_res def _replace_config_variables(self, string: str, node_id: str, @@ -211,7 +251,6 @@ class CloudwatchHelper: modified strings in the collection (which is not necessarily equal to the number of variables replaced). """ - modified_value_count = 0 for key in collection: if type(collection) is dict: value = collection.get(key) @@ -222,49 +261,48 @@ class CloudwatchHelper: if type(value) is str: collection[index_key] = self._replace_config_variables( value, node_id, cluster_name, region) - modified_value_count += (collection[index_key] != value) elif type(value) is dict or type(value) is list: - collection[index_key], modified_count = self. \ + collection[index_key] = self. \ _replace_all_config_variables( value, node_id, cluster_name, region) - modified_value_count += modified_count - return collection, modified_value_count + return collection - def _load_config_file(self) -> Dict[str, Any]: + def _load_config_file(self, config_type: str) -> Dict[str, Any]: """load JSON config file""" cloudwatch_config = self.provider_config["cloudwatch"] - json_config_file_section = cloudwatch_config.get("agent", {}) + json_config_file_section = cloudwatch_config.get(config_type, {}) json_config_file_path = json_config_file_section.get("config", {}) json_config_path = os.path.abspath(json_config_file_path) with open(json_config_path) as f: data = json.load(f) return data - def _set_cloudwatch_ssm_config_param(self, parameter_name: str) -> str: + def _set_cloudwatch_ssm_config_param(self, parameter_name: str, + config_type: str) -> str: """ get cloudwatch config for the given param and config type from SSM - if it exists, returns empty str if not. 
+ if it exists, put it in the SSM param store if not """ try: parameter_value = self._get_ssm_param(parameter_name) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "ParameterNotFound": logger.info( - "Unified Cloudwatch Agent config file is not found " + "Cloudwatch {} config file is not found " "at SSM parameter store. " - "Checking for Unified Cloudwatch Agent installation") + "Checking for Unified CloudWatch Agent installation". + format(config_type)) return self._get_default_empty_config_file_hash() else: - logger.info( - "Failed to fetch Unified Cloudwatch Agent config from SSM " - "parameter store.") + logger.info("Failed to fetch CloudWatch {} config from SSM " + "parameter store.".format(config_type)) logger.error(e) raise e return parameter_value def _get_default_empty_config_file_hash(self): - default_cwa_config = "{}" - parameter_value = self._sha1_hash_json(default_cwa_config) + default_cw_config = "{}" + parameter_value = self._sha1_hash_json(default_cw_config) return parameter_value def _get_ssm_param(self, parameter_name: str) -> str: @@ -286,48 +324,54 @@ class CloudwatchHelper: sha1_res = hash.hexdigest() return sha1_res - def _sha1_hash_file(self) -> str: + def _sha1_hash_file(self, config_type: str) -> str: """calculate the config file sha1 hash""" - config = self._replace_cwa_config_variables() + if config_type == "agent": + config = self._replace_cwa_config_variables() + if config_type == "dashboard": + config = self._replace_dashboard_config_variables() value = json.dumps(config) sha1_res = self._sha1_hash_json(value) return sha1_res - def _upload_config_to_ssm_and_set_hash_tag(self): - """This function should only be called by head node""" - data = self._replace_cwa_config_variables() - sha1_hash_value = self._sha1_hash_file() - self._upload_config_to_ssm(data) - self._update_cloudwatch_hash_tag_value(self.node_ids, sha1_hash_value) + def _upload_config_to_ssm_and_set_hash_tag(self, config_type: str): + if config_type == "agent": + data = self._replace_cwa_config_variables() + if config_type == "dashboard": + data = self._replace_dashboard_config_variables() + sha1_hash_value = self._sha1_hash_file(config_type) + self._upload_config_to_ssm(data, config_type) + self._update_cloudwatch_hash_tag_value(self.node_id, sha1_hash_value, + config_type) - def _add_cwa_installed_tag(self, node_ids: List[str]) -> None: + def _add_cwa_installed_tag(self, node_id: str) -> None: self.ec2_client.create_tags( - Resources=node_ids, + Resources=[node_id], Tags=[{ "Key": CLOUDWATCH_AGENT_INSTALLED_TAG, "Value": "True" }]) logger.info("Successfully add Unified Cloudwatch Agent installed " - "tag on {}".format(node_ids)) + "tag on {}".format(node_id)) - def _update_cloudwatch_hash_tag_value(self, node_ids: List[str], - sha1_hash_value: str): - hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, "agent"]) + def _update_cloudwatch_hash_tag_value( + self, node_id: str, sha1_hash_value: str, config_type: str): + hash_key_value = "-".join( + [CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type]) self.ec2_client.create_tags( - Resources=node_ids, + Resources=[node_id], Tags=[{ "Key": hash_key_value, "Value": sha1_hash_value }]) - logger.info( - "Successfully update Unified Cloudwatch Agent hash tag on {}". 
- format(node_ids)) + logger.info("Successfully update cloudwatch {} hash tag on {}".format( + config_type, node_id)) - def _get_ssm_param_name(self) -> str: + def _get_ssm_param_name(self, config_type: str) -> str: """return the parameter name for cloudwatch configs""" ssm_config_param_name = \ "AmazonCloudWatch-" + "ray_{}_config_{}". \ - format("agent", self.cluster_name) + format(config_type, self.cluster_name) return ssm_config_param_name def _put_ssm_param(self, parameter: Dict[str, Any], @@ -341,8 +385,8 @@ class CloudwatchHelper: Tier="Intelligent-Tiering", ) - def _upload_config_to_ssm(self, param: Dict[str, Any]): - param_name = self._get_ssm_param_name() + def _upload_config_to_ssm(self, param: Dict[str, Any], config_type: str): + param_name = self._get_ssm_param_name(config_type) self._put_ssm_param(param, param_name) def _replace_cwa_config_variables(self) -> Dict[str, Any]: @@ -350,28 +394,45 @@ class CloudwatchHelper: replace known variable occurrences in Unified Cloudwatch Agent config file """ - cwa_config = self._load_config_file() + cwa_config = self._load_config_file("agent") self._replace_all_config_variables( cwa_config, - self.node_ids[0], + self.node_id, self.cluster_name, self.provider_config["region"], ) return cwa_config + def _replace_dashboard_config_variables(self) -> Dict[str, Any]: + """ + replace known variable occurrences in CloudWatch Dashboard config file + """ + data = self._load_config_file("dashboard") + widgets = [] + for item in data: + item_out = self._replace_all_config_variables( + item, + self.node_id, + self.cluster_name, + self.provider_config["region"], + ) + item_out = copy.deepcopy(item) + widgets.append(item_out) + return widgets + def _restart_cloudwatch_agent(self) -> None: """restart Unified Cloudwatch Agent""" - cwa_param_name = self._get_ssm_param_name() + cwa_param_name = self._get_ssm_param_name("agent") logger.info( "Restarting Unified Cloudwatch Agent package on {} node(s)." - .format(len(self.node_ids))) + .format((self.node_id))) self._stop_cloudwatch_agent() self._start_cloudwatch_agent(cwa_param_name) def _stop_cloudwatch_agent(self) -> None: - """stop Unified Cloudwatch Agent""" - logger.info("Stopping Unified Cloudwatch Agent package on {} node(s)." - .format(len(self.node_ids))) + """stop Unified CloudWatch Agent """ + logger.info("Stopping Unified CloudWatch Agent package on node {}." + .format(self.node_id)) parameters_stop_cwa = { "action": ["stop"], "mode": ["ec2"], @@ -381,16 +442,16 @@ class CloudwatchHelper: self._ssm_command_waiter( "AmazonCloudWatch-ManageAgent", parameters_stop_cwa, - self.node_ids, + self.node_id, False, ) - logger.info("Unified Cloudwatch Agent stopped on {} node(s).".format( - len(self.node_ids))) + logger.info("Unified CloudWatch Agent stopped on node {}.".format( + self.node_id)) def _start_cloudwatch_agent(self, cwa_param_name: str) -> None: - """start Unified Cloudwatch Agent""" - logger.info("Starting Unified Cloudwatch Agent package on {} node(s)." - .format(len(self.node_ids))) + """start Unified CloudWatch Agent""" + logger.info("Starting Unified CloudWatch Agent package on node {}." + .format(self.node_id)) parameters_start_cwa = { "action": ["configure"], "mode": ["ec2"], @@ -399,21 +460,21 @@ class CloudwatchHelper: "optionalRestart": ["yes"], } self._ssm_command_waiter("AmazonCloudWatch-ManageAgent", - parameters_start_cwa, self.node_ids) - logger.info( - "Unified Cloudwatch Agent started successfully on {} node(s)." 
- .format(len(self.node_ids))) + parameters_start_cwa, self.node_id) + logger.info("Unified CloudWatch Agent started successfully on node {}." + .format(self.node_id)) def _setup_cwa(self) -> bool: cwa_installed = self._check_cwa_installed_ec2_tag() if cwa_installed == "False": - res_cwa_installed = self._ensure_cwa_installed_ssm(self.node_ids) + res_cwa_installed = self._ensure_cwa_installed_ssm(self.node_id) return res_cwa_installed else: return True - def _get_head_node_config_hash(self) -> str: - hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, "agent"]) + def _get_head_node_config_hash(self, config_type: str) -> str: + hash_key_value = "-".join( + [CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type]) filters = copy.deepcopy( self._get_current_cluster_session_nodes(self.cluster_name)) filters.append({ @@ -432,11 +493,12 @@ class CloudwatchHelper: "{} Error caught when getting value of {} tag on head node". format(e.response["Error"], hash_key_value)) - def _get_cur_node_config_hash(self) -> str: - hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, "agent"]) + def _get_cur_node_config_hash(self, config_type: str) -> str: + hash_key_value = "-".join( + [CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type]) try: response = self.ec2_client.describe_instances( - InstanceIds=self.node_ids) + InstanceIds=[self.node_id]) reservations = response["Reservations"] message = "More than 1 response received from " \ "describing current node" @@ -447,9 +509,9 @@ class CloudwatchHelper: hash_value = self._get_default_empty_config_file_hash() for tag in tags: if tag["Key"] == hash_key_value: - logger.info("Successfully get Unified Cloudwatch Agent " - "hash tag value from node {}".format( - self.node_ids)) + logger.info( + "Successfully get cloudwatch {} hash tag value from " + "node {}".format(config_type, self.node_id)) hash_value = tag["Value"] return hash_value except botocore.exceptions.ClientError as e: @@ -457,36 +519,29 @@ class CloudwatchHelper: "{} Error caught when getting hash tag {} tag".format( e.response["Error"], hash_key_value)) - def _ensure_cwa_installed_ssm(self, node_ids: List[str]) -> bool: + def _ensure_cwa_installed_ssm(self, node_id: str) -> bool: """ Check if Unified Cloudwatch Agent is installed via ssm run command. - If not, notify user to use an AMI with the Unified CloudWatch Agent installed. """ logger.info("Checking Unified Cloudwatch Agent " - "status on {} nodes".format(len(node_ids))) + "status on node {}".format(node_id)) parameters_status_cwa = { "action": ["status"], "mode": ["ec2"], } - self._ec2_health_check_waiter(node_ids) + self._ec2_health_check_waiter(node_id) cmd_invocation_res = self._ssm_command_waiter( - "AmazonCloudWatch-ManageAgent", parameters_status_cwa, node_ids, + "AmazonCloudWatch-ManageAgent", parameters_status_cwa, node_id, False) - uninstalled_nodes = [] - installed_nodes = [] - for node_id, res in cmd_invocation_res.items(): - if not res: - uninstalled_nodes.append(node_id) - else: - installed_nodes.append(node_id) - if len(uninstalled_nodes) > 0: + cwa_installed = cmd_invocation_res.get(node_id, False) + if not cwa_installed: logger.warning( "Unified CloudWatch Agent not installed on {}. " "Ray logs, metrics not picked up. " "Please use an AMI with Unified CloudWatch Agent installed." - .format(uninstalled_nodes)) + .format(node_id)) return False else: return True @@ -504,11 +559,12 @@ class CloudwatchHelper: def _check_cwa_installed_ec2_tag(self) -> List[str]: """ - Check if Unified Cloudwatch Agent is installed. 
+ Filtering all nodes to get nodes + without Unified CloudWatch Agent installed """ try: response = self.ec2_client.describe_instances( - InstanceIds=self.node_ids) + InstanceIds=[self.node_id]) reservations = response["Reservations"] message = "More than 1 response received from " \ "describing current node" @@ -519,15 +575,15 @@ class CloudwatchHelper: cwa_installed = str(False) for tag in tags: if tag["Key"] == CLOUDWATCH_AGENT_INSTALLED_TAG: - logger.info("Unified Cloudwatch Agent is installed on " - "node {}".format(self.node_ids)) + logger.info("Unified CloudWatch Agent is installed on " + "node {}".format(self.node_id)) cwa_installed = tag["Value"] return cwa_installed except botocore.exceptions.ClientError as e: logger.warning( - "{} Error caught when getting Unified Cloudwatch Agent status " - "based on {} tag".format(e.response["Error"], - CLOUDWATCH_AGENT_INSTALLED_TAG)) + "{} Error caught when getting Unified CloudWatch Agent " + "status based on {} tag".format( + e.response["Error"], CLOUDWATCH_AGENT_INSTALLED_TAG)) @staticmethod def resolve_instance_profile_name( @@ -545,7 +601,7 @@ class CloudwatchHelper: doesn't exist. """ cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( - config, "config") + config, "agent") return CLOUDWATCH_RAY_INSTANCE_PROFILE if cwa_cfg_exists \ else default_instance_profile_name @@ -563,7 +619,7 @@ class CloudwatchHelper: default ray iam role name if cloudwatch config file doesn't exist. """ cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( - config, "config") + config, "agent") return CLOUDWATCH_RAY_IAM_ROLE if cwa_cfg_exists \ else default_iam_role_name @@ -583,7 +639,7 @@ class CloudwatchHelper: cluster config file. """ cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( - config, "config") + config, "agent") if cwa_cfg_exists: cloudwatch_managed_policy = { "Version": "2012-10-17", @@ -624,7 +680,7 @@ class CloudwatchHelper: @staticmethod def cloudwatch_config_exists(config: Dict[str, Any], - config_key_name: str) -> bool: + config_type: str) -> bool: """Check if CloudWatch configuration was specified by the user in their cluster config file. @@ -633,12 +689,11 @@ class CloudwatchHelper: Args: config: provider section of cluster config file. - config_key_name: config file name. + config_type: type of CloudWatch config file. Returns: True if config file is specified by user. False if config file is not specified. 
""" - cfg = config.get("cloudwatch", {}).get("agent", - {}).get(config_key_name) + cfg = config.get("cloudwatch", {}).get(config_type, {}).get("config") return bool(cfg) diff --git a/python/ray/autoscaler/_private/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py index 4ea99b3ce..b3f7b6727 100644 --- a/python/ray/autoscaler/_private/aws/node_provider.py +++ b/python/ray/autoscaler/_private/aws/node_provider.py @@ -361,7 +361,7 @@ class AWSNodeProvider(NodeProvider): "Value": v, }) if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, - "config"): + "agent"): cwa_installed = self._check_ami_cwa_installation(node_config) if cwa_installed: tag_pairs.extend([{ diff --git a/python/ray/autoscaler/_private/updater.py b/python/ray/autoscaler/_private/updater.py index f3253fc33..fd51325d4 100644 --- a/python/ray/autoscaler/_private/updater.py +++ b/python/ray/autoscaler/_private/updater.py @@ -307,7 +307,7 @@ class NodeUpdater: from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper \ import CloudwatchHelper CloudwatchHelper(self.provider.provider_config, - [self.node_id], self.provider.cluster_name). \ + self.node_id, self.provider.cluster_name). \ update_from_config(self.is_head_node) if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash: diff --git a/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-agent-config.json b/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-agent-config.json index 61d236651..b21855b26 100755 --- a/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-agent-config.json +++ b/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-agent-config.json @@ -3,10 +3,6 @@ "metrics_collection_interval":60, "run_as_user":"root" }, - "csm":{ - "memory_limit_in_mb":20, - "port":31000 - }, "logs":{ "metrics_collected": { "prometheus": { @@ -116,60 +112,72 @@ } } }, - "metrics":{ - "append_dimensions":{ - "AutoScalingGroupName":"${aws:AutoScalingGroupName}", - "InstanceId":"${aws:InstanceId}" - }, - "metrics_collected":{ - "collectd":{ - "metrics_aggregation_interval":60 - }, - "cpu":{ - "measurement":[ - "usage_active", - "usage_system", - "usage_user", - "usage_idle", - "time_active", - "time_system", - "time_user", - "time_idle" - ] - }, - "processes":{ - "measurement":[ - "processes_running", - "processes_sleeping", - "processes_zombies", - "processes_dead", - "processes_total" - ], - "metrics_collection_interval":60, - "resources":[ - "*" - ] - }, - "disk":{ - "measurement":[ - "disk_used_percent" - ], - "metrics_collection_interval":60, - "resources":[ - "*" - ] - }, - "mem":{ - "measurement":[ - "mem_used_percent" - ], - "metrics_collection_interval":60 - }, - "statsd":{ - "metrics_aggregation_interval":60, - "metrics_collection_interval":10, - "service_address":":8125" - } - } + "metrics": { + "namespace": "{cluster_name}-ray-CWAgent", + "aggregation_dimensions": [ + [ + "InstanceId" + ] + ], + "append_dimensions": { + "AutoScalingGroupName": "${aws:AutoScalingGroupName}", + "InstanceId": "${aws:InstanceId}" + }, + "metrics_collected": { + "collectd": { + "metrics_aggregation_interval": 60 + }, + "cpu": { + "measurement": [ + "usage_active", + "usage_system", + "usage_user", + "usage_idle", + "time_active", + "time_system", + "time_user", + "time_idle" + ], + "resources": [ + "*" + ] + }, + "processes": { + "measurement": [ + "processes_running", + "processes_sleeping", + "processes_zombies", + "processes_dead", + "processes_total" + ], + "metrics_collection_interval": 60, + "resources": [ + "*" + ] + }, + 
"disk": { + "measurement": [ + "disk_used_percent" + ], + "metrics_collection_interval": 60, + "resources": [ + "/" + ] + }, + "mem": { + "measurement": [ + "mem_used_percent" + ], + "metrics_collection_interval": 60, + "resources": [ + "*" + ] + }, + "statsd": { + "metrics_aggregation_interval": 60, + "metrics_collection_interval": 10, + "service_address": ":8125" + } + } } } diff --git a/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-dashboard-config.json b/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-dashboard-config.json new file mode 100644 index 000000000..b30047a80 --- /dev/null +++ b/python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-dashboard-config.json @@ -0,0 +1,238 @@ +[ + { + "type":"explorer", + "x":12, + "y":18, + "width":12, + "height":6, + "properties": { + "metrics": [ + { + "metricName": "CPUUtilization", + "resourceType": "AWS::EC2::Instance", + "stat": "Average" + } + ], + "aggregateBy": { + "key": "*", + "func": "SUM" + }, + "labels": [ + { + "key": "cloudwatch-agent-installed", + "value": "True" + }, + { + "key": "ray-cluster-name", + "value": "{cluster_name}" + } + ], + "widgetOptions": { + "legend": { + "position": "bottom" + }, + "view": "timeSeries", + "stacked": false, + "rowsPerPage": 1, + "widgetsPerRow": 1 + }, + "title":"Cluster CPU Utilization" + } + }, + { + "type":"explorer", + "x":0, + "y":18, + "width":12, + "height":6, + "properties": { + "metrics": [ + { + "metricName": "CPUUtilization", + "resourceType": "AWS::EC2::Instance", + "stat": "Average" + } + ], + "aggregateBy": { + "key": "*", + "func": "AVG" + }, + "labels": [ + { + "key": "cloudwatch-agent-installed", + "value": "True" + }, + { + "key": "ray-cluster-name", + "value": "{cluster_name}" + } + ], + "widgetOptions": { + "legend": { + "position": "bottom" + }, + "view": "timeSeries", + "stacked": false, + "rowsPerPage": 1, + "widgetsPerRow": 1 + }, + "title":"Single Node CPU Utilization (Avg and Max)" + } + }, + { + "type":"metric", + "x":12, + "y":6, + "width":12, + "height":6, + "properties":{ + "view":"timeSeries", + "metrics":[ + [ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_running', 'Average', 300))", "label": "cluster running process sum", "id": "e1" } ], + [ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_sleeping', 'Average', 300))", "label": "cluster sleeping process sum", "id": "e2" } ] + ], + "region":"{region}", + "stat":"Average", + "period":60, + "title":"Cluster Processes" + } + }, + { + "type":"metric", + "x":0, + "y":6, + "width":12, + "height":6, + "properties":{ + "view":"timeSeries", + "metrics":[ + [ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_running', 'Average', 300))", "label": "cluster running process average", "id": "e3" } ], + [ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_sleeping', 'Average', 300))", "label": "cluster sleeping process average", "id": "e4" } ], + [ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_running', 'Average', 300))", "label": "cluster running process maximum", "id": "e5" } ], + [ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_sleeping', 'Average', 300))", "label": "cluster sleeping process maximum", "id": "e6" } ] + ], + "region":"{region}", + "stat":"Average", + "period":60, + "title":"Single Node Processes (Avg and Max)" + } + }, + { + "type":"metric", + "x":12, + "y":12, + "width":12, + "height":6, + "properties":{ + 
"view":"timeSeries", + "stacked":false, + "metrics":[ + [ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} disk_used_percent', 'Average', 300))", "label": "cluster disk used percent sum", "id": "e7", "period": 300 } ] + + ], + "region":"{region}", + "title":"Cluster Disk Usage" + } + }, + { + "type":"metric", + "x":0, + "y":12, + "width":12, + "height":6, + "properties":{ + "view":"timeSeries", + "stacked":false, + "metrics":[ + [ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} disk_used_percent', 'Average', 300))", "id": "e8", "label": "cluster disk used percent average", "period": 300 } ], + [ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} disk_used_percent', 'Maximum', 300))", "id": "e9", "label": "cluster disk used percent maximum", "period": 300 } ] + + ], + "region":"{region}", + "title":"Single Node Disk Usage (Avg and Max)" + } + }, + { + "type":"metric", + "x":12, + "y":18, + "width":12, + "height":6, + "properties": { + "metrics": [ + [ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} mem_used_percent', 'Average', 300))", "id": "e10", "label": "cluster mem used percent sum", "period": 300 } ] + + ], + "view": "timeSeries", + "stacked": false, + "region": "{region}", + "stat": "Maximum", + "period": 300, + "start": "-PT2H", + "end": "P0D", + "title": "Cluster Memory Usage" + } + }, + { + "type":"metric", + "x":0, + "y":18, + "width":12, + "height":6, + "properties": { + "metrics": [ + [ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} mem_used_percent', 'Average', 300))", "id": "e11", "label": "cluster mem used percent average", "period": 300 } ], + [ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} mem_used_percent', 'Maximum', 300))", "id": "e12", "label": "cluster mem used percent maximum", "period": 300 } ] + ], + "view": "timeSeries", + "stacked": false, + "region": "{region}", + "stat": "Maximum", + "period": 300, + "start": "-PT2H", + "end": "P0D", + "title": "Single Node Memory Usage (Avg and Max)" + } + }, + { + "height": 6, + "width": 12, + "y": 0, + "x": 0, + "type": "metric", + "properties": { + "metrics": [ + [ { "expression": "SUM(SEARCH('{{cluster_name}-ray-prometheus,instance} ray_node_cpu_count', 'Maximum', 300))", "label": "cluster cpu sum", "id": "e13" } ] + ], + "view": "timeSeries", + "stacked": false, + "region": "{region}", + "stat": "Maximum", + "period": 300, + "start": "-PT2H", + "end": "P0D", + "title": "Cluster CPUs" + } + }, + { + "height": 6, + "width": 12, + "y": 0, + "x": 12, + "type": "metric", + "properties": { + "metrics": [ + [ { "expression": "SUM(SEARCH('{{cluster_name}-ray-prometheus,instance} object_store_available_memory', 'Average', 300))", "label": "cluster object store available memory sum", "id": "e14" } ] + ], + "view": "timeSeries", + "stacked": false, + "region": "{region}", + "stat": "Maximum", + "period": 300, + "start": "-PT2H", + "end": "P0D", + "title": "Cluster Object Store Available Memory" + } + } +] + diff --git a/python/ray/autoscaler/aws/example-cloudwatch.yaml b/python/ray/autoscaler/aws/example-cloudwatch.yaml index a696f02f6..35fb94372 100644 --- a/python/ray/autoscaler/aws/example-cloudwatch.yaml +++ b/python/ray/autoscaler/aws/example-cloudwatch.yaml @@ -14,8 +14,9 @@ provider: # We depend on AWS Systems Manager (SSM) to deploy CloudWatch configuration updates to your cluster, # with relevant configuration created or updated in the SSM Parameter Store during `ray up`. 
-    # The `AmazonCloudWatch-ray_agent_config_{cluster_name}` SSM Parameter Store Config Key is used to
-    # store a remote cache of the last Unified CloudWatch Agent config applied.
+    # We support two CloudWatch config types under this cloudwatch section: agent and dashboard.
+    # The `AmazonCloudWatch-ray_{config_type}_config_{cluster_name}` SSM Parameter Store Config Key is used to
+    # store a remote cache of the last Unified CloudWatch config applied.
 
     # Every time you run `ray up` to update your cluster, we compare your local CloudWatch config file contents
     # to the SSM Parameter Store's contents for that config and, if they differ, then the associated CloudWatch
@@ -39,6 +40,17 @@ provider:
                 max_attempts: 120
                 # Seconds to wait between each Unified CloudWatch Agent SSM config update attempt.
                 delay_seconds: 30
+        # For CloudWatch Dashboard config files, we will also replace references to
+        # `{region}` with your cluster's region name, and `{cluster_name}` with your cluster name.
+        dashboard:
+            # CloudWatch Dashboard name.
+            # A per-cluster dashboard is created; by default its name is
+            # `{your_cluster_name}-example-dashboard-name`.
+            name: "example-dashboard-name"
+            # The CloudWatch Dashboard is defined via the config file described
+            # at https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/CloudWatch-Dashboard-Body-Structure.html.
+            # Path to the CloudWatch Dashboard config file.
+            config: "cloudwatch/example-cloudwatch-dashboard-config.json"
 
 # How Ray will authenticate with newly launched nodes.
 auth:
diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json
index 96b2768c7..f31b77d06 100644
--- a/python/ray/autoscaler/ray-schema.json
+++ b/python/ray/autoscaler/ray-schema.json
@@ -201,6 +201,16 @@
                     "description": "Seconds to wait between each Unified CloudWatch Agent installation attempt."
                 }
             }
+        },
+        "dashboard": {
+            "name": {
+                "type": ["string", "null"],
+                "description": "User defined CloudWatch Dashboard name."
+            },
+            "config": {
+                "type": ["string", "null"],
+                "description": "Path to CloudWatch Dashboard config file. See https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/CloudWatch-Dashboard-Body-Structure.html for additional details."
+            }
         }
     }
 }
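
Usage sketches (referenced from note 2 of the commit message). Each sketch is a
minimal, self-contained Python illustration of one mechanism in this diff;
cluster names, regions, instance ids, and file paths appearing below are
illustrative assumptions, not values taken from the patch.

Sketch 1: the per-type gating in `update_from_config`. `cloudwatch_config_exists`
mirrors the static helper in the diff; `my_provider_config` is a stand-in for the
`provider` section of a cluster YAML.

    from typing import Any, Dict


    def cloudwatch_config_exists(config: Dict[str, Any],
                                 config_type: str) -> bool:
        # A config type counts as present only if its "config" file path is
        # set under provider.cloudwatch.<config_type>, as in the diff.
        return bool(
            config.get("cloudwatch", {}).get(config_type, {}).get("config"))


    my_provider_config = {
        "cloudwatch": {
            "agent": {
                "config": "cloudwatch/example-cloudwatch-agent-config.json"
            },
            "dashboard": {
                "config": "cloudwatch/example-cloudwatch-dashboard-config.json"
            },
        }
    }
    for config_type in ("agent", "dashboard"):
        if cloudwatch_config_exists(my_provider_config, config_type):
            print("would update CloudWatch", config_type, "config")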
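
Sketch 2: the single-node call path after the updater.py change. The helper now
receives one node id instead of a list; the function and its arguments are
illustrative stand-ins for the NodeUpdater fields used at the real call site.

    from typing import Any, Dict

    from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import \
        CloudwatchHelper


    def setup_cloudwatch(provider_config: Dict[str, Any], node_id: str,
                         cluster_name: str, is_head_node: bool) -> None:
        # One helper instance per node; only the head node pushes config and
        # dashboard updates, workers converge on the head node's hash tag.
        CloudwatchHelper(provider_config, node_id,
                         cluster_name).update_from_config(is_head_node)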
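
Sketch 3: change detection. The local config file and the SSM-cached copy are
both reduced to SHA-1 digests of their JSON text, as in `_sha1_hash_file` and
`_sha1_hash_json`; differing digests trigger an upload plus an agent restart or
a dashboard put. The config values below are illustrative.

    import hashlib
    import json


    def sha1_hash_json(value: str) -> str:
        # Same scheme as the diff: hash the serialized JSON string.
        return hashlib.sha1(value.encode()).hexdigest()


    local_config = {"widgets": []}  # parsed local file, after templating
    ssm_cached = "{}"               # default used when no SSM param exists yet
    if sha1_hash_json(json.dumps(local_config)) != sha1_hash_json(ssm_cached):
        print("config changed; would re-upload and refresh hash tags")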
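
Sketch 4: the SSM Parameter Store round trip behind `_get_ssm_param_name`,
`_put_ssm_param`, and `_get_ssm_param`. The parameter name format comes from the
diff; the region, cluster name, and payload are illustrative.

    import json

    import boto3

    cluster_name = "my-cluster"
    config_type = "dashboard"  # "agent" or "dashboard"
    param_name = "AmazonCloudWatch-" + "ray_{}_config_{}".format(
        config_type, cluster_name)

    ssm = boto3.client("ssm", region_name="us-west-2")
    ssm.put_parameter(
        Name=param_name,
        Type="String",
        Value=json.dumps({"widgets": []}),  # remote cache of the local config
        Overwrite=True,
        Tier="Intelligent-Tiering",
    )
    cached = ssm.get_parameter(Name=param_name)["Parameter"]["Value"]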
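
Sketch 5: propagating config state through EC2 tags. The head node stamps nodes
with `cloudwatch-config-hash-<config_type>`; a worker compares its own tag
against the head node's to decide whether a restart is needed. The instance id
and hash value are illustrative.

    import boto3

    ec2 = boto3.client("ec2", region_name="us-west-2")
    node_id = "i-0123456789abcdef0"
    config_type = "agent"
    tag_key = "cloudwatch-config-hash-" + config_type
    ec2.create_tags(
        Resources=[node_id],
        Tags=[{
            "Key": tag_key,
            # sha1 digest of the applied config (illustrative value)
            "Value": "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33",
        }])

    tags = ec2.describe_instances(
        InstanceIds=[node_id])["Reservations"][0]["Instances"][0]["Tags"]
    current_hash = next(
        (t["Value"] for t in tags if t["Key"] == tag_key), None)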
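
Sketch 6: the skeleton of the single-node SSM polling loop in
`_ssm_command_waiter`. The `AmazonCloudWatch-ManageAgent` document and its
action/mode parameters come from the diff, as do the attempt count and delay
defaults; the instance id is illustrative. The real helper additionally
resubmits failed commands until retries are exhausted, which is omitted here.

    import time

    import boto3

    ssm = boto3.client("ssm", region_name="us-west-2")
    node_id = "i-0123456789abcdef0"
    resp = ssm.send_command(
        InstanceIds=[node_id],
        DocumentName="AmazonCloudWatch-ManageAgent",
        Parameters={"action": ["status"], "mode": ["ec2"]},
        MaxConcurrency="1",
        MaxErrors="0")
    command_id = resp["Command"]["CommandId"]

    for _ in range(120):  # max_attempts default from the diff
        invocations = ssm.list_command_invocations(
            CommandId=command_id, InstanceId=node_id)["CommandInvocations"]
        # The invocation may take a few seconds to register after send_command.
        if invocations and invocations[0]["Status"] in ("Success", "Failed"):
            break
        time.sleep(30)  # delay_seconds default from the diff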
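
Sketch 7: the `{cluster_name}`/`{region}` templating applied to both config
types, written as a simplified, non-mutating version of
`_replace_all_config_variables` (the diff's helper modifies collections in
place). The `{instance_id}` placeholder name is an assumption inferred from the
helper's `node_id` parameter.

    from typing import Any


    def replace_vars(obj: Any, node_id: str, cluster_name: str,
                     region: str) -> Any:
        # Recurse through nested dicts/lists, substituting in string leaves.
        if isinstance(obj, str):
            return (obj.replace("{instance_id}", node_id)
                    .replace("{cluster_name}", cluster_name)
                    .replace("{region}", region))
        if isinstance(obj, dict):
            return {
                k: replace_vars(v, node_id, cluster_name, region)
                for k, v in obj.items()
            }
        if isinstance(obj, list):
            return [
                replace_vars(v, node_id, cluster_name, region) for v in obj
            ]
        return obj


    widget = {"properties": {"region": "{region}",
                             "title": "{cluster_name} CPU"}}
    print(replace_vars(widget, "i-0123456789abcdef0", "my-cluster",
                       "us-west-2"))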
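
Sketch 8: what `_put_cloudwatch_dashboard` does on the head node. The dashboard
name is `<cluster_name>-<configured name>`, and CloudWatch's validation
messages are surfaced as errors. The names and file path here are illustrative,
and the widget variables are assumed to be substituted already.

    import json

    import boto3

    cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")
    with open("cloudwatch/example-cloudwatch-dashboard-config.json") as f:
        widgets = json.load(f)

    response = cloudwatch.put_dashboard(
        DashboardName="my-cluster-example-dashboard-name",
        DashboardBody=json.dumps({"widgets": widgets}))
    for issue in response.get("DashboardValidationMessages", []):
        print("Error in dashboard config: {} - {}".format(
            issue["Message"], issue["DataPath"]))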
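
Sketch 9: the shape of one widget from
example-cloudwatch-dashboard-config.json, rebuilt as a Python dict. Because the
helper performs plain string substitution (not str.format), the doubled brace
in SEARCH expressions is a literal "{" followed by the "{cluster_name}"
placeholder, so substitution yields a valid metrics schema such as
"{my-cluster-ray-CWAgent,InstanceId}".

    cpu_widget = {
        "type": "metric",
        "x": 0,
        "y": 6,
        "width": 12,
        "height": 6,
        "properties": {
            "view": "timeSeries",
            "metrics": [[{
                "expression": ("AVG(SEARCH('{{cluster_name}-ray-CWAgent,"
                               "InstanceId} processes_running', "
                               "'Average', 300))"),
                "label": "cluster running process average",
                "id": "e3",
            }]],
            "region": "{region}",
            "stat": "Average",
            "period": 60,
            "title": "Single Node Processes (Avg and Max)",
        },
    }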
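
Sketch 10: why the agent config gained an explicit metrics namespace. The
dashboard's SEARCH expressions query `<cluster_name>-ray-CWAgent`, so the agent
must publish under the same namespace; this check reconstructs both strings for
an illustrative cluster name. The linkage itself is my reading of the two
example files, not something the patch states.

    cluster_name = "my-cluster"
    agent_namespace = "{cluster_name}-ray-CWAgent".replace(
        "{cluster_name}", cluster_name)
    search_schema = "{{cluster_name}-ray-CWAgent,InstanceId}".replace(
        "{cluster_name}", cluster_name)
    # Both sides resolve to "{my-cluster-ray-CWAgent,InstanceId}".
    assert search_schema == "{" + agent_namespace + ",InstanceId}"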
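
Sketch 11: validating the new dashboard section shape client-side. This uses a
local subset schema for illustration; it is not the ray-schema.json fragment
verbatim.

    import jsonschema

    dashboard_subset_schema = {
        "type": "object",
        "properties": {
            "name": {"type": ["string", "null"]},
            "config": {"type": ["string", "null"]},
        },
    }
    jsonschema.validate(
        {
            "name": "example-dashboard-name",
            "config": "cloudwatch/example-cloudwatch-dashboard-config.json",
        },
        dashboard_subset_schema)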