Skip to main content
Complete Python SDK for WebSocket connections to receive real-time market data and order updates.

Installation

pip install websockets cryptography

Complete SDK

import asyncio
import base64
import inspect
import json
import logging
import os
import time

import websockets
from cryptography.hazmat.primitives.asymmetric import ed25519

class PolymarketWebSocketSDK:
    """SDK for Polymarket US WebSocket API

    Signs the WebSocket handshake with an Ed25519 key and forwards every
    JSON-decoded message to a caller-supplied async callback.
    """

    # Receives errors that would otherwise be dropped when no on_error
    # callback is supplied.
    _logger = logging.getLogger(__name__)

    def __init__(self, api_key_id, private_key_base64):
        """
        Initialize the SDK

        Args:
            api_key_id: Your API key ID from developer portal
            private_key_base64: Your base64-encoded Ed25519 private key.
                Only the first 32 decoded bytes are used as the key, so a
                64-byte encoding is accepted and its tail is ignored.

        Raises:
            ValueError: If a credential is missing, the key cannot be
                base64-decoded, or it decodes to fewer than 32 bytes.
        """
        # Fail early with a clear message; os.environ.get() returns None
        # when a variable is unset.
        if not api_key_id:
            raise ValueError("api_key_id is required")
        if not private_key_base64:
            raise ValueError("private_key_base64 is required")

        self.api_key_id = api_key_id
        self.ws_url = "wss://api.polymarket.us"

        # Parse Ed25519 private key
        private_key_bytes = base64.b64decode(private_key_base64)
        if len(private_key_bytes) < 32:
            raise ValueError(
                "private_key_base64 must decode to at least 32 bytes, "
                f"got {len(private_key_bytes)}"
            )
        self.private_key = ed25519.Ed25519PrivateKey.from_private_bytes(
            private_key_bytes[:32]
        )

    def _sign_request(self, method, path):
        """Generate Ed25519 signature for WebSocket handshake

        The signed message is the millisecond timestamp, the HTTP method and
        the path concatenated without separators.

        Args:
            method: HTTP method of the handshake request (e.g. "GET")
            path: Request path (e.g. "/v1/ws/private")

        Returns:
            Dict with the X-PM-Access-Key, X-PM-Timestamp and X-PM-Signature
            headers (the signature is base64-encoded).
        """
        timestamp = str(int(time.time() * 1000))
        message = f"{timestamp}{method}{path}"
        signature = self.private_key.sign(message.encode('utf-8'))
        signature_base64 = base64.b64encode(signature).decode('utf-8')

        return {
            "X-PM-Access-Key": self.api_key_id,
            "X-PM-Timestamp": timestamp,
            "X-PM-Signature": signature_base64
        }

    @staticmethod
    def _header_kwargs(headers):
        """Build the keyword argument that passes headers to websockets.connect"""
        # NOTE(review): newer websockets releases appear to name this
        # parameter ``additional_headers`` instead of ``extra_headers`` -
        # confirm against the installed version. Falls back to the original
        # ``extra_headers`` whenever the new name is not declared.
        try:
            parameters = inspect.signature(websockets.connect).parameters
        except (TypeError, ValueError):
            parameters = {}
        if "additional_headers" in parameters:
            return {"additional_headers": headers}
        return {"extra_headers": headers}

    async def _report(self, on_error, description):
        """Send a non-fatal error to on_error, or log it when no callback exists"""
        if on_error:
            await on_error(description)
        else:
            self._logger.warning("%s", description)

    async def _stream(self, path, connected_message, on_message, on_error):
        """Open a signed connection to ``path`` and dispatch each message

        Per-message failures (bad JSON, handler exceptions) are reported and
        the stream continues; WebSocket-level failures are reported to
        on_error (if given) and re-raised.
        """
        headers = self._sign_request("GET", path)
        uri = f"{self.ws_url}{path}"

        try:
            async with websockets.connect(
                uri, **self._header_kwargs(headers)
            ) as websocket:
                print(connected_message)

                async for message in websocket:
                    try:
                        data = json.loads(message)
                        await on_message(data)
                    except json.JSONDecodeError as e:
                        await self._report(on_error, f"JSON decode error: {e}")
                    except Exception as e:
                        # One bad message must not tear down the stream.
                        await self._report(
                            on_error, f"Message handler error: {e}"
                        )

        except websockets.exceptions.WebSocketException as e:
            if on_error:
                await on_error(f"WebSocket error: {e}")
            raise

    async def connect_private(self, on_message, on_error=None):
        """
        Connect to private WebSocket for order and position updates

        Args:
            on_message: Callback function for messages: on_message(data)
            on_error: Optional callback for errors: on_error(error)

        Raises:
            websockets.exceptions.WebSocketException: Re-raised after being
                reported to on_error.
        """
        await self._stream(
            "/v1/ws/private",
            "Connected to private WebSocket",
            on_message,
            on_error,
        )

    async def connect_markets(self, on_message, on_error=None):
        """
        Connect to public market data WebSocket

        Args:
            on_message: Callback function for messages: on_message(data)
            on_error: Optional callback for errors: on_error(error)

        Raises:
            websockets.exceptions.WebSocketException: Re-raised after being
                reported to on_error.
        """
        await self._stream(
            "/v1/ws/markets",
            "Connected to market data WebSocket",
            on_message,
            on_error,
        )

    async def subscribe_to_market(self, websocket, market_slug):
        """
        Subscribe to a specific market's updates

        Args:
            websocket: Active WebSocket connection
            market_slug: Market slug to subscribe to
        """
        subscribe_msg = {
            "type": "subscribe",
            "market": market_slug
        }
        await websocket.send(json.dumps(subscribe_msg))

    async def unsubscribe_from_market(self, websocket, market_slug):
        """
        Unsubscribe from a market's updates

        Args:
            websocket: Active WebSocket connection
            market_slug: Market slug to unsubscribe from
        """
        unsubscribe_msg = {
            "type": "unsubscribe",
            "market": market_slug
        }
        await websocket.send(json.dumps(unsubscribe_msg))


# Example 1: Monitor orders and positions
async def monitor_orders():
    """Monitor real-time order and position updates

    Reads credentials from the POLYMARKET_API_KEY and POLYMARKET_PRIVATE_KEY
    environment variables, connects to the private WebSocket and prints each
    "order", "execution" and "position" message. Other message types are
    ignored.
    """
    api_key_id = os.environ.get("POLYMARKET_API_KEY")
    private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")

    sdk = PolymarketWebSocketSDK(api_key_id, private_key)

    async def handle_message(data):
        msg_type = data.get("type")

        if msg_type == "order":
            order = data.get("data", {})
            print("\nOrder Update:")
            print(f"  ID: {order.get('id')}")
            print(f"  Market: {order.get('market_slug')}")
            print(f"  State: {order.get('state')}")
            print(f"  Filled: {order.get('cumQuantity')}/{order.get('quantity')}")

        elif msg_type == "execution":
            execution = data.get("data", {})
            print("\nExecution:")
            print(f"  Order ID: {execution.get('order', {}).get('id')}")
            print(f"  Shares: {execution.get('lastShares')}")
            # A plain "$": "\$" is not a valid Python escape and would print
            # a literal backslash.
            print(f"  Price: ${execution.get('lastPx', {}).get('value')}")

        elif msg_type == "position":
            position = data.get("data", {})
            print("\nPosition Update:")
            print(f"  Market: {position.get('marketMetadata', {}).get('title')}")
            print(f"  Quantity: {position.get('quantity')}")
            print(f"  Value: ${position.get('value', {}).get('value')}")

    async def handle_error(error):
        print(f"Error: {error}")

    await sdk.connect_private(handle_message, handle_error)


# Example 2: Monitor market data
async def monitor_market_data():
    """Monitor real-time market price updates

    Reads credentials from the POLYMARKET_API_KEY and POLYMARKET_PRIVATE_KEY
    environment variables, connects to the market data WebSocket and prints
    each "price", "trade" and "orderbook" message. Other message types are
    ignored.
    """
    api_key_id = os.environ.get("POLYMARKET_API_KEY")
    private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")

    sdk = PolymarketWebSocketSDK(api_key_id, private_key)

    async def handle_message(data):
        msg_type = data.get("type")

        if msg_type == "price":
            market = data.get("market")
            price_data = data.get("data", {})
            print(f"\nPrice Update - {market}:")
            # A plain "$": "\$" is not a valid Python escape and would print
            # a literal backslash.
            print(f"  Bid: ${price_data.get('bestBid')}")
            print(f"  Ask: ${price_data.get('bestAsk')}")
            print(f"  Last: ${price_data.get('lastPrice')}")

        elif msg_type == "trade":
            trade = data.get("data", {})
            print("\nTrade:")
            print(f"  Market: {trade.get('market')}")
            print(f"  Price: ${trade.get('price')}")
            print(f"  Quantity: {trade.get('quantity')}")
            print(f"  Side: {trade.get('side')}")

        elif msg_type == "orderbook":
            orderbook = data.get("data", {})
            print("\nOrder Book Update:")
            print(f"  Bids: {len(orderbook.get('bids', []))}")
            print(f"  Asks: {len(orderbook.get('asks', []))}")

    async def handle_error(error):
        # Surface decode and handler failures instead of dropping them.
        print(f"Error: {error}")

    await sdk.connect_markets(handle_message, handle_error)


# Example 3: Subscribe to specific markets
async def subscribe_to_specific_markets():
    """Open the markets WebSocket, subscribe to a fixed watchlist and print every update

    Credentials are taken from the POLYMARKET_API_KEY and
    POLYMARKET_PRIVATE_KEY environment variables.
    """
    key_id = os.environ.get("POLYMARKET_API_KEY")
    secret = os.environ.get("POLYMARKET_PRIVATE_KEY")
    sdk = PolymarketWebSocketSDK(key_id, secret)

    # Sign the handshake for the markets endpoint and build its full URL.
    endpoint = "/v1/ws/markets"
    auth_headers = sdk._sign_request("GET", endpoint)
    url = f"{sdk.ws_url}{endpoint}"

    watchlist = ["will-trump-win-2024", "will-biden-win-2024"]

    async with websockets.connect(url, extra_headers=auth_headers) as connection:
        print("Connected to market WebSocket")

        # Register interest in each market, one request per slug.
        for market_slug in watchlist:
            await sdk.subscribe_to_market(connection, market_slug)
            print(f"Subscribed to {market_slug}")

        # Print every decoded frame until the connection ends.
        async for raw in connection:
            data = json.loads(raw)
            print(
                f"\nUpdate from {data.get('market')}:",
                f"  Type: {data.get('type')}",
                f"  Data: {data.get('data')}",
                sep="\n",
            )


# Example 4: Track portfolio in real-time
async def track_portfolio_realtime():
    """Track real-time portfolio changes

    Reads credentials from the POLYMARKET_API_KEY and POLYMARKET_PRIVATE_KEY
    environment variables, connects to the private WebSocket and keeps a
    per-market table of quantity * last price. After every "position"
    message the summed portfolio value is printed; "execution" messages are
    printed as trades.
    """
    api_key_id = os.environ.get("POLYMARKET_API_KEY")
    private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")

    sdk = PolymarketWebSocketSDK(api_key_id, private_key)

    portfolio_value = 0
    positions = {}

    async def handle_message(data):
        # Only portfolio_value is rebound here; positions is mutated in
        # place and needs no nonlocal declaration.
        nonlocal portfolio_value

        msg_type = data.get("type")

        if msg_type == "position":
            position = data.get("data", {})
            market_slug = position.get("marketMetadata", {}).get("slug")

            # "or" also covers keys that are present but null, which
            # float() would reject.
            quantity = float(position.get("quantity") or 0)
            last_price = float(
                (position.get("lastPrice") or {}).get("value") or 0
            )
            value = quantity * last_price

            positions[market_slug] = {
                "quantity": quantity,
                "price": last_price,
                "value": value
            }

            # Recalculate total portfolio value
            portfolio_value = sum(p["value"] for p in positions.values())

            print("\nPortfolio Update:")
            # A plain "$": "\$" is not a valid Python escape and would print
            # a literal backslash.
            print(f"  Total Value: ${portfolio_value:.2f}")
            print(f"  Positions: {len(positions)}")

        elif msg_type == "execution":
            print("\nTrade Executed:")
            execution = data.get("data", {})
            print(f"  Shares: {execution.get('lastShares')}")
            print(f"  Price: ${execution.get('lastPx', {}).get('value')}")

    async def handle_error(error):
        # Surface decode and handler failures instead of dropping them.
        print(f"Error: {error}")

    await sdk.connect_private(handle_message, handle_error)


# Run examples
if __name__ == "__main__":
    # Exactly one example runs per invocation. To switch, point `example`
    # at one of the other coroutine functions:
    #   monitor_orders                 - orders and positions
    #   monitor_market_data            - market data
    #   subscribe_to_specific_markets  - specific markets
    #   track_portfolio_realtime       - portfolio in real-time
    example = monitor_orders
    asyncio.run(example())

Message Types

Private WebSocket Messages

| Type | Description |
| --- | --- |
| order | Order state changes |
| execution | Order fills and partial fills |
| position | Position updates |
| balance | Account balance changes |

Market Data WebSocket Messages

| Type | Description |
| --- | --- |
| price | Best bid/ask price updates |
| trade | Trade executions |
| orderbook | Order book snapshots |
| liquidityLevel2 | Level 2 market depth |

Subscription Management

async def manage_subscriptions():
    """Dynamically manage market subscriptions

    Subscribes to two markets, waits ten seconds, swaps one subscription for
    another, then prints every update received. Credentials are read from
    the POLYMARKET_API_KEY and POLYMARKET_PRIVATE_KEY environment variables.
    """
    # Defined locally so the snippet does not depend on undefined globals.
    api_key_id = os.environ.get("POLYMARKET_API_KEY")
    private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")

    sdk = PolymarketWebSocketSDK(api_key_id, private_key)

    path = "/v1/ws/markets"
    headers = sdk._sign_request("GET", path)
    uri = f"{sdk.ws_url}{path}"

    async with websockets.connect(uri, extra_headers=headers) as websocket:
        # Subscribe to initial markets
        await sdk.subscribe_to_market(websocket, "market-1")
        await sdk.subscribe_to_market(websocket, "market-2")

        # Wait a bit (nothing is read from the socket during this pause)
        await asyncio.sleep(10)

        # Unsubscribe from one
        await sdk.unsubscribe_from_market(websocket, "market-1")

        # Subscribe to a new one
        await sdk.subscribe_to_market(websocket, "market-3")

        # Continue listening
        async for message in websocket:
            data = json.loads(message)
            print(f"Update: {data}")

Reconnection Logic

async def connect_with_reconnect(sdk, max_retries=5, base_delay=1, max_delay=30):
    """Connect with automatic reconnection

    Keeps calling ``sdk.connect_private`` until it has failed ``max_retries``
    times. Failures back off exponentially; a connection that ends without
    an exception is re-opened after ``base_delay`` seconds and does not
    count as a failure.

    Args:
        sdk: Object exposing ``connect_private(on_message, on_error)``
        max_retries: Number of failed attempts tolerated before giving up
        base_delay: Seconds multiplied by 2 ** failures for the backoff, and
            the pause before reconnecting after a clean close
        max_delay: Upper bound, in seconds, for a single backoff wait
    """
    # Handlers do not depend on loop state, so build them once.
    async def handle_message(data):
        print(f"Message: {data}")

    async def handle_error(error):
        print(f"Error: {error}")

    retry_count = 0

    while retry_count < max_retries:
        try:
            await sdk.connect_private(handle_message, handle_error)

        except Exception as e:
            retry_count += 1
            if retry_count >= max_retries:
                # No point sleeping when no further attempt will be made.
                break
            wait_time = min(base_delay * 2 ** retry_count, max_delay)  # Exponential backoff
            print(f"Connection failed: {e}. Retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)

        else:
            # The stream ended without an error; pause before reconnecting
            # rather than hammering the server in a tight loop.
            print(f"Connection closed. Reconnecting in {base_delay}s...")
            await asyncio.sleep(base_delay)

    print("Max retries reached. Giving up.")


# Usage
# Credentials come from the environment, as in the SDK examples, so the
# snippet does not depend on undefined names.
sdk = PolymarketWebSocketSDK(
    os.environ.get("POLYMARKET_API_KEY"),
    os.environ.get("POLYMARKET_PRIVATE_KEY"),
)
asyncio.run(connect_with_reconnect(sdk))

Error Handling

async def safe_websocket_connection():
    """WebSocket connection with comprehensive error handling

    Connects to the private WebSocket and prints each message. Connection
    closure, other WebSocket exceptions and unexpected errors are each
    caught and printed rather than propagated. Credentials are read from
    the POLYMARKET_API_KEY and POLYMARKET_PRIVATE_KEY environment variables.
    """
    # Defined locally so the snippet does not depend on undefined globals.
    api_key_id = os.environ.get("POLYMARKET_API_KEY")
    private_key = os.environ.get("POLYMARKET_PRIVATE_KEY")

    try:
        sdk = PolymarketWebSocketSDK(api_key_id, private_key)
    except (TypeError, ValueError) as e:
        # Missing or malformed credentials: report instead of crashing.
        print(f"Invalid credentials: {e}")
        return

    async def handle_message(data):
        try:
            # Process message
            print(f"Received: {data}")
        except Exception as e:
            print(f"Error processing message: {e}")

    async def handle_error(error):
        print(f"WebSocket error: {error}")
        # Log error, send alert, etc.

    try:
        await sdk.connect_private(handle_message, handle_error)
    except websockets.exceptions.ConnectionClosed:
        # Listed before WebSocketException so the more specific message wins.
        print("Connection closed by server")
    except websockets.exceptions.WebSocketException as e:
        print(f"WebSocket exception: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

Rate Limits

WebSocket connections have the following limits:
  • Maximum 5 concurrent connections per API key
  • Maximum 50 market subscriptions per connection
  • Reconnection attempts limited to 1 per second

Best Practices

  1. Implement reconnection logic - connections can drop unexpectedly
  2. Handle message types gracefully - ignore unknown message types
  3. Subscribe selectively - only subscribe to markets you need
  4. Use async/await - never block the event loop
  5. Monitor connection health - implement ping/pong or heartbeat