import argparse
import sys
from argparse import RawTextHelpFormatter
from datetime import datetime, timedelta
from pathlib import Path
import banff
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import banffprocessor as bpp
import banffprocessor.procedures.banff_procedures.job_proc as jp
import banffprocessor.processor_logger as plg
from banffprocessor.exceptions import MetadataConstraintError, ProcedureReturnCodeError, ProcessorInputParameterError, UserDefinedPluginError
from banffprocessor.metadata.metaobjects import MetaObjects
from banffprocessor.nls import _
from banffprocessor.procedures import banff_procedures, factory, loader
from banffprocessor.procedures.banff_procedures.editstats import EditStats
from banffprocessor.procedures.banff_procedures.verifyedits import VerifyEdits
from banffprocessor.processor_data import ProcessorData
from banffprocessor.processor_input import ProcessorInput
from banffprocessor.util.dataset import add_single_value_column, copy_table, table_empty
# Setup local log for processor module specifically
log_lcl = plg.setup_processor_logger()
class Processor:
"""Main class for the Banff processor.
:attribute processor_data: Contains all the datasets and parameters required to run
a Banff Processor job
:type processor_data: :class:`src.banffprocessor.processor_data.ProcessorData`
:attribute is_child_block: Whether the job running in this processor instance is a child
block of another currently running processor instance that is executing the parent block
:type is_child_block: bool
"""
processor_data : ProcessorData
is_child_block: bool
@property
def dbconn(self) -> duckdb.DuckDBPyConnection | None:
"""The currently connected database used to store some processor data.
:return: The DuckDBPyConnection currently being used to store data.
:rtype: duckdb.DuckDBPyConnection | None
"""
return self._dbconn
def __init__(self, input_params: ProcessorInput | None = None, initial_data: ProcessorData | None = None,
dbconn: duckdb.DuckDBPyConnection | None = None, indata: pa.Table | pd.DataFrame | None = None,
indata_aux: pa.Table | pd.DataFrame | None = None, indata_hist: pa.Table | pd.DataFrame | None = None,
instatus: pa.Table | pd.DataFrame | None = None, instatus_hist: pa.Table | pd.DataFrame | None = None) -> None:
"""Construct an instance of the Processor.
:param input_params: The job's input parameters. This may instead be provided as a member
of `initial_data`, but one must be provided. Defaults to None
:type input_params: :class:`src.banffprocessor.processor_input.ProcessorInput` | None, optional
:param initial_data: Initializes a new Processor object with data from an existing Processor, defaults to None
:type initial_data: :class:`src.banffprocessor.processor_data.ProcessorData` | None, optional
:param indata: The indata dataset, defaults to None
:type indata: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param indata_aux: The indata_aux dataset, defaults to None
:type indata_aux: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param indata_hist: The indata_hist dataset, defaults to None
:type indata_hist: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param instatus: The instatus dataset, defaults to None
:type instatus: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param instatus_hist: The instatus_hist dataset, defaults to None
:type instatus_hist: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param dbconn: A DuckDBPyConnection to use for storing required data and metadata.
If not provided an in-memory DB will be instantiated, defaults to None
:type dbconn: :class:`duckdb.DuckDBPyConnection` | None, optional
:raises ProcessorInputParameterError: If the `input_params` contains an
`output_folder` or `metadata_folder` parameter but the directory
cannot be found
:raises ValueError: If one of `input_params` or `initial_data` are not provided.
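
Example (a minimal, illustrative sketch; ``my_params.json`` and ``my_table`` are
placeholders for the user's own parameter file and pyarrow Table, not objects
shipped with the library)::

    params = ProcessorInput.from_file("my_params.json")
    proc = Processor(input_params=params, indata=my_table)
    proc.execute()
    proc.save_outputs()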
"""
self.is_child_block = False
if(initial_data):
self.processor_data = initial_data
self._dbconn = self.processor_data.dbconn
self._is_private_connection = False
self.is_child_block = True
elif(not input_params):
msg = _("Input parameters must be provided either via the input_params parameter "
"or as part of the input_params property of the initial_data ProcessorData object.")
log_lcl.exception(msg)
raise ValueError(msg)
else:
self._dbconn = dbconn
if self._dbconn is None:
self._dbconn = duckdb.connect(database=":memory:")
self._is_private_connection = True
else:
self._is_private_connection = False
self.processor_data = ProcessorData(input_params=input_params, dbconn=self._dbconn)
# Add the user's input folder to DuckDB's file_search_path so that any tables
# the user references by a relative path are resolved against the correct location
self.dbconn.sql(f"SET file_search_path = '{self.processor_data.input_params.input_folder}';")
log_lcl.debug(_("Set DuckDB connection file_search_path to input folder: {}").format(self.processor_data.input_params.input_folder))
plg.add_file_handlers(log_directory=self.processor_data.input_params.output_folder,
trace_level=self.processor_data.input_params.log_level)
# The remaining setup below is only necessary if an initial ProcessorData object is not provided
if(initial_data):
return
# Load the metadata files from the selected location
self.processor_data.metaobjects = MetaObjects(self.processor_data.input_params.metadata_folder,
job_id=self.processor_data.input_params.job_id,
dbconn=self._dbconn)
# Load Banff and User-defined plugins
self._load_plugins()
for proc_name in self.processor_data.metaobjects.job_proc_names:
proc_cls = factory.get_procedure(proc_name)
if(proc_cls not in [VerifyEdits, EditStats, jp.JobProc] and proc_name in banff_procedures.__all__):
msg = "At least one of the procedures referenced in your job require {} "
msg += "but one was not provided in your input parameters."
# If we are here, we need unit_id and imputed_file
if(not self.processor_data.input_params.unit_id):
msg = _(msg).format("a unit_id")
log_lcl.exception(msg)
raise ProcessorInputParameterError(msg)
if(indata is None and not self.processor_data.input_params.indata_filename):
msg = _(msg).format("an indata file")
log_lcl.exception(msg)
raise ProcessorInputParameterError(msg)
# Input datasets
dataset_name = "imputed_file"
if(indata is not None):
self.processor_data.set_dataset(dataset_name, indata)
elif(self.processor_data.input_params.indata_filename):
self.processor_data.set_dataset_from_file(dataset_name, self.processor_data.input_params.indata_filename)
dataset_name = "indata_aux"
if(indata_aux is not None):
self.processor_data.set_dataset(dataset_name, indata_aux)
dataset_name = "indata_hist"
if(indata_hist is not None):
self.processor_data.set_dataset(dataset_name, indata_hist)
dataset_name = "instatus_hist"
if(instatus_hist is not None):
self.processor_data.set_dataset(dataset_name, instatus_hist)
dataset_name = "status_file"
if(instatus is not None):
self.processor_data.set_dataset(dataset_name, instatus)
elif(self.processor_data.input_params.instatus_filename):
self.processor_data.set_dataset_from_file(dataset_name, self.processor_data.input_params.instatus_filename)
# Special setup required for regular instatus file
self._prepare_status_file()
def __del__(self) -> None:
"""Ensure memory is released and the database connection is closed."""
del self.processor_data
if self._is_private_connection:
self._dbconn.close()
@classmethod
def from_file(cls, input_filepath: str | Path, indata: pa.Table | pd.DataFrame | None = None,
indata_aux: pa.Table | pd.DataFrame | None = None, indata_hist: pa.Table | pd.DataFrame | None = None,
instatus: pa.Table | pd.DataFrame | None = None, instatus_hist: pa.Table | pd.DataFrame | None = None,
dbconn: duckdb.DuckDBPyConnection | None = None) -> "Processor":
"""Initialize a :class:`src.banffprocessor.processor.Processor` object from a JSON file.
:param input_filepath: The full path to the JSON file containing the input parameters
required to run the processor. The containing folder will be used as the
default location for required and optional files for the operation of the
processor. If a value is provided for an alternate filepath/folder parameter in
this file, that will be used instead of the containing folder.
:type input_filepath: str | pathlib.Path
:param indata: The indata dataset, defaults to None
:type indata: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param indata_aux: The indata_aux dataset, defaults to None
:type indata_aux: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param indata_hist: The indata_hist dataset, defaults to None
:type indata_hist: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param instatus: The instatus dataset, defaults to None
:type instatus: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param instatus_hist: The instatus_hist dataset, defaults to None
:type instatus_hist: :class:`pyarrow.Table` | :class:`pandas.DataFrame` | None, optional
:param dbconn: A DuckDBPyConnection to use for storing required data and metadata.
If not provided an in-memory DB will be instantiated, defaults to None
:type dbconn: :class:`duckdb.DuckDBPyConnection` | None, optional
:return: The :class:`src.banffprocessor.processor.Processor` object created using
the specified parameters
:rtype: :class:`src.banffprocessor.processor.Processor`
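
Example (illustrative sketch; ``processor_input.json`` is a placeholder path to the
user's own JSON parameter file, not a file provided by the library)::

    proc = Processor.from_file("processor_input.json")
    proc.execute()
    proc.save_outputs()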
"""
return cls(input_params=ProcessorInput.from_file(input_filepath), indata=indata,
indata_aux=indata_aux, indata_hist=indata_hist, instatus=instatus,
instatus_hist=instatus_hist, dbconn=dbconn)
def _load_plugins(self) -> None:
"""Load all built-in banff procedures from their procedure classes in the project.
Also loads all available user plugins from the user's input or plugin folder.
:raises ProcessorInputParameterError: If the `processor_data` attribute's
`input_params` contains a `plugin_folder` parameter but the directory
cannot be found
"""
# Go through all the banff_procedures modules available
for mod_name in banff_procedures.__all__:
# Get the module, check if has a register method, indicating it
# has plugins within, and if so, call the registration method
module = sys.modules["banffprocessor.procedures.banff_procedures." + mod_name]
if(hasattr(module, "register")):
module.register(factory)
if(not self.processor_data.input_params.user_plugins_folder):
msg = _("No explicit user plugins folder was set and no \\plugins "
"subfolder was found in the input folder. No plugins were loaded.")
log_lcl.info(msg)
# If there is no plugins folder we can just return as the user likely didn't
# intend to include any plugins
return
# Load all user plugins from the plugin_folder
loader.load_plugins(self.processor_data.input_params.user_plugins_folder)
def _prepare_status_file(self) -> None:
"""Process the initial status_file file as loaded from an instatus file.
Ensures the correct configuration of JOBID and SEQNO columns and copies
final results to status_log.
"""
# Only necessary for instatus/status_file, others don't need preparation
status_file_obj = self.processor_data.get_dataset("status_file", create_if_not_exist=True, ds_format="object")
status_file_ds = status_file_obj.ds
if(status_file_ds is not None and not table_empty(status_file_ds)):
job_id_label = None
seqno_label = None
# Create a table schema to ensure the jobid and seqno columns are correctly formatted
new_schema = status_file_ds.schema
for col in status_file_ds.column_names:
if(col.upper() in ["JOBID", "JOB_ID"]):
job_id_label = col
# Make sure that the jobid column is the correct dtype
col_index = status_file_ds.column_names.index(col)
field = status_file_ds.field(col_index)
if(field.type != pa.string()):
new_schema = new_schema.set(col_index, pa.field(col, pa.string()))
elif(col.upper() == "SEQNO"):
seqno_label = col
# Make sure that the seqno column is the correct dtype
col_index = status_file_ds.column_names.index(col)
field = status_file_ds.field(col_index)
if(field.type != pa.float32()):
new_schema = new_schema.set(col_index, pa.field(col, pa.float32()))
# Set the new schema with the updated seqno dtype if necessary
if(new_schema is not status_file_ds.schema):
status_file_ds = status_file_ds.cast(new_schema)
# If the user hasn't included jobid and seqno, add them now with empty values
# Only jobid is expected but they are usually added as a pair
if(job_id_label is None):
job_id_label = "JOBID"
if(seqno_label is None):
seqno_label = "SEQNO"
status_file_ds = add_single_value_column(status_file_ds,
job_id_label,
self.processor_data.input_params.job_id,
pa.string())
# Hypothetically possible for SEQNO to exist with no JOBID col
status_file_ds = add_single_value_column(status_file_ds, seqno_label, 0, pa.float32())
else:
if(seqno_label is None):
seqno_label = "SEQNO"
status_file_ds = add_single_value_column(status_file_ds, seqno_label, 0, pa.float32())
new_job_id_col = []
new_seqno_col = []
change_flag = False
# Build new columns with unwanted values replaced with None
for i in range(len(status_file_ds)):
job_id_val = status_file_ds[job_id_label][i].as_py()
seqno_val = status_file_ds[seqno_label][i].as_py()
# If EITHER of JOBID or SEQNO are blank, treat both as blank regardless of the JOBID value
# Also, for any JOBID/SEQNO pairs in which the JOBID is the same as the current
# job_id input parameter, replace the values with null/empty to reduce
# confusion of status records from previous job runs being included as if
# they were results from the current job run
if(job_id_val == self.processor_data.input_params.job_id or
job_id_val is None or job_id_val.isspace() or seqno_val is None):
new_job_id_col.append(None)
new_seqno_col.append(None)
change_flag = True
else:
new_job_id_col.append(job_id_val)
new_seqno_col.append(seqno_val)
if(change_flag):
col_index = status_file_ds.column_names.index(job_id_label)
field = status_file_ds.field(col_index)
status_file_ds = status_file_ds.set_column(col_index, field, pa.array(new_job_id_col, pa.string()))
col_index = status_file_ds.column_names.index(seqno_label)
field = status_file_ds.field(col_index)
status_file_ds = status_file_ds.set_column(col_index, field, pa.array(new_seqno_col, pa.float32()))
#TODO: Implement support for process blocks
# Technically a user could have job_ids in their jobs file
# that differ from the input parameter job_id. However with the
# addition of process blocks it is somewhat difficult to parse
# which exact job_ids are relevant to the current job and which
# are extraneous.
msg = _("The current job_id value {} may have been found on the instatus file. "
"If it was, the SEQNO and JOBID values for those records, and any with "
"a missing JOBID, have been replaced with empty values.").format(self.processor_data.input_params.job_id)
log_lcl.info(msg)
# We also want a copy of instatus in status_log
self.processor_data.set_dataset("status_log", copy_table(status_file_ds))
status_file_obj.ds = status_file_ds
def execute(self) -> None:
"""Execute the Banff Processor.
Iterates over the :class:`banffprocessor.metadata.Jobs` entries in
the `processor_data` `metaobjects` collection and performs the respective
Banff or user defined procedures.
:raises ProcedureReturnCodeError: If the return code returned by a Banff
or user defined procedure is non-zero
"""
# alias for easier use
bd = self.processor_data
# Get the job steps we are running
steps_to_run = bd.metaobjects.get_job_steps(bd.input_params.job_id)
if(not self.is_child_block):
msg = self._get_execution_header()
write_to_console(msg)
log_lcl.info(msg)
for job_step in steps_to_run:
# Increment our overall step count
bd.curr_step_count += 1
# Get the time before execution
# Note that perf_counter is the preferred option, but we want to print the full datetime stamp
# to match the previous version of the processor. This is why time or datetime is used instead.
start_time = datetime.now() # noqa: DTZ005
# Look up the procedure to run for this job step
curr_proc = factory.get_procedure(job_step.process)
is_built_in = job_step.process in banff_procedures.__all__ or job_step.process == "job"
# Grab the by_varlist if we haven't yet, or don't already have it for the same byid
# Compare the byid in the new job_step against the previous
if(bd.current_job_step is None or job_step.byid != bd.current_job_step.byid):
bd.by_varlist = bd.metaobjects.get_varlist_fieldids(job_step.byid)
if(job_step.byid and not job_step.byid.isspace() and not bd.by_varlist):
msg = _("Error in byid field of job_id {} and seqno {}. Byid was set but "
"does not refer to any varlists found in the varlist metadata.",
).format(job_step.jobid, job_step.seqno)
log_lcl.exception(msg)
raise MetadataConstraintError(msg)
bd.current_uservars = None
if(job_step.specid):
bd.current_uservars = bd.metaobjects.get_user_vars_dict(job_step.specid, job_step.process)
# set this after fetching the new by_varlist to ensure we don't overwrite
# the reference to the previous job step (which is in current_job_step)
bd.current_job_step = job_step
# NOTE: If DonorImputation and MassImputation are both run in a single job, but the
# ProcessOutputs metadata only includes (donorimp, outdonormap), the final outdonormap
# file will only contain outputs from DonorImputation, NOT MassImputation
if(bd.input_params.output_custom):
bd.custom_outputs = bd.metaobjects.get_process_outputs(job_step.process)
else:
# If output setting is minimal or all, we don't need to specify any custom outputs
bd.custom_outputs = []
# Clear unwanted columns from status_file every new step
bd.clean_status_file()
# Print the header string
log_lcl.info(self._get_step_header())
# Perform process filters
bd.apply_process_controls()
# Note: we only check modifications made to indata (imputed_file) and instatus (status_file)
# If other data is changed, it will go undetected.
if(not is_built_in):
# Retain a temporary reference to the current tables
#NOTE: this is intentionally set after the process controls are applied, so these
# are references to the filtered versions, if they exist.
indata_pre_execute = bd.get_dataset("imputed_file", create_if_not_exist=True)
instatus_pre_execute = bd.get_dataset("status_file", create_if_not_exist=True)
pre_execute_datasets = set(bd.datasets.keys())
# Execute and get the return code
#TODO: wrap in try...except and attach the current job info to the message.
# This way we can avoid having to repeat that info in every error message
# the procs throw and just have it once here.
rc = curr_proc.execute(bd)
log_lcl.info("%s return code: %s", job_step.process, rc)
if(rc != 0):
msg = _("Non-zero return code: {} returned by {} in job {} at seqno {}").format(
rc, job_step.process, job_step.jobid, job_step.seqno)
log_lcl.exception(msg)
raise ProcedureReturnCodeError(msg)
# Get the current datasets, don't create if they don't exist as we want
# to know if they're None.
# NOTE: We want to get the filtered version if it exists since that's what
# was passed in and we want to compare what should be the same dataset reference
curr_imputed = bd.get_dataset("imputed_file")
curr_status_file = bd.get_dataset("status_file")
# If a UDP was run, we want to check if indata or instatus were updated directly
# If these references don't match the user must have created a new table and
# updated the reference, so warn or kill the program
if(not is_built_in and (indata_pre_execute is not curr_imputed or
instatus_pre_execute is not curr_status_file)):
# If no process filter: accept the updates but warn the user, just in case
msg = _("Indata or instatus were modified. If this was not intended make "
"sure that any new tables you create are not saved back to the "
"processor_data object. Instead save your updates to the "
"outdata and outstatus attributes on processor_data")
log_lcl.warning(msg)
# Release these references now that we are 100% done with the old tables
indata_pre_execute = None
instatus_pre_execute = None
# Remove filtered datasets since the proc has completed so references
# below will use the original when performing updates
bd.clear_filtered_data()
# Get the out datasets but don't pop them until after update methods have run
curr_outstatus = bd.get_dataset("outstatus")
curr_outdata = bd.get_dataset("outdata")
if(curr_outstatus is not None):
if(table_empty(curr_outstatus)):
# This can be either intentional or a mistake, depending on the plugin
msg = _("Process {} in job {} at seqno {} set an outstatus dataset but "
"it was empty.").format(job_step.process, job_step.jobid, job_step.seqno)
log_lcl.warning(msg)
elif(curr_outstatus is curr_status_file):
# If the user didn't copy() when forming outstatus from status_file
# the update will break. So give them a more informative error.
msg = _("Process {} in job {} at seqno {}. processor_data.outstatus should not "
"reference the same dataset as instatus.").format(job_step.process, job_step.jobid,
job_step.seqno)
log_lcl.exception(msg)
raise UserDefinedPluginError(msg)
else:
# Updates status_file and status_log files using the outstatus dataset
bd.update_status()
# We no longer save non-cumulative files
if(curr_outdata is not None):
if(table_empty(curr_outdata)):
# This can be either intentional or a mistake, depending on the plugin
msg = _("Process {} in job {} at seqno {} set an outdata dataset but "
"it was empty.").format(job_step.process, job_step.jobid, job_step.seqno)
log_lcl.warning(msg)
elif(curr_outdata is curr_imputed):
# If the user didn't copy() when forming outdata from indata (imputed_file)
# the update will break. So give them a more informative error.
msg = _("Process {} in job {} at seqno {}. processor_data.outdata "
"should not reference the same dataframe as indata.").format(
job_step.process, job_step.jobid, job_step.seqno)
log_lcl.exception(msg)
raise UserDefinedPluginError(msg)
else:
# Updates imputed_file using outdata
bd.update_imputed_file()
# Note that we no longer keep the non-cumulative files
# Now that these have been used in their respective update methods and saved under
# new names (if desired), we remove these references so that the next proc doesn't
# accidentally use the same dataset.
bd.pop_dataset("outstatus")
bd.pop_dataset("outdata")
# Remove the single-step outputs and update their respective cumulative version
bd.update_cumulatives(pre_execute_datasets)
# Print the footer string, which also logs time/duration if specified in options
end_time = datetime.now() # noqa: DTZ005
# curr_step_count was incremented at the top of the loop, so it is already a 1-based step count
# The log's StreamHandler is set to ERROR only, therefore we need to print()
# as we want this to be the only other thing seen in the console
# Also log at INFO so that it will also show in the log file, if one is set
log_lcl.info(self._get_step_footer(bd.curr_step_count, start_time, end_time))
write_to_console(self._get_console_step_footer(bd.curr_step_count))
# Reset local variables so the garbage collector can clean them up
curr_outdata = None
curr_outstatus = None
curr_imputed = None
curr_status_file = None
curr_proc = None
msg = None
end_time = None
# Just in case
pre_execute_datasets = None
if(not self.is_child_block):
msg = self._get_execution_footer()
write_to_console(msg)
log_lcl.info(msg)
# clean the status file one last time
bd.clean_status_file()
def save_outputs(self) -> None:
"""Call the save_outputs method for the current job's processor_data."""
self.processor_data.save_outputs()
def _get_execution_header(self) -> str:
border_string = "="*112
msg = "\n" + border_string + "\n"
msg += _("Job ID : {}\n").format(self.processor_data.input_params.job_id)
msg += _("Banff Processor Version : {}\n").format(bpp.__version__)
msg += _("Banff Version : {}\n").format(banff.__version__)
msg += _("Support Email : {}\n").format("banff@statcan.gc.ca")
msg += _("Start Time : {} ({})\n").format(datetime.now().strftime("%c"), datetime.now().astimezone().tzinfo) # noqa: DTZ005
msg += border_string
return msg
def _get_execution_footer(self) -> str:
time_store_table = self.processor_data.get_dataset("time_store")
total_duration = pc.max(time_store_table["total_job_duration"]).as_py()
border_string = "="*112
msg = "\n" + border_string + "\n"
msg += _("The Banff Processor executed {} successfully. ").format(self.processor_data.current_job_step.jobid)
msg += _("Duration: {}").format(total_duration)
msg += "\n" + border_string + "\n"
return msg
def _get_step_header(self) -> str:
"""Return a string containing the header text for the current job step.
:return: The header string for the current job step.
:rtype: str
"""
border_string = "="*100
msg = "\n" + border_string + "\n"
msg += _("JOBID: {}\n").format(self.processor_data.current_job_step.jobid)
msg += _("PROCESS: {}\n").format(self.processor_data.current_job_step.process)
msg += _("SEQNO: {}\n").format(self.processor_data.current_job_step.seqno)
msg += _("SPECID: {}\n").format(self.processor_data.current_job_step.specid)
msg += _("CONTROLID: {}\n").format(self.processor_data.current_job_step.controlid)
msg += border_string
return msg
def _get_step_footer(self, cur_step_index: int, start_time: datetime|float, end_time: datetime|float) -> str:
"""Return a string containing the footer text for the current job step which includes
the computed start/end time and duration. Also saves time values to the timestore dataset.
:param start_time: The start time of the job step as a datetime object
or a perf_counter float output, defaults to None
:type start_time: datetime | float | None, optional
:param end_time: The end time of the job step as a datetime object
or a perf_counter float output, defaults to None
:type end_time: datetime | float | None, optional
:raises TypeError: If `start_time` or `end_time` are not provided but the
TIME or TIMESTORE option is set
:raises TypeError: If `start_time` or `end_time` are not datetime or float
:return: The footer string for the current job step.
:rtype: str
"""
time_print = self.processor_data.input_params.time
time_store = self.processor_data.input_params.time_store
border_string = "="*100
job_progress = round((cur_step_index/self.processor_data.total_job_steps)*100)
footer = "\n" + border_string + "\n"
footer += _("END OF PROCESS: {}\n").format(self.processor_data.current_job_step.process)
footer += _("JOBID: {}\n").format(self.processor_data.current_job_step.jobid)
footer += _("SEQNO: {}\n").format(self.processor_data.current_job_step.seqno)
footer += _("SPECID: {}\n").format(self.processor_data.current_job_step.specid)
footer += _("JOB PROGRESS: {}/{} - {}%\n").format(cur_step_index, self.processor_data.total_job_steps, job_progress)
# If the start and end time were not provided, skip duration
if (start_time is None or end_time is None):
return (footer + border_string)
duration = end_time - start_time
start_stamp = ""
end_stamp = ""
if(isinstance(start_time, datetime) and isinstance(end_time, datetime)):
# datetime gives us microseconds, so we truncate the string with [:-4]
# to keep the same reduced precision as the SAS version of the processor
start_stamp = start_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-4]
end_stamp = end_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-4]
if(time_print):
footer += _("START TIME: {}\n").format(start_stamp)
footer += _("END TIME: {}\n").format(end_stamp)
elif(isinstance(start_time, float) and isinstance(end_time, float)):
duration = timedelta(seconds=(duration))
else:
msg = _("start_time or end_time must be datetime objects or floats.")
log_lcl.exception(msg)
raise TypeError(msg)
# datetime gives us microseconds, so we truncate the string with [:-4]
# to keep the same reduced precision as the SAS version of the processor
duration_stamp = str(duration)[:-4]
footer += _("STEP DURATION: {}\n").format(duration_stamp)
if(self.processor_data.total_job_duration is None):
self.processor_data.total_job_duration = duration
else:
self.processor_data.total_job_duration = self.processor_data.total_job_duration + duration
new_total_duration_stamp = str(self.processor_data.total_job_duration)[:-4]
footer += _("TOTAL DURATION: {}\n").format(new_total_duration_stamp)
if(time_store):
if(start_stamp and end_stamp):
self._store_time(start_stamp, end_stamp, duration_stamp, new_total_duration_stamp)
else:
msg = _("TIMESTORE option was set but start_time and end_time were not passed in as datetime stamps.")
log_lcl.warning(msg)
return (footer + border_string)
def _get_console_step_footer(self, cur_step_index: int) -> str:
def truncate_output(val: str, max_len: int) -> str:
return (val[:max_len - 2] + "..") if len(val) > max_len else val
job_progress = round((cur_step_index/self.processor_data.total_job_steps)*100)
job_progress = str(job_progress) + "%"
time_store_table = self.processor_data.get_dataset("time_store")
# Get the last (should be the latest) record in the time_store table
time_store_curr_rec = time_store_table.slice(offset=time_store_table.shape[0]-1)
duration = time_store_curr_rec["duration"][0].as_py()
total_duration = time_store_curr_rec["total_job_duration"][0].as_py()
step = f"{cur_step_index}/{self.processor_data.total_job_steps}"
curr_process = self.processor_data.current_job_step.process
if(curr_process == "job"):
curr_process = f"job:{self.processor_data.current_job_step.specid}"
footer = ""
if cur_step_index == 1:
footer = _("STEP JOBID PROCESS SEQNO END TIME DURATION TOTAL DURATION PROGRESS\n")
footer += "========= ========== ==================== ====== =============== =============== =============== ========\n"
footer += _("{:<9} {:<10} {:<20} {:<6} {:<15} {:<15} {:<15} {:>8}").format(step,
truncate_output(self.processor_data.current_job_step.jobid, 10),
truncate_output(curr_process, 20),
self.processor_data.current_job_step.seqno,
datetime.now().strftime("%X"), # noqa: DTZ005
duration,
total_duration,
job_progress)
return footer
def _store_time(self, start_stamp: str, end_stamp: str,
duration: str, total_job_duration: str) -> None:
"""Store the timestamps of the start and end time and the duration of the current job step to the timestore output dataset.
:param start_time: The timestamp of the procedure start time
:type start_time: str
:param end_time: The timestamp of the procedure end time
:type end_time: str
:param duration: The difference between the start and end times in seconds
:type duration: str
:param total_job_duration: The total duration of the job up to this point
:type total_job_duration: str
"""
specid = "" if self.processor_data.current_job_step.specid is None else self.processor_data.current_job_step.specid
time_store_new = pa.table({"job_id": [self.processor_data.current_job_step.jobid],
"process": [self.processor_data.current_job_step.process],
"seqno": [self.processor_data.current_job_step.seqno],
"specid": [specid],
"start_time": [start_stamp],
"end_time": [end_stamp],
"duration": [duration],
"total_job_duration": [total_job_duration]})
time_store_name = "time_store"
time_store_curr_obj = self.processor_data.get_dataset(time_store_name, ds_format="object")
# append a new record with the info for this job run
if(time_store_curr_obj is not None):
# As these 2 tables are only produced/modified here, we shouldn't need to set
# any promote_options for now, as doing so might obscure an error
time_store_curr_obj.ds = pa.concat_tables([time_store_curr_obj.ds, time_store_new])
else:
self.processor_data.set_dataset(time_store_name, time_store_new)
def write_to_console(text: str) -> None:
"""Write text to the console.
Write the text to the console instead of the log file. This was put in
a function as there may be a better way to do this.
"""
print(text) # noqa: T201
def get_args(args: list | str | None = None) -> argparse.Namespace:
"""Create an argument parser.
Example args -> ["my_filename.xlsx", "-o", "/my/out/folder", "-l", "fr"]
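
A short illustrative parse (the filename is a placeholder)::

    args = get_args(["my_filename.json", "-l", "fr"])
    args.filename  # -> "my_filename.json"
    args.lang      # -> "fr"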
"""
parser = argparse.ArgumentParser(description="Execute the Banff Processor using the parameters in the provided JSON file.\n\n"
"Exécutez le processeur Banff à l'aide des paramètres du fichier JSON fourni.",
formatter_class=RawTextHelpFormatter)
# First argument is positional, required by default
parser.add_argument("filename", help="The name of the JSON file that contains the Banff Processor parameters.\n"
"Le nom du fichier JSON qui contient les paramètres du processeur Banff.")
# Second argument is set by flag
parser.add_argument("-l", "--lang", default="en",
help="Set the language, supported langauges are en for English and fr for French. \n"
"Définissez la langue, les langues prises en charge sont en pour l'anglais et fr pour le français.")
return parser.parse_args(args)
def main(iargs: list | str | None = None) -> None:
"""Call the Banff Processor function.
Used when running this module from the command line. Created to facilitate testing.
"""
if iargs is None:
iargs = sys.argv[1:]
args = get_args(iargs)
bpp.set_language(bpp.SupportedLanguage[args.lang])
my_bp = Processor.from_file(args.filename)
my_bp.execute()
my_bp.save_outputs()
def init() -> None:
"""Call the main function.
Used when running this module from the command line. Created to facilitate testing.
"""
if __name__ == "__main__":
sys.exit(main())
init()