[Projects] Add "session execute" (#5681)

This commit is contained in:
Philipp Moritz 2019-09-11 00:50:05 -07:00 committed by GitHub
parent 336aef1774
commit 9ce6dd9b88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 178 additions and 98 deletions

View file

@ -145,6 +145,120 @@ def load_project_or_throw():
"`ray project validate` to inspect the error.")
class SessionRunner(object):
"""Class for setting up a session and executing commands in it."""
def __init__(self):
"""Initialize session runner and try to parse the command arguments.
Raises:
click.ClickException: This exception is raised if any error occurs.
"""
self.project_definition = load_project_or_throw()
# Check for features we don't support right now
project_environment = self.project_definition.config["environment"]
need_docker = ("dockerfile" in project_environment
or "dockerimage" in project_environment)
if need_docker:
raise click.ClickException(
"Docker support in session is currently not implemented.")
def create_cluster(self):
"""Create a cluster that will run the session."""
create_or_update_cluster(
config_file=self.project_definition.cluster_yaml(),
override_min_workers=None,
override_max_workers=None,
no_restart=False,
restart_only=False,
yes=True,
override_cluster_name=None,
)
def sync_files(self):
"""Synchronize files with the session."""
rsync(
self.project_definition.cluster_yaml(),
source=self.project_definition.root,
target=self.project_definition.working_directory(),
override_cluster_name=None,
down=False,
)
def setup_environment(self):
"""Set up the environment of the session."""
project_environment = self.project_definition.config["environment"]
if "requirements" in project_environment:
requirements_txt = project_environment["requirements"]
# Create a temporary requirements_txt in the head node.
remote_requirements_txt = (
"/tmp/" + "ray_project_requirements_txt_{}".format(
time.time()))
rsync(
self.project_definition.cluster_yaml(),
source=requirements_txt,
target=remote_requirements_txt,
override_cluster_name=None,
down=False,
)
self.execute_command(
"pip install -r {}".format(remote_requirements_txt))
if "shell" in project_environment:
for cmd in project_environment["shell"]:
self.execute_command(cmd)
def format_command(self, command, args, shell):
"""Validate and format a session command.
Args:
command (str, optional): Command from the project definition's
commands section to run, if any.
args (list): Arguments for the command to run.
shell (bool): If true, command is a shell command that should be
run directly.
Returns:
The formatted shell command to run.
Raises:
click.ClickException: This exception is raised if any error occurs.
"""
if shell:
return command
else:
try:
return self.project_definition.get_command_to_run(
command=command, args=args)
except ValueError as e:
raise click.ClickException(e)
def execute_command(self, cmd):
"""Execute a shell command in the session.
Args:
cmd (str): Shell command to run in the session. It will be
run in the working directory of the project.
"""
cwd = self.project_definition.working_directory()
cmd = "cd {cwd}; {cmd}".format(cwd=cwd, cmd=cmd)
exec_cluster(
config_file=self.project_definition.cluster_yaml(),
cmd=cmd,
docker=False,
screen=False,
tmux=False,
stop=False,
start=False,
override_cluster_name=None,
port_forward=None,
)
@session_cli.command(help="Attach to an existing cluster")
def attach():
project_definition = load_project_or_throw()
@ -168,6 +282,7 @@ def stop():
@session_cli.command(
name="start",
context_settings=dict(ignore_unknown_options=True, ),
help="Start a session based on current project config")
@click.argument("command", required=False)
@ -178,99 +293,41 @@ def stop():
"If set, run the command as a raw shell command instead of looking up "
"the command in the project config"),
is_flag=True)
def start(command, args, shell):
project_definition = load_project_or_throw()
if shell:
command_to_run = command
def session_start(command, args, shell):
runner = SessionRunner()
if shell or command:
# Get the actual command to run.
cmd = runner.format_command(command, args, shell)
num_steps = 4
else:
try:
command_to_run = project_definition.get_command_to_run(
command=command, args=args)
except ValueError as e:
raise click.ClickException(e)
num_steps = 3
# Check for features we don't support right now
project_environment = project_definition.config["environment"]
need_docker = ("dockerfile" in project_environment
or "dockerimage" in project_environment)
if need_docker:
raise click.ClickException(
"Docker support in session is currently not implemented. "
"Please file an feature request at"
"https://github.com/ray-project/ray/issues")
logger.info("[1/{}] Creating cluster".format(num_steps))
runner.create_cluster()
logger.info("[2/{}] Syncing the project".format(num_steps))
runner.sync_files()
logger.info("[3/{}] Setting up environment".format(num_steps))
runner.setup_environment()
logger.info("[1/4] Creating cluster")
create_or_update_cluster(
config_file=project_definition.cluster_yaml(),
override_min_workers=None,
override_max_workers=None,
no_restart=False,
restart_only=False,
yes=True,
override_cluster_name=None,
)
logger.info("[2/4] Syncing the project")
rsync(
project_definition.cluster_yaml(),
source=project_definition.root,
target=project_definition.working_directory(),
override_cluster_name=None,
down=False,
)
logger.info("[3/4] Setting up environment")
_setup_environment(
project_definition.cluster_yaml(),
project_environment,
cwd=project_definition.working_directory())
logger.info("[4/4] Running command")
logger.debug("Running {}".format(command))
session_exec_cluster(
project_definition.cluster_yaml(),
command_to_run,
cwd=project_definition.working_directory())
if shell or command:
# Run the actual command.
logger.info("[4/4] Running command")
runner.execute_command(cmd)
def session_exec_cluster(cluster_yaml, cmd, cwd=None):
if cwd is not None:
cmd = "cd {cwd}; {cmd}".format(cwd=cwd, cmd=cmd)
exec_cluster(
config_file=cluster_yaml,
cmd=cmd,
docker=False,
screen=False,
tmux=False,
stop=False,
start=False,
override_cluster_name=None,
port_forward=None,
)
def _setup_environment(cluster_yaml, project_environment, cwd):
if "requirements" in project_environment:
requirements_txt = project_environment["requirements"]
# Create a temporary requirements_txt in the head node.
remote_requirements_txt = (
"/tmp/" + "ray_project_requirements_txt_{}".format(time.time()))
rsync(
cluster_yaml,
source=requirements_txt,
target=remote_requirements_txt,
override_cluster_name=None,
down=False,
)
session_exec_cluster(
cluster_yaml,
"pip install -r {}".format(remote_requirements_txt),
cwd=cwd)
if "shell" in project_environment:
for cmd in project_environment["shell"]:
session_exec_cluster(cluster_yaml, cmd, cwd=cwd)
@session_cli.command(
name="execute",
context_settings=dict(ignore_unknown_options=True, ),
help="Execute a command in a session")
@click.argument("command", required=False)
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
@click.option(
"--shell",
help=(
"If set, run the command as a raw shell command instead of looking up "
"the command in the project config"),
is_flag=True)
def session_execute(command, args, shell):
runner = SessionRunner()
cmd = runner.format_command(command, args, shell)
runner.execute_command(cmd)

View file

@ -12,7 +12,7 @@ import sys
from contextlib import contextmanager
from ray.projects.scripts import start
from ray.projects.scripts import session_start, session_execute
import ray
if sys.version_info >= (3, 3):
@ -101,7 +101,7 @@ def run_test_project(project_dir, command, args):
def test_session_start_default_project():
result, mock_calls, test_dir = run_test_project(
"session-tests/project-pass", start, [])
"session-tests/project-pass", session_start, ["default"])
loaded_project = ray.projects.ProjectDefinition(test_dir)
assert result.exit_code == 0
@ -143,9 +143,32 @@ def test_session_start_default_project():
assert expected_commands == commands_executed
def test_session_execute_default_project():
result, mock_calls, test_dir = run_test_project(
"session-tests/project-pass", session_execute, ["default"])
loaded_project = ray.projects.ProjectDefinition(test_dir)
assert result.exit_code == 0
assert mock_calls["rsync"].call_count == 0
assert mock_calls["create_or_update_cluster"].call_count == 0
exec_cluster_call = mock_calls["exec_cluster"]
commands_executed = []
for _, kwargs in exec_cluster_call.call_args_list:
commands_executed.append(kwargs["cmd"].replace(
"cd {}; ".format(loaded_project.working_directory()), ""))
expected_commands = [
command["command"] for command in loaded_project.config["commands"]
]
assert expected_commands == commands_executed
def test_session_start_docker_fail():
result, _, _ = run_test_project("session-tests/with-docker-fail", start,
[])
result, _, _ = run_test_project("session-tests/with-docker-fail",
session_start, [])
assert result.exit_code == 1
assert ("Docker support in session is currently "
@ -153,8 +176,8 @@ def test_session_start_docker_fail():
def test_session_invalid_config_errored():
result, _, _ = run_test_project("session-tests/invalid-config-fail", start,
[])
result, _, _ = run_test_project("session-tests/invalid-config-fail",
session_start, [])
assert result.exit_code == 1
assert "validation failed" in result.output
@ -164,7 +187,7 @@ def test_session_invalid_config_errored():
def test_session_create_command():
result, mock_calls, test_dir = run_test_project(
"session-tests/commands-test", start,
"session-tests/commands-test", session_start,
["first", "--a", "1", "--b", "2"])
# Verify the project can be loaded.