# flake8: noqa
# isort: skip_file

# __air_generic_preprocess_start__
import ray
from ray.data.preprocessors import StandardScaler
from ray.air.config import ScalingConfig

dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

# Create a test dataset by dropping the target column.
test_dataset = valid_dataset.map_batches(
    lambda df: df.drop("target", axis=1), batch_format="pandas"
)

# Create a preprocessor to scale some columns.
columns_to_scale = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=columns_to_scale)
# __air_generic_preprocess_end__

# __air_tf_preprocess_start__
import numpy as np
import pandas as pd

from ray.data.preprocessors import Concatenator, Chain

# Chain the preprocessors together.
preprocessor = Chain(
    preprocessor,
    Concatenator(exclude=["target"], dtype=np.float32),
)
# __air_tf_preprocess_end__


# __air_tf_train_start__
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from tensorflow import keras
from tensorflow.keras import layers

from ray import train
from ray.air import session
from ray.air.config import ScalingConfig
from ray.air.callbacks.keras import Callback as KerasCallback
from ray.train.tensorflow import (
    TensorflowTrainer,
    TensorflowCheckpoint,
    prepare_dataset_shard,
)


def create_keras_model(input_features):
    return keras.Sequential(
        [
            keras.Input(shape=(input_features,)),
            layers.Dense(16, activation="relu"),
            layers.Dense(16, activation="relu"),
            layers.Dense(1),
        ]
    )


def to_tf_dataset(dataset, batch_size):
    def to_tensor_iterator():
        data_iterator = dataset.iter_batches(
            batch_format="numpy", batch_size=batch_size
        )
        for d in data_iterator:
            yield (
                # "concat_out" is the output column of the Concatenator.
                tf.convert_to_tensor(d["concat_out"], dtype=tf.float32),
                tf.convert_to_tensor(d["target"], dtype=tf.float32),
            )

    output_signature = (
        tf.TensorSpec(shape=(None, num_features), dtype=tf.float32),
        tf.TensorSpec(shape=(None), dtype=tf.float32),
    )
    tf_dataset = tf.data.Dataset.from_generator(
        to_tensor_iterator, output_signature=output_signature
    )
    return prepare_dataset_shard(tf_dataset)


def train_loop_per_worker(config):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["num_epochs"]
    num_features = config["num_features"]

    # Get the Ray Dataset shard for this data parallel worker,
    # and convert it to a Tensorflow Dataset.
    train_data = train.get_dataset_shard("train")

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = create_keras_model(num_features)
        multi_worker_model.compile(
            optimizer=tf.keras.optimizers.SGD(learning_rate=lr),
            loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
            metrics=[
                tf.keras.metrics.BinaryCrossentropy(
                    name="loss",
                )
            ],
        )

    results = []
    for _ in range(epochs):
        tf_dataset = to_tf_dataset(dataset=train_data, batch_size=batch_size)
        history = multi_worker_model.fit(
            tf_dataset,
            callbacks=[KerasCallback()],
            verbose=0,
        )
        # Collect per-epoch metrics so the training loop returns them.
        results.append(history.history)
    return results


num_features = len(train_dataset.schema().names) - 1

trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={
        "batch_size": 128,
        "num_epochs": 50,
        "num_features": num_features,
        "lr": 0.0001,
    },
    scaling_config=ScalingConfig(
        num_workers=2,  # Number of data parallel training workers.
        use_gpu=False,
        trainer_resources={"CPU": 0},  # So that the example works on Colab.
    ),
    datasets={"train": train_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()
print(f"Last result: {result.metrics}")
# Last result: {'loss': 8.997025489807129, ...}
# __air_tf_train_end__

# __air_tf_tuner_start__
from ray import tune

param_space = {"train_loop_config": {"lr": tune.loguniform(0.0001, 0.01)}}
metric = "loss"
# __air_tf_tuner_end__

# __air_tune_generic_start__
from ray.tune.tuner import Tuner, TuneConfig

tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=TuneConfig(num_samples=3, metric=metric, mode="min"),
)
# Execute tuning.
result_grid = tuner.fit()

# Fetch the best result.
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
# Best Result: Result(metrics={'loss': 4.997025489807129, ...)
# __air_tune_generic_end__

# __air_tf_batchpred_start__
from ray.train.batch_predictor import BatchPredictor
from ray.train.tensorflow import TensorflowPredictor

# You can also create a checkpoint from a trained model using `TensorflowCheckpoint`.
checkpoint = best_result.checkpoint

batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint,
    TensorflowPredictor,
    model_definition=lambda: create_keras_model(num_features),
)

predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
# {'predictions': 0.033036969602108}
# {'predictions': 0.05944341793656349}
# {'predictions': 0.1657751202583313}
# ...
# __air_tf_batchpred_end__