pyslurmtq package
Submodules
pyslurmtq.SLURMTaskQueue module
SLURMTaskQueue
Defines classes for executing a collection of tasks in a single SLURM job. A task is defined as a command to be run in parallel using a given number of SLURM tasks, as would be run with ibrun -n.
- class pyslurmtq.SLURMTaskQueue.SLURMTaskQueue(tasks: List[dict] | None = None, task_file: str | None = None, workdir: str | None = None, task_max_runtime: float = 10000000000.0, max_runtime: float = 10000000000.0, delay: float = 1, loglevel: int = 10, summary_interval: float = 60)
Bases: object
Implements a Task Queue of :class:Task objects to be executed in parallel on available compute nodes according to the SLURM environment variables.
- task_slots
List of task slots available. This is parsed upon initialization from the SLURM environment variables SLURM_JOB_NODELIST and SLURM_TASKS_PER_NODE.
- Type:
List(:class:Slot)
- workdir
Path to the directory in which to store files for executed tasks, if the tasks themselves don't specify their own work directories. Defaults to a directory with the prefix .stq-job{SLURM_JOB_ID}- in the current working directory.
- Type:
str
- delay
Number of seconds to pause between iterations of updating the queue. Default is 1 second. Note that this affects the poll rate of tasks running in the queue.
- Type:
float
- task_max_runtime
Maximum run time, in seconds, of any individual task in the queue.
- Type:
float
- task_count
Running counter, starting from 0, of the total number of tasks that pass through the queue. The current count is used as the task_id of the next task added to the queue, so a task's task_id corresponds to the order in which it was added.
- Type:
int
- queue
List of :class:Task objects in the queue. Populated via the enqueue() and enqueue_from_json() methods.
- Type:
List(:class:Task)
- running
List of :class:Task objects that are currently running.
- Type:
List(:class:Task)
- completed
List of :class:Task objects that completed successfully, i.e. the process executing them returned a 0 exit code.
- Type:
List(:class:Task)
- errored
List of :class:Task objects that failed, i.e. the process executing them returned a non-zero exit code.
- Type:
List(:class:Task)
- timed_out
List of :class:Task objects that failed because their runtime exceeded task_max_runtime.
- Type:
List(:class:Task)
- invalid
List of :class:Task objects that were not run because their configurations were invalid or the resources they required were too large.
- Type:
List(:class:Task)
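The invalid list can be illustrated with a minimal sketch of the kind of check a queue might run before scheduling. It assumes a task is invalid when it lacks a cmnd field or asks for more cores than the job has task slots; the function name split_valid_invalid and these exact rules are hypothetical, not the package's implementation.

```python
from typing import Dict, List, Tuple


def split_valid_invalid(
    tasks: List[Dict], total_slots: int, default_cores: int = 1
) -> Tuple[List[Dict], List[Dict]]:
    """Hypothetical pre-scheduling check: separate runnable tasks from invalid ones."""
    valid: List[Dict] = []
    invalid: List[Dict] = []
    for task in tasks:
        cores = task.get("cores", default_cores)
        # A task needs a command and must fit within the slots the job owns.
        if not task.get("cmnd") or not isinstance(cores, int) or cores < 1 or cores > total_slots:
            invalid.append(task)
        else:
            valid.append(task)
    return valid, invalid


tasks = [
    {"cmnd": "./sim --seed 1", "cores": 2},
    {"cores": 2},                              # missing 'cmnd'
    {"cmnd": "./sim --seed 2", "cores": 64},   # larger than the whole job
]
ok, bad = split_valid_invalid(tasks, total_slots=5)
print(len(ok), len(bad))  # 1 2
```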
- enqueue(task_list: List[dict], cores: int = 1)
Add a list of tasks to the queue. Each task is a dictionary that must contain at least a cmnd field, the command to execute in parallel on the task's number of cores. If a task does not specify its own cores value, the cores argument is used.
- Parameters:
task_list (List[dict]) – List of dictionaries, one per task, with the following fields:
- 'cmnd': required, parallel command to execute.
- 'cores': optional, number of cores to use for this task.
- 'pre_process': optional, serial command to run before 'cmnd'.
- 'post_process': optional, serial command to run after 'cmnd'.
cores (int) – Default number of cores to use for each task if not specified within the task configuration.
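A minimal sketch of the bookkeeping that enqueue() describes: a task that omits cores receives the default passed to the call, and a running counter (mirroring task_count) hands out sequential task ids in insertion order. The class MiniQueue and its fields are hypothetical and only illustrate the documented behavior.

```python
from typing import Any, Dict, List


class MiniQueue:
    """Hypothetical stand-in for the bookkeeping done by enqueue()."""

    def __init__(self) -> None:
        self.task_count = 0                     # task_id handed to the next task added
        self.queue: List[Dict[str, Any]] = []   # normalized task configurations

    def enqueue(self, task_list: List[Dict[str, Any]], cores: int = 1) -> None:
        for cfg in task_list:
            # Validation (e.g. a missing 'cmnd') is omitted here for brevity.
            self.queue.append({
                "task_id": self.task_count,
                "cmnd": cfg["cmnd"],
                # A per-task 'cores' value wins; otherwise use the call's default.
                "cores": cfg.get("cores", cores),
                "pre_process": cfg.get("pre_process"),
                "post_process": cfg.get("post_process"),
            })
            self.task_count += 1


q = MiniQueue()
q.enqueue([{"cmnd": "./a.out"}, {"cmnd": "./b.out", "cores": 4}], cores=2)
print([(t["task_id"], t["cores"]) for t in q.queue])  # [(0, 2), (1, 4)]
```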
- enqueue_from_json(filename, cores=1)
Add a list of tasks to the queue from a JSON file. The file must contain a list of task configurations, each containing at least a cmnd field, the command to execute in parallel on the task's number of cores. If a task does not specify its own cores value, the cores argument is used.
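A task file for enqueue_from_json() is a JSON list of task configurations. The sketch below writes one with Python's standard json module and reads it back; the file name tasks.json and the commands are placeholders, and the queue calls appear only as a comment because the queue reads SLURM environment variables when it is initialized.

```python
import json
import os
import tempfile

# Placeholder commands; 'cores' is optional and falls back to the default.
tasks = [
    {"cmnd": "./simulate --input run0.in", "cores": 4},
    {"cmnd": "./simulate --input run1.in", "cores": 4,
     "pre_process": "mkdir -p out1", "post_process": "gzip out1/result.dat"},
    {"cmnd": "./postprocess --all"},
]

path = os.path.join(tempfile.mkdtemp(), "tasks.json")
with open(path, "w") as fh:
    json.dump(tasks, fh, indent=2)

with open(path) as fh:
    loaded = json.load(fh)

# Inside a SLURM job one would then do something like:
#   tq = SLURMTaskQueue(); tq.enqueue_from_json(path, cores=1); tq.run()
print(len(loaded), loaded[2].get("cores", 1))  # 3 1
```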
- get_log(fields: List[str] = ['asctime', 'levelname', 'message'], search: str | None = None, match: str | None = None, print_log: bool = True) → List[Dict[str, str | int | float]]
Get and optionally print log entries.
- Parameters:
fields (List[str], optional) – List of fields to include for each log entry. Defaults to [“asctime”, “levelname”, “message”].
search (str, optional) – String to search for in the log entries. Defaults to None.
match (str, optional) – Regular expression to match against the search string. Defaults to None.
print_log (bool, optional) – Whether to print the log to the console. Defaults to True.
- Returns:
List of dictionaries containing the log information for each entry.
- Return type:
List[Dict[str, Union[str, int, float]]]
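The entries returned by get_log() are keyed by fields such as asctime, levelname and message, which are standard logging.LogRecord attributes. One way to collect such entries, sketched here as an assumption rather than the package's actual mechanism, is a logging.Handler that stores each formatted record as a dict. DictListHandler and select_entries are hypothetical names, and in this sketch search is taken to name a field.

```python
import logging
import re
from typing import Dict, List, Optional, Sequence


class DictListHandler(logging.Handler):
    """Hypothetical handler that keeps every emitted record as a dict."""

    def __init__(self) -> None:
        super().__init__()
        self.entries: List[Dict[str, object]] = []
        # Formatting a record sets record.asctime and record.message as a side effect.
        self.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))

    def emit(self, record: logging.LogRecord) -> None:
        self.format(record)
        self.entries.append(dict(vars(record)))


def select_entries(
    entries: List[Dict[str, object]],
    fields: Sequence[str] = ("asctime", "levelname", "message"),
    search: Optional[str] = None,
    match: str = ".",
) -> List[Dict[str, object]]:
    """Keep only `fields`; if `search` is given, keep entries whose value there matches `match`."""
    out = []
    for entry in entries:
        if search is not None and not re.search(match, str(entry.get(search, ""))):
            continue
        out.append({f: entry.get(f) for f in fields})
    return out


logger = logging.getLogger("stq-demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo out of any root handlers
handler = DictListHandler()
logger.addHandler(handler)
logger.info("task 0 started")
logger.error("task 1 failed")
print(select_entries(handler.entries, ["levelname", "message"], search="levelname", match="ERROR"))
# [{'levelname': 'ERROR', 'message': 'task 1 failed'}]
```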
- run()
Run tasks and wait for all tasks in the queue to complete, or until max_runtime is exceeded.
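The polling behavior implied by delay, task_max_runtime and max_runtime can be sketched with the standard subprocess module: start each command, poll it with Popen.poll() every delay seconds, kill tasks that exceed the per-task limit, and stop waiting once the overall limit passes. This is a simplification under stated assumptions (no slot scheduling, every command launched at once, a hypothetical function run_all), not the queue's actual run() implementation.

```python
import subprocess
import sys
import time
from typing import Dict, List, Tuple


def run_all(
    commands: List[List[str]],
    delay: float = 0.1,
    task_max_runtime: float = 1e10,
    max_runtime: float = 1e10,
) -> Dict[str, List[int]]:
    """Hypothetical sketch: run commands concurrently and sort them by outcome."""
    status: Dict[str, List[int]] = {"completed": [], "errored": [], "timed_out": [], "unfinished": []}
    start = time.monotonic()
    running: Dict[int, Tuple[subprocess.Popen, float]] = {
        i: (subprocess.Popen(cmd), time.monotonic()) for i, cmd in enumerate(commands)
    }

    while running and time.monotonic() - start < max_runtime:
        for i, (proc, t0) in list(running.items()):
            rc = proc.poll()  # None while the process is still running
            if rc is None:
                if time.monotonic() - t0 > task_max_runtime:
                    proc.kill()  # per-task limit exceeded
                    proc.wait()
                    status["timed_out"].append(i)
                    del running[i]
                continue
            status["completed" if rc == 0 else "errored"].append(i)
            del running[i]
        if running:
            time.sleep(delay)  # poll interval, playing the role of the queue's delay

    # Overall limit reached: stop whatever is left so no stray processes remain.
    for i, (proc, _) in running.items():
        proc.kill()
        proc.wait()
        status["unfinished"].append(i)
    return status


py = sys.executable
result = run_all(
    [[py, "-c", "pass"], [py, "-c", "raise SystemExit(3)"], [py, "-c", "import time; time.sleep(30)"]],
    delay=0.05,
    task_max_runtime=3.0,
)
print(result)
```

Killing leftovers when max_runtime expires is a choice made for the sketch; the documentation above only says that run() stops waiting.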
- summary_by_slot(fields: List[str] = ['idx', 'host', 'status', 'num_tasks', 'task_ids', 'free_time', 'busy_time'], search: str | None = None, match: str = '.', all_fields: bool = False, print_res: bool = True, fname: str | None = None) → List[Dict[str, int | str | List[int] | float]]
Summarize queue stats by slots.
- Parameters:
fields (List[str], optional) – List of fields to include in the summary. Defaults to [“idx”, “host”, “status”, “num_tasks”, “task_ids”, “free_time”, “busy_time”].
search (str, optional) – Name of the field to search in the summary. Defaults to None.
match (str, optional) – Regular expression to match against the values of the search field. Defaults to “.”.
all_fields (bool, optional) – Whether to include all available fields in the summary. Defaults to False.
print_res (bool, optional) – Whether to print the summary to the console. Defaults to True.
fname (str, optional) – File name to write the summary to. Defaults to None.
- Returns:
List of dictionaries containing the summary information for each slot.
- Return type:
List[Dict[str, Union[int, str, List[int], float]]]
- summary_by_task(fields: List[str] = ['task_id', 'running_time', 'cores', 'command'], search: str | None = None, match: str = '.', all_fields: bool = False, print_res: bool = True, fname: str | None = None) → List[Dict[str, str | int | float]]
Summarize queue stats by task.
- Parameters:
fields (List[str], optional) – List of fields to include in the summary. Defaults to [“task_id”, “running_time”, “cores”, “command”].
search (str, optional) – Name of the field to search in the summary. Defaults to None.
match (str, optional) – Regular expression to match against the values of the search field. Defaults to “.”.
all_fields (bool, optional) – Whether to include all available fields in the summary. Defaults to False.
print_res (bool, optional) – Whether to print the summary to the console. Defaults to True.
fname (str, optional) – File name to write the summary to. Defaults to None.
- Returns:
List of dictionaries containing the summary information for each task.
- Return type:
List[Dict[str, Union[str, int, float]]]
pyslurmtq.Slot module
Slot
Class defining the abstract unit of job execution within a SLURM job.
- class pyslurmtq.Slot.Slot(host, idx)
Bases: object
Combination of (host, idx) that can run a SLURM task. A slot can either have a task associated with it or be free. host is the name of the host that can execute the task, as listed in the SLURM environment variable SLURM_JOB_NODELIST or, locally on the host, in SLURM_NODENAME. idx is the index, in the full list of task slots available to the SLURM job, that the slot corresponds to. For example, if a job is run with 3 nodes and 5 total tasks (-N 3 -n 5), the SLURM execution environment will look something like:
SLURM_JOB_NODELIST=c303-[005-006],c304-005
SLURM_TASKS_PER_NODE=2(x2),1
In this scenario, host c303-005 would have slot idxs 1 and 2, c303-006 would have slots 3 and 4, and host c304-005 would have only slot 5. Note that these task slots do not correspond to the number of CPUs available per host, which can vary depending on the cluster being used.
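The mapping from the two environment variables to slots can be sketched as below. This is a simplified parser written for illustration: it only handles the plain and single-bracket forms shown in the example (a production tool would more likely call scontrol show hostnames), and it numbers slots from 1 to match the example above. The function names are hypothetical.

```python
import re
from typing import List, Tuple


def _split_top_level(spec: str) -> List[str]:
    """Split on commas that are not inside square brackets."""
    parts, depth, cur = [], 0, ""
    for ch in spec:
        depth += (ch == "[") - (ch == "]")
        if ch == "," and depth == 0:
            parts.append(cur)
            cur = ""
        else:
            cur += ch
    if cur:
        parts.append(cur)
    return parts


def expand_nodelist(nodelist: str) -> List[str]:
    """Expand e.g. 'c303-[005-006],c304-005' (one bracket group per item)."""
    hosts: List[str] = []
    for item in _split_top_level(nodelist):
        m = re.fullmatch(r"(.*)\[(.*)\](.*)", item)
        if m is None:
            hosts.append(item)
            continue
        prefix, body, suffix = m.groups()
        for rng in body.split(","):
            if "-" in rng:
                lo, hi = rng.split("-", 1)
                width = len(lo)  # keep zero padding such as '005'
                hosts.extend(f"{prefix}{n:0{width}d}{suffix}" for n in range(int(lo), int(hi) + 1))
            else:
                hosts.append(prefix + rng + suffix)
    return hosts


def expand_tasks_per_node(spec: str) -> List[int]:
    """Expand e.g. '2(x2),1' into [2, 2, 1]."""
    counts: List[int] = []
    for item in spec.split(","):
        m = re.fullmatch(r"(\d+)(?:\(x(\d+)\))?", item)
        if m is None:
            raise ValueError(f"unrecognized SLURM_TASKS_PER_NODE entry: {item!r}")
        counts.extend([int(m.group(1))] * int(m.group(2) or 1))
    return counts


def build_slots(nodelist: str, tasks_per_node: str) -> List[Tuple[str, int]]:
    hosts = expand_nodelist(nodelist)
    counts = expand_tasks_per_node(tasks_per_node)
    if len(hosts) != len(counts):
        raise ValueError("node list and tasks-per-node lengths differ")
    slots, idx = [], 1  # numbering follows the example above, starting at 1
    for host, n in zip(hosts, counts):
        for _ in range(n):
            slots.append((host, idx))
            idx += 1
    return slots


print(build_slots("c303-[005-006],c304-005", "2(x2),1"))
# [('c303-005', 1), ('c303-005', 2), ('c303-006', 3), ('c303-006', 4), ('c304-005', 5)]
```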
- host
Name of a compute node on a SLURM execution system. Corresponds to a host listed in the environment variable SLURM_JOB_NODELIST.
- Type:
str
- tasks
List of Task objects that have been executed on this slot. If the slot is currently occupied, the last element in the list is the currently running task.
- Type:
List[Task]
- occupy(task)
Occupy a slot with a task.
- Parameters:
task (:class:Task) – Task object that will occupy the slot.
- Raises:
ValueError – If trying to occupy a slot that is not currently free.
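A minimal sketch of the occupy() contract described above: a slot records every task it has run in tasks, and occupying a slot that is not free raises ValueError. The free flag and the release() method are assumptions added so the example is complete; they are not taken from the package's documented API.

```python
from typing import Any, List


class SketchSlot:
    """Hypothetical slot: one (host, idx) pair running at most one task at a time."""

    def __init__(self, host: str, idx: int) -> None:
        self.host = host
        self.idx = idx
        self.tasks: List[Any] = []  # history; the last element is the current task when busy
        self.free = True

    def occupy(self, task: Any) -> None:
        if not self.free:
            raise ValueError(f"slot {self.idx} on {self.host} is already occupied")
        self.tasks.append(task)
        self.free = False

    def release(self) -> None:
        # Hypothetical: mark the slot free again once its task finishes.
        self.free = True


slot = SketchSlot("c303-005", 1)
slot.occupy("task-0")
try:
    slot.occupy("task-1")
except ValueError as err:
    print(err)  # slot 1 on c303-005 is already occupied
```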
pyslurmtq.Task module
Task Class
Defines the Task class, a single command executed in parallel on a set of SLURM task slots.
- class pyslurmtq.Task.Task(task_id: int, cmnd: str, workdir: str, cores: int = 1, pre: str | None = None, post: str | None = None, cdir: str | None = None)
Bases: object
Command to be executed in parallel using ibrun on a set of SLURM task slots, as designated by :class:SLURMTaskQueue. This class contains the particulars of a task to be executed, including the main command to be run in parallel using ibrun, optional pre/post-process commands to be run serially, and an optional directory to change to before executing the main parallel command. Once appropriate resources for the task have been found and the execute() method is called, the slots attribute is filled with an (offset, extent) pair indicating which contiguous region of the available task slots the running task occupies. The command is then wrapped into a script file stored in workdir, and a :class:subprocess.Popen object is opened to execute the script.
Note that the :class:SLURMTaskQueue handles the initialization and management of task objects, and in general a user has no need to initialize task objects individually.
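Finding the (offset, extent) pair amounts to locating a run of consecutive free slots at least as long as the task's core count. The first-fit search below is a hypothetical sketch of that step; the package may use a different placement policy, and here offset is the 0-based position in the slot list.

```python
from typing import List, Optional, Tuple


def find_contiguous(free: List[bool], cores: int) -> Optional[Tuple[int, int]]:
    """Return (offset, extent) of the first run of `cores` free slots, or None."""
    run_start, run_len = 0, 0
    for i, is_free in enumerate(free):
        if is_free:
            if run_len == 0:
                run_start = i
            run_len += 1
            if run_len == cores:
                return run_start, cores
        else:
            run_len = 0  # a busy slot breaks the current run
    return None


# Slots 0 and 3 are busy, so the first block of two free slots starts at 1.
print(find_contiguous([False, True, True, False, True], 2))  # (1, 2)
print(find_contiguous([False, True, True, False, True], 3))  # None
```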
- cmnd
Main command to be wrapped in ibrun with the appropriate offset/extent parameters for parallel execution.
- Type:
str
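The wrapping of cmnd can be illustrated by generating the text of a shell script. The layout below (optional pre-process, optional cd into cdir, the ibrun line, optional post-process) is an assumption based on the attributes described here. TACC's ibrun documents -n (task count) and -o (offset) for running several launches side by side within one job, but confirm against your site's ibrun. build_script is a hypothetical name.

```python
import shlex
from typing import Optional


def build_script(cmnd: str, offset: int, extent: int,
                 pre: Optional[str] = None, post: Optional[str] = None,
                 cdir: Optional[str] = None) -> str:
    """Hypothetical: shell-script text running `cmnd` on slots [offset, offset + extent)."""
    lines = ["#!/bin/bash"]
    if pre:
        lines.append(pre)                        # serial pre-processing
    if cdir:
        lines.append(f"cd {shlex.quote(cdir)}")  # optional working directory
    lines.append(f"ibrun -o {offset} -n {extent} {cmnd}")  # parallel step
    if post:
        lines.append(post)                       # serial post-processing
    return "\n".join(lines) + "\n"


print(build_script("./sim --seed 7", offset=4, extent=2, cdir="run 7"))
# #!/bin/bash
# cd 'run 7'
# ibrun -o 4 -n 2 ./sim --seed 7
```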
- execfile
Path to the shell script containing the wrapped command, run by the subprocess spawned to execute the SLURM task. Note that this file won't exist until the task is executed.
- Type:
str
- logfile
Path to the file to which stdout of the SLURM task is redirected. Note that this file won't exist until the task is executed.
- Type:
str
- errfile
Path to the file to which stderr of the SLURM task is redirected. Note that this file won't exist until the task is executed.
- Type:
str
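Once an execfile exists, spawning it with stdout and stderr sent to separate files (the roles of logfile and errfile) needs only the standard subprocess module. In this sketch the script is a plain shell script without ibrun so it runs anywhere with /bin/sh; the helper launch and the task-N file names are hypothetical.

```python
import os
import subprocess
import tempfile


def launch(script_text: str, workdir: str, task_id: int) -> subprocess.Popen:
    """Hypothetical: write an exec file and run it with stdout/stderr sent to files."""
    execfile = os.path.join(workdir, f"task-{task_id}.sh")
    logfile = os.path.join(workdir, f"task-{task_id}.out")
    errfile = os.path.join(workdir, f"task-{task_id}.err")
    with open(execfile, "w") as fh:
        fh.write(script_text)
    with open(logfile, "w") as out, open(errfile, "w") as err:
        # The child gets its own copies of these descriptors, so the parent
        # can close its handles as soon as Popen returns.
        return subprocess.Popen(["/bin/sh", execfile], stdout=out, stderr=err, cwd=workdir)


workdir = tempfile.mkdtemp(prefix=".stq-demo-")
proc = launch("echo hello\necho oops 1>&2\nexit 2\n", workdir, task_id=0)
rc = proc.wait()
with open(os.path.join(workdir, "task-0.out")) as fh:
    out_text = fh.read().strip()
with open(os.path.join(workdir, "task-0.err")) as fh:
    err_text = fh.read().strip()
print(rc, out_text, err_text)  # 2 hello oops
```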
- slots
(offset, extent) tuple of SLURM task slots on which the task is being or was executed, or None if the task has not been executed yet.
- Type:
Tuple(int)
- start_ts
Timestamp, in seconds since epoch, when task execution started, or None if the task has not been executed yet.
- Type:
float
- end_ts
Timestamp, in seconds since epoch, when task execution finished, as measured by the first time get_rc() polls the process and returns a non-negative value, or None if the task has not finished yet.
- Type:
float
- running_time
Time, in seconds, that the task has spent executing, or None if the task has not been executed yet.
- Type:
float
pyslurmtq.pyslurmtq module
This is a skeleton file that can serve as a starting point for a Python
console script. To run this script uncomment the following lines in the
[options.entry_points] section in setup.cfg:
console_scripts =
fibonacci = pyslurmqt.skeleton:run
Then run pip install . (or pip install -e . for editable mode)
which will install the command fibonacci inside your current environment.
Besides console scripts, the header (i.e. until _logger…) of this file can
also be used as a template for Python modules.
Note
This file can be renamed depending on your needs or safely removed if not needed.
- pyslurmtq.pyslurmtq.main(args)
Wrapper allowing a SLURMTaskQueue to be initialized and run from an input JSON file containing a list of tasks.
- Parameters:
args (List[str]) – command line parameters as list of strings (for example ["--verbose", "42"]).
- pyslurmtq.pyslurmtq.parse_args(args)
Parse command line parameters.
- Parameters:
args (List[str]) – command line parameters as list of strings (for example ["--help"]).
- Returns:
command line parameters namespace
- Return type:
argparse.Namespace
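For illustration, a parse_args() in this PyScaffold style might look like the sketch below. Only the argparse calls are real; the option names (task_file, --workdir, --cores, -v/--verbose) are hypothetical and not taken from pyslurmtq's actual command line interface.

```python
import argparse
import logging
from typing import List


def parse_args(args: List[str]) -> argparse.Namespace:
    """Hypothetical CLI: a task-file path plus a few optional settings."""
    parser = argparse.ArgumentParser(description="Run a JSON list of tasks in one SLURM job")
    parser.add_argument("task_file", help="JSON file containing a list of task configurations")
    parser.add_argument("--workdir", default=None, help="directory for per-task files")
    parser.add_argument("--cores", type=int, default=1, help="default cores per task")
    parser.add_argument(
        "-v", "--verbose", dest="loglevel", action="store_const",
        const=logging.DEBUG, default=logging.INFO, help="enable debug logging",
    )
    return parser.parse_args(args)


ns = parse_args(["tasks.json", "--cores", "4", "-v"])
print(ns.task_file, ns.cores, ns.loglevel == logging.DEBUG)  # tasks.json 4 True
```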
pyslurmtq.utils module
SLURM Task Queue Utilities
Utility functions for SLURMTaskQueue and supporting classes.
- pyslurmtq.utils.filter_res(res, fields, search=None, match='.', print_res=False, output_file=None)
Print results
Prints the keys listed in fields for each dictionary in res, optionally keeping only the dictionaries whose search column matches the regular expression match.
- Parameters:
res (List[dict]) – List of dictionaries containing the results to filter.
fields (List[str]) – List of strings containing the names of the fields to extract for each element.
search (str, optional) – Name of the column on which to perform pattern matching to filter results.
match (str, default='.') – Regular expression to match strings in the search column.
print_res (bool, default=False) – Whether to print the filtered results.
output_file (str, optional) – Path to file to output result table to.
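The filtering described for filter_res() can be sketched in a few lines: keep the rows whose search column matches the regular expression, project each row onto fields, and optionally print or write the result as a table. This is a hypothetical re-implementation for illustration (named filter_rows, with a simple tab-separated output format and made-up summary rows), not the package's code.

```python
import re
from typing import Any, Dict, List, Optional


def filter_rows(res: List[Dict[str, Any]], fields: List[str], search: Optional[str] = None,
                match: str = ".", print_res: bool = False,
                output_file: Optional[str] = None) -> List[Dict[str, Any]]:
    """Hypothetical sketch: regex-filter rows on one column and keep selected fields."""
    pattern = re.compile(match)
    rows = [
        {f: r.get(f) for f in fields}
        for r in res
        if search is None or pattern.search(str(r.get(search, "")))
    ]
    if print_res or output_file:
        # Tab-separated table: a header line, then one line per kept row.
        table = "\n".join(["\t".join(fields)] +
                          ["\t".join(str(row[f]) for f in fields) for row in rows])
        if print_res:
            print(table)
        if output_file:
            with open(output_file, "w") as fh:
                fh.write(table + "\n")
    return rows


summary = [  # made-up rows for illustration only
    {"task_id": 0, "status": "completed", "cores": 4},
    {"task_id": 1, "status": "errored", "cores": 2},
    {"task_id": 2, "status": "timed_out", "cores": 2},
]
print(filter_rows(summary, ["task_id", "status"], search="status", match="^(errored|timed_out)$"))
# [{'task_id': 1, 'status': 'errored'}, {'task_id': 2, 'status': 'timed_out'}]
```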