"""Orbit propagation."""
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, TypeVar
import numpy as np
import pyarrow as pa
import quivr as qv
if TYPE_CHECKING:
from empyrean.ephemeris.sensitivity import StateSensitivities
from empyrean.propagation.tagged_covariance import TaggedCovariances
from empyrean._convert import (
_COORD_TYPE_MAP,
AnyOrbits,
coordinates_to_arrays,
extract_photometry,
int_to_frame,
naif_to_origin,
)
from empyrean.coordinates.coordinates import CartesianCoordinates
from empyrean.coordinates.covariance import (
CartesianCovariance as _CartesianCovariance,
)
from empyrean.coordinates.covariance import _CovarianceTable
from empyrean.coordinates.epoch import Epochs
from empyrean.orbits.orbits import CartesianOrbits
from empyrean.propagation.config import (
_DATACLASS_TO_INT,
_FORCE_MODEL_TO_INT,
_UNCERTAINTY_METHOD_TO_INT,
ForceModelTier,
MonteCarlo,
PropagationConfig,
SigmaPoint,
UncertaintyMethod,
)
from empyrean.propagation.events import (
AtmosphericEntries,
AtmosphericExits,
CaptureEnds,
CaptureStarts,
CloseApproachEnds,
CloseApproachStarts,
CovarianceRegimeChanges,
EventConfig,
Events,
EventSummary,
Impacts,
Periapses,
PossibleImpacts,
ShadowEntries,
ShadowExits,
)
from empyrean.propagation.result import PropagationResult
from empyrean.propagation.tagged_covariance import _KIND_BY_CODE
# Quivr event sub-tables share the build helpers below; the helpers
# return the same concrete table subclass they are handed.
_EventTableT = TypeVar("_EventTableT", bound=qv.Table)
# ``CartesianCovariance`` is built dynamically by ``_make_covariance_class``,
# whose declared return type is the bare ``type``; that loses the injected
# ``from_matrix`` / ``from_kwargs`` constructors. Re-bind it through
# ``type[_CovarianceTable]`` (the Protocol describing exactly that
# dynamically-injected surface) so those constructors type-check at the call
# sites below. The runtime object is unchanged.
CartesianCovariance: type[_CovarianceTable] = _CartesianCovariance
UncertaintyMethodLike = UncertaintyMethod | SigmaPoint | MonteCarlo | str
[docs]
def propagate(
orbits: AnyOrbits,
epochs: Epochs | np.ndarray | Sequence[float],
config: PropagationConfig | None = None,
*,
# ── Sugar for quick, inline overrides ─────────────────────
# Any of these will populate a fresh PropagationConfig when `config`
# isn't supplied. Ignored when `config` is passed.
#
# Sugar mirrors top-level `PropagationConfig` fields only — knobs
# nested under `config.advanced` (`epsilon`, step bounds, loop
# guards) are deliberately not surfaced here. Reaching for those
# means you're tuning the integrator itself, which is a structured-
# config conversation, not an inline override.
force_model: ForceModelTier | str | None = None,
uncertainty_method: UncertaintyMethodLike | None = None,
num_threads: int | None = None,
events: EventConfig | None = None,
tagged_covariance: bool = False,
) -> PropagationResult:
"""Propagate orbits to target epochs.
Parameters
----------
orbits : CartesianOrbits | CometaryOrbits | KeplerianOrbits | SphericalOrbits
Input orbits with optional covariance and non-gravitational
parameters.
epochs : Epochs | array-like
Target epochs. An :class:`~empyrean.types.Epochs` table (converted
to TDB internally), or a 1-D array of MJD TDB values.
config : PropagationConfig, optional
Full propagation configuration. Construct with
``PropagationConfig(force_model=..., uncertainty_method=...)``
etc. If omitted, one is built from the sugar kwargs below (or
defaults).
Other Parameters
----------------
force_model : ForceModelTier or str, optional
Quick override for ``config.force_model``. Ignored if ``config``
is given.
uncertainty_method : UncertaintyMethod | SigmaPoint | MonteCarlo | str, optional
Quick override for ``config.uncertainty_method``. Accepts either
an enum / string (default parameters) or a parameterized
dataclass (:class:`SigmaPoint(n_sigma=2.0)` etc.). Ignored if
``config`` is given.
num_threads : int, optional
Threads for multi-orbit propagation. ``None`` (default) =
sequential; ``0`` = use all available cores; ``n`` > 0 = use
exactly ``n`` cores. Each orbit is integrated on a single
thread; parallelism is across orbits, not within a single
trajectory.
events : EventConfig, optional
Event-detection toggles + body filter + dense-output cadence.
Override individual flags here without rebuilding a full
:class:`PropagationConfig`. See
:class:`~empyrean.EventConfig`.
tagged_covariance : bool, default False
When ``True``, also read back the provenance-tagged,
resolved-kind covariance at every output epoch (the honest
covariance that distinguishes a second-order close-approach
ellipsoid from the bare linear ``Φ Σ₀ Φᵀ`` mapping on the
states). The result's
:attr:`~empyrean.PropagationResult.tagged_covariance` table is
populated and
:meth:`~empyrean.PropagationResult.tagged_covariance_series`
becomes usable. Off by default — the readback recomputes the
resolved kind per orbit, so it isn't free.
Returns
-------
PropagationResult
Propagated states, detected events, and per-orbit state
sensitivity chains.
Examples
--------
Defaults (Standard force model, FirstOrder uncertainty):
>>> result = empyrean.propagate(orbits, times)
With a config object:
>>> cfg = PropagationConfig(
... force_model=ForceModelTier.FULL,
... uncertainty_method=SigmaPoint(n_sigma=2.0),
... num_threads=8,
... )
>>> result = empyrean.propagate(orbits, times, cfg)
With inline kwargs (sugar):
>>> result = empyrean.propagate(orbits, times, force_model="full")
"""
from empyrean._empyrean_rs import _propagate
# ── Assemble PropagationConfig ────────────────────────────
if config is None:
# PropagationConfig.force_model is typed as ForceModelTier, while the
# `force_model` sugar additionally accepts a str. Resolve a str tier
# to its ForceModelTier member here so the constructed config carries
# the precise enum type; the case-insensitive lookup mirrors the
# downstream `_FORCE_MODEL_TO_INT` mapping.
force_model_tier: ForceModelTier
if force_model is None:
force_model_tier = ForceModelTier.STANDARD
elif isinstance(force_model, str):
force_model_tier = ForceModelTier(force_model.lower())
else:
force_model_tier = force_model
config = PropagationConfig(
force_model=force_model_tier,
uncertainty_method=(
uncertainty_method
if uncertainty_method is not None
else UncertaintyMethod.FIRST_ORDER
),
num_threads=num_threads,
events=events if events is not None else EventConfig(),
)
elif any(v is not None for v in (force_model, uncertainty_method, num_threads, events)):
raise TypeError(
"propagate(): pass either `config` or the sugar kwargs "
"(force_model / uncertainty_method / num_threads / events), "
"not both"
)
# Pull fields off the config from here on
force_model = config.force_model
uncertainty_method = config.uncertainty_method
num_threads = config.num_threads
epsilon = config.epsilon
events = config.events
if events is None:
events = EventConfig()
# ── Extract coordinate arrays from orbits ────────────────
coords = orbits.coordinates
coord_type = type(coords)
if coord_type not in _COORD_TYPE_MAP:
raise TypeError(f"unsupported coordinate type: {coord_type}")
(
epochs_arr,
elements_arr,
covariances_arr,
has_cov_arr,
representations_arr,
frames_arr,
origins_arr,
) = coordinates_to_arrays(coords)
# IDs
orbit_ids = orbits.orbit_id.to_pylist()
if orbits.object_id is not None:
object_ids = [s if s else "" for s in orbits.object_id.to_pylist()]
else:
object_ids = [""] * len(orbits)
# Non-grav parameters
n = len(orbits)
non_grav_dts: np.ndarray | None = None
# g(r) Marsden–Sekanina exponents. Passed only when a non-default g(r)
# is present (any non-zero α/r0/m/n/k); all-zero is the inverse-square
# asteroid default that the engine applies without a marshal.
ng_alphas: np.ndarray | None = None
ng_r0s: np.ndarray | None = None
ng_ms: np.ndarray | None = None
ng_ns: np.ndarray | None = None
ng_ks: np.ndarray | None = None
if orbits.non_grav is not None:
ng = orbits.non_grav
# Handle nullable columns: fill None with 0.0
a1s = np.asarray(ng.a1.to_numpy(zero_copy_only=False), dtype=np.float64)
a2s = np.asarray(ng.a2.to_numpy(zero_copy_only=False), dtype=np.float64)
a3s = np.asarray(ng.a3.to_numpy(zero_copy_only=False), dtype=np.float64)
a1s = np.nan_to_num(a1s, nan=0.0)
a2s = np.nan_to_num(a2s, nan=0.0)
a3s = np.nan_to_num(a3s, nan=0.0)
# SBDB non-grav DT (days). NaN entries → no delay; pass the
# whole array only when at least one row populated, so the
# asteroid-only case avoids an FFI marshal.
dt_col = np.asarray(ng.dt.to_numpy(zero_copy_only=False), dtype=np.float64)
if np.isfinite(dt_col).any():
non_grav_dts = dt_col
# g(r) exponents — carry the comet Marsden–Sekanina g(r) so a fitted
# or SBDB comet orbit isn't silently propagated with inverse-square.
alpha_col = np.nan_to_num(
np.asarray(ng.alpha.to_numpy(zero_copy_only=False), dtype=np.float64), nan=0.0
)
r0_col = np.nan_to_num(
np.asarray(ng.r0.to_numpy(zero_copy_only=False), dtype=np.float64), nan=0.0
)
m_col = np.nan_to_num(
np.asarray(ng.m.to_numpy(zero_copy_only=False), dtype=np.float64), nan=0.0
)
n_col = np.nan_to_num(
np.asarray(ng.n.to_numpy(zero_copy_only=False), dtype=np.float64), nan=0.0
)
k_col = np.nan_to_num(
np.asarray(ng.k.to_numpy(zero_copy_only=False), dtype=np.float64), nan=0.0
)
if (
(alpha_col != 0).any()
or (r0_col != 0).any()
or (m_col != 0).any()
or (n_col != 0).any()
or (k_col != 0).any()
):
ng_alphas = alpha_col
ng_r0s = r0_col
ng_ms = m_col
ng_ns = n_col
ng_ks = k_col
else:
a1s = np.zeros(n, dtype=np.float64)
a2s = np.zeros(n, dtype=np.float64)
a3s = np.zeros(n, dtype=np.float64)
# Photometric parameters
phot_h, phot_g, phot_model = extract_photometry(orbits)
# ── Extract times ────────────────────────────────────────
if isinstance(epochs, Epochs):
# Convert to TDB if needed
tdb = epochs.to_tdb()
times_mjd_tdb = np.asarray(tdb.mjd.to_numpy(zero_copy_only=False), dtype=np.float64)
else:
times_mjd_tdb = np.asarray(epochs, dtype=np.float64)
# ── Map force model to int ───────────────────────────────
if isinstance(force_model, str):
fm_int = _FORCE_MODEL_TO_INT.get(force_model.lower())
if fm_int is None:
raise ValueError(f"unknown force model: {force_model}")
elif isinstance(force_model, ForceModelTier):
fm_int = _FORCE_MODEL_TO_INT[force_model]
elif isinstance(force_model, int):
fm_int = force_model
else:
raise TypeError(f"force_model must be ForceModelTier, str, or int, got {type(force_model)}")
# ── Map uncertainty method to int + extract params ─────────
#
# Three input shapes:
# 1. str / UncertaintyMethod enum → default parameters
# 2. SigmaPoint / MonteCarlo dataclass → method + params
# 3. int (legacy) → default parameters
sigma_n_sigma = 1.0
sigma_samples_per_plane = 8
mc_n_samples = 1000
mc_seed: int | None = 42
if isinstance(uncertainty_method, (SigmaPoint, MonteCarlo)):
um_int = _DATACLASS_TO_INT[type(uncertainty_method)]
if isinstance(uncertainty_method, SigmaPoint):
sigma_n_sigma = uncertainty_method.n_sigma
sigma_samples_per_plane = uncertainty_method.samples_per_plane
else: # MonteCarlo
mc_n_samples = uncertainty_method.n_samples
mc_seed = uncertainty_method.seed
elif isinstance(uncertainty_method, str):
um_int_opt = _UNCERTAINTY_METHOD_TO_INT.get(uncertainty_method.lower())
if um_int_opt is None:
raise ValueError(f"unknown uncertainty method: {uncertainty_method}")
um_int = um_int_opt
elif isinstance(uncertainty_method, UncertaintyMethod):
um_int = _UNCERTAINTY_METHOD_TO_INT[uncertainty_method]
elif isinstance(uncertainty_method, int):
um_int = uncertainty_method
else:
raise TypeError(
"uncertainty_method must be UncertaintyMethod, a SigmaPoint / "
"MonteCarlo dataclass, str, or int; got "
f"{type(uncertainty_method).__name__}"
)
# ── Call Rust ─────────────────────────────────────────────
# Thread the full nested PropagationConfig as a single dict so that
# advanced fields (events.dense_output, diagnostics.*, advanced.*,
# excluded_perturbers, max_propagation_time_days, etc.) are honored
# without growing _propagate's flat-arg signature.
result = _propagate(
orbit_ids,
object_ids,
epochs_arr,
elements_arr,
covariances_arr,
has_cov_arr,
representations_arr,
frames_arr,
origins_arr,
times_mjd_tdb,
fm_int,
um_int,
a1s,
a2s,
a3s,
phot_h,
phot_g,
phot_model,
num_threads=num_threads,
epsilon=epsilon,
non_grav_dts=non_grav_dts,
ng_alphas=ng_alphas,
ng_r0s=ng_r0s,
ng_ms=ng_ms,
ng_ns=ng_ns,
ng_ks=ng_ks,
# Adaptive Gaussian Mixture is not exposed in this distribution;
# these kwargs are accepted by the binding for ABI compatibility
# but ignored. Pass benign placeholders.
gm_threshold=1.0,
gm_max_depth=3,
gm_components_per_split=3,
sigma_n_sigma=sigma_n_sigma,
sigma_samples_per_plane=sigma_samples_per_plane,
mc_n_samples=mc_n_samples,
mc_seed=mc_seed,
propagation_config_dict=config._to_wire_dict(),
with_tagged_covariance=tagged_covariance,
)
# ── Build CartesianOrbits from result ─────────────────────
states = _build_cartesian_orbits(result)
detected_events = _build_events(result)
# ── Build per-orbit StateSensitivity chains ───────────────
sensitivity = _build_state_sensitivity(result)
# ── Build provenance-tagged covariance table (opt-in) ─────
tagged = _build_tagged_covariance(result) if tagged_covariance else None
return PropagationResult(
states=states,
events=detected_events,
sensitivity=sensitivity,
tagged_covariance=tagged,
)
def _build_tagged_covariance(
result: dict[str, Any],
) -> "TaggedCovariances | None":
"""Build a :class:`TaggedCovariances` table from the pyo3 result.
The per-``(orbit, epoch)`` tagged arrays in ``result`` are aligned
1:1 with the states, so the orbit / object ids and epochs are reused
straight from the flat state columns. Returns ``None`` when the
extension emitted no tagged sub-dict.
"""
from empyrean.propagation.tagged_covariance import build_tagged_covariances
if "tagged_covariance" not in result:
return None
orbit_ids: list[str] = list(result["orbit_ids"])
object_ids: list[str | None] = [s if s else None for s in result["object_ids"]]
epochs_arr = np.asarray(result["epochs"], dtype=np.float64)
return build_tagged_covariances(result, orbit_ids, object_ids, epochs_arr)
def _build_state_sensitivity(result: dict[str, Any]) -> "StateSensitivities | None":
"""Build a :class:`StateSensitivities` table from the pyo3 result.
Flattens the per-row (6, 6) STM and (6, 6, 6) STT arrays into the
row-major lists the table expects (length 36 / 216 per row), with
``None`` per row when ``has_stm`` / ``has_stt`` is false on that row.
Returns ``None`` if no row has an STM (sample-based methods or
FirstOrder without input covariance).
"""
from empyrean.ephemeris.sensitivity import StateSensitivities
stms = np.asarray(result["stms"]) if "stms" in result else None
has_stm = np.asarray(result["has_stm"]) if "has_stm" in result else None
stts = np.asarray(result["stts"]) if "stts" in result else None
has_stt = np.asarray(result["has_stt"]) if "has_stt" in result else None
if has_stm is None or not bool(has_stm.any()):
return None
orbit_ids = list(result["orbit_ids"])
object_ids = [s if s else None for s in result["object_ids"]]
epochs_arr = np.asarray(result["epochs"], dtype=np.float64)
n = len(orbit_ids)
# Flatten STMs to length-36 lists; null per-row where has_stm is false.
if stms is not None:
stms_flat = stms.reshape(n, 36).tolist()
stm_col = [
row if (has_stm is None or bool(has_stm[i])) else None
for i, row in enumerate(stms_flat)
]
else:
stm_col = [None] * n
# STTs: only populate rows where has_stt is true; null elsewhere.
if stts is not None and has_stt is not None and bool(has_stt.any()):
stts_flat = stts.reshape(n, 216).tolist()
stt_col = [row if bool(has_stt[i]) else None for i, row in enumerate(stts_flat)]
else:
stt_col = [None] * n
# Per-row resolved covariance kind (linear / second_order / …).
rk_codes = result.get("resolved_kind")
if rk_codes is not None:
resolved_kind_col: list[str | None] = [
_KIND_BY_CODE[int(c)].value if int(c) >= 0 else None for c in np.asarray(rk_codes)
]
else:
resolved_kind_col = [None] * n
return StateSensitivities.from_kwargs(
orbit_id=orbit_ids,
object_id=object_ids,
epoch_mjd_tdb=epochs_arr,
stm=stm_col,
stt=stt_col,
resolved_kind=resolved_kind_col,
)
def _build_cartesian_orbits(result: dict[str, Any]) -> CartesianOrbits:
"""Build CartesianOrbits from the Rust result dict."""
m = len(result["epochs"])
out_epochs = np.asarray(result["epochs"])
out_x = np.asarray(result["x"])
out_y = np.asarray(result["y"])
out_z = np.asarray(result["z"])
out_vx = np.asarray(result["vx"])
out_vy = np.asarray(result["vy"])
out_vz = np.asarray(result["vz"])
out_frames = np.asarray(result["frames"])
out_origins = np.asarray(result["origins"])
out_covariances = np.asarray(result["covariances"])
out_has_cov = np.asarray(result["has_covariance"])
out_orbit_ids = result["orbit_ids"]
out_object_ids = result["object_ids"]
# Build frame attribute (all rows should have the same frame)
from empyrean.coordinates.enums import Frame as FrameEnum
frame = int_to_frame(int(out_frames[0])) if m > 0 else FrameEnum.ICRF
# Build origin column
origin_strs = [naif_to_origin(int(o)) for o in out_origins]
# Build covariance
if out_has_cov.any():
cov = CartesianCovariance.from_matrix(out_covariances)
else:
cov = None
frame_value = frame.value if hasattr(frame, "value") else frame
if cov is not None:
cart_coords = CartesianCoordinates.from_kwargs(
epoch=out_epochs,
x=out_x,
y=out_y,
z=out_z,
vx=out_vx,
vy=out_vy,
vz=out_vz,
frame=frame_value,
origin=origin_strs,
covariance=cov,
)
else:
cart_coords = CartesianCoordinates.from_kwargs(
epoch=out_epochs,
x=out_x,
y=out_y,
z=out_z,
vx=out_vx,
vy=out_vy,
vz=out_vz,
frame=frame_value,
origin=origin_strs,
)
# Convert empty strings to None for nullable object_id
object_id_list = [s if s else None for s in out_object_ids]
return CartesianOrbits.from_kwargs(
orbit_id=out_orbit_ids,
object_id=object_id_list,
coordinates=cart_coords,
)
def _nullable_float(values: np.ndarray) -> pa.Array | np.ndarray:
"""Convert a list of floats to a pyarrow array with NaN -> null."""
arr = np.asarray(values, dtype=np.float64)
mask = np.isnan(arr)
if mask.any():
return pa.array(arr.tolist(), type=pa.float64(), mask=mask)
return arr
def _nullable_str_list(values: Sequence[str | None]) -> list[str | None]:
"""Convert a list of strings to a list with empty string -> None."""
return [s if s else None for s in values]
def _build_events(result: dict[str, Any]) -> Events:
"""Build Events container from the Rust result dict.
The Rust extension returns a single flat events sub-dict with an
``event_types`` discriminator column carrying ``distance_au`` /
``distance_km`` / ``relative_velocity_au_day``. We dispatch each row
into the appropriate per-subtype quivr table. Subtype-specific fields
that the flat schema *does* carry are read across (e.g. the atmospheric
entry altitude rides in ``distance_km``); fields it does not carry
(latitude / longitude on impacts, jacobi constants on captures,
illumination on shadow events) are filled with NaN / null.
"""
ev = result.get("events")
if ev is None or len(ev["event_types"]) == 0:
return Events(
summary=EventSummary.empty(),
close_approach_starts=CloseApproachStarts.empty(),
close_approach_ends=CloseApproachEnds.empty(),
periapses=Periapses.empty(),
impacts=Impacts.empty(),
possible_impacts=PossibleImpacts.empty(),
atmospheric_entries=AtmosphericEntries.empty(),
atmospheric_exits=AtmosphericExits.empty(),
capture_starts=CaptureStarts.empty(),
capture_ends=CaptureEnds.empty(),
shadow_entries=ShadowEntries.empty(),
shadow_exits=ShadowExits.empty(),
covariance_regime_changes=CovarianceRegimeChanges.empty(),
)
orbit_ids = list(ev["orbit_ids"])
object_ids = list(ev["object_ids"])
event_types = list(ev["event_types"])
bodies = list(ev["bodies"])
epochs = np.asarray(ev["epochs"], dtype=np.float64)
distance_au = np.asarray(ev["distance_au"], dtype=np.float64)
distance_km = np.asarray(ev["distance_km"], dtype=np.float64)
rel_v = np.asarray(ev["relative_velocity_au_day"], dtype=np.float64)
# Subtype payload columns the C ABI now carries (NaN / -1 sentinels on
# rows the field doesn't apply to). `.get` with a sentinel fallback so
# older result dicts (pre-extension) still load.
n_all = len(event_types)
def _ev_f(key: str) -> np.ndarray:
return np.asarray(ev.get(key, np.full(n_all, np.nan)), dtype=np.float64)
two_body_energy = _ev_f("two_body_energy")
jacobi = _ev_f("jacobi_constant")
jacobi_sigma = _ev_f("jacobi_constant_sigma")
jacobi_l1 = _ev_f("jacobi_constant_l1")
jacobi_l2 = _ev_f("jacobi_constant_l2")
n_periapses = np.asarray(ev.get("n_periapses", np.full(n_all, -1)), dtype=np.int32)
impact_lat = _ev_f("impact_latitude_deg")
impact_lon = _ev_f("impact_longitude_deg")
impact_alt = _ev_f("impact_altitude_km")
previous_kind = np.asarray(ev.get("previous_kind", np.full(n_all, -1)), dtype=np.int64)
regime_resolved_kind = np.asarray(
ev.get("regime_resolved_kind", np.full(n_all, -1)), dtype=np.int64
)
kappa = _ev_f("kappa")
threshold_below = _ev_f("threshold_below")
threshold_above = _ev_f("threshold_above")
# Cross-cutting summary table — every event lands here.
summary = EventSummary.from_kwargs(
orbit_id=orbit_ids,
object_id=_nullable_str_list(object_ids),
event_type=event_types,
body=bodies,
epoch=epochs,
)
# Filter helpers ---------------------------------------------------
def _idx(tag: str) -> list[int]:
return [i for i, t in enumerate(event_types) if t == tag]
def _str(values: Sequence[str], idx: list[int]) -> list[str]:
return [values[i] for i in idx]
def _str_opt(values: Sequence[str], idx: list[int]) -> list[str | None]:
return [values[i] if values[i] else None for i in idx]
def _arr(values: np.ndarray, idx: list[int]) -> np.ndarray:
return values[idx] if len(idx) > 0 else np.zeros(0, dtype=values.dtype)
def _common(tag: str, cls: type[_EventTableT]) -> _EventTableT:
idx = _idx(tag)
if not idx:
return cls.empty()
return cls.from_kwargs(
orbit_id=_str(orbit_ids, idx),
object_id=_str_opt(object_ids, idx),
body=_str(bodies, idx),
epoch=_arr(epochs, idx),
distance_au=_arr(distance_au, idx),
distance_km=_arr(distance_km, idx),
)
close_approach_starts = _common("close_approach_start", CloseApproachStarts)
close_approach_ends = _common("close_approach_end", CloseApproachEnds)
# Periapses carry relative state vectors that aren't in the flat
# schema — fill those with NaN.
per_idx = _idx("periapsis")
if per_idx:
n = len(per_idx)
periapses = Periapses.from_kwargs(
orbit_id=_str(orbit_ids, per_idx),
object_id=_str_opt(object_ids, per_idx),
body=_str(bodies, per_idx),
epoch=_arr(epochs, per_idx),
distance_au=_arr(distance_au, per_idx),
distance_km=_arr(distance_km, per_idx),
relative_velocity_au_day=_arr(rel_v, per_idx),
relative_x=np.full(n, np.nan),
relative_y=np.full(n, np.nan),
relative_z=np.full(n, np.nan),
relative_vx=np.full(n, np.nan),
relative_vy=np.full(n, np.nan),
relative_vz=np.full(n, np.nan),
)
else:
periapses = Periapses.empty()
# Impacts: planetodetic surface-intercept lat/lon/alt now carried by
# the flat schema (NaN -> null where the impact geometry was
# unresolved).
imp_idx = _idx("impact")
if imp_idx:
impacts = Impacts.from_kwargs(
orbit_id=_str(orbit_ids, imp_idx),
object_id=_str_opt(object_ids, imp_idx),
body=_str(bodies, imp_idx),
epoch=_arr(epochs, imp_idx),
latitude_deg=_nullable_float(_arr(impact_lat, imp_idx)),
longitude_deg=_nullable_float(_arr(impact_lon, imp_idx)),
altitude_km=_nullable_float(_arr(impact_alt, imp_idx)),
)
else:
impacts = Impacts.empty()
# Possible impacts: probabilistic fields not in the flat schema.
pi_idx = _idx("possible_impact")
if pi_idx:
n = len(pi_idx)
possible_impacts = PossibleImpacts.from_kwargs(
orbit_id=_str(orbit_ids, pi_idx),
object_id=_str_opt(object_ids, pi_idx),
body=_str(bodies, pi_idx),
epoch=_arr(epochs, pi_idx),
miss_distance_au=_arr(distance_au, pi_idx),
miss_distance_km=_arr(distance_km, pi_idx),
effective_radius_au=np.zeros(n),
effective_radius_km=np.zeros(n),
sigma_distance_au=np.zeros(n),
ip_linear=np.zeros(n),
relative_velocity_au_day=_arr(rel_v, pi_idx),
ip_second_order=[None] * n,
nonlinearity=[None] * n,
ip_agm=[None] * n,
ip_mc=[None] * n,
)
else:
possible_impacts = PossibleImpacts.empty()
# Atmospheric entries: distance_au is the body-CENTER crossing
# distance (the Karman radius), NOT an altitude. The true altitude
# above the reference ellipsoid and the surface lat/lon come from the
# planetodetic ground track (impact_altitude_km / impact_*_deg on the
# flat event), NaN -> null when the ground track is unresolved. The
# entry speed rides in relative_velocity_au_day.
ae_idx = _idx("atmospheric_entry")
if ae_idx:
atmospheric_entries = AtmosphericEntries.from_kwargs(
orbit_id=_str(orbit_ids, ae_idx),
object_id=_str_opt(object_ids, ae_idx),
body=_str(bodies, ae_idx),
epoch=_arr(epochs, ae_idx),
distance_au=_arr(distance_au, ae_idx),
altitude_km=_nullable_float(_arr(impact_alt, ae_idx)),
relative_velocity_au_day=_nullable_float(_arr(rel_v, ae_idx)),
latitude_deg=_nullable_float(_arr(impact_lat, ae_idx)),
longitude_deg=_nullable_float(_arr(impact_lon, ae_idx)),
)
else:
atmospheric_entries = AtmosphericEntries.empty()
def _simple_entry_exit(tag: str, cls: type[_EventTableT]) -> _EventTableT:
idx = _idx(tag)
if not idx:
return cls.empty()
return cls.from_kwargs(
orbit_id=_str(orbit_ids, idx),
object_id=_str_opt(object_ids, idx),
body=_str(bodies, idx),
epoch=_arr(epochs, idx),
distance_au=_arr(distance_au, idx),
)
atmospheric_exits = _simple_entry_exit("atmospheric_exit", AtmosphericExits)
# Capture starts/ends: two-body energy + CR3BP Jacobi constants (and
# the escape periapsis count) now carried by the flat schema.
def _capture(tag: str, cls: type[_EventTableT], with_n_periapses: bool = False) -> _EventTableT:
idx = _idx(tag)
if not idx:
return cls.empty()
kwargs: dict[str, Any] = dict(
orbit_id=_str(orbit_ids, idx),
object_id=_str_opt(object_ids, idx),
body=_str(bodies, idx),
epoch=_arr(epochs, idx),
distance_au=_arr(distance_au, idx),
distance_km=_arr(distance_km, idx),
relative_velocity_au_day=_arr(rel_v, idx),
two_body_energy=_arr(two_body_energy, idx),
jacobi_constant=_nullable_float(_arr(jacobi, idx)),
jacobi_constant_sigma=_nullable_float(_arr(jacobi_sigma, idx)),
jacobi_constant_l1=_nullable_float(_arr(jacobi_l1, idx)),
jacobi_constant_l2=_nullable_float(_arr(jacobi_l2, idx)),
)
if with_n_periapses:
kwargs["n_periapses"] = _arr(n_periapses, idx)
return cls.from_kwargs(**kwargs)
capture_starts = _capture("capture_start", CaptureStarts)
capture_ends = _capture("capture_end", CaptureEnds, with_n_periapses=True)
# Shadow events: shadow_fraction / illumination not in flat schema.
def _shadow(tag: str, cls: type[_EventTableT]) -> _EventTableT:
idx = _idx(tag)
if not idx:
return cls.empty()
n = len(idx)
return cls.from_kwargs(
orbit_id=_str(orbit_ids, idx),
object_id=_str_opt(object_ids, idx),
body=_str(bodies, idx),
epoch=_arr(epochs, idx),
shadow_fraction=np.zeros(n),
illumination=np.zeros(n),
)
shadow_entries = _shadow("shadow_entry", ShadowEntries)
shadow_exits = _shadow("shadow_exit", ShadowExits)
# Covariance-regime-change events: the UncertaintyMethod.AUTO audit
# trail (linear <-> second-order transitions at CA-window boundaries).
# Kind codes: -1 = not applicable, else EMPYREAN_COVARIANCE_KIND_*.
def _kind_label(code: int) -> str | None:
return _KIND_BY_CODE[int(code)].value if int(code) >= 0 else None
crc_idx = _idx("covariance_regime_change")
if crc_idx:
covariance_regime_changes = CovarianceRegimeChanges.from_kwargs(
orbit_id=_str(orbit_ids, crc_idx),
object_id=_str_opt(object_ids, crc_idx),
body=_str_opt(bodies, crc_idx),
epoch=_arr(epochs, crc_idx),
previous_kind=[_kind_label(c) for c in _arr(previous_kind, crc_idx)],
resolved_kind=[_kind_label(c) for c in _arr(regime_resolved_kind, crc_idx)],
kappa=_nullable_float(_arr(kappa, crc_idx)),
threshold_below=_nullable_float(_arr(threshold_below, crc_idx)),
threshold_above=_nullable_float(_arr(threshold_above, crc_idx)),
)
else:
covariance_regime_changes = CovarianceRegimeChanges.empty()
return Events(
summary=summary,
close_approach_starts=close_approach_starts,
close_approach_ends=close_approach_ends,
periapses=periapses,
impacts=impacts,
possible_impacts=possible_impacts,
atmospheric_entries=atmospheric_entries,
atmospheric_exits=atmospheric_exits,
capture_starts=capture_starts,
capture_ends=capture_ends,
shadow_entries=shadow_entries,
shadow_exits=shadow_exits,
covariance_regime_changes=covariance_regime_changes,
)