Source code for onnx_diagnostic.reference.torch_evaluator

import functools
from typing import Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import onnx
import torch
from ..helpers.torch_helper import to_tensor
from ..torch_onnx.runtime_info import first_used_last_used, RuntimeValue
from . import torch_ops


[docs] @functools.lru_cache def get_kernels() -> Dict[Tuple[str, str, int], type[torch_ops.OpRun]]: """ Retrieves all the available kernels class :class:`TorchOnnxEvaluator` can use. The full list is the following. .. runpython:: :showcode: from onnx_diagnostic.reference.torch_evaluator import get_kernels for k, v in sorted(get_kernels().items()): domain, name, version = k f = f"{name}({version})" if domain == "" else f"{name}[{domain}]({version})" add = " " * max(25 - len(f), 0) dd = " -- device dependent" if v.device_dependent() else "" print(f"{f}{add} -- {v.__name__}{dd}") """ res = {} for _k, v in torch_ops.__dict__.items(): if isinstance(v, type) and issubclass(v, torch_ops.OpRun) and "_" in v.__name__: name, version = v.__name__.split("_") domain = getattr(v, "domain", "") res[domain, name, int(version)] = v return res
[docs] class TorchOnnxEvaluator: """ Torch evaluator for onnx models. The model does not stores the original proto it evaluates to avoid :param proto: a proto :param providers: where to run the model :param opsets: needed if proto is a graph :param functions: known local functions :param verbose: verbosity level The class holds the following attributes: * `providers`: providers * `default_device`: default torch device * `constants`: all initializers or constants * `kernels`: kernels * `runtime_info`: produced by :func:`first_used_last_used <onnx_diagnostic.torch_onnx.runtime_info.first_used_last_used>` * `last_used`: contains the list of intermediate results, to remove after every node execution, this avoid the memory to grow too much * `functions`: local functions The class is not multithreaded. `runtime_info` gets updated by the the class. The list of available kernels is returned by function :func:`onnx_diagnostic.reference.torch_evaluator.get_kernels`. Example: .. runpython:: :showcode: import onnx import onnx.helper as oh import torch from onnx_diagnostic.helpers import string_type from onnx_diagnostic.reference import TorchOnnxEvaluator TFLOAT = onnx.TensorProto.FLOAT proto = oh.make_model( oh.make_graph( [ oh.make_node("Sigmoid", ["Y"], ["sy"]), oh.make_node("Mul", ["Y", "sy"], ["ysy"]), oh.make_node("Mul", ["X", "ysy"], ["final"]), ], "-nd-", [ oh.make_tensor_value_info("X", TFLOAT, [1, "b", "c"]), oh.make_tensor_value_info("Y", TFLOAT, ["a", "b", "c"]), ], [oh.make_tensor_value_info("final", TFLOAT, ["a", "b", "c"])], ), opset_imports=[oh.make_opsetid("", 18)], ir_version=9, ) sess = TorchOnnxEvaluator(proto) feeds = dict(X=torch.rand((4, 5)), Y=torch.rand((4, 5))) result = sess.run(None, feeds) print(string_type(result, with_shape=True, with_min_max=True)) Adding ``verbose=1`` shows which kernels is executed: .. runpython:: :showcode: import onnx import onnx.helper as oh import torch from onnx_diagnostic.helpers import string_type from onnx_diagnostic.reference import TorchOnnxEvaluator TFLOAT = onnx.TensorProto.FLOAT proto = oh.make_model( oh.make_graph( [ oh.make_node("Sigmoid", ["Y"], ["sy"]), oh.make_node("Mul", ["Y", "sy"], ["ysy"]), oh.make_node("Mul", ["X", "ysy"], ["final"]), ], "-nd-", [ oh.make_tensor_value_info("X", TFLOAT, [1, "b", "c"]), oh.make_tensor_value_info("Y", TFLOAT, ["a", "b", "c"]), ], [oh.make_tensor_value_info("final", TFLOAT, ["a", "b", "c"])], ), opset_imports=[oh.make_opsetid("", 18)], ir_version=9, ) sess = TorchOnnxEvaluator(proto, verbose=1) feeds = dict(X=torch.rand((4, 5)), Y=torch.rand((4, 5))) result = sess.run(None, feeds) print(string_type(result, with_shape=True, with_min_max=True)) It also shows when a result is not needed anymore. In that case, it is deleted to free the memory it takes. The runtime can also execute the kernel the onnx model on CUDA. It follows the same logic as :class:`onnxruntime.InferenceSession`: ``providers=["CUDAExecutionProvider"]``. It is better in that case to move the input on CUDA. The class tries to move every weight on CUDA but tries to keep any tensor identified as a shape in CPU. Some bugs may remain as torch raises an exception when devices are expected to be the same. The runtime was validated with model :epkg:`arnir0/Tiny-LLM`. """
[docs] class IO: "IO" def __init__(self, name: str, type: int, shape: Tuple[Union[str, int], ...]): self.name = name self.type = type self.shape = shape
@classmethod def _on_cuda(cls, providers) -> int: if not providers: return -1 for p in providers: if p == "CUDAExecutionProvider": return 0 if isinstance(p, tuple) and p[0] == "CUDAExecutionProvider": return p[1]["device_id"] return -1 def __init__( self, proto: Union[onnx.FunctionProto, onnx.GraphProto, onnx.ModelProto], providers: Tuple[str, ...] = ("CPUExecutionProvider",), opsets: Optional[Dict[str, int]] = None, local_functions: Optional[Dict[Tuple[str, str], "TorchOnnxEvaluator"]] = None, verbose: int = 0, ): self.providers = providers self.constants: Dict[str, torch.Tensor] = {} self.kernels: List[Optional[torch_ops.OpRun]] = [] self.functions = local_functions.copy() if local_functions else {} self.CPU = torch.tensor([0]).to("cpu").device self.verbose = verbose dev = self._on_cuda(providers) if dev < 0: self.default_device = self.CPU self.CUDA = None else: self.CUDA = torch.tensor([0]).to(f"cuda:{dev}").device self.default_device = self.CUDA if isinstance(proto, str): proto = onnx.load(proto) if isinstance(proto, onnx.ModelProto): assert opsets is None, "proto is a model, opsets must be None in that case" assert not proto.graph.sparse_initializer, "sparse_initializer not support yet" self.opsets = {d.domain: d.version for d in proto.opset_import} for f in proto.functions: self.functions[f.domain, f.name] = self.__class__( f, providers=providers, local_functions=self.functions, verbose=self.verbose, ) self._build_initializers(proto.graph.initializer) self._build_initializers(proto.graph.node) self._build_kernels(proto.graph.node) self.input_names = [i.name for i in proto.graph.input] self.output_names = [i.name for i in proto.graph.output] self._io_input_names = [ self.IO( name=i.name, type=i.type.tensor_type.elem_type, shape=tuple( d.dim_param or d.dim_value for d in i.type.tensor_type.shape.dim ), ) for i in proto.graph.input ] self._io_output_names = [ self.IO( name=i.name, type=i.type.tensor_type.elem_type, shape=tuple( d.dim_param or d.dim_value for d in i.type.tensor_type.shape.dim ), ) for i in proto.graph.output ] elif isinstance(proto, onnx.GraphProto): assert opsets, "opsets must be specified if proto is a graph" assert not proto.sparse_initializer, "sparse_initializer not support yet" self.opsets = opsets self._build_initializers(proto.initializer) self._build_initializers(proto.node) self._build_kernels(proto.node) self.input_names = [i.name for i in proto.input] self.output_names = [i.name for i in proto.output] elif isinstance(proto, onnx.FunctionProto): assert opsets is None, "proto is a model, opsets must be None in that case" self.opsets = {d.domain: d.version for d in proto.opset_import} self._build_initializers(proto.node) self._build_kernels(proto.node) self.input_names = list(proto.input) self.output_names = list(proto.output) else: raise TypeError(f"Unexpected type {type(proto)} for proto") self.runtime_info = first_used_last_used(proto, constant_as_initializer=True) self.last_used: List[List[str]] = [[] for _ in self.kernels] for name, info in self.runtime_info.items(): assert isinstance(info.last_used, int) or info.is_input, ( f"Missing field last_used in {info!r}, last_used={info.last_used!r}, " f"This may mean the node is unused and it should be removed." ) if info.last_used is None: # Not used. self.last_used[0].append(name) elif not info.is_output and not info.is_initializer: self.last_used[info.last_used].append(name)
[docs] def get_inputs(self): "Same API than onnxruntime." assert hasattr(self, "_io_input_names"), "Missing attribute '_io_input_names'." return self._io_input_names
[docs] def get_outputs(self): "Same API than onnxruntime." assert hasattr(self, "_io_output_names"), "Missing attribute '_io_output_names'." return self._io_output_names
@property def on_cuda(self) -> bool: "Tells if the default device is CUDA." return self.default_device == self.CUDA def _build_initializers(self, inits: Sequence[Union[onnx.NodeProto, onnx.TensorProto]]): for init in inits: if isinstance(init, onnx.TensorProto): self.constants[init.name] = to_tensor(init).to(self.default_device) elif ( isinstance(init, onnx.NodeProto) and init.op_type == "Constant" and init.domain == "" ): value = None for att in init.attribute: if att.name == "value": value = to_tensor(att.t).to(self.default_device) elif att.name == "value_floats": value = torch.tensor(list(att.floats), dtype=torch.float32).to( self.default_device ) assert value is not None, f"No attribute value in node {init}" self.constants[init.output[0]] = value def _build_kernels(self, nodes: Sequence[onnx.NodeProto]): kernels = get_kernels() self.kernels.clear() for node in nodes: if (node.domain, node.op_type) in self.functions: kernel = torch_ops.OpRunFunction( self.functions[node.domain, node.op_type], node, self.opsets[node.domain] ) self.kernels.append(kernel) continue if node.op_type == "Constant" and node.domain == "": # Treated as a constant. self.kernels.append(None) continue opset = self.opsets[node.domain] key = node.domain, node.op_type, opset while key not in kernels and opset > 0: opset -= 1 key = node.domain, node.op_type, opset assert key in kernels, ( f"Missing kernel for node type {node.op_type!r} from domain {node.domain!r}, " f"local functions={sorted(self.functions)}" ) cls = kernels[key] ags = [self.default_device] if cls.device_dependent() else [] kws = dict(parent=self) if cls.has_subgraphs() else {} kernel2 = cls(node, opset, *ags, **kws) self.kernels.append(kernel2)
[docs] def run( self, outputs: Optional[List[str]], feeds: Union[Dict[str, torch.Tensor], Dict[str, np.ndarray]], ) -> Union[List[Optional[torch.Tensor]], List[Optional[np.ndarray]]]: """ Runs the ONNX model. :param outputs: outputs required :param feeds: inputs :return: output tensors. """ use_numpy = any(isinstance(t, np.ndarray) for t in feeds.values()) if use_numpy: feeds = {k: torch.from_numpy(v) for k, v in feeds.items()} if outputs is None: outputs = self.output_names # sets constants for k, v in self.constants.items(): r = self.runtime_info[k] if not r.has_value: r.set_value( torch_ops.OpRunTensor( v.to(self.CUDA) if not r.is_shape and self.on_cuda else v, is_constant=True, may_cpu=len(v.shape) == 1 and v.numel() < 8 and v.dtype == torch.int64, ) ) if self.verbose: print(f"+C {r.name}: {r.string_type()}") # inputs for k, v in feeds.items(): r = self.runtime_info[k] r.set_value( torch_ops.OpRunTensor( v.to(self.CUDA) if not r.is_shape and self.on_cuda else v, is_constant=False, may_cpu=len(v.shape) == 1 and v.numel() < 8 and v.dtype == torch.int64, ) ) if self.verbose: print(f"+I {r.name}: {r.string_type()}") # node execution for it, kernel in enumerate(self.kernels): if kernel is not None: if self.verbose: print( f"{kernel.__class__.__name__}" f"({', '.join(kernel.input)}) -> " f"{', '.join(kernel.output)}" ) # kernel execution inputs = [(self.runtime_info[i].value if i else None) for i in kernel.input] if kernel.has_subgraphs(): res = kernel.run(*inputs, context=self.runtime_info) # type: ignore[call-arg] else: res = kernel.run(*inputs) if isinstance(res, tuple): # outputs assert all(isinstance(o, torch_ops.OpRunValue) for o in res), ( f"Unexpected output type {[type(o) for o in res]} " f"for kernel {type(kernel)}." ) for name, t in zip(kernel.output, res): self.runtime_info[name].set_value(t) if self.verbose: for name in kernel.output: print(f"+R {name}: {self.runtime_info[name].string_type()}") else: assert isinstance( res, torch_ops.OpRunValue ), f"Unexpected output type {type(res)} for kernel {type(kernel)}." self.runtime_info[kernel.output[0]].set_value(res) if self.verbose: print( f"+R {kernel.output[0]}: " f"{self.runtime_info[kernel.output[0]].string_type()}" ) # free intermediate results for name in self.last_used[it]: self.runtime_info[name].clean_value() if self.verbose: print(f"- clean {name}") assert all( self.runtime_info[o].value is not None for o in outputs ), "Not implemented yet when one output is None." fres = [self.runtime_info[o].value.tensor for o in outputs] # type: ignore[union-attr] if self.verbose: print(f"++ outputs {', '.join(outputs)}") # clean previous execution for k in feeds: self.runtime_info[k].clean_value() if self.verbose: print(f"- clean {k}") for o in outputs: self.runtime_info[o].clean_value() if self.verbose: print(f"- clean {o}") if use_numpy: return [None if a is None else a.detach().cpu().numpy() for a in fres] return fres
[docs] def run_with_values( self, *args: Optional[torch_ops.OpRunTensor], context: Optional[Dict[str, RuntimeValue]] = None, ) -> Union[torch_ops.OpRunValue, Tuple[torch_ops.OpRunValue, ...]]: """ Runs the ONNX model. :param args: inputs :param context: local context for the execution of subgraphs :return: output OpRunTensor """ assert all( isinstance(a, torch_ops.OpRunValue) for a in args ), f"Unexpected type in args: {[type(a) for a in args]}" outputs = self.output_names context = context or {} # sets constants for k, v in self.constants.items(): r = self.runtime_info[k] if not r.has_value: r.set_value( torch_ops.OpRunTensor( v.to(self.CUDA) if r.is_shape is False and self.on_cuda else v, is_constant=True, may_cpu=len(v.shape) == 1 and v.numel() < 8 and v.dtype == torch.int64, ) ) # inputs for k, v in zip(self.input_names, args): r = self.runtime_info[k] r.set_value( torch_ops.OpRunTensor(None) if v is None else v.__class__(v.tensor_or_sequence) ) # node execution for it, kernel in enumerate(self.kernels): if kernel is not None: # kernel execution inputs = [ ( ( self.runtime_info[i].value if i in self.runtime_info else context[i].value ) if i else None ) for i in kernel.input ] res = kernel.run(*inputs) if isinstance(res, tuple): # outputs assert all(isinstance(o, torch_ops.OpRunTensor) for o in res), ( f"Unexpected output type {[type(o) for o in res]} " f"for kernel {type(kernel)}." ) for name, t in zip(kernel.output, res): self.runtime_info[name].set_value(t) else: assert isinstance( res, torch_ops.OpRunValue ), f"Unexpected output type {type(res)} for kernel {type(kernel)}." self.runtime_info[kernel.output[0]].set_value(res) # free intermediate results for name in self.last_used[it]: self.runtime_info[name].clean_value() assert all( self.runtime_info[o].value is not None for o in outputs ), "Not implemented yet when one output is None." res2 = [self.runtime_info[o].value.copy() for o in outputs] # type: ignore[assignment, union-attr] # clean previous execution for k in self.input_names: self.runtime_info[k].clean_value() for o in self.output_names: self.runtime_info[o].clean_value() return res2[0] if len(res2) == 1 else tuple(res2) # type: ignore[index, return-value, arg-type]