SSA Bulk Coding Handoff Specification

Purpose

This document provides complete specifications for implementing the SSA (Space Situational Awareness) system. Each component is broken down into implementable units with clear inputs, outputs, and acceptance criteria.

For the implementing model: read each section, implement the code as specified, and follow the file structure exactly.


Project Structure

ssa/
├── edge/                      # Raspberry Pi edge detection
│   ├── __init__.py
│   ├── streak_detector.py     # Core detection algorithm
│   ├── plate_solver.py        # Astrometry solution
│   ├── timing_sync.py         # NTP timing
│   ├── frame_buffer.py        # Video buffering
│   ├── uploader.py            # Send to central
│   ├── config.py              # Configuration
│   └── main.py                # Entry point
│
├── central/                   # Cloud server
│   ├── __init__.py
│   ├── catalog.py             # TLE management
│   ├── track_associator.py    # Link observations
│   ├── orbit_solver.py        # Orbit determination
│   ├── triangulator.py        # Multi-site position
│   ├── scheduler.py           # Observation coordination
│   ├── api.py                 # REST API
│   ├── models.py              # Database models
│   └── config.py              # Configuration
│
├── characterization/          # Analysis module
│   ├── __init__.py
│   ├── lightcurve.py          # Tumble detection
│   ├── material_classifier.py # Material analysis
│   └── size_estimator.py      # Size estimation
│
├── tests/
│   ├── test_edge/
│   ├── test_central/
│   └── test_characterization/
│
├── scripts/
│   ├── deploy_edge.sh         # Edge node deployment
│   ├── deploy_central.sh      # Central server setup
│   └── test_with_iss.py       # Integration test
│
└── docs/
    ├── API.md
    ├── DEPLOYMENT.md
    └── TESTING.md

Component 1: Edge Detection Node

Location: /edge/streak_detector.py

Purpose: Detect satellite streaks in video frames from cheap cameras.

Input:
- Video file path or camera stream
- Configuration with detection parameters

Output:
- JSON list of detected streaks with: timestamp, position (x, y), angle, length, brightness

Implementation Requirements:

"""
Streak detection module for satellite tracking.

Performance requirements:
- Process 25 fps video in real-time on RPi 4
- Detect streaks down to magnitude 10
- False positive rate < 5%
"""

import cv2
import numpy as np
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Streak:
    timestamp: float  # seconds since epoch
    x_center: float   # pixel coordinates
    y_center: float
    angle: float      # radians
    length: float     # pixels
    brightness: float # ADU

class StreakDetector:
    def __init__(self, config: Dict):
        """
        Initialize detector with configuration.

        config keys:
            - threshold: int - brightness threshold (default: 30)
            - min_line_length: int - minimum streak length in pixels (default: 50)
            - max_line_gap: int - maximum gap in streak (default: 10)
            - fps: int - frames per second (default: 25)
        """
        pass

    def process_frame(self, frame: np.ndarray) -> List[Streak]:
        """
        Process a single frame for streaks.

        Algorithm:
            1. Convert to grayscale
            2. Subtract background (median of previous N frames)
            3. Threshold for moving pixels
            4. Apply morphological cleaning (remove noise)
            5. Detect lines using Hough transform
            6. Filter by length and velocity
            7. Calculate brightness (sum along line)

        Args:
            frame: BGR or grayscale image array

        Returns:
            List of Streak objects detected in frame
        """
        # IMPLEMENT THIS
        pass

    def process_video(self, video_path: str) -> List[Streak]:
        """
        Process entire video file.

        Args:
            video_path: Path to video file

        Returns:
            List of all Streak objects detected across all frames
        """
        # IMPLEMENT THIS
        pass

    def set_reference_frame(self, frame: np.ndarray):
        """
        Set reference frame for background subtraction.

        Should be a frame with stars only (no streaks).
        Use median of multiple frames for best results.
        """
        # IMPLEMENT THIS
        pass

Testing Requirements:

def test_detector_basic():
    """Test detection of synthetic streak."""
    # Create synthetic frame: black background, white streak
    frame = np.zeros((1080, 1920), dtype=np.uint8)
    cv2.line(frame, (100, 540), (1800, 600), 255, 2)

    detector = StreakDetector({'threshold': 10})
    streaks = detector.process_frame(frame)

    assert len(streaks) == 1
    assert streaks[0].length > 100
    assert streaks[0].brightness > 0

def test_detector_no_streak():
    """Test that static frame produces no streaks."""
    frame = np.random.randint(0, 30, (1080, 1920), dtype=np.uint8)

    detector = StreakDetector({'threshold': 50})
    streaks = detector.process_frame(frame)

    assert len(streaks) == 0
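
As a rough illustration of the Output fields listed for streak_detector.py, the sketch below turns one line segment's endpoints into the center, angle, and length stored on a Streak. A probabilistic Hough transform such as cv2.HoughLinesP reports each line as (x1, y1, x2, y2), so this is the kind of conversion step 5 would need. The helper name segment_to_geometry and the choice to fold the angle modulo pi are assumptions, not part of the specification.

```python
import math
from typing import Tuple


def segment_to_geometry(
    x1: float, y1: float, x2: float, y2: float
) -> Tuple[float, float, float, float]:
    """Return (x_center, y_center, angle, length) for one line segment.

    Hypothetical helper: the angle is in radians, measured from the +x pixel
    axis and folded modulo pi so that both endpoint orderings agree.
    """
    dx, dy = x2 - x1, y2 - y1
    length = math.hypot(dx, dy)
    angle = math.atan2(dy, dx) % math.pi
    return (x1 + x2) / 2.0, (y1 + y2) / 2.0, angle, length


# The synthetic streak drawn in test_detector_basic: (100, 540) to (1800, 600)
xc, yc, angle, length = segment_to_geometry(100, 540, 1800, 600)
```

Folding the angle means a streak has an orientation but no direction; direction of travel would have to come from comparing consecutive frames.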

Location: /edge/plate_solver.py

Purpose: Convert pixel coordinates to celestial coordinates (RA, Dec).

Input:
- Image frame
- Detected streak pixel positions

Output:
- RA, Dec for each streak

Implementation Requirements:

"""
Astrometric plate solving for coordinate conversion.

Dependencies:
    - astrometry.net (install separately)
    OR
    - astropy.coordinates
"""

import numpy as np
from astropy.coordinates import SkyCoord
from astropy.wcs import WCS
import astropy.units as u
from typing import Dict

class PlateSolver:
    def __init__(self, config: Dict):
        """
        Initialize plate solver.

        config keys:
            - method: 'astrometry_net' or 'astropy'
            - astrometry_api_key: str (if using astrometry.net)
            - fov_estimate: float - estimated field of view in degrees
        """
        pass

    def solve_frame(self, frame: np.ndarray) -> Dict:
        """
        Solve frame for WCS (World Coordinate System).

        Returns:
            {
                'wcs': WCS object,
                'ra_center': float (degrees),
                'dec_center': float (degrees),
                'scale': float (arcsec/pixel),
                'rotation': float (degrees)
            }
        """
        # IMPLEMENT THIS
        pass

    def pixel_to_sky(self, x: float, y: float, wcs: WCS) -> SkyCoord:
        """
        Convert pixel coordinates to sky coordinates.

        Args:
            x, y: Pixel coordinates
            wcs: World Coordinate System from solve_frame()

        Returns:
            SkyCoord object with RA, Dec
        """
        # IMPLEMENT THIS
        pass

Testing Requirements:

def test_solve_synthetic():
    """Test plate solving on synthetic star field."""
    # Create frame with known star positions
    # Solve
    # Verify RA, Dec match expected
    pass
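
To make the pixel-to-sky step concrete, here is a minimal sketch of the inverse gnomonic (TAN) projection, using only the quantities solve_frame() is specified to return (center RA/Dec, scale, rotation). In practice the WCS object would normally do this conversion; the function name tan_pixel_to_sky, the pixel-axis orientation, and the absence of lens distortion are all assumptions made for illustration.

```python
import math
from typing import Tuple


def tan_pixel_to_sky(
    x: float, y: float,
    x0: float, y0: float,
    ra0_deg: float, dec0_deg: float,
    scale_arcsec: float,
    rotation_deg: float = 0.0,
) -> Tuple[float, float]:
    """Invert an ideal gnomonic (TAN) projection: pixel -> (RA, Dec) in degrees.

    Assumptions (not from the spec): (x0, y0) is the pixel at the field
    center, +x points toward increasing RA and +y toward increasing Dec when
    rotation_deg is 0, and there is no lens distortion.
    """
    scale = math.radians(scale_arcsec / 3600.0)  # radians per pixel
    dx, dy = (x - x0) * scale, (y - y0) * scale
    rot = math.radians(rotation_deg)
    # Rotate pixel offsets into standard coordinates (xi along RA, eta along Dec).
    xi = dx * math.cos(rot) - dy * math.sin(rot)
    eta = dx * math.sin(rot) + dy * math.cos(rot)
    ra0, dec0 = math.radians(ra0_deg), math.radians(dec0_deg)
    denom = math.cos(dec0) - eta * math.sin(dec0)
    ra = ra0 + math.atan2(xi, denom)
    dec = math.atan2(math.sin(dec0) + eta * math.cos(dec0), math.hypot(xi, denom))
    return math.degrees(ra) % 360.0, math.degrees(dec)


# The center pixel of a 1920x1080 frame maps back to the field center.
ra_c, dec_c = tan_pixel_to_sky(960, 540, 960, 540, 180.0, 30.0, 10.0)
```

A synthetic test like test_solve_synthetic could project known star positions forward and check that this inverse recovers them.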

Location: /edge/timing_sync.py

Purpose: Synchronize timestamps across multiple nodes.

Implementation Requirements:

"""
Precision timing synchronization using NTP.
"""

import ntplib
import time
from datetime import datetime
from typing import Dict, List, Optional

class TimeSync:
    def __init__(self, ntp_servers: Optional[List[str]] = None):
        """
        Initialize time synchronizer.

        Args:
            ntp_servers: List of NTP servers to query
                        Default: ['pool.ntp.org', 'time.nist.gov']
        """
        if ntp_servers is None:
            self.ntp_servers = ['pool.ntp.org', 'time.nist.gov']
        else:
            self.ntp_servers = ntp_servers

        self.offset_ms = 0.0

    def sync(self) -> bool:
        """
        Synchronize with NTP servers.

        Returns:
            True if sync successful, False otherwise
        """
        # IMPLEMENT THIS
        # Query each server
        # Calculate offset
        # Store offset
        pass

    def get_precise_time(self) -> float:
        """
        Get current time with NTP correction.

        Returns:
            Unix timestamp (seconds since epoch) with millisecond precision
        """
        # IMPLEMENT THIS
        pass

    def get_status(self) -> Dict:
        """
        Get synchronization status.

        Returns:
            {
                'offset_ms': float,
                'last_sync': datetime,
                'source': str (which NTP server)
            }
        """
        pass
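
The "calculate offset" step in sync() can be sketched with the standard NTP four-timestamp estimate. The snippet is pure arithmetic and does not contact any server; a client library such as ntplib typically exposes the resulting offset and delay directly. The function names and the policy of trusting the lowest-delay exchange are assumptions, not requirements of this specification.

```python
from typing import List, Tuple


def ntp_offset_and_delay(t0: float, t1: float, t2: float, t3: float) -> Tuple[float, float]:
    """Standard NTP estimate from four timestamps (all in seconds).

    t0: client send, t1: server receive, t2: server send, t3: client receive.
    Returns (clock offset to add to local time, round-trip network delay).
    """
    offset = ((t1 - t0) + (t2 - t3)) / 2.0
    delay = (t3 - t0) - (t2 - t1)
    return offset, delay


def best_offset_ms(samples: List[Tuple[float, float, float, float]]) -> float:
    """Pick the offset from the lowest-delay exchange and return it in ms."""
    offset, _ = min((ntp_offset_and_delay(*s) for s in samples), key=lambda od: od[1])
    return offset * 1000.0


# Local clock is 50 ms behind the server; one exchange has a slower round trip.
samples = [
    (100.000, 100.060, 100.061, 100.021),  # 20 ms round trip
    (200.000, 200.090, 200.091, 200.101),  # 100 ms round trip
]
offset_ms = best_offset_ms(samples)
```

get_precise_time() would then return time.time() plus the stored offset converted back to seconds.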

Location: /edge/uploader.py

Purpose: Send detection results to central server.

Implementation Requirements:

"""
Upload detection results to central server.
"""

import requests
import json
from typing import Dict, List
import backoff

from streak_detector import Streak

class Uploader:
    def __init__(self, config: Dict):
        """
        Initialize uploader.

        config keys:
            - server_url: str - Central server base URL
            - api_key: str - Authentication key
            - batch_size: int - Upload in batches (default: 10)
            - retry_attempts: int - Retry on failure (default: 3)
        """
        pass

    @backoff.on_exception(backoff.expo, 
                          requests.RequestException,
                          max_tries=3)
    def upload_streaks(self, streaks: List[Streak]) -> bool:
        """
        Upload detected streaks to central server.

        Args:
            streaks: List of Streak objects

        Returns:
            True if upload successful
        """
        # IMPLEMENT THIS
        # Convert Streaks to dict
        # POST to /api/v1/streaks
        # Handle retries
        pass

    def upload_status(self, status: Dict) -> bool:
        """
        Upload node status.

        Args:
            status: {
                'timestamp': float,
                'cpu_temp': float,
                'disk_free': float,
                'uptime': float,
                'frames_processed': int
            }
        """
        pass
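
One possible shape for the "convert Streaks to dict" and batching steps is sketched below. It builds request bodies matching the POST /api/v1/streaks format but does not send them. The Streak dataclass is repeated only so the snippet runs on its own, and build_payloads and its site_id argument are hypothetical (the Uploader config above does not list a site ID).

```python
import json
import time
from dataclasses import asdict, dataclass
from typing import Dict, Iterator, List


@dataclass
class Streak:  # mirrors the dataclass in streak_detector.py
    timestamp: float
    x_center: float
    y_center: float
    angle: float
    length: float
    brightness: float


def build_payloads(streaks: List[Streak], site_id: int, batch_size: int = 10) -> Iterator[Dict]:
    """Yield one request body per batch of at most batch_size streaks."""
    for start in range(0, len(streaks), batch_size):
        batch = streaks[start:start + batch_size]
        yield {
            "site_id": site_id,
            "timestamp": time.time(),  # upload time; per-streak times are inside
            "streaks": [asdict(s) for s in batch],
        }


streaks = [Streak(1700000000.0 + i, 100.0, 200.0, 0.1, 80.0, 1500.0) for i in range(25)]
payloads = list(build_payloads(streaks, site_id=1))
body = json.dumps(payloads[0])  # what would be POSTed to /api/v1/streaks
```

With the default batch size, 25 streaks become three requests, so a failed upload only needs to retry one batch.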

Location: /edge/main.py

Purpose: Main entry point for edge node.

Implementation Requirements:

"""
Main entry point for edge detection node.

Usage:
    python main.py --config edge_config.yaml
"""

import argparse
import time
from pathlib import Path
import yaml

from streak_detector import StreakDetector
from plate_solver import PlateSolver
from timing_sync import TimeSync
from uploader import Uploader

def main(config_path: str):
    """
    Run edge detection pipeline.

    Workflow:
        1. Load configuration
        2. Initialize components
        3. Sync time
        4. Capture frames
        5. Detect streaks
        6. Solve astrometry
        7. Upload to central
        8. Loop forever
    """
    # Load config
    with open(config_path) as f:
        config = yaml.safe_load(f)

    # Initialize
    detector = StreakDetector(config['detection'])
    solver = PlateSolver(config['plate_solving'])
    timing = TimeSync(config['ntp_servers'])
    uploader = Uploader(config['upload'])

    # Sync time
    timing.sync()

    # Main loop
    while True:
        # IMPLEMENT THIS
        pass

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', required=True)
    args = parser.parse_args()
    main(args.config)

Component 2: Central Server

Location: /central/models.py

Purpose: Database models for SSA data.

Implementation Requirements:

"""
SQLAlchemy models for SSA database.
"""

from sqlalchemy import Column, Integer, Float, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

Base = declarative_base()

class SpaceObject(Base):
    """Tracked object (satellite, debris, unknown)."""
    __tablename__ = 'space_objects'

    id = Column(Integer, primary_key=True)
    norad_id = Column(Integer, unique=True, nullable=True)
    cospar_id = Column(String(20), nullable=True)
    name = Column(String(255))
    object_type = Column(String(50))  # 'payload', 'debris', 'unknown'

    # Orbital elements (latest)
    semi_major_axis = Column(Float)  # km
    eccentricity = Column(Float)
    inclination = Column(Float)      # degrees
    raan = Column(Float)             # degrees
    arg_perigee = Column(Float)      # degrees
    mean_anomaly = Column(Float)     # degrees
    epoch = Column(DateTime)

    # Characterization
    magnitude_estimate = Column(Float)
    tumble_status = Column(String(50))  # 'stable', 'tumbling', 'unknown'
    tumble_period = Column(Float)        # seconds
    material_class = Column(String(50))  # 'S', 'C', 'M', 'solar_panel', 'unknown'
    size_estimate = Column(Float)        # meters

    # Metadata
    first_seen = Column(DateTime, default=datetime.utcnow)
    last_seen = Column(DateTime, default=datetime.utcnow)
    observation_count = Column(Integer, default=0)

    def to_dict(self):
        return {
            'id': self.id,
            'norad_id': self.norad_id,
            'name': self.name,
            'object_type': self.object_type,
            'tumble_status': self.tumble_status,
            'last_seen': self.last_seen.isoformat() if self.last_seen else None
        }

class Observation(Base):
    """Single observation from one site."""
    __tablename__ = 'observations'

    id = Column(Integer, primary_key=True)
    object_id = Column(Integer, nullable=True)
    site_id = Column(Integer, nullable=False)

    # Timing (precise)
    timestamp = Column(Float, nullable=False)  # Unix timestamp
    timestamp_uncertainty = Column(Float)      # seconds

    # Position
    ra = Column(Float, nullable=False)         # degrees
    dec = Column(Float, nullable=False)        # degrees
    ra_rate = Column(Float)                    # deg/sec
    dec_rate = Column(Float)                    # deg/sec

    # Photometry
    magnitude = Column(Float)
    filter_band = Column(String(10))          # 'B', 'V', 'R', 'I', 'clear'

    # Quality
    snr = Column(Float)
    astrometric_uncertainty = Column(Float)    # arcsec

    # Metadata
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        return {
            'id': self.id,
            'object_id': self.object_id,
            'site_id': self.site_id,
            'timestamp': self.timestamp,
            'ra': self.ra,
            'dec': self.dec,
            'magnitude': self.magnitude
        }

class SSANode(Base):
    """Detection node (camera site)."""
    __tablename__ = 'ssa_nodes'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True)

    # Location
    latitude = Column(Float)    # degrees
    longitude = Column(Float)   # degrees
    elevation = Column(Float)   # meters

    # Capability
    min_magnitude = Column(Float)  # detection limit
    fov_degrees = Column(Float)    # field of view

    # Status
    is_active = Column(Integer, default=1)
    last_heartbeat = Column(DateTime)

    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'latitude': self.latitude,
            'longitude': self.longitude,
            'is_active': self.is_active
        }

class Campaign(Base):
    """Observation campaign."""
    __tablename__ = 'campaigns'

    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    description = Column(String)

    target_object_id = Column(Integer)
    stage = Column(Integer)  # 1=detection, 2=refinement, 3=characterization

    status = Column(String(50))  # 'draft', 'active', 'completed', 'failed'

    scheduled_start = Column(DateTime)
    scheduled_end = Column(DateTime)

    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'stage': self.stage,
            'status': self.status
        }

Location: /central/catalog.py

Purpose: Manage TLE catalog and orbit propagation.

Implementation Requirements:

"""
Space-Track catalog integration and TLE propagation.
"""

import requests
from skyfield.api import load, EarthSatellite
from datetime import datetime
from typing import List, Dict, Optional

class CatalogManager:
    def __init__(self, config: Dict):
        """
        Initialize catalog manager.

        config keys:
            - space_track_username: str
            - space_track_password: str
            - cache_dir: str - Directory to cache TLEs
        """
        self.session = None
        self.cache = {}

    def login(self) -> bool:
        """
        Authenticate with Space-Track.org.

        Returns:
            True if successful
        """
        # IMPLEMENT THIS
        pass

    def get_tle(self, norad_id: int) -> Optional[Dict]:
        """
        Get TLE for specific object.

        Returns:
            {
                'norad_id': int,
                'line1': str,
                'line2': str,
                'name': str,
                'epoch': datetime
            }
            or None if not found
        """
        # IMPLEMENT THIS
        pass

    def search_by_name(self, name: str) -> List[Dict]:
        """
        Search for objects by name.

        Returns:
            List of matching TLEs
        """
        pass

    def propagate(self, tle: Dict, timestamp: datetime) -> Dict:
        """
        Propagate TLE to given timestamp using SGP4.

        Returns:
            {
                'ra': float (degrees),
                'dec': float (degrees),
                'altitude': float (km),
                'velocity': float (km/s),
                'geocentric': (x, y, z) in km
            }
        """
        # IMPLEMENT THIS
        # Use skyfield for propagation
        pass

    def find_visible(
        self,
        site_lat: float,
        site_lon: float,
        site_elev: float,
        time_start: datetime,
        time_end: datetime
    ) -> List[Dict]:
        """
        Find all objects visible from site in time window.

        Returns:
            List of objects with visibility windows
        """
        # IMPLEMENT THIS
        pass
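
get_tle() is specified to return an 'epoch' datetime. A minimal sketch of deriving it from TLE line 1 follows, using the standard fixed-column layout (two-digit year in columns 19-20, fractional day of year in columns 21-32). The helper name parse_tle_epoch is an assumption, and the element set is a widely reproduced ISS example from 2008 used purely as sample input.

```python
from datetime import datetime, timedelta, timezone


def parse_tle_epoch(line1: str) -> datetime:
    """Return the epoch of a TLE as a timezone-aware UTC datetime."""
    year2 = int(line1[18:20])
    day_of_year = float(line1[20:32])
    # TLE convention: two-digit years 57-99 are 1957-1999, 00-56 are 2000-2056.
    year = 1900 + year2 if year2 >= 57 else 2000 + year2
    # Day 1.0 is midnight at the start of 1 January.
    return datetime(year, 1, 1, tzinfo=timezone.utc) + timedelta(days=day_of_year - 1.0)


LINE1 = "1 25544U 98067A   08264.51782528 -.00002182  00000-0 -11606-4 0  2927"
epoch = parse_tle_epoch(LINE1)
norad_id = int(LINE1[2:7])  # catalog number, columns 3-7
```

The result is timezone-aware, whereas the models above store naive UTC datetimes, so one convention would need to be chosen before values are written to the database.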

Location: /central/track_associator.py

Purpose: Link observations into tracklets and identify objects.

Implementation Requirements:

"""
Associate observations with known objects or create new candidates.
"""

import numpy as np
from scipy.spatial.distance import cdist
from typing import Dict, List, Optional

from catalog import CatalogManager

class TrackAssociator:
    def __init__(self, catalog: CatalogManager):
        """
        Initialize associator with catalog.
        """
        self.catalog = catalog

    def associate(
        self,
        observation: Dict,
        time_tolerance: float = 60.0,  # seconds
        position_tolerance: float = 1.0  # degrees
    ) -> Optional[int]:
        """
        Associate observation with known object.

        Args:
            observation: {
                'ra': float,
                'dec': float,
                'timestamp': float
            }
            time_tolerance: Maximum time difference for correlation
            position_tolerance: Maximum angular separation (degrees)

        Returns:
            object_id if associated, None if new object
        """
        # IMPLEMENT THIS
        # Query catalog for objects near position
        # Propagate TLE to observation time
        # Check if within tolerance
        pass

    def create_candidate(self, observation: Dict) -> int:
        """
        Create new candidate object from observation.

        Returns:
            new object_id
        """
        # IMPLEMENT THIS
        pass

    def link_observations(
        self,
        observations: List[Dict],
        max_angular_velocity: float = 5.0  # deg/sec for LEO
    ) -> List[List[int]]:
        """
        Link observations into tracklets.

        Algorithm:
            1. Sort by time
            2. For each observation:
                3. Find observations within time window
                4. Calculate angular separation / time delta
                5. If consistent velocity: link
                6. If velocity too high: skip
            7. Return linked tracklets

        Returns:
            List of tracklets (each is list of observation IDs)
        """
        # IMPLEMENT THIS
        pass
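
The linking algorithm in link_observations() could start from something like the greedy sketch below. It deliberately covers only the sort, time-window, and rate-limit steps (1 to 4 and 6); the velocity-consistency check of step 5 is left out. The 'id' key on each observation and the max_gap parameter are assumptions added for illustration.

```python
import math
from typing import Dict, List


def angular_separation(ra1: float, dec1: float, ra2: float, dec2: float) -> float:
    """Great-circle separation in degrees (haversine form, stable for small angles)."""
    r1, d1, r2, d2 = map(math.radians, (ra1, dec1, ra2, dec2))
    h = math.sin((d2 - d1) / 2) ** 2 + math.cos(d1) * math.cos(d2) * math.sin((r2 - r1) / 2) ** 2
    return math.degrees(2 * math.asin(min(1.0, math.sqrt(h))))


def link_observations(
    observations: List[Dict],
    max_angular_velocity: float = 5.0,  # deg/sec
    max_gap: float = 10.0,              # seconds; hypothetical time window
) -> List[List[int]]:
    """Greedy linking: append each observation to the first tracklet whose last
    point is recent enough and implies an angular rate at or below the limit."""
    tracklets: List[List[Dict]] = []
    for obs in sorted(observations, key=lambda o: o["timestamp"]):
        for track in tracklets:
            last = track[-1]
            dt = obs["timestamp"] - last["timestamp"]
            if 0 < dt <= max_gap:
                sep = angular_separation(last["ra"], last["dec"], obs["ra"], obs["dec"])
                if sep / dt <= max_angular_velocity:
                    track.append(obs)
                    break
        else:
            tracklets.append([obs])  # no existing tracklet accepted this point
    return [[o["id"] for o in t] for t in tracklets]


obs = [
    {"id": 1, "timestamp": 0.0, "ra": 10.0, "dec": 0.0},
    {"id": 2, "timestamp": 1.0, "ra": 10.5, "dec": 0.0},
    {"id": 4, "timestamp": 1.5, "ra": 200.0, "dec": 45.0},
    {"id": 3, "timestamp": 2.0, "ra": 11.0, "dec": 0.0},
]
tracklets = link_observations(obs)
```

The same angular_separation helper would also serve the position_tolerance check in associate().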

Location: /central/triangulator.py

Purpose: Calculate 3D position from multiple observations.

Implementation Requirements:

"""
Multi-site triangulation for altitude determination.
"""

import numpy as np
from astropy.coordinates import EarthLocation, SkyCoord, AltAz
from astropy.time import Time
import astropy.units as u
from typing import List, Dict

class Triangulator:
    def __init__(self):
        """Initialize triangulator."""
        pass

    def triangulate(
        self,
        observations: List[Dict],
        method: str = 'least_squares'
    ) -> Dict:
        """
        Calculate 3D position from multiple observations.

        Args:
            observations: List of {
                'site': EarthLocation,
                'time': Time,
                'ra': float (degrees),
                'dec': float (degrees)
            }
            method: 'least_squares' or 'geometric'

        Returns:
            {
                'position_eci': (x, y, z) in km,
                'altitude': float in km,
                'range_to_sites': [r1, r2, r3],
                'uncertainty': float in km,
                'method': str
            }
        """
        # IMPLEMENT THIS
        # Convert sites to ECI at each observation time
        # Form line-of-sight vectors
        # Find closest approach point
        # Calculate altitude
        # Estimate uncertainty from site geometry
        pass

    def geometric_triangulation(
        self,
        sites_ecef: List[np.ndarray],
        los_vectors: List[np.ndarray]
    ) -> np.ndarray:
        """
        Geometric intersection of multiple lines of sight.

        Finds point minimizing distance to all lines.
        """
        # IMPLEMENT THIS
        pass

    def estimate_uncertainty(
        self,
        position: np.ndarray,
        sites_ecef: List[np.ndarray],
        timing_uncertainties: List[float]
    ) -> float:
        """
        Estimate position uncertainty from geometry and timing.

        Args:
            position: ECI position (km)
            sites_ecef: Site positions in ECEF
            timing_uncertainties: Timing errors in seconds

        Returns:
            Uncertainty in km
        """
        # IMPLEMENT THIS
        # Geometric dilution of precision
        # Timing contribution
        # Combine
        pass
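
For geometric_triangulation(), "the point minimizing distance to all lines" has a closed-form least-squares solution. The sketch below solves it in plain Python with Cramer's rule so that it runs without dependencies; a NumPy version would use a linear solver instead. It assumes site positions and line-of-sight vectors are already in one common frame at a common epoch, and the function names are illustrative only.

```python
import math
from typing import List, Sequence

Vec = Sequence[float]


def los_unit_vector(ra_deg: float, dec_deg: float) -> List[float]:
    """Unit line-of-sight vector for a given RA/Dec (degrees)."""
    ra, dec = math.radians(ra_deg), math.radians(dec_deg)
    return [math.cos(dec) * math.cos(ra), math.cos(dec) * math.sin(ra), math.sin(dec)]


def _det3(m: List[List[float]]) -> float:
    return (m[0][0] * (m[1][1] * m[2][2] - m[1][2] * m[2][1])
            - m[0][1] * (m[1][0] * m[2][2] - m[1][2] * m[2][0])
            + m[0][2] * (m[1][0] * m[2][1] - m[1][1] * m[2][0]))


def closest_point_to_lines(sites: List[Vec], directions: List[Vec]) -> List[float]:
    """Point minimizing the summed squared distance to lines site + t * direction.

    Solves A p = b with A = sum(I - d d^T) and b = sum((I - d d^T) s),
    where d is each unit direction and s the matching site position.
    """
    A = [[0.0] * 3 for _ in range(3)]
    b = [0.0] * 3
    for s, d in zip(sites, directions):
        norm = math.sqrt(sum(c * c for c in d))
        u = [c / norm for c in d]
        for i in range(3):
            for j in range(3):
                proj = (1.0 if i == j else 0.0) - u[i] * u[j]
                A[i][j] += proj
                b[i] += proj * s[j]
    det = _det3(A)
    if abs(det) < 1e-12:
        raise ValueError("lines of sight are parallel; geometry is degenerate")
    # Cramer's rule: replace column k of A with b.
    point = []
    for k in range(3):
        Ak = [[b[i] if j == k else A[i][j] for j in range(3)] for i in range(3)]
        point.append(_det3(Ak) / det)
    return point


# Two sites looking at the same target (km); the lines intersect exactly.
target = [1000.0, 2000.0, 7000.0]
sites = [[6378.0, 0.0, 0.0], [0.0, 6378.0, 0.0]]
directions = [[t - s for t, s in zip(target, site)] for site in sites]
position = closest_point_to_lines(sites, directions)
```

With noisy measurements the lines no longer intersect, and the residual distances from the solution to each line give a first handle on the uncertainty.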

Location: /central/orbit_solver.py

Purpose: Determine orbits from observations.

Implementation Requirements:

"""
Orbit determination from observations.
"""

import numpy as np
from typing import List, Dict, Optional

class OrbitSolver:
    def __init__(self):
        """Initialize orbit solver."""
        pass

    def gauss_method(
        self,
        observations: List[Dict]
    ) -> Dict:
        """
        Determine orbit from 3+ observations using Gauss method.

        Args:
            observations: List of {
                'time': Time,
                'ra': float,
                'dec': float,
                'site': EarthLocation
            }

        Returns:
            {
                'semi_major_axis': float (km),
                'eccentricity': float,
                'inclination': float (degrees),
                'raan': float (degrees),
                'arg_perigee': float (degrees),
                'mean_anomaly': float (degrees),
                'epoch': Time,
                'covariance': np.ndarray
            }
        """
        # IMPLEMENT THIS
        # Classic Gauss method
        # Use first 3 observations
        # Solve for orbital elements
        pass

    def admissible_region(
        self,
        observations: List[Dict],
        n_particles: int = 10000
    ) -> List[Dict]:
        """
        Create particle swarm for short-arc observations.

        For observations too short for Gauss method.

        Args:
            observations: Short arc (< 1 min typically)
            n_particles: Number of virtual particles

        Returns:
            List of possible orbital states
        """
        # IMPLEMENT THIS
        # Extract angles and rates
        # Define (range, range_rate) grid
        # Apply constraints:
        #   - Energy < 0 (bound)
        #   - Perigee > 100 km
        # Sample particles
        # Return initial states
        pass

    def propagate_particles(
        self,
        particles: List[Dict],
        time_delta: float  # seconds
    ) -> List[Dict]:
        """
        Propagate particle cloud forward in time.

        Args:
            particles: List of orbital states
            time_delta: Seconds to propagate

        Returns:
            Propagated particles
        """
        # IMPLEMENT THIS
        # Use SGP4 for each particle
        pass

    def prune_particles(
        self,
        particles: List[Dict],
        new_observation: Dict,
        tolerance: float = 1.0  # degrees
    ) -> List[Dict]:
        """
        Remove particles inconsistent with new observation.

        Args:
            particles: Current particle cloud
            new_observation: {
                'time': Time,
                'ra': float,
                'dec': float
            }
            tolerance: Angular separation threshold

        Returns:
            Pruned particle list
        """
        # IMPLEMENT THIS
        # Propagate all particles to new time
        # Calculate angular separation from observation
        # Remove particles outside tolerance
        pass
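
The two constraints listed for admissible_region() (negative energy, perigee above 100 km) can be checked per candidate state with two-body relations. This is a sketch under the assumption that each particle carries a geocentric inertial position and velocity in km and km/s; is_admissible and the constant values are illustrative rather than mandated.

```python
import math
from typing import Sequence

MU_EARTH = 398600.4418  # km^3/s^2, Earth's gravitational parameter
R_EARTH = 6378.137      # km, equatorial radius


def is_admissible(r: Sequence[float], v: Sequence[float], min_perigee_alt: float = 100.0) -> bool:
    """True if the state is a bound orbit whose perigee altitude exceeds the limit."""
    r_mag = math.sqrt(sum(c * c for c in r))
    v2 = sum(c * c for c in v)
    energy = v2 / 2.0 - MU_EARTH / r_mag  # specific orbital energy
    if energy >= 0.0:
        return False  # parabolic or hyperbolic: not bound
    # Specific angular momentum h = r x v
    h = (r[1] * v[2] - r[2] * v[1],
         r[2] * v[0] - r[0] * v[2],
         r[0] * v[1] - r[1] * v[0])
    h2 = sum(c * c for c in h)
    a = -MU_EARTH / (2.0 * energy)  # semi-major axis
    ecc = math.sqrt(max(0.0, 1.0 + 2.0 * energy * h2 / MU_EARTH ** 2))
    perigee_alt = a * (1.0 - ecc) - R_EARTH
    return perigee_alt > min_perigee_alt


v_circ = math.sqrt(MU_EARTH / 7000.0)
circular_ok = is_admissible((7000.0, 0.0, 0.0), (0.0, v_circ, 0.0))
escape_ok = is_admissible((7000.0, 0.0, 0.0), (0.0, 1.5 * v_circ, 0.0))
radial_ok = is_admissible((7000.0, 0.0, 0.0), (1.0, 0.0, 0.0))
```

Sampling the (range, range_rate) grid then amounts to building a state for each grid point and keeping those for which this check passes.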

Location: /central/api.py

Purpose: REST API for SSA system.

Implementation Requirements:

"""
REST API for SSA system.

Endpoints:
    POST /api/v1/streaks - Upload detections
    GET /api/v1/objects - List tracked objects
    GET /api/v1/objects/{id} - Get object details
    POST /api/v1/campaigns - Create observation campaign
    GET /api/v1/campaigns/{id} - Get campaign status
"""

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from models import Base, Observation, SpaceObject, SSANode, Campaign

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql:///ssa'
db = SQLAlchemy(app)

@app.route('/api/v1/streaks', methods=['POST'])
def upload_streaks():
    """
    Upload detected streaks from edge node.

    Request body:
        {
            'site_id': int,
            'timestamp': float,
            'streaks': [
                {
                    'timestamp': float,
                    'x_center': float,
                    'y_center': float,
                    'angle': float,
                    'length': float,
                    'brightness': float,
                    'ra': float (if plate solved),
                    'dec': float (if plate solved)
                },
                ...
            ]
        }

    Returns:
        {
            'status': 'ok',
            'observations_created': int
        }
    """
    # IMPLEMENT THIS
    # Validate input
    # Associate with known objects
    # Store in database
    # Trigger downstream processing
    pass

@app.route('/api/v1/objects', methods=['GET'])
def list_objects():
    """
    List tracked space objects.

    Query params:
        - limit: int (default 50)
        - offset: int (default 0)
        - type: str (filter by object_type)
        - min_magnitude: float

    Returns:
        {
            'objects': [SpaceObject.to_dict(), ...],
            'total': int,
            'limit': int,
            'offset': int
        }
    """
    # IMPLEMENT THIS
    pass

@app.route('/api/v1/objects/<int:object_id>', methods=['GET'])
def get_object(object_id):
    """Get detailed object information."""
    # IMPLEMENT THIS
    # Include recent observations
    # Include orbit elements
    # Include characterization
    pass

@app.route('/api/v1/campaigns', methods=['POST'])
def create_campaign():
    """
    Create observation campaign.

    Request body:
        {
            'name': str,
            'description': str,
            'target_object_id': int (optional),
            'stage': int (1, 2, or 3),
            'sites': [int],  # site_ids to involve
            'scheduled_start': str (ISO datetime),
            'scheduled_end': str (ISO datetime)
        }
    """
    # IMPLEMENT THIS
    # Validate
    # Create campaign
    # Schedule observations
    pass

if __name__ == '__main__':
    app.run(debug=True)
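
The "validate input" step of upload_streaks() might look like the framework-independent sketch below, which checks a decoded JSON body against the request format documented for POST /api/v1/streaks. The function name, the error messages, and the rule that ra and dec must appear together are assumptions.

```python
from typing import Any, List, Tuple

REQUIRED_STREAK_FIELDS = ("timestamp", "x_center", "y_center", "angle", "length", "brightness")


def validate_streak_payload(body: Any) -> Tuple[bool, List[str]]:
    """Return (is_valid, error messages) for a decoded request body."""
    if not isinstance(body, dict):
        return False, ["body must be a JSON object"]
    errors: List[str] = []
    if not isinstance(body.get("site_id"), int):
        errors.append("site_id must be an integer")
    streaks = body.get("streaks")
    if not isinstance(streaks, list):
        errors.append("streaks must be a list")
        streaks = []
    for i, streak in enumerate(streaks):
        if not isinstance(streak, dict):
            errors.append(f"streaks[{i}] must be an object")
            continue
        for field in REQUIRED_STREAK_FIELDS:
            if not isinstance(streak.get(field), (int, float)):
                errors.append(f"streaks[{i}].{field} must be a number")
        # ra/dec are optional (present only if plate solved), but come as a pair
        if ("ra" in streak) != ("dec" in streak):
            errors.append(f"streaks[{i}] needs both ra and dec or neither")
    return not errors, errors


good = {
    "site_id": 1,
    "timestamp": 1700000000.0,
    "streaks": [{"timestamp": 1700000000.0, "x_center": 1.0, "y_center": 2.0,
                 "angle": 0.1, "length": 80.0, "brightness": 1500.0}],
}
ok, errs = validate_streak_payload(good)
bad_ok, bad_errs = validate_streak_payload({"streaks": [{"ra": 10.0}]})
```

Inside the Flask view, a failed validation would typically be returned as a 400 response carrying the error list.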

Component 3: Characterization

Location: /characterization/lightcurve.py

Purpose: Detect tumble periods from light curves.

Implementation Requirements:

"""
Light curve analysis for tumble detection.
"""

import numpy as np
from scipy.signal import lombscargle
from typing import List, Dict, Optional

class LightCurveAnalyzer:
    def __init__(self, config: Optional[Dict] = None):
        """
        Initialize analyzer.

        config keys:
            - min_period: float - minimum period to search (seconds)
            - max_period: float - maximum period to search (seconds)
            - significance_threshold: float - for peak detection
        """
        self.config = config or {}

    def analyze(
        self,
        times: List[float],
        magnitudes: List[float],
        errors: Optional[List[float]] = None
    ) -> Dict:
        """
        Analyze light curve for temporal variations.

        Args:
            times: Observation times (seconds since epoch or JD)
            magnitudes: Observed magnitudes
            errors: Photometric errors (optional)

        Returns:
            {
                'is_tumbling': bool,
                'period': float or None (seconds),
                'period_uncertainty': float or None,
                'amplitude': float (mag),
                'mean_magnitude': float,
                'lightcurve': {
                    'times': List[float],
                    'magnitudes': List[float],
                    'model': List[float] (if period found)
                },
                'periodogram': {
                    'frequencies': List[float],
                    'power': List[float]
                }
            }
        """
        # IMPLEMENT THIS
        # Remove outliers
        # Normalize magnitudes
        # Compute periodogram
        # Find significant peaks
        # Fit sinusoidal model if period found
        pass

    def lomb_scargle_periodogram(
        self,
        times: np.ndarray,
        magnitudes: np.ndarray,
        min_freq: float,
        max_freq: float,
        oversample: int = 5
    ) -> Dict:
        """
        Compute Lomb-Scargle periodogram.

        Returns:
            {
                'frequencies': np.ndarray,
                'power': np.ndarray,
                'peak_freq': float,
                'peak_power': float,
                'significance': float (sigma)
            }
        """
        # IMPLEMENT THIS
        pass

    def fit_model(
        self,
        times: np.ndarray,
        magnitudes: np.ndarray,
        period: float
    ) -> np.ndarray:
        """
        Fit sinusoidal model to phased light curve.

        Returns:
            Model magnitudes at given times
        """
        # IMPLEMENT THIS
        pass
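
To show what lomb_scargle_periodogram() computes, here is a dependency-free sketch of the classic Lomb-Scargle power on an unevenly sampled synthetic light curve. It is meant as a reference for testing, not a replacement for scipy.signal.lombscargle, which expects angular frequencies (rad/s) rather than the Hz used here. The function name, the frequency grid, and the synthetic 10-second period are assumptions.

```python
import math
import random
from typing import List, Sequence


def lomb_scargle(
    times: Sequence[float], values: Sequence[float], frequencies: Sequence[float]
) -> List[float]:
    """Classic (unnormalized) Lomb-Scargle power at each frequency in Hz."""
    mean = sum(values) / len(values)
    y = [v - mean for v in values]
    power = []
    for f in frequencies:
        w = 2.0 * math.pi * f
        # The time offset tau makes the sine and cosine terms orthogonal.
        tau = math.atan2(sum(math.sin(2 * w * t) for t in times),
                         sum(math.cos(2 * w * t) for t in times)) / (2 * w)
        c = [math.cos(w * (t - tau)) for t in times]
        s = [math.sin(w * (t - tau)) for t in times]
        yc = sum(yi * ci for yi, ci in zip(y, c))
        ys = sum(yi * si for yi, si in zip(y, s))
        cc = sum(ci * ci for ci in c)
        ss = sum(si * si for si in s)
        power.append(0.5 * (yc * yc / cc + ys * ys / ss))
    return power


# Synthetic light curve: 200 irregular samples over 100 s, 10 s period.
rng = random.Random(0)
times = sorted(rng.uniform(0.0, 100.0) for _ in range(200))
mags = [8.0 + 0.5 * math.sin(2.0 * math.pi * t / 10.0) for t in times]
freqs = [0.02 + 0.002 * i for i in range(241)]  # 0.02 to 0.5 Hz
power = lomb_scargle(times, mags, freqs)
peak_freq = freqs[power.index(max(power))]
period = 1.0 / peak_freq
```

The min_period and max_period config keys map onto the frequency grid as max_freq = 1 / min_period and min_freq = 1 / max_period.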

Location: /characterization/material_classifier.py

Purpose: Classify material from photometric colors.

Implementation Requirements:

"""
Material classification from photometric colors.
"""

from typing import Dict, List

class MaterialClassifier:
    # Known material signatures (from SSA.md)
    SIGNATURES = {
        'S-type': {
            'B-V': (0.5, 1.0),
            'V-R': (0.4, 0.8),
            'R-I': (0.3, 0.6),
            'features': ['950nm_absorption'],
            'description': 'Stony/metallic, silicates'
        },
        'C-type': {
            'B-V': (0.3, 0.5),
            'V-R': (0.2, 0.4),
            'R-I': (0.1, 0.3),
            'features': ['flat_spectrum'],
            'description': 'Carbonaceous, dark'
        },
        'M-type': {
            'B-V': (0.4, 0.6),
            'V-R': (0.3, 0.5),
            'R-I': (0.2, 0.4),
            'features': ['red_slope', 'no_absorption'],
            'description': 'Metallic'
        },
        'solar_panel': {
            'B-V': (0.0, 0.2),
            'V-R': (-0.1, 0.1),
            'R-I': (-0.2, 0.0),
            'features': ['blue', 'high_albedo'],
            'description': 'Solar panel, active satellite'
        },
        'paint_debris': {
            'B-V': (-0.1, 0.3),
            'V-R': (-0.1, 0.2),
            'R-I': (-0.1, 0.2),
            'features': ['variable', 'high_albedo'],
            'description': 'Paint flakes, highly reflective'
        }
    }

    def __init__(self):
        pass

    def classify(
        self,
        color_indices: Dict[str, float]
    ) -> Dict:
        """
        Classify material from photometric colors.

        Args:
            color_indices: {
                'B-V': float or None,
                'V-R': float or None,
                'R-I': float or None
            }

        Returns:
            {
                'classification': str,
                'confidence': float,
                'possible_types': [  # ranked by likelihood
                    {'type': str, 'score': float},
                    ...
                ],
                'notes': str
            }
        """
        # IMPLEMENT THIS
        # Score each material type
        # Rank by match score
        # Return top classification
        pass

    def score_match(
        self,
        color_indices: Dict[str, float],
        signature: Dict
    ) -> float:
        """
        Calculate match score between colors and signature.

        Returns:
            Score between 0.0 and 1.0
        """
        # IMPLEMENT THIS
        pass

    def estimate_size(
        self,
        magnitude: float,
        distance_km: float,
        albedo: float = 0.1
    ) -> float:
        """
        Estimate physical size from magnitude.

        Args:
            magnitude: Apparent magnitude
            distance_km: Distance to object in km
            albedo: Reflectivity (0.1 typical for debris)

        Returns:
            Estimated diameter in meters
        """
        # IMPLEMENT THIS
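        # Use: diameter_m = 2 * distance_m * 10^((H_sun - magnitude)/5) / sqrt(albedo)
        # Where H_sun = -26.74 (apparent mag of sun) and distance_m = distance_km * 1000
        # (zero phase angle assumed; fainter objects must come out smaller)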
        pass
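
One possible way to fill in `score_match`, the ranking step, and `estimate_size` is sketched below. Everything beyond the signature ranges is an assumption: the Gaussian `falloff` of 0.2 mag outside a range, the choice to skip unmeasured indices, and the free-function names. The size estimate assumes reflected flux scales as albedo × (d / 2R)² at zero phase angle, so diameter shrinks as magnitude increases.

```python
import math
from typing import Dict, List, Optional

COLOR_KEYS = ('B-V', 'V-R', 'R-I')

# Two entries copied from SIGNATURES above (color ranges only)
SIGNATURES = {
    'S-type': {'B-V': (0.5, 1.0), 'V-R': (0.4, 0.8), 'R-I': (0.3, 0.6)},
    'solar_panel': {'B-V': (0.0, 0.2), 'V-R': (-0.1, 0.1), 'R-I': (-0.2, 0.0)},
}

M_SUN = -26.74  # apparent magnitude of the Sun, as given in the spec


def score_match(color_indices: Dict[str, Optional[float]], signature: Dict,
                falloff: float = 0.2) -> float:
    """Mean per-index score: 1.0 inside the range, Gaussian decay outside it."""
    scores = []
    for key in COLOR_KEYS:
        value = color_indices.get(key)
        if value is None or key not in signature:
            continue  # an unmeasured index contributes no evidence
        lo, hi = signature[key]
        miss = max(lo - value, 0.0, value - hi)  # distance outside the range
        scores.append(math.exp(-(miss / falloff) ** 2))
    return sum(scores) / len(scores) if scores else 0.0


def rank_types(color_indices: Dict[str, Optional[float]]) -> List[Dict]:
    """Score every signature and return candidates, best first."""
    ranked = sorted(
        ((score_match(color_indices, sig), name) for name, sig in SIGNATURES.items()),
        reverse=True,
    )
    return [{'type': name, 'score': score} for score, name in ranked]


def estimate_diameter_m(magnitude: float, distance_km: float, albedo: float = 0.1) -> float:
    """Diameter from the object-to-Sun flux ratio (zero phase angle assumed)."""
    flux_ratio = 10.0 ** (-0.4 * (magnitude - M_SUN))
    return 2.0 * distance_km * 1000.0 * math.sqrt(flux_ratio / albedo)


ranked = rank_types({'B-V': 0.1, 'V-R': 0.0, 'R-I': None})
diameter = estimate_diameter_m(magnitude=5.0, distance_km=1000.0)
```

A soft falloff keeps a near miss from scoring the same as a gross mismatch, which matters when photometric errors are comparable to the width of the ranges.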

Testing Requirements

Unit Tests

Each module must have a corresponding test file:

tests/
├── test_edge/
│   ├── test_streak_detector.py
│   ├── test_plate_solver.py
│   ├── test_timing_sync.py
│   └── test_uploader.py
├── test_central/
│   ├── test_catalog.py
│   ├── test_track_associator.py
│   ├── test_triangulator.py
│   ├── test_orbit_solver.py
│   └── test_api.py
└── test_characterization/
    ├── test_lightcurve.py
    └── test_material_classifier.py

Integration Tests

# tests/test_integration.py

def test_end_to_end_detection():
    """
    Full pipeline test from camera to orbit.
    """
    # Generate synthetic video with ISS streak
    # Run edge detection
    # Upload to central
    # Correlate with catalog
    # Verify correct identification

def test_triangulation_accuracy():
    """
    Test triangulation with known object.
    """
    # Use ISS TLE
    # Simulate observations from 3 sites
    # Run triangulation
    # Compare calculated altitude vs known
    # Verify error < 50 km

def test_tumble_detection():
    """
    Test tumble detection with synthetic light curve.
    """
    # Generate sinusoidal light curve
    # Run analysis
    # Verify period detection
    # Assert period within 5% of true
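
As an illustration of what `test_triangulation_accuracy` might exercise, the sketch below triangulates a synthetic target from three lines of sight by solving for the point closest to all of them in the least-squares sense. The spherical Earth, the site coordinates, the 420 km target altitude, and the pointing noise are all assumptions chosen for the example; a real test would derive the target from the ISS TLE.

```python
import numpy as np

EARTH_RADIUS_KM = 6371.0  # spherical-Earth simplification


def site_position(lat_deg, lon_deg, alt_km=0.0):
    """Earth-fixed Cartesian position (km) on a spherical Earth."""
    lat, lon = np.radians(lat_deg), np.radians(lon_deg)
    r = EARTH_RADIUS_KM + alt_km
    return r * np.array([np.cos(lat) * np.cos(lon),
                         np.cos(lat) * np.sin(lon),
                         np.sin(lat)])


def triangulate(sites, directions):
    """Point minimizing the summed squared distance to all lines of sight."""
    sites = np.asarray(sites, dtype=float)
    dirs = np.asarray(directions, dtype=float)
    dirs = dirs / np.linalg.norm(dirs, axis=1, keepdims=True)
    a = np.zeros((3, 3))
    b = np.zeros(3)
    for p, u in zip(sites, dirs):
        # Projector onto the plane perpendicular to the line of sight
        proj = np.eye(3) - np.outer(u, u)
        a += proj
        b += proj @ p
    return np.linalg.solve(a, b)


rng = np.random.default_rng(1)
target = site_position(40.0, -101.0, alt_km=420.0)
sites = [site_position(40.0, -105.0),
         site_position(42.0, -100.0),
         site_position(38.0, -98.0)]
directions = []
for s in sites:
    u = (target - s) / np.linalg.norm(target - s)
    directions.append(u + rng.normal(0.0, 2e-4, 3))  # small pointing noise

estimate = triangulate(sites, directions)
altitude_error_km = abs(np.linalg.norm(estimate) - EARTH_RADIUS_KM - 420.0)
```

The solve step fails only when all lines of sight are parallel, which is one reason to require well-separated sites.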

Deployment Configuration

Edge Node Configuration

# edge_config.yaml

detection:
  threshold: 30
  min_line_length: 50
  max_line_gap: 10
  fps: 25

plate_solving:
  method: "astrometry_net"  # or "astropy"
  astrometry_api_key: "YOUR_KEY"
  fov_estimate: 50  # degrees

camera:
  device: 0  # /dev/video0
  resolution: [1920, 1080]
  exposure: 0.001  # seconds
  gain: 10

timing:
  ntp_servers:
    - "pool.ntp.org"
    - "time.nist.gov"
  sync_interval: 300  # seconds

upload:
  server_url: "https://api.openastro.net"
  api_key: "YOUR_API_KEY"
  batch_size: 10
  retry_attempts: 3

logging:
  level: "INFO"
  file: "/var/log/ssa/edge.log"
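
A sketch of how `edge/config.py` might validate the `detection` section and keep the API key out of the file is shown below. It takes an already-parsed mapping, so the YAML parser is left open. The `DetectionConfig` class, the `SSA_API_KEY` environment variable, and the validation rules are hypothetical and not part of the specification.

```python
import os
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass(frozen=True)
class DetectionConfig:
    # Defaults mirror the values in edge_config.yaml
    threshold: int = 30
    min_line_length: int = 50
    max_line_gap: int = 10
    fps: int = 25


def load_detection(raw: Mapping[str, Any]) -> DetectionConfig:
    """Build a DetectionConfig, rejecting misspelled keys and bad values."""
    section = dict(raw.get('detection', {}))
    known = {f.name for f in fields(DetectionConfig)}
    unknown = set(section) - known
    if unknown:
        raise ValueError(f"unknown detection keys: {sorted(unknown)}")
    cfg = DetectionConfig(**section)
    if cfg.fps <= 0 or cfg.min_line_length <= 0:
        raise ValueError("fps and min_line_length must be positive")
    return cfg


def resolve_api_key(raw: Mapping[str, Any], env_var: str = 'SSA_API_KEY') -> str:
    """Prefer the environment variable; reject the placeholder from the file."""
    key = os.environ.get(env_var) or raw.get('upload', {}).get('api_key', '')
    if not key or key == 'YOUR_API_KEY':
        raise ValueError(f"set {env_var} or upload.api_key")
    return key


example = {'detection': {'threshold': 40, 'fps': 25}}
detection = load_detection(example)
```

Rejecting unknown keys catches typos such as `treshold` at startup instead of silently falling back to a default.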

Central Server Configuration

# central_config.yaml

database:
  host: "localhost"
  port: 5432
  database: "ssa"
  user: "ssa_user"
  password: "YOUR_PASSWORD"

redis:
  host: "localhost"
  port: 6379

space_track:
  username: "YOUR_USERNAME"
  password: "YOUR_PASSWORD"
  cache_ttl: 86400  # 24 hours

triangulation:
  min_sites: 2
  max_timing_error: 0.1  # seconds

orbit_determination:
  min_observations: 3
  max_arc_length: 86400  # seconds
  particle_count: 10000

api:
  host: "0.0.0.0"
  port: 5000
  debug: false

Performance Requirements

| Component | Requirement | Target |
|---|---|---|
| Streak detection | Process 25 fps | Real-time on RPi 4 |
| Plate solving | < 10 seconds per frame | < 5 seconds |
| Catalog query | < 100 ms | < 50 ms |
| Triangulation | < 1 second | < 500 ms |
| Orbit determination (Gauss) | < 5 seconds | < 2 seconds |
| Light curve analysis | < 10 seconds | < 5 seconds |
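
These latency requirements could be checked with a small timing harness along the following lines. This is only a sketch: the `BUDGETS_S` keys, the median-of-repeats policy, and the stand-in workload are assumptions, and real checks would call the actual component functions on representative inputs.

```python
import time
from statistics import median
from typing import Callable, Tuple

# Requirement column of the performance table, in seconds
BUDGETS_S = {
    'plate_solving': 10.0,
    'catalog_query': 0.100,
    'triangulation': 1.0,
    'orbit_determination': 5.0,
    'lightcurve_analysis': 10.0,
}


def measure(fn: Callable[[], object], repeats: int = 5) -> float:
    """Median wall-clock time of fn over several runs."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return median(samples)


def check_budget(name: str, fn: Callable[[], object], repeats: int = 5) -> Tuple[float, bool]:
    """Return (elapsed seconds, whether the requirement was met)."""
    elapsed = measure(fn, repeats)
    return elapsed, elapsed <= BUDGETS_S[name]


# Stand-in workload; replace with a real call such as a triangulation run
elapsed, ok = check_budget('triangulation', lambda: sum(range(1000)))
```

Using the median makes the check less sensitive to a single slow run, which is common on a loaded Raspberry Pi.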

Acceptance Criteria

Each component is complete when:

  1. ✅ All unit tests pass
  2. ✅ Integration tests pass
  3. ✅ Code follows style guide (PEP 8)
  4. ✅ Docstrings complete for all public functions
  5. ✅ Type hints on all function signatures
  6. ✅ No critical security issues
  7. ✅ Memory usage < 500 MB for edge node
  8. ✅ Performance meets requirements

Next Steps for Implementing Model

  1. Implement each module in order:
     1. /edge/streak_detector.py ⬅️ Start here
     2. /edge/plate_solver.py
     3. /edge/timing_sync.py
     4. /edge/uploader.py
     5. /edge/main.py
     6. /central/models.py
     7. /central/catalog.py
     8. /central/track_associator.py
     9. /central/triangulator.py
     10. /central/orbit_solver.py
     11. /central/api.py
     12. /characterization/lightcurve.py
     13. /characterization/material_classifier.py
  2. Write tests alongside implementation
  3. Deploy edge node first, then central
  4. Validate with ISS observations


Questions?

Ask clarifying questions before implementation. It's better to clarify early than refactor later.