import os
import time
from logging import debug, info
from typing import Any, Literal, Tuple
import numpy as np
import xarray as xr
from ....utils.sparse_array import to_sparse_dataset
# Aliases for type hinting
Mode = Any
ControlVect = Any
ObsVect = Any
[docs]
def compute_inversion(
H: np.ndarray,
B: np.ndarray,
R: np.ndarray,
xb: np.ndarray,
dy: np.ndarray,
use_woodbury_identity: Literal[True, False, "auto"],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Perform the analytical inversion using either straightforward matrix
inversion or the Woodbury matrix identity.
Args:
H (2D array (M, N)): The H matrix.
B (2D array (N, N)): The B matrix.
R (2D array (M, M)): The R matrix.
xb (1D array (N,)): The xb vector.
dy (1D array (M,)): The dy vector.
use_woodbury_identity (bool or "auto"): Whether to use the Woodbury
matrix identity for inversion.
Returns:
(2D array (N,N), 1D array (N,), 1D array (M,)): Pa, xa, and ya.
"""
# Checking if using of the Woodbury matrix identity to invert matrix
# (R + H B H^T) would reduce computation time, this is the case when
# dim(R) >> dim(B)
if use_woodbury_identity == "auto":
# Rough estimates of the complexity of all operation in the computation
# of the S matrix:
x, _ = np.shape(B)
y, _ = np.shape(R)
# Straightforward matrix inversion (roughly equivalent to y^3 + x^2)
cpx_a = y * (x * x + x * y + y * y + y)
# Matrix inversion with Woodbury identity
# (assuming R diagonal, roughly equivalent to x^3 + y^2)
cpx_b = x * x * (x + 3 * y + 1) + y * (y + 4)
use_woodbury_identity = cpx_a / cpx_b > 10
# Computing S = (R + HBH^T)^{-1}
start_time = time.perf_counter()
if use_woodbury_identity:
info("Using Woodbury matrix identity for inversion")
# Assumining R is diagonal
R = np.diag(R)
R_inv = 1 / R
debug("Computing the inverse of B")
B_inv = np.linalg.pinv(B)
debug("Computing (B-1 + H^T R-1 H)")
# Right multiplication by a diagonal matrix
M = R_inv[:, np.newaxis] * H
A = B_inv + H.T @ M
debug("Computing the inverse of (B-1 + H^T R-1 H)")
A_inv = np.linalg.pinv(A)
debug("Computing the inverse of (R + H B H^T)")
M = H @ A_inv @ H.T
# Left and right multiplication by a diagonal matrix
M = R_inv[:, np.newaxis] * np.asarray(M) * R_inv[np.newaxis, :]
S = np.diag(R_inv) - M
else:
info("Using straightforward matrix inverse for inversion")
debug("Computing the inverse of (R + H B H^T)")
M = R + H @ B @ H.T
debug("Computing the inverse of (R + H B H^T)")
S = np.linalg.pinv(M)
inv_time = time.perf_counter() - start_time
info(f"Computation of S = (R + HBH^T)^{-1} took {inv_time} s")
# Remaining operations
K = B @ H.T @ S
Pa = B - K @ H @ B
xa = xb + K.dot(dy)
ya = H.dot(xa)
return Pa, xa, ya
[docs]
def analytical_inversion(
self: Mode,
xb: np.ndarray,
h_matrix: np.ndarray,
controlvect: ControlVect,
obsvect: ObsVect
) -> None:
"""Perform an analytical inversions and dumps the matrices an vectors if
the 'analytical_inversion' input argument is set to 'true', otherwise only
dumps the H matrix
Args:
self (Mode): the mode plugin
xb (1D array): control vector prior
h_matrix (2D array): H matrix
controlvect (ControlVect): the control vector plugin
obsvect (ObsVect): the observation vector plugin
"""
if self.run_mode == "fwd":
name = "forward"
elif self.run_mode == "tl":
name = "tangent linear"
else:
raise ValueError(f"unexpected run_mode '{self.run_mode}'")
dump_data = xr.Dataset(
{
"H_matrix": (
["y", "x"],
h_matrix,
{
"standard_name": "H",
"long_name": f"{name} observation operator matrix",
},
)
},
coords={
"x": (
["x"],
range(controlvect.dim),
{
"standard_name": "x_dim",
"long_name": "control vector dimension",
},
),
"y": (
["y"],
range(obsvect.dim),
{
"standard_name": "y_dim",
"long_name": "observation vector dimension",
},
),
},
)
if self.analytical_inversion:
assert np.all(np.isfinite(self.obsvect.yobs))
assert np.all(np.isfinite(self.obsvect.yobs_err))
# 'obsvect.ysim' is filled by 'fill_obsvect':
# fwd: obsvect.ysim = h_matrix.dot(xb)
# tl: obsvect.ysim = np.sum(h_matrix, axis=1)
dy = obsvect.yobs - obsvect.ysim
controlvect.xb = xb
b_matrix = controlvect.build_b(controlvect)
r_matrix = obsvect.build_r(obsvect)
dump_data['B_matrix'] = (['x', 'x'], b_matrix, {
'standard_name': "B",
'long_name': "control vector error covariance matrix",
})
dump_data['R_matrix'] = (['y', 'y'], r_matrix, {
'standard_name': "R",
'long_name': "observation vector error covariance matrix",
})
dump_data['y_vector'] = (['y'], obsvect.yobs, {
'standard_name': "y",
'long_name': "observation vector",
})
dump_data['xb_vector'] = (['x'], xb, {
'standard_name': "xb",
'long_name': "prior control vector",
})
# Perform the inversion
Pa, xa, ya = compute_inversion(h_matrix, b_matrix, r_matrix, xb, dy,
self.use_woodbury_identity)
# Dumping the posterior observation vector
obsvect.ysim = ya
dir_obsvect_post = os.path.join(self.workdir, "obsvect_post")
obsvect.dump(dir_obsvect_post)
controlvect.x = xa
controlvect.pa = Pa
# Dumping the posterior control vector
dir_controlvect = os.path.join(self.workdir, "controlvect")
file_xa = os.path.join(dir_controlvect, "controlvect_post.pickle")
controlvect.dump(file_xa, to_netcdf=True, dir_netcdf=dir_controlvect)
if self.dump_sparse_arrays:
var_names_list = ['H_matrix']
if self.analytical_inversion:
var_names_list.extend(['B_matrix', 'R_matrix'])
dump_data = to_sparse_dataset(
dump_data, variable_names=var_names_list) # type: ignore
# Dumping the analytical inversion matrices and vectors
filename = "analytical_inversion.nc" if self.analytical_inversion else "h_matrix.nc"
dump_path = os.path.join(self.workdir, filename)
info(f"Dumping H matrix to '{dump_path}'")
dump_data.to_netcdf(dump_path)