import copy
from datetime import datetime
from pathlib import Path
from typing import ClassVar
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from banff import io_util
from banff.io_util import handle_arrow_string_data
from banff.io_util import type_converters as tc
from banff.proc import BanffProcedure
import banffprocessor.processor_logger as plg
from banffprocessor.exceptions import MetadataConstraintError, ProcessControlError, ProcessorInputParameterError, UserDefinedPluginError
from banffprocessor.metadata.metaobjects import MetaObjects
from banffprocessor.metadata.models.jobs import Jobs
from banffprocessor.metadata.models.processcontrols import ProcessControlType
from banffprocessor.nls import _
from banffprocessor.processor_input import ProcessorInput
from banffprocessor.util.dataset import Dataset, add_single_value_column, copy_table, get_dataset_real_name, table_empty
log_lcl = plg.get_processor_child_logger("processor_data")
class ProcessorData:
"""Helper class for the Processor in order to allow easier passing of data to user procs.
:attribute _datasets: A collection of datasets for the current job
:type _datasets: dict[str, :class:`banffprocessor.util.dataset.Dataset`]
:attribute input_params: The object containing the input parameters from the user's input JSON file
:type input_params: :class:`src.banffprocessor.processor_input.ProcessorInput`
:attribute metaobjects: The collection of metadata objects
:type metaobjects: :class:`src.banffprocessor.metadata.MetaObjects`
:attribute by_varlist: The list of by variable fieldIDs for the current job step
:type by_varlist: list[str]
:attribute current_job_step: The :class:`src.banffprocessor.metadata.models.Jobs` object for the
current job step being run
:type current_job_step: :class:`src.banffprocessor.metadata.models.Jobs`
:attribute current_uservars: A mapping of `var` to `value` attributes for the
:class:`src.banffprocessor.metadata.models.Uservars` objects for the current job step
:type current_uservars: dict[str,str]
:attribute custom_outputs: The table names that are requested to be output from a proc used in the job
:type custom_outputs: list[str]
:attribute total_job_duration: Used for tracking runtime when performing execute()
:type total_job_duration: datetime
:attribute curr_step_count: The current step in the absolute sequence of all job steps.
:type curr_step_count: int
"""
_datasets: dict[str, Dataset]
input_params: ProcessorInput
metaobjects: MetaObjects
by_varlist: list[str]
current_job_step: Jobs
current_uservars: dict[str,str]
custom_outputs: list[str]
_minimal_outputs: ClassVar[list[str]] = ["imputed_file", "status_file", "status_log",
"outreject", "outreject_all", "time_store"]
total_job_duration: datetime
curr_step_count: int
@property
def total_job_steps(self) -> int | None:
"""Return the total number of job steps for the current job."""
return self.metaobjects.total_job_steps if self.metaobjects else None
@property
def dbconn(self) -> duckdb.DuckDBPyConnection:
"""Return a connection to the database being used."""
return self._dbconn
@property
def minimal_outputs(self) -> list[str]:
"""Return the minimal list of names of tables output by default."""
return self._minimal_outputs
@property
def datasets(self) -> dict[str, Dataset]:
"""Return a dictionary of datasets being tracked by the ProcessorData object."""
return self._datasets
def __init__(self, input_params: ProcessorInput | None = None,
dbconn: duckdb.DuckDBPyConnection | None = None) -> None:
"""Construct an instance of ProcessorData.
:param input_params: The object containing the input parameters from the user's
input JSON file
:type input_params: :class:`src.banffprocessor.processor_input.ProcessorInput`
:param dbconn: An existing DuckDB connection to use. If not provided, a private
in-memory connection is created and closed when this object is deleted.
:type dbconn: duckdb.DuckDBPyConnection, optional
"""
self.input_params = input_params
self.by_varlist = []
self._datasets = {}
self._filtered_data = {}
self.current_job_step = None
self.metaobjects = None
self.custom_outputs = None
self.total_job_duration = None
# We increment the step count at the beginning of every job step in the execute() loop
self.curr_step_count = 0
self._dbconn = dbconn
if self._dbconn is None:
self._dbconn = duckdb.connect(database=":memory:")
self._is_private_connection = True
else:
self._is_private_connection = False
def __del__(self) -> None:
del self.metaobjects
if self._is_private_connection:
self._dbconn.close()
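# A minimal construction sketch (hypothetical caller code, for illustration only):
#
#     conn = duckdb.connect(database=":memory:")
#     proc_data = ProcessorData(input_params=my_input_params, dbconn=conn)
#
# When no connection is supplied, a private in-memory connection is created here and
# closed again in __del__; an externally supplied connection is left open.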
@property
def indata(self) -> pa.Table:
"""The indata dataset for a Banff procedure.
:getter: Returns the imputed_file input dataset
:setter: Sets the imputed_file input dataset
:type: pa.Table
"""
return self.get_dataset("indata", create_if_not_exist=True)
@indata.setter
def indata(self, value: pa.Table | pd.DataFrame | Dataset) -> None:
self.set_dataset("indata", value)
# instatus is an alias for status_file; it is intended to provide easy access
# for reading the status file and producing a different output.
@property
def instatus(self) -> pa.Table:
"""The instatus dataset for a Banff procedure.
:getter: Returns the status_file dataset
:type: pa.Table
"""
return self.status_file
@instatus.setter
def instatus(self, value: pa.Table | pd.DataFrame | Dataset) -> None:
self.status_file = value
@property
def status_file(self) -> pa.Table:
"""The status_file dataset for a Banff job.
:getter: Returns the status_file dataset
:setter: Sets the status_file dataset
:type: pa.Table
"""
return self.get_dataset("status_file", create_if_not_exist=True)
@status_file.setter
def status_file(self, value: pa.Table | pd.DataFrame | Dataset) -> None:
self.set_dataset("status_file", value)
# Outdata and Outstatus are only for outputs from user procs, otherwise the
# built-in banff procs update imputed_file/status_file directly and
# save their respective outdata/status files as specially named files
@property
def outdata(self) -> pa.Table:
"""The outdata dataset of a procedure.
Required to be set for a procedure to update the imputed_file on completion.
:getter: Returns the outdata dataset
:setter: Sets the outdata dataset
:type: pa.Table
"""
return self.get_dataset("outdata")
@outdata.setter
def outdata(self, value: pa.Table | pd.DataFrame) -> None:
self.set_dataset("outdata", value)
@property
def outstatus(self) -> pa.Table:
"""The outstatus dataset for a Banff procedure.
Required to be set for a procedure to update the status_file dataset on completion.
:getter: Returns the outstatus dataset
:setter: Sets the outstatus dataset
:type: pa.Table
"""
return self.get_dataset("outstatus")
@outstatus.setter
def outstatus(self, value: pa.Table | pd.DataFrame) -> None:
self.set_dataset("outstatus", value)
def output_required(self, output_name : str) -> bool:
"""Return true if the output is required for the current job step."""
std_name = get_dataset_real_name(output_name)
return (std_name in self.minimal_outputs or self.input_params.output_all or
(std_name in self.custom_outputs))
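# Example (hypothetical table name): output_required("outdonormap") is True only if
# output_all is set or the name appears in custom_outputs (the step's ProcessOutputs
# metadata); tables in _minimal_outputs are always considered required.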
def get_dataset(self, name: str, create_if_not_exist: bool = False,
ds_format: str = "pyarrow") -> pa.Table | pd.DataFrame | Dataset | None:
"""Get a dataset by `name` (case-insensitive) in format `ds_format`.
If no dataset is found, returns `None`, unless `create_if_not_exist` is True, in which
case a new dataset is created under `name` with an empty pyarrow Table and that new
Table is returned.
:param name: The name of the dataset to retrieve
:type name: str
:param create_if_not_exist: Creates a dataset with `name` if no existing one
was found, defaults to False
:type create_if_not_exist: bool, optional
:param ds_format: A string with the name of the format to return the dataset in.
Possible options are "pyarrow" for a pyarrow Table, "pandas" for a pandas
DataFrame, and "object" for a :class:`banffprocessor.util.Dataset` object, defaults to "pyarrow"
:type ds_format: str, optional
:return: The dataset with `name` if it is found or `create_if_not_exist` is set,
`None` if not
:rtype: `pa.Table` | `pd.DataFrame` | banffprocessor.util.dataset.Dataset | None
"""
std_name = get_dataset_real_name(name)
# Load a historical/auxiliary file if one was provided but has not yet been read from disk
if(std_name not in self._datasets):
if(std_name == "indata_hist" and self.input_params.indata_hist_filename is not None):
self.set_dataset_from_file(std_name, self.input_params.indata_hist_filename)
elif(std_name == "indata_aux" and self.input_params.indata_aux_filename is not None):
self.set_dataset_from_file(std_name, self.input_params.indata_aux_filename)
elif(std_name == "instatus_hist" and self.input_params.instatus_hist_filename is not None):
self.set_dataset_from_file(std_name, self.input_params.instatus_hist_filename)
if(std_name in self._datasets):
ds_obj = self._datasets[std_name]
if(ds_format != "object"):
dataset = ds_obj.ds_filtered if ds_obj.ds_filtered is not None else ds_obj.ds
elif(create_if_not_exist):
# No existing dataset was found but we were told to make one.
# So add a new entry with an empty Table
dataset = pa.table([])
ds_obj = Dataset(std_name, dataset, dbconn=self.dbconn)
self.set_dataset(std_name, ds_obj)
else:
dataset = None
ds_obj = None
return ds_obj if ds_format=="object" else ds_to_format(dataset, ds_format)
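# Usage sketch (hypothetical plugin code): fetch indata as a pandas DataFrame for
# DataFrame-style manipulation, or fetch the Dataset wrapper object itself.
#
#     indata_df = processor_data.get_dataset("indata", ds_format="pandas")
#     status_obj = processor_data.get_dataset("status_file", create_if_not_exist=True,
#                                             ds_format="object")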
def set_dataset(self, name: str, ds: pa.Table | pd.DataFrame | Dataset) -> None:
"""Set `ds` as an entry in the dataset collection under `name` (case-insensitive).
If a dataset under `name` already exists, `ds` will be saved as the current output
version of the `name` dataset and will be used to update the cumulative version
when the current job step completes. If `name` identifies a dataset that is produced
by default, `ds` instead directly overwrites the cumulative version, and no current
output version is saved.
:param name: The name of the dataset to set
:type name: str
:param ds: The dataset to set
:type ds: `pyarrow.Table` | `pandas.DataFrame` | banffprocessor.util.dataset.Dataset
"""
std_name = get_dataset_real_name(name)
if(isinstance(ds, Dataset)):
# If given as an object, just overwrite existing
# Assumes that the underlying dataset does not need any preparation
self._datasets[std_name] = ds
else:
# Ensuring that the datasets are using type string for character data.
# Sometimes pandas sets the type as large_string
dataset: pa.Table = handle_arrow_string_data(ds_to_format(ds, "pyarrow"), dest_type=pa.string())
if(std_name in self._datasets):
curr_ds = self._datasets[std_name]
if(curr_ds.ds_filtered is not None):
msg = _("The Banff Processor does not currently support direct updates "
"made to any datasets with a process control applied to them.")
log_lcl.exception(msg)
raise UserDefinedPluginError(msg)
if(std_name in self.minimal_outputs):
# Minimal outputs just get overwritten, not updated using ds_curr_output
curr_ds.ds = dataset
else:
# If the dataset already exists as an object, add this as the current output,
# which will be used to update the cumulative version after the proc completes
curr_ds.ds_curr_output = dataset
else:
self._datasets[std_name] = Dataset(std_name, dataset, dbconn=self.dbconn)
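# Usage sketch (hypothetical plugin code): a user-defined procedure would typically
# store its results through the outdata/outstatus properties, which route through here.
#
#     processor_data.outdata = my_outdata_table      # pa.Table or pd.DataFrame
#     processor_data.outstatus = my_outstatus_table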
def set_dataset_from_file(self, name: str, data_file: str | Path) -> None:
"""Create a :class:`pa.Table` from the data file at `data_file` and load it to the
dataset collection under the name `name` (case-insensitive).
If `data_file` only contains a filename and not a full path, the input parameters are
used to determine the appropriate directory to look in. Any existing dataset of the
same name is replaced.
:param name: The name to identify the resultant dataset by
:type name: str
:param data_file: The full filepath or just filename of the data file to create
a dataset from
:type data_file: str | Path
"""
self.set_dataset(name, self.load_dataset_from_file(data_file))
def pop_dataset(self, name: str) -> Dataset | None:
"""Remove the :class:`banffprocessor.util.dataset.Dataset` object found under
`name` (case-insensitive) from the dataset collection and return the object if it exists.
Returns `None` if the dataset is not found.
:param name: The name of the dataset to pop
:type name: str
:return: The pop()'d dataset, `None` if not found
:rtype: banffprocessor.util.dataset.Dataset | None
"""
std_name = get_dataset_real_name(name)
ds = self._datasets.pop(std_name, None)
if(ds):
# We want to make sure we unregister now. If it's done in the Dataset
# __del__() method, we can't guarantee de-registration until the object is GC'd
ds.unregister_table()
return ds
def clear_filtered_data(self) -> None:
"""Clear all filtered datasets off of the dataset objects in the _dataset collection.
Use once a proc has finished executing as we no longer need the filtered data.
"""
for ds in self._datasets.values():
ds.ds_filtered = None
def update_cumulatives(self, pre_execute_tables: set[str]) -> None:
"""Update every non-default dataset in the dataset collection with its
respective ds_curr_output version, if the output is required by the configured
input parameters and metadata.
:param pre_execute_tables: The set of table names in the dataset collection prior
to the execution of the current job step
:type pre_execute_tables: set[str]
"""
# We will be popping off datasets so convert to a list first
for name, ds_obj in list(self.datasets.items()):
# Remove the aux and hist conditions if we want those files wiped from memory after use
if(name not in self.minimal_outputs and ("_aux" not in name and "_hist" not in name)):
if(self.output_required(name)):
# The current proc requests this file, either via ALL or CUSTOM
self.update_file_all(ds_obj)
elif(name not in pre_execute_tables):
# The current proc doesn't want this file and it wasn't
# created by a different proc so we can discard it
self.pop_dataset(name)
else:
# Don't want the output from curr proc, but DO want existing Dataset object
# All built-ins should already have this attribute empty, but UDPs may have
# loaded a table they didn't want
ds_obj.ds_curr_output = None
msg = _("Output table {} was created but output settings exclude this table "
"from saving for the current proc and so it was discarded.").format(name)
log_lcl.warning(msg)
def update_status(self) -> None:
"""Append `outstatus` dataset to `status_file` and updates `status_file`
where `outstatus` shares the same index values.
Also calls `update_file_all()` to update `status_log` with `outstatus`.
"""
proc_status = self.get_dataset("outstatus")
# If proc_status is empty, nothing to update
if(proc_status is None or table_empty(proc_status)):
return
# Check against upper-case column names in the unlikely case that JOBID and SEQNO were
# originally on the user's instatus file in some other capitalization
upper_cols = [x.upper() for x in proc_status.column_names]
# These fields are likely added during a preceding call to update_file_all() so don't
# re-add them if we don't have to
if("JOBID" not in upper_cols):
proc_status = add_single_value_column(proc_status, "JOBID", self.current_job_step.jobid, pa.string())
if("SEQNO" not in upper_cols):
proc_status = add_single_value_column(proc_status, "SEQNO", self.current_job_step.seqno, pa.float32())
status_file_obj = self.get_dataset("status_file", create_if_not_exist=True, ds_format="object")
status_file = status_file_obj.ds
if(table_empty(status_file)):
# Create a copy of proc_status
status_file_new = copy_table(proc_status)
else:
# Index values to use for concat and update
index_cols = ["JOBID", "FIELDID"]
if(self.input_params.unit_id):
index_cols.append(self.input_params.unit_id.upper())
#NOTE: The PyArrow 16.1 docs erroneously claim that rename_columns() accepts a dict
# treated as {old_name: new_name}. However, the 16.1.0 branches on the project's GitHub
# show that the method has not yet been updated to accept a dict: it accepts one without
# error, iterates over its keys as if they were a list, and effectively treats each key
# as the NEW name rather than the old one.
# So make sure only lists are used for renaming until this feature is properly implemented.
proc_status_col_dict = {x.upper():x for x in proc_status.column_names}
proc_status = proc_status.rename_columns(list(proc_status_col_dict.keys()))
status_file_col_dict = {x.upper():x for x in status_file.column_names}
status_file = status_file.rename_columns(list(status_file_col_dict.keys()))
# Get the records from each dataset with unique index values (i.e. no overlap)
proc_status_non_matching = proc_status.join(status_file, keys=index_cols, join_type="left anti")
status_file_non_matching = status_file.join(proc_status, keys=index_cols, join_type="left anti")
# Get records from both datasets with matching index values
combined_status_matching = status_file.join(proc_status, keys=index_cols, join_type="inner", left_suffix="to_drop")
# Drop the old value columns from status_file so they are replaced by the new values
# from proc_status. Since old non-NaN values should be overwritten even when the new
# value is NaN, we can simply drop the whole old column
combined_status_matching = combined_status_matching.drop_columns([x for x in combined_status_matching.column_names if "to_drop" in x])
# Combine all of our subsets to create the final dataset
# Note: If we can be sure that the schemas of status_file and proc_status are the same we
# don't need to set the promote_options and this operation may be faster
status_file_new = pa.concat_tables([status_file_non_matching, proc_status_non_matching, combined_status_matching], promote_options="permissive")
# Revert capitalization: proc_status gets its original column names back and
# status_file_new is renamed using the merged dictionary.
# The dict union operator (|) requires Python 3.9+ and resolves any overlapping
# keys in favour of the right-hand dictionary
proc_status = proc_status.rename_columns(list(proc_status_col_dict.values()))
merged_dict = proc_status_col_dict | status_file_col_dict
normal_columns_merged = [merged_dict[x] for x in status_file_new.column_names]
status_file_new = status_file_new.rename_columns(normal_columns_merged)
self.outstatus = proc_status
self.update_file_all(proc_status, "status_log")
status_file_obj.ds = status_file_new
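# Illustrative summary of the update semantics above: an outstatus record whose
# (JOBID, unit_id, FIELDID) index already exists in status_file replaces the old
# record's values, records with new index values are appended as new rows, and
# existing status_file records with no match in outstatus are kept unchanged.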
def update_file_all(self, dataset: str | pa.Table | Dataset, dataset_all_name: str | None = None) -> None:
"""Update the respective cumulative version of `dataset` using the contents of the
non-cumulative version after adding the current jobid and seqno to it.
If `dataset` is given as a Dataset object or the name of the object to fetch, the object may
contain both the cumulative and curr_output versions. If it only contains the cumulative version,
the jobid and seqno columns are still added to it if they are not yet present. In this case,
the jobid and seqno column additions are also saved back to the non-cumulative dataset if
`dataset_all_name` is separately provided. Otherwise the changes to the non-cumulative dataset are
discarded and only the updated cumulative version is saved.
:param dataset: Either the name of a dataset to fetch, the Dataset object itself or a specific
pyarrow table. If provided as a pyarrow table no changes are saved back to this dataset and
`dataset_all_name` must be provided as well.
:type dataset: str | pa.Table | banffprocessor.util.dataset.Dataset
:param dataset_all_name: The name of the dataset to update using dataset_name.
If not provided the non-cumulative dataset's name is used, defaults to None
:type dataset_all_name: str, optional
:raises ValueError: If `dataset` is given as a pa.Table but no `dataset_all_name` is given.
"""
dataset_obj = None
if(isinstance(dataset, str)):
if(dataset_all_name):
# If both a dataset table name AND a dataset_all table name are given,
# we are in a special case. Most likely outreject, but could change in the future
dataset_all_obj = self.get_dataset(dataset_all_name, create_if_not_exist=True, ds_format="object")
dataset_obj = self.get_dataset(dataset, ds_format="object")
dataset = dataset_obj.ds
else:
# If only a single table name is given, the underlying object should
# hold both the curr_output and cumulative versions
dataset_all_obj = self.get_dataset(dataset, create_if_not_exist=True, ds_format="object")
dataset = dataset_all_obj.ds_curr_output
dataset_all_obj.ds_curr_output = None
elif(isinstance(dataset, Dataset)):
if(dataset_all_name):
dataset_all_obj = self.get_dataset(dataset_all_name, create_if_not_exist=True, ds_format="object")
dataset_obj = dataset
dataset = dataset_obj.ds
else:
# If we were given an object, we always use ds_curr_output to update ds_cumulative
dataset_all_obj = dataset
dataset = dataset_all_obj.ds_curr_output
dataset_all_obj.ds_curr_output = None
elif(isinstance(dataset, pa.Table)):
if(not dataset_all_name):
msg = _("dataset was given as a pyarrow table but no name was given for the table to update.")
log_lcl.exception(msg)
raise ValueError(msg)
dataset_all_obj = self.get_dataset(dataset_all_name, create_if_not_exist=True, ds_format="object")
else:
msg = _("dataset was given as an invalid type. dataset must be of type str, pa.Table or Dataset.")
log_lcl.exception(msg)
raise TypeError(msg)
# We only ever want to update the unfiltered version of the dataset
dataset_all = dataset_all_obj.ds
if(dataset is None):
# A cumulative dataset was passed in alone and it has no ds_curr_output
# So we swap the variables to carry out the update with an empty table
# to ensure the JobID/seqno is added to ds_cumulative
dataset = dataset_all
dataset_all = pa.table([])
# See update_status() NOTE on using dict vs list for rename_columns
dataset_col_dict = {x.upper():x for x in dataset.column_names}
dataset = dataset.rename_columns(list(dataset_col_dict.keys()))
dataset_all_col_dict = {x.upper():x for x in dataset_all.column_names}
dataset_all = dataset_all.rename_columns(list(dataset_all_col_dict.keys()))
# Ensure the jobid and seqno for the current job step are present in the dataset first
if("JOBID" not in dataset.column_names and "JOB_ID" not in dataset.column_names):
dataset = add_single_value_column(dataset, "JOBID", self.current_job_step.jobid, pa.string())
# Need this for when we revert things later
dataset_col_dict["JOBID"] = "JOBID"
if("SEQNO" not in dataset.column_names):
dataset = add_single_value_column(dataset, "SEQNO", self.current_job_step.seqno, pa.float32())
# Need this for when we revert things later
dataset_col_dict["SEQNO"] = "SEQNO"
# Append the new dataset to the existing dataset_all
dataset_all_new = pa.concat_tables([dataset_all, dataset], promote_options="permissive")
# Revert capitalization, only necessary if keeping dataset
if(dataset_obj):
dataset = dataset.rename_columns([dataset_col_dict[x] for x in dataset.column_names])
dataset_obj.ds = dataset
merged_dict = dataset_col_dict | dataset_all_col_dict
normal_columns_merged = [merged_dict[x] for x in dataset_all_new.column_names]
dataset_all_new = dataset_all_new.rename_columns(normal_columns_merged)
# Save back the changes we made to dataset_all, explicitly to the non-filtered ds
dataset_all_obj.ds = dataset_all_new
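# Calling-convention sketch (hypothetical table names, for illustration):
#   update_file_all("outdonormap")                -> folds the named Dataset's ds_curr_output
#                                                    into its own cumulative table
#   update_file_all("outreject", "outreject_all") -> appends one named dataset onto a
#                                                    separately named cumulative dataset
#   update_file_all(proc_status, "status_log")    -> appends a bare pa.Table onto the named
#                                                    cumulative dataset (as in update_status)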
def update_imputed_file(self) -> None:
"""Implement non-'prebanff' section of updateImputedFile.sas.
Updates the `imputed_file` with the data found in `outdata`.
Does not add any records from `outdata` without a matching index value in `imputed_file`,
only updates matching records. `imputed_file` is updated in place.
"""
unit_id = self.input_params.unit_id
if(not unit_id):
msg = _("update_imputed_file requires a unit_id value to sort and update on.")
log_lcl.exception(msg)
raise ValueError(msg)
unit_id_upper = unit_id.upper()
imputed_file_obj = self.get_dataset("imputed_file", create_if_not_exist=True, ds_format="object")
imputed_file = imputed_file_obj.ds
outdata = self.get_dataset("outdata", create_if_not_exist=True)
# See update_status() NOTE on using dict vs list for rename_columns
imputed_file_col_dict = {x.upper():x for x in imputed_file.column_names}
imputed_file = imputed_file.rename_columns(list(imputed_file_col_dict.keys()))
outdata_col_dict = {x.upper():x for x in outdata.column_names}
outdata = outdata.rename_columns(list(outdata_col_dict.keys()))
imputed_file = imputed_file.sort_by(unit_id_upper)
outdata = outdata.sort_by(unit_id_upper)
# In updateImputedFile.sas the update statement in the final data step includes a
# terminating if-statement which prevents any new rows from being copied over.
# Get imputed_file records that don't have a matching key in outdata
imputed_non_matching = imputed_file.join(outdata, keys=unit_id_upper, join_type="left anti")
# Get outdata records with matching keys in imputed_file, mark the imputed_file columns
# as we want to overwrite old data in imputed_file with new data from outdata. Use an
# inner join because we do not want any new records from outdata, only update existing
drop_suffix = "_to_drop"
outdata = outdata.join(imputed_file, keys=unit_id_upper, join_type="inner", right_suffix=drop_suffix)
# Drop any new columns from outdata that aren't originally found in imputed_file, also drop
# the old imputed_file data columns we marked above
outdata = outdata.drop_columns([x for x in outdata.column_names if
(drop_suffix in x or x not in imputed_file.column_names)])
# Concat our non-overlapping imputed data with the new values for matching records in outdata
new_imputed = pa.concat_tables([imputed_non_matching, outdata], promote_options="permissive")
# Our concat can make the order a little messed up so perform a final sort
new_imputed = new_imputed.sort_by(unit_id_upper)
# Save back the changes we made to imputed_file with original column names,
# we can discard outdata changes
imputed_file_obj.ds = new_imputed.rename_columns([imputed_file_col_dict[x] for x in new_imputed.column_names])
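# Note: like the SAS update statement it replaces, this only overwrites values for
# unit_id keys already present in imputed_file; outdata rows with unseen keys and
# outdata columns not already present in imputed_file are discarded.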
def clean_status_file(self) -> None:
"""Implement cleanStatusAll.sas. Removes all extra columns from the statusAll file."""
status_file_obj = self.get_dataset("status_file", ds_format="object")
if(status_file_obj is None):
return
status_file = status_file_obj.ds
keep_list_upper = [
"JOBID",
"SEQNO",
"FIELDID",
"STATUS",
"VALUE",
]
if(self.input_params.unit_id):
keep_list_upper.append(self.input_params.unit_id.upper())
# Get the keep list columns with correct capitalization in status_file
keep_list_actual = [x for x in status_file.column_names if x.upper() in keep_list_upper]
# filter the dataset
status_file_obj.ds = status_file.select(keep_list_actual)
def apply_process_controls(self) -> None:
"""Apply a set of process controls for a single controlid in `controls` according to their
specifications and sets the filtered dataset(s) in processor_data.
"""
# If the current job step has no controls, just return
if(not self.current_job_step.controlid):
return
controls = self.metaobjects.get_process_controls(self.current_job_step.controlid)
if(not controls):
msg = _("Undefined control ID found: controlid ='{}', jobid='{}', seqno='{}'").format(self.current_job_step.controlid,
self.current_job_step.jobid,
self.current_job_step.seqno)
log_lcl.exception(msg)
raise MetadataConstraintError(msg)
# Each control_set is a dict of all process controls for a single targetfile
for control_set in controls.values():
# arbitrary record we can use to get controlid and targetfile as every
# record in control_set should have the same values for these
arbitrary_control = next(iter(control_set.values()))[0]
try:
target_file_obj = self.get_dataset(arbitrary_control.targetfile, ds_format="object")
if(target_file_obj is None):
msg = _("Process Control has an unrecognized target file value: targetfile='{}'",
).format(arbitrary_control.targetfile)
if(Path(arbitrary_control.targetfile).suffix):
msg += _(". Note: targetfile must be an in-memory table, not one on disk.")
log_lcl.exception(msg)
raise ProcessControlError(msg)
target_file = target_file_obj.ds
if(table_empty(target_file)):
msg = _("Target file exists but is empty: targetfile='{}'. "
"Skipping process controls for file...").format(arbitrary_control.targetfile)
log_lcl.warning(msg)
# Continue to process the next targetfile
continue
row_filter = None
column_filter = []
exclude_rejected = None
edit_group_filter = None
if(ProcessControlType.ROW_FILTER in control_set):
row_filter = "(" + ") AND (".join([x.value for x in control_set[ProcessControlType.ROW_FILTER]]) + ")"
if(ProcessControlType.COLUMN_FILTER in control_set):
for x in control_set[ProcessControlType.COLUMN_FILTER]:
column_filter.extend([y.upper() for y in x.value])
if(ProcessControlType.EXCLUDE_REJECTED in control_set):
exclude_control = control_set[ProcessControlType.EXCLUDE_REJECTED]
if(len(exclude_control) != 1):
msg = _("Too many EXCLUDEREJECTED controls for one targetfile.")
log_lcl.exception(msg)
raise ProcessControlError(msg)
exclude_control = exclude_control[0]
# currently allows True/False values for this control, so users don't have to
# remove the control from their file if they want to disable it temporarily
if(exclude_control.value):
# Get the correct capitalization for unit_id, just in case.
curr_unit_id = [x for x in target_file.column_names if x.upper() == self.input_params.unit_id.upper()][0]
# We want to use rejected here, not outreject_all
# The banff package produces the rejected files and SHOULD use the original
# capitalization of unit_id
exclude_rejected = f"({curr_unit_id} NOT IN (SELECT {curr_unit_id} FROM outreject))" # noqa: S608
else:
exclude_rejected = False
if(ProcessControlType.EDIT_GROUP_FILTER in control_set):
egf_control = control_set[ProcessControlType.EDIT_GROUP_FILTER]
if(len(egf_control) != 1):
msg = _("Too many EDIT_GROUP_FILTER controls for one targetfile.")
log_lcl.exception(msg)
raise ProcessControlError(msg)
egf_control = egf_control[0]
if(not self.current_job_step.editgroupid):
msg = _("An EDIT_GROUP_FILTER process control was specified but no editgroupid was "
"given in the metadata for the job step.")
log_lcl.exception(msg)
raise ProcessControlError(msg)
egi_cols = {x.upper():x for x in target_file.column_names if x.upper() in ("JOBID", "JOB_ID", "SEQNO", "STATUS")}
if(("JOBID" in egi_cols or "JOB_ID" in egi_cols)
and "SEQNO" in egi_cols and "STATUS" in egi_cols):
statuses = ", ".join([f"'{status}'" for status in egf_control.value])
edit_group_filter = (
f"""
(({egi_cols['JOBID']}, {egi_cols['SEQNO']}) IN (
SELECT (jobid, seqno) FROM banff.Jobs WHERE
editgroupid = '{self.current_job_step.editgroupid}'))
OR
(({egi_cols['JOBID']}, {egi_cols['SEQNO']}) IN (
SELECT (jobid, seqno) FROM banff.Jobs WHERE
process = 'outlier') AND status IN ({statuses}))
""" # noqa: S608
)
else:
msg = _("An EDIT_GROUP_FILTER process control was used but one or more required fields were "
"missing (STATUS, JOBID and SEQNO) on the instatus file. The control has been skipped.")
log_lcl.warning(msg)
edit_group_filter = None
if(not row_filter and not column_filter and not edit_group_filter):
if(exclude_rejected is None):
# No valid controls were found in this control_set
msg = _("No supported controls found in process controls metadata.")
log_lcl.exception(msg)
raise MetadataConstraintError(msg)
if(not exclude_rejected):
# The only valid control is an exclude_rejected set to False, not an error,
# go to the next control_set/targetfile
continue
# Convert the column labels in the filter to the same capitalization as the target_file
column_filter_string = None
if(column_filter):
column_filter_normal = [x for x in target_file.column_names if x.upper() in column_filter]
column_filter_string = ", ".join(column_filter_normal)
# Currently column_filter KEEPS columns specified NOT DROPS
# Currently row_filter only supports arguments that are all present in one of
# indata, instatus or target_file
query = "SELECT "
if(column_filter_string is None):
query += "* "
elif(column_filter_string):
query += f"{column_filter_string} "
else:
# If none of the column_filter names are found in the targetfile columns
# column_filter_string will be an empty string
msg = _("A column filter was specified but none of the columns were found in targetfile.")
log_lcl.exception(msg)
raise ProcessControlError(msg)
# Make sure we use the name given by the user (i.e. "instatus" not "status_file")
# as the user-given name should be the one registered in duckdb.
query += f"FROM '{arbitrary_control.targetfile}'"
if(row_filter or exclude_rejected or edit_group_filter):
# Connect non-empty condition strings with 'AND'
query += f" WHERE {' AND '.join(filter(None, [row_filter, exclude_rejected, edit_group_filter]))}"
msg = _("Computed filter: \n\t{}").format(query)
log_lcl.debug(msg)
result = self.dbconn.sql(query).arrow()
if(result is None):
raise ProcessControlError
if(result.shape[0] < 1):
# if the resulting table has no rows
msg = _("Process filter resulted in a dataset containing {} columns but 0 rows. Continuing...").format(result.shape[1])
log_lcl.warning(msg)
except Exception as e:
msg = _("There was an error while applying process controls with controlid {} "
"in job {} seqno {}.\nException: {}").format(self.current_job_step.controlid,
self.current_job_step.jobid,
self.current_job_step.seqno, e)
log_lcl.exception(msg)
raise e
# Update the target_file with the new filtered version
target_file_obj.ds_filtered = result
msg = _("Filter succesfully applied.")
log_lcl.debug(msg)
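# Example of a computed filter query (hypothetical control values, unit_id "ident"),
# combining a column filter, a row filter and an EXCLUDEREJECTED control on "instatus":
#
#     SELECT ident, FIELDID, STATUS FROM 'instatus'
#     WHERE (STATUS = 'FTI') AND (ident NOT IN (SELECT ident FROM outreject))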
def get_new_block_copy(self) -> "ProcessorData":
"""Return a new ProcessorData object with the necessary attributes copied in
from this ProcessorData object to spawn a new processor block.
:return: A new ProcessorData object with the required input parameters, metadata
and datasets copied in.
:rtype: ProcessorData
"""
# Make a deep copy of input_params (to avoid any potential issues with a shallow copy)
new_input_params = copy.deepcopy(self.input_params)
# New block's job_id is stored in the specid field of the calling block
new_input_params.job_id = self.current_job_step.specid
# Create a new subfolder for the block
new_block_folder = self.input_params.output_folder.joinpath(
Path(f"{self.current_job_step.jobid}_{self.current_job_step.seqno}_{self.current_job_step.specid}"),
)
if(not new_block_folder.is_dir()):
new_block_folder.mkdir(parents=True)
new_input_params.input_folder = new_block_folder
new_input_params.output_folder = new_block_folder / "out"
if not new_input_params.output_folder.exists():
new_input_params.output_folder.mkdir(parents=True)
# input_params are copied with slight modifications made above
copied_data = ProcessorData(input_params=new_input_params, dbconn=self.dbconn)
# metadata may be copied in directly as the objects and tables should only
# be used in a read-only manner
copied_data.metaobjects = self.metaobjects
# Make sure the child block reports duration as continuous from the parent
copied_data.total_job_duration = self.total_job_duration
# The parent block increments the count, then the child runs, then the parent reports.
# Therefore, the curr_step_count must be decremented/incremented before/after the
# child block executes
copied_data.curr_step_count = self.curr_step_count - 1
# Copy input files (aux/hist if they have already been read into memory)
# rejected is a special case as it is directly used by name in a Process Control
to_copy = ["indata", "instatus", "outreject", "outreject_all", "indata_aux", "indata_hist", "instatus_hist"]
for name in to_copy:
# If a filtered version of a table exists it should be set as an un-filtered
# version for the new block.
data = self.get_dataset(name, ds_format="object")
if(data):
copied_data.set_dataset(name, data.ds_filtered if data.ds_filtered else data.ds)
return copied_data
def update_with_block_result(self, block_data: "ProcessorData") -> None:
"""Update this ProcessorData object with the datasets found in `block_data`
as it is upon completion of a JOB process which calls a Process Block.
:param block_data: The ProcessorData object containing the results from a
Process Block execution.
:type block_data: ProcessorData
"""
# Correct our step count now that the block is complete. Remember, the increment here
# reverts the decrement done when entering the child block
self.curr_step_count = block_data.curr_step_count + 1
# If update_status() modifies status_log it will not properly carry over all the records
# we want, so we do it separately like this.
# First, make sure status_log is popped from the child so it is not revisited in the loop
child_status_log = block_data.pop_dataset("status_log")
if(child_status_log is not None):
# make sure the parent's status_log exists before updating it,
# we don't need the object here though, we're just using the flag
self.get_dataset("status_log", create_if_not_exist=True)
# Append child status_log to parent
self.update_file_all(child_status_log, "status_log")
# Now store a local reference to the updated status_log
# we will overwrite the resulting status_log in the parent after
# update_status() runs with this table instead
updated_status_log = self.get_dataset("status_log", ds_format="object").ds
else:
# What if a user calls `job` as their first step, then the child block calls
# UDPs which update instatus directly with no status_log involvement?
# If the child did not produce a status_log we just let update_status()
# do its thing with the parent status_log, if it runs at all
updated_status_log = None
# At this point there should be no filtered datasets in block_data
for name, block_ds in block_data.datasets.items():
if("_aux" in name or "_hist" in name or name == "status_log"):
continue
if(name == "imputed_file"):
# Since we directly copied in table references, if the 2 ProcessorData objects
# reference the same table, we know no updates were made
# We cannot replace self's imputed_file with the block's imputed_file as it's
# possible the block started with a filtered file, so use outdata
if(block_ds.ds is not self.indata):
self.outdata = block_ds.ds
elif(name == "status_file"):
# If instatus is unchanged, child status_log should also be empty
if(block_ds.ds is not self.instatus):
# Only grab newly added records
status_file = block_ds.ds.filter(pa.compute.field("JOBID") == f"{block_data.current_job_step.jobid}")
# Remove the child block jobid, to be replaced by the current block's jobid on update
status_file = status_file.drop(["JOBID", "SEQNO"])
self.outstatus = status_file
# Perform the status_file update here, so we can ignore the status_log result
# in favour of our own updates
self.update_status()
# Make sure we only run update_status the one time by popping off outstatus
self.pop_dataset("outstatus")
elif(name == "time_store"):
time_store_curr_obj = self.get_dataset(name, ds_format="object")
if(time_store_curr_obj):
time_store_curr_obj.ds = pa.concat_tables([time_store_curr_obj.ds, block_ds.ds])
else:
self.set_dataset(name, block_ds.ds)
else:
# outreject and outreject_all will correctly fall here
self.set_dataset(name, block_ds.ds)
# Overwrite any changes made to status_log by any calls to update_status above
if(updated_status_log is not None):
self.set_dataset("status_log", updated_status_log)
def save_proc_output(self, banff_call: BanffProcedure, all_tables: list[str]) -> None:
"""Save procedure output datasets.
Append the non-default tables from `banff_call` (the list of which is set on
processor_data by the main processor loop) to their respective cumulative datasets
in the dataset collection, based on the output type parameters set in input_params.
:param banff_call: A BanffProcedure call object that has completed execution and contains
the output tables as attributes named as they are specified in `processor_to_proc`
:type banff_call: :class:`banff.proc.BanffProcedure`
:param all_tables: The list of all possible table names available on `banff_call`,
required if `process_output_type` is ALL
:type all_tables: list[str]
:raises MetadataConstraintError: If a user's ProcessOutputs metadata contains an
unrecognized output_name
:raises AttributeError: If the `processor_to_proc` mapping contains a table name
that is not found as an attribute on the `banff_call` object.
"""
table_names = ([x for x in all_tables if x not in self.minimal_outputs]
if self.input_params.output_all
else self.custom_outputs)
for name in table_names:
try:
table = getattr(banff_call, name)
except AttributeError as err:
msg = _("ProcessOutputs metadata entry found under process {} with "
"unrecognized output_name {}").format(self.current_job_step.process, name)
log_lcl.exception(msg)
raise MetadataConstraintError(msg) from err
if(table is not None and not table_empty(table)):
self.set_dataset(name, table)
def save_outputs(self) -> None:
"""Save all current tables in `_datasets` to the output folder.
Each dataset's name is used as the filename; the extension is determined by the input parameters:
the `save_format` input parameter is checked first, then the file type of indata.
If neither is provided or the file type is unrecognized, .parq is used by default.
"""
# default save format is parquet file
file_ext = [".parq"]
# save_format input param overrides other options
if(self.input_params.save_format):
file_ext = [x.casefold() for x in self.input_params.save_format]
# otherwise just use the same format as indata
elif(self.input_params.indata_filename):
file_ext = [self.input_params.indata_filename.suffix]
for name, ds_obj in self._datasets.items():
ds = ds_obj.ds
if("_aux" in name or "_hist" in name):
continue
if(table_empty(ds)):
log_lcl.warning(_("Output dataset {} was created but is empty.").format(name))
else:
if(not self.input_params.output_folder):
msg = _("No output folder set. Ouputs will not be saved.")
log_lcl.warning(msg)
return
for ext in file_ext:
if(ext and not ext.isspace()):
if(ext in tc.file_ext_csv):
ds.to_pandas().to_csv(self.input_params.output_folder / f"{name}.csv", sep=",", index=False)
else:  # if(ext in tc.file_ext_parquet):
# If a non-supported file extension is used (or a sas7bdat indata file is used)
# then we just default to .parq for now
pq.write_table(ds, self.input_params.output_folder / f"{name}.parq")
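# Example (hypothetical parameters): with save_format=[".csv", ".parquet"], each non-empty
# dataset is written twice, e.g. imputed_file.csv and imputed_file.parq, since any
# non-CSV extension currently falls through to the parquet writer.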
def load_dataset_from_file(self, data_file: str | Path) -> pa.Table:
"""Load a Pyarrow Table from the data file indicated by the `data_file` parameter.
If `data_file` contains a full filepath, the file will be retrieved using
this filepath. If `data_file` only contains a filename, the `processor_input.input_folder`
is used as the containing directory to fetch from.
:param data_file: The full filepath or filename of the data file to load
:type data_file: str | Path
:raises ProcessorInputParameterError: If an empty filename is given or just
a filename but no `input_folder` had been previously specified in the
processor_input member object.
:raises Exception: If some other issue causes the file to not be read
:return: The `data_file` as loaded into a PyArrow Table
:rtype: pa.Table
"""
dataset = None
# Currently supported data formats from io_util.type_converters include:
# Parquet --- ".parquet", ".parq"
# Pickle --- ".pickle", ".pkl", ".pcl"
# SAS --- ".sas7bdat"
# CSV --- ".csv"
# Feather --- ".feather"
try:
# First determine if the filename is a full filepath or just the filename
file_path = Path(data_file)
if(file_path.is_absolute()):
# data_file is a full filepath, so pass the whole thing
dataset = io_util.load_input_dataset(str(file_path), log_lcl)
if isinstance(dataset, pd.DataFrame):
dataset = io_util.pandas_to_arrow(dataset)
else:
# data_file is just a filename, so use the input folder as the filepath
if(not file_path.name or not self.input_params.input_folder):
msg = _("Input folder was not provided or data_file string was empty when "
"attempting to load datafile without full filepath.")
log_lcl.exception(msg)
raise ProcessorInputParameterError(msg)
file_path_complete = (self.input_params.input_folder / file_path).resolve()
dataset = io_util.load_input_dataset(str(file_path_complete), log_lcl)
if isinstance(dataset, pd.DataFrame):
dataset = io_util.pandas_to_arrow(dataset)
except Exception as e:
msg = _("Error while reading input data file {}").format(data_file)
log_lcl.exception(msg)
raise e from None
# This shouldn't happen unless the filetype is not supported (and was not previously caught) or the filename is empty.
if dataset is None:
msg = _("Filetype not supported or filename empty.")
log_lcl.exception(msg)
raise ValueError(msg)
return dataset
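# Usage sketch (hypothetical filename): a bare filename resolves against the configured
# input folder, while an absolute path is loaded directly.
#
#     table = processor_data.load_dataset_from_file("indata_hist.parquet")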