From aeb2346804b2f1140c872a081b16a02ff6d00a83 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Sun, 7 Aug 2022 20:14:12 -0700
Subject: [PATCH] [AIR] Replace references of `to_torch` with `iter_torch_batches` (#27574)

---
 doc/source/data/advanced-pipelines.rst        |  2 +-
 doc/source/data/dataset.rst                   |  4 +--
 .../data/doc_code/accessing_datasets.py       | 30 +++++++------------
 doc/source/data/doc_code/key_concepts.py      |  2 +-
 doc/source/ray-air/doc_code/hvd_trainer.py    | 10 ++-----
 .../examples/torch_image_example.ipynb        | 14 ++++-----
 .../examples/torch_incremental_learning.ipynb |  8 ++---
 doc/source/train/dl_guide.rst                 |  2 +-
 python/ray/air/examples/custom_trainer.py     |  8 ++---
 .../pytorch/torch_regression_example.py       | 21 +++++--------
 python/ray/air/session.py                     | 10 ++++---
 python/ray/train/base_trainer.py              |  7 +++--
 python/ray/train/horovod/horovod_trainer.py   | 10 ++-----
 python/ray/train/torch/torch_trainer.py       |  2 +-
 python/ray/train/train_loop_utils.py          |  9 +++---
 15 files changed, 56 insertions(+), 83 deletions(-)

diff --git a/doc/source/data/advanced-pipelines.rst b/doc/source/data/advanced-pipelines.rst
index b20b36810..fb4549232 100644
--- a/doc/source/data/advanced-pipelines.rst
+++ b/doc/source/data/advanced-pipelines.rst
@@ -37,7 +37,7 @@ For example, in the following pipeline, the ``map(func)`` transformation only oc
     @ray.remote(num_gpus=1)
     def train_func(pipe: DatasetPipeline):
         model = MyModel()
-        for batch in pipe.to_torch():
+        for batch in pipe.iter_torch_batches():
             model.fit(batch)
 
     # Read from the pipeline in a remote training function.
diff --git a/doc/source/data/dataset.rst b/doc/source/data/dataset.rst
index d2bfedbd8..cfddc6e47 100644
--- a/doc/source/data/dataset.rst
+++ b/doc/source/data/dataset.rst
@@ -286,8 +286,8 @@ Supported Output Formats
    * - Pandas Dataframe Iterator
      - :meth:`ds.iter_batches(batch_format="pandas") `
      - ✅
-   * - PyTorch Iterable Dataset
-     - :meth:`ds.to_torch() `
+   * - PyTorch Tensor Iterator
+     - :meth:`ds.iter_torch_batches() `
      - ✅
    * - TensorFlow Iterable Dataset
      - :meth:`ds.to_tf() `
diff --git a/doc/source/data/doc_code/accessing_datasets.py b/doc/source/data/doc_code/accessing_datasets.py
index 091b25d35..303a6148c 100644
--- a/doc/source/data/doc_code/accessing_datasets.py
+++ b/doc/source/data/doc_code/accessing_datasets.py
@@ -92,10 +92,8 @@ import torch
 
 ds = ray.data.range(10000)
 
-torch_ds: torch.utils.data.IterableDataset = ds.to_torch(batch_size=2)
-
 num_batches = 0
-for batch, _ in torch_ds:
+for batch in ds.iter_torch_batches(batch_size=2):
     assert isinstance(batch, torch.Tensor)
     assert batch.size(dim=0) == 2
     num_batches += 1
@@ -118,25 +116,17 @@ df = pd.DataFrame({
 })
 ds = ray.data.from_pandas(df)
 
-# Specify the label column; all other columns will be treated as feature columns and
-# will be concatenated into the same Torch tensor.
-# We set unsqueeze_label_tensor=False in order to remove a redundant unit column
-# dimension.
-torch_ds: torch.utils.data.IterableDataset = ds.to_torch(
-    label_column="label",
-    batch_size=2,
-    unsqueeze_label_tensor=False,
-)
-
 num_batches = 0
-for feature, label in torch_ds:
-    assert isinstance(feature, torch.Tensor)
+for batch in ds.iter_torch_batches(batch_size=2):
+    feature1 = batch["feature1"]
+    feature2 = batch["feature2"]
+    label = batch["label"]
+    assert isinstance(feature1, torch.Tensor)
+    assert isinstance(feature2, torch.Tensor)
     assert isinstance(label, torch.Tensor)
     # Batch dimension.
-    assert feature.size(dim=0) == 2
-    # Column dimension.
-    assert feature.size(dim=1) == 2
-    # Batch dimension.
+    assert feature1.size(dim=0) == 2
+    assert feature2.size(dim=0) == 2
     assert label.size(dim=0) == 2
     num_batches += 1
 
@@ -218,7 +208,7 @@ class Worker:
         pass
 
     def train(self, shard: ray.data.Dataset[int]) -> int:
-        for batch in shard.to_torch(batch_size=256):
+        for batch in shard.iter_torch_batches(batch_size=256):
             pass
         return shard.count()
diff --git a/doc/source/data/doc_code/key_concepts.py b/doc/source/data/doc_code/key_concepts.py
index 30ffb4fd3..7dadc50bd 100644
--- a/doc/source/data/doc_code/key_concepts.py
+++ b/doc/source/data/doc_code/key_concepts.py
@@ -78,7 +78,7 @@ import ray
 torch_ds = ray.data.read_parquet("example://iris.parquet") \
     .repeat() \
     .random_shuffle_each_window() \
-    .to_torch()
+    .iter_torch_batches()
 
 # Streaming batch inference pipeline that pipelines the transforming of a single
 # file with the reading of a single file (at most 2 file's worth of data in-flight
diff --git a/doc/source/ray-air/doc_code/hvd_trainer.py b/doc/source/ray-air/doc_code/hvd_trainer.py
index 3cb2fad05..eff640acb 100644
--- a/doc/source/ray-air/doc_code/hvd_trainer.py
+++ b/doc/source/ray-air/doc_code/hvd_trainer.py
@@ -42,14 +42,10 @@ def train_loop_per_worker():
     )
     for epoch in range(num_epochs):
         model.train()
-        for inputs, labels in iter(
-            dataset_shard.to_torch(
-                label_column="y",
-                label_column_dtype=torch.float,
-                feature_column_dtypes=torch.float,
-                batch_size=32,
-            )
+        for batch in dataset_shard.iter_torch_batches(
+            batch_size=32, dtypes=torch.float
         ):
+            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
             inputs.to(device)
             labels.to(device)
             outputs = model(inputs)
diff --git a/doc/source/ray-air/examples/torch_image_example.ipynb b/doc/source/ray-air/examples/torch_image_example.ipynb
index 16044376a..dd89baf0a 100644
--- a/doc/source/ray-air/examples/torch_image_example.ipynb
+++ b/doc/source/ray-air/examples/torch_image_example.ipynb
@@ -142,7 +142,7 @@
    "source": [
     "Note that {py:class}`SimpleTorchDatasource ` loads all data into memory, so you shouldn't use it with larger datasets.\n",
     "\n",
-    "Next, let's represent our data using pandas dataframes instead of tuples. This lets us call methods like {py:meth}`Dataset.to_torch ` later in the tutorial."
+    "Next, let's represent our data using pandas dataframes instead of tuples. This lets us call methods like {py:meth}`Dataset.iter_torch_batches ` later in the tutorial."
    ]
   },
   {
@@ -280,7 +280,7 @@
     "\n",
     "`train_loop_per_worker` contains regular PyTorch code with a few notable exceptions:\n",
     "* We wrap our model with {py:func}`train.torch.prepare_model `.\n",
-    "* We call {py:func}`session.get_dataset_shard ` and {py:meth}`Dataset.to_torch ` to convert a subset of our training data to a Torch dataset.\n",
+    "* We call {py:func}`session.get_dataset_shard ` and {py:meth}`Dataset.iter_torch_batches ` to iterate over a subset of our training data as batches of Torch tensors.\n",
     "* We save model state using {py:func}`session.report `."
    ]
   },
   {
@@ -302,19 +302,15 @@
     "    criterion = nn.CrossEntropyLoss()\n",
     "    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)\n",
     "\n",
-    "    train_dataset_shard: torch.utils.data.Dataset = session.get_dataset_shard(\"train\").to_torch(\n",
-    "        feature_columns=[\"image\"],\n",
-    "        label_column=\"label\",\n",
+    "    train_dataset_shard = session.get_dataset_shard(\"train\").iter_torch_batches(\n",
     "        batch_size=config[\"batch_size\"],\n",
-    "        unsqueeze_feature_tensors=False,\n",
-    "        unsqueeze_label_tensor=False\n",
     "    )\n",
     "\n",
     "    for epoch in range(2):\n",
     "        running_loss = 0.0\n",
     "        for i, data in enumerate(train_dataset_shard):\n",
-    "            # get the inputs; data is a list of [inputs, labels]\n",
-    "            inputs, labels = data\n",
+    "            # get the inputs and labels\n",
+    "            inputs, labels = data[\"image\"], data[\"label\"]\n",
     "            print(inputs)\n",
     "            print(labels)\n",
     "\n",
diff --git a/doc/source/ray-air/examples/torch_incremental_learning.ipynb b/doc/source/ray-air/examples/torch_incremental_learning.ipynb
index 743e79a0a..5f0c5cf56 100644
--- a/doc/source/ray-air/examples/torch_incremental_learning.ipynb
+++ b/doc/source/ray-air/examples/torch_incremental_learning.ipynb
@@ -461,17 +461,15 @@
     "    criterion = CrossEntropyLoss()\n",
     "\n",
     "    # Get the Ray Dataset shard for this data parallel worker, and convert it to a PyTorch Dataset.\n",
-    "    dataset_shard = session.get_dataset_shard(\"train\").to_torch(\n",
-    "        label_column=\"label\",\n",
+    "    dataset_shard = session.get_dataset_shard(\"train\").iter_torch_batches(\n",
     "        batch_size=batch_size,\n",
-    "        unsqueeze_feature_tensors=False,\n",
-    "        unsqueeze_label_tensor=False,\n",
     "    )\n",
     "\n",
     "    for epoch_idx in range(num_epochs):\n",
     "        running_loss = 0\n",
-    "        for iteration, (train_mb_x, train_mb_y) in enumerate(dataset_shard):\n",
+    "        for iteration, batch in enumerate(dataset_shard):\n",
     "            optimizer.zero_grad()\n",
+    "            train_mb_x, train_mb_y = batch[\"image\"], batch[\"label\"]\n",
     "            train_mb_x = train_mb_x.to(train.torch.get_device())\n",
     "            train_mb_y = train_mb_y.to(train.torch.get_device())\n",
     "\n",
diff --git a/doc/source/train/dl_guide.rst b/doc/source/train/dl_guide.rst
index 91a495e3e..34211edd6 100644
--- a/doc/source/train/dl_guide.rst
+++ b/doc/source/train/dl_guide.rst
@@ -1144,7 +1144,7 @@ Reproducibility
     def training_func(config):
         dataloader = ray.train.get_dataset()\
             .get_shard(torch.rank())\
-            .to_torch(batch_size=config["batch_size"])
+            .iter_torch_batches(batch_size=config["batch_size"])
 
         for i in config["epochs"]:
            ray.train.report(...)  # use same intermediate reporting API
diff --git a/python/ray/air/examples/custom_trainer.py b/python/ray/air/examples/custom_trainer.py
index b43f19b46..dee51ec9b 100644
--- a/python/ray/air/examples/custom_trainer.py
+++ b/python/ray/air/examples/custom_trainer.py
@@ -20,16 +20,16 @@ class MyPytorchTrainer(BaseTrainer):
         # preprocessed by self.preprocessor
         dataset = self.datasets["train"]
 
-        torch_ds = dataset.to_torch(label_column="y")
         loss_fn = torch.nn.MSELoss()
 
         for epoch_idx in range(10):
             loss = 0
             num_batches = 0
-            for X, y in iter(torch_ds):
+            for batch in dataset.iter_torch_batches(dtypes=torch.float):
                 # Compute prediction error
-                pred = self.model(X.float())
-                batch_loss = loss_fn(pred, y.float())
+                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
+                pred = self.model(X)
+                batch_loss = loss_fn(pred, y)
 
                 # Backpropagation
                 self.optimizer.zero_grad()
diff --git a/python/ray/air/examples/pytorch/torch_regression_example.py b/python/ray/air/examples/pytorch/torch_regression_example.py
index d7c060b05..f3bcedfc7 100644
--- a/python/ray/air/examples/pytorch/torch_regression_example.py
+++ b/python/ray/air/examples/pytorch/torch_regression_example.py
@@ -88,21 +88,14 @@ def train_func(config):
 
     results = []
 
+    def create_torch_iterator(shard):
+        iterator = shard.iter_torch_batches(batch_size=batch_size)
+        for batch in iterator:
+            yield batch["x"].float(), batch["y"].float()
+
     for _ in range(epochs):
-        train_torch_dataset = train_dataset_shard.to_torch(
-            label_column="y",
-            feature_columns=["x"],
-            label_column_dtype=torch.float,
-            feature_column_dtypes=torch.float,
-            batch_size=batch_size,
-        )
-        validation_torch_dataset = validation_dataset.to_torch(
-            label_column="y",
-            feature_columns=["x"],
-            label_column_dtype=torch.float,
-            feature_column_dtypes=torch.float,
-            batch_size=batch_size,
-        )
+        train_torch_dataset = create_torch_iterator(train_dataset_shard)
+        validation_torch_dataset = create_torch_iterator(validation_dataset)
 
         device = train.torch.get_device()
diff --git a/python/ray/air/session.py b/python/ray/air/session.py
index 83e7bd738..441f03a12 100644
--- a/python/ray/air/session.py
+++ b/python/ray/air/session.py
@@ -223,8 +223,9 @@ def get_dataset_shard(
 ) -> Optional[Union["Dataset", "DatasetPipeline"]]:
     """Returns the Ray Dataset or DatasetPipeline shard for this worker.
 
-    You should call ``to_torch()`` or ``to_tf()`` on this shard to convert
-    it to the appropriate framework-specific Dataset.
+    You should call ``iter_torch_batches()`` or ``iter_tf_batches()``
+    on this shard to convert it to the appropriate
+    framework-specific data type.
 
     .. code-block:: python
 
@@ -237,8 +238,9 @@ def get_dataset_shard(
             model = Net()
             for iter in range(100):
                 # Trainer will automatically handle sharding.
-                data_shard = session.get_dataset_shard().to_torch()
-                model.train(data_shard)
+                data_shard = session.get_dataset_shard("train")
+                for batch in data_shard.iter_torch_batches():
+                    # ...
             return model
 
         train_dataset = ray.data.from_items(
diff --git a/python/ray/train/base_trainer.py b/python/ray/train/base_trainer.py
index 9dd117c25..1f5a00735 100644
--- a/python/ray/train/base_trainer.py
+++ b/python/ray/train/base_trainer.py
@@ -84,16 +84,16 @@ class BaseTrainer(abc.ABC):
                     # preprocessed by self.preprocessor
                     dataset = self.datasets["train"]
 
-                    torch_ds = dataset.to_torch(label_column="y")
                     loss_fn = torch.nn.MSELoss()
 
                     for epoch_idx in range(10):
                         loss = 0
                         num_batches = 0
-                        for X, y in iter(torch_ds):
+                        for batch in dataset.iter_torch_batches(dtypes=torch.float):
+                            X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
                             # Compute prediction error
                             pred = self.model(X)
-                            batch_loss = loss_fn(pred, y.float())
+                            batch_loss = loss_fn(pred, y)
 
                             # Backpropagation
                             self.optimizer.zero_grad()
diff --git a/python/ray/train/horovod/horovod_trainer.py b/python/ray/train/horovod/horovod_trainer.py
index ac54e013c..beded4a43 100644
--- a/python/ray/train/horovod/horovod_trainer.py
+++ b/python/ray/train/horovod/horovod_trainer.py
@@ -122,14 +122,10 @@ class HorovodTrainer(DataParallelTrainer):
                 )
                 for epoch in range(num_epochs):
                     model.train()
-                    for inputs, labels in iter(
-                        dataset_shard.to_torch(
-                            label_column="y",
-                            label_column_dtype=torch.float,
-                            feature_column_dtypes=torch.float,
-                            batch_size=32,
-                        )
+                    for batch in dataset_shard.iter_torch_batches(
+                        batch_size=32, dtypes=torch.float
                     ):
+                        inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
                         inputs.to(device)
                         labels.to(device)
                         outputs = model(inputs)
diff --git a/python/ray/train/torch/torch_trainer.py b/python/ray/train/torch/torch_trainer.py
index e84012cbe..bbd5848c7 100644
--- a/python/ray/train/torch/torch_trainer.py
+++ b/python/ray/train/torch/torch_trainer.py
@@ -80,7 +80,7 @@ class TorchTrainer(DataParallelTrainer):
             # Configures the dataloader for distributed training by adding a
             # `DistributedSampler`.
             # You should NOT use this if you are doing
-            # `session.get_dataset_shard(...).to_torch(...)`
+            # `session.get_dataset_shard(...).iter_torch_batches(...)`
             train.torch.prepare_data_loader(...)
 
             # Returns the current torch device.
diff --git a/python/ray/train/train_loop_utils.py b/python/ray/train/train_loop_utils.py
index 58244652e..bf1a0d84c 100644
--- a/python/ray/train/train_loop_utils.py
+++ b/python/ray/train/train_loop_utils.py
@@ -31,8 +31,8 @@ def get_dataset_shard(
 ) -> Optional[Union["Dataset", "DatasetPipeline"]]:
     """Returns the Ray Dataset or DatasetPipeline shard for this worker.
 
-    You should call ``to_torch()`` or ``to_tf()`` on this shard to convert
-    it to the appropriate framework-specific Dataset.
+    You should call ``iter_torch_batches()`` or ``iter_tf_batches()`` on this shard
+    to convert it to the appropriate framework-specific data type.
 
     .. code-block:: python
 
@@ -42,8 +42,9 @@ def get_dataset_shard(
         def train_func():
             model = Net()
             for iter in range(100):
-                data_shard = train.get_dataset_shard().to_torch()
-                model.train(data_shard)
+                data_shard = session.get_dataset_shard("train")
+                for batch in data_shard.iter_torch_batches():
+                    # ...
             return model
 
         dataset = ray.data.read_csv("train.csv")
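
Illustrative sketch of the migration this patch applies (not part of the diff above). Every call site follows the same pattern: drop the ``to_torch(label_column=..., ...)`` call that yielded ``(features, label)`` tuples and instead loop over ``iter_torch_batches()``, which yields a dict of column name to ``torch.Tensor`` per batch. The toy dataset, column names ``"x"`` and ``"y"``, and batch size below are placeholder assumptions for demonstration, not values taken from the diff::

    import ray
    import torch

    # Hypothetical toy dataset with a feature column "x" and a label column "y".
    ds = ray.data.from_items([{"x": float(i), "y": 2.0 * i} for i in range(32)])

    # Old pattern (removed throughout this patch): to_torch() returned an
    # IterableDataset that yielded (features, label) tuples.
    #
    #     for X, y in ds.to_torch(label_column="y", batch_size=8):
    #         ...
    #
    # New pattern: iter_torch_batches() yields one dict of column name ->
    # torch.Tensor per batch, and the caller picks out the columns it needs.
    for batch in ds.iter_torch_batches(batch_size=8, dtypes=torch.float):
        X = torch.unsqueeze(batch["x"], 1)  # feature batch, shape (8, 1)
        y = batch["y"]                      # label batch, shape (8,)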