Source code for torchMWRT.lineshape

# -*- coding: utf-8 -*-

"""Line-shape parameter loaders for microwave absorption models.

This module consolidates the original pyrtlib line-shape loaders for
water vapor, oxygen, and ozone into a single torch-friendly interface.
In pyrtlib, these definitions were split across ``pyrtlib._lineshape.h2oll``,
``pyrtlib._lineshape.o2ll``, and ``pyrtlib._lineshape.o3ll``.
"""

import os
import importlib.util
import shutil
from pathlib import Path
from typing import Any, List, Optional

from netCDF4 import Dataset
import torch

MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
LINESHAPE_DATA_PATH = "_lineshape"
REQUIRED_LINESHAPE_FILES = (
    "h2o_lineshape.nc",
    "o2_lineshape.nc",
    "o3_lineshape.nc",
)


def _default_lineshape_dir() -> Path:
    """Return the local torchMWRT line-shape directory."""
    return Path(MODULE_DIR) / LINESHAPE_DATA_PATH


def _discover_pyrtlib_lineshape_dir() -> Optional[Path]:
    """Locate ``pyrtlib/_lineshape`` in the current Python environment."""
    spec = importlib.util.find_spec("pyrtlib")
    if spec is None:
        return None

    roots: list[Path] = []
    if spec.submodule_search_locations is not None:
        roots.extend(Path(p) for p in spec.submodule_search_locations)
    elif spec.origin is not None:
        roots.append(Path(spec.origin).resolve().parent)

    for root in roots:
        candidate = root / "_lineshape"
        if candidate.is_dir():
            return candidate
    return None


[docs] def ensure_lineshape_data_available(destination: Optional[str | Path] = None) -> Path: """Ensure required line-shape NetCDF tables exist locally. Missing files are copied from an installed ``pyrtlib`` package. """ dst = Path(destination) if destination is not None else _default_lineshape_dir() dst.mkdir(parents=True, exist_ok=True) missing = [name for name in REQUIRED_LINESHAPE_FILES if not (dst / name).is_file()] if not missing: return dst source_dir = _discover_pyrtlib_lineshape_dir() if source_dir is None: missing_str = ", ".join(missing) raise FileNotFoundError( "torchMWRT is missing line-shape data files " f"({missing_str}), and installed pyrtlib/_lineshape could not be found." ) unavailable = [name for name in missing if not (source_dir / name).is_file()] if unavailable: unavailable_str = ", ".join(unavailable) raise FileNotFoundError( "torchMWRT could not fetch required line-shape files from pyrtlib: " f"{unavailable_str}" ) for name in missing: shutil.copy2(source_dir / name, dst / name) return dst
class _LineShape: """Container for line-shape coefficients and metadata.""" def __init__(self, **kwargs: Any) -> None: self.__dict__.update(kwargs) @staticmethod def _resolve_nc_path(nc_path: str) -> str: """Resolve a NetCDF path relative to this module when needed.""" if os.path.isabs(nc_path): return nc_path return os.path.join(MODULE_DIR, nc_path) @staticmethod def _available_models(nc_path: str) -> List[str]: """List available model groups in a NetCDF line-shape file. Parameters ---------- nc_path : str Path to the NetCDF file containing model groups. Returns ------- list[str] Available model identifiers. """ with Dataset(_LineShape._resolve_nc_path(nc_path), mode='r') as nc: return list(nc.groups.keys()) def to(self, device: Optional[torch.device] = None, dtype: Optional[torch.dtype] = None) -> "_LineShape": """Move/convert numeric attributes to torch tensors. Parameters ---------- device : torch.device, optional Target device. dtype : torch.dtype, optional Target dtype. Returns ------- _LineShape The same object with numeric attributes converted in place. Notes ----- Non-numeric metadata (for example ``model``) are preserved unchanged. """ for name, value in vars(self).items(): if name == "model": continue if torch.is_tensor(value): if device is not None or dtype is not None: setattr(self, name, value.to(device=device, dtype=dtype)) continue if isinstance(value, (list, tuple, int, float)) or hasattr(value, "__array__"): setattr(self, name, torch.as_tensor(value, device=device, dtype=dtype)) return self
[docs] class H2OLL(_LineShape): """Water-vapor line-shape coefficient container. Notes ----- Frequencies are in GHz. Strengths and broadening/shift coefficients follow the conventions documented in the original pyrtlib H2O line-shape module. Provenance: reference text and coefficient intent are taken from ``pyrtlib._lineshape.h2oll`` and consolidated here. References ---------- - M. Koshelev et al., JQSRT, v.205, pp.51-58 (2018) - V. Payne et al., IEEE Trans. Geosci. Remote Sens., v.46, pp.3601-3617 (2008) - G. Golubiatnikov, J. Mol. Spectrosc., v.230, pp.196-198 (2005) - M. Koshelev et al., J. Mol. Spectrosc., v.241, pp.101-108 (2007) - J.-M. Colmont et al., J. Mol. Spectrosc., v.193, pp.233-243 (1999) - M. Tretyakov et al., JQSRT, v.114, pp.109-121 (2013) - G. Golubiatnikov et al., JQSRT, v.109, pp.1828-1833 (2008) - V. Podobedov et al., JQSRT, v.87, pp.377-385 (2004) - M. Koshelev, JQSRT, v.112, pp.550-552 (2011) - M. Tretyakov, JQSRT, v.328, pp.7-26 (2016) - D. Turner et al., IEEE Trans. Geosci. Remote Sens., v.47, pp.3326-3337 (2009) - M. Koshelev et al., JQSRT, doi:10.1016/j.jqsrt.2020.107472 Other parameters from HITRAN2020. """ # code to import coefficient from .asc file # import pandas as pd # import numpy as np # a = pd.read_table("h2o_sdlist.asc", sep=',',header=None, skiprows=1, nrows=19, usecols=range(0, 20)) # a = np.loadtxt("h2o_sdlist.asc", delimiter=',', skiprows=1, usecols=range(0, 20), max_rows=20) # np.savetxt("/Users/slarosa/dev/pyrtlib/pyrtlib/lineshape/h2o_list_r22.txt", a) # nc = Dataset("/Users/slarosa/Downloads/h2o_lineshape.nc", mode='w') # for m in H2OAbsModel.implemented_models(): # grp = nc.createGroup(m) # H2OAbsModel.model = m # H2OAbsModel.set_ll() # grp.createDimension('rows', H2OAbsModel.h2oll.mtx.shape[0]) # grp.createDimension('cols', H2OAbsModel.h2oll.mtx.shape[1]) # mtx = grp.createVariable('mtx', 'f8', ('rows', 'cols', )) # grp.createDimension('ctrdim', H2OAbsModel.h2oll.ctr.shape[0]) # mtx[:] = H2OAbsModel.h2oll.mtx # ctr = grp.createVariable('ctr', 'f8', ('ctrdim', )) # ctr[:] = H2OAbsModel.h2oll.ctr # reftline = grp.createVariable('reftline', 'f4', ()) # reftline[:] = H2OAbsModel.h2oll.reftline # nc.close() NC_PATH = os.path.join(LINESHAPE_DATA_PATH, "h2o_lineshape.nc")
[docs] @classmethod def available_models(cls) -> List[str]: """Return available H2O line-shape model identifiers. Returns ------- list[str] Names of model groups found in ``h2o_lineshape.nc``. """ return cls._available_models(cls.NC_PATH)
[docs] @classmethod def load( cls, model: str, device: Optional[torch.device] = None, dtype: Optional[torch.dtype] = None, ) -> "H2OLL": """Load H2O line-shape parameters for a model. Parameters ---------- model : str H2O absorption-model identifier. device : torch.device, optional Target device for returned tensors. dtype : torch.dtype, optional Target dtype for returned tensors. Returns ------- H2OLL Loaded and converted line-shape parameter object. Raises ------ ValueError If ``model`` is not available in the NetCDF file. Notes ----- Model-dependent field mapping mirrors the original pyrtlib logic, including special handling for R03/R16/R17 and SD-family variants. Provenance: variable mapping rules were consolidated from ``pyrtlib._lineshape.h2oll``. """ with Dataset(cls._resolve_nc_path(cls.NC_PATH), mode='r') as nc: if model not in nc.groups: raise ValueError( f"Unknown H2O model '{model}'. Available: {list(nc.groups)}" ) d = nc.groups[model] mtx = d.variables['mtx'][:].data ctr = d.variables['ctr'][:].data reftline = d.variables['reftline'][:].data.item() extra = {} if model == 'R20SD': extra['d2air'] = d.variables['d2air'][:].data.item() extra['d2self'] = d.variables['d2self'][:].data.item() ls = cls( model=model, mtx=mtx, ctr=ctr, reftline=reftline, reftcon=ctr[0], cf=ctr[1], xcf=ctr[2], cs=ctr[3], xcs=ctr[4], fl=mtx[:, 1], s1=mtx[:, 2], b2=mtx[:, 3], w0=mtx[:, 4] / 1000.0, x=mtx[:, 5], w0s=mtx[:, 6] / 1000.0, xs=mtx[:, 7], ) if model == 'R03': ls.w0s = mtx[:, 7] / 1000.0 ls.xs = mtx[:, 8] ls.sr = mtx[:, 6] elif model in ['R16', 'R17']: ls.sr = mtx[:, 6] ls.w0s = mtx[:, 7] / 1000.0 ls.xs = mtx[:, 8] if model not in ['R98', 'R03', 'R16', 'R17']: ls.sh = mtx[:, 8] / 1000.0 ls.xh = mtx[:, 9] ls.shs = mtx[:, 10] / 1000.0 ls.xhs = mtx[:, 11] if model in ['R19', 'R19SD', 'R20', 'R20SD', 'R21SD', 'R22SD', 'R23', 'R23SD', 'R24', 'MWL24']: ls.aair = mtx[:, 12] ls.aself = mtx[:, 13] if model in ['R19SD', 'R20SD', 'R21SD', 'R22SD', 'R23', 'R23SD', 'R24', 'MWL24']: ls.w2 = mtx[:, 14] / 1000.0 if model in ['R19SD', 'R20SD', 'R22SD', 'R23', 'R23SD', 'R24', 'MWL24']: ls.w2s = mtx[:, 15] / 1000.0 if model in ['R21SD', 'R22SD', 'R23', 'R23SD', 'R24', 'MWL24']: ls.xw2 = mtx[:, 15] ls.w2s = mtx[:, 16] / 1000.0 ls.xw2s = mtx[:, 17] ls.d2 = mtx[:, 18] / 1000.0 ls.d2s = mtx[:, 19] / 1000.0 if model == 'R20SD': # Provide defaults so downstream code expecting d2/d2s still works. ls.d2 = mtx[:, 14] * 0.0 ls.d2s = mtx[:, 15] * 0.0 ls.d2air = extra['d2air'] ls.d2self = extra['d2self'] return ls.to(device=device, dtype=dtype)
[docs] class O2LL(_LineShape): """Oxygen line-shape coefficient container. Notes ----- This class loads line-mixing and broadening coefficients used by the O2 absorption models. Provenance: data-field selection rules are taken from ``pyrtlib._lineshape.o2ll`` and consolidated here. """ # nc = Dataset("/Users/slarosa/Downloads/o2_lineshape.nc", mode='w') # for m in O2AbsModel.implemented_models(): # if m in ['R21SD']: # continue # O2AbsModel.model = m # try: # O2AbsModel.set_ll() # except: # m = m.replace("SD", "") # O2AbsModel.model = m.replace("SD", "") # O2AbsModel.set_ll() # grp = nc.createGroup(m) # vl = [item for item in dir(O2AbsModel.o2ll) if not item.startswith("__")] # for v in vl: # if v != 'np': # if v in ['wb300', 'x']: # mtx = grp.createVariable(v, 'f4', ()) # else: # grp.createDimension(f'rows_{v}', getattr(O2AbsModel.o2ll, v).shape[0]) # mtx = grp.createVariable(v, 'f8', (f'rows_{v}',)) # mtx[:] = getattr(O2AbsModel.o2ll, v) # nc.close() NC_PATH = os.path.join(LINESHAPE_DATA_PATH, "o2_lineshape.nc")
[docs] @classmethod def available_models(cls) -> List[str]: """Return available O2 line-shape model identifiers. Returns ------- list[str] Names of model groups found in ``o2_lineshape.nc``. """ return cls._available_models(cls.NC_PATH)
[docs] @classmethod def load( cls, model: str, device: Optional[torch.device] = None, dtype: Optional[torch.dtype] = None, ) -> "O2LL": """Load O2 line-shape parameters for a model. Parameters ---------- model : str O2 absorption-model identifier. device : torch.device, optional Target device for returned tensors. dtype : torch.dtype, optional Target dtype for returned tensors. Returns ------- O2LL Loaded and converted line-shape parameter object. Raises ------ ValueError If ``model`` is not available in the NetCDF file. Notes ----- Optional parameter arrays (for example ``v``, ``y0``, ``y1``, ``g*``, ``dnu*``) are attached only for model families where they exist. Provenance: model-conditional field extraction was consolidated from ``pyrtlib._lineshape.o2ll``. """ with Dataset(cls._resolve_nc_path(cls.NC_PATH), mode='r') as nc: if model not in nc.groups: raise ValueError( f"Unknown O2 model '{model}'. Available: {list(nc.groups)}" ) d = nc.groups[model] f = d.variables['f'][:].data s300 = d.variables['s300'][:].data be = d.variables['be'][:].data wb300 = d.variables['wb300'][:].data.item() x = d.variables['x'][:].data.item() w300 = d.variables['w300'][:].data v = None y300 = None y0 = None y1 = g0 = g1 = dnu0 = dnu1 = None if model in ['R98', 'R03', 'R17', 'R18', 'R19', 'R19SD']: v = d.variables['v'][:].data if model in ['R98', 'R03', 'R17', 'R18', 'R19', 'R19SD', 'R23', 'R24']: y300 = d.variables['y300'][:].data else: y0 = d.variables['y0'][:].data if model in ['R16', 'R20', 'R20SD', 'R21', 'R22', 'R23', 'R24']: y1 = d.variables['y1'][:].data g0 = d.variables['g0'][:].data g1 = d.variables['g1'][:].data dnu0 = d.variables['dnu0'][:].data dnu1 = d.variables['dnu1'][:].data ls = cls( model=model, f=f, s300=s300, be=be, wb300=wb300, x=x, w300=w300, ) if v is not None: ls.v = v if y300 is not None: ls.y300 = y300 if y0 is not None: ls.y0 = y0 if y1 is not None: ls.y1 = y1 ls.g0 = g0 ls.g1 = g1 ls.dnu0 = dnu0 ls.dnu1 = dnu1 return ls.to(device=device, dtype=dtype)
[docs] class O3LL(_LineShape): """Ozone line-shape coefficient container. Notes ----- Coefficients are loaded from the ozone NetCDF line-shape table and exposed with convenient derived views (``fl``, ``s1``, ``b``, ``w``, ``x``, ``sr``). Provenance: loading/mapping behavior is taken from ``pyrtlib._lineshape.o3ll`` and consolidated here. """ # nc = Dataset("/Users/slarosa/Downloads/o3_lineshape.nc", mode='w') # for m in O3AbsModel.implemented_models(): # if m in ['R22SD', 'R18']: # if m == 'R22SD': # m = 'R22' # grp = nc.createGroup(m) # O3AbsModel.model = m # O3AbsModel.set_ll() # grp.createDimension('rows', O3AbsModel.o3ll.mtx.shape[0]) # grp.createDimension('cols', O3AbsModel.o3ll.mtx.shape[1]) # mtx = grp.createVariable('mtx', 'f8', ('rows', 'cols', )) # mtx[:] = O3AbsModel.o3ll.mtx # reftline = grp.createVariable('reftline', 'f4', ()) # reftline[:] = O3AbsModel.o3ll.reftline # nc.close() NC_PATH = os.path.join(LINESHAPE_DATA_PATH, "o3_lineshape.nc")
[docs] @classmethod def available_models(cls) -> List[str]: """Return available O3 line-shape model identifiers. Returns ------- list[str] Names of model groups found in ``o3_lineshape.nc``. """ return cls._available_models(cls.NC_PATH)
[docs] @classmethod def load( cls, model: str, device: Optional[torch.device] = None, dtype: Optional[torch.dtype] = None, ) -> "O3LL": """Load O3 line-shape parameters for a model. Parameters ---------- model : str O3 absorption-model identifier. device : torch.device, optional Target device for returned tensors. dtype : torch.dtype, optional Target dtype for returned tensors. Returns ------- O3LL Loaded and converted line-shape parameter object. Raises ------ ValueError If ``model`` is not available in the NetCDF file. Notes ----- Provenance: model loading and variable mapping are consolidated from ``pyrtlib._lineshape.o3ll``. """ with Dataset(cls._resolve_nc_path(cls.NC_PATH), mode='r') as nc: if model not in nc.groups: raise ValueError( f"Unknown O3 model '{model}'. Available: {list(nc.groups)}" ) d = nc.groups[model] mtx = d.variables['mtx'][:].data reftline = d.variables['reftline'][:].data.item() return cls( model=model, mtx=mtx, reftline=reftline, fl=mtx[:, 1], s1=mtx[:, 2], b=mtx[:, 3], w=mtx[:, 4] / 1000.0, x=mtx[:, 5], sr=mtx[:, 6], ).to(device=device, dtype=dtype)