cctools
work_queue.Task Class Reference

Python Task object. More...

Inherited by work_queue.PythonTask.

Public Member Functions

def __init__
 Create a new task specification. More...
 
def clone
 Return a copy of this task. More...
 
def specify_command
 Set the command to be executed by the task. More...
 
def specify_algorithm
 Set the worker selection algorithm for the task. More...
 
def specify_tag
 Attach a user defined logical name to the task. More...
 
def specify_category
 Label the task with the given category. More...
 
def specify_feature
 Label the task with the given user-defined feature. More...
 
def specify_preferred_host
 Indicate that the task would be optimally run on a given host. More...
 
def specify_file
 Add a file to the task. More...
 
def specify_file_command
 Add a file to the task which will be transferred with a command at the worker. More...
 
def specify_file_piece
 Add a file piece to the task. More...
 
def specify_input_file
 Add an input file to the task. More...
 
def specify_output_file
 Add an output file to the task. More...
 
def specify_directory
 Add a directory to the task. More...
 
def specify_buffer
 Add an input buffer to the task. More...
 
def specify_snapshot_file
 When monitoring, indicates a JSON-encoded file that instructs the monitor to take a snapshot of the task resources. More...
 
def specify_max_retries
 Indicate the number of times the task should be retried. More...
 
def specify_cores
 Indicate the number of cores required by this task. More...
 
def specify_memory
 Indicate the memory (in MB) required by this task. More...
 
def specify_disk
 Indicate the disk space (in MB) required by this task. More...
 
def specify_gpus
 Indicate the number of GPUs required by this task. More...
 
def specify_priority
 Indicate the priority of this task (larger means better priority, default is 0). More...
 
def specify_environment_variable
 Set this environment variable before running the task. More...
 
def specify_monitor_output
 Set a name for the resource summary output directory from the monitor. More...
 
def tag
 Get the user-defined logical name for the task. More...
 
def category
 Get the category name for the task. More...
 
def command
 Get the shell command executed by the task. More...
 
def priority
 Get the priority of the task. More...
 
def algorithm
 Get the algorithm for choosing the worker to run the task. More...
 
def std_output
 Get the standard output of the task. More...
 
def output
 Get the standard output of the task. More...
 
def id
 Get the task id number. More...
 
def return_status
 Get the exit code of the command executed by the task. More...
 
def result
 Get the result of the task as an integer code, such as successful, missing file, etc. More...
 
def result_str
 Return a string that explains the result of a task. More...
 
def total_submissions
 Get the number of times the task has been resubmitted internally. More...
 
def exhausted_attempts
 Get the number of times the task has failed due to resource exhaustion. More...
 
def host
 Get the address and port of the host on which the task ran. More...
 
def hostname
 Get the name of the host on which the task ran. More...
 
def submit_time
 Get the time at which this task was submitted. More...
 
def finish_time
 Get the time at which this task was finished. More...
 
def total_cmd_exhausted_execute_time
 Get the total time the task executed and failed due to resource exhaustion. More...
 
def app_delay
 Get the time spent in upper-level application (outside of work_queue_wait). More...
 
def send_input_start
 Get the time at which the task started to transfer input files. More...
 
def send_input_finish
 Get the time at which the task finished transferring input files. More...
 
def execute_cmd_start
 The time at which the task began. More...
 
def execute_cmd_finish
 Get the time at which the task finished (discovered by the manager). More...
 
def receive_output_start
 Get the time at which the task started to transfer output files. More...
 
def receive_output_finish
 Get the time at which the task finished transferring output files. More...
 
def total_bytes_received
 Get the number of bytes received since task started receiving input data. More...
 
def total_bytes_sent
 Get the number of bytes sent since task started sending input data. More...
 
def total_bytes_transferred
 Get the number of bytes transferred since task started transferring input data. More...
 
def total_transfer_time
 Get the time consumed in microseconds for transferring total_bytes_transferred. More...
 
def cmd_execution_time
 Time spent in microseconds for executing the command until completion on a single worker. More...
 
def total_cmd_execution_time
 Accumulated time spent in microseconds for executing the command on any worker, regardless of whether the task finished (i.e., this includes time running on workers that disconnected). More...
 
def resources_measured
 Get the resources measured for the task execution if resource monitoring is enabled. More...
 
def limits_exceeded
 Get the resources the task exceeded. More...
 
def resources_requested
 Get the resources the task requested to run. More...
 
def resources_allocated
 Get the resources allocated to the task in its latest attempt. More...
 

Detailed Description

Python Task object.

This class is used to create a task specification.

Constructor & Destructor Documentation

def work_queue.Task.__init__ (   self,
  command 
)

Create a new task specification.

Parameters
self: Reference to the current task object.
command: The shell command line to be executed by the task.

References work_queue.Task._task, work_queue_task_create(), and work_queue_task_delete().

Member Function Documentation

def work_queue.Task.clone (   self)

Return a copy of this task.

Return a (deep) copy of this task that can also be submitted to the WorkQueue.

References work_queue.Task._task, and work_queue_task_clone().

def work_queue.Task.specify_command (   self,
  command 
)

Set the command to be executed by the task.

Parameters
self: Reference to the current task object.
command: The command to be executed.

References work_queue.Task._task, and work_queue_task_specify_command().

def work_queue.Task.specify_algorithm (   self,
  algorithm 
)

Set the worker selection algorithm for the task.

Parameters
self: Reference to the current task object.
algorithm: The algorithm to use in assigning the task to a worker. See work_queue_schedule_t for possible values.

References work_queue.Task._task, and work_queue_task_specify_algorithm().

def work_queue.Task.specify_tag (   self,
  tag 
)

Attach a user defined logical name to the task.

Parameters
self: Reference to the current task object.
tag: The tag to attach to the task.

References work_queue.Task._task, and work_queue_task_specify_tag().

def work_queue.Task.specify_category (   self,
  name 
)

Label the task with the given category.

It is expected that tasks with the same category have similar resource requirements (e.g., for fast abort).

Parameters
self: Reference to the current task object.
name: The name of the category.

References work_queue.Task._task, and work_queue_task_specify_category().

def work_queue.Task.specify_feature (   self,
  name 
)

Label the task with the given user-defined feature.

Tasks with the feature will only run on workers that provide it (see the worker's --feature option).

Parameters
self: Reference to the current task object.
name: The name of the feature.

References work_queue.Task._task, and work_queue_task_specify_feature().

def work_queue.Task.specify_preferred_host (   self,
  hostname 
)

Indicate that the task would be optimally run on a given host.

Parameters
self: Reference to the current task object.
hostname: The hostname to which this task would optimally be sent.

References work_queue.Task._task.

def work_queue.Task.specify_file (   self,
  local_name,
  remote_name = None,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add a file to the task.

Parameters
self: Reference to the current task object.
local_name: The name of the file on local disk or shared filesystem.
remote_name: The name of the file at the execution site.
type: Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together.
cache: Whether the file should be cached at workers (True/False).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).

For example:

# The following are equivalent
>>> task.specify_file("/etc/hosts", type=WORK_QUEUE_INPUT, cache=True)
>>> task.specify_file("/etc/hosts", "hosts", type=WORK_QUEUE_INPUT, cache=True)

References work_queue.Task._task, and work_queue_task_specify_file().

Referenced by work_queue.Task.specify_input_file(), and work_queue.Task.specify_output_file().

def work_queue.Task.specify_file_command (   self,
  remote_name,
  cmd,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add a file to the task which will be transferred with a command at the worker.

Parameters
self: Reference to the current task object.
remote_name: The name of the file as seen by the task.
cmd: The shell command to transfer the file. Any occurrence of the string %% will be replaced with the internal name that Work Queue uses for the file.
type: Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together.
cache: Whether the file should be cached at workers (True/False).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).

For example:

# Send the output file with a command run at the worker
>>> task.specify_file_command("my.result", "chirp_put %% chirp://somewhere/result.file", type=WORK_QUEUE_OUTPUT)

References work_queue.Task._task, and work_queue_task_specify_file_command().
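
The %% placeholder in cmd can be pictured as a plain string substitution. The sketch below only illustrates that substitution; it is not Work Queue's implementation, and both the helper name expand_transfer_command and the internal file name are hypothetical.

```python
def expand_transfer_command(cmd, internal_name):
    """Return cmd with every '%%' replaced by internal_name.

    Hypothetical helper: it mimics the substitution described for the
    cmd parameter and is not part of the work_queue module.
    """
    return cmd.replace("%%", internal_name)


template = "chirp_put %% chirp://somewhere/result.file"
# "file-abc123" is a made-up internal name, used only for illustration.
expanded = expand_transfer_command(template, "file-abc123")
print(expanded)
```

The worker chooses the real internal name, so a command should refer to the file only through %%.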

def work_queue.Task.specify_file_piece (   self,
  local_name,
  remote_name = None,
  start_byte = 0,
  end_byte = 0,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add a file piece to the task.

Parameters
self: Reference to the current task object.
local_name: The name of the file on local disk or shared filesystem.
remote_name: The name of the file at the execution site.
start_byte: The starting byte offset of the file piece to be transferred.
end_byte: The ending byte offset of the file piece to be transferred.
type: Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together.
cache: Whether the file should be cached at workers (True/False).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).

References work_queue.Task._task, and work_queue_task_specify_file_piece().
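
A common use of file pieces is to give each task its own slice of one large input. The sketch below computes (start_byte, end_byte) pairs for such a split. It assumes that end_byte is inclusive, which this page does not state, so check that against your Work Queue version; byte_ranges is a hypothetical helper, not part of the module.

```python
def byte_ranges(total_size, pieces):
    """Split total_size bytes into up to `pieces` contiguous ranges.

    Returns a list of (start_byte, end_byte) pairs.
    Assumption: end_byte is an inclusive offset.
    """
    if total_size <= 0 or pieces <= 0:
        return []
    chunk = -(-total_size // pieces)  # ceiling division
    ranges = []
    start = 0
    while start < total_size:
        end = min(start + chunk, total_size) - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


ranges = byte_ranges(10, 3)
print(ranges)
```

Each pair could then be passed as start_byte and end_byte in a specify_file_piece call on a separate task.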

def work_queue.Task.specify_input_file (   self,
  local_name,
  remote_name = None,
  flags = None,
  cache = None 
)

Add an input file to the task.

This is just a wrapper for specify_file with type set to WORK_QUEUE_INPUT.

References work_queue.Task.specify_file().

Referenced by work_queue.PythonTask.output().

def work_queue.Task.specify_output_file (   self,
  local_name,
  remote_name = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add an output file to the task.

This is just a wrapper for specify_file with type set to WORK_QUEUE_OUTPUT.

References work_queue.Task.specify_file().

Referenced by work_queue.PythonTask.output().
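
Because the two wrappers differ only in direction, a small helper can declare all of a task's files in one place. This is a minimal sketch: declare_files and RecordingTask are hypothetical names, and RecordingTask is a stand-in that only records calls so the example runs without a Work Queue installation. With the real module, a work_queue.Task would be passed instead.

```python
def declare_files(task, inputs=(), outputs=()):
    """Attach input and output files to a task.

    `inputs` and `outputs` are iterables of (local_name, remote_name)
    pairs. `task` is any object exposing specify_input_file and
    specify_output_file with the signatures documented above.
    """
    for local_name, remote_name in inputs:
        # Inputs are marked cacheable so workers may reuse them.
        task.specify_input_file(local_name, remote_name, cache=True)
    for local_name, remote_name in outputs:
        task.specify_output_file(local_name, remote_name, cache=False)


class RecordingTask:
    """Stand-in that records calls; not part of work_queue."""

    def __init__(self):
        self.calls = []

    def specify_input_file(self, local_name, remote_name=None,
                           flags=None, cache=None):
        self.calls.append(("input", local_name, remote_name, cache))

    def specify_output_file(self, local_name, remote_name=None,
                            flags=None, cache=None, failure_only=None):
        self.calls.append(("output", local_name, remote_name, cache))


demo = RecordingTask()
declare_files(demo,
              inputs=[("/etc/hosts", "hosts")],
              outputs=[("out.txt", "out.txt")])
print(demo.calls)
```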

def work_queue.Task.specify_directory (   self,
  local_name,
  remote_name = None,
  type = None,
  flags = None,
  recursive = False,
  cache = None,
  failure_only = None 
)

Add a directory to the task.

Parameters
self: Reference to the current task object.
local_name: The name of the directory on local disk or shared filesystem. Optional if the directory is empty.
remote_name: The name of the directory at the remote execution site.
type: Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together.
recursive: Indicates whether just the directory (False) or the directory and all of its contents (True) should be included.
cache: Whether the directory should be cached at workers (True/False).
failure_only: For output directories, whether the directory should be retrieved only when the task fails (e.g., debug logs).
Returns
1 if the task directory is successfully specified, 0 if either local_name or remote_name is null, or if remote_name is an absolute path.

References work_queue.Task._task, and work_queue_task_specify_directory().

def work_queue.Task.specify_buffer (   self,
  buffer,
  remote_name,
  flags = None,
  cache = None 
)

Add an input buffer to the task.

Parameters
self: Reference to the current task object.
buffer: The contents of the buffer to pass as input.
remote_name: The name of the remote file to create.
flags: May take the same values as specify_file.
cache: Whether the file should be cached at workers (True/False).

References work_queue.Task._task, and work_queue_task_specify_buffer().

def work_queue.Task.specify_snapshot_file (   self,
  filename 
)

When monitoring, indicates a JSON-encoded file that instructs the monitor to take a snapshot of the task resources.

Snapshots appear in the JSON summary file of the task, under the key "snapshots". Snapshots are taken when events occur on the files described in the monitor_snapshot_file. The monitor_snapshot_file is a JSON-encoded file with the following format:

{
    "FILENAME": {
        "from-start":boolean,
        "from-start-if-truncated":boolean,
        "delete-if-found":boolean,
        "events": [
            {
                "label":"EVENT_NAME",
                "on-create":boolean,
                "on-truncate":boolean,
                "on-pattern":"REGEXP",
                "count":integer
            },
            {
                "label":"EVENT_NAME",
                ...
            }
        ]
    },
    "FILENAME": {
        ...
    }
}

All keys but "label" are optional:

from-start: If FILENAME exists when the task starts running, process from line 1. Default: false, as the task may be appending to an already existing file.
from-start-if-truncated: If FILENAME is truncated, process from line 1. Default: true, to account for log rotations.
delete-if-found: Delete FILENAME when found. Default: false.

events:
label: Name that identifies the snapshot. Only alphanumeric, -, and _ characters are allowed.
on-create: Take a snapshot every time the file is created. Default: false.
on-truncate: Take a snapshot when the file is truncated. Default: false.
on-pattern: Take a snapshot when a line matches the regexp pattern. Default: none.
count: Maximum number of snapshots for this label. Default: -1 (no limit).

Exactly one of on-create, on-truncate, or on-pattern should be specified.

Once a task has finished, the snapshots are available as:

for s in t.resources_measured.snapshots:
    print(s.memory)

For more information, consult the manual of the resource_monitor.

Parameters
self: Reference to the current task object.
filename: The name of the snapshot events specification.

References work_queue.Task._task, and work_queue_specify_snapshot_file().
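
A snapshot specification can be generated rather than written by hand. The following is a minimal sketch that builds a specification with a single on-pattern event and enforces the label restriction described above. The helper name snapshot_on_pattern, the file name "app.log", and the label "checkpoint" are all made up for illustration.

```python
import json
import re

# Labels may only contain alphanumeric characters, '-' and '_'.
_LABEL_RE = re.compile(r"[A-Za-z0-9_-]+")


def snapshot_on_pattern(filename, label, pattern, count=-1):
    """Build a spec that takes a snapshot when a line of `filename`
    matches the regular expression `pattern`.

    Hypothetical helper; only the key names come from the format above.
    """
    if not _LABEL_RE.fullmatch(label):
        raise ValueError("invalid snapshot label: {!r}".format(label))
    event = {"label": label, "on-pattern": pattern, "count": count}
    return {filename: {"events": [event]}}


spec = snapshot_on_pattern("app.log", "checkpoint",
                           "^checkpoint written", count=5)
print(json.dumps(spec, indent=2))
```

The printed JSON could be saved to a file whose name is then passed to specify_snapshot_file.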

def work_queue.Task.specify_max_retries (   self,
  max_retries 
)

Indicate the number of times the task should be retried.

If 0 (the default), the task is tried indefinitely. A task that did not succeed after the given number of retries is returned with result WORK_QUEUE_RESULT_MAX_RETRIES.

References work_queue.Task._task, and work_queue_task_specify_max_retries().

def work_queue.Task.specify_cores (   self,
  cores 
)

Indicate the number of cores required by this task.

References work_queue.Task._task, and work_queue_task_specify_cores().

def work_queue.Task.specify_memory (   self,
  memory 
)

Indicate the memory (in MB) required by this task.

References work_queue.Task._task, and work_queue_task_specify_memory().

def work_queue.Task.specify_disk (   self,
  disk 
)

Indicate the disk space (in MB) required by this task.

References work_queue.Task._task, and work_queue_task_specify_disk().

def work_queue.Task.specify_gpus (   self,
  gpus 
)

Indicate the number of GPUs required by this task.

References work_queue.Task._task, and work_queue_task_specify_gpus().
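
The four resource setters are often called together. The sketch below wraps them in one hypothetical helper, apply_resources, that skips any value left as None. RecordingTask is a stand-in that only records calls so the example runs without Work Queue installed; a real work_queue.Task would be used in practice.

```python
def apply_resources(task, cores=None, memory_mb=None, disk_mb=None, gpus=None):
    """Call the matching specify_* method for each value that is not None.

    Memory and disk are given in MB, as documented above.
    """
    requested = {
        "specify_cores": cores,
        "specify_memory": memory_mb,
        "specify_disk": disk_mb,
        "specify_gpus": gpus,
    }
    for method_name, value in requested.items():
        if value is not None:
            getattr(task, method_name)(value)


class RecordingTask:
    """Stand-in that records specify_* calls; not part of work_queue."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only reached for attributes that are not defined normally.
        if name.startswith("specify_"):
            return lambda *args: self.calls.append((name, args))
        raise AttributeError(name)


demo = RecordingTask()
apply_resources(demo, cores=2, memory_mb=1024)
print(demo.calls)
```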

def work_queue.Task.specify_priority (   self,
  priority 
)

Indicate the priority of this task (larger means better priority, default is 0).

def work_queue.Task.specify_environment_variable (   self,
  name,
  value = None 
)

Set this environment variable before running the task.

If value is None, then variable is unset.

References work_queue.Task._task, and work_queue_task_specify_environment_variable().
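
When a task needs several variables, a dictionary keeps them in one place. This is a minimal sketch under the behavior stated above (a value of None unsets the variable). apply_environment and RecordingTask are hypothetical names, and the stand-in exists only so the example runs without Work Queue installed.

```python
def apply_environment(task, variables):
    """Set each variable on the task; a value of None unsets it.

    Non-None values are converted to strings before being passed on.
    """
    for name, value in variables.items():
        task.specify_environment_variable(
            name, None if value is None else str(value))


class RecordingTask:
    """Stand-in that records calls; not part of work_queue."""

    def __init__(self):
        self.env_calls = []

    def specify_environment_variable(self, name, value=None):
        self.env_calls.append((name, value))


demo = RecordingTask()
apply_environment(demo, {"OMP_NUM_THREADS": 4, "DEBUG": None})
print(demo.env_calls)
```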

def work_queue.Task.specify_monitor_output (   self,
  directory 
)

Set a name for the resource summary output directory from the monitor.

References work_queue.Task._task, and work_queue_task_specify_monitor_output().

def work_queue.Task.tag (   self)

Get the user-defined logical name for the task.

>>> print(t.tag)

def work_queue.Task.category (   self)

Get the category name for the task.

>>> print(t.category)

def work_queue.Task.command (   self)

Get the shell command executed by the task.

>>> print(t.command)

def work_queue.Task.priority (   self)

Get the priority of the task.

>>> print(t.priority)

def work_queue.Task.algorithm (   self)

Get the algorithm for choosing the worker to run the task.

>>> print(t.algorithm)

def work_queue.Task.std_output (   self)

Get the standard output of the task.

Must be called only after the task completes execution.

>>> print(t.std_output)

Referenced by work_queue.PythonTask.output().

def work_queue.Task.output (   self)

Get the standard output of the task.

Same as t.std_output for regular Work Queue tasks. Must be called only after the task completes execution.

>>> print(t.output)

def work_queue.Task.id (   self)

Get the task id number.

Must be called only after the task was submitted.

>>> print(t.id)

def work_queue.Task.return_status (   self)

Get the exit code of the command executed by the task.

Must be called only after the task completes execution.

>>> print(t.return_status)

def work_queue.Task.result (   self)

Get the result of the task as an integer code, such as successful, missing file, etc.

See work_queue_result_t for possible values. Must be called only after the task completes execution.

>>> print(t.result)
0

def work_queue.Task.result_str (   self)

Return a string that explains the result of a task.

Must be called only after the task completes execution.

>>> print(t.result_str)
SUCCESS

References work_queue_result_str().
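
The result code and the exit code answer different questions: result reports what Work Queue observed, while return_status is the command's own exit code. The sketch below checks both. It assumes that result code 0 means success, as the 0 and SUCCESS outputs in the examples above suggest. task_succeeded and describe are hypothetical helpers, and SimpleNamespace objects stand in for completed tasks.

```python
from types import SimpleNamespace


def task_succeeded(task):
    """True when Work Queue reports success and the command exited with 0.

    Assumption: a result code of 0 corresponds to success.
    """
    return task.result == 0 and task.return_status == 0


def describe(task):
    """One-line summary suitable for a log."""
    if task_succeeded(task):
        return "task {} ok".format(task.id)
    return "task {} failed: result={} exit={}".format(
        task.id, task.result_str, task.return_status)


# Stand-ins for completed tasks; the attribute names follow this page.
good = SimpleNamespace(id=1, result=0, result_str="SUCCESS", return_status=0)
bad = SimpleNamespace(id=2, result=0, result_str="SUCCESS", return_status=3)
print(describe(good))
print(describe(bad))
```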

def work_queue.Task.total_submissions (   self)

Get the number of times the task has been resubmitted internally.

Must be called only after the task completes execution.

>>> print(t.total_submissions)

def work_queue.Task.exhausted_attempts (   self)

Get the number of times the task has failed due to resource exhaustion.

>>> print(t.exhausted_attempts)

def work_queue.Task.host (   self)

Get the address and port of the host on which the task ran.

Must be called only after the task completes execution.

>>> print(t.host)

def work_queue.Task.hostname (   self)

Get the name of the host on which the task ran.

Must be called only after the task completes execution.

>>> print(t.hostname)

def work_queue.Task.submit_time (   self)

Get the time at which this task was submitted.

Must be called only after the task completes execution.

>>> print(t.submit_time)

def work_queue.Task.finish_time (   self)

Get the time at which this task was finished.

Must be called only after the task completes execution.

>>> print(t.finish_time)

def work_queue.Task.total_cmd_exhausted_execute_time (   self)

Get the total time the task executed and failed due to resource exhaustion.

>>> print(t.total_cmd_exhausted_execute_time)

def work_queue.Task.app_delay (   self)

Get the time spent in upper-level application (outside of work_queue_wait).

Must be called only after the task completes execution.

>>> print(t.app_delay)

def work_queue.Task.send_input_start (   self)

Get the time at which the task started to transfer input files.

Must be called only after the task completes execution.

>>> print(t.send_input_start)

def work_queue.Task.send_input_finish (   self)

Get the time at which the task finished transferring input files.

Must be called only after the task completes execution.

>>> print(t.send_input_finish)

def work_queue.Task.execute_cmd_start (   self)

The time at which the task began.

Must be called only after the task completes execution.

>>> print(t.execute_cmd_start)

def work_queue.Task.execute_cmd_finish (   self)

Get the time at which the task finished (discovered by the manager).

Must be called only after the task completes execution.

>>> print(t.execute_cmd_finish)

def work_queue.Task.receive_output_start (   self)

Get the time at which the task started to transfer output files.

Must be called only after the task completes execution.

>>> print(t.receive_output_start)

def work_queue.Task.receive_output_finish (   self)

Get the time at which the task finished transferring output files.

Must be called only after the task completes execution.

>>> print(t.receive_output_finish)

def work_queue.Task.total_bytes_received (   self)

Get the number of bytes received since task started receiving input data.

Must be called only after the task completes execution.

>>> print(t.total_bytes_received)

def work_queue.Task.total_bytes_sent (   self)

Get the number of bytes sent since task started sending input data.

Must be called only after the task completes execution.

>>> print(t.total_bytes_sent)

def work_queue.Task.total_bytes_transferred (   self)

Get the number of bytes transferred since task started transferring input data.

Must be called only after the task completes execution.

>>> print(t.total_bytes_transferred)

def work_queue.Task.total_transfer_time (   self)

Get the time consumed in microseconds for transferring total_bytes_transferred.

Must be called only after the task completes execution.

>>> print(t.total_transfer_time)

def work_queue.Task.cmd_execution_time (   self)

Time spent in microseconds for executing the command until completion on a single worker.

Must be called only after the task completes execution.

>>> print(t.cmd_execution_time)

def work_queue.Task.total_cmd_execution_time (   self)

Accumulated time spent in microseconds for executing the command on any worker, regardless of whether the task finished (i.e., this includes time running on workers that disconnected).

Must be called only after the task completes execution.

>>> print(t.total_cmd_execution_time)

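Since the transfer and execution times are reported in microseconds, a little arithmetic turns them into more readable figures. The sketch below derives an average transfer rate and the execution time spent on attempts that did not complete, reading the latter as total_cmd_execution_time minus cmd_execution_time. That reading is an assumption, both helper names are hypothetical, and a SimpleNamespace stands in for a completed task.

```python
from types import SimpleNamespace

MICROSECONDS_PER_SECOND = 1_000_000


def transfer_rate_bytes_per_second(task):
    """Average transfer rate, or None if no transfer time was recorded."""
    if task.total_transfer_time <= 0:
        return None
    seconds = task.total_transfer_time / MICROSECONDS_PER_SECOND
    return task.total_bytes_transferred / seconds


def incomplete_execution_seconds(task):
    """Seconds of execution on attempts other than the completed one.

    Assumption: this equals total_cmd_execution_time - cmd_execution_time.
    """
    extra = task.total_cmd_execution_time - task.cmd_execution_time
    return extra / MICROSECONDS_PER_SECOND


# Stand-in for a completed task; the values are invented.
t = SimpleNamespace(
    total_bytes_transferred=5_000_000,
    total_transfer_time=2_000_000,
    cmd_execution_time=3_000_000,
    total_cmd_execution_time=4_500_000,
)
print(transfer_rate_bytes_per_second(t))
print(incomplete_execution_seconds(t))
```
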
def work_queue.Task.resources_measured (   self)

Get the resources measured for the task execution if resource monitoring is enabled.

Must be called only after the task completes execution. Valid fields:

start: microseconds at the start of execution

end: microseconds at the end of execution

wall_time: microseconds spent during execution

cpu_time: user + system time of the execution

cores: peak number of cores used

cores_avg: number of cores computed as cpu_time/wall_time

gpus: peak number of gpus used

max_concurrent_processes: the maximum number of processes running concurrently

total_processes: count of all of the processes created

virtual_memory: maximum virtual memory across all processes

memory: maximum resident size across all processes

swap_memory: maximum swap usage across all processes

bytes_read: number of bytes read from disk

bytes_written: number of bytes written to disk

bytes_received: number of bytes read from the network

bytes_sent: number of bytes written to the network

bandwidth: maximum network bits/s (average over one minute)

total_files: total maximum number of files and directories of all the working directories in the tree

disk: size in MB of all working directories in the tree

>>> print(t.resources_measured.memory)

def work_queue.Task.limits_exceeded (   self)

Get the resources the task exceeded.

For valid fields see resources_measured.

def work_queue.Task.resources_requested (   self)

Get the resources the task requested to run.

For valid fields see resources_measured.

def work_queue.Task.resources_allocated (   self)

Get the resources allocated to the task in its latest attempt.

For valid fields see resources_measured.
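
Because the measured, requested, and allocated resources share the same field names, they can be compared directly. The sketch below recomputes cores_avg from its stated definition (cpu_time/wall_time) and reports how much allocated memory went unused. It assumes cpu_time and wall_time use the same time unit, which this page does not state for cpu_time. The helper names are hypothetical and SimpleNamespace objects stand in for the resource summaries.

```python
from types import SimpleNamespace


def average_cores(measured):
    """cores_avg as defined above: cpu_time / wall_time.

    Assumption: both fields use the same time unit.
    """
    if measured.wall_time <= 0:
        return 0.0
    return measured.cpu_time / measured.wall_time


def memory_headroom(measured, allocated):
    """Allocated memory minus peak measured memory, in the fields' units."""
    return allocated.memory - measured.memory


# Stand-ins for t.resources_measured and t.resources_allocated;
# the values are invented.
measured = SimpleNamespace(wall_time=10_000_000, cpu_time=25_000_000, memory=700)
allocated = SimpleNamespace(memory=1024)
print(average_cores(measured))
print(memory_headroom(measured, allocated))
```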

