[autoscaler] AWS Autoscaler CloudWatch Dashboard support (#20266)

These changes enable automatic creation and update of CloudWatch dashboards when provisioning AWS Autoscaler clusters. With these improvements, AWS Autoscaler users can:

1. Get rapid insights into their cluster state via CloudWatch dashboards.
2. Update their CloudWatch dashboard JSON configuration files at `ray up` execution time.

Notes:
1. This PR is a follow-up to #18619 and adds dashboard support.
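
For context, the node updater now constructs the helper with a single EC2 instance ID rather than a list (see the updater.py hunk below). A minimal sketch of the new per-node entry point, with `provider_config`, `node_id`, `cluster_name`, and `is_head_node` standing in for values the caller already has in scope:

    from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import \
        CloudwatchHelper

    # update_from_config() applies each config type ("agent", then
    # "dashboard") that is present in the provider's cloudwatch section.
    CloudwatchHelper(provider_config, node_id, cluster_name) \
        .update_from_config(is_head_node)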
Authored by Zyiqin-Miranda on 2022-01-10 10:18:53 -08:00; committed by GitHub.
parent 6420c75fd2, commit 71fae21e8e
7 changed files with 563 additions and 240 deletions

python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py

@@ -21,10 +21,9 @@ CLOUDWATCH_CONFIG_HASH_TAG_BASE = "cloudwatch-config-hash"
 class CloudwatchHelper:
-    def __init__(self, provider_config: Dict[str, Any], node_ids: List[str],
+    def __init__(self, provider_config: Dict[str, Any], node_id: str,
                  cluster_name: str) -> None:
-        # dedupe and sort node IDs to support deterministic unit test stubs
-        self.node_ids = sorted(set(node_ids))
+        self.node_id = node_id
         self.cluster_name = cluster_name
         self.provider_config = provider_config
         region = provider_config["region"]
@@ -41,10 +40,13 @@ class CloudwatchHelper:
             is_head_node: whether this node is the head node.
         """
         if CloudwatchHelper.cloudwatch_config_exists(self.provider_config,
-                                                     "config"):
-            self._update_cloudwatch_agent_config(is_head_node)
+                                                     "agent"):
+            self._update_cloudwatch_config(is_head_node, "agent")
+        if CloudwatchHelper.cloudwatch_config_exists(self.provider_config,
+                                                     "dashboard"):
+            self._update_cloudwatch_config(is_head_node, "dashboard")

-    def _ec2_health_check_waiter(self, node_ids: List[str]) -> None:
+    def _ec2_health_check_waiter(self, node_id: str) -> None:
         # wait for all EC2 instance checks to complete
         try:
             logger.info(
@@ -52,59 +54,101 @@ class CloudwatchHelper:
                 "configuring Unified Cloudwatch Agent. This may take a few "
                 "minutes...")
             waiter = self.ec2_client.get_waiter("instance_status_ok")
-            waiter.wait(InstanceIds=node_ids)
+            waiter.wait(InstanceIds=[node_id])
         except botocore.exceptions.WaiterError as e:
             logger.error(
                 "Failed while waiting for EC2 instance checks to complete: {}".
                 format(e.message))
             raise e

-    def _update_cloudwatch_agent_config(self, is_head_node: bool) -> None:
-        """ check whether update operations are needed.
+    def _update_cloudwatch_config(self, is_head_node: bool,
+                                  config_type: str) -> None:
+        """Update remote CloudWatch configs at Parameter Store,
+        update hash tag value on node and perform associated operations
+        at CloudWatch console if local CloudWatch configs change.
+
+        Args:
+            is_head_node: whether this node is the head node.
+            config_type: CloudWatch config file type.
         """
         cwa_installed = self._setup_cwa()
-        param_name = self._get_ssm_param_name()
+        param_name = self._get_ssm_param_name(config_type)
         if cwa_installed:
             if is_head_node:
                 cw_config_ssm = self._set_cloudwatch_ssm_config_param(
-                    param_name)
-                cur_cw_config_hash = self._sha1_hash_file()
+                    param_name, config_type)
+                cur_cw_config_hash = self._sha1_hash_file(config_type)
                 ssm_cw_config_hash = self._sha1_hash_json(cw_config_ssm)
                 # check if user updated Unified Cloudwatch Agent config file.
                 # if so, perform corresponding actions.
                 if cur_cw_config_hash != ssm_cw_config_hash:
                     logger.info(
-                        "Unified Cloudwatch Agent config file has changed.")
-                    self._upload_config_to_ssm_and_set_hash_tag()
-                    self._restart_cloudwatch_agent()
+                        "Cloudwatch {} config file has changed.".format(
+                            config_type))
+                    self._upload_config_to_ssm_and_set_hash_tag(config_type)
+                    if config_type == "agent":
+                        self._restart_cloudwatch_agent()
+                    elif config_type == "dashboard":
+                        self._put_cloudwatch_dashboard()
             else:
-                head_node_hash = self._get_head_node_config_hash()
-                cur_node_hash = self._get_cur_node_config_hash()
+                head_node_hash = self._get_head_node_config_hash(config_type)
+                cur_node_hash = self._get_cur_node_config_hash(config_type)
                 if head_node_hash != cur_node_hash:
                     logger.info(
-                        "Unified Cloudwatch Agent config file has changed.")
-                    self._restart_cloudwatch_agent()
+                        "Cloudwatch {} config file has changed.".format(
+                            config_type))
+                    if config_type == "agent":
+                        self._restart_cloudwatch_agent()
                     self._update_cloudwatch_hash_tag_value(
-                        self.node_ids, head_node_hash)
+                        self.node_id, head_node_hash, config_type)

-    def _send_command_to_nodes(self, document_name: str, parameters: List[str],
-                               node_ids: List[str]) -> Dict[str, Any]:
+    def _put_cloudwatch_dashboard(self) -> Dict[str, Any]:
+        """put dashboard to cloudwatch console"""
+        cloudwatch_config = self.provider_config["cloudwatch"]
+        dashboard_config = cloudwatch_config \
+            .get("dashboard", {})
+        dashboard_name_cluster = dashboard_config.get("name",
+                                                      self.cluster_name)
+        dashboard_name = self.cluster_name + "-" + dashboard_name_cluster
+        widgets = self._replace_dashboard_config_variables()
+        response = self.cloudwatch_client.put_dashboard(
+            DashboardName=dashboard_name,
+            DashboardBody=json.dumps({
+                "widgets": widgets
+            }))
+        issue_count = len(response.get("DashboardValidationMessages", []))
+        if issue_count > 0:
+            for issue in response.get("DashboardValidationMessages"):
+                logging.error("Error in dashboard config: {} - {}".format(
+                    issue["Message"], issue["DataPath"]))
+            raise Exception(
+                "Errors in dashboard configuration: {} issues raised".format(
+                    issue_count))
+        else:
+            logger.info("Successfully put dashboard to CloudWatch console")
+        return response
+
+    def _send_command_to_node(self, document_name: str, parameters: List[str],
+                              node_id: str) -> Dict[str, Any]:
         """ send SSM command to the given nodes """
         logger.debug("Sending SSM command to {} node(s). Document name: {}. "
-                     "Parameters: {}.".format(
-                         len(node_ids), document_name, parameters))
+                     "Parameters: {}.".format(node_id, document_name,
+                                              parameters))
         response = self.ssm_client.send_command(
-            InstanceIds=node_ids,
+            InstanceIds=[node_id],
             DocumentName=document_name,
             Parameters=parameters,
-            MaxConcurrency=str(min(len(node_ids), 100)),
+            MaxConcurrency=str(min(len(node_id), 1)),
             MaxErrors="0")
         return response

     def _ssm_command_waiter(self,
                             document_name: str,
                             parameters: List[str],
-                            node_ids: List[str],
+                            node_id: str,
                             retry_failed: bool = True) -> bool:
         """ wait for SSM command to complete on all cluster nodes """
@@ -113,76 +157,72 @@ class CloudwatchHelper:
         # exist instead of failing immediately, and by resubmitting
         # any failed command until all retry attempts are exhausted
         # by default.
-        response = self._send_command_to_nodes(document_name, parameters,
-                                               node_ids)
+        response = self._send_command_to_node(document_name, parameters,
+                                              node_id)
         command_id = response["Command"]["CommandId"]
         cloudwatch_config = self.provider_config["cloudwatch"]
         agent_retryer_config = cloudwatch_config \
-            .get("agent", {}) \
+            .get("agent") \
             .get("retryer", {})
         max_attempts = agent_retryer_config.get("max_attempts", 120)
         delay_seconds = agent_retryer_config.get("delay_seconds", 30)
         num_attempts = 0
         cmd_invocation_res = {}
-        for node_id in node_ids:
-            while True:
-                num_attempts += 1
-                logger.debug("Listing SSM command ID {} invocations on node {}"
-                             .format(command_id, node_id))
-                response = self.ssm_client.list_command_invocations(
-                    CommandId=command_id,
-                    InstanceId=node_id,
-                )
-                cmd_invocations = response["CommandInvocations"]
-                if not cmd_invocations:
-                    logger.debug(
-                        "SSM Command ID {} invocation does not exist. If "
-                        "the command was just started, it may take a "
-                        "few seconds to register.".format(command_id))
-                else:
-                    if len(cmd_invocations) > 1:
-                        logger.warning(
-                            "Expected to find 1 SSM command invocation with "
-                            "ID {} on node {} but found {}: {}".format(
-                                command_id,
-                                node_id,
-                                len(cmd_invocations),
-                                cmd_invocations,
-                            ))
-                    cmd_invocation = cmd_invocations[0]
-                    if cmd_invocation["Status"] == "Success":
-                        logger.debug(
-                            "SSM Command ID {} completed successfully."
-                            .format(command_id))
-                        cmd_invocation_res[node_id] = True
-                        break
-                    if num_attempts >= max_attempts:
-                        logger.error(
-                            "Max attempts for command {} exceeded on node {}"
-                            .format(command_id, node_id))
-                        raise botocore.exceptions.WaiterError(
-                            name="ssm_waiter",
-                            reason="Max attempts exceeded",
-                            last_response=cmd_invocation,
-                        )
-                    if cmd_invocation["Status"] == "Failed":
-                        logger.debug(f"SSM Command ID {command_id} failed.")
-                        if retry_failed:
-                            logger.debug(
-                                f"Retrying in {delay_seconds} seconds.")
-                            response = self._send_command_to_nodes(
-                                document_name, parameters, [node_id])
-                            command_id = response["Command"]["CommandId"]
-                            logger.debug("Sent SSM command ID {} to node {}"
-                                         .format(command_id, node_id))
-                        else:
-                            logger.debug(
-                                f"Ignoring Command ID {command_id} failure.")
-                            cmd_invocation_res[node_id] = False
-                            break
-                time.sleep(delay_seconds)
+        while True:
+            num_attempts += 1
+            logger.debug("Listing SSM command ID {} invocations on node {}"
+                         .format(command_id, node_id))
+            response = self.ssm_client.list_command_invocations(
+                CommandId=command_id,
+                InstanceId=node_id,
+            )
+            cmd_invocations = response["CommandInvocations"]
+            if not cmd_invocations:
+                logger.debug("SSM Command ID {} invocation does not exist. If "
+                             "the command was just started, it may take a "
+                             "few seconds to register.".format(command_id))
+            else:
+                if len(cmd_invocations) > 1:
+                    logger.warning(
+                        "Expected to find 1 SSM command invocation with "
+                        "ID {} on node {} but found {}: {}".format(
+                            command_id,
+                            node_id,
+                            len(cmd_invocations),
+                            cmd_invocations,
+                        ))
+                cmd_invocation = cmd_invocations[0]
+                if cmd_invocation["Status"] == "Success":
+                    logger.debug("SSM Command ID {} completed successfully."
+                                 .format(command_id))
+                    cmd_invocation_res[node_id] = True
+                    break
+                if num_attempts >= max_attempts:
+                    logger.error(
+                        "Max attempts for command {} exceeded on node {}"
+                        .format(command_id, node_id))
+                    raise botocore.exceptions.WaiterError(
+                        name="ssm_waiter",
+                        reason="Max attempts exceeded",
+                        last_response=cmd_invocation,
+                    )
+                if cmd_invocation["Status"] == "Failed":
+                    logger.debug(f"SSM Command ID {command_id} failed.")
+                    if retry_failed:
+                        logger.debug(f"Retrying in {delay_seconds} seconds.")
+                        response = self._send_command_to_node(
+                            document_name, parameters, node_id)
+                        command_id = response["Command"]["CommandId"]
+                        logger.debug("Sent SSM command ID {} to node {}"
+                                     .format(command_id, node_id))
+                    else:
+                        logger.debug(
+                            f"Ignoring Command ID {command_id} failure.")
+                        cmd_invocation_res[node_id] = False
+                        break
+            time.sleep(delay_seconds)
         return cmd_invocation_res

     def _replace_config_variables(self, string: str, node_id: str,
@@ -211,7 +251,6 @@ class CloudwatchHelper:
         modified strings in the collection (which is not necessarily equal to
         the number of variables replaced).
         """
-        modified_value_count = 0
         for key in collection:
             if type(collection) is dict:
                 value = collection.get(key)
@@ -222,49 +261,48 @@ class CloudwatchHelper:
             if type(value) is str:
                 collection[index_key] = self._replace_config_variables(
                     value, node_id, cluster_name, region)
-                modified_value_count += (collection[index_key] != value)
             elif type(value) is dict or type(value) is list:
-                collection[index_key], modified_count = self. \
+                collection[index_key] = self. \
                     _replace_all_config_variables(
                         value, node_id, cluster_name, region)
-                modified_value_count += modified_count
-        return collection, modified_value_count
+        return collection

-    def _load_config_file(self) -> Dict[str, Any]:
+    def _load_config_file(self, config_type: str) -> Dict[str, Any]:
         """load JSON config file"""
         cloudwatch_config = self.provider_config["cloudwatch"]
-        json_config_file_section = cloudwatch_config.get("agent", {})
+        json_config_file_section = cloudwatch_config.get(config_type, {})
         json_config_file_path = json_config_file_section.get("config", {})
         json_config_path = os.path.abspath(json_config_file_path)
         with open(json_config_path) as f:
             data = json.load(f)
         return data

-    def _set_cloudwatch_ssm_config_param(self, parameter_name: str) -> str:
+    def _set_cloudwatch_ssm_config_param(self, parameter_name: str,
+                                         config_type: str) -> str:
         """
         get cloudwatch config for the given param and config type from SSM
-        if it exists, returns empty str if not.
+        if it exists, put it in the SSM param store if not
         """
         try:
             parameter_value = self._get_ssm_param(parameter_name)
         except botocore.exceptions.ClientError as e:
             if e.response["Error"]["Code"] == "ParameterNotFound":
                 logger.info(
-                    "Unified Cloudwatch Agent config file is not found "
+                    "Cloudwatch {} config file is not found "
                     "at SSM parameter store. "
-                    "Checking for Unified Cloudwatch Agent installation")
+                    "Checking for Unified CloudWatch Agent installation".
+                    format(config_type))
                 return self._get_default_empty_config_file_hash()
             else:
-                logger.info(
-                    "Failed to fetch Unified Cloudwatch Agent config from SSM "
-                    "parameter store.")
+                logger.info("Failed to fetch CloudWatch {} config from SSM "
                            "parameter store.".format(config_type))
                 logger.error(e)
                 raise e
         return parameter_value

     def _get_default_empty_config_file_hash(self):
-        default_cwa_config = "{}"
-        parameter_value = self._sha1_hash_json(default_cwa_config)
+        default_cw_config = "{}"
+        parameter_value = self._sha1_hash_json(default_cw_config)
         return parameter_value

     def _get_ssm_param(self, parameter_name: str) -> str:
@@ -286,48 +324,54 @@ class CloudwatchHelper:
         sha1_res = hash.hexdigest()
         return sha1_res

-    def _sha1_hash_file(self) -> str:
+    def _sha1_hash_file(self, config_type: str) -> str:
         """calculate the config file sha1 hash"""
-        config = self._replace_cwa_config_variables()
+        if config_type == "agent":
+            config = self._replace_cwa_config_variables()
+        if config_type == "dashboard":
+            config = self._replace_dashboard_config_variables()
         value = json.dumps(config)
         sha1_res = self._sha1_hash_json(value)
         return sha1_res

-    def _upload_config_to_ssm_and_set_hash_tag(self):
-        """This function should only be called by head node"""
-        data = self._replace_cwa_config_variables()
-        sha1_hash_value = self._sha1_hash_file()
-        self._upload_config_to_ssm(data)
-        self._update_cloudwatch_hash_tag_value(self.node_ids, sha1_hash_value)
+    def _upload_config_to_ssm_and_set_hash_tag(self, config_type: str):
+        if config_type == "agent":
+            data = self._replace_cwa_config_variables()
+        if config_type == "dashboard":
+            data = self._replace_dashboard_config_variables()
+        sha1_hash_value = self._sha1_hash_file(config_type)
+        self._upload_config_to_ssm(data, config_type)
+        self._update_cloudwatch_hash_tag_value(self.node_id, sha1_hash_value,
+                                               config_type)

-    def _add_cwa_installed_tag(self, node_ids: List[str]) -> None:
+    def _add_cwa_installed_tag(self, node_id: str) -> None:
         self.ec2_client.create_tags(
-            Resources=node_ids,
+            Resources=[node_id],
             Tags=[{
                 "Key": CLOUDWATCH_AGENT_INSTALLED_TAG,
                 "Value": "True"
             }])
         logger.info("Successfully add Unified Cloudwatch Agent installed "
-                    "tag on {}".format(node_ids))
+                    "tag on {}".format(node_id))

-    def _update_cloudwatch_hash_tag_value(self, node_ids: List[str],
-                                          sha1_hash_value: str):
-        hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, "agent"])
+    def _update_cloudwatch_hash_tag_value(
+            self, node_id: str, sha1_hash_value: str, config_type: str):
+        hash_key_value = "-".join(
+            [CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type])
         self.ec2_client.create_tags(
-            Resources=node_ids,
+            Resources=[node_id],
             Tags=[{
                 "Key": hash_key_value,
                 "Value": sha1_hash_value
             }])
-        logger.info(
-            "Successfully update Unified Cloudwatch Agent hash tag on {}".
-            format(node_ids))
+        logger.info("Successfully update cloudwatch {} hash tag on {}".format(
+            config_type, node_id))

-    def _get_ssm_param_name(self) -> str:
+    def _get_ssm_param_name(self, config_type: str) -> str:
         """return the parameter name for cloudwatch configs"""
         ssm_config_param_name = \
             "AmazonCloudWatch-" + "ray_{}_config_{}". \
-            format("agent", self.cluster_name)
+            format(config_type, self.cluster_name)
         return ssm_config_param_name

     def _put_ssm_param(self, parameter: Dict[str, Any],
@@ -341,8 +385,8 @@ class CloudwatchHelper:
             Tier="Intelligent-Tiering",
         )

-    def _upload_config_to_ssm(self, param: Dict[str, Any]):
-        param_name = self._get_ssm_param_name()
+    def _upload_config_to_ssm(self, param: Dict[str, Any], config_type: str):
+        param_name = self._get_ssm_param_name(config_type)
         self._put_ssm_param(param, param_name)

     def _replace_cwa_config_variables(self) -> Dict[str, Any]:
@@ -350,28 +394,45 @@ class CloudwatchHelper:
         replace known variable occurrences in
         Unified Cloudwatch Agent config file
         """
-        cwa_config = self._load_config_file()
+        cwa_config = self._load_config_file("agent")
         self._replace_all_config_variables(
             cwa_config,
-            self.node_ids[0],
+            self.node_id,
             self.cluster_name,
             self.provider_config["region"],
         )
         return cwa_config

+    def _replace_dashboard_config_variables(self) -> Dict[str, Any]:
+        """
+        replace known variable occurrences in CloudWatch Dashboard config file
+        """
+        data = self._load_config_file("dashboard")
+        widgets = []
+        for item in data:
+            item_out = self._replace_all_config_variables(
+                item,
+                self.node_id,
+                self.cluster_name,
+                self.provider_config["region"],
+            )
+            item_out = copy.deepcopy(item)
+            widgets.append(item_out)
+        return widgets
+
     def _restart_cloudwatch_agent(self) -> None:
         """restart Unified Cloudwatch Agent"""
-        cwa_param_name = self._get_ssm_param_name()
+        cwa_param_name = self._get_ssm_param_name("agent")
         logger.info(
             "Restarting Unified Cloudwatch Agent package on {} node(s)."
-            .format(len(self.node_ids)))
+            .format((self.node_id)))
         self._stop_cloudwatch_agent()
         self._start_cloudwatch_agent(cwa_param_name)

     def _stop_cloudwatch_agent(self) -> None:
-        """stop Unified Cloudwatch Agent"""
-        logger.info("Stopping Unified Cloudwatch Agent package on {} node(s)."
-                    .format(len(self.node_ids)))
+        """stop Unified CloudWatch Agent"""
+        logger.info("Stopping Unified CloudWatch Agent package on node {}."
+                    .format(self.node_id))
         parameters_stop_cwa = {
             "action": ["stop"],
             "mode": ["ec2"],
@@ -381,16 +442,16 @@ class CloudwatchHelper:
         self._ssm_command_waiter(
             "AmazonCloudWatch-ManageAgent",
             parameters_stop_cwa,
-            self.node_ids,
+            self.node_id,
             False,
         )
-        logger.info("Unified Cloudwatch Agent stopped on {} node(s).".format(
-            len(self.node_ids)))
+        logger.info("Unified CloudWatch Agent stopped on node {}.".format(
+            self.node_id))

     def _start_cloudwatch_agent(self, cwa_param_name: str) -> None:
-        """start Unified Cloudwatch Agent"""
-        logger.info("Starting Unified Cloudwatch Agent package on {} node(s)."
-                    .format(len(self.node_ids)))
+        """start Unified CloudWatch Agent"""
+        logger.info("Starting Unified CloudWatch Agent package on node {}."
+                    .format(self.node_id))
         parameters_start_cwa = {
             "action": ["configure"],
             "mode": ["ec2"],
@@ -399,21 +460,21 @@ class CloudwatchHelper:
             "optionalRestart": ["yes"],
         }
         self._ssm_command_waiter("AmazonCloudWatch-ManageAgent",
-                                 parameters_start_cwa, self.node_ids)
-        logger.info(
-            "Unified Cloudwatch Agent started successfully on {} node(s)."
-            .format(len(self.node_ids)))
+                                 parameters_start_cwa, self.node_id)
+        logger.info("Unified CloudWatch Agent started successfully on node {}."
+                    .format(self.node_id))

     def _setup_cwa(self) -> bool:
         cwa_installed = self._check_cwa_installed_ec2_tag()
         if cwa_installed == "False":
-            res_cwa_installed = self._ensure_cwa_installed_ssm(self.node_ids)
+            res_cwa_installed = self._ensure_cwa_installed_ssm(self.node_id)
             return res_cwa_installed
         else:
             return True

-    def _get_head_node_config_hash(self) -> str:
-        hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, "agent"])
+    def _get_head_node_config_hash(self, config_type: str) -> str:
+        hash_key_value = "-".join(
+            [CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type])
         filters = copy.deepcopy(
             self._get_current_cluster_session_nodes(self.cluster_name))
         filters.append({
@@ -432,11 +493,12 @@ class CloudwatchHelper:
                 "{} Error caught when getting value of {} tag on head node".
                 format(e.response["Error"], hash_key_value))

-    def _get_cur_node_config_hash(self) -> str:
-        hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, "agent"])
+    def _get_cur_node_config_hash(self, config_type: str) -> str:
+        hash_key_value = "-".join(
+            [CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type])
         try:
             response = self.ec2_client.describe_instances(
-                InstanceIds=self.node_ids)
+                InstanceIds=[self.node_id])
             reservations = response["Reservations"]
             message = "More than 1 response received from " \
                       "describing current node"
@@ -447,9 +509,9 @@ class CloudwatchHelper:
             hash_value = self._get_default_empty_config_file_hash()
             for tag in tags:
                 if tag["Key"] == hash_key_value:
-                    logger.info("Successfully get Unified Cloudwatch Agent "
-                                "hash tag value from node {}".format(
-                                    self.node_ids))
+                    logger.info(
+                        "Successfully get cloudwatch {} hash tag value from "
+                        "node {}".format(config_type, self.node_id))
                     hash_value = tag["Value"]
             return hash_value
         except botocore.exceptions.ClientError as e:
@@ -457,36 +519,29 @@ class CloudwatchHelper:
                 "{} Error caught when getting hash tag {} tag".format(
                     e.response["Error"], hash_key_value))

-    def _ensure_cwa_installed_ssm(self, node_ids: List[str]) -> bool:
+    def _ensure_cwa_installed_ssm(self, node_id: str) -> bool:
         """
         Check if Unified Cloudwatch Agent is installed via ssm run command.
         If not, notify user to use an AMI with
         the Unified CloudWatch Agent installed.
         """
         logger.info("Checking Unified Cloudwatch Agent "
-                    "status on {} nodes".format(len(node_ids)))
+                    "status on node {}".format(node_id))
         parameters_status_cwa = {
             "action": ["status"],
             "mode": ["ec2"],
         }
-        self._ec2_health_check_waiter(node_ids)
+        self._ec2_health_check_waiter(node_id)
         cmd_invocation_res = self._ssm_command_waiter(
-            "AmazonCloudWatch-ManageAgent", parameters_status_cwa, node_ids,
+            "AmazonCloudWatch-ManageAgent", parameters_status_cwa, node_id,
             False)
-        uninstalled_nodes = []
-        installed_nodes = []
-        for node_id, res in cmd_invocation_res.items():
-            if not res:
-                uninstalled_nodes.append(node_id)
-            else:
-                installed_nodes.append(node_id)
-        if len(uninstalled_nodes) > 0:
+        cwa_installed = cmd_invocation_res.get(node_id, False)
+        if not cwa_installed:
             logger.warning(
                 "Unified CloudWatch Agent not installed on {}. "
                 "Ray logs, metrics not picked up. "
                 "Please use an AMI with Unified CloudWatch Agent installed."
-                .format(uninstalled_nodes))
+                .format(node_id))
             return False
         else:
             return True
@@ -504,11 +559,12 @@ class CloudwatchHelper:
     def _check_cwa_installed_ec2_tag(self) -> List[str]:
         """
-        Check if Unified Cloudwatch Agent is installed.
+        Filtering all nodes to get nodes
+        without Unified CloudWatch Agent installed
         """
         try:
             response = self.ec2_client.describe_instances(
-                InstanceIds=self.node_ids)
+                InstanceIds=[self.node_id])
             reservations = response["Reservations"]
             message = "More than 1 response received from " \
                       "describing current node"
@@ -519,15 +575,15 @@ class CloudwatchHelper:
             cwa_installed = str(False)
             for tag in tags:
                 if tag["Key"] == CLOUDWATCH_AGENT_INSTALLED_TAG:
-                    logger.info("Unified Cloudwatch Agent is installed on "
-                                "node {}".format(self.node_ids))
+                    logger.info("Unified CloudWatch Agent is installed on "
+                                "node {}".format(self.node_id))
                     cwa_installed = tag["Value"]
             return cwa_installed
         except botocore.exceptions.ClientError as e:
             logger.warning(
-                "{} Error caught when getting Unified Cloudwatch Agent status "
-                "based on {} tag".format(e.response["Error"],
-                                         CLOUDWATCH_AGENT_INSTALLED_TAG))
+                "{} Error caught when getting Unified CloudWatch Agent "
+                "status based on {} tag".format(
+                    e.response["Error"], CLOUDWATCH_AGENT_INSTALLED_TAG))

     @staticmethod
     def resolve_instance_profile_name(
@@ -545,7 +601,7 @@ class CloudwatchHelper:
         doesn't exist.
         """
         cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(
-            config, "config")
+            config, "agent")
         return CLOUDWATCH_RAY_INSTANCE_PROFILE if cwa_cfg_exists \
             else default_instance_profile_name
@@ -563,7 +619,7 @@ class CloudwatchHelper:
         default ray iam role name if cloudwatch config file doesn't exist.
         """
         cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(
-            config, "config")
+            config, "agent")
         return CLOUDWATCH_RAY_IAM_ROLE if cwa_cfg_exists \
             else default_iam_role_name
@@ -583,7 +639,7 @@ class CloudwatchHelper:
         cluster config file.
         """
         cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(
-            config, "config")
+            config, "agent")
         if cwa_cfg_exists:
             cloudwatch_managed_policy = {
                 "Version": "2012-10-17",
@@ -624,7 +680,7 @@ class CloudwatchHelper:
     @staticmethod
     def cloudwatch_config_exists(config: Dict[str, Any],
-                                 config_key_name: str) -> bool:
+                                 config_type: str) -> bool:
         """Check if CloudWatch configuration was specified by the user
         in their cluster config file.
@@ -633,12 +689,11 @@ class CloudwatchHelper:
         Args:
             config: provider section of cluster config file.
-            config_key_name: config file name.
+            config_type: type of CloudWatch config file.

         Returns:
             True if config file is specified by user.
             False if config file is not specified.
         """
-        cfg = config.get("cloudwatch", {}).get("agent",
-                                               {}).get(config_key_name)
+        cfg = config.get("cloudwatch", {}).get(config_type, {}).get("config")
         return bool(cfg)
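
The reworked `cloudwatch_config_exists` gate is easy to sanity-check in isolation; a quick sketch taken straight from the lookup above (the dict literal is illustrative):

    from typing import Any, Dict

    def cloudwatch_config_exists(config: Dict[str, Any], config_type: str) -> bool:
        # Same lookup as in the diff: cloudwatch -> <config_type> -> config.
        return bool(config.get("cloudwatch", {}).get(config_type, {}).get("config"))

    provider_config = {
        "cloudwatch": {
            "agent": {"config": "cloudwatch/example-cloudwatch-agent-config.json"},
            "dashboard": {"config": "cloudwatch/example-cloudwatch-dashboard-config.json"},
        }
    }
    assert cloudwatch_config_exists(provider_config, "agent")
    assert cloudwatch_config_exists(provider_config, "dashboard")
    assert not cloudwatch_config_exists({}, "dashboard")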

python/ray/autoscaler/_private/aws/node_provider.py

@@ -361,7 +361,7 @@ class AWSNodeProvider(NodeProvider):
                 "Value": v,
             })
         if CloudwatchHelper.cloudwatch_config_exists(self.provider_config,
-                                                     "config"):
+                                                     "agent"):
             cwa_installed = self._check_ami_cwa_installation(node_config)
             if cwa_installed:
                 tag_pairs.extend([{

python/ray/autoscaler/_private/updater.py

@@ -307,7 +307,7 @@ class NodeUpdater:
             from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper \
                 import CloudwatchHelper
             CloudwatchHelper(self.provider.provider_config,
-                             [self.node_id], self.provider.cluster_name). \
+                             self.node_id, self.provider.cluster_name). \
                 update_from_config(self.is_head_node)

         if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash:

python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-agent-config.json

@@ -3,10 +3,6 @@
       "metrics_collection_interval":60,
       "run_as_user":"root"
    },
-   "csm":{
-      "memory_limit_in_mb":20,
-      "port":31000
-   },
    "logs":{
       "metrics_collected": {
          "prometheus": {
@@ -116,60 +112,72 @@
          }
       }
    },
-   "metrics":{
-      "append_dimensions":{
-         "AutoScalingGroupName":"${aws:AutoScalingGroupName}",
-         "InstanceId":"${aws:InstanceId}"
-      },
-      "metrics_collected":{
-         "collectd":{
-            "metrics_aggregation_interval":60
-         },
-         "cpu":{
-            "measurement":[
-               "usage_active",
-               "usage_system",
-               "usage_user",
-               "usage_idle",
-               "time_active",
-               "time_system",
-               "time_user",
-               "time_idle"
-            ]
-         },
-         "processes":{
-            "measurement":[
-               "processes_running",
-               "processes_sleeping",
-               "processes_zombies",
-               "processes_dead",
-               "processes_total"
-            ],
-            "metrics_collection_interval":60,
-            "resources":[
-               "*"
-            ]
-         },
-         "disk":{
-            "measurement":[
-               "disk_used_percent"
-            ],
-            "metrics_collection_interval":60,
-            "resources":[
-               "*"
-            ]
-         },
-         "mem":{
-            "measurement":[
-               "mem_used_percent"
-            ],
-            "metrics_collection_interval":60
-         },
-         "statsd":{
-            "metrics_aggregation_interval":60,
-            "metrics_collection_interval":10,
-            "service_address":":8125"
-         }
-      }
-   }
+   "metrics": {
+      "namespace": "{cluster_name}-ray-CWAgent",
+      "aggregation_dimensions": [
+         [
+            "InstanceId"
+         ]
+      ],
+      "append_dimensions": {
+         "AutoScalingGroupName": "${aws:AutoScalingGroupName}",
+         "InstanceId": "${aws:InstanceId}"
+      },
+      "metrics_collected": {
+         "collectd": {
+            "metrics_aggregation_interval": 60
+         },
+         "cpu": {
+            "measurement": [
+               "usage_active",
+               "usage_system",
+               "usage_user",
+               "usage_idle",
+               "time_active",
+               "time_system",
+               "time_user",
+               "time_idle"
+            ],
+            "resources": [
+               "*"
+            ]
+         },
+         "processes": {
+            "measurement": [
+               "processes_running",
+               "processes_sleeping",
+               "processes_zombies",
+               "processes_dead",
+               "processes_total"
+            ],
+            "metrics_collection_interval": 60,
+            "resources": [
+               "*"
+            ]
+         },
+         "disk": {
+            "measurement": [
+               "disk_used_percent"
+            ],
+            "metrics_collection_interval": 60,
+            "resources": [
+               "/"
+            ]
+         },
+         "mem": {
+            "measurement": [
+               "mem_used_percent"
+            ],
+            "metrics_collection_interval": 60,
+            "resources": [
+               "*"
+            ]
+         },
+         "statsd": {
+            "metrics_aggregation_interval": 60,
+            "metrics_collection_interval": 10,
+            "service_address": ":8125"
+         }
+      }
+   }
 }

python/ray/autoscaler/aws/cloudwatch/example-cloudwatch-dashboard-config.json (new file)

@@ -0,0 +1,238 @@
[
{
"type":"explorer",
"x":12,
"y":18,
"width":12,
"height":6,
"properties": {
"metrics": [
{
"metricName": "CPUUtilization",
"resourceType": "AWS::EC2::Instance",
"stat": "Average"
}
],
"aggregateBy": {
"key": "*",
"func": "SUM"
},
"labels": [
{
"key": "cloudwatch-agent-installed",
"value": "True"
},
{
"key": "ray-cluster-name",
"value": "{cluster_name}"
}
],
"widgetOptions": {
"legend": {
"position": "bottom"
},
"view": "timeSeries",
"stacked": false,
"rowsPerPage": 1,
"widgetsPerRow": 1
},
"title":"Cluster CPU Utilization"
}
},
{
"type":"explorer",
"x":0,
"y":18,
"width":12,
"height":6,
"properties": {
"metrics": [
{
"metricName": "CPUUtilization",
"resourceType": "AWS::EC2::Instance",
"stat": "Average"
}
],
"aggregateBy": {
"key": "*",
"func": "AVG"
},
"labels": [
{
"key": "cloudwatch-agent-installed",
"value": "True"
},
{
"key": "ray-cluster-name",
"value": "{cluster_name}"
}
],
"widgetOptions": {
"legend": {
"position": "bottom"
},
"view": "timeSeries",
"stacked": false,
"rowsPerPage": 1,
"widgetsPerRow": 1
},
"title":"Single Node CPU Utilization (Avg and Max)"
}
},
{
"type":"metric",
"x":12,
"y":6,
"width":12,
"height":6,
"properties":{
"view":"timeSeries",
"metrics":[
[ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_running', 'Average', 300))", "label": "cluster running process sum", "id": "e1" } ],
[ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_sleeping', 'Average', 300))", "label": "cluster sleeping process sum", "id": "e2" } ]
],
"region":"{region}",
"stat":"Average",
"period":60,
"title":"Cluster Processes"
}
},
{
"type":"metric",
"x":0,
"y":6,
"width":12,
"height":6,
"properties":{
"view":"timeSeries",
"metrics":[
[ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_running', 'Average', 300))", "label": "cluster running process average", "id": "e3" } ],
[ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_sleeping', 'Average', 300))", "label": "cluster sleeping process average", "id": "e4" } ],
[ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_running', 'Average', 300))", "label": "cluster running process maximum", "id": "e5" } ],
[ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} processes_sleeping', 'Average', 300))", "label": "cluster sleeping process maximum", "id": "e6" } ]
],
"region":"{region}",
"stat":"Average",
"period":60,
"title":"Single Node Processes (Avg and Max)"
}
},
{
"type":"metric",
"x":12,
"y":12,
"width":12,
"height":6,
"properties":{
"view":"timeSeries",
"stacked":false,
"metrics":[
[ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} disk_used_percent', 'Average', 300))", "label": "cluster disk used percent sum", "id": "e7", "period": 300 } ]
],
"region":"{region}",
"title":"Cluster Disk Usage"
}
},
{
"type":"metric",
"x":0,
"y":12,
"width":12,
"height":6,
"properties":{
"view":"timeSeries",
"stacked":false,
"metrics":[
[ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} disk_used_percent', 'Average', 300))", "id": "e8", "label": "cluster disk used percent average", "period": 300 } ],
[ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} disk_used_percent', 'Maximum', 300))", "id": "e9", "label": "cluster disk used percent maximum", "period": 300 } ]
],
"region":"{region}",
"title":"Single Node Disk Usage (Avg and Max)"
}
},
{
"type":"metric",
"x":12,
"y":18,
"width":12,
"height":6,
"properties": {
"metrics": [
[ { "expression": "SUM(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} mem_used_percent', 'Average', 300))", "id": "e10", "label": "cluster mem used percent sum", "period": 300 } ]
],
"view": "timeSeries",
"stacked": false,
"region": "{region}",
"stat": "Maximum",
"period": 300,
"start": "-PT2H",
"end": "P0D",
"title": "Cluster Memory Usage"
}
},
{
"type":"metric",
"x":0,
"y":18,
"width":12,
"height":6,
"properties": {
"metrics": [
[ { "expression": "AVG(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} mem_used_percent', 'Average', 300))", "id": "e11", "label": "cluster mem used percent average", "period": 300 } ],
[ { "expression": "MAX(SEARCH('{{cluster_name}-ray-CWAgent,InstanceId} mem_used_percent', 'Maximum', 300))", "id": "e12", "label": "cluster mem used percent maximum", "period": 300 } ]
],
"view": "timeSeries",
"stacked": false,
"region": "{region}",
"stat": "Maximum",
"period": 300,
"start": "-PT2H",
"end": "P0D",
"title": "Single Node Memory Usage (Avg and Max)"
}
},
{
"height": 6,
"width": 12,
"y": 0,
"x": 0,
"type": "metric",
"properties": {
"metrics": [
[ { "expression": "SUM(SEARCH('{{cluster_name}-ray-prometheus,instance} ray_node_cpu_count', 'Maximum', 300))", "label": "cluster cpu sum", "id": "e13" } ]
],
"view": "timeSeries",
"stacked": false,
"region": "{region}",
"stat": "Maximum",
"period": 300,
"start": "-PT2H",
"end": "P0D",
"title": "Cluster CPUs"
}
},
{
"height": 6,
"width": 12,
"y": 0,
"x": 12,
"type": "metric",
"properties": {
"metrics": [
[ { "expression": "SUM(SEARCH('{{cluster_name}-ray-prometheus,instance} object_store_available_memory', 'Average', 300))", "label": "cluster object store available memory sum", "id": "e14" } ]
],
"view": "timeSeries",
"stacked": false,
"region": "{region}",
"stat": "Maximum",
"period": 300,
"start": "-PT2H",
"end": "P0D",
"title": "Cluster Object Store Available Memory"
}
}
]
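
Before upload, the `{cluster_name}` and `{region}` placeholders in the widget list above are substituted and the result is pushed with `put_dashboard`. A rough standalone sketch of that flow using boto3 directly (the cluster name, region, and file path are illustrative; the real code walks the structure via `_replace_all_config_variables` and also substitutes `{instance_id}`):

    import json

    import boto3

    cluster_name = "my-cluster"   # illustrative
    region = "us-west-2"          # illustrative

    with open("example-cloudwatch-dashboard-config.json") as f:
        widgets = json.load(f)

    # Simplified placeholder substitution over the serialized widget list.
    widgets = json.loads(
        json.dumps(widgets)
        .replace("{cluster_name}", cluster_name)
        .replace("{region}", region))

    # Dashboard name is "<cluster_name>-<configured name>", as in
    # _put_cloudwatch_dashboard.
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    response = cloudwatch.put_dashboard(
        DashboardName=cluster_name + "-example-dashboard-name",
        DashboardBody=json.dumps({"widgets": widgets}))
    for issue in response.get("DashboardValidationMessages", []):
        print(issue["DataPath"], issue["Message"])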

python/ray/autoscaler/aws/example-cloudwatch.yaml

@@ -14,8 +14,9 @@ provider:
   # We depend on AWS Systems Manager (SSM) to deploy CloudWatch configuration updates to your cluster,
   # with relevant configuration created or updated in the SSM Parameter Store during `ray up`.
-  # The `AmazonCloudWatch-ray_agent_config_{cluster_name}` SSM Parameter Store Config Key is used to
-  # store a remote cache of the last Unified CloudWatch Agent config applied.
+  # We support two CloudWatch-related config types under this cloudwatch section: agent and dashboard.
+  # The `AmazonCloudWatch-ray_{config_type}_config_{cluster_name}` SSM Parameter Store Config Key is used to
+  # store a remote cache of the last Unified CloudWatch config applied.

   # Every time you run `ray up` to update your cluster, we compare your local CloudWatch config file contents
   # to the SSM Parameter Store's contents for that config and, if they differ, then the associated CloudWatch
@@ -39,6 +40,17 @@ provider:
         max_attempts: 120
         # Seconds to wait between each Unified CloudWatch Agent SSM config update attempt.
         delay_seconds: 30
+    # For CloudWatch Dashboard config files, we will also replace references to
+    # `{region}` with your cluster's region name, and `{cluster_name}` with your cluster name.
+    dashboard:
+      # CloudWatch Dashboard name.
+      # A per-cluster dashboard is created; by default its name will be
+      # `{your_cluster_name}-example-dashboard-name`.
+      name: "example-dashboard-name"
+      # The CloudWatch Dashboard is defined via the config file described
+      # at https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/CloudWatch-Dashboard-Body-Structure.html.
+      # Path to the CloudWatch Dashboard config file.
+      config: "cloudwatch/example-cloudwatch-dashboard-config.json"

 # How Ray will authenticate with newly launched nodes.
 auth:
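
With both config types present, one SSM parameter per type is maintained, following `_get_ssm_param_name` above. A tiny sketch of the resulting parameter names (the cluster name is illustrative):

    cluster_name = "my-cluster"  # illustrative
    for config_type in ("agent", "dashboard"):
        print("AmazonCloudWatch-" + "ray_{}_config_{}".format(
            config_type, cluster_name))
    # -> AmazonCloudWatch-ray_agent_config_my-cluster
    # -> AmazonCloudWatch-ray_dashboard_config_my-cluster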

python/ray/autoscaler/ray-schema.json

@@ -201,6 +201,16 @@
               "description": "Seconds to wait between each Unified CloudWatch Agent installation attempt."
             }
           }
+        },
+        "dashboard": {
+          "name": {
+            "type": ["string", "null"],
+            "description": "User defined CloudWatch Dashboard name."
+          },
+          "config": {
+            "type": ["string", "null"],
+            "description": "Path to CloudWatch Dashboard config file. See https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/CloudWatch-Dashboard-Body-Structure.html for additional details."
+          }
         }
       }
     }