Source code for banff._common.src.testing.assert_helper

import io  # for printing: creating StringIO buffer
import re  # preprocessing console log
import sys
import warnings  # for deprecation warning in `assert_log_consistent`
from pathlib import Path

import numpy as np  # referring to types, like `np.int64`
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

from .._log import get_misc_logger
from ..io_util import (
    load_input_dataset,
)
from ..io_util.io_util import (
    interm_to_DF,
    interm_to_PAT,
)
from ..io_util.processing import (
    handle_arrow_string_data,
)

# dictionary of keyword arguments passed in each procedure call
_default_proc_kwargs = {}

def assert_datasets_equal(ds_list, round_data=None, drop_columns=None, upcase_columns=None):
    """Assert that list of dataset pairs are equal.

    takes a list of dataset pairs (`[test_dataset, control_dataset]`) and comparison options,
    calls `assert_dataset_equal()` on each dataset pair in the list.
        `ds_list = [[<test_ds_1, control_ds_1], [<test_ds2, control_ds2], ...]`
         - *test_dataset* must be correspond to the procedure call's `_<dataset-name>` member (subclass of `..io_util.StcTable`).
         - *control_dataset* must be a type supported by `assert_dataset_equal(control_dataset=)`
    # Override options
        - To override option parameters for a specific dataset pair, specify a dictionary as a third member in that pair
            - for example, to override the `drop_columns` use `[test_dataset, control_dataset, {'drop_columns':None}]`
    """
    # create options dictionary using function parameters
    options_parm = {
        "round_data"    : round_data,
        "drop_columns"  : drop_columns,
        "upcase_columns": upcase_columns,
    }

    # process each dataset pair
    for ds_item in ds_list:
        # extract datasets
        ds_test = ds_item[0]
        ds_control = ds_item[1]

        if ds_control is None:
            continue  # not specified by user, go to next pair

        # validate dataset types
        allowed_types = (pd.DataFrame, pa.Table)
        if (not isinstance(ds_control, allowed_types)
            or not isinstance(ds_test.user_output, allowed_types)
        ):
            print(f"control or test dataset not of a valid type, skipping {ds_test.name}")
            print(f"  allowed types: {allowed_types}")
            print(f"  control dataset type: {type(ds_control)}")
            print(f"  test dataset type: {type(ds_test.user_output)}")
            mesg = "assert_datasets_equal(): received invalid dataset format"
            raise TypeError(mesg)

        # build options dictionary
        options_run = options_parm.copy()
        if len(ds_item) >= 3: # optional third member is dict of parameter overrides
            options_run.update(ds_item[2])

        assert_dataset_equal(
            round_data      = options_run["round_data"],
            drop_columns    = options_run["drop_columns"],
            upcase_columns  = options_run["upcase_columns"],
            dataset_name    = ds_test.name,
            test_dataset    = ds_test.user_output,
            control_dataset = ds_control,
        )


def assert_dataset_equal(
        test_dataset,
        control_dataset,
        dataset_name,
        upcase_columns=False,
        sort_columns=True,
        sort_values=True,
        round_data=None,
        convert_columns=True,
        drop_columns=None,
        compare_with=None,
):
    """Check that test and control datasets are sufficiently equal.

    Handles common issues such as
      - empty datasets
      - some type mismatches
      - different column sort order
      - float precision issues

    `dataset_name` : str
        used in print statements
    `sort_columns` : bool
        sort both datasets' columns before comparison
    `sort_values` : bool
        sort values of all columns in both datasets before comparison
    `round_data` : int | None
        if integer, round floating point values to `round_data` decimal places
    `convert_columns` : bool
        convert integer columns to floating point columns
    `drop_columns` : str | list of str
        drop these columns, if found, on both datasets prior to comparison
    `compare_with` : None | `pandas.DataFrame` | `pyarrow.Table`
        convert test and control datasets to the specified format for comparison;
        when unspecified (or `None`), use the type that `test_dataset` uses
    """
    print(f"assert_dataset_equal(dataset_name={dataset_name})")

    # determine format/library to use for comparison
    if compare_with is None:
        if isinstance(test_dataset, pd.DataFrame):
            compare_with = pd.DataFrame
        elif isinstance(test_dataset, pa.Table):
            compare_with = pa.Table
    print(f"  comparing as {compare_with}")

    try:
        if compare_with is pa.Table:
            # convert test and control datasets to proper format
            test_dataset = interm_to_PAT(test_dataset)
            control_dataset = interm_to_PAT(control_dataset)

            # set both datasets' string data to the same type
            test_dataset = handle_arrow_string_data(test_dataset)
            control_dataset = handle_arrow_string_data(control_dataset)

            # handle parameter: drop_columns
            if isinstance(drop_columns, (str, list)):
                # promote string to list of strings
                if isinstance(drop_columns, str):
                    drop_columns = drop_columns.split()
                print(f"  drop columns: {drop_columns}")

                # remove columns
                for drop_col in drop_columns:
                    drop_col = drop_col.upper()
                    for in_col in control_dataset.column_names:
                        if in_col.upper() == drop_col:
                            print(f"  dropped column '{in_col}' from control dataset")
                            control_dataset = control_dataset.drop_columns(columns=in_col)
                    for in_col in test_dataset.column_names:
                        if in_col.upper() == drop_col:
                            print(f"  dropped column '{in_col}' from test dataset")
                            test_dataset = test_dataset.drop_columns(columns=in_col)

            # handle parameter: round_data
            if isinstance(round_data, int):
                def round_pat(pat):
                    new_cols = []
                    for col in pat.columns:
                        if pa.types.is_floating(col.type):
                            new_cols.append(pc.round(col, ndigits=round_data))
                        else:
                            new_cols.append(col)
                    return pa.table(data=new_cols, schema=pat.schema)

                test_dataset = round_pat(test_dataset)
                control_dataset = round_pat(control_dataset)

            # handle parameter: upcase_columns
            if upcase_columns:
                print(f"  upcase columns: {upcase_columns}")
                test_dataset = test_dataset.rename_columns([x.upper() for x in test_dataset.column_names])
                control_dataset = control_dataset.rename_columns([x.upper() for x in control_dataset.column_names])

            # handle parameter: sort_columns
            if sort_columns:
                print("  sort columns: True")
                # sort columns
                test_dataset = test_dataset.select(sorted(test_dataset.column_names))
                control_dataset = control_dataset.select(sorted(control_dataset.column_names))

            # handle parameter: sort_values
            if sort_values:
                print("  sort values: True")
                # sort values
                sort_keys_test = [(name, "ascending") for name in test_dataset.column_names]
                sort_indices_test = pc.sort_indices(test_dataset, sort_keys_test)
                test_dataset = pc.take(test_dataset, sort_indices_test)

                sort_keys_control = [(name, "ascending") for name in control_dataset.column_names]
                sort_indices_control = pc.sort_indices(control_dataset, sort_keys_control)
                control_dataset = pc.take(control_dataset, sort_indices_control)

            # test for equality
            assert test_dataset.equals(control_dataset), f"assert_dataset_equal: {dataset_name} dataset does not match expected data"

        elif compare_with is pd.DataFrame:
            # convert test and control datasets to proper format
            test_dataset = interm_to_DF(test_dataset)
            control_dataset = interm_to_DF(control_dataset)

            # handle parameter: drop_columns
            if isinstance(drop_columns, (str, list)):
                # promote string to list of strings
                if isinstance(drop_columns, str):
                    drop_columns = drop_columns.split()
                print(f"  drop columns: {drop_columns}")

                # remove columns
                for drop_col in drop_columns:
                    drop_col = drop_col.upper()
                    for in_col in control_dataset.columns.to_list():
                        if in_col.upper() == drop_col:
                            print(f"  dropped column '{in_col}' from control dataset")
                            control_dataset.drop(columns=in_col, inplace=True)
                    for in_col in test_dataset.columns.to_list():
                        if in_col.upper() == drop_col:
                            print(f"  dropped column '{in_col}' from test dataset")
                            test_dataset.drop(columns=in_col, inplace=True)

            # handle parameter: upcase_columns
            if upcase_columns:
                print(f"  upcase columns: {upcase_columns}")
                test_dataset = test_dataset.rename(columns={x: x.upper() for x in test_dataset.columns.to_list()})
                control_dataset = control_dataset.rename(columns={x: x.upper() for x in control_dataset.columns.to_list()})

            # handle parameter: sort_columns
            if sort_columns:
                print("  sort columns: True")
                test_dataset = test_dataset.reindex(sorted(test_dataset.columns), axis=1)
                control_dataset = control_dataset.reindex(sorted(control_dataset.columns), axis=1)

            # handle parameter: sort_values
            if sort_values:
                print("  sort values: True")
                try:
                    test_dataset_s = test_dataset.transform(np.sort)
                    control_dataset_s = control_dataset.transform(np.sort)
                    # only overwrite originals if both sorted without exception
                    test_dataset = test_dataset_s
                    control_dataset = control_dataset_s
                except Exception:  # noqa: BLE001
                    print("  unable to sort values, continuing with original sort order")

            if test_dataset.empty and control_dataset.empty:
                print("  both datasets are empty")
                assert test_dataset.columns.equals(control_dataset.columns), f"assert_dataset_equal: {dataset_name} datasets both empty, but columns differ"
            else:
                # set both datasets' numeric data to the same type
                if convert_columns:
                    for ds in [test_dataset, control_dataset]:
                        # find integer columns
                        int_columns = ds.select_dtypes(np.int64).columns
                        # convert them to float columns
                        ds[int_columns] = ds[int_columns].astype(np.float64)

                # handle parameter: round_data
                if isinstance(round_data, int):
                    print(f"  round data: {round_data} decimal places")
                    test_dataset = test_dataset.round(decimals=round_data)
                    control_dataset = control_dataset.round(decimals=round_data)

                assert test_dataset.equals(control_dataset), f"assert_dataset_equal: {dataset_name} dataset does not match expected data"

        print("  datasets equal: True")
    except Exception:
        print("  datasets equal: False")
        print("\n  ************** Inspecting Dataset Differences: START *********************")
        inspect_dataset_difference(test_dataset=test_dataset, control_dataset=control_dataset)
        print("  *************** Inspecting Dataset Differences: END **********************\n")
        raise
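
# Minimal self-contained sketch (added for illustration, not in the original
# module): two pandas DataFrames that differ only in column order and in float
# noise below the rounding threshold still compare equal. Assumes
# `interm_to_DF` passes a pandas DataFrame through unchanged.
def _example_assert_dataset_equal():
    test = pd.DataFrame({"B": [1.0000001, 2.0], "A": ["x", "y"]})
    control = pd.DataFrame({"A": ["x", "y"], "B": [1.0, 2.0]})
    assert_dataset_equal(
        test_dataset=test,
        control_dataset=control,
        dataset_name="example",
        round_data=3,       # rounds away the 1e-7 difference in column B
        sort_columns=True,  # aligns the differing column order
    )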

def assert_dataset_value(dataset=None, dataset_name="", row_num=None, col_name=None, expected_values=None):
    """Validate a specific dataset value against a set of expected values.

    Specify the row number (int, 0-indexed), the column name (str),
    and a list ['of', 'expected', 'values'].
    """
    print("assert_dataset_value(...)")
    print(f"  dataset: '{dataset_name}' (row {row_num}, column '{col_name}')")
    print(f"  expecting: {expected_values}")

    # ensure dataset is in `pandas.DataFrame` format
    dataset = interm_to_DF(dataset)

    actual_value = dataset.at[row_num, col_name]
    print(f"  found: {actual_value}")
    assert actual_value in expected_values, f"assert_dataset_value: unexpected value found: {actual_value}"
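
# Illustrative sketch (not in the original module): checking a single cell
# against a set of acceptable values, e.g. a status flag that may legitimately
# take one of two codes. The column names are made up for the example.
def _example_assert_dataset_value():
    ds = pd.DataFrame({"STATUS": ["FTI", "FTE"], "VALUE": [1.0, 2.0]})
    assert_dataset_value(
        dataset=ds,
        dataset_name="outstatus",
        row_num=0,
        col_name="STATUS",
        expected_values=["FTI", "FTE"],
    )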

def assert_log_contains(msg, test_log, clean_whitespace=False):
    """Check for `msg` in `test_log` and assert that it is found.

    See `assert_log_consistent` for more details.
    """
    print(f"assert_log_contains(clean_whitespace={clean_whitespace}, ...)")
    print(f"  searching for: {msg}")

    # preprocess inputs
    if clean_whitespace:
        msg_P = preprocess_input(msg)
        test_log_P = preprocess_input(test_log)
    else:
        msg_P = msg
        test_log_P = test_log

    # replace '\r\n' with '\n'
    msg_P = msg_P.replace("\r\n", "\n")
    test_log_P = test_log_P.replace("\r\n", "\n")

    # perform the check
    in_test = msg_P in test_log_P
    print(f"  found in test log: {in_test}")
    assert in_test, f"assert_log_contains: could not find '{msg}'"
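
# Illustrative sketch (not in the original module): with `clean_whitespace=True`
# both strings pass through `preprocess_input()`, so line wrapping and dot
# leaders in the captured log do not break the match. The log text is invented.
def _example_assert_log_contains():
    captured_log = "NOTE: number of records\nprocessed ..........:      42"
    assert_log_contains(
        msg="NOTE: number of records processed .: 42",
        test_log=captured_log,
        clean_whitespace=True,
    )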

def assert_log_consistent(msg, test_log, sas_log_path, must_exist=False, clean_whitespace=False):
    """Check for `msg` in `test_log` and in the log file found at `sas_log_path`.

    Assert that it is either present in both, or not present in both,
    i.e. the assert fails if there is an inconsistency w.r.t. presence.

    Tighten the check using `must_exist=True`
      - the assert also fails if `msg` is not found in the SAS log
    """
    warnings.warn(f"`assert_log_consistent` deprecated, using assert_log_contains, ignoring must_exist ({must_exist}) and sas_log_path ({sas_log_path})", stacklevel=2)
    return assert_log_contains(msg=msg, test_log=test_log, clean_whitespace=clean_whitespace)

def assert_substr_count_consistent(substr_to_count, test_log, sas_log_path):
    """Check the count of `substr_to_count` in `test_log` and in the log file found at `sas_log_path`.

    Asserts that the counts are equal.
    """
    print(f"searching for: {substr_to_count}")
    sas_log = read_log_file(sas_log_path)

    sas_log_count = sas_log.count(substr_to_count)
    test_log_count = test_log.count(substr_to_count)
    print(f"  sas_log_count: {sas_log_count}")
    print(f"  test_log_count: {test_log_count}")

    mesg = f"assert_substr_count_consistent: inconsistent count of '{substr_to_count}', TEST has {test_log_count}, SAS has {sas_log_count}"
    assert sas_log_count == test_log_count, mesg
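
# Illustrative sketch (not in the original module): comparing substring counts
# between an in-memory log and a log file on disk. Uses pytest's standard
# `tmp_path` fixture to create a throwaway log file.
def _example_assert_substr_count_consistent(tmp_path):
    sas_log = tmp_path / "proc.log"
    sas_log.write_text("ERROR: bad\nERROR: worse\n")
    assert_substr_count_consistent(
        substr_to_count="ERROR:",
        test_log="ERROR: bad\nERROR: worse\n",
        sas_log_path=str(sas_log),
    )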

def assert_substr_count(substr_to_count="ERROR:", test_log=None, expected_count=0):
    """Calculate the count of `substr_to_count` in `test_log`.

    Asserts that it is equal to `expected_count`.
    """
    print("assert_substr_count(...)")
    print(f"  searching for: {substr_to_count}")
    print(f"  expected_count: {expected_count}")

    if test_log is None:
        mesg = "test_log must be non-empty `str`"
        raise ValueError(mesg)

    test_count = test_log.count(substr_to_count)
    print(f"  test_log count: {test_count}")

    # the following two lines collectively assert that `test_count == expected_count`,
    # while allowing specific error messaging
    assert test_count >= expected_count, f"assert_substr_count: found {expected_count - test_count} fewer than expected of '{substr_to_count}'"
    assert test_count <= expected_count, f"assert_substr_count: found {test_count - expected_count} more than expected of '{substr_to_count}'"
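
# Illustrative sketch (not in the original module): asserting the exact number
# of WARNING: lines emitted by a run. The log text is invented.
def _example_assert_substr_count():
    log = "WARNING: first issue\nNOTE: fine\nWARNING: second issue\n"
    assert_substr_count(substr_to_count="WARNING:", test_log=log, expected_count=2)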

def get_console_log(pytest_capture):
    return pytest_capture.readouterr()[0]

def get_control_dataset_path(dataset_name=None, depth=2):
    """Prepend the path to the `control_data` directory to `dataset_name`, if appropriate.

    In the case where a filename (without path) is provided, and that file exists
    in the `control_data` folder, the path to that file is returned.
    Otherwise, the original value is returned.

    `depth` - varies depending on the stack depth from the original test file.
    """
    if not isinstance(dataset_name, str):
        return dataset_name

    control_path = Path(sys._getframe(depth).f_code.co_filename).parent / "control_data" / dataset_name  # noqa: SLF001 # don't mind this sketchy code

    if control_path.exists():
        print(f"{__package__}: dataset '{dataset_name}' found in control_data folder, will load file at path '{control_path}'")
        return str(control_path)
    return dataset_name

def get_log_path(sas_log_name, stack_level=2):
    """Given a filename, determine the full path.

    Looks `stack_level` levels up in the call stack, determines that function's
    file path, and generates the path to the SAS log file.
    """
    return str(Path(sys._getframe(stack_level).f_code.co_filename).parent / "output-sas" / sas_log_name)  # noqa: SLF001 # don't mind this sketchy code

def inspect_dataset_difference(test_dataset=None, control_dataset=None):
    """Inspect two datasets and print a summary of various differences, including:

      - overall equality
      - number of rows
      - number of columns
      - different sets of columns (names)
      - different values
      - different datatypes
    """
    # print info and contents of both datasets, then try and compare them
    indent = "  "

    if type(test_dataset) is not type(control_dataset):
        print(f"{indent}unable to inspect difference, test and control datasets are not the same type")
        print(f"{indent}  test dataset type: {type(test_dataset)}")
        print(f"{indent}  control dataset type: {type(control_dataset)}")
        return

    if isinstance(test_dataset, pa.Table):
        # INSPECT: number of rows
        test_row_count = test_dataset.num_rows
        cntl_row_count = control_dataset.num_rows
        row_count_equal = test_row_count == cntl_row_count
        if row_count_equal:
            print(f"{indent}number of rows equal: True")
        else:
            print(f"{indent}number of rows equal: False")
            print(f"{indent}  number of control (expected) rows: {cntl_row_count}")
            print(f"{indent}  number of test (actual) rows: {test_row_count}\n")
        print()  # blank line

        # INSPECT: number of columns
        test_col_count = test_dataset.num_columns
        cntl_col_count = control_dataset.num_columns
        col_count_equal = test_col_count == cntl_col_count
        if col_count_equal:
            print(f"{indent}number of columns equal: True")
        else:
            print(f"{indent}number of columns equal: False")
            print(f"{indent}  number of control (expected) columns: {cntl_col_count}")
            print(f"{indent}  number of test (actual) columns: {test_col_count}")
        print()  # blank line

        # INSPECT: names of columns
        test_col_set = set(test_dataset.column_names)
        cntl_col_set = set(control_dataset.column_names)
        symmetric_difference = test_col_set ^ cntl_col_set
        col_names_identical = len(symmetric_difference) == 0
        if col_names_identical:
            print(f"{indent}column names equal: True")
            print(f"{indent}{indent}{test_col_set}")
        else:
            print(f"{indent}column names equal: False")
            # control only
            if len(cntl_col_set - test_col_set) > 0:
                print(f"{indent}  columns only on control (expected) dataset:\n{indent}{indent}{cntl_col_set - test_col_set}")
            else:
                print(f"{indent}  columns only on control (expected) dataset:\n{indent}{indent}{{}}")
            # test only
            if len(test_col_set - cntl_col_set) > 0:
                print(f"{indent}  columns only on test (actual) dataset:\n{indent}{indent}{test_col_set - cntl_col_set}")
            else:
                print(f"{indent}  columns only on test (actual) dataset:\n{indent}{indent}{{}}")
            # common columns
            print(f"{indent}  columns common to both datasets:\n{indent}{indent}{test_col_set.intersection(cntl_col_set)}")
            return  # no point in continuing to inspect if the names don't match
        print()  # blank line

        # only continue comparison if all equal so far
        if not (row_count_equal and col_count_equal and col_names_identical):
            return

        # INSPECT: `pandas.DataFrame.compare()`
        try:
            print(f"{indent}Temporarily converting to pandas dataframe for value comparison")
            test_pandas = interm_to_DF(test_dataset)
            control_pandas = interm_to_DF(control_dataset)
            with pd.option_context(
                "display.max_rows", 5000,
                "display.max_columns", 5000,
                "display.width", 500,
                "display.precision", 20,
            ):
                diff = test_pandas.compare(control_pandas, result_names=("test", "control"))
                print(f"{indent}The following values differ (printing output of pandas `.compare()`):")
                print(indent + diff.to_string().replace("\n", f"\n{indent}"))
        except Exception as e:  # noqa: BLE001 # in the .testing subpackage, we don't care
            print(f"{indent}unable to compare datasets using pandas `.compare()`:\n{indent}  {e}")
            # don't raise
        print()  # blank line

        # INSPECT: datatypes
        if test_dataset.schema.equals(control_dataset.schema):
            print(f"{indent}datatypes equal: True")
        else:
            print(f"{indent}datatypes equal: False")
            try:
                test_type_set = set(test_dataset.schema)
                control_type_set = set(control_dataset.schema)
                # control only
                if len(control_type_set - test_type_set) > 0:
                    print(f"{indent}  column types only on control (expected) dataset:\n{indent}{indent}{control_type_set - test_type_set}")
                else:
                    print(f"{indent}  column types only on control (expected) dataset:\n{indent}{indent}{{}}")
                # test only
                if len(test_type_set - control_type_set) > 0:
                    print(f"{indent}  column types only on test (actual) dataset:\n{indent}{indent}{test_type_set - control_type_set}")
                else:
                    print(f"{indent}  column types only on test (actual) dataset:\n{indent}{indent}{{}}")
            except Exception as e:  # noqa: BLE001 # in the .testing subpackage, we don't care
                print(f"{indent}unable to compare datatypes:\n{indent}  {e}")

        # INSPECT: print contents of both datasets
        for ds, ds_name in [(test_dataset, "test"), (control_dataset, "control")]:
            print(f"  printing {ds_name} dataset:")
            print(ds)
            print()
        return

    elif isinstance(test_dataset, pd.DataFrame):
        # INSPECT: number of rows
        test_row_count = test_dataset.shape[0]
        cntl_row_count = control_dataset.shape[0]
        row_count_equal = test_row_count == cntl_row_count
        if row_count_equal:
            print(f"{indent}number of rows equal: True")
        else:
            print(f"{indent}number of rows equal: False")
            print(f"{indent}  number of control (expected) rows: {cntl_row_count}")
            print(f"{indent}  number of test (actual) rows: {test_row_count}\n")
        print()  # blank line

        # INSPECT: number of columns
        test_col_count = test_dataset.shape[1]
        cntl_col_count = control_dataset.shape[1]
        col_count_equal = test_col_count == cntl_col_count
        if col_count_equal:
            print(f"{indent}number of columns equal: True")
        else:
            print(f"{indent}number of columns equal: False")
            print(f"{indent}  number of control (expected) columns: {cntl_col_count}")
            print(f"{indent}  number of test (actual) columns: {test_col_count}")
        print()  # blank line

        # INSPECT: names of columns
        test_col_set = set(test_dataset.columns.to_list())
        cntl_col_set = set(control_dataset.columns.to_list())
        symmetric_difference = test_col_set ^ cntl_col_set
        col_names_identical = len(symmetric_difference) == 0
        if col_names_identical:
            print(f"{indent}column names equal: True")
            print(f"{indent}{indent}{test_col_set}")
        else:
            print(f"{indent}column names equal: False")
            # control only
            if len(cntl_col_set - test_col_set) > 0:
                print(f"{indent}  columns only on control (expected) dataset:\n{indent}{indent}{cntl_col_set - test_col_set}")
            else:
                print(f"{indent}  columns only on control (expected) dataset:\n{indent}{indent}{{}}")
            # test only
            if len(test_col_set - cntl_col_set) > 0:
                print(f"{indent}  columns only on test (actual) dataset:\n{indent}{indent}{test_col_set - cntl_col_set}")
            else:
                print(f"{indent}  columns only on test (actual) dataset:\n{indent}{indent}{{}}")
            # common columns
            print(f"{indent}  columns common to both datasets:\n{indent}{indent}{test_col_set.intersection(cntl_col_set)}")
            return  # no point in continuing to inspect if the names don't match
        print()  # blank line

        # only continue comparison if all equal so far
        if not (row_count_equal and col_count_equal and col_names_identical):
            return

        # INSPECT: `pandas.DataFrame.compare()`
        try:
            with pd.option_context(
                "display.max_rows", 5000,
                "display.max_columns", 5000,
                "display.width", 500,
                "display.precision", 20,
            ):
                diff = test_dataset.compare(control_dataset, result_names=("test", "control"))
                print(f"{indent}The following values differ (printing output of pandas `.compare()`):")
                print(indent + diff.to_string().replace("\n", f"\n{indent}"))
        except Exception as e:  # noqa: BLE001 # in the .testing subpackage, we don't care
            print(f"{indent}unable to compare datasets using pandas `.compare()`:\n{indent}  {e}")
            # don't raise
        print()  # blank line

        # INSPECT: datatypes
        if test_dataset.dtypes.equals(control_dataset.dtypes):
            print(f"{indent}datatypes equal: True")
        else:
            print(f"{indent}datatypes equal: False")
            try:
                diff = test_dataset.dtypes.compare(control_dataset.dtypes, result_names=("test", "control"))
                print(f"{indent}  comparing datatypes")
                print(indent + diff.to_string().replace("\n", f"\n{indent}"))
            except Exception as e:  # noqa: BLE001 # in the .testing subpackage, we don't care
                print(f"{indent}unable to compare datatypes automatically using pandas `.dtypes.compare()`:\n{indent}  {e}")

        # INSPECT: `pandas.DataFrame.info()`
        for ds, ds_name in [(test_dataset, "test"), (control_dataset, "control")]:
            print(f"  printing pandas `.info()` for {ds_name} dataset:")
            print_dataset_info(dataset=ds, indent=f"{indent}  ", print_output=True)
            print()
        return

    print()  # blank line

def load_control_dataset(ds_ref, depth=4):
    """Load `ds_ref` using the normal load function.

    First, however, process `ds_ref` to search the *control_data* folder.
    """
    if ds_ref is None:
        return ds_ref
    return load_input_dataset(get_control_dataset_path(ds_ref, depth=depth), get_misc_logger())

def preprocess_input(str_in):
    str_out = str_in
    # collapse runs of whitespace
    str_out = re.sub(r"\s+", " ", str_out)
    # replace "text..........:" => "text.:"
    str_out = re.sub(r"(\.)(\.)+:", ".:", str_out)
    return str_out

def print_dataset_contents_verbose(dataset=None, indent="", print_output=True):
    """Print a pandas dataframe with many rows, columns, and much precision.

    Returns a string with the printed dataset.
    Prints the string by default; specify `print_output=False` to suppress printing.
    Specify an `indent="  "` to prepend spaces (or any text) to each line of output.
    """
    with pd.option_context(
        "display.max_rows", 5000,
        "display.max_columns", 5000,
        "display.width", 500,
        "display.precision", 20,
    ):
        # print dataset with indent
        temp_str = indent + dataset.to_string().replace("\n", f"\n{indent}")
        if print_output:
            print(temp_str)
        return temp_str

def print_dataset_info(dataset=None, indent="", print_output=True):
    temp_buff = io.StringIO()
    dataset.info(buf=temp_buff, memory_usage=False, show_counts=False)
    out_str = indent + temp_buff.getvalue().replace("\n", f"\n{indent}")
    if print_output:
        print(out_str)
    return out_str

def read_log_file(log_path):
    if not Path(log_path).exists():
        mesg = f"log file does not exist: {log_path}"
        raise FileNotFoundError(mesg)
    with open(log_path, "r") as log_file:  # noqa: PTH123 # No thanks, I'll use `open()`
        log_content = log_file.read()
    return log_content

def run_standard_assertions(
        expect_zero_rc=True,
        rc=None,
        python_log=None,
        sas_log_path=None,
        msg_list_sas=None,
        msg_list_sas_exact=None,
        msg_list_contains=None,
        msg_list_contains_exact=None,
        expect_error_count=None,
        expect_warning_count=None,
        ds_compare_list=None,
        round_data=None,
        drop_columns=None,
        upcase_columns=False,
        pytest_capture=None,
):
    if msg_list_sas is None:
        msg_list_sas = []
    if msg_list_sas_exact is None:
        msg_list_sas_exact = []
    if msg_list_contains is None:
        msg_list_contains = []
    if msg_list_contains_exact is None:
        msg_list_contains_exact = []
    if ds_compare_list is None:
        ds_compare_list = []

    print("####################################### RUNNING ASSERTIONS #######################################")
    try:
        # build SAS log path
        if sas_log_path is not None:
            sas_log_path = get_log_path(sas_log_name=sas_log_path, stack_level=4)

        # assert that "cleaned" message exists in both SAS and Python logs
        if isinstance(msg_list_sas, str):
            msg_list_sas = [msg_list_sas]
        for message in msg_list_sas:
            assert sas_log_path is not None, "UNIT TEST SETUP ERROR: `sas_log_path` erroneously blank"
            assert_log_consistent(test_log=python_log, sas_log_path=sas_log_path, must_exist=True, clean_whitespace=True, msg=message)
            print()

        # assert exact message exists in both SAS and Python logs
        if isinstance(msg_list_sas_exact, str):
            msg_list_sas_exact = [msg_list_sas_exact]
        for message in msg_list_sas_exact:
            assert sas_log_path is not None, "UNIT TEST SETUP ERROR: `sas_log_path` erroneously blank"
            assert_log_consistent(test_log=python_log, sas_log_path=sas_log_path, must_exist=True, clean_whitespace=False, msg=message)
            print()

        # assert that "cleaned" message exists in Python log
        if isinstance(msg_list_contains, str):
            msg_list_contains = [msg_list_contains]
        for message in msg_list_contains:
            assert_log_contains(test_log=python_log, clean_whitespace=True, msg=message)
            print()

        # assert that exact message exists in Python log
        if isinstance(msg_list_contains_exact, str):
            msg_list_contains_exact = [msg_list_contains_exact]
        for message in msg_list_contains_exact:
            assert_log_contains(test_log=python_log, clean_whitespace=False, msg=message)
            print()

        # assert return value is correct
        if isinstance(expect_zero_rc, bool):
            if expect_zero_rc:
                assert rc == 0, f"Procedure returned non-zero value when zero was expected: {rc}"
                print("asserted RC == 0\n")
            else:
                assert rc != 0, f"Procedure return code should be non-zero, but is: {rc}"
                print("asserted RC != 0\n")
            print()

        # assert ERROR: count is correct
        if isinstance(expect_error_count, int):
            assert_substr_count(substr_to_count="ERROR:", test_log=python_log, expected_count=expect_error_count)
            print()

        # assert WARNING: count is correct
        if isinstance(expect_warning_count, int):
            assert_substr_count(substr_to_count="WARNING:", test_log=python_log, expected_count=expect_warning_count)
            print()

        # assert test and control datasets match
        assert_datasets_equal(
            ds_compare_list,
            round_data=round_data,
            drop_columns=drop_columns,
            upcase_columns=upcase_columns,
        )
        print()
        print("################################ ASSERTIONS COMPLETE WITHOUT ERROR ###############################")
    except Exception:
        print("####################################### ASSERTIONS FAILED ########################################")
        raise
    finally:
        out_assert = get_console_log(pytest_capture)  # get console log for arrange and act
        print(python_log)
        print(out_assert)
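
# Illustrative sketch (not in the original module): how a test might drive
# `run_standard_assertions` directly. `capfd` is pytest's standard capture
# fixture; `my_call` is a hypothetical completed procedure-call object whose
# `rc` and `_outdata` members, and the log message, are invented for the example.
def _example_run_standard_assertions(capfd, my_call, expected_outdata):
    python_log = get_console_log(capfd)
    run_standard_assertions(
        expect_zero_rc=True,
        rc=my_call.rc,
        python_log=python_log,
        msg_list_contains="NOTE: procedure completed",
        expect_error_count=0,
        expect_warning_count=0,
        ds_compare_list=[[my_call._outdata, expected_outdata]],
        pytest_capture=capfd,
    )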

class PytestProcedure:
    """Base class for Procedure Pytest helper functions.

    SUBCLASS MUST
      - populate `self.ds_compare_list` with a list containing a pair for each output dataset
        - for associating expected and actual output datasets
        - populate each pair with `[<output-dataset>, expected_<output-dataset>]`
    """

    def __init__(
            self,
            #### Unit test parameters
            pytest_capture          = None,  # for the `capfd` which captures the console log
            sas_log_name            = None,
            msg_list_sas            = None,
            msg_list_sas_exact      = None,
            msg_list_contains       = None,
            msg_list_contains_exact = None,
            expected_error_count    = 0,
            expected_warning_count  = 0,
            rc_should_be_zero       = True,
            round_data              = None,
            drop_columns            = True,
            upcase_columns          = False,
            by                      = None,
    ):
        if msg_list_sas is None:
            msg_list_sas = []
        if msg_list_sas_exact is None:
            msg_list_sas_exact = []
        if msg_list_contains is None:
            msg_list_contains = []
        if msg_list_contains_exact is None:
            msg_list_contains_exact = []

        print("############################# PROCEDURE LOG ENDED on previous line ##############################")

        #### assert
        self.out_act = get_console_log(pytest_capture)  # get console log for arrange and act

        # load expected datasets
        for i in range(len(self.ds_compare_list)):
            self.ds_compare_list[i][1] = load_control_dataset(self.ds_compare_list[i][1])

        if drop_columns is True:
            drop_columns = by

        run_standard_assertions(
            expect_zero_rc          = rc_should_be_zero,
            rc                      = self.c_return_code,
            python_log              = self.out_act,
            sas_log_path            = sas_log_name,
            msg_list_sas            = msg_list_sas,
            msg_list_sas_exact      = msg_list_sas_exact,
            msg_list_contains       = msg_list_contains,
            msg_list_contains_exact = msg_list_contains_exact,
            expect_error_count      = expected_error_count,
            expect_warning_count    = expected_warning_count,
            ds_compare_list         = self.ds_compare_list,
            round_data              = round_data,
            drop_columns            = drop_columns,
            upcase_columns          = upcase_columns,
            pytest_capture          = pytest_capture,
        )
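
# Illustrative subclass sketch (not in the original module): the shape a
# concrete helper might take. `my_procedure_call` is a hypothetical completed
# procedure-call object; its `rc` and `_outdata` members are invented. A real
# subclass must set `self.ds_compare_list` and `self.c_return_code` before
# calling `super().__init__()`, as the class docstring requires. Note the inner
# pairs are lists, not tuples, because `PytestProcedure.__init__` assigns to
# `ds_compare_list[i][1]` when loading the expected datasets.
class _ExamplePytestMyProc(PytestProcedure):
    def __init__(self, my_procedure_call=None, expected_outdata=None, **kwargs):
        self.c_return_code = my_procedure_call.rc
        self.ds_compare_list = [[my_procedure_call._outdata, expected_outdata]]
        super().__init__(**kwargs)  # runs the standard assertions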