Source code for onnx_diagnostic.helpers.rt_helper

import json
import os
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import onnx
import torch
from .helper import string_type, flatten_object, max_diff
from .torch_helper import torch_deepcopy
from .ort_session import InferenceSessionForTorch


def name_type_to_onnx_dtype(name: str) -> int:
    assert name.startswith("tensor(") and name.endswith(")"), f"Invalid value name={name!r}"
    look = name[7:-1]
    return getattr(onnx.TensorProto, look.upper())
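

# Illustrative usage (not part of the module): the ``tensor(...)`` string matches what
# onnxruntime reports for input/output types, e.g.
#
#   name_type_to_onnx_dtype("tensor(float)")  # -> onnx.TensorProto.FLOAT
#   name_type_to_onnx_dtype("tensor(int64)")  # -> onnx.TensorProto.INT64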


def make_feeds(
    proto: Union[onnx.ModelProto, List[str]],
    inputs: Any,
    use_numpy: bool = False,
    copy: bool = False,
    check_flatten: bool = True,
    is_modelbuilder: bool = False,
) -> Dict[str, Union[torch.Tensor, np.ndarray]]:
    """
    Serializes the inputs to produce feeds expected by
    :class:`onnxruntime.InferenceSession`.

    :param proto: onnx model or list of names
    :param inputs: any kind of inputs
    :param use_numpy: if True, converts torch tensors into numpy arrays
    :param copy: a copy is made, this should be the case if the inputs is ingested
        by ``OrtValue``
    :param check_flatten: if True, checks that ``torch.utils._pytree.tree_flatten``
        returns the same number of outputs
    :param is_modelbuilder: if True, the exporter is ModelBuilder, and we need to
        reorder the past_key_values inputs to match the expected order,
        and get rid of position_ids
    :return: feeds dictionary
    """
    # NOTE: position_ids is a special case because ModelBuilder does not usually use it,
    # it is fused into the rotary embedding in GQA.
    if is_modelbuilder and isinstance(inputs, dict):
        inputs.pop("position_ids", None)  # Remove 'position_ids' if it is present.

    flat = flatten_object(inputs, drop_keys=True)
    assert (
        not check_flatten
        or not all(isinstance(obj, torch.Tensor) for obj in flat)
        # or not is_cache_dynamic_registered(fast=True)
        or len(flat) == len(torch.utils._pytree.tree_flatten(inputs)[0])
    ), (
        f"Unexpected number of flattened objects, "
        f"{string_type(flat, with_shape=True)} != "
        f"{string_type(torch.utils._pytree.tree_flatten(inputs)[0], with_shape=True)}"
    )
    if use_numpy:
        from .torch_helper import to_numpy

        flat = [to_numpy(t) if isinstance(t, torch.Tensor) else t for t in flat]
    names = (
        [i.name for i in proto.graph.input]
        if isinstance(proto, onnx.ModelProto)
        else (
            [i.name for i in proto.get_inputs()]
            if hasattr(proto, "get_inputs")
            else (proto.input_names if hasattr(proto, "input_names") else proto)
        )
    )
    assert (
        isinstance(names, list)
        and len(names) <= len(flat)
        and (
            len(names) == len(flat)
            or isinstance(proto, onnx.ModelProto)
            or hasattr(proto, "get_inputs")
        )
    ), (
        f"Not the same number of given inputs {len(flat)} "
        f"and the number of model inputs {len(names)}, "
        f"type(names)={type(names)}, type(proto)={type(proto)}"
        f"\n-- inputs={string_type(inputs, with_shape=True)}"
        f"\n-- names={names}"
    )
    if copy:
        flat = [t.copy() if hasattr(t, "copy") else t.clone() for t in flat]
    # onnxruntime does not support python bool, int, float: convert them into arrays
    new_flat = []
    for i in flat:
        if isinstance(i, bool):
            i = np.array(i, dtype=np.bool_)
        elif isinstance(i, int):
            i = np.array(i, dtype=np.int64)
        elif isinstance(i, float):
            i = np.array(i, dtype=np.float32)
        new_flat.append(i)
    return dict(zip(names, new_flat))
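

# Illustrative sketch (assumes an existing model file ``model.onnx`` and a dictionary
# ``inputs`` of torch tensors matching its inputs): make_feeds flattens the inputs and
# pairs them with the input names reported by the onnxruntime session.
#
#   import onnxruntime
#   sess = onnxruntime.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])
#   feeds = make_feeds(sess, inputs, use_numpy=True)
#   results = sess.run(None, feeds)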
def _get_dim(i: int, s: Union[str, int], batch: int = 1) -> int:
    if isinstance(s, int):
        return s
    if s == "batch":
        return batch
    # Everything else is a cache length or a sequence length.
    return 0


_DTYPES = {
    "tensor(float)": torch.float32,
    "tensor(float16)": torch.float16,
    "tensor(bfloat16)": torch.bfloat16,
    "tensor(int64)": torch.int64,
    "tensor(int32)": torch.int32,
}
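

# Illustrative sketch: _get_dim resolves one dimension of a symbolic onnx shape;
# symbolic cache or sequence dimensions become 0 so that an empty cache can be built.
#
#   tuple(_get_dim(i, s, batch=1) for i, s in enumerate(("batch", 8, "past_sequence_length", 64)))
#   # -> (1, 8, 0, 64)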
def rt_type_to_torch_dtype(typename: str) -> torch.dtype:
    """Converts a string such as ``tensor(float)`` into a dtype (torch.float32)."""
    return _DTYPES[typename]
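

# Illustrative usage: rt_type_to_torch_dtype("tensor(float16)") returns torch.float16;
# a KeyError is raised for type strings missing from _DTYPES.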
def make_empty_cache(
    batch: int,
    onnx_input_names: List[str],
    onnx_input_shapes: List[Tuple[Union[int, str], ...]],
    onnx_input_types: List[str],
) -> Dict[str, torch.Tensor]:
    """
    Creates an empty cache.

    Example:

    .. code-block:: python

        make_empty_cache(
            1,
            sess.input_names[2:],
            [i.shape for i in sess.get_inputs()[2:]],
            [i.type for i in sess.get_inputs()[2:]],
        )
    """
    feeds = {}
    for name, shape, dtype in zip(onnx_input_names, onnx_input_shapes, onnx_input_types):
        new_shape = tuple(_get_dim(i, s, batch=batch) for i, s in enumerate(shape))
        feeds[name] = torch.empty(new_shape, dtype=rt_type_to_torch_dtype(dtype))
    return feeds
def generate_and_validate(
    model,
    input_ids: torch.Tensor,
    eos_token_id: int = 2,
    max_new_tokens: int = 100,
    session: Optional[Union[InferenceSessionForTorch, onnx.ModelProto, str]] = None,
    atol: float = 0.1,
) -> Union[torch.Tensor, Tuple[torch.Tensor, List[Dict]]]:
    """
    Implements a simple method ``generate`` for a torch model.
    The function does not expect any ``position_ids`` as input.
    The function also checks that the outputs coming from an onnx model
    are close to the outputs the torch model produces.

    :param model: torch model
    :param input_ids: input tokens
    :param eos_token_id: token representing the end of an answer
    :param max_new_tokens: stops after this number of generated tokens
    :param session: the onnx model
    :param atol: absolute tolerance used to validate the onnx outputs
    :return: input tokens concatenated with new tokens; if session is not None,
        it also returns the maximum differences at every iteration

    See example given with function
    :func:`onnx_generate <onnx_diagnostic.helpers.rt_helper.onnx_generate>`.
    """
    if session is not None:
        if not isinstance(session, InferenceSessionForTorch):
            providers = ["CUDAExecutionProvider"] if input_ids.is_cuda else []
            providers.append("CPUExecutionProvider")
            session = InferenceSessionForTorch(session, providers=providers)

    # First call: prefill
    attention_mask = torch.ones(
        input_ids.shape, dtype=input_ids.dtype, device=input_ids.device
    )
    if session:
        feeds = {
            **dict(zip(session.input_names[:2], [input_ids, attention_mask])),
            **make_empty_cache(
                input_ids.shape[0],
                session.input_names[2:],
                session.input_shapes[2:],
                session.input_types[2:],
            ),
        }
        onnx_results = session.run(None, feeds)

    outputs = model(input_ids, use_cache=True, attention_mask=attention_mask)

    if session:
        diff = max_diff(outputs, onnx_results)
        assert isinstance(diff["abs"], float) and diff["abs"] <= atol, (
            f"Unexpected issue with {type(model)}\ndiff={diff}"
            f"\ninput_ids.shape={input_ids.shape}"
            f"\nexpected={string_type(outputs, with_shape=True, with_min_max=True)}"
            f"\n got=\n"
            f"{string_type(onnx_results, with_shape=True, with_min_max=True)}\n"
            f"feeds={string_type(feeds, with_shape=True, with_min_max=True)}"
        )
        diffs = [diff]

    # Next calls: decode
    for iteration in range(max_new_tokens):
        next_token_logits = outputs.logits[:, -1, :]
        next_token_id = torch.argmax(next_token_logits, dim=-1, keepdim=True)
        if next_token_id.item() == eos_token_id:
            break
        input_ids = torch.cat([input_ids, next_token_id], dim=-1)
        attention_mask = torch.ones(
            input_ids.shape, dtype=input_ids.dtype, device=input_ids.device
        )

        if session:
            feeds = dict(
                zip(
                    session.input_names,
                    [
                        t.detach()
                        for t in torch_deepcopy(
                            flatten_object(
                                [next_token_id, attention_mask, outputs.past_key_values]
                            )
                        )
                    ],
                )
            )
            onnx_results = session.run(None, feeds)

        outputs = model(
            next_token_id,
            use_cache=True,
            past_key_values=outputs.past_key_values,
            attention_mask=attention_mask,
        )

        if session:
            diff = max_diff(outputs, onnx_results)
            assert isinstance(diff["abs"], float) and diff["abs"] <= atol, (
                f"Unexpected issue with {type(model)}, iteration={iteration}"
                f"\ndiff={diff}\ninput_ids.shape={input_ids.shape}"
                f"\nexpected={string_type(outputs, with_shape=True, with_min_max=True)}"
                f"\n got=\n"
                f"{string_type(onnx_results, with_shape=True, with_min_max=True)}\n"
                f"feeds={string_type(feeds, with_shape=True, with_min_max=True)}"
            )
            diffs.append(diff)

    if session:
        return input_ids, diffs
    return input_ids
def onnx_generate(
    model_or_path: Union[onnx.ModelProto, str, InferenceSessionForTorch],
    input_ids: torch.Tensor,
    eos_token_id: int = 2,
    max_new_tokens: int = 100,
    return_session: bool = False,
) -> Union[torch.Tensor, Tuple[torch.Tensor, InferenceSessionForTorch, Dict[str, Any]]]:
    """
    Implements a simple method ``generate`` for an ONNX model.
    The function does not expect any ``position_ids`` as input.

    :param model_or_path: model or loaded model
    :param input_ids: input tokens
    :param eos_token_id: token representing the end of an answer
    :param max_new_tokens: stops after this number of generated tokens
    :param return_session: returns the instance of class
        :class:`InferenceSessionForTorch
        <onnx_diagnostic.helpers.ort_session.InferenceSessionForTorch>`
        created if necessary; the function also returns the feeds for the next iteration
    :return: input tokens concatenated with new tokens

    .. runpython::
        :showcode:

        import os
        from onnx_diagnostic.helpers import string_type, string_diff
        from onnx_diagnostic.helpers.rt_helper import (
            onnx_generate,
            generate_and_validate,
            onnx_generate_with_genai,
        )
        from onnx_diagnostic.torch_models.hghub import get_untrained_model_with_inputs
        from onnx_diagnostic.torch_export_patches import torch_export_patches
        from onnx_diagnostic.export.api import to_onnx

        mid = "arnir0/Tiny-LLM"
        print(f"-- get model for {mid!r}")
        data = get_untrained_model_with_inputs(mid)
        model, inputs, ds = data["model"], data["inputs"], data["dynamic_shapes"]
        del inputs["position_ids"]
        del ds["position_ids"]
        input_ids = inputs["input_ids"]
        print(f"-- input_ids={input_ids.shape}")
        print(f"-- inputs: {string_type(inputs, with_shape=True)}")
        print(f"-- dynamic_shapes: {string_type(ds)}")

        folder = "dump_test"
        os.makedirs(folder, exist_ok=True)
        model_name = os.path.join(folder, "model.onnx")
        print("-- test_onnx_generate: export model")
        with torch_export_patches(patch_transformers=True, patch_torch=False):
            to_onnx(
                model,
                (),
                kwargs=inputs,
                dynamic_shapes=ds,
                filename=model_name,
                exporter="custom",  # custom, dynamo or onnx-dynamo, modelbuilder
            )

        print("-- generate with onnx")
        onnx_outputs = onnx_generate(model_name, input_ids[:1], 2, max_new_tokens=10)
        print("-- onnx output", onnx_outputs)

        # The example continues with other functions doing the same.
        print("-- generate with pytorch")
        torch_outputs, diffs = generate_and_validate(
            model, input_ids[:1], 2, max_new_tokens=10, session=model_name
        )
        print("-- torch output", torch_outputs)
        print("-- differences at each step:")
        for i, d in enumerate(diffs):
            print(f"iteration {i}: {string_diff(d)}")

        print("-- generate with genai")
        genai_outputs, session = onnx_generate_with_genai(
            model_name,
            input_ids[:1],
            max_new_tokens=10,
            return_session=True,
            transformers_config=data["configuration"],
        )
        print("-- genai output", genai_outputs)
    """
    if not isinstance(model_or_path, InferenceSessionForTorch):
        providers = ["CUDAExecutionProvider"] if input_ids.is_cuda else []
        providers.append("CPUExecutionProvider")
        session = InferenceSessionForTorch(model_or_path, providers=providers)
    else:
        session = model_or_path

    input_shapes = session.input_shapes
    input_names = session.input_names
    input_types = session.input_types
    has_position_ids = "position_ids" in session.input_names
    assert (
        len(input_names) > 2
        and input_names[:2] == ["input_ids", "attention_mask"]
        and input_names[3 if has_position_ids else 2].startswith("past_key_values")
    ), (
        f"Only text generation is supported but input_names == {input_names}, "
        f"has_position_ids={has_position_ids}"
    )
    assert (
        not has_position_ids or input_names[2] == "position_ids"
    ), f"position_ids must be the third input but input_names={input_names}"

    # First call: prefill
    feeds = dict(
        input_ids=input_ids,
        attention_mask=torch.ones(
            input_ids.shape, dtype=input_ids.dtype, device=input_ids.device
        ),
        **make_empty_cache(
            input_ids.shape[0], input_names[2:], input_shapes[2:], input_types[2:]
        ),
    )
    if has_position_ids:
        feeds["position_ids"] = torch.unsqueeze(
            torch.arange(input_ids.shape[1], dtype=torch.int64, device=input_ids.device), 0
        )
    outputs = session.run(None, feeds)

    # Next calls: decode
    for _ in range(max_new_tokens):
        next_token_logits = outputs[0][:, -1, :]
        # The most probable next token is chosen.
        next_token_id = torch.argmax(next_token_logits, dim=-1, keepdim=True)
        # But we could select it using a multinomial law
        # <<< probs = torch.softmax(next_token_logits / temperature, dim=-1)
        # <<< top_probs, top_indices = torch.topk(probs, top_k)
        # <<< next_token_id = top_indices[torch.multinomial(top_probs, 1)]
        if next_token_id.item() == eos_token_id:
            break
        input_ids = torch.cat([input_ids, next_token_id.to(input_ids.device)], dim=-1)

        feeds = dict(
            input_ids=next_token_id,
            attention_mask=torch.ones(
                input_ids.shape, dtype=input_ids.dtype, device=input_ids.device
            ),
        )
        if has_position_ids:
            feeds["position_ids"] = torch.unsqueeze(
                torch.arange(
                    input_ids.shape[1],
                    input_ids.shape[1] + 1,
                    dtype=torch.int64,
                    device=input_ids.device,
                ),
                0,
            )
        feeds.update(dict(zip(input_names[3 if has_position_ids else 2 :], outputs[1:])))
        outputs = session.run(None, feeds)

    if return_session:
        return input_ids, session, feeds
    return input_ids
def onnx_generate_with_genai(
    model_or_path: Union[onnx.ModelProto, str, InferenceSessionForTorch],
    input_ids: torch.Tensor,
    max_new_tokens: int = 100,
    return_session: bool = False,
    transformers_config: Optional[Any] = None,
) -> Union[torch.Tensor, Tuple[torch.Tensor, InferenceSessionForTorch]]:
    """
    Uses :epkg:`onnxruntime-genai` to implement a simple method ``generate``
    for an ONNX model.
    The function does not expect any ``position_ids`` as input.

    :param model_or_path: model or loaded model
    :param input_ids: input tokens
    :param max_new_tokens: stops after this number of generated tokens
    :param return_session: returns the instance of class
        :class:`InferenceSessionForTorch
        <onnx_diagnostic.helpers.ort_session.InferenceSessionForTorch>`
        created if necessary
    :param transformers_config: writes the configuration if it is missing
        and if this configuration is provided
    :return: input tokens concatenated with new tokens

    See example given with function
    :func:`onnx_generate <onnx_diagnostic.helpers.rt_helper.onnx_generate>`.
    """
    import onnxruntime_genai as og

    if not isinstance(model_or_path, og.Model):
        from .model_builder_helper import make_genai_config

        assert isinstance(
            model_or_path, str
        ), f"Only a filename is allowed for model_or_path but type is {type(model_or_path)}"
        folder = os.path.dirname(model_or_path)
        assert os.path.exists(folder), f"Folder {folder!r} does not exist."
        assert os.path.exists(model_or_path), f"File {model_or_path!r} does not exist."
        config_file = os.path.join(folder, "genai_config.json")
        if not os.path.exists(config_file):
            if not transformers_config:
                raise FileNotFoundError(
                    f"Folder {folder!r} does not contain 'genai_config.json'."
                )
            config = make_genai_config(transformers_config, model_or_path)
            with open(config_file, "w") as f:
                json.dump(config, f, indent=4)

        config = og.Config(os.path.dirname(config_file))
        if input_ids.is_cuda:
            config.clear_providers()
            config.append_provider("cuda")
        session = og.Model(config)
    else:
        session = model_or_path

    params = og.GeneratorParams(session)
    params.set_search_options(
        max_length=max_new_tokens + input_ids.shape[1], batch_size=input_ids.shape[0]
    )
    generator = og.Generator(session, params)

    # First call: prefill
    cats = []
    generator.append_tokens(input_ids)
    while not generator.is_done():
        generator.generate_next_token()
        new_token = generator.get_next_tokens()[0]
        cats.append(int(new_token))
    input_ids = torch.cat([input_ids, torch.tensor([cats], dtype=torch.int64)], dim=-1)
    if return_session:
        return input_ids, session
    return input_ids
_mapping_types = {
    "float": "F",
    "double": "D",
    "float16": "H",
    "uint8": "U8",
    "uint16": "U16",
    "uint32": "U32",
    "uint64": "U64",
    "int8": "I8",
    "int16": "I16",
    "int32": "I32",
    "int64": "I64",
}


def _process_shape(shape_df):
    if isinstance(shape_df, float) or len(shape_df) == 0:
        return ""
    values = []
    for val in shape_df:
        if len(val) != 1:
            raise ValueError(f"Unable to process shape {val!r} from {values!r}.")
        for _k, _v in val.items():
            k, v = _k, _v
            break
        if v:
            vs = "x".join(map(str, v))
            values.append(f"{_mapping_types.get(k, k)}[{vs}]")
        else:
            values.append(f"{_mapping_types.get(k, k)}")
    return "+".join(values)
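

# Illustrative sketch: _process_shape encodes the ``args_input_type_shape`` column
# produced by onnxruntime profiling into a compact string, e.g.
#
#   _process_shape([{"float": [1, 128, 64]}, {"int64": []}])
#   # -> "F[1x128x64]+I64"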
def post_process_df_profile(
    df: "pandas.DataFrame",  # noqa: F821
    first_it_out: bool = False,
    agg: bool = False,
    agg_op_name: bool = True,
    with_shape: bool = False,
) -> "pandas.DataFrame":  # noqa: F821
    """
    Post-processes a dataframe obtained after profiling onnxruntime.
    It adds a column with a more explicit event name and a column
    with the iteration number.

    :param df: dataframe to process
    :param agg: aggregate the result
    :param first_it_out: leave the first iteration out of the aggregation
    :param agg_op_name: aggregate on operator name or operator index
    :param with_shape: keep the shape to aggregate
    :return: DataFrame
    """
    events = {"kernel_time", "fence_after", "fence_before"}

    def sep_event(s):
        for e in events:
            if s.endswith(e):
                return e
        return s

    df = df.copy()
    df["event_name"] = df["name"].apply(sep_event)
    df["iteration"] = -1
    current = -1
    for i in range(df.shape[0]):
        if df.loc[i, "name"] == "SequentialExecutor::Execute":
            current += 1
        df.loc[i, "iteration"] = current

    if not agg:
        if with_shape:
            df["args_input_type_shape"] = df["args_input_type_shape"].apply(_process_shape)
            df["args_output_type_shape"] = df["args_output_type_shape"].apply(_process_shape)
        else:
            df = df.drop(["args_input_type_shape", "args_output_type_shape"], axis=1)
        if first_it_out:
            df["it==0"] = (df["iteration"] <= 0).astype(int)
        return df

    agg_cols = ["cat", "args_node_index", "args_op_name", "args_provider", "event_name"]
    if with_shape:
        agg_cols.append("args_input_type_shape")
        df["args_input_type_shape"] = df["args_input_type_shape"].apply(_process_shape)
        df["args_output_type_shape"] = df["args_output_type_shape"].apply(_process_shape)
    else:
        df = df.drop(["args_input_type_shape", "args_output_type_shape"], axis=1)
    if first_it_out:
        df["it==0"] = (df["iteration"] <= 0).astype(int)
        agg_cols.insert(0, "it==0")
    if agg_op_name:
        del agg_cols[agg_cols.index("args_node_index")]
    for c in agg_cols:
        df[c] = df[c].fillna("")
    df["dur"] = df["dur"].fillna(0)
    agg = df[[*agg_cols, "dur"]].groupby(agg_cols).sum()
    return agg
def js_profile_to_dataframe(
    filename: str,
    as_df: bool = True,
    first_it_out: bool = False,
    agg: bool = False,
    agg_op_name: bool = False,
    with_shape: bool = False,
) -> Union[List, "pandas.DataFrame"]:  # noqa: F821
    """
    Reads the profiling (json format) produced by onnxruntime
    while executing an onnx graph.

    :param filename: filename holding the profiling stored in json format
    :param as_df: if True, returns the result as a DataFrame, otherwise as a list of rows
    :param first_it_out: if aggregated, leaves the first iteration out
    :param agg: aggregate by event
    :param agg_op_name: aggregate on operator name or operator index
    :param with_shape: keep the shape before aggregating
    :return: DataFrame or list of dictionaries
    """
    with open(filename, "r") as f:
        content = f.read()
    js = json.loads(content)

    suffixes = ["_kernel_time", "_fence_before", "_fence_after"]
    rows = []
    for row in js:
        if "args" in row and isinstance(row["args"], dict):
            for k, v in row["args"].items():
                row[f"args_{k}"] = v
            del row["args"]
        name = row["name"]
        for suf in suffixes:
            if name.endswith(suf):
                changed = name[: -len(suf)]
                row["op_name"] = changed
                break
        rows.append(row)
    if as_df:
        import pandas

        return post_process_df_profile(
            pandas.DataFrame(rows),
            first_it_out=first_it_out,
            agg=agg,
            agg_op_name=agg_op_name,
            with_shape=with_shape,
        )
    return rows
def _preprocess_graph1(df):
    df = df.copy()
    df["args_provider"] = df["args_provider"].apply(
        lambda s: s.replace("ExecutionProvider", "") if isinstance(s, str) else s
    )
    agg_cols = ["dur", "args_op_name", "args_provider"]
    for c in ["it==0", "args_input_type_shape"]:
        if c in df.columns:
            agg_cols.append(c)
    if "it==0" in df.columns:
        vs = ["t>=1", "t=0"]
        df["it==0"] = df["it==0"].apply(lambda v: vs[v])
    gr_dur = df[agg_cols].groupby(agg_cols[1:]).sum().sort_values("dur")
    gr_n = df[agg_cols].groupby(agg_cols[1:]).count()
    gr_n = gr_n.loc[gr_dur.index, :]
    gr_n.columns = ["count"]
    gr = gr_dur.merge(gr_n, left_index=True, right_index=True, how="outer")
    gr["ratio"] = gr["dur"] / gr["dur"].sum()
    return gr_dur, gr_n, gr


def _preprocess_graph2(df):
    df = df.reset_index(drop=False).copy()
    df["args_node_index"] = df["args_node_index"].apply(
        lambda i: int(i) if i not in {None, ""} else -1
    )
    df["args_provider"] = df["args_provider"].apply(
        lambda s: s.replace("ExecutionProvider", "") if isinstance(s, str) else s
    )
    df = df[(df["cat"] == "Node") & (df["event_name"] == "kernel_time")]
    agg_cols = ["dur", "args_node_index", "args_op_name", "args_provider"]
    for c in ["it==0", "args_input_type_shape"]:
        if c in df.columns:
            agg_cols.append(c)
    if "it==0" in df.columns:
        vs = ["t>=1", "t=0"]
        df["it==0"] = df["it==0"].apply(lambda v: vs[v])
    df = df[agg_cols].groupby(agg_cols[1:]).sum()
    df = df.sort_index(ascending=False)
    df["ratio"] = df["dur"] / df["dur"].sum()
    return df
def plot_ort_profile(
    df: "pandas.DataFrame",  # noqa: F821
    ax0: Optional["matplotlib.axes.Axes"] = None,  # noqa: F821
    ax1: Optional["matplotlib.axes.Axes"] = None,  # noqa: F821
    title: Optional[str] = None,
) -> "matplotlib.axes.Axes":  # noqa: F821
    """
    Plots the time spent in computation based on a dataframe
    produced by function :func:`js_profile_to_dataframe`.

    :param df: dataframe
    :param ax0: first axis to draw time
    :param ax1: second axis to draw occurrences
    :param title: graph title
    :return: the graph

    .. plot::
        :include-source:

        import numpy as np
        from onnx import TensorProto
        import onnx.helper as oh
        from onnx.checker import check_model
        from onnx.numpy_helper import from_array
        import matplotlib.pyplot as plt
        from onnxruntime import InferenceSession, SessionOptions
        from onnx_diagnostic.helpers.rt_helper import js_profile_to_dataframe, plot_ort_profile


        def get_model():
            model_def0 = oh.make_model(
                oh.make_graph(
                    [
                        oh.make_node("Add", ["X", "init1"], ["X1"]),
                        oh.make_node("Abs", ["X"], ["X2"]),
                        oh.make_node("Add", ["X", "init3"], ["inter"]),
                        oh.make_node("Mul", ["X1", "inter"], ["Xm"]),
                        oh.make_node("Sub", ["X2", "Xm"], ["final"]),
                    ],
                    "test",
                    [oh.make_tensor_value_info("X", TensorProto.FLOAT, [None])],
                    [oh.make_tensor_value_info("final", TensorProto.FLOAT, [None])],
                    [
                        from_array(np.array([1], dtype=np.float32), name="init1"),
                        from_array(np.array([3], dtype=np.float32), name="init3"),
                    ],
                ),
                opset_imports=[oh.make_opsetid("", 18)],
                ir_version=9,
            )
            check_model(model_def0)
            return model_def0


        sess_options = SessionOptions()
        sess_options.enable_profiling = True
        sess = InferenceSession(
            get_model().SerializeToString(),
            sess_options,
            providers=["CPUExecutionProvider"],
        )
        for _ in range(11):
            sess.run(None, dict(X=np.arange(10).astype(np.float32)))
        prof = sess.end_profiling()

        df = js_profile_to_dataframe(prof, first_it_out=True)
        print(df.head())

        fig, ax = plt.subplots(1, 2, figsize=(10, 5))
        plot_ort_profile(df, ax[0], ax[1], "test_title")
        fig.tight_layout()

    With ``agg=True``:

    .. plot::
        :include-source:

        import numpy as np
        from onnx import TensorProto
        import onnx.helper as oh
        from onnx.checker import check_model
        from onnx.numpy_helper import from_array
        import matplotlib.pyplot as plt
        from onnxruntime import InferenceSession, SessionOptions
        from onnx_diagnostic.helpers.rt_helper import js_profile_to_dataframe, plot_ort_profile


        def get_model():
            model_def0 = oh.make_model(
                oh.make_graph(
                    [
                        oh.make_node("Add", ["X", "init1"], ["X1"]),
                        oh.make_node("Abs", ["X"], ["X2"]),
                        oh.make_node("Add", ["X", "init3"], ["inter"]),
                        oh.make_node("Mul", ["X1", "inter"], ["Xm"]),
                        oh.make_node("Sub", ["X2", "Xm"], ["final"]),
                    ],
                    "test",
                    [oh.make_tensor_value_info("X", TensorProto.FLOAT, [None])],
                    [oh.make_tensor_value_info("final", TensorProto.FLOAT, [None])],
                    [
                        from_array(np.array([1], dtype=np.float32), name="init1"),
                        from_array(np.array([3], dtype=np.float32), name="init3"),
                    ],
                ),
                opset_imports=[oh.make_opsetid("", 18)],
                ir_version=9,
            )
            check_model(model_def0)
            return model_def0


        sess_options = SessionOptions()
        sess_options.enable_profiling = True
        sess = InferenceSession(
            get_model().SerializeToString(),
            sess_options,
            providers=["CPUExecutionProvider"],
        )
        for _ in range(11):
            sess.run(None, dict(X=np.arange(10).astype(np.float32)))
        prof = sess.end_profiling()

        df = js_profile_to_dataframe(prof, first_it_out=True, agg=True)
        print(df.head())

        fig, ax = plt.subplots(1, 2, figsize=(10, 5))
        plot_ort_profile(df, ax[0], ax[1], "test_title")
        fig.tight_layout()
    """
    fontsize = 10
    if ax0 is None:
        import matplotlib.pyplot as plt

        ax0 = plt.gca()

    if "args_provider" in df.columns:
        # Aggregation by operator
        gr_dur, gr_n, _ = _preprocess_graph1(df)
        gr_dur.plot.barh(ax=ax0)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            ax0.set_xticklabels(ax0.get_xticklabels(), fontsize=fontsize)
            ax0.get_yaxis().set_label_text("")
            ax0.set_yticklabels(
                ax0.get_yticklabels(), rotation=45, ha="right", fontsize=fontsize
            )
        if title is not None:
            ax0.set_title(title)
        if ax1 is not None:
            gr_n.plot.barh(ax=ax1)
            ax1.set_title("n occurrences")
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                ax1.set_xticklabels(ax1.get_xticklabels(), fontsize=fontsize)
                ax1.get_yaxis().set_label_text("")
                ax1.set_yticklabels(
                    ax1.get_yticklabels(), rotation=45, ha="right", fontsize=fontsize
                )
        return ax0

    df = _preprocess_graph2(df)
    df[["dur"]].plot.barh(ax=ax0)
    if title is not None:
        ax0.set_title(title)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        ax0.set_xticklabels(ax0.get_xticklabels(), fontsize=fontsize)
        ax0.get_yaxis().set_label_text("")
        ax0.set_yticklabels(ax0.get_yticklabels(), fontsize=fontsize)
    return ax0
def plot_ort_profile_timeline(
    df: "pandas.DataFrame",  # noqa: F821
    ax: Optional["matplotlib.axes.Axes"] = None,  # noqa: F821
    iteration: int = -2,
    title: Optional[str] = None,
    quantile: float = 0.5,
    fontsize: int = 12,
) -> "matplotlib.axes.Axes":  # noqa: F821
    """
    Creates a timeline based on a dataframe
    produced by function :func:`js_profile_to_dataframe`.

    :param df: dataframe
    :param ax: first axis to draw time
    :param iteration: iteration to plot, negative value to start from the end
    :param title: graph title
    :param quantile: operators whose duration falls below this quantile
        are drawn in a different color
    :param fontsize: font size
    :return: the graph

    .. plot::
        :include-source:

        import numpy as np
        from onnx import TensorProto
        import onnx.helper as oh
        from onnx.checker import check_model
        from onnx.numpy_helper import from_array
        import matplotlib.pyplot as plt
        from onnxruntime import InferenceSession, SessionOptions
        from onnx_diagnostic.helpers.rt_helper import (
            js_profile_to_dataframe,
            plot_ort_profile_timeline,
        )


        def get_model():
            model_def0 = oh.make_model(
                oh.make_graph(
                    [
                        oh.make_node("Add", ["X", "init1"], ["X1"]),
                        oh.make_node("Abs", ["X"], ["X2"]),
                        oh.make_node("Add", ["X", "init3"], ["inter"]),
                        oh.make_node("Mul", ["X1", "inter"], ["Xm"]),
                        oh.make_node("Sub", ["X2", "Xm"], ["final"]),
                    ],
                    "test",
                    [oh.make_tensor_value_info("X", TensorProto.FLOAT, [None])],
                    [oh.make_tensor_value_info("final", TensorProto.FLOAT, [None])],
                    [
                        from_array(np.array([1], dtype=np.float32), name="init1"),
                        from_array(np.array([3], dtype=np.float32), name="init3"),
                    ],
                ),
                opset_imports=[oh.make_opsetid("", 18)],
                ir_version=9,
            )
            check_model(model_def0)
            return model_def0


        sess_options = SessionOptions()
        sess_options.enable_profiling = True
        sess = InferenceSession(
            get_model().SerializeToString(),
            sess_options,
            providers=["CPUExecutionProvider"],
        )
        for _ in range(11):
            sess.run(None, dict(X=np.arange(10).astype(np.float32)))
        prof = sess.end_profiling()

        df = js_profile_to_dataframe(prof, first_it_out=True)
        print(df.head())

        fig, ax = plt.subplots(1, 1, figsize=(10, 5))
        plot_ort_profile_timeline(df, ax, title="test_timeline", quantile=0.5)
        fig.tight_layout()
    """
    if ax is None:
        import matplotlib.pyplot as plt

        ax = plt.gca()

    df = df.copy()
    df["iteration"] = df["iteration"].astype(int)
    iterations = set(df["iteration"])
    n_iter = iteration if iteration >= 0 else max(iterations) + 1 + iteration
    dfi = df[df["iteration"] == n_iter]
    assert dfi.shape[0] > 0, f"Iteration {iteration} cannot be found in {iterations}."

    if "fence_before" in set(dfi["event_name"]):
        started = {}
        data = []
        for irow in dfi.iterrows():
            assert isinstance(irow, tuple), f"pandas has changed its api, type is {type(irow)}"
            assert len(irow) == 2, f"pandas has changed its api, row is {irow}"
            row = irow[1]
            it = row["iteration"]
            op_type = row["args_op_name"]
            op_name = row["op_name"]
            event_name = row["event_name"]
            provider = row["args_provider"]
            ts = float(row["ts"])
            dur = float(row["dur"])
            if event_name == "fence_before":
                started[op_type, op_name, it] = dict(
                    op_name=op_name, op_type=op_type, begin=ts
                )
            elif event_name == "kernel_time":
                obs = started[op_type, op_name, it]
                obs["duration"] = dur
                obs["begin_kernel"] = ts
                obs["provider"] = provider
            elif event_name == "fence_after":
                obs = started[op_type, op_name, it]
                obs["end"] = ts
                data.append(obs)
                del started[op_type, op_name, it]
            else:
                assert event_name in {
                    "SequentialExecutor::Execute",
                    "model_run",
                }, f"Unexpected event_name={event_name!r}, row={row}"
    else:
        # New format
        data = []
        for irow in dfi.iterrows():
            row = irow[1]
            if row["event_name"] != "kernel_time":
                continue
            obs = dict(
                duration=float(row["dur"]),
                op_name=row["op_name"],
                op_type=row["args_op_name"],
                provider=row["args_provider"],
                begin=float(row["ts"]),
                end=float(row["ts"]) + float(row["dur"]),
                begin_kernel=float(row["ts"]),
            )
            data.append(obs)

    # durations
    data_dur = list(sorted(d["duration"] for d in data))
    threshold = data_dur[int(quantile * len(data_dur))]

    origin = dfi["ts"].min()
    colors = ["blue", "green", "red", "orange"]

    import matplotlib.patches as mpatches

    cs = [0, 0]
    for i, obs in enumerate(data):
        dur = obs["duration"]
        cat = int(dur >= threshold)

        # color
        color = colors[cat * 2 + cs[cat] % 2]
        cs[cat] += 1

        # rectangle
        t1 = obs["begin"] - origin
        t2 = obs["end"] - origin
        shape = mpatches.Rectangle((0, t1), 1, t2 - t1, ec="none", color=color)
        ax.add_artist(shape)
        tk1 = obs["begin_kernel"] - origin
        tk2 = (obs["begin_kernel"] + obs["duration"]) - origin
        ax.plot([0, 1], [tk1, tk1], "b--")
        ax.plot([0, 1], [tk2, tk2], "b--")
        if i == 0:
            ax.plot([0, 2], [tk1, tk1], "b")
        elif i == len(data) - 1:
            ax.plot([0, 2], [tk2, tk2], "b")

        # text
        y = (tk1 + tk2) / 2
        text = obs["op_type"]
        prov = obs["provider"].replace("ExecutionProvider", "")
        name = obs["op_name"]
        if len(name) >= 10:
            name = name[:5] + "..." + name[5:]
        ax.text(1, y, f"{i}:{prov}:{text}-{name}", fontsize=fontsize, va="center")
    ax.invert_yaxis()
    return ax