import inspect
from typing import List, Optional
import numpy as np
from onnx import NodeProto
from ..patterns_api import MatchResult, PatternOptimization
[docs]
class SliceSlicePattern(PatternOptimization):
"""
Merges consecutive slices if axis are disjoints.
Model with nodes to be fused:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT, shape=("a", "b")))
inputs.append(oh.make_tensor_value_info("one", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("zero", onnx.TensorProto.INT64, shape=(1,)))
nodes.append(
oh.make_node(
"Constant",
[],
["zero"],
value=onh.from_array(np.array([0], dtype=np.int64), name="value"),
)
)
nodes.append(
oh.make_node(
"Constant",
[],
["one"],
value=onh.from_array(np.array([1], dtype=np.int64), name="value"),
)
)
nodes.append(oh.make_node("Slice", ["X", "zero", "one", "zero"], ["x1"]))
nodes.append(oh.make_node("Slice", ["x1", "zero", "one", "one"], ["Y"]))
outputs.append(oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=("c", "d")))
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
Outcome of the fusion:
.. gdot::
:script: DOT-SECTION
:process:
from experimental_experiment.doc import to_dot
import numpy as np
import ml_dtypes
import onnx
import onnx.helper as oh
import onnx.numpy_helper as onh
opset_imports = [
oh.make_opsetid("", 18),
]
inputs = []
outputs = []
nodes = []
initializers = []
sparse_initializers = []
functions = []
inputs.append(oh.make_tensor_value_info("X", onnx.TensorProto.FLOAT, shape=("a", "b")))
inputs.append(oh.make_tensor_value_info("one", onnx.TensorProto.INT64, shape=(1,)))
inputs.append(oh.make_tensor_value_info("zero", onnx.TensorProto.INT64, shape=(1,)))
nodes.append(
oh.make_node(
"Concat", ["zero", "zero"], ["SliceSlicePattern_zero_start"], axis=0
)
)
nodes.append(
oh.make_node("Concat", ["one", "one"], ["SliceSlicePattern_one_end"], axis=0)
)
nodes.append(
oh.make_node(
"Concat", ["zero", "one"], ["SliceSlicePattern_one_axis"], axis=0
)
)
nodes.append(
oh.make_node(
"Slice",
[
"X",
"SliceSlicePattern_zero_start",
"SliceSlicePattern_one_end",
"SliceSlicePattern_one_axis",
],
["Y"],
)
)
outputs.append(oh.make_tensor_value_info("Y", onnx.TensorProto.FLOAT, shape=("c", "d")))
graph = oh.make_graph(
nodes,
"pattern",
inputs,
outputs,
initializers,
sparse_initializer=sparse_initializers,
)
model = oh.make_model(graph, functions=functions, opset_imports=opset_imports)
print("DOT-SECTION", to_dot(model))
"""
[docs]
def match(
self,
g: "GraphBuilderPatternOptimization", # noqa: F821
node: NodeProto,
matched: List[MatchResult],
) -> Optional[MatchResult]:
if node.op_type != "Slice" or node.domain != "":
return self.none()
before = g.node_before(node.input[0])
if (
before is None
or g.is_used_more_than_once(node.input[0])
or before.op_type != "Slice"
or before.domain != ""
):
return self.none(node, inspect.currentframe().f_lineno)
axis2 = None if len(node.input) < 3 else node.input[3]
axis1 = None if len(before.input) < 3 else before.input[3]
if axis1 is None or axis2 is None:
return self.none(node, inspect.currentframe().f_lineno)
if not g.is_constant(axis1) or not g.is_constant(axis2):
return self.none(node, inspect.currentframe().f_lineno)
cst1 = g.get_computed_constant(axis1)
cst2 = g.get_computed_constant(axis2)
if cst1 is None or cst2 is None:
return self.none(node, inspect.currentframe().f_lineno)
set1 = set(map(int, cst1))
set2 = set(map(int, cst2))
if set1 & set2:
return self.none(node, inspect.currentframe().f_lineno)
return MatchResult(self, [before, node], self.apply, insert_at=node)
[docs]
def apply(
self,
g: "GraphBuilder", # noqa: F821
before: NodeProto,
node: NodeProto,
) -> List[NodeProto]:
# merges slices
new_start = g.unique_name(f"{self.__class__.__name__}_{node.input[1]}_start")
new_end = g.unique_name(f"{self.__class__.__name__}_{node.input[2]}_end")
new_axis = g.unique_name(f"{self.__class__.__name__}_{node.input[3]}_axis")
conc = [
g.make_node(
"Concat",
[before.input[1], node.input[1]],
[new_start],
axis=0,
name=f"{self.__class__.__name__}--{node.name}-start",
),
g.make_node(
"Concat",
[before.input[2], node.input[2]],
[new_end],
axis=0,
name=f"{self.__class__.__name__}--{node.name}-end",
),
g.make_node(
"Concat",
[before.input[3], node.input[3]],
[new_axis],
axis=0,
name=f"{self.__class__.__name__}--{node.name}-axis",
),
]
inputs = [before.input[0], new_start, new_end, new_axis]
if len(node.input) > 4 and len(before.input) > 4:
new_step = g.unique_name(f"{self.__class__.__name__}_{node.input[0]}_step")
conc.append(
g.make_node(
"Concat",
[before.input[4], node.input[4]],
[new_step],
axis=0,
name=f"{self.__class__.__name__}--{node.name}-step",
)
)
inputs.append(new_step)
elif len(node.input) > 4:
one = g.make_initializer(
"", np.array([1], dtype=np.int64), source="SliceSlicePattern.apply.step.1"
)
new_step = g.unique_name(f"{self.__class__.__name__}_{node.input[0]}_step")
conc.append(
g.make_node(
"Concat",
[one, node.input[4]],
[new_step],
axis=0,
name=f"{self.__class__.__name__}--{node.name}-step",
)
)
inputs.append(new_step)
elif len(before.input) > 4:
one = g.make_initializer(
"", np.array([1], dtype=np.int64), source="SliceSlicePattern.apply.step.2"
)
new_step = g.unique_name(f"{self.__class__.__name__}_{node.input[0]}_step")
conc.append(
g.make_node(
"Concat",
[before.input[4], one],
[new_step],
axis=0,
name=f"{self.__class__.__name__}--{node.name}-step",
)
)
inputs.append(new_step)
node = g.make_node(
"Slice",
inputs,
node.output,
name=f"{self.__class__.__name__}--{node.name}",
)
return [*conc, node]