import numpy as np
import copy
from logging import info, warning
[docs]
def testing_AD_symmetry(self, accuracy, increments, testspace, **kwargs):
"""Check symmetry of the operators ``H^T H`` and ``H^T R^{-1} H``.
Computes ``H^T H u`` and ``H^T H v`` for two independent random vectors
``u`` and ``v`` and prints the inner products ``<H^T H u, v>`` vs
``<H^T H v, u>``. Repeats the check with ``R^{-1}`` included.
Args:
self (Plugin): mode plugin with all configuration attributes.
accuracy (float): machine epsilon (``np.finfo(np.float64).eps``).
increments (float): standard deviation used to draw random vectors.
testspace (str): ``'control'`` or ``'chi'``.
**kwargs: forwarded verbatim to the observation operator.
"""
# Working directory
workdir = self.workdir
# Control vector
controlvect = self.controlvect
# Observation operator
obsoper = self.obsoperator
# Obsvervation vector
obsvect = self.obsvect
# Simulation window
datei = self.datei
datef = self.datef
# Set simulation id to 1
self.iter_id = 1
# Some verbose
info("Computing the test of the adjoint symmetry")
# Compute HtH for two random vectors
init_random(self, controlvect, **kwargs)
u0_x = copy.deepcopy(controlvect.dx[:])
u0_chi = copy.deepcopy(controlvect.chi[:])
compute_HtH(self, obsoper, controlvect, obsvect,
datei, datef, workdir, **kwargs)
HtHu0 = copy.deepcopy(controlvect.dx[:])
HtHu0_chi = copy.deepcopy(controlvect.sqrtbprod_ad(
controlvect.dx, **kwargs))
init_random(self, controlvect, **kwargs)
v0_x = copy.deepcopy(controlvect.dx[:])
v0_chi = copy.deepcopy(controlvect.chi[:])
compute_HtH(self, obsoper, controlvect, obsvect,
datei, datef, workdir, **kwargs)
HtHv0 = copy.deepcopy(controlvect.dx[:])
HtHv0_chi = copy.deepcopy(controlvect.sqrtbprod_ad(
controlvect.dx, **kwargs))
# Check symmetry with HtH
print(f"< HtH u , v > = {np.nansum(HtHu0 * v0_x)}")
print(f"< HtH v , u > = {np.nansum(HtHv0 * u0_x)}")
# Now checking the application of sqrtB
print(f"< BHtHB u , v > = {np.nansum(HtHu0_chi * v0_chi)}")
print(f"< BHtHB v , u > = {np.nansum(HtHv0_chi * u0_chi)}")
# Compute HtRinvH for two random vectors
controlvect.dx[:] = copy.deepcopy(u0_x[:])
controlvect.chi[:] = copy.deepcopy(u0_chi[:])
compute_HtH(self, obsoper, controlvect, obsvect,
datei, datef, workdir, include_R=True, **kwargs)
HtRHu1 = copy.deepcopy(controlvect.dx[:])
HtRHu1_chi = copy.deepcopy(controlvect.sqrtbprod_ad(
controlvect.dx, **kwargs))
controlvect.dx[:] = copy.deepcopy(v0_x[:])
controlvect.chi[:] = copy.deepcopy(v0_chi[:])
compute_HtH(self, obsoper, controlvect, obsvect,
datei, datef, workdir, include_R=True, **kwargs)
HtRHv1 = copy.deepcopy(controlvect.dx[:])
HtRHv1_chi = copy.deepcopy(controlvect.sqrtbprod_ad(
controlvect.dx, **kwargs))
# Check symmetry with HtH
print(f"< HtRH u , v > = {np.nansum(HtRHu1 * v0_x)}")
print(f"< HtRH v , u > = {np.nansum(HtRHv1 * u0_x)}")
# Now checking the application of sqrtB
print(f"< BHtRHB u , v > = {np.nansum(HtRHu1_chi * v0_chi)}")
print(f"< BHtRHB v , u > = {np.nansum(HtRHv1_chi * u0_chi)}")
# Now get values for the simulator and gradient
fb_u, fo_u, gb_u, go_u = self.simulator.simul(
u0_chi.flatten(), run_id=self.iter_id,
linear=True,
split_obs_vs_control=True,
**kwargs
)
self.iter_id += 1
fb_v, fo_v, gb_v, go_v = self.simulator.simul(
v0_chi.flatten(), run_id=self.iter_id,
linear=True,
split_obs_vs_control=True,
**kwargs
)
self.iter_id += 1
fb_ref, fo_ref, gb_ref, go_ref = self.simulator.simul(
0 * u0_chi.flatten(), run_id=self.iter_id,
linear=True,
split_obs_vs_control=True,
**kwargs
)
self.iter_id += 1
print(__file__)
import code
code.interact(local=dict(locals(), **globals()))
go_u - go_ref == HtRHu1_chi
[docs]
def init_random(self, controlvect, **kwargs):
"""Draw a random perturbation and store it in ``controlvect.dx`` and ``controlvect.chi``."""
# Increments in x
increments = self.increments
incrmode = 'random'
testspace = self.testspace
testtype = self.testtype
controlvect.chi = np.random.normal(0, increments,
controlvect.chi_dim)
controlvect.dx = (
controlvect.sqrtbprod(controlvect.chi, **kwargs) - controlvect.xb
)
[docs]
def compute_HtH(
self, obsoper, controlvect, obsvect, datei, datef, workdir,
include_R=False,
**kwargs
):
"""Apply ``H^T H`` (or ``H^T R^{-1} H``) to the current ``controlvect.dx``.
Runs the tangent-linear (or two finite-difference forward runs when
``use_forward`` is set) to compute ``H·dx``, optionally applies
``R^{-1}``, then runs the adjoint to produce ``H^T(R^{-1})H·dx`` stored
in ``controlvect.dx``.
Args:
self (Plugin): mode plugin.
obsoper: observation operator plugin.
controlvect: control vector plugin (``dx`` is read on entry and
overwritten on exit).
obsvect: observation vector plugin.
datei (datetime): simulation start date.
datef (datetime): simulation end date.
workdir (str): working directory.
include_R (bool): if ``True``, multiply ``obsvect.dy`` by ``R^{-1}``
before the adjoint run.
**kwargs: forwarded verbatim to the observation operator.
"""
# Running the tangent linear code of the model: H(xb)(dx)
if not self.use_forward:
obsvect = obsoper.obsoper(
controlvect,
obsvect,
"tl",
datei=datei,
datef=datef,
workdir=workdir,
reload_results=self.reload_results,
check_transforms=self.check_transforms,
run_id=self.iter_id,
**kwargs
)
else:
# Reference simulation: H(xb)
obsvect_ref = obsoper.obsoper(
controlvect,
obsvect,
"fwd",
datei=datei,
datef=datef,
workdir=workdir,
reload_results=self.reload_results,
check_transforms=self.check_transforms,
run_id=self.iter_id,
**kwargs
)
ysim_ref = copy.deepcopy(obsvect_ref.ysim)
# Save check transforms
if self.check_transforms:
data_tl_save = copy.deepcopy(obsoper.data_tl)
del obsoper.data_tl
# Perturbed simulation: H(xb + dx)
controlvect.x = copy.deepcopy(controlvect.xb + controlvect.dx_save)
obsvect_dy = obsoper.obsoper(
controlvect,
obsvect,
"fwd",
datei=datei,
datef=datef,
workdir=workdir,
reload_results=self.reload_results,
check_transforms=self.check_transforms,
run_id=self.iter_id + 1,
**kwargs
)
# Replace dy by difference between ref and perturbed control vector
obsvect.dy = obsvect_dy.ysim - ysim_ref
# Putting xb and x back to there original values
controlvect.x -= controlvect.dx_save
# Putting increments in the observation vector
mask_obsvect = obsvect.obsvect_mask
obsvect.dy = np.where(np.isnan(obsvect.dy), 0, obsvect.dy)
obsvect.dy[~mask_obsvect] = 0
# Include multiplication by Rinv
if include_R:
obsvect.dy = obsvect.rinvprod(obsvect.dy, mask=mask_obsvect)
# Running the observation operator
controlvect = obsoper.obsoper(
controlvect,
obsvect,
"adj",
datei=datei,
datef=datef,
workdir=workdir,
reload_results=self.reload_results,
check_transforms=self.check_transforms,
run_id=self.iter_id,
**kwargs
)
# Updqte run_id
self.iter_id += 1
# def estimate_norm_A(A, n, iters=10):
# x = np.random.randn(n)
# x /= np.linalg.norm(x)
# for _ in range(iters):
# y = A.matvec(x)
# norm_y = np.linalg.norm(y)
# if norm_y == 0:
# return 0.0
# x = y / norm_y
# return norm_y
# n = grad_dim
# est_normA = estimate_norm_A(A, n, iters=10)
# eps = np.finfo(float).eps
# threshold = 100 * n * eps * max(1.0, est_normA)
# def symmetry_check(A, n, trials=5):
# errs = []
# for _ in range(trials):
# u = np.random.randn(n)
# v = np.random.randn(n)
# Au = A.matvec(u)
# Av = A.matvec(v)
# err = abs(np.vdot(u, Av) - np.vdot(Au, v))
# errs.append(err)
# return np.max(errs), np.median(errs)
# max_err, med_err = symmetry_check(A, n)
# print("max sym test error:", max_err)
# u = np.random.uniform(0, 1, size=(grad_dim, 1))
# v = np.random.uniform(0, 1, size=(grad_dim, 1))
# print(np.sum(A.matvec(u) * v), np.sum(A.matvec(v) * u))
# print(__file__)
# import code
# code.interact(local=dict(locals(), **globals()))