API
Phaser
phaser - a library to simplify automated batch-oriented complex data integration pipelines
Pipeline : Organizes a data pipeline into multiple phases, each with a checkpoint, so that the pipeline can be
resumed from any phase.
Phase : Organizes part of the work of a complicated pipeline into a smaller unit. A phase still contains multiple steps and
column definitions/fixes, but it can be run standalone from its input data, and its output can be examined both as
a checkpoint and as the input to the next phase.
PipelineErrorException, DropRowException, WarningException : These exceptions can be raised in custom steps; the exception type determines what happens next.
row_step, batch_step : Decorators that wrap custom steps so that a Phase can call them with the
right input data, check their output, and handle exceptions.
check_unique, sort_by : Built-in steps that can be declared to operate on a column, and included in the steps of a Phase.
Column, IntColumn, DateColumn, DateTimeColumn : Define column instances by name and type, and pass them to the Phase to have datatype casting and name canonicalization done automatically.
Classes
- class phaser.Pipeline(working_dir=None, source=None, phases=None, verbose=False, error_policy=None)
Pipeline handles running phases in order. It also handles I/O and marshals the outputs of phases that are used as inputs to later phases.
- check_extra_outputs(phase)
Check that any extra outputs the phase declared have been added into the context. Raises a PhaserError if any do not exist, as that is a programming error that should be fixed.
- init_source(name, source_path)
Initializes a named source based on the kind of ‘source’ passed in.
- Parameters:
name – An IOObject that specifies the name and type of the source
source_path – must be an os.PathLike file in CSV format; it will be read entirely into memory
- load(source)
The load method can be overridden to apply a pipeline-specific way of loading data. The phaser default is to read data from a CSV file.
- report_errors_and_warnings(phase_name)
TODO: different formats, flexibility: For CLI operation we want to report errors to the CLI, but for unsupervised operation these should go to logs. Python logging does allow users of a library to send log messages to more than one place while customizing log level desired, and we could have drop-row messages as info and warning as warn level so these fit very nicely into the standard levels allowing familiar customization.
- save(results, destination)
This method saves the result of the Phase operating on the batch, in phaser’s preferred format. It should be easy to override this method to save in a different way, using pandas’ to_csv, to_excel, to_json or a different output entirely.
- setup_phases()
Instantiates phases passed as classes, assigns unique names to phases, and passes the Context to each phase.
- validate_sources()
Check that all required sources have been initialized.
- class phaser.Phase(name=None, steps=None, columns=None, context=None, renumber=False, extra_sources=None, extra_outputs=None)
The organizing principle for data transformation steps and column definitions is the phase. A phase can:
Load a data file
Apply a set of preferred column names and datatypes via ‘columns’
Apply a further list of transformations via ‘steps’
Drop invalid or unwanted rows, or add columns, while applying steps
Save only the desired columns
Provide a detailed diff or a summary of what changed in the phase
Attributes
- name: str
The name of the phase (for debugging and file name usage)
- steps: list
A list of functions that will be run in order on data loaded into the phase
- columns: list
A list of column definitions with declarations of how to handle the column name and data within the column. Columns are also processed in order, so a column early in the list that instructs the phase to drop rows without values will cause those rows never to be processed by columns later in the list.
- context: Context obj
Optional context information that can apply to multiple phases organized in a Pipeline. If no context is passed in, one will be created just for this Phase. The context will be passed to each step in case that step needs outside context.
- error_policy: str
The error handling policy to apply in this phase. Default is ON_ERROR_COLLECT, which collects errors, up to one per row, and reports all errors at the end of running the phase. Other options are ON_ERROR_WARN, which adds warnings that will all be reported at the end, ON_ERROR_DROP_ROW which means that a row causing an error will be dropped, and ON_ERROR_STOP_NOW which aborts the phase mid-step rather than continue and collect more errors. Any step that needs to apply different error handling than the phase’s default can throw its own typed exception (see step documentation).
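The difference between the four policies can be pictured with a small stand-alone model. This is only a sketch and not phaser's implementation: the function run_step_with_policy, the string constants, and the choice to keep a failing row under the collect and warn policies are all assumptions made for illustration.

```python
# Hypothetical stand-ins that mirror the policy names in the text above.
# This toy model does not import or reproduce phaser itself.
ON_ERROR_COLLECT = "ON_ERROR_COLLECT"
ON_ERROR_WARN = "ON_ERROR_WARN"
ON_ERROR_DROP_ROW = "ON_ERROR_DROP_ROW"
ON_ERROR_STOP_NOW = "ON_ERROR_STOP_NOW"


def run_step_with_policy(rows, step, policy=ON_ERROR_COLLECT):
    """Apply `step` to each row and handle failures according to `policy`."""
    kept, errors, warnings, dropped = [], [], [], []
    for row_num, row in enumerate(rows, start=1):
        try:
            kept.append(step(dict(row)))
        except ValueError as exc:
            if policy == ON_ERROR_STOP_NOW:
                raise  # abort immediately instead of collecting more errors
            if policy == ON_ERROR_DROP_ROW:
                dropped.append((row_num, str(exc)))  # the row is left out
            elif policy == ON_ERROR_WARN:
                warnings.append((row_num, str(exc)))
                kept.append(row)  # assumption: the row is kept unchanged
            else:  # ON_ERROR_COLLECT
                errors.append((row_num, str(exc)))
                kept.append(row)  # assumption: the row is kept unchanged
    return kept, errors, warnings, dropped


def require_positive_amount(row):
    """Example step: reject rows whose amount is not a positive integer."""
    if int(row["amount"]) <= 0:
        raise ValueError("amount must be positive")
    return row


rows = [{"amount": "5"}, {"amount": "-1"}, {"amount": "7"}]
kept, errors, warnings, dropped = run_step_with_policy(
    rows, require_positive_amount, policy=ON_ERROR_DROP_ROW
)
```

With the drop-row policy the second row is omitted from kept and recorded in dropped together with its row number and message; with the collect policy the same failure would be recorded in errors instead.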
Methods
- run(source, destination)
Loads data from source, applies all the phase’s column definitions and steps, and saves to destination. If run inside a Pipeline, the pipeline will call this, but for debugging/developing or simpler data transformations, this can be used to run the phase without a Pipeline.
- load(source)
If creating a Phase that takes data in a custom way, subclass Phase and override the load method. Apart from overriding it, users of Phase should not need to call load directly, as it is run as part of ‘run’. If overriding ‘load’, make sure that both phase.headers and phase.row_data are set up before the method returns.
- save(source)
If creating a Phase that sends data to a custom destination, subclass Phase and override the save method. If the method is not overridden, the phase will save the data in CSV format at the destination.
- execute_row_step(step, outputs={})
Internal method. Each step that is run on a row is run through this method in order to do consistent error numbering and error reporting.
- load_data(data)
Call this method to pass record-oriented data to the Phase before calling ‘run’. Can be overridden to load data in a different structure. Note that in normal operation, a Records object is passed in with Record objects and row numbers. However, if a Phase is being used in tests, testing is much easier if load_data can take a raw list of dicts and have row numbers added.
- prepare_for_save()
Checks consistency of data and drops unneeded columns.
- rename_columns()
Renames columns, both by matching on case and space (‘_’, ‘ ‘) to convert columns to the preferred label format, and by using the list of additional alternative names provided in each column definition. It would be cool if this could be done before converting everything to list-of-dicts format…
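The matching described for rename_columns can be sketched without phaser. The helpers canonical_key and rename_headers below are hypothetical names, and the exact matching rules phaser applies may differ; the sketch only shows one way to ignore case and the space/underscore distinction and to map alternative names to a preferred label.

```python
def canonical_key(label):
    """Reduce a header to a form that ignores case and space vs. underscore."""
    return label.strip().lower().replace(" ", "_")


def rename_headers(headers, preferred_names, alternates=None):
    """Map each header to its preferred label when it matches one.

    `alternates` maps a preferred name to a collection of alternative names,
    in the spirit of the `rename` parameter on a column definition.
    Headers that match nothing are returned unchanged.
    """
    alternates = alternates or {}
    lookup = {canonical_key(name): name for name in preferred_names}
    for name, alt_names in alternates.items():
        for alt in alt_names:
            lookup[canonical_key(alt)] = name
    return [lookup.get(canonical_key(header), header) for header in headers]


renamed = rename_headers(
    ["date_of_birth", "First Name", "notes"],
    preferred_names=["Date of Birth", "first_name"],
)
from_alternate = rename_headers(
    ["DOB"],
    preferred_names=["Date of Birth"],
    alternates={"Date of Birth": ["dob"]},
)
```

Here renamed holds the preferred labels for the first two headers and leaves notes untouched, while from_alternate shows an alternative name being mapped to the preferred one.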
- run()
Each kind of phase has a different process for doing its work, so this method must be overridden.
- class phaser.Context(variables=None, working_dir=None, error_policy='ON_ERROR_COLLECT', verbose=False)
Context is created by the pipeline, and passed to each phase. Thus, it can be used to carry extra data or variable values between phases if necessary.
- add_variable(name, value)
Add variables that are global to the pipeline and accessible to steps and internal methods.
- process_exception(exc, phase, step, row)
Delegates exception handling so that exceptions are turned into error reports in a standardized way. Called by the phase’s step handlers when a phaser data exception or a coding exception occurs.
- Parameters:
exc – The exception or error thrown
step – The step this occurred in
row – The row of the data this occurred in
error_policy – The phase’s chosen error handling policy (ON_ERROR_COLLECT, ON_ERROR_STOP_NOW, etc.)
- Returns:
Nothing
Columns
- class phaser.Column(name, required=True, null=True, blank=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None)
Sets up a Column instance ready to do type, format, null and default checking on values, as well as renaming the column itself to the chosen name. This column is useful for fields with strings as values.
- Parameters:
name – The preferred name/presentation of the column, e.g. “Date of Birth” or “first_name”
required – If the column is required, the phase will present errors if it is missing.
null – Checks all values of the column for null values to raise as errors.
default – A default value to apply if a column value is null. Not compatible with “null=False”
fix_value_fn – A function (string or callable) or array of functions to apply to each value
rename – A set of names that may be used in the data as column headers, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have a column with the preferred name with the same data in it. In other words, any data in a column whose name is in rename will end up in a column named name.
allowed_values – If allowed_values is not empty and a column value is not in the list, raises errors.
save – if True, column is saved at the end of the phase; if not it is omitted.
on_error – Choose from ‘warn’, ‘drop_row’, ‘collect’, ‘stop_now’ to pick how errors checking or fixing this column affect the pipeline.
- cast(value)
The basic column only fixes NaN values. Even values like “NULL” or “None” might be actual values if we don’t know the type. Subclass and override this method to cast to Python data types or custom objects.
- check_and_cast_value(row)
This checks to see if the value is there before attempting to cast it. It does some checks before casting the value to a datatype, and some other checks afterward. Most of the time, a custom algorithm for converting a value to a specific datatype can just override the simpler ‘cast’ method.
- Parameters:
row – entire row is passed for simplicity elsewhere and in case this needs more scope
- check_required(data_headers)
If this column is required, then checks the list of headers of the dataset to see if its name is there.
- Parameters:
data_headers – just the column headers found in data
- Returns:
None
- check_value(value)
Raises the chosen exception type if something is wrong with a value in the column. One can override this to use a different exception or check the value in a different way (don’t forget to call super().check_value()).
- fix_value(value)
Sets value to default if provided and appropriate, and calls any functions or methods passed as ‘fix_value_fn’. This method can also be overridden if a special column has a custom way to fix values and it’s worth subclassing Column. Just be sure to call value = super().fix_value(value) if you want to apply the logic already here.
- class phaser.BooleanColumn(name, required=True, null=False, default=None, fix_value_fn=None, rename=None, save=True, on_error=None)
Validates truthy and falsey values, as defined in TRUE_VALUES and FALSE_VALUES.
- class phaser.IntColumn(name, required=True, null=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None, min_value=None, max_value=None)
Sets up a Column instance ready to do type, format, null and default checking on values, as well as renaming the column itself to the chosen name.
- Parameters:
name – The preferred name/presentation of the column, e.g. “Date of Birth” or “first_name”
required – If the column is required, the phase will present errors if it is missing.
null – Checks all values of the column for null values to raise as errors.
default – A default value to apply if a column value is null. Not compatible with “null=False”
fix_value_fn – A function (string or callable) or array of functions to apply to each value
rename – A set of names that may be used in the data as column headers, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have a column with the preferred name with the same data in it. In other words, any data in a column whose name is in rename will end up in a column named name.
allowed_values – If allowed_values is not empty and a column value is not in the list, raises errors. To supply a range, use min_value and max_value instead. NOTE: this is checked after casting, so to check allowed values of a column that casts to int, such as IntColumn, check for values like [1, 2, 3] rather than [“1”, “2”, “3”]
save – if True, column is saved at the end of the phase; if not it is omitted.
min_value – If data is below this value, column raises errors
max_value – If data is above this value, column raises errors
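The note about allowed_values being checked after casting is easy to trip over, so here is a stand-alone sketch of the order of operations. The function check_int_value is hypothetical and only imitates the documented behaviour; it is not how IntColumn is implemented.

```python
def check_int_value(raw, allowed_values=None, min_value=None, max_value=None):
    """Cast first, then validate, mirroring the order described above."""
    value = int(raw)  # casting happens before any allowed-value check
    if allowed_values and value not in allowed_values:
        raise ValueError(f"{value!r} is not one of {allowed_values!r}")
    if min_value is not None and value < min_value:
        raise ValueError(f"{value!r} is below min_value {min_value!r}")
    if max_value is not None and value > max_value:
        raise ValueError(f"{value!r} is above max_value {max_value!r}")
    return value


# Integers in allowed_values match the cast value.
accepted = check_int_value("2", allowed_values=[1, 2, 3])

# Strings in allowed_values never match, because 2 != "2".
try:
    check_int_value("2", allowed_values=["1", "2", "3"])
    string_list_rejected = False
except ValueError:
    string_list_rejected = True

# A range is expressed with min_value and max_value instead of a list.
in_range = check_int_value("40", min_value=0, max_value=120)
```

The second call fails even though the raw input was the string "2", which is why the allowed values should be given as integers for an integer column.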
- class phaser.FloatColumn(*args, **kwargs)
Defines a column that accepts a float value. See IntColumn for parameters.
- class phaser.DateColumn(name, required=True, null=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None, min_value=None, max_value=None, date_format_code=None, default_tz=None)
A column that supports the date value only (no time). See DateTimeColumn for parameters.
- class phaser.DateTimeColumn(name, required=True, null=True, default=None, fix_value_fn=None, rename=None, allowed_values=None, save=True, on_error=None, min_value=None, max_value=None, date_format_code=None, default_tz=None)
Sets up a DateTimeColumn instance ready to do type, format, null and default checking on values, as well as renaming the column itself to the chosen name.
- Parameters:
name – The preferred name/presentation of the column, e.g. “Date of Birth” or “first_name”
required – If the column is required, the phase will present errors if it is missing.
null – Checks all values of the column for null values to raise as errors.
default – A default value to apply if a column value is null. Not compatible with “null=False”
fix_value_fn – A function (string or callable) or array of functions to apply to each value
rename – A set of names that may be used in the data as column headers, all of which should be mapped to the preferred name of this column. Upon loading the data, all rows that have columns matching any alternate name in this set will have a column with the preferred name with the same data in it. In other words, any data in a column whose name is in rename will end up in a column named name.
allowed_values – If allowed_values is not empty and a column value is not in the list, raises errors. To supply a range, use min_value and max_value instead.
save – if True, column is saved at the end of the phase; if not it is omitted.
min_value – If data is below this value, column raises errors
max_value – If data is above this value, column raises errors
date_format_code – Formatting string used by datetime.strptime to parse string to date, e.g. ‘%d/%m/%y %H:%M:%S.%f’, ‘%d/%m/%Y’ or ‘%m/%d/%y’. If left None, class will use dateutil.parser.
default_tz – If timezone is not specified in value, assume this timezone applies.
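The format strings listed for date_format_code are ordinary datetime.strptime codes, so their effect can be checked with the standard library alone. The helper parse_datetime below is a hypothetical illustration of applying a format code and then a default timezone; it does not show how the column classes do this internally, and the dateutil fallback is left out.

```python
from datetime import datetime, timezone


def parse_datetime(value, date_format_code, default_tz=None):
    """Parse `value` with strptime, then attach `default_tz` if it is naive."""
    parsed = datetime.strptime(value, date_format_code)
    if parsed.tzinfo is None and default_tz is not None:
        parsed = parsed.replace(tzinfo=default_tz)
    return parsed


# Day-first with a two-digit year, time and microseconds.
full = parse_datetime(
    "31/12/23 23:59:58.500000",
    "%d/%m/%y %H:%M:%S.%f",
    default_tz=timezone.utc,
)

# The same digits mean different dates under day-first and month-first codes.
day_first = parse_datetime("03/04/2023", "%d/%m/%Y").date()
month_first = parse_datetime("03/04/23", "%m/%d/%y").date()
```

The last two lines show why an explicit format code matters: 03/04 is 3 April under the day-first code and 4 March under the month-first code.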
Exceptions
- exception phaser.DropRowException(message, **kwargs)
Throwing this exception in a row_step will cause the current row to be dropped. Rows dropped this way will be listed in the phase results report along with a reason given in the exception constructor.
Steps
Steps are defined using decorators. A step is meant to be a function that accepts an input and produces an output.
@row_step
def add_an_id(row):
    row['id'] = next_id()
    return row
Occasionally extra sources or outputs are needed. In order to use them, they
must first be defined on the Pipeline as an ExtraRecords object or an
ExtraMapping object. The extra sources or outputs must then be declared in
the decorator as in the example below. In the example, foo is an
ExtraMapping, which presents to the function as a dictionary, and bar is an
ExtraRecords, which presents like a list.
@row_step(extra_sources=['foo'], extra_outputs=['bar'])
def do_foo_and_bar(row, foo, bar):
    row['foo'] = foo[row['id']]
    bar.append(row['bar'])
    return row
All steps can accept a parameter named context which will be the
Context object for the current run of the pipeline.
- @phaser.row_step(func=None, *, extra_sources=None, extra_outputs=None)
Used to define a step that should run on each row of a data set.
The function that is decorated should accept a dictionary as its first parameter. If extra_sources or extra_outputs are defined, then they will be passed in as explicit parameters, named according to their definition in the decorator. Sources and outputs appear to the step function as the type of object defined at the pipeline level. If a Context is required, then it can be included as a parameter, too.
The decorated function must return a dict or raise an exception.
- Parameters:
extra_sources – An array of source names
extra_outputs – An array of names of outputs
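The signature (func=None, *, ...) is the usual Python pattern for a decorator that works both bare and with keyword arguments. The stand-alone sketch below shows that pattern with a hypothetical toy_row_step; it is not phaser's row_step, and the way it receives rows and extras (a list plus keyword arguments) is an assumption chosen to keep the example runnable without the library.

```python
import functools


def toy_row_step(func=None, *, extra_sources=None, extra_outputs=None):
    """Toy decorator usable as @toy_row_step or @toy_row_step(extra_sources=[...])."""
    if func is None:
        # Called with keyword arguments only: return the real decorator.
        return functools.partial(
            toy_row_step, extra_sources=extra_sources, extra_outputs=extra_outputs
        )

    @functools.wraps(func)
    def wrapper(rows, **extras):
        results = []
        for row in rows:
            new_row = func(dict(row), **extras)  # copy so input rows stay intact
            if not isinstance(new_row, dict):
                raise TypeError(f"{func.__name__} must return a dict")
            results.append(new_row)
        return results

    # Record the declared names so a caller could look them up later.
    wrapper.extra_sources = extra_sources or []
    wrapper.extra_outputs = extra_outputs or []
    return wrapper


@toy_row_step
def upper_name(row):
    row["name"] = row["name"].upper()
    return row


@toy_row_step(extra_sources=["regions"])
def add_region(row, regions):
    row["region"] = regions[row["id"]]
    return row


rows = [{"id": 1, "name": "ada"}, {"id": 2, "name": "grace"}]
named = upper_name(rows)
with_region = add_region(named, regions={1: "EU", 2: "US"})
```

Both decorated functions are written as plain one-row functions; the wrapper supplies the loop, the extra source named in the decorator, and the check that a dict comes back.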
- @phaser.batch_step(func=None, *, extra_sources=None, extra_outputs=None, check_size=False)
Used to define a step that needs to run on the whole batch of data.
The decorated function should accept a list of dictionaries as its first parameter.
- Parameters:
extra_sources – An array of source names
extra_outputs – An array of names of outputs
check_size – A boolean indicating whether or not to validate the size of the batch after the step is run
- @phaser.context_step(func=None, *, extra_sources=None, extra_outputs=None)
Used to define a step that works on the context.
The decorated function should accept the context as its first parameter.
- Parameters:
extra_sources – An array of source names
extra_outputs – An array of names of outputs
- @phaser.dataframe_step(func=None, *, pass_row_nums=True, extra_sources=None, extra_outputs=None, check_size=False)
Used to define a step that needs to run on the whole set of data as a pandas.DataFrame.
The decorated function should accept a DataFrame as its first parameter.
- Parameters:
pass_row_nums – If True, the row numbers will be set in the DataFrame in a column named the value of PHASER_ROW_NUM
extra_sources – An array of source names
extra_outputs – An array of names of outputs
check_size – A boolean indicating whether or not to validate the size of the DataFrame after the step is run