Websockets

WebSocket Programming in Python

WebSockets provide a full-duplex, persistent connection between a client and a server. Unlike HTTP (Request-Response), WebSockets allow the server to push data to the client at any time.

Key Concepts

  1. Handshake: WebSockets start as an HTTP request with an Upgrade header.
  2. Stateful: The server must keep track of all active connections.
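  3. Concurrency: Because connections stay open, the server has to handle many long-lived connections at the same time. The scripts here use asynchronous programming (asyncio in Python) so that a single process can serve all of them without one connection blocking the others.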
  4. Scaling: Scaling WebSockets is harder than HTTP because a simple round-robin load balancer is not enough on its own: each client stays connected to the one server that accepted it, so to broadcast to clients connected to other servers you need a shared Pub/Sub layer such as Redis.

Use Cases Covered

  1. server.py: Basic Echo Server.
  2. broadcast_server.py: Music "Live Ticker" (Server-to-Client push).
  3. integrated_app.py: Client sends data -> Server saves to MongoDB -> Server broadcasts to ALL clients.
  4. like_button_client.py: Simulates a user liking a song and receiving real-time notifications.
  5. public_echo_client.py: Test against online servers.
  6. public_json_client.py: Demonstrates how to send and parse structured JSON data over a WebSocket connection.

Setup

  1. Navigate to this folder: cd websocket_learnings
  2. Install dependencies: pip install -r requirements.txt
  3. Run the server in one terminal and the client in another. All three servers listen on ws://localhost:8765, so run only one of them at a time.

📝 Lab Implementation & Scripts

server.py

import asyncio
import websockets

async def echo_handler(websocket):
    """Echo every incoming message back to the client that sent it.

    Each received message is printed and answered with the same text
    prefixed by ``"Server Echo: "``. The disconnect is logged however the
    connection ends.

    Args:
        websocket: The server-side connection for one client, as passed to
            the handler by ``websockets.serve``.
    """
    # Read the address up front so the disconnect log does not depend on the
    # state of an already-closed connection.
    peer = websocket.remote_address
    print(f"New client connected: {peer}")
    try:
        async for message in websocket:
            print(f"Received: {message}")
            response = f"Server Echo: {message}"
            await websocket.send(response)
    except websockets.exceptions.ConnectionClosed:
        # Abnormal closure (e.g. the peer vanished mid-send). Nothing to do
        # beyond the logging in ``finally``.
        pass
    finally:
        # ``async for`` simply ends when the client closes cleanly, so the
        # log lives here rather than in the ``except`` branch; otherwise
        # normal disconnects would never be reported.
        print(f"Client disconnected: {peer}")

async def main():
    """Serve ``echo_handler`` on ws://localhost:8765 until cancelled."""
    echo_server = websockets.serve(echo_handler, "localhost", 8765)
    async with echo_server:
        print("🚀 Socket Server started on ws://localhost:8765")
        # A future that nobody resolves keeps the server context open.
        never_done = asyncio.Future()
        await never_done


if __name__ == "__main__":
    # Ctrl+C surfaces as KeyboardInterrupt out of asyncio.run().
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server...")

broadcast_server.py

import asyncio
import websockets
import json
import random

# Track all connected clients
connected_clients = set()


async def broadcast_updates(websocket):
    """Register a listener and keep its connection open until it leaves.

    The socket is added to ``connected_clients`` for the lifetime of the
    connection so that the song simulator can push updates to it. Anything
    the client sends is parsed as JSON and printed. A malformed message is
    reported and skipped instead of ending the connection.

    Args:
        websocket: The server-side connection for one client, as passed to
            the handler by ``websockets.serve``.
    """
    # Register client
    connected_clients.add(websocket)
    print(f"Listener joined. Total listeners: {len(connected_clients)}")

    try:
        # Keep the connection open
        async for message in websocket:
            try:
                data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # One bad message must not silently drop the listener.
                print(f"Ignoring non-JSON message from client: {message!r}")
                continue
            print(f"Received request from client: {data}")
    except websockets.exceptions.ConnectionClosed:
        # Abnormal closure; cleanup below is all that is needed.
        pass
    finally:
        # Unregister client. ``discard`` keeps cleanup from raising even if
        # the socket is no longer in the set.
        connected_clients.discard(websocket)
        print(f"Listener left. Total listeners: {len(connected_clients)}")

async def simulate_song_changes():
    """Simulate a radio station changing songs every few seconds.

    Runs forever. Every 5 seconds, if at least one client is registered in
    ``connected_clients``, a random song is picked and a ``SONG_CHANGE``
    JSON message is sent to all of them. The message has the keys ``type``,
    ``data`` (the song dict) and ``timestamp`` (ISO-8601 UTC string).
    """
    # Local import: only this coroutine needs wall-clock timestamps.
    from datetime import datetime, timezone

    songs = [
        {"title": "Starboy", "artist": "The Weeknd"},
        {"title": "Midnight City", "artist": "M83"},
        {"title": "Blinding Lights", "artist": "The Weeknd"},
        {"title": "Chlorine", "artist": "Twenty One Pilots"},
    ]

    while True:
        if connected_clients:
            new_song = random.choice(songs)
            payload = json.dumps({
                "type": "SONG_CHANGE",
                "data": new_song,
                # Serialized once, as wall-clock time. The event-loop clock
                # is monotonic with an arbitrary origin, and wrapping it in
                # a second json.dumps() produced a quoted string inside a
                # string.
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            print(f"Broadcasting new song: {new_song['title']}")

            # Snapshot the set so clients joining or leaving while the sends
            # are in flight cannot affect this round.
            recipients = tuple(connected_clients)

            # Send to everyone in parallel; return_exceptions=True keeps one
            # failed send from cancelling the rest.
            await asyncio.gather(
                *(client.send(payload) for client in recipients),
                return_exceptions=True,
            )

        await asyncio.sleep(5)  # Change song every 5 seconds

async def main():
    """Run the ticker server and the song simulator side by side.

    The server is opened with ``async with`` so that it is closed cleanly
    when the simulator stops or the program is interrupted. The start-up
    banner is printed only once the listening socket is bound.
    """
    async with websockets.serve(broadcast_updates, "localhost", 8765):
        print("🚀 Music Live-Ticker Server started on ws://localhost:8765")
        # The simulator loops forever; connection handlers run concurrently
        # on the same event loop while it sleeps between songs.
        await simulate_song_changes()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server...")

integrated_app.py

import asyncio
import websockets
import json
import os
from pymongo import MongoClient
from dotenv import load_dotenv
from datetime import datetime

# 1. Setup Environment and MongoDB
# load_dotenv() runs first so the environment lookups below can see values
# coming from a local .env file.
load_dotenv()
MONGO_URI = os.environ.get("MONGO_URI")
DB_NAME = os.environ.get("DB_NAME", "music_db")

# Sockets of all clients that are currently connected
connected_clients = set()


def get_mongo_collection():
    """Return the ``activity_logs`` collection of the configured database."""
    return MongoClient(MONGO_URI)[DB_NAME]["activity_logs"]


# Module-level collection handle used by the connection handler
activity_logs = get_mongo_collection()

async def handle_client(websocket):
    """Handle one client: persist its LIKE events and broadcast them.

    For every ``{"type": "LIKE", ...}`` message the event is stored in the
    ``activity_logs`` collection, and a ``NOTIFICATION`` carrying the number
    of stored documents for that track is sent to every connected client,
    the sender included. A message that is not a JSON object gets an error
    reply. JSON objects with any other ``type`` are ignored.

    Args:
        websocket: The server-side connection for one client, as passed to
            the handler by ``websockets.serve``.
    """
    # Register client
    connected_clients.add(websocket)
    print(f"✅ New connection. Active listeners: {len(connected_clients)}")

    # PyMongo calls block, so they run in the default thread pool to keep
    # the event loop free for the other connections.
    loop = asyncio.get_running_loop()

    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                data = None

            # Valid JSON that is not an object (a list, a number, ...) has no
            # ``.get`` and would otherwise crash this handler.
            if not isinstance(data, dict):
                await websocket.send(json.dumps({"error": "Invalid JSON format"}))
                continue

            # Only "LIKE" events are processed
            if data.get("type") != "LIKE":
                continue

            track_id = data.get("track_id", "unknown")
            user_id = data.get("user_id", "guest")
            print(f"📩 Received LIKE for {track_id} from {user_id}")

            # 2. PERSIST TO MONGODB
            log_entry = {
                "event": "LIKE",
                "track_id": track_id,
                "user_id": user_id,
                "timestamp": datetime.now(),
            }
            result = await loop.run_in_executor(
                None, activity_logs.insert_one, log_entry
            )
            print(f"💾 Saved to MongoDB index: {result.inserted_id}")

            total_likes = await loop.run_in_executor(
                None, activity_logs.count_documents, {"track_id": track_id}
            )

            # 3. BROADCAST TO ALL CLIENTS (the sender gets it too)
            broadcast_msg = json.dumps({
                "type": "NOTIFICATION",
                "message": f"🔥 Someone just liked {track_id}!",
                "total_likes_saved": total_likes,
            })

            # Snapshot the set so clients joining or leaving during the sends
            # cannot affect this round; return_exceptions=True keeps one
            # failed send from cancelling the rest.
            recipients = tuple(connected_clients)
            await asyncio.gather(
                *(client.send(broadcast_msg) for client in recipients),
                return_exceptions=True,
            )

    except websockets.exceptions.ConnectionClosed:
        # Abnormal closure; cleanup below is all that is needed.
        pass
    finally:
        # ``discard`` keeps cleanup from raising even if the socket is no
        # longer in the set.
        connected_clients.discard(websocket)
        print(f"❌ Connection closed. Active listeners: {len(connected_clients)}")

async def main():
    """Print the start-up banners, then serve ``handle_client`` forever."""
    banners = (
        "🚀 Integrated WebSocket + MongoDB Server running on ws://localhost:8765",
        f"📂 Logging events to MongoDB database: {DB_NAME}",
    )
    for line in banners:
        print(line)

    server = websockets.serve(handle_client, "localhost", 8765)
    async with server:
        # Never resolved, so the server stays up until the task is cancelled.
        await asyncio.Future()


if __name__ == "__main__":
    # Ctrl+C surfaces as KeyboardInterrupt out of asyncio.run().
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server...")

like_button_client.py

import asyncio
import websockets
import json
import random

async def simulate_likes():
    """Connect as a random user, send three LIKE events and print broadcasts.

    A background task prints every ``NOTIFICATION`` message the server
    pushes while the main flow sends three likes at random 2-5 second
    intervals, then waits 5 seconds for late broadcasts before closing.
    Connection problems are reported on stdout rather than raised.
    """
    uri = "ws://localhost:8765"
    tracks = ["Starboy", "Midnight City", "Blinding Lights", "Chlorine"]
    user_id = f"user_{random.randint(100, 999)}"

    print(f"Connecting as {user_id}...")

    try:
        async with websockets.connect(uri) as websocket:

            async def listen():
                """Print server notifications until the connection ends."""
                try:
                    async for msg in websocket:
                        try:
                            data = json.loads(msg)
                        except (json.JSONDecodeError, UnicodeDecodeError):
                            continue
                        # Error replies such as {"error": ...} carry no
                        # "type" key; ``.get`` keeps them from killing the
                        # listener with a KeyError.
                        if not isinstance(data, dict):
                            continue
                        if data.get("type") != "NOTIFICATION":
                            continue
                        print(f"\n📢 SERVER BROADCAST: {data.get('message')}")
                        print(f"📊 Global stats for this track: {data.get('total_likes_saved')} likes")
                except websockets.exceptions.ConnectionClosed:
                    # The server went away; nothing left to listen to.
                    pass

            # Start listening in background
            listener_task = asyncio.create_task(listen())

            try:
                # Simulate sending 3 likes over a few seconds
                for _ in range(3):
                    await asyncio.sleep(random.randint(2, 5))
                    track = random.choice(tracks)

                    payload = {
                        "type": "LIKE",
                        "track_id": track,
                        "user_id": user_id,
                    }

                    print(f"\n👆 Clicking LIKE for '{track}'...")
                    await websocket.send(json.dumps(payload))

                print("\nFinished sending likes. Waiting 5s for final broadcasts...")
                await asyncio.sleep(5)
            finally:
                # Stop the listener even if sending failed, and wait for the
                # cancellation to finish so no task is left pending.
                listener_task.cancel()
                await asyncio.gather(listener_task, return_exceptions=True)

    except Exception as e:
        # Top-level boundary of the script: report instead of a traceback.
        print(f"Connection error: {e}")


if __name__ == "__main__":
    asyncio.run(simulate_likes())

public_echo_client.py

import asyncio
import websockets

async def test_public_echo():
    """Interactively send lines to a public echo server and print the echoes.

    Reads lines from stdin until the user types ``exit``. After each send,
    incoming messages are read until one equals the text that was sent, so
    that a greeting pushed by the server is not mistaken for the echo.
    Failures are reported on stdout rather than raised.
    """
    # A widely used public WebSocket echo server
    uri = "wss://echo.websocket.org"

    print(f"Connecting to {uri}...")
    try:
        async with websockets.connect(uri) as websocket:
            print("Connected! Type a message to send (or 'exit' to quit):")
            loop = asyncio.get_running_loop()

            while True:
                # input() blocks, so run it in the default thread pool to
                # keep the event loop (and the connection) responsive.
                message = await loop.run_in_executor(None, input, "> ")

                if message.lower() == 'exit':
                    break

                # Send message to public server
                await websocket.send(message)
                print(f"Sent: {message}")

                # Public echo servers may push a greeting (for example
                # "Request served by ...") before the first echo. Reading
                # exactly one message per send would then leave every reply
                # one message behind, so skip anything that is not the echo.
                response = await websocket.recv()
                while response != message:
                    print(f"Skipping non-echo message: {response}")
                    response = await websocket.recv()
                print(f"Received from Public Server: {response}")

    except Exception as e:
        # Top-level boundary of the script: report instead of a traceback.
        print(f"Connection failed: {e}")
        print("\nTip: If wss://echo.websocket.org is down, try wss://ws.postman-echo.com/raw")


if __name__ == "__main__":
    try:
        asyncio.run(test_public_echo())
    except KeyboardInterrupt:
        print("\nClient closed.")

public_json_client.py

import asyncio
import websockets
import json
from datetime import datetime

async def test_json_echo():
    """Send one structured JSON message to an echo server and parse the reply.

    Incoming messages are printed as they arrive. Those that are not JSON,
    or that lack an ``event`` key, are skipped. The first message with an
    ``event`` key is reported and ends the run. Any failure is printed
    instead of raised.
    """
    uri = "wss://echo.websocket.org"

    print(f"Connecting to {uri} for JSON testing...")

    try:
        async with websockets.connect(uri) as websocket:
            # 1. Assemble the structured (music related) message
            like_details = {
                "track_id": "track_99",
                "user_id": "user_456",
                "timestamp": datetime.now().isoformat(),
            }
            client_info = {
                "client": "python-websocket-client",
                "version": "1.0.0",
            }
            payload = {
                "event": "LIKE_TRACK",
                "data": like_details,
                "metadata": client_info,
            }

            # 2. Compact text for the wire, indented text for the console
            wire_text = json.dumps(payload)
            pretty_text = json.dumps(payload, indent=2)
            print(f"\n📤 Sending JSON Payload:\n{pretty_text}")

            # 3. Send to server
            await websocket.send(wire_text)

            # 4. Read until a message with an "event" key shows up
            print("\n📡 Waiting for echo (skipping welcome messages)...")
            async for response in websocket:
                print(f"📥 Received: {response}")

                try:
                    decoded = json.loads(response)
                except json.JSONDecodeError:
                    # Non-JSON text (like "Request served by...") is skipped
                    continue

                if "event" not in decoded:
                    continue

                print("\n✅ Found JSON Payload!")
                print(f"Parsed Event: {decoded['event']}")
                print(f"Track Liked: {decoded['data']['track_id']}")
                break  # Our data was found; stop listening

    except Exception as e:
        print(f"❌ Error: {e}")


if __name__ == "__main__":
    asyncio.run(test_json_echo())