mirror of https://github.com/vale981/ray, synced 2025-03-04 17:41:43 -05:00
[Serve] Remove release tests for checkpoint_path (#27194)
parent b84ab363ec
commit 8beb887bbe
3 changed files with 0 additions and 278 deletions
@@ -2472,28 +2472,6 @@
  alert: default

- name: serve_serve_cluster_fault_tolerance
  group: Serve tests
  working_dir: serve_tests

  legacy:
    test_name: serve_cluster_fault_tolerance
    test_suite: serve_tests

  frequency: nightly
  team: serve
  cluster:
    cluster_env: app_config.yaml
    cluster_compute: compute_tpl_single_node.yaml

  run:
    timeout: 7200
    long_running: false
    script: python workloads/serve_cluster_fault_tolerance.py
    type: sdk_command
    file_manager: job

  alert: default

- name: deployment_graph_long_chain
  group: Serve tests
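For context: each entry in this release-test YAML pairs a name with cluster and run sections, and the removed test ran workloads/serve_cluster_fault_tolerance.py nightly with a 7200-second timeout. A minimal sketch (not part of this commit; it assumes PyYAML is available, and check_entry is a hypothetical helper, not the real release tooling) that parses an entry shaped like the one removed and asserts those fields are present:

import yaml  # PyYAML, assumed available


ENTRY = """
- name: serve_serve_cluster_fault_tolerance
  group: Serve tests
  working_dir: serve_tests
  frequency: nightly
  team: serve
  cluster:
    cluster_env: app_config.yaml
    cluster_compute: compute_tpl_single_node.yaml
  run:
    timeout: 7200
    long_running: false
    script: python workloads/serve_cluster_fault_tolerance.py
"""


def check_entry(entry):
    # Hypothetical checks; the real release tooling's schema may differ.
    for key in ("name", "group", "frequency", "team", "cluster", "run"):
        assert key in entry, f"missing required key: {key}"
    assert entry["run"]["timeout"] > 0
    assert "script" in entry["run"]


for test in yaml.safe_load(ENTRY):
    check_entry(test)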
@@ -1,128 +0,0 @@
"""
Test that a Serve deployment can recover from cluster failures by resuming
from checkpoints stored in an external source, such as S3.

For product testing, we skip actually starting a new cluster, since that is
the Job Manager's responsibility, and only re-deploy to the same cluster
from the remote checkpoint.
"""

import logging
import time
import uuid
from pathlib import Path

import click
import requests

import ray
from ray import serve
from serve_test_cluster_utils import setup_local_single_node_cluster
from serve_test_utils import (
    save_test_results,
    is_smoke_test,
)

logger = logging.getLogger(__file__)

# Deployment configs
DEFAULT_NUM_REPLICAS = 2
DEFAULT_MAX_BATCH_SIZE = 16


def request_with_retries(endpoint, timeout=3):
    """GET the endpoint, retrying failed requests until `timeout` seconds
    have elapsed; `timeout` also bounds each individual request."""
    start = time.time()
    while True:
        try:
            return requests.get("http://127.0.0.1:8000" + endpoint, timeout=timeout)
        except requests.RequestException:
            if time.time() - start > timeout:
                raise TimeoutError(f"{endpoint} unreachable after {timeout}s")
            time.sleep(0.1)
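As an aside (not part of the removed file): request_with_retries is a retry-until-deadline loop. A minimal standalone illustration of the same pattern against an arbitrary flaky callable (all names here are illustrative):

import time


def retry_until(fn, timeout=3.0, interval=0.1):
    # Keep calling fn until it succeeds or the deadline passes.
    start = time.time()
    while True:
        try:
            return fn()
        except Exception:
            if time.time() - start > timeout:
                raise TimeoutError(f"no success within {timeout}s")
            time.sleep(interval)


attempts = {"n": 0}


def flaky():
    # Fails twice, then succeeds, standing in for a not-yet-ready endpoint.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("not ready yet")
    return "ok"


assert retry_until(flaky) == "ok"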
@click.command()
def main():
    # Set up a local cluster; this setup is the same for both the local and
    # the product Ray cluster environments.
    # Each test uses a different Ray namespace, so the KV storage key for
    # each checkpoint differs and collisions are avoided.
    namespace = uuid.uuid4().hex

    # IS_SMOKE_TEST is set by the args of the releaser's e2e.py.
    if is_smoke_test():
        path = Path("checkpoint.db")
        checkpoint_path = f"file://{path}"
        if path.exists():
            path.unlink()
    else:
        checkpoint_path = (
            "s3://serve-nightly-tests/fault-tolerant-test-checkpoint"  # noqa: E501
        )

    _, cluster = setup_local_single_node_cluster(
        1, checkpoint_path=checkpoint_path, namespace=namespace
    )

    # Deploy for the first time.
    @serve.deployment(num_replicas=DEFAULT_NUM_REPLICAS)
    def hello():
        return serve.get_replica_context().deployment

    for name in ["hello", "world"]:
        hello.options(name=name).deploy()

        for _ in range(5):
            response = request_with_retries(f"/{name}/", timeout=3)
            assert response.text == name

    logger.info("Initial deployment successful with working endpoint.")

    # Kill the current cluster, recover from the remote checkpoint, and
    # ensure the endpoint is still available with the expected results.
    ray.kill(serve.context._global_client._controller, no_restart=True)
    ray.shutdown()
    cluster.shutdown()
    serve.context._set_global_client(None)

    # Start another Ray cluster with the same namespace to resume from the
    # previous checkpoints, with no new deploy() call.
    setup_local_single_node_cluster(
        1, checkpoint_path=checkpoint_path, namespace=namespace
    )

    for name in ["hello", "world"]:
        for _ in range(5):
            response = request_with_retries(f"/{name}/", timeout=3)
            assert response.text == name

    logger.info(
        "Deployment recovery from s3 checkpoint is successful "
        "with working endpoint."
    )

    # Delete dangling checkpoints. If the script fails before this step, it
    # is up to the TTL policy on S3 to clean up, but that won't cause
    # collisions with subsequent tests, since each test runs in a different
    # uuid namespace.
    serve.shutdown()
    ray.shutdown()
    cluster.shutdown()

    # Checkpoints in the S3 bucket are removed after 7 days by explicit
    # lifecycle rules. Each checkpoint from this test is ~260 bytes.

    # Save results.
    save_test_results(
        {"result": "success"},
        default_output_file="/tmp/serve_cluster_fault_tolerance.json",
    )


if __name__ == "__main__":
    main()
    import pytest
    import sys

    sys.exit(pytest.main(["-v", "-s", __file__]))
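One detail worth noting from the smoke-test branch above: it derives a file:// URI from a local path and unlinks any stale checkpoint before starting. A standalone sketch of that pattern (not from the commit; the temp directory and helper name are illustrative):

import tempfile
from pathlib import Path


def fresh_local_checkpoint_uri(dirname):
    # Build a file:// checkpoint URI, dropping any leftover checkpoint
    # from a previous run so the test starts clean.
    path = Path(dirname) / "checkpoint.db"
    if path.exists():
        path.unlink()
    return f"file://{path}"


with tempfile.TemporaryDirectory() as tmp:
    uri = fresh_local_checkpoint_uri(tmp)
    assert uri.startswith("file://")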
@@ -1,128 +0,0 @@
"""
Test that a Serve deployment can recover from cluster failures by resuming
from checkpoints stored in an external source, such as Google Cloud
Storage (GCS).

For product testing, we skip actually starting a new cluster, since that is
the Job Manager's responsibility, and only re-deploy to the same cluster
from the remote checkpoint.
"""

import logging
import time
import uuid

import click
import requests
from serve_test_cluster_utils import setup_local_single_node_cluster
from serve_test_utils import (
    save_test_results,
    is_smoke_test,
)

import ray
from ray import serve

logger = logging.getLogger(__file__)

# Deployment configs
DEFAULT_NUM_REPLICAS = 4
DEFAULT_MAX_BATCH_SIZE = 16


def request_with_retries(endpoint, timeout=3):
    """GET the endpoint, retrying failed requests until `timeout` seconds
    have elapsed; `timeout` also bounds each individual request."""
    start = time.time()
    while True:
        try:
            return requests.get("http://127.0.0.1:8000" + endpoint, timeout=timeout)
        except requests.RequestException:
            if time.time() - start > timeout:
                raise TimeoutError(f"{endpoint} unreachable after {timeout}s")
            time.sleep(0.1)


@click.command()
def main():
    # Set up a local cluster; this setup is the same for both the local and
    # the product Ray cluster environments.
    # Each test uses a different Ray namespace, so the KV storage key for
    # each checkpoint differs and collisions are avoided.
    namespace = uuid.uuid4().hex

    # IS_SMOKE_TEST is set by the args of the releaser's e2e.py.
    if is_smoke_test():
        checkpoint_path = "file://checkpoint.db"
    else:
        checkpoint_path = (
            "gs://kazi_test/test/fault-tolerant-test-checkpoint"  # noqa: E501
        )

    _, cluster = setup_local_single_node_cluster(
        1, checkpoint_path=checkpoint_path, namespace=namespace
    )

    # Deploy for the first time.
    @serve.deployment(name="echo", num_replicas=DEFAULT_NUM_REPLICAS)
    class Echo:
        def __init__(self):
            # No state to initialize. (The original returned True here,
            # but __init__ must return None.)
            pass

        def __call__(self, request):
            return "hii"

    Echo.deploy()

    # Ensure the endpoint is working.
    for _ in range(5):
        response = request_with_retries("/echo/", timeout=3)
        assert response.text == "hii"

    logger.info("Initial deployment successful with working endpoint.")

    # Kill the current cluster, recover from the remote checkpoint, and
    # ensure the endpoint is still available with the expected results.
    ray.kill(serve.context._global_client._controller, no_restart=True)
    ray.shutdown()
    cluster.shutdown()
    serve.context._set_global_client(None)

    # Start another Ray cluster with the same namespace to resume from the
    # previous checkpoints, with no new deploy() call.
    setup_local_single_node_cluster(
        1, checkpoint_path=checkpoint_path, namespace=namespace
    )

    for _ in range(5):
        response = request_with_retries("/echo/", timeout=3)
        assert response.text == "hii"

    logger.info(
        "Deployment recovery from Google Cloud Storage checkpoint "
        "is successful with working endpoint."
    )

    # Delete dangling checkpoints. If the script fails before this step, it
    # is up to the TTL policy on GCS to clean up, but that won't cause
    # collisions with subsequent tests, since each test runs in a different
    # uuid namespace.
    serve.shutdown()
    ray.shutdown()
    cluster.shutdown()

    # Checkpoints in the GCS bucket are removed after 7 days by explicit
    # lifecycle rules. Each checkpoint from this test is ~260 bytes.

    # Save results.
    save_test_results(
        {"result": "success"},
        default_output_file="/tmp/serve_cluster_fault_tolerance.json",
    )


if __name__ == "__main__":
    main()
    import sys

    import pytest

    sys.exit(pytest.main(["-v", "-s", __file__]))
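Both removed tests vary only in where the checkpoint lives: file:// for smoke tests, s3:// for the nightly S3 variant, and gs:// for the Google Cloud Storage variant. A small sketch (an illustration, not code from the commit) of dispatching on the URI scheme the way these tests do:

from urllib.parse import urlparse


def storage_backend(checkpoint_path):
    # Map the URI scheme to the storage backend the tests exercise.
    scheme = urlparse(checkpoint_path).scheme
    return {
        "file": "local disk",
        "s3": "Amazon S3",
        "gs": "Google Cloud Storage",
    }.get(scheme, "unknown")


assert storage_backend("file://checkpoint.db") == "local disk"
assert storage_backend(
    "s3://serve-nightly-tests/fault-tolerant-test-checkpoint"
) == "Amazon S3"
assert storage_backend(
    "gs://kazi_test/test/fault-tolerant-test-checkpoint"
) == "Google Cloud Storage"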