Source code for pycif.plugins.modes.ensrf.optimization

import copy
import numpy as np
from logging import debug


[docs] def serial_optimization(mode, controlvect, nsample, list_obs2assim_idx, list_cntrlv_idx, yobs, yobs_err, x_samples_dev, hx_mean, hx_samples_dev, loc_matrix_state, loc_matrix_obs): """Serial optimization following Whitacker and Hamill (2002) Args: self (Plugin): mode Plugin controlvect (Plugin): controlvect Plugin list_obs2assim_idx (list): list_cntrlv_idx (list): yobs: yobs_err: x_samples_dev: hx_mean: hx_samples_dev: loc_matrix_state: loc_matrix_obs: Returns: xa: x_samples_dev: hx_mean: hx_samples_dev: """ # Initialize the returned variables xa = copy.deepcopy(controlvect.x[list_cntrlv_idx]) xs_dev = copy.deepcopy(x_samples_dev) hxm = copy.deepcopy(hx_mean) hxs_dev = copy.deepcopy(hx_samples_dev) # Loop over the observations to assimilate for ii, iobs in enumerate(list_obs2assim_idx): if ii % 10 == 0: debug( f"Assimilating the observation n°{iobs} " f"({ii + 1} / {len(list_obs2assim_idx)})" ) # Compute the gain matrix dy_obs = yobs[ii] - hxm[ii] # mean residuals (1) r_obs = yobs_err[ii] ** 2 # (1) hxs_dev_obs = hxs_dev[ii] # (nsample,) bht = xs_dev.T.dot(hxs_dev_obs) / (nsample - 1) # (ncontrol,) hbht = hxs_dev_obs.dot(hxs_dev_obs) / (nsample - 1) # (1) kgain_obs = bht / (hbht + r_obs) # (ncontrol) # Apply state-obs localization if hasattr(mode, "localization"): if ii % 10 == 0: debug(f"Localizing the observation n°{iobs} with state-obs matrix") kgain_obs *= loc_matrix_state[ii] # Update the mean and the sample deviations xa += kgain_obs * dy_obs # (ncontrol,) alpha = 1 / (1 + np.sqrt(r_obs / (hbht + r_obs))) # () xs_dev -= alpha * np.outer(kgain_obs, hxs_dev_obs).T # (nsample, ncontrol) # Compute the observational gain matrix hm_bht = hxs_dev.dot(hxs_dev_obs) / (nsample - 1) # (nobs,) hm_kgain_obs = hm_bht / (hbht + r_obs) # (nobs,) # Apply obs-obs localization if hasattr(mode, "localization") and getattr(mode.localization, "full_localization", False) \ and loc_matrix_obs is not None: if ii % 10 == 0: debug(f"Localizing the observation n°{iobs} with obs-obs matrix") hm_kgain_obs *= loc_matrix_obs[ii] # Update H(x_mean) et H(x_samples) to account for update of state vector based on ii hxm += hm_kgain_obs * dy_obs # (nobs,) hxs_dev -= alpha * np.outer(hm_kgain_obs, hxs_dev_obs) # (nobs, nsample) return xa, xs_dev, hxm, hxs_dev
[docs] def bulk_optimization(mode, controlvect, nsample, list_obs2assim_idx, list_cntrlv_idx, yobs, yobs_err, x_samples_dev, hx_mean, hx_samples_dev, loc_matrix_state, loc_matrix_obs): """Bulk optimization following Whitacker and Hamill (2002). See also this other version of the paper for a better explanation. https://ams.confex.com/ams/pdfpapers/23823.pdf Args: self (Plugin): mode Plugin controlvect (Plugin): controlvect Plugin list_obs2assim_idx (list): list_cntrlv_idx (list): yobs: yobs_err: x_samples_dev: hx_mean: hx_samples_dev: loc_matrix_state: Returns: xa: x_samples_dev: hx_mean: hx_samples_dev: """ # Initialize the returned variables xa = copy.deepcopy(controlvect.x[list_cntrlv_idx]) xs_dev = copy.deepcopy(x_samples_dev) hxm = copy.deepcopy(hx_mean) hxs_dev = copy.deepcopy(hx_samples_dev) # Compute the correlations dy = yobs - hxm # (nobs,) r = np.diag(yobs_err ** 2) # (nobs, nobs) bht = xs_dev.T.dot(hxs_dev.T) / (nsample - 1) # (ncontrol, nobs) hbht = hxs_dev.dot(hxs_dev.T) / (nsample - 1) # (nobs, nobs) # Apply localization if hasattr(mode, "localization"): debug(f"Localizing all the assimilated observations...") bht *= loc_matrix_state.T hbht *= loc_matrix_obs # Compute the gain matrix hbhtr_inv = np.linalg.inv(hbht + r) # (nobs, nobs) kgain = bht.dot(hbhtr_inv) # (ncontrol, nobs) # Update the mean xa += kgain.dot(dy) # Update the sample deviations hbhtr_s = np.linalg.cholesky(hbht + r) # (nobs, nobs) hbhtr_s_inv = np.linalg.inv(hbhtr_s) # (nobs, nobs) r_s = np.linalg.cholesky(r) # (nobs, nobs) hbhtr_s_r_s_inv = np.linalg.inv(hbhtr_s + r_s) # (nobs, nobs) kgain_tilde = bht.dot(hbhtr_s_inv.T).dot(hbhtr_s_r_s_inv) # (ncontrol, nobs) xs_dev -= kgain_tilde.dot(hxs_dev).T # (nsample, ncontrol) # Update H(x_mean) et H(x_samples) to account for update of state vector # --------------------------------------------------------------------------- # This part is not necessary and can be commented if the process is too long # --------------------------------------------------------------------------- hkgain = hbht.dot(hbhtr_inv.T) # (nobs, nobs) hkgain_tilde = hbht.dot(hbhtr_s_inv.T).dot(hbhtr_s_r_s_inv) # (nobs, nobs) hxm += hkgain.dot(dy) # (nobs,) hxs_dev -= hkgain_tilde.dot(hxs_dev) # (nobs, nsample) # --------------------------------------------------------------------------- return xa, xs_dev, hxm, hxs_dev