History of Spark
The story of Apache Spark begins in 2009 at the UC Berkeley AMPLab (Algorithms, Machines, and People Lab), where PhD student Matei Zaharia was working on cluster computing research. At the time, Hadoop MapReduce was the dominant framework for distributed data processing, powering data pipelines at Yahoo, Facebook, and dozens of other companies. But MapReduce had severe limitations.
Google had published the original MapReduce paper in 2004, and Doug Cutting and Mike Cafarella built the open-source Hadoop implementation starting in 2005. MapReduce was revolutionary for batch processing: it could process petabytes of data on commodity hardware. But it forced every computation into a rigid two-phase pattern (map then reduce), and it wrote all intermediate data to the Hadoop Distributed File System (HDFS) between stages. For a simple word count, this was acceptable. For a machine learning algorithm that iterates over the same dataset 100 times, it was devastating.
The Birth of RDDs
Zaharia's key insight was the Resilient Distributed Dataset (RDD), described in his 2012 NSDI paper "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing." RDDs kept data in memory across operations while providing fault tolerance through lineage (the ability to recompute lost partitions from the original transformation chain rather than replicating data). This paper won the NSDI Test of Time award in 2022.
The first Spark paper was published at HotCloud 2010, and the project was open-sourced in 2010. It entered the Apache Incubator in June 2013 and became a top-level Apache project in February 2014. In 2013, Zaharia co-founded Databricks with six other UC Berkeley researchers to commercialize Spark. Databricks has since grown into a company valued at over $43 billion (as of 2023), providing a managed Spark platform used by thousands of enterprises.
Spark vs MapReduce: The Numbers
| Metric | Hadoop MapReduce | Apache Spark |
|---|---|---|
| Processing model | Disk-based, two-phase | In-memory, DAG-based |
| Iterative processing | 10-100x slower (disk I/O per iteration) | Fast (data cached in memory) |
| Latency | Minutes to hours (job startup overhead) | Seconds to minutes |
| Languages | Java (primarily) | Scala, Python, Java, R, SQL |
| Streaming | Not built-in (Storm/Flink needed) | Structured Streaming built-in |
| Machine Learning | Mahout (separate project) | MLlib built-in |
| Sort record (2014) | 72 min for 100 TB (2013) | 23 min for 100 TB (~10x fewer machines) |
In November 2014, Spark set a new world record for large-scale sorting: it sorted 100 TB of data in 23 minutes using 206 machines, beating the previous Hadoop record of 72 minutes on 2,100 machines. This benchmark made headlines and cemented Spark as the next generation of big data processing.
Spark Architecture
Understanding Spark's architecture is essential for writing efficient applications and debugging performance issues. Spark uses a master-worker architecture with a Driver program that coordinates Executors running on cluster nodes.
Driver and Executor Model
Spark Application Components
- Driver: The JVM process that runs your main() function. It creates the SparkContext/SparkSession, builds the DAG of operations, splits the DAG into stages, and schedules tasks on executors. The driver runs on one node (or on the client machine in client mode).
- Executor: JVM processes that run on worker nodes. Each executor runs tasks (the smallest unit of work), stores cached data in memory, and reports status back to the driver. A typical configuration runs 1-5 executors per physical machine.
- Cluster Manager: External service that allocates resources. Spark supports Standalone (built-in), YARN (Hadoop), Kubernetes, and Mesos (deprecated since Spark 3.2).
- Task: A unit of work sent to one executor, operating on one partition of data. Tasks within the same stage run in parallel across executors.
DAG Scheduler
When you write Spark code, you describe a series of transformations (lazy) and actions (eager). The DAG (Directed Acyclic Graph) scheduler converts your logical plan into a physical execution plan by breaking it into stages at shuffle boundaries. Within each stage, tasks can be pipelined (run sequentially on the same partition without materializing intermediate results).
# This code creates a DAG with 2 stages (separated by the reduceByKey shuffle)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DAGExample").getOrCreate()
sc = spark.sparkContext
# Stage 1: Read + Map (no shuffle needed)
rdd = sc.textFile("hdfs:///data/logs/*.gz") # Read from HDFS
pairs = rdd.map(lambda line: (line.split()[0], 1)) # Map: extract key
# Stage 2: Shuffle + Reduce (requires data movement between nodes)
counts = pairs.reduceByKey(lambda a, b: a + b) # Shuffle + Reduce
top10 = counts.takeOrdered(10, key=lambda x: -x[1]) # Action triggers execution
print(top10)
Cluster Managers
| Manager | Best For | Resource Sharing | Complexity |
|---|---|---|---|
| Standalone | Development, small clusters | Spark only | Low |
| YARN | Hadoop ecosystems, shared clusters | Multi-tenant (Hive, Spark, MapReduce) | Medium |
| Kubernetes | Cloud-native, containerized workloads | Multi-tenant (any workload) | High |
| Mesos | Legacy (deprecated since Spark 3.2) | Multi-framework | High |
RDDs: The Foundation
Resilient Distributed Datasets are the fundamental data structure in Spark. While modern Spark code primarily uses DataFrames and Datasets (which sit on top of RDDs), understanding RDDs is essential for performance tuning, debugging, and working with unstructured data.
Creating RDDs
from pyspark import SparkContext
sc = SparkContext("local[4]", "RDD Demo")
# From a Python collection (parallelize)
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], numSlices=4)
print(f"Partitions: {numbers.getNumPartitions()}") # 4
# From a file (each line becomes an element)
logs = sc.textFile("hdfs:///var/log/access.log")
# From a file with explicit partitioning
logs_partitioned = sc.textFile("hdfs:///var/log/access.log", minPartitions=16)
# From a whole directory of files
all_logs = sc.textFile("hdfs:///var/log/2025/*/*.log")
Transformations vs Actions
Transformations are lazy: they only record a step in the DAG. Actions trigger actual execution. A common mistake is calling an action such as count() inside a loop, triggering a full recomputation each time.
# Transformations (lazy - return new RDDs)
filtered = logs.filter(lambda line: "ERROR" in line) # Narrow
mapped = filtered.map(lambda line: (line.split()[0], 1)) # Narrow
reduced = mapped.reduceByKey(lambda a, b: a + b) # Wide (shuffle)
sorted_rdd = reduced.sortByKey(ascending=False) # Wide (shuffle)
# Nothing has executed yet! The DAG is just being built.
# Actions (eager - trigger computation)
count = sorted_rdd.count() # Returns a number
top5 = sorted_rdd.take(5) # Returns a list
all_data = sorted_rdd.collect() # Returns entire RDD to driver (DANGEROUS for large data)
sorted_rdd.saveAsTextFile("hdfs:///output/errors") # Writes to HDFS
Lineage and Fault Tolerance
Each RDD remembers the chain of transformations used to create it. If a partition is lost (due to a node failure), Spark recomputes only that partition from the original data using the lineage graph. This is more efficient than data replication (the approach used by HDFS) because the lineage is much smaller than the actual data.
# View the lineage (dependency graph) of an RDD
print(sorted_rdd.toDebugString().decode())  # toDebugString() returns bytes in PySpark
# Output:
# (4) MapPartitionsRDD[6] at sortByKey
# | ShuffledRDD[5] at reduceByKey
# +-(4) MapPartitionsRDD[4] at map
# | FilteredRDD[3] at filter
# | hdfs:///var/log/access.log MapPartitionsRDD[1]
Caching and Persistence
from pyspark import StorageLevel
# cache() = persist(StorageLevel.MEMORY_ONLY)
errors = logs.filter(lambda line: "ERROR" in line).cache()
# Persistence levels (alternatives - an RDD can have only one storage level)
errors.persist(StorageLevel.MEMORY_ONLY)      # Objects in JVM heap
errors.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk if no memory
errors.persist(StorageLevel.DISK_ONLY)        # Disk only (for very large RDDs)
errors.persist(StorageLevel.OFF_HEAP)         # Off-heap memory (Tungsten)
# Note: the serialized levels (MEMORY_ONLY_SER etc.) exist in the Scala/Java
# API; PySpark data is always stored serialized.
# IMPORTANT: cache() is lazy - data is cached on first action
count = errors.count() # First action: computes AND caches
count2 = errors.count() # Second action: reads from cache (fast)
# Unpersist when done to free memory
errors.unpersist()
Partitioning
# Repartition: shuffle data to change partition count
# Use when you need more parallelism or even distribution
balanced = skewed_rdd.repartition(200)
# Coalesce: reduce partitions WITHOUT a full shuffle
# Use after filter() that removes most data
filtered = large_rdd.filter(lambda x: x.value > 1000)
compact = filtered.coalesce(10) # Merge partitions locally
# Custom partitioning for key-value RDDs.
# (PySpark's partitionBy takes a partition function directly;
# the Partitioner class exists only in the Scala/Java API.)
# Partition by customer_id for efficient joins
customer_orders = orders.partitionBy(100, lambda key: hash(key) % 100)
DataFrames & Datasets
DataFrames were introduced in Spark 1.3 (2015) and represent a major evolution over raw RDDs. A DataFrame is a distributed collection of rows organized into named columns, similar to a table in a relational database or a pandas DataFrame. The key advantage is that Spark can inspect the schema and optimize the query plan, something impossible with opaque RDD lambda functions.
Creating DataFrames
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder \
.appName("DataFrame Demo") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# From CSV with schema inference
df = spark.read.csv("hdfs:///data/sales.csv", header=True, inferSchema=True)
# From JSON
df_json = spark.read.json("s3a://bucket/events/*.json")
# From Parquet (columnar, compressed - preferred format)
df_parquet = spark.read.parquet("hdfs:///data/warehouse/users/")
# With explicit schema (faster than inferSchema for large files)
schema = StructType([
StructField("user_id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
StructField("salary", DoubleType(), nullable=True),
StructField("department", StringType(), nullable=True)
])
df_explicit = spark.read.schema(schema).csv("hdfs:///data/employees.csv", header=True)
# From a Python list (for testing)
data = [("Alice", 30, 85000.0), ("Bob", 25, 72000.0), ("Charlie", 35, 95000.0)]
df_test = spark.createDataFrame(data, ["name", "age", "salary"])
Catalyst Optimizer
The Catalyst optimizer is Spark SQL's query optimization engine. It takes your logical plan (what you want to compute) and produces an optimized physical plan (how to compute it efficiently). Catalyst applies rule-based and cost-based optimizations including predicate pushdown, column pruning, constant folding, and join reordering.
# See the query plan at different optimization stages
df_result = df.filter(df.age > 25).select("name", "salary").filter(df.salary > 80000)
# Logical plan (before optimization)
df_result.explain(mode="extended")
# Example output:
# == Parsed Logical Plan ==
# Filter (salary > 80000)
# +- Project [name, salary]
# +- Filter (age > 25)
# +- Relation[user_id, name, age, salary, department] csv
#
# == Optimized Logical Plan ==
# Project [name, salary]
# +- Filter ((age > 25) AND (salary > 80000)) ← Filters combined!
# +- Relation[user_id, name, age, salary, department] csv
#
# == Physical Plan ==
# Project [name, salary]
# +- Filter ((age > 25) AND (salary > 80000))
# +- FileScan csv [name, age, salary] ← Only 3 columns read, not 5!
Notice that the physical plan drops the user_id and department columns entirely (column pruning). For Parquet files, column pruning means those columns are never even read from disk. This is why DataFrames are almost always faster than raw RDDs for structured data.
Tungsten Execution Engine
Project Tungsten (introduced in Spark 1.4-1.6, fully realized in Spark 2.0) optimizes Spark's memory management and code generation. Instead of storing data as Java objects (which have high overhead due to object headers and garbage collection), Tungsten uses off-heap binary format and generates JVM bytecode at runtime for critical operations. The result is 2-10x performance improvement over Spark 1.x for many workloads.
Spark SQL
Spark SQL allows you to query DataFrames using standard SQL syntax. This makes Spark accessible to data analysts and BI tools that speak SQL, while still benefiting from Spark's distributed execution engine.
# Register DataFrame as a temporary SQL view
df.createOrReplaceTempView("employees")
# Standard SQL queries
result = spark.sql("""
SELECT department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM employees
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 5
ORDER BY avg_salary DESC
""")
result.show()
# +-----------+--------------+----------+----------+
# | department|employee_count|avg_salary|max_salary|
# +-----------+--------------+----------+----------+
# |Engineering|           142|  125000.0|  210000.0|
# |    Product|            38|  115000.0|  185000.0|
# |      Sales|            67|   95000.0|  160000.0|
# +-----------+--------------+----------+----------+
User-Defined Functions (UDFs)
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
# Python UDF (serialized, slower than built-in functions)
@udf(returnType=StringType())
def salary_band(salary):
    if salary >= 150000:
        return "Senior"
    elif salary >= 100000:
        return "Mid"
    elif salary >= 70000:
        return "Junior"
    else:
        return "Entry"
df_with_band = df.withColumn("band", salary_band(col("salary")))
# Register UDF for SQL use
spark.udf.register("salary_band_sql", salary_band)
result = spark.sql("SELECT name, salary, salary_band_sql(salary) as band FROM employees")
Window Functions
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Define a window: partition by department, order by salary descending
window_spec = Window.partitionBy("department").orderBy(F.desc("salary"))
# Rank employees within each department
df_ranked = df.withColumn("rank", F.rank().over(window_spec)) \
.withColumn("dense_rank", F.dense_rank().over(window_spec)) \
.withColumn("row_number", F.row_number().over(window_spec))
# Running total of salary within each department
running_window = Window.partitionBy("department").orderBy("salary") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_running = df.withColumn("running_total", F.sum("salary").over(running_window))
# Moving average (last 3 rows)
moving_window = Window.partitionBy("department").orderBy("hire_date") \
.rowsBetween(-2, 0)
df_moving = df.withColumn("moving_avg_salary", F.avg("salary").over(moving_window))
Hive Integration
# Enable Hive support for accessing Hive metastore
spark = SparkSession.builder \
.appName("HiveIntegration") \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.enableHiveSupport() \
.getOrCreate()
# Query existing Hive tables
spark.sql("SELECT * FROM hive_db.customer_orders WHERE order_date > '2025-01-01'")
# Save DataFrame as a Hive managed table
df.write.mode("overwrite").saveAsTable("analytics.employee_summary")
# Save as external table with partitioning
df.write.mode("overwrite") \
.partitionBy("year", "month") \
.format("parquet") \
.saveAsTable("analytics.sales_partitioned")
PySpark Development
PySpark is the Python API for Apache Spark. As of 2025, Python has overtaken Scala as the most popular language for Spark development, driven by the data science community and the pandas ecosystem. PySpark accounts for over 70% of Spark API usage according to Databricks usage statistics.
Setting Up PySpark
# Install PySpark via pip
pip install pyspark==3.5.0
# Or with extras that pull in SQL and ML dependencies
pip install "pyspark[sql,ml]==3.5.0"
# Set environment variables
export SPARK_HOME=/opt/spark
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
# Launch PySpark shell
pyspark --master local[4] --driver-memory 4g
# Submit a PySpark application
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 8g \
--executor-cores 4 \
my_spark_app.py
Pandas UDFs (Vectorized UDFs)
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
# Pandas UDF: operates on pandas Series/DataFrames
# 10-100x faster than regular Python UDFs because data is
# transferred in Apache Arrow columnar format (no row-by-row serialization)
@pandas_udf(DoubleType())
def normalize_salary(salary: pd.Series) -> pd.Series:
    return (salary - salary.mean()) / salary.std()
df_normalized = df.withColumn("salary_normalized", normalize_salary(df.salary))
# Grouped map: apply a function to each group with applyInPandas
# (the PandasUDFType.GROUPED_MAP decorator is deprecated since Spark 3.0)
def scale_by_department(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary"] = (pdf["salary"] - pdf["salary"].min()) / \
                    (pdf["salary"].max() - pdf["salary"].min())
    return pdf
df_scaled = df.groupby("department").applyInPandas(scale_by_department, schema=df.schema)
Pandas API on Spark (formerly Koalas)
import pyspark.pandas as ps
# Use pandas syntax on distributed data!
# (Merged into PySpark in Spark 3.2, previously the Koalas project)
# Read data with pandas API
pdf = ps.read_csv("hdfs:///data/large_file.csv")
# Familiar pandas operations, executed on Spark
pdf.head(10)
pdf.describe()
pdf["salary"].hist()
pdf.groupby("department")["salary"].mean()
# Convert between PySpark DataFrame and pandas-on-Spark
spark_df = pdf.to_spark()
pandas_on_spark = spark_df.pandas_api()
MLlib Machine Learning
MLlib is Spark's built-in machine learning library. It provides scalable implementations of common ML algorithms, feature engineering tools, and a Pipeline API that standardizes the ML workflow. MLlib operates on DataFrames (the older RDD-based API is in maintenance mode).
ML Pipelines
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load training data
data = spark.read.parquet("hdfs:///data/ml/customer_churn.parquet")
# Split into train/test
train, test = data.randomSplit([0.8, 0.2], seed=42)
# Build a pipeline
# Step 1: Encode categorical columns
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
plan_indexer = StringIndexer(inputCol="plan_type", outputCol="plan_index")
# Step 2: Assemble features into a single vector
assembler = VectorAssembler(
inputCols=["age", "tenure_months", "monthly_charges",
"total_charges", "gender_index", "plan_index"],
outputCol="features_raw"
)
# Step 3: Scale features
scaler = StandardScaler(inputCol="features_raw", outputCol="features",
withStd=True, withMean=True)
# Step 4: Train classifier
rf = RandomForestClassifier(
labelCol="churned", featuresCol="features",
numTrees=100, maxDepth=10, seed=42
)
# Assemble pipeline
pipeline = Pipeline(stages=[gender_indexer, plan_indexer, assembler, scaler, rf])
# Train
model = pipeline.fit(train)
# Predict
predictions = model.transform(test)
# Evaluate
evaluator = MulticlassClassificationEvaluator(
labelCol="churned", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}") # e.g., 0.8723
# Save model for production
model.write().overwrite().save("hdfs:///models/churn_rf_v1")
Feature Engineering
from pyspark.ml.feature import (
Tokenizer, HashingTF, IDF, # Text features
Bucketizer, QuantileDiscretizer, # Binning
OneHotEncoder, Imputer # Encoding & missing values
)
# Text processing pipeline (TF-IDF)
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
# Binning continuous features
bucketizer = Bucketizer(
splits=[0, 18, 25, 35, 50, 65, float("inf")],
inputCol="age", outputCol="age_bucket"
)
# Handle missing values
imputer = Imputer(
inputCols=["salary", "tenure"],
outputCols=["salary_imputed", "tenure_imputed"],
strategy="median" # or "mean", "mode"
)
Model Persistence
# Save and load models
from pyspark.ml import PipelineModel
model.save("hdfs:///models/churn_rf_v1")
loaded_model = PipelineModel.load("hdfs:///models/churn_rf_v1")
# Use loaded model for batch scoring
new_data = spark.read.parquet("hdfs:///data/new_customers/")
predictions = loaded_model.transform(new_data)
predictions.select("customer_id", "prediction", "probability") \
.write.parquet("hdfs:///data/predictions/2025-03/")
Structured Streaming
Structured Streaming, introduced in Spark 2.0, treats a live data stream as an unbounded table that is continuously appended. You write the same DataFrame/SQL operations you would use for batch processing, and Spark executes them incrementally as new data arrives.
Micro-batch vs Continuous Processing
| Property | Micro-batch | Continuous (Experimental) |
|---|---|---|
| Latency | 100ms - seconds | ~1ms |
| Exactly-once | Yes | At-least-once |
| Aggregate support | Full (window, watermark) | Limited (map-like only) |
| Maturity | Production-ready | Experimental since Spark 2.3 |
Kafka Integration
# Read from Kafka topic
kafka_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.load()
# Parse JSON events
from pyspark.sql.functions import from_json, col, window, count
from pyspark.sql.types import StructType, StringType, TimestampType
event_schema = StructType() \
.add("user_id", StringType()) \
.add("event_type", StringType()) \
.add("timestamp", TimestampType()) \
.add("page", StringType())
events = kafka_stream \
.select(from_json(col("value").cast("string"), event_schema).alias("data")) \
.select("data.*")
# Windowed aggregation: count events per 5-minute window
event_counts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window("timestamp", "5 minutes"),
"event_type"
) \
.agg(count("*").alias("event_count"))
# Write results to console (for debugging)
query = event_counts.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.trigger(processingTime="30 seconds") \
.start()
# Write to Kafka output topic (production)
output_query = event_counts.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092") \
.option("topic", "event-summaries") \
.option("checkpointLocation", "hdfs:///checkpoints/event-counts") \
.trigger(processingTime="1 minute") \
.start()
Watermarks and Late Data
The withWatermark("timestamp", "10 minutes") call above tells Spark how long to wait for late events. State for a window is kept until the maximum event time seen so far is more than 10 minutes past the window's end; after that the window is finalized and later arrivals for it are dropped. Without a watermark, Spark would have to keep every window's state forever, and streaming state would grow without bound.
Performance Tuning
Performance tuning is what separates a Spark job that runs in 5 minutes from one that runs in 5 hours. The primary bottlenecks in Spark are shuffle operations, data skew, improper caching, and suboptimal resource allocation.
Partitioning Strategy
# Rule of thumb: 128 MB per partition, 2-3 partitions per core
# 1 TB dataset, 40 cores → ~8000 partitions (1TB / 128MB)
# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")
# Repartition for better parallelism (causes shuffle)
df_repartitioned = df.repartition(200)
# Repartition by key (co-locates data for joins)
df_by_customer = df.repartition(200, "customer_id")
# Coalesce to reduce partitions (no shuffle)
df_compact = df.coalesce(50)
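The rule of thumb above can be turned into a small helper. This is a hypothetical function (the name `target_partitions` and its defaults are ours, not a Spark API): it sizes partitions at roughly 128 MB each, but never drops below a few tasks per core so all cores stay busy.

```python
def target_partitions(input_bytes: int, total_cores: int,
                      bytes_per_partition: int = 128 * 1024 * 1024,
                      tasks_per_core: int = 3) -> int:
    """Partition count from the 128 MB / 2-3-tasks-per-core rule of thumb."""
    by_size = max(1, input_bytes // bytes_per_partition)   # ~128 MB per partition
    by_cores = total_cores * tasks_per_core                # keep every core busy
    return max(by_size, by_cores)

print(target_partitions(1 << 40, 40))          # 1 TB on 40 cores -> 8192
print(target_partitions(10 * 1024 * 1024, 40)) # tiny input -> 120 (core-bound)
```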
Broadcast Joins
from pyspark.sql.functions import broadcast
# When one side of a join is small (< 10 MB by default),
# Spark broadcasts it to all executors to avoid a shuffle
# Explicit broadcast hint
result = large_df.join(broadcast(small_df), "customer_id")
# Configure broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) # 50 MB
# Disable broadcast for debugging
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
Memory Tuning
# spark-defaults.conf - production memory configuration
# Executor memory: 60% for execution, 40% for storage (default)
spark.executor.memory: 8g
spark.executor.memoryOverhead: 2g # Off-heap (Python, JVM overhead)
spark.memory.fraction: 0.6 # 60% of JVM heap for Spark
spark.memory.storageFraction: 0.5 # 50% of Spark memory for caching
# Driver memory (increase for collect() or large broadcast variables)
spark.driver.memory: 4g
spark.driver.maxResultSize: 2g
# Off-heap memory for Tungsten
spark.memory.offHeap.enabled: true
spark.memory.offHeap.size: 4g
Adaptive Query Execution (AQE)
# AQE (Spark 3.0+) dynamically optimizes queries at runtime
# based on actual data statistics collected during execution
# Enable AQE (default in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE features:
# 1. Coalescing post-shuffle partitions (reduces small file problem)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
# 2. Converting sort-merge join to broadcast join at runtime
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "50m")
# 3. Optimizing skew joins (splits skewed partitions)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
The most common cause of driver crashes is calling collect() on a large DataFrame. This pulls all data to the driver node, which typically has limited memory. For a 100 GB DataFrame, collect() will crash the driver with an OutOfMemoryError. Use show(), take(n), or write results to storage instead.
Handling Data Skew
Data skew occurs when a small number of keys contain a disproportionate amount of data. For example, if you are joining orders with customers and one customer has 10 million orders while most have fewer than 100, that one partition will take 1000x longer than the others, becoming a bottleneck for the entire job.
# Technique 1: Salted joins (manual skew handling)
# Add a random salt to the skewed key to distribute data evenly
import pyspark.sql.functions as F
# Add salt to the large (skewed) table
num_salts = 10
large_df_salted = large_df.withColumn(
"salt", (F.rand() * num_salts).cast("int")
).withColumn(
"salted_key", F.concat(F.col("customer_id"), F.lit("_"), F.col("salt"))
)
# Explode the small table to match all salt values
from pyspark.sql.functions import explode, array, lit
small_df_exploded = small_df.withColumn(
"salt", explode(array([lit(i) for i in range(num_salts)]))
).withColumn(
"salted_key", F.concat(F.col("customer_id"), F.lit("_"), F.col("salt"))
)
# Join on the salted key - data is now distributed evenly
result = large_df_salted.join(small_df_exploded, "salted_key")
# Technique 2: Use AQE skew join (Spark 3.0+ - automatic)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Serialization
# Use Kryo serialization instead of Java serialization (2-10x faster)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false")
# Register custom classes for even better performance
# In Scala/Java:
# conf.registerKryoClasses(Array(classOf[MyClass]))
Common Performance Anti-Patterns
Anti-Patterns to Avoid
- Using count() for existence checks: df.count() > 0 scans the entire dataset. Use df.head(1) or df.isEmpty() (Spark 3.3+) instead.
- Calling actions inside loops: Each action triggers a full computation. Accumulate transformations and call a single action at the end.
- Using Python UDFs when built-in functions exist: Built-in functions (F.col, F.when, F.regexp_extract) are executed in the JVM. Python UDFs serialize data to Python and back, adding massive overhead.
- Not caching intermediate results: If you use the same DataFrame in multiple downstream operations, cache it. Otherwise Spark recomputes it from scratch each time.
- Too many small partitions: Each partition has scheduling overhead (~50ms). If you have 100,000 tiny partitions, the scheduling overhead alone is 5,000 seconds. Use coalesce() after filtering.
- Writing many small files: Each partition produces one output file. 10,000 partitions = 10,000 files, which kills downstream read performance. Use coalesce() or repartition() before writing.
Production Deployment
YARN Configuration
# Submit Spark application to YARN cluster
spark-submit \
--master yarn \
--deploy-mode cluster \
--name "Daily ETL Pipeline" \
--num-executors 20 \
--executor-memory 16g \
--executor-cores 4 \
--driver-memory 8g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=5 \
--conf spark.dynamicAllocation.maxExecutors=50 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.shuffle.partitions=400 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.speculation=true \
--conf spark.speculation.multiplier=1.5 \
--py-files utils.zip,models.zip \
main_pipeline.py --date 2025-03-15
Kubernetes Deployment
# spark-on-k8s-operator SparkApplication manifest
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: daily-etl-pipeline
  namespace: spark-jobs
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: company-registry.io/spark:3.5.0-python3
  mainApplicationFile: "s3a://spark-jobs/pipelines/main_pipeline.py"
  sparkVersion: "3.5.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 60
  driver:
    cores: 2
    memory: "4g"
    serviceAccount: spark-driver
  executor:
    cores: 4
    instances: 10
    memory: "16g"
  dynamicAllocation:
    enabled: true
    initialExecutors: 5
    minExecutors: 2
    maxExecutors: 30
  sparkConf:
    "spark.sql.adaptive.enabled": "true"
    "spark.kubernetes.container.image.pullPolicy": "Always"
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
Monitoring with Spark UI
Key Spark UI Tabs for Debugging
- Jobs tab: Overall progress of each action. Look for failed jobs and their error messages.
- Stages tab: Breakdown of each stage. Check "Shuffle Read" and "Shuffle Write" for data movement. High shuffle = potential optimization opportunity.
- Storage tab: Cached RDDs/DataFrames. Check memory usage and eviction counts. High evictions mean you need more memory or fewer cached datasets.
- Executors tab: Per-executor metrics. Look for uneven task distribution (data skew), high GC time (memory pressure), or failed tasks.
- SQL tab: Visual DAG of the physical plan. Click on each node to see row counts and timing. This is the most useful tab for optimizing DataFrame/SQL queries.
Log Aggregation
# View YARN application logs
yarn logs -applicationId application_1234567890_0001
# View specific container log
yarn logs -applicationId application_1234567890_0001 \
-containerId container_1234567890_0001_01_000002
# Enable structured logging for better parsing
# In log4j2.properties:
# appender.file.type = RollingFile
# appender.file.layout.type = JsonLayout
Production Checklist
Pre-Production Deployment Checklist
- Data validation: Add schema validation on input data. Use spark.read.schema(expected_schema).option("mode", "FAILFAST") to fail early on schema mismatches.
- Idempotency: Ensure your pipeline can be re-run safely. Use mode("overwrite") with partition-level granularity, or implement merge logic for incremental loads.
- Error handling: Wrap critical sections in try/except, log failures with context, and configure retry policies (3 retries with exponential backoff is standard).
- Monitoring: Set up alerts on job duration (more than 2x historical average), failure count, and output row count (sudden drops indicate data issues).
- Resource limits: Set spark.dynamicAllocation.maxExecutors to prevent runaway jobs from consuming the entire cluster.
- Data quality: Add post-processing checks. Verify row counts, null percentages, and value ranges. Write quality metrics to a monitoring table.
- Secrets management: Never hardcode credentials. Use YARN credential providers, Kubernetes secrets, or environment variables injected at runtime.
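The retry policy from the checklist can be sketched as a small decorator. This is plain Python, independent of Spark; the 3-attempt default and backoff schedule are just the rule of thumb suggested above:

```python
import time
import functools

def with_retries(max_attempts=3, base_delay=1.0):
    """Retry a flaky operation with exponential backoff (1s, 2s, 4s, ...)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        raise  # retries exhausted: surface the error
                    delay = base_delay * 2 ** (attempt - 1)
                    print(f"attempt {attempt} failed ({exc}); retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator

calls = {"n": 0}

@with_retries(max_attempts=3, base_delay=0.01)
def flaky_write():
    """Hypothetical write step that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient S3 error")
    return "ok"

print(flaky_write())  # succeeds on the third attempt
```

In a real pipeline the decorated function would be the critical section (a write to S3, a JDBC load), and the print would be a structured log call with job context.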
Delta Lake Integration
# Delta Lake provides ACID transactions on top of Spark
# Essential for production data pipelines
# Write DataFrame as Delta table
df.write.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save("s3://warehouse/sales_delta/")
# Upsert (merge) new data into existing Delta table
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://warehouse/sales_delta/")
delta_table.alias("target").merge(
new_data.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Time travel: query previous versions
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2025-03-14") \
.load("s3://warehouse/sales_delta/")
# Schema evolution: automatically add new columns
df_with_new_col.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("s3://warehouse/sales_delta/")
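The merge above follows standard upsert semantics. As a toy model in plain Python (this is not how Delta implements MERGE internally, just an illustration of whenMatchedUpdateAll/whenNotMatchedInsertAll):

```python
def upsert(target, source, key="order_id"):
    """Toy model of MERGE: rows matching on key are updated, the rest inserted."""
    merged = {row[key]: row for row in target}  # index target rows by key
    for row in source:
        merged[row[key]] = row  # matched -> update all columns, else insert
    return list(merged.values())

target = [{"order_id": 1, "amount": 10}, {"order_id": 2, "amount": 20}]
source = [{"order_id": 2, "amount": 25}, {"order_id": 3, "amount": 30}]
result = upsert(target, source)
print(sorted(r["order_id"] for r in result))  # [1, 2, 3]
```

Order 2 ends up with the source's amount (25), order 1 is untouched, and order 3 is newly inserted; Delta performs the same logic transactionally across files.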
Case Studies
Netflix: Recommendation Pipeline
Netflix processes over 1.5 petabytes of data daily and Spark is central to their data infrastructure. Their recommendation system, which drives 80% of viewer activity, uses Spark for both batch feature engineering and model training. The pipeline reads viewing history from their Cassandra cluster, computes user-item interaction features using Spark SQL, trains collaborative filtering models using MLlib's ALS (Alternating Least Squares) algorithm, and writes the resulting recommendation scores to a serving layer. Netflix runs over 10,000 Spark jobs daily on their internal Genie platform (built on top of YARN). Their largest jobs process hundreds of terabytes and run on clusters with thousands of executors.
A key optimization Netflix employs is adaptive partitioning: they dynamically adjust the number of partitions based on the input data size, which varies significantly between countries (US viewing data is 100x larger than some smaller markets). They also use custom partitioners that hash on user_id to ensure all data for a single user lands on the same executor, avoiding shuffles during per-user feature computation.
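The adaptive-partitioning idea reduces to simple arithmetic: derive the partition count from the input size and a target partition size. A minimal sketch, assuming a ~128 MB target per partition (a common rule of thumb, not a Netflix-published figure):

```python
import math

def adaptive_partitions(input_bytes, target_bytes=128 * 1024 * 1024,
                        min_partitions=8, max_partitions=20000):
    """Choose a partition count so each partition holds roughly target_bytes."""
    n = math.ceil(input_bytes / target_bytes)
    return max(min_partitions, min(n, max_partitions))  # clamp to sane bounds

# A 1 TB input at ~128 MB per partition -> 8192 partitions
print(adaptive_partitions(1024 ** 4))
# A tiny 10 MB input (a small market) is clamped up to the floor of 8
print(adaptive_partitions(10 * 1024 ** 2))
```

In a real job the computed count would be passed to df.repartition(n) (or set via spark.sql.shuffle.partitions) before the heavy stage, so large and small markets each get appropriately sized tasks.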
Uber: Real-Time Pricing Engine
Uber uses Apache Spark as part of their real-time pricing and demand forecasting system. Their architecture combines Spark Structured Streaming with Apache Kafka to process ride request events in near-real-time. The pipeline ingests millions of ride requests per minute from Kafka, enriches them with geospatial features (using custom UDFs that perform H3 hexagonal grid lookups), computes demand density per geographic cell, and feeds the results into their surge pricing model.
Uber contributed the H3 geospatial indexing system to the open-source community and integrated it as a PySpark UDF library. Their Spark deployment runs on a multi-tenant YARN cluster with over 100,000 cores. They reported reducing their ETL pipeline runtime from 12 hours (with Hive/MapReduce) to 45 minutes after migrating to Spark, a 16x improvement. The migration also reduced their cluster costs by 30% because jobs completed faster and released resources sooner.
Alibaba: Singles' Day (11.11) Data Processing
Alibaba's Singles' Day (November 11) is the world's largest online shopping event, generating over $85 billion in GMV in 2023. During the 2023 event, Alibaba's data platform processed over 15 petabytes of data in real time using a custom Spark deployment on their Apsara cloud infrastructure. The peak processing rate exceeded 7 million events per second. They used Spark Structured Streaming for real-time sales dashboards (the famous giant screen showing live transaction counts), Spark SQL for merchant analytics, and MLlib for real-time fraud detection (flagging suspicious transactions within 100ms).
Alibaba's engineering team made significant contributions to Spark's adaptive query execution feature and contributed optimizations for handling extreme data skew, which is common in e-commerce data (a small number of popular products generate a disproportionate share of transactions).
Exercises
Word Count Pipeline
Build a PySpark application that reads a collection of text files, counts word frequencies, and outputs the top 100 most common words. Use both the RDD API and the DataFrame API, compare the execution plans using explain(), and measure the performance difference. Write the results to Parquet format partitioned by the first letter of each word.
Real-Time Dashboard with Structured Streaming
Create a Structured Streaming application that reads JSON events from a Kafka topic (simulated with a rate source if Kafka is unavailable), computes 5-minute windowed aggregations (event count, average value, max value per event_type), handles late data with a 10-minute watermark, and writes results to both the console and a Parquet sink. Verify exactly-once semantics by killing and restarting the application mid-stream.
End-to-End ML Pipeline with Model Serving
Build a complete MLlib pipeline for predicting customer churn: ingest data from multiple sources (CSV user profiles, JSON event logs, Parquet transaction history), perform feature engineering (TF-IDF on text fields, binning on numerical fields, one-hot encoding on categoricals), train and evaluate three models (Logistic Regression, Random Forest, Gradient Boosted Trees), select the best model using cross-validation, serialize the pipeline, and write a batch scoring job that loads the model and produces predictions for new customers daily. Profile the application using Spark UI and optimize for at least 2x speedup.
Conclusion & Resources
Apache Spark has evolved from a research project at UC Berkeley into the de facto standard for large-scale data processing. Its unified API for batch, streaming, and machine learning workloads, combined with support for Python, Scala, Java, R, and SQL, makes it accessible to both data engineers and data scientists.
- Spark's in-memory processing model is 10-100x faster than MapReduce for iterative workloads
- Use DataFrames over RDDs for structured data to leverage the Catalyst optimizer
- Pandas UDFs can be 10-100x faster than row-at-a-time Python UDFs thanks to Arrow-based serialization
- Enable Adaptive Query Execution (AQE) for automatic runtime optimization
- Avoid collect() on large datasets and minimize shuffle operations
- Structured Streaming provides unified batch-and-stream processing with exactly-once semantics
Recommended Resources
Books and References
- "Learning Spark, 2nd Edition" by Damji, Wenig, Das, and Lee (O'Reilly, 2020) - The definitive Spark book, updated for Spark 3.x
- "Spark: The Definitive Guide" by Chambers and Zaharia (O'Reilly, 2018) - Comprehensive reference by the creator of Spark
- "High Performance Spark" by Karau and Warren (O'Reilly, 2017) - Deep performance tuning guide
- Databricks Academy - Free online courses on Spark, Delta Lake, and MLflow
- spark.apache.org - Official documentation, migration guides, and API references