Skip to content

Stage 1 Data Pipeline Design

This document covers the full data pipeline for Stage 1 — ingesting, calibrating, and combining data from existing public archives (AAVSO, MPC, AstroBin, ETD) to produce calibrated light curves and scientific output without any live telescope network. The pipeline also serves as the foundation for Stage 2's live data processing.

The Roadmap document (NewOpenAstro/Roadmap.md) establishes the high-level structure. This document provides the implementation details — the specific APIs, parsing code, calibration math, and quality control logic needed to actually build it.


1. Architecture Overview

The pipeline is modular. Each stage has a clean process(input) → output signature. Stages can be run independently and their outputs inspected. Nothing is a black box.

┌───────────────────────────────────────────────────────────────────────┐
│                        STAGE 1 DATA PIPELINE                          │
│                                                                       │
│  Source Adapters          Normalize          Calibrate                │
│  ┌──────────┐             ┌────────────┐     ┌───────────────────┐   │
│  │  AAVSO   │──raw LCs───▶│            │     │                   │   │
│  │ Adapter  │             │            │     │  Zero-point vs    │   │
│  ├──────────┤             │ Normalizer │────▶│  Gaia/APASS       │   │
│  │   MPC    │──raw obs───▶│            │     │  Color term       │   │
│  │ Adapter  │             │  WCS check │     │  correction       │   │
│  ├──────────┤             │  Dedup     │     │                   │   │
│  │ AstroBin │──FITS───────▶│  Std hdr  │     │  Instrument       │   │
│  │ Adapter  │             └────────────┘     │  registry update  │   │
│  ├──────────┤                                └────────┬──────────┘   │
│  │   ETD    │──transit times──────────────────────────┤              │
│  │ Adapter  │                                         │              │
│  └──────────┘                                         ▼              │
│                                              ┌──────────────────┐    │
│                                              │   Combination    │    │
│                                              │   & Assembly     │    │
│                                              │                  │    │
│                                              │ SNR-weighted     │    │
│                                              │ ensemble phot    │    │
│                                              │ Light curve      │    │
│                                              │ assembly         │    │
│                                              └────────┬─────────┘    │
│                                                       ▼              │
│                                              ┌──────────────────┐    │
│                                              │  Science Layer   │    │
│                                              │                  │    │
│                                              │ Lomb-Scargle     │    │
│                                              │ Transit timing   │    │
│                                              │ TTV extraction   │    │
│                                              │ N-body inference │    │
│                                              └────────┬─────────┘    │
│                                                       ▼              │
│                                              ┌──────────────────┐    │
│                                              │    Outputs       │    │
│                                              │                  │    │
│                                              │ Calibrated FITS  │    │
│                                              │ archive          │    │
│                                              │ Light curves     │    │
│                                              │ (FITS + CSV)     │    │
│                                              │ Instrument       │    │
│                                              │ registry         │    │
│                                              │ Science results  │    │
│                                              └──────────────────┘    │
└───────────────────────────────────────────────────────────────────────┘

2. Source Adapters

2.1 AAVSO

What AAVSO provides: The richest amateur photometry archive in existence. Millions of observations of variable stars, spanning decades. Data is already observer-calibrated using standard comparison stars from AAVSO's own charts. Equipment metadata is associated with each observer code.

Access: astroquery.aavso (built into astroquery) or direct API.

from astroquery.aavso import AAVSO

aavso = AAVSO()
table = aavso.query_star(
    star_name='RR_Lyr',   # or any AAVSO VSX name
    obsbands=['V', 'B'],
    date_start='2024-01-01',
    date_end='2026-01-01'
)

AAVSO observation format (key fields): - obsbandcode: Filter (V, B, R, I, Vis, etc.) - obsdate: JD of observation - magnitude: Reported magnitude - uncertainty: Reported photometric uncertainty - observer: AAVSO observer code (e.g., 'MAAN') - comp1: Primary comparison star designation - comp2: Check star designation - airmass: (sometimes reported) - telescope / ccd: Equipment info (observer-reported, inconsistent)

AAVSO data quality notes: - Visual (Vis band) estimates are systematically noisier than CCD; treat separately - Comparison star issues: some observers use non-standard comp stars; flag and cross-check against AAVSO chart revisions - Saturated measurements: occasionally observers submit magnitudes for saturated images; outlier rejection handles these - Observer consistency: some observers have multi-decade records; their historical data may predate standardization

AAVSO adapter implementation:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import astropy.units as u
from astropy.time import Time

@dataclass
class RawPhotometry:
    """Normalized photometry record, source-agnostic."""
    source: str               # 'aavso', 'mpc', 'astrobin', 'etd'
    target_name: str
    observer_code: str
    jd_obs: float             # Julian Date of observation mid-point
    filter_name: str          # normalized to Johnson-Cousins: 'B','V','R','I','Clear'
    magnitude: float
    mag_error: float
    ra_deg: Optional[float]   # if known
    dec_deg: Optional[float]
    airmass: Optional[float]
    instrument_id: Optional[str]  # FK to instruments table after calibration
    raw_metadata: dict

class AAVSOAdapter:
    def fetch(self, target: str, start_jd: float, end_jd: float,
              filters: list[str] = None) -> list[RawPhotometry]:
        from astroquery.aavso import AAVSO
        aavso = AAVSO()

        obs_time_start = Time(start_jd, format='jd').iso.split('T')[0]
        obs_time_end = Time(end_jd, format='jd').iso.split('T')[0]

        try:
            table = aavso.query_star(
                star_name=target,
                obsbands=filters,
                date_start=obs_time_start,
                date_end=obs_time_end
            )
        except Exception as e:
            log.error(f"AAVSO fetch failed for {target}: {e}")
            return []

        records = []
        for row in table:
            # Skip visual estimates unless explicitly requested
            if row['obsbandcode'] == 'Vis' and 'Vis' not in (filters or []):
                continue

            filter_normalized = self._normalize_filter(row['obsbandcode'])
            if filter_normalized is None:
                continue

            records.append(RawPhotometry(
                source='aavso',
                target_name=target,
                observer_code=str(row['observer']),
                jd_obs=float(row['obsdate']),
                filter_name=filter_normalized,
                magnitude=float(row['magnitude']),
                mag_error=float(row['uncertainty']) if row['uncertainty'] else 0.05,
                ra_deg=None,  # AAVSO doesn't give per-observation RA/Dec
                dec_deg=None,
                airmass=float(row['airmass']) if row['airmass'] else None,
                instrument_id=None,  # Assigned after instrument registry lookup
                raw_metadata=dict(zip(table.colnames, [str(v) for v in row]))
            ))

        return records

    def _normalize_filter(self, aavso_band: str) -> Optional[str]:
        """Map AAVSO band codes to standard Johnson-Cousins names."""
        mapping = {
            'B': 'B', 'V': 'V', 'R': 'R', 'I': 'I',
            'Rc': 'R', 'Ic': 'I',  # Cousins variants
            'TG': 'V',  # Transformed Green → V
            'TR': 'R',  # Transformed Red
            'CV': 'Clear',  # Clear V-filtered comparison
            'CR': 'Clear',
            'J': None,  # NIR — skip unless we specifically want it
        }
        return mapping.get(aavso_band)

2.2 ETD (Exoplanet Transit Database)

What ETD provides: Amateur transit light curves submitted by observers worldwide. The Czech Astronomical Society manages this. Each submission includes: transit start, minimum, end times; depth; fitted parameters; raw light curve data.

Access: ETD has a public web interface but no formal API. Two options: 1. Scrape the per-target HTML page (fragile, not recommended) 2. Use astroquery.exofop instead: ExoFOP (NASA) is better-structured, includes ETD data plus TESS data, and has a proper API.

ExoFOP access:

from astroquery.exofop import ExoFOP

exofop = ExoFOP()
timeseries = exofop.query_planet('KELT-11b')  # returns astropy Table

For raw ETD data when ExoFOP doesn't have what we need: - ETD provides per-transit data files at http://var2.astro.cz/tresca/transit-detail.php?id={transit_id} - Bulk download: http://var2.astro.cz/tresca/transit-list.php?target={planet_name}&format=csv

ETD data format for transit timing:

# ETD output columns (relevant subset):
# Transit_midtime (JD), Transit_midtime_error (days),
# Duration (hours), Depth (mmag), Observer, Telescope, Filter

ETD adapter (simplified — full implementation handles edge cases):

import requests
import pandas as pd
import io

class ETDAdapter:
    ETD_URL = "http://var2.astro.cz/tresca/transit-list.php"

    def fetch_transit_times(self, planet_name: str) -> pd.DataFrame:
        resp = requests.get(self.ETD_URL,
                           params={'target': planet_name, 'format': 'csv'},
                           timeout=60)
        resp.raise_for_status()
        df = pd.read_csv(io.StringIO(resp.text), comment='#')
        df = df.rename(columns={
            'Transit_midtime': 'bjd_mid',
            'Transit_midtime_error': 'bjd_mid_err',
            'Duration': 'duration_hours',
            'Depth': 'depth_mmag'
        })
        df['source'] = 'etd'
        return df

Using ETD data for TTV: ETD's transit mid-times have already been measured by the submitting observer, so we don't need to re-fit. We take the BJD mid-time and error from each submission and feed directly into the TTV pipeline. See Section 7.

2.3 MPC

What MPC provides: Standardized astrometric observations of minor planets from amateur and professional observers worldwide. The MPC80 format is compact ASCII.

Access: Astroquery or direct HTTP.

from astroquery.mpc import MPC

mpc = MPC()
obs = mpc.get_observations_async('(433) Eros', get_mpc_observations=True)
# Returns table of MPC80-format observations

Or direct API: https://www.minorplanetcenter.net/obs_search?object_id=433&output_format=json

MPC data for OpenAstro Stage 1: - Primary use: validate the astrometry pipeline against known asteroid positions - Secondary: feed into occultation prediction pipeline (need accurate asteroid ephemerides) - NOT for photometry: MPC observations are astrometric; magnitudes are incidental and poorly calibrated

MPC observer code registry: Each MPC observer code maps to a telescope site with known location. Cross-reference MPC observer codes with our instruments table. This is a valuable source of characterized site locations.

2.4 AstroBin

What AstroBin provides: Large community image hosting platform. Contains processed FITS files from thousands of observers, with equipment metadata. Not a science archive — images are primarily for aesthetics — but equipment metadata and some photometric data are accessible.

Access: AstroBin API (registration required, free tier available).

GET https://www.astrobin.com/api/v1/image/?format=json&api_key={key}&api_secret={secret}
    &imaging_telescopes__aperture__gte=200  # aperture filter
    &subject_type__in=STAR,DEEP_SKY         # type filter

AstroBin's value for Stage 1: Primarily the equipment metadata, not the images. We can use AstroBin as a source for populating the instruments registry — querying the telescope/camera combinations that observers use, along with rough site locations. This pre-populates the registry before any Stage 2 live observations arrive.

For actual image data, AstroBin images are usually heavily processed (noise reduction, stretch, etc.) and not appropriate for science photometry. Exception: some observers upload science-quality calibrated FITS. These can be identified by checking FITS headers for proper calibration keywords (GAIN, RDNOISE, etc.).

AstroBin adapter (instruments registry focus):

class AstroBinAdapter:
    API_URL = "https://www.astrobin.com/api/v1"

    def fetch_equipment_profiles(self, min_aperture_mm: int = 100) -> list[dict]:
        """Fetch observer equipment combinations to populate instrument registry."""
        resp = requests.get(
            f"{self.API_URL}/telescope/",
            params={
                'format': 'json',
                'aperture__gte': min_aperture_mm,
                'limit': 100,
                'offset': 0
            },
            headers={'Authorization': f'Token {self.api_key}'}
        )
        return resp.json().get('objects', [])


3. Normalization Stage

The normalizer takes RawPhotometry records from any adapter and produces a unified internal format, applying:

  1. Filter standardization: Map all filter names to Johnson-Cousins (B, V, R, I) or Sloan (g, r, i, z). Anything that can't be mapped is flagged as filter_unknown.

  2. Time standardization: Convert all times to BJD_TDB (Barycentric Julian Date, Barycentric Dynamical Time). This corrects for Earth's orbital motion around the Sun (up to ±8 minutes) and is the standard for precision timing in exoplanet and variable star work.

    from astropy.time import Time
    from astropy.coordinates import SkyCoord, EarthLocation
    import astropy.units as u
    
    def to_bjd_tdb(jd_utc: float, ra_deg: float, dec_deg: float,
                   location: EarthLocation = None) -> float:
        t = Time(jd_utc, format='jd', scale='utc')
        if location:
            t = t.light_travel_time(
                SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg),
                ephemeris='builtin',
                location=location
            ) + t
        # Convert to TDB scale
        return t.tdb.jd
    

  3. Coordinate standardization: All coordinates to ICRS J2000 decimal degrees. This catches occasional submissions in galactic coordinates or other frames.

  4. Deduplication: If two records from different sources have the same target, same observer code, same JD (within 0.001 days = 1.4 minutes), and same filter, they are likely duplicates. Keep the one with better metadata and flag the other.

  5. Outlier flagging: Flag any single observation more than 5σ from the local median of observations within ±1 hour. These are not removed at this stage — flagging preserves the record for human review. The flag is SIGMA_OUTLIER.


4. Instrument Registry

The instrument registry is the central contribution of Stage 1. By the time Stage 2 begins with live telescopes, we will have characterized hundreds of instrument profiles from archival data. When a Stage 2 volunteer's data arrives, we can immediately look up their equipment and apply a pre-computed zero-point correction.

4.1 Registry Fields

See instruments table in Server Architecture Deep Dive.md for the full schema. Key calibration fields:

  • zero_point_v: The zero-point offset in V-band between this instrument's measurements and the standard system. A zero-point of +0.3 means this instrument measures 0.3 mag fainter than the catalog; we add 0.3 to its measurements to bring them to the standard system.
  • color_term_bv: The color coefficient. Not all CCDs have perfectly flat spectral response. The color term accounts for systematic offsets that correlate with the source's B-V color. Typically small (0.02–0.15) for well-characterized CCDs.
  • noise_floor_mmag: The irreducible noise floor for this instrument. Even with long exposures, scintillation and atmospheric variability impose a floor. For a typical 20cm amateur scope, this is ~3–10 mmag.

4.2 Calibration Method

For each instrument (unique observer code × telescope combination), calibrate against Gaia or APASS catalog stars.

Step 1: Find observations where the observer imaged a field containing many Gaia/APASS stars with known magnitudes. Every AAVSO variable star field contains comparison stars with catalog magnitudes.

Step 2: Match the observer's comparison star magnitudes against catalog values.

from astroquery.vizier import Vizier

def get_apass_comparison_stars(ra_deg: float, dec_deg: float,
                                radius_arcmin: float = 30) -> Table:
    """Fetch APASS catalog stars in a field for zero-point calibration."""
    v = Vizier(columns=['RAJ2000', 'DEJ2000', 'Vmag', 'e_Vmag',
                        'Bmag', 'e_Bmag', 'r_mag', 'e_r_mag'],
               row_limit=200)
    result = v.query_region(
        SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg),
        radius=radius_arcmin*u.arcmin,
        catalog='APASS9'
    )
    if result:
        return result[0]
    return None

Step 3: For each session (observer + date), compute the zero-point:

zero_point = mean(mag_catalog - mag_observed) for all comparison stars

Step 4: Check for color term:

residual_i = (mag_catalog_i - mag_observed_i) - zero_point
color_term = linear_regression(residual_i ~ (B-V)_catalog_i).slope

Step 5: Store the derived calibration parameters in the instruments table. Update when new data arrives (running average with more weight on newer data).

Noise floor estimation:

def estimate_noise_floor(observations: list[RawPhotometry]) -> float:
    """
    Estimate the irreducible noise floor for an instrument by looking
    at how much scatter remains after removing all known systematic effects.
    Uses a known non-variable comparison star.
    """
    magnitudes = np.array([o.magnitude for o in observations])
    # After zero-point correction, residual scatter is the noise floor
    return np.std(magnitudes) * 1000  # in mmag

4.3 Observer Code Sources

Observer code systems: - AAVSO codes: 3–5 character codes, globally unique. Query at https://www.aavso.org/observer-profile/{code} - MPC codes: 3-character numeric/alpha codes. Registry at https://www.minorplanetcenter.net/iau/lists/ObsCodesF.html - ETD: Free-text observer names — normalize by stripping whitespace, converting to uppercase

For Stage 2: OpenAstro site_id is the instrument identifier. But we should maintain backward compatibility with AAVSO and MPC codes so that a volunteer who is already an AAVSO observer can link their Stage 2 account to their historical AAVSO record.


5. Calibration Pipeline

5.1 Zero-Point Correction

Apply the instrument registry's zero-point to each observation:

def apply_zero_point_correction(obs: RawPhotometry, instrument: Instrument) -> float:
    corrected_mag = obs.magnitude + instrument.zero_point_v
    return corrected_mag

The corrected magnitude is in the standard Johnson-Cousins system.

5.2 Color Term Correction

Apply the color term correction when B and V observations are available for the same epoch:

def apply_color_correction(mag_obs: float, bv_color: float,
                           color_term: float) -> float:
    """
    Corrects for the wavelength-dependent response of the detector.
    mag_corrected = mag_obs - color_term * (B - V)_source
    """
    return mag_obs - color_term * bv_color

For single-filter observations where B-V is not known: use the catalog B-V for the target if available, or skip color correction and note this in quality flags (NO_COLOR_CORRECTION).

5.3 Atmospheric Extinction

For airmass corrections, apply Bouguer's law:

mag_corrected = mag_observed - k * (airmass - 1)
where k is the site's extinction coefficient (typically 0.15–0.30 mag/airmass in V band, varies by altitude and atmospheric conditions).

For AAVSO archival data: airmass is usually not provided per-observation. Use the site's typical extinction coefficient from published values or skip the correction. For Stage 2 live data: the client reports airmass for each observation; apply the correction.

5.4 Heliocentric / Barycentric Correction

Convert all times to BJD_TDB. See Section 3 (Normalization) for the implementation. This is critical for: - TTV analysis (need microsecond precision timing for best results) - Long-baseline period analysis (phase folding over years requires precise times)

The correction can be up to 8 minutes. For rough variable star monitoring it doesn't matter. For TTV science it is essential.


6. Combination and Light Curve Assembly

6.1 Ensemble Photometry

When multiple observers observe the same target in the same filter on the same night, we can combine their measurements to produce a better estimate than any single observer.

The combination uses SNR-weighted averaging, after applying zero-point corrections:

import numpy as np
from typing import list

def ensemble_photometry(observations: list[RawPhotometry],
                        instruments: dict[str, Instrument]) -> tuple[float, float]:
    """
    Combine multiple observations of the same target at similar epochs.
    Returns (weighted_magnitude, weighted_error).

    Applies SNR weighting: weight_i = 1 / sigma_i^2
    """
    calibrated_mags = []
    weights = []

    for obs in observations:
        instrument = instruments.get(obs.observer_code)
        if instrument is None:
            # Uncalibrated instrument: use unweighted but flag it
            calibrated_mag = obs.magnitude
            sigma = obs.mag_error if obs.mag_error > 0 else 0.1
        else:
            calibrated_mag = apply_zero_point_correction(obs, instrument)
            # Error: combine reported error with instrument noise floor in quadrature
            sigma = np.sqrt(obs.mag_error**2 + (instrument.noise_floor_mmag / 1000)**2)

        calibrated_mags.append(calibrated_mag)
        weights.append(1.0 / sigma**2)

    weights = np.array(weights)
    mags = np.array(calibrated_mags)

    weighted_mean = np.sum(weights * mags) / np.sum(weights)
    weighted_error = 1.0 / np.sqrt(np.sum(weights))

    return weighted_mean, weighted_error

When to combine vs. when to keep separate: - Combine observations within ±1 hour window for long-period variables (period >> 1 hour) - Do NOT combine observations for short-period variables or transits — use individual points - For occultations: absolutely do not combine — individual timing is the science

6.2 Multi-Filter Handling

Observations in different filters of the same target at similar epochs should be kept as separate data points (different rows in light_curve_points). Never average magnitudes across different filters.

When constructing the light curve for a target, always specify the filter. Multi-filter light curves are multiple time series, not one.

6.3 Light Curve Assembly

The assembled light curve is stored in light_curve_points (see schema in Server Architecture Deep Dive.md). Assembly process:

  1. Fetch all observations for target_id with quality_score = 0 (good quality only)
  2. Apply calibration corrections
  3. Apply BJD_TDB conversion
  4. For each observation within a combination window: ensemble-combine if appropriate
  5. Insert/update rows in light_curve_points

The assembly is idempotent: if run twice on the same data, produces the same result. This allows re-running after instrument calibration is updated.

def assemble_light_curve(target_id: str, filter_name: str,
                         start_bjd: float = None, end_bjd: float = None):
    """
    Assemble the light curve for a target in a given filter.
    Result stored in light_curve_points table.
    """
    # Fetch observations
    obs = db.query(Observation).filter(
        Observation.target_id == target_id,
        Observation.filter_used == filter_name,
        Observation.quality_score == 0
    ).all()

    if start_bjd:
        obs = [o for o in obs if o.bjd_mid >= start_bjd]
    if end_bjd:
        obs = [o for o in obs if o.bjd_mid <= end_bjd]

    # Sort by time
    obs.sort(key=lambda o: o.bjd_mid)

    # Insert into light_curve_points
    points = []
    for o in obs:
        instrument = get_instrument(o.site_id)
        calibrated_mag, calibrated_err = apply_calibration(o, instrument)
        bjd = o.bjd_tdb if o.bjd_tdb else to_bjd_tdb(o.obs_mid, target.ra_deg, target.dec_deg)

        points.append(LightCurvePoint(
            target_id=target_id,
            obs_id=o.obs_id,
            bjd_tdb=bjd,
            filter_used=filter_name,
            magnitude=calibrated_mag,
            mag_error=calibrated_err,
            ensemble_corrected=instrument is not None,
            zero_point_applied=instrument.zero_point_v if instrument else None
        ))

    # Bulk insert / upsert
    db.bulk_insert_mappings(LightCurvePoint, [p.__dict__ for p in points])
    db.commit()

7. Quality Flagging

Quality control is where bad data gets caught before it corrupts the science products. The quality pipeline runs after each observation is ingested and assigns quality_score (0 = good, 1 = warning, 2 = bad) and specific quality_flags.

7.1 Quality Checks

Timestamp validation: - TIMESTAMP_FUTURE: obs_mid is in the future (submission error) - TIMESTAMP_ANCIENT: obs_mid is before 1900 (almost certainly a parsing error) - TIMESTAMP_MISMATCH: Submission timestamp vs. obs_mid differ by more than exposure_sec + 300s

Magnitude validation: - MAGNITUDE_TOO_BRIGHT: Magnitude < 0 for a star (impossible for point source photometry from the ground) - MAGNITUDE_TOO_FAINT: Magnitude > 22 (beyond even large amateur scopes with reasonable exposure) - MAGNITUDE_SATURATION_SUSPECTED: Magnitude is brighter than the site's estimated saturation limit

Airmass validation: - HIGH_AIRMASS: Airmass > 3.0 (very poor quality expected) - IMPOSSIBLE_AIRMASS: Airmass < 1.0 (impossible)

Comparison star validation (when comparison star data is submitted): - POOR_COMPARISON_ENSEMBLE: Fewer than 3 comparison stars used - COMPARISON_STAR_OUTLIER: One comparison star deviates by > 3σ from ensemble (that star may be variable itself)

Cross-site consistency check (for targets with multiple observers):

def check_cross_site_consistency(obs: Observation, window_hours: float = 2.0) -> list[str]:
    """
    Compare this observation against other obs of the same target in a time window.
    Flag if it deviates significantly from the ensemble.
    """
    flags = []
    recent_obs = get_observations_window(obs.target_id, obs.obs_mid, window_hours)
    if len(recent_obs) < 2:
        return flags  # Can't check with only one point

    mags = [o.magnitude for o in recent_obs if o.quality_score == 0]
    if not mags:
        return flags

    median_mag = np.median(mags)
    std_mag = np.std(mags)

    if std_mag > 0 and abs(obs.magnitude - median_mag) > 4 * std_mag:
        flags.append('CROSS_SITE_OUTLIER')

    return flags

GPS timing validation (for occultation targets):

def validate_timing(obs: Observation, target: Target) -> list[str]:
    flags = []
    if target.requires_gps_timing:
        site = get_site(obs.site_id)
        if not site.has_gps:
            flags.append('TIMING_NO_GPS')
        # Check if event timing makes sense
        if target.event_time:
            dt = abs((obs.obs_mid - target.event_time).total_seconds())
            if dt > 300:  # More than 5 minutes from event time
                flags.append('TIMING_OUTSIDE_WINDOW')
    return flags

7.2 Automatic Rejection

Quality score 2 (bad) observations are excluded from all science products automatically. They are retained in the database for human review and potential recovery. Conditions for score 2: - Any TIMESTAMP_* flag - IMPOSSIBLE_AIRMASS - MAGNITUDE_TOO_BRIGHT or MAGNITUDE_TOO_FAINT - CROSS_SITE_OUTLIER with no adjacent good observations to explain the deviation

Quality score 1 (warning) observations are included in science products but marked. Conditions for score 1: - HIGH_AIRMASS - POOR_COMPARISON_ENSEMBLE - TIMING_NO_GPS for occultation targets - NO_COLOR_CORRECTION


8. FITS Ingestion and Plate Solving

When full FITS images are submitted (optional at Stage 2, not from Stage 1 archives), the pipeline runs additional processing.

8.1 Plate Solving

Plate solving determines the precise WCS (World Coordinate System) of an image — maps pixel coordinates to sky coordinates. This is necessary for: - Verifying the correct target was observed - Precise astrometry for NEO observations - Extracting photometry at the known target position even if the finder chart is off-center

Primary solver: astrometry.net (local installation preferred for production; cloud API for testing).

from astroquery.astrometry_net import AstrometryNet

an = AstrometryNet()
an.api_key = config.astrometry_api_key

wcs_header = an.solve_from_image(fits_path, force_image_upload=False)

Fallback: astap (Astrometric STAcking Program) — faster than astrometry.net for dense field images. Local installation, no API key required.

For production, install astrometry.net locally on the server with the appropriate index files. The index files to download depend on the FOV of incoming images; for typical amateur scopes (0.1°–3° FOV): index files 4205–4208.

Plate solving threshold: Reject solutions with residuals > 2 arcsec per star (indicates poor solve or wrong field).

8.2 Aperture Photometry

Run aperture photometry on submitted FITS files to extract or verify the photometric value:

from photutils.detection import DAOStarFinder
from photutils.aperture import aperture_photometry, CircularAperture
from astropy.stats import sigma_clipped_stats
from astropy.io import fits
import numpy as np

def extract_photometry(fits_path: str, target_ra: float, target_dec: float,
                       wcs_header: dict) -> dict:
    """
    Extract aperture photometry for a target in a FITS image.
    Returns magnitude (instrumental) and error.
    """
    with fits.open(fits_path) as hdul:
        data = hdul[0].data.astype(float)
        header = hdul[0].header

    # Background estimation
    from photutils.background import Background2D, MedianBackground
    bkg_estimator = MedianBackground()
    bkg = Background2D(data, box_size=50, filter_size=3, bkg_estimator=bkg_estimator)
    data_sub = data - bkg.background

    # Convert target sky coordinates to pixel coordinates
    from astropy.wcs import WCS
    from astropy.coordinates import SkyCoord
    wcs = WCS(wcs_header)
    target_coord = SkyCoord(ra=target_ra*u.deg, dec=target_dec*u.deg)
    target_pixel = wcs.world_to_pixel(target_coord)

    # Estimate aperture size from seeing
    # Optimal aperture radius ≈ 1.5 × FWHM in pixels
    fwhm_pixels = estimate_fwhm(data_sub)  # from brightest stars in field
    aperture_radius = max(1.5 * fwhm_pixels, 3.0)  # minimum 3 pixels

    aperture = CircularAperture(target_pixel, r=aperture_radius)
    annulus_aperture = CircularAnnulus(target_pixel, r_in=aperture_radius*2,
                                       r_out=aperture_radius*3)

    # Photometry
    phot_table = aperture_photometry(data_sub, aperture)
    bkg_annulus = aperture_photometry(data_sub, annulus_aperture)

    bkg_per_pixel = bkg_annulus['aperture_sum'] / annulus_aperture.area
    source_flux = phot_table['aperture_sum'] - bkg_per_pixel * aperture.area

    # Instrumental magnitude: m_inst = -2.5 * log10(flux / exptime)
    exptime = header.get('EXPTIME', 1.0)
    if source_flux <= 0:
        return {'success': False, 'reason': 'negative_flux'}

    mag_inst = -2.5 * np.log10(source_flux / exptime)

    # Poisson noise + readout noise estimate
    gain = header.get('GAIN', 1.0)
    rdnoise = header.get('RDNOISE', 5.0)  # e- RMS
    n_pixels = np.pi * aperture_radius**2
    noise = np.sqrt(source_flux / gain + n_pixels * (bkg_per_pixel / gain + rdnoise**2 / gain**2))
    snr = source_flux / noise
    mag_error = 1.0857 / snr  # delta_m = (1 / ln(10)) * (delta_f / f) ≈ 1.0857 / SNR

    return {
        'success': True,
        'magnitude_instrumental': float(mag_inst),
        'mag_error': float(mag_error),
        'snr': float(snr),
        'aperture_radius_px': float(aperture_radius),
        'fwhm_pixels': float(fwhm_pixels)
    }

8.3 Comparison Star Extraction

For ensemble differential photometry, extract comparison stars from the same image and cross-match against APASS or Gaia:

def extract_comparison_stars(fits_path: str, wcs_header: dict,
                              target_ra: float, target_dec: float) -> list[dict]:
    """
    Detect all stars in image, cross-match against APASS catalog,
    return list of calibrated comparison stars.
    """
    with fits.open(fits_path) as hdul:
        data = hdul[0].data.astype(float)

    # Detect sources
    mean, median, std = sigma_clipped_stats(data)
    daofind = DAOStarFinder(fwhm=4.0, threshold=10*std)
    sources = daofind(data - median)

    # Convert pixel coords to sky coords
    wcs = WCS(wcs_header)
    sky_coords = wcs.pixel_to_world(sources['xcentroid'], sources['ycentroid'])

    # Cross-match against APASS
    apass_catalog = get_apass_comparison_stars(target_ra, target_dec)

    comp_stars = []
    if apass_catalog is not None:
        catalog_coords = SkyCoord(ra=apass_catalog['RAJ2000'],
                                  dec=apass_catalog['DEJ2000'])
        idx, sep, _ = sky_coords.match_to_catalog_sky(catalog_coords)
        good_match = sep < 3*u.arcsec

        for i, match in enumerate(good_match):
            if match and apass_catalog[idx[i]]['Vmag'] is not np.ma.masked:
                comp_stars.append({
                    'catalog_id': f"APASS_{idx[i]}",
                    'ra_deg': float(sky_coords[i].ra.deg),
                    'dec_deg': float(sky_coords[i].dec.deg),
                    'mag_catalog': float(apass_catalog[idx[i]]['Vmag']),
                    'mag_instrumental': None  # filled in after photometry
                })

    return comp_stars

9. Science Layer: TTV Pipeline

The TTV pipeline is the flagship Stage 1 science product. It takes a time series of transit mid-times and extracts Transit Timing Variations (TTVs), then optionally runs the reverse n-body inference to recover perturber parameters.

See NewOpenAstro/Science/Projects/TTV Reverse N-Body Inference.md for the scientific background. This section covers the implementation.

9.1 Transit Mid-Time Extraction

Given a light curve with a transit, fit the transit model to extract the precise mid-time:

import batman
import scipy.optimize as op
from dataclasses import dataclass

@dataclass
class TransitFitResult:
    t_mid_bjd: float      # Transit mid-time in BJD_TDB
    t_mid_err: float      # 1σ error on mid-time (days)
    depth_frac: float     # Transit depth as fraction of out-of-transit flux
    duration_hours: float
    residuals_rms: float  # RMS of fit residuals (mag)
    success: bool

def fit_transit_midtime(bjd: np.ndarray, flux: np.ndarray, flux_err: np.ndarray,
                        t0_guess: float, period: float,
                        planet_params: dict) -> TransitFitResult:
    """
    Fit a transit model to extract precise mid-time.
    planet_params: dict with 'rp', 'a', 'inc', 'ecc', 'w', 'limb_dark_coeffs'
    """
    # Batman transit model
    params = batman.TransitParams()
    params.t0 = t0_guess
    params.per = period
    params.rp = planet_params['rp']  # Rp/R*
    params.a = planet_params['a']    # a/R*
    params.inc = planet_params['inc']
    params.ecc = planet_params.get('ecc', 0.0)
    params.w = planet_params.get('w', 90.0)
    params.u = planet_params.get('limb_dark_coeffs', [0.3, 0.2])
    params.limb_dark = 'quadratic'

    m = batman.TransitModel(params, bjd)

    def chi2(theta):
        t_mid, depth_factor = theta
        params.t0 = t_mid
        params.rp *= depth_factor
        model_flux = m.light_curve(params)
        return np.sum(((flux - model_flux) / flux_err)**2)

    result = op.minimize(chi2, [t0_guess, 1.0], method='Nelder-Mead',
                         options={'xatol': 1e-7, 'fatol': 1e-10})

    if not result.success:
        return TransitFitResult(t_mid_bjd=t0_guess, t_mid_err=999,
                                depth_frac=0, duration_hours=0,
                                residuals_rms=999, success=False)

    t_mid_fit, depth_factor_fit = result.x

    # Estimate timing uncertainty via MCMC or parabolic approximation
    # For production: use emcee. For speed: parabolic approximation.
    timing_err = estimate_timing_uncertainty_parabolic(chi2, t_mid_fit)

    return TransitFitResult(
        t_mid_bjd=float(t_mid_fit),
        t_mid_err=float(timing_err),
        depth_frac=float(params.rp**2 * depth_factor_fit),
        duration_hours=float(estimate_duration(params, bjd)),
        residuals_rms=float(np.sqrt(result.fun / len(bjd))),
        success=True
    )

9.2 TTV Extraction

Once we have a time series of transit mid-times, extract TTVs:

def extract_ttvs(transit_times: list[tuple[float, float]],
                 period: float, t0: float) -> pd.DataFrame:
    """
    Compute TTV residuals (observed - calculated) for each transit.

    transit_times: list of (bjd_mid, bjd_mid_err) tuples
    period: best-estimate period in days
    t0: reference transit time in BJD_TDB

    Returns DataFrame with columns: transit_number, t_obs, t_calc, ttv_days, ttv_err
    """
    rows = []
    for t_obs, t_err in sorted(transit_times, key=lambda x: x[0]):
        # Find nearest expected transit
        n = round((t_obs - t0) / period)
        t_calc = t0 + n * period
        ttv = t_obs - t_calc
        rows.append({
            'transit_number': int(n),
            't_observed_bjd': t_obs,
            't_calculated_bjd': t_calc,
            'ttv_days': ttv,
            'ttv_err_days': t_err
        })

    df = pd.DataFrame(rows).sort_values('transit_number')

    # Refine period using linear fit to O-C diagram
    if len(df) >= 5:
        from scipy.stats import linregress
        slope, intercept, r, p, se = linregress(df['transit_number'], df['t_observed_bjd'])
        refined_period = slope
        refined_t0 = intercept
        # Recompute TTVs with refined period
        df['t_calc_refined'] = refined_t0 + df['transit_number'] * refined_period
        df['ttv_days_refined'] = df['t_observed_bjd'] - df['t_calc_refined']

    return df

9.3 N-Body Inference

The reverse n-body problem: given observed TTVs, infer perturber orbital parameters. This is the computationally intensive step.

import ttvfast
import emcee

def run_ttv_inference(ttv_data: pd.DataFrame, known_planet: dict) -> dict:
    """
    Run MCMC inference on TTV residuals to recover perturber parameters.

    known_planet: {'mass': M_earth, 'period': days, 't0': BJD, 'e': float, 'omega': deg}

    Returns posterior samples and best-fit parameters.
    """
    def log_likelihood(theta):
        m2, p2, t2, e2, omega2 = theta

        # Forward model: simulate TTVs with TTVFast
        planets = [
            ttvfast.models.Planet(
                mass=known_planet['mass'] / 333000,  # convert to solar masses
                period=known_planet['period'],
                ecc=known_planet.get('e', 0.0),
                omega=known_planet.get('omega', 0.0),
                mean_anomaly=known_planet.get('mean_anomaly', 0.0)
            ),
            ttvfast.models.Planet(
                mass=m2 / 333000,
                period=p2,
                ecc=e2,
                omega=omega2,
                mean_anomaly=t2  # approximate
            )
        ]

        results = ttvfast.ttvfast(planets,
                                   stellar_mass=1.0,
                                   time=ttv_data['t_calculated_bjd'].min() - 100,
                                   dt=0.001,
                                   total=ttv_data['t_calculated_bjd'].max() + 100,
                                   input_flag=1)

        # Extract predicted mid-times for the inner planet
        planet1_times = np.array(results[0][1])  # transit times for planet 1

        # Match predicted times to observed transit numbers
        chi2 = 0
        for _, row in ttv_data.iterrows():
            n = int(row['transit_number'])
            if n >= len(planet1_times):
                return -np.inf
            predicted_ttv = planet1_times[n] - row['t_calculated_bjd']
            chi2 += ((row['ttv_days'] - predicted_ttv) / row['ttv_err_days'])**2

        return -0.5 * chi2

    def log_prior(theta):
        m2, p2, t2, e2, omega2 = theta
        # Physically motivated priors
        if not (0.01 < m2 < 3000):     return -np.inf  # Earth masses
        if not (1 < p2 < 1000):        return -np.inf  # days
        if not (0 <= e2 < 0.9):        return -np.inf
        if not (0 <= omega2 < 360):    return -np.inf
        return 0.0

    def log_probability(theta):
        lp = log_prior(theta)
        if not np.isfinite(lp):
            return -np.inf
        return lp + log_likelihood(theta)

    # Initialize walkers near reasonable starting point
    ndim = 5
    nwalkers = 32
    p0 = np.array([10, 50, 0.5, 0.05, 90]) + 0.1 * np.random.randn(nwalkers, ndim)

    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_probability)
    sampler.run_mcmc(p0, nsteps=2000, progress=True)

    # Discard burn-in
    flat_samples = sampler.get_chain(discard=500, thin=15, flat=True)

    return {
        'samples': flat_samples,
        'median': np.median(flat_samples, axis=0),
        'std': np.std(flat_samples, axis=0),
        'parameter_names': ['m2_Mearth', 'period_days', 't0_fraction', 'eccentricity', 'omega_deg']
    }

Computational cost note: The MCMC run with 32 walkers × 2000 steps × N_transits TTVFast calls is expensive. Estimate 1–10 minutes on a modern CPU for a typical system with 50 transits. For Stage 1 proof-of-concept, this is fine. For Stage 2 with live data and many target systems, consider a Celery task queue or BOINC (see TTV document).

9.4 Period Search (Lomb-Scargle)

For variable star science:

from astropy.timeseries import LombScargle

def run_period_search(bjd: np.ndarray, mag: np.ndarray, mag_err: np.ndarray,
                      period_min: float = 0.1, period_max: float = 365) -> dict:
    """
    Run Lomb-Scargle period search on a light curve.
    Returns best period and periodogram.
    """
    frequency_min = 1.0 / period_max
    frequency_max = 1.0 / period_min

    ls = LombScargle(bjd, mag, mag_err, normalization='standard')
    frequency, power = ls.autopower(
        minimum_frequency=frequency_min,
        maximum_frequency=frequency_max,
        samples_per_peak=10
    )

    best_freq = frequency[np.argmax(power)]
    best_period = 1.0 / best_freq
    false_alarm = ls.false_alarm_probability(power.max(), method='bootstrap')

    return {
        'best_period_days': float(best_period),
        'peak_power': float(power.max()),
        'false_alarm_probability': float(false_alarm),
        'frequency': frequency.tolist(),
        'power': power.tolist()
    }


10. Output Products

Stage 1 produces:

  1. Calibrated FITS archive: Processed FITS files with updated WCS headers and photometry keywords. Stored in B2. Filename convention: {target_name}/{date_utc}/{observer_code}_{filter}_{jd}.fits

  2. Light curve files: Per-target per-filter light curves in FITS binary table format (standard for astronomy) AND CSV (accessible). Columns: BJD_TDB, magnitude, mag_error, filter, observer_code, quality_flag.

  3. Instrument registry export: CSV of all characterized instruments with calibration parameters. Shared publicly as a community resource. This is a distinct scientific contribution — a database of amateur telescope system parameters.

  4. TTV timing tables: Per-planet tables of transit mid-times, errors, and TTVs. FITS binary table format. Accompanied by diagnostic plots (O-C diagram).

  5. Science results: Period measurements, TTV posteriors, comparison with published values. LaTeX-formatted for paper preparation.


11. First Science Target Selection

The Roadmap identifies three candidates. Recommendation:

Start with option 1: AAVSO variable star period refinement.

Reasons: - AAVSO data API is the most mature and well-documented of all the sources - Success metric is unambiguous (period measurement with uncertainty) - Pipeline can be validated against published GCVS periods - Low risk: if the pipeline has bugs, the comparison against published values immediately reveals them - Can produce a publishable result within weeks of starting

Candidate targets: - Z UMa (Mira variable): 195-day period, thousands of AAVSO observations spanning decades. Period shows secular changes and cycle-to-cycle variation. Paper topic: period change analysis using full AAVSO archive. - T Cep (Mira): Similar, but less well-studied secular period change. More room for novel contribution. - RR Lyrae (classic Cepheid-like): Period stability is itself scientifically interesting. Period changes at sub-second level require the BJD_TDB correction.

After period refinement paper is in draft: Move to option 2 (TTV from archival ETD data).

Candidate exoplanet system for TTV proof-of-concept: - WASP-12b: Very short period (1.09 days), known orbital decay, hundreds of ETD transits available. Strong signal. Risk: period derivative is known; our value should match. Good validation target. - Kepler-19c perturber inference: Kepler-19b has known TTVs with a known perturber. We can re-derive the perturber parameters using only ETD data as a proof-of-concept that our inference pipeline recovers the known answer.


Last updated: 2026-03-20 Status: Planning document — not yet implemented Dependencies: Server Architecture Deep Dive.md (database schema), instruments table, light_curve_points table First action: Stand up AAVSO adapter and run it against Z UMa