Source code for pycif.plugins.modes.response_functions.analytical_inversion

import os
import time
from logging import debug, info
from typing import Any, Literal, Tuple

import numpy as np
import xarray as xr

from ....utils.sparse_array import to_sparse_dataset

# Aliases for type hinting
Mode = Any
ControlVect = Any
ObsVect = Any


[docs] def compute_inversion( H: np.ndarray, B: np.ndarray, R: np.ndarray, xb: np.ndarray, dy: np.ndarray, use_woodbury_identity: Literal[True, False, "auto"], ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Perform the analytical inversion using either straightforward matrix inversion or the Woodbury matrix identity. Args: H (2D array (M, N)): The H matrix. B (2D array (N, N)): The B matrix. R (2D array (M, M)): The R matrix. xb (1D array (N,)): The xb vector. dy (1D array (M,)): The dy vector. use_woodbury_identity (bool or "auto"): Whether to use the Woodbury matrix identity for inversion. Returns: (2D array (N,N), 1D array (N,), 1D array (M,)): Pa, xa, and ya. """ # Checking if using of the Woodbury matrix identity to invert matrix # (R + H B H^T) would reduce computation time, this is the case when # dim(R) >> dim(B) if use_woodbury_identity == "auto": # Rough estimates of the complexity of all operation in the computation # of the S matrix: x, _ = np.shape(B) y, _ = np.shape(R) # Straightforward matrix inversion (roughly equivalent to y^3 + x^2) cpx_a = y * (x * x + x * y + y * y + y) # Matrix inversion with Woodbury identity # (assuming R diagonal, roughly equivalent to x^3 + y^2) cpx_b = x * x * (x + 3 * y + 1) + y * (y + 4) use_woodbury_identity = cpx_a / cpx_b > 10 # Computing S = (R + HBH^T)^{-1} start_time = time.perf_counter() if use_woodbury_identity: info("Using Woodbury matrix identity for inversion") # Assumining R is diagonal R = np.diag(R) R_inv = 1 / R debug("Computing the inverse of B") B_inv = np.linalg.pinv(B) debug("Computing (B-1 + H^T R-1 H)") # Right multiplication by a diagonal matrix M = R_inv[:, np.newaxis] * H A = B_inv + H.T @ M debug("Computing the inverse of (B-1 + H^T R-1 H)") A_inv = np.linalg.pinv(A) debug("Computing the inverse of (R + H B H^T)") M = H @ A_inv @ H.T # Left and right multiplication by a diagonal matrix M = R_inv[:, np.newaxis] * np.asarray(M) * R_inv[np.newaxis, :] S = np.diag(R_inv) - M else: info("Using straightforward matrix inverse for inversion") debug("Computing the inverse of (R + H B H^T)") M = R + H @ B @ H.T debug("Computing the inverse of (R + H B H^T)") S = np.linalg.pinv(M) inv_time = time.perf_counter() - start_time info(f"Computation of S = (R + HBH^T)^{-1} took {inv_time} s") # Remaining operations K = B @ H.T @ S Pa = B - K @ H @ B xa = xb + K.dot(dy) ya = H.dot(xa) return Pa, xa, ya
[docs] def analytical_inversion( self: Mode, xb: np.ndarray, h_matrix: np.ndarray, controlvect: ControlVect, obsvect: ObsVect ) -> None: """Perform an analytical inversions and dumps the matrices an vectors if the 'analytical_inversion' input argument is set to 'true', otherwise only dumps the H matrix Args: self (Mode): the mode plugin xb (1D array): control vector prior h_matrix (2D array): H matrix controlvect (ControlVect): the control vector plugin obsvect (ObsVect): the observation vector plugin """ if self.run_mode == "fwd": name = "forward" elif self.run_mode == "tl": name = "tangent linear" else: raise ValueError(f"unexpected run_mode '{self.run_mode}'") dump_data = xr.Dataset( { "H_matrix": ( ["y", "x"], h_matrix, { "standard_name": "H", "long_name": f"{name} observation operator matrix", }, ) }, coords={ "x": ( ["x"], range(controlvect.dim), { "standard_name": "x_dim", "long_name": "control vector dimension", }, ), "y": ( ["y"], range(obsvect.dim), { "standard_name": "y_dim", "long_name": "observation vector dimension", }, ), }, ) if self.analytical_inversion: assert np.all(np.isfinite(self.obsvect.yobs)) assert np.all(np.isfinite(self.obsvect.yobs_err)) # 'obsvect.ysim' is filled by 'fill_obsvect': # fwd: obsvect.ysim = h_matrix.dot(xb) # tl: obsvect.ysim = np.sum(h_matrix, axis=1) dy = obsvect.yobs - obsvect.ysim controlvect.xb = xb b_matrix = controlvect.build_b(controlvect) r_matrix = obsvect.build_r(obsvect) dump_data['B_matrix'] = (['x', 'x'], b_matrix, { 'standard_name': "B", 'long_name': "control vector error covariance matrix", }) dump_data['R_matrix'] = (['y', 'y'], r_matrix, { 'standard_name': "R", 'long_name': "observation vector error covariance matrix", }) dump_data['y_vector'] = (['y'], obsvect.yobs, { 'standard_name': "y", 'long_name': "observation vector", }) dump_data['xb_vector'] = (['x'], xb, { 'standard_name': "xb", 'long_name': "prior control vector", }) # Perform the inversion Pa, xa, ya = compute_inversion(h_matrix, b_matrix, r_matrix, xb, dy, self.use_woodbury_identity) # Dumping the posterior observation vector obsvect.ysim = ya dir_obsvect_post = os.path.join(self.workdir, "obsvect_post") obsvect.dump(dir_obsvect_post) controlvect.x = xa controlvect.pa = Pa # Dumping the posterior control vector dir_controlvect = os.path.join(self.workdir, "controlvect") file_xa = os.path.join(dir_controlvect, "controlvect_post.pickle") controlvect.dump(file_xa, to_netcdf=True, dir_netcdf=dir_controlvect) if self.dump_sparse_arrays: var_names_list = ['H_matrix'] if self.analytical_inversion: var_names_list.extend(['B_matrix', 'R_matrix']) dump_data = to_sparse_dataset( dump_data, variable_names=var_names_list) # type: ignore # Dumping the analytical inversion matrices and vectors filename = "analytical_inversion.nc" if self.analytical_inversion else "h_matrix.nc" dump_path = os.path.join(self.workdir, filename) info(f"Dumping H matrix to '{dump_path}'") dump_data.to_netcdf(dump_path)