# Source code for clusterking.stability.noisysamplestability

#!/usr/bin/env python3

# std
import copy
import collections
from typing import Optional, Union, List, Callable
from pathlib import PurePath, Path

# 3rd
import pandas as pd
import tqdm.auto

# ours
from clusterking.stability.stabilitytester import (
    AbstractStabilityTester,
    SimpleStabilityTesterResult,
)
from clusterking.data.data import Data
from clusterking.scan.scanner import Scanner
from clusterking.cluster.cluster import Cluster
from clusterking.benchmark.benchmark import AbstractBenchmark
from clusterking.worker import AbstractWorker
from clusterking.result import AbstractResult
from clusterking.util.log import get_logger


class NoisySampleStabilityTesterResult(SimpleStabilityTesterResult):
    """Outcome of a :class:`NoisySampleStabilityTester` run.

    In addition to what the parent result class stores, this keeps the list
    of samples that was handed to the initializer.
    """

    def __init__(self, df, samples=None, **kwargs):
        """Initialize the result.

        Args:
            df: Passed on unchanged to the parent initializer
            samples: Collected samples. ``None`` is replaced by a new empty
                list.
            **kwargs: Accepted, but not used by this initializer
        """
        super().__init__(df)
        #: Collected samples
        self.samples = [] if samples is None else samples
class NoisySampleResult(AbstractResult):
    """Result of :class:`NoisySample`: a list of data samples that can be
    written to and loaded back from a directory of ``data_NNNN.sql`` files.
    """

    def __init__(self, samples: Optional[List[Data]] = None):
        """Initialize the result.

        Args:
            samples: List of data objects. ``None`` is replaced by a new
                empty list.
        """
        super().__init__()
        if samples is None:
            samples = []
        #: Collected samples
        self.samples = samples  # type: List[Data]

    def write(self, directory: Union[str, PurePath], non_empty="add") -> None:
        """Write to output directory

        Args:
            directory: Path to directory
            non_empty: What to do if directory is not empty: ``raise`` (raise
                :py:class:`FileExistsError`), ``ignore`` (do nothing and
                potentially overwrite files), ``add`` (add files with new
                name).

        Returns:
            None

        Raises:
            FileExistsError: If ``directory`` exists but is not a directory,
                or if it is not empty and ``non_empty == "raise"``.
            ValueError: If ``non_empty`` is not one of the options above.
        """
        directory = Path(directory)
        if directory.exists() and not directory.is_dir():
            raise FileExistsError(
                "{} exists but is not a directory.".format(directory.resolve())
            )
        # Reject invalid options before touching the file system, so that a
        # typo does not leave a freshly created directory behind.
        if non_empty not in ("ignore", "raise", "add"):
            raise ValueError(
                "Unknown option '{}' for non_empty.".format(non_empty)
            )
        if not directory.exists():
            directory.mkdir(parents=True)
        # any() stops at the first entry instead of listing the whole
        # directory.
        if non_empty == "raise" and any(directory.iterdir()):
            raise FileExistsError(
                "{} is not an empty directory".format(directory.resolve())
            )

        if non_empty in ("ignore", "raise"):
            # Files are numbered from 0; existing files with the same name
            # are overwritten.
            for i, data in enumerate(self.samples):
                path = directory / "data_{:04d}.sql".format(i)
                data.write(path, overwrite="overwrite")
        else:
            # non_empty == "add": use the next free index for every sample.
            i = 0
            for data in self.samples:
                path = directory / "data_{:04d}.sql".format(i)
                while path.is_file():
                    i += 1
                    path = directory / "data_{:04d}.sql".format(i)
                data.write(path, overwrite="raise")
                # Move past the slot that was just used.
                i += 1

    @classmethod
    def load(
        cls, directory: Union[str, PurePath], loader: Optional[Callable] = None
    ) -> "NoisySampleResult":
        """Load from output directory

        All files matching ``data_*.sql`` in ``directory`` are loaded in
        sorted order.

        Args:
            directory: Path to directory to load from
            loader: Function used to load data (optional). It is called with
                the path of each file; if not given, ``Data(path)`` is used.

        Returns:
            New instance of the class this method is called on, holding the
            loaded samples.

        Raises:
            FileNotFoundError: If ``directory`` does not exist or is not a
                directory.

        Example:

        .. code-block:: python

            def loader(path):
                d = clusterking.DataWithErrors(path)
                d.add_rel_err_uncorr(0.01)
                return d

            nsr = NoisySampleResult.load("/path/to/dir/", loader=loader)

        """
        directory = Path(directory)
        if not directory.is_dir():
            raise FileNotFoundError(
                "{} does not exist or is not a directory".format(directory)
            )
        if loader is None:
            loader = Data
        samples = [loader(path) for path in sorted(directory.glob("data_*.sql"))]
        # Use cls rather than the hard-coded class name so that subclasses
        # get an instance of their own type.
        return cls(samples=samples)
class NoisySample(AbstractWorker):
    """This worker generates data samples with slightly varied sample points
    (by applying :meth:`clusterking.scan.Scanner.add_spoints_noise` to a
    pre-configured :class:`clusterking.scan.Scanner` object)

    Example:

    .. code-block:: python

        import clusterking as ck
        from clusterking.stability.noisysamplestability import NoisySample

        # Set up data object
        d = ck.Data()

        # Set up scanner
        s = ck.scan.Scanner()
        s.set_dfunction(...)
        s.set_spoints_equidist(...)

        # Set up noisysample object
        ns = NoisySample()
        ns.set_repeat(1)
        ns.set_noise("gauss", mean=0., sigma=1/30/4)

        # Run and write
        nsr = ns.run(scanner=s, data=d)
        nsr.write("output/folder")

    """

    def __init__(self):
        super().__init__()
        # Arguments forwarded to Scanner.add_spoints_noise, see set_noise
        self._noise_kwargs = {}
        self._noise_args = []
        # Number of experiments, see set_repeat
        self._repeat = 10
        self._cache_data = True
        self.set_repeat()
        self.log = get_logger("NoisySample")

    # **************************************************************************
    # Config
    # **************************************************************************

    def set_repeat(self, repeat=10) -> None:
        """Set number of experiments.

        Args:
            repeat: Number of experiments. :meth:`run` generates
                ``repeat + 1`` samples.

        Returns:
            None
        """
        self._repeat = repeat

    def set_noise(self, *args, **kwargs) -> None:
        """Configure noise, applied to the spoints in each experiment.
        See :meth:`clusterking.scan.Scanner.add_spoints_noise`.

        Args:
            *args: Positional arguments to
                :meth:`clusterking.scan.Scanner.add_spoints_noise`.
            **kwargs: Keyword arguments to
                :meth:`clusterking.scan.Scanner.add_spoints_noise`.

        Returns:
            None
        """
        self._noise_args = args
        self._noise_kwargs = kwargs

    # **************************************************************************
    # Run
    # **************************************************************************

    def run(
        self, scanner: Scanner, data: Optional[Data] = None
    ) -> NoisySampleResult:
        """Generate ``repeat + 1`` noisy samples.

        For every sample, a copy of ``scanner`` gets the configured noise
        added to its sample points and is run on a deep copy of ``data``.

        .. note::

            This method will handle keyboard interrupts and still return the
            so far collected data.

        Args:
            scanner: :class:`~clusterking.scan.scanner.Scanner` object
            data: :class:`~clusterking.data.data.Data` object. This does not
                have to contain any actual sample points, but is used so that
                you can use data with errors by passing a
                :class:`~clusterking.data.DataWithErrors` object.
                If ``None``, an empty :class:`~clusterking.data.data.Data`
                object is used.

        Returns:
            :class:`NoisySampleResult`.
        """
        if data is None:
            # The signature advertises ``None`` as default, so fall back to
            # an empty data object instead of failing on ``None.copy``.
            data = Data()

        datas = []
        # NOTE(review): one sample more than ``repeat`` is generated;
        # presumably the extra one serves as the reference sample of
        # NoisySampleStabilityTester -- confirm.
        for _ in tqdm.auto.tqdm(range(self._repeat + 1), desc="NoisySample"):
            try:
                noisy_scanner = copy.copy(scanner)
                noisy_scanner.set_progress_bar(True, leave=False, position=1)
                noisy_scanner.add_spoints_noise(
                    *self._noise_args, **self._noise_kwargs
                )
                this_data = data.copy(deep=True)
                noisy_scanner.run(this_data).write()
                datas.append(this_data)
            except KeyboardInterrupt:
                self.log.critical(
                    "Keyboard interrupt: Will still return "
                    "so far collected samples"
                )
                # Stop generating samples; without this the loop would just
                # carry on with the next experiment.
                break
        return NoisySampleResult(datas)
class NoisySampleStabilityTester(AbstractStabilityTester):
    """This stability test takes data samples with slightly varied sample
    points (as generated by :class:`NoisySample`, which applies
    :meth:`clusterking.scan.Scanner.add_spoints_noise` to a pre-configured
    :class:`clusterking.scan.Scanner` object) and compares the resulting
    clusters and benchmark points.

    Example:

    .. code-block:: python

        nsr = NoisySampleResult.load("/path/to/samples/")

        c = ck.cluster.HierarchyCluster()
        c.set_metric()
        c.set_max_d(0.2)

        nsst = NoisySampleStabilityTester()
        nsst.add_fom(DeltaNClusters(name="DeltaNClusters"))
        r = nsst.run(sample=nsr, cluster=c)

    """

    def __init__(self, *args, keep_samples=False, **kwargs):
        """Initialize :class:`NoisySampleStabilityTester`

        Args:
            *args: Arguments passed on to
                :class:`~clusterking.stability.stabilitytester.AbstractStabilityTester`
            keep_samples: Save clustered/benchmarked samples to the
                ``samples`` attribute of the
                :class:`NoisySampleStabilityTesterResult` returned by
                :meth:`run`
            **kwargs: Keyword arguments passed on to
                :class:`~clusterking.stability.stabilitytester.AbstractStabilityTester`
        """
        super().__init__(*args, **kwargs)
        self._keep_samples = keep_samples

    # **************************************************************************
    # Run
    # **************************************************************************

    def run(
        self,
        sample: NoisySampleResult,
        cluster: Optional[Cluster] = None,
        benchmark: Optional[AbstractBenchmark] = None,
    ) -> NoisySampleStabilityTesterResult:
        """Run stability test.

        The first sample serves as reference: every other sample is compared
        to it with each of the configured figures of merit (FOMs).

        Args:
            sample: :class:`~NoisySampleResult`
            cluster: Optional:
                :class:`~clusterking.cluster.cluster.Cluster` object. If
                given, it is run on every sample.
            benchmark: Optional:
                :class:`~clusterking.benchmark.benchmark.AbstractBenchmark`
                object. If given, it is run on every sample.

        Returns:
            :class:`~NoisySampleStabilityTesterResult` object

        Raises:
            ValueError: If a FOM raises an exception and the configured
                exception handling is neither ``raise`` nor ``print``.
        """
        reference_data = None
        fom_results = collections.defaultdict(list)
        # Collected samples if ``keep_samples == True``. The reference
        # sample is never added.
        samples = []
        for isample, data in tqdm.auto.tqdm(list(enumerate(sample.samples))):
            if cluster is not None:
                cluster.run(data).write()
            if benchmark is not None:
                benchmark.run(data).write()
            if isample == 0:
                # Keep an independent copy of the first sample as reference.
                reference_data = data.copy(deep=True)
                continue
            for fom_name, fom in self._foms.items():
                try:
                    fom_value = fom.run(reference_data, data).fom
                except Exception as e:
                    print("isample = {}".format(isample))
                    if self._exceptions_handling == "raise":
                        # Bare raise re-raises the active exception as is.
                        raise
                    elif self._exceptions_handling == "print":
                        # Record a missing value so that all FOM columns
                        # keep the same length.
                        fom_value = None
                        print(e)
                    else:
                        raise ValueError(
                            "Invalid value for exception "
                            "handling: {}".format(self._exceptions_handling)
                        )
                fom_results[fom_name].append(fom_value)
            if self._keep_samples:
                samples.append(data)
        return NoisySampleStabilityTesterResult(
            df=pd.DataFrame(fom_results), samples=samples
        )