ray/dashboard/modules/snapshot/tests/test_job_submission.py
Archit Kulkarni 50e2bef9d0
[Jobs] Hide dashboard from Job Submission import path (#22223)
For public SDK APIs, change the import path from 

```python
from ray.dashboard.modules.job.common import JobStatus, JobStatusInfo
from ray.dashboard.modules.job.sdk import JobSubmissionClient
```

to 
```python
from ray.job_submission import JobStatus, JobSubmissionClient
```

`JobStatus`, `JobStatusInfo` and `JobSubmissionClient` are the only names referenced in the SDK docs so far; more can be added later as they appear.
2022-02-09 13:55:32 -06:00

98 lines
2.9 KiB
Python

import logging
import os
import sys
import json
import jsonschema
import pprint
import pytest
import requests
from ray._private.test_utils import (
format_web_url,
wait_for_condition,
wait_until_server_available,
)
from ray.dashboard import dashboard
from ray.dashboard.tests.conftest import * # noqa
from ray.job_submission import JobSubmissionClient
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def _get_snapshot(address: str):
    """Fetch the dashboard snapshot and validate it against the JSON schema.

    Args:
        address: Base URL of the dashboard; ``/api/snapshot`` is appended to it.

    Returns:
        The decoded JSON body of the snapshot response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status
            (raised by ``response.raise_for_status()``).
        jsonschema.ValidationError: If the payload does not conform to
            ``snapshot_schema.json``.
    """
    response = requests.get(f"{address}/api/snapshot")
    response.raise_for_status()
    data = response.json()
    schema_path = os.path.join(
        os.path.dirname(dashboard.__file__), "modules/snapshot/snapshot_schema.json"
    )
    # Print the payload before validating so that a schema failure still
    # leaves the offending snapshot in the captured test output.
    pprint.pprint(data)
    # Read the schema inside a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(schema_path) as schema_file:
        schema = json.load(schema_file)
    jsonschema.validate(instance=data, schema=schema)
    return data
def test_successful_job_status(
    ray_start_with_dashboard, disable_aiohttp_cache, enable_test_module
):
    """Submit a job that exits normally and poll until it shows SUCCEEDED.

    The job is submitted through ``JobSubmissionClient``; the snapshot
    endpoint is then polled (up to 30 seconds) until the first job entry
    carrying a status reports ``SUCCEEDED``.
    """
    webui_url = ray_start_with_dashboard["webui_url"]
    assert wait_until_server_available(webui_url)
    address = format_web_url(webui_url)

    # Entrypoint: connect to Ray, sleep for five seconds, then exit normally.
    entrypoint_cmd = 'python -c"import ray;ray.init();import time;time.sleep(5);"'
    job_id = JobSubmissionClient(address).submit_job(entrypoint=entrypoint_cmd)

    # Statuses a job that ends successfully may be observed in.
    expected_statuses = {"PENDING", "RUNNING", "SUCCEEDED"}

    def wait_for_job_to_succeed():
        """Return True once the submitted job is reported as SUCCEEDED."""
        jobs = _get_snapshot(address)["data"]["snapshot"]["jobs"]
        for entry in jobs.values():
            status = entry["status"]
            if status is None:
                # Entries without a status are skipped.
                continue
            assert entry["config"]["metadata"]["jobSubmissionId"] == job_id
            assert status in expected_statuses
            assert entry["statusMessage"] is not None
            # The first entry that has a status decides this poll's outcome.
            return status == "SUCCEEDED"
        return False

    wait_for_condition(wait_for_job_to_succeed, timeout=30)
def test_failed_job_status(
    ray_start_with_dashboard, disable_aiohttp_cache, enable_test_module
):
    """Submit a job that exits with code 1 and poll until it shows FAILED.

    The job is submitted through ``JobSubmissionClient``; the snapshot
    endpoint is then polled (up to 30 seconds) until the first job entry
    carrying a status reports ``FAILED``.
    """
    webui_url = ray_start_with_dashboard["webui_url"]
    assert wait_until_server_available(webui_url)
    address = format_web_url(webui_url)

    # Entrypoint: connect to Ray, sleep for five seconds, then exit with 1.
    entrypoint_cmd = (
        'python -c"import ray;ray.init();import time;time.sleep(5);'
        'import sys;sys.exit(1);"'
    )
    job_id = JobSubmissionClient(address).submit_job(entrypoint=entrypoint_cmd)

    # Statuses a job that ends in failure may be observed in.
    expected_statuses = {"PENDING", "RUNNING", "FAILED"}

    def wait_for_job_to_fail():
        """Return True once the submitted job is reported as FAILED."""
        jobs = _get_snapshot(address)["data"]["snapshot"]["jobs"]
        for entry in jobs.values():
            status = entry["status"]
            if status is None:
                # Entries without a status are skipped.
                continue
            assert entry["config"]["metadata"]["jobSubmissionId"] == job_id
            assert status in expected_statuses
            assert entry["statusMessage"] is not None
            # The first entry that has a status decides this poll's outcome.
            return status == "FAILED"
        return False

    wait_for_condition(wait_for_job_to_fail, timeout=30)
if __name__ == "__main__":
    # Running the file directly executes its tests verbosely and hands
    # pytest's return value to the process as its exit status.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)