Source code for experimental_experiment.xbuilder.model_container

import os
import ctypes
import time
import sys
from typing import Any, Optional
import numpy as np
import onnx.helper as oh
from onnx import ModelProto, StringStringEntryProto, TensorProto
from onnx.model_container import ModelContainer, _set_external_data
from onnx.external_data_helper import _get_all_tensors, uses_external_data
from onnx.inliner import inline_local_functions


STORAGE_TYPE = {
    TensorProto.FLOAT16: np.int16,
    TensorProto.BFLOAT16: np.int16,
}


def _get_type(elem_type: Any, exc: bool = True) -> int:
    if not isinstance(elem_type, int):
        st = str(elem_type)
        if "float32" in st:
            elem_type = TensorProto.FLOAT
        elif "float64" in st:
            elem_type = TensorProto.DOUBLE
        elif "bfloat16" in st:
            elem_type = TensorProto.BFLOAT16
        elif "float16" in st:
            elem_type = TensorProto.FLOAT16
        elif "uint64" in st:
            elem_type = TensorProto.UINT64
        elif "int64" in st:
            elem_type = TensorProto.INT64
        elif "uint32" in st:
            elem_type = TensorProto.UINT32
        elif "int32" in st:
            elem_type = TensorProto.INT32
        elif "uint16" in st:
            elem_type = TensorProto.UINT16
        elif "int16" in st:
            elem_type = TensorProto.INT16
        elif "bool" in st:
            elem_type = TensorProto.BOOL
        elif "uint8" in st:
            elem_type = TensorProto.UINT8
        elif "int8" in st:
            elem_type = TensorProto.INT8
        elif "complex64" in st:
            elem_type = TensorProto.COMPLEX64
        elif "complex128" in st:
            elem_type = TensorProto.COMPLEX128
        elif elem_type is None:
            elem_type = TensorProto.UNDEFINED
        elif exc:
            raise ValueError(f"Unable to interpret elem_type {elem_type!r}.")
    return elem_type


[docs] def proto_from_array( arr: "torch.Tensor", # noqa: F821 name: Optional[str] = None, verbose: int = 0, # noqa: F821 ) -> TensorProto: """ Converts a torch Tensor into a TensorProto. """ import sys import torch if not isinstance(arr, torch.Tensor): raise TypeError(f"Unexpected type {type(arr)}.") if arr.is_sparse: raise NotImplementedError( f"Sparse tensor is not supported yet but initializer {name!r} is." ) # arr.contiguous() is slow after a transpose, maybe there is a way to optimize this. if arr.is_contiguous(): arr_cpu = arr.cpu() else: arr_cpu = arr.contiguous().cpu() numel = torch.numel(arr_cpu) element_size = arr_cpu.element_size() if arr_cpu.dtype in {torch.bfloat16}: np_arr = arr_cpu elif arr_cpu.data_ptr() == arr.data_ptr(): copy = arr_cpu.clone().detach().requires_grad_(False) assert arr_cpu.data_ptr() != copy.data_ptr() np_arr = np.from_dlpack(copy) else: np_arr = np.from_dlpack(arr_cpu.detach()) tensor = TensorProto() tensor.dims.extend(arr_cpu.shape) tensor.name = name itype = _get_type(arr_cpu.dtype) assert not hasattr(TensorProto, "INT4") or itype not in { TensorProto.INT4, TensorProto.UINT4, }, f"Type {arr.dtype} is not supported yet for name={name!r}" tensor.data_type = itype if verbose > 1 and numel > 100: print(f"[proto_from_array] {tensor.data_type}[{arr_cpu.shape}]") if isinstance(np_arr, torch.Tensor): byte_data = (ctypes.c_ubyte * numel * element_size).from_address(np_arr.data_ptr()) tensor.raw_data = bytes(byte_data) if sys.byteorder == "big": np_dtype = oh.tensor_dtype_to_np_dtype(STORAGE_TYPE[tensor.data_type]) np.byteswap(np.frombuffer(tensor.raw_data, dtype=np_dtype), inplace=True) else: tensor.raw_data = np_arr.tobytes() if sys.byteorder == "big": np_dtype = oh.tensor_dtype_to_np_dtype(tensor.data_type) np.byteswap(np.frombuffer(tensor.raw_data, dtype=np_dtype), inplace=True) return tensor
[docs] class TorchModelContainer(ModelContainer): """ Overwrites :class:`onnx.model_container.ModelContainer` to support torch tensors. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._stats = { "time_export_write_model": 0, "time_export_byteswap_tobytes": 0, "time_export_tobytes": 0, "time_export_proto_from_array": 0, "time_export_write_tensor_bytes": 0, "time_export_inline_model": 0, } self.inline = False def _save_external( self, file_path: str, all_tensors_to_one_file: bool, ) -> ModelProto: """Save the large model into a main onnx file and one file per tensor. Follows the same format as :func:`write_external_data_tensors <onnx.external_data_helper.write_external_data_tensors>`. The main model needs to be modified to update the file location, the function returns this modified copy. Arguments: file_path: model file all_tensors_to_one_file: all tensors in one file stats: saves time if not Nones Returns: modified main model proto """ def _clean_name(prefix: str, name: str, unique_names: dict[str, int]) -> str: if prefix: name = f"{prefix}-{name}" for c in ":/\\;,!#": name = name.replace(c, "") base_name = name if name in unique_names: i = unique_names[name] + 1 unique_names[name] = i return f"{base_name}_{i}" unique_names[name] = 1 return name unique_names: dict[str, int] = {} folder = os.path.dirname(file_path) if folder and not os.path.exists(folder): raise FileNotFoundError(f"Folder {folder!r} does not exist.") proto = self.model_proto.SerializeToString() copy = ModelProto() copy.ParseFromString(proto) prefix = os.path.splitext(os.path.split(file_path)[-1])[0] if all_tensors_to_one_file: file_weight = f"{os.path.split(file_path)[1]}.weight" full_file_weight = f"{file_path}.weight" offset = 0 with open(full_file_weight, "wb") as f: pass for tensor in _get_all_tensors(copy): if not uses_external_data(tensor): continue prop: Optional[StringStringEntryProto] = None for ext in tensor.external_data: # type: ignore[assignment] if ext.key == "location": # type: ignore[attr-defined] prop = ext # type: ignore[assignment] if prop is None: raise RuntimeError(f"No location found for tensor name {tensor.name!r}.") if prop.value not in self.large_initializers: raise RuntimeError( f"Unable to find large tensor named {tensor.name!r} " f"with location {prop.value!r} in " f"{sorted(self.large_initializers)}." ) np_tensor = self.large_initializers[prop.value] if sys.byteorder == "big": # Convert endian from little to big begin = time.perf_counter() tensor_bytes = np_tensor.byteswap().tobytes() self._stats["time_export_byteswap_tobytes"] += time.perf_counter() - begin elif isinstance(np_tensor, np.ndarray): begin = time.perf_counter() tensor_bytes = np_tensor.tobytes() self._stats["time_export_tobytes"] += time.perf_counter() - begin elif isinstance(np_tensor, TensorProto): tensor_bytes = np_tensor.raw_data assert len(tensor_bytes) > 0, f"One tensor is null, np_tensor={np_tensor}." else: import torch if isinstance(np_tensor, torch.nn.Parameter): pt = np_tensor.data elif isinstance(np_tensor, torch.Tensor): pt = np_tensor else: raise NotImplementedError( f"Handling of type {type(np_tensor)} as large initializer " f"is not implemented yet." ) begin = time.perf_counter() proto = proto_from_array(pt, name="dummy") self._stats["time_export_proto_from_array"] += time.perf_counter() - begin tensor_bytes = proto.raw_data assert ( pt.dtype != torch.float32 or len(tensor_bytes) == np.prod(pt.shape) * 4 ), ( f"Unexpected size mismatch, buffer size is {len(tensor_bytes)}, " f"but tensor size={np.prod(pt.shape) * 4}, " f"shape={pt.shape}, dtype={pt.dtype}" ) begin = time.perf_counter() if all_tensors_to_one_file: _set_external_data( tensor, location=file_weight, offset=offset, length=len(tensor_bytes), ) offset += len(tensor_bytes) with open(full_file_weight, "ab") as f: f.write(tensor_bytes) else: name = f"{_clean_name(prefix, prop.value, unique_names)}.weight" _set_external_data(tensor, location=name) full_name = os.path.join(folder, name) prop.value = name with open(full_name, "wb") as f: f.write(tensor_bytes) self._stats["time_export_write_tensor_bytes"] += time.perf_counter() - begin if self.inline: begin = time.perf_counter() copy = inline_local_functions(copy) self._stats["time_export_inline_model"] += time.perf_counter() - begin begin = time.perf_counter() with open(file_path, "wb") as f: f.write(copy.SerializeToString()) self._stats["time_export_write_model"] += time.perf_counter() - begin return copy