Source code for pycif.plugins.transforms.complex.satellites.apply_AK

import numpy as np
from logging import info


[docs] def apply_ak( sim_ak, aks, pwgt, drycols, qa0, chosen_level=-1, use_drycols=False, scale_factor=1, log_space=False, normalize_columns=False ): r"""Apply averaging kernels to simulated concentrations (forward pass). Computes the satellite-equivalent observation from the vertically interpolated model field ``sim_ak``, the averaging kernels ``aks``, pressure weights ``pwgt``, dry-air columns ``drycols``, and prior profile ``qa0``. The exact formula depends on the keyword flags: *Normal space* (``log_space = False``): .. math:: y = \sum_i \left[(sim\_ak_i \cdot drycols_i - qa0_i) \cdot aks_i \cdot pwgt_i\right] + y_0^{prior} *Log space* (``log_space = True``): .. math:: y = \exp\left[\sum_i \left[(\log sim\_ak_i - \log qa0_i) \cdot aks_i \cdot pwgt_i\right] + \log y_0^{prior}\right] Optional normalisations and unit scaling are applied afterwards. Args: sim_ak (np.ndarray): simulated concentrations on satellite pressure levels, shape ``(nlevsat, nobs)``. aks (np.ndarray): averaging kernels, shape ``(nlevsat, nobs)``. pwgt (np.ndarray): pressure weights, shape ``(nlevsat, nobs)``. drycols (np.ndarray): dry-air column factors, shape ``(nlevsat, nobs)``. qa0 (np.ndarray): prior profiles, shape ``(nlevsat, nobs)``. chosen_level (int): level index for partial-column products (formula 3/8); ``-1`` means use the pressure-weighted sum. use_drycols (bool): normalise by dry-air columns instead of pressure weights. scale_factor (float): multiplicative unit scaling factor. log_space (bool): apply AKs in log space (MOPITT-style). normalize_columns (bool): normalise the total column by the summed AK/pressure-weight product. Returns: np.ndarray: satellite-equivalent observation, shape ``(nobs,)``. """ if not log_space: prior_profile = qa0[chosen_level] if chosen_level != -1 \ else np.nansum(qa0 * pwgt, axis=0) y0 = np.nansum( (sim_ak * drycols - qa0) * aks * pwgt, axis=0 ) + prior_profile if not normalize_columns: pass elif use_drycols: y0 /= np.nansum(drycols, axis=0) else: y0 /= np.nansum(pwgt * aks, axis=0) y0 *= scale_factor else: prior_profile = np.log(qa0[chosen_level]) if chosen_level != -1 \ else np.nansum(pwgt * np.where(qa0 > 0, np.log(qa0), 0), axis=0) y0 = np.nansum( (np.where(sim_ak > 0, np.log(sim_ak), 0) * drycols - np.where(qa0 > 0, np.log(qa0), 0)) * aks * pwgt, axis=0 ) + prior_profile if not normalize_columns: pass elif use_drycols: y0 /= np.nansum(drycols, axis=0) else: y0 /= np.nansum(pwgt * aks, axis=0) y0 = np.exp(y0) y0 *= scale_factor return y0
[docs] def apply_ak_tl( sim_ak_tl, sim_ak, aks, pwgt, drycols, qa0, chosen_level=-1, use_drycols=False, scale_factor=1, log_space=False, normalize_columns=False ): """Apply averaging kernels to concentration increments (tangent-linear pass). The tangent-linear of :func:`apply_ak` with respect to the simulated concentrations ``sim_ak``. Computes: * Normal space: :math:`y_{TL} = \\sum_i sim\_ak^{TL}_i \\cdot drycols_i \\cdot aks_i \\cdot pwgt_i` * Log space: :math:`y_{TL} = y_{fwd} \\sum_i (sim\_ak^{TL}_i / sim\_ak_i) \\cdot aks_i \\cdot pwgt_i` Args: sim_ak_tl (np.ndarray): TL of simulated concentrations on satellite pressure levels, shape ``(nlevsat, nobs)``. sim_ak (np.ndarray): forward simulated concentrations (needed in log space), shape ``(nlevsat, nobs)``. aks, pwgt, drycols, qa0, chosen_level, use_drycols, scale_factor, log_space, normalize_columns: same as :func:`apply_ak`. Returns: np.ndarray: TL of the satellite-equivalent observation, shape ``(nobs,)``. """ if not log_space: y0_tl = np.nansum((sim_ak_tl * drycols) * aks * pwgt, axis=0) if not normalize_columns: pass elif use_drycols: y0_tl /= np.nansum(drycols, axis=0) else: y0_tl /= np.nansum(pwgt * aks, axis=0) y0_tl *= scale_factor else: y0 = apply_ak(sim_ak, aks, pwgt, drycols, qa0, chosen_level=chosen_level, use_drycols=use_drycols, scale_factor=scale_factor, log_space=log_space, normalize_columns=normalize_columns) y0_tl = np.nansum( np.where( sim_ak > 0, sim_ak_tl / sim_ak * drycols, 0 ) * aks * pwgt, axis=0 ) if not normalize_columns: pass elif use_drycols: y0_tl /= np.nansum(drycols, axis=0) else: y0_tl /= np.nansum(pwgt * aks, axis=0) y0_tl *= y0 return y0_tl
[docs] def apply_ak_ad( obs_incr, sim_ak, aks, pwgt, drycols, qa0, chosen_level=-1, use_drycols=False, scale_factor=1, log_space=False, normalize_columns=False ): """Apply averaging kernels to propagate observation sensitivities (adjoint pass). The adjoint of :func:`apply_ak` w.r.t. the simulated concentrations. Maps the scalar observation sensitivity ``obs_incr`` back to level-resolved concentration sensitivities: * Normal space: :math:`s_i = obs\_incr \\cdot scale \\cdot drycols_i \\cdot aks_i \\cdot pwgt_i / norm` * Log space: :math:`s_i = obs\_incr \\cdot scale \\cdot aks_i \\cdot pwgt_i / sim\_ak_i / norm \\cdot y_{fwd}` Args: obs_incr (np.ndarray): observation-space sensitivity (``adj_out``), shape ``(nobs,)``. sim_ak (np.ndarray): forward simulated concentrations on satellite levels, shape ``(nlevsat, nobs)``. aks, pwgt, drycols, qa0, chosen_level, use_drycols, scale_factor, log_space, normalize_columns: same as :func:`apply_ak`. Returns: np.ndarray: adjoint of simulated concentrations on satellite levels, shape ``(nlevsat, nobs)``. """ if not log_space: obs_incr_out = obs_incr * scale_factor if not normalize_columns: pass elif use_drycols: obs_incr_out /= np.nansum(drycols, axis=0) else: obs_incr_out /= np.nansum(pwgt * aks, axis=0) obs_incr_out = (obs_incr_out * drycols) * aks * pwgt else: y0 = apply_ak(sim_ak, aks, pwgt, drycols, qa0, chosen_level=chosen_level, use_drycols=use_drycols, scale_factor=scale_factor, log_space=log_space, normalize_columns=normalize_columns) obs_incr_out = obs_incr * y0 if not normalize_columns: pass elif use_drycols: obs_incr_out /= np.nansum(drycols, axis=0) else: obs_incr_out /= np.nansum(pwgt * aks, axis=0) obs_incr_out = np.where( sim_ak > 0, ((obs_incr_out / sim_ak * drycols) * aks * pwgt), 0 ) return obs_incr_out