banffprocessor package#
Subpackages#
- banffprocessor.exceptions package
- banffprocessor.metadata package
- Subpackages
- banffprocessor.metadata.models package
- Submodules
- banffprocessor.metadata.models.algorithms module
- banffprocessor.metadata.models.donorspecs module
- banffprocessor.metadata.models.editgroups module
- banffprocessor.metadata.models.edits module
- banffprocessor.metadata.models.errorlocspecs module
- banffprocessor.metadata.models.estimators module
- banffprocessor.metadata.models.estimatorspecs module
- banffprocessor.metadata.models.expressions module
- banffprocessor.metadata.models.jobs module
- banffprocessor.metadata.models.massimputationspecs module
- banffprocessor.metadata.models.metadataclass module
- banffprocessor.metadata.models.outlierspecs module
- banffprocessor.metadata.models.processcontrols module
- banffprocessor.metadata.models.processoutputs module
- banffprocessor.metadata.models.proratespecs module
- banffprocessor.metadata.models.uservars module
- banffprocessor.metadata.models.varlists module
- banffprocessor.metadata.models.verifyeditsspecs module
- banffprocessor.metadata.models.weights module
- Module contents
- Submodules
- banffprocessor.metadata.metaobjects module
MetaObjects
MetaObjects.add_objects_of_single_type()
MetaObjects.check_constraints()
MetaObjects.cleanup_metadata()
MetaObjects.dbconn
MetaObjects.display_load_summary()
MetaObjects.get_algorithm()
MetaObjects.get_edits_string()
MetaObjects.get_estimators()
MetaObjects.get_expression()
MetaObjects.get_job_steps()
MetaObjects.get_objects_of_type()
MetaObjects.get_process_controls()
MetaObjects.get_process_outputs()
MetaObjects.get_specs_obj()
MetaObjects.get_user_vars_dict()
MetaObjects.get_varlist_fieldids()
MetaObjects.get_weights_string()
MetaObjects.initialize_metadata()
MetaObjects.job_proc_names
MetaObjects.load_xml_file()
MetaObjects.total_job_steps
MetaObjects.validate_job_sequence()
- Module contents
- banffprocessor.nls package
- banffprocessor.procedures package
- Subpackages
- banffprocessor.procedures.banff_procedures package
- Submodules
- banffprocessor.procedures.banff_procedures.deterministic module
- banffprocessor.procedures.banff_procedures.donorimputation module
- banffprocessor.procedures.banff_procedures.editstats module
- banffprocessor.procedures.banff_procedures.errorloc module
- banffprocessor.procedures.banff_procedures.estimator module
- banffprocessor.procedures.banff_procedures.job_proc module
- banffprocessor.procedures.banff_procedures.massimputation module
- banffprocessor.procedures.banff_procedures.outlier module
- banffprocessor.procedures.banff_procedures.prorate module
- banffprocessor.procedures.banff_procedures.verifyedits module
- Module contents
- Submodules
- banffprocessor.procedures.factory module
- banffprocessor.procedures.loader module
- banffprocessor.procedures.procedure_interface module
- Module contents
- banffprocessor.processor_logger package
- banffprocessor.util package
Submodules#
banffprocessor.processor module#
- class banffprocessor.processor.Processor(input_params: ProcessorInput | None = None, initial_data: ProcessorData | None = None, dbconn: DuckDBPyConnection | None = None, indata: Table | DataFrame | None = None, indata_aux: Table | DataFrame | None = None, indata_hist: Table | DataFrame | None = None, instatus: Table | DataFrame | None = None, instatus_hist: Table | DataFrame | None = None)[source]#
Bases:
object
Main class for the Banff processor.
- Attribute processor_data:
Contains all the datasets and parameters required to run a Banff Processor job
- Attribute is_child_block:
Whether the job running in this processor instance is a child of another currently running processor instance that is executing the parent block
- property dbconn: DuckDBPyConnection | None#
The currently connected database used to store some processor data.
- Returns:
The DuckDBPyConnection currently being used to store data.
- Return type:
duckdb.DuckDBPyConnection | None
- execute() None [source]#
Execute the Banff Processor.
Iterates over the
banffprocessor.metadata.Jobs
entries in the processor_data metaobjects collection and performs the respective Banff or user-defined procedures.
- Raises:
ProcedureReturnCodeError – If the return code returned by a Banff or user-defined procedure is non-zero
- classmethod from_file(input_filepath: str | Path, indata: Table | DataFrame | None = None, indata_aux: Table | DataFrame | None = None, indata_hist: Table | DataFrame | None = None, instatus: Table | DataFrame | None = None, instatus_hist: Table | DataFrame | None = None, dbconn: DuckDBPyConnection | None = None) Processor [source]#
Initialize a
src.banffprocessor.processor.Processor
object from a JSON file.
- Parameters:
input_filepath (str | pathlib.Path) – The full path to the JSON file containing the input parameters required to run the processor. The containing folder will be used as the default location for required and optional files for the operation of the processor. If a value is provided for an alternate filepath/folder parameter in this file, that will be used instead of the containing folder.
indata (pyarrow.Table | pandas.DataFrame | None, optional) – The indata dataset, defaults to None
indata_aux (pyarrow.Table | pandas.DataFrame | None, optional) – The indata_aux dataset, defaults to None
indata_hist (pyarrow.Table | pandas.DataFrame | None, optional) – The indata_hist dataset, defaults to None
instatus (pyarrow.Table | pandas.DataFrame | None, optional) – The instatus dataset, defaults to None
instatus_hist (pyarrow.Table | pandas.DataFrame | None, optional) – The instatus_hist dataset, defaults to None
dbconn (duckdb.DuckDBPyConnection | None, optional) – A DuckDBPyConnection to use for storing required data and metadata. If not provided, an in-memory DB will be instantiated, defaults to None
- Returns:
The
src.banffprocessor.processor.Processor
object created using the specified parameters
- Return type:
src.banffprocessor.processor.Processor
- is_child_block: bool#
- processor_data: ProcessorData#
- banffprocessor.processor.get_args(args: list | str | None = None) ArgumentParser [source]#
Create an argument parser.
Example args -> ["my_filename.xlsx", "-o", "/my/out/folder", "-l", "fr"]
- banffprocessor.processor.init() None [source]#
Call the main function.
Used when running this module from the command line. Created to facilitate testing.
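A minimal end-to-end sketch of this module's intended use; the parameter file path is hypothetical and assumes a valid job definition exists alongside it:

```python
from banffprocessor.processor import Processor

# Hypothetical parameter file; its folder becomes the default location
# for the metadata and data files used by the job.
processor = Processor.from_file("C:/my_job/processor_input.json")

# Runs each Jobs metadata entry in sequence; raises
# ProcedureReturnCodeError if any procedure returns a non-zero code.
processor.execute()
```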
banffprocessor.processor_data module#
- class banffprocessor.processor_data.ProcessorData(input_params: ProcessorInput | None = None, dbconn: DuckDBPyConnection | None = None)[source]#
Bases:
object
Helper class for the Processor that simplifies passing data to user procs.
- Attribute _datasets:
A collection of datasets for the current job
- Attribute input_params:
The object containing the input parameters from the user’s input JSON file
- Attribute metaobjects:
The collection of metadata objects
- Attribute by_varlist:
The list of by-variable fieldIDs for the current job step
- Attribute current_job_step:
The
src.banffprocessor.metadata.models.Jobs
object for the current job step being run
- Attribute current_uservars:
A mapping of var to value attributes for the
src.banffprocessor.metadata.models.Uservars
objects for the current job step
- Attribute custom_outputs:
The names of the tables requested as output from a proc used in the job
- Attribute total_job_duration:
Used for tracking runtime when performing execute()
- Attribute curr_step_count:
The current step in the absolute sequence of all job steps.
- apply_process_controls() None [source]#
Apply a set of process controls for a single controlid in controls according to their specifications and set the filtered dataset(s) in processor_data.
- by_varlist: list[str]#
- clean_status_file() None [source]#
Implement cleanStatusAll.sas: remove all extra columns from the statusAll file.
- clear_filtered_data() None [source]#
Clear all filtered datasets from the dataset objects in the _datasets collection.
Use once a proc has finished executing, as the filtered data is no longer needed.
- curr_step_count: int#
- current_uservars: dict[str, str]#
- custom_outputs: list[str]#
- property datasets: dict[str, Dataset]#
Return a dictionary of datasets being tracked by the ProcessorData object.
- property dbconn: DuckDBPyConnection#
Return a connection to the database being used.
- get_dataset(name: str, create_if_not_exist: bool = False, ds_format: str = 'pyarrow') Table | DataFrame | Dataset | None [source]#
Get a dataset by name (case-insensitive) in format ds_format.
If no dataset is found, returns None, unless create_if_not_exist is True, in which case a new dataset is created under name with an empty pyarrow Table and returned.
- Parameters:
name (str) – The name of the dataset to retrieve
create_if_not_exist (bool, optional) – Creates a dataset with name if no existing one was found, defaults to False
ds_format (str, optional) – A string with the name of the format to return the dataset in. Possible options are “pyarrow” for a pyarrow Table, “pandas” for a pandas DataFrame, and “object” for a
banffprocessor.util.Dataset
object, defaults to “pyarrow”
- Returns:
The dataset under name if it is found or create_if_not_exist is set; None if not
- Return type:
pa.Table | pd.DataFrame | banffprocessor.util.dataset.Dataset | None
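A short sketch of the lookup behaviour (assuming, for illustration, that a ProcessorData can be constructed with default arguments; set_dataset is documented below):

```python
import pyarrow as pa
from banffprocessor.processor_data import ProcessorData

processor_data = ProcessorData()  # in-memory DB, no input parameters

# Register a small pyarrow Table, then fetch it back as a pandas
# DataFrame; names are matched case-insensitively.
processor_data.set_dataset("mydata", pa.table({"ident": ["R01"], "x": [1.0]}))
df = processor_data.get_dataset("MYDATA", ds_format="pandas")

# Unknown names return None unless create_if_not_exist is True, in
# which case an empty pyarrow Table is created and returned.
empty = processor_data.get_dataset("not_there", create_if_not_exist=True)
```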
- get_new_block_copy() ProcessorData [source]#
Return a new ProcessorData object with the necessary attributes copied in from this ProcessorData object to spawn a new processor block.
- Returns:
A new ProcessorData object with the required input parameters, metadata and datasets copied in.
- Return type:
ProcessorData
- property indata: Table#
The indata dataset for a Banff procedure.
- Getter:
Returns the imputed_file input dataset
- Setter:
Sets the imputed_file input dataset
- Type:
pa.Table
- input_params: ProcessorInput#
- property instatus: Table#
The instatus dataset for a Banff procedure.
- Getter:
Returns the status_file dataset
- Type:
pa.Table
- load_dataset_from_file(data_file: str | Path) Table [source]#
Load a Pyarrow Table from the data file indicated by the data_file parameter.
If data_file contains a full filepath, the file will be retrieved using this filepath. If data_file only contains a filename, the processor_input.input_folder is used as the containing directory to fetch from.
- Parameters:
data_file (str | Path) – The full filepath or filename of the data file to load
- Raises:
ProcessorInputParameterError – If an empty filename is given, or only a filename when no input_folder was previously specified in the processor_input member object.
Exception – If some other issue prevents the file from being read
- Returns:
The data_file as loaded into a PyArrow Table
- Return type:
pa.Table
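For example (processor_data stands in for an existing ProcessorData instance; both file paths are hypothetical):

```python
from pathlib import Path

# A full filepath is used as-is.
table = processor_data.load_dataset_from_file(Path("C:/data/indata.parquet"))

# A bare filename is resolved against the configured input_folder;
# ProcessorInputParameterError is raised if no folder was specified.
table = processor_data.load_dataset_from_file("indata.parquet")
```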
- metaobjects: MetaObjects#
- property minimal_outputs: list[str]#
Return the minimal list of names of tables output by default.
- property outdata: Table#
The outdata dataset of a procedure.
Required to be set for a procedure to update the imputed_file on completion.
- Getter:
Returns the outdata dataset
- Setter:
Sets the outdata dataset
- Type:
pa.Table
- output_required(output_name: str) bool [source]#
Return True if the output is required for the current job step.
- property outstatus: Table#
The outstatus dataset for a Banff procedure.
Required to be set for a procedure to update the status_file dataset on completion.
- Getter:
Returns the outstatus dataset
- Setter:
Sets the outstatus dataset
- Type:
pa.Table
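To illustrate how outdata and outstatus drive the update cycle, a hypothetical user procedure body (the real interface is defined in banffprocessor.procedures.procedure_interface; all names here are illustrative):

```python
import pyarrow as pa

def my_user_proc(processor_data) -> int:
    """Hypothetical user procedure: read inputs, then publish outputs."""
    indata: pa.Table = processor_data.indata      # current imputed_file
    instatus: pa.Table = processor_data.instatus  # current status_file

    # ... derive new values and status flags here ...
    new_data, new_status = indata, instatus

    # Assigning outdata/outstatus is what allows the processor to update
    # imputed_file and status_file when this job step completes.
    processor_data.outdata = new_data
    processor_data.outstatus = new_status
    return 0  # a non-zero return code raises ProcedureReturnCodeError
```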
- pop_dataset(name: str) Dataset | None [source]#
Remove the
banffprocessor.util.dataset.Dataset
object found under name (case-insensitive) from the dataset collection and return the object if it exists.
Returns None if the dataset is not found.
- Parameters:
name (str) – The name of the dataset to pop
- Returns:
The pop()’d dataset, None if not found
- Return type:
banffprocessor.util.dataset.Dataset | None
- save_outputs() None [source]#
Save all current tables in _datasets to the output folder.
Each dataset’s name will be used as the filename; the extension is determined by the input parameters: the save_format input parameter is checked first, then the file type of indata. If neither is provided or the file type is unrecognized, .parq is used by default.
- save_proc_output(banff_call: BanffProcedure, all_tables: list[str]) None [source]#
Save procedure output datasets.
Append the non-default tables from banff_call (the list of which is set on processor_data by the main processor loop) to their respective cumulative datasets in the dataset collection, based on the output type parameters set in input_params.
- Parameters:
banff_call (
banff.proc.BanffProcedure
) – A BanffProcedure call object that has completed execution and contains the output tables as attributes named as they are specified in processor_to_proc
all_tables (list[str]) – The list of all possible table names available on banff_call, required if process_output_type is ALL
- Raises:
MetadataConstraintError – If a user’s ProcessOutputs metadata contains an unrecognized output_name
AttributeError – If the processor_to_proc mapping contains a table name that is not found as an attribute on the banff_call object.
- set_dataset(name: str, ds: Table | DataFrame | Dataset) None [source]#
Set ds as an entry in the dataset collection under name (case-insensitive).
If a dataset under name already exists, ds will be saved as the current output version of the name dataset and will be used to update the cumulative version when the current job step completes. If name identifies a dataset that is produced by default, ds instead directly overwrites the cumulative version, and no current output version is saved.
- Parameters:
name (str) – The name of the dataset to set
ds (pyarrow.Table | pandas.DataFrame | banffprocessor.util.dataset.Dataset) – The dataset to set
- set_dataset_from_file(name: str, data_file: str | Path) None [source]#
Create a
pa.Table
from the data file at data_file and load it into the dataset collection under the name name (case-insensitive).
If data_file only contains a filename and not a full path, the input parameters are used to determine the appropriate directory to look in. Any existing dataset of the same name is replaced.
- Parameters:
name (str) – The name to identify the resultant dataset by
data_file (str | Path) – The full filepath or just filename of the data file to create a dataset from
- property status_file: Table#
The status_file dataset for a Banff job.
- Getter:
Returns the status_file dataset
- Setter:
Sets the status_file dataset
- Type:
pa.Table
- total_job_duration: datetime#
- property total_job_steps: int | None#
Return the total number of job steps for the current job.
- update_cumulatives(pre_execute_tables: set[str]) None [source]#
Update every non-default dataset in the dataset collection with its respective ds_curr_output version, if the output is required by the configured input parameters and metadata.
- Parameters:
pre_execute_tables (set[str]) – The set of table names in the dataset collection prior to the execution of the current job step
- update_file_all(dataset: str | Table | Dataset, dataset_all_name: str | None = None) None [source]#
Update the respective cumulative version of dataset using the contents of the non-cumulative version after adding the current jobid and seqno to it.
If dataset is given as a Dataset object or the name of the object to fetch, the object may contain both the cumulative and curr_output versions. If it only contains the cumulative version, the jobid and seqno columns are still added to it if they are not yet present. In this case, the jobid and seqno column additions are also saved back to the non-cumulative dataset if dataset_all_name is separately provided. Otherwise the changes to the non-cumulative dataset are discarded and only the updated cumulative version is saved.
- Parameters:
dataset (str | pa.Table | banffprocessor.util.dataset.Dataset) – Either the name of a dataset to fetch, the Dataset object itself, or a specific pyarrow table. If provided as a pyarrow table, no changes are saved back to this dataset and dataset_all_name must be provided as well.
dataset_all_name (str | None, optional) – The name of the dataset to update using dataset.
If not provided the non-cumulative dataset’s name is used, defaults to None
- Raises:
ValueError – If dataset is given as a pa.Table but no dataset_all_name is given.
- update_imputed_file() None [source]#
Implement the non-‘prebanff’ section of updateImputedFile.sas.
Updates the imputed_file with the data found in outdata. Does not add any records from outdata without a matching index value in imputed_file; only matching records are updated. imputed_file is updated in place.
- update_status() None [source]#
Append the outstatus dataset to status_file and update status_file where outstatus shares the same index values.
Also calls update_file_all() to update status_log with outstatus.
- update_with_block_result(block_data: ProcessorData) None [source]#
Update this ProcessorData object with the datasets found in block_data as it stands upon completion of a JOB process that calls a Process Block.
- Parameters:
block_data (ProcessorData) – The ProcessorData object containing the results from a Process Block execution.
- banffprocessor.processor_data.ds_to_format(ds: Table | DataFrame, ret_format: str) Table | DataFrame | None [source]#
Convert ds to the format named by ret_format.
- ret_format currently supports these values:
For a pandas DataFrame: [“pandas”, “dataframe”, “pandas dataframe”]
For a PyArrow Table: [“pyarrow”, “table”, “pyarrow table”]
- Parameters:
ds (pa.Table | pd.DataFrame | Dataset) – The dataset to convert
ret_format (str) – The name of the format to convert to.
- Raises:
Exception – If an error occurs during the conversion of the dataset.
ValueError – If ret_format is not a recognized format string.
- Returns:
A new dataset containing the data of ds converted into ret_format, None if ds is None
- Return type:
pa.Table | pd.DataFrame | None
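For example, round-tripping a small dataset between the two supported formats:

```python
import pandas as pd
from banffprocessor.processor_data import ds_to_format

df = pd.DataFrame({"ident": ["R01", "R02"], "x": [1.0, 2.0]})

# "pyarrow", "table" and "pyarrow table" all select a pyarrow Table...
table = ds_to_format(df, "pyarrow")

# ...while "pandas", "dataframe" and "pandas dataframe" convert back.
df_again = ds_to_format(table, "pandas")
```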
banffprocessor.processor_input module#
- class banffprocessor.processor_input.ProcessOutputType(*values)[source]#
Bases:
Enum
Represents the different sets of outputs the processor should create for a job.
- ALL = 2#
- CUSTOM = 3#
- MINIMAL = 1#
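A small sketch confirming the member values listed above:

```python
from banffprocessor.processor_input import ProcessOutputType

# The integer values mirror the enum members documented above.
assert ProcessOutputType.MINIMAL.value == 1
assert ProcessOutputType.ALL.value == 2
assert ProcessOutputType.CUSTOM.value == 3
```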
- class banffprocessor.processor_input.ProcessorInput(job_id: str, unit_id: str | None = None, input_folder: str | Path | None = None, indata_filename: str | Path | None = None, auxdata_filename: str | Path | None = None, indata_aux_filename: str | Path | None = None, histdata_filename: str | Path | None = None, indata_hist_filename: str | Path | None = None, histstatus_filename: str | Path | None = None, instatus_hist_filename: str | Path | None = None, instatus_filename: str | Path | None = None, user_plugins_folder: str | Path | None = None, metadata_folder: str | Path | None = None, process_output_type: str | int | None = None, seed: int | None = None, no_by_stats: str | bool | None = None, randnumvar: str | None = None, save_format: list[str] | None = None, output_folder: str | Path | None = None, log_level: int | None = None)[source]#
Bases:
object
Holds input parameters from a user’s JSON input file used to configure a Banff Processor job.
- classmethod from_file(filepath: str | Path) ProcessorInput [source]#
Initialize a
src.banffprocessor.processor_input.ProcessorInput
object from a JSON file.
- Parameters:
filepath (str | Path) – the full path to the JSON file containing the input parameters required to run the processor.
- Raises:
ProcessorInputParameterError – If the filepath does not contain a valid directory or JSON filename
FileNotFoundError – If the file at filepath cannot be found
- Returns:
The object loaded from the parameters in the JSON file
- Return type:
src.banffprocessor.processor_input.ProcessorInput
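A minimal sketch, assuming the JSON keys mirror the constructor parameters shown above (the values are illustrative):

```python
import json
from pathlib import Path
from banffprocessor.processor_input import ProcessorInput

# Write a minimal parameter file; job_id is the only parameter without
# a default in the constructor signature above.
Path("processor_input.json").write_text(
    json.dumps({"job_id": "MYJOB", "unit_id": "ident"}),
)

input_params = ProcessorInput.from_file("processor_input.json")
```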
- banffprocessor.processor_input.get_path_val(fpath: str | Path | None) Path | None [source]#
Return the Path representation of fpath or None if fpath is None or empty.
- Parameters:
fpath (str | Path | None) – The filepath to convert
- Returns:
The Path representation of fpath
- Return type:
Path | None
- banffprocessor.processor_input.get_string_param_value(parameter_to_check: str | None) str | None [source]#
Process string values from the parameter file.
- Parameters:
parameter_to_check (str | None) – The string value to process
- Returns:
None if parameter_to_check is None, empty, or only whitespace; otherwise the original string with leading and trailing whitespace trimmed
- Return type:
str | None
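Both helpers are small enough to illustrate in a few lines:

```python
from pathlib import Path
from banffprocessor.processor_input import get_path_val, get_string_param_value

assert get_path_val(None) is None               # None/empty stays None
assert get_path_val("out/folder") == Path("out/folder")

assert get_string_param_value("   ") is None    # whitespace-only -> None
assert get_string_param_value("  fr ") == "fr"  # trimmed at both ends
```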
Module contents#
- banffprocessor.set_language(new_lang: SupportedLanguage = SupportedLanguage.en) None [source]#
Set the language used for console and log messages.
For new_lang, specify a value from enum banffprocessor.SupportedLanguage.
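For example (SupportedLanguage.fr is assumed to exist, matching the "fr" option shown in the get_args example above):

```python
import banffprocessor
from banffprocessor import SupportedLanguage

# Switch console and log messages to French; the documented default is
# SupportedLanguage.en.
banffprocessor.set_language(SupportedLanguage.fr)
```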