ray/ci/long_running_tests/workloads/workload1.py
Robert Nishihara 75504b9586 Add script for running infinitely long stress tests. (#4163)
Running `./ci/long_running_tests/start_workloads.sh` will start several workloads running (each in their own EC2 instance).
- The workloads run forever.
- The workloads all simulate multiple nodes but use a single machine.
- You can get the tail of each workload by running `./ci/long_running_tests/check_workloads.sh`.
- You have to manually shut down the instances.

As discussed with @ericl @richardliaw, the idea here is to optimize for the debuggability of the tests. If one of them fails, you can ssh to the relevant instance and see all of the logs.
2019-02-27 14:33:06 -08:00

67 lines
1.8 KiB
Python

# This workload tests submitting and getting many tasks over and over.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import ray
from ray.tests.cluster_utils import Cluster
num_redis_shards = 5
redis_max_memory = 10**8
object_store_memory = 10**8
num_nodes = 10
message = ("Make sure there is enough memory on this machine to run this "
"workload. We divide the system memory by 2 to provide a buffer.")
assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory <
ray.utils.get_system_memory() / 2)
# Simulate a cluster on one machine.
cluster = Cluster()
for i in range(num_nodes):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_redis_shards=num_redis_shards if i == 0 else None,
num_cpus=2,
num_gpus=0,
resources={str(i): 2},
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory)
ray.init(redis_address=cluster.redis_address)
# Run the workload.
@ray.remote
def f(*xs):
return 1
iteration = 0
ids = []
start_time = time.time()
previous_time = start_time
while True:
for _ in range(50):
new_constrained_ids = [
f._remote(args=[*ids], resources={str(i % num_nodes): 1})
for i in range(25)
]
new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
ids = new_constrained_ids + new_unconstrained_ids
ray.get(ids)
new_time = time.time()
print("Iteration {}:\n"
" - Iteration time: {}.\n"
" - Absolute time: {}.\n"
" - Total elapsed time: {}.".format(
iteration, new_time - previous_time, new_time,
new_time - start_time))
previous_time = new_time
iteration += 1