From scripts to GNU Parallel

This tutorial describes in more detail how you can run jobs with GNU Parallel by using the multijob.commandline module.

Prerequisites:

Concepts and Architecture

The multijob module is intended to support easy parallelization of computation-intensive jobs, for example evolutionary algorithms. We assume all jobs run the same program, but with different parameter values.

For the purpose of this tutorial, we will assume the EA is written in Python. However, the command line interface is language-agnostic and can also be adapted for other languages.

The central part of your work is the executable that performs the job. I’ll call it runGA.py. This executable will be invoked with the job parameters as command line arguments. In our code, these parameters are represented as multijob.job.Job objects.

The general workflow is:

  1. Generate a list of job configurations with the multijob.job.JobBuilder.
  2. Turn the list into a shell script with multijob.commandline.shell_command_from_job(). This shell script can be seen as a job queue.
  3. Process the job queue with GNU Parallel. This will invoke the runGA.py program. The actual target program may be run locally, or on a remote server.
  4. In runGA.py, decode the command line arguments back into a Job instance.
  5. In runGA.py, execute the Job.
  6. In runGA.py, write the results to output files.
  7. Aggregate and analyze your results.

Typemaps and Coercions

To convert between multijob.job.Job instances and command line parameters, we need to coerce each parameter value. A Coercion is a function that specifies how this is done. A mapping between parameter names and the coercions that should be used for these parameters is a typemap.

When converting a job to command line parameters, each parameter is by default converted with str(). This works fine for strings, numbers, booleans, etc.:

>>> from multijob.job import JobBuilder
>>> from multijob.commandline import shell_command_from_job
>>> def dummy_target(x, y, z):
...     pass
...
>>> job, *_ = JobBuilder(x='foo', y=42.3, z=False).build(dummy_target)
>>> print(shell_command_from_job('runGA.py', job))
runGA.py --id=0 --rep=0 -- x=foo y=42.3 z=False

If you want to pass special types that do not have a suitable string representation, you may have to write your own coercion. For example, to pass a list of values for a parameter, we might encode it as a CSV line. Note that we now declare a TYPEMAP:

>>> import csv, io
>>> from multijob.job import JobBuilder
>>> from multijob.commandline import shell_command_from_job
>>> def dummy_target(xs):
...     pass
...
>>> def as_csv_line(values):
...     out = io.StringIO()
...     csv.writer(out).writerow(values)
...     return out.getvalue().strip('\r\n')
...
>>> TYPEMAP = dict(xs=as_csv_line)
>>> job, *_ = JobBuilder(xs=['Bond, James', '007']).build(dummy_target)
>>> print(shell_command_from_job('runGA.py', job, typemap=TYPEMAP))
runGA.py --id=0 --rep=0 -- 'xs="Bond, James",007'

This typemap just says: “for the xs parameter, use the as_csv_line() function.”

On the other end, we have to turn the command line parameters back into a Job instance. This is usually a bit more difficult. In particular, we will almost always need a typemap. The coercions now specify how to turn each string back into a Python object. For simple built-in types like strings, ints, floats, and bools, you can use the type name as a string in place of a coercion – a named coercion. See the Coercion docs for details.

In Python, the command line parameters can be read from the sys.argv list. The first item is the name of the current command and has to be skipped. Multijob provides the multijob.commandline.job_from_argv() function for the conversion:

>>> from multijob.commandline import job_from_argv
>>> def my_algorithm(x, y, z):
...     ...  # you probably have something more interesting here
...     return y if z else len(x)
...
>>> TYPEMAP = dict(
...     x='str',
...     y='float',
...     z='bool',
... )
...
>>> # argv = sys.argv
>>> argv = ['runGA.py', '--id=0', '--rep=0', '--',
...         'x=foo', 'y=42.3', 'z=False']
>>> # remember to skip first argv entry
>>> job = job_from_argv(argv[1:], my_algorithm, typemap=TYPEMAP)
...
>>> # execute the job
>>> res = job.run()
>>> res.result
3

Instead of using the predefined named coercions, you can always provide your own functions.

Converting a runGA() function to a runGA.py script

When moving to Multijob, you don’t have to throw your existing code away: Multijob is absolutely framework-agnostic. We only need a function that takes all configuration parameters as (named) arguments. So as long as you have such a runGA() function or can easily write one, everything is good.

In Python, it is possible to use a .py file as both a module that can be imported, and a script that can be executed directly. Anything that should be only run for a script has to be guarded, since top-level code is executed even for modules:

... # always executed

if __name__ == '__main__':
    ... # only executed when used as a script

The runGA() function would be in the always-executed part. Our adaptation code will live in the guarded section.

Creating a Typemap to decode command line args

To adapt this function, we only have to decode the command line parameters. As shown above, this requires a typemap. We then get a Job instance that can be run.

To construct the typemap, we look at the parameters of the runGA() function. Usually, our typemap will have an entry for each named parameter. Here is a simple case:

def runGA(popsize, cxpb, mutpb, use_injection):
    ...

All parameters are named in the function definition, so we start writing our typemap:

TYPEMAP = dict(
    popsize=...,
    cxpb=...,
    mutpb=...,
    use_injection=...)

We now have to select the types for these parameters. Note that the typemap does not contain classes or class names, but coercion functions that parse the parameter value from a string. Some constructors (like int()) happen to correctly parse any string they are given. For other constructors, this is not the case. For example, bool('False') == True, so bool() is not a suitable coercion function.
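
You can verify the bool() pitfall directly, since any non-empty string is truthy:

>>> bool('False')
True

A hand-written coercion for this case might look like the following sketch (just an illustration, not necessarily multijob’s actual implementation):

def parse_bool(value):
    # accept the two literal spellings produced by str(), reject anything else
    if value == 'True':
        return True
    if value == 'False':
        return False
    raise ValueError("not a bool: {!r}".format(value))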

To solve such problems, common conversions can simply be specified as named coercions. Here:

TYPEMAP = dict(
    popsize='int',
    cxpb='float',
    mutpb='float',
    use_injection='bool')

Not all functions are this easy. If the parameter list uses **kwargs to collect many named parameters, you’ll have to trace the code to find the actual parameter names. If the parameter list slurps many *args into a list, you will have to provide a parameter called args in your TYPEMAP which then expects a list; see the sketch below. Remember that all parameters are passed as named parameters, not as positional parameters.
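
As a sketch of the *args case: assuming the values were encoded as a single CSV line (as in the as_csv_line() example above), a matching coercion can parse that line back into a list. The runGA() signature here is hypothetical:

import csv, io

def csv_list(value):
    # parse one CSV line back into a list of strings
    return next(csv.reader(io.StringIO(value)))

def runGA(*args):
    ...

TYPEMAP = dict(args=csv_list)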

Decoding and running the Job

With the typemap, you can easily re-create a job from the command line arguments and execute it:

import sys
from multijob.commandline import job_from_argv

def runGA(a, b, c):
    ...

TYPEMAP = dict(a=..., b=..., c=...)

if __name__ == '__main__':
    job = job_from_argv(sys.argv[1:], runGA, typemap=TYPEMAP)
    res = job.run()
    ...

Note that you have to skip the first command line parameter: sys.argv[0] contains the name of the executable, not any real parameters.

Storing results in a file

After you executed the job, the JobResult should be stored somewhere. It is best to use language-agnostic formats like CSV for this.

If you are storing the results in a file, you will of course need different file names for each parameter configuration. The Job object includes job_id and repetition_id properties that can be used here. The job_id identifies the parameter configuration. If you repeat each configuration, the job_id alone is therefore not unique. The repetition_id distinguishes repetitions of the same job_id.

To create a result file using both of these IDs, you could do:

filename = "results.{}.{}.csv".format(job.job_id, job.repetition_id)
with open(filename, 'w') as f:
    ...

This produces file names like results.3.7.csv.

Of course, Multijob imposes no restrictions on the file name so you can use whatever schema you want, or even use non-file-based solutions like REST APIs or databases.

In my experience, it is best to put all of this plumbing into a separate main() function. For example:

import csv
import sys
from multijob.commandline import job_from_argv

def runGA(...):
    ...

TYPEMAP = dict(...)

def main(argv):
    job = job_from_argv(argv, runGA, typemap=TYPEMAP)
    res = job.run()

    filename = "results-{}-{}-logbook.csv".format(
        job.job_id, job.repetition_id)
    with open(filename, 'w') as f:
        csv_file = csv.writer(f)
        for record in res.result:
            csv_file.writerow(record)

if __name__ == '__main__':
    main(sys.argv[1:])

Now that your actual algorithm has been wrapped as an independently executable script, you can test it by running the script directly. Remember that you will have to provide all parameters in the expected format, and that you have to provide a job ID and repetition ID:

$ python runGA.py --id=0 --rep=0 -- x=7 y=42.3 z='foo bar'

Creating the jobs

The multijob.job.JobBuilder is used to generate all job configurations you want to run. For each parameter, you can specify a list of one or more possible values for this parameter. The job configurations are then the cartesian product of these lists.

You have multiple possibilities to provide these lists: explicit values with add(), evenly spaced values with a given step via add_range(), and a given number of evenly spaced values via add_linspace().

Example:

>>> from multijob.job import JobBuilder
>>> builder = JobBuilder()
>>> builder.add('explicit', 1, 2, 3)
(1, 2, 3)
>>> builder.add_range('range', 0.0, 6.0, 2.0)
[0.0, 2.0, 4.0, 6.0]
>>> builder.add_linspace('linspace', 3.0, 6.0, 4)
[3.0, 4.0, 5.0, 6.0]

The add_range() and add_linspace() variants are only convenience functions. They have some problematic aspects:

  • They only operate on floats. Because floats are inherently imprecise, you may see rounding problems.
  • Also, their output is evenly spaced. Your parameters might benefit from a different distribution, e.g. denser around zero, or selected randomly. Randomizing your values also prevents sampling artifacts; see the sketch after this list. If you randomize your values, make sure to record the random generator seed to ensure reproducibility!
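
For example, a sketch of randomly sampled parameter values with a recorded seed (the parameter name and range are placeholders):

import random

SEED = 1234  # record this seed alongside your results
rng = random.Random(SEED)
builder.add('mutpb', *(rng.uniform(0.0, 0.2) for _ in range(5)))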

When you specify many possible parameter values, this easily leads to a combinatorial explosion of job configurations. You may want to check for a sane number of configurations first. For that, each builder can calculate the expected number_of_jobs(). This only counts the distinct configurations that will be built; it does not consider repetitions.

n_jobs = builder.number_of_jobs()
max_jobs = 1000
if n_jobs > max_jobs:
    raise RuntimeError(
        "too many job configurations: {}/{}".format(n_jobs, max_jobs))

Finally, we generate the actual list of jobs. Since we want to turn the jobs into shell commands, these job instances will never be directly run. We can therefore use a dummy callback for the worker function:

def dummy(**kwargs):
    pass

jobs = builder.build(dummy)

For many uses, it makes sense to repeat each configuration multiple times. In the context of evolutionary algorithms, the results of a single run are not statistically significant. We need multiple repetitions of each configuration in order to draw reliable conclusions. The build() method takes a repetitions argument that specifies the number of repetitions of each configuration; it defaults to 1.

The repeated job objects are identical (same param values, same job id) except for the repetition ID.

jobs = builder.build(dummy, repetitions=20)

Turning jobs into shell commands

The shell_command_from_job(prefix, job) function can turn a Job object into a shell command. The prefix argument is the command that should be invoked with the job parameters as arguments. If you know the command name in advance, you can use it here:

shell_command_from_job('./target.exe', job)

However, choosing a particular target executable is unnecessary at this point. By using a shell variable, we can defer the decision until run time. A variable will later make it easier to run distributed jobs on multiple servers. The variable also allows the same job definition file to be run with different target executables (e.g. a runGA.py script and a runGA native program). I recommend using the RUN_GA variable:

shell_command_from_job('$RUN_GA', job)

By default, the job parameters are just converted to strings via the builtin str(). If this is not appropriate, you can supply a typemap to specify an explicit serialization. This is discussed in more detail above in the section Typemaps and Coercions.

The resulting commands should be written to a file, usually called jobs.sh. This happens to be a valid shell script, though it is better understood as a line-by-line list of work items.

You can create this file by just writing to STDOUT and piping the results into a file:

from multijob.commandline import shell_command_from_job

for job in jobs:
    print(shell_command_from_job('$RUN_GA', job))

Then on the commandline: python generate-jobs.py >jobs.sh.

If your job generation script produces additional output that does not belong in the jobs file, you will have to open a file explicitly:

from multijob.commandline import shell_command_from_job

with open('jobs.sh', 'w') as f:
    for job in jobs:
        print(shell_command_from_job('$RUN_GA', job), file=f)

Executing jobs locally with parallel

Running the jobs locally is fairly easy: we just pipe the jobs.sh file into parallel. GNU Parallel reads the file line by line and executes each line as an individual shell command. By default, it tries to utilize your CPU optimally and runs one job per available logical CPU.

# Minimal usage:
$ parallel <jobs.sh

However, GNU Parallel provides many additional useful options. Of these, --eta and --joblog LOGFILE are must-have options:

# Recommended usage
$ parallel --eta --joblog .parallel-log <jobs.sh

--eta

Prints regular progress information and tries to estimate the remaining time until completion of all jobs. The estimates are unreliable until the first batch of jobs has completed.

This option provides extremely useful feedback for monitoring, and for planning the remaining time. Since the number of completed jobs slowly goes up, you can see that the experiment is in fact making progress. If the time estimate is longer than expected, you may want to schedule the experiment for a time when you are absent, e.g. overnight or over a weekend.

--jobs N

Limits the number of parallel jobs. This is mostly useful for testing, where you don’t want to run at full power.
--joblog LOGFILE

Causes parallel to keep track of which jobs have been started and completed. If the jobs are interrupted (e.g. by a network connection loss), they can be resumed later.

To resume processing, use the exact same set of options but add the --resume flag. This will continue with the next job that wasn’t run. In practice, it is probably better to use --resume-failed, since whatever caused the interruption will likely have caused some jobs to fail. That way, any job that was recorded as completed-but-failed will also be re-run.

Resuming will only work correctly if the exact same options are used. This also means that the jobs.sh file should not be modified.
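
For example, resuming the recommended invocation from above might look like this:

# Same options as before, plus --resume-failed
$ parallel --eta --joblog .parallel-log --resume-failed <jobs.sh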

--line-buffer

Switches the buffering mode to line buffering.

By default (--group), the complete output (STDOUT, STDERR) of each job is buffered until that job completes, and is then printed all at once. This prevents the output of multiple jobs from being intermingled, but also means that the output is deferred. Buffering also requires extra storage and processing power.

Depending on your programs, it can be better to switch to line buffering, or to deactivate output buffering entirely. With line buffering, each line is printed as soon as it is complete. That means output arrives sooner, but lines of multiple jobs are intermingled without any indication which line belongs to which job.

You can --tag the output lines, but by default that prepends all job arguments, which is likely to be unreadable. Instead, consider a custom --tagstring PATTERN, e.g. the pattern "{#}", which expands to the GNU Parallel job ID.
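
For example, a line-buffered run where each output line is prefixed with the job ID could look like this:

$ parallel --line-buffer --tagstring '{#}' <jobs.sh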

To turn buffering off completely, use --ungroup.

Executing jobs remotely with parallel

To run a job on remote servers, we have to tackle two problems:

  • initialize the necessary environment for the job (data files, tools, libraries)
  • connect to the server, transfer any files, run the experiment, and transfer results back

Initializing the remote environment

Because we have to initialize the remote environment first, we cannot run our target executable runGA.py directly. Instead, we’ll have to create a wrapper script that initializes the environment and then runs the actual script.

This remote-job.sh script will:

  • unpack any data files that were transferred as an archive
  • initialize the environment
  • run the actual command
  • gather any output files into a results archive

The results archive must have a name that is known to GNU Parallel so that it can be transferred back. To that end, remote-job.sh will receive a sequence ID as its first argument, and the line from jobs.sh as its second. In our script, we’ll call these seq_id and target_exe, respectively.

Unpacking the data may just be a case of un-tarring any data files:

tar xzf data.tar.gz

How your environment needs to be initialized depends entirely on your system. In simple cases, it will suffice to activate a venv Python virtual environment that was pre-installed at a known location on the target server:

source path/to/venv/bin/activate

In other cases, you may want to adjust environment variables like PATH.

To run the job, we need to resolve the RUN_GA environment variable. Since the jobs.sh line is handed to the remote-job.sh script as a single string, we will need to eval it.

We should also write any output to a log file, though this is not technically necessary. Writing to STDOUT is probably easier when debugging a concrete problem, but a log file de-clutters the output. The correct choice, as always, depends on your circumstances.

RUN_GA='python runGA.py'
eval "$target_exe >logfile_${seq_id}.txt 2>&1"

Finally, we need to pack any output files into the results archive. This would be the logfile produced above, and any result files. For example:

tar czf results-${seq_id}.tar.gz \
        result_*.csv logfile_*.txt

Together, the complete script would look like this. Note that we set -e so that the whole script fails as soon as any command fails. This fail-fast behaviour makes it more likely that errors are spotted early, before you spend many hours running a potentially flawed experiment.

#!/bin/bash
set -e
seq_id="$1"
target_exe="$2"

# unpack data files
tar xzf data.tar.gz

# initialize environment
source path/to/venv/bin/activate

# run experiment
RUN_GA='python runGA.py'
eval "$target_exe >logfile_${seq_id}.txt 2>&1"

# gather results
tar czf results-${seq_id}.tar.gz \
        result_*.csv logfile_*.txt

Preparing SSH connections

GNU Parallel needs to be able to log into the remote servers without a password. That is, the controlling user must upload a public key to the remote servers, and add the corresponding private key to their ssh-agent session. See the ssh-agent documentation for details on the agent; keys are added with ssh-add.
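
For example, starting an agent and adding a key might look like this (the key path is just an assumption):

$ eval "$(ssh-agent)"
$ ssh-add ~/.ssh/id_rsa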

By default, there will be one SSH connection per available CPU to each remote host. Usually, the SSH server has a fairly low connection limit, which can cause some GNU Parallel connections to be refused. For large servers, you will have to edit the sshd configuration to raise the limit. At the very least, you will need to allow as many connections as you have CPUs, plus at least two additional connections.
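
On the server side, directives such as MaxStartups and MaxSessions in sshd_config typically control these limits. A sketch, with values that depend on your CPU count:

# /etc/ssh/sshd_config (example values only)
MaxStartups 64
MaxSessions 64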

We can later list all remote hosts on the command line, or store them in an --sshloginfile. This text file contains one hostname per line, optionally with additional flags, e.g. to limit the number of concurrent jobs on that host.
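
For example, a minimal --sshloginfile could look like this; the "N/" prefix sets the number of job slots on that host, and the hostnames are placeholders:

4/worker1.example.com
8/worker2.example.com
worker3.example.com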

Running the remote job

We now need to transfer all these files to the server and run them. This builds on the various options for running the jobs locally, as explained in the section Executing jobs locally with parallel.

Instead of running the target executable directly, we want to actually run the wrapper script on the server:

$ parallel OPTIONS... ./remote-job.sh '{#}' '{}' <jobs.sh

The placeholders {#} and {} will be replaced by GNU Parallel to provide the actual arguments. The {#} is the GNU Parallel job id, which is used as seq_id in the script. The {} is one line from the jobs.sh file, i.e. the job configuration as a shell script snippet.

Required options:

--workdir DIRECTORY

The job will be executed in some particular --workdir.

You can specify a path to a directory, but then all concurrent jobs will share this directory. If you write your remote-job.sh to reflect this, a pre-configured path can be OK. For example, remote-job.sh could use the seq_id to create a temporary subdirectory, as sketched below. The advantage of a known directory is that any data files only need to be transferred once, and that you can pre-configure the environment.
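
A sketch of such a remote-job.sh prologue, using seq_id to isolate each job:

# isolate this job in its own subdirectory of the shared workdir
job_dir="job_${seq_id}"
mkdir -p "$job_dir"
cd "$job_dir"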

This complication is usually undesirable, so we can instead get a unique temporary directory by giving GNU Parallel a triple dot: --workdir .... However, all files then have to be transferred again for each job, or transferred manually to a known location on the server before the experiment.

--transferfile FILE

Files that should be transferred to the remote host for each job are given with --transferfile. In particular, this is the data archive data.tar.gz, the remote-job.sh script, and the target executable (e.g. runGA.py). This option is repeated for each file:

parallel OPTIONS... \
    --transferfile data.tar.gz \
    --transferfile remote-job.sh \
    --transferfile runGA.py \
    COMMAND ARGS...
--return FILE

Files that should be returned from the remote server have to be listed with --return. In our case, this is the results archive results-${seq_id}.tar.gz. To generate the correct name with the ID, GNU Parallel can perform a substitution: --return 'results-{#}.tar.gz'.

--cleanup

Any transferred files and the temporary directory are deleted afterwards.

--sshlogin HOST, --sshloginfile FILE

A list of remote hosts. The --sshlogin option is repeated for each remote host. See the section Preparing SSH connections for details.

After collecting the data files, these options allow you to run the jobs remotely. Afterwards, we can unpack the result archives and perform any analysis:

# Create the data archive
tar czf data.tar.gz data/

# Run the job remotely
parallel                            \
    --sshlogin worker.example.com   \
    --workdir ...                   \
    --transferfile data.tar.gz      \
    --transferfile remote-job.sh    \
    --transferfile runGA.py         \
    --return 'results-{#}.tar.gz'   \
    --cleanup                       \
    --eta                           \
    --joblog .joblog                \
    ./remote-job.sh '{#}' '{}' <jobs.sh

# Unpack the results
parallel tar xzf ::: results-*.tar.gz

# Clean unneeded files
rm -f data.tar.gz results-*.tar.gz

And that’s it! This looks like a lot, but you probably only need half the details mentioned here, and everything will be easier the second time.