mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[doc] Fixes to RayDP docs (#14309)
* minor fix to raydp docs
* fix pytorch and tensorflow samples
* fix: minor fixes
This commit is contained in:
parent
6cd0cd3bd9
commit
e3f3269b15
1 changed file with 79 additions and 21 deletions
|
@ -35,13 +35,15 @@ For example,
|
||||||
|
|
||||||
.. code-block:: python
|
.. code-block:: python
|
||||||
|
|
||||||
|
import ray
|
||||||
import raydp
|
import raydp
|
||||||
|
|
||||||
|
ray.init()
|
||||||
spark = raydp.init_spark(
|
spark = raydp.init_spark(
|
||||||
app_name = "example",
|
app_name = "example",
|
||||||
num_executors = 10,
|
num_executors = 10,
|
||||||
executor_cores = 64,
|
executor_cores = 64,
|
||||||
memory_per_executor = "256GB"
|
executor_memory = "256GB"
|
||||||
)
|
)
|
||||||
|
|
||||||
====================================
|
====================================
|
||||||
|
@ -56,24 +58,49 @@ Training a Spark DataFrame with TensorFlow
|
||||||
|
|
||||||
.. code-block:: python
|
.. code-block:: python
|
||||||
|
|
||||||
d = [{'age': 17 , 'grade': 12}]
|
from pyspark.sql.functions import col
|
||||||
df = spark.createDataFrame(d).collect()
|
df = spark.range(1, 1000)
|
||||||
|
# calculate z = x + 2y + 1000
|
||||||
|
df = df.withColumn("x", col("id")*2)\
|
||||||
|
.withColumn("y", col("id") + 200)\
|
||||||
|
.withColumn("z", col("x") + 2*col("y") + 1000)
|
||||||
|
|
||||||
|
from raydp.utils import random_split
|
||||||
|
train_df, test_df = random_split(df, [0.7, 0.3])
|
||||||
|
|
||||||
|
# TensorFlow code
|
||||||
from tensorflow import keras
|
from tensorflow import keras
|
||||||
model = keras.Sequential([])
|
input_1 = keras.Input(shape=(1,))
|
||||||
|
input_2 = keras.Input(shape=(1,))
|
||||||
|
|
||||||
estimator = raydp.tf.TFEstimator(
|
concatenated = keras.layers.concatenate([input_1, input_2])
|
||||||
model = model,
|
output = keras.layers.Dense(1, activation='sigmoid')(concatenated)
|
||||||
num_worker = 10,
|
model = keras.Model(inputs=[input_1, input_2],
|
||||||
feature_columns = ["age"],
|
outputs=output)
|
||||||
label_column = ["grade"]
|
|
||||||
)
|
|
||||||
|
|
||||||
estimator.fit_on_spark(df, test_df=None)
|
optimizer = keras.optimizers.Adam(0.01)
|
||||||
|
loss = keras.losses.MeanSquaredError()
|
||||||
|
|
||||||
|
from raydp.tf import TFEstimator
|
||||||
|
estimator = TFEstimator(
|
||||||
|
num_workers=2,
|
||||||
|
model=model,
|
||||||
|
optimizer=optimizer,
|
||||||
|
loss=loss,
|
||||||
|
metrics=["accuracy", "mse"],
|
||||||
|
feature_columns=["x", "y"],
|
||||||
|
label_column="z",
|
||||||
|
batch_size=1000,
|
||||||
|
num_epochs=2,
|
||||||
|
use_gpu=False,
|
||||||
|
config={"fit_config": {"steps_per_epoch": 2}})
|
||||||
|
|
||||||
|
estimator.fit_on_spark(train_df, test_df)
|
||||||
|
|
||||||
tensorflow_model = estimator.get_model()
|
tensorflow_model = estimator.get_model()
|
||||||
|
|
||||||
|
estimator.shutdown()
|
||||||
|
|
||||||
|
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
Training a Spark DataFrame with PyTorch
|
Training a Spark DataFrame with PyTorch
|
||||||
|
@ -84,21 +111,52 @@ PyTorch.
|
||||||
|
|
||||||
.. code-block:: python
|
.. code-block:: python
|
||||||
|
|
||||||
d = [{'age': 17 , 'grade': 12}]
|
from pyspark.sql.functions import col
|
||||||
df = spark.createDataFrame(d).collect()
|
df = spark.range(1, 1000)
|
||||||
|
# calculate z = x + 2y + 1000
|
||||||
|
df = df.withColumn("x", col("id")*2)\
|
||||||
|
.withColumn("y", col("id") + 200)\
|
||||||
|
.withColumn("z", col("x") + 2*col("y") + 1000)
|
||||||
|
|
||||||
|
from raydp.utils import random_split
|
||||||
|
train_df, test_df = random_split(df, [0.7, 0.3])
|
||||||
|
|
||||||
|
# PyTorch Code
|
||||||
import torch
|
import torch
|
||||||
model = torch.nn.Sequential()
|
class LinearModel(torch.nn.Module):
|
||||||
|
def __init__(self):
|
||||||
|
super(LinearModel, self).__init__()
|
||||||
|
self.linear = torch.nn.Linear(2, 1)
|
||||||
|
|
||||||
estimator = raydp.tf.TFEstimator(
|
def forward(self, x, y):
|
||||||
|
x = torch.cat([x, y], dim=1)
|
||||||
|
return self.linear(x)
|
||||||
|
|
||||||
|
model = LinearModel()
|
||||||
|
optimizer = torch.optim.Adam(model.parameters())
|
||||||
|
loss_fn = torch.nn.MSELoss()
|
||||||
|
|
||||||
|
def lr_scheduler_creator(optimizer, config):
|
||||||
|
return torch.optim.lr_scheduler.MultiStepLR(
|
||||||
|
optimizer, milestones=[150, 250, 350], gamma=0.1)
|
||||||
|
|
||||||
|
# You can use the RayDP Estimator API or libraries like RaySGD for distributed training.
|
||||||
|
from raydp.torch import TorchEstimator
|
||||||
|
estimator = TorchEstimator(
|
||||||
|
num_workers = 2,
|
||||||
model = model,
|
model = model,
|
||||||
num_worker = 10,
|
optimizer = optimizer,
|
||||||
feature_columns = ["age"],
|
loss = loss_fn,
|
||||||
label_column = ["grade"]
|
lr_scheduler_creator=lr_scheduler_creator,
|
||||||
|
feature_columns = ["x", "y"],
|
||||||
|
label_column = ["z"],
|
||||||
|
batch_size = 1000,
|
||||||
|
num_epochs = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
estimator.fit_on_spark(df, test_df=None)
|
estimator.fit_on_spark(train_df, test_df)
|
||||||
|
|
||||||
pytorch_model = estimator.get_model()
|
pytorch_model = estimator.get_model()
|
||||||
|
|
||||||
|
estimator.shutdown()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue