from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from ....utils.datastores.empty import init_empty
[docs]
def do_parse(
self,
obs_file,
**kwargs
):
"""Parse function for a file from template observations
Args:
obs_file (str) :
Path to input file
Returns:
pandas.DataFrame :
Dataframe from input file df[parameter][station]
"""
info("Parsing SRON data from observation file: {}".format(obs_file))
ddi, ddf = self.datei, self.datef
instrument_data = xr.open_dataset(obs_file, group="instrument")
l1b_dates = instrument_data["l1b_file"].str.decode(
"ascii"
).data.tolist().strip().split("_")[5:7]
l1b_dates = [datetime.datetime.strptime(d, "%Y%m%dT%H%M%S")
for d in l1b_dates]
self.l1b_dates = l1b_dates
if ddi > l1b_dates[1] or ddf < l1b_dates[0]:
debug(f"File {obs_file} with date {l1b_dates[1]} "
f"is outside simulation time period {ddi} / {ddf}")
return init_empty()
nobs = instrument_data.dims["nobs"]
# Filter out data outside domain
mask_domain = np.ones((nobs), bool)
longitude = instrument_data.variables['longitude_center']
latitude = instrument_data.variables['latitude_center']
if hasattr(self, "crop_domain"):
crop_domain = self.crop_domain
mask_domain = ((longitude < crop_domain.xmax) &
(longitude > crop_domain.xmin) &
(latitude < crop_domain.ymax) &
(latitude > crop_domain.ymin))
if mask_domain.sum() == 0:
return init_empty()
# Valid data
mask_valid = instrument_data.variables['pixel_id'] >= 0
mask_valid = mask_valid & ~nc.Dataset(
obs_file, 'r').groups['target_product']["xch4_corrected"][:].mask
if (mask_domain & mask_valid).sum() == 0:
return init_empty()
# Filter out data not in simulation window
time = instrument_data.variables['time'].values
time_valid = time[mask_valid & mask_domain]
df_for_time = pd.DataFrame()
df_for_time['year'] = time_valid[:, 0]
df_for_time['month'] = time_valid[:, 1]
df_for_time['day'] = time_valid[:, 2]
df_for_time['hour'] = time_valid[:, 3]
df_for_time['minute'] = time_valid[:, 4]
df_for_time['second'] = time_valid[:, 5]
df_for_time['millis'] = time_valid[:, 6]
df_for_time['date'] = pd.to_datetime(
df_for_time[['year', 'month', 'day', 'hour', 'minute', 'second']])
df_for_time.loc[pd.isnull(df_for_time['date']), "date"] = \
datetime.datetime(year=1870, month=1, day=1)
mask_time = (df_for_time['date'] >= ddi) & (df_for_time['date'] <= ddf)
mask_all = mask_valid & mask_domain
mask_all[mask_all] = mask_time.values
# if mask_all.sum() == 0:
# return init_empty()
if mask_all.sum() <= 1:
return init_empty()
# Now process actual data
debug(f"Data found in file {obs_file}")
list_basic_cols = [
'date', 'duration', 'station', 'network', 'parameter',
'lon', 'lat', 'obs', 'obserror']
subdata = pd.DataFrame(columns=list_basic_cols)
subdata['date'] = df_for_time.loc[mask_time, 'date']
subdata = subdata.assign(duration=self.obs_duration)
subdata['lon'] = longitude.data[mask_all]
subdata['lat'] = latitude.data[mask_all]
subdata = subdata.assign(station=self.stationID)
subdata = subdata.assign(network=self.networkID)
subdata = subdata.assign(parameter=self.parameter)
# Load aks and obs
target_grp = xr.open_dataset(obs_file, group='target_product')
nlevread = target_grp.dims["nlayer"]
if nlevread != self.nlayers:
raise Exception(f"The file {obs_file} does not have the expected number of "
f"averaging kernel layers: {nlevread} instead of "
f"{self.nlayers}. \n"
f"Please check your files or change the parameter 'nlayers' "
f"in the Yaml (expert users only).")
subdata["obs"] = np.float64(target_grp.variables['xch4_corrected'][mask_all].values)
subdata['obserror'] = 2 * target_grp.variables['xch4_precision'][mask_all].values # Factor 2 to apply to SRON obserror !
ak = np.float64(target_grp.variables['xch4_column_averaging_kernel'][mask_all, :].values)
# Pressure levels
# particular case of TROPOM CH4: create pressure grid from psurf and delta
meteo_grp = xr.open_dataset(obs_file, group='meteo')
psurf = meteo_grp.variables['surface_pressure'][mask_all].values
deltap = meteo_grp.variables['dp'][mask_all].values
pavg0 = np.array([psurf - deltap * lev for lev in range(self.nlayers + 1)])
pavg = np.float64(np.transpose(pavg0))
# prior profile in molecules_percm2
qa0 = np.float64(target_grp.variables['ch4_profile_apriori'][mask_all, :].values)
# information specific to TROPOMI
# dry air subcolumns - required if not provided by the CTM (to update if O
dvair = np.float64(meteo_grp.variables['dry_air_subcolumns'][mask_all, :].values)
# SWIR, NIR and Blended albedo
sideprod_grp = xr.open_dataset(obs_file, group='side_product')
swir_albedo = np.float64(sideprod_grp.variables['surface_albedo'][mask_all, 1].values)
nir_albedo = np.float64(sideprod_grp.variables['surface_albedo'][mask_all, 0].values)
blended_albedo = 2.4*nir_albedo - 1.13*swir_albedo
# Other parameters for detailed analysis (not necessary for basic usage !)
fluorescence = np.float64(sideprod_grp.variables['fluorescence'][mask_all].values)
swir_aot = np.float64(sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 1])
nir_aot = np.float64(sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 0])
a_priori = np.float64(target_grp.variables['xch4_apriori'][mask_all].values)
surf_alti = np.float64(meteo_grp.variables['surface_altitude'][mask_all].values)
surf_rough = np.float64(meteo_grp.variables['surface_altitude_stdv'][mask_all].values)
across_track_pix_idx = instrument_data.variables['ground_pixel'][mask_all].values
# aerosol size
aero_size = np.float64(sideprod_grp.variables['aerosol_size'][mask_all].values)
try:
# Merge infos into xarray
# ds = xr.Dataset({'qa0': (['index', 'level'], qa0),
# 'ak': (['index', 'level'], ak),
# 'pavg0': (['index', 'level_pressure'], pavg),
# 'dryair': (['index', 'level'], dvair),
# 'nir_albedo':(['index'], nir_albedo),
# 'swir_albedo':(['index'], swir_albedo),
# 'blended_albedo':(['index'], blended_albedo),
# 'aerosol_size':(['index'], aero_size)},
# coords={'index': subdata.index,
# 'level': range(self.nlayers),
# 'level_pressure': range(self.nlayers + 1)})
# With other parameters
ds = xr.Dataset({'qa0': (['index', 'level'], qa0),
'ak': (['index', 'level'], ak),
'pavg0': (['index', 'level_pressure'], pavg),
'dryair': (['index', 'level'], dvair),
'nir_albedo':(['index'], nir_albedo),
'swir_albedo':(['index'], swir_albedo),
'blended_albedo':(['index'], blended_albedo),
'aerosol_size':(['index'], aero_size),
'fluorescence':(['index'], fluorescence),
'swir_aot':(['index'], swir_aot),
'nir_aot':(['index'], nir_aot),
'xch4_apriori':(['index'], a_priori),
'surf_altitude':(['index'], surf_alti),
'surf_roughness':(['index'], surf_rough),
'across_track_pixid':(['index'], across_track_pix_idx),
},
coords={'index': subdata.index,
'level': range(self.nlayers),
'level_pressure': range(self.nlayers + 1)})
except:
debug("Variables when error !!")
debug(np.count_nonzero(mask_valid))
debug(np.count_nonzero(mask_domain))
debug(np.count_nonzero(mask_time))
debug(np.count_nonzero(mask_all))
subdata = subdata.to_xarray()
subdata = xr.merge([ds, subdata])
# filtering according to information provided with data
debug(f"Observations before filter: {len(subdata.index)}")
subfilter = pd.DataFrame(
columns=['cf', 'ws', 'std', 'aot', 'chi2',
'sza', 'prec', 'sr', 'sa', 'qa', 'balb'])
cloud_frac = meteo_grp.variables['cloud_fraction'][mask_all, :]
subfilter['cf'] = np.max(cloud_frac, axis=1)
w_to_s = meteo_grp.variables['weak_ch4_column'][mask_all]\
/ meteo_grp.variables['strong_ch4_column'][mask_all]
subfilter['ws'] = w_to_s
subfilter['std'] = meteo_grp['stdv_ch4_ratio'][mask_all]
subfilter['aot'] = sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 0]
# subfilter['aot'] = sideprod_grp.variables['aerosol_optical_thickness'][mask_all, 1]
diag_grp = xr.open_dataset(obs_file, group='diagnostics')
subfilter['chi2'] = diag_grp['chi_squared'][mask_all]
subfilter['sza'] = instrument_data['solar_zenith_angle'][mask_all]
subfilter['prec'] = target_grp['xch4_precision'][mask_all]
subfilter['sr'] = meteo_grp.variables['surface_altitude_stdv'][mask_all]
subfilter['sa'] = sideprod_grp.variables['surface_albedo'][mask_all, 1]
subfilter['qa'] = diag_grp.variables['qa_value'][mask_all]
subfilter['balb'] = blended_albedo
mask_filter = (
(subfilter['qa'] == self.qa_value)
& (subfilter['cf'] < self.cloud_fraction_max)
& (subfilter['ws'] > self.ws_min)
& (subfilter['ws'] < self.ws_max)
& (subfilter['std'] < self.std_max)
& (subfilter['aot'] < self.aot_max)
& (subfilter['chi2'] < self.chi2_max)
& (subfilter['sza'] < self.sza_max)
& (subfilter['prec'] < self.prec_max)
& (subfilter['sr'] < self.sr_max)
& (subfilter['sa'] > self.sa_min)
& (subfilter['balb'] < self.balb_max)
)
#Viewing zenith angle<60 ? AOT SWIR <0.1 (here AOT NIR <0.3) ? SNR >50
subdata = subdata.isel(index=mask_filter)
debug(f"Observations after filter: {len(subdata.index)}")
return subdata