banffprocessor package#

Subpackages#

Submodules#

banffprocessor.processor module#

class banffprocessor.processor.Processor(input_params: ProcessorInput | None = None, initial_data: ProcessorData | None = None, dbconn: DuckDBPyConnection | None = None, indata: Table | DataFrame | None = None, indata_aux: Table | DataFrame | None = None, indata_hist: Table | DataFrame | None = None, instatus: Table | DataFrame | None = None, instatus_hist: Table | DataFrame | None = None)[source]#

Bases: object

Main class for the Banff processor.

Attribute processor_data:

Contains all the datasets and parameters required to run a Banff Processor job

Attribute is_child_block:

Whether the job running in this processor instance is a child of another currently running processor instance that is executing the parent block.

property dbconn: DuckDBPyConnection | None#

The currently connected database used to store some processor data.

Returns:

The DuckDBPyConnection currently being used to store data.

Return type:

duckdb.DuckDBPyConnection | None

execute() None[source]#

Execute the Banff Processor.

Iterates over the banffprocessor.metadata.Jobs entries in the processor_data metaobjects collection and performs the respective Banff or user-defined procedures.

Raises:

ProcedureReturnCodeError – If the return code returned by a Banff or user-defined procedure is non-zero

classmethod from_file(input_filepath: str | Path, indata: Table | DataFrame | None = None, indata_aux: Table | DataFrame | None = None, indata_hist: Table | DataFrame | None = None, instatus: Table | DataFrame | None = None, instatus_hist: Table | DataFrame | None = None, dbconn: DuckDBPyConnection | None = None) Processor[source]#

Initialize a banffprocessor.processor.Processor object from a JSON file.

Parameters:
  • input_filepath (str | pathlib.Path) – The full path to the JSON file containing the input parameters required to run the processor. The containing folder will be used as the default location for required and optional files for the operation of the processor. If a value is provided for an alternate filepath/folder parameter in this file, that will be used instead of the containing folder.

  • indata (pyarrow.Table | pandas.DataFrame | None, optional) – The indata dataset, defaults to None

  • indata_aux (pyarrow.Table | pandas.DataFrame | None, optional) – The indata_aux dataset, defaults to None

  • indata_hist (pyarrow.Table | pandas.DataFrame | None, optional) – The indata_hist dataset, defaults to None

  • instatus (pyarrow.Table | pandas.DataFrame | None, optional) – The instatus dataset, defaults to None

  • instatus_hist (pyarrow.Table | pandas.DataFrame | None, optional) – The instatus_hist dataset, defaults to None

  • dbconn (duckdb.DuckDBPyConnection | None, optional) – A DuckDBPyConnection to use for storing required data and metadata. If not provided an in-memory DB will be instantiated, defaults to None

Returns:

The banffprocessor.processor.Processor object created using the specified parameters

Return type:

banffprocessor.processor.Processor

is_child_block: bool#
processor_data: ProcessorData#
save_outputs() None[source]#

Call the save_outputs method for the current job’s processor_data.
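Example usage (an illustrative sketch: the JSON path and indata columns are hypothetical, and a real job defines its required files and metadata):

    import pandas as pd
    from banffprocessor.processor import Processor

    # Hypothetical in-memory input data; a real job's metadata defines the
    # columns it actually requires.
    indata = pd.DataFrame({"ident": ["R1", "R2"], "Q1": [100.0, None]})

    # Build the processor from a JSON parameter file, run every job step
    # defined in the metadata, then write the outputs.
    processor = Processor.from_file("C:/my_job/input_params.json", indata=indata)
    processor.execute()
    processor.save_outputs()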

banffprocessor.processor.get_args(args: list | str | None = None) ArgumentParser[source]#

Create an argument parser.

Example args: ["my_filename.xlsx", "-o", "/my/out/folder", "-l", "fr"]
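A minimal sketch of parsing those example arguments with the returned parser:

    from banffprocessor.processor import get_args

    parser = get_args()
    ns = parser.parse_args(["my_filename.xlsx", "-o", "/my/out/folder", "-l", "fr"])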

banffprocessor.processor.init() None[source]#

Call the main function.

Used when running this module from the command line. Created to facilitate testing.

banffprocessor.processor.main(iargs: list | str | None = None) None[source]#

Call the Banff Processor function.

Used when running this module from the command line. Created to facilitate testing.
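For example, the same arguments accepted on the command line can be passed programmatically (a sketch; the filename is illustrative):

    from banffprocessor.processor import main

    main(["my_filename.xlsx", "-o", "/my/out/folder", "-l", "fr"])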

banffprocessor.processor.write_to_console(text: str) None[source]#

Write text to the console.

Writes the text to the console rather than to the log file. Kept in its own function so the output mechanism can be changed in one place if a better approach is found.

banffprocessor.processor_data module#

class banffprocessor.processor_data.ProcessorData(input_params: ProcessorInput | None = None, dbconn: DuckDBPyConnection | None = None)[source]#

Bases: object

Helper class for the Processor that simplifies passing data to user procedures.

Attribute _datasets:

A collection of datasets for the current job

Attribute input_params:

The object containing the input parameters from the user’s input JSON file

Attribute metaobjects:

The collection of metadata objects

Attribute by_varlist:

The list of by variable fieldIDs for the current job step

Attribute current_job_step:

The banffprocessor.metadata.models.Jobs object for the current job step being run

Attribute current_uservars:

A mapping of var to value attributes for the banffprocessor.metadata.models.Uservars objects for the current job step

Attribute custom_outputs:

The table names that are requested to be output from a proc used in the job

Attribute total_job_duration:

Used for tracking runtime when performing execute()

Attribute curr_step_count:

The current step in the absolute sequence of all job steps.

apply_process_controls() None[source]#

Apply a set of process controls for a single controlid in controls according to their specifications and set the filtered dataset(s) on processor_data.

by_varlist: list[str]#
clean_status_file() None[source]#

Implement cleanStatusAll.sas. Removes all extra columns from the statusAll file.

clear_filtered_data() None[source]#

Clear all filtered datasets from the dataset objects in the _datasets collection.

Called once a proc has finished executing, since the filtered data is no longer needed.

curr_step_count: int#
current_job_step: Jobs#
current_uservars: dict[str, str]#
custom_outputs: list[str]#
property datasets: dict[str, Dataset]#

Return a dictionary of datasets being tracked by the ProcessorData object.

property dbconn: DuckDBPyConnection#

Return a connection to the database being used.

get_dataset(name: str, create_if_not_exist: bool = False, ds_format: str = 'pyarrow') Table | DataFrame | Dataset | None[source]#

Get a dataset by name (case-insensitive) in format ds_format.

If no dataset is found, returns None, unless create_if_not_exist is True, in which case a new dataset holding an empty pyarrow Table is created under name and returned.

Parameters:
  • name (str) – The name of the dataset to retrieve

  • create_if_not_exist (bool, optional) – Creates a dataset with name if no existing one was found, defaults to False

  • ds_format (str, optional) – A string with the name of the format to return the dataset in. Possible options are “pyarrow” for a pyarrow Table, “pandas” for a pandas DataFrame, and “object” for a banffprocessor.util.Dataset object, defaults to “pyarrow”

Returns:

The dataset under name if it is found or newly created when create_if_not_exist is True; None otherwise

Return type:

pa.Table | pd.DataFrame | banffprocessor.util.dataset.Dataset | None
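Example (a sketch assuming pdata is an existing ProcessorData instance and "mydata" is a registered dataset name):

    tbl = pdata.get_dataset("mydata")                     # pyarrow.Table (default)
    df = pdata.get_dataset("mydata", ds_format="pandas")  # pandas.DataFrame
    ds = pdata.get_dataset("mydata", ds_format="object")  # Dataset wrapper object

    # A missing name returns None unless create_if_not_exist is True, which
    # registers an empty pyarrow Table under that name and returns it.
    scratch = pdata.get_dataset("scratch", create_if_not_exist=True)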

get_new_block_copy() ProcessorData[source]#

Return a new ProcessorData object with the necessary attributes copied in from this ProcessorData object to spawn a new processor block.

Returns:

A new ProcessorData object with the required input parameters, metadata and datasets copied in.

Return type:

ProcessorData

property indata: Table#

The indata dataset for a Banff procedure.

Getter:

Returns the imputed_file input dataset

Setter:

Sets the imputed_file input dataset

Type:

pa.Table

input_params: ProcessorInput#
property instatus: Table#

The instatus dataset for a Banff procedure.

Getter:

Returns the status_file dataset

Type:

pa.Table

load_dataset_from_file(data_file: str | Path) Table[source]#

Load a Pyarrow Table from the data file indicated by the data_file parameter.

If data_file contains a full filepath, the file will be retrieved using this filepath. If data_file only contains a filename, the processor_input.input_folder is used as the containing directory to fetch from.

Parameters:

data_file (str | Path) – The full filepath or filename of the data file to load

Raises:
  • ProcessorInputParameterError – If an empty filename is given, or only a filename is given but no input_folder was previously specified in the processor_input member object.

  • Exception – If any other issue prevents the file from being read

Returns:

The data_file as loaded into a PyArrow Table

Return type:

pa.Table
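Example (a sketch assuming pdata is an existing ProcessorData instance; the filename is hypothetical):

    # A bare filename is resolved against the configured input folder;
    # a full filepath is used as-is.
    table = pdata.load_dataset_from_file("indata.parq")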

metaobjects: MetaObjects#
property minimal_outputs: list[str]#

Return the minimal list of names of tables output by default.

property outdata: Table#

The outdata dataset of a procedure.

Required to be set for a procedure to update the imputed_file on completion.

Getter:

Returns the outdata dataset

Setter:

Sets the outdata dataset

Type:

pa.Table

output_required(output_name: str) bool[source]#

Return true if the output is required for the current job step.

property outstatus: Table#

The outstatus dataset for a Banff procedure.

Required to be set for a procedure to update the status_file dataset on completion.

Getter:

Returns the outstatus dataset

Setter:

Sets the outstatus dataset

Type:

pa.Table

pop_dataset(name: str) Dataset | None[source]#

Remove the banffprocessor.util.dataset.Dataset object found under name (case-insensitive) from the dataset collection and return the object if it exists.

Returns None if the dataset is not found.

Parameters:

name (str) – The name of the dataset to pop

Returns:

The pop()’d dataset, None if not found

Return type:

banffprocessor.util.dataset.Dataset | None

save_outputs() None[source]#

Save all current tables in _datasets to the output folder.

Each dataset’s name is used as the filename; the extension is determined by the input parameters. The save_format input parameter is checked first, then the file type of indata. If neither is provided, or the file type is unrecognized, .parq is used by default.

save_proc_output(banff_call: BanffProcedure, all_tables: list[str]) None[source]#

Save procedure output datasets.

Append the non-default tables from banff_call (the list of which is set on processor_data by the main processor loop) to their respective cumulative datasets in the dataset collection, based on the output type parameters set in input_params.

Parameters:
  • banff_call (banff.proc.BanffProcedure) – A BanffProcedure call object that has completed execution and contains the output tables as attributes named as they are specified in processor_to_proc

  • all_tables (list[str]) – The list of all possible table names available on banff_call, required if process_output_type is ALL

Raises:
  • MetadataConstraintError – If a user’s ProcessOutputs metadata contains an unrecognized output_name

  • AttributeError – If the processor_to_proc mapping contains a table name that is not found as an attribute on the banff_call object.

set_dataset(name: str, ds: Table | DataFrame | Dataset) None[source]#

Set ds as an entry in the dataset collection under name (case-insensitive).

If a dataset under name already exists, ds will be saved as the current output version of the name dataset and will be used to update the cumulative version when the current job step completes. If name identifies a dataset that is produced by default, ds instead directly overwrites the cumulative version, and no current output version is saved.

Parameters:
  • name (str) – The name of the dataset to set

  • ds (pyarrow.Table | pandas.DataFrame | banffprocessor.util.dataset.Dataset) – The dataset to set
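Example (a sketch registering an in-memory table under a hypothetical name, assuming pdata is an existing ProcessorData instance):

    import pyarrow as pa

    tbl = pa.table({"ident": ["A1", "A2"], "value": [10.0, 20.0]})
    pdata.set_dataset("mydata", tbl)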

set_dataset_from_file(name: str, data_file: str | Path) None[source]#

Create a pa.Table from the data file at data_file and load it into the dataset collection under name (case-insensitive).

If data_file only contains a filename and not a full path, the input parameters are used to determine the appropriate directory to look in. Any existing dataset of the same name is replaced.

Parameters:
  • name (str) – The name to identify the resultant dataset by

  • data_file (str | Path) – The full filepath or just filename of the data file to create a dataset from

property status_file: Table#

The status_file dataset for a Banff job.

Getter:

Returns the status_file dataset

Setter:

Sets the status_file dataset

Type:

pa.Table

total_job_duration: datetime#
property total_job_steps: int | None#

Return the total number of job steps for the current job.

update_cumulatives(pre_execute_tables: set[str]) None[source]#

Update every non-default dataset in the dataset collection with its respective ds_curr_output version, if the output is required by the configured input parameters and metadata.

Parameters:

pre_execute_tables (set[str]) – The set of table names in the dataset collection prior to the execution of the current job step

update_file_all(dataset: str | Table | Dataset, dataset_all_name: str | None = None) None[source]#

Update the respective cumulative version of dataset using the contents of the non-cumulative version after adding the current jobid and seqno to it.

If dataset is given as a Dataset object or as the name of an object to fetch, the object may contain both the cumulative and curr_output versions. If it contains only the cumulative version, the jobid and seqno columns are still added to it if they are not yet present. In this case, the jobid and seqno column additions are also saved back to the non-cumulative dataset if dataset_all_name is separately provided; otherwise the changes to the non-cumulative dataset are discarded and only the updated cumulative version is saved.

Parameters:
  • dataset (str | pa.Table | banffprocessor.util.dataset.Dataset) – Either the name of a dataset to fetch, the Dataset object itself, or a specific pyarrow Table. If provided as a pyarrow Table, no changes are saved back to this dataset and dataset_all_name must be provided as well.

  • dataset_all_name (str | None, optional) – The name of the cumulative dataset to update using dataset. If not provided, the non-cumulative dataset’s name is used, defaults to None

Raises:

ValueError – If dataset is given as a pa.Table but no dataset_all_name is given.

update_imputed_file() None[source]#

Implement the non-‘prebanff’ section of updateImputedFile.sas.

Updates the imputed_file with the data found in outdata. Records from outdata without a matching index value in imputed_file are not added; only matching records are updated. imputed_file is updated in place.

update_status() None[source]#

Append the outstatus dataset to status_file and update status_file where outstatus shares the same index values.

Also calls update_file_all() to update status_log with outstatus.

update_with_block_result(block_data: ProcessorData) None[source]#

Update this ProcessorData object with the datasets found in block_data, as it exists upon completion of a JOB process that calls a Process Block.

Parameters:

block_data (ProcessorData) – The ProcessorData object containing the results from a Process Block execution.

banffprocessor.processor_data.ds_to_format(ds: Table | DataFrame, ret_format: str) Table | DataFrame | None[source]#

Convert ds to the format named by ret_format.

ret_format currently supports these values:

For a pandas DataFrame: [“pandas”, “dataframe”, “pandas dataframe”]

For a PyArrow Table: [“pyarrow”, “table”, “pyarrow table”]

Parameters:
  • ds (pa.Table | pd.DataFrame) – The dataset to convert

  • ret_format (str) – The name of the format to convert to.

Raises:
  • Exception – If an error occurs during the conversion of the dataset.

  • ValueError – If ret_format is not a recognized format string.

Returns:

A new dataset containing the data of ds converted into ret_format, None if ds is None

Return type:

pa.Table | pd.DataFrame | None
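Example (a sketch round-tripping a small table between the two supported formats):

    import pyarrow as pa
    from banffprocessor.processor_data import ds_to_format

    tbl = pa.table({"x": [1, 2, 3]})
    df = ds_to_format(tbl, "pandas")    # pyarrow.Table -> pandas.DataFrame
    tbl2 = ds_to_format(df, "pyarrow")  # pandas.DataFrame -> pyarrow.Table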

banffprocessor.processor_input module#

class banffprocessor.processor_input.ProcessOutputType(*values)[source]#

Bases: Enum

Represents the different sets of outputs the processor should create for a job.

ALL = 2#
CUSTOM = 3#
MINIMAL = 1#
class banffprocessor.processor_input.ProcessorInput(job_id: str, unit_id: str | None = None, input_folder: str | Path | None = None, indata_filename: str | Path | None = None, auxdata_filename: str | Path | None = None, indata_aux_filename: str | Path | None = None, histdata_filename: str | Path | None = None, indata_hist_filename: str | Path | None = None, histstatus_filename: str | Path | None = None, instatus_hist_filename: str | Path | None = None, instatus_filename: str | Path | None = None, user_plugins_folder: str | Path | None = None, metadata_folder: str | Path | None = None, process_output_type: str | int | None = None, seed: int | None = None, no_by_stats: str | bool | None = None, randnumvar: str | None = None, save_format: list[str] | None = None, output_folder: str | Path | None = None, log_level: int | None = None)[source]#

Bases: object

Holds input parameters from a user’s JSON input file used to configure a Banff Processor job.

classmethod from_file(filepath: str | Path) ProcessorInput[source]#

Initialize a banffprocessor.processor_input.ProcessorInput object from a JSON file.

Parameters:

filepath (str | Path) – The full path to the JSON file containing the input parameters required to run the processor.

Raises:
  • ProcessorInputParameterError – If the filepath does not contain a valid directory or JSON filename

  • FileNotFoundError – If the file at filepath cannot be found

Returns:

The object loaded from the parameters in the JSON file

Return type:

banffprocessor.processor_input.ProcessorInput
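Example (a sketch; the path is illustrative):

    from banffprocessor.processor import Processor
    from banffprocessor.processor_input import ProcessorInput

    # Load the parameters separately, then hand them to a Processor.
    params = ProcessorInput.from_file("C:/my_job/input_params.json")
    processor = Processor(input_params=params)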

banffprocessor.processor_input.get_path_val(fpath: str | Path | None) Path | None[source]#

Return the Path representation of fpath or None if fpath is None or empty.

Parameters:

fpath (str | Path | None) – The filepath to convert

Returns:

The Path representation of fpath

Return type:

Path | None
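For example:

    from banffprocessor.processor_input import get_path_val

    get_path_val("C:/my_job")  # Path("C:/my_job")
    get_path_val("")           # None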

banffprocessor.processor_input.get_string_param_value(parameter_to_check: str | None) str | None[source]#

Process string values from the parameter file.

Parameters:

parameter_to_check (str | None) – The string value to process

Returns:

None if parameter_to_check is None, empty, or only whitespace; otherwise the original string with leading and trailing whitespace trimmed

Return type:

str | None
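For example:

    from banffprocessor.processor_input import get_string_param_value

    get_string_param_value("  my value  ")  # "my value"
    get_string_param_value("   ")           # None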

Module contents#

banffprocessor.set_language(new_lang: SupportedLanguage = SupportedLanguage.en) None[source]#

Set the language used for console and log messages.

For new_lang, specify a value from enum banffprocessor.SupportedLanguage.
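Example (a sketch; the fr member is assumed from the language option shown in the get_args example above):

    import banffprocessor
    from banffprocessor import SupportedLanguage

    banffprocessor.set_language(SupportedLanguage.fr)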