Source code for pycif.plugins.datastreams.fields.chimere_icbc.write

import os
import copy
import numpy as np
import pandas as pd
from netCDF4 import Dataset
import xarray as xr


[docs] def write(self, name, lbc_file, data, mode="a", metadata=None, comp_type=None, **kwargs): """Write flux to INICOND or BOUN_CONC CHIMERE compatible files. """ # If mode is 'a' but file does not exit, switch to mode 'w' if mode == "a" and not os.path.isfile(lbc_file): mode = "w" # Loading longitudes and latitudes try: domain = self.domain except AttributeError: if metadata is None: raise AttributeError( "Could not find any information about the domain." ) domain = metadata["domain"] lon = domain.zlon lat = domain.zlat nlev = domain.nlev # Try fetching comp_type is not given from call if comp_type is None: if not hasattr(self, "comp_type"): raise Exception( "CHIMERE ICBC write was called without specifying a component type. " "If called in dump2format, please add `comp_type` to the `dump_options` " "of the transform" ) comp_type = getattr(self, "comp_type") # Write INI_CONCS if comp_type == "inicond": if type(data) == dict: data_min = list(data.values())[0].min() else: data_min = data.min() if data_min > 1e10: units_ic = 'molecules/cm3' else: units_ic = 'ppb' write_iniconcs(name, lbc_file, data, lon, lat, mode, units_ic) else: lon_side = domain.zlon_side lat_side = domain.zlat_side write_bounconcs( name, lbc_file, data, lon, lat, lon_side, lat_side, nlev, mode, comp_type, )
[docs] def write_iniconcs(name, lbc_file, data, lon, lat, mode="a", units_in="ppb"): if type(name) == str: data = {name: data} name = [name] data_ref = data[name[0]] # Check that the variables have the correct dimensions if np.sum(lon.shape) * 2 == data_ref[0, 0].values.size: raise Exception( "Warning! The data to dump in the INI_CONCS files has the shape of " "boundary conditions. Please check your Yaml! \n" "This can be caused by fields being interpolated with the option " "`is_lbc = True` in the corresponding paragraph of your yml file." ) if lon.shape != data_ref[0].values.shape[1:]: raise Exception( "WARNING! The data to dump in the INI_CONCS files does not have a correct " "horizontal shape: " f"{data_ref[0].values.shape[1:]} instead of {lon.shape}." ) # Append species to existing file spstrlen = 23 specs_var = [list(n.ljust(spstrlen)) for n in name] if os.path.isfile(lbc_file) and mode == "a": with Dataset(lbc_file, "a") as f: ljust_specs_in = f.variables["species"][:].astype(str) specs_in = ["".join(p).strip() for p in ljust_specs_in] specs_str = ["".join(p) for p in ljust_specs_in] update_species = False for n in name: if n not in specs_in: specs_str += [n.ljust(spstrlen)] update_species = True if update_species: dtype = np.dtype(('S', spstrlen)) specs = xr.DataArray( data=specs_str, dims=["Species"]).astype(dtype) ds = xr.load_dataset(lbc_file) del ds["species"] ds["species"] = specs ds.to_netcdf(lbc_file, format="NETCDF3_CLASSIC", encoding={'species': {'char_dim_name': 'SpStrLen'}}) specs_var = [list(s.ljust(spstrlen)) for s in specs_str] # Manage species variables spec_str = ["".join(p) for p in specs_var] dtype = np.dtype(('S', spstrlen)) spec_array = xr.DataArray( spec_str, dims=["Species"]).astype(dtype) # Manage time variable timestrLen = 19 str_dates = list( data_ref["time"].dt.strftime("%Y-%m-%d_%H:%M:00") .to_pandas().values ) dtype = np.dtype(('S', timestrLen)) time_array = xr.DataArray( str_dates, dims=["Time"]).astype(dtype) # Initialize dataset ds = xr.Dataset(data_vars={ "Times": time_array, "species": spec_array }) # Fill with lon/lat ds["lon"] = xr.DataArray( data=lon.astype("f"), dims=("south_north", "west_east"), attrs={"units": "degrees_east", "long_name": "Longitude"} ) ds["lat"] = xr.DataArray( data=lat.astype("f"), dims=("south_north", "west_east"), attrs={"units": "degrees_north", "long_name": "Latitude"} ) # Now include data for k in data: ds[k] = xr.DataArray( data=data[k].values[0].astype("d"), dims=("bottom_top", "south_north", "west_east"), attrs={"units": units_in, "long_name": k} ) # Dump to netcdf encoding = { 'Times': {'char_dim_name': 'DateStrLen'}, 'species': {'char_dim_name': 'SpStrLen'} } ds.to_netcdf( lbc_file, mode, format="NETCDF3_CLASSIC", encoding=encoding, unlimited_dims={'Time': True})
[docs] def write_bounconcs( name, lbc_file, data, lon, lat, lon_side, lat_side, nlev, mode="a", comp_type=None, ): if type(name) == str: data = {name: data} name = [name] data_ref = data[name[0]] nhours, nlev, nmerid, nzonal = np.shape(data_ref) nhours = len(data_ref) nsides = lon_side.size nmerid, nzonal = np.shape(lon) spstrlen = 23 datestrlen = 19 # Append species to existing file if os.path.isfile(lbc_file) and mode == "a": with Dataset(lbc_file, "a") as f: ljust_specs_in = f.variables["species"][:].astype(str) specs_in = ["".join(p).strip() for p in ljust_specs_in] top_in = f.variables["top_conc"][:] lat_in = f.variables["lat_conc"][:] specs_str = ["".join(p) for p in ljust_specs_in] for n in name: if n in specs_in: ispec = specs_in.index(n) if comp_type == "topcond": top_in[..., ispec] = data[n].data[:, 0] else: lat_in[..., ispec] = data[n].data[..., 0, :] else: specs_str += [n.ljust(spstrlen)] if comp_type == "topcond": top_in = np.concatenate( [top_in, data[n].data[:, 0]], axis=3 ) lat_in = np.concatenate( [lat_in, np.zeros((nhours, nlev, nsides, 1))], axis=3 ) else: top_in = np.concatenate( [top_in, np.zeros((nhours, nmerid, nzonal, 1))], axis=3 ) lat_in = np.concatenate([ lat_in, data[n].data[:, :, 0, :, np.newaxis]], axis=3 ) # Update file dtype = np.dtype(('S', spstrlen)) specs = xr.DataArray( data=specs_str, dims=["Species"]).astype(dtype) ds = xr.load_dataset(lbc_file) # Clean old variables del ds["species"] del ds["lat_conc"], ds["top_conc"] # Update list species ds["species"] = specs # Update values ds["lat_conc"] = ( ("Time", "bottom_top", "h_boundary", "Species"), lat_in) ds["top_conc"] = ( ("Time", "south_north", "west_east", "Species"), top_in) ds.to_netcdf(lbc_file, format="NETCDF3_CLASSIC", encoding={'species': {'char_dim_name': 'SpStrLen'}, 'Times': {'char_dim_name': 'DateStrLen'}}, compute=True, mode="w", unlimited_dims={'Time': True}) return # Otherwise, create file from scratch # Manage species variables spec_str = [n.ljust(spstrlen) for n in name] dtype = np.dtype(('S', spstrlen)) spec_array = xr.DataArray( spec_str, dims=["Species"]).astype(dtype) # Manage time variable timestrLen = 19 str_dates = list( data_ref["time"].dt.strftime("%Y-%m-%d_%H:%M:00") .to_pandas().values ) dtype = np.dtype(('S', timestrLen)) time_array = xr.DataArray( str_dates, dims=["Time"]).astype(dtype) # Initialize dataset ds = xr.Dataset(data_vars={ "Times": time_array, "species": spec_array }) # Fill with lon/lat ds["lon"] = xr.DataArray( data=lon.astype("f"), dims=("south_north", "west_east"), attrs={"units": "degrees_east", "long_name": "Longitude"} ) ds["lat"] = xr.DataArray( data=lat.astype("f"), dims=("south_north", "west_east"), attrs={"units": "degrees_north", "long_name": "Latitude"} ) # Now include data nspecies = len(name) ds["top_conc"] = xr.DataArray( data=np.concatenate([ data[k].data[:, 0, ..., np.newaxis] for k in data ], axis=-1) if comp_type == "topcond" else np.zeros((nhours, nmerid, nzonal, nspecies)), dims=("Time", "south_north", "west_east", "Species"), attrs={"long_name": "top_conc", "units": "ppb"} ) ds["lat_conc"] = xr.DataArray( data=np.concatenate([ data[k].data[..., 0, :, np.newaxis] for k in data ], axis=-1) if comp_type == "latcond" else np.zeros((nhours, nlev, nsides, nspecies)), dims=("Time", "bottom_top", "h_boundary", "Species"), attrs={"long_name": "lat_conc", "units": "ppb"} ) # Dump to netcdf encoding = { 'Times': {'char_dim_name': 'DateStrLen'}, 'species': {'char_dim_name': 'SpStrLen'} } ds.to_netcdf( lbc_file, mode, format="NETCDF3_CLASSIC", encoding=encoding, unlimited_dims={'Time': True})