pyslurmtq package

Submodules

pyslurmtq.SLURMTaskQueue module

SLURMTaskQueue

Defines classes for executing a collection of tasks in a single SLURM job. A task is defined as a command to be run in parallel using a given number of SLURM tasks, as would be run with ibrun -n.

class pyslurmtq.SLURMTaskQueue.SLURMTaskQueue(tasks: List[dict] | None = None, task_file: str | None = None, workdir: str | None = None, task_max_runtime: float = 10000000000.0, max_runtime: float = 10000000000.0, delay: float = 1, loglevel: int = 10, summary_interval: float = 60)[source]

Bases: object

Implements a Task Queue of :class:Task objects to be executed in parallel on available compute nodes according to the SLURM environment variables.

task_slots

List of task slots available. This is parsed upon initialization from SLURM environment variables SLURM_JOB_NODELIST and SLURM_TASKS_PER_NODE.

Type:

List(:class:Slot)

workdir

Path to directory to store files for tasks executed, if the tasks themselves dont specify their own work directories. Defaults to a directory with the prefix .stq-job{SLURM_JOB_ID}- in the current working directory.

Type:

str

delay

Number of seconds to pause between iterations of updating the queue. Default is 1 second. Note this affects the poll rate of tasks runing in the queue.

Type:

float

task_max_runtime

Max run time, in seconds, any individual task in the queue can run for.

Type:

float

max_runtime

Max run time, in seconds, for execution of run() to empty the queue.

Type:

float

task_count

Running counter, starting from 0, of total tasks that pass through the queue. The current count is used for the task_id of the next task added to the queue, so that a tasks task_id corresponds to the order in which it was added to the queue.

Type:

int

running_time

Total running time of the queue when run() is executed.

Type:

float

queue

List of :class:Task in queue. Populated via the enqueue_from_json() method.

Type:

List(:class:Task)

running

List of :class:Task that are currently running.

Type:

List(:clas:Task)

completed

List of :class:Task that are completed running successfully, in that the process executing them returned a 0 exit code.

Type:

List(:clas:Task)

errored

List of :class:Task that failed to run successfully in that the processes executing them returned a non-zero exit code..

Type:

List(:clas:Task)

timed_out

List of :class:Task that failed to run successfully in that the their runtime exceeded task_max_runtime.

Type:

List(:clas:Task)

invalid

List of :class:Task that were not run because their configurations were invalid, or the amount of resources required to run them was too large.

Type:

List(:clas:Task)

cleanup()[source]

Clean-up Task Queue by removing workdir

enqueue(task_list: List[dict], cores: int = 1)[source]

Add a list of tasks to the queue. Each task is a dictionary with at mininum each containing a cmnd field indicating the command to be executed in parallel using a corresponding number of cores, which defaults to the passed in value if not specified per task configuration.

Parameters:
  • task_list (List[dict]) –

    List of dictionaries, one per task with the following fields:

    ’cmnd’ : required, parllalel command to execute ‘cores’ : optional, number of cores to user on this task ‘pre_process’ : optional, serial command to run prior to running ‘cmnd’ ‘post_process’ : optional, serial command to run after running ‘cmnd’

  • cores (int) – Default number of cores to use for each task if not specified within task configuration.

enqueue_from_json(filename, cores=1)[source]

Add a list of tasks to the queue from a JSON file. The json file must contain a list of configurations, with at mininum each containing a cmnd field indicating the command to be executed in parallel using a corresponding number of cores, which defaults to the passed in value if not specified per task configuration.

Parameters:
  • filename (str) – Path to json files containing list of json configurations, one per task to add to the queue.

  • cores (int) – Default number of cores to use for each task if not specified within task configuration.

get_log(fields: List[str] = ['asctime', 'levelname', 'message'], search: str | None = None, match: str | None = None, print_log: bool = True) List[Dict[str, str | int | float]][source]

Get and optionally print log entries.

Parameters:

fieldsList[str], optional

List of fields to include in the summary. Defaults to [“asctime”, “levelname”, “message”].

searchstr, optional

String to search for in the summary. Defaults to None.

matchstr, optional

Regular expression to match against the search string. Defaults to None.

print_logbool, optional

Whether to print the log to the console. Defaults to True.

Returns:

List[Dict[str, Union[str, int, float]]]

List of dictionaries containing the log information for each entry.

read_log() List[Dict[str, str | int | float]][source]

Read the JSON log file.

Returns:

List of dictionaries containing the log information for each entry.

Return type:

List[Dict[str, Union[str, int, float]]]

run()[source]

Runs tasks and wait for all tasks in queue to complete, or until max_runtime is exceeded.

summary_by_slot(fields: List[str] = ['idx', 'host', 'status', 'num_tasks', 'task_ids', 'free_time', 'busy_time'], search: str | None = None, match: str = '.', all_fields: bool = False, print_res: bool = True, fname: str | None = None) List[Dict[str, int | str | List[int] | float]][source]

Summarize queue stats by slots.

Parameters:
  • fields (List[str], optional) – List of fields to include in the summary. Defaults to [“idx”, “host”, “status”, “num_tasks”, “task_ids”, “free_time”, “busy_time”].

  • search (str, optional) – String to search for in the summary. Defaults to None.

  • match (str, optional) – Regular expression to match against the search string. Defaults to “.”.

  • all_fields (bool, optional) – Whether to include all available fields in the summary. Defaults to False.

  • print_res (bool, optional) – Whether to print the summary to the console. Defaults to True.

  • fname (str, optional) – File name to write the summary to. Defaults to None.

Returns:

List of dictionaries containing the summary information for each slot.

Return type:

List[Dict[str, Union[int, str, List[int], float]]]

summary_by_task(fields: List[str] = ['task_id', 'running_time', 'cores', 'command'], search: str | None = None, match: str = '.', all_fields: bool = False, print_res: bool = True, fname: str | None = None) List[Dict[str, str | int | float]][source]

Summarize queue stats by task.

Parameters:

fieldsList[str], optional

List of fields to include in the summary. Defaults to [“task_id”, “running_time”, “cores”, “command”].

searchstr, optional

String to search for in the summary. Defaults to None.

matchstr, optional

Regular expression to match against the search string. Defaults to “.”.

all_fieldsbool, optional

Whether to include all available fields in the summary. Defaults to False.

print_resbool, optional

Whether to print the summary to the console. Defaults to True.

fnamestr, optional

File name to write the summary to. Defaults to None.

Returns:

List[Dict[str, Union[str, int, float]]]

List of dictionaries containing the summary information for each task.

pyslurmtq.Slot module

Slot

Class defining abstract unit for job execution in a SLURM job.

class pyslurmtq.Slot.Slot(host, idx)[source]

Bases: object

Combination of (host, idx) that can run a SLURM task. A slot can have a task associated with it or be free. host corresponds to the name of the host that can execute the task, as accessible in the SLURM environment variable SLURM_JOB_NODELIST or, locally from the host, in SLURM_NODENAME. idx corresponds to the index in the total task list available to the SLURM job that the slot corresponds to. For example if a job is being run with 3 nodes and 5 total tasks (-N 3 -n 5), then the SLURM execution environment will look something like:

SLURM_JOB_NODELIST=c303-[005-006],c304-005
SLURM_TASKS_PER_NODE=2(x2),1

In this scenario, host cs303-005 would have slot idxs 1 and 2, cs303-006 would have slots 3 and 4 associated with it, and host cs304-005 would have only slot 5 associated with it. Note that these task slots do not corespond to the available CPUs per host available, which can vary depending on the cluster being used.

host

Name of compute node on a SLURM execution system. Corresponds to a host listed in the environment variable SLURM_JOB_NODELIST.

Type:

str

idx

Index of task in total available SLURM task slots. See above for more details.

Type:

int

free

False if slot is being occupied currently by a task, True otherwise.

Type:

bool

tasks

List of Task objects that have been executed on this slot. If the slot is currently occupied, the last element in the list corresponds to the currently running task.

Type:

List[Task]

is_free()[source]

Test whether slot is occupied

occupy(task)[source]

Occupy a slot with a task.

Parameters:

task (:class:Task) – Task object that will occupy the slot.

Raises:

ValueError – If trying to occupy a slot that is not currently free.

release()[source]

Make slot unoccupied.

pyslurmtq.Task module

Task Class

TODO: Description

class pyslurmtq.Task.Task(task_id: int, cmnd: str, workdir: str, cores: int = 1, pre: str | None = None, post: str | None = None, cdir: str | None = None)[source]

Bases: object

Command to be executed in parallel using ibrun on a slot of SLURM tasks as designated by :class:SLURMTaskQueue. This class contains the particulars of a task to be executing, including the main parallel command to be executed in parallel using ibrun, optional pre/post process commands to be executed serially, and an optional directory to change to before executing the main parallel command. Once appropriate resources for the task have been found, and the execute() method is called, the class slots attribute will be filled with an (offset, extent) pair indicating what continuous region of the available task slots is being occupied by the currently running task. The command to be executed is then wrapped into a script file that is stored in workdir and a :class:subprocess.Popen object is opened to execute the script.

Note that the :class:SLURMTaskQueue handles the initialization and management of task objects, and in general a user has no need to initialize task objects individually.

task_id

Unique task ID to assign to this Task.

Type:

int

cmnd

Main command to be wrapped in ibrun with the appropriate offset/extent parameters for parallel execution.

Type:

str

cores

Number of cores, which correspond to SLURM job task numbers, to use for the job.

Type:

int

pre

Command to be executed in serial before the main parallel command.

Type:

str

post

Command to be executed in serial after the main parallel command.

Type:

str

cdir

directory to change to before executing the main parallel command.

Type:

str

workdir

directory to store execution script, along with output and error files.

Type:

str

execfile

Path to shell script containing wrapped command to be run by the subprocess that is spawned to executed the SLURM task. Note this file won’t exist until the task is executed.

Type:

str

logfile

Path to file where stdout of the SLURM task will be redirected to. Note this file won’t exist until the task is executed.

Type:

str

errfile

Path to file where stderr of the SLURM task will be redirected to. Note this file won’t exist until the task is executed.

Type:

str

slots

(offset, extent) tuple in SLURM Task slots where task is currently being/was executed, or None if task has not been executed yet.

Type:

Tuple(int)

start_ts

Timestamp, in seconds since epoch, when task execution started, or None if task has not been executed yet.

Type:

float

end_ts

Timestamp, in seconds since epoch, when task execution finished, as measured by first instance the process is polled using get_rc() with a non-negative response, or None if task has not finished yet.

Type:

float

running_time

Timestamp, in seconds since epoch, when task execution finished, as measured by first instance the process is polled using get_rc() with a non-negative response, or None if task has not finished yet.

Type:

float

execute(offset, extent)[source]

Execute a wrapped command on subprocesses given a task slot range.

Parameters:
  • offset (int) – Offset in list of total available SLURM tasks available. This will determine the -o paraemter to run ibrun with.

  • extent (int) – Extent, or number of slots, to occupy in list of total available SLURM tasks available. This will determine the -n parameter to run ibrun with.

get_rc()[source]

Poll process to see if completed

read_err(lineno=None)[source]

Read Task error file

read_log(lineno=None)[source]

Read Task output file

read_task_file(fname, lineno=None)[source]

Read Task error file

terminate()[source]

Terminate subprocess executing task if it exists.

pyslurmtq.pyslurmtq module

This is a skeleton file that can serve as a starting point for a Python console script. To run this script uncomment the following lines in the [options.entry_points] section in setup.cfg:

console_scripts =
     fibonacci = pyslurmqt.skeleton:run

Then run pip install . (or pip install -e . for editable mode) which will install the command fibonacci inside your current environment.

Besides console scripts, the header (i.e. until _logger…) of this file can also be used as template for Python modules.

Note

This file can be renamed depending on your needs or safely removed if not needed.

References

pyslurmtq.pyslurmtq.main(args)[source]

Wrapper allowing a SLURMTaskQueue to be initialized and run from an input json file containing a list of tasks.

Parameters:

args (List[str]) – command line parameters as list of strings (for example ["--verbose", "42"]).

pyslurmtq.pyslurmtq.parse_args(args)[source]

Parse command line parameters

Parameters:

args (List[str]) – command line parameters as list of strings (for example ["--help"]).

Returns:

command line parameters namespace

Return type:

argparse.Namespace

pyslurmtq.pyslurmtq.run()[source]

Calls main() passing the CLI arguments from sys.argv

This function is the main entry point for the CLI.

pyslurmtq.pyslurmtq.setup_logging(loglevel)[source]

Setup json logging

Parameters:
  • loglevel (int) – minimum loglevel for emitting messages

  • logfile (str) – Path to log file to create for task queue.

pyslurmtq.utils module

SLURM Task Queue Utilities

Utility functions for SLURMTaskQueue and supporting classes.

pyslurmtq.utils.compact_int_list(i, delim=',')[source]

Compacts int lists with ranges.

pyslurmtq.utils.expand_int_list(s)[source]

Expands int lists with ranges.

pyslurmtq.utils.filter_res(res, fields, search=None, match='.', print_res=False, output_file=None)[source]

Print results

Prints dictionary keys in list fields for each dictionary in res, filtering on the search column if specified with regular expression if desired.

Parameters:
  • res (List[dict]) – List of dictionaries containing response of an AgavePy call

  • fields (List[string]) – List of strings containing names of fields to extract for each element.

  • search (string, optional) – String containing column to perform string patter matching on to filter results.

  • match (str, default='.') – Regular expression to match strings in search column.

  • output_file (str, optional) – Path to file to output result table to.

Module contents