persistent_tasmanian
Required: Tasmanian, pypackaging, scikit-build
Note that Tasmanian can be pip installed, but currently must use either venv or –user install.
E.g: pip install scikit-build packaging Tasmanian --user
A persistent generator using the uncertainty quantification capabilities in Tasmanian.
- persistent_tasmanian.lex_le(x, y, tol=1e-12)
Returns True if x <= y lexicographically up to some tolerance.
- persistent_tasmanian.get_2D_insert_indices(x, y, x_ord=array([], dtype=int64), y_ord=array([], dtype=int64), tol=1e-12)
Finds the row indices in a 2D numpy array x for which the sorted values of y can be inserted into. If x_ord (resp. y_ord) is empty, then x (resp. y) must be lexicographically sorted. Otherwise, x[x_ord] (resp. y[y_ord]) must be lexicographically sorted. Complexity is O(x.shape[0] + y.shape[0]).
- persistent_tasmanian.get_2D_duplicate_indices(x, y, x_ord=array([], dtype=int64), y_ord=array([], dtype=int64), tol=1e-12)
Finds the row indices of a 2D numpy array x which overlap with y. If x_ord (resp. y_ord) is empty, then x (resp. y) must be lexicographically sorted. Otherwise, x[x_ord] (resp. y[y_ord]) must be lexicographically sorted.Complexity is O(x.shape[0] + y.shape[0]).
- persistent_tasmanian.get_state(queued_pts, queued_ids, id_offset, new_points=array([], dtype=float64), completed_points=array([], dtype=float64), tol=1e-12)
Creates the data to be sent and updates the state arrays and scalars if new information (new_points or completed_points) arrives. Ensures that the output state arrays remain sorted if the input state arrays are already sorted.
- persistent_tasmanian.get_H0(gen_specs, refined_pts, refined_ord, queued_pts, queued_ids, tol=1e-12)
For runs following the first one, get the history array H0 based on the ordering in refined_pts
- persistent_tasmanian.sparse_grid_batched(H, persis_info, gen_specs, libE_info)
Implements batched construction for a Tasmanian sparse grid, using the loop described in Tasmanian Example 09: sparse grid example
- persistent_tasmanian.sparse_grid_async(H, persis_info, gen_specs, libE_info)
Implements asynchronous construction for a Tasmanian sparse grid, using the logic in the dynamic Tasmanian model construction function: sparse grid dynamic example
- persistent_tasmanian.get_sparse_grid_specs(user_specs, sim_f, num_dims, num_outputs=1, mode='batched')
Helper function that generates the simulator, generator, and allocator specs as well as the persis_info dictionary to ensure that they are compatible with the custom generators in this script. The outputs should be used in the main libE() call.
- INPUTS:
- user_specs (dict)a dictionary of user specs that is needed in the generator specs;
expects the key “tasmanian_init” whose value is a 0-argument lambda that initializes an appropriate Tasmanian sparse grid object.
- sim_f (func)a lambda function that takes in generator outputs (simulator inputs)
and returns simulator outputs.
num_dims (int) : number of model inputs.
num_outputs (int) : number of model outputs.
mode (string) : can either be “batched” or “async”.
- OUTPUTS:
sim_specs (dict) : a dictionary of simulation specs and also one of the inputs of libE().
gen_specs (dict) : a dictionary of generator specs and also one of the inputs of libE().
alloc_specs (dict) : a dictionary of allocation specs and also one of the inputs of libE().
- persis_info (dict)a dictionary containing common information that is passed to all
workers and also one of the inputs of libE().
persistent_tasmanian.py
1"""
2A persistent generator using the uncertainty quantification capabilities in
3`Tasmanian <https://tasmanian.ornl.gov/>`_.
4"""
5
6import numpy as np
7
8from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as allocf
9from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
10from libensemble.tools import parse_args
11from libensemble.tools.persistent_support import PersistentSupport
12
13
14def lex_le(x, y, tol=1e-12):
15 """
16 Returns True if x <= y lexicographically up to some tolerance.
17 """
18 cmp = np.fabs(x - y) > tol
19 ind = np.argmax(cmp)
20 if not cmp[ind]:
21 return True
22 return x[ind] <= y[ind]
23
24
25def get_2D_insert_indices(x, y, x_ord=np.empty(0, dtype="int"), y_ord=np.empty(0, dtype="int"), tol=1e-12):
26 """
27 Finds the row indices in a 2D numpy array `x` for which the sorted values of `y` can be inserted
28 into. If `x_ord` (resp. `y_ord`) is empty, then `x` (resp. `y`) must be lexicographically
29 sorted. Otherwise, `x[x_ord]` (resp. `y[y_ord]`) must be lexicographically sorted. Complexity is
30 O(x.shape[0] + y.shape[0]).
31 """
32 assert len(x.shape) == 2
33 assert len(y.shape) == 2
34 if x.size == 0:
35 return np.zeros(y.shape[0], dtype="int")
36 else:
37 if x_ord.size == 0:
38 x_ord = np.arange(x.shape[0], dtype="int")
39 if y_ord.size == 0:
40 y_ord = np.arange(y.shape[0], dtype="int")
41 x_ptr = 0
42 y_ptr = 0
43 out_ord = np.empty(0, dtype="int")
44 while y_ptr < y.shape[0]:
45 # The case where y[k] <= max of x[k:end, :]
46 xk = x[x_ord[x_ptr], :]
47 yk = y[y_ord[y_ptr], :]
48 if lex_le(yk, xk, tol=tol):
49 out_ord = np.append(out_ord, x_ord[x_ptr])
50 y_ptr += 1
51 else:
52 x_ptr += 1
53 # The edge case where y[k] is the largest of all elements of x.
54 if x_ptr >= x_ord.shape[0]:
55 for i in range(y_ptr, y_ord.shape[0], 1):
56 out_ord = np.append(out_ord, x_ord.shape[0])
57 y_ptr += 1
58 break
59 return out_ord
60
61
62def get_2D_duplicate_indices(x, y, x_ord=np.empty(0, dtype="int"), y_ord=np.empty(0, dtype="int"), tol=1e-12):
63 """
64 Finds the row indices of a 2D numpy array `x` which overlap with `y`. If `x_ord` (resp. `y_ord`)
65 is empty, then `x` (resp. `y`) must be lexicographically sorted. Otherwise, `x[x_ord]` (resp.
66 `y[y_ord]`) must be lexicographically sorted.Complexity is O(x.shape[0] + y.shape[0]).
67 """
68 assert len(x.shape) == 2
69 assert len(y.shape) == 2
70 if x.size == 0:
71 return np.empty(0, dtype="int")
72 else:
73 if x_ord.size == 0:
74 x_ord = np.arange(x.shape[0], dtype="int")
75 if y_ord.size == 0:
76 y_ord = np.arange(y.shape[0], dtype="int")
77 x_ptr = 0
78 y_ptr = 0
79 out_ord = np.empty(0, dtype="int")
80 while y_ptr < y.shape[0] and x_ptr < x.shape[0]:
81 # The case where y[k] <= max of x[k:end, :]
82 xk = x[x_ord[x_ptr], :]
83 yk = y[y_ord[y_ptr], :]
84 if all(np.fabs(yk - xk) <= tol):
85 out_ord = np.append(out_ord, x_ord[x_ptr])
86 x_ptr += 1
87 elif lex_le(xk, yk, tol=tol):
88 x_ptr += 1
89 else:
90 y_ptr += 1
91 return out_ord
92
93
94def get_state(queued_pts, queued_ids, id_offset, new_points=np.array([]), completed_points=np.array([]), tol=1e-12):
95 """
96 Creates the data to be sent and updates the state arrays and scalars if new information
97 (new_points or completed_points) arrives. Ensures that the output state arrays remain sorted if
98 the input state arrays are already sorted.
99 """
100 if new_points.size > 0:
101 new_points_ord = np.lexsort(np.rot90(new_points))
102 new_points_ids = id_offset + np.arange(new_points.shape[0])
103 id_offset += new_points.shape[0]
104 insert_idx = get_2D_insert_indices(queued_pts, new_points, y_ord=new_points_ord, tol=tol)
105 queued_pts = np.insert(queued_pts, insert_idx, new_points[new_points_ord], axis=0)
106 queued_ids = np.insert(queued_ids, insert_idx, new_points_ids[new_points_ord], axis=0)
107
108 if completed_points.size > 0:
109 completed_ord = np.lexsort(np.rot90(completed_points))
110 delete_ind = get_2D_duplicate_indices(queued_pts, completed_points, y_ord=completed_ord, tol=tol)
111 queued_pts = np.delete(queued_pts, delete_ind, axis=0)
112 queued_ids = np.delete(queued_ids, delete_ind, axis=0)
113
114 return queued_pts, queued_ids, id_offset
115
116
117def get_H0(gen_specs, refined_pts, refined_ord, queued_pts, queued_ids, tol=1e-12):
118 """
119 For runs following the first one, get the history array H0 based on the ordering in `refined_pts`
120 """
121
122 def approx_eq(x, y):
123 return np.argmax(np.fabs(x - y)) <= tol
124
125 num_ids = queued_ids.shape[0]
126 H0 = np.zeros(num_ids, dtype=gen_specs["out"])
127 refined_priority = np.flip(np.arange(refined_pts.shape[0], dtype="int"))
128 rptr = 0
129 for qptr in range(num_ids):
130 while not approx_eq(refined_pts[refined_ord[rptr]], queued_pts[qptr]):
131 rptr += 1
132 assert rptr <= refined_pts.shape[0]
133 H0["x"][qptr] = queued_pts[qptr]
134 H0["sim_id"][qptr] = queued_ids[qptr]
135 H0["priority"][qptr] = refined_priority[refined_ord[rptr]]
136 return H0
137
138
139# ========================
140# Main generator functions
141# ========================
142
143
144def sparse_grid_batched(H, persis_info, gen_specs, libE_info):
145 """
146 Implements batched construction for a Tasmanian sparse grid,
147 using the loop described in Tasmanian Example 09:
148 `sparse grid example <https://github.com/ORNL/TASMANIAN/blob/master/InterfacePython/example_sparse_grids_09.py>`_
149
150 """
151 U = gen_specs["user"]
152 ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
153 grid = U["tasmanian_init"]() # initialize the grid
154 allowed_refinements = [
155 "setAnisotropicRefinement",
156 "getAnisotropicRefinement",
157 "setSurplusRefinement",
158 "getSurplusRefinement",
159 "none",
160 ]
161 assert (
162 "refinement" in U and U["refinement"] in allowed_refinements
163 ), f"Must provide a gen_specs['user']['refinement'] in: {allowed_refinements}"
164
165 while grid.getNumNeeded() > 0:
166 aPoints = grid.getNeededPoints()
167
168 H0 = np.zeros(len(aPoints), dtype=gen_specs["out"])
169 H0["x"] = aPoints
170
171 # Receive values from manager
172 tag, Work, calc_in = ps.send_recv(H0)
173 if tag in [STOP_TAG, PERSIS_STOP]:
174 break
175 aModelValues = calc_in["f"]
176
177 # Update surrogate on grid
178 t = aModelValues.reshape((aModelValues.shape[0], grid.getNumOutputs()))
179 t = t.flatten()
180 t = np.atleast_2d(t).T
181 grid.loadNeededPoints(t)
182
183 if "tasmanian_checkpoint_file" in U:
184 grid.write(U["tasmanian_checkpoint_file"])
185
186 # set refinement, using user["refinement"] to pick the refinement strategy
187 if U["refinement"] in ["setAnisotropicRefinement", "getAnisotropicRefinement"]:
188 assert "sType" in U
189 assert "iMinGrowth" in U
190 assert "iOutput" in U
191 grid.setAnisotropicRefinement(U["sType"], U["iMinGrowth"], U["iOutput"])
192 elif U["refinement"] in ["setSurplusRefinement", "getSurplusRefinement"]:
193 assert "fTolerance" in U
194 assert "iOutput" in U
195 assert "sCriteria" in U
196 grid.setSurplusRefinement(U["fTolerance"], U["iOutput"], U["sCriteria"])
197
198 return H0, persis_info, FINISHED_PERSISTENT_GEN_TAG
199
200
201def sparse_grid_async(H, persis_info, gen_specs, libE_info):
202 """
203 Implements asynchronous construction for a Tasmanian sparse grid,
204 using the logic in the dynamic Tasmanian model construction function:
205 `sparse grid dynamic example <https://github.com/ORNL/TASMANIAN/blob/master/Addons/tsgConstructSurrogate.hpp>`_
206
207 """
208 U = gen_specs["user"]
209 ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
210 grid = U["tasmanian_init"]() # initialize the grid
211 allowed_refinements = ["getCandidateConstructionPoints", "getCandidateConstructionPointsSurplus"]
212 assert (
213 "refinement" in U and U["refinement"] in allowed_refinements
214 ), f"Must provide a gen_specs['user']['refinement'] in: {allowed_refinements}"
215 tol = U["_match_tolerance"] if "_match_tolerance" in U else 1.0e-12
216
217 # Choose the refinement function based on U["refinement"].
218 if U["refinement"] == "getCandidateConstructionPoints":
219 assert "sType" in U
220 assert "liAnisotropicWeightsOrOutput" in U
221 if U["refinement"] == "getCandidateConstructionPointsSurplus":
222 assert "fTolerance" in U
223 assert "sRefinementType" in U
224
225 def get_refined_points(g, U):
226 if U["refinement"] == "getCandidateConstructionPoints":
227 return g.getCandidateConstructionPoints(U["sType"], U["liAnisotropicWeightsOrOutput"])
228 else:
229 assert U["refinement"] == "getCandidateConstructionPointsSurplus"
230 return g.getCandidateConstructionPointsSurplus(U["fTolerance"], U["sRefinementType"])
231 # else:
232 # raise ValueError("Unknown refinement string")
233
234 # Asynchronous helper and state variables.
235 num_dims = grid.getNumDimensions()
236 num_completed = 0
237 offset = 0
238 queued_pts = np.empty((0, num_dims), dtype="float")
239 queued_ids = np.empty(0, dtype="int")
240
241 # First run.
242 grid.beginConstruction()
243 init_pts = get_refined_points(grid, U)
244 queued_pts, queued_ids, offset = get_state(queued_pts, queued_ids, offset, new_points=init_pts, tol=tol)
245 H0 = np.zeros(init_pts.shape[0], dtype=gen_specs["out"])
246 H0["x"] = init_pts
247 H0["sim_id"] = np.arange(init_pts.shape[0], dtype="int")
248 H0["priority"] = np.flip(H0["sim_id"])
249 tag, Work, calc_in = ps.send_recv(H0)
250
251 # Subsequent runs.
252 while tag not in [STOP_TAG, PERSIS_STOP]:
253 # Parse the points returned by the allocator.
254 num_completed += calc_in["x"].shape[0]
255 queued_pts, queued_ids, offset = get_state(
256 queued_pts, queued_ids, offset, completed_points=calc_in["x"], tol=tol
257 )
258
259 # Compute the next batch of points (if they exist).
260 new_pts = np.empty((0, num_dims), dtype="float")
261 refined_pts = np.empty((0, num_dims), dtype="float")
262 refined_ord = np.empty(0, dtype="int")
263 if grid.getNumLoaded() < 1000 or num_completed > 0.2 * grid.getNumLoaded():
264 # A copy is needed because the data in the calc_in arrays are not contiguous.
265 grid.loadConstructedPoint(np.copy(calc_in["x"]), np.copy(calc_in["f"]))
266 if "tasmanian_checkpoint_file" in U:
267 grid.write(U["tasmanian_checkpoint_file"])
268 refined_pts = get_refined_points(grid, U)
269 # If the refined points are empty, then there is a stopping condition internal to the
270 # Tasmanian sparse grid that is being triggered by the loaded points.
271 if refined_pts.size == 0:
272 break
273 refined_ord = np.lexsort(np.rot90(refined_pts))
274 delete_ind = get_2D_duplicate_indices(refined_pts, queued_pts, x_ord=refined_ord, tol=tol)
275 new_pts = np.delete(refined_pts, delete_ind, axis=0)
276
277 if new_pts.shape[0] > 0:
278 # Update the state variables with the refined points and update the queue in the allocator.
279 num_completed = 0
280 queued_pts, queued_ids, offset = get_state(queued_pts, queued_ids, offset, new_points=new_pts, tol=tol)
281 H0 = get_H0(gen_specs, refined_pts, refined_ord, queued_pts, queued_ids, tol=tol)
282 tag, Work, calc_in = ps.send_recv(H0)
283 else:
284 tag, Work, calc_in = ps.recv()
285
286 return [], persis_info, FINISHED_PERSISTENT_GEN_TAG
287
288
289def get_sparse_grid_specs(user_specs, sim_f, num_dims, num_outputs=1, mode="batched"):
290 """
291 Helper function that generates the simulator, generator, and allocator specs as well as the
292 persis_info dictionary to ensure that they are compatible with the custom generators in this
293 script. The outputs should be used in the main libE() call.
294
295 INPUTS:
296 user_specs (dict) : a dictionary of user specs that is needed in the generator specs;
297 expects the key "tasmanian_init" whose value is a 0-argument lambda
298 that initializes an appropriate Tasmanian sparse grid object.
299
300 sim_f (func) : a lambda function that takes in generator outputs (simulator inputs)
301 and returns simulator outputs.
302
303 num_dims (int) : number of model inputs.
304
305 num_outputs (int) : number of model outputs.
306
307 mode (string) : can either be "batched" or "async".
308
309 OUTPUTS:
310 sim_specs (dict) : a dictionary of simulation specs and also one of the inputs of libE().
311
312 gen_specs (dict) : a dictionary of generator specs and also one of the inputs of libE().
313
314 alloc_specs (dict) : a dictionary of allocation specs and also one of the inputs of libE().
315
316 persis_info (dict) : a dictionary containing common information that is passed to all
317 workers and also one of the inputs of libE().
318
319 """
320
321 assert "tasmanian_init" in user_specs
322 assert mode in ["batched", "async"]
323
324 sim_specs = {
325 "sim_f": sim_f,
326 "in": ["x"],
327 }
328 gen_out = [
329 ("x", float, (num_dims,)),
330 ("sim_id", int),
331 ("priority", int),
332 ]
333 gen_specs = {
334 "persis_in": [t[0] for t in gen_out] + ["f"],
335 "out": gen_out,
336 "user": user_specs,
337 }
338 alloc_specs = {
339 "alloc_f": allocf,
340 "user": {},
341 }
342
343 if mode == "batched":
344 gen_specs["gen_f"] = sparse_grid_batched
345 sim_specs["out"] = [("f", float, (num_outputs,))]
346 if mode == "async":
347 gen_specs["gen_f"] = sparse_grid_async
348 sim_specs["out"] = [("x", float, (num_dims,)), ("f", float, (num_outputs,))]
349 alloc_specs["user"]["active_recv_gen"] = True
350 alloc_specs["user"]["async_return"] = True
351
352 nworkers, _, _, _ = parse_args()
353 persis_info = {}
354 for i in range(nworkers + 1):
355 persis_info[i] = {"worker_num": i}
356
357 return sim_specs, gen_specs, alloc_specs, persis_info