Installation
Copy
Ask AI
pip install websockets asyncio cryptography
Complete SDK
Copy
Ask AI
import os
import asyncio
import json
import time
import base64
from cryptography.hazmat.primitives.asymmetric import ed25519
import websockets
class PolymarketWebSocketSDK:
"""SDK for Polymarket US WebSocket API"""
def __init__(self, api_key_id, private_key_base64):
"""
Initialize the SDK
Args:
api_key_id: Your API key ID from developer portal
private_key_base64: Your base64-encoded Ed25519 private key (64 bytes)
"""
self.api_key_id = api_key_id
self.ws_url = "wss://api.polymarket.us"
# Parse Ed25519 private key
private_key_bytes = base64.b64decode(private_key_base64)
self.private_key = ed25519.Ed25519PrivateKey.from_private_bytes(
private_key_bytes[:32]
)
def _sign_request(self, method, path):
"""Generate Ed25519 signature for WebSocket handshake"""
timestamp = str(int(time.time() * 1000))
message = f"{timestamp}{method}{path}"
signature = self.private_key.sign(message.encode('utf-8'))
signature_base64 = base64.b64encode(signature).decode('utf-8')
return {
"X-PM-Access-Key": self.api_key_id,
"X-PM-Timestamp": timestamp,
"X-PM-Signature": signature_base64
}
async def connect_private(self, on_message, on_error=None):
"""
Connect to private WebSocket for order and position updates
Args:
on_message: Callback function for messages: on_message(data)
on_error: Optional callback for errors: on_error(error)
"""
path = "/v1/ws/private"
headers = self._sign_request("GET", path)
uri = f"{self.ws_url}{path}"
try:
async with websockets.connect(uri, extra_headers=headers) as websocket:
print("Connected to private WebSocket")
async for message in websocket:
try:
data = json.loads(message)
await on_message(data)
except json.JSONDecodeError as e:
if on_error:
await on_error(f"JSON decode error: {e}")
except Exception as e:
if on_error:
await on_error(f"Message handler error: {e}")
except websockets.exceptions.WebSocketException as e:
if on_error:
await on_error(f"WebSocket error: {e}")
raise
async def connect_markets(self, on_message, on_error=None):
"""
Connect to public market data WebSocket
Args:
on_message: Callback function for messages: on_message(data)
on_error: Optional callback for errors: on_error(error)
"""
path = "/v1/ws/markets"
headers = self._sign_request("GET", path)
uri = f"{self.ws_url}{path}"
try:
async with websockets.connect(uri, extra_headers=headers) as websocket:
print("Connected to market data WebSocket")
async for message in websocket:
try:
data = json.loads(message)
await on_message(data)
except json.JSONDecodeError as e:
if on_error:
await on_error(f"JSON decode error: {e}")
except Exception as e:
if on_error:
await on_error(f"Message handler error: {e}")
except websockets.exceptions.WebSocketException as e:
if on_error:
await on_error(f"WebSocket error: {e}")
raise
async def subscribe_to_market(self, websocket, market_slug):
"""
Subscribe to a specific market's updates
Args:
websocket: Active WebSocket connection
market_slug: Market slug to subscribe to
"""
subscribe_msg = {
"type": "subscribe",
"market": market_slug
}
await websocket.send(json.dumps(subscribe_msg))
async def unsubscribe_from_market(self, websocket, market_slug):
"""
Unsubscribe from a market's updates
Args:
websocket: Active WebSocket connection
market_slug: Market slug to unsubscribe from
"""
unsubscribe_msg = {
"type": "unsubscribe",
"market": market_slug
}
await websocket.send(json.dumps(unsubscribe_msg))
# Example 1: Monitor orders and positions
async def monitor_orders():
"""Monitor real-time order and position updates"""
api_key_id = os.environ.get("POLYMARKET_API_KEY")
private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
async def handle_message(data):
msg_type = data.get("type")
if msg_type == "order":
order = data.get("data", {})
print(f"\nOrder Update:")
print(f" ID: {order.get('id')}")
print(f" Market: {order.get('market_slug')}")
print(f" State: {order.get('state')}")
print(f" Filled: {order.get('cumQuantity')}/{order.get('quantity')}")
elif msg_type == "execution":
execution = data.get("data", {})
print(f"\nExecution:")
print(f" Order ID: {execution.get('order', {}).get('id')}")
print(f" Shares: {execution.get('lastShares')}")
print(f" Price: \${execution.get('lastPx', {}).get('value')}")
elif msg_type == "position":
position = data.get("data", {})
print(f"\nPosition Update:")
print(f" Market: {position.get('marketMetadata', {}).get('title')}")
print(f" Quantity: {position.get('quantity')}")
print(f" Value: \${position.get('value', {}).get('value')}")
async def handle_error(error):
print(f"Error: {error}")
await sdk.connect_private(handle_message, handle_error)
# Example 2: Monitor market data
async def monitor_market_data():
"""Monitor real-time market price updates"""
api_key_id = os.environ.get("POLYMARKET_API_KEY")
private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
async def handle_message(data):
msg_type = data.get("type")
if msg_type == "price":
market = data.get("market")
price_data = data.get("data", {})
print(f"\nPrice Update - {market}:")
print(f" Bid: \${price_data.get('bestBid')}")
print(f" Ask: \${price_data.get('bestAsk')}")
print(f" Last: \${price_data.get('lastPrice')}")
elif msg_type == "trade":
trade = data.get("data", {})
print(f"\nTrade:")
print(f" Market: {trade.get('market')}")
print(f" Price: \${trade.get('price')}")
print(f" Quantity: {trade.get('quantity')}")
print(f" Side: {trade.get('side')}")
elif msg_type == "orderbook":
orderbook = data.get("data", {})
print(f"\nOrder Book Update:")
print(f" Bids: {len(orderbook.get('bids', []))}")
print(f" Asks: {len(orderbook.get('asks', []))}")
await sdk.connect_markets(handle_message)
# Example 3: Subscribe to specific markets
async def subscribe_to_specific_markets():
"""Subscribe to updates for specific markets"""
api_key_id = os.environ.get("POLYMARKET_API_KEY")
private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
path = "/v1/ws/markets"
headers = sdk._sign_request("GET", path)
uri = f"{sdk.ws_url}{path}"
markets_to_watch = [
"will-trump-win-2024",
"will-biden-win-2024"
]
async with websockets.connect(uri, extra_headers=headers) as websocket:
print("Connected to market WebSocket")
# Subscribe to markets
for market_slug in markets_to_watch:
await sdk.subscribe_to_market(websocket, market_slug)
print(f"Subscribed to {market_slug}")
# Listen for updates
async for message in websocket:
data = json.loads(message)
print(f"\nUpdate from {data.get('market')}:")
print(f" Type: {data.get('type')}")
print(f" Data: {data.get('data')}")
# Example 4: Track portfolio in real-time
async def track_portfolio_realtime():
"""Track real-time portfolio changes"""
api_key_id = os.environ.get("POLYMARKET_API_KEY")
private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
portfolio_value = 0
positions = {}
async def handle_message(data):
nonlocal portfolio_value, positions
msg_type = data.get("type")
if msg_type == "position":
position = data.get("data", {})
market_slug = position.get("marketMetadata", {}).get("slug")
quantity = float(position.get("quantity", 0))
last_price = float(position.get("lastPrice", {}).get("value", 0))
value = quantity * last_price
positions[market_slug] = {
"quantity": quantity,
"price": last_price,
"value": value
}
# Recalculate total portfolio value
portfolio_value = sum(p["value"] for p in positions.values())
print(f"\nPortfolio Update:")
print(f" Total Value: \${portfolio_value:.2f}")
print(f" Positions: {len(positions)}")
elif msg_type == "execution":
print(f"\nTrade Executed:")
execution = data.get("data", {})
print(f" Shares: {execution.get('lastShares')}")
print(f" Price: \${execution.get('lastPx', {}).get('value')}")
await sdk.connect_private(handle_message)
# Run examples
if __name__ == "__main__":
# Choose which example to run:
# Monitor orders and positions
asyncio.run(monitor_orders())
# Monitor market data
# asyncio.run(monitor_market_data())
# Subscribe to specific markets
# asyncio.run(subscribe_to_specific_markets())
# Track portfolio in real-time
# asyncio.run(track_portfolio_realtime())
Message Types
Private WebSocket Messages
| Type | Description |
|---|---|
order | Order state changes |
execution | Order fills and partial fills |
position | Position updates |
balance | Account balance changes |
Market Data WebSocket Messages
| Type | Description |
|---|---|
price | Best bid/ask price updates |
trade | Trade executions |
orderbook | Order book snapshots |
liquidityLevel2 | Level 2 market depth |
Subscription Management
Copy
Ask AI
async def manage_subscriptions():
"""Dynamically manage market subscriptions"""
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
path = "/v1/ws/markets"
headers = sdk._sign_request("GET", path)
uri = f"{sdk.ws_url}{path}"
async with websockets.connect(uri, extra_headers=headers) as websocket:
# Subscribe to initial markets
await sdk.subscribe_to_market(websocket, "market-1")
await sdk.subscribe_to_market(websocket, "market-2")
# Wait a bit
await asyncio.sleep(10)
# Unsubscribe from one
await sdk.unsubscribe_from_market(websocket, "market-1")
# Subscribe to a new one
await sdk.subscribe_to_market(websocket, "market-3")
# Continue listening
async for message in websocket:
data = json.loads(message)
print(f"Update: {data}")
Reconnection Logic
Copy
Ask AI
async def connect_with_reconnect(sdk, max_retries=5):
"""Connect with automatic reconnection"""
retry_count = 0
while retry_count < max_retries:
try:
async def handle_message(data):
print(f"Message: {data}")
async def handle_error(error):
print(f"Error: {error}")
await sdk.connect_private(handle_message, handle_error)
except Exception as e:
retry_count += 1
wait_time = min(2 ** retry_count, 30) # Exponential backoff
print(f"Connection failed. Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
print("Max retries reached. Giving up.")
# Usage
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
asyncio.run(connect_with_reconnect(sdk))
Error Handling
Copy
Ask AI
async def safe_websocket_connection():
"""WebSocket connection with comprehensive error handling"""
sdk = PolymarketWebSocketSDK(api_key_id, private_key)
async def handle_message(data):
try:
# Process message
print(f"Received: {data}")
except Exception as e:
print(f"Error processing message: {e}")
async def handle_error(error):
print(f"WebSocket error: {error}")
# Log error, send alert, etc.
try:
await sdk.connect_private(handle_message, handle_error)
except websockets.exceptions.ConnectionClosed:
print("Connection closed by server")
except websockets.exceptions.WebSocketException as e:
print(f"WebSocket exception: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Rate Limits
WebSocket connections have the following limits:- Maximum 5 concurrent connections per API key
- Maximum 50 market subscriptions per connection
- Reconnection attempts limited to 1 per second
Best Practices
- Implement reconnection logic - connections can drop unexpectedly
- Handle message types gracefully - ignore unknown message types
- Subscribe selectively - only subscribe to markets you need
- Use async/await - never block the event loop
- Monitor connection health - implement ping/pong or heartbeat