Source code for pycif.plugins.modes.ensrf.localization

import numpy as np
import pandas as pd
from logging import info, warning


[docs] def dist_obs_matrix(zlon1, zlat1, zlon2, zlat2, projection="gps"): """Computes the distance vector between (zlon1, zlat1) and an observation's location. Args: zlat1 (np.array): numpy array of latitudes zlon1 (np.array): numpy array of longitudes projection (str): the projection used for the longitudes and latitudes lon_obs (float): the projection used for the longitudes and latitudes lat_obs (float): the projection used for the longitudes and latitudes Returns: np.array(zlat1.size, zlat2.size): vector of distances """ if zlat1.size != zlon1.size: raise ValueError( "Warning: longitudes and latitudes do not have the " "same dimension. Cannot compute the distance matrix" ) # Dealing differently with gps and xy projections if projection == "gps": # Earth radius rearth = 6371.03 # Flatten lon/lat radlats1 = np.radians(zlat1).flatten() radlons1 = np.radians(zlon1).flatten() radlats2 = np.radians(zlat2).flatten() radlons2 = np.radians(zlon2).flatten() # Compute the distance (1/2) val = 0.5 * ( 1 - np.sin(radlats2[:, np.newaxis]) * np.sin(radlats1[np.newaxis]) - np.cos(radlats2[:, np.newaxis]) * np.cos(radlats1[np.newaxis]) * np.cos(radlons2[:, np.newaxis] - radlons1[np.newaxis]) ) # Set the values below epsilon to zero epsilon = np.finfo(float).eps mask_zero = np.abs(val) < epsilon val[mask_zero] = 0 # Compute the distance (2/2) dx = rearth * 2 * np.arcsin(val ** 0.5) elif projection == "xy": zlat1 = zlat1.flatten() zlon1 = zlon1.flatten() dlat = (zlat2[:, np.newaxis] - zlat1[np.newaxis, :]) ** 2 dlon = (zlon2[:, np.newaxis] - zlon1[np.newaxis, :]) ** 2 dx = dlat + dlon dx = np.sqrt(dx) else: raise ValueError("Projection {} is not recognized".format(projection)) return dx
[docs] def fill_loc_matrices(mode, controlvect, lons_obs2assim, lats_obs2assim, list_cntrlv_idx): """Fill the localization matrices based on the requested decay function and the distance between pixels and observations. Args: mode (Plugin): mode Plugin controlvect (Plugin): controlvect Plugin lons_obs (np.array): longitudes of observations lats_obs (np.array): latitudes of observations Returns: localize matrix (np.array): filled localization matrix """ loc_matrix_state = np.ones((len(lons_obs2assim), len(list_cntrlv_idx))) loc_matrix_obs = np.ones((len(lons_obs2assim), len(lons_obs2assim))) decay_lengths = mode.localization.decay_length decay_func = mode.localization.decay_func components = controlvect.datavect.components all_decayl_same = True previous_decay_length_comp = None # Fill the localization matrix for the state-obs for comp in components.attributes: component = getattr(components, comp) # Skip if no parameters if not hasattr(component, "parameters"): continue for trcr in component.parameters.attributes: tracer = getattr(component.parameters, trcr) if not tracer.iscontrol: continue tracer = getattr(component.parameters, trcr) grid = getattr(tracer, "domain", controlvect.domain) projection = getattr(grid, "projection", "gps") if tracer.hresol == 'hpixels': centroids_lats = grid.zlat centroids_lons = grid.zlon else: try: centroids_lats = tracer.centroids_lats centroids_lons = tracer.centroids_lons except AttributeError: warning(f"Cannot compute the centroids for the component {comp}/{trcr}." " At present, you need to prescribe horizontal correlations to" " use localization with regions.") all_decayl_same = False continue # Fetch the decay lengths from the parameters # Only one value for every component if isinstance(decay_lengths, int) or isinstance(decay_lengths, float): decay_length_comp = decay_lengths # Different values for each component else: if hasattr(decay_lengths, comp): decay_length_comp = getattr(decay_lengths, comp) else: decay_length_comp = getattr(decay_lengths, decay_lengths.attributes[0]) warning(f"No decay length has been prescribed for this component: {comp}.\n" "The length is set identical to the first component of the list : " "{}.".format(decay_lengths.attributes[0])) if decay_length_comp < 0: raise Exception( "The decay length for localization must be positive") if previous_decay_length_comp is not None and decay_length_comp != previous_decay_length_comp: all_decayl_same = False # Set the state-obs localization matrix info(f"Filling state-obs localization matrix for {comp=}/{trcr=} with {decay_func=} " f"and decay_length={decay_length_comp}") dx_state = dist_obs_matrix(centroids_lons, centroids_lats, lons_obs2assim, lats_obs2assim, projection) loc_state_partial = apply_function_loc(dx_state, decay_func, decay_length_comp) previous_decay_length_comp = decay_length_comp # Fill the localization matrix with the correlation coefficients tracer_idx_range = np.arange(tracer.xpointer, tracer.xpointer + tracer.dim) tracer_idx_seg = np.intersect1d(list_cntrlv_idx, tracer_idx_range) mask_trcr_seg = np.in1d(list_cntrlv_idx, tracer_idx_seg) ntiles = np.sum(mask_trcr_seg) // loc_state_partial.shape[1] loc_matrix_state[:, mask_trcr_seg] = np.tile(loc_state_partial, ntiles) # If full localization is not activated, no need to fill the obs-obs matrix if not getattr(mode.localization, "full_localization", False): return loc_matrix_state, loc_matrix_obs # If all decay lengths are not equal, switch off the full localization if not all_decayl_same: warning(f"Full localization is switched off because all decay lengths are not identical") setattr(mode.localization, "full_localization", False) return loc_matrix_state, loc_matrix_obs # Calculate the distances for the obs-obs localization matrix dx_obs = dist_obs_matrix(lons_obs2assim, lats_obs2assim, lons_obs2assim, lats_obs2assim, projection) # Fill the matrix info(f"Filling obs-obs localization matrix with {decay_func=} and " f"decay_length={previous_decay_length_comp}") loc_matrix_obs = apply_function_loc(dx_obs, decay_func, previous_decay_length_comp) return loc_matrix_state, loc_matrix_obs
[docs] def apply_function_loc(dx, decay_func, decay_length): """Apply the decay function to generate the localization. Args: dx: the distance matrix decay_func: the decay function decay_length: the decay length Returns: obsvect updated with latitude, longitude, date, enddate and parameter """ loc = np.ones_like(dx) if decay_func == "exponential": loc = np.exp(- dx / decay_length) elif decay_func == "normal": loc = np.exp(- 0.5 * dx**2 / decay_length**2) elif decay_func == "heaviside": loc[dx <= decay_length] = 1 loc[dx > decay_length] = 0 elif decay_func == "gc99": mask = dx <= decay_length loc[mask] = \ - (1 / 4) * (dx[mask] / decay_length) ** 5 + \ (1 / 2) * (dx[mask] / decay_length) ** 4 + \ (5 / 8) * (dx[mask] / decay_length) ** 3 - \ (5 / 3) * (dx[mask] / decay_length) ** 2 + 1 mask = (dx > decay_length) & (dx <= 2 * decay_length) loc[mask] = \ (1 / 12) * (dx[mask] / decay_length) ** 5 - \ (1 / 2) * (dx[mask] / decay_length) ** 4 + \ (5 / 8) * (dx[mask] / decay_length) ** 3 + \ (5 / 3) * (dx[mask] / decay_length) ** 2 - \ 5 * (dx[mask] / decay_length) - \ (2 / 3) * (decay_length / dx[mask]) + 4 loc[(dx > 2 * decay_length)] = 0 else: raise Exception( "The requested decay function is not recognized") if np.ma.isMaskedArray(loc): loc = loc.data return loc
[docs] def complement_obsvect(obsvect, **kwargs): """Add extra information to obsvect needed for EnSRF. Args: obsvect (Plugin): the observation vector with all its attributes Returns: obsvect updated with latitude, longitude, date, enddate and parameter """ list_metadata = [] components = obsvect.datavect.components for comp in components.attributes: component = getattr(components, comp) # Skip if component does not have parameters if not hasattr(component, "parameters"): continue for trcr in component.parameters.attributes: tracer = getattr(component.parameters, trcr) if not tracer.isobs: continue keys2store = [ ("metadata", "parameter"), ("metadata", "station"), ("metadata", "lat"), ("metadata", "lon"), ("metadata", "date"), ("metadata", "enddate") ] list_metadata.append(tracer.datastore[keys2store]) try: obsvect.metadata = pd.concat(list_metadata) obsvect.metadata.columns = [k[1] for k in keys2store] obsvect.metadata.index = range(len(obsvect.metadata)) except ValueError: raise ValueError("No observations have been detected in this time window. Check your monitor.") return obsvect