Source code for pycif.plugins.modes.variational.sampling

import copy
import subprocess
import os
import glob
import re
import pandas as pd
import time
import pickle
from ....utils import path
from logging import info, debug
from ....utils.yml import ordered_dump


[docs] def compute_sample(self, controlvect, obsvect, x_sample, y_sample): """Submit one variational inversion per ensemble member and collect the results. For each ``(x, y)`` pair in *x_sample* / *y_sample*, dumps a tailored YAML configuration and control/obs vectors, then submits the run via the platform plugin (or as a subprocess for the last member). After all jobs finish, discards members that did not converge correctly and moves the posterior control vectors into a common output directory. Args: self (Plugin): mode plugin providing ``workdir``, ``platform``, ``montecarlo``, and the reference YAML configuration. controlvect: control vector plugin (``xb`` is modified per member and restored on return). obsvect: observation vector plugin (``yobs`` is modified per member and restored on return). x_sample (np.ndarray): ensemble of prior control vectors, shape ``(nsample + 1, control_dim)``; row 0 is the reference. y_sample (np.ndarray): ensemble of perturbed observation vectors, shape ``(nsample + 1, obs_dim)``; row 0 is the reference. Returns: int: number of valid (converged) samples, excluding the reference. Raises: Exception: if the reference member (index 0) did not converge, or if a sample terminated with an unhandled exception. """ workdir = controlvect.workdir platform = self.platform montecarlo = self.montecarlo # Save Xb for later controlvect.xb_ref = copy.deepcopy(controlvect.xb) # Loop over state vector dimensions list_jobs = [] ijob = 0 for x, y in zip(x_sample, y_sample): if not montecarlo.compute_reference and ijob == 0: debug( "Skipping reference member (#0)" ) ijob += 1 continue base_dir = "{}/ensemble/sample_{:04d}/".format(workdir, ijob) path.init_dir(base_dir) # Check whether posterior control vector was dumped if os.path.isfile(f"{base_dir}/controlvect_final.pickle"): debug( f"Skipping sample #{ijob}. Inversion already computed:\n" f"{base_dir}/controlvect_final.pickle was successfully created." ) ijob += 1 continue # Dumps controlvect value from sample controlvect.xb[:] = copy.deepcopy(x) controlvect.dump( "{}/controlvect.pickle".format(base_dir), to_netcdf=controlvect.save_out_netcdf or montecarlo.save_out_netcdf, dir_netcdf="{}/controlvect/".format(base_dir)) # Dumps obsvect value from sample obsvect.yobs[:] = copy.deepcopy(y) obsvect.dump("{}/obsvect/".format(base_dir)) # Updating configuration dictionary yml_dict = \ self.from_yaml( self.reference_instances["reference_setup"].def_file) yml_dict["workdir"] = base_dir yml_dict["controlvect"] = { **yml_dict.get("controlvect", {}), **{"plugin": {"name": "standard", "version": "std"}, "reload_xb": True, "reload_file": "{}/controlvect.pickle".format(base_dir)} } yml_dict["obsvect"] = { **yml_dict.get("obsvect", {}), **{"plugin": {"name": "standard", "version": "std"}, "dir_obsvect": "{}/obsvect/".format(workdir)} } del yml_dict["mode"]["montecarlo"] del yml_dict["def_file"] # Dumps new yml file yml_file = "{}/config_base_{:04d}.yml".format(base_dir, ijob) with open(yml_file, "w") as outfile: ordered_dump(outfile, yml_dict) # Run the base function as an independent process job_file = os.path.join(base_dir, "job_pycif_base_{:04d}".format(ijob)) # Submit samples as other jobs, # or as a sub-process for the last to save resources if ijob + 1 == len(x_sample): info("Running the last sample as a sub-process") process = subprocess.Popen( f"{platform.python} -m pycif {yml_file}".split(), cwd=base_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() else: info("Submitting Monte Carlo sample inversion {} from {}" .format(ijob + 1, len(x_sample))) job_id = platform.submit_job( "{} -m pycif {}".format(platform.python, yml_file), job_file ) list_jobs.append(job_id) ijob += 1 # Check that jobs are over while not platform.check_jobs(list_jobs): time.sleep(platform.sleep_time) # Discard samples that did not converge correctly samples2discard = [] for ijob in range(len(x_sample)): if not montecarlo.compute_reference and ijob == 0: debug( "Skipping reference member (#0)" ) samples2discard.append(ijob) ijob += 1 continue # First check that the sample exited with no error base_dir = "{}/ensemble/sample_{:04d}/".format(workdir, ijob) logfile = self.from_yaml( self.reference_instances["reference_setup"].def_file )['logfile'] endstr = open(f"{base_dir}/{logfile}", "r").readlines()[-50:] if endstr[-21] != 'Thanks for using the Community Inversion Framework!\n': raise Exception( f"The sample #{ijob} terminated with an Exception. \n" f"Please check error in {base_dir}") # Read the cost function values file_simulator = "{}/simulator/cost.txt".format(base_dir) costs = pd.read_csv(file_simulator, header=None, index_col=0) # Check that the sample finished properly if -2 not in costs.index: raise Exception( f"WARNING! Sample #{ijob:04d} did not converge properly. \n" f"Please check the sample simulation in {base_dir}. \n" ) if costs.loc[-1, 3] < costs.loc[-2, 3]: info("Sample #{} did not converge correctly. Discarding it" .format(ijob)) samples2discard.append(ijob) if ijob == 0: raise Exception("The reference un-perturbed sample did not " "converge. Please check your configuration") # Move controlvects h_dir = "{}/ensemble/xa_emsemble/".format(workdir) path.init_dir(h_dir) target_index = 0 for ijob in range(len(x_sample)): if ijob in samples2discard: continue base_dir = "{}/ensemble/sample_{:04d}/".format(workdir, ijob) os.system( "mv {basedir}/controlvect_final.pickle " "{}/controlvect_final_{:04d}.pickle".format( h_dir, target_index, basedir=base_dir)) target_index += 1 # Re-set xb to initial value controlvect.xb = copy.deepcopy(controlvect.xb_ref) # Returning the number of valid samples return len(x_sample) - len(samples2discard) - 1