Source code for empyrean.od.result

"""Orbit determination result types and configuration.

This module mirrors the Rust wrapper's ``empyrean::ODConfig`` and
``empyrean::DetermineResult`` field-for-field, with the same nested
structure (no flattening). ``ODConfig()`` defaults are identical to
``ODConfig::default()`` on the Rust side so no surprises round-
tripping through the C ABI.
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import TypeAlias

import numpy as np

from empyrean.coordinates.enums import Frame, Origin
from empyrean.od.residuals import (
    AcceptabilityReport,
    ObservationResults,
    ResidualSummary,
    StationBiases,
)
from empyrean.orbits.orbits import (
    CartesianOrbits,
    CometaryOrbits,
    KeplerianOrbits,
    SphericalOrbits,
)

# JSON-like value type for the nested wire dicts marshaled across the
# C ABI boundary (str / numeric / bool / None leaves, plus nested dicts
# and lists of the same).
WireValue: TypeAlias = str | int | float | bool | None | list["WireValue"] | dict[str, "WireValue"]


# ── Enums ────────────────────────────────────────────────────


class ForceModelTier(str, Enum):
    """Force-model tier for OD propagation."""

    APPROXIMATE = "approximate"
    BASIC = "basic"
    STANDARD = "standard"


[docs] class SolveForParams(str, Enum): """Parameters to solve for in differential correction. Mirrors ``scott::od::SolveForParams``. """ STATE_ONLY = "state_only" """Solve only for the 6-element state vector.""" STATE_AND_NONGRAV = "state_and_nongrav" """Solve for state + (A1, A2, A3) non-grav coefficients (9 params).""" AUTO = "auto" """Start with state-only, escalate to 9-param if reduced χ² or AT/CT ratio crosses thresholds set by :class:`AutoEscalationPolicy`."""
[docs] class CovarianceRepresentation(str, Enum): """Coordinate basis the OD output covariance is reported in.""" CARTESIAN = "cartesian" KEPLERIAN = "keplerian" COMETARY = "cometary" SPHERICAL = "spherical"
# ── Output epoch (tagged-union dataclass) ────────────────────
[docs] class OutputEpochMode(str, Enum): """How :class:`OutputEpoch` selects the fitted-orbit epoch. Mirrors the discriminant on ``scott::od::OutputEpoch``. """ MID_ARC = "mid_arc" """Midpoint of the observation arc (default). Resolved against the active observation set (not the full input arc) so multi-year arcs whose mid-arc target lies in a chaotic interval keep the integrator anchor inside the IOD opposition window.""" LAST_OBSERVATION = "last_observation" """Epoch of the last observation, resolved against the active set.""" IOD_EPOCH = "iod_epoch" """Anchor at the IOD-derived epoch — the state stays where the initial-orbit determination produced it. Matches OrbFit's ``epoch.eq0`` and find_orb's "anchor at most recent good fit" pattern.""" EXPLICIT = "explicit" """:attr:`OutputEpoch.mjd_tdb` is honored."""
[docs] @dataclass class OutputEpoch: """Output epoch for the fitted orbit. Mirrors ``scott::od::OutputEpoch``. """ mode: OutputEpochMode = OutputEpochMode.MID_ARC mjd_tdb: float | None = None """Required when ``mode == OutputEpochMode.EXPLICIT``."""
# ── Origin policy (tagged-union dataclass) ─────────────────── class OriginPolicyMode(str, Enum): """How :class:`OriginPolicy` selects the central body for IOD + DC. Mirrors the discriminant on ``scott::od::OriginPolicy``. """ AUTO = "auto" """Try heliocentric IOD + DC first. On hard error, on any ``outside_arc`` rejection tag, or when ``reduced_chi2 > 100``, cascade to body-centric Earth and return whichever produced the lower reduced χ². Default.""" EXPLICIT = "explicit" """Pin IOD + DC to a specific central body — set :attr:`OriginPolicy.origin` to the desired :class:`Origin`. Skips the cascade.""" @dataclass class OriginPolicy: """Origin-policy selector for the OD pipeline. Mirrors ``scott::od::OriginPolicy``. Auto handles TCOs / minimoons / geocentric impactors / chaotic- capture interiors without per-object regime classification by the caller. Explicit is required for cataloged satellites where heliocentric Gauss is unphysical, and recommended for pipelines that already know the regime. """ mode: OriginPolicyMode = OriginPolicyMode.AUTO origin: "Origin | str | None" = None """The central body to pin to. Required when ``mode == OriginPolicyMode.EXPLICIT``. Pass an :class:`Origin` instance or a canonical name string.""" # ── Nested config bundles ────────────────────────────────────
[docs] @dataclass class IODConfig: """IOD ranging tuning. Mirrors the IOD section of ``scott::od::ODConfig``. Defaults match ``ODConfig::default()``.""" max_triplet_attempts: int = 10 max_triplet_span_days: float = 30.0 opposition_gap_days: float = 90.0 """Set to a negative value to disable opposition splitting.""" max_iod_arc_days: float = 30.0 """Maximum arc length (days) used for IOD. Densest-sub-window selection on the highest-scoring opposition before IOD runs; DC inherits the same subset as group 0 and expands outward (find_orb / OrbFit progressive-arc pattern).""" curvature_snr_threshold: float = 3.0 max_iod_fractional_sigma_a: float = 1.0
[docs] @dataclass class AutoEscalationPolicy: """Trigger thresholds for :attr:`SolveForParams.AUTO` escalation. Mirrors ``scott::od::AutoEscalationPolicy``.""" reduced_chi2: float = 10.0 at_ct_ratio: float = 3.0 min_arc_days: float = 30.0 min_n_obs: int = 50
[docs] @dataclass class AcceptabilityThresholds: """Thresholds for the post-DC fit-acceptability sub-checks. Mirrors ``scott::od::AcceptabilityThresholds``. Defaults are tuned for production NEO survey work; tighten for Sentry-grade impact-monitoring orbits (e.g. ``fractional_sigma_a = 1e-4``), loosen for short-arc discovery fits. """ reduced_chi2: float = 3.0 rms_arcsec: float = 1.0 at_ct_ratio: float = 3.0 min_arc_days: float = 7.0 fractional_sigma_a: float = 0.1
# ── Weighting (mirrors empyrean::WeightingConfig) ─────────────
[docs] class WeightingPreset(str, Enum): """Preset selector for :class:`WeightingConfig`. Picking a preset seeds the layer chain with scott's curated layers; entries in :attr:`WeightingConfig.additional_layers` are appended in order. """ NONE = "none" """No preset — only ``additional_layers`` apply.""" VFC17 = "vfc17" """Vereš, Farnocchia, Chesley et al. 2017 station floors + nightly de-weighting at floor-σ policy. Production default.""" NEODYS = "neodys" """NEODyS production preset."""
[docs] class SigmaPolicy(str, Enum): """How a weighting layer's σ combines with the per-observation reported σ. Mirrors ``scott::weighting::SigmaPolicy``.""" DEFAULT_ONLY = "default_only" """``σ = reported`` if present, else ``σ = rule``. Default.""" FLOOR = "floor" """``σ = max(reported, rule)``. VFC17 / NEODyS production policy."""
[docs] class WeightingLayerKind(str, Enum): """Discriminator for :class:`WeightingLayer` variants.""" OBSERVATORY_RULE = "observatory_rule" NIGHTLY_DEWEIGHTING = "nightly_deweighting"
[docs] @dataclass class WeightingLayer: """One element of the weighting pipeline. Tagged-union shape — the active fields depend on :attr:`kind`. Mirrors ``scott::weighting::WeightingLayer``. """ kind: WeightingLayerKind = WeightingLayerKind.OBSERVATORY_RULE # ── ObservatoryRule fields ───────────────────────────────── obs_code: str = "" """MPC observatory code (e.g. ``"F51"``).""" sigma: tuple[float, float] = (1.0, 1.0) """1σ (RA·cos(δ), Dec) in arcseconds.""" start_epoch_mjd_tdb: float | None = None """Start of applicable time range (MJD TDB). ``None`` = unbounded.""" end_epoch_mjd_tdb: float | None = None """End of applicable time range (MJD TDB). ``None`` = unbounded.""" scale: float = 1.0 """Scale factor on the final weight.""" # ── NightlyDeweighting fields ────────────────────────────── max_gap_days: float = 0.5 """Max gap (days) between observations to count as the same night (NightlyDeweighting only)."""
def _default_weighting_layers() -> list["WeightingLayer"]: # Mirrors `scott::od::ODConfig::default()` — the production preset is # VFC17 station floors WITH NightlyDeweighting (1/√N within 0.5 days) # appended. Without the nightly layer, OD on objects with clustered # same-station-same-night observations diverges from `validate-core`'s # direct-scott path because the rejection layer treats high-weight # cluster residuals as outliers. return [ WeightingLayer( kind=WeightingLayerKind.NIGHTLY_DEWEIGHTING, max_gap_days=0.5, ) ]
[docs] @dataclass class WeightingConfig: """Observation weighting pipeline. Mirrors ``empyrean::WeightingConfig``. Default = enabled with the VFC17 preset + a NightlyDeweighting layer (production hot path; matches ``scott::od::ODConfig::default()``). Set ``enabled=False`` for uniform 1″ weighting; pick a different preset or replace ``additional_layers`` for custom pipelines. """ enabled: bool = True preset: WeightingPreset = WeightingPreset.VFC17 default_sigma_arcsec: float = 1.0 """Default 1σ when no rule applies (arcsec). Used only when ``preset = NONE``.""" sigma_policy: SigmaPolicy | None = None """Sigma combination policy override. ``None`` = use the preset's policy.""" additional_layers: list[WeightingLayer] = field(default_factory=_default_weighting_layers) """Layers appended to the preset's chain."""
# ── Debiasing (mirrors empyrean::DebiasingConfig) ─────────────
[docs] class DebiasingResolution(str, Enum): """Healpix resolution of a debiasing table.""" STANDARD = "standard" """NSIDE = 64, ~35 MB. Production default.""" HIRES = "hires" """NSIDE = 256, ~567 MB."""
[docs] @dataclass class DebiasingConfig: """Catalog-bias-correction configuration. Mirrors scott's ``Option<Arc<DebiasingTable>>`` field on ``ODConfig``. Default = enabled at standard resolution with no explicit path (uses the DataManager default lookup at ``~/.empyrean/data/bias.dat``). Set ``enabled=False`` to disable catalog debiasing entirely. """ enabled: bool = True resolution: DebiasingResolution = DebiasingResolution.STANDARD bias_dat_path: str | None = None
[docs] class RejectionKind(str, Enum): """Outlier-rejection strategy selector. Pick the variant that matches the reference pipeline you're interoperating with — `ADAPTIVE` is the production default (information-loss-weighted, Layer 3); `CMC2003` matches the OrbFit / NEODyS χ²-with-hysteresis scheme of Carpino, Milani & Chesley (2003). """ ADAPTIVE = "adaptive" CMC2003 = "cmc2003"
[docs] @dataclass class RejectionConfig: """Outlier-rejection configuration. The active fields are determined by :attr:`kind`. Mirrors ``scott::rejection::RejectionStrategy`` plus the upstream ``max_rejection_passes`` knob. Set ``enabled=False`` to disable the rejection pass entirely. """ enabled: bool = True kind: RejectionKind = RejectionKind.ADAPTIVE """Strategy selector. Default :attr:`RejectionKind.ADAPTIVE`.""" # ── Adaptive (kind = ADAPTIVE) ────────────────────────── chi2_base: float = 9.21 """χ²(2 dof, p = 0.01) — Carpino, Milani & Chesley 2003. Adaptive rejection only.""" lambda_: float = 1.0 """Adaptation strength. ``0`` reduces to standard χ² rejection; higher values protect informative observations more. Adaptive rejection only.""" max_threshold: float = 100.0 """Effective-threshold cap for adaptive rejection.""" # ── CMC2003 (kind = CMC2003) ──────────────────────────── chi2_rej: float = 8.0 """χ²-with-hysteresis upper threshold — reject when χ² > chi2_rej. CMC2003 only. Default 8.0 (≈ 98.2% confidence at 2 DOF).""" chi2_rec: float = 7.0 """χ²-with-hysteresis lower threshold — recover a previously- rejected observation when χ² < chi2_rec. CMC2003 only. Must satisfy ``chi2_rec < chi2_rej`` for hysteresis to break cycles. Default 7.0 (≈ 96.9% confidence at 2 DOF).""" # ── Both ──────────────────────────────────────────────── max_passes: int = 4
[docs] @dataclass class StationRaDecConfig: """Per-station RA/Dec bias-fit configuration. Schur-eliminated nuisance parameters that absorb per-station pointing offsets, fit alongside the orbit. Default thresholds target modern survey arcs. Attributes ---------- sigma_prior_arcsec : float 1-σ Gaussian prior on the per-station offset, in arcseconds. Default 0.3. min_obs_per_station : int Minimum observations per station required to allocate a bias parameter for that station. Default 5. """ sigma_prior_arcsec: float = 0.3 min_obs_per_station: int = 5
# ── Top-level config ─────────────────────────────────────────
[docs] @dataclass class ODConfig: """Unified orbit-determination configuration. Sensible production defaults out of the box: - VFC17 station weighting + nightly de-weighting (:attr:`WeightingConfig.preset`) - EFCC2020 catalog debiasing enabled (:attr:`DebiasingConfig.enabled`) - :attr:`SolveForParams.AUTO` (escalates 6→9 parameters on poor fit) - Adaptive outlier rejection enabled, ``max_passes = 4`` """ # ── Shared (all OD entry points) ──────────────────────── force_model: ForceModelTier = ForceModelTier.STANDARD epsilon: float = 1e-9 """Adaptive integrator truncation-error tolerance.""" max_light_time_iterations: int = 3 num_threads: int = 0 """``0`` = use all available cores.""" frame: Frame = Frame.ICRF weighting: "WeightingConfig" = field(default_factory=lambda: WeightingConfig()) """Observation weighting pipeline. Default = enabled + VFC17 preset. See :class:`WeightingConfig` for full layered control.""" debiasing: "DebiasingConfig" = field(default_factory=lambda: DebiasingConfig()) """Catalog-bias-correction configuration. Default = EFCC2020 standard resolution loaded from the engine's default data location. See :class:`DebiasingConfig`.""" excluded_perturbers: list[Origin | str] = field(default_factory=list) """Bodies to omit from the perturber set. Pass :class:`Origin` instances (or canonical names). Useful when fitting an asteroid that the force model would otherwise include as a perturber — e.g. fitting Eros while excluding ``Origin.asteroid(433)``.""" origin: OriginPolicy = field(default_factory=OriginPolicy) """Origin-policy selector. Default :attr:`OriginPolicyMode.AUTO` (heliocentric → geocentric Earth cascade). Set ``origin=OriginPolicy(mode=OriginPolicyMode.EXPLICIT, origin=Origin.EARTH)`` to pin the pipeline to a specific central body for catalog satellites or regime-classified workflows.""" # ── IOD (determine only) ──────────────────────────────── iod: IODConfig = field(default_factory=IODConfig) # ── Differential correction ───────────────────────────── output_epoch: OutputEpoch = field(default_factory=OutputEpoch) max_iterations: int = 100 convergence_tol: float = 1e-5 use_stm_cache: bool = True solve_for: SolveForParams = SolveForParams.AUTO auto_escalation: AutoEscalationPolicy = field(default_factory=AutoEscalationPolicy) acceptability: AcceptabilityThresholds = field(default_factory=AcceptabilityThresholds) fit_station_biases: bool = False """Enable Schur-eliminated per-station RA/Dec bias fitting.""" station_radec: StationRaDecConfig = field(default_factory=StationRaDecConfig) use_span_grouping: bool = False # ── Rejection ────────────────────────────────────────── rejection: RejectionConfig = field(default_factory=RejectionConfig) auto_force_model: bool = False """Auto-select force-model tier from IOD orbital elements.""" output_representation: CovarianceRepresentation = CovarianceRepresentation.CARTESIAN def _to_wire_dict(self) -> dict[str, WireValue]: """Serialize to the nested dict shape the binding consumes. Internal — called by :func:`empyrean.determine` / :func:`empyrean.evaluate` / :func:`empyrean.refine` to marshal the config across the FFI boundary. For user-facing serialization (saving config to JSON, displaying it in a notebook, etc.), use :func:`dataclasses.asdict`. """ return { "force_model": _enum_value(self.force_model), "epsilon": self.epsilon, "max_light_time_iterations": self.max_light_time_iterations, "num_threads": self.num_threads, "frame": _enum_value(self.frame), "weighting": _weighting_to_dict(self.weighting), "debiasing": _debiasing_to_dict(self.debiasing), "excluded_perturbers_naif": [_origin_to_naif(o) for o in self.excluded_perturbers], "origin": { "mode": _enum_value(self.origin.mode), "naif_id": ( _origin_to_naif(self.origin.origin) if self.origin.origin is not None else None ), }, "iod": { "max_triplet_attempts": self.iod.max_triplet_attempts, "max_triplet_span_days": self.iod.max_triplet_span_days, "opposition_gap_days": self.iod.opposition_gap_days, "max_iod_arc_days": self.iod.max_iod_arc_days, "curvature_snr_threshold": self.iod.curvature_snr_threshold, "max_iod_fractional_sigma_a": self.iod.max_iod_fractional_sigma_a, }, "output_epoch": { "mode": self.output_epoch.mode, "mjd_tdb": self.output_epoch.mjd_tdb, }, "max_iterations": self.max_iterations, "convergence_tol": self.convergence_tol, "use_stm_cache": self.use_stm_cache, "solve_for": _enum_value(self.solve_for), "auto_escalation": { "reduced_chi2": self.auto_escalation.reduced_chi2, "at_ct_ratio": self.auto_escalation.at_ct_ratio, "min_arc_days": self.auto_escalation.min_arc_days, "min_n_obs": self.auto_escalation.min_n_obs, }, "acceptability": { "reduced_chi2": self.acceptability.reduced_chi2, "rms_arcsec": self.acceptability.rms_arcsec, "at_ct_ratio": self.acceptability.at_ct_ratio, "min_arc_days": self.acceptability.min_arc_days, "fractional_sigma_a": self.acceptability.fractional_sigma_a, }, "fit_station_biases": self.fit_station_biases, "station_radec": { "sigma_prior_arcsec": self.station_radec.sigma_prior_arcsec, "min_obs_per_station": self.station_radec.min_obs_per_station, }, "use_span_grouping": self.use_span_grouping, "rejection": { "enabled": self.rejection.enabled, "kind": _enum_value(self.rejection.kind), "chi2_base": self.rejection.chi2_base, # Python alias `lambda_` keeps the keyword from # collising with the language; wire format uses bare # `lambda` so it round-trips through Rust unchanged. "lambda": self.rejection.lambda_, "max_threshold": self.rejection.max_threshold, "chi2_rej": self.rejection.chi2_rej, "chi2_rec": self.rejection.chi2_rec, "max_passes": self.rejection.max_passes, }, "auto_force_model": self.auto_force_model, "output_representation": _enum_value(self.output_representation), }
def _weighting_to_dict(w: WeightingConfig) -> dict[str, WireValue]: """Serialize a :class:`WeightingConfig` to the wire dict the PyO3 bridge expects.""" return { "enabled": w.enabled, "preset": _enum_value(w.preset), "default_sigma_arcsec": w.default_sigma_arcsec, "sigma_policy": _enum_value(w.sigma_policy) if w.sigma_policy is not None else None, "additional_layers": [_weighting_layer_to_dict(layer) for layer in w.additional_layers], } def _weighting_layer_to_dict(layer: WeightingLayer) -> dict[str, WireValue]: return { "kind": _enum_value(layer.kind), "obs_code": layer.obs_code, "sigma": list(layer.sigma), "start_epoch_mjd_tdb": layer.start_epoch_mjd_tdb, "end_epoch_mjd_tdb": layer.end_epoch_mjd_tdb, "scale": layer.scale, "max_gap_days": layer.max_gap_days, } def _debiasing_to_dict(d: DebiasingConfig) -> dict[str, WireValue]: return { "enabled": d.enabled, "resolution": _enum_value(d.resolution), "bias_dat_path": d.bias_dat_path, } def _enum_value(v: Enum | str) -> str: """Accept either an Enum or a bare string; return a string.""" return str(v.value) if isinstance(v, Enum) else str(v) def _origin_to_naif(o: Origin | str) -> int: """Internal — resolve an :class:`Origin` (or canonical name) to the integer body code the binding wire format uses.""" from empyrean._convert import origin_to_naif return origin_to_naif(o) # ── Result types ───────────────────────────────────────────── # Re-export StationBiases at the result module so callers can import # it alongside the other OD types. __all__ = [] # Any of the four orbit flavors that can come back from a determine / # refine, depending on `ODConfig.output_representation`. OrbitsTable = CartesianOrbits | KeplerianOrbits | CometaryOrbits | SphericalOrbits
[docs] @dataclass class EvaluateResult: """Result of orbit evaluation (residuals only, no fitting).""" observations: ObservationResults summary: ResidualSummary
[docs] @dataclass class DetermineResult: """Result of orbit determination — returned by both :func:`~empyrean.od.determine.determine` (full IOD + DC pipeline) and :func:`~empyrean.od.determine.refine` (Bayesian-prior fit against an existing orbit + covariance). Mirrors the Rust wrapper's ``empyrean::DetermineResult``. """ orbit: OrbitsTable """Fitted orbit. Coordinate flavor matches :attr:`ODConfig.output_representation`.""" observations: ObservationResults """Per-observation residuals + rejection / influence diagnostics.""" summary: ResidualSummary iterations: int update_norm: float converged: bool covariance: np.ndarray """Fitted 6×6 state covariance, in :attr:`covariance_representation`.""" covariance_representation: CovarianceRepresentation covariance_9x9: np.ndarray | None """Full 9×9 covariance over (state, A1, A2, A3) when solving for non-grav.""" non_grav_delta: np.ndarray | None """Cumulative non-grav corrections (ΔA1, ΔA2, ΔA3) when present.""" rejection_passes: int num_oppositions_fit: int force_model_used: ForceModelTier solve_for_used: SolveForParams acceptability: AcceptabilityReport station_biases: StationBiases """Per-station fitted nuisance biases when :attr:`ODConfig.fit_station_biases` was active. Empty quivr table otherwise."""