Source code for pycif.plugins.transforms.complex.satellites.forward

import numpy as np
import pandas as pd
import xarray as xr
import os
import itertools
import copy

from logging import info, debug, warning
from .....utils.path import init_dir
from .....utils.datastores.dump import dump_datastore, read_datastore
from .....utils.datastores.empty import init_empty
from .....utils.datastores.crop_monitor import crop_monitor
from .apply_AK import apply_ak
from .apply_AK import apply_ak_tl
from .vinterp import vertical_interp


[docs] def forward( transf, inout_datastore, controlvect, obsvect, mapper, di, df, mode, runsubdir, workdir, onlyinit=False, save_debug=False, **kwargs ): """Aggregate simulations at the grid scale to total columns. Re-interpolate the model pressure levels to the satellite averaging kernel levels. Average using the averaging kernel formula """ if not hasattr(transf, "save_data"): transf.save_data = {} ddi = min(di, df) if onlyinit: return ref_parameter = transf.parameter[0] ref_component = transf.component[0] ref_datastore = inout_datastore["outputs"][(ref_component, ref_parameter)] ref_ds = ref_datastore[ddi] target_datastore = \ inout_datastore["inputs"][("concs", ref_parameter)][ddi].reset_index(drop=True) # Put auxiliary values into target datastore target_datastore.loc[:, ("metadata", "pressure")] = \ inout_datastore["inputs"][ ("pressure", ref_parameter)][ddi][("maindata", "spec")].values target_datastore.loc[:, ("metadata", "dp")] = \ inout_datastore["inputs"][ ("dpressure", ref_parameter)][ddi][("maindata", "spec")].values if transf.product == "column": target_datastore.loc[:, ("metadata", "airm")] = \ inout_datastore["inputs"][ ("airm", ref_parameter)][ddi][("maindata", "spec")].values target_datastore.loc[:, ("metadata", "hlay")] = \ inout_datastore["inputs"][( "hlay", ref_parameter)][ddi][("maindata", "spec")].values target_datastore.loc[:, ("metadata", "indorig")] = \ transf.metadata[ddi].loc[:, ("metadata", "indorig")].values target_datastore.loc[:, ("metadata", "date")] = \ transf.metadata[ddi].loc[:, ("metadata", "date")].values ddi = min(di, df) ref_indexes = ~transf.metadata[ddi].loc[ :, ("metadata", "indorig")].duplicated().values y0 = copy.deepcopy(target_datastore.loc[ref_indexes]) y0.index = y0.loc[:, ("metadata", "indorig")].values # Re-construct list of file_aks ref_mapper = mapper["outputs"][(ref_component, ref_parameter)] ref_tracer = ref_mapper["tracer"] files_aks = \ set(list(itertools.chain( *itertools.chain(*ref_tracer.input_files.values())))) files_aks = sorted(files_aks) # Use only files relevant for the present dataset ind_file = transf.metadata[ddi].loc[ref_indexes, ("metadata", "ind_file")] files_aks = np.array(files_aks)[ind_file.drop_duplicates().values.astype(int)] # Pass information from previous transforms if debug if save_debug: df_debug = pd.DataFrame({ c: inout_datastore["inputs"][trid][ddi][c].loc[ref_indexes] for trid in inout_datastore["inputs"] if trid[0] != "stratosphere" for c in inout_datastore["inputs"][trid][ddi].columns if c[0] not in y0.columns.get_level_values(level=0) }) y0 = pd.concat([y0, df_debug], axis=1) # Exit if empty observation datastore if target_datastore.size == 0: return # Init directory for saving forward datastore for later use by adjoint dir_fwd = ddi.strftime("{}/../chain/satellites/{}".format( runsubdir, transf.transform_id)) if not os.path.isdir(dir_fwd): init_dir(dir_fwd) # Check that only one satellite was specified in the datastore iq1 = transf.metadata[ddi].loc[ref_indexes, ("metadata", "station")] list_satIDs = iq1.unique() if len(list_satIDs) > 1: raise Exception( f"Warning! Several satellite IDs were specified in the monitor file: " f"{list_satIDs} \n " "This corresponds to the column 'station'. Please check your monitor file. " "It is possible to manually exclude IDs from the datastore by using the " "parameter 'exclude_stations' (list[str]) in the corresponding satellite " "paragraph of your data vector in the Yaml configuration file." ) # Deal with case when no ID was specified: assume that all data are concerned satID = list_satIDs[0] if type(satID) == str: pass elif ~pd.notnull(satID): satID = 0 iq1[:] = satID # Building the extended dataframe ds_p = target_datastore["metadata"].set_index("indorig")[ ["pressure", "dp"] + (transf.product == "column") * ["airm", "hlay"]] nlev_model = mapper["inputs"][("concs", ref_parameter)]["domain"].nlev satmask = iq1 == satID nobs = np.sum(satmask) # Stacking output datastore into levels * nobs native_ind_stack = ( np.flatnonzero(ref_indexes)[satmask] + np.arange(nlev_model)[:, np.newaxis] ) # If all nans in datasim, meaning that the species was not simulated sim = target_datastore.loc[ :, ("maindata", "spec")].values[native_ind_stack] if not np.any(pd.notnull(sim)): return # Grouping all data from this satellite datasim = xr.Dataset( { "pressure": ( ["level", "index"], np.log(ds_p["pressure"].values[native_ind_stack]), ), "dp": ( ["level", "index"], ds_p["dp"].values[native_ind_stack], ), "sim": (["level", "index"], sim), }, coords={ "index": np.arange(nobs), "level": np.arange(nlev_model), }, ) if transf.product == "column": datasim = datasim.assign( { "airm": ( ["level", "index"], ds_p["airm"].values[native_ind_stack], ), "hlay": ( ["level", "index"], ds_p["hlay"].values[native_ind_stack], ), } ) if mode == "tl": datasim["sim_tl"] = ( ["level", "index"], target_datastore.loc[:, ("maindata", "incr") ].values[native_ind_stack], ) # convert ppb fields to the correct unit # from ppb to molec.cm-2 if the satellite product is a column if transf.product == "column": keys = ["sim"] + (["sim_tl"] if mode == "tl" else []) factor = (datasim["hlay"] * 100) / (1e9 / datasim["airm"]) for k in keys: datasim[k] *= factor # alternative conversion # factor2 = 2.120*1e13*datasim["dp"]/100. # Crop aks depending on split_freq and original file ddf = ref_tracer.datef if hasattr(ref_tracer, "split_freq"): obsvect_dates = pd.date_range( ref_tracer.datei, ref_tracer.datef, freq=ref_tracer.split_freq) i0 = np.argwhere(obsvect_dates == ddi)[0][0] ddf = obsvect_dates[i0 + 1] # Check whether there is some ak info("Fetching satellite infos from files: {}".format(files_aks)) try: colsat = ["ak", "pavg0", "date", "index", "station", "duration"] if transf.use_prior: colsat += ["qa0"] if transf.use_drycols: colsat += ["dryair"] if transf.precomputed_pwgt: colsat += ["pw"] coord2dump = ["index"] all_sat_aks = [] for ind_file, file_aks in enumerate(files_aks): # Crop the monitor to replicate what is done # in pycif/plugins/obsvects/standard/utils/__init__.py ds_dates = xr.open_dataset(file_aks)[["date", "duration"]] crop_date = init_empty() crop_date[("metadata", "date")] = ds_dates["date"].values crop_date[("metadata", "duration")] = ds_dates["duration"].values crop_index = crop_monitor( crop_date, ref_tracer.datei, ref_tracer.datef, return_index=True ) if crop_index.size == 0: continue # Now extract only data relevant to present sub-period # This happens only with split_freq crop_date = crop_date.iloc[crop_index] if ref_tracer.datei != ddi or ref_tracer.datef != ddf: crop_index = crop_monitor( crop_date, ddi, ddf, return_index=True, keep_partial=True ) crop_index = crop_date.index[crop_index].values # Now read the data itself sat_aks = read_datastore( file_aks, col2dump=colsat, coord2dump=coord2dump, keep_default=False, to_pandas=False ) if 'index' in sat_aks: sat_aks = sat_aks.drop('index') # Check that all variables are float64 to avoid precision issues for v in sat_aks: if sat_aks.dtypes[v] == np.dtype('float32'): sat_aks[v] = sat_aks[v].astype('float64') # Crop to sub-date sat_aks["index"] = np.arange(sat_aks.dims["index"]) sat_aks = sat_aks.isel(index=crop_index) # Keep in memory the file name sat_aks = sat_aks.assign( file_aks=("index", sat_aks.dims["index"] * [ind_file])) if len(sat_aks) == 0: warning(f"WARNING: There is no data in file {file_aks}") continue if len(all_sat_aks) == 0: all_sat_aks = sat_aks else: all_sat_aks = xr.concat([all_sat_aks, sat_aks], "index") # # Crop the monitor to replicate what is done # # in pycif/plugins/obsvects/standard/utils/__init__.py # if hasattr(ref_tracer, "split_freq"): # crop_date = init_empty() # crop_date[("metadata", "date")] = all_sat_aks["date"].values # crop_date[ # ("metadata", "duration") # ] = all_sat_aks["duration"].values # ddf = ref_tracer.datef # obsvect_dates = pd.date_range( # ref_tracer.datei, ref_tracer.datef, # freq=ref_tracer.split_freq) # i0 = np.argwhere(obsvect_dates == ddi)[0][0] # ddf = obsvect_dates[i0 + 1] # crop_index = crop_monitor( # crop_date, ddi, ddf, # return_index=True, # keep_partial=True # ) # Reset index all_sat_aks["index"] = np.arange(all_sat_aks.dims["index"]) # all_sat_aks = all_sat_aks.isel(index=crop_index) # If no station was specified in the original file, just fill with 0 if "station" not in all_sat_aks: all_sat_aks["station"] = xr.full_like(all_sat_aks.index, satID) # Crop first observations for debugging purposes if specified in yml if getattr(ref_tracer, "crop_datastore", False): crop_istart = getattr(ref_tracer, "crop_istart", 0) all_sat_aks = all_sat_aks.isel( index=slice(crop_istart, crop_istart + ref_tracer.nobs2crop)) # Selecting only lines used in simulation mask = all_sat_aks["date"].isin( target_datastore['metadata']["date"].drop_duplicates() ) & (all_sat_aks["station"] == satID) sat_aks = all_sat_aks.loc[{"index": mask}] except IOError: # Assumes total columns? raise IOError("Could not fetch " "satellite info from {}".format(file_aks)) # datastore['qdp'] = datastore['sim'] * datastore['dp'] # groups = datastore.groupby(['indorig']) # y0.loc[:, 'sim'] += \ # groups['qdp'].sum().values / groups['dp'].sum().values # # if 'sim_tl' in datastore: # datastore['qdp'] = datastore['sim_tl'] * datastore['dp'] # groups = datastore.groupby(['indorig']) # y0.loc[:, 'sim_tl'] += \ # groups['qdp'].sum().values / groups['dp'].sum().values # continue aks = sat_aks["ak"][:, ::-1].T pavgs = sat_aks["pavg0"][:, ::-1].T if transf.pressure == "hPa": pavgs *= 100 # -- For level-based, given pressures are averages over the partial column (n) # -- For layer-based, given pressures are the ones at inter-levels (n + 1) level0_size = aks.level.size + 1 level1_size = aks.level.size if transf.level_based: level0_size -= 1 coords0 = {"index": np.arange(nobs), "level": np.arange(level0_size)} coords1 = {"index": np.arange(nobs), "level": np.arange(level1_size)} dims = ("level", "index") if transf.use_prior: qa0 = sat_aks["qa0"][:, ::-1].T else: qa0 = 0. * aks if transf.level_based: pavgs_mid = xr.DataArray( np.log(pavgs.values), coords0, dims).bfill("level") # Assuming goes from surface to the top of the atmosphere flip_pressure = False if not np.all(np.diff(pavgs.values, axis=0) > 0): pavgs.values = np.flip(pavgs.values, axis=0) flip_pressure = True # Computes dpavgs from top to bottom dpavgs = np.diff(pavgs.values, axis=0) dpavgs = np.concatenate([ np.maximum(0, pavgs.values[[0]] - dpavgs[[0]] / 2), pavgs.values[1:] - dpavgs / 2, pavgs.values[[-1]] + dpavgs[[-1]] / 2 ], axis=0) dpavgs = xr.DataArray(np.diff(dpavgs, axis=0), coords1, dims) # Flip back pressures if flip_pressure: pavgs.values = np.flip(pavgs.values, axis=0) dpavgs.values = np.flip(dpavgs.values, axis=0) else: try: pavgs = xr.DataArray(pavgs, coords0, dims).bfill("level") except: print(__file__) import code code.interact(local=dict(locals(), **globals())) dpavgs = np.abs(xr.DataArray(np.diff(-pavgs, axis=0), coords1, dims)) pavgs_mid = xr.DataArray( np.log(0.5 * (pavgs[:-1].values + pavgs[1:].values)), coords1, dims) # Exclude observations where there are all zeros in the simulation # exclude_zeros = np.where(sim.sum(axis=0) != 0)[0] exclude_zeros = np.where(pd.notnull(sim.sum(axis=0)))[0] if exclude_zeros.size == 0: warning( "All simulations are invalid. Skipping satellite computations for " f"{ref_component} / {ref_parameter} and period {ddi}. \n" f"This can be related to: \n" f" - all observations are outside the domain of simulation\n" f" - simulations are 0 for a reasonable cause?\n" f" - you have a bug somewhere upstream this transformation..." ) return datasim = datasim.isel(index=exclude_zeros) sat_aks = sat_aks.isel(index=exclude_zeros) aks = aks.isel(index=exclude_zeros) pavgs_mid = pavgs_mid.isel(index=exclude_zeros) dpavgs = dpavgs.isel(index=exclude_zeros) qa0 = qa0.isel(index=exclude_zeros) nobs = datasim.dims["index"] # Adding dry air mole fraction if formula 5 if transf.use_drycols: drycols = sat_aks["dryair"][:, ::-1].T else: drycols = qa0 * 0.0 + 1 # Pressure weight to apply on columns pwgt = dpavgs / dpavgs.sum(axis=0) if not transf.scale_dpressure: pwgt = 0 * pwgt + 1 elif transf.precomputed_pwgt: pwgt = sat_aks["pw"][:, ::-1].T # Fetch values if fill strato if transf.fill_strato: strato_mapper = mapper["inputs"][("stratosphere", ref_parameter)] strato_sigma_a = strato_mapper["domain"].sigma_a_mid strato_sigma_b = strato_mapper["domain"].sigma_b_mid strato_sigma_a_interface = strato_mapper["domain"].sigma_a strato_sigma_b_interface = strato_mapper["domain"].sigma_b strato_nlev = len(strato_sigma_a) # Expand debug_datastore with info # from previous transforms in strato if save_debug: trid_strato = ("stratosphere", ref_parameter) df_debug = pd.DataFrame({ c: inout_datastore["inputs"][trid_strato][ddi][c].iloc[::strato_nlev] for c in inout_datastore["inputs"][trid_strato][ddi].columns if c[0] not in y0.columns.get_level_values(level=0) }) y0 = pd.concat([y0, df_debug], axis=1) # Interpolating simulated values to averaging kernel pressures sim_ak = 0.0 * pavgs_mid sim_ak_tl = 0.0 * pavgs_mid if transf.split_tropo_strato: sim_ak_tropo = 0.0 * pavgs_mid sim_ak_tl_tropo = 0.0 * pavgs_mid sim_ak_strato = 0.0 * pavgs_mid sim_ak_tl_strato = 0.0 * pavgs_mid sim_pressure = datasim["pressure"].values sim_dpressure = datasim["dp"].values sim = datasim["sim"].values if mode == "tl": sim_tl = datasim["sim_tl"].values # Fetch missing values from stratosphere if transf.fill_strato: psurf_strato = np.exp(sim_pressure[0]) pstrato = np.log( strato_sigma_b * psurf_strato[:, np.newaxis] + strato_sigma_a).T pstrato_interface = ( strato_sigma_b_interface * psurf_strato[:, np.newaxis] + strato_sigma_a_interface ).T dpstrato = np.abs(np.diff(pstrato_interface, axis=0)) # Here merge sim_pressure # and pstrato properly, then apply vertical_interp nblevbasic = np.shape(sim_pressure)[0] cond = pstrato < sim_pressure[-1] missing_levels = np.argmax(cond, axis=0) missing_levels[~np.any(cond, axis=0)] = pstrato.shape[0] nblevremainingstrato = pstrato.shape[0] - missing_levels.min() sim_pressure_varying = np.full( (nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) sim_pressure_varying[:nblevbasic] = sim_pressure[:] sim_dpressure_varying = np.full( (nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) sim_dpressure_varying[:nblevbasic] = sim_dpressure[:] if ~np.all(np.any(cond, axis=0)): warning( "Stratosphere is used with strato pressure too high to complete the " "troposphere data set. Please check that your yml does the expected " "configuration." ) # Filling pressure levels in stratosphere lev_index_target = [ i for m in missing_levels for i in list(range(nblevbasic, nblevbasic + pstrato.shape[0] - m))] lev_index_orig = [i for m in missing_levels for i in list(range(m, pstrato.shape[0]))] obs_index_target = [ j for j, m in enumerate(missing_levels) for i in list(range(nblevbasic, nblevbasic + pstrato.shape[0] - m))] np.add.at(sim_pressure_varying, (lev_index_target, obs_index_target), pstrato[lev_index_orig, obs_index_target] ) sim_pressure = copy.deepcopy(sim_pressure_varying) # Filling dpressure values in stratosphere np.add.at(sim_dpressure_varying, (lev_index_target, obs_index_target), dpstrato[lev_index_orig, obs_index_target] ) sim_dpressure = copy.deepcopy(sim_dpressure_varying) # Stacking strato dataframe to column shape strato = inout_datastore["inputs"][ ("stratosphere", ref_parameter)][ddi] sim_strato_ref = \ strato["maindata"]["spec"].values.reshape( (strato_nlev, -1), order="F")[:, exclude_zeros] incr_strato = 0. * sim_strato_ref if "incr" in strato["maindata"]: incr_strato = \ strato["maindata"]["incr"].values.reshape( (strato_nlev, -1), order="F") # ppb to molec/cm2 if column product if transf.product == "column": dpstrato = \ np.diff(np.concatenate( [psurf_strato[np.newaxis, :], np.exp(pstrato)], axis=0), axis=0) / 100 # hPa G = 9.81 dmass = np.abs(dpstrato / G) # kg/m2 column = dmass * 1e3 # g/m2 column /= 28.96 * 1e4 # mol/cm2 column *= 6.02214076e23 # molec / cm2 column /= 1e9 # scaling from ppb sim_strato_ref *= column incr_strato *= column if transf.split_tropo_strato: debug("BEWARE, " "\"if\" not tested in case of varying missing levels") sim_tropo = np.full((nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) sim_tropo[:nblevbasic] = sim[:] sim_strato = np.full((nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) np.add.at( sim_strato, (lev_index_target, obs_index_target), sim_strato_ref[lev_index_orig, obs_index_target] ) if mode == "tl": sim_tropo_tl = np.full((nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) sim_tropo_tl[:nblevbasic] = sim_tl[:] sim_strato_tl = np.full((nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) np.add.at( sim_strato_tl, (lev_index_target, obs_index_target), incr_strato[lev_index_orig, obs_index_target] ) sim_varying = np.full((nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) sim_varying[:nblevbasic] = sim[:] np.add.at(sim_varying, (lev_index_target, obs_index_target), sim_strato_ref[lev_index_orig, obs_index_target] ) sim = copy.deepcopy(sim_varying) if mode == "tl": sim_tl_varying = np.full( (nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) sim_tl_varying[:nblevbasic, :] += sim_tl[:] np.add.at(sim_tl_varying, (lev_index_target, obs_index_target), incr_strato[lev_index_orig, obs_index_target] ) sim_tl = copy.deepcopy(sim_tl_varying) # Forward fill with last top value available mask = np.full((nblevbasic + nblevremainingstrato, len(datasim.index)), 0.) mask[:nblevbasic] = 1 np.add.at(mask, (lev_index_target, obs_index_target), np.ones(len(lev_index_target)) ) idx = np.where(mask == 1, np.arange(mask.shape[0])[:, np.newaxis], 0) np.maximum.accumulate(idx, axis=0, out=idx) sim = sim[idx, np.arange(nobs)[np.newaxis, :]] if mode == "tl": sim_tl = sim_tl[idx, np.arange(nobs)[np.newaxis, :]] # end fillstrato # Vertical interpolation xlow, xhigh, alphalow, alphahigh = vertical_interp( sim_pressure, sim_dpressure, pavgs_mid.values, dpavgs.values, transf.cropstrato, transf.vinterp_type, transf.weights_nsubsteps ) # Applying coefficients meshout_wgt = ( np.arange(pavgs_mid.shape[0])[:, np.newaxis] * np.ones((1, len(datasim.index))) ).astype(int) meshout_wgt_iobs = ( np.arange(len(datasim.index))[np.newaxis] * np.ones((pavgs_mid.shape[0], 1)) ).astype(int) if transf.vinterp_type == "weight": meshout_wgt = np.floor( np.linspace(0, pavgs_mid.shape[0], pavgs_mid.shape[0] * transf.weights_nsubsteps, endpoint=False )[:, np.newaxis] * np.ones((1, len(datasim.index))) ).astype(int) meshout_wgt_iobs = ( np.arange(len(datasim.index))[np.newaxis] * np.ones((pavgs_mid.shape[0] * transf.weights_nsubsteps, 1)) ).astype(int) np.add.at( sim_ak.values, (meshout_wgt, meshout_wgt_iobs), alphalow * sim[xlow, meshout_wgt_iobs] + alphahigh * sim[xhigh, meshout_wgt_iobs] ) if mode == "tl": np.add.at( sim_ak_tl.values, (meshout_wgt, meshout_wgt_iobs), alphalow * sim_tl[xlow, meshout_wgt_iobs] + alphahigh * sim_tl[xhigh, meshout_wgt_iobs] ) # Apply coefficient to "partial" columns if transf.split_tropo_strato: np.add.at( sim_ak_tropo.values, (meshout_wgt, meshout_wgt_iobs), alphalow * sim_tropo[xlow, meshout_wgt_iobs] + alphahigh * sim_tropo[xhigh, meshout_wgt_iobs] ) np.add.at( sim_ak_strato.values, (meshout_wgt, meshout_wgt_iobs), alphalow * sim_strato[xlow, meshout_wgt_iobs] + alphahigh * sim_strato[xhigh, meshout_wgt_iobs] ) if mode == "tl": np.add.at( sim_ak_tl_tropo.values, (meshout_wgt, meshout_wgt_iobs), alphalow * sim_tropo_tl[xlow, meshout_wgt_iobs] + alphahigh * sim_tropo_tl[xhigh, meshout_wgt_iobs] ) np.add.at( sim_ak_tl_strato.values, (meshout_wgt, meshout_wgt_iobs), alphalow * sim_strato_tl[xlow, meshout_wgt_iobs] + alphahigh * sim_strato_tl[xhigh, meshout_wgt_iobs] ) # Correction with the pressure thickness target_datastore[("metadata", "pthick")] = np.nan if transf.correct_pthick: if transf.product == "column": scale_pthick = (sim.sum(axis=0) / sim_ak.sum(axis=0)).values else: scale_pthick = ( (sim_dpressure * sim).sum(axis=0) / (dpavgs * sim_ak).sum(axis=0).values * dpavgs.sum(axis=0).values / sim_dpressure.sum(axis=0) ) sim_ak *= scale_pthick if 'sim_tl' in datasim: sim_ak_tl *= scale_pthick target_datastore.iloc[ np.flatnonzero(ref_indexes)[satmask][exclude_zeros], target_datastore.columns.get_loc(("metadata", "pthick")) ] = scale_pthick if transf.split_tropo_strato: sim_ak_tropo *= scale_pthick sim_ak_strato *= scale_pthick if 'sim_tl' in datasim: sim_ak_tl_tropo *= scale_pthick sim_ak_tl_strato *= scale_pthick # Applying aks nbformula = transf.formula chosenlevel = transf.chosenlev debug("product: {}".format(transf.product)) debug("nbformula: {}".format(nbformula)) debug("chosenlev: {}".format(chosenlevel)) out_index = satmask.iloc[exclude_zeros].index y0.loc[out_index, ("maindata", "spec")] = \ apply_ak( sim_ak.values, aks.values, pwgt.values, drycols.values, qa0.values, use_drycols=transf.use_drycols, chosen_level=chosenlevel, scale_factor=transf.unit_scaling, normalize_columns=transf.normalize_columns, log_space=transf.log_space ) if mode == "tl": y0.loc[out_index, ("maindata", "incr")] = \ apply_ak_tl( sim_ak_tl.values, sim_ak.values, aks.values, pwgt.values, drycols.values, qa0.values, use_drycols=transf.use_drycols, chosen_level=chosenlevel, scale_factor=transf.unit_scaling, normalize_columns=transf.normalize_columns, log_space=transf.log_space ) if transf.split_tropo_strato: y0.loc[satmask.iloc[exclude_zeros].index, (transf.transform_id, "tropo")] = \ apply_ak( sim_ak_tropo.values, aks.values, pwgt.values, drycols.values, qa0.values, use_drycols=transf.use_drycols, chosen_level=chosenlevel, scale_factor=transf.unit_scaling, normalize_columns=transf.normalize_columns, log_space=transf.log_space ) y0.loc[satmask.iloc[exclude_zeros].index, (transf.transform_id, "strato")] = \ apply_ak( sim_ak_strato.values, aks.values, pwgt.values, drycols.values, qa0.values, use_drycols=transf.use_drycols, chosen_level=chosenlevel, scale_factor=transf.unit_scaling, normalize_columns=transf.normalize_columns, log_space=transf.log_space ) if mode == "tl": y0.loc[satmask.iloc[exclude_zeros].index, (transf.transform_id, "tropo_tl")] = \ apply_ak_tl( sim_ak_tl_tropo.values, sim_ak.values, aks.values, pwgt.values, drycols.values, qa0.values, use_drycols=transf.use_drycols, chosen_level=chosenlevel, scale_factor=transf.unit_scaling, normalize_columns=transf.normalize_columns, log_space=transf.log_space ) y0.loc[satmask.iloc[exclude_zeros].index, (transf.transform_id, "strato_tl")] = \ apply_ak_tl( sim_ak_tl_strato.values, sim_ak.values, aks.values, pwgt.values, drycols.values, qa0.values, use_drycols=transf.use_drycols, chosen_level=chosenlevel, scale_factor=transf.unit_scaling, normalize_columns=transf.normalize_columns, log_space=transf.log_space ) # Saves sim_ak for later use by adjoint if transf.log_space or transf.force_dump_sim_aks: file_dump = ddi.strftime( "{}/sim_ak_{}_%Y%m%d%H%M.nc".format(dir_fwd, satID)) sim_ak.to_netcdf(file_dump) inout_datastore["outputs"][(ref_component, ref_parameter)][ddi] = y0 # Dump information about what monitor file was used with which observation infos = sat_aks["file_aks"].to_dataframe().reset_index().rename( columns={"index": "orig_index", "file_aks": "file_id"}) infos.loc[:, "file_aks"] = pd.DataFrame( {"files_aks": files_aks.flatten()}, dtype=files_aks.dtype, index=np.arange(len(files_aks), dtype=int) ).loc[infos["file_id"].values].values.flatten() del infos["file_id"] todump = pd.DataFrame(index=y0.index) todump = todump.reindex( columns=todump.columns.tolist() + [("metadata", "orig_index"), ("metadata", "file_aks")]) todump.loc[satmask.iloc[exclude_zeros].index, [("metadata", "orig_index"), ("metadata", "file_aks")]] = infos.values file_infos = ddi.strftime( "{}/infos_%Y%m%d%H%M.nc".format(dir_fwd) ) dump_datastore( todump, file_monit=file_infos, dump_default=False, col2dump=[("metadata", "orig_index"), ("metadata", "file_aks")], mode="w", ) # Save exclude_zeros in forward datastore mask_zeros = target_datastore[("metadata", "indorig")].isin(exclude_zeros) target_datastore[("metadata", "exclude_zeros")] = True target_datastore.loc[mask_zeros, ("metadata", "exclude_zeros")] = False # Save forward datastore for later use by adjoint file_monit = ddi.strftime( "{}/monit_%Y%m%d%H%M.nc" .format(dir_fwd) ) col2dump = ["pressure", "dp", "indorig", "hlay", "airm", "sim", "pthick", "exclude_zeros"] dump_datastore( target_datastore, file_monit=file_monit, dump_default=False, col2dump=col2dump, mode="w", )