Source code for pycif.plugins.datastreams.fluxes.gridded_NetCDF.read

import os
from logging import debug, info

import numpy as np
import pandas as pd
import xarray as xr

from ....domains.gridded_NetCDF.utils import find_coord
from .time_coord import (
    convert_calendar,
    find_time_coord,
    get_calendar,
    preprocess_time_coord,
    shift_years,
)
from .utils import OFFSET, get_year_offset


[docs] def read( self, name, varnames, dates, files, interpol_flx=False, tracer=None, model=None, ddi=None, debug_read=False, **kwargs ): """Get fluxes from raw files and load them into a pyCIF variables. Args: name (str): name of the component varnames (list[str]): original names of variables to read; use `name` if `varnames` is empty dates (list): list of the date intervals to extract files (list): list of the files matching dates Return: xr.DataArray: the actual data with dimension: time, levels, latitudes, longitudes """ # Return empty datastore if no date if not dates: return xr.DataArray( data=np.zeros((0, 0, 0, 0)), dims=('time', 'lev', 'lat', 'lon') ) varnames = varnames if varnames else name time_varname = getattr(tracer, 'time_varname', None) da_list = [] ref_path = "" ref_year_offset = None for (date_i, date_f), file_path in zip(dates, files): if file_path != ref_path: ref_path = file_path # Decode times or not for very badly formatted files... try: with xr.open_dataset(file_path) as ds: decode_times = True except ValueError: decode_times = False # Filter MissingDimensionErrors try: ds = xr.open_dataset( file_path, decode_times=decode_times, drop_variables=tracer.drop_variables ) except xr.core.variable.MissingDimensionsError as f: info(f) raise Exception( f"Could not open {file_path} with xarray. " "This happens when one dimension variable has multiple dimensions, " "with a name common with one of its own dimensions. " "Please consider reformatting your NetCDF properly. " "It is possible to use the argument `drop_variables` in your yaml " "to avoid reading the erroneous variables. \n" "Please check the error message above to find out which variable is the problem." ) with xr.open_dataset( file_path, group=getattr(tracer, "group_name", None), decode_times=decode_times, drop_variables=tracer.drop_variables ) as ref_ds: ref_ds, time_varname = find_time_coord(ref_ds, tracer, date_i) ref_ds = preprocess_time_coord( ref_ds, tracer, time_varname, date_i, time_midpoint=tracer.time_midpoint, time_endpoint=tracer.time_endpoint, file_freq=tracer.file_freq ) calendar = getattr( tracer, "time_calendar", ref_ds[time_varname].dt.calendar ) # Check that the variable is present try: _ = ref_ds[varnames] except KeyError as e: raise KeyError( f"The variable {varnames} is not in {file_path}.\n" "Please check your yaml and define the `varname` parameter properly." ) from e # Duplicate the dataset for later use ds = ref_ds.copy() # Get year offset year_offset = get_year_offset( ref_ds, time_varname, date_i.year, self.is_climatology) # Shift years only if necessary if ref_year_offset is None or year_offset != ref_year_offset: ref_year_offset = year_offset ds = ref_ds.copy() ds = shift_years(ds, time_varname, year_offset) # 'convert_calendar' must be called after 'shift_years', # otherwhise leap years will be incorrect ds = convert_calendar(ds, tracer, time_varname, calendar) # Slicing along time coordinate dslice = slice(date_i, date_f - OFFSET) da = ds[varnames].sel({time_varname: dslice}) # Slice over specified dimension if hasattr(tracer, "slice_dimension"): slice_dimension = tracer.slice_dimension for dim in slice_dimension.attributes: # Check that the dimension fit in the file if dim not in da.dims: raise Exception( f"Trying to slice over the dimension {dim} whereas " f"the dimension is not in the dataset: {da.dims}" ) # Now slice over the dimension slice_dim = getattr(slice_dimension, dim) method = slice_dim.method slice_min = getattr(slice_dim, "slice_min", None) slice_max = getattr(slice_dim, "slice_max", None) slice_freq = getattr(slice_dim, "slice_freq", None) dslice = slice(slice_min, slice_max, slice_freq) da_tmp = da.sel({dim: dslice}) if method == "sum": da = da_tmp.sum(dim=dim) elif method == "avg": da = da_tmp.mean(dim=dim) else: raise Exception( f"Trying to slice with an unknown methd: {method}" ) # Excepting a single time value in the slice if da[time_varname].size == 0: raise ValueError(f"no value in file '{file_path}' between " f"datetimes {date_i} and {date_f}") if da[time_varname].size >= 2: raise ValueError(f"multiple values in file '{file_path}' between " f"datetimes {date_i} and {date_f}") # If multiple varnames and for summing them if isinstance(varnames, list) and not tracer.sum_variables: raise TypeError( "A list of variables have been specified, whereas " "'sum_variables' is set to False. Please check your " "yaml and the documentation." ) elif isinstance(varnames, list): da = sum([da[v] for v in varnames]) # Force the time variable to be aligned with date_i da = da.assign_coords({time_varname: [date_i]}) da_list.append(da) # Concatenating all the slices xmod = xr.concat(da_list, dim=time_varname) # Check that data is consistent with the domain domain = tracer.domain if not xmod.shape[-2:] == domain.zlat.shape: raise ValueError( "The data has not the same shape as the domain initialized from the file:\n" f"\t- data shape: {xmod.shape[1:]}\n" f"\t- domain lat/lon shape: {domain.zlat.shape}\n" "This can either come from erroneous files with inconsistent lon/lat comapred to the data, " "or from mis-specified fix longitudes or latitudes from your yaml:\n" f"\t- delta_lat: {getattr(domain, 'delta_lat', 'None')} \n" f"\t- delta_lon: {getattr(domain, 'delta_lon', 'None')} \n" "Please check below that the longitudes / latitudes are what you expect:\n" f"\t- longitudes: {domain.zlon} \n" f"\t- latitudes: {domain.zlat} \n" ) # Getting coordinates names lat_varname = find_coord(xmod, domain.latitude_dimname, 'lat') lon_varname = find_coord(xmod, domain.longitude_dimname, 'lon') # Getting dimensions names time_dimname, = xmod[time_varname].dims lat_dimname, = xmod[lat_varname].dims lon_dimname, = xmod[lon_varname].dims # Renaming coordinates and dimensions rename_mapping = [ (time_varname, 'time'), (time_dimname, 'time'), (lat_varname, 'lat'), (lat_dimname, 'lat'), (lon_varname, 'lon'), (lon_dimname, 'lon') ] # Renaming vertical dimension if present if self.vertical_dim_name in xmod.dims: rename_mapping.append((self.vertical_dim_name, 'lev')) try: xmod = xmod.rename({ old_name: new_name for old_name, new_name in rename_mapping if old_name != new_name }) except ValueError as e: raise ValueError( "Could not rename variables due to incompatible names.\n" "Trying to rename as follows:\n" + "\n".join([f"\t-{old_name} -> {new_name}" for old_name, new_name in rename_mapping]) + "\n" + "This can come to erroneous attributes specified in the lon/lat variables and dimensions in your file.\n" f"Please inspect the original files, in particular long_names and attributes.\n" "\n".join([f' - {f}' for f in files]) + "\n" "In case, long_names and/or attributes are not conventional (or erroneous), it is possible to overwrite " "the default parsing of lon/lat dimension names by including the following attributes in your yaml:\n" " - latitude_dimname\n" " - longitude_dimname" ) from e # Correcting the time coordinates (usefull when climatology or shift year) is_in_intervals = all(pd.to_datetime(di) <= date <= pd.to_datetime(df) for date, (di, df) in zip(xmod.time.values, dates)) if not is_in_intervals: xmod['time'] = (['time'], [di + (df - di) / 2 for di, df in dates]) debug(f"Replaced dates with: {xmod.time.values}") # Sorting latitudes and longitudes in ascending order dims_to_sort = [] if tracer.sort_lat: dims_to_sort.append('lat') if tracer.sort_lon: dims_to_sort.append('lon') if dims_to_sort: xmod = xmod.sortby(dims_to_sort) # Adding a 'lev' dimension if "lev" not in xmod.dims: xmod = xmod.expand_dims(['lev']) # Summing along an extra dimension if hasattr(self, "sum_along_dim"): if self.sum_along_dim not in xmod.dims: raise KeyError( f"Requested to sum along dimension {self.sum_along_dim}, " f"but the dimension is not in list of dimensions: " f"{xmod.dims}" ) xmod = xmod.sum(dim=self.sum_along_dim) # Reordering dimensions if set(xmod.dims) != set(['time', 'lev', 'lat', 'lon']): raise KeyError( "The retrieved data does not fit the expected dimensions " "('time', 'lev', 'lat', 'lon'). \n" f"The following dimensions are present in data: {xmod.dims}\n" "Consider using the parameter `sum_along_dim` or `slice_dimension`.\n" "This could also come from one of the dimension not being well parsed.\n" "Please check the names of the data dimensions and consider updating your yaml to help CIF " "parsing the dimensions. The corresponding paramters in the yaml are:\n" "\t- `vertical_dim_name`\n" "\t- `latitude_dimname`\n" "\t- `longitude_dimname`\n" "\t- `time_varname`\n" ) xmod = xmod.transpose('time', 'lev', 'lat', 'lon') # Sort vertical dimensions if tracer.domain.vflip: xmod = xmod[:, ::-1, :, :] # Fix issue with NaNs type in some cases xmod = xmod.astype(xmod.values.dtype) # Check consistency between vertical domain and data shape vdim_data = xmod.shape[1] vdim_domain = self.domain.nlev if vdim_domain != vdim_data: raise ValueError( f"Your data has a vertical size of {vdim_data}, " f"whereas the domain vertical extent is {vdim_domain}. \n" "Please ensure consistency. This can happen if you forgot to specify the argument " "'vertical_coord' in your yaml, in particular if the domain vertical extent appears as 1." ) return xmod