Templates — Free Tools (Python-first)
A curated collection of copy-ready templates and snippets to accelerate building and operating trading bots. The page is Python-first and covers idempotent order placement, retry/backoff, websocket clients, webhook handlers, reconciliation tools, Prometheus metrics, CI/CD and runbook templates, all ready to copy or download.
Core templates
Idempotency-Key: <uuid> # Example: Idempotency-Key: 1b9d6f...-unique-req-0001
Use a UUID per logical request; persist keys and response mapping to return the same response on retries.
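A minimal client-side sketch, assuming a generic REST endpoint: generate the key once per logical order and reuse it on every retry so the server can deduplicate.
# idempotent_submit.py (sketch: endpoint shape and error handling are illustrative)
import time
import uuid

import requests

def submit_once(url, payload, headers_base, max_attempts=5):
    # One key per logical request, reused on every retry so the server can deduplicate.
    headers = {**headers_base, "Idempotency-Key": str(uuid.uuid4())}
    for attempt in range(max_attempts):
        try:
            r = requests.post(url, json=payload, headers=headers, timeout=10)
            r.raise_for_status()
            return r.json()
        except requests.RequestException:
            if attempt == max_attempts - 1:
                raise
            time.sleep(min(30, 0.5 * 2 ** attempt))  # simple backoff; see retry_backoff.py below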
# retry_backoff.py
import random
import time

def backoff_sleep(attempt, base=0.5, cap=30, jitter=0.1):
    # Exponential backoff capped at `cap`, with +/- `jitter` randomisation.
    sleep = min(cap, base * 2 ** attempt)
    sleep = sleep * (1 + random.uniform(-jitter, jitter))
    time.sleep(sleep)

# Usage:
attempt = 0
max_attempts = 5
while attempt < max_attempts:
    try:
        # call API
        break
    except RetriableError:  # substitute the retriable exception raised by your API client
        backoff_sleep(attempt)
        attempt += 1
Tune base, cap and jitter by exchange/API behaviour to avoid synchronized retries.
# order_place.py
import os
import uuid

import requests

API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")

def place_order(symbol, side, qty, price=None, order_type="limit"):
    client_id = str(uuid.uuid4())
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Idempotency-Key": client_id,
        "Content-Type": "application/json",
    }
    body = {"symbol": symbol, "side": side, "qty": qty, "type": order_type}
    if price is not None:
        body["price"] = price
    r = requests.post(f"{API_URL}/orders", json=body, headers=headers, timeout=10)
    r.raise_for_status()
    return r.json()

# Example:
# resp = place_order("BTC-USD", "buy", 0.01, price=45000)
The server must persist idempotency keys and return the identical result when the same key is retried.
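On the server side, a minimal sketch of key persistence; the in-memory dict stands in for a database table with a unique key.
# idempotency_store.py (sketch: replace the in-memory dict with a DB table that has a unique key)
import json

_responses = {}  # idempotency_key -> serialized response

def handle_order(idempotency_key, place_order_fn, payload):
    # Repeated keys return the stored response instead of re-executing the order.
    if idempotency_key in _responses:
        return json.loads(_responses[idempotency_key])
    result = place_order_fn(payload)
    _responses[idempotency_key] = json.dumps(result)
    return result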
Python templates
# ws_client.py (asyncio, websockets)
import asyncio
import json
import os

import websockets

WS_URL = os.getenv("WS_URL")

async def handle_message(msg):
    data = json.loads(msg)
    # process orderbook or trade
    print("MSG", data)

async def run():
    async with websockets.connect(WS_URL) as ws:
        await ws.send(json.dumps({"type": "subscribe", "channels": ["orderbook"]}))
        async for message in ws:
            await handle_message(message)

if __name__ == "__main__":
    asyncio.run(run())
Use reconnect/backoff logic and persist sequence numbers if provided by the exchange.
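A reconnect wrapper sketch around the client above; the "seq" field is an assumption about the exchange's message format.
# ws_reconnect.py (sketch: the "seq" field is an assumption about the exchange's message format)
import asyncio
import json
import os

import websockets

WS_URL = os.getenv("WS_URL")
last_seq = 0

async def run_forever():
    global last_seq
    attempt = 0
    while True:
        try:
            async with websockets.connect(WS_URL) as ws:
                attempt = 0  # reset backoff after a successful connect
                await ws.send(json.dumps({"type": "subscribe", "channels": ["orderbook"]}))
                async for message in ws:
                    data = json.loads(message)
                    seq = data.get("seq")
                    if seq is not None and seq <= last_seq:
                        continue  # drop duplicates delivered after a resubscribe
                    if seq is not None:
                        last_seq = seq
                    # process message...
        except (websockets.ConnectionClosed, OSError):
            await asyncio.sleep(min(30, 0.5 * 2 ** attempt))  # exponential backoff before reconnecting
            attempt += 1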
# webhook.py
import hashlib
import hmac
import os

from fastapi import FastAPI, Request, Header, HTTPException

app = FastAPI()
SHARED_SECRET = os.getenv("WEBHOOK_SECRET", "")

def verify_signature(body: bytes, sig_header: str) -> bool:
    if not sig_header:
        return False  # missing header: reject rather than crash in compare_digest
    expected = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig_header)

@app.post("/webhook")
async def webhook(req: Request, x_signature: str = Header(None)):
    body = await req.body()
    if not verify_signature(body, x_signature):
        raise HTTPException(status_code=403, detail="Invalid signature")
    event = await req.json()
    # enqueue the event for processing
    return {"status": "ok"}
Secure with signature verification and replay protection (timestamps + nonce).
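A replay-protection sketch to pair with the signature check; the five-minute window and in-memory nonce store are assumptions. Pass in whatever timestamp and nonce your webhook sender provides.
# replay_guard.py (sketch: the five-minute window and in-memory nonce store are assumptions; use Redis with a TTL in production)
import time

_seen_nonces = {}  # nonce -> first-seen epoch seconds

def is_replay(timestamp, nonce, window_seconds=300):
    now = time.time()
    try:
        ts = float(timestamp)
    except (TypeError, ValueError):
        return True  # unparsable timestamp: treat as a replay and reject
    if abs(now - ts) > window_seconds:
        return True  # outside the accepted window
    if nonce in _seen_nonces:
        return True  # nonce already used
    _seen_nonces[nonce] = now
    for n, seen in list(_seen_nonces.items()):  # prune expired nonces
        if now - seen > window_seconds:
            del _seen_nonces[n]
    return False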
# async_worker.py
import asyncio
import os
import uuid

import aiohttp

API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")

async def place_order(session, payload):
    headers = {"Authorization": f"Bearer {API_KEY}", "Idempotency-Key": str(uuid.uuid4())}
    timeout = aiohttp.ClientTimeout(total=10)
    async with session.post(f"{API_URL}/orders", json=payload, headers=headers, timeout=timeout) as resp:
        return await resp.json()

async def worker(queue):
    async with aiohttp.ClientSession() as session:
        while True:
            job = await queue.get()
            try:
                await place_order(session, job)
            finally:
                queue.task_done()

async def main():
    q = asyncio.Queue()
    workers = [asyncio.create_task(worker(q)) for _ in range(2)]  # start workers before waiting on the queue
    # push jobs into q...
    await q.join()
    for w in workers:
        w.cancel()

if __name__ == "__main__":
    asyncio.run(main())
Use connection pooling and limit concurrency to respect rate limits.
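One way to do that, as a sketch: a semaphore around each call plus a bounded aiohttp connector. The limit of 5 is illustrative, not an exchange-specific value.
# rate_limit.py (sketch: the limit of 5 is illustrative; tune it to the exchange's documented limits)
import asyncio

import aiohttp

MAX_CONCURRENT = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def limited_post(session, url, **kwargs):
    async with semaphore:  # at most MAX_CONCURRENT requests in flight
        async with session.post(url, **kwargs) as resp:
            return await resp.json()

def make_session():
    # Cap the connection pool so bursts cannot exceed the limit at the TCP level either.
    return aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT))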
# models.py
from sqlalchemy import Column, Integer, String, Numeric, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()

class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    order_id = Column(String, unique=True, index=True)
    client_id = Column(String, index=True)
    symbol = Column(String)
    qty = Column(Numeric)
    price = Column(Numeric)
    status = Column(String)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

class Fill(Base):
    __tablename__ = "fills"
    id = Column(Integer, primary_key=True)
    order_id = Column(String, ForeignKey("orders.order_id"))
    filled_qty = Column(Numeric)
    price = Column(Numeric)
    filled_at = Column(DateTime(timezone=True), server_default=func.now())
Index order_id and client_id for fast lookups during reconciliation.
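A reconciliation query sketch against the models above, assuming a configured SQLAlchemy session: sum fills per order and flag orders whose fills do not match the ordered quantity.
# reconcile.py (sketch: assumes a configured SQLAlchemy session and the models above)
from sqlalchemy import func
from sqlalchemy.orm import Session

from models import Order, Fill

def find_mismatches(session: Session):
    # Sum fills per order, then flag orders whose summed fills differ from the ordered quantity.
    filled = (
        session.query(Fill.order_id, func.sum(Fill.filled_qty).label("filled"))
        .group_by(Fill.order_id)
        .subquery()
    )
    return (
        session.query(Order.order_id, Order.qty, filled.c.filled)
        .outerjoin(filled, Order.order_id == filled.c.order_id)
        .filter(func.coalesce(filled.c.filled, 0) != Order.qty)
        .all()
    )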
# alembic migration snippet (upgrade)
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.create_table(
        'orders',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('order_id', sa.String, nullable=False),
        sa.Column('client_id', sa.String),
        sa.Column('symbol', sa.String),
        sa.Column('qty', sa.Numeric),
    )
Use migrations to keep schema consistent across environments.
# settings.py
from pydantic import BaseSettings, SecretStr
# For Pydantic v2, import BaseSettings from the pydantic-settings package instead.

class Settings(BaseSettings):
    api_url: str
    api_key: SecretStr
    environment: str = "staging"

    class Config:
        env_file = ".env"
Use SecretStr for secrets; load from .env in local dev and secrets manager in production.
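A usage sketch for the Settings class above: SecretStr keeps the key out of logs and repr; unwrap it with get_secret_value() only where the header is built.
# settings_usage.py (sketch built on the Settings class above)
from settings import Settings

settings = Settings()
print(settings.api_key)  # prints a masked SecretStr, safe to log
auth_header = {"Authorization": f"Bearer {settings.api_key.get_secret_value()}"}  # unwrap only where needed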
# metrics.py
from prometheus_client import Counter, Gauge, start_http_server

ORDERS_PLACED = Counter('orders_placed_total', 'Total orders placed')
CURRENT_POS = Gauge('current_position_size', 'Current position size')

def start_metrics_port(port=8000):
    start_http_server(port)
Start the metrics port and instrument critical code paths (orders, fills, latency).
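A latency instrumentation sketch to go with the counters above; the bucket boundaries are illustrative.
# metrics_latency.py (sketch: bucket boundaries are illustrative)
import time

from prometheus_client import Histogram

ORDER_LATENCY = Histogram(
    'order_request_seconds', 'Order placement latency in seconds',
    buckets=(0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)

def timed_place_order(place_order_fn, *args, **kwargs):
    # Record wall-clock latency of an order call, including failures.
    start = time.perf_counter()
    try:
        return place_order_fn(*args, **kwargs)
    finally:
        ORDER_LATENCY.observe(time.perf_counter() - start)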
# tests/test_order_flow.py
def test_place_order_in_sandbox(client):
    resp = client.post("/orders", json={"symbol": "BTC-USD", "qty": 0.01, "type": "limit", "price": 45000})
    assert resp.status_code == 200
    data = resp.json()
    assert "order_id" in data
Use fixtures to inject sandbox credentials and mock external responses where appropriate.
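A fixture sketch for the test above, assuming the FastAPI app path from the Dockerfile (bot.main:app); the sandbox env var names are illustrative.
# tests/conftest.py (sketch: app import path follows the Dockerfile's bot.main:app; env var names are illustrative)
import os

import pytest
from fastapi.testclient import TestClient

@pytest.fixture
def client(monkeypatch):
    monkeypatch.setenv("API_URL", os.getenv("SANDBOX_API_URL", "https://sandbox.example.test"))
    monkeypatch.setenv("API_KEY", os.getenv("SANDBOX_API_KEY", "test-key"))
    from bot.main import app  # import after the env vars are set
    return TestClient(app)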
DevOps templates
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
CMD ["gunicorn","-k","uvicorn.workers.UvicornWorker","bot.main:app","--bind","0.0.0.0:8000","--workers","2"]
Use multi-stage builds or slimmer base images to keep the final image small.
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: pytest -q
Add a job that runs sandbox integrations against testnet where appropriate, gated behind repository secrets.
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 5
Health endpoints should include dependency checks (DB, exchange connectivity, reconciler status).
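A sketch of a /health endpoint with dependency checks; check_db and check_exchange are placeholders for real probes.
# health.py (sketch: check_db and check_exchange are placeholders for real dependency probes)
from fastapi import FastAPI, Response

app = FastAPI()

def check_db() -> bool:
    return True  # e.g. run SELECT 1 against the orders database

def check_exchange() -> bool:
    return True  # e.g. hit the exchange's public time endpoint

@app.get("/health")
def health(response: Response):
    checks = {"db": check_db(), "exchange": check_exchange()}
    healthy = all(checks.values())
    if not healthy:
        response.status_code = 503  # fail the probe when a dependency is down
    return {"status": "ok" if healthy else "degraded", "checks": checks}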
Docs & runbook templates
# Post-mortem: [Title]
- Date/Time:
- Severity:
- Summary:
- Timeline:
- Root cause:
- Impact:
- Mitigation:
- Action items (owner/due):
Keep post-mortems short, factual and focused on corrective actions and measurement.
1. Check the reconciliation dashboard for mismatched orders.
2. Identify the order_id and client_id.
3. Compare expected vs actual fills.
4. If the discrepancy is below the threshold, create an adjustment; otherwise pause trading and escalate.
Embed runbook steps in pager notifications for operators to follow during incidents.
