Celery Learnings

Hands-on Celery task-queue labs covering worker lifecycle, task chaining, parallel groups, chords, Beat scheduling, and RabbitMQ integration in Python.

What You Will Learn

  • How Celery decouples producers from workers via a broker (RabbitMQ); a minimal sketch follows this list
  • Task lifecycle: PENDING → STARTED → PROGRESS (a custom state) → SUCCESS / FAILURE
  • Composing complex workflows with chains, groups, and chords
  • Recurring tasks with Celery Beat
  • Tuning workers for throughput and reliability
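
To make the first bullet concrete, here is a minimal sketch of the producer/worker split. The module name hello.py and the ping task are illustrative only; they are not part of this repo:

# hello.py — smallest possible Celery app (illustrative, not part of this repo)
from celery import Celery

app = Celery(
    "hello",
    broker="amqp://guest:guest@localhost:5672//",
    backend="rpc://",
)

@app.task
def ping() -> str:
    return "pong"

# Producer side: publishes a message to RabbitMQ and returns immediately.
#   >>> from hello import ping
#   >>> ping.delay()
# Worker side (separate process) consumes and executes it:
#   celery -A hello worker --loglevel=info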

Prerequisites

  • Python 3.13+
  • RabbitMQ running via the shared message_brokers/ Docker Compose stack

Setup

cd celery_learnings
python3 -m venv venv
source venv/bin/activate       # Windows: venv\Scripts\activate
pip install -r requirements.txt

Start RabbitMQ

# From the message_brokers/ directory
docker compose up -d rabbitmq

RabbitMQ management UI: http://localhost:15672 (guest / guest)
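
If a script hangs at startup, the broker is the first thing to check. One quick way to verify connectivity from Python uses kombu, which ships with Celery (a minimal sketch, assuming the default guest credentials above):

from kombu import Connection

# Raises OperationalError quickly if RabbitMQ is unreachable.
with Connection("amqp://guest:guest@localhost:5672//") as conn:
    conn.ensure_connection(max_retries=3)
    print("RabbitMQ reachable:", conn.as_uri())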

Running the Labs

All scripts live in python/. Run them from the python/ directory:

cd celery_learnings/python

Terminal 1 — Start the worker

python celery_worker.py
# or equivalently:
# celery -A celery_tasks worker --loglevel=info --concurrency=2

Terminal 2 — Send tasks

python celery_producer.py

Terminal 3 (optional) — Start Beat scheduler

python celery_beat.py

Lab Implementation & Engineering Deep Dives

1. Task Definitions (python/celery_tasks.py)

  • Why: Central module defining the Celery app, broker/backend config, and all tasks.
  • What: add, multiply, slow_job (with progress states), transform (chain step), summarise (chord callback).
  • How: Uses bind=True for self-reference, rpc:// backend for results over RabbitMQ, and acks_late=True for at-least-once delivery (see the bind=True sketch below).
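
A rough illustration of what bind=True buys you. The whoami task here is hypothetical (not in celery_tasks.py) and assumes it is defined next to the app object:

@app.task(bind=True, acks_late=True, name="tasks.whoami")  # hypothetical example task
def whoami(self):
    # bind=True passes the task instance as `self`, exposing request
    # metadata plus methods like self.retry and self.update_state.
    return {"id": self.request.id, "retries": self.request.retries}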

2. Worker Process (python/celery_worker.py)

  • Why: Demonstrates starting a worker programmatically.
  • What: Calls app.worker_main() with concurrency=2, showing how worker lifecycle is managed.
  • How: app.worker_main(argv=[...]) mirrors the celery worker CLI.

3. Task Producer (python/celery_producer.py)

  • Why: Shows every major task dispatch pattern in one runnable script.
  • What: Fire-and-forget, result retrieval, chains, groups, chords, countdown scheduling.
  • How: Uses .delay(), .apply_async(), chain(), group(), and chord() from Celery (a short signature primer follows).
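
The building block behind all of these patterns is the signature. A quick primer, using the repo's own add and transform tasks (values are arbitrary):

from celery import chain
from celery_tasks import add, transform

sig = add.s(2, 3)    # a signature: task + bound args, not yet sent
sig.delay()          # equivalent to add.delay(2, 3)

# In a chain, each task's return value is prepended to the next
# signature's positional args:
result = chain(add.s(2, 3), transform.s(op="double"))()
print(result.get(timeout=10))  # double(add(2, 3)) = 10

# .si() makes an immutable signature that ignores the parent's result:
add.si(1, 1)         # always runs add(1, 1), even mid-chain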

4. Beat Scheduler (python/celery_beat.py)

  • Why: Shows periodic task execution without external cron.
  • What: Schedules add every 5s, slow_job every 15s, multiply every 60s.
  • How: app.conf.beat_schedule dict + app.start(argv=["beat", "--loglevel=info"]) (a crontab variant is sketched below).
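
Beat also accepts crontab expressions instead of plain float intervals. A sketch of a crontab-style entry, added alongside the entries in celery_beat.py (the "nightly-report" name and arguments are made up):

from celery.schedules import crontab
from celery_tasks import app

app.conf.beat_schedule["nightly-report"] = {   # hypothetical entry
    "task": "tasks.summarise",
    "schedule": crontab(hour=2, minute=30),    # every day at 02:30 UTC
    "args": ([1, 2, 3],),
}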

📝 Lab Implementation & Scripts

python/celery_tasks.py

"""
celery_tasks.py — Celery application and all task definitions.

Requires RabbitMQ on localhost:5672 (guest/guest).
Start RabbitMQ from message_brokers/:
    docker compose up -d rabbitmq

This file is a shared module. Do not run directly.
Start worker:  python celery_worker.py
Send tasks:    python celery_producer.py
"""

import time
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# ── Celery App ─────────────────────────────────────────────────────────────────
app = Celery(
    "celery_demo",
    broker="amqp://guest:guest@localhost:5672//",
    backend="rpc://",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,          # Ack only after the task completes
    worker_prefetch_multiplier=1, # One task per worker slot at a time
)


# ── Basic Tasks ────────────────────────────────────────────────────────────────

@app.task(bind=True, name="tasks.add")
def add(self, x: int, y: int) -> int:
    """Add two numbers."""
    logger.info("add(%s, %s)", x, y)
    return x + y


@app.task(bind=True, name="tasks.multiply")
def multiply(self, x: int, y: int) -> int:
    """Multiply two numbers."""
    logger.info("multiply(%s, %s)", x, y)
    return x * y


# ── Long-Running Task with Progress States ─────────────────────────────────────

@app.task(bind=True, name="tasks.slow_job", max_retries=3, default_retry_delay=5)
def slow_job(self, job_id: str, duration: float = 2.0) -> dict:
    """
    Simulates a slow external job.
    Tracks PENDING → STARTED → PROGRESS → SUCCESS lifecycle.
    Retries automatically on failure (up to max_retries).
    """
    logger.info("slow_job start: job_id=%s, duration=%ss", job_id, duration)
    try:
        self.update_state(state="PROGRESS", meta={"job_id": job_id, "progress": 0})
        time.sleep(duration * 0.5)
        self.update_state(state="PROGRESS", meta={"job_id": job_id, "progress": 50})
        time.sleep(duration * 0.5)
        result = {"job_id": job_id, "status": "done", "duration": duration}
        logger.info("slow_job done: %s", result)
        return result
    except Exception as exc:
        logger.warning("slow_job retrying: %s", exc)
        raise self.retry(exc=exc)


# ── Pipeline / Composition Tasks ───────────────────────────────────────────────

@app.task(bind=True, name="tasks.transform")
def transform(self, value: int, op: str = "square") -> int:
    """
    Transforms a value. Designed as a composable chain step.
    op: 'square' | 'double' | 'negate'
    """
    ops = {
        "square": lambda v: v * v,
        "double": lambda v: v * 2,
        "negate": lambda v: -v,
    }
    result = ops.get(op, lambda v: v)(value)
    logger.info("transform %s(%s) = %s", op, value, result)
    return result


@app.task(bind=True, name="tasks.summarise")
def summarise(self, results: list) -> dict:
    """
    Aggregates a list of results.
    Used as the chord callback after a parallel group completes.
    """
    total = sum(results)
    logger.info("summarise: results=%s total=%s", results, total)
    return {"results": results, "total": total, "count": len(results)}


if __name__ == "__main__":
    print("This module defines tasks. Start the worker with:")
    print("  python celery_worker.py")
    print("  or: celery -A celery_tasks worker --loglevel=info")

python/celery_worker.py

"""
celery_worker.py — Starts the Celery worker process.

Prerequisites (from celery_learnings/ root):
    python3 -m venv venv
    source venv/bin/activate
    pip install -r requirements.txt

    Start RabbitMQ (from message_brokers/):
        docker compose up -d rabbitmq

Run (Terminal 1, from celery_learnings/python/):
    python celery_worker.py

Then (Terminal 2):
    python celery_producer.py
"""

import os
import sys

# Ensure the python/ directory is on sys.path so celery_tasks is importable.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from celery_tasks import app  # noqa: E402

if __name__ == "__main__":
    print("Starting Celery worker — consuming from RabbitMQ amqp://localhost:5672")
    print("Workers: 2 concurrent processes")
    print("Press Ctrl+C to stop.\n")
    app.worker_main(
        argv=[
            "worker",
            "--loglevel=info",
            "--concurrency=2",
            "--hostname=worker@%h",
        ]
    )
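
To see what a running worker is doing from another shell, Celery's remote-control inspection API can be queried against the same app. A minimal sketch (run while the worker is up):

from celery_tasks import app

insp = app.control.inspect(timeout=2.0)
print("active:    ", insp.active())      # tasks executing right now, per worker
print("registered:", insp.registered())  # task names each worker has loaded
print("stats:     ", insp.stats())       # pool size, broker info, counters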

python/celery_producer.py

"""
celery_producer.py — Sends tasks to the Celery worker via RabbitMQ.

Demonstrates:
    1. Fire-and-forget (.delay)
    2. Synchronous result retrieval (.get)
    3. Task chaining  (add → square → double)
    4. Parallel groups  (N multiplications at once)
    5. Chords  (parallel group + summarise callback)
    6. Long-running task with progress polling
    7. Countdown (scheduled future execution)

Prerequisites:
    Worker must be running in Terminal 1: python celery_worker.py
    RabbitMQ must be running:            docker compose up -d rabbitmq

Run (Terminal 2, from celery_learnings/python/):
    python celery_producer.py
"""

import os
import sys
import time

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from celery import chain, chord, group  # noqa: E402
from celery_tasks import add, multiply, slow_job, summarise, transform  # noqa: E402

SEP = "─" * 60


def section(title: str) -> None:
    print(f"\n{SEP}\n  {title}\n{SEP}")


# ── 1. Fire-and-Forget ─────────────────────────────────────────────────────────

def demo_fire_and_forget() -> None:
    section("1. Fire-and-Forget  (.delay)")
    result = add.delay(10, 20)
    print(f"Task dispatched — ID: {result.id}")
    print(f"State before .get(): {result.state}")
    value = result.get(timeout=10)
    print(f"Result: 10 + 20 = {value}")


# ── 2. Synchronous Result ──────────────────────────────────────────────────────

def demo_sync_result() -> None:
    section("2. Synchronous Result  (.get)")
    result = multiply.delay(7, 6)
    value = result.get(timeout=10)
    print(f"7 × 6 = {value}")


# ── 3. Task Chain ──────────────────────────────────────────────────────────────

def demo_chain() -> None:
    section("3. Task Chain  add(3,4) → square → double")
    # add(3, 4) = 7 → square(7) = 49 → double(49) = 98
    pipeline = chain(
        add.s(3, 4),
        transform.s(op="square"),
        transform.s(op="double"),
    )
    result = pipeline.apply_async()
    value = result.get(timeout=15)
    print(f"Chain result: {value}  (expected 98)")


# ── 4. Parallel Group ──────────────────────────────────────────────────────────

def demo_group() -> None:
    section("4. Parallel Group  (4 multiplications in parallel)")
    tasks = group(multiply.s(i, i) for i in range(1, 5))
    results = tasks.apply_async().get(timeout=15)
    print(f"i×i for i in 1..4: {results}  (expected [1, 4, 9, 16])")


# ── 5. Chord ───────────────────────────────────────────────────────────────────

def demo_chord() -> None:
    section("5. Chord  (parallel group + summarise callback)")
    # Compute add(i, i*2) for i in 1..5, then aggregate
    header = group(add.s(i, i * 2) for i in range(1, 6))
    callback = summarise.s()
    result = chord(header)(callback)
    summary = result.get(timeout=20)
    print(f"Chord summary: {summary}")


# ── 6. Progress Tracking ───────────────────────────────────────────────────────

def demo_progress() -> None:
    section("6. Long-Running Task with Progress States")
    result = slow_job.delay("demo-job-001", duration=2.0)
    print(f"Task ID: {result.id}  — polling...")
    while not result.ready():
        state = result.state
        meta = result.info if isinstance(result.info, dict) else {}
        print(f"  State: {state:<12}  Meta: {meta}")
        time.sleep(0.4)
    print(f"Final result: {result.get(timeout=10)}")


# ── 7. Countdown ───────────────────────────────────────────────────────────────

def demo_countdown() -> None:
    section("7. Scheduled Execution  (countdown=3s)")
    result = add.apply_async((100, 200), countdown=3)
    print(f"Task scheduled 3s from now. ID: {result.id}")
    value = result.get(timeout=15)
    print(f"100 + 200 = {value}")


# ── Main ───────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("Celery Producer — dispatching tasks to RabbitMQ")
    print("Ensure the worker is running:  python celery_worker.py\n")

    demo_fire_and_forget()
    demo_sync_result()
    demo_chain()
    demo_group()
    demo_chord()
    demo_progress()
    demo_countdown()

    print(f"\n{SEP}\n  All demos complete.\n{SEP}")

python/celery_beat.py

"""
celery_beat.py — Celery Beat periodic task scheduler.

Beat reads a schedule and publishes tasks to RabbitMQ automatically.
The worker (celery_worker.py) must be running to execute those tasks.

Prerequisites:
    Worker running in Terminal 1: python celery_worker.py
    RabbitMQ running:             docker compose up -d rabbitmq

Run (Terminal 3, from celery_learnings/python/):
    python celery_beat.py

You can also run worker + beat together in one process:
    celery -A celery_tasks worker --beat --loglevel=info
"""

import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from celery_tasks import app  # noqa: E402

# ── Beat Schedule ──────────────────────────────────────────────────────────────
app.conf.beat_schedule = {
    # Simple arithmetic — fires every 5 seconds
    "add-every-5s": {
        "task": "tasks.add",
        "schedule": 5.0,
        "args": (1, 1),
    },
    # Slow job — fires every 15 seconds
    "slow-job-every-15s": {
        "task": "tasks.slow_job",
        "schedule": 15.0,
        "kwargs": {"job_id": "beat-job", "duration": 1.0},
    },
    # Multiply — fires every 60 seconds
    "multiply-every-60s": {
        "task": "tasks.multiply",
        "schedule": 60.0,
        "args": (3, 7),
    },
}

app.conf.timezone = "UTC"

if __name__ == "__main__":
    print("Starting Celery Beat scheduler...")
    print("Active schedule:")
    for name, entry in app.conf.beat_schedule.items():
        print(f"  {name:<25} task={entry['task']}, every={entry['schedule']}s")
    print("\nPress Ctrl+C to stop.\n")
    app.start(argv=["beat", "--loglevel=info"])
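
Note that Beat persists its last-run times in a local celerybeat-schedule file (written to the current directory by default). The -s/--schedule flag relocates it, e.g. this variant of the startup call above:

app.start(argv=["beat", "--schedule", "/tmp/celerybeat-schedule", "--loglevel=info"])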