Source code for onnx_diagnostic.helpers.model_builder_helper

import importlib.util
import os
import requests
import sys
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
from onnx import helper, save_model, external_data_helper, ModelProto

CACHE_SUBDIR = "onnx-diagnostic"


def download_model_builder_to_cache(
    url: str = "https://raw.githubusercontent.com/microsoft/onnxruntime-genai/refs/heads/main/src/python/py/models/builder.py",
):
    """
    Downloads ``builder.py`` from
    ``https://github.com/microsoft/onnxruntime-genai/blob/main/src/python/py/models/builder.py``
    and stores it in the local cache directory.
    """
    filename = os.path.basename(urlparse(url).path)
    cache_dir = Path(os.getenv("HOME", Path.home())) / ".cache" / CACHE_SUBDIR
    cache_dir.mkdir(parents=True, exist_ok=True)
    file_path = cache_dir / filename
    if file_path.exists():
        return file_path
    response = requests.get(url)
    response.raise_for_status()
    with open(file_path, "wb") as f:
        f.write(response.content)
    return file_path

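# Usage sketch (illustrative, not part of the original module): the first call
# downloads ``builder.py`` into ``~/.cache/onnx-diagnostic``; subsequent calls
# return the cached path without touching the network.
#
#     path = download_model_builder_to_cache()
#     print(path)  # .../.cache/onnx-diagnostic/builder.py
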
def import_model_builder(module_name: str = "builder") -> object:
    """Imports the downloaded ``builder.py``."""
    if module_name in sys.modules:
        return sys.modules[module_name]
    path = Path(os.getenv("HOME", Path.home())) / ".cache" / CACHE_SUBDIR
    module_file = path / f"{module_name}.py"
    assert os.path.exists(module_file), f"Unable to find {module_file!r}"
    # The spec must point at the module file itself, not the cache directory.
    spec = importlib.util.spec_from_file_location(module_name, str(module_file))
    if spec is None:
        # Fallback: make the cache directory importable and use a regular import.
        spath = str(path)
        if spath not in sys.path:
            sys.path.append(spath)
        module = importlib.__import__(module_name)
        return module
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module

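# Usage sketch (illustrative, not part of the original module): once downloaded,
# the builder script imports like a regular module. ``set_io_dtype`` is one of
# the builder attributes this module relies on below.
#
#     download_model_builder_to_cache()
#     builder = import_model_builder()
#     io_dtype = builder.set_io_dtype("fp32", "cpu", {})
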
def _make_model(self, model, verbose: int = 0):
    # Make inputs and outputs to ONNX model
    import torch

    self.make_inputs_and_outputs()

    # Make pre-processing nodes
    self.make_preprocessing_nodes()

    # Loop through model and map each module to ONNX/ORT ops
    self.layer_id = 0
    for module in model.modules():
        if (
            isinstance(module, torch.nn.Embedding)
            and module.weight.shape[0] == self.vocab_size
        ) or (hasattr(model, "embedding") and module == model.embedding):
            # Checks (Hugging Face logic) or (GGUF logic)
            if not self.exclude_embeds:
                # Embedding layer
                if verbose:
                    print("[_make_model] Reading embedding layer")
                self.make_embedding(module.weight.detach().cpu())
            else:
                # Exclude embedding layer from model
                self.layernorm_attrs["root_input"] = "inputs_embeds"
                self.layernorm_attrs["skip_input"] = "inputs_embeds"
        elif (
            module.__class__.__name__.endswith("DecoderLayer")
            or module.__class__.__name__.endswith("GLMBlock")
        ) and self.layer_id < self.num_layers:
            # Each decoder layer of model
            if verbose:
                print(f"[_make_model] Reading decoder layer {self.layer_id}")
            self.make_layer(self.layer_id, module)
            self.layer_id += 1
        elif self.layer_id == self.num_layers and self.has_final_norm(module, model):
            # SkipLayerNorm after last decoder layer (MatMul --> SkipLayerNorm)
            if verbose:
                print("[_make_model] Reading final norm")
            self.make_layernorm(
                self.layer_id,
                module,
                skip=True,
                simple=self.layernorm_attrs["simple"],
                location="final_norm",
            )
        elif (
            isinstance(module, torch.nn.Linear) and module.out_features == self.vocab_size
        ) or (hasattr(model, "lm_head") and module == model.lm_head):
            # Checks (Hugging Face logic) or (GGUF logic)
            if not self.exclude_lm_head:
                # Language modeling head (SkipLayerNorm --> logits)
                if verbose:
                    print("[_make_model] Reading LM head")
                self.make_lm_head(module)

def save_model_builder(
    self, out_dir: Optional[str] = "", verbose: int = 0
) -> Optional[ModelProto]:
    """
    Saves a model created by function :func:`create_model_builder`.
    If ``out_dir`` is empty or not specified, nothing is written to disk
    and the generated model is returned; otherwise the model is saved
    and the function returns ``None``.
    """
    if verbose:
        print(f"[save_model_builder] Saving ONNX model in {out_dir}")

    # Create ONNX model
    model = helper.make_model(
        opset_imports=[
            self.clear_field(
                helper.make_operatorsetid("", 21 if self.quant_attrs["use_qdq"] else 14),
                "domain",
            ),
            helper.make_operatorsetid("com.microsoft", 1),
        ],
        ir_version=7,
        producer_name="onnxruntime-genai",
        producer_version="0.0.0",
        graph=self.make_graph(
            name="main_graph",
            inputs=self.inputs,
            outputs=self.outputs,
            initializer=self.initializers,
            value_info=self.value_infos,
            nodes=self.nodes,
        ),
    )

    # Load external data into ONNX model
    external_data_helper.load_external_data_for_model(model, self.cache_dir)

    # Delete external data files on disk before re-saving
    for path in os.listdir(self.cache_dir):
        if path.endswith(".bin"):
            os.remove(os.path.join(self.cache_dir, path))

    # Delete temporary cache dir if empty
    # if len(os.listdir(self.cache_dir)) == 0:
    #     os.rmdir(self.cache_dir)

    # Quantize ONNX model to desired precision
    # Skip quantizing `MatMul` in `DequantizeLinear --> Transpose --> MatMul` path
    already_quantized_in_qdq_format = (
        self.quant_type is not None and self.quant_attrs["use_qdq"]
    )
    if self.onnx_dtype == "int4" and not already_quantized_in_qdq_format:
        model = self.to_int4(model)

    # Save ONNX model with only one external data file
    # and delete any existing duplicate copies.
    if out_dir:
        out_path = os.path.join(out_dir, self.filename)
        data_path = os.path.join(out_dir, os.path.basename(out_path) + ".data")
        if os.path.exists(out_path):
            if verbose:
                print(f"[save_model_builder] Overwriting {out_path!r}")
            os.remove(out_path)
        if os.path.exists(data_path):
            if verbose:
                print(f"[save_model_builder] Overwriting {data_path!r}")
            os.remove(data_path)

        location = os.path.basename(data_path)
        if os.path.exists(location):
            os.remove(location)
        if verbose:
            print(f"[save_model_builder] out_path={out_path!r}")
            print(f"[save_model_builder] location={location!r}")
        save_model(
            model,
            out_path,
            save_as_external_data=True,
            all_tensors_to_one_file=True,
            location=location,
            size_threshold=1024,
            convert_attribute=False,
        )
        return None
    return model

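# Usage sketch (illustrative, not part of the original module): with an empty
# ``out_dir`` the function returns the in-memory ``ModelProto``; with a
# directory it writes the model file plus a single ``.data`` external-data file
# and returns ``None``. ``onnx_model`` below is the object returned by
# :func:`create_model_builder`.
#
#     proto = save_model_builder(onnx_model)            # keep in memory
#     save_model_builder(onnx_model, out_dir="models")  # write to disk
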
def create_model_builder(
    config: Any,
    model: "torch.nn.Module",  # noqa: F821
    cache_dir: str,
    precision: str = "fp32",
    execution_provider: str = "cpu",
    verbose: int = 0,
    **extra_options,
) -> "Model":  # noqa: F821
    """
    Creates a model based on a configuration.
    The ONNX model itself is obtained by calling :func:`save_model_builder`
    on the returned object.

    :param config: configuration
    :param model: PyTorch model whose weights are mapped to ONNX
    :param cache_dir: cache directory
    :param precision: precision
    :param execution_provider: execution provider
    :param verbose: verbosity
    :param extra_options: extra options
    :return: model
    """
    assert cache_dir, "create_model_builder does not work without cache_dir."
    assert os.path.exists(cache_dir), f"cache_dir={cache_dir!r} does not exist"
    download_model_builder_to_cache()
    builder = import_model_builder()
    io_dtype = builder.set_io_dtype(precision, execution_provider, extra_options)

    arch_map = {
        "ChatGLMForConditionalGeneration": builder.ChatGLMModel,
        "ChatGLMModel": builder.ChatGLMModel,
        "GemmaForCausalLM": builder.Gemma2Model,
        "Gemma3ForCausalLM": builder.Gemma3Model,
        "Gemma3ForConditionalGeneration": builder.Gemma3Model,
        "GraniteForCausalLM": builder.GraniteModel,
        "LlamaForCausalLM": builder.LlamaModel,
        "MistralForCausalLM": builder.MistralModel,
        "NemotronForCausalLM": builder.NemotronModel,
        "OlmoForCausalLM": builder.OLMoModel,
        "PhiForCausalLM": builder.PhiModel,
        "Phi3ForCausalLM": (
            lambda config, *_: (
                builder.Phi3MiniModel
                if config.max_position_embeddings == config.original_max_position_embeddings
                else builder.Phi3MiniLongRoPEModel
            )
        ),
        "PhiMoEForCausalLM": builder.Phi3MoELongRoPEModel,
        "Phi3SmallForCausalLM": (
            lambda config, *_: (
                builder.Phi3SmallModel
                if config.max_position_embeddings == config.original_max_position_embeddings
                else builder.Phi3SmallLongRoPEModel
            )
        ),
        "Phi3VForCausalLM": builder.Phi3VModel,
        "Phi4MMForCausalLM": builder.Phi4MMModel,
        "Qwen2ForCausalLM": builder.QwenModel,
        "Qwen3ForCausalLM": builder.Qwen3Model,
    }
    assert config.architectures[0] in arch_map, (
        f"Unable to find {config.architectures[0]!r} in the supported list "
        f"of architectures: {sorted(arch_map)}"
    )

    # Additional validations.
    post = None
    if config.architectures[0] in ("ChatGLMForConditionalGeneration", "ChatGLMModel"):
        # Quantized ChatGLM models use ChatGLMForConditionalGeneration as the
        # architecture, whereas the Hugging Face model uses ChatGLMModel.
        config.hidden_act = "swiglu"
    elif config.architectures[0] == "Gemma2ForCausalLM":
        assert precision == "bfp16", (
            f"architecture {config.architectures[0]!r} loses accuracy "
            f"with float16 precision, use bfp16."
        )
    elif config.architectures[0] == "Gemma3ForCausalLM":
        assert precision == "bfp16", (
            f"architecture {config.architectures[0]!r} loses accuracy "
            f"with float16 precision, use bfp16."
        )

        def _post(onnx_model):
            onnx_model.model_type = "gemma3_text"

        post = _post
    elif config.architectures[0] == "Gemma3ForConditionalGeneration":
        assert extra_options.get("exclude_embeds", False), (
            f"This is only generating the text component of architecture "
            f"{config.architectures[0]!r}. Set extra_options exclude_embeds=true."
        )
        assert precision == "bfp16", (
            f"architecture {config.architectures[0]!r} loses accuracy "
            f"with float16 precision, use bfp16."
        )
        text_config = config.text_config
        for key in text_config:
            if not hasattr(config, key):
                setattr(config, key, getattr(text_config, key))
    elif (
        config.architectures[0] == "PhiMoEForCausalLM"
        and config.max_position_embeddings != config.original_max_position_embeddings
    ):
        assert execution_provider == "cuda", (
            f"architecture {config.architectures[0]!r} works on 'cuda' "
            f"because `MoE` is only supported for CUDA in ONNX Runtime."
        )
        assert precision == "int4", f"architecture {config.architectures[0]!r} supports int4."
    elif config.architectures[0] == "Phi3VForCausalLM":
        assert extra_options.get("exclude_embeds", False), (
            f"This is only generating the text component of architecture "
            f"{config.architectures[0]!r}. Set extra_options exclude_embeds=true."
        )
    elif config.architectures[0] == "Phi4MMForCausalLM":
        assert extra_options.get("exclude_embeds", False), (
            f"This is only generating the text component of architecture "
            f"{config.architectures[0]!r}. Set extra_options exclude_embeds=true."
        )

    cls = arch_map[config.architectures[0]]
    onnx_model = cls(config, io_dtype, precision, execution_provider, cache_dir, extra_options)
    if post:
        post(onnx_model)
    _make_model(onnx_model, model, verbose=verbose)
    assert onnx_model.nodes, (
        f"No node in the model, io_dtype={io_dtype!r}, "
        f"precision={precision!r}, execution_provider={execution_provider!r}, "
        f"extra_options={extra_options!r}, cache_dir={cache_dir!r}, "
        f"\n-- config --\n{config}"
    )
    # onnx_model.make_genai_config(hf_name, extra_kwargs, output_dir)
    # onnx_model.save_processing(hf_name, extra_kwargs, output_dir)
    return onnx_model
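

# End-to-end sketch (illustrative, not part of the original module): convert a
# Hugging Face causal LM into an ONNX model through the builder. The model id
# is a placeholder; any architecture listed in ``arch_map`` should work, and
# ``cache_dir`` must exist before the call.
#
#     from transformers import AutoConfig, AutoModelForCausalLM
#
#     mid = "arnir0/Tiny-LLM"  # hypothetical small LlamaForCausalLM checkpoint
#     config = AutoConfig.from_pretrained(mid)
#     model = AutoModelForCausalLM.from_pretrained(mid)
#     os.makedirs("cache", exist_ok=True)
#     onnx_model = create_model_builder(
#         config, model, cache_dir="cache", precision="fp32",
#         execution_provider="cpu", verbose=1,
#     )
#     proto = save_model_builder(onnx_model)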