Source code for banffprocessor.procedures.banff_procedures.job_proc
# Note that importing the Processor class directly leads to circular import (from banffprocessor.processor import Processor)
import banffprocessor.processor as bpp
# Import must be absolute in order to ensure all modules reference the same global _c_handlers
import banffprocessor.processor_logger as plg
from banffprocessor.nls import _
from banffprocessor.procedures import factory
from banffprocessor.processor_data import ProcessorData
# Setup local log for processor module specifically
log_lcl = plg.get_processor_child_logger("jobproc")
[docs]
class JobProc:
"""Implements the execution of a new process block as a `:class:banffprocessor.procedures.procedure_interface`."""
[docs]
@classmethod
def execute(cls, processor_data: ProcessorData) -> int:
"""Execute a process block indicated by the "JOB" process for a particular job step."""
# Jobid of the calling block
curr_jobid = processor_data.input_params.job_id
# Jobid for the block being called
sub_jobid = processor_data.current_job_step.specid
msg = _("Beginning execution of job {} from job {} at seqno {}").format(
sub_jobid, curr_jobid, processor_data.current_job_step.seqno)
log_lcl.info(msg)
# Call a new instance of the Banff Processor
# Supplying parameters directly, instead of an input file
my_bp = bpp.Processor(initial_data=processor_data.get_new_block_copy())
my_bp.execute()
my_bp.save_outputs()
# Bring all of the sub-block's datasets in our dataset collection
processor_data.update_with_block_result(my_bp.processor_data)
# Copy over the custom_outputs list, which should hold the expected output tables for the
# last job step of the process block. This will let the processor know it needs to update
# and not discard the outputs from our block, since the ProcessOutputs metadata is set
# on the proc that ran in the job block, not the job step that calls the block.
processor_data.custom_outputs = my_bp.processor_data.custom_outputs.copy()
msg = _("Job {} complete.").format(sub_jobid)
log_lcl.info(msg)
my_bp = None
return 0
[docs]
def register(factory: factory) -> None:
"""Register this procedure class in the Banff processor procedure factory."""
factory.register("job", JobProc)