NAME

Work_Queue - Perl Work Queue bindings.

SYNOPSIS

The objects and methods provided by this package correspond to the native C API in work_queue.h. See also Work_Queue::Task, which is automatically loaded with this module.

The SWIG-based Perl bindings provide a higher-level interface, such as:

                use Work_Queue;

                my $port    = 9123;   # or 0 for a random port
                my $command = 'wc -l some_other_name > some_name';   # example command

                my $q = Work_Queue->new( port => $port, name => 'my_queue_name');

                my $t = Work_Queue::Task->new($command);
                $t->specify_input_file(local_name => 'some_name', remote_name => 'some_other_name');
                $t->specify_output_file('some_name');

                $q->submit($t);

                my $stats = $q->stats;
                print $stats->{tasks_running}, "\n";

                $t = $q->wait(5);

                if($t) {
                    my $resources = $t->resources_measured;
                    print $resources->{resident_memory}, "\n";
                }

METHODS

Work_Queue

Work_Queue::new ( )

Work_Queue::new ( $port )

Work_Queue::new ( port => ..., name => ..., catalog => ..., shutdown => ..., transactions_log => ..., stats_log => ..., debug_log => ...)

Create a new work queue.

port

A single number indicating the port number to listen on, or an array reference of the form [lower, upper], which indicates the inclusive range of available ports from which one is chosen at random. If not specified, the default 9123 is used. If zero, a port is chosen at random.

name

The project name to use.

catalog

Whether or not to enable catalog mode.

transactions_log

The name of a file to which the queue's transactions log is written.

stats_log

The name of a file to which the queue's statistics log is written.

debug_log

The name of a file to which the queue's debug log is written.

shutdown

Automatically shut down workers when the queue is finished. Disabled by default.

                my $q = Work_Queue->new( port => 0, name => 'my_queue' );

See work_queue_create in the C API for more information about environment variables that affect the behavior of this method.
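The rules for the port argument can be restated as a short sketch. resolve_port is a hypothetical helper written only for illustration; it is not part of Work_Queue, which makes this choice internally when the queue is created.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Work_Queue) that mirrors the documented
# interpretation of the `port` argument to Work_Queue->new.
sub resolve_port {
    my ($port) = @_;

    # Not specified: use the documented default.
    return 9123 unless defined $port;

    # Array reference [lower, upper]: pick one port at random from the
    # inclusive range.
    if (ref $port eq 'ARRAY') {
        my ($lower, $upper) = @{$port};
        return $lower + int(rand($upper - $lower + 1));
    }

    # A single number is used as-is; 0 means "choose a port at random",
    # which this sketch leaves to the system.
    return $port;
}

# e.g. resolve_port([9000, 9010]) returns some port from 9000 to 9010.
```

In real code the same range is simply passed to the constructor, as in Work_Queue->new( port => [9000, 9010], name => 'my_queue' ), and $q->port reports the port actually chosen.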

name

Get the project name of the queue.

                print $q->name;

port

Get the listening port of the queue.

                print $q->port;

stats

Get the manager's queue statistics.

                print $q->stats->{workers_busy};

stats_hierarchy

Get the queue statistics, including manager and foremen.

                print $q->stats_hierarchy->{workers_busy};

stats_category

Get the task statistics for a particular category.

                my $s = $q->stats_category("my_category");
                print $s->{tasks_waiting};

specify_category_mode

Turn on or off first-allocation labeling for a given category. By default, only cores, memory, and disk resources are labeled; gpus are unlabeled. Turn specific resources on or off with specify_category_autolabel_resource. NOTE: autolabeling is only meaningful when task monitoring is enabled (enable_monitoring). When monitoring is enabled and a task exhausts resources in a worker, mode dictates how work queue handles the exhaustion:

category

A category name. If undefined, sets the default mode for newly created categories.

mode

One of the following modes (category_mode_t in the C API):

$Work_Queue::WORK_QUEUE_ALLOCATION_MODE_FIXED

Task fails (default).

$Work_Queue::WORK_QUEUE_ALLOCATION_MODE_MAX

If maximum values are specified for cores, memory, disk, or gpus (e.g. via specify_category_max_resources or specify_memory), and one of those resources is exceeded, the task fails. Otherwise it is retried until a large enough worker connects to the manager, using the maximum values specified, and the maximum values seen so far for resources not specified. Use specify_max_retries to set a limit on the number of times work queue attempts to complete the task.

$Work_Queue::WORK_QUEUE_ALLOCATION_MODE_MIN_WASTE

As above, but work queue tries allocations to minimize resource waste.

$Work_Queue::WORK_QUEUE_ALLOCATION_MODE_MAX_THROUGHPUT

As above, but work queue tries allocations to maximize throughput.
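The retry rule for $Work_Queue::WORK_QUEUE_ALLOCATION_MODE_MAX can be restated as a small sketch. The helper max_mode_retry and its arguments are hypothetical; Work_Queue does not expose such a function, and the library's actual bookkeeping may differ. The resource names (cores, memory, disk, gpus) are the ones named in the description above.

```perl
use strict;
use warnings;

# Hypothetical restatement of the documented MAX-mode rule.
#   $exhausted - name of the resource the task exceeded (e.g. 'memory')
#   $max_spec  - hash ref of maximums declared for the category (may be partial)
#   $max_seen  - hash ref of the largest values seen so far for the category
# Returns undef when the task should fail, otherwise a hash ref with the
# allocation to use for the retry.
sub max_mode_retry {
    my ($exhausted, $max_spec, $max_seen) = @_;

    # A declared maximum was exceeded: the task fails (undef in scalar context).
    return if exists $max_spec->{$exhausted};

    my %alloc;
    for my $r (qw(cores memory disk gpus)) {
        if (exists $max_spec->{$r}) {
            $alloc{$r} = $max_spec->{$r};    # declared maximum
        }
        elsif (exists $max_seen->{$r}) {
            $alloc{$r} = $max_seen->{$r};    # largest value seen so far
        }
    }
    return \%alloc;
}
```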

specify_category_autolabel_resource

Turn on or off first-allocation labeling for a given category and resource. This function should be used to fine-tune the defaults from specify_category_mode.

category

A category name.

resource

A resource name.

autolabel

0/1 for off/on.

enable_monitoring($dir_name, $watchdog)

Enables resource monitoring of tasks in the queue, and writes a summary per task to the given directory. Additionally, all summaries are consolidated into the file all_summaries-PID.log.

Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).

dir_name

Directory name for the monitor output.

watchdog

If non-zero, kill tasks that exhaust their declared resources. Defaults to 1 if not given.
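Because enable_monitoring reports failure through its return value, a caller may want to check it. The sketch below assumes $q was created with Work_Queue->new; start_monitoring is a hypothetical helper, and creating the directory beforehand is a precaution rather than a documented requirement.

```perl
use strict;
use warnings;
use File::Path qw(make_path);

# Hypothetical convenience wrapper: create the output directory if needed,
# then enable monitoring with the watchdog on.
sub start_monitoring {
    my ($q, $dir_name) = @_;

    make_path($dir_name) unless -d $dir_name;

    # enable_monitoring returns 1 on success and 0 on failure.
    my $ok = $q->enable_monitoring($dir_name, 1);
    warn "resource monitoring was not enabled for $dir_name\n" unless $ok;
    return $ok;
}
```

A typical call would be start_monitoring($q, 'monitor_output') or die "monitoring unavailable\n";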

enable_monitoring_full($dir_name, $watchdog)

As enable_monitoring, but also generates a time series and a debug file. WARNING: these files may reach gigabyte sizes for long-running tasks.

Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).

dir_name

Directory name for the monitor output.

watchdog

If non-zero, kill tasks that exhaust their declared resources. Defaults to 1 if not given.

enable_monitoring_snapshots($filename)

When monitoring is enabled, specifies a file whose presence directs the resource monitor to take a snapshot of the resources. Snapshots appear in the JSON summary file of the task, under the key "snapshots". The file is removed after the snapshot, so that a new snapshot can be taken when a task recreates it. Optionally, the first line of the file can give an identifying label to the snapshot.

self

Reference to the current work queue object.

filename

Name of the file whose presence directs the resource monitor to take a snapshot. After the snapshot, THIS FILE IS REMOVED.
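From inside a task, requesting a snapshot only requires creating the signal file, optionally with a label on its first line. The sketch below assumes the task is itself a Perl script; request_snapshot is a hypothetical helper, and the file name it is given must match the one the manager passed to enable_monitoring_snapshots.

```perl
use strict;
use warnings;

# Hypothetical helper run inside a task: create the signal file so the
# resource monitor takes a snapshot. The optional label goes on the first
# line. The monitor removes the file after the snapshot, so calling this
# again later requests another snapshot.
sub request_snapshot {
    my ($signal_file, $label) = @_;
    open my $fh, '>', $signal_file or die "cannot create $signal_file: $!";
    print {$fh} "$label\n" if defined $label;
    close $fh or die "cannot close $signal_file: $!";
    return;
}

# e.g. request_snapshot('snapshot_signal', 'after-preprocessing');
# where 'snapshot_signal' is a hypothetical name given to
# $q->enable_monitoring_snapshots('snapshot_signal') on the manager.
```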

activate_fast_abort

Turn on or off fast abort functionality for tasks in the queue without an explicit category. Given the multiplier, abort a task whose running time is larger than the average running time multiplied by the multiplier. Fast abort is computed per task category. The value specified here applies to all the categories for which activate_fast_abort_category was not explicitly called.

multiplier

The multiplier of the average task time at which point to abort; if less than zero, fast_abort is deactivated (the default).

activate_fast_abort_category

Turn on or off fast abort functionality for a given category. Given the multiplier, abort a task whose running time is larger than the average running time multiplied by the multiplier. The value specified here applies only to tasks in the given category. (Note: $q->activate_fast_abort_category('default', $n) is the same as $q->activate_fast_abort($n).)

name

The name of the category.

multiplier

The multiplier of the average task time at which point to abort; if zero, fast_abort is deactivated. If less than zero (default), use the fast abort of the "default" category.
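How a category's multiplier interacts with the queue-wide one can be restated as a sketch. Both helpers are hypothetical and only restate the rules above; treating a zero or negative queue-wide value as "deactivated" follows the description of the "fast-abort-multiplier" tune parameter and is an assumption about the exact boundary.

```perl
use strict;
use warnings;

# Hypothetical restatement of the documented fast-abort rules.
#   $category_mult - value given to activate_fast_abort_category
#                    (negative: fall back to the "default" category)
#   $default_mult  - value given to activate_fast_abort
# Returns the multiplier in effect, or 0 if fast abort is deactivated.
sub effective_multiplier {
    my ($category_mult, $default_mult) = @_;
    my $m = $category_mult < 0 ? $default_mult : $category_mult;
    return $m > 0 ? $m : 0;
}

# A task is aborted when its running time exceeds the category's average
# running time times the effective multiplier.
sub should_fast_abort {
    my ($running_time, $average_time, $multiplier) = @_;
    return 0 if $multiplier <= 0;
    return $running_time > $average_time * $multiplier ? 1 : 0;
}
```

For example, with an average of 10 seconds and a multiplier of 3, a task still running after 31 seconds would be aborted, while one at exactly 30 seconds would not.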

specify_draining_by_hostname

Set draining mode for workers at hostname.

hostname

The hostname of the host running the workers.

The second argument is a flag: if 0, workers at hostname work as usual; otherwise no new tasks are dispatched to them, and empty workers are shut down.

empty

Determine whether there are any known tasks queued, running, or waiting to be collected.

Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".

hungry

Determine whether the queue can support more tasks.

Returns the number of additional tasks it can support if "hungry" and 0 if "sated".
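Since hungry reports how many more tasks the queue can take, a driver can use it to avoid over-submitting. The sketch below assumes $q is a Work_Queue object and $pending an array reference of Work_Queue::Task objects; submit_while_hungry is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: submit pending tasks only while the queue reports
# spare capacity. Tasks are consumed from the front of @$pending.
# Returns the number of tasks submitted.
sub submit_while_hungry {
    my ($q, $pending) = @_;
    my $submitted = 0;
    # hungry returns the number of additional tasks the queue can support
    # (0 when "sated"), so it is re-checked after every submission.
    while (@{$pending} && $q->hungry) {
        $q->submit(shift @{$pending});
        $submitted++;
    }
    return $submitted;
}
```

A driver loop might alternate this with $q->wait until both the pending list is exhausted and $q->empty is true.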

specify_algorithm

Set the worker selection algorithm for the queue.

algorithm

One of the following algorithms to use in assigning a task to a worker:

$Work_Queue::WORK_QUEUE_SCHEDULE_FCFS
$Work_Queue::WORK_QUEUE_SCHEDULE_FILES
$Work_Queue::WORK_QUEUE_SCHEDULE_TIME
$Work_Queue::WORK_QUEUE_SCHEDULE_RAND

specify_task_order

Set the order for dispatching submitted tasks in the queue to workers:

order

One of the following algorithms to use in dispatching:

$Work_Queue::WORK_QUEUE_TASK_ORDER_FIFO
$Work_Queue::WORK_QUEUE_TASK_ORDER_LIFO

specify_name

Change the project name for the given queue.

name

The new project name.

specify_min_taskid

Set the minimum taskid of future submitted tasks.

Further submitted tasks are guaranteed to have a taskid greater than or equal to minid. This function is useful for keeping taskids consistent in a workflow that consists of sequential managers. (Note: this function is rarely used.) If the minimum id provided is smaller than the last taskid computed, the minimum id provided is ignored.

Returns the actual minimum taskid for future tasks.

q

A work queue object.

minid

Minimum desired taskid
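The value returned by specify_min_taskid can be illustrated with a sketch. actual_min_taskid is hypothetical, and it assumes taskids are assigned consecutively, so that the next id would otherwise be the last taskid plus one.

```perl
use strict;
use warnings;

# Hypothetical restatement of specify_min_taskid's documented behavior,
# assuming consecutive taskids (the next id would be $last_taskid + 1).
# A requested minimum below that is ignored.
sub actual_min_taskid {
    my ($last_taskid, $minid) = @_;
    my $next = $last_taskid + 1;
    return $minid > $next ? $minid : $next;
}
```

Under that assumption, requesting 100 after taskid 10 yields 100, while requesting 5 is ignored and yields 11.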

specify_priority

Change the project priority for the given queue.

priority

An integer that represents the priority of this work queue manager. The higher the value, the higher the priority.

specify_num_tasks_left

Specify the number of tasks not yet submitted to the queue. It is used by work_queue_factory to determine the number of workers to launch. If not specified, it defaults to 0. work_queue_factory considers the number of tasks as: num tasks left + num tasks running + num tasks ready.

ntasks

Number of tasks yet to be submitted.
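The count that work_queue_factory is described as using is a simple sum. The sketch below is hypothetical: it takes the value passed to specify_num_tasks_left and a stats hash reference such as the one returned by $q->stats, and it assumes that tasks_waiting is the right field for tasks that are ready but not yet dispatched.

```perl
use strict;
use warnings;

# Hypothetical helper: tasks left + tasks running + tasks ready.
# Missing stats fields are treated as 0.
sub factory_task_count {
    my ($tasks_left, $stats) = @_;
    return $tasks_left
         + ($stats->{tasks_running} // 0)
         + ($stats->{tasks_waiting} // 0);
}
```

For instance, factory_task_count(10, $q->stats) would add 10 not-yet-submitted tasks to the queue's running and waiting counts.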

specify_manager_mode

Specify the manager mode for the given queue.

mode

This may be one of the following values:

$Work_Queue::WORK_QUEUE_MASTER_MODE_STANDALONE
$Work_Queue::WORK_QUEUE_MASTER_MODE_CATALOG

specify_catalog_server

Specify the catalog server the manager should report to.

hostname

The hostname of the catalog server.

port

The port the catalog server is listening on.

specify_log

Specify a log file that records cumulative stats of connected workers and submitted tasks.

logfile

Name of the file to write the log. If the file exists, then new records are appended.

specify_transactions_log

Specify a log file that records the states of submitted tasks.

logfile

Name of the file to write the log. If the file exists, then new records are appended.

specify_password

Add a mandatory password that each worker must present.

password

The password, as a string.

specify_password_file

Add a mandatory password file that each worker must present.

file

Name of the file containing the password.

cancel_by_taskid

Cancel the task identified by its taskid and remove it from the given queue.

id

The taskid returned from Work_Queue->submit.

cancel_by_tasktag

Cancel the task identified by its tag and remove it from the given queue.

tag

The tag assigned to the task using $t->specify_tag($tag);

shutdown_workers

Shut down workers connected to the queue. Makes a best effort and then returns the number of workers that were given the shutdown order.

n

The number of workers to shut down. To shut down all workers, specify 0.

block_host

Block workers running on host from working for this queue.

host

The hostname of the host running the workers.

block_host_with_timeout

Temporarily block workers running on host for timeout seconds.

host

The hostname of the host running the workers.

timeout

The duration of the block, in seconds.

unblock_host

Unblock workers on host so they can work for the queue again. Clear all blocks if host is not provided.

host

The hostname of the host.

invalidate_cache_file

Delete a file from the workers' caches.

self

Reference to the current work queue object.

local_name

Name of the file as seen by the manager.

specify_keepalive_interval

Change keepalive interval for a given queue.

interval

Minimum number of seconds to wait before sending new keepalive checks to workers.

specify_keepalive_timeout

Change keepalive timeout for a given queue.

timeout

Minimum number of seconds to wait for a keepalive response from worker before marking it as dead.

estimate_capacity

Turn on manager capacity measurements.

activate_worker_waiting

Wait for at least n workers to connect before continuing.

n

Number of workers.

tune

Tune advanced parameters for work queue. Returns 0 on success, -1 on failure.

name

The name of the parameter to tune. Can be one of the following:

"asynchrony-multiplier"

Treat each worker as having (actual_cores * multiplier) total cores. (default=1.0)

"asynchrony-modifier"

Treat each worker as having "modifier" additional cores. (default=0)

"min-transfer-timeout"

Set the minimum number of seconds to wait for files to be transferred to or from a worker. (default=300)

"foreman-transfer-timeout"

Set the minimum number of seconds to wait for files to be transferred to or from a foreman. (default=3600)

"transfer-outlier-factor"

Transfers that are this many times slower than the average will be aborted. (default=10x)

"default-transfer-rate"

The assumed network bandwidth used until sufficient data has been collected. (default=1MB/s)

"fast-abort-multiplier"

Set the multiplier of the average task time at which point to abort; if negative or zero, fast_abort is deactivated. (default=0)

"keepalive-interval"

Set the minimum number of seconds to wait before sending new keepalive checks to workers. (default=300)

"keepalive-timeout"

Set the minimum number of seconds to wait for a keepalive response from worker before marking it as dead. (default=30)

"short-timeout"

Set the minimum timeout when sending a brief message to a single worker. (default=5s)

"long-timeout"

Set the minimum timeout when sending a brief message to a foreman. (default=1h)

"category-steady-n-tasks"

Set the number of tasks considered when computing category buckets.

value

The value to set the parameter to.
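Because tune takes free-form parameter names and signals failure only through its return value, a thin wrapper can catch typos early. checked_tune is a hypothetical helper; the list of names is copied from this section, and $q is assumed to be a Work_Queue object.

```perl
use strict;
use warnings;

# Parameter names documented for tune.
my %TUNABLE = map { $_ => 1 } qw(
    asynchrony-multiplier asynchrony-modifier
    min-transfer-timeout foreman-transfer-timeout
    transfer-outlier-factor default-transfer-rate
    fast-abort-multiplier keepalive-interval keepalive-timeout
    short-timeout long-timeout category-steady-n-tasks
);

# Hypothetical wrapper: reject names not in the list above, then call
# $q->tune and die if it does not return 0.
sub checked_tune {
    my ($q, $name, $value) = @_;
    die "unknown tune parameter '$name'\n" unless $TUNABLE{$name};
    my $rc = $q->tune($name, $value);
    die "tune('$name', $value) failed\n" if $rc != 0;
    return;
}
```

For example, checked_tune($q, 'keepalive-interval', 120) behaves like $q->tune('keepalive-interval', 120) but fails loudly on a misspelled name.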

specify_max_resources

Specifies the max resources for tasks without an explicit category ("default" category). rm specifies the maximum resources a task in the default category may use.

rm

Hash reference indicating maximum values. See resources_measured in Work_Queue::Task for possible fields.

A maximum of 4 cores is found on any worker:

                $q->specify_max_resources({'cores' => 4});

A maximum of 8 cores, 1GB of memory, and 10GB disk are found on any worker:

                $q->specify_max_resources({'cores' => 8, 'memory' => 1024, 'disk' => 10240});
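The examples above express memory and disk in megabytes (1GB is written as 1024, 10GB as 10240). A small hypothetical helper can build such a hash reference from gigabyte figures; the megabyte unit is inferred from those examples, and resources_gb is not part of Work_Queue.

```perl
use strict;
use warnings;

# Hypothetical helper: build the hash reference used by
# specify_max_resources and related methods, converting gigabytes to the
# megabyte values used in the examples above (1GB = 1024).
sub resources_gb {
    my (%args) = @_;
    my %rm;
    $rm{cores}  = $args{cores}                  if defined $args{cores};
    $rm{gpus}   = $args{gpus}                   if defined $args{gpus};
    $rm{memory} = int($args{memory_gb} * 1024)  if defined $args{memory_gb};
    $rm{disk}   = int($args{disk_gb} * 1024)    if defined $args{disk_gb};
    return \%rm;
}

# Equivalent to the second example above:
#   $q->specify_max_resources(resources_gb(cores => 8, memory_gb => 1, disk_gb => 10));
```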

specify_min_resources

Specifies the min resources for tasks without an explicit category ("default" category). rm specifies the minimum resources a task in the default category may use.

rm

Hash reference indicating minimum values. See resources_measured in Work_Queue::Task for possible fields.

A minimum of 2 cores is found on any worker:

                $q->specify_min_resources({'cores' => 2});

A minimum of 4 cores, 1GB of memory, and 10GB disk are found on any worker:

                $q->specify_min_resources({'cores' => 4, 'memory' => 1024, 'disk' => 10240});

specify_category_max_resources

Specifies the max resources for tasks in the given category.

category

Name of the category

rm

Hash reference indicating maximum values. See resources_measured in Work_Queue::Task for possible fields.

A maximum of 4 cores is found on any worker:

                $q->specify_category_max_resources('my_category', {'cores' => 4});

A maximum of 8 cores, 1GB of memory, and 10GB disk are found on any worker:

                $q->specify_category_max_resources('my_category', {'cores' => 8, 'memory' => 1024, 'disk' => 10240});

specify_category_min_resources

Specifies the min resources for tasks in the given category.

category

Name of the category

rm

Hash reference indicating minimum values. See resources_measured in Work_Queue::Task for possible fields.

A minimum of 2 cores is found on any worker:

                $q->specify_category_min_resources('my_category', {'cores' => 2});

A minimum of 2 cores, 1GB of memory, and 10GB disk are found on any worker:

                $q->specify_category_min_resources('my_category', {'cores' => 2, 'memory' => 1024, 'disk' => 10240});

specify_category_first_allocation_guess

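Specifies the first-allocation guess for the given category.

category

Name of the category

rm

Hash reference indicating the first-allocation values. See resources_measured in Work_Queue::Task for possible fields.

initialize_categories

Initialize the first values of categories.

rm

Hash reference indicating maximum values. See resources_measured in Work_Queue::Task for possible fields.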

filename

JSON file with resource summaries.

submit

Submit a task to the queue.

task

A task description created from Work_Queue::Task.

                $q->submit($task);

wait

Wait for tasks to complete.

This call blocks until a task completes or the timeout elapses. It returns the completed task, or a false value if no task completed in time.

timeout

The number of seconds to wait for a completed task before returning. Use an integer to set the timeout or the constant $Work_Queue::WORK_QUEUE_WAITFORTASK to block until a task has completed.

                my $seconds = 5;

                while( !$q->empty ) {
                    # ... submit more tasks here if needed ...
                    my $task = $q->wait($seconds);

                    if($task) {
                        # ... process the completed task ...
                    }
                }
