Source code for banffprocessor.util.dataset

"""Model for a Banff Processor working dataset."""

from typing import Any

import duckdb
import pyarrow as pa

import banffprocessor.processor_logger as plg
from banffprocessor.nls import _

log_lcl = plg.get_processor_child_logger("dataset")

class Dataset:
    """Model for a Banff Processor working dataset."""

    name: str
    _ds_cumulative: pa.Table
    _ds_filtered: pa.Table | None
    _ds_curr_output: pa.Table | None
    _dbconn: duckdb.DuckDBPyConnection
    _is_private_connection: bool

    @property
    def dbconn(self) -> duckdb.DuckDBPyConnection | None:
        """Return a connection to the database being used."""
        return self._dbconn

    def __init__(self, name: str, ds: pa.Table,
                 dbconn: duckdb.DuckDBPyConnection | None = None) -> None:
        """Create a Banff Processor Dataset.

        :param name: The name by which the dataset will be identified.
        :type name: str
        :param ds: The Arrow table with the underlying data that this object will store.
        :type ds: pa.Table
        :param dbconn: The DuckDB connection to register cumulative datasets to, defaults to None
        :type dbconn: duckdb.DuckDBPyConnection | None, optional
        """
        self.name = name.casefold()
        self._ds_cumulative = ds
        self._ds_filtered = None
        self._ds_curr_output = None
        self._dbconn = dbconn

        if self._dbconn is None:
            self._dbconn = duckdb.connect(database=":memory:")
            self._is_private_connection = True
        else:
            self._is_private_connection = False

        self.register_table()

    def __del__(self) -> None:
        """Ensure memory is released and the database connection is closed."""
        self._ds_cumulative = None
        self._ds_curr_output = None
        self._ds_filtered = None
        if self._is_private_connection:
            self._dbconn.close()

    @property
    def ds(self) -> pa.Table:
        """Return the dataset as an Arrow Table."""
        return self._ds_cumulative

    @ds.setter
    def ds(self, value: pa.Table) -> None:
        """Set the dataset using the provided Arrow Table."""
        self._ds_cumulative = value
        self.register_table()

    @property
    def ds_filtered(self) -> pa.Table | None:
        """Return the filtered version of the dataset as an Arrow table."""
        return self._ds_filtered

    @ds_filtered.setter
    def ds_filtered(self, value: pa.Table) -> None:
        """Set the filtered version of the dataset using the provided Arrow table."""
        # NOTE: we don't re-register the filtered dataset to duckdb over the
        # unfiltered version. If multiple process controls exist for a single
        # step, one control may filter instatus and then the 2nd control to be
        # applied will reference instatus to filter another file. We want the
        # 2nd control to be able to reference the original file, so we don't
        # replace the duckdb registration of the original table with the
        # filtered one.
        self._ds_filtered = value

    @property
    def ds_curr_output(self) -> pa.Table | None:
        """Return the dataset output by the current proc as an Arrow table."""
        return self._ds_curr_output

    @ds_curr_output.setter
    def ds_curr_output(self, value: pa.Table) -> None:
        """Set the version of this dataset output by the current proc using the provided Arrow table."""
        self._ds_curr_output = value
    def register_table(self) -> None:
        """Register `ds` in the in-memory duckdb instance under `name`, or the alias of `name` if one exists."""
        if not table_empty(self.ds):
            # Register the Dataset with duckdb so the latest version will
            # always be accessible for process controls
            alias_name = get_dataset_name_alias(self.name)
            self.dbconn.register(alias_name if alias_name is not None else self.name, self.ds)
        else:
            msg = _("Dataset {} was empty and so was not able to be registered to Duckdb.").format(self.name)
            log_lcl.warning(msg)
    def unregister_table(self) -> None:
        """Unregister the dataset in the in-memory duckdb instance under `name`, or the alias of `name` if one exists."""
        # Unregister the table when we remove it
        alias_name = get_dataset_name_alias(self.name)
        self.dbconn.unregister(alias_name if alias_name is not None else self.name)
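# Usage sketch (illustrative only, not part of the module; `work_table` and
# `shared_conn` are hypothetical names). A Dataset registers its table in
# DuckDB under its alias when one exists, so SQL process controls can query
# the "imputed_file" dataset as "indata":
#
#   import duckdb
#   import pyarrow as pa
#
#   work_table = pa.table({"ident": ["R1", "R2"], "value": [1.5, 2.5]})
#   shared_conn = duckdb.connect(database=":memory:")
#   ds = Dataset("imputed_file", work_table, dbconn=shared_conn)
#   shared_conn.execute("SELECT * FROM indata").fetchall()
#
#   # Assigning to `ds.ds` calls register_table() again so the registration
#   # tracks the latest table; unregister_table() drops the DuckDB view.
#   ds.ds = ds.ds.append_column("seqno", pa.array([1, 2]))
#   ds.unregister_table()  # "indata" is no longer queryable on shared_conn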
# The real name is the only name actually stored in the datasets collection.
# Aliases are solely used by users to identify datasets for process controls
# or UDPs, so aliases are what tables are registered under in duckdb.
_alias_to_real = {
    "indata": "imputed_file",
    "instatus": "status_file",
    "cumulative_status_all": "status_log",
}
_real_to_alias = {
    "imputed_file": "indata",
    "status_file": "instatus",
    "status_log": "cumulative_status_all",
}
def get_dataset_name_alias(name: str) -> str | None:
    """Get the alias of the given dataset name.

    Casefold() `name` and return its alias, if `name` is a dataset name
    that has an alias. If no alias exists, None is returned.

    :param name: The name of the dataset to get the alias of
    :type name: str
    :return: The alias of `name`, or None if no alias exists
    :rtype: str | None
    """
    name_lower = name.casefold()
    if name_lower in _real_to_alias:
        return _real_to_alias[name_lower]
    return None
def get_dataset_real_name(name: str) -> str:
    """Get the real name of the given dataset name.

    Casefold() `name` and return the actual name of the dataset, if `name`
    exists as an alias. If no alias exists, just return the casefolded `name`.

    :param name: The name of the dataset to get the proper name of
    :type name: str
    :return: `name` casefolded, or the proper dataset name if `name` is an alias
    :rtype: str
    """
    name_lower = name.casefold()
    if name_lower in _alias_to_real:
        return _alias_to_real[name_lower]
    return name_lower
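# Illustrative round trips through the alias maps above (return values follow
# directly from the dictionaries):
#
#   get_dataset_name_alias("Status_File")      # -> "instatus"
#   get_dataset_name_alias("some_other_file")  # -> None
#   get_dataset_real_name("INSTATUS")          # -> "status_file"
#   get_dataset_real_name("some_other_file")   # -> "some_other_file"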
#####
# Helper methods for working with pyarrow. In case I find a better way of
# doing these operations later, it's easier to just change them one time here.
#####
def table_empty(table: pa.Table) -> bool:
    """Determine if `table` is empty (has no rows and no columns).

    :param table: The table to check
    :type table: pa.Table
    :return: True if the table's shape is (0,0), False otherwise
    :rtype: bool
    """
    return table.shape == (0, 0)
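# For example, table_empty(pa.table({})) is True (shape (0, 0)), while a
# table that has columns but no rows is not considered empty here.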
def copy_table(to_copy: pa.Table) -> pa.Table:
    """Return a copy of a PyArrow Table.

    :param to_copy: The table to make a copy of
    :type to_copy: pa.Table
    :return: A new table containing the data and metadata of `to_copy`
    :rtype: pa.Table
    """
    # This creates a copy of the table information but not a copy of the data.
    # However, a PyArrow Table's arrays are immutable, so any changes to the
    # data of one table cannot affect the other.
    return to_copy.select(to_copy.column_names)
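# A quick demonstration of the copy semantics (hypothetical tables, not part
# of the module): select() returns a new Table backed by the same immutable
# Arrow arrays, so operations on one table produce new objects and leave the
# copy untouched.
#
#   original = pa.table({"a": [1, 2, 3]})
#   duplicate = copy_table(original)
#   original = original.append_column("b", pa.array([4, 5, 6]))
#   duplicate.column_names  # still ["a"]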
def add_single_value_column(table: pa.Table, column_name: str, value: Any,
                            dtype: pa.DataType | None = None) -> pa.Table:
    """Add a new column to `table` where every row of the column contains the same value
    and the column is the same length as `table`.

    :param table: The table to append the new column to
    :type table: pa.Table
    :param column_name: The name for the new column
    :type column_name: str
    :param value: The value to use for each row of the new column
    :type value: Any
    :param dtype: The PyArrow DataType to use for the column, defaults to None
    :type dtype: pa.DataType, optional
    :return: `table` with the new column appended to it
    :rtype: pa.Table
    """
    # Equivalent to: my_df[column_name] = value
    if dtype:
        return table.append_column(column_name, pa.array([value] * len(table), dtype))
    return table.append_column(column_name, pa.array([value] * len(table)))
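# Usage sketch (hypothetical table): appending a constant column, with and
# without an explicit dtype.
#
#   t = pa.table({"ident": ["R1", "R2"]})
#   t = add_single_value_column(t, "weight", 1.0)               # inferred as double
#   t = add_single_value_column(t, "flag", 0, dtype=pa.int8())  # explicit int8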