Source code for mlinsights.mlmodel.piecewise_tree_regression

import numpy
from sklearn.tree import DecisionTreeRegressor


class PiecewiseTreeRegressor(DecisionTreeRegressor):
    """
    Implements a kind of piecewise linear regression by modifying
    the criterion used by the algorithm which builds a decision tree.
    See :epkg:`sklearn:tree:DecisionTreeRegressor` to get the meaning
    of the parameters except criterion:

    * ``mselin``: optimizes for a piecewise linear regression
    * ``simple``: optimizes for a stepwise regression (equivalent to *mse*)

    If the module does not compile or crashes, some explanations are given
    in :ref:`blog-internal-api-impurity-improvement`.
    """

    def __init__(
        self,
        criterion="mselin",
        splitter="best",
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        min_weight_fraction_leaf=0.0,
        max_features=None,
        random_state=None,
        max_leaf_nodes=None,
        min_impurity_decrease=0.0,
    ):
        DecisionTreeRegressor.__init__(
            self,
            criterion=criterion,
            splitter=splitter,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            min_weight_fraction_leaf=min_weight_fraction_leaf,
            max_features=max_features,
            random_state=random_state,
            max_leaf_nodes=max_leaf_nodes,
            min_impurity_decrease=min_impurity_decrease,
        )
    def fit(self, X, y, sample_weight=None, check_input=True):
        """
        Replaces the string stored in criterion by an instance of a class.
        """
        replace = None
        if isinstance(self.criterion, str):
            if self.criterion == "mselin":
                from .piecewise_tree_regression_criterion_linear import (
                    LinearRegressorCriterion,
                )

                replace = self.criterion
                self.criterion = LinearRegressorCriterion(
                    1 if len(y.shape) <= 1 else y.shape[1], X
                )
            elif self.criterion == "simple":
                from .piecewise_tree_regression_criterion_fast import (
                    SimpleRegressorCriterionFast,
                )

                replace = self.criterion
                self.criterion = SimpleRegressorCriterionFast(
                    1 if len(y.shape) <= 1 else y.shape[1], X.shape[0]
                )
        else:
            replace = None

        DecisionTreeRegressor.fit(
            self, X, y, sample_weight=sample_weight, check_input=check_input
        )

        if replace:
            self.criterion = replace

        if self.criterion == "mselin":
            self._fit_reglin(X, y, sample_weight)
        return self
    def _mapping_train(self, X):
        tree = self.tree_
        # A node is a leaf when its children indices do not point forward
        # (scikit-learn stores -1 as the children of a leaf).
        leaves = [
            i
            for i in range(len(tree.children_left))
            if tree.children_left[i] <= i and tree.children_right[i] <= i
        ]
        dec_path = self.decision_path(X)
        association = numpy.zeros((X.shape[0],))
        association[:] = -1
        mapping = {}
        ntree = 0
        for j in leaves:
            ind = dec_path[:, j] == 1
            ind = numpy.asarray(ind.todense()).flatten()
            if not numpy.any(ind):
                # No training example for this bucket.
                continue
            mapping[j] = ntree
            association[ind] = ntree
            ntree += 1
        return mapping
    def predict_leaves(self, X):
        """
        Returns the leaf index for each observation of *X*.

        :param X: array
        :return: array of leaf indices into ``self.leaves_index_``
        """
        # The creation of the sparse matrix could be avoided.
        leaves = self.decision_path(X)
        leaves = leaves[:, self.leaves_index_]
        mat = numpy.argmax(leaves, 1)
        res = numpy.asarray(mat).ravel()
        return res
    def _fit_reglin(self, X, y, sample_weight):
        """
        Fits linear regressions for all leaves.
        Sets attributes ``leaves_mapping_``, ``betas_``, ``leaves_index_``.
        The first attribute is a dictionary ``{leaf: row}``
        which maps a leaf of the tree to the coefficients
        ``betas_[row, :]`` of a regression trained on all training
        points mapped to that specific leaf.
        ``leaves_index_`` keeps in memory the list of leaves.
        """
        from .piecewise_tree_regression_criterion_linear import (
            LinearRegressorCriterion,
        )

        tree = self.tree_
        self.leaves_index_ = [
            i
            for i in range(len(tree.children_left))
            if tree.children_left[i] <= i and tree.children_right[i] <= i
        ]
        if tree.n_leaves != len(self.leaves_index_):
            raise RuntimeError(
                f"Unexpected number of leaves {tree.n_leaves} "
                f"!= {len(self.leaves_index_)}."
            )
        pred_leaves = self.predict_leaves(X)
        self.leaves_mapping_ = {k: i for i, k in enumerate(pred_leaves)}
        self.betas_ = numpy.empty((len(self.leaves_index_), X.shape[1] + 1))

        for i, _ in enumerate(self.leaves_index_):
            ind = pred_leaves == i
            xs = X[ind, :].copy()
            ys = y[ind].astype(numpy.float64)
            if len(ys.shape) == 1:
                ys = ys[:, numpy.newaxis]
            ys = ys.copy()
            ws = sample_weight[ind].copy() if sample_weight is not None else None
            # Fatal Python error: __pyx_fatalerror: Acquisition count is 0 (line 26868)
            dec = LinearRegressorCriterion.create(xs, ys, ws)
            dec.node_beta(self.betas_[i, :])
    def predict(self, X, check_input=True):
        """
        Overloads method *predict*. Falls back to the decision tree
        prediction if the criterion is *mse*, *mae* or *simple*.
        Computes the predictions from the per-leaf linear regressions
        if the criterion is *mselin*.
        """
        if self.criterion == "mselin":
            return self._predict_reglin(X, check_input=check_input)
        return DecisionTreeRegressor.predict(self, X, check_input=check_input)
    def _predict_reglin(self, X, check_input=True):
        """
        Computes the predictions with a linear regression
        fitted with the observations mapped to each leaf
        of the tree.

        :param X: array-like or sparse matrix of shape = [n_samples, n_features]
            The input samples. Internally, it will be converted to
            ``dtype=np.float32`` and if a sparse matrix is provided
            to a sparse ``csr_matrix``.
        :param check_input: boolean, (default=True)
            Allows to bypass several input checks.
            Don't use this parameter unless you know what you are doing.
        :return: y, array of shape = [n_samples] or [n_samples, n_outputs]
            The predicted values.
        """
        leaves = self.predict_leaves(X)
        pred = numpy.ones((X.shape[0], 1))
        Xone = numpy.hstack([X, pred])
        for i in range(X.shape[0]):
            li = leaves[i]
            pred[i] = numpy.dot(Xone[i, :], self.betas_[li, :])
        return pred.ravel()
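

# Minimal usage sketch (not part of the original module): it shows how the
# ``mselin`` criterion combines a shallow tree with one linear regression per
# leaf. The synthetic data, the ``max_depth`` value and the printed attributes
# are illustrative assumptions, not a reference test; running it requires the
# compiled Cython criterion extensions shipped with mlinsights.
if __name__ == "__main__":
    rnd = numpy.random.RandomState(0)
    X = rnd.uniform(0, 1, (200, 1)).astype(numpy.float64)
    # Piecewise linear target: two segments with different slopes.
    y = numpy.where(X[:, 0] < 0.5, 3.0 * X[:, 0], 1.5 - X[:, 0])
    y += rnd.normal(0, 0.01, y.shape)

    model = PiecewiseTreeRegressor(criterion="mselin", max_depth=2)
    model.fit(X, y)

    # One regression (slope per feature + intercept) for every leaf.
    print(model.betas_)             # shape: (n_leaves, n_features + 1)
    print(model.predict_leaves(X))  # leaf assigned to each observation
    print(model.predict(X[:5]))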