import os
import copy
from netCDF4 import Dataset
import numpy as np
import pandas as pd
import xarray as xr
from .....utils.classes.fluxes import Flux
[docs]
def write(self, name, flx_file, flx, mode="a", metadata=None, **kwargs):
    """Write flux to AEMISSION CHIMERE compatible files.

    Args:
        self (Fluxes): the Fluxes plugin; ``nlevemis`` and ``ncformat`` are
            read from it, as well as ``domain`` when it is available
        name (str or list of str): a single species name, or a list of
            species names
        flx_file (str): the file where to write fluxes
        flx (xarray.DataArray or dict): fluxes data to write; a single array
            when ``name`` is a string, otherwise a mapping indexed by
            species name
        mode (str): 'w' to overwrite, 'a' to append
        metadata (dict, optional): only used when ``self`` has no ``domain``
            attribute; ``metadata["domain"]`` must then provide ``zlon``
            and ``zlat``
        **kwargs: accepted for interface compatibility and ignored

    Raises:
        Exception: if any species contains NaN, inf or -inf values, or if
            no domain can be found on ``self`` or in ``metadata``
    """
    # Turn name to list to have cross-compatibility between single or
    # multiple call. isinstance also accepts str subclasses (e.g. numpy.str_)
    if isinstance(name, str):
        flx = {name: flx}
        name = [name]

    # If mode is 'a' but file does not exist, switch to mode 'w'
    if mode == "a" and not os.path.isfile(flx_file):
        mode = "w"

    # Check that values are valid for a AEMISSION file.
    # Every species that will be dumped is checked, not only the first one;
    # the data is only read here, so no copy is needed.
    for spec in flx:
        if np.any(~np.isfinite(flx[spec])):
            raise Exception("Trying to dump incorrect emission values "
                            "(NaNs, inf or -inf) for species {} "
                            "in file {}".format(spec, flx_file))

    # Variables from self
    if hasattr(self, "domain"):
        lon = self.domain.zlon
        lat = self.domain.zlat
    elif metadata is not None:
        lon = metadata["domain"].zlon
        lat = metadata["domain"].zlat
    else:
        raise Exception("Could not find information about the domain")

    nlev = self.nlevemis

    write_AEMISSIONS(name, flx_file, flx, mode, lon, lat, nlev, self.ncformat)
[docs]
def write_AEMISSIONS(name, flx_file, flx, mode="a", lon=None, lat=None, nlev=1, ncformat="NETCDF4"):
    """Auxiliary function that can be used outside pycif.

    Dump one or several species to a NetCDF file containing the character
    variables ``Times`` and ``species`` plus one variable per species with
    dimensions (Time, bottom_top, south_north, west_east).

    Args:
        name (str or list of str): a single species name, or a list of
            species names
        flx_file (str): the file where to write fluxes
        flx (xarray.DataArray or dict): fluxes data to write; a single array
            when ``name`` is a string, otherwise a mapping indexed by
            species name. Arrays must expose ``shape``, ``values`` and a
            ``time`` coordinate
        mode (str): 'w' to overwrite, 'a' to append
        lon (array-like, optional): 2D longitudes; only its shape is used,
            to chunk the horizontal dimensions. When omitted, the shape of
            the data is used instead
        lat (array-like, optional): not used by this function; kept for
            interface compatibility
        nlev (int): number of vertical levels expected in the data
        ncformat (str): NetCDF format forwarded to ``to_netcdf``

    Raises:
        Exception: if the second dimension of the data differs from ``nlev``
    """
    # Turn name to list to have cross-compatibility between single or
    # multiple call. isinstance also accepts str subclasses (e.g. numpy.str_)
    if isinstance(name, str):
        flx = {name: flx}
        name = [name]

    # Only the shape and the time axis of the first species are read below,
    # so a reference is enough: no deep copy of the data is required
    flx_ref = flx[name[0]]

    # Check that the number of levels in the data is consistent with what
    # is to be dumped
    if flx_ref.shape[1] != nlev:
        raise Exception(
            f"Trying to dump data of vertical dimensiom = {flx_ref.shape[1]} "
            f"into an AEMISSIONS file with nlevemis = {nlev}. "
            f"This is most likely to happen when using the dump2format function with "
            f"inconsistent options.\n"
            f"Please check your yml!"
        )

    # Species names are stored as fixed-width character strings
    spstrlen = 23
    species_dtype = np.dtype(("S", spstrlen))
    specs_str = [n.ljust(spstrlen) for n in name]

    # Append species to existing file
    if os.path.isfile(flx_file) and mode == "a":
        # Read the species already in the file. The handle is read-only and
        # closed before xarray touches the same file below.
        ljust_specs_in = []
        with Dataset(flx_file, "r") as f:
            if "species" in f.variables:
                ljust_specs_in = f.variables["species"][:].astype(str)

        specs_in = ["".join(p).strip() for p in ljust_specs_in]
        specs_str = ["".join(p) for p in ljust_specs_in]

        update_species = False
        for n in name:
            if n not in specs_in:
                specs_str.append(n.ljust(spstrlen))
                update_species = True

        # The species list grows: rewrite the file with the extended
        # 'species' variable before appending the new data
        if update_species:
            specs = xr.DataArray(
                data=specs_str, dims=["Species"]).astype(species_dtype)
            ds = xr.load_dataset(flx_file)
            if "species" in ds:
                del ds["species"]
            ds["species"] = specs
            ds.to_netcdf(flx_file, format=ncformat,
                         encoding={"species": {"char_dim_name": "SpStrLen"}})

    # Manage species variables
    spec_array = xr.DataArray(
        specs_str, dims=["Species"]).astype(species_dtype)

    # Manage time variable
    timestrlen = 19
    str_dates = list(
        flx_ref["time"].dt.strftime("%Y-%m-%d_%H:%M:00")
        .to_pandas().values
    )
    time_array = xr.DataArray(
        str_dates, dims=["Time"]).astype(np.dtype(("S", timestrlen)))

    # Initialize dataset
    ds = xr.Dataset(data_vars={
        "Times": time_array,
        "species": spec_array
    })

    # Now include data
    for k in flx:
        ds[k] = xr.DataArray(
            data=flx[k].values.astype("d"),
            dims=("Time", "bottom_top", "south_north", "west_east"),
            attrs={"units": "molecule/cm2/s", "long_name": f"{k} emissions"})

    # Dump to netcdf
    encoding = {
        "Times": {"char_dim_name": "DateStrLen"},
        "species": {"char_dim_name": "SpStrLen"}
    }

    if ncformat == "NETCDF4":
        # Compress every variable
        comp = dict(zlib=True, complevel=5)
        for var in ds.data_vars:
            ds[var].encoding.update(comp)

        # Horizontal chunk sizes follow the domain when it is provided,
        # otherwise the shape of the data itself
        if lon is not None:
            n_south_north, n_west_east = lon.shape[0], lon.shape[1]
        else:
            n_south_north, n_west_east = flx_ref.shape[2], flx_ref.shape[3]

        # NOTE(review): chunking is assumed to belong to the NETCDF4 branch;
        # confirm against the upstream indentation
        ds = ds.chunk(chunks={"Time": 1,
                              "bottom_top": nlev,
                              "south_north": n_south_north,
                              "west_east": n_west_east})

    ds.to_netcdf(
        flx_file, mode, format=ncformat,
        encoding=encoding, unlimited_dims={"Time": True})