Source code for pycif.plugins.obsparsers.tropomi_rpro.parse

from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from ....utils.datastores.empty import init_empty


[docs] def do_parse( self, obs_file, **kwargs ): """Parse function for a file from template observations Args: obs_file (str) : Path to input file Returns: pandas.DataFrame : Dataframe from input file df[parameter][station] """ info("Parsing official RPRO TROPOMI data " "from observation file: {}".format(obs_file)) ddi, ddf = self.datei, self.datef product_data = xr.open_dataset(obs_file, group="PRODUCT") obs_dates = product_data["time_utc"].values.astype( np.datetime64).astype(datetime.datetime) self.data_date_max = obs_dates.max() if ddi > obs_dates.max() or ddf < obs_dates.min(): debug(f"File {obs_file} with date {obs_dates.min()} " f"is outside simulation time period {ddi} / {ddf}") return init_empty() # Filter out data outside domain longitude = product_data.variables['longitude'].values latitude = product_data.variables['latitude'].values mask_domain = np.ones_like(longitude, dtype=bool) if hasattr(self, "crop_domain"): crop_domain = self.crop_domain mask_domain = ((longitude < crop_domain.xmax) & (longitude > crop_domain.xmin) & (latitude < crop_domain.ymax) & (latitude > crop_domain.ymin)) if mask_domain.sum() == 0: return init_empty() # Check number of levels nlevread = product_data.dims["layer"] if nlevread != self.nlayers: raise Exception( f"The file {obs_file} does not have the expected number of " f"averaging kernel layers: {nlevread} instead of " f"{self.nlayers}. \n" f"Please check your files or change the parameter 'nlayers' " f"in the Yaml (expert users only).") # Valid data scanline = product_data.dims["scanline"] ground_pixel = product_data.dims["ground_pixel"] time = product_data.dims["time"] obs = product_data.variables['methane_mixing_ratio_bias_corrected'].values mask_valid = product_data.variables['qa_value'].values >= self.qa_value qa = product_data.variables['qa_value'].values mask_valid = mask_valid & ~np.isnan(obs) if (mask_domain & mask_valid).sum() == 0: return init_empty() # Filter out data not in simulation window mask_time = (obs_dates >= ddi) & (obs_dates <= ddf) mask_all = mask_time[..., np.newaxis] & mask_valid & mask_domain if mask_all.sum() == 0: return init_empty() # Now process actual data debug(f"Data found in file {obs_file}, {mask_all.sum()} obs") list_basic_cols = [ 'date', 'duration', 'station', 'network', 'parameter', 'lon', 'lat', 'obs', 'obserror'] subdata = pd.DataFrame(columns=list_basic_cols) subdata['date'] = np.repeat( obs_dates[..., np.newaxis], ground_pixel, axis=2)[mask_all] subdata = subdata.assign(duration=self.obs_duration) subdata['lon'] = longitude[mask_all] subdata['lat'] = latitude[mask_all] subdata = subdata.assign(station=self.stationID) subdata = subdata.assign(network=self.networkID) subdata = subdata.assign(parameter=self.parameter) subdata["obs"] = obs[mask_all].astype(float) subdata["obserror"] = 2 * product_data.variables[ 'methane_mixing_ratio_precision'].values[mask_all].astype(float) subdata["qa_value"] = qa[mask_all].astype(float) across_track_pixel_index = np.tile(np.arange(ground_pixel), (product_data.dims["scanline"],1))[np.newaxis,...] subdata["across_track_pixel_index"] = across_track_pixel_index[mask_all].astype(int) # Load aks and obs detailed_results_grp = xr.open_dataset( obs_file, group='PRODUCT/SUPPORT_DATA/DETAILED_RESULTS') ak = np.float64( detailed_results_grp.variables['column_averaging_kernel'] .values[mask_all, :] ) # Pressure levels # particular case of TROPOM CH4: create pressure grid from psurf and delta input_data_grp = xr.open_dataset( obs_file, group='PRODUCT/SUPPORT_DATA/INPUT_DATA') psurf = input_data_grp.variables['surface_pressure'].values[mask_all] deltap = input_data_grp.variables['pressure_interval'].values[mask_all] pavg0 = np.array([psurf - deltap * lev for lev in range(self.nlayers + 1)]) pavg = np.float64(np.transpose(pavg0))/100 # prior profile in molecules_percm2 qa0 = np.float64( input_data_grp.variables['methane_profile_apriori'].values[mask_all, :] ) conv = input_data_grp.variables[ 'methane_profile_apriori' ].attrs['multiplication_factor_to_convert_to_molecules_percm2'] qa0 *= conv # information specific to TROPOMI # dry air subcolumns - required if not provided by the CTM (to update if O dvair = np.float64( input_data_grp.variables['dry_air_subcolumns'].values[mask_all, :] ) conv = input_data_grp.variables[ 'dry_air_subcolumns' ].attrs['multiplication_factor_to_convert_to_molecules_percm2'] dvair *= conv # Other parameters for detailed analysis (not necessary for basic usage !) swir_albedo = np.atleast_1d(np.float64(detailed_results_grp.variables['surface_albedo_SWIR'].values[mask_all])) nir_albedo = np.atleast_1d(np.float64(detailed_results_grp.variables['surface_albedo_NIR'].values[mask_all])) # blended_albedo = 2.4*nir_albedo - 1.13*swir_albedo fluorescence = np.atleast_1d(np.float64(detailed_results_grp.variables['fluorescence'].values[mask_all])) swir_aot = np.atleast_1d(np.float64(detailed_results_grp.variables['aerosol_optical_thickness_SWIR'].values[mask_all])) nir_aot = np.atleast_1d(np.float64(detailed_results_grp.variables['aerosol_optical_thickness_NIR'].values[mask_all])) aero_size = np.atleast_1d(np.float64(detailed_results_grp.variables['aerosol_size'].values[mask_all])) surf_alti = np.atleast_1d(np.float64(input_data_grp.variables['surface_altitude'].values[mask_all])) surf_rough = np.atleast_1d(np.float64(input_data_grp.variables['surface_altitude_precision'].values[mask_all])) ds = xr.Dataset({'qa0': (['index', 'level'], qa0), 'ak': (['index', 'level'], ak), 'pavg0': (['index', 'level_pressure'], pavg), 'dryair': (['index', 'level'], dvair), 'nir_albedo':(['index'], nir_albedo), 'swir_albedo':(['index'], swir_albedo), 'aerosol_size':(['index'], aero_size), 'fluorescence':(['index'], fluorescence), 'swir_aot':(['index'], swir_aot), 'nir_aot':(['index'], nir_aot), 'surf_altitude':(['index'], surf_alti), 'surf_roughness':(['index'], surf_rough) }, coords={'index': subdata.index, 'level': range(self.nlayers), 'level_pressure': range(self.nlayers + 1)}) subdata = subdata.to_xarray() subdata = xr.merge([ds, subdata]) debug(f"Observations after filter: {len(subdata.index)}") return subdata