Back to TensorFlow Mastery Series

Part 4: Data Pipelines with tf.data

May 3, 2026 Wasil Zafar 25 min read

Build high-performance input pipelines with tf.data — from raw files to batched, shuffled, prefetched datasets that keep the GPU fully utilized and eliminate data loading bottlenecks.

Table of Contents

  1. Why tf.data?
  2. Creating Datasets
  3. Core Transformations
  4. Batching & Shuffling
  5. Performance Optimization
  6. Data Augmentation
  7. TFRecord Format
  8. Image Data Pipeline
  9. Text Data Pipeline
  10. Benchmarking & Profiling

Why tf.data?

The simplest way to train a Keras model is to pass NumPy arrays directly to model.fit(). That works for toy datasets, but it falls apart at scale. When your dataset is too large for memory, when loading from disk is slower than the GPU can compute, or when you need real-time augmentation, you need a dedicated input pipeline. That's exactly what tf.data provides.

The Core Problem: GPUs can process a batch in milliseconds, but loading that batch from disk, parsing it, and transforming it can take orders of magnitude longer. Without pipelining, the GPU sits idle while the CPU prepares the next batch. tf.data solves this by overlapping data preparation with model execution through prefetching, parallel mapping, and caching.

The ETL Pipeline Concept

Every tf.data pipeline follows the Extract → Transform → Load pattern. You extract raw data from files, tensors, or generators; transform it with map, filter, batch, and shuffle operations; and load it into the model for training. The key insight is that these stages can run concurrently — while the GPU trains on batch N, the CPU can already be preparing batch N+1.

Throughput of a data pipeline is measured as:

$$T = \frac{\text{batch\_size}}{\text{step\_time}}$$

where step_time includes both data loading and model computation. The goal of tf.data is to make data loading invisible — fully hidden behind computation time.
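As a quick worked example with illustrative (not measured) numbers, compare a sequential pipeline, where loading and compute add up, with a perfectly overlapped one, where loading hides behind compute:

```python
# Hypothetical numbers for illustration: a batch of 64 samples,
# 5 ms of data loading and 15 ms of model computation per step.
batch_size = 64
load_time = 0.005     # seconds spent preparing the batch
compute_time = 0.015  # seconds of model computation

# Sequential pipeline: step_time is the sum of both stages
sequential_throughput = batch_size / (load_time + compute_time)

# Perfectly overlapped pipeline: loading is hidden behind computation
overlapped_throughput = batch_size / max(load_time, compute_time)

print(f"Sequential: {sequential_throughput:.0f} samples/s")   # 3200
print(f"Overlapped: {overlapped_throughput:.0f} samples/s")   # ~4267
```

Overlap raises throughput by a third here without touching the model at all, which is exactly the gain prefetching targets.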

import tensorflow as tf
import numpy as np
import time

# Problem: Naive NumPy feeding
# The entire dataset must fit in memory, and there's no pipelining
np.random.seed(42)
# X = np.random.randn(10000, 224, 224, 3).astype(np.float32)  # ~5.6 GB!
# Materializing this array would exhaust memory on most machines, so it is
# left commented out — large image datasets simply don't fit as one array

# Solution: tf.data pipeline — lazy loading, never holds full dataset
# Create a small demo to show the concept
X_small = np.random.randn(1000, 32).astype(np.float32)
y_small = np.random.randint(0, 2, 1000).astype(np.float32)

# NumPy approach — entire dataset in memory
start = time.time()
model = tf.keras.Sequential([tf.keras.layers.Dense(16, activation='relu'),
                              tf.keras.layers.Dense(1, activation='sigmoid')])
model.compile(optimizer='adam', loss='binary_crossentropy')
model.fit(X_small, y_small, batch_size=64, epochs=5, verbose=0)
numpy_time = time.time() - start

# tf.data approach — with prefetching
start = time.time()
dataset = tf.data.Dataset.from_tensor_slices((X_small, y_small))
dataset = dataset.shuffle(1000).batch(64).prefetch(tf.data.AUTOTUNE)

model2 = tf.keras.Sequential([tf.keras.layers.Dense(16, activation='relu'),
                               tf.keras.layers.Dense(1, activation='sigmoid')])
model2.compile(optimizer='adam', loss='binary_crossentropy')
model2.fit(dataset, epochs=5, verbose=0)
tfdata_time = time.time() - start

print(f"NumPy feeding:  {numpy_time:.3f}s")
print(f"tf.data pipeline: {tfdata_time:.3f}s")
print(f"Dataset element spec: {dataset.element_spec}")

For small in-memory data, both approaches perform similarly. The real gains emerge with large datasets loaded from disk, where tf.data's lazy evaluation and prefetching keep the accelerator fully utilized.

Creating Datasets

TensorFlow offers multiple factory methods to create tf.data.Dataset objects, each suited to a different data source. The most common are from_tensor_slices (in-memory data), from_generator (Python generators), and file-based constructors like TFRecordDataset and TextLineDataset.

from_tensor_slices

The workhorse for in-memory data. It slices a tensor (or tuple/dict of tensors) along the first dimension, producing one element per row. Unlike from_tensors, which wraps the entire input as a single element, from_tensor_slices iterates over individual samples.

import tensorflow as tf
import numpy as np

# from_tensor_slices — one element per row
features = np.array([[1, 2], [3, 4], [5, 6], [7, 8]], dtype=np.float32)
labels = np.array([0, 1, 1, 0], dtype=np.int32)

# Tuple of (features, labels) — most common for supervised learning
dataset = tf.data.Dataset.from_tensor_slices((features, labels))
for x, y in dataset:
    print(f"Features: {x.numpy()}, Label: {y.numpy()}")

# Dictionary input — useful for feature columns
dict_dataset = tf.data.Dataset.from_tensor_slices({
    'age': [25, 30, 35, 40],
    'income': [50000, 60000, 70000, 80000],
    'label': [0, 1, 1, 0]
})
for element in dict_dataset.take(2):
    print(element)

# from_tensors — wraps entire input as ONE element (contrast)
single_element = tf.data.Dataset.from_tensors((features, labels))
print(f"\nfrom_tensor_slices: {tf.data.Dataset.from_tensor_slices(features).cardinality().numpy()} elements")
print(f"from_tensors: {single_element.cardinality().numpy()} element")

from_generator & range

from_generator wraps any Python generator into a dataset. This is ideal for data that doesn't fit in memory or comes from custom sources (databases, APIs, complex preprocessing). Dataset.range produces integer sequences, handy for indexing or benchmarking.

import tensorflow as tf
import numpy as np

# from_generator — lazy evaluation, generates on demand
def data_generator():
    """Simulate reading from a database or file one record at a time."""
    for i in range(100):
        # Simulate variable-length sequences
        seq_len = np.random.randint(5, 20)
        features = np.random.randn(seq_len).astype(np.float32)
        label = np.random.randint(0, 3)
        yield features, label

gen_dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(None,), dtype=tf.float32),   # variable length
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)

for x, y in gen_dataset.take(3):
    print(f"Sequence length: {x.shape[0]}, Label: {y.numpy()}")

# Dataset.range — integer sequences
range_ds = tf.data.Dataset.range(0, 10, 2)  # 0, 2, 4, 6, 8
print("\nRange dataset:", list(range_ds.as_numpy_iterator()))

# list_files — glob pattern for file paths
# file_ds = tf.data.Dataset.list_files('data/images/*.jpg', shuffle=True)
# Simulated example:
print("\nDataset cardinality:", gen_dataset.cardinality().numpy())
print("(tf.data.UNKNOWN_CARDINALITY = -2 — a generator's length is unknown)")

File-Based Datasets

For large-scale workloads, TensorFlow provides specialized dataset classes that read directly from files without loading everything into memory: TextLineDataset for CSV/text, TFRecordDataset for binary TFRecord files, and FixedLengthRecordDataset for fixed-width binary data.

import tensorflow as tf
import tempfile
import os

# Create a temporary CSV file for demonstration
csv_content = "name,age,score\nAlice,25,88.5\nBob,30,92.0\nCharlie,35,76.3\nDiana,28,95.1\n"
tmp_dir = tempfile.mkdtemp()
csv_path = os.path.join(tmp_dir, "data.csv")
with open(csv_path, 'w') as f:
    f.write(csv_content)

# TextLineDataset — reads line by line
text_ds = tf.data.TextLineDataset(csv_path)
for line in text_ds:
    print(line.numpy().decode('utf-8'))

# Skip header and parse CSV
def parse_csv(line):
    defaults = [tf.constant(''), tf.constant(0), tf.constant(0.0)]
    fields = tf.io.decode_csv(line, record_defaults=defaults)
    return {'name': fields[0], 'age': fields[1]}, fields[2]

parsed_ds = text_ds.skip(1).map(parse_csv)  # skip header row
print("\nParsed records:")
for features, label in parsed_ds:
    print(f"  Name: {features['name'].numpy().decode()}, "
          f"Age: {features['age'].numpy()}, "
          f"Score: {label.numpy()}")

# Cleanup
os.remove(csv_path)
os.rmdir(tmp_dir)

Core Transformations

Transformations are the heart of tf.data. They are lazy — nothing executes until you iterate over the dataset. Transformations return new datasets, so you chain them fluently: dataset.map(...).filter(...).batch(...). Let's explore the essential operations.
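To make the laziness visible, here is a minimal sketch: a Python side effect inside the mapped function fires only once the dataset is actually iterated (the side effect is routed through tf.py_function so it runs eagerly per element rather than once at tracing time):

```python
import tensorflow as tf

calls = []  # records when the mapped function actually executes

def traced_square(x):
    def record(v):
        calls.append(int(v))  # Python side effect — observable execution
        return v * v
    return tf.py_function(record, [x], Tout=tf.int64)

ds = tf.data.Dataset.range(3).map(traced_square)
print("After building the pipeline, calls =", calls)  # [] — nothing ran yet

results = list(ds.as_numpy_iterator())  # iteration triggers execution
print("After iterating, calls =", calls)  # [0, 1, 2]
print("Results:", results)                # [0, 1, 4]
```

Building the pipeline only constructs a graph of transformations; work happens element by element as the consumer pulls data.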

map & filter

map() applies a function to each element. filter() keeps only elements that satisfy a predicate. Both accept a num_parallel_calls argument to parallelize work across CPU cores.

import tensorflow as tf

# Create a simple dataset
dataset = tf.data.Dataset.range(20)

# map — transform each element
squared = dataset.map(lambda x: x ** 2)
print("Squared:", list(squared.as_numpy_iterator()))

# filter — keep elements matching a predicate
evens = dataset.filter(lambda x: x % 2 == 0)
print("Evens:", list(evens.as_numpy_iterator()))

# Chaining map + filter
result = (dataset
    .filter(lambda x: x % 3 == 0)      # keep multiples of 3
    .map(lambda x: tf.cast(x, tf.float32) * 0.1)  # scale
)
print("Multiples of 3, scaled:", list(result.as_numpy_iterator()))

# map with (features, labels) tuples
features = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
labels = tf.constant([0, 1, 1])
ds = tf.data.Dataset.from_tensor_slices((features, labels))

# Normalize features in map
def normalize(x, y):
    return (x - tf.reduce_mean(x)) / tf.math.reduce_std(x), y

normalized_ds = ds.map(normalize)
for x, y in normalized_ds:
    print(f"Normalized: {x.numpy()}, Label: {y.numpy()}")

flat_map & interleave

flat_map maps a function that returns a dataset, then flattens all inner datasets into one. interleave does the same but pulls elements from multiple inner datasets concurrently — crucial for parallel file reading.

import tensorflow as tf

# flat_map — each element produces a sub-dataset, then flatten
dataset = tf.data.Dataset.from_tensor_slices([3, 5, 2])

# For each number n, produce n copies of it
flat = dataset.flat_map(lambda x: tf.data.Dataset.from_tensors(x).repeat(x))
print("flat_map result:", list(flat.as_numpy_iterator()))
# [3, 3, 3, 5, 5, 5, 5, 5, 2, 2]

# interleave — pull from multiple sub-datasets concurrently
# Simulates reading from multiple files in parallel
sources = tf.data.Dataset.range(3)  # 3 "files"

interleaved = sources.interleave(
    lambda x: tf.data.Dataset.range(x * 10, x * 10 + 5),
    cycle_length=3,        # read from 3 sources at once
    block_length=2,        # take 2 elements from each before switching
    num_parallel_calls=tf.data.AUTOTUNE
)
print("Interleaved:", list(interleaved.as_numpy_iterator()))
# Elements from source 0 (0-4), source 1 (10-14), source 2 (20-24) — interleaved

take, skip, repeat & shard

Utility transformations for subsetting and repeating datasets. take(n) grabs the first n elements, skip(n) drops them. repeat() loops the dataset, and shard partitions it for distributed training.

import tensorflow as tf

dataset = tf.data.Dataset.range(10)

# take — first n elements (great for debugging)
print("Take 3:", list(dataset.take(3).as_numpy_iterator()))

# skip — drop first n elements
print("Skip 7:", list(dataset.skip(7).as_numpy_iterator()))

# Combine for train/val split
full_ds = tf.data.Dataset.range(100)
val_size = 20
val_ds = full_ds.take(val_size)
train_ds = full_ds.skip(val_size)
print(f"Train size: {train_ds.cardinality().numpy()}, Val size: {val_ds.cardinality().numpy()}")

# repeat — loop dataset (None = infinite)
repeated = tf.data.Dataset.range(3).repeat(2)
print("Repeat(2):", list(repeated.as_numpy_iterator()))  # [0,1,2,0,1,2]

# shard — distributed training (worker i of n)
# Worker 0 gets elements 0, 2, 4, ...
# Worker 1 gets elements 1, 3, 5, ...
shard_0 = dataset.shard(num_shards=2, index=0)
shard_1 = dataset.shard(num_shards=2, index=1)
print("Shard 0:", list(shard_0.as_numpy_iterator()))
print("Shard 1:", list(shard_1.as_numpy_iterator()))

# enumerate — add index to each element
for idx, val in dataset.enumerate().take(3):
    print(f"Index {idx.numpy()}: Value {val.numpy()}")

Batching & Shuffling

Batching groups individual elements into fixed-size tensors for efficient GPU computation. Shuffling randomizes the order to prevent the model from memorizing data ordering. Getting the shuffle buffer size right is critical for training quality.

batch & padded_batch

batch groups consecutive elements into a single tensor; padded_batch handles variable-length elements by padding them to a common shape. Each example below is self-contained:

import tensorflow as tf
import numpy as np

# Basic batching
dataset = tf.data.Dataset.range(10)
batched = dataset.batch(3)
for batch in batched:
    print("Batch:", batch.numpy())
# Output: [0,1,2], [3,4,5], [6,7,8], [9]  — last batch is smaller

# drop_remainder — ensure uniform batch sizes (important for TPUs)
batched_drop = dataset.batch(3, drop_remainder=True)
for batch in batched_drop:
    print("Uniform batch:", batch.numpy())
# Output: [0,1,2], [3,4,5], [6,7,8]  — last incomplete batch dropped

# padded_batch — for variable-length sequences
# Pad shorter sequences with zeros to match the longest in the batch
def gen_variable_sequences():
    for length in [2, 5, 3, 7, 1, 4]:
        yield tf.random.normal([length])

var_ds = tf.data.Dataset.from_generator(
    gen_variable_sequences,
    output_signature=tf.TensorSpec(shape=(None,), dtype=tf.float32)
)

padded = var_ds.padded_batch(3, padded_shapes=[10])  # pad to length 10
for batch in padded:
    print(f"Padded batch shape: {batch.shape}")
    print(f"  Values:\n{batch.numpy()}\n")

shuffle Buffer Size

The shuffle(buffer_size) operation maintains a buffer of buffer_size elements and randomly samples from it. A buffer equal to the dataset size gives perfect shuffling; a smaller buffer provides approximate shuffling with less memory. Setting buffer_size=1 means no shuffling at all.

Buffer Size Matters: A common mistake is setting buffer_size too small. If your dataset has 10,000 elements but buffer_size=100, each output element is drawn from a sliding window of only 100 candidates, so the stream stays roughly in its original order. For small datasets, use a buffer equal to the dataset size. For large datasets, use the largest value your memory allows — at least 1,000–10,000.
import tensorflow as tf

# Demonstrate shuffle buffer size effects
dataset = tf.data.Dataset.range(20)

# Perfect shuffle — buffer = dataset size
perfect = dataset.shuffle(buffer_size=20, seed=42)
print("Perfect shuffle (buffer=20):", list(perfect.as_numpy_iterator()))

# Approximate shuffle — small buffer
approx = dataset.shuffle(buffer_size=3, seed=42)
print("Small buffer   (buffer=3): ", list(approx.as_numpy_iterator()))

# No shuffle — buffer = 1
no_shuffle = dataset.shuffle(buffer_size=1)
print("No shuffle     (buffer=1): ", list(no_shuffle.as_numpy_iterator()))

# reshuffle_each_iteration — different order each epoch (default True)
ds = tf.data.Dataset.range(5).shuffle(5, reshuffle_each_iteration=True)
print("\nEpoch 1:", list(ds.as_numpy_iterator()))
print("Epoch 2:", list(ds.as_numpy_iterator()))

# Standard pattern: shuffle BEFORE batch
pipeline = (tf.data.Dataset.range(100)
    .shuffle(buffer_size=100)   # shuffle first
    .batch(16)                   # then batch
    .prefetch(tf.data.AUTOTUNE)  # overlap with training
)
print(f"\nPipeline element spec: {pipeline.element_spec}")

Performance Optimization

The three pillars of tf.data performance are prefetching, caching, and parallel mapping. Together, they ensure that data preparation never becomes the bottleneck — the GPU always has a batch ready when it finishes the previous one.

prefetch & cache

prefetch(buffer_size) overlaps data preprocessing with model execution. While the model trains on batch N, the input pipeline prepares batch N+1. Using tf.data.AUTOTUNE lets TensorFlow dynamically tune the buffer size at runtime.

cache() stores the dataset in memory (or on disk) after the first epoch, eliminating repeated I/O and transformation costs for subsequent epochs. Place it after expensive transformations but before random augmentation (which should vary each epoch).

import tensorflow as tf
import numpy as np
import time

# Simulate an expensive preprocessing step
def expensive_preprocess(x, y):
    # Simulate ~1 ms of CPU work per sample
    tf.py_function(lambda: time.sleep(0.001), [], [])
    return tf.cast(x, tf.float32) / 255.0, y

# Create dataset
X = np.random.randint(0, 256, (500, 28, 28), dtype=np.uint8)
y = np.random.randint(0, 10, 500, dtype=np.int32)
base_ds = tf.data.Dataset.from_tensor_slices((X, y))

# Without optimization — sequential
ds_naive = base_ds.map(expensive_preprocess).batch(32)

# With cache — preprocess once, reuse
ds_cached = base_ds.map(expensive_preprocess).cache().batch(32)

# With prefetch — overlap prep and training
ds_prefetch = base_ds.map(expensive_preprocess).batch(32).prefetch(tf.data.AUTOTUNE)

# With both — best of both worlds
ds_optimal = (base_ds
    .map(expensive_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()                        # cache after expensive ops
    .shuffle(500)                   # shuffle after cache (varies each epoch)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)     # always last in the chain
)

# cache to disk for datasets too large for memory
# ds_disk_cache = base_ds.map(expensive_preprocess).cache('/tmp/tf_cache')

print("Naive pipeline spec:", ds_naive.element_spec)
print("Optimal pipeline spec:", ds_optimal.element_spec)
print("AUTOTUNE value:", tf.data.AUTOTUNE)

Parallel Calls & I/O

Set num_parallel_calls=tf.data.AUTOTUNE on map() and interleave() to parallelize transformation across CPU cores. For file-based pipelines, use interleave to read from multiple files simultaneously.

Performance Pattern
The Optimal Pipeline Recipe

For most training workloads, the optimal tf.data pipeline follows this pattern:

  1. interleave — read from multiple files in parallel
  2. map with num_parallel_calls=AUTOTUNE — parse/preprocess in parallel
  3. cache — store after deterministic transforms (skip for huge datasets)
  4. shuffle — randomize order (after cache so it varies each epoch)
  5. batch — group into batches
  6. map — apply random augmentation (after batch for vectorized ops)
  7. prefetch(AUTOTUNE) — overlap with training (always last)
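The recipe above can be sketched end-to-end. The parse and augment functions here are illustrative placeholders, and small integer "shards" stand in for real files:

```python
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE

# Placeholder transforms — stand-ins for real decode/augment logic
def parse_fn(x):
    return tf.cast(x, tf.float32) / 255.0

def augment_fn(batch):
    # Small random jitter applied per batch (vectorized)
    return batch + tf.random.uniform(tf.shape(batch), -0.01, 0.01)

shards = tf.data.Dataset.range(3)  # three "files" of 100 records each

pipeline = (shards
    .interleave(                                   # 1. read files in parallel
        lambda i: tf.data.Dataset.range(i * 100, i * 100 + 100),
        cycle_length=3,
        num_parallel_calls=AUTOTUNE)
    .map(parse_fn, num_parallel_calls=AUTOTUNE)    # 2. parse in parallel
    .cache()                                       # 3. cache deterministic work
    .shuffle(300)                                  # 4. shuffle after cache
    .batch(32)                                     # 5. batch
    .map(augment_fn, num_parallel_calls=AUTOTUNE)  # 6. augment batches
    .prefetch(AUTOTUNE)                            # 7. prefetch last
)

n_batches = sum(1 for _ in pipeline)
print("Batches per epoch:", n_batches)  # 300 elements / 32 → 10 batches
```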

Pipeline Stages Diagram

The diagram below visualizes how pipeline stages overlap with training. Without prefetching, the GPU waits for data. With prefetching and parallel map, data preparation happens concurrently with gradient computation.

tf.data Pipeline Stages (Parallel Execution)
flowchart LR
    subgraph Extract
        A[Read Files] --> B[Interleave I/O]
    end

    subgraph Transform
        B --> C[Parse / Decode]
        C --> D[Map / Preprocess]
        D --> E[Cache]
        E --> F[Shuffle]
        F --> G[Batch]
        G --> H[Augment]
    end

    subgraph Load
        H --> I[Prefetch]
        I --> J[GPU Training]
    end

    style A fill:#132440,stroke:#3B9797,color:#fff
    style B fill:#16476A,stroke:#3B9797,color:#fff
    style C fill:#16476A,stroke:#3B9797,color:#fff
    style D fill:#3B9797,stroke:#132440,color:#fff
    style E fill:#3B9797,stroke:#132440,color:#fff
    style F fill:#3B9797,stroke:#132440,color:#fff
    style G fill:#3B9797,stroke:#132440,color:#fff
    style H fill:#3B9797,stroke:#132440,color:#fff
    style I fill:#BF092F,stroke:#132440,color:#fff
    style J fill:#132440,stroke:#BF092F,color:#fff
                            

Data Augmentation

Data augmentation artificially expands your training set by applying random transformations (flips, rotations, crops, color jitter) to each sample. In tf.data pipelines, augmentation is applied inside map() — typically after batching for better vectorized performance.

tf.image Transformations

tf.image provides stateless, composable ops for the most common augmentations. Each example below is self-contained:

import tensorflow as tf
import numpy as np

# Create a synthetic image batch (4 images, 64x64, RGB)
np.random.seed(42)
images = np.random.randint(0, 256, (4, 64, 64, 3), dtype=np.uint8)
images_f32 = tf.cast(images, tf.float32) / 255.0

# Individual augmentation operations
flipped = tf.image.random_flip_left_right(images_f32[0])
print("Flipped shape:", flipped.shape)

brightness = tf.image.random_brightness(images_f32[0], max_delta=0.2)
print("Brightness adjusted shape:", brightness.shape)

contrast = tf.image.random_contrast(images_f32[0], lower=0.8, upper=1.2)
print("Contrast adjusted shape:", contrast.shape)

saturation = tf.image.random_saturation(images_f32[0], lower=0.8, upper=1.2)
print("Saturation adjusted shape:", saturation.shape)

hue = tf.image.random_hue(images_f32[0], max_delta=0.1)
print("Hue adjusted shape:", hue.shape)

# Central crop and resize
cropped = tf.image.central_crop(images_f32[0], central_fraction=0.75)
resized = tf.image.resize(cropped, [64, 64])
print("Crop + Resize shape:", resized.shape)

# Random crop
random_crop = tf.image.random_crop(images_f32[0], size=[48, 48, 3])
print("Random crop shape:", random_crop.shape)

Augmentation Layers

Keras preprocessing layers provide a higher-level API for augmentation that integrates directly into the model or pipeline. They're especially convenient because they respect training=True/False — augmentation is automatically disabled during inference.

import tensorflow as tf
import numpy as np

# Build an augmentation pipeline using Keras layers
augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),        # ±10% of full rotation
    tf.keras.layers.RandomZoom(0.1),             # ±10% zoom
    tf.keras.layers.RandomContrast(0.1),         # ±10% contrast
], name='augmentation')

# Create dataset of synthetic images
np.random.seed(42)
X = np.random.rand(100, 64, 64, 3).astype(np.float32)
y = np.random.randint(0, 5, 100)
ds = tf.data.Dataset.from_tensor_slices((X, y))

# Apply augmentation inside map — only during training
def augment_fn(image, label):
    image = augmentation(image, training=True)
    return image, label

train_ds = (ds
    .shuffle(100)
    .batch(16)
    .map(augment_fn, num_parallel_calls=tf.data.AUTOTUNE)
    .prefetch(tf.data.AUTOTUNE)
)

# Alternatively, include augmentation in the model itself
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(64, 64, 3)),
    augmentation,                                  # augmentation layers
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(5, activation='softmax')
])

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

TFRecord Format

TFRecord is TensorFlow's native binary storage format. It stores data as sequential byte records in Protocol Buffer format, enabling fast sequential reads, built-in compression, and schema-flexible storage. For large-scale ML pipelines, TFRecords are the standard choice over CSV, JSON, or raw image files.
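The built-in compression mentioned above is enabled through tf.io.TFRecordOptions; a minimal sketch writing and reading back a GZIP-compressed file:

```python
import os
import tempfile
import tensorflow as tf

path = os.path.join(tempfile.mkdtemp(), 'data.tfrecord.gz')

# Write with GZIP compression
options = tf.io.TFRecordOptions(compression_type='GZIP')
with tf.io.TFRecordWriter(path, options) as writer:
    for i in range(100):
        # Repetitive payload — compresses well (any bytes are valid records)
        writer.write(b'record-' + str(i % 3).encode() * 50)

# Read back — compression_type must match what was written
ds = tf.data.TFRecordDataset(path, compression_type='GZIP')
n = sum(1 for _ in ds)
print(f"Read {n} records, compressed file size: {os.path.getsize(path)} bytes")

os.remove(path)
```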

Writing TFRecords

Each example in a TFRecord file is a serialized tf.train.Example proto containing a map of feature names to tf.train.Feature values. Features can be bytes (strings, images), floats, or int64s.

import tensorflow as tf
import numpy as np
import tempfile
import os

# Helper functions to create Feature objects
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _float_list_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))

# Create synthetic data
np.random.seed(42)
num_examples = 50
images = np.random.randint(0, 256, (num_examples, 32, 32, 3), dtype=np.uint8)
labels = np.random.randint(0, 10, num_examples)
scores = np.random.rand(num_examples).astype(np.float32)

# Write TFRecord file
tfrecord_path = os.path.join(tempfile.mkdtemp(), 'data.tfrecord')

with tf.io.TFRecordWriter(tfrecord_path) as writer:
    for i in range(num_examples):
        # Serialize image to raw bytes
        image_bytes = images[i].tobytes()

        # Build tf.train.Example
        example = tf.train.Example(features=tf.train.Features(feature={
            'image': _bytes_feature(image_bytes),
            'label': _int64_feature(int(labels[i])),
            'score': _float_feature(float(scores[i])),
            'height': _int64_feature(32),
            'width': _int64_feature(32),
        }))

        writer.write(example.SerializeToString())

file_size = os.path.getsize(tfrecord_path)
print(f"Wrote {num_examples} examples to {tfrecord_path}")
print(f"File size: {file_size / 1024:.1f} KB")
print(f"Avg bytes/example: {file_size / num_examples:.0f}")

Reading & Parsing

To read TFRecords back, create a TFRecordDataset and apply a map function that parses the serialized proto back into tensors. The parse function defines the expected schema using tf.io.parse_single_example.

TFRecord Write / Read Flow
flowchart TD
    subgraph Write
        A[Raw Data] --> B[Create tf.train.Feature]
        B --> C[Build tf.train.Example]
        C --> D[Serialize to Bytes]
        D --> E[TFRecordWriter]
        E --> F[.tfrecord File]
    end

    subgraph Read
        F --> G[TFRecordDataset]
        G --> H[map parse_fn]
        H --> I[tf.io.parse_single_example]
        I --> J[Decoded Tensors]
        J --> K[Pipeline: batch, prefetch]
    end

    style A fill:#132440,stroke:#3B9797,color:#fff
    style F fill:#BF092F,stroke:#132440,color:#fff
    style J fill:#3B9797,stroke:#132440,color:#fff
    style K fill:#132440,stroke:#3B9797,color:#fff
                            
import tensorflow as tf
import numpy as np
import tempfile
import os

# Helper to write a small TFRecord file
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

np.random.seed(42)
tfrecord_path = os.path.join(tempfile.mkdtemp(), 'data.tfrecord')
with tf.io.TFRecordWriter(tfrecord_path) as writer:
    for i in range(50):
        img = np.random.randint(0, 256, (32, 32, 3), dtype=np.uint8)
        example = tf.train.Example(features=tf.train.Features(feature={
            'image': _bytes_feature(img.tobytes()),
            'label': _int64_feature(np.random.randint(0, 10)),
            'height': _int64_feature(32),
            'width': _int64_feature(32),
        }))
        writer.write(example.SerializeToString())

# Define the parse function — must match the write schema
def parse_tfrecord(serialized):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
        'height': tf.io.FixedLenFeature([], tf.int64),
        'width': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(serialized, feature_description)

    # Decode raw bytes back to tensor
    image = tf.io.decode_raw(parsed['image'], tf.uint8)
    image = tf.reshape(image, [parsed['height'], parsed['width'], 3])
    image = tf.cast(image, tf.float32) / 255.0

    return image, parsed['label']

# Build the pipeline
dataset = (tf.data.TFRecordDataset(tfrecord_path)
    .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(50)
    .batch(8)
    .prefetch(tf.data.AUTOTUNE)
)

# Verify
for images, labels in dataset.take(1):
    print(f"Batch images shape: {images.shape}")
    print(f"Batch labels: {labels.numpy()}")
    print(f"Pixel range: [{images.numpy().min():.2f}, {images.numpy().max():.2f}]")

# Cleanup
os.remove(tfrecord_path)

Image Data Pipeline

Now let's combine everything into a complete, production-quality image pipeline that reads files from disk, decodes them, resizes, augments, batches, and prefetches — the kind of pipeline you'd use for training a CNN on a real dataset.

End-to-End Pipeline

The pipeline below generates a small synthetic image folder on disk, then builds the full read → decode → augment → batch → prefetch chain. It is self-contained and can be run independently:

import tensorflow as tf
import numpy as np
import tempfile
import os

# Create a temporary directory with synthetic "image" files
tmp_dir = tempfile.mkdtemp()
for split in ['train', 'val']:
    for cls in ['cat', 'dog']:
        cls_dir = os.path.join(tmp_dir, split, cls)
        os.makedirs(cls_dir, exist_ok=True)
        for i in range(20):
            # Create small random PNG images
            img = tf.random.uniform([64, 64, 3], 0, 255, dtype=tf.float32)
            img_bytes = tf.io.encode_png(tf.cast(img, tf.uint8))
            tf.io.write_file(os.path.join(cls_dir, f'{i:03d}.png'), img_bytes)

print(f"Created dataset at: {tmp_dir}")

# Configuration
IMG_SIZE = 48
BATCH_SIZE = 8
AUTOTUNE = tf.data.AUTOTUNE
class_names = ['cat', 'dog']

# Step 1: List all image files with labels
def get_file_paths_and_labels(data_dir):
    file_paths = []
    labels = []
    for idx, cls_name in enumerate(class_names):
        cls_dir = os.path.join(data_dir, cls_name)
        for fname in os.listdir(cls_dir):
            file_paths.append(os.path.join(cls_dir, fname))
            labels.append(idx)
    return file_paths, labels

train_paths, train_labels = get_file_paths_and_labels(os.path.join(tmp_dir, 'train'))

# Step 2: Parse function — read, decode, resize
def parse_image(file_path, label):
    raw = tf.io.read_file(file_path)
    image = tf.io.decode_png(raw, channels=3)
    image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Step 3: Augmentation function
def augment(image, label):
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    image = tf.clip_by_value(image, 0.0, 1.0)
    return image, label

# Step 4: Build the complete pipeline
train_ds = (tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
    .shuffle(len(train_paths))
    .map(parse_image, num_parallel_calls=AUTOTUNE)
    .cache()
    .map(augment, num_parallel_calls=AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

# Verify the pipeline
for images, labels in train_ds.take(1):
    print(f"Batch shape: {images.shape}")
    print(f"Labels: {labels.numpy()}")
    print(f"Pixel range: [{images.numpy().min():.3f}, {images.numpy().max():.3f}]")

# Cleanup
import shutil
shutil.rmtree(tmp_dir)
Pipeline Order Matters: The order of operations is deliberate. Parse (deterministic) → cache (store decoded images) → augment (random, varies each epoch) → batch → prefetch. If you cache after augmentation, every epoch sees the same augmented images, defeating the purpose.
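To see why, here is a minimal sketch in which random noise stands in for augmentation: caching after the random map freezes its output, so every epoch replays identical values, while caching before it yields fresh randomness each pass.

```python
import tensorflow as tf

base = tf.data.Dataset.range(5)

def random_augment(x):
    # Random "augmentation" — should differ on every epoch
    return tf.cast(x, tf.float32) + tf.random.uniform([], 0, 1)

# Wrong order: cache AFTER the random map — randomness is frozen
frozen = base.map(random_augment).cache()
epoch1 = list(frozen.as_numpy_iterator())
epoch2 = list(frozen.as_numpy_iterator())
print("cache-after-augment identical across epochs:", epoch1 == epoch2)

# Right order: cache BEFORE the random map — fresh noise each epoch
fresh = base.cache().map(random_augment)
e1 = list(fresh.as_numpy_iterator())
e2 = list(fresh.as_numpy_iterator())
print("cache-before-augment identical across epochs:", e1 == e2)
```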

Text Data Pipeline

Text pipelines follow the same ETL pattern but swap image decoding for tokenization. The TextVectorization layer handles vocabulary building, tokenization, and padding — all within a tf.data-compatible workflow.

TextVectorization Layer

The layer is adapted on the training corpus to build a vocabulary, then maps raw strings to padded integer sequences. The example below is self-contained:

import tensorflow as tf
import numpy as np

# Sample text data
texts = [
    "TensorFlow makes deep learning easy",
    "Data pipelines are essential for training",
    "GPUs accelerate neural network training",
    "Keras provides high-level model building APIs",
    "Batch processing improves throughput significantly",
    "Prefetching overlaps data loading with computation",
    "Augmentation prevents overfitting on small datasets",
    "TFRecords enable efficient binary data storage",
]
labels = [1, 1, 1, 1, 0, 0, 0, 0]  # positive/negative sentiment

# Create TextVectorization layer
text_vectorizer = tf.keras.layers.TextVectorization(
    max_tokens=200,          # vocabulary size limit
    output_mode='int',       # integer token IDs
    output_sequence_length=15 # pad/truncate to fixed length
)

# Adapt on training data to build vocabulary
text_vectorizer.adapt(texts)

# Inspect the vocabulary
vocab = text_vectorizer.get_vocabulary()
print(f"Vocabulary size: {len(vocab)}")
print(f"First 20 tokens: {vocab[:20]}")

# Vectorize a sample
sample = tf.constant(["TensorFlow data pipelines are fast"])
vectorized = text_vectorizer(sample)
print(f"\nInput:    'TensorFlow data pipelines are fast'")
print(f"Encoded:  {vectorized.numpy()[0]}")

# Decode back (manually)
decoded = [vocab[i] for i in vectorized.numpy()[0] if i > 0]
print(f"Decoded:  {' '.join(decoded)}")

Complete Text Classification Pipeline

The pipeline below ties everything together: vectorization inside a tf.data map, followed by a small embedding classifier trained on synthetic review snippets:

import tensorflow as tf
import numpy as np

# Generate synthetic text classification data
train_texts = [
    "great product highly recommend", "terrible quality waste of money",
    "absolutely love this item", "broken on arrival very disappointed",
    "excellent service fast delivery", "worst purchase ever made",
    "fantastic value for the price", "does not work as described",
    "best thing I ever bought", "complete garbage do not buy",
    "amazing quality exceeded expectations", "poor craftsmanship fell apart",
    "wonderful experience will buy again", "horrible customer service",
    "perfect gift for anyone", "total ripoff avoid at all costs",
] * 10  # repeat for more data

train_labels = ([1, 0] * 8) * 10  # 1=positive, 0=negative

# Step 1: TextVectorization layer
vectorizer = tf.keras.layers.TextVectorization(
    max_tokens=500,
    output_mode='int',
    output_sequence_length=20
)
vectorizer.adapt(train_texts)

# Step 2: Build tf.data pipeline
dataset = tf.data.Dataset.from_tensor_slices((train_texts, train_labels))

def vectorize_text(text, label):
    text = tf.expand_dims(text, -1)   # vectorizer expects batch dim
    return vectorizer(text)[0], label  # remove batch dim

train_ds = (dataset
    .shuffle(len(train_texts))
    .map(vectorize_text, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(16)
    .prefetch(tf.data.AUTOTUNE)
)

# Step 3: Build and train model
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(500, 32),  # input dim = max_tokens; input_length arg removed in Keras 3
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(train_ds, epochs=5, verbose=1)

print(f"\nFinal accuracy: {history.history['accuracy'][-1]:.4f}")
print(f"Vocabulary sample: {vectorizer.get_vocabulary()[:15]}")

Benchmarking & Profiling

To verify that your pipeline is not the bottleneck, measure its throughput independently from model training. Compare naive vs optimized pipelines to quantify the impact of prefetching, caching, and parallel mapping.

Measuring Throughput

Pipeline throughput is $T = \frac{\text{batch\_size}}{\text{step\_time}}$ samples per second. A well-optimized pipeline should produce data faster than the model can consume it, meaning the GPU utilization stays near 100%.
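Plugging illustrative numbers into the formula (the batch size and step time here are assumptions, not measurements):

```python
# T = batch_size / step_time, with illustrative numbers
batch_size = 256
step_time = 0.020  # 20 ms per training step (data loading + compute)

throughput = batch_size / step_time
print(f"Throughput: {throughput:.0f} samples/s")  # Throughput: 12800 samples/s
```

If profiling shows the data pipeline alone cannot sustain this rate, the GPU will stall waiting for input.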

import tensorflow as tf
import numpy as np
import time

def benchmark(dataset, num_epochs=2, description=""):
    """Measure dataset iteration throughput."""
    start = time.perf_counter()
    total_batches = 0
    for epoch in range(num_epochs):
        for batch in dataset:
            total_batches += 1
    elapsed = time.perf_counter() - start
    throughput = total_batches / elapsed
    print(f"{description:30s} | {elapsed:.3f}s | {throughput:.1f} batches/s | {total_batches} batches")
    return elapsed

# Create a dataset with a simulated expensive operation
X = np.random.randn(2000, 128).astype(np.float32)
y = np.random.randint(0, 10, 2000)
base_ds = tf.data.Dataset.from_tensor_slices((X, y))

def simulate_work(x, y):
    # Simulate preprocessing work
    return tf.nn.l2_normalize(x, axis=-1), y

# Pipeline variants
naive     = base_ds.map(simulate_work).batch(64)
parallel  = base_ds.map(simulate_work, num_parallel_calls=tf.data.AUTOTUNE).batch(64)
cached    = base_ds.map(simulate_work).cache().batch(64)
optimized = (base_ds
    .map(simulate_work, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(2000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

# Benchmark each variant
print(f"{'Pipeline':30s} | {'Time':>7s} | {'Speed':>14s} | {'Batches':>7s}")
print("-" * 75)
benchmark(naive,     description="Naive (sequential)")
benchmark(parallel,  description="Parallel map")
benchmark(cached,    description="Cached")
benchmark(optimized, description="Fully optimized")
TensorBoard tf.data Profiling

For production-grade profiling, use TensorBoard's built-in tf.data analyzer. Add a tf.profiler trace during training and open the tf.data Bottleneck Analysis page in TensorBoard. It shows time spent in each pipeline stage and highlights which operation is the bottleneck. Enable tracing with:

import tensorflow as tf

# Programmatic API: everything between start() and stop() is traced
tf.profiler.experimental.start('logdir')
# ... run training steps ...
tf.profiler.experimental.stop()

# Or use the TensorBoard callback to profile batches 5 through 7:
# tf.keras.callbacks.TensorBoard(log_dir='logs', profile_batch='5,7')

Summary & Next Steps

In this article, we've built a complete understanding of TensorFlow's tf.data API — from creating datasets to optimizing them for maximum GPU utilization. Here are the key takeaways:

Key Takeaways:
  • Always use tf.data instead of feeding raw NumPy arrays for anything beyond toy datasets
  • Pipeline order matters: parse → cache → shuffle → augment → batch → prefetch
  • AUTOTUNE everything: num_parallel_calls on map/interleave, buffer_size on prefetch
  • Cache wisely: after deterministic transforms, before random augmentation
  • Shuffle buffer size should be as large as memory allows (ideally = dataset size)
  • TFRecords are the gold standard for large-scale sequential-read workloads
  • Benchmark independently: measure pipeline throughput separate from model training
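
The takeaways above condense into a reusable pipeline template (a sketch with toy stand-ins for parse and augment; swap the two map functions for your real preprocessing):

```python
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE

def parse(x, y):
    # Deterministic work (decode/normalize) — safe to cache
    return tf.cast(x, tf.float32) / 255.0, y

def augment(x, y):
    # Random work — must run after cache so it varies each epoch
    return x + tf.random.uniform(tf.shape(x), maxval=0.01), y

X = tf.random.uniform([100, 8], maxval=255)
y = tf.zeros([100], dtype=tf.int32)

train_ds = (tf.data.Dataset.from_tensor_slices((X, y))
    .map(parse, num_parallel_calls=AUTOTUNE)    # parallel deterministic parse
    .cache()                                    # cache parsed elements
    .shuffle(100)                               # buffer ~= dataset size
    .map(augment, num_parallel_calls=AUTOTUNE)  # fresh randomness per epoch
    .batch(32)
    .prefetch(AUTOTUNE)                         # overlap with training
)

for xb, yb in train_ds.take(1):
    print(xb.shape, yb.shape)  # (32, 8) (32,)
```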

Next in the Series

In Part 5: Training Workflows & Callbacks, we'll master TensorFlow training workflows — callbacks (EarlyStopping, ModelCheckpoint, TensorBoard), custom training loops with GradientTape, and distributed training strategies.