import copy
import numpy as np
from logging import debug
[docs]
def serial_optimization(mode, controlvect, nsample,
list_obs2assim_idx, list_cntrlv_idx,
yobs, yobs_err, x_samples_dev,
hx_mean, hx_samples_dev,
loc_matrix_state, loc_matrix_obs):
"""Serial optimization following Whitacker and Hamill (2002)
Args:
self (Plugin): mode Plugin
controlvect (Plugin): controlvect Plugin
list_obs2assim_idx (list):
list_cntrlv_idx (list):
yobs:
yobs_err:
x_samples_dev:
hx_mean:
hx_samples_dev:
loc_matrix_state:
loc_matrix_obs:
Returns:
xa:
x_samples_dev:
hx_mean:
hx_samples_dev:
"""
# Initialize the returned variables
xa = copy.deepcopy(controlvect.x[list_cntrlv_idx])
xs_dev = copy.deepcopy(x_samples_dev)
hxm = copy.deepcopy(hx_mean)
hxs_dev = copy.deepcopy(hx_samples_dev)
# Loop over the observations to assimilate
for ii, iobs in enumerate(list_obs2assim_idx):
if ii % 10 == 0:
debug(
f"Assimilating the observation n°{iobs} "
f"({ii + 1} / {len(list_obs2assim_idx)})"
)
# Compute the gain matrix
dy_obs = yobs[ii] - hxm[ii] # mean residuals (1)
r_obs = yobs_err[ii] ** 2 # (1)
hxs_dev_obs = hxs_dev[ii] # (nsample,)
bht = xs_dev.T.dot(hxs_dev_obs) / (nsample - 1) # (ncontrol,)
hbht = hxs_dev_obs.dot(hxs_dev_obs) / (nsample - 1) # (1)
kgain_obs = bht / (hbht + r_obs) # (ncontrol)
# Apply state-obs localization
if hasattr(mode, "localization"):
if ii % 10 == 0:
debug(f"Localizing the observation n°{iobs} with state-obs matrix")
kgain_obs *= loc_matrix_state[ii]
# Update the mean and the sample deviations
xa += kgain_obs * dy_obs # (ncontrol,)
alpha = 1 / (1 + np.sqrt(r_obs / (hbht + r_obs))) # ()
xs_dev -= alpha * np.outer(kgain_obs, hxs_dev_obs).T # (nsample, ncontrol)
# Compute the observational gain matrix
hm_bht = hxs_dev.dot(hxs_dev_obs) / (nsample - 1) # (nobs,)
hm_kgain_obs = hm_bht / (hbht + r_obs) # (nobs,)
# Apply obs-obs localization
if hasattr(mode, "localization") and getattr(mode.localization, "full_localization", False) \
and loc_matrix_obs is not None:
if ii % 10 == 0:
debug(f"Localizing the observation n°{iobs} with obs-obs matrix")
hm_kgain_obs *= loc_matrix_obs[ii]
# Update H(x_mean) et H(x_samples) to account for update of state vector based on ii
hxm += hm_kgain_obs * dy_obs # (nobs,)
hxs_dev -= alpha * np.outer(hm_kgain_obs, hxs_dev_obs) # (nobs, nsample)
return xa, xs_dev, hxm, hxs_dev
[docs]
def bulk_optimization(mode, controlvect, nsample,
list_obs2assim_idx, list_cntrlv_idx,
yobs, yobs_err, x_samples_dev,
hx_mean, hx_samples_dev,
loc_matrix_state, loc_matrix_obs):
"""Bulk optimization following Whitacker and Hamill (2002).
See also this other version of the paper for a better explanation.
https://ams.confex.com/ams/pdfpapers/23823.pdf
Args:
self (Plugin): mode Plugin
controlvect (Plugin): controlvect Plugin
list_obs2assim_idx (list):
list_cntrlv_idx (list):
yobs:
yobs_err:
x_samples_dev:
hx_mean:
hx_samples_dev:
loc_matrix_state:
Returns:
xa:
x_samples_dev:
hx_mean:
hx_samples_dev:
"""
# Initialize the returned variables
xa = copy.deepcopy(controlvect.x[list_cntrlv_idx])
xs_dev = copy.deepcopy(x_samples_dev)
hxm = copy.deepcopy(hx_mean)
hxs_dev = copy.deepcopy(hx_samples_dev)
# Compute the correlations
dy = yobs - hxm # (nobs,)
r = np.diag(yobs_err ** 2) # (nobs, nobs)
bht = xs_dev.T.dot(hxs_dev.T) / (nsample - 1) # (ncontrol, nobs)
hbht = hxs_dev.dot(hxs_dev.T) / (nsample - 1) # (nobs, nobs)
# Apply localization
if hasattr(mode, "localization"):
debug(f"Localizing all the assimilated observations...")
bht *= loc_matrix_state.T
hbht *= loc_matrix_obs
# Compute the gain matrix
hbhtr_inv = np.linalg.inv(hbht + r) # (nobs, nobs)
kgain = bht.dot(hbhtr_inv) # (ncontrol, nobs)
# Update the mean
xa += kgain.dot(dy)
# Update the sample deviations
hbhtr_s = np.linalg.cholesky(hbht + r) # (nobs, nobs)
hbhtr_s_inv = np.linalg.inv(hbhtr_s) # (nobs, nobs)
r_s = np.linalg.cholesky(r) # (nobs, nobs)
hbhtr_s_r_s_inv = np.linalg.inv(hbhtr_s + r_s) # (nobs, nobs)
kgain_tilde = bht.dot(hbhtr_s_inv.T).dot(hbhtr_s_r_s_inv) # (ncontrol, nobs)
xs_dev -= kgain_tilde.dot(hxs_dev).T # (nsample, ncontrol)
# Update H(x_mean) et H(x_samples) to account for update of state vector
# ---------------------------------------------------------------------------
# This part is not necessary and can be commented if the process is too long
# ---------------------------------------------------------------------------
hkgain = hbht.dot(hbhtr_inv.T) # (nobs, nobs)
hkgain_tilde = hbht.dot(hbhtr_s_inv.T).dot(hbhtr_s_r_s_inv) # (nobs, nobs)
hxm += hkgain.dot(dy) # (nobs,)
hxs_dev -= hkgain_tilde.dot(hxs_dev) # (nobs, nsample)
# ---------------------------------------------------------------------------
return xa, xs_dev, hxm, hxs_dev