Introduction: The Backbone of AI Application Development
Series Overview: This is Part 4 of our 20-part AI Application Development Mastery series. We will take you from foundational understanding through prompt engineering, LangChain, RAG systems, agents, LangGraph, multi-agent architectures, production deployment, and building real-world AI applications.
1. Foundations & Evolution of AI Apps: Pre-LLM era, transformers, LLM revolution
2. LLM Fundamentals for Developers: Tokens, context windows, sampling, API patterns
3. Prompt Engineering Mastery: Zero/few-shot, CoT, ReAct, structured outputs
4. LangChain Core Concepts: Chains, prompts, LLMs, tools, LCEL (You Are Here)
5. Retrieval-Augmented Generation (RAG): Embeddings, vector DBs, retrievers, RAG pipelines
6. Memory & Context Engineering: Buffer/summary/vector memory, chunking, re-ranking
7. Agents — Core of Modern AI Apps: ReAct, tool-calling, planner-executor agents
8. LangGraph — Stateful Agent Workflows: Nodes, edges, state, graph execution, cycles
9. Deep Agents & Autonomous Systems: Multi-step reasoning, self-reflection, planning
10. Multi-Agent Systems: Supervisor, swarm, debate, role-based collaboration
11. AI Application Design Patterns: RAG, chat+memory, workflow automation, agent loops
12. Ecosystem & Frameworks: LlamaIndex, Haystack, HuggingFace, vLLM
13. MCP Foundations & Architecture: Protocol design, Host/Client/Server, primitives, security
14. MCP in Production: Building servers, integrations, scaling, agent systems
15. Evaluation & LLMOps: Prompt eval, tracing, LangSmith, experiment tracking
16. Production AI Systems: APIs, queues, caching, streaming, scaling
17. Safety, Guardrails & Reliability: Input filtering, hallucination mitigation, prompt injection
18. Advanced Topics: Fine-tuning, tool learning, hybrid LLM+symbolic
19. Building Real AI Applications: Chatbot, document QA, coding assistant, full-stack
20. Future of AI Applications: Autonomous agents, self-improving, multi-modal, AI OS
LangChain has become the de facto orchestration framework for AI application development. At its core lies the LangChain Expression Language (LCEL) — a declarative, composable way to build chains that supports streaming, parallelism, and fallbacks out of the box. In this installment, we dive deep into the primitives that make LangChain powerful: the Runnable protocol, chain composition patterns, output parsers, tool integration, and the callback system that powers observability.
Whether you are building a simple prompt-to-LLM pipeline or a complex multi-step workflow with routing, transformation, and tool calling, understanding these core concepts is essential. Everything in LangChain — from RAG pipelines to autonomous agents — is built on top of the foundations we cover here.
Key Insight: LCEL is not just syntactic sugar. It provides a unified interface (the Runnable protocol) that gives every component — prompts, models, parsers, retrievers, tools — the same set of methods: invoke, stream, batch, ainvoke, astream, and abatch. This composability is what makes LangChain so powerful for production systems.
Prerequisites
Before diving in, make sure you have your environment set up:
# Install LangChain core packages
pip install langchain langchain-core langchain-openai langchain-community langsmith pydantic
# Set your API key
export OPENAI_API_KEY="sk-..."
# Optional: LangSmith for tracing (https://smith.langchain.com)
export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_API_KEY="ls__..."
LangChain Package Ecosystem: Starting with LangChain v0.3, the framework was restructured into focused packages. The main langchain package provides core LCEL primitives and the Runnable protocol. Provider integrations live in dedicated packages like langchain-openai, langchain-anthropic, and langchain-community. Legacy components — including AgentExecutor, create_react_agent, create_tool_calling_agent, hub.pull(), legacy memory classes, and retriever utilities like ContextualCompressionRetriever — have been moved to langchain-classic. Install it when needed: pip install langchain-classic. You will see this package used in Part 6 (Re-Ranking) and Part 7 (Agents).
| Concept | What You Will Learn |
| --- | --- |
| LCEL & Pipe Operator | Compose chains declaratively using the \| operator |
| Runnable Protocol | Understand invoke, stream, batch, and async variants |
| Chain Types | Build sequential, transform, and router chains |
| Output Parsers | Extract structured data from LLM responses |
| Tool Integration | Create and bind tools using the @tool decorator |
| Callbacks & Tracing | Monitor, debug, and observe chain execution |
1. LCEL Fundamentals
The LangChain Expression Language (LCEL) is the declarative composition layer that connects every component in LangChain. Introduced to replace the older LLMChain and SequentialChain APIs, LCEL provides a unified, composable way to build chains that automatically supports streaming, batching, parallelism, and fallbacks.
1.1 The Pipe Operator
The pipe operator (|) is the fundamental composition primitive in LCEL. It connects Runnables in sequence, passing the output of one as the input to the next — just like Unix pipes:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Set your API key: export OPENAI_API_KEY="sk-..."
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
# Define components
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that explains {topic} concepts."),
("human", "{question}")
])
model = ChatOpenAI(model="gpt-4o", temperature=0.7)
parser = StrOutputParser()
# Compose with the pipe operator: prompt -> model -> parser
chain = prompt | model | parser
# Invoke the chain
result = chain.invoke({
"topic": "machine learning",
"question": "Explain gradient descent in simple terms"
})
print(result)
Under the hood, the | operator creates a RunnableSequence. Each component implements the Runnable protocol, which guarantees a consistent interface across all components.
Mental Model: Think of LCEL like a data pipeline. Each pipe stage transforms the data: a dict goes into the prompt and becomes a PromptValue, which goes into the model and becomes an AIMessage, which goes into the parser and becomes a str. The types flow naturally from one stage to the next.
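To demystify the pipe operator, here is a toy re-implementation of the idea in plain Python. This is a conceptual sketch, not LangChain's actual source: each stage overloads __or__ so that a | b returns a new composed stage.

```python
class Stage:
    """Toy version of a Runnable: wraps a function, supports | composition."""
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, value):
        return self.fn(value)

    def __or__(self, other):
        # a | b -> a new Stage that runs a, then feeds its output to b
        return Stage(lambda value: other.invoke(self.invoke(value)))

# dict -> "prompt" string -> fake "model" reply -> parsed str
prompt = Stage(lambda d: f"Explain {d['topic']}: {d['question']}")
model = Stage(lambda p: {"content": f"ANSWER({p})"})
parser = Stage(lambda msg: msg["content"])

chain = prompt | model | parser
print(chain.invoke({"topic": "AI", "question": "What is attention?"}))
# ANSWER(Explain AI: What is attention?)
```

The real RunnableSequence adds streaming, batching, async, and tracing on top, but the composition mechanics are exactly this: output of one stage becomes input of the next.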
1.2 The Runnable Protocol
Every component in LCEL implements the Runnable interface, which provides these core methods:
import os
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable
# The Runnable protocol - every LCEL component implements these:
#
# Synchronous methods:
# .invoke(input) - Process a single input
# .stream(input) - Stream output chunks
# .batch(inputs) - Process multiple inputs in parallel
#
# Asynchronous methods:
# .ainvoke(input) - Async single input
# .astream(input) - Async streaming
# .abatch(inputs) - Async batch processing
#
# Inspection methods:
# .input_schema - Pydantic model for input
# .output_schema - Pydantic model for output
# .get_graph() - Visualization of the chain
# Build a chain to demonstrate invocation methods
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that explains {topic} concepts."),
("human", "{question}")
])
model = ChatOpenAI(model="gpt-4o", temperature=0.7)
parser = StrOutputParser()
chain = prompt | model | parser
# Single invocation
print("==========Single Invocation===============")
result = chain.invoke({"topic": "AI", "question": "What is attention?"})
print(f'Single Invocation Result: {result}')
# Streaming - tokens arrive as they are generated
print("\n\n==========Streaming - tokens arrive as they are generated===============")
for chunk in chain.stream({"topic": "AI", "question": "What is attention?"}):
    print(f'-> {chunk}', end="", flush=True)
# Batch processing - runs in parallel by default
print("\n\n==========Batch processing - runs in parallel by default===============")
# Batch inputs (runs in parallel)
inputs = [
{"topic": "AI", "question": "What is attention?"},
{"topic": "AI", "question": "What is backpropagation?"},
{"topic": "AI", "question": "What is fine-tuning?"},
]
# Execute batch
results = chain.batch(inputs)
# Print results
for i, res in enumerate(results):
    print(f"\n--- Result {i+1} ---\n{res}")
# Async invocation (for use in async frameworks like FastAPI)
async def main():
    print("\n\n==========Async invocation (for use in async frameworks like FastAPI)===============")
    result = await chain.ainvoke({"topic": "AI", "question": "What is RAG?"})
    print(f'Async Invocation Result: {result}')
# Top-level await only works in notebooks; in a script, run the coroutine:
asyncio.run(main())
1.3 Runnable Primitives
LCEL provides four primitive Runnables that serve as the building blocks for composing complex chains. Each one solves a specific composition problem: passing data through unchanged, running tasks in parallel, injecting custom Python logic, and routing based on conditions. Mastering these four primitives is essential — they are the glue that connects prompts, models, and parsers into powerful workflows.
| Primitive | Purpose | Input → Output |
| --- | --- | --- |
| RunnablePassthrough | Passes input through unchanged (or adds keys) | dict → dict (same or augmented) |
| RunnableParallel | Runs multiple runnables simultaneously on the same input | any → dict (keys mapped to each runnable's output) |
| RunnableLambda | Wraps any Python function as a Runnable | any → any (whatever your function returns) |
| RunnableBranch | Routes input to different chains based on conditions | any → any (output of the selected branch) |
RunnablePassthrough — The Identity Operator
RunnablePassthrough takes whatever input it receives and passes it through unchanged. At first glance this seems useless — why would you need a component that does nothing? The answer becomes clear in multi-key workflows where you need to preserve the original input alongside transformed data. The classic use case is RAG (Retrieval-Augmented Generation), where you need to pass the user's question through as-is while simultaneously fetching context from a vector store.
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()
# --- Basic: RunnablePassthrough passes input through unchanged ---
passthrough = RunnablePassthrough()
result = passthrough.invoke("hello world")
print(result) # "hello world" — unchanged
# --- Key Use Case: Preserving input alongside transformed data ---
# In a RAG pipeline, you need BOTH the retrieved context AND the original question.
# RunnablePassthrough keeps the question intact while the retriever fetches context.
# Simulated retriever (in production, this would be a real vector store retriever)
def fake_retriever(query: str) -> str:
    return f"Retrieved context for: {query}"
prompt = ChatPromptTemplate.from_template(
"Context: {context}\n\nQuestion: {question}\n\nAnswer based on the context above."
)
# RunnableParallel creates a dict with two keys:
# - "context" is produced by the fake retriever
# - "question" is passed through unchanged via RunnablePassthrough
chain = (
RunnableParallel(
context=lambda x: fake_retriever(x["question"]),
question=RunnablePassthrough() # Keeps the full input dict
)
# At this point the data looks like:
# {"context": "Retrieved context for: ...", "question": {"question": "..."}}
# We need to flatten "question" — see assign() below for a cleaner approach
)
# --- assign(): Add keys without losing existing ones ---
# RunnablePassthrough.assign() is the preferred way to augment input dicts
chain_with_assign = (
RunnablePassthrough.assign(
context=lambda x: fake_retriever(x["question"]),
uppercased=lambda x: x["question"].upper()
)
)
# Input: {"question": "What is LCEL?"}
# Output: {"question": "What is LCEL?", "context": "Retrieved...", "uppercased": "WHAT IS LCEL?"}
result = chain_with_assign.invoke({"question": "What is LCEL?"})
print(result)
# Original "question" key is preserved, and "context" + "uppercased" are added
Key Insight: RunnablePassthrough.assign() is more useful than plain RunnablePassthrough() in practice. It lets you add new keys to the input dict while preserving all existing keys — perfect for enriching data as it flows through a chain without losing anything.
RunnableParallel — Concurrent Execution
RunnableParallel runs multiple runnables simultaneously on the same input and combines their outputs into a single dictionary. Instead of running tasks one after another (which wastes time when tasks are independent), RunnableParallel executes them concurrently and returns a structured result. This is especially powerful when you need multiple analyses on the same data — summarization, sentiment analysis, keyword extraction — all at once.
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
parser = StrOutputParser()
# --- Step 1: Define individual chains for different analyses ---
summary_chain = (
ChatPromptTemplate.from_template("Summarize this text in 2 sentences: {text}")
| model | parser
)
sentiment_chain = (
ChatPromptTemplate.from_template(
"Analyze the sentiment of this text. Reply with: Positive, Negative, or Neutral. Text: {text}")
| model | parser
)
keyword_chain = (
ChatPromptTemplate.from_template(
"Extract the 5 most important keywords from this text as a comma-separated list: {text}")
| model | parser
)
# --- Step 2: Combine with RunnableParallel ---
# Each key becomes a key in the output dict
# All three chains execute simultaneously on the same input
parallel_analysis = RunnableParallel(
summary=summary_chain,
sentiment=sentiment_chain,
keywords=keyword_chain,
)
# --- Step 3: Invoke ---
text = """LangChain is a framework for developing applications powered by large language
models. It simplifies the process of building context-aware, reasoning applications
by providing tools for prompt management, memory, retrieval, and agent orchestration."""
result = parallel_analysis.invoke({"text": text})
# Result is a dict with keys matching the RunnableParallel keys
print("Summary:", result["summary"])
print("Sentiment:", result["sentiment"])
print("Keywords:", result["keywords"])
# All three analyses ran in parallel — total time ≈ slowest chain, not sum of all three
RunnableParallel also accepts a shorthand dict syntax — you can pass a plain dictionary instead of explicitly constructing a RunnableParallel object. LCEL automatically wraps it for you:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# --- Chaining with RunnableParallel: generate then analyze ---
# Step 1: Generate a joke
joke_chain = (
ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
| model | parser
)
# Step 2: Analyze the joke in parallel for humor and clarity
humor_chain = (
ChatPromptTemplate.from_template("Rate this joke's humor from 1-10 and explain why: {joke}")
| model | parser
)
clarity_chain = (
ChatPromptTemplate.from_template("Is this joke clear and easy to understand? Explain: {joke}")
| model | parser
)
# Combine: generate joke → reshape output → analyze in parallel
full_chain = (
joke_chain
| (lambda joke: {"joke": joke}) # Reshape string output into dict for next step
| RunnableParallel(
humor=humor_chain,
clarity=clarity_chain,
)
)
result = full_chain.invoke({"topic": "Python programming"})
print("Humor Analysis:", result["humor"])
print("Clarity Analysis:", result["clarity"])
# --- Dict shorthand: LCEL auto-wraps dicts as RunnableParallel ---
# These two are equivalent:
explicit = RunnableParallel(humor=humor_chain, clarity=clarity_chain)
shorthand = {"humor": humor_chain, "clarity": clarity_chain} # Auto-wrapped by LCEL pipe
# Both produce the same result when used in a chain with |
Performance Benefit: RunnableParallel runs all branches concurrently. If you have 3 LLM calls that each take 2 seconds, sequential execution takes ~6 seconds. With RunnableParallel, it takes ~2 seconds (the slowest single call). This is especially impactful in production APIs where latency matters.
RunnableLambda — Custom Python Logic
RunnableLambda wraps any Python function as a Runnable, letting you inject arbitrary transformation logic into your chains. This is how you bridge the gap between LLM I/O and your application's business logic — data cleaning, validation, reformatting, API calls, database queries, or any custom computation that isn't an LLM call. Without RunnableLambda, you'd be limited to only LLM-compatible components in your chains.
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# --- Basic: Wrap a function as a Runnable ---
def word_count(text: str) -> dict:
    """Count words and characters in LLM output."""
    words = text.split()
    return {
        "text": text,
        "word_count": len(words),
        "char_count": len(text),
        "avg_word_length": round(sum(len(w) for w in words) / len(words), 1) if words else 0,
    }
# word_count is now a step in the chain
chain = (
ChatPromptTemplate.from_template("Explain {topic} in exactly 3 sentences.")
| model
| parser
| RunnableLambda(word_count) # Transform LLM string output into structured dict
)
result = chain.invoke({"topic": "gradient descent"})
print(f"Words: {result['word_count']}, Chars: {result['char_count']}")
print(f"Text: {result['text'][:100]}...")
# --- Data preprocessing: clean input before sending to LLM ---
def preprocess_query(input_dict: dict) -> dict:
    """Clean and normalize user input before LLM processing."""
    query = input_dict["query"]
    return {
        "query": query.strip().lower(),
        "original_query": query,
        "query_length": len(query.split()),
    }
preprocess_chain = (
RunnableLambda(preprocess_query)
| ChatPromptTemplate.from_template("Answer this question concisely: {query}")
| model
| parser
)
result = preprocess_chain.invoke({"query": " WHAT Is Machine Learning? "})
print(result)
# --- Post-processing: transform LLM output for your application ---
def format_for_api(llm_output: str) -> dict:
    """Structure LLM output as an API response."""
    return {
        "status": "success",
        "answer": llm_output,
        "model": "gpt-4o-mini",
        "truncated": len(llm_output) > 500,
    }
api_chain = (
ChatPromptTemplate.from_template("Explain {concept} simply.")
| model
| parser
| RunnableLambda(format_for_api)
)
result = api_chain.invoke({"concept": "neural networks"})
print(result["status"]) # "success"
print(result["answer"]) # The LLM's explanation
RunnableLambda also supports async functions and the decorator syntax for cleaner code:
import os
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, chain as chain_decorator
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# --- Async support: wrap async functions for use in async chains ---
async def async_enrich(text: str) -> dict:
    """Simulate async enrichment (e.g., API call, database lookup)."""
    # In production: await httpx.get(...) or await db.query(...)
    await asyncio.sleep(0.1)  # Simulate async I/O
    return {
        "text": text,
        "enriched": True,
        "source": "knowledge_base",
    }
async_chain = (
ChatPromptTemplate.from_template("Define {term} in one sentence.")
| model
| parser
| RunnableLambda(async_enrich) # Works with async functions automatically
)
# Use with await in async context
# result = await async_chain.ainvoke({"term": "transformer architecture"})
# --- @chain decorator: define a full Runnable as a function ---
# Instead of building with | pipes, you can use the @chain decorator
# to write the entire chain logic as a single function
@chain_decorator
def analysis_chain(input_dict: dict) -> str:
    """A complete chain defined as a decorated function."""
    topic = input_dict["topic"]
    # You can use any Python logic inside
    if len(topic.split()) > 10:
        prompt_text = f"Summarize and then explain: {topic}"
    else:
        prompt_text = f"Explain in detail: {topic}"
    # Call other chains/runnables inside
    result = (
        ChatPromptTemplate.from_template(prompt_text)
        | model
        | parser
    ).invoke({})  # Empty dict since topic is already in the template
    return f"[Analysis] {result}"
# Use like any other Runnable
result = analysis_chain.invoke({"topic": "attention mechanisms"})
print(result)
When to Use RunnableLambda: Any time you need custom Python logic inside a chain — data cleaning, validation, formatting, API calls, logging, conditional logic, or reshaping data between chain steps. If you find yourself wanting to write (lambda x: ...) in a pipe, consider using RunnableLambda for readability and debuggability (lambdas don't have names in tracing).
RunnableBranch — Conditional Routing
RunnableBranch routes input to different chains based on conditions — it's the if/elif/else of LCEL. This enables dynamic workflows where the path through your chain depends on the input content. For example, routing coding questions to a code-specialized prompt, math questions to a math-focused prompt, and everything else to a general-purpose prompt. Each branch is a tuple of (condition_function, runnable), with a fallback runnable as the final argument.
import os
import re
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# --- Define specialized chains for different question types ---
code_chain = (
ChatPromptTemplate.from_template(
"You are a senior software engineer. Answer this coding question with code examples "
"and best practices:\n\n{question}")
| model | parser
)
math_chain = (
ChatPromptTemplate.from_template(
"You are a mathematics professor. Solve this problem step-by-step, "
"showing all work:\n\n{question}")
| model | parser
)
creative_chain = (
ChatPromptTemplate.from_template(
"You are a creative writing expert. Help with this creative request "
"with vivid, engaging language:\n\n{question}")
| model | parser
)
general_chain = (
ChatPromptTemplate.from_template(
"Answer this question clearly and concisely:\n\n{question}")
| model | parser
)
# --- Helper: match whole words only (avoids substring false positives) ---
# IMPORTANT: Plain "kw in text" does substring matching, which causes bugs.
# For example, "api" matches inside "capital", routing "What is the capital
# of France?" to the code chain! Use \b word boundaries to match whole words.
def has_keyword(text: str, keywords: list[str]) -> bool:
    """Check if any keyword appears as a whole word in text."""
    pattern = r'\b(' + '|'.join(re.escape(kw) for kw in keywords) + r')\b'
    return bool(re.search(pattern, text, re.IGNORECASE))
# --- RunnableBranch: route to the right chain based on input ---
# Each tuple is (condition_function, chain_to_run)
# The last argument (no tuple) is the default fallback
branch = RunnableBranch(
# Condition 1: coding questions
(lambda x: has_keyword(x["question"],
["code", "function", "python", "javascript", "bug", "api", "programming"]),
code_chain),
# Condition 2: math questions
(lambda x: has_keyword(x["question"],
["calculate", "solve", "equation", "integral", "derivative"]),
math_chain),
# Condition 3: creative requests
(lambda x: has_keyword(x["question"],
["write", "story", "poem", "creative", "imagine"]),
creative_chain),
# Default fallback: general questions
general_chain,
)
# Test with different questions
questions = [
"Write a Python function for binary search with type hints",
"Solve the integral of x^2 * sin(x) dx",
"Write a short poem about machine learning",
"What is the capital of France?",
]
for q in questions:
    result = branch.invoke({"question": q})
    print(f"Q: {q[:50]}...")
    print(f"A: {result[:100]}...\n")
Substring Trap: Never use kw in text for keyword matching — it does substring matching, not whole-word matching. For example, "api" in "capital" is True because "capital" contains "api". Similarly, "code" in "barcode" or "function" in "malfunction" would cause false positives. Always use re.search(r'\bkeyword\b', text) with word boundaries for reliable routing.
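The trap takes only a few lines of plain Python to demonstrate:

```python
import re

question = "What is the capital of France?"

# Substring check: "api" hides inside "capital" -> false positive
print("api" in question.lower())  # True (wrong!)

# Whole-word check with \b word boundaries -> correct
print(bool(re.search(r"\bapi\b", question, re.IGNORECASE)))  # False

# Word boundaries still match real occurrences
print(bool(re.search(r"\bapi\b", "How do I call the API?", re.IGNORECASE)))  # True
```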
For more sophisticated routing, you can use an LLM-based classifier to determine the route instead of keyword matching. This is more robust because the LLM understands intent, not just keywords:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# --- LLM-based classification for smarter routing ---
classify_prompt = ChatPromptTemplate.from_template(
"Classify this question into exactly one category: "
"TECHNICAL, MATHEMATICAL, CREATIVE, or GENERAL.\n\n"
"Question: {question}\n\n"
"Reply with only the category name, nothing else."
)
# Specialized chains (same as before)
technical_chain = (
ChatPromptTemplate.from_template(
"As a senior engineer, answer technically:\n\n{question}")
| model | parser
)
math_chain = (
ChatPromptTemplate.from_template(
"As a math professor, solve step-by-step:\n\n{question}")
| model | parser
)
creative_chain = (
ChatPromptTemplate.from_template(
"As a creative writer, respond with flair:\n\n{question}")
| model | parser
)
general_chain = (
ChatPromptTemplate.from_template("Answer concisely:\n\n{question}")
| model | parser
)
# --- Smart router: classify first, then route ---
def classify_and_route(input_dict: dict) -> str:
    """Use LLM to classify, then route to the right chain."""
    # Step 1: Classify the question
    classification = (classify_prompt | model | parser).invoke(input_dict)
    category = classification.strip().upper()
    print(f" [Router] Classified as: {category}")
    # Step 2: Route to the appropriate chain
    if "TECHNICAL" in category:
        return technical_chain.invoke(input_dict)
    elif "MATHEMATICAL" in category:
        return math_chain.invoke(input_dict)
    elif "CREATIVE" in category:
        return creative_chain.invoke(input_dict)
    else:
        return general_chain.invoke(input_dict)
# Wrap as a Runnable so it works in LCEL chains
smart_router = RunnableLambda(classify_and_route)
# Test: the LLM decides the route, not keyword matching
result = smart_router.invoke({"question": "How do transformers handle long sequences?"})
# [Router] Classified as: TECHNICAL
print(result[:200])
RunnableBranch vs RunnableLambda for Routing: RunnableBranch is declarative and works well for simple keyword-based conditions. For complex routing (LLM classification, multi-step logic, external API lookups), use RunnableLambda with a custom routing function instead — it gives you full Python control and is easier to debug.
Combining Primitives — Real-World Pattern
In practice, you'll combine all four primitives in a single chain. Here's a realistic example that preprocesses input, fetches context in parallel, routes to a specialized handler, and post-processes the output:
import os
import re
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableParallel,
    RunnableLambda,
    RunnableBranch,
)
os.environ.setdefault("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# --- Step 1: RunnableLambda for preprocessing ---
def preprocess(input_dict: dict) -> dict:
    """Clean and normalize the query."""
    query = input_dict["query"].strip()
    # Whole-word matching avoids the substring trap covered earlier
    # (e.g., "api" matching inside "capital")
    is_technical = bool(re.search(
        r"\b(code|api|function|error)\b", query, re.IGNORECASE
    ))
    return {
        "query": query,
        "query_type": "technical" if is_technical else "general",
    }
# --- Step 2: RunnablePassthrough.assign() to add context ---
def fetch_docs(input_dict: dict) -> str:
    """Simulate document retrieval."""
    return f"Relevant documentation for: {input_dict['query']}"

def fetch_examples(input_dict: dict) -> str:
    """Simulate example retrieval."""
    return f"Code examples related to: {input_dict['query']}"
# --- Step 3: RunnableBranch for routing ---
technical_prompt = ChatPromptTemplate.from_template(
"Context: {docs}\nExamples: {examples}\n\n"
"As a senior engineer, answer: {query}"
)
general_prompt = ChatPromptTemplate.from_template(
"Context: {docs}\n\nAnswer clearly: {query}"
)
# --- Step 4: RunnableLambda for post-processing ---
def format_response(text: str) -> dict:
    return {"answer": text, "status": "success", "char_count": len(text)}
# --- Compose the full pipeline ---
full_chain = (
# Preprocess input
RunnableLambda(preprocess)
# Add context in parallel (both run simultaneously)
| RunnablePassthrough.assign(
docs=RunnableLambda(fetch_docs),
examples=RunnableLambda(fetch_examples),
)
# Route based on query type
| RunnableBranch(
(lambda x: x["query_type"] == "technical", technical_prompt | model | parser),
general_prompt | model | parser, # Default
)
# Post-process
| RunnableLambda(format_response)
)
result = full_chain.invoke({"query": "How do I handle API rate limits in Python?"})
print(f"Status: {result['status']}")
print(f"Answer ({result['char_count']} chars): {result['answer'][:200]}...")
Pro Tip: Use RunnableParallel to fetch data from multiple sources simultaneously. For example, in a RAG pipeline, you can retrieve documents from multiple vector stores in parallel and then merge the results before passing to the LLM. The dict shorthand {"key1": chain1, "key2": chain2} is automatically wrapped as RunnableParallel by LCEL, so you can use either syntax interchangeably.
2. Chain Types & Composition
While LCEL is the modern way to compose chains, understanding the different chain patterns helps you architect complex AI workflows. Every pattern can be expressed in LCEL, but knowing the conceptual categories clarifies when to use each approach.
2.1 Sequential Chains
Sequential chains process data through a series of steps, where each step's output feeds into the next. This is the most common pattern in AI applications:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
model = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()
# Step 1: Generate a detailed outline
outline_prompt = ChatPromptTemplate.from_template(
"Create a detailed outline for a blog post about: {topic}\n"
"Include 5 main sections with 3 sub-points each."
)
# Step 2: Write the full article from the outline
article_prompt = ChatPromptTemplate.from_template(
"Write a comprehensive blog post based on this outline:\n\n"
"{outline}\n\n"
"Make it engaging, informative, and approximately 1000 words."
)
# Step 3: Generate a summary and SEO metadata
meta_prompt = ChatPromptTemplate.from_template(
"Given this blog post:\n\n{article}\n\n"
"Generate:\n"
"1. A compelling title (max 60 chars)\n"
"2. Meta description (max 160 chars)\n"
"3. 5 SEO keywords\n"
"4. A 2-sentence summary"
)
# Compose the sequential chain with LCEL
# Each RunnableLambda reshapes the output for the next prompt
sequential_chain = (
    outline_prompt | model | parser
    | RunnableLambda(lambda outline: {"outline": outline})
    | article_prompt | model | parser
    | RunnableLambda(lambda article: {"article": article})
    | meta_prompt | model | parser
)
result = sequential_chain.invoke({"topic": "Building RAG applications with LangChain"})
print(result)
2.2 Transform Chains
Transform chains apply data transformations between LLM calls. They are essential for preprocessing inputs, post-processing outputs, and adapting data formats between chain stages:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnableParallel
model = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()
# Transform function: clean and structure raw text
def preprocess_document(raw_text: str) -> dict:
    """Clean and prepare document for processing."""
    # Remove excessive whitespace
    cleaned = " ".join(raw_text.split())
    # Truncate if too long (respect context window)
    if len(cleaned) > 10000:
        cleaned = cleaned[:10000] + "... [truncated]"
    return {
        "document": cleaned,
        "char_count": len(cleaned),
        "word_count": len(cleaned.split()),
    }

# Transform function: extract structured data from LLM output
def parse_analysis(llm_output: str) -> dict:
    """Parse the LLM's analysis into structured format."""
    sections = llm_output.split("\n\n")
    return {
        "summary": sections[0] if sections else "",
        "key_points": sections[1] if len(sections) > 1 else "",
        "recommendations": sections[2] if len(sections) > 2 else "",
        "raw_output": llm_output,
    }
# Build the transform chain
analysis_prompt = ChatPromptTemplate.from_template(
"Analyze this document ({word_count} words):\n\n{document}\n\n"
"Provide:\n1. Executive summary\n\n2. Key points\n\n3. Recommendations"
)
transform_chain = (
    RunnableLambda(preprocess_document)
    | analysis_prompt
    | model
    | parser
    | RunnableLambda(parse_analysis)
)
result = transform_chain.invoke(
    "Retrieval-Augmented Generation (RAG) is an AI framework that improves "
    "Large Language Model (LLM) accuracy by retrieving data from external, "
    "trusted sources before generating a response. It reduces hallucinations "
    "and provides up-to-date information by grounding AI in specific documents, "
    "databases, or live web searches, rather than relying solely on static "
    "training data."
)
print(result)
2.3 Router Chains
Router chains dynamically direct input to different processing paths based on the content. This is critical for building systems that handle diverse query types:
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()
# Classification prompt - determines which route to take
classify_prompt = ChatPromptTemplate.from_template(
"Classify this query into exactly one category: "
"technical, creative, analytical, or general.\n\n"
"Query: {query}\n\nCategory:"
)
# Specialized chains for each category
technical_prompt = ChatPromptTemplate.from_template(
"You are an expert software engineer. Provide a detailed "
"technical answer with code examples.\n\nQuery: {query}"
)
creative_prompt = ChatPromptTemplate.from_template(
"You are a creative writing assistant. Provide an imaginative, "
"engaging response.\n\nQuery: {query}"
)
analytical_prompt = ChatPromptTemplate.from_template(
"You are a data analyst. Provide a structured analysis with "
"data-driven insights and clear reasoning.\n\nQuery: {query}"
)
general_prompt = ChatPromptTemplate.from_template(
"You are a helpful assistant. Provide a clear, concise answer."
"\n\nQuery: {query}"
)
# Build specialized chains
technical_chain = technical_prompt | model | parser
creative_chain = creative_prompt | model | parser
analytical_chain = analytical_prompt | model | parser
general_chain = general_prompt | model | parser
# Router implemented as a RunnableLambda that classifies, then dispatches
def classify_and_route(input_dict):
    """Classify the query and route to the appropriate chain."""
    classification = (classify_prompt | model | parser).invoke(input_dict)
    category = classification.strip().lower()
    route_map = {
        "technical": technical_chain,
        "creative": creative_chain,
        "analytical": analytical_chain,
    }
    selected_chain = route_map.get(category, general_chain)
    return selected_chain.invoke(input_dict)
router_chain = RunnableLambda(classify_and_route)
# Usage
result = router_chain.invoke({
"query": "Write a Python function for binary search with type hints"
})
print(result)
Architecture Pattern: Router chains are the foundation of intelligent assistants that can handle diverse tasks. By classifying intent first and then routing to specialized sub-chains, you get better results than a single generic prompt. This pattern scales well — you can add new routes without modifying existing ones.
3. Output Parsers
LLMs generate text, but applications need structured data. Output parsers bridge this gap by transforming raw LLM text into Python objects, dictionaries, lists, or Pydantic models. They are essential for building reliable, type-safe AI applications.
3.1 Structured Output Parsing
LangChain's core parsers work by injecting format instructions into the prompt and parsing the response into Python types: StrOutputParser extracts plain text, JsonOutputParser returns dictionaries, and CommaSeparatedListOutputParser returns Python lists. Each parser handles the formatting instructions and parsing logic so your chain code stays clean.
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import (
StrOutputParser,
JsonOutputParser,
CommaSeparatedListOutputParser,
)
from langchain_core.prompts import ChatPromptTemplate
model = ChatOpenAI(model="gpt-4o", temperature=0)
# String parser (simplest - extracts raw text)
str_parser = StrOutputParser()
# Comma-separated list parser
list_parser = CommaSeparatedListOutputParser()
list_prompt = ChatPromptTemplate.from_template(
"List 5 key benefits of {technology}.\n"
"{format_instructions}"
)
list_chain = list_prompt.partial(
format_instructions=list_parser.get_format_instructions()
) | model | list_parser
benefits = list_chain.invoke({"technology": "vector databases"})
# Returns: ["Fast similarity search", "Scalable", ...]
# JSON parser
json_parser = JsonOutputParser()
json_prompt = ChatPromptTemplate.from_template(
"Analyze this technology and return a JSON object with keys: "
"name, category, maturity (1-10), pros (list), cons (list).\n\n"
"Technology: {technology}\n\n{format_instructions}"
)
json_chain = json_prompt.partial(
format_instructions=json_parser.get_format_instructions()
) | model | json_parser
analysis = json_chain.invoke({"technology": "LangChain"})
# Returns: {"name": "LangChain", "category": "...", ...}
3.2 Pydantic Output Parser
The Pydantic parser is the most powerful option — it provides full type validation, default values, and automatic format instructions:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing import List, Optional
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Define your output schema with Pydantic
class ChainAnalysis(BaseModel):
    """Structured analysis of an AI processing chain."""
    chain_name: str = Field(description="Name of the chain")
    chain_type: str = Field(description="Type: sequential, router, or transform")
    steps: List[str] = Field(description="Ordered list of processing steps")
    estimated_latency_ms: int = Field(description="Estimated end-to-end latency in ms")
    cost_per_call_usd: float = Field(description="Estimated cost per invocation in USD")
    strengths: List[str] = Field(description="Key strengths of this chain design")
    weaknesses: List[str] = Field(description="Potential weaknesses or risks")
    recommendation: str = Field(description="Overall recommendation for production use")
# Create the parser
pydantic_parser = PydanticOutputParser(pydantic_object=ChainAnalysis)
# Build the chain with format instructions injected
analysis_prompt = ChatPromptTemplate.from_template(
"You are an AI systems architect. Analyze this chain design "
"and provide a structured assessment.\n\n"
"Chain Description: {chain_description}\n\n"
"{format_instructions}"
)
analysis_chain = (
analysis_prompt.partial(
format_instructions=pydantic_parser.get_format_instructions()
)
| model
| pydantic_parser
)
# Usage - returns a validated Pydantic object
result = analysis_chain.invoke({
"chain_description": "A RAG chain that retrieves from Pinecone, "
"reranks with Cohere, and generates with GPT-4o"
})
# Access typed fields directly
print(f"Chain: {result.chain_name}")
print(f"Steps: {result.steps}")
print(f"Cost: ${result.cost_per_call_usd:.4f}")
3.3 Auto-Fixing & Retry Parsers
LLMs sometimes produce malformed output. Auto-fixing parsers handle this gracefully by asking the LLM to correct its own mistakes:
from langchain_openai import ChatOpenAI
from langchain.output_parsers import OutputFixingParser, RetryOutputParser
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Assumes pydantic_parser and ChainAnalysis from previous block
# Wrap any parser with auto-fix capability
fixing_parser = OutputFixingParser.from_llm(
parser=pydantic_parser,
llm=model,
)
# If the initial parse fails, it sends the error back to the LLM
# with instructions to fix the output format
# Retry parser - more powerful, includes the original prompt
retry_parser = RetryOutputParser.from_llm(
parser=pydantic_parser,
llm=model,
max_retries=3,
)
# Using with_structured_output (recommended for modern LangChain)
# This uses the model's native structured output capabilities
structured_model = model.with_structured_output(ChainAnalysis)
# Now the model directly returns a Pydantic object
result = structured_model.invoke(
"Analyze a RAG chain: retrieve from Chroma, rerank, generate with GPT-4o"
)
print(type(result)) # <class 'ChainAnalysis'>
Best Practice: For production systems, prefer model.with_structured_output() over output parsers when your model supports it (OpenAI, Anthropic, etc.). It uses the model's native function calling / tool use capabilities, which is more reliable than parsing free-form text. Fall back to Pydantic parsers for models that lack structured output support.
4. Tool Integration
Tools extend LLMs beyond text generation, giving them the ability to search the web, query databases, execute code, call APIs, and interact with the real world. LangChain's tool system provides a standardized way to define, bind, and invoke tools.
The @tool decorator is the simplest way to create a LangChain tool. Any Python function decorated with @tool becomes callable by an LLM agent. The function's docstring becomes the tool description the model reads when deciding which tool to use, and the type hints on parameters tell the model what arguments to provide. Well-written docstrings are critical — they are the model's only way to understand what the tool does.
from langchain_core.tools import tool
from typing import Optional
@tool
def search_documentation(query: str) -> str:
    """Search the project documentation for relevant information.

    Args:
        query: The search query to find relevant documentation sections.
    """
    # In production, this would query a real search index
    # For demo, we simulate a documentation search
    docs = {
        "installation": "Run pip install langchain langchain-openai",
        "quickstart": "Import ChatOpenAI, create a prompt, pipe to model",
        "streaming": "Use chain.stream() for token-by-token output",
        "batch": "Use chain.batch([inputs]) for parallel processing",
    }
    results = [v for k, v in docs.items() if query.lower() in k.lower()]
    return "\n".join(results) if results else "No documentation found."
@tool
def calculate_token_cost(
    input_tokens: int,
    output_tokens: int = 0,
    model: str = "gpt-4o",
) -> str:
    """Calculate the cost of an LLM API call based on token usage.

    Args:
        input_tokens: Number of input/prompt tokens.
        output_tokens: Number of output/completion tokens. Defaults to 0.
        model: The model name for pricing lookup.
    """
    pricing = {
        "gpt-4o": {"input": 2.50, "output": 10.00},  # USD per 1M tokens
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
        "claude-3-haiku": {"input": 0.25, "output": 1.25},
    }
    if model not in pricing:
        return f"Unknown model: {model}. Available: {list(pricing.keys())}"
    rates = pricing[model]
    input_cost = (input_tokens / 1_000_000) * rates["input"]
    output_cost = (output_tokens / 1_000_000) * rates["output"]
    total = input_cost + output_cost
    return (
        f"Model: {model}\n"
        f"Input: {input_tokens:,} tokens = ${input_cost:.6f}\n"
        f"Output: {output_tokens:,} tokens = ${output_cost:.6f}\n"
        f"Total: ${total:.6f}"
    )
@tool
def execute_python_code(code: str) -> str:
    """Execute a Python code snippet and return its printed output.

    Args:
        code: Python code to execute. Must be a single expression or
            a script that prints its output.
    """
    import io
    import contextlib
    # WARNING: exec() with full builtins is NOT a sandbox. Only use this
    # with trusted input; run untrusted code in an isolated environment
    # (container, or a subprocess with resource limits).
    # Capture stdout
    output = io.StringIO()
    try:
        with contextlib.redirect_stdout(output):
            exec(code, {"__builtins__": __builtins__})
        result = output.getvalue()
        return result if result else "Code executed successfully (no output)"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
Once tools are defined, you attach them to a chat model using model.bind_tools(tools). This registers the tool schemas with the model's function-calling interface. When you invoke the bound model, it can autonomously decide which tool to call and what arguments to pass, returning structured tool_calls in the response that your application dispatches to the correct function.
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
# Create model with tools bound
model = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_documentation, calculate_token_cost, execute_python_code]
# Bind tools to the model
model_with_tools = model.bind_tools(tools)
# The model can now decide when to call tools
response = model_with_tools.invoke([
HumanMessage(content="How much would it cost to process "
"1 million input tokens with GPT-4o?")
])
# Check if the model wants to call a tool
if response.tool_calls:
    tool_map = {t.name: t for t in tools}
    for tool_call in response.tool_calls:
        print(f"Tool: {tool_call['name']}")
        print(f"Args: {tool_call['args']}")
        # Execute the tool
        tool_result = tool_map[tool_call['name']].invoke(tool_call['args'])
        print(f"Result: {tool_result}")
else:
    print(response.content)
For tools that need input validation, safety checks, or complex error handling, subclass BaseTool directly. This gives you a Pydantic args_schema for strict type enforcement, a _run method for the implementation, and ToolException for structured error reporting. The example below builds a database query tool that rejects anything other than read-only SELECT statements, a pattern you will use frequently in production agents that interact with sensitive systems. Note that a prefix check alone does not prevent SQL injection; production tools should also use parameterized queries and a restricted, read-only database role.
from langchain_core.tools import BaseTool, ToolException
from pydantic import BaseModel, Field
from typing import Type, Optional
# For more complex tools, extend BaseTool
class DatabaseQueryInput(BaseModel):
    """Input schema for the database query tool."""
    query: str = Field(description="SQL query to execute")
    database: str = Field(default="production", description="Target database")
    limit: int = Field(default=100, description="Max rows to return")

class DatabaseQueryTool(BaseTool):
    """Tool for querying databases with safety checks."""
    name: str = "database_query"
    description: str = (
        "Execute a read-only SQL query against the specified database. "
        "Only SELECT queries are allowed. Use this when you need to "
        "look up data, check records, or analyze database contents."
    )
    args_schema: Type[BaseModel] = DatabaseQueryInput

    def _run(
        self,
        query: str,
        database: str = "production",
        limit: int = 100,
    ) -> str:
        """Execute the database query."""
        # Safety check: only allow SELECT queries
        if not query.strip().upper().startswith("SELECT"):
            raise ToolException(
                "Only SELECT queries are allowed. "
                "Mutations must go through the admin interface."
            )
        # In production, this would use a real database connection
        return f"Executed on {database}: {query} (limit: {limit})"

    async def _arun(self, query: str, **kwargs) -> str:
        """Async version of the tool."""
        return self._run(query, **kwargs)
# Use the custom tool
db_tool = DatabaseQueryTool()
result = db_tool.invoke({
"query": "SELECT COUNT(*) FROM users WHERE active = true",
"database": "analytics"
})
Key Design Decision: The @tool decorator is ideal for quick, simple tools. For production tools that need input validation, error handling, async support, or complex configuration, extend BaseTool instead. Both approaches produce tools that are fully compatible with LCEL and the agent framework.
5. Callbacks & Tracing
Observability is critical for production AI applications. LangChain's callback system provides hooks into every stage of chain execution — from LLM calls and tool invocations to retriever queries and chain completions. This powers logging, monitoring, cost tracking, and debugging.
5.1 Callback Handlers
By subclassing BaseCallbackHandler and implementing methods like on_llm_start, on_llm_end, and on_tool_start, you can track performance metrics (latency, token usage, cost), log execution traces, and detect anomalies in real time, all without modifying your chain code. This is the foundation of production observability for LLM applications.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any, Dict, List
import time
import json
class PerformanceCallbackHandler(BaseCallbackHandler):
    """Track performance metrics for every chain execution."""

    def __init__(self):
        self.metrics = {
            "llm_calls": 0,
            "total_tokens": 0,
            "total_cost_usd": 0.0,
            "tool_calls": 0,
            "errors": 0,
            "timings": [],
        }
        self._start_times = {}

    def on_llm_start(self, serialized: Dict, prompts: List[str], **kwargs):
        run_id = kwargs.get("run_id", "unknown")
        self._start_times[str(run_id)] = time.time()
        self.metrics["llm_calls"] += 1
        print(f"[LLM] Starting call #{self.metrics['llm_calls']}")

    def on_llm_end(self, response: LLMResult, **kwargs):
        run_id = kwargs.get("run_id", "unknown")
        elapsed = time.time() - self._start_times.pop(str(run_id), time.time())
        self.metrics["timings"].append(elapsed)
        # Track token usage if available
        if response.llm_output and "token_usage" in response.llm_output:
            usage = response.llm_output["token_usage"]
            self.metrics["total_tokens"] += usage.get("total_tokens", 0)
        print(f"[LLM] Completed in {elapsed:.2f}s")

    def on_tool_start(self, serialized: Dict, input_str: str, **kwargs):
        tool_name = serialized.get("name", "unknown")
        self.metrics["tool_calls"] += 1
        print(f"[Tool] Calling: {tool_name}")

    def on_tool_end(self, output: str, **kwargs):
        print(f"[Tool] Result received ({len(output)} chars)")

    def on_llm_error(self, error: Exception, **kwargs):
        self.metrics["errors"] += 1
        print(f"[ERROR] LLM error: {error}")

    def get_summary(self) -> dict:
        avg_time = (
            sum(self.metrics["timings"]) / len(self.metrics["timings"])
            if self.metrics["timings"] else 0
        )
        return {
            **self.metrics,
            "avg_latency_s": round(avg_time, 3),
            "total_latency_s": round(sum(self.metrics["timings"]), 3),
        }
# Usage: build a chain and attach the callback handler
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that explains {topic} concepts."),
("human", "{question}")
])
model = ChatOpenAI(model="gpt-4o", temperature=0.7)
parser = StrOutputParser()
chain = prompt | model | parser
perf_handler = PerformanceCallbackHandler()
result = chain.invoke(
{"topic": "AI", "question": "What is LangChain?"},
config={"callbacks": [perf_handler]}
)
print(json.dumps(perf_handler.get_summary(), indent=2))
5.2 LangSmith Tracing
LangSmith is LangChain's hosted observability platform that automatically traces every LLM call, chain step, and tool invocation in your application. By setting environment variables (LANGCHAIN_TRACING_V2=true and LANGCHAIN_API_KEY), all chain executions are logged to the LangSmith dashboard with detailed timing, token counts, and cost breakdowns — no code changes required.
# pip install langsmith
# LangSmith provides production-grade tracing and evaluation
# Enable by setting environment variables:
# export LANGCHAIN_TRACING_V2="true"
# export LANGCHAIN_API_KEY="ls__..."
# export LANGCHAIN_PROJECT="my-project-name"
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "langchain-core-tutorial"
# Set your LangSmith API key: export LANGCHAIN_API_KEY="ls__..."
# Build a chain to trace
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that explains {topic} concepts."),
("human", "{question}")
])
model = ChatOpenAI(model="gpt-4o", temperature=0.7)
parser = StrOutputParser()
chain = prompt | model | parser
# Every chain.invoke() is now automatically traced in LangSmith
# You get:
# - Full execution trace with inputs/outputs at every step
# - Token usage and cost tracking
# - Latency breakdown per component
# - Error tracking and debugging
# - Dataset creation for evaluation
# Add custom metadata to traces
result = chain.invoke(
{"topic": "AI", "question": "Explain LCEL"},
config={
"metadata": {
"user_id": "user-123",
"session_id": "session-456",
"environment": "production",
},
"tags": ["tutorial", "lcel-demo"],
}
)
# You can also use the @traceable decorator for custom functions
from langsmith import traceable
@traceable(name="my_custom_pipeline")
def process_query(query: str) -> str:
    """Custom pipeline that is fully traced in LangSmith."""
    classify_prompt = ChatPromptTemplate.from_template(
        "Classify this query into one category: technical, creative, "
        "analytical, or general.\n\nQuery: {query}\n\nCategory:"
    )
    # Step 1: Classify the query
    category = (classify_prompt | model | parser).invoke({"query": query})
    # Step 2: Process with the classified category
    result = chain.invoke({"topic": category.strip(), "question": query})
    return result
Production Warning: Always enable tracing in production. Without it, debugging failures in multi-step chains is nearly impossible. LangSmith is free for up to 5,000 traces per month. For self-hosted alternatives, consider Langfuse (open source) or Phoenix by Arize.
6. LangChain vs LlamaIndex vs Haystack
While LangChain is the most popular orchestration framework, it is not the only option. Choosing the right framework depends on your primary use case. Here is a comprehensive comparison of the three major frameworks for AI data processing pipelines:
6.1 Comprehensive Comparison Table
| Dimension | LangChain | LlamaIndex | Haystack |
| --- | --- | --- | --- |
| Primary Focus | General-purpose LLM orchestration | Data indexing & retrieval (RAG-first) | Production NLP/LLM pipelines |
| Composition Model | LCEL pipe operator, Runnables | QueryEngine, Index, Retriever abstractions | Pipeline DAG with components |
| Chain/Pipeline | LCEL chains (prompt \| model \| parser) | QueryPipeline with modules | Pipeline with nodes and connections |
| Data Ingestion | Document loaders (100+ sources) | Data connectors (LlamaHub, 150+ sources) | FileConverters, preprocessors |
| Indexing/Chunking | TextSplitters (recursive, token, semantic) | Node parsers, hierarchical indexing, auto-merging | PreProcessors, DocumentSplitter |
| Vector Store Support | 60+ integrations | 40+ integrations | 20+ integrations (focus on quality) |
| Agent Framework | ReAct, OpenAI tools, custom agents, LangGraph | ReAct agent, tool-calling agent | Agent component (newer addition) |
| Advanced RAG | Requires manual composition | Built-in: HyDE, sentence-window, auto-merging, recursive | Pipeline-based composition |
| Streaming | First-class LCEL support | Supported via callback handlers | Supported in pipeline components |
| Evaluation | LangSmith (SaaS), third-party | Built-in eval modules (faithfulness, relevancy) | Built-in evaluation pipelines |
| Tracing/Observability | LangSmith (best-in-class) | Arize Phoenix, callbacks | Pipeline logs, Haystack Tracing |
| Learning Curve | Moderate (LCEL concepts, many abstractions) | Lower for RAG (higher-level abstractions) | Lower (clean component model) |
| Community Size | Largest (85k+ GitHub stars) | Large (35k+ GitHub stars) | Moderate (17k+ GitHub stars) |
| Production Readiness | High (LangServe, LangGraph Cloud) | High (LlamaCloud for managed RAG) | High (deepset Cloud, Hayhooks) |
| License | MIT | MIT | Apache 2.0 |
6.2 When to Use Which Framework
Choose LangChain when: You need general-purpose LLM orchestration, complex agent workflows, or maximum flexibility. Best for applications that combine RAG + agents + tools + memory in novel ways. Ideal if you plan to use LangGraph for stateful multi-step agents.
Choose LlamaIndex when: Your primary use case is RAG and you want the fastest path to production-quality retrieval. LlamaIndex has the best built-in advanced RAG techniques (auto-merging, sentence window, recursive retrieval) and the most data connectors via LlamaHub.
Choose Haystack when: You want a clean, production-focused pipeline framework with strong typing and explicit data flow. Haystack excels at traditional NLP + LLM hybrid pipelines and has excellent documentation. Best for teams that value explicit over implicit behavior.
LangChain RAG Pipeline
# pip install langchain langchain-openai langchain-chroma chromadb
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document
embeddings = OpenAIEmbeddings()
# Create vectorstore with sample documents so the retriever has content
sample_docs = [
Document(page_content="LCEL (LangChain Expression Language) is a declarative way to compose "
"chains using the pipe operator (|). It connects prompts, models, and parsers."),
Document(page_content="LangChain is a framework for building LLM-powered applications with "
"tools for prompt management, memory, retrieval, and agent orchestration."),
Document(page_content="RAG (Retrieval-Augmented Generation) retrieves relevant documents "
"from a vector store and passes them as context to an LLM for grounded answers."),
]
vectorstore = Chroma.from_documents(sample_docs, embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
prompt = ChatPromptTemplate.from_template(
"Answer based on context:\n{context}\n\nQuestion: {question}"
)
langchain_rag = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser()
)
result = langchain_rag.invoke("What is LCEL?")
print(result)
LlamaIndex RAG Pipeline
# pip install llama-index
import tempfile, os
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
# Create a temporary directory with sample documents
tmp_dir = tempfile.mkdtemp()
for i, text in enumerate([
    "LCEL is a declarative way to compose chains using the pipe operator.",
    "LangChain is a framework for building LLM-powered applications.",
    "RAG retrieves relevant documents and passes them as context to an LLM.",
]):
    with open(os.path.join(tmp_dir, f"doc{i}.txt"), "w") as f:
        f.write(text)
documents = SimpleDirectoryReader(tmp_dir).load_data()
index = VectorStoreIndex.from_documents(documents)
llamaindex_rag = index.as_query_engine(similarity_top_k=3)
result = llamaindex_rag.query("What is LCEL?")
print(result)
Haystack RAG Pipeline
# pip install haystack-ai
from haystack import Pipeline, Document
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.document_stores.in_memory import InMemoryDocumentStore
# Initialize document store with sample documents
doc_store = InMemoryDocumentStore()
doc_store.write_documents([
Document(content="LCEL is a declarative way to compose chains using the pipe operator."),
Document(content="LangChain is a framework for building LLM-powered applications."),
Document(content="RAG retrieves relevant documents and passes them as context to an LLM."),
])
haystack_rag = Pipeline()
haystack_rag.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
haystack_rag.add_component("prompt", PromptBuilder(
template="Context: {{documents}}\nQuestion: {{question}}",
required_variables=["question", "documents"],
))
haystack_rag.add_component("llm", OpenAIGenerator(model="gpt-4o"))
haystack_rag.connect("retriever", "prompt.documents")
haystack_rag.connect("prompt", "llm")
result = haystack_rag.run({
"retriever": {"query": "What is LCEL?"},
"prompt": {"question": "What is LCEL?"},
})
print(result["llm"]["replies"][0])
Notice how LlamaIndex requires the fewest lines for a basic RAG pipeline — that is its sweet spot. LangChain offers maximum composability with the pipe operator. Haystack provides the most explicit data flow with named connections.
Exercises & Self-Assessment
Hands-On Exercises
- Build an LCEL Chain: Create a chain that takes a topic, generates 3 quiz questions, parses them into a list, and formats them as a numbered quiz. Use RunnableParallel to generate questions at three difficulty levels simultaneously.
- Router Chain: Build a customer support router that classifies queries into billing, technical, and general categories, then routes each to a specialized prompt. Add a fallback for unclassified queries.
- Pydantic Output Parser: Define a Pydantic model for a code review (file, severity, issues list, suggestions list, score). Build a chain that analyzes code snippets and returns validated structured reviews.
- Custom Tool: Create a @tool that queries a REST API (use a public API like JSONPlaceholder). Bind it to a model and build a conversational chain that uses the tool when needed.
- Callback Handler: Implement a callback handler that logs every LLM call to a JSON file with timestamp, input, output, token count, and latency. Run a multi-step chain and analyze the log.
Critical Thinking Questions
- What are the trade-offs between using LCEL's pipe operator versus building chains programmatically with explicit function calls? When would you choose one over the other?
- Output parsers can fail when LLMs produce unexpected formats. Compare the retry parser approach with with_structured_output(). Which is more reliable and why?
- When building a router chain, is it better to use an LLM for classification or a lightweight classifier (like a fine-tuned BERT model)? Analyze the latency, cost, and accuracy trade-offs.
- Compare LangChain's callback system with OpenTelemetry-based observability. What are the advantages of each for production AI applications?
- You are building a document processing pipeline that handles PDFs, CSVs, and APIs. Would you choose LangChain, LlamaIndex, or Haystack? Justify your choice with specific technical reasons.
Conclusion & Next Steps
You now have a deep understanding of the core building blocks that power every LangChain application. Here are the key takeaways from Part 4:
- LCEL and the pipe operator provide a declarative, composable way to build chains with automatic streaming, batching, and async support
- The Runnable protocol gives every component the same interface (invoke, stream, batch), making them interchangeable and composable
- Chain patterns — Sequential for multi-step pipelines, Transform for data manipulation, Router for dynamic routing — cover the vast majority of real-world use cases
- Output parsers (especially Pydantic and with_structured_output) transform raw LLM text into validated, typed Python objects
- Tool integration with the @tool decorator and bind_tools() extends LLMs beyond text generation
- Callbacks and LangSmith tracing provide the observability layer that production AI applications require
- LangChain vs LlamaIndex vs Haystack — each framework has its sweet spot; LangChain for orchestration, LlamaIndex for RAG, Haystack for explicit pipeline composition
Next in the Series
In Part 5: Retrieval-Augmented Generation (RAG), we dive deep into embeddings, vector databases (FAISS vs Pinecone vs Weaviate vs Chroma vs pgvector vs Qdrant), document loaders, text splitting strategies, retriever patterns, and advanced RAG techniques like HyDE, RAG fusion, and parent document retrieval.
Continue the Series
Part 5: Retrieval-Augmented Generation (RAG)
Master embeddings, vector databases, document loaders, retrievers, and advanced RAG patterns including HyDE and RAG fusion.
Part 6: Memory & Context Engineering
Learn buffer, summary, window, vector, and entity memory patterns plus context engineering, chunking, and re-ranking strategies.
Part 3: Prompt Engineering Mastery
Review zero-shot, few-shot, chain-of-thought, and ReAct prompting techniques that underpin effective chain design.