mirror of
https://github.com/vale981/ray
synced 2025-03-10 05:16:49 -04:00

Running `./ci/long_running_tests/start_workloads.sh` starts several workloads, each in its own EC2 instance. The workloads run forever, and each one simulates multiple nodes while using a single machine. You can view the tail of each workload's output by running `./ci/long_running_tests/check_workloads.sh`. Note that you must shut the instances down manually. As discussed with @ericl and @richardliaw, the idea here is to optimize for the debuggability of the tests: if one of them fails, you can ssh to the relevant instance and see all of the logs.
70 lines
1.9 KiB
Python
70 lines
1.9 KiB
Python
# This workload tests repeatedly killing a node and adding a new node.
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import time
|
|
|
|
import ray
|
|
from ray.tests.cluster_utils import Cluster
|
|
|
|
# Shape of the simulated cluster and the per-component memory caps.
num_redis_shards = 5
redis_max_memory = 10**8
object_store_memory = 10**8
num_nodes = 10

message = ("Make sure there is enough memory on this machine to run this "
           "workload. We divide the system memory by 2 to provide a buffer.")
# Sum the memory the simulated cluster will reserve and make sure it fits
# in half of the machine's physical memory (the other half is headroom).
required_memory = (num_nodes * object_store_memory +
                   num_redis_shards * redis_max_memory)
assert required_memory < ray.utils.get_system_memory() / 2, message
|
|
|
|
# Simulate a multi-node cluster on a single machine. Node 0 is the head
# node and hosts the primary Redis server plus the Redis shards.
cluster = Cluster()
for node_index in range(num_nodes):
    is_head = node_index == 0
    cluster.add_node(
        redis_port=6379 if is_head else None,
        num_redis_shards=num_redis_shards if is_head else None,
        num_cpus=2,
        num_gpus=0,
        # Give each node a unique custom resource so tasks can be pinned.
        resources={str(node_index): 2},
        object_store_memory=object_store_memory,
        redis_max_memory=redis_max_memory)
ray.init(redis_address=cluster.redis_address)
|
|
|
|
# Run the workload.
|
|
|
|
|
|
@ray.remote
def f(*xs):
    """Trivial task; *xs only encode dependencies on earlier tasks."""
    return 1
|
|
|
|
|
|
def _chain_tasks(ids, rounds=100):
    # Submit `rounds` generations of tasks, each generation taking the
    # previous generation's object IDs as dependencies.
    for _ in range(rounds):
        ids = [f.remote(prev) for prev in ids]
    return ids


iteration = 0
previous_ids = [1 for _ in range(100)]
start_time = time.time()
previous_time = start_time
while True:
    # Build a deep task chain, block until it finishes, then start another
    # chain so there is in-flight work when the node is killed below.
    previous_ids = _chain_tasks(previous_ids)
    ray.wait(previous_ids, num_returns=len(previous_ids))
    previous_ids = _chain_tasks(previous_ids)

    # Remove the first non-head node and replace it with a fresh one.
    node_to_kill = cluster.list_all_nodes()[1]
    cluster.remove_node(node_to_kill)
    cluster.add_node()

    new_time = time.time()
    print("Iteration {}:\n"
          " - Iteration time: {}.\n"
          " - Absolute time: {}.\n"
          " - Total elapsed time: {}.".format(
              iteration, new_time - previous_time, new_time,
              new_time - start_time))
    previous_time = new_time
    iteration += 1