import inspect
from typing import List, Optional
from onnx import NodeProto
from ..patterns_api import MatchResult, PatternOptimization
[docs]
class SequenceConstructAtPattern(PatternOptimization):
"""
Replaces the sequence ``SequenceConstruct(x1, x2, ...)`` followed
by ``SequenceAt(seq, 0)``, ``SequenceAt(seq, 1)``, ...
Model with nodes to be fused:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("X1", onnx.TensorProto.FLOAT, shape=("a", "b")))
inputs.append(oh.make_tensor_value_info("X2", onnx.TensorProto.FLOAT, shape=("c", "d")))
nodes.append(
oh.make_node(
"Constant",
[],
["i0"],
value=onh.from_array(np.array(0, dtype=np.int64), name="value"),
)
)
nodes.append(
oh.make_node(
"Constant",
[],
["i1"],
value=onh.from_array(np.array(1, dtype=np.int64), name="value"),
)
)
nodes.append(oh.make_node("SequenceConstruct", ["X1", "X2"], ["seq"]))
nodes.append(oh.make_node("SequenceAt", ["seq", "i0"], ["Y1"]))
nodes.append(oh.make_node("SequenceAt", ["seq", "i1"], ["Y2"]))
outputs.append(
oh.make_tensor_value_info("Y1", onnx.TensorProto.FLOAT, shape=("a", "b"))
)
outputs.append(
oh.make_tensor_value_info("Y2", onnx.TensorProto.FLOAT, shape=("c", "d"))
)
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
Outcome of the fusion:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("X1", onnx.TensorProto.FLOAT, shape=("a", "b")))
inputs.append(oh.make_tensor_value_info("X2", onnx.TensorProto.FLOAT, shape=("c", "d")))
nodes.append(oh.make_node("Identity", ["X1"], ["Y1"]))
nodes.append(oh.make_node("Identity", ["X2"], ["Y2"]))
outputs.append(
oh.make_tensor_value_info("Y1", onnx.TensorProto.FLOAT, shape=("a", "b"))
)
outputs.append(
oh.make_tensor_value_info("Y2", onnx.TensorProto.FLOAT, shape=("c", "d"))
)
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
"""
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "SequenceConstruct" or node.domain != "":
return self.none()
next_nodes = g.next_nodes(node.output[0])
if len(next_nodes) != len(node.input):
return self.none(node, inspect.currentframe().f_lineno)
if any(n.op_type != "SequenceAt" for n in next_nodes):
return self.none(node, inspect.currentframe().f_lineno)
ats = [n.input[1] for n in next_nodes]
if any(not g.is_constant_scalar(a) for a in ats):
return self.none(node, inspect.currentframe().f_lineno)
cst = [g.get_constant_scalar(a) for a in ats]
if set(cst) != set(range(len(ats))):
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [node, *next_nodes], self.apply, insert_at=node)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
node_seq: NodeProto,
*node_ats: NodeProto,
) -> List[NodeProto]:
assert len(node_seq.input) == len(
node_ats
), f"Matching failed because len({node_seq.input}) != {len(node_ats)}"
new_nodes = []
for n in node_ats:
i = g.get_constant_scalar(n.input[1])
new_nodes.append(
g.make_node(
"Identity",
[node_seq.input[i]],
n.output,
name=f"{self.__class__.__name__}--{node_seq.name}",
)
)
return new_nodes