Skip to content

Space Situational Awareness Implementation Roadmap

Overview

This document outlines the implementation path from current state to operational SSA system, organized by priority and dependencies.


Critical Path

Phase 0: Foundation (Weeks 1-4)
    β”‚
    β”œβ”€β”€ Hardware Procurement
    β”‚   └── Detection node kits (IMX307 + RPi)
    β”‚
    β”œβ”€β”€ Software Infrastructure
    β”‚   β”œβ”€β”€ Edge detection (OpenCV)
    β”‚   β”œβ”€β”€ Track association (PostgreSQL)
    β”‚   └── Orbit determination (Orekit)
    β”‚
    └── Testing Framework
        └── Validation with ISS/TLE objects

        β”‚
        β–Ό

Phase 1: Detection Network (Weeks 5-12)
    β”‚
    β”œβ”€β”€ Deploy 10Γ— Detection Nodes
    β”‚   └── Geographic distribution
    β”‚
    β”œβ”€β”€ Implement Edge Processing
    β”‚   β”œβ”€β”€ Streak detection
    β”‚   β”œβ”€β”€ Plate solving
    β”‚   └── Timing synchronization
    β”‚
    └── Catalog Integration
        └── Space-Track.org TLE correlation
        β”‚
        β–Ό

Phase 2: Refinement Layer (Weeks 13-20)
    β”‚
    β”œβ”€β”€ Add 3Γ— Tracking Stations
    β”‚   └── Entry-level telescopes
    β”‚
    β”œβ”€β”€ Implement Triangulation
    β”‚   └── Multi-site parallax
    β”‚
    β”œβ”€β”€ Orbit Refinement
    β”‚   └── Gauss method implementation
    β”‚
    └── Admissible Region Filter
        └── Particle propagation

        β”‚
        β–Ό

Phase 3: Characterization (Weeks 21-28)
    β”‚
    β”œβ”€β”€ Light Curve Pipeline
    β”‚   └── Tumble detection
    β”‚
    β”œβ”€β”€ Multi-filter Photometry
    β”‚   └── Material classification
    β”‚
    └── Partnership Agreements
        └── Data sharing with existing networks

        β”‚
        β–Ό

Phase 4: CubeSat Future (Years 2-3)
    β”‚
    β”œβ”€β”€ Payload Development
    β”‚   └── Lidar design
    β”‚
    β”œβ”€β”€ Spacecraft Integration
    β”‚   └── 3U CubeSat bus
    β”‚
    └── Launch & Operations
        └── Rideshare deployment

Phase 0: Foundation (Weeks 1-4)

Hardware Procurement

Detection Node Bill of Materials:

Component Model Quantity Unit Cost Total Source
Camera sensor IMX307 or IMX291 1 $15-30 $15-30 AliExp, ArduCam
Lens 8mm f/1.2 1 $20 $20 CCTV suppliers
Computer Raspberry Pi 4 (4GB) 1 $55 $55 Approved distributors
Storage SD card 128GB 1 $15 $15 Any
Enclosure 3D printed 1 $5-10 $5-10 DIY
Power supply 5V 3A USB-C 1 $10 $10 Standard
Cables HDMI, USB, etc. bundle $10 $10 Standard
Total per node $130-145

Tracking Station Bill of Materials:

Component Model Quantity Unit Cost Total Notes
Telescope Used 6" Dob 1 $200-400 $200-400 Craigslist, CloudyNights
Camera ZWO ASI290MM Mini 1 $299 $299 Or modified webcam $100
Guide scope 30mm finder 1 $50 $50 Used OK
Computer Mini PC / NUC 1 $200 $200 Or used laptop
Mount control DIY Arduino 1 $30 $30 If not motorized

Software Setup

Edge Detection (Raspberry Pi):

# Install dependencies
sudo apt update
sudo apt install python3-opencv python3-numpy python3-astropy

# Create detection pipeline
mkdir -p ~/ssa/edge
cd ~/ssa/edge

# Core components needed:
# - streak_detector.py (Hough transform)
# - plate_solver.py (astrometry)
# - uploader.py (send to central)

Central Server:

# Cloud VPS setup (DigitalOcean / Linode / etc.)
# Minimum: 4 vCPU, 8GB RAM, 100GB SSD
# Cost: ~$40-80/month

# Install dependencies
sudo apt install postgresql redis-server python3-pip

# Python packages
pip3 install astropy skyfield orekit numpy scipy

Validation Testing

Test targets (verify system works):

Target Magnitude Orbit Test Purpose
ISS -3 to -1 LEO Bright, well-tracked Baseline detection
Hubble 2-3 LEO Medium brightness, known orbit
Sentinel-6 4-5 LEO Fainter, still trackable
GOES satellites 5-7 GEO Stationary, test long exposure

Validation checklist:

  • [ ] Detection node captures frames at 25fps
  • [ ] Edge software detects ISS streak
  • [ ] Plate solving identifies stars correctly
  • [ ] Position accuracy within 30 arcsec
  • [ ] Upload to central server succeeds
  • [ ] Central server correlates with known TLE
  • [ ] Multiple nodes report simultaneously
  • [ ] Triangulation gives altitude within 50% error

Phase 1: Detection Network (Weeks 5-12)

Geographic Distribution Strategy

Optimal node placement:

Goal: Maximize baseline while maintaining coverage

For regional (single continent):
  - 3 nodes minimum for triangulation
  - Ideal separation: 500-3000 km
  - Example placements:
    * Node A: Urban center (North)
    * Node B: Urban center (South)
    * Node C: Urban center (East/West)

For continental:
  - 10 nodes across continent
  - Covers all weather patterns
  - Example (North America):
    * Seattle, WA
    * Los Angeles, CA
    * Denver, CO
    * Chicago, IL
    * Dallas, TX
    * New York, NY
    * Miami, FL
    * Boston, MA
    * Toronto, ON
    * Phoenix, AZ

For global:
  - 20+ nodes worldwide
  - Focus on populated areas + partners

Edge Processing Implementation

StreakDetection Pipeline:

# /edge/streak_detector.py

import cv2
import numpy as np
from astropy.io import fits

class StreakDetector:
    def __init__(self, config):
        self.config = config
        self.reference_frame = None

    def process_video(self, video_path):
        """
        Process raw video forstreaks.

        Input: Video file or stream
        Output: List of (timestamp, x, y, angle, length, brightness)
        """
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)

        streaks = []
        frame_idx = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Convert to grayscale
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Subtract reference (stars)
            if self.reference_frame is not None:
                diff = cv2.absdiff(gray, self.reference_frame)
            else:
                diff = gray

            # Threshold for moving objects
            _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY)

            # Hough Line Transform for streaks
            lines = cv2.HoughLinesP(
                thresh,
                rho=1,
                theta=np.pi/180,
                threshold=50,
                minLineLength=100,
                maxLineGap=10
            )

            if lines is not None:
                for line in lines:
                    x1, y1, x2, y2 = line[0]
                    # Filter by velocity (distinguish from planes, birds)
                    # ...
                    streaks.append({
                        'timestamp': frame_idx / fps,
                        'x_center': (x1 + x2) / 2,
                        'y_center': (y1 + y2) / 2,
                        'angle': np.arctan2(y2 - y1, x2 - x1),
                        'length': np.sqrt((x2-x1)**2 + (y2-y1)**2)
                    })

            frame_idx += 1

        cap.release()
        return streaks

    def plate_solve(self, frame):
        """
        Use astrometry to solve frame coordinates.

        Requires: astrometry.net or similar
        """
        # Save as FITS
        # Submit to astrometry.net
        # Parse solution
        # Return: RA, Dec center, scale, rotation
        pass

Timing Synchronization:

# /edge/timing.py

import time
import ntplib
from datetime import datetime

class TimeSync:
    def __init__(self, ntp_servers=['pool.ntp.org', 'time.nist.gov']):
        self.ntp_servers = ntp_servers
        self.offset = 0  # ms offset from NTP

    def sync(self):
        """
        Synchronize with NTP servers.
        """
        client = ntplib.NTPClient()
        for server in self.ntp_servers:
            try:
                response = client.request(server, version=3)
                self.offset = response.offset * 1000  # to ms
                print(f"Synced to {server}, offset: {self.offset:.1f} ms")
                return True
            except:
                continue
        return False

    def get_timestamp(self):
        """
        Get precise timestamp with NTP correction.
        """
        return time.time() - (self.offset / 1000.0)

Catalog Integration

Space-Track API:

# /central/catalog.py

import requests
from skyfield.api import load, EarthSatellite

class SpaceTrackClient:
    def __init__(self, username, password):
        self.username = username
        self.password = password
        self.session = None

    def login(self):
        """
        Authenticate with Space-Track.org
        """
        url = "https://www.space-track.org/ajaxauth/login"
        self.session = requests.Session()
        response = self.session.post(url, data={
            'identity': self.username,
            'password': self.password
        })
        return response.status_code == 200

    def get_tles(self, norad_id=None, days=30):
        """
        Fetch TLEs for specified objects or all.
        """
        base_url = "https://www.space-track.org/basicspacedata/query"

        if norad_id:
            url = f"{base_url}/class/tle/NORAD_CAT_ID/{norad_id}/format/json"
        else:
            url = f"{base_url}/class/tle/PRESENT/1/orderby/ORDINAL/format/json"

        response = self.session.get(url)
        return response.json()

    def propagate(self, tle_line1, tle_line2, timestamp):
        """
        Propagate TLE to given timestamp using SGP4.
        """
        sat = EarthSatellite(tle_line1, tle_line2)
        ts = load.timescale()
        t = ts.utc(timestamp.year, timestamp.month, timestamp.day,
                   timestamp.hour, timestamp.minute, timestamp.second)

        position = sat.at(t)
        ra, dec, distance = position.radec()

        return {
            'ra': ra.degrees,
            'dec': dec.degrees,
            'distance': distance.km,
            'tle_satellite': sat
        }

Phase 2: Refinement Layer (Weeks 13-20)

Multi-Site Coordination

Visibility Calculator:

# /central/visibility.py

import numpy as np
from astropy.coordinates import EarthLocation, AltAz, get_body
from astropy.time import Time
import astropy.units as u

class VisibilityCalculator:
    def __init__(self, sites):
        """
        sites: list of (name, lat, lon, elevation)
        """
        self.sites = [
            EarthLocation(
                lat=lat*u.deg,
                lon=lon*u.deg,
                height=elevation*u.m
            )
            for name, lat, lon, elevation in sites
        ]

    def find_common_window(self, target_ra, target_dec, start_time, end_time):
        """
        Find time windows when target is visible from all sites.
        """
        from astropy.coordinates import SkyCoord

        target = SkyCoord(ra=target_ra*u.deg, dec=target_dec*u.deg)

        windows = []
        current_time = start_time

        while current_time < end_time:
            visible_from_all = True

            for site in self.sites:
                altaz = target.transform_to(
                    AltAz(obstime=Time(current_time), location=site)
                )

                # Check if above horizon and in dark
                if altaz.alt.deg < 20:  # Minimum altitude
                    visible_from_all = False
                    break

                # Check if sun is below horizon (twilight)
                sun = get_body('sun', Time(current_time), site)
                sun_altaz = sun.transform_to(
                    AltAz(obstime=Time(current_time), location=site)
                )

                if sun_altaz.alt.deg > -12:  # Astronomical twilight
                    visible_from_all = False
                    break

            if visible_from_all:
                windows.append(current_time)

            current_time += timedelta(minutes=1)

        # Merge consecutive windows
        merged_windows = self._merge_windows(windows)

        return merged_windows

Task Scheduler:

# /central/scheduler.py

class SSAScheduler:
    def schedule_simultaneous_observations(
        self,
        target_norad_id,
        sites,
        duration=60  # seconds
    ):
        """
        Schedule simultaneous observations across sites.
        """
        # Get TLE
        tle = self.catalog.get_tles(norad_id=target_norad_id)

        # Find visibility windows
        windows = self.visibility.find_common_window(
            tle['ra'], tle['dec'],
            start_time=datetime.utcnow(),
            end_time=datetime.utcnow() + timedelta(days=1)
        )

        if not windows:
            return None  # No common visibility

        # Select best window
        best_window = self._select_best_window(windows, sites)

        # Create observation tasks for each site
        tasks = []
        for site in sites:
            task = {
                'site_id': site.id,
                'target_norad_id': target_norad_id,
                'start_time': best_window['start'],
                'end_time': best_window['end'],
                'mode': 'track',  # or 'stare'
                'integration_time': duration,
                'expected_position': self._propagate_position(
                    tle, best_window['start']
                )
            }
            tasks.append(task)

        return tasks

Triangulation Module

# /central/triangulation.py

import numpy as np
from astropy.coordinates import SkyCoord, EarthLocation, AltAz
from astropy.time import Time
import astropy.units as u

class Triangulator:
    def triangulate_orbit(self, observations):
        """
        Triangulate object position from multiple ground observations.

        observations: list of {
            'site': EarthLocation,
            'time': Time,
            'ra': float (degrees),
            'dec': float (degrees),
            'timing_uncertainty': float (seconds)
        }

        Returns: {
            'position': (x, y, z) in ECI,
            'altitude': km,
            'uncertainty': km
        }
        """
        sites = [obs['site'] for obs in observations]
        times = [obs['time'] for obs in observations]
        ras = [obs['ra'] for obs in observations]
        decs = [obs['dec'] for obs in observations]

        # Convert to unit vectors from each site
        los_vectors = []
        for site, time, ra, dec in zip(sites, times, ras, decs):
            coord = SkyCoord(ra=ra*u.deg, dec=dec*u.deg)
            # Transform to AltAz at site
            altaz = coord.transform_to(AltAz(obstime=time, location=site))
            los_vectors.append(altaz.cartesian.xyz.value)

        # Find closest approach of multiple lines of sight
        position, distance = self._closest_approach(
            sites, times, los_vectors
        )

        # Calculate altitude
        altitude = self._calculate_altitude(position)

        # Estimate uncertainty from timing errors
        uncertainty = self._estimate_uncertainty(
            observations, position, los_vectors
        )

        return {
            'position': position,
            'altitude': altitude,
            'uncertainty': uncertainty
        }

    def _closest_approach(self, sites, times, los_vectors):
        """
        Find closest approach point for multiple lines of sight.

        Uses least squares to minimize distance from all lines.
        """
        # Transform sites to ECI coordinates at each time
        # ...
        # Set up system of equations
        # Solve for closest point
        # Return position and nearest approach distance
        pass

    def _calculate_altitude(self, position_eci):
        """
        Calculate altitude above Earth's surface.
        """
        earth_radius = 6371  # km
        distance_from_center = np.linalg.norm(position_eci)
        return distance_from_center - earth_radius

Phase 3: Characterization (Weeks 21-28)

Light Curve Analysis

# /central/characterization/lightcurve.py

from scipy.signal import lombscargle
import numpy as np

class LightCurveAnalyzer:
    def detect_tumble(self, times, magnitudes):
        """
        Detect tumbling period from light curve.

        Returns: {
            'is_tumbling': bool,
            'period': float (seconds) or None,
            'amplitude': float (mag) or None,
            'confidence': float
        }
        """
        # Normalize
        times = np.array(times)
        magnitudes = np.array(magnitudes)

        # Remove outliers
        mag_mean = np.mean(magnitudes)
        mag_std = np.std(magnitudes)
        mask = np.abs(magnitudes - mag_mean) < 5 * mag_std
        times = times[mask]
        magnitudes = magnitudes[mask]

        # Calculate time span
        time_span = times[-1] - times[0]

        # Define frequency range to search
        # Short periods: 1 second to 1 minute
        # Long periods: 1 minute to 10 minutes
        min_freq = 1.0 / (10 * 60)  # 10 minute period
        max_freq = 1.0  # 1 second period

        frequencies = np.linspace(min_freq, max_freq, 10000)

        # Lomb-Scargle periodogram
        angular_freqs = 2 * np.pi * frequencies
        power = lombscargle(times, magnitudes - mag_mean, angular_freqs)

        # Find peak
        peak_idx = np.argmax(power)
        peak_freq = frequencies[peak_idx]
        peak_power = power[peak_idx]

        # Calculate significance
        power_std = np.std(power)
        significance = (peak_power - np.mean(power)) / power_std

        # Period at peak
        period = 1.0 / peak_freq

        # Amplitude estimate
        amplitude = (np.max(magnitudes) - np.min(magnitudes)) / 2

        # Determine if tumbling
        # Tumbling if period is well-defined and amplitude is significant
        is_tumbling = significance > 3.0 and amplitude > 0.1

        return {
            'is_tumbling': is_tumbling,
            'period': period if is_tumbling else None,
            'amplitude': amplitude if is_tumbling else None,
            'confidence': min(significance / 5.0, 1.0)
        }

Material Classification

# /central/characterization/material_classifier.py

class MaterialClassifier:
    # Material signatures from SSA.md
    SIGNATURES = {
        'S-type': {
            'B-V': 0.5-1.0,
            'V-R': 0.4-0.8,
            'R-I': 0.3-0.6,
            'features': ['950nm absorption']
        },
        'C-type': {
            'B-V': 0.3-0.5,
            'V-R': 0.2-0.4,
            'R-I': 0.1-0.3,
            'features': ['flat spectrum']
        },
        'M-type': {
            'B-V': 0.4-0.6,
            'V-R': 0.3-0.5,
            'R-I': 0.2-0.4,
            'features': ['red slope', 'noabsorption']
        },
        'solar_panel': {
            'B-V': 0.0-0.2,
            'V-R': -0.1-0.1,
            'R-I': -0.2-0.0,
            'features': ['blue', 'high albedo']
        }
    }

    def classify_material(self, color_indices):
        """
        Classify material from photometric colors.

        color_indices: {
            'B-V': float,
            'V-R': float,
            'R-I': float
        }

        Returns: {
            'classification': str,
            'confidence': float,
            'notes': str
        }
        """
        scores = {}

        for material, signature in self.SIGNATURES.items():
            score = 0
            for color, value in color_indices.items():
                if color in signature:
                    min_val, max_val = signature[color]
                    if min_val <= value <= max_val:
                        score += 1
            scores[material] = score / len(color_indices)

        best_match = max(scores, key=scores.get)

        return {
            'classification': best_match,
            'confidence': scores[best_match],
            'notes': f"Color indices: {color_indices}"
        }

Phase 4: CubeSat Future (Years 2-3)

See separate document: SSA CubeSat Deployment Plan.md


Testing & Validation

Unit Tests

Each software component needs tests:

# /tests/test_streak_detector.py

def test_streak_detection_basic():
    """Test detection of simulated streak."""
    # Create synthetic frame with streak
    frame = create_test_frame(streak=True)
    detector = StreakDetector()
    streaks = detector.process_frame(frame)

    assert len(streaks) == 1
    assert streaks[0]['angle'] == expected_angle

def test_plate_solving():
    """Test astrometric solution."""
    # Load test frame with known stars
    frame = load_test_frame_with_stars()
    detector = StreakDetector()
    solution = detector.plate_solve(frame)

    assert abs(solution['ra'] - expected_ra) < 0.5  # degrees
    assert abs(solution['dec'] - expected_dec) < 0.5

Integration Tests

# /tests/test_integration.py

def test_end_to_end_detection():
    """
    Simulate observation from摄像倴 to catalog correlation.
    """
    # 1. Generate synthetic video with known satellite
    # 2. Run through edge detection
    # 3. Upload to central
    # 4. Correlate withcatalog
    # 5. Verify correct identification

def test_multi_site_triangulation():
    """
    Test triangulation with known object (ISS).
    """
    # 1. Get ISS TLE
    # 2. Propagate to known time
    # 3. Simulate observations from 3 sites
    # 4. Run triangulation
    # 5. Compare known position vs calculated
    # 6. Verify error < 50 km for this implementation

Deployment Checklist

Detection Node Deployment

  • [ ] Hardware assembled and tested indoors
  • [ ] Software installed and configured
  • [ ] Camera calibrated (dark frames, flat frames)
  • [ ] Plate solving validated with test images
  • [ ] NTP synchronization verified (< 50ms offset)
  • [ ] Network connectivity tested
  • [ ] Upload to central server working
  • [ ] Outdoor enclosure weatherproofed
  • [ ] Power supply stable
  • [ ] Location documented (lat, lon, elevation)

Central Server Deployment

  • [ ] VPS provisioned
  • [ ] PostgreSQL installed and configured
  • [ ] Redis installed and configured
  • [ ] fire installed
  • [ ] Catalog sync working (Space-Track)
  • [ ] SSL certificates configured
  • [ ] Monitoring configured (uptime, errors)
  • [ ] Backup system configured

Success Metrics

Metric Threshold Target
Detection rate (ISS) > 90% 95%
Position accuracy < 30 arcsec 15 arcsec
Timing sync < 50 ms 10 ms
Triangulation error < 100 km 50 km
Orbit prediction (24h) < 500 km 200 km
Light curve period accuracy < 10% 5%
Material classification confidence > 70% 85%
System uptime > 95% 99%

Risk Register

Risk Impact Probability Mitigation
Weather High 100% Multi-site redundancy
Equipment failure Medium 30% Cheap replacement, spares
Network outage Medium 10% Local buffer, retry
Timing drift High 20% Regular NTP sync, GPS backup
Software bugs Medium 50% Testing, validation
TLE catalog delay Low 10% Multi-source, own tracking
Light pollution Medium 40% Background subtraction
Aircraft confusion Low 20% Velocity discrimination