persistent_sampling

Persistent generator providing points using sampling

persistent_sampling.persistent_uniform(_, persis_info, gen_specs, libE_info)

This generation function always enters into persistent mode and returns gen_specs["initial_batch_size"] uniformly sampled points the first time it is called. Afterwards, it returns the number of points given. This can be used in either a batch or asynchronous mode by adjusting the allocation function.

persistent_sampling.persistent_uniform_final_update(_, persis_info, gen_specs, libE_info)

Assuming the value "f" returned from sim_f is stochastic, this generation function updates an estimated mean "f_est" of the sim_f output at each of the corners of the domain.

persistent_sampling.persistent_request_shutdown(_, persis_info, gen_specs, libE_info)

This generation function is similar in structure to persistent_uniform, but uses a count to test exiting on a threshold value. This principle can be used with a supporting allocation function (e.g. start_only_persistent) to shut down an ensemble when a condition is met.

persistent_sampling.uniform_nonblocking(_, persis_info, gen_specs, libE_info)

This generation function is designed to test non-blocking receives.

persistent_sampling.batched_history_matching(_, persis_info, gen_specs, libE_info)

Given: (1) sim_f with an input of x with len(x)=n; (2) b, the batch size of points to generate; (3) q<b, the number of best samples to use in the following iteration.

Pseudocode: Let (mu, Sigma) denote a mean and covariance matrix initialized to the origin and the identity, respectively.

While true (batch synchronous for now):

(1) Draw b samples x_1, … , x_b from MVN(mu, Sigma). (2) Evaluate f(x_1), … , f(x_b) and determine the set of q x_i whose f(x_i) values are smallest (breaking ties lexicographically). (3) Update (mu, Sigma) based on the sample mean and sample covariance of these q x values.

persistent_sampling.persistent_uniform_with_cancellations(_, persis_info, gen_specs, libE_info)
persistent_sampling.py
  1"""Persistent generator providing points using sampling"""
  2
  3import numpy as np
  4
  5from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
  6from libensemble.tools import get_rng
  7from libensemble.tools.persistent_support import PersistentSupport
  8
  9__all__ = [
 10    "persistent_uniform",
 11    "persistent_uniform_final_update",
 12    "persistent_request_shutdown",
 13    "uniform_nonblocking",
 14    "batched_history_matching",
 15    "persistent_uniform_with_cancellations",
 16]
 17
 18
 19def _get_user_params(user_specs, gen_specs):
 20    """Extract user params"""
 21    b = gen_specs.get("initial_batch_size") or user_specs.get("initial_batch_size") or gen_specs.get("init_sample_size")
 22    ub = user_specs["ub"]
 23    lb = user_specs["lb"]
 24    n = len(lb)  # dimension
 25    assert isinstance(b, int), "Batch size must be an integer"
 26    assert isinstance(n, int), "Dimension must be an integer"
 27    assert isinstance(lb, np.ndarray), "lb must be a numpy array"
 28    assert isinstance(ub, np.ndarray), "ub must be a numpy array"
 29    return b, n, lb, ub
 30
 31
 32def persistent_uniform(_, persis_info, gen_specs, libE_info):
 33    """
 34    This generation function always enters into persistent mode and returns
 35    ``gen_specs["initial_batch_size"]`` uniformly sampled points the first time it
 36    is called. Afterwards, it returns the number of points given. This can be
 37    used in either a batch or asynchronous mode by adjusting the allocation
 38    function.
 39
 40    .. seealso::
 41        `test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py>`_
 42        `test_persistent_uniform_sampling_async.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_async.py>`_
 43    """  # noqa
 44    rng = get_rng(gen_specs, libE_info)
 45
 46    b, n, lb, ub = _get_user_params(gen_specs.get("user", {}), gen_specs)
 47    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
 48
 49    # Send batches until manager sends stop tag
 50    tag = None
 51    while tag not in [STOP_TAG, PERSIS_STOP]:
 52        H_o = np.zeros(b, dtype=gen_specs["out"])
 53        H_o["x"] = rng.uniform(lb, ub, (b, n))
 54        if "obj_component" in H_o.dtype.fields:
 55            H_o["obj_component"] = rng.integers(low=0, high=gen_specs["user"]["num_components"], size=b)
 56        tag, Work, calc_in = ps.send_recv(H_o)
 57        if hasattr(calc_in, "__len__"):
 58            b = len(calc_in)
 59
 60    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG
 61
 62
 63def persistent_uniform_final_update(_, persis_info, gen_specs, libE_info):
 64    """
 65    Assuming the value ``"f"`` returned from sim_f is stochastic, this
 66    generation is updating an estimated mean ``"f_est"`` of the sim_f output at
 67    each of the corners of the domain.
 68
 69    .. seealso::
 70        `test_persistent_uniform_sampling_running_mean.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_running_mean.py>`_
 71    """  # noqa
 72
 73    b, n, lb, ub = _get_user_params(gen_specs.get("user", {}), gen_specs)
 74    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
 75
 76    def generate_corners(x, y):
 77        n = len(x)
 78        corner_indices = np.arange(2**n)
 79        corners = []
 80        for index in corner_indices:
 81            corner = [x[i] if index & (1 << i) else y[i] for i in range(n)]
 82            corners.append(corner)
 83        return corners
 84
 85    def sample_corners_with_probability(corners, p, b):
 86        selected_corners = np.random.choice(len(corners), size=b, p=p)
 87        sampled_corners = [corners[i] for i in selected_corners]
 88        return sampled_corners, selected_corners
 89
 90    corners = generate_corners(lb, ub)
 91
 92    # Start with equal probabilities
 93    p = np.ones(2**n) / 2**n
 94
 95    running_total = np.nan * np.ones(2**n)
 96    number_of_samples = np.zeros(2**n)
 97    sent = np.array([], dtype=int)
 98
 99    # Send batches of `b` points until manager sends stop tag
100    tag = None
101    next_id = 0
102    while tag not in [STOP_TAG, PERSIS_STOP]:
103        H_o = np.zeros(b, dtype=gen_specs["out"])
104        H_o["sim_id"] = range(next_id, next_id + b)
105        next_id += b
106
107        sampled_corners, corner_ids = sample_corners_with_probability(corners, p, b)
108
109        H_o["corner_id"] = corner_ids
110        H_o["x"] = sampled_corners
111        sent = np.append(sent, corner_ids)
112
113        tag, Work, calc_in = ps.send_recv(H_o)
114        if hasattr(calc_in, "__len__"):
115            b = len(calc_in)
116            for row in calc_in:
117                number_of_samples[row["corner_id"]] += 1
118                if np.isnan(running_total[row["corner_id"]]):
119                    running_total[row["corner_id"]] = row["f"]
120                else:
121                    running_total[row["corner_id"]] += row["f"]
122
123    # Having received a PERSIS_STOP, update f_est field for all points and return
124    f_est = running_total / number_of_samples
125    H_o = np.zeros(len(sent), dtype=[("sim_id", int), ("corner_id", int), ("f_est", float)])
126    for count, i in enumerate(sent):
127        H_o["sim_id"][count] = count
128        H_o["corner_id"][count] = i
129        H_o["f_est"][count] = f_est[i]
130
131    return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG
132
133
def persistent_request_shutdown(_, persis_info, gen_specs, libE_info):
    """
    Uniform sampler that ends itself once a point count is reached.

    Structured like persistent_uniform, but it also keeps a running count that
    grows by the current batch size after every receive. When that count
    reaches ``gen_specs["user"]["shutdown_limit"]`` the generator leaves its
    loop on its own. Paired with a supporting allocation function
    (e.g. start_only_persistent), this can shut down an ensemble when a
    condition is met.

    Returns ``(None, persis_info, FINISHED_PERSISTENT_GEN_TAG)``.

    .. seealso::
        `test_persistent_uniform_gen_decides_stop.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py>`_
    """  # noqa
    rng = get_rng(gen_specs, libE_info)
    batch_size, dim, lower, upper = _get_user_params(gen_specs.get("user", {}), gen_specs)
    shutdown_limit = gen_specs["user"]["shutdown_limit"]
    comm = PersistentSupport(libE_info, EVAL_GEN_TAG)

    stop_tags = (STOP_TAG, PERSIS_STOP)
    points_counted = 0
    tag = None

    # Send batches until the manager sends a stop tag or the limit is hit
    while tag not in stop_tags:
        batch = np.zeros(batch_size, dtype=gen_specs["out"])
        batch["x"] = rng.uniform(lower, upper, (batch_size, dim))
        tag, _work, returned = comm.send_recv(batch)

        if hasattr(returned, "__len__"):
            batch_size = len(returned)

        points_counted += batch_size
        if points_counted >= shutdown_limit:
            print("Reached threshold.", points_counted, flush=True)
            break  # The generator decides to finish

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG
164
165
def uniform_nonblocking(_, persis_info, gen_specs, libE_info):
    """
    Uniform sampler written to exercise non-blocking receives.

    Each batch is sent, then the generator polls with ``recv(blocking=False)``
    until a tag arrives. The number of empty polls for the most recent batch is
    stored in ``persis_info["spin_count"]``.

    Returns ``(None, persis_info, FINISHED_PERSISTENT_GEN_TAG)``.

    .. seealso::
        `test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py>`_
    """  # noqa
    rng = get_rng(gen_specs, libE_info)
    batch_size, dim, lower, upper = _get_user_params(gen_specs.get("user", {}), gen_specs)
    comm = PersistentSupport(libE_info, EVAL_GEN_TAG)

    stop_tags = (STOP_TAG, PERSIS_STOP)
    tag = None

    # Send batches until the manager sends a stop tag
    while tag not in stop_tags:
        batch = np.zeros(batch_size, dtype=gen_specs["out"])
        batch["x"] = rng.uniform(lower, upper, (batch_size, dim))
        comm.send(batch)

        # Poll without blocking, counting the attempts that returned no tag
        empty_polls = 0
        while True:
            tag, _work, returned = comm.recv(blocking=False)
            if tag is not None:
                break
            empty_polls += 1

        persis_info["spin_count"] = empty_polls

        if hasattr(returned, "__len__"):
            batch_size = len(returned)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG
199
200
def batched_history_matching(_, persis_info, gen_specs, libE_info):
    """
    Batched sampling from a multivariate normal refitted to the best points.

    Inputs:
    - sim_f taking an input x with len(x)=n
    - b, the number of points generated per batch
    - q<b, the number of best samples carried into the next iteration
      (``gen_specs["user"]["num_best_vals"]``)

    Procedure:
    Start with a mean mu at the origin and a covariance Sigma equal to the
    identity.

    Repeat until a stop tag arrives (batch synchronous for now):

        Draw b samples x_1, ... , x_b from MVN(mu, Sigma)
        Evaluate f(x_1), ... , f(x_b) and pick the q x_i with the smallest f(x_i), as ordered by ``np.argsort``
        Replace (mu, Sigma) with the sample mean and sample covariance of those q x values.

    Returns ``(None, persis_info, FINISHED_PERSISTENT_GEN_TAG)``.

    .. seealso::
        `test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py>`_
    """  # noqa
    rng = get_rng(gen_specs, libE_info)
    user = gen_specs["user"]

    dim = len(user["lb"])
    batch_size = (
        gen_specs.get("initial_batch_size") or user.get("initial_batch_size") or gen_specs.get("init_sample_size")
    )
    num_best = user["num_best_vals"]
    comm = PersistentSupport(libE_info, EVAL_GEN_TAG)

    mean = np.zeros(dim)
    covariance = np.eye(dim)
    stop_tags = (STOP_TAG, PERSIS_STOP)
    tag = None

    while tag not in stop_tags:
        batch = np.zeros(batch_size, dtype=gen_specs["out"])
        batch["x"] = rng.multivariate_normal(mean, covariance, batch_size)

        # Hand the batch over and wait for its results
        tag, _work, returned = comm.send_recv(batch)
        if returned is None:
            continue

        # Refit the distribution to the points with the smallest "f"
        ranking = np.argsort(returned["f"])
        elite = batch["x"][ranking[:num_best]]
        mean = np.mean(elite, axis=0)
        covariance = np.cov(elite.T)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG
250
251
def persistent_uniform_with_cancellations(_, persis_info, gen_specs, libE_info):
    """
    Uniform sampler that also requests cancellation of previously issued sim ids.

    Points are drawn uniformly between ``gen_specs["user"]["lb"]`` and
    ``gen_specs["user"]["ub"]``. Each time a sized result comes back, a
    cancellation request is issued for as many consecutive sim ids as points
    were returned. The first cancelled id is half the initial batch size, and
    the range advances with every request.

    Returns ``(None, persis_info, FINISHED_PERSISTENT_GEN_TAG)`` once a
    ``STOP_TAG`` or ``PERSIS_STOP`` tag is received.
    """
    rng = get_rng(gen_specs, libE_info)
    user = gen_specs["user"]
    upper = user["ub"]
    lower = user["lb"]
    dim = len(lower)
    batch_size = (
        gen_specs.get("initial_batch_size") or user.get("initial_batch_size") or gen_specs.get("init_sample_size")
    )

    # Cancellations begin at the midpoint of the initial batch,
    # so at least that many points should come back
    cancel_from = batch_size // 2

    comm = PersistentSupport(libE_info, EVAL_GEN_TAG)
    stop_tags = (STOP_TAG, PERSIS_STOP)

    # Send batches until the manager sends a stop tag
    tag = None
    while tag not in stop_tags:
        batch = np.zeros(batch_size, dtype=gen_specs["out"])
        batch["x"] = rng.uniform(lower, upper, (batch_size, dim))
        tag, _work, returned = comm.send_recv(batch)

        if not hasattr(returned, "__len__"):
            continue

        batch_size = len(returned)

        # Ask to cancel exactly as many ids as points that came back
        cancel_until = cancel_from + batch_size
        cancel_ids = list(range(cancel_from, cancel_until))
        cancel_from = cancel_until
        comm.request_cancel_sim_ids(cancel_ids)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG