Stage 1 Data Pipeline Design¶
This document covers the full data pipeline for Stage 1 — ingesting, calibrating, and combining data from existing public archives (AAVSO, MPC, AstroBin, ETD) to produce calibrated light curves and scientific output without any live telescope network. The pipeline also serves as the foundation for Stage 2's live data processing.
The Roadmap document (NewOpenAstro/Roadmap.md) establishes the high-level structure. This document provides the implementation details — the specific APIs, parsing code, calibration math, and quality control logic needed to actually build it.
1. Architecture Overview¶
The pipeline is modular. Each stage has a clean process(input) → output signature. Stages can be run independently and their outputs inspected. Nothing is a black box.
┌───────────────────────────────────────────────────────────────────────┐
│ STAGE 1 DATA PIPELINE │
│ │
│ Source Adapters Normalize Calibrate │
│ ┌──────────┐ ┌────────────┐ ┌───────────────────┐ │
│ │ AAVSO │──raw LCs───▶│ │ │ │ │
│ │ Adapter │ │ │ │ Zero-point vs │ │
│ ├──────────┤ │ Normalizer │────▶│ Gaia/APASS │ │
│ │ MPC │──raw obs───▶│ │ │ Color term │ │
│ │ Adapter │ │ WCS check │ │ correction │ │
│ ├──────────┤ │ Dedup │ │ │ │
│ │ AstroBin │──FITS───────▶│ Std hdr │ │ Instrument │ │
│ │ Adapter │ └────────────┘ │ registry update │ │
│ ├──────────┤ └────────┬──────────┘ │
│ │ ETD │──transit times──────────────────────────┤ │
│ │ Adapter │ │ │
│ └──────────┘ ▼ │
│ ┌──────────────────┐ │
│ │ Combination │ │
│ │ & Assembly │ │
│ │ │ │
│ │ SNR-weighted │ │
│ │ ensemble phot │ │
│ │ Light curve │ │
│ │ assembly │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Science Layer │ │
│ │ │ │
│ │ Lomb-Scargle │ │
│ │ Transit timing │ │
│ │ TTV extraction │ │
│ │ N-body inference │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Outputs │ │
│ │ │ │
│ │ Calibrated FITS │ │
│ │ archive │ │
│ │ Light curves │ │
│ │ (FITS + CSV) │ │
│ │ Instrument │ │
│ │ registry │ │
│ │ Science results │ │
│ └──────────────────┘ │
└───────────────────────────────────────────────────────────────────────┘
2. Source Adapters¶
2.1 AAVSO¶
What AAVSO provides: The richest amateur photometry archive in existence. Millions of observations of variable stars, spanning decades. Data is already observer-calibrated using standard comparison stars from AAVSO's own charts. Equipment metadata is associated with each observer code.
Access: astroquery.aavso (built into astroquery) or direct API.
from astroquery.aavso import AAVSO
aavso = AAVSO()
table = aavso.query_star(
star_name='RR_Lyr', # or any AAVSO VSX name
obsbands=['V', 'B'],
date_start='2024-01-01',
date_end='2026-01-01'
)
AAVSO observation format (key fields):
- obsbandcode: Filter (V, B, R, I, Vis, etc.)
- obsdate: JD of observation
- magnitude: Reported magnitude
- uncertainty: Reported photometric uncertainty
- observer: AAVSO observer code (e.g., 'MAAN')
- comp1: Primary comparison star designation
- comp2: Check star designation
- airmass: (sometimes reported)
- telescope / ccd: Equipment info (observer-reported, inconsistent)
AAVSO data quality notes: - Visual (Vis band) estimates are systematically noisier than CCD; treat separately - Comparison star issues: some observers use non-standard comp stars; flag and cross-check against AAVSO chart revisions - Saturated measurements: occasionally observers submit magnitudes for saturated images; outlier rejection handles these - Observer consistency: some observers have multi-decade records; their historical data may predate standardization
AAVSO adapter implementation:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import astropy.units as u
from astropy.time import Time
@dataclass
class RawPhotometry:
"""Normalized photometry record, source-agnostic."""
source: str # 'aavso', 'mpc', 'astrobin', 'etd'
target_name: str
observer_code: str
jd_obs: float # Julian Date of observation mid-point
filter_name: str # normalized to Johnson-Cousins: 'B','V','R','I','Clear'
magnitude: float
mag_error: float
ra_deg: Optional[float] # if known
dec_deg: Optional[float]
airmass: Optional[float]
instrument_id: Optional[str] # FK to instruments table after calibration
raw_metadata: dict
class AAVSOAdapter:
def fetch(self, target: str, start_jd: float, end_jd: float,
filters: list[str] = None) -> list[RawPhotometry]:
from astroquery.aavso import AAVSO
aavso = AAVSO()
obs_time_start = Time(start_jd, format='jd').iso.split('T')[0]
obs_time_end = Time(end_jd, format='jd').iso.split('T')[0]
try:
table = aavso.query_star(
star_name=target,
obsbands=filters,
date_start=obs_time_start,
date_end=obs_time_end
)
except Exception as e:
log.error(f"AAVSO fetch failed for {target}: {e}")
return []
records = []
for row in table:
# Skip visual estimates unless explicitly requested
if row['obsbandcode'] == 'Vis' and 'Vis' not in (filters or []):
continue
filter_normalized = self._normalize_filter(row['obsbandcode'])
if filter_normalized is None:
continue
records.append(RawPhotometry(
source='aavso',
target_name=target,
observer_code=str(row['observer']),
jd_obs=float(row['obsdate']),
filter_name=filter_normalized,
magnitude=float(row['magnitude']),
mag_error=float(row['uncertainty']) if row['uncertainty'] else 0.05,
ra_deg=None, # AAVSO doesn't give per-observation RA/Dec
dec_deg=None,
airmass=float(row['airmass']) if row['airmass'] else None,
instrument_id=None, # Assigned after instrument registry lookup
raw_metadata=dict(zip(table.colnames, [str(v) for v in row]))
))
return records
def _normalize_filter(self, aavso_band: str) -> Optional[str]:
"""Map AAVSO band codes to standard Johnson-Cousins names."""
mapping = {
'B': 'B', 'V': 'V', 'R': 'R', 'I': 'I',
'Rc': 'R', 'Ic': 'I', # Cousins variants
'TG': 'V', # Transformed Green → V
'TR': 'R', # Transformed Red
'CV': 'Clear', # Clear V-filtered comparison
'CR': 'Clear',
'J': None, # NIR — skip unless we specifically want it
}
return mapping.get(aavso_band)
2.2 ETD (Exoplanet Transit Database)¶
What ETD provides: Amateur transit light curves submitted by observers worldwide. The Czech Astronomical Society manages this. Each submission includes: transit start, minimum, end times; depth; fitted parameters; raw light curve data.
Access: ETD has a public web interface but no formal API. Two options:
1. Scrape the per-target HTML page (fragile, not recommended)
2. Use astroquery.exofop instead: ExoFOP (NASA) is better-structured, includes ETD data plus TESS data, and has a proper API.
ExoFOP access:
from astroquery.exofop import ExoFOP
exofop = ExoFOP()
timeseries = exofop.query_planet('KELT-11b') # returns astropy Table
For raw ETD data when ExoFOP doesn't have what we need:
- ETD provides per-transit data files at http://var2.astro.cz/tresca/transit-detail.php?id={transit_id}
- Bulk download: http://var2.astro.cz/tresca/transit-list.php?target={planet_name}&format=csv
ETD data format for transit timing:
# ETD output columns (relevant subset):
# Transit_midtime (JD), Transit_midtime_error (days),
# Duration (hours), Depth (mmag), Observer, Telescope, Filter
ETD adapter (simplified — full implementation handles edge cases):
import requests
import pandas as pd
import io
class ETDAdapter:
ETD_URL = "http://var2.astro.cz/tresca/transit-list.php"
def fetch_transit_times(self, planet_name: str) -> pd.DataFrame:
resp = requests.get(self.ETD_URL,
params={'target': planet_name, 'format': 'csv'},
timeout=60)
resp.raise_for_status()
df = pd.read_csv(io.StringIO(resp.text), comment='#')
df = df.rename(columns={
'Transit_midtime': 'bjd_mid',
'Transit_midtime_error': 'bjd_mid_err',
'Duration': 'duration_hours',
'Depth': 'depth_mmag'
})
df['source'] = 'etd'
return df
Using ETD data for TTV: ETD's transit mid-times have already been measured by the submitting observer, so we don't need to re-fit. We take the BJD mid-time and error from each submission and feed directly into the TTV pipeline. See Section 7.
2.3 MPC¶
What MPC provides: Standardized astrometric observations of minor planets from amateur and professional observers worldwide. The MPC80 format is compact ASCII.
Access: Astroquery or direct HTTP.
from astroquery.mpc import MPC
mpc = MPC()
obs = mpc.get_observations_async('(433) Eros', get_mpc_observations=True)
# Returns table of MPC80-format observations
Or direct API: https://www.minorplanetcenter.net/obs_search?object_id=433&output_format=json
MPC data for OpenAstro Stage 1: - Primary use: validate the astrometry pipeline against known asteroid positions - Secondary: feed into occultation prediction pipeline (need accurate asteroid ephemerides) - NOT for photometry: MPC observations are astrometric; magnitudes are incidental and poorly calibrated
MPC observer code registry: Each MPC observer code maps to a telescope site with known location. Cross-reference MPC observer codes with our instruments table. This is a valuable source of characterized site locations.
2.4 AstroBin¶
What AstroBin provides: Large community image hosting platform. Contains processed FITS files from thousands of observers, with equipment metadata. Not a science archive — images are primarily for aesthetics — but equipment metadata and some photometric data are accessible.
Access: AstroBin API (registration required, free tier available).
GET https://www.astrobin.com/api/v1/image/?format=json&api_key={key}&api_secret={secret}
&imaging_telescopes__aperture__gte=200 # aperture filter
&subject_type__in=STAR,DEEP_SKY # type filter
AstroBin's value for Stage 1: Primarily the equipment metadata, not the images. We can use AstroBin as a source for populating the instruments registry — querying the telescope/camera combinations that observers use, along with rough site locations. This pre-populates the registry before any Stage 2 live observations arrive.
For actual image data, AstroBin images are usually heavily processed (noise reduction, stretch, etc.) and not appropriate for science photometry. Exception: some observers upload science-quality calibrated FITS. These can be identified by checking FITS headers for proper calibration keywords (GAIN, RDNOISE, etc.).
AstroBin adapter (instruments registry focus):
class AstroBinAdapter:
API_URL = "https://www.astrobin.com/api/v1"
def fetch_equipment_profiles(self, min_aperture_mm: int = 100) -> list[dict]:
"""Fetch observer equipment combinations to populate instrument registry."""
resp = requests.get(
f"{self.API_URL}/telescope/",
params={
'format': 'json',
'aperture__gte': min_aperture_mm,
'limit': 100,
'offset': 0
},
headers={'Authorization': f'Token {self.api_key}'}
)
return resp.json().get('objects', [])
3. Normalization Stage¶
The normalizer takes RawPhotometry records from any adapter and produces a unified internal format, applying:
-
Filter standardization: Map all filter names to Johnson-Cousins (B, V, R, I) or Sloan (g, r, i, z). Anything that can't be mapped is flagged as
filter_unknown. -
Time standardization: Convert all times to BJD_TDB (Barycentric Julian Date, Barycentric Dynamical Time). This corrects for Earth's orbital motion around the Sun (up to ±8 minutes) and is the standard for precision timing in exoplanet and variable star work.
from astropy.time import Time from astropy.coordinates import SkyCoord, EarthLocation import astropy.units as u def to_bjd_tdb(jd_utc: float, ra_deg: float, dec_deg: float, location: EarthLocation = None) -> float: t = Time(jd_utc, format='jd', scale='utc') if location: t = t.light_travel_time( SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg), ephemeris='builtin', location=location ) + t # Convert to TDB scale return t.tdb.jd -
Coordinate standardization: All coordinates to ICRS J2000 decimal degrees. This catches occasional submissions in galactic coordinates or other frames.
-
Deduplication: If two records from different sources have the same target, same observer code, same JD (within 0.001 days = 1.4 minutes), and same filter, they are likely duplicates. Keep the one with better metadata and flag the other.
-
Outlier flagging: Flag any single observation more than 5σ from the local median of observations within ±1 hour. These are not removed at this stage — flagging preserves the record for human review. The flag is
SIGMA_OUTLIER.
4. Instrument Registry¶
The instrument registry is the central contribution of Stage 1. By the time Stage 2 begins with live telescopes, we will have characterized hundreds of instrument profiles from archival data. When a Stage 2 volunteer's data arrives, we can immediately look up their equipment and apply a pre-computed zero-point correction.
4.1 Registry Fields¶
See instruments table in Server Architecture Deep Dive.md for the full schema. Key calibration fields:
zero_point_v: The zero-point offset in V-band between this instrument's measurements and the standard system. A zero-point of +0.3 means this instrument measures 0.3 mag fainter than the catalog; we add 0.3 to its measurements to bring them to the standard system.color_term_bv: The color coefficient. Not all CCDs have perfectly flat spectral response. The color term accounts for systematic offsets that correlate with the source's B-V color. Typically small (0.02–0.15) for well-characterized CCDs.noise_floor_mmag: The irreducible noise floor for this instrument. Even with long exposures, scintillation and atmospheric variability impose a floor. For a typical 20cm amateur scope, this is ~3–10 mmag.
4.2 Calibration Method¶
For each instrument (unique observer code × telescope combination), calibrate against Gaia or APASS catalog stars.
Step 1: Find observations where the observer imaged a field containing many Gaia/APASS stars with known magnitudes. Every AAVSO variable star field contains comparison stars with catalog magnitudes.
Step 2: Match the observer's comparison star magnitudes against catalog values.
from astroquery.vizier import Vizier
def get_apass_comparison_stars(ra_deg: float, dec_deg: float,
radius_arcmin: float = 30) -> Table:
"""Fetch APASS catalog stars in a field for zero-point calibration."""
v = Vizier(columns=['RAJ2000', 'DEJ2000', 'Vmag', 'e_Vmag',
'Bmag', 'e_Bmag', 'r_mag', 'e_r_mag'],
row_limit=200)
result = v.query_region(
SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg),
radius=radius_arcmin*u.arcmin,
catalog='APASS9'
)
if result:
return result[0]
return None
Step 3: For each session (observer + date), compute the zero-point:
zero_point = mean(mag_catalog - mag_observed) for all comparison stars
Step 4: Check for color term:
residual_i = (mag_catalog_i - mag_observed_i) - zero_point
color_term = linear_regression(residual_i ~ (B-V)_catalog_i).slope
Step 5: Store the derived calibration parameters in the instruments table. Update when new data arrives (running average with more weight on newer data).
Noise floor estimation:
def estimate_noise_floor(observations: list[RawPhotometry]) -> float:
"""
Estimate the irreducible noise floor for an instrument by looking
at how much scatter remains after removing all known systematic effects.
Uses a known non-variable comparison star.
"""
magnitudes = np.array([o.magnitude for o in observations])
# After zero-point correction, residual scatter is the noise floor
return np.std(magnitudes) * 1000 # in mmag
4.3 Observer Code Sources¶
Observer code systems:
- AAVSO codes: 3–5 character codes, globally unique. Query at https://www.aavso.org/observer-profile/{code}
- MPC codes: 3-character numeric/alpha codes. Registry at https://www.minorplanetcenter.net/iau/lists/ObsCodesF.html
- ETD: Free-text observer names — normalize by stripping whitespace, converting to uppercase
For Stage 2: OpenAstro site_id is the instrument identifier. But we should maintain backward compatibility with AAVSO and MPC codes so that a volunteer who is already an AAVSO observer can link their Stage 2 account to their historical AAVSO record.
5. Calibration Pipeline¶
5.1 Zero-Point Correction¶
Apply the instrument registry's zero-point to each observation:
def apply_zero_point_correction(obs: RawPhotometry, instrument: Instrument) -> float:
corrected_mag = obs.magnitude + instrument.zero_point_v
return corrected_mag
The corrected magnitude is in the standard Johnson-Cousins system.
5.2 Color Term Correction¶
Apply the color term correction when B and V observations are available for the same epoch:
def apply_color_correction(mag_obs: float, bv_color: float,
color_term: float) -> float:
"""
Corrects for the wavelength-dependent response of the detector.
mag_corrected = mag_obs - color_term * (B - V)_source
"""
return mag_obs - color_term * bv_color
For single-filter observations where B-V is not known: use the catalog B-V for the target if available, or skip color correction and note this in quality flags (NO_COLOR_CORRECTION).
5.3 Atmospheric Extinction¶
For airmass corrections, apply Bouguer's law:
mag_corrected = mag_observed - k * (airmass - 1)
k is the site's extinction coefficient (typically 0.15–0.30 mag/airmass in V band, varies by altitude and atmospheric conditions).
For AAVSO archival data: airmass is usually not provided per-observation. Use the site's typical extinction coefficient from published values or skip the correction. For Stage 2 live data: the client reports airmass for each observation; apply the correction.
5.4 Heliocentric / Barycentric Correction¶
Convert all times to BJD_TDB. See Section 3 (Normalization) for the implementation. This is critical for: - TTV analysis (need microsecond precision timing for best results) - Long-baseline period analysis (phase folding over years requires precise times)
The correction can be up to 8 minutes. For rough variable star monitoring it doesn't matter. For TTV science it is essential.
6. Combination and Light Curve Assembly¶
6.1 Ensemble Photometry¶
When multiple observers observe the same target in the same filter on the same night, we can combine their measurements to produce a better estimate than any single observer.
The combination uses SNR-weighted averaging, after applying zero-point corrections:
import numpy as np
from typing import list
def ensemble_photometry(observations: list[RawPhotometry],
instruments: dict[str, Instrument]) -> tuple[float, float]:
"""
Combine multiple observations of the same target at similar epochs.
Returns (weighted_magnitude, weighted_error).
Applies SNR weighting: weight_i = 1 / sigma_i^2
"""
calibrated_mags = []
weights = []
for obs in observations:
instrument = instruments.get(obs.observer_code)
if instrument is None:
# Uncalibrated instrument: use unweighted but flag it
calibrated_mag = obs.magnitude
sigma = obs.mag_error if obs.mag_error > 0 else 0.1
else:
calibrated_mag = apply_zero_point_correction(obs, instrument)
# Error: combine reported error with instrument noise floor in quadrature
sigma = np.sqrt(obs.mag_error**2 + (instrument.noise_floor_mmag / 1000)**2)
calibrated_mags.append(calibrated_mag)
weights.append(1.0 / sigma**2)
weights = np.array(weights)
mags = np.array(calibrated_mags)
weighted_mean = np.sum(weights * mags) / np.sum(weights)
weighted_error = 1.0 / np.sqrt(np.sum(weights))
return weighted_mean, weighted_error
When to combine vs. when to keep separate: - Combine observations within ±1 hour window for long-period variables (period >> 1 hour) - Do NOT combine observations for short-period variables or transits — use individual points - For occultations: absolutely do not combine — individual timing is the science
6.2 Multi-Filter Handling¶
Observations in different filters of the same target at similar epochs should be kept as separate data points (different rows in light_curve_points). Never average magnitudes across different filters.
When constructing the light curve for a target, always specify the filter. Multi-filter light curves are multiple time series, not one.
6.3 Light Curve Assembly¶
The assembled light curve is stored in light_curve_points (see schema in Server Architecture Deep Dive.md). Assembly process:
- Fetch all
observationsfortarget_idwithquality_score = 0(good quality only) - Apply calibration corrections
- Apply BJD_TDB conversion
- For each observation within a combination window: ensemble-combine if appropriate
- Insert/update rows in
light_curve_points
The assembly is idempotent: if run twice on the same data, produces the same result. This allows re-running after instrument calibration is updated.
def assemble_light_curve(target_id: str, filter_name: str,
start_bjd: float = None, end_bjd: float = None):
"""
Assemble the light curve for a target in a given filter.
Result stored in light_curve_points table.
"""
# Fetch observations
obs = db.query(Observation).filter(
Observation.target_id == target_id,
Observation.filter_used == filter_name,
Observation.quality_score == 0
).all()
if start_bjd:
obs = [o for o in obs if o.bjd_mid >= start_bjd]
if end_bjd:
obs = [o for o in obs if o.bjd_mid <= end_bjd]
# Sort by time
obs.sort(key=lambda o: o.bjd_mid)
# Insert into light_curve_points
points = []
for o in obs:
instrument = get_instrument(o.site_id)
calibrated_mag, calibrated_err = apply_calibration(o, instrument)
bjd = o.bjd_tdb if o.bjd_tdb else to_bjd_tdb(o.obs_mid, target.ra_deg, target.dec_deg)
points.append(LightCurvePoint(
target_id=target_id,
obs_id=o.obs_id,
bjd_tdb=bjd,
filter_used=filter_name,
magnitude=calibrated_mag,
mag_error=calibrated_err,
ensemble_corrected=instrument is not None,
zero_point_applied=instrument.zero_point_v if instrument else None
))
# Bulk insert / upsert
db.bulk_insert_mappings(LightCurvePoint, [p.__dict__ for p in points])
db.commit()
7. Quality Flagging¶
Quality control is where bad data gets caught before it corrupts the science products. The quality pipeline runs after each observation is ingested and assigns quality_score (0 = good, 1 = warning, 2 = bad) and specific quality_flags.
7.1 Quality Checks¶
Timestamp validation:
- TIMESTAMP_FUTURE: obs_mid is in the future (submission error)
- TIMESTAMP_ANCIENT: obs_mid is before 1900 (almost certainly a parsing error)
- TIMESTAMP_MISMATCH: Submission timestamp vs. obs_mid differ by more than exposure_sec + 300s
Magnitude validation:
- MAGNITUDE_TOO_BRIGHT: Magnitude < 0 for a star (impossible for point source photometry from the ground)
- MAGNITUDE_TOO_FAINT: Magnitude > 22 (beyond even large amateur scopes with reasonable exposure)
- MAGNITUDE_SATURATION_SUSPECTED: Magnitude is brighter than the site's estimated saturation limit
Airmass validation:
- HIGH_AIRMASS: Airmass > 3.0 (very poor quality expected)
- IMPOSSIBLE_AIRMASS: Airmass < 1.0 (impossible)
Comparison star validation (when comparison star data is submitted):
- POOR_COMPARISON_ENSEMBLE: Fewer than 3 comparison stars used
- COMPARISON_STAR_OUTLIER: One comparison star deviates by > 3σ from ensemble (that star may be variable itself)
Cross-site consistency check (for targets with multiple observers):
def check_cross_site_consistency(obs: Observation, window_hours: float = 2.0) -> list[str]:
"""
Compare this observation against other obs of the same target in a time window.
Flag if it deviates significantly from the ensemble.
"""
flags = []
recent_obs = get_observations_window(obs.target_id, obs.obs_mid, window_hours)
if len(recent_obs) < 2:
return flags # Can't check with only one point
mags = [o.magnitude for o in recent_obs if o.quality_score == 0]
if not mags:
return flags
median_mag = np.median(mags)
std_mag = np.std(mags)
if std_mag > 0 and abs(obs.magnitude - median_mag) > 4 * std_mag:
flags.append('CROSS_SITE_OUTLIER')
return flags
GPS timing validation (for occultation targets):
def validate_timing(obs: Observation, target: Target) -> list[str]:
flags = []
if target.requires_gps_timing:
site = get_site(obs.site_id)
if not site.has_gps:
flags.append('TIMING_NO_GPS')
# Check if event timing makes sense
if target.event_time:
dt = abs((obs.obs_mid - target.event_time).total_seconds())
if dt > 300: # More than 5 minutes from event time
flags.append('TIMING_OUTSIDE_WINDOW')
return flags
7.2 Automatic Rejection¶
Quality score 2 (bad) observations are excluded from all science products automatically. They are retained in the database for human review and potential recovery. Conditions for score 2:
- Any TIMESTAMP_* flag
- IMPOSSIBLE_AIRMASS
- MAGNITUDE_TOO_BRIGHT or MAGNITUDE_TOO_FAINT
- CROSS_SITE_OUTLIER with no adjacent good observations to explain the deviation
Quality score 1 (warning) observations are included in science products but marked. Conditions for score 1:
- HIGH_AIRMASS
- POOR_COMPARISON_ENSEMBLE
- TIMING_NO_GPS for occultation targets
- NO_COLOR_CORRECTION
8. FITS Ingestion and Plate Solving¶
When full FITS images are submitted (optional at Stage 2, not from Stage 1 archives), the pipeline runs additional processing.
8.1 Plate Solving¶
Plate solving determines the precise WCS (World Coordinate System) of an image — maps pixel coordinates to sky coordinates. This is necessary for: - Verifying the correct target was observed - Precise astrometry for NEO observations - Extracting photometry at the known target position even if the finder chart is off-center
Primary solver: astrometry.net (local installation preferred for production; cloud API for testing).
from astroquery.astrometry_net import AstrometryNet
an = AstrometryNet()
an.api_key = config.astrometry_api_key
wcs_header = an.solve_from_image(fits_path, force_image_upload=False)
Fallback: astap (Astrometric STAcking Program) — faster than astrometry.net for dense field images. Local installation, no API key required.
For production, install astrometry.net locally on the server with the appropriate index files. The index files to download depend on the FOV of incoming images; for typical amateur scopes (0.1°–3° FOV): index files 4205–4208.
Plate solving threshold: Reject solutions with residuals > 2 arcsec per star (indicates poor solve or wrong field).
8.2 Aperture Photometry¶
Run aperture photometry on submitted FITS files to extract or verify the photometric value:
from photutils.detection import DAOStarFinder
from photutils.aperture import aperture_photometry, CircularAperture
from astropy.stats import sigma_clipped_stats
from astropy.io import fits
import numpy as np
def extract_photometry(fits_path: str, target_ra: float, target_dec: float,
wcs_header: dict) -> dict:
"""
Extract aperture photometry for a target in a FITS image.
Returns magnitude (instrumental) and error.
"""
with fits.open(fits_path) as hdul:
data = hdul[0].data.astype(float)
header = hdul[0].header
# Background estimation
from photutils.background import Background2D, MedianBackground
bkg_estimator = MedianBackground()
bkg = Background2D(data, box_size=50, filter_size=3, bkg_estimator=bkg_estimator)
data_sub = data - bkg.background
# Convert target sky coordinates to pixel coordinates
from astropy.wcs import WCS
from astropy.coordinates import SkyCoord
wcs = WCS(wcs_header)
target_coord = SkyCoord(ra=target_ra*u.deg, dec=target_dec*u.deg)
target_pixel = wcs.world_to_pixel(target_coord)
# Estimate aperture size from seeing
# Optimal aperture radius ≈ 1.5 × FWHM in pixels
fwhm_pixels = estimate_fwhm(data_sub) # from brightest stars in field
aperture_radius = max(1.5 * fwhm_pixels, 3.0) # minimum 3 pixels
aperture = CircularAperture(target_pixel, r=aperture_radius)
annulus_aperture = CircularAnnulus(target_pixel, r_in=aperture_radius*2,
r_out=aperture_radius*3)
# Photometry
phot_table = aperture_photometry(data_sub, aperture)
bkg_annulus = aperture_photometry(data_sub, annulus_aperture)
bkg_per_pixel = bkg_annulus['aperture_sum'] / annulus_aperture.area
source_flux = phot_table['aperture_sum'] - bkg_per_pixel * aperture.area
# Instrumental magnitude: m_inst = -2.5 * log10(flux / exptime)
exptime = header.get('EXPTIME', 1.0)
if source_flux <= 0:
return {'success': False, 'reason': 'negative_flux'}
mag_inst = -2.5 * np.log10(source_flux / exptime)
# Poisson noise + readout noise estimate
gain = header.get('GAIN', 1.0)
rdnoise = header.get('RDNOISE', 5.0) # e- RMS
n_pixels = np.pi * aperture_radius**2
noise = np.sqrt(source_flux / gain + n_pixels * (bkg_per_pixel / gain + rdnoise**2 / gain**2))
snr = source_flux / noise
mag_error = 1.0857 / snr # delta_m = (1 / ln(10)) * (delta_f / f) ≈ 1.0857 / SNR
return {
'success': True,
'magnitude_instrumental': float(mag_inst),
'mag_error': float(mag_error),
'snr': float(snr),
'aperture_radius_px': float(aperture_radius),
'fwhm_pixels': float(fwhm_pixels)
}
8.3 Comparison Star Extraction¶
For ensemble differential photometry, extract comparison stars from the same image and cross-match against APASS or Gaia:
def extract_comparison_stars(fits_path: str, wcs_header: dict,
target_ra: float, target_dec: float) -> list[dict]:
"""
Detect all stars in image, cross-match against APASS catalog,
return list of calibrated comparison stars.
"""
with fits.open(fits_path) as hdul:
data = hdul[0].data.astype(float)
# Detect sources
mean, median, std = sigma_clipped_stats(data)
daofind = DAOStarFinder(fwhm=4.0, threshold=10*std)
sources = daofind(data - median)
# Convert pixel coords to sky coords
wcs = WCS(wcs_header)
sky_coords = wcs.pixel_to_world(sources['xcentroid'], sources['ycentroid'])
# Cross-match against APASS
apass_catalog = get_apass_comparison_stars(target_ra, target_dec)
comp_stars = []
if apass_catalog is not None:
catalog_coords = SkyCoord(ra=apass_catalog['RAJ2000'],
dec=apass_catalog['DEJ2000'])
idx, sep, _ = sky_coords.match_to_catalog_sky(catalog_coords)
good_match = sep < 3*u.arcsec
for i, match in enumerate(good_match):
if match and apass_catalog[idx[i]]['Vmag'] is not np.ma.masked:
comp_stars.append({
'catalog_id': f"APASS_{idx[i]}",
'ra_deg': float(sky_coords[i].ra.deg),
'dec_deg': float(sky_coords[i].dec.deg),
'mag_catalog': float(apass_catalog[idx[i]]['Vmag']),
'mag_instrumental': None # filled in after photometry
})
return comp_stars
9. Science Layer: TTV Pipeline¶
The TTV pipeline is the flagship Stage 1 science product. It takes a time series of transit mid-times and extracts Transit Timing Variations (TTVs), then optionally runs the reverse n-body inference to recover perturber parameters.
See NewOpenAstro/Science/Projects/TTV Reverse N-Body Inference.md for the scientific background. This section covers the implementation.
9.1 Transit Mid-Time Extraction¶
Given a light curve with a transit, fit the transit model to extract the precise mid-time:
import batman
import scipy.optimize as op
from dataclasses import dataclass
@dataclass
class TransitFitResult:
t_mid_bjd: float # Transit mid-time in BJD_TDB
t_mid_err: float # 1σ error on mid-time (days)
depth_frac: float # Transit depth as fraction of out-of-transit flux
duration_hours: float
residuals_rms: float # RMS of fit residuals (mag)
success: bool
def fit_transit_midtime(bjd: np.ndarray, flux: np.ndarray, flux_err: np.ndarray,
t0_guess: float, period: float,
planet_params: dict) -> TransitFitResult:
"""
Fit a transit model to extract precise mid-time.
planet_params: dict with 'rp', 'a', 'inc', 'ecc', 'w', 'limb_dark_coeffs'
"""
# Batman transit model
params = batman.TransitParams()
params.t0 = t0_guess
params.per = period
params.rp = planet_params['rp'] # Rp/R*
params.a = planet_params['a'] # a/R*
params.inc = planet_params['inc']
params.ecc = planet_params.get('ecc', 0.0)
params.w = planet_params.get('w', 90.0)
params.u = planet_params.get('limb_dark_coeffs', [0.3, 0.2])
params.limb_dark = 'quadratic'
m = batman.TransitModel(params, bjd)
def chi2(theta):
t_mid, depth_factor = theta
params.t0 = t_mid
params.rp *= depth_factor
model_flux = m.light_curve(params)
return np.sum(((flux - model_flux) / flux_err)**2)
result = op.minimize(chi2, [t0_guess, 1.0], method='Nelder-Mead',
options={'xatol': 1e-7, 'fatol': 1e-10})
if not result.success:
return TransitFitResult(t_mid_bjd=t0_guess, t_mid_err=999,
depth_frac=0, duration_hours=0,
residuals_rms=999, success=False)
t_mid_fit, depth_factor_fit = result.x
# Estimate timing uncertainty via MCMC or parabolic approximation
# For production: use emcee. For speed: parabolic approximation.
timing_err = estimate_timing_uncertainty_parabolic(chi2, t_mid_fit)
return TransitFitResult(
t_mid_bjd=float(t_mid_fit),
t_mid_err=float(timing_err),
depth_frac=float(params.rp**2 * depth_factor_fit),
duration_hours=float(estimate_duration(params, bjd)),
residuals_rms=float(np.sqrt(result.fun / len(bjd))),
success=True
)
9.2 TTV Extraction¶
Once we have a time series of transit mid-times, extract TTVs:
def extract_ttvs(transit_times: list[tuple[float, float]],
period: float, t0: float) -> pd.DataFrame:
"""
Compute TTV residuals (observed - calculated) for each transit.
transit_times: list of (bjd_mid, bjd_mid_err) tuples
period: best-estimate period in days
t0: reference transit time in BJD_TDB
Returns DataFrame with columns: transit_number, t_obs, t_calc, ttv_days, ttv_err
"""
rows = []
for t_obs, t_err in sorted(transit_times, key=lambda x: x[0]):
# Find nearest expected transit
n = round((t_obs - t0) / period)
t_calc = t0 + n * period
ttv = t_obs - t_calc
rows.append({
'transit_number': int(n),
't_observed_bjd': t_obs,
't_calculated_bjd': t_calc,
'ttv_days': ttv,
'ttv_err_days': t_err
})
df = pd.DataFrame(rows).sort_values('transit_number')
# Refine period using linear fit to O-C diagram
if len(df) >= 5:
from scipy.stats import linregress
slope, intercept, r, p, se = linregress(df['transit_number'], df['t_observed_bjd'])
refined_period = slope
refined_t0 = intercept
# Recompute TTVs with refined period
df['t_calc_refined'] = refined_t0 + df['transit_number'] * refined_period
df['ttv_days_refined'] = df['t_observed_bjd'] - df['t_calc_refined']
return df
9.3 N-Body Inference¶
The reverse n-body problem: given observed TTVs, infer perturber orbital parameters. This is the computationally intensive step.
import ttvfast
import emcee
def run_ttv_inference(ttv_data: pd.DataFrame, known_planet: dict) -> dict:
"""
Run MCMC inference on TTV residuals to recover perturber parameters.
known_planet: {'mass': M_earth, 'period': days, 't0': BJD, 'e': float, 'omega': deg}
Returns posterior samples and best-fit parameters.
"""
def log_likelihood(theta):
m2, p2, t2, e2, omega2 = theta
# Forward model: simulate TTVs with TTVFast
planets = [
ttvfast.models.Planet(
mass=known_planet['mass'] / 333000, # convert to solar masses
period=known_planet['period'],
ecc=known_planet.get('e', 0.0),
omega=known_planet.get('omega', 0.0),
mean_anomaly=known_planet.get('mean_anomaly', 0.0)
),
ttvfast.models.Planet(
mass=m2 / 333000,
period=p2,
ecc=e2,
omega=omega2,
mean_anomaly=t2 # approximate
)
]
results = ttvfast.ttvfast(planets,
stellar_mass=1.0,
time=ttv_data['t_calculated_bjd'].min() - 100,
dt=0.001,
total=ttv_data['t_calculated_bjd'].max() + 100,
input_flag=1)
# Extract predicted mid-times for the inner planet
planet1_times = np.array(results[0][1]) # transit times for planet 1
# Match predicted times to observed transit numbers
chi2 = 0
for _, row in ttv_data.iterrows():
n = int(row['transit_number'])
if n >= len(planet1_times):
return -np.inf
predicted_ttv = planet1_times[n] - row['t_calculated_bjd']
chi2 += ((row['ttv_days'] - predicted_ttv) / row['ttv_err_days'])**2
return -0.5 * chi2
def log_prior(theta):
m2, p2, t2, e2, omega2 = theta
# Physically motivated priors
if not (0.01 < m2 < 3000): return -np.inf # Earth masses
if not (1 < p2 < 1000): return -np.inf # days
if not (0 <= e2 < 0.9): return -np.inf
if not (0 <= omega2 < 360): return -np.inf
return 0.0
def log_probability(theta):
lp = log_prior(theta)
if not np.isfinite(lp):
return -np.inf
return lp + log_likelihood(theta)
# Initialize walkers near reasonable starting point
ndim = 5
nwalkers = 32
p0 = np.array([10, 50, 0.5, 0.05, 90]) + 0.1 * np.random.randn(nwalkers, ndim)
sampler = emcee.EnsembleSampler(nwalkers, ndim, log_probability)
sampler.run_mcmc(p0, nsteps=2000, progress=True)
# Discard burn-in
flat_samples = sampler.get_chain(discard=500, thin=15, flat=True)
return {
'samples': flat_samples,
'median': np.median(flat_samples, axis=0),
'std': np.std(flat_samples, axis=0),
'parameter_names': ['m2_Mearth', 'period_days', 't0_fraction', 'eccentricity', 'omega_deg']
}
Computational cost note: The MCMC run with 32 walkers × 2000 steps × N_transits TTVFast calls is expensive. Estimate 1–10 minutes on a modern CPU for a typical system with 50 transits. For Stage 1 proof-of-concept, this is fine. For Stage 2 with live data and many target systems, consider a Celery task queue or BOINC (see TTV document).
9.4 Period Search (Lomb-Scargle)¶
For variable star science:
from astropy.timeseries import LombScargle
def run_period_search(bjd: np.ndarray, mag: np.ndarray, mag_err: np.ndarray,
period_min: float = 0.1, period_max: float = 365) -> dict:
"""
Run Lomb-Scargle period search on a light curve.
Returns best period and periodogram.
"""
frequency_min = 1.0 / period_max
frequency_max = 1.0 / period_min
ls = LombScargle(bjd, mag, mag_err, normalization='standard')
frequency, power = ls.autopower(
minimum_frequency=frequency_min,
maximum_frequency=frequency_max,
samples_per_peak=10
)
best_freq = frequency[np.argmax(power)]
best_period = 1.0 / best_freq
false_alarm = ls.false_alarm_probability(power.max(), method='bootstrap')
return {
'best_period_days': float(best_period),
'peak_power': float(power.max()),
'false_alarm_probability': float(false_alarm),
'frequency': frequency.tolist(),
'power': power.tolist()
}
10. Output Products¶
Stage 1 produces:
-
Calibrated FITS archive: Processed FITS files with updated WCS headers and photometry keywords. Stored in B2. Filename convention:
{target_name}/{date_utc}/{observer_code}_{filter}_{jd}.fits -
Light curve files: Per-target per-filter light curves in FITS binary table format (standard for astronomy) AND CSV (accessible). Columns: BJD_TDB, magnitude, mag_error, filter, observer_code, quality_flag.
-
Instrument registry export: CSV of all characterized instruments with calibration parameters. Shared publicly as a community resource. This is a distinct scientific contribution — a database of amateur telescope system parameters.
-
TTV timing tables: Per-planet tables of transit mid-times, errors, and TTVs. FITS binary table format. Accompanied by diagnostic plots (O-C diagram).
-
Science results: Period measurements, TTV posteriors, comparison with published values. LaTeX-formatted for paper preparation.
11. First Science Target Selection¶
The Roadmap identifies three candidates. Recommendation:
Start with option 1: AAVSO variable star period refinement.
Reasons: - AAVSO data API is the most mature and well-documented of all the sources - Success metric is unambiguous (period measurement with uncertainty) - Pipeline can be validated against published GCVS periods - Low risk: if the pipeline has bugs, the comparison against published values immediately reveals them - Can produce a publishable result within weeks of starting
Candidate targets: - Z UMa (Mira variable): 195-day period, thousands of AAVSO observations spanning decades. Period shows secular changes and cycle-to-cycle variation. Paper topic: period change analysis using full AAVSO archive. - T Cep (Mira): Similar, but less well-studied secular period change. More room for novel contribution. - RR Lyrae (classic Cepheid-like): Period stability is itself scientifically interesting. Period changes at sub-second level require the BJD_TDB correction.
After period refinement paper is in draft: Move to option 2 (TTV from archival ETD data).
Candidate exoplanet system for TTV proof-of-concept: - WASP-12b: Very short period (1.09 days), known orbital decay, hundreds of ETD transits available. Strong signal. Risk: period derivative is known; our value should match. Good validation target. - Kepler-19c perturber inference: Kepler-19b has known TTVs with a known perturber. We can re-derive the perturber parameters using only ETD data as a proof-of-concept that our inference pipeline recovers the known answer.
Last updated: 2026-03-20 Status: Planning document — not yet implemented Dependencies: Server Architecture Deep Dive.md (database schema), instruments table, light_curve_points table First action: Stand up AAVSO adapter and run it against Z UMa