# Source code for pycif.plugins.obsparsers.CO2M.multiple_parse

from logging import info
import glob
import os
import copy
import pandas as pd
import numpy as np
import xarray as xr
from ....utils.datastores.empty import init_empty
from ....utils.check import errclass as error


def parse_multiple_files(self, spec="", **kwargs):
    """Parse multiple SRON files by orbit and merge them by aggregation period.

    Files matching ``self.dir_obs + "*"`` are read in sorted order with
    ``self.do_parse``. Their records are grouped into periods of frequency
    ``self.aggreg_freq`` spanning ``self.datei`` to ``self.datef``. Each
    completed period is written to a monitor netCDF file in
    ``{workdir}/obs/{parameter}/``. Only its 1-D variables are kept in
    memory.

    Args:
        self: the plugin with its describing arguments (in particular
            dir_obs, workdir, parameter, datei, datef, aggreg_freq,
            stationID, networkID)
        spec: unused; kept for interface compatibility
        **kwargs: forwarded to ``self.do_parse``

    Returns:
        pandas.DataFrame: the 1-D variables of every non-empty period,
        concatenated with a fresh index, or ``init_empty()`` when no
        period received any data.

    Side effects:
        Writes one monitor file per non-empty period. Sets ``self.dir``
        and ``self.file`` to the monitor directory and file-name pattern,
        so that the satellite transform can read the averaging-kernel info.
    """
    info("Reading files in " + self.dir_obs)
    dir_monitors = "{}/obs/{}/".format(self.workdir, self.parameter)
    # to_netcdf does not create missing parent directories
    os.makedirs(dir_monitors, exist_ok=True)

    # Aggregation intervals: every element except the last starts a period
    aggreg_periods = pd.date_range(
        self.datei, self.datef, freq=self.aggreg_freq).to_numpy()
    info(aggreg_periods)
    dfs = {ddi: None for ddi in aggreg_periods[:-1]}
    # Periods not yet written to a monitor file
    dates2dump = aggreg_periods[:-1].copy()
    date_ref = np.datetime64(self.datei)
    file_date_min = date_ref

    for obs_file in sorted(glob.glob(self.dir_obs + "*")):
        # Dump the periods that start before the earliest date of the
        # previously parsed file (input files are assumed chronological)
        if file_date_min >= date_ref:
            date_ref = file_date_min
            dates2dump = _flush_periods(
                self, dfs, dates2dump, date_ref, dir_monitors)

        # Stop if files are after the period of interest
        if dates2dump.size == 0:
            break

        # Parse the file itself
        try:
            df = self.do_parse(obs_file, **kwargs)
        except error.PluginError as e:
            info(f"{obs_file} was not loaded for the following reason")
            info(e)
            continue

        # Check level 1 dates: once a file goes past datef, every remaining
        # period is flushed at the next iteration (or after the loop).
        # NOTE(review): data_date_max is presumably set by do_parse for the
        # current file (TODO confirm). This file's own records are NOT
        # aggregated here, even those before datef.
        if self.data_date_max > np.datetime64(self.datef):
            file_date_min = np.datetime64(self.datef)
            continue

        if len(df.index) == 0:
            continue

        # Aggregate at a given frequency
        rounded_dates = pd.to_datetime(df["date"]).floor(self.aggreg_freq)
        file_dates = np.unique(rounded_dates.values)
        file_date_min = file_dates.min()
        for ddi in file_dates:
            if ddi not in dfs:
                info(f"error with {ddi} probably no data for this time step")
                continue
            # Only this period's records; a file may span several periods
            subset = df.sel({"index": rounded_dates == ddi})
            if dfs[ddi] is None:
                dfs[ddi] = subset
            else:
                dfs[ddi] = xr.concat([dfs[ddi], subset], dim="index")

    # Flush periods still pending. This happens when no file reached past
    # datef; otherwise they would stay xr.Dataset and break pd.concat below.
    # Every pending start is < aggreg_periods[-1] by construction.
    if dates2dump.size:
        _flush_periods(self, dfs, dates2dump, aggreg_periods[-1], dir_monitors)

    # Update dir / file to read averaging kernel info in satellite transform
    self.dir = dir_monitors
    self.file = (f"monitor_{self.stationID}_{self.networkID}"
                 f"_{self.parameter}_%Y%m%d%H%M.nc")

    # Return concatenated dataframe
    if not dfs:
        return init_empty()
    return pd.concat(list(dfs.values()), ignore_index=True)


def _flush_periods(self, dfs, dates2dump, date_ref, dir_monitors):
    """Dump every pending period that starts strictly before ``date_ref``.

    Empty periods (``None`` in ``dfs``) are removed from ``dfs``. Filled
    periods are written to their monitor file and replaced in ``dfs`` by
    the DataFrame of their 1-D variables.

    Args:
        self: the plugin (reads stationID, networkID, parameter)
        dfs (dict): period start -> accumulated data or None; modified
            in place
        dates2dump (numpy.ndarray): start dates of periods not yet dumped
        date_ref (numpy.datetime64): periods starting before it are dumped
        dir_monitors (str): directory receiving the monitor files

    Returns:
        numpy.ndarray: the start dates still pending (>= ``date_ref``).
    """
    for ddi in dates2dump[dates2dump < date_ref]:
        if dfs[ddi] is None:
            # No observation fell into this period: drop it from the output
            del dfs[ddi]
            continue
        date_str = pd.to_datetime(ddi).strftime("%Y%m%d%H%M")
        dfs[ddi] = _dump_monitor(
            dfs[ddi],
            f"{dir_monitors}/monitor_{self.stationID}_{self.networkID}"
            f"_{self.parameter}_{date_str}.nc",
        )
    return dates2dump[dates2dump >= date_ref]


def _dump_monitor(ds, path):
    """Write one period's dataset to ``path`` and return its 1-D variables.

    Args:
        ds (xarray.Dataset): records of one aggregation period along the
            "index" dimension; its "index" variable is overwritten
        path (str): netCDF file to write

    Returns:
        pandas.DataFrame: the variables of ``ds`` whose only dimension is
        "index".
    """
    # Renumber records 0..n-1 (concatenated chunks may carry overlapping
    # index values)
    ds["index"] = (("index",), np.arange(ds.sizes["index"]))
    ds.to_netcdf(path)
    # Keep only 1D variables for later use in CIF
    return xr.Dataset(
        {var: ds[var] for var in ds.variables if ds[var].dims == ("index",)}
    ).to_pandas()