from collections import OrderedDict
from io import BytesIO
from enum import IntFlag
from typing import Dict, List, Optional
import numpy as np
import onnx
from onnx import ModelProto, TensorProto
from onnx.helper import make_model, make_graph, make_node, make_tensor
from onnx_array_api.graph_api import GraphBuilder
from .loss_helper import get_train_initializer
[docs]
class DerivativeOptions(IntFlag):
"""
Options defining how to build the onnx graph of the
gradients.
* `Zero`: default option, all options are disabled
* `KeepYieldOp`: keeps the operator *YieldOp* in the graph,
see @see fn onnx_derivative
* `KeepOutputs`: keeps the output of the original graph
* `FillGrad`: does not add any output to specify the gradient
of the output but assumes it is one
* `Loss`: the function assumes the loss was added to the graph
"""
Zero = 0
KeepYieldOp = 1
KeepOutputs = 2
FillGrad = 4
Loss = 5
def _random_input(typ, shape, batch):
if typ in ("tensor(double)", TensorProto.DOUBLE):
dtype = np.float64
elif typ in ("tensor(float)", TensorProto.FLOAT):
dtype = np.float32
else:
raise NotImplementedError(f"Unable to guess dtype from {typ!r}.")
if len(shape) <= 1:
new_shape = shape
elif shape[0] in (None, 0):
new_shape = (batch, *shape[1:])
else:
new_shape = shape
return np.random.randn(*new_shape).astype(dtype)
[docs]
def random_feed(inputs, batch: int = 10, empty_dimension: int = 1) -> Dict[str, np.array]:
"""
Creates a dictionary of random inputs.
:param batch: dimension to use as batch dimension if unknown
:param empty_dimension: if a dimension is null, replaces it by this value
:return: dictionary
"""
res = OrderedDict()
for inp in inputs:
name = inp.name
if hasattr(inp.type, "tensor_type"):
typ = inp.type.tensor_type.elem_type
shape = tuple(
getattr(d, "dim_value", batch) for d in inp.type.tensor_type.shape.dim
)
shape = (shape[0], *[b if b > 0 else empty_dimension for b in shape[1:]])
else:
typ = inp.type
shape = inp.shape
res[name] = _random_input(typ, shape, batch)
return res
[docs]
def onnx_derivative(
onx: ModelProto,
weights: Optional[List[str]] = None,
inputs: Optional[List[str]] = None,
options: DerivativeOptions = DerivativeOptions.Zero,
loss: Optional[str] = None,
label: Optional[str] = None,
path_name: Optional[str] = None,
verbose: int = 0,
) -> ModelProto:
"""
Builds the gradient for an onnx graph.
:param onx: onnx graph
:param weights: gradient against those weights, None for all real weights
:param inputs: gradient against inputs, None for all real inputs
:param options: options of type @see cl DerivativeOptions
:param loss: loss output in case a loss was added in the graph,
*options* must be equal to `DerivativeOptions.Loss`
:param label: if *loss* is specified, then the label must be
specified as well
:param path_name: if *options* equal to `DerivativeOptions.Loss`,
the gradient is saved to that path
:param verbose: verbosity
:return: onnx graph
The function calls *OrtModuleGraphBuilderConfiguration*
from :epkg:`onnxruntime-training`. This graph is meant to be used
with `OrtGradientForwardBackward` and includes
operator `YieldOp`. That's the graph looks this way:
.. gdot::
:script: DOT-SECTION
import numpy as np
from onnx.defs import onnx_opset_version
from skl2onnx.common.data_types import FloatTensorType
from skl2onnx.algebra.onnx_ops import OnnxAdd
from onnx_array_api.plotting.dot_plot import to_dot
from experimental_experiment.gradient.grad_helper import (
onnx_derivative,
DerivativeOptions,
)
opv = onnx_opset_version() - 2
node = OnnxAdd(
"X",
np.array([1], dtype=np.float32),
op_version=opv,
output_names=["Y"]
)
onx = node.to_onnx(
{"X": FloatTensorType([None, 10])},
{"Y": FloatTensorType([None, 10])},
target_opset=opv,
)
try:
new_onx = onnx_derivative(onx, options=DerivativeOptions.KeepYieldOp)
except ImportError as e:
print("onnxruntime-training is not installed", e)
new_onx = None
if new_onx:
print("DOT-SECTION", to_dot(new_onx))
These operators are the outputs of the
initial graph and must be replaced by the gradient of these
outputs to compute the gradient of the weights and the inputs.
After they are replaced, it looks this way:
.. gdot::
:script: DOT-SECTION
import numpy as np
from onnx.defs import onnx_opset_version
from skl2onnx.common.data_types import FloatTensorType
from skl2onnx.algebra.onnx_ops import OnnxAdd
from onnx_array_api.plotting.dot_plot import to_dot
from experimental_experiment.gradient.grad_helper import (
onnx_derivative,
DerivativeOptions,
)
opv = onnx_opset_version() - 2
node = OnnxAdd(
"X",
np.array([1], dtype=np.float32),
op_version=opv,
output_names=["Y"]
)
onx = node.to_onnx(
{"X": FloatTensorType([None, 10])},
{"Y": FloatTensorType([None, 10])},
target_opset=opv,
)
try:
new_onx = onnx_derivative(onx, options=DerivativeOptions.Zero)
except ImportError as e:
print("onnxruntime-training is not installed", e)
new_onx = None
if new_onx:
print("DOT-SECTION", to_dot(new_onx))
The user can still compute the outputs.
.. gdot::
:script: DOT-SECTION
import numpy as np
from onnx.defs import onnx_opset_version
from skl2onnx.common.data_types import FloatTensorType
from skl2onnx.algebra.onnx_ops import OnnxAdd
from onnx_array_api.plotting.dot_plot import to_dot
from experimental_experiment.gradient.grad_helper import (
onnx_derivative,
DerivativeOptions,
)
opv = onnx_opset_version() - 2
node = OnnxAdd(
"X",
np.array([1], dtype=np.float32),
op_version=opv,
output_names=["Y"]
)
onx = node.to_onnx(
{"X": FloatTensorType([None, 10])},
{"Y": FloatTensorType([None, 10])},
target_opset=opv,
)
try:
new_onx = onnx_derivative(onx, options=DerivativeOptions.KeepOutputs)
except ImportError as e:
print("onnxruntime-training is not installed", e)
new_onx = None
if new_onx:
print("DOT-SECTION", to_dot(new_onx))
The input gradient can be filled with a constant matrix
filled with one and with the expected shape.
.. gdot::
:script: DOT-SECTION
import numpy as np
from onnx.defs import onnx_opset_version
from skl2onnx.common.data_types import FloatTensorType
from skl2onnx.algebra.onnx_ops import OnnxAdd
from onnx_array_api.plotting.dot_plot import to_dot
from experimental_experiment.gradient.grad_helper import (
onnx_derivative,
DerivativeOptions,
)
opv = onnx_opset_version() - 2
node = OnnxAdd(
"X",
np.array([1], dtype=np.float32),
op_version=opv,
output_names=["Y"]
)
onx = node.to_onnx(
{"X": FloatTensorType([None, 10])},
{"Y": FloatTensorType([None, 10])},
target_opset=opv,
)
try:
new_onx = onnx_derivative(
onx, options=DerivativeOptions.KeepOutputs | DerivativeOptions.FillGrad
)
except ImportError as e:
print("onnxruntime-training is not installed", e)
new_onx = None
if new_onx:
print("DOT-SECTION", to_dot(new_onx))
"""
assert isinstance(
options, DerivativeOptions
), f"Options must be from type DerivativeOptions not {type(options)!r}."
if options == DerivativeOptions.Loss:
return _onnx_derivative_loss(
onx,
weights=weights,
inputs=inputs,
options=options,
loss=loss,
label=label,
path_name=path_name,
verbose=verbose,
)
return _onnx_derivative_fw(
onx, weights=weights, inputs=inputs, options=options, verbose=verbose
)
def _default_inputs(onx: ModelProto) -> List[str]:
"Guesses default inputs (float ones) if not specified."
inputs_name = []
for i in onx.graph.input:
try:
elem_type = i.type.tensor_type.elem_type
except AttributeError:
# not a vector
continue
if elem_type in (
onnx.TensorProto.FLOAT16,
onnx.TensorProto.FLOAT,
onnx.TensorProto.DOUBLE,
):
inputs_name.append(i.name)
return inputs_name
def _onnx_derivative_fw(
onx: ModelProto,
weights: List[str],
inputs: List[str],
options: DerivativeOptions,
verbose: int = 0,
) -> ModelProto:
"""
Implements a gradient based on class `OrtModuleGraphBuilder`.
"""
from onnxruntime.capi._pybind_state import (
OrtModuleGraphBuilder,
OrtModuleGraphBuilderConfiguration,
TrainingGraphTransformerConfiguration,
Severity,
)
if verbose > 0:
print(f"[_onnx_derivative_fw] weights={weights} inputs={inputs} options={options}")
if weights is None:
inits = get_train_initializer(onx)
weights = list(inits)
if verbose > 0:
print(f"[_onnx_derivative_fw] guessed weights={weights}")
if verbose > 0:
print("[_onnx_derivative_fw] OrtModuleGraphBuilder")
builder = OrtModuleGraphBuilder()
config = OrtModuleGraphBuilderConfiguration()
config.initializer_names = weights
config.initializer_names_to_train = weights
if verbose > 0:
config.loglevel = Severity.INFO
if inputs is None:
inputs_name = _default_inputs(onx)
if len(inputs_name) > 0:
config.input_names_require_grad = inputs_name
config.build_gradient_graph = True
if verbose > 0:
print(
f"[_onnx_derivative_fw] TrainingGraphTransformerConfiguration "
f"with inputs_name={inputs_name}"
)
p = TrainingGraphTransformerConfiguration()
if verbose > 0:
print("[_onnx_derivative_fw] builder initialize")
builder.initialize(onx.SerializeToString(), config)
if verbose > 0:
print("[_onnx_derivative_fw] build")
builder.build(p)
try:
train_onnx_model_serialized = builder.get_gradient_model()
except AttributeError:
train_onnx_model_serialized = builder.get_model()
# optimized_pre_grad_model = builder.get_inference_optimized_model()
grad_yield = onnx.load(BytesIO(train_onnx_model_serialized))
if options & DerivativeOptions.KeepYieldOp:
assert (
options == DerivativeOptions.KeepYieldOp
), "Option YieldOd cannot be combined with any other."
return grad_yield
yields_op = [
(index, node)
for index, node in enumerate(grad_yield.graph.node)
if node.op_type == "YieldOp"
]
assert len(yields_op) > 0, "No YieldOp was found. The input graph must be wrong."
other_nodes = [
(index, node)
for index, node in enumerate(grad_yield.graph.node)
if node.op_type != "YieldOp"
]
inputs = list(grad_yield.graph.input)
if options & DerivativeOptions.KeepOutputs:
outputs = list(grad_yield.graph.output)
else:
original = set(i.name for i in onx.graph.output)
outputs = [o for o in grad_yield.graph.output if o.name not in original]
map_out = {o.name: o for o in onx.graph.output}
set_out = set(map_out)
for index, yn in yields_op:
assert len(yn.input) == len(yn.output), (
f"YieldOp should have the same number of inputs and outputs "
f"but index={index} and yield op is\n{yn}"
)
assert len(set(yn.input) & set_out) == len(
yn.input
), f"Unable to find one output {yn.input!r} in {list(map_out)!r}."
if not (options & DerivativeOptions.FillGrad):
for i, inp in enumerate(yn.input):
out = map_out[inp]
new_input = onnx.ValueInfoProto()
new_input.name = yn.output[i]
new_input.doc_string = "from yieldop"
new_input.type.CopyFrom(out.type)
inputs.append(new_input)
else:
assert (
options & DerivativeOptions.KeepOutputs
), "FillGrad should be set with KeepOutputs."
for i, inp in enumerate(yn.input):
name = f"{inp}_shape"
node = make_node("Shape", [inp], [name])
other_nodes.append((index + 0.1, node))
out = map_out[inp]
elem_type = out.type.tensor_type.elem_type
node = make_node(
"ConstantOfShape",
[name],
[yn.output[i]],
value=make_tensor("value", elem_type, (1,), [1]),
)
other_nodes.append((index + 0.2, node))
if options & DerivativeOptions.KeepOutputs:
# Keeps output from the original graph.
outputs.append(out)
# Final graph.
if verbose > 0:
print("[_onnx_derivative_fw] final graph")
other_nodes.sort()
other_nodes = [o[1] for o in other_nodes]
graph = make_graph(
other_nodes,
grad_yield.graph.name,
inputs,
outputs,
list(grad_yield.graph.initializer),
)
new_model = make_model(graph)
new_model.ir_version = grad_yield.ir_version
new_model.producer_name = grad_yield.producer_name
new_model.producer_version = grad_yield.producer_version
new_model.domain = grad_yield.domain
new_model.model_version = grad_yield.model_version
new_model.doc_string = grad_yield.doc_string
new_model.ir_version = onx.ir_version
if hasattr(onx, "value_info"):
graph.value_info.extend(grad_yield.value_info)
del new_model.opset_import[:]
for oimp in grad_yield.opset_import:
op_set = new_model.opset_import.add()
op_set.domain = oimp.domain
op_set.version = oimp.version
if verbose > 0:
print("[_onnx_derivative_fw] optimize")
g = GraphBuilder(new_model)
g.optimize()
onx_grad = g.to_onnx()
onx_grad.ir_version = new_model.ir_version
if verbose > 0:
print("[_onnx_derivative_fw] done")
return onx_grad
def _onnx_derivative_loss(
onx: ModelProto,
weights: List[str],
inputs: List[str],
options: DerivativeOptions,
loss: str,
label: str,
path_name: str,
verbose: int = 0,
) -> ModelProto:
"""
Implements a gradient based on class `PyGradientGraphBuilder`.
"""
from onnxruntime.capi._pybind_state import GradientGraphBuilder
assert path_name is not None, "path_name must not be None if options is 'Loss'."
assert weights is None, "weights must be None if options is 'Loss'."
assert label is not None, "label must not be None if options is 'Loss'."
assert loss is not None and isinstance(
loss, str
), "loss must not None and a string if options is 'Loss'."
if isinstance(label, str):
label = {label}
else:
label = set(label)
if inputs is None:
inputs_name = _default_inputs(onx)
inputs = inputs_name
if isinstance(inputs, str):
inputs = {inputs}
else:
inputs = set(inputs)
inputs = set(x for x in inputs if x not in label)
str_onx = onx.SerializeToString()
if verbose > 0:
print(f"[_onnx_derivative_loss] label={label!r}, inputs={inputs}, loss={loss}")
builder = GradientGraphBuilder(str_onx, label, inputs, loss)
if verbose > 0:
print(f"[_onnx_derivative_loss] build, onx size={len(str_onx)}")
builder.build()
if verbose > 0:
print(f"[_onnx_derivative_loss] save to {path_name!r}")
builder.save(path_name)
if verbose > 0:
print(f"[_onnx_derivative_loss] load {path_name!r}")
with open(path_name, "rb") as f:
grad_onx = onnx.load(f)
grad_onx.ir_version = onx.ir_version
if verbose > 0:
print("[_onnx_derivative_loss] done")
return grad_onx