Lesson 3 — Market data (REST & WebSocket)


Implement reliable REST polling and WebSocket subscriptions, normalize different exchange payloads, merge snapshot + delta orderbooks, and respect rate limits.

Estimated time: 45–60 minutes • Skill level: Beginner → Intermediate

Use this as a starting point. Don’t commit secrets.

Prerequisites

  • Completion of Crypto Lessons 1–2 (env setup and HMAC auth)
  • Python 3.10+, requests, websockets, pandas (optional)
  • Testnet API keys and WebSocket URL for your exchange

Overview

We’ll build two complementary components:

  1. a REST poller for slower, periodic data and fallback snapshots, and
  2. a WebSocket client that ingests real-time snapshots and incremental deltas (orderbook updates).

1) Recommended .env values

# .env
EXCHANGE_API_URL=https://testnet-api.example.com
EXCHANGE_API_KEY=your_testnet_api_key
EXCHANGE_API_SECRET=your_testnet_api_secret
EXCHANGE_WS_URL=wss://testnet-ws.example.com
MARKETS=BTC/USDT,ETH/USDT

2) REST poller (rest_poller.py)

Use REST polling for periodic snapshots (orderbook snapshots, recent trades). Respect rate limits; cache responses and use exponential backoff on 5xx errors.

import os, time, json
import requests
from dotenv import load_dotenv

load_dotenv()
API_URL = os.getenv('EXCHANGE_API_URL')
MARKETS = os.getenv('MARKETS','BTC/USDT').split(',')

def fetch_orderbook(market):
    url = API_URL.rstrip('/') + f"/orderbook/{market.replace('/','')}"
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    return r.json()

def poll_loop(interval=5):
    while True:
        for m in MARKETS:
            try:
                ob = fetch_orderbook(m)
                # persist or feed into your normalization pipeline
                print('snapshot', m, 'top bids', ob.get('bids',[])[:3])
            except Exception as e:
                print('rest poll error', e)
        time.sleep(interval)

if __name__ == '__main__':
    poll_loop()

Tip: Keep REST polling infrequent (a 1–5 s interval between snapshots is enough) and rely on WS for high-frequency updates. Use conditional requests or ETags if the API supports them.
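
For the exponential backoff mentioned above, a minimal sketch wrapping fetch_orderbook might look like this (the retry counts and delays are arbitrary, and only 5xx responses are retried):

import random, time
import requests

def fetch_with_backoff(fetch_fn, market, max_retries=5, base_delay=1.0):
    # retry a REST call with exponential backoff + jitter on 5xx errors
    for attempt in range(max_retries):
        try:
            return fetch_fn(market)
        except requests.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status is None or status < 500:
                raise  # 4xx and other errors are not retried here
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            print('5xx from exchange, retrying in', round(delay, 1), 's:', e)
            time.sleep(delay)
    raise RuntimeError(f'giving up on {market} after {max_retries} retries')

# inside poll_loop():
# ob = fetch_with_backoff(fetch_orderbook, m)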

3) WebSocket subscriber (ws_client.py)

The WebSocket client receives snapshots and deltas. This example uses asyncio + websockets and shows a subscription and basic parsing loop.

import asyncio, json, os
import websockets
from dotenv import load_dotenv

load_dotenv()
WS_URL = os.getenv('EXCHANGE_WS_URL')
MARKETS = os.getenv('MARKETS','BTC/USDT').split(',')

async def ws_run():
    async with websockets.connect(WS_URL) as ws:
        # subscribe - adapt to exchange protocol
        subscribe = {"op":"subscribe", "args":[{"channel":"orderbook","symbols":MARKETS}]}
        await ws.send(json.dumps(subscribe))
        while True:
            msg = await ws.recv()
            data = json.loads(msg)
            # handle snapshot or incremental update
            handle_message(data)

def handle_message(data):
    # Example: data could be {'type':'snapshot','symbol':'BTC/USDT','bids':[[price,size],...], 'asks':[...] }
    print('ws message', data.get('type'))
    # feed into normalizer / orderbook merger

if __name__ == '__main__':
    asyncio.run(ws_run())

WS auth: if authentication is required, sign per your exchange’s docs and send the auth frame before subscribing.
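
A rough sketch of an auth frame, reusing the HMAC signing from Lesson 2 (the op/args frame shape and the timestamp-only signing payload here are assumptions; check your exchange’s docs for the exact recipe):

import hashlib, hmac, json, os, time

def build_auth_frame():
    key = os.getenv('EXCHANGE_API_KEY')
    secret = os.getenv('EXCHANGE_API_SECRET')
    ts = str(int(time.time() * 1000))
    # what gets signed differs per exchange - adapt to the docs
    signature = hmac.new(secret.encode(), ts.encode(), hashlib.sha256).hexdigest()
    return json.dumps({"op": "auth", "args": [key, ts, signature]})

# inside ws_run(), before sending the subscribe frame:
# await ws.send(build_auth_frame())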

4) Snapshot + delta merging (orderbook merge)

Pattern: fetch a full snapshot via REST, then apply incremental deltas from WS (tracked by sequence numbers) until the next snapshot. This avoids gaps without rebuilding the book from scratch on every delta.

# orderbook_merge.py (conceptual)
# orderbook format: {'bids': {price:size}, 'asks': {price:size}, 'seq': n}
def apply_delta(ob, delta):
    # delta example: {'side':'bid','price':100.0,'size':1.5,'action':'update'|'remove'}
    side = 'bids' if delta['side']=='bid' else 'asks'
    price = float(delta['price'])
    if delta['action']=='remove':
        ob[side].pop(price, None)
    else:
        size = float(delta['size'])
        if size == 0:
            ob[side].pop(price, None)
        else:
            ob[side][price] = size
    ob['seq'] = delta.get('seq', ob.get('seq'))
    return ob

def snapshot_to_dict(snapshot):
    return {
        'bids': {float(p): float(s) for p, s in snapshot.get('bids', [])},
        'asks': {float(p): float(s) for p, s in snapshot.get('asks', [])},
        'seq': snapshot.get('seq', 0),
    }

Important: Use sequence numbers to ensure delta continuity. If you detect a gap (delta.seq != local_book.seq + 1), re-fetch a fresh snapshot before applying further deltas.
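
Building on the helpers above, a minimal gap check could look like this (it assumes every delta carries a seq field; resnapshot is any callable that returns a fresh REST snapshot, e.g. fetch_orderbook from the poller):

def apply_delta_checked(ob, delta, resnapshot):
    # apply a delta only if it continues the local sequence; otherwise resync
    seq = delta.get('seq')
    if seq is not None and seq != ob.get('seq', 0) + 1:
        print('sequence gap: local', ob.get('seq'), 'delta', seq, '- refetching snapshot')
        ob = snapshot_to_dict(resnapshot())
        if seq <= ob['seq']:
            return ob  # the fresh snapshot already covers this delta, so drop it
        # production clients buffer deltas while the snapshot is in flight
    return apply_delta(ob, delta)

# usage:
# ob = apply_delta_checked(ob, delta, lambda: fetch_orderbook('BTC/USDT'))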

5) Normalization & unified schema

Different exchanges use different field names. Normalize into a common schema for downstream logic:

# normalized schema example
{
  "exchange": "binance",
  "symbol": "BTC/USDT",
  "type": "orderbook",   # or 'trade'
  "timestamp": 1690000000000,
  "payload": {
     "bids": [[price, size], ...],
     "asks": [[price, size], ...],
     "seq": 12345
  }
}

Keep a small adapter layer that maps vendor-specific keys to the normalized schema. This makes strategies portable across exchanges.
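
As a sketch, an adapter can be a single function per exchange that returns the normalized dict (the raw keys 'b', 'a', 'E' and 'u' below are illustrative placeholders, not any particular exchange’s real payload):

def normalize_orderbook(exchange, symbol, raw):
    # map vendor-specific keys into the unified schema shown above
    if exchange == 'vendor_a':               # illustrative vendor with terse keys
        bids, asks = raw.get('b', []), raw.get('a', [])
        ts, seq = raw.get('E'), raw.get('u')
    else:                                    # fall back to already-descriptive keys
        bids, asks = raw.get('bids', []), raw.get('asks', [])
        ts, seq = raw.get('timestamp'), raw.get('seq')
    return {
        'exchange': exchange,
        'symbol': symbol,
        'type': 'orderbook',
        'timestamp': ts,
        'payload': {'bids': bids, 'asks': asks, 'seq': seq},
    }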

6) Rate limits & backpressure

Respect REST and WS rate limits. Implement a local rate limiter (token bucket or sliding window) and backpressure for inbound message bursts. The example below is a simple sliding-window limiter for REST calls; a queue-based WS backpressure sketch follows the tip.

import time
from collections import deque

# sliding-window limiter: allow at most max_per_second calls in any one-second window
class SimpleRateLimiter:
    def __init__(self, max_per_second):
        self.max_per_second = max_per_second
        self.timestamps = deque()  # timestamps of recent calls

    def wait(self):
        now = time.time()
        while len(self.timestamps) >= self.max_per_second:
            if now - self.timestamps[0] > 1:
                self.timestamps.popleft()  # oldest call has aged out of the window
            else:
                time.sleep(0.05)           # window is full; wait for it to slide
                now = time.time()
        self.timestamps.append(now)

Tip: For production use a robust library (e.g. ratelimit) or asyncio primitives such as asyncio.Semaphore, and honor exchange-specific headers describing your remaining quota.
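
For WS backpressure, one simple approach is a bounded asyncio.Queue between the socket reader and the message processor; here is a sketch reusing handle_message from ws_client.py (the queue size and the drop-oldest policy are arbitrary choices):

import asyncio, json

async def run_with_backpressure(ws, maxsize=1000):
    queue = asyncio.Queue(maxsize=maxsize)   # bounded buffer between reader and processor

    async def reader():
        while True:
            msg = await ws.recv()
            if queue.full():
                queue.get_nowait()           # drop the oldest message rather than stall the socket
            await queue.put(json.loads(msg))

    async def processor():
        while True:
            data = await queue.get()
            handle_message(data)             # same handler as in ws_client.py
            queue.task_done()

    await asyncio.gather(reader(), processor())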

7) Persisting market data

Store snapshots, deltas, and trades for replay and backtesting. Use Parquet or a time-series database for efficient storage: write batches to local Parquet files, or push to InfluxDB/TimescaleDB.

import pandas as pd

def persist_ticks(ticks, filename='ticks.parquet'):
    # each call writes one file; for continuous ingestion, write per-batch files
    # (e.g. partitioned by symbol and hour) and combine them at read time
    df = pd.DataFrame(ticks)
    df.to_parquet(filename, index=False)

Include metadata (exchange, symbol, ingestion source, seq numbers) to make replays deterministic.
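
One way to attach that metadata before writing, reusing persist_ticks above (the column names are just a suggestion):

import time

def tag_ticks(ticks, exchange, symbol, source):
    # stamp every record so replays can reproduce the exact ingestion context
    ingested_at = int(time.time() * 1000)
    return [
        {**t, 'exchange': exchange, 'symbol': symbol,
         'source': source, 'ingested_at_ms': ingested_at}
        for t in ticks
    ]

# persist_ticks(tag_ticks(batch, 'testnet-exchange', 'BTC/USDT', 'ws'), 'btc_usdt.parquet')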

8) Debugging & observability

  • Log sequence numbers and snapshot timestamps to detect gaps.
  • Emit metrics: WS reconnect count, message processing latency, delta gaps, snapshot refreshes (see the counter sketch after this list).
  • Expose a /health or /metrics endpoint for monitoring and alerts.
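
A handful of in-process counters is enough to start with; a minimal sketch (the metric names are arbitrary, and in production you would export them via a metrics library rather than printing):

from collections import Counter
import time

METRICS = Counter()

def record(name, value=1):
    METRICS[name] += value

def report():
    # print (or export) the counters periodically
    print(int(time.time()), dict(METRICS))

# in the WS client:
# record('ws_reconnects')       on every reconnect
# record('delta_gaps')          whenever a sequence gap is detected
# record('snapshot_refreshes')  on every REST resync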

What you’ll build next

Lesson 4 will implement order placement on exchange testnets and handling of fills/cancellations. Use the normalized market feed you built here as the input to the trading layer.

Author: Stephane Patteux • Part of the Build your own bot series
