import copy
import subprocess
import os
import glob
import re
import pandas as pd
import time
import pickle
from ....utils import path
from logging import info, debug
from ....utils.yml import ordered_dump
[docs]
def compute_sample(self, controlvect, obsvect, x_sample, y_sample):
"""Submit one variational inversion per ensemble member and collect the results.
For each ``(x, y)`` pair in *x_sample* / *y_sample*, dumps a tailored
YAML configuration and control/obs vectors, then submits the run via the
platform plugin (or as a subprocess for the last member). After all jobs
finish, discards members that did not converge correctly and moves the
posterior control vectors into a common output directory.
Args:
self (Plugin): mode plugin providing ``workdir``, ``platform``,
``montecarlo``, and the reference YAML configuration.
controlvect: control vector plugin (``xb`` is modified per member and
restored on return).
obsvect: observation vector plugin (``yobs`` is modified per member
and restored on return).
x_sample (np.ndarray): ensemble of prior control vectors,
shape ``(nsample + 1, control_dim)``; row 0 is the reference.
y_sample (np.ndarray): ensemble of perturbed observation vectors,
shape ``(nsample + 1, obs_dim)``; row 0 is the reference.
Returns:
int: number of valid (converged) samples, excluding the reference.
Raises:
Exception: if the reference member (index 0) did not converge, or
if a sample terminated with an unhandled exception.
"""
workdir = controlvect.workdir
platform = self.platform
montecarlo = self.montecarlo
# Save Xb for later
controlvect.xb_ref = copy.deepcopy(controlvect.xb)
# Loop over state vector dimensions
list_jobs = []
ijob = 0
for x, y in zip(x_sample, y_sample):
if not montecarlo.compute_reference and ijob == 0:
debug(
"Skipping reference member (#0)"
)
ijob += 1
continue
base_dir = "{}/ensemble/sample_{:04d}/".format(workdir, ijob)
path.init_dir(base_dir)
# Check whether posterior control vector was dumped
if os.path.isfile(f"{base_dir}/controlvect_final.pickle"):
debug(
f"Skipping sample #{ijob}. Inversion already computed:\n"
f"{base_dir}/controlvect_final.pickle was successfully created."
)
ijob += 1
continue
# Dumps controlvect value from sample
controlvect.xb[:] = copy.deepcopy(x)
controlvect.dump(
"{}/controlvect.pickle".format(base_dir),
to_netcdf=controlvect.save_out_netcdf or montecarlo.save_out_netcdf,
dir_netcdf="{}/controlvect/".format(base_dir))
# Dumps obsvect value from sample
obsvect.yobs[:] = copy.deepcopy(y)
obsvect.dump("{}/obsvect/".format(base_dir))
# Updating configuration dictionary
yml_dict = \
self.from_yaml(
self.reference_instances["reference_setup"].def_file)
yml_dict["workdir"] = base_dir
yml_dict["controlvect"] = {
**yml_dict.get("controlvect", {}),
**{"plugin": {"name": "standard", "version": "std"},
"reload_xb": True,
"reload_file": "{}/controlvect.pickle".format(base_dir)}
}
yml_dict["obsvect"] = {
**yml_dict.get("obsvect", {}),
**{"plugin": {"name": "standard", "version": "std"},
"dir_obsvect": "{}/obsvect/".format(workdir)}
}
del yml_dict["mode"]["montecarlo"]
del yml_dict["def_file"]
# Dumps new yml file
yml_file = "{}/config_base_{:04d}.yml".format(base_dir, ijob)
with open(yml_file, "w") as outfile:
ordered_dump(outfile, yml_dict)
# Run the base function as an independent process
job_file = os.path.join(base_dir, "job_pycif_base_{:04d}".format(ijob))
# Submit samples as other jobs,
# or as a sub-process for the last to save resources
if ijob + 1 == len(x_sample):
info("Running the last sample as a sub-process")
process = subprocess.Popen(
f"{platform.python} -m pycif {yml_file}".split(),
cwd=base_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = process.communicate()
else:
info("Submitting Monte Carlo sample inversion {} from {}"
.format(ijob + 1, len(x_sample)))
job_id = platform.submit_job(
"{} -m pycif {}".format(platform.python, yml_file),
job_file
)
list_jobs.append(job_id)
ijob += 1
# Check that jobs are over
while not platform.check_jobs(list_jobs):
time.sleep(platform.sleep_time)
# Discard samples that did not converge correctly
samples2discard = []
for ijob in range(len(x_sample)):
if not montecarlo.compute_reference and ijob == 0:
debug(
"Skipping reference member (#0)"
)
samples2discard.append(ijob)
ijob += 1
continue
# First check that the sample exited with no error
base_dir = "{}/ensemble/sample_{:04d}/".format(workdir, ijob)
logfile = self.from_yaml(
self.reference_instances["reference_setup"].def_file
)['logfile']
endstr = open(f"{base_dir}/{logfile}", "r").readlines()[-50:]
if endstr[-21] != 'Thanks for using the Community Inversion Framework!\n':
raise Exception(
f"The sample #{ijob} terminated with an Exception. \n"
f"Please check error in {base_dir}")
# Read the cost function values
file_simulator = "{}/simulator/cost.txt".format(base_dir)
costs = pd.read_csv(file_simulator, header=None, index_col=0)
# Check that the sample finished properly
if -2 not in costs.index:
raise Exception(
f"WARNING! Sample #{ijob:04d} did not converge properly. \n"
f"Please check the sample simulation in {base_dir}. \n"
)
if costs.loc[-1, 3] < costs.loc[-2, 3]:
info("Sample #{} did not converge correctly. Discarding it"
.format(ijob))
samples2discard.append(ijob)
if ijob == 0:
raise Exception("The reference un-perturbed sample did not "
"converge. Please check your configuration")
# Move controlvects
h_dir = "{}/ensemble/xa_emsemble/".format(workdir)
path.init_dir(h_dir)
target_index = 0
for ijob in range(len(x_sample)):
if ijob in samples2discard:
continue
base_dir = "{}/ensemble/sample_{:04d}/".format(workdir, ijob)
os.system(
"mv {basedir}/controlvect_final.pickle "
"{}/controlvect_final_{:04d}.pickle".format(
h_dir, target_index, basedir=base_dir))
target_index += 1
# Re-set xb to initial value
controlvect.xb = copy.deepcopy(controlvect.xb_ref)
# Returning the number of valid samples
return len(x_sample) - len(samples2discard) - 1