"""Module to parse the IONosphere map EXchange (IONEX) data format,
as described in Schaer and Gurtner (1998)"""
from __future__ import annotations
import gzip
import io
from pathlib import Path
from typing import Any, NamedTuple, TextIO
import astropy.units as u
import numpy as np
from astropy.time import Time
from numpy.typing import NDArray
from unlzw3 import unlzw
from spinifex.exceptions import IonexError
from spinifex.ionospheric.tec_data import IonexOptions
from spinifex.times import get_unique_days
[docs]
class IonexData(NamedTuple):
"""Object containing all necessary information from Ionex data"""
[docs]
lons: NDArray[np.float64]
"""array with available longitude values (degrees)"""
[docs]
lats: NDArray[np.float64]
"""array with available latitude values (degrees)"""
"""available times"""
"""dimension of the heights (usually 1)"""
"""available heights (km)"""
[docs]
tec: NDArray[np.float64]
"""array with tecvalues times x lons x lats (TECU)"""
[docs]
rms: NDArray[np.float64]
"""array with rms of tecvalues times x lons x lats (TECU, if available, nan otherwise)"""
[docs]
def read_ionex(
ionex_filename: Path,
next_day_ionex_filename: Path | None = None,
options: IonexOptions | None = None,
) -> IonexData:
"""Read and parse a ionex file. Returns a ionex object.
Parameters
----------
ionex_filename : str
name of the ionex data file
next_day_ionex_filename : str| None
name of the ionex data file for the next day (to remove midnight jumps)
options : IonexOptions
Options for the ionex model.
Returns
-------
IonexData
ionex object with data and grid
"""
if ionex_filename.suffix == ".gz":
with gzip.open(ionex_filename, "rt", encoding="utf-8") as file_buffer:
ionex = _read_ionex_data(file_buffer, options=options)
elif ionex_filename.suffix == ".Z":
data = unlzw(ionex_filename.read_bytes()).decode("utf-8")
with io.StringIO(data) as file_buffer:
ionex = _read_ionex_data(file_buffer, options=options)
else:
with ionex_filename.open(encoding="utf-8") as file_buffer:
ionex = _read_ionex_data(file_buffer, options=options)
if next_day_ionex_filename is not None:
if next_day_ionex_filename.suffix == ".gz":
with gzip.open(
next_day_ionex_filename, "rt", encoding="utf-8"
) as file_buffer:
next_day_ionex = _read_ionex_data(file_buffer, options=options)
elif next_day_ionex_filename.suffix == ".Z":
data = unlzw(next_day_ionex_filename.read_bytes()).decode("utf-8")
with io.StringIO(data) as file_buffer:
next_day_ionex = _read_ionex_data(file_buffer, options=options)
else:
with next_day_ionex_filename.open(encoding="utf-8") as file_buffer:
next_day_ionex = _read_ionex_data(file_buffer, options=options)
ionex = _replace_midnight_data(ionex, next_day_ionex)
return ionex
[docs]
def _replace_midnight_data(ionex: IonexData, next_day_ionex: IonexData) -> IonexData:
"""mtigate jumps in tec value at midnight by inserting the tec value of the next day
Parameters
----------
ionex : IonexData
ionex data object
next_day_ionex : IonexData
ionex data of the next day
Returns
-------
IonexData
ionex data object with the data of last timeslot replaced by the data of the next day
"""
tec = ionex.tec
tec_error = ionex.rms
tmidx = np.where(np.isclose(ionex.times.mjd - next_day_ionex.times.mjd[0], 0, 1e-6))
tec[tmidx] = next_day_ionex.tec[0]
tec_error[tmidx] = next_day_ionex.rms[0]
return IonexData(
tec=tec,
rms=tec_error,
lons=ionex.lons,
lats=ionex.lats,
dims=ionex.dims,
h=ionex.h,
times=ionex.times,
)
[docs]
def _fill_data_record(
data: NDArray[np.float64],
filep: TextIO,
stop_label: str,
timeidx: int,
ionex_header: IonexHeader,
) -> None:
"""Helper function to parse a data block of a single map in ionex.
Puts filepointer to the end of the map
Parameters
----------
data : NDArray
pre allocated array to store the datablock
filep : TextIO
_description_
stop_label : str
end of the data block indicator
timeidx : int
index of time of the data block
ionex_header : namedtuple
header information
"""
line = filep.readline() # read EPOCH (not needed since we have the index)
# TODO: Properly type tec
tec: list[Any] = []
lonidx = 0
latidx = 0
# TODO: Replace magic numbers with named constants
for line in filep:
label = line[60:-1]
if stop_label in label:
if tec:
tec_array = np.array(tec) * ionex_header.mfactor
data[timeidx, lonidx:, latidx] = tec_array
return
if "LAT/LON1/LON2/DLON/H" in label:
if tec:
tec_array = np.array(tec) * ionex_header.mfactor
data[timeidx, lonidx:, latidx] = tec_array
tec = []
record = line[:60]
lat, lon1, _, _, _ = (float(record[i : i + 6]) for i in range(2, 32, 6))
latidx = int(np.argmin(np.abs(ionex_header.lats - lat)))
lonidx = int(np.argmin(np.abs(ionex_header.lons - lon1)))
else:
record = line[:-1]
tec += [
float(record[i : i + 5])
for i in range(0, len(record), 5)
if record[i : i + 5].strip()
]
[docs]
def _read_ionex_data(filep: TextIO, options: IonexOptions | None = None) -> IonexData:
"""This function parses the IONEX file.
Some fixed structure (like data records being strings of exactly 80 characters) of the file
is assumed. This structure is described in Schaer and Gurtner (1998).
Parameters
----------
filep : TextIO
pointer to an ionex file
options : IonexOptions | None, optional
options for ionex model, by default None
Returns
-------
IonexData
ionex object
"""
if options is None:
options = IonexOptions()
ionex_header = _read_ionex_header(filep)
tecarray = np.zeros(
ionex_header.times.shape + ionex_header.lons.shape + ionex_header.lats.shape,
dtype=float,
)
rmsarray = np.full(tecarray.shape, np.nan)
for line in filep:
# _read_ionex_header should have put the filep at the end of the header
label = line[60:-1]
record = line[:60]
if "START OF TEC MAP" in label:
timeidx = int(record) - 1
_fill_data_record(tecarray, filep, "END OF TEC MAP", timeidx, ionex_header)
if "START OF RMS MAP" in label:
timeidx = int(record) - 1
_fill_data_record(rmsarray, filep, "END OF RMS MAP", timeidx, ionex_header)
if options.prefix == "uqr" and options.correct_uqrg_rms:
# apply uqr correction zhao et al. (2021)
rmsarray = rmsarray - 6
rmsarray[rmsarray < 1] = 1
return IonexData(
lons=ionex_header.lons,
lats=ionex_header.lats,
times=ionex_header.times,
dims=ionex_header.dims,
h=ionex_header.h,
tec=tecarray,
rms=rmsarray,
)
[docs]
def unique_days_from_ionex(ionex_data: IonexData | list[IonexData]) -> Time:
"""Get unique days from a ionex object or list of ionex objects.
Parameters
----------
ionex_data : IonexData | list[IonexData]
ionex object or list of ionex objects
Returns
-------
Time
unique days
"""
# Get first MJD of each ionex object
# This avoids issues with midnight crossing
if isinstance(ionex_data, IonexData):
time_jd_array = ionex_data.times.sort().mjd[0]
else:
time_list: list[NDArray[np.int64]] = []
for ionex in ionex_data:
ionex_time = ionex.times.sort().mjd[0]
time_list.append(ionex_time)
time_jd_array = np.array(time_list)
times = Time(
time_jd_array,
format="mjd",
)
return get_unique_days(times)
[docs]
def unique_days_from_ionex_files(ionex_files: list[Path] | Path) -> Time:
"""Get unique days from a list of ionex files.
Parameters
----------
ionex_files : list[Path]
list of ionex files
Returns
-------
Time
unique days
"""
if isinstance(ionex_files, Path):
ionex_data = read_ionex(ionex_files)
return unique_days_from_ionex(ionex_data)
ionex_data_list = [read_ionex(ionex_file) for ionex_file in ionex_files]
return unique_days_from_ionex(ionex_data_list)