[AIR] Replace references of to_torch with iter_torch_batches (#27574)

Cheng Su 2022-08-07 20:14:12 -07:00 committed by GitHub
parent a61095a480
commit aeb2346804
15 changed files with 56 additions and 83 deletions
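
As a quick orientation to the change, here is a minimal before/after sketch of the migration applied throughout this commit. The toy dataset and the column names "x" and "y" are illustrative assumptions, not taken from the diff:

    import ray
    import torch

    # Illustrative tabular dataset; any Dataset with named columns behaves the same way.
    ds = ray.data.from_items([{"x": float(i), "y": 2.0 * i} for i in range(8)])

    # Old pattern (removed by this commit): to_torch() returns a torch
    # IterableDataset that yields (features, label) tuples.
    # for features, label in ds.to_torch(label_column="y", batch_size=4):
    #     ...

    # New pattern: iter_torch_batches() yields batches directly; for tabular data
    # each batch is a dict mapping column names to torch.Tensor.
    for batch in ds.iter_torch_batches(batch_size=4, dtypes=torch.float):
        features, labels = batch["x"], batch["y"]
        assert isinstance(features, torch.Tensor) and isinstance(labels, torch.Tensor)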

View file

@@ -37,7 +37,7 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc
 @ray.remote(num_gpus=1)
 def train_func(pipe: DatasetPipeline):
     model = MyModel()
-    for batch in pipe.to_torch():
+    for batch in pipe.iter_torch_batches():
         model.fit(batch)

 # Read from the pipeline in a remote training function.

View file

@@ -286,8 +286,8 @@ Supported Output Formats
    * - Pandas Dataframe Iterator
      - :meth:`ds.iter_batches(batch_format="pandas") <ray.data.Dataset.iter_batches>`
      - ✅
-   * - PyTorch Iterable Dataset
-     - :meth:`ds.to_torch() <ray.data.Dataset.to_torch>`
+   * - PyTorch Tensor Iterator
+     - :meth:`ds.iter_torch_batches() <ray.data.Dataset.iter_torch_batches>`
      - ✅
    * - TensorFlow Iterable Dataset
      - :meth:`ds.to_tf() <ray.data.Dataset.to_tf>`
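
For reference, a minimal sketch of the two iterator styles listed in this table, assuming a small tabular dataset (the column name "a" is illustrative):

    import pandas as pd
    import ray
    import torch

    ds = ray.data.from_pandas(pd.DataFrame({"a": [1, 2, 3, 4]}))

    # Pandas Dataframe Iterator.
    for df_batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
        assert isinstance(df_batch, pd.DataFrame)

    # PyTorch Tensor Iterator: tabular data is yielded as a dict of column tensors.
    for torch_batch in ds.iter_torch_batches(batch_size=2):
        assert isinstance(torch_batch["a"], torch.Tensor)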

View file

@@ -92,10 +92,8 @@ import torch
 ds = ray.data.range(10000)
-torch_ds: torch.utils.data.IterableDataset = ds.to_torch(batch_size=2)
 num_batches = 0
-for batch, _ in torch_ds:
+for batch in ds.iter_torch_batches(batch_size=2):
     assert isinstance(batch, torch.Tensor)
     assert batch.size(dim=0) == 2
     num_batches += 1
@@ -118,25 +116,17 @@ df = pd.DataFrame({
 })
 ds = ray.data.from_pandas(df)
-# Specify the label column; all other columns will be treated as feature columns and
-# will be concatenated into the same Torch tensor.
-# We set unsqueeze_label_tensor=False in order to remove a redundant unit column
-# dimension.
-torch_ds: torch.utils.data.IterableDataset = ds.to_torch(
-    label_column="label",
-    batch_size=2,
-    unsqueeze_label_tensor=False,
-)
 num_batches = 0
-for feature, label in torch_ds:
-    assert isinstance(feature, torch.Tensor)
+for batch in ds.iter_torch_batches(batch_size=2):
+    feature1 = batch["feature1"]
+    feature2 = batch["feature2"]
+    label = batch["label"]
+    assert isinstance(feature1, torch.Tensor)
+    assert isinstance(feature2, torch.Tensor)
     assert isinstance(label, torch.Tensor)
-    # Batch dimension.
-    assert feature.size(dim=0) == 2
-    # Column dimension.
-    assert feature.size(dim=1) == 2
+    # Batch dimension.
+    assert feature1.size(dim=0) == 2
+    assert feature2.size(dim=0) == 2
+    assert label.size(dim=0) == 2
     num_batches += 1
@@ -218,7 +208,7 @@ class Worker:
         pass

     def train(self, shard: ray.data.Dataset[int]) -> int:
-        for batch in shard.to_torch(batch_size=256):
+        for batch in shard.iter_torch_batches(batch_size=256):
             pass
         return shard.count()

View file

@@ -78,7 +78,7 @@ import ray
 torch_ds = ray.data.read_parquet("example://iris.parquet") \
     .repeat() \
     .random_shuffle_each_window() \
-    .to_torch()
+    .iter_torch_batches()

 # Streaming batch inference pipeline that pipelines the transforming of a single
 # file with the reading of a single file (at most 2 file's worth of data in-flight

View file

@@ -42,14 +42,10 @@ def train_loop_per_worker():
     )

     for epoch in range(num_epochs):
         model.train()
-        for inputs, labels in iter(
-            dataset_shard.to_torch(
-                label_column="y",
-                label_column_dtype=torch.float,
-                feature_column_dtypes=torch.float,
-                batch_size=32,
-            )
+        for batch in dataset_shard.iter_torch_batches(
+            batch_size=32, dtypes=torch.float
         ):
+            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
             inputs.to(device)
             labels.to(device)
             outputs = model(inputs)

View file

@@ -142,7 +142,7 @@
   "source": [
    "Note that {py:class}`SimpleTorchDatasource <ray.data.datasource.SimpleTorchDatasource>` loads all data into memory, so you shouldn't use it with larger datasets.\n",
    "\n",
-   "Next, let's represent our data using pandas dataframes instead of tuples. This lets us call methods like {py:meth}`Dataset.to_torch <ray.data.Dataset.to_torch>` later in the tutorial."
+   "Next, let's represent our data using pandas dataframes instead of tuples. This lets us call methods like {py:meth}`Dataset.iter_torch_batches <ray.data.Dataset.iter_torch_batches>` later in the tutorial."
   ]
  },
  {
@@ -280,7 +280,7 @@
    "\n",
    "`train_loop_per_worker` contains regular PyTorch code with a few notable exceptions:\n",
    "* We wrap our model with {py:func}`train.torch.prepare_model <ray.train.torch.prepare_model>`.\n",
-   "* We call {py:func}`session.get_dataset_shard <ray.air.session.get_dataset_shard>` and {py:meth}`Dataset.to_torch <ray.data.Dataset.to_torch>` to convert a subset of our training data to a Torch dataset.\n",
+   "* We call {py:func}`session.get_dataset_shard <ray.air.session.get_dataset_shard>` and {py:meth}`Dataset.iter_torch_batches <ray.data.Dataset.iter_torch_batches>` to convert a subset of our training data to a Torch dataset.\n",
    "* We save model state using {py:func}`session.report <ray.air.session.report>`."
   ]
  },
@@ -302,19 +302,15 @@
    "    criterion = nn.CrossEntropyLoss()\n",
    "    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)\n",
    "\n",
-   "    train_dataset_shard: torch.utils.data.Dataset = session.get_dataset_shard(\"train\").to_torch(\n",
-   "        feature_columns=[\"image\"],\n",
-   "        label_column=\"label\",\n",
+   "    train_dataset_shard = session.get_dataset_shard(\"train\").iter_torch_batches(\n",
    "        batch_size=config[\"batch_size\"],\n",
-   "        unsqueeze_feature_tensors=False,\n",
-   "        unsqueeze_label_tensor=False\n",
    "    )\n",
    "\n",
    "    for epoch in range(2):\n",
    "        running_loss = 0.0\n",
    "        for i, data in enumerate(train_dataset_shard):\n",
-   "            # get the inputs; data is a list of [inputs, labels]\n",
-   "            inputs, labels = data\n",
+   "            # get the inputs and labels\n",
+   "            inputs, labels = data[\"image\"], data[\"label\"]\n",
    "            print(inputs)\n",
    "            print(labels)\n",
    "\n",

View file

@@ -461,17 +461,15 @@
    "    criterion = CrossEntropyLoss()\n",
    "\n",
    "    # Get the Ray Dataset shard for this data parallel worker, and convert it to a PyTorch Dataset.\n",
-   "    dataset_shard = session.get_dataset_shard(\"train\").to_torch(\n",
-   "        label_column=\"label\",\n",
+   "    dataset_shard = session.get_dataset_shard(\"train\").iter_torch_batches(\n",
    "        batch_size=batch_size,\n",
-   "        unsqueeze_feature_tensors=False,\n",
-   "        unsqueeze_label_tensor=False,\n",
    "    )\n",
    "\n",
    "    for epoch_idx in range(num_epochs):\n",
    "        running_loss = 0\n",
-   "        for iteration, (train_mb_x, train_mb_y) in enumerate(dataset_shard):\n",
+   "        for iteration, batch in enumerate(dataset_shard):\n",
    "            optimizer.zero_grad()\n",
+   "            train_mb_x, train_mb_y = batch[\"image\"], batch[\"label\"]\n",
    "            train_mb_x = train_mb_x.to(train.torch.get_device())\n",
    "            train_mb_y = train_mb_y.to(train.torch.get_device())\n",
    "\n",

View file

@@ -1144,7 +1144,7 @@ Reproducibility
     def training_func(config):
         dataloader = ray.train.get_dataset()\
             .get_shard(torch.rank())\
-            .to_torch(batch_size=config["batch_size"])
+            .iter_torch_batches(batch_size=config["batch_size"])

         for i in config["epochs"]:
             ray.train.report(...)  # use same intermediate reporting API

View file

@@ -20,16 +20,16 @@ class MyPytorchTrainer(BaseTrainer):
         # preprocessed by self.preprocessor
         dataset = self.datasets["train"]
-        torch_ds = dataset.to_torch(label_column="y")
         loss_fn = torch.nn.MSELoss()

         for epoch_idx in range(10):
             loss = 0
             num_batches = 0
-            for X, y in iter(torch_ds):
+            for batch in dataset.iter_torch_batches(dtypes=torch.float):
                 # Compute prediction error
-                pred = self.model(X.float())
-                batch_loss = loss_fn(pred, y.float())
+                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
+                pred = self.model(X)
+                batch_loss = loss_fn(pred, y)
                 # Backpropagation
                 self.optimizer.zero_grad()

View file

@@ -88,21 +88,14 @@ def train_func(config):
     results = []

+    def create_torch_iterator(shard):
+        iterator = shard.iter_torch_batches(batch_size=batch_size)
+        for batch in iterator:
+            yield batch["x"].float(), batch["y"].float()
+
     for _ in range(epochs):
-        train_torch_dataset = train_dataset_shard.to_torch(
-            label_column="y",
-            feature_columns=["x"],
-            label_column_dtype=torch.float,
-            feature_column_dtypes=torch.float,
-            batch_size=batch_size,
-        )
-        validation_torch_dataset = validation_dataset.to_torch(
-            label_column="y",
-            feature_columns=["x"],
-            label_column_dtype=torch.float,
-            feature_column_dtypes=torch.float,
-            batch_size=batch_size,
-        )
+        train_torch_dataset = create_torch_iterator(train_dataset_shard)
+        validation_torch_dataset = create_torch_iterator(validation_dataset)

         device = train.torch.get_device()

View file

@@ -223,8 +223,9 @@ def get_dataset_shard(
 ) -> Optional[Union["Dataset", "DatasetPipeline"]]:
     """Returns the Ray Dataset or DatasetPipeline shard for this worker.

-    You should call ``to_torch()`` or ``to_tf()`` on this shard to convert
-    it to the appropriate framework-specific Dataset.
+    You should call ``iter_torch_batches()`` or ``iter_tf_batches()``
+    on this shard to convert it to the appropriate
+    framework-specific data type.

     .. code-block:: python
@@ -237,8 +238,9 @@ def get_dataset_shard(
             model = Net()
             for iter in range(100):
                 # Trainer will automatically handle sharding.
-                data_shard = session.get_dataset_shard().to_torch()
-                model.train(data_shard)
+                data_shard = session.get_dataset_shard("train")
+                for batch in data_shard.iter_torch_batches():
+                    # ...
             return model

         train_dataset = ray.data.from_items(

View file

@@ -84,16 +84,17 @@ class BaseTrainer(abc.ABC):
         # preprocessed by self.preprocessor
         dataset = self.datasets["train"]
-        torch_ds = dataset.to_torch(label_column="y")
+        torch_ds = dataset.iter_torch_batches(dtypes=torch.float)
         loss_fn = torch.nn.MSELoss()

         for epoch_idx in range(10):
             loss = 0
             num_batches = 0
-            for X, y in iter(torch_ds):
+            for batch in torch_ds:
+                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
                 # Compute prediction error
                 pred = self.model(X)
-                batch_loss = loss_fn(pred, y.float())
+                batch_loss = loss_fn(pred, y)
                 # Backpropagation
                 self.optimizer.zero_grad()

View file

@@ -122,14 +122,10 @@ class HorovodTrainer(DataParallelTrainer):
     )

     for epoch in range(num_epochs):
         model.train()
-        for inputs, labels in iter(
-            dataset_shard.to_torch(
-                label_column="y",
-                label_column_dtype=torch.float,
-                feature_column_dtypes=torch.float,
-                batch_size=32,
-            )
+        for batch in dataset_shard.iter_torch_batches(
+            batch_size=32, dtypes=torch.float
         ):
+            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
             inputs.to(device)
             labels.to(device)
             outputs = model(inputs)

View file

@@ -80,7 +80,7 @@ class TorchTrainer(DataParallelTrainer):
             # Configures the dataloader for distributed training by adding a
             # `DistributedSampler`.
            # You should NOT use this if you are doing
-            # `session.get_dataset_shard(...).to_torch(...)`
+            # `session.get_dataset_shard(...).iter_torch_batches(...)`
             train.torch.prepare_data_loader(...)

             # Returns the current torch device.
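
To make the distinction in the comment above concrete, here is a hedged sketch of the two data-feeding paths inside a training loop run under a TorchTrainer. The toy DataLoader and the "train" dataset name are illustrative assumptions:

    import torch
    from torch.utils.data import DataLoader, TensorDataset

    from ray.air import session
    from ray.train.torch import prepare_data_loader

    def train_loop_per_worker(config):
        # Path 1: a native PyTorch DataLoader. Wrap it with prepare_data_loader()
        # so Ray Train adds a DistributedSampler and moves batches to the device.
        loader = DataLoader(TensorDataset(torch.arange(8.0)), batch_size=2)
        loader = prepare_data_loader(loader)

        # Path 2: a Ray Dataset shard. Iterate it with iter_torch_batches()
        # directly; do NOT pass it through prepare_data_loader().
        shard = session.get_dataset_shard("train")
        for batch in shard.iter_torch_batches(batch_size=2):
            pass  # training step would go here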

View file

@@ -31,8 +31,8 @@ def get_dataset_shard(
 ) -> Optional[Union["Dataset", "DatasetPipeline"]]:
     """Returns the Ray Dataset or DatasetPipeline shard for this worker.

-    You should call ``to_torch()`` or ``to_tf()`` on this shard to convert
-    it to the appropriate framework-specific Dataset.
+    You should call ``iter_torch_batches()`` or ``iter_tf_batches()`` on this shard
+    to convert it to the appropriate framework-specific data type.

     .. code-block:: python
@@ -42,8 +42,9 @@ def get_dataset_shard(
         def train_func():
             model = Net()
             for iter in range(100):
-                data_shard = train.get_dataset_shard().to_torch()
-                model.train(data_shard)
+                data_shard = session.get_dataset_shard("train")
+                for batch in data_shard.iter_torch_batches():
+                    # ...
             return model

         dataset = ray.data.read_csv("train.csv")