persistent_sampling

Persistent generator providing points using sampling

persistent_sampling.persistent_uniform(_, persis_info, gen_specs, libE_info)

Persistent Input Fields: ['sim_id']

Output Datatypes: [("x", float, (2,))]

This generation function always enters into persistent mode and returns gen_specs["user"]["initial_batch_size"] uniformly sampled points the first time it is called. Afterwards, it returns as many points as it received evaluated results for. It can be used in either batch or asynchronous mode by adjusting the allocation function.
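
For orientation, a minimal calling-script sketch follows; the batch size, bounds, and persis_in fields are illustrative assumptions, and only "initial_batch_size", "lb", and "ub" are required in gen_specs["user"].

import numpy as np

from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens
from libensemble.gen_funcs.persistent_sampling import persistent_uniform

n = 2  # problem dimension (the decorated default output is ("x", float, (2,)))
gen_specs = {
    "gen_f": persistent_uniform,
    "persis_in": ["f", "x", "sim_id"],  # history fields sent back to the persistent gen
    "out": [("x", float, (n,))],
    "user": {
        "initial_batch_size": 8,       # size of the first batch
        "lb": np.array([-3.0, -2.0]),  # lower bounds, length n
        "ub": np.array([3.0, 2.0]),    # upper bounds, length n
    },
}

# A persistent-generator allocation function; batch vs. asynchronous behavior is
# governed by its options and by how results are returned to the generator.
alloc_specs = {"alloc_f": only_persistent_gens}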

persistent_sampling.persistent_uniform_final_update(_, persis_info, gen_specs, libE_info)

Assuming the value "f" returned from sim_f is stochastic, this generator maintains a running estimate "f_est" of the mean sim_f output at each corner of the domain.
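
Because the averaged "f_est" values only come back in the generator's final return, the calling script must ask the manager to keep that return (see the comment in the source below). A sketch, with assumed batch size, bounds, and persis_in fields:

import numpy as np

from libensemble.gen_funcs.persistent_sampling import persistent_uniform_final_update

libE_specs = {"use_persis_return_gen": True}  # keep the gen's final H_o (the "f_est" records)

gen_specs = {
    "gen_f": persistent_uniform_final_update,
    "persis_in": ["f", "x", "sim_id", "corner_id"],  # the gen reads "f" and "corner_id" back
    "out": [("x", float, (2,)), ("sim_id", int), ("corner_id", int)],
    "user": {
        "initial_batch_size": 8,
        "lb": np.zeros(2),  # the 2**n corners are built from lb and ub
        "ub": np.ones(2),
    },
}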

persistent_sampling.persistent_request_shutdown(_, persis_info, gen_specs, libE_info)

This generation function is similar in structure to persistent_uniform, but uses a count of returned evaluations to test exiting on a threshold value. This principle can be used with a supporting allocation function (e.g. start_only_persistent) to shut down an ensemble when a condition is met.
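
A sketch of the extra "shutdown_limit" user setting this generator reads; the batch size, limit, bounds, and persis_in fields are placeholders:

import numpy as np

from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens
from libensemble.gen_funcs.persistent_sampling import persistent_request_shutdown

gen_specs = {
    "gen_f": persistent_request_shutdown,
    "persis_in": ["f", "x", "sim_id"],
    "out": [("x", float, (2,))],
    "user": {
        "initial_batch_size": 8,
        "shutdown_limit": 200,  # leave the persistent loop once this many evaluations have returned
        "lb": np.array([-1.0, -1.0]),
        "ub": np.array([1.0, 1.0]),
    },
}

# With a supporting allocation function, the generator's exit can then end the ensemble.
alloc_specs = {"alloc_f": only_persistent_gens}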

persistent_sampling.uniform_nonblocking(_, persis_info, gen_specs, libE_info)

This generation function is designed to test non-blocking receives.

persistent_sampling.batched_history_matching(_, persis_info, gen_specs, libE_info)

Given:
- sim_f with an input of x with len(x)=n
- b, the batch size of points to generate
- q < b, the number of best samples to use in the following iteration

Pseudocode: Let (mu, Sigma) denote a mean and covariance matrix initialized to the origin and the identity, respectively.

While true (batch synchronous for now):

Draw b samples x_1, … , x_b from MVN(mu, Sigma)
Evaluate f(x_1), … , f(x_b) and determine the set of q x_i whose f(x_i) values are smallest (breaking ties lexicographically)
Update (mu, Sigma) based on the sample mean and sample covariance of these q x values.
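
The generator takes its controls from gen_specs["user"]; a minimal sketch with placeholder values (the code reads only "lb", "initial_batch_size", and "num_best_vals", and uses "lb" only for its length):

import numpy as np

from libensemble.gen_funcs.persistent_sampling import batched_history_matching

n = 2
gen_specs = {
    "gen_f": batched_history_matching,
    "persis_in": ["f", "x", "sim_id"],  # returned "f" values are used to rank each batch
    "out": [("x", float, (n,))],
    "user": {
        "initial_batch_size": 20,  # b: points drawn from MVN(mu, Sigma) each iteration
        "num_best_vals": 5,        # q: best points used to refit (mu, Sigma)
        "lb": np.zeros(n),
    },
}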

persistent_sampling.persistent_uniform_with_cancellations(_, persis_info, gen_specs, libE_info)

Like persistent_uniform, but after each batch is returned it requests cancellation of a matching number of previously issued sim_ids (starting from half of the initial batch) via PersistentSupport.request_cancel_sim_ids.

persistent_sampling.py
"""Persistent generator providing points using sampling"""

import numpy as np

from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
from libensemble.specs import output_data, persistent_input_fields
from libensemble.tools.persistent_support import PersistentSupport

__all__ = [
    "persistent_uniform",
    "persistent_uniform_final_update",
    "persistent_request_shutdown",
    "uniform_nonblocking",
    "batched_history_matching",
    "persistent_uniform_with_cancellations",
]


def _get_user_params(user_specs):
    """Extract user params"""
    b = user_specs["initial_batch_size"]
    ub = user_specs["ub"]
    lb = user_specs["lb"]
    n = len(lb)  # dimension
    assert isinstance(b, int), "Batch size must be an integer"
    assert isinstance(n, int), "Dimension must be an integer"
    assert isinstance(lb, np.ndarray), "lb must be a numpy array"
    assert isinstance(ub, np.ndarray), "ub must be a numpy array"
    return b, n, lb, ub


@persistent_input_fields(["sim_id"])
@output_data([("x", float, (2,))])  # The dimension of 2 is a default and can be overwritten
def persistent_uniform(_, persis_info, gen_specs, libE_info):
    """
    This generation function always enters into persistent mode and returns
    ``gen_specs["user"]["initial_batch_size"]`` uniformly sampled points the first
    time it is called. Afterwards, it returns as many points as it received
    evaluated results for. This can be used in either batch or asynchronous
    mode by adjusting the allocation function.

    .. seealso::
        `test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py>`_
        `test_persistent_uniform_sampling_async.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_async.py>`_
    """  # noqa

    b, n, lb, ub = _get_user_params(gen_specs["user"])
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

    # Send batches until manager sends stop tag
    tag = None
    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["x"] = persis_info["rand_stream"].uniform(lb, ub, (b, n))
        if "obj_component" in H_o.dtype.fields:
            H_o["obj_component"] = persis_info["rand_stream"].integers(
                low=0, high=gen_specs["user"]["num_components"], size=b
            )
        tag, Work, calc_in = ps.send_recv(H_o)
        if hasattr(calc_in, "__len__"):
            b = len(calc_in)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG


def persistent_uniform_final_update(_, persis_info, gen_specs, libE_info):
    """
    Assuming the value ``"f"`` returned from sim_f is stochastic, this
    generator maintains a running estimate ``"f_est"`` of the mean sim_f
    output at each corner of the domain.

    .. seealso::
        `test_persistent_uniform_sampling_running_mean.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_running_mean.py>`_
    """  # noqa

    b, n, lb, ub = _get_user_params(gen_specs["user"])
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

    def generate_corners(x, y):
        n = len(x)
        corner_indices = np.arange(2**n)
        corners = []
        for index in corner_indices:
            corner = [x[i] if index & (1 << i) else y[i] for i in range(n)]
            corners.append(corner)
        return corners

    def sample_corners_with_probability(corners, p, b):
        selected_corners = np.random.choice(len(corners), size=b, p=p)
        sampled_corners = [corners[i] for i in selected_corners]
        return sampled_corners, selected_corners

    corners = generate_corners(lb, ub)

    # Start with equal probabilities
    p = np.ones(2**n) / 2**n

    running_total = np.nan * np.ones(2**n)
    number_of_samples = np.zeros(2**n)
    sent = np.array([], dtype=int)

    # Send batches of `b` points until manager sends stop tag
    tag = None
    next_id = 0
    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["sim_id"] = range(next_id, next_id + b)
        next_id += b

        sampled_corners, corner_ids = sample_corners_with_probability(corners, p, b)

        H_o["corner_id"] = corner_ids
        H_o["x"] = sampled_corners
        sent = np.append(sent, corner_ids)

        tag, Work, calc_in = ps.send_recv(H_o)
        if hasattr(calc_in, "__len__"):
            b = len(calc_in)
            for row in calc_in:
                number_of_samples[row["corner_id"]] += 1
                if np.isnan(running_total[row["corner_id"]]):
                    running_total[row["corner_id"]] = row["f"]
                else:
                    running_total[row["corner_id"]] += row["f"]

    # Having received a PERSIS_STOP, update f_est field for all points and return
    # For manager to honor final H_o return, must have set libE_specs["use_persis_return_gen"] = True
    f_est = running_total / number_of_samples
    H_o = np.zeros(len(sent), dtype=[("sim_id", int), ("corner_id", int), ("f_est", float)])
    for count, i in enumerate(sent):
        H_o["sim_id"][count] = count
        H_o["corner_id"][count] = i
        H_o["f_est"][count] = f_est[i]

    return H_o, persis_info, FINISHED_PERSISTENT_GEN_TAG


def persistent_request_shutdown(_, persis_info, gen_specs, libE_info):
    """
    This generation function is similar in structure to persistent_uniform,
    but uses a count of returned evaluations to test exiting on a threshold
    value. This principle can be used with a supporting allocation function
    (e.g. start_only_persistent) to shut down an ensemble when a condition is met.

    .. seealso::
        `test_persistent_uniform_gen_decides_stop.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py>`_
    """  # noqa
    b, n, lb, ub = _get_user_params(gen_specs["user"])
    shutdown_limit = gen_specs["user"]["shutdown_limit"]
    f_count = 0
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

    # Send batches until manager sends stop tag
    tag = None
    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["x"] = persis_info["rand_stream"].uniform(lb, ub, (b, n))
        tag, Work, calc_in = ps.send_recv(H_o)
        if hasattr(calc_in, "__len__"):
            b = len(calc_in)
        f_count += b
        if f_count >= shutdown_limit:
            print("Reached threshold.", f_count, flush=True)
            break  # End the persistent gen

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG


def uniform_nonblocking(_, persis_info, gen_specs, libE_info):
    """
    This generation function is designed to test non-blocking receives.

    .. seealso::
        `test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py>`_
    """  # noqa
    b, n, lb, ub = _get_user_params(gen_specs["user"])
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

    # Send batches until manager sends stop tag
    tag = None
    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["x"] = persis_info["rand_stream"].uniform(lb, ub, (b, n))
        ps.send(H_o)

        received = False
        spin_count = 0
        while not received:
            tag, Work, calc_in = ps.recv(blocking=False)
            if tag is not None:
                received = True
            else:
                spin_count += 1

        persis_info["spin_count"] = spin_count

        if hasattr(calc_in, "__len__"):
            b = len(calc_in)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG


def batched_history_matching(_, persis_info, gen_specs, libE_info):
    """
    Given
    - sim_f with an input of x with len(x)=n
    - b, the batch size of points to generate
    - q<b, the number of best samples to use in the following iteration

    Pseudocode:
    Let (mu, Sigma) denote a mean and covariance matrix initialized to the
    origin and the identity, respectively.

    While true (batch synchronous for now):

        Draw b samples x_1, ... , x_b from MVN( mu, Sigma)
        Evaluate f(x_1), ... , f(x_b) and determine the set of q x_i whose f(x_i) values are smallest (breaking ties lexicographically)
        Update (mu, Sigma) based on the sample mean and sample covariance of these q x values.

    .. seealso::
        `test_persistent_uniform_sampling.py <https://github.com/Libensemble/libensemble/blob/develop/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py>`_
    """  # noqa
    lb = gen_specs["user"]["lb"]

    n = len(lb)
    b = gen_specs["user"]["initial_batch_size"]
    q = gen_specs["user"]["num_best_vals"]
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

    mu = np.zeros(n)
    Sigma = np.eye(n)
    tag = None

    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["x"] = persis_info["rand_stream"].multivariate_normal(mu, Sigma, b)

        # Send data and get next assignment
        tag, Work, calc_in = ps.send_recv(H_o)
        if calc_in is not None:
            all_inds = np.argsort(calc_in["f"])
            best_inds = all_inds[:q]
            mu = np.mean(H_o["x"][best_inds], axis=0)
            Sigma = np.cov(H_o["x"][best_inds].T)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG


def persistent_uniform_with_cancellations(_, persis_info, gen_specs, libE_info):
    """
    Like persistent_uniform, but after each batch is returned, requests
    cancellation of a matching number of previously issued sim_ids (starting
    from half of the initial batch) via ``request_cancel_sim_ids``.
    """
    ub = gen_specs["user"]["ub"]
    lb = gen_specs["user"]["lb"]
    n = len(lb)
    b = gen_specs["user"]["initial_batch_size"]

    # Start cancelling points from half initial batch onward
    cancel_from = b // 2  # Should get at least this many points back

    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

    # Send batches until manager sends stop tag
    tag = None
    while tag not in [STOP_TAG, PERSIS_STOP]:
        H_o = np.zeros(b, dtype=gen_specs["out"])
        H_o["x"] = persis_info["rand_stream"].uniform(lb, ub, (b, n))
        tag, Work, calc_in = ps.send_recv(H_o)

        if hasattr(calc_in, "__len__"):
            b = len(calc_in)

            # Cancel as many points as got back
            cancel_ids = list(range(cancel_from, cancel_from + b))
            cancel_from += b
            ps.request_cancel_sim_ids(cancel_ids)

    return None, persis_info, FINISHED_PERSISTENT_GEN_TAG