from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from netCDF4 import Dataset
from ....utils.datastores.empty import init_empty
[docs]
def do_parse(
self,
obs_file,
**kwargs
):
"""Parse function for a file from template observations
Args:
obs_file (str) :
Path to input file
Returns:
pandas.DataFrame :
Dataframe from input file df[parameter][station]
"""
info("Parsing CO2M pseudo data"
"from observation file: {}".format(obs_file))
ddi, ddf = np.datetime64(self.datei), np.datetime64(self.datef)
ref_year_delay = ddi - np.datetime64(datetime.datetime(self.ref_year, self.datei.month, self.datei.day))
product_data = xr.open_dataset(obs_file, group='data/observation_data', decode_times=False)
product_data["time"].attrs['units'] = 'seconds since 2020-01-01 00:00:00.000'
product_data = xr.decode_cf(product_data, decode_times=True)
obs_dates = product_data["time"].astype('datetime64[ms]') \
+ product_data["delta_time"].astype('timedelta64[ms]') \
+ ref_year_delay.astype('timedelta64[ms]')
obs_dates = obs_dates.values
self.data_date_max = obs_dates.max()
if ddi > obs_dates.max() or ddf < obs_dates.min():
debug(f"File {obs_file} with date {obs_dates.min()} "
f"is outside simulation time period {ddi} / {ddf}")
return init_empty()
# Filter out data outside domain
location_data = xr.open_dataset(obs_file, group='data/geolocation_data_dem', decode_times=False)
longitude = location_data.variables['longitude'].values
latitude = location_data.variables['latitude'].values
mask_domain = np.ones_like(longitude, dtype=bool)
if hasattr(self, "crop_domain"):
crop_domain = self.crop_domain
mask_domain = ((longitude < crop_domain.xmax) &
(longitude > crop_domain.xmin) &
(latitude < crop_domain.ymax) &
(latitude > crop_domain.ymin))
if mask_domain.sum() == 0:
return init_empty()
# Check number of levels
nlevread = product_data.dims["layer"]+1
if nlevread != self.nlayers:
raise Exception(
f"The file {obs_file} does not have the expected number of "
f"averaging kernel layers: {nlevread} instead of "
f"{self.nlayers}. \n"
f"Please check your files or change the parameter 'nlayers' "
f"in the Yaml (expert users only).")
# Valid data
scanline = product_data.dims["scanline"]
ground_pixel = product_data.dims["ground_pixel"]
#time = product_data.dims["time"]
obs = product_data.variables['xco2'].values #"mumol/mol"
#mask_valid = product_data.variables['xco2_quality_flag'].values >= self.qa_value
mask_valid = ~np.isnan(obs)
if (mask_domain & mask_valid).sum() == 0:
return init_empty()
# Filter out data not in simulation window
mask_time = (obs_dates >= ddi) & (obs_dates <= ddf)
mask_all = mask_time[..., np.newaxis] & mask_valid & mask_domain
if mask_all.sum() == 0:
return init_empty()
else :
info(" VALUES!!!! ")
# Now process actual data
debug(f"Data found in file {obs_file}")
list_basic_cols = [
'date', 'duration', 'station', 'network', 'parameter',
'lon', 'lat', 'obs', 'obserror']
subdata = pd.DataFrame(columns=list_basic_cols)
subdata['date'] = np.repeat(obs_dates[...,np.newaxis],ground_pixel,axis=1)[mask_all]
subdata = subdata.assign(duration=self.obs_duration)
subdata['lon'] = longitude[mask_all]
subdata['lat'] = latitude[mask_all]
subdata = subdata.assign(station=self.stationID)
subdata = subdata.assign(network=self.networkID)
subdata = subdata.assign(parameter=self.parameter)
subdata["obs"] = obs[mask_all].astype(float)*1000.
if self.err_sys :
obs_sys_error = product_data.variables[
'xco2_Systematic_Error'].values.astype(float)*1000.
obs_precision = product_data.variables[
'xco2_precision'].values.astype(float)*1000.
model_err = self.err_model * 1000.
if self.err_sys :
# subdata["obserror"] = np.sqrt(obs_sys_error[mask_all]**2+obs_precision[mask_all]**2+model_err**2)
subdata["obserror"] = obs_sys_error[mask_all]
else:
subdata["obserror"] = np.sqrt(obs_precision[mask_all]**2+model_err**2)
# Load aks and obs
ak = np.float64(product_data.variables['xco2_averaging_kernel']
.values[mask_all, :]
)
# Pressure levels
# product_data.variables['pressure_level']
#ancillary_data = xr.open_dataset(obs_file, group="ancillary_algorithm1")
psurf = product_data.variables['Surface_pressure'].values[mask_all] #Pa
if self.pressure_file :
with Dataset(self.pressure_file) as f:
psurf = f.variables['Psurf'][:]
ai = f.variables['a_vcoord'][:]
bi = f.variables['b_vcoord'][:]
ai = ai.astype(np.float32)
bi = bi.astype(np.float32)
n_layers = (ai.shape)[0]-1
pavg0 = ai*1.E+5+bi*psurf[:,np.newaxis]
else :
pw = product_data.variables['pressure_weight'].values[mask_all,:]
pressure = [1., 5./6, 4./6, 3./6, 2./6, 1./6, 0.]
pavg0 = psurf[:,np.newaxis]*pressure
n_layers = self.nlayers
ak = np.ones((psurf.shape[0],n_layers))
#pavg = np.concatenate((psurf[np.newaxis,:],plev),axis=0)
#pavg = pavg[:,mask_all].T
#pavg0 = np.concatenate((psurf[:,np.newaxis],plev),axis=1)
#pavg0 = plev
# prior profile in molecules_percm2
#qa0 = np.float64(product_data.variables['co2_profile_apriori'].values[mask_all, :])*1000.
ds = xr.Dataset({ #'pw': (['index', 'level'], pw),
'ak': (['index', 'level'], ak),
'pavg0': (['index', 'level_pressure'], pavg0)},
# 'obs_precision' :(['index'], obs_precision),
# 'obs_sys_error':(['index'], obs_sys_error)},
coords={'index': subdata.index,
'level': range(n_layers),
'level_pressure': range(n_layers+1)})
subdata = subdata.to_xarray()
subdata = xr.merge([ds, subdata])
debug(f"Observations after filter: {len(subdata.index)}")
return subdata