import inspect
from typing import List, Optional
import numpy as np
from onnx import NodeProto
from onnx.numpy_helper import from_array
from ...helpers import tensor_dtype_to_np_dtype
from ..patterns_api import MatchResult, PatternOptimization
[docs]
class LayerNormalizationPattern(PatternOptimization):
"""
Fuses node of a LayerNormalization.
"""
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "ReduceMean" or node.domain != "":
return self.none()
if len(node.input) != 2:
# Not defined for older opset than 18.
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
axis = g.get_computed_constant(node.input[1])
if axis.tolist() != [-1]:
if not g.has_rank(node.input[0]):
return self.none(node, inspect.currentframe().f_lineno)
rk = g.get_rank(node.input[0])
al = axis.tolist()
if al != list(range(rk - len(al), rk)):
return self.none(node, inspect.currentframe().f_lineno)
# before
pow = g.node_before(node.input[0])
if pow is None:
return self.none(node, inspect.currentframe().f_lineno)
if pow.op_type != "Pow" or len(g.next_nodes(pow.output[0])) != 1:
return self.none(node, inspect.currentframe().f_lineno)
if (
not g.is_constant_scalar(pow.input[1], broadcast=True)
or g.get_constant_scalar(pow.input[1], broadcast=True) != 2
):
return self.none(node, inspect.currentframe().f_lineno)
sub = g.node_before(pow.input[0])
if sub is None:
return self.none(node, inspect.currentframe().f_lineno)
if sub.op_type != "Sub" or len(g.next_nodes(sub.output[0])) != 2:
return self.none(node, inspect.currentframe().f_lineno)
red = g.node_before(sub.input[1])
if red is None:
return self.none(node, inspect.currentframe().f_lineno)
if red.op_type != "ReduceMean" or len(g.next_nodes(red.output[0])) != 1:
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(red.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
axis2 = g.get_computed_constant(red.input[1])
if axis.tolist() != axis2.tolist():
return self.none(node, inspect.currentframe().f_lineno)
if sub.input[0] != red.input[0]:
return self.none(node, inspect.currentframe().f_lineno)
kp = g.get_attribute(red, "keepdims", exc=False)
if kp is None or kp.i != 1:
return self.none(node, inspect.currentframe().f_lineno)
# after
add = g.next_nodes(node.output[0])
if len(add) != 1:
return self.none(node, inspect.currentframe().f_lineno)
if add[0].op_type == "Add":
add = add[0]
if not g.is_constant_scalar(add.input[1], broadcast=True):
return self.none(node, inspect.currentframe().f_lineno)
sqrt = g.next_nodes(add.output[0])
else:
add = None
if add is None:
sqrt = g.next_nodes(node.output[0])
if len(sqrt) != 1 or sqrt[0].op_type != "Sqrt":
return self.none(node, inspect.currentframe().f_lineno)
sqrt = sqrt[0]
div = g.next_nodes(sqrt.output[0])
if len(div) != 1:
return self.none(node, inspect.currentframe().f_lineno)
div = div[0]
if div.op_type == "Div":
if len(g.next_nodes(div.input[1])) != 1:
return self.none(node, inspect.currentframe().f_lineno)
if div.input[0] != sub.output[0]:
return self.none(node, inspect.currentframe().f_lineno)
elif div.op_type == "Reciprocal":
if div.input[0] != sub.output[0]:
return self.none(node, inspect.currentframe().f_lineno)
else:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(
self, [red, sub, pow, node, add, sqrt, div], self.apply, insert_at=node
)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
red: NodeProto,
sub: NodeProto,
pow: NodeProto,
node: NodeProto,
add: Optional[NodeProto],
sqrt: NodeProto,
div: NodeProto,
) -> List[NodeProto]:
itype = g.get_type(red.input[0])
dtype = tensor_dtype_to_np_dtype(itype)
axis = g.get_computed_constant(red.input[1]).tolist()
scale = None
dtype = tensor_dtype_to_np_dtype(g.get_type(red.input[0]))
new_nodes = []
if axis == [-1]:
ly_axis = -1
if g.has_shape(red.input[0]):
shape = g.get_shape(red.input[0])
if isinstance(shape[-1], int):
scale = g.make_initializer(
"",
np.ones((shape[-1],), dtype=dtype),
source="LayerNormalizationPattern.apply.scale",
)
bias = g.make_initializer(
"",
np.zeros((shape[-1],), dtype=dtype),
source="LayerNormalizationPattern.apply.bias",
)
else:
ly_axis = min(axis)
if scale is None:
shape = g.unique_name(f"{self.__class__.__name__}_Sh_{red.input[0]}")
new_nodes.append(
g.make_node(
"Shape",
[red.input[0]],
[shape],
start=ly_axis,
name=f"{self.__class__.__name__}--{red.name}",
)
)
scale = g.unique_name(f"{self.__class__.__name__}_Sc_{red.input[0]}")
new_nodes.append(
g.make_node(
"ConstantOfShape",
[shape],
[scale],
name=f"{self.__class__.__name__}--{red.name}",
value=from_array(np.array([1], dtype=dtype)),
)
)
bias = g.unique_name(f"{self.__class__.__name__}_Bi_{red.input[0]}")
new_nodes.append(
g.make_node(
"ConstantOfShape",
[shape],
[bias],
name=f"{self.__class__.__name__}--{red.name}",
value=from_array(np.array([0], dtype=dtype)),
)
)
eps = (
g.get_constant_scalar(add.input[1], broadcast=True)
if add
else 9.999999960041972e-13
)
new_nodes.append(
g.make_node(
"LayerNormalization",
[red.input[0], scale, bias],
[div.output[0]],
epsilon=float(eps),
name=f"{self.__class__.__name__}--{node.name}",
doc_string=node.doc_string,
stash_type=1, # itype,
axis=ly_axis,
)
)
return new_nodes
[docs]
class LayerNormalizationScalePattern(PatternOptimization):
"""
Fused LayerNormalization, scale, bias just after.
"""
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "LayerNormalization" or node.domain != "":
return self.none()
if len(node.output) != 1:
# No need for the scale.
return self.none(node, inspect.currentframe().f_lineno)
nodes = g.next_nodes(node.output[0])
if len(nodes) != 1 or nodes[0].op_type != "Mul":
return self.none(node, inspect.currentframe().f_lineno)
mul_node = nodes[0]
nodes = g.next_nodes(mul_node.output[0])
if len(nodes) == 0:
return MatchResult(self, [node, mul_node, None], self.apply, insert_at=mul_node)
index = 1 if mul_node.input[0] == node.output[0] else 0
if not g.has_shape(mul_node.input[index]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.has_shape(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
if g.get_shape(mul_node.input[index]) != g.get_shape(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
nodes = g.next_nodes(mul_node.output[0])
if len(nodes) != 1 or nodes[0].op_type != "Add":
return MatchResult(self, [node, mul_node, None], self.apply, insert_at=nodes[0])
add_node = nodes[0]
index = 1 if add_node.input[0] == mul_node.output[0] else 0
if not g.has_shape(add_node.input[index]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.has_shape(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
if g.get_shape(add_node.input[index]) != g.get_shape(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [node, mul_node, add_node], self.apply, insert_at=nodes[0])
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
ln_node: NodeProto,
mul_node: NodeProto,
add_node: Optional[NodeProto],
) -> List[NodeProto]:
# scale
scale = (
mul_node.input[1] if mul_node.input[0] == ln_node.output[0] else mul_node.input[0]
)
new_scale = None
if g.is_constant_scalar(ln_node.input[1], broadcast=True):
fscale = g.get_constant_scalar(ln_node.input[1], broadcast=True)
if fscale == 1:
new_scale = scale
new_nodes = []
if new_scale is None:
new_scale = g.unique_name(f"{self.__class__.__name__}_{ln_node.input[1]}")
node = g.make_node(
"Mul",
[ln_node.input[1], scale],
[new_scale],
name=f"{self.__class__.__name__}--{ln_node.name}",
)
new_nodes.append(node)
if add_node:
if len(ln_node.input) == 2:
new_bias = (
add_node.input[1]
if add_node.input[0] == mul_node.output[0]
else add_node.input[0]
)
else:
# there is an existing bias
existing_bias = ln_node.input[2]
mul_cst = (
mul_node.input[0]
if mul_node.input[1] == ln_node.output[0]
else mul_node.input[1]
)
add_cst = (
add_node.input[0]
if add_node.input[1] == mul_node.output[0]
else add_node.input[1]
)
# new_bias = existing_bias * mul_cst + add_cst
temp = g.unique_name(f"{self.__class__.__name__}_{ln_node.input[1]}")
new_bias = g.unique_name(f"{self.__class__.__name__}_{ln_node.input[1]}")
new_nodes.extend(
[
g.make_node(
"Mul",
[mul_cst, existing_bias],
[temp],
name=f"{self.__class__.__name__}--{ln_node.name}",
),
g.make_node(
"Add",
[temp, add_cst],
[new_bias],
name=f"{self.__class__.__name__}--{ln_node.name}",
),
]
)
else:
new_bias = ln_node.input[2] if len(ln_node.input) > 2 else None
kwargs = {}
axis = g.get_attribute(ln_node, "axis", exc=None)
if axis:
kwargs["axis"] = axis.i
epsilon = g.get_attribute(ln_node, "epsilon", exc=None)
if epsilon:
kwargs["epsilon"] = epsilon.f
stash_type = g.get_attribute(ln_node, "stash_type", exc=None)
if stash_type:
kwargs["stash_type"] = stash_type.i
new_node = g.make_node(
"LayerNormalization",
(
[ln_node.input[0], new_scale, new_bias]
if new_bias
else [ln_node.input[0], new_scale]
),
[(add_node or mul_node).output[0]],
name=f"{self.__class__.__name__}--{ln_node.name}",
doc_string=ln_node.doc_string,
**kwargs,
)
return [*new_nodes, new_node]
[docs]
class CastLayerNormalizationCastPattern(PatternOptimization):
"""
Checks that a Cast is really needed around LayerNormalization
"""
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type not in (
"LayerNormalization",
"SimplifiedLayerNormalization",
) or node.domain not in ("", "com.microsoft"):
return self.none()
if len(node.output) > 1 and g.is_used(node.output[1]):
# No need for the scale.
return self.none(node, inspect.currentframe().f_lineno)
stash_type = g.get_attribute(node, "stash_type", exc=False)
stash_itype = 1 if stash_type is None else stash_type.i
cast_before = g.node_before(node.input[0])
if cast_before is None or cast_before.op_type != "Cast" or cast_before.domain != "":
return self.none(node, inspect.currentframe().f_lineno)
to = g.get_attribute(cast_before, "to")
if to.i != stash_itype:
return self.none(node, inspect.currentframe().f_lineno)
if g.is_used_more_than_once(node.input[0]):
return self.none(node, inspect.currentframe().f_lineno)
cast_afters = g.next_nodes(node.output[0])
if len(cast_afters) != 1:
return self.none(node, inspect.currentframe().f_lineno)
cast_after = cast_afters[0]
if cast_after.op_type != "Cast" or cast_after.domain != "":
return self.none(node, inspect.currentframe().f_lineno)
to = g.get_attribute(cast_after, "to")
itype = g.get_type(cast_before.input[0])
if to.i != itype:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [cast_before, node, cast_after], self.apply, insert_at=node)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
cast_before: NodeProto,
node: NodeProto,
cast_after: NodeProto,
) -> List[NodeProto]:
itype = g.get_type(cast_before.input[0])
other = []
nodes = []
for i in node.input[1:]:
name = g.unique_name(f"{self.__class__.__name__}_{i}")
other.append(name)
nodes.append(
g.make_node(
"Cast",
[i],
[name],
to=itype,
name=f"{self.__class__.__name__}--cast--{node.name}",
)
)
new_node = g.make_node(
node.op_type,
[cast_before.input[0], *other],
[cast_after.output[0], *node.output[1:]],
name=f"{self.__class__.__name__}--{node.name}",
doc_string=node.doc_string,
domain=node.domain,
)
new_node.attribute.extend(node.attribute)
return [*nodes, new_node]
[docs]
class BatchNormalizationPattern(PatternOptimization):
"""
Checks that a BatchNormalization is really needed.
"""
def __init__(self, verbose: int = 0, priority: int = 0):
super().__init__(verbose, priority)
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "BatchNormalization" or node.domain != "":
return self.none()
if len(node.output) > 1 and g.next_nodes(node.output[1]):
return self.none(node, inspect.currentframe().f_lineno)
if len(node.output) > 2 and g.next_nodes(node.output[2]):
return self.none(node, inspect.currentframe().f_lineno)
momentum = 0.9
epsilon = 1e-5
training_mode = 0
for att in node.attribute:
if att.name == "momentum":
momentum = att.f
elif att.name == "epsilon":
epsilon = att.f
elif att.name == "training_mode":
training_mode = att.i
if training_mode and momentum != 0:
return self.none(node, inspect.currentframe().f_lineno)
if epsilon != 0:
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(node.input[2]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(node.input[3]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(node.input[3]):
return self.none(node, inspect.currentframe().f_lineno)
# biases
for z in node.input[2:4]:
cst = g.get_computed_constant(z)
if cst is None:
return self.none(node, inspect.currentframe().f_lineno)
if cst.min() == cst.max() == 0:
continue
return self.none(node, inspect.currentframe().f_lineno)
# scales
for z in [node.input[1], node.input[4]]:
cst = g.get_computed_constant(z)
if cst is None:
return self.none(node, inspect.currentframe().f_lineno)
if cst.min() == cst.max() == 1:
continue
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [node], self.apply, insert_at=node)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
node: NodeProto,
) -> List[NodeProto]:
new_node = g.make_node(
"Identity",
node.input[:1],
node.output[:1],
name=f"{self.__class__.__name__}--{node.name}",
doc_string=node.doc_string,
)
return [new_node]
[docs]
class BatchNormalizationTrainingPattern(PatternOptimization):
"""
Checks that a BatchNormalization in training mode can be avoided.
"""
def __init__(self, verbose: int = 0, priority: int = 0):
super().__init__(verbose, priority)
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "BatchNormalization" or node.domain != "":
return self.none()
if g.main_opset < 18:
return self.none(node, inspect.currentframe().f_lineno)
if not g.has_rank(node.input[0]) or g.get_rank(node.input[0]) < 2:
return self.none(node, inspect.currentframe().f_lineno)
if len(node.output) > 1 and (
not g.has_rank(node.input[1]) or g.next_nodes(node.output[1])
):
return self.none(node, inspect.currentframe().f_lineno)
if len(node.output) > 2 and (
not g.has_rank(node.input[2]) or g.next_nodes(node.output[2])
):
return self.none(node, inspect.currentframe().f_lineno)
momentum = 0.9
training_mode = 0
for att in node.attribute:
if att.name == "momentum":
momentum = att.f
elif att.name == "training_mode":
training_mode = att.i
if not training_mode and momentum != 1:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [node], self.apply, insert_at=node)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
node: NodeProto,
) -> List[NodeProto]:
nname = f"{self.__class__.__name__}--{node.name}"
rk = g.get_rank(node.input[0])
axes = tuple(np.delete(np.arange(rk), 1))
init_axes = g.make_initializer(
"",
np.array(list(axes), dtype=np.int64),
source="BatchNormalizationTrainingPattern.apply.init_axes",
)
mean_name = g.unique_name(f"{self.__class__.__name__}_mean_{node.input[0]}")
mean = g.make_node(
"ReduceMean", [node.input[0], init_axes], [mean_name], keepdims=1, name=nname
)
centered_name = g.unique_name(f"{self.__class__.__name__}_center_{node.input[0]}")
sub = g.make_node("Sub", [node.input[0], mean_name], [centered_name], name=nname)
x2 = g.unique_name(f"{self.__class__.__name__}_x2_{node.input[0]}")
mul2 = g.make_node("Mul", [centered_name, centered_name], [x2], name=nname)
var_name = g.unique_name(f"{self.__class__.__name__}_var_{node.input[0]}")
var = g.make_node("ReduceMean", [x2, init_axes], [var_name], keepdims=1, name=nname)
dtype = tensor_dtype_to_np_dtype(g.get_type(node.input[0]))
epsilon = g.get_attributes_with_default(node, epsilon=1e-5)["epsilon"]
init_epsilon = g.make_initializer(
"",
np.array([epsilon], dtype=dtype),
source="BatchNormalizationTrainingPattern.apply.init_epsilon",
)
vare_name = g.unique_name(f"{self.__class__.__name__}_vareps_{node.input[0]}")
add = g.make_node("Add", [var_name, init_epsilon], [vare_name], name=nname)
std_name = g.unique_name(f"{self.__class__.__name__}_vareps_{node.input[0]}")
sqrt = g.make_node("Sqrt", [vare_name], [std_name], name=nname)
new_shape = [1 for _ in range(rk)]
new_shape[1] = -1
new_shape = g.make_initializer(
"",
np.array(new_shape, dtype=np.int64),
source="BatchNormalizationTrainingPattern.apply.new_shape",
)
if g.get_rank(node.input[1]) == 1:
scale_name = g.unique_name(f"{self.__class__.__name__}_scale_{node.input[1]}")
scale = g.make_node(
"Reshape", [node.input[1], new_shape], [scale_name], name=nname
)
else:
scale_name = node.input[1]
scale = None
if g.get_rank(node.input[2]) == 1:
bias_name = g.unique_name(f"{self.__class__.__name__}_bias_{node.input[2]}")
bias = g.make_node("Reshape", [node.input[2], new_shape], [bias_name], name=nname)
else:
bias_name = node.input[2]
bias = None
scaled_name = g.unique_name(f"{self.__class__.__name__}_scaled_{node.input[1]}")
scaled = g.make_node("Div", [centered_name, std_name], [scaled_name], name=nname)
scaled2_name = g.unique_name(f"{self.__class__.__name__}_scaled2_{node.input[2]}")
scaled2 = g.make_node("Mul", [scaled_name, scale_name], [scaled2_name], name=nname)
final = g.make_node("Add", [scaled2_name, bias_name], [node.output[0]], name=nname)
return [
_
for _ in [mean, sub, mul2, var, add, sqrt, scale, bias, scaled, scaled2, final]
if _ is not None
]