Lesson 3 — Market data (REST & WebSocket)
Implement reliable REST polling and WebSocket subscriptions, normalize different exchange payloads, merge snapshot + delta orderbooks, and respect rate limits.
Prerequisites
- Completion of Crypto Lessons 1–2 (env setup and HMAC auth)
- Python 3.10+, requests, websockets, pandas (optional)
- Testnet API keys and WebSocket URL for your exchange
Overview
We’ll build two complementary components:
- a REST poller for slower, periodic data and fallback snapshots, and
- a WebSocket client that ingests real-time snapshots and incremental deltas (orderbook updates).
1) Recommended .env values
# .env
EXCHANGE_API_URL=https://testnet-api.example.com
EXCHANGE_API_KEY=your_testnet_api_key
EXCHANGE_API_SECRET=your_testnet_api_secret
EXCHANGE_WS_URL=wss://testnet-ws.example.com
MARKETS=BTC/USDT,ETH/USDT
2) REST poller (rest_poller.py)
Use REST polling for periodic snapshots (orderbook snapshots, recent trades). Respect rate limits; cache responses and use exponential backoff on 5xx errors.
import os, time
import requests
from dotenv import load_dotenv

load_dotenv()

API_URL = os.getenv('EXCHANGE_API_URL')
MARKETS = os.getenv('MARKETS', 'BTC/USDT').split(',')

def fetch_orderbook(market):
    url = API_URL.rstrip('/') + f"/orderbook/{market.replace('/', '')}"
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    return r.json()

def poll_loop(interval=5):
    while True:
        for m in MARKETS:
            try:
                ob = fetch_orderbook(m)
                # persist or feed into your normalization pipeline
                print('snapshot', m, 'top bids', ob.get('bids', [])[:3])
            except Exception as e:
                print('rest poll error', e)
        time.sleep(interval)

if __name__ == '__main__':
    poll_loop()
Tip: Keep the REST polling interval modest (one snapshot every 1–5 s) and rely on WS for high-frequency updates. Use conditional requests or ETags if the API supports them.
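The poller above logs errors and moves on; the exponential backoff mentioned earlier can be sketched as a small retry wrapper around `fetch_orderbook`. This is a minimal, illustrative helper (the retry counts and delays are assumptions, and in practice you would only retry transient errors such as HTTP 5xx):

```python
import random
import time

def fetch_with_backoff(fetch_fn, *args, max_retries=5, base_delay=0.5):
    """Call fetch_fn(*args), retrying failures with exponential backoff
    plus jitter; re-raises the error after the final attempt."""
    for attempt in range(max_retries):
        try:
            return fetch_fn(*args)
        except Exception:
            # In practice, catch only transient errors (e.g. HTTP 5xx,
            # connection resets) and let 4xx client errors raise immediately.
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) * (1 + random.random())
            time.sleep(delay)
```

Usage: `ob = fetch_with_backoff(fetch_orderbook, 'BTC/USDT')`. The jitter factor spreads retries out so multiple pollers don't hammer the API in lockstep after an outage.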
3) WebSocket subscriber (ws_client.py)
The WebSocket client receives snapshots and deltas. This example uses asyncio + websockets and shows a subscription and basic parsing loop.
import asyncio, json, os
import websockets
from dotenv import load_dotenv

load_dotenv()

WS_URL = os.getenv('EXCHANGE_WS_URL')
MARKETS = os.getenv('MARKETS', 'BTC/USDT').split(',')

def handle_message(data):
    # Example: data could be {'type': 'snapshot', 'symbol': 'BTC/USDT',
    #   'bids': [[price, size], ...], 'asks': [...]}
    print('ws message', data.get('type'))
    # feed into normalizer / orderbook merger

async def ws_run():
    async with websockets.connect(WS_URL) as ws:
        # subscribe - adapt to exchange protocol
        subscribe = {"op": "subscribe", "args": [{"channel": "orderbook", "symbols": MARKETS}]}
        await ws.send(json.dumps(subscribe))
        while True:
            msg = await ws.recv()
            data = json.loads(msg)
            # handle snapshot or incremental update
            handle_message(data)

if __name__ == '__main__':
    asyncio.run(ws_run())
WS auth: if authentication is required, sign per the exchange docs and send the auth frame before subscribing.
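Real WS connections drop; the client above should be wrapped in a reconnect loop. A minimal sketch, assuming you factor the session body into a coroutine (the `max_attempts` parameter is an illustrative testing hook; production code would retry forever):

```python
import asyncio

async def run_with_reconnect(session, max_backoff=30.0, max_attempts=None):
    """Run a single-session coroutine repeatedly, waiting with exponential
    backoff after each failure. max_attempts=None means retry forever."""
    backoff = 1.0
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            await session()
            backoff = 1.0  # session ended cleanly: reset the backoff
        except Exception as exc:
            print(f'ws session failed ({exc!r}); reconnecting')
            await asyncio.sleep(min(backoff, max_backoff))
            backoff *= 2
```

Usage: `asyncio.run(run_with_reconnect(ws_run))`. After any reconnect you must re-send the subscription frame and re-fetch a REST snapshot, since deltas received before the drop are no longer contiguous.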
4) Snapshot + delta merging (orderbook merge)
Pattern: fetch a full snapshot via REST, then apply incremental deltas from WS (tracked by sequence numbers) until the next snapshot. This avoids gaps without rebuilding the book from scratch on every update.
# orderbook_merge.py (conceptual)
# orderbook format: {'bids': {price: size}, 'asks': {price: size}, 'seq': n}

def apply_delta(ob, delta):
    # delta example: {'side': 'bid', 'price': 100.0, 'size': 1.5,
    #                 'action': 'update' or 'remove', 'seq': n}
    side = 'bids' if delta['side'] == 'bid' else 'asks'
    price = float(delta['price'])
    if delta['action'] == 'remove':
        ob[side].pop(price, None)
    else:
        size = float(delta['size'])
        if size == 0:
            # a zero size also means the level is gone
            ob[side].pop(price, None)
        else:
            ob[side][price] = size
    ob['seq'] = delta.get('seq', ob.get('seq'))
    return ob

def snapshot_to_dict(snapshot):
    return {
        'bids': {float(p): float(s) for p, s in snapshot.get('bids', [])},
        'asks': {float(p): float(s) for p, s in snapshot.get('asks', [])},
        'seq': snapshot.get('seq', 0),
    }
Important: Use sequence numbers to ensure delta continuity. If you detect a gap (the incoming delta's seq is not the book's current seq + 1), re-fetch a fresh snapshot before applying further deltas.
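The continuity check can be isolated in a small state machine around the book's sequence number. This is an illustrative sketch; exchanges differ in how their update IDs are defined, so check the vendor docs before reusing it:

```python
class SequencedBook:
    """Gap-aware sequence tracker: deltas apply only when contiguous,
    and the caller is told when a fresh REST snapshot is required."""

    def __init__(self):
        self.seq = None  # None until a snapshot is loaded

    def load_snapshot(self, seq):
        self.seq = seq

    def try_apply(self, delta_seq):
        if self.seq is None:
            return 'need_snapshot'
        if delta_seq <= self.seq:
            return 'stale'       # already covered by the snapshot
        if delta_seq == self.seq + 1:
            self.seq = delta_seq
            return 'applied'
        return 'gap'             # missed a message: re-fetch snapshot
```

A typical wiring: on `'applied'` call `apply_delta`; on `'gap'` or `'need_snapshot'` re-fetch via REST and call `load_snapshot`; `'stale'` deltas are simply dropped.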
5) Normalization & unified schema
Different exchanges use different field names. Normalize into a common schema for downstream logic:
# normalized schema example
{
    "exchange": "binance",
    "symbol": "BTC/USDT",
    "type": "orderbook",  # or 'trade'
    "timestamp": 1690000000000,
    "payload": {
        "bids": [[price, size], ...],
        "asks": [[price, size], ...],
        "seq": 12345
    }
}
Keep a small adapter layer that maps vendor-specific keys to the normalized schema. This makes strategies portable across exchanges.
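As a sketch of such an adapter, here is a hypothetical mapper for a Binance-style depth payload (the vendor keys `E`, `b`, `a`, `u` follow Binance's depth-stream conventions, but treat the exact field names as assumptions and verify against the current docs):

```python
def normalize_binance_depth(raw, symbol):
    """Map a Binance-style depth message into the unified schema above.
    Prices/sizes arrive as strings and are cast to float here."""
    return {
        'exchange': 'binance',
        'symbol': symbol,
        'type': 'orderbook',
        'timestamp': raw.get('E', 0),            # event time (ms)
        'payload': {
            'bids': [[float(p), float(s)] for p, s in raw.get('b', [])],
            'asks': [[float(p), float(s)] for p, s in raw.get('a', [])],
            'seq': raw.get('u', 0),              # final update ID
        },
    }
```

One adapter function per exchange keeps the rest of the pipeline (merger, persistence, strategies) completely vendor-agnostic.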
6) Rate limits & backpressure
Respect REST and WS rate limits. Implement a local rate limiter (token bucket or sliding window) and apply backpressure to inbound message bursts (bounded queues; drop or coalesce when the consumer falls behind). The example below is a simple sliding-window limiter for REST calls.
import time
from collections import deque

class SimpleRateLimiter:
    def __init__(self, max_per_second):
        self.max_per_second = max_per_second
        self.timestamps = deque()

    def wait(self):
        # Block until a slot in the trailing 1s window frees up,
        # then record this call's timestamp.
        now = time.time()
        while len(self.timestamps) >= self.max_per_second:
            if now - self.timestamps[0] > 1:
                self.timestamps.popleft()
            else:
                time.sleep(0.05)
                now = time.time()
        self.timestamps.append(now)
Tip: For production use a robust library (ratelimit, asyncio-semaphores) and honor exchange-specific headers describing remaining quota.
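The token-bucket variant mentioned above allows short bursts up to a capacity while enforcing an average rate; a minimal non-blocking sketch (parameter names are illustrative):

```python
import time

class TokenBucket:
    """Token-bucket limiter: holds up to `capacity` tokens, refilled at
    `rate` tokens/second. acquire() is non-blocking: it returns False
    when the bucket is empty, letting the caller defer or drop the call."""

    def __init__(self, rate, capacity):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def acquire(self, n=1):
        now = time.monotonic()
        # refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

Compared with the sliding-window limiter above, the bucket tolerates a burst of `capacity` requests after an idle period, which fits bursty snapshot refreshes well.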
7) Persisting market data
Store snapshots/deltas and trades for replay and backtesting. Use Parquet/TSDB for efficient time-series storage. Example: append to local Parquet or push to InfluxDB/TimescaleDB.
import pandas as pd

def persist_ticks(ticks, filename='ticks.parquet'):
    df = pd.DataFrame(ticks)
    df.to_parquet(filename, index=False)
Include metadata (exchange, symbol, ingestion source, seq numbers) to make replays deterministic.
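One way to attach that metadata is to enrich the frame before writing; a small sketch (column names here are illustrative, not a fixed schema):

```python
import time

import pandas as pd

def ticks_to_frame(ticks, exchange, source):
    """Build a DataFrame from tick dicts and attach provenance columns
    (exchange, ingestion source, ingestion timestamp) so a later replay
    can be reconstructed deterministically."""
    df = pd.DataFrame(ticks)
    df['exchange'] = exchange
    df['ingest_source'] = source           # e.g. 'ws' or 'rest'
    df['ingest_ts'] = time.time_ns()       # wall-clock ingestion time (ns)
    return df
```

Usage: `persist_ticks(...)` above can take `ticks_to_frame(ticks, 'testnet', 'ws')` records; with per-tick `seq` numbers already in the dicts, snapshot + delta streams replay in a well-defined order.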
8) Debugging & observability
- Log sequence numbers and snapshot timestamps to detect gaps.
- Emit metrics: ws reconnect count, message processing latency, delta gaps, snapshot refreshes.
- Expose a /health or /metrics endpoint for monitoring and alerts.
What you’ll build next
Lesson 4 will implement order placement on exchange testnets and handling of fills/cancellations. Use the normalized market feed you built here as the input to the trading layer.
