Source code for pycif.plugins.modes.adjtl_test.testing_adj_sym

import numpy as np
import copy
from logging import info, warning


[docs] def testing_AD_symmetry(self, accuracy, increments, testspace, **kwargs): """Check symmetry of the operators ``H^T H`` and ``H^T R^{-1} H``. Computes ``H^T H u`` and ``H^T H v`` for two independent random vectors ``u`` and ``v`` and prints the inner products ``<H^T H u, v>`` vs ``<H^T H v, u>``. Repeats the check with ``R^{-1}`` included. Args: self (Plugin): mode plugin with all configuration attributes. accuracy (float): machine epsilon (``np.finfo(np.float64).eps``). increments (float): standard deviation used to draw random vectors. testspace (str): ``'control'`` or ``'chi'``. **kwargs: forwarded verbatim to the observation operator. """ # Working directory workdir = self.workdir # Control vector controlvect = self.controlvect # Observation operator obsoper = self.obsoperator # Obsvervation vector obsvect = self.obsvect # Simulation window datei = self.datei datef = self.datef # Set simulation id to 1 self.iter_id = 1 # Some verbose info("Computing the test of the adjoint symmetry") # Compute HtH for two random vectors init_random(self, controlvect, **kwargs) u0_x = copy.deepcopy(controlvect.dx[:]) u0_chi = copy.deepcopy(controlvect.chi[:]) compute_HtH(self, obsoper, controlvect, obsvect, datei, datef, workdir, **kwargs) HtHu0 = copy.deepcopy(controlvect.dx[:]) HtHu0_chi = copy.deepcopy(controlvect.sqrtbprod_ad( controlvect.dx, **kwargs)) init_random(self, controlvect, **kwargs) v0_x = copy.deepcopy(controlvect.dx[:]) v0_chi = copy.deepcopy(controlvect.chi[:]) compute_HtH(self, obsoper, controlvect, obsvect, datei, datef, workdir, **kwargs) HtHv0 = copy.deepcopy(controlvect.dx[:]) HtHv0_chi = copy.deepcopy(controlvect.sqrtbprod_ad( controlvect.dx, **kwargs)) # Check symmetry with HtH print(f"< HtH u , v > = {np.nansum(HtHu0 * v0_x)}") print(f"< HtH v , u > = {np.nansum(HtHv0 * u0_x)}") # Now checking the application of sqrtB print(f"< BHtHB u , v > = {np.nansum(HtHu0_chi * v0_chi)}") print(f"< BHtHB v , u > = {np.nansum(HtHv0_chi * u0_chi)}") # Compute HtRinvH for two random vectors controlvect.dx[:] = copy.deepcopy(u0_x[:]) controlvect.chi[:] = copy.deepcopy(u0_chi[:]) compute_HtH(self, obsoper, controlvect, obsvect, datei, datef, workdir, include_R=True, **kwargs) HtRHu1 = copy.deepcopy(controlvect.dx[:]) HtRHu1_chi = copy.deepcopy(controlvect.sqrtbprod_ad( controlvect.dx, **kwargs)) controlvect.dx[:] = copy.deepcopy(v0_x[:]) controlvect.chi[:] = copy.deepcopy(v0_chi[:]) compute_HtH(self, obsoper, controlvect, obsvect, datei, datef, workdir, include_R=True, **kwargs) HtRHv1 = copy.deepcopy(controlvect.dx[:]) HtRHv1_chi = copy.deepcopy(controlvect.sqrtbprod_ad( controlvect.dx, **kwargs)) # Check symmetry with HtH print(f"< HtRH u , v > = {np.nansum(HtRHu1 * v0_x)}") print(f"< HtRH v , u > = {np.nansum(HtRHv1 * u0_x)}") # Now checking the application of sqrtB print(f"< BHtRHB u , v > = {np.nansum(HtRHu1_chi * v0_chi)}") print(f"< BHtRHB v , u > = {np.nansum(HtRHv1_chi * u0_chi)}") # Now get values for the simulator and gradient fb_u, fo_u, gb_u, go_u = self.simulator.simul( u0_chi.flatten(), run_id=self.iter_id, linear=True, split_obs_vs_control=True, **kwargs ) self.iter_id += 1 fb_v, fo_v, gb_v, go_v = self.simulator.simul( v0_chi.flatten(), run_id=self.iter_id, linear=True, split_obs_vs_control=True, **kwargs ) self.iter_id += 1 fb_ref, fo_ref, gb_ref, go_ref = self.simulator.simul( 0 * u0_chi.flatten(), run_id=self.iter_id, linear=True, split_obs_vs_control=True, **kwargs ) self.iter_id += 1 print(__file__) import code code.interact(local=dict(locals(), **globals())) go_u - go_ref == HtRHu1_chi
[docs] def init_random(self, controlvect, **kwargs): """Draw a random perturbation and store it in ``controlvect.dx`` and ``controlvect.chi``.""" # Increments in x increments = self.increments incrmode = 'random' testspace = self.testspace testtype = self.testtype controlvect.chi = np.random.normal(0, increments, controlvect.chi_dim) controlvect.dx = ( controlvect.sqrtbprod(controlvect.chi, **kwargs) - controlvect.xb )
[docs] def compute_HtH( self, obsoper, controlvect, obsvect, datei, datef, workdir, include_R=False, **kwargs ): """Apply ``H^T H`` (or ``H^T R^{-1} H``) to the current ``controlvect.dx``. Runs the tangent-linear (or two finite-difference forward runs when ``use_forward`` is set) to compute ``H·dx``, optionally applies ``R^{-1}``, then runs the adjoint to produce ``H^T(R^{-1})H·dx`` stored in ``controlvect.dx``. Args: self (Plugin): mode plugin. obsoper: observation operator plugin. controlvect: control vector plugin (``dx`` is read on entry and overwritten on exit). obsvect: observation vector plugin. datei (datetime): simulation start date. datef (datetime): simulation end date. workdir (str): working directory. include_R (bool): if ``True``, multiply ``obsvect.dy`` by ``R^{-1}`` before the adjoint run. **kwargs: forwarded verbatim to the observation operator. """ # Running the tangent linear code of the model: H(xb)(dx) if not self.use_forward: obsvect = obsoper.obsoper( controlvect, obsvect, "tl", datei=datei, datef=datef, workdir=workdir, reload_results=self.reload_results, check_transforms=self.check_transforms, run_id=self.iter_id, **kwargs ) else: # Reference simulation: H(xb) obsvect_ref = obsoper.obsoper( controlvect, obsvect, "fwd", datei=datei, datef=datef, workdir=workdir, reload_results=self.reload_results, check_transforms=self.check_transforms, run_id=self.iter_id, **kwargs ) ysim_ref = copy.deepcopy(obsvect_ref.ysim) # Save check transforms if self.check_transforms: data_tl_save = copy.deepcopy(obsoper.data_tl) del obsoper.data_tl # Perturbed simulation: H(xb + dx) controlvect.x = copy.deepcopy(controlvect.xb + controlvect.dx_save) obsvect_dy = obsoper.obsoper( controlvect, obsvect, "fwd", datei=datei, datef=datef, workdir=workdir, reload_results=self.reload_results, check_transforms=self.check_transforms, run_id=self.iter_id + 1, **kwargs ) # Replace dy by difference between ref and perturbed control vector obsvect.dy = obsvect_dy.ysim - ysim_ref # Putting xb and x back to there original values controlvect.x -= controlvect.dx_save # Putting increments in the observation vector mask_obsvect = obsvect.obsvect_mask obsvect.dy = np.where(np.isnan(obsvect.dy), 0, obsvect.dy) obsvect.dy[~mask_obsvect] = 0 # Include multiplication by Rinv if include_R: obsvect.dy = obsvect.rinvprod(obsvect.dy, mask=mask_obsvect) # Running the observation operator controlvect = obsoper.obsoper( controlvect, obsvect, "adj", datei=datei, datef=datef, workdir=workdir, reload_results=self.reload_results, check_transforms=self.check_transforms, run_id=self.iter_id, **kwargs ) # Updqte run_id self.iter_id += 1
# def estimate_norm_A(A, n, iters=10): # x = np.random.randn(n) # x /= np.linalg.norm(x) # for _ in range(iters): # y = A.matvec(x) # norm_y = np.linalg.norm(y) # if norm_y == 0: # return 0.0 # x = y / norm_y # return norm_y # n = grad_dim # est_normA = estimate_norm_A(A, n, iters=10) # eps = np.finfo(float).eps # threshold = 100 * n * eps * max(1.0, est_normA) # def symmetry_check(A, n, trials=5): # errs = [] # for _ in range(trials): # u = np.random.randn(n) # v = np.random.randn(n) # Au = A.matvec(u) # Av = A.matvec(v) # err = abs(np.vdot(u, Av) - np.vdot(Au, v)) # errs.append(err) # return np.max(errs), np.median(errs) # max_err, med_err = symmetry_check(A, n) # print("max sym test error:", max_err) # u = np.random.uniform(0, 1, size=(grad_dim, 1)) # v = np.random.uniform(0, 1, size=(grad_dim, 1)) # print(np.sum(A.matvec(u) * v), np.sum(A.matvec(v) * u)) # print(__file__) # import code # code.interact(local=dict(locals(), **globals()))