Surrogate Modeling with gpCAM

This example uses gpCAM to construct a global surrogate of f values using a Gaussian process.

In each iteration, a batch of points is produced for concurrent evaluation, maximizing uncertainty reduction.

Open in Colab

Ensure that libEnsemble, and gpCAM are installed via: pip install libensemble gpcam

Generator function

The gpCAM generator function is called persistent_gpCAM.

This persistent generator is started at the beginning of the Ensemble and runs until the Ensemble closes down.

This version (and others) of the gpCAM generator can be found at libensemble/gen_funcs/persistent_gpCAM.py and can be imported from that location when libEnsemble is installed as follows:

from libensemble.gen_funcs.persistent_gpCAM import persistent_gpCAM

import numpy as np
from numpy.lib.recfunctions import repack_fields
from gpcam import GPOptimizer as GP

# Standard options for manager/generator comms
from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
from libensemble.tools.persistent_support import PersistentSupport

def persistent_gpCAM(H_in, persis_info, gen_specs, libE_info):
    """Run a batched gpCAM model to create a surrogate"""

    # Initialize
    rng, batch_size, n, lb, ub, x_new, y_new, ps = _initialize_gpcAM(gen_specs["user"], libE_info)
    ask_max_iter = gen_specs["user"].get("ask_max_iter") or 10
    test_points = _read_testpoints(gen_specs["user"])
    noise = 1e-8  # Initializes noise
    my_gp = None

    # Start with a batch of random points
    x_new = rng.uniform(lb, ub, (batch_size, n))
    H_o = np.zeros(batch_size, dtype=gen_specs["out"])
    H_o["x"] = x_new
    tag, Work, calc_in = ps.send_recv(H_o)  # Send random points for evaluation and wait for results

    while tag not in [STOP_TAG, PERSIS_STOP]:
        y_new = np.atleast_2d(calc_in["f"]).T
        my_gp = _update_gp(my_gp, x_new, y_new, test_points, persis_info, noise)

        # Request new points
        x_new = my_gp.ask(
            input_set=np.column_stack((lb, ub)),
            n=batch_size,
            pop_size=batch_size,
            acquisition_function="total correlation",
            max_iter=ask_max_iter,  # Larger takes longer. gpCAM default is 20.
        )["x"]

        H_o = np.zeros(batch_size, dtype=gen_specs["out"])
        H_o["x"] = x_new
        tag, Work, calc_in = ps.send_recv(H_o)  # Send points for evaluation and wait for results

    # If final points were returned update the model
    if calc_in is not None:
        y_new = np.atleast_2d(calc_in["f"]).T
        my_gp = _update_gp(my_gp, x_new, y_new, test_points, persis_info, noise)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG

Common acquisition functions include:

Uncertainty reduction:

  • “variance” (default): The optimizer will produce N best points.

  • “total correlation”: More expensive but points produced are self-avoiding.

Bayesian optimization:

These produce one point at a time unless using the HGDL option.

  • “ucb” / “lcb”: Upper/Lower Confidence Bound.

  • “expected improvement”: Expected Improvement.

For more options see: https://gpcam.lbl.gov/examples/acquisition-functions


The following code adds the functions used by persistent_gpCAM.

_update_gp is where the GP is fed the data and trained.

def _initialize_gpcAM(user_specs, libE_info):
    """Extract user params"""
    rng_seed = user_specs.get("rng_seed")  # will default to None
    rng = np.random.default_rng(rng_seed)  # Create random stream
    b = user_specs["batch_size"]
    lb = np.array(user_specs["lb"])
    ub = np.array(user_specs["ub"])
    n = len(lb)  # no. of dimensions
    init_x = np.empty((0, n))
    init_y = np.empty((0, 1))
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)  # init comms
    return rng, b, n, lb, ub, init_x, init_y, ps


def _read_testpoints(U):
    """Read numpy file containing evaluated points for measuring GP error"""
    test_points_file = U.get("test_points_file")
    if test_points_file is None:
        return None
    test_points = np.load(test_points_file)
    test_points = repack_fields(test_points[["x", "f"]])
    return test_points


def _compare_testpoints(my_gp, test_points, persis_info):
    """Compare model at test points"""
    if test_points is None:
        return
    f_est = my_gp.posterior_mean(test_points["x"])["f(x)"]
    mse = np.mean((f_est - test_points["f"]) ** 2)
    persis_info.setdefault("mean_squared_error", []).append(float(mse))


def _update_gp(my_gp, x_new, y_new, test_points, persis_info, noise):
    """Update Gaussian process with new points and train"""
    noise_arr = noise * np.ones(len(y_new))  # Initializes noise
    if my_gp is None:
        my_gp = GP(x_new, y_new.flatten(), noise_variances=noise_arr)
    else:
        my_gp.tell(x_new, y_new.flatten(), noise_variances=noise_arr, append=True)
    my_gp.train()

    if test_points is not None:
        _compare_testpoints(my_gp, test_points, persis_info)

    return my_gp

Simulator function

Simulator functions or sim_fs perform calculations based on parameters created in the generator function. Each worker runs a copy of this function in parallel.

The function here is the simple 2D six_hump_camel, for demonstration purposes.

For running applications using parallel resources in the simulator see the forces examples.

# Define our simulation function
import numpy as np

def six_hump_camel(H, persis_info, sim_specs, _):
    """Six-Hump Camel sim_f."""

    batch = len(H["x"])  # Num evaluations each sim_f call.
    H_o = np.zeros(batch, dtype=sim_specs["out"])  # Define output array H

    for i, x in enumerate(H["x"]):
        H_o["f"][i] = six_hump_camel_func(x)  # Function evaluations placed into H

    return H_o, persis_info


def six_hump_camel_func(x):
    """Six-Hump Camel function definition"""
    x1 = x[0]
    x2 = x[1]
    term1 = (4 - 2.1 * x1**2 + (x1**4) / 3) * x1**2
    term2 = x1 * x2
    term3 = (-4 + 4 * x2**2) * x2**2

    return term1 + term2 + term3

Calling Script

Our calling script configures libEnsemble, the generator function, and the simulator function. It then create the ensemble object and runs the ensemble.

First we will create a cleanup script so we can easily re-run.

# To rerun this notebook, we need to delete the ensemble directory.
import shutil
def cleanup():
    try:
        shutil.rmtree("ensemble")
    except:
        pass

This calling script imports the Gen and Sim functions from the locations in the installed libensemble package. If you wish to make your own functions based on the above, those can be imported instead.

import numpy as np
from pprint import pprint

from libensemble import Ensemble
from libensemble.specs import LibeSpecs, GenSpecs, SimSpecs, AllocSpecs, ExitCriteria

# If importing from libensemble
from libensemble.gen_funcs.persistent_gpCAM import persistent_gpCAM
from libensemble.sim_funcs.six_hump_camel import six_hump_camel

from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens
import warnings

warnings.filterwarnings("ignore", message="Default hyperparameter_bounds")
warnings.filterwarnings("ignore", message="Hyperparameters initialized")

nworkers = 4

# When using gen_on_manager, nworkers is number of concurrent sims.
# final_gen_send means the last evaluated points are returned to the generator to update the model.
libE_specs = LibeSpecs(nworkers=nworkers, gen_on_manager=True, final_gen_send=True)

n = 2  # Input dimensions
batch_size = 4
num_batches = 6

gen_specs = GenSpecs(
    gen_f=persistent_gpCAM,        # Generator function
    persis_in=["f"],               # Objective, defined in sim, is returned to gen
    outputs=[("x", float, (n,))],  # Parameters (name, type, size)
    user={
        "batch_size": batch_size,
        "lb": np.array([-2, -1]),  # lower boundaries for n dimensions
        "ub": np.array([2, 1]),    # upper boundaries for n dimensions
        "ask_max_iter": 5,         # Number of iterations for ask (default 20)
        "rng_seed": 0,
    },
)

sim_specs = SimSpecs(
    sim_f=six_hump_camel,      # Simulator function
    inputs=["x"],              # Input field names. "x" defined in gen
    outputs=[("f", float)],    # Objective
)

# Starts one persistent generator. Simulated values are returned in batch.
alloc_specs = AllocSpecs(
    alloc_f=only_persistent_gens,
    user={"async_return": False},  # False = batch returns
)

exit_criteria = ExitCriteria(sim_max=num_batches*batch_size)

# Initialize and run the ensemble.
ensemble = Ensemble(
    libE_specs=libE_specs,
    sim_specs=sim_specs,
    gen_specs=gen_specs,
    alloc_specs=alloc_specs,
    exit_criteria=exit_criteria,
)

At the end of our calling script we run the ensemble.

# To ensure re-running works - clean output and reset any persistent information
cleanup()
ensemble.persis_info = {}

H, persis_info, flag = ensemble.run()  # Start the ensemble. Blocks until completion.
ensemble.save_output("H_array", append_attrs=False)  # Save H (history of all evaluated points) to file
pprint(H[["sim_id", "x", "f"]][:16]) # See first 16 results

Rerun and test model at known points

To see how the accuracy of the surrogate model improves, we can use previously evaluated points as test points and run again with a different seed.

ensemble.gen_specs.user["rng_seed"] = 123
ensemble.gen_specs.user["test_points_file"] = "H_array.npy"  # our previous file

# To ensure re-running works - clean output and reset any persistent information
cleanup()
ensemble.persis_info = {}

H, persis_info, flag = ensemble.run()
print(persis_info)

Viewing model progression

Now we can check how our model’s values compared against the values at known test points as the ensemble progresses. The comparison is based on the mean squared error between the gpCAM model and our known values at the test points.

import matplotlib
import matplotlib.pyplot as plt

# Get "mean_squared_error" from generators return (worker 0 as we ran gen_on_manager)
mse = persis_info[0]["mean_squared_error"]
niter = len(mse)
num_sims = list(range(batch_size, (niter * batch_size) + 1, batch_size))

# Plotting the data
markersize = 10
plt.figure(figsize=(10, 5))
plt.plot(
    num_sims, mse, marker="^", markeredgecolor="black", markeredgewidth=2,
    markersize=markersize, linewidth=2, label="Mean squared error"
)
plt.xticks(num_sims)

# Labeling the axes and the legend
plt.title('Mean Squared Error at test points')
plt.xlabel("Number of simulations")
plt.ylabel('Mean squared error (rad$^2$)')
legend = plt.legend(framealpha=1, edgecolor="black")  # Increase edge width here
plt.grid(True)
plt.show()

The plot should look similar to the following.

Note

The graph may differ between runs because, although we seed libEnsemble’s random number generator, gpCAM introduces some randomness when initializing hyperparameters.

gpcam_model_improvement