Source code for banffprocessor.metadata.metaobjects
import inspect # noqa: I001
import operator
from pathlib import Path
from typing import Any
from xml.parsers.expat import ExpatError
import duckdb
import banffprocessor.processor_logger as plg
from banffprocessor.exceptions import EmptyMetadataFileError, MetadataConstraintError
from banffprocessor.metadata import models
from banffprocessor.nls import _
from banffprocessor.metadata.models.metadataclass import MetadataClass
from banffprocessor.metadata.models.algorithms import Algorithms
from banffprocessor.metadata.models.editgroups import Editgroups
from banffprocessor.metadata.models.edits import Edits
from banffprocessor.metadata.models.estimators import Estimators
from banffprocessor.metadata.models.expressions import Expressions
from banffprocessor.metadata.models.jobs import Jobs
from banffprocessor.metadata.models.processcontrols import ProcessControls, ProcessControlType
from banffprocessor.metadata.models.processoutputs import ProcessOutputs
from banffprocessor.metadata.models.uservars import Uservars
from banffprocessor.metadata.models.varlists import Varlists
from banffprocessor.metadata.models.weights import Weights
log_lcl = plg.get_processor_child_logger("metadata")
[docs]
class MetaObjects:
"""Container class for collections of metadata objects."""
# The total number of job steps in the validated job sequence
total_job_steps: int
# The set of unique proc names in the validated job sequence
job_proc_names: set[str]
@property
def dbconn(self) -> duckdb.DuckDBPyConnection | None:
"""The currently connected database used to store metadata objects.
:return: The duckdbpyconnection currently being used to store metadata objects.
:rtype: duckdb.DuckDBPyConnection | None
"""
return self._dbconn
#Current implementation is based on a simple dictionary.
#- Keys are the name of the object type (which for metadata objects is the filename
# with no extension and a capitalized first character, i.e. donorspecs.xml -> Donorspecs)
#- Values are lists of objects with the same type as the key name, where each
# object should be one line or entry in the corresponding metadata table or file
def __init__(self, metadata_folder: str | Path | None = None, job_id: str | None = None,
dbconn: duckdb.DuckDBPyConnection | None = None) -> None:
"""Construct a MetaObjects object.
:param metadata_folder: The directory to load metadata files from, defaults to None
:type metadata_folder: str | Path | None, optional
:param job_id: The job_id to be run and whose job steps to validate, defaults to None
:type job_id: str | None, optional
:attribute total_job_steps: Used for tracking progress when performing execute()
:type total_job_steps: int
:raises FileNotFoundError: If the required Jobs metadata file cannot be found
:raises EmptyMetadataFileError: If the required Jobs metadata file is empty
:raises ExpatError: If there is an issue while reading the XML data from file
"""
self.meta_dict = {}
self.module_to_class = {}
self._dbconn = dbconn
if self._dbconn is None:
self._dbconn = duckdb.connect(database=":memory:")
self._is_private_connection = True
else:
self._is_private_connection = False
self.total_job_steps = None
# Build a list of classes that inherit from the MetadataClass in the metdata.models sub-module
for mod_name in models.__all__:
# Get the module itself and get its underlying class if it is a sub-class
# of the MetadataClass super-class (also skipping MetadataClass itself)
mod_tuple = inspect.getmembers(getattr(models, mod_name),
predicate=lambda o: inspect.isclass(o) and
issubclass(o, MetadataClass) and
o != MetadataClass)
if(mod_tuple):
# module names should already be lower case but just in case
self.module_to_class[mod_name.casefold()] = mod_tuple[0][1]
self.initialize_metadata()
if(metadata_folder):
# Grab the list of metadata files from the metadata folder
metadata_files = Path(metadata_folder).glob("*.xml")
# Currently loads all metadata files including Jobs entries and then
# expects the main processor loop to perform the jobid filter to get the list it needs.
# Alternatively we could perform that filter on load from file if we need the
# extra memory and there's no need for jobs entries under other jobids
for file in metadata_files:
try:
metadata_module_name = file.stem.casefold()
if(metadata_module_name in self.module_to_class):
# If the filename is the name of a metadata class, remove the
# class reference itself from our map
cls = self.module_to_class.get(metadata_module_name)
self.load_xml_file(file, cls)
else:
# If the file has a name that doesn't correspond with a metadata
# class, just skip it and log
msg = _("The following metadata file did not correspond to any existing "
"metadata model and was skipped: {}.").format(file.name)
log_lcl.info(msg)
continue
except EmptyMetadataFileError:
msg = _("Provided metadata file {} contains no valid entries.").format(file.name)
log_lcl.exception(msg)
raise
except ExpatError:
msg = _("Unable to read XML in {} metadata file.").format(file.name)
log_lcl.exception(msg)
raise
if(Jobs.get_record_count(dbconn=self._dbconn) == 0):
msg = _("Required Jobs metadata not found or unable to be read.")
log_lcl.exception(msg)
raise MetadataConstraintError(msg)
self.job_proc_names, self.total_job_steps = self.validate_job_sequence(job_steps=self.meta_dict[Jobs.__name__], job_id=job_id)
self.check_constraints()
self.display_load_summary()
def __del__(self) -> None:
self.cleanup_metadata()
if self._is_private_connection:
self._dbconn.close()
[docs]
def load_xml_file(self, metadata_file: Path, cls: type[MetadataClass]) -> None:
"""Load the metadata found in `metadata_file` into this class:`src.banffprocessor.metadata.MetaObjects` object.
The new entry added will have a key of the `cls` name and value of a collection of objects
of type `cls`.
:param metadata_file: The path of the XML file to load
:type metadata_file: pathlib.Path
:param cls: The metadata object type to load the file into
:type cls: type[:class:`src.banffprocessor.metadata.MetadataClass`]
:raises EmptyMetadataFileError: If the metadata file does not contain any valid entries
:raises MetadataConstraintError: If an entry in the metadata file contains values
that violate the constraints on the object type being loaded
"""
metafile_dict = cls.load_xml(metadata_file)
meta_objs = []
for index, entry in enumerate(metafile_dict[cls.__name__.casefold()], start=1):
try:
new_obj = cls(**entry, dbconn=self._dbconn)
meta_objs.append(new_obj)
except MetadataConstraintError as e:
log_lcl.exception()
msg = _("Metadata constraint violated while processing {} on entry {}").format(metadata_file.name, index)
log_lcl.exception(msg)
raise MetadataConstraintError(msg) from e
# Add list of metadata objects to our collection
self.add_objects_of_single_type(meta_objs)
[docs]
def add_objects_of_single_type(self, objects: list["MetadataClass"]) -> None:
"""Add a list of metadata objects all of the same type to the MetaObjects collection which can be retrieved using their type.
Only one list per object type can be added; if a second is added it will overwrite the original list stored under that type.
:param objects: The list of metadata objects to load
:type objects: list[:class:`src.banffprocessor.metadata.MetadataClass`]
:raises ValueError: If `objects` is empty or `None`
:raises TypeError: If `objects` contains objects of more than one type
"""
if(not objects):
msg = _("'objects' list is empty.")
log_lcl.exception(msg)
raise ValueError(msg)
objects_type = type(objects[0])
if(not all(isinstance(obj, objects_type) for obj in objects)):
msg = _("All objects in 'objects' list must be the same type.")
log_lcl.exception(msg)
raise TypeError(msg)
if(objects_type == ProcessControls):
objects = MetaObjects._load_process_control_dict(objects)
self.meta_dict[objects_type.__name__] = objects
[docs]
@staticmethod
def validate_job_sequence(job_steps: list["Jobs"], job_id: str | None = None) -> tuple[set[str], int]:
"""Iterate through `job_steps` and validates the sequence of all steps and process blocks contained/referenced in the job with `job_id`.
If `job_id` is not provided, the first job found in `job_steps` will be used as the starting point.
Returns a list of the unique proc names contained in the job sequence.
:param job_steps: A collection of Jobs metadata objects
:type job_steps: list[Jobs]
:param job_id: The job_id to be run and whose job steps to validate, defaults to None
:type job_id: str | None, optional
:raises MetadataConstraintError: If a job step of process "JOB" has a specid pointing
to a job_id that does not exist in the current Jobs metadata collection
:raises MetadataConstraintError: If a cycle exists in the graph of job_steps (i.e. a
step points to a process block which points back to the calling block, thus creating
an infinite loop)
:return: A set of the unique proc names contained in the job sequence and the total number
of job steps across the entire job.
:rtype: tuple[set[str], int]
"""
jobs_by_block = {}
root_id = job_id
# Group our jobs by jobid to separate each job block
for job in job_steps:
if(not root_id):
# If no job_id was provided we assume the first set of steps is the root job
root_id = job.jobid
if(job.jobid in jobs_by_block):
jobs_by_block[job.jobid].append(job)
else:
jobs_by_block[job.jobid] = [job]
# Sort each block on seqno
for block in jobs_by_block.values():
block.sort(key=operator.attrgetter("seqno"))
unique_proc_names = set()
current_blocks = []
total_step_count = 0
class BinaryNode:
def __init__(self, job: Jobs) -> None:
self.left = None
self.right = None
self.data = job
def add_block_to_graph(block_job_id: str, parent_node: BinaryNode = None) -> None:
# Keep track of the blocks that are currently in sequence to avoid cycles
current_blocks.append(block_job_id)
if(block_job_id not in jobs_by_block):
msg = _("No job steps were found with job_id {}.").format(block_job_id)
log_lcl.exception(msg)
raise MetadataConstraintError(msg)
prev_node = parent_node
# Iterate over the job and build our graph out from there
for step in jobs_by_block[block_job_id]:
new_node = BinaryNode(step)
# Every new node increases the step count
#NOTE: Can just take a total graph node count at the end if using a proper graphing library
nonlocal total_step_count
total_step_count += 1
# If we aren't the root we need to add an edge to the last node
if(prev_node):
if(prev_node.left):
prev_node.right = new_node
else:
prev_node.left = new_node
prev_node = new_node
# Save a list of all unique process names in the list of visited job steps
nonlocal unique_proc_names
unique_proc_names.add(step.process)
if(step.process == "job"):
# If we revisit a block that we've already added in the current sequence,
# we have found a cycle
if(step.specid in current_blocks):
msg = _("The Jobs sequence rooted at job_id {} contains a "
"cycle caused by job_id {} at seqno {}. As this will result "
"in an infinite loop the job must be rewritten to remove the "
"cycle.").format(root_id, block_job_id, step.seqno)
log_lcl.exception(msg)
raise MetadataConstraintError(msg)
# Otherwise recursively add this block (and its sub-blocks) to the graph
add_block_to_graph(step.specid, parent_node=prev_node)
# The block is completely processed, it may now be re-visited without cycling
current_blocks.remove(block_job_id)
add_block_to_graph(root_id)
#NOTE: Preferably just return a graph containing all nodes which reference a jobs object
# then the graph f'ns can be used to get this info indirectly
return unique_proc_names, total_step_count
@staticmethod
def _load_process_control_dict(controls: list[ProcessControls]) -> dict[str, Any]:
"""Load a list of :class:`src.banffprocessor.metadata.ProcessControls` to a
dict mapping controlids to dicts mapping targetfile names to the operations to
perform on them, which are dicts of the operation type to the ProcessControls
object with the control value.
{
"controlid1234": {
"indata": {
ProcessControlType.ROW_FILTER: [ProcessControls1, ProcessControls2],
ProcessControlType.COLUMN_FILTER: [ProcessControls3, ProcessControls4],
},
},
}
:param controls: List of :class:`src.banffprocessor.metadata.ProcessControls` objects
to add to a dictionary
:type controls: list[ProcessControls]
:return: The dictionary containing the categorized mappings of `controls`
:rtype: dict[str, Any]
"""
control_dict = {}
for control in controls:
if control.controlid in control_dict:
file_to_type = control_dict[control.controlid]
else:
file_to_type = {}
control_dict[control.controlid] = file_to_type
if control.targetfile in file_to_type:
type_to_control = file_to_type[control.targetfile]
else:
type_to_control = {}
file_to_type[control.targetfile] = type_to_control
if control.parameter in type_to_control:
type_to_control[control.parameter].append(control)
else:
type_to_control[control.parameter] = [control]
return control_dict
[docs]
def get_objects_of_type(self, cls: type[MetadataClass]) -> list[MetadataClass] | dict[str, Any]:
"""Get the list of metadata objects of type `cls`.
If no objects are found, an empty list is returned.
:param cls: The class reference of the object type to fetch
:type cls: type[:class:`src.banffprocessor.metadata.MetadataClass`]
:return: A list of all type `cls` objects found in this `MetaObjects` object or a special
dictionary if objects are of type :class:`src.banffprocessor.metadata.ProcessControls`
:rtype: list[:class:`src.banffprocessor.metadata.MetadataClass`] | dict[str, Any]
"""
if(cls.__name__ not in self.meta_dict):
return []
return self.meta_dict[cls.__name__]
[docs]
def get_job_steps(self, jobid: str | None) -> list[Jobs]:
"""Get and returns the list of Jobs objects with `jobid` sorted in ascending order of their seqno.
If no objects are found under the `jobid` an empty list is returned.
:param jobid: The class reference of the object type to fetch
:type jobid: str | None
:return: A list of the :class:`src.banffprocessor.metadata.Jobs` objects
with jobid `jobid`
:rtype: list[:class:`src.banffprocessor.metadata.Jobs`]
"""
if(not jobid or Jobs.__name__ not in self.meta_dict):
return []
return sorted([x for x in self.meta_dict[Jobs.__name__] if x.jobid == jobid],
key=operator.attrgetter("seqno"))
[docs]
def get_specs_obj(self, cls: type[MetadataClass], specid: str) -> MetadataClass:
"""Get and return the object of type `cls` with the specid `specid`.
Only one result should be found for the specified `specid` as it is effectively
a primary key for its metadata table. If no object is found for the specid
`None` is returned.
:param cls: The metadata object type to search for
:type cls: type[:class:`src.banffprocessor.metadata.MetadataClass`]
:param specid: The specid to match on objects of type `cls`
:type specid: str
:raises MetadataConstraintError: If multiple :class:`src.banffprocessor.metadata.MetadataClass`
objects are found under `specid`
:return: The object with type `cls` and specid `specid` or `None` if
not found
:rtype: :class:`src.banffprocessor.metadata.MetadataClass`
"""
if(not specid or cls.__name__ not in self.meta_dict):
return None
spec_objs = [x for x in self.meta_dict[cls.__name__] if x.specid == specid]
# SpecID should be essentially a primary key in a Spec table (can be repeated in Jobs table)
if(len(spec_objs) > 1):
msg = _("Multiple entries using same specid ({}) in {} table. "
"SpecIDs must be unique in Spec tables.").format(specid, cls.__name__)
log_lcl.exception(msg)
raise MetadataConstraintError
return (spec_objs[0] if spec_objs else None)
[docs]
def get_varlist_fieldids(self, varid: str | None) -> list[str]:
"""Given a list of varlist objects gets and returns the list of fieldids of the varlist
objects associated with the specified varid and sorts it on seqno.
If no variables are found under the varid, an empty list is returned.
:param varid: the ID to filter the varlists on
:type varid: str | None
:return: A list of varlist objects with varid `varid` sorted on their `seqno` and
an empty list if no objects are found
:rtype: list[str]
"""
if(not varid):
return []
varlists = self.get_objects_of_type(Varlists)
return [y.fieldid for y in sorted([x for x in varlists if x.varlistid == varid],
key=operator.attrgetter("seqno"))]
[docs]
def get_edits_string(self, editgroupid: str) -> str:
"""Get and return a string containing the list of edits in the :class:`src.banffprocessor.metadata.Edits`
objects associated with the specified `editgroupid`.
The string is formed by concatenating the formed edit strings from the edits objects
with a semi-colon and space as well as prepending each edit with its modifier (if present)
and a colon and space. If no edits are found under the `editgroupid`, an empty string is returned.
i.e. "PASS: a > b; FAIL: c + d <= e; f - g = h;"
:param editgroupid: the ID to filter the :class:`src.banffprocessor.metadata.Editgroups` on
:type editgroupid: str
:return: The semi-colon separated list of formed edits as a single string, empty
if no edits were found
:rtype: str
"""
if(not editgroupid):
return ""
editgroups = self.get_objects_of_type(Editgroups)
edits = self.get_objects_of_type(Edits)
edits_list = [x.edit for x in edits if x.editid in
[y.editid for y in editgroups if y.editgroupid == editgroupid]
]
return " ".join(edits_list)
[docs]
def get_weights_string(self, weightid: str) -> str:
"""Get and return a string containing the list of weights in the
:class:`src.banffprocessor.metadata.Weights` objects
associated with the specified `weightid` sorted in descending order by weight,
and formed by concatenating the formed weight strings from the objects with a
semi-colon and space.
If no :class:`src.banffprocessor.metadata.Weights` are found under the `weightid`,
an empty string is returned.
i.e. "field1=9.0; field2=7.0; field3=5.0;"
:param weightid: the ID to filter the :class:`src.banffprocessor.metadata.Weights` on
:type weightid: str
:return: The semi-colon separated list of formed weights as a single string, empty
if no :class:`src.banffprocessor.metadata.Weights` were found
:rtype: str
"""
if(not weightid):
return ""
weights = self.get_objects_of_type(Weights)
weight_string_list = [y.weight_string for y in
sorted([x for x in weights if x.weightid == weightid],
key=operator.attrgetter("weight"), reverse=True)
]
return " ".join(weight_string_list)
[docs]
def get_expression(self, exprid: str) -> str:
"""Get the expression string associated with the specified `exprid`.
:param exprid: The identifier of the Expression to get.
:type exprid: str
:return: The `expressions` field value of the Expression object fetched.
:rtype: str
"""
return Expressions.get_expression(exprid, dbconn=self._dbconn)
[docs]
def get_estimators(self, estid: str) -> list[Estimators]:
"""Get and return the list of :class:`src.banffprocessor.metadata.Estimators` objects
associated with the specified `estid` and sorted by their `seqno`.
If no variables are found under the `estid`, an empty list is returned.
:param estid: the ID to filter the :class:`src.banffprocessor.metadata.Estimators` on
:type estid: str
:return: A list of :class:`src.banffprocessor.metadata.Estimators` objects or an
empty list if no :class:`src.banffprocessor.metadata.Estimators` are found under
the `estid`
:rtype: list[:class:`src.banffprocessor.metadata.Estimators`]
"""
if(not estid):
return []
estimators = self.get_objects_of_type(Estimators)
return sorted([x for x in estimators if x.estimatorid == estid], key=operator.attrgetter("seqno"))
[docs]
def get_user_vars_dict(self, specid: str, process: str) -> dict[str,str]:
"""Get Uservars objects identified by the `specid` and `process` and return a dict mapping
the Uservars `var` to its `value`.
:param specid: The specid identifying Uservars to fetch.
:type specid: str
:param process: The process value of the Uservars to fetch.
:type process: str
:return: A dictionary mapping the fetched Uservars `var` field to their `value`
:rtype: dict[str,str]
"""
return Uservars.uservars_to_dict(specid, process, dbconn=self._dbconn)
[docs]
def get_algorithm(self, algorithmname: str) -> Algorithms | None:
"""Get and return the :class:`src.banffprocessor.metadata.Algorithms` object associated with the specified `algorithmname`.
:param algorithmname: The algorithmname of the :class:`src.banffprocessor.metadata.Algorithms`
object to retrieve
:return: The :class:`src.banffprocessor.metadata.Algorithms` object with the specified
`algorithmname`
:rtype: :class:`src.banffprocessor.metadata.Algorithms` | None
"""
if(not algorithmname):
return None
algorithms = self.get_objects_of_type(Algorithms)
algo = next((x for x in algorithms if x.algorithmname.upper() == algorithmname.upper()), None)
if algo is None:
# If is key is provided and not found, we should return an error.
msg = _("The following value was not found: {} = '{}'.").format("algorithms.algorithm_name", algorithmname)
raise ValueError(msg)
return algo
[docs]
def get_process_controls(self, controlid: str) -> dict[str, dict[ProcessControlType, list[ProcessControls]]]:
"""Get and return a mapping of targetfile names to a dict of `parameter` values to
their list of :class:`src.banffprocessor.metadata.ProcessControls` objects associated
with the specified `controlid`.
The lists are sorted on the enum value of the `parameter`
field to ensure that a regular list traversal will always pass over controls in
the order that they should be applied to the `target_file`.
If no variables are found under the `controlid`, an empty dict is returned.
i.e.
{
"indata": {
ProcessControlType.ROW_FILTER: [ProcessControls1, ProcessControls2],
ProcessControlType.COLUMN_FILTER: [ProcessControls3, ProcessControls4],
},
}
:param controlid: the ID to filter the :class:`src.banffprocessor.metadata.ProcessControls` on
:type controlid: str
:return: A dict of target file names mapped to dicts of
:class:`src.banffprocessor.metadata.ProcessControlType` mapped to lists of
:class:`src.banffprocessor.metadata.ProcessControls` of that type for
that targetfile, or an empty list if no records are found under the `controlid`
:rtype: dict[str, :class:`src.banffprocessor.metadata.ProcessControls`]
"""
if(not controlid):
return {}
controls = self.get_objects_of_type(ProcessControls)
if(controlid not in controls):
return {}
return controls[controlid]
[docs]
def get_process_outputs(self, process: str) -> list[str]:
"""Get and return the list of output_name strings for `process`.
If no objects are found under `process` an empty list is returned.
:param process: The name of the process value to retrieve records of
:type process: str
:return: A list of the output_name attributes of the
:class:`banffprocessor.metadata.ProcessOutputs` objects with process `process`
:rtype: list[str]
"""
if(not process or ProcessOutputs.__name__ not in self.meta_dict):
return []
return [x.output_name for x in self.meta_dict[ProcessOutputs.__name__] if x.process == process.casefold()]
[docs]
def initialize_metadata(self) -> None:
"""Perform the initialize() method on each metadata class type loaded to this object."""
for cls in self.module_to_class.items():
cls[1].initialize(dbconn=self._dbconn)
[docs]
def cleanup_metadata(self) -> None:
"""Perform the cleanup() method on each metadata class type loaded to this object."""
for cls in self.module_to_class.items():
cls[1].cleanup(dbconn=self._dbconn)
[docs]
def check_constraints(self) -> None:
"""Perform the check_constraints method on each metadata class type loaded to this object."""
for cls in self.module_to_class.items():
cls[1].check_constraints(dbconn=self._dbconn)
[docs]
def display_load_summary(self) -> None:
"""Display a summary of the Metadata files that were loaded to memory."""
border_string = "="*100
summary = "\n" + border_string
summary += _("\n Metadata Load Summary \n")
summary += border_string + "\n"
for cls in self.module_to_class.items():
summary += f" {cls[0].capitalize() : <30} {cls[1].get_record_count(dbconn=self._dbconn) : >10} \n"
summary += border_string + "\n"
log_lcl.info(summary)
summary.format()