From e3f3269b150cadc63cecc287adfefc13bc19fe75 Mon Sep 17 00:00:00 2001 From: Guy Khazma <33684427+guykhazma@users.noreply.github.com> Date: Thu, 25 Feb 2021 21:23:10 +0200 Subject: [PATCH] [doc] Fixes to RayDP docs (#14309) * minor fix to raydp docs * fix pytorch and tensorflow samples * fix: minor fixes --- doc/source/raydp.rst | 100 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 21 deletions(-) diff --git a/doc/source/raydp.rst b/doc/source/raydp.rst index a0ee98282..665d22313 100644 --- a/doc/source/raydp.rst +++ b/doc/source/raydp.rst @@ -35,13 +35,15 @@ For example, .. code-block:: python + import ray import raydp + ray.init() spark = raydp.init_spark( app_name = "example", num_executors = 10, executor_cores = 64, - memory_per_executor = "256GB" + executor_memory = "256GB" ) ==================================== @@ -56,24 +58,49 @@ Training a Spark DataFrame with TensorFlow .. code-block:: python - d = [{'age': 17 , 'grade': 12}] - df = spark.createDataFrame(d).collect() - + from pyspark.sql.functions import col + df = spark.range(1, 1000) + # calculate z = x + 2y + 1000 + df = df.withColumn("x", col("id")*2)\ + .withColumn("y", col("id") + 200)\ + .withColumn("z", col("x") + 2*col("y") + 1000) + + from raydp.utils import random_split + train_df, test_df = random_split(df, [0.7, 0.3]) + # TensorFlow code from tensorflow import keras - model = keras.Sequential([]) + input_1 = keras.Input(shape=(1,)) + input_2 = keras.Input(shape=(1,)) - estimator = raydp.tf.TFEstimator( - model = model, - num_worker = 10, - feature_columns = ["age"], - label_column = ["grade"] - ) + concatenated = keras.layers.concatenate([input_1, input_2]) + output = keras.layers.Dense(1, activation='sigmoid')(concatenated) + model = keras.Model(inputs=[input_1, input_2], + outputs=output) - estimator.fit_on_spark(df, test_df=None) + optimizer = keras.optimizers.Adam(0.01) + loss = keras.losses.MeanSquaredError() + + from raydp.tf import TFEstimator + estimator = TFEstimator( + num_workers=2, + model=model, + optimizer=optimizer, + loss=loss, + metrics=["accuracy", "mse"], + feature_columns=["x", "y"], + label_column="z", + batch_size=1000, + num_epochs=2, + use_gpu=False, + config={"fit_config": {"steps_per_epoch": 2}}) + + estimator.fit_on_spark(train_df, test_df) tensorflow_model = estimator.get_model() + estimator.shutdown() + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Training a Spark DataFrame with PyTorch @@ -84,21 +111,52 @@ PyTorch. .. code-block:: python - d = [{'age': 17 , 'grade': 12}] - df = spark.createDataFrame(d).collect() - + from pyspark.sql.functions import col + df = spark.range(1, 1000) + # calculate z = x + 2y + 1000 + df = df.withColumn("x", col("id")*2)\ + .withColumn("y", col("id") + 200)\ + .withColumn("z", col("x") + 2*col("y") + 1000) + + from raydp.utils import random_split + train_df, test_df = random_split(df, [0.7, 0.3]) + # PyTorch Code import torch - model = torch.nn.Sequential() + class LinearModel(torch.nn.Module): + def __init__(self): + super(LinearModel, self).__init__() + self.linear = torch.nn.Linear(2, 1) - estimator = raydp.tf.TFEstimator( + def forward(self, x, y): + x = torch.cat([x, y], dim=1) + return self.linear(x) + + model = LinearModel() + optimizer = torch.optim.Adam(model.parameters()) + loss_fn = torch.nn.MSELoss() + + def lr_scheduler_creator(optimizer, config): + return torch.optim.lr_scheduler.MultiStepLR( + optimizer, milestones=[150, 250, 350], gamma=0.1) + + # You can use the RayDP Estimator API or libraries like RaySGD for distributed training. + from raydp.torch import TorchEstimator + estimator = TorchEstimator( + num_workers = 2, model = model, - num_worker = 10, - feature_columns = ["age"], - label_column = ["grade"] + optimizer = optimizer, + loss = loss_fn, + lr_scheduler_creator=lr_scheduler_creator, + feature_columns = ["x", "y"], + label_column = ["z"], + batch_size = 1000, + num_epochs = 2 ) - estimator.fit_on_spark(df, test_df=None) + estimator.fit_on_spark(train_df, test_df) pytorch_model = estimator.get_model() + + estimator.shutdown()