4.27. jobqueue — Job queue

This module implements a simple job queue where you queue jobs (such as rendering an animation) which can then be processed by one or more machines. A job is made up of a hierarchy of individual sub-jobs that are processed in a depth-first order (a job is only executed when all of its sub-jobs have executed successfully). Running an individual job means running a particular job procedure which is implemented as a Python object.

The module defines the following function to create a new job queue:

cgkit.jobqueue.createJobQueue(location, keepJobsInRepository=False, useSymLinks=False)

Create and initialize a new job queue directory.

location is a string containing the directory path where the job queue should be put. This must either refer to an empty directory or to a path that does not exist (the parent must exist though).

When jobs are created, they are first stored in a dedicated job repository directory (within the job queue directory). The keepJobsInRepository flag determines whether the jobs will be moved into their respective parent job directory or whether they remain in the repository directory.

useSymLinks determines whether sub-jobs are linked by creating symbolic links on disk or by creating files that contain the target directory. This can only be set to True if the file system actually supports symbolic links (so on Windows this should always be False). The only advantage of using symbolic links is that the job directory is easier to navigate manually. The disadvantage is that the job queue directory as a whole is no longer location independent.

Once the job queue has been created successfully, you can queue jobs using the JobQueue class.
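The constraint on location described above can be checked before createJobQueue() is called. The following is a minimal sketch that uses only the standard library; the helper name is_valid_queue_location is hypothetical and not part of cgkit, and the paths are created in a temporary directory purely for illustration.

```python
import os
import tempfile


def is_valid_queue_location(location):
    """Return True if location is an empty directory, or a path that
    does not exist yet but whose parent directory exists."""
    if os.path.isdir(location):
        return len(os.listdir(location)) == 0
    if os.path.exists(location):
        # Something other than a directory is in the way.
        return False
    parent = os.path.dirname(os.path.abspath(location))
    return os.path.isdir(parent)


# Demo paths (illustration only).
base = tempfile.mkdtemp()                          # empty directory
new_queue = os.path.join(base, "queue")            # missing, parent exists
orphan = os.path.join(base, "missing", "queue")    # parent is missing as well
print(is_valid_queue_location(base),
      is_valid_queue_location(new_queue),
      is_valid_queue_location(orphan))
```

If the check passes, the path could then be handed to cgkit.jobqueue.createJobQueue().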

4.27.1. JobQueue class

class cgkit.jobqueue.JobQueue(location, logger=None)

A job queue contains a list of jobs which can be run by one or more processes. Each job may be composed of sub-jobs which all have to be successfully completed before the job is allowed to be processed. Jobs that are on the same level in the hierarchy are independent and can be run concurrently.

An individual job is represented by a job procedure (JobProc) object that implements the actual functionality of a particular type of job. A job is considered done when the run() method of a job procedure has been executed successfully and the job procedure did not indicate an error.

On disk, the entire job queue is stored as a directory which contains a job directory for each job in the queue. Sub-jobs are sub-directories of the job directories.

location is a string containing the directory where the job queue is stored. The directory should already exist; it is not created. It is also valid to pass None, which is equivalent to an empty queue that cannot receive jobs.

logger can be set to a logger object from the logging module (or an object with the same interface) which will receive log messages.

Note: A job queue object does not store any data about the queue state in memory. This means there is no open-close cycle, and you can discard the job queue object at any time without corrupting the job queue directory. Any number of job queue objects (even from different processes or machines) can be associated with the same job queue directory and submit or run jobs at the same time. Synchronization is entirely disk-based; there is no other communication between job queue objects.
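Disk-based synchronization of this kind generally relies on a file system operation that either succeeds or fails as a whole. The sketch below (Python 3) illustrates the general idea with os.mkdir(). It is not taken from the cgkit sources; the helper name try_claim and the marker name ".claimed" are made up for the illustration.

```python
import os
import tempfile


def try_claim(job_dir, marker=".claimed"):
    """Try to claim job_dir by creating a marker directory inside it.

    os.mkdir() fails if the directory already exists, so only one
    caller can succeed. Returns True for the winner, False otherwise.
    """
    try:
        os.mkdir(os.path.join(job_dir, marker))
    except FileExistsError:
        return False
    return True


job_dir = tempfile.mkdtemp()   # stands in for a job directory
first = try_claim(job_dir)     # no marker yet, so the claim succeeds
second = try_claim(job_dir)    # marker exists, so the claim is refused
print(first, second)
```

Whether this guarantee holds when several machines share a directory depends on the file system that is used.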

JobQueue.location

Return the absolute location of the job queue directory.

Returns None if no directory is set.

JobQueue.createJobRoot(jobType=None, **params)

Create a new top-level job.

jobType is the name of the job procedure that should be created. Any additional keyword arguments are passed to the constructor of the job procedure.

Returns a JobRoot object that represents the newly created job. Sub-jobs can be created by calling createJob() on the returned job root.

After the entire job hierarchy has been created, the activate() method must be called on the job root, otherwise the job will not be processed.
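The submission steps described above (create the root, create the sub-jobs, activate) might look as follows. This is a sketch only: queue is expected to behave like a JobQueue object, and the job type names "renderSequence" and "renderFrame" as well as their keyword arguments are hypothetical.

```python
def submit_render_job(queue, first_frame, last_frame):
    """Queue one top-level job with one sub-job per frame.

    The job type names and keyword arguments used here are
    placeholders for job procedures that would have to exist.
    """
    root = queue.createJobRoot("renderSequence",
                               first=first_frame, last=last_frame)
    for frame in range(first_frame, last_frame + 1):
        # Each frame becomes a sub-job of the root job.
        root.createJob("renderFrame", frame=frame)
    # Without this call the job would not be processed.
    root.activate()
    return root
```

The function only uses createJobRoot(), createJob() and activate(), so any object that provides these methods can be passed in for testing.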

JobQueue.createJob(jobType=None, **params)

This is an alias for createJobRoot().

JobQueue.listJobs()

Return a list of all top-level jobs.

Returns a list of JobHandle objects that represent the roots of all jobs. The jobs are listed in the order in which they would be processed. If the job queue is not associated with a job directory, the return value is an empty list.

JobQueue.deleteJobs(jobs)

Delete one or more jobs.

jobs is either a single JobHandle object or a sequence of JobHandle objects. jobs may also be None in which case the method returns immediately.

The method raises an error (OSError) if it couldn’t delete a job. This will abort the entire operation and some of the input jobs may not even have been processed. Jobs that don’t exist at all anymore are not considered an error and won’t trigger an exception.

Note: The method is meant to be used on top-level jobs. Currently, it can also be used for sub-jobs, but it doesn’t check if deleting a sub-job breaks another sub-job that references a job within the deleted job.
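As an illustration, listJobs() and deleteJobs() can be combined to remove top-level jobs that have finished without an error. This is a minimal sketch; the helper name delete_finished_jobs is not part of cgkit, and queue is expected to behave like a JobQueue object.

```python
def delete_finished_jobs(queue):
    """Delete all top-level jobs that finished without an error.

    Returns the number of jobs that were passed to deleteJobs().
    """
    done = [job for job in queue.listJobs()
            if job.isFinished() and not job.hasError(recursive=True)]
    # deleteJobs() accepts a sequence of JobHandle objects. An OSError
    # raised by it is deliberately left to the caller.
    queue.deleteJobs(done)
    return len(done)
```

The recursive error check visits all sub-directories of a job, so this can be slow for large hierarchies.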

JobQueue.runNextAvailableJob(retries=10)

Run the next available job in the queue that is in a waiting state.

The return value is True if a job could be run or False if there are no more waiting jobs at the moment. When there are waiting jobs but the method fails to allocate one of them, the integer retries determines how many attempts are made before giving up. Once the last attempt has failed, a JobQueueError exception is raised. Some causes for this could be:

  • The process doesn’t have file access permissions for the job directory
  • There were too many other processes that were slightly quicker in getting permission
  • There is a broken job directory somewhere (a job that is in waiting state but can’t be run for some reason, for example because the job procedure is missing)
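A worker process would typically call runNextAvailableJob() in a loop. The following sketch shows one possible loop; the helper name run_worker and the parameters poll_interval and max_idle_polls belong to the sketch, not to cgkit. A JobQueueError raised by the queue is not caught here and would end the loop.

```python
import time


def run_worker(queue, poll_interval=5.0, max_idle_polls=3):
    """Run waiting jobs until the queue stays empty for a while.

    Returns the number of jobs that were run.
    """
    jobs_run = 0
    idle_polls = 0
    while idle_polls < max_idle_polls:
        if queue.runNextAvailableJob(retries=10):
            jobs_run += 1
            idle_polls = 0
        else:
            # No waiting job at the moment. More jobs may become
            # available later, so wait and ask again.
            idle_polls += 1
            time.sleep(poll_interval)
    return jobs_run
```

Several such workers, on one or more machines, could be pointed at the same job queue directory.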

4.27.2. Job class

class cgkit.jobqueue.jobqueue.Job(jobRoot, jobType, **params)

Job object (during creation).

Objects of this class represent jobs that are just being created.

addDependency(job)

Establish a dependency on another job.

This job will only be run when all dependencies have finished successfully.

createJob(jobType, **params)

Create a new sub-job.

jobType is the name of the job procedure that should be created. Any additional keyword arguments are passed to the constructor of the job procedure.

Returns a Job object that represents the newly created job.

This method is equivalent to creating a job object manually and calling addDependency(job).
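The difference between createJob() and addDependency() shows when a job is needed by more than one other job. In the sketch below, two sub-jobs share one preparation job. The helper name, the job type names and the keyword arguments are hypothetical, and root is expected to behave like a Job or JobRoot object.

```python
def add_shared_dependency(root):
    """Create two sub-jobs of root that both depend on one shared job."""
    left = root.createJob("renderFrame", eye="left")
    right = root.createJob("renderFrame", eye="right")
    # createJob() creates the job and makes it a dependency of left.
    prepare = left.createJob("exportScene")
    # The same job is also made a dependency of right.
    right.addDependency(prepare)
    return prepare
```

With this setup, neither of the two "renderFrame" jobs would run before the shared job has finished successfully.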

4.27.3. JobRoot class

class cgkit.jobqueue.jobqueue.JobRoot(jobQueue, jobType, **params)

The root of a job hierarchy.

In principle, this is a job like any other, but because its directory is located directly inside the main job directory, it must remain the root of the job hierarchy. The root is also the only job directory that has a special job repository directory that may store all the sub-jobs.

This class is derived from the Job class.

activate()

Activate the job so that it can be processed.

This is the last step of a job submission. After calling this method, the job hierarchy must not be changed anymore.

Raises a JobQueueError if the graph contains a cycle.
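A cycle means that a job depends, directly or indirectly, on itself, so it could never become runnable. The following sketch shows a generic depth-first cycle check on a plain dictionary. It is not the cgkit implementation; the function name has_cycle and the dictionary layout are assumptions made for the illustration.

```python
def has_cycle(dependencies):
    """Return True if the dependency mapping contains a cycle.

    dependencies maps a job name to the list of job names it depends on.
    """
    UNSEEN, ACTIVE, DONE = 0, 1, 2
    state = {}

    def visit(node):
        state[node] = ACTIVE
        for dep in dependencies.get(node, ()):
            dep_state = state.get(dep, UNSEEN)
            if dep_state == ACTIVE:
                # dep is still being visited, so we came back to it.
                return True
            if dep_state == UNSEEN and visit(dep):
                return True
        state[node] = DONE
        return False

    return any(state.get(node, UNSEEN) == UNSEEN and visit(node)
               for node in list(dependencies))


print(has_cycle({"root": ["a", "b"], "a": ["c"], "b": ["c"], "c": []}))
print(has_cycle({"a": ["b"], "b": ["a"]}))
```

A shared dependency (the first call) is not a cycle; a mutual dependency (the second call) is.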

addDependency(job)

Establish a dependency on another job.

This job will only be run when all dependencies have finished successfully.

createJob(jobType, **params)

Create a new sub-job.

jobType is the name of the job procedure that should be created. Any additional keyword arguments are passed to the constructor of the job procedure.

Returns a Job object that represents the newly created job.

This method is equivalent to creating a job object manually and calling addDependency(job).

4.27.4. JobHandle class

class cgkit.jobqueue.jobqueue.JobHandle(location, rootLocation)

Job object (queued).

Objects of this class represent jobs that have already been queued.

Every job has:

  • A type name
  • An instance name
  • A job description
  • An associated directory

endTime

Return the time the job was finished (in seconds).

Returns None if the job hasn’t been finished yet (or there is a problem with the job directory).

errorMarkerFile

Return the location of the error marker file.

The presence of this file indicates that running the job resulted in an error.

hasError(recursive=False)

Check if this job produced an error.

If recursive is False, only this job is considered, not its child jobs. In this case, the return value is only meaningful when the job is in finished state. If recursive is True, the result will be True if any job in the entire sub-hierarchy has an error. This can be an expensive operation because all sub-directories have to be checked.

hostFile

Return the location of the host file.

The host file contains the name of the host that is/was running the job.

isFinished()

Check if this job is currently in ‘finished’ state.

isRunning()

Check if this job is currently in ‘running’ state.

isWaiting()

Check if this job is currently in ‘waiting’ state.

label

Return a short job label to quickly identify the job.

labelFile

Return the location of the file that contains the job label.

listSubJobs()

Return a list of sub-jobs.

The return value is a list of JobHandle objects.
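Because listSubJobs() returns JobHandle objects again, a job hierarchy can be traversed recursively. A minimal sketch (Python 3); the helper name walk_jobs is not part of cgkit. If a sub-job is listed under more than one job, the sketch would visit it more than once.

```python
def walk_jobs(handle, depth=0):
    """Yield (depth, handle) for handle and all of its sub-jobs."""
    yield depth, handle
    for sub in handle.listSubJobs():
        yield from walk_jobs(sub, depth + 1)
```

Applied to each entry of JobQueue.listJobs(), this could be used to print an indented overview of the queue.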

location

Return the job directory.

number

Return the job number.

pidFile

Return the location of the PID file.

This file contains the PID of the process that is/was running the job.

procTracebackFile

Return the location of the file that contains the job procedure traceback.

progress

Return the progress percentage value as an int.

Returns None if the value couldn’t be obtained for some reason.

progressFile

Return the location of the progress indicator file.

setError()

Set the error flag.

This method may only be called by the process that is currently running the job (the .running directory must exist).

startTime

Return the time the job was started (in seconds).

Returns None if the job hasn’t been started yet (or there is a problem with the job directory).

statusLine

Return a string containing the current status line for the GUI.

The status line indicates what a running job is currently doing.

statusLineFile

Return the location of the status line file.

stderrFile

Return the location of the file that contains stderr.

stdoutFile

Return the location of the file that contains stdout.

submitTime

Return the submission time (in seconds).

May return None if the job doesn’t exist or is broken.
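The attributes and methods above are sufficient to build a one-line status report for a job. The sketch below assumes that startTime and endTime use the same time base, so that their difference is the elapsed time in seconds. The helper names job_state and job_summary are not part of cgkit.

```python
def job_state(handle):
    """Return the state of a JobHandle-like object as a string."""
    if handle.isFinished():
        return "finished"
    if handle.isRunning():
        return "running"
    if handle.isWaiting():
        return "waiting"
    return "unknown"


def job_summary(handle):
    """Return a short, human-readable status line for a job."""
    state = job_state(handle)
    if state == "finished" and handle.hasError():
        state = "failed"
    # progress, startTime and endTime may all be None.
    progress = handle.progress
    percent = "n/a" if progress is None else "%d%%" % progress
    start, end = handle.startTime, handle.endTime
    if start is None or end is None:
        elapsed = "n/a"
    else:
        elapsed = "%.1fs" % (end - start)
    return "%s: %s, progress %s, elapsed %s" % (
        handle.label, state, percent, elapsed)
```

The error flag is only queried for finished jobs, since the non-recursive result of hasError() is only meaningful in that state.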

4.27.5. JobProc class

class cgkit.jobqueue.JobProc(label=None)

This is the base class for all job procedures. Every node in the job graph is associated with a job procedure object that gets run when that particular node is being processed.

label is a string with a short informational message about what the procedure is going to do (this can be displayed in a GUI).

Note: A job procedure object may get instantiated several times per job (e.g. once at submission time to validate parameters and once when the job is actually run), so the constructor shouldn’t do any initialization that must only be done exactly once. This can be done in the postCreate() method instead.

JobProc.runningDir

The directory in which the job is run.

JobProc.run()

Do whatever this job has to do.

When this method is run, the current directory is set to the corresponding .running directory. This method has to be implemented in a derived class. The base method does nothing.

The method can use the execCmd() method to run command line applications. It can call setStatusLine() and setProgress() to report status information to the user and it can call setError() when the job has failed to run successfully.
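A derived job procedure might look like the sketch below. To keep it runnable on its own, the block defines a small stand-in named JobProc that only records calls; in real code the class would derive from cgkit.jobqueue.JobProc instead. The class name RenderFrames, its parameters and the "render" command line are hypothetical.

```python
class JobProc:
    """Stand-in for cgkit.jobqueue.JobProc (records calls only).
    The real class behaves differently."""

    def __init__(self, label=None):
        self.label = label
        self.calls = []

    def execCmd(self, cmd):
        self.calls.append(("execCmd", cmd))
        return 0

    def setStatusLine(self, s):
        self.calls.append(("setStatusLine", s))

    def setProgress(self, value):
        self.calls.append(("setProgress", value))

    def setError(self):
        self.calls.append(("setError",))


class RenderFrames(JobProc):
    """Hypothetical job procedure that renders a range of frames."""

    def __init__(self, first=1, last=1):
        JobProc.__init__(self, label="Render frames %d-%d" % (first, last))
        self.first = first
        self.last = last

    def run(self):
        total = self.last - self.first + 1
        for i, frame in enumerate(range(self.first, self.last + 1)):
            self.setStatusLine("Rendering frame %d" % frame)
            # "render" is a placeholder command line, not a real tool.
            if self.execCmd("render --frame %d" % frame) != 0:
                self.setError()
                return
            self.setProgress(100.0 * (i + 1) / total)
```

The keyword arguments first and last correspond to the additional keyword arguments that are passed to createJob() when the job is queued.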

JobProc.postCreate(jobDir)

This method is called once when the job is created.

jobDir is the directory in which the job will be run (the attribute runningDir will not yet be initialized).

This method can be implemented in a derived class. The base method does nothing.

Note: The method is not executed when the job is actually run, so you can use it to do initializations that must only be run exactly once.
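The split between the constructor and postCreate() can be sketched as follows. The block again uses a minimal stand-in for cgkit.jobqueue.JobProc so that it runs on its own. The class name ExportScene and the file name "scene.txt" are hypothetical, and whether extra files in the job directory are appropriate for a real queue is an assumption.

```python
import os
import tempfile


class JobProc:
    """Stand-in for cgkit.jobqueue.JobProc so the sketch runs on its own."""

    def __init__(self, label=None):
        self.label = label


class ExportScene(JobProc):
    """Hypothetical job procedure that stores its input at creation time."""

    def __init__(self, sceneName="scene"):
        # May be called several times per job: no side effects here.
        JobProc.__init__(self, label="Export %s" % sceneName)
        self.sceneName = sceneName

    def postCreate(self, jobDir):
        # Called once when the job is created: one-time setup goes here.
        with open(os.path.join(jobDir, "scene.txt"), "w") as f:
            f.write(self.sceneName + "\n")


job_dir = tempfile.mkdtemp()   # stands in for the real job directory
ExportScene("teapot").postCreate(job_dir)
```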

JobProc.stdoutCallback(lineNr, line)

This method gets called for every line written to stdout.

When execCmd() is used to run a command line application, then this callback is called for every line written by the application to stdout.

lineNr is the line number and line is the actual contents of the line.

Derived classes may implement this method to scan for errors, status messages or progress reports. The default implementation does nothing.

JobProc.stderrCallback(lineNr, line)

This method gets called for every line written to stderr.

When execCmd() is used to run a command line application, then this callback is called for every line written by the application to stderr.

lineNr is the line number and line is the actual contents of the line.

Derived classes may implement this method to scan for errors, status messages or progress reports. The default implementation does nothing.
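As an illustration of the two callbacks, the sketch below extracts a percentage from stdout lines and flags an error when a stderr line mentions one. It uses a minimal stand-in for cgkit.jobqueue.JobProc so that it runs on its own. The class name, the output format of the application and the error heuristic are assumptions.

```python
import re


class JobProc:
    """Stand-in for cgkit.jobqueue.JobProc so the sketch runs on its own."""

    def __init__(self, label=None):
        self.label = label
        self.progress = None
        self.error = False

    def setProgress(self, value):
        self.progress = max(0, min(100, value))

    def setError(self):
        self.error = True


class RenderWithProgress(JobProc):
    """Hypothetical job procedure that parses the application output."""

    # Matches values such as "45%" or "12.5 %".
    _percent = re.compile(r"(\d+(?:\.\d+)?)\s*%")

    def stdoutCallback(self, lineNr, line):
        match = self._percent.search(line)
        if match:
            self.setProgress(float(match.group(1)))

    def stderrCallback(self, lineNr, line):
        # Simple heuristic: any line mentioning "error" marks a failure.
        if "error" in line.lower():
            self.setError()


proc = RenderWithProgress()
proc.stdoutCallback(1, "Rendering... 45% done")
proc.stderrCallback(1, "ERROR: out of memory")
```

After these two calls, the stand-in holds a progress value of 45.0 and has its error flag set.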

JobProc.setProgress(value)

Set the progress percentage value.

value is a number between 0 and 100 (it is clamped) which indicates the progress the job has made.

JobProc.setStatusLine(s)

Set a status line string.

s is a string with a short message that reflects the current status of the job.

JobProc.setError()

Set the job’s error flag.

JobProc.execCmd(cmd)

Execute a command line string.

cmd is a string containing a command that will be executed. The command’s stdout and stderr output is captured into the job’s stdout and stderr streams. The return value is the return code of the command.

Derived classes can implement stdoutCallback() and stderrCallback() which get called for every line.