import inspect
from typing import List, Optional
import numpy as np
from onnx import NodeProto
from ..patterns_api import MatchResult, PatternOptimization
[docs]
class SqueezeUnsqueezePattern(PatternOptimization):
"""
Replaces the sequence Squeeze, Unsqueeze by Identity or the other ways around.
Model with nodes to be fused:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(
oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT, shape=("a", 1, 1, "d"))
)
nodes.append(
oh.make_node(
"Constant",
[],
["axes1"],
value=onh.from_array(np.array([1, 2], dtype=np.int64), name="value"),
)
)
nodes.append(
oh.make_node(
"Constant",
[],
["axes2"],
value=onh.from_array(np.array([1, 2], dtype=np.int64), name="value"),
)
)
nodes.append(oh.make_node("Unsqueeze", ["X", "axes1"], ["mm"]))
nodes.append(oh.make_node("Squeeze", ["mm", "axes2"], ["Y"]))
outputs.append(
oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=("a", 1, 1, "d"))
)
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
Outcome of the fusion:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(
oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT, shape=("a", 1, 1, "d"))
)
nodes.append(oh.make_node("Identity", ["X"], ["Y"]))
outputs.append(
oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=("a", 1, 1, "d"))
)
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
"""
def __init__(self, verbose: int = 0, priority: int = 0):
super().__init__(verbose, priority)
def _diff_axes(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
first_node: NodeProto,
second_node: NodeProto,
):
if first_node.op_type == "Unsqueeze" and len(second_node.input) == 1:
return "Squeeze", None
axes1 = (
None if len(first_node.input) == 1 else g.get_computed_constant(first_node.input[1])
)
axes2 = (
None if len(second_node.input) == 1 else g.get_computed_constant(second_node.input[1])
)
if axes1 is None and first_node.op_type == "Squeeze" and g.has_shape(first_node.input[0]):
axes1 = tuple(i for i, a in enumerate(g.get_shape(first_node.input[0])) if a == 1)
if (
axes2 is None
and second_node.op_type == "Squeeze"
and g.has_shape(second_node.input[0])
):
axes2 = tuple(i for i, a in enumerate(g.get_shape(second_node.input[0])) if a == 1)
if len(first_node.input) == 2 and axes1 is None:
return self.none(second_node, inspect.currentframe().f_lineno)
if len(second_node.input) == 2 and axes2 is None:
return self.none(second_node, inspect.currentframe().f_lineno)
tax1 = tuple(map(int, axes1))
tax2 = tuple(map(int, axes2))
if tax1 == tax2:
if len(axes1) > 1 and tuple(map(int, axes1)) != tuple(
range(min(axes1), max(axes1) + 1)
):
return self.none(second_node, inspect.currentframe().f_lineno)
return "Identity", None
if first_node.op_type == "Unsqueeze" and set(tax1) < set(tax2):
keep_axes = sorted(set(tax2) - set(tax1))
for i in range(len(keep_axes)):
m = len([t for t in tax1 if t < keep_axes[i]])
keep_axes[i] -= m
return "Squeeze", tuple(keep_axes)
return self.none(second_node, inspect.currentframe().f_lineno)
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type not in {"Squeeze", "Unsqueeze"} or node.domain != "":
return self.none()
node_before = g.node_before(node.input[0])
if (
node_before is None
or node_before.op_type not in {"Squeeze", "Unsqueeze"}
or node_before.op_type == node.op_type
or node_before.domain != ""
):
return self.none(node, inspect.currentframe().f_lineno)
diff = self._diff_axes(g, node_before, node)
if diff is None:
return diff
return MatchResult(
self,
[node_before, node],
self.apply,
insert_at=node_before if g.is_used_more_than_once(node.input[0]) else node,
)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
node_first: NodeProto,
node_second: NodeProto,
) -> List[NodeProto]:
diff = self._diff_axes(g, node_first, node_second)
assert diff is not None, "Match should not have happened then."
op_type, args = diff
if args is None:
new_node = g.make_node(
op_type,
[node_first.input[0]],
[node_second.output[0]],
name=f"{self.__class__.__name__}--{node_first.name}",
doc_string=node_first.doc_string,
)
else:
new_axes = g.make_initializer(
"",
np.array(args, dtype=np.int64),
source="SqueezeUnsqueezePattern.apply.new_axes",
)
new_node = g.make_node(
op_type,
[node_first.input[0], new_axes],
[node_second.output[0]],
name=f"{self.__class__.__name__}--{node_first.name}",
doc_string=node_first.doc_string,
)
return (
[node_first, new_node]
if g.is_used_more_than_once(node_second.input[0])
else [new_node]
)
[docs]
class UnsqueezeUnsqueezePattern(PatternOptimization):
"""
Replaces the sequence Unsqueeze, Unsqueeze by Unsqueeze.
Model with nodes to be fused:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(
oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT, shape=("a", "b"))
)
nodes.append(
oh.make_node(
"Constant",
[],
["ii"],
value=onh.from_array(np.array([2], dtype=np.int64), name="value"),
)
)
nodes.append(
oh.make_node(
"Constant",
[],
["jj"],
value=onh.from_array(np.array([3], dtype=np.int64), name="value"),
)
)
nodes.append(oh.make_node("Unsqueeze", ["X", "ii"], ["x1"]))
nodes.append(oh.make_node("Unsqueeze", ["x1", "jj"], ["Y"]))
outputs.append(
oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=(1, 1, "a", "b"))
)
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
Outcome of the fusion:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT, shape=("a", "b")))
nodes.append(
oh.make_node(
"Constant",
[],
["init7_s2_2_3"],
value=onh.from_array(np.array([2, 3], dtype=np.int64), name="value"),
)
)
nodes.append(oh.make_node("Unsqueeze", ["X", "init7_s2_2_3"], ["Y"]))
outputs.append(
oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=(1, 1, "a", "b"))
)
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
"""
def __init__(self, verbose: int = 0, priority: int = 0):
super().__init__(verbose, priority)
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "Unsqueeze" or node.domain != "":
return self.none()
if not g.has_rank(node.input[0]):
return self.none(node, inspect.currentframe().f_lineno)
next_nodes = [n for n in g.next_nodes(node.output[0]) if n.op_type == "Unsqueeze"]
if not next_nodes:
return self.none(node, inspect.currentframe().f_lineno)
next_node = next_nodes[0]
if next_node.op_type != "Unsqueeze" or node.domain != "":
return self.none(node, inspect.currentframe().f_lineno)
if next_node.input[0] != node.output[0]:
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(node.input[1]) or not g.is_constant(next_node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.has_rank(node.input[1]) or not g.has_rank(next_node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
if g.get_rank(node.input[1]) != 1 or g.get_rank(next_node.input[1]) != 1:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [node, next_node], self.apply, insert_at=node)
@classmethod
def _unsqueeze(cls, current, axes):
if len(axes) == 1:
current.insert(axes[0], True)
else:
for a in axes:
current.insert(a, True)
return current
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
node: NodeProto,
next_node: NodeProto,
) -> List[NodeProto]:
axis1 = g.get_constant_or_attribute(node, "axis", 1)
axis2 = g.get_constant_or_attribute(next_node, "axis", 1)
rk = g.get_rank(node.input[0])
existing = [False for i in range(rk)]
existing = self._unsqueeze(existing, axis1)
assert axis1.min() < 0 or [i for i, a in enumerate(existing) if a] == list(axis1), (
f"Something is wrong: rk={rk}, axis={axis1}, existing={existing}, shapes:"
f"{g.get_shape(node.input[0]) if g.has_shape(node.input[0]) else '?'}, "
f"{g.get_shape(node.output[0]) if g.has_shape(node.output[0]) else '?'}"
)
existing = self._unsqueeze(existing, axis2)
new_axes = [i for i, a in enumerate(existing) if a]
new_axis = g.make_initializer(
"",
np.array(new_axes, dtype=np.int64),
source="UnsqueezeUnsqueezePattern.apply.new_axis",
)
new_node = g.make_node(
"Unsqueeze",
[node.input[0], new_axis],
next_node.output,
name=f"{self.__class__.__name__}--{node.name}",
doc_string=next_node.doc_string,
)
if g.is_used_more_than_once(node.output[0]):
return [node, new_node]
return [new_node]
[docs]
class SqueezeAddPattern(PatternOptimization):
"""
Replaces the sequence Add(Squeeze, Squeeze) by Squeeze(Add).
Model with nodes to be fused:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("S2", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("S1", onnx.TensorProto.INT64, shape=(1,)))
nodes.append(oh.make_node("Squeeze", ["S1"], ["s1"]))
nodes.append(oh.make_node("Squeeze", ["S2"], ["s2"]))
nodes.append(oh.make_node("Add", ["s1", "s2"], ["s"]))
outputs.append(oh.make_tensor_value_info("s", onnx.TensorProto.INT64, shape=[]))
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
Outcome of the fusion:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("S2", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("S1", onnx.TensorProto.INT64, shape=(1,)))
nodes.append(oh.make_node("Add", ["S1", "S2"], ["SqueezeAddPattern_s"]))
nodes.append(oh.make_node("Squeeze", ["SqueezeAddPattern_s"], ["s"]))
outputs.append(oh.make_tensor_value_info("s", onnx.TensorProto.INT64, shape=[]))
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
"""
def __init__(self, verbose: int = 0, priority: int = 0):
super().__init__(verbose, priority)
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "Add" or node.domain != "" or g.builder.main_opset < 13:
return self.none()
node_before = [g.node_before(node.input[0]), g.node_before(node.input[1])]
if (
not node_before[0]
or not node_before[1]
or node_before[0].op_type != "Squeeze"
or node_before[1].op_type != "Squeeze"
):
return self.none(node, inspect.currentframe().f_lineno)
if len(node_before[0].input) == 2:
s1 = g.builder.value_as_shape(node_before[0].input[1])
else:
if not g.has_shape(node_before[0].input[0]) or g.get_shape(
node_before[0].input[0]
) != (1,):
return self.none(node, inspect.currentframe().f_lineno)
s1 = (0,)
if len(node_before[1].input) == 2:
s2 = g.builder.value_as_shape(node_before[1].input[1])
else:
if not g.has_shape(node_before[1].input[0]) or g.get_shape(
node_before[1].input[0]
) != (1,):
return self.none(node, inspect.currentframe().f_lineno)
s2 = (0,)
if s1 is None or s2 is None or s1 != s2:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [*node_before, node], self.apply)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
squeeze1: NodeProto,
squeeze2: NodeProto,
add: NodeProto,
) -> List[NodeProto]:
new_name = g.unique_name(f"{self.__class__.__name__}_{add.output[0]}")
new_nodes = [
g.make_node(
"Add",
[squeeze1.input[0], squeeze2.input[0]],
[new_name],
name=f"{self.__class__.__name__}--{add.name}",
doc_string=add.doc_string,
),
g.make_node(
"Squeeze",
[new_name, *squeeze1.input[1:]],
add.output,
name=f"{self.__class__.__name__}--{squeeze1.name}",
doc_string=squeeze1.doc_string,
),
]
if g.is_used_more_than_once(add.input[1]):
new_nodes = [squeeze2, *new_nodes]
if g.is_used_more_than_once(add.input[0]):
new_nodes = [squeeze1, *new_nodes]
return new_nodes
[docs]
class SqueezeBinaryUnsqueezePattern(PatternOptimization):
"""
Replaces the sequence Squeeze Binary Unsqueeze) by Binary.
Model with nodes to be fused:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("zero", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("d", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("two", onnx.TensorProto.INT64, shape=[]))
nodes.append(
oh.make_node(
"Constant",
[],
["two"],
value=onh.from_array(np.array(2, dtype=np.int64), name="value"),
)
)
nodes.append(
oh.make_node(
"Constant",
[],
["zero"],
value=onh.from_array(np.array([0], dtype=np.int64), name="value"),
)
)
nodes.append(oh.make_node("Squeeze", ["d"], ["d0"]))
nodes.append(oh.make_node("Div", ["d0", "two"], ["d1"]))
nodes.append(oh.make_node("Unsqueeze", ["d1", "zero"], ["e"]))
outputs.append(oh.make_tensor_value_info("e", onnx.TensorProto.INT64, shape=(1,)))
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
Outcome of the fusion:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("zero", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("d", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("two", onnx.TensorProto.INT64, shape=[]))
nodes.append(
oh.make_node(
"Unsqueeze", ["two", "zero"], ["SqueezeBinaryUnsqueezePattern_two"]
)
)
nodes.append(
oh.make_node("Div", ["d", "SqueezeBinaryUnsqueezePattern_two"], ["e"])
)
outputs.append(oh.make_tensor_value_info("e", onnx.TensorProto.INT64, shape=(1,)))
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
"""
def __init__(self, verbose: int = 0, priority: int = 0):
super().__init__(verbose, priority)
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "Unsqueeze" or node.domain != "" or g.builder.main_opset < 13:
return self.none()
if not g.is_constant_scalar(node.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
scalar = g.get_constant_scalar(node.input[1])
if scalar != 0:
return self.none(node, inspect.currentframe().f_lineno)
binary = g.node_before(node.input[0])
if binary is None or binary.op_type not in {"Add", "Div", "Mul", "Sub"}:
return self.none(node, inspect.currentframe().f_lineno)
if g.is_used_more_than_once(binary.output[0]):
return self.none(node, inspect.currentframe().f_lineno)
if g.is_used_more_than_once(binary.input[0]):
return self.none(node, inspect.currentframe().f_lineno)
if not g.has_rank(binary.input[1]):
return self.none(node, inspect.currentframe().f_lineno)
if g.get_rank(binary.input[1]) != 0:
return self.none(node, inspect.currentframe().f_lineno)
squeeze = g.node_before(binary.input[0])
if squeeze is None:
return self.none(node, inspect.currentframe().f_lineno)
if len(squeeze.input) != 1:
return self.none(node, inspect.currentframe().f_lineno)
if not g.has_rank(squeeze.input[0]):
print("-------")
print(g.builder.get_debug_msg())
return self.none(node, inspect.currentframe().f_lineno)
if g.get_rank(squeeze.input[0]) != 1:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [squeeze, binary, node], self.apply, insert_at=node)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
squeeze_node: NodeProto,
binary_node: NodeProto,
unsqueeze_node: NodeProto,
) -> List[NodeProto]:
new_name = g.unique_name(f"{self.__class__.__name__}_{binary_node.input[1]}")
new_nodes = [
g.make_node(
"Unsqueeze",
[binary_node.input[1], unsqueeze_node.input[1]],
[new_name],
name=f"{self.__class__.__name__}--{unsqueeze_node.name}",
doc_string=unsqueeze_node.doc_string,
),
g.make_node(
binary_node.op_type,
[squeeze_node.input[0], new_name],
[unsqueeze_node.output[0]],
name=f"{self.__class__.__name__}--{binary_node.name}",
doc_string=binary_node.doc_string,
),
]
return new_nodes