API

Phaser

phaser - a library to simplify automated batch-oriented complex data integration pipelines

Pipeline : organizes a data pipeline into multiple phases, each with checkpoints so that the pipeline can be continued from any phase.

Phase : Organizes the work of a complicated pipeline into a smaller unit. It still contains multiple steps and column definitions/fixes, but it can be run standalone from its input data and have its output examined as both a checkpoint and input to the next phase.

PipelineErrorException, DropRowException, WarningException : these exceptions can be used in custom steps to define what should happen when the exception is raised.

row_step, batch_step : These are decorators used to wrap custom steps so that a Phase can call them with the right input data, check their output, and handle their exceptions.

check_unique, sort_by : Built-in steps that can be declared to operate on a column, and included in the steps of a Phase.

Column, IntColumn, DateColumn, DateTimeColumn : Define column instances by name and type, and pass them to the Phase to have datatype casting and name canonicalization done automatically.

Classes

class phaser.Pipeline(working_dir=None, source=None, phases=None, verbose=False, error_policy=None, name='pipeline')

Pipeline handles running phases in order. It also handles I/O and determines which outputs of earlier phases are used as inputs to later phases.

check_extra_outputs(phase)

Check that any extra outputs the phase declared have been added into the context. Throws a PhaserError if any do not exist, as that is a programming error that should be fixed.

check_output_collision()

This checks that the outputs of the pipeline will not overwrite the source file or each other, and that previous copies of the outputs are copied to a previous-run directory.

expected_outputs()

All the files expected to be saved in the pipeline. Right now this has phase output checkpoint files, extra outputs, errors and warnings file and the copy of source used for diffs.

init_source(name, source_path)

Initializes a named source based on the kind of ‘source’ passed in.

Parameters:
  • name – A SavableObject that specifies the name and type of the source

  • source_path – Must be an os.PathLike pointing to a file in CSV format; the file will be read entirely into memory.

classmethod load(source)

The load method can be overridden to apply a pipeline-specific way of loading data. By default, phaser reads data from a CSV or JSON file.

classmethod phase_save_filename(phase)

Because this is a class method, it can be called from the diffing tool, which needs to know the names of the files this pipeline will save.

report_errors_and_warnings(phase_name)

TODO: support different formats and more flexibility. For CLI operation, errors should be reported to the CLI, but for unsupervised operation they should go to logs. Python logging lets users of a library send log messages to more than one place while customizing the desired log level; drop-row messages could be logged at INFO level and warnings at WARNING level, so they fit the standard levels and allow familiar customization.

run()

Nothing should be saved to file during instantiation, in case Pipeline is instantiated for another reason such as inspection. Thus, we do file cleanup/setup only at the start of ‘run’.

save(results, destination)

This method saves the result of the Phase operating on the batch, in phaser’s preferred format. It should be easy to override this method to save in a different way, using pandas’ to_csv, to_excel, to_json or a different output entirely.
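
As a rough illustration of the kind of logic an overridden save might contain, the sketch below writes record-oriented results to CSV using Python's standard csv module instead of pandas. It assumes results is a list of dicts sharing the same keys; save_records_csv is a hypothetical helper, not part of phaser's API.

```python
import csv


def save_records_csv(results, destination):
    """Hypothetical helper (not phaser's API): write a list of dicts to CSV.

    Assumes every record has the same keys; the first record's keys
    determine the column order of the header row.
    """
    if not results:
        # Nothing to write; still create the file so the destination exists.
        open(destination, "w", newline="").close()
        return
    fieldnames = list(results[0].keys())
    with open(destination, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
```

A method override would call similar code from inside save(self, results, destination); the exact shape of results in phaser is an assumption here.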

setup_phases()

Instantiates phases passed as classes, assigns unique names to phases, and passes the Context to them as well.

validate_sources()

Check that all required sources have been initialized.

class phaser.Phase(name=None, steps=None, columns=None, context=None, renumber=False, extra_sources=None, extra_outputs=None)

The organizing principle for data transformation steps and column definitions is the phase. A phase can

  • load a data file

  • Apply a set of preferred column names and datatypes via ‘columns’

  • Apply a further list of transformations via ‘steps’

  • While applying steps, can drop invalid or unwanted rows, add columns

  • Save only the desired columns

  • Provide a detailed diff or a summary of what changed in the phase

Attributes

name: str

The name of the phase (for debugging and file name usage)

steps: list

A list of functions that will be run in order on data loaded into the phase

columns: list

A list of column definitions with declarations of how to handle the column name and data within the column. Columns are also processed in order, so a column early in the list that instructs the phase to drop rows without values will cause those rows never to be processed by columns later in the list.

context: Context obj

Optional context information that can apply to multiple phases organized in a Pipeline. If no context is passed in, one will be created just for this Phase. The context will be passed to each step in case that step needs outside context.

error_policy: str

The error handling policy to apply in this phase. Default is ON_ERROR_COLLECT, which collects errors, up to one per row, and reports all errors at the end of running the phase. Other options are ON_ERROR_WARN, which adds warnings that will all be reported at the end, ON_ERROR_DROP_ROW which means that a row causing an error will be dropped, and ON_ERROR_STOP_NOW which aborts the phase mid-step rather than continue and collect more errors. Any step that needs to apply different error handling than the phase’s default can throw its own typed exception (see step documentation).
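
The four policies can be pictured with a small toy model. This is not phaser's implementation: run_step, PhaseReport and StopPhase are hypothetical names, and the sketch only catches ValueError, to show how each policy changes the outcome for a row whose step fails.

```python
# Toy model of the error policies; not phaser's actual code.
ON_ERROR_COLLECT = "ON_ERROR_COLLECT"
ON_ERROR_WARN = "ON_ERROR_WARN"
ON_ERROR_DROP_ROW = "ON_ERROR_DROP_ROW"
ON_ERROR_STOP_NOW = "ON_ERROR_STOP_NOW"


class StopPhase(Exception):
    """Hypothetical: raised to abort the phase under ON_ERROR_STOP_NOW."""


class PhaseReport:
    """Hypothetical container for what gets reported at the end of a phase."""

    def __init__(self):
        self.errors = []    # (row_num, message)
        self.warnings = []  # (row_num, message)
        self.dropped = []   # (row_num, message) for rows removed from output


def run_step(rows, step, policy, report):
    """Apply `step` to each row, handling failures according to `policy`."""
    kept = []
    for row_num, row in enumerate(rows, start=1):
        try:
            kept.append(step(row))
        except ValueError as exc:
            if policy == ON_ERROR_STOP_NOW:
                raise StopPhase(f"row {row_num}: {exc}") from exc
            if policy == ON_ERROR_DROP_ROW:
                report.dropped.append((row_num, str(exc)))
                continue  # the failing row is not kept
            if policy == ON_ERROR_WARN:
                report.warnings.append((row_num, str(exc)))
            else:  # ON_ERROR_COLLECT: record the error and keep going
                report.errors.append((row_num, str(exc)))
            kept.append(row)  # keep the original, unmodified row
    return kept
```

Only ON_ERROR_STOP_NOW stops at the first failure; the other three let the remaining rows run, which is what allows problems to be reported together at the end.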

Methods

run(source, destination)

Loads data from source, applies all the phase’s column definitions and steps, and prepares for saving. If run inside a Pipeline, the pipeline will call this, but for debugging/developing or simpler data transformations, this can be used to run the phase without a Pipeline.

load(source)

If creating a Phase that takes data in a custom way, subclass Phase and override the load method. Other than when overriding it, users of Phase should not need to call load directly, as it is run as part of ‘run’. If overriding ‘load’, make sure that both phase.headers and phase.row_data are set up before the method finishes.

execute_row_step(step, outputs={})

Internal method. Each step that is run on a row is run through this method in order to do consistent error numbering and error reporting.

load_data(data)

Call this method to pass record-oriented data to the Phase before calling ‘run’. It can be overridden to load data in a different structure. Note that in normal operation, a Records object containing Record objects and row numbers is passed in; however, if a Phase is being used in tests, testing is much easier if load_data can take a raw list of dicts and add the row numbers itself.
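
To picture what adding row numbers to a raw list of dicts involves, here is a minimal sketch. It assumes the number is stored under the key '__phaser_row_num__' (the default index_column_name of IndexedTableDiffer) and that numbering starts at 1; both are assumptions, and add_row_numbers is a hypothetical helper.

```python
ROW_NUM_KEY = "__phaser_row_num__"  # assumed key name, see lead-in


def add_row_numbers(records, start=1):
    """Hypothetical helper: return copies of `records` with row numbers added.

    Records that already carry a row number keep it.
    """
    numbered = []
    for i, record in enumerate(records, start=start):
        row = dict(record)  # copy so the caller's data is not mutated
        row.setdefault(ROW_NUM_KEY, i)
        numbered.append(row)
    return numbered
```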

prepare_for_save()

Checks consistency of data and drops unneeded columns

rename_columns()

Renames columns in two ways: by matching variations in case and spacing (‘_’ or ‘ ’) to convert columns to the preferred label format, and by using the list of additional alternative names provided in each column definition. It would be cool if this could be done before converting everything to list-of-dicts format…
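
The two renaming rules can be sketched in plain Python. canonical_key and rename_headers are hypothetical names, and the exact normalization phaser applies may differ; this version ignores case, treats spaces and underscores as equivalent, and then consults each column's alternative names.

```python
def canonical_key(name):
    """Normalize a header for comparison: case-insensitive, ' ' == '_'."""
    return name.strip().lower().replace(" ", "_")


def rename_headers(headers, columns):
    """Hypothetical helper: map incoming headers to preferred column names.

    `columns` maps each preferred name to a collection of alternative names,
    e.g. {"last_name": {"surname"}}. Unmatched headers are returned unchanged.
    """
    lookup = {}
    for preferred, alternatives in columns.items():
        lookup[canonical_key(preferred)] = preferred
        for alt in alternatives or ():
            lookup[canonical_key(alt)] = preferred
    return [lookup.get(canonical_key(h), h) for h in headers]
```

With columns={"last_name": {"surname"}}, both "last NAME" and "Surname" map to "last_name".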

run()

Each kind of phase has a different process for doing its work, so this method must be overridden.

class phaser.Context(variables=None, working_dir=None, error_policy='ON_ERROR_COLLECT', verbose=False)

Context is created by the pipeline, and passed to each phase. Thus, it can be used to carry extra data or variable values between phases if necessary.

add_variable(name, value)

Add variables that are global to the pipeline and accessible to steps and internal methods

process_exception(exc, phase, step, row)

A method that delegates exception handling, turning exceptions into error reports in a standardized way. It is called by the phase’s step handlers when a phaser data exception or a coding exception occurs.

Parameters:
  • exc – The exception or error thrown

  • step – The step in which this occurred

  • row – The row of the data in which this occurred

  • error_policy – The phase’s chosen error handling policy (ON_ERROR_COLLECT, ON_ERROR_STOP_NOW, etc.)

Returns:

Nothing

Columns

class phaser.Column(name, required=True, null=True, blank=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None)

A Column declaration is an instance of Column to provide to a Phase, so that the column name and values may be fixed, validated or cast to a datatype at the beginning of the Phase. The base Column type, besides being subclassed for specific datatypes, is useful for columns that might be mixed types or have strings as values.

Parameters:
  • name – The preferred name or label of the column, e.g. “Date of Birth” or “first_name”. The class will automatically canonicalize column names that appear with inconsistent capitalization or spacing, replacing names such as “last NAME” with the preferred name “last_name”.

  • required – If the column is required, it must be included in the data source. Even so, a phase will continue after finding this column is missing, unless the ‘on_error’ policy is to ‘stop_now’ (see below).

  • null – States whether null values are allowed (True) or will cause errors/warnings (False)

  • default – A default value to apply if a column value is null. Not compatible with “null=False”

  • fix_value_fn – A function (string or callable) or array of functions to apply to each value, such as a builtin ‘capitalize’ or a custom function defined in scope.

  • rename – A set of alternate names that may be used for this column, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have the alternate name replaced with the preferred ‘name’ value.

  • allowed_values – If allowed_values is empty or None, it is not checked. If allowed_values is a single value or list of values, column logic checks every row to see that values are in the allowed list.

  • save – if True, column is saved at the end of the phase; if not it is omitted.

  • on_error – Choose from one of the error policies to determine how errors that occur while checking, type casting or fixing this column will be handled. Error policies are ON_ERROR_WARN, ON_ERROR_COLLECT, ON_ERROR_DROP_ROW, and ON_ERROR_STOP_NOW.

cast(value)

When subclassing Column to provide a custom column type, override the cast method to do type-casting. For example, the builtin IntColumn uses this method to cast to an int.

Parameters:

value – this parameter will hold a single value from the column, for this method to cast to the correct data type.

Returns:

the value that was passed, now in the correct data type

check_value(value)

When subclassing Column to provide custom validation in a re-usable form, override the check_value method to do additional checking.

Tips:
  • To also use the parent class’s validation, don’t forget to call super().check_value(value).

  • To use the exception declared when the column is defined:

raise self.use_exception("Explanation of what check went wrong goes here")

Parameters:

value – this parameter will hold a single value from the column.

Returns:

None

fix_value(value)

When subclassing Column to provide custom data cleaning in a re-usable form, override the ‘fix_value’ method. The default implementation of this method applies the ‘default’ parameter value and also applies functions passed into the ‘fix_value_fn’ parameter. This is called after casting the column to the appropriate data type if using IntColumn, DateColumn, etc.

Tips:
  • Don’t forget to call super().fix_value(value)

  • This method can raise exceptions if a problem is found while trying to fix the value; see ‘check_value’ for an example.
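
The override pattern for cast, fix_value and check_value, including the super() calls, can be illustrated with a self-contained stand-in. StandInColumn is a hypothetical, much-simplified substitute for phaser.Column written only so the example runs without phaser; the cast, fix, check order in process() is an assumption, and ValueError stands in for self.use_exception.

```python
class StandInColumn:
    """Hypothetical, simplified stand-in for phaser.Column (not its real code)."""

    def __init__(self, name, default=None):
        self.name = name
        self.default = default

    def cast(self, value):
        return value

    def fix_value(self, value):
        # Like the documented default: fill in `default` for nulls
        # (fix_value_fn support is omitted from this stand-in).
        return self.default if value is None else value

    def check_value(self, value):
        return None

    def process(self, value):
        # Assumed order: cast, then fix, then check.
        value = self.cast(value)
        value = self.fix_value(value)
        self.check_value(value)
        return value


class StateCodeColumn(StandInColumn):
    """Example subclass: two-letter, upper-case state codes."""

    def cast(self, value):
        return None if value is None else str(value)

    def fix_value(self, value):
        value = super().fix_value(value)  # keep the parent's default handling
        return value.strip().upper() if value is not None else None

    def check_value(self, value):
        super().check_value(value)  # keep the parent's validation
        if value is not None and len(value) != 2:
            # In phaser you would raise self.use_exception(...) instead.
            raise ValueError(f"{self.name}: expected a 2-letter code, got {value!r}")
```

For example, StateCodeColumn("state", default="NY").process(" ca ") returns "CA", and a null value becomes "NY".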

class phaser.BooleanColumn(name, required=True, null=False, default=None, fix_value_fn=None, rename=None, save=True, on_error=None)

Validates truthy and falsey values, as defined in TRUE_VALUES and FALSE_VALUES.
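
The actual contents of TRUE_VALUES and FALSE_VALUES are defined inside phaser and are not listed here. The sketch below uses made-up sets (EXAMPLE_TRUE_VALUES and EXAMPLE_FALSE_VALUES) and a hypothetical to_bool helper to show the general idea: map recognized strings to booleans and reject anything else.

```python
# Made-up value sets; phaser's own TRUE_VALUES/FALSE_VALUES may differ.
EXAMPLE_TRUE_VALUES = {"true", "t", "yes", "y", "1"}
EXAMPLE_FALSE_VALUES = {"false", "f", "no", "n", "0"}


def to_bool(value):
    """Hypothetical cast: raw cell value to bool, or ValueError if unrecognized."""
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in EXAMPLE_TRUE_VALUES:
        return True
    if text in EXAMPLE_FALSE_VALUES:
        return False
    raise ValueError(f"not a recognized boolean value: {value!r}")
```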

class phaser.IntColumn(name, required=True, null=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None, min_value=None, max_value=None)

Sets up a Column instance ready to do type, format, null and default checking on values, as well as renaming the column itself to the chosen name.

Parameters:
  • name – The preferred name/presentation of the column, e.g. “Date of Birth” or “first_name”

  • required – If the column is required, the phase will present errors if it is missing.

  • null – If False, null values in the column are raised as errors.

  • default – A default value to apply if a column value is null. Not compatible with “null=False”

  • fix_value_fn – A function (string or callable) or array of functions to apply to each value

  • rename – A set of names that may be used in the data as column headers, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have a column with the preferred name with the same data in it. In other words, any data in a column name in ‘rename’ will end up in a column named ‘name’.

  • allowed_values – If allowed_values is not empty and a column value is not in the list, raises errors. To supply a range, use min_value and max_value instead. NOTE: this is checked after casting, so to check allowed values of a column specified to cast to int, such as IntColumn, check for values like [1, 2, 3] rather than [“1”, “2”, “3”]

  • save – if True, column is saved at the end of the phase; if not it is omitted.

  • min_value – If data is below this value, column raises errors

  • max_value – If data is above this value, column raises errors
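
Because allowed_values, min_value and max_value are compared with the cast value, the order of operations can be sketched in plain Python. check_int_value is a hypothetical helper, not phaser code; it raises ValueError where phaser would instead report an error according to the column's error policy.

```python
def check_int_value(raw, allowed_values=None, min_value=None, max_value=None):
    """Hypothetical helper: cast `raw` to int, then validate the cast value."""
    value = int(raw)  # cast first: "3" becomes 3
    if allowed_values and value not in allowed_values:
        raise ValueError(f"{value} is not one of {allowed_values}")
    if min_value is not None and value < min_value:
        raise ValueError(f"{value} is below the minimum {min_value}")
    if max_value is not None and value > max_value:
        raise ValueError(f"{value} is above the maximum {max_value}")
    return value
```

For example, check_int_value("3", allowed_values=[1, 2, 3]) returns 3, while the same call with allowed_values=["1", "2", "3"] raises ValueError, because the cast value 3 is compared against strings.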

class phaser.FloatColumn(*args, **kwargs)

Defines a column that accepts a float value. See IntColumn for parameters.

class phaser.DateColumn(name, required=True, null=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None, min_value=None, max_value=None, date_format=None)

Sets up column to do python date type, format, null and default checking on values, as well as other column functionality. Supports only a few unambiguous formats - otherwise specify date_format parameter.

Parameters:
  • name – The preferred name/presentation of the column, e.g. “Date of Birth” or “first_name”

  • required – If the column is required, the phase will present errors if it is missing.

  • null – If False, null values in the column are raised as errors.

  • default – A default value to apply if a column value is null. Not compatible with “null=False”

  • fix_value_fn – A function (string or callable) or array of functions to apply to each value

  • rename – A set of names that may be used in the data as column headers, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have a column with the preferred name with the same data in it. In other words, any data in a column name in ‘rename’ will end up in a column named ‘name’.

  • allowed_values – If allowed_values is not empty and a column value is not in the list, raises errors. To supply a range, use min_value and max_value instead.

  • save – if True, column is saved at the end of the phase; if not it is omitted.

  • min_value – If data is below this value, column raises errors

  • max_value – If data is above this value, column raises errors

  • date_format – Formatting string used by datetime.strptime to parse a string into a date, e.g. ‘%d/%m/%y %H:%M:%S.%f’, ‘%d/%m/%Y’ or ‘%m/%d/%y’.
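
Since date_format is passed to datetime.strptime, the standard library shows what a given format string accepts. parse_date is a hypothetical helper illustrating the cast, not DateColumn's own code; note how the same digits read differently under day-first and month-first formats.

```python
from datetime import date, datetime


def parse_date(text, date_format="%d/%m/%Y"):
    """Hypothetical helper: parse `text` with strptime, keep only the date."""
    return datetime.strptime(text.strip(), date_format).date()


# Day-first and month-first formats read the same string differently.
assert parse_date("03/04/2024") == date(2024, 4, 3)
assert parse_date("03/04/24", "%m/%d/%y") == date(2024, 3, 4)
```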

class phaser.DateTimeColumn(name, required=True, null=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None, min_value=None, max_value=None, datetime_format=None, default_tz=None)

Sets up column to do python datetime type, format, null and default checking on values, as well as other column functionality. Supports ISO8601/RFC3339 date-time formatting. Supply a strptime-style format string to datetime_format to parse other formats.

Parameters:
  • name – The preferred name/presentation of the column, e.g. “Date of Birth” or “first_name”

  • required – If the column is required, the phase will present errors if it is missing.

  • null – If False, null values in the column are raised as errors.

  • default – A default value to apply if a column value is null. Not compatible with “null=False”

  • fix_value_fn – A function (string or callable) or array of functions to apply to each value

  • rename – A set of names that may be used in the data as column headers, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have a column with the preferred name with the same data in it. In other words, any data in a column name in ‘rename’ will end up in a column named ‘name’.

  • allowed_values – If allowed_values is not empty and a column value is not in the list, raises errors. To supply a range, use min_value and max_value instead.

  • save – if True, column is saved at the end of the phase; if not it is omitted.

  • min_value – If data is below this value, column raises errors

  • max_value – If data is above this value, column raises errors

  • datetime_format – Formatting string used by datetime.strptime to parse a string into a datetime, e.g. ‘%d/%m/%y %H:%M:%S.%f’, ‘%d/%m/%Y’ or ‘%m/%d/%y’.

  • default_tz – If timezone is not specified in value, assume this timezone applies.
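
A rough sketch of ISO 8601 parsing with a default_tz fallback, using only the standard library. parse_datetime is hypothetical, not DateTimeColumn's code. It uses datetime.fromisoformat for ISO input, which before Python 3.11 rejects a trailing 'Z', so that suffix is rewritten first; default_tz is attached only when the parsed value has no offset of its own.

```python
from datetime import datetime


def parse_datetime(text, datetime_format=None, default_tz=None):
    """Hypothetical helper: parse ISO 8601 text, or use `datetime_format`.

    If the result is naive and `default_tz` is given, assume that timezone.
    """
    text = text.strip()
    if datetime_format:
        value = datetime.strptime(text, datetime_format)
    else:
        if text.endswith("Z"):  # fromisoformat rejects 'Z' before Python 3.11
            text = text[:-1] + "+00:00"
        value = datetime.fromisoformat(text)
    if value.tzinfo is None and default_tz is not None:
        value = value.replace(tzinfo=default_tz)
    return value
```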

Exceptions

exception phaser.DropRowException(message, **kwargs)

Throwing this exception in a row_step will cause the current row to be dropped. Rows dropped this way will be listed in the phase results report along with a reason given in the exception constructor.
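
The effect on a batch can be modelled with a stand-in exception. DropRow and run_row_step below are hypothetical, simplified stand-ins for phaser.DropRowException and the phase's row-step loop; they show a dropped row being removed and the reason from the exception constructor being kept for the report.

```python
class DropRow(Exception):
    """Hypothetical stand-in for phaser.DropRowException."""


def run_row_step(rows, step):
    """Hypothetical loop: run `step` per row; rows raising DropRow are removed."""
    kept, dropped = [], []
    for row_num, row in enumerate(rows, start=1):
        try:
            kept.append(step(row))
        except DropRow as exc:
            dropped.append((row_num, str(exc)))  # reason from the constructor
    return kept, dropped


def require_email(row):
    """Example step: drop rows without an email address."""
    if not row.get("email"):
        raise DropRow("no email address")
    return row
```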

Steps

Steps are defined using decorators. A step is meant to be a function that accepts an input and produces an output.

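from itertools import count
from phaser import row_step
_ids = count(1)  # simple sequential id generator for this example

@row_step
def add_an_id(row):
    row['id'] = next(_ids)
    return row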

Occasionally extra sources or outputs are needed. To use them, they must first be defined on the Pipeline as an ExtraRecords object or an ExtraMapping object. The extra sources or outputs must then be declared in the decorator, as in the example below. In the example, foo is an ExtraMapping, which presents to the function as a dictionary, and bar is an ExtraRecords object, which presents like a list.

from phaser import row_step

@row_step(extra_sources=['foo'], extra_outputs=['bar'])
def do_foo_and_bar(row, foo, bar):
    row['foo'] = foo[row['id']]
    bar.append(row['bar'])
    return row

All steps can accept a parameter named context which will be the Context object for the current run of the pipeline.

@phaser.row_step(func=None, *, extra_sources=None, extra_outputs=None)

Used to define a step that should run on each row of a data set.

The function that is decorated should accept a dictionary as its first parameter. If extra_sources or extra_outputs are defined, then they will be passed in as explicit parameters, named according to their definition in the decorator. Sources and outputs appear to the step function as the type of object defined at the pipeline level. If a Context is required, then it can be included as a parameter, too.

The decorated function must return a dict, or throw an exception.

Parameters:
  • extra_sources – A list of source names

  • extra_outputs – A list of output names

@phaser.batch_step(func=None, *, extra_sources=None, extra_outputs=None, check_size=False)

Used to define a step that needs to run on the whole batch of data.

The decorated function should accept a list of dictionaries as its first parameter.

Parameters:
  • extra_sources – A list of source names

  • extra_outputs – A list of output names

  • check_size – A boolean indicating whether or not to validate the size of the batch after the step is run

@phaser.context_step(func=None, *, extra_sources=None, extra_outputs=None)

Used to define a step that works on the context.

The decorated function should accept the context as its first parameter.

Parameters:
  • extra_sources – A list of source names

  • extra_outputs – A list of output names

@phaser.dataframe_step(func=None, *, pass_row_nums=True, extra_sources=None, extra_outputs=None, check_size=False)

Used to define a step that needs to run on the whole set of data as a pandas.DataFrame.

The decorated function should accept a DataFrame as its first parameter.

Parameters:
  • pass_row_nums – If True, the row numbers will be set in the DataFrame in a column named the value of PHASER_ROW_NUM

  • extra_sources – A list of source names

  • extra_outputs – A list of output names

  • check_size – A boolean indicating whether or not to validate the size of the DataFrame after the step is run

Returns:

A DataFrame (or something else supporting to_dict; be wary of returning a Series unless you mean to)

Builtin steps

@phaser.drop_duplicate_rows

This step factory will build a step to delete rows that are duplicates of each other, based on every value or based on a list of columns or column names. Consider also using DropRowException in a custom row_step if the logic doesn’t conveniently fit in this built-in.

Parameters:

columns – A list of the columns that are checked to determine if a row is a duplicate. columns=None (which is the parameter default) may be used to apply to all columns.

Returns:

Returns a function that can be used as a step in a phaser pipeline.

Example usage:

phase.steps = [drop_duplicate_rows('guid')]

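
The deduplication rule can be sketched in plain Python. drop_duplicates is a hypothetical helper, not phaser's implementation; it assumes the first occurrence of each duplicate is the one kept, that a single column name may be passed in place of a list (as in the example above), and that row values are hashable.

```python
def drop_duplicates(rows, columns=None):
    """Hypothetical helper: keep the first row for each key, drop later ones.

    The key is built from `columns` (a name or list of names), or from every
    key/value pair in the row when `columns` is None. Values must be hashable.
    """
    if isinstance(columns, str):
        columns = [columns]
    seen = set()
    kept = []
    for row in rows:
        if columns is None:
            key = tuple(sorted(row.items()))  # order-independent whole-row key
        else:
            key = tuple(row.get(c) for c in columns)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept
```
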
@phaser.check_unique(column, strip=True, ignore_case=False)

This is a step factory that will create a step that tests that all the values in a column are unique with respect to each other. Stripping spaces and lower-casing are applied only for the comparison; no values are changed permanently.

Parameters:
  • column – The column class or name of the column in which all values should be unique.

  • strip – whether to strip spaces from all values (defaults to True)

  • ignore_case – whether to lower-case all values (defaults to False)

Returns:

Returns a function that can be used as a step in a phaser pipeline
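
The comparison rule can be sketched as follows. find_non_unique is a hypothetical helper that normalizes a copy of each value only for comparison, so the rows themselves are left unchanged. How the real step reports duplicates depends on the error policy; here they are simply returned.

```python
def find_non_unique(rows, column, strip=True, ignore_case=False):
    """Hypothetical helper: return (row_index, value) for repeated values.

    Normalization (strip / lower-case) is applied to a comparison key only;
    the reported value is the original, unmodified one.
    """
    seen = set()
    duplicates = []
    for i, row in enumerate(rows):
        value = row.get(column)
        key = value
        if isinstance(key, str):
            if strip:
                key = key.strip()
            if ignore_case:
                key = key.lower()
        if key in seen:
            duplicates.append((i, value))
        else:
            seen.add(key)
    return duplicates
```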

@phaser.sort_by

This is a step factory that will create a step that orders rows by the values in a given column.

Parameters:

column – The column that will be ordered by when the step is run

Returns:

The function that can be added to a phase’s list of steps.

@phaser.filter_rows

This step factory will keep only specified rows. While there are other ways to accomplish the same thing, many of them create a DROPPED_ROW message for each dropped row; this step instead summarizes how many rows were dropped. Consider also using DropRowException in a custom row_step if the logic doesn’t conveniently fit in this built-in.

Parameters:

func – A function called on each row; rows for which it returns a truthy value are kept.

Returns:

A function that can be added to a phase’s list of steps.

Usage:

filter_rows(lambda row: row['type'] == 'basal')
filter_rows(lambda row: row['type'] is not None)
filter_rows(lambda row: row['sampleSize'] > 5)  # requires sampleSize to be an IntColumn or cast to int earlier

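
The difference from raising DropRowException row by row lies in the reporting. The sketch below, using the hypothetical name keep_rows, applies the predicate and produces one summary line instead of a message per dropped row; it illustrates the idea and is not phaser's code.

```python
def keep_rows(rows, predicate):
    """Hypothetical helper: keep rows where `predicate(row)` is truthy.

    Returns the kept rows plus a single summary of how many were dropped.
    """
    kept = [row for row in rows if predicate(row)]
    dropped = len(rows) - len(kept)
    summary = f"{dropped} of {len(rows)} rows dropped by filter"
    return kept, summary
```
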
@phaser.flatten_all(row, context, **kwargs)

The flatten_all step is useful in JSON data handling, which often has value fields within JSON-formatted fields within records. In flattening, new column names are concatenated from hierarchical names. For example, flattening a column called ‘payload’ which is a dict with fields called ‘type’ and ‘id’ will create one new column for each key called ‘payload__type’ and ‘payload__id’.

Unlike the step factories above, this function is used directly as a step in a Phase; it is not called with parameters when listed in a Phase’s steps.

Usage:

import phaser

phase = phaser.Phase(
    name='extract',
    steps=[phaser.flatten_all, phaser.drop_duplicate_rows('payload__id')]
)

@phaser.flatten_column(column_name, deep=True)

The flatten_column step is useful in JSON data handling, which often has value fields within JSON-formatted fields within records. Names are concatenated with ‘__’ as with flatten_all. Only chosen columns are flattened in this usage.

Parameters:
  • column_name – The name of the column with internal substructure to flatten into multiple columns.

  • deep – Whether to iterate and flatten substructure within the substructure of the column. Defaults to True.

Returns:

Returns a function that can be used as a step in a Phase.

Usage:

phase.steps = [
    flatten_column('payload'),
    flatten_column('performance_detail', deep=False),  # Only flatten one layer into the substructure.
]
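
The '__' naming scheme and the effect of deep can be sketched for a single row. flatten_column_in_row is a hypothetical helper, not phaser's code. It assumes nested values are already parsed dicts rather than JSON strings, and that the original nested column is replaced by the new flattened columns; the documentation above does not state either point explicitly.

```python
def flatten_value(prefix, value, deep=True):
    """Yield (column_name, value) pairs for a nested dict, joining names with '__'."""
    for key, sub in value.items():
        name = f"{prefix}__{key}"
        if deep and isinstance(sub, dict):
            yield from flatten_value(name, sub, deep=True)
        else:
            yield name, sub


def flatten_column_in_row(row, column_name, deep=True):
    """Hypothetical helper: return a copy of `row` with `column_name` expanded."""
    value = row.get(column_name)
    if not isinstance(value, dict):
        return dict(row)  # nothing to flatten
    flat = {k: v for k, v in row.items() if k != column_name}
    flat.update(flatten_value(column_name, value, deep))
    return flat
```

With deep=False, a nested dict such as {'meta': {'size': 3}} inside 'payload' ends up whole in a 'payload__meta' column instead of producing 'payload__meta__size'.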

Diff logic

See Checkpoint files and Table-oriented diffs for how saving intermediate results and being able to diff them is supported in phaser and can help debugging and developing workflows. The classes below provide the diff functionality and the ability to plug in additional formats or functional changes.

class phaser.IndexedTableDiffer(f1, f2, index_column_name='__phaser_row_num__', index_type='int', column_renames={})

IndexedTableDiffer compares two tables that have some rows in common as determined by an index value. Two rows with the same index are compared field-by-field. Rows that are in one table and not the other are deemed to have been added or removed.

Methods

IndexedTableDiffer(f1, f2, index_column_name, index_type, column_renames)

Takes two tables f1 and f2 in record-oriented (list of dicts) format. index_column_name is the name of the field in each record that provides a row number or row index unique to each row. index_type allows the row index to be cast to an int, if appropriate, so that the diff output is sorted by integer value rather than string value. Finally, column_renames can be a dict mapping old column names (in f1) to new column names (in f2).
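
The comparison IndexedTableDiffer performs can be sketched in plain Python. indexed_diff is a hypothetical function that returns a plain dict rather than phaser's formatted output; it applies column_renames to the old table, casts the index when index_type is 'int', and classifies rows as added, removed or changed field by field.

```python
def indexed_diff(f1, f2, index_column_name="__phaser_row_num__",
                 index_type="int", column_renames=None):
    """Hypothetical sketch: compare two lists of dicts matched on an index column."""
    renames = column_renames or {}
    cast = int if index_type == "int" else str

    def by_index(rows, rename):
        table = {}
        for row in rows:
            if rename:  # old column names (f1) -> new column names (f2)
                row = {renames.get(k, k): v for k, v in row.items()}
            table[cast(row[index_column_name])] = row
        return table

    old, new = by_index(f1, True), by_index(f2, False)
    changed = {}
    for idx in sorted(old.keys() & new.keys()):  # int indexes sort numerically
        fields = {
            col: (old[idx].get(col), new[idx].get(col))
            for col in old[idx].keys() | new[idx].keys()
            if col != index_column_name and old[idx].get(col) != new[idx].get(col)
        }
        if fields:
            changed[idx] = fields
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "changed": changed,
    }
```

Casting the index matters for ordering: compared as strings, row "10" would sort before row "2".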

html()

Returns the results of the diff processing formatted in HTML

output(formatter_class)

Returns the results of the diff processed by another formatter extending FormatterBase.

class phaser.HtmlTableFormat(old_and_new_columns)

By default, the HtmlTableFormat formatter is used to create table-oriented diffs that highlight added and removed rows and columns, and do within-field diffs of cell values.

class phaser.FormatterBase

Implement the abstract base class FormatterBase to provide a different way of formatting a diff between two tabular data sets. Pass the formatter class to IndexedTableDiffer’s output() method to use that class’s logic to generate the diff details.