[ci/multinode] Add utilities to kill nodes in multi node testing (#21580)

Killing nodes enables advanced fault tolerance testing. This PR adds utilities and a test for this functionality in fake multinode docker mode.
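For context, here is a minimal usage sketch of the new API (illustrative only, not part of this diff). It assumes ``cluster`` is a ``DockerCluster`` from the fake multi-node test harness that has already been started:

    # Hypothetical usage; assumes an already-running fake multi-node DockerCluster.
    cluster.kill_node(num=2)          # kill the node whose ID is built from index 2
    cluster.kill_node(rand="worker")  # kill a random worker node (never the head node)
    cluster.kill_node(
        node_id="fffffffffffffffffffffffffffffffffffffffffffffffffff00002")

Exactly one of ``node_id``, ``num``, or ``rand`` may be passed per call.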
Kai Fricke 2022-01-15 17:11:16 -08:00 committed by GitHub
parent a971774820
commit d84154a774
2 changed files with 88 additions and 0 deletions

@@ -1,6 +1,7 @@
import json
import logging
import os
import random
import shutil
import subprocess
import sys
@@ -34,6 +35,9 @@ class DockerCluster:
        self._tempdir = None
        self._config_file = None
        self._nodes_file = None
        self._nodes = {}
        self._status_file = None
        self._status = {}
        self._partial_config = config
        self._cluster_config = None
        self._docker_image = None
@@ -197,6 +201,7 @@ class DockerCluster:
        os.chmod(self._tempdir, 0o777)

        self._config_file = os.path.join(self._tempdir, "cluster.yaml")
        self._nodes_file = os.path.join(self._tempdir, "nodes.json")
        self._status_file = os.path.join(self._tempdir, "status.json")
        self.update_config()
        self.maybe_pull_image()
@@ -240,3 +245,66 @@
            f"RAY_FAKE_CLUSTER=1 ray down -y {self.config_file}", shell=True)
        self._stop_monitor()

    def _update_nodes(self):
        with open(self._nodes_file, "rt") as f:
            self._nodes = json.load(f)

    def _update_status(self):
        with open(self._status_file, "rt") as f:
            self._status = json.load(f)

    def _get_node(self,
                  node_id: Optional[str] = None,
                  num: Optional[int] = None,
                  rand: Optional[str] = None) -> str:
        self._update_nodes()

        if node_id:
            assert not num and not rand, (
                "Only provide either `node_id`, `num`, or `random`.")
        elif num:
            assert not node_id and not rand, (
                "Only provide either `node_id`, `num`, or `random`.")
            base = "fffffffffffffffffffffffffffffffffffffffffffffffffff"
            node_id = base + str(num).zfill(5)
        elif rand:
            assert not node_id and not num, (
                "Only provide either `node_id`, `num`, or `random`.")
            assert rand in ["worker", "any"
                            ], ("`random` must be one of ['worker', 'any']")
            choices = list(self._nodes.keys())
            if rand == "worker":
                choices.remove(
                    "fffffffffffffffffffffffffffffffffffffffffffffffffff00000")
            # Else: any
            node_id = random.choice(choices)

        assert node_id in self._nodes, (
            f"Node with ID {node_id} is not in active nodes.")

        return node_id

    def _get_docker_container(self, node_id: str) -> Optional[str]:
        self._update_status()
        node_status = self._status.get(node_id)
        if not node_status:
            return None
        return node_status["Name"]

    def kill_node(self,
                  node_id: Optional[str] = None,
                  num: Optional[int] = None,
                  rand: Optional[str] = None):
        """Kill node.

        If ``node_id`` is given, kill that node.

        If ``num`` is given, construct node_id from this number, and kill
        that node.

        If ``rand`` is given (as either ``worker`` or ``any``), kill a random
        node.
        """
        node_id = self._get_node(node_id=node_id, num=num, rand=rand)

        container = self._get_docker_container(node_id=node_id)
        subprocess.check_output(f"docker kill {container}", shell=True)
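A side note on the node IDs used above: fake multi-node node IDs are a fixed prefix of ``f`` characters followed by a zero-padded five-digit index, with index 0 apparently being the head node (it is excluded when ``rand="worker"``). That is what lets ``_get_node(num=...)`` rebuild an ID from a plain number. A small sketch of the same construction; the helper name here is mine, not part of this PR:

    # Sketch of the fake node ID scheme used by _get_node (helper name is illustrative).
    FAKE_ID_BASE = "fffffffffffffffffffffffffffffffffffffffffffffffffff"

    def fake_node_id(num: int) -> str:
        # e.g. fake_node_id(2) -> "fff...00002"; index 0 is the head node.
        return FAKE_ID_BASE + str(num).zfill(5)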

@@ -51,6 +51,26 @@ class MultiNodeSyncTest(unittest.TestCase):
                    num_cpus=1, num_gpus=1, placement_group=pg).remote(5)))
        print("Autoscaling worked")
        ray.util.remove_placement_group(pg)

        time.sleep(2)  # Give some time so nodes.json is updated

        self.cluster.kill_node(num=2)
        print("Killed GPU node.")

        pg = ray.util.placement_group([{"CPU": 1, "GPU": 1}] * 2)

        table = ray.util.placement_group_table(pg)
        assert table["state"] == "PENDING"

        timeout = time.monotonic() + 180
        while table["state"] != "CREATED":
            if time.monotonic() > timeout:
                raise RuntimeError(
                    "Re-starting killed node failed or too slow.")
            time.sleep(1)
            table = ray.util.placement_group_table(pg)

        print("Node was restarted.")
if __name__ == "__main__":
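As an aside, if more tests end up polling placement group state like this, the wait loop could be factored into a small helper. A sketch under that assumption; the helper name is mine, not part of this PR:

    import time

    import ray


    def wait_for_pg_state(pg, state: str = "CREATED", timeout_s: float = 180.0):
        # Poll the placement group table until it reaches `state` or the deadline passes.
        deadline = time.monotonic() + timeout_s
        while ray.util.placement_group_table(pg)["state"] != state:
            if time.monotonic() > deadline:
                raise RuntimeError(f"Placement group did not reach state {state}.")
            time.sleep(1)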