Alert Ingestion Design¶
This document covers the full design of the alert ingestion subsystem: how external astronomical event streams get into OpenAstro and are normalized, deduplicated, prioritized, and translated into actionable targets for the network. It builds on the high-level mention in Details on componets.md and fills in the specific format, parsing, and operational details for each source.
1. What Alert Ingestion Must Do¶
External alert streams are the primary mechanism by which OpenAstro learns about time-critical targets it didn't already know about. The ingestion system must:
- Pull or receive alerts from each source on the appropriate cadence
- Normalize the heterogeneous formats into a single internal schema
- Deduplicate — the same event often appears across multiple brokers (a single ZTF transient may appear in ZTF, TNS, and Gaia Alerts)
- Prioritize — assign a meaningful priority score based on alert type and science value
- Create or update targets in the database
- Trigger scheduler re-evaluation so urgent targets propagate to client sites quickly
- Log everything — the raw payload is always saved regardless of what we do with it
What It Must Not Do¶
- Make irrevocable decisions about target creation without bounds checking (don't create targets at dec < -90°)
- Block the main FastAPI application thread
- Introduce circular alerts (we shouldn't re-ingest our own submitted data)
- Hammer source APIs with excessive requests (respect rate limits)
2. Source Inventory¶
2.1 ZTF (Zwicky Transient Facility)¶
What it is: Wide-field survey covering the northern sky every ~2 nights at g/r/i bands. Real-time alert stream for all sources deviating from baseline. ~1 million alerts per night; we only care about a tiny fraction.
Alert format: AVRO packets distributed via Apache Kafka stream. Each packet contains:
- Alert ID (ZTF format: ZTF{year}aaaaaaaa)
- RA, Dec (degrees)
- JD of detection
- Photometric measurements (PSF magnitudes, errors) for current and last 2 detections
- Source metadata: real-bogus score, star-galaxy score
- xmatch section: cross-matches against PS1, 2MASS, Gaia DR2
Access methods:
- Full Kafka stream: Requires subscription through ALeRCE, ANTARES, Lasair, or Fink broker. These are filter brokers that pre-classify alerts. Recommended: use one of these rather than raw Kafka.
- Lasair API: REST JSON, filtered queries. Query format: https://lasair-ztf.lsst.ac.uk/api/streams/my_filter/ — requires account, free.
- ALeRCE API: https://alerce.online/api/v1/objects/{oid} — returns complete object history. Query new alerts: https://alerce.online/api/v1/objects?lastmjd__gte={mjd}.
Recommended approach for OpenAstro: Use the ALeRCE query API with a polling interval of 5 minutes. Query for alerts classified as:
- classifier=lc_classifier&class=SN (supernovae — possible transient follow-up)
- classifier=lc_classifier&class=AGN (blazars — if we want blazar monitoring)
- classifier=stamp_classifier&class=real&rb__gte=0.9 (high real-bogus score)
- Filter by apparent magnitude: magpsf__lte=18 (bright enough for amateur gear)
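The recommended poll can be sketched as a small helper. The endpoint and parameter names come from this document; the response envelope (`items`) and the `poll_alerce` wrapper are assumptions, not confirmed ALeRCE behavior:

```python
ALERCE_URL = "https://alerce.online/api/v1/objects"  # endpoint as given above

def build_alerce_params(last_mjd: float, max_mag: float = 18.0) -> dict:
    """Query parameters per the list above: recent, classified, bright alerts."""
    return {
        "lastmjd__gte": last_mjd,       # only alerts newer than our last poll
        "classifier": "lc_classifier",
        "class": "SN",
        "magpsf__lte": max_mag,         # bright enough for amateur gear
    }

def poll_alerce(last_mjd: float) -> list[dict]:
    import requests  # imported lazily so the pure helper stays testable offline
    resp = requests.get(ALERCE_URL, params=build_alerce_params(last_mjd), timeout=30)
    resp.raise_for_status()
    return resp.json().get("items", [])  # response envelope is an assumption
```

Each class of interest (SN, AGN, stamp-classifier) would get its own parameter set built the same way.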
Parsing ZTF alerts:
def parse_ztf_alert(alerce_object: dict) -> NormalizedAlert:
    return NormalizedAlert(
        source='ztf',
        external_id=alerce_object['oid'],
        ra_deg=alerce_object['meanra'],
        dec_deg=alerce_object['meandec'],
        position_error_deg=0.0003,  # ZTF astrometry ~1 arcsec
        alert_type=classify_ztf(alerce_object),
        magnitude=alerce_object['lastmagnr'],
        magnitude_band='r',
        discovery_time=mjd_to_utc(alerce_object['firstmjd']),
        last_detection=mjd_to_utc(alerce_object['lastmjd']),
        priority=compute_ztf_priority(alerce_object),
        raw_payload=alerce_object
    )

def classify_ztf(obj: dict) -> str:
    classifications = obj.get('classifications', {})
    lc_class = classifications.get('lc_classifier', {}).get('class')
    if lc_class == 'SN': return 'supernova'
    if lc_class == 'AGN': return 'agn'
    return 'transient'

def compute_ztf_priority(obj: dict) -> float:
    priority = 40  # Base for ZTF transients
    mag = obj.get('lastmagnr', 20)
    if mag < 15: priority += 20   # Bright — excellent for amateurs
    elif mag < 17: priority += 10
    # Rapid rise: compare first vs last detection
    if obj.get('deltamjd', 0) < 3 and obj.get('magdiff', 0) > 1.0:
        priority += 25  # Rising fast — interesting
    return min(priority, 90)
Deduplication: ZTF alert oid (object ID) is stable across detections. Store in alerts.external_id. If we already have a target with source='ztf' AND external_id='{oid}', update the target's last-seen time and magnitude rather than creating a new one.
2.2 Gaia Alerts¶
What it is: Science Alerts system from the Gaia mission. Detects transients that deviate from Gaia's own baseline catalog. Primarily: supernovae, AGN flares, microlensing events, cataclysmic variables, YSO outbursts.
Alert format: Simple HTML/CSV table at http://gsaweb.ast.cam.ac.uk/alerts/alertsindex. Also provides an RSS feed and individual alert pages. Each alert has:
- Gaia Alert name: Gaia{YY}{lowercase letter sequence}, e.g. Gaia26abc
- RA, Dec (degrees)
- Trigger date (UTC)
- Peak magnitude (Gaia G-band)
- Classification string (free text, e.g. "probable SN", "microlensing event", "AGN variability")
- Light curve data (simple table format)
Access method: Poll the alertsindex page. No official API; parse the HTML table. Gaia also provides a machine-readable format at the same URL with ?format=csv. Use the CSV endpoint.
CSV columns: #Name,Date,JD,AlertMag,HistoricMag,HistoricStd,Class,Published,Comment
Parsing Gaia Alerts:
import requests
import csv
import io
from datetime import datetime, timezone

GAIA_ALERTS_URL = "http://gsaweb.ast.cam.ac.uk/alerts/alertsindex?format=csv"

def fetch_gaia_alerts(since_jd: float) -> list[dict]:
    resp = requests.get(GAIA_ALERTS_URL, timeout=30)
    resp.raise_for_status()
    reader = csv.DictReader(io.StringIO(resp.text), skipinitialspace=True)
    alerts = []
    for row in reader:
        try:
            jd = float(row['JD'])
        except (ValueError, KeyError):
            continue
        if jd < since_jd:
            continue
        alerts.append(row)
    return alerts
def parse_gaia_alert(row: dict) -> NormalizedAlert:
    # Gaia provides RA/Dec on individual alert page, not in the CSV index
    # Fetch individual alert page if we don't have it cached
    ra, dec = fetch_gaia_coordinates(row['#Name'])
    alert_type = classify_gaia(row.get('Class', ''))
    priority = compute_gaia_priority(row, alert_type)
    return NormalizedAlert(
        source='gaia',
        external_id=row['#Name'],
        ra_deg=ra,
        dec_deg=dec,
        position_error_deg=0.0001,  # Gaia astrometry is excellent
        alert_type=alert_type,
        magnitude=float(row.get('AlertMag', 20)),
        magnitude_band='G',
        discovery_time=jd_to_utc(float(row['JD'])),
        priority=priority,
        raw_payload=dict(row)
    )

def classify_gaia(class_str: str) -> str:
    s = class_str.lower()
    if 'microlensing' in s: return 'microlensing_event'
    if 'sn' in s or 'supernova' in s: return 'supernova'
    if 'cv' in s or 'cataclysmic' in s: return 'cv'
    if 'agn' in s: return 'agn'
    if 'yso' in s: return 'yso'
    return 'transient'
Cadence: Poll every 5 minutes. Gaia Alerts publishes ~1–10 alerts per day, so frequent polling wastes little. The since_jd parameter prevents reprocessing old alerts.
Special case — microlensing: Gaia microlensing events are high priority for OpenAstro (see science_cases_and_scheduler.md). When we detect an alert classified as microlensing, immediately set priority = 75 and create the target with requires_simultaneous = false but with multi-longitude coverage encouraged via cadence settings.
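The microlensing rule can be sketched as a small policy function; the `cadence_minutes` value is illustrative, standing in for whatever cadence setting encourages multi-longitude coverage:

```python
def apply_microlensing_policy(alert_type: str, priority: float) -> dict:
    """Target settings per the special-case rule above."""
    settings = {
        "priority": priority,
        "requires_simultaneous": False,
        "cadence_minutes": None,
    }
    if alert_type == "microlensing_event":
        settings["priority"] = 75         # immediate boost per the rule above
        settings["cadence_minutes"] = 60  # illustrative: frequent coverage spreads across longitudes
    return settings
```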
2.3 GCN (Gamma-ray Coordinates Network)¶
What it is: The gold standard for GRB and high-energy transient alerts. Distributes notices from Fermi, Swift, INTEGRAL, IceCube, LIGO/Virgo, and others. Critical for GRB afterglow follow-up.
Alert formats: GCN uses two distinct distribution systems:

1. GCN Classic: the old system. Plain-text notices over a UDP socket; format varies by instrument. Being replaced.
2. GCN Kafka: the new system (launched 2023). AVRO/JSON packets over Kafka; machine-readable. Preferred.
Access method: GCN Classic required a VOEvent socket subscription. The new GCN Kafka system is accessed via the gcn-kafka Python client. Register at https://gcn.nasa.gov for client credentials.
from gcn_kafka import Consumer

consumer = Consumer(
    client_id='your_client_id',
    client_secret='your_client_secret'
)
consumer.subscribe(['gcn.classic.text.FERMI_GBM_GND_POS',
                    'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK',
                    'gcn.classic.text.FERMI_GBM_FIN_POS',
                    'gcn.notices.swift.bat.guano'])
GRB Notice types to subscribe to (in priority order):
| Topic | Source | What it provides | Latency |
|---|---|---|---|
| SWIFT_BAT_GRB_POS_ACK | Swift/BAT | Position, rough error box | ~20 s post-trigger |
| FERMI_GBM_GND_POS | Fermi/GBM | Ground-computed position | ~15 min |
| FERMI_GBM_FIN_POS | Fermi/GBM | Final best position | ~15 min |
| SWIFT_XRT_POSITION | Swift/XRT | Refined position, <5 arcsec | ~100 s post-trigger |
| SWIFT_UVOT_POSITION | Swift/UVOT | Optical counterpart found | Minutes to hours |
Parsing GCN alerts:
import json
from datetime import datetime, timezone

def process_gcn_message(topic: str, message: bytes) -> NormalizedAlert | None:
    # GCN Kafka sends JSON
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        # Fall back to text parsing for legacy format
        payload = parse_gcn_text(message.decode('utf-8'))
    if payload is None:
        return None
    # Extract common fields
    alert_type = infer_gcn_type(topic, payload)
    if alert_type is None:
        return None  # Not something we act on
    ra = extract_ra(payload, topic)
    dec = extract_dec(payload, topic)
    error_deg = extract_error(payload, topic)  # in degrees
    if ra is None or dec is None:
        return None  # No position yet; wait for refined notice
    # Compute priority based on type and localization quality
    priority = 95 if alert_type == 'grb' else 80
    if error_deg > 5.0:
        priority -= 20  # Large error box, harder to follow up
    return NormalizedAlert(
        source='gcn',
        external_id=payload.get('trigger_id') or payload.get('trigcat_id'),
        ra_deg=ra,
        dec_deg=dec,
        position_error_deg=error_deg,
        alert_type=alert_type,
        magnitude=None,  # Unknown at trigger time
        trigger_time=parse_gcn_time(payload),
        priority=priority,
        raw_payload=payload
    )

def extract_ra(payload: dict, topic: str) -> float | None:
    # GCN JSON format (new Kafka-based notices)
    if 'ra' in payload: return float(payload['ra'])
    # Text format key variants
    for key in ['RA', 'Ra', 'RIGHT_ASCENSION']:
        if key in payload: return float(payload[key])
    return None
GCN operational considerations:
- The GCN Kafka consumer should run in a dedicated background process, not embedded in the FastAPI worker. GCN notices must be processed within seconds, not waiting for a poll cycle.
- On startup, replay messages from the last 2 hours to catch anything missed during downtime.
- GCN triggers often come in rapid succession: initial rough position, then refined XRT position minutes later. The system must update an existing target's coordinates when a more precise position arrives, not create a duplicate.
- For large error boxes (Fermi/GBM, error > 5°): create a target at the centroid but annotate it with position_error_deg. The scheduler will present this to sites but the observation plan should instruct wide-field imaging to search the error box.
- LIGO/Virgo alerts (gravitational waves): These have enormous error boxes (10–1000 sq deg). Create a special kilonova_candidate campaign when a LIGO alert arrives, with multiple tiling targets covering the highest-probability sky region. This requires a separate tiling strategy (not implemented at MVP; document as future work).
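The update-not-duplicate rule for refined positions can be sketched as follows (field names are illustrative):

```python
def refine_gcn_target(target: dict, notice: dict) -> dict:
    """Apply a refined GCN position to an existing target.

    Coordinates are overwritten only when the new notice's error box is
    tighter than what we already hold, so an early rough notice arriving
    out of order cannot degrade an XRT-quality position.
    """
    if notice["position_error_deg"] < target["position_error_deg"]:
        target = dict(target)
        target.update(
            ra_deg=notice["ra_deg"],
            dec_deg=notice["dec_deg"],
            position_error_deg=notice["position_error_deg"],
        )
    return target
```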
GCN real-time latency considerations: The 30-second heartbeat abort mechanism (designed specifically for this case) should propagate a GRB alert to all sites within ~30 seconds. This is acceptable — most GRB afterglows are detectable for hours. The truly critical window (first minutes) will be missed by sites mid-observation anyway; the abort signal gets them observing within ~90 seconds (30s for heartbeat + 60s slew time). That's good enough for well-characterized afterglows.
2.4 TNS (Transient Name Server)¶
What it is: The IAU-sanctioned registry for transient astronomical events. All newly discovered transients that receive an IAU name (AT, SN, etc.) are registered here. TNS is the canonical deduplication source — if something is in TNS, it has an official name.
Alert format: REST JSON API. Well-documented at https://www.wis-tns.org/api/. Requires registration and an API key (free for research purposes).
Key endpoints:
GET https://www.wis-tns.org/api/get/search
    ?ra={ra}&dec={dec}&radius={arcsec}        # Cone search
    &format=json&api_key={key}

GET https://www.wis-tns.org/api/get/object
    ?objname={name}&format=json&api_key={key}  # Full object info
Search for new objects:
GET https://www.wis-tns.org/api/get/search
    ?format=json
    &api_key={key}
    &discovered_period_value=1
    &discovered_period_units=days
    &classified_sne=1
    &unclassified_at=1
    &page=0
    &num_page=50
Parsing TNS:
def parse_tns_object(obj: dict) -> NormalizedAlert:
    # TNS provides very detailed info after classification
    obj_type = obj.get('type', {}).get('name', 'unknown')
    alert_type = classify_tns_type(obj_type)
    # TNS has RA/Dec as sexagesimal strings
    ra = sexag_to_deg(obj.get('ra', '00:00:00'))
    dec = sexag_to_dec(obj.get('declination', '+00:00:00'))
    # Latest photometry
    photometry = obj.get('photometry', [])
    magnitude = None
    if photometry:
        latest = sorted(photometry, key=lambda p: p.get('jd', 0))[-1]
        magnitude = latest.get('flux')
    return NormalizedAlert(
        source='tns',
        external_id=obj.get('objname'),  # e.g. 'SN 2026abc'
        ra_deg=ra,
        dec_deg=dec,
        position_error_deg=0.0001,
        alert_type=alert_type,
        magnitude=magnitude,
        discovery_time=parse_tns_date(obj.get('discoverydate')),
        priority=compute_tns_priority(obj),
        raw_payload=obj
    )

def classify_tns_type(type_str: str) -> str:
    t = type_str.lower()
    if 'sn ia' in t: return 'supernova_ia'
    if 'sn ii' in t: return 'supernova_ii'
    if 'sn ib' in t or 'sn ic' in t: return 'supernova_ibc'
    if 'at' == t or 'transient' in t: return 'transient'
    if 'cv' in t: return 'cv'
    return 'transient'
Cadence: Poll every 5 minutes for newly classified objects. TNS updates several times per day during busy alert periods. Their rate limit is 100 requests/hour; 5-minute polling is 12 requests/hour, well within limits.
TNS as canonical deduplication layer: When we receive a ZTF or Gaia alert, we should cross-match against TNS to see if it already has an IAU name. If yes, use the TNS name as the canonical identifier and set external_id = tns_name. This ensures we don't have duplicate targets for the same object under different broker IDs.
def tns_crossmatch(ra: float, dec: float, search_radius_arcsec: float = 5.0) -> str | None:
    """Returns TNS name if the position matches a known TNS object."""
    # Search TNS API with cone search
    ...
    return tns_name_or_none
2.5 MPC (Minor Planet Center)¶
What it is: The IAU-sanctioned repository for all small body orbital data — asteroids, comets, TNOs, NEOs. Critical for:
- Occultation predictions (know where the asteroid will be)
- NEO follow-up targets (newly discovered, need rapid astrometry)
- Comet activity monitoring
Alert types we care about:

1. MPC NEOCP (Near-Earth Object Confirmation Page): newly discovered objects needing follow-up astrometry within hours to days before they fade from view or leave the detection window. High priority.
2. Occultation predictions: not from MPC directly; derived from MPC orbital elements by Occult4 software or online services like Occult.work or Lucky Star. MPC provides the orbital elements; we apply them.
NEOCP access:
GET https://www.minorplanetcenter.net/iau/NEO/toconfirm_tabular.html
# Machine-readable version:
GET https://www.minorplanetcenter.net/cgi-bin/confirmObs.cgi?output=json
Response: list of unconfirmed NEO candidates with estimated RA/Dec, magnitude, and score field (0–100, higher = more likely real NEO).
Parsing NEOCP:
def parse_neocp_object(obj: dict) -> NormalizedAlert:
    return NormalizedAlert(
        source='mpc',
        external_id=obj['designator'],  # e.g. 'P10Xabc'
        ra_deg=float(obj['ra']),        # already decimal degrees
        dec_deg=float(obj['dec']),
        position_error_deg=0.01,        # NEOCP positions can be rough
        alert_type='neo_candidate',
        magnitude=float(obj.get('V', 20)),
        priority=compute_neocp_priority(obj),
        expires_at=utc_now() + timedelta(days=3),  # NEOCP objects need obs within days
        raw_payload=obj
    )

def compute_neocp_priority(obj: dict) -> float:
    priority = 50
    score = int(obj.get('score', 50))
    if score > 80: priority += 20  # High confidence real object
    mag = float(obj.get('V', 20))
    if mag < 18: priority += 15    # Bright enough for smaller apertures
    # Urgency: objects disappear from NEOCP quickly
    priority += 10  # All NEOCP objects get baseline urgency boost
    return min(priority, 85)
Occultation predictions: Separate integration. Services to pull from:
- Occult.work (Cees Bassa / community): REST API providing predicted occultations per date and geographic location. Returns star coordinates, predicted shadow path, event time.
- Lucky Star (Paris Observatory): Similar predictions for TNO occultations. CSV format accessible at https://lesia.obspm.fr/lucky-star/occ.php?nb=10&sort=date.
Occultation ingestion flow:
1. Pull predictions for the next 7 days
2. For each predicted event, compute which active OpenAstro sites lie within ±500 km of the shadow path centerline
3. If ≥ 1 active site is within the shadow path: create an occultation_star target with high priority, set event_time to predicted event UTC, set requires_simultaneous = true, set requires_gps_timing = true
4. Trigger scheduler to pre-assign this target to sites along the path
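Step 2's proximity check can be approximated by sampling the shadow-path centerline and taking each site's minimum great-circle distance to the samples. The names and data shapes here are illustrative:

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_KM = 6371.0

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two lat/lon points, in km."""
    p1, p2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(p1) * cos(p2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

def sites_in_shadow(sites: list[tuple], centerline: list[tuple],
                    max_km: float = 500.0) -> list[str]:
    """Return names of sites within max_km of any sampled centerline point.

    `sites` is [(name, lat, lon)]; `centerline` is [(lat, lon)] samples
    along the predicted shadow path.
    """
    hits = []
    for name, lat, lon in sites:
        d = min(haversine_km(lat, lon, clat, clon) for clat, clon in centerline)
        if d <= max_km:
            hits.append(name)
    return hits
```

Sampling the centerline every few hundred km is sufficient for a ±500 km yes/no test; a production version would also account for the path's width and the event's altitude constraints at each site.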
LUCKY_STAR_URL = "https://lesia.obspm.fr/lucky-star/occ.php"

def fetch_occultation_predictions(days_ahead: int = 7) -> list[dict]:
    resp = requests.get(LUCKY_STAR_URL, params={
        'nb': 100,
        'sort': 'date',
        'filter': 'next7days'
    }, timeout=30)
    # Parse response...
Cadence: NEOCP every 30 minutes (objects can come and go quickly). Occultation predictions once per day (they're computed well in advance).
3. Normalization Schema¶
All five sources produce different formats. They all get normalized into a single NormalizedAlert structure before writing to the database.
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, Any

class NormalizedAlert(BaseModel):
    source: str                         # 'ztf','gaia','gcn','tns','mpc'
    external_id: str                    # source's internal ID
    ra_deg: float                       # ICRS J2000 decimal degrees
    dec_deg: float
    position_error_deg: float = 0.001   # 1σ positional uncertainty
    alert_type: str                     # normalized classification
    magnitude: Optional[float] = None
    magnitude_band: Optional[str] = None    # 'V','R','r','G' etc.
    trigger_time: Optional[datetime] = None     # when event was detected
    discovery_time: Optional[datetime] = None   # when first light was seen
    last_detection: Optional[datetime] = None   # most recent detection (set by ZTF parsing)
    priority: float = 50                # 0–100
    expires_at: Optional[datetime] = None   # when to remove from active targets
    raw_payload: dict[str, Any]         # always store the original
Alert Type Vocabulary¶
Standardized alert_type values (used for scheduling logic):
| alert_type | Science case | Base priority |
|---|---|---|
| grb | GRB afterglow | 90 |
| grb_afterglow | GRB with optical counterpart | 85 |
| kilonova_candidate | GW counterpart | 80 |
| microlensing_event | Microlensing | 75 |
| supernova | Supernova monitoring | 60 |
| supernova_ia | Type Ia SN | 65 |
| transient | Unknown type transient | 50 |
| neo_candidate | NEOCP object | 60 |
| occultation_star | Stellar occultation | 85 (when within 24h) |
| cv | Cataclysmic variable | 55 |
| agn | AGN/blazar flare | 45 |
| yso | Young stellar object | 40 |
4. Deduplication Logic¶
The same astronomical event routinely appears in multiple streams. A single supernova will be discovered by ZTF, registered on TNS, and picked up by Gaia Alerts. Without deduplication, we'd create 3 targets for the same object.
Deduplication Algorithm¶
Step 1: Source-internal ID check
Query alerts table for existing record with same (source, external_id). If found, update the existing record rather than creating new. This handles reprocessing and repeat ingestion.
Step 2: Position cross-match

For each new alert, run a cone search against existing active targets:
SELECT target_id, name, source, external_id,
       earth_distance(ll_to_earth(dec_deg, ra_deg),
                      ll_to_earth($1, $2)) AS distance_m  -- earth_distance returns meters
FROM targets
WHERE active = true
  AND earth_box(ll_to_earth($1, $2), $3) @> ll_to_earth(dec_deg, ra_deg)
ORDER BY distance_m
LIMIT 5;
-- $1 = new_dec, $2 = new_ra, $3 = search_radius_meters
If a match exists within the search radius AND the existing target has the same broad alert type, it's the same event. Update the existing target rather than create a new one.
Search radii by source:
- GCN initial: 3° (rough Fermi localizations)
- GCN XRT refined: 30 arcsec (arcsec-precision Swift/XRT positions)
- ZTF: 5 arcsec
- Gaia: 2 arcsec
- TNS: 5 arcsec
- MPC NEOCP: 1 arcmin (NEOs move; some tolerance needed)
Step 3: TNS cross-match for canonical naming
After deduplication, check TNS for any previously registered name. If TNS has a name, adopt it as the canonical targets.name and store the TNS name in targets.aliases.
Deduplication Precedence¶
When merging alerts from multiple sources, use this priority for which source "wins" on conflicting fields:
| Field | Precedence |
|---|---|
| Coordinates | XRT > UVOT > Gaia > ZTF > TNS > Fermi GBM |
| Name | TNS official name > source-internal ID |
| Classification | TNS (spectroscopic) > ZTF/ALeRCE (photometric) > GCN (trigger type) |
| Magnitude | Most recent detection from any source |
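The coordinate precedence row can be sketched as a merge helper. The source labels here are illustrative normalizations of the instrument names in the table:

```python
# Highest precedence first, per the coordinates row of the table above
COORD_PRECEDENCE = ["swift_xrt", "swift_uvot", "gaia", "ztf", "tns", "fermi_gbm"]

def merge_coordinates(observations: list[dict]) -> dict:
    """Pick coordinates from the highest-precedence source present.

    Each observation: {'source': ..., 'ra_deg': ..., 'dec_deg': ...}.
    Unknown sources rank below all listed ones.
    """
    rank = {s: i for i, s in enumerate(COORD_PRECEDENCE)}
    best = min(observations,
               key=lambda o: rank.get(o["source"], len(COORD_PRECEDENCE)))
    return {"ra_deg": best["ra_deg"], "dec_deg": best["dec_deg"],
            "coord_source": best["source"]}
```

The same ranking pattern extends to the name and classification rows, each with its own precedence list.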
5. Target Creation Rules¶
Not every alert becomes an OpenAstro target. Filters applied before creating a target:
Hard filters (reject if any are true):
- dec_deg < -60: Too far south for most of our northern volunteer network (adjust as network grows)
- magnitude > 20: Too faint for amateur gear (even large amateur scopes struggle past V~18-19)
- position_error_deg > 15: Localization too poor to follow up (waiting for refinement notice)
Soft filters (create target but with low priority):
- magnitude > 17: Requires large aperture (≥ 14"), set min_aperture_mm = 350
- alert_type == 'agn' unless magnitude has brightened significantly from baseline
For GRB targets specifically: Create immediately even without a precise position if position_error_deg < 5. Wide-field observers can start searching. Once XRT position arrives, update the target and refine.
Target expiry:
- GRB afterglow targets: expire 7 days after trigger (afterglow fades but is still detectable)
- Supernova: expire 180 days after discovery
- NEO candidate: expire 3 days (either confirmed or lost)
- Microlensing: expire 30 days after peak (lensing events last weeks)
- Occultation star: expire 1 hour after event time
6. Priority Computation¶
The alert ingestion service sets an initial priority when creating/updating a target. The scheduler then applies dynamic boosts on top of this. The ingestion service handles the "base" priority.
Priority Formula by Type¶
def compute_ingested_priority(alert: NormalizedAlert) -> float:
    base = {
        'grb': 90,
        'kilonova_candidate': 80,
        'microlensing_event': 70,
        'supernova': 55,
        'supernova_ia': 60,
        'neo_candidate': 60,
        'occultation_star': 70,  # scheduler will boost to 95 within 1 hour of event
        'transient': 50,
        'cv': 50,
        'agn': 40,
    }.get(alert.alert_type, 45)
    # Magnitude bonus (brighter = more accessible to network)
    if alert.magnitude is not None:
        if alert.magnitude < 13: base += 15
        elif alert.magnitude < 16: base += 10
        elif alert.magnitude < 18: base += 5
        elif alert.magnitude > 19: base -= 15
    # Position quality penalty
    if alert.position_error_deg > 1.0: base -= 20
    elif alert.position_error_deg > 0.1: base -= 10
    # Source credibility
    source_credibility = {
        'gcn': 1.0,   # NASA-operated, very reliable
        'tns': 1.0,   # IAU-sanctioned
        'gaia': 0.95,
        'ztf': 0.9,
        'mpc': 0.95,
    }.get(alert.source, 0.8)
    return min(base * source_credibility, 100)
7. Service Architecture¶
7.1 At MVP¶
The alert ingestion service runs as scheduled background tasks within the FastAPI application process, using APScheduler:
FastAPI process
├── Gunicorn workers (2x) — handle HTTP requests
└── APScheduler main process
├── job: poll_ztf() every 5 min
├── job: poll_gaia() every 5 min
├── job: poll_tns() every 5 min
├── job: poll_neocp() every 30 min
├── job: poll_occultations() every 6 hours
└── job: gcn_consumer() continuous (separate thread)
GCN is special: it's a continuous Kafka consumer, not a periodic poll. Run it as a separate thread in APScheduler with a persistent loop.
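A minimal wiring sketch for the tree above, assuming APScheduler 3.x's `BackgroundScheduler` and hypothetical job callables supplied by the ingestion module:

```python
# (callable name, interval in minutes), mirroring the process tree above
POLL_JOBS = [
    ("poll_ztf", 5),
    ("poll_gaia", 5),
    ("poll_tns", 5),
    ("poll_neocp", 30),
    ("poll_occultations", 360),  # every 6 hours
]

def start_ingestion_scheduler(jobs: dict):
    """Register polling jobs; `jobs` maps job name -> callable (hypothetical)."""
    import threading
    from apscheduler.schedulers.background import BackgroundScheduler  # lazy import

    sched = BackgroundScheduler(timezone="UTC")
    for name, minutes in POLL_JOBS:
        sched.add_job(jobs[name], "interval", minutes=minutes, id=name)
    # GCN is a continuous Kafka consumer, not a poll: dedicated daemon thread
    threading.Thread(target=jobs["gcn_consumer"], daemon=True,
                     name="gcn-consumer").start()
    sched.start()
    return sched
```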
7.2 At Scale (Phase 2+)¶
Move alert ingestion to a separate service with its own process (or container). This isolates the Kafka consumer and prevents it from being affected by application restarts.
alert-ingestor service (separate process):
├── ZTF poller (5 min interval)
├── Gaia poller (5 min interval)
├── TNS poller (5 min interval)
├── NEOCP poller (30 min interval)
├── Occultation poller (6 hour interval)
└── GCN Kafka consumer (continuous)
Writes to: PostgreSQL alerts table
Signals: Redis key for new alerts → FastAPI picks up and processes
7.3 Error Handling¶
Each source has independent error handling. If ZTF is down, the ZTF poller logs the error and retries at next interval. It does not affect GCN, Gaia, or TNS. Each poller tracks its last successful poll time in Redis; if a poller hasn't succeeded in > 2× its interval, raise a PagerDuty/UptimeRobot alert to the operator.
class PollerBase:
    def __init__(self, name: str, interval_minutes: int):
        self.name = name
        self.interval = interval_minutes
        self.redis_key = f"poller:last_success:{name}"

    def run_poll(self):
        try:
            alerts = self.fetch()
            self.process(alerts)
            redis.set(self.redis_key, utc_now().isoformat(), ex=86400)
            log.info(f"{self.name}: processed {len(alerts)} alerts")
        except Exception as e:
            log.error(f"{self.name}: poll failed: {e}", exc_info=True)
            # Do NOT re-raise — let scheduler continue
8. Rate Limiting and API Etiquette¶
We are a guest on all these services. Do not abuse them.
| Service | Rate Limit | Our Cadence | Notes |
|---|---|---|---|
| ALeRCE (ZTF) | 1 req/s, no stated daily limit | 1 req/5min | Stay polite |
| Gaia Alerts | Not stated | 1 req/5min | Simple static CSV |
| GCN Kafka | No limit (push) | Continuous | Kafka consumer |
| TNS | 100 req/hour | 12 req/hour | Leave buffer |
| MPC NEOCP | Not stated | 2 req/hour | Light traffic |
| Lucky Star | Not stated | 1 req/day | Bulk prediction file |
Always identify ourselves in the User-Agent header:
User-Agent: OpenAstro-Ingestor/1.0 (openastro.science; contact@openastro.science)
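A small helper for attaching that header everywhere; sharing one `requests.Session` across pollers is a sketch of one possible design, and `default_headers` is an illustrative name:

```python
USER_AGENT = "OpenAstro-Ingestor/1.0 (openastro.science; contact@openastro.science)"

def default_headers() -> dict:
    """Headers every outbound request carries, identifying us to operators."""
    return {"User-Agent": USER_AGENT}

def make_session():
    """A requests.Session pre-loaded with our identification headers."""
    import requests  # imported lazily so the header helper is testable offline
    s = requests.Session()
    s.headers.update(default_headers())
    return s
```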
9. Testing the Ingestion Pipeline¶
Testing alert ingestion is tricky because the sources are external and real-time. Strategy:
Unit tests: Use saved snapshots of real API responses (anonymized if needed). Test parsing functions against these snapshots. Do not make real HTTP calls in unit tests.
Integration tests: Run against staging APIs where available (TNS has a sandbox). For sources without sandboxes, use recorded responses via responses or pytest-httpserver.
End-to-end smoke tests: Manually inject a synthetic alert via the internal POST /api/v1/alerts endpoint. Verify it creates a target, the target appears in the scheduler output for appropriate sites, and a heartbeat abort is sent if priority is > 85.
GCN replay: GCN Kafka supports message replay from offset. For testing the GCN consumer, replay a known historical GRB notice (e.g., GRB 221009A — the "BOAT") and verify the target is created correctly with appropriate priority.
Last updated: 2026-03-20
Status: Planning document — not yet implemented
Dependencies: Server Architecture Deep Dive.md (database schema, target creation)
Supplement: Alert Ingestion — Extended Design¶
Added: 2026-03-21. This supplement extends the base design above with four areas that were unresolved at the time of the original document: (1) the Rubin/LSST broker tier and the 10-million-to-50 funnel problem, (2) AAVSO as an alert source, (3) science-case-specific filter logic per science type, and (4) per-site weather integration. All sections below are new and are not covered above.
10. The Rubin/LSST Broker Tier¶
10.1 Why This Is Its Own Section¶
The existing source inventory covers ZTF via ALeRCE, which produces ~1 million alerts per night. Rubin/LSST is now operational and generates approximately 10 million alerts per night — an order of magnitude more. The volume makes direct Kafka consumption impractical without pre-filtering. This section describes the broker ecosystem that sits between Rubin and OpenAstro, how to choose between brokers, and how to consume only what we need.
Reference: Bellm et al. (2019), PASP 131, 068003 — the ZTF alert system paper, which established the AVRO-over-Kafka format that Rubin adopted. Nordin et al. (2019) describes the ALeRCE broker architecture. Graham et al. (2019) describes the ANTARES broker.
10.2 What a Broker Does¶
A community alert broker subscribes to the full Rubin (or ZTF) Kafka stream, classifies every alert using ML models and cross-matches, and re-publishes filtered sub-streams that clients can subscribe to. The broker absorbs the full volume; OpenAstro subscribes only to a named filter output.
Rubin → Kafka (10M alerts/night)
|
┌──────┴──────┬──────────────┬──────────────┐
↓ ↓ ↓ ↓
ANTARES ALeRCE Lasair Fink
(NOAO) (Chile/ZTF) (UK/Edinburgh) (France/CNRS)
| | | |
└──────┬──────┘ | |
↓ ↓ ↓
OpenAstro ingestion service (subscribe to one or more)
10.3 Broker Comparison¶
[NOVEL] This table summarises the four main community brokers as of early 2026 and their suitability for OpenAstro's specific needs:
| Broker | Operator | Primary stream | Filter interface | Output format | Latency from alert | Best for OpenAstro |
|---|---|---|---|---|---|---|
| ANTARES | NOIRLab (US) | ZTF + Rubin | SQL-like "locus filters" | Kafka sub-streams, REST API | ~5 min | GRB/transient follow-up; deep ML classification |
| ALeRCE | CATA Chile | ZTF + Rubin | REST query API, Python client | JSON REST | ~5–15 min | Supernova, AGN, variable star classification; good API |
| Lasair | Edinburgh/QUB | ZTF + Rubin | Named SQL "streams" | REST + Kafka | ~5–10 min | UK community; good TNS cross-matching |
| Fink | CNRS France | ZTF + Rubin | Python filter functions, Kafka | Kafka sub-streams | ~10 min | Custom science filters; most flexible; requires Python filter code |
Recommendation for OpenAstro MVP: Use ALeRCE as the primary broker because:
1. The REST query API requires no persistent Kafka consumer at MVP — a simple HTTP poll every 5 minutes is sufficient
2. ALeRCE's lc_classifier provides reliable SN/AGN/variable/transient labels that map cleanly to our alert_type vocabulary
3. ALeRCE has the most mature Python client (alerce-client) and extensive documentation
4. The alerce.online API is publicly accessible without Kafka infrastructure
Recommendation for Phase 2 (scale): Add Fink as a second broker via Kafka consumer, because Fink allows deploying custom Python filter functions on their infrastructure. This means we can push our science-case filters into Fink and receive only the ~50–200 alerts/night that matter, rather than querying the full classification index. [NOVEL: Running our science filters inside the broker rather than post-download cuts our ingestion bandwidth by ~99.9%.]
10.4 The 10M-to-50 Funnel¶
Getting from 10 million Rubin alerts per night to ~50 actionable OpenAstro targets requires staged filtering. The stages correspond roughly to where the computation happens:
Stage 0 — At broker (Fink/ANTARES filter):
Apply before download. This is a logical AND of coarse conditions.
Result: ~5,000 alerts pass per night.
Stage 1 — At ingestion service (post-download):
Apply classification and magnitude cuts.
Result: ~500 alerts pass.
Stage 2 — At target creation:
Apply science-case-specific criteria (see Section 12).
Apply network feasibility checks (declination, magnitude vs aperture).
Result: ~50 new or updated targets enter the scheduler.
Stage 3 — At scheduler:
Apply site-specific observability and weather.
Result: per-site target list of 5–10 targets.
Stage 0 filter (to deploy in Fink or ANTARES):
```python
# Expressed as boolean conditions on Rubin/ZTF alert fields
# This runs inside the broker, not in our code
def openastro_coarse_filter(alert) -> bool:
    # Magnitude: bright enough for amateur gear
    if alert.magpsf > 19.5:
        return False
    # Real/bogus: reject obvious artifacts
    if alert.rb < 0.65:
        return False
    # Not a known solar system object
    if alert.ssdistnr is not None and alert.ssdistnr < 5.0:
        return False
    # Require at least one prior detection (rejects single-epoch artifacts)
    if alert.ndethist < 1:
        return False
    return True
```
This coarse filter can be expressed as a Fink filter function or an ANTARES locus filter expression. It reduces 10M → ~5,000 before we ever download anything.
Stage 1 filter (at ingestion service, post-download):
```python
def stage1_filter(alert: NormalizedAlert) -> bool:
    # Hard magnitude cut
    if alert.magnitude is not None and alert.magnitude > 19.5:
        return False
    # Declination window: network can observe -60 to +90
    if alert.dec_deg < -60:
        return False
    # Position quality
    if alert.position_error_deg > 10.0:
        return False
    # Type filter: only types we have science cases for
    actionable_types = {
        'grb', 'grb_afterglow', 'kilonova_candidate',
        'microlensing_event', 'supernova', 'supernova_ia', 'supernova_ii',
        'supernova_ibc', 'transient', 'neo_candidate', 'occultation_star',
        'cv', 'agn', 'yso', 'nova', 'variable_high_state'
    }
    if alert.alert_type not in actionable_types:
        return False
    return True
```
11. AAVSO as an Alert Source¶
11.1 What AAVSO Provides¶
The American Association of Variable Star Observers maintains the largest database of variable star observations in the world (~50 million observations), and also operates two alert mechanisms relevant to OpenAstro:
- **AAVSO Alerts**: Human-issued notices when a known variable enters an unusual state (nova eruption, dwarf nova outburst, Mira approaching maximum, long-period variable behaving anomalously). Published at https://www.aavso.org/aavso-alerts. Format: human-readable email/web post with target name, RA/Dec, current magnitude, and classification.
- **AAVSO Special Notice**: Faster, shorter alert for time-sensitive requests — e.g. "target of opportunity, observe tonight." Format: short text notice.
- **AAVSO VSX (Variable Star Index)**: Not an alert stream, but a catalog. Useful for cross-matching: when we receive a ZTF transient, check VSX to see if it's a known variable in outburst rather than a new transient. Access: `https://www.aavso.org/vsx/index.php?view=api.list&format=json&ident={name}` or cone search at `https://www.aavso.org/vsx/index.php?view=api.list&format=json&coords={ra},{dec}&size={arcmin}`.
11.2 Why AAVSO Matters for OpenAstro¶
AAVSO alerts cover two OpenAstro science cases specifically:

- **Recurrent novae and cataclysmic variables**: When RS Oph, T CrB, U Sco, or similar erupts, AAVSO will issue an alert hours before any automated broker classifies it. Human observers catch nova eruptions early.
- **Microlensing high-magnification alerts**: MOA and OGLE issue microlensing alerts; AAVSO sometimes relays them when the event is bright enough for amateurs.
The AAVSO alert cadence is low (~1–5 per week) but the targets are specifically chosen to be accessible to amateur gear (V < 15 at peak for most alerts).
11.3 Ingesting AAVSO Alerts¶
AAVSO does not have a formal machine-readable alert API as of 2026. The practical approach is to parse the alerts page:
GET https://www.aavso.org/aavso-alerts
→ HTML page with recent alerts
→ Parse with BeautifulSoup; extract: target name, date, magnitude, notes
GET https://www.aavso.org/vsx/index.php?view=api.list&format=json&ident={name}
→ VSX record: RA, Dec, type, period, max/min magnitudes
[NOVEL] Because AAVSO alerts are human-written, a simple keyword classifier covers the common cases reliably:
```python
AAVSO_ALERT_URL = "https://www.aavso.org/aavso-alerts"

def classify_aavso_text(text: str) -> str:
    t = text.lower()
    # Order matters: compound phrases must be checked before the bare 'nova'
    # keyword, or 'dwarf nova' / 'symbiotic nova' would match 'nova' first
    if 'recurrent' in t and 'nova' in t: return 'recurrent_nova'
    if 'dwarf nova' in t or 'dn outburst' in t: return 'cv'
    if 'symbiotic' in t: return 'symbiotic_nova'
    if 'nova' in t: return 'nova'
    if 'microlensing' in t: return 'microlensing_event'
    if 'mira' in t or 'long period' in t: return 'variable_high_state'
    return 'variable_high_state'

def parse_aavso_alert(alert_html_block: str, vsx_record: dict) -> NormalizedAlert:
    alert_type = classify_aavso_text(alert_html_block)
    ra = float(vsx_record.get('RA2000', 0))
    dec = float(vsx_record.get('Declination2000', 0))
    # AAVSO often reports current magnitude in alert text
    magnitude = extract_magnitude_from_text(alert_html_block)
    priority = 70 if alert_type == 'nova' else 55
    return NormalizedAlert(
        source='aavso',
        external_id=vsx_record.get('Name', 'unknown'),
        ra_deg=ra,
        dec_deg=dec,
        position_error_deg=0.001,
        alert_type=alert_type,
        magnitude=magnitude,
        magnitude_band='V',
        priority=priority,
        raw_payload={'alert_text': alert_html_block, 'vsx': vsx_record}
    )
```
Cadence: Poll AAVSO alerts page every 30 minutes. The page changes infrequently; caching the last-seen alert ID prevents reprocessing.
VSX cross-matching: When any alert (ZTF, Gaia, TNS) comes in with alert_type in {'cv', 'nova', 'transient'}, run a VSX cone search at 10 arcsec radius. If VSX returns a known variable, use the VSX type to refine our classification and set external_id to the VSX name for deduplication.
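The cross-match step can be sketched as follows. The VSX-type → `alert_type` mapping below is an illustrative subset of the (much larger) VSX type vocabulary, and the helper names are hypothetical:

```python
VSX_CONE_URL = ("https://www.aavso.org/vsx/index.php"
                "?view=api.list&format=json&coords={ra},{dec}&size={arcmin}")

# Illustrative subset of VSX variability types → our alert_type vocabulary
VSX_TYPE_MAP = {
    'NR': 'recurrent_nova',
    'N': 'nova',
    'UG': 'cv',                    # U Gem dwarf novae (UG, UGSS, UGSU, UGZ)
    'M': 'variable_high_state',    # Mira
    'ZAND': 'symbiotic_nova',
}

def vsx_cone_url(ra_deg: float, dec_deg: float, radius_arcmin: float = 0.167) -> str:
    # 0.167 arcmin ≈ 10 arcsec, the dedup radius from the text
    return VSX_CONE_URL.format(ra=ra_deg, dec=dec_deg, arcmin=radius_arcmin)

def refine_alert_type(vsx_type: str, current_type: str) -> str:
    """Prefer the VSX classification when its prefix is in our map.
    Longest prefixes are tried first so 'NR' wins over 'N'."""
    for prefix, mapped in sorted(VSX_TYPE_MAP.items(), key=lambda kv: -len(kv[0])):
        if vsx_type.upper().startswith(prefix):
            return mapped
    return current_type
```

On a VSX hit, the caller would also set `external_id` to the VSX name, as described above, so later alerts on the same variable deduplicate.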
Updated source inventory (additions to Section 2):
| Source | Type | Cadence | Volume/night | Latency |
|---|---|---|---|---|
| ZTF via ALeRCE | REST poll | 5 min | ~50 after filter | 5–15 min |
| Rubin via ALeRCE/Fink | REST poll / Kafka | 5 min | ~50 after filter | 5–20 min |
| Gaia Science Alerts | REST poll (CSV) | 5 min | 0–10 | Minutes to hours |
| GCN Kafka | Push (Kafka) | Continuous | 1–5 | 20s–15 min |
| TNS | REST poll | 5 min | 5–20 | Hours |
| MPC NEOCP | REST poll | 30 min | 1–10 | Minutes |
| Occultation predictions | REST poll | 6 hours | 0–5 per 7 days | Hours–days |
| AAVSO Alerts | HTML poll | 30 min | 0–1 | Hours |
12. Science-Case Filter Logic¶
12.1 Design Principle¶
Different science cases need different things from an alert. An exoplanet TTV campaign needs an ephemeris and a predicted transit window — a bare ZTF transient alert is not useful unless we can match it to a known exoplanet system. A GRB follow-up needs fast response and a rough position — precision photometry is irrelevant for the first 30 minutes.
[NOVEL] This section documents the per-science-case filter rules that run at Stage 2 of the funnel (Section 10.4). These are implemented in ingestion/science_filters.py as a chain of rule functions, each returning (accept: bool, target_fields_override: dict).
12.2 Filter: Exoplanet TTV Follow-up¶
Required inputs: Alert must match a known exoplanet host star in our ephemeris catalog. A random transient alert that happens to be near an exoplanet host is not useful; the transit must be predicted.
Process:

1. This filter is not primarily triggered by alert ingestion. It is driven by the ephemeris scheduler: a separate nightly job computes predicted transit windows for all known TESS/Kepler/PLATO transiting systems over the next 7 days and creates scheduled targets directly, without an alert.
2. Alert ingestion contributes to TTV follow-up only in one case: an AAVSO Special Notice or TNS classification identifies a transient on a known exoplanet host, which may indicate confusion with the host star photometry or a genuine stellar flare affecting transit timing.
Filter rule:
```python
EXOPLANET_HOST_CATALOG = load_exoplanet_archive()  # NASA Exoplanet Archive

def filter_ttv(alert: NormalizedAlert) -> tuple[bool, dict]:
    # Check if alert position matches a known exoplanet host within 5 arcsec
    host = cone_search_exoplanet_hosts(
        alert.ra_deg, alert.dec_deg, radius_arcsec=5.0,
        catalog=EXOPLANET_HOST_CATALOG
    )
    if host is None:
        return False, {}
    # Only accept if it's a flare/transient on the host that affects our photometry
    if alert.alert_type not in ('transient', 'variable_high_state', 'yso'):
        return False, {}
    return True, {
        'target_type': 'exoplanet_host_contamination',
        'notes': f"Alert on exoplanet host {host['name']} — may affect TTV baseline",
        'priority': 60
    }
```
TTV target creation (ephemeris-driven, not alert-driven):
```python
# Run nightly; creates targets for predicted transit windows
def create_transit_window_targets(lookahead_days: int = 7):
    for system in EXOPLANET_HOST_CATALOG:
        windows = predict_transit_windows(system, lookahead_days)
        for window in windows:
            create_or_update_target(
                name=f"{system['name']} transit {window['midtime'].date()}",
                ra_deg=system['ra'],
                dec_deg=system['dec'],
                target_type='exoplanet_transit',
                event_time=window['midtime'],
                window_start=window['ingress'] - timedelta(minutes=30),
                window_end=window['egress'] + timedelta(minutes=30),
                priority=65,
                requires_simultaneous=False,
                cadence_minutes=2,  # 2-min cadence during transit
                expires_at=window['egress'] + timedelta(hours=2)
            )
```
12.3 Filter: GRB / Kilonova Follow-up¶
Alert source: GCN exclusively. No ZTF/Gaia filter produces GRB alerts.
Key criteria:
- Declination: the alert must be observable from at least one active site within 4 hours of trigger. This is computed dynamically, not as a hard cut.
- Position quality: position_error_deg < 0.5 for targeted follow-up (Swift/XRT quality). For large error boxes (Fermi/GBM, error 1°–5°), create the target immediately but flag it as search_mode = true.
- Magnitude: unknown at trigger time — do not filter on magnitude for GRB targets. The afterglow brightness ranges from V~6 (bright, rare) to V~22 (faint, common). Always create the target; let the scheduler assign only sites with appropriate aperture.
- T90 > 2 seconds: filters for long GRBs (collapsar progenitors with longer afterglows). Short GRBs (T90 < 2s, likely NS mergers) are kilonova candidates — create with alert_type = 'kilonova_candidate' and higher priority.
```python
def filter_grb(alert: NormalizedAlert) -> tuple[bool, dict]:
    if alert.source != 'gcn':
        return False, {}
    if alert.alert_type not in ('grb', 'kilonova_candidate'):
        return False, {}
    if alert.position_error_deg > 15.0:
        # Too poorly localised; wait for refined notice
        return False, {}
    # Determine if any site can observe within 4 hours
    observable_sites = sites_that_can_observe_in_window(
        ra=alert.ra_deg,
        dec=alert.dec_deg,
        within_hours=4.0
    )
    if not observable_sites:
        # No site can observe this declination in the next 4 hours
        # Still create the target — network may have more sites tomorrow
        pass  # Accept but log as low-feasibility
    overrides = {}
    if alert.position_error_deg > 0.5:
        overrides['search_mode'] = True
        overrides['notes'] = (f"Large error box ({alert.position_error_deg:.1f}°); "
                              "wide-field search required")
    if alert.alert_type == 'kilonova_candidate':
        overrides['priority'] = 88
        overrides['requires_multiband'] = True  # r+i+z for red kilonova SED
    return True, overrides
```
Colour constraints for kilonova classification:
Kilonovae are red (r-i > 0.5 within hours, darkening rapidly). [NOVEL] When a GCN GW alert triggers and a candidate is found, the scheduler should request multi-band observations (r, i, and if possible z) from any site with appropriate filters. Single-band V photometry alone cannot confirm a kilonova. The observation plan should encode this: required_filters = ['r', 'i'].
12.4 Filter: Stellar Occultations¶
Alert source: Not from transient alert brokers at all. Occultation predictions come from Lucky Star or Occult.work and are ingested by the occultation poller (Section 2.5 of base document). No ZTF/Gaia/GCN alert will ever tell you about a scheduled occultation.
Key criteria for creating an occultation target:

- At least one active OpenAstro site must lie within ±750 km of the predicted shadow path centerline (3σ path uncertainty)
- Star magnitude: V ≤ 13 preferred (brighter = more accessible to smaller apertures); V ≤ 15 acceptable for large-aperture sites
- Event duration: ≥ 0.5 seconds (shorter events are undetectable without specialized hardware)
- Time to event: create target when event is 7 days or fewer in the future; mark as urgent when ≤ 24 hours
```python
def filter_occultation(prediction: dict, active_sites: list[Site]) -> tuple[bool, dict]:
    path_lat = prediction['centerline_lat']
    path_lon = prediction['centerline_lon']
    path_uncertainty_km = prediction.get('path_uncertainty_km', 200)
    # Check if any site is within 3σ of path (750 km hard ceiling)
    search_radius_km = min(path_uncertainty_km * 3, 750)
    sites_in_path = [
        s for s in active_sites
        if great_circle_km(s.latitude, s.longitude, path_lat, path_lon) < search_radius_km
    ]
    if not sites_in_path:
        return False, {}  # No site in shadow path; skip
    star_mag = prediction.get('star_mag_V', 15)
    duration_s = prediction.get('max_duration_s', 0)
    if duration_s < 0.5:
        return False, {}  # Too brief to detect
    # Assign sites along the path
    assigned_site_ids = [s.id for s in sites_in_path]
    return True, {
        'alert_type': 'occultation_star',
        'priority': 90,
        'requires_simultaneous': True,
        'requires_gps_timing': True,
        'assigned_sites': assigned_site_ids,
        'star_mag': star_mag,
        'min_aperture_mm': 100 if star_mag < 12 else 200,
        'cadence_seconds': max(0.1, duration_s / 10),  # ≥10 samples across the event
        'notes': f"Shadow path ±{search_radius_km:.0f} km; {len(sites_in_path)} site(s) in path"
    }
```
12.5 Filter: Microlensing Events¶
Alert sources: Gaia Alerts (best for amateurs; bright events), OGLE EWS (Early Warning System), KMTNet, and MOA. OGLE EWS is the primary professional source.
[NOVEL] OpenAstro's value for microlensing is specifically during high-magnification events and caustic crossings, which last hours and require continuous multi-longitude coverage. A standard microlensing event rising over 10 days has less urgency. The filter should distinguish these cases:
```python
OGLE_EWS_URL = "https://ogle.astrouw.edu.pl/ogle4/ews/ews.html"
# OGLE EWS is a web page; parse it for new events with rising-alert status

def filter_microlensing(alert: NormalizedAlert) -> tuple[bool, dict]:
    if alert.alert_type != 'microlensing_event':
        return False, {}
    # Check for high-magnification indicators in raw payload
    raw = alert.raw_payload
    magnification = raw.get('magnification') or raw.get('Amax')
    u0 = raw.get('u0')  # impact parameter; u0 < 0.1 → Amax > 10
    high_mag = False
    if magnification is not None and float(magnification) > 10:
        high_mag = True
    if u0 is not None and float(u0) < 0.1:
        high_mag = True
    priority = 80 if high_mag else 55
    cadence_minutes = 5 if high_mag else 60
    return True, {
        'priority': priority,
        'cadence_minutes': cadence_minutes,
        'notes': ('High-magnification microlensing event' if high_mag
                  else 'Standard microlensing event'),
        'requires_simultaneous': high_mag  # Only enforce simultaneous coverage for caustic crossings
    }
```
OGLE EWS ingestion: OGLE's Early Warning System publishes a list of ongoing microlensing events. This should be polled once per hour and new high-magnification events ingested:
```python
def poll_ogle_ews():
    # OGLE EWS publishes current events in a parseable format
    resp = requests.get(OGLE_EWS_URL, timeout=30,
                        headers={'User-Agent': 'OpenAstro-Ingestor/1.0'})
    # Parse for events with predicted peak < 10 days away and Amax > 10
    # Create NormalizedAlert with source='ogle_ews'
    ...
```
MOA alert stream: MOA (Microlensing Observations in Astrophysics, New Zealand) publishes rapid microlensing alerts at https://www.massey.ac.nz/~iabond/moa/alerts/. These are particularly useful for caustic-crossing events with very short predicted windows. Add as a separate poller alongside OGLE EWS.
12.6 Filter: Supernovae (Shock Breakout and Early Follow-up)¶
Alert sources: ZTF/ALeRCE, Gaia Alerts, TNS.
Key criteria: Distance matters more than for other types. A nearby supernova in a face-on spiral at d < 20 Mpc is scientifically far more valuable than a cosmologically distant SN Ia used only for H₀ measurement.
```python
def filter_supernova(alert: NormalizedAlert) -> tuple[bool, dict]:
    if alert.alert_type not in ('supernova', 'supernova_ia', 'supernova_ii', 'supernova_ibc'):
        return False, {}
    # Magnitude proxy for distance: V < 16 implies d < ~50 Mpc for typical SN
    if alert.magnitude is None or alert.magnitude > 18:
        return False, {}  # Too faint; beyond useful range for amateurs
    priority = 55
    if alert.magnitude < 14:
        priority = 75  # Very nearby SN; e.g. in a Virgo cluster galaxy
    # Check rising: did the source brighten by > 0.5 mag since the last detection?
    raw = alert.raw_payload
    mag_rise = raw.get('magdiff') or 0
    if float(mag_rise) > 0.5:
        priority += 10  # Actively rising; early-phase photometry is valuable
    return True, {
        'priority': priority,
        'cadence_minutes': 30,  # Monitor multiple times per night at peak
        'expires_at': utc_now() + timedelta(days=120)
    }
```
Shock breakout targets: [NOVEL] The first hours of a supernova are the most scientifically valuable for constraining progenitor structure, but they are also the most likely to be missed — the source must already be in our field when the shock breaks out. The response strategy is different from follow-up:
- For a newly discovered SN within 12 hours of first detection: treat as priority = 80, observe every 30 minutes to catch any remaining early-time evolution
- For ULTRASAT UV alerts (when ULTRASAT is operational ~2026): these will directly flag shock breakout in UV; create a high-priority optical counterpart target immediately on receipt of an ULTRASAT alert via GCN
13. Weather Integration¶
13.1 The Problem¶
Alert ingestion produces targets. The scheduler assigns targets to sites. But a site under thick cloud cover will waste the network's time — it will receive a target assignment, attempt to observe, and report nothing (or bad data). The scheduler must know which sites have usable conditions before assigning targets.
Weather integration is not about blocking observations — it is about smarter assignment. A site that is cloudy should get fewer assignments, not zero, because cloud cover can clear unexpectedly. A site with a forecast of 80% cloud cover in 2 hours should get a target that can be observed now, not one that requires a 2-hour observing run.
13.2 Data Sources¶
Three tiers of weather information, in decreasing cost:
Tier 1 — Self-reported from client software [NOVEL]
The OpenAstro client running at each site should report its current conditions on every heartbeat:
```jsonc
{
    "site_id": "abc123",
    "timestamp": "2026-03-21T22:14:00Z",
    "sky_condition": "clear",    // "clear" | "thin_cloud" | "thick_cloud" | "rain"
    "seeing_arcsec": 2.3,        // if site has a seeing monitor
    "humidity_pct": 55,
    "wind_ms": 4.2,
    "dew_point_c": 8.1,
    "weather_source": "manual"   // "manual" | "boltwood" | "aag_cloudwatcher" | "api"
}
```
This requires the client to either poll a cloud sensor (Boltwood Cloud Sensor, AAG CloudWatcher) or allow the operator to set condition manually. Most serious amateur sites have at least a Boltwood unit. The weather_source field lets the scheduler weight self-reported data appropriately: sensor data is more reliable than manual input.
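One way to implement that weighting is a reliability multiplier applied to the site's usability score. The numeric weights below are illustrative assumptions, not part of the base design:

```python
# Hypothetical reliability weights per weather_source value
SOURCE_WEIGHT = {
    'boltwood': 1.0,           # dedicated cloud sensor: trust fully
    'aag_cloudwatcher': 1.0,
    'api': 0.8,                # remote forecast: usually right, sometimes stale
    'manual': 0.6,             # operator-entered: honest but infrequently updated
}

def weighted_usability(raw_score: float, weather_source: str) -> float:
    """Scale a site's usability score by how much we trust the reporting source."""
    return raw_score * SOURCE_WEIGHT.get(weather_source, 0.5)
```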
Tier 2 — OpenWeatherMap API (automated, per-site)
OpenWeatherMap provides a free tier of their One Call API with 1,000 calls/day. At 50 sites polled every 30 minutes (~2,400 calls/day), the free tier is already exceeded, so at scale we need either demand-driven fetching (Section 13.4) or the paid tier. The paid tier (One Call 3.0) provides hourly forecasts for the next 48 hours, which is what we need for scheduling.
Endpoint:
GET https://api.openweathermap.org/data/3.0/onecall
?lat={lat}&lon={lon}
&exclude=minutely,daily,alerts
&appid={key}
&units=metric
Response fields we use:
- current.clouds — cloud cover percentage (0–100)
- current.visibility — visibility in metres
- current.humidity — relative humidity
- current.wind_speed — wind in m/s
- hourly[0..8] — next 8 hours of forecast
[NOVEL] Cache each site's weather forecast for 30 minutes. Re-fetch only when the cache expires or the scheduler explicitly requests a refresh for an urgent target.
```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SiteWeather:
    site_id: str
    timestamp: datetime
    cloud_cover_pct: float        # 0–100; from OWM or sensor
    visibility_m: float           # 0–10000+
    humidity_pct: float
    wind_ms: float
    forecast_2h_cloud_pct: float  # OWM hourly[2].clouds
    weather_source: str           # 'owm' | 'sensor' | 'manual'
    seeing_arcsec_forecast: float | None = None  # from Clear Outside, when available

    # is_usable is a computed property, not a stored field — declaring it as
    # both a dataclass field and a property would make the field shadow the property
    @property
    def is_usable(self) -> bool:
        if self.cloud_cover_pct > 70:
            return False
        if self.visibility_m < 5000:
            return False
        if self.wind_ms > 15:
            return False
        return True

    @property
    def usability_score(self) -> float:
        """0.0 (unusable) to 1.0 (perfect). Used as scheduler weight multiplier."""
        if not self.is_usable:
            return 0.0
        cloud_factor = 1.0 - (self.cloud_cover_pct / 100) ** 0.5
        wind_factor = max(0, 1.0 - self.wind_ms / 20.0)
        humidity_factor = max(0, 1.0 - max(0, self.humidity_pct - 60) / 40.0)
        return cloud_factor * wind_factor * humidity_factor
```
Tier 3 — Clear Outside (forecast-focused, UK/Europe bias)
Clear Outside (https://clearoutside.com) is a popular astronomy-specific forecast service. It provides hourly astronomy conditions: cloud layers, seeing forecast (based on atmosphere model), transparency. There is no official API; the data can be fetched from their api.clearoutside.com endpoint (undocumented but stable). Format: JSON with hourly forecast arrays.
[NOVEL] Use Clear Outside for seeing forecast where available (it uses the GFS model to predict atmospheric turbulence). Supplement OpenWeatherMap's cloud data with Clear Outside's seeing estimate. Merge: OWM for cloud/wind/humidity, Clear Outside for seeing.
```python
CLEAR_OUTSIDE_URL = "https://api.clearoutside.com/forecast/{lat}/{lon}"

def fetch_clear_outside(lat: float, lon: float) -> dict | None:
    try:
        resp = requests.get(
            CLEAR_OUTSIDE_URL.format(lat=round(lat, 2), lon=round(lon, 2)),
            timeout=10,
            headers={'User-Agent': 'OpenAstro-Ingestor/1.0'}
        )
        if resp.status_code != 200:
            return None
        return resp.json()
    except Exception:
        return None
```
13.3 Weather-Aware Scheduling¶
[NOVEL] The scheduler currently applies altitude and priority scoring. Weather adds a third dimension: site usability. The integration is simple: multiply the target score for a given site by that site's usability_score.
```python
# In scheduler.py — modified calculate_observable_targets()
def calculate_observable_targets(site: Site, weather: SiteWeather, now: datetime = None):
    ...
    for target in active_targets:
        # Existing altitude and priority score
        base_score = compute_base_score(target, site, now)
        # Weather multiplier
        if not weather.is_usable:
            continue  # Skip targets for cloudy sites entirely in assignment
        score = base_score * weather.usability_score
        # Forecast penalty: if it's going to cloud over during a long exposure run
        required_duration_h = estimate_observation_duration(target)
        if required_duration_h > 1.0 and weather.forecast_2h_cloud_pct > 60:
            score *= 0.5  # Halve priority if clouds expected mid-session
        scored_targets.append(...)
```
Weather for occultations specifically: [NOVEL] Occultation targets are time-critical and non-recoverable. If the primary site assigned to an occultation chord is forecasting >60% cloud cover in the 30-minute window around the event, the system should:
1. Log a warning in targets.notes
2. Look for backup sites within ±300 km of the primary site's shadow chord
3. If a clear backup exists, add it to the assigned_sites list
4. Notify the operator via the ops channel
```python
def check_occultation_weather_coverage(target: Target, active_sites: list[Site],
                                       weather_cache: dict[str, SiteWeather]) -> None:
    for site_id in target.assigned_sites:
        weather = weather_cache.get(site_id)
        if weather is None:
            continue
        if weather.forecast_2h_cloud_pct > 60:
            # Find backup site within 300 km along the shadow chord
            backups = find_backup_sites_near_chord(
                target, site_id, active_sites, weather_cache, radius_km=300
            )
            if backups:
                log.warning(
                    f"Occultation {target.name}: primary site {site_id} clouded out "
                    f"(forecast {weather.forecast_2h_cloud_pct}% cloud). "
                    f"Adding backup: {backups[0].id}"
                )
                target.assigned_sites.append(backups[0].id)
            else:
                log.warning(
                    f"Occultation {target.name}: primary site {site_id} clouded out "
                    f"and no backup site found within 300 km."
                )
```
13.4 Weather Fetch Schedule¶
| Source | Fetch frequency | Cached for | Priority |
|---|---|---|---|
| Client heartbeat (sensor/manual) | 30 seconds (passive) | N/A (streaming) | Highest — most accurate |
| OpenWeatherMap One Call | Every 30 min per site | 30 min | Primary automated source |
| Clear Outside (seeing) | Every 60 min per site | 60 min | Supplementary |
[NOVEL] The weather fetch for a site should be triggered by scheduler demand, not on a fixed global clock. When the scheduler evaluates site X for a new high-priority target, it checks if X's weather cache is older than 30 minutes and fetches fresh data before scoring. This avoids polling all 50+ sites constantly when most are in daylight and irrelevant.
```python
def get_site_weather(site: Site, cache: dict, max_age_minutes: int = 30) -> SiteWeather:
    cached = cache.get(site.id)
    if cached and (utc_now() - cached.timestamp).total_seconds() < max_age_minutes * 60:
        return cached
    # Fetch fresh
    owm_data = fetch_owm(site.latitude, site.longitude)
    co_data = fetch_clear_outside(site.latitude, site.longitude)
    weather = SiteWeather(
        site_id=site.id,
        timestamp=utc_now(),
        cloud_cover_pct=owm_data['current']['clouds'],
        visibility_m=owm_data['current'].get('visibility', 10000),
        humidity_pct=owm_data['current']['humidity'],
        wind_ms=owm_data['current']['wind_speed'],
        forecast_2h_cloud_pct=owm_data['hourly'][2]['clouds'],
        weather_source='owm'
    )
    # Supplement with Clear Outside seeing if available
    if co_data:
        weather.seeing_arcsec_forecast = parse_co_seeing(co_data)
    cache[site.id] = weather
    return weather
```
13.5 Weather Database Schema¶
Add a site_weather table to track historical conditions (useful for understanding which sites consistently have good weather and for science quality flagging):
```sql
CREATE TABLE site_weather (
    id INTEGER PRIMARY KEY,
    site_id INTEGER REFERENCES sites(id),
    recorded_at DATETIME NOT NULL,  -- UTC
    cloud_pct REAL,
    visibility_m REAL,
    humidity_pct REAL,
    wind_ms REAL,
    seeing_arcsec REAL,
    weather_source TEXT,            -- 'owm'|'sensor'|'manual'
    is_usable INTEGER,              -- 0 or 1
    usability_score REAL            -- 0.0–1.0
);
CREATE INDEX ix_site_weather_site_time ON site_weather(site_id, recorded_at);
```
Retention policy: keep full data for 30 days, then aggregate to daily min/mean/max per site and delete hourly records.
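A sketch of that retention pass, assuming a hypothetical `site_weather_daily` rollup table (not part of the schema above) and the column names from `site_weather`:

```python
import sqlite3

# Aggregate detail rows older than 30 days into a daily rollup, then delete them.
# The site_weather_daily table is a hypothetical addition for this sketch.
AGGREGATE_SQL = """
INSERT INTO site_weather_daily (site_id, day, cloud_min, cloud_mean, cloud_max)
SELECT site_id, date(recorded_at),
       MIN(cloud_pct), AVG(cloud_pct), MAX(cloud_pct)
FROM site_weather
WHERE recorded_at < datetime('now', '-30 days')
GROUP BY site_id, date(recorded_at)
"""
DELETE_SQL = "DELETE FROM site_weather WHERE recorded_at < datetime('now', '-30 days')"

def apply_retention(conn: sqlite3.Connection) -> int:
    """Run the aggregate-then-delete pass in one transaction; returns rows deleted."""
    with conn:
        conn.execute(AGGREGATE_SQL)
        cur = conn.execute(DELETE_SQL)
    return cur.rowcount
```

Running both statements in one transaction means a crash between them can never lose detail rows that were not yet aggregated.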
14. Cross-Broker Deduplication (Extension to Section 4)¶
The deduplication logic in Section 4 handles the case where the same astrophysical source appears in ZTF and Gaia and TNS. There is a second, subtler deduplication problem: the same ZTF alert can be redistributed by multiple brokers — ALeRCE, Lasair, and Fink all ingest the identical ZTF AVRO packet and independently produce JSON outputs. If OpenAstro subscribes to more than one broker (which it should in Phase 2), it will receive the same event multiple times.
14.1 Broker-Level Dedup Key¶
Every ZTF alert has a stable candid (candidate ID) — a unique 64-bit integer assigned by ZTF. This is present in ALeRCE, Lasair, and Fink outputs. Rubin will use a similar diaSourceId.
[NOVEL] The deduplication table should store both the source broker and the underlying survey ID:
```sql
CREATE TABLE alert_dedup (
    id INTEGER PRIMARY KEY,
    survey_id TEXT NOT NULL,      -- ZTF candid, Rubin diaSourceId, etc.
    survey_source TEXT NOT NULL,  -- 'ztf', 'rubin', 'gaia', etc.
    first_seen_via TEXT,          -- which broker delivered it first
    target_id INTEGER REFERENCES targets(id),
    processed_at DATETIME
);
CREATE UNIQUE INDEX ix_dedup_survey ON alert_dedup(survey_source, survey_id);
```
When a new alert arrives from any broker:
1. Extract the underlying survey ID (candid for ZTF, diaSourceId for Rubin)
2. INSERT OR IGNORE INTO alert_dedup (survey_source, survey_id, first_seen_via, ...) — SQLite's INSERT OR IGNORE atomically does nothing if the unique index is violated
3. Check changes() — if 0, this is a duplicate from a second broker; skip processing
```python
def is_duplicate_cross_broker(survey_source: str, survey_id: str, broker: str,
                              conn) -> bool:
    """Returns True if we've already processed this alert from another broker."""
    result = conn.execute(
        "INSERT OR IGNORE INTO alert_dedup (survey_source, survey_id, first_seen_via, processed_at) "
        "VALUES (?, ?, ?, ?)",
        (survey_source, survey_id, broker, utc_now().isoformat())
    )
    conn.commit()
    return result.rowcount == 0  # 0 rows inserted → already exists → duplicate
```
14.2 ZTF Object-Level vs Alert-Level Dedup¶
ZTF generates two levels of identifiers:
- candid: unique per individual alert (one detection). A single source may have 100 candids over its lifetime.
- objectId (ZTF name, e.g. ZTF21abcdefg): stable identifier for the astrophysical source. All alerts from the same source share this.
The dedup table uses candid to avoid double-processing from multiple brokers. The target table uses objectId to avoid creating duplicate targets for the same source over multiple nights. Both checks must run:
- **Cross-broker dedup**: `candid` in `alert_dedup` → avoid double-processing the same detection
- **Cross-night dedup**: `objectId` in `targets.external_id WHERE source='ztf'` → update the existing target rather than create a new one
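Both layers can run in one helper, sketched here against the `alert_dedup` schema from 14.1 and the `targets` columns referenced above (`process_ztf_alert` is a hypothetical name; the caller commits the transaction):

```python
import sqlite3

def process_ztf_alert(conn: sqlite3.Connection, candid: str, object_id: str,
                      broker: str) -> str:
    """Run both dedup layers. Returns one of:
    'duplicate' — same detection already seen via another broker,
    'update'    — known source; update the existing target,
    'new'       — create a new target."""
    # Layer 1 (cross-broker): atomic INSERT OR IGNORE on the candid
    cur = conn.execute(
        "INSERT OR IGNORE INTO alert_dedup (survey_source, survey_id, first_seen_via) "
        "VALUES ('ztf', ?, ?)", (candid, broker))
    if cur.rowcount == 0:
        return 'duplicate'
    # Layer 2 (cross-night): look up the stable objectId among existing targets
    row = conn.execute(
        "SELECT id FROM targets WHERE source = 'ztf' AND external_id = ?",
        (object_id,)).fetchone()
    return 'update' if row else 'new'
```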
15. Operational Runbook Additions¶
15.1 What Happens When a Broker Is Down¶
| Situation | Expected behavior |
|---|---|
| ALeRCE API returns 503 | ZTF poller logs error and skips cycle; retries in 5 min; no other source affected |
| GCN Kafka consumer loses connection | Consumer attempts reconnect with exponential backoff (1s, 2s, 4s, max 60s); replays from last committed offset on reconnect; ops alert if down > 10 min |
| TNS API rate-limited (429 response) | Poller backs off for the rate-limit window (use Retry-After header if present); log warning; no data lost |
| OpenWeatherMap API key exhausted | Weather falls back to Clear Outside only; scheduler marks weather data as low_confidence; log daily digest of weather fetch failures |
| Lucky Star unreachable | Occultation poller skips the cycle and uses cached predictions from last successful fetch; alert ops if cache is > 24 hours old |
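The GCN reconnect schedule from the table (1 s, 2 s, 4 s, capped at 60 s) can be sketched as a generator. The jitter term is an addition not specified in the table, included to avoid synchronized reconnect storms:

```python
import random

def backoff_delays(max_s: float = 60.0):
    """Yield reconnect delays: 1, 2, 4, ... seconds, doubling up to max_s,
    each with up to 10% random jitter added."""
    delay = 1.0
    while True:
        yield delay + random.uniform(0, delay * 0.1)
        delay = min(delay * 2, max_s)
```

A consumer loop would draw the next delay after each failed reconnect and reset the generator once a connection succeeds.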
15.2 Missing Fields and Graceful Degradation¶
Not every alert from every source will have all fields. The system must degrade gracefully:
| Missing field | Fallback |
|---|---|
| `magnitude` | Allow target creation; scheduler will flag as `magnitude_unknown`; assign only to sites with wide magnitude range |
| `position_error_deg` | Set to 0.1° (conservative default); do not block target creation |
| `discovery_time` | Use ingestion timestamp; document as approximate |
| `alert_type` classification | Default to `'transient'` with priority 50; let human operator reclassify |
| `dec_deg` outside -90/+90 | Reject with error log; do not create target |
15.3 Alert Volume Monitoring¶
The ingestion service should emit metrics that are logged to the /api/v1/ingestor/stats endpoint:
```json
{
    "window": "last_24h",
    "alerts_received": {
        "ztf_alerce": 312,
        "rubin_alerce": 4821,
        "gaia": 6,
        "gcn": 4,
        "tns": 18,
        "mpc_neocp": 7,
        "aavso": 1,
        "ogle_ews": 2
    },
    "alerts_filtered_out": 5043,
    "targets_created": 38,
    "targets_updated": 94,
    "dedup_cross_broker": 241,
    "dedup_cross_night": 87,
    "weather_fetches": {
        "owm_success": 1440,
        "owm_fail": 12,
        "clear_outside_success": 720,
        "clear_outside_fail": 8
    }
}
```
If targets_created drops to zero for more than 24 hours, raise an ops alert — this likely means a filter is misconfigured or all pollers are failing silently.
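A sketch of that watchdog over the stats payload; any thresholds beyond the zero-targets rule are illustrative assumptions:

```python
def ingestion_ops_alerts(stats: dict) -> list[str]:
    """Evaluate the /api/v1/ingestor/stats payload. The zero-targets rule is
    from the text; the per-source silence check is an illustrative extension."""
    alerts = []
    if stats.get('targets_created', 0) == 0:
        alerts.append('No targets created in 24h: check filters and pollers')
    for source, count in stats.get('alerts_received', {}).items():
        if count == 0:
            alerts.append(f'Source {source} delivered 0 alerts in 24h')
    return alerts
```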
16. Implementation Priority Order¶
Given the MVP constraint (solo developer, SQLite, limited ops overhead), implement alert ingestion in this order:
1. **GCN Kafka consumer** — highest science impact; handles GRBs, the most time-critical alerts, and requires no polling logic. Use the `gcn-kafka` Python package; ~50 lines of code for a working consumer.
2. **Gaia Alerts CSV poller** — simplest implementation (no auth, no Kafka, pure HTTP + CSV parse); covers microlensing and CVs immediately.
3. **TNS REST poller** — adds canonical deduplication and supernova coverage. Requires free API key.
4. **ALeRCE REST poller** — adds ZTF-sourced transients. Requires free account. Largest volume but also the best documented.
5. **MPC NEOCP poller** — adds NEO candidates. Covers occultation-adjacent science.
6. **Occultation prediction poller (Lucky Star / Occult.work)** — adds occultation targets. This is the highest-priority science case but requires the most custom logic (path geometry, site assignment).
7. **OpenWeatherMap weather integration** — add after at least 5 active sites are registered; useless before then.
8. **AAVSO / OGLE EWS pollers** — add in Phase 2; covers remaining edge cases (novae, high-magnification microlensing).
9. **Rubin/Fink broker integration** — Phase 2; requires the Rubin data to be worth acting on and the network to have sites capable of following up Rubin-discovered sources at V > 18.
Supplement added: 2026-03-21
Status: Planning document — not yet implemented
Dependencies: Alert Ingestion Design.md (base document, Sections 1–9), Server Architecture Deep Dive.md, Stellar Occultations — Complete.md, Exoplanet Science — Complete.md