Why tf.data?
The simplest way to train a Keras model is to pass NumPy arrays directly to model.fit(). That works for toy datasets, but it falls apart at scale. When your dataset is too large for memory, when loading from disk is slower than the GPU can compute, or when you need real-time augmentation, you need a dedicated input pipeline. That's exactly what tf.data provides.
tf.data solves this by overlapping data preparation with model execution through prefetching, parallel mapping, and caching.
The ETL Pipeline Concept
Every tf.data pipeline follows the Extract → Transform → Load pattern. You extract raw data from files, tensors, or generators; transform it with map, filter, batch, and shuffle operations; and load it into the model for training. The key insight is that these stages can run concurrently — while the GPU trains on batch N, the CPU can already be preparing batch N+1.
Throughput of a data pipeline is measured as:
$$T = \frac{\text{batch\_size}}{\text{step\_time}}$$
where step_time includes both data loading and model computation. The goal of tf.data is to make data loading invisible — fully hidden behind computation time.
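To make the formula concrete, here is a quick back-of-the-envelope calculation. The batch size of 64 and the 50 ms step time are illustrative assumptions, not measurements:

```python
# Worked example of T = batch_size / step_time (numbers are illustrative)
batch_size = 64
step_time = 0.050  # 50 ms per training step: data loading + compute

throughput = batch_size / step_time
print(f"Throughput: {throughput:.0f} samples/s")  # 1280 samples/s

# If prefetching hides 20 ms of data loading, the step shrinks to 30 ms
throughput_hidden = batch_size / 0.030
print(f"With loading hidden: {throughput_hidden:.0f} samples/s")
```

The second figure is the payoff of the pipeline: the same hardware processes more samples per second simply because the GPU no longer waits for data.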
import tensorflow as tf
import numpy as np
import time
# Problem: naive NumPy feeding
# The entire dataset must fit in memory, and there's no pipelining.
np.random.seed(42)
# A realistic image dataset would exhaust RAM on most machines, so the
# allocation below is left commented out:
# X = np.random.randn(10000, 224, 224, 3).astype(np.float32)  # ~5.6 GB!
# Solution: tf.data pipeline — lazy loading, never holds full dataset
# Create a small demo to show the concept
X_small = np.random.randn(1000, 32).astype(np.float32)
y_small = np.random.randint(0, 2, 1000).astype(np.float32)
# NumPy approach — entire dataset in memory
start = time.time()
model = tf.keras.Sequential([tf.keras.layers.Dense(16, activation='relu'),
tf.keras.layers.Dense(1, activation='sigmoid')])
model.compile(optimizer='adam', loss='binary_crossentropy')
model.fit(X_small, y_small, batch_size=64, epochs=5, verbose=0)
numpy_time = time.time() - start
# tf.data approach — with prefetching
start = time.time()
dataset = tf.data.Dataset.from_tensor_slices((X_small, y_small))
dataset = dataset.shuffle(1000).batch(64).prefetch(tf.data.AUTOTUNE)
model2 = tf.keras.Sequential([tf.keras.layers.Dense(16, activation='relu'),
tf.keras.layers.Dense(1, activation='sigmoid')])
model2.compile(optimizer='adam', loss='binary_crossentropy')
model2.fit(dataset, epochs=5, verbose=0)
tfdata_time = time.time() - start
print(f"NumPy feeding: {numpy_time:.3f}s")
print(f"tf.data pipeline: {tfdata_time:.3f}s")
print(f"Dataset element spec: {dataset.element_spec}")
For small in-memory data, both approaches perform similarly. The real gains emerge with large datasets loaded from disk, where tf.data's lazy evaluation and prefetching keep the accelerator fully utilized.
Creating Datasets
TensorFlow offers multiple factory methods to create tf.data.Dataset objects, each suited to a different data source. The most common are from_tensor_slices (in-memory data), from_generator (Python generators), and file-based constructors like TFRecordDataset and TextLineDataset.
from_tensor_slices
The workhorse for in-memory data. It slices a tensor (or tuple/dict of tensors) along the first dimension, producing one element per row. Unlike from_tensors, which wraps the entire input as a single element, from_tensor_slices iterates over individual samples.
import tensorflow as tf
import numpy as np
# from_tensor_slices — one element per row
features = np.array([[1, 2], [3, 4], [5, 6], [7, 8]], dtype=np.float32)
labels = np.array([0, 1, 1, 0], dtype=np.int32)
# Tuple of (features, labels) — most common for supervised learning
dataset = tf.data.Dataset.from_tensor_slices((features, labels))
for x, y in dataset:
    print(f"Features: {x.numpy()}, Label: {y.numpy()}")
# Dictionary input — useful for feature columns
dict_dataset = tf.data.Dataset.from_tensor_slices({
    'age': [25, 30, 35, 40],
    'income': [50000, 60000, 70000, 80000],
    'label': [0, 1, 1, 0]
})
for element in dict_dataset.take(2):
    print(element)
# from_tensors — wraps entire input as ONE element (contrast)
single_element = tf.data.Dataset.from_tensors((features, labels))
print(f"\nfrom_tensor_slices: {tf.data.Dataset.from_tensor_slices(features).cardinality().numpy()} elements")
print(f"from_tensors: {single_element.cardinality().numpy()} element")
from_generator & range
from_generator wraps any Python generator into a dataset. This is ideal for data that doesn't fit in memory or comes from custom sources (databases, APIs, complex preprocessing). Dataset.range produces integer sequences, handy for indexing or benchmarking.
import tensorflow as tf
import numpy as np
# from_generator — lazy evaluation, generates on demand
def data_generator():
    """Simulate reading from a database or file one record at a time."""
    for i in range(100):
        # Simulate variable-length sequences
        seq_len = np.random.randint(5, 20)
        features = np.random.randn(seq_len).astype(np.float32)
        label = np.random.randint(0, 3)
        yield features, label

gen_dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(None,), dtype=tf.float32),  # variable length
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
for x, y in gen_dataset.take(3):
    print(f"Sequence length: {x.shape[0]}, Label: {y.numpy()}")
# Dataset.range — integer sequences
range_ds = tf.data.Dataset.range(0, 10, 2) # 0, 2, 4, 6, 8
print("\nRange dataset:", list(range_ds.as_numpy_iterator()))
# list_files — glob pattern for file paths
# file_ds = tf.data.Dataset.list_files('data/images/*.jpg', shuffle=True)
# Simulated example:
print("\nDataset cardinality:", gen_dataset.cardinality().numpy())
print("(tf.data.UNKNOWN_CARDINALITY = -2: the size of a generator-based dataset can't be determined)")
File-Based Datasets
For large-scale workloads, TensorFlow provides specialized dataset classes that read directly from files without loading everything into memory: TextLineDataset for CSV/text, TFRecordDataset for binary TFRecord files, and FixedLengthRecordDataset for fixed-width binary data.
import tensorflow as tf
import tempfile
import os
# Create a temporary CSV file for demonstration
csv_content = "name,age,score\nAlice,25,88.5\nBob,30,92.0\nCharlie,35,76.3\nDiana,28,95.1\n"
tmp_dir = tempfile.mkdtemp()
csv_path = os.path.join(tmp_dir, "data.csv")
with open(csv_path, 'w') as f:
    f.write(csv_content)
# TextLineDataset — reads line by line
text_ds = tf.data.TextLineDataset(csv_path)
for line in text_ds:
    print(line.numpy().decode('utf-8'))
# Skip header and parse CSV
def parse_csv(line):
    defaults = [tf.constant(''), tf.constant(0), tf.constant(0.0)]
    fields = tf.io.decode_csv(line, record_defaults=defaults)
    return {'name': fields[0], 'age': fields[1]}, fields[2]

parsed_ds = text_ds.skip(1).map(parse_csv)  # skip header row
print("\nParsed records:")
for features, label in parsed_ds:
    print(f"  Name: {features['name'].numpy().decode()}, "
          f"Age: {features['age'].numpy()}, "
          f"Score: {label.numpy()}")
# Cleanup
os.remove(csv_path)
os.rmdir(tmp_dir)
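FixedLengthRecordDataset has no demo above, so here is a small sketch. It assumes a record layout of exactly 8 bytes (two little-endian int32 values); real fixed-width formats define their own record size.

```python
import os
import struct
import tempfile
import tensorflow as tf

# Write 5 binary records, each exactly 8 bytes (two little-endian int32s)
tmp_dir = tempfile.mkdtemp()
bin_path = os.path.join(tmp_dir, "data.bin")
with open(bin_path, "wb") as f:
    for i in range(5):
        f.write(struct.pack("<ii", i, i * i))

# FixedLengthRecordDataset yields one raw byte string per 8-byte record
record_ds = tf.data.FixedLengthRecordDataset(bin_path, record_bytes=8)

def parse_record(raw):
    # Decode the raw bytes back into two int32 values
    return tf.io.decode_raw(raw, tf.int32)

values = [v.numpy().tolist() for v in record_ds.map(parse_record)]
print(values)  # [[0, 0], [1, 1], [2, 4], [3, 9], [4, 16]]

# Cleanup
os.remove(bin_path)
os.rmdir(tmp_dir)
```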
Core Transformations
Transformations are the heart of tf.data. They are lazy — nothing executes until you iterate over the dataset. Transformations return new datasets, so you chain them fluently: dataset.map(...).filter(...).batch(...). Let's explore the essential operations.
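Laziness is easy to observe by attaching a Python side effect to a map function. This minimal sketch uses tf.py_function so the side effect stays visible through graph tracing:

```python
import tensorflow as tf

calls = []  # records every time the mapped function actually runs

def traced(x):
    calls.append(int(x))
    return x * 2

# Building the pipeline traces the function but does not execute it
ds = tf.data.Dataset.range(3).map(
    lambda x: tf.py_function(traced, [x], tf.int64))

print("After building:", calls)        # still empty: nothing has executed
result = list(ds.as_numpy_iterator())  # iterating triggers execution
print("After iterating:", calls, result)
```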
map & filter
map() applies a function to each element; filter() keeps only elements that satisfy a predicate. map (along with interleave) accepts a num_parallel_calls argument to parallelize work across CPU cores; filter does not.
import tensorflow as tf
# Create a simple dataset
dataset = tf.data.Dataset.range(20)
# map — transform each element
squared = dataset.map(lambda x: x ** 2)
print("Squared:", list(squared.as_numpy_iterator()))
# filter — keep elements matching a predicate
evens = dataset.filter(lambda x: x % 2 == 0)
print("Evens:", list(evens.as_numpy_iterator()))
# Chaining map + filter
result = (dataset
.filter(lambda x: x % 3 == 0) # keep multiples of 3
.map(lambda x: tf.cast(x, tf.float32) * 0.1) # scale
)
print("Multiples of 3, scaled:", list(result.as_numpy_iterator()))
# map with (features, labels) tuples
features = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
labels = tf.constant([0, 1, 1])
ds = tf.data.Dataset.from_tensor_slices((features, labels))
# Normalize features in map
def normalize(x, y):
    return (x - tf.reduce_mean(x)) / tf.math.reduce_std(x), y

normalized_ds = ds.map(normalize)
for x, y in normalized_ds:
    print(f"Normalized: {x.numpy()}, Label: {y.numpy()}")
flat_map & interleave
flat_map maps a function that returns a dataset, then flattens all inner datasets into one. interleave does the same but pulls elements from multiple inner datasets concurrently — crucial for parallel file reading.
import tensorflow as tf
# flat_map — each element produces a sub-dataset, then flatten
dataset = tf.data.Dataset.from_tensor_slices([3, 5, 2])
# For each number n, produce n copies of it
flat = dataset.flat_map(lambda x: tf.data.Dataset.from_tensors(x).repeat(x))
print("flat_map result:", list(flat.as_numpy_iterator()))
# [3, 3, 3, 5, 5, 5, 5, 5, 2, 2]
# interleave — pull from multiple sub-datasets concurrently
# Simulates reading from multiple files in parallel
sources = tf.data.Dataset.range(3) # 3 "files"
interleaved = sources.interleave(
lambda x: tf.data.Dataset.range(x * 10, x * 10 + 5),
cycle_length=3, # read from 3 sources at once
block_length=2, # take 2 elements from each before switching
num_parallel_calls=tf.data.AUTOTUNE
)
print("Interleaved:", list(interleaved.as_numpy_iterator()))
# Elements from source 0 (0-4), source 1 (10-14), source 2 (20-24) — interleaved
take, skip, repeat & shard
Utility transformations for subsetting and repeating datasets. take(n) grabs the first n elements, skip(n) drops them. repeat() loops the dataset, and shard partitions it for distributed training.
import tensorflow as tf
dataset = tf.data.Dataset.range(10)
# take — first n elements (great for debugging)
print("Take 3:", list(dataset.take(3).as_numpy_iterator()))
# skip — drop first n elements
print("Skip 7:", list(dataset.skip(7).as_numpy_iterator()))
# Combine for train/val split
full_ds = tf.data.Dataset.range(100)
val_size = 20
val_ds = full_ds.take(val_size)
train_ds = full_ds.skip(val_size)
print(f"Train size: {train_ds.cardinality().numpy()}, Val size: {val_ds.cardinality().numpy()}")
# repeat — loop dataset (None = infinite)
repeated = tf.data.Dataset.range(3).repeat(2)
print("Repeat(2):", list(repeated.as_numpy_iterator())) # [0,1,2,0,1,2]
# shard — distributed training (worker i of n)
# Worker 0 gets elements 0, 2, 4, ...
# Worker 1 gets elements 1, 3, 5, ...
shard_0 = dataset.shard(num_shards=2, index=0)
shard_1 = dataset.shard(num_shards=2, index=1)
print("Shard 0:", list(shard_0.as_numpy_iterator()))
print("Shard 1:", list(shard_1.as_numpy_iterator()))
# enumerate — add index to each element
for idx, val in dataset.enumerate().take(3):
    print(f"Index {idx.numpy()}: Value {val.numpy()}")
Batching & Shuffling
Batching groups individual elements into fixed-size tensors for efficient GPU computation. Shuffling randomizes the order to prevent the model from memorizing data ordering. Getting the shuffle buffer size right is critical for training quality.
batch & padded_batch
The examples below cover basic batching, drop_remainder, and padded_batch for variable-length sequences. Each snippet is self-contained and can be run independently:
import tensorflow as tf
import numpy as np
# Basic batching
dataset = tf.data.Dataset.range(10)
batched = dataset.batch(3)
for batch in batched:
    print("Batch:", batch.numpy())
# Output: [0,1,2], [3,4,5], [6,7,8], [9] — last batch is smaller
# drop_remainder — ensure uniform batch sizes (important for TPUs)
batched_drop = dataset.batch(3, drop_remainder=True)
for batch in batched_drop:
    print("Uniform batch:", batch.numpy())
# Output: [0,1,2], [3,4,5], [6,7,8] — last incomplete batch dropped
# padded_batch — for variable-length sequences
# By default, shorter sequences are zero-padded to the longest in the
# batch; passing padded_shapes pads to a fixed length instead
def gen_variable_sequences():
    for length in [2, 5, 3, 7, 1, 4]:
        yield tf.random.normal([length])

var_ds = tf.data.Dataset.from_generator(
    gen_variable_sequences,
    output_signature=tf.TensorSpec(shape=(None,), dtype=tf.float32)
)
padded = var_ds.padded_batch(3, padded_shapes=[10])  # pad every sequence to length 10
for batch in padded:
    print(f"Padded batch shape: {batch.shape}")
    print(f"  Values:\n{batch.numpy()}\n")
shuffle Buffer Size
The shuffle(buffer_size) operation maintains a buffer of buffer_size elements and randomly samples from it. A buffer equal to the dataset size gives perfect shuffling; a smaller buffer provides approximate shuffling with less memory. Setting buffer_size=1 means no shuffling at all.
A common pitfall is setting buffer_size too small. If your dataset has 10,000 elements but buffer_size=100, elements that are far apart in the original order can never appear in the same shuffle window, so the output stays strongly correlated with the original order. For small datasets, use a buffer equal to the dataset size. For large datasets, use the largest value your memory allows, typically at least 1,000–10,000.
import tensorflow as tf
# Demonstrate shuffle buffer size effects
dataset = tf.data.Dataset.range(20)
# Perfect shuffle — buffer = dataset size
perfect = dataset.shuffle(buffer_size=20, seed=42)
print("Perfect shuffle (buffer=20):", list(perfect.as_numpy_iterator()))
# Approximate shuffle — small buffer
approx = dataset.shuffle(buffer_size=3, seed=42)
print("Small buffer (buffer=3): ", list(approx.as_numpy_iterator()))
# No shuffle — buffer = 1
no_shuffle = dataset.shuffle(buffer_size=1)
print("No shuffle (buffer=1): ", list(no_shuffle.as_numpy_iterator()))
# reshuffle_each_iteration — different order each epoch (default True)
ds = tf.data.Dataset.range(5).shuffle(5, reshuffle_each_iteration=True)
print("\nEpoch 1:", list(ds.as_numpy_iterator()))
print("Epoch 2:", list(ds.as_numpy_iterator()))
# Standard pattern: shuffle BEFORE batch
pipeline = (tf.data.Dataset.range(100)
.shuffle(buffer_size=100) # shuffle first
.batch(16) # then batch
.prefetch(tf.data.AUTOTUNE) # overlap with training
)
print(f"\nPipeline element spec: {pipeline.element_spec}")
Performance Optimization
The three pillars of tf.data performance are prefetching, caching, and parallel mapping. Together, they ensure that data preparation never becomes the bottleneck — the GPU always has a batch ready when it finishes the previous one.
prefetch & cache
prefetch(buffer_size) overlaps data preprocessing with model execution. While the model trains on batch N, the input pipeline prepares batch N+1. Using tf.data.AUTOTUNE lets TensorFlow dynamically tune the buffer size at runtime.
cache() stores the dataset in memory (or on disk) after the first epoch, eliminating repeated I/O and transformation costs for subsequent epochs. Place it after expensive transformations but before random augmentation (which should vary each epoch).
import tensorflow as tf
import numpy as np
import time
# Simulate ~1 ms of expensive CPU preprocessing per sample
def _slow_identity(t):
    time.sleep(0.001)
    return t

def expensive_preprocess(x, y):
    # Route x through py_function so the simulated work actually runs
    # (an op with unused outputs may be pruned from the graph)
    x = tf.py_function(_slow_identity, [x], tf.uint8)
    x.set_shape([28, 28])
    return tf.cast(x, tf.float32) / 255.0, y
# Create dataset
X = np.random.randint(0, 256, (500, 28, 28), dtype=np.uint8)
y = np.random.randint(0, 10, 500, dtype=np.int32)
base_ds = tf.data.Dataset.from_tensor_slices((X, y))
# Without optimization — sequential
ds_naive = base_ds.map(expensive_preprocess).batch(32)
# With cache — preprocess once, reuse
ds_cached = base_ds.map(expensive_preprocess).cache().batch(32)
# With prefetch — overlap prep and training
ds_prefetch = base_ds.map(expensive_preprocess).batch(32).prefetch(tf.data.AUTOTUNE)
# With both — best of both worlds
ds_optimal = (base_ds
.map(expensive_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
.cache() # cache after expensive ops
.shuffle(500) # shuffle after cache (varies each epoch)
.batch(32)
.prefetch(tf.data.AUTOTUNE) # always last in the chain
)
# cache to disk for datasets too large for memory
# ds_disk_cache = base_ds.map(expensive_preprocess).cache('/tmp/tf_cache')
print("Naive pipeline spec:", ds_naive.element_spec)
print("Optimal pipeline spec:", ds_optimal.element_spec)
print("AUTOTUNE value:", tf.data.AUTOTUNE)
Parallel Calls & I/O
Set num_parallel_calls=tf.data.AUTOTUNE on map() and interleave() to parallelize transformation across CPU cores. For file-based pipelines, use interleave to read from multiple files simultaneously.
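As a sketch of parallel file I/O, the snippet below writes three throwaway text files to a temp directory to stand in for real data shards, then reads them concurrently:

```python
import os
import tempfile
import tensorflow as tf

# Create three small text files standing in for real data shards
tmp_dir = tempfile.mkdtemp()
paths = []
for i in range(3):
    path = os.path.join(tmp_dir, f"shard_{i}.txt")
    with open(path, "w") as f:
        f.write("\n".join(f"file{i}_line{j}" for j in range(4)) + "\n")
    paths.append(path)

# One TextLineDataset per file, read concurrently via interleave
files_ds = tf.data.Dataset.from_tensor_slices(paths)
lines_ds = files_ds.interleave(
    tf.data.TextLineDataset,
    cycle_length=3,                       # keep 3 files open at once
    num_parallel_calls=tf.data.AUTOTUNE,  # parallelize the reads
)

lines = [line.numpy().decode() for line in lines_ds]
print(f"Read {len(lines)} lines; first three: {lines[:3]}")

# Cleanup
for p in paths:
    os.remove(p)
os.rmdir(tmp_dir)
```

Because deterministic mode is on by default, the output order is still reproducible even though the reads happen in parallel.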
The Optimal Pipeline Recipe
For most training workloads, the optimal tf.data pipeline follows this pattern:
- interleave — read from multiple files in parallel
- map with num_parallel_calls=AUTOTUNE — parse/preprocess in parallel
- cache — store after deterministic transforms (skip for huge datasets)
- shuffle — randomize order (after cache so it varies each epoch)
- batch — group into batches
- map — apply random augmentation (after batch for vectorized ops)
- prefetch(AUTOTUNE) — overlap with training (always last)
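The recipe above can be sketched end to end on synthetic in-memory data. Here parse_fn and augment_fn are hypothetical stand-ins for real decoding and augmentation, and the "files" are simulated with Dataset.range:

```python
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE

def parse_fn(x):
    # Stand-in for parsing/decoding one record
    return tf.cast(x, tf.float32)

def augment_fn(batch):
    # Stand-in for random augmentation, applied per batch (vectorized)
    return batch + tf.random.uniform(tf.shape(batch), 0.0, 0.01)

files = tf.data.Dataset.range(3)  # three simulated "files" of 10 records each
pipeline = (files
    .interleave(lambda i: tf.data.Dataset.range(i * 10, i * 10 + 10),
                cycle_length=3,
                num_parallel_calls=AUTOTUNE)       # 1. parallel "file" reads
    .map(parse_fn, num_parallel_calls=AUTOTUNE)    # 2. parallel parse
    .cache()                                       # 3. cache deterministic work
    .shuffle(30)                                   # 4. shuffle (after cache)
    .batch(8)                                      # 5. batch
    .map(augment_fn, num_parallel_calls=AUTOTUNE)  # 6. augment per batch
    .prefetch(AUTOTUNE)                            # 7. prefetch (always last)
)

total = sum(batch.shape[0] for batch in pipeline)
print(f"Consumed {total} samples")  # 30
```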
Pipeline Stages Diagram
The diagram below visualizes how pipeline stages overlap with training. Without prefetching, the GPU waits for data. With prefetching and parallel map, data preparation happens concurrently with gradient computation.
flowchart LR
subgraph Extract
A[Read Files] --> B[Interleave I/O]
end
subgraph Transform
B --> C[Parse / Decode]
C --> D[Map / Preprocess]
D --> E[Cache]
E --> F[Shuffle]
F --> G[Batch]
G --> H[Augment]
end
subgraph Load
H --> I[Prefetch]
I --> J[GPU Training]
end
style A fill:#132440,stroke:#3B9797,color:#fff
style B fill:#16476A,stroke:#3B9797,color:#fff
style C fill:#16476A,stroke:#3B9797,color:#fff
style D fill:#3B9797,stroke:#132440,color:#fff
style E fill:#3B9797,stroke:#132440,color:#fff
style F fill:#3B9797,stroke:#132440,color:#fff
style G fill:#3B9797,stroke:#132440,color:#fff
style H fill:#3B9797,stroke:#132440,color:#fff
style I fill:#BF092F,stroke:#132440,color:#fff
style J fill:#132440,stroke:#BF092F,color:#fff
Data Augmentation
Data augmentation artificially expands your training set by applying random transformations (flips, rotations, crops, color jitter) to each sample. In tf.data pipelines, augmentation is applied inside map() — typically after batching for better vectorized performance.
tf.image Transformations
The snippets below demonstrate the core tf.image augmentation operations on a synthetic image batch. Each is self-contained and can be run independently:
import tensorflow as tf
import numpy as np
# Create a synthetic image batch (4 images, 64x64, RGB)
np.random.seed(42)
images = np.random.randint(0, 256, (4, 64, 64, 3), dtype=np.uint8)
images_f32 = tf.cast(images, tf.float32) / 255.0
# Individual augmentation operations
flipped = tf.image.random_flip_left_right(images_f32[0])
print("Flipped shape:", flipped.shape)
brightness = tf.image.random_brightness(images_f32[0], max_delta=0.2)
print("Brightness adjusted shape:", brightness.shape)
contrast = tf.image.random_contrast(images_f32[0], lower=0.8, upper=1.2)
print("Contrast adjusted shape:", contrast.shape)
saturation = tf.image.random_saturation(images_f32[0], lower=0.8, upper=1.2)
print("Saturation adjusted shape:", saturation.shape)
hue = tf.image.random_hue(images_f32[0], max_delta=0.1)
print("Hue adjusted shape:", hue.shape)
# Central crop and resize
cropped = tf.image.central_crop(images_f32[0], central_fraction=0.75)
resized = tf.image.resize(cropped, [64, 64])
print("Crop + Resize shape:", resized.shape)
# Random crop
random_crop = tf.image.random_crop(images_f32[0], size=[48, 48, 3])
print("Random crop shape:", random_crop.shape)
Augmentation Layers
Keras preprocessing layers provide a higher-level API for augmentation that integrates directly into the model or pipeline. They're especially convenient because they respect training=True/False — augmentation is automatically disabled during inference.
import tensorflow as tf
import numpy as np
# Build an augmentation pipeline using Keras layers
augmentation = tf.keras.Sequential([
tf.keras.layers.RandomFlip("horizontal"),
tf.keras.layers.RandomRotation(0.1), # ±10% of full rotation
tf.keras.layers.RandomZoom(0.1), # ±10% zoom
tf.keras.layers.RandomContrast(0.1), # ±10% contrast
], name='augmentation')
# Create dataset of synthetic images
np.random.seed(42)
X = np.random.rand(100, 64, 64, 3).astype(np.float32)
y = np.random.randint(0, 5, 100)
ds = tf.data.Dataset.from_tensor_slices((X, y))
# Apply augmentation inside map — only during training
def augment_fn(image, label):
    image = augmentation(image, training=True)
    return image, label
train_ds = (ds
.shuffle(100)
.batch(16)
.map(augment_fn, num_parallel_calls=tf.data.AUTOTUNE)
.prefetch(tf.data.AUTOTUNE)
)
# Alternatively, include augmentation in the model itself
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(64, 64, 3)),
augmentation, # augmentation layers
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(5, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()
TFRecord Format
TFRecord is TensorFlow's native binary storage format. It stores data as sequential byte records in Protocol Buffer format, enabling fast sequential reads, built-in compression, and schema-flexible storage. For large-scale ML pipelines, TFRecords are the standard choice over CSV, JSON, or raw image files.
Writing TFRecords
Each example in a TFRecord file is a serialized tf.train.Example proto containing a map of feature names to tf.train.Feature values. Features can be bytes (strings, images), floats, or int64s.
import tensorflow as tf
import numpy as np
import tempfile
import os
# Helper functions to create Feature objects
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _float_list_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))
# Create synthetic data
np.random.seed(42)
num_examples = 50
images = np.random.randint(0, 256, (num_examples, 32, 32, 3), dtype=np.uint8)
labels = np.random.randint(0, 10, num_examples)
scores = np.random.rand(num_examples).astype(np.float32)
# Write TFRecord file
tfrecord_path = os.path.join(tempfile.mkdtemp(), 'data.tfrecord')
with tf.io.TFRecordWriter(tfrecord_path) as writer:
    for i in range(num_examples):
        # Serialize image to raw bytes
        image_bytes = images[i].tobytes()
        # Build tf.train.Example
        example = tf.train.Example(features=tf.train.Features(feature={
            'image': _bytes_feature(image_bytes),
            'label': _int64_feature(int(labels[i])),
            'score': _float_feature(float(scores[i])),
            'height': _int64_feature(32),
            'width': _int64_feature(32),
        }))
        writer.write(example.SerializeToString())
file_size = os.path.getsize(tfrecord_path)
print(f"Wrote {num_examples} examples to {tfrecord_path}")
print(f"File size: {file_size / 1024:.1f} KB")
print(f"Avg bytes/example: {file_size / num_examples:.0f}")
Reading & Parsing
To read TFRecords back, create a TFRecordDataset and apply a map function that parses the serialized proto back into tensors. The parse function defines the expected schema using tf.io.parse_single_example.
flowchart TD
subgraph Write
A[Raw Data] --> B[Create tf.train.Feature]
B --> C[Build tf.train.Example]
C --> D[Serialize to Bytes]
D --> E[TFRecordWriter]
E --> F[.tfrecord File]
end
subgraph Read
F --> G[TFRecordDataset]
G --> H[map parse_fn]
H --> I[tf.io.parse_single_example]
I --> J[Decoded Tensors]
J --> K[Pipeline: batch, prefetch]
end
style A fill:#132440,stroke:#3B9797,color:#fff
style F fill:#BF092F,stroke:#132440,color:#fff
style J fill:#3B9797,stroke:#132440,color:#fff
style K fill:#132440,stroke:#3B9797,color:#fff
import tensorflow as tf
import numpy as np
import tempfile
import os
# Helper to write a small TFRecord file
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

np.random.seed(42)
tfrecord_path = os.path.join(tempfile.mkdtemp(), 'data.tfrecord')
with tf.io.TFRecordWriter(tfrecord_path) as writer:
    for i in range(50):
        img = np.random.randint(0, 256, (32, 32, 3), dtype=np.uint8)
        example = tf.train.Example(features=tf.train.Features(feature={
            'image': _bytes_feature(img.tobytes()),
            'label': _int64_feature(int(np.random.randint(0, 10))),  # plain int for protobuf
            'height': _int64_feature(32),
            'width': _int64_feature(32),
        }))
        writer.write(example.SerializeToString())
# Define the parse function — must match the write schema
def parse_tfrecord(serialized):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
        'height': tf.io.FixedLenFeature([], tf.int64),
        'width': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(serialized, feature_description)
    # Decode raw bytes back to tensor
    image = tf.io.decode_raw(parsed['image'], tf.uint8)
    image = tf.reshape(image, [parsed['height'], parsed['width'], 3])
    image = tf.cast(image, tf.float32) / 255.0
    return image, parsed['label']
# Build the pipeline
dataset = (tf.data.TFRecordDataset(tfrecord_path)
.map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
.shuffle(50)
.batch(8)
.prefetch(tf.data.AUTOTUNE)
)
# Verify
for images, labels in dataset.take(1):
    print(f"Batch images shape: {images.shape}")
    print(f"Batch labels: {labels.numpy()}")
    print(f"Pixel range: [{images.numpy().min():.2f}, {images.numpy().max():.2f}]")
# Cleanup
os.remove(tfrecord_path)
Image Data Pipeline
Now let's combine everything into a complete, production-quality image pipeline that reads files from disk, decodes them, resizes, augments, batches, and prefetches — the kind of pipeline you'd use for training a CNN on a real dataset.
End-to-End Pipeline
The example below assembles the complete pipeline: it lists image files on disk, parses and caches them, then shuffles, augments, batches, and prefetches:
import tensorflow as tf
import numpy as np
import tempfile
import os
# Create a temporary directory with synthetic "image" files
tmp_dir = tempfile.mkdtemp()
for split in ['train', 'val']:
    for cls in ['cat', 'dog']:
        cls_dir = os.path.join(tmp_dir, split, cls)
        os.makedirs(cls_dir, exist_ok=True)
        for i in range(20):
            # Create small random PNG images
            img = tf.random.uniform([64, 64, 3], 0, 255, dtype=tf.float32)
            img_bytes = tf.io.encode_png(tf.cast(img, tf.uint8))
            tf.io.write_file(os.path.join(cls_dir, f'{i:03d}.png'), img_bytes)
print(f"Created dataset at: {tmp_dir}")
# Configuration
IMG_SIZE = 48
BATCH_SIZE = 8
AUTOTUNE = tf.data.AUTOTUNE
class_names = ['cat', 'dog']
# Step 1: List all image files with labels
def get_file_paths_and_labels(data_dir):
    file_paths = []
    labels = []
    for idx, cls_name in enumerate(class_names):
        cls_dir = os.path.join(data_dir, cls_name)
        for fname in os.listdir(cls_dir):
            file_paths.append(os.path.join(cls_dir, fname))
            labels.append(idx)
    return file_paths, labels
train_paths, train_labels = get_file_paths_and_labels(os.path.join(tmp_dir, 'train'))
# Step 2: Parse function — read, decode, resize
def parse_image(file_path, label):
    raw = tf.io.read_file(file_path)
    image = tf.io.decode_png(raw, channels=3)
    image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Step 3: Augmentation function
def augment(image, label):
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    image = tf.clip_by_value(image, 0.0, 1.0)
    return image, label
# Step 4: Build the complete pipeline
# (cache before shuffle, so the shuffled order varies each epoch)
train_ds = (tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
            .map(parse_image, num_parallel_calls=AUTOTUNE)
            .cache()
            .shuffle(len(train_paths))
            .map(augment, num_parallel_calls=AUTOTUNE)
            .batch(BATCH_SIZE)
            .prefetch(AUTOTUNE)
)
# Verify the pipeline
for images, labels in train_ds.take(1):
    print(f"Batch shape: {images.shape}")
    print(f"Labels: {labels.numpy()}")
    print(f"Pixel range: [{images.numpy().min():.3f}, {images.numpy().max():.3f}]")
# Cleanup
import shutil
shutil.rmtree(tmp_dir)
Text Data Pipeline
Text pipelines follow the same ETL pattern but swap image decoding for tokenization. The TextVectorization layer handles vocabulary building, tokenization, and padding — all within a tf.data-compatible workflow.
TextVectorization Layer
The example below builds a vocabulary with TextVectorization and encodes a sample sentence:
import tensorflow as tf
import numpy as np
# Sample text data
texts = [
"TensorFlow makes deep learning easy",
"Data pipelines are essential for training",
"GPUs accelerate neural network training",
"Keras provides high-level model building APIs",
"Batch processing improves throughput significantly",
"Prefetching overlaps data loading with computation",
"Augmentation prevents overfitting on small datasets",
"TFRecords enable efficient binary data storage",
]
labels = [1, 1, 1, 1, 0, 0, 0, 0] # positive/negative sentiment
# Create TextVectorization layer
text_vectorizer = tf.keras.layers.TextVectorization(
max_tokens=200, # vocabulary size limit
output_mode='int', # integer token IDs
output_sequence_length=15 # pad/truncate to fixed length
)
# Adapt on training data to build vocabulary
text_vectorizer.adapt(texts)
# Inspect the vocabulary
vocab = text_vectorizer.get_vocabulary()
print(f"Vocabulary size: {len(vocab)}")
print(f"First 20 tokens: {vocab[:20]}")
# Vectorize a sample
sample = tf.constant(["TensorFlow data pipelines are fast"])
vectorized = text_vectorizer(sample)
print(f"\nInput: 'TensorFlow data pipelines are fast'")
print(f"Encoded: {vectorized.numpy()[0]}")
# Decode back (manually)
decoded = [vocab[i] for i in vectorized.numpy()[0] if i > 0]
print(f"Decoded: {' '.join(decoded)}")
Complete Text Classification Pipeline
The example below wires TextVectorization into a tf.data pipeline and trains a small sentiment classifier end to end:
import tensorflow as tf
import numpy as np
# Generate synthetic text classification data
train_texts = [
"great product highly recommend", "terrible quality waste of money",
"absolutely love this item", "broken on arrival very disappointed",
"excellent service fast delivery", "worst purchase ever made",
"fantastic value for the price", "does not work as described",
"best thing I ever bought", "complete garbage do not buy",
"amazing quality exceeded expectations", "poor craftsmanship fell apart",
"wonderful experience will buy again", "horrible customer service",
"perfect gift for anyone", "total ripoff avoid at all costs",
] * 10 # repeat for more data
train_labels = ([1, 0] * 8) * 10 # 1=positive, 0=negative
# Step 1: TextVectorization layer
vectorizer = tf.keras.layers.TextVectorization(
max_tokens=500,
output_mode='int',
output_sequence_length=20
)
vectorizer.adapt(train_texts)
# Step 2: Build tf.data pipeline
dataset = tf.data.Dataset.from_tensor_slices((train_texts, train_labels))
def vectorize_text(text, label):
    text = tf.expand_dims(text, -1)  # vectorizer expects a batch dim
    return vectorizer(text)[0], label  # remove batch dim
train_ds = (dataset
.shuffle(len(train_texts))
.map(vectorize_text, num_parallel_calls=tf.data.AUTOTUNE)
.batch(16)
.prefetch(tf.data.AUTOTUNE)
)
# Step 3: Build and train model
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(500, 32),  # input_length is unnecessary and removed in Keras 3
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(train_ds, epochs=5, verbose=1)
print(f"\nFinal accuracy: {history.history['accuracy'][-1]:.4f}")
print(f"Vocabulary sample: {vectorizer.get_vocabulary()[:15]}")
Benchmarking & Profiling
To verify that your pipeline is not the bottleneck, measure its throughput independently from model training. Compare naive vs optimized pipelines to quantify the impact of prefetching, caching, and parallel mapping.
Measuring Throughput
Pipeline throughput is $T = \frac{\text{batch\_size}}{\text{step\_time}}$ samples per second. A well-optimized pipeline should produce data faster than the model can consume it, meaning the GPU utilization stays near 100%.
import tensorflow as tf
import numpy as np
import time
def benchmark(dataset, num_epochs=2, description=""):
    """Measure dataset iteration throughput."""
    start = time.perf_counter()
    total_batches = 0
    for epoch in range(num_epochs):
        for batch in dataset:
            total_batches += 1
    elapsed = time.perf_counter() - start
    throughput = total_batches / elapsed
    print(f"{description:30s} | {elapsed:.3f}s | {throughput:.1f} batches/s | {total_batches} batches")
    return elapsed
# Create a dataset with a simulated expensive operation
X = np.random.randn(2000, 128).astype(np.float32)
y = np.random.randint(0, 10, 2000)
base_ds = tf.data.Dataset.from_tensor_slices((X, y))
def simulate_work(x, y):
    # Simulate preprocessing work
    return tf.nn.l2_normalize(x, axis=-1), y
# Pipeline variants
naive = base_ds.map(simulate_work).batch(64)
parallel = base_ds.map(simulate_work, num_parallel_calls=tf.data.AUTOTUNE).batch(64)
cached = base_ds.map(simulate_work).cache().batch(64)
optimized = (base_ds
.map(simulate_work, num_parallel_calls=tf.data.AUTOTUNE)
.cache()
.shuffle(2000)
.batch(64)
.prefetch(tf.data.AUTOTUNE)
)
# Benchmark each variant
print(f"{'Pipeline':30s} | {'Time':>7s} | {'Speed':>14s} | {'Batches':>7s}")
print("-" * 75)
benchmark(naive, description="Naive (sequential)")
benchmark(parallel, description="Parallel map")
benchmark(cached, description="Cached")
benchmark(optimized, description="Fully optimized")
TensorBoard tf.data Profiling
For production-grade profiling, use TensorBoard's built-in tf.data analyzer. Add a tf.profiler trace during training and open the tf.data Bottleneck Analysis page in TensorBoard. It shows time spent in each pipeline stage and highlights which operation is the bottleneck. Enable tracing with:
import tensorflow as tf
# Collect a trace for everything that runs between start() and stop()
tf.profiler.experimental.start('logdir')
# ... run training steps ...
tf.profiler.experimental.stop()
# Or let the TensorBoard callback profile batches 5 through 7:
# tf.keras.callbacks.TensorBoard(log_dir='logs', profile_batch='5,7')
Summary & Next Steps
In this article, we've built a complete understanding of TensorFlow's tf.data API — from creating datasets to optimizing them for maximum GPU utilization. Here are the key takeaways:
- Always use tf.data instead of feeding raw NumPy arrays for anything beyond toy datasets
- Pipeline order matters: parse → cache → shuffle → batch → augment → prefetch
- AUTOTUNE everything: num_parallel_calls on map/interleave, buffer_size on prefetch
- Cache wisely: after deterministic transforms, before random augmentation
- Shuffle buffer size should be as large as memory allows (ideally = dataset size)
- TFRecords are the gold standard for large-scale sequential-read workloads
- Benchmark independently: measure pipeline throughput separate from model training
Next in the Series
In Part 5: Training Workflows & Callbacks, we'll master TensorFlow training workflows — callbacks (EarlyStopping, ModelCheckpoint, TensorBoard), custom training loops with GradientTape, and distributed training strategies.