Cooperative Computing Lab
CCL | Software | Install | Manuals | Forum | Papers
CCL Home

Research

Software Community Operations

Work Queue Tutorial

This tutorial will have you install CCTools and will take you through some distributed computation examples using Work Queue.

  1. Getting Started
    1. Tutorial Setup
    2. Set Environment Variables
  2. Work Queue Example
    1. Setup
    2. Analysis
    3. Running
  3. Using Project Names
    1. SGE Workers
    2. Multi-Slot Workers
  4. Exercise: Chaining Tasks

Getting Started

Tutorial Setup

The setup for this tutorial follows the setup described and installed in the Makeflow tutorial.

First, login in to the head node of XSEDE's Lonestar where you will install CCTools and run this tutorial.

ssh USERNAME@login2.ls4.tacc.utexas.edu

Please replace USERNAME with the username in the sheet of paper handed out to you. Use the password given there as well.

NOTE: Please feel free to use your own XSEDE account if you have one.

Set Environment Variables

For this tutorial, you will need to add your CCTools directory to your $PATH:

export PATH=~/cctools/bin:${PATH}

We will use both Python and Perl in this tutorial. They need to be able to find our installed packages. So set these environment variables for running the Python and Perl examples:

export PYTHONPATH=${PYTHONPATH}:~/cctools/lib/python2.4/site-packages export PERL5LIB=${PERL5LIB}:~/cctools/lib/perl5/site_perl

Work Queue program running the Simulation Executable

It is simple to write a program in C/Perl/Python (or any language with appropriate Work Queue bindings) which can generate tasks to be queued for Work Queue. In this example, we will create 10 simulation tasks using simulation.py.

Setup

mkdir -p ~/cctools-tutorial/wq cd ~/cctools-tutorial/wq wget http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/simulation.py wget http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/input.txt wget http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/wq.py wget http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/wq.pl

Analysis

We will analyze the code in wq.py to study a workflow implemented using the Python bindings for the Work Queue API.

$ cat wq.py #!/usr/bin/env python from work_queue import * import sys try: Q = WorkQueue(port = 0) except: print "could not instantiate Work Queue master" sys.exit(1) print "Listening on port %d." % Q.port print "Submitting 10 simulation tasks..." for i in range(0, 10): infile = "input.txt" outfile = "file.%0.2x" % i command = "python simulation.py %d < %s > %s" % (i, infile, outfile) T = Task(command) T.specify_file("simulation.py", "simulation.py", WORK_QUEUE_INPUT, cache = True) T.specify_file(infile, infile, WORK_QUEUE_INPUT, cache = False) T.specify_file(outfile, outfile, WORK_QUEUE_OUTPUT, cache = False) taskid = Q.submit(T) print "done." print "Waiting for tasks to complete..." while not Q.empty(): T = Q.wait(5) if T: print "task (id# %d) complete: %s (return code %d)" % (T.id, T.command, T.return_status) print "done."

Here we load the work_queue Python binding. Python will look in PYTHONPATH which is setup in your environment.

This instantiates a Work Queue master to which you may submit work. Setting the port to 0 instructs Work Queue to pick an arbitrary port to bind on.

We create a task which takes a shell command argument. The first task created in this workflow will have the command:

T = Task("python simulation.py 0 < input.txt > file.00");

Each task usually depends on a number of files to run. These include the executable and any input files. Here we specify the simulation.py executable and its input infile. Notice that we specify both simulation.py and infile twice when calling specify_file. The first argument is the name of the file on the master and the second argument is the name of the file we want created on the worker. Usually these filenames will be the same as in this example.

We specify the output file, outfile, which we want transferred back to the master.

At this point we have finished the description of our task and it is ready to be submitted for execution on the Work Queue workers. Q.submit submits this task.

At this point we wish to wait for all submitted tasks to complete. So long as the queue is not empty, we continue to call Q.wait waiting for the result of a task we submitted.

Here we call Q.wait(5) which takes a timeout argument. The call to wait will return a finished task which allows us to analyze the return_status or output. In this example, we set the timeout to 5 seconds which allows our application to do other things if a task is taking an inordinate amount of time to complete. We could have used the constant WORK_QUEUE_WAITFORTASK to wait indefinitely until a task completes.

Perl equivalent

The Perl code of the above program is in wq.pl and is shown here:

$ cat wq.pl #!/usr/bin/perl use work_queue; my $q = work_queue_create(0); if (not defined($q)) { print "could not instantiate Work Queue master\n"; exit 1; } $port = work_queue_port($q); print "Listening on port $port.\n"; print "Submitting 10 simulation tasks..."; for (my $i = 0; $i < 10; $i++) { my $infile = "input.txt"; my $outfile = sprintf("file.%0.2x", $i); my $command = "python simulation.py $i < $infile > $outfile"; my $t = work_queue_task_create($command); work_queue_task_specify_file($t,"simulation.py","simulation.py",$WORK_QUEUE_INPUT,$WORK_QUEUE_CACHE); work_queue_task_specify_file($t,$infile,$infile,$WORK_QUEUE_INPUT,$WORK_QUEUE_CACHE); work_queue_task_specify_file($t,$outfile,$outfile,$WORK_QUEUE_OUTPUT,$WORK_QUEUE_NOCACHE); my $taskid = work_queue_submit($q, $t); } print "done." print "Waiting for tasks to complete...\n"; while (not work_queue_empty($q)) { my $t = work_queue_wait($q, 5); if (defined($t)) { print "task (id#$t->{taskid}) complete:$t->{command_line} (return code $t->{return_status})\n"; work_queue_task_delete($t); } } print "done.\n"; work_queue_delete($q); exit 0;

Running

We are now going to run the Work Queue program shown above for running the simulation tasks. Pick one of the programs to run:

To run the Python program (wq.py) do: To run the Perl program (wq.pl) do:
python wq.py perl wq.pl

If you encounter an error when running this step, be sure you did not forget to setup your environment.

When a Work Queue program is run, it prints the port on which it is listening for connections from the workers. For example:

$ python wq.py Listening on port XXXX. ... $ perl wq.pl Listening on port XXXX. ...

We have successfully started a master and found the port and hostname on which it is listening. The master now is waiting for connections from workers so that it can dispatch the submitted tasks for execution.

To start a work_queue_worker for this master, open another terminal window and login into a head node of XSEDE's Lonestar with your given username:

ssh USERNAME@login2.ls4.tacc.utexas.edu

As before, replace USERNAME with the username provided to you (and its password).

Add the CCTools directory to our $PATH as before:

export PATH=~/cctools/bin:${PATH}

In this newly opened terminal, start a work_queue_worker for the master by giving it the port and hostname of the master. Also, enable the debugging output (-d all).

work_queue_worker -t 10 -d all localhost XXXX ...

replacing XXXX with the port the Work Queue master program is listening on.

The debug output for the worker will appear on your terminal. When the tasks are finished, the worker will quit after the 10 second timeout.

Running Makeflow with Work Queue using Project Names

Similarly to Makeflow, we can run Work Queue using project names and the catalog server.

To do this, you will first modify the master program to use the default catalog server (running at catalog.cse.nd.edu) by enabling the catalog mode. Then, you will provide the master with a project name that will be advertised to the default catalog server.

For Work Queue programs written in Python, you will use the specify_master_mode() API to set the catalog mode. Similarly, you will modify the program to specify a project name using the specify_name() API. An example of their usage is given below:

try: Q = WorkQueue(port = 0) except: sys.exit(1) user = getpass.getuser() projectname = "%s.workqueue" % user Q.specify_master_mode(WORK_QUEUE_MASTER_MODE_CATALOG) Q.specify_name(projectname)

You can download this changed python file using:

wget -O wq.py http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/named/wq.py

For Work Queue programs in C/Perl, you will use the work_queue_specify_master_mode() API to set the catalog mode. To specify a project name, you will use work_queue_specify_name(). For example, a Work Queue program written in Perl will look like:

my $q = work_queue_create(0); if (not defined($q)) { print "could not instantiate Work Queue master\n"; exit 1; } my $user = getpwuid($>); my $projectname = "$user.workqueue"; work_queue_specify_master_mode($q, 1); work_queue_specify_name($q, $projectname);

You can download this changed perl file using:

wget -O wq.pl http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/named/wq.pl

As before, starting the master is done be running the script:

python wq.py perl wq.pl

You will then start workers for your Work Queue master by specifying the option to use the default catalog server (-a option) and the project name of your master (-N option). Example:

work_queue_worker -t 10 -d all -a -M $USER.workqueue

Running Work Queue workers on SGE

We can also submit workers to run on dedicated SGE instances using the sge_submit_workers script. Again, we will use Lonstar's SGE cluster to demonstrate.

First, we submit the Work Queue master as a job using qsub as follows. We will create an SGE submission script:

echo -e '#!/bin/sh\n python wq.py' > sge_workqueue.sh echo -e '#!/bin/sh\n perl wq.pl' > sge_workqueue.sh

The python version of this script file for SGE should look like this:

$ cat sge_workqueue.sh #!/bin/sh python wq.py

We start this master on SGE by running qsub:

qsub -q development -l h_rt=00:10:00 -V -cwd -pe 12way 12 sge_workqueue.sh

Next, we start a worker for this master on SGE using qsub. Again, we create a script:

echo -e '#!/bin/sh\n work_queue_worker -t 10 -a -M $USER.workqueue' > sge_worker.sh

The worker script should look like this:

$ cat sge_worker.sh #!/bin/sh work_queue_worker -t 10 -a -M $USER.workqueue

Submit this script to SGE using qsub:

qsub -q development -l h_rt=00:10:00 -cwd -V -pe 12way 12 sge_worker.sh

As before, you can run this command multiple times, to run the task in parallel. Or you can use our sge_submit_workers script:

sge_submit_workers -t 10 -p "-q development -l h_rt=00:10:00 -V -pe 12way 12" -M $USER.workqueue 4

To monitor the completion of the submitted master and worker jobs, we will watch the status of these jobs using SGE's qstat interface:

watch "qstat -u $USER"

Note: Similar to sge_submit_workers, we also have condor_submit_workers, ec2_submit_workers, torque_submit_workers, pbs_submit_workers for submitting workers to Condor, EC2, Torque, and PBS respectively.

Running with Multi-slot Work Queue Workers

Similarly to the previous tutorial, we can specify the resource usage for each task allowing multiple tasks to be run on a single 12-core node.

To do this, we must, again, annotate the tasks with the requested resource usage. In Work Queue, we do this by calling functions to specify these resources.

You can download an annotated version of the previous python and perl scripts using:

wget -O wq.py http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/multi/wq.py wget -O wq.pl http://www.nd.edu/~ccl/software/tutorials/xsede13/wq/multi/wq.pl

In this example, each of the simulation tasks each use a single core. So, we annotate the tasks to each require and use a single core. Since these are also small tasks, we annotate them to use 1 MB of memory and 1 MB of disk.

Below are the annotated code segments in context for each script:

T = Task(command) T.specify_file("simulation.py", "simulation.py", WORK_QUEUE_INPUT, cache = True) T.specify_file(infile, infile, WORK_QUEUE_INPUT, cache = False) T.specify_file(outfile, outfile, WORK_QUEUE_OUTPUT, cache = False) T.specify_cores( 1 ) T.specify_memory( 1 ) T.specify_disk( 1 ) taskid = Q.submit(T) my $t = work_queue_task_create($command); work_queue_task_specify_file($t, "simulation.py", "simulation.py", $WORK_QUEUE_INPUT, $WORK_QUEUE_CACHE); work_queue_task_specify_file($t, $infile, $infile, $WORK_QUEUE_INPUT, $WORK_QUEUE_CACHE); work_queue_task_specify_file($t, $outfile, $outfile, $WORK_QUEUE_OUTPUT, $WORK_QUEUE_NOCACHE); work_queue_task_specify_memory($t, 1 ); work_queue_task_specify_disk($t, 1 ); work_queue_task_specify_cores($t, 1 ); my $taskid = work_queue_submit($q, $t);

We can run this master by reusing the sge submit script from the previous section:

qsub -q development -l h_rt=00:10:00 -V -cwd -pe 12way 12 sge_workqueue.sh

After this, we submit multiple-slot workers as in the previous tutorial. First, we create a submit script:

echo -e '#!/bin/sh\n work_queue_worker --cores all -t 10 -a -M $USER.workqueue' > sge_multicore_worker.sh

The worker script should look like this:

$ cat sge_multicore_worker.sh #!/bin/sh work_queue_worker --cores all -t 10 -a -M $USER.workqueue

Submit this script to SGE using qsub:

qsub -q development -l h_rt=00:10:00 -cwd -V -pe 12way 12 sge_multicore_worker.sh

This worker will execute the many independent single-core tasks to complete the work queue.

Exercise: Chaining Tasks

The goal of this exercise is to change the workflow to chain the executions of simulation.py so that the output of one simulation is the input of another. For this exercise, the workflow should look like:

chaining simulation

Because our simulation.py is sophisticated and runs on average for 5 seconds, in this example we will only do 5 instances of the simulation (instead of 100) so it takes about 25 seconds.

For this exercise, remember that when you run Q.submit(T), it finalizes the task and allows it to be sent to a worker for execution. You will need to wait for the output from a worker to come back before sending out the next one. As before, you can wait for the completion of a task using Q.wait(5).

Once you have finished implementing your version of the chained simulation, you may compare with this python solution or perl solution.