from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from ....utils.datastores.empty import init_empty
[docs]
def do_parse(
self,
obs_file,
**kwargs
):
"""Parse function for a file from template observations
Args:
obs_file (str) :
Path to input file
Returns:
pandas.DataFrame :
Dataframe from input file df[parameter][station]
"""
info("Parsing Blended GOSAT+TROPOMI data from observation file: {}".format(obs_file))
ddi, ddf = self.datei, self.datef
# year = int(obs_file[83:87])
# month = int(obs_file[87:89])
# if (year < 2019) or (year==2019 and month<12):
# debug(f"File {obs_file} is < 12-2019")
# return init_empty()
product_data = xr.open_dataset(obs_file)
obs_dates = product_data["time_utc"].values.astype(np.datetime64).astype(datetime.datetime)
if len(obs_dates) == 0:
debug(f"File {obs_file} has 0 obs")
return init_empty()
if ddi > obs_dates.max() or ddf < obs_dates.min():
debug(f"File {obs_file} with date {obs_dates.min()} "
f"is outside simulation time period {ddi} / {ddf}")
return init_empty()
data_day = datetime.datetime(year=obs_dates.min().year, month=obs_dates.min().month, day=obs_dates.min().day)
nobs = product_data.dims["nobs"]
# Filter out data outside domain
mask_domain = np.ones((nobs), bool)
longitude = product_data.variables['longitude'].values
latitude = product_data.variables['latitude'].values
if hasattr(self, "crop_domain"):
crop_domain = self.crop_domain
mask_domain = ((longitude < crop_domain.xmax) &
(longitude > crop_domain.xmin) &
(latitude < crop_domain.ymax) &
(latitude > crop_domain.ymin))
if mask_domain.sum() == 0:
return init_empty()
# Valid data
obs = product_data.variables['methane_mixing_ratio_blended'].values
mask_valid = product_data.variables['qa_value'].values >= self.qa_value
mask_valid = mask_valid & ~np.isnan(obs)
if (mask_valid).sum() == 0:
return init_empty()
# Filter out data not in simulation window
mask_time = (obs_dates >= ddi) & (obs_dates <= ddf)
# Filter out coastal scenes
if self.filter_coastal_scene:
chi2_SWIR = product_data.variables['chi_square_SWIR'].values.astype(float)
surface_classification = product_data.variables['surface_classification'].values.astype(float)
mask_coasts = ~((surface_classification == 3) | ((surface_classification == 2) & (chi2_SWIR >= 20000)))
# Filter blended_albedo
blended_albedo = 2.4*np.float64(product_data.variables['surface_albedo_NIR'].values) - 1.13*np.float64(product_data.variables['surface_albedo_SWIR'].values)
mask_albedo = blended_albedo < 0.8
# Aggregate masks
# mask_all = mask_valid & mask_domain & mask_time & mask_albedo
mask_all = mask_valid & mask_domain & mask_time
if self.filter_coastal_scene:
mask_all = mask_all & mask_coasts
if mask_all.sum() <= 1:
return init_empty()
# Now process actual data
debug(f"Data found in file {obs_file}")
list_basic_cols = [
'date', 'duration', 'station', 'network', 'parameter',
'lon', 'lat', 'obs', 'obserror']
subdata = pd.DataFrame(columns=list_basic_cols)
subdata['date'] = obs_dates[mask_all]
subdata = subdata.assign(duration=self.obs_duration)
subdata['lon'] = longitude[mask_all]
subdata['lat'] = latitude[mask_all]
subdata = subdata.assign(station=self.stationID)
subdata = subdata.assign(network=self.networkID)
subdata = subdata.assign(parameter=self.parameter)
subdata["obs"] = np.float64(obs[mask_all])
subdata['obserror'] = 2 * product_data.variables['methane_mixing_ratio_precision'].values[mask_all].astype(float) # Factor 2 to apply to SRON obserror !
subdata["chi_square_SWIR"] = product_data.variables[
'chi_square_SWIR'].values[mask_all].astype(float)
subdata["surface_classification"] = product_data.variables[
'surface_classification'].values[mask_all].astype(float)
# Load aks and obs
nlevread = product_data.dims["layer"]
if nlevread != self.nlayers:
raise Exception(f"The file {obs_file} does not have the expected number of "
f"averaging kernel layers: {nlevread} instead of "
f"{self.nlayers}. \n"
f"Please check your files or change the parameter 'nlayers' "
f"in the Yaml (expert users only).")
ak = np.float64(product_data.variables['column_averaging_kernel'][mask_all, :].values)
# Pressure levels
# particular case of TROPOM CH4: create pressure grid from psurf and delta
psurf = np.float64(product_data.variables['surface_pressure'][mask_all].values)
deltap = np.float64(product_data.variables['pressure_interval'][mask_all].values)
pavg0 = np.float64(np.array([psurf - deltap * lev for lev in range(self.nlayers + 1)]))
pavg = np.float64(np.transpose(pavg0))
# prior profile in molecules_percm2
qa0 = np.float64(product_data.variables['methane_profile_apriori'][mask_all, :].values)
conv = product_data.variables['methane_profile_apriori'].attrs['multiplication_factor_to_convert_to_molecules_percm2']
qa0 *= conv
# information specific to TROPOMI
# dry air subcolumns - required if not provided by the CTM (to update if O
dvair = np.float64(product_data.variables['dry_air_subcolumns'][mask_all, :].values)
conv = product_data.variables['dry_air_subcolumns'].attrs['multiplication_factor_to_convert_to_molecules_percm2']
dvair *= conv
# SWIR, NIR and Blended albedo
swir_albedo = np.float64(product_data.variables['surface_albedo_SWIR'][mask_all].values)
nir_albedo = np.float64(product_data.variables['surface_albedo_NIR'][mask_all].values)
blended_albedo = 2.4*nir_albedo - 1.13*swir_albedo
# aerosol size
aero_size = np.float64(product_data.variables['aerosol_size'][mask_all].values)
ds = xr.Dataset({'qa0': (['index', 'level'], qa0),
'ak': (['index', 'level'], ak),
'pavg0': (['index', 'level_pressure'], pavg),
'dryair': (['index', 'level'], dvair),
'nir_albedo':(['index'], nir_albedo),
'swir_albedo':(['index'], swir_albedo),
'blended_albedo':(['index'], blended_albedo),
'aerosol_size':(['index'], aero_size)},
coords={'index': subdata.index,
'level': range(self.nlayers),
'level_pressure': range(self.nlayers + 1)})
subdata = subdata.to_xarray()
subdata = xr.merge([ds, subdata])
debug(f"Observations after filter: {len(subdata.index)}")
# # Dump information and return pandas 1D values only
# dir_monitors = "{}/obs/{}/".format(self.workdir, self.parameter)
# subdata.to_netcdf(
# f"{dir_monitors}/monitor_{self.stationID}_{self.networkID}"
# f"_{self.parameter}_{data_day.strftime('%Y%m%d%H%M')}.nc"
# )
# subdata = xr.Dataset(
# {var: subdata[var] for var in subdata.variables
# if subdata[var].dims == ("index",)}).to_pandas()
return subdata