Build price alerts
Price alerts are deceptively easy to start and surprisingly easy to get wrong. A naive implementation fires repeatedly when the price oscillates around a threshold, fires during low-liquidity ticks, or floods the user when the service restarts. This guide builds a robust one in Python.
What “robust” means
The worker should:
- Fire once per crossing, not once per poll.
- Suppress alerts on stale data (no new tick = no alert).
- Tolerate gateway hiccups without losing state.
- Use the API quota efficiently (batched polling).
Data model
We store one row per active alert and one row per fired alert event:
CREATE TABLE alerts ( id BIGSERIAL PRIMARY KEY, user_id UUID NOT NULL, symbol TEXT NOT NULL, threshold NUMERIC(20, 6) NOT NULL, direction TEXT NOT NULL CHECK (direction IN ('above', 'below')), last_seen_price NUMERIC(20, 6), last_evaluated_at TIMESTAMPTZ, triggered_at TIMESTAMPTZ, cooldown_until TIMESTAMPTZ);
CREATE TABLE alert_events ( id BIGSERIAL PRIMARY KEY, alert_id BIGINT NOT NULL REFERENCES alerts(id), fired_at TIMESTAMPTZ NOT NULL DEFAULT now(), price NUMERIC(20, 6) NOT NULL, quote_timestamp TIMESTAMPTZ NOT NULL, quote_source TEXT);The cooldown column is the deduplication trick: when an alert fires, we set
cooldown_until = now() + interval '15 minutes' and refuse to fire again
until then.
Polling worker
-
Group alerts by symbol so each symbol is fetched once per cycle.
import osfrom collections import defaultdictimport httpximport psycopgfrom datetime import datetime, timezone, timedeltaAPI_KEY = os.environ["ONEAPI_KEY"]POLL_INTERVAL = 60 # secondsMAX_QUOTE_AGE_MIN = 30 # ignore quotes older than thisCOOLDOWN = timedelta(minutes=15)def load_active_alerts(conn):rows = conn.execute("""SELECT id, symbol, threshold, direction, last_seen_price, cooldown_untilFROM alertsWHERE (cooldown_until IS NULL OR cooldown_until < now())""").fetchall()by_symbol = defaultdict(list)for row in rows:by_symbol[row[1]].append(row)return by_symbol -
Batch up to 8 symbols per
/v1/quotecall.def fetch_quotes(symbols: list[str]) -> dict[str, dict]:out: dict[str, dict] = {}for i in range(0, len(symbols), 8):batch = symbols[i:i + 8]r = httpx.get("https://api.oneapi.finance/v1/quote",params={"symbols": ",".join(batch)},headers={"Authorization": f"Bearer {API_KEY}"},timeout=15.0,)r.raise_for_status()out.update(r.json().get("quotes", {}))return out -
Detect crossings.
The trick is to look at the previous observation, not just the current one. We have a crossing if:
- direction is
aboveandprev < threshold <= now, or - direction is
belowandprev > threshold >= now.
def crossed(direction: str, prev: float | None, now: float, threshold: float) -> bool:if prev is None:# First observation. Fire only if condition is currently true.return (direction == "above" and now >= threshold) or \(direction == "below" and now <= threshold)if direction == "above":return prev < threshold <= nowif direction == "below":return prev > threshold >= nowreturn False - direction is
-
Tie it together.
def too_stale(quote: dict) -> bool:ts = datetime.fromisoformat(quote["timestamp"].replace("Z", "+00:00"))age = datetime.now(timezone.utc) - tsreturn age > timedelta(minutes=MAX_QUOTE_AGE_MIN)def evaluate(conn):alerts_by_symbol = load_active_alerts(conn)if not alerts_by_symbol:returnquotes = fetch_quotes(list(alerts_by_symbol.keys()))for symbol, alerts in alerts_by_symbol.items():q = quotes.get(symbol)if not q or q.get("price") is None:continueif too_stale(q):continueprice = q["price"]q_ts = q["timestamp"]q_src = q.get("meta", {}).get("source")for alert_id, _, threshold, direction, prev, _ in alerts:threshold = float(threshold)prev = float(prev) if prev is not None else Noneif crossed(direction, prev, price, threshold):conn.execute("INSERT INTO alert_events (alert_id, price, quote_timestamp, quote_source) ""VALUES (%s, %s, %s, %s)",(alert_id, price, q_ts, q_src),)conn.execute("UPDATE alerts SET triggered_at = now(), ""cooldown_until = now() + INTERVAL '15 minutes', ""last_seen_price = %s, last_evaluated_at = now() WHERE id = %s",(price, alert_id),)send_notification(alert_id, symbol, direction, threshold, price)else:conn.execute("UPDATE alerts SET last_seen_price = %s, last_evaluated_at = now() WHERE id = %s",(price, alert_id),)conn.commit() -
Run on a schedule.
Use a cron, a systemd timer, or a Cloudflare Cron Trigger. Calling
evaluate()every 60 seconds with hundreds of symbols stays well under the Indie tier’s per-minute cap because of batching: 100 symbols requires 13 batched calls.
Why this approach
- Crossings, not levels. Naive “fire when price > threshold” fires every poll. We require a crossing: previous below, current at-or-above. State is persisted so a restart does not re-fire.
- Stale-data suppression. If our upstream is silent for 30+ minutes, the worker abstains rather than firing on stale prices. The user would rather miss an alert than receive a wrong one.
- Cooldown. Volatile names that oscillate around a threshold would fire every minute without it. 15 minutes is a sensible default; expose it as a per-alert setting if your users want.
- Idempotent inserts. The
alert_eventsrow is the source of truth. If the worker dies after inserting it but before sending the notification, the next run can replay fromalert_eventsrather than re-evaluating.
Going further
- Subscribe to
/v1/quote?symbols=...once per minute even when alerts are scarce, so the cache stays warm. - Add percentage-change alerts (
change > 5%since open) by addingpreviousCloseto the comparison. - Use the usage endpoint to back off polling if you are about to exhaust your quota for the month.
See also
/v1/quote— batched quote endpoint.- Rate limits — quota math.
- Source attribution —
meta.sourcefor monitoring.