Source code for experimental_experiment.mini_onnx_builder

import ctypes
import sys
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from onnx import GraphProto, ModelProto, TensorProto
import onnx.helper as oh
import onnx.numpy_helper as onh
from .helpers import string_type

STORAGE_TYPE = {
    TensorProto.FLOAT16: TensorProto.INT16,
    TensorProto.BFLOAT16: TensorProto.INT16,
}


def _get_type(elem_type: Any, exc: bool = True) -> int:
    if not isinstance(elem_type, int):
        st = str(elem_type)
        if "float32" in st:
            elem_type = TensorProto.FLOAT
        elif "float64" in st:
            elem_type = TensorProto.DOUBLE
        elif "bfloat16" in st:
            elem_type = TensorProto.BFLOAT16
        elif "float16" in st:
            elem_type = TensorProto.FLOAT16
        elif "uint64" in st:
            elem_type = TensorProto.UINT64
        elif "int64" in st:
            elem_type = TensorProto.INT64
        elif "uint32" in st:
            elem_type = TensorProto.UINT32
        elif "int32" in st:
            elem_type = TensorProto.INT32
        elif "uint16" in st:
            elem_type = TensorProto.UINT16
        elif "int16" in st:
            elem_type = TensorProto.INT16
        elif "bool" in st:
            elem_type = TensorProto.BOOL
        elif "uint8" in st:
            elem_type = TensorProto.UINT8
        elif "int8" in st:
            elem_type = TensorProto.INT8
        elif "complex64" in st:
            elem_type = TensorProto.COMPLEX64
        elif "complex128" in st:
            elem_type = TensorProto.COMPLEX128
        elif elem_type is None:
            elem_type = TensorProto.UNDEFINED
        elif exc:
            raise ValueError(f"Unable to interpret elem_type {elem_type!r}.")
    return elem_type
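
# Usage sketch for ``_get_type`` (illustration only, not part of the original
# module): the function matches on the string form of the dtype, so numpy and
# torch dtypes both resolve as long as ``str(dtype)`` contains the type name.
#
#   _get_type(np.dtype("float32"))  # -> TensorProto.FLOAT
#   _get_type(None)                 # -> TensorProto.UNDEFINED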


def torch_dtype_to_onnx_dtype(to: "torch.dtype") -> int:  # noqa: F821
    """
    Converts a torch dtype into an onnx element type.

    :param to: torch dtype
    :return: onnx type
    """
    import torch

    if to == torch.float32:
        return TensorProto.FLOAT
    if to == torch.float16:
        return TensorProto.FLOAT16
    if to == torch.bfloat16:
        return TensorProto.BFLOAT16
    if to == torch.float64:
        return TensorProto.DOUBLE
    if to == torch.int64:
        return TensorProto.INT64
    if to == torch.int32:
        return TensorProto.INT32
    if to == torch.bool:
        return TensorProto.BOOL
    if to == torch.SymInt:
        return TensorProto.INT64
    if to == torch.SymFloat:
        return TensorProto.FLOAT
    if to == torch.complex64:
        return TensorProto.COMPLEX64
    if to == torch.complex128:
        return TensorProto.COMPLEX128
    raise NotImplementedError(f"Unable to convert torch dtype {to!r} to onnx dtype.")
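
# Illustration only (not part of the original source), assuming torch is
# installed:
#
#   import torch
#   torch_dtype_to_onnx_dtype(torch.float16)  # -> TensorProto.FLOAT16
#   torch_dtype_to_onnx_dtype(torch.bool)     # -> TensorProto.BOOL
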
def dtype_to_tensor_dtype(dt: "dtype") -> int:  # noqa: F821
    """
    Converts a torch dtype or numpy dtype into an onnx element type.

    :param dt: dtype
    :return: onnx type
    """
    try:
        return oh.np_dtype_to_tensor_dtype(dt)
    except (KeyError, TypeError):
        pass
    return torch_dtype_to_onnx_dtype(dt)
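
# Illustration only (not part of the original source): the numpy mapping is
# tried first, the torch mapping is the fallback.
#
#   dtype_to_tensor_dtype(np.dtype("int64"))  # -> TensorProto.INT64
#   # with torch installed:
#   # dtype_to_tensor_dtype(torch.bfloat16)   # -> TensorProto.BFLOAT16
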
def proto_from_array(
    arr: "torch.Tensor",  # noqa: F821
    name: Optional[str] = None,
    verbose: int = 0,
) -> TensorProto:
    """
    Converts a torch Tensor into a TensorProto.

    :param arr: tensor
    :param name: initializer name
    :param verbose: display the type and shape
    :return: a TensorProto
    """
    import torch

    if not isinstance(arr, torch.Tensor):
        raise TypeError(f"Unexpected type {type(arr)}.")
    if arr.is_sparse:
        raise NotImplementedError(
            f"Sparse tensor is not supported yet but initializer {name!r} is."
        )

    # arr.contiguous() is slow after a transpose, maybe there is a way to optimize this.
    if arr.is_contiguous():
        arr_cpu = arr.cpu()
    else:
        arr_cpu = arr.contiguous().cpu()

    numel = torch.numel(arr_cpu)
    element_size = arr_cpu.element_size()

    if arr_cpu.dtype in {torch.bfloat16}:
        # numpy has no bfloat16: keep the torch tensor and copy raw bytes later
        np_arr = arr_cpu
    elif arr_cpu.data_ptr() == arr.data_ptr():
        copy = arr_cpu.clone().detach().requires_grad_(False)
        assert arr_cpu.data_ptr() != copy.data_ptr()
        np_arr = np.from_dlpack(copy)
    else:
        np_arr = np.from_dlpack(arr_cpu.detach())

    tensor = TensorProto()
    tensor.dims.extend(arr_cpu.shape)
    tensor.name = name
    itype = _get_type(arr_cpu.dtype)
    assert not hasattr(TensorProto, "INT4") or itype not in {
        TensorProto.INT4,
        TensorProto.UINT4,
    }, f"Type {arr.dtype} is not supported yet for name={name!r}"
    tensor.data_type = itype

    if verbose > 1 and numel > 100:
        print(f"[proto_from_array] {tensor.data_type}[{arr_cpu.shape}]")

    if isinstance(np_arr, torch.Tensor):
        byte_data = (ctypes.c_ubyte * numel * element_size).from_address(
            np_arr.data_ptr()
        )
        tensor.raw_data = bytes(byte_data)

        if sys.byteorder == "big":
            # onnx raw_data is little-endian: swap the bytes on big-endian hosts
            np_dtype = oh.tensor_dtype_to_np_dtype(STORAGE_TYPE[tensor.data_type])
            tensor.raw_data = (
                np.frombuffer(tensor.raw_data, dtype=np_dtype).byteswap().tobytes()
            )
    else:
        tensor.raw_data = np_arr.tobytes()

        if sys.byteorder == "big":
            np_dtype = oh.tensor_dtype_to_np_dtype(tensor.data_type)
            tensor.raw_data = (
                np.frombuffer(tensor.raw_data, dtype=np_dtype).byteswap().tobytes()
            )
    return tensor
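
# Illustration only (not part of the original source), assuming torch is
# installed; the tensor is moved to CPU and serialized into ``raw_data``.
#
#   import torch
#   t = proto_from_array(torch.arange(6, dtype=torch.float32).reshape(2, 3), name="w")
#   # t.dims == [2, 3], t.data_type == TensorProto.FLOAT
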
class MiniOnnxBuilder:
    """
    Simplified builder to build very simple models.

    :param target_opset: opset to specify
    :param ir_version: IR version to use
    """

    def __init__(self, target_opset: int = 18, ir_version: int = 10):
        import torch

        self.initializers_dict = {}
        self.inputs = []
        self.outputs = []
        self.nodes = []
        self.opsets = {"": target_opset}
        self.ir_version = ir_version
        self.torch = torch
    def append_output_initializer(
        self,
        name: str,
        tensor: Union[np.ndarray, "torch.Tensor"],  # noqa: F821
        randomize: bool = False,
    ):
        """
        Adds an initializer as an output.
        The initializer name is prefixed by ``t_``. The output name is *name*.
        If *randomize* is True, the tensor is not stored but replaced by a
        random generator.
        """
        if randomize:
            dtype = dtype_to_tensor_dtype(tensor.dtype)
            if dtype in {
                TensorProto.FLOAT,
                TensorProto.FLOAT16,
                TensorProto.DOUBLE,
                TensorProto.BFLOAT16,
            }:
                mini, maxi = tensor.min(), tensor.max()
                if mini < 0 and maxi > 0:
                    op_type = "RandomNormal"
                    kwargs = {
                        "mean": float(tensor.mean()),
                        "scale": float(tensor.std()),
                        "seed": 0.0,
                    }
                else:
                    op_type = "RandomUniform"
                    kwargs = {"low": float(mini), "high": float(maxi), "seed": 0.0}
                shape = tuple(map(int, tensor.shape))
                self.nodes.append(
                    oh.make_node(op_type, [], [name], dtype=dtype, shape=shape, **kwargs)
                )
                self.outputs.append(oh.make_tensor_value_info(name, dtype, shape))
                return
        init_name = f"t_{name}"
        self.initializers_dict[init_name] = tensor
        shape = tuple(map(int, tensor.shape))
        self.outputs.append(
            oh.make_tensor_value_info(name, dtype_to_tensor_dtype(tensor.dtype), shape)
        )
        self.nodes.append(oh.make_node("Identity", [init_name], [name]))
    def append_output_sequence(
        self,
        name: str,
        tensors: List[Union[np.ndarray, "torch.Tensor"]],  # noqa: F821
    ):
        """
        Adds a sequence of initializers as an output.
        The initializer names are prefixed by ``seq_``. The output name is *name*.
        """
        if not tensors:
            # empty list
            self.nodes.append(oh.make_node("SequenceEmpty", [], [name]))
            tensor_type_proto = oh.make_tensor_type_proto(
                elem_type=TensorProto.FLOAT, shape=None
            )
        else:
            assert all(
                isinstance(t, (np.ndarray, self.torch.Tensor)) for t in tensors
            ), f"Nested sequences are not supported, types are {[type(t) for t in tensors]}"
            names = []
            for i, t in enumerate(tensors):
                init_name = f"seq_{name}_{i}"
                self.initializers_dict[init_name] = t
                names.append(init_name)
            self.nodes.append(oh.make_node("SequenceConstruct", names, [name]))
            tensor_type_proto = oh.make_tensor_type_proto(
                elem_type=dtype_to_tensor_dtype(tensors[0].dtype), shape=None
            )
        sequence_type_proto = oh.make_sequence_type_proto(tensor_type_proto)
        output = oh.make_value_info(name, type_proto=sequence_type_proto)
        self.outputs.append(output)
    def append_output_dict(
        self,
        name: str,
        tensors: Dict[str, Union[np.ndarray, "torch.Tensor"]],  # noqa: F821
    ):
        """
        Adds two outputs: a string tensor for the keys and a sequence of
        tensors for the values. The output names are ``name__keys`` and
        ``name__values``.
        """
        keys = []
        values = []
        for k, v in tensors.items():
            keys.append(k)
            values.append(v)
        self.append_output_initializer(f"{name}__keys", np.array(keys, dtype=np.str_))
        self.append_output_sequence(f"{name}__values", values)
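
    # Usage sketch for the ``append_output_*`` methods (illustration only,
    # not part of the original source):
    #
    #   builder = MiniOnnxBuilder()
    #   builder.append_output_initializer("x", np.array([1.0], dtype=np.float32))
    #   builder.append_output_sequence("seq", [np.array([1, 2], dtype=np.int64)])
    #   builder.append_output_dict("d", {"a": np.array([0.5], dtype=np.float32)})
    #   # the model then exposes outputs "x", "seq", "d__keys", "d__values"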
    def _build_initializers(self, switch_low_high: bool) -> List[TensorProto]:
        """
        Builds initializers.

        :param switch_low_high: invert low, high precision
        :return: a list of tensors to store in the model
        """
        init_dict = self.initializers_dict
        if switch_low_high:
            # Let's try to minimize the time.
            initializer = []
            for k, v in init_dict.items():
                if isinstance(v, TensorProto):
                    initializer.append(v)
                    continue
                if isinstance(v, np.ndarray):
                    itype = dtype_to_tensor_dtype(v.dtype)
                    if itype in {
                        TensorProto.BOOL,
                        TensorProto.STRING,
                        TensorProto.UNDEFINED,
                        TensorProto.COMPLEX64,
                        TensorProto.COMPLEX128,
                        getattr(TensorProto, "UINT4", 0),
                        getattr(TensorProto, "INT4", 0),
                    }:
                        t = onh.from_array(v, name=k)
                        initializer.append(t)
                        continue
                    from_np = True
                elif isinstance(v, np.float32):
                    t = onh.from_array(np.array([v], dtype=np.float32), name=k)
                    initializer.append(t)
                    continue
                elif isinstance(v, np.float16):
                    t = onh.from_array(np.array([v], dtype=np.float16), name=k)
                    initializer.append(t)
                    continue
                else:
                    assert isinstance(
                        v, self.torch.Tensor
                    ), f"tensor {k!r} has an unexpected type {type(v)}"
                    assert "FakeTensor" not in str(
                        type(v)
                    ), f"tensor {k!r} cannot be a FakeTensor: {type(v)}"
                    from_np = False
                    itype = dtype_to_tensor_dtype(v.dtype)

                # How to avoid a copy?
                if from_np:
                    # little-endian host: the raw bytes already match onnx's layout
                    tensor = TensorProto()
                    tensor.name = k
                    tensor.dims.extend(v.shape)
                    tensor.data_type = itype
                    tensor.raw_data = v.tobytes()
                else:
                    tensor = proto_from_array(v, name=k)
                initializer.append(tensor)
            return initializer

        res = []
        for k, v in init_dict.items():
            if isinstance(v, TensorProto):
                res.append(v)
                continue
            if isinstance(v, self.torch.Tensor):
                # no string tensor
                t = proto_from_array(v, name=k)
                res.append(t)
                continue
            if isinstance(v, np.ndarray):
                t = onh.from_array(v, name=k)
                res.append(t)
                continue
            raise TypeError(
                f"Unable to convert initializer {k!r} with type "
                f"{type(v)} into a TensorProto."
            )
        return res
    def to_onnx(self) -> ModelProto:
        """
        Conversion to onnx.

        :return: the proto
        """
        opsets = [oh.make_opsetid(*o) for o in self.opsets.items()]
        model = ModelProto()
        model.graph.CopyFrom(GraphProto())
        model.graph.name = "mini_model"
        model.graph.input.extend(self.inputs)
        model.graph.node.extend(self.nodes)
        model.graph.output.extend(self.outputs)
        initializers = self._build_initializers(switch_low_high=sys.byteorder != "big")
        model.graph.initializer.extend(initializers)
        model.opset_import.extend(opsets)
        model.ir_version = self.ir_version
        return model
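
# End-to-end sketch of ``MiniOnnxBuilder`` (illustration only, not part of
# the original source): every output is backed by an initializer, so running
# the model with no inputs returns the stored tensors.
#
#   from onnx.reference import ReferenceEvaluator
#   builder = MiniOnnxBuilder()
#   builder.append_output_initializer("x", np.array([[1.0, 2.0]], dtype=np.float32))
#   ReferenceEvaluator(builder.to_onnx()).run(None, {})
#   # -> [array([[1., 2.]], dtype=float32)]
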
def create_onnx_model_from_input_tensors(
    inputs: Any, switch_low_high: Optional[bool] = None, randomize: bool = False
) -> ModelProto:
    """
    Creates a model proto including all the values as initializers.
    They can be restored by executing the model.
    We assume these inputs are not bigger than 2Gb, the limit of protobuf.
    The function raises an error for unsupported types.

    :param inputs: anything
    :param switch_low_high: if None, it is equal to
        ``switch_low_high=sys.byteorder != "big"``
    :param randomize: if True, float tensors are not stored but randomized
        to save space
    :return: ModelProto
    """
    import torch

    if switch_low_high is None:
        switch_low_high = sys.byteorder != "big"

    def flatten(obj):
        if isinstance(obj, np.ndarray):
            yield "array", obj
        elif isinstance(obj, torch.Tensor):
            yield "tensor", obj
        elif isinstance(obj, bool):
            yield "bool", np.array([obj], dtype=np.bool_)
        elif isinstance(obj, tuple):
            if not obj:
                yield "tuple.__empty", None
            else:
                for i, o in enumerate(obj):
                    if i == len(obj) - 1:
                        for p, oo in flatten(o):
                            yield f"tuple.__{p}", oo
                    else:
                        for p, oo in flatten(o):
                            yield f"tuple__{p}", oo
        elif isinstance(obj, list):
            if not obj:
                yield "list.__empty", None
            else:
                for i, o in enumerate(obj):
                    if i == len(obj) - 1:
                        for p, oo in flatten(o):
                            yield f"list.__{p}", oo
                    else:
                        for p, oo in flatten(o):
                            yield f"list__{p}", oo
        elif isinstance(obj, dict):
            if not obj:
                yield "dict.__empty", None
            else:
                for i, (k, v) in enumerate(obj.items()):
                    assert "__" not in k, (
                        f"Key {k!r} cannot contain '__'. "
                        f"It would interfere with the serialization."
                    )
                    if i == len(obj) - 1:
                        for p, o in flatten(v):
                            yield f"dict._{k}__{p}", o
                    else:
                        for p, o in flatten(v):
                            yield f"dict_{k}__{p}", o
        elif obj.__class__.__name__ == "DynamicCache":
            # transformers
            import transformers

            assert isinstance(
                obj, transformers.cache_utils.DynamicCache
            ), f"Unexpected type {type(obj)}"
            new_obj = dict(key_cache=obj.key_cache, value_cache=obj.value_cache)
            for p, o in flatten(new_obj):
                yield f"DynamicCache.__{p}", o
        else:
            raise NotImplementedError(f"Unexpected type {type(obj)}")

    builder = MiniOnnxBuilder()
    for prefix, o in flatten(inputs):
        if o is None:
            builder.append_output_initializer(prefix, np.array([]))
        else:
            builder.append_output_initializer(prefix, o, randomize=randomize)
    model = builder.to_onnx()
    model.doc_string = string_type(inputs, True, True)
    return model
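
# Illustration only (not part of the original source): nested containers are
# flattened into prefixed output names that encode the container types.
#
#   model = create_onnx_model_from_input_tensors(
#       {"a": np.array([1.0], dtype=np.float32), "b": [np.array([2], dtype=np.int64)]}
#   )
#   # output names: "dict_a__array" and "dict._b__list.__array"
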
def create_input_tensors_from_onnx_model(
    proto: Union[str, ModelProto],
    device: str = "cpu",
    engine: str = "ExtendedReferenceEvaluator",
) -> Union[Tuple[Any, ...], Dict[str, Any]]:
    """
    Deserializes tensors stored with function
    :func:`create_onnx_model_from_input_tensors`.
    It relies on :class:`ExtendedReferenceEvaluator
    <experimental_experiment.reference.ExtendedReferenceEvaluator>`
    to restore the tensors.

    :param proto: ModelProto or the file itself
    :param device: moves the tensors to this device
    :param engine: runtime to use, ``"ExtendedReferenceEvaluator"`` (the default),
        ``"onnx"`` for the onnx reference evaluator, or ``"onnxruntime"``
    :return: the restored object (tensor, tuple, list, or dict)
    """
    if engine == "ExtendedReferenceEvaluator":
        from .reference import ExtendedReferenceEvaluator

        sess = ExtendedReferenceEvaluator(proto)
        names = sess.output_names
    elif engine == "onnx":
        from onnx.reference import ReferenceEvaluator

        sess = ReferenceEvaluator(proto)
        names = sess.output_names
    elif engine == "onnxruntime":
        from onnxruntime import InferenceSession

        sess = InferenceSession(
            proto if isinstance(proto, str) else proto.SerializeToString(),
            providers=["CPUExecutionProvider"],
        )
        names = [i.name for i in sess.get_outputs()]
    else:
        raise AssertionError(f"Unexpected value for engine={engine!r}")

    got = sess.run(None, {})

    if len(names) == 1:
        name = names[0]
        output = got[0]
        if name == "empty":
            return None
        if name == "array":
            return output
        if name == "bool":
            return bool(output[0])
        if name == "tensor":
            import torch

            return torch.from_numpy(output).to(device)
        raise AssertionError(f"Unexpected name {name!r} in {names}")

    def unflatten(names, outputs, pos=0, level=0):
        name = names[pos]
        spl = name.split("__")
        if len(spl) == level + 1:
            # A tensor.
            if spl[-1] == "empty":
                return pos + 1, None
            if spl[-1] == "bool":
                return pos + 1, bool(outputs[pos][0])
            if spl[-1] == "array":
                return pos + 1, outputs[pos]
            if spl[-1] == "tensor":
                import torch

                return pos + 1, torch.from_numpy(outputs[pos]).to(device)
            raise AssertionError(f"Unexpected name {name!r} in {names}")

        res = []
        while True:
            assert pos < len(names), (
                f"Something went wrong with names={names!r}\nres={res!r}"
            )
            name = names[pos]
            spl = name.split("__")
            prefix = spl[level]
            next_pos, value = unflatten(names, outputs, pos=pos, level=level + 1)
            if prefix.startswith("DynamicCache"):
                assert (
                    prefix == "DynamicCache."
                ), f"This should be final but prefix={prefix!r}"
                from transformers.cache_utils import DynamicCache

                cache = DynamicCache()
                for k, v in value.items():
                    setattr(cache, k, v)
                return next_pos, cache
            if prefix.startswith("dict"):
                key = prefix.split("_", maxsplit=1)[-1]
                res.append((key, value))
                end = prefix[4] == "."
            else:
                res.append(value)
                end = prefix[-1] == "."
            if end:
                if prefix.startswith("dict"):
                    ty = dict
                elif prefix.startswith("list"):
                    ty = list
                else:
                    ty = tuple
                break
            pos = next_pos
        return next_pos, (
            ty() if len(res) == 1 and res[0] in (("dict.", None), None) else ty(res)
        )

    return unflatten(names, got)[1]
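
# Round-trip sketch (illustration only, not part of the original source);
# ``engine="onnx"`` avoids the dependency on the internal evaluator.
#
#   inputs = (np.array([1.0], dtype=np.float32), {"k": np.array([2], dtype=np.int64)})
#   model = create_onnx_model_from_input_tensors(inputs)
#   restored = create_input_tensors_from_onnx_model(model, engine="onnx")
#   # -> (array([1.], dtype=float32), {"k": array([2])})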