[cli] Tests (#10057)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Maksim Smolin 2020-08-22 13:29:10 -07:00 committed by GitHub
parent 8362029dcf
commit 245c0a9e43
17 changed files with 596 additions and 80 deletions

View file

@ -330,21 +330,32 @@ class AWSNodeProvider(NodeProvider):
# todo: timed?
# todo: handle plurality?
with cli_logger.group(
"Launching {} nodes",
"Launched {} nodes",
count,
_tags=dict(subnet_id=subnet_id)):
for instance in created:
# NOTE(maximsmol): This is needed for mocking
# boto3 for tests. This is likely a bug in moto
# but AWS docs don't seem to say.
# You can patch moto/ec2/responses/instances.py
# to fix this (add <stateReason> to EC2_RUN_INSTANCES)
# The correct value is technically
# {"code": "0", "Message": "pending"}
state_reason = instance.state_reason or {
"Message": "pending"
}
cli_logger.print(
"Launched instance {}",
instance.instance_id,
_tags=dict(
state=instance.state["Name"],
info=instance.state_reason["Message"]))
info=state_reason["Message"]))
cli_logger.old_info(
logger, "NodeProvider: Created instance "
"[id={}, name={}, info={}]", instance.instance_id,
instance.state["Name"],
instance.state_reason["Message"])
instance.state["Name"], state_reason["Message"])
break
except botocore.exceptions.ClientError as exc:
if attempt == BOTO_CREATE_MAX_RETRIES:
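
The `state_reason` fallback above guards against moto returning instances without a state reason. A minimal sketch of the same defensive-default pattern, using a hypothetical boto3-style `instance` object (illustrative only, not Ray code):

def describe_launch(instance):
    # Under moto, `instance.state_reason` can come back as None because the
    # mocked RunInstances response omits <stateReason>; substitute the value
    # a real pending EC2 instance would report.
    state_reason = instance.state_reason or {"Message": "pending"}
    return "Launched instance {} [state={}, info={}]".format(
        instance.instance_id, instance.state["Name"], state_reason["Message"])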

View file

@ -20,7 +20,7 @@ import click
import colorama
from colorful.core import ColorfulString
import colorful as cf
colorama.init()
colorama.init(strip=False)
def _patched_makeRecord(self,
@ -187,12 +187,6 @@ class _CliLogger():
Output verbosity.
Low verbosity will disable `verbose` and `very_verbose` messages.
dump_command_output (bool):
Determines whether the old behavior of dumping command output
to console will be used, or the new behavior of redirecting to
a file.
! Currently unused.
"""
strip: bool
old_style: bool
@ -201,22 +195,19 @@ class _CliLogger():
indent_level: int
verbosity: int
dump_command_output: bool
_autodetected_cf_colormode: int
def __init__(self):
self.strip = False
self.old_style = True
self.color_mode = "auto"
self.indent_level = 0
self.verbosity = 0
self.dump_command_output = False
self.verbosity = 0
# store whatever colorful has detected for future use if
# the color output is toggled (colorful detects # of supported colors,
# so it has some non-trivial logic to determine this)
self._autodetected_cf_colormode = cf.colormode
self._autodetected_cf_colormode = cf.colorful.colormode
def detect_colors(self):
"""Update color output settings.
@ -225,7 +216,7 @@ class _CliLogger():
color output
(8-color ANSI if no terminal detected to be safe) in colorful.
"""
self.color_mode = self.color_mode.lower()
if self.color_mode == "true":
if self._autodetected_cf_colormode != cf.NO_COLORS:
cf.colormode = self._autodetected_cf_colormode
@ -236,7 +227,6 @@ class _CliLogger():
cf.disable()
return
if self.color_mode == "auto":
self.strip = not sys.stdout.isatty()
# colorful autodetects tty settings
return
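
The `detect_colors` change above keys off a lowercased `color_mode` and the TTY status of stdout. A simplified, self-contained illustration of that tri-state decision (names here are illustrative, not the cli_logger implementation):

import sys

def should_strip_ansi(color_mode: str) -> bool:
    """Decide whether ANSI color escapes should be stripped from output."""
    color_mode = color_mode.lower()
    if color_mode == "true":
        return False  # color explicitly forced on
    if color_mode == "false":
        return True   # color explicitly forced off
    # "auto": keep color only when stdout is an interactive terminal
    return not sys.stdout.isatty()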

View file

@ -12,8 +12,8 @@ import time
from ray.autoscaler.docker import check_docker_running_cmd, with_docker_exec
from ray.autoscaler.log_timer import LogTimer
from ray.autoscaler.subprocess_output_util import run_cmd_redirected,\
ProcessRunnerError
from ray.autoscaler.subprocess_output_util import (run_cmd_redirected,
ProcessRunnerError)
from ray.autoscaler.cli_logger import cli_logger
import colorful as cf
@ -473,7 +473,6 @@ class SSHCommandRunner(CommandRunnerInterface):
run_env="auto", # Unused argument.
ssh_options_override_ssh_key="",
):
if ssh_options_override_ssh_key:
ssh_options = SSHOptions(ssh_options_override_ssh_key)
else:

View file

@ -1,3 +1,4 @@
import colorful as cf
import copy
import hashlib
import json
@ -35,11 +36,11 @@ from ray.autoscaler.command_runner import set_using_login_shells, \
from ray.autoscaler.command_runner import DockerCommandRunner
from ray.autoscaler.log_timer import LogTimer
from ray.worker import global_worker
from ray.util.debug import log_once
import ray.autoscaler.subprocess_output_util as cmd_output_util
from ray.autoscaler.cli_logger import cli_logger
import colorful as cf
logger = logging.getLogger(__name__)
@ -102,12 +103,18 @@ def create_or_update_cluster(config_file: str,
restart_only: bool,
yes: bool,
override_cluster_name: Optional[str],
no_config_cache: bool,
no_config_cache: bool = False,
redirect_command_output: bool = False,
use_login_shells: bool = True) -> None:
"""Create or updates an autoscaling Ray cluster from a config json."""
set_using_login_shells(use_login_shells)
cmd_output_util.set_output_redirected(redirect_command_output)
if not use_login_shells:
cmd_output_util.set_allow_interactive(False)
if redirect_command_output is None:
# Do not redirect by default.
cmd_output_util.set_output_redirected(False)
else:
cmd_output_util.set_output_redirected(redirect_command_output)
if use_login_shells:
cli_logger.warning(
@ -120,8 +127,6 @@ def create_or_update_cluster(config_file: str,
cf.underlined("if you tested your workflow and it is compatible"))
cli_logger.newline()
cli_logger.detect_colors()
def handle_yaml_error(e):
cli_logger.error("Cluster config invalid\n")
cli_logger.error("Failed to load YAML file " + cf.bold("{}"),
@ -189,7 +194,7 @@ def create_or_update_cluster(config_file: str,
if config["provider"]["type"] != "aws":
cli_logger.old_style = True
cli_logger.newline()
config = _bootstrap_config(config, no_config_cache)
config = _bootstrap_config(config, no_config_cache=no_config_cache)
if config["provider"]["type"] != "aws":
cli_logger.old_style = False
@ -222,15 +227,16 @@ def _bootstrap_config(config: Dict[str, Any],
try_reload_log_state(config_cache["config"]["provider"],
config_cache.get("provider_log_info"))
cli_logger.verbose_warning(
"Loaded cached provider configuration "
"from " + cf.bold("{}"), cache_key)
if cli_logger.verbosity == 0:
cli_logger.warning("Loaded cached provider configuration")
cli_logger.warning(
"If you experience issues with "
"the cloud provider, try re-running "
"the command with {}.", cf.bold("--no-config-cache"))
if log_once("_printed_cached_config_warning"):
cli_logger.verbose_warning(
"Loaded cached provider configuration "
"from " + cf.bold("{}"), cache_key)
if cli_logger.verbosity == 0:
cli_logger.warning("Loaded cached provider configuration")
cli_logger.warning(
"If you experience issues with "
"the cloud provider, try re-running "
"the command with {}.", cf.bold("--no-config-cache"))
return config_cache["config"]
else:
@ -309,7 +315,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
try:
def remaining_nodes():
workers = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
@ -361,6 +366,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
A = remaining_nodes()
cli_logger.print("{} nodes remaining after 1 second.",
cf.bold(len(A)))
cli_logger.success("No nodes remaining.")
finally:
provider.cleanup()
@ -710,9 +716,14 @@ def get_or_create_head_node(config,
provider.cleanup()
def attach_cluster(config_file: str, start: bool, use_screen: bool,
use_tmux: bool, override_cluster_name: Optional[str],
new: bool, port_forward: Any):
def attach_cluster(config_file: str,
start: bool,
use_screen: bool,
use_tmux: bool,
override_cluster_name: Optional[str],
no_config_cache: bool = False,
new: bool = False,
port_forward: Any = None):
"""Attaches to a screen for the specified cluster.
Arguments:
@ -750,7 +761,9 @@ def attach_cluster(config_file: str, start: bool, use_screen: bool,
stop=False,
start=start,
override_cluster_name=override_cluster_name,
port_forward=port_forward)
no_config_cache=no_config_cache,
port_forward=port_forward,
)
def exec_cluster(config_file: str,
@ -762,6 +775,7 @@ def exec_cluster(config_file: str,
stop: bool = False,
start: bool = False,
override_cluster_name: Optional[str] = None,
no_config_cache: bool = False,
port_forward: Any = None,
with_output: bool = False):
"""Runs a command on the specified cluster.
@ -781,10 +795,15 @@ def exec_cluster(config_file: str,
assert not (screen and tmux), "Can specify only one of `screen` or `tmux`."
assert run_env in RUN_ENV_TYPES, "--run_env must be in {}".format(
RUN_ENV_TYPES)
# TODO(rliaw): We default this to True to maintain backwards-compat.
# In the future we would want to support disabling login-shells
# and interactivity.
cmd_output_util.set_allow_interactive(True)
config = yaml.safe_load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
config = _bootstrap_config(config)
config = _bootstrap_config(config, no_config_cache=no_config_cache)
head_node = _get_head_node(
config, config_file, override_cluster_name, create_if_needed=start)
@ -881,6 +900,7 @@ def rsync(config_file: str,
target: Optional[str],
override_cluster_name: Optional[str],
down: bool,
no_config_cache: bool = False,
all_nodes: bool = False):
"""Rsyncs files.
@ -902,7 +922,7 @@ def rsync(config_file: str,
config = yaml.safe_load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
config = _bootstrap_config(config)
config = _bootstrap_config(config, no_config_cache=no_config_cache)
provider = get_node_provider(config["provider"], config["cluster_name"])
try:
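
The cached-config warning earlier in this file is now guarded by ray.util.debug.log_once so it prints at most once per process. A small, self-contained approximation of that guard (the helper below is illustrative, not the Ray implementation):

_seen_keys = set()

def warn_once(key: str, message: str) -> None:
    # Print `message` only the first time `key` is seen in this process.
    if key in _seen_keys:
        return
    _seen_keys.add(key)
    print("WARNING:", message)

warn_once("_printed_cached_config_warning",
          "Loaded cached provider configuration; pass --no-config-cache "
          "if you hit cloud-provider issues.")
# A repeat call with the same key is silenced.
warn_once("_printed_cached_config_warning", "never shown")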

View file

@ -10,14 +10,15 @@ import colorful as cf
CONN_REFUSED_PATIENCE = 30 # how long to wait for sshd to run
_config = {"redirect_output": True}
_redirect_output = False # Whether to log command output to a temporary file
_allow_interactive = True # whether to pass on stdin to running commands.
def is_output_redirected():
return _config["redirect_output"]
return _redirect_output
def set_output_redirected(val):
def set_output_redirected(val: bool):
"""Choose between logging to a temporary file and to `sys.stdout`.
The default is to log to a file.
@ -26,7 +27,24 @@ def set_output_redirected(val):
val (bool): If true, subprocess output will be redirected to
a temporary file.
"""
_config["redirect_output"] = val
global _redirect_output
_redirect_output = val
def does_allow_interactive():
return _allow_interactive
def set_allow_interactive(val: bool):
"""Choose whether to pass on stdin to running commands.
The default is to pipe stdin and close it immediately.
Args:
val (bool): If true, stdin will be passed to commands.
"""
global _allow_interactive
_allow_interactive = val
class ProcessRunnerError(Exception):
@ -167,13 +185,13 @@ def _run_and_process_output(cmd,
Args:
cmd (List[str]): Command to run.
stdout_file: File to redirect stdout to.
stdout_file: File to redirect stderr to.
stderr_file: File to redirect stderr to.
Implementation notes:
1. `use_login_shells` disables special processing
If we run interactive apps, output processing will likely get
overwhelemed with the interactive output elements.
Thus we disable output processing for login shells. This makes
overwhelmed with the interactive output elements.
Thus, we disable output processing for login shells. This makes
the logging experience considerably worse, but it only degrades
to old-style logging.
@ -206,24 +224,31 @@ def _run_and_process_output(cmd,
are read-only except for return values and possible exceptions.
"""
stdin_overwrite = subprocess.PIPE
# This already should be validated in a higher place of the stack.
assert not (does_allow_interactive() and is_output_redirected()), (
"Cannot redirect output while in interactive mode.")
if does_allow_interactive() and not is_output_redirected():
stdin_overwrite = None
# See implementation note #1
if use_login_shells:
# See implementation note #1
if stdout_file is None:
stdout_file = subprocess.DEVNULL
if stderr_file is None:
stderr_file = subprocess.DEVNULL
return subprocess.check_call(
cmd,
# See implementation note #2
stdin=subprocess.PIPE,
stdin=stdin_overwrite,
stdout=stdout_file,
stderr=stderr_file)
with subprocess.Popen(
cmd,
# See implementation note #2
stdin=subprocess.PIPE,
stdin=stdin_overwrite,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1, # line buffering
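
The stdin_overwrite logic above decides whether the child process inherits the caller's stdin (interactive mode) or gets a pipe that is never written to. A hedged, standalone sketch of that policy, assuming the same invariant that interactive commands are never redirected (not the Ray implementation):

import subprocess

def run_with_stdin_policy(cmd, allow_interactive: bool,
                          output_redirected: bool) -> int:
    # Mirrors the assertion above: interactive output cannot be redirected.
    assert not (allow_interactive and output_redirected), \
        "Cannot redirect output while in interactive mode."
    # None means "inherit the parent's stdin"; PIPE gives the child a pipe
    # the parent never writes to, so non-interactive commands cannot block
    # waiting for input.
    stdin = None if allow_interactive else subprocess.PIPE
    return subprocess.check_call(cmd, stdin=stdin)

# e.g. run_with_stdin_policy(["echo", "hi"], allow_interactive=True,
#                            output_redirected=False)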

View file

@ -73,6 +73,16 @@ class NodeUpdater:
cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix,
self.runtime_hash)
if cmd_output_util.does_allow_interactive(
) and cmd_output_util.is_output_redirected():
# this is most probably a bug since the user has no control
# over these settings
msg = ("Output was redirected for an interactive command. "
"Either do not pass `--redirect-command-output` "
"or also pass in `--use-normal-shells`.")
cli_logger.abort(msg)
raise click.ClickException(msg)
try:
with LogTimer(self.log_prefix +
"Applied config {}".format(self.runtime_hash)):
@ -298,7 +308,8 @@ class NodeUpdater:
"See above for stderr.")
raise click.ClickException(
"Initialization command failed.")
"Initialization command failed."
) from None
else:
cli_logger.print(
"No initialization commands to run.",
@ -352,10 +363,11 @@ class NodeUpdater:
else:
env_vars = {}
try:
cmd_output_util.set_output_redirected(True)
old_redirected = cmd_output_util.is_output_redirected()
cmd_output_util.set_output_redirected(False)
self.cmd_runner.run(
cmd, environment_variables=env_vars)
cmd_output_util.set_output_redirected(True)
cmd_output_util.set_output_redirected(old_redirected)
except ProcessRunnerError as e:
if e.msg_type == "ssh_command_failed":
cli_logger.error("Failed.")
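
The update above saves the current redirect flag and restores it after the Ray start commands run. One way to express the same save/restore idea (an illustrative sketch, not part of this PR) is a small context manager around the module-level getter and setter:

from contextlib import contextmanager

@contextmanager
def temporarily(getter, setter, value):
    # Set a module-level flag for the duration of a block, then restore it
    # even if the wrapped command raises.
    old = getter()
    setter(value)
    try:
        yield
    finally:
        setter(old)

# e.g.
# with temporarily(cmd_output_util.is_output_redirected,
#                  cmd_output_util.set_output_redirected, False):
#     self.cmd_runner.run(cmd, environment_variables=env_vars)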

View file

@ -22,7 +22,6 @@ import ray.ray_constants as ray_constants
import ray.utils
from ray.projects.scripts import project_cli, session_cli
from ray.autoscaler.subprocess_output_util import set_output_redirected
from ray.autoscaler.cli_logger import cli_logger
import colorful as cf
@ -402,6 +401,7 @@ def start(node_ip_address, redis_address, address, redis_port, port,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
if gcs_server_port and not head:
raise ValueError(
@ -768,6 +768,7 @@ def stop(force, verbose, log_new_style, log_color):
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
# Note that raylet needs to exit before object store, otherwise
# it cannot exit gracefully.
@ -919,8 +920,7 @@ def stop(force, verbose, log_new_style, log_color):
"--redirect-command-output",
is_flag=True,
default=False,
help=("Redirect command output to a file instead "
"of straight to the terminal."))
help="Whether to redirect command output to a file.")
@click.option(
"--use-login-shells/--use-normal-shells",
is_flag=True,
@ -936,6 +936,7 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
if restart_only or no_restart:
cli_logger.doassert(restart_only != no_restart,
@ -1001,6 +1002,7 @@ def down(cluster_config_file, yes, workers_only, cluster_name,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
teardown_cluster(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers)
@ -1053,6 +1055,7 @@ def monitor(cluster_config_file, lines, cluster_name, log_new_style, log_color,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
monitor_cluster(cluster_config_file, lines, cluster_name)
@ -1074,6 +1077,11 @@ def monitor(cluster_config_file, lines, cluster_name, log_new_style, log_color,
required=False,
type=str,
help="Override the configured cluster name.")
@click.option(
"--no-config-cache",
is_flag=True,
default=False,
help="Disable the local cluster config cache.")
@click.option(
"--new", "-N", is_flag=True, help="Force creation of a new screen.")
@click.option(
@ -1084,16 +1092,25 @@ def monitor(cluster_config_file, lines, cluster_name, log_new_style, log_color,
type=int,
help="Port to forward. Use this multiple times to forward multiple ports.")
@add_click_options(logging_options)
def attach(cluster_config_file, start, screen, tmux, cluster_name, new,
port_forward, log_new_style, log_color, verbose):
def attach(cluster_config_file, start, screen, tmux, cluster_name,
no_config_cache, new, port_forward, log_new_style, log_color,
verbose):
"""Create or attach to a SSH session to a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
port_forward = [(port, port) for port in list(port_forward)]
attach_cluster(cluster_config_file, start, screen, tmux, cluster_name, new,
port_forward)
attach_cluster(
cluster_config_file,
start,
screen,
tmux,
cluster_name,
no_config_cache=no_config_cache,
new=new,
port_forward=port_forward)
@cli.command()
@ -1113,6 +1130,7 @@ def rsync_down(cluster_config_file, source, target, cluster_name,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
rsync(cluster_config_file, source, target, cluster_name, down=True)
@ -1140,6 +1158,7 @@ def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
cli_logger.detect_colors()
rsync(
cluster_config_file,
@ -1175,6 +1194,11 @@ def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes,
required=False,
type=str,
help="Override the configured cluster name.")
@click.option(
"--no-config-cache",
is_flag=True,
default=False,
help="Disable the local cluster config cache.")
@click.option(
"--port-forward",
"-p",
@ -1191,8 +1215,8 @@ def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes,
@click.argument("script_args", nargs=-1)
@add_click_options(logging_options)
def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
port_forward, script, args, script_args, log_new_style, log_color,
verbose):
no_config_cache, port_forward, script, args, script_args,
log_new_style, log_color, verbose):
"""Uploads and runs a script on the specified cluster.
The script is automatically synced to the following location:
@ -1205,8 +1229,7 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
set_output_redirected(False)
cli_logger.detect_colors()
cli_logger.doassert(not (screen and tmux),
"`{}` and `{}` are incompatible.", cf.bold("--screen"),
@ -1243,12 +1266,18 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
restart_only=False,
yes=True,
override_cluster_name=cluster_name,
no_config_cache=False,
no_config_cache=no_config_cache,
redirect_command_output=False,
use_login_shells=True)
target = os.path.basename(script)
target = os.path.join("~", target)
rsync(cluster_config_file, script, target, cluster_name, down=False)
rsync(
cluster_config_file,
script,
target,
cluster_name,
no_config_cache=no_config_cache,
down=False)
command_parts = ["python", target]
if script_args:
@ -1267,6 +1296,7 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
stop=stop,
start=False,
override_cluster_name=cluster_name,
no_config_cache=no_config_cache,
port_forward=port_forward)
@ -1303,6 +1333,11 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
required=False,
type=str,
help="Override the configured cluster name.")
@click.option(
"--no-config-cache",
is_flag=True,
default=False,
help="Disable the local cluster config cache.")
@click.option(
"--port-forward",
"-p",
@ -1312,13 +1347,13 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
help="Port to forward. Use this multiple times to forward multiple ports.")
@add_click_options(logging_options)
def exec(cluster_config_file, cmd, run_env, screen, tmux, stop, start,
cluster_name, port_forward, log_new_style, log_color, verbose):
cluster_name, no_config_cache, port_forward, log_new_style, log_color,
verbose):
"""Execute a command via SSH on a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
set_output_redirected(False)
cli_logger.detect_colors()
port_forward = [(port, port) for port in list(port_forward)]
@ -1331,6 +1366,7 @@ def exec(cluster_config_file, cmd, run_env, screen, tmux, stop, start,
stop=stop,
start=start,
override_cluster_name=cluster_name,
no_config_cache=no_config_cache,
port_forward=port_forward)
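
Several commands above gain the same --no-config-cache flag. A minimal standalone sketch of how such a click flag is declared and consumed (the demo command and its behavior are illustrative; only the option itself comes from the diff):

import click

@click.command()
@click.option(
    "--no-config-cache",
    is_flag=True,
    default=False,
    help="Disable the local cluster config cache.")
def demo(no_config_cache):
    click.echo("config cache {}".format(
        "disabled" if no_config_cache else "enabled"))

if __name__ == "__main__":
    demo()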

View file

@ -22,6 +22,7 @@ py_test_module_list(
"test_basic_2.py",
"test_basic.py",
"test_cancel.py",
"test_cli.py",
"test_component_failures_2.py",
"test_component_failures_3.py",
"test_dynres.py",

View file

@ -0,0 +1,295 @@
"""
Some instructions on writing CLI tests:
1. Look at test_ray_start for a simple output test example.
2. To get a valid regex, start with copy-pasting your output from a captured
version (no formatting). Then escape ALL regex characters (parentheses,
brackets, dots, etc.). THEN add ".+" to all the places where info might
change run to run.
3. Look at test_ray_up for an example of how to mock AWS, commands,
and autoscaler config.
4. Print your outputs!!!! Tests are impossible to debug if they fail
and you did not print anything. Since command output is captured by click,
MAKE SURE YOU print(result.output) when tests fail!!!
WARNING: IF YOU MOCK AWS, DON'T FORGET THE AWS_CREDENTIALS FIXTURE.
THIS IS REQUIRED SO BOTO3 DOES NOT ACCESS THE ACTUAL AWS SERVERS.
Note: config cache does not work with AWS mocks since the AWS resource ids are
randomized each time.
Note: while not strictly necessary for setup commands (e.g. ray up),
--log-new-style produces much cleaner output if the test fails.
"""
import glob
import sys
import tempfile
import re
import os
from contextlib import contextmanager
from pathlib import Path
import pytest
import moto
from moto import mock_ec2, mock_iam
from click.testing import CliRunner
from testfixtures import Replacer
from testfixtures.popen import MockPopen, PopenBehaviour
import ray.autoscaler.aws.config as aws_config
import ray.scripts.scripts as scripts
@pytest.fixture
def configure_lang():
"""Configure output for travis + click."""
if sys.platform != "darwin":
os.environ["LC_ALL"] = "C.UTF-8"
os.environ["LANG"] = "C.UTF-8"
@pytest.fixture
def configure_aws():
"""Mocked AWS Credentials for moto."""
os.environ["LC_ALL"] = "C.UTF-8"
os.environ["LANG"] = "C.UTF-8"
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
# moto (boto3 mock) only allows a hardcoded set of AMIs
dlami = moto.ec2.ec2_backends["us-west-2"].describe_images(
filters={"name": "Deep Learning AMI Ubuntu*"})[0].id
aws_config.DEFAULT_AMI["us-west-2"] = dlami
@pytest.fixture(scope="function")
def _unlink_test_ssh_key():
"""Use this to remove the keys spawned by ray up."""
yield
try:
for path in glob.glob(os.path.expanduser("~/.ssh/__test-cli_key*")):
os.remove(path)
except FileNotFoundError:
pass
def _debug_die(result):
print("!!!!")
print(result.output)
print("!!!!")
assert False
def _die_on_error(result):
if result.exit_code == 0:
return
_debug_die(result)
def _debug_check_line_by_line(result, expected_lines):
output_lines = result.output.split("\n")
i = 0
for out in output_lines:
print(out)
if i >= len(expected_lines):
i += 1
print("!!!!!! Expected fewer lines")
break
exp = expected_lines[i]
matched = re.fullmatch(exp + r" *", out) is not None
if not matched:
print(f"!!!!!!! Expected (regex): {repr(exp)}")
i += 1
if i < len(expected_lines):
print("!!!!!!! Expected (regex):")
for line in expected_lines[i:]:
print(repr(line))
assert False
@contextmanager
def _setup_popen_mock(commands_mock):
Popen = MockPopen()
Popen.set_default(behaviour=commands_mock)
with Replacer() as replacer:
replacer.replace("subprocess.Popen", Popen)
yield
def _load_output_pattern(name):
pattern_dir = Path(__file__).parent / "test_cli_patterns"
with open(str(pattern_dir / name)) as f:
# remove \n
return [x[:-1] for x in f.readlines()]
def _check_output_via_pattern(name, result):
expected_lines = _load_output_pattern(name)
if result.exception is not None:
print(result.output)
raise result.exception from None
expected = r" *\n".join(expected_lines) + "\n?"
if re.fullmatch(expected, result.output) is None:
_debug_check_line_by_line(result, expected_lines)
assert result.exit_code == 0
DEFAULT_TEST_CONFIG_PATH = str(
Path(__file__).parent / "test_cli_patterns" / "test_ray_up_config.yaml")
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
def test_ray_start(configure_lang):
runner = CliRunner()
result = runner.invoke(
scripts.start, ["--head", "--log-new-style", "--log-color", "False"])
_die_on_error(runner.invoke(scripts.stop))
_check_output_via_pattern("test_ray_start.txt", result)
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
@mock_ec2
@mock_iam
def test_ray_up(configure_lang, _unlink_test_ssh_key, configure_aws):
def commands_mock(command, stdin):
# if we want to have e.g. some commands fail,
# we can have overrides happen here.
# unfortunately, cutting out SSH prefixes and such
# is, to put it lightly, non-trivial
if "uptime" in command:
return PopenBehaviour(stdout="MOCKED uptime")
if "rsync" in command:
return PopenBehaviour(stdout="MOCKED rsync")
if "ray" in command:
return PopenBehaviour(stdout="MOCKED ray")
return PopenBehaviour(stdout="MOCKED GENERIC")
with _setup_popen_mock(commands_mock):
# config cache does not work with mocks
runner = CliRunner()
result = runner.invoke(scripts.up, [
DEFAULT_TEST_CONFIG_PATH, "--no-config-cache", "-y",
"--log-new-style", "--log-color", "False"
])
_check_output_via_pattern("test_ray_up.txt", result)
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
@mock_ec2
@mock_iam
def test_ray_attach(configure_lang, configure_aws, _unlink_test_ssh_key):
def commands_mock(command, stdin):
# TODO(maximsmol): this is a hack since stdout=sys.stdout
# doesn't work with the mock for some reason
print("ubuntu@ip-.+:~$ exit")
return PopenBehaviour(stdout="ubuntu@ip-.+:~$ exit")
with _setup_popen_mock(commands_mock):
runner = CliRunner()
result = runner.invoke(scripts.up, [
DEFAULT_TEST_CONFIG_PATH, "--no-config-cache", "-y",
"--log-new-style", "--log-color", "False"
])
_die_on_error(result)
result = runner.invoke(scripts.attach, [
DEFAULT_TEST_CONFIG_PATH, "--no-config-cache", "--log-new-style",
"--log-color", "False"
])
_check_output_via_pattern("test_ray_attach.txt", result)
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
@mock_ec2
@mock_iam
def test_ray_exec(configure_lang, configure_aws, _unlink_test_ssh_key):
def commands_mock(command, stdin):
# TODO(maximsmol): this is a hack since stdout=sys.stdout
# doesn't work with the mock for some reason
print("This is a test!")
return PopenBehaviour(stdout="This is a test!")
with _setup_popen_mock(commands_mock):
runner = CliRunner()
result = runner.invoke(scripts.up, [
DEFAULT_TEST_CONFIG_PATH, "--no-config-cache", "-y",
"--log-new-style", "--log-color", "False"
])
_die_on_error(result)
result = runner.invoke(scripts.exec, [
DEFAULT_TEST_CONFIG_PATH, "--no-config-cache", "--log-new-style",
"\"echo This is a test!\""
])
_check_output_via_pattern("test_ray_exec.txt", result)
# Try to check if we are running in travis. Bazel overrides and controls
# env vars, so the typical travis env-vars don't help.
# Unfortunately it will not be nice if your username is travis
# and you're running on a Mac.
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
@mock_ec2
@mock_iam
def test_ray_submit(configure_lang, configure_aws, _unlink_test_ssh_key):
def commands_mock(command, stdin):
# TODO(maximsmol): this is a hack since stdout=sys.stdout
# doesn't work with the mock for some reason
if "rsync" not in command:
print("This is a test!")
return PopenBehaviour(stdout="This is a test!")
with _setup_popen_mock(commands_mock):
runner = CliRunner()
result = runner.invoke(scripts.up, [
DEFAULT_TEST_CONFIG_PATH, "--no-config-cache", "-y",
"--log-new-style", "--log-color", "False"
])
_die_on_error(result)
with tempfile.NamedTemporaryFile(suffix="test.py", mode="w") as f:
f.write("print('This is a test!')\n")
result = runner.invoke(
scripts.submit,
[
DEFAULT_TEST_CONFIG_PATH,
"--no-config-cache",
"--log-new-style",
"--log-color",
"False",
# this is somewhat misleading, since the file
# actually never gets run
# TODO(maximsmol): make this work properly one day?
f.name
])
_check_output_via_pattern("test_ray_submit.txt", result)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
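
Following the test-writing instructions at the top of this file, a minimal self-contained example of the CliRunner-plus-regex approach (hello is a toy command, not part of Ray):

import re

import click
from click.testing import CliRunner

@click.command()
def hello():
    click.echo("Started session 42 at 13:29:10")

def test_hello_output():
    runner = CliRunner()
    result = runner.invoke(hello, [])
    # Always print the captured output; otherwise failures are undebuggable.
    print(result.output)
    # Escape literal text, then loosen the run-to-run parts with ".+".
    pattern = r"Started session .+ at .+\n"
    assert re.fullmatch(pattern, result.output) is not None
    assert result.exit_code == 0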

View file

@ -0,0 +1,3 @@
Bootstraping AWS config
Fetched IP: .+
ubuntu@ip-.+:~\$ exit

View file

@ -0,0 +1,3 @@
Bootstraping AWS config
Fetched IP: .+
This is a test!

View file

@ -0,0 +1,25 @@
Local node IP: .+
Available RAM
Workers: .+ GiB
Objects: .+ GiB
To adjust these values, use
ray\.init\(memory=<bytes>, object_store_memory=<bytes>\)
Dashboard URL: .+
--------------------
Ray runtime started.
--------------------
Next steps
To connect to this Ray runtime from another node, run
ray start --address='.+' --redis-password='.+'
Alternatively, use the following Python code:
import ray
ray\.init\(address='auto', redis_password='.+'\)
If connection fails, check your firewall settings other network configuration.
To terminate the Ray runtime, run
ray stop

View file

@ -0,0 +1,5 @@
Bootstraping AWS config
Fetched IP: .+
Bootstraping AWS config
Fetched IP: .+
This is a test!

View file

@ -0,0 +1,51 @@
Commands running under a login shell can produce more output than special processing can handle\.
Thus, the output from subcommands will be logged as is\.
Consider using --use-normal-shells, if you tested your workflow and it is compatible\.
Cluster configuration valid
Cluster: test-cli
Bootstraping AWS config
AWS config
IAM Profile: .+ \[default\]
EC2 Key pair \(head & workers\): .+ \[default\]
VPC Subnets \(head & workers\): subnet-.+ \[default\]
EC2 Security groups \(head & workers\): sg-.+ \[default\]
EC2 AMI \(head & workers\): ami-.+ \[dlami\]
No head node found\. Launching a new cluster\. Confirm \[y/N\]: y \[automatic, due to --yes\]
Acquiring an up-to-date head node
Launched 1 nodes \[subnet_id=subnet-.+\]
Launched instance i-.+ \[state=pending, info=pending\]
Launched a new head node
Fetching the new head node
<1/1> Setting up head node
Prepared bootstrap config
New status: waiting-for-ssh
\[1/6\] Waiting for SSH to become available
Running `uptime` as a test\.
Fetched IP: .+
Success\.
Updating cluster configuration\. \[hash=.+\]
New status: syncing-files
\[3/6\] Processing file mounts
~/tests/ from ./
\[4/6\] No worker file mounts to sync
New status: setting-up
\[3/5\] Running initialization commands
\[4/6\] Running setup commands
\(0/4\) echo a
\(1/4\) echo b
\(2/4\) echo \${echo hi}
\(3/4\) echo head
\[6/6\] Starting the Ray runtime
New status: up-to-date
Useful commands
Monitor autoscaling with
ray exec .+ 'tail -n 100 -f /tmp/ray/session_\*/logs/monitor\*'
Connect to a terminal on the cluster head
ray attach .+

View file

@ -0,0 +1,38 @@
auth:
ssh_user: ubuntu
cluster_name: test-cli
file_mounts:
~/tests: .
head_node:
ImageId: latest_dlami
InstanceType: t3a.small
head_setup_commands:
- echo head
head_start_ray_commands:
- ray stop
- ray start --head --autoscaling-config=~/ray_bootstrap_config.yaml
idle_timeout_minutes: 5
initial_workers: 1
initialization_commands:
- echo init
max_workers: 2
min_workers: 1
provider:
availability_zone: us-west-2a
key_pair:
key_name: __test-cli
region: us-west-2
type: aws
setup_commands:
- echo a
- echo b
- echo ${echo hi}
target_utilization_fraction: 0.9
worker_nodes:
ImageId: latest_dlami
InstanceType: t3a.small
worker_setup_commands:
- echo worker
worker_start_ray_commands:
- ray stop
- ray start --address=$RAY_HEAD_IP

View file

@ -49,6 +49,7 @@ gym
gym-minigrid
kubernetes
lxml
moto
mypy
networkx
numba
@ -63,5 +64,6 @@ pytest-sugar
pytest-timeout
scikit-learn==0.22.2
tensorflow
testfixtures
werkzeug
xlrd

View file

@ -3,18 +3,18 @@ ConfigSpace==0.4.10
google-api-python-client
google-oauth
h5py
hpbandster
hpbandster
hyperopt==0.1.2
ipython
keras
lightgbm
lightgbm
mlflow
nevergrad
nevergrad
oauth2client
scikit-optimize
scikit-optimize
sigopt
smart_open
tensorflow_probability
torch
torchvision
xgboost
torch
torchvision
xgboost