[Placement group] Release test (#10924)

* Done.

* Lint.

* Addressed code review.
SangBin Cho 2020-09-22 00:49:04 -07:00 committed by GitHub
parent 9d205cda86
commit e3b4850224
3 changed files with 150 additions and 1 deletion

@@ -0,0 +1,145 @@
# This is a stress test for placement groups.
# Please don't run it with a real cluster setup yet. This test uses the
# cluster util to simulate the cluster environment.
import time
from time import perf_counter
from random import random
import ray
from ray.cluster_utils import Cluster
from ray.util.placement_group import (placement_group, remove_placement_group)
# TODO(sang): Increase the number in the actual stress test.
# This number should be divisible by 3.
resource_quantity = 999
num_nodes = 5
custom_resources = {"pg_custom": resource_quantity}
# Create placement groups that each use 1 GPU and 1 custom resource per bundle.
num_pg = resource_quantity
# TODO(sang): Cluster setup. Remove when running in real clusters.
cluster = Cluster()
nodes = []
for _ in range(num_nodes):
    nodes.append(
        cluster.add_node(
            num_cpus=3, num_gpus=resource_quantity,
            resources=custom_resources))
cluster.wait_for_nodes()
ray.init(address=cluster.address)
while not ray.is_initialized():
    time.sleep(0.1)

# Scenario 1: Create a bunch of placement groups and measure how long it takes.
total_creating_time = 0
total_removing_time = 0
repeat = 1
total_trial = repeat * num_pg
bundles = [{"GPU": 1, "pg_custom": 1}] * num_nodes
# Create and remove placement groups.
for _ in range(repeat):
    pgs = []
    for i in range(num_pg):
        start = perf_counter()
        pgs.append(placement_group(bundles, strategy="PACK", name=str(i)))
        end = perf_counter()
        total_creating_time += (end - start)
    ray.get([pg.ready() for pg in pgs])
    for pg in pgs:
        start = perf_counter()
        remove_placement_group(pg)
        end = perf_counter()
        total_removing_time += (end - start)
# Validate the correctness.
assert ray.cluster_resources()["GPU"] == num_nodes * resource_quantity
assert ray.cluster_resources()["pg_custom"] == num_nodes * resource_quantity
# Scenario 2:
# - Launch 30% of the placement groups from the driver and pass them to remote tasks.
# - Launch the other 70% of the placement groups inside the remote tasks.
# - Randomly remove placement groups and schedule tasks and actors.
#
# Goal:
# - Make sure jobs are done without breaking the GCS server.
# - Make sure all the resources are recovered after the job is done.
# - Measure the creation latency in this stressful environment.
@ray.remote
def mock_task():
    time.sleep(0.1)
    return True


@ray.remote
class MockActor:
    def __init__(self):
        pass


@ray.remote
def pg_launcher(pre_created_pgs, num_pgs_to_create):
    pgs = []
    pgs += pre_created_pgs
    for i in range(num_pgs_to_create):
        pgs.append(
            placement_group(bundles, strategy="STRICT_SPREAD", name=str(i)))

    pgs_removed = []
    pgs_unremoved = []
    # Randomly choose placement groups to remove.
    for pg in pgs:
        if random() < .5:
            pgs_removed.append(pg)
        else:
            pgs_unremoved.append(pg)

    # Randomly schedule tasks or actors on placement groups that
    # are not removed.
    for pg in pgs_unremoved:
        # TODO(sang): Commenting the actor branch back in causes a GCS actor
        # management failure. It needs to be fixed.
        # if random() < .5:
        mock_task.options(placement_group=pg).remote()
        # else:
        #     MockActor.options(placement_group=pg).remote()

    # Remove the rest of the placement groups.
    for pg in pgs_removed:
        remove_placement_group(pg)

    ray.get([pg.ready() for pg in pgs_unremoved], timeout=10)

    # Since the remaining placement groups are scheduled, remove them.
    for pg in pgs_unremoved:
        remove_placement_group(pg)
pre_created_num_pgs = round(num_pg * 0.3)
num_pgs_to_create = num_pg - pre_created_num_pgs
pg_launchers = []
for i in range(3):
    pre_created_pgs = [
        placement_group(bundles, strategy="STRICT_SPREAD")
        for _ in range(pre_created_num_pgs // 3)
    ]
    pg_launchers.append(
        pg_launcher.remote(pre_created_pgs, num_pgs_to_create // 3))
ray.get(pg_launchers)
assert ray.cluster_resources()["GPU"] == num_nodes * resource_quantity
assert ray.cluster_resources()["pg_custom"] == num_nodes * resource_quantity
ray.shutdown()
print("Avg placement group creating time: "
f"{total_creating_time / total_trial * 1000} ms")
print("Avg placement group removing time: "
f"{total_removing_time / total_trial* 1000} ms")
print("Stress Test succeed.")

@@ -59,6 +59,9 @@ This document describes the process for creating new releases.
- ``ci/regression_test/rllib_stress_tests`` runs a multi-node, 8-hour IMPALA trial.
- ``ci/regression_test/stress_tests`` contains two tests: ``many_tasks`` and ``dead_actors``.
  Each of these tests runs on 105 spot instances.
- ``ci/regression_test/stress_tests/placement_group`` contains a Python script that runs the
  placement group stress test. It currently uses ``cluster_util`` to emulate a multi-node
  cluster; it will be converted to a real multi-node test in the future. For now, just make
  sure the test succeeds locally (see the sketch below).
Make sure that these pass. For the RLlib regression tests, see the comment in the
file for the pass criteria. For the rest, it will be obvious if they passed.
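
The stress-test script above follows the ``cluster_util`` pattern this bullet describes. A minimal sketch of the same flow, with illustrative node counts and bundle sizes rather than the actual test parameters:

import ray
from ray.cluster_utils import Cluster
from ray.util.placement_group import placement_group, remove_placement_group

# Emulate a two-node cluster inside a single process.
cluster = Cluster()
for _ in range(2):
    cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

# Reserve one CPU bundle per emulated node, wait until the group is
# placed, then release the reserved resources again.
pg = placement_group([{"CPU": 1}] * 2, strategy="PACK")
ray.get(pg.ready())
remove_placement_group(pg)
ray.shutdown()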

@@ -126,7 +126,8 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
   // registered.
   auto state = placement_group->GetState();
   RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
-            state == rpc::PlacementGroupTableData::PENDING);
+            state == rpc::PlacementGroupTableData::PENDING)
+      << "State: " << state;
   if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
     // NOTE: If a node is dead, the placement group scheduler should try to recover the
     // group by rescheduling the bundles of the dead node. This should have higher