Executor with Electrostatic Forces

This tutorial highlights libEnsemble’s capability to execute and monitor external scripts or user applications within simulation or generator functions using the executor. In this tutorial, our calling script registers an external C executable that simulates electrostatic forces between a collection of particles. The sim_f routine then launches and polls this executable.

It is possible to use subprocess calls from Python to issue commands such as jsrun or aprun to run applications. Unfortunately, hard-coding such commands within user scripts isn’t portable. Furthermore, many systems like Argonne’s Theta do not allow libEnsemble to submit additional tasks from the compute nodes. On these systems a proxy launch mechanism (such as Balsam) is required. libEnsemble’s executor was developed to directly address such issues.

Getting Started

The simulation source code forces.c can be obtained directly from the libEnsemble repository here.

Assuming MPI and its C compiler mpicc are available, compile forces.c into an executable (forces.x) with:

$ mpicc -O3 -o forces.x forces.c -lm

Calling Script

Let’s begin by writing our calling script to parameterize our simulation and generation functions and call libEnsemble. Create a Python file containing:

 1#!/usr/bin/env python
 2import os
 3import numpy as np
 4from forces_simf import run_forces  # Sim func from current dir
 5
 6from libensemble.libE import libE
 7from libensemble.gen_funcs.sampling import uniform_random_sample
 8from libensemble.tools import parse_args, add_unique_random_streams
 9from libensemble.executors.mpi_executor import MPIExecutor
10
11nworkers, is_manager, libE_specs, _ = parse_args()  # Convenience function
12
13# Create executor and register sim to it
14exctr = MPIExecutor()  # Use auto_resources=False to oversubscribe
15
16# Create empty simulation input directory
17if not os.path.isdir('./sim'):
18    os.mkdir('./sim')
19
20# Register simulation executable with executor
21sim_app = os.path.join(os.getcwd(), 'forces.x')
22exctr.register_calc(full_path=sim_app, calc_type='sim')

On line 4 we import our not-yet-written sim_f. We also import necessary libEnsemble components and some convenience functions. For example, our script can use the number of workers (nworkers), a boolean determining if the process is the manager process (is_manager), and a default libE_specs with a call to the parse_args() convenience function.

Next we define our executor class instance. This instance can be customized with many of the settings defined here. We’ll register our simulation with the executor and use the same instance within our sim_f.

libEnsemble can perform and write every simulation (within the ensemble) in a separate directory for organization and potential I/O benefits. In this example, libEnsemble copies a source directory and its contents to create these simulation directories. For our purposes, an empty directory ./sim is sufficient.

Next define the sim_specs and gen_specs data structures:

 1# State the sim_f, its arguments, output, and parameters (and their sizes)
 2sim_specs = {'sim_f': run_forces,         # sim_f, imported above
 3             'in': ['x'],                 # Name of input for sim_f
 4             'out': [('energy', float)],  # Name, type of output from sim_f
 5             'user': {'simdir_basename': 'forces',  # User parameters for sim_f
 6                      'cores': 2,
 7                      'sim_particles': 1e3,
 8                      'sim_timesteps': 5,
 9                      'sim_kill_minutes': 10.0,
10                      'particle_variance': 0.2,
11                      'kill_rate': 0.5}
12             }
13
14# State the gen_f, its arguments, output, and necessary parameters.
15gen_specs = {'gen_f': uniform_random_sample,  # Generator function
16             'in': ['sim_id'],                # Generator input
17             'out': [('x', float, (1,))],     # Name, type and size of data from gen_f
18             'user': {'lb': np.array([0]),            # User parameters for gen_f
19                      'ub': np.array([32767]),
20                      'gen_batch_size': 1000,
21                      'batch_mode': True,
22                      'num_active_gens': 1,
23                      }
24             }

These dictionaries configure our generation function gen_f and our simulation function sim_f, respectively, as the uniform_random_sample and run_forces functions. Our gen_f will generate random seeds when initializing each sim_f call.

After some additions to libE_specs and defining our exit_criteria and persis_info, our script calls the main libE routine:

1libE_specs['save_every_k_gens'] = 1000  # Save every K steps
2libE_specs['sim_input_dir'] = './sim'   # Sim dir to be copied for each worker
3
4exit_criteria = {'sim_max': 8}
5
6persis_info = add_unique_random_streams({}, nworkers + 1)
7
8H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria,
9                            persis_info=persis_info, libE_specs=libE_specs)

Simulation Function

Our sim_f is where we’ll use libEnsemble’s executor to configure and submit for execution our compiled simulation code. We will poll this task’s state while it runs, and once we’ve detected it has finished we will send any results or exit statuses back to the manager.

Create another Python file named forces_simf.py containing:

 1import os
 2import time
 3import numpy as np
 4
 5from libensemble.executors.executor import Executor
 6from libensemble.message_numbers import WORKER_DONE, WORKER_KILL, TASK_FAILED
 7
 8MAX_SEED = 32767
 9
10def perturb(particles, seed, max_fraction):
11    """Modify particle count"""
12    seed_fraction = seed/MAX_SEED
13    max_delta = particles * max_fraction
14    delta = seed_fraction * max_delta
15    delta = delta - max_delta/2  # translate so -/+
16    new_particles = particles + delta
17    return int(new_particles)
18
19def read_last_line(filepath):
20    """Read last line of statfile"""
21    try:
22        with open(filepath, 'rb') as fh:
23            line = fh.readlines()[-1].decode().rstrip()
24    except Exception:
25        line = ""  # In case file is empty or not yet created
26    return line

We use libEnsemble’s message number tags to communicate the worker’s status to the manager. For testing purposes, the perturb() function randomizes the resources used for each calculation. The second function parses forces values and statuses in the .stat file produced by our compiled code. Now we can write the actual sim_f. We’ll first write the function definition, extract our parameters from sim_specs, define a random seed, and use perturb() to randomize our particle counts.

 1def run_forces(H, persis_info, sim_specs, libE_info):
 2    calc_status = 0
 3
 4    x = H['x']
 5    sim_particles = sim_specs['user']['sim_particles']
 6    sim_timesteps = sim_specs['user']['sim_timesteps']
 7    time_limit = sim_specs['user']['sim_kill_minutes'] * 60.0
 8
 9    cores = sim_specs['user'].get('cores', None)
10    kill_rate = sim_specs['user'].get('kill_rate', 0)
11    particle_variance = sim_specs['user'].get('particle_variance', 0)
12
13    seed = int(np.rint(x[0][0]))
14
15    # To give a random variance of work-load
16    sim_particles = perturb(sim_particles, seed, particle_variance)

Next we will instantiate our executor and submit our registered application for execution.

 1    # Use pre-defined executor object
 2    exctr = Executor.executor
 3
 4    # Arguments for our registered simulation
 5    args = str(int(sim_particles)) + ' ' + str(sim_timesteps) + ' ' + str(seed) + ' ' + str(kill_rate)
 6
 7    # Submit our simulation for execution.
 8    if cores:
 9        task = exctr.submit(calc_type='sim', num_procs=cores, app_args=args,
10                            stdout='out.txt', stderr='err.txt', wait_on_run=True)
11    else:
12        task = exctr.submit(calc_type='sim', app_args=args, stdout='out.txt',
13                            stderr='err.txt', wait_on_run=True)

In each executor submit() routine, we define the type of calculation being performed, optionally the number of processors to run the task on, additional arguments for the simulation code, and files for stdout and stderr output. The wait_on_run argument pauses sim_f execution until the task is confirmed to be running. See the docs for more information about these and other options.

The rest of our sim_f polls the task’s dynamically updated attributes for its status, determines if a successful run occurred after the task completes, then formats and returns the output data to the manager.

We can poll the task and kill it in certain circumstances:

 1    # Stat file to check for bad runs
 2    statfile = 'forces.stat'
 3    filepath = os.path.join(task.workdir, statfile)
 4    line = None
 5
 6    poll_interval = 1
 7    while not task.finished :
 8        line = read_last_line(filepath)  # Parse some output from the task
 9        if line == "kill":
10            task.kill()
11        elif task.runtime > time_limit:
12            task.kill()
13        else:
14            time.sleep(poll_interval)
15            task.poll()                   # updates the task's attributes

Once our task finishes, adjust calc_status (our “exit code”) and report to the user based on the task’s final state:

 1    if task.finished:
 2        if task.state == 'FINISHED':
 3            print("Task {} completed".format(task.name))
 4            calc_status = WORKER_DONE
 5            if read_last_line(filepath) == "kill":
 6                print("Warning: Task complete but marked bad (kill flag in forces.stat)")
 7        elif task.state == 'FAILED':
 8            print("Warning: Task {} failed: Error code {}".format(task.name, task.errcode))
 9            calc_status = TASK_FAILED
10        elif task.state == 'USER_KILLED':
11            print("Warning: Task {} has been killed".format(task.name))
12            calc_status = WORKER_KILL
13        else:
14            print("Warning: Task {} in unknown state {}. Error code {}".format(task.name, task.state, task.errcode))

Load output data from our task and return to the libEnsemble manager:

 1    time.sleep(0.2) # Small buffer to guarantee data has been written
 2    try:
 3        data = np.loadtxt(filepath)
 4        final_energy = data[-1]
 5    except Exception:
 6        final_energy = np.nan
 7
 8    outspecs = sim_specs['out']
 9    output = np.zeros(1, dtype=outspecs)
10    output['energy'][0] = final_energy
11
12    return output, persis_info, calc_status

This completes our sim_f and calling script. Run libEnsemble with:

$ python my_calling_script.py --comms local --nworkers 4

This may take about a minute to complete. Output should appear in a new directory ./ensemble, with sub-directories labeled by sim_id and worker.

The following optional lines parse and display some output:

1import os
2
3for dir in os.listdir('./ensemble'):
4    with open(os.path.join('./ensemble', dir, 'out.txt')) as f:
5        out = f.readlines()
6    print(dir + ':')
7    for line in out:
8        print(line)
9    print('-'*60)

Executor Variants

libEnsemble features two variants of its executor that perform identical functions, but are designed for running on different systems. For most uses, the MPI variant will be satisfactory. However, some systems, such as ALCF’s Theta do not support MPI launches from compute nodes. On these systems libEnsemble is run either on launch nodes or uses a proxy launch mechanism to submit tasks from compute nodes. One such mechanism is a scheduling utility called Balsam which runs on a separate node. The Balsam Executor variant interacts with Balsam for this purpose. The only user-facing difference between the two is which executor is imported and called within a calling script.