Source code for experimental_experiment.gradient.loss_helper

from collections import OrderedDict
from typing import Any, Dict, List, Optional, Set, Tuple
import numpy
from onnx import ModelProto
from onnx.mapping import TENSOR_TYPE_TO_NP_TYPE
from onnx.numpy_helper import to_array, from_array
from onnx.helper import (
    make_node,
    make_graph,
    make_model,
    make_tensor_value_info,
    set_model_props,
)
from onnx import TensorProto


def _unique_name(existing_names: Set[str], name: str) -> str:
    """
    Returns a name different from any name in *existing_names*
    and adds the returned name to that set.

    :param existing_names: set of existing names
    :param name: suggested name
    :return: unique name
    """
    if name not in existing_names:
        existing_names.add(name)
        return name
    name0 = name
    i = 2
    while name in existing_names:
        name = "%s_%d" % (name0, i)
        i += 1
    existing_names.add(name)
    return name
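
# Illustrative example (not part of the original module): _unique_name mutates
# the set it receives, so repeated calls keep producing fresh names.
#
#     names = {"loss_diff"}
#     _unique_name(names, "loss_diff")  # -> "loss_diff_2"
#     _unique_name(names, "loss_diff")  # -> "loss_diff_3"
#     _unique_name(names, "other")      # -> "other"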


def _loss_l1(
    existing_names: Set[str],
    elem: int,
    shape: Tuple[int, ...],
    output_name: str,
    label_name: str,
    weight_name: Optional[str],
    loss_name: str,
):
    """
    Implements loss l1.
    """
    diff_name = _unique_name(existing_names, "loss_diff")
    diff2_name = _unique_name(existing_names, "loss_diff")
    nodes = [
        make_node("Sub", [output_name, label_name], [diff_name]),
        make_node("Abs", [diff_name], [diff2_name]),
    ]
    if weight_name is not None:
        res_name = _unique_name(existing_names, "loss_diff_weight")
        nodes.append(make_node("Mul", [diff2_name, weight_name], [res_name]))
    else:
        res_name = diff2_name
    nodes.append(make_node("ReduceSum", [res_name], [loss_name]))

    inputs = [make_tensor_value_info(label_name, elem, shape)]
    if weight_name is not None:
        inputs.append(make_tensor_value_info(weight_name, elem, [shape[0]]))
    return ([], inputs, nodes, [make_tensor_value_info(loss_name, elem, [1, 1])])
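
# For reference (illustrative sketch, not part of the original module): with
# predictions yp, expected values yt and optional weights w (assuming
# broadcast-compatible shapes), the subgraph built by _loss_l1 computes
#
#     loss = numpy.sum(numpy.abs(yp - yt) * w)
#
# or numpy.sum(numpy.abs(yp - yt)) when weight_name is None.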


def _loss_l2(
    existing_names: Set[str],
    elem: int,
    shape: Tuple[int, ...],
    output_name: str,
    label_name: str,
    weight_name: Optional[str],
    loss_name: str,
):
    """
    Implements loss l2.
    """
    diff_name = _unique_name(existing_names, "loss_diff")
    diff2_name = _unique_name(existing_names, "loss_diff")
    nodes = [
        make_node("Sub", [output_name, label_name], [diff_name]),
        make_node("Mul", [diff_name, diff_name], [diff2_name]),
    ]
    if weight_name is not None:
        res_name = _unique_name(existing_names, "loss_diff_weight")
        nodes.append(make_node("Mul", [diff2_name, weight_name], [res_name]))
    else:
        res_name = diff2_name
    nodes.append(make_node("ReduceSum", [res_name], [loss_name]))

    inputs = [make_tensor_value_info(label_name, elem, shape)]
    if weight_name is not None:
        inputs.append(make_tensor_value_info(weight_name, elem, [shape[0]]))
    return ([], inputs, nodes, [make_tensor_value_info(loss_name, elem, [1, 1])])
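
# For reference (illustrative sketch, not part of the original module): the
# subgraph built by _loss_l2 is the squared counterpart, i.e.
#
#     loss = numpy.sum((yp - yt) ** 2 * w)
#
# or numpy.sum((yp - yt) ** 2) when weight_name is None.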


def _loss_elastic(
    existing_names: Set[str],
    elem: int,
    shape: Tuple[int, ...],
    output_name: str,
    label_name: str,
    weight_name: Optional[str],
    loss_name: str,
    l1_weight: float = 0.5,
    l2_weight: float = 0.5,
):
    """
    Implements mixture of losses l1 and l2.
    """
    l1_name = _unique_name(existing_names, "l1_name")
    l2_name = _unique_name(existing_names, "l2_name")
    dtype = TENSOR_TYPE_TO_NP_TYPE[elem]
    onx_l1_weight = from_array(numpy.array([l1_weight], dtype=dtype), name=l1_name)
    onx_l2_weight = from_array(numpy.array([l2_weight], dtype=dtype), name=l2_name)
    inits = [onx_l1_weight, onx_l2_weight]

    diff_name = _unique_name(existing_names, "loss_diff")
    diff1_name = _unique_name(existing_names, "loss_l1")
    diff2_name = _unique_name(existing_names, "loss_l2")
    wl1_name = _unique_name(existing_names, "loss_l1")
    wl2_name = _unique_name(existing_names, "loss_l2")
    final_loss = _unique_name(existing_names, "final_loss")
    nodes = [
        make_node("Sub", [output_name, label_name], [diff_name]),
        make_node("Mul", [diff_name, diff_name], [diff2_name]),
        make_node("Abs", [diff_name], [diff1_name]),
        make_node("Mul", [diff1_name, l1_name], [wl1_name]),
        make_node("Mul", [diff2_name, l2_name], [wl2_name]),
        make_node("Add", [wl1_name, wl2_name], [final_loss]),
    ]
    if weight_name is not None:
        res_name = _unique_name(existing_names, "loss_diff_weight")
        nodes.append(make_node("Mul", [final_loss, weight_name], [res_name]))
    else:
        res_name = final_loss
    nodes.append(make_node("ReduceSum", [res_name], [loss_name]))

    inputs = [make_tensor_value_info(label_name, elem, shape)]
    if weight_name is not None:
        inputs.append(make_tensor_value_info(weight_name, elem, [shape[0]]))
    return (inits, inputs, nodes, [make_tensor_value_info(loss_name, elem, [1, 1])])
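
# For reference (illustrative sketch, not part of the original module): the
# subgraph built by _loss_elastic combines both terms before the reduction,
#
#     d = yp - yt
#     loss = numpy.sum((l1_weight * numpy.abs(d) + l2_weight * d ** 2) * w)
#
# with the multiplication by w skipped when weight_name is None.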


def _loss_log(
    existing_names: Set[str],
    elem: int,
    shape: Tuple[int, ...],
    output_name: str,
    label_name: str,
    weight_name: Optional[str],
    loss_name: str,
    eps: float = 1e-6,
):
    """
    This only works for a binary classification.
    The log loss is `log(yt, yp) = -(1-yt)\\log(1-yp) - yt\\log(yp)`
    where *yp* is the predicted probability and *yt* is the expected
    probability. *yt* is expected to be binary, *yp* is a matrix with
    two columns whose sum on every line is 1.
    Parameter *eps* is used to avoid computing *log(0)*.
    """
    if output_name == "output_label":
        raise RuntimeError(  # pragma: no cover
            f"output_name={output_name!r}, log loss does not work on labels."
        )
    dtype = TENSOR_TYPE_TO_NP_TYPE[elem]
    one_name = _unique_name(existing_names, "one_name")
    eps_name = _unique_name(existing_names, "eps_name")
    eps1_name = _unique_name(existing_names, "eps1_name")
    axes_name = _unique_name(existing_names, "axes_name")

    eps_init = from_array(numpy.array([eps], dtype=dtype), name=eps_name)
    one_init = from_array(numpy.array([1], dtype=dtype), name=one_name)
    eps1_init = from_array(numpy.array([1 - eps], dtype=dtype), name=eps1_name)
    axes_init = from_array(numpy.array([1], dtype=numpy.int64), name=axes_name)

    clip_name = _unique_name(existing_names, "clip_name")
    clip_red_name = _unique_name(existing_names, "clip_red_name")
    new_output_name = _unique_name(existing_names, "new_output_name")
    cast_name = _unique_name(existing_names, "cast_name")
    log_name = _unique_name(existing_names, "log_name")
    subl_name = _unique_name(existing_names, "subl_name")
    conc_name = _unique_name(existing_names, "conc_name")
    mul_name = _unique_name(existing_names, "mul_name")
    like_name = _unique_name(existing_names, "like_name")

    nodes = [
        make_node("Clip", [output_name, eps_name, eps1_name], [clip_name]),
        make_node("ReduceSum", [clip_name, axes_name], [clip_red_name], keepdims=1),
        make_node("Div", [clip_name, clip_red_name], [new_output_name]),
        make_node("Log", [new_output_name], [log_name]),
        make_node("Cast", [label_name], [cast_name], to=elem),
        make_node("Sub", [one_name, cast_name], [subl_name]),
        make_node("Concat", [subl_name, cast_name], [conc_name], axis=1),
        make_node("Mul", [log_name, conc_name], [mul_name]),
        make_node("ReduceSum", [mul_name, axes_name], [like_name], keepdims=1),
    ]

    inputs = [make_tensor_value_info(label_name, TensorProto.INT64, shape)]

    if weight_name is not None:
        inputs.append(make_tensor_value_info(weight_name, elem, [shape[0]]))
        likew_name = _unique_name(existing_names, "likew_name")
        nodes.append(make_node("Mul", [like_name, weight_name], [likew_name]))
        like_name = likew_name

    shape_name = _unique_name(existing_names, "shape_name")
    onx_shape = from_array(numpy.array([1, 1], dtype=numpy.int64), name=shape_name)
    reduced_loss = _unique_name(existing_names, "reduced_loss")
    neg_reduced_loss = _unique_name(existing_names, "neg_reduced_loss")
    nodes.extend(
        [
            make_node("ReduceMean", [like_name], [reduced_loss]),
            make_node("Neg", [reduced_loss], [neg_reduced_loss]),
            make_node("Reshape", [neg_reduced_loss, shape_name], [loss_name]),
        ]
    )

    return (
        [eps_init, eps1_init, one_init, axes_init, onx_shape],
        inputs,
        nodes,
        [make_tensor_value_info(loss_name, elem, [1, 1])],
    )
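
# For reference (illustrative sketch, not part of the original module): for a
# probability matrix yp of shape (n, 2) and binary labels yt of shape (n, 1),
# the subgraph built by _loss_log matches the following numpy computation,
#
#     p = numpy.clip(yp, eps, 1 - eps)
#     p = p / p.sum(axis=1, keepdims=True)
#     like = (1 - yt) * numpy.log(p[:, :1]) + yt * numpy.log(p[:, 1:])
#     loss = -numpy.mean(like)
#
# with the per-row weights, if any, multiplied into `like` before the mean.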


def penalty_loss_onnx(
    name: str,
    dtype: Any,
    l1: Optional[float] = None,
    l2: Optional[float] = None,
    existing_names: Optional[Set[str]] = None,
):
    """
    Returns onnx nodes to compute :math:`|w| \\alpha + w^2 \\beta`
    where :math:`\\alpha=l1` and :math:`\\beta=l2`.

    :param name: name of weights
    :param dtype: numpy dtype
    :param l1: coefficient for L1 norm
    :param l2: coefficient for L2 norm
    :param existing_names: names already taken in the ONNX graph
    :return: initializers, nodes
    """
    suffix = name
    cst_shape = _unique_name(existing_names, f"shape_{suffix}")
    new_name = _unique_name(existing_names, f"reshaped_{suffix}")
    inits = [from_array(numpy.array([-1], dtype=numpy.int64), name=cst_shape)]
    nodes = [make_node("Reshape", [name, cst_shape], [new_name])]
    name = new_name

    if l1 is None or l1 == 0:
        if l2 is None or l2 == 0:
            raise ValueError(  # pragma: no cover
                f"l1 and l2 cannot be null or None at the same time, name={name!r}."
            )
        l2_name = _unique_name(existing_names, f"l2_weight_{suffix}")
        inits.extend([from_array(numpy.array([l2], dtype=dtype), name=l2_name)])
        mul_name = _unique_name(existing_names, f"reduced0_{suffix}")
        red_name = _unique_name(existing_names, f"reduced_{suffix}")
        pen_name = _unique_name(existing_names, f"penalty_{suffix}")
        nodes.extend(
            [
                make_node("Mul", [name, name], [mul_name]),
                make_node("ReduceSum", [mul_name], [red_name]),
                make_node("Mul", [red_name, l2_name], [pen_name]),
            ]
        )
        return inits, nodes

    if l2 is None or l2 == 0:
        l1_name = _unique_name(existing_names, f"l1_weight_{suffix}")
        inits.extend([from_array(numpy.array([l1], dtype=dtype), name=l1_name)])
        red_name = _unique_name(existing_names, f"reduced_{suffix}")
        abs_name = _unique_name(existing_names, f"absolute_{suffix}")
        pen_name = _unique_name(existing_names, f"penalty_{suffix}")
        nodes.extend(
            [
                make_node("Abs", [name], [abs_name]),
                make_node("ReduceSum", [abs_name], [red_name]),
                make_node("Mul", [red_name, l1_name], [pen_name]),
            ]
        )
        return inits, nodes

    l1_name = _unique_name(existing_names, f"l1_weight_{suffix}")
    l2_name = _unique_name(existing_names, f"l2_weight_{suffix}")
    inits.extend(
        [
            from_array(numpy.array([l1], dtype=dtype), name=l1_name),
            from_array(numpy.array([l2], dtype=dtype), name=l2_name),
        ]
    )
    red_name1 = _unique_name(existing_names, f"reduced1_{suffix}")
    mul_name = _unique_name(existing_names, f"reducedm_{suffix}")
    red_name2 = _unique_name(existing_names, f"reduced2_{suffix}")
    abs_name = _unique_name(existing_names, f"absolute_{suffix}")
    pen_name1 = _unique_name(existing_names, f"penalty1_{suffix}")
    pen_name2 = _unique_name(existing_names, f"penalty2_{suffix}")
    pen_name = _unique_name(existing_names, f"penalty_{suffix}")
    nodes.extend(
        [
            make_node("Mul", [name, name], [mul_name]),
            make_node("ReduceSum", [mul_name], [red_name2]),
            make_node("Mul", [red_name2, l2_name], [pen_name2]),
            make_node("Abs", [name], [abs_name]),
            make_node("ReduceSum", [abs_name], [red_name1]),
            make_node("Mul", [red_name1, l1_name], [pen_name1]),
            make_node("Add", [pen_name1, pen_name2], [pen_name]),
        ]
    )
    return inits, nodes
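

# For reference (illustrative sketch, not part of the original module): the
# penalty produced by penalty_loss_onnx for a weight tensor w corresponds to
#
#     pen = l1 * numpy.sum(numpy.abs(w)) + l2 * numpy.sum(w ** 2)
#
# with a term dropped when the corresponding coefficient is None or 0.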


def get_train_initializer(onx: ModelProto):
    """
    Returns the list of initializers to train.

    :return: dictionary `{name: (value, tensor)}`

    The function walks through the list of initializers and
    returns all tensors with elements of type float16, float or double.
    """
    res = OrderedDict()
    for init in onx.graph.initializer:
        if init.data_type in (
            TensorProto.FLOAT16,  # pylint: disable=E1101
            TensorProto.FLOAT,  # pylint: disable=E1101
            TensorProto.DOUBLE,
        ):  # pylint: disable=E1101
            res[init.name] = (to_array(init), init)
    return res
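

# Illustrative usage (sketch, `onx` is a hypothetical ModelProto): collect the
# floating-point initializers of a model before training.
#
#     trainable = get_train_initializer(onx)
#     # e.g. {"coef": (array(...), TensorProto), "intercept": (array(...), TensorProto)}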


def add_loss_output(
    onx: ModelProto,
    score_name: str = "squared_error",
    loss_name: str = "loss",
    label_name: str = "label",
    weight_name: Optional[str] = None,
    penalty: Optional[Dict[str, float]] = None,
    output_index: Optional[int] = None,
    **kwargs: Optional[Dict[str, Any]],
) -> ModelProto:
    """
    Modifies an ONNX graph to add operators to score and allow training.

    :param onx: onx graph
    :param score_name: name of the score
    :param loss_name: name of the output loss
    :param label_name: name of the label input
    :param weight_name: None or any value to consider weight
        while computing loss
    :param penalty: dictionary similar to the following one
        `{ weight_name: {'l1': alpha, 'l2': beta} }` or
        `{ weight_name: beta}`,
        it adds an L1 and/or L2 penalty to one input or initializer,
        penalty = :math:`|w| \\alpha + w^2 \\beta`
    :param output_index: the output used to compute the loss,
        if None, the function assumes there is only one output,
        it must be specified if there are more than 1,
        it can be an integer or a string (output name)
    :param kwargs: additional arguments for losses (see below)
    :return: modified graph

    Possible values for *score_name*:

    * `'squared_error'` or `'l2'`: :math:`\\sum_i{(f(x_i)-y_i)^2}` or
      :math:`\\sum_i{w_i (f(x_i)-y_i)^2}` if *weight_name* is not None
    * `'absolute_error'` or `'l1'`: :math:`\\sum_i{|f(x_i)-y_i|}` or
      :math:`\\sum_i{w_i |f(x_i)-y_i|}` if *weight_name* is not None
    * `'elastic'`: mixture of losses, kwargs must define *l1_weight*
      and *l2_weight*, if undefined, default values are 0.5
    * `'log'`: log loss :math:`-(1-yt)\\log(1-yp) - yt\\log(yp)`,
      this only works for a binary classification where *yp* is the
      predicted probability, *yt* is the expected probability.
      *yt* is expected to be binary, *yp* is a matrix with two
      columns, the sum on every line is 1.

    The next example shows an elastic loss mixing L1 and L2.

    .. gdot::
        :script: DOT-SECTION

        import numpy
        from onnx.defs import onnx_opset_version
        from sklearn.datasets import make_regression
        from sklearn.model_selection import train_test_split
        from sklearn.linear_model import LinearRegression
        from skl2onnx import to_onnx
        from onnx_array_api.plotting.dot_plot import to_dot
        from experimental_experiment.gradient.loss_helper import add_loss_output

        opset = onnx_opset_version() - 2
        X, y = make_regression(100, n_features=10, bias=2, random_state=0)
        X = X.astype(numpy.float32)
        y = y.astype(numpy.float32)
        w = (numpy.random.rand(y.shape[0]) + 1).astype(X.dtype)
        X_train, _, y_train, __, w_train, ___ = train_test_split(X, y, w)

        reg = LinearRegression()
        reg.fit(X_train, y_train, sample_weight=w_train)
        reg.coef_ = reg.coef_.reshape((1, -1))

        onx = to_onnx(reg, X_train, target_opset=opset, black_op={"LinearRegressor"})
        onx_loss = add_loss_output(
            onx, weight_name="weight", score_name="elastic", l1_weight=0.1, l2_weight=0.9
        )

        print("DOT-SECTION", to_dot(onx_loss))

    The next example shows how to add an L2 loss with L1 and L2 penalties
    on the coefficients.

    .. gdot::
        :script: DOT-SECTION

        import numpy
        from onnx.defs import onnx_opset_version
        from sklearn.datasets import make_regression
        from sklearn.model_selection import train_test_split
        from sklearn.linear_model import LinearRegression
        from skl2onnx import to_onnx
        from onnx_array_api.plotting.dot_plot import to_dot
        from experimental_experiment.gradient.loss_helper import add_loss_output

        opset = onnx_opset_version() - 2
        X, y = make_regression(100, n_features=10, bias=2, random_state=0)
        X = X.astype(numpy.float32)
        y = y.astype(numpy.float32)
        w = (numpy.random.rand(y.shape[0]) + 1).astype(X.dtype)
        X_train, _, y_train, __, w_train, ___ = train_test_split(X, y, w)

        reg = LinearRegression()
        reg.fit(X_train, y_train, sample_weight=w_train)
        reg.coef_ = reg.coef_.reshape((1, -1))

        onx = to_onnx(reg, X_train, target_opset=opset, black_op={"LinearRegressor"})
        onx_loss = add_loss_output(
            onx,
            weight_name="weight",
            score_name="elastic",
            penalty={"coef": {"l1": 0.5, "l2": 0.5},
                     "intercept": {"l1": 0.5, "l2": 0.5}},
        )

        print("DOT-SECTION", to_dot(onx_loss))
    """
    from onnx_array_api.graph_api import GraphBuilder

    # rename every intermediate output called "label"
    def _replace(ens):
        for i in range(len(ens)):  # pylint: disable=C0200
            if ens[i] == "label":
                ens[i] = "_label_"

    for node in onx.graph.node:
        if "_label_" in node.input or "_label_" in node.output:
            raise RuntimeError(  # pragma: no cover
                "One intermediate result contains '_label_'. "
                "It should be removed manually.\n%r" % node
            )
        _replace(node.input)
        _replace(node.output)

    if output_index is None:
        if len(onx.graph.output) != 1:
            raise ValueError(  # pragma: no cover
                "Unable to guess the output to compare to the "
                "expected labels among %r." % ([o.name for o in onx.graph.output])
            )
        outputs = onx.graph.output
        output_index = 0
    elif isinstance(output_index, int):
        outputs = [onx.graph.output[output_index]]
    elif isinstance(output_index, str):
        outputs = [(i, o) for i, o in enumerate(onx.graph.output) if o.name == output_index]
        if len(outputs) != 1:
            raise ValueError(  # pragma: no cover
                "Unable to find output %r in %r."
                % (output_index, [o.name for o in onx.graph.output])
            )
        output_index = outputs[0][0]
        outputs = [outputs[0][1]]
    else:
        raise TypeError(  # pragma: no cover
            f"output_index must be an integer or a str not {type(output_index)!r}."
        )

    existing_names = []
    for node in onx.graph.node:
        existing_names.extend(node.output)
        existing_names.extend(node.input)
    existing_names = set(existing_names)

    output_onx = onx.graph.output[output_index]
    output_name = output_onx.name
    elem = output_onx.type.tensor_type.elem_type
    if elem == 0:
        raise TypeError(  # pragma: no cover
            f"Unable to guess input tensor type from {output_onx!r}."
        )
    shape = []
    for d in output_onx.type.tensor_type.shape.dim:
        shape.append(d.dim_value if d.dim_value > 0 else None)

    if score_name in ("squared_error", "l2"):
        inits, inputs, nodes, outputs = _loss_l2(
            existing_names, elem, shape, output_name, label_name, weight_name, loss_name
        )
    elif score_name in ("absolute_error", "l1"):
        inits, inputs, nodes, outputs = _loss_l1(
            existing_names, elem, shape, output_name, label_name, weight_name, loss_name
        )
    elif score_name == "elastic":
        inits, inputs, nodes, outputs = _loss_elastic(
            existing_names,
            elem,
            shape,
            output_name,
            label_name,
            weight_name,
            loss_name,
            **kwargs,
        )
    elif score_name == "log":
        shape = (None, 1)
        inits, inputs, nodes, outputs = _loss_log(
            existing_names,
            elem,
            shape,
            output_name,
            label_name,
            weight_name,
            loss_name,
            **kwargs,
        )
    else:
        raise NotImplementedError(  # pragma: no cover
            f"Unexpected {score_name!r} value for score_name."
        )

    if penalty is not None:
        final_name = nodes[-1].output[0]
        loss_name = _unique_name(existing_names, "loss_diff")
        nodes[-1].output[0] = loss_name
        names = []
        for k, v in penalty.items():
            if isinstance(v, float):
                v = {"l2": v}
            inits_to_add, nodes_to_add = penalty_loss_onnx(
                k,
                dtype=TENSOR_TYPE_TO_NP_TYPE[elem],
                existing_names=existing_names,
                **v,
            )
            names.append(nodes_to_add[-1].output[0])
            nodes.extend(nodes_to_add)
            inits.extend(inits_to_add)

        # Operator Sum does not have a gradient.
        if len(names) == 1:
            pen_name = names[0]
        else:
            current = names[0]
            for i in range(1, len(names)):
                new_name = _unique_name(existing_names, "sumop")
                nodes.append(make_node("Add", [current, names[i]], [new_name]))
                current = new_name
            pen_name = current

        cst_shape = _unique_name(existing_names, "shapevect")
        inits.append(from_array(numpy.array([-1, 1], dtype=numpy.int64), name=cst_shape))
        loss_reshape = _unique_name(existing_names, "loss_reshape")
        pen_reshape = _unique_name(existing_names, "penalty_reshape")
        nodes.extend(
            [
                make_node("Reshape", [pen_name, cst_shape], [pen_reshape]),
                make_node("Reshape", [loss_name, cst_shape], [loss_reshape]),
            ]
        )
        nodes.append(make_node("Add", [pen_reshape, loss_reshape], [final_name]))

    inits = [*onx.graph.initializer, *inits]
    graph = make_graph(
        [*onx.graph.node, *nodes],
        onx.graph.name,
        [*onx.graph.input, *inputs],
        [*outputs, onx.graph.output[output_index]],
        inits,
    )
    onnx_model = make_model(graph)
    onnx_model.ir_version = onx.ir_version
    onnx_model.producer_name = onx.producer_name
    onnx_model.producer_version = onx.producer_version
    onnx_model.domain = onx.domain
    onnx_model.model_version = onx.model_version
    onnx_model.doc_string = onx.doc_string
    if len(onx.metadata_props) > 0:
        values = {p.key: p.value for p in onx.metadata_props}
        set_model_props(onnx_model, values)

    # fix opset import
    del onnx_model.opset_import[:]  # pylint: disable=E1101
    for oimp in onx.opset_import:
        op_set = onnx_model.opset_import.add()  # pylint: disable=E1101
        op_set.domain = oimp.domain
        op_set.version = oimp.version

    # Some nodes may have to be rewritten, Reciprocal(X) -> Div(1 / X).
    g = GraphBuilder(onnx_model)
    g.optimize()
    onnx_model_opt = g.to_onnx()
    return onnx_model_opt