Add micro benchmark to releaser repo (#17727)

Jiao 2021-08-11 15:15:33 -07:00 committed by GitHub
parent 9a70e83e90
commit 3c64a1a3c1
5 changed files with 110 additions and 20 deletions


@@ -0,0 +1,3 @@
+from ray.serve.benchmarks import microbenchmark
+
+__all__ = ["microbenchmark"]
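The new package __init__ re-exports the benchmark module, so release scripts can drive it programmatically. A minimal sketch of that usage, assuming this commit is applied (main() now calls ray.init() and serve.start() itself and returns the populated results dict):

    import asyncio

    from ray.serve.benchmarks import microbenchmark

    # main() returns the result_json dict of requests/s figures.
    results = asyncio.get_event_loop().run_until_complete(microbenchmark.main())
    print(results)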


@@ -7,10 +7,11 @@
 import asyncio
 import time

 import requests
+import numpy as np

 import ray
 from ray import serve
-import numpy as np
+from ray.serve.utils import logger

 NUM_CLIENTS = 8
 CALLS_PER_BATCH = 100
@@ -31,8 +32,9 @@ async def timeit(name, fn, multiplier=1):
             count += 1
         end = time.time()
         stats.append(multiplier * count / (end - start))
-    print("\t{} {} +- {} requests/s".format(name, round(np.mean(stats), 2),
-                                            round(np.std(stats), 2)))
+    logger.info("\t{} {} +- {} requests/s".format(name, round(
+        np.mean(stats), 2), round(np.std(stats), 2)))
+    return round(np.mean(stats), 2)


 async def fetch(session, data):
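timeit now returns the rounded mean requests/s in addition to logging it, which is what lets trial() record each configuration's throughput. A self-contained sketch of the measurement pattern, with a hypothetical no-op fn and shortened windows:

    import asyncio
    import time

    import numpy as np

    async def timeit(name, fn, multiplier=1):
        # Collect several throughput samples by counting calls per timed window.
        stats = []
        for _ in range(4):
            start = time.time()
            count = 0
            while time.time() - start < 1:
                await fn()
                count += 1
            stats.append(multiplier * count / (time.time() - start))
        print("\t{} {} +- {} requests/s".format(
            name, round(np.mean(stats), 2), round(np.std(stats), 2)))
        return round(np.mean(stats), 2)

    async def noop():
        pass

    mean_tps = asyncio.get_event_loop().run_until_complete(timeit("noop", noop))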
@@ -54,13 +56,18 @@ class Client:
         await fetch(self.session, data)


-async def trial(intermediate_handles, num_replicas, max_batch_size,
-                max_concurrent_queries, data_size):
-    print(f"intermediate_handles={intermediate_handles},"
+async def trial(result_json, intermediate_handles, num_replicas,
+                max_batch_size, max_concurrent_queries, data_size):
+    trial_key_base = (
+        f"replica:{num_replicas}/batch_size:{max_batch_size}/"
+        f"concurrent_queries:{max_concurrent_queries}/"
+        f"data_size:{data_size}/intermediate_handle:{intermediate_handles}")
+
+    logger.info(f"intermediate_handles={intermediate_handles},"
                 f"num_replicas={num_replicas},"
                 f"max_batch_size={max_batch_size},"
                 f"max_concurrent_queries={max_concurrent_queries},"
-                f"data_size={data_size}:")
+                f"data_size={data_size}")

     deployment_name = "api"
     if intermediate_handles:
@@ -111,10 +118,12 @@ async def trial(intermediate_handles, num_replicas, max_batch_size,
         for _ in range(CALLS_PER_BATCH):
             await fetch(session, data)

-    await timeit(
+    single_client_avg_tps = await timeit(
         "single client {} data".format(data_size),
         single_client,
         multiplier=CALLS_PER_BATCH)
+    key = "num_client:1/" + trial_key_base
+    result_json.update({key: single_client_avg_tps})

     clients = [Client.remote() for _ in range(NUM_CLIENTS)]
     ray.get([client.ready.remote() for client in clients])
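Each measurement now lands under a structured key, the num_client prefix joined to trial_key_base, so one run produces a flat dict. With a hypothetical throughput value, a single-client entry for the default configuration looks like:

    # Hypothetical example of one recorded entry (value is requests/s):
    result_json = {
        "num_client:1/replica:1/batch_size:1/concurrent_queries:1/"
        "data_size:small/intermediate_handle:False": 1234.56,
    }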
@@ -122,26 +131,32 @@ async def trial(intermediate_handles, num_replicas, max_batch_size,
     async def many_clients():
         ray.get([a.do_queries.remote(CALLS_PER_BATCH, data) for a in clients])

-    await timeit(
+    multi_client_avg_tps = await timeit(
         "{} clients {} data".format(len(clients), data_size),
         many_clients,
         multiplier=CALLS_PER_BATCH * len(clients))
+    key = f"num_client:{len(clients)}/" + trial_key_base
+    result_json.update({key: multi_client_avg_tps})
+
+    logger.info(result_json)


 async def main():
+    ray.init(log_to_driver=False)
+    serve.start()
+
+    result_json = {}
     for intermediate_handles in [False, True]:
         for num_replicas in [1, 8]:
             for max_batch_size, max_concurrent_queries in [(1, 1), (1, 10000),
                                                            (10000, 10000)]:
                 # TODO(edoakes): large data causes broken pipe errors.
                 for data_size in ["small"]:
-                    await trial(intermediate_handles, num_replicas,
-                                max_batch_size, max_concurrent_queries,
-                                data_size)
+                    await trial(result_json, intermediate_handles,
+                                num_replicas, max_batch_size,
+                                max_concurrent_queries, data_size)
+    return result_json


 if __name__ == "__main__":
-    ray.init(log_to_driver=False)
-    serve.start()
     loop = asyncio.get_event_loop()
     loop.run_until_complete(main())


@@ -0,0 +1,27 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 0
+
+head_node_type:
+  name: head_node
+  # 8 cpus, x86, 32G mem, 10Gb NIC, $0.384/hr on demand
+  instance_type: m5.2xlarge
+
+worker_node_types:
+  - name: worker_node
+    # 8 cpus, x86, 32G mem, 10Gb NIC, $0.384/hr on demand
+    instance_type: m5.2xlarge
+    min_workers: 0
+    # 1k max replicas
+    max_workers: 0
+    use_spot: false
+
+aws:
+  TagSpecifications:
+    - ResourceType: "instance"
+      Tags:
+        - Key: anyscale-user
+          Value: '{{env["ANYSCALE_USER"]}}'
+        - Key: anyscale-expiration
+          Value: '{{env["EXPIRATION_1D"]}}'


@@ -23,3 +23,16 @@
   smoke_test:
     timeout: 600

+- name: serve_micro_benchmark
+  cluster:
+    app_config: app_config.yaml
+    compute_template: compute_tpl_single_node.yaml
+
+  run:
+    timeout: 7200
+    long_running: False
+    script: python workloads/serve_micro_benchmark.py
+
+  smoke_test:
+    timeout: 600
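When this entry runs, the harness executes the script field inside the provisioned cluster, and the workload below reads IS_SMOKE_TEST (set by the releaser's e2e.py, per the workload's comment) to choose a local or Anyscale setup. One could mimic the smoke-test invocation locally with something like this (a hypothetical stand-in for the real harness):

    import os
    import subprocess

    # Run the workload as a local smoke test; IS_SMOKE_TEST="0" would
    # instead target an Anyscale cluster.
    env = dict(os.environ, IS_SMOKE_TEST="1")
    subprocess.run(
        ["python", "workloads/serve_micro_benchmark.py"], env=env, check=True)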


@@ -0,0 +1,32 @@
+import asyncio
+import os
+
+from ray.serve.benchmarks.microbenchmark import main as benchmark_main
+from ray.serve.utils import logger
+from serve_test_cluster_utils import (
+    setup_local_single_node_cluster,
+    setup_anyscale_cluster,
+)
+from serve_test_utils import (
+    save_test_results, )
+
+
+async def main():
+    # Give default cluster parameter values based on smoke_test config;
+    # if the user provided values explicitly, use them instead.
+    # IS_SMOKE_TEST is set by the releaser's e2e.py args.
+    smoke_test = os.environ.get("IS_SMOKE_TEST", "1")
+    if smoke_test == "1":
+        setup_local_single_node_cluster(1)
+    else:
+        setup_anyscale_cluster()
+
+    result_json = await benchmark_main()
+    logger.info(result_json)
+    save_test_results(
+        result_json, default_output_file="/tmp/micro_benchmark.json")
+
+
+if __name__ == "__main__":
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main())
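save_test_results comes from the releaser's shared serve_test_utils module, which this diff does not show. Going only by the call signature above, a hypothetical sketch of what such a helper might do (the TEST_OUTPUT_JSON override is an assumption, not confirmed by this commit):

    import json
    import os

    def save_test_results(results: dict, default_output_file: str) -> None:
        # Assumed behavior: write the results dict as JSON, honoring an
        # environment override for the output path if one is set.
        path = os.environ.get("TEST_OUTPUT_JSON", default_output_file)
        with open(path, "w") as f:
            json.dump(results, f)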