Celery Learnings
Production-ready Celery task queue implementations covering worker lifecycle, task chaining, parallel groups, chords, beat scheduling, and RabbitMQ integration with Python.
What You Will Learn
- How Celery decouples producers from workers via a broker (RabbitMQ)
- Task lifecycle: PENDING → STARTED → PROGRESS → SUCCESS / FAILURE
- Composing complex workflows with chains, groups, and chords
- Recurring tasks with Celery Beat
- Tuning workers for throughput and reliability
Prerequisites
- Python 3.13+
- RabbitMQ running via the shared message_brokers/ Docker Compose stack
Setup
cd celery_learnings
python3 -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt

Start RabbitMQ
# From the message_brokers/ directory
docker compose up -d rabbitmq

RabbitMQ management UI: http://localhost:15672 (guest / guest)
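Before running the labs, you can optionally confirm the broker is reachable. A minimal sketch using kombu (installed as a Celery dependency); the filename and retry count are illustrative:

# check_broker.py — optional broker connectivity check (illustrative).
from kombu import Connection

with Connection("amqp://guest:guest@localhost:5672//") as conn:
    # Raises kombu.exceptions.OperationalError if RabbitMQ is unreachable.
    conn.ensure_connection(max_retries=3)
    print("RabbitMQ is reachable.")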
Running the Labs
All scripts live in python/. Run them from the python/ directory:
cd celery_learnings/python

Terminal 1 — Start the worker
python celery_worker.py
# or equivalently:
# celery -A celery_tasks worker --loglevel=info --concurrency=2

Terminal 2 — Send tasks
python celery_producer.py

Terminal 3 (optional) — Start Beat scheduler
python celery_beat.py

Lab Implementation & Engineering Deep Dives
1. Task Definitions (python/celery_tasks.py)
- Why: Central module defining the Celery app, broker/backend config, and all tasks.
- What: add, multiply, slow_job (with progress states), transform (chain step), summarise (chord callback).
- How: Uses bind=True for task self-reference, the rpc:// backend for results over RabbitMQ, and acks_late=True for at-least-once delivery. Because every task carries an explicit name, it can also be dispatched by name alone (see the sketch below).
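Explicit task names decouple producers from the task module. A minimal sketch of dispatching by name, assuming the broker settings above (this producer never imports celery_tasks):

# send_by_name.py — dispatch a task by its registered name (illustrative).
from celery import Celery

app = Celery(broker="amqp://guest:guest@localhost:5672//", backend="rpc://")
result = app.send_task("tasks.add", args=(2, 3))
print(result.get(timeout=10))  # -> 5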
2. Worker Process (python/celery_worker.py)
- Why: Demonstrates starting a worker programmatically.
- What: Calls app.worker_main() with concurrency=2, showing how the worker lifecycle is managed.
- How: app.worker_main(argv=[...]) mirrors the celery worker CLI. A running worker can also be queried over the broker (see the inspect sketch below).
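To confirm the worker is alive and has registered its tasks, you can query it over the broker. A minimal sketch using Celery's remote-control API (the output shape varies by Celery version):

# inspect_worker.py — query a running worker over RabbitMQ (illustrative).
from celery_tasks import app

insp = app.control.inspect(timeout=5)
print(insp.ping())        # e.g. {'worker@host': {'ok': 'pong'}}
print(insp.registered())  # task names each worker has registered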
3. Task Producer (python/celery_producer.py)
- Why: Shows every major task dispatch pattern in one runnable script.
- What: Fire-and-forget, result retrieval, chains, groups, chords, countdown scheduling.
- How: Uses .delay(), .apply_async(), chain(), group(), and chord() from Celery. Every dispatch returns an AsyncResult whose ID can rehydrate a result handle later (see the sketch below).
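A task ID is enough to reconstruct a result handle after the fact. A minimal sketch (note that the rpc:// backend only delivers results to the client that sent the task, so this works within the same producer process):

# rehydrate_result.py — look up a result by task ID (illustrative).
from celery.result import AsyncResult
from celery_tasks import add, app

task_id = add.delay(1, 2).id
handle = AsyncResult(task_id, app=app)
print(handle.get(timeout=10))  # -> 3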
4. Beat Scheduler (python/celery_beat.py)
- Why: Shows periodic task execution without external cron.
- What: Schedules add every 5s, slow_job every 15s, multiply every 60s.
- How: app.conf.beat_schedule dict + app.start(["beat", "--loglevel=info"]). Calendar-style schedules are also possible with crontab (see the sketch below).
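Interval floats cover these demos; for calendar-style timing Celery ships celery.schedules.crontab. A sketch of an alternative schedule entry (the entry name and times are illustrative):

# Add to app.conf.beat_schedule in celery_beat.py (illustrative entry).
from celery.schedules import crontab
from celery_tasks import app

app.conf.beat_schedule["add-weekday-mornings"] = {
    "task": "tasks.add",
    "schedule": crontab(hour=7, minute=30, day_of_week="mon-fri"),
    "args": (1, 1),
}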
📝 Lab Implementation & Scripts
python/celery_tasks.py
"""
celery_tasks.py — Celery application and all task definitions.
Requires RabbitMQ on localhost:5672 (guest/guest).
Start RabbitMQ from message_brokers/:
docker compose up -d rabbitmq
This file is a shared module. Do not run directly.
Start worker: python celery_worker.py
Send tasks: python celery_producer.py
"""
import time
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
# ── Celery App ─────────────────────────────────────────────────────────────────
app = Celery(
"celery_demo",
broker="amqp://guest:guest@localhost:5672//",
backend="rpc://",
)
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_acks_late=True, # Ack only after the task completes
worker_prefetch_multiplier=1, # One task per worker slot at a time
)
# ── Basic Tasks ────────────────────────────────────────────────────────────────
@app.task(bind=True, name="tasks.add")
def add(self, x: int, y: int) -> int:
"""Add two numbers."""
logger.info("add(%s, %s)", x, y)
return x + y
@app.task(bind=True, name="tasks.multiply")
def multiply(self, x: int, y: int) -> int:
"""Multiply two numbers."""
logger.info("multiply(%s, %s)", x, y)
return x * y
# ── Long-Running Task with Progress States ─────────────────────────────────────
@app.task(bind=True, name="tasks.slow_job", max_retries=3, default_retry_delay=5)
def slow_job(self, job_id: str, duration: float = 2.0) -> dict:
"""
Simulates a slow external job.
Tracks PENDING → STARTED → PROGRESS → SUCCESS lifecycle.
Retries automatically on failure (up to max_retries).
"""
logger.info("slow_job start: job_id=%s, duration=%ss", job_id, duration)
try:
self.update_state(state="PROGRESS", meta={"job_id": job_id, "progress": 0})
time.sleep(duration * 0.5)
self.update_state(state="PROGRESS", meta={"job_id": job_id, "progress": 50})
time.sleep(duration * 0.5)
result = {"job_id": job_id, "status": "done", "duration": duration}
logger.info("slow_job done: %s", result)
return result
except Exception as exc:
logger.warning("slow_job retrying: %s", exc)
raise self.retry(exc=exc)
# ── Pipeline / Composition Tasks ───────────────────────────────────────────────
@app.task(bind=True, name="tasks.transform")
def transform(self, value: int, op: str = "square") -> int:
"""
Transforms a value. Designed as a composable chain step.
op: 'square' | 'double' | 'negate'
"""
ops = {
"square": lambda v: v * v,
"double": lambda v: v * 2,
"negate": lambda v: -v,
}
result = ops.get(op, lambda v: v)(value)
logger.info("transform %s(%s) = %s", op, value, result)
return result
@app.task(bind=True, name="tasks.summarise")
def summarise(self, results: list) -> dict:
"""
Aggregates a list of results.
Used as the chord callback after a parallel group completes.
"""
total = sum(results)
logger.info("summarise: results=%s total=%s", results, total)
return {"results": results, "total": total, "count": len(results)}
if __name__ == "__main__":
print("This module defines tasks. Start the worker with:")
print(" python celery_worker.py")
print(" or: celery -A celery_tasks worker --loglevel=info")
python/celery_worker.py
"""
celery_worker.py — Starts the Celery worker process.
Prerequisites (from celery_learnings/ root):
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
Start RabbitMQ (from message_brokers/):
docker compose up -d rabbitmq
Run (Terminal 1, from celery_learnings/python/):
python celery_worker.py
Then (Terminal 2):
python celery_producer.py
"""
import os
import sys
# Ensure the python/ directory is on sys.path so celery_tasks is importable.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from celery_tasks import app # noqa: E402
if __name__ == "__main__":
print("Starting Celery worker — consuming from RabbitMQ amqp://localhost:5672")
print("Workers: 2 concurrent processes")
print("Press Ctrl+C to stop.\n")
app.worker_main(
argv=[
"worker",
"--loglevel=info",
"--concurrency=2",
"--hostname=worker@%h",
]
)
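One caveat worth knowing: the default prefork pool is not officially supported on Windows (since Celery 4), so on Windows the worker is typically started with the single-threaded solo pool instead:

celery -A celery_tasks worker --loglevel=info --pool=solo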
python/celery_producer.py
"""
celery_producer.py — Sends tasks to the Celery worker via RabbitMQ.
Demonstrates:
1. Fire-and-forget (.delay)
2. Synchronous result retrieval (.get)
3. Task chaining (add → square → double)
4. Parallel groups (N multiplications at once)
5. Chords (parallel group + summarise callback)
6. Long-running task with progress polling
7. Countdown (scheduled future execution)
Prerequisites:
Worker must be running in Terminal 1: python celery_worker.py
RabbitMQ must be running: docker compose up -d rabbitmq
Run (Terminal 2, from celery_learnings/python/):
python celery_producer.py
"""
import os
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from celery import chain, chord, group # noqa: E402
from celery_tasks import add, multiply, slow_job, summarise, transform # noqa: E402
SEP = "─" * 60
def section(title: str) -> None:
print(f"\n{SEP}\n {title}\n{SEP}")
# ── 1. Fire-and-Forget ─────────────────────────────────────────────────────────
def demo_fire_and_forget() -> None:
section("1. Fire-and-Forget (.delay)")
result = add.delay(10, 20)
print(f"Task dispatched — ID: {result.id}")
print(f"State before .get(): {result.state}")
value = result.get(timeout=10)
print(f"Result: 10 + 20 = {value}")
# ── 2. Synchronous Result ──────────────────────────────────────────────────────
def demo_sync_result() -> None:
section("2. Synchronous Result (.get)")
result = multiply.delay(7, 6)
value = result.get(timeout=10)
print(f"7 × 6 = {value}")
# ── 3. Task Chain ──────────────────────────────────────────────────────────────
def demo_chain() -> None:
section("3. Task Chain add(3,4) → square → double")
# add(3, 4) = 7 → square(7) = 49 → double(49) = 98
pipeline = chain(
add.s(3, 4),
transform.s(op="square"),
transform.s(op="double"),
)
result = pipeline.apply_async()
value = result.get(timeout=15)
print(f"Chain result: {value} (expected 98)")
# ── 4. Parallel Group ──────────────────────────────────────────────────────────
def demo_group() -> None:
section("4. Parallel Group (4 multiplications in parallel)")
tasks = group(multiply.s(i, i) for i in range(1, 5))
results = tasks.apply_async().get(timeout=15)
print(f"i×i for i in 1..4: {results} (expected [1, 4, 9, 16])")
# ── 5. Chord ───────────────────────────────────────────────────────────────────
def demo_chord() -> None:
section("5. Chord (parallel group + summarise callback)")
# Compute add(i, i*2) for i in 1..5, then aggregate
header = group(add.s(i, i * 2) for i in range(1, 6))
callback = summarise.s()
result = chord(header)(callback)
summary = result.get(timeout=20)
print(f"Chord summary: {summary}")
# ── 6. Progress Tracking ───────────────────────────────────────────────────────
def demo_progress() -> None:
section("6. Long-Running Task with Progress States")
result = slow_job.delay("demo-job-001", duration=2.0)
print(f"Task ID: {result.id} — polling...")
while not result.ready():
state = result.state
meta = result.info if isinstance(result.info, dict) else {}
print(f" State: {state:<12} Meta: {meta}")
time.sleep(0.4)
print(f"Final result: {result.get(timeout=10)}")
# ── 7. Countdown ───────────────────────────────────────────────────────────────
def demo_countdown() -> None:
section("7. Scheduled Execution (countdown=3s)")
result = add.apply_async((100, 200), countdown=3)
print(f"Task scheduled 3s from now. ID: {result.id}")
value = result.get(timeout=15)
print(f"100 + 200 = {value}")
# ── Main ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("Celery Producer — dispatching tasks to RabbitMQ")
print("Ensure the worker is running: python celery_worker.py\n")
demo_fire_and_forget()
demo_sync_result()
demo_chain()
demo_group()
demo_chord()
demo_progress()
demo_countdown()
print(f"\n{SEP}\n All demos complete.\n{SEP}")
python/celery_beat.py
"""
celery_beat.py — Celery Beat periodic task scheduler.
Beat reads a schedule and publishes tasks to RabbitMQ automatically.
The worker (celery_worker.py) must be running to execute those tasks.
Prerequisites:
Worker running in Terminal 1: python celery_worker.py
RabbitMQ running: docker compose up -d rabbitmq
Run (Terminal 3, from celery_learnings/python/):
python celery_beat.py
You can also run worker + beat together in one process:
celery -A celery_tasks worker --beat --loglevel=info
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from celery_tasks import app # noqa: E402
# ── Beat Schedule ──────────────────────────────────────────────────────────────
app.conf.beat_schedule = {
# Simple arithmetic — fires every 5 seconds
"add-every-5s": {
"task": "tasks.add",
"schedule": 5.0,
"args": (1, 1),
},
# Slow job — fires every 15 seconds
"slow-job-every-15s": {
"task": "tasks.slow_job",
"schedule": 15.0,
"kwargs": {"job_id": "beat-job", "duration": 1.0},
},
# Multiply — fires every 60 seconds
"multiply-every-60s": {
"task": "tasks.multiply",
"schedule": 60.0,
"args": (3, 7),
},
}
app.conf.timezone = "UTC"
if __name__ == "__main__":
print("Starting Celery Beat scheduler...")
print("Active schedule:")
for name, entry in app.conf.beat_schedule.items():
print(f" {name:<25} task={entry['task']}, every={entry['schedule']}s")
print("\nPress Ctrl+C to stop.\n")
app.start(argv=["beat", "--loglevel=info"])