Concepts

Phaser was designed with only a few concepts to make managing data pipelines as simple as possible.

Pipelines

A Pipeline is the main organizing unit that defines the structure of the work to be done. It runs one or more Phases and handles I/O, marshalling source data and checkpoint data between them. A Pipeline will:

  • load source data from files or a previous phase

  • save checkpoint data between phases

  • save outputs

  • marshal inputs and outputs between phases

  • report errors or warnings as summaries

  • capture and handle errors according to the error policy

Errors and warnings are output to a file in the working directory by default.

Phases

Each Phase runs one or more steps, each with its own data transformation or validation logic. The Phase itself also handles routine work robustly:

  • transform column headers to preferred name/case

  • routine parsing and data typing
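As a rough illustration of the routine work listed above, the following plain-Python sketch normalizes headers and parses one column as an integer. It does not use Phaser's API; the helper names `canonical_header` and `clean_row` are hypothetical and exist only for this example.

```python
def canonical_header(name):
    """Map a raw header such as ' Employee ID ' to 'employee_id'."""
    return "_".join(name.strip().lower().split())


def clean_row(row, int_columns=()):
    """Rename headers to a canonical form and parse the listed columns as int."""
    cleaned = {canonical_header(key): value for key, value in row.items()}
    for column in int_columns:
        # Values usually arrive as text, so strip whitespace before converting.
        cleaned[column] = int(str(cleaned[column]).strip())
    return cleaned


raw = {" Employee ID ": " 42 ", "Full Name": "Ada Lovelace"}
cleaned = clean_row(raw, int_columns=["employee_id"])
```

Having the Phase do this once means that individual steps can rely on consistent column names and types.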

Columns

A Phase can be configured with the data that it expects, defined as Columns.

When columns are passed to the Phase, their data formats and constraints are enforced at the beginning of the Phase. Many checks that might otherwise have been written as step functions can therefore simply be declared.

Columns and features available so far:

Steps

Most of the work unique to your data pipeline project can be done in steps, which run within a Phase to provide structure and debuggability:

  • Steps are meant to be written as pure functions, so each one can be tested individually by passing in row data and verifying the result in plain Python

  • Steps can drop rows with bad data

  • Steps can access context information

  • Steps can create warnings or errors

  • Pre-baked steps are available to check that values are unique and to do common transforms
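To make the idea of a step as a pure function concrete, here is a minimal sketch in plain Python. It deliberately avoids Phaser's own decorators and exceptions: `DropRow`, `parse_pay_rate`, and the `context` parameter are hypothetical names standing in for whatever the framework provides.

```python
class DropRow(Exception):
    """Hypothetical signal that a row should be dropped from the output."""


def parse_pay_rate(row, context=None):
    """Return a new row with 'pay_rate' as a float, or raise DropRow if missing."""
    raw_value = (row.get("pay_rate") or "").strip()
    if not raw_value:
        raise DropRow("pay_rate is missing")
    new_row = dict(row)  # copy, so the caller's row is not mutated
    new_row["pay_rate"] = float(raw_value.lstrip("$"))
    return new_row


# Testing the step needs nothing more than a dict and a comparison.
original = {"name": "Ada", "pay_rate": "$25.50"}
result = parse_pay_rate(original)
```

Because the function takes a row and returns a row, a unit test can call it directly without building a Pipeline or reading any files.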

Checkpoint files

Phaser produces intermediate results as well as final results of data transformations. These checkpoint files solve a number of important problems:

  • Integrations - if part of the pipeline must run outside the phaser framework, an outside tool can pick up the in-progress data from a checkpoint file, process it, and hand the result to the next Phase.

  • Debugging - checkpoint files make it easier to narrow down where an error was introduced, such as finding out which phase removed a row that should not have been removed.

  • Resuming pipeline runs - whether doing this in development or in production, checkpoint files make it possible to fix an error and resume the pipeline from the last checkpoint.

  • Change reporting - checkpoint files are used to generate table-oriented diff reports and summary reports, which are user-friendly ways to see what changes were made.

  • External verification - checkpoint files are used to summarize how many changes were made without relying on the steps doing the changes reporting that correctly.

Checkpoint files are saved at the end of each phase in the pipeline’s working directory. The main source file and the output are also saved to the working directory, which makes it possible to calculate diffs for each phase and for the entire pipeline. Because diffs are calculated from the actual data at each save, they are not affected by potential bugs in steps: a step cannot report a change that it did not make, or fail to report one that it did. Checkpoint data is saved without any change information or metadata in the files, and without any sampling or filtering beyond what the pipeline has done so far, so checkpoint data can be used or analyzed with all sorts of data tools.
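The external-verification idea can be sketched with nothing but the standard library. The example below assumes, for illustration only, that two checkpoints are CSV text sharing an `id` key column; the function names `read_records` and `count_changes` are hypothetical and not part of Phaser.

```python
import csv
import io


def read_records(csv_text):
    """Parse CSV text into a list of dicts, one per row."""
    return list(csv.DictReader(io.StringIO(csv_text)))


def count_changes(before, after, key):
    """Count rows added, removed, and changed between two checkpoints."""
    before_by_key = {row[key]: row for row in before}
    after_by_key = {row[key]: row for row in after}
    shared = before_by_key.keys() & after_by_key.keys()
    return {
        "added": len(after_by_key.keys() - before_by_key.keys()),
        "removed": len(before_by_key.keys() - after_by_key.keys()),
        "changed": sum(1 for k in shared if before_by_key[k] != after_by_key[k]),
    }


# In-memory stand-ins for the checkpoints saved before and after a phase.
before_text = "id,name,dept\n1,ada,eng\n2,grace,eng\n3,alan,ops\n"
after_text = "id,name,dept\n1,Ada,eng\n2,grace,eng\n"
summary = count_changes(read_records(before_text), read_records(after_text), key="id")
```

The counts come only from the saved data, so they hold regardless of what any step claims to have done.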

Table-oriented Diffs

A usable way of seeing what data has changed is a key part of a good developer experience with data transformations. For table-oriented data, a table-oriented diff is a much better experience than line-oriented diffs.

(Figure: diff in table format with colored highlighting)

Phaser’s built-in tool for generating table-oriented diffs is extensible and re-usable. Showing diffs in a friendly format that lets a person quickly scan rows and columns for expected and unexpected changes can be a very important part of robust data processes and procedures. One of the authors of phaser used table-oriented diffs (generated by django-import-export) to compare weekly changes to customer data before applying those changes to a production database, and the visual comparison caught many errors early.

The CLI ‘diff’ tool can be used to generate all available diffs for a pipeline run that has completed. It uses the Pipeline code written by the developer to figure out which columns have been renamed – otherwise each renamed column looks like an added column and a deleted column (and it becomes hard to know which rows actually had changes in that column). Diffs are generated from checkpoint files, without using step logic at all.

The IndexedTableDiffer is the class that performs the diff between two indexed tables. Both tables need consistent row indexes, such as row numbers. If the index column is not named “__phaser_row_num__”, a different index column can be specified. If a list of renamed columns is provided to the differ, it can represent each renamed column as a single column rather than as one deleted and one added column. The data format expected by the tool is a Python list of dicts, one dict per row, known (e.g. in pandas) as the record-oriented representation.
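The following sketch shows the general technique of an indexed diff over record-oriented data. It is not the IndexedTableDiffer implementation or its interface: the function `diff_records` is hypothetical, and passing renames as an old-name to new-name dict is an assumption made for this example.

```python
ROW_NUM = "__phaser_row_num__"


def diff_records(old_rows, new_rows, index=ROW_NUM, renamed=None):
    """Return {row_index: {new_column: (old_value, new_value)}} for changed cells."""
    renamed = renamed or {}
    new_by_index = {row[index]: row for row in new_rows}
    changes = {}
    for old_row in old_rows:
        new_row = new_by_index.get(old_row[index])
        if new_row is None:
            continue  # removed rows are ignored in this sketch
        for old_col, old_value in old_row.items():
            # Follow the rename so the column is compared against itself.
            new_col = renamed.get(old_col, old_col)
            if new_col in new_row and new_row[new_col] != old_value:
                row_changes = changes.setdefault(old_row[index], {})
                row_changes[new_col] = (old_value, new_row[new_col])
    return changes


old = [
    {ROW_NUM: 1, "Name": "ada", "Dept": "eng"},
    {ROW_NUM: 2, "Name": "grace", "Dept": "eng"},
]
new = [
    {ROW_NUM: 1, "name": "Ada", "Dept": "eng"},
    {ROW_NUM: 2, "name": "grace", "Dept": "ops"},
]
changes = diff_records(old, new, renamed={"Name": "name"})
```

Without the rename mapping, "Name" and "name" would be treated as unrelated columns and the change in row 1 would not be attributed to a single cell.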

The values gathered by the differ are formatted by the HtmlTableFormat class, which is called field by field and row by row to format the diff output. This formatter can be replaced by another class that inherits from the same FormatterBase base class as HtmlTableFormat, and that class can be provided to the IndexedTableDiffer when it starts generating output.
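The pluggable-formatter design can be illustrated with a small stand-alone sketch. The method names of the real FormatterBase are not shown here, so everything below is an assumption: `SketchFormatter`, `PlainTextFormatter`, and `render` are hypothetical names that only demonstrate the field-by-field, row-by-row pattern.

```python
class SketchFormatter:
    """Hypothetical stand-in for a formatter base class; methods are assumptions."""

    def start(self):
        return ""

    def cell(self, old, new):
        raise NotImplementedError

    def row(self, cells):
        raise NotImplementedError

    def finish(self):
        return ""


class PlainTextFormatter(SketchFormatter):
    """Formats a diff as plain text instead of HTML."""

    def cell(self, old, new):
        # Unchanged cells show the value; changed cells show both values.
        return str(new) if old == new else f"{old} -> {new}"

    def row(self, cells):
        return " | ".join(cells) + "\n"


def render(rows, formatter):
    """Format rows, each a list of (old, new) pairs, using the given formatter."""
    output = formatter.start()
    for row in rows:
        output += formatter.row([formatter.cell(old, new) for old, new in row])
    return output + formatter.finish()


text = render([[("ada", "Ada"), ("eng", "eng")]], PlainTextFormatter())
```

Keeping the comparison logic separate from the formatting is what allows the same diff to be rendered as HTML, plain text, or any other output.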