Source code for experimental_experiment.xoptim.patterns.onnx_attention

import inspect
from typing import List, Optional
import numpy as np
from onnx import NodeProto
from ...helpers import tensor_dtype_to_np_dtype
from ...xbuilder import FunctionOptions, GraphBuilder
from ..patterns_api import MatchResult, PatternOptimization


class FunctionAttentionPattern(PatternOptimization):
    """
    Merges a decomposed attention subgraph into a single node calling a local function.

    Model with nodes to be fused:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 18),
            oh.make_opsetid("intermediate", 1),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(
            oh.make_tensor_value_info(
                "values", onnx.TensorProto.FLOAT, shape=("av", "bv", "cv", "dv")
            )
        )
        inputs.append(
            oh.make_tensor_value_info(
                "keys", onnx.TensorProto.FLOAT, shape=("ak", "bk", "ck", "dk")
            )
        )
        inputs.append(
            oh.make_tensor_value_info("scale_sqrt", onnx.TensorProto.FLOAT, shape=(1,))
        )
        inputs.append(
            oh.make_tensor_value_info(
                "mask", onnx.TensorProto.BOOL, shape=("am", "bm", "cm", "dm")
            )
        )
        inputs.append(
            oh.make_tensor_value_info(
                "query", onnx.TensorProto.FLOAT, shape=("aq", "bq", "cq", "dq")
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["scale_sqrt"],
                value=onh.from_array(
                    np.array([0.3162277638912201], dtype=np.float32), name="value"
                ),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["zero"],
                value=onh.from_array(np.array([0.0], dtype=np.float32), name="value"),
            )
        )
        nodes.append(
            oh.make_node(
                "Constant",
                [],
                ["minfty"],
                value=onh.from_array(np.array([-np.inf], dtype=np.float32), name="value"),
            )
        )
        nodes.append(oh.make_node("Mul", ["query", "scale_sqrt"], ["query_scaled"]))
        nodes.append(oh.make_node("Mul", ["keys", "scale_sqrt"], ["keys_scaled"]))
        nodes.append(
            oh.make_node(
                "Transpose", ["keys_scaled"], ["keys_scaled_t"], perm=[0, 1, 3, 2]
            )
        )
        nodes.append(oh.make_node("MatMul", ["query_scaled", "keys_scaled_t"], ["qk"]))
        nodes.append(oh.make_node("Where", ["mask", "zero", "minfty"], ["bias"]))
        nodes.append(oh.make_node("Add", ["qk", "bias"], ["qkb"]))
        nodes.append(oh.make_node("Softmax", ["qkb"], ["qkbs"], axis=-1))
        nodes.append(oh.make_node("IsNaN", ["qkbs"], ["nans"]))
        nodes.append(oh.make_node("Where", ["nans", "zero", "qkbs"], ["filt"]))
        nodes.append(oh.make_node("MatMul", ["filt", "values"], ["Y"]))
        outputs.append(
            oh.make_tensor_value_info(
                "Y", onnx.TensorProto.FLOAT, shape=("ay", "by", "cy", "dy")
            )
        )
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))

    Outcome of the fusion:

    .. gdot::
        :script: DOT-SECTION
        :process:

        from experimental_experiment.doc import to_dot
        import numpy as np
        import ml_dtypes
        import onnx
        import onnx.helper as oh
        import onnx.numpy_helper as onh

        opset_imports = [
            oh.make_opsetid("", 18),
            oh.make_opsetid("intermediate", 1),
        ]
        inputs = []
        outputs = []
        nodes = []
        initializers = []
        sparse_initializers = []
        functions = []
        inputs.append(
            oh.make_tensor_value_info(
                "values", onnx.TensorProto.FLOAT, shape=("av", "bv", "cv", "dv")
            )
        )
        inputs.append(
            oh.make_tensor_value_info(
                "keys", onnx.TensorProto.FLOAT, shape=("ak", "bk", "ck", "dk")
            )
        )
        inputs.append(
            oh.make_tensor_value_info("scale_sqrt", onnx.TensorProto.FLOAT, shape=(1,))
        )
        inputs.append(
            oh.make_tensor_value_info(
                "mask", onnx.TensorProto.BOOL, shape=("am", "bm", "cm", "dm")
            )
        )
        inputs.append(
            oh.make_tensor_value_info(
                "query", onnx.TensorProto.FLOAT, shape=("aq", "bq", "cq", "dq")
            )
        )
        nodes.append(
            oh.make_node(
                "LocalAttention_to1",
                ["query", "keys", "values", "mask", "scale_sqrt"],
                ["Y"],
                domain="intermediate",
            )
        )
        outputs.append(
            oh.make_tensor_value_info(
                "Y", onnx.TensorProto.FLOAT, shape=("ay", "by", "cy", "dy")
            )
        )
        graph = oh.make_graph(
            nodes,
            "pattern",
            inputs,
            outputs,
            initializers,
            sparse_initializer=sparse_initializers,
        )
        model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
        print("DOT-SECTION", to_dot(model))
    """

    _operator_name = "LocalAttention"
    _domain_name = "intermediate"

    def __init__(self, verbose: int = 0, priority: int = 0):
        super().__init__(verbose, priority)
    def match(
        self,
        g: "GraphBuilderPatternOptimization",  # noqa: F821
        node: NodeProto,
        matched: List[MatchResult],
    ) -> Optional[MatchResult]:
        # The pattern is anchored on a Softmax node over the last axis.
        if node.op_type != "Softmax" or node.domain != "":
            return self.none()
        axis = g.get_attribute(node, "axis").i
        if axis != -1:
            return self.none(node, inspect.currentframe().f_lineno)

        # Upstream of the softmax:
        # (query * scale) @ (keys * scale)^T + Where(mask, 0, -inf).
        add_node = g.node_before(node.input[0])
        if add_node is None or add_node.op_type != "Add":
            return self.none(node, inspect.currentframe().f_lineno)
        where_node = g.node_before(add_node.input[1])
        if where_node is None or where_node.op_type != "Where":
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(where_node.input[1]):
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(where_node.input[2]):
            return self.none(node, inspect.currentframe().f_lineno)
        cst_zero = g.get_constant_scalar(where_node.input[1])
        if cst_zero != 0:
            return self.none(node, inspect.currentframe().f_lineno)
        cst_inf = g.get_constant_scalar(where_node.input[2])
        if not np.isinf(cst_inf):
            return self.none(node, inspect.currentframe().f_lineno)
        mat_qk = g.node_before(add_node.input[0])
        if mat_qk is None or mat_qk.op_type != "MatMul":
            return self.none(node, inspect.currentframe().f_lineno)
        mul1 = g.node_before(mat_qk.input[0])
        if mul1 is None or mul1.op_type != "Mul":
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(mul1.input[1]):
            return self.none(node, inspect.currentframe().f_lineno)
        transpose = g.node_before(mat_qk.input[1])
        if transpose is None or transpose.op_type != "Transpose":
            return self.none(node, inspect.currentframe().f_lineno)
        perm = g.get_attribute(transpose, "perm").ints
        if tuple(perm) != (0, 1, 3, 2):
            return self.none(node, inspect.currentframe().f_lineno)
        mul2 = g.node_before(transpose.input[0])
        if mul2 is None or mul2.op_type != "Mul":
            return self.none(node, inspect.currentframe().f_lineno)
        # Both Mul nodes must use the same scaling constant.
        if mul2.input[1] != mul1.input[1]:
            return self.none(node, inspect.currentframe().f_lineno)

        # After the softmax: IsNaN/Where replace NaN outputs by zero,
        # then a MatMul multiplies the result with the values.
        next_nodes = g.next_nodes(node.output[0])
        if len(next_nodes) != 2:
            return self.none(node, inspect.currentframe().f_lineno)
        if {n.op_type for n in next_nodes} != {"Where", "IsNaN"}:
            return self.none(node, inspect.currentframe().f_lineno)
        isnan, where2 = next_nodes[:: (1 if next_nodes[0].op_type == "IsNaN" else -1)]
        if where2.input[0] != isnan.output[0]:
            return self.none(node, inspect.currentframe().f_lineno)
        if where2.input[2] != node.output[0]:
            return self.none(node, inspect.currentframe().f_lineno)
        if not g.is_constant_scalar(where2.input[1]):
            return self.none(node, inspect.currentframe().f_lineno)
        cst = g.get_constant_scalar(where2.input[1])
        if cst != 0:
            return self.none(node, inspect.currentframe().f_lineno)
        mat_qkvs = g.next_nodes(where2.output[0])
        if len(mat_qkvs) != 1:
            return self.none(node, inspect.currentframe().f_lineno)
        mat_qkv = mat_qkvs[0]
        if mat_qkv.op_type != "MatMul":
            return self.none(node, inspect.currentframe().f_lineno)

        # Every intermediate result must have a single consumer,
        # otherwise the fusion would drop a needed output.
        for n in [mul1, mul2, transpose, mat_qk, where_node, add_node, isnan, where2]:
            if g.is_used_more_than_once(n.output[0]):
                return self.none(node, inspect.currentframe().f_lineno)

        return MatchResult(
            self,
            [
                mul1,
                mul2,
                transpose,
                mat_qk,
                where_node,
                add_node,
                node,
                isnan,
                where2,
                mat_qkv,
            ],
            self.apply,
        )
    def apply(
        self,
        g: "GraphBuilder",  # noqa: F821
        mul1: NodeProto,
        mul2: NodeProto,
        transpose: NodeProto,
        mat_qk: NodeProto,
        where_node: NodeProto,
        add_node: NodeProto,
        softmax: NodeProto,
        isnan: NodeProto,
        where: NodeProto,
        mat_qkv: NodeProto,
    ) -> List[NodeProto]:
        itype = g.get_type(mul1.input[1])
        name = f"{self._operator_name}_to{itype}"
        attention_nodes = [
            g.make_node(
                name,
                [
                    mul1.input[0],
                    mul2.input[0],
                    mat_qkv.input[1],
                    where_node.input[0],
                    mul1.input[1],
                ],
                [mat_qkv.output[0]],
                name=f"{self.__class__.__name__}--{softmax.name}",
                domain=self._domain_name,
            )
        ]
        nodes_to_return = attention_nodes

        # Creates the local function if the builder does not have it yet.
        if not g.builder.has_local_function(name, domain=self._domain_name):
            self._add_local_function(g.builder, name, itype=itype)
        return nodes_to_return
    @classmethod
    def _add_local_function(cls, g: GraphBuilder, name: str, itype: int):
        lg = GraphBuilder(g.main_opset, as_function=True)
        lg.make_tensor_input("query")
        lg.make_tensor_input("keys")
        lg.make_tensor_input("values")
        lg.make_tensor_input("mask")
        lg.make_tensor_input("scale_sqrt")
        scaled_query = lg.op.Mul("query", "scale_sqrt", name=cls.__name__)
        scaled_keys = lg.op.Mul("keys", "scale_sqrt", name=cls.__name__)
        scaled_keys_t = lg.op.Transpose(
            scaled_keys, perm=(0, 1, 3, 2), name=cls.__name__
        )
        qk = lg.op.MatMul(scaled_query, scaled_keys_t, name=cls.__name__)
        dtype = tensor_dtype_to_np_dtype(itype)
        zero = np.array([0], dtype=dtype)
        minfty = np.array([-np.inf], dtype=dtype)
        bias = lg.op.Where("mask", zero, minfty, name=cls.__name__)
        softmax = lg.op.Softmax(
            lg.op.Add(qk, bias, name=cls.__name__), axis=-1, name=cls.__name__
        )
        filtered = lg.op.Where(
            lg.op.IsNaN(softmax, name=cls.__name__), zero, softmax, name=cls.__name__
        )
        lg.op.MatMul(filtered, "values", outputs=["Y"], name=cls.__name__)
        lg.make_tensor_output("Y")
        function_options = FunctionOptions(
            export_as_function=True,
            name=name,
            domain=cls._domain_name,
            move_initializer_to_constant=True,
        )
        g.make_local_function(lg, function_options=function_options)
        assert g.has_local_function(
            name, domain=cls._domain_name
        ), f"The function {cls._domain_name}.{name} was not added to the builder."
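
# The sketch below is not part of the original module. It is a minimal NumPy
# reference of what the fused ``LocalAttention_to*`` local function computes,
# derived from the ops assembled in ``_add_local_function`` above. The helper
# name ``_numpy_local_attention`` is hypothetical and only meant for
# illustration, e.g. to compare a fused model against a reference run.
def _numpy_local_attention(query, keys, values, mask, scale_sqrt):
    """NumPy mirror of the local function: Mul, Mul, Transpose, MatMul,
    Where, Add, Softmax, IsNaN/Where, MatMul."""
    # (query * scale) @ (keys * scale)^T, transposing the last two axes
    qk = (query * scale_sqrt) @ (keys * scale_sqrt).transpose(0, 1, 3, 2)
    # additive mask: 0 where the mask is True, -inf elsewhere
    z = qk + np.where(mask, np.float32(0), np.float32(-np.inf))
    # softmax over the last axis, computed in a numerically stable way
    e = np.exp(z - z.max(axis=-1, keepdims=True))
    att = e / e.sum(axis=-1, keepdims=True)
    # fully masked rows come out of the softmax as NaN, replace them by zero
    att = np.where(np.isnan(att), np.float32(0), att)
    return att @ values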