cctools
resource_monitor Namespace Reference

Python resource_monitor bindings.

Data Structures

class  ResourceExhaustion
 Exception raised when a function exceeds its resource limits.
 
class  ResourceInternalError
 
class  Categories
 Class to encapsulate all the categories in a workflow.
 
class  Category
 

Functions

def monitored
 Create a monitored version of a function.
 

Detailed Description

Python resource_monitor bindings.

The objects and methods provided by this package correspond to the native C API in category.h, rmonitor_poll.h, and rmsummary.h.

The SWIG-based Python bindings provide a higher-level interface that revolves around the monitored function/decorator and the classes listed under Data Structures above.

Function Documentation

def resource_monitor.monitored(limits = None, callback = None, interval = 1, return_resources = True)

Create a monitored version of a function.

It can be used as a decorator, or called by itself.
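
The two calling forms follow from the decorator-factory pattern. The sketch below is not the library's implementation: `monitored_sketch` is a hypothetical stand-in that measures only wall time with the standard library, and it exists purely to show how one callable can serve both as a decorator and as a plain wrapper.

```python
import functools
import time


def monitored_sketch(return_resources=True):
    """Hypothetical stand-in for a decorator factory; measures wall time only."""
    def decorate(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = function(*args, **kwargs)
            resources = {"wall_time": time.perf_counter() - start}
            if return_resources:
                # Original result paired with the final measurements.
                return result, resources
            return result
        return wrapper
    return decorate


# Form 1: used as a decorator (note the call parentheses).
@monitored_sketch()
def my_sum(lst):
    return sum(lst)


# Form 2: called by itself to wrap an already defined function.
plain_sum = monitored_sketch(return_resources=False)(sum)

result, resources = my_sum([1, 2, 3])
```

In both forms the outer call fixes the options and returns the function that does the actual wrapping, which is why the decorator form needs the parentheses even when no options are given.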

Parameters
limits: Dictionary of resource limits to set. Available limits are:
  • wall_time: time spent during execution (seconds)
  • cpu_time: user + system time of the execution (seconds)
  • cores: peak number of cores used
  • cores_avg: number of cores computed as cpu_time/wall_time
  • max_concurrent_processes: the maximum number of processes running concurrently
  • total_processes: count of all of the processes created
  • virtual_memory: maximum virtual memory across all processes (megabytes)
  • memory: maximum resident size across all processes (megabytes)
  • swap_memory: maximum swap usage across all processes (megabytes)
  • bytes_read: number of bytes read from disk
  • bytes_written: number of bytes written to disk
  • bytes_received: number of bytes read from the network
  • bytes_sent: number of bytes written to the network
  • bandwidth: maximum network bits/s (average over one minute)
  • total_files: maximum total number of files and directories across all the working directories in the tree
  • disk: size of all working directories in the tree (megabytes)
callback: Function to call every time a measurement is taken. It receives the following arguments:
  • id: Unique identifier for the function and its arguments.
  • name: Name of the original function.
  • step: Measurement step. It is -1 for the last measurement taken.
  • resources: Current resources measured.
interval: Maximum time in seconds between measurements.
return_resources: Whether to change the return value of the function into a tuple of the original result and a dictionary with the final measurements.
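
Because the limits are passed as a plain dictionary, a misspelled key is easy to miss. The sketch below shows one way to build a limits dictionary in the documented units and check its keys before use. `KNOWN_LIMITS` and `check_limits` are hypothetical helpers, not part of the bindings, and whether the library itself rejects unknown keys is not stated here.

```python
# Names copied from the documented list of available limits.
KNOWN_LIMITS = {
    "wall_time", "cpu_time", "cores", "cores_avg",
    "max_concurrent_processes", "total_processes",
    "virtual_memory", "memory", "swap_memory",
    "bytes_read", "bytes_written", "bytes_received", "bytes_sent",
    "bandwidth", "total_files", "disk",
}


def check_limits(limits):
    """Return limits unchanged, or raise ValueError naming unknown keys."""
    unknown = sorted(set(limits) - KNOWN_LIMITS)
    if unknown:
        raise ValueError("unknown resource limits: " + ", ".join(unknown))
    return limits


limits = check_limits({
    "wall_time": 10 * 60,  # seconds
    "memory": 2 * 1024,    # megabytes
    "disk": 512,           # megabytes
})
```

The checked dictionary can then be passed as the limits argument.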
from resource_monitor import monitored, ResourceExhaustion

# Decorating a function:
@monitored()
def my_sum_a(lst):
    return sum(lst)

# Print the memory used at every measurement and return only the original result:
@monitored(return_resources = False, callback = lambda id, name, step, res: print('memory used', res['memory']))
def my_sum_b(lst):
    return sum(lst)

>>> (result_a, resources) = my_sum_a([1,2,3])
>>> print(result_a, resources['memory'])
6 66

>>> result_b = my_sum_b([1,2,3])
memory used 66

>>> assert(result_a == result_b)


# Wrapping the already defined function 'sum', adding limits:
my_sum_monitored = monitored(limits = {'memory': 1024})(sum)
try:
    # original_result = sum(...)
    (original_result, resources_used) = my_sum_monitored(...)
except ResourceExhaustion as e:
    print(e)
    ...

# Defining a function with a callback and a decorator.
# In this example, we record the time series of resources used:
import multiprocessing
import queue
results_series = multiprocessing.Queue()

def my_callback(id, name, step, resources):
    results_series.put((step, resources))

@monitored(callback = my_callback, return_resources = False)
def my_function(...):
    ...

result = my_function(...)

# print the time series
while not results_series.empty():
    try:
        step, resources = results_series.get(False)
        print(step, resources)
    except queue.Empty:
        # multiprocessing.Queue.get raises queue.Empty when no item is available
        pass
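
Once a time series has been recorded, it usually needs to be summarized. The sketch below drains a queue of (step, resources) pairs and extracts the peak of one resource and the final measurement, which is marked by step -1. `drain` and `peak` are hypothetical helpers, and the measurements are hand-made values standing in for what a callback would record. A `queue.Queue` keeps the sketch self-contained; `multiprocessing.Queue.get` raises the same `queue.Empty` exception, so the helper should apply there as well.

```python
import queue


def drain(q, timeout=0.1):
    """Collect items from q until none arrives within timeout seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


def peak(series, name):
    """Largest value of one resource over a list of (step, resources) pairs."""
    return max(resources[name] for _, resources in series)


# Hand-made measurements (illustrative values only).
q = queue.Queue()
for step, memory in [(0, 40), (1, 66), (-1, 52)]:
    q.put((step, {"memory": memory}))

series = drain(q)
# The last measurement taken is reported with step == -1.
final = [resources for step, resources in series if step == -1]
```

Waiting with a short timeout instead of testing empty() first avoids missing items that have been put on a multiprocessing queue but are not yet visible to the reader.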