Source code for banffprocessor.procedures.banff_procedures.prorate

import banff.exceptions
import pyarrow as pa
from banff import prorate
from banff._log import log_levels

# Import must be absolute in order to ensure all modules reference the same global _c_handlers
import banffprocessor.processor_logger as plg
from banffprocessor.exceptions import ProcessorInputParameterError
from banffprocessor.metadata.models.proratespecs import Proratespecs
from banffprocessor.nls import _
from banffprocessor.procedures import factory
from banffprocessor.processor_data import ProcessorData
from banffprocessor.util.dataset import table_empty

# Setup local log for processor module specifically
log_lcl = plg.get_processor_child_logger("prorate")

# Required metadata files = "proratespecs", "editgroups", "edits"
# Optional metadata files = "varlists"

class Prorate:
    """Implements the Prorate Banff procedure as a `:class:banffprocessor.procedures.procedure_interface`."""

    # NOTE: the trailing comma is required. `("outreject")` is just a parenthesised
    # str, so iterating it would yield single characters instead of one table name.
    output_tables: tuple[str] = ("outreject",)

    @staticmethod
    def _none_if_empty(table):  # noqa: ANN001, ANN205 - dataset type is project-defined
        """Return `table` unchanged, or None when it is None or `table_empty` reports it empty."""
        if table is None or table_empty(table):
            return None
        return table

    @classmethod
    def execute(cls, processor_data: ProcessorData) -> int:
        """Execute the banff.prorate call and return its return code.

        The procedure is run against the current "imputed_file" and "status_file"
        datasets. On success `outstatus` and `outdata` are stored on
        `processor_data`, and the new `outreject` table is saved, pushed into
        "outreject_all" and appended to any previously stored "outreject" data.

        :param processor_data: Holds the current job step, metadata objects,
            input parameters and datasets used to build the call.
        :return: `rc` of the Banff call, or the `return_code` carried by the
            `ProcedureCError` if the procedure fails during execution.
        :raises ProcessorInputParameterError: If the modifier is 'ORIGINAL' or
            'IMPUTED' but no non-empty status file is available.
        """
        # alias the param name to shorten references
        bp = processor_data
        job_step = bp.current_job_step
        prorate_spec = bp.metaobjects.get_specs_obj(Proratespecs, job_step.specid)

        # Never hand an empty table to Banff, pass None instead.
        instatus = cls._none_if_empty(bp.get_dataset("status_file"))

        # Guard against an unset modifier so that the check below cannot fail
        # with an AttributeError when there is also no status file.
        modifier_upper = (prorate_spec.modifier or "").upper()

        if instatus is None and modifier_upper in ("ORIGINAL", "IMPUTED"):
            # The message text is used as a translation key, keep it unchanged.
            msg = _("Prorate requires an instatus file for using the 'ORIGINAL' or 'IMPUTED' "
                    "modifiers. No status_file dataframe was generated by any previous steps "
                    " and no status file was provided in the input JSON file. "
                    "Jobid: {} and seqno: {}").format(job_step.jobid, job_step.seqno)
            # No exception is being handled at this point, so use error() rather than
            # exception(), which would append a meaningless "NoneType: None" traceback.
            log_lcl.error(msg)
            raise ProcessorInputParameterError(msg)

        proc_edits = None
        if job_step.editgroupid:
            proc_edits = bp.metaobjects.get_edits_string(job_step.editgroupid)

        # Imputed_File should always have data by this point, but we'll make sure to pass None
        # instead of an empty table to the banff call just to make sure we don't pass an empty table
        indata = cls._none_if_empty(bp.get_dataset("imputed_file"))

        # Form our Banff call
        try:
            banff_call = prorate(
                # Not supposed to provide these as False, only True or None
                accept_negative=job_step.acceptnegative,
                no_by_stats=bp.input_params.no_by_stats,
                prefill_by_vars=True,
                presort=True,
                lower_bound=prorate_spec.lowerbound,
                upper_bound=prorate_spec.upperbound,
                decimal=prorate_spec.decimal,
                edits=proc_edits if proc_edits else None,
                method=prorate_spec.method,
                modifier=prorate_spec.modifier,
                unit_id=bp.input_params.unit_id,
                by=" ".join(bp.by_varlist) if bp.by_varlist else None,
                indata=indata,
                instatus=instatus,
                outstatus="pyarrow",
                outdata="pyarrow",
                outreject="pyarrow",
                # We want everything captured while an input param configures the handlers
                # which indirectly filter.
                trace=log_levels.NOTSET,
                # Note that capture=None will suppress console output in new version so use False or omit
                logger=log_lcl,
                _BP_c_log_handlers=plg.get_c_handlers(),
            )
        except banff.exceptions.ProcedureCError as e:
            # The message text is used as a translation key, keep it unchanged.
            msg = _("An error occured during execution of this procedure.")
            log_lcl.exception(msg)
            # Get the return code from the exception
            return e.return_code

        outreject = banff_call.outreject
        bp.outstatus = banff_call.outstatus
        bp.outdata = banff_call.outdata

        outreject_obj = bp.get_dataset("outreject", ds_format="object")
        if outreject_obj is None:
            # No existing version, just save the new one
            curr_rejected = None
            bp.set_dataset("outreject", outreject)
        else:
            # Save the existing version for appending after updating outreject_all
            curr_rejected = outreject_obj.ds
            outreject_obj.ds = outreject

        # Update outreject_all using the new outreject. This will save jobid/seqno columns to both datasets
        bp.update_file_all("outreject", "outreject_all")

        # Append the new rejected data to the current rejected table for use in future procs
        if curr_rejected is not None:
            # As update_file_all updates .ds on outreject_obj, when we reference it again here it should
            # be pointing to the updated version from update_file_all
            outreject_obj.ds = pa.concat_tables(
                [curr_rejected, outreject_obj.ds],
                promote_options="default",
            )

        return banff_call.rc
def register(factory: factory) -> None:
    """Add the `Prorate` procedure to the given Banff processor procedure factory.

    :param factory: The procedure factory on which `register` is called with
        the name "prorate" and the `Prorate` class.
    """
    procedure_name = "prorate"
    factory.register(procedure_name, Prorate)