
Build price alerts

Price alerts are deceptively easy to start and surprisingly easy to get wrong. A naive implementation fires repeatedly when the price oscillates around a threshold, fires during low-liquidity ticks, or floods the user when the service restarts. This guide builds a robust one in Python.

What “robust” means

The worker should:

  1. Fire once per crossing, not once per poll.
  2. Suppress alerts on stale data (no new tick = no alert).
  3. Tolerate gateway hiccups without losing state.
  4. Use the API quota efficiently (batched polling).

Data model

We store one row per active alert and one row per fired alert event:

CREATE TABLE alerts (
    id BIGSERIAL PRIMARY KEY,
    user_id UUID NOT NULL,
    symbol TEXT NOT NULL,
    threshold NUMERIC(20, 6) NOT NULL,
    direction TEXT NOT NULL CHECK (direction IN ('above', 'below')),
    last_seen_price NUMERIC(20, 6),
    last_evaluated_at TIMESTAMPTZ,
    triggered_at TIMESTAMPTZ,
    cooldown_until TIMESTAMPTZ
);

CREATE TABLE alert_events (
    id BIGSERIAL PRIMARY KEY,
    alert_id BIGINT NOT NULL REFERENCES alerts(id),
    fired_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    price NUMERIC(20, 6) NOT NULL,
    quote_timestamp TIMESTAMPTZ NOT NULL,
    quote_source TEXT
);

The cooldown column is the deduplication trick: when an alert fires, we set cooldown_until = now() + interval '15 minutes' and refuse to fire again until then.

Polling worker

  1. Group alerts by symbol so each symbol is fetched once per cycle.

    import os
    from collections import defaultdict
    from datetime import datetime, timedelta, timezone

    import httpx
    import psycopg

    API_KEY = os.environ["ONEAPI_KEY"]
    POLL_INTERVAL = 60  # seconds
    MAX_QUOTE_AGE_MIN = 30  # ignore quotes older than this
    COOLDOWN = timedelta(minutes=15)

    def load_active_alerts(conn):
        rows = conn.execute("""
            SELECT id, symbol, threshold, direction, last_seen_price, cooldown_until
            FROM alerts
            WHERE (cooldown_until IS NULL OR cooldown_until < now())
        """).fetchall()
        by_symbol = defaultdict(list)
        for row in rows:
            by_symbol[row[1]].append(row)
        return by_symbol
  2. Batch up to 8 symbols per /v1/quote call.

    def fetch_quotes(symbols: list[str]) -> dict[str, dict]:
        out: dict[str, dict] = {}
        for i in range(0, len(symbols), 8):
            batch = symbols[i:i + 8]
            r = httpx.get(
                "https://api.oneapi.finance/v1/quote",
                params={"symbols": ",".join(batch)},
                headers={"Authorization": f"Bearer {API_KEY}"},
                timeout=15.0,
            )
            r.raise_for_status()
            out.update(r.json().get("quotes", {}))
        return out
  3. Detect crossings.

    The trick is to look at the previous observation, not just the current one. We have a crossing if:

    • direction is above and prev < threshold <= now, or
    • direction is below and prev > threshold >= now.

    def crossed(direction: str, prev: float | None, now: float, threshold: float) -> bool:
        if prev is None:
            # First observation. Fire only if the condition is currently true.
            return (direction == "above" and now >= threshold) or \
                   (direction == "below" and now <= threshold)
        if direction == "above":
            return prev < threshold <= now
        if direction == "below":
            return prev > threshold >= now
        return False
  4. Tie it together.

    def too_stale(quote: dict) -> bool:
        ts = datetime.fromisoformat(quote["timestamp"].replace("Z", "+00:00"))
        age = datetime.now(timezone.utc) - ts
        return age > timedelta(minutes=MAX_QUOTE_AGE_MIN)

    def evaluate(conn):
        alerts_by_symbol = load_active_alerts(conn)
        if not alerts_by_symbol:
            return
        quotes = fetch_quotes(list(alerts_by_symbol.keys()))
        for symbol, alerts in alerts_by_symbol.items():
            q = quotes.get(symbol)
            if not q or q.get("price") is None:
                continue
            if too_stale(q):
                continue
            price = q["price"]
            q_ts = q["timestamp"]
            q_src = q.get("meta", {}).get("source")
            # Row shape matches load_active_alerts:
            # (id, symbol, threshold, direction, last_seen_price, cooldown_until)
            for alert_id, _, threshold, direction, prev, _ in alerts:
                threshold = float(threshold)
                prev = float(prev) if prev is not None else None
                if crossed(direction, prev, price, threshold):
                    conn.execute(
                        "INSERT INTO alert_events (alert_id, price, quote_timestamp, quote_source) "
                        "VALUES (%s, %s, %s, %s)",
                        (alert_id, price, q_ts, q_src),
                    )
                    conn.execute(
                        "UPDATE alerts SET triggered_at = now(), "
                        "cooldown_until = now() + INTERVAL '15 minutes', "
                        "last_seen_price = %s, last_evaluated_at = now() WHERE id = %s",
                        (price, alert_id),
                    )
                    # send_notification is application-specific (email, push, webhook, ...).
                    send_notification(alert_id, symbol, direction, threshold, price)
                else:
                    conn.execute(
                        "UPDATE alerts SET last_seen_price = %s, last_evaluated_at = now() WHERE id = %s",
                        (price, alert_id),
                    )
        conn.commit()
  5. Run on a schedule.

    Use a cron, a systemd timer, or a Cloudflare Cron Trigger. Calling evaluate() every 60 seconds with hundreds of symbols stays well under the Indie tier’s per-minute cap because of batching: 100 symbols requires 13 batched calls.

Why this approach

  • Crossings, not levels. Naive “fire when price > threshold” fires every poll. We require a crossing: previous below, current at-or-above. State is persisted so a restart does not re-fire.
  • Stale-data suppression. If our upstream is silent for 30+ minutes, the worker abstains rather than firing on stale prices. The user would rather miss an alert than receive a wrong one.
  • Cooldown. Without it, volatile names that oscillate around a threshold would fire every minute. 15 minutes is a sensible default; expose it as a per-alert setting if your users ask for it.
  • Idempotent inserts. The alert_events row is the source of truth. If the worker dies after inserting it but before sending the notification, the next run can replay from alert_events rather than re-evaluating.

Going further

  • Subscribe to /v1/quote?symbols=... once per minute even when alerts are scarce, so the cache stays warm.
  • Add percentage-change alerts (change > 5% since open) by adding previousClose to the comparison.
  • Use the usage endpoint to back off polling if you are about to exhaust your quota for the month.

See also