import numpy as np
from logging import info
[docs]
def apply_ak(
sim_ak, aks, pwgt, drycols, qa0,
chosen_level=-1, use_drycols=False,
scale_factor=1, log_space=False,
normalize_columns=False
):
r"""Apply averaging kernels to simulated concentrations (forward pass).
Computes the satellite-equivalent observation from the vertically
interpolated model field ``sim_ak``, the averaging kernels ``aks``,
pressure weights ``pwgt``, dry-air columns ``drycols``, and prior
profile ``qa0``. The exact formula depends on the keyword flags:
*Normal space* (``log_space = False``):
.. math::
y = \sum_i \left[(sim\_ak_i \cdot drycols_i - qa0_i)
\cdot aks_i \cdot pwgt_i\right] + y_0^{prior}
*Log space* (``log_space = True``):
.. math::
y = \exp\left[\sum_i
\left[(\log sim\_ak_i - \log qa0_i) \cdot aks_i \cdot pwgt_i\right]
+ \log y_0^{prior}\right]
Optional normalisations and unit scaling are applied afterwards.
Args:
sim_ak (np.ndarray): simulated concentrations on satellite pressure
levels, shape ``(nlevsat, nobs)``.
aks (np.ndarray): averaging kernels, shape ``(nlevsat, nobs)``.
pwgt (np.ndarray): pressure weights, shape ``(nlevsat, nobs)``.
drycols (np.ndarray): dry-air column factors, shape
``(nlevsat, nobs)``.
qa0 (np.ndarray): prior profiles, shape ``(nlevsat, nobs)``.
chosen_level (int): level index for partial-column products
(formula 3/8); ``-1`` means use the pressure-weighted sum.
use_drycols (bool): normalise by dry-air columns instead of
pressure weights.
scale_factor (float): multiplicative unit scaling factor.
log_space (bool): apply AKs in log space (MOPITT-style).
normalize_columns (bool): normalise the total column by the
summed AK/pressure-weight product.
Returns:
np.ndarray: satellite-equivalent observation, shape ``(nobs,)``.
"""
if not log_space:
prior_profile = qa0[chosen_level] if chosen_level != -1 \
else np.nansum(qa0 * pwgt, axis=0)
y0 = np.nansum(
(sim_ak * drycols - qa0) * aks * pwgt,
axis=0
) + prior_profile
if not normalize_columns:
pass
elif use_drycols:
y0 /= np.nansum(drycols, axis=0)
else:
y0 /= np.nansum(pwgt * aks, axis=0)
y0 *= scale_factor
else:
prior_profile = np.log(qa0[chosen_level]) if chosen_level != -1 \
else np.nansum(pwgt * np.where(qa0 > 0, np.log(qa0), 0), axis=0)
y0 = np.nansum(
(np.where(sim_ak > 0, np.log(sim_ak), 0) * drycols
- np.where(qa0 > 0, np.log(qa0), 0)) * aks * pwgt,
axis=0
) + prior_profile
if not normalize_columns:
pass
elif use_drycols:
y0 /= np.nansum(drycols, axis=0)
else:
y0 /= np.nansum(pwgt * aks, axis=0)
y0 = np.exp(y0)
y0 *= scale_factor
return y0
[docs]
def apply_ak_tl(
sim_ak_tl, sim_ak, aks, pwgt, drycols, qa0,
chosen_level=-1, use_drycols=False,
scale_factor=1, log_space=False,
normalize_columns=False
):
"""Apply averaging kernels to concentration increments (tangent-linear pass).
The tangent-linear of :func:`apply_ak` with respect to the simulated
concentrations ``sim_ak``. Computes:
* Normal space: :math:`y_{TL} = \\sum_i sim\_ak^{TL}_i \\cdot drycols_i \\cdot aks_i \\cdot pwgt_i`
* Log space: :math:`y_{TL} = y_{fwd} \\sum_i (sim\_ak^{TL}_i / sim\_ak_i) \\cdot aks_i \\cdot pwgt_i`
Args:
sim_ak_tl (np.ndarray): TL of simulated concentrations on satellite
pressure levels, shape ``(nlevsat, nobs)``.
sim_ak (np.ndarray): forward simulated concentrations (needed in log
space), shape ``(nlevsat, nobs)``.
aks, pwgt, drycols, qa0, chosen_level, use_drycols, scale_factor,
log_space, normalize_columns: same as :func:`apply_ak`.
Returns:
np.ndarray: TL of the satellite-equivalent observation,
shape ``(nobs,)``.
"""
if not log_space:
y0_tl = np.nansum((sim_ak_tl * drycols) * aks * pwgt, axis=0)
if not normalize_columns:
pass
elif use_drycols:
y0_tl /= np.nansum(drycols, axis=0)
else:
y0_tl /= np.nansum(pwgt * aks, axis=0)
y0_tl *= scale_factor
else:
y0 = apply_ak(sim_ak, aks, pwgt, drycols, qa0,
chosen_level=chosen_level, use_drycols=use_drycols,
scale_factor=scale_factor, log_space=log_space,
normalize_columns=normalize_columns)
y0_tl = np.nansum(
np.where(
sim_ak > 0,
sim_ak_tl / sim_ak * drycols,
0
) * aks * pwgt,
axis=0
)
if not normalize_columns:
pass
elif use_drycols:
y0_tl /= np.nansum(drycols, axis=0)
else:
y0_tl /= np.nansum(pwgt * aks, axis=0)
y0_tl *= y0
return y0_tl
[docs]
def apply_ak_ad(
obs_incr, sim_ak, aks, pwgt, drycols, qa0,
chosen_level=-1, use_drycols=False,
scale_factor=1, log_space=False,
normalize_columns=False
):
"""Apply averaging kernels to propagate observation sensitivities (adjoint pass).
The adjoint of :func:`apply_ak` w.r.t. the simulated concentrations.
Maps the scalar observation sensitivity ``obs_incr`` back to
level-resolved concentration sensitivities:
* Normal space: :math:`s_i = obs\_incr \\cdot scale \\cdot drycols_i \\cdot aks_i \\cdot pwgt_i / norm`
* Log space: :math:`s_i = obs\_incr \\cdot scale \\cdot aks_i \\cdot pwgt_i / sim\_ak_i / norm \\cdot y_{fwd}`
Args:
obs_incr (np.ndarray): observation-space sensitivity (``adj_out``),
shape ``(nobs,)``.
sim_ak (np.ndarray): forward simulated concentrations on satellite
levels, shape ``(nlevsat, nobs)``.
aks, pwgt, drycols, qa0, chosen_level, use_drycols, scale_factor,
log_space, normalize_columns: same as :func:`apply_ak`.
Returns:
np.ndarray: adjoint of simulated concentrations on satellite levels,
shape ``(nlevsat, nobs)``.
"""
if not log_space:
obs_incr_out = obs_incr * scale_factor
if not normalize_columns:
pass
elif use_drycols:
obs_incr_out /= np.nansum(drycols, axis=0)
else:
obs_incr_out /= np.nansum(pwgt * aks, axis=0)
obs_incr_out = (obs_incr_out * drycols) * aks * pwgt
else:
y0 = apply_ak(sim_ak, aks, pwgt, drycols, qa0,
chosen_level=chosen_level, use_drycols=use_drycols,
scale_factor=scale_factor, log_space=log_space,
normalize_columns=normalize_columns)
obs_incr_out = obs_incr * y0
if not normalize_columns:
pass
elif use_drycols:
obs_incr_out /= np.nansum(drycols, axis=0)
else:
obs_incr_out /= np.nansum(pwgt * aks, axis=0)
obs_incr_out = np.where(
sim_ak > 0,
((obs_incr_out / sim_ak * drycols) * aks * pwgt),
0
)
return obs_incr_out