Source code for experimental_experiment.xoptim.patterns_exp.simple_rotary

import inspect
from typing import List, Optional
import numpy as np
from onnx import NodeProto
from ..patterns_api import MatchResult, PatternOptimization


class SimpleRotaryPattern(PatternOptimization):
    """
    Replaces Split + Neg + Concat with Rotary
    (domain ``onnx_extended.ortops.optim.cuda``).

    The matched subgraph splits a tensor in two halves along its last axis,
    negates one half and concatenates both halves back in swapped order
    along the same axis. The pattern is only considered when the graph
    builder reports the ``CUDA`` processor.
    """

    def match(
        self,
        g: "GraphBuilderPatternOptimization",  # noqa: F821
        node: NodeProto,
        matched: List[MatchResult],
    ) -> Optional[MatchResult]:
        """
        Checks whether *node* is the Split node of a Split + Neg + Concat
        subgraph which can be fused.

        :param g: graph builder used to query ranks, attributes, constants
            and consumers of a result
        :param node: candidate node, must be a Split from the default domain
        :param matched: patterns already matched (unused here)
        :return: a :class:`MatchResult` over ``[split, neg, concat]`` or the
            value returned by ``self.none(...)`` if the pattern does not apply
        """
        if not g.has_processor("CUDA"):
            return self.none()
        if node.op_type != "Split" or node.domain != "":
            return self.none()
        if not g.has_rank(node.input[0]):
            return self.none(node, inspect.currentframe().f_lineno)

        # The split must happen on the last axis.
        axis_att = g.get_attribute(node, "axis", exc=False)
        axis = 0 if axis_att is None else axis_att.i
        rk = g.get_rank(node.input[0])
        if axis < 0:
            axis += rk
        if axis != rk - 1:
            return self.none(node, inspect.currentframe().f_lineno)

        # The split must produce exactly two outputs; when the sizes are
        # given as a second input, both sizes must be equal.
        if len(node.input) == 2:
            cst = g.get_computed_constant(node.input[1])
            # The split sizes may not be computable: do not match in that
            # case instead of failing on ``None``.
            if (
                cst is None
                or cst.dtype != np.int64
                or cst.shape != (2,)
                or cst[0] != cst[1]
            ):
                return self.none(node, inspect.currentframe().f_lineno)
        else:
            att = g.get_attribute(node, "num_outputs", exc=False)
            if att is None or att.i != 2:
                return self.none(node, inspect.currentframe().f_lineno)

        # Both halves are removed by the fusion, they cannot feed
        # any other node.
        if g.is_used_more_than_once(node.output[0]) or g.is_used_more_than_once(
            node.output[1]
        ):
            return self.none(node, inspect.currentframe().f_lineno)

        # One half goes through Neg, the other one directly into Concat.
        left_node = g.next_node(node.output[0])
        right_node = g.next_node(node.output[1])
        if left_node.op_type != "Neg" and right_node.op_type != "Neg":
            return self.none(node, inspect.currentframe().f_lineno)
        if left_node.op_type != "Concat" and right_node.op_type != "Concat":
            return self.none(node, inspect.currentframe().f_lineno)

        # Expected Concat inputs: the two halves in swapped order,
        # one of them negated.
        if left_node.op_type == "Neg":
            inputs = [node.output[1], left_node.output[0]]
            neg_node = left_node
            concat_node = right_node
        else:
            inputs = [right_node.output[0], node.output[0]]
            neg_node = right_node
            concat_node = left_node
        if inputs != list(concat_node.input):
            return self.none(node, inspect.currentframe().f_lineno)
        if g.is_used_more_than_once(neg_node.output[0]):
            return self.none(node, inspect.currentframe().f_lineno)

        # Concat must use the same axis as Split. The attribute is fetched
        # with exc=False, so it may be missing on a malformed node.
        concat_axis_att = g.get_attribute(concat_node, "axis", exc=False)
        if concat_axis_att is None:
            return self.none(node, inspect.currentframe().f_lineno)
        axis_ = concat_axis_att.i
        if axis_ < 0:
            axis_ += rk
        if axis != axis_:
            return self.none(node, inspect.currentframe().f_lineno)

        return MatchResult(
            self, [node, neg_node, concat_node], self.apply, insert_at=concat_node
        )

    def apply(
        self,
        g: "GraphBuilder",  # noqa: F821
        split_node: NodeProto,
        neg_node: NodeProto,
        concat_node: NodeProto,
    ) -> List[NodeProto]:
        """
        Builds the Rotary node replacing the matched nodes.

        :param g: graph builder used to create the new node
        :param split_node: matched Split node, its inputs become
            the inputs of the new node
        :param neg_node: matched Neg node, tells which half is negated
        :param concat_node: matched Concat node, its outputs become
            the outputs of the new node
        :return: list containing the single new node
        """
        # side="right" when the negated half is the second output of Split.
        side = "right" if neg_node.input[0] == split_node.output[1] else "left"
        new_node = g.make_node(
            "Rotary",
            split_node.input,
            concat_node.output,
            side=side,
            name=f"{self.__class__.__name__}--{neg_node.name}",
            domain="onnx_extended.ortops.optim.cuda",
        )
        return [new_node]