# Websockets
# public_json_client.py
import asyncio
import websockets
import json
from datetime import datetime
async def _receive_json_event(websocket):
    """Return the first incoming message that is a JSON object with an "event" key.

    Every message is printed as it arrives. Messages that are not valid JSON
    (such as a plain-text welcome line) or that decode to anything other than
    an object carrying an "event" key are skipped.

    Args:
        websocket: An open connection that yields messages under ``async for``.

    Returns:
        The decoded dict, or None if the message stream ends before one arrives.
    """
    async for response in websocket:
        print(f"📥 Received: {response}")
        try:
            received_data = json.loads(response)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Not JSON (e.g. "Request served by..."), or a binary frame that
            # is not decodable text: ignore it and keep listening.
            continue
        # A bare JSON number/bool/null is valid JSON but does not support a
        # key lookup, so confirm we really have an object before testing it.
        if isinstance(received_data, dict) and "event" in received_data:
            return received_data
    return None


async def test_json_echo(uri="wss://echo.websocket.org", timeout=10.0):
    """Send one JSON event to a WebSocket echo server and print the echoed copy.

    Connects to ``uri``, sends a sample "LIKE_TRACK" payload, then reads
    incoming messages until one decodes to a JSON object with an "event" key.

    Args:
        uri: WebSocket endpoint to connect to.
        timeout: Maximum number of seconds to wait for the echo once the
            payload has been sent. ``None`` waits indefinitely.

    Returns:
        None. Progress and failures are reported on stdout; exceptions derived
        from ``Exception`` are printed rather than raised.
    """
    print(f"Connecting to {uri} for JSON testing...")
    try:
        async with websockets.connect(uri) as websocket:
            # 1. Define a structured JSON message (music related).
            payload = {
                "event": "LIKE_TRACK",
                "data": {
                    "track_id": "track_99",
                    "user_id": "user_456",
                    # Attach the local UTC offset so the timestamp is
                    # unambiguous to whoever reads it on the other side.
                    "timestamp": datetime.now().astimezone().isoformat(),
                },
                "metadata": {
                    "client": "python-websocket-client",
                    "version": "1.0.0",
                },
            }

            # 2. Convert dictionary to JSON string.
            json_string = json.dumps(payload)
            print(f"\n📤 Sending JSON Payload:\n{json.dumps(payload, indent=2)}")

            # 3. Send to server.
            await websocket.send(json_string)

            # 4. Listen for the echo, bounded so a silent server cannot hang us.
            print("\n📡 Waiting for echo (skipping welcome messages)...")
            try:
                received_data = await asyncio.wait_for(
                    _receive_json_event(websocket), timeout
                )
            except asyncio.TimeoutError:
                print(f"❌ Error: no JSON echo received within {timeout} seconds")
                return

            if received_data is None:
                print("❌ Error: connection closed before a JSON echo arrived")
                return

            print("\n✅ Found JSON Payload!")
            print(f"Parsed Event: {received_data['event']}")
            # Tolerate an event-shaped message without the nested track data
            # instead of failing the whole run on a missing key.
            data = received_data.get("data")
            track_id = data.get("track_id") if isinstance(data, dict) else None
            print(f"Track Liked: {track_id}")
    except Exception as e:
        # Top-level boundary of the script: report the failure and return.
        print(f"❌ Error: {e}")
if __name__ == "__main__":
    # Executed as a script (not imported): build the coroutine and let
    # asyncio drive it to completion on a fresh event loop.
    echo_check = test_json_echo()
    asyncio.run(echo_check)