"# Processing NYC taxi data using Ray Datasets\n",
"\n",
"The [NYC Taxi dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) is a popular tabular dataset. In this example, we demonstrate some basic data processing on this dataset using Ray Datasets.\n",
"\n",
"## Overview\n",
"\n",
"This tutorial will cover:\n",
" - Reading Parquet data\n",
" - Inspecting the metadata and first few rows of a large Ray {class}`Dataset <ray.data.Dataset>`\n",
" - Calculating some common global and grouped statistics on the dataset\n",
" - Dropping columns and rows\n",
" - Adding a derived column\n",
" - Shuffling the dataset\n",
" - Sharding the dataset and feeding it to parallel consumers (trainers)\n",
" - Applying batch (offline) inference to the data\n",
"\n",
"## Walkthrough\n",
"\n",
"Let's start by importing Ray and initializing a local Ray cluster."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "366de039",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-05-18 18:37:54,818\tINFO services.py:1484 -- View the Ray dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8266\u001b[39m\u001b[22m\n"
"# Import ray and initialize a local Ray cluster.\n",
"import ray\n",
"ray.init()"
]
},
{
"cell_type": "markdown",
"id": "efb202d0",
"metadata": {},
"source": [
"### Reading and Inspecting the Data\n",
"\n",
"Next, we read a few of the files from the dataset. This read is semi-lazy, where reading of the first file is eagerly executed, but reading of all other files is delayed until the underlying data is needed by downstream operations (e.g. consuming the data with {meth}`ds.take() <ray.data.Dataset.take>`, or transforming the data with {meth}`ds.map_batches() <ray.data.Dataset.map_batches>`).\n",
"\n",
"We could process the entire Dataset in a streaming fashion using {ref}`pipelining <dataset_pipeline_concept>` or all of it in parallel using a multi-node Ray cluster, but we'll save that for our large-scale examples. :)"
"We can easily inspect the schema of this dataset. For Parquet files, we don't even have to read the actual data to get the schema; we can read it from the lightweight Parquet metadata!"
"# Fetch the schema from the underlying Parquet metadata.\n",
"ds.schema()"
]
},
{
"cell_type": "markdown",
"id": "fceebe4d",
"metadata": {},
"source": [
"Parquet even stores the number of rows per file in the Parquet metadata, so we can get the number of rows in ``ds`` without triggering a full data read."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "5812dacf",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"27472535"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds.count()"
]
},
{
"cell_type": "markdown",
"id": "cad044a7",
"metadata": {},
"source": [
"We can get a nice, cheap summary of the ``Dataset`` by leveraging it's informative repr:"
"We can also poke at the actual data, taking a peek at a single row. Since this is only returning a row from the first file, reading of the second file is **not** triggered yet."
"To get a better sense of the data size, we can calculate the size in bytes of the full dataset. Note that for Parquet files, this size-in-bytes will be pulled from the Parquet metadata (not triggering a data read) and will therefore be the on-disk size of the data; this might be significantly smaller than the in-memory size!"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "7f0b8702",
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"897130464"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds.size_bytes()"
]
},
{
"cell_type": "markdown",
"id": "4a94d596",
"metadata": {},
"source": [
"In order to get the in-memory size, we can trigger full reading of the dataset and inspect the size in bytes."
"In addition to being able to read lists of individual files, {func}`ray.data.read_parquet() <ray.data.read_parquet>` (as well as other ``ray.data.read_*()`` APIs) can read directories containing multiple Parquet files. For Parquet in particular, reading Parquet datasets partitioned by a particular column is supported, allowing for path-based (zero-read) partition filtering and (optionally) including the partition column value specified in the file paths directly in the read table data.\n",
"\n",
"For the NYC taxi dataset, instead of reading individual per-month Parquet files, we can read the entire 2009 directory.\n",
"\n",
"```{warning}\n",
"This will be a lot of data (~5.6 GB on disk, ~14 GB in memory), so be careful trigger full reads on a limited-memory machine! This is one place where Datasets' semi-lazy reading comes in handy: Datasets will only read one file eagerly, which allows us to inspect a subset of the data without having to read the entire dataset.\n",
"The metadata that Datasets prints in its repr is guaranteed to not trigger reads of all files; data such as the row count and the schema is pulled directly from the Parquet metadata."
"Whoa, there was a trip with 113 people in the taxi!? Let's check out these kind of many-passenger records by filtering to just these records using our {meth}`ds.map_batches() <ray.data.Dataset.map_batches>` batch mapping API.\n",
"\n",
":::{note}\n",
"Our filtering UDF receives a Pandas DataFrame, which is the default batch format for tabular data, and returns a Pandas DataFrame, which keeps the Dataset in a tabular format.\n"
"#### Advanced Aside - Projection and Filter Pushdown\n",
"\n",
"Note that Ray Datasets' Parquet reader supports projection (column selection) and row filter pushdown, where we can push the above column selection and the row-based filter to the Parquet read. If we specify column selection at Parquet read time, the unselected columns won't even be read from disk!\n",
"\n",
"The row-based filter is specified via\n",
"[Arrow's dataset field expressions](https://arrow.apache.org/docs/6.0/python/generated/pyarrow.dataset.Expression.html#pyarrow.dataset.Expression). See the {ref}`feature guide for reading Parquet data <dataset_supported_file_formats>` for more information."
"See the feature guides for {ref}`transforming data <transforming_datasets>` and {ref}`ML preprocessing <datasets-ml-preprocessing>` for more information on how we can process our data with Ray Datasets."
]
},
{
"cell_type": "markdown",
"id": "3c2e28bf",
"metadata": {},
"source": [
"### Ingesting into Model Trainers\n",
"\n",
"Now that we've learned more about our data and we have cleaned up our dataset a bit, we now look at how we can feed this dataset into some dummy model trainers.\n",
"\n",
"First, let's do a full global random shuffle of the dataset to decorrelate these samples."
"We define a dummy ``Trainer`` actor, where each trainer will consume a dataset shard in batches and simulate model training.\n",
"\n",
":::{note}\n",
"In a real training workflow, we would feed ``ds`` to {ref}`Ray Train <train-docs>`, which would do this sharding and creation of training actors for us, under the hood.\n"
"Finally, we simulate training, passing each shard to the corresponding trainer. The number of rows per shard is returned."
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "d60d0e0d",
"metadata": {
"scrolled": false
},
"outputs": [
{
"data": {
"text/plain": [
"[6867923, 6867923, 6867923, 6867923]"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ray.get([w.train.remote(s) for w, s in zip(trainers, shards)])"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "b1ae3f38",
"metadata": {},
"outputs": [],
"source": [
"# Delete trainer actor handle references, which should terminate the actors.\n",
"del trainers"
]
},
{
"cell_type": "markdown",
"id": "60c90def",
"metadata": {},
"source": [
"### Parallel Batch Inference\n",
"\n",
"After we've trained a model, we may want to perform batch (offline) inference on such a tabular dataset. With Ray Datasets, this is as easy as a {meth}`ds.map_batches() <ray.data.Dataset.map_batches>` call!\n",
"\n",
"First, we define a callable class that will cache the loading of the model in its constructor."
"``BatchInferModel``'s constructor will only be called once per actor worker when using the actor pool compute strategy in {meth}`ds.map_batches() <ray.data.Dataset.map_batches>`."
" #num_gpus=1, # Uncomment this to run this on GPUs!\n",
" compute=\"actors\",\n",
").take()"
]
},
{
"cell_type": "markdown",
"id": "a6806ab7",
"metadata": {},
"source": [
"We can also configure the autoscaling actor pool that this inference stage uses, setting upper and lower bounds on the actor pool size, and even tweak the batch prefetching vs. inference task queueing tradeoff."