Source code for madlib._sensor_collection
# Copyright (c) 2024 Massachusetts Institute of Technology
# SPDX-License-Identifier: MIT
from pathlib import Path
from typing import List, Sequence
import numpy as np
import yaml
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from numpy.typing import NDArray
from madlib._utils import MadlibException, sensor_yaml_schema
from ._observation import ObservationCollection, combineObsCollections
from ._satellite import Satellite
from ._sensor import GroundOpticalSensor, _Sensor
[docs]
class SensorException(Exception):
"""SensorException class"""
def __init__(self, message=None):
super().__init__(message)
[docs]
class SensorCollection:
"""Class containing multiple sensor objects that can generate a
comprehensive observing schedule and collate observations."""
[docs]
def __init__(self, sensorList: Sequence[_Sensor]):
"""Initialize SensorCollection class
Parameters
----------
sensorList : List[Sensor]
List of Sensors to include in observation network
"""
self.sensorList = list(sensorList)
self.numSensors = len(sensorList)
self.obsTimes: List[NDArray[np.float64]] | None = None
[docs]
@staticmethod
def paramsFromYAML(
yaml_file: str | Path,
):
"""Parse a YAML into sensor parameters, returned as a list of dicts"""
try:
with open(yaml_file, "r") as f:
sensor_data = yaml.safe_load(f)
validate(sensor_data, sensor_yaml_schema)
except ValidationError as e:
if e.message == "'sensor_list' is a required property":
msg = f"The sensor YAML file {yaml_file} must contain the top-level property 'sensor_list'."
else:
e_path = str(e.path)
section = "/".join(list(e_path))
msg = (
f"Error in the sensor YAML file [{yaml_file}] in the entry [{section}]: "
f"{e.message}"
)
raise MadlibException(msg) from None
sensor_data = sensor_data["sensor_list"]
return sensor_data
[docs]
@classmethod
def fromYAML(cls, yaml_file: str):
"""Instantiate a SensorCollection object from a YAML file
Parameters
----------
yaml_file : str
Path to YAML file defining the sensors in the collection
"""
sensor_data = cls.paramsFromYAML(yaml_file)
sensors = [GroundOpticalSensor(**params) for key, params in sensor_data.items()]
sensor_network = cls(sensors)
return sensor_network
[docs]
def generate_obs_timing(self, start: float, end: float):
"""Given a start time and an end time (in MJD), generate an
array of observation times (also in MJD) based on the sensors'
defined parameters.
Parameters
----------
start : float
Earliest possible observation timestamp (MJD)
end : float
Latest possible observation (MJD)
Raises
------
SensorException
The observation schedule has already been generated.
"""
if self.obsTimes is not None:
message = "The observation schedule has already been generated."
raise SensorException(message=message)
self.obsTimes = [
sensor.generate_obs_timing(start, end) for sensor in self.sensorList
]
[docs]
def add_sensor(self, sensor: _Sensor):
"""Add a new sensor to the existing collection, provided the
sensor timing has not already been generated.
Parameters
----------
sensor : _Sensor
The sensor object to add to the collection
"""
if self.obsTimes is not None:
message = (
"Cannot add new sensors to a SensorCollection "
"if observation timing has already been generated."
)
raise (SensorException(message=message))
self.sensorList.append(sensor)
self.numSensors = len(self.sensorList)
[docs]
def observe(self, target_satellite: Satellite) -> ObservationCollection:
"""Given a madlib.Satellite, generate an ObservationCollection
Parameters
----------
target_satellite : Satellite
madlib.Satellite is a class for propagating a satellite
Returns
-------
ObservationCollection
Observations of a satellite from multiple sensors, combined into a single object.
Raises
------
SensorException
The observation schedule has already been generated.
"""
if self.obsTimes is None:
message = "obsTimes is None. Did you forget to generate obs timing first?"
raise SensorException(message=message)
else:
obsCollections: List[ObservationCollection] = [
sensor.observe(target_satellite, obstimes)
for sensor, obstimes in zip(self.sensorList, self.obsTimes)
]
observations: ObservationCollection = combineObsCollections(obsCollections)
return observations