Websockets
WebSocket Programming in Python
WebSockets provide a full-duplex, persistent connection between a client and a server. Unlike HTTP (Request-Response), WebSockets allow the server to push data to the client at any time.
Key Concepts
- Handshake: WebSockets start as an HTTP request with an `Upgrade` header.
- Stateful: The server must keep track of all active connections.
- Concurrency: Because connections stay open, you MUST use asynchronous programming (`asyncio` in Python).
- Scaling: Scaling WebSockets is harder than HTTP because you can't just use a simple round-robin load balancer (clients need to stay connected to the same server, or you need a Pub/Sub like Redis).
Use Cases Covered
- server.py: Basic Echo Server.
- broadcast_server.py: Music "Live Ticker" (Server-to-Client push).
- integrated_app.py: Client sends data -> Server saves to MongoDB -> Server broadcasts to ALL clients.
- like_button_client.py: Simulates a user liking a song and receiving real-time notifications.
- public_echo_client.py: Test against online servers.
- public_json_client.py: Demonstrates how to send and parse structured JSON data over a WebSocket connection.
Setup
- Navigate to this folder: `cd websocket_learnings`
- Install dependencies: `pip install -r requirements.txt`
- Run the server in one terminal and the client in another.
📝 Lab Implementation & Scripts
server.py
import asyncio
import websockets
async def echo_handler(websocket):
    """Echo every incoming message back to its sender, prefixed with "Server Echo: ".

    Args:
        websocket: Server-side connection handed in by ``websockets.serve``.
            It is iterated for incoming messages and must provide ``send()``
            and ``remote_address``.
    """
    peer = websocket.remote_address
    print(f"New client connected: {peer}")
    try:
        async for message in websocket:
            print(f"Received: {message}")
            response = f"Server Echo: {message}"
            await websocket.send(response)
    except websockets.exceptions.ConnectionClosed:
        # Abnormal closure: nothing left to echo, the disconnect is reported below.
        pass
    finally:
        # The message iterator simply ends on a normal close (no exception),
        # so the disconnect is reported here to cover both kinds of closure.
        print(f"Client disconnected: {peer}")
async def main():
    """Serve ``echo_handler`` on ws://localhost:8765 until the task is cancelled."""
    # Bind the echo server to localhost:8765
    async with websockets.serve(echo_handler, "localhost", 8765):
        print("🚀 Socket Server started on ws://localhost:8765")
        # A future that is never resolved keeps the server context open.
        never_done = asyncio.Future()
        await never_done
if __name__ == "__main__":
    # Script entry point: Ctrl+C ends the event loop and is reported as a clean stop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server...")
broadcast_server.py
import asyncio
import websockets
import json
import random
# Track all connected clients.
# Connection handlers add their websocket on connect and remove it on
# disconnect; the song simulator sends each payload to every member.
connected_clients = set()
async def broadcast_updates(websocket):
    """Register a listener and keep its connection open until it disconnects.

    The websocket is added to ``connected_clients`` so the song simulator can
    push updates to it, and it is always removed again when the handler exits.
    Incoming messages are parsed as JSON and logged; a malformed message is
    reported and skipped instead of ending the connection.

    Args:
        websocket: Server-side connection handed in by ``websockets.serve``.
    """
    # Register client
    connected_clients.add(websocket)
    print(f"Listener joined. Total listeners: {len(connected_clients)}")
    try:
        # Keep the connection open
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable binary frames.
                print(f"Ignoring non-JSON message from client: {message!r}")
                continue
            print(f"Received request from client: {data}")
    except websockets.exceptions.ConnectionClosed:
        # The client went away abruptly; cleanup happens in ``finally``.
        pass
    finally:
        # Unregister client (discard: never raises if it is already gone)
        connected_clients.discard(websocket)
        print(f"Listener left. Total listeners: {len(connected_clients)}")
async def simulate_song_changes(interval=5):
    """Simulate a radio station changing songs every few seconds.

    Runs forever. On each tick, if at least one client is connected, a random
    song is picked and a ``SONG_CHANGE`` JSON message is sent to every client
    in ``connected_clients``. Send failures for individual clients are
    collected by ``asyncio.gather`` and ignored so that one broken connection
    cannot stop the ticker.

    Args:
        interval: Seconds to sleep between ticks (default 5).
    """
    songs = [
        {"title": "Starboy", "artist": "The Weeknd"},
        {"title": "Midnight City", "artist": "M83"},
        {"title": "Blinding Lights", "artist": "The Weeknd"},
        {"title": "Chlorine", "artist": "Twenty One Pilots"},
    ]
    loop = asyncio.get_running_loop()
    while True:
        if connected_clients:
            new_song = random.choice(songs)
            payload = json.dumps({
                "type": "SONG_CHANGE",
                "data": new_song,
                # Event-loop clock reading as a plain string. It is encoded
                # once by the enclosing json.dumps; encoding it separately as
                # well would deliver a string wrapped in literal quotes.
                "timestamp": str(loop.time()),
            })
            print(f"Broadcasting new song: {new_song['title']}")
            # Snapshot the recipients, then send to everyone in parallel.
            recipients = list(connected_clients)
            await asyncio.gather(
                *(client.send(payload) for client in recipients),
                return_exceptions=True,
            )
        await asyncio.sleep(interval)  # Change song every `interval` seconds
async def main():
    """Run the websocket server and the song simulator together.

    The server is opened as an async context manager so it is closed cleanly
    when the simulator stops (for example on cancellation at shutdown).
    """
    # Start the websocket server
    async with websockets.serve(broadcast_updates, "localhost", 8765):
        # The simulator loops forever, which keeps the server context open.
        await simulate_song_changes()
if __name__ == "__main__":
    # Script entry point; the banner is printed before the event loop starts.
    print("🚀 Music Live-Ticker Server started on ws://localhost:8765")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the ticker; exit quietly.
        pass
integrated_app.py
import asyncio
import websockets
import json
import os
from pymongo import MongoClient
from dotenv import load_dotenv
from datetime import datetime
# 1. Setup Environment and MongoDB
# NOTE(review): load_dotenv() presumably loads variables from a local .env
# file before they are read below - confirm against python-dotenv usage.
load_dotenv()
# No fallback here: MONGO_URI is None when the variable is not set.
MONGO_URI = os.getenv("MONGO_URI")
# Falls back to "music_db" when DB_NAME is not set.
DB_NAME = os.getenv("DB_NAME", "music_db")
# Global variables
# Websockets of all currently connected clients; the connection handler adds
# and removes entries, and broadcasts go to every member.
connected_clients = set()
def get_mongo_collection():
    """Return the ``activity_logs`` collection of the configured database.

    A new ``MongoClient`` is built from ``MONGO_URI`` on every call and the
    database named by ``DB_NAME`` is selected from it.
    """
    return MongoClient(MONGO_URI)[DB_NAME]["activity_logs"]
# Initialize collection
# Created once at import time; the connection handler reads and writes
# through this single module-level collection object.
activity_logs = get_mongo_collection()
def _record_like(track_id, user_id):
    """Store one LIKE event and return ``(inserted_id, total_likes)`` for the track.

    Deliberately synchronous: the pymongo calls block, so the async handler
    runs this function in a worker thread instead of on the event loop.
    """
    from datetime import timezone  # local import: the file only imports ``datetime``

    log_entry = {
        "event": "LIKE",
        "track_id": track_id,
        "user_id": user_id,
        # Timezone-aware UTC. PyMongo treats naive datetimes as UTC, so a
        # naive local time would be stored shifted by the UTC offset.
        "timestamp": datetime.now(timezone.utc),
    }
    result = activity_logs.insert_one(log_entry)
    total_likes = activity_logs.count_documents({"track_id": track_id})
    return result.inserted_id, total_likes


async def handle_client(websocket):
    """Handle one client: persist its LIKE events and broadcast a notification.

    Each incoming message must be a JSON object. Messages with
    ``"type": "LIKE"`` are saved to MongoDB and then announced to every
    connected client (the sender included). Invalid JSON and non-object
    payloads are answered with an ``{"error": ...}`` message; other message
    types are ignored.

    Args:
        websocket: Server-side connection handed in by ``websockets.serve``.
    """
    # Register client
    connected_clients.add(websocket)
    print(f"✅ New connection. Active listeners: {len(connected_clients)}")
    loop = asyncio.get_running_loop()
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable binary frames.
                await websocket.send(json.dumps({"error": "Invalid JSON format"}))
                continue
            if not isinstance(data, dict):
                # Valid JSON, but e.g. a list or number: ``.get`` would fail.
                await websocket.send(json.dumps({"error": "Expected a JSON object"}))
                continue
            # Check for "LIKE" events
            if data.get("type") != "LIKE":
                continue
            track_id = data.get("track_id", "unknown")
            user_id = data.get("user_id", "guest")
            print(f"📩 Received LIKE for {track_id} from {user_id}")
            # 2. PERSIST TO MONGODB (in a worker thread so other clients
            # are not stalled while the database round-trips complete)
            inserted_id, total_likes = await loop.run_in_executor(
                None, _record_like, track_id, user_id
            )
            print(f"💾 Saved to MongoDB index: {inserted_id}")
            # 3. BROADCAST TO ALL CLIENTS (including the sender)
            broadcast_msg = json.dumps({
                "type": "NOTIFICATION",
                "message": f"🔥 Someone just liked {track_id}!",
                "total_likes_saved": total_likes,
            })
            # Snapshot: the set may change while the sends are awaited.
            recipients = list(connected_clients)
            if recipients:
                await asyncio.gather(
                    *(client.send(broadcast_msg) for client in recipients),
                    return_exceptions=True,
                )
    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # discard: never raises if the websocket is already gone
        connected_clients.discard(websocket)
        print(f"❌ Connection closed. Active listeners: {len(connected_clients)}")
async def main():
    """Announce startup, then serve ``handle_client`` on ws://localhost:8765 indefinitely."""
    print(f"🚀 Integrated WebSocket + MongoDB Server running on ws://localhost:8765")
    print(f"📂 Logging events to MongoDB database: {DB_NAME}")
    server = websockets.serve(handle_client, "localhost", 8765)
    async with server:
        # A future that is never resolved keeps the server running.
        never_done = asyncio.Future()
        await never_done
if __name__ == "__main__":
    # Script entry point: run until Ctrl+C, then report the shutdown.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server...")
like_button_client.py
import asyncio
import websockets
import json
import random
async def simulate_likes():
    """Simulate a user who likes three random tracks and prints server broadcasts.

    Connects to ws://localhost:8765 under a random ``user_<n>`` id, starts a
    background task that prints every ``NOTIFICATION`` message, sends three
    ``LIKE`` events 2-5 seconds apart, waits five more seconds for late
    broadcasts, and then stops the listener. Any connection problem is
    printed rather than raised.
    """
    uri = "ws://localhost:8765"
    tracks = ["Starboy", "Midnight City", "Blinding Lights", "Chlorine"]
    user_id = f"user_{random.randint(100, 999)}"
    print(f"Connecting as {user_id}...")
    try:
        async with websockets.connect(uri) as websocket:

            async def listen():
                """Print NOTIFICATION broadcasts until the connection closes."""
                try:
                    while True:
                        msg = await websocket.recv()
                        try:
                            data = json.loads(msg)
                        except ValueError:
                            continue  # not JSON: nothing to display
                        # Other messages (e.g. error replies) carry no
                        # "type" key, so read it with .get.
                        if not isinstance(data, dict) or data.get("type") != "NOTIFICATION":
                            continue
                        print(f"\n📢 SERVER BROADCAST: {data.get('message')}")
                        print(f"📊 Global stats for this track: {data.get('total_likes_saved')} likes")
                except websockets.exceptions.ConnectionClosed:
                    pass

            # Start listening in background
            listener_task = asyncio.create_task(listen())
            try:
                # Simulate sending 3 likes over a few seconds
                for _ in range(3):
                    await asyncio.sleep(random.randint(2, 5))
                    track = random.choice(tracks)
                    payload = {
                        "type": "LIKE",
                        "track_id": track,
                        "user_id": user_id,
                    }
                    print(f"\n👆 Clicking LIKE for '{track}'...")
                    await websocket.send(json.dumps(payload))
                print("\nFinished sending likes. Waiting 5s for final broadcasts...")
                await asyncio.sleep(5)
            finally:
                # Always stop the listener, even if sending failed, and wait
                # for it to finish so no task is left pending.
                listener_task.cancel()
                await asyncio.gather(listener_task, return_exceptions=True)
    except Exception as e:
        print(f"Connection error: {e}")
if __name__ == "__main__":
    # Script entry point: run the like simulation once and exit.
    asyncio.run(simulate_likes())
public_echo_client.py
import asyncio
import websockets
async def test_public_echo():
    """Interactive client: send typed lines to a public echo server and print each reply.

    Typing ``exit`` (any letter case) ends the session. Any exception is
    reported together with a hint about an alternative echo server.
    """
    # A widely used public WebSocket echo server
    uri = "wss://echo.websocket.org"
    print(f"Connecting to {uri}...")
    try:
        async with websockets.connect(uri) as websocket:
            print("Connected! Type a message to send (or 'exit' to quit):")
            loop = asyncio.get_event_loop()
            while True:
                # input() blocks, so it runs in the default executor
                typed = await loop.run_in_executor(None, input, "> ")
                if typed.lower() == 'exit':
                    break
                # Send the line to the public server
                await websocket.send(typed)
                print(f"Sent: {typed}")
                # Wait for the echo
                echoed = await websocket.recv()
                print(f"Received from Public Server: {echoed}")
    except Exception as e:
        print(f"Connection failed: {e}")
        print("\nTip: If wss://echo.websocket.org is down, try wss://ws.postman-echo.com/raw")
if __name__ == "__main__":
    # Script entry point: Ctrl+C ends the interactive session cleanly.
    try:
        asyncio.run(test_public_echo())
    except KeyboardInterrupt:
        print("\nClient closed.")
public_json_client.py
import asyncio
import websockets
import json
from datetime import datetime
async def test_json_echo():
    """Send one structured JSON payload to a public echo server and parse the echo.

    Incoming messages are printed as they arrive. Messages that are not JSON
    are skipped; the first JSON message containing an ``"event"`` key is
    treated as the echoed payload and ends the loop. Any exception is printed.
    """
    uri = "wss://echo.websocket.org"
    print(f"Connecting to {uri} for JSON testing...")
    try:
        async with websockets.connect(uri) as websocket:
            # 1. Build the structured (music-related) message
            like_details = {
                "track_id": "track_99",
                "user_id": "user_456",
                "timestamp": datetime.now().isoformat(),
            }
            client_info = {
                "client": "python-websocket-client",
                "version": "1.0.0",
            }
            payload = {
                "event": "LIKE_TRACK",
                "data": like_details,
                "metadata": client_info,
            }
            # 2. Serialize the dictionary to a JSON string
            json_string = json.dumps(payload)
            pretty = json.dumps(payload, indent=2)
            print(f"\n📤 Sending JSON Payload:\n{pretty}")
            # 3. Send to server
            await websocket.send(json_string)
            # 4. Listen for the echo
            print("\n📡 Waiting for echo (skipping welcome messages)...")
            async for response in websocket:
                print(f"📥 Received: {response}")
                try:
                    received_data = json.loads(response)
                except json.JSONDecodeError:
                    # Ignore non-JSON messages (like "Request served by...")
                    continue
                if "event" not in received_data:
                    # JSON, but not our payload; keep waiting
                    continue
                print(f"\n✅ Found JSON Payload!")
                print(f"Parsed Event: {received_data['event']}")
                print(f"Track Liked: {received_data['data']['track_id']}")
                break  # Our data was found; stop listening
    except Exception as e:
        print(f"❌ Error: {e}")
if __name__ == "__main__":
    # Script entry point: run the JSON echo test once and exit.
    asyncio.run(test_json_echo())