[kubernetes][test] Operator test tweaks. (#15074)

This commit is contained in:
Dmitri Gekhtman 2021-04-02 12:20:52 -04:00 committed by GitHub
parent 8de66fce3d
commit 6f81ec1998
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -14,9 +14,13 @@ import yaml
# Names of the environment variables that configure the operator test.
IMAGE_ENV = "KUBERNETES_OPERATOR_TEST_IMAGE"
NAMESPACE_ENV = "KUBERNETES_OPERATOR_TEST_NAMESPACE"
PULL_POLICY_ENV = "KUBERNETES_OPERATOR_TEST_PULL_POLICY"

# Values resolved once at import time; each falls back to the default given
# here when the corresponding environment variable is unset.
IMAGE = os.environ.get(IMAGE_ENV, "rayproject/ray:nightly")
NAMESPACE = os.environ.get(NAMESPACE_ENV, "test-k8s-operator")
PULL_POLICY = os.environ.get(PULL_POLICY_ENV, "Always")

# Absolute form of the path obtained by stripping four trailing components
# from this file's path (the file name plus three directory levels).
_up_one = os.path.dirname(__file__)
_up_two = os.path.dirname(_up_one)
_up_three = os.path.dirname(_up_two)
_up_four = os.path.dirname(_up_three)
RAY_PATH = os.path.abspath(_up_four)
@ -57,13 +61,22 @@ def wait_for_logs():
@retry_until_true
def wait_for_job(job_pod):
    """Report whether the job pod's logs indicate a successful run.

    Fetches the logs of ``job_pod`` in ``NAMESPACE`` with ``kubectl logs``
    and searches the combined stdout/stderr text, case-insensitively, for
    the word "success".

    NOTE(review): ``retry_until_true`` is defined elsewhere in this file;
    presumably it re-invokes this function until it returns True -- confirm.

    Args:
        job_pod: Name of the pod whose logs are inspected.

    Returns:
        True if the logs contain "success", False otherwise.

    Raises:
        subprocess.CalledProcessError: If the kubectl command exits with a
            non-zero status. The captured output is printed first.
    """
    print(">>>Checking job logs.")
    cmd = f"kubectl -n {NAMESPACE} logs {job_pod}"
    try:
        # Fold stderr into stdout so that kubectl's error text is available
        # in e.output when the command fails.
        out = subprocess.check_output(
            cmd, shell=True, stderr=subprocess.STDOUT).decode()
    except subprocess.CalledProcessError as e:
        print(">>>Failed to check job logs.")
        print(e.output.decode())
        # Bare raise keeps the original traceback intact.
        raise
    success = "success" in out.lower()
    if success:
        print(">>>Job submission succeeded.")
    else:
        # Dump the logs so a failing run can be diagnosed from test output.
        print(">>>Job logs do not indicate job success:")
        print(out)
    return success
def kubernetes_configs_directory():
@ -116,7 +129,7 @@ class KubernetesOperatorTest(unittest.TestCase):
] + [podType["podConfig"]["spec"] for podType in podTypes2])
for pod_spec in pod_specs:
pod_spec["containers"][0]["image"] = IMAGE
pod_spec["containers"][0]["imagePullPolicy"] = "IfNotPresent"
pod_spec["containers"][0]["imagePullPolicy"] = PULL_POLICY
# Dump to temporary files
yaml.dump(example_cluster_config, example_cluster_file)
@ -130,27 +143,33 @@ class KubernetesOperatorTest(unittest.TestCase):
file.flush()
# Start operator and two clusters
print(">>>Starting operator and two clusters.")
for file in files:
cmd = f"kubectl -n {NAMESPACE} apply -f {file.name}"
subprocess.check_call(cmd, shell=True)
# Check that autoscaling respects minWorkers by waiting for
# six pods in the namespace.
print(">>>Waiting for pods to join clusters.")
wait_for_pods(6)
# Check that logging output looks normal (two workers connected to
# ray cluster example-cluster.)
print(">>>Checking monitor logs for head and workers.")
wait_for_logs()
# Delete the second cluster
print(">>>Deleting example-cluster2.")
cmd = f"kubectl -n {NAMESPACE} delete -f"\
f"{example_cluster2_file.name}"
subprocess.check_call(cmd, shell=True)
# Four pods remain
print(">>>Checking that example-cluster2 pods are gone.")
wait_for_pods(4)
# Check job submission
print(">>>Submitting a job to test Ray client connection.")
cmd = f"kubectl -n {NAMESPACE} create -f {job_file.name}"
subprocess.check_call(cmd, shell=True)
@ -165,21 +184,24 @@ class KubernetesOperatorTest(unittest.TestCase):
# Check that cluster updates work: increase minWorkers to 3
# and check that one worker is created.
print(">>>Updating cluster size.")
example_cluster_edit = copy.deepcopy(example_cluster_config)
example_cluster_edit["spec"]["podTypes"][1]["minWorkers"] = 3
yaml.dump(example_cluster_edit, example_cluster_file)
example_cluster_file.flush()
cm = f"kubectl -n {NAMESPACE} apply -f {example_cluster_file.name}"
subprocess.check_call(cm, shell=True)
print(">>>Checking that new cluster size is respected.")
wait_for_pods(5)
# Delete the first cluster
print(">>>Deleting second cluster.")
cmd = f"kubectl -n {NAMESPACE} delete -f"\
f"{example_cluster_file.name}"
subprocess.check_call(cmd, shell=True)
# Only operator pod remains.
print(">>>Checking that all Ray cluster pods are gone.")
wait_for_pods(1)