"""Ephemeris generation."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy as np
from empyrean._convert import (
_COORD_TYPE_MAP,
AnyOrbits,
coordinates_to_arrays,
extract_photometry,
)
if TYPE_CHECKING:
import pyarrow as pa
from empyrean.ephemeris.result import EphemerisConfig, EphemerisResult
from empyrean.observers.observers import Observers
from empyrean.propagation.config import (
_DATACLASS_TO_INT,
_FORCE_MODEL_TO_INT,
_UNCERTAINTY_METHOD_TO_INT,
ForceModelTier,
MonteCarlo,
PropagationConfig,
SigmaPoint,
UncertaintyMethod,
)
FloatArray = np.ndarray[Any, np.dtype[np.float64]]
UncertaintyMethodLike = UncertaintyMethod | SigmaPoint | MonteCarlo | str
[docs]
def generate_ephemeris(
orbits: AnyOrbits,
observers: Observers,
config: EphemerisConfig | None = None,
*,
# Sugar for quick inline overrides on the embedded
# PropagationConfig. Ignored when `config` is passed.
#
# Sugar mirrors top-level config knobs only — integrator-tuning
# parameters nested under `config.propagation.advanced` (epsilon,
# step bounds, loop guards) are deliberately not surfaced here.
# Reach for the structured config when you need them.
force_model: ForceModelTier | str | None = None,
uncertainty_method: UncertaintyMethodLike | None = None,
) -> EphemerisResult:
"""Generate predicted ephemeris (RA/Dec) for orbits at observer locations.
Parameters
----------
orbits : CartesianOrbits | CometaryOrbits | KeplerianOrbits | SphericalOrbits
Input orbits with optional covariance and non-gravitational
parameters.
observers : Observers
Observer states from ``get_observer_states()``.
config : EphemerisConfig, optional
Full configuration. Construct with
``EphemerisConfig(propagation=PropagationConfig(...), ...)``.
If omitted, one is built from the sugar kwargs below (or
defaults).
Other Parameters
----------------
force_model : ForceModelTier or str, optional
Quick override for ``config.propagation.force_model``. Ignored
if ``config`` is given.
uncertainty_method : UncertaintyMethod | SigmaPoint | MonteCarlo | str, optional
Quick override for ``config.propagation.uncertainty_method``.
``SECOND_ORDER`` is what populates observation Hessians on the
resulting :class:`~empyrean.types.ObservationSensitivity`.
Ignored if ``config`` is given.
Returns
-------
EphemerisResult
Wraps the :class:`~empyrean.types.Ephemeris` table and, when
input covariance is carried, the observation-partials
:class:`~empyrean.types.ObservationSensitivity` container.
Examples
--------
Defaults (Standard force model, FirstOrder uncertainty):
>>> result = empyrean.generate_ephemeris(orbits, observers)
With a config object:
>>> cfg = EphemerisConfig(
... propagation=PropagationConfig(
... force_model=ForceModelTier.FULL,
... uncertainty_method=UncertaintyMethod.SECOND_ORDER,
... ),
... compute_diagnostics=False,
... )
>>> result = empyrean.generate_ephemeris(orbits, observers, cfg)
"""
from empyrean._empyrean_rs import _generate_ephemeris
from empyrean.ephemeris.result import Ephemeris, EphemerisResult
# ── Assemble EphemerisConfig ──────────────────────────────
if config is None:
# PropagationConfig.force_model is typed as ForceModelTier, while the
# `force_model` sugar additionally accepts a str. Resolve a str tier
# to its ForceModelTier member here so the constructed config carries
# the precise enum type; the case-insensitive lookup mirrors the
# downstream `_FORCE_MODEL_TO_INT` mapping.
force_model_tier: ForceModelTier
if force_model is None:
force_model_tier = ForceModelTier.STANDARD
elif isinstance(force_model, str):
force_model_tier = ForceModelTier(force_model.lower())
else:
force_model_tier = force_model
prop = PropagationConfig(
force_model=force_model_tier,
uncertainty_method=(
uncertainty_method
if uncertainty_method is not None
else UncertaintyMethod.FIRST_ORDER
),
)
config = EphemerisConfig(propagation=prop)
elif any(v is not None for v in (force_model, uncertainty_method)):
raise TypeError(
"generate_ephemeris(): pass either `config` or the sugar kwargs "
"(force_model / uncertainty_method), not both"
)
# Pull fields off the config
force_model = config.propagation.force_model
uncertainty_method = config.propagation.uncertainty_method
epsilon = config.propagation.epsilon
# ── Extract coordinate arrays from orbits ────────────────
coords = orbits.coordinates
coord_type = type(coords)
if coord_type not in _COORD_TYPE_MAP:
raise TypeError(f"unsupported coordinate type: {coord_type}")
(
epochs_arr,
elements_arr,
covariances_arr,
has_cov_arr,
representations_arr,
frames_arr,
origins_arr,
) = coordinates_to_arrays(coords)
# IDs
orbit_ids = orbits.orbit_id.to_pylist()
if orbits.object_id is not None:
object_ids = [s if s else "" for s in orbits.object_id.to_pylist()]
else:
object_ids = [""] * len(orbits)
# Non-grav parameters
n = len(orbits)
non_grav_dts: np.ndarray | None = None
if orbits.non_grav is not None:
ng = orbits.non_grav
a1s = np.asarray(ng.a1.to_numpy(zero_copy_only=False), dtype=np.float64)
a2s = np.asarray(ng.a2.to_numpy(zero_copy_only=False), dtype=np.float64)
a3s = np.asarray(ng.a3.to_numpy(zero_copy_only=False), dtype=np.float64)
a1s = np.nan_to_num(a1s, nan=0.0)
a2s = np.nan_to_num(a2s, nan=0.0)
a3s = np.nan_to_num(a3s, nan=0.0)
# SBDB DT (days). NaN per-row → no delay; whole array
# passed only when at least one row populated.
dt_col = np.asarray(ng.dt.to_numpy(zero_copy_only=False), dtype=np.float64)
if np.isfinite(dt_col).any():
non_grav_dts = dt_col
else:
a1s = np.zeros(n, dtype=np.float64)
a2s = np.zeros(n, dtype=np.float64)
a3s = np.zeros(n, dtype=np.float64)
# Photometric parameters
phot_h, phot_g, phot_model = extract_photometry(orbits)
# ── Extract observer arrays ──────────────────────────────
obs_codes = observers.obs_code.to_pylist()
oc = observers.coordinates
obs_epochs = np.asarray(oc.epoch.to_numpy(zero_copy_only=False), dtype=np.float64)
obs_x = np.asarray(oc.x.to_numpy(zero_copy_only=False), dtype=np.float64)
obs_y = np.asarray(oc.y.to_numpy(zero_copy_only=False), dtype=np.float64)
obs_z = np.asarray(oc.z.to_numpy(zero_copy_only=False), dtype=np.float64)
obs_vx = np.asarray(oc.vx.to_numpy(zero_copy_only=False), dtype=np.float64)
obs_vy = np.asarray(oc.vy.to_numpy(zero_copy_only=False), dtype=np.float64)
obs_vz = np.asarray(oc.vz.to_numpy(zero_copy_only=False), dtype=np.float64)
# ── Map force model to int ───────────────────────────────
if isinstance(force_model, str):
fm_int = _FORCE_MODEL_TO_INT.get(force_model.lower())
if fm_int is None:
raise ValueError(f"unknown force model: {force_model}")
elif isinstance(force_model, ForceModelTier):
fm_int = _FORCE_MODEL_TO_INT[force_model]
elif isinstance(force_model, int):
fm_int = force_model
else:
raise TypeError(f"force_model must be ForceModelTier, str, or int, got {type(force_model)}")
# ── Map uncertainty method to int + params (same dispatch
# as `empyrean.propagate`) ───────────────────────────────────
sigma_n_sigma = 1.0
sigma_samples_per_plane = 8
mc_n_samples = 1000
mc_seed: int | None = 42
# GaussianMixture knobs — same defaults as `empyrean.propagate`
# since the ephemeris pipeline embeds a PropagationConfig and
# the C ABI requires the GM params be threaded through even
# when the uncertainty method isn't a mixture.
gm_threshold = 1.0
gm_max_depth = 3
gm_components_per_split = 3
if isinstance(uncertainty_method, (SigmaPoint, MonteCarlo)):
um_int = _DATACLASS_TO_INT[type(uncertainty_method)]
if isinstance(uncertainty_method, SigmaPoint):
sigma_n_sigma = uncertainty_method.n_sigma
sigma_samples_per_plane = uncertainty_method.samples_per_plane
else: # MonteCarlo
mc_n_samples = uncertainty_method.n_samples
mc_seed = uncertainty_method.seed
elif isinstance(uncertainty_method, str):
um_lookup = _UNCERTAINTY_METHOD_TO_INT.get(uncertainty_method.lower())
if um_lookup is None:
raise ValueError(f"unknown uncertainty method: {uncertainty_method}")
um_int = um_lookup
elif isinstance(uncertainty_method, UncertaintyMethod):
um_int = _UNCERTAINTY_METHOD_TO_INT[uncertainty_method]
elif isinstance(uncertainty_method, int):
um_int = uncertainty_method
else:
raise TypeError(
"uncertainty_method must be UncertaintyMethod, a SigmaPoint / "
"MonteCarlo dataclass, str, or int; got "
f"{type(uncertainty_method).__name__}"
)
# ── Call Rust ─────────────────────────────────────────────
result = _generate_ephemeris(
orbit_ids,
object_ids,
epochs_arr,
elements_arr,
covariances_arr,
has_cov_arr,
representations_arr,
frames_arr,
origins_arr,
a1s,
a2s,
a3s,
phot_h,
phot_g,
phot_model,
obs_codes,
obs_epochs,
obs_x,
obs_y,
obs_z,
obs_vx,
obs_vy,
obs_vz,
fm_int,
epsilon,
uncertainty_method=um_int,
non_grav_dts=non_grav_dts,
gm_threshold=gm_threshold,
gm_max_depth=gm_max_depth,
gm_components_per_split=gm_components_per_split,
sigma_n_sigma=sigma_n_sigma,
sigma_samples_per_plane=sigma_samples_per_plane,
mc_n_samples=mc_n_samples,
mc_seed=mc_seed,
# Thread the full nested EphemerisConfig (which embeds a full
# PropagationConfig) so light-time iteration limits, diagnostics
# toggles, integrator advanced knobs, and event-detection
# settings all reach the C ABI.
ephemeris_config_dict=config._to_wire_dict(),
)
# ── Build Ephemeris from result ──────────────────────────
#
# The C ABI's flat ephemeris dict carries: orbit_id, object_id,
# epoch, ra, dec, rho, vrho, vra, vdec, light_time, phase_angle,
# elongation, heliocentric_distance, mag, mag_sigma, obs_code, and —
# as of the parity extension — the local-horizon / sky-motion angles
# zenith_angle, azimuth, hour_angle, lunar_elongation, position_angle,
# sky_rate (all degrees; sky_rate is deg/day), NaN where the observer
# geometry made them unavailable.
#
# Aberrated state and per-row observation covariance are still NOT in
# the flat schema — those remain NaN / null placeholders here.
from empyrean.coordinates.coordinates import (
CartesianCoordinates,
SphericalCoordinates,
)
from empyrean.coordinates.enums import Frame, Origin
m = len(result["epoch"])
object_id_list = [s if s else None for s in result["object_id"]]
coordinates = SphericalCoordinates.from_kwargs(
epoch=np.asarray(result["epoch"]),
rho=np.asarray(result["rho"]),
lon=np.asarray(result["ra"]),
lat=np.asarray(result["dec"]),
vrho=np.asarray(result["vrho"]),
vlon=np.asarray(result["vra"]),
vlat=np.asarray(result["vdec"]),
frame=Frame.ICRF.value,
origin=result["obs_code"],
)
def _nullable_float(key: str) -> pa.Array | FloatArray:
arr: FloatArray = np.asarray(result[key], dtype=np.float64)
mask = np.isnan(arr)
if mask.any():
import pyarrow as pa
return pa.array(arr.tolist(), type=pa.float64(), mask=mask)
return arr
nan_col = np.full(m, np.nan)
# `aberrated_state` is `nullable=True` but its inner CartesianCoordinates
# still requires its `frame` StringAttribute to be set during quivr
# validation, even when every row is null. Provide an all-NaN
# placeholder with `Frame.ICRF` to satisfy the schema; the C ABI
# doesn't carry aberrated state, so callers should treat these as
# absent rather than meaningful values.
aberrated_state = CartesianCoordinates.from_kwargs(
epoch=np.asarray(result["epoch"]),
x=nan_col,
y=nan_col,
z=nan_col,
vx=nan_col,
vy=nan_col,
vz=nan_col,
frame=Frame.ICRF.value,
origin=[str(Origin.SSB)] * m,
)
ephemeris = Ephemeris.from_kwargs(
orbit_id=result["orbit_id"],
object_id=object_id_list,
obs_code=result["obs_code"],
coordinates=coordinates,
aberrated_state=aberrated_state,
light_time=_nullable_float("light_time"),
phase_angle=_nullable_float("phase_angle"),
elongation=_nullable_float("elongation"),
heliocentric_distance=_nullable_float("heliocentric_distance"),
mag=_nullable_float("mag"),
mag_sigma=_nullable_float("mag_sigma"),
zenith_angle=_nullable_float("zenith_angle"),
azimuth=_nullable_float("azimuth"),
hour_angle=_nullable_float("hour_angle"),
lunar_elongation=_nullable_float("lunar_elongation"),
position_angle=_nullable_float("position_angle"),
sky_rate=_nullable_float("sky_rate"),
)
return EphemerisResult(ephemeris=ephemeris, sensitivity=None)