#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 15:03:02 2019
VERSION HISTORY
2021-09-25 freum Adapted from CTDAS-WRF wrfchem_helper.py
- Everything not (yet) needed is commented
@author: friedemann
"""
# Instructions for pylint:
# pylint: disable=too-many-instance-attributes
# pylint: disable=W0201
# pylint: disable=C0301
# pylint: disable=E1136
# pylint: disable=E1101
import os
import shutil
import re
import glob
import bisect
import copy
import numpy as np
import netCDF4 as nc
import datetime as dt
# import wrf # not needed yet - downgrades numpy on mac, creating pandas errors
import f90nml
import pickle
# CTDAS modules
# import da.tools.io4 as io
# from da.tools.wrfchem.utilities import utilities
[docs]
class WRFChemHelper(object):
"""Contains helper functions for sampling WRF-Chem"""
def __init__(self, settings):
self.settings = settings
# def validate_settings(self, needed_items=[]):
# """
# This is based on WRFChemOO._validate_rc
# """
#
# if len(needed_items)==0:
# return
#
# for key in needed_items:
# if key not in self.settings:
# msg = "Missing a required value in settings: %s" % key
# raise IOError(msg)
[docs]
@staticmethod
def read_namelist(nml_file):
"""Read run settings from namelist file"""
namelist = f90nml.read(nml_file)
# Some values are lists when running with nested domains, but
# not when running with only one domain. Ensure here those
# values are always lists.
vars2list = dict(domains = ["e_we", "e_sn",
"parent_id", "parent_grid_ratio",
"i_parent_start", "j_parent_start"
],
time_control = ["history_interval",
"auxinput5_interval_m"]
)
for section, varnames in vars2list.items():
for var in varnames:
if not isinstance(namelist[section][var], list):
namelist[section][var] = [namelist[section][var]]
return namelist
# @staticmethod
# def get_pressure_boundaries_paxis(p_axis, p_surf):
# """
# Arguments
# ---------
# p_axis (:class:`array-like`)
# Pressure at mid points of layers
# p_surf (:class:`numeric`)
# Surface pressure
# Output
# ------
# Pressure at layer boundaries
# """
#
# pb = np.array([float("nan")]*(len(p_axis)+1))
# pb[0] = p_surf
#
# for nl in range(len(pb)-1):
# pb[nl+1] = pb[nl] + 2*(p_axis[nl] - pb[nl])
#
# return pb
#
# @staticmethod
# def get_pressure_boundaries_znw(znw, p_surf, p_top):
# """
# Arguments
# ---------
# ZNW (:class:`ndarray`)
# Eta coordinates of z-staggered WRF grid. For each
# observation (2D)
# p_surf (:class:`ndarray`)
# Surface pressure (1D)
# p_top (:class:`ndarray`)
# Top-of-atmosphere pressure (1D)
# Output
# ------
# Pressure at layer boundaries
#
# CAVEATS
# -------
# Maybe I should rather use P_HYD? Well, Butler et al. 2018
# (https://www.geosci-model-dev-discuss.net/gmd-2018-342/) used
# znu and surface pressure to compute "WRF midpoint layer
# pressure".
#
# For WRF it would be more consistent to interpolate to levels.
# See also comments in code.
# """
#
# return znw*(p_surf-p_top) + p_top
#
#
[docs]
@staticmethod
def get_int_coefs(pb_ret, pb_mod, level_def):
"""
Computes a coefficients matrix to transfer a model profile onto
a retrieval pressure axis.
If level_def=="layer_average", this assumes that profiles are
constant in each layer of the retrieval, bound by the pressure
boundaries pb_ret. In this case, the WRF model layer is treated
in the same way, and coefficients integrate over the assumed
constant model layers. This works with non-staggered WRF
variables (on "theta" points). However, this is actually not how
WRF is defined, and the implementation should be changed to
z-staggered variables. Details for this change are in a comment
at the beginning of the code.
If level_def=="pressure_boundary" (IMPLEMENTATION IN PROGRESS),
assumes that profiles, kernel and pwf are defined at pressure
boundaries that don't have a thickness (this is how OCO-2 data
are defined, for example). In this case, the coefficients
linearly interpolate adjacent model level points. This is
incompatible with the treatment of WRF in the above-described
layer-average assumption, but is closer to how WRF is actually
defined. The exception is that pb_mod is still constructed and
non-staggered variables are not defined at psurf. This can only
be fixed by switching to z-staggered variables.
In cases where retrieval surface pressure is higher than model
surface pressure, and in cases where retrieval top pressure is
lower than model top pressure, the model profile will be
extrapolated with constant tracer mixing ratios. In cases where
retrieval surface pressure is lower than model surface pressure,
and in cases where retrieval top pressure is higher than model
top pressure, only the parts of the model column that fall
within the retrieval presure boundaries are sampled.
Arguments
---------
pb_ret (:class:`array_like`)
Pressure boundaries of the retrieval column
pb_mod (:class:`array_like`)
Pressure boundaries of the model column
level_def (:class:`string`)
"layer_average" or "pressure_boundary" (IMPLEMENTATION IN
PROGRESS). Refers to the retrieval profile.
Note 2021-09-13: Inspected code for pressure_boundary.
Should be correct. Interpolates linearly between two model
levels.
Returns
-------
coefs (:class:`array_like`)
Integration coefficient matrix. Each row sums to 1.
Usage
-----
.. code-block:: python
import numpy as np
pb_ret = np.linspace(900., 50., 5)
pb_mod = np.linspace(1013., 50., 7)
model_profile = 1. - np.linspace(0., 1., len(pb_mod)-1)**3
coefs = get_int_coefs(pb_ret, pb_mod, "layer_average")
retrieval_profile = np.matmul(coefs, model_profile)
"""
if level_def == "layer_average":
# This code assumes that WRF variables are constant in
# layers, but they are defined on levels. This can be seen
# for example by asking wrf.interplevel for the value of a
# variable that is defined on the mass grid ("theta points")
# at a pressure slightly higher than the pressure on its
# grid (wrf.getvar(ncf, "p")), it returns nan. So There is
# no extrapolation. There are no layers. There are only
# levels.
# In addition, this page here:
# https://www.openwfm.org/wiki/How_to_interpret_WRF_variables
# says that to find values at theta-points of a variable
# living on u-points, you interpolate linearly. That's the
# other way around from what I would do if I want to go from
# theta to staggered.
# WRF4.0 user guide:
# - ungrib can interpolate linearly in p or log p
# - real.exe comes with an extrap_type namelist option, that
# extrapolates constantly BELOW GROUND.
# This would mean the correct way would be to integrate over
# a piecewise-linear function. It also means that I really
# want the value at surface level, so I'd need the CO2
# fields on the Z-staggered grid ("w-points")! Interpolate
# the vertical in p with wrf.interp1d, example:
# wrf.interp1d(np.array(rh.isel(south_north=1, west_east=0)),
# np.array(p.isel(south_north=1, west_east=0)),
# np.array(988, 970))
# (wrf.interp1d gives the same results as wrf.interplevel,
# but the latter just doesn't want to work with single
# columns (32,1,1), it wants a dim>1 in the horizontal
# directions)
# So basically, I can keep using pb_ret and pb_mod, but it
# would be more accurate to do the piecewise-linear
# interpolation and the output matrix will have 1 more
# value in each dimension.
# Calculate integration weights by weighting with layer
# thickness. This assumes that both axes are ordered
# psurf to ptop.
coefs = np.ndarray(shape=(len(pb_ret)-1, len(pb_mod)-1))
coefs[:] = 0.
# Extend the model pressure grid if retrieval encompasses
# more.
pb_mod_tmp = copy.deepcopy(pb_mod)
# In case the retrieval pressure is higher than the model
# surface pressure, extend the lowest model layer.
if pb_mod_tmp[0] < pb_ret[0]:
pb_mod_tmp[0] = pb_ret[0]
# In case the model doesn't extend as far as the retrieval,
# extend the upper model layer upwards.
if pb_mod_tmp[-1] > pb_ret[-1]:
pb_mod_tmp[-1] = pb_ret[-1]
# For each retrieval layer, this loop computes which
# proportion falls into each model layer.
for nret in range(len(pb_ret)-1):
# 1st model pressure boundary index = the one before the
# first boundary with lower pressure than high-pressure
# retrieval layer boundary.
model_lower = pb_mod_tmp < pb_ret[nret]
id_model_lower = model_lower.nonzero()[0]
id_min = id_model_lower[0]-1
# Last model pressure boundary index = the last one with
# higher pressure than low-pressure retrieval layer
# boundary.
model_higher = pb_mod_tmp > pb_ret[nret+1]
id_model_higher = model_higher.nonzero()[0]
if len(id_model_higher) == 0:
#id_max = id_min
raise ValueError("This shouldn't happen. Debug.")
else:
id_max = id_model_higher[-1]
# By the way, in case there is no model level with
# higher pressure than the next retrieval level,
# id_max must be the same as id_min.
# For each model layer, find out how much of it makes up this
# retrieval layer
for nmod in range(id_min, id_max+1):
if (nmod == id_min) & (nmod != id_max):
# Part of 1st model layer that falls within
# retrieval layer
coefs[nret, nmod] = pb_ret[nret] - pb_mod_tmp[nmod+1]
elif (nmod != id_min) & (nmod == id_max):
# Part of last model layer that falls within
# retrieval layer
coefs[nret, nmod] = pb_mod_tmp[nmod] - pb_ret[nret+1]
elif (nmod == id_min) & (nmod == id_max):
# id_min = id_max, i.e. model layer encompasses
# retrieval layer
coefs[nret, nmod] = pb_ret[nret] - pb_ret[nret+1]
else:
# Retrieval layer encompasses model layer
coefs[nret, nmod] = pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]
coefs[nret, :] = coefs[nret, :]/sum(coefs[nret, :])
# I tested the code with many cases, but I'm only 99.9% sure
# it works for all input. Hence a test here that the
# coefficients sum to 1 and dump the data if not.
sum_ = np.abs(coefs.sum(1) - 1)
if np.any(sum_ > 2.*np.finfo(sum_.dtype).eps):
dump = dict(pb_ret=pb_ret,
pb_mod=pb_mod,
level_def=level_def)
fp = "int_coefs_dump.pkl"
with open(fp, "w") as f:
pickle.dump(dump, f, 0)
msg_fmt = "Something doesn't sum to 1. Arguments dumped to: %s"
raise ValueError(msg_fmt % fp)
elif level_def=="pressure_boundary":
#msg = "level_def is pressure_boundary. Implementation not complete."
##logging.error(msg)
#raise ValueError(msg)
# Note 2021-09-13: Inspected the code. Should be correct.
# Go back to pressure midpoints for model...
# Change this line to p_mod = pb_mod for z-staggered
# variables
p_mod = pb_mod[1:] - 0.5*np.diff(pb_mod)
coefs = np.ndarray(shape=(len(pb_ret), len(pb_mod)-1))
coefs[:] = 0.
# For each retrieval pressure level, compute linear
# interpolation coefficients
for nret in range(len(pb_ret)):
nmod_list = (p_mod < pb_ret[nret]).nonzero()[0]
if(len(nmod_list)>0):
nmod = nmod_list[0] - 1
if nmod==-1:
# Constant extrapolation at surface
nmod = 0
coef = 1.
else:
# Normal case:
coef = (pb_ret[nret]-p_mod[nmod+1])/(p_mod[nmod]-p_mod[nmod+1])
else:
# Constant extrapolation at atmosphere top
nmod = len(p_mod)-2
coef=0.
coefs[nret, nmod] = coef
coefs[nret, nmod+1] = 1.-coef
elif level_def=="linear_average_pb":
# pb_mod defined on levels and varying linearly in between
# pb_ret defined on boundaries that define layers.
# Coefficients are averages (integral/width) over piecewise
# linear pb_mod
# Constant extrapolation at model bottom and top:
# Add an extra level, and after computing coefficients,
# add the value of the extra level to the original bottom/
# top and delete the extra level
pb_mod_tmp = copy.deepcopy(pb_mod)
if pb_mod[0] < pb_ret[0]:
pb_mod_tmp = np.append(pb_ret[0], np.array(pb_mod_tmp))
if pb_mod[-1] > pb_ret[-1]:
pb_mod_tmp = np.append(np.array(pb_mod_tmp), pb_ret[-1])
coefs = np.ndarray(shape=(len(pb_ret)-1, len(pb_mod_tmp)))
coefs[:] = 0.
for nret in range(len(pb_ret)-1):
# Special case: zero-thickness layer.
# This is for the misuse of this procedure for temporal
# interpolation, specifically the case of instantaneous
# measurements.
if pb_ret[nret] == pb_ret[nret+1]:
nmod = [n
for n in range(len(pb_mod_tmp))
if pb_mod_tmp[n]>=pb_ret[nret]>pb_mod_tmp[n+1]]
if len(nmod)!=1:
raise ValueError("This shouldn't happen.")
nmod = nmod[0]
coefs[nret, nmod] += (pb_ret[nret] - pb_mod_tmp[nmod+1]) / (pb_mod_tmp[nmod]-pb_mod_tmp[nmod+1])
coefs[nret, nmod+1] += (pb_mod_tmp[nmod] - pb_ret[nret]) / (pb_mod_tmp[nmod]-pb_mod_tmp[nmod+1])
continue
# 1st model pressure boundary index = the one before the
# first boundary with lower pressure than high-pressure
# retrieval layer boundary.
model_lower = pb_mod_tmp < pb_ret[nret]
id_model_lower = model_lower.nonzero()[0]
id_min = id_model_lower[0]-1
# Last model pressure boundary index = the last one with
# higher pressure than low-pressure retrieval layer
# boundary.
model_higher = pb_mod_tmp > pb_ret[nret+1]
id_model_higher = model_higher.nonzero()[0]
if len(id_model_higher) == 0:
#id_max = id_min
raise ValueError("This shouldn't happen. Debug.")
else:
id_max = id_model_higher[-1]
# For each model layer, add to piecewise linear integration coefficients
for nmod in range(id_min, id_max+1):
if (pb_ret[nret]<=pb_mod_tmp[nmod]) & (pb_ret[nret+1]<=pb_mod_tmp[nmod+1]):
# A retrieval layer start falls on nth model level or between nth and (n+1)th,
# while it ends at or above the next model level
coefs[nret, nmod] += (pb_ret[nret] - pb_mod_tmp[nmod+1])**2 / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]))
coefs[nret, nmod+1] += (pb_ret[nret] - pb_mod_tmp[nmod+1])*(2*pb_mod_tmp[nmod] - pb_ret[nret] - pb_mod_tmp[nmod+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]))
elif (pb_ret[nret]>pb_mod_tmp[nmod]) & (pb_ret[nret+1]>=pb_mod_tmp[nmod+1]):
# Retrieval layer starts below nth model layer, ends at or below (n+1)th model layer
coefs[nret, nmod] += (pb_mod_tmp[nmod] - pb_ret[nret+1])*(pb_mod_tmp[nmod] + pb_ret[nret+1] - 2*pb_mod_tmp[nmod+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]))
coefs[nret, nmod+1] += (pb_mod_tmp[nmod] - pb_ret[nret+1])**2 / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]))
elif (pb_ret[nret]<=pb_mod_tmp[nmod]) & (pb_ret[nret+1]>pb_mod_tmp[nmod+1]):
# Model layer encompasses retrieval layer
coefs[nret, nmod] += (pb_ret[nret] - pb_ret[nret+1])*(pb_ret[nret] + pb_ret[nret+1] - 2*pb_mod_tmp[nmod+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]))
coefs[nret, nmod+1] += (pb_ret[nret] - pb_ret[nret+1])*(2*pb_mod_tmp[nmod] - pb_ret[nret] - pb_ret[nret+1]) / (2*(pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1]))
else:
# Retrieval layer encompasses model layer
coefs[nret, nmod] += (pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])/2
coefs[nret, nmod+1] += (pb_mod_tmp[nmod] - pb_mod_tmp[nmod+1])/2
# Convert integral to average
coefs[nret, :] = coefs[nret, :]/(pb_ret[nret]-pb_ret[nret+1])
# Constant extrapolation:
# Add the value of the temporary extra level to the
# bottom/top and remove extra level
if pb_mod[0] < pb_ret[0]:
coefs[:, 1] += coefs[:, 0]
coefs = coefs[:, 1:]
if pb_mod[-1] > pb_ret[-1]:
coefs[:, -2] += coefs[:, -1]
coefs = coefs[:, :-1]
else:
msg = "Unknown level_def: " + level_def
raise ValueError(msg)
return coefs
#
# @staticmethod
# def get_pressure_weighting_function(pressure_boundaries, rule):
# """
# Compute pressure weighting function according to 'rule'.
# Valid rules are:
# - simple (=layer thickness)
# - connor2008 (not implemented)
# """
# if rule == 'simple':
# pwf = np.abs(np.diff(pressure_boundaries)/np.ptp(pressure_boundaries))
# else:
# raise NotImplementedError("Rule %s not implemented" % rule)
#
# return pwf
#
# def locate_domain(self, lat, lon):
# """
# Input
# -----
# lat: Single values or lists or np.arrays
# lon: Single values or lists or np.arrays
#
# Output
# ------
# - xy-coordinates in finest domain that contains the coordinates
# - finest domain
#
# NOTE
# ----
# self.open_wrf_location_files() must be run first
# """
#
# if not hasattr(self, "_loc_files"):
# raise RuntimeError("Must call open_wrf_loc_files first.")
#
# # Work with arrays internally.
# lat = np.array(lat, ndmin=1)
# lon = np.array(lon, ndmin=1)
#
# # Get coordinates of the observation in all domains
# ndomains = self.namelist["domains"]["max_dom"]
#
# # Get domain sizes in xy on mass (=unstaggered) grid (hence the -1)
# dom_size_x = np.array(self.namelist['domains']['e_we'], dtype=float, ndmin=1) - 1
# dom_size_y = np.array(self.namelist['domains']['e_sn'], dtype=float, ndmin=1) - 1
#
# # Initialize output
# xy = np.zeros((len(lat), 2))
# xy[:] = np.nan
# finest_domain = np.zeros((len(lat), ), int)
#
# # Since wrf.ll_to_xy takes very long, I save a bit of time here
# # by starting in the finest domain and processing only
# # observations that weren't previously found.
# for n in range(ndomains-1, -1, -1):
# # Get xy for this domain
# # Only process what you haven't processed
# sel = np.where(finest_domain == 0)[0]
#
# # In case all domains where set
# if len(sel)==0:
# break
#
# x, y = wrf.ll_to_xy(wrfin=self._loc_files[n],
# latitude=lat[sel],
# longitude=lon[sel],
# meta=False,
# as_int=False)
#
#
# # For each domain, check if the observation is inside the
# # domain extent
# # I put the edges on -0.5 and e_we/e_sn - 0.5.
# # Reason is that wrf.ll_to_xy(..., as_int = True) returns
# # -0.2 as 0, so it's inside the domain.
# # Also, for e_we=99, e_sn = 92, ref_lon=9, ref_lat=51.5:
# # wrf.ll_to_xy(wrfin=self._loc_files[0], latitude=51.5, longitude=9., meta=False, as_int=False)
# # array([48.49996124, 45.00003737])
# # This means that int indices denote exactly the mass-staggered point.
#
# # To be able to iterate over x and y in case they're scalars:
# x = np.array(x, ndmin=1)
# y = np.array(y, ndmin=1)
#
# # Test: inside domain?
# # The -1 here are because of 0-based indices, and the -/+ 0.5 are
# # because that's halfway to the next mass-staggered point and
# # I treat the WRF grid as boxes.
# x_in = [(-0.5 <= x_) and (x_ <= dom_size_x[n] - 1 + 0.5) for x_ in x]
# y_in = [(-0.5 <= y_) and (y_ <= dom_size_y[n] - 1 + 0.5) for y_ in y]
#
# # Save domain, x and y at these locations
# in_this_dom = np.where(np.logical_and(x_in, y_in))[0]
# finest_domain[sel[in_this_dom]] = n + 1
# # If domain = 1, set _all_ xy to see where they end up
# if n==0:
# xy[sel, 0] = x
# xy[sel, 1] = y
# else:
# xy[sel[in_this_dom], 0] = x[in_this_dom]
# xy[sel[in_this_dom], 1] = y[in_this_dom]
#
#
# # Return the indices and domain
# return xy, finest_domain
#
# def get_groups_space_time(self, dat, time_bins, only_in=False):
# """
# Returns a dictionary of lists of indices of observations in dat,
# where keys are tuples of (time bin, x bin, y bin and wrf
# domain), and values are indices of the observations that fall
# within this bin.
#
# Arguments
# ---------
#
# dat : list
# List containing measurement locations:
# date (same type as time_bins), latitude, longitude
# time_bins : list
# List of boundaries of time bins (I use datetime.datetime
# objects)
# only_in : boolean
# If only_in is True, groups that fall outside the time_bins
# and domain are not returned.
#
# """
#
# # Time groups
# id_t = np.array([bisect.bisect_right(time_bins, dat_date)
# for dat_date in dat['date']],
# int)
#
# # Spatial groups (indices and domain)
# id_xy_f, dom = self.locate_domain(dat['latitude'], dat['longitude'])
#
# id_xy = np.round(id_xy_f).astype(int)
#
#
#
# # Version of only_in with sel: might be faster if sel is short -
# # I don't know! But the results for my test case where identical
# # (meaning 'domain' looked correct and identical) for both
# # options for only_in.
#
# # if only_in:
# # # Yes, 0<id_t is correct: in bisect_right, it means the
# # # value is below the lowest sequence value.
# # time_in = np.logical_and(0<id_t, id_t<len(time_bins))
# # space_in = dom != 0
# # sel = np.where(np.logical_and(time_in, space_in))[0]
# #
# # else:
# # sel = range(len(id_t))
# #
# # indices = get_index_groups(id_t[sel], id_xy[sel, 0], id_xy[sel, 1], dom[sel])
# #
# # # Now have to account for sel again!
# # if only_in:
# # for k, v in indices.iteritems():
# # indices[k] = sel[v]
#
# # Version of only_in without sel: might be faster if sel is
# # long - I don't know! But the results for my test case where
# # identical (meaning 'domain' looked correct and identical)
#
# # Indices for all groups:
# indices = utilities.get_index_groups(id_t, id_xy[:, 0], id_xy[:, 1], dom)
#
# # Throw the out-of-domain ones out here already
# if only_in:
# # Remove the index groups where domain is 0 (= outside of
# # domain). Need to iterate over a list, because with an
# # iterator, python complains that the dictionary changed
# # size during iteration.
# for k in list(indices.keys()):
# if k[3] == 0:
# del indices[k]
# # Equivalent to the above 3 lines, don't know what's faster:
# # groups = {k: v for k, v in groups.iteritems() if k[3] != 0}
#
# return indices
#
#
[docs]
@staticmethod
def times_in_wrf_file(filename):
"""Returns the times in netcdf file as datetime object"""
ncf = nc.Dataset(filename)
times_nc = ncf.variables["Times"][:]
ncf.close()
# Hope the utf-8 always works...
times_chr = [str(times_nc[nt, :], "utf-8")
for nt in range(times_nc.shape[0])]
times_dtm = [dt.datetime.strptime(t_chr, "%Y-%m-%d_%H:%M:%S")
for t_chr in times_chr]
return times_dtm
[docs]
def wrf_times(self, file_list):
"""Read all times in a list of wrf files
Output
------
- 1D-array containing all times
- 1D-array containing start indices of each file
"""
times = list()
start_indices = np.ndarray((len(file_list), ), int)
for nf in range(len(file_list)):
# ncf = nc.Dataset(file_list[nf])
times_this = self.times_in_wrf_file(file_list[nf])
start_indices[nf] = len(times)
times += times_this
# ncf.close()
return times, start_indices
#
# def open_wrf_location_files(self):
# """
# Keep a description of a small wrf file for each domain in memory
# to locate observations.
# Appends _loc_file to self.
#
# Note: Should be edited out of the code.
# """
#
# ndomains = self.namelist["domains"]["max_dom"]
# path = self.settings["run_dir"]
# pattern = "wrfinput_d%02d"
# self._loc_files = list()
# for nd in range(1, ndomains+1):
# fp = os.path.join(path, pattern % nd)
# self._loc_files.append(nc.Dataset(fp, "r"))
#
# def close_wrf_location_files(self):
# """See _open_wrf_location_files"""
# for loc_file in self._loc_files:
# loc_file.close()
#
# def wrf_filename_times(self, prefix):
# """Get timestamps in wrf file names in run directory."""
#
# # List all filenames
# files = self.get_wrf_filenames(prefix + "*")
# # Only use d01 files, pattern should be the same for all domains
# pattern = os.path.join(self.settings["run_dir"], prefix)
# files = [f for f in files if re.search(pattern, f)]
# # Extract timestamp from filename
# # Format is %Y-%m-%d_%H:%M:%S at the end of the filename
# pattern_time = "%Y-%m-%d_%H:%M:%S"
# len_tstamp = len(pattern_time) + 2
# times = [dt.datetime.strptime(f[-len_tstamp:], pattern_time)
# for f in files]
#
# return times
#
# def get_wrf_filenames(self, glob_pattern):
# """
# Gets the filenames in self.settings["run_dir"] that follow
# glob_pattern, excluding those that end with
# self.settings["original_save_suffix"]
# """
# path = self.settings["run_dir"]
# # All files...
# wfiles = glob.glob(os.path.join(path, glob_pattern))
# # All originals
# orig_suf = self.settings["original_save_suffix"]
# opattern = glob_pattern + orig_suf
# ofiles = glob.glob(os.path.join(path, opattern))
#
# # All files except all originals:
# files = [x for x in wfiles if x not in ofiles]
#
# # I need this sorted too often to not do it here.
# files = np.sort(files).tolist()
# return files
#
#
# def sample_total_columns(self, dat, loc, fields_list):
# """
# Sample total_columns of fields_list in WRF output in
# self.settings["run_dir"] at the location id_xy in domain, id_t
# in all wrfout-times. Files and indices therein are recognized
# by id_t and file_time_start_indices.
# All quantities needed for computing total columns from profiles
# are in dat (kernel, prior, ...).
#
# Arguments
# ---------
# dat (:class:`list`)
# Result of wrfhelper.read_sampling_coords. Used here: prior,
# prior_profile, kernel, psurf, pressure_axis, [, pwf]
# If psurf or any of pressure_axis are nan, wrf's own
# surface pressure is used and pressure_axis constructed
# from this and the number of levels in the averaging kernel.
# This allows sampling with synthetic data that don't have
# pressure information. This only works with level_def
# "layer_average".
# If pwf is not present or nan, a simple one is created, for
# level_def "layer_average".
# loc (:class:`dict`)
# A dictionary with all location-related input for sampling,
# computed in wrfout_sampler. Keys:
# id_xy, domain: Domain coordinates
# id_t: Timestep (continous throughout all files)
# frac_t: Interpolation coeficient between id_t and id_t+1:
# t_obs = frac_t*t[id_t] + (1-frac_t)*t[id_t+1])
# file_start_time_indices: Time index at which a new wrfout
# file starts
# files: names of wrfout files.
# fields_list (:class:`list`)
# The fields to sample total columns from.
#
# Output
# ------
# sampled_columns (:class:`array`)
# A 2D-array of sampled columns.
# Shape: (len(dat["prior"]), len(fields_list))
# """
#
# # Initialize output
# tc = np.ndarray(shape=(len(dat["prior"]), len(fields_list)), dtype=float)
# tc[:] = float("nan")
#
# # Process by domain
# UD = list(set(loc["domain"]))
# for dom in UD:
# idd = np.nonzero(loc["domain"] == dom)[0]
# # Process by id_t
# UT = list(set(loc["id_t"][idd]))
# for time_id in UT:
# # Coordinates to process
# idt = idd[np.nonzero(loc["id_t"][idd] == time_id)[0]]
# # Get tracer ensemble profiles
# profiles = self._read_and_intrp_v(loc, fields_list, time_id, idt)
# # List, len=len(fields_list), shape of each: (len(idt),nz)
# # Get pressure axis:
# #paxis = self.read_and_intrp(wh_names, id_ts, frac_t, id_xy, "P_HYD")/1e2 # Pa -> hPa
# psurf = self._read_and_intrp_v(loc, ["PSFC"], time_id, idt)[0]/1.e2 # Pa -> hPa
# # Shape: (len(idt),)
# ptop = float(self.namelist["domains"]["p_top_requested"])/1.e2
# # Shape: (len(idt),)
# znw = self._read_and_intrp_v(loc, ["ZNW"], time_id, idt)[0]
# #Shape:(len(idt),nz)
#
# # DONE reading from file.
# # Here it starts to make sense to loop over individual observations
# for nidt in range(len(idt)):
# nobs = idt[nidt]
# # Construct model pressure layer boundaries
# pb_mod = self.get_pressure_boundaries_znw(znw[nidt, :], psurf[nidt], ptop)
#
# if (np.diff(pb_mod) >= 0).any():
# msg = ("Model pressure boundaries for observation %d " + \
# "are not monotonically decreasing! Investigate.") % nobs
# raise ValueError(msg)
#
# # Construct retrieval pressure layer boundaries
# if dat["level_def"][nobs] == "layer_average":
# if np.any(np.isnan(dat["pressure_levels"][nobs])) \
# or np.isnan(dat["psurf"][nobs]):
# # Code for synthetic data without a pressure axis,
# # but with an averaging kernel:
# # Use wrf's surface and top pressure
# nlayers = len(dat["averaging_kernel"][nobs])
# pb_ret = np.linspace(psurf[nidt], ptop, nlayers+1)
# else:
# pb_ret = self.get_pressure_boundaries_paxis(
# dat["pressure_levels"][nobs],
# dat["psurf"][nobs])
# elif dat["level_def"][nobs] == "pressure_boundary":
# if np.any(np.isnan(dat["pressure_levels"][nobs])):
# # Code for synthetic data without a pressure axis,
# # but with an averaging kernel:
# # Use wrf's surface and top pressure
# nlevels = len(dat["averaging_kernel"][nobs])
# pb_ret = np.linspace(psurf[nidt], ptop, nlevels)
# else:
# pb_ret = dat["pressure_levels"][nobs]
#
# if (np.diff(pb_ret) >= 0).any():
# msg = ("Retrieval pressure boundaries for " + \
# "observation %d are not monotonically " + \
# "decreasing! Investigate.") % nobs
# raise ValueError(msg)
#
# # Get vertical integration coefficients (i.e. to
# # "interpolate" from model to retrieval grid)
# coef_matrix = self.get_int_coefs(pb_ret, pb_mod, dat["level_def"][nobs])
#
# # Model retrieval with averaging kernel and prior profile
# if "pressure_weighting_function" in list(dat.keys()):
# pwf = dat["pressure_weighting_function"][nobs]
# if (not "pressure_weighting_function" in list(dat.keys())) or np.any(np.isnan(pwf)):
# # Construct pressure weighting function from
# # pressure boundaries
# pwf = self.get_pressure_weighting_function(pb_ret, rule="simple")
#
# # Compute pressure-weighted averaging kernel
# avpw = pwf*dat["averaging_kernel"][nobs]
#
# # Get prior
# prior_col = dat["prior"][nobs]
# prior_profile = dat["prior_profile"][nobs]
# if np.isnan(prior_col): # compute prior
# prior_col = np.dot(pwf, prior_profile)
#
# # Compute total columns
# for nf in range(len(fields_list)):
# # Integrate model profile
# profile_intrp = np.matmul(coef_matrix, profiles[nf][nidt, :])
#
# # Model retrieval
# tc[nobs, nf] = prior_col + np.dot(avpw, profile_intrp - prior_profile)
#
# # Test phase: save pb_ret, pb_mod, coef_matrix,
# # one profile for manual checking
#
# # dat_save = dict(pb_ret=pb_ret,
# # pb_mod=pb_mod,
# # coef_matrix=coef_matrix,
# # ens_profile=ens_profiles[0],
# # profile_intrp=profile_intrp,
# # id=dat.id)
# #
# #out = open("model_profile_%d.pkl"%dat.id, "w")
# #cPickle.dump(dat_save, out, 0)
# # Average over footprint
# if self.settings["footprint_samples_dim"] > 1:
# indices = utilities.get_index_groups(dat["sounding_id"])
#
# # Make sure that this is correct: i know the number of indices
# lens = [len(group) for group in list(indices.values())]
# correct_len = self.settings["footprint_samples_dim"]**2
# if np.any([len_ != correct_len for len_ in set(lens)]):
# raise ValueError("Not all footprints have %d samples" %correct_len)
# # Ok, paranoid mode, also confirm that the indices are what I
# # think they are: consecutive numbers
# ranges = [np.ptp(group) for group in list(indices.values())]
# if np.any([ptp != correct_len for ptp in set(ranges)]):
# raise ValueError("Not all footprints have consecutive samples")
#
# tc_original = copy.deepcopy(tc)
# tc = utilities.apply_by_group(np.average, tc_original, indices)
#
# return tc
#
# @staticmethod
# def _read_and_intrp_v(loc, fields_list, time_id, idp):
# """
# Helper function for sample_total_columns.
# read_and_intrp, but vectorized.
# Reads in fields and interpolates
# them linearly in time.
#
# Arguments
# ----------
# loc (:class:`dict`)
# Passed through from sample_total_columns, see there.
# fields_list (:class:`list` of :class:`str`)
# List of netcdf-variables to process.
# time_id (:class:`int`)
# Time index referring to all files in loc to read
# idp (:class:`array` of :class:`int`)
# Indices for id_xy, domain and frac_t in loc (i.e.
# observations) to process.
#
# Output
# ------
# List of temporally interpolated fields, one entry per member of
# fields_list.
# """
#
# var_intrp_l = list()
#
# # Check we were really called with observations for just one domain
# domains = set(loc["domain"][idp])
# if len(domains) > 1:
# raise ValueError("I can only operate on idp with identical domains.")
# dom = domains.pop()
#
# # Select input files
# id_file0 = bisect.bisect_right(loc["file_start_time_indices"][dom], time_id) - 1
# id_file1 = bisect.bisect_right(loc["file_start_time_indices"][dom], time_id+1) - 1
# if id_file0 < 0 or id_file1 < 0:
# raise ValueError("This shouldn't happen.")
#
# # Get time id in file
# id_t_file0 = time_id - loc["file_start_time_indices"][dom][id_file0]
# id_t_file1 = time_id+1 - loc["file_start_time_indices"][dom][id_file1]
#
# # Open files
# nc0 = nc.Dataset(loc["files"][dom][id_file0], "r")
# nc1 = nc.Dataset(loc["files"][dom][id_file1], "r")
# # Per field to sample
# for field in fields_list:
# # Read input file
# field0 = wrf.getvar(wrfin=nc0,
# varname=field,
# timeidx=id_t_file0,
# squeeze=False,
# meta=False)
#
# field1 = wrf.getvar(wrfin=nc1,
# varname=field,
# timeidx=id_t_file1,
# squeeze=False,
# meta=False)
#
# if len(field0.shape) == 4:
# # Sample field at timesteps before and after observation
# # They are ordered nt x nz x ny x nx
# # var0 will have shape (len(idp),len(profile))
# var0 = field0[0, :, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]]
# var1 = field1[0, :, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]]
# # Repeat frac_t for profile size
# frac_t_ = np.array(loc["frac_t"][idp]).reshape((len(idp), 1)).repeat(var0.shape[1], 1)
# elif len(field0.shape) == 3:
# # var0 will have shape (len(idp),)
# var0 = field0[0, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]]
# var1 = field1[0, loc["id_xy"][idp, 1], loc["id_xy"][idp, 0]]
# frac_t_ = np.array(loc["frac_t"][idp])
# elif len(field0.shape) == 2:
# # var0 will have shape (len(idp),len(profile))
# # This is for ZNW, which is saved as (time_coordinate,
# # vertical_coordinate)
# var0 = field0[[0]*len(idp), :]
# var1 = field1[[0]*len(idp), :]
# frac_t_ = np.array(loc["frac_t"][idp]).reshape((len(idp), 1)).repeat(var0.shape[1], 1)
# else:
# raise ValueError("Can't deal with field with %d dimensions." % len(field0.shape))
#
# # Interpolate in time
# var_intrp_l.append(var0*frac_t_ + var1*(1. - frac_t_))
#
# nc0.close()
# nc1.close()
#
# return var_intrp_l
#
# @staticmethod
# def read_sampling_coords(sampling_coords_file, id0=None, id1=None):
# """Read in samples"""
#
# ncf = nc.Dataset(sampling_coords_file, "r")
# if id0 is None:
# id0 = 0
# if id1 is None:
# id1 = len(ncf.dimensions['soundings'])
#
# dat = dict(
# sounding_id=np.array(ncf.variables["sounding_id"][id0:id1]),
# date=ncf.variables["date"][id0:id1],
# latitude=np.array(ncf.variables["latitude"][id0:id1]),
# longitude=np.array(ncf.variables["longitude"][id0:id1]),
# latc_0=np.array(ncf.variables["latc_0"][id0:id1]),
# latc_1=np.array(ncf.variables["latc_1"][id0:id1]),
# latc_2=np.array(ncf.variables["latc_2"][id0:id1]),
# latc_3=np.array(ncf.variables["latc_3"][id0:id1]),
# lonc_0=np.array(ncf.variables["lonc_0"][id0:id1]),
# lonc_1=np.array(ncf.variables["lonc_1"][id0:id1]),
# lonc_2=np.array(ncf.variables["lonc_2"][id0:id1]),
# lonc_3=np.array(ncf.variables["lonc_3"][id0:id1]),
# prior=np.array(ncf.variables["prior"][id0:id1]),
# prior_profile=np.array(ncf.variables["prior_profile"][id0:id1,]),
# averaging_kernel=np.array(ncf.variables["averaging_kernel"][id0:id1]),
# pressure_levels=np.array(ncf.variables["pressure_levels"][id0:id1]),
# pressure_weighting_function=np.array(ncf.variables["pressure_weighting_function"][id0:id1]),
# level_def=ncf.variables["level_def"][id0:id1],
# psurf=np.array(ncf.variables["psurf"][id0:id1])
# )
#
# ncf.close()
#
# # Convert level_def from it's weird nc format to string
# dat["level_def"] = nc.chartostring(dat["level_def"])
#
# # Convert date to datetime object
# dat["time"] = [dt.datetime(*x) for x in dat["date"]]
#
# return dat
#
# @staticmethod
# def write_simulated_columns(obs_id, simulated, nmembers, outfile):
# """Write simulated observations to file."""
#
# # Output format: see obs_xco2_fr
#
# f = io.CT_CDF(outfile, method="create")
#
# dimid = f.createDimension("sounding_id", size=None)
# dimid = ("sounding_id",)
# savedict = io.std_savedict.copy()
# savedict["name"] = "sounding_id"
# savedict["dtype"] = "int64"
# savedict["long_name"] = "Unique_Dataset_observation_index_number"
# savedict["units"] = ""
# savedict["dims"] = dimid
# savedict["comment"] = "Format as in input"
# savedict["values"] = obs_id.tolist()
# f.add_data(savedict, nsets=0)
#
# dimmember = f.createDimension("nmembers", size=nmembers)
# dimmember = ("nmembers",)
# savedict = io.std_savedict.copy()
# savedict["name"] = "column_modeled"
# savedict["dtype"] = "float"
# savedict["long_name"] = "Simulated total column"
# savedict["units"] = "??"
# savedict["dims"] = dimid + dimmember
# savedict["comment"] = "Simulated model value created by WRFChemOO"
# savedict["values"] = simulated.tolist()
# f.add_data(savedict, nsets=0)
#
# f.close()
#
# @staticmethod
# def save_file_with_timestamp(file_path, out_dir, suffix=""):
# """ Saves a file to with a timestamp"""
# nowstamp = dt.datetime.now().strftime("_%Y-%m-%d_%H:%M:%S")
# new_name = os.path.basename(file_path) + suffix + nowstamp
# new_path = os.path.join(out_dir, new_name)
# shutil.copy2(file_path, new_path)
if __name__ == "__main__":
pass