Source code for pycif.plugins.obsparsers.tropomi_wfmd.parse
from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from ....utils.datastores.empty import init_empty
[docs]
def do_parse(
self,
obs_file,
**kwargs
):
"""Parse function for a file from template observations
Args:
obs_file (str) :
Path to input file
Returns:
pandas.DataFrame :
Dataframe from input file df[parameter][station]
"""
info("Parsing WFMD data from observation file: {}".format(obs_file))
ddi, ddf = self.datei, self.datef
data = xr.open_dataset(obs_file)
#update format for version 2.0
if obs_file[-6:] == "fv4.nc":
version ="2.0"
time_format = "%Y-%m-%dT%H:%M:%SZ"
nlayers = 31
else:
version = "1.8"
time_format = "%Y%m%dT%H%M%SZ"
nlayers = 20
# Filter on date
data_di = datetime.datetime.strptime(data.attrs["time_coverage_start"],
time_format)
data_df = datetime.datetime.strptime(data.attrs["time_coverage_end"],
time_format)
if ddi > data_df or ddf < data_di:
debug(f"File {obs_file} with date {data_df} "
f"is outside simulation time period {ddi} / {ddf}")
return init_empty()
nobs = data.dims["sounding_dim"]
# Filter out data outside domain
mask_domain = np.ones((nobs), bool)
longitude = data.variables['longitude']
latitude = data.variables['latitude']
if hasattr(self, "crop_domain"):
crop_domain = self.crop_domain
mask_domain = ((longitude < crop_domain.xmax) &
(longitude > crop_domain.xmin) &
(latitude < crop_domain.ymax) &
(latitude > crop_domain.ymin))
if mask_domain.sum() == 0:
return init_empty()
# Valid data
mask_valid = data.variables['xch4_quality_flag'] == 0
if (mask_domain & mask_valid).sum() == 0:
return init_empty()
# Filter out data not in simulation window
time = data.variables['time'].values
time_valid = pd.to_datetime(time[mask_valid & mask_domain])
mask_time = (time_valid >= ddi) & (time_valid <= ddf)
mask_all = mask_valid & mask_domain
mask_all[mask_all] = mask_time
if mask_all.sum() == 0:
return init_empty()
# Now process actual data
debug(f"Data found in file {obs_file}")
list_basic_cols = [
'date', 'duration', 'station', 'network', 'parameter',
'lon', 'lat', 'obs', 'obserror']
subdata = pd.DataFrame(columns=list_basic_cols)
subdata['date'] = time_valid[mask_time]
subdata = subdata.assign(duration=self.obs_duration)
subdata['lon'] = longitude.data[mask_all]
subdata['lat'] = latitude.data[mask_all]
subdata = subdata.assign(station=self.stationID)
subdata = subdata.assign(network=self.networkID)
subdata = subdata.assign(parameter=self.parameter)
# Load aks and obs
nlevread = data.dims["layer_dim"]
if nlevread != nlayers:
raise Exception(
"The file {obs_file} does not have the expected number of "
f"averaging kernel layers: {nlevread} instead of {nlayers}. \n"
f"Please check your files or change the parameter 'nlayers' "
f"in the Yaml (expert users only).")
subdata["obs"] = np.float64(data.variables['xch4'].values[mask_all.values])
subdata['obserror'] = data.variables['xch4_uncertainty'].values[mask_all.values]
ak = np.float64(data['xch4_averaging_kernel'].values[mask_all.values])
# Pressure levels
# particular case of TROPOMI CH4: create pressure grid from psurf and delta
pavg0 = np.float64(data.variables["pressure_levels"].values[mask_all.values])
# prior profile in molecules_percm2 and pressure weights
qa0 = np.float64(data.variables['ch4_profile_apriori'].values[mask_all.values])
pw = np.float64(data.variables['pressure_weight'].values[mask_all.values])
# Merge infos into xarray
ds = xr.Dataset({'qa0': (['index', 'level'], qa0),
'ak': (['index', 'level'], ak),
'pw': (['index', 'level'], pw),
'pavg0': (['index', 'level_pressure'], pavg0)
},
coords={'index': subdata.index,
'level': range(nlayers),
'level_pressure': range(nlayers + 1)})
subdata = subdata.to_xarray()
subdata = xr.merge([ds, subdata])
# filtering according to information provided with data
debug(f"Observations before filter: {len(subdata.index)}")
mask_filter = data.variables["xch4_quality_flag"][mask_all].values == 0
subdata = subdata.isel(index=mask_filter)
debug(f"Observations after filter: {len(subdata.index)}")
# Dump information and return pandas 1D values only
dir_monitors = "{}/obs/{}/".format(self.workdir, self.parameter)
subdata.to_netcdf(
f"{dir_monitors}/monitor_{self.stationID}_{self.networkID}"
f"_{self.parameter}_{data_di.strftime('%Y%m%d%H%M')}.nc"
)
subdata = xr.Dataset(
{var: subdata[var] for var in subdata.variables
if subdata[var].dims == ("index",)}).to_pandas()
return subdata