Source code for onnx_extended.tools.js_profile

import json
import warnings
from typing import List, Optional, Union
from pandas import DataFrame


_mapping_types = {
    "float": "F",
    "double": "D",
    "float16": "H",
    "uint8": "U8",
    "uint16": "U16",
    "uint32": "U32",
    "uint64": "U64",
    "int8": "I8",
    "int16": "I16",
    "int32": "I32",
    "int64": "I64",
}


def _process_shape(shape_df):
    if isinstance(shape_df, float) or len(shape_df) == 0:
        return ""
    values = []
    for val in shape_df:
        if len(val) != 1:
            raise ValueError(f"Unable to process shape {val!r} from {values!r}.")
        k, v = list(val.items())[0]
        if v:
            vs = "x".join(map(str, v))
            values.append(f"{_mapping_types.get(k,k)}[{vs}]")
        else:
            values.append(f"{_mapping_types.get(k,k)}")
    return "+".join(values)


def post_process_df_profile(
    df: DataFrame,
    first_it_out: bool = False,
    agg: bool = False,
    agg_op_name: bool = True,
    with_shape: bool = False,
) -> DataFrame:
    """
    Post-processed a dataframe obtained after profiling onnxruntime.
    It adds a column for a more explicit event name and adds
    a column for the iteration number

    :param agg: aggregate the result
    :param first_it_out: leave the first iteration
        out of the aggregation
    :param agg_op_name: aggregate on operator name or operator index
    :param with_shape: keep the shape to aggregate
    :return: DataFrame
    """
    events = {"kernel_time", "fence_after", "fence_before"}

    def sep_event(s):
        for e in events:
            if s.endswith(e):
                return e
        return s

    df = df.copy()
    df["event_name"] = df["name"].apply(sep_event)
    df["iteration"] = -1
    current = -1
    for i in range(df.shape[0]):
        if df.loc[i, "name"] == "SequentialExecutor::Execute":
            current += 1
        df.loc[i, "iteration"] = current

    if not agg:
        if with_shape:
            df["args_input_type_shape"] = df["args_input_type_shape"].apply(
                _process_shape
            )
            df["args_output_type_shape"] = df["args_output_type_shape"].apply(
                _process_shape
            )
        else:
            df = df.drop(["args_input_type_shape", "args_output_type_shape"], axis=1)
        if first_it_out:
            df["it==0"] = (df["iteration"] <= 0).astype(int)
        return df

    agg_cols = ["cat", "args_node_index", "args_op_name", "args_provider", "event_name"]
    if with_shape:
        agg_cols.append("args_input_type_shape")
        df["args_input_type_shape"] = df["args_input_type_shape"].apply(_process_shape)
        df["args_output_type_shape"] = df["args_output_type_shape"].apply(
            _process_shape
        )
    else:
        df = df.drop(["args_input_type_shape", "args_output_type_shape"], axis=1)

    if first_it_out:
        df["it==0"] = (df["iteration"] <= 0).astype(int)
        agg_cols.insert(0, "it==0")
    if agg_op_name:
        del agg_cols[agg_cols.index("args_node_index")]
    for c in agg_cols:
        df[c] = df[c].fillna("")
    df["dur"] = df["dur"].fillna(0)
    agg = df[agg_cols + ["dur"]].groupby(agg_cols).sum()
    return agg


[docs] def js_profile_to_dataframe( filename: str, as_df: bool = True, first_it_out: bool = False, agg: bool = False, agg_op_name: bool = False, with_shape: bool = False, ) -> Union[List, DataFrame]: """ Profiles the execution of an onnx graph with onnxruntime. :param filename: filename holding the profiling stored in json format :param as_df: returns the :param first_it_out: if aggregated, leaves the first iteration out :param agg: aggregate by event :param agg_op_name: aggregate on operator name or operator index :param with_shape: keep the shape before aggregating :return: DataFrame or dictionary """ with open(filename, "r") as f: content = f.read() js = json.loads(content) suffixes = ["_kernel_time", "_fence_before", "_fence_after"] rows = [] for row in js: if "args" in row and isinstance(row["args"], dict): for k, v in row["args"].items(): row[f"args_{k}"] = v del row["args"] name = row["name"] for suf in suffixes: if name.endswith(suf): changed = name[: -len(suf)] row["op_name"] = changed break rows.append(row) if as_df: return post_process_df_profile( DataFrame(rows), first_it_out=first_it_out, agg=agg, agg_op_name=agg_op_name, with_shape=with_shape, ) return rows
def _preprocess_graph1(df): df = df.copy() df["args_provider"] = df["args_provider"].apply( lambda s: s.replace("ExecutionProvider", "") if isinstance(s, str) else s ) agg_cols = ["dur", "args_op_name", "args_provider"] for c in ["it==0", "args_input_type_shape"]: if c in df.columns: agg_cols.append(c) if "it==0" in df.columns: vs = ["t>=1", "t=0"] df["it==0"] = df["it==0"].apply(lambda v: vs[v]) gr_dur = df[agg_cols].groupby(agg_cols[1:]).sum().sort_values("dur") gr_n = df[agg_cols].groupby(agg_cols[1:]).count() gr_n = gr_n.loc[gr_dur.index, :] gr_n.columns = ["count"] gr = gr_dur.merge(gr_n, left_index=True, right_index=True, how="outer") gr["ratio"] = gr["dur"] / gr["dur"].sum() return gr_dur, gr_n, gr def _preprocess_graph2(df): df = df.reset_index(drop=False).copy() df["args_node_index"] = df["args_node_index"].apply( lambda i: int(i) if i not in {None, ""} else -1 ) df["args_provider"] = df["args_provider"].apply( lambda s: s.replace("ExecutionProvider", "") if isinstance(s, str) else s ) df = df[(df["cat"] == "Node") & (df["event_name"] == "kernel_time")] agg_cols = ["dur", "args_node_index", "args_op_name", "args_provider"] for c in ["it==0", "args_input_type_shape"]: if c in df.columns: agg_cols.append(c) if "it==0" in df.columns: vs = ["t>=1", "t=0"] df["it==0"] = df["it==0"].apply(lambda v: vs[v]) df = df[agg_cols].groupby(agg_cols[1:]).sum() df = df.sort_index(ascending=False) df["ratio"] = df["dur"] / df["dur"].sum() return df
[docs] def plot_ort_profile( df: DataFrame, ax0: Optional["matplotlib.axes.Axes"] = None, ax1: Optional["matplotlib.axes.Axes"] = None, title: Optional[str] = None, ) -> "matplotlib.axes.Axes": """ Plots time spend in computation based on a dataframe produced by function :func:`js_profile_to_dataframe`. :param df: dataframe :param ax0: first axis to draw time :param ax1: second axis to draw occurences :param title: graph title :return: the graph """ fontsize = 10 if ax0 is None: import matplotlib.pyplot as plt ax0 = plt.gca() if "args_provider" in df.columns: # Aggregation by operator gr_dur, gr_n, _ = _preprocess_graph1(df) gr_dur.plot.barh(ax=ax0) with warnings.catch_warnings(): warnings.simplefilter("ignore") ax0.set_xticklabels(ax0.get_xticklabels(), fontsize=fontsize) ax0.get_yaxis().set_label_text("") ax0.set_yticklabels( ax0.get_yticklabels(), rotation=45, ha="right", fontsize=fontsize ) if title is not None: ax0.set_title(title) if ax1 is not None: gr_n.plot.barh(ax=ax1) ax1.set_title("n occurences") with warnings.catch_warnings(): warnings.simplefilter("ignore") ax1.set_xticklabels(ax1.get_xticklabels(), fontsize=fontsize) ax1.get_yaxis().set_label_text("") ax1.set_yticklabels( ax1.get_yticklabels(), rotation=45, ha="right", fontsize=fontsize ) return ax0 df = _preprocess_graph2(df) df[["dur"]].plot.barh(ax=ax0) if title is not None: ax0.set_title(title) with warnings.catch_warnings(): warnings.simplefilter("ignore") ax0.set_xticklabels(ax0.get_xticklabels(), fontsize=fontsize) ax0.get_yaxis().set_label_text("") ax0.set_yticklabels(ax0.get_yticklabels(), fontsize=fontsize) return ax0
[docs] def plot_ort_profile_timeline( df: DataFrame, ax: Optional["matplotlib.axes.Axes"] = None, iteration: int = -2, title: Optional[str] = None, quantile: float = 0.5, fontsize: int = 12, ) -> "matplotlib.axes.Axes": """ Creates a timeline based on a dataframe produced by function :func:`js_profile_to_dataframe`. :param df: dataframe :param ax: first axis to draw time :param iteration: iteration to plot, negative value to start from the end :param title: graph title :param quantile: draw the 10% less consuming operators in a different color :param fontsize: font size :return: the graph """ if ax is None: import matplotlib.pyplot as plt ax = plt.gca() df = df.copy() df["iteration"] = df["iteration"].astype(int) iterations = set(df["iteration"]) n_iter = iteration if iteration >= 0 else max(iterations) + 1 + iteration dfi = df[df["iteration"] == n_iter] assert dfi.shape[0] > 0, f"Iteration {iteration} cannot be found in {iterations}." started = {} data = [] for irow in dfi.iterrows(): assert isinstance( irow, tuple ), f"pandas has changed its api, type is {type(row)}" assert len(irow) == 2, f"pandas has changed its api, row is {row}" row = irow[1] it = row["iteration"] op_type = row["args_op_name"] op_name = row["op_name"] event_name = row["event_name"] provider = row["args_provider"] ts = float(row["ts"]) dur = float(row["dur"]) if event_name == "fence_before": started[op_type, op_name, it] = dict( op_name=op_name, op_type=op_type, begin=ts ) elif event_name == "kernel_time": obs = started[op_type, op_name, it] obs["duration"] = dur obs["begin_kernel"] = ts obs["provider"] = provider elif event_name == "fence_after": obs = started[op_type, op_name, it] obs["end"] = ts data.append(obs) del started[op_type, op_name, it] else: assert event_name in { "SequentialExecutor::Execute", "model_run", }, f"Unexpected event_name={event_name!r}, row={row}" # durations data_dur = list(sorted(d["duration"] for d in data)) threshold = data_dur[int(quantile * len(data_dur))] origin = dfi["ts"].min() colors = ["blue", "green", "red", "orange"] import matplotlib.patches as mpatches cs = [0, 0] for i, obs in enumerate(data): dur = obs["duration"] cat = int(dur >= threshold) # color color = colors[cat * 2 + cs[cat] % 2] cs[cat] += 1 # rectangle t1 = obs["begin"] - origin t2 = obs["end"] - origin shape = mpatches.Rectangle((0, t1), 1, t2 - t1, ec="none", color=color) ax.add_artist(shape) tk1 = obs["begin_kernel"] - origin tk2 = (obs["begin_kernel"] + obs["duration"]) - origin ax.plot([0, 1], [tk1, tk1], "b--") ax.plot([0, 1], [tk2, tk2], "b--") if i == 0: ax.plot([0, 2], [tk1, tk1], "b") elif i == len(data) - 1: ax.plot([0, 2], [tk2, tk2], "b") # text y = (tk1 + tk2) / 2 text = obs["op_type"] prov = obs["provider"].replace("ExecutionProvider", "") name = obs["op_name"] if len(name) >= 10: name = name[:5] + "..." + name[5:] ax.text(1, y, f"{i}:{prov}:{text}-{name}", fontsize=fontsize, va="center") ax.invert_yaxis() return ax