Source code for onnx_extended.ext_test_case

import os
import sys
import unittest
import warnings
from argparse import ArgumentParser, Namespace
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from timeit import Timer
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy
from numpy.testing import assert_allclose


def unit_test_going():
    """
    Enables a flag telling the script is running while testing it.
    Avois unit tests to be very long.
    """
    going = int(os.environ.get("UNITTEST_GOING", 0))
    return going == 1


def ignore_warnings(warns: List[Warning]) -> Callable:
    """
    Catches warnings.

    :param warns:   warnings to ignore
    """

    def wrapper(fct):
        if warns is None:
            raise AssertionError(f"warns cannot be None for '{fct}'.")

        def call_f(self):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", warns)
                return fct(self)

        return call_f

    return wrapper


[docs]def measure_time( stmt: Union[str, Callable], context: Optional[Dict[str, Any]] = None, repeat: int = 10, number: int = 50, warmup: int = 1, div_by_number: bool = True, max_time: Optional[float] = None, ) -> Dict[str, float]: """ Measures a statement and returns the results as a dictionary. :param stmt: string or callable :param context: variable to know in a dictionary :param repeat: average over *repeat* experiment :param number: number of executions in one row :param warmup: number of iteration to do before starting the real measurement :param div_by_number: divide by the number of executions :param max_time: execute the statement until the total goes beyond this time (approximatively), *repeat* is ignored, *div_by_number* must be set to True :return: dictionary .. runpython:: :showcode: from pprint import pprint from math import cos from onnx_extended.ext_test_case import measure_time res = measure_time(lambda: cos(0.5)) pprint(res) See `Timer.repeat <https://docs.python.org/3/library/ timeit.html?timeit.Timer.repeat>`_ for a better understanding of parameter *repeat* and *number*. The function returns a duration corresponding to *number* times the execution of the main statement. .. versionchanged:: 0.4 Parameter *max_time* was added. """ if not callable(stmt) and not isinstance(stmt, str): raise TypeError( f"stmt is not callable or a string but is of type {type(stmt)!r}." ) if context is None: context = {} if isinstance(stmt, str): tim = Timer(stmt, globals=context) else: tim = Timer(stmt) if warmup > 0: warmup_time = tim.timeit(warmup) else: warmup_time = 0 if max_time is not None: if not div_by_number: raise ValueError( "div_by_number must be set to True of max_time is defined." ) i = 1 total_time = 0.0 results = [] while True: for j in (1, 2): number = i * j time_taken = tim.timeit(number) results.append((number, time_taken)) total_time += time_taken if total_time >= max_time: break if total_time >= max_time: break ratio = (max_time - total_time) / total_time ratio = max(ratio, 1) i = int(i * ratio) res = numpy.array(results) tw = res[:, 0].sum() ttime = res[:, 1].sum() mean = ttime / tw ave = res[:, 1] / res[:, 0] dev = (((ave - mean) ** 2 * res[:, 0]).sum() / tw) ** 0.5 mes = dict( average=mean, deviation=dev, min_exec=numpy.min(ave), max_exec=numpy.max(ave), repeat=1, number=tw, ttime=ttime, ) else: res = numpy.array(tim.repeat(repeat=repeat, number=number)) if div_by_number: res /= number mean = numpy.mean(res) dev = numpy.mean(res**2) dev = (dev - mean**2) ** 0.5 mes = dict( average=mean, deviation=dev, min_exec=numpy.min(res), max_exec=numpy.max(res), repeat=repeat, number=number, ttime=res.sum(), ) if "values" in context: if hasattr(context["values"], "shape"): mes["size"] = context["values"].shape[0] else: mes["size"] = len(context["values"]) else: mes["context_size"] = sys.getsizeof(context) mes["warmup_time"] = warmup_time return mes
class ExtTestCase(unittest.TestCase): _warns: List[Tuple[str, int, Warning]] = [] def assertExists(self, name): if not os.path.exists(name): raise AssertionError(f"File or folder {name!r} does not exists.") def assertEqualArray( self, expected: numpy.ndarray, value: numpy.ndarray, atol: float = 0, rtol: float = 0, ): self.assertEqual(expected.dtype, value.dtype) self.assertEqual(expected.shape, value.shape) assert_allclose(expected, value, atol=atol, rtol=rtol) def assertAlmostEqual( self, expected: numpy.ndarray, value: numpy.ndarray, atol: float = 0, rtol: float = 0, ): if not isinstance(expected, numpy.ndarray): expected = numpy.array(expected) if not isinstance(value, numpy.ndarray): value = numpy.array(value).astype(expected.dtype) self.assertEqualArray(expected, value, atol=atol, rtol=rtol) def assertRaise(self, fct: Callable, exc_type: type[Exception]): try: fct() except exc_type as e: if not isinstance(e, exc_type): raise AssertionError(f"Unexpected exception {type(e)!r}.") return raise AssertionError("No exception was raised.") def assertEmpty(self, value: Any): if value is None: return if len(value) == 0: return raise AssertionError(f"value is not empty: {value!r}.") def assertNotEmpty(self, value: Any): if value is None: raise AssertionError(f"value is empty: {value!r}.") if isinstance(value, (list, dict, tuple, set)): if len(value) == 0: raise AssertionError(f"value is empty: {value!r}.") def assertStartsWith(self, prefix: str, full: str): if not full.startswith(prefix): raise AssertionError(f"prefix={prefix!r} does not start string {full!r}.") @classmethod def tearDownClass(cls): for name, line, w in cls._warns: warnings.warn(f"\n{name}:{line}: {type(w)}\n {str(w)}") def capture(self, fct: Callable): """ Runs a function and capture standard output and error. :param fct: function to run :return: result of *fct*, output, error """ sout = StringIO() serr = StringIO() with redirect_stdout(sout): with redirect_stderr(serr): try: res = fct() except Exception as e: raise AssertionError( f"function {fct} failed, stdout=" f"\n{sout.getvalue()}\n---\nstderr=\n{serr.getvalue()}" ) from e return res, sout.getvalue(), serr.getvalue() def get_parsed_args( name: str, scenarios: Optional[Dict[str, str]] = None, description: Optional[str] = None, epilog: Optional[str] = None, number: int = 10, repeat: int = 10, warmup: int = 5, sleep: float = 0.1, tries: int = 2, expose: Optional[str] = None, **kwargs: Dict[str, Tuple[Union[int, str, float], str]], ) -> Namespace: """ Returns parsed arguments for examples in this package. :param name: script name :param scenarios: list of available scenarios :param description: parser description :param epilog: text at the end of the parser :param number: default value for number parameter :param repeat: default value for repeat parameter :param warmup: default value for warmup parameter :param sleep: default value for sleep parameter :param expose: if empty, keeps all the parameters, if None, only publish kwargs contains, otherwise the list of parameters to publish separated by a comma :param kwargs: additional parameters, example: `n_trees=(10, "number of trees to train")` :return: parser """ if description is None: description = f"Available options for {name}.py." if epilog is None: epilog = "" parser = ArgumentParser(prog=name, description=description, epilog=epilog) if expose is not None: to_publish = set(expose.split(",")) if len(expose) > 0 else set() if scenarios is not None: rows = ", ".join(f"{k}: {v}" for k, v in scenarios.items()) parser.add_argument( "-s", "--scenario", help=f"Available scenarios: {rows}." ) if len(to_publish) == 0 or "number" in to_publish: parser.add_argument( "-n", "--number", help=f"number of executions to measure, default is {number}", type=int, default=number, ) if len(to_publish) == 0 or "repeat" in to_publish: parser.add_argument( "-r", "--repeat", help=f"number of times to repeat the measure, default is {repeat}", type=int, default=repeat, ) if len(to_publish) == 0 or "warmup" in to_publish: parser.add_argument( "-w", "--warmup", help=f"number of times to repeat the measure, default is {warmup}", type=int, default=warmup, ) if len(to_publish) == 0 or "sleep" in to_publish: parser.add_argument( "-S", "--sleep", help=f"sleeping time between two configurations, default is {sleep}", type=float, default=sleep, ) if len(to_publish) == 0 or "tries" in to_publish: parser.add_argument( "-t", "--tries", help=f"number of tries for each configurations, default is {tries}", type=int, default=tries, ) for k, v in kwargs.items(): parser.add_argument( f"--{k}", help=f"{v[1]}, default is {v[0]}", type=type(v[0]), default=v[0], ) return parser.parse_args()