From e3b485022464109abee53cc2822070618dc3e327 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 22 Sep 2020 00:49:04 -0700 Subject: [PATCH] [Placement group] Release test (#10924) * Done. * Lint. * Addressed code review. --- .../placement_group/test_placement_group.py | 145 ++++++++++++++++++ doc/dev/RELEASE_PROCESS.rst | 3 + .../gcs_server/gcs_placement_group_manager.cc | 3 +- 3 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 ci/regression_test/stress_tests/placement_group/test_placement_group.py diff --git a/ci/regression_test/stress_tests/placement_group/test_placement_group.py b/ci/regression_test/stress_tests/placement_group/test_placement_group.py new file mode 100644 index 000000000..3f7c9cc6c --- /dev/null +++ b/ci/regression_test/stress_tests/placement_group/test_placement_group.py @@ -0,0 +1,145 @@ +# This is stress test to run placement group. +# Please don't run it in the cluster +# setup yet. This test uses the cluster util to simulate the +# cluster environment. + +import time + +from time import perf_counter +from random import random + +import ray + +from ray.cluster_utils import Cluster +from ray.util.placement_group import (placement_group, remove_placement_group) + +# TODO(sang): Increase the number in the actual stress test. +# This number should be divisible by 3. +resource_quantity = 999 +num_nodes = 5 +custom_resources = {"pg_custom": resource_quantity} +# Create pg that uses 1 resource of cpu & custom resource. +num_pg = resource_quantity + +# TODO(sang): Cluster setup. Remove when running in real clusters. +cluster = Cluster() +nodes = [] +for _ in range(num_nodes): + nodes.append( + cluster.add_node( + num_cpus=3, num_gpus=resource_quantity, + resources=custom_resources)) +cluster.wait_for_nodes() + +ray.init(address=cluster.address) +while not ray.is_initialized(): + time.sleep(0.1) + +# Scenario 1: Create bunch of placement groups and measure how long it takes. +total_creating_time = 0 +total_removing_time = 0 +repeat = 1 +total_trial = repeat * num_pg +bundles = [{"GPU": 1, "pg_custom": 1}] * num_nodes + +# Create and remove placement groups. +for _ in range(repeat): + pgs = [] + for i in range(num_pg): + start = perf_counter() + pgs.append(placement_group(bundles, strategy="PACK", name=str(i))) + end = perf_counter() + total_creating_time += (end - start) + + ray.get([pg.ready() for pg in pgs]) + + for pg in pgs: + start = perf_counter() + remove_placement_group(pg) + end = perf_counter() + total_removing_time += (end - start) + +# Validate the correctness. +assert ray.cluster_resources()["GPU"] == num_nodes * resource_quantity +assert ray.cluster_resources()["pg_custom"] == num_nodes * resource_quantity + + +# Scenario 2: +# - Launch 30% of placement group in the driver and pass them. +# - Launch 70% of placement group at each remote tasks. +# - Randomly remove placement groups and schedule tasks and actors. +# +# Goal: +# - Make sure jobs are done without breaking GCS server. +# - Make sure all the resources are recovered after the job is done. +# - Measure the creation latency in the stressful environment. +@ray.remote +def mock_task(): + time.sleep(0.1) + return True + + +@ray.remote +class MockActor: + def __init__(self): + pass + + +@ray.remote +def pg_launcher(pre_created_pgs, num_pgs_to_create): + pgs = [] + pgs += pre_created_pgs + for i in range(num_pgs_to_create): + pgs.append( + placement_group(bundles, strategy="STRICT_SPREAD", name=str(i))) + + pgs_removed = [] + pgs_unremoved = [] + # Randomly choose placement groups to remove. + for pg in pgs: + if random() < .5: + pgs_removed.append(pg) + else: + pgs_unremoved.append(pg) + + # Randomly schedule tasks or actors on placement groups that + # are not removed. + for pg in pgs_unremoved: + # TODO(sang): Comment in this line causes GCS actor management + # failure. We need to fix it. + # if random() < .5: + mock_task.options(placement_group=pg).remote() + # else: + # MockActor.options(placement_group=pg).remote() + + # Remove the rest of placement groups. + for pg in pgs_removed: + remove_placement_group(pg) + + ray.get([pg.ready() for pg in pgs_unremoved], timeout=10) + # Since placement groups are scheduled, remove them. + for pg in pgs_unremoved: + remove_placement_group(pg) + + +pre_created_num_pgs = round(num_pg * 0.3) +num_pgs_to_create = num_pg - pre_created_num_pgs + +pg_launchers = [] +for i in range(3): + pre_created_pgs = [ + placement_group(bundles, strategy="STRICT_SPREAD") + for _ in range(pre_created_num_pgs // 3) + ] + pg_launchers.append( + pg_launcher.remote(pre_created_pgs, num_pgs_to_create // 3)) + +ray.get(pg_launchers) +assert ray.cluster_resources()["GPU"] == num_nodes * resource_quantity +assert ray.cluster_resources()["pg_custom"] == num_nodes * resource_quantity +ray.shutdown() +print("Avg placement group creating time: " + f"{total_creating_time / total_trial * 1000} ms") +print("Avg placement group removing time: " + f"{total_removing_time / total_trial* 1000} ms") +print("Stress Test succeed.") diff --git a/doc/dev/RELEASE_PROCESS.rst b/doc/dev/RELEASE_PROCESS.rst index 4d5eae734..127f11a99 100644 --- a/doc/dev/RELEASE_PROCESS.rst +++ b/doc/dev/RELEASE_PROCESS.rst @@ -59,6 +59,9 @@ This document describes the process for creating new releases. - ``ci/regression_test/rllib_stress_tests`` run multinode 8hr IMPALA trial. - ``ci/regression_test/stress_tests`` contains two tests: ``many_tasks`` and ``dead_actors``. Each of the test runs on 105 spot instances. + - ``ci/regression_test/stress_tests/placement_group`` contains a Python script to run tests. + It currently uses ``cluster_util`` to emulate the cluster testing. It will be converted to + real multi-node tests in the future. For now, just make sure the test succeed locally. Make sure that these pass. For the RLlib regression tests, see the comment on the file for the pass criteria. For the rest, it will be obvious if they passed. diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 1414559f4..888716fa8 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -126,7 +126,8 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed( // registered. auto state = placement_group->GetState(); RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING || - state == rpc::PlacementGroupTableData::PENDING); + state == rpc::PlacementGroupTableData::PENDING) + << "State: " << state; if (state == rpc::PlacementGroupTableData::RESCHEDULING) { // NOTE: If a node is dead, the placement group scheduler should try to recover the // group by rescheduling the bundles of the dead node. This should have higher