MongoDB

MongoDB with Python

This project contains advanced use cases and examples for using MongoDB with Python (pymongo).

Prerequisites

  • Python 3.8+
  • A MongoDB instance (e.g., Railway, Atlas, or Local)
  • pymongo and python-dotenv

Music Industry Use Cases

  1. 01_music_crud.py: Tracking songs with ISRC codes, unique indexing, and atomic play-count increments ($inc).
  2. 02_music_aggregation.py: Building a Trending Dashboard. Calculate top genres and artist reach using $group and $addToSet.
  3. 03_music_modeling.py: The Album vs. Playlist dilemma. Learn when to embed (Tracks in an Album) vs. reference (Songs in a Playlist).
  4. 04_music_bulk_ops.py: Mass-syncing Trending Scores from external APIs using bulk_write.

Setup

  1. Create a Virtual Environment:
    python3 -m venv venv
    source venv/bin/activate
  2. Install dependencies:
    pip install -r requirements.txt
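  3. Copy .env.example to .env and set MONGO_URI to your MongoDB connection string (Railway, Atlas, or local). Optionally set DB_NAME (the scripts default to music_db).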
  4. Initialize the Database:
    Run the setup script to create collections, indexes, and sample data automatically:
    python setup_db.py

📝 Lab Implementation & Scripts

01_music_crud.py

import os
from pymongo import MongoClient, ASCENDING
from dotenv import load_dotenv

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME", "music_db")

def get_db():
    """Return the database named by ``DB_NAME`` on a new client for ``MONGO_URI``.

    Every call constructs its own ``MongoClient``; the client is not closed
    here, so it stays open for as long as the returned handle is in use.
    """
    return MongoClient(MONGO_URI)[DB_NAME]

def setup_library():
    """Return the ``tracks`` collection after trying to create its indexes.

    Index creation is best-effort: if the driver or server reports an error,
    a note is printed and the collection is still returned so the rest of the
    demo can run.

    Returns:
        The ``tracks`` collection of the database returned by ``get_db()``.
    """
    # Local import keeps this block self-contained; the script's top-level
    # imports only bring in MongoClient and ASCENDING.
    from pymongo.errors import PyMongoError

    tracks = get_db()["tracks"]

    try:
        # 1. Indexing: Unique ID for tracks (ISRC is the industry standard)
        tracks.create_index([("isrc", ASCENDING)], unique=True)
        # Index for fast searching by name/artist
        tracks.create_index([("artist", ASCENDING), ("title", ASCENDING)])
        print("✓ Indices verified/created")
    except PyMongoError as e:
        # Only driver/server failures are tolerated here; anything else is a
        # programming error and should surface instead of being swallowed.
        print(f"⚠️ Note: Indices could not be created (likely disk space), but we'll continue: {e}")

    return tracks

def track_management(tracks):
    """Insert a sample track and atomically bump its play count.

    Args:
        tracks: The ``tracks`` collection to write to (anything exposing
            ``insert_one`` and ``update_one`` like a pymongo collection).

    A duplicate ISRC on insert is reported and tolerated, so the script can be
    re-run; any other insert error propagates to the caller.
    """
    # Local imports keep this block self-contained.
    from datetime import datetime, timezone

    from pymongo.errors import DuplicateKeyError

    # CREATE: A new hit song
    song = {
        "title": "Midnight City",
        "artist": "M83",
        "genre": "Synthwave",
        "isrc": "US-S1Z-11-00001",
        "duration_sec": 243,
        "play_count": 0,
        "tags": ["electronic", "80s-vibe", "anthemic"],
    }

    try:
        tracks.insert_one(song)
    except DuplicateKeyError as e:
        # Expected on every run after the first: the ISRC is already stored.
        print(f"✗ Failed to add track (duplicate ISRC): {e}")
    else:
        print(f"✓ Added track: {song['title']} by {song['artist']}")

    # UPDATE: Increment play count (Atomic)
    # Tip: Use $inc for high-frequency updates like "plays" or "likes"
    # The timestamp keeps the original ISO-8601 "...Z" string format but now
    # records when the play actually happened instead of a fixed date.
    played_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    result = tracks.update_one(
        {"isrc": song["isrc"]},
        {"$inc": {"play_count": 1}, "$set": {"last_played_at": played_at}},
    )
    # Only claim success when a document was actually matched.
    if result.matched_count:
        print("✓ Track play count incremented atomically")
    else:
        print(f"✗ No track found with ISRC {song['isrc']}; play count not incremented")

if __name__ == "__main__":
    # Run the CRUD demo only when a connection string has been configured.
    if MONGO_URI:
        track_management(setup_library())
    else:
        print("Please set MONGO_URI in .env")

02_music_aggregation.py

import os
from pymongo import MongoClient
from dotenv import load_dotenv

load_dotenv()

def run_music_analytics():
    """Print total plays and distinct-artist counts per genre, most played first.

    Connects using the ``MONGO_URI`` / ``DB_NAME`` environment variables, seeds
    a few sample tracks when the ``tracks`` collection holds at most one
    document, then runs the aggregation and prints one line per genre.
    """
    # The context manager closes the client's connections when the report is done.
    with MongoClient(os.getenv("MONGO_URI")) as client:
        db = client[os.getenv("DB_NAME", "music_db")]
        tracks = db["tracks"]

        # Seed data for analytics if empty
        if tracks.count_documents({}) <= 1:
            # 01_music_crud.py creates a unique index on "isrc". Documents
            # without that field are all indexed as null, so each seed row
            # needs its own code or the insert fails with a duplicate-key
            # error. These codes are placeholders, not real ISRCs.
            tracks.insert_many([
                {"title": "Starboy", "artist": "The Weeknd", "genre": "Pop", "play_count": 1500, "year": 2016, "isrc": "US-DM0-16-00001"},
                {"title": "Blinding Lights", "artist": "The Weeknd", "genre": "Pop", "play_count": 3000, "year": 2019, "isrc": "US-DM0-19-00002"},
                {"title": "Level of Concern", "artist": "Twenty One Pilots", "genre": "Indie", "play_count": 800, "year": 2020, "isrc": "US-DM0-20-00003"},
                {"title": "Chlorine", "artist": "Twenty One Pilots", "genre": "Indie", "play_count": 1200, "year": 2018, "isrc": "US-DM0-18-00004"},
                {"title": "One More Time", "artist": "Daft Punk", "genre": "Electronic", "play_count": 5000, "year": 2000, "isrc": "US-DM0-00-00005"},
            ])

        # Interview Task: Find top genres by total play_count
        pipeline = [
            # 1. Filter tracks from 2000 onwards
            {"$match": {"year": {"$gte": 2000}}},

            # 2. Group by genre and sum plays
            {
                "$group": {
                    "_id": "$genre",
                    "total_plays": {"$sum": "$play_count"},
                    "unique_artists": {"$addToSet": "$artist"},
                    # Computed to demonstrate $avg; the final $project does
                    # not include it, so it is absent from the output.
                    "avg_plays": {"$avg": "$play_count"},
                }
            },

            # 3. Add a field for artist count
            {"$addFields": {"artist_count": {"$size": "$unique_artists"}}},

            # 4. Sort by most played
            {"$sort": {"total_plays": -1}},

            # 5. Clean up output
            {"$project": {"_id": 0, "genre": "$_id", "total_plays": 1, "artist_count": 1}},
        ]

        print("--- Top Music Genres by Play Count ---")
        for res in tracks.aggregate(pipeline):
            print(f"Genre: {res['genre']} | Plays: {res['total_plays']} | Artists: {res['artist_count']}")

if __name__ == "__main__":
    # Same guard as 01_music_crud.py: without MONGO_URI the client would be
    # constructed with None instead of the intended connection string.
    if not os.getenv("MONGO_URI"):
        print("Please set MONGO_URI in .env")
    else:
        run_music_analytics()

03_music_modeling.py

import os
from pymongo import MongoClient
from dotenv import load_dotenv

load_dotenv()

def music_modeling_demo():
    """Demonstrate embedding (album) versus referencing (playlist) documents.

    Inserts an album with its tracks embedded, then creates a playlist that
    references one document from the ``tracks`` collection and expands it with
    ``$lookup``. If ``tracks`` is empty, the playlist part is skipped with a
    message instead of crashing.
    """
    # The context manager closes the client's connections when the demo ends.
    with MongoClient(os.getenv("MONGO_URI")) as client:
        db = client[os.getenv("DB_NAME", "music_db")]

        # CASE 1: EMBEDDING (The "Album" pattern)
        # Tracks usually don't exist without an album, and albums have a fixed number of tracks.
        # This is perfect for embedding.
        album = {
            "title": "Random Access Memories",
            "artist": "Daft Punk",
            "release_year": 2013,
            "tracks": [
                {"track_no": 1, "title": "Give Life Back to Music", "duration": 274},
                {"track_no": 2, "title": "The Game of Love", "duration": 322},
                {"track_no": 3, "title": "Giorgio by Moroder", "duration": 544},
            ],
        }
        db.albums.insert_one(album)
        print("✓ Inserted Album with embedded tracks (Fast Reads!)")

        # CASE 2: REFERENCING (The "Playlist" pattern)
        # A playlist can have thousands of tracks, and tracks belong to many playlists.
        # We reference the track IDs instead of embedding the whole song data.

        # Let's assume we have track IDs from a central collection.
        # find_one() returns None on an empty collection, so check before indexing.
        first_track = db.tracks.find_one({}, {"_id": 1})
        if first_track is None:
            print("✗ No tracks found; run 01_music_crud.py first to populate the tracks collection")
            return
        track_ids = [first_track["_id"]]

        playlist = {
            "name": "Summer Vibes 2024",
            "user_id": "user_123",
            "track_ids": track_ids,  # Only storing foreign keys
        }
        playlist_id = db.playlists.insert_one(playlist).inserted_id
        print("✓ Created Playlist with track references (Avoids duplication)")

        # Join tracks into the playlist using $lookup.
        # Match on the _id just inserted: re-running the script creates several
        # playlists with the same name, and matching by name could pick an older one.
        pipeline = [
            {"$match": {"_id": playlist_id}},
            {
                "$lookup": {
                    "from": "tracks",
                    "localField": "track_ids",
                    "foreignField": "_id",
                    "as": "track_details",
                }
            },
        ]
        playlist_full = list(db.playlists.aggregate(pipeline))[0]
        print(f"✓ Expanded playlist '{playlist_full['name']}' using $lookup")

if __name__ == "__main__":
    # Same guard as 01_music_crud.py: without MONGO_URI the client would be
    # constructed with None instead of the intended connection string.
    if not os.getenv("MONGO_URI"):
        print("Please set MONGO_URI in .env")
    else:
        music_modeling_demo()

04_music_bulk_ops.py

import os
import time
from pymongo import MongoClient, UpdateOne
from dotenv import load_dotenv

load_dotenv()

def sync_trending_scores():
    """
    Imagine we calculate 'trending scores' in a Python service
    and need to update 1000s of tracks at once.

    Builds one upserting ``UpdateOne`` per external score, sends them with a
    single ``bulk_write`` call, and prints how many tracks were created versus
    modified. Connection settings come from ``MONGO_URI`` / ``DB_NAME``.
    """
    # Mock data: A list of updates from an external API
    external_updates = [
        {"isrc": "US-S1Z-11-00001", "score": 98.5},
        {"isrc": "US-WB1-16-00123", "score": 85.2},
        {"isrc": "GB-AHT-20-00456", "score": 77.0},
    ]

    # One timestamp for the whole batch, so every track touched by this sync
    # carries the same last_sync value (epoch seconds, as before).
    synced_at = time.time()

    # Bulk preparation
    operations = [
        UpdateOne(
            {"isrc": update["isrc"]},
            {"$set": {"trending_score": update["score"], "last_sync": synced_at}},
            upsert=True,
        )
        for update in external_updates
    ]

    # Nothing to send; skip connecting and calling bulk_write with an empty list.
    if not operations:
        return

    # The context manager closes the client's connections after the sync.
    with MongoClient(os.getenv("MONGO_URI")) as client:
        db = client[os.getenv("DB_NAME", "music_db")]
        tracks = db["tracks"]

        print(f"Syncing {len(operations)} trending scores in ONE network request...")
        result = tracks.bulk_write(operations)
        print(f"✓ Bulk Sync Complete: {result.upserted_count} new, {result.modified_count} updated.")

if __name__ == "__main__":
    # Same guard as 01_music_crud.py: without MONGO_URI the client would be
    # constructed with None instead of the intended connection string.
    if not os.getenv("MONGO_URI"):
        print("Please set MONGO_URI in .env")
    else:
        sync_trending_scores()