Source code for pycif.plugins.obsparsers.tropomi_wfmd.parse

from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from ....utils.datastores.empty import init_empty

[docs] def do_parse( self, obs_file, **kwargs ): """Parse function for a file from template observations Args: obs_file (str) : Path to input file Returns: pandas.DataFrame : Dataframe from input file df[parameter][station] """ info("Parsing WFMD data from observation file: {}".format(obs_file)) ddi, ddf = self.datei, self.datef data = xr.open_dataset(obs_file) #update format for version 2.0 if obs_file[-6:] == "fv4.nc": version ="2.0" time_format = "%Y-%m-%dT%H:%M:%SZ" nlayers = 31 else: version = "1.8" time_format = "%Y%m%dT%H%M%SZ" nlayers = 20 # Filter on date data_di = datetime.datetime.strptime(data.attrs["time_coverage_start"], time_format) data_df = datetime.datetime.strptime(data.attrs["time_coverage_end"], time_format) if ddi > data_df or ddf < data_di: debug(f"File {obs_file} with date {data_df} " f"is outside simulation time period {ddi} / {ddf}") return init_empty() nobs = data.dims["sounding_dim"] # Filter out data outside domain mask_domain = np.ones((nobs), bool) longitude = data.variables['longitude'] latitude = data.variables['latitude'] if hasattr(self, "crop_domain"): crop_domain = self.crop_domain mask_domain = ((longitude < crop_domain.xmax) & (longitude > crop_domain.xmin) & (latitude < crop_domain.ymax) & (latitude > crop_domain.ymin)) if mask_domain.sum() == 0: return init_empty() # Valid data mask_valid = data.variables['xch4_quality_flag'] == 0 if (mask_domain & mask_valid).sum() == 0: return init_empty() # Filter out data not in simulation window time = data.variables['time'].values time_valid = pd.to_datetime(time[mask_valid & mask_domain]) mask_time = (time_valid >= ddi) & (time_valid <= ddf) mask_all = mask_valid & mask_domain mask_all[mask_all] = mask_time if mask_all.sum() == 0: return init_empty() # Now process actual data debug(f"Data found in file {obs_file}") list_basic_cols = [ 'date', 'duration', 'station', 'network', 'parameter', 'lon', 'lat', 'obs', 'obserror'] subdata = pd.DataFrame(columns=list_basic_cols) subdata['date'] = time_valid[mask_time] subdata = subdata.assign(duration=self.obs_duration) subdata['lon'] = longitude.data[mask_all] subdata['lat'] = latitude.data[mask_all] subdata = subdata.assign(station=self.stationID) subdata = subdata.assign(network=self.networkID) subdata = subdata.assign(parameter=self.parameter) # Load aks and obs nlevread = data.dims["layer_dim"] if nlevread != nlayers: raise Exception( "The file {obs_file} does not have the expected number of " f"averaging kernel layers: {nlevread} instead of {nlayers}. \n" f"Please check your files or change the parameter 'nlayers' " f"in the Yaml (expert users only).") subdata["obs"] = np.float64(data.variables['xch4'].values[mask_all.values]) subdata['obserror'] = data.variables['xch4_uncertainty'].values[mask_all.values] ak = np.float64(data['xch4_averaging_kernel'].values[mask_all.values]) # Pressure levels # particular case of TROPOMI CH4: create pressure grid from psurf and delta pavg0 = np.float64(data.variables["pressure_levels"].values[mask_all.values]) # prior profile in molecules_percm2 and pressure weights qa0 = np.float64(data.variables['ch4_profile_apriori'].values[mask_all.values]) pw = np.float64(data.variables['pressure_weight'].values[mask_all.values]) # Merge infos into xarray ds = xr.Dataset({'qa0': (['index', 'level'], qa0), 'ak': (['index', 'level'], ak), 'pw': (['index', 'level'], pw), 'pavg0': (['index', 'level_pressure'], pavg0) }, coords={'index': subdata.index, 'level': range(nlayers), 'level_pressure': range(nlayers + 1)}) subdata = subdata.to_xarray() subdata = xr.merge([ds, subdata]) # filtering according to information provided with data debug(f"Observations before filter: {len(subdata.index)}") mask_filter = data.variables["xch4_quality_flag"][mask_all].values == 0 subdata = subdata.isel(index=mask_filter) debug(f"Observations after filter: {len(subdata.index)}") # Dump information and return pandas 1D values only dir_monitors = "{}/obs/{}/".format(self.workdir, self.parameter) subdata.to_netcdf( f"{dir_monitors}/monitor_{self.stationID}_{self.networkID}" f"_{self.parameter}_{data_di.strftime('%Y%m%d%H%M')}.nc" ) subdata = xr.Dataset( {var: subdata[var] for var in subdata.variables if subdata[var].dims == ("index",)}).to_pandas() return subdata