[Tests] Remove app level error from nightly tests (#16968)

* Completed

* Fix tests

* Increase the node wait timeout

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
SangBin Cho 2021-07-09 12:20:42 -07:00 committed by GitHub
parent 66ea099897
commit 33e319e9d7
7 changed files with 205 additions and 257 deletions
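The change is the same across all seven files: each nightly test previously wrapped its body in try/except, recorded success = 0 on an application-level error, and still exited cleanly; after this commit the scripts let exceptions propagate, presumably so that a failure surfaces as a non-zero exit for the release-test harness instead of being absorbed in-script, and TEST_OUTPUT_JSON is only written with "success": 1 on a clean run. A minimal before/after sketch of the pattern (run_trial and the result fields are placeholders, not code from this repository):

import json
import os

def run_nightly_test(run_trial):
    # Before this commit (roughly):
    #   success = 1
    #   try:
    #       duration = run_trial()
    #   except Exception:
    #       success = 0
    #       duration = 0
    #   ... write {"duration": duration, "success": success}
    # After: any exception escapes, the process exits non-zero, and only
    # the happy path reaches the result file.
    duration = run_trial()
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
        f.write(json.dumps({"duration": duration, "success": 1}))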


@@ -185,31 +185,23 @@ if __name__ == "__main__":
if args.nbytes // npartitions > args.max_partition_size:
npartitions = args.nbytes // args.max_partition_size
success = 1
duration = []
try:
output = trial(
client,
args.data_dir,
args.nbytes,
npartitions,
args.generate_only,
s3_bucket=args.s3_bucket,
file_path=args.file_path)
print("mean over {} trials: {} +- {}".format(
len(output), np.mean(output), np.std(output)))
except Exception as e:
import traceback
print(traceback.format_exc())
print(e)
success = 0
duration = []
output = trial(
client,
args.data_dir,
args.nbytes,
npartitions,
args.generate_only,
s3_bucket=args.s3_bucket,
file_path=args.file_path)
print("mean over {} trials: {} +- {}".format(
len(output), np.mean(output), np.std(output)))
print(ray.internal.internal_api.memory_summary(stats_only=True))
duration = np.mean(output)
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"duration": duration, "success": success}))
f.write(json.dumps({"duration": duration, "success": 1}))
write_header = not os.path.exists("output.csv") or os.path.getsize(
"output.csv") == 0


@@ -421,44 +421,40 @@ def parse_script_args():
def main():
args, unknown = parse_script_args()
logging.info("Received arguments: {}".format(args))
success = 1
try:
# Create test spec
test_spec = TestSpec(
num_workers=args.num_workers,
worker_obj_store_size_in_gb=args.worker_obj_store_size_in_gb,
error_rate=args.error_rate,
trigger_object_spill=args.trigger_object_spill,
)
logging.info("Created test spec: {}".format(test_spec))
# Create the data save path if it doesn't exist.
data_save_path = args.data_save_path
if not os.path.exists(data_save_path):
os.makedirs(data_save_path, mode=0o777, exist_ok=True)
os.chmod(data_save_path, mode=0o777)
# Create test spec
test_spec = TestSpec(
num_workers=args.num_workers,
worker_obj_store_size_in_gb=args.worker_obj_store_size_in_gb,
error_rate=args.error_rate,
trigger_object_spill=args.trigger_object_spill,
)
logging.info("Created test spec: {}".format(test_spec))
# Lazily construct Xarrays
xarray_filename_pairs = lazy_create_xarray_filename_pairs(test_spec)
# Create the data save path if it doesn't exist.
data_save_path = args.data_save_path
if not os.path.exists(data_save_path):
os.makedirs(data_save_path, mode=0o777, exist_ok=True)
os.chmod(data_save_path, mode=0o777)
# Connect to the Ray cluster
ray.init(address="auto")
# Lazily construct Xarrays
xarray_filename_pairs = lazy_create_xarray_filename_pairs(test_spec)
# Save all the Xarrays to disk; this will trigger
# Dask computations on Ray.
logging.info("Saving {} xarrays..".format(len(xarray_filename_pairs)))
SaveRoutines.save_all_xarrays(
xarray_filename_pairs=xarray_filename_pairs,
dirpath=data_save_path,
batch_size=test_spec.batch_size,
ray_scheduler=ray_dask_get,
)
print(ray.internal.internal_api.memory_summary(stats_only=True))
except Exception as e:
logging.exception(e)
success = 0
# Connect to the Ray cluster
ray.init(address="auto")
# Save all the Xarrays to disk; this will trigger
# Dask computations on Ray.
logging.info("Saving {} xarrays..".format(len(xarray_filename_pairs)))
SaveRoutines.save_all_xarrays(
xarray_filename_pairs=xarray_filename_pairs,
dirpath=data_save_path,
batch_size=test_spec.batch_size,
ray_scheduler=ray_dask_get,
)
print(ray.internal.internal_api.memory_summary(stats_only=True))
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"success": success}))
f.write(json.dumps({"success": 1}))
if __name__ == "__main__":
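save_all_xarrays is handed ray_dask_get as its scheduler, which is how the Dask graphs that xarray builds end up executing on the Ray cluster. A hedged sketch of what such a save step could look like (the dataset, chunking, and zarr path are illustrative, not the test's actual SaveRoutines code; writing zarr requires the zarr package):

import dask
import dask.array as da
import ray
import xarray as xr
from ray.util.dask import ray_dask_get

ray.init()  # the test connects to a running cluster with address="auto"

# A lazily evaluated, Dask-backed dataset stands in for the test's xarrays.
ds = xr.Dataset(
    {"temperature": (("x", "y"),
                     da.random.random((1000, 1000), chunks=(100, 100)))})

# Route the Dask computation triggered by to_zarr through the Ray scheduler
# instead of Dask's default threaded scheduler.
with dask.config.set(scheduler=ray_dask_get):
    ds.to_zarr("/tmp/example.zarr", mode="w")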


@@ -99,26 +99,6 @@
prepare: python wait_cluster.py 4 600
script: python shuffle/shuffle_test.py --num-partitions=200 --partition-size=500e6 --no-streaming
# Test streaming shuffle in a single node with a large partition size.
- name: streaming_shuffle_1tb_100_partitions
cluster:
app_config: shuffle/shuffle_app_config.yaml
compute_template: shuffle/shuffle_compute_multi.yaml
run:
timeout: 3000
script: python shuffle/shuffle_test.py --num-partitions=100 --partition-size=10e9
# Test non streaming shuffle in a single node with a large partition size.
- name: non_streaming_shuffle_1tb_100_partitions
cluster:
app_config: shuffle/shuffle_app_config.yaml
compute_template: shuffle/shuffle_compute_multi.yaml
run:
timeout: 3000
script: python shuffle/shuffle_test.py --num-partitions=100 --partition-size=10e9 --no-streaming
# Test multi nodes 1TB streaming shuffle with a large number of partitions.
- name: shuffle_1tb_1000_partition
cluster:
@@ -127,7 +107,7 @@
run:
timeout: 3000
prepare: python wait_cluster.py 20 600
prepare: python wait_cluster.py 20 900
script: python shuffle/shuffle_test.py --num-partitions=1000 --partition-size=1e9
# Test multi nodes 1TB non streaming shuffle with a large number of partitions.
@@ -138,7 +118,7 @@
run:
timeout: 3000
prepare: python wait_cluster.py 20 600
prepare: python wait_cluster.py 20 900
script: python shuffle/shuffle_test.py --num-partitions=1000 --partition-size=1e9 --no-streaming
# Stress test for 1TB multi node streaming shuffle.
@@ -149,7 +129,7 @@
run:
timeout: 3000
prepare: python wait_cluster.py 20 600
prepare: python wait_cluster.py 20 900
script: python shuffle/shuffle_test.py --num-partitions=5000 --partition-size=200e6
# Stress test for 1TB multi node non-streaming shuffle.
@@ -160,7 +140,7 @@
run:
timeout: 3000
prepare: python wait_cluster.py 20 600
prepare: python wait_cluster.py 20 900
script: python shuffle/shuffle_test.py --num-partitions=5000 --partition-size=200e6 --no-streaming
# Test large scale dask on ray test without spilling.
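The prepare steps bump wait_cluster.py from a 600 s to a 900 s node-wait timeout for the 20-node shuffle jobs, matching the "Increase the node wait timeout" note in the commit message. wait_cluster.py itself is not part of this diff; a minimal sketch of what such a wait could look like, mirroring the polling loops visible in the Python tests below:

import sys
import time

import ray

def wait_for_nodes(expected: int, timeout_s: float) -> None:
    ray.init(address="auto")
    deadline = time.time() + timeout_s
    while True:
        alive = [n for n in ray.nodes() if n["Alive"]]
        if len(alive) >= expected:
            print(f"{len(alive)}/{expected} nodes are up.")
            return
        if time.time() > deadline:
            raise TimeoutError(
                f"Only {len(alive)}/{expected} nodes joined within {timeout_s} s.")
        time.sleep(5)

if __name__ == "__main__":
    # e.g. python wait_cluster.py 20 900
    wait_for_nodes(int(sys.argv[1]), float(sys.argv[2]))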


@@ -18,7 +18,6 @@ if __name__ == "__main__":
args = parser.parse_args()
start = time.time()
success = 1
commands = [
"python", "-m", "ray.experimental.shuffle", "--ray-address={}".format(
@@ -29,16 +28,8 @@ if __name__ == "__main__":
if args.no_streaming:
commands.append("--no-streaming")
try:
subprocess.check_call(commands)
except Exception as e:
print(f"The test failed with {e}")
success = 0
subprocess.check_call(commands)
delta = time.time() - start
# Report the running time as 0 if it fails so that
# it is easy to be discovered from the graph.
if not success:
delta = 0
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"shuffle_time": delta, "success": success}))
f.write(json.dumps({"shuffle_time": delta, "success": 1}))


@@ -73,53 +73,50 @@ if __name__ == "__main__":
num_children = args.num_children
death_probability = args.death_probability
try:
# Wait until the expected number of nodes have joined the cluster.
while True:
num_nodes = len(ray.nodes())
logger.info("Waiting for nodes {}/{}".format(
num_nodes, num_remote_nodes + 1))
if num_nodes >= num_remote_nodes + 1:
break
time.sleep(5)
logger.info("Nodes have all joined. There are %s resources.",
ray.cluster_resources())
# Wait until the expected number of nodes have joined the cluster.
while True:
num_nodes = len(ray.nodes())
logger.info("Waiting for nodes {}/{}".format(num_nodes,
num_remote_nodes + 1))
if num_nodes >= num_remote_nodes + 1:
break
time.sleep(5)
logger.info("Nodes have all joined. There are %s resources.",
ray.cluster_resources())
parents = [
Parent.remote(num_children, death_probability)
for _ in range(num_parents)
]
parents = [
Parent.remote(num_children, death_probability)
for _ in range(num_parents)
]
start = time.time()
loop_times = []
for i in range(100):
loop_start = time.time()
ray.get([parent.ping.remote(10) for parent in parents])
start = time.time()
loop_times = []
for i in range(100):
loop_start = time.time()
ray.get([parent.ping.remote(10) for parent in parents])
# Kill a parent actor with some probability.
exit_chance = np.random.rand()
if exit_chance > death_probability:
parent_index = np.random.randint(len(parents))
parents[parent_index].kill.remote()
parents[parent_index] = Parent.remote(num_children,
death_probability)
# Kill a parent actor with some probability.
exit_chance = np.random.rand()
if exit_chance > death_probability:
parent_index = np.random.randint(len(parents))
parents[parent_index].kill.remote()
parents[parent_index] = Parent.remote(num_children,
death_probability)
logger.info("Finished trial %s", i)
loop_times.append(time.time() - loop_start)
logger.info("Finished trial %s", i)
loop_times.append(time.time() - loop_start)
print("Finished in: {}s".format(time.time() - start))
print("Average iteration time: {}s".format(
sum(loop_times) / len(loop_times)))
print("Max iteration time: {}s".format(max(loop_times)))
print("Min iteration time: {}s".format(min(loop_times)))
result["total_time"] = time.time() - start
result["avg_iteration_time"] = sum(loop_times) / len(loop_times)
result["max_iteration_time"] = max(loop_times)
result["min_iteration_time"] = min(loop_times)
result["success"] = 1
print("PASSED.")
print("Finished in: {}s".format(time.time() - start))
print("Average iteration time: {}s".format(
sum(loop_times) / len(loop_times)))
print("Max iteration time: {}s".format(max(loop_times)))
print("Min iteration time: {}s".format(min(loop_times)))
result["total_time"] = time.time() - start
result["avg_iteration_time"] = sum(loop_times) / len(loop_times)
result["max_iteration_time"] = max(loop_times)
result["min_iteration_time"] = min(loop_times)
result["success"] = 1
print("PASSED.")
except Exception as e:
logging.exception(e)
print("FAILED.")
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps(result))
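The Parent and Child actor definitions live outside this hunk; a hypothetical sketch of the supervision shape the loop above exercises: a Parent that owns num_children child actors, answers ping, and can be killed and replaced mid-run (class bodies here are assumptions, not the test's real code):

import os

import ray

@ray.remote
class Child:
    def ping(self):
        return "pong"

@ray.remote
class Parent:
    def __init__(self, num_children, death_probability):
        self.death_probability = death_probability
        self.children = [Child.remote() for _ in range(num_children)]

    def ping(self, num_pings):
        # Fan pings out to every child and wait for all of the replies.
        refs = [
            child.ping.remote() for child in self.children
            for _ in range(num_pings)
        ]
        return ray.get(refs)

    def kill(self):
        # Simulate a crash; the driver replaces this actor with a fresh one.
        os._exit(0)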


@@ -179,65 +179,62 @@ if __name__ == "__main__":
is_smoke_test = args.smoke_test
result = {"success": 0}
try:
# Wait until the expected number of nodes have joined the cluster.
while True:
num_nodes = len(ray.nodes())
logger.info("Waiting for nodes {}/{}".format(
num_nodes, num_remote_nodes + 1))
if num_nodes >= num_remote_nodes + 1:
break
time.sleep(5)
logger.info("Nodes have all joined. There are %s resources.",
ray.cluster_resources())
# Wait until the expected number of nodes have joined the cluster.
while True:
num_nodes = len(ray.nodes())
logger.info("Waiting for nodes {}/{}".format(num_nodes,
num_remote_nodes + 1))
if num_nodes >= num_remote_nodes + 1:
break
time.sleep(5)
logger.info("Nodes have all joined. There are %s resources.",
ray.cluster_resources())
stage_0_time = stage0(smoke=is_smoke_test)
logger.info("Finished stage 0 after %s seconds.", stage_0_time)
result["stage_0_time"] = stage_0_time
stage_0_time = stage0(smoke=is_smoke_test)
logger.info("Finished stage 0 after %s seconds.", stage_0_time)
result["stage_0_time"] = stage_0_time
stage_1_time, stage_1_iterations = stage1(smoke=is_smoke_test)
logger.info("Finished stage 1 after %s seconds.", stage_1_time)
result["stage_1_time"] = stage_1_time
result["stage_1_avg_iteration_time"] = sum(stage_1_iterations) / len(
stage_1_iterations)
result["stage_1_max_iteration_time"] = max(stage_1_iterations)
result["stage_1_min_iteration_time"] = min(stage_1_iterations)
stage_1_time, stage_1_iterations = stage1(smoke=is_smoke_test)
logger.info("Finished stage 1 after %s seconds.", stage_1_time)
result["stage_1_time"] = stage_1_time
result["stage_1_avg_iteration_time"] = sum(stage_1_iterations) / len(
stage_1_iterations)
result["stage_1_max_iteration_time"] = max(stage_1_iterations)
result["stage_1_min_iteration_time"] = min(stage_1_iterations)
stage_2_time, stage_2_iterations = stage2(smoke=is_smoke_test)
logger.info("Finished stage 2 after %s seconds.", stage_2_time)
result["stage_2_time"] = stage_2_time
result["stage_2_avg_iteration_time"] = sum(stage_2_iterations) / len(
stage_2_iterations)
result["stage_2_max_iteration_time"] = max(stage_2_iterations)
result["stage_2_min_iteration_time"] = min(stage_2_iterations)
stage_2_time, stage_2_iterations = stage2(smoke=is_smoke_test)
logger.info("Finished stage 2 after %s seconds.", stage_2_time)
result["stage_2_time"] = stage_2_time
result["stage_2_avg_iteration_time"] = sum(stage_2_iterations) / len(
stage_2_iterations)
result["stage_2_max_iteration_time"] = max(stage_2_iterations)
result["stage_2_min_iteration_time"] = min(stage_2_iterations)
stage_3_time, stage_3_creation_time = stage3(
total_num_remote_cpus, smoke=is_smoke_test)
logger.info("Finished stage 3 in %s seconds.", stage_3_time)
result["stage_3_creation_time"] = stage_3_creation_time
result["stage_3_time"] = stage_3_time
stage_3_time, stage_3_creation_time = stage3(
total_num_remote_cpus, smoke=is_smoke_test)
logger.info("Finished stage 3 in %s seconds.", stage_3_time)
result["stage_3_creation_time"] = stage_3_creation_time
result["stage_3_time"] = stage_3_time
stage_4_spread = stage4()
# avg_spread ~ 115 with Ray 1.0 scheduler. ~695 with (buggy) 0.8.7
# scheduler.
result["stage_4_spread"] = stage_4_spread
result["success"] = 1
print("PASSED.")
stage_4_spread = stage4()
# avg_spread ~ 115 with Ray 1.0 scheduler. ~695 with (buggy) 0.8.7
# scheduler.
result["stage_4_spread"] = stage_4_spread
result["success"] = 1
print("PASSED.")
# TODO(rkn): The test below is commented out because it currently
# does not pass.
# # Submit a bunch of actor tasks with all-to-all communication.
# start_time = time.time()
# logger.info("Submitting actor tasks with all-to-all communication.")
# x_ids = []
# for _ in range(50):
# for size_exponent in [0, 1, 2, 3, 4, 5, 6]:
# x_ids = [a.method.remote(10**size_exponent, *x_ids) for a
# in actors]
# ray.get(x_ids)
# logger.info("Finished after %s seconds.", time.time() - start_time)
# TODO(rkn): The test below is commented out because it currently
# does not pass.
# # Submit a bunch of actor tasks with all-to-all communication.
# start_time = time.time()
# logger.info("Submitting actor tasks with all-to-all communication.")
# x_ids = []
# for _ in range(50):
# for size_exponent in [0, 1, 2, 3, 4, 5, 6]:
# x_ids = [a.method.remote(10**size_exponent, *x_ids) for a
# in actors]
# ray.get(x_ids)
# logger.info("Finished after %s seconds.", time.time() - start_time)
except Exception as e:
logging.exception(e)
print("FAILED.")
with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_put:
out_put.write(json.dumps(result))


@@ -87,86 +87,81 @@ def pg_launcher(pre_created_pgs, num_pgs_to_create):
if __name__ == "__main__":
result = {"success": 0}
try:
# Wait until the expected number of nodes have joined the cluster.
ray.init(address="auto")
while True:
num_nodes = len(ray.nodes())
logger.info("Waiting for nodes {}/{}".format(
num_nodes, NUM_NODES + 1))
if num_nodes >= NUM_NODES + 1:
break
time.sleep(5)
logger.info("Nodes have all joined. There are %s resources.",
ray.cluster_resources())
# Scenario 1: Create bunch of placement groups and measure how long
# it takes.
total_creating_time = 0
total_removing_time = 0
repeat = 1
total_trial = repeat * NUM_PG
BUNDLES = [{"pg_custom": 1}] * NUM_NODES
# Wait until the expected number of nodes have joined the cluster.
ray.init(address="auto")
while True:
num_nodes = len(ray.nodes())
logger.info("Waiting for nodes {}/{}".format(num_nodes, NUM_NODES + 1))
if num_nodes >= NUM_NODES + 1:
break
time.sleep(5)
logger.info("Nodes have all joined. There are %s resources.",
ray.cluster_resources())
# Create and remove placement groups.
for _ in range(repeat):
pgs = []
for i in range(NUM_PG):
start = perf_counter()
pgs.append(placement_group(BUNDLES, strategy="PACK"))
end = perf_counter()
logger.info(f"append_group iteration {i}")
total_creating_time += (end - start)
# Scenario 1: Create bunch of placement groups and measure how long
# it takes.
total_creating_time = 0
total_removing_time = 0
repeat = 1
total_trial = repeat * NUM_PG
BUNDLES = [{"pg_custom": 1}] * NUM_NODES
ray.get([pg.ready() for pg in pgs])
# Create and remove placement groups.
for _ in range(repeat):
pgs = []
for i in range(NUM_PG):
start = perf_counter()
pgs.append(placement_group(BUNDLES, strategy="PACK"))
end = perf_counter()
logger.info(f"append_group iteration {i}")
total_creating_time += (end - start)
for i, pg in enumerate(pgs):
start = perf_counter()
remove_placement_group(pg)
end = perf_counter()
logger.info(f"remove_group iteration {i}")
total_removing_time += (end - start)
ray.get([pg.ready() for pg in pgs])
# Validate the correctness.
assert ray.cluster_resources()[
"pg_custom"] == NUM_NODES * RESOURCE_QUANTITY
for i, pg in enumerate(pgs):
start = perf_counter()
remove_placement_group(pg)
end = perf_counter()
logger.info(f"remove_group iteration {i}")
total_removing_time += (end - start)
# Scenario 2:
# - Launch 30% of placement group in the driver and pass them.
# - Launch 70% of placement group at each remote tasks.
# - Randomly remove placement groups and schedule tasks and actors.
#
# Goal:
# - Make sure jobs are done without breaking GCS server.
# - Make sure all the resources are recovered after the job is done.
# - Measure the creation latency in the stressful environment.
pre_created_num_pgs = round(NUM_PG * 0.3)
num_pgs_to_create = NUM_PG - pre_created_num_pgs
pg_launchers = []
for i in range(3):
pre_created_pgs = [
placement_group(BUNDLES, strategy="STRICT_SPREAD")
for _ in range(pre_created_num_pgs // 3)
]
pg_launchers.append(
pg_launcher.remote(pre_created_pgs, num_pgs_to_create // 3))
# Validate the correctness.
assert ray.cluster_resources()[
"pg_custom"] == NUM_NODES * RESOURCE_QUANTITY
ray.get(pg_launchers)
assert ray.cluster_resources()[
"pg_custom"] == NUM_NODES * RESOURCE_QUANTITY
# Scenario 2:
# - Launch 30% of placement group in the driver and pass them.
# - Launch 70% of placement group at each remote tasks.
# - Randomly remove placement groups and schedule tasks and actors.
#
# Goal:
# - Make sure jobs are done without breaking GCS server.
# - Make sure all the resources are recovered after the job is done.
# - Measure the creation latency in the stressful environment.
pre_created_num_pgs = round(NUM_PG * 0.3)
num_pgs_to_create = NUM_PG - pre_created_num_pgs
pg_launchers = []
for i in range(3):
pre_created_pgs = [
placement_group(BUNDLES, strategy="STRICT_SPREAD")
for _ in range(pre_created_num_pgs // 3)
]
pg_launchers.append(
pg_launcher.remote(pre_created_pgs, num_pgs_to_create // 3))
ray.get(pg_launchers)
assert ray.cluster_resources()[
"pg_custom"] == NUM_NODES * RESOURCE_QUANTITY
result["avg_pg_create_time_ms"] = total_creating_time / total_trial * 1000
result["avg_pg_remove_time_ms"] = total_removing_time / total_trial * 1000
result["success"] = 1
print("Avg placement group creating time: "
f"{total_creating_time / total_trial * 1000} ms")
print("Avg placement group removing time: "
f"{total_removing_time / total_trial* 1000} ms")
print("PASSED.")
result[
"avg_pg_create_time_ms"] = total_creating_time / total_trial * 1000
result[
"avg_pg_remove_time_ms"] = total_removing_time / total_trial * 1000
result["success"] = 1
print("Avg placement group creating time: "
f"{total_creating_time / total_trial * 1000} ms")
print("Avg placement group removing time: "
f"{total_removing_time / total_trial* 1000} ms")
print("PASSED.")
except Exception as e:
logger.exception(e)
print("FAILED.")
with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_put:
out_put.write(json.dumps(result))
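A condensed sketch of scenario 1 above, timing placement group creation and removal with perf_counter via the public placement group API (the group count and bundle shape are scaled down to illustrative values; the test uses a custom pg_custom resource and far more groups):

from time import perf_counter

import ray
from ray.util.placement_group import placement_group, remove_placement_group

ray.init()  # the test connects to a running cluster with address="auto"

NUM_PG = 4
BUNDLES = [{"CPU": 1}]  # one single-CPU bundle per group, small enough locally

create_s, remove_s = 0.0, 0.0
pgs = []
for _ in range(NUM_PG):
    start = perf_counter()
    pgs.append(placement_group(BUNDLES, strategy="PACK"))
    create_s += perf_counter() - start

# Creation is asynchronous; block until every group is actually scheduled.
ray.get([pg.ready() for pg in pgs])

for pg in pgs:
    start = perf_counter()
    remove_placement_group(pg)
    remove_s += perf_counter() - start

print(f"Avg create: {create_s / NUM_PG * 1000:.1f} ms, "
      f"avg remove: {remove_s / NUM_PG * 1000:.1f} ms")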