Source code for clusterking.stability.preprocessor

#!/usr/bin/env python3

# std
from abc import abstractmethod
from typing import Dict

# 3rd
import numpy as np

# ours
from clusterking.result import AbstractResult
from clusterking.worker import AbstractWorker
from import Data

[docs]class PreprocessorResult(AbstractResult):
[docs] def __init__(self, data1, data2): super().__init__() self.data1 = data1 self.data2 = data2
[docs]class Preprocessor(AbstractWorker):
[docs] def __init__(self, name=None): super().__init__() self._name = name
@property def name(self): if self._name is None: return str(type(self).__name__) return self._name @name.setter def name(self, value): self._name = value
[docs] def run(self, data1: Data, data2: Data) -> PreprocessorResult: """Run. Args: data1: "original" :class:`` object data2: "other" :class:`` object Returns: :class:`~PreprocessorResult` """ return PreprocessorResult(data1=data1, data2=data2)
[docs]class ClusterMatcherResult(PreprocessorResult):
[docs] def __init__(self, data1, data2, rename_dct): super().__init__(data1=data1, data2=data2) self.rename_dct = rename_dct
[docs]class ClusterMatcher(Preprocessor): """Cluster names are arbitrary in general, i.e. when trying to compare two clustered datasets and trying to calculate a figure of merit, we have to match the names together. This is donen by this worker class. """
[docs] def __init__(self, *args, cluster_column="cluster", **kwargs): super().__init__(*args, **kwargs) self.cluster_column = cluster_column
[docs] @abstractmethod def run(self, data1: Data, data2: Data) -> ClusterMatcherResult: """ Args: data1: "original" :class:`` object data2: "other" :class:`` object Returns: :class:`~ClusterMatcherResult` """ pass
[docs]class TrivialClusterMatcher(ClusterMatcher): """Thus subclass of :class:`CCMatcher` maps cluster names from the first clustering to the cluster name of the second that maximizes the number of sample points that lie in the same cluster. It also only returns the intersection of the indizes of both Series. """
[docs] def run(self, data1: Data, data2: Data) -> ClusterMatcherResult: # todo [perf, low effort, med prio]: for speedup: only use pd.Series of # clusters ndata1 = data1.copy(deep=True) ndata2 = data2.copy(deep=True) # 1. Throw out index_intersection = set(ndata1.df.index).intersection( set(ndata2.df.index) ) ndata1.df = ndata1.df.loc[index_intersection] ndata2.df = ndata2.df.loc[index_intersection] # 2. Rename clusters clusters2 = set(ndata2.df[self.cluster_column]) dct = {} for cluster2 in clusters2: mask = ndata2.df[self.cluster_column] == cluster2 most_likely = np.argmax( np.bincount(ndata1.df[self.cluster_column][mask]) ) dct[cluster2] = most_likely ndata2.df[self.cluster_column] = ndata2.df[self.cluster_column].map(dct) return ClusterMatcherResult(data1=ndata1, data2=ndata2, rename_dct=dct)
[docs]class FirstComeFirstServe1DClusterMatcher(ClusterMatcher): """This subclass of :class:`CCMatcher` works only for 1D parameter spaces. It simply sorts the first points of each cluster and enumerates them in order to get a unique name for each cluster."""
[docs] def run(self, data1: Data, data2: Data) -> ClusterMatcherResult: ndata1 = data1.copy(deep=True) ndata2 = data2.copy(deep=True) nclusters1 = len(data1.df[self.cluster_column].unique()) nclusters2 = len(data2.df[self.cluster_column].unique()) if nclusters1 != nclusters2: raise ValueError("Cluster numbers don't match") order1 = self._get_order_of_clusters(data1) order2 = self._get_order_of_clusters(data2) order1_inverted = {value: key for key, value in order1.items()} rename_dct = {} for cluster in order2: rename_dct[cluster] = order1_inverted[order2[cluster]] ndata2.df[self.cluster_column] = ndata2.df[self.cluster_column].map( rename_dct ) return ClusterMatcherResult( data1=ndata1, data2=ndata2, rename_dct=rename_dct )
def _get_order_of_clusters(self, data: Data) -> Dict[int, int]: cluster2min = {} uclusters = data.df[self.cluster_column].unique() for ucluster in uclusters: cluster2min[ucluster] = data.df[ data.df[self.cluster_column] == ucluster ][data.par_cols[0]].min() sorted_mins = sorted(list(cluster2min.values())) return { ucluster: sorted_mins.index(cluster2min[ucluster]) for ucluster in uclusters }