Websockets
integrated_app.py
import asyncio
import websockets
import json
import os
from pymongo import MongoClient
from dotenv import load_dotenv
from datetime import datetime
# 1. Setup Environment and MongoDB
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME", "music_db")
# Global variables
connected_clients = set()
def get_mongo_collection():
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
return db["activity_logs"]
# Initialize collection
activity_logs = get_mongo_collection()
async def handle_client(websocket):
# Register client
connected_clients.add(websocket)
print(f"✅ New connection. Active listeners: {len(connected_clients)}")
try:
async for message in websocket:
try:
data = json.loads(message)
# Check for "LIKE" events
if data.get("type") == "LIKE":
track_id = data.get("track_id", "unknown")
user_id = data.get("user_id", "guest")
print(f"📩 Received LIKE for {track_id} from {user_id}")
# 2. PERSIST TO MONGODB
log_entry = {
"event": "LIKE",
"track_id": track_id,
"user_id": user_id,
"timestamp": datetime.now()
}
result = activity_logs.insert_one(log_entry)
print(f"💾 Saved to MongoDB index: {result.inserted_id}")
# 3. BROADCAST TO OTHERS
broadcast_msg = json.dumps({
"type": "NOTIFICATION",
"message": f"🔥 Someone just liked {track_id}!",
"total_likes_saved": activity_logs.count_documents({"track_id": track_id})
})
# Send to everyone else
if connected_clients:
await asyncio.gather(
*[client.send(broadcast_msg) for client in connected_clients],
return_exceptions=True
)
except json.JSONDecodeError:
await websocket.send(json.dumps({"error": "Invalid JSON format"}))
except websockets.exceptions.ConnectionClosed:
pass
finally:
connected_clients.remove(websocket)
print(f"❌ Connection closed. Active listeners: {len(connected_clients)}")
async def main():
print(f"🚀 Integrated WebSocket + MongoDB Server running on ws://localhost:8765")
print(f"📂 Logging events to MongoDB database: {DB_NAME}")
async with websockets.serve(handle_client, "localhost", 8765):
await asyncio.Future() # run forever
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nStopping server...")