Source code for banff.proc.proc_estimato

import logging
from pathlib import Path

import pandas
import pyarrow

from banff.io_util import (
    GensysInputDataset,
    GensysOutputDataset,
    c_argtype_input_dataset,
    c_argtype_output_dataset,
    c_argtype_parameters,
    flag_rows_where,
)
from banff.proc import BanffProcedure

#******CLASS DEFINITIONS************************************************************

[docs] class ProcEstimato(BanffProcedure): """Performs imputation using estimation functions and/or linear regression estimators. The estimator procedure offers imputation methods such as mean, ratio and regression imputation using current (`indata`) and/or historical data (`indata_hist`) for the variable to impute and potentially auxiliary variables. Users may choose from twenty (20) pre-defined imputation estimator algorithms that are included in the procedure, or define their own custom algorithms. Only fields with an FTI (Field to Impute) from the `instatus` file are imputed. Fields with FTE (Field to Excluded) or I-- (Imputed Field) flags are excluded from the imputation model. (Note that this does not include the flag IDE, which indicates deterministic imputation.) Estimator or linear regression parameters (e.g. means or regression coefficients) can be calculated on all records or on a particular subset of acceptable records. The restriction of the acceptable records can be applied using an exclusion parameter or by specifying by-groups imputation. """ # static variables _proc_name = {"short": "estimato", "long": "Estimator"} _arg_types = [ c_argtype_parameters(), # parameters c_argtype_input_dataset(), # indata c_argtype_input_dataset(), # instatus c_argtype_input_dataset(), # indata_hist c_argtype_input_dataset(), # instatus_hist c_argtype_input_dataset(), # inalgorithm c_argtype_input_dataset(), # inestimator c_argtype_output_dataset(), # outdata c_argtype_output_dataset(), # outdata c_argtype_output_dataset(), # outstatus c_argtype_output_dataset(), # outstatus c_argtype_output_dataset(), # outacceptable c_argtype_output_dataset(), # outacceptable c_argtype_output_dataset(), # outest_ef c_argtype_output_dataset(), # outest_ef c_argtype_output_dataset(), # outest_lr c_argtype_output_dataset(), # outest_lr c_argtype_output_dataset(), # outest_parm c_argtype_output_dataset(), # outest_parm c_argtype_output_dataset(), # outrand_err c_argtype_output_dataset(), # outrand_err ] def __init__(self, # USER C code parameters unit_id: str | None = None, by: str | None = None, data_excl_var: str | None = None, hist_excl_var: str | None = None, seed: int | None = None, verify_specs: bool | None = None, accept_negative: bool | None = None, no_by_stats: bool | None = None, # USER dataset references indata: pyarrow.Table | pandas.DataFrame | Path | str | None = None, instatus: pyarrow.Table | pandas.DataFrame | Path | str | None = None, indata_hist: pyarrow.Table | pandas.DataFrame | Path | str | None = None, inalgorithm: pyarrow.Table | pandas.DataFrame | Path | str | None = None, inestimator: pyarrow.Table | pandas.DataFrame | Path | str | None = None, instatus_hist: pyarrow.Table | pandas.DataFrame | Path | str | None = None, outstatus: Path | str | None = None, outdata: Path | str | None = None, outacceptable: Path | str | None = None, outest_ef: Path | str | None = None, outest_lr: Path | str | None = None, outest_parm: Path | str | None = None, outrand_err: Path | str | None = None, # Fancy New Options presort: bool | None = None, prefill_by_vars: bool | None = None, exclude_where_indata: str | None = None, exclude_where_indata_hist: str | None = None, # super class options trace: int | bool | None = None, capture: bool | None = False, logger: logging.Logger | None = None, **kwargs, ): """Performs imputation using estimation functions and/or linear regression estimators. :param unit_id: Identify key variable (unit identifier) on indata and indata_hist. Mandatory. :type unit_id: str | None, optional :param by: Variable(s) used to partition indata into by-groups for independent processing. :type by: str | None, optional :param data_excl_var: Variable of the input table used to exclude observations from the set of acceptable observations. :type data_excl_var: str | None, optional :param hist_excl_var: Variable of the historical input table used to exclude historical observations from the set of acceptable observations. :type hist_excl_var: str | None, optional :param seed: Specify the root for the random number generator. :type seed: int | None, optional :param verify_specs: Estimator specifications verified without running the imputation. :type verify_specs: bool | None, optional :param accept_negative: Treat negative values as valid. Default=False. :type accept_negative: bool | None, optional :param no_by_stats: Reduce log output by suppressing by-group specific messages. Default=False. :type no_by_stats: bool | None, optional :param indata: Input statistical data. Mandatory. :type indata: pyarrow.Table | pandas.DataFrame | Path | str | None, optional :param instatus: Input status file containing FTI, FTE and I-- status flags. Mandatory. :type instatus: pyarrow.Table | pandas.DataFrame | Path | str | None, optional :param indata_hist: Input historical data. :type indata_hist: pyarrow.Table | pandas.DataFrame | Path | str | None, optional :param inalgorithm: User defined algorithms table. :type inalgorithm: pyarrow.Table | pandas.DataFrame | Path | str | None, optional :param inestimator: Estimator specifications table. Mandatory. :type inestimator: pyarrow.Table | pandas.DataFrame | Path | str | None, optional :param instatus_hist: Input historical status file containing FTI, FTE and I-- status flags. :type instatus_hist: pyarrow.Table | pandas.DataFrame | Path | str | None, optional :param outstatus: Output status file identifying imputed fields with I-- status flags, and their values after imputation. :type outstatus: Path | str | None, optional :param outdata: Output statistical table containing imputed data. :type outdata: Path | str | None, optional :param outacceptable: Report on acceptable observations retained to calculate parameters for each estimator. :type outacceptable: Path | str | None, optional :param outest_ef: Report on calculation of averages for estimator functions. :type outest_ef: Path | str | None, optional :param outest_lr: Report on calculation of « beta » coefficients for linear regression estimators (type LR). :type outest_lr: Path | str | None, optional :param outest_parm: Report on imputation statistics by estimator. :type outest_parm: Path | str | None, optional :param outrand_err: Random error report when a random error is added to the imputed variable. :type outrand_err: Path | str | None, optional :param presort: Sort input tables before processing, according to procedure requirements. Default=True. :type presort: bool | None, optional :param prefill_by_vars: Add by-group variable(s) to input status file(s) to improve performance. Default=True. :type prefill_by_vars: bool | None, optional :param exclude_where_indata: Exclusion expression using SQL syntax to specify which observations to exclude from the set of acceptable observations. :type exclude_where_indata: str | None, optional :param exclude_where_indata_hist: Exclusion expression using SQL syntax to specify which historical observations to exclude from the set of acceptable observations. :type exclude_where_indata_hist: str | None, optional :param trace: Control which log levels are included when using the default logger. :type trace: int | bool | None, optional :param capture: Configure how console output is displayed. :type capture: bool | None, optional :param logger: Custom logger to use for procedure execution. :type logger: logging.Logger | None, optional """ # noqa: D401,E501 # USER C code parameters parm_dict = {} parm_dict["unit_id"] = unit_id parm_dict["by"] = by parm_dict["data_excl_var"] = data_excl_var parm_dict["hist_excl_var"] = hist_excl_var parm_dict["seed"] = seed parm_dict["verify_specs"] = verify_specs parm_dict["accept_negative"] = accept_negative parm_dict["no_by_stats"] = no_by_stats self.c_parms = parm_dict # INTERNAL dataset components (they store USER datasets/output specifications) self._inalgorithm = GensysInputDataset("inalgorithm", inalgorithm) self._indata = GensysInputDataset("indata", indata) self._indata_hist = GensysInputDataset("indata_hist", indata_hist) self._inestimator = GensysInputDataset("inestimator", inestimator) self._instatus = GensysInputDataset("instatus", instatus) self._instatus_hist = GensysInputDataset("instatus_hist", instatus_hist) if verify_specs is True: # all output datasets (including mandatory ones) disabled when `verify_specs` is `True` self._outacceptable = GensysOutputDataset("outacceptable", output_specification=False, mandatory=False) self._outdata = GensysOutputDataset("outdata", output_specification=False, mandatory=False) self._outest_ef = GensysOutputDataset("outest_ef", output_specification=False, mandatory=False) self._outest_lr = GensysOutputDataset("outest_lr", output_specification=False, mandatory=False) self._outest_parm = GensysOutputDataset("outest_parm", output_specification=False, mandatory=False) self._outrand_err = GensysOutputDataset("outrand_err", output_specification=False, mandatory=False) self._outstatus = GensysOutputDataset("outstatus", output_specification=False, mandatory=False) else: self._outacceptable = GensysOutputDataset("outacceptable", outacceptable, mandatory=False) self._outdata = GensysOutputDataset("outdata", outdata) self._outest_ef = GensysOutputDataset("outest_ef", outest_ef, mandatory=False) self._outest_lr = GensysOutputDataset("outest_lr", outest_lr, mandatory=False) self._outest_parm = GensysOutputDataset("outest_parm", outest_parm, mandatory=False) self._outrand_err = GensysOutputDataset("outrand_err", outrand_err, mandatory=False) self._outstatus = GensysOutputDataset("outstatus", outstatus) # call super constructor super().__init__( trace=trace, capture=capture, logger=logger, input_datasets=[ self._indata, self._instatus, self._indata_hist, self._inalgorithm, self._inestimator, self._instatus_hist, ], output_datasets=[ self._outstatus, self._outdata, self._outacceptable, self._outest_ef, self._outest_lr, self._outest_parm, self._outrand_err, ], presort=presort, prefill_by_vars=prefill_by_vars, exclude_where_indata = exclude_where_indata, exclude_where_indata_hist = exclude_where_indata_hist, keyword_args=kwargs, ) ##### property methods @property def indata(self): return self._get_input_dataset(self._indata) @indata.setter def indata(self, value): self._set_input_dataset(ds=self._indata, value=value) @property def instatus(self): return self._get_input_dataset(self._instatus) @instatus.setter def instatus(self, value): self._set_input_dataset(ds=self._instatus, value=value) @property def indata_hist(self): return self._get_input_dataset(self._indata_hist) @indata_hist.setter def indata_hist(self, value): self._set_input_dataset(ds=self._indata_hist, value=value) @property def inalgorithm(self): return self._get_input_dataset(self._inalgorithm) @inalgorithm.setter def inalgorithm(self, value): self._set_input_dataset(ds=self._inalgorithm, value=value) @property def inestimator(self): return self._get_input_dataset(self._inestimator) @inestimator.setter def inestimator(self, value): self._set_input_dataset(ds=self._inestimator, value=value) @property def instatus_hist(self): return self._get_input_dataset(self._instatus_hist) @instatus_hist.setter def instatus_hist(self, value): self._set_input_dataset(ds=self._instatus_hist, value=value) @property def outstatus(self): return self._get_output_dataset(self._outstatus) @outstatus.setter def outstatus(self, value): self._set_output_dataset(ds=self._outstatus, value=value) @property def outdata(self): return self._get_output_dataset(self._outdata) @outdata.setter def outdata(self, value): self._set_output_dataset(ds=self._outdata, value=value) @property def outacceptable(self): return self._get_output_dataset(self._outacceptable) @outacceptable.setter def outacceptable(self, value): self._set_output_dataset(ds=self._outacceptable, value=value) @property def outest_ef(self): return self._get_output_dataset(self._outest_ef) @outest_ef.setter def outest_ef(self, value): self._set_output_dataset(ds=self._outest_ef, value=value) @property def outest_lr(self): return self._get_output_dataset(self._outest_lr) @outest_lr.setter def outest_lr(self, value): self._set_output_dataset(ds=self._outest_lr, value=value) @property def outest_parm(self): return self._get_output_dataset(self._outest_parm) @outest_parm.setter def outest_parm(self, value): self._set_output_dataset(ds=self._outest_parm, value=value) @property def outrand_err(self): return self._get_output_dataset(self._outrand_err) @outrand_err.setter def outrand_err(self, value): self._set_output_dataset(ds=self._outrand_err, value=value) def _call_c_code(self): return self._cproc_func( self._parm_dict, self._indata.c_arg, self._instatus.c_arg, self._indata_hist.c_arg, self._instatus_hist.c_arg, self._inalgorithm.c_arg, self._inestimator.c_arg, self._outdata.c_schema, self._outdata.c_array, self._outstatus.c_schema, self._outstatus.c_array, self._outacceptable.c_schema, self._outacceptable.c_array, self._outest_ef.c_schema, self._outest_ef.c_array, self._outest_lr.c_schema, self._outest_lr.c_array, self._outest_parm.c_schema, self._outest_parm.c_array, self._outrand_err.c_schema, self._outrand_err.c_array, ) def _pp_exclude_where_indata(self): """Flag rows for exclusion, if matching user-specified criteria. Adds "exclusion" flag to indata rows matching user-provided `exclude_where_indata`. Based on Banff Processor 1.x `generateDonorImputation.sas` """ self._indata.ds_intermediate, self.c_parms["data_excl_var"] = flag_rows_where( self._indata.ds_intermediate, where_stmt=self._exclude_where_indata, ) def _pp_exclude_where_indata_hist(self): """Flag rows for exclusion, if matching user-specified criteria. Adds "exclusion" flag to indata_hist rows matching user-provided `exclude_where_indata_hist`. Based on Banff Processor 1.x `generateEstimator.sas` """ self._indata_hist.ds_intermediate, self.c_parms["hist_excl_var"] = flag_rows_where( self._indata_hist.ds_intermediate, where_stmt=self._exclude_where_indata_hist, )