import os
from typing import Any, Dict, List, Optional, Set, Union
import numpy as np
from onnx import FunctionProto, ModelProto, NodeProto, TensorProto
from onnx.defs import get_schema
from onnx.helper import make_graph, make_model, make_node, make_tensor_value_info
from onnx.numpy_helper import from_array
from onnx.reference import ReferenceEvaluator
from onnx.reference.custom_element_types import (
bfloat16,
float8e4m3fn,
float8e4m3fnuz,
float8e5m2,
float8e5m2fnuz,
)
from onnx.reference.op_run import OpRun
def from_array_extended(tensor: np.ndarray, name: Optional[str] = None) -> TensorProto:
    """
    Converts an array into a TensorProto including float 8 types.

    An array whose dtype is one of the custom element types (the four
    float 8 variants or bfloat16) is first cast to ``uint8``
    (``uint16`` for bfloat16), converted with
    :func:`onnx.numpy_helper.from_array`, and the resulting proto is then
    tagged with the matching ONNX element type. Any other array is passed
    to :func:`onnx.numpy_helper.from_array` unchanged.

    :param tensor: numpy array
    :param name: name
    :return: TensorProto
    """
    # Each row: (custom dtype, name of the first field in ``dtype.descr``,
    # ONNX element type, integer dtype used to store the raw values).
    custom_types = (
        (float8e4m3fn, "e4m3fn", TensorProto.FLOAT8E4M3FN, np.uint8),
        (float8e4m3fnuz, "e4m3fnuz", TensorProto.FLOAT8E4M3FNUZ, np.uint8),
        (float8e5m2, "e5m2", TensorProto.FLOAT8E5M2, np.uint8),
        (float8e5m2fnuz, "e5m2fnuz", TensorProto.FLOAT8E5M2FNUZ, np.uint8),
        (bfloat16, "bfloat16", TensorProto.BFLOAT16, np.uint16),
    )

    dtype = tensor.dtype
    for custom_dtype, field_name, element_type, storage_dtype in custom_types:
        # Both the dtype equality and the field name must agree.
        if dtype == custom_dtype and dtype.descr[0][0] == field_name:
            proto = from_array(tensor.astype(storage_dtype), name)
            proto.data_type = element_type
            return proto

    # Regular numpy dtype: nothing special to do.
    return from_array(tensor, name)
class CReferenceEvaluator(ReferenceEvaluator):
    """
    This class replaces the python implementation by C implementation
    for a short list of operators quite slow in python (such as `Conv`).
    The class automatically replaces a python implementation
    by a C implementation if available. See example :ref:`l-example-conv`.

    ::

        from onnx.reference import ReferenceEvaluator
        from onnx_extended.reference.c_ops.c_op_conv import Conv
        ref = ReferenceEvaluator(..., new_ops=[Conv])

    See :class:`onnx.reference.ReferenceEvaluator` for a detailed documentation.

    **Additions**

    Parameter **save_intermediate** can be set to a folder to save intermediate
    results in this folder. It follows the same design as the backend test.
    Let's consider a model with the following nodes:

    ::

        <
            ir_version: 8,
            opset_import: [ "" : 18]
        >
        agraph (float[N, 128] X, float[128,10] W, float[10] B) => (float[N] C)
        {
            T = MatMul(X, W)
            S = Add(T, B)
            C = Softmax(S)
        }

    It will produce the following files after it is run with
    `CReferenceEvaluator(..., save_intermediate="modelrun")`.

    ::

        modelrun
          +-- test_node_0_MatMul
          |     +-- model.onnx
          |     +-- test_data_set_0
          |           + input_0.pb
          |           + input_1.pb
          |           + output_0.pb
          +-- test_node_1_Add
          |     +-- model.onnx
          |     +-- test_data_set_0
          |           + input_0.pb
          |           + input_1.pb
          |           + output_0.pb
          +-- test_node_2_Softmax
                +-- model.onnx
                +-- test_data_set_0
                      + input_0.pb
                      + output_0.pb

    These files can then be run with a different runtime to look for discrepancies.
    Following example executes node by node with onnxruntime.

    ::

        from onnx.backend.test.loader import load_model_tests
        from onnx_extended.reference.c_reference_backend import (
            ReferenceEvaluatorBackend,
            create_reference_backend,
        )
        from onnxruntime import InferenceSession

        root = "folder which contains folder modelrun"
        examples = load_model_tests(root, "modelrun")

        class Wrapper(InferenceSession):

            def __init__(self, model, *args, providers=None, **kwargs):
                super().__init__(
                    model.SerializeToString(),
                    *args,
                    providers=providers or ["CPUExecutionProvider"],
                    **kwargs,
                )

            def run(self, *args, **kwargs):
                return InferenceSession.run(self, *args, **kwargs)

            @property
            def input_names(self):
                return [i.name for i in self.get_inputs()]

            @property
            def output_names(self):
                return [o.name for o in self.get_outputs()]

        new_cls = ReferenceEvaluatorBackend[Wrapper]
        backend = create_reference_backend(new_cls, path_to_test=root)
        backend.run()

    .. versionadded:: 0.2.0
    """

    @staticmethod
    def default_ops():
        """
        Returns the list of operator implementations this class always
        adds to the ones given by the user.

        The imports are done inside the function so that the operator
        modules are only loaded when this list is requested.

        :return: list of operator classes
        """
        from onnx_extended.reference.c_ops.c_op_conv import Conv
        from onnx_extended.reference.c_ops.c_op_tree_ensemble_regressor import (
            TreeEnsembleRegressor_1,
            TreeEnsembleRegressor_3,
        )
        from onnx_extended.reference.c_ops.c_op_tree_ensemble_classifier import (
            TreeEnsembleClassifier_1,
            TreeEnsembleClassifier_3,
        )
        from onnx_extended.reference.c_custom_ops.custom_op_tree_ensemble_regressor import (  # noqa: E501
            TreeEnsembleRegressor_1 as TreeEnsembleRegressor_1_Float,
        )

        return [
            Conv,
            TreeEnsembleClassifier_1,
            TreeEnsembleClassifier_3,
            TreeEnsembleRegressor_1,
            TreeEnsembleRegressor_3,
            TreeEnsembleRegressor_1_Float,
        ]

    @staticmethod
    def filter_ops(
        proto: Union[FunctionProto, ModelProto],
        new_ops: List[OpRun],
        opsets: Optional[Dict[str, int]],
    ) -> List[OpRun]:
        """
        Selects, among versioned operator classes, the one to use for
        every (domain, operator name).

        A class is considered versioned when its name ends with
        ``_<integer>``. For every pair (domain, name without the suffix),
        the class with the highest version not greater than the opset of
        its domain is kept and exposed through a new class named after
        the operator (without the version suffix). Classes without a
        version suffix, and versioned classes newer than the opset,
        are returned unchanged.

        :param proto: model or function, only used to retrieve the opsets
            when *opsets* is None
        :param new_ops: operator classes to filter
        :param opsets: opset per domain, if None and *proto* is a
            ModelProto or a FunctionProto, they are taken from
            ``proto.opset_import``; if still None, no class is discarded
            because of its version
        :return: the filtered list of operator classes
        """
        if opsets is None and isinstance(proto, (ModelProto, FunctionProto)):
            opsets = {d.domain: d.version for d in proto.opset_import}

        best = {}  # (domain, operator name) -> (version, class)
        renamed = {}  # versioned classes replaced by an unversioned one
        for cl in new_ops:
            if "_" not in cl.__name__:
                continue
            vers = cl.__name__.split("_")
            try:
                v = int(vers[-1])
            except ValueError:
                # not a version
                continue
            # A domain missing from the opsets is assumed to be at version 1.
            if opsets is not None and v > opsets.get(cl.op_domain, 1):
                continue
            renamed[cl.__name__] = cl
            key = cl.op_domain, "_".join(vers[:-1])
            if key not in best or best[key][0] < v:
                best[key] = (v, cl)

        # Classes which were not selected as candidates are kept as they are.
        modified = [cl for cl in new_ops if cl.__name__ not in renamed]
        for (domain, op_name), (version, cl) in best.items():
            atts = {"domain": domain}
            if not hasattr(cl, "op_schema"):
                atts["op_schema"] = get_schema(op_name, version, domain=cl.op_domain)
            # New class named after the operator, inheriting from the
            # selected version.
            modified.append(type(op_name, (cl,), atts))
        return modified

    def __init__(
        self,
        proto: Any,
        opsets: Optional[Dict[str, int]] = None,
        functions: Optional[List[Union[ReferenceEvaluator, FunctionProto]]] = None,
        verbose: int = 0,
        new_ops: Optional[List[OpRun]] = None,
        save_intermediate: Optional[str] = None,
        **kwargs,
    ):
        """
        Builds the evaluator.

        The operators returned by :meth:`default_ops` are always appended
        to *new_ops* (a copy is made, the list given by the user is not
        modified), the result goes through :meth:`filter_ops` and
        is given to :class:`onnx.reference.ReferenceEvaluator`.

        :param proto: forwarded to ReferenceEvaluator
        :param opsets: forwarded to ReferenceEvaluator, also used
            to filter the operators
        :param functions: forwarded to ReferenceEvaluator
        :param verbose: forwarded to ReferenceEvaluator
        :param new_ops: additional operator implementations
        :param save_intermediate: folder where the intermediate results
            are saved, None to save nothing
        :param kwargs: forwarded to ReferenceEvaluator
        """
        if new_ops is None:
            new_ops = CReferenceEvaluator.default_ops()
        else:
            new_ops = new_ops.copy()
            new_ops.extend(CReferenceEvaluator.default_ops())
        new_ops = CReferenceEvaluator.filter_ops(proto, new_ops, opsets)

        ReferenceEvaluator.__init__(
            self,
            proto,
            opsets=opsets,
            functions=functions,
            verbose=verbose,
            new_ops=new_ops,
            **kwargs,
        )

        self.save_intermediate = save_intermediate
        # serialized model -> [folder, index of the last saved data set].
        # Always created so that the attribute exists even if
        # save_intermediate is changed after the construction.
        self._cached_saved_results = {}

    def run(
        self,
        output_names,
        feed_inputs: Dict[str, Any],
        attributes: Optional[Dict[str, Any]] = None,
    ):
        """
        Executes the onnx model.

        If *save_intermediate* was specified, the inputs and outputs of
        every node are saved on disk after the node is executed.

        :param output_names: requested outputs by names,
            None for all
        :param feed_inputs: dictionary `{ input name: input value }`
        :param attributes: attributes value if the instance runs a FunctionProto
        :return: list of requested outputs
        :raises TypeError: if the instance runs a FunctionProto and
            *attributes* is None, or if a node returns a tuple as one
            of its outputs
        :raises RuntimeError: if a requested output was not computed
        """
        if output_names is None:
            output_names = self.output_names
        if isinstance(self.proto_, FunctionProto) and attributes is None:
            raise TypeError(f"Unexpected proto type {type(self.proto_)}.")

        # step 1: inputs and initializers
        # The empty name stands for an omitted optional input.
        results = {"": None}
        results.update(self.rt_inits_)
        results.update(feed_inputs)
        for k, v in self.rt_inits_.items():
            self._log(2, " +C %s: %s", k, v)
        for k, v in feed_inputs.items():
            self._log(2, " +I %s: %s", k, v)

        # step 2: execute nodes
        for index, node in enumerate(self.rt_nodes_):
            self._log(1, "%s(%s) -> %s", node.op_type, node.input, node.output)
            inputs = [results[i] for i in node.input]
            linked_attributes = {}
            if node.has_linked_attribute and attributes:
                linked_attributes["linked_attributes"] = attributes
            if node.need_context():
                outputs = node.run(*inputs, context=results, **linked_attributes)
            else:
                outputs = node.run(*inputs, **linked_attributes)
            for name, value in zip(node.output, outputs):
                if isinstance(value, tuple):
                    raise TypeError(
                        f"Unexected type {type(value)} for output {name!r}."
                    )
                if name == "":
                    # Omitted optional output: storing it would overwrite
                    # results[""], which must remain None for the nodes
                    # having an omitted optional input.
                    continue
                self._log(2, " + %s: %s", name, value)
                results[name] = value

            if self.save_intermediate is not None:
                # Results used by a subgraph of the node without being
                # one of its explicit inputs.
                hidden_names = self._retrieve_hidden_inputs(node.onnx_node)
                set_inputs = set(node.input)
                hidden = {
                    k: v
                    for k, v in results.items()
                    if k in hidden_names and k not in set_inputs
                }
                self._save_intermerdiate_results(index, node, inputs, outputs, hidden)

        # return the results
        list_results: List[Any] = []
        for name in output_names:
            if name not in results:
                raise RuntimeError(
                    f"Unable to find output name {name!r} "
                    f"in {sorted(results)}, proto is\n{self.proto_}"
                )
            list_results.append(results[name])
        return list_results

    @staticmethod
    def _retrieve_input_names(nodes: List[NodeProto]) -> Set[str]:
        """
        Collects the input names of a list of nodes, including the inputs
        of the nodes in their subgraphs (attribute ``g``), recursively.

        :param nodes: list of nodes
        :return: set of names
        """
        inputs = set()
        for node in nodes:
            inputs |= set(node.input)
            for att in node.attribute:
                # Only look into attributes which really hold a subgraph.
                if att.HasField("g"):
                    inputs |= CReferenceEvaluator._retrieve_input_names(att.g.node)
        return inputs

    def _retrieve_hidden_inputs(self, node: NodeProto) -> Set[str]:
        """
        Collects the names used as inputs by the nodes of the subgraphs
        (attribute ``g``) of a node. The explicit inputs of the node
        itself are not included.

        :param node: node proto
        :return: set of names
        """
        names = set()
        for att in node.attribute:
            if att.HasField("g"):
                names |= CReferenceEvaluator._retrieve_input_names(att.g.node)
        return names

    def _save_intermerdiate_results(
        self,
        index: int,
        node: OpRun,
        inputs: List[np.ndarray],
        outputs: List[np.ndarray],
        hidden: Dict[str, np.ndarray],
    ) -> str:
        """
        Saves intermediate results into a folder with
        the same organization as the backend tests.

        A model with the single node (preceded by one ``Constant`` node
        per hidden variable) is written in ``model.onnx`` the first time
        this exact model is seen. Every call then adds a new folder
        ``test_data_set_<i>`` with the inputs and the outputs.
        Omitted optional inputs and outputs (empty name) are not saved
        and are not declared as inputs or outputs of the saved model.

        :param index: index of the node in the graph
        :param node: runtime node, it must expose `onnx_node`
        :param inputs: inputs of the node
        :param outputs: outputs of the node, supposedly the expected outputs
        :param hidden: hidden variables if the node has a subgraph
        :return: test folder
        """

        def get_shape(t):
            return list(t.dims)

        # An empty name means the optional input or output is omitted:
        # there is no value to convert (the input is None) and it cannot
        # be declared in the graph.
        input_protos = [
            from_array_extended(inp, name)
            for name, inp in zip(node.input, inputs)
            if name
        ]
        output_protos = [
            from_array_extended(out, name)
            for name, out in zip(node.output, outputs)
            if name
        ]
        # Hidden variables are stored as constants so that the saved model
        # only depends on the explicit inputs of the node.
        constants = [
            make_node("Constant", [], [name], value=from_array_extended(value))
            for name, value in hidden.items()
            if name
        ]

        model = make_model(
            make_graph(
                [*constants, node.onnx_node],
                f"test_{node.op_type}",
                [
                    make_tensor_value_info(i.name, i.data_type, get_shape(i))
                    for i in input_protos
                ],
                [
                    make_tensor_value_info(o.name, o.data_type, get_shape(o))
                    for o in output_protos
                ],
            ),
            opset_imports=self.proto_.opset_import,
        )
        model_bytes = model.SerializeToString()

        if model_bytes not in self._cached_saved_results:
            # First time this model is seen: creates the folder, the model.
            sub = f"test_node_{index}_{node.op_type}"
            path = os.path.join(self.save_intermediate, sub)
            os.makedirs(path, exist_ok=True)
            # -1 because the counter is incremented before being used.
            self._cached_saved_results[model_bytes] = [path, -1]
            model_file = os.path.join(path, "model.onnx")
            with open(model_file, "wb") as f:
                f.write(model_bytes)

        cached = self._cached_saved_results[model_bytes]
        cached[1] += 1
        path, n_example = cached

        sub_path = os.path.join(path, f"test_data_set_{n_example}")
        os.makedirs(sub_path, exist_ok=True)
        for i, t in enumerate(input_protos):
            with open(os.path.join(sub_path, f"input_{i}.pb"), "wb") as f:
                f.write(t.SerializeToString())
        for i, t in enumerate(output_protos):
            with open(os.path.join(sub_path, f"output_{i}.pb"), "wb") as f:
                f.write(t.SerializeToString())
        return path