Source code for mlinsights.mlmodel.kmeans_constraint

from itertools import combinations

import numpy
from scipy.spatial import Delaunay
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import euclidean_distances

from ._kmeans_constraint_ import constraint_kmeans, constraint_predictions


class ConstraintKMeans(KMeans):
    """
    Defines a constraint :epkg:`k-means`.
    Clusters are modified to have an equal size.
    The algorithm is initialized with a regular :epkg:`KMeans`
    and continues with a modified version of it.

    Computing the predictions offers a choice.
    The first one is to keep the predictions
    from the regular :epkg:`k-means`
    algorithm but with the balanced clusters.
    The second is to compute balanced predictions
    over the test set. That implies that the predictions
    for the same observations might change depending
    on the set it belongs to.

    The parameter *strategy* determines how
    observations should be assigned to a cluster.
    The value can be:

    * ``'distance'``: observations are ranked by distance to a cluster,
      the algorithm assigns first point to the closest center unless it
      reached the maximum size, it deals first with the further point and
      maps it to the closest center
    * ``'gain'``: follows the algorithm described at
      `Same-size k-Means Variation
      <https://elki-project.github.io/tutorial/same-size_k_means>`_,
    * ``'weights'``: estimates weights attached to each cluster,
      it weights the distance to each cluster in order
      to balance the number of points mapped to every cluster,
      the strategy uses a learning rate.

    The first two strategies cannot reach a good compromise
    without using function :func:`_switch_clusters
    <mlinsights.mlmodel._kmeans_constraint_._switch_clusters>`
    which tries every switch between clusters: two points
    change clusters. It keeps the number of points and checks
    that the inertia is reduced.

    :param n_clusters: number of clusters
    :param init: used by :epkg:`k-means`
    :param n_init: used by :epkg:`k-means`
    :param max_iter: used by :epkg:`k-means`
    :param tol: used by :epkg:`k-means`
    :param verbose: used by :epkg:`k-means`
    :param random_state: used by :epkg:`k-means`
    :param copy_x: used by :epkg:`k-means`
    :param algorithm: used by :epkg:`k-means`
    :param balanced_predictions: produced balanced prediction
        or the regular ones
    :param strategy: strategy or algorithm used to abide
        by the constraint
    :param kmeans0: if True, applies *k-means* algorithm first
    :param history: keeps centers across iterations
    :param learning_rate: learning rate, used by strategy `'weights'`
    """

    # Accepted values for parameter *strategy*.
    _strategy_value = {"distance", "gain", "weights"}

    def __init__(
        self,
        n_clusters=8,
        init="k-means++",
        n_init=10,
        max_iter=500,
        tol=0.0001,
        verbose=0,
        random_state=None,
        copy_x=True,
        algorithm="lloyd",
        balanced_predictions=False,
        strategy="gain",
        kmeans0=True,
        learning_rate=1.0,
        history=False,
    ):
        self._n_threads = 1
        KMeans.__init__(
            self,
            n_clusters=n_clusters,
            init=init,
            n_init=n_init,
            max_iter=max_iter,
            tol=tol,
            verbose=verbose,
            random_state=random_state,
            copy_x=copy_x,
            algorithm=algorithm,
        )
        self.balanced_predictions = balanced_predictions
        self.strategy = strategy
        self.kmeans0 = kmeans0
        self.history = history
        self.learning_rate = learning_rate
        if strategy not in ConstraintKMeans._strategy_value:
            raise ValueError(f"strategy must be in {ConstraintKMeans._strategy_value}")

    def fit(self, X, y=None, sample_weight=None):
        """
        Compute k-means clustering.

        If *kmeans0* is True, a regular :epkg:`k-means` runs first with
        half of *max_iter* iterations (at least one). Otherwise labels
        and centers are drawn at random. In both cases the result is
        then refined by :meth:`constraint_kmeans` using the full
        *max_iter*.

        :param X: array-like or sparse matrix, shape=(n_samples, n_features)
            Training instances to cluster. It must be noted that the data
            will be converted to C ordering, which will cause a memory
            copy if the given data is not C-contiguous.
        :param y: Ignored
        :param sample_weight: sample weight
        :return: self
        """
        if self.kmeans0:
            full_max_iter = self.max_iter
            # The initial k-means only gets half of the iteration budget.
            # Never go below 1: KMeans rejects max_iter=0.
            self.max_iter = max(1, full_max_iter // 2)
            try:
                KMeans.fit(self, X, y, sample_weight=sample_weight)
            finally:
                # Restore the user value even if KMeans.fit raises.
                self.max_iter = full_max_iter
            state = None
        else:
            state = numpy.random.RandomState(self.random_state)
            labels = state.randint(0, self.n_clusters, X.shape[0], dtype=numpy.int32)
            centers = numpy.empty((self.n_clusters, X.shape[1]), dtype=X.dtype)
            # Initial centers are observations picked among all rows of X
            # (the row index is drawn in [0, n_samples)).
            choice = state.randint(0, X.shape[0], self.n_clusters)
            for i, c in enumerate(choice):
                centers[i, :] = X[c, :]
            self.labels_ = labels
            self.cluster_centers_ = centers
            # Starting inertia is set to the number of observations.
            self.inertia_ = float(X.shape[0])
            self.n_iter_ = 0

        return self.constraint_kmeans(
            X,
            sample_weight=sample_weight,
            state=state,
            learning_rate=self.learning_rate,
            history=self.history,
        )

    def constraint_kmeans(
        self,
        X,
        sample_weight=None,
        state=None,
        learning_rate=1.0,
        history=False,
    ):
        """
        Completes the constraint k-means.

        Starts from the current ``labels_``, ``cluster_centers_``,
        ``inertia_`` and ``n_iter_`` and overwrites them with the
        values returned by function *constraint_kmeans*. It also sets
        ``weights_`` and ``cluster_centers_iter_`` (None when no
        intermediate centers were returned, otherwise the centers
        stacked along a third axis).

        :param X: features
        :param sample_weight: sample weight
        :param state: state
        :param learning_rate: learning rate
        :param history: keeps evolution of centers
        :return: self
        """
        labels, centers, inertia, weights, iter_, all_centers = constraint_kmeans(
            X,
            self.labels_,
            sample_weight,
            self.cluster_centers_,
            inertia=self.inertia_,
            iter=self.n_iter_,
            max_iter=self.max_iter,
            verbose=self.verbose,
            strategy=self.strategy,
            state=state,
            learning_rate=learning_rate,
            history=history,
        )
        self.labels_ = labels
        self.cluster_centers_ = centers
        self.cluster_centers_iter_ = (
            None if len(all_centers) == 0 else numpy.dstack(all_centers)
        )
        self.inertia_ = inertia
        self.n_iter_ = iter_
        self.weights_ = weights
        return self

    def predict(self, X):
        """
        Computes the predictions.

        Without weights and with *balanced_predictions* enabled, labels
        come from function *constraint_predictions*; in every other
        case the regular :epkg:`k-means` prediction is returned.

        :param X: features.
        :return: prediction
        """
        if self.weights_ is None:
            if self.balanced_predictions:
                labels, _, __ = constraint_predictions(
                    X, self.cluster_centers_, strategy=self.strategy + "_p"
                )
                return labels
            return KMeans.predict(self, X)

        assert (
            not self.balanced_predictions
        ), "balanced_predictions and weights_ cannot be used together."
        return KMeans.predict(self, X)

    def transform(self, X):
        """
        Computes the distances to the cluster centers.

        Without weights and with *balanced_predictions* enabled, the
        distances come from function *constraint_predictions* and are
        adjusted so that no cluster looks closer than the assigned one.
        With weights, the regular :epkg:`k-means` distances are
        multiplied by the cluster weights. Otherwise the regular
        :epkg:`k-means` distances are returned.

        :param X: features.
        :return: distances
        """
        if self.weights_ is None:
            if self.balanced_predictions:
                labels, distances, __ = constraint_predictions(
                    X, self.cluster_centers_, strategy=self.strategy
                )
                # Because of the constraint, the assigned cluster may not
                # be the closest one. Every distance smaller than the
                # distance to the assigned cluster is replaced by twice
                # the overall maximum (numpy.nan would be best).
                mx = distances.max() * 2
                for i, li in enumerate(labels):
                    mi = distances[i, li]
                    mmi = distances[i, :].min()
                    if mi > mmi:
                        distances[i, distances[i, :] < mi] = mx
                return distances
            return KMeans.transform(self, X)

        assert (
            not self.balanced_predictions
        ), "balanced_predictions and weights_ cannot be used together."
        res = KMeans.transform(self, X)
        res *= self.weights_.reshape((1, -1))
        return res

    def score(self, X, y=None, sample_weight=None):
        """
        Returns one distance per observation.

        Without weights and with *balanced_predictions* enabled, the
        third output of function *constraint_predictions* is returned.
        Otherwise the squared euclidean distances between every
        observation and every center are computed (multiplied by the
        cluster weights if any) and the maximum over the clusters is
        returned for each observation.

        :param X: features
        :param y: unused
        :param sample_weight: unused
        :return: distances
        """
        if self.weights_ is None:
            if self.balanced_predictions:
                _, __, dist_close = constraint_predictions(
                    X, self.cluster_centers_, strategy=self.strategy
                )
                return dist_close
            # Rows are observations, columns are clusters, exactly as in
            # the weighted branch, so that the reduction below yields
            # one value per observation.
            res = euclidean_distances(X, self.cluster_centers_, squared=True)
        else:
            assert (
                not self.balanced_predictions
            ), "balanced_predictions and weights_ cannot be used together."
            res = euclidean_distances(X, self.cluster_centers_, squared=True)
            res *= self.weights_.reshape((1, -1))
        # NOTE(review): the maximum over clusters is kept from the
        # previous implementation; confirm whether the distance to the
        # closest cluster (minimum) was intended.
        return res.max(axis=1)

    def cluster_edges(self):
        """
        Computes edges between clusters based on a
        `Delaunay <https://docs.scipy.org/doc/scipy/reference/
        generated/scipy.spatial.Delaunay.html>`_
        graph.

        Every pair of vertices of a simplex is an edge, whatever the
        dimension of the centers.

        :return: set of edges *(a, b)*, indices of cluster centers
            with *a < b*
        """
        tri = Delaunay(self.cluster_centers_)
        edges = set()
        for simplex in tri.simplices:
            # Vertices of a simplex are distinct: sorting them first makes
            # every generated pair satisfy a < b.
            edges.update(combinations(sorted(simplex.tolist()), 2))
        return edges