Source code for onnx_array_api.ort.ort_profile

import json
import os
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy
from onnx import ModelProto
from pandas import DataFrame


def post_process_df_profile(
    df: DataFrame,
    first_it_out: bool = False,
    agg: bool = False,
    agg_op_name: bool = True,
) -> DataFrame:
    """
    Post-processes a dataframe obtained after profiling onnxruntime.
    It adds a column `event_name` holding a more explicit event name and
    a column `iteration` holding the iteration number. The input
    dataframe is not modified.

    :param df: profiling events, one row per event, it must contain a
        column `name` and, when *agg* is True, the columns `cat`,
        `args_op_name`, `args_provider`, `dur` (and `args_node_index`
        when *agg_op_name* is False)
    :param first_it_out: if aggregated, adds the grouping key `it==0`
        which is 1 for the events of the first iteration (and the events
        recorded before it) and 0 for the others, so that the first
        iteration is kept apart from the other ones
    :param agg: aggregate the result, the durations are summed
    :param agg_op_name: if True, aggregates on the operator name,
        if False, the node index is kept among the grouping keys
    :return: DataFrame, the annotated events if *agg* is False,
        the summed durations indexed by the grouping keys otherwise
    """
    events = ("kernel_time", "fence_after", "fence_before")

    def sep_event(s):
        for e in events:
            if s.endswith(e):
                return e
        return s

    df = df.copy()
    df["event_name"] = df["name"].apply(sep_event)
    # Every "SequentialExecutor::Execute" event opens a new iteration.
    # Events seen before the first one get -1. A cumulative sum is used
    # instead of a loop over `df.loc[i, ...]` so that it does not depend
    # on the index being 0..n-1.
    df["iteration"] = (df["name"] == "SequentialExecutor::Execute").cumsum() - 1

    if not agg:
        return df

    agg_cols = ["cat", "args_node_index", "args_op_name", "args_provider", "event_name"]
    if first_it_out:
        df["it==0"] = (df["iteration"] <= 0).astype(int)
        agg_cols.insert(0, "it==0")
    if agg_op_name:
        agg_cols.remove("args_node_index")
    # groupby drops the rows with missing keys, they are replaced
    # by an empty string to keep them in the aggregation.
    for c in agg_cols:
        df[c] = df[c].fillna("")
    df["dur"] = df["dur"].fillna(0)
    return df[agg_cols + ["dur"]].groupby(agg_cols).sum()


def ort_profile(
    filename_or_bytes: Union[str, bytes, ModelProto],
    feeds: Dict[str, numpy.ndarray],
    sess_options: Optional[Any] = None,
    disable_optimization: bool = False,
    repeat: int = 10,
    as_df: bool = True,
    providers: Optional[List[str]] = None,
    first_it_out: bool = False,
    agg: bool = False,
    agg_op_name: bool = False,
    **kwargs,
) -> Union[List, DataFrame]:
    """
    Profiles the execution of an onnx graph with onnxruntime.

    :param filename_or_bytes: filename, serialized model or
        :class:`onnx.ModelProto` (serialized before being loaded)
    :param feeds: inputs, dictionary of numpy arrays, if the first value
        is not a numpy array, the inputs are given to
        `run_with_ort_values` instead of `run`
    :param sess_options: instance of :class:`onnxruntime.SessionOptions`,
        profiling is enabled on this instance
    :param disable_optimization: disable onnxruntime optimization
    :param repeat: number of times to run the inference
    :param as_df: returns the events as a dataframe processed by
        :func:`post_process_df_profile`, otherwise as a list of
        dictionaries
    :param providers: list of providers to use when initializing the
        inference session, if None, the default value is
        `["CPUExecutionProvider"]`
    :param first_it_out: if aggregated, leaves the first iteration out
    :param agg: aggregate by event
    :param agg_op_name: aggregate on operator name or operator index
    :param kwargs: additional parameters when initializing the
        inference session
    :return: DataFrame or list of dictionaries (one per event)
    """
    from onnxruntime import GraphOptimizationLevel, InferenceSession, SessionOptions

    if sess_options is None:
        sess_options = SessionOptions()
    if disable_optimization:
        sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL
    sess_options.enable_profiling = True
    obj = (
        filename_or_bytes.SerializeToString()
        if isinstance(filename_or_bytes, ModelProto)
        else filename_or_bytes
    )
    if providers is None:
        providers = ["CPUExecutionProvider"]
    sess = InferenceSession(obj, sess_options, providers=providers, **kwargs)

    # A model may have no input: `feeds` is then empty and the regular
    # `run` API is used.
    first = next(iter(feeds.values()), None)
    if first is None or isinstance(first, numpy.ndarray):
        for _ in range(repeat):
            sess.run(None, feeds)
    else:
        out_names = [o.name for o in sess.get_outputs()]
        for _ in range(repeat):
            sess._sess.run_with_ort_values(feeds, out_names, None)

    prof = sess.end_profiling()
    try:
        with open(prof, "r", encoding="utf-8") as f:
            js = json.load(f)
    finally:
        # The profile file is removed even if it cannot be read or parsed.
        os.remove(prof)

    suffixes = ("_kernel_time", "_fence_before", "_fence_after")
    rows = []
    for row in js:
        # Flattens the nested dictionary "args" into columns "args_<key>".
        if "args" in row and isinstance(row["args"], dict):
            for k, v in row["args"].items():
                row[f"args_{k}"] = v
            del row["args"]
        # The part of the event name before a known suffix is stored
        # in "op_name".
        name = row["name"]
        for suf in suffixes:
            if name.endswith(suf):
                row["op_name"] = name[: -len(suf)]
                break
        rows.append(row)

    if as_df:
        return post_process_df_profile(
            DataFrame(rows), first_it_out=first_it_out, agg=agg, agg_op_name=agg_op_name
        )
    return rows
def _merge_ort_profile_preprocess(df):
    """
    Aggregates the kernel durations of a profile before a merge.

    Only the events with `cat == "Node"` and a name containing
    `kernel_time` are kept. They are grouped by operator name,
    output and input types and shapes, provider and occurrence index
    `idx` (see `_idx`).

    :param df: profile, it must contain the columns `cat`, `name`,
        `dur`, `args_op_name`, `args_output_type_shape`,
        `args_input_type_shape`, `args_provider`
    :return: DataFrame indexed by the grouping keys with the columns
        `dur` (summed durations) and `count` (number of events)
    """
    base_keys = [
        "args_op_name",
        "args_output_type_shape",
        "args_input_type_shape",
        "args_provider",
    ]
    occurrences = {}

    def _idx(row):
        """
        There may be multiple nodes with the same input/output types and
        shapes. This function gives every instance a distinct id.
        The first op with a given signature receives the index 0.
        The counter restarts every time an event of category
        `Session` is met.
        """
        if row["cat"] == "Session":
            occurrences.clear()
            return -1
        key = tuple(str(row[k]) for k in base_keys)
        occurrences[key] = occurrences.get(key, -1) + 1
        return occurrences[key]

    df = df.copy()
    df["idx"] = df.apply(_idx, axis=1)
    kept = (df["cat"] == "Node") & df["name"].str.contains("kernel_time", na=False)
    # The copy is taken before any assignment so that columns are not
    # written into a slice of the previous dataframe.
    df = df[kept].copy()
    # Shapes are stored as lists of dictionaries which are not hashable,
    # the grouping keys are converted into strings.
    for c in base_keys:
        df[c] = df[c].apply(str)
    df["count"] = 1
    groupkey = base_keys + ["idx"]
    return df[groupkey + ["dur", "count"]].groupby(groupkey).sum()


def _process_shape(s: str, keys: Dict[str, str]) -> str:
    """
    Converts the string representation of a list of typed shapes
    into a short string.

    :param s: string such as `"[{'float': [1, 2]}]"`, every item is a
        dictionary with one entry `{element type: shape}`
    :param keys: short names for the element types, a type which is not
        in this dictionary is left unchanged
    :return: items `<type>-<d1>x<d2>...` joined with a comma
    :raises NotImplementedError: if an item does not contain exactly
        one entry
    """
    # NOTE(review): eval executes any expression found in the string.
    # The value is expected to be the output of `str` applied on a column
    # of the profile; ast.literal_eval would be safer if the profile may
    # come from an untrusted source.
    value = eval(s)
    ns = []
    for item in value:
        if len(item) != 1:
            raise NotImplementedError(f"Unexpected value {item} in {s!r}.")
        dtype, shape = next(iter(item.items()))
        # Unknown element types keep their full name instead of failing.
        short = keys.get(dtype, dtype)
        ns.append("-".join([short, "x".join(map(str, shape))]))
    return ",".join(ns)


def _label(row: Dict[str, Any], column: Optional[str], keys: Dict[str, str]) -> str:
    """
    Builds a label `[<side><provider>]<op>(<inputs>)-><outputs>` for a row
    of merged profiles.

    :param row: row with the entries `args_op_name`,
        `args_input_type_shape`, `args_output_type_shape`, `side`,
        `args_provider` and *column* if it is specified
    :param column: if not None, the value of this column is appended
        to the label between square brackets
    :param keys: short names for the element types,
        see `_process_shape`
    :return: the label, only the first character of `side` and the
        first three characters of the provider are used
    """
    name = row["args_op_name"]
    inshape = _process_shape(row["args_input_type_shape"], keys)
    outshape = _process_shape(row["args_output_type_shape"], keys)
    side = row["side"][0]
    prov = row["args_provider"][:3]
    add = "" if column is None else f"[{row[column]}]"
    return f"[{side}{prov}]{name}({inshape})->{outshape}{add}"
def merge_ort_profile(
    prof1: DataFrame,
    prof2: DataFrame,
    suffixes: Tuple[str, str] = ("base", "opti"),
    by_column: Optional[str] = None,
) -> Tuple[DataFrame, DataFrame]:
    """
    Merges two profiles produced by function :func:`ort_profile
    <onnx_array_api.ort.ort_profile.ort_profile>`.

    :param prof1: first profile
    :param prof2: second profile
    :param suffixes: used by pandas merge, they are appended without any
        separator to the names of the columns `dur` and `count`
        (`durbase`, `duropti`, ... with the default values)
    :param by_column: if not None, the value of this column is added to
        the label used to aggregate the merged profiles, it must be a
        column of the first returned dataframe such as `'idx'`
    :return: two dataframes, the first one is the outer join of both
        profiles aggregated by operator name, types and shapes, provider
        and occurrence index, the second one sums the durations and the
        counts of the first one by label
    """
    # First merge
    base = _merge_ort_profile_preprocess(prof1)
    opti = _merge_ort_profile_preprocess(prof2)
    merge = base.merge(
        opti, how="outer", suffixes=suffixes, left_index=True, right_index=True
    )
    merge = merge.reset_index(drop=False)

    # Second merge
    dur_first = f"dur{suffixes[0]}"
    dur_second = f"dur{suffixes[1]}"

    def classify(row):
        # "-": only in the first profile, "+": only in the second one,
        # "=": in both profiles
        if numpy.isnan(row[dur_second]):
            return "-"
        if numpy.isnan(row[dur_first]):
            return "+"
        return "="

    keys = {"float": "f"}
    df = merge.copy()
    # The columns are built from lists: DataFrame.apply(axis=1) returns
    # a dataframe instead of a series when there is no row, which cannot
    # be assigned to a single column.
    df["side"] = [classify(row) for _, row in df.iterrows()]
    df["label"] = [_label(row, by_column, keys) for _, row in df.iterrows()]

    value_cols = [
        dur_first,
        dur_second,
        f"count{suffixes[0]}",
        f"count{suffixes[1]}",
    ]
    # The string "sum" is used instead of numpy.sum, pandas deprecates
    # numpy callables in GroupBy.agg; the result is the same.
    gr = (
        df[["label"] + value_cols]
        .groupby("label")
        .agg({c: "sum" for c in value_cols})
    )
    return merge, gr