Source code for empyrean.propagation.result

"""Propagation result type.

:class:`PropagationResult` bundles the propagated states, dynamical
events, and (when available) per-orbit sensitivity chains. The
:meth:`PropagationResult.to_dir` / :meth:`PropagationResult.from_dir`
helpers persist the whole result as a flat directory of Parquet files:
``states.parquet``, one file per non-empty event table, and, when
present, ``sensitivity.parquet`` and ``tagged_covariance.parquet``.
"""

import os
from dataclasses import dataclass
from typing import TypeVar

import quivr as qv

from empyrean.ephemeris.sensitivity import StateSensitivities
from empyrean.orbits.orbits import CartesianOrbits
from empyrean.propagation.events import (
    AtmosphericEntries,
    AtmosphericExits,
    CaptureEnds,
    CaptureStarts,
    CloseApproachEnds,
    CloseApproachStarts,
    CovarianceRegimeChanges,
    Events,
    EventSummary,
    Impacts,
    Periapses,
    PossibleImpacts,
    ShadowEntries,
    ShadowExits,
)
from empyrean.propagation.tagged_covariance import (
    TaggedCovariance,
    TaggedCovariances,
)

# Table name → quivr table class, one entry per event table.
#
# The keys are the ``Events`` field names and double as the
# ``{name}.parquet`` file stems that ``PropagationResult.to_dir`` writes
# and ``PropagationResult.from_dir`` reads.
#
# NOTE(review): as far as this module shows, ``to_dir`` / ``from_dir``
# spell these names out by hand rather than iterating this map, so the
# three listings must be kept in sync manually — confirm whether anything
# outside this module consumes the map.
_EVENT_TABLE_MAP: dict[str, type[qv.Table]] = {
    "summary": EventSummary,
    "close_approach_starts": CloseApproachStarts,
    "close_approach_ends": CloseApproachEnds,
    "periapses": Periapses,
    "impacts": Impacts,
    "possible_impacts": PossibleImpacts,
    "atmospheric_entries": AtmosphericEntries,
    "atmospheric_exits": AtmosphericExits,
    "capture_starts": CaptureStarts,
    "capture_ends": CaptureEnds,
    "shadow_entries": ShadowEntries,
    "shadow_exits": ShadowExits,
    "covariance_regime_changes": CovarianceRegimeChanges,
}

# Type variable bound to ``qv.Table``; lets ``_load_event_table`` return
# the same concrete table class it was handed.
_T = TypeVar("_T", bound=qv.Table)


def _load_event_table(table_cls: type[_T], path: str, name: str) -> _T:
    """Read the event table stored as ``{name}.parquet`` under ``path``.

    If no such file exists, an empty table of the same class is returned
    instead. The return type is tied to ``table_cls`` through the type
    variable, so callers get the concrete event-table subclass back
    rather than the erased :class:`quivr.Table` base.

    Parameters
    ----------
    table_cls : type[_T]
        Table class used both to read the file and to build the empty
        fallback.
    path : str
        Directory that holds the Parquet files.
    name : str
        File stem of the table (without the ``.parquet`` suffix).

    Returns
    -------
    _T
        The table read from disk, or ``table_cls.empty()`` when the file
        is absent.
    """
    parquet_file = os.path.join(path, f"{name}.parquet")
    if not os.path.exists(parquet_file):
        # Nothing on disk for this table: fall back to an empty one.
        return table_cls.empty()
    return table_cls.from_parquet(parquet_file)


@dataclass
class PropagationResult:
    """Result of orbit propagation.

    Attributes
    ----------
    states : CartesianOrbits
        Propagated Cartesian states with optional covariance.
    events : Events
        All detected dynamical events, grouped by type.
    sensitivity : StateSensitivities, optional
        Flat per-``(orbit, epoch)`` sensitivity table — STMs and optional
        STTs. ``None`` when the propagation method did not produce
        sensitivities (Monte Carlo / SigmaPoint).
    tagged_covariance : TaggedCovariances, optional
        Flat per-``(orbit, epoch)`` provenance-tagged, resolved-kind
        covariance readback — the honest covariance that distinguishes a
        second-order close-approach ellipsoid from the bare linear
        ``Φ Σ₀ Φᵀ`` mapping on ``states``. ``None`` unless
        :func:`~empyrean.propagate` was called with
        ``tagged_covariance=True``. Use :meth:`tagged_covariance_series`
        for the per-epoch view of one orbit.
    """

    states: CartesianOrbits
    events: Events
    sensitivity: StateSensitivities | None = None
    tagged_covariance: TaggedCovariances | None = None

    def tagged_covariance_series(self, orbit_index: int) -> list[TaggedCovariance]:
        """Per-epoch tagged covariance for one orbit, as dataclasses.

        Returns one
        :class:`~empyrean.propagation.tagged_covariance.TaggedCovariance`
        per output epoch of the orbit at ``orbit_index`` (orbit-major,
        matching the input order), with each matrix re-materialized as a
        contiguous ``(6, 6)`` array and the provenance enums decoded.

        Parameters
        ----------
        orbit_index : int
            Zero-based index into the input orbits (orbit-major output
            order).

        Returns
        -------
        list[TaggedCovariance]
            The orbit's tagged covariance at every output epoch.

        Raises
        ------
        ValueError
            If the result carries no tagged covariance (propagate was not
            called with ``tagged_covariance=True``), or ``orbit_index`` is
            out of range.
        """
        tagged = self.tagged_covariance
        if tagged is None:
            raise ValueError(
                "this result has no tagged covariance; call "
                "propagate(..., tagged_covariance=True) to populate it"
            )
        oids = tagged.orbit_ids_unique()
        # Negative indices are rejected rather than wrapped around.
        if not 0 <= orbit_index < len(oids):
            raise ValueError(
                f"orbit_index {orbit_index} out of range (result holds {len(oids)} orbits)"
            )
        return tagged.select("orbit_id", oids[orbit_index]).to_series()

    def to_dir(self, path: str) -> None:
        """Write a propagation result to a directory of Parquet files.

        The directory is created if it does not exist. ``states.parquet``
        is always written. Every event table, the sensitivity table and
        the tagged covariance table are written to ``{name}.parquet`` only
        when they hold at least one row.

        :meth:`from_dir` treats the presence of a file as "this table has
        data". So that re-exporting into an existing directory cannot
        leave rows of an earlier result behind, the file of any table that
        is empty (or ``None``) in this result is removed if present.

        Parameters
        ----------
        path : str
            Target directory.
        """
        os.makedirs(path, exist_ok=True)
        self.states.to_parquet(os.path.join(path, "states.parquet"))

        optional_tables = [
            ("summary", self.events.summary),
            ("close_approach_starts", self.events.close_approach_starts),
            ("close_approach_ends", self.events.close_approach_ends),
            ("periapses", self.events.periapses),
            ("impacts", self.events.impacts),
            ("possible_impacts", self.events.possible_impacts),
            ("atmospheric_entries", self.events.atmospheric_entries),
            ("atmospheric_exits", self.events.atmospheric_exits),
            ("capture_starts", self.events.capture_starts),
            ("capture_ends", self.events.capture_ends),
            ("shadow_entries", self.events.shadow_entries),
            ("shadow_exits", self.events.shadow_exits),
            ("covariance_regime_changes", self.events.covariance_regime_changes),
            ("sensitivity", self.sensitivity),
            ("tagged_covariance", self.tagged_covariance),
        ]
        for name, table in optional_tables:
            fpath = os.path.join(path, f"{name}.parquet")
            if table is not None and len(table) > 0:
                table.to_parquet(fpath)
                continue
            # Nothing to store for this table: drop any file left over
            # from a previous export so from_dir does not load stale rows.
            try:
                os.remove(fpath)
            except FileNotFoundError:
                pass

    @classmethod
    def from_dir(cls, path: str) -> "PropagationResult":
        """Load a propagation result written by :meth:`to_dir`.

        ``states.parquet`` is read unconditionally. An event table whose
        file is absent is loaded as an empty table of the matching class;
        ``sensitivity`` and ``tagged_covariance`` are ``None`` when their
        file is absent.

        Parameters
        ----------
        path : str
            Directory previously populated by :meth:`to_dir`.

        Returns
        -------
        PropagationResult
            The reconstructed result.
        """
        states_path = os.path.join(path, "states.parquet")
        states = CartesianOrbits.from_parquet(states_path)

        # Spelled out per table (rather than looped) so each field keeps
        # its concrete table type for static type checkers.
        events = Events(
            summary=_load_event_table(EventSummary, path, "summary"),
            close_approach_starts=_load_event_table(
                CloseApproachStarts, path, "close_approach_starts"
            ),
            close_approach_ends=_load_event_table(CloseApproachEnds, path, "close_approach_ends"),
            periapses=_load_event_table(Periapses, path, "periapses"),
            impacts=_load_event_table(Impacts, path, "impacts"),
            possible_impacts=_load_event_table(PossibleImpacts, path, "possible_impacts"),
            atmospheric_entries=_load_event_table(AtmosphericEntries, path, "atmospheric_entries"),
            atmospheric_exits=_load_event_table(AtmosphericExits, path, "atmospheric_exits"),
            capture_starts=_load_event_table(CaptureStarts, path, "capture_starts"),
            capture_ends=_load_event_table(CaptureEnds, path, "capture_ends"),
            shadow_entries=_load_event_table(ShadowEntries, path, "shadow_entries"),
            shadow_exits=_load_event_table(ShadowExits, path, "shadow_exits"),
            covariance_regime_changes=_load_event_table(
                CovarianceRegimeChanges, path, "covariance_regime_changes"
            ),
        )

        sens_path = os.path.join(path, "sensitivity.parquet")
        sensitivity = (
            StateSensitivities.from_parquet(sens_path) if os.path.exists(sens_path) else None
        )

        tagged_path = os.path.join(path, "tagged_covariance.parquet")
        tagged_covariance = (
            TaggedCovariances.from_parquet(tagged_path) if os.path.exists(tagged_path) else None
        )

        return cls(
            states=states,
            events=events,
            sensitivity=sensitivity,
            tagged_covariance=tagged_covariance,
        )