import numpy as np
import pandas as pd
from logging import info, warning
[docs]
def dist_obs_matrix(zlon1, zlat1, zlon2, zlat2, projection="gps"):
"""Computes the distance vector between (zlon1, zlat1) and an observation's location.
Args:
zlat1 (np.array): numpy array of latitudes
zlon1 (np.array): numpy array of longitudes
projection (str): the projection used for the longitudes and latitudes
lon_obs (float): the projection used for the longitudes and latitudes
lat_obs (float): the projection used for the longitudes and latitudes
Returns:
np.array(zlat1.size, zlat2.size): vector of distances
"""
if zlat1.size != zlon1.size:
raise ValueError(
"Warning: longitudes and latitudes do not have the "
"same dimension. Cannot compute the distance matrix"
)
# Dealing differently with gps and xy projections
if projection == "gps":
# Earth radius
rearth = 6371.03
# Flatten lon/lat
radlats1 = np.radians(zlat1).flatten()
radlons1 = np.radians(zlon1).flatten()
radlats2 = np.radians(zlat2).flatten()
radlons2 = np.radians(zlon2).flatten()
# Compute the distance (1/2)
val = 0.5 * (
1
- np.sin(radlats2[:, np.newaxis]) *
np.sin(radlats1[np.newaxis])
- np.cos(radlats2[:, np.newaxis]) *
np.cos(radlats1[np.newaxis])
* np.cos(radlons2[:, np.newaxis] - radlons1[np.newaxis])
)
# Set the values below epsilon to zero
epsilon = np.finfo(float).eps
mask_zero = np.abs(val) < epsilon
val[mask_zero] = 0
# Compute the distance (2/2)
dx = rearth * 2 * np.arcsin(val ** 0.5)
elif projection == "xy":
zlat1 = zlat1.flatten()
zlon1 = zlon1.flatten()
dlat = (zlat2[:, np.newaxis] - zlat1[np.newaxis, :]) ** 2
dlon = (zlon2[:, np.newaxis] - zlon1[np.newaxis, :]) ** 2
dx = dlat + dlon
dx = np.sqrt(dx)
else:
raise ValueError("Projection {} is not recognized".format(projection))
return dx
[docs]
def fill_loc_matrices(mode, controlvect, lons_obs2assim, lats_obs2assim, list_cntrlv_idx):
"""Fill the localization matrices based on the requested decay function
and the distance between pixels and observations.
Args:
mode (Plugin): mode Plugin
controlvect (Plugin): controlvect Plugin
lons_obs (np.array): longitudes of observations
lats_obs (np.array): latitudes of observations
Returns:
localize matrix (np.array): filled localization matrix
"""
loc_matrix_state = np.ones((len(lons_obs2assim), len(list_cntrlv_idx)))
loc_matrix_obs = np.ones((len(lons_obs2assim), len(lons_obs2assim)))
decay_lengths = mode.localization.decay_length
decay_func = mode.localization.decay_func
components = controlvect.datavect.components
all_decayl_same = True
previous_decay_length_comp = None
# Fill the localization matrix for the state-obs
for comp in components.attributes:
component = getattr(components, comp)
# Skip if no parameters
if not hasattr(component, "parameters"):
continue
for trcr in component.parameters.attributes:
tracer = getattr(component.parameters, trcr)
if not tracer.iscontrol:
continue
tracer = getattr(component.parameters, trcr)
grid = getattr(tracer, "domain", controlvect.domain)
projection = getattr(grid, "projection", "gps")
if tracer.hresol == 'hpixels':
centroids_lats = grid.zlat
centroids_lons = grid.zlon
else:
try:
centroids_lats = tracer.centroids_lats
centroids_lons = tracer.centroids_lons
except AttributeError:
warning(f"Cannot compute the centroids for the component {comp}/{trcr}."
" At present, you need to prescribe horizontal correlations to"
" use localization with regions.")
all_decayl_same = False
continue
# Fetch the decay lengths from the parameters
# Only one value for every component
if isinstance(decay_lengths, int) or isinstance(decay_lengths, float):
decay_length_comp = decay_lengths
# Different values for each component
else:
if hasattr(decay_lengths, comp):
decay_length_comp = getattr(decay_lengths, comp)
else:
decay_length_comp = getattr(decay_lengths, decay_lengths.attributes[0])
warning(f"No decay length has been prescribed for this component: {comp}.\n"
"The length is set identical to the first component of the list : "
"{}.".format(decay_lengths.attributes[0]))
if decay_length_comp < 0:
raise Exception(
"The decay length for localization must be positive")
if previous_decay_length_comp is not None and decay_length_comp != previous_decay_length_comp:
all_decayl_same = False
# Set the state-obs localization matrix
info(f"Filling state-obs localization matrix for {comp=}/{trcr=} with {decay_func=} "
f"and decay_length={decay_length_comp}")
dx_state = dist_obs_matrix(centroids_lons, centroids_lats,
lons_obs2assim, lats_obs2assim,
projection)
loc_state_partial = apply_function_loc(dx_state, decay_func, decay_length_comp)
previous_decay_length_comp = decay_length_comp
# Fill the localization matrix with the correlation coefficients
tracer_idx_range = np.arange(tracer.xpointer, tracer.xpointer + tracer.dim)
tracer_idx_seg = np.intersect1d(list_cntrlv_idx, tracer_idx_range)
mask_trcr_seg = np.in1d(list_cntrlv_idx, tracer_idx_seg)
ntiles = np.sum(mask_trcr_seg) // loc_state_partial.shape[1]
loc_matrix_state[:, mask_trcr_seg] = np.tile(loc_state_partial, ntiles)
# If full localization is not activated, no need to fill the obs-obs matrix
if not getattr(mode.localization, "full_localization", False):
return loc_matrix_state, loc_matrix_obs
# If all decay lengths are not equal, switch off the full localization
if not all_decayl_same:
warning(f"Full localization is switched off because all decay lengths are not identical")
setattr(mode.localization, "full_localization", False)
return loc_matrix_state, loc_matrix_obs
# Calculate the distances for the obs-obs localization matrix
dx_obs = dist_obs_matrix(lons_obs2assim, lats_obs2assim,
lons_obs2assim, lats_obs2assim,
projection)
# Fill the matrix
info(f"Filling obs-obs localization matrix with {decay_func=} and "
f"decay_length={previous_decay_length_comp}")
loc_matrix_obs = apply_function_loc(dx_obs, decay_func, previous_decay_length_comp)
return loc_matrix_state, loc_matrix_obs
[docs]
def apply_function_loc(dx, decay_func, decay_length):
"""Apply the decay function to generate the localization.
Args:
dx: the distance matrix
decay_func: the decay function
decay_length: the decay length
Returns:
obsvect updated with latitude, longitude, date, enddate and parameter
"""
loc = np.ones_like(dx)
if decay_func == "exponential":
loc = np.exp(- dx / decay_length)
elif decay_func == "normal":
loc = np.exp(- 0.5 * dx**2 / decay_length**2)
elif decay_func == "heaviside":
loc[dx <= decay_length] = 1
loc[dx > decay_length] = 0
elif decay_func == "gc99":
mask = dx <= decay_length
loc[mask] = \
- (1 / 4) * (dx[mask] / decay_length) ** 5 + \
(1 / 2) * (dx[mask] / decay_length) ** 4 + \
(5 / 8) * (dx[mask] / decay_length) ** 3 - \
(5 / 3) * (dx[mask] / decay_length) ** 2 + 1
mask = (dx > decay_length) & (dx <= 2 * decay_length)
loc[mask] = \
(1 / 12) * (dx[mask] / decay_length) ** 5 - \
(1 / 2) * (dx[mask] / decay_length) ** 4 + \
(5 / 8) * (dx[mask] / decay_length) ** 3 + \
(5 / 3) * (dx[mask] / decay_length) ** 2 - \
5 * (dx[mask] / decay_length) - \
(2 / 3) * (decay_length / dx[mask]) + 4
loc[(dx > 2 * decay_length)] = 0
else:
raise Exception(
"The requested decay function is not recognized")
if np.ma.isMaskedArray(loc):
loc = loc.data
return loc
[docs]
def complement_obsvect(obsvect, **kwargs):
"""Add extra information to obsvect needed for EnSRF.
Args:
obsvect (Plugin): the observation vector with all its attributes
Returns:
obsvect updated with latitude, longitude, date, enddate and parameter
"""
list_metadata = []
components = obsvect.datavect.components
for comp in components.attributes:
component = getattr(components, comp)
# Skip if component does not have parameters
if not hasattr(component, "parameters"):
continue
for trcr in component.parameters.attributes:
tracer = getattr(component.parameters, trcr)
if not tracer.isobs:
continue
keys2store = [
("metadata", "parameter"),
("metadata", "station"),
("metadata", "lat"),
("metadata", "lon"),
("metadata", "date"),
("metadata", "enddate")
]
list_metadata.append(tracer.datastore[keys2store])
try:
obsvect.metadata = pd.concat(list_metadata)
obsvect.metadata.columns = [k[1] for k in keys2store]
obsvect.metadata.index = range(len(obsvect.metadata))
except ValueError:
raise ValueError("No observations have been detected in this time window. Check your monitor.")
return obsvect