The Job Interface

This page describes the Job interface that is used for building a workflow.

Introduction

A Job is a Node in a Directed Acyclic Graph (DAG). It performs an action on the values coming into the Job (the outputs of its parents) and exposes output(s) that other Jobs/Nodes can in turn use as their inputs. In the following, upstream means that values are consumed by the Job and come from its parent job(s), while downstream means that the Job provides outputs to its child jobs (whether consumed or not).

Note

Root Jobs of the DAG operate only on their specified parameters and do not use any upstream inputs.
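
As an illustration, a root Job might look like the following sketch (the class name, its parameter and its output are hypothetical; the base __init__ is assumed to set the listed parameters from the keyword arguments, as described under Runtime and static parameters below):

import os

class CountFilesJob(Job):
    name = 'count_files'
    # hypothetical runtime parameter and cached output
    attributes_to_serialize = ['directory']
    outputs_to_cache = ['number_of_files']

    def run(self, *args, **kwargs):
        # a root Job receives no parent outputs, so args is empty;
        # the output is computed from the parameters alone
        self.number_of_files = len(os.listdir(self.directory))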

Job state and computation caching

Jobs are designed to avoid recomputation: they store their state and their outputs on the file system. For storing the state, the Job class provides functions that ease the definition of what is needed and what should be stored. When a Job is asked to perform its action, it first checks whether the parents need to recompute anything. If not, the parents' state can be retrieved immediately from the stored files, which saves the computation at the cost of loading/saving the state. Likewise, if its parameters have not changed and its parents do not need to recompute their outputs, the current Job may retrieve its own state from the stored file.

All this functionality (loading/storing state, comparing states, checking for outdated parents) is already taken care of by the Job class; only a few fields need to be set up.
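
In practice this makes running the same workflow twice cheap. A sketch, using the HistogramCorrelationJob from the example below (the json_prefix constructor argument is the one mentioned for serialize_state in the reference; the exact signature may differ):

# first run: everything is computed and each Job's state is flushed
# to '<json_prefix>_<name>.json'
job = HistogramCorrelationJob(json_prefix='/tmp/state')
job.process()

# second run with unchanged parameters: the whole chain is up to date,
# so the states are loaded from the JSON files instead of recomputed
job = HistogramCorrelationJob(json_prefix='/tmp/state')
job.process()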

Extension

If the default method for loading the state back from the JSON file needs additional functionality, it is possible to override the Job.load_state function.

Runtime and static parameters

Each Job may have parameters that define its current state. These parameters are flushed to a JSON file together with the outputs of the Job. This allows us to store intermediate computations in a simple manner and to avoid recomputing actions whose parameters did not change.

The base class also initializes all specified parameters, so we only have to pass them by name (see Example Job below for the implementation of an example Job).
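
For instance, a hypothetical Job with one runtime parameter could be declared and instantiated as follows (assuming the base __init__ picks the parameter up from the keyword arguments, as just described):

class ThumbnailJob(Job):
    name = 'thumbnail'
    # 'thumbnail_width' is part of this Job's state: it is initialized
    # from the keyword arguments and flushed to the JSON file
    attributes_to_serialize = ['thumbnail_width']

job = ThumbnailJob(thumbnail_width=320, json_prefix='/tmp/state')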

Job identification

Every job is named using the simple name attribute:

class HistogramCorrelationJob(Job):
    name = 'histogram_correlation'

This name should be unique within the workflow. If the behaviour of a job is needed several times in a workflow (but with different parameters), new child classes of the job to be reused may be defined, each with a different name. This is the case, for instance, for the classes SelectSlide and SelectSpeaker, which refine the behaviour of SelectPolygonJob.
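
A sketch of this pattern (the real classes may define additional parameters):

class SelectSlide(SelectPolygonJob):
    name = 'select_slide'

class SelectSpeaker(SelectPolygonJob):
    name = 'select_speaker'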

Job cache comparison

The main entry point of the cached-value comparison is the method Job.is_up_to_date, which may be overridden in a child class (e.g. to test for file existence, file timestamps, etc.).

This method checks that the parents are up to date (and returns False if they are not) and then calls the function Job.are_states_equal, which may also be overridden.

In the current implementation of the comparison, the values to compare are converted to strings and the resulting strings are compared instead.

Note

If you override one of these member functions, it is always possible to fall back to the default behaviour by calling, for instance:

super(CurrentJobClass, self).is_up_to_date()
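
For example, a Job whose output is a file on disk might override Job.is_up_to_date along these lines (a sketch; the output_file attribute is hypothetical):

import os

def is_up_to_date(self):
    # the internal state cannot tell whether the output file still
    # exists, so check the file system in addition to the default logic
    if not os.path.exists(self.output_file):
        return False
    return super(CurrentJobClass, self).is_up_to_date()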

Job action

Subclasses only need to define their specific parameters, outputs, and parents, and to override the Job.run and Job.get_outputs functions.
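
A minimal subclass therefore has the following shape (a sketch; all names are placeholders):

class MyJob(Job):
    name = 'my_job'
    attributes_to_serialize = ['my_parameter']
    outputs_to_cache = ['my_output']
    parents = [SomeParentJob]

    def run(self, *args, **kwargs):
        # outputs of the parents arrive in the order given by self.parents
        parent_output = args[0]

        # placeholder computation: store the attribute named in
        # outputs_to_cache so that it can be serialized
        self.my_output = parent_output

    def get_outputs(self):
        super(MyJob, self).get_outputs()
        return self.my_output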

Example Job

We are going to use the HistogramCorrelationJob to explain how to build further Jobs.

Job Setup

We first need to set up a Job by subclassing it and setting a name:

class HistogramCorrelationJob(Job):
    name = 'histogram_correlation'

This specific Job has no special parameters; if it had, we would list them in attributes_to_serialize:

attributes_to_serialize = []

Next, we need to specify which outputs are cached in the JSON file (this automatically sets an attribute named histogram_correlations on this Job):

outputs_to_cache = ['histogram_correlations']

The __init__ method only calls the Job’s __init__ method. If there were any special parameters for this Job, we could assert here that they were actually passed:

def __init__(self, *args, **kwargs):
    super(HistogramCorrelationJob, self).__init__(*args, **kwargs)

    # Optional
    # assert('name_of_job_parameter' in kwargs)

The next, optional, step is to override the load_state function. It is responsible for loading the stored state back from the JSON file.

We need to override this function when the JSON storage differs from the format we want to have in Python. For example, JSON stores all keys as strings, and for this specific Job we want to index the stored data by integers and also sort it, so we do:

from ....util.tools import sort_dictionary_by_integer_key

[...]

def load_state(self):
    state = super(HistogramCorrelationJob, self).load_state()

    if state is None:
        return None

    correlations = state['histogram_correlations']
    correlations = sort_dictionary_by_integer_key(correlations)

    state['histogram_correlations'] = correlations

    return state

Defining the Job Action

The action of the Job is defined in the run method, which every Job needs to override.

Parent Inputs

The run method receives its arguments in the same order in which the parents of the Job are specified.

When building a workflow we can for example specify

HistogramCorrelationJob.add_parent(HistogramsLABDiff)
HistogramCorrelationJob.add_parent(NumberOfFilesJob)

or alternatively directly set the parents member of the Job

parents = [HistogramsLABDiff, NumberOfFilesJob]

The first parent returns a function of time and area_name (see HistogramsLABDiff), and the second parent simply returns the number of thumbnails. The run method thus looks as follows:

import cv2

[...]

def run(self, *args, **kwargs):

    # the first parent is the HistogramsLABDiff job
    get_histogram = args[0]

    # the second parent is the NumberOfFilesJob
    number_of_files = args[1]

    # init
    self.histogram_correlations = {}

    previous_slide_histogram = get_histogram('slides', 1)

    for frame_index in range(2, number_of_files):

        slide_histogram = get_histogram('slides', frame_index)

        if previous_slide_histogram is not None:
            # cv2.cv.CV_COMP_CORREL is the OpenCV 2.x name of the
            # correlation method (cv2.HISTCMP_CORREL in OpenCV 3+)
            self.histogram_correlations[frame_index] = \
                cv2.compareHist(slide_histogram, previous_slide_histogram, cv2.cv.CV_COMP_CORREL)

        previous_slide_histogram = slide_histogram

The run method builds up a dictionary whose keys are the frame indices and whose values are the corresponding histogram correlations.

This dictionary is saved to the JSON file as specified by the outputs_to_cache member of HistogramCorrelationJob.
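
The cached file might then contain something like the following (illustrative values; the exact layout of the file is determined by serialize_state):

{
    "histogram_correlations": {
        "2": 0.9731,
        "3": 0.4118
    }
}

Note that the keys are stored as strings, which is exactly why load_state above converts them back to sorted integer keys.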

Job Outputs

Since it is more convenient for other Jobs to operate on a proper function instead of a raw dictionary, we transform the output in the get_outputs method:

def get_outputs(self):
    super(HistogramCorrelationJob, self).get_outputs()

    if self.histogram_correlations is None:
        raise RuntimeError('The Correlations between the histograms have not been computed yet.')

    return Functor(self.histogram_correlations)

The Functor class just wraps the dictionary in a callable object. When this object is called with an index, it returns the value of the index from the dictionary.
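
A minimal sketch of such a wrapper (the project's actual Functor may differ):

class Functor(object):

    def __init__(self, data):
        self.data = data

    def __call__(self, index):
        # calling the object looks the index up in the wrapped dictionary
        return self.data[index]

With this, downstream Jobs can call the output like a function, e.g. correlations(5) returns the correlation stored for frame 5.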

Reference

The Job module

This module provides a Job class that all Actions should subclass.

The Job class takes care of the dependencies between Jobs and only runs computations if needed. This can happen when some parameters of a preceding action change.

class livius.video.processing.job.Job(*args, **kwargs)

This class implements the basic functionality for each Job.

Important

All subclasses must override the run() function. It should compute all attributes mentioned in self.outputs_to_cache.

After the Job is run, the state is serialized to a JSON file.

Subclasses can optionally override the load_state() function, which provides a way to deal with the differences between the JSON storage and the Python objects (e.g. the fact that keys are always stored as unicode strings).

classmethod add_parent(obj)

Add a specific job as a parent job.

Parent Jobs are jobs which the current job is dependent on. They are executed and updated before the current job.

are_states_equal()

Return True if the state of the current object is the same as the one in the serialized JSON dump.

attributes_to_serialize = []

List of attributes that represent the Job’s state.

cache_output()

Set all output attributes to the values loaded from the JSON file.

get_outputs()

Return all the possible outputs of this step.

get_parent_by_name(name)

Search the parent jobs for one with the given name. The search is breadth-first.

get_parent_by_type(t)

Search the parent jobs for one of the given type. The search is breadth-first.

classmethod get_parents()

Return all the parent jobs of this class.

is_output_cached()

Check if all output attributes are present.

is_up_to_date()

Indicate whether this step should be processed again.

This may be overridden by a child Job, for instance when the output is a file whose status cannot be seen through the internal state (example: the output file does not exist, the input parameters make the output file obsolete…)

load_state()

Load the state back from the JSON file.

name = 'root'

Name of the Job (used for identification).

outputs_to_cache = []

List of output attributes that are cached to the JSON file.

parents = None

List of parents.

Important

The order of the parents is important, as it determines the order in which the outputs of the parent Jobs are passed to this Job’s run() method.

process()

Process the current node and all the parent nodes, and provide the outputs of the parents to this node.
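
A typical way to drive a workflow is therefore (a sketch; json_prefix is the constructor argument used by serialize_state()):

# instantiate the terminal Job of the DAG ...
job = HistogramCorrelationJob(json_prefix='/tmp/state')

# ... process it together with all its parent nodes ...
job.process()

# ... and retrieve the outputs
correlations = job.get_outputs()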

run(*args, **kwargs)

Run this Job’s computation/action. Should be overridden by an implementation class.

Parameters: args – Outputs of the parent Jobs. The order of the args is determined by the order of self.parents.

serialize_state()

Flush the state of the runner into the JSON file given by ‘json_prefix’ (passed at construction), to which the name of the current Job is appended in the form ‘json_prefix’_’name’.json. The states of the parents are flushed as well.

classmethod workflow_to_dot()

Returns a dot representation of the current workflow.

This allows plotting the workflow with Graphviz.
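
For instance (a sketch):

with open('workflow.dot', 'w') as f:
    f.write(HistogramCorrelationJob.workflow_to_dot())

# then render it with Graphviz on the command line:
#   dot -Tpng workflow.dot -o workflow.png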

classmethod workflow_to_string(current_index=0)

Returns a string version of the workflow.