import grpc
import requests
from datetime import datetime, timedelta
from typing import Optional
from polymarket.v1 import trading_pb2
from polymarket.v1 import trading_pb2_grpc
from polymarket.v1 import enums_pb2
class PolymarketOrderStreamer:
    """Authenticate with Private Key JWT and print order updates streamed over gRPC.

    Typical use is ``login(...)`` followed by ``stream_orders(...)``; every
    message received on the stream is rendered to stdout.

    Attributes:
        base_url: REST base URL. Stored for callers; not used by the methods
            of this class.
        grpc_server: ``host:port`` target of the gRPC endpoint.
        access_token: Bearer token obtained by ``login``; ``None`` until then.
        refresh_token: Reserved slot; never populated by this class.
        access_expiration: Local time after which the token should be
            considered stale (the reported lifetime minus a safety buffer).
        session_id: Session ID reported by the current/most recent stream.
        price_scale: Divisor that converts integer wire prices into decimal
            prices for display.
    """

    # Seconds to wait on the OAuth token endpoint; without a timeout
    # ``requests`` would block forever on a stalled connection.
    _TOKEN_REQUEST_TIMEOUT = 30

    # Seconds shaved off the reported token lifetime so the token is treated
    # as expired slightly before the server rejects it.
    _EXPIRY_BUFFER_SECONDS = 30

    def __init__(self, base_url: str = "https://rest.preprod.polymarketexchange.com",
                 grpc_server: str = "grpc-api.preprod.polymarketexchange.com:443",
                 price_scale: int = 1000):
        """Create an unauthenticated streamer.

        Args:
            base_url: REST base URL to remember on the instance.
            grpc_server: ``host:port`` of the gRPC endpoint.
            price_scale: Positive divisor applied to integer prices before
                display. Defaults to 1000; the appropriate value comes from
                instrument metadata.

        Raises:
            ValueError: If ``price_scale`` is not positive (it is used as a
                divisor).
        """
        if price_scale <= 0:
            raise ValueError("price_scale must be a positive number")
        self.base_url = base_url
        self.grpc_server = grpc_server
        self.access_token: Optional[str] = None
        self.refresh_token: Optional[str] = None
        self.access_expiration: Optional[datetime] = None
        self.session_id: Optional[str] = None
        self.price_scale: int = price_scale

    def login(self, auth0_domain: str, client_id: str, private_key_path: str, audience: str) -> dict:
        """Authenticate using Private Key JWT and store the access token.

        Builds an RS256-signed client assertion from the PEM private key and
        exchanges it at ``https://{auth0_domain}/oauth/token`` using the
        ``client_credentials`` grant.

        Args:
            auth0_domain: Auth0 tenant domain (no scheme).
            client_id: OAuth client ID; used as both ``iss`` and ``sub``.
            private_key_path: Path to an unencrypted PEM private key.
            audience: API audience to request the token for.

        Returns:
            The decoded JSON body of the token response.

        Raises:
            requests.HTTPError: If the token endpoint answers with an error
                status.
            requests.Timeout: If the token endpoint does not answer in time.
            KeyError: If the response carries no ``access_token``.
        """
        # Imported here so the module can be loaded without these packages
        # when login() is never called.
        import jwt
        import uuid
        from cryptography.hazmat.primitives import serialization

        # Load private key
        with open(private_key_path, 'rb') as f:
            private_key = serialization.load_pem_private_key(f.read(), password=None)

        token_url = f"https://{auth0_domain}/oauth/token"

        # Create JWT assertion (valid for five minutes, unique per request)
        now = int(datetime.now().timestamp())
        claims = {
            "iss": client_id,
            "sub": client_id,
            "aud": token_url,
            "iat": now,
            "exp": now + 300,
            "jti": str(uuid.uuid4()),
        }
        assertion = jwt.encode(claims, private_key, algorithm="RS256")

        # Exchange for access token
        response = requests.post(
            token_url,
            json={
                "client_id": client_id,
                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
                "client_assertion": assertion,
                "audience": audience,
                "grant_type": "client_credentials"
            },
            timeout=self._TOKEN_REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        token_data = response.json()
        self.access_token = token_data["access_token"]

        # A lifetime of 180 seconds is assumed when the response omits
        # "expires_in". The buffer is clamped so a very short-lived token
        # never yields an expiration that is already in the past.
        expires_in = token_data.get("expires_in", 180)
        usable_seconds = max(expires_in - self._EXPIRY_BUFFER_SECONDS, 0)
        self.access_expiration = datetime.now() + timedelta(seconds=usable_seconds)
        return token_data

    def stream_orders(self, symbols: Optional[list] = None, accounts: Optional[list] = None,
                      snapshot_only: bool = False):
        """Stream order updates using gRPC and print each message.

        Blocks until the server ends the stream, an RPC error occurs, or the
        user interrupts with Ctrl-C (which is reported and swallowed).

        Args:
            symbols: Symbols to subscribe to; ``None``/empty is sent as an
                empty list and reported as ``ALL``.
            accounts: Accounts to subscribe to; same convention as
                ``symbols``.
            snapshot_only: Forwarded unchanged in the subscription request.

        Raises:
            ValueError: If ``login`` has not stored an access token yet.
            grpc.RpcError: Re-raised after being reported on stdout.
        """
        if not self.access_token:
            raise ValueError("Not authenticated. Please login first.")

        # Build the request before opening the channel so that a bad
        # argument cannot leave an open channel behind.
        request = trading_pb2.CreateOrderSubscriptionRequest(
            symbols=symbols or [],
            accounts=accounts or [],
            snapshot_only=snapshot_only
        )
        # Set up metadata with authorization
        metadata = [
            ('authorization', f'Bearer {self.access_token}')
        ]

        # Forget the previous stream's session so that the first message of
        # this stream is recognised as establishing a new one.
        self.session_id = None

        # The channel is a context manager: it is closed on every exit path.
        with grpc.secure_channel(self.grpc_server, grpc.ssl_channel_credentials()) as channel:
            stub = trading_pb2_grpc.OrderEntryAPIStub(channel)
            try:
                print("Starting order stream")
                print(f"Symbols: {symbols or 'ALL'}")
                print(f"Accounts: {accounts or 'ALL'}")
                print(f"Snapshot only: {snapshot_only}")
                print("-" * 60)
                # Start streaming
                response_stream = stub.CreateOrderSubscription(request, metadata=metadata)
                for response in response_stream:
                    self._process_order_response(response)
            except grpc.RpcError as e:
                print(f"gRPC error: {e.code()} - {e.details()}")
                raise
            except KeyboardInterrupt:
                print("\nStream interrupted by user")

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime('%H:%M:%S')

    @staticmethod
    def _enum_name(enum_type, value) -> str:
        """Return the symbolic name of ``value`` in ``enum_type``.

        Falls back to ``UNKNOWN(<value>)`` when ``enum_type.Name`` raises
        ``ValueError``, so a number the local stubs do not know about cannot
        abort the stream from inside a display helper.
        """
        try:
            return enum_type.Name(value)
        except ValueError:
            return f"UNKNOWN({value})"

    def _format_price(self, raw_price) -> str:
        """Render an integer wire price as dollars with four decimals."""
        return f"${raw_price / self.price_scale:.4f}"

    def _process_order_response(self, response):
        """Process and display one message from the order stream.

        Records the session ID the first time a message carries one, then
        prints the heartbeat, snapshot, or update payload, whichever is set.
        """
        # Capture session ID on first message
        if response.session_id and not self.session_id:
            self.session_id = response.session_id
            print(f"\n[{self._timestamp()}] Session established")
            print(f" Session ID: {self.session_id}")
            print("-" * 60)

        if response.HasField('heartbeat'):
            print(f"[{self._timestamp()}] Heartbeat received")
        elif response.HasField('snapshot'):
            snapshot = response.snapshot
            print(f"\n[{self._timestamp()}] Order Snapshot")
            print(f" Total orders: {len(snapshot.orders)}")
            for order in snapshot.orders:
                self._display_order(order)
            print("-" * 60)
        elif response.HasField('update'):
            update = response.update
            print(f"\n[{self._timestamp()}] Order Update")
            # Display executions
            if update.executions:
                print(f" Executions: {len(update.executions)}")
                for execution in update.executions:
                    self._display_execution(execution)
            # Display cancel rejects
            if update.HasField('cancel_reject'):
                self._display_cancel_reject(update.cancel_reject)
            print("-" * 60)

    def _display_order(self, order):
        """Display order details.

        Price, average price and account are printed only when set
        (positive / non-empty).
        """
        print(f" Order ID: {order.id}")
        print(f" Client Order ID: {order.clord_id}")
        print(f" Symbol: {order.symbol}")
        print(f" Side: {self._enum_name(enums_pb2.Side, order.side)}")
        print(f" Type: {self._enum_name(enums_pb2.OrderType, order.type)}")
        print(f" State: {self._enum_name(enums_pb2.OrderState, order.state)}")
        if order.price > 0:
            print(f" Price: {self._format_price(order.price)}")
        print(f" Order Qty: {order.order_qty}")
        print(f" Filled Qty: {order.cum_qty}")
        print(f" Remaining Qty: {order.leaves_qty}")
        if order.avg_px > 0:
            print(f" Avg Price: {self._format_price(order.avg_px)}")
        if order.account:
            print(f" Account: {order.account}")
        print()

    def _display_execution(self, execution):
        """Display execution details, including the embedded order if present."""
        print(f" Execution ID: {execution.id}")
        print(f" Type: {self._enum_name(enums_pb2.ExecutionType, execution.type)}")
        if execution.HasField('order'):
            order = execution.order
            print(f" Order ID: {order.id}")
            print(f" Symbol: {order.symbol}")
            print(f" Side: {self._enum_name(enums_pb2.Side, order.side)}")
            print(f" State: {self._enum_name(enums_pb2.OrderState, order.state)}")
        if execution.last_shares > 0:
            print(f" Last Shares: {execution.last_shares}")
        if execution.last_px > 0:
            print(f" Last Price: {self._format_price(execution.last_px)}")
        if execution.trade_id:
            print(f" Trade ID: {execution.trade_id}")
        if execution.text:
            print(f" Text: {execution.text}")
        # NOTE(review): ORD_REJECT_REASON_EXCHANGE_OPTION is treated here as
        # "no reject reason to show" - presumably the enum's default value;
        # confirm against the enum definition.
        if execution.order_reject_reason != enums_pb2.ORD_REJECT_REASON_EXCHANGE_OPTION:
            reason = self._enum_name(enums_pb2.OrdRejectReason, execution.order_reject_reason)
            print(f" Reject Reason: {reason}")
        print()

    def _display_cancel_reject(self, cancel_reject):
        """Display cancel reject details."""
        print(" Cancel Reject:")
        print(f" Order ID: {cancel_reject.order_id}")
        print(f" Client Order ID: {cancel_reject.clord_id}")
        print(f" Reject Reason: {self._enum_name(enums_pb2.CxlRejReason, cancel_reject.reject_reason)}")
        if cancel_reject.text:
            print(f" Text: {cancel_reject.text}")
        print()
# Usage
if __name__ == "__main__":
    # Private Key JWT credentials for the pre-production environment.
    login_settings = {
        "auth0_domain": "pmx-preprod.us.auth0.com",
        "client_id": "your_client_id",
        "private_key_path": "private_key.pem",
        "audience": "https://api.preprod.polymarketexchange.com",
    }
    # Subscription filter: one symbol, no account restriction.
    subscription = {
        "symbols": ["tec-nfl-sbw-2026-02-08-kc"],
        "accounts": [],
    }

    client = PolymarketOrderStreamer()
    # Authenticate first; streaming requires the stored access token.
    client.login(**login_settings)
    # Blocks while order updates are printed to stdout.
    client.stream_orders(**subscription)