import numpy
from sklearn.tree import DecisionTreeRegressor
[docs]
class PiecewiseTreeRegressor(DecisionTreeRegressor):
"""
Implements a kind of piecewise linear regression by modifying
the criterion used by the algorithm which builds a decision tree.
See :epkg:`sklearn:tree:DecisionTreeRegressor` to get the meaning
of the parameters except criterion:
* ``mselin``: optimizes for a piecewise linear regression
* ``simple``: optimizes for a stepwise regression (equivalent to *mse*)
If the file does not compile or crashes, some explanations are given
in :ref:`blog-internal-api-impurity-improvement`.
"""
def __init__(
self,
criterion="mselin",
splitter="best",
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
min_weight_fraction_leaf=0.0,
max_features=None,
random_state=None,
max_leaf_nodes=None,
min_impurity_decrease=0.0,
):
DecisionTreeRegressor.__init__(
self,
criterion=criterion,
splitter=splitter,
max_depth=max_depth,
min_samples_split=min_samples_split,
min_samples_leaf=min_samples_leaf,
min_weight_fraction_leaf=min_weight_fraction_leaf,
max_features=max_features,
random_state=random_state,
max_leaf_nodes=max_leaf_nodes,
min_impurity_decrease=min_impurity_decrease,
)
[docs]
def fit(self, X, y, sample_weight=None, check_input=True):
"""
Replaces the string stored in criterion by an instance of a class.
"""
replace = None
if isinstance(self.criterion, str):
if self.criterion == "mselin":
from .piecewise_tree_regression_criterion_linear import (
LinearRegressorCriterion,
)
replace = self.criterion
self.criterion = LinearRegressorCriterion(
1 if len(y.shape) <= 1 else y.shape[1], X
)
elif self.criterion == "simple":
from .piecewise_tree_regression_criterion_fast import (
SimpleRegressorCriterionFast,
)
replace = self.criterion
self.criterion = SimpleRegressorCriterionFast(
1 if len(y.shape) <= 1 else y.shape[1], X.shape[0]
)
else:
replace = None
DecisionTreeRegressor.fit(
self, X, y, sample_weight=sample_weight, check_input=check_input
)
if replace:
self.criterion = replace
if self.criterion == "mselin":
self._fit_reglin(X, y, sample_weight)
return self
def _mapping_train(self, X):
tree = self.tree_
leaves = [
i
for i in range(len(tree.children_left))
if tree.children_left[i] <= i and tree.children_right[i] <= i
]
dec_path = self.decision_path(X)
association = numpy.zeros((X.shape[0],))
association[:] = -1
mapping = {}
ntree = 0
for j in leaves:
ind = dec_path[:, j] == 1
ind = numpy.asarray(ind.todense()).flatten()
if not numpy.any(ind):
# No training example for this bucket.
continue
mapping[j] = ntree
association[ind] = ntree
ntree += 1
return mapping
[docs]
def predict_leaves(self, X):
"""
Returns the leave index for each observation of *X*.
:param X: array
:return: array
leaves index in ``self.leaves_index_``
"""
# The creation of the sparse matrix could be avoided.
leaves = self.decision_path(X)
leaves = leaves[:, self.leaves_index_]
mat = numpy.argmax(leaves, 1)
res = numpy.asarray(mat).ravel()
return res
def _fit_reglin(self, X, y, sample_weight):
"""
Fits linear regressions for all leaves.
Sets attributes ``leaves_mapping_``, ``betas_``, ``leaves_index_``.
The first attribute is a dictionary ``{leave: row}``
which maps a leave of the tree to the coefficients
``betas_[row, :]`` of a regression trained on all training
points mapped a specific leave. ``leaves_index_`` keeps
in memory a set of leaves.
"""
from .piecewise_tree_regression_criterion_linear import (
LinearRegressorCriterion,
)
tree = self.tree_
self.leaves_index_ = [
i
for i in range(len(tree.children_left))
if tree.children_left[i] <= i and tree.children_right[i] <= i
]
if tree.n_leaves != len(self.leaves_index_):
raise RuntimeError(
f"Unexpected number of leaves {tree.n_leaves} "
f"!= {len(self.leaves_index_)}."
)
pred_leaves = self.predict_leaves(X)
self.leaves_mapping_ = {k: i for i, k in enumerate(pred_leaves)}
self.betas_ = numpy.empty((len(self.leaves_index_), X.shape[1] + 1))
for i, _ in enumerate(self.leaves_index_):
ind = pred_leaves == i
xs = X[ind, :].copy()
ys = y[ind].astype(numpy.float64)
if len(ys.shape) == 1:
ys = ys[:, numpy.newaxis]
ys = ys.copy()
ws = sample_weight[ind].copy() if sample_weight is not None else None
# Fatal Python error: __pyx_fatalerror: Acquisition count is 0 (line 26868)
dec = LinearRegressorCriterion.create(xs, ys, ws)
dec.node_beta(self.betas_[i, :])
[docs]
def predict(self, X, check_input=True):
"""
Overloads method *predict*. Falls back into
the predict from a decision tree is criterion is
*mse*, *mae*, *simple*. Computes the predictions
from linear regression if the criterion is *mselin*.
"""
if self.criterion == "mselin":
return self._predict_reglin(X, check_input=check_input)
return DecisionTreeRegressor.predict(self, X, check_input=check_input)
def _predict_reglin(self, X, check_input=True):
"""
Computes the predictions with a linear regression
fitted with the observations mapped to each leave
of the tree.
:param X: array-like or sparse matrix of shape = [n_samples, n_features]
The input samples. Internally, it will be converted to
``dtype=np.float32`` and if a sparse matrix is provided
to a sparse ``csr_matrix``.
:param check_input: boolean, (default=True)
Allow to bypass several input checking.
Don't use this parameter unless you know what you do.
:return: y, array of shape = [n_samples] or [n_samples, n_outputs]
The predicted classes, or the predict values.
"""
leaves = self.predict_leaves(X)
pred = numpy.ones((X.shape[0], 1))
Xone = numpy.hstack([X, pred])
for i in range(X.shape[0]):
li = leaves[i]
pred[i] = numpy.dot(Xone[i, :], self.betas_[li, :])
return pred.ravel()