2022-07-16 21:14:21 -07:00
|
|
|
# flake8: noqa
|
|
|
|
# isort: skip_file
|
|
|
|
|
|
|
|
# __use_predictor_start__
|
|
|
|
import numpy as np
|
|
|
|
import tensorflow as tf
|
|
|
|
|
|
|
|
import ray
|
|
|
|
from ray.train.batch_predictor import BatchPredictor
|
|
|
|
from ray.train.tensorflow import (
|
2022-07-20 19:33:27 -07:00
|
|
|
TensorflowCheckpoint,
|
2022-07-16 21:14:21 -07:00
|
|
|
TensorflowPredictor,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def build_model() -> tf.keras.Model:
    """Build a tiny single-output Keras model (not compiled or trained here).

    Each example is a scalar, so a batch of shape ``(batch_size,)`` goes in and
    a ``(batch_size, 1)`` tensor comes out of the final ``Dense(1)`` layer.
    """
    layers = [
        tf.keras.layers.InputLayer(input_shape=()),
        # Flatten on a rank-1 batch adds a feature axis:
        # (batch_size,) -> (batch_size, 1).
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(1),
    ]
    return tf.keras.Sequential(layers)
|
|
|
|
|
|
|
|
|
|
|
|
# Save a freshly built model into a checkpoint, then restore a predictor from
# it. `build_model` is passed as the `model_definition` for the predictor.
model = build_model()
checkpoint = TensorflowCheckpoint.from_model(model)
predictor = TensorflowPredictor.from_checkpoint(
    checkpoint,
    model_definition=build_model,
)

# Predict on a batch of four scalar inputs.
data = np.array([1, 2, 3, 4])
predictions = predictor.predict(data)
print(predictions)
# Example output (the model is untrained, so the exact values will vary):
# [[-1.6930283]
#  [-3.3860567]
#  [-5.079085 ]
#  [-6.7721133]]
|
|
|
|
# __use_predictor_end__
|
|
|
|
|
|
|
|
# __batch_prediction_start__
|
|
|
|
import pandas as pd
|
|
|
|
from ray.train.batch_predictor import BatchPredictor
|
|
|
|
|
|
|
|
# Build a BatchPredictor from the checkpoint and the predictor class to use.
batch_predictor = BatchPredictor(
    checkpoint, TensorflowPredictor, model_definition=build_model
)

# Create a dummy dataset.
ds = ray.data.from_pandas(pd.DataFrame({"feature_1": [1, 2, 3], "label": [1, 2, 3]}))

# Use `feature_columns` to specify the input columns to your model.
predictions = batch_predictor.predict(ds, feature_columns=["feature_1"])
# `show()` prints the rows itself and returns None, so it is not wrapped in
# print() (that would emit an extra "None" line).
predictions.show()
# Example output (the model is untrained, so the exact values will vary):
# {'predictions': array([-1.2789773], dtype=float32)}
# {'predictions': array([-2.5579545], dtype=float32)}
# {'predictions': array([-3.8369317], dtype=float32)}
|
|
|
|
# __batch_prediction_end__
|
|
|
|
|
|
|
|
# __compute_accuracy_start__
|
|
|
|
def calculate_accuracy(df):
    """Flag, row by row, whether the int-truncated prediction equals the label.

    Args:
        df: A batch with a ``"predictions"`` column (each entry a scalar or a
            single-element array) and a ``"label"`` column.

    Returns:
        A DataFrame with one boolean ``"correct"`` column, aligned with
        ``df``'s index. An empty batch yields an empty result.
    """
    # Each row's own prediction is compared to its own label. The previous
    # `df["predictions"][0]` compared only the first row's prediction against
    # every label, and raised KeyError when the batch index lacked 0.
    # np.ravel(...)[0] accepts both scalars and 1-element arrays.
    predicted = df["predictions"].map(lambda p: int(np.ravel(p)[0]))
    return pd.DataFrame({"correct": predicted == df["label"]})
|
|
|
|
|
|
|
|
|
|
|
|
# `keep_columns` carries the "label" column through to the output so each
# prediction can be scored against its label.
predictions = batch_predictor.predict(
    ds, feature_columns=["feature_1"], keep_columns=["label"]
)
# `show()` prints the rows itself and returns None, so it is not wrapped in print().
predictions.show()
# Example output (the model is untrained, so the prediction values will vary;
# the labels are the 1, 2, 3 from the dummy dataset):
# {'predictions': array([-1.2789773], dtype=float32), 'label': 1}
# {'predictions': array([-2.5579545], dtype=float32), 'label': 2}
# {'predictions': array([-3.8369317], dtype=float32), 'label': 3}

correct = predictions.map_batches(calculate_accuracy)
print("Final accuracy: ", correct.mean(on="correct"))
# Final accuracy: 0.0
# (The predictions above truncate to -1, -2, -3, so none match labels 1, 2, 3.)
|
|
|
|
# __compute_accuracy_end__
|
|
|
|
|
|
|
|
# __pipelined_prediction_start__
|
|
|
|
import pandas as pd
|
|
|
|
import ray
|
|
|
|
from ray.air import Checkpoint
|
|
|
|
from ray.train.predictor import Predictor
|
|
|
|
from ray.train.batch_predictor import BatchPredictor
|
|
|
|
|
|
|
|
def _always_42(data):
    """Return a frame with one row per input row, each holding 42 in column "a"."""
    return pd.DataFrame({"a": [42] * len(data)})


# Create a BatchPredictor that always returns `42` for each input.
batch_pred = BatchPredictor.from_pandas_udf(_always_42)

# Create a dummy dataset of 200 tensor rows (parallelism=4).
ds = ray.data.range_tensor(200, parallelism=4)

# Set up a prediction pipeline that handles one block per window.
pipeline = batch_pred.predict_pipelined(ds, blocks_per_window=1)
for batch in pipeline.iter_batches():
    print("Pipeline result", batch)
# Example output:
# 0 42
# 1 42
# ...
|
|
|
|
# __pipelined_prediction_end__
|