import copy
import numpy as np
import pandas as pd
from ....utils import dates
from .utils.dimensions import hresol2dim, vresol2dim
from ....plugins.transforms.system.fromcontrol.utils.scalemaps \
import map2scale, vmap2vaggreg
from ....utils.dataarrays.reindex import reindex
from logging import debug, info
from .utils.get_physical import get_physical
[docs]
def init_xb(cntrlv, trid, **kwargs):
"""Initializes the prior control vector. Loops over all components and
tracers and process temporal and horizontal resolution.
Args:
cntrlv (Plugin): definition of the control vector.
datei (datetime): initial date of the inversion window
datei (datetime): end date of the inversion window
"""
comp, trcr = trid
datavect = cntrlv.datavect
# If no definition is specified for the control vector in the Yaml,
# return empty control vector
tracer = getattr(getattr(getattr(getattr(
datavect, "components", None),
comp, None),
"parameters", None),
trcr, None)
if tracer is None:
return
# Skip tracers that are not control variables
if not tracer.iscontrol:
return
debug("Initializing xb for {}/{}"
.format(comp, trcr))
# Upper and lower bounds
lbound = getattr(tracer, "lower_bound", -np.inf)
ubound = getattr(tracer, "upper_bound", np.inf)
# Filling Xb and uncertainties
# Most types are scalar factors
if (
getattr(tracer, "type", "scalar") == "scalar"
# or getattr(tracer, "vresol", "") != "vpixels"
):
tracer.type = "scalar"
xb = np.ones(tracer.dim) * getattr(tracer, "xb_scale", 1.0) \
+ getattr(tracer, "xb_value", 0.0)
std = tracer.err
cntrlv.xb[tracer.xpointer: tracer.xpointer + tracer.dim] = \
copy.deepcopy(xb)
cntrlv.x[tracer.xpointer: tracer.xpointer + tracer.dim] = \
copy.deepcopy(xb)
cntrlv.std[tracer.xpointer: tracer.xpointer + tracer.dim] = std
if cntrlv.use_boundaries:
cntrlv.lower_bounds[
tracer.xpointer: tracer.xpointer + tracer.dim
] = lbound
cntrlv.upper_bounds[
tracer.xpointer: tracer.xpointer + tracer.dim
] = ubound
# Physical variables stores uncertainties in the physical space
# Valid only for control variables at the pixel resolution
else:
tracer.type = "physical"
xb, std = get_physical(cntrlv, tracer, comp, trcr)
cntrlv.xb[tracer.xpointer:tracer.xpointer + tracer.dim] = \
copy.deepcopy(xb)
cntrlv.x[tracer.xpointer:tracer.xpointer + tracer.dim] = \
copy.deepcopy(xb)
cntrlv.std[tracer.xpointer:tracer.xpointer + tracer.dim] = std
if cntrlv.use_boundaries:
cntrlv.lower_bounds[
tracer.xpointer: tracer.xpointer + tracer.dim
] = lbound * std
cntrlv.upper_bounds[
tracer.xpointer: tracer.xpointer + tracer.dim
] = ubound * std
# Apply minimal error if required
if hasattr(tracer, "lowlim_error"):
lowlim_error = \
tracer.lowlim_error.err \
* getattr(tracer.lowlim_error, "unit_scale", 1)
# Load physical values if not already done
if (
getattr(tracer, "type", "scalar") == "scalar"
# or getattr(tracer, "vresol", "") != "vpixels"
):
xb, std = get_physical(cntrlv, tracer, comp, trcr)
# Replace std whenever necessary
std = np.maximum(std, lowlim_error)
if (
getattr(tracer, "type", "scalar") == "scalar"
# or getattr(tracer, "vresol", "") != "vpixels"
):
std /= xb
if np.any(xb == 0):
raise Exception("Trying to scale standard deviation for {}"
"in scalar mode, but prior is zero; "
"thus preventing any scaling. \n"
"This means that lowlim_error cannot be "
"applied in your case with a scalar variable.\n"
"Please consider using a physical variable"
.format(trid))
cntrlv.std[tracer.xpointer:tracer.xpointer + tracer.dim] = std