Distributed Consensus: Raft in Practice

Hands-on Python implementations of the Raft consensus algorithm — starting from the core problem of keeping multiple servers in agreement, all the way to a working 5-node cluster that survives leader crashes and re-elects without losing a single committed entry.


The Problem We Are Solving

Imagine you are building a database that must stay available even if a server dies. The obvious answer is to run copies on multiple servers. But now you have a new problem: how do you keep all copies in sync?

flowchart LR
    Client -->|write x=5| A["Server A\n(x=5)"]
    Client -->|write x=5| B["Server B\n(x=5)"]
    Client -->|write x=5| C["Server C\n(x=5)"]
    A -. "what if B gets the\nwrite but A and C don't?" .- B

If a network hiccup means only Server B gets the write, you now have three servers that disagree on the value of x. Any server that answers a read might return a stale or wrong value. This is the distributed consensus problem: getting a group of servers to agree on a sequence of values, even when messages get lost, delayed, or servers crash.

Real systems that solve this problem are everywhere: etcd (stores Kubernetes cluster state), CockroachDB (distributed SQL), Consul (service discovery). They all use algorithms like Raft under the hood.


Why Is This Hard?

Three properties you want from a distributed system are in tension:

  Property              What It Means                    Trade-off
  Consistency           All servers see the same data    May refuse requests during failures
  Availability          Every request gets a response    May return stale data
  Partition Tolerance   Works despite dropped messages   Can't have all three simultaneously

This is the CAP theorem: you can pick at most two. Raft picks Consistency + Partition Tolerance — it will stall rather than return wrong data.

On top of that, the FLP impossibility result (1985) proved mathematically that no algorithm can guarantee both safety (never wrong) and liveness (always makes progress) in an asynchronous network where nodes can crash. Every real consensus algorithm accepts this trade-off by using timeouts to detect failures and make progress.
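Raft's escape hatch is the randomised election timeout: safety never depends on timing, and randomisation makes repeated liveness failures (split votes) improbable. A small standalone Monte Carlo sketch, using the same 1.5–3.0 s timeout range as the lab code (`split_vote_risk` is an illustrative name, not a function from the scripts):

```python
import random

def split_vote_risk(trials: int = 100_000, window: float = 0.1) -> float:
    """Estimate how often two nodes' randomised election timeouts
    (uniform over 1.5-3.0 s, matching the lab's RaftNode constants)
    fire within `window` seconds of each other — close enough that
    both may become candidates and split the vote."""
    close = 0
    for _ in range(trials):
        a = random.uniform(1.5, 3.0)
        b = random.uniform(1.5, 3.0)
        if abs(a - b) < window:
            close += 1
    return close / trials

print(f"split-vote risk per election round: ~{split_vote_risk():.0%}")
```

A split vote only delays the election (everyone times out again with fresh random timeouts); it never violates safety.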


What Raft Does

Raft solves consensus by electing one server as a Leader and routing all writes through it. The leader replicates every write to a majority of servers before confirming it. A majority is the key — if 3 out of 5 servers have a log entry, any future quorum of 3 will include at least one of them, so no committed entry is ever lost.

sequenceDiagram
    participant C as Client
    participant L as Leader (Node 0)
    participant F1 as Follower (Node 1)
    participant F2 as Follower (Node 2)
    C->>L: Write x=5
    L->>F1: AppendEntries (x=5)
    L->>F2: AppendEntries (x=5)
    F1-->>L: OK
    F2-->>L: OK
    Note over L,F2: Majority confirmed → commit
    L-->>C: Write accepted
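The quorum size used throughout both scripts is `n // 2 + 1` (stored as `RaftNode.quorum`); a quick look at what that buys at common cluster sizes:

```python
for n in (3, 5, 7):
    # Smallest group guaranteed to overlap every other possible quorum
    quorum = n // 2 + 1
    print(f"{n} nodes: quorum={quorum}, tolerates {n - quorum} simultaneous failure(s)")
```

Note that even-sized clusters buy nothing: 4 nodes need a quorum of 3 and tolerate only 1 failure, the same as 3 nodes.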

Every server can be in one of three roles at any time:

  • Follower — passive; accepts log entries from the leader, votes in elections.
  • Candidate — trying to become leader; requests votes from peers.
  • Leader — handles all client writes; sends heartbeats to suppress new elections.

stateDiagram-v2
    [*] --> Follower : start
    Follower --> Candidate : election timeout fires\n(no heartbeat received)
    Candidate --> Leader : receives votes from majority
    Candidate --> Follower : higher term seen\nor loses election
    Leader --> Follower : higher term seen\n(another leader elected)

What You Will Build

Script 1 — raft_node.py: A single Raft node state machine with no networking. You run it and watch it transition through all three roles, process vote requests, append log entries, and advance its commit index — all annotated with print output explaining why each step is happening.

Script 2 — raft_cluster.py: Five nodes connected by in-memory message queues (no real network needed). You watch a full election play out, three writes replicate to a quorum, the leader crash, a fresh election, and replication continue — with a final check confirming no split-brain.


Learning Objectives

By the end of this lab, you will be able to:

  1. Explain why you cannot have both perfect safety and perfect liveness in a distributed system (FLP impossibility).
  2. Describe what a quorum is and why a majority specifically is required.
  3. Trace the three vote-granting rules Raft uses to ensure at most one leader per term.
  4. Explain how log entries are replicated and what "committed" means in Raft.
  5. Describe what happens during a network partition and why the minority side stalls instead of electing a new leader.

Architecture Overview

flowchart TD
    subgraph Cluster["5-Node Cluster (raft_cluster.py)"]
        MB["MessageBus\n(in-memory queues,\nconfigurable drop rate)"]
        N0["Node 0"] <--> MB
        N1["Node 1"] <--> MB
        N2["Node 2"] <--> MB
        N3["Node 3"] <--> MB
        N4["Node 4"] <--> MB
    end
    subgraph Node["Each Node (raft_node.py)"]
        SM["Role State Machine\nFollower / Candidate / Leader"]
        LOG["Replicated Log\n[ {term, idx, cmd} ... ]"]
        TIMERS["Timers\nelection timeout\nheartbeat interval"]
        SM --> LOG
        SM --> TIMERS
    end

Each ClusterNode thread runs a 10ms tick loop: process one inbound message from the bus, then check election and heartbeat timers. The protocol logic is entirely reactive — the same model production Raft implementations use with event loops.


Lab Implementation & Engineering Deep Dives

1. Raft Node State Machine (python/raft_node.py)

The core Raft state machine for a single node — no network required.

  • Why: Understanding the node in isolation is the prerequisite for understanding the cluster. Every correctness property in Raft is enforced at the individual node level: a node decides whether to grant a vote, whether to accept a log entry, and when to step down — independently of what other nodes think.
  • What: Implements RaftNode with all three roles (Follower, Candidate, Leader), the VoteRequest/VoteResponse RPC types, the AppendEntries RPC, and the commit index advancement logic. Running it standalone walks through every state transition with annotated output.
  • How: Uses Python dataclasses for immutable RPC payloads and a monotonic clock for election timeouts. The handle_vote_request method encodes all three vote-granting rules (term check, one-vote-per-term, log freshness) in ~20 lines — making the safety argument directly visible in code.

2. 5-Node Cluster Simulation (python/raft_cluster.py)

A full Raft cluster running in a single process via threads and in-memory queues.

  • Why: The cluster simulation is where the emergent behaviour of the protocol becomes visible. Individual node correctness is necessary but not sufficient — you need to see leader election, concurrent replication, and failure recovery play out across multiple nodes to build intuition for why the protocol is safe under adversarial conditions.
  • What: Spins up 5 ClusterNode threads sharing a MessageBus. The simulation drives through leader election, 3-entry log replication, a leader crash, re-election, and a final safety check confirming no split-brain.
  • How: The MessageBus supports configurable drop rates for simulating partitions. Each ClusterNode thread runs a 10ms tick loop: process one inbound message, then check election/heartbeat timers. This mirrors how production Raft implementations use event loops — the protocol logic is entirely reactive.

Setup & Running

cd distributed_consensus_learnings
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

Run the single-node state machine demo

cd python
python raft_node.py

This is a good starting point. Read the output top to bottom — each line explains a decision the node is making.

Run the full cluster simulation (5 nodes)

cd python
python raft_cluster.py

The simulation will:

  1. Boot 5 nodes as followers.
  2. Elect the first leader via randomised timeouts.
  3. Replicate 3 log entries to a quorum.
  4. Crash the leader.
  5. Trigger re-election and confirm replication continues.
  6. Assert no split-brain: no two nodes committed different entries at the same index.
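Step 6's assertion boils down to: for any two nodes, their committed prefixes must agree entry-for-entry. A sketch of that check (`check_no_split_brain` is a hypothetical helper shown with stand-in objects; the real assertion lives in the orchestrator in raft_cluster.py):

```python
from types import SimpleNamespace

def check_no_split_brain(nodes) -> None:
    """For every pair of nodes, entries up to both nodes' commit_index must
    match in term and command.  `nodes` is any iterable of objects with
    `commit_index` and `log` attributes (hypothetical helper)."""
    for a in nodes:
        for b in nodes:
            for i in range(min(a.commit_index, b.commit_index) + 1):
                assert a.log[i].term == b.log[i].term, f"split-brain at index {i}"
                assert a.log[i].command == b.log[i].command, f"split-brain at index {i}"

# Two agreeing nodes (stand-in objects, not real RaftNodes):
entry = lambda t, c: SimpleNamespace(term=t, command=c)
n1 = SimpleNamespace(commit_index=1, log=[entry(1, "SET x=1"), entry(1, "SET y=2")])
n2 = SimpleNamespace(commit_index=0, log=[entry(1, "SET x=1")])
check_no_split_brain([n1, n2])  # passes — committed prefixes agree
```

Only the committed prefix is checked: uncommitted tail entries are allowed to differ and may later be truncated by a new leader.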

Key Concepts

Quorum: why majority specifically?

With 5 nodes, any two groups of 3 must share at least one member. That shared member has seen both decisions, so a future leader always inherits the full committed history. A group of 2 cannot form a quorum, so a minority partition stalls instead of electing its own leader.
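This overlap property is small enough to verify exhaustively (a standalone illustration, not part of the lab scripts):

```python
from itertools import combinations

majorities = list(combinations(range(5), 3))  # all 10 possible 3-of-5 quorums

# Every pair of majorities shares at least one node, so any candidate that
# wins a future election must have contacted someone holding the committed entry.
assert all(set(a) & set(b) for a in majorities for b in majorities)
print(f"checked {len(majorities) ** 2} quorum pairs — all overlap")
```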

Election safety: why can't two leaders exist at the same term?

Each node votes at most once per term. To win, a candidate needs votes from 3 out of 5 nodes. Since no node votes twice, two candidates cannot both reach 3 — one of them will fall short.
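The counting argument can also be checked mechanically, by enumerating every way five one-vote ballots can split between two candidates (illustrative only):

```python
from itertools import product

# Each of the 5 voters casts exactly one vote, for candidate "A" or "B".
splits = [(ballot.count("A"), ballot.count("B")) for ballot in product("AB", repeat=5)]

# No split hands a quorum of 3 to both candidates.
assert not any(a >= 3 and b >= 3 for a, b in splits)
print(f"checked {len(splits)} vote splits — at most one winner per term")
```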

Log freshness: why can't a stale node become leader?

A voter rejects a candidate whose log is behind its own. Since a committed entry was acknowledged by a majority, any quorum of voters includes at least one node that has that entry — and that node will refuse to vote for a candidate missing it.
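The freshness rule itself is just two comparisons, as encoded in handle_vote_request in raft_node.py (`candidate_log_ok` is a standalone restatement for illustration):

```python
def candidate_log_ok(cand_last_term: int, cand_last_index: int,
                     my_last_term: int, my_last_index: int) -> bool:
    """Raft's up-to-date check (§5.4.1): the candidate's last log term wins
    outright, or ties ours with a log at least as long."""
    return (cand_last_term > my_last_term
            or (cand_last_term == my_last_term and cand_last_index >= my_last_index))

# A longer log does not outrank a higher last term:
assert not candidate_log_ok(cand_last_term=2, cand_last_index=10,
                            my_last_term=3, my_last_index=4)
# Identical logs count as "at least as up-to-date":
assert candidate_log_ok(3, 4, 3, 4)
```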

For a deeper dive into the theory, see docs/distributed_consensus_deep_dive.md.


📝 Lab Implementation & Scripts

python/raft_node.py

"""
raft_node.py — Single Raft Node State Machine

Demonstrates the core Raft state machine for a single node:
  - Three roles: Follower, Candidate, Leader
  - Election timeout and reset
  - Vote granting rules (term comparison + log freshness)
  - Log append and commit index advancement
  - Heartbeat emission (leader only)

Run:
    python raft_node.py
"""

from __future__ import annotations

import random
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional


# ─── Data Structures ──────────────────────────────────────────────────────────

class Role(Enum):
    FOLLOWER = auto()
    CANDIDATE = auto()
    LEADER = auto()


@dataclass
class LogEntry:
    """A single entry in the Raft replicated log."""
    term: int
    index: int
    command: str

    def __repr__(self) -> str:
        return f"Entry(term={self.term}, idx={self.index}, cmd={self.command!r})"


@dataclass
class VoteRequest:
    """RequestVote RPC arguments."""
    candidate_id: int
    candidate_term: int
    last_log_index: int
    last_log_term: int


@dataclass
class VoteResponse:
    """RequestVote RPC reply."""
    voter_id: int
    term: int
    granted: bool


@dataclass
class AppendEntriesRequest:
    """AppendEntries RPC arguments (also used as heartbeat when entries=[])."""
    leader_id: int
    term: int
    prev_log_index: int
    prev_log_term: int
    entries: list[LogEntry]
    leader_commit: int


@dataclass
class AppendEntriesResponse:
    """AppendEntries RPC reply."""
    follower_id: int
    term: int
    success: bool
    match_index: int  # Highest log index the follower has matched


# ─── Raft Node ────────────────────────────────────────────────────────────────

class RaftNode:
    """
    A single Raft consensus node.

    Persistent state (survives restarts in production; initialised fresh here):
      current_term  — Latest term this node has seen
      voted_for     — Candidate this node voted for in current_term (or None)
      log           — Ordered list of LogEntry

    Volatile state:
      commit_index  — Highest log entry known to be committed
      last_applied  — Highest log entry applied to state machine

    Leader-only volatile state (reinitialised on each new election win):
      next_index    — For each follower: next log index to send them
      match_index   — For each follower: highest log index known to be replicated
    """

    ELECTION_TIMEOUT_MIN = 1.5   # seconds
    ELECTION_TIMEOUT_MAX = 3.0   # seconds
    HEARTBEAT_INTERVAL  = 0.5    # seconds

    def __init__(self, node_id: int, cluster_size: int = 5):
        self.node_id = node_id
        self.cluster_size = cluster_size
        self.quorum = (cluster_size // 2) + 1

        # Persistent state
        self.current_term: int = 0
        self.voted_for: Optional[int] = None
        self.log: list[LogEntry] = []

        # Volatile state
        self.commit_index: int = -1
        self.last_applied: int = -1
        self.role: Role = Role.FOLLOWER

        # Election timing
        self._reset_election_timeout()
        self._last_heartbeat = time.monotonic()

        # Leader-only state (populated when this node wins an election)
        self.next_index: dict[int, int] = {}
        self.match_index: dict[int, int] = {}

        print(f"  [Node {self.node_id}] Initialised as {self.role.name} — term={self.current_term}")

    # ── Timer helpers ──────────────────────────────────────────────────────

    def _reset_election_timeout(self) -> None:
        """Randomise the election timeout (Raft's key anti-collision mechanism)."""
        self._election_timeout = random.uniform(
            self.ELECTION_TIMEOUT_MIN,
            self.ELECTION_TIMEOUT_MAX,
        )
        self._election_deadline = time.monotonic() + self._election_timeout

    def election_timeout_elapsed(self) -> bool:
        return time.monotonic() >= self._election_deadline

    # ── Role transitions ───────────────────────────────────────────────────

    def become_candidate(self) -> VoteRequest:
        """
        Transition: Follower → Candidate.

        Steps per Raft paper §5.2:
          1. Increment current term.
          2. Vote for self.
          3. Reset election timer.
          4. Broadcast RequestVote RPCs.
        """
        self.current_term += 1
        self.voted_for = self.node_id
        self.role = Role.CANDIDATE
        self._reset_election_timeout()
        print(f"  [Node {self.node_id}] → CANDIDATE  (term={self.current_term})")

        return VoteRequest(
            candidate_id=self.node_id,
            candidate_term=self.current_term,
            last_log_index=len(self.log) - 1,
            last_log_term=self.log[-1].term if self.log else -1,
        )

    def become_leader(self) -> None:
        """
        Transition: Candidate → Leader.

        Reinitialise nextIndex and matchIndex for all peers.
        Immediately send an empty AppendEntries (heartbeat) to assert authority.
        """
        self.role = Role.LEADER
        for peer in range(self.cluster_size):
            if peer != self.node_id:
                self.next_index[peer] = len(self.log)
                self.match_index[peer] = -1
        print(f"  [Node {self.node_id}] → LEADER     (term={self.current_term}) ✓")

    def step_down(self, new_term: int) -> None:
        """
        Revert to Follower when we observe a higher term.

        Any node — follower, candidate, or leader — must immediately revert
        to follower if it sees a term greater than its own (§5.1).
        """
        print(f"  [Node {self.node_id}] → FOLLOWER   (term {self.current_term} → {new_term}, stepping down)")
        self.current_term = new_term
        self.voted_for = None
        self.role = Role.FOLLOWER
        self._reset_election_timeout()

    # ── Vote handling ──────────────────────────────────────────────────────

    def handle_vote_request(self, req: VoteRequest) -> VoteResponse:
        """
        Process a RequestVote RPC.

        Grant the vote only if ALL of the following hold (§5.2, §5.4):
          1. candidate_term >= current_term
          2. We haven't voted for someone else this term.
          3. The candidate's log is at least as up-to-date as ours
             (higher last log term, or equal term and at least as long a log).
        """
        # Rule 1: step down if we see a higher term
        if req.candidate_term > self.current_term:
            self.step_down(req.candidate_term)

        grant = False
        if req.candidate_term < self.current_term:
            # Stale candidate — decline
            pass
        elif self.voted_for in (None, req.candidate_id):
            # Check log freshness (§5.4.1)
            my_last_term  = self.log[-1].term if self.log else -1
            my_last_index = len(self.log) - 1
            log_ok = (
                req.last_log_term > my_last_term
                or (req.last_log_term == my_last_term and req.last_log_index >= my_last_index)
            )
            if log_ok:
                self.voted_for = req.candidate_id
                self._reset_election_timeout()
                grant = True

        result = "✓ granted" if grant else "✗ denied"
        print(f"  [Node {self.node_id}] Vote {result} → Node {req.candidate_id} "
              f"(term={req.candidate_term})")
        return VoteResponse(voter_id=self.node_id, term=self.current_term, granted=grant)

    def handle_vote_response(self, resp: VoteResponse, votes_received: set[int]) -> bool:
        """
        Tally an incoming VoteResponse.

        Returns True if this node just reached quorum and should become leader.
        """
        if resp.term > self.current_term:
            self.step_down(resp.term)
            return False

        if self.role != Role.CANDIDATE:
            return False

        if resp.granted:
            votes_received.add(resp.voter_id)
            print(f"  [Node {self.node_id}] Votes collected: {len(votes_received)}/{self.quorum} needed")
            if len(votes_received) >= self.quorum:
                return True

        return False

    # ── Log replication ────────────────────────────────────────────────────

    def append_entries(self, req: AppendEntriesRequest) -> AppendEntriesResponse:
        """
        Process an AppendEntries RPC (§5.3).

        Heartbeat:  req.entries == []  → just reset timer and update commit.
        Replication: req.entries != [] → validate consistency, append new entries.
        """
        # Reject stale leaders
        if req.term < self.current_term:
            return AppendEntriesResponse(
                follower_id=self.node_id,
                term=self.current_term,
                success=False,
                match_index=-1,
            )

        # Valid leader — reset election timeout, step down if needed
        if req.term > self.current_term:
            self.step_down(req.term)
        elif self.role == Role.CANDIDATE:
            # Another node won — become follower
            self.role = Role.FOLLOWER
            print(f"  [Node {self.node_id}] → FOLLOWER   (another leader elected, term={req.term})")

        self._reset_election_timeout()

        # Log consistency check (§5.3): does our log contain an entry at
        # prev_log_index whose term matches prev_log_term?
        if req.prev_log_index >= 0:
            if len(self.log) <= req.prev_log_index:
                # We're missing entries — tell leader to back up
                return AppendEntriesResponse(
                    follower_id=self.node_id,
                    term=self.current_term,
                    success=False,
                    match_index=len(self.log) - 1,
                )
            if self.log[req.prev_log_index].term != req.prev_log_term:
                # Conflict — delete this entry and everything after it
                self.log = self.log[:req.prev_log_index]
                return AppendEntriesResponse(
                    follower_id=self.node_id,
                    term=self.current_term,
                    success=False,
                    match_index=len(self.log) - 1,
                )

        # Append any new entries (dedup entries we already have)
        for entry in req.entries:
            if entry.index < len(self.log):
                if self.log[entry.index].term != entry.term:
                    self.log = self.log[:entry.index]  # Conflict — truncate
                    self.log.append(entry)
            else:
                self.log.append(entry)

        # Advance commit index
        if req.leader_commit > self.commit_index:
            self.commit_index = min(req.leader_commit, len(self.log) - 1)
            self._apply_committed_entries()

        return AppendEntriesResponse(
            follower_id=self.node_id,
            term=self.current_term,
            success=True,
            match_index=len(self.log) - 1,
        )

    def leader_receive_client_command(self, command: str) -> Optional[LogEntry]:
        """
        Accept a command from a client (leader-only, §5.3).

        Appends to the local log. The entry becomes committed once
        a quorum of followers acknowledge it via AppendEntries.
        """
        if self.role != Role.LEADER:
            print(f"  [Node {self.node_id}] Rejected client command — not the leader")
            return None

        entry = LogEntry(
            term=self.current_term,
            index=len(self.log),
            command=command,
        )
        self.log.append(entry)
        print(f"  [Node {self.node_id}] Appended to local log: {entry}")
        return entry

    def leader_advance_commit_index(self, match_indices: list[int]) -> None:
        """
        Advance commit_index to the highest N such that a quorum of nodes
        have match_index >= N and log[N].term == current_term (§5.3, §5.4).
        """
        if self.role != Role.LEADER:
            return

        # Include our own last log index
        all_indices = sorted(match_indices + [len(self.log) - 1], reverse=True)

        for n in all_indices:
            if n <= self.commit_index:
                break
            if n < len(self.log) and self.log[n].term == self.current_term:
                # Count how many nodes (including self) have this entry
                replicated = 1 + sum(1 for idx in match_indices if idx >= n)
                if replicated >= self.quorum:
                    print(f"  [Node {self.node_id}] Committed log index {n} "
                          f"(replicated on {replicated}/{self.cluster_size} nodes)")
                    self.commit_index = n
                    self._apply_committed_entries()
                    break

    def _apply_committed_entries(self) -> None:
        """Apply all committed but not yet applied entries to the state machine."""
        while self.last_applied < self.commit_index:
            self.last_applied += 1
            entry = self.log[self.last_applied]
            print(f"  [Node {self.node_id}] Applied to state machine: {entry}")

    # ── Heartbeat emission ─────────────────────────────────────────────────

    def build_heartbeat(self, follower_id: int) -> AppendEntriesRequest:
        """Build an empty AppendEntries (heartbeat) for a given follower."""
        next_idx = self.next_index.get(follower_id, len(self.log))
        prev_idx = next_idx - 1
        prev_term = self.log[prev_idx].term if prev_idx >= 0 and self.log else -1

        return AppendEntriesRequest(
            leader_id=self.node_id,
            term=self.current_term,
            prev_log_index=prev_idx,
            prev_log_term=prev_term,
            entries=[],
            leader_commit=self.commit_index,
        )

    def build_append_entries(self, follower_id: int) -> AppendEntriesRequest:
        """Build an AppendEntries RPC with any unsent log entries for a follower."""
        next_idx = self.next_index.get(follower_id, 0)
        prev_idx = next_idx - 1
        prev_term = self.log[prev_idx].term if prev_idx >= 0 and self.log else -1
        entries = self.log[next_idx:]

        return AppendEntriesRequest(
            leader_id=self.node_id,
            term=self.current_term,
            prev_log_index=prev_idx,
            prev_log_term=prev_term,
            entries=entries,
            leader_commit=self.commit_index,
        )

    def handle_append_entries_response(
        self, resp: AppendEntriesResponse, match_indices: list[int]
    ) -> None:
        """Process a follower's AppendEntries response (leader-only)."""
        if resp.term > self.current_term:
            self.step_down(resp.term)
            return

        if resp.success:
            self.match_index[resp.follower_id] = resp.match_index
            self.next_index[resp.follower_id] = resp.match_index + 1
            self.leader_advance_commit_index(list(self.match_index.values()))
        else:
            # Log mismatch — back off to the follower's reported match point and retry
            self.next_index[resp.follower_id] = max(0, resp.match_index)

    def __repr__(self) -> str:
        return (
            f"RaftNode(id={self.node_id}, role={self.role.name}, "
            f"term={self.current_term}, log_len={len(self.log)}, "
            f"commit={self.commit_index})"
        )


# ─── Single-Node State Machine Demo ───────────────────────────────────────────

def demo_state_machine() -> None:
    """
    Walk through the state transitions a single Raft node goes through,
    without needing a real network.  Useful for understanding the rules
    in isolation before looking at the full cluster simulation.
    """
    try:
        from colorama import Fore, Style, init
        init(autoreset=True)
        CYAN    = Fore.CYAN
        GREEN   = Fore.GREEN
        YELLOW  = Fore.YELLOW
        MAGENTA = Fore.MAGENTA
        RESET   = Style.RESET_ALL
    except ImportError:
        CYAN = GREEN = YELLOW = MAGENTA = RESET = ""

    def section(title: str) -> None:
        print(f"\n{CYAN}{'─' * 60}{RESET}")
        print(f"{CYAN}  {title}{RESET}")
        print(f"{CYAN}{'─' * 60}{RESET}")

    section("Phase 1: Node initialises as Follower")
    node = RaftNode(node_id=0, cluster_size=5)
    print(f"  State: {node}")

    section("Phase 2: Election timeout fires → node becomes Candidate")
    vote_req = node.become_candidate()
    print(f"  Broadcasting: {vote_req}")

    section("Phase 3: Node votes for itself and collects votes from quorum")
    votes: set[int] = {node.node_id}  # Self-vote
    # Simulate 2 more nodes granting their votes (quorum = 3 of 5)
    for peer_id in [1, 2]:
        fake_resp = VoteResponse(voter_id=peer_id, term=node.current_term, granted=True)
        won = node.handle_vote_response(fake_resp, votes)
        if won:
            node.become_leader()
            break

    section("Phase 4: Leader receives client commands and appends to log")
    commands = ["SET x=1", "SET y=2", "DELETE z"]
    for cmd in commands:
        node.leader_receive_client_command(cmd)

    section("Phase 5: Followers acknowledge — leader advances commit index")
    # Simulate 2 followers having replicated all 3 entries (indices 0, 1, 2)
    fake_match = {1: 2, 2: 2, 3: -1, 4: -1}
    node.match_index = fake_match
    node.leader_advance_commit_index(list(fake_match.values()))

    section("Phase 6: Leader sees higher term — steps down to Follower")
    node.step_down(new_term=5)
    print(f"  State: {node}")

    section("Phase 7: Node receives a vote request from a legitimate candidate")
    req = VoteRequest(
        candidate_id=3,
        candidate_term=5,
        last_log_index=5,
        last_log_term=5,
    )
    resp = node.handle_vote_request(req)
    print(f"  Response: {resp}")

    print(f"\n{GREEN}  ✓ Single-node state machine demo complete.{RESET}\n")


if __name__ == "__main__":
    demo_state_machine()

python/raft_cluster.py

"""
raft_cluster.py — 5-Node Raft Cluster Simulation

Simulates a full Raft cluster in a single process using threads and
in-memory message queues instead of real sockets. This lets you observe
all the consensus protocol mechanics without any network setup.

Demonstrates:
  1. All 5 nodes boot as Followers.
  2. A Leader is elected via randomised election timeouts.
  3. Three client commands are replicated to a quorum and committed.
  4. The leader is "crashed" (thread stopped).
  5. A new election takes place and a new leader is elected.
  6. Replication continues under the new leader.

Run:
    python raft_cluster.py
"""

from __future__ import annotations

import queue
import random
import threading
import time
from dataclasses import dataclass
from typing import Optional

from raft_node import (
    AppendEntriesRequest,
    AppendEntriesResponse,
    LogEntry,
    RaftNode,
    Role,
    VoteRequest,
    VoteResponse,
)

try:
    from colorama import Fore, Style, init as colorama_init
    colorama_init(autoreset=True)
    C = {
        "leader":    Fore.GREEN,
        "candidate": Fore.YELLOW,
        "follower":  Fore.CYAN,
        "error":     Fore.RED,
        "section":   Fore.MAGENTA,
        "dim":       Style.DIM,
        "reset":     Style.RESET_ALL,
    }
except ImportError:
    C = {k: "" for k in ["leader", "candidate", "follower", "error", "section", "dim", "reset"]}


# ─── Message Bus ──────────────────────────────────────────────────────────────

@dataclass
class Message:
    """A network message between two nodes."""
    sender: int
    receiver: int
    payload: object  # VoteRequest | VoteResponse | AppendEntriesRequest | AppendEntriesResponse


class MessageBus:
    """
    In-process message bus that simulates an unreliable network.

    Supports:
      - Point-to-point delivery via per-node queues.
      - Configurable packet drop rate (to simulate network partitions).
      - Configurable latency jitter.
    """

    def __init__(self, node_count: int, drop_rate: float = 0.0):
        self._queues: dict[int, queue.Queue] = {
            i: queue.Queue() for i in range(node_count)
        }
        self.drop_rate = drop_rate
        self._isolated: set[int] = set()
        self._lock = threading.Lock()

    def send(self, msg: Message) -> None:
        if msg.sender in self._isolated or msg.receiver in self._isolated:
            return  # Partitioned — silently drop
        if random.random() < self.drop_rate:
            return  # Random loss — silently drop
        self._queues[msg.receiver].put(msg)

    def broadcast(self, sender: int, payload: object, exclude: set[int] | None = None) -> None:
        for node_id in self._queues:
            if node_id == sender:
                continue
            if exclude and node_id in exclude:
                continue
            self.send(Message(sender=sender, receiver=node_id, payload=payload))

    def receive(self, node_id: int, timeout: float = 0.05) -> Optional[Message]:
        try:
            return self._queues[node_id].get(timeout=timeout)
        except queue.Empty:
            return None

    def partition(self, isolated_nodes: set[int]) -> None:
        """
        Simulate a network partition: every message to or from an isolated
        node is dropped until heal() is called.  Already-queued messages are
        drained so the partitioned nodes can't see them either.
        """
        self._isolated = set(isolated_nodes)
        for node_id in isolated_nodes:
            while True:
                try:
                    self._queues[node_id].get_nowait()
                except queue.Empty:
                    break

    def heal(self) -> None:
        """Repair the partition and stop random message loss."""
        self._isolated = set()
        self.drop_rate = 0.0


# ─── Cluster Node Thread ───────────────────────────────────────────────────────

class ClusterNode(threading.Thread):
    """
    Runs a RaftNode in its own thread, polling for messages and driving
    the election/heartbeat timers.
    """

    def __init__(self, node_id: int, cluster_size: int, bus: MessageBus):
        super().__init__(name=f"Node-{node_id}", daemon=True)
        self.node = RaftNode(node_id=node_id, cluster_size=cluster_size)
        self.bus = bus
        self._stop_event = threading.Event()
        self._votes_received: set[int] = set()

    @property
    def node_id(self) -> int:
        return self.node.node_id

    def stop(self) -> None:
        self._stop_event.set()

    def run(self) -> None:
        while not self._stop_event.is_set():
            self._process_messages()
            self._check_timers()
            time.sleep(0.01)  # 10 ms tick

    def _process_messages(self) -> None:
        # Drain everything queued this tick; handling a single message per
        # 10 ms tick would let heartbeat and vote traffic back up.
        while (msg := self.bus.receive(self.node_id, timeout=0.01)) is not None:
            payload = msg.payload

            if isinstance(payload, VoteRequest):
                resp = self.node.handle_vote_request(payload)
                self.bus.send(Message(sender=self.node_id, receiver=msg.sender, payload=resp))

            elif isinstance(payload, VoteResponse):
                won = self.node.handle_vote_response(payload, self._votes_received)
                if won:
                    self.node.become_leader()
                    self._votes_received.clear()
                    # Immediately broadcast heartbeats to assert leadership
                    self._send_heartbeats()

            elif isinstance(payload, AppendEntriesRequest):
                resp = self.node.append_entries(payload)
                self.bus.send(Message(sender=self.node_id, receiver=msg.sender, payload=resp))

            elif isinstance(payload, AppendEntriesResponse):
                match_indices = list(self.node.match_index.values())
                self.node.handle_append_entries_response(payload, match_indices)

    def _check_timers(self) -> None:
        if self.node.role == Role.LEADER:
            # Send heartbeats on schedule
            if time.monotonic() - self.node._last_heartbeat >= self.node.HEARTBEAT_INTERVAL:
                self._send_heartbeats()
        else:
            # Follower/Candidate: check election timeout
            if self.node.election_timeout_elapsed():
                self._votes_received = {self.node_id}  # Vote for self
                vote_req = self.node.become_candidate()
                self.bus.broadcast(sender=self.node_id, payload=vote_req)

    def _send_heartbeats(self) -> None:
        for peer_id in range(self.node.cluster_size):
            if peer_id == self.node_id:
                continue
            rpc = self.node.build_append_entries(peer_id)
            self.bus.send(Message(sender=self.node_id, receiver=peer_id, payload=rpc))
        # Reset the timer here so out-of-schedule sends (e.g. right after
        # winning an election) also push back the next scheduled heartbeat.
        self.node._last_heartbeat = time.monotonic()

    def submit_command(self, command: str) -> Optional[LogEntry]:
        """Submit a client command (only succeeds if this node is the leader)."""
        return self.node.leader_receive_client_command(command)

    def __repr__(self) -> str:
        return repr(self.node)
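

# ─── Aside: how a leader picks a commit index ──────────────────────────────────
# Illustrative sketch, not called by ClusterNode.  _process_messages above hands
# the leader every node's match index; an entry is safe to commit once a
# majority of nodes have replicated it.  With match indices sorted in
# descending order, that is simply the value at rank cluster_size // 2
# (assuming the leader's own index is included in the list).

def _majority_match_index(match_indices: list[int], cluster_size: int) -> int:
    """Highest log index replicated on a majority of the cluster."""
    ranked = sorted(match_indices, reverse=True)
    return ranked[cluster_size // 2]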


# ─── Cluster Orchestrator ──────────────────────────────────────────────────────

class RaftCluster:
    """Manages a set of ClusterNode threads and drives the simulation."""

    def __init__(self, size: int = 5):
        self.size = size
        self.bus = MessageBus(node_count=size)
        self.nodes: list[ClusterNode] = [
            ClusterNode(node_id=i, cluster_size=size, bus=self.bus)
            for i in range(size)
        ]

    def start(self) -> None:
        for node in self.nodes:
            node.start()

    def stop(self) -> None:
        for node in self.nodes:
            node.stop()

    def leader(self) -> Optional[ClusterNode]:
        leaders = [n for n in self.nodes if n.node.role == Role.LEADER and not n._stop_event.is_set()]
        return leaders[0] if leaders else None

    def wait_for_leader(self, timeout: float = 10.0) -> Optional[ClusterNode]:
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            leader = self.leader()
            if leader:
                return leader
            time.sleep(0.1)
        return None

    def print_status(self) -> None:
        print(f"\n  {'─' * 55}")
        print(f"  {'NODE':<8} {'ROLE':<12} {'TERM':<8} {'LOG':>5} {'COMMIT':>7}")
        print(f"  {'─' * 55}")
        for cn in self.nodes:
            n = cn.node
            stopped = cn._stop_event.is_set()
            role_str = ("CRASHED" if stopped else n.role.name)
            color = (
                C["error"] if stopped else
                C["leader"] if n.role == Role.LEADER else
                C["candidate"] if n.role == Role.CANDIDATE else
                C["follower"]
            )
            print(f"  {color}Node {n.node_id:<3}  {role_str:<12} {n.current_term:<8} "
                  f"{len(n.log):>5} {n.commit_index:>7}{C['reset']}")
        print(f"  {'─' * 55}\n")

    def crash_leader(self) -> Optional[ClusterNode]:
        """Simulate the current leader crashing."""
        leader = self.leader()
        if leader:
            print(f"\n  {C['error']}⚡ CRASH: Node {leader.node_id} (leader, term={leader.node.current_term}) "
                  f"has failed!{C['reset']}\n")
            leader.stop()
            return leader
        return None
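

# ─── Aside: why a quorum of n // 2 + 1? ────────────────────────────────────────
# Illustrative sketch, not used by the simulation.  Any two majorities of the
# same cluster must share at least one node, which is why a new leader's
# election quorum always intersects the quorum that committed earlier entries —
# no committed entry can be forgotten.

def _min_quorum_overlap(n: int) -> int:
    """Smallest possible overlap between two majorities of an n-node cluster."""
    majority = n // 2 + 1
    return 2 * majority - n  # >= 1 for every cluster size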


# ─── Main Simulation ───────────────────────────────────────────────────────────

def run_simulation() -> None:
    def banner(text: str) -> None:
        width = 62
        print(f"\n{C['section']}╔{'═' * (width - 2)}╗{C['reset']}")
        print(f"{C['section']}║ {text:<{width - 4}} ║{C['reset']}")
        print(f"{C['section']}╚{'═' * (width - 2)}╝{C['reset']}\n")

    def step(text: str) -> None:
        print(f"\n{C['section']}{text}{C['reset']}\n")

    banner("Raft Cluster Simulation — 5 Nodes")

    # ── Phase 1: Boot ─────────────────────────────────────────────────────
    step("Phase 1: All nodes boot as Followers")
    cluster = RaftCluster(size=5)
    cluster.start()
    cluster.print_status()

    # ── Phase 2: First election ───────────────────────────────────────────
    step("Phase 2: Waiting for leader election (randomised timeouts)...")
    leader = cluster.wait_for_leader(timeout=12.0)

    if not leader:
        print(f"  {C['error']}No leader elected within timeout. Aborting.{C['reset']}")
        cluster.stop()
        return

    print(f"  {C['leader']}✓ Leader elected: Node {leader.node_id} "
          f"(term={leader.node.current_term}){C['reset']}")
    cluster.print_status()

    # ── Phase 3: Log replication ──────────────────────────────────────────
    step("Phase 3: Client submits 3 commands to the leader")
    commands = ["SET balance=1000", "TRANSFER 200 -> account_7", "SET balance=800"]
    for cmd in commands:
        entry = leader.submit_command(cmd)
        status = "accepted" if entry else "rejected (not leader)"
        print(f"    → {cmd!r}: {status}")
        time.sleep(0.3)  # Allow replication round-trip

    time.sleep(1.0)  # Allow commit index to advance across followers
    cluster.print_status()

    # ── Phase 4: Leader crash ─────────────────────────────────────────────
    step("Phase 4: Leader failure — simulating node crash")
    old_leader_id = leader.node_id
    cluster.crash_leader()
    time.sleep(0.5)
    cluster.print_status()

    # ── Phase 5: Re-election ──────────────────────────────────────────────
    step("Phase 5: Remaining nodes detect leader failure and hold new election")
    new_leader = cluster.wait_for_leader(timeout=12.0)

    if not new_leader:
        print(f"  {C['error']}No new leader elected. "
              f"(Expected: quorum={cluster.size // 2 + 1}, alive={cluster.size - 1}){C['reset']}")
        cluster.stop()
        return

    print(f"  {C['leader']}✓ New leader elected: Node {new_leader.node_id} "
          f"(term={new_leader.node.current_term}){C['reset']}")
    cluster.print_status()

    # ── Phase 6: Continue replication under new leader ────────────────────
    step("Phase 6: Client submits 2 more commands to new leader")
    new_commands = ["AUDIT log entry", "CLOSE session"]
    for cmd in new_commands:
        entry = new_leader.submit_command(cmd)
        print(f"    → {cmd!r}: {'accepted' if entry else 'rejected (not leader)'}")
        time.sleep(0.3)

    time.sleep(1.0)
    cluster.print_status()

    # ── Phase 7: Safety check ─────────────────────────────────────────────
    step("Phase 7: Safety verification")
    live_nodes = [n for n in cluster.nodes if not n._stop_event.is_set()]
    committed = [n.node.commit_index for n in live_nodes]
    terms     = [n.node.current_term  for n in live_nodes]

    print(f"  Commit indices (live nodes): {committed}")
    print(f"  Current terms  (live nodes): {terms}")

    all_same_term = len(set(terms)) == 1
    no_split_brain = len([n for n in live_nodes if n.node.role == Role.LEADER]) <= 1

    if all_same_term and no_split_brain:
        print(f"\n  {C['leader']}✓ Safety: at most one live leader, all on the same term.{C['reset']}")
    else:
        print(f"\n  {C['error']}✗ Safety violation detected!{C['reset']}")

    cluster.stop()
    print(f"\n  {C['leader']}Simulation complete.{C['reset']}\n")


if __name__ == "__main__":
    run_simulation()
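
# A possible extension (sketch, not implemented here): instead of crashing the
# leader in Phase 4, isolate it with MessageBus.partition({leader.node_id}),
# wait for the majority side to elect a new leader at a higher term, then call
# heal() and watch the deposed leader step down as soon as it receives an
# AppendEntries carrying the higher term.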