Source code for pycif.plugins.models.wrfchem.wrfchem_helper

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 15:03:02 2019

VERSION HISTORY
2021-09-25   freum   Adapted from CTDAS-WRF wrfchem_helper.py
- Everything not (yet) needed is commented

@author: friedemann
"""

# Instructions for pylint:
# pylint: disable=too-many-instance-attributes
# pylint: disable=W0201
# pylint: disable=C0301
# pylint: disable=E1136
# pylint: disable=E1101


import os
import shutil
import re
import glob
import bisect
import copy
import numpy as np
import netCDF4 as nc
import datetime as dt
# import wrf # not needed yet - downgrades numpy on mac, creating pandas errors
import f90nml
import pickle

# CTDAS modules
# import da.tools.io4 as io
# from da.tools.wrfchem.utilities import utilities


[docs] class WRFChemHelper(object): """Contains helper functions for sampling WRF-Chem""" def __init__(self, settings): self.settings = settings # def validate_settings(self, needed_items=[]): # """ # This is based on WRFChemOO._validate_rc # """ # # if len(needed_items)==0: # return # # for key in needed_items: # if key not in self.settings: # msg = "Missing a required value in settings: %s" % key # raise IOError(msg)
[docs] @staticmethod def read_namelist(nml_file): """Read run settings from namelist file""" namelist = f90nml.read(nml_file) # Some values are lists when running with nested domains, but # not when running with only one domain. Ensure here those # values are always lists. vars2list = dict(domains = ["e_we", "e_sn", "parent_id", "parent_grid_ratio", "i_parent_start", "j_parent_start" ], time_control = ["history_interval", "auxinput5_interval_m"] ) for section, varnames in vars2list.items(): for var in varnames: if not isinstance(namelist[section][var], list): namelist[section][var] = [namelist[section][var]] return namelist
# @staticmethod # def get_pressure_boundaries_paxis(p_axis, p_surf): # """ # Arguments # --------- # p_axis (:class:`array-like`) # Pressure at mid points of layers # p_surf (:class:`numeric`) # Surface pressure # Output # ------ # Pressure at layer boundaries # """ # # pb = np.array([float("nan")]*(len(p_axis)+1)) # pb[0] = p_surf # # for nl in range(len(pb)-1): # pb[nl+1] = pb[nl] + 2*(p_axis[nl] - pb[nl]) # # return pb # # @staticmethod # def get_pressure_boundaries_znw(znw, p_surf, p_top): # """ # Arguments # --------- # ZNW (:class:`ndarray`) # Eta coordinates of z-staggered WRF grid. For each # observation (2D) # p_surf (:class:`ndarray`) # Surface pressure (1D) # p_top (:class:`ndarray`) # Top-of-atmosphere pressure (1D) # Output # ------ # Pressure at layer boundaries # # CAVEATS # ------- # Maybe I should rather use P_HYD? Well, Butler et al. 2018 # (https://www.geosci-model-dev-discuss.net/gmd-2018-342/) used # znu and surface pressure to compute "WRF midpoint layer # pressure". # # For WRF it would be more consistent to interpolate to levels. # See also comments in code. # """ # # return znw*(p_surf-p_top) + p_top # #
[docs] @staticmethod def get_int_coefs(pb_ret, pb_mod, level_def): """ Computes a coefficients matrix to transfer a model profile onto a retrieval pressure axis. If level_def=="layer_average", this assumes that profiles are constant in each layer of the retrieval, bound by the pressure boundaries pb_ret. In this case, the WRF model layer is treated in the same way, and coefficients integrate over the assumed constant model layers. This works with non-staggered WRF variables (on "theta" points). However, this is actually not how WRF is defined, and the implementation should be changed to z-staggered variables. Details for this change are in a comment at the beginning of the code. If level_def=="pressure_boundary" (IMPLEMENTATION IN PROGRESS), assumes that profiles, kernel and pwf are defined at pressure boundaries that don't have a thickness (this is how OCO-2 data are defined, for example). In this case, the coefficients linearly interpolate adjacent model level points. This is incompatible with the treatment of WRF in the above-described layer-average assumption, but is closer to how WRF is actually defined. The exception is that pb_mod is still constructed and non-staggered variables are not defined at psurf. This can only be fixed by switching to z-staggered variables. In cases where retrieval surface pressure is higher than model surface pressure, and in cases where retrieval top pressure is lower than model top pressure, the model profile will be extrapolated with constant tracer mixing ratios. In cases where retrieval surface pressure is lower than model surface pressure, and in cases where retrieval top pressure is higher than model top pressure, only the parts of the model column that fall within the retrieval presure boundaries are sampled. Arguments --------- pb_ret (:class:`array_like`) Pressure boundaries of the retrieval column pb_mod (:class:`array_like`) Pressure boundaries of the model column level_def (:class:`string`) "layer_average" or "pressure_boundary" (IMPLEMENTATION IN PROGRESS). Refers to the retrieval profile. Note 2021-09-13: Inspected code for pressure_boundary. Should be correct. Interpolates linearly between two model levels. Returns ------- coefs (:class:`array_like`) Integration coefficient matrix. Each row sums to 1. Usage ----- .. code-block:: python import numpy as np pb_ret = np.linspace(900., 50., 5) pb_mod = np.linspace(1013., 50., 7) model_profile = 1. - np.linspace(0., 1., len(pb_mod)-1)**3 coefs = get_int_coefs(pb_ret, pb_mod, "layer_average") retrieval_profile = np.matmul(coefs, model_profile) """ if level_def == "layer_average": # This code assumes that WRF variables are constant in # layers, but they are defined on levels. This can be seen # for example by asking wrf.interplevel for the value of a # variable that is defined on the mass grid ("theta points") # at a pressure slightly higher than the pressure on its # grid (wrf.getvar(ncf, "p")), it returns nan. So There is # no extrapolation. There are no layers. There are only # levels. # In addition, this page here: # https://www.openwfm.org/wiki/How_to_interpret_WRF_variables # says that to find values at theta-points of a variable # living on u-points, you interpolate linearly. That's the # other way around from what I would do if I want to go from # theta to staggered. # WRF4.0 user guide: # - ungrib can interpolate linearly in p or log p # - real.exe comes with an extrap_type namelist option, that # extrapolates constantly BELOW GROUND. # This would mean the correct way would be to integrate over # a piecewise-linear function. It also means that I really # want the value at surface level, so I'd need the CO2 # fields on the Z-staggered grid ("w-points")! Interpolate # the vertical in p with wrf.interp1d, example: # wrf.interp1d(np.array(rh.isel(south_north=1, west_east=0)), # np.array(p.isel(south_north=1, west_east=0)), # np.array(988, 970)) # (wrf.interp1d gives the same results as wrf.interplevel, # but the latter just doesn't want to work with single # columns (32,1,1), it wants a dim>1 in the horizontal # directions) # So basically, I can keep using pb_ret and pb_mod, but it # would be more accurate to do the piecewise-linear # interpolation and the output matrix will have 1 more # value in each dimension. # Calculate integration weights by weighting with layer # thickness. This assumes that both axes are ordered # psurf to ptop. coefs = np.ndarray(shape=(len(pb_ret)-1, len(pb_mod)-1)) coefs[:] = 0. # Extend the model pressure grid if retrieval encompasses # more. pb_mod_tmp = copy.deepcopy(pb_mod) # In case the retrieval pressure is higher than the model # surface pressure, extend the lowest model layer. if pb_mod_tmp[0] < pb_ret[0]: pb_mod_tmp[0] = pb_ret[0] # In case the model doesn't extend as far as the retrieval, # extend the upper model layer upwards. if pb_mod_tmp[-1] > pb_ret[-1]: pb_mod_tmp[-1] = pb_ret[-1] # For each retrieval layer, this loop computes which # proportion falls into each model layer. for nret in range(len(pb_ret)-1): # 1st model pressure boundary index = the one before the # first boundary with lower pressure than high-pressure # retrieval layer boundary. model_lower = pb_mod_tmp < pb_ret[nret] id_model_lower = model_lower.nonzero()[0] id_min = id_model_lower[0]-1 # Last model pressure boundary index = the last one with # higher pressure than low-pressure retrieval layer # boundary. model_higher = pb_mod_tmp > pb_ret[nret+1] id_model_higher = model_higher.nonzero()[0] if len(id_model_higher) == 0: #id_max = id_min raise ValueError("This shouldn't happen. Debug.") else: id_max = id_model_higher[-1] # By the way, in case there is no model level with # higher pressure than the next retrieval level, # id_max must be the same as id_min. # For each model layer, find out how much of it makes up this # retrieval layer for nmod in range(id_min, id_max+1): if (nmod == id_min) & (nmod != id_max): # Part of 1st model layer that falls within # retrieval layer coefs[nret, nmod] = pb_ret[nret] - pb_mod_tmp[nmod+1] elif (nmod != id_min) & (nmod == id_max): # Part of last model layer that falls within # retrieval layer coefs[nret, nmod] = pb_mod_tmp[nmod] - pb_ret[nret+1] elif (nmod == id_min) & (nmod == id_max): # id_min = id_max, i.e. model layer encompasses # retrieval layer coefs[nret, nmod] = pb_ret[nret] - pb_ret[nret+1] else: # Retrieval layer encompasses model layer coefs[nret, nmod] = pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1] coefs[nret, :] = coefs[nret, :]/sum(coefs[nret, :]) # I tested the code with many cases, but I'm only 99.9% sure # it works for all input. Hence a test here that the # coefficients sum to 1 and dump the data if not. sum_ = np.abs(coefs.sum(1) - 1) if np.any(sum_ > 2.*np.finfo(sum_.dtype).eps): dump = dict(pb_ret=pb_ret, pb_mod=pb_mod, level_def=level_def) fp = "int_coefs_dump.pkl" with open(fp, "w") as f: pickle.dump(dump, f, 0) msg_fmt = "Something doesn't sum to 1. Arguments dumped to: %s" raise ValueError(msg_fmt % fp) elif level_def=="pressure_boundary": #msg = "level_def is pressure_boundary. Implementation not complete." ##logging.error(msg) #raise ValueError(msg) # Note 2021-09-13: Inspected the code. Should be correct. # Go back to pressure midpoints for model... # Change this line to p_mod = pb_mod for z-staggered # variables p_mod = pb_mod[1:] - 0.5*np.diff(pb_mod) coefs = np.ndarray(shape=(len(pb_ret), len(pb_mod)-1)) coefs[:] = 0. # For each retrieval pressure level, compute linear # interpolation coefficients for nret in range(len(pb_ret)): nmod_list = (p_mod < pb_ret[nret]).nonzero()[0] if(len(nmod_list)>0): nmod = nmod_list[0] - 1 if nmod==-1: # Constant extrapolation at surface nmod = 0 coef = 1. else: # Normal case: coef = (pb_ret[nret]-p_mod[nmod+1])/(p_mod[nmod]-p_mod[nmod+1]) else: # Constant extrapolation at atmosphere top nmod = len(p_mod)-2 coef=0. coefs[nret, nmod] = coef coefs[nret, nmod+1] = 1.-coef elif level_def=="linear_average_pb": # pb_mod defined on levels and varying linearly in between # pb_ret defined on boundaries that define layers. # Coefficients are averages (integral/width) over piecewise # linear pb_mod # Constant extrapolation at model bottom and top: # Add an extra level, and after computing coefficients, # add the value of the extra level to the original bottom/ # top and delete the extra level pb_mod_tmp = copy.deepcopy(pb_mod) if pb_mod[0] < pb_ret[0]: pb_mod_tmp = np.append(pb_ret[0], np.array(pb_mod_tmp)) if pb_mod[-1] > pb_ret[-1]: pb_mod_tmp = np.append(np.array(pb_mod_tmp), pb_ret[-1]) coefs = np.ndarray(shape=(len(pb_ret)-1, len(pb_mod_tmp))) coefs[:] = 0. for nret in range(len(pb_ret)-1): # Special case: zero-thickness layer. # This is for the misuse of this procedure for temporal # interpolation, specifically the case of instantaneous # measurements. if pb_ret[nret] == pb_ret[nret+1]: nmod = [n for n in range(len(pb_mod_tmp)) if pb_mod_tmp[n]>=pb_ret[nret]>pb_mod_tmp[n+1]] if len(nmod)!=1: raise ValueError("This shouldn't happen.") nmod = nmod[0] coefs[nret, nmod] += (pb_ret[nret] - pb_mod_tmp[nmod+1]) / (pb_mod_tmp[nmod]-pb_mod_tmp[nmod+1]) coefs[nret, nmod+1] += (pb_mod_tmp[nmod] - pb_ret[nret]) / (pb_mod_tmp[nmod]-pb_mod_tmp[nmod+1]) continue # 1st model pressure boundary index = the one before the # first boundary with lower pressure than high-pressure # retrieval layer boundary. model_lower = pb_mod_tmp < pb_ret[nret] id_model_lower = model_lower.nonzero()[0] id_min = id_model_lower[0]-1 # Last model pressure boundary index = the last one with # higher pressure than low-pressure retrieval layer # boundary. model_higher = pb_mod_tmp > pb_ret[nret+1] id_model_higher = model_higher.nonzero()[0] if len(id_model_higher) == 0: #id_max = id_min raise ValueError("This shouldn't happen. Debug.") else: id_max = id_model_higher[-1] # For each model layer, add to piecewise linear integration coefficients for nmod in range(id_min, id_max+1): if (pb_ret[nret]<=pb_mod_tmp[nmod]) & (pb_ret[nret+1]<=pb_mod_tmp[nmod+1]): # A retrieval layer start falls on nth model level or between nth and (n+1)th, # while it ends at or above the next model level coefs[nret, nmod] += (pb_ret[nret] - pb_mod_tmp[nmod+1])**2 / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])) coefs[nret, nmod+1] += (pb_ret[nret] - pb_mod_tmp[nmod+1])*(2*pb_mod_tmp[nmod] - pb_ret[nret] - pb_mod_tmp[nmod+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])) elif (pb_ret[nret]>pb_mod_tmp[nmod]) & (pb_ret[nret+1]>=pb_mod_tmp[nmod+1]): # Retrieval layer starts below nth model layer, ends at or below (n+1)th model layer coefs[nret, nmod] += (pb_mod_tmp[nmod] - pb_ret[nret+1])*(pb_mod_tmp[nmod] + pb_ret[nret+1] - 2*pb_mod_tmp[nmod+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])) coefs[nret, nmod+1] += (pb_mod_tmp[nmod] - pb_ret[nret+1])**2 / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])) elif (pb_ret[nret]<=pb_mod_tmp[nmod]) & (pb_ret[nret+1]>pb_mod_tmp[nmod+1]): # Model layer encompasses retrieval layer coefs[nret, nmod] += (pb_ret[nret] - pb_ret[nret+1])*(pb_ret[nret] + pb_ret[nret+1] - 2*pb_mod_tmp[nmod+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])) coefs[nret, nmod+1] += (pb_ret[nret] - pb_ret[nret+1])*(2*pb_mod_tmp[nmod] - pb_ret[nret] - pb_ret[nret+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])) else: # Retrieval layer encompasses model layer coefs[nret, nmod] += (pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])/2 coefs[nret, nmod+1] += (pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])/2 # Convert integral to average coefs[nret, :] = coefs[nret, :]/(pb_ret[nret]-pb_ret[nret+1]) # Constant extrapolation: # Add the value of the temporary extra level to the # bottom/top and remove extra level if pb_mod[0] < pb_ret[0]: coefs[:, 1] += coefs[:, 0] coefs = coefs[:, 1:] if pb_mod[-1] > pb_ret[-1]: coefs[:, -2] += coefs[:, -1] coefs = coefs[:, :-1] else: msg = "Unknown level_def: " + level_def raise ValueError(msg) return coefs
# # @staticmethod # def get_pressure_weighting_function(pressure_boundaries, rule): # """ # Compute pressure weighting function according to 'rule'. # Valid rules are: # - simple (=layer thickness) # - connor2008 (not implemented) # """ # if rule == 'simple': # pwf = np.abs(np.diff(pressure_boundaries)/np.ptp(pressure_boundaries)) # else: # raise NotImplementedError("Rule %s not implemented" % rule) # # return pwf # # def locate_domain(self, lat, lon): # """ # Input # ----- # lat: Single values or lists or np.arrays # lon: Single values or lists or np.arrays # # Output # ------ # - xy-coordinates in finest domain that contains the coordinates # - finest domain # # NOTE # ---- # self.open_wrf_location_files() must be run first # """ # # if not hasattr(self, "_loc_files"): # raise RuntimeError("Must call open_wrf_loc_files first.") # # # Work with arrays internally. # lat = np.array(lat, ndmin=1) # lon = np.array(lon, ndmin=1) # # # Get coordinates of the observation in all domains # ndomains = self.namelist["domains"]["max_dom"] # # # Get domain sizes in xy on mass (=unstaggered) grid (hence the -1) # dom_size_x = np.array(self.namelist['domains']['e_we'], dtype=float, ndmin=1) - 1 # dom_size_y = np.array(self.namelist['domains']['e_sn'], dtype=float, ndmin=1) - 1 # # # Initialize output # xy = np.zeros((len(lat), 2)) # xy[:] = np.nan # finest_domain = np.zeros((len(lat), ), int) # # # Since wrf.ll_to_xy takes very long, I save a bit of time here # # by starting in the finest domain and processing only # # observations that weren't previously found. # for n in range(ndomains-1, -1, -1): # # Get xy for this domain # # Only process what you haven't processed # sel = np.where(finest_domain == 0)[0] # # # In case all domains where set # if len(sel)==0: # break # # x, y = wrf.ll_to_xy(wrfin=self._loc_files[n], # latitude=lat[sel], # longitude=lon[sel], # meta=False, # as_int=False) # # # # For each domain, check if the observation is inside the # # domain extent # # I put the edges on -0.5 and e_we/e_sn - 0.5. # # Reason is that wrf.ll_to_xy(..., as_int = True) returns # # -0.2 as 0, so it's inside the domain. # # Also, for e_we=99, e_sn = 92, ref_lon=9, ref_lat=51.5: # # wrf.ll_to_xy(wrfin=self._loc_files[0], latitude=51.5, longitude=9., meta=False, as_int=False) # # array([48.49996124, 45.00003737]) # # This means that int indices denote exactly the mass-staggered point. # # # To be able to iterate over x and y in case they're scalars: # x = np.array(x, ndmin=1) # y = np.array(y, ndmin=1) # # # Test: inside domain? # # The -1 here are because of 0-based indices, and the -/+ 0.5 are # # because that's halfway to the next mass-staggered point and # # I treat the WRF grid as boxes. # x_in = [(-0.5 <= x_) and (x_ <= dom_size_x[n] - 1 + 0.5) for x_ in x] # y_in = [(-0.5 <= y_) and (y_ <= dom_size_y[n] - 1 + 0.5) for y_ in y] # # # Save domain, x and y at these locations # in_this_dom = np.where(np.logical_and(x_in, y_in))[0] # finest_domain[sel[in_this_dom]] = n + 1 # # If domain = 1, set _all_ xy to see where they end up # if n==0: # xy[sel, 0] = x # xy[sel, 1] = y # else: # xy[sel[in_this_dom], 0] = x[in_this_dom] # xy[sel[in_this_dom], 1] = y[in_this_dom] # # # # Return the indices and domain # return xy, finest_domain # # def get_groups_space_time(self, dat, time_bins, only_in=False): # """ # Returns a dictionary of lists of indices of observations in dat, # where keys are tuples of (time bin, x bin, y bin and wrf # domain), and values are indices of the observations that fall # within this bin. # # Arguments # --------- # # dat : list # List containing measurement locations: # date (same type as time_bins), latitude, longitude # time_bins : list # List of boundaries of time bins (I use datetime.datetime # objects) # only_in : boolean # If only_in is True, groups that fall outside the time_bins # and domain are not returned. # # """ # # # Time groups # id_t = np.array([bisect.bisect_right(time_bins, dat_date) # for dat_date in dat['date']], # int) # # # Spatial groups (indices and domain) # id_xy_f, dom = self.locate_domain(dat['latitude'], dat['longitude']) # # id_xy = np.round(id_xy_f).astype(int) # # # # # Version of only_in with sel: might be faster if sel is short - # # I don't know! But the results for my test case where identical # # (meaning 'domain' looked correct and identical) for both # # options for only_in. # # # if only_in: # # # Yes, 0<id_t is correct: in bisect_right, it means the # # # value is below the lowest sequence value. # # time_in = np.logical_and(0<id_t, id_t<len(time_bins)) # # space_in = dom != 0 # # sel = np.where(np.logical_and(time_in, space_in))[0] # # # # else: # # sel = range(len(id_t)) # # # # indices = get_index_groups(id_t[sel], id_xy[sel, 0], id_xy[sel, 1], dom[sel]) # # # # # Now have to account for sel again! # # if only_in: # # for k, v in indices.iteritems(): # # indices[k] = sel[v] # # # Version of only_in without sel: might be faster if sel is # # long - I don't know! But the results for my test case where # # identical (meaning 'domain' looked correct and identical) # # # Indices for all groups: # indices = utilities.get_index_groups(id_t, id_xy[:, 0], id_xy[:, 1], dom) # # # Throw the out-of-domain ones out here already # if only_in: # # Remove the index groups where domain is 0 (= outside of # # domain). Need to iterate over a list, because with an # # iterator, python complains that the dictionary changed # # size during iteration. # for k in list(indices.keys()): # if k[3] == 0: # del indices[k] # # Equivalent to the above 3 lines, don't know what's faster: # # groups = {k: v for k, v in groups.iteritems() if k[3] != 0} # # return indices # #
[docs] @staticmethod def times_in_wrf_file(filename): """Returns the times in netcdf file as datetime object""" ncf = nc.Dataset(filename) times_nc = ncf.variables["Times"][:] ncf.close() # Hope the utf-8 always works... times_chr = [str(times_nc[nt, :], "utf-8") for nt in range(times_nc.shape[0])] times_dtm = [dt.datetime.strptime(t_chr, "%Y-%m-%d_%H:%M:%S") for t_chr in times_chr] return times_dtm
[docs] def wrf_times(self, file_list): """Read all times in a list of wrf files Output ------ - 1D-array containing all times - 1D-array containing start indices of each file """ times = list() start_indices = np.ndarray((len(file_list), ), int) for nf in range(len(file_list)): # ncf = nc.Dataset(file_list[nf]) times_this = self.times_in_wrf_file(file_list[nf]) start_indices[nf] = len(times) times += times_this # ncf.close() return times, start_indices
# # def open_wrf_location_files(self): # """ # Keep a description of a small wrf file for each domain in memory # to locate observations. # Appends _loc_file to self. # # Note: Should be edited out of the code. # """ # # ndomains = self.namelist["domains"]["max_dom"] # path = self.settings["run_dir"] # pattern = "wrfinput_d%02d" # self._loc_files = list() # for nd in range(1, ndomains+1): # fp = os.path.join(path, pattern % nd) # self._loc_files.append(nc.Dataset(fp, "r")) # # def close_wrf_location_files(self): # """See _open_wrf_location_files""" # for loc_file in self._loc_files: # loc_file.close() # # def wrf_filename_times(self, prefix): # """Get timestamps in wrf file names in run directory.""" # # # List all filenames # files = self.get_wrf_filenames(prefix + "*") # # Only use d01 files, pattern should be the same for all domains # pattern = os.path.join(self.settings["run_dir"], prefix) # files = [f for f in files if re.search(pattern, f)] # # Extract timestamp from filename # # Format is %Y-%m-%d_%H:%M:%S at the end of the filename # pattern_time = "%Y-%m-%d_%H:%M:%S" # len_tstamp = len(pattern_time) + 2 # times = [dt.datetime.strptime(f[-len_tstamp:], pattern_time) # for f in files] # # return times # # def get_wrf_filenames(self, glob_pattern): # """ # Gets the filenames in self.settings["run_dir"] that follow # glob_pattern, excluding those that end with # self.settings["original_save_suffix"] # """ # path = self.settings["run_dir"] # # All files... # wfiles = glob.glob(os.path.join(path, glob_pattern)) # # All originals # orig_suf = self.settings["original_save_suffix"] # opattern = glob_pattern + orig_suf # ofiles = glob.glob(os.path.join(path, opattern)) # # # All files except all originals: # files = [x for x in wfiles if x not in ofiles] # # # I need this sorted too often to not do it here. # files = np.sort(files).tolist() # return files # # # def sample_total_columns(self, dat, loc, fields_list): # """ # Sample total_columns of fields_list in WRF output in # self.settings["run_dir"] at the location id_xy in domain, id_t # in all wrfout-times. Files and indices therein are recognized # by id_t and file_time_start_indices. # All quantities needed for computing total columns from profiles # are in dat (kernel, prior, ...). # # Arguments # --------- # dat (:class:`list`) # Result of wrfhelper.read_sampling_coords. Used here: prior, # prior_profile, kernel, psurf, pressure_axis, [, pwf] # If psurf or any of pressure_axis are nan, wrf's own # surface pressure is used and pressure_axis constructed # from this and the number of levels in the averaging kernel. # This allows sampling with synthetic data that don't have # pressure information. This only works with level_def # "layer_average". # If pwf is not present or nan, a simple one is created, for # level_def "layer_average". # loc (:class:`dict`) # A dictionary with all location-related input for sampling, # computed in wrfout_sampler. Keys: # id_xy, domain: Domain coordinates # id_t: Timestep (continous throughout all files) # frac_t: Interpolation coeficient between id_t and id_t+1: # t_obs = frac_t*t[id_t] + (1-frac_t)*t[id_t+1]) # file_start_time_indices: Time index at which a new wrfout # file starts # files: names of wrfout files. # fields_list (:class:`list`) # The fields to sample total columns from. # # Output # ------ # sampled_columns (:class:`array`) # A 2D-array of sampled columns. # Shape: (len(dat["prior"]), len(fields_list)) # """ # # # Initialize output # tc = np.ndarray(shape=(len(dat["prior"]), len(fields_list)), dtype=float) # tc[:] = float("nan") # # # Process by domain # UD = list(set(loc["domain"])) # for dom in UD: # idd = np.nonzero(loc["domain"] == dom)[0] # # Process by id_t # UT = list(set(loc["id_t"][idd])) # for time_id in UT: # # Coordinates to process # idt = idd[np.nonzero(loc["id_t"][idd] == time_id)[0]] # # Get tracer ensemble profiles # profiles = self._read_and_intrp_v(loc, fields_list, time_id, idt) # # List, len=len(fields_list), shape of each: (len(idt),nz) # # Get pressure axis: # #paxis = self.read_and_intrp(wh_names, id_ts, frac_t, id_xy, "P_HYD")/1e2 # Pa -> hPa # psurf = self._read_and_intrp_v(loc, ["PSFC"], time_id, idt)[0]/1.e2 # Pa -> hPa # # Shape: (len(idt),) # ptop = float(self.namelist["domains"]["p_top_requested"])/1.e2 # # Shape: (len(idt),) # znw = self._read_and_intrp_v(loc, ["ZNW"], time_id, idt)[0] # #Shape:(len(idt),nz) # # # DONE reading from file. # # Here it starts to make sense to loop over individual observations # for nidt in range(len(idt)): # nobs = idt[nidt] # # Construct model pressure layer boundaries # pb_mod = self.get_pressure_boundaries_znw(znw[nidt, :], psurf[nidt], ptop) # # if (np.diff(pb_mod) >= 0).any(): # msg = ("Model pressure boundaries for observation %d " + \ # "are not monotonically decreasing! Investigate.") % nobs # raise ValueError(msg) # # # Construct retrieval pressure layer boundaries # if dat["level_def"][nobs] == "layer_average": # if np.any(np.isnan(dat["pressure_levels"][nobs])) \ # or np.isnan(dat["psurf"][nobs]): # # Code for synthetic data without a pressure axis, # # but with an averaging kernel: # # Use wrf's surface and top pressure # nlayers = len(dat["averaging_kernel"][nobs]) # pb_ret = np.linspace(psurf[nidt], ptop, nlayers+1) # else: # pb_ret = self.get_pressure_boundaries_paxis( # dat["pressure_levels"][nobs], # dat["psurf"][nobs]) # elif dat["level_def"][nobs] == "pressure_boundary": # if np.any(np.isnan(dat["pressure_levels"][nobs])): # # Code for synthetic data without a pressure axis, # # but with an averaging kernel: # # Use wrf's surface and top pressure # nlevels = len(dat["averaging_kernel"][nobs]) # pb_ret = np.linspace(psurf[nidt], ptop, nlevels) # else: # pb_ret = dat["pressure_levels"][nobs] # # if (np.diff(pb_ret) >= 0).any(): # msg = ("Retrieval pressure boundaries for " + \ # "observation %d are not monotonically " + \ # "decreasing! Investigate.") % nobs # raise ValueError(msg) # # # Get vertical integration coefficients (i.e. to # # "interpolate" from model to retrieval grid) # coef_matrix = self.get_int_coefs(pb_ret, pb_mod, dat["level_def"][nobs]) # # # Model retrieval with averaging kernel and prior profile # if "pressure_weighting_function" in list(dat.keys()): # pwf = dat["pressure_weighting_function"][nobs] # if (not "pressure_weighting_function" in list(dat.keys())) or np.any(np.isnan(pwf)): # # Construct pressure weighting function from # # pressure boundaries # pwf = self.get_pressure_weighting_function(pb_ret, rule="simple") # # # Compute pressure-weighted averaging kernel # avpw = pwf*dat["averaging_kernel"][nobs] # # # Get prior # prior_col = dat["prior"][nobs] # prior_profile = dat["prior_profile"][nobs] # if np.isnan(prior_col): # compute prior # prior_col = np.dot(pwf, prior_profile) # # # Compute total columns # for nf in range(len(fields_list)): # # Integrate model profile # profile_intrp = np.matmul(coef_matrix, profiles[nf][nidt, :]) # # # Model retrieval # tc[nobs, nf] = prior_col + np.dot(avpw, profile_intrp - prior_profile) # # # Test phase: save pb_ret, pb_mod, coef_matrix, # # one profile for manual checking # # # dat_save = dict(pb_ret=pb_ret, # # pb_mod=pb_mod, # # coef_matrix=coef_matrix, # # ens_profile=ens_profiles[0], # # profile_intrp=profile_intrp, # # id=dat.id) # # # #out = open("model_profile_%d.pkl"%dat.id, "w") # #cPickle.dump(dat_save, out, 0) # # Average over footprint # if self.settings["footprint_samples_dim"] > 1: # indices = utilities.get_index_groups(dat["sounding_id"]) # # # Make sure that this is correct: i know the number of indices # lens = [len(group) for group in list(indices.values())] # correct_len = self.settings["footprint_samples_dim"]**2 # if np.any([len_ != correct_len for len_ in set(lens)]): # raise ValueError("Not all footprints have %d samples" %correct_len) # # Ok, paranoid mode, also confirm that the indices are what I # # think they are: consecutive numbers # ranges = [np.ptp(group) for group in list(indices.values())] # if np.any([ptp != correct_len for ptp in set(ranges)]): # raise ValueError("Not all footprints have consecutive samples") # # tc_original = copy.deepcopy(tc) # tc = utilities.apply_by_group(np.average, tc_original, indices) # # return tc # # @staticmethod # def _read_and_intrp_v(loc, fields_list, time_id, idp): # """ # Helper function for sample_total_columns. # read_and_intrp, but vectorized. # Reads in fields and interpolates # them linearly in time. # # Arguments # ---------- # loc (:class:`dict`) # Passed through from sample_total_columns, see there. # fields_list (:class:`list` of :class:`str`) # List of netcdf-variables to process. # time_id (:class:`int`) # Time index referring to all files in loc to read # idp (:class:`array` of :class:`int`) # Indices for id_xy, domain and frac_t in loc (i.e. # observations) to process. # # Output # ------ # List of temporally interpolated fields, one entry per member of # fields_list. # """ # # var_intrp_l = list() # # # Check we were really called with observations for just one domain # domains = set(loc["domain"][idp]) # if len(domains) > 1: # raise ValueError("I can only operate on idp with identical domains.") # dom = domains.pop() # # # Select input files # id_file0 = bisect.bisect_right(loc["file_start_time_indices"][dom], time_id) - 1 # id_file1 = bisect.bisect_right(loc["file_start_time_indices"][dom], time_id+1) - 1 # if id_file0 < 0 or id_file1 < 0: # raise ValueError("This shouldn't happen.") # # # Get time id in file # id_t_file0 = time_id - loc["file_start_time_indices"][dom][id_file0] # id_t_file1 = time_id+1 - loc["file_start_time_indices"][dom][id_file1] # # # Open files # nc0 = nc.Dataset(loc["files"][dom][id_file0], "r") # nc1 = nc.Dataset(loc["files"][dom][id_file1], "r") # # Per field to sample # for field in fields_list: # # Read input file # field0 = wrf.getvar(wrfin=nc0, # varname=field, # timeidx=id_t_file0, # squeeze=False, # meta=False) # # field1 = wrf.getvar(wrfin=nc1, # varname=field, # timeidx=id_t_file1, # squeeze=False, # meta=False) # # if len(field0.shape) == 4: # # Sample field at timesteps before and after observation # # They are ordered nt x nz x ny x nx # # var0 will have shape (len(idp),len(profile)) # var0 = field0[0, :, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]] # var1 = field1[0, :, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]] # # Repeat frac_t for profile size # frac_t_ = np.array(loc["frac_t"][idp]).reshape((len(idp), 1)).repeat(var0.shape[1], 1) # elif len(field0.shape) == 3: # # var0 will have shape (len(idp),) # var0 = field0[0, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]] # var1 = field1[0, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]] # frac_t_ = np.array(loc["frac_t"][idp]) # elif len(field0.shape) == 2: # # var0 will have shape (len(idp),len(profile)) # # This is for ZNW, which is saved as (time_coordinate, # # vertical_coordinate) # var0 = field0[[0]*len(idp), :] # var1 = field1[[0]*len(idp), :] # frac_t_ = np.array(loc["frac_t"][idp]).reshape((len(idp), 1)).repeat(var0.shape[1], 1) # else: # raise ValueError("Can't deal with field with %d dimensions." % len(field0.shape)) # # # Interpolate in time # var_intrp_l.append(var0*frac_t_ + var1*(1. - frac_t_)) # # nc0.close() # nc1.close() # # return var_intrp_l # # @staticmethod # def read_sampling_coords(sampling_coords_file, id0=None, id1=None): # """Read in samples""" # # ncf = nc.Dataset(sampling_coords_file, "r") # if id0 is None: # id0 = 0 # if id1 is None: # id1 = len(ncf.dimensions['soundings']) # # dat = dict( # sounding_id=np.array(ncf.variables["sounding_id"][id0:id1]), # date=ncf.variables["date"][id0:id1], # latitude=np.array(ncf.variables["latitude"][id0:id1]), # longitude=np.array(ncf.variables["longitude"][id0:id1]), # latc_0=np.array(ncf.variables["latc_0"][id0:id1]), # latc_1=np.array(ncf.variables["latc_1"][id0:id1]), # latc_2=np.array(ncf.variables["latc_2"][id0:id1]), # latc_3=np.array(ncf.variables["latc_3"][id0:id1]), # lonc_0=np.array(ncf.variables["lonc_0"][id0:id1]), # lonc_1=np.array(ncf.variables["lonc_1"][id0:id1]), # lonc_2=np.array(ncf.variables["lonc_2"][id0:id1]), # lonc_3=np.array(ncf.variables["lonc_3"][id0:id1]), # prior=np.array(ncf.variables["prior"][id0:id1]), # prior_profile=np.array(ncf.variables["prior_profile"][id0:id1,]), # averaging_kernel=np.array(ncf.variables["averaging_kernel"][id0:id1]), # pressure_levels=np.array(ncf.variables["pressure_levels"][id0:id1]), # pressure_weighting_function=np.array(ncf.variables["pressure_weighting_function"][id0:id1]), # level_def=ncf.variables["level_def"][id0:id1], # psurf=np.array(ncf.variables["psurf"][id0:id1]) # ) # # ncf.close() # # # Convert level_def from it's weird nc format to string # dat["level_def"] = nc.chartostring(dat["level_def"]) # # # Convert date to datetime object # dat["time"] = [dt.datetime(*x) for x in dat["date"]] # # return dat # # @staticmethod # def write_simulated_columns(obs_id, simulated, nmembers, outfile): # """Write simulated observations to file.""" # # # Output format: see obs_xco2_fr # # f = io.CT_CDF(outfile, method="create") # # dimid = f.createDimension("sounding_id", size=None) # dimid = ("sounding_id",) # savedict = io.std_savedict.copy() # savedict["name"] = "sounding_id" # savedict["dtype"] = "int64" # savedict["long_name"] = "Unique_Dataset_observation_index_number" # savedict["units"] = "" # savedict["dims"] = dimid # savedict["comment"] = "Format as in input" # savedict["values"] = obs_id.tolist() # f.add_data(savedict, nsets=0) # # dimmember = f.createDimension("nmembers", size=nmembers) # dimmember = ("nmembers",) # savedict = io.std_savedict.copy() # savedict["name"] = "column_modeled" # savedict["dtype"] = "float" # savedict["long_name"] = "Simulated total column" # savedict["units"] = "??" # savedict["dims"] = dimid + dimmember # savedict["comment"] = "Simulated model value created by WRFChemOO" # savedict["values"] = simulated.tolist() # f.add_data(savedict, nsets=0) # # f.close() # # @staticmethod # def save_file_with_timestamp(file_path, out_dir, suffix=""): # """ Saves a file to with a timestamp""" # nowstamp = dt.datetime.now().strftime("_%Y-%m-%d_%H:%M:%S") # new_name = os.path.basename(file_path) + suffix + nowstamp # new_path = os.path.join(out_dir, new_name) # shutil.copy2(file_path, new_path) if __name__ == "__main__": pass