Back to PyTorch Mastery Series

Reinforcement Learning Fundamentals

May 29, 2026 · Wasil Zafar · 30 min read

Build RL foundations from the ground up: Markov Decision Processes, value functions, the Bellman equation, and principled exploration strategies — all implemented as runnable PyTorch code.

Table of Contents

  1. Markov Decision Process
  2. Value Functions
  3. Exploration Strategies
  4. Multi-Armed Bandit
  5. Related Articles

Markov Decision Process

An MDP formally defines the RL problem as a tuple $(S, A, P, R, \gamma)$:

  • $S$ — state space
  • $A$ — action space
  • $P(s' | s, a)$ — transition probability
  • $R(s, a, s')$ — reward function
  • $\gamma \in [0,1)$ — discount factor

GridWorld Environment

import torch


class GridWorld:
    """
    Simple deterministic GridWorld MDP.

    States: (row, col) flattened row-major to an integer in [0, rows * cols).
    Actions: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT.
    Goal: reach the bottom-right corner (state ``n_states - 1``), which is
    the only terminal state and is absorbing.
    """

    def __init__(self, rows=4, cols=4):
        self.rows = rows
        self.cols = cols
        self.n_states = rows * cols
        self.n_actions = 4
        self._deltas = [(-1, 0), (0, 1), (1, 0), (0, -1)]  # UP, RIGHT, DOWN, LEFT

    def _state_to_rc(self, s):
        """Convert a flat state index to (row, col)."""
        return s // self.cols, s % self.cols

    def _rc_to_state(self, r, c):
        """Convert (row, col) to a flat state index."""
        return r * self.cols + c

    def step(self, state, action):
        """
        Take action, return (next_state, reward, done).

        Stepping from the goal state leaves the agent there with zero reward
        and done=True, so the goal cannot be left or re-entered for repeated
        +1 rewards.
        """
        goal = self.n_states - 1
        if state == goal:
            return state, 0.0, True

        r, c = self._state_to_rc(state)
        dr, dc = self._deltas[action]
        nr, nc = r + dr, c + dc

        # Clamp to grid bounds (wall = stay in place)
        nr = max(0, min(self.rows - 1, nr))
        nc = max(0, min(self.cols - 1, nc))

        next_state = self._rc_to_state(nr, nc)
        done = next_state == goal
        reward = 1.0 if done else -0.01  # Small step penalty encourages efficiency
        return next_state, reward, done

    def reset(self):
        """Start at top-left (state 0)."""
        return 0

    def render(self, state=None):
        """Print the grid: 'A' marks the agent, 'G' the goal, '.' any other cell."""
        goal = self.n_states - 1
        for r in range(self.rows):
            cells = []
            for c in range(self.cols):
                s = self._rc_to_state(r, c)
                # The agent marker takes precedence when it stands on the goal.
                if s == state:
                    cells.append(" A ")
                elif s == goal:
                    cells.append(" G ")
                else:
                    cells.append(" . ")
            print("".join(cells))


# Demo: let a uniformly random policy wander the grid for at most 20 steps
env = GridWorld(4, 4)
state = env.reset()
total_reward = 0.0

torch.manual_seed(42)
for step_num in range(20):
    # One integer action drawn uniformly from {0, 1, 2, 3}
    action = int(torch.randint(0, 4, (1,)))
    state, reward, done = env.step(state, action)
    total_reward = total_reward + reward
    if not done:
        continue
    print(f"Reached goal at step {step_num + 1}! Total reward: {total_reward:.2f}")
    break
else:
    # Reached only when the loop finished without hitting `break`.
    print(f"Did not reach goal in 20 steps. Total reward: {total_reward:.2f}")

print("\nFinal grid state:")
env.render(state)

Value Functions

The state value function $V^\pi(s)$ is the expected cumulative discounted reward starting from state $s$ following policy $\pi$:

$$V^\pi(s) = \mathbb{E}_\pi\left[\sum_{t=0}^{\infty} \gamma^t R_t \,\Big|\, S_0 = s\right]$$

Bellman Equation

The Bellman equation expresses the value function recursively:

$$V^\pi(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a) \left[ R(s,a,s') + \gamma V^\pi(s') \right]$$

import torch


class GridWorld:
    """Compact deterministic grid MDP whose last flat state is an absorbing goal."""

    def __init__(self, rows=4, cols=4):
        self.rows, self.cols = rows, cols
        self.n_states = rows * cols
        self.n_actions = 4
        # (row offset, col offset) for each action index 0..3
        self._deltas = [(-1, 0), (0, 1), (1, 0), (0, -1)]

    def _rc(self, s):
        """Split a flat state index into (row, col)."""
        return divmod(s, self.cols)

    def _st(self, r, c):
        """Flatten (row, col) into a state index."""
        return r * self.cols + c

    def step(self, s, a):
        """Apply action `a` in state `s`; return (next_state, reward, done)."""
        goal = self.n_states - 1
        if s == goal:
            # Absorbing terminal state: no movement, no reward.
            return s, 0.0, True

        row, col = self._rc(s)
        d_row, d_col = self._deltas[a]
        # Moves that would leave the grid are clamped back onto it.
        new_row = max(0, min(self.rows - 1, row + d_row))
        new_col = max(0, min(self.cols - 1, col + d_col))

        nxt = self._st(new_row, new_col)
        reached = nxt == goal
        reward = 1.0 if reached else -0.01
        return nxt, reward, reached


def policy_evaluation(env, policy, gamma=0.9, tol=1e-6, max_iter=1000):
    """
    Evaluate a fixed policy with synchronous Bellman expectation backups.

    env: object providing n_states, n_actions and step(s, a) -> (next_state, reward, done)
    policy: tensor indexed as policy[s, a], the probability of action a in state s
    gamma: discount factor
    tol: stop once the largest per-state change in a sweep falls below this
    max_iter: maximum number of sweeps
    Returns: V (n_states,) value tensor; the last state's value stays 0
    """
    num_states = env.n_states
    V = torch.zeros(num_states)
    # The highest-numbered state is treated as terminal and never backed up.
    non_terminal = range(num_states - 1)

    for iteration in range(max_iter):
        backup = torch.zeros(num_states)
        for s in non_terminal:
            for a in range(env.n_actions):
                nxt, reward, _ = env.step(s, a)
                backup[s] += policy[s, a] * (reward + gamma * V[nxt])

        largest_change = torch.max(torch.abs(backup - V)).item()
        V = backup
        if largest_change < tol:
            print(f"Policy evaluation converged in {iteration + 1} iterations")
            break

    return V


env = GridWorld(4, 4)
# Uniform random policy: every action is equally likely in every state
policy = torch.full((env.n_states, env.n_actions), 1.0) / env.n_actions

V = policy_evaluation(env, policy, gamma=0.9)
# Lay the flat value vector out as a rows x cols grid for display
V_grid = V.view(env.rows, env.cols)
print("State values under uniform random policy:")
print(torch.round(V_grid, decimals=3))

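Q-Values (Action-Value Function)

The action-value function $Q^\pi(s, a)$ is the expected cumulative discounted reward from taking action $a$ in state $s$ and following policy $\pi$ afterwards. It follows from $V^\pi$ by a one-step lookahead:

$$Q^\pi(s, a) = \sum_{s'} P(s'|s,a) \left[ R(s,a,s') + \gamma V^\pi(s') \right]$$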

import torch


class GridWorld:
    """Deterministic rows x cols grid; the last flat index is an absorbing goal."""

    def __init__(self, rows=4, cols=4):
        self.rows = rows
        self.cols = cols
        self.n_states = rows * cols
        self.n_actions = 4
        # (row offset, col offset) applied by actions 0, 1, 2, 3
        self._deltas = [(-1, 0), (0, 1), (1, 0), (0, -1)]

    def _rc(self, s):
        """Flat state index -> (row, col)."""
        return s // self.cols, s % self.cols

    def _st(self, r, c):
        """(row, col) -> flat state index."""
        return r * self.cols + c

    def step(self, s, a):
        """Return (next_state, reward, done) for taking action `a` in state `s`."""
        terminal = self.n_states - 1
        if s == terminal:
            return s, 0.0, True

        r, c = self._rc(s)
        dr, dc = self._deltas[a]
        # Clamp each coordinate so an off-grid move leaves it unchanged.
        nr = max(0, min(self.rows - 1, r + dr))
        nc = max(0, min(self.cols - 1, c + dc))
        ns = self._st(nr, nc)

        at_goal = ns == terminal
        return ns, (1.0 if at_goal else -0.01), at_goal


def compute_q_from_v(env, V, gamma=0.9):
    """
    One-step lookahead from state values to action values.

    General form:  Q(s,a) = sum_{s'} P(s'|s,a) [R(s,a,s') + gamma * V(s')]
    With a deterministic env.step this collapses to
                   Q(s,a) = R(s,a,s') + gamma * V(s')
    Returns: Q (n_states, n_actions) tensor
    """
    n_states, n_actions = env.n_states, env.n_actions
    Q = torch.zeros(n_states, n_actions)
    # Walk the (state, action) table in row-major order.
    for flat in range(n_states * n_actions):
        s, a = divmod(flat, n_actions)
        nxt, reward, _ = env.step(s, a)
        Q[s, a] = reward + gamma * V[nxt]
    return Q


env = GridWorld(4, 4)
gamma = 0.9  # One discount factor shared by value iteration and the Q lookahead

# Value iteration: repeatedly apply the Bellman optimality backup
#   V(s) <- max_a [ R(s, a, s') + gamma * V(s') ]
# The last state (the goal) is terminal, so it is skipped and keeps V = 0.
V = torch.zeros(env.n_states)
for _ in range(500):
    V_new = torch.zeros(env.n_states)
    for s in range(env.n_states - 1):
        best = -float('inf')
        for a in range(env.n_actions):
            ns, r, _ = env.step(s, a)
            best = max(best, r + gamma * V[ns])
        V_new[s] = best
    V = V_new

Q = compute_q_from_v(env, V, gamma=gamma)
greedy_policy = Q.argmax(dim=1)
action_names = ['UP', 'RIGHT', 'DOWN', 'LEFT']
print("Greedy policy (optimal):")
# Index with the environment's own dimensions so the printout follows the grid size.
print([
    [action_names[greedy_policy[row * env.cols + col].item()] for col in range(env.cols)]
    for row in range(env.rows)
])

Exploration Strategies

Epsilon-Greedy

import torch
import random


def epsilon_greedy(q_values, epsilon):
    """
    Pick an action index from the current state's Q-values.

    q_values: (n_actions,) tensor for current state
    epsilon: probability of exploring (uniformly random action);
             otherwise the greedy (argmax) action is returned.
    """
    explore = random.random() < epsilon
    if not explore:
        return torch.argmax(q_values).item()
    n_actions = len(q_values)
    return torch.randint(n_actions, (1,)).item()


# Epsilon decay schedule: linearly anneal over training
def epsilon_schedule(step, total_steps, eps_start=1.0, eps_end=0.01):
    """
    Linearly decay epsilon from eps_start to eps_end.

    step: current training step; values below 0 are treated as step 0
    total_steps: number of steps over which to anneal
    Returns: eps_start at step 0, falling linearly and never below eps_end.
             With nothing to anneal over (total_steps <= 0) the schedule is
             considered finished and eps_end is returned.
    """
    if total_steps <= 0:
        # Avoids division by zero for an empty schedule.
        return eps_end
    # Floor progress at 0 so a negative step cannot push epsilon above eps_start.
    progress = max(0.0, step / total_steps)
    return max(eps_end, eps_start - progress * (eps_start - eps_end))


# Demo: how epsilon affects exploration
# epsilon_greedy takes its explore/exploit decision from Python's `random`
# module and its random action from torch, so both generators are seeded to
# make the printed frequencies reproducible.
random.seed(42)
torch.manual_seed(42)
q_values = torch.tensor([0.1, 0.5, 0.2, 0.8])  # Q-values for 4 actions
print("Q-values:", q_values.tolist())
print(f"Greedy action: {q_values.argmax().item()} (Q={q_values.max():.2f})")

n_trials = 10000
for eps in [0.0, 0.1, 0.3, 0.8]:
    counts = [0] * len(q_values)
    for _ in range(n_trials):
        a = epsilon_greedy(q_values, eps)
        counts[a] += 1
    # Percentages are computed from n_trials rather than a hard-coded divisor.
    print(f"eps={eps:.1f}: action frequencies = {[f'{100 * c / n_trials:.1f}%' for c in counts]}")

Upper Confidence Bound (UCB)

import torch
import math


class UCBBandit:
    """
    UCB1 action selection for a stationary multi-armed bandit.

    Each arm's score is its running mean reward plus an uncertainty bonus
    that shrinks as the arm is pulled more often:
        a* = argmax Q(a) + c * sqrt(ln(t) / N(a))
    """

    def __init__(self, n_actions, c=2.0):
        self.n_actions = n_actions
        self.c = c
        self.t = 0                                  # Calls to select_action so far
        self.counts = torch.zeros(n_actions)        # Pulls per action
        self.q_estimates = torch.zeros(n_actions)   # Running mean reward per action

    def select_action(self):
        """Return the next action index; every action is tried once first."""
        self.t += 1

        untried = torch.nonzero(self.counts == 0, as_tuple=True)[0]
        if untried.numel() > 0:
            # Lowest-indexed action that has never been pulled.
            return untried[0].item()

        log_t = torch.log(torch.tensor(float(self.t)))
        bonus = self.c * torch.sqrt(log_t / self.counts)
        return torch.argmax(self.q_estimates + bonus).item()

    def update(self, action, reward):
        """Fold `reward` into the running mean of `action`."""
        self.counts[action] += 1
        pulls = self.counts[action]
        error = reward - self.q_estimates[action]
        self.q_estimates[action] += error / pulls


# Run a UCB agent on a 5-armed bandit with noisy rewards and report how well
# it identifies the best arm.
torch.manual_seed(42)
true_means = torch.tensor([0.2, 0.5, 0.8, 0.3, 0.6])  # True action values
n_steps = 2000
optimal_action = true_means.argmax().item()  # Derived, not hard-coded

# UCB agent
ucb_agent = UCBBandit(n_actions=len(true_means), c=2.0)
ucb_rewards = []
for _ in range(n_steps):
    a = ucb_agent.select_action()
    r = true_means[a] + torch.randn(1).item() * 0.1  # Noisy reward
    ucb_agent.update(a, r)
    ucb_rewards.append(r)

print(f"UCB — mean reward: {sum(ucb_rewards)/len(ucb_rewards):.4f}")
print(f"UCB — final Q estimates: {ucb_agent.q_estimates.round(decimals=3).tolist()}")
print(f"UCB — true values:       {true_means.tolist()}")
print(f"UCB — optimal action pulled: {ucb_agent.counts[optimal_action].item():.0f}/{n_steps} times")

Multi-Armed Bandit Benchmark

import torch
import random


torch.manual_seed(42)
n_steps = 1000
true_means = torch.tensor([0.1, 0.3, 0.7, 0.4, 0.2])  # Arm 2 is best
# Index of the arm with the highest true mean
optimal_action = int(torch.argmax(true_means))


def run_epsilon_greedy(eps, n_steps=1000, seed=42, means=None):
    """
    Run an epsilon-greedy agent on a bandit with noisy rewards.

    eps: probability of pulling a uniformly random arm on each step
    n_steps: number of pulls
    seed: seeds both Python's `random` and torch, so a run is reproducible
    means: true mean reward per arm (tensor or sequence); defaults to the
           module-level `true_means`
    Returns: (mean reward per step, fraction of pulls spent on the best arm)
    """
    means = torch.as_tensor(true_means if means is None else means)
    # Derive the problem size and best arm from the means actually in use
    # instead of assuming five arms.
    n_arms = len(means)
    best_arm = means.argmax().item()

    random.seed(seed)
    torch.manual_seed(seed)
    q = torch.zeros(n_arms)
    counts = torch.zeros(n_arms)
    total = 0.0
    for _ in range(n_steps):
        if random.random() < eps:
            a = random.randint(0, n_arms - 1)
        else:
            a = q.argmax().item()
        r = means[a] + torch.randn(1).item() * 0.2  # Noisy reward
        counts[a] += 1
        q[a] += (r - q[a]) / counts[a]  # Incremental mean update
        total += r
    return total / n_steps, (counts[best_arm] / n_steps).item()


def run_ucb(c, n_steps=1000, seed=42, means=None):
    """
    Run a UCB1 agent on a bandit with noisy rewards.

    c: exploration coefficient multiplying the sqrt(ln(t) / N(a)) bonus
    n_steps: number of pulls
    seed: torch seed, so a run is reproducible
    means: true mean reward per arm (tensor or sequence); defaults to the
           module-level `true_means`
    Returns: (mean reward per step, fraction of pulls spent on the best arm)
    """
    means = torch.as_tensor(true_means if means is None else means)
    # Derive the problem size and best arm from the means actually in use
    # instead of assuming five arms.
    n_arms = len(means)
    best_arm = means.argmax().item()

    torch.manual_seed(seed)
    q = torch.zeros(n_arms)
    counts = torch.zeros(n_arms)
    total = 0.0
    for t in range(1, n_steps + 1):
        untried = (counts == 0).nonzero(as_tuple=True)[0]
        if len(untried) > 0:
            # Pull every arm once before trusting the confidence bounds
            # (this also avoids dividing by a zero count below).
            a = untried[0].item()
        else:
            bonus = c * (torch.log(torch.tensor(float(t))) / counts).sqrt()
            a = (q + bonus).argmax().item()
        r = means[a] + torch.randn(1).item() * 0.2  # Noisy reward
        counts[a] += 1
        q[a] += (r - q[a]) / counts[a]  # Incremental mean update
        total += r
    return total / n_steps, (counts[best_arm] / n_steps).item()


print(f"Optimal arm: {optimal_action} (true value: {true_means[optimal_action]:.2f})")
print(f"\nStrategy         | Mean Reward | Optimal Pull Rate")

# Epsilon-greedy rows: run every setting first, then print the table rows
eps_results = [(eps, *run_epsilon_greedy(eps)) for eps in (0.01, 0.1, 0.3)]
for eps, mr, opr in eps_results:
    print(f"Eps-greedy e={eps:.2f} |   {mr:.4f}    |   {opr*100:.1f}%")

# UCB rows, one per exploration coefficient
ucb_results = [(c, *run_ucb(c)) for c in (0.5, 2.0, 5.0)]
for c, mr, opr in ucb_results:
    print(f"UCB c={c:.1f}         |   {mr:.4f}    |   {opr*100:.1f}%")