Source code for clusterking.stability.fom

#!/usr/bin/env python3

# std
from abc import abstractmethod
from typing import Optional, Callable, Dict, Tuple, Any, Union

# 3rd
import numpy as np

# ours
from clusterking.stability.preprocessor import Preprocessor
from clusterking.worker import AbstractWorker
from clusterking.result import AbstractResult
from clusterking.data.data import Data

class FOMResult(AbstractResult):
    """Object containing the result of a Figure of Merit (FOM), represented by
    a :class:`FOM` object.
    """

    def __init__(self, fom, name):
        """Store a figure of merit together with the name of its FOM.

        Args:
            fom: Value of the figure of merit
            name: Name of the :class:`FOM` that produced the value
        """
        super().__init__()
        # Keep value and name in separate attributes: a chained assignment
        # ``self.fom = fom = name`` would overwrite the value with the name
        # and leave ``self.name`` undefined.
        self.fom = fom
        self.name = name
class FOM(AbstractWorker):
    """Figure of Merit, comparing the outcome of two experiments (e.g. the
    clusters of two very similar datasets).

    Subclasses implement :meth:`_fom`. :meth:`run` passes both datasets through
    the preprocessor, calls :meth:`_fom` on the preprocessed datasets and wraps
    the value in a :class:`FOMResult`.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        preprocessor: Optional[Preprocessor] = None,
    ):
        """Initialize the FOM worker.

        Args:
            name: Name of the FOM
            preprocessor:
                :class:`~clusterking.stability.preprocessor.Preprocessor`
                object
        """
        super().__init__()
        self._name = name
        self._preprocessor = preprocessor

    @property
    def name(self) -> str:
        """Name of the FOM.

        If no name was set, the name defaults to the class name and the name
        of the preprocessor, joined by an underscore.
        """
        if self._name is None:
            return str(type(self).__name__) + "_" + self.preprocessor.name
        return self._name

    def set_name(self, value: str):
        """Set the name of the FOM.

        Args:
            value: New name
        """
        self._name = value

    @property
    def preprocessor(self) -> Preprocessor:
        """Preprocessor applied to both datasets before the FOM is calculated.

        A default :class:`~clusterking.stability.preprocessor.Preprocessor`
        is created (and kept) on first access if none was set.
        """
        if self._preprocessor is None:
            self._preprocessor = Preprocessor()
        return self._preprocessor

    def set_preprocessor(self, preprocessor: Preprocessor):
        """Set the preprocessor.

        Args:
            preprocessor:
                :class:`~clusterking.stability.preprocessor.Preprocessor`
                object
        """
        self._preprocessor = preprocessor

    def run(self, data1: Data, data2: Data) -> FOMResult:
        """Calculate figure of merit.

        Args:
            data1: "original" :class:`~clusterking.data.Data` object
            data2: "other" :class:`~clusterking.data.Data` object

        Returns:
            :class:`FOMResult` object
        """
        # The FOM is evaluated on the preprocessed datasets, never on the
        # raw input.
        preprocessed = self.preprocessor.run(data1, data2)
        fom = self._fom(preprocessed.data1, preprocessed.data2)
        return FOMResult(fom=fom, name=self.name)

    @abstractmethod
    def _fom(self, data1: Data, data2: Data):
        """Calculate the figure of merit from the two preprocessed datasets.

        Args:
            data1: Preprocessed "original" dataset
            data2: Preprocessed "other" dataset

        Returns:
            Value of the figure of merit
        """
        pass
# todo: add cluster column setting in init
class CCFOM(FOM):
    """Cluster Comparison figure of merit (CCFOM), comparing whether the
    clusters of two experiments match.
    """

    @abstractmethod
    def _fom(self, data1: Data, data2: Data):
        """Compare the clusters of ``data1`` with the clusters of ``data2``.

        Abstract: concrete cluster comparison FOMs override this method.
        """
class MatchingClusters(CCFOM):
    """Fraction of sample points (spoints) that lie in the same cluster, when
    comparing two clustered datasets with the same number of sample points.
    """

    def _fom(self, data1, data2) -> float:
        """Return the fraction of sample points with equal cluster label.

        Args:
            data1: First dataset; its ``df["cluster"]`` column is read
            data2: Second dataset; its ``df["cluster"]`` column is read

        Returns:
            Number of positions at which both ``"cluster"`` columns agree,
            divided by the number of sample points

        Raises:
            ValueError: If the datasets differ in their number of sample
                points
        """
        clustered1 = data1.df["cluster"]
        clustered2 = data2.df["cluster"]
        # Explicit check instead of ``assert``: assertions are stripped when
        # Python runs with -O, which would let mismatched inputs through.
        if len(clustered1) != len(clustered2):
            raise ValueError(
                "Datasets must have the same number of sample points, "
                "but have {} and {}".format(len(clustered1), len(clustered2))
            )
        return sum(clustered1 == clustered2) / len(clustered1)
class DeltaNClusters(CCFOM):
    """Difference of number of clusters between two experiments (number of
    clusters in experiment 1 - number of clusters in experiment 2).
    """

    def _fom(self, data1, data2) -> int:
        """Return the count of distinct ``"cluster"`` values of ``data1``
        minus the count of distinct ``"cluster"`` values of ``data2``.
        """
        labels1 = set(data1.df["cluster"])
        labels2 = set(data2.df["cluster"])
        return len(labels1) - len(labels2)
class NClusters(CCFOM):
    """Number of clusters in dataset 1 or 2"""

    def __init__(self, which, **kwargs):
        """
        Args:
            which: 1 or 2 for dataset 1 or dataset 2
            **kwargs: Keyword arguments for :class:`CCFOM`

        Raises:
            ValueError: If ``which`` is neither 1 nor 2
        """
        super().__init__(**kwargs)
        self.which = which
        if self.which in (1, 2):
            return
        raise ValueError(
            "Invalid value of which, must be 1 or 2, but is {}".format(
                self.which
            )
        )

    def _fom(self, data1, data2) -> int:
        """Return the number of distinct ``"cluster"`` values of the dataset
        selected by ``self.which``.

        Raises:
            ValueError: If ``self.which`` is neither 1 nor 2 (it may have been
                changed after initialization)
        """
        if self.which == 1:
            selected = data1
        elif self.which == 2:
            selected = data2
        else:
            raise ValueError("Invalid which value.")
        return len(set(selected.df["cluster"]))
class BpointList(FOM):
    """Adds array of bpoint coordinates of data2"""

    def _fom(self, data1, data2) -> np.ndarray:
        """Return the ``par_cols`` columns of all rows of ``data2.df`` whose
        ``"bpoint"`` entry equals ``True``, converted to a numpy array.

        ``data1`` is not used.
        """
        frame = data2.df
        # Element-wise comparison that builds a row mask, hence ``==``.
        is_bpoint = frame["bpoint"] == True  # noqa: E712
        bpoint_rows = frame[is_bpoint]
        return bpoint_rows[data2.par_cols].to_numpy()
# todo: configure bpoint column
class BMFOM(FOM):
    """**Abstract class**: Benchmark Figure of Merit (BMFOM), comparing whether
    the benchmark points of two experiments match.
    """

    def _fom(self, data1: Data, data2: Data) -> float:
        """Pair up the benchmark points of both datasets cluster by cluster
        and pass the pairs on to :meth:`_fom2`.

        Args:
            data1: First dataset
            data2: Second dataset

        Returns:
            ``np.nan`` if the two datasets do not contain the same set of
            cluster labels, otherwise the return value of :meth:`_fom2`

        Raises:
            ValueError: If a cluster does not have exactly one benchmark
                point in one of the datasets
        """
        labels1 = set(data1.df["cluster"].unique())
        labels2 = set(data2.df["cluster"].unique())
        if labels1 != labels2:
            # Benchmark points can only be paired if the clusters match.
            return np.nan

        error_template = "Found {} bpoints instead of 1 for dataset {}: "
        cluster2bpoint = {}
        for label in labels1:
            # Rows flagged as benchmark point within this cluster, for
            # dataset 1 and dataset 2 (in this order).
            candidates = [
                data.df[(data.df["cluster"] == label) & data.df["bpoint"]]
                for data in (data1, data2)
            ]
            for dataset_number, rows in enumerate(candidates, start=1):
                if len(rows) != 1:
                    raise ValueError(
                        error_template.format(len(rows), dataset_number)
                        + str(rows)
                    )
            rows1, rows2 = candidates
            cluster2bpoint[label] = (
                rows1.iloc[0][data1.par_cols],
                rows2.iloc[0][data2.par_cols],
            )
        return self._fom2(cluster2bpoint)

    @abstractmethod
    def _fom2(self, cluster2bpoint: Dict[int, Tuple[Any, Any]]) -> float:
        """Calculate the figure of merit from the paired benchmark points.

        Args:
            cluster2bpoint: Maps each cluster label to a tuple of the
                benchmark point of dataset 1 and the benchmark point of
                dataset 2 (restricted to the parameter columns)

        Returns:
            Value of the figure of merit
        """
class AverageBMProximityFOM(BMFOM):
    """Returns the average distance of benchmark points in parameter space
    between two experiments.
    """

    # Pre-implemented averaging functions; each takes the list of distances.
    _named_averaging_fcts = {
        "max": lambda it: max(it),
        "arithmetic": lambda it: sum(it) / len(it),
    }
    # Pre-implemented metrics; each takes a tuple of two points.
    _named_metric_fcts = {
        "euclidean": lambda x: np.sqrt(np.sum(np.square(x[0] - x[1])))
    }

    # Names accepted by set_averaging and set_metric, respectively.
    named_averaging_fcts = _named_averaging_fcts.keys()
    named_metric_fcts = _named_metric_fcts.keys()

    def __init__(self, *args, **kwargs):
        """Initialize the FOM worker.

        The averaging defaults to ``"arithmetic"`` and the metric to
        ``"euclidean"``.

        Args:
            See :meth:`~clusterking.stability.fom.FOM.__init__`
        """
        super().__init__(*args, **kwargs)
        self._averaging = self._named_averaging_fcts["arithmetic"]
        self._metric = self._named_metric_fcts["euclidean"]

    # todo: no, set this in __init__
    def set_averaging(self, fct: Union[str, Callable]) -> None:
        """Set averaging mode

        Args:
            fct: Function of the distances between benchmark points of the
                same cluster or name of pre-implemented functions (check
                :attr:`named_averaging_fcts` for a list)

        Returns:
            None

        Raises:
            KeyError: If ``fct`` is a string that is not the name of a
                pre-implemented function
        """
        if isinstance(fct, str):
            self._averaging = self._named_averaging_fcts[fct]
        else:
            self._averaging = fct

    def set_metric(self, fct: Union[str, Callable]) -> None:
        """Set metric in parameter space

        Args:
            fct: Function of a tuple of two points in parameter space or name
                of pre-implemented functions (check :attr:`named_metric_fcts`
                for a list)

        Returns:
            None

        Raises:
            KeyError: If ``fct`` is a string that is not the name of a
                pre-implemented function
        """
        if isinstance(fct, str):
            self._metric = self._named_metric_fcts[fct]
        else:
            self._metric = fct

    def _fom2(self, cluster2bpoint: Dict[int, Tuple[Any, Any]]) -> float:
        """Average the distances between the paired benchmark points.

        Args:
            cluster2bpoint: Maps each cluster label to the tuple of benchmark
                points of dataset 1 and dataset 2

        Returns:
            Result of the averaging function applied to the list of distances

        Raises:
            ValueError: If the averaging function does not return a real
                number
        """
        distances = [self._metric(pair) for pair in cluster2bpoint.values()]
        ret = self._averaging(distances)
        # NumPy scalars such as np.float32 or np.int64 do not derive from the
        # builtin float/int, but are valid results of user-supplied metrics
        # and averaging functions, so they are accepted explicitly.
        if not isinstance(ret, (float, int, np.floating, np.integer)):
            raise ValueError(
                "Not float: averaging returned {!r} of type {}".format(
                    ret, type(ret).__name__
                )
            )
        return ret