# Copyright (c) 2024 Massachusetts Institute of Technology
# SPDX-License-Identifier: MIT
from abc import abstractmethod
from typing import Protocol, Tuple
import astroforge.coordinates as afc
import numpy as np
from astroforge import R_sun, R_earth
from numpy.typing import NDArray
from madlib._utils import MadlibException
from ._observation import Observation, ObservationCollection
from ._satellite import Satellite
from ._utils import calc_separation_angle
[docs]
class _Sensor(Protocol):
"""Class is not yet implemented"""
[docs]
@abstractmethod
def generate_obs_timing(self, start: float, end: float) -> NDArray[np.float64]:
"""Given a start time and an end time (in MJD) as well as the sensor's
defined parameters, generate an array of observation times (also in MJD).
Parameters
----------
start : float
Earliest possible observation timestamp (in MJD)
end : float
Latest possible observation timestamp (in MJD)
Returns
-------
NDArray[np.float64]
An array of times (in MJD) at which an observation will be made.
Raises
------
NotImplementedError
"""
raise NotImplementedError
[docs]
@abstractmethod
def observe(self, sat: Satellite, times: float | NDArray[np.float64]):
"""Observe a given satellite with this sensor at the times given.
Parameters
----------
sat : Satellite
Satellite object to observe
times : float | NDArray[np.float64]
Time(s) to observe the satellite. Specified as MJD in the UTC system.
Raises
------
NotImplementedError
"""
raise NotImplementedError
[docs]
class _OpticalSensor(_Sensor):
"""Generic class for optical sensors"""
id: str | None
# site metric accuracy
dra: float # arcsec
ddec: float # arcsec
# timing
obs_per_collect: int | tuple[int, int]
obs_time_spacing: float
collect_gap_mean: float
collect_gap_std: float
# observation limits
obs_limits: dict | None
[docs]
def __init__(
self,
dra: float,
ddec: float,
collect_gap_mean: float,
obs_limits: dict | None = None,
collect_gap_std: float = 0.0,
obs_per_collect: int | tuple[int, int] = 1,
obs_time_spacing: float = 1.0,
students_dof: int | None = None,
id: str | None = None,
cross_tag: Satellite | None = None,
cross_tag_limit_arcsec: float = 100.0,
):
"""Construct an _OpticalSensor object
Parameters
----------
dra : float
Metric accuracy in the right ascension direction
ddec : float
Metric accuracy in the declination direction
collect_gap_mean : float
Average time (seconds) between collects on a satellite
obs_limits : dict | None, optional
Dictionary of the limits on what this sensor can observe. This can be
obvious (e.g. can't observe through the Earth), or based on sensor
sensitivity (e.g. range limit for radar), by default None
collect_gap_std : float, optional
Standard deviation of times (seconds) between collects on a satellite, by default 0.0
obs_per_collect : int | tuple[int, int], optional
Typical number of observations per collect. This can be a constant, or
a tuple of (min, max) for randomly sampling, by default 1
obs_time_spacing : float, optional
Time between observations within a collect, by default 1.0
id : str | None, optional
Unique Sensor ID, by default None
cross_tag : madlib.Satellite | None, optional
An optional nearby satellite that the sensor has misattributed
as the target, by default None (no misattribution)
cross_tag_limit_arcsec : float, optional
The maximum allowed separation (in arcseconds) between the target's
expected position and the cross_tag satellite. At further separations,
observations of the misattributed satellite will be ignored.
By default 100.0
students_dof : int | None, optional
Degrees of freedom to use for T-distribution when generating noise, by default None (Gaussian)
"""
self.dra = dra
self.ddec = ddec
self.obs_per_collect = obs_per_collect
self.obs_time_spacing = obs_time_spacing
self.collect_gap_mean = collect_gap_mean
self.collect_gap_std = collect_gap_std
self.obs_limits = obs_limits
self.id = id
self.cross_tag = cross_tag
self.cross_tag_limit_arcsec = cross_tag_limit_arcsec
self.students_dof = students_dof
[docs]
def generate_obs_timing(self, start: float, end: float) -> NDArray[np.float64]:
"""Randomly generate a realistic time sampling of observations.
Parameters
----------
start : float
Timestamp (MJD) corresponding to the beginning of the observing period
end : float
Timestamp (MJD) corresponding to the end of the observing period
Returns
-------
NDArray[np.float64]
Array of MJD timestamps corresponding to typical observing times on a satellite
Raises
------
ValueError
Start timestamp must be before end timestamp
"""
if start >= end:
raise ValueError(f"start ({start}) must be before end ({end})")
def gap_gen() -> float:
return self.collect_gap_mean + self.collect_gap_std * np.random.randn()
istuple = isinstance(self.obs_per_collect, (tuple, list))
def nobs_gen() -> int:
if istuple:
return np.random.randint(
self.obs_per_collect[0], self.obs_per_collect[1] + 1 # type: ignore
)
else:
return self.obs_per_collect # type: ignore
# --- generate collect times
collect_times = []
latest = start
first = True
while latest < end:
# generate a gap time at random
gap = gap_gen() / 86400
# if this is the first gap, randomly scale it between 0-1
# (otherwise the first observation will be biased late)
if first:
gap *= np.random.rand()
first = False
# add the gap time to the latest collect time, and append to the
# list if it isn't too late
latest += gap
if latest < end:
collect_times.append(latest)
# --- generate observation times within each collect
obs_times = []
for coll in collect_times:
# generate a (possibly) random number of observations in this collect
nobs = nobs_gen()
# compute the time of each observation given the collect start time
times = coll + (np.arange(nobs) * self.obs_time_spacing) / 86400
# handle edge effect at the end of the observing period
# all observations in this collect must be before the end, otherwise
# none are saved
if (times < end).all():
obs_times.append(times)
if len(obs_times) > 0:
obs_times = np.hstack(obs_times)
else:
obs_times = np.array([])
return obs_times
[docs]
def validate_limits(self, obs: Observation) -> bool:
"""Determine whether Observation is possible based on the sensor limits.
Parameters
----------
obs : Observation
madlib.Observation class for holding observables
Returns
-------
bool
Whether or not the Observation is possible
"""
if not self.obs_limits:
return True
flag = True
obsdict = obs.__dict__
for key, val in self.obs_limits.items():
if key in obsdict:
if obsdict[key] is not None:
flag = flag and obsdict[key] > val[0] and obsdict[key] < val[1]
else:
msg = f"ERROR: Invalid observation, value of {key} is None"
raise MadlibException(msg)
else:
msg = (
f"ERROR: {key} is an invalid variable for observation limits. "
"Check the YAML configuration for sensor {self.id}"
)
raise MadlibException(msg)
return flag
[docs]
class GroundOpticalSensor(_OpticalSensor):
"""Class for modeling the observing of a satellite with an optical sensor"""
# site geography
_site_reported_itrs: NDArray[np.float64]
lat: float
lon: float
alt: float
# site metric accuracy
dra: float # arcsec
ddec: float # arcsec
# timing
obs_per_collect: int | tuple[int, int]
collect_gap_mean: float
collect_gap_std: float
# observation limits
obs_limits: dict | None
[docs]
def __init__(
self,
lat: float,
lon: float,
alt: float,
dra: float,
ddec: float,
collect_gap_mean: float,
obs_limits: dict | None = None,
collect_gap_std: float = 0.0,
obs_per_collect: int | tuple[int, int] = 1,
obs_time_spacing: float = 1.0,
id: str | None = None,
lat_truth: float | None = None,
lon_truth: float | None = None,
alt_truth: float | None = None,
cross_tag: Satellite | None = None,
cross_tag_limit_arcsec: float = 100.0,
**extras,
):
"""Construct a GroundOpticalSensor object
Parameters
----------
lat : float
Site geodetic latitude (deg)
lon : float
Site geodetic longitude (deg)
alt : float
Site altitude above WGS-84 reference ellipsoid
dra : float
Metric accuracy in the right ascension direction
ddec : float
Metric accuracy in the declination direction
collect_gap_mean : float
Average time (seconds) between collects on a satellite
obs_limits : dict | None, optional
Dictionary of the limits on what this sensor can observe. This can be
obvious (e.g. can't observe through the Earth), or based on sensor
sensitivity (e.g. range limit for radar), by default None
collect_gap_std : float, optional
Standard deviation of times (seconds) between collects on a satellite, by default 0.0
obs_per_collect : int | tuple[int, int], optional
Typical number of observations per collect. This can be a constant, or
a tuple of (min, max) for randomly sampling, by default 1
obs_time_spacing : float, optional
Time between observations within a collect, by default 1.0
id : str | None, optional
Unique Sensor ID, by default None
lat_truth : float | None, optional
If not None, the latitude specified by <lat> is reported by
the sensor as its position, but it is incorrect. The actual
position is specified by <lat_truth>. By default None.
lon_truth : float | None, optional
If not None, the longitude specified by <lon> is reported by
the sensor as its position, but it is incorrect. The actual
position is specified by <lon_truth>. By default None.
alt_truth : float | None, optional
If not None, the altitude specified by <alt> is reported by
the sensor as its position, but it is incorrect. The actual
position is specified by <alt_truth>. By default None.
cross_tag : madlib.Satellite | None, optional
An optional nearby satellite that the sensor has misattributed
as the target, by default None (no misattribution)
cross_tag_limit_arcsec : float, optional
The maximum allowed separation (in arcseconds) between the target's
expected position and the cross_tag satellite. At further separations,
observations of the misattributed satellite will be ignored.
By default 100.0
"""
super().__init__(
dra=dra,
ddec=ddec,
collect_gap_mean=collect_gap_mean,
obs_limits=obs_limits,
collect_gap_std=collect_gap_std,
obs_per_collect=obs_per_collect,
obs_time_spacing=obs_time_spacing,
id=id,
cross_tag=cross_tag,
cross_tag_limit_arcsec=cross_tag_limit_arcsec,
)
self.lat = lat
self.lon = lon
self.alt = alt
self._site_reported_itrs = afc.LatLonAltToITRS(lat, lon, alt)
self.lat_truth = lat
self.lon_truth = lon
self.alt_truth = alt
if lat_truth is not None:
self.lat_truth = lat_truth
if lon_truth is not None:
self.lon_truth = lon_truth
if alt_truth is not None:
self.alt_truth = alt_truth
self._site_truth_itrs = afc.LatLonAltToITRS(
self.lat_truth, self.lon_truth, self.alt_truth
)
[docs]
def observe(
self,
target_satellite: Satellite,
times: float | NDArray[np.float64] | Tuple[float, float],
) -> ObservationCollection:
"""Observe a satellite with this sensor model at the times given. Observations
are computed in three forms:
- truth: Measurements that would be returned if there were no random or
systematic errors and knowledge of the orbit and maneuvers was perfect.
These are the observations you would see in a perfect world.
- expected: Measurements without random noise, but also without maneuver knowledge
and with any systematic errors on sensor position and/or satellite orbit.
These are the observations that you THINK you should get, given your actual knowledge
of the system.
- measured: The actual output of the simulated system, including all random and
systematic sources of error and following the target satellite's full trajectory.
This can also contain cross-tag events.
Parameters
----------
target_satellite : Satellite
madlib.Satellite object to observe
times : float | NDArray[np.float64] | Tuple[float, float]
Time(s) to observe the satellite. Specified as MJD in the UTC system.
- If a single float is given, observation will be made at just that time.
- If a numpy array is given, observations will be made at each time in the array.
- If a tuple of two floats is given, they will represent the start and end
of observations, and observation times will be generated accordingly
Returns
-------
ObservationCollection
A container object holding a realistic observation of the satellite
given the sensor noise parameters and the "true" observation that
excludes all noise sources.
"""
# Handle scalar and tuple time inputs
if isinstance(times, (float, int)):
times = np.asarray([times])
if isinstance(times, tuple):
start_mjd = min(times)
end_mjd = max(times)
times = self.generate_obs_timing(start_mjd, end_mjd)
num_obs = len(times)
# Return empty ObservationCollection if there are no observation times
if num_obs == 0:
pos_observed = np.array([])
pos_truth = np.array([])
pos_expected = np.array([])
return ObservationCollection(
pos_observed=pos_observed,
pos_truth=pos_truth,
pos_expected=pos_expected,
)
# Propagate satellite to the desired times, with and without maneuvers
x_target, v_target = target_satellite.propagate(times, use_true_orbit=True)
x_target_expected, v_target_expected = target_satellite.propagate(
times,
ignore_maneuvers=True,
)
# Coordinate conversion to observables
# (observables for an optical sensor are RA/Dec)
# First rotate the site's reported and true locations into TETED
x_site_reported, v_site_reported = self._site_loc_TETED(times)
x_site_truth, v_site_truth = self._site_loc_TETED(times, use_true_location=True)
# Now compute the true and expected RA/Dec, range, and range_rate
(fp_state_truth, r_truth, r_rate_truth) = afc.PosVelToFPState(
x_target, v_target, x_site_truth, v_site_truth
)
ra_truth = fp_state_truth[:, 0] * 180 / np.pi
dec_truth = fp_state_truth[:, 1] * 180 / np.pi
az_truth, el_truth = self._eci_to_az_el(x_target, times)
(fp_state_expected, r_expected, r_rate_expected) = afc.PosVelToFPState(
x_target_expected,
v_target_expected,
x_site_reported,
v_site_reported,
)
ra_expected = fp_state_expected[:, 0] * 180 / np.pi
dec_expected = fp_state_expected[:, 1] * 180 / np.pi
az_expected, el_expected = self._eci_to_az_el(x_target_expected, times)
# Compute solar elevation at observation times (assumed invariant to expectation (small errors))
x_sun = np.array([R_sun(_t) for _t in times])
az_sun, el_sun = self._eci_to_az_el(x_sun, times)
# Collect into true (what you SHOULD see) and expected (what you THINK you'll see) observations
obs_truth: np.ndarray[Observation, np.dtype[np.float64]] = np.array(
[
Observation(
mjd=times[n],
ra=ra_truth[n],
dec=dec_truth[n],
az=az_truth[n],
el=el_truth[n],
sun_el=el_sun[n],
sensor_id=self.id,
)
for n in range(num_obs)
]
)
obs_expected: np.ndarray[Observation, np.dtype[np.float64]] = np.array(
[
Observation(
mjd=times[n],
ra=ra_expected[n],
dec=dec_expected[n],
az=az_expected[n],
el=el_expected[n],
sun_el=el_sun[n],
sensor_id=self.id,
)
for n in range(num_obs)
]
)
### Actual/Observed measurements
# Apply misattributed observations (cross-tags), if requested
if self.cross_tag is not None:
x_target, v_target = self.cross_tag.propagate(times, use_true_orbit=True)
# Coordinate conversion to observables using target's and sensor's true positions
(fp_state_measured, r_measured, r_rate_measured) = afc.PosVelToFPState(
x_target, v_target, x_site_truth, v_site_truth
)
ra_measured = fp_state_measured[:, 0] * 180 / np.pi
dec_measured = fp_state_measured[:, 1] * 180 / np.pi
# If this isn't a cross-tag, then the measurement is just the noise-addled truth
else:
r_measured = r_truth
ra_measured = ra_truth
dec_measured = dec_truth
# Apply noise sources
ra_delta = self.dra / 3600 * np.random.randn(num_obs)
dec_delta = self.ddec / 3600 * np.random.randn(num_obs)
ra_noisy = ra_measured + ra_delta
dec_noisy = dec_measured + dec_delta
x_noisy = np.zeros(x_target.shape)
for n in range(num_obs):
x_noisy[n] = x_site_truth[n] + r_measured[n] * spherical_to_cartesian(
ra_noisy[n], dec_noisy[n]
)
az_noisy, el_noisy = self._eci_to_az_el(x_noisy, times)
# Collect actual observables (what you DO see)
obs_measured: np.ndarray[Observation, np.dtype[np.float64]] = np.array(
[
Observation(
mjd=times[n],
ra=ra_noisy[n],
dec=dec_noisy[n],
az=az_noisy[n],
el=el_noisy[n],
range_=None, # N/A for optical sensors
range_rate=None, # N/A for optical sensors
sun_el=el_sun[n],
sensor_id=self.id,
)
for n in range(num_obs)
]
)
# Remove observations that are not within the sensor limits
valid = np.array([self.validate_limits(x) for x in obs_truth])
obs_measured = obs_measured[valid]
obs_truth = obs_truth[valid]
obs_expected = obs_expected[valid]
observations = ObservationCollection(
pos_observed=obs_measured,
pos_truth=obs_truth,
pos_expected=obs_expected,
)
return observations
[docs]
def _eci_to_az_el(
self, x: NDArray[np.float64], mjd: NDArray[np.float64]
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Convert TETED positions to Az/El from the site.
Parameters
----------
x : NDArray[np.float64]
TETED positions
mjd : NDArray[np.float64]
timestamps (mjd)
Returns
-------
tuple[NDArray[np.float64], NDArray[np.float64]]
Az/El results
"""
# rotate TETED -> ITRS -> SEZ -> Az/El
out = []
for n in range(len(x)):
# breakpoint()
xitrs = afc.TETEDToITRS(mjd[n], x[n])
xsez = afc.ITRSToSEZ(xitrs, self._site_reported_itrs, self.lat, self.lon)
az, el, _ = afc.SEZToAzElRange(xsez)
out.append([az, el])
out = np.vstack(out)
return out[:, 0], out[:, 1]
[docs]
def _site_loc_TETED(
self,
times: NDArray[np.float64],
use_true_location: bool = False,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Rotate the site location to TETED for a given set of times.
Parameters
----------
times : NDArray[np.float64]
timestamps at which to compute the site location
use_true_location : bool, optional
If True, use the sensor's true position (as opposed to reported)
in the calculation. By default False.
Returns
-------
tuple[NDArray[np.float64], NDArray[np.float64]]
site locations for timestamps
"""
if use_true_location:
site_itrs = self._site_truth_itrs
else:
site_itrs = self._site_reported_itrs
temp = np.vstack(
[
afc.PosVelConversion(
afc.ITRSToTETED, times[n], site_itrs, np.zeros((3,))
)
for n in range(len(times))
]
)
xsite = temp[0::2]
vsite = temp[1::2]
return xsite, vsite
[docs]
class SpaceOpticalSensor(_OpticalSensor):
"""Class for modeling observations with an optical sensor in Earth orbit."""
# site metric accuracy
dra: float # arcsec
ddec: float # arcsec
# timing
obs_per_collect: int | tuple[int, int]
collect_gap_mean: float
collect_gap_std: float
# observation limits
obs_limits: dict | None
[docs]
def __init__(
self,
sensor_satellite: Satellite,
dra: float,
ddec: float,
collect_gap_mean: float,
sensor_satellite_truth: Satellite | None = None,
obs_limits: dict | None = None,
collect_gap_std: float = 0.0,
obs_per_collect: int | tuple[int, int] = 1,
obs_time_spacing: float = 1.0,
id: str | None = None,
cross_tag: Satellite | None = None,
cross_tag_limit_arcsec: float = 100.0,
**extras,
):
"""Construct a SpaceOpticalSensor object
Parameters
----------
sensor_satellite : madlib.Satellite
Satellite object governing sensor motion
dra : float
Metric accuracy in the right ascension direction
ddec : float
Metric accuracy in the declination direction
collect_gap_mean : float
Average time (seconds) between collects on a satellite
sensor_satellite_truth: madlib.Satellite | None, optional
If not None, this defines the *actual* orbit of the satellite, turning
the <satellite> argument into the *expected* orbit. This will affect
residual calculations. By default, None.
obs_limits : dict | None, optional
Dictionary of the limits on what this sensor can observe. This can be
obvious (e.g. can't observe through the Earth), or based on sensor
sensitivity (e.g. range limit for radar), by default None
collect_gap_std : float, optional
Standard deviation of times (seconds) between collects on a satellite, by default 0.0
obs_per_collect : int | tuple[int, int], optional
Typical number of observations per collect. This can be a constant, or
a tuple of (min, max) for randomly sampling, by default 1
obs_time_spacing : float, optional
Time between observations within a collect, by default 1.0
id : str | None, optional
Unique Sensor ID, by default None
cross_tag : madlib.Satellite | None, optional
An optional nearby satellite that the sensor has misattributed
as the target, by default None (no misattribution)
cross_tag_limit_arcsec : float, optional
The maximum allowed separation (in arcseconds) between the target's
expected position and the cross_tag satellite. At further separations,
observations of the misattributed satellite will be ignored.
By default 100.0
"""
super().__init__(
dra=dra,
ddec=ddec,
collect_gap_mean=collect_gap_mean,
obs_limits=obs_limits,
collect_gap_std=collect_gap_std,
obs_per_collect=obs_per_collect,
obs_time_spacing=obs_time_spacing,
id=id,
cross_tag=cross_tag,
cross_tag_limit_arcsec=cross_tag_limit_arcsec,
)
self.sensor_satellite = sensor_satellite
self.sensor_satellite_truth = sensor_satellite_truth
[docs]
def observe(
self,
target_satellite: Satellite,
times: float | NDArray[np.float64] | Tuple[float, float],
):
"""Observe a satellite with this sensor model at the times given. Observations
are computed in three forms:
- truth: Measurements that would be returned if there were no random or
systematic errors and knowledge of the target's orbit and maneuvers was perfect.
These are the observations you would see in a perfect world.
- expected: Measurements without random noise, but also without maneuver knowledge
and with any systematic errors on sensor position and/or satellite orbit.
These are the observations that you THINK you should get, given your actual knowledge
of the system.
- measured: The actual output of the simulated system, including all random and
systematic sources of error and following the target satellite's full trajectory.
This can also contain cross-tag events.
Parameters
----------
target_satellite : Satellite
madlib.Satellite object to observe
times : float | NDArray[np.float64] | Tuple[float, float]
Time(s) to observe the satellite. Specified as MJD in the UTC system.
- If a single float is given, observation will be made at just that time.
- If a numpy array is given, observations will be made at each time in the array.
- If a tuple of two floats is given, they will represent the start and end
of observations, and observation times will be generated accordingly
Returns
-------
ObservationCollection
A container object holding a realistic observation of the satellite
given the sensor noise parameters and the "true" observation that
excludes all noise sources.
"""
# Handle scalar and tuple time inputs
if isinstance(times, (float, int)):
times = np.asarray([times])
if isinstance(times, tuple):
start_mjd = min(times)
end_mjd = max(times)
times = self.generate_obs_timing(start_mjd, end_mjd)
num_obs = len(times)
# Return empty ObservationCollection if there are no observation times
if num_obs == 0:
pos_observed = np.array([])
pos_truth = np.array([])
pos_expected = np.array([])
return ObservationCollection(
pos_observed=pos_observed,
pos_truth=pos_truth,
pos_expected=pos_expected,
)
# Propagate sensor expected position to the desired times
x_sensor_reported, v_sensor_reported = self.sensor_satellite.propagate(times)
# If a true orbit has been given, propagate it as well
if self.sensor_satellite_truth is not None:
x_sensor_truth, v_sensor_truth = self.sensor_satellite_truth.propagate(
times
)
else:
x_sensor_truth = x_sensor_reported
v_sensor_truth = v_sensor_reported
# Propagate satellite's true and expected orbits to the desired times
x_target, v_target = target_satellite.propagate(times, use_true_orbit=True)
x_target_expected, v_target_expected = target_satellite.propagate(
times, ignore_maneuvers=True
)
# Compute true and expected RA/Dec, range, and range_rate
(fp_state_truth, r_truth, r_rate_truth) = afc.PosVelToFPState(
x_target,
v_target,
x_sensor_truth,
v_sensor_truth,
)
ra_truth = fp_state_truth[:, 0] * 180 / np.pi
dec_truth = fp_state_truth[:, 1] * 180 / np.pi
(fp_state_expected, r_expected, r_rate_expected) = afc.PosVelToFPState(
x_target_expected,
v_target_expected,
x_sensor_reported,
v_sensor_reported,
)
ra_expected = fp_state_expected[:, 0] * 180 / np.pi
dec_expected = fp_state_expected[:, 1] * 180 / np.pi
# Compute solar separation angle at observation times
# (using the sensor's true orbit)
x_sun = np.array([R_sun(_t) for _t in times])
sun_angle_deg = calc_separation_angle(
x_sun - x_sensor_truth,
x_target - x_sensor_truth,
in_deg=True,
)
# Collect into true (what you SHOULD see) and expected (what you THINK you'll see) observations
obs_truth: np.ndarray[Observation, np.dtype[np.float64]] = np.array(
[
Observation(
mjd=times[n],
ra=ra_truth[n],
dec=dec_truth[n],
sun_separation=sun_angle_deg[n],
sensor_id=self.id,
)
for n in range(num_obs)
]
)
obs_expected: np.ndarray[Observation, np.dtype[np.float64]] = np.array(
[
Observation(
mjd=times[n],
ra=ra_expected[n],
dec=dec_expected[n],
sun_separation=sun_angle_deg[n],
sensor_id=self.id,
)
for n in range(num_obs)
]
)
### Actual/Observed Measurements
# Apply misattributed observations, if requested
if self.cross_tag is not None:
x_target, v_target = self.cross_tag.propagate(times, use_true_orbit=True)
# Compute (observed) RA/Dec, range, and range_rate
(fp_state_obs, r_obs, r_rate_obs) = afc.PosVelToFPState(
x_target, v_target, x_sensor_truth, v_sensor_truth
)
ra_measured = fp_state_obs[:, 0] * 180 / np.pi
dec_measured = fp_state_obs[:, 1] * 180 / np.pi
# If this isn't a cross-tag, then the measurement is just the noise-addled truth
else:
ra_measured = ra_truth
dec_measured = dec_truth
# Apply noise sources
ra_delta = self.dra / 3600 * np.random.randn(num_obs)
dec_delta = self.ddec / 3600 * np.random.randn(num_obs)
ra_noisy = ra_measured + ra_delta
dec_noisy = dec_measured + dec_delta
# Sun angle stays the same (we want to know if it's *actually*
# too bright to observe)
# Collect actual observables (what you DO see)
obs_measured: np.ndarray[Observation, np.dtype[np.float64]] = np.array(
[
Observation(
mjd=times[n],
ra=ra_noisy[n],
dec=dec_noisy[n],
sun_separation=sun_angle_deg[n],
sensor_id=self.id,
)
for n in range(num_obs)
]
)
# Remove observations that are not within the sensor limits
## Any viewing vector that is obscured by Earth is not valid
sensor_dist = np.linalg.norm(x_sensor_truth, axis=1)
viewing_vector = x_target - x_sensor_truth
### Calculate the angle from center of Earth, to sensor, to target.
### Note: If this angle is obtuse, it is automatically valid.
viewing_angle = np.pi - np.abs(
calc_separation_angle(x_sensor_truth, viewing_vector)
)
### Calculate minimum height above Earth for viewing vector
min_view_height = sensor_dist * np.sin(viewing_angle)
view_valid = (min_view_height > R_earth) | (viewing_angle > np.pi / 2)
## Check the other sensor limits, then combine
valid = view_valid * np.array([self.validate_limits(x) for x in obs_truth])
obs_measured = obs_measured[valid]
obs_truth = obs_truth[valid]
obs_expected = obs_expected[valid]
observations = ObservationCollection(
pos_observed=obs_measured,
pos_truth=obs_truth,
pos_expected=obs_expected,
)
return observations
[docs]
def pos_to_lat_lon(
pos: NDArray[np.float64], times: NDArray[np.float64]
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Compute latitude & longitude for given position(s) and time(s).
Parameters
----------
pos : NDArray[np.float64]
position vector
times : NDArray[np.float64]
array of timestamps
Returns
-------
tuple[NDArray[np.float64], NDArray[np.float64]]
computed latitudes and longitudes
Raises
------
ValueError
The number of positions and times must be equal (for now)
"""
# TODO(jake): refactor this code so as not to make the assumption that pos is a 2D array
if len(pos) != len(times):
raise ValueError("The number of positions and times must be equal (for now)")
N = len(pos)
# first rotate from TETED to ITRS
pos_itrs = np.vstack([afc.TETEDToITRS(times[n], pos[n]) for n in range(N)])
# compute lat/lon/alt from ITRS position
lla = np.vstack(
[afc.ITRSToLatLonAlt(pos_itrs[n]) for n in range(N)], dtype=np.float64
)
return lla[:, 0], lla[:, 1]
[docs]
def spherical_to_cartesian(ra: float, dec: float) -> NDArray[np.float64]:
"""Convert spherical coordinates (ra, dec) to cartesian coordinates.
Parameters
----------
ra : float
Right Ascension
dec : float
Declination
Returns
-------
NDArray[np.float64]
Cartesian coordinates
"""
ra *= np.pi / 180
dec *= np.pi / 180
return np.array([np.cos(dec) * np.cos(ra), np.cos(dec) * np.sin(ra), np.sin(dec)])