Make the command runner interface public (#11022)

This commit is contained in:
Ameer Haj Ali 2020-09-24 22:45:17 -07:00 committed by GitHub
parent ee85cb31a5
commit 3b6fe72029
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 82 deletions

View file

@ -1,6 +1,6 @@
from getpass import getuser from getpass import getuser
from shlex import quote from shlex import quote
from typing import Any, List, Tuple, Dict, Optional from typing import Dict
import click import click
import hashlib import hashlib
import json import json
@ -11,6 +11,7 @@ import sys
import time import time
import warnings import warnings
from ray.autoscaler.command_runner import CommandRunnerInterface
from ray.autoscaler._private.docker import check_bind_mounts_cmd, \ from ray.autoscaler._private.docker import check_bind_mounts_cmd, \
check_docker_running_cmd, \ check_docker_running_cmd, \
check_docker_image, \ check_docker_image, \
@ -100,84 +101,6 @@ def _with_interactive(cmd):
return ["bash", "--login", "-c", "-i", quote(force_interactive)] return ["bash", "--login", "-c", "-i", quote(force_interactive)]
class CommandRunnerInterface:
"""Interface to run commands on a remote cluster node.
Command runner instances are returned by provider.get_command_runner()."""
def run(
self,
cmd: str = None,
timeout: int = 120,
exit_on_fail: bool = False,
port_forward: List[Tuple[int, int]] = None,
with_output: bool = False,
environment_variables: Dict[str, object] = None,
run_env: str = "auto",
ssh_options_override_ssh_key: str = "",
shutdown_after_run: bool = False,
) -> str:
"""Run the given command on the cluster node and optionally get output.
WARNING: the cloudgateway needs arguments of "run" function to be json
dumpable to send them over HTTP requests.
Args:
cmd (str): The command to run.
timeout (int): The command timeout in seconds.
exit_on_fail (bool): Whether to sys exit on failure.
port_forward (list): List of (local, remote) ports to forward, or
a single tuple.
with_output (bool): Whether to return output.
environment_variables (Dict[str, str | int | Dict[str, str]):
Environment variables that `cmd` should be run with.
run_env (str): Options: docker/host/auto. Used in
DockerCommandRunner to determine the run environment.
ssh_options_override_ssh_key (str): if provided, overwrites
SSHOptions class with SSHOptions(ssh_options_override_ssh_key).
shutdown_after_run (bool): if provided, shutdowns down the machine
after executing the command with `sudo shutdown -h now`.
"""
raise NotImplementedError
def run_rsync_up(self,
source: str,
target: str,
options: Optional[Dict[str, Any]] = None) -> None:
"""Rsync files up to the cluster node.
Args:
source (str): The (local) source directory or file.
target (str): The (remote) destination path.
"""
raise NotImplementedError
def run_rsync_down(self,
source: str,
target: str,
options: Optional[Dict[str, Any]] = None) -> None:
"""Rsync files down from the cluster node.
Args:
source (str): The (remote) source directory or file.
target (str): The (local) destination path.
"""
raise NotImplementedError
def remote_shell_command_str(self) -> str:
"""Return the command the user can use to open a shell."""
raise NotImplementedError
def run_init(self, *, as_head: bool, file_mounts: Dict[str, str]) -> None:
"""Used to run extra initialization commands.
Args:
as_head (bool): Run as head image or worker.
file_mounts (dict): Files to copy to the head and worker nodes.
"""
pass
class KubernetesCommandRunner(CommandRunnerInterface): class KubernetesCommandRunner(CommandRunnerInterface):
def __init__(self, log_prefix, namespace, node_id, auth_config, def __init__(self, log_prefix, namespace, node_id, auth_config,
process_runner): process_runner):

View file

@ -0,0 +1,79 @@
from typing import Any, List, Tuple, Dict, Optional
class CommandRunnerInterface:
"""Interface to run commands on a remote cluster node.
Command runner instances are returned by provider.get_command_runner()."""
def run(
self,
cmd: str = None,
timeout: int = 120,
exit_on_fail: bool = False,
port_forward: List[Tuple[int, int]] = None,
with_output: bool = False,
environment_variables: Dict[str, object] = None,
run_env: str = "auto",
ssh_options_override_ssh_key: str = "",
shutdown_after_run: bool = False,
) -> str:
"""Run the given command on the cluster node and optionally get output.
WARNING: the cloudgateway needs arguments of "run" function to be json
dumpable to send them over HTTP requests.
Args:
cmd (str): The command to run.
timeout (int): The command timeout in seconds.
exit_on_fail (bool): Whether to sys exit on failure.
port_forward (list): List of (local, remote) ports to forward, or
a single tuple.
with_output (bool): Whether to return output.
environment_variables (Dict[str, str | int | Dict[str, str]):
Environment variables that `cmd` should be run with.
run_env (str): Options: docker/host/auto. Used in
DockerCommandRunner to determine the run environment.
ssh_options_override_ssh_key (str): if provided, overwrites
SSHOptions class with SSHOptions(ssh_options_override_ssh_key).
shutdown_after_run (bool): if provided, shutdowns down the machine
after executing the command with `sudo shutdown -h now`.
"""
raise NotImplementedError
def run_rsync_up(self,
source: str,
target: str,
options: Optional[Dict[str, Any]] = None) -> None:
"""Rsync files up to the cluster node.
Args:
source (str): The (local) source directory or file.
target (str): The (remote) destination path.
"""
raise NotImplementedError
def run_rsync_down(self,
source: str,
target: str,
options: Optional[Dict[str, Any]] = None) -> None:
"""Rsync files down from the cluster node.
Args:
source (str): The (remote) source directory or file.
target (str): The (local) destination path.
"""
raise NotImplementedError
def remote_shell_command_str(self) -> str:
"""Return the command the user can use to open a shell."""
raise NotImplementedError
def run_init(self, *, as_head: bool, file_mounts: Dict[str, str]) -> None:
"""Used to run extra initialization commands.
Args:
as_head (bool): Run as head image or worker.
file_mounts (dict): Files to copy to the head and worker nodes.
"""
pass

View file

@ -4,9 +4,9 @@ import sys
from unittest.mock import patch from unittest.mock import patch
from ray.tests.test_autoscaler import MockProvider, MockProcessRunner from ray.tests.test_autoscaler import MockProvider, MockProcessRunner
from ray.autoscaler._private.command_runner import CommandRunnerInterface, \ from ray.autoscaler.command_runner import CommandRunnerInterface
SSHCommandRunner, _with_environment_variables, DockerCommandRunner, \ from ray.autoscaler._private.command_runner import SSHCommandRunner, \
KubernetesCommandRunner DockerCommandRunner, KubernetesCommandRunner, _with_environment_variables
from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX
from getpass import getuser from getpass import getuser
import hashlib import hashlib