Source code for pycif.plugins.obsparsers.CO2M.multiple_parse
from logging import info
import glob
import os
import copy
import pandas as pd
import numpy as np
import xarray as xr
from ....utils.datastores.empty import init_empty
from ....utils.check import errclass as error
[docs]
def parse_multiple_files(self, spec="", **kwargs):
    """Parse all observation files of ``self.dir_obs`` and merge them by
    aggregation period.

    Every path matching ``self.dir_obs + "*"`` is parsed with
    ``self.do_parse``. The parsed observations are grouped into the periods
    defined by ``pd.date_range(self.datei, self.datef,
    freq=self.aggreg_freq)``. Each period is written to its own NetCDF
    monitor file in ``<workdir>/obs/<parameter>/`` and then reduced to its
    variables defined on the ``index`` dimension only.

    Side effects: ``self.dir`` and ``self.file`` are overwritten so that they
    point to the monitor files written here.

    Args:
        self: the plugin with its describing arguments (reads ``dir_obs``,
            ``workdir``, ``parameter``, ``datei``, ``datef``,
            ``aggreg_freq``, ``stationID``, ``networkID``,
            ``data_date_max`` and calls ``do_parse``)
        spec: not used by this function; kept for interface compatibility
        **kwargs: forwarded unchanged to ``self.do_parse``

    Returns:
        The row-wise concatenation (``pd.concat(..., ignore_index=True)``)
        of the per-period tables, or the result of ``init_empty()`` when no
        period holds any data.
    """

    info("Reading files in " + self.dir_obs)

    dir_monitors = "{}/obs/{}/".format(self.workdir, self.parameter)

    # Aggregation intervals; the last bound only closes the final period,
    # hence the [:-1] when building the buckets
    aggreg_periods = pd.date_range(
        self.datei, self.datef, freq=self.aggreg_freq
    ).to_numpy()
    info(aggreg_periods)
    dfs = {ddi: None for ddi in aggreg_periods[:-1]}
    date_ref = np.datetime64(self.datei)
    dates2dump = copy.deepcopy(aggreg_periods[:-1])

    # Loop over files; the trailing '' entry gives the loop one extra pass
    # so that periods closed by the last real file can still be dumped
    list_files = sorted(glob.glob(self.dir_obs + "*")) + [""]
    file_date_min = date_ref
    for obs_file in list_files:
        # Dump intermediate monitor files for the aggregation periods that
        # start before the earliest date of the previously parsed file
        if file_date_min >= date_ref:
            date_ref = file_date_min
            for ddi in dates2dump[dates2dump < date_ref]:
                dates2dump = dates2dump[dates2dump != ddi]
                _dump_period(self, dfs, ddi, dir_monitors)

        # Stop if files after period of interest
        if dates2dump.size == 0:
            break

        # Parse the file itself
        try:
            df = self.do_parse(obs_file, **kwargs)
        except error.PluginError as e:
            info(f"{obs_file} was not loaded for the following reason")
            info(e)
            continue

        # Check level 1 dates: a file reaching beyond datef is discarded and
        # forces all remaining periods to be dumped on the next pass
        # NOTE(review): data_date_max is presumably updated by do_parse for
        # each file - confirm
        if self.data_date_max > np.datetime64(self.datef):
            file_date_min = np.datetime64(self.datef)
            continue

        if len(df.index) == 0:
            continue

        # Aggregate at a given frequency
        rounded_dates = pd.to_datetime(df["date"]).floor(self.aggreg_freq)
        file_dates = np.unique(rounded_dates.values)
        file_date_min = file_dates.min()

        for ddi in file_dates:
            if ddi not in dfs:
                info(f"error with {ddi} probably no data for this time step")
                continue

            # Only the observations of this period go into its bucket.
            # Storing the whole file here would duplicate observations when
            # a file straddles several aggregation periods.
            subset = df.sel({"index": rounded_dates == ddi})
            if dfs[ddi] is None:
                dfs[ddi] = subset
            else:
                info(dfs[ddi])
                info(subset)
                dfs[ddi] = xr.concat([dfs[ddi], subset], dim="index")

    # Flush the periods that no later file closed (file list ending before
    # datef); otherwise raw datasets would reach pd.concat below and fail
    for ddi in dates2dump:
        _dump_period(self, dfs, ddi, dir_monitors)

    # Update dir / file to read averaging kernel info in satellite transform
    self.dir = dir_monitors
    self.file = f"monitor_{self.stationID}_{self.networkID}" \
                f"_{self.parameter}_%Y%m%d%H%M.nc"

    # Return concatenated dataframe
    if dfs:
        return pd.concat(list(dfs.values()), ignore_index=True)

    return init_empty()


def _dump_period(plugin, dfs, ddi, dir_monitors):
    """Write the bucket of period ``ddi`` to its monitor file and replace it
    in ``dfs`` by a table of its ``index``-only variables.

    An empty bucket (``None``) is simply removed from ``dfs``.
    """
    dataset = dfs[ddi]
    if dataset is None:
        del dfs[ddi]
        return

    date_str = pd.to_datetime(ddi).strftime("%Y%m%d%H%M")

    # Reset index so that it runs from 0 over the merged observations
    dataset["index"] = (("index",), range(dataset.sizes["index"]))

    # Dump to netcdf
    dataset.to_netcdf(
        f"{dir_monitors}/monitor_{plugin.stationID}_{plugin.networkID}"
        f"_{plugin.parameter}_{date_str}.nc"
    )

    # Keep only 1D variables for later use in CIF
    dfs[ddi] = xr.Dataset(
        {
            var: dataset[var]
            for var in dataset.variables
            if dataset[var].dims == ("index",)
        }
    ).to_pandas()