Source code for pycif.plugins.obsparsers.CO2M.parse

from logging import info, debug
import xarray as xr
import pandas as pd
import numpy as np
import datetime
import netCDF4 as nc
from netCDF4 import Dataset
from ....utils.datastores.empty import init_empty


[docs] def do_parse( self, obs_file, **kwargs ): """Parse function for a file from template observations Args: obs_file (str) : Path to input file Returns: pandas.DataFrame : Dataframe from input file df[parameter][station] """ info("Parsing CO2M pseudo data" "from observation file: {}".format(obs_file)) ddi, ddf = np.datetime64(self.datei), np.datetime64(self.datef) ref_year_delay = ddi - np.datetime64(datetime.datetime(self.ref_year, self.datei.month, self.datei.day)) product_data = xr.open_dataset(obs_file, group='data/observation_data', decode_times=False) product_data["time"].attrs['units'] = 'seconds since 2020-01-01 00:00:00.000' product_data = xr.decode_cf(product_data, decode_times=True) obs_dates = product_data["time"].astype('datetime64[ms]') \ + product_data["delta_time"].astype('timedelta64[ms]') \ + ref_year_delay.astype('timedelta64[ms]') obs_dates = obs_dates.values self.data_date_max = obs_dates.max() if ddi > obs_dates.max() or ddf < obs_dates.min(): debug(f"File {obs_file} with date {obs_dates.min()} " f"is outside simulation time period {ddi} / {ddf}") return init_empty() # Filter out data outside domain location_data = xr.open_dataset(obs_file, group='data/geolocation_data_dem', decode_times=False) longitude = location_data.variables['longitude'].values latitude = location_data.variables['latitude'].values mask_domain = np.ones_like(longitude, dtype=bool) if hasattr(self, "crop_domain"): crop_domain = self.crop_domain mask_domain = ((longitude < crop_domain.xmax) & (longitude > crop_domain.xmin) & (latitude < crop_domain.ymax) & (latitude > crop_domain.ymin)) if mask_domain.sum() == 0: return init_empty() # Check number of levels nlevread = product_data.dims["layer"]+1 if nlevread != self.nlayers: raise Exception( f"The file {obs_file} does not have the expected number of " f"averaging kernel layers: {nlevread} instead of " f"{self.nlayers}. \n" f"Please check your files or change the parameter 'nlayers' " f"in the Yaml (expert users only).") # Valid data scanline = product_data.dims["scanline"] ground_pixel = product_data.dims["ground_pixel"] #time = product_data.dims["time"] obs = product_data.variables['xco2'].values #"mumol/mol" #mask_valid = product_data.variables['xco2_quality_flag'].values >= self.qa_value mask_valid = ~np.isnan(obs) if (mask_domain & mask_valid).sum() == 0: return init_empty() # Filter out data not in simulation window mask_time = (obs_dates >= ddi) & (obs_dates <= ddf) mask_all = mask_time[..., np.newaxis] & mask_valid & mask_domain if mask_all.sum() == 0: return init_empty() else : info(" VALUES!!!! ") # Now process actual data debug(f"Data found in file {obs_file}") list_basic_cols = [ 'date', 'duration', 'station', 'network', 'parameter', 'lon', 'lat', 'obs', 'obserror'] subdata = pd.DataFrame(columns=list_basic_cols) subdata['date'] = np.repeat(obs_dates[...,np.newaxis],ground_pixel,axis=1)[mask_all] subdata = subdata.assign(duration=self.obs_duration) subdata['lon'] = longitude[mask_all] subdata['lat'] = latitude[mask_all] subdata = subdata.assign(station=self.stationID) subdata = subdata.assign(network=self.networkID) subdata = subdata.assign(parameter=self.parameter) subdata["obs"] = obs[mask_all].astype(float)*1000. if self.err_sys : obs_sys_error = product_data.variables[ 'xco2_Systematic_Error'].values.astype(float)*1000. obs_precision = product_data.variables[ 'xco2_precision'].values.astype(float)*1000. model_err = self.err_model * 1000. if self.err_sys : # subdata["obserror"] = np.sqrt(obs_sys_error[mask_all]**2+obs_precision[mask_all]**2+model_err**2) subdata["obserror"] = obs_sys_error[mask_all] else: subdata["obserror"] = np.sqrt(obs_precision[mask_all]**2+model_err**2) # Load aks and obs ak = np.float64(product_data.variables['xco2_averaging_kernel'] .values[mask_all, :] ) # Pressure levels # product_data.variables['pressure_level'] #ancillary_data = xr.open_dataset(obs_file, group="ancillary_algorithm1") psurf = product_data.variables['Surface_pressure'].values[mask_all] #Pa if self.pressure_file : with Dataset(self.pressure_file) as f: psurf = f.variables['Psurf'][:] ai = f.variables['a_vcoord'][:] bi = f.variables['b_vcoord'][:] ai = ai.astype(np.float32) bi = bi.astype(np.float32) n_layers = (ai.shape)[0]-1 pavg0 = ai*1.E+5+bi*psurf[:,np.newaxis] else : pw = product_data.variables['pressure_weight'].values[mask_all,:] pressure = [1., 5./6, 4./6, 3./6, 2./6, 1./6, 0.] pavg0 = psurf[:,np.newaxis]*pressure n_layers = self.nlayers ak = np.ones((psurf.shape[0],n_layers)) #pavg = np.concatenate((psurf[np.newaxis,:],plev),axis=0) #pavg = pavg[:,mask_all].T #pavg0 = np.concatenate((psurf[:,np.newaxis],plev),axis=1) #pavg0 = plev # prior profile in molecules_percm2 #qa0 = np.float64(product_data.variables['co2_profile_apriori'].values[mask_all, :])*1000. ds = xr.Dataset({ #'pw': (['index', 'level'], pw), 'ak': (['index', 'level'], ak), 'pavg0': (['index', 'level_pressure'], pavg0)}, # 'obs_precision' :(['index'], obs_precision), # 'obs_sys_error':(['index'], obs_sys_error)}, coords={'index': subdata.index, 'level': range(n_layers), 'level_pressure': range(n_layers+1)}) subdata = subdata.to_xarray() subdata = xr.merge([ds, subdata]) debug(f"Observations after filter: {len(subdata.index)}") return subdata