Ensemble with an MPI Application
This tutorial highlights libEnsemble’s capability to portably execute and monitor external scripts or user applications within simulation or generator functions using the executor.
The calling script registers a compiled executable that simulates electrostatic forces between a collection of particles. The simulator function launches instances of this executable and reads output files to determine the result.
This tutorial uses libEnsemble’s MPI Executor, which automatically detects available MPI runners and resources.
This example also uses a persistent generator. This generator runs on a worker throughout the ensemble, producing new simulation parameters as requested.
Getting Started
The simulation source code forces.c
can be obtained directly from the
libEnsemble repository in the forces_app directory.
Assuming MPI and its C compiler mpicc are available, compile forces.c into an executable (forces.x) with:
mpicc -O3 -o forces.x forces.c -lm
Alternative build lines for different platforms can be found in the build_forces.sh
file in the same directory.
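For example, on HPE Cray systems the MPI library is typically linked through the cc compiler wrapper rather than mpicc, so an equivalent build line might look like the following (an illustrative guess only; consult build_forces.sh for the tested lines for your platform):
cc -O3 -o forces.x forces.c -lm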
Calling Script
Complete scripts for this example can be found in the forces_simple directory.
Let’s begin by writing our calling script to specify our simulation and generation functions and call libEnsemble. Create a Python file called run_libe_forces.py containing:
import os
import sys

import numpy as np
from forces_simf import run_forces  # Sim func from current dir

from libensemble import Ensemble
from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
from libensemble.executors import MPIExecutor
from libensemble.gen_funcs.persistent_sampling import persistent_uniform as gen_f
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs

if __name__ == "__main__":
    # Initialize MPI Executor
    exctr = MPIExecutor()

    # Register simulation executable with executor
    sim_app = os.path.join(os.getcwd(), "../forces_app/forces.x")

    if not os.path.isfile(sim_app):
        sys.exit("forces.x not found - please build first in ../forces_app dir")

    exctr.register_app(full_path=sim_app, app_name="forces")

    # Parse number of workers, comms type, etc. from arguments
    ensemble = Ensemble(parse_args=True, executor=exctr)
We first instantiate our MPI Executor. Registering an application is as easy as providing the full file-path and giving it a memorable name. This Executor will later be used within our simulation function to launch the registered app.
The last line in the above codeblock initializes the ensemble. The parse_args
parameter is used to read comms and nworkers from the command line. This sets
the respective libE_specs options.
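For example, running the calling script as follows (the same form used later in this tutorial) sets comms to "local" and nworkers to 5:
python run_libe_forces.py --comms local --nworkers 5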
Next, we will add basic configuration for the ensemble. As one worker will run a persistent generator, we calculate the number of workers that need resources to run simulations. We also set sim_dirs_make so that a directory is created for each simulation. This helps organize output and also helps prevent workers from overwriting previous results.
    nsim_workers = ensemble.nworkers - 1  # One worker is for persistent generator

    # Persistent gen does not need resources
    ensemble.libE_specs = LibeSpecs(
        num_resource_sets=nsim_workers, sim_dirs_make=True, ensemble_dir_path="./test_executor_forces_tutorial"
    )
Next we define the sim_specs and gen_specs. Recall that these are used to specify to libEnsemble what user functions and input/output fields to expect, and also to parameterize user functions:
    ensemble.sim_specs = SimSpecs(
        sim_f=run_forces,
        inputs=["x"],
        outputs=[("energy", float)],
    )

    ensemble.gen_specs = GenSpecs(
        gen_f=gen_f,
        inputs=[],  # No input when starting persistent generator
        persis_in=["sim_id"],  # Return sim_ids of evaluated points to generator
        outputs=[("x", float, (1,))],
        user={
            "initial_batch_size": nsim_workers,
            "lb": np.array([1000]),  # min particles
            "ub": np.array([3000]),  # max particles
        },
    )
Next, configure an allocation function, which starts the one persistent generator and farms out the simulations. We also tell it to wait for all simulations to return their results before generating more parameters.
    ensemble.alloc_specs = AllocSpecs(
        alloc_f=alloc_f,
        user={
            "async_return": False,  # False causes batch returns
        },
    )
Now we set exit_criteria to exit after running eight simulations.
We also give each worker a seeded random stream, via the persis_info option. These can be used for random number generation if required.
Finally we run the ensemble.
    # Instruct libEnsemble to exit after this many simulations
    ensemble.exit_criteria = ExitCriteria(sim_max=8)

    # Seed random streams for each worker, particularly for gen_f
    ensemble.add_random_streams()

    # Run ensemble
    ensemble.run()
Exercise
This may take some additional browsing of the docs to complete.
Write an alternative Calling Script similar to above, but with the following differences:
1. Set libEnsemble’s logger to print debug messages.
2. Override the MPIExecutor’s detected MPI runner with "openmpi".
3. Tell the allocation function to return results to the generator asynchronously.
4. Use the ensemble function save_output() to save the History array and persis_info to files after libEnsemble completes.
Solutions
Soln 1. Debug logging gives lots of information.
from libensemble import Ensemble, logger
from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
from libensemble.executors import MPIExecutor
from libensemble.gen_funcs.persistent_sampling import persistent_uniform as gen_f
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs

logger.set_level("DEBUG")
Soln 2. The MPI runner can also be specified via the platform_specs option.
# Initialize MPI Executor
exctr = MPIExecutor(custom_info={"mpi_runner": "openmpi"})
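A hedged sketch of the platform_specs route (assuming the Platform class from libensemble.resources.platforms; check the platform configuration docs for the current field names):
from libensemble.resources.platforms import Platform

# Sketch: select the MPI runner through platform_specs instead of custom_info
ensemble.libE_specs = LibeSpecs(
    num_resource_sets=nsim_workers,
    sim_dirs_make=True,
    platform_specs=Platform(mpi_runner="openmpi"),
)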
Soln 3. Set async_return to True in the allocation function’s user options.
# Starts one persistent generator. Simulated values are returned as they complete.
ensemble.alloc_specs = AllocSpecs(
    alloc_f=alloc_f,
    user={
        "async_return": True,
    },
)
Soln 4. End your script in the following manner to save the output based on the name of the calling script. You can give any string in place of __file__.
# Run ensemble
ensemble.run()
ensemble.save_output(__file__)
Simulation Function
Our simulation function is where we’ll use libEnsemble’s executor to configure and submit our application for execution. We’ll poll this task’s state while it runs, and once we’ve detected it has finished we’ll send any results or exit statuses back to the manager.
Create another Python file named forces_simf.py
containing the following
for starters:
import numpy as np

# Optional status codes to display in libE_stats.txt for each gen or sim
from libensemble.message_numbers import TASK_FAILED, WORKER_DONE


def run_forces(H, persis_info, sim_specs, libE_info):
    """Runs the forces MPI application"""

    calc_status = 0

    # Parse out num particles, from generator function
    particles = str(int(H["x"][0][0]))

    # app arguments: num particles, timesteps, also using num particles as seed
    args = particles + " " + str(10) + " " + particles

    # Retrieve our MPI Executor
    exctr = libE_info["executor"]

    # Submit our forces app for execution.
    task = exctr.submit(app_name="forces", app_args=args)

    # Block until the task finishes
    task.wait()
We retrieve the generated number of particles from H
and construct
an argument string for our launched application. The particle count doubles up
as a random number seed here.
We then retrieve our previously instantiated Executor. libEnsemble will use the MPI runner detected (or provided by platform options). As num_procs (or similar) is not specified, libEnsemble will assign the processors available to this worker.
After submitting the “forces” app for execution,
a Task object is returned that correlates with the launched app.
This object is roughly equivalent to a Python future and can be polled, killed,
and evaluated in a variety of helpful ways. For now, we’re satisfied with waiting
for the task to complete via task.wait().
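For illustration, a minimal sketch of waiting with a timeout and then inspecting the task's status attributes (the timeout form of wait() is used later in this tutorial; task.state and task.errcode correspond to the values reported in ensemble.log below):
# Sketch only: wait up to 60 seconds, then inspect the task's status
task.wait(timeout=60)  # see the executor docs for behavior if the timeout is exceeded
print(task.state, task.errcode)  # e.g., "FINISHED" and 0 on success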
We can assume that afterward, any results are now available to parse. Our application
produces a forces.stat
file that contains either energy
computations for every timestep or a “kill” message if particles were lost, which
indicates a bad run - this can be ignored for now.
To complete our simulation function, parse the last energy value from the output file into
a local output History array, and if successful,
set the simulation function’s exit status calc_status
to WORKER_DONE
. Otherwise, send back NAN
and a TASK_FAILED
status:
    # Try loading final energy reading, set the sim's status
    statfile = "forces.stat"
    try:
        data = np.loadtxt(statfile)
        final_energy = data[-1]
        calc_status = WORKER_DONE
    except Exception:
        final_energy = np.nan
        calc_status = TASK_FAILED

    # Define our output array, populate with energy reading
    output = np.zeros(1, dtype=sim_specs["out"])
    output["energy"] = final_energy

    # Return final information to worker, for reporting to manager
    return output, persis_info, calc_status
calc_status
will be displayed in the libE_stats.txt
log file.
That’s it! As you can see, it’s relatively easy to get started launching applications with libEnsemble.
Running the example
This completes our calling script and simulation function. Run libEnsemble with:
python run_libe_forces.py --comms local --nworkers [nworkers]
where nworkers
is one more than the number of concurrent simulations.
Output files (including forces.stat
and files containing stdout
and
stderr
content for each task) should appear in the current working
directory. Overall workflow information should appear in libE_stats.txt
and ensemble.log
as usual.
Example run / output
For example, after running:
python run_libe_forces.py --comms local --nworkers 3
my libE_stats.txt
resembled:
Manager : Starting ensemble at: 2023-09-12 18:12:08.517
Worker 2: sim_id 0: sim Time: 0.205 Start: ... End: ... Status: Completed
Worker 3: sim_id 1: sim Time: 0.284 Start: ... End: ... Status: Completed
Worker 2: sim_id 2: sim Time: 0.117 Start: ... End: ... Status: Completed
Worker 3: sim_id 3: sim Time: 0.294 Start: ... End: ... Status: Completed
Worker 2: sim_id 4: sim Time: 0.124 Start: ... End: ... Status: Completed
Worker 3: sim_id 5: sim Time: 0.174 Start: ... End: ... Status: Completed
Worker 3: sim_id 7: sim Time: 0.135 Start: ... End: ... Status: Completed
Worker 2: sim_id 6: sim Time: 0.275 Start: ... End: ... Status: Completed
Worker 1: Gen no 1: gen Time: 1.038 Start: ... End: ... Status: Persis gen finished
Manager : Exiting ensemble at: 2023-09-12 18:12:09.565 Time Taken: 1.048
where status
is set based on the simulation function’s returned calc_status
.
My ensemble.log
(on a four-core laptop) resembled:
[0] ... libensemble.libE (INFO): Logger initializing: [workerID] precedes each line. [0] = Manager
[0] ... libensemble.libE (INFO): libE version v0.10.2+dev
[0] ... libensemble.manager (INFO): Manager initiated on node shuds
[0] ... libensemble.manager (INFO): Manager exit_criteria: {'sim_max': 8}
[2] ... libensemble.worker (INFO): Worker 2 initiated on node shuds
[3] ... libensemble.worker (INFO): Worker 3 initiated on node shuds
[1] ... libensemble.worker (INFO): Worker 1 initiated on node shuds
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_0: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 2023 10 2023
[3] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker3_0: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 2900 10 2900
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_0 finished with errcode 0 (FINISHED)
[3] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker3_0 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_1: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 1288 10 1288
[3] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker3_1: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 2897 10 2897
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_1 finished with errcode 0 (FINISHED)
[3] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker3_1 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_2: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 1623 10 1623
[3] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker3_2: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 1846 10 1846
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_2 finished with errcode 0 (FINISHED)
[3] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker3_2 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_3: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 2655 10 2655
[3] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker3_3: mpirun -hosts shuds -np 2 --ppn 2 /home/.../forces_app/forces.x 1818 10 1818
[3] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker3_3 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_3 finished with errcode 0 (FINISHED)
[0] ... libensemble.manager (INFO): Term test tripped: sim_max
[0] ... libensemble.manager (INFO): Term test tripped: sim_max
[0] ... libensemble.libE (INFO): Manager total time: 1.043
Note that the four cores were divided equally between the two workers running simulations (two MPI processes each).
That concludes this tutorial. Each of these example files can be found in the repository in examples/tutorials/forces_with_executor.
For further experimentation, we recommend trying out this libEnsemble tutorial workflow on a cluster or multi-node system, since libEnsemble can also manage those resources and is developed to coordinate computations at huge scales. See HPC platform guides for more information.
See the forces_gpu tutorial for a similar workflow including GPUs. That tutorial also shows how to dynamically assign resources to each simulation.
Please feel free to contact us or open an issue on GitHub if this tutorial workflow doesn’t work properly on your cluster or other compute resource.
Exercises
These may require additional browsing of the documentation to complete.
1. Adjust submit() to launch with four processes.
2. Adjust submit() again so the app’s stdout and stderr are written to stdout.txt and stderr.txt respectively.
3. Add a fourth argument to the args line to make 20% of simulations go bad.
4. Construct a while not task.finished: loop that periodically sleeps for a tenth of a second, calls task.poll(), then reads the output .stat file, and calls task.kill() if the output file contains "kill\n" or if task.runtime exceeds sixty seconds.
Solution
Showing updated sections only (... refers to snips where code is unchanged).
import time
...
args = particles + " " + str(10) + " " + particles + " " + str(0.2)
...
statfile = "forces.stat"
task = exctr.submit(
    app_name="forces",
    app_args=args,
    num_procs=4,
    stdout="stdout.txt",
    stderr="stderr.txt",
)

while not task.finished:
    time.sleep(0.1)
    task.poll()

    if task.file_exists_in_workdir(statfile):
        with open(statfile, "r") as f:
            if "kill\n" in f.readlines():
                task.kill()

    if task.runtime > 60:
        task.kill()
...
Running the generator on the manager
As of version 1.3.0, the generator can be run on a thread on the manager, using the libE_specs option gen_on_manager.
Change the libE_specs as follows.
nsim_workers = ensemble.nworkers

# Persistent gen does not need resources
ensemble.libE_specs = LibeSpecs(
    gen_on_manager=True,
    sim_dirs_make=True,
    ensemble_dir_path="./test_executor_forces_tutorial",
)
When running, set nworkers to the number of workers desired for running simulations. For example, instead of:
python run_libe_forces.py --comms local --nworkers 5
use:
python run_libe_forces.py --comms local --nworkers 4
Note that as the generator random number seed will be zero instead of one, the checksum will change.
For more information see Running generator on the manager.
Running forces application with input file
Many applications read an input file instead of being given parameters directly on the run line.
The forces_simple_with_input_file directory contains a variant of this example, in which a templated input file is parameterized for each evaluation.
This requires jinja2 to be installed:
pip install jinja2
The file forces_input contains the following (remember, for simplicity, we are also using particles as the seed):
num_particles = {{particles}}
num_steps = 10
rand_seed = {{particles}}
libEnsemble will copy this input file to each simulation directory.
The sim_f
uses the following function to customize the input file with the parameters
for the current simulation.
import jinja2  # needed to render the templated input file


def set_input_file_params(H, sim_specs, ints=False):
    """
    This is a general function to parameterize the input file with any inputs
    from sim_specs["in"]

    Often sim_specs["in"] contains an "x" that may be multi-dimensional, where each
    dimension corresponds to a different input name in sim_specs["user"]["input_names"].
    Effectively an unpacking of "x"
    """
    input_file = sim_specs["user"]["input_filename"]
    input_values = {}
    for i, name in enumerate(sim_specs["user"]["input_names"]):
        value = int(H["x"][0][i]) if ints else H["x"][0][i]
        input_values[name] = value
    with open(input_file, "r") as f:
        template = jinja2.Template(f.read())
    with open(input_file, "w") as f:
        f.write(template.render(input_values))
This is called in the simulation function as follows.
def run_forces(H, persis_info, sim_specs, libE_info):
    """Runs the forces MPI application reading input from file"""

    calc_status = 0

    input_file = sim_specs["user"]["input_filename"]
    set_input_file_params(H, sim_specs, ints=True)

    # Retrieve our MPI Executor
    exctr = libE_info["executor"]

    # Submit our forces app for execution.
    task = exctr.submit(app_name="forces")  # app_args removed

    # Block until the task finishes
    task.wait(timeout=60)
Notice that we convert the parameters to integers in this example.
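For instance, if the generator produced a particle count of 2000 (an arbitrary illustrative value), the rendered copy of forces_input in that simulation directory would read:
num_particles = 2000
num_steps = 10
rand_seed = 2000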
The calling script then specifies the templated input file as follows.
30 input_file = "forces_input"
31
32 # Persistent gen does not need resources
33 ensemble.libE_specs = LibeSpecs(
34     num_resource_sets=nsim_workers,
35     sim_dirs_make=True,
36     sim_dir_copy_files=[input_file],
37 )
38
39 ensemble.sim_specs = SimSpecs(
40     sim_f=run_forces,
41     inputs=["x"],
42     outputs=[("energy", float)],
43     user={"input_filename": input_file, "input_names": ["particles"]},
44 )
Line 36 causes the templated input file to be copied to each simulation directory.
An alternative is to use sim_input_dir
, which gives the name of a directory
that may contain multiple files and will be used as the base of each simulation
directory.
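A sketch of that alternative (the directory name ./forces_input_dir is hypothetical; it would hold forces_input and any other files each simulation needs):
ensemble.libE_specs = LibeSpecs(
    num_resource_sets=nsim_workers,
    sim_dirs_make=True,
    sim_input_dir="./forces_input_dir",  # contents form the base of each simulation directory
)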
Line 43 gives the input file name and the name of each parameter to the simulation function.
Multiple parameters
In our case, x holds a single parameter (particles). However, in some cases, x (as defined by sim_specs["in"]) may be multi-dimensional, where each component has a different parameter name (e.g., "x", "y"). For example, if the input file were:
num_particles = {{particles}}
num_steps = {{nsteps}}
rand_seed = {{seed}}
then line 43 would be:
user = {"input_filename": input_file, "input_names": ["particles", "nsteps", "seed"]}
and gen_specs
would contain something similar to:
46 ensemble.gen_specs = GenSpecs(
47     gen_f=gen_f,
48     inputs=[],
49     persis_in=["sim_id"],
50     outputs=[("x", float, 3)],
51     ...,
52 )
libEnsemble uses a convention of a multi-dimensional x
in generator functions. However,
these parameters can also be specified as different variables with corresponding modification
to generator and simulator functions.
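A minimal sketch of that alternative, assuming a custom generator (my_gen_f, not shown) that outputs the named fields directly instead of a combined x; the field names and types here are illustrative only:
ensemble.gen_specs = GenSpecs(
    gen_f=my_gen_f,  # hypothetical generator producing the named fields below
    inputs=[],
    persis_in=["sim_id"],
    outputs=[("particles", int), ("nsteps", int), ("seed", int)],
)

# The simulation function would then read, e.g., H["particles"][0] instead of
# unpacking components of H["x"], and sim_specs inputs would list these field names.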