Mongodb
MongoDB with Python
This project contains advanced use cases and examples for using MongoDB with Python (pymongo).
Prerequisites
- Python 3.8+
- A MongoDB instance (e.g., Railway, Atlas, or Local)
- `pymongo` and `python-dotenv`
Music Industry Use Cases
- 01_music_crud.py: Tracking songs with ISRC codes, unique indexing, and atomic play-count increments (`$inc`).
- 02_music_aggregation.py: Building a Trending Dashboard. Calculate top genres and artist reach using `$group` and `$addToSet`.
- 03_music_modeling.py: The Album vs. Playlist dilemma. Learn when to embed (Tracks in an Album) vs. reference (Songs in a Playlist).
- 04_music_bulk_ops.py: Mass-syncing Trending Scores from external APIs using `bulk_write`.
Setup
- Create a Virtual Environment: `python3 -m venv venv`, then `source venv/bin/activate`
- Install dependencies: `pip install -r requirements.txt`
- Copy `.env.example` to `.env` and fill in your Railway MongoDB URI.
- Initialize the Database: run the setup script to create collections, indexes, and sample data automatically: `python setup_db.py`
📝 Lab Implementation & Scripts
01_music_crud.py
import os
from pymongo import MongoClient, ASCENDING
from dotenv import load_dotenv
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME", "music_db")
def get_db():
    """Open a client for ``MONGO_URI`` and return the ``DB_NAME`` database."""
    return MongoClient(MONGO_URI)[DB_NAME]
def setup_library():
    """Return the ``tracks`` collection after trying to create its indexes.

    Two indexes are requested: a unique index on ``isrc`` and a compound
    index on ``(artist, title)``. Index creation is best-effort: a database
    error is reported on stdout and the collection is returned regardless.

    Returns:
        The ``tracks`` collection of the database returned by ``get_db()``.
    """
    # Imported locally so the narrower exception type does not require a
    # change to the script's top-level imports.
    from pymongo.errors import PyMongoError

    tracks = get_db()["tracks"]

    try:
        # 1. Indexing: Unique ID for tracks (ISRC is the industry standard)
        tracks.create_index([("isrc", ASCENDING)], unique=True)
        # Index for fast searching by name/artist
        tracks.create_index([("artist", ASCENDING), ("title", ASCENDING)])
    except PyMongoError as e:
        # Only driver/server failures are tolerated here; programming errors
        # (typos, wrong arguments) are allowed to propagate.
        print(f"⚠️ Note: Indices could not be created (likely disk space), but we'll continue: {e}")
    else:
        print("✓ Indices verified/created")

    return tracks
def track_management(tracks):
    """Insert a sample track, then atomically bump its play count.

    Args:
        tracks: Collection-like object exposing ``insert_one`` and
            ``update_one`` (the ``tracks`` collection in this script).

    Progress and failures are reported on stdout; nothing is returned.
    """
    isrc = "US-S1Z-11-00001"

    # CREATE: A new hit song
    song = {
        "title": "Midnight City",
        "artist": "M83",
        "genre": "Synthwave",
        "isrc": isrc,
        "duration_sec": 243,
        "play_count": 0,
        "tags": ["electronic", "80s-vibe", "anthemic"],
    }

    try:
        tracks.insert_one(song)
    except Exception as e:
        # Best-effort demo step: report the failure and still attempt the
        # update below, since the track may already exist from an earlier run.
        print(f"✗ Failed to add track (likely duplicate ISRC): {e}")
    else:
        print(f"✓ Added track: {song['title']} by {song['artist']}")

    # UPDATE: Increment play count (Atomic)
    # Tip: Use $inc for high-frequency updates like "plays" or "likes"
    result = tracks.update_one(
        {"isrc": isrc},
        {"$inc": {"play_count": 1}, "$set": {"last_played_at": "2024-03-23T10:00:00Z"}},
    )

    # Only claim success when the filter actually matched a document.
    if result.matched_count:
        print("✓ Track play count incremented atomically")
    else:
        print(f"✗ No track with ISRC {isrc} found; play count not incremented")
if __name__ == "__main__":
    # Run the demo only when a connection string has been configured.
    if MONGO_URI:
        track_management(setup_library())
    else:
        print("Please set MONGO_URI in .env")
02_music_aggregation.py
import os
from pymongo import MongoClient
from dotenv import load_dotenv
load_dotenv()
def run_music_analytics():
    """Print the top genres by total play count for tracks from 2000 onwards.

    Seeds the ``tracks`` collection with sample data when it holds at most
    one document, then runs an aggregation that groups by genre, sums play
    counts, counts distinct artists, and sorts by total plays descending.
    Results are printed to stdout; nothing is returned.
    """
    # The context manager closes the client's connections when the report ends.
    with MongoClient(os.getenv("MONGO_URI")) as client:
        tracks = client[os.getenv("DB_NAME", "music_db")]["tracks"]

        # Seed data for analytics if (nearly) empty; `<= 1` presumably
        # tolerates the single track added by 01_music_crud.py.
        if tracks.count_documents({}) <= 1:
            # Each seed track carries its own placeholder ISRC (illustrative,
            # not real registry codes). 01_music_crud.py puts a unique index
            # on `isrc`, and a unique index admits only one document lacking
            # the indexed field, so seeding several tracks without one fails
            # with a duplicate-key error.
            tracks.insert_many([
                {"title": "Starboy", "artist": "The Weeknd", "genre": "Pop", "isrc": "XX-DM0-16-00001", "play_count": 1500, "year": 2016},
                {"title": "Blinding Lights", "artist": "The Weeknd", "genre": "Pop", "isrc": "XX-DM0-19-00002", "play_count": 3000, "year": 2019},
                {"title": "Level of Concern", "artist": "Twenty One Pilots", "genre": "Indie", "isrc": "XX-DM0-20-00003", "play_count": 800, "year": 2020},
                {"title": "Chlorine", "artist": "Twenty One Pilots", "genre": "Indie", "isrc": "XX-DM0-18-00004", "play_count": 1200, "year": 2018},
                {"title": "One More Time", "artist": "Daft Punk", "genre": "Electronic", "isrc": "XX-DM0-00-00005", "play_count": 5000, "year": 2000},
            ])

        # Interview Task: Find top genres by total play_count
        pipeline = [
            # 1. Filter tracks from 2000 onwards
            {"$match": {"year": {"$gte": 2000}}},
            # 2. Group by genre and sum plays
            {
                "$group": {
                    "_id": "$genre",
                    "total_plays": {"$sum": "$play_count"},
                    "unique_artists": {"$addToSet": "$artist"},
                    # Computed for illustration; the final $project drops it.
                    "avg_plays": {"$avg": "$play_count"},
                }
            },
            # 3. Add a field for artist count
            {"$addFields": {"artist_count": {"$size": "$unique_artists"}}},
            # 4. Sort by most played
            {"$sort": {"total_plays": -1}},
            # 5. Clean up output
            {"$project": {"_id": 0, "genre": "$_id", "total_plays": 1, "artist_count": 1}},
        ]

        print("--- Top Music Genres by Play Count ---")
        for res in tracks.aggregate(pipeline):
            print(f"Genre: {res['genre']} | Plays: {res['total_plays']} | Artists: {res['artist_count']}")
if __name__ == "__main__":
    # Same guard as 01_music_crud.py: without a URI, MongoClient would
    # silently fall back to its default host instead of the configured one.
    if not os.getenv("MONGO_URI"):
        print("Please set MONGO_URI in .env")
    else:
        run_music_analytics()
03_music_modeling.py
import os
from pymongo import MongoClient
from dotenv import load_dotenv
load_dotenv()
def music_modeling_demo():
    """Demonstrate embedding (album) versus referencing (playlist) documents.

    Inserts an album with its tracks embedded, then creates a playlist that
    stores only the ``_id`` of an existing track and expands it with
    ``$lookup``. If the ``tracks`` collection is empty, the playlist part is
    skipped with a message. Progress is printed to stdout; nothing is
    returned.
    """
    # The context manager closes the client's connections when the demo ends.
    with MongoClient(os.getenv("MONGO_URI")) as client:
        db = client[os.getenv("DB_NAME", "music_db")]

        # CASE 1: EMBEDDING (The "Album" pattern)
        # Tracks usually don't exist without an album, and albums have a fixed
        # number of tracks. This is perfect for embedding.
        album = {
            "title": "Random Access Memories",
            "artist": "Daft Punk",
            "release_year": 2013,
            "tracks": [
                {"track_no": 1, "title": "Give Life Back to Music", "duration": 274},
                {"track_no": 2, "title": "The Game of Love", "duration": 322},
                {"track_no": 3, "title": "Giorgio by Moroder", "duration": 544},
            ],
        }
        db.albums.insert_one(album)
        print("✓ Inserted Album with embedded tracks (Fast Reads!)")

        # CASE 2: REFERENCING (The "Playlist" pattern)
        # A playlist can have thousands of tracks, and tracks belong to many
        # playlists. We reference the track IDs instead of embedding the whole
        # song data.

        # Let's assume we have track IDs from a central collection.
        # find_one() returns None on an empty collection, so check before
        # subscripting instead of failing with a TypeError.
        first_track = db.tracks.find_one()
        if first_track is None:
            print("✗ No tracks found; run 01_music_crud.py first to create one")
            return

        playlist = {
            "name": "Summer Vibes 2024",
            "user_id": "user_123",
            "track_ids": [first_track["_id"]],  # Only storing foreign keys
        }
        db.playlists.insert_one(playlist)
        print("✓ Created Playlist with track references (Avoids duplication)")

        # Join tracks into the playlist using $lookup
        pipeline = [
            {"$match": {"name": "Summer Vibes 2024"}},
            {
                "$lookup": {
                    "from": "tracks",
                    "localField": "track_ids",
                    "foreignField": "_id",
                    "as": "track_details",
                }
            },
        ]
        # At least one match exists: the playlist inserted just above.
        playlist_full = list(db.playlists.aggregate(pipeline))[0]
        print(f"✓ Expanded playlist '{playlist_full['name']}' using $lookup")
if __name__ == "__main__":
    # Same guard as 01_music_crud.py: without a URI, MongoClient would
    # silently fall back to its default host instead of the configured one.
    if not os.getenv("MONGO_URI"):
        print("Please set MONGO_URI in .env")
    else:
        music_modeling_demo()
04_music_bulk_ops.py
import os
import time
from pymongo import MongoClient, UpdateOne
from dotenv import load_dotenv
load_dotenv()
def sync_trending_scores():
    """Upsert a batch of trending scores into ``tracks`` with one bulk write.

    Imagine we calculate 'trending scores' in a Python service
    and need to update 1000s of tracks at once. Each update is keyed by
    ``isrc`` and sets ``trending_score`` and ``last_sync``; tracks that do
    not exist yet are created (``upsert=True``). A summary is printed to
    stdout; nothing is returned.
    """
    # Mock data: A list of updates from an external API
    external_updates = [
        {"isrc": "US-S1Z-11-00001", "score": 98.5},
        {"isrc": "US-WB1-16-00123", "score": 85.2},
        {"isrc": "GB-AHT-20-00456", "score": 77.0},
    ]

    # One timestamp for the whole batch, so every track touched by this sync
    # carries the same `last_sync` value.
    synced_at = time.time()

    # Bulk preparation
    operations = [
        UpdateOne(
            {"isrc": update["isrc"]},
            {"$set": {"trending_score": update["score"], "last_sync": synced_at}},
            upsert=True,
        )
        for update in external_updates
    ]
    if not operations:
        return

    # The context manager closes the client's connections once the sync is done.
    with MongoClient(os.getenv("MONGO_URI")) as client:
        tracks = client[os.getenv("DB_NAME", "music_db")]["tracks"]
        print(f"Syncing {len(operations)} trending scores in ONE network request...")
        result = tracks.bulk_write(operations)
        print(f"✓ Bulk Sync Complete: {result.upserted_count} new, {result.modified_count} updated.")
if __name__ == "__main__":
    # Same guard as 01_music_crud.py: without a URI, MongoClient would
    # silently fall back to its default host instead of the configured one.
    if not os.getenv("MONGO_URI"):
        print("Please set MONGO_URI in .env")
    else:
        sync_trending_scores()