Source code for pycif.plugins.obsparsers.tropomi_sron.parse

from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from ....utils.datastores.empty import init_empty


[docs] def do_parse( self, obs_file, **kwargs ): """Parse function for a file from template observations Args: obs_file (str) : Path to input file Returns: pandas.DataFrame : Dataframe from input file df[parameter][station] """ info("Parsing SRON data from observation file: {}".format(obs_file)) ddi, ddf = self.datei, self.datef instrument_data = xr.open_dataset(obs_file, group="instrument") l1b_dates = instrument_data["l1b_file"].str.decode( "ascii" ).data.tolist().strip().split("_")[5:7] l1b_dates = [datetime.datetime.strptime(d, "%Y%m%dT%H%M%S") for d in l1b_dates] self.l1b_dates = l1b_dates if ddi > l1b_dates[1] or ddf < l1b_dates[0]: debug(f"File {obs_file} with date {l1b_dates[1]} " f"is outside simulation time period {ddi} / {ddf}") return init_empty() nobs = instrument_data.dims["nobs"] # Filter out data outside domain mask_domain = np.ones((nobs), bool) longitude = instrument_data.variables['longitude_center'] latitude = instrument_data.variables['latitude_center'] if hasattr(self, "crop_domain"): crop_domain = self.crop_domain mask_domain = ((longitude < crop_domain.xmax) & (longitude > crop_domain.xmin) & (latitude < crop_domain.ymax) & (latitude > crop_domain.ymin)) if mask_domain.sum() == 0: return init_empty() # Valid data mask_valid = instrument_data.variables['pixel_id'] >= 0 mask_valid = mask_valid & ~nc.Dataset( obs_file, 'r').groups['target_product']["xch4_corrected"][:].mask if (mask_domain & mask_valid).sum() == 0: return init_empty() # Filter out data not in simulation window time = instrument_data.variables['time'].values time_valid = time[mask_valid & mask_domain] df_for_time = pd.DataFrame() df_for_time['year'] = time_valid[:, 0] df_for_time['month'] = time_valid[:, 1] df_for_time['day'] = time_valid[:, 2] df_for_time['hour'] = time_valid[:, 3] df_for_time['minute'] = time_valid[:, 4] df_for_time['second'] = time_valid[:, 5] df_for_time['millis'] = time_valid[:, 6] df_for_time['date'] = pd.to_datetime( df_for_time[['year', 'month', 'day', 'hour', 'minute', 'second']]) df_for_time.loc[pd.isnull(df_for_time['date']), "date"] = \ datetime.datetime(year=1870, month=1, day=1) mask_time = (df_for_time['date'] >= ddi) & (df_for_time['date'] <= ddf) mask_all = mask_valid & mask_domain mask_all[mask_all] = mask_time.values # if mask_all.sum() == 0: # return init_empty() if mask_all.sum() <= 1: return init_empty() # Now process actual data debug(f"Data found in file {obs_file}") list_basic_cols = [ 'date', 'duration', 'station', 'network', 'parameter', 'lon', 'lat', 'obs', 'obserror'] subdata = pd.DataFrame(columns=list_basic_cols) subdata['date'] = df_for_time.loc[mask_time, 'date'] subdata = subdata.assign(duration=self.obs_duration) subdata['lon'] = longitude.data[mask_all] subdata['lat'] = latitude.data[mask_all] subdata = subdata.assign(station=self.stationID) subdata = subdata.assign(network=self.networkID) subdata = subdata.assign(parameter=self.parameter) # Load aks and obs target_grp = xr.open_dataset(obs_file, group='target_product') nlevread = target_grp.dims["nlayer"] if nlevread != self.nlayers: raise Exception(f"The file {obs_file} does not have the expected number of " f"averaging kernel layers: {nlevread} instead of " f"{self.nlayers}. \n" f"Please check your files or change the parameter 'nlayers' " f"in the Yaml (expert users only).") subdata["obs"] = np.float64(target_grp.variables['xch4_corrected'][mask_all].values) subdata['obserror'] = 2 * target_grp.variables['xch4_precision'][mask_all].values # Factor 2 to apply to SRON obserror ! ak = np.float64(target_grp.variables['xch4_column_averaging_kernel'][mask_all, :].values) # Pressure levels # particular case of TROPOM CH4: create pressure grid from psurf and delta meteo_grp = xr.open_dataset(obs_file, group='meteo') psurf = meteo_grp.variables['surface_pressure'][mask_all].values deltap = meteo_grp.variables['dp'][mask_all].values pavg0 = np.array([psurf - deltap * lev for lev in range(self.nlayers + 1)]) pavg = np.float64(np.transpose(pavg0)) # prior profile in molecules_percm2 qa0 = np.float64(target_grp.variables['ch4_profile_apriori'][mask_all, :].values) # information specific to TROPOMI # dry air subcolumns - required if not provided by the CTM (to update if O dvair = np.float64(meteo_grp.variables['dry_air_subcolumns'][mask_all, :].values) # SWIR, NIR and Blended albedo sideprod_grp = xr.open_dataset(obs_file, group='side_product') swir_albedo = np.float64(sideprod_grp.variables['surface_albedo'][mask_all, 1].values) nir_albedo = np.float64(sideprod_grp.variables['surface_albedo'][mask_all, 0].values) blended_albedo = 2.4*nir_albedo - 1.13*swir_albedo # Other parameters for detailed analysis (not necessary for basic usage !) fluorescence = np.float64(sideprod_grp.variables['fluorescence'][mask_all].values) swir_aot = np.float64(sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 1]) nir_aot = np.float64(sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 0]) a_priori = np.float64(target_grp.variables['xch4_apriori'][mask_all].values) surf_alti = np.float64(meteo_grp.variables['surface_altitude'][mask_all].values) surf_rough = np.float64(meteo_grp.variables['surface_altitude_stdv'][mask_all].values) across_track_pix_idx = instrument_data.variables['ground_pixel'][mask_all].values # aerosol size aero_size = np.float64(sideprod_grp.variables['aerosol_size'][mask_all].values) try: # Merge infos into xarray # ds = xr.Dataset({'qa0': (['index', 'level'], qa0), # 'ak': (['index', 'level'], ak), # 'pavg0': (['index', 'level_pressure'], pavg), # 'dryair': (['index', 'level'], dvair), # 'nir_albedo':(['index'], nir_albedo), # 'swir_albedo':(['index'], swir_albedo), # 'blended_albedo':(['index'], blended_albedo), # 'aerosol_size':(['index'], aero_size)}, # coords={'index': subdata.index, # 'level': range(self.nlayers), # 'level_pressure': range(self.nlayers + 1)}) # With other parameters ds = xr.Dataset({'qa0': (['index', 'level'], qa0), 'ak': (['index', 'level'], ak), 'pavg0': (['index', 'level_pressure'], pavg), 'dryair': (['index', 'level'], dvair), 'nir_albedo':(['index'], nir_albedo), 'swir_albedo':(['index'], swir_albedo), 'blended_albedo':(['index'], blended_albedo), 'aerosol_size':(['index'], aero_size), 'fluorescence':(['index'], fluorescence), 'swir_aot':(['index'], swir_aot), 'nir_aot':(['index'], nir_aot), 'xch4_apriori':(['index'], a_priori), 'surf_altitude':(['index'], surf_alti), 'surf_roughness':(['index'], surf_rough), 'across_track_pixid':(['index'], across_track_pix_idx), }, coords={'index': subdata.index, 'level': range(self.nlayers), 'level_pressure': range(self.nlayers + 1)}) except: debug("Variables when error !!") debug(np.count_nonzero(mask_valid)) debug(np.count_nonzero(mask_domain)) debug(np.count_nonzero(mask_time)) debug(np.count_nonzero(mask_all)) subdata = subdata.to_xarray() subdata = xr.merge([ds, subdata]) # filtering according to information provided with data debug(f"Observations before filter: {len(subdata.index)}") subfilter = pd.DataFrame( columns=['cf', 'ws', 'std', 'aot', 'chi2', 'sza', 'prec', 'sr', 'sa', 'qa', 'balb']) cloud_frac = meteo_grp.variables['cloud_fraction'][mask_all, :] subfilter['cf'] = np.max(cloud_frac, axis=1) w_to_s = meteo_grp.variables['weak_ch4_column'][mask_all]\ / meteo_grp.variables['strong_ch4_column'][mask_all] subfilter['ws'] = w_to_s subfilter['std'] = meteo_grp['stdv_ch4_ratio'][mask_all] subfilter['aot'] = sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 0] # subfilter['aot'] = sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 1] diag_grp = xr.open_dataset(obs_file, group='diagnostics') subfilter['chi2'] = diag_grp['chi_squared'][mask_all] subfilter['sza'] = instrument_data['solar_zenith_angle'][mask_all] subfilter['prec'] = target_grp['xch4_precision'][mask_all] subfilter['sr'] = meteo_grp.variables['surface_altitude_stdv'][mask_all] subfilter['sa'] = sideprod_grp.variables['surface_albedo'][mask_all, 1] subfilter['qa'] = diag_grp.variables['qa_value'][mask_all] subfilter['balb'] = blended_albedo mask_filter = ( (subfilter['qa'] == self.qa_value) & (subfilter['cf'] < self.cloud_fraction_max) & (subfilter['ws'] > self.ws_min) & (subfilter['ws'] < self.ws_max) & (subfilter['std'] < self.std_max) & (subfilter['aot'] < self.aot_max) & (subfilter['chi2'] < self.chi2_max) & (subfilter['sza'] < self.sza_max) & (subfilter['prec'] < self.prec_max) & (subfilter['sr'] < self.sr_max) & (subfilter['sa'] > self.sa_min) & (subfilter['balb'] < self.balb_max) ) #Viewing zenith angle<60 ? AOT SWIR <0.1 (here AOT NIR <0.3) ? SNR >50 subdata = subdata.isel(index=mask_filter) debug(f"Observations after filter: {len(subdata.index)}") return subdata