cctools
Data Structures | Macros | Enumerations | Variables
work_queue.h File Reference

A master-worker library. More...

#include <sys/types.h>
#include "timestamp.h"
#include "category.h"
#include "rmsummary.h"

Go to the source code of this file.

Data Structures

struct  work_queue_task
 A task description. More...
 
struct  work_queue_stats
 Statistics describing a work queue. More...
 

Macros

#define WORK_QUEUE_DEFAULT_PORT   9123
 Default Work Queue port number. More...
 
#define WORK_QUEUE_RANDOM_PORT   0
 Indicates that any port may be chosen. More...
 
#define WORK_QUEUE_WAITFORTASK   -1
 Timeout value to wait for a task to complete before returning. More...
 
#define WORK_QUEUE_DEFAULT_KEEPALIVE_INTERVAL   120
 Default value for Work Queue keepalive interval in seconds. More...
 
#define WORK_QUEUE_DEFAULT_KEEPALIVE_TIMEOUT   30
 Default value for Work Queue keepalive timeout in seconds. More...
 

Enumerations

enum  work_queue_file_type_t {
  WORK_QUEUE_INPUT = 0,
  WORK_QUEUE_OUTPUT = 1
}
 
enum  work_queue_file_flags_t {
  WORK_QUEUE_NOCACHE = 0,
  WORK_QUEUE_CACHE = 1,
  WORK_QUEUE_SYMLINK = 2,
  WORK_QUEUE_PREEXIST = 4,
  WORK_QUEUE_THIRDGET = 8,
  WORK_QUEUE_THIRDPUT = 8,
  WORK_QUEUE_WATCH = 16
}
 
enum  work_queue_schedule_t {
  WORK_QUEUE_SCHEDULE_FCFS,
  WORK_QUEUE_SCHEDULE_FILES,
  WORK_QUEUE_SCHEDULE_TIME,
  WORK_QUEUE_SCHEDULE_RAND,
  WORK_QUEUE_SCHEDULE_WORST
}
 
enum  work_queue_result_t {
  WORK_QUEUE_RESULT_SUCCESS = 0,
  WORK_QUEUE_RESULT_INPUT_MISSING = 1,
  WORK_QUEUE_RESULT_OUTPUT_MISSING = 2,
  WORK_QUEUE_RESULT_STDOUT_MISSING = 4,
  WORK_QUEUE_RESULT_SIGNAL = 1 << 3,
  WORK_QUEUE_RESULT_RESOURCE_EXHAUSTION = 2 << 3,
  WORK_QUEUE_RESULT_TASK_TIMEOUT = 3 << 3,
  WORK_QUEUE_RESULT_UNKNOWN = 4 << 3,
  WORK_QUEUE_RESULT_FORSAKEN = 5 << 3,
  WORK_QUEUE_RESULT_MAX_RETRIES = 6 << 3,
  WORK_QUEUE_RESULT_TASK_MAX_RUN_TIME = 7 << 3,
  WORK_QUEUE_RESULT_DISK_ALLOC_FULL = 8 << 3
}
 
enum  work_queue_task_state_t {
  WORK_QUEUE_TASK_UNKNOWN = 0,
  WORK_QUEUE_TASK_READY,
  WORK_QUEUE_TASK_RUNNING,
  WORK_QUEUE_TASK_WAITING_RETRIEVAL,
  WORK_QUEUE_TASK_RETRIEVED,
  WORK_QUEUE_TASK_DONE,
  WORK_QUEUE_TASK_CANCELED
}
 
enum  work_queue_file_t {
  WORK_QUEUE_FILE = 1,
  WORK_QUEUE_BUFFER,
  WORK_QUEUE_REMOTECMD,
  WORK_QUEUE_FILE_PIECE,
  WORK_QUEUE_DIRECTORY,
  WORK_QUEUE_URL
}
 

Functions

Functions - Tasks
struct work_queue_task * work_queue_task_create (const char *full_command)
 Create a new task object. More...
 
struct work_queue_task * work_queue_task_clone (const struct work_queue_task *task)
 
Create a copy of a task

Create a functionally identical copy of a work_queue_task that can be re-submitted via work_queue_submit. More...

 
void work_queue_task_specify_command (struct work_queue_task *t, const char *cmd)
 Indicate the command to be executed. More...
 
int work_queue_task_specify_file (struct work_queue_task *t, const char *local_name, const char *remote_name, work_queue_file_type_t type, work_queue_file_flags_t flags)
 Add a file to a task. More...
 
int work_queue_task_specify_file_piece (struct work_queue_task *t, const char *local_name, const char *remote_name, off_t start_byte, off_t end_byte, work_queue_file_type_t type, work_queue_file_flags_t flags)
 Add a file piece to a task. More...
 
int work_queue_task_specify_buffer (struct work_queue_task *t, const char *data, int length, const char *remote_name, work_queue_file_flags_t)
 Add an input buffer to a task. More...
 
int work_queue_task_specify_directory (struct work_queue_task *t, const char *local_name, const char *remote_name, work_queue_file_type_t type, work_queue_file_flags_t, int recursive)
 Add a directory to a task. More...
 
void work_queue_task_specify_max_retries (struct work_queue_task *t, int64_t max_retries)
 Specify the number of times this task is retried on worker errors. More...
 
void work_queue_task_specify_memory (struct work_queue_task *t, int64_t memory)
 Specify the amount of memory required by a task. More...
 
void work_queue_task_specify_disk (struct work_queue_task *t, int64_t disk)
 Specify the amount of disk space required by a task. More...
 
void work_queue_task_specify_cores (struct work_queue_task *t, int cores)
 Specify the number of cores required by a task. More...
 
void work_queue_task_specify_gpus (struct work_queue_task *t, int gpus)
 Specify the number of gpus required by a task. More...
 
void work_queue_task_specify_end_time (struct work_queue_task *t, int64_t useconds)
 Specify the maximum end time allowed for the task (in microseconds since the Epoch). More...
 
void work_queue_task_specify_running_time (struct work_queue_task *t, int64_t useconds)
 Specify the maximum time (in microseconds) the task is allowed to run in a worker. More...
 
void work_queue_task_specify_tag (struct work_queue_task *t, const char *tag)
 Attach a user defined string tag to the task. More...
 
void work_queue_task_specify_category (struct work_queue_task *t, const char *category)
 Label the task with the given category. More...
 
void work_queue_task_specify_priority (struct work_queue_task *t, double priority)
 Specify the priority of this task relative to others in the queue. More...
 
void work_queue_task_specify_enviroment_variable (struct work_queue_task *t, const char *name, const char *value)
 Specify an environment variable to be added to the task. More...
 
void work_queue_task_specify_algorithm (struct work_queue_task *t, work_queue_schedule_t algorithm)
 Select the scheduling algorithm for a single task. More...
 
void work_queue_task_specify_monitor_output (struct work_queue_task *t, const char *monitor_output)
 Specify a custom name for the monitoring summary. More...
 
void work_queue_task_delete (struct work_queue_task *t)
 Delete a task. More...
 
int work_queue_specify_snapshot_file (struct work_queue_task *t, const char *monitor_snapshot_file)
 
When monitoring, indicates a json-encoded file that instructs the monitor to take a snapshot of the task resources. More...

 
Functions - Queues
struct work_queue * work_queue_create (int port)
 Create a new work queue. More...
 
int work_queue_enable_monitoring (struct work_queue *q, char *monitor_output_directory)
 Enables resource monitoring on the given work queue. More...
 
int work_queue_enable_monitoring_full (struct work_queue *q, char *monitor_output_directory)
 Enables resource monitoring on the given work queue. More...
 
int work_queue_submit (struct work_queue *q, struct work_queue_task *t)
 Submit a task to a queue. More...
 
int work_queue_specify_min_taskid (struct work_queue *q, int minid)
 Set the minimum taskid of future submitted tasks. More...
 
void work_queue_blacklist_add (struct work_queue *q, const char *hostname)
 Blacklist hostname from a queue. More...
 
void work_queue_blacklist_add_with_timeout (struct work_queue *q, const char *hostname, time_t seconds)
 Blacklist hostname from a queue. More...
 
void work_queue_blacklist_remove (struct work_queue *q, const char *hostname)
 Unblacklist host from a queue. More...
 
void work_queue_blacklist_clear (struct work_queue *q)
 Clear blacklist of a queue. More...
 
void work_queue_invalidate_cached_file (struct work_queue *q, const char *local_name, work_queue_file_t type)
 Invalidate cached file. More...
 
struct work_queue_task * work_queue_wait (struct work_queue *q, int timeout)
 Wait for a task to complete. More...
 
int work_queue_hungry (struct work_queue *q)
 Determine whether the queue is 'hungry' for more tasks. More...
 
int work_queue_empty (struct work_queue *q)
 Determine whether the queue is empty. More...
 
int work_queue_port (struct work_queue *q)
 Get the listening port of the queue. More...
 
void work_queue_get_stats (struct work_queue *q, struct work_queue_stats *s)
 Get queue statistics (only from master). More...
 
void work_queue_get_stats_hierarchy (struct work_queue *q, struct work_queue_stats *s)
 Get statistics of the master queue together with foremen information. More...
 
void work_queue_get_stats_category (struct work_queue *q, const char *c, struct work_queue_stats *s)
 Get the task statistics for the given category. More...
 
work_queue_task_state_t work_queue_task_state (struct work_queue *q, int taskid)
 Get the current state of the task. More...
 
void work_queue_set_bandwidth_limit (struct work_queue *q, const char *bandwidth)
 Limit the queue bandwidth when transferring files to and from workers. More...
 
double work_queue_get_effective_bandwidth (struct work_queue *q)
 Get current queue bandwidth. More...
 
char * work_queue_get_worker_summary (struct work_queue *q)
 Summarize workers. More...
 
int work_queue_activate_fast_abort (struct work_queue *q, double multiplier)
 
Turn on or off fast abort functionality for a given queue for tasks without an explicit category. More...

 
int work_queue_activate_fast_abort_category (struct work_queue *q, const char *category, double multiplier)
 Turn on or off fast abort functionality for a given category. More...
 
int work_queue_specify_category_mode (struct work_queue *q, const char *category, category_mode_t mode)
 Turn on or off first-allocation labeling for a given category. More...
 
int work_queue_enable_category_resource (struct work_queue *q, const char *category, const char *resource, int autolabel)
 Turn on or off first-allocation labeling for a given category and resource. More...
 
void work_queue_specify_algorithm (struct work_queue *q, work_queue_schedule_t algorithm)
 Change the worker selection algorithm. More...
 
const char * work_queue_name (struct work_queue *q)
 Get the project name of the queue. More...
 
void work_queue_specify_name (struct work_queue *q, const char *name)
 Change the project name for a given queue. More...
 
void work_queue_specify_priority (struct work_queue *q, int priority)
 Change the priority for a given queue. More...
 
void work_queue_specify_num_tasks_left (struct work_queue *q, int ntasks)
 Specify the number of tasks not yet submitted to the queue. More...
 
void work_queue_specify_catalog_server (struct work_queue *q, const char *hostname, int port)
 Specify the catalog server the master should report to. More...
 
void work_queue_specify_catalog_servers (struct work_queue *q, const char *hosts)
 Specify the catalog server(s) the master should report to. More...
 
struct work_queue_task * work_queue_cancel_by_taskid (struct work_queue *q, int id)
 Cancel a submitted task using its task id and remove it from queue. More...
 
struct work_queue_task * work_queue_cancel_by_tasktag (struct work_queue *q, const char *tag)
 Cancel a submitted task using its tag and remove it from queue. More...
 
struct list * work_queue_cancel_all_tasks (struct work_queue *q)
 Cancel all submitted tasks and remove them from the queue. More...
 
int work_queue_shut_down_workers (struct work_queue *q, int n)
 Shut down workers connected to the work_queue system. More...
 
void work_queue_delete (struct work_queue *q)
 Delete a work queue. More...
 
int work_queue_specify_log (struct work_queue *q, const char *logfile)
 Add a log file that records cumulative statistics of the connected workers and submitted tasks. More...
 
int work_queue_specify_transactions_log (struct work_queue *q, const char *logfile)
 Add a log file that records the states of the connected workers and tasks. More...
 
void work_queue_specify_password (struct work_queue *q, const char *password)
 Add a mandatory password that each worker must present. More...
 
int work_queue_specify_password_file (struct work_queue *q, const char *file)
 Add a mandatory password file that each worker must present. More...
 
void work_queue_specify_keepalive_interval (struct work_queue *q, int interval)
 Change the keepalive interval for a given queue. More...
 
void work_queue_specify_keepalive_timeout (struct work_queue *q, int timeout)
 Change the keepalive timeout for identifying dead workers for a given queue. More...
 
void work_queue_master_preferred_connection (struct work_queue *q, const char *preferred_connection)
 Set the preference for using hostname over IP address to connect. More...
 
int work_queue_tune (struct work_queue *q, const char *name, double value)
 Tune advanced parameters for work queue. More...
 
void work_queue_specify_max_resources (struct work_queue *q, const struct rmsummary *rm)
 Sets the maximum resources a task without an explicit category ("default" category) may use. More...
 
void work_queue_specify_category_max_resources (struct work_queue *q, const char *category, const struct rmsummary *rm)
 Sets the maximum resources a task in the category may use. More...
 
void work_queue_specify_category_first_allocation_guess (struct work_queue *q, const char *category, const struct rmsummary *rm)
 Set the initial guess for resource autolabeling for the given category. More...
 
void work_queue_initialize_categories (struct work_queue *q, struct rmsummary *max, const char *summaries_file)
 Initialize first value of categories. More...
 

Variables

int wq_option_scheduler
 Initial setting for the algorithm used to assign tasks to workers upon creating the queue. More...
 

Functions - Deprecated

#define WORK_QUEUE_TASK_ORDER_FIFO   0
 Retrieve tasks based on first-in-first-out order. More...
 
#define WORK_QUEUE_TASK_ORDER_LIFO   1
 Retrieve tasks based on last-in-first-out order. More...
 
#define WORK_QUEUE_MASTER_MODE_STANDALONE   0
 Work Queue master does not report to the catalog server. More...
 
#define WORK_QUEUE_MASTER_MODE_CATALOG   1
 Work Queue master reports to catalog server. More...
 
void work_queue_specify_task_order (struct work_queue *q, int order)
 Specify how the submitted tasks should be ordered. More...
 
void work_queue_specify_master_mode (struct work_queue *q, int mode)
 Specify the master mode for a given queue. More...
 
void work_queue_specify_estimate_capacity_on (struct work_queue *q, int estimate_capacity_on)
 Change whether to estimate master capacity for a given queue. More...
 
int work_queue_task_specify_input_buf (struct work_queue_task *t, const char *buf, int length, const char *rname)
 Add an input buffer to a task. More...
 
int work_queue_task_specify_input_file (struct work_queue_task *t, const char *fname, const char *rname)
 Add an input file to a task. More...
 
int work_queue_task_specify_input_file_do_not_cache (struct work_queue_task *t, const char *fname, const char *rname)
 Add an input file to a task, without caching. More...
 
int work_queue_task_specify_output_file (struct work_queue_task *t, const char *rname, const char *fname)
 Add an output file to a task. More...
 
int work_queue_task_specify_output_file_do_not_cache (struct work_queue_task *t, const char *rname, const char *fname)
 Add an output file to a task without caching. More...
 
char * work_queue_generate_disk_alloc_full_filename (char *pwd, int taskid)
 Generate a worker-level unique filename to indicate a disk allocation being full. More...
 

Detailed Description

A master-worker library.

The work queue provides an implementation of the master-worker computing model using TCP sockets, Unix applications, and files as intermediate buffers. A master process uses work_queue_create to create a queue, then work_queue_submit to submit tasks. Once tasks are running, call work_queue_wait to wait for completion. A generic worker program, named work_queue_worker, can be run on any machine, and simply needs to be told the host and port of the master.

Macro Definition Documentation

#define WORK_QUEUE_DEFAULT_PORT   9123

Default Work Queue port number.

#define WORK_QUEUE_RANDOM_PORT   0

Indicates that any port may be chosen.

#define WORK_QUEUE_WAITFORTASK   -1

Timeout value to wait for a task to complete before returning.

#define WORK_QUEUE_DEFAULT_KEEPALIVE_INTERVAL   120

Default value for Work Queue keepalive interval in seconds.

#define WORK_QUEUE_DEFAULT_KEEPALIVE_TIMEOUT   30

Default value for Work Queue keepalive timeout in seconds.

#define WORK_QUEUE_TASK_ORDER_FIFO   0

Retrieve tasks based on first-in-first-out order.

#define WORK_QUEUE_TASK_ORDER_LIFO   1

Retrieve tasks based on last-in-first-out order.

#define WORK_QUEUE_MASTER_MODE_STANDALONE   0

Work Queue master does not report to the catalog server.

#define WORK_QUEUE_MASTER_MODE_CATALOG   1

Work Queue master reports to catalog server.

Enumeration Type Documentation

Enumerator
WORK_QUEUE_INPUT 

Specify an input object.

WORK_QUEUE_OUTPUT 

Specify an output object.

Enumerator
WORK_QUEUE_NOCACHE 

Do not cache file at execution site.

WORK_QUEUE_CACHE 

Cache file at execution site for later use.

WORK_QUEUE_SYMLINK 

Create a symlink to the file rather than copying it, if possible.

WORK_QUEUE_PREEXIST 

If the filename already exists on the host, use it in place.

WORK_QUEUE_THIRDGET 

Access the file on the client from a shared filesystem.

WORK_QUEUE_THIRDPUT 

Access the file on the client from a shared filesystem (same as WORK_QUEUE_THIRDGET, included for readability)

WORK_QUEUE_WATCH 

Watch the output file and send back changes as the task runs.

Enumerator
WORK_QUEUE_SCHEDULE_FCFS 

Select worker on a first-come-first-serve basis.

WORK_QUEUE_SCHEDULE_FILES 

Select worker that has the most data required by the task.

WORK_QUEUE_SCHEDULE_TIME 

Select worker that has the fastest execution time on previous tasks.

WORK_QUEUE_SCHEDULE_RAND 

Select a random worker (default).

WORK_QUEUE_SCHEDULE_WORST 

Select the worst-fit worker (the worker with the most unused resources).

Enumerator
WORK_QUEUE_RESULT_SUCCESS 

The task ran successfully.

WORK_QUEUE_RESULT_INPUT_MISSING 

The task cannot be run due to a missing input file.

WORK_QUEUE_RESULT_OUTPUT_MISSING 

The task ran but failed to generate a specified output file.

WORK_QUEUE_RESULT_STDOUT_MISSING 

The task ran but its stdout has been truncated.

WORK_QUEUE_RESULT_SIGNAL 

The task was terminated with a signal.

WORK_QUEUE_RESULT_RESOURCE_EXHAUSTION 

The task used more resources than requested.

WORK_QUEUE_RESULT_TASK_TIMEOUT 

The task ran after the specified (absolute since epoch) end time.

WORK_QUEUE_RESULT_UNKNOWN 

The result could not be classified.

WORK_QUEUE_RESULT_FORSAKEN 

The task failed, but it was neither a task nor a worker error.

WORK_QUEUE_RESULT_MAX_RETRIES 

The task could not be completed successfully in the given number of retries.

WORK_QUEUE_RESULT_TASK_MAX_RUN_TIME 

The task ran for more than the specified time (relative since running in a worker).

WORK_QUEUE_RESULT_DISK_ALLOC_FULL 

The task filled its loop device allocation but needed more space.

Enumerator
WORK_QUEUE_TASK_UNKNOWN 

There is no such task.

WORK_QUEUE_TASK_READY 

Task is ready to be run, waiting in queue.

WORK_QUEUE_TASK_RUNNING 

Task has been dispatched to some worker.

WORK_QUEUE_TASK_WAITING_RETRIEVAL 

Task results are available at the worker.

WORK_QUEUE_TASK_RETRIEVED 

Task results are available at the master.

WORK_QUEUE_TASK_DONE 

Task is done, and returned through work_queue_wait.

WORK_QUEUE_TASK_CANCELED 

Task was canceled before completion.

Enumerator
WORK_QUEUE_FILE 

File-spec is a regular file.

WORK_QUEUE_BUFFER 

Data comes from buffer memory.

WORK_QUEUE_REMOTECMD 

File-spec is a command to be executed at the remote site.

WORK_QUEUE_FILE_PIECE 

File-spec refers to only a part of a file.

WORK_QUEUE_DIRECTORY 

File-spec is a directory.

WORK_QUEUE_URL 

File-spec refers to a URL.

Function Documentation

struct work_queue_task* work_queue_task_create ( const char *  full_command)

Create a new task object.

Once created and elaborated with functions such as work_queue_task_specify_file and work_queue_task_specify_buffer, the task should be passed to work_queue_submit.

Parameters
full_command: The shell command line to be executed by the task. If null, the command will be given later by work_queue_task_specify_command.
Returns
A new task object, or null if it could not be created.

Referenced by work_queue.Task::__init__().

struct work_queue_task* work_queue_task_clone ( const struct work_queue_task *  task)

Create a copy of a task

Create a functionally identical copy of a work_queue_task that can be re-submitted via work_queue_submit.

Returns
A new task object

Referenced by work_queue.Task::clone().

void work_queue_task_specify_command ( struct work_queue_task *  t,
const char *  cmd 
)

Indicate the command to be executed.

Parameters
t: A task object.
cmd: The command to be executed. This string will be duplicated by this call, so the argument may be freed or re-used afterward.

Referenced by work_queue.Task::specify_command().

int work_queue_task_specify_file ( struct work_queue_task *  t,
const char *  local_name,
const char *  remote_name,
work_queue_file_type_t  type,
work_queue_file_flags_t  flags 
)

Add a file to a task.

Parameters
t: A task object.
local_name: The name of the file on local disk or shared filesystem.
remote_name: The name of the file at the remote execution site.
type: Must be one of the following values: WORK_QUEUE_INPUT to specify an input file, or WORK_QUEUE_OUTPUT to specify an output file.
flags: May be zero to indicate no special handling or any of work_queue_file_flags_t or'd together. The most common are:
  • WORK_QUEUE_CACHE indicates that the file should be cached for later tasks. (recommended)
  • WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.
  • WORK_QUEUE_WATCH indicates that the worker will watch the output file as it is created and incrementally return the file to the master as the task runs. (The frequency of these updates is entirely dependent upon the system load. If the master is busy interacting with many workers, output updates will be infrequent.)
Returns
1 if the task file is successfully specified, 0 if either of t, local_name, or remote_name is null or remote_name is an absolute path.

Referenced by work_queue.Task::specify_file().

int work_queue_task_specify_file_piece ( struct work_queue_task *  t,
const char *  local_name,
const char *  remote_name,
off_t  start_byte,
off_t  end_byte,
work_queue_file_type_t  type,
work_queue_file_flags_t  flags 
)

Add a file piece to a task.

Parameters
t: A task object.
local_name: The name of the file on local disk or shared filesystem.
remote_name: The name of the file at the remote execution site.
start_byte: The starting byte offset of the file piece to be transferred.
end_byte: The ending byte offset of the file piece to be transferred.
type: Must be one of the following values: WORK_QUEUE_INPUT to specify an input file, or WORK_QUEUE_OUTPUT to specify an output file.
flags: May be zero to indicate no special handling or any of work_queue_file_flags_t or'd together. The most common are:
  • WORK_QUEUE_CACHE indicates that the file should be cached for later tasks. (recommended)
  • WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.
Returns
1 if the task file piece is successfully specified, 0 if either of t, local_name, or remote_name is null or remote_name is an absolute path.

Referenced by work_queue.Task::specify_file_piece().

int work_queue_task_specify_buffer ( struct work_queue_task *  t,
const char *  data,
int  length,
const char *  remote_name,
work_queue_file_flags_t   
)

Add an input buffer to a task.

Parameters
t: A task object.
data: The data to be passed as an input file.
length: The length of the buffer, in bytes.
remote_name: The name of the remote file to create.
flags: May be zero to indicate no special handling or any of work_queue_file_flags_t or'd together. The most common are:
  • WORK_QUEUE_CACHE indicates that the file should be cached for later tasks. (recommended)
  • WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.
Returns
1 if the task file is successfully specified, 0 if either of t or remote_name is null or remote_name is an absolute path.

Referenced by work_queue.Task::specify_buffer().

int work_queue_task_specify_directory ( struct work_queue_task *  t,
const char *  local_name,
const char *  remote_name,
work_queue_file_type_t  type,
work_queue_file_flags_t  ,
int  recursive 
)

Add a directory to a task.

Parameters
t: A task object.
local_name: The name of the directory on local disk or shared filesystem. Optional if the directory is empty.
remote_name: The name of the directory at the remote execution site.
type: Must be one of the following values: WORK_QUEUE_INPUT to specify an input directory, or WORK_QUEUE_OUTPUT to specify an output directory.
flags: May be zero to indicate no special handling or any of work_queue_file_flags_t or'd together. The most common are:
  • WORK_QUEUE_CACHE indicates that the file should be cached for later tasks. (recommended)
  • WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.
recursive: Indicates whether just the directory (0) or the directory and all of its contents (1) should be included.
Returns
1 if the task directory is successfully specified, 0 if either of t, local_name, or remote_name is null or remote_name is an absolute path.

Referenced by work_queue.Task::specify_directory().

void work_queue_task_specify_max_retries ( struct work_queue_task *  t,
int64_t  max_retries 
)

Specify the number of times this task is retried on worker errors.

If less than one, the task is retried indefinitely (this is the default). A task that did not succeed after the given number of retries is returned with result WORK_QUEUE_RESULT_MAX_RETRIES.

Parameters
t: A task object.
max_retries: The number of retries.

Referenced by work_queue.Task::specify_max_retries().

void work_queue_task_specify_memory ( struct work_queue_task *  t,
int64_t  memory 
)

Specify the amount of memory required by a task.

Parameters
t: A task object.
memory: The amount of memory required by the task, in megabytes.

Referenced by work_queue.Task::specify_memory().

void work_queue_task_specify_disk ( struct work_queue_task *  t,
int64_t  disk 
)

Specify the amount of disk space required by a task.

Parameters
t: A task object.
disk: The amount of disk space required by the task, in megabytes.

Referenced by work_queue.Task::specify_disk().

void work_queue_task_specify_cores ( struct work_queue_task *  t,
int  cores 
)

Specify the number of cores required by a task.

Parameters
t: A task object.
cores: The number of cores required by the task.

Referenced by work_queue.Task::specify_cores().

void work_queue_task_specify_gpus ( struct work_queue_task *  t,
int  gpus 
)

Specify the number of gpus required by a task.

Parameters
t: A task object.
gpus: The number of gpus required by the task.
void work_queue_task_specify_end_time ( struct work_queue_task *  t,
int64_t  useconds 
)

Specify the maximum end time allowed for the task (in microseconds since the Epoch).

If less than 1, then no end time is specified (this is the default). This is useful, for example, when the task uses certificates that expire.

Parameters
t: A task object.
useconds: Number of microseconds since the Epoch.

Referenced by work_queue.Task::specify_priority().

void work_queue_task_specify_running_time ( struct work_queue_task *  t,
int64_t  useconds 
)

Specify the maximum time (in microseconds) the task is allowed to run in a worker.

This time is counted from the moment the task starts to run in a worker. If less than 1, then no maximum time is specified (this is the default).

Parameters
t: A task object.
useconds: Maximum number of microseconds the task may run in a worker.

Referenced by work_queue.Task::specify_priority().

void work_queue_task_specify_tag ( struct work_queue_task *  t,
const char *  tag 
)

Attach a user defined string tag to the task.

This field is not interpreted by the work queue, but is provided for the user's convenience in identifying tasks when they complete.

Parameters
t: A task object.
tag: The tag to attach to task t.

Referenced by work_queue.Task::specify_tag().

void work_queue_task_specify_category ( struct work_queue_task *  t,
const char *  category 
)

Label the task with the given category.

It is expected that tasks with the same category have similar resources requirements (e.g. for fast abort).

Parameters
t: A task object.
category: The name of the category to use.

Referenced by work_queue.Task::specify_category().

void work_queue_task_specify_priority ( struct work_queue_task *  t,
double  priority 
)

Specify the priority of this task relative to others in the queue.

Tasks with a higher priority value run first. If no priority is given, a task is placed at the end of the ready list, regardless of the priority.

Parameters
t: A task object.
priority: The priority of the task.

Referenced by work_queue.Task::specify_priority().

void work_queue_task_specify_enviroment_variable ( struct work_queue_task *  t,
const char *  name,
const char *  value 
)

Specify an environment variable to be added to the task.

Parameters
t: A task object.
name: Name of the variable.
value: Value of the variable.

Referenced by work_queue.Task::specify_environment_variable().

void work_queue_task_specify_algorithm ( struct work_queue_task *  t,
work_queue_schedule_t  algorithm 
)

Select the scheduling algorithm for a single task.

To change the scheduling algorithm for all tasks, use work_queue_specify_algorithm instead.

Parameters
t: A task object.
algorithm: The algorithm to use in assigning this task to a worker. For possible values, see work_queue_schedule_t.

Referenced by work_queue.Task::specify_algorithm().

void work_queue_task_specify_monitor_output ( struct work_queue_task *  t,
const char *  monitor_output 
)

Specify a custom name for the monitoring summary.

If work_queue_enable_monitoring is also enabled, the summary is also written to that directory.

Parameters
t: A task object.
monitor_output: Resource summary file.

Referenced by work_queue.Task::specify_monitor_output().

void work_queue_task_delete ( struct work_queue_task *  t)

Delete a task.

This may be called on tasks after they are returned from work_queue_wait.

Parameters
t: The task to delete.

Referenced by work_queue.Task::__init__().

int work_queue_specify_snapshot_file ( struct work_queue_task *  t,
const char *  monitor_snapshot_file 
)

When monitoring, indicates a json-encoded file that instructs the monitor to take a snapshot of the task resources.

Snapshots appear in the JSON summary file of the task, under the key "snapshots". Snapshots are taken on events on files described in the monitor_snapshot_file. The monitor_snapshot_file is a json encoded file with the following format:

{
    "FILENAME": {
        "from-start":boolean,
        "from-start-if-truncated":boolean,
        "delete-if-found":boolean,
        "events": [
            {
                "label":"EVENT_NAME",
                "on-create":boolean,
                "on-truncate":boolean,
                "pattern":"REGEXP",
                "count":integer
            },
            {
                "label":"EVENT_NAME",
                ...
            }
        ]
    },
    "FILENAME": {
        ...
    }
}

All fields but label are optional.

        from-start:boolean         If FILENAME exists when task starts running, process from line 1. Default: false, as the task may be appending to an already existing file.
        from-start-if-truncated    If FILENAME is truncated, process from line 1. Default: true, to account for log rotations.
        delete-if-found            Delete FILENAME when found. Default: false

        events:
        label        Name that identifies the snapshot. Only alphanumeric, -,
                     and _ characters are allowed. 
        on-create    Take a snapshot every time the file is created. Default: false
        on-truncate  Take a snapshot when the file is truncated.    Default: false
        pattern      Take a snapshot when a line matches the regexp pattern.    Default: none
        count        Maximum number of snapshots for this label. Default: -1 (no limit)

For more information, consult the manual of the resource_monitor.

Parameters
t: A work queue task object.
monitor_snapshot_file: A filename.
Returns
1 if the task file is successfully specified, 0 if either t or monitor_snapshot_file is null.

Referenced by work_queue.Task::specify_buffer().

struct work_queue* work_queue_create ( int  port)

Create a new work queue.

Users may modify the behavior of work_queue_create by setting the following environmental variables before calling the function:

  • WORK_QUEUE_PORT: This sets the default port of the queue (if unset, the default is 9123).
  • WORK_QUEUE_LOW_PORT: If the user requests a random port, then this sets the first port number in the scan range (if unset, the default is 1024).
  • WORK_QUEUE_HIGH_PORT: If the user requests a random port, then this sets the last port number in the scan range (if unset, the default is 32767).
  • WORK_QUEUE_NAME: This sets the project name of the queue, which is reported to a catalog server (by default this is unset).
  • WORK_QUEUE_PRIORITY: This sets the priority of the queue, which is used by workers to sort masters such that higher priority masters will be served first (if unset, the default is 10).

If the queue has a project name, then queue statistics and information will be reported to a catalog server. To specify the catalog server, the user may set the CATALOG_HOST and CATALOG_PORT environmental variables as described in catalog_query_create.

Parameters
portThe port number to listen on. If zero is specified, then the port stored in the WORK_QUEUE_PORT environment variable is used if available. If it isn't, or if -1 is specified, the first unused port between WORK_QUEUE_LOW_PORT and WORK_QUEUE_HIGH_PORT (1024 and 32767 by default) is chosen.
Returns
A new work queue, or null if it could not be created.

Referenced by work_queue.WorkQueue::__init__().

int work_queue_enable_monitoring ( struct work_queue *  q,
char *  monitor_output_directory 
)

Enables resource monitoring on the given work queue.

It generates a resource summary per task, which is written to the given directory. It also creates all_summaries-PID.log, which consolidates all summaries into a single file. If monitor_output_dirname is NULL, work_queue_task is updated with the resources measured, and no summary file is kept unless explicitly given by work_queue_task's monitor_output_file.

Parameters
qA work queue object.
monitor_output_dirnameThe name of the output directory. If NULL, summaries are kept only when monitor_output_directory is specified per task, but resources_measured from work_queue_task is updated.
Returns
1 on success, 0 if monitoring was not enabled.

Referenced by work_queue.WorkQueue::enable_monitoring().

int work_queue_enable_monitoring_full ( struct work_queue *  q,
char *  monitor_output_directory 
)

Enables resource monitoring on the given work queue.

As work_queue_enable_monitoring, but it generates a time series and a monitor debug file (WARNING: for long running tasks these files may reach gigabyte sizes. This function is mostly used for debugging.)

Parameters
qA work queue object.
monitor_output_dirnameThe name of the output directory.
Returns
1 on success, 0 if monitoring was not enabled.

Referenced by work_queue.WorkQueue::enable_monitoring_full().

int work_queue_submit ( struct work_queue *  q,
struct work_queue_task t 
)

Submit a task to a queue.

Once a task is submitted to a queue, it is no longer under the user's control and should not be inspected until returned via work_queue_wait. Once returned, it is safe to re-submit the same task object via work_queue_submit.

Parameters
qA work queue object.
tA task object returned from work_queue_task_create.
Returns
An integer taskid assigned to the submitted task.

Referenced by work_queue.WorkQueue::submit().

int work_queue_specify_min_taskid ( struct work_queue *  q,
int  minid 
)

Set the minimum taskid of future submitted tasks.

Further submitted tasks are guaranteed to have a taskid greater than or equal to minid. This function is useful to make taskids consistent in a workflow that consists of sequential masters. (Note: This function is rarely used). If the minimum id provided is smaller than the last taskid computed, the minimum id provided is ignored.

Parameters
qA work queue object.
minidMinimum desired taskid
Returns
Returns the actual minimum taskid for future tasks.

Referenced by work_queue.WorkQueue::specify_min_taskid().

void work_queue_blacklist_add ( struct work_queue *  q,
const char *  hostname 
)

Blacklist hostname from a queue.

Parameters
qA work queue object.
hostnameA string for hostname.

Referenced by work_queue.WorkQueue::blacklist().

void work_queue_blacklist_add_with_timeout ( struct work_queue *  q,
const char *  hostname,
time_t  seconds 
)

Blacklist hostname from a queue.

Remove from blacklist in timeout seconds. If timeout is less than 1, then the hostname is blacklisted indefinitely, as if work_queue_blacklist_add was called instead.

Parameters
qA work queue object.
hostnameA string for hostname.
secondsNumber of seconds the hostname will be in the blacklist.

Referenced by work_queue.WorkQueue::blacklist_with_timeout().

void work_queue_blacklist_remove ( struct work_queue *  q,
const char *  hostname 
)

Unblacklist host from a queue.

Parameters
qA work queue object.
hostnameA string for hostname.

Referenced by work_queue.WorkQueue::blacklist_clear().

void work_queue_blacklist_clear ( struct work_queue *  q)

Clear blacklist of a queue.

Parameters
qA work queue object.

Referenced by work_queue.WorkQueue::blacklist_clear().

void work_queue_invalidate_cached_file ( struct work_queue *  q,
const char *  local_name,
work_queue_file_t  type 
)

Invalidate cached file.

The file or directory with the given local name specification is deleted from the workers' cache, so that a newer version may be used. Any running task using the file is canceled and resubmitted. Completed tasks waiting for retrieval are not affected. (Currently anonymous buffers and file pieces cannot be deleted once cached in a worker.)

Parameters
qA work queue object.
local_nameThe name of the file on local disk or shared filesystem, or uri.
typeOne of:
struct work_queue_task* work_queue_wait ( struct work_queue *  q,
int  timeout 
)

Wait for a task to complete.

This call will block until either a task has completed, the timeout has expired, or the queue is empty. If a task has completed, the corresponding task object will be returned by this function. The caller may examine the task and then dispose of it using work_queue_task_delete.

If the task ran to completion, then the result field will be zero and the return_status field will contain the Unix exit code of the task. If the task could not run to completion, then the result field will be non-zero and the return_status field will be undefined.

Parameters
qA work queue object.
timeoutThe number of seconds to wait for a completed task before returning. Use an integer time to set the timeout or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed.
Returns
A completed task description, or null if the queue is empty, or the timeout was reached without a completed task, or there is a completed child process (call process_wait to retrieve the status of the completed child process).

Referenced by work_queue.WorkQueue::wait().

int work_queue_hungry ( struct work_queue *  q)

Determine whether the queue is 'hungry' for more tasks.

While the Work Queue can handle a very large number of tasks, it runs most efficiently when the number of tasks is slightly larger than the number of active workers. This function gives the user of a flexible application a hint about whether it would be better to submit more tasks via work_queue_submit or wait for some to complete via work_queue_wait.

Parameters
qA work queue object.
Returns
The number of additional tasks that can be efficiently submitted, or zero if the queue has enough to work with right now.

Referenced by work_queue.WorkQueue::hungry().

int work_queue_empty ( struct work_queue *  q)

Determine whether the queue is empty.

When all of the desired tasks have been submitted to the queue, the user should continue to call work_queue_wait until this function returns true.

Parameters
qA work queue object.
Returns
True if the queue is completely empty, false otherwise.

Referenced by work_queue.WorkQueue::empty().

int work_queue_port ( struct work_queue *  q)

Get the listening port of the queue.

As noted in work_queue_create, there are many controls that affect what TCP port the queue will listen on. Rather than assuming a specific port, the user should simply call this function to determine what port was selected.

Parameters
qA work queue object.
Returns
The port the queue is listening on.

Referenced by work_queue.WorkQueue::port().

void work_queue_get_stats ( struct work_queue *  q,
struct work_queue_stats s 
)

Get queue statistics (only from master).

Parameters
qA work queue object.
sA pointer to a buffer that will be filled with statistics.

Referenced by work_queue.WorkQueue::stats().

void work_queue_get_stats_hierarchy ( struct work_queue *  q,
struct work_queue_stats s 
)

Get statistics of the master queue together with foremen information.

Parameters
qA work queue object.
sA pointer to a buffer that will be filled with statistics.

Referenced by work_queue.WorkQueue::stats_hierarchy().

void work_queue_get_stats_category ( struct work_queue *  q,
const char *  c,
struct work_queue_stats s 
)

Get the task statistics for the given category.

Parameters
qA work queue object.
cA category name.
sA pointer to a buffer that will be filled with statistics.

Referenced by work_queue.WorkQueue::stats_category().

work_queue_task_state_t work_queue_task_state ( struct work_queue *  q,
int  taskid 
)

Get the current state of the task.

Parameters
qA work queue object.
taskidThe taskid of the task.
Returns
One of: WORK_QUEUE_TASK_(UNKNOWN|READY|RUNNING|RESULTS|RETRIEVED|DONE)

Referenced by work_queue.WorkQueue::task_state().

void work_queue_set_bandwidth_limit ( struct work_queue *  q,
const char *  bandwidth 
)

Limit the queue bandwidth when transferring files to and from workers.

Parameters
qA work queue object.
bandwidthThe bandwidth limit in bytes per second.
double work_queue_get_effective_bandwidth ( struct work_queue *  q)

Get current queue bandwidth.

Parameters
qA work queue object.
Returns
The average bandwidth in MB/s measured by the master.
char* work_queue_get_worker_summary ( struct work_queue *  q)

Summarize workers.

This function summarizes the workers currently connected to the master, indicating how many from each worker pool are attached.

Parameters
qA work queue object.
Returns
A newly allocated string describing the distribution of workers by pool. The caller must release this string via free().
int work_queue_activate_fast_abort ( struct work_queue *  q,
double  multiplier 
)

Turn on or off fast abort functionality for a given queue for tasks without an explicit category.

Given the multiplier, abort a task whose running time is larger than the average times the multiplier. Fast-abort is computed per task category. The value specified here applies to all the categories for which work_queue_activate_fast_abort_category was not explicitly called.

Parameters
qA work queue object.
multiplierThe multiplier of the average task time at which point to abort; if less than zero, fast_abort is deactivated (the default).
Returns
0 if activated, 1 if deactivated.

Referenced by work_queue.WorkQueue::activate_fast_abort().

int work_queue_activate_fast_abort_category ( struct work_queue *  q,
const char *  category,
double  multiplier 
)

Turn on or off fast abort functionality for a given category.

Given the multiplier, abort a task whose running time is larger than the average times the multiplier. The value specified here applies only to tasks in the given category. (Note: work_queue_activate_fast_abort_category(q, "default", n) is the same as work_queue_activate_fast_abort(q, n).)

Parameters
qA work queue object.
categoryA category name.
multiplierThe multiplier of the average task time at which point to abort; if zero, fast_abort is deactivated. If less than zero (default), use the fast abort of the "default" category.
Returns
0 if activated, 1 if deactivated.

Referenced by work_queue.WorkQueue::activate_fast_abort_category().

int work_queue_specify_category_mode ( struct work_queue *  q,
const char *  category,
category_mode_t  mode 
)

Turn on or off first-allocation labeling for a given category.

By default, only cores, memory, and disk are labeled. To turn on/off other specific resources, use work_queue_specify_category_autolabel_resource.

Parameters
qA work queue object.
categoryA category name.
modeOne of category_mode_t.
Returns
1 if mode is valid, 0 otherwise.

Referenced by work_queue.WorkQueue::specify_category_mode().

int work_queue_enable_category_resource ( struct work_queue *  q,
const char *  category,
const char *  resource,
int  autolabel 
)

Turn on or off first-allocation labeling for a given category and resource.

This function should be used to fine-tune the defaults from work_queue_specify_category_mode.

Parameters
qA work queue object.
categoryA category name.
resourceA resource name.
autolabel0 off, 1 on.
Returns
1 if resource is valid, 0 otherwise.

Referenced by work_queue.WorkQueue::specify_category_autolabel_resource().

void work_queue_specify_algorithm ( struct work_queue *  q,
work_queue_schedule_t  algorithm 
)

Change the worker selection algorithm.

This function controls which worker will be selected for a given task.

Parameters
qA work queue object.
algorithmThe algorithm to use in assigning a task to a worker. See work_queue_schedule_t for possible values.

Referenced by work_queue.WorkQueue::specify_algorithm().

const char* work_queue_name ( struct work_queue *  q)

Get the project name of the queue.

Parameters
qA work queue object.
Returns
The project name of the queue.

Referenced by work_queue.WorkQueue::name().

void work_queue_specify_name ( struct work_queue *  q,
const char *  name 
)

Change the project name for a given queue.

Parameters
qA work queue object.
nameThe new project name.

Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_name().

void work_queue_specify_priority ( struct work_queue *  q,
int  priority 
)

Change the priority for a given queue.

Parameters
qA work queue object.
priorityThe new priority of the queue. Higher priority masters will attract workers first.

Referenced by work_queue.WorkQueue::specify_priority().

void work_queue_specify_num_tasks_left ( struct work_queue *  q,
int  ntasks 
)

Specify the number of tasks not yet submitted to the queue.

  It is used by work_queue_factory to determine the number of workers to launch.
  If not specified, it defaults to 0.
  work_queue_factory considers the number of tasks as:
  num tasks left + num tasks running + num tasks ready.
Parameters
qA work queue object.
ntasksNumber of tasks yet to be submitted.

Referenced by work_queue.WorkQueue::specify_num_tasks_left().

void work_queue_specify_catalog_server ( struct work_queue *  q,
const char *  hostname,
int  port 
)

Specify the catalog server the master should report to.

Parameters
qA work queue object.
hostnameThe catalog server's hostname.
portThe port the catalog server is listening on.

Referenced by work_queue.WorkQueue::specify_catalog_server().

void work_queue_specify_catalog_servers ( struct work_queue *  q,
const char *  hosts 
)

Specify the catalog server(s) the master should report to.

Parameters
qA work queue object.
hostsThe catalog servers given as a comma delimited list of hostnames or hostname:port
struct work_queue_task* work_queue_cancel_by_taskid ( struct work_queue *  q,
int  id 
)

Cancel a submitted task using its task id and remove it from queue.

Parameters
qA work queue object.
idThe taskid returned from work_queue_submit.
Returns
The task description of the cancelled task, or null if the task was not found in queue. The returned task must be deleted with work_queue_task_delete or resubmitted with work_queue_submit.

Referenced by work_queue.WorkQueue::cancel_by_taskid().

struct work_queue_task* work_queue_cancel_by_tasktag ( struct work_queue *  q,
const char *  tag 
)

Cancel a submitted task using its tag and remove it from queue.

Parameters
qA work queue object.
tagThe tag name assigned to task using work_queue_task_specify_tag.
Returns
The task description of the cancelled task, or null if the task was not found in queue. The returned task must be deleted with work_queue_task_delete or resubmitted with work_queue_submit.

Referenced by work_queue.WorkQueue::cancel_by_tasktag().

struct list* work_queue_cancel_all_tasks ( struct work_queue *  q)

Cancel all submitted tasks and remove them from the queue.

Parameters
qA work queue object.
Returns
A list of all of the tasks submitted to q. Each task must be deleted with work_queue_task_delete or resubmitted with work_queue_submit.
int work_queue_shut_down_workers ( struct work_queue *  q,
int  n 
)

Shut down workers connected to the work_queue system.

Gives a best effort and then returns the number of workers given the shut down order.

Parameters
qA work queue object.
nThe number to shut down. All workers if given "0".

Referenced by work_queue.WorkQueue::shutdown_workers().

void work_queue_delete ( struct work_queue *  q)

Delete a work queue.

This function should only be called after work_queue_empty returns true.

Parameters
qA work queue to delete.

Referenced by work_queue.WorkQueue::__init__().

int work_queue_specify_log ( struct work_queue *  q,
const char *  logfile 
)

Add a log file that records cumulative statistics of the connected workers and submitted tasks.

Parameters
qA work queue object.
logfileThe filename.
Returns
1 if logfile was opened, 0 otherwise.

Referenced by work_queue.WorkQueue::specify_log().

int work_queue_specify_transactions_log ( struct work_queue *  q,
const char *  logfile 
)

Add a log file that records the states of the connected workers and tasks.

Parameters
qA work queue object.
logfileThe filename.
Returns
1 if logfile was opened, 0 otherwise.

Referenced by work_queue.WorkQueue::specify_transactions_log().

void work_queue_specify_password ( struct work_queue *  q,
const char *  password 
)

Add a mandatory password that each worker must present.

Parameters
qA work queue object.
passwordThe password to require.

Referenced by work_queue.WorkQueue::specify_password().

int work_queue_specify_password_file ( struct work_queue *  q,
const char *  file 
)

Add a mandatory password file that each worker must present.

Parameters
qA work queue object.
fileThe name of the file containing the password.
Returns
True if the password was loaded, false otherwise.

Referenced by work_queue.WorkQueue::specify_password_file().

void work_queue_specify_keepalive_interval ( struct work_queue *  q,
int  interval 
)

Change the keepalive interval for a given queue.

Parameters
qA work queue object.
intervalThe minimum number of seconds to wait before sending new keepalive checks to workers.

Referenced by work_queue.WorkQueue::specify_keepalive_interval().

void work_queue_specify_keepalive_timeout ( struct work_queue *  q,
int  timeout 
)

Change the keepalive timeout for identifying dead workers for a given queue.

Parameters
qA work queue object.
timeoutThe minimum number of seconds to wait for a keepalive response from worker before marking it as dead.

Referenced by work_queue.WorkQueue::specify_keepalive_timeout().

void work_queue_master_preferred_connection ( struct work_queue *  q,
const char *  preferred_connection 
)

Set the preference for using hostname over IP address to connect.

Use 'by_ip' to connect using the IP address (standard behavior), or 'by_hostname' to use the hostname at the master.

Parameters
qA work queue object.
preferred_connectionA string indicating either 'by_ip' or 'by_hostname'.

Referenced by work_queue.WorkQueue::specify_master_preferred_connection().

int work_queue_tune ( struct work_queue *  q,
const char *  name,
double  value 
)

Tune advanced parameters for work queue.

Parameters
qA work queue object.
nameThe name of the parameter to tune
  • "asynchrony-multiplier" Treat each worker as having (actual_cores * multiplier) total cores. (default = 1.0)
  • "asynchrony-modifier" Treat each worker as having an additional "modifier" cores. (default=0)
  • "min-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a worker. (default=10)
  • "foreman-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a foreman. (default=3600)
  • "transfer-outlier-factor" Transfers that are this many times slower than the average will be aborted. (default=10x)
  • "default-transfer-rate" The assumed network bandwidth used until sufficient data has been collected. (1MB/s)
  • "fast-abort-multiplier" Set the multiplier of the average task time at which point to abort; if negative or zero fast_abort is deactivated. (default=0)
  • "keepalive-interval" Set the minimum number of seconds to wait before sending new keepalive checks to workers. (default=120)
  • "keepalive-timeout" Set the minimum number of seconds to wait for a keepalive response from worker before marking it as dead. (default=30)
valueThe value to set the parameter to.
Returns
0 on success, -1 on failure.

Referenced by work_queue.WorkQueue::tune().

void work_queue_specify_max_resources ( struct work_queue *  q,
const struct rmsummary rm 
)

Sets the maximum resources a task without an explicit category ("default" category) may use.

rm specifies the maximum resources a task in the default category may use.

Parameters
qReference to the current work queue object.
rmStructure indicating maximum values. See rmsummary for possible fields.

Referenced by work_queue.WorkQueue::specify_max_resources().

void work_queue_specify_category_max_resources ( struct work_queue *  q,
const char *  category,
const struct rmsummary rm 
)

Sets the maximum resources a task in the category may use.

Parameters
qReference to the current work queue object.
categoryName of the category.
rmStructure indicating maximum values. See rmsummary for possible fields.

Referenced by work_queue.WorkQueue::specify_category_max_resources().

void work_queue_specify_category_first_allocation_guess ( struct work_queue *  q,
const char *  category,
const struct rmsummary rm 
)

Set the initial guess for resource autolabeling for the given category.

Parameters
qReference to the current work queue object.
categoryName of the category.
rmStructure indicating maximum values. See rmsummary for possible fields.

Referenced by work_queue.WorkQueue::specify_category_first_allocation_guess().

void work_queue_initialize_categories ( struct work_queue *  q,
struct rmsummary max,
const char *  summaries_file 
)

Initialize first value of categories.

Parameters
qReference to the current work queue object.
rmStructure indicating maximum overall values. See rmsummary for possible fields.
filenameJSON file with resource summaries.

Referenced by work_queue.WorkQueue::initialize_categories().

void work_queue_specify_task_order ( struct work_queue *  q,
int  order 
)

Specify how the submitted tasks should be ordered.

It does not have any effect now.

Parameters
qA work queue object.
orderThe ordering to use for dispatching submitted tasks:

Referenced by work_queue.WorkQueue::specify_task_order().

void work_queue_specify_master_mode ( struct work_queue *  q,
int  mode 
)

Specify the master mode for a given queue.

Parameters
qA work queue object.
mode
Deprecated:
Enabled automatically when work_queue_specify_name is used.

Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_master_mode().

void work_queue_specify_estimate_capacity_on ( struct work_queue *  q,
int  estimate_capacity_on 
)

Change whether to estimate master capacity for a given queue.

Parameters
qA work queue object.
estimate_capacity_onif the value of this parameter is 1, then work queue should estimate the master capacity. If the value is 0, then work queue would not estimate its master capacity.
Deprecated:
This feature is always enabled.

Referenced by work_queue.WorkQueue::estimate_capacity().

int work_queue_task_specify_input_buf ( struct work_queue_task t,
const char *  buf,
int  length,
const char *  rname 
)

Add an input buffer to a task.

Parameters
tThe task to which to add parameters
bufA pointer to the data buffer to send to the worker to be available to the commands.
lengthThe number of bytes of data in the buffer
rnameThe name of the file in which to store the buffer data on the worker
Returns
1 if the input buffer is successfully specified, 0 if either of t or rname is null or rname is an absolute path.
Deprecated:
Use work_queue_task_specify_buffer instead.
int work_queue_task_specify_input_file ( struct work_queue_task t,
const char *  fname,
const char *  rname 
)

Add an input file to a task.

Parameters
tThe task to which to add parameters
fnameThe name of the data file to send to the worker to be available to the commands.
rnameThe name of the file in which to store the buffer data on the worker.
Returns
1 if the input file is successfully specified, 0 if either of t, fname, or rname is null or rname is an absolute path.
Deprecated:
See work_queue_task_specify_file instead.
int work_queue_task_specify_input_file_do_not_cache ( struct work_queue_task t,
const char *  fname,
const char *  rname 
)

Add an input file to a task, without caching.

Parameters
tThe task to which to add parameters
fnameThe name of the data file to send to the worker to be available to the commands.
rnameThe name of the file in which to store the buffer data on the worker.
Returns
1 if the input file is successfully specified, 0 if either of t, fname, or rname is null or rname is an absolute path.
Deprecated:
See work_queue_task_specify_file instead.
int work_queue_task_specify_output_file ( struct work_queue_task t,
const char *  rname,
const char *  fname 
)

Add an output file to a task.

Parameters
tThe task to which to add parameters
rnameThe name of a file created by the program when it runs.
fnameThe name of the local target file to which rname is copied back.
Returns
1 if the output file is successfully specified, 0 if either of t, fname, or rname is null or rname is an absolute path.
Deprecated:
See work_queue_task_specify_file instead.
int work_queue_task_specify_output_file_do_not_cache ( struct work_queue_task t,
const char *  rname,
const char *  fname 
)

Add an output file to a task without caching.

Parameters
tThe task to which to add parameters
rnameThe name of a file created by the program when it runs.
fnameThe name of the local target file to which rname is copied back.
Returns
1 if the output file is successfully specified, 0 if either of t, fname, or rname is null or rname is an absolute path.
Deprecated:
See work_queue_task_specify_file instead.
char* work_queue_generate_disk_alloc_full_filename ( char *  pwd,
int  taskid 
)

Generate a worker-level unique filename to indicate a disk allocation being full.

Parameters
pThe process for which we generate a unique disk allocation filename.
Returns
The string corresponding to the filename.

Variable Documentation

int wq_option_scheduler

Initial setting for the algorithm used to assign tasks to workers upon creating the queue.

Change prior to calling work_queue_create, after queue is created this variable is not considered and changes must be made through the API calls.