[autoscaler] Add kill and get IP commands to CLI for testing (#3731)

## What do these changes do?

Adds 2 commands to the CLI that take in an autoscaler config:
1. Kill a random Ray worker node in the cluster.
2. Get all the worker node IP addresses.

Both commands are intended for testing and are not recommended for normal use.

## Related issue number
Closes #3685.
This commit is contained in:
Stephanie Wang 2019-01-10 22:06:57 -08:00 committed by Richard Liaw
parent 574f0b73bc
commit cc5ecd71c5
2 changed files with 85 additions and 7 deletions

View file

@ -11,6 +11,7 @@ import time
import sys
import click
import logging
import random
import yaml
try: # py3
@ -94,6 +95,35 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
def kill_node(config_file, yes, override_cluster_name):
    """Kills a random Raylet worker.

    Loads the autoscaler config, picks one worker node at random, runs
    ``ray stop`` on it through a ``NodeUpdaterProcess`` and returns the
    external IP of that node.

    Args:
        config_file: Path to the autoscaler YAML configuration file.
        yes: Forwarded to ``confirm`` (the CLI exposes it as the
            "don't ask for confirmation" flag).
        override_cluster_name: When not None, replaces the
            ``cluster_name`` entry of the loaded config.

    Returns:
        Whatever ``provider.external_ip`` reports for the chosen worker.

    Raises:
        IndexError: If the provider reports no worker nodes.
    """
    # NOTE(review): yaml.load can construct arbitrary Python objects; only
    # point this at trusted config files.
    # The context manager closes the file handle, which the previous
    # open(...).read() form left to the garbage collector.
    with open(config_file) as f:
        config = yaml.load(f.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name
    config = _bootstrap_config(config)

    confirm("This will kill a node in your cluster", yes)

    provider = get_node_provider(config["provider"], config["cluster_name"])
    nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
    if not nodes:
        # random.choice would raise an unexplained IndexError on an empty
        # sequence; keep the exception type but say what went wrong.
        raise IndexError(
            "No worker nodes found in cluster {}; nothing to kill.".format(
                config["cluster_name"]))
    node = random.choice(nodes)
    logger.info("Terminating worker {}".format(node))

    updater = NodeUpdaterProcess(
        node,
        config["provider"],
        config["auth"],
        config["cluster_name"],
        config["file_mounts"], [],
        "",
        redirect_output=False)

    _exec(updater, "ray stop", False, False)

    # Presumably gives `ray stop` time to take effect before returning;
    # TODO confirm the 5 second value is sufficient.
    time.sleep(5)

    return provider.external_ip(node)
def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
override_cluster_name):
"""Create the cluster head node, which in turn creates the workers."""
@ -343,6 +373,17 @@ def get_head_node_ip(config_file, override_cluster_name):
return provider.external_ip(head_node)
def get_worker_node_ips(config_file, override_cluster_name):
    """Returns worker node IPs for given configuration file.

    Args:
        config_file: Path to the autoscaler YAML configuration file.
        override_cluster_name: When not None, replaces the
            ``cluster_name`` entry of the loaded config.

    Returns:
        A list with one ``provider.external_ip`` result per node tagged as
        a worker, in the order the provider lists them.
    """
    # NOTE(review): yaml.load can construct arbitrary Python objects; only
    # point this at trusted config files.
    # The context manager closes the file handle, which the previous
    # open(...).read() form left to the garbage collector.
    with open(config_file) as f:
        config = yaml.load(f.read())
    if override_cluster_name is not None:
        config["cluster_name"] = override_cluster_name

    provider = get_node_provider(config["provider"], config["cluster_name"])
    nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
    return [provider.external_ip(node) for node in nodes]
def _get_head_node(config,
config_file,
override_cluster_name,

View file

@ -9,9 +9,9 @@ import os
import subprocess
import ray.services as services
from ray.autoscaler.commands import (attach_cluster, exec_cluster,
create_or_update_cluster, rsync,
teardown_cluster, get_head_node_ip)
from ray.autoscaler.commands import (
attach_cluster, exec_cluster, create_or_update_cluster, rsync,
teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips)
import ray.ray_constants as ray_constants
import ray.utils
@ -274,8 +274,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address())
logger.info("Using IP address {} for this node."
.format(ray_params.node_ip_address))
logger.info("Using IP address {} for this node.".format(
ray_params.node_ip_address))
ray_params.update_if_absent(
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
@ -342,8 +342,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address(redis_address))
logger.info("Using IP address {} for this node."
.format(ray_params.node_ip_address))
logger.info("Using IP address {} for this node.".format(
ray_params.node_ip_address))
# Check that there aren't already Redis clients with the same IP
# address connected with this Redis instance. This raises an exception
# if the Redis server already has clients on this node.
@ -456,6 +456,7 @@ def stop():
help="Don't ask for confirmation.")
def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
restart_only, yes, cluster_name):
"""Create or update a Ray cluster."""
if restart_only or no_restart:
assert restart_only != no_restart, "Cannot set both 'restart_only' " \
"and 'no_restart' at the same time!"
@ -483,9 +484,30 @@ def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
type=str,
help="Override the configured cluster name.")
def teardown(cluster_config_file, yes, workers_only, cluster_name):
    """Tear down the Ray cluster."""
    # Thin CLI wrapper: hand every option to the autoscaler by keyword.
    teardown_cluster(
        config_file=cluster_config_file,
        yes=yes,
        workers_only=workers_only,
        override_cluster_name=cluster_name)
@cli.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.option(
    "--yes",
    "-y",
    is_flag=True,
    default=False,
    help="Don't ask for confirmation.")
@click.option(
    "--cluster-name",
    "-n",
    required=False,
    type=str,
    help="Override the configured cluster name.")
def kill_random_node(cluster_config_file, yes, cluster_name):
    """Kills a random Ray node. For testing purposes only."""
    node_ip = kill_node(cluster_config_file, yes, cluster_name)
    # Format instead of concatenating so the command still reports a result
    # (rather than raising TypeError) if the provider returns a non-string
    # value such as None for the external IP.
    click.echo("Killed node with IP {}".format(node_ip))
@cli.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.option(
@ -664,6 +686,19 @@ def get_head_ip(cluster_config_file, cluster_name):
click.echo(get_head_node_ip(cluster_config_file, cluster_name))
@cli.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.option(
    "--cluster-name",
    "-n",
    required=False,
    type=str,
    help="Override the configured cluster name.")
def get_worker_ips(cluster_config_file, cluster_name):
    """Print the worker node IPs, one per line. For testing purposes only."""
    worker_ips = get_worker_node_ips(cluster_config_file, cluster_name)
    click.echo("\n".join(worker_ips))
@cli.command()
def stack():
COMMAND = """
@ -700,7 +735,9 @@ cli.add_command(rsync_up, name="rsync_up")
cli.add_command(submit)
cli.add_command(teardown)
cli.add_command(teardown, name="down")
cli.add_command(kill_random_node)
cli.add_command(get_head_ip, name="get_head_ip")
cli.add_command(get_worker_ips)
cli.add_command(stack)