2021-11-02 16:01:54 -05:00
|
|
|
import logging
|
|
|
|
from pathlib import Path
|
2021-10-23 10:48:16 -07:00
|
|
|
import sys
|
2022-03-01 19:27:09 -08:00
|
|
|
import json
|
2022-03-10 13:15:16 -08:00
|
|
|
import yaml
|
2021-11-02 16:01:54 -05:00
|
|
|
import tempfile
|
2022-02-02 14:34:34 -06:00
|
|
|
from typing import Optional
|
2021-10-23 10:48:16 -07:00
|
|
|
|
|
|
|
import pytest
|
2022-02-02 14:34:34 -06:00
|
|
|
from unittest.mock import patch
|
2021-10-23 10:48:16 -07:00
|
|
|
|
2021-11-22 15:11:04 -06:00
|
|
|
import ray
|
2022-02-09 11:55:32 -08:00
|
|
|
from ray.job_submission import JobSubmissionClient, JobStatus
|
2022-03-01 19:27:09 -08:00
|
|
|
from ray.dashboard.modules.job.common import CURRENT_VERSION, JobInfo
|
2022-03-09 21:31:23 -08:00
|
|
|
from ray.dashboard.modules.dashboard_sdk import (
|
2022-02-02 14:34:34 -06:00
|
|
|
ClusterInfo,
|
|
|
|
parse_cluster_info,
|
|
|
|
)
|
2021-10-23 10:48:16 -07:00
|
|
|
from ray.dashboard.tests.conftest import * # noqa
|
2022-02-02 14:34:34 -06:00
|
|
|
from ray.ray_constants import DEFAULT_DASHBOARD_PORT
|
2021-11-13 22:54:01 -08:00
|
|
|
from ray.tests.conftest import _ray_start
|
2022-01-29 18:41:57 -08:00
|
|
|
from ray._private.test_utils import (
|
2022-03-10 13:15:16 -08:00
|
|
|
chdir,
|
2022-01-29 18:41:57 -08:00
|
|
|
format_web_url,
|
|
|
|
wait_for_condition,
|
|
|
|
wait_until_server_available,
|
|
|
|
)
|
2021-10-23 10:48:16 -07:00
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2021-11-13 22:54:01 -08:00
|
|
|
@pytest.fixture(scope="module")
|
2021-12-03 16:44:30 -08:00
|
|
|
def headers():
|
|
|
|
return {"Connection": "keep-alive", "Authorization": "TOK:<MY_TOKEN>"}
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def job_sdk_client(headers):
|
2022-03-01 19:27:09 -08:00
|
|
|
with _ray_start(include_dashboard=True, num_cpus=1) as ctx:
|
|
|
|
address = ctx.address_info["webui_url"]
|
2021-11-13 22:54:01 -08:00
|
|
|
assert wait_until_server_available(address)
|
2021-12-03 16:44:30 -08:00
|
|
|
yield JobSubmissionClient(format_web_url(address), headers=headers)
|
2021-11-02 16:01:54 -05:00
|
|
|
|
|
|
|
|
2022-03-01 19:27:09 -08:00
|
|
|
# NOTE(architkulkarni): This test must be run first in order for the job
|
|
|
|
# submission history of the shared Ray runtime to be empty.
|
|
|
|
@pytest.mark.parametrize("use_sdk", [True, False])
|
|
|
|
def test_list_jobs_empty(job_sdk_client: JobSubmissionClient, use_sdk: bool):
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
if use_sdk:
|
|
|
|
assert client.list_jobs() == dict()
|
|
|
|
else:
|
|
|
|
r = client._do_request(
|
|
|
|
"GET",
|
|
|
|
"/api/jobs/",
|
|
|
|
)
|
|
|
|
|
|
|
|
assert r.status_code == 200
|
|
|
|
assert json.loads(r.text) == dict()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("use_sdk", [True, False])
|
|
|
|
def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool):
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
runtime_env = {"env_vars": {"TEST": "123"}}
|
|
|
|
metadata = {"foo": "bar"}
|
|
|
|
job_id = client.submit_job(
|
|
|
|
entrypoint="echo hello", runtime_env=runtime_env, metadata=metadata
|
|
|
|
)
|
|
|
|
|
|
|
|
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
|
|
|
|
if use_sdk:
|
|
|
|
info: JobInfo = client.list_jobs()[job_id]
|
|
|
|
else:
|
|
|
|
r = client._do_request(
|
|
|
|
"GET",
|
|
|
|
"/api/jobs/",
|
|
|
|
)
|
|
|
|
|
|
|
|
assert r.status_code == 200
|
|
|
|
jobs_info_json = json.loads(r.text)
|
|
|
|
info_json = jobs_info_json[job_id]
|
|
|
|
info = JobInfo(**info_json)
|
|
|
|
|
|
|
|
assert info.status == JobStatus.SUCCEEDED
|
|
|
|
assert info.message is not None
|
|
|
|
assert info.end_time >= info.start_time
|
|
|
|
assert info.runtime_env == runtime_env
|
|
|
|
assert info.metadata == metadata
|
|
|
|
|
|
|
|
|
2021-11-02 16:01:54 -05:00
|
|
|
def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
|
|
|
|
status = client.get_job_status(job_id)
|
2022-02-18 07:54:37 -08:00
|
|
|
if status == JobStatus.FAILED:
|
2021-11-09 22:34:12 -08:00
|
|
|
logs = client.get_job_logs(job_id)
|
|
|
|
raise RuntimeError(f"Job failed\nlogs:\n{logs}")
|
2022-02-18 07:54:37 -08:00
|
|
|
return status == JobStatus.SUCCEEDED
|
2021-11-02 16:01:54 -05:00
|
|
|
|
|
|
|
|
2021-11-06 10:37:54 -07:00
|
|
|
def _check_job_failed(client: JobSubmissionClient, job_id: str) -> bool:
|
|
|
|
status = client.get_job_status(job_id)
|
2022-02-18 07:54:37 -08:00
|
|
|
return status == JobStatus.FAILED
|
2021-11-06 10:37:54 -07:00
|
|
|
|
|
|
|
|
|
|
|
def _check_job_stopped(client: JobSubmissionClient, job_id: str) -> bool:
|
|
|
|
status = client.get_job_status(job_id)
|
2022-02-18 07:54:37 -08:00
|
|
|
return status == JobStatus.STOPPED
|
2021-11-06 10:37:54 -07:00
|
|
|
|
|
|
|
|
2021-11-02 16:01:54 -05:00
|
|
|
@pytest.fixture(
|
2022-03-10 09:42:25 -08:00
|
|
|
scope="module",
|
|
|
|
params=[
|
|
|
|
"no_working_dir",
|
|
|
|
"local_working_dir",
|
|
|
|
"s3_working_dir",
|
2022-03-10 13:15:16 -08:00
|
|
|
"pip_txt",
|
|
|
|
"conda_yaml",
|
2022-03-10 09:42:25 -08:00
|
|
|
"local_py_modules",
|
|
|
|
],
|
2022-01-29 18:41:57 -08:00
|
|
|
)
|
2022-03-10 09:42:25 -08:00
|
|
|
def runtime_env_option(request):
|
2022-03-10 13:15:16 -08:00
|
|
|
driver_script = """
|
|
|
|
import ray
|
|
|
|
ray.init(address="auto")
|
|
|
|
|
|
|
|
@ray.remote
|
|
|
|
def f():
|
|
|
|
import pip_install_test
|
|
|
|
|
|
|
|
ray.get(f.remote())
|
|
|
|
"""
|
2021-11-02 16:01:54 -05:00
|
|
|
if request.param == "no_working_dir":
|
|
|
|
yield {
|
|
|
|
"runtime_env": {},
|
|
|
|
"entrypoint": "echo hello",
|
2021-11-09 22:34:12 -08:00
|
|
|
"expected_logs": "hello\n",
|
2021-11-02 16:01:54 -05:00
|
|
|
}
|
2022-03-10 09:42:25 -08:00
|
|
|
elif request.param == "local_working_dir" or request.param == "local_py_modules":
|
2021-11-02 16:01:54 -05:00
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
|
|
path = Path(tmp_dir)
|
|
|
|
|
|
|
|
hello_file = path / "test.py"
|
|
|
|
with hello_file.open(mode="w") as f:
|
|
|
|
f.write("from test_module import run_test\n")
|
|
|
|
f.write("print(run_test())")
|
|
|
|
|
|
|
|
module_path = path / "test_module"
|
|
|
|
module_path.mkdir(parents=True)
|
|
|
|
|
|
|
|
test_file = module_path / "test.py"
|
|
|
|
with test_file.open(mode="w") as f:
|
|
|
|
f.write("def run_test():\n")
|
2021-12-03 16:44:30 -08:00
|
|
|
f.write(" return 'Hello from test_module!'\n") # noqa: Q000
|
2021-11-02 16:01:54 -05:00
|
|
|
|
|
|
|
init_file = module_path / "__init__.py"
|
|
|
|
with init_file.open(mode="w") as f:
|
|
|
|
f.write("from test_module.test import run_test\n")
|
|
|
|
|
2022-03-10 09:42:25 -08:00
|
|
|
if request.param == "local_working_dir":
|
|
|
|
yield {
|
|
|
|
"runtime_env": {"working_dir": tmp_dir},
|
|
|
|
"entrypoint": "python test.py",
|
|
|
|
"expected_logs": "Hello from test_module!\n",
|
|
|
|
}
|
|
|
|
elif request.param == "local_py_modules":
|
|
|
|
yield {
|
|
|
|
"runtime_env": {"py_modules": [str(Path(tmp_dir) / "test_module")]},
|
|
|
|
"entrypoint": (
|
|
|
|
"python -c 'import test_module;"
|
|
|
|
"print(test_module.run_test())'"
|
|
|
|
),
|
|
|
|
"expected_logs": "Hello from test_module!\n",
|
|
|
|
}
|
|
|
|
else:
|
|
|
|
raise ValueError(f"Unexpected pytest fixture option {request.param}")
|
2021-11-02 16:01:54 -05:00
|
|
|
elif request.param == "s3_working_dir":
|
|
|
|
yield {
|
|
|
|
"runtime_env": {
|
2021-11-14 02:16:45 -08:00
|
|
|
"working_dir": "s3://runtime-env-test/script_runtime_env.zip",
|
2021-11-02 16:01:54 -05:00
|
|
|
},
|
|
|
|
"entrypoint": "python script.py",
|
2021-11-09 22:34:12 -08:00
|
|
|
"expected_logs": "Executing main() from script.py !!\n",
|
2021-11-02 16:01:54 -05:00
|
|
|
}
|
2022-03-10 13:15:16 -08:00
|
|
|
elif request.param == "pip_txt":
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir, chdir(tmpdir):
|
|
|
|
pip_list = ["pip-install-test==0.5"]
|
|
|
|
relative_filepath = "requirements.txt"
|
|
|
|
pip_file = Path(relative_filepath)
|
|
|
|
pip_file.write_text("\n".join(pip_list))
|
2022-03-14 23:41:19 +08:00
|
|
|
runtime_env = {"pip": {"packages": relative_filepath, "pip_check": False}}
|
2022-03-10 13:15:16 -08:00
|
|
|
yield {
|
|
|
|
"runtime_env": runtime_env,
|
|
|
|
"entrypoint": f"python -c '{driver_script}'",
|
|
|
|
# TODO(architkulkarni): Uncomment after #22968 is fixed.
|
|
|
|
# "entrypoint": "python -c 'import pip_install_test'",
|
|
|
|
"expected_logs": "Good job! You installed a pip module.",
|
|
|
|
}
|
|
|
|
elif request.param == "conda_yaml":
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir, chdir(tmpdir):
|
|
|
|
conda_dict = {"dependencies": ["pip", {"pip": ["pip-install-test==0.5"]}]}
|
|
|
|
relative_filepath = "environment.yml"
|
|
|
|
conda_file = Path(relative_filepath)
|
|
|
|
conda_file.write_text(yaml.dump(conda_dict))
|
|
|
|
runtime_env = {"conda": relative_filepath}
|
|
|
|
|
|
|
|
yield {
|
|
|
|
"runtime_env": runtime_env,
|
|
|
|
"entrypoint": f"python -c '{driver_script}'",
|
|
|
|
# TODO(architkulkarni): Uncomment after #22968 is fixed.
|
|
|
|
# "entrypoint": "python -c 'import pip_install_test'",
|
|
|
|
"expected_logs": "Good job! You installed a pip module.",
|
|
|
|
}
|
2021-11-02 16:01:54 -05:00
|
|
|
else:
|
|
|
|
assert False, f"Unrecognized option: {request.param}."
|
|
|
|
|
|
|
|
|
2022-03-10 13:15:16 -08:00
|
|
|
def test_submit_job(job_sdk_client, runtime_env_option, monkeypatch):
|
|
|
|
# This flag allows for local testing of runtime env conda functionality
|
|
|
|
# without needing a built Ray wheel. Rather than insert the link to the
|
|
|
|
# wheel into the conda spec, it links to the current Python site.
|
|
|
|
monkeypatch.setenv("RAY_RUNTIME_ENV_LOCAL_DEV_MODE", "1")
|
|
|
|
|
2021-11-02 16:01:54 -05:00
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
job_id = client.submit_job(
|
2022-03-10 09:42:25 -08:00
|
|
|
entrypoint=runtime_env_option["entrypoint"],
|
|
|
|
runtime_env=runtime_env_option["runtime_env"],
|
2022-01-29 18:41:57 -08:00
|
|
|
)
|
2021-11-02 16:01:54 -05:00
|
|
|
|
2022-03-10 13:15:16 -08:00
|
|
|
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id, timeout=120)
|
2021-11-02 16:01:54 -05:00
|
|
|
|
2021-11-09 22:34:12 -08:00
|
|
|
logs = client.get_job_logs(job_id)
|
2022-03-10 13:15:16 -08:00
|
|
|
assert runtime_env_option["expected_logs"] in logs
|
2021-11-02 16:01:54 -05:00
|
|
|
|
|
|
|
|
2021-11-06 10:37:54 -07:00
|
|
|
def test_http_bad_request(job_sdk_client):
|
|
|
|
"""
|
|
|
|
Send bad requests to job http server and ensure right return code and
|
|
|
|
error message is returned via http.
|
|
|
|
"""
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
# 400 - HTTPBadRequest
|
2021-11-13 22:54:01 -08:00
|
|
|
r = client._do_request(
|
|
|
|
"POST",
|
|
|
|
"/api/jobs/",
|
|
|
|
json_data={"key": "baaaad request"},
|
|
|
|
)
|
|
|
|
|
|
|
|
assert r.status_code == 400
|
|
|
|
assert "TypeError: __init__() got an unexpected keyword argument" in r.text
|
2021-11-06 10:37:54 -07:00
|
|
|
|
2021-11-18 10:15:23 -06:00
|
|
|
|
|
|
|
def test_invalid_runtime_env(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
2022-03-10 09:42:25 -08:00
|
|
|
with pytest.raises(ValueError, match="Only .zip files supported"):
|
|
|
|
client.submit_job(
|
|
|
|
entrypoint="echo hello", runtime_env={"working_dir": "s3://not_a_zip"}
|
|
|
|
)
|
2021-11-18 10:15:23 -06:00
|
|
|
|
|
|
|
|
|
|
|
def test_runtime_env_setup_failure(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
|
|
|
job_id = client.submit_job(
|
2022-01-29 18:41:57 -08:00
|
|
|
entrypoint="echo hello", runtime_env={"working_dir": "s3://does_not_exist.zip"}
|
|
|
|
)
|
2021-11-18 10:15:23 -06:00
|
|
|
|
|
|
|
wait_for_condition(_check_job_failed, client=client, job_id=job_id)
|
2022-02-22 16:18:16 -06:00
|
|
|
data = client.get_job_info(job_id)
|
2022-02-18 07:54:37 -08:00
|
|
|
assert "Failed to setup runtime environment" in data.message
|
2021-11-06 10:37:54 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_submit_job_with_exception_in_driver(job_sdk_client):
|
|
|
|
"""
|
|
|
|
Submit a job that's expected to throw exception while executing.
|
|
|
|
"""
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
|
|
path = Path(tmp_dir)
|
|
|
|
driver_script = """
|
|
|
|
print('Hello !')
|
|
|
|
raise RuntimeError('Intentionally failed.')
|
|
|
|
"""
|
|
|
|
test_script_file = path / "test_script.py"
|
|
|
|
with open(test_script_file, "w+") as file:
|
|
|
|
file.write(driver_script)
|
|
|
|
|
|
|
|
job_id = client.submit_job(
|
2022-01-29 18:41:57 -08:00
|
|
|
entrypoint="python test_script.py", runtime_env={"working_dir": tmp_dir}
|
|
|
|
)
|
2021-11-06 10:37:54 -07:00
|
|
|
|
|
|
|
wait_for_condition(_check_job_failed, client=client, job_id=job_id)
|
2021-11-09 22:34:12 -08:00
|
|
|
logs = client.get_job_logs(job_id)
|
|
|
|
assert "Hello !" in logs
|
|
|
|
assert "RuntimeError: Intentionally failed." in logs
|
2021-11-06 10:37:54 -07:00
|
|
|
|
|
|
|
|
|
|
|
def test_stop_long_running_job(job_sdk_client):
|
|
|
|
"""
|
|
|
|
Submit a job that runs for a while and stop it in the middle.
|
|
|
|
"""
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
|
|
path = Path(tmp_dir)
|
|
|
|
driver_script = """
|
|
|
|
print('Hello !')
|
|
|
|
import time
|
|
|
|
time.sleep(300) # This should never finish
|
|
|
|
raise RuntimeError('Intentionally failed.')
|
|
|
|
"""
|
|
|
|
test_script_file = path / "test_script.py"
|
|
|
|
with open(test_script_file, "w+") as file:
|
|
|
|
file.write(driver_script)
|
|
|
|
|
|
|
|
job_id = client.submit_job(
|
2022-01-29 18:41:57 -08:00
|
|
|
entrypoint="python test_script.py", runtime_env={"working_dir": tmp_dir}
|
|
|
|
)
|
2021-11-06 10:37:54 -07:00
|
|
|
assert client.stop_job(job_id) is True
|
|
|
|
wait_for_condition(_check_job_stopped, client=client, job_id=job_id)
|
|
|
|
|
|
|
|
|
2021-11-02 16:01:54 -05:00
|
|
|
def test_job_metadata(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
2021-10-28 16:40:03 -05:00
|
|
|
|
|
|
|
print_metadata_cmd = (
|
2022-01-29 18:41:57 -08:00
|
|
|
'python -c"'
|
2021-10-28 16:40:03 -05:00
|
|
|
"import ray;"
|
|
|
|
"ray.init();"
|
|
|
|
"job_config=ray.worker.global_worker.core_worker.get_job_config();"
|
|
|
|
"print(dict(sorted(job_config.metadata.items())))"
|
2022-01-29 18:41:57 -08:00
|
|
|
'"'
|
|
|
|
)
|
2021-10-28 16:40:03 -05:00
|
|
|
|
2021-11-02 16:01:54 -05:00
|
|
|
job_id = client.submit_job(
|
2022-01-29 18:41:57 -08:00
|
|
|
entrypoint=print_metadata_cmd, metadata={"key1": "val1", "key2": "val2"}
|
|
|
|
)
|
2021-11-02 16:01:54 -05:00
|
|
|
|
|
|
|
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
|
|
|
|
|
2022-01-29 18:41:57 -08:00
|
|
|
assert (
|
|
|
|
str(
|
|
|
|
{
|
|
|
|
"job_name": job_id,
|
|
|
|
"job_submission_id": job_id,
|
|
|
|
"key1": "val1",
|
|
|
|
"key2": "val2",
|
|
|
|
}
|
|
|
|
)
|
|
|
|
in client.get_job_logs(job_id)
|
|
|
|
)
|
2021-10-28 16:40:03 -05:00
|
|
|
|
|
|
|
|
2021-11-08 23:10:27 -08:00
|
|
|
def test_pass_job_id(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
job_id = "my_custom_id"
|
|
|
|
returned_id = client.submit_job(entrypoint="echo hello", job_id=job_id)
|
|
|
|
|
|
|
|
assert returned_id == job_id
|
|
|
|
wait_for_condition(_check_job_succeeded, client=client, job_id=returned_id)
|
|
|
|
|
|
|
|
# Test that a duplicate job_id is rejected.
|
|
|
|
with pytest.raises(Exception, match=f"{job_id} already exists"):
|
|
|
|
returned_id = client.submit_job(entrypoint="echo hello", job_id=job_id)
|
|
|
|
|
|
|
|
|
|
|
|
def test_nonexistent_job(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
|
|
|
|
2021-11-13 22:54:01 -08:00
|
|
|
with pytest.raises(RuntimeError, match="nonexistent_job does not exist"):
|
|
|
|
client.get_job_status("nonexistent_job")
|
|
|
|
|
|
|
|
|
|
|
|
def test_submit_optional_args(job_sdk_client):
|
|
|
|
"""Check that job_id, runtime_env, and metadata are optional."""
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
r = client._do_request(
|
|
|
|
"POST",
|
|
|
|
"/api/jobs/",
|
|
|
|
json_data={"entrypoint": "ls"},
|
|
|
|
)
|
|
|
|
|
2022-01-29 18:41:57 -08:00
|
|
|
wait_for_condition(_check_job_succeeded, client=client, job_id=r.json()["job_id"])
|
2021-11-13 22:54:01 -08:00
|
|
|
|
|
|
|
|
|
|
|
def test_missing_resources(job_sdk_client):
|
|
|
|
"""Check that 404s are raised for resources that don't exist."""
|
|
|
|
client = job_sdk_client
|
|
|
|
|
2022-01-29 18:41:57 -08:00
|
|
|
conditions = [
|
|
|
|
("GET", "/api/jobs/fake_job_id"),
|
|
|
|
("GET", "/api/jobs/fake_job_id/logs"),
|
|
|
|
("POST", "/api/jobs/fake_job_id/stop"),
|
|
|
|
("GET", "/api/packages/fake_package_uri"),
|
|
|
|
]
|
2021-11-13 22:54:01 -08:00
|
|
|
|
|
|
|
for method, route in conditions:
|
|
|
|
assert client._do_request(method, route).status_code == 404
|
2021-11-08 23:10:27 -08:00
|
|
|
|
|
|
|
|
2021-11-22 15:11:04 -06:00
|
|
|
def test_version_endpoint(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
r = client._do_request("GET", "/api/version")
|
|
|
|
assert r.status_code == 200
|
|
|
|
assert r.json() == {
|
|
|
|
"version": CURRENT_VERSION,
|
|
|
|
"ray_version": ray.__version__,
|
2022-01-29 18:41:57 -08:00
|
|
|
"ray_commit": ray.__commit__,
|
2021-11-22 15:11:04 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-12-03 16:44:30 -08:00
|
|
|
def test_request_headers(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
|
|
|
|
|
|
|
with patch("requests.request") as mock_request:
|
|
|
|
_ = client._do_request(
|
|
|
|
"POST",
|
|
|
|
"/api/jobs/",
|
|
|
|
json_data={"entrypoint": "ls"},
|
|
|
|
)
|
|
|
|
mock_request.assert_called_with(
|
|
|
|
"POST",
|
|
|
|
"http://127.0.0.1:8265/api/jobs/",
|
|
|
|
cookies=None,
|
|
|
|
data=None,
|
|
|
|
json={"entrypoint": "ls"},
|
2022-01-29 18:41:57 -08:00
|
|
|
headers={"Connection": "keep-alive", "Authorization": "TOK:<MY_TOKEN>"},
|
|
|
|
)
|
2021-12-03 16:44:30 -08:00
|
|
|
|
|
|
|
|
2022-02-02 14:34:34 -06:00
|
|
|
@pytest.mark.parametrize("scheme", ["http", "https", "ray", "fake_module"])
|
|
|
|
@pytest.mark.parametrize("host", ["127.0.0.1", "localhost", "fake.dns.name"])
|
|
|
|
@pytest.mark.parametrize("port", [None, 8265, 10000])
|
|
|
|
def test_parse_cluster_info(scheme: str, host: str, port: Optional[int]):
|
|
|
|
address = f"{scheme}://{host}"
|
|
|
|
if port is not None:
|
|
|
|
address += f":{port}"
|
|
|
|
|
|
|
|
final_port = port if port is not None else DEFAULT_DASHBOARD_PORT
|
|
|
|
if scheme in {"http", "ray"}:
|
2021-11-13 16:24:02 -08:00
|
|
|
assert parse_cluster_info(address, False) == ClusterInfo(
|
2022-02-02 14:34:34 -06:00
|
|
|
address=f"http://{host}:{final_port}",
|
2021-11-13 16:24:02 -08:00
|
|
|
cookies=None,
|
2021-12-03 16:44:30 -08:00
|
|
|
metadata=None,
|
2022-01-29 18:41:57 -08:00
|
|
|
headers=None,
|
|
|
|
)
|
2022-02-02 14:34:34 -06:00
|
|
|
elif scheme == "https":
|
2021-11-13 16:24:02 -08:00
|
|
|
assert parse_cluster_info(address, False) == ClusterInfo(
|
2022-02-02 14:34:34 -06:00
|
|
|
address=f"https://{host}:{final_port}",
|
|
|
|
cookies=None,
|
|
|
|
metadata=None,
|
|
|
|
headers=None,
|
2022-01-29 18:41:57 -08:00
|
|
|
)
|
2021-11-13 16:24:02 -08:00
|
|
|
else:
|
|
|
|
with pytest.raises(RuntimeError):
|
|
|
|
parse_cluster_info(address, False)
|
|
|
|
|
|
|
|
|
2021-12-14 17:01:53 -08:00
|
|
|
@pytest.mark.asyncio
|
|
|
|
async def test_tail_job_logs(job_sdk_client):
|
|
|
|
client = job_sdk_client
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
|
|
path = Path(tmp_dir)
|
|
|
|
driver_script = """
|
|
|
|
import time
|
|
|
|
for i in range(100):
|
|
|
|
print("Hello", i)
|
|
|
|
time.sleep(0.1)
|
|
|
|
"""
|
|
|
|
test_script_file = path / "test_script.py"
|
|
|
|
with open(test_script_file, "w+") as f:
|
|
|
|
f.write(driver_script)
|
|
|
|
|
|
|
|
job_id = client.submit_job(
|
2022-01-29 18:41:57 -08:00
|
|
|
entrypoint="python test_script.py", runtime_env={"working_dir": tmp_dir}
|
|
|
|
)
|
2021-12-14 17:01:53 -08:00
|
|
|
|
|
|
|
i = 0
|
|
|
|
async for lines in client.tail_job_logs(job_id):
|
|
|
|
print(lines, end="")
|
|
|
|
for line in lines.strip().split("\n"):
|
|
|
|
assert line.split(" ") == ["Hello", str(i)]
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
|
|
|
|
|
|
|
|
|
2021-10-23 10:48:16 -07:00
|
|
|
if __name__ == "__main__":
|
|
|
|
sys.exit(pytest.main(["-v", __file__]))
|