Source code for onnx_diagnostic.helpers._log_helper

import datetime
import glob
import io
import os
import zipfile
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas

BUCKET_SCALES_VALUES = np.array(
    [-np.inf, -20, -10, -5, -2, 0, 2, 5, 10, 20, 100, 200, 300, 400, np.inf], dtype=float
)


BUCKET_SCALES = BUCKET_SCALES_VALUES / 100 + 1


def mann_kendall(series: Sequence[float], threshold: float = 0.5):
    """
    Computes the Mann-Kendall trend test.

    :param series: series
    :param threshold: 1.96 is the usual value, 0.5 means that a short
        timeseries such as ``(0, 1, 2, 3, 4)`` has a significant trend
    :return: trend (-1, 0, +1), test value

    .. math::

        S = \\sum_{i=1}^{n}\\sum_{j=i+1}^{n} sign(x_j - x_i)

    where the function *sign* is:

    .. math::

        sign(x) = \\left\\{\\begin{array}{rl}
        -1 & \\text{if } x < 0 \\\\
        0 & \\text{if } x = 0 \\\\
        +1 & \\text{otherwise}
        \\end{array}\\right.

    And:

    .. math::

        Var(S) = \\frac{n(n-1)(2n+5) - \\sum_t t(t-1)(2t+5)}{18}
    """
    aseries = np.asarray(series)
    stat = 0
    n = len(aseries)
    var = n * (n - 1) * (2 * n + 5)
    for i in range(n - 1):
        stat += np.sign(aseries[i + 1 :] - aseries[i]).sum()
    var = var**0.5
    test = (stat + (1 if stat < 0 else (0 if stat == 0 else -1))) / var
    trend = np.sign(test) if np.abs(test) > threshold else 0
    return trend, test

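# Usage sketch (illustrative, not part of the original module; the helper name
# and the sample values are made up): with the default threshold of 0.5, a short
# strictly increasing series is reported as a significant upward trend while a
# noisy flat one is not.
def _example_mann_kendall():
    trend_up, test_up = mann_kendall([0.0, 1.0, 2.0, 3.0, 4.0])
    print(trend_up, test_up)  # expected: 1.0 and a test value slightly above 0.5
    trend_flat, test_flat = mann_kendall([1.0, 1.1, 0.9, 1.0, 1.05])
    print(trend_flat, test_flat)  # expected: 0 (no significant trend)
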
def breaking_last_point(series: Sequence[float], threshold: float = 1.2):
    """
    Assuming a timeseries is constant, checks that the last value is not an outlier.

    :param series: series
    :param threshold: the last point is considered an outlier when it deviates
        from the mean of the previous points by more than *threshold*
        standard deviations
    :return: significant change (-1, 0, +1), test value
    """
    signal = np.asarray(series)
    if not np.issubdtype(signal.dtype, np.number):
        return 0, np.nan
    assert len(signal.shape) == 1, f"Unexpected signal shape={signal.shape}, signal={signal}"
    if signal.shape[0] <= 2:
        return 0, 0
    has_value = ~(np.isnan(signal).all()) and ~(np.isinf(signal).all())
    if np.isnan(signal[-1]) or np.isinf(signal[-1]):
        return (-1, np.inf) if has_value else (0, 0)
    try:
        m = np.mean(signal[:-1])
    except (TypeError, ValueError):
        # Not a numerical type
        return 0, np.nan
    if np.isnan(m) or np.isinf(m):
        return (1, np.inf) if np.isinf(signal[-2]) or np.isnan(signal[-2]) else (0, 0)
    v = np.std(signal[:-1])
    if v == 0:
        test = signal[-1] - m
        assert not np.isnan(
            test
        ), f"Unexpected test value, test={test}, signal={signal}, m={m}, v={v}"
        trend = np.sign(test)
        return trend, trend
    test = (signal[-1] - m) / v
    assert not np.isnan(
        test
    ), f"Unexpected test value, test={test}, signal={signal}, m={m}, v={v}"
    trend = np.sign(test) if np.abs(test) > threshold else 0
    return trend, test

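# Usage sketch (illustrative, not part of the original module; the helper name
# and the sample values are made up): the last point is flagged when it moves
# away from the mean of the previous points by more than 1.2 standard deviations.
def _example_breaking_last_point():
    print(breaking_last_point([1.0, 1.1, 0.9, 1.0, 5.0]))   # expected trend: 1.0
    print(breaking_last_point([1.0, 1.1, 0.9, 1.0, 1.05]))  # expected trend: 0
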
def filter_data(
    df: pandas.DataFrame,
    filter_in: Optional[str] = None,
    filter_out: Optional[str] = None,
    verbose: int = 0,
) -> pandas.DataFrame:
    """
    Filters rows in or out of a dataframe.

    Arguments ``filter_in`` and ``filter_out`` follow the syntax
    ``<column1>:<fmt1>//<column2>:<fmt2>``.
    The format is the following:

    * a value or a set of values separated by ``;``
    """
    if not filter_in and not filter_out:
        return df

    def _f(fmt):
        cond = {}
        if isinstance(fmt, str):
            cols = fmt.split("//")
            for c in cols:
                assert ":" in c, f"Unexpected value {c!r} in fmt={fmt!r}"
                spl = c.split(":")
                assert len(spl) == 2, f"Unexpected value {c!r} in fmt={fmt!r}"
                name, fil = spl
                cond[name] = set(fil.split(";"))
        return cond

    if filter_in:
        cond = _f(filter_in)
        assert isinstance(cond, dict), f"Unexpected type {type(cond)} for fmt={filter_in!r}"
        for k, v in cond.items():
            if k not in df.columns:
                continue
            if verbose:
                print(
                    f"[_filter_data] filter in column {k!r}, "
                    f"values {v!r} among {set(df[k].astype(str))}"
                )
            df = df[df[k].astype(str).isin(v)]

    if filter_out:
        cond = _f(filter_out)
        assert isinstance(cond, dict), f"Unexpected type {type(cond)} for fmt={filter_out!r}"
        for k, v in cond.items():
            if k not in df.columns:
                continue
            if verbose:
                print(
                    f"[_filter_data] filter out column {k!r}, "
                    f"values {v!r} among {set(df[k].astype(str))}"
                )
            df = df[~df[k].astype(str).isin(v)]
    return df

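# Usage sketch (illustrative, not part of the original module; the helper name,
# column names and values are made up): keep rows whose column ``model`` takes
# one of two values, then drop rows whose column ``device`` is ``cpu``.
def _example_filter_data():
    df = pandas.DataFrame(
        {"model": ["llama", "phi", "bert"], "device": ["cpu", "cuda", "cuda"]}
    )
    kept = filter_data(df, filter_in="model:llama;phi", filter_out="device:cpu")
    print(kept)  # only the 'phi' row should remain
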
def enumerate_csv_files(
    data: Union[
        pandas.DataFrame, List[Union[str, Tuple[str, str]]], str, Tuple[str, str, str, str]
    ],
    verbose: int = 0,
    filtering: Optional[Callable[[str], bool]] = None,
) -> Iterator[Union[pandas.DataFrame, str, Tuple[str, str, str, str]]]:
    """
    Enumerates files considered for the aggregation.
    Only csv files are considered.
    If a zip file is given, the function digs into the zip file
    and loops over csv candidates.

    :param data: dataframe with the raw data or a file or list of files
    :param verbose: verbosity
    :param filtering: function to filter in or out files in zip files,
        must return true to keep the file, false to skip it
    :return: a generator yielding tuples with the filename, date, full path and zip file

    *data* can contain:

    * a dataframe
    * a string for a filename, zip or csv
    * a list of strings
    * a tuple
    """
    if not isinstance(data, list):
        data = [data]
    for itn, filename in enumerate(data):
        if isinstance(filename, pandas.DataFrame):
            if verbose:
                print(f"[enumerate_csv_files] data[{itn}] is a dataframe")
            yield filename
            continue

        if isinstance(filename, tuple):
            # A file in a zipfile
            if verbose:
                print(f"[enumerate_csv_files] data[{itn}] is {filename!r}")
            yield filename
            continue

        if os.path.exists(filename):
            ext = os.path.splitext(filename)[-1]
            if ext == ".csv":
                # We check the first line is ok.
                if verbose:
                    print(f"[enumerate_csv_files] data[{itn}] is a csv file: {filename!r}")
                dt = datetime.datetime.fromtimestamp(os.stat(filename).st_mtime)
                du = dt.strftime("%Y-%m-%d %H:%M:%S")
                yield (os.path.split(filename)[-1], du, filename, "")
                continue

            if ext == ".zip":
                if verbose:
                    print(f"[enumerate_csv_files] data[{itn}] is a zip file: {filename!r}")
                zf = zipfile.ZipFile(filename, "r")
                for ii, info in enumerate(zf.infolist()):
                    name = info.filename
                    if filtering is None:
                        ext = os.path.splitext(name)[-1]
                        if ext != ".csv":
                            continue
                    elif not filtering(name):
                        continue
                    if verbose:
                        print(
                            f"[enumerate_csv_files] data[{itn}][{ii}] is a csv file: {name!r}"
                        )
                    with zf.open(name) as zzf:
                        first_line = zzf.readline()
                    if b"," not in first_line:
                        continue
                    yield (
                        os.path.split(name)[-1],
                        "%04d-%02d-%02d %02d:%02d:%02d" % info.date_time,
                        name,
                        filename,
                    )
                zf.close()
                continue

            raise AssertionError(f"Unexpected format {filename!r}, cannot read it.")

        # filename is a pattern.
        found = glob.glob(filename)
        if verbose and not found:
            print(f"[enumerate_csv_files] unable to find file in {filename!r}")
        for ii, f in enumerate(found):
            if verbose:
                print(f"[enumerate_csv_files] data[{itn}][{ii}] {f!r} from {filename!r}")
            yield from enumerate_csv_files(f, verbose=verbose, filtering=filtering)

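# Usage sketch (illustrative, not part of the original module; the helper name
# and the paths are hypothetical): csv files matched by a glob pattern and csv
# entries found inside a zip archive are yielded as (name, date, path, zip) tuples.
def _example_enumerate_csv_files():
    candidates = ["results/*.csv", "archives/benchmarks.zip"]  # hypothetical paths
    for item in enumerate_csv_files(candidates, verbose=1):
        if isinstance(item, tuple):
            name, date, path, zip_file = item
            print(name, date, path, zip_file)
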
def open_dataframe(
    data: Union[str, Tuple[str, str, str, str], pandas.DataFrame],
) -> pandas.DataFrame:
    """
    Opens a filename defined by function
    :func:`onnx_diagnostic.helpers.log_helper.enumerate_csv_files`.

    :param data: a dataframe, a filename, a tuple indicating the file
        is coming from a zip file
    :return: a dataframe
    """
    if isinstance(data, pandas.DataFrame):
        return data
    if isinstance(data, str):
        df = pandas.read_csv(data)
        df["RAWFILENAME"] = data
        return df
    if isinstance(data, tuple):
        if not data[-1]:
            df = pandas.read_csv(data[2])
            df["RAWFILENAME"] = data[2]
            return df
        zf = zipfile.ZipFile(data[-1])
        with zf.open(data[2]) as f:
            df = pandas.read_csv(f)
            df["RAWFILENAME"] = f"{data[-1]}/{data[2]}"
        zf.close()
        return df
    raise ValueError(f"Unexpected value for data: {data!r}")

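# Usage sketch (illustrative, not part of the original module; the helper name
# and the pattern are hypothetical): every item yielded by enumerate_csv_files
# can be opened with open_dataframe, and the RAWFILENAME column keeps track of
# the origin of each row.
def _example_open_dataframe():
    dfs = [open_dataframe(item) for item in enumerate_csv_files("results/*.csv")]
    if dfs:
        merged = pandas.concat(dfs, ignore_index=True)
        print(merged["RAWFILENAME"].unique())
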
def align_dataframe_with(
    df: pandas.DataFrame, baseline: pandas.DataFrame, fill_value: float = 0
) -> Optional[pandas.DataFrame]:
    """
    Modifies the first dataframe *df* so that it gets exactly the same
    rows and columns as *baseline*. Both dataframes must share the same
    levels on both axes. Empty cells are filled with *fill_value*.
    Only the numerical columns are kept.
    The function returns None if the output is empty.
    """
    df = df.select_dtypes(include="number")
    if df.shape[1] == 0:
        return None
    bool_cols = list(df.select_dtypes(include="bool").columns)
    if bool_cols:
        df[bool_cols] = df[bool_cols].astype(int)
    assert (
        df.columns.names == baseline.columns.names or df.index.names == baseline.index.names
    ), (
        f"Levels mismatch, expected index.names={baseline.index.names}, "
        f"expected columns.names={baseline.columns.names}, "
        f"got index.names={df.index.names}, "
        f"got columns.names={df.columns.names}"
    )
    dtypes = set(df[c].dtype for c in df.columns)
    assert all(np.issubdtype(dt, np.number) for dt in dtypes), (
        f"All columns in the first dataframe are expected to share "
        f"the same type or be at least numerical but got {dtypes}\n{df}"
    )
    common_index = df.index.intersection(baseline.index)
    cp = pandas.DataFrame(float(fill_value), index=baseline.index, columns=baseline.columns)
    for c in df.columns:
        if c not in cp.columns or not np.issubdtype(df[c].dtype, np.number):
            continue
        cp.loc[common_index, c] = df.loc[common_index, c].astype(cp[c].dtype)
    return cp

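# Usage sketch (illustrative, not part of the original module; the helper name
# and values are made up): the first dataframe is aligned on the baseline's
# index and columns, missing cells receive fill_value and non-numerical
# columns are dropped.
def _example_align_dataframe_with():
    baseline = pandas.DataFrame({"a": [0.0, 0.0, 0.0], "b": [0.0, 0.0, 0.0]})
    partial = pandas.DataFrame({"a": [10.0, 20.0], "c": ["x", "y"]}, index=[1, 2])
    aligned = align_dataframe_with(partial, baseline, fill_value=0)
    print(aligned)  # same shape as baseline, column 'a' filled for rows 1 and 2
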
def apply_excel_style(
    filename_or_writer: Any,
    f_highlights: Optional[  # type: ignore[name-defined]
        Dict[str, Callable[[Any], "CubeViewDef.HighLightKind"]]  # noqa: F821
    ] = None,
    time_mask_view: Optional[Dict[str, pandas.DataFrame]] = None,
    verbose: int = 0,
):
    """
    Applies styles on all sheets in a file unless the sheet is too big.

    :param filename_or_writer: filename or writer, the file is modified inplace
    :param f_highlights: color functions to apply, one per sheet
    :param time_mask_view: if specified, it contains dataframes with the same shape
        and values in {-1, 0, +1} indicating whether a value is unexpectedly
        lower (-1) or higher (+1); the background color is changed accordingly
    :param verbose: verbosity, shows a progress loop
    """
    from openpyxl import load_workbook
    from openpyxl.styles import Alignment
    from openpyxl.utils import get_column_letter
    from openpyxl.styles import Font, PatternFill
    from .log_helper import CubeViewDef

    if isinstance(filename_or_writer, str):
        workbook = load_workbook(filename_or_writer)
        save = True
    else:
        workbook = filename_or_writer.book
        save = False
    mask_low = PatternFill(fgColor="AAAAF0", fill_type="solid")
    mask_high = PatternFill(fgColor="F0AAAA", fill_type="solid")
    left = Alignment(horizontal="left")
    left_shrink = Alignment(horizontal="left", shrink_to_fit=True)
    right = Alignment(horizontal="right")
    font_colors = {
        CubeViewDef.HighLightKind.GREEN: Font(color="00AA00"),
        CubeViewDef.HighLightKind.RED: Font(color="FF0000"),
    }
    if verbose:
        from tqdm import tqdm

        sheet_names = tqdm(list(workbook.sheetnames))
    else:
        sheet_names = workbook.sheetnames
    for name in sheet_names:
        if time_mask_view and name in time_mask_view:
            mask = time_mask_view[name]
            with pandas.ExcelWriter(io.BytesIO(), engine="openpyxl") as mask_writer:
                mask.to_excel(mask_writer, sheet_name=name)
                sheet_mask = mask_writer.sheets[name]
        else:
            sheet_mask = None
        f_highlight = f_highlights.get(name, None) if f_highlights else None
        sheet = workbook[name]
        n_rows = sheet.max_row
        n_cols = sheet.max_column
        if n_rows * n_cols > 2**16 or n_rows > 2**13:
            # Too big.
            continue
        co: Dict[int, int] = {}
        sizes: Dict[int, int] = {}
        cols = set()
        for i in range(1, n_rows + 1):
            for j, cell in enumerate(sheet[i]):
                if j > n_cols:
                    break
                cols.add(cell.column)
                if isinstance(cell.value, float):
                    co[j] = co.get(j, 0) + 1
                elif isinstance(cell.value, str):
                    sizes[cell.column] = max(sizes.get(cell.column, 0), len(cell.value))
        for k, v in sizes.items():
            c = get_column_letter(k)
            sheet.column_dimensions[c].width = min(max(8, v), 30)
        for k in cols:
            if k not in sizes:
                c = get_column_letter(k)
                sheet.column_dimensions[c].width = 15
        for i in range(1, n_rows + 1):
            for j, cell in enumerate(sheet[i]):
                if j > n_cols:
                    break
                if isinstance(cell.value, pandas.Timestamp):
                    cell.alignment = right
                    dt = cell.value.to_pydatetime()
                    cell.value = dt
                    cell.number_format = (
                        "YYYY-MM-DD"
                        if (
                            dt.hour == 0
                            and dt.minute == 0
                            and dt.second == 0
                            and dt.microsecond == 0
                        )
                        else "YYYY-MM-DD 00:00:00"
                    )
                elif isinstance(cell.value, (float, int)):
                    cell.alignment = right
                    x = abs(cell.value)
                    if int(x) == x:
                        cell.number_format = "0"
                    elif x > 5000:
                        cell.number_format = "# ##0"
                    elif x >= 500:
                        cell.number_format = "0.0"
                    elif x >= 50:
                        cell.number_format = "0.00"
                    elif x >= 5:
                        cell.number_format = "0.000"
                    elif x > 0.5:
                        cell.number_format = "0.0000"
                    elif x > 0.005:
                        cell.number_format = "0.00000"
                    else:
                        cell.number_format = "0.000E+00"
                    if f_highlight:
                        h = f_highlight(cell.value)
                        if h in font_colors:
                            cell.font = font_colors[h]
                elif isinstance(cell.value, str) and len(cell.value) > 70:
                    cell.alignment = left_shrink
                else:
                    cell.alignment = left
                    if f_highlight:
                        h = f_highlight(cell.value)
                        if h in font_colors:
                            cell.font = font_colors[h]
        if sheet_mask is not None:
            for i in range(1, n_rows + 1):
                for j, (cell, cell_mask) in enumerate(zip(sheet[i], sheet_mask[i])):
                    if j > n_cols:
                        break
                    if cell_mask.value not in (1, -1):
                        continue
                    cell.fill = mask_low if cell_mask.value < 0 else mask_high
    if save:
        workbook.save(filename_or_writer)

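# Usage sketch (illustrative, not part of the original module; the helper name,
# file name, sheet name and data are made up, openpyxl must be installed): a
# dataframe is first dumped into an Excel file, then the styles are applied.
def _example_apply_excel_style():
    report = pandas.DataFrame(
        {"date": pandas.to_datetime(["2024-01-01"]), "speedup": [1.2345]}
    )
    with pandas.ExcelWriter("report.xlsx", engine="openpyxl") as writer:
        report.to_excel(writer, sheet_name="summary", index=False)
    apply_excel_style("report.xlsx", verbose=0)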