Distributed Consensus: Raft in Practice
Hands-on Python implementations of the Raft consensus algorithm — starting from the core problem of keeping multiple servers in agreement, all the way to a working 5-node cluster that survives leader crashes and re-elects without losing a single committed entry.
The Problem We Are Solving
Imagine you are building a database that must stay available even if a server dies. The obvious answer is to run copies on multiple servers. But now you have a new problem: how do you keep all copies in sync?
Say you run three copies (Servers A, B, and C) and a client writes x=5. If a network hiccup means only Server B receives the write, you now have three servers that disagree on the value of x. Any server that answers a read might return a stale or wrong value. This is the distributed consensus problem: getting a group of servers to agree on a sequence of values, even when messages get lost, delayed, or servers crash.
Real systems that solve this problem are everywhere: etcd (stores Kubernetes cluster state), CockroachDB (distributed SQL), Consul (service discovery). They all use algorithms like Raft under the hood.
Why Is This Hard?
Three properties you want from a distributed system are in tension:
| Property | What It Means | Trade-off |
|---|---|---|
| Consistency | All servers see the same data | May refuse requests during failures |
| Availability | Every request gets a response | May return stale data |
| Partition Tolerance | Works despite dropped messages | Not optional in practice; real networks drop messages |
This is the CAP theorem: under a partition you can keep at most two of the three, so a system must choose between consistency and availability. Raft picks Consistency + Partition Tolerance — it will stall rather than return wrong data.
On top of that, the FLP impossibility result (1985) proved that no deterministic consensus algorithm can guarantee both safety (never deciding wrongly) and liveness (always eventually deciding) in an asynchronous network where even one node may crash. Every real consensus algorithm accepts this trade-off by using timeouts to detect failures and make progress.
What Raft Does
Raft solves consensus by electing one server as a Leader and routing all writes through it. The leader replicates every write to a majority of servers before confirming it. A majority is the key — if 3 out of 5 servers have a log entry, any future quorum of 3 will include at least one of them, so no committed entry is ever lost.
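A quick way to sanity-check that arithmetic is to compute the quorum size for a few cluster sizes; the snippet below is plain Python, nothing Raft-specific:

```python
# A majority quorum is floor(N/2) + 1; everything beyond the quorum can fail.
for n in (3, 5, 7):
    quorum = n // 2 + 1
    print(f"{n} nodes: quorum = {quorum}, tolerates {n - quorum} simultaneous failures")
```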
Every server can be in one of three roles at any time:
- Follower — passive; accepts log entries from the leader, votes in elections.
- Candidate — trying to become leader; requests votes from peers.
- Leader — handles all client writes; sends heartbeats to suppress new elections.
What You Will Build
Script 1 — raft_node.py: A single Raft node state machine with no networking. You run it and watch it transition through all three roles, process vote requests, append log entries, and advance its commit index — all annotated with print output explaining why each step is happening.
Script 2 — raft_cluster.py: Five nodes connected by in-memory message queues (no real network needed). You watch a full election play out, three writes replicate to a majority, the leader get crash-stopped, a new election take place, and replication continue — with split-brain confirmed absent at the end.
Learning Objectives
By the end of this lab, you will be able to:
- Explain why you cannot have both perfect safety and perfect liveness in a distributed system (FLP impossibility).
- Describe what a quorum is and why a majority specifically is required.
- Trace the three vote-granting rules Raft uses to ensure at most one leader per term.
- Explain how log entries are replicated and what "committed" means in Raft.
- Describe what happens during a network partition and why the minority side stalls instead of electing a new leader.
Architecture Overview
Each ClusterNode thread runs a 10ms tick loop: process one inbound message from the bus, then check election and heartbeat timers. The protocol logic is entirely reactive — the same model production Raft implementations use with event loops.
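The snippet below is a stripped-down illustration of that loop shape, with the Raft handlers replaced by print statements; the 0.2 s deadline and the message text are arbitrary stand-ins (the real loop is ClusterNode.run in raft_cluster.py):

```python
import queue
import threading
import time

def tick_loop(inbox: "queue.Queue[str]", stop: threading.Event) -> None:
    # Same shape as ClusterNode.run: handle one message, check a timer, sleep 10 ms.
    deadline = time.monotonic() + 0.2          # stand-in for a randomised election deadline
    while not stop.is_set():
        try:
            msg = inbox.get(timeout=0.01)      # 1. process one inbound message, if any
            print("handled:", msg)
            deadline = time.monotonic() + 0.2  #    hearing from a leader resets the timer
        except queue.Empty:
            pass
        if time.monotonic() >= deadline:       # 2. timer fired: a real node would start an election
            print("election timeout fired")
            deadline = time.monotonic() + 0.2
        time.sleep(0.01)                       # 3. 10 ms tick

stop = threading.Event()
inbox: "queue.Queue[str]" = queue.Queue()
worker = threading.Thread(target=tick_loop, args=(inbox, stop), daemon=True)
worker.start()
inbox.put("AppendEntries heartbeat")
time.sleep(0.5)   # long enough for at least one timeout to fire
stop.set()
worker.join()
```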
Lab Implementation & Engineering Deep Dives
1. Raft Node State Machine (python/raft_node.py)
The core Raft state machine for a single node — no network required.
- Why: Understanding the node in isolation is the prerequisite for understanding the cluster. Every correctness property in Raft is enforced at the individual node level: a node decides whether to grant a vote, whether to accept a log entry, and when to step down — independently of what other nodes think.
- What: Implements RaftNode with all three roles (Follower, Candidate, Leader), the VoteRequest/VoteResponse RPC types, the AppendEntries RPC, and the commit index advancement logic. Running it standalone walks through every state transition with annotated output.
- How: Uses Python dataclasses for immutable RPC payloads and a monotonic clock for election timeouts. The handle_vote_request method encodes all three vote-granting rules (term check, one-vote-per-term, log freshness) in ~20 lines — making the safety argument directly visible in code.
2. 5-Node Cluster Simulation (python/raft_cluster.py)
A full Raft cluster running in a single process via threads and in-memory queues.
- Why: The cluster simulation is where the emergent behaviour of the protocol becomes visible. Individual node correctness is necessary but not sufficient — you need to see leader election, concurrent replication, and failure recovery play out across multiple nodes to build intuition for why the protocol is safe under adversarial conditions.
- What: Spins up 5 ClusterNode threads sharing a MessageBus. The simulation drives through leader election, 3-entry log replication, a leader crash, re-election, and a final safety check confirming no split-brain.
- How: The MessageBus supports configurable drop rates for simulating lossy links and partitions (see the short sketch below). Each ClusterNode thread runs a 10 ms tick loop: process one inbound message, then check election/heartbeat timers. This mirrors how production Raft implementations use event loops — the protocol logic is entirely reactive.
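As a rough feel for the drop-rate knob, the snippet below pushes a batch of messages through a lossy bus and counts what arrives. It is a sketch, not part of the lab: it assumes you run it from the python/ directory so raft_cluster is importable, it reads the bus's internal _queues directly, and the 0.3 rate and 1,000-message count are arbitrary.

```python
import random

from raft_cluster import Message, MessageBus

random.seed(42)                                # make the run repeatable
bus = MessageBus(node_count=2, drop_rate=0.3)  # roughly 30% of sends vanish silently

for i in range(1000):
    bus.send(Message(sender=0, receiver=1, payload=f"ping {i}"))

delivered = bus._queues[1].qsize()             # peeking at node 1's inbox (test-only)
print(f"delivered {delivered}/1000 messages (expect roughly 700)")
```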
Setup & Running
cd distributed_consensus_learnings
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
Run the single-node state machine demo
cd python
python raft_node.py
This is a good starting point. Read the output top to bottom — each line explains a decision the node is making.
Run the full cluster simulation (5 nodes)
cd python
python raft_cluster.py
The simulation will:
- Boot 5 nodes as followers.
- Elect the first leader via randomised timeouts.
- Replicate 3 log entries to a quorum.
- Crash the leader.
- Trigger re-election and confirm replication continues.
- Assert no split-brain: at most one live node believes it is the leader, and all live nodes agree on the current term.
Key Concepts
Quorum: why majority specifically?
With 5 nodes, any two groups of 3 must share at least one member. That shared member has seen both decisions, so a future leader always inherits the full committed history. A group of 2 cannot form a quorum, so a minority partition stalls instead of electing its own leader.
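That overlap claim is small enough to verify by brute force with the standard library:

```python
# Every pair of 3-node majorities in a 5-node cluster shares at least one member.
from itertools import combinations

majorities = list(combinations(range(5), 3))   # all 10 possible quorums
assert all(set(a) & set(b) for a in majorities for b in majorities)
print(f"checked {len(majorities) ** 2} quorum pairs: every pair overlaps")

# A 2-node minority, by contrast, can be disjoint from some majority, so it can never win alone.
assert any(not (set(m) & {3, 4}) for m in majorities)
```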
Election safety: why can't two leaders exist at the same term?
Each node votes at most once per term. To win, a candidate needs votes from 3 out of 5 nodes. Since no node votes twice, two candidates cannot both reach 3 — one of them will fall short.
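You can watch the one-vote-per-term rule fire using the RaftNode class from this lab (run from the python/ directory); the node and candidate IDs below are arbitrary, and both candidates are given empty logs so only the voting rule matters:

```python
from raft_node import RaftNode, VoteRequest

voter = RaftNode(node_id=0, cluster_size=5)

first = VoteRequest(candidate_id=1, candidate_term=1, last_log_index=-1, last_log_term=-1)
second = VoteRequest(candidate_id=2, candidate_term=1, last_log_index=-1, last_log_term=-1)

print(voter.handle_vote_request(first).granted)   # True: first request in term 1 gets the vote
print(voter.handle_vote_request(second).granted)  # False: already voted for node 1 this term
```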
Log freshness: why can't a stale node become leader?
A voter rejects a candidate whose log is behind its own. Since a committed entry was acknowledged by a majority, any quorum of voters includes at least one node that has that entry — and that node will refuse to vote for a candidate missing it.
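The log-freshness rule can be exercised the same way; the log entries, terms, and IDs below are made up for the example:

```python
from raft_node import LogEntry, RaftNode, VoteRequest

voter = RaftNode(node_id=0, cluster_size=5)
voter.current_term = 2
voter.log = [
    LogEntry(term=1, index=0, command="SET x=1"),
    LogEntry(term=2, index=1, command="SET y=2"),   # voter has history up to term 2
]

stale = VoteRequest(candidate_id=3, candidate_term=3, last_log_index=0, last_log_term=1)
fresh = VoteRequest(candidate_id=4, candidate_term=3, last_log_index=1, last_log_term=2)

print(voter.handle_vote_request(stale).granted)   # False: candidate 3's log is behind the voter's
print(voter.handle_vote_request(fresh).granted)   # True: candidate 4's log is at least as up to date
```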
For a deeper dive into the theory, see docs/distributed_consensus_deep_dive.md.
📝 Lab Implementation & Scripts
python/raft_node.py
"""
raft_node.py — Single Raft Node State Machine
Demonstrates the core Raft state machine for a single node:
- Three roles: Follower, Candidate, Leader
- Election timeout and reset
- Vote granting rules (term comparison + log freshness)
- Log append and commit index advancement
- Heartbeat emission (leader only)
Run:
python raft_node.py
"""
from __future__ import annotations
import random
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
# ─── Data Structures ──────────────────────────────────────────────────────────
class Role(Enum):
FOLLOWER = auto()
CANDIDATE = auto()
LEADER = auto()
@dataclass
class LogEntry:
"""A single entry in the Raft replicated log."""
term: int
index: int
command: str
def __repr__(self) -> str:
return f"Entry(term={self.term}, idx={self.index}, cmd={self.command!r})"
@dataclass
class VoteRequest:
"""RequestVote RPC arguments."""
candidate_id: int
candidate_term: int
last_log_index: int
last_log_term: int
@dataclass
class VoteResponse:
"""RequestVote RPC reply."""
voter_id: int
term: int
granted: bool
@dataclass
class AppendEntriesRequest:
"""AppendEntries RPC arguments (also used as heartbeat when entries=[])."""
leader_id: int
term: int
prev_log_index: int
prev_log_term: int
entries: list[LogEntry]
leader_commit: int
@dataclass
class AppendEntriesResponse:
"""AppendEntries RPC reply."""
follower_id: int
term: int
success: bool
match_index: int # Highest log index the follower has matched
# ─── Raft Node ────────────────────────────────────────────────────────────────
class RaftNode:
"""
A single Raft consensus node.
Persistent state (survives restarts in production; initialised fresh here):
current_term — Latest term this node has seen
voted_for — Candidate this node voted for in current_term (or None)
log — Ordered list of LogEntry
Volatile state:
commit_index — Highest log entry known to be committed
last_applied — Highest log entry applied to state machine
Leader-only volatile state (reinitialised on each new election win):
next_index — For each follower: next log index to send them
match_index — For each follower: highest log index known to be replicated
"""
ELECTION_TIMEOUT_MIN = 1.5 # seconds
ELECTION_TIMEOUT_MAX = 3.0 # seconds
HEARTBEAT_INTERVAL = 0.5 # seconds
def __init__(self, node_id: int, cluster_size: int = 5):
self.node_id = node_id
self.cluster_size = cluster_size
self.quorum = (cluster_size // 2) + 1
# Persistent state
self.current_term: int = 0
self.voted_for: Optional[int] = None
self.log: list[LogEntry] = []
# Volatile state
self.commit_index: int = -1
self.last_applied: int = -1
self.role: Role = Role.FOLLOWER
# Election timing
self._reset_election_timeout()
self._last_heartbeat = time.monotonic()
# Leader-only state (populated when this node wins an election)
self.next_index: dict[int, int] = {}
self.match_index: dict[int, int] = {}
print(f" [Node {self.node_id}] Initialised as {self.role.name} — term={self.current_term}")
# ── Timer helpers ──────────────────────────────────────────────────────
def _reset_election_timeout(self) -> None:
"""Randomise the election timeout (Raft's key anti-collision mechanism)."""
self._election_timeout = random.uniform(
self.ELECTION_TIMEOUT_MIN,
self.ELECTION_TIMEOUT_MAX,
)
self._election_deadline = time.monotonic() + self._election_timeout
def election_timeout_elapsed(self) -> bool:
return time.monotonic() >= self._election_deadline
# ── Role transitions ───────────────────────────────────────────────────
def become_candidate(self) -> VoteRequest:
"""
Transition: Follower → Candidate.
Steps per Raft paper §5.2:
1. Increment current term.
2. Vote for self.
3. Reset election timer.
4. Broadcast RequestVote RPCs.
"""
self.current_term += 1
self.voted_for = self.node_id
self.role = Role.CANDIDATE
self._reset_election_timeout()
print(f" [Node {self.node_id}] → CANDIDATE (term={self.current_term})")
return VoteRequest(
candidate_id=self.node_id,
candidate_term=self.current_term,
last_log_index=len(self.log) - 1,
last_log_term=self.log[-1].term if self.log else -1,
)
def become_leader(self) -> None:
"""
Transition: Candidate → Leader.
Reinitialise nextIndex and matchIndex for all peers.
Immediately send an empty AppendEntries (heartbeat) to assert authority.
"""
self.role = Role.LEADER
for peer in range(self.cluster_size):
if peer != self.node_id:
self.next_index[peer] = len(self.log)
self.match_index[peer] = -1
print(f" [Node {self.node_id}] → LEADER (term={self.current_term}) ✓")
def step_down(self, new_term: int) -> None:
"""
Revert to Follower when we observe a higher term.
Any node — follower, candidate, or leader — must immediately revert
to follower if it sees a term greater than its own (§5.1).
"""
print(f" [Node {self.node_id}] → FOLLOWER (term {self.current_term}→{new_term}, stepping down)")
self.current_term = new_term
self.voted_for = None
self.role = Role.FOLLOWER
self._reset_election_timeout()
# ── Vote handling ──────────────────────────────────────────────────────
def handle_vote_request(self, req: VoteRequest) -> VoteResponse:
"""
Process a RequestVote RPC.
Grant the vote only if ALL of the following hold (§5.2, §5.4):
1. candidate_term >= current_term
2. We haven't voted for someone else this term.
3. The candidate's log is at least as up-to-date as ours
(higher last log term, or equal term and at least as long a log).
"""
# Rule 1: step down if we see a higher term
if req.candidate_term > self.current_term:
self.step_down(req.candidate_term)
grant = False
if req.candidate_term < self.current_term:
# Stale candidate — decline
pass
elif self.voted_for in (None, req.candidate_id):
# Check log freshness (§5.4.1)
my_last_term = self.log[-1].term if self.log else -1
my_last_index = len(self.log) - 1
log_ok = (
req.last_log_term > my_last_term
or (req.last_log_term == my_last_term and req.last_log_index >= my_last_index)
)
if log_ok:
self.voted_for = req.candidate_id
self._reset_election_timeout()
grant = True
result = "✓ granted" if grant else "✗ denied"
print(f" [Node {self.node_id}] Vote {result} → Node {req.candidate_id} "
f"(term={req.candidate_term})")
return VoteResponse(voter_id=self.node_id, term=self.current_term, granted=grant)
def handle_vote_response(self, resp: VoteResponse, votes_received: set[int]) -> bool:
"""
Tally an incoming VoteResponse.
Returns True if this node just reached quorum and should become leader.
"""
if resp.term > self.current_term:
self.step_down(resp.term)
return False
if self.role != Role.CANDIDATE:
return False
        # Only count grants for the current term; stale responses from older elections are ignored
        if resp.granted and resp.term == self.current_term:
            votes_received.add(resp.voter_id)
print(f" [Node {self.node_id}] Votes collected: {len(votes_received)}/{self.quorum} needed")
if len(votes_received) >= self.quorum:
return True
return False
# ── Log replication ────────────────────────────────────────────────────
def append_entries(self, req: AppendEntriesRequest) -> AppendEntriesResponse:
"""
Process an AppendEntries RPC (§5.3).
Heartbeat: req.entries == [] → just reset timer and update commit.
Replication: req.entries != [] → validate consistency, append new entries.
"""
# Reject stale leaders
if req.term < self.current_term:
return AppendEntriesResponse(
follower_id=self.node_id,
term=self.current_term,
success=False,
match_index=-1,
)
# Valid leader — reset election timeout, step down if needed
if req.term > self.current_term:
self.step_down(req.term)
elif self.role == Role.CANDIDATE:
# Another node won — become follower
self.role = Role.FOLLOWER
print(f" [Node {self.node_id}] → FOLLOWER (another leader elected, term={req.term})")
self._reset_election_timeout()
# Log consistency check (§5.3): does our log contain an entry at
# prev_log_index whose term matches prev_log_term?
if req.prev_log_index >= 0:
if len(self.log) <= req.prev_log_index:
# We're missing entries — tell leader to back up
return AppendEntriesResponse(
follower_id=self.node_id,
term=self.current_term,
success=False,
match_index=len(self.log) - 1,
)
if self.log[req.prev_log_index].term != req.prev_log_term:
# Conflict — delete this entry and everything after it
self.log = self.log[:req.prev_log_index]
return AppendEntriesResponse(
follower_id=self.node_id,
term=self.current_term,
success=False,
match_index=len(self.log) - 1,
)
# Append any new entries (dedup entries we already have)
for entry in req.entries:
if entry.index < len(self.log):
if self.log[entry.index].term != entry.term:
self.log = self.log[:entry.index] # Conflict — truncate
self.log.append(entry)
else:
self.log.append(entry)
# Advance commit index
if req.leader_commit > self.commit_index:
self.commit_index = min(req.leader_commit, len(self.log) - 1)
self._apply_committed_entries()
return AppendEntriesResponse(
follower_id=self.node_id,
term=self.current_term,
success=True,
match_index=len(self.log) - 1,
)
def leader_receive_client_command(self, command: str) -> Optional[LogEntry]:
"""
Accept a command from a client (leader-only, §5.3).
Appends to the local log. The entry becomes committed once
a quorum of followers acknowledge it via AppendEntries.
"""
if self.role != Role.LEADER:
print(f" [Node {self.node_id}] Rejected client command — not the leader")
return None
entry = LogEntry(
term=self.current_term,
index=len(self.log),
command=command,
)
self.log.append(entry)
print(f" [Node {self.node_id}] Appended to local log: {entry}")
return entry
def leader_advance_commit_index(self, match_indices: list[int]) -> None:
"""
Advance commit_index to the highest N such that a quorum of nodes
have match_index >= N and log[N].term == current_term (§5.3, §5.4).
"""
if self.role != Role.LEADER:
return
# Include our own last log index
all_indices = sorted(match_indices + [len(self.log) - 1], reverse=True)
for n in all_indices:
if n <= self.commit_index:
break
if n < len(self.log) and self.log[n].term == self.current_term:
# Count how many nodes (including self) have this entry
replicated = 1 + sum(1 for idx in match_indices if idx >= n)
if replicated >= self.quorum:
print(f" [Node {self.node_id}] Committed log index {n} "
f"(replicated on {replicated}/{self.cluster_size} nodes)")
self.commit_index = n
self._apply_committed_entries()
break
def _apply_committed_entries(self) -> None:
"""Apply all committed but not yet applied entries to the state machine."""
while self.last_applied < self.commit_index:
self.last_applied += 1
entry = self.log[self.last_applied]
print(f" [Node {self.node_id}] Applied to state machine: {entry}")
# ── Heartbeat emission ─────────────────────────────────────────────────
def build_heartbeat(self, follower_id: int) -> AppendEntriesRequest:
"""Build an empty AppendEntries (heartbeat) for a given follower."""
next_idx = self.next_index.get(follower_id, len(self.log))
prev_idx = next_idx - 1
prev_term = self.log[prev_idx].term if prev_idx >= 0 and self.log else -1
return AppendEntriesRequest(
leader_id=self.node_id,
term=self.current_term,
prev_log_index=prev_idx,
prev_log_term=prev_term,
entries=[],
leader_commit=self.commit_index,
)
def build_append_entries(self, follower_id: int) -> AppendEntriesRequest:
"""Build an AppendEntries RPC with any unsent log entries for a follower."""
next_idx = self.next_index.get(follower_id, 0)
prev_idx = next_idx - 1
prev_term = self.log[prev_idx].term if prev_idx >= 0 and self.log else -1
entries = self.log[next_idx:]
return AppendEntriesRequest(
leader_id=self.node_id,
term=self.current_term,
prev_log_index=prev_idx,
prev_log_term=prev_term,
entries=entries,
leader_commit=self.commit_index,
)
    def handle_append_entries_response(self, resp: AppendEntriesResponse) -> None:
        """Process a follower's AppendEntries response (leader-only)."""
        if resp.term > self.current_term:
            self.step_down(resp.term)
            return
        if resp.success:
            self.match_index[resp.follower_id] = resp.match_index
            self.next_index[resp.follower_id] = resp.match_index + 1
            # Recompute the commit index from the authoritative match_index map
            self.leader_advance_commit_index(list(self.match_index.values()))
else:
# Back off: retry with one earlier entry
self.next_index[resp.follower_id] = max(0, resp.match_index)
def __repr__(self) -> str:
return (
f"RaftNode(id={self.node_id}, role={self.role.name}, "
f"term={self.current_term}, log_len={len(self.log)}, "
f"commit={self.commit_index})"
)
# ─── Single-Node State Machine Demo ───────────────────────────────────────────
def demo_state_machine() -> None:
"""
Walk through the state transitions a single Raft node goes through,
without needing a real network. Useful for understanding the rules
in isolation before looking at the full cluster simulation.
"""
try:
from colorama import Fore, Style, init
init(autoreset=True)
CYAN = Fore.CYAN
GREEN = Fore.GREEN
YELLOW = Fore.YELLOW
MAGENTA = Fore.MAGENTA
RESET = Style.RESET_ALL
except ImportError:
CYAN = GREEN = YELLOW = MAGENTA = RESET = ""
def section(title: str) -> None:
print(f"\n{CYAN}{'─' * 60}{RESET}")
print(f"{CYAN} {title}{RESET}")
print(f"{CYAN}{'─' * 60}{RESET}")
section("Phase 1: Node initialises as Follower")
node = RaftNode(node_id=0, cluster_size=5)
print(f" State: {node}")
section("Phase 2: Election timeout fires → node becomes Candidate")
vote_req = node.become_candidate()
print(f" Broadcasting: {vote_req}")
section("Phase 3: Node votes for itself and collects votes from quorum")
votes: set[int] = {node.node_id} # Self-vote
# Simulate 2 more nodes granting their votes (quorum = 3 of 5)
for peer_id in [1, 2]:
fake_resp = VoteResponse(voter_id=peer_id, term=node.current_term, granted=True)
won = node.handle_vote_response(fake_resp, votes)
if won:
node.become_leader()
break
section("Phase 4: Leader receives client commands and appends to log")
commands = ["SET x=1", "SET y=2", "DELETE z"]
for cmd in commands:
node.leader_receive_client_command(cmd)
section("Phase 5: Followers acknowledge — leader advances commit index")
# Simulate 2 followers having replicated all 3 entries (indices 0, 1, 2)
fake_match = {1: 2, 2: 2, 3: -1, 4: -1}
node.match_index = fake_match
node.leader_advance_commit_index(list(fake_match.values()))
section("Phase 6: Leader sees higher term — steps down to Follower")
node.step_down(new_term=5)
print(f" State: {node}")
section("Phase 7: Node receives a vote request from a legitimate candidate")
req = VoteRequest(
candidate_id=3,
candidate_term=5,
last_log_index=5,
last_log_term=5,
)
resp = node.handle_vote_request(req)
print(f" Response: {resp}")
print(f"\n{GREEN} ✓ Single-node state machine demo complete.{RESET}\n")
if __name__ == "__main__":
demo_state_machine()
python/raft_cluster.py
"""
raft_cluster.py — 5-Node Raft Cluster Simulation
Simulates a full Raft cluster in a single process using threads and
in-memory message queues instead of real sockets. This lets you observe
all the consensus protocol mechanics without any network setup.
Demonstrates:
1. All 5 nodes boot as Followers.
2. A Leader is elected via randomised election timeouts.
3. Three client commands are replicated to a quorum and committed.
4. The leader is "crashed" (thread stopped).
5. A new election takes place and a new leader is elected.
6. Replication continues under the new leader.
Run:
python raft_cluster.py
"""
from __future__ import annotations
import queue
import random
import threading
import time
from dataclasses import dataclass
from typing import Optional
from raft_node import (
AppendEntriesRequest,
AppendEntriesResponse,
LogEntry,
RaftNode,
Role,
VoteRequest,
VoteResponse,
)
try:
from colorama import Fore, Style, init as colorama_init
colorama_init(autoreset=True)
C = {
"leader": Fore.GREEN,
"candidate": Fore.YELLOW,
"follower": Fore.CYAN,
"error": Fore.RED,
"section": Fore.MAGENTA,
"dim": Style.DIM,
"reset": Style.RESET_ALL,
}
except ImportError:
C = {k: "" for k in ["leader", "candidate", "follower", "error", "section", "dim", "reset"]}
# ─── Message Bus ──────────────────────────────────────────────────────────────
@dataclass
class Message:
"""A network message between two nodes."""
sender: int
receiver: int
payload: object # VoteRequest | VoteResponse | AppendEntriesRequest | AppendEntriesResponse
class MessageBus:
"""
In-process message bus that simulates an unreliable network.
Supports:
- Point-to-point delivery via per-node queues.
- Configurable packet drop rate (to simulate network partitions).
- Configurable latency jitter.
"""
def __init__(self, node_count: int, drop_rate: float = 0.0):
self._queues: dict[int, queue.Queue] = {
i: queue.Queue() for i in range(node_count)
}
self.drop_rate = drop_rate
self._lock = threading.Lock()
def send(self, msg: Message) -> None:
if random.random() < self.drop_rate:
return # Silently drop
self._queues[msg.receiver].put(msg)
def broadcast(self, sender: int, payload: object, exclude: set[int] | None = None) -> None:
for node_id, q in self._queues.items():
if node_id == sender:
continue
if exclude and node_id in exclude:
continue
self.send(Message(sender=sender, receiver=node_id, payload=payload))
def receive(self, node_id: int, timeout: float = 0.05) -> Optional[Message]:
try:
return self._queues[node_id].get(timeout=timeout)
except queue.Empty:
return None
    def partition(self, isolated_nodes: set[int]) -> None:
        """
        Simulate a network partition. Simplified model: drain the isolated
        nodes' queues, then drop all subsequent traffic until heal() is called.
        (Coarse-grained: the drop rate applies to the whole bus, not only the
        isolated nodes.)
        """
        self.drop_rate = 1.0
        for node_id in isolated_nodes:
            # Drain queued messages so the partitioned node can't see them
            while not self._queues[node_id].empty():
                try:
                    self._queues[node_id].get_nowait()
                except queue.Empty:
                    break
def heal(self) -> None:
self.drop_rate = 0.0
# ─── Cluster Node Thread ───────────────────────────────────────────────────────
class ClusterNode(threading.Thread):
"""
Runs a RaftNode in its own thread, polling for messages and driving
the election/heartbeat timers.
"""
def __init__(self, node_id: int, cluster_size: int, bus: MessageBus):
super().__init__(name=f"Node-{node_id}", daemon=True)
self.node = RaftNode(node_id=node_id, cluster_size=cluster_size)
self.bus = bus
self._stop_event = threading.Event()
self._votes_received: set[int] = set()
@property
def node_id(self) -> int:
return self.node.node_id
def stop(self) -> None:
self._stop_event.set()
def run(self) -> None:
while not self._stop_event.is_set():
self._process_messages()
self._check_timers()
time.sleep(0.01) # 10 ms tick
def _process_messages(self) -> None:
msg = self.bus.receive(self.node_id, timeout=0.01)
if msg is None:
return
payload = msg.payload
if isinstance(payload, VoteRequest):
resp = self.node.handle_vote_request(payload)
self.bus.send(Message(sender=self.node_id, receiver=msg.sender, payload=resp))
elif isinstance(payload, VoteResponse):
won = self.node.handle_vote_response(payload, self._votes_received)
if won:
self.node.become_leader()
self._votes_received.clear()
# Immediately broadcast heartbeats to assert leadership
self._send_heartbeats()
elif isinstance(payload, AppendEntriesRequest):
resp = self.node.append_entries(payload)
self.bus.send(Message(sender=self.node_id, receiver=msg.sender, payload=resp))
        elif isinstance(payload, AppendEntriesResponse):
            self.node.handle_append_entries_response(payload)
def _check_timers(self) -> None:
if self.node.role == Role.LEADER:
# Send heartbeats on schedule
if time.monotonic() - self.node._last_heartbeat >= self.node.HEARTBEAT_INTERVAL:
self._send_heartbeats()
self.node._last_heartbeat = time.monotonic()
else:
# Follower/Candidate: check election timeout
if self.node.election_timeout_elapsed():
self._votes_received = {self.node_id} # Vote for self
vote_req = self.node.become_candidate()
self.bus.broadcast(sender=self.node_id, payload=vote_req)
def _send_heartbeats(self) -> None:
for peer_id in range(self.node.cluster_size):
if peer_id == self.node_id:
continue
rpc = self.node.build_append_entries(peer_id)
self.bus.send(Message(sender=self.node_id, receiver=peer_id, payload=rpc))
def submit_command(self, command: str) -> Optional[LogEntry]:
"""Submit a client command (only succeeds if this node is the leader)."""
return self.node.leader_receive_client_command(command)
def __repr__(self) -> str:
return repr(self.node)
# ─── Cluster Orchestrator ──────────────────────────────────────────────────────
class RaftCluster:
"""Manages a set of ClusterNode threads and drives the simulation."""
def __init__(self, size: int = 5):
self.size = size
self.bus = MessageBus(node_count=size)
self.nodes: list[ClusterNode] = [
ClusterNode(node_id=i, cluster_size=size, bus=self.bus)
for i in range(size)
]
def start(self) -> None:
for node in self.nodes:
node.start()
def stop(self) -> None:
for node in self.nodes:
node.stop()
def leader(self) -> Optional[ClusterNode]:
leaders = [n for n in self.nodes if n.node.role == Role.LEADER and not n._stop_event.is_set()]
return leaders[0] if leaders else None
def wait_for_leader(self, timeout: float = 10.0) -> Optional[ClusterNode]:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
leader = self.leader()
if leader:
return leader
time.sleep(0.1)
return None
def print_status(self) -> None:
print(f"\n {'─' * 55}")
print(f" {'NODE':<8} {'ROLE':<12} {'TERM':<8} {'LOG':>5} {'COMMIT':>7}")
print(f" {'─' * 55}")
for cn in self.nodes:
n = cn.node
stopped = cn._stop_event.is_set()
role_str = ("CRASHED" if stopped else n.role.name)
color = (
C["error"] if stopped else
C["leader"] if n.role == Role.LEADER else
C["candidate"] if n.role == Role.CANDIDATE else
C["follower"]
)
print(f" {color}Node {n.node_id:<3} {role_str:<12} {n.current_term:<8} "
f"{len(n.log):>5} {n.commit_index:>7}{C['reset']}")
print(f" {'─' * 55}\n")
def crash_leader(self) -> Optional[ClusterNode]:
"""Simulate the current leader crashing."""
leader = self.leader()
if leader:
print(f"\n {C['error']}⚡ CRASH: Node {leader.node_id} (leader, term={leader.node.current_term}) "
f"has failed!{C['reset']}\n")
leader.stop()
return leader
return None
# ─── Main Simulation ───────────────────────────────────────────────────────────
def run_simulation() -> None:
def banner(text: str) -> None:
width = 62
print(f"\n{C['section']}╔{'═' * width}╗{C['reset']}")
print(f"{C['section']}║ {text:<{width - 2}}║{C['reset']}")
print(f"{C['section']}╚{'═' * width}╝{C['reset']}\n")
def step(text: str) -> None:
print(f"\n{C['section']}◆ {text}{C['reset']}\n")
banner("Raft Cluster Simulation — 5 Nodes")
# ── Phase 1: Boot ─────────────────────────────────────────────────────
step("Phase 1: All nodes boot as Followers")
cluster = RaftCluster(size=5)
cluster.start()
cluster.print_status()
# ── Phase 2: First election ───────────────────────────────────────────
step("Phase 2: Waiting for leader election (randomised timeouts)...")
leader = cluster.wait_for_leader(timeout=12.0)
if not leader:
print(f" {C['error']}No leader elected within timeout. Aborting.{C['reset']}")
cluster.stop()
return
print(f" {C['leader']}✓ Leader elected: Node {leader.node_id} "
f"(term={leader.node.current_term}){C['reset']}")
cluster.print_status()
# ── Phase 3: Log replication ──────────────────────────────────────────
step("Phase 3: Client submits 3 commands to the leader")
commands = ["SET balance=1000", "TRANSFER 200 -> account_7", "SET balance=800"]
for cmd in commands:
entry = leader.submit_command(cmd)
time.sleep(0.3) # Allow replication round-trip
time.sleep(1.0) # Allow commit index to advance across followers
cluster.print_status()
# ── Phase 4: Leader crash ─────────────────────────────────────────────
step("Phase 4: Leader failure — simulating node crash")
old_leader_id = leader.node_id
cluster.crash_leader()
time.sleep(0.5)
cluster.print_status()
# ── Phase 5: Re-election ──────────────────────────────────────────────
step("Phase 5: Remaining nodes detect leader failure and hold new election")
new_leader = cluster.wait_for_leader(timeout=12.0)
if not new_leader:
print(f" {C['error']}No new leader elected. "
f"(Expected: quorum={cluster.size // 2 + 1}, alive={cluster.size - 1}){C['reset']}")
cluster.stop()
return
print(f" {C['leader']}✓ New leader elected: Node {new_leader.node_id} "
f"(term={new_leader.node.current_term}){C['reset']}")
cluster.print_status()
# ── Phase 6: Continue replication under new leader ────────────────────
step("Phase 6: Client submits 2 more commands to new leader")
new_commands = ["AUDIT log entry", "CLOSE session"]
for cmd in new_commands:
new_leader.submit_command(cmd)
time.sleep(0.3)
time.sleep(1.0)
cluster.print_status()
# ── Phase 7: Safety check ─────────────────────────────────────────────
step("Phase 7: Safety verification")
live_nodes = [n for n in cluster.nodes if not n._stop_event.is_set()]
committed = [n.node.commit_index for n in live_nodes]
terms = [n.node.current_term for n in live_nodes]
print(f" Commit indices (live nodes): {committed}")
print(f" Current terms (live nodes): {terms}")
all_same_term = len(set(terms)) == 1
no_split_brain = len([n for n in live_nodes if n.node.role == Role.LEADER]) <= 1
if all_same_term and no_split_brain:
print(f"\n {C['leader']}✓ Safety: single leader, all nodes on the same term.{C['reset']}")
else:
print(f"\n {C['error']}✗ Safety violation detected!{C['reset']}")
cluster.stop()
print(f"\n {C['leader']}Simulation complete.{C['reset']}\n")
if __name__ == "__main__":
run_simulation()