Source code for onnx_extended.tools.einsum.einsum_impl

from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import numpy
from .einsum_impl_classes import EinsumSubOp, GraphEinsumSubOp


[docs] def analyse_einsum_equation( equation: str, ) -> Tuple[str, numpy.ndarray, List[int], List[Optional[Dict[str, List[int]]]]]: """ Analyses an einsum equation. :param equation: :func:`numpy.einsum` equation :return: four results, list of letters, a matrix (see below), lengths of each components, duplicates The returned a matrix is defined as follows: .. math:: m_{ij}=\\left\\{\\begin{array}{ll}-1 & \\text{if letter j is involved in input i} \\\\ p & \\text{p is position of letter j in equation i} \\end{array}\\right. """ spl = equation.strip(" ,").split("->") if len(spl) != 2 or not spl[1] or not spl[0]: raise NotImplementedError( "The function only implements the case when there are " "two sides in the equation: %r." % equation ) inputs = list(map(lambda s: s.strip(), spl[0].split(","))) output = spl[1] all_letters = set(inputs[0]) # Set of letters for inp in inputs[1:]: all_letters |= set(inp) letters = list(sorted(all_letters)) for c in letters: if not (("a" <= c <= "z") or ("A" <= c <= "Z")): raise ValueError( "Equation %r must only contain lower or upper letters " "but %r is not." % (equation, c) ) rev = {c: i for i, c in enumerate(letters)} for c in output: assert ( c in letters ), f"Output contains one unexpected letter {c!r} in equation {equation!r}, letters={letters!r}." mat = numpy.full((len(inputs) + 1, len(letters)), -1, dtype=numpy.int8) for i, inp in enumerate(inputs): for k, c in enumerate(inp): mat[i, rev[c]] = k for k, c in enumerate(output): mat[len(inputs), rev[c]] = k lengths = [len(inp) for inp in inputs] lengths.append(len(output)) # Look for duplicates duplicates: List[Optional[Dict[str, List[int]]]] = [] for inp in inputs + [output]: if len(inp) == len(set(inp)): duplicates.append(None) continue # There is some duplicates. counts: Dict[str, List[int]] = {} for i, c in enumerate(inp): if c in counts: counts[c].append(i) else: counts[c] = [i] duplicates.append(counts) return "".join(letters), mat, lengths, duplicates
[docs] def decompose_einsum_equation( equation: str, *shapes: List[Tuple[int, ...]], strategy: str = "simple", clean: bool = False, verbose: bool = False, ) -> GraphEinsumSubOp: """ Decomposes an equation used in :func:`numpy.einsum` knowing the input shapes. It returns a sequence of operations to do to compute the results. :param equation: a string :param shapes: sequence of input shapes :param strategy: there are different way to decompose the equation, this parameters defines the way to do it (see below) :param clean: clean the unnecessary node in the graph :param verbose: verbosity :return: instance of :class:`GraphEinsumSubOp <onnx_extended.tools.einsum.einsum_impl_classes.GraphEinsumSubOp>` About *strategy*: * `'simple'`: align all dimensions in the alphabetical order, some generic matrix multiplication remains implemented with :func:`numpy.einsum` but only with two matrices aligned on the same dimension (see :func:`numpy_extended_dot <onnx_extended.tools.einsum.einsum_impl_ext.numpy_extended_dot>`) * `'numpy'`: same as `simple` but the decomposition does not use :func:`numpy.einsum` anymore but only multiplication or matrix multiplication merged into a single operator called *batch_dot* (see :func:`numpy_extended_dot_matrix <onnx_extended.tools.einsum.einsum_impl_ext.numpy_extended_dot_matrix>`) Available operations: *expand_dims*, *transpose*, *matmul*, *reduce_sum*, *id*, *squeeze*, *diagonal*. It analyses an equation and produces a graph where nodes are instance of class :class:`EinsumSubOp <onnx_extended.tools.einsum.einsum_impl_classes.EinsumSubOp>`. .. runpython:: :showcode: from onnx_extended.tools.einsum import decompose_einsum_equation seq = decompose_einsum_equation("bac,cd,def->ebc") for op in seq: print(op) It can be better displayed as the following. .. gdot:: :script: DOT-SECTION :process: from onnx_extended.tools.einsum import decompose_einsum_equation seq = decompose_einsum_equation( "bac,cd,def->ebc", (2, 2, 2), (2, 2), (2, 2, 2)) print("DOT-SECTION", seq.to_dot()) """ if shapes: for sh in shapes: if not isinstance(sh, tuple): raise TypeError(f"All shapes must be tuples for {sh!r} is not.") if strategy in ("simple", "numpy"): op_matmul = {"simple": "matmul", "numpy": "batch_dot"} graph = _decompose_einsum_equation_simple( equation, *shapes, verbose=verbose, op_matmul=op_matmul[strategy] ) else: raise ValueError(f"Unknown strategy {strategy!r}.") # Last step: clean unused nodes. if clean: last_node = graph.last_added_op assert isinstance(last_node, EinsumSubOp) graph.append(EinsumSubOp(last_node.full_dim, "id", last_node)) graph.mark_last_node() graph.simplify_mm_nodes(verbose=verbose) graph.remove_duplicate_transpose(verbose=verbose) graph.clean_unused_nodes(verbose=verbose) else: graph.mark_last_node() return graph
[docs] def apply_einsum_sequence( seq: List[numpy.ndarray], *inputs: List[EinsumSubOp], verbose: bool = False, **kwargs: Dict[str, Any], ) -> numpy.ndarray: """ Applies a sequence of operations on a list of inputs. The sequence of operations is produced by function :func:`decompose_einsum_equation`. :param seq: sequence of operations :param inputs: inputs :param verbose: verbosity :param kwargs: additional parameters, see `apply_sequence` in :class:`GraphEinsumSubOp <onnx_extended.tools.einsum.einsum_impl_classes.GraphEinsumSubOp>` :return: output .. runpython:: :showcode: import numpy from onnx_extended.tools.einsum import ( decompose_einsum_equation, apply_einsum_sequence) m1 = numpy.arange(2 * 2 * 2).reshape((2, 2, 2)) + 10 m2 = numpy.arange(4).reshape((2, 2)) + 100 m3 = numpy.arange(8).reshape((2, 2, 2)) + 1000 seq = decompose_einsum_equation("bac,cd,def->ebc") res = apply_einsum_sequence(seq, m1, m2, m3) print(res) """ return seq.apply_sequence(*inputs, verbose=verbose, **kwargs)
def is_transpose_identity(perm: Tuple[int, ...]) -> bool: """ Tells if the permutation *perm* does nothing (itentity). :param perm: permutation :return: boolean """ return list(perm) == list(range(len(perm))) def _basic_verification( lengths: List[int], shapes: List[Tuple[int, ...]], equation: str ): assert len(lengths) - 1 == len( shapes ), "Equation %r has %d inputs but %d shapes are given." "" % ( equation, len(lengths), len(shapes), ) for i, (le, sh) in enumerate(zip(lengths, shapes)): assert le == len( sh ), "Inputs %d has %d dimensions but shapes %r has %d " " in equation %r." % ( i, le, sh, len(sh), equation, ) def _apply_transpose_reshape( op: Union[int, EinsumSubOp], row: str ) -> Iterable[EinsumSubOp]: """ Put all dimensions in the same order. :param op: integer (for one input) or an operator :param row: letter involved in this input (as a vector of binaries) :return: last created operator """ axes = [] p = 0 perm = [] for i, r in enumerate(row): if r == -1: axes.append((p, i)) else: p += 1 perm.append((r, i)) op = EinsumSubOp(len(row), "expand_dims", op, axes=tuple(axes)) yield op perm.sort() p = 0 new_perm = numpy.arange(len(row)) for i, r in enumerate(row): if r == -1: continue new_perm[perm[p][1]] = i p += 1 if not is_transpose_identity(new_perm): op = EinsumSubOp(len(row), "transpose", op, perm=tuple(new_perm)) yield op def _apply_squeeze_transpose( op: Union[int, EinsumSubOp], row_last: str, row_output: List[int] ) -> Iterable[EinsumSubOp]: """ Puts output dimension in the expected order. """ perm = [] sq = [] for i, d in enumerate(row_output): if d == -1: sq.append(i) else: perm.append((d, i)) perm.sort() new_perm = numpy.arange(len(row_last)) p = 0 for i, d in enumerate(row_output): if d == -1: continue new_perm[i] = perm[p][1] p += 1 perm = [p[1] for p in perm] if not is_transpose_identity(new_perm): op = EinsumSubOp(len(row_last), "transpose", op, perm=tuple(new_perm)) yield op if sq: op = EinsumSubOp(len(row_last), "squeeze", op, axes=tuple(sq)) yield op def _apply_einsum_matmul( fd, op1, op2, axes, left, right, ndim, op_matmul, row1, row2, verbose=False ) -> Iterable[EinsumSubOp]: """ Decomposes the generic matrix multiplication into numpy operations depending on the operator to use for matrix multiplication *op_matmul* (see :func:`decompose_einsum_equation`). """ allowed = {"matmul", "batch_dot", "dot"} assert ( op_matmul in allowed ), f"Unknown operator op_matmul={op_matmul!r} not in {allowed!r}." if op_matmul == "matmul": if verbose: print(f" -- MATMUL -> matmul axes={axes!r} left={left!r} right={right!r}") yield EinsumSubOp( fd, "matmul", op1, op2, axes=axes, left=left, right=right, ndim=ndim ) elif len(axes) == 0 and not (set(left) & set(right)): if verbose: print(f" -- MATMUL -> mul axes={axes!r} left={left!r} right={right!r}") yield EinsumSubOp(fd, "mul", op1, op2) elif not (set(axes) & set(left)) and not (set(axes) & set(right)): # No intersection between axes and right: matrix multiplication if verbose: print( " -- MATMUL -> batch_dot axes=%r left=%r right=%r" "" % (axes, left, right) ) all_axes = set(left) | set(right) | set(axes) common_axes = list(set(left) & set(right)) for i in range(ndim): if i not in all_axes: common_axes.append(i) common_axes.sort() # ReduceSum* has_dim = set(i for i in range(len(row1)) if row1[i] >= 0) right_no_left = (set(right) & has_dim) - (set(right) & (set(left) | set(axes))) if right_no_left: if verbose: print(f" -- MATMUL reduce1 has_dim={has_dim!r} axes={right_no_left!r}") op1 = EinsumSubOp( fd, "reduce_sum_mm", op1, op2, axes=tuple(sorted(right_no_left)) ) yield op1 has_dim = set(i for i in range(len(row2)) if row2[i] >= 0) left_no_right = (set(left) & has_dim) - (set(left) & (set(right) | set(axes))) if left_no_right: if verbose: print(f" -- MATMUL reduce2 has_dim={has_dim!r} axes={left_no_right!r}") op2 = EinsumSubOp(fd, "reduce_sum", op2, axes=tuple(sorted(left_no_right))) yield op2 # Transpose i_axes = [ (-1 if i in common_axes else (1 if i in axes else 0), i) for i in range(ndim) ] i_axes.sort() perm = [_[1] for _ in i_axes] perm_left = [i for i in range(len(perm)) if perm[i] in left] perm_right = [i for i in range(len(perm)) if perm[i] in right] if not is_transpose_identity(perm): op1 = EinsumSubOp(fd, "transpose_mm", op1, op2, perm=tuple(perm)) yield op1 op2 = EinsumSubOp(fd, "transpose", op2, perm=tuple(perm)) yield op2 # Reshape all_axes = list(range(0, ndim)) new_axes = all_axes[-len(axes) :] if len(axes) > 0 else [] new_common_axes = all_axes[: len(common_axes)] not_in_both = [] for i in range(0, ndim): if i not in left and i not in right and i not in common_axes: not_in_both.append(i) op = EinsumSubOp( fd, "batch_dot", op1, op2, batch_axes=tuple(new_common_axes), keep_axes=None, sum_axes=tuple(new_axes), left=tuple(perm_left), right=tuple(perm_right), ndim=ndim, ) yield op # Transpose again ordered_axes = ( common_axes + list(i for i in left if i not in right) + list(i for i in right if i not in left) + not_in_both ) rev_perm = [(a, i) for i, a in enumerate(ordered_axes)] rev_perm.sort() rev_perm = [p[1] for p in rev_perm] if not is_transpose_identity(rev_perm): op_unused = EinsumSubOp(fd, "transpose_mm", op1, op, perm=tuple(rev_perm)) yield op_unused op = EinsumSubOp(fd, "transpose", op, perm=tuple(rev_perm)) yield op else: raise NotImplementedError( "axes and right or left have axes in common, " "axes=%r left=%r right=%r ndim=%r." % (axes, left, right, ndim) ) def _decompose_einsum_equation_simple( equation: str, *shapes: List[Tuple[int, ...]], verbose: bool = False, op_matmul: str = "matmul", ) -> GraphEinsumSubOp: """ Applies strategy `simple`, `numpy` defined in by function :func:`decompose_einsum_equation`. :param equation: equation :param shapes: input shapes :param verbose: verbosity :param op_matmul: which operator to use for matrix multiplication, a single operator *matmul*, or *batch_dot* with *transposes*, *reduce_sum*, or just *dot* """ letters, mat, lengths, duplicates = analyse_einsum_equation(equation) assert ( len(letters) == mat.shape[1] ), f"Unexpected number of letters {letters!r}, shape={mat.shape!r}." if not shapes: shapes = [(2,) * le for le in lengths[:-1]] _basic_verification(lengths, shapes, equation) # last_row, current_row (row = shape) rows = numpy.full((2, mat.shape[1]), -1) graph = GraphEinsumSubOp(letters, mat, lengths, duplicates) fd = mat.shape[1] if verbose: print(f"EQUATION={equation!r}") print(f"LETTERS={letters!r}", f"LENGTHS={lengths!r}") print(f"DUPLICATES={duplicates!r}") for i, sh in enumerate(shapes): if verbose: print() print("######### ROW %d shape=%r row=%r" % (i, sh, rows[1, :])) graph.append(i) # Input matrix aligned to the same dimensions. op = EinsumSubOp(fd, "id", i) op.compute_output_row(rows[1, :], mat[i, :], verbose=verbose) marked = graph.append(op) duplicate = duplicates[i] if duplicate is not None: # Diagonal diag = [] for _, v in duplicate.items(): if len(v) == 1: continue diag.append((v[0], tuple(v))) op = EinsumSubOp(fd, "diagonal", op, diag=diag) op.compute_output_row(rows[1, :], mat[i, :], verbose=verbose) tr_row = rows[1, :] marked = graph.append(op) else: diag = None tr_row = mat[i] for op in _apply_transpose_reshape(op, tr_row): op.compute_output_row(rows[1, :], verbose=verbose) marked = graph.append(op) # Reduction? (a dimension not used later) red = [] for d in range(0, mat.shape[1]): if mat[i + 1 :, d].max() == -1 and rows[1, d] != -1 and rows[0, d] == -1: red.append(d) if red: if verbose: print(" -- REDUCE1 row=%d axes=%r" % (i, red)) print(mat) print(" -") print(rows) op = EinsumSubOp(fd, "reduce_sum", graph.last_added_op, axes=tuple(red)) op.compute_output_row(rows[1, :], verbose=verbose) marked = graph.append(op) if graph.last_op is not None: # Matrix multiplication? common_dims = [] left = [] right = [] for d in range(0, mat.shape[1]): if rows[:, d].min() >= 0: if mat[i + 1 :, d].max() >= 0: left.append(d) right.append(d) else: common_dims.append(d) else: if rows[0, d] >= 0: left.append(d) if rows[1, d] >= 0: right.append(d) if verbose: print(f" -- MATMUL common_dims={common_dims!r}") print(rows) for iop in _apply_einsum_matmul( fd, graph.last_op, op, axes=tuple(common_dims), left=tuple(left), right=tuple(right), ndim=rows.shape[1], op_matmul=op_matmul, row1=rows[0, :], row2=rows[1, :], verbose=verbose, ): op = iop op.compute_output_row(rows[0, :], rows[1, :], ab=True, verbose=verbose) marked = graph.append(op) # End graph.mark(i, marked) rows[0, :] = rows[1, :] # Final output if verbose: print() print(f"######### FIN row={rows[1, :]!r}") if mat[len(shapes), :].max() >= 0: rows[1, :] = mat[len(shapes), :] red = [] for d in range(0, mat.shape[1]): if rows[0, d] > 0 and rows[1, d] == -1: red.append(d) elif rows[0, d] == -1 and rows[1, d] >= 0: raise RuntimeError( "Issue in equation %r, variable %d, last_result is %r, " "output is %r." % (equation, d, rows[0, :], rows[1, :]) ) if red: if verbose: print(f"-- REDUCE2 axes={red!r}") print(mat) op = EinsumSubOp(fd, "reduce_sum", op, axes=tuple(red)) graph.append(op) op.compute_output_row(rows[1, :], verbose=verbose) # Removes empty axes. for op in _apply_squeeze_transpose(op, rows[1, :], mat[len(shapes), :]): op.compute_output_row(rows[1, :], verbose=verbose) graph.append(op) return graph