Executor with Electrostatic Forces
This tutorial highlights libEnsemble’s capability to execute and monitor external scripts or user applications within simulation or generator functions using the executor. In this tutorial, our calling script registers a compiled executable that simulates electrostatic forces between a collection of particles. The simulator function launches instances of this executable and reads output files to determine if the run was successful.
It is possible to use subprocess calls from Python to issue commands such as jsrun or aprun to run applications. Unfortunately, hard-coding such commands within user scripts isn’t portable. Furthermore, many systems like Argonne’s Theta do not allow libEnsemble to submit additional tasks from the compute nodes; on these systems, a proxy launch mechanism (such as Balsam) is required. libEnsemble’s Executors were developed to directly address such issues.
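To make the portability problem concrete, here is a minimal sketch of the kind of hard-coded launch an Executor replaces; the runner name, process count, and application arguments are illustrative assumptions rather than part of this tutorial’s workflow:

import subprocess

# Hard-coded MPI launch: only works where "aprun" exists and four ranks are appropriate
subprocess.run(["aprun", "-n", "4", "./forces.x", "1000", "10", "1000"], check=True)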
In particular, we’ll be experimenting with libEnsemble’s MPI Executor, since it can automatically detect available MPI runners and resources, and by default divide them equally among workers.
Getting Started
The simulation source code forces.c can be obtained directly from the libEnsemble repository.
Assuming MPI and its C compiler mpicc are available, compile forces.c into an executable (forces.x) with:
$ mpicc -O3 -o forces.x forces.c -lm
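Optionally, assuming mpirun is the matching launcher on your system, you can test the executable standalone before wiring it into libEnsemble. Its arguments (covered below) are the number of particles, the number of timesteps, and a random seed; the process count here is an arbitrary choice:

$ mpirun -np 4 ./forces.x 1000 10 1000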
Calling Script
Let’s begin by writing our calling script to parameterize our simulation and generation functions and call libEnsemble. Create a Python file called run_libe_forces.py containing:
 1 #!/usr/bin/env python
 2 import os
 3 import numpy as np
 4 from forces_simf import run_forces  # Sim func from current dir
 5
 6 from libensemble.libE import libE
 7 from libensemble.gen_funcs.sampling import uniform_random_sample
 8 from libensemble.tools import parse_args, add_unique_random_streams
 9 from libensemble.executors import MPIExecutor
10
11 # Parse number of workers, comms type, etc. from arguments
12 nworkers, is_manager, libE_specs, _ = parse_args()
13
14 # Initialize MPI Executor instance
15 exctr = MPIExecutor()
16
17 # Register simulation executable with executor
18 sim_app = os.path.join(os.getcwd(), "../forces_app/forces.x")
19 exctr.register_app(full_path=sim_app, app_name="forces")
On line 15, we instantiate our MPI Executor class instance, which can optionally be customized by specifying alternative MPI runners. The auto-detected default should be sufficient.
Registering an application is as easy as providing the full file-path and giving it a memorable name. This Executor instance will later be retrieved within our simulation function to launch the registered app.
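If the auto-detected runner is ever unsuitable, the instantiation on line 15 accepts an override; a one-line sketch (the "openmpi" choice mirrors the exercise solution later in this tutorial):

exctr = MPIExecutor(custom_info={"mpi_runner": "openmpi"})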
Next define the sim_specs and gen_specs data structures. Recall that these are used to specify to libEnsemble what user functions and input/output fields to expect, and also to parameterize function instances without hard-coding:
# State the sim_f, inputs, outputs
sim_specs = {
    "sim_f": run_forces,         # sim_f, imported above
    "in": ["x"],                 # Name of input for sim_f
    "out": [("energy", float)],  # Name, type of output from sim_f
}

# State the gen_f, inputs, outputs, additional parameters
gen_specs = {
    "gen_f": uniform_random_sample,  # Generator function
    "in": [],                        # Generator input
    "out": [("x", float, (1,))],     # Name, type, and size of data from gen_f
    "user": {
        "lb": np.array([1000]),      # User parameters for the gen_f
        "ub": np.array([3000]),
        "gen_batch_size": 8,
    },
}
Our generation function will generate random numbers of particles (between the "lb" and "ub" bounds) for our simulation function to evaluate via our registered application.
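For intuition, the batch the generator hands to libEnsemble is roughly equivalent to the following standalone NumPy sketch; this is a simplification for illustration, not the library’s internal code:

import numpy as np

rng = np.random.default_rng()
lb, ub, batch_size = 1000, 3000, 8

# Eight particle counts, drawn uniformly between the bounds, one value per simulation
x = rng.uniform(lb, ub, (batch_size, 1))
print(x)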
The following additional setting instructs libEnsemble’s workers to each create and work within a separate directory each time they call a simulation function. This helps organize output and also helps prevent workers from overwriting previous results:
# Create and work inside separate per-simulation directories
libE_specs["sim_dirs_make"] = True
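Related libE_specs options control where those directories live; for example, the ensemble_dir_path setting used in the exercise below relocates them (the path shown is illustrative):

# Optional: create the ensemble's working directories elsewhere (illustrative path)
libE_specs["ensemble_dir_path"] = "/scratch/ensemble"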
After configuring persis_info and exit_criteria, we initialize libEnsemble by calling the primary libE() routine:
# Instruct libEnsemble to exit after this many simulations
exit_criteria = {"sim_max": 8}

# Seed random streams for each worker, particularly for gen_f
persis_info = add_unique_random_streams({}, nworkers + 1)

# Launch libEnsemble
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info=persis_info, libE_specs=libE_specs)
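After libE() returns, the History array H is a NumPy structured array that can be inspected directly; a minimal sketch, using the field names from the specs above and guarding on the manager process:

if is_manager:
    # Print each sampled particle count alongside the energy returned by the sim
    print(H[["x", "energy"]])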
Exercise
This may take some additional browsing of the docs to complete.
Write an alternative Calling Script similar to above, but with the following differences:

1. Add an additional worker directory setting so workers operate in /scratch/ensemble instead of the default current working directory.
2. Override the MPIExecutor’s detected MPI runner with "openmpi".
3. Set libEnsemble’s logger to print debug messages.
4. Use the save_libE_output() function to save the History array and persis_info to files after libEnsemble completes.
Solution:
#!/usr/bin/env python
import os
import numpy as np
from forces_simf import run_forces  # Sim func from current dir

from libensemble import logger
from libensemble.libE import libE
from libensemble.gen_funcs.sampling import uniform_random_sample
from libensemble.tools import parse_args, add_unique_random_streams, save_libE_output
from libensemble.executors import MPIExecutor

# Parse number of workers, comms type, etc. from arguments
nworkers, is_manager, libE_specs, _ = parse_args()

# Adjust logger level
logger.set_level("DEBUG")

# Initialize MPI Executor instance
exctr = MPIExecutor(custom_info={"mpi_runner": "openmpi"})

...

# Instruct workers to operate somewhere else on the filesystem
libE_specs["ensemble_dir_path"] = "/scratch/ensemble"

...

# Launch libEnsemble
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info=persis_info, libE_specs=libE_specs)

if is_manager:
    save_libE_output(H, persis_info, __file__, nworkers)
Simulation Function
Our simulation function is where we’ll use libEnsemble’s executor to configure and submit our application for execution. We’ll poll this task’s state while it runs, and once we’ve detected it has finished we’ll send any results or exit statuses back to the manager.
Create another Python file named forces_simf.py containing the following for starters:
import numpy as np

# To retrieve our MPI Executor instance
from libensemble.executors.executor import Executor

# Optional status codes to display in libE_stats.txt for each gen or sim
from libensemble.message_numbers import WORKER_DONE, TASK_FAILED


def run_forces(H, _, sim_specs):
    calc_status = 0

    # Parse out num particles, from generator function
    particles = str(int(H["x"][0][0]))

    # num particles, timesteps, also using num particles as seed
    args = particles + " " + str(10) + " " + particles

    # Retrieve our MPI Executor instance
    exctr = Executor.executor

    # Submit our forces app for execution
    task = exctr.submit(app_name="forces", app_args=args)

    # Block until the task finishes
    task.wait()
We retrieve the generated number of particles from H and construct an argument string for our launched application. The particle count doubles up as a random number seed here. Note that a fourth argument can be added to forces that gives each run a chance of being a “bad run” (a float between 0 and 1), but for now that will default to zero.
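For example (as in the final exercise), requesting a 20% chance of a bad run would extend the argument string like this:

# Fourth argument: probability that any given run goes "bad"
args = particles + " " + str(10) + " " + particles + " " + str(0.2)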
We then retrieve our previously instantiated Executor instance from the class definition, where it was automatically stored as an attribute. After submitting the “forces” app for execution, a Task object is returned that corresponds to the launched app. This object is roughly equivalent to a Python future and can be polled, killed, and evaluated in a variety of helpful ways. For now, we’re satisfied with waiting for the task to complete via task.wait().
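As an alternative to blocking, the task can be polled in a loop; a brief sketch along the lines of the final exercise, with an arbitrary timeout:

import time

while not task.finished:
    time.sleep(0.1)  # avoid busy-waiting
    task.poll()      # refresh the task's status fields
    if task.runtime > 60:
        task.kill()  # give up on runs exceeding the (arbitrary) timeout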
We can assume that afterward, any results are now available to parse. Our application produces a forces.stat file that contains either energy computations for every time-step or a “kill” message if particles were lost, which indicates a bad run; this can be ignored for now.
To complete our simulation function, parse the last energy value from the output file into a local output History array and, if successful, set the simulation function’s exit status calc_status to WORKER_DONE. Otherwise, send back NAN and a TASK_FAILED status:
    # Stat file to check for bad runs
    statfile = "forces.stat"

    # Try loading final energy reading, set the sim's status
    try:
        data = np.loadtxt(statfile)
        final_energy = data[-1]
        calc_status = WORKER_DONE
    except Exception:
        final_energy = np.nan
        calc_status = TASK_FAILED

    # Define our output array, populate with energy reading
    outspecs = sim_specs["out"]
    output = np.zeros(1, dtype=outspecs)
    output["energy"][0] = final_energy

    # Return final information to worker, for reporting to manager
    return output, calc_status
calc_status will be displayed in the libE_stats.txt log file.
That’s it! As you can see, it’s relatively easy to get started launching applications with libEnsemble. Behind the scenes, libEnsemble evaluates default MPI runners and available resources and divides them among the workers.
This completes our calling script and simulation function. Run libEnsemble with:
$ python run_libe_forces.py --comms local --nworkers [nworkers]
This may take up to a minute to complete. Output files, including forces.stat and files containing stdout and stderr content for each task, should appear in the current working directory. Overall workflow information should appear in libE_stats.txt and ensemble.log as usual. For example, my libE_stats.txt resembled:
Worker 1: Gen no 1: gen Time: 0.001 Start: ... End: ... Status: Not set
Worker 1: sim_id 0: sim Time: 0.227 Start: ... End: ... Status: Completed
Worker 2: sim_id 1: sim Time: 0.426 Start: ... End: ... Status: Completed
Worker 1: sim_id 2: sim Time: 0.627 Start: ... End: ... Status: Completed
Worker 2: sim_id 3: sim Time: 0.225 Start: ... End: ... Status: Completed
Worker 1: sim_id 4: sim Time: 0.224 Start: ... End: ... Status: Completed
Worker 2: sim_id 5: sim Time: 0.625 Start: ... End: ... Status: Completed
Worker 1: sim_id 6: sim Time: 0.225 Start: ... End: ... Status: Completed
Worker 2: sim_id 7: sim Time: 0.626 Start: ... End: ... Status: Completed
The Status field is set based on the simulation function’s returned calc_status. My ensemble.log (on a ten-core laptop) resembled:
[0] ... libensemble.libE (INFO): Logger initializing: [workerID] precedes each line. [0] = Manager
[0] ... libensemble.libE (INFO): libE version v0.9.0
[0] ... libensemble.manager (INFO): Manager initiated on node my_laptop
[0] ... libensemble.manager (INFO): Manager exit_criteria: {"sim_max": 8}
[1] ... libensemble.worker (INFO): Worker 1 initiated on node my_laptop
[2] ... libensemble.worker (INFO): Worker 2 initiated on node my_laptop
[1] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker1_0: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 2023 10 2023
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_0: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 2900 10 2900
[1] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker1_0 finished with errcode 0 (FINISHED)
[1] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker1_1: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 1288 10 1288
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_0 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_1: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 2897 10 2897
[1] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker1_1 finished with errcode 0 (FINISHED)
[1] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker1_2: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 1623 10 1623
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_1 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_2: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 1846 10 1846
[1] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker1_2 finished with errcode 0 (FINISHED)
[1] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker1_3: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 2655 10 2655
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_2 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.mpi_executor (INFO): Launching task libe_task_forces_worker2_3: mpirun -hosts my_laptop -np 5 --ppn 5 /Users/.../forces.x 1818 10 1818
[1] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker1_3 finished with errcode 0 (FINISHED)
[2] ... libensemble.executors.executor (INFO): Task libe_task_forces_worker2_3 finished with errcode 0 (FINISHED)
[0] ... libensemble.manager (INFO): Term test tripped: sim_max
[0] ... libensemble.manager (INFO): Term test tripped: sim_max
[0] ... libensemble.libE (INFO): Manager total time: 3.939
Note again that the ten cores were divided equally among two workers.
That concludes this tutorial. Each of these example files can be found in the repository in examples/tutorials/forces_with_executor.
For further experimentation, we recommend trying out this libEnsemble tutorial workflow on a cluster or multi-node system, since libEnsemble can also manage those resources and is developed to coordinate computations at huge scales. Please feel free to contact us or open an issue on GitHub if this tutorial workflow doesn’t work properly on your cluster or other compute resource.
Exercises
These may require additional browsing of the documentation to complete.
1. Adjust submit() to launch with four processes.
2. Adjust submit() again so the app’s stdout and stderr are written to stdout.txt and stderr.txt respectively.
3. Add a fourth argument to the args line to make 20% of simulations go bad.
4. Construct a while not task.finished: loop that periodically sleeps for a tenth of a second, calls task.poll(), then reads the output .stat file, and calls task.kill() if the output file contains "kill\n" or if task.runtime exceeds sixty seconds.
Solution:
import time

...
args = particles + " " + str(10) + " " + particles + " " + str(0.2)
...
statfile = "forces.stat"
task = exctr.submit(
    app_name="forces",
    app_args=args,
    num_procs=4,
    stdout="stdout.txt",
    stderr="stderr.txt",
)

while not task.finished:
    time.sleep(0.1)
    task.poll()

    if task.file_exists_in_workdir(statfile):
        with open(statfile, "r") as f:
            if "kill\n" in f.readlines():
                task.kill()

    if task.runtime > 60:
        task.kill()

...