import contextlib
import io
import itertools
import re
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import onnx
[docs]
def discover():
    """
    Collects every model case available to evaluate an exporter.

    A case is any object found in module ``model_cases`` which exposes
    an attribute ``forward``. Every case must also define the attributes
    ``_inputs`` and ``_dynamic``, and two cases cannot share the same name.

    .. runpython::
        :showcode:

        import pprint
        from experimental_experiment.torch_interpreter.eval import discover

        pprint.pprint(discover())

    :return: dictionary mapping the name of a case to the case itself
    """
    from . import model_cases

    found = {}
    for candidate in vars(model_cases).values():
        # Skips module metadata (strings, None) and anything which is not a model.
        if (
            candidate is None
            or isinstance(candidate, str)
            or not hasattr(candidate, "forward")
        ):
            continue
        case_name = candidate.__name__
        assert case_name not in found, f"Case {candidate.__name__!r} is duplicated."
        assert hasattr(
            candidate, "_inputs"
        ), f"Attribute '_inputs' is missing from class {candidate}"
        assert hasattr(
            candidate, "_dynamic"
        ), f"Attribute '_dynamic' is missing from class {candidate}"
        found[case_name] = candidate
    return found
[docs]
def evaluation(
    exporters: Tuple[str] = (
        "export-strict",
        "export-strict-decall",
        "export-nostrict",
        "export-nostrict-decall",
        "export-tracing",
        "custom-strict",
        "custom-strict-decall",
        "custom-nostrict",
        "custom-nostrict-decall",
        "custom-tracing",
    ),
    dynamic: Tuple[bool] = (False, True),
    cases: Optional[Union[str, Dict[str, type]]] = None,
    verbose: int = 0,
    quiet: bool = True,
) -> List[Dict[str, Any]]:
    """
    Evaluates exporter for a list of cases.

    :param exporters: exporters to evaluate, a single name is accepted as well
    :param dynamic: evaluate static shape and dynamic shapes,
        a single boolean is accepted as well
    :param cases: model cases to evaluate, it can be:

        * ``None``: every case returned by :func:`discover`,
        * ``"three"``: the first three cases returned by :func:`discover`,
        * a name or a list of names, a name containing ``*`` or ``?``
          is compiled as a regular expression and matched against
          the beginning of every discovered case name,
        * a dictionary ``{name: model class}`` used as is
    :param verbose: verbosity, a progress bar is displayed with :epkg:`tqdm`
        if available, otherwise every configuration is printed before it runs
    :param quiet: catch exception
    :return: results, list of dictionaries, one per combination
        (case, dynamic, exporter) with the keys ``name``, ``dynamic``,
        ``exporter`` added to the output of ``run_exporter``
    """
    if isinstance(exporters, str):
        exporters = (exporters,)
    if isinstance(dynamic, (bool, int)):
        dynamic = (dynamic,)

    if cases is None:
        cases = discover()
    elif cases in ("three", ["three"]):
        all_cases = discover()
        cases = dict(list(all_cases.items())[:3])
    elif isinstance(cases, str):
        cases = (cases,)

    if isinstance(cases, (list, tuple)):
        all_cases = discover()
        # The set is built once instead of once per discovered case.
        selected = set()
        for c in cases:
            if "*" in c or "?" in c:
                # regex
                reg = re.compile(c)
                selected.update(k for k in all_cases if reg.match(k))
            else:
                selected.add(c)
        cases = {k: v for k, v in all_cases.items() if k in selected}

    sorted_cases = sorted(cases.items())
    loop = list(itertools.product(sorted_cases, dynamic, exporters))
    # Checked while ``loop`` is still a list: the fallback below is a generator
    # and does not support ``len``.
    assert len(loop) > 0, f"No case to test for cases={cases!r}."

    if verbose:
        try:
            import tqdm

            loop = tqdm.tqdm(loop)
        except ImportError:

            def _echo(items):
                for item in items:
                    print(f"[evaluation] {item}")
                    yield item

            # Without tqdm, every configuration is printed before it runs.
            loop = _echo(loop)

    obs = []
    for case, dyn, exporter in loop:
        name, cls_model = case
        res = run_exporter(exporter, cls_model, dyn, quiet=quiet, verbose=max(0, verbose - 1))
        res.update(dict(name=name, dynamic=int(dyn), exporter=exporter))
        obs.append(res)
    return obs
def _flatten_inputs(x: Any) -> List["torch.Tensor"]:  # noqa: F821
    """
    Flattens nested lists and tuples of inputs into a single sequence.

    ``None``, tensors, symbolic integers or floats, integers and floats
    are kept as they are, nested lists or tuples are expanded in place.
    The result is a tuple if *x* is a tuple, a list otherwise.
    ``None`` is returned if *x* is ``None``, any other type raises
    an ``AssertionError``.
    """
    if x is None:
        return x
    import torch

    if not isinstance(x, (list, tuple)):
        raise AssertionError(f"Unexpected type {type(x)} for x")

    leaf_types = (torch.Tensor, torch.SymInt, torch.SymFloat, int, float)
    flat = []
    for item in x:
        if item is None or isinstance(item, leaf_types):
            flat.append(item)
            continue
        # Anything else must be a nested container.
        flat.extend(_flatten_inputs(item))
    if isinstance(x, tuple):
        return tuple(flat)
    return flat
def _to_numpy(x):
    """
    Converts *x* into numpy arrays.

    Any object with a method ``numpy`` is converted by calling it,
    an integer becomes an ``int64`` array of one element, a float becomes
    a ``float32`` array of one element, lists and tuples are converted
    element by element. Any other type raises a ``TypeError``.
    """
    if hasattr(x, "numpy"):
        return x.numpy()
    # onnxruntime does not like scalar
    for scalar_type, array_type in ((int, np.int64), (float, np.float32)):
        if isinstance(x, scalar_type):
            return np.array([x], dtype=array_type)
    if isinstance(x, (list, tuple)):
        converted = [_to_numpy(item) for item in x]
        return converted if isinstance(x, list) else tuple(converted)
    raise TypeError(f"Unable to convert type {type(x)}, x={x} into numpy")
def _make_feeds(names, args):
    """
    Builds the dictionary ``{input name: numpy value}`` given to onnxruntime.

    If there are as many names as arguments, they are paired directly.
    If there are more names than arguments, the arguments are flattened
    first. Fewer names than arguments raises a ``RuntimeError``.
    """
    n_names, n_args = len(names), len(args)
    if n_names < n_args:
        from ...helpers import string_type

        raise RuntimeError(f"Unable to handle names={names!r} and args={string_type(args)}")
    values = args if n_names == n_args else _flatten_inputs(args)
    return {name: _to_numpy(value) for name, value in zip(names, values)}
def _clone(x):
    """
    Returns a copy of *x* a model can modify without altering the original.

    Any object with a method ``clone`` is cloned by calling it,
    ``None``, integers and floats are immutable and returned as they are,
    lists and tuples are cloned element by element.

    :param x: value to clone
    :return: the clone
    :raises TypeError: if *x* or one of its elements has an unsupported type
    """
    if x is None:
        # ``_flatten_inputs`` accepts None entries, so inputs may contain some.
        return None
    if hasattr(x, "clone"):
        return x.clone()
    if isinstance(x, (int, float)):
        return x
    if isinstance(x, list):
        return [_clone(item) for item in x]
    if isinstance(x, tuple):
        return tuple(_clone(item) for item in x)
    raise TypeError(f"Unable to clone type {type(x)}, x={x}")
def _make_exporter_export(
    exporter: str,
    model: "torch.nn.Module",  # noqa: F821
    inputs: Tuple[Any, ...],
    dynamic_shapes: Optional[Any] = None,
    verbose: int = 0,
    quiet: bool = True,
) -> Union[Dict, Callable]:
    """
    Exports a model with one of the ``export-*`` exporters.

    :param exporter: exporter name, one of ``export-strict``,
        ``export-nostrict``, the same names followed by ``-dec`` or ``-decall``
        to run the decompositions, ``export-tracing``, ``export-jit``,
        ``export-jit-dec``, ``export-jit-decall``
    :param model: model to export
    :param inputs: positional arguments of the model
    :param dynamic_shapes: dynamic shapes given to :func:`torch.export.export`
    :param verbose: graphs are printed if ``verbose >= 9``
    :param quiet: if True, a failure is returned as a dictionary
        ``dict(error=..., success=0, error_step="export")``
        instead of raising the exception
    :return: the exported module (a callable) or a dictionary describing the failure
    :raises AssertionError: if the exporter name is unknown
    """
    import torch

    def _failure(exc):
        return dict(error=str(exc), success=0, error_step="export")

    all_decompositions = "decall" in exporter

    plain_exports = ("export-strict", "export-nostrict")
    decomposed_exports = (
        "export-strict-dec",
        "export-strict-decall",
        "export-nostrict-dec",
        "export-nostrict-decall",
    )
    if exporter in plain_exports or exporter in decomposed_exports:
        strict = exporter.startswith("export-strict")
        decompose = exporter in decomposed_exports
        try:
            program = torch.export.export(
                model, inputs, dynamic_shapes=dynamic_shapes, strict=strict
            )
            if decompose:
                if verbose >= 9:
                    print("-- graph before decomposition")
                    print(program.graph)
                if all_decompositions:
                    program = program.run_decompositions()
                else:
                    program = program.run_decompositions({})
        except Exception as exc:
            if not quiet:
                raise
            return _failure(exc)
        if verbose >= 9:
            print("-- graph after decomposition" if decompose else "-- graph")
            print(program.graph)
        return program.module()

    if exporter == "export-tracing":
        from ..tracing import CustomTracer

        try:
            graph = CustomTracer().trace(model)
            traced = torch.fx.GraphModule(model, graph)
        except Exception as exc:
            if not quiet:
                raise
            return _failure(exc)
        if verbose >= 9:
            print("-- graph")
            print(graph)
        return traced

    if exporter in ("export-jit", "export-jit-dec", "export-jit-decall"):
        from torch._export.converter import TS2EPConverter

        decompose = exporter != "export-jit"
        try:
            jit_model = torch.jit.trace(
                model, example_inputs=inputs, check_trace=False, strict=False
            )
            program = TS2EPConverter(jit_model, inputs, {}).convert()
            if verbose >= 9:
                print("-- graph before decomposition" if decompose else "-- graph")
                print(program.graph)
            if decompose:
                if all_decompositions:
                    program = program.run_decompositions()
                else:
                    program = program.run_decompositions({})
                if verbose >= 9:
                    print("-- graph after decomposition")
                    print(program.graph)
        except Exception as exc:
            if not quiet:
                raise
            return _failure(exc)
        return program.module()

    raise AssertionError(f"Unexpected exporter={exporter!r}")
def _make_exporter_onnx(
    exporter: str,
    model: "torch.nn.Module",  # noqa: F821
    inputs: Tuple[Any, ...],
    dynamic_shapes: Optional[Any] = None,
    verbose: int = 0,
    quiet: bool = True,
) -> Union[Dict, Tuple[onnx.ModelProto, Any]]:
    """
    Converts a model into ONNX with one of the supported exporters.

    :param exporter: exporter name, any name starting with ``custom``
        (options ``-nostrict``, ``-fallback``, ``-tracing``, ``-jit``,
        ``-dec``, ``-decall`` are read from the name), ``script``,
        ``dynamo`` or ``dynamo-ir``
    :param model: model to convert
    :param inputs: positional arguments of the model
    :param dynamic_shapes: dynamic shapes, exporter ``script`` only supports
        a dictionary of dictionaries, converted into ``dynamic_axes``
    :param verbose: the output of the torch exporters is hidden
        unless ``verbose >= 2``
    :param quiet: if True, a failure is returned as a dictionary
        ``dict(error=..., success=0, error_step=...)``
        instead of raising a ``RuntimeError``
    :return: a tuple ``(onnx model, builder)``, the builder is None
        unless the exporter is a custom one,
        or a dictionary describing the failure
    :raises AssertionError: if the exporter name is unknown
    """
    from ...torch_interpreter import to_onnx, ExportOptions
    from ...helpers import string_type

    def _conversion_error():
        # Exception raised (chained to the original one) when quiet is False.
        return RuntimeError(
            f"Unable to convert model={model.__class__.__name__}, "
            f"input={string_type(inputs[0], with_shape=True)}, "
            f"dynamic_shapes={dynamic_shapes}, "
            f"exporter={exporter!r}"
        )

    @contextlib.contextmanager
    def _maybe_silenced():
        # The torch exporters are verbose: their output is only kept
        # when the user asked for it.
        if verbose >= 2:
            yield
        else:
            with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(
                io.StringIO()
            ):
                yield

    if exporter.startswith("custom"):
        opts = {}
        opts["strict"] = "-nostrict" not in exporter
        opts["fallback"] = "-fallback" in exporter
        opts["tracing"] = "-tracing" in exporter
        opts["jit"] = "-jit" in exporter
        if "-dec" in exporter:
            opts["decomposition_table"] = "all" if "-decall" in exporter else "default"
        try:
            onx, builder = to_onnx(
                model,
                inputs,
                dynamic_shapes=dynamic_shapes,
                export_options=ExportOptions(**opts),
                return_builder=True,
            )
        except Exception as e:
            if not quiet:
                raise _conversion_error() from e
            return dict(error=str(e), success=0, error_step="export")
        return onx, builder

    if exporter == "script":
        import torch

        f = (
            f"evaluation-{model.__class__.__name__}-"
            f"{int(bool(dynamic_shapes))}-{exporter}.onnx"
        )
        dynamic_axes = {}
        input_names = []
        if dynamic_shapes:
            if not isinstance(dynamic_shapes, dict):
                return dict(
                    error=f"unable to convert dynamic shapes {dynamic_shapes}",
                    success=0,
                    error_step="input",
                )
            for k, v in dynamic_shapes.items():
                if not isinstance(v, dict):
                    return dict(
                        error=f"unable to convert dynamic shapes {dynamic_shapes}",
                        success=0,
                        error_step="input",
                    )
                dynamic_axes[k] = {_k: _v.__name__ for _k, _v in v.items()}
                input_names.append(k)

        # One name per positional argument: ``inputs`` is the tuple of
        # arguments, ``inputs[0]`` is only the first of them.
        while len(input_names) < len(inputs):
            input_names.append(f"args_{len(input_names)}")
        if verbose >= 5:
            print(
                f"[run_exporter] dynamic_axes={dynamic_axes}, "
                f"dynamic_shapes={dynamic_shapes}, input_names={input_names}"
            )
        try:
            with _maybe_silenced():
                torch.onnx.export(
                    model,
                    inputs,
                    f,
                    dynamic_axes=dynamic_axes,
                    input_names=input_names if dynamic_axes else None,
                    dynamo=False,
                )
            onx = onnx.load(f)
        except Exception as e:
            if not quiet:
                raise _conversion_error() from e
            return dict(error=str(e), success=0, error_step="export")
        return onx, None

    if exporter in ("dynamo", "dynamo-ir"):
        import torch

        # The export report is only requested in verbose mode.
        report = dict(report=True) if verbose >= 2 else {}
        try:
            with _maybe_silenced():
                ep = torch.onnx.export(
                    model,
                    inputs,
                    dynamic_shapes=dynamic_shapes,
                    dynamo=True,
                    **report,
                )
                if exporter == "dynamo-ir":
                    ep.optimize()
                onx = ep.model_proto
        except Exception as e:
            if not quiet:
                raise _conversion_error() from e
            return dict(error=str(e), success=0, error_step="export")
        return onx, None

    raise AssertionError(f"Unexpected exporter={exporter!r}")
[docs]
def run_exporter(
    exporter: str,
    cls_model: type,
    dynamic: bool = False,
    quiet: bool = False,
    verbose: int = 0,
) -> Dict[str, Any]:
    """
    Runs an exporter and returns whether it fails or not.

    The model is instantiated from *cls_model*, exported with the first set
    of inputs found in ``cls_model._inputs``, then the exported model is run
    and compared to the eager execution. With *dynamic*, every set of inputs
    is checked if the class defines more than one.

    :param exporter: exporter, a name starting with ``export-`` exports
        the model into a torch module, any other name converts it into ONNX
        and runs it with :epkg:`onnxruntime`
    :param cls_model: model class to create, it must define ``_inputs``
        (one tuple of arguments or a list of tuples) and,
        if *dynamic* is True, ``_dynamic``
    :param dynamic: use dynamic shape or not
    :param quiet: raise exception or not, if True, a failure is returned
        as a dictionary ``dict(error=..., success=0, error_step=...)``
    :param verbose: verbosity
    :return: results, the discrepancies returned by ``max_diff``
        (without ``n`` and ``sum``) completed with ``success``, ``model_cls``,
        ``exported``, ``onnx`` and, for a failure, ``error``, ``error_step``
    """
    from ...bench_run import max_diff
    from ...helpers import string_type, pretty_onnx

    assert hasattr(
        cls_model, "_inputs"
    ), f"Attribute '_inputs' is missing from class {cls_model}"

    model = cls_model()
    inputs = cls_model._inputs
    if isinstance(inputs, tuple):
        # A single set of inputs.
        inputs = [inputs]
    if dynamic:
        assert hasattr(
            cls_model, "_dynamic"
        ), f"Attribute '_dynamic' is missing from class {cls_model}"
        dynamic_shapes = cls_model._dynamic
    else:
        dynamic_shapes = None

    if verbose > 0:
        print(
            f"[run_exporter] exporter={exporter}, model={cls_model.__name__}, "
            f"dynamic={dynamic}, inputs={string_type(inputs,with_shape=True)}"
        )

    builder = None
    onx = None

    if exporter.startswith("export-"):
        mod = _make_exporter_export(
            exporter,
            model,
            inputs[0],
            dynamic_shapes=dynamic_shapes,
            verbose=verbose,
            quiet=quiet,
        )
        if isinstance(mod, dict):
            # something went wrong
            return mod
    else:
        res = _make_exporter_onnx(
            exporter,
            model,
            inputs[0],
            dynamic_shapes=dynamic_shapes,
            verbose=verbose,
            quiet=quiet,
        )
        if isinstance(res, dict):
            # something went wrong
            return res

        onx, builder = res
        if verbose >= 9:
            print("[run_exporter] onnx model")
            print(
                builder.pretty_text(add_fx_graph=True)
                if builder is not None
                else pretty_onnx(onx)
            )
        if verbose >= 2:
            onnx.save(onx, f"evaluation-{model.__class__.__name__}-{dynamic}-{exporter}.onnx")

        names = [i.name for i in onx.graph.input]
        flats = _flatten_inputs(inputs[0]) if len(names) > len(inputs[0]) else inputs[0]
        assert quiet or len(names) == len(flats), (
            f"Input mismatch, inputs[0]={string_type(inputs[0])} "
            f"inputs but names={names!r}, "
            f"model={cls_model.__name__}, export={exporter!r}"
        )
        if len(names) != len(flats):
            return dict(
                error=f"Input mismatch, inputs[0]={string_type(inputs[0])} "
                f"but names={names!r}, model={cls_model.__name__}, export={exporter!r}",
                success=0,
                error_step="inputs",
            )

        import onnxruntime

        try:
            sess = onnxruntime.InferenceSession(
                onx.SerializeToString(), providers=["CPUExecutionProvider"]
            )
        except Exception as e:
            if not quiet:
                raise
            return dict(error=str(e), success=0, error_step="ort-init")

        mod = lambda *args, names=names: sess.run(None, _make_feeds(names, args))  # noqa: E731

    # we need to clone for models modifying the inputs
    try:
        expected = model(*_clone(inputs[0]))
    except Exception as e:
        if not quiet:
            raise RuntimeError(
                f"eager mode failed=\n{string_type(inputs[0], with_shape=True)} "
                f"\nmodel=\n{type(model)}"
            ) from e
        return dict(error=str(e), success=0, error_step="eager")
    try:
        # The exported model may modify its inputs as well and the inputs
        # are shared by every run: it receives a copy too.
        got = mod(*_clone(inputs[0]))
    except Exception as e:
        if not quiet:
            raise RuntimeError(
                f"onnxruntime failed, feeds=\n{string_type(inputs[0], with_shape=True)} "
                f"\nmodel=\n{pretty_onnx(onx)}"
            ) from e
        return dict(error=str(e), success=0, error_step="run.0")

    try:
        disc = max_diff(expected, got)
    except Exception as e:
        if not quiet:
            raise
        return dict(error=str(e), success=0, error_step="discrepancy")

    if verbose >= 5 and np.isinf(disc["abs"]):
        print("[run_exporter] comparison issues with")
        print(f"-- inputs={string_type(inputs[0], with_shape=True)}")
        print(f"-- expected={string_type(expected, with_shape=True)}")
        print(f"-- got={string_type(got, with_shape=True)}")
    elif verbose >= 9:
        print("[run_exporter] inputs and outputs")
        print(f"-- inputs={string_type(inputs[0], with_shape=True, with_min_max=True)}")
        print(f"-- expected={string_type(expected, with_shape=True, with_min_max=True)}")
        print(f"-- got={string_type(got, with_shape=True, with_min_max=True)}")
    del disc["n"]
    del disc["sum"]
    disc.update(
        dict(
            success=1 if disc["abs"] < 0.1 else 0,
            model_cls=model.__class__,
            exported=mod,
            onnx=onx,
        )
    )
    if disc["abs"] >= 0.1:
        disc["error"] = "diff.0"
        disc["error_step"] = "diff.0"
        if verbose >= 9:
            max_diff(expected, got, verbose=verbose)
    else:
        disc["success"] = 1

    if dynamic and onx is not None:
        # An ONNX model exported with dynamic shapes must expose
        # at least one named (dynamic) dimension among its inputs.
        ds = []
        for i in onx.graph.input:
            if i.type.tensor_type:
                for di, dim in enumerate(i.type.tensor_type.shape.dim):
                    if dim.dim_param:
                        ds.append((i.name, di, dim.dim_param))
        if verbose >= 2:
            print(f"[run_exporter] dynamic dimension={ds}")
        if not ds:
            return dict(error="no dynamic shape", success=0, error_step="dynamic")

    if dynamic and len(inputs) > 1:
        for index, i in enumerate(inputs):
            try:
                expected = model(*_clone(i))
            except Exception as e:
                # Same contract as the first eager run: no exception if quiet.
                if not quiet:
                    raise
                return dict(error=str(e), success=0, error_step=f"eager.{index}")
            try:
                got = mod(*_clone(i))
            except Exception as e:
                if not quiet:
                    raise RuntimeError(
                        f"onnxruntime failed,\n-- feeds=\n{string_type(i, with_shape=True)} "
                        f"exporter={exporter!r}, dynamic_shapes={dynamic_shapes}"
                        f"\n-- model=\n{pretty_onnx(onx) if onx is not None else type(model)}"
                    ) from e
                return dict(error=str(e), success=0, error_step=f"run.{index}")

            try:
                d = max_diff(expected, got)
            except Exception as e:
                if not quiet:
                    raise
                return dict(error=str(e), success=0, error_step=f"discrepancy.{index}")

            if verbose >= 5 and np.isinf(d["abs"]):
                print(f"[run_exporter] comparison issues iteration {index}")
                print(f"-- inputs={string_type(i, with_shape=True)}")
                print(f"-- expected={string_type(expected, with_shape=True)}")
                print(f"-- got={string_type(got, with_shape=True)}")
            elif verbose >= 9:
                print(f"[run_exporter] inputs and outputs iteration {index}")
                print(f"-- inputs={string_type(i, with_shape=True, with_min_max=True)}")
                print(
                    f"-- expected={string_type(expected, with_shape=True, with_min_max=True)}"
                )
                print(f"-- got={string_type(got, with_shape=True, with_min_max=True)}")
            del d["n"]
            del d["sum"]
            if d["abs"] >= 0.1:
                d["error"] = f"diff.{index}"
                d["error_step"] = f"diff.{index}"
                d["success"] = 0
            # A successful iteration does not define ``success``:
            # a failure from a previous iteration is never erased.
            disc.update(d)

    return disc