From b91246a09389993636101e187bff97378185adbc Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 16 Aug 2022 19:16:08 +0200 Subject: [PATCH] [air/benchmarks] Measure local training time in torch/tf benchmarks (#27902) We currently measure end-to-end training time in our benchmarks, which includes setup overhead. This is an unequal comparison, as setup overhead for vanilla training cannot be accurately expressed and was instead just disregarded. By comparing the raw training times in the actual training loop, we will get a more accurate expression of any potential overhead or benefit in using Ray vs. vanilla tensorflow/torch. Signed-off-by: Kai Fricke --- doc/source/ray-air/benchmarks.rst | 22 ++++-- .../workloads/tensorflow_benchmark.py | 78 +++++++++++++------ .../workloads/torch_benchmark.py | 69 +++++++++++----- release/release_tests.yaml | 6 +- 4 files changed, 123 insertions(+), 52 deletions(-) diff --git a/doc/source/ray-air/benchmarks.rst b/doc/source/ray-air/benchmarks.rst index 8907763d9..6830866c5 100644 --- a/doc/source/ray-air/benchmarks.rst +++ b/doc/source/ray-air/benchmarks.rst @@ -181,9 +181,12 @@ Pytorch Training Parity This task checks the performance parity between native Pytorch Distributed and Ray Train's distributed TorchTrainer. -We demonstrate that the performance is similar (within 10\%) between the two frameworks. +We demonstrate that the performance is similar (within 2.5\%) between the two frameworks. Performance may vary greatly across different model, hardware, and cluster configurations. +The reported times are for the raw training times. There is an unreported constant setup +overhead of a few seconds for both methods that is negligible for longer training runs. + - `Pytorch comparison training script`_ - `Pytorch comparison CPU cluster configuration`_ - `Pytorch comparison GPU cluster configuration`_ @@ -196,15 +199,15 @@ Performance may vary greatly across different model, hardware, and cluster confi - **Command** * - 4 m5.2xlarge nodes (4 workers) - FashionMNIST - - 201.17 s (vs 195.90 s Pytorch) + - 196.64 s (vs 194.90 s Pytorch) - `python workloads/torch_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 4 --cpus-per-worker 8` * - 4 m5.2xlarge nodes (16 workers) - FashionMNIST - - 447.14 s (vs 461.75 s Pytorch) + - 430.88 s (vs 475.97 s Pytorch) - `python workloads/torch_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 16 --cpus-per-worker 2` * - 4 g4dn.12xlarge node (16 workers) - FashionMNIST - - 236.61 s (vs 220.97 s Pytorch) + - 149.80 s (vs 146.46 s Pytorch) - `python workloads/torch_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 16 --cpus-per-worker 4 --use-gpu` @@ -216,9 +219,12 @@ Tensorflow Training Parity This task checks the performance parity between native Tensorflow Distributed and Ray Train's distributed TensorflowTrainer. -We demonstrate that the performance is similar (within 10\%) between the two frameworks. +We demonstrate that the performance is similar (within 1\%) between the two frameworks. Performance may vary greatly across different model, hardware, and cluster configurations. +The reported times are for the raw training times. There is an unreported constant setup +overhead of a few seconds for both methods that is negligible for longer training runs. + .. note:: The batch size and number of epochs is different for the GPU benchmark, resulting in a longer runtime. - `Tensorflow comparison training script`_ @@ -233,15 +239,15 @@ Performance may vary greatly across different model, hardware, and cluster confi - **Command** * - 4 m5.2xlarge nodes (4 workers) - FashionMNIST - - 90.61 s (vs 81.26 s Tensorflow) + - 78.81 s (vs 79.67 s Tensorflow) - `python workloads/tensorflow_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 4 --cpus-per-worker 8` * - 4 m5.2xlarge nodes (16 workers) - FashionMNIST - - 75.34 s (vs 69.51 s Tensorflow) + - 64.57 s (vs 67.45 s Tensorflow) - `python workloads/tensorflow_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 16 --cpus-per-worker 2` * - 4 g4dn.12xlarge node (16 workers) - FashionMNIST - - 495.85 s (vs 479.28 s Tensorflow) + - 465.16 s (vs 461.74 s Tensorflow) - `python workloads/tensorflow_benchmark.py run --num-runs 3 --num-epochs 200 --num-workers 16 --cpus-per-worker 4 --batch-size 64 --use-gpu` diff --git a/release/air_tests/air_benchmarks/workloads/tensorflow_benchmark.py b/release/air_tests/air_benchmarks/workloads/tensorflow_benchmark.py index 937fb457c..c6f1b65e3 100644 --- a/release/air_tests/air_benchmarks/workloads/tensorflow_benchmark.py +++ b/release/air_tests/air_benchmarks/workloads/tensorflow_benchmark.py @@ -39,12 +39,7 @@ def build_cnn_model() -> tf.keras.Model: def train_func(use_ray: bool, config: dict): - if use_ray: - from ray.air.callbacks.keras import Callback as TrainCheckpointReportCallback - - callbacks = [TrainCheckpointReportCallback(frequency=0)] - else: - callbacks = [] + local_start_time = time.monotonic() per_worker_batch_size = config.get("batch_size", 64) epochs = config.get("epochs", 3) @@ -69,6 +64,18 @@ def train_func(use_ray: bool, config: dict): metrics=["accuracy"], ) + if use_ray: + from ray.air.callbacks.keras import Callback as TrainCheckpointReportCallback + + class CustomReportCallback(TrainCheckpointReportCallback): + def _handle(self, logs: dict, when: str = None): + logs["local_time_taken"] = time.monotonic() - local_start_time + super()._handle(logs, when) + + callbacks = [CustomReportCallback(frequency=0)] + else: + callbacks = [] + history = multi_worker_model.fit( multi_worker_dataset, epochs=epochs, @@ -79,10 +86,11 @@ def train_func(use_ray: bool, config: dict): loss = results["loss"][-1] if not use_ray: + local_time_taken = time.monotonic() - local_start_time print(f"Reporting loss: {loss:.4f}") if local_rank == 0: with open(VANILLA_RESULT_JSON, "w") as f: - json.dump({"loss": loss}, f) + json.dump({"loss": loss, "local_time_taken": local_time_taken}, f) return results @@ -93,7 +101,7 @@ def train_tf_ray_air( num_workers: int = 4, cpus_per_worker: int = 8, use_gpu: bool = False, -) -> Tuple[float, float]: +) -> Tuple[float, float, float]: # This function is kicked off by the main() function and runs a full training # run using Ray AIR. from ray.train.tensorflow import TensorflowTrainer @@ -117,7 +125,7 @@ def train_tf_ray_air( time_taken = time.monotonic() - start_time print(f"Last result: {result.metrics}") - return time_taken, result.metrics["loss"] + return time_taken, result.metrics["local_time_taken"], result.metrics["loss"] def train_tf_vanilla_worker( @@ -147,7 +155,7 @@ def train_tf_vanilla( num_workers: int = 4, cpus_per_worker: int = 8, use_gpu: bool = False, -) -> Tuple[float, float]: +) -> Tuple[float, float, float]: # This function is kicked off by the main() function and subsequently kicks # off tasks that run train_tf_vanilla_worker() on the worker nodes. from benchmark_util import ( @@ -205,13 +213,14 @@ def train_tf_vanilla( run_commands_on_actors(actors=actors, cmds=cmds) time_taken = time.monotonic() - start_time - loss = 0.0 + loss = local_time_taken = 0.0 if os.path.exists(VANILLA_RESULT_JSON): with open(VANILLA_RESULT_JSON, "r") as f: result = json.load(f) loss = result["loss"] + local_time_taken = result["local_time_taken"] - return time_taken, loss + return time_taken, local_time_taken, loss @click.group(help="Run Tensorflow benchmarks") @@ -253,15 +262,17 @@ def run( run_command_on_all_nodes(["python", path]) times_ray = [] + times_local_ray = [] losses_ray = [] times_vanilla = [] + times_local_vanilla = [] losses_vanilla = [] for run in range(1, num_runs + 1): time.sleep(2) print(f"[Run {run}/{num_runs}] Running Tensorflow Ray benchmark") - time_ray, loss_ray = train_tf_ray_air( + time_ray, time_local_ray, loss_ray = train_tf_ray_air( num_workers=num_workers, cpus_per_worker=cpus_per_worker, use_gpu=use_gpu, @@ -270,7 +281,8 @@ def run( print( f"[Run {run}/{num_runs}] Finished Ray training ({num_epochs} epochs) in " - f"{time_ray:.2f} seconds. Observed loss = {loss_ray:.4f}" + f"{time_ray:.2f} seconds (local training time: {time_local_ray:.2f}s). " + f"Observed loss = {loss_ray:.4f}" ) time.sleep(2) @@ -279,10 +291,10 @@ def run( # Todo: Vanilla runs are sometimes failing. We just retry here, but we should # get to the bottom of it. - time_vanilla = loss_vanilla = 0.0 + time_vanilla = time_local_vanilla = loss_vanilla = 0.0 for i in range(3): try: - time_vanilla, loss_vanilla = train_tf_vanilla( + time_vanilla, time_local_vanilla, loss_vanilla = train_tf_vanilla( num_workers=num_workers, cpus_per_worker=cpus_per_worker, use_gpu=use_gpu, @@ -297,40 +309,58 @@ def run( print( f"[Run {run}/{num_runs}] Finished vanilla training ({num_epochs} epochs) " - f"in {time_vanilla:.2f} seconds. Observed loss = {loss_vanilla:.4f}" + f"in {time_vanilla:.2f} seconds " + f"(local training time: {time_local_vanilla:.2f}s). " + f"Observed loss = {loss_vanilla:.4f}" ) print( f"[Run {run}/{num_runs}] Observed results: ", { "tensorflow_mnist_ray_time_s": time_ray, + "tensorflow_mnist_ray_local_time_s": time_local_ray, "tensorflow_mnist_ray_loss": loss_ray, "tensorflow_mnist_vanilla_time_s": time_vanilla, + "tensorflow_mnist_vanilla_local_time_s": time_local_vanilla, "tensorflow_mnist_vanilla_loss": loss_vanilla, }, ) times_ray.append(time_ray) + times_local_ray.append(time_local_ray) losses_ray.append(loss_ray) times_vanilla.append(time_vanilla) + times_local_vanilla.append(time_local_vanilla) losses_vanilla.append(loss_vanilla) times_ray_mean = np.mean(times_ray) times_ray_sd = np.std(times_ray) + times_local_ray_mean = np.mean(times_local_ray) + times_local_ray_sd = np.std(times_local_ray) + times_vanilla_mean = np.mean(times_vanilla) times_vanilla_sd = np.std(times_vanilla) + times_local_vanilla_mean = np.mean(times_local_vanilla) + times_local_vanilla_sd = np.std(times_local_vanilla) + result = { "tensorflow_mnist_ray_num_runs": num_runs, "tensorflow_mnist_ray_time_s_all": times_ray, "tensorflow_mnist_ray_time_s_mean": times_ray_mean, "tensorflow_mnist_ray_time_s_sd": times_ray_sd, + "tensorflow_mnist_ray_time_local_s_all": times_local_ray, + "tensorflow_mnist_ray_time_local_s_mean": times_local_ray_mean, + "tensorflow_mnist_ray_time_local_s_sd": times_local_ray_sd, "tensorflow_mnist_ray_loss_mean": np.mean(losses_ray), "tensorflow_mnist_ray_loss_sd": np.std(losses_ray), "tensorflow_mnist_vanilla_time_s_all": times_vanilla, "tensorflow_mnist_vanilla_time_s_mean": times_vanilla_mean, "tensorflow_mnist_vanilla_time_s_sd": times_vanilla_sd, + "tensorflow_mnist_vanilla_local_time_s_all": times_local_vanilla, + "tensorflow_mnist_vanilla_local_time_s_mean": times_local_vanilla_mean, + "tensorflow_mnist_vanilla_local_time_s_sd": times_local_vanilla_sd, "tensorflow_mnist_vanilla_loss_mean": np.mean(losses_vanilla), "tensorflow_mnist_vanilla_loss_std": np.std(losses_vanilla), } @@ -341,18 +371,22 @@ def run( json.dump(result, f) target_ratio = 1.2 - ratio = (times_ray_mean / times_vanilla_mean) if times_vanilla_mean != 0.0 else 1.0 + ratio = ( + (times_local_ray_mean / times_local_vanilla_mean) + if times_local_vanilla_mean != 0.0 + else 1.0 + ) if ratio > target_ratio: raise RuntimeError( - f"Training on Ray took an average of {times_ray_mean:.2f} seconds, " + f"Training on Ray took an average of {times_local_ray_mean:.2f} seconds, " f"which is more than {target_ratio:.2f}x of the average vanilla training " - f"time of {times_vanilla_mean:.2f} seconds ({ratio:.2f}x). FAILED" + f"time of {times_local_vanilla_mean:.2f} seconds ({ratio:.2f}x). FAILED" ) print( - f"Training on Ray took an average of {times_ray_mean:.2f} seconds, " + f"Training on Ray took an average of {times_local_ray_mean:.2f} seconds, " f"which is less than {target_ratio:.2f}x of the average vanilla training " - f"time of {times_vanilla_mean:.2f} seconds ({ratio:.2f}x). PASSED" + f"time of {times_local_vanilla_mean:.2f} seconds ({ratio:.2f}x). PASSED" ) diff --git a/release/air_tests/air_benchmarks/workloads/torch_benchmark.py b/release/air_tests/air_benchmarks/workloads/torch_benchmark.py index cc1e088c4..82d40607d 100644 --- a/release/air_tests/air_benchmarks/workloads/torch_benchmark.py +++ b/release/air_tests/air_benchmarks/workloads/torch_benchmark.py @@ -78,6 +78,8 @@ def validate_epoch(dataloader, model, loss_fn, world_size: int, local_rank: int) def train_func(use_ray: bool, config: Dict): + local_start_time = time.monotonic() + if use_ray: from ray.air import session import ray.train as train @@ -197,13 +199,16 @@ def train_func(use_ray: bool, config: Dict): world_size=world_size, local_rank=local_rank, ) + + local_time_taken = time.monotonic() - local_start_time + if use_ray: - session.report(dict(loss=loss)) + session.report(dict(loss=loss, local_time_taken=local_time_taken)) else: print(f"Reporting loss: {loss:.4f}") if local_rank == 0: with open(VANILLA_RESULT_JSON, "w") as f: - json.dump({"loss": loss}, f) + json.dump({"loss": loss, "local_time_taken": local_time_taken}, f) def train_torch_ray_air( @@ -212,7 +217,7 @@ def train_torch_ray_air( num_workers: int = 4, cpus_per_worker: int = 8, use_gpu: bool = False, -) -> Tuple[float, float]: +) -> Tuple[float, float, float]: # This function is kicked off by the main() function and runs a full training # run using Ray AIR. from ray.train.torch import TorchTrainer @@ -236,7 +241,7 @@ def train_torch_ray_air( time_taken = time.monotonic() - start_time print(f"Last result: {result.metrics}") - return time_taken, result.metrics["loss"] + return time_taken, result.metrics["local_time_taken"], result.metrics["loss"] def train_torch_vanilla_worker( @@ -273,7 +278,7 @@ def train_torch_vanilla( num_workers: int = 4, cpus_per_worker: int = 8, use_gpu: bool = False, -) -> Tuple[float, float]: +) -> Tuple[float, float, float]: # This function is kicked off by the main() function and subsequently kicks # off tasks that run train_torch_vanilla_worker() on the worker nodes. from benchmark_util import ( @@ -363,8 +368,9 @@ def train_torch_vanilla( with open(VANILLA_RESULT_JSON, "r") as f: result = json.load(f) loss = result["loss"] + local_time_taken = result["local_time_taken"] - return time_taken, loss + return time_taken, local_time_taken, loss @click.group(help="Run Torch benchmarks") @@ -417,15 +423,17 @@ def run( run_command_on_all_nodes(["python", path]) times_ray = [] + times_local_ray = [] losses_ray = [] times_vanilla = [] + times_local_vanilla = [] losses_vanilla = [] for run in range(1, num_runs + 1): time.sleep(2) print(f"[Run {run}/{num_runs}] Running Torch Ray benchmark") - time_ray, loss_ray = train_torch_ray_air( + time_ray, time_local_ray, loss_ray = train_torch_ray_air( num_workers=num_workers, cpus_per_worker=cpus_per_worker, use_gpu=use_gpu, @@ -434,14 +442,15 @@ def run( print( f"[Run {run}/{num_runs}] Finished Ray training ({num_epochs} epochs) in " - f"{time_ray:.2f} seconds. Observed loss = {loss_ray:.4f}" + f"{time_ray:.2f} seconds (local training time: {time_local_ray:.2f}s). " + f"Observed loss = {loss_ray:.4f}" ) time.sleep(2) print(f"[Run {run}/{num_runs}] Running Torch vanilla benchmark") - time_vanilla, loss_vanilla = train_torch_vanilla( + time_vanilla, time_local_vanilla, loss_vanilla = train_torch_vanilla( num_workers=num_workers, cpus_per_worker=cpus_per_worker, use_gpu=use_gpu, @@ -450,40 +459,58 @@ def run( print( f"[Run {run}/{num_runs}] Finished vanilla training ({num_epochs} epochs) " - f"in {time_vanilla:.2f} seconds. Observed loss = {loss_vanilla:.4f}" + f"in {time_vanilla:.2f} seconds " + f"(local training time: {time_local_vanilla:.2f}s). " + f"Observed loss = {loss_vanilla:.4f}" ) print( f"[Run {run}/{num_runs}] Observed results: ", { - "torch_mnist_ray_time_s": time_ray, - "torch_mnist_ray_loss": loss_ray, - "torch_mnist_vanilla_time_s": time_vanilla, - "torch_mnist_vanilla_loss": loss_vanilla, + "tensorflow_mnist_ray_time_s": time_ray, + "tensorflow_mnist_ray_local_time_s": time_local_ray, + "tensorflow_mnist_ray_loss": loss_ray, + "tensorflow_mnist_vanilla_time_s": time_vanilla, + "tensorflow_mnist_vanilla_local_time_s": time_local_vanilla, + "tensorflow_mnist_vanilla_loss": loss_vanilla, }, ) times_ray.append(time_ray) + times_local_ray.append(time_local_ray) losses_ray.append(loss_ray) times_vanilla.append(time_vanilla) + times_local_vanilla.append(time_local_vanilla) losses_vanilla.append(loss_vanilla) times_ray_mean = np.mean(times_ray) times_ray_sd = np.std(times_ray) + times_local_ray_mean = np.mean(times_local_ray) + times_local_ray_sd = np.std(times_local_ray) + times_vanilla_mean = np.mean(times_vanilla) times_vanilla_sd = np.std(times_vanilla) + times_local_vanilla_mean = np.mean(times_local_vanilla) + times_local_vanilla_sd = np.std(times_local_vanilla) + result = { "torch_mnist_ray_num_runs": num_runs, "torch_mnist_ray_time_s_all": times_ray, "torch_mnist_ray_time_s_mean": times_ray_mean, "torch_mnist_ray_time_s_sd": times_ray_sd, + "torch_mnist_ray_time_local_s_all": times_local_ray, + "torch_mnist_ray_time_local_s_mean": times_local_ray_mean, + "torch_mnist_ray_time_local_s_sd": times_local_ray_sd, "torch_mnist_ray_loss_mean": np.mean(losses_ray), "torch_mnist_ray_loss_sd": np.std(losses_ray), "torch_mnist_vanilla_time_s_all": times_vanilla, "torch_mnist_vanilla_time_s_mean": times_vanilla_mean, "torch_mnist_vanilla_time_s_sd": times_vanilla_sd, + "torch_mnist_vanilla_local_time_s_all": times_local_vanilla, + "torch_mnist_vanilla_local_time_s_mean": times_local_vanilla_mean, + "torch_mnist_vanilla_local_time_s_sd": times_local_vanilla_sd, "torch_mnist_vanilla_loss_mean": np.mean(losses_vanilla), "torch_mnist_vanilla_loss_std": np.std(losses_vanilla), } @@ -494,18 +521,22 @@ def run( json.dump(result, f) target_ratio = 1.15 - ratio = (times_ray_mean / times_vanilla_mean) if times_vanilla_mean != 0.0 else 1.0 + ratio = ( + (times_local_ray_mean / times_local_vanilla_mean) + if times_local_vanilla_mean != 0.0 + else 1.0 + ) if ratio > target_ratio: raise RuntimeError( - f"Training on Ray took an average of {times_ray_mean:.2f} seconds, " + f"Training on Ray took an average of {times_local_ray_mean:.2f} seconds, " f"which is more than {target_ratio:.2f}x of the average vanilla training " - f"time of {times_vanilla_mean:.2f} seconds ({ratio:.2f}x). FAILED" + f"time of {times_local_vanilla_mean:.2f} seconds ({ratio:.2f}x). FAILED" ) print( - f"Training on Ray took an average of {times_ray_mean:.2f} seconds, " + f"Training on Ray took an average of {times_local_ray_mean:.2f} seconds, " f"which is less than {target_ratio:.2f}x of the average vanilla training " - f"time of {times_vanilla_mean:.2f} seconds ({ratio:.2f}x). PASSED" + f"time of {times_local_vanilla_mean:.2f} seconds ({ratio:.2f}x). PASSED" ) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 7dd8ef6f6..46df4e98a 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -454,7 +454,7 @@ cluster_compute: compute_cpu_4.yaml run: - timeout: 3600 + timeout: 5400 script: python workloads/tensorflow_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 4 --cpus-per-worker 8 wait_for_nodes: @@ -479,7 +479,7 @@ cluster_compute: compute_cpu_1.yaml run: - timeout: 3600 + timeout: 5400 script: python workloads/tensorflow_benchmark.py run --num-runs 3 --num-epochs 20 --num-workers 4 --cpus-per-worker 2 type: sdk_command @@ -528,7 +528,7 @@ cluster_compute: compute_gpu_4x4.yaml run: - timeout: 3600 + timeout: 5400 script: python workloads/tensorflow_benchmark.py run --num-runs 3 --num-epochs 200 --num-workers 16 --cpus-per-worker 4 --batch-size 64 --use-gpu wait_for_nodes: