Core templates

Idempotency header
Idempotency-Key: <uuid>

# Example:
Idempotency-Key: 1b9d6f...-unique-req-0001

Use a UUID per logical request; persist keys and response mapping to return the same response on retries.

Exponential backoff + jitter (Python)
# retry_backoff.py
import random, time

def backoff_sleep(attempt, base=0.5, cap=30, jitter=0.1):
    sleep = min(cap, base * 2 ** attempt)
    sleep = sleep * (1 + random.uniform(-jitter, jitter))
    time.sleep(sleep)

# Usage:
attempt = 0
max_attempts = 5
while attempt < max_attempts:
    try:
        # call API
        break
    except RetriableError:
        backoff_sleep(attempt)
        attempt += 1

Tune base, cap and jitter by exchange/API behaviour to avoid synchronized retries.

Idempotent order placement (Python requests)
# order_place.py
import requests, uuid, os

API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")

def place_order(symbol, side, qty, price=None, order_type="limit"):
    client_id = str(uuid.uuid4())
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Idempotency-Key": client_id,
        "Content-Type": "application/json"
    }
    body = {"symbol": symbol, "side": side, "qty": qty, "type": order_type}
    if price: body["price"] = price
    r = requests.post(f"{API_URL}/orders", json=body, headers=headers, timeout=10)
    r.raise_for_status()
    return r.json()

# Example:
# resp = place_order("BTC-USD", "buy", 0.01, price=45000)

Server must persist idempotency keys and return identical result for the same key.

Python templates (many)

Async WebSocket client (orderbook streaming)
# ws_client.py (asyncio, websockets)
import asyncio, json, os
import websockets

WS_URL = os.getenv("WS_URL")

async def handle_message(msg):
    data = json.loads(msg)
    # process orderbook or trade
    print("MSG", data)

async def run():
    async with websockets.connect(WS_URL) as ws:
        await ws.send(json.dumps({"type":"subscribe", "channels":["orderbook"]}))
        async for message in ws:
            await handle_message(message)

if __name__ == "__main__":
    asyncio.run(run())

Use reconnect/backoff logic and persist sequence numbers if provided by the exchange.

Webhook receiver (FastAPI)
# webhook.py
from fastapi import FastAPI, Request, Header, HTTPException
import hmac, hashlib, os

app = FastAPI()
SHARED_SECRET = os.getenv("WEBHOOK_SECRET", "")

def verify_signature(body: bytes, sig_header: str):
    expected = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig_header)

@app.post("/webhook")
async def webhook(req: Request, x_signature: str = Header(None)):
    body = await req.body()
    if not verify_signature(body, x_signature):
        raise HTTPException(status_code=403, detail="Invalid signature")
    event = await req.json()
    # enqueue processing
    return {"status":"ok"}

Secure with signature verification and replay protection (timestamps + nonce).

Async order worker (aiohttp)
# async_worker.py
import asyncio, aiohttp, os, uuid

API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")

async def place_order(session, payload):
    headers = {"Authorization":f"Bearer {API_KEY}","Idempotency-Key": str(uuid.uuid4())}
    async with session.post(f"{API_URL}/orders", json=payload, headers=headers, timeout=10) as resp:
        return await resp.json()

async def worker(queue):
    async with aiohttp.ClientSession() as session:
        while True:
            job = await queue.get()
            try:
                await place_order(session, job)
            finally:
                queue.task_done()

async def main():
    q = asyncio.Queue()
    # push jobs into q...
    await q.join()

if __name__ == "__main__":
    asyncio.run(main())

Use connection pooling and limit concurrency to respect rate limits.

SQLAlchemy models (orders & fills)
# models.py
from sqlalchemy import Column, Integer, String, Numeric, DateTime, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()

class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    order_id = Column(String, unique=True, index=True)
    client_id = Column(String, index=True)
    symbol = Column(String)
    qty = Column(Numeric)
    price = Column(Numeric)
    status = Column(String)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

class Fill(Base):
    __tablename__ = "fills"
    id = Column(Integer, primary_key=True)
    order_id = Column(String, ForeignKey("orders.order_id"))
    filled_qty = Column(Numeric)
    price = Column(Numeric)
    filled_at = Column(DateTime(timezone=True), server_default=func.now())

Index order_id and client_id for fast lookups during reconciliation.

Alembic migration snippet
# alembic env snippet (upgrade)
def upgrade():
    op.create_table('orders',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('order_id', sa.String, nullable=False),
        sa.Column('client_id', sa.String),
        sa.Column('symbol', sa.String),
        sa.Column('qty', sa.Numeric),
    )

Use migrations to keep schema consistent across environments.

Pydantic settings (BaseSettings)
# settings.py
from pydantic import BaseSettings, Field, SecretStr

class Settings(BaseSettings):
    api_url: str
    api_key: SecretStr
    environment: str = "staging"
    class Config:
        env_file = ".env"

Use SecretStr for secrets; load from .env in local dev and secrets manager in production.

Prometheus metrics (prometheus_client)
# metrics.py
from prometheus_client import Counter, Gauge, start_http_server

ORDERS_PLACED = Counter('orders_placed_total', 'Total orders placed')
CURRENT_POS = Gauge('current_position_size', 'Current position size')

def start_metrics_port(port=8000):
    start_http_server(port)

Start the metrics port and instrument critical code paths (orders, fills, latency).

pytest smoke test (sandbox)
# tests/test_order_flow.py
def test_place_order_in_sandbox(client):
    resp = client.post("/orders", json={"symbol":"BTC-USD","qty":0.01,"type":"limit","price":45000})
    assert resp.status_code == 200
    data = resp.json()
    assert "order_id" in data

Use fixtures to inject sandbox credentials and mock external responses where appropriate.

DevOps templates

Dockerfile (Python production)
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
CMD ["gunicorn","-k","uvicorn.workers.UvicornWorker","bot.main:app","--bind","0.0.0.0:8000","--workers","2"]

Use multi-stage builds or smaller base images for optimized images.

GitHub Actions — CI (Python)
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with: python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: pytest -q

Add a job to run sandbox integrations against testnet when appropriate and protected by secrets.

Kubernetes liveness & readiness probes (example)
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 5

Health endpoints should include dependency checks (DB, exchange connectivity, reconciler status).

Docs & runbook templates

Incident post-mortem template
# Post-mortem: [Title]
- Date/Time:
- Severity:
- Summary:
- Timeline:
- Root cause:
- Impact:
- Mitigation:
- Action items (owner/due):

Keep postmortems short, factual and focussed on corrective action and measurement.

Runbook extract — order mismatch
1. Check reconciliation dashboard for mismatched orders.
2. Identify order_id and client_id.
3. Compare expected vs actual fills.
4. If discrepancy < threshold, create adjustment; else pause trading and escalate.

Embed runbook steps in pager notifications for operators to follow during incidents.

© BotBlog — Free Tools • Last updated: 2025-11-11