# Source code for banff.proc.proc_donorimp

import logging
from pathlib import Path

import pandas
import pyarrow

from banff.io_util import (
    GensysInputDataset,
    GensysOutputDataset,
    c_argtype_input_dataset,
    c_argtype_output_dataset,
    c_argtype_parameters,
    flag_rows_where,
)
from banff.nls import _
from banff.proc import BanffProcedure


#******CLASS DEFINITIONS************************************************************
class ProcDonorimp(BanffProcedure):
    """Performs nearest neighbour donor imputation such that each imputed record satisfies the specified post-imputation edits.

    The donorimp procedure splits records into recipients (records requiring imputation) and
    donors (records not requiring imputation that pass the edits). For each recipient, the
    procedure performs the following steps:

    1. From the fields in the edits, a subset are chosen as matching fields for the purpose of
       distance calculations. This selection can vary depending on which fields require
       imputation. Users can also specify must-match fields (`must_match`), which will
       automatically be included in distance calculations alongside the system-selected ones.
    2. Matching fields are transformed into normalized ranks to remove the effect of scale and
       clustering from the data. Without this transformation, original data with wide ranges,
       such as dollar values, would always dominate the distance calculation.
    3. Distances between the recipient and donors are calculated using an L-infinity norm on the
       transformed matching fields. This is sometimes referred to as the minimax distance
       because the closest donor is the one with the smallest maximum absolute difference
       between the transformed values of its matching fields and those of the recipient.
    4. From the donors, a search algorithm is used to efficiently find the closest donor whose
       values allow the recipient record to pass the user-specified post-imputation edits
       (`post_edits`). These are typically a more relaxed form of the edits to ensure a donor
       can be found.

    *Note: The Banff distance metric will usually select different donors than a typical
    Euclidean distance metric. This is by design. When using Euclidean distance metrics, scale
    differences and skewed distributions in economic data typically result in a distance metric
    that is dominated by a single field such as revenue. The Banff distance metric ensures that
    all matching fields are given the same weight in the distance calculation.*

    Recipients are defined as any record with at least one field within the edits requiring
    imputation, as indicated by an FTI (Field to Impute) flag on the input status (`instatus`)
    file. Donors are defined as any record satisfying all the edits that is not a recipient. The
    donorimp procedure requires a set of edits; for a version of donor imputation that does not,
    please see the `massimp` procedure.

    There are a number of ways to exclude records or values from the donor pool. Records can be
    excluded using the `exclude_where_indata` or `data_excl_var` parameters. This does not
    exclude them from the procedure completely; they may still be included as recipients if they
    require imputation. Records that have previously been imputed can also be excluded from the
    donor pool using the `eligdon` (eligible donor) parameter. The parameter `n_limit` will limit
    the number of times a single donor is used for imputation. Users may sometimes identify
    values that do not require imputation, but are sufficiently unusual that they should not be
    donated to other records; these should be flagged as FTE (Field to Exclude) on the instatus
    file.

    The Banff distance metric does not accommodate categorical variables. Instead, users may
    create by-groups by specifying `by` variables. These by-groups act as imputation classes.
    Use the `min_donors` and `percent_donors` parameters to ensure an appropriate number or ratio
    of recipients and donors exist in each imputation class before performing imputation.
    """

    # static variables
    _proc_name = {"short": "donorimp", "long": "Donor Imputation"}
    # One entry per positional argument handed to the C function in `_call_c_code`;
    # every output dataset contributes two entries (its c_schema, then its c_array).
    _arg_types = [
        c_argtype_parameters(),      # parameters
        c_argtype_input_dataset(),   # indata
        c_argtype_input_dataset(),   # instatus
        c_argtype_output_dataset(),  # outdata (schema)
        c_argtype_output_dataset(),  # outdata (array)
        c_argtype_output_dataset(),  # outstatus (schema)
        c_argtype_output_dataset(),  # outstatus (array)
        c_argtype_output_dataset(),  # outdonormap (schema)
        c_argtype_output_dataset(),  # outdonormap (array)
        c_argtype_output_dataset(),  # outmatching_fields (schema)
        c_argtype_output_dataset(),  # outmatching_fields (array)
    ]

    def __init__(
        self,
        # USER C code parameters
        unit_id: str | None = None,
        by: str | None = None,
        must_match: str | None = None,
        data_excl_var: str | None = None,
        rand_num_var: str | None = None,
        random: bool | None = None,
        seed: int | None = None,
        edits: str | None = None,
        post_edits: str | None = None,
        display_level: int | None = None,
        accept_negative: bool | None = None,
        no_by_stats: bool | None = None,
        min_donors: int | None = None,
        percent_donors: float | None = None,
        n: int | None = None,
        eligdon: str | None = None,
        n_limit: int | None = None,
        mrl: float | None = None,
        # USER dataset references
        indata: pyarrow.Table | pandas.DataFrame | Path | str | None = None,
        instatus: pyarrow.Table | pandas.DataFrame | Path | str | None = None,
        outdata: Path | str | None = None,
        outstatus: Path | str | None = None,
        outdonormap: Path | str | None = None,
        outmatching_fields: Path | str | None = None,
        # Fancy New Options
        presort: bool | None = None,
        prefill_by_vars: bool | None = None,
        exclude_where_indata: str | None = None,
        # super class options
        trace: int | bool | None = None,
        capture: bool | None = False,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """Performs nearest neighbour donor imputation such that each imputed record satisfies the specified post-imputation edits.

        :param unit_id: Identify key variable (unit identifier) on indata. Mandatory.
        :type unit_id: str | None, optional
        :param by: Variable(s) used to partition indata into by-groups for independent processing.
        :type by: str | None, optional
        :param must_match: List of variables on indata to be used in distance calculations.
        :type must_match: str | None, optional
        :param data_excl_var: Identify donor exclusion variable on indata.
        :type data_excl_var: str | None, optional
        :param rand_num_var: Identify random number variable on indata.
        :type rand_num_var: str | None, optional
        :param random: Randomly select donors for recipients without matching fields.
        :type random: bool | None, optional
        :param seed: Specify a seed for the random number generator.
        :type seed: int | None, optional
        :param edits: List of consistency edits. Mandatory.
        :type edits: str | None, optional
        :param post_edits: List of edits a recipient must satisfy after imputation.
        :type post_edits: str | None, optional
        :param display_level: Value (0 or 1) to request detail output to the log in relation to the random number variable. Default=0.
        :type display_level: int | None, optional
        :param accept_negative: Treat negative values as valid. Default=False.
        :type accept_negative: bool | None, optional
        :param no_by_stats: Reduces log output by suppressing by-group specific messages. Default=False.
        :type no_by_stats: bool | None, optional
        :param min_donors: Minimum number of donors needed in a by-group to perform imputation.
        :type min_donors: int | None, optional
        :param percent_donors: Minimum percentage of donors needed in a by-group to perform imputation.
        :type percent_donors: float | None, optional
        :param n: Maximum number of donors to try for a given recipient.
        :type n: int | None, optional
        :param eligdon: Identify eligible donors as 'ANY' (include all potential donors) or 'ORIGINAL' (exclude records with imputed values).
        :type eligdon: str | None, optional
        :param n_limit: Limit the number of times a donor can be used.
        :type n_limit: int | None, optional
        :param mrl: Multiplier ratio limit used to limit number of times a donor can be used.
        :type mrl: float | None, optional
        :param indata: Input statistical data. Mandatory.
        :type indata: pyarrow.Table | pandas.DataFrame | Path | str | None, optional
        :param instatus: Input status file containing FTI, FTE and I-- status flags. Mandatory.
        :type instatus: pyarrow.Table | pandas.DataFrame | Path | str | None, optional
        :param outdata: Output statistical table containing imputed data.
        :type outdata: Path | str | None, optional
        :param outstatus: Output status file identifying imputed fields with IDN status flags, and their values after imputation.
        :type outstatus: Path | str | None, optional
        :param outdonormap: Output table of recipient-donor pairs for successfully imputed records.
        :type outdonormap: Path | str | None, optional
        :param outmatching_fields: Output table of matching fields for each recipient.
        :type outmatching_fields: Path | str | None, optional
        :param presort: Sorts input tables before processing, according to procedure requirements. Default=True.
        :type presort: bool | None, optional
        :param prefill_by_vars: Adds by-group variable(s) to input status file(s) to improve performance. Default=True.
        :type prefill_by_vars: bool | None, optional
        :param exclude_where_indata: Donor exclusion expression using SQL syntax.
        :type exclude_where_indata: str | None, optional
        :param trace: Control which log levels are included when using the default logger.
        :type trace: int | bool | None, optional
        :param capture: Configure how console output is displayed.
        :type capture: bool | None, optional
        :param logger: Custom logger to use for procedure execution.
        :type logger: logging.Logger | None, optional
        """  # noqa: D401,E501
        # USER C code parameters, keyed by the option name
        self.c_parms = {
            "unit_id": unit_id,
            "by": by,
            "must_match": must_match,
            "data_excl_var": data_excl_var,
            "rand_num_var": rand_num_var,
            "random": random,
            "seed": seed,
            "edits": edits,
            "post_edits": post_edits,
            "display_level": display_level,
            "accept_negative": accept_negative,
            "no_by_stats": no_by_stats,
            "min_donors": min_donors,
            "percent_donors": percent_donors,
            "n": n,
            "eligdon": eligdon,
            "n_limit": n_limit,
            "mrl": mrl,
        }

        # INTERNAL dataset components (they store USER datasets/output specifications)
        self._indata = GensysInputDataset("indata", indata)
        self._instatus = GensysInputDataset("instatus", instatus)

        self._outdata = GensysOutputDataset("outdata", outdata)
        self._outdonormap = GensysOutputDataset("outdonormap", outdonormap)
        self._outstatus = GensysOutputDataset("outstatus", outstatus)
        # the only output declared as neither mandatory nor requested by default
        self._outmatching_fields = GensysOutputDataset(
            "outmatching_fields",
            outmatching_fields,
            mandatory=False,
            requested_by_default=False,
        )

        # call super constructor
        super().__init__(
            trace=trace,
            capture=capture,
            logger=logger,
            input_datasets=[
                self._indata,
                self._instatus,
            ],
            output_datasets=[
                self._outdata,
                self._outstatus,
                self._outdonormap,
                self._outmatching_fields,
            ],
            presort=presort,
            prefill_by_vars=prefill_by_vars,
            exclude_where_indata=exclude_where_indata,
            keyword_args=kwargs,
        )

    ##### property methods
    @property
    def indata(self):
        """Get or set the `indata` input dataset."""
        return self._get_input_dataset(self._indata)

    @indata.setter
    def indata(self, value):
        self._set_input_dataset(ds=self._indata, value=value)

    @property
    def instatus(self):
        """Get or set the `instatus` input dataset."""
        return self._get_input_dataset(self._instatus)

    @instatus.setter
    def instatus(self, value):
        self._set_input_dataset(ds=self._instatus, value=value)

    @property
    def outdata(self):
        """Get or set the `outdata` output dataset."""
        return self._get_output_dataset(self._outdata)

    @outdata.setter
    def outdata(self, value):
        self._set_output_dataset(ds=self._outdata, value=value)

    @property
    def outstatus(self):
        """Get or set the `outstatus` output dataset."""
        return self._get_output_dataset(self._outstatus)

    @outstatus.setter
    def outstatus(self, value):
        self._set_output_dataset(ds=self._outstatus, value=value)

    @property
    def outmatching_fields(self):
        """Get or set the `outmatching_fields` output dataset."""
        return self._get_output_dataset(self._outmatching_fields)

    @outmatching_fields.setter
    def outmatching_fields(self, value):
        self._set_output_dataset(ds=self._outmatching_fields, value=value)

    @property
    def outdonormap(self):
        """Get or set the `outdonormap` output dataset."""
        return self._get_output_dataset(self._outdonormap)

    @outdonormap.setter
    def outdonormap(self, value):
        self._set_output_dataset(ds=self._outdonormap, value=value)

    def _call_c_code(self):
        """Invoke the procedure's C function and return its result.

        Arguments are passed positionally: the parameters, the two input datasets,
        then a (c_schema, c_array) pair for each of the four output datasets.
        """
        # NOTE(review): `_parm_dict` and `_cproc_func` are not assigned in this class;
        # presumably the base class prepares them (from `c_parms`) - confirm.
        return self._cproc_func(
            self._parm_dict,
            self._indata.c_arg,
            self._instatus.c_arg,
            self._outdata.c_schema,
            self._outdata.c_array,
            self._outstatus.c_schema,
            self._outstatus.c_array,
            self._outdonormap.c_schema,
            self._outdonormap.c_array,
            self._outmatching_fields.c_schema,
            self._outmatching_fields.c_array,
        )

    def _pp_exclude_where_indata(self):
        """Flag rows for exclusion, if matching user-specified criteria.

        Adds "exclusion" flag to indata rows matching user-provided `exclude_where_indata`.
        Based on Banff Processor 1.x `generateDonorImputation.sas`
        """
        # `flag_rows_where` returns a pair: the first element replaces the intermediate
        # indata, the second is stored as the `data_excl_var` C parameter.
        # NOTE(review): this overwrites any `data_excl_var` passed to the constructor
        # unless `flag_rows_where` accounts for it - confirm.
        self._indata.ds_intermediate, self.c_parms["data_excl_var"] = flag_rows_where(
            self._indata.ds_intermediate,
            where_stmt=self._exclude_where_indata,
        )

    def _validate_deprecations(self, log, keyword_args):
        """Procedure specific check for use of deprecated options.

        Runs the superclass check first, then rejects the `match_field_stat` option.

        :param log: Logger forwarded to the superclass check and to `_get_stack_logger`.
        :param keyword_args: Extra keyword arguments received by the constructor.
        :raises DeprecationWarning: If `match_field_stat` is present in `keyword_args`.
        """
        super()._validate_deprecations(log=log, keyword_args=keyword_args)

        log_lcl = self._get_stack_logger(log)

        # Version 3.1.1b12: 'match_field_stat' flag replaced with optional output dataset 'outmatching_fields'
        if "match_field_stat" in keyword_args:
            mesg = _("Option `{}` is deprecated, use `{}` instead").format("match_field_stat", "outmatching_fields")
            log_lcl.error(mesg)
            raise DeprecationWarning(mesg)