Markov Decision Process
A Markov Decision Process (MDP) formally defines the reinforcement learning (RL) problem as a tuple $(S, A, P, R, \gamma)$:
- $S$ — state space
- $A$ — action space
- $P(s' | s, a)$ — transition probability: the probability of moving to state $s'$ after taking action $a$ in state $s$
- $R(s, a, s')$ — reward function
- $\gamma \in [0,1)$ — discount factor
GridWorld Environment
import torch
class GridWorld:
    """
    Simple deterministic GridWorld MDP.

    States: (row, col) flattened to integer (row * cols + col).
    Actions: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT.
    Goal: reach bottom-right corner (state n_states - 1). The goal is an
    absorbing terminal state: stepping from it stays there with reward 0.
    """

    def __init__(self, rows=4, cols=4):
        self.rows = rows
        self.cols = cols
        self.n_states = rows * cols
        self.n_actions = 4
        # (d_row, d_col) per action index: UP, RIGHT, DOWN, LEFT
        self._deltas = [(-1, 0), (0, 1), (1, 0), (0, -1)]

    def _state_to_rc(self, s):
        """Convert a flat state index into (row, col)."""
        return s // self.cols, s % self.cols

    def _rc_to_state(self, r, c):
        """Convert (row, col) into a flat state index."""
        return r * self.cols + c

    def step(self, state, action):
        """
        Take action, return (next_state, reward, done).

        Moving into a wall leaves the agent in place. Entering the goal
        yields reward 1.0; every other move costs 0.01. Once in the goal,
        any action returns (goal, 0.0, True).
        """
        goal = self.n_states - 1
        if state == goal:
            # Terminal state is absorbing: without this guard the agent could
            # leave the goal and re-enter it to collect the reward again.
            return state, 0.0, True

        r, c = self._state_to_rc(state)
        dr, dc = self._deltas[action]
        # Clamp to grid bounds (wall = stay in place)
        nr = max(0, min(self.rows - 1, r + dr))
        nc = max(0, min(self.cols - 1, c + dc))
        next_state = self._rc_to_state(nr, nc)

        done = next_state == goal
        reward = 1.0 if done else -0.01  # Small step penalty encourages efficiency
        return next_state, reward, done

    def reset(self):
        """Start at top-left (state 0)."""
        return 0

    def render(self, state=None):
        """Print grid: 'A' marks the agent, 'G' the goal, '.' any other cell."""
        goal = self.n_states - 1
        for r in range(self.rows):
            cells = []
            for c in range(self.cols):
                s = self._rc_to_state(r, c)
                if s == state:
                    cells.append(" A ")
                elif s == goal:
                    cells.append(" G ")
                else:
                    cells.append(" . ")
            print("".join(cells))
# Demo: random walk of at most 20 steps, starting from the reset state
env = GridWorld(4, 4)
torch.manual_seed(42)  # seed torch's RNG, which torch.randint draws from
state = env.reset()
total_reward = 0.0
done = False
for step_num in range(20):
    action = torch.randint(0, 4, (1,)).item()
    state, reward, done = env.step(state, action)
    total_reward = total_reward + reward
    if done:
        break

# `done` is still True here only if the loop was left via the break above.
if done:
    print(f"Reached goal at step {step_num + 1}! Total reward: {total_reward:.2f}")
else:
    print(f"Did not reach goal in 20 steps. Total reward: {total_reward:.2f}")

print("\nFinal grid state:")
env.render(state)
Value Functions
The state value function $V^\pi(s)$ is the expected cumulative discounted reward starting from state $s$ following policy $\pi$:
$$V^\pi(s) = \mathbb{E}_\pi\left[\sum_{t=0}^{\infty} \gamma^t R_t \,\Big|\, S_0 = s\right]$$
Bellman Equation
The Bellman equation expresses the value function recursively:
$$V^\pi(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a) \left[ R(s,a,s') + \gamma V^\pi(s') \right]$$
import torch
class GridWorld:
    """Compact deterministic grid MDP whose last state (bottom-right) is terminal."""

    def __init__(self, rows=4, cols=4):
        self.rows, self.cols = rows, cols
        self.n_states = rows * cols
        self.n_actions = 4
        # (d_row, d_col) offsets indexed by action
        self._deltas = [(-1, 0), (0, 1), (1, 0), (0, -1)]

    def _rc(self, s):
        """Flat state index -> (row, col)."""
        row = s // self.cols
        col = s % self.cols
        return row, col

    def _st(self, r, c):
        """(row, col) -> flat state index."""
        return r * self.cols + c

    def step(self, s, a):
        """Apply action `a` in state `s`; return (next_state, reward, done)."""
        goal = self.n_states - 1
        if s == goal:
            # Already terminal: stay put, no reward.
            return s, 0.0, True

        row, col = self._rc(s)
        d_row, d_col = self._deltas[a]
        # Moves that would leave the grid are clamped back onto it.
        new_row = max(0, min(self.rows - 1, row + d_row))
        new_col = max(0, min(self.cols - 1, col + d_col))
        ns = self._st(new_row, new_col)

        if ns == goal:
            return ns, 1.0, True
        return ns, -0.01, False
def policy_evaluation(env, policy, gamma=0.9, tol=1e-6, max_iter=1000):
    """
    Evaluate a fixed policy by repeated synchronous Bellman backups.

    env: object exposing n_states, n_actions and step(s, a) ->
        (next_state, reward, done); step is called once per (s, a), so each
        pair contributes a single outcome.
    policy: (n_states, n_actions) probability tensor
    gamma: discount factor
    tol: stop once the largest per-state change in a sweep drops below this
    max_iter: upper bound on the number of sweeps
    Returns: V (n_states,) value tensor

    Prints a message when the tolerance is reached; if max_iter is exhausted
    first, the latest estimate is returned without a message.
    """
    n_states = env.n_states
    terminal = n_states - 1
    V = torch.zeros(n_states)
    for sweep in range(1, max_iter + 1):
        V_next = torch.zeros(n_states)
        # The terminal (last) state is never backed up, so its value stays 0.
        for s in range(terminal):
            for a in range(env.n_actions):
                ns, r, _ = env.step(s, a)
                V_next[s] += policy[s, a] * (r + gamma * V[ns])
        delta = torch.max(torch.abs(V_next - V)).item()
        V = V_next
        if delta < tol:
            print(f"Policy evaluation converged in {sweep} iterations")
            break
    return V
# Evaluate the uniform random policy on a 4x4 grid and show V as a grid.
env = GridWorld(4, 4)
# Every action gets probability 1 / n_actions in every state.
policy = torch.full((env.n_states, env.n_actions), 1.0) / env.n_actions
V = policy_evaluation(env, policy, gamma=0.9)
V_grid = V.view(env.rows, env.cols)
print("State values under uniform random policy:")
print(torch.round(V_grid, decimals=3))
Q-Values (Action-Value Function)
import torch
class GridWorld:
    """Deterministic grid MDP with an absorbing goal in the last (bottom-right) state."""

    def __init__(self, rows=4, cols=4):
        self.rows = rows
        self.cols = cols
        self.n_states = rows * cols
        self.n_actions = 4
        # (d_row, d_col) offsets indexed by action
        self._deltas = [(-1, 0), (0, 1), (1, 0), (0, -1)]

    def _rc(self, s):
        """Flat state index -> (row, col)."""
        return s // self.cols, s % self.cols

    def _st(self, r, c):
        """(row, col) -> flat state index."""
        return r * self.cols + c

    def step(self, s, a):
        """Apply action `a` in state `s`; return (next_state, reward, done)."""
        last = self.n_states - 1
        if s == last:
            # Goal already reached: no movement, no reward.
            return s, 0.0, True

        r, c = self._rc(s)
        dr, dc = self._deltas[a]
        # Clamp each coordinate so off-grid moves leave the agent in place.
        nr = max(0, min(self.rows - 1, r + dr))
        nc = max(0, min(self.cols - 1, c + dc))
        ns = self._st(nr, nc)

        reached = ns == last
        reward = 1.0 if reached else -0.01
        return ns, reward, reached
def compute_q_from_v(env, V, gamma=0.9):
    """
    Turn state values into action values with a one-step lookahead.

    General form: Q(s,a) = sum_{s'} P(s'|s,a) [R(s,a,s') + gamma * V(s')]
    Here env.step(s, a) is queried once per pair, i.e. the deterministic
    form Q(s,a) = R(s,a,s') + gamma * V(s') is used.

    env: object exposing n_states, n_actions and step(s, a) ->
        (next_state, reward, done)
    V: state values, indexable by state
    gamma: discount factor
    Returns: Q (n_states, n_actions) tensor
    """
    n_states, n_actions = env.n_states, env.n_actions
    Q = torch.zeros(n_states, n_actions)
    # Walk the (state, action) pairs in row-major order via one flat index.
    for flat in range(n_states * n_actions):
        s, a = divmod(flat, n_actions)
        next_state, reward, _ = env.step(s, a)
        Q[s, a] = reward + gamma * V[next_state]
    return Q
env = GridWorld(4, 4)
# Build V with value iteration (500 synchronous sweeps, discount 0.9);
# no earlier policy-evaluation result is reused here.
V = torch.zeros(env.n_states)
for _ in range(500):
    V_new = torch.zeros(env.n_states)
    # The last state is skipped, so its value stays 0.
    for s in range(env.n_states - 1):
        backups = []
        for a in range(env.n_actions):
            ns, r, _ = env.step(s, a)
            backups.append(r + 0.9 * V[ns])
        best = max(backups)
        V_new[s] = best
    V = V_new

Q = compute_q_from_v(env, V, gamma=0.9)
greedy_policy = torch.argmax(Q, dim=1)
action_names = ['UP', 'RIGHT', 'DOWN', 'LEFT']
print("Greedy policy (optimal):")
# Lay the per-state action indices out as a rows x cols grid of names.
policy_rows = greedy_policy.reshape(env.rows, env.cols).tolist()
print([[action_names[idx] for idx in row] for row in policy_rows])
Exploration Strategies
Epsilon-Greedy
import torch
import random
def epsilon_greedy(q_values, epsilon):
    """
    Pick an action index with the epsilon-greedy rule.

    With probability epsilon: explore (uniformly random action).
    With probability 1-epsilon: exploit (greedy action).

    q_values: (n_actions,) tensor for current state
    epsilon: exploration probability; 0 always exploits, 1 always explores
    Returns: int action index
    """
    # Draw the explore/exploit coin from torch's RNG as well (previously it
    # came from Python's `random`), so that torch.manual_seed alone makes
    # the whole selection reproducible.
    if torch.rand(1).item() < epsilon:
        return torch.randint(len(q_values), (1,)).item()
    return q_values.argmax().item()
# Epsilon schedule: linearly anneal over training
def epsilon_schedule(step, total_steps, eps_start=1.0, eps_end=0.01):
    """
    Linearly move epsilon from eps_start to eps_end over total_steps.

    step: current step; values <= 0 give eps_start, values >= total_steps
        give eps_end
    total_steps: length of the schedule; if <= 0 the schedule is treated as
        already finished and eps_end is returned (avoids division by zero)
    eps_start, eps_end: endpoints; either may be the larger one
    Returns: epsilon for this step, always between the two endpoints
    """
    if total_steps <= 0 or step >= total_steps:
        return eps_end
    if step <= 0:
        return eps_start
    progress = step / total_steps
    value = eps_start - progress * (eps_start - eps_end)
    # Clamp against both endpoints so rounding can never push the result
    # outside the range, whichever direction the schedule runs.
    lo, hi = sorted((eps_start, eps_end))
    return min(max(value, lo), hi)
# Demo: empirical action frequencies over 10000 draws for several epsilons
torch.manual_seed(42)
q_values = torch.tensor([0.1, 0.5, 0.2, 0.8])  # Q-values for 4 actions
print("Q-values:", q_values.tolist())
best_action = torch.argmax(q_values).item()
best_q = torch.max(q_values)
print(f"Greedy action: {best_action} (Q={best_q:.2f})")
for eps in [0.0, 0.1, 0.3, 0.8]:
    samples = [epsilon_greedy(q_values, eps) for _ in range(10000)]
    counts = [samples.count(k) for k in range(4)]
    # 10000 draws, so count / 100 is already a percentage.
    freqs = [f'{c/100:.1f}%' for c in counts]
    print(f"eps={eps:.1f}: action frequencies = {freqs}")
Upper Confidence Bound (UCB)
import torch
import math
class UCBBandit:
    """
    Bandit agent using Upper Confidence Bound action selection.

    Each action's score is its value estimate plus an uncertainty bonus that
    shrinks as the action is tried more often:
        a* = argmax_a [ Q(a) + c * sqrt(ln(t) / N(a)) ]
    """

    def __init__(self, n_actions, c=2.0):
        self.n_actions = n_actions
        self.c = c  # weight of the exploration bonus
        self.t = 0  # number of select_action calls so far
        self.counts = torch.zeros(n_actions)  # N(a): updates per action
        self.q_estimates = torch.zeros(n_actions)  # Q(a): running mean reward

    def select_action(self):
        """Return the index of the next action to take."""
        self.t += 1
        # Any action with a zero count is tried first (lowest index wins);
        # this also keeps N(a) > 0 in the division below.
        untried = torch.nonzero(self.counts == 0, as_tuple=True)[0]
        if untried.numel() > 0:
            return untried[0].item()
        log_t = torch.log(torch.tensor(float(self.t)))
        bonus = self.c * torch.sqrt(log_t / self.counts)
        return torch.argmax(self.q_estimates + bonus).item()

    def update(self, action, reward):
        """Fold `reward` into the running mean for `action`."""
        self.counts[action] += 1
        error = reward - self.q_estimates[action]
        # Incremental mean: Q <- Q + (r - Q) / N
        self.q_estimates[action] += error / self.counts[action]
# Run the UCB agent on a 5-armed bandit with noisy rewards
torch.manual_seed(42)
true_means = torch.tensor([0.2, 0.5, 0.8, 0.3, 0.6])  # True action values
n_steps = 2000
ucb_agent = UCBBandit(n_actions=5, c=2.0)
ucb_rewards = []
for _ in range(n_steps):
    a = ucb_agent.select_action()
    noise = torch.randn(1).item() * 0.1
    r = true_means[a] + noise  # Noisy reward
    ucb_agent.update(a, r)
    ucb_rewards.append(r)

mean_reward = sum(ucb_rewards) / len(ucb_rewards)
final_q = torch.round(ucb_agent.q_estimates, decimals=3).tolist()
optimal_pulls = ucb_agent.counts[2].item()  # index 2 holds the largest true mean
print(f"UCB — mean reward: {mean_reward:.4f}")
print(f"UCB — final Q estimates: {final_q}")
print(f"UCB — true values: {true_means.tolist()}")
print(f"UCB — optimal action pulled: {optimal_pulls:.0f}/{n_steps} times")
Multi-Armed Bandit Benchmark
import torch
import random
# Shared benchmark setup: a 5-armed bandit whose arm 2 has the highest mean.
torch.manual_seed(42)
n_steps = 1000
true_means = torch.tensor([0.1, 0.3, 0.7, 0.4, 0.2])  # Arm 2 is best
optimal_action = torch.argmax(true_means).item()
def run_epsilon_greedy(eps, n_steps=1000, seed=42):
    """
    Run an epsilon-greedy agent on the bandit given by the module-level
    `true_means` and summarise how it did.

    eps: probability of picking a uniformly random arm on each step
    n_steps: number of pulls
    seed: seeds both Python's `random` (explore decisions and random arms)
        and torch (reward noise)
    Returns: (mean reward per step, fraction of pulls spent on the
        module-level `optimal_action`). The mean reward is a 0-dim tensor
        because rewards are built from `true_means` entries; the fraction is
        a Python float.
    """
    random.seed(seed)
    torch.manual_seed(seed)
    # Size the agent from the bandit itself instead of a hard-coded 5 arms.
    n_arms = len(true_means)
    q = torch.zeros(n_arms)
    counts = torch.zeros(n_arms)
    total = 0.0
    for _ in range(n_steps):
        if random.random() < eps:
            a = random.randrange(n_arms)
        else:
            a = q.argmax().item()
        r = true_means[a] + torch.randn(1).item() * 0.2  # noisy reward
        counts[a] += 1
        q[a] += (r - q[a]) / counts[a]  # incremental mean update
        total += r
    return total / n_steps, (counts[optimal_action] / n_steps).item()
def run_ucb(c, n_steps=1000, seed=42):
    """
    Run a UCB agent on the bandit given by the module-level `true_means`
    and summarise how it did.

    c: weight of the exploration bonus sqrt(ln(t) / N(a))
    n_steps: number of pulls
    seed: seeds torch (reward noise)
    Returns: (mean reward per step, fraction of pulls spent on the
        module-level `optimal_action`). The mean reward is a 0-dim tensor
        because rewards are built from `true_means` entries; the fraction is
        a Python float.
    """
    torch.manual_seed(seed)
    # Size the agent from the bandit itself instead of a hard-coded 5 arms.
    n_arms = len(true_means)
    q = torch.zeros(n_arms)
    counts = torch.zeros(n_arms)
    total = 0.0
    for t in range(1, n_steps + 1):
        # Pull every arm once first (lowest index first); this also keeps
        # counts > 0 in the bonus term below.
        untried = (counts == 0).nonzero(as_tuple=True)[0]
        if len(untried) > 0:
            a = untried[0].item()
        else:
            bonus = c * (torch.log(torch.tensor(float(t))) / counts).sqrt()
            a = (q + bonus).argmax().item()
        r = true_means[a] + torch.randn(1).item() * 0.2  # noisy reward
        counts[a] += 1
        q[a] += (r - q[a]) / counts[a]  # incremental mean update
        total += r
    return total / n_steps, (counts[optimal_action] / n_steps).item()
# Report: one table row per strategy/hyper-parameter setting.
best_mean = true_means[optimal_action]
print(f"Optimal arm: {optimal_action} (true value: {best_mean:.2f})")
print("\nStrategy | Mean Reward | Optimal Pull Rate")
for eps in [0.01, 0.1, 0.3]:
    mr, opr = run_epsilon_greedy(eps)
    label = f"Eps-greedy e={eps:.2f}"
    print(f"{label} | {mr:.4f} | {opr*100:.1f}%")
for c in [0.5, 2.0, 5.0]:
    mr, opr = run_ucb(c)
    label = f"UCB c={c:.1f}"
    print(f"{label} | {mr:.4f} | {opr*100:.1f}%")