Source code for pycif.plugins.datastreams.fluxes.chimere.write

import os
import copy
from netCDF4 import Dataset
import numpy as np
import pandas as pd
import xarray as xr

from .....utils.classes.fluxes import Flux


def write(self, name, flx_file, flx, mode="a", metadata=None, **kwargs):
    """Write fluxes to an AEMISSIONS file compatible with CHIMERE.

    The fluxes are validated (no NaN, no +/-inf) and then handed over to
    ``write_AEMISSIONS`` together with the domain coordinates, the number
    of emission levels and the netCDF format read from the plugin.

    Args:
        self (Fluxes): the Fluxes plugin; ``nlevemis`` and ``ncformat`` are
            read from it, as well as ``domain`` when it is defined
        name (str or list of str): species name, or list of species names
        flx_file (str): the file where to write fluxes
        flx (xarray.DataArray or dict): fluxes data to write; a single
            array when ``name`` is a string, otherwise a mapping from
            species name to array
        mode (str): 'w' to overwrite, 'a' to append
        metadata (dict, optional): fallback source for the domain
            (``metadata["domain"]``) when the plugin has no ``domain``
        **kwargs: accepted for interface compatibility and ignored

    Raises:
        Exception: if any species to dump contains NaN or infinite values,
            or if no domain information can be found
    """

    # Turn name to list to have cross-compatibility between single
    # or multiple call
    if isinstance(name, str):
        flx = {name: flx}
        name = [name]

    # If mode is 'a' but file does not exist, switch to mode 'w'
    if mode == "a" and not os.path.isfile(flx_file):
        mode = "w"

    # Check that values are valid for an AEMISSIONS file.
    # Every species that is going to be dumped is checked, not only the
    # first one; the arrays are only read, so no copy is taken.
    for spec in flx:
        values = flx[spec]
        if np.any((np.abs(values) == np.inf) | np.isnan(values)):
            raise Exception("Trying to dump incorrect emission values "
                            "(NaNs, inf or -inf) for species {} "
                            "in file {}".format(spec, flx_file))

    # Domain coordinates: from the plugin itself when available,
    # otherwise from the metadata passed by the caller
    if hasattr(self, "domain"):
        lon = self.domain.zlon
        lat = self.domain.zlat
    elif metadata is not None:
        lon = metadata["domain"].zlon
        lat = metadata["domain"].zlat
    else:
        raise Exception("Could not find information about the domain")

    nlev = self.nlevemis

    write_AEMISSIONS(name, flx_file, flx, mode, lon, lat, nlev, self.ncformat)


def write_AEMISSIONS(name, flx_file, flx, mode="a", lon=None, lat=None,
                     nlev=1, ncformat='NETCDF4'):
    """Dump fluxes to an AEMISSIONS netCDF file.

    Auxiliary function that can be used outside pycif.

    The output dataset contains a ``Times`` variable (dates formatted as
    ``%Y-%m-%d_%H:%M:00``), a ``species`` variable (names left-justified to
    23 characters) and one variable per entry of ``flx`` with dimensions
    ``(Time, bottom_top, south_north, west_east)``.

    Args:
        name (str or list of str): species name, or list of species names
        flx_file (str): the file where to write fluxes
        flx: a single flux array when ``name`` is a string, otherwise a
            mapping from species name to flux array. Arrays must expose
            ``shape``, ``values`` and a ``time`` coordinate
            (e.g. ``xarray.DataArray``)
        mode (str): 'w' to overwrite, 'a' to append
        lon: 2D array of longitudes; only its shape is used, to set the
            horizontal chunk sizes when ``ncformat`` is 'NETCDF4'. When
            None, the horizontal shape of the data is used instead
        lat: unused, kept for interface compatibility
        nlev (int): number of emission levels expected in the data
        ncformat (str): netCDF format passed to ``to_netcdf``

    Raises:
        Exception: if the second dimension of the data differs from ``nlev``
    """

    # Turn name to list to have cross-compatibility between single
    # or multiple call
    if isinstance(name, str):
        flx = {name: flx}
        name = [name]

    # Reference array used for the shape and the time axis; it is only
    # read, so no copy is needed
    flx_ref = flx[name[0]]

    # Check that the number of levels in the data is consistent with
    # what is to be dumped
    if flx_ref.shape[1] != nlev:
        raise Exception(
            f"Trying to dump data of vertical dimensiom = {flx_ref.shape[1]} "
            f"into an AEMISSIONS file with nlevemis = {nlev}. "
            f"This is most likely to happen when using the dump2format function with "
            f"inconsistent options.\n"
            f"Please check your yml!"
        )

    # Appending to a file that does not exist yet is treated as creating it
    if mode == "a" and not os.path.isfile(flx_file):
        mode = "w"

    # Species names are stored as fixed-width character strings
    spstrlen = 23
    species_dtype = np.dtype(('S', spstrlen))
    specs_str = [n.ljust(spstrlen) for n in name]

    # Append species to existing file
    if mode == "a":
        # Read the species already listed in the file
        with Dataset(flx_file, "a") as f:
            ljust_specs_in = []
            if "species" in f.variables:
                ljust_specs_in = f.variables["species"][:].astype(str)

        specs_in = ["".join(p).strip() for p in ljust_specs_in]
        specs_str = ["".join(p) for p in ljust_specs_in]

        # Species requested now but not yet listed in the file
        new_specs = [n for n in name if n not in specs_in]
        if new_specs:
            specs_str += [n.ljust(spstrlen) for n in new_specs]

            # The species list changes length, so the file is reloaded and
            # rewritten with the extended list. This is done once the
            # netCDF4 handle above has been closed.
            specs = xr.DataArray(
                data=specs_str, dims=["Species"]).astype(species_dtype)
            ds_in = xr.load_dataset(flx_file)
            if "species" in ds_in:
                del ds_in["species"]
            ds_in["species"] = specs
            ds_in.to_netcdf(
                flx_file, format=ncformat,
                encoding={'species': {'char_dim_name': 'SpStrLen'}})

    # Manage species variables
    spec_array = xr.DataArray(
        specs_str, dims=["Species"]).astype(species_dtype)

    # Manage time variable
    timestrLen = 19
    str_dates = list(
        flx_ref["time"].dt.strftime("%Y-%m-%d_%H:%M:00")
        .to_pandas().values
    )
    time_array = xr.DataArray(
        str_dates, dims=["Time"]).astype(np.dtype(('S', timestrLen)))

    # Initialize dataset
    ds = xr.Dataset(data_vars={
        "Times": time_array,
        "species": spec_array
    })

    # Now include data
    for k in flx:
        ds[k] = xr.DataArray(
            data=flx[k].values.astype("d"),
            dims=("Time", "bottom_top", "south_north", "west_east"),
            attrs={"units": "molecule/cm2/s",
                   "long_name": f"{k} emissions"})

    # Dump to netcdf
    encoding = {
        'Times': {'char_dim_name': 'DateStrLen'},
        'species': {'char_dim_name': 'SpStrLen'}
    }
    if ncformat == 'NETCDF4':
        comp = dict(zlib=True, complevel=5)
        for var in ds.data_vars:
            ds[var].encoding.update(comp)

        # Horizontal chunk sizes come from the domain when it is given;
        # otherwise fall back to the shape of the data itself
        if lon is not None:
            n_south_north, n_west_east = lon.shape[0], lon.shape[1]
        else:
            n_south_north, n_west_east = flx_ref.shape[2], flx_ref.shape[3]

        ds = ds.chunk(chunks={"Time": 1,
                              "bottom_top": nlev,
                              "south_north": n_south_north,
                              "west_east": n_west_east})

    ds.to_netcdf(
        flx_file, mode,
        format=ncformat,
        encoding=encoding,
        unlimited_dims={'Time': True})