import time
from typing import Any, Dict, Optional, Tuple, Union
import numpy as np
from onnx import ModelProto, TensorProto
[docs]
def create_tensor(shape: Tuple[int, ...], dtype: int, batch_size: int = 1) -> np.ndarray:
    """
    Creates a random tensor.

    :param shape: shape, the first dimension is treated as the batch
        dimension; if it is not an integer (dynamic dimension such as
        ``None`` or ``"N"``), it is replaced by *batch_size*
    :param dtype: onnx type, either a ``TensorProto`` element type
        (``FLOAT``, ``FLOAT16``, ``DOUBLE``) or the equivalent string
        (``"tensor(float)"``, ``"tensor(float16)"``, ``"tensor(double)"``)
    :param batch_size: batch_size
    :return: numpy array filled by ``np.random.random`` and cast to the
        numpy type matching *dtype*
    :raises AssertionError: if the first dimension is an integer different
        from *batch_size*, or if *dtype* is not supported
    """
    first_dim = shape[0]
    if isinstance(first_dim, int):
        # A fixed batch dimension must agree with the requested batch size.
        if first_dim != batch_size:
            raise AssertionError(
                f"Conflict between batch_size={batch_size} and shape={shape}"
            )
    else:
        # Dynamic batch dimension: substitute the requested batch size.
        shape = (batch_size, *shape[1:])

    t = np.random.random(shape)

    if isinstance(dtype, str):
        # Type names are resolved by name only, no onnx constant is needed.
        np_dtype = {
            "tensor(float)": np.float32,
            "tensor(float16)": np.float16,
            "tensor(double)": np.float64,
        }.get(dtype)
    elif dtype == TensorProto.FLOAT:
        np_dtype = np.float32
    elif dtype == TensorProto.FLOAT16:
        np_dtype = np.float16
    elif dtype == TensorProto.DOUBLE:
        np_dtype = np.float64
    else:
        np_dtype = None

    if np_dtype is None:
        raise AssertionError(f"The function is not implemented yet for dtype={dtype!r}")
    return t.astype(np_dtype)
[docs]
def create_feeds(
    sess: "onnxruntime.InferenceSession", batch_size: int = 1  # noqa: F821
) -> Dict[str, Any]:
    """
    Builds one random tensor per input of a model.

    :param sess: onnxruntime session, its inputs are read with
        ``sess.get_inputs()``
    :param batch_size: batch size forwarded to :func:`create_tensor`
    :return: dictionary mapping every input name to a random tensor
        created from the input's declared shape and type
    """
    return {
        node.name: create_tensor(node.shape, node.type, batch_size=batch_size)
        for node in sess.get_inputs()
    }
[docs]
def model_run(
    model: Union[str, ModelProto],
    repeat: int = 10,
    warmup: int = 5,
    batch_size: int = 1,
    processor: str = "CPU",
    verbose: int = 0,
    validate: Optional[Union[str, ModelProto]] = None,
) -> Dict[str, Any]:
    """
    Loads a model with onnxruntime and measures the inference time.

    :param model: model to run, a filename or a ModelProto
    :param warmup: number of iterations to run before measuring
    :param repeat: number of iterations to run to measure
    :param batch_size: batch size of the inputs
    :param processor: processor to run, ``"CPU"``, ``"CUDA"`` or
        ``"CUDA,CPU"``; only ``"CPU"`` is accepted for now
    :param verbose: verbosity
    :param validate: optional second model (filename or ModelProto), it is
        run once on the same random inputs and its outputs are compared to
        the outputs of *model*; ``None`` or an empty string disables it
    :return: metrics
    :raises AssertionError: if *processor* is unknown or is not ``"CPU"``

    The returned dictionary holds the parameters of the run, the timings
    of every step (``time_*``), the shape and type of every random input
    (``onnx_input_*``) and, when *validate* is set, the discrepancies
    (``discrepancies_*``). Latency statistics other than
    ``time_latency_total`` are only produced if *repeat* is positive.
    """
    if processor == "CPU":
        providers = ["CPUExecutionProvider"]
    elif processor == "CUDA":
        providers = ["CUDAExecutionProvider"]
    elif processor == "CUDA,CPU":
        # onnxruntime considers providers in order of priority:
        # CUDA must come first, CPU is the fallback.
        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    else:
        raise AssertionError(f"Unexpected value {processor!r} for processor.")
    assert (
        processor == "CPU"
    ), f"This function does not implement anything yet for CUDA, processor={processor!r}"

    do_validate = validate is not None and validate != ""

    if verbose:
        smodel = model if isinstance(model, str) else str(type(model))
        print(f"[model_run] loads {smodel!r}")
        print(f"[model_run] providers={providers}")

    stats = {
        "providers": providers,
        "model": model,
        "repeat": repeat,
        "warmup": warmup,
        "batch_size": batch_size,
    }

    # The import is timed, it is part of the reported metrics.
    begin = time.perf_counter()
    from onnxruntime import InferenceSession

    stats["time_import_ort"] = time.perf_counter() - begin
    if verbose:
        print(f"[model_run] import ort {stats['time_import_ort']!r}")

    feeds = None
    if do_validate:
        from .bench_run import measure_discrepancies

        if verbose:
            smodel = validate if isinstance(validate, str) else str(type(validate))
            print(f"[model_run] validate with {smodel!r}")
        begin = time.perf_counter()
        val = InferenceSession(
            validate.SerializeToString() if isinstance(validate, ModelProto) else validate,
            providers=providers,
        )
        stats["time_ort_load_validation"] = time.perf_counter() - begin

        # The inputs are built from the validation model and reused below
        # for the measured model (assumes both models expose the same
        # inputs - TODO confirm).
        begin = time.perf_counter()
        feeds = create_feeds(val, batch_size=batch_size)
        stats["time_ort_create_feeds"] = time.perf_counter() - begin

        if verbose:
            print("[model_run] compute expected output")
        begin = time.perf_counter()
        expected = val.run(None, feeds)
        stats["time_latency_validation_one"] = time.perf_counter() - begin

    begin = time.perf_counter()
    model_bytes = model.SerializeToString() if isinstance(model, ModelProto) else model
    stats["time_model_bytes"] = time.perf_counter() - begin
    if verbose:
        print(f"[model_run] time model bytes {stats['time_model_bytes']!r}")

    begin = time.perf_counter()
    sess = InferenceSession(model_bytes, providers=providers)
    stats["time_ort_load"] = time.perf_counter() - begin
    if verbose:
        print(f"[model_run] time ort load {stats['time_ort_load']!r}")
        print("[model_run] create inputs")

    if feeds is None:
        begin = time.perf_counter()
        feeds = create_feeds(sess, batch_size=batch_size)
        stats["time_ort_create_feeds"] = time.perf_counter() - begin

    for k, v in feeds.items():
        stats[f"onnx_input_{k}_shape"] = "x".join(map(str, v.shape))
        stats[f"onnx_input_{k}_dtype"] = str(v.dtype).split(".")[-1]

    if verbose:
        print(f"[model_run] create {len(feeds)} inputs")
        for k, v in feeds.items():
            print(f"[model_run] input {k!r}: {v.dtype} - {v.shape}")
        print(f"[model_run] warmup {warmup} times")

    last = None
    begin = time.perf_counter()
    for _ in range(warmup):
        last = sess.run(None, feeds)
    stats["time_warmup_total"] = time.perf_counter() - begin
    # No warmup iteration means no average to compute (avoids a division by zero).
    stats["time_warmup"] = (
        stats["time_warmup_total"] / float(warmup) if warmup > 0 else 0.0
    )

    if do_validate:
        if verbose:
            print("[model_run] compare outputs")
        if last is None:
            # Without warmup there is no output yet: run once, outside
            # of any measure, to get something to compare.
            last = sess.run(None, feeds)
        disc = measure_discrepancies(expected, last)
        for k, v in disc.items():
            stats[f"discrepancies_{k}"] = v
            if verbose:
                print(f"[model_run] discrepancies {k}: {v}")

    if verbose:
        print(f"[model_run] warmup took {stats['time_warmup_total']}")
        print(f"[model_run] repeat {repeat} times")

    times = []
    for _ in range(repeat):
        begin = time.perf_counter()
        sess.run(None, feeds)
        times.append(time.perf_counter() - begin)

    np_times = np.array(times)
    n_times = len(times)
    stats["time_latency_total"] = np_times.sum()
    if n_times > 0:
        # min, max, median and quantiles are undefined on an empty sample.
        stats["time_latency_t_std"] = np_times.std()
        stats["time_latency_t_min"] = np_times.min()
        stats["time_latency_t_max"] = np_times.max()
        stats["time_latency_t_med"] = np.median(np_times)
        # measure the correlation with the time,
        # it needs at least two measures, otherwise it is not defined
        stats["time_latency_t_corrt"] = (
            np.corrcoef(np_times, np.arange(n_times))[0, 1]
            if n_times >= 2
            else float("nan")
        )
        # measures the correlation with the previous value,
        # it needs at least two pairs (three measures)
        stats["time_latency_t_corrp"] = (
            np.corrcoef(np_times[1:], np_times[:-1])[0, 1]
            if n_times >= 3
            else float("nan")
        )
        stats["time_latency"] = stats["time_latency_total"] / n_times
        # deciles of the latency distribution joined with "/"
        stats["time_latency_t_qu"] = "/".join(
            map(str, np.quantile(np_times, np.arange(11) / 10.0))
        )

    if verbose:
        print(f"[model_run] inference took {stats['time_latency_total']}")
        print("[model_run] done")
    return stats