Source code for experimental_experiment.xoptim.patterns.onnx_functions

import inspect
from typing import List, Optional
from onnx import NodeProto, TensorProto
from ..patterns_api import EasyPatternOptimization


class GeluPattern(EasyPatternOptimization):
    """
    Detects the decomposed version of Gelu with Tanh

    .. math::

        y = \\frac{x}{2} \\left(1 + \\tanh\\left(\\sqrt{\\frac{2}{\\pi}}
        (x + 0.044715 * x^3)\\right)\\right)

    Model with nodes to be fused:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 20),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(
            oh.make_tensor_value_info(
                "linear_5", onnx.TensorProto.FLOAT16, shape=(4, 512, 16384)
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["init10_s1_5"],
                value=onh.from_array(np.array([3.0], dtype=np.float16), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["init10_s_8"],
                value=onh.from_array(
                    np.array(0.044708251953125, dtype=np.float16), name="value"
                ),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["init10_s_9"],
                value=onh.from_array(
                    np.array(0.7978515625, dtype=np.float16), name="value"
                ),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["init10_s_10"],
                value=onh.from_array(np.array(1.0, dtype=np.float16), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["init10_s_7"],
                value=onh.from_array(np.array(0.5, dtype=np.float16), name="value"),
            )
        )
        nodes.append(oh.make_node("Pow", ["linear_5", "init10_s1_5"], ["pow_1"]))
        nodes.append(oh.make_node("Mul", ["pow_1", "init10_s_8"], ["_onx_mul05"]))
        nodes.append(oh.make_node("Add", ["linear_5", "_onx_mul05"], ["add_4"]))
        nodes.append(oh.make_node("Mul", ["add_4", "init10_s_9"], ["_onx_mul06"]))
        nodes.append(oh.make_node("Tanh", ["_onx_mul06"], ["tanh"]))
        nodes.append(oh.make_node("Add", ["tanh", "init10_s_10"], ["add_5"]))
        nodes.append(oh.make_node("Mul", ["linear_5", "init10_s_7"], ["_onx_mul04"]))
        nodes.append(oh.make_node("Mul", ["_onx_mul04", "add_5"], ["mul_4"]))
        outputs.append(
            oh.make_tensor_value_info(
                "mul_4", onnx.TensorProto.FLOAT16, shape=(4, 512, 16384)
            )
        )
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))

    Outcome of the fusion:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 20),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(
            oh.make_tensor_value_info(
                "linear_5", onnx.TensorProto.FLOAT16, shape=(4, 512, 16384)
            )
        )
        nodes.append(oh.make_node("Gelu", ["linear_5"], ["mul_4"], approximate="tanh"))
        outputs.append(
            oh.make_tensor_value_info(
                "mul_4", onnx.TensorProto.FLOAT16, shape=(4, 512, 16384)
            )
        )
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))
    """

    def __init__(
        self, verbose: int = 0, priority: int = 0, min_opset: int = 20, domain: str = ""
    ):
        super().__init__(verbose, priority, min_opset=min_opset)
        self.domain = domain

    def match_pattern(self, g: "GraphBuilder", x, c3, c04, cpi, one, c2):  # noqa: F821
        x3 = g.op.Pow(x, c3)  # 3
        cx3 = g.op.Mul(x3, c04)  # 0.044715
        add = g.op.Add(x, cx3)
        addm = g.op.Mul(add, cpi)  # 0.7978515625 = sqrt(2/pi)
        tanh = g.op.Tanh(addm)
        tanh1 = g.op.Add(tanh, one)  # 1
        x2 = g.op.Mul(x, c2)  # 0.5
        return g.op.Mul(x2, tanh1)

    def apply_pattern(
        self,
        g: "GraphBuilder",  # noqa: F821
        x,
        c3,
        c04,
        cpi,
        one,
        c2,
    ):
        return g.op.Gelu(x, approximate="tanh", domain=self.domain)

    def validate_mapping(
        self,
        g: "GraphBuilderPatternOptimization",  # noqa: F821
        deleted_nodes: List[NodeProto],
        pattern_nodes: Optional[List[NodeProto]] = None,
    ) -> bool:
        assert len(deleted_nodes) == 8, f"Unexpected pattern length {len(deleted_nodes)}"
        assert deleted_nodes[0].op_type == "Pow", f"-- {deleted_nodes[0]}"
        c3 = deleted_nodes[0].input[1]
        assert deleted_nodes[1].op_type == "Mul", f"-- {deleted_nodes[1]}"
        cx3 = deleted_nodes[1].input[1]
        assert deleted_nodes[3].op_type == "Mul", f"-- {deleted_nodes[3]}"
        cpi = deleted_nodes[3].input[1]
        assert deleted_nodes[5].op_type == "Add", f"-- {deleted_nodes[5]}"
        one = deleted_nodes[5].input[1]
        assert deleted_nodes[6].op_type == "Mul", f"-- {deleted_nodes[6]}"
        c2 = deleted_nodes[6].input[1]
        node = deleted_nodes[0]
        if not g.is_constant_scalar(c3) or g.get_constant_scalar(c3) != 3:
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(cx3) or g.get_constant_scalar(cx3) not in (
            0.044715,
            0.044708251953125,
        ):
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(cpi) or g.get_constant_scalar(cpi) != 0.7978515625:
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(one) or g.get_constant_scalar(one) != 1:
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(c2) or g.get_constant_scalar(c2) != 0.5:
            return self.none(node, inspect.currentframe().f_lineno)
        return True
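

# Illustrative sketch, not part of the original module: a numpy check that the
# decomposed expression matched by GeluPattern equals Gelu(approximate="tanh").
# The helper name is hypothetical; only numpy is assumed. 0.7978515625 is
# sqrt(2/pi) rounded to float16, hence the loose tolerance.
def _check_gelu_tanh_equivalence():
    import numpy as np

    x = np.linspace(-4.0, 4.0, 101).astype(np.float32)
    # decomposed graph: 0.5 * x * (1 + tanh(c * (x + 0.044715 * x**3)))
    decomposed = 0.5 * x * (1.0 + np.tanh(0.7978515625 * (x + 0.044715 * x**3)))
    # Gelu(approximate="tanh") with the exact constant sqrt(2/pi)
    fused = 0.5 * x * (1.0 + np.tanh(np.sqrt(2.0 / np.pi) * (x + 0.044715 * x**3)))
    assert np.allclose(decomposed, fused, atol=1e-3)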


class LeakyReluPattern(EasyPatternOptimization):
    """
    Detects the decomposed version of LeakyRelu.

    Model with nodes to be fused:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 18),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(oh.make_tensor_value_info("X1", onnx.TensorProto.FLOAT, shape=(3, 3)))
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["zero"],
                value=onh.from_array(np.array([0.0], dtype=np.float32), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["slope2"],
                value=onh.from_array(
                    np.array([-0.33000001311302185], dtype=np.float32), name="value"
                ),
            )
        )
        nodes.append(oh.make_node("Greater", ["X1", "zero"], ["xpos2"]))
        nodes.append(oh.make_node("Mul", ["X1", "slope2"], ["xmul2"]))
        nodes.append(oh.make_node("Where", ["xpos2", "X1", "xmul2"], ["Y"]))
        outputs.append(oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=(3, 3)))
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))

    Outcome of the fusion:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 18),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(oh.make_tensor_value_info("X1", onnx.TensorProto.FLOAT, shape=(3, 3)))
        nodes.append(oh.make_node("LeakyRelu", ["X1"], ["Y"], alpha=-0.33000001311302185))
        outputs.append(oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=(3, 3)))
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))
    """

    def __init__(self, verbose: int = 0, priority: int = 0, min_opset: int = 6):
        super().__init__(verbose, priority, min_opset=min_opset)

    def match_pattern(self, g: "GraphBuilder", x, zero, slope):  # noqa: F821
        return g.op.Where(g.op.Greater(x, zero), x, g.op.Mul(x, slope))

    def apply_pattern(
        self,
        g: "GraphBuilder",  # noqa: F821
        x,
        zero,
        slope,
    ):
        # g is not the GraphBuilder for the main graph.
        return g.op.LeakyRelu(x, alpha=self.get_validate_param("slope"))

    def validate_mapping(
        self,
        g: "GraphBuilderPatternOptimization",  # noqa: F821
        deleted_nodes: List[NodeProto],
        pattern_nodes: Optional[List[NodeProto]] = None,
    ) -> bool:
        assert len(deleted_nodes) == 3, f"Unexpected pattern length {len(deleted_nodes)}"
        assert deleted_nodes[2].op_type == "Where", f"-- {deleted_nodes[2]}"
        greater, mul = (
            (deleted_nodes[0], deleted_nodes[1])
            if deleted_nodes[0].op_type == "Greater"
            else (deleted_nodes[1], deleted_nodes[0])
        )
        zero = greater.input[1]
        slope = mul.input[1]
        if not g.is_constant_scalar(zero) or g.get_constant_scalar(zero) != 0:
            return self.none(greater, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(slope):
            return self.none(mul, inspect.currentframe().f_lineno)
        self.add_validate_param("slope", g.get_constant_scalar(slope))
        return True
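

# Illustrative sketch, not part of the original module: a numpy check that the
# Greater/Mul/Where subgraph matched by LeakyReluPattern computes LeakyRelu.
# The helper name is hypothetical; only numpy is assumed.
def _check_leaky_relu_equivalence():
    import numpy as np

    x = np.random.randn(3, 3).astype(np.float32)
    slope = np.float32(-0.33000001311302185)
    # decomposed graph: Where(Greater(x, 0), x, Mul(x, slope))
    decomposed = np.where(x > 0, x, x * slope)
    # LeakyRelu definition: x if x >= 0 else alpha * x (both branches give 0 at x == 0)
    fused = np.where(x >= 0, x, slope * x)
    assert np.allclose(decomposed, fused)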


class SoftmaxCrossEntropyLossCastPattern(EasyPatternOptimization):
    """
    Detects one decomposed version of SoftmaxCrossEntropyLoss.

    Model with nodes to be fused:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 18),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(oh.make_tensor_value_info("I", onnx.TensorProto.INT64, shape=("A",)))
        inputs.append(
            oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT16, shape=("A", "B"))
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["B"],
                value=onh.from_array(np.array([-100], dtype=np.int64), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["zeroi"],
                value=onh.from_array(np.array([0], dtype=np.int64), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["one"],
                value=onh.from_array(np.array([1], dtype=np.int64), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["zerof"],
                value=onh.from_array(np.array([0.0], dtype=np.float16), name="value"),
            )
        )
        nodes.append(oh.make_node("Equal", ["I", "B"], ["eq1"]))
        nodes.append(oh.make_node("Not", ["eq1"], ["neq1"]))
        nodes.append(oh.make_node("Where", ["neq1", "I", "zeroi"], ["ind"]))
        nodes.append(oh.make_node("Unsqueeze", ["ind", "one"], ["flat_ind"]))
        nodes.append(oh.make_node("LogSoftmax", ["X"], ["logX"], axis=1))
        nodes.append(oh.make_node("GatherElements", ["logX", "flat_ind"], ["gx"], axis=1))
        nodes.append(oh.make_node("Squeeze", ["gx", "one"], ["flat_gx"]))
        nodes.append(oh.make_node("Neg", ["flat_gx"], ["neg_gx"]))
        nodes.append(oh.make_node("Where", ["neq1", "neg_gx", "zerof"], ["w2"]))
        nodes.append(oh.make_node("Cast", ["neq1"], ["neq1f"], to=1))
        nodes.append(
            oh.make_node(
                "ReduceSum", ["neq1f"], ["red2"], keepdims=0, noop_with_empty_axes=0
            )
        )
        nodes.append(oh.make_node("Cast", ["red2"], ["red2_16"], to=10))
        nodes.append(oh.make_node("Cast", ["w2"], ["w2f"], to=1))
        nodes.append(
            oh.make_node(
                "ReduceSum", ["w2f"], ["red1"], keepdims=0, noop_with_empty_axes=0
            )
        )
        nodes.append(oh.make_node("Cast", ["red1"], ["red1_16"], to=10))
        nodes.append(oh.make_node("Div", ["red1_16", "red2_16"], ["Y"]))
        outputs.append(oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT16, shape=[]))
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))

    Outcome of the fusion:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 18),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(oh.make_tensor_value_info("I", onnx.TensorProto.INT64, shape=("A",)))
        inputs.append(
            oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT16, shape=("A", "B"))
        )
        nodes.append(
            oh.make_node(
                "SoftmaxCrossEntropyLoss",
                ["X", "I"],
                ["Y"],
                ignore_index=-100,
                reduction="mean",
            )
        )
        outputs.append(oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT16, shape=[]))
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))
    """

    def __init__(
        self, verbose: int = 0, priority: int = 0, min_opset: int = 14, domain: str = ""
    ):
        super().__init__(verbose, priority, min_opset=min_opset)
        self.domain = domain

    def match_pattern(
        self,
        g: "GraphBuilder",  # noqa: F821
        X,
        indices,
        axis,
        zerof,
        zeroi,
        b,
    ):
        neq1 = g.op.Not(g.op.Equal(indices, b))
        wh1 = g.op.Where(neq1, indices, zeroi)
        uns = g.op.Unsqueeze(wh1, axis)
        ge = g.op.GatherElements(g.op.LogSoftmax(X, axis=1), uns, axis=1)
        wh2 = g.op.Where(neq1, g.op.Neg(g.op.Squeeze(ge, axis)), zerof)
        denominator = g.op.Cast(
            g.op.ReduceSum(
                g.op.Cast(neq1, to=TensorProto.FLOAT),
                keepdims=0,
            ),
            to=TensorProto.FLOAT16,
        )
        numerator = g.op.Cast(
            g.op.ReduceSum(
                g.op.Cast(wh2, to=TensorProto.FLOAT),
                keepdims=0,
            ),
            to=TensorProto.FLOAT16,
        )
        return g.op.Div(numerator, denominator)

    @classmethod
    def apply_pattern(
        cls,
        g: "GraphBuilder",  # noqa: F821
        X,
        indices,
        axis,
        zerof,
        zeroi,
        b,
    ):
        return g.op.SoftmaxCrossEntropyLoss(X, indices, ignore_index=-100, reduction="mean")

    def validate_mapping(
        self,
        g: "GraphBuilderPatternOptimization",  # noqa: F821
        deleted_nodes: List[NodeProto],
        pattern_nodes: Optional[List[NodeProto]] = None,
    ) -> bool:
        assert len(deleted_nodes) == 16, f"Unexpected pattern length {len(deleted_nodes)}"
        node = deleted_nodes[-1]
        for n in deleted_nodes:
            if n.op_type in {"Squeeze", "Unsqueeze"}:
                c = n.input[1]
                if not g.is_constant_scalar(c) or g.get_constant_scalar(c) != 1:
                    return self.none(node, inspect.currentframe().f_lineno)
                continue
            if n.op_type in {"Equal"}:
                c = n.input[1]
                if not g.is_constant_scalar(c) or g.get_constant_scalar(c) != -100:
                    return self.none(node, inspect.currentframe().f_lineno)
                continue
        return True
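

# Illustrative sketch, not part of the original module: a numpy reconstruction of
# the decomposed graph above (LogSoftmax + GatherElements + masked mean), checked
# against the definition of SoftmaxCrossEntropyLoss with reduction="mean" and
# ignore_index=-100. The helper name is hypothetical; only numpy is assumed.
def _check_softmax_cross_entropy_equivalence():
    import numpy as np

    rng = np.random.default_rng(0)
    X = rng.standard_normal((4, 5)).astype(np.float32)
    labels = np.array([0, -100, 3, 2], dtype=np.int64)  # -100 marks ignored rows

    mask = labels != -100  # Not(Equal(I, -100))
    ind = np.where(mask, labels, 0)  # Where(mask, I, 0)
    log_softmax = X - np.log(np.exp(X).sum(axis=1, keepdims=True))
    nll = -log_softmax[np.arange(X.shape[0]), ind]  # GatherElements + Neg
    decomposed = np.where(mask, nll, 0.0).sum() / mask.sum()  # masked mean

    # loss definition: mean of -log(softmax(X)[i, label_i]) over non-ignored rows
    probs = np.exp(X) / np.exp(X).sum(axis=1, keepdims=True)
    expected = -np.log(probs[np.arange(X.shape[0])[mask], labels[mask]]).mean()
    assert np.allclose(decomposed, expected)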