SSA Bulk Coding Handoff Specification¶
Purpose¶
This document provides complete specifications for implementing the SSA (Space Situational Awareness) system. Each component is broken down into implementable units with clear inputs, outputs, and acceptance criteria.
For the implementing model: Read each section, implement the code as specified, follow the file structure exactly.
Project Structure¶
ssa/
├── edge/ # Raspberry Pi edge detection
│ ├── __init__.py
│ ├── streak_detector.py # Core detection algorithm
│ ├── plate_solver.py # Astrometry solution
│ ├── timing_sync.py # NTP timing
│ ├── frame_buffer.py # Video buffering
│ ├── uploader.py # Send to central
│ ├── config.py # Configuration
│ └── main.py # Entry point
│
├── central/ # Cloud server
│ ├── __init__.py
│ ├── catalog.py # TLE management
│ ├── track_associator.py # Link observations
│ ├── orbit_solver.py # Orbit determination
│ ├── triangulator.py # Multi-site position
│ ├── scheduler.py # Observation coordination
│ ├── api.py # REST API
│ ├── models.py # Database models
│ └── config.py # Configuration
│
├── characterization/ # Analysis module
│ ├── __init__.py
│ ├── lightcurve.py # Tumble detection
│ ├── material_classifier.py # Material analysis
│ └── size_estimator.py # Size estimation
│
├── tests/
│ ├── test_edge/
│ ├── test_central/
│ └── test_characterization/
│
├── scripts/
│ ├── deploy_edge.sh # Edge node deployment
│ ├── deploy_central.sh # Central server setup
│ └── test_with_iss.py # Integration test
│
└── docs/
├── API.md
├── DEPLOYMENT.md
└── TESTING.md
Component 1: Edge Detection Node¶
Location: /edge/streak_detector.py¶
Purpose: Detect satellite streaks in video frames from cheap cameras.
Input:
- Video file path or camera stream
- Configuration with detection parameters

Output:
- JSON list of detected streaks with: timestamp, position (x, y), angle, length, brightness
Implementation Requirements:
"""
Streak detection module for satellite tracking.
Performance requirements:
- Process 25 fps video in real-time on RPi 4
- Detect streaks down to magnitude 10
- False positive rate < 5%
"""
import cv2
import numpy as np
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Streak:
timestamp: float # seconds since epoch
x_center: float # pixel coordinates
y_center: float
angle: float # radians
length: float # pixels
brightness: float # ADU
class StreakDetector:
def __init__(self, config: Dict):
"""
Initialize detector with configuration.
config keys:
- threshold: int - brightness threshold (default: 30)
- min_line_length: int - minimum streak length in pixels (default: 50)
- max_line_gap: int - maximum gap in streak (default: 10)
- fps: int - frames per second (default: 25)
"""
pass
def process_frame(self, frame: np.ndarray) -> List[Streak]:
"""
Process a single frame for streaks.
Algorithm:
1. Convert to grayscale
2. Subtract background (median of previous N frames)
3. Threshold for moving pixels
4. Apply morphological cleaning (remove noise)
5. Detect lines using Hough transform
6. Filter by length and velocity
7. Calculate brightness (sum along line)
Args:
frame: BGR or grayscale image array
Returns:
List of Streak objects detected in frame
"""
# IMPLEMENT THIS
pass
def process_video(self, video_path: str) -> List[Streak]:
"""
Process entire video file.
Args:
video_path: Path to video file
Returns:
List of all Streak objects detected across all frames
"""
# IMPLEMENT THIS
pass
def set_reference_frame(self, frame: np.ndarray):
"""
Set reference frame for background subtraction.
Should be a frame with stars only (no streaks).
Use median of multiple frames for best results.
"""
# IMPLEMENT THIS
pass
Testing Requirements:
def test_detector_basic():
"""Test detection of synthetic streak."""
# Create synthetic frame: black background, white streak
frame = np.zeros((1080, 1920), dtype=np.uint8)
cv2.line(frame, (100, 540), (1800, 600), 255, 2)
detector = StreakDetector({'threshold': 10})
streaks = detector.process_frame(frame)
assert len(streaks) == 1
assert streaks[0].length > 100
assert streaks[0].brightness > 0
def test_detector_no_streak():
"""Test that static frame produces no streaks."""
frame = np.random.randint(0, 30, (1080, 1920), dtype=np.uint8)
detector = StreakDetector({'threshold': 50})
streaks = detector.process_frame(frame)
assert len(streaks) == 0
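As a sketch of steps 5–7 only (not the author's implementation): once `cv2.HoughLinesP` has produced line segments, turning them into `Streak` records is pure geometry. The `Streak` dataclass is repeated so the snippet is self-contained, and brightness sampling is injected as a callable (a real detector would sum pixel values along the line):

```python
import math
import time
from dataclasses import dataclass

@dataclass
class Streak:
    timestamp: float   # seconds since epoch
    x_center: float    # pixel coordinates
    y_center: float
    angle: float       # radians
    length: float      # pixels
    brightness: float  # ADU

def segments_to_streaks(segments, line_brightness, timestamp=None,
                        min_length=50.0):
    """Convert (x1, y1, x2, y2) line segments into Streak records.

    line_brightness(segment) is a stand-in for summing image pixels
    along the segment (e.g. via a cv2.line mask in the real detector).
    """
    ts = time.time() if timestamp is None else timestamp
    streaks = []
    for (x1, y1, x2, y2) in segments:
        dx, dy = x2 - x1, y2 - y1
        length = math.hypot(dx, dy)
        if length < min_length:  # step 6: drop short segments
            continue
        streaks.append(Streak(
            timestamp=ts,
            x_center=(x1 + x2) / 2.0,
            y_center=(y1 + y2) / 2.0,
            angle=math.atan2(dy, dx),
            length=length,
            brightness=line_brightness((x1, y1, x2, y2)),  # step 7
        ))
    return streaks
```

Velocity filtering (step 6's second half) needs inter-frame matching and is deliberately left out of this sketch.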
Location: /edge/plate_solver.py¶
Purpose: Convert pixel coordinates to celestial coordinates (RA, Dec).
Input:
- Image frame
- Detected streak pixel positions

Output:
- RA, Dec for each streak
Implementation Requirements:
"""
Astrometric plate solving for coordinate conversion.
Dependencies:
- astrometry.net (install separately)
OR
- astropy.coordinates
"""
import numpy as np
from typing import Dict
from astropy.coordinates import SkyCoord
from astropy.wcs import WCS
import astropy.units as u
class PlateSolver:
def __init__(self, config: Dict):
"""
Initialize plate solver.
config keys:
- method: 'astrometry_net' or 'astropy'
- astrometry_api_key: str (if using astrometry.net)
- fov_estimate: float - estimated field of view in degrees
"""
pass
def solve_frame(self, frame: np.ndarray) -> Dict:
"""
Solve frame for WCS (World Coordinate System).
Returns:
{
'wcs': WCS object,
'ra_center': float (degrees),
'dec_center': float (degrees),
'scale': float (arcsec/pixel),
'rotation': float (degrees)
}
"""
# IMPLEMENT THIS
pass
def pixel_to_sky(self, x: float, y: float, wcs: WCS) -> SkyCoord:
"""
Convert pixel coordinates to sky coordinates.
Args:
x, y: Pixel coordinates
wcs: World Coordinate System from solve_frame()
Returns:
SkyCoord object with RA, Dec
"""
# IMPLEMENT THIS
pass
Testing Requirements:
def test_solve_synthetic():
"""Test plate solving on synthetic star field."""
# Create frame with known star positions
# Solve
# Verify RA, Dec match expected
pass
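For intuition about what `pixel_to_sky` does, here is a small-field tangent-plane approximation in plain NumPy, assuming a purely linear WCS (reference pixel, plate scale, rotation). A production solver should use the full astropy `WCS` returned by `solve_frame()`; this helper is illustrative only and breaks down for wide fields:

```python
import numpy as np

def pixel_to_sky_approx(x, y, ra0_deg, dec0_deg, scale_arcsec, rot_deg,
                        x0, y0):
    """Small-field approximation: rotate and scale pixel offsets about
    the reference pixel (x0, y0), then add them to the reference RA/Dec.

    The RA offset is divided by cos(dec) to convert a sky-plane offset
    into a coordinate offset.
    """
    s = scale_arcsec / 3600.0  # degrees per pixel
    th = np.radians(rot_deg)
    dx, dy = x - x0, y - y0
    xi = s * (dx * np.cos(th) + dy * np.sin(th))    # offset along RA
    eta = s * (-dx * np.sin(th) + dy * np.cos(th))  # offset along Dec
    dec = dec0_deg + eta
    ra = ra0_deg + xi / np.cos(np.radians(dec0_deg))
    return ra, dec
```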
Location: /edge/timing_sync.py¶
Purpose: Synchronize timestamps across multiple nodes.
Implementation Requirements:
"""
Precision timing synchronization using NTP.
"""
import ntplib
import time
from datetime import datetime
from typing import Dict, List
class TimeSync:
def __init__(self, ntp_servers: List[str] = None):
"""
Initialize time synchronizer.
Args:
ntp_servers: List of NTP servers to query
Default: ['pool.ntp.org', 'time.nist.gov']
"""
if ntp_servers is None:
self.ntp_servers = ['pool.ntp.org', 'time.nist.gov']
else:
self.ntp_servers = ntp_servers
self.offset_ms = 0.0
def sync(self) -> bool:
"""
Synchronize with NTP servers.
Returns:
True if sync successful, False otherwise
"""
# IMPLEMENT THIS
# Query each server
# Calculate offset
# Store offset
pass
def get_precise_time(self) -> float:
"""
Get current time with NTP correction.
Returns:
Unix timestamp (seconds since epoch) with millisecond precision
"""
# IMPLEMENT THIS
pass
def get_status(self) -> Dict:
"""
Get synchronization status.
Returns:
{
'offset_ms': float,
'last_sync': datetime,
'source': str (which NTP server)
}
"""
pass
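The offset logic behind `sync()` and `get_precise_time()` can be sketched as follows. The NTP query is injected as a callable (in the real module it would wrap `ntplib.NTPClient().request(server).offset`) so the median/fallback behavior is testable without a network; catching `OSError` here is an assumption about how failures surface:

```python
import time
import statistics

def compute_offset_ms(servers, query_offset_s):
    """Median NTP offset in milliseconds across responding servers.

    query_offset_s(server) should return the clock offset in seconds
    or raise OSError on failure. Returns None if no server responds.
    """
    offsets = []
    for server in servers:
        try:
            offsets.append(query_offset_s(server) * 1000.0)
        except OSError:
            continue  # skip unreachable servers
    if not offsets:
        return None
    return statistics.median(offsets)

def precise_time(offset_ms):
    """System time corrected by the stored NTP offset."""
    return time.time() + offset_ms / 1000.0
```

Taking the median rather than the mean keeps one badly skewed server from corrupting the offset.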
Location: /edge/uploader.py¶
Purpose: Send detection results to central server.
Implementation Requirements:
"""
Upload detection results to central server.
"""
import requests
import json
from typing import Dict, List
import backoff
from streak_detector import Streak
class Uploader:
def __init__(self, config: Dict):
"""
Initialize uploader.
config keys:
- server_url: str - Central server base URL
- api_key: str - Authentication key
- batch_size: int - Upload in batches (default: 10)
- retry_attempts: int - Retry on failure (default: 3)
"""
pass
@backoff.on_exception(backoff.expo,
requests.RequestException,
max_tries=3)
def upload_streaks(self, streaks: List[Streak]) -> bool:
"""
Upload detected streaks to central server.
Args:
streaks: List of Streak objects
Returns:
True if upload successful
"""
# IMPLEMENT THIS
# Convert Streaks to dict
# POST to /api/v1/streaks
# Handle retries
pass
def upload_status(self, status: Dict) -> bool:
"""
Upload node status.
Args:
status: {
'timestamp': float,
'cpu_temp': float,
'disk_free': float,
'uptime': float,
'frames_processed': int
}
"""
pass
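The batching half of `upload_streaks()` can be sketched independently of HTTP. The `post` callable below is a stand-in for the authenticated `requests.post` call to `/api/v1/streaks`; the `Streak` dataclass is repeated so the snippet stands alone:

```python
import json
from dataclasses import asdict, dataclass
from typing import Callable, List

@dataclass
class Streak:
    timestamp: float
    x_center: float
    y_center: float
    angle: float
    length: float
    brightness: float

def upload_in_batches(streaks: List[Streak], post: Callable[[str], bool],
                      batch_size: int = 10) -> bool:
    """Serialize streaks to JSON and upload in batches of batch_size.

    post(body) stands in for the real HTTP POST (with retries handled
    by the @backoff decorator in the actual module). Returns True only
    if every batch succeeds.
    """
    ok = True
    for i in range(0, len(streaks), batch_size):
        batch = streaks[i:i + batch_size]
        body = json.dumps({'streaks': [asdict(s) for s in batch]})
        ok = post(body) and ok
    return ok
```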
Location: /edge/main.py¶
Purpose: Main entry point for edge node.
Implementation Requirements:
"""
Main entry point for edge detection node.
Usage:
python main.py --config edge_config.yaml
"""
import argparse
import time
from pathlib import Path
import yaml
from streak_detector import StreakDetector
from plate_solver import PlateSolver
from timing_sync import TimeSync
from uploader import Uploader
def main(config_path: str):
"""
Run edge detection pipeline.
Workflow:
1. Load configuration
2. Initialize components
3. Sync time
4. Capture frames
5. Detect streaks
6. Solve astrometry
7. Upload to central
8. Loop forever
"""
# Load config
with open(config_path) as f:
config = yaml.safe_load(f)
# Initialize
detector = StreakDetector(config['detection'])
solver = PlateSolver(config['plate_solving'])
timing = TimeSync(config['ntp_servers'])
uploader = Uploader(config['upload'])
# Sync time
timing.sync()
# Main loop
while True:
# IMPLEMENT THIS
pass
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--config', required=True)
args = parser.parse_args()
main(args.config)
Component 2: Central Server¶
Location: /central/models.py¶
Purpose: Database models for SSA data.
Implementation Requirements:
"""
SQLAlchemy models for SSA database.
"""
from sqlalchemy import Column, Integer, Float, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class SpaceObject(Base):
"""Tracked object (satellite, debris, unknown)."""
__tablename__ = 'space_objects'
id = Column(Integer, primary_key=True)
norad_id = Column(Integer, unique=True, nullable=True)
cospar_id = Column(String(20), nullable=True)
name = Column(String(255))
object_type = Column(String(50)) # 'payload', 'debris', 'unknown'
# Orbital elements (latest)
semi_major_axis = Column(Float) # km
eccentricity = Column(Float)
inclination = Column(Float) # degrees
raan = Column(Float) # degrees
arg_perigee = Column(Float) # degrees
mean_anomaly = Column(Float) # degrees
epoch = Column(DateTime)
# Characterization
magnitude_estimate = Column(Float)
tumble_status = Column(String(50)) # 'stable', 'tumbling', 'unknown'
tumble_period = Column(Float) # seconds
material_class = Column(String(50)) # 'S', 'C', 'M', 'solar_panel', 'unknown'
size_estimate = Column(Float) # meters
# Metadata
first_seen = Column(DateTime, default=datetime.utcnow)
last_seen = Column(DateTime, default=datetime.utcnow)
observation_count = Column(Integer, default=0)
def to_dict(self):
return {
'id': self.id,
'norad_id': self.norad_id,
'name': self.name,
'object_type': self.object_type,
'tumble_status': self.tumble_status,
'last_seen': self.last_seen.isoformat()
}
class Observation(Base):
"""Single observation from one site."""
__tablename__ = 'observations'
id = Column(Integer, primary_key=True)
object_id = Column(Integer, nullable=True)
site_id = Column(Integer, nullable=False)
# Timing (precise)
timestamp = Column(Float, nullable=False) # Unix timestamp
timestamp_uncertainty = Column(Float) # seconds
# Position
ra = Column(Float, nullable=False) # degrees
dec = Column(Float, nullable=False) # degrees
ra_rate = Column(Float) # deg/sec
dec_rate = Column(Float) # deg/sec
# Photometry
magnitude = Column(Float)
filter_band = Column(String(10)) # 'B', 'V', 'R', 'I', 'clear'
# Quality
snr = Column(Float)
astrometric_uncertainty = Column(Float) # arcsec
# Metadata
created_at = Column(DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'object_id': self.object_id,
'site_id': self.site_id,
'timestamp': self.timestamp,
'ra': self.ra,
'dec': self.dec,
'magnitude': self.magnitude
}
class SSANode(Base):
"""Detection node (camera site)."""
__tablename__ = 'ssa_nodes'
id = Column(Integer, primary_key=True)
name = Column(String(100), unique=True)
# Location
latitude = Column(Float) # degrees
longitude = Column(Float) # degrees
elevation = Column(Float) # meters
# Capability
min_magnitude = Column(Float) # detection limit
fov_degrees = Column(Float) # field of view
# Status
is_active = Column(Integer, default=1)
last_heartbeat = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'latitude': self.latitude,
'longitude': self.longitude,
'is_active': self.is_active
}
class Campaign(Base):
"""Observation campaign."""
__tablename__ = 'campaigns'
id = Column(Integer, primary_key=True)
name = Column(String(255))
description = Column(String)
target_object_id = Column(Integer)
stage = Column(Integer) # 1=detection, 2=refinement, 3=characterization
status = Column(String(50)) # 'draft', 'active', 'completed', 'failed'
scheduled_start = Column(DateTime)
scheduled_end = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
started_at = Column(DateTime)
completed_at = Column(DateTime)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'stage': self.stage,
'status': self.status
}
Location: /central/catalog.py¶
Purpose: Manage TLE catalog and orbit propagation.
Implementation Requirements:
"""
Space-Track catalog integration and TLE propagation.
"""
import requests
from skyfield.api import load, EarthSatellite
from datetime import datetime
from typing import List, Dict, Optional
class CatalogManager:
def __init__(self, config: Dict):
"""
Initialize catalog manager.
config keys:
- space_track_username: str
- space_track_password: str
- cache_dir: str - Directory to cache TLEs
"""
self.session = None
self.cache = {}
pass
def login(self) -> bool:
"""
Authenticate with Space-Track.org.
Returns:
True if successful
"""
# IMPLEMENT THIS
pass
def get_tle(self, norad_id: int) -> Optional[Dict]:
"""
Get TLE for specific object.
Returns:
{
'norad_id': int,
'line1': str,
'line2': str,
'name': str,
'epoch': datetime
}
or None if not found
"""
# IMPLEMENT THIS
pass
def search_by_name(self, name: str) -> List[Dict]:
"""
Search for objects by name.
Returns:
List of matching TLEs
"""
pass
def propagate(self, tle: Dict, timestamp: datetime) -> Dict:
"""
Propagate TLE to given timestamp using SGP4.
Returns:
{
'ra': float (degrees),
'dec': float (degrees),
'altitude': float (km),
'velocity': float (km/s),
'geocentric': (x, y, z) in km
}
"""
# IMPLEMENT THIS
# Use skyfield for propagation
pass
def find_visible(
self,
site_lat: float,
site_lon: float,
site_elev: float,
time_start: datetime,
time_end: datetime
) -> List[Dict]:
"""
Find all objects visible from site in time window.
Returns:
List of objects with visibility windows
"""
# IMPLEMENT THIS
pass
Location: /central/track_associator.py¶
Purpose: Link observations into tracklets and identify objects.
Implementation Requirements:
"""
Associate observations with known objects or create new candidates.
"""
import numpy as np
from scipy.spatial.distance import cdist
from typing import Dict, List, Optional
from catalog import CatalogManager
class TrackAssociator:
def __init__(self, catalog: CatalogManager):
"""
Initialize associator with catalog.
"""
self.catalog = catalog
def associate(
self,
observation: Dict,
time_tolerance: float = 60.0, # seconds
position_tolerance: float = 1.0 # degrees
) -> Optional[int]:
"""
Associate observation with known object.
Args:
observation: {
'ra': float,
'dec': float,
'timestamp': float
}
time_tolerance: Maximum time difference for correlation
position_tolerance: Maximum angular separation (degrees)
Returns:
object_id if associated, None if new object
"""
# IMPLEMENT THIS
# Query catalog for objects near position
# Propagate TLE to observation time
# Check if within tolerance
pass
def create_candidate(self, observation: Dict) -> int:
"""
Create new candidate object from observation.
Returns:
new object_id
"""
# IMPLEMENT THIS
pass
def link_observations(
self,
observations: List[Dict],
max_angular_velocity: float = 5.0 # deg/sec for LEO
) -> List[List[int]]:
"""
Link observations into tracklets.
Algorithm:
1. Sort by time
2. For each observation:
3. Find observations within time window
4. Calculate angular separation / time delta
5. If consistent velocity: link
6. If velocity too high: skip
7. Return linked tracklets
Returns:
List of tracklets (each is list of observation IDs)
"""
# IMPLEMENT THIS
pass
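A minimal sketch of the linking algorithm above, under the assumption that a tracklet is grown greedily and split whenever the implied angular rate or time gap becomes implausible (the observation dict fields follow those used elsewhere in this spec; the 30 s gap limit is an illustrative choice):

```python
import math

def angular_sep_deg(ra1, dec1, ra2, dec2):
    """Great-circle separation in degrees (spherical law of cosines)."""
    r1, d1, r2, d2 = map(math.radians, (ra1, dec1, ra2, dec2))
    c = (math.sin(d1) * math.sin(d2)
         + math.cos(d1) * math.cos(d2) * math.cos(r1 - r2))
    return math.degrees(math.acos(max(-1.0, min(1.0, c))))

def link_observations(observations, max_gap_s=30.0,
                      max_angular_velocity=5.0):
    """Greedily chain observations into tracklets.

    Each observation is a dict with 'id', 'timestamp', 'ra', 'dec'.
    A new tracklet starts whenever the time gap exceeds max_gap_s or
    the implied rate exceeds max_angular_velocity (deg/s, LEO bound).
    """
    obs = sorted(observations, key=lambda o: o['timestamp'])  # step 1
    tracklets, current = [], []
    for o in obs:
        if current:
            prev = current[-1]
            dt = o['timestamp'] - prev['timestamp']
            sep = angular_sep_deg(prev['ra'], prev['dec'], o['ra'], o['dec'])
            if dt > max_gap_s or (dt > 0 and sep / dt > max_angular_velocity):
                tracklets.append([p['id'] for p in current])
                current = []
        current.append(o)
    if current:
        tracklets.append([p['id'] for p in current])
    return tracklets
```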
Location: /central/triangulator.py¶
Purpose: Calculate 3D position from multiple observations.
Implementation Requirements:
"""
Multi-site triangulation for altitude determination.
"""
import numpy as np
from astropy.coordinates import EarthLocation, SkyCoord, AltAz
from astropy.time import Time
import astropy.units as u
from typing import List, Dict
class Triangulator:
def __init__(self):
"""Initialize triangulator."""
pass
def triangulate(
self,
observations: List[Dict],
method: str = 'least_squares'
) -> Dict:
"""
Calculate 3D position from multiple observations.
Args:
observations: List of {
'site': EarthLocation,
'time': Time,
'ra': float (degrees),
'dec': float (degrees)
}
method: 'least_squares' or 'geometric'
Returns:
{
'position_eci': (x, y, z) in km,
'altitude': float in km,
'range_to_sites': [r1, r2, r3],
'uncertainty': float in km,
'method': str
}
"""
# IMPLEMENT THIS
# Convert sites to ECI at each observation time
# Form line-of-sight vectors
# Find closest approach point
# Calculate altitude
# Estimate uncertainty from site geometry
pass
def geometric_triangulation(
self,
sites_ecef: List[np.ndarray],
los_vectors: List[np.ndarray]
) -> np.ndarray:
"""
Geometric intersection of multiple lines of sight.
Finds point minimizing distance to all lines.
"""
# IMPLEMENT THIS
pass
def estimate_uncertainty(
self,
position: np.ndarray,
sites_ecef: List[np.ndarray],
timing_uncertainties: List[float]
) -> float:
"""
Estimate position uncertainty from geometry and timing.
Args:
position: ECI position (km)
sites_ecef: Site positions in ECEF
timing_uncertainties: Timing errors in seconds
Returns:
Uncertainty in km
"""
# IMPLEMENT THIS
# Geometric dilution of precision
# Timing contribution
# Combine
pass
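The "find closest approach point" step has a standard closed form: the least-squares point nearest a set of lines. A NumPy sketch of `geometric_triangulation` (line origins would be site positions in km, directions the unit line-of-sight vectors):

```python
import numpy as np

def closest_point_to_lines(sites, directions):
    """Least-squares point nearest the lines p = s_i + t * d_i.

    Minimizes sum_i ||(I - d_i d_i^T)(p - s_i)||^2 by solving the
    normal equations [sum_i (I - d_i d_i^T)] p = sum_i (I - d_i d_i^T) s_i.
    Requires at least two non-parallel lines.
    """
    A = np.zeros((3, 3))
    b = np.zeros(3)
    for s, d in zip(np.asarray(sites, float), np.asarray(directions, float)):
        d = d / np.linalg.norm(d)
        P = np.eye(3) - np.outer(d, d)  # projector onto plane normal to d
        A += P
        b += P @ s
    return np.linalg.solve(A, b)
```

With parallel lines of sight A becomes singular, which is exactly the degenerate geometry the uncertainty estimate should flag.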
Location: /central/orbit_solver.py¶
Purpose: Determine orbits from observations.
Implementation Requirements:
"""
Orbit determination from observations.
"""
import numpy as np
from typing import List, Dict, Optional
class OrbitSolver:
def __init__(self):
"""Initialize orbit solver."""
pass
def gauss_method(
self,
observations: List[Dict]
) -> Dict:
"""
Determine orbit from 3+ observations using Gauss method.
Args:
observations: List of {
'time': Time,
'ra': float,
'dec': float,
'site': EarthLocation
}
Returns:
{
'semi_major_axis': float (km),
'eccentricity': float,
'inclination': float (degrees),
'raan': float (degrees),
'arg_perigee': float (degrees),
'mean_anomaly': float (degrees),
'epoch': Time,
'covariance': np.ndarray
}
"""
# IMPLEMENT THIS
# Classic Gauss method
# Use the first 3 observations
# Solve for orbital elements
pass
def admissible_region(
self,
observations: List[Dict],
n_particles: int = 10000
) -> List[Dict]:
"""
Create particle swarm for short-arc observations.
For observations too short for Gauss method.
Args:
observations: Short arc (< 1 min typically)
n_particles: Number of virtual particles
Returns:
List of possible orbital states
"""
# IMPLEMENT THIS
# Extract angles and rates
# Define (range, range_rate) grid
# Apply constraints:
# - Energy < 0 (bound)
# - Perigee > 100 km
# Sample particles
# Return initial states
pass
def propagate_particles(
self,
particles: List[Dict],
time_delta: float # seconds
) -> List[Dict]:
"""
Propagate particle cloud forward in time.
Args:
particles: List of orbital states
time_delta: Seconds to propagate
Returns:
Propagated particles
"""
# IMPLEMENT THIS
# Use SGP4 for each particle
pass
def prune_particles(
self,
particles: List[Dict],
new_observation: Dict,
tolerance: float = 1.0 # degrees
) -> List[Dict]:
"""
Remove particles inconsistent with new observation.
Args:
particles: Current particle cloud
new_observation: {
'time': Time,
'ra': float,
'dec': float
}
tolerance: Angular separation threshold
Returns:
Pruned particle list
"""
# IMPLEMENT THIS
# Propagate all particles to new time
# Calculate angular separation from observation
# Remove particles outside tolerance
pass
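The pruning step can be sketched with the propagator injected as a callable standing in for SGP4, so only the angular-separation filter is shown:

```python
import math

def angular_sep_deg(ra1, dec1, ra2, dec2):
    """Great-circle separation in degrees."""
    r1, d1, r2, d2 = map(math.radians, (ra1, dec1, ra2, dec2))
    c = (math.sin(d1) * math.sin(d2)
         + math.cos(d1) * math.cos(d2) * math.cos(r1 - r2))
    return math.degrees(math.acos(max(-1.0, min(1.0, c))))

def prune_particles(particles, predict, observation, tolerance=1.0):
    """Keep particles whose predicted position matches the observation.

    predict(particle, t) stands in for SGP4 propagation and must return
    (ra_deg, dec_deg) at time t; injecting it keeps the filter testable
    without an orbit library.
    """
    kept = []
    for p in particles:
        ra, dec = predict(p, observation['time'])
        if angular_sep_deg(ra, dec, observation['ra'],
                           observation['dec']) <= tolerance:
            kept.append(p)
    return kept
```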
Location: /central/api.py¶
Purpose: REST API for SSA system.
Implementation Requirements:
"""
REST API for SSA system.
Endpoints:
POST /api/v1/streaks - Upload detections
GET /api/v1/objects - List tracked objects
GET /api/v1/objects/{id} - Get object details
POST /api/v1/campaigns - Create observation campaign
GET /api/v1/campaigns/{id} - Get campaign status
"""
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from models import Base, Observation, SpaceObject, SSANode, Campaign
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql:///ssa'
db = SQLAlchemy(app)
@app.route('/api/v1/streaks', methods=['POST'])
def upload_streaks():
"""
Upload detected streaks from edge node.
Request body:
{
'site_id': int,
'timestamp': float,
'streaks': [
{
'timestamp': float,
'x_center': float,
'y_center': float,
'angle': float,
'length': float,
'brightness': float,
'ra': float (if plate solved),
'dec': float (if plate solved)
},
...
]
}
Returns:
{
'status': 'ok',
'observations_created': int
}
"""
# IMPLEMENT THIS
# Validate input
# Associate with known objects
# Store in database
# Trigger downstream processing
pass
@app.route('/api/v1/objects', methods=['GET'])
def list_objects():
"""
List tracked space objects.
Query params:
- limit: int (default 50)
- offset: int (default 0)
- type: str (filter by object_type)
- min_magnitude: float
Returns:
{
'objects': [SpaceObject.to_dict(), ...],
'total': int,
'limit': int,
'offset': int
}
"""
# IMPLEMENT THIS
pass
@app.route('/api/v1/objects/<int:object_id>', methods=['GET'])
def get_object(object_id):
"""Get detailed object information."""
# IMPLEMENT THIS
# Include recent observations
# Include orbit elements
# Include characterization
pass
@app.route('/api/v1/campaigns', methods=['POST'])
def create_campaign():
"""
Create observation campaign.
Request body:
{
'name': str,
'description': str,
'target_object_id': int (optional),
'stage': int (1, 2, or 3),
'sites': [int], # site_ids to involve
'scheduled_start': str (ISO datetime),
'scheduled_end': str (ISO datetime)
}
"""
# IMPLEMENT THIS
# Validate
# Create campaign
# Schedule observations
pass
if __name__ == '__main__':
app.run(debug=True)
Component 3: Characterization¶
Location: /characterization/lightcurve.py¶
Purpose: Detect tumble periods from light curves.
Implementation Requirements:
"""
Light curve analysis for tumble detection.
"""
import numpy as np
from scipy.signal import lombscargle
from typing import List, Dict, Optional
class LightCurveAnalyzer:
def __init__(self, config: Dict = None):
"""
Initialize analyzer.
config keys:
- min_period: float - minimum period to search (seconds)
- max_period: float - maximum period to search (seconds)
- significance_threshold: float - for peak detection
"""
self.config = config or {}
def analyze(
self,
times: List[float],
magnitudes: List[float],
errors: List[float] = None
) -> Dict:
"""
Analyze light curve for temporal variations.
Args:
times: Observation times (seconds since epoch or JD)
magnitudes: Observed magnitudes
errors: Photometric errors (optional)
Returns:
{
'is_tumbling': bool,
'period': float or None (seconds),
'period_uncertainty': float or None,
'amplitude': float (mag),
'mean_magnitude': float,
'lightcurve': {
'times': List[float],
'magnitudes': List[float],
'model': List[float] (if period found)
},
'periodogram': {
'frequencies': List[float],
'power': List[float]
}
}
"""
# IMPLEMENT THIS
# Remove outliers
# Normalize magnitudes
# Compute periodogram
# Find significant peaks
# Fit sinusoidal model if period found
pass
def lomb_scargle_periodogram(
self,
times: np.ndarray,
magnitudes: np.ndarray,
min_freq: float,
max_freq: float,
oversample: int = 5
) -> Dict:
"""
Compute Lomb-Scargle periodogram.
Returns:
{
'frequencies': np.ndarray,
'power': np.ndarray,
'peak_freq': float,
'peak_power': float,
'significance': float (sigma)
}
"""
# IMPLEMENT THIS
pass
def fit_model(
self,
times: np.ndarray,
magnitudes: np.ndarray,
period: float
) -> np.ndarray:
"""
Fit sinusoidal model to phased light curve.
Returns:
Model magnitudes at given times
"""
# IMPLEMENT THIS
pass
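One possible shape for `lomb_scargle_periodogram()`, using the `scipy.signal.lombscargle` already imported above. Note that scipy expects *angular* frequencies, a common stumbling block; building the grid from a period range (rather than min/max frequency as in the signature above) is a simplification for this sketch:

```python
import numpy as np
from scipy.signal import lombscargle

def periodogram(times, magnitudes, min_period, max_period, oversample=5):
    """Lomb-Scargle periodogram over a period search range.

    Returns the frequency grid (rad/s), the power at each frequency,
    and the period of the strongest peak.
    """
    times = np.asarray(times, float)
    mags = np.asarray(magnitudes, float)
    mags = mags - mags.mean()  # remove the mean magnitude
    span = times.max() - times.min()
    n = int(oversample * span / min_period)  # denser grid for short periods
    freqs = 2 * np.pi / np.linspace(max_period, min_period, n)
    power = lombscargle(times, mags, freqs)
    peak = freqs[np.argmax(power)]
    return {'frequencies': freqs, 'power': power,
            'best_period': 2 * np.pi / peak}
```

A real implementation would also estimate peak significance (e.g. against a bootstrap noise floor) before declaring a tumble period.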
Location: /characterization/material_classifier.py¶
Purpose: Classify material from photometric colors.
Implementation Requirements:
"""
Material classification from photometric colors.
"""
from typing import Dict, List
class MaterialClassifier:
# Known material signatures (from SSA.md)
SIGNATURES = {
'S-type': {
'B-V': (0.5, 1.0),
'V-R': (0.4, 0.8),
'R-I': (0.3, 0.6),
'features': ['950nm_absorption'],
'description': 'Stony/metallic, silicates'
},
'C-type': {
'B-V': (0.3, 0.5),
'V-R': (0.2, 0.4),
'R-I': (0.1, 0.3),
'features': ['flat_spectrum'],
'description': 'Carbonaceous, dark'
},
'M-type': {
'B-V': (0.4, 0.6),
'V-R': (0.3, 0.5),
'R-I': (0.2, 0.4),
'features': ['red_slope', 'no_absorption'],
'description': 'Metallic'
},
'solar_panel': {
'B-V': (0.0, 0.2),
'V-R': (-0.1, 0.1),
'R-I': (-0.2, 0.0),
'features': ['blue', 'high_albedo'],
'description': 'Solar panel, active satellite'
},
'paint_debris': {
'B-V': (-0.1, 0.3),
'V-R': (-0.1, 0.2),
'R-I': (-0.1, 0.2),
'features': ['variable', 'high_albedo'],
'description': 'Paint flakes, highly reflective'
}
}
def __init__(self):
pass
def classify(
self,
color_indices: Dict[str, float]
) -> Dict:
"""
Classify material from photometric colors.
Args:
color_indices: {
'B-V': float or None,
'V-R': float or None,
'R-I': float or None
}
Returns:
{
'classification': str,
'confidence': float,
'possible_types': [ # ranked by likelihood
{'type': str, 'score': float},
...
],
'notes': str
}
"""
# IMPLEMENT THIS
# Score each material type
# Rank by match score
# Return top classification
pass
def score_match(
self,
color_indices: Dict[str, float],
signature: Dict
) -> float:
"""
Calculate match score between colors and signature.
Returns:
Score between 0.0 and 1.0
"""
# IMPLEMENT THIS
pass
def estimate_size(
self,
magnitude: float,
distance_km: float,
albedo: float = 0.1
) -> float:
"""
Estimate physical size from magnitude.
Args:
magnitude: Apparent magnitude
distance_km: Distance to object in km
albedo: Reflectivity (0.1 typical for debris)
Returns:
Estimated diameter in meters
"""
# IMPLEMENT THIS
# Use: diameter ≈ 2 * distance * 10^((m_sun - magnitude)/5) / sqrt(albedo)
# where m_sun = -26.74 (apparent magnitude of the Sun); the exponent
# must be negative for faint objects, so fainter means smaller
pass
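Minimal sketches of `score_match()` and `estimate_size()`, assuming interval-membership scoring against the `SIGNATURES` table (a two-entry subset is inlined here for self-containment) and the diffuse-reflector magnitude relation with the sign arranged so fainter objects come out smaller:

```python
import math

# Subset of the SIGNATURES table above, color ranges only
SIGNATURES = {
    'C-type':      {'B-V': (0.3, 0.5), 'V-R': (0.2, 0.4), 'R-I': (0.1, 0.3)},
    'solar_panel': {'B-V': (0.0, 0.2), 'V-R': (-0.1, 0.1), 'R-I': (-0.2, 0.0)},
}

def score_match(color_indices, signature):
    """Fraction of measured color indices that fall inside the
    signature's range; None (unmeasured) indices are ignored."""
    hits, total = 0, 0
    for band, (lo, hi) in signature.items():
        value = color_indices.get(band)
        if value is None:
            continue
        total += 1
        if lo <= value <= hi:
            hits += 1
    return hits / total if total else 0.0

def estimate_size(magnitude, distance_km, albedo=0.1, m_sun=-26.74):
    """Rough diameter (m) of a diffusely reflecting object: received
    flux scales with albedo * (radius/distance)^2, so
    diameter ~ 2 * d * 10^((m_sun - m)/5) / sqrt(albedo)."""
    d_m = distance_km * 1000.0
    return 2.0 * d_m * 10 ** ((m_sun - magnitude) / 5.0) / math.sqrt(albedo)
```

For example, a magnitude-10 object at 500 km with albedo 0.1 comes out around 14 cm, a plausible debris size.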
Testing Requirements¶
Unit Tests¶
Each module must have corresponding test file:
tests/
├── test_edge/
│ ├── test_streak_detector.py
│ ├── test_plate_solver.py
│ ├── test_timing_sync.py
│ └── test_uploader.py
├── test_central/
│ ├── test_catalog.py
│ ├── test_track_associator.py
│ ├── test_triangulator.py
│ ├── test_orbit_solver.py
│ └── test_api.py
└── test_characterization/
├── test_lightcurve.py
└── test_material_classifier.py
Integration Tests¶
# tests/test_integration.py
def test_end_to_end_detection():
"""
Full pipeline test from camera to orbit.
"""
# Generate synthetic video with ISS streak
# Run edge detection
# Upload to central
# Correlate with catalog
# Verify correct identification
def test_triangulation_accuracy():
"""
Test triangulation with known object.
"""
# Use ISS TLE
# Simulate observations from 3 sites
# Run triangulation
# Compare calculated altitude vs known
# Verify error < 50 km
def test_tumble_detection():
"""
Test tumble detection with synthetic light curve.
"""
# Generate sinusoidal light curve
# Run analysis
# Verify period detection
# Assert period within 5% of true
Deployment Configuration¶
Edge Node Configuration¶
# edge_config.yaml
detection:
threshold: 30
min_line_length: 50
max_line_gap: 10
fps: 25
plate_solving:
method: "astrometry_net" # or "astropy"
astrometry_api_key: "YOUR_KEY"
fov_estimate: 50 # degrees
camera:
device: 0 # /dev/video0
resolution: [1920, 1080]
exposure: 0.001 # seconds
gain: 10
timing:
ntp_servers:
- "pool.ntp.org"
- "time.nist.gov"
sync_interval: 300 # seconds
upload:
server_url: "https://api.openastro.net"
api_key: "YOUR_API_KEY"
batch_size: 10
retry_attempts: 3
logging:
level: "INFO"
file: "/var/log/ssa/edge.log"
Central Server Configuration¶
# central_config.yaml
database:
host: "localhost"
port: 5432
database: "ssa"
user: "ssa_user"
password: "YOUR_PASSWORD"
redis:
host: "localhost"
port: 6379
space_track:
username: "YOUR_USERNAME"
password: "YOUR_PASSWORD"
cache_ttl: 86400 # 24 hours
triangulation:
min_sites: 2
max_timing_error: 0.1 # seconds
orbit_determination:
min_observations: 3
max_arc_length: 86400 # seconds
particle_count: 10000
api:
host: "0.0.0.0"
port: 5000
debug: false
Performance Requirements¶
| Component | Requirement | Target |
|---|---|---|
| Streak detection | Process 25 fps | Real-time on RPi 4 |
| Plate solving | < 10 seconds per frame | < 5 seconds |
| Catalog query | < 100 ms | < 50 ms |
| Triangulation | < 1 second | < 500 ms |
| Orbit determination (Gauss) | < 5 seconds | < 2 seconds |
| Light curve analysis | < 10 seconds | < 5 seconds |
Acceptance Criteria¶
Each component is complete when:
- ✅ All unit tests pass
- ✅ Integration tests pass
- ✅ Code follows style guide (PEP 8)
- ✅ Docstrings complete for all public functions
- ✅ Type hints on all function signatures
- ✅ No critical security issues
- ✅ Memory usage < 500 MB for edge node
- ✅ Performance meets requirements
Next Steps for Implementing Model¶
- Implement each module in order:
  1. /edge/streak_detector.py ⬅️ Start here
  2. /edge/plate_solver.py
  3. /edge/timing_sync.py
  4. /edge/uploader.py
  5. /edge/main.py
  6. /central/models.py
  7. /central/catalog.py
  8. /central/track_associator.py
  9. /central/triangulator.py
  10. /central/orbit_solver.py
  11. /central/api.py
  12. /characterization/lightcurve.py
  13. /characterization/material_classifier.py
- Write tests alongside implementation
- Deploy edge node first, then central
- Validate with ISS observations
Questions?¶
Ask clarifying questions before implementation. It's better to clarify early than refactor later.