Source code for clusterking.scan.scanner

#!/usr/bin/env python3

""" Scans the NP parameter space in a grid and also q2, producing the
normalized q2 distribution. """

# std
import functools
import multiprocessing
import os
import time
from typing import Callable, Sized, Dict, Iterable, Optional, List
import itertools

# 3rd party
import numpy as np
import pandas as pd

# ours
from clusterking.worker import DataWorker
from import Data
import clusterking.maths.binning
from clusterking.util.metadata import (
from clusterking.util.log import get_logger
from clusterking.result import DataResult

class SpointCalculator(object):
    """A class that holds the function with which we calculate each
    point in sample space. Note that this has to be a separate class from
    Scanner to avoid problems related to multiprocessing's use of the pickle
    library, which are described here:

    def __init__(self):
        # All of these have to be set!
        #: Function to run
        self.func = None
        #: When function should be binned, this should be an array
        #: of the bin edge points, if the function should be sampled, an array
        #: of the sample points.
        self.binning = None
        #: 'sample', 'integrate'
        self.binning_mode = "integrate"
        #: Normalize distribution if binning is specified
        self.normalize = False
        self.kwargs = {}

    # todo: doc
    # todo: ignore static warning
    def _prepare_spoint(self, spoint):
        return spoint

    def calc(self, spoint) -> np.array:
        """Calculates one point in wilson space.

            spoint: Wilson coefficients

            np.array of the integration results

        spoint = self._prepare_spoint(spoint)
        if self.binning is not None:
            if self.binning_mode == "integrate":
                return clusterking.maths.binning.bin_function(
                    functools.partial(self.func, spoint, **self.kwargs),
            elif self.binning_mode == "sample":
                func = functools.partial(self.func, spoint, **self.kwargs)
                res = np.array(list(map(func, self.binning)))
                if self.normalize:
                    res /= sum(res)
                print("results", res)
                return res
            return self.func(spoint, **self.kwargs)

# todo: also allow to disable multiprocessing if there are problems.
[docs]class Scanner(DataWorker): """ This class is set up with a function (specified in :meth:`.set_dfunction`) that depends on points in parameter space and a set of sample points in this parameter space (specified via one of the ``set_spoints_...`` methods). The function is then run for every sample point (in the :meth:`.run` method) and the results are written to a :class:``-like object. Usage example: .. code-block:: python import clusterking as ck def myfunction(parameters, x): return sum(parameters) * x # Initialize Scanner class s = ck.scan.Scanner() # Set the function s.set_dfunction(myfunction) # Set the sample points s.set_spoints_equidist({ "a": (-1, 1, 10), "b": (-1, 1, 10) }) # Initialize a Data class to write to: d = # Run it r = # Write back results to data r.write() """ # ************************************************************************** # Constructor # **************************************************************************
[docs] def __init__(self): """Initializes the :class:`clusterking.scan.Scanner` class.""" super().__init__() # todo: move self.log = get_logger("Scanner") #: Points in wilson space #: Use self.spoints to access this self._spoints = None # type: Optional[np.ndarray] #: Instance of SpointCalculator to perform the claculations of #: the wilson space points. self._spoint_calculator = SpointCalculator() # todo: move = nested_dict()["git"] = version_info(self.log)["time"] = time.strftime("%a %d %b %Y %H:%M", time.gmtime()) # todo: shouldn't that be in metadata? #: Names of the parameters self._coeffs = [] # type: List[str] self._no_workers = None # type: Optional[int] self._progress_bar = True self._tqdm_kwargs = {} self.set_imaginary_prefix("im_")
# ************************************************************************** # Convenience properties # ************************************************************************** @property def imaginary_prefix(self) -> str: """Prefix for the name of imaginary parts of coefficients. Also see e.g. :meth:`.set_spoints_equidist`. Read only. """ return["imaginary_prefix"] @property def spoints(self): """Points in parameter space that are sampled (read-only).""" return self._spoints @property def coeffs(self): """The name of the parameters/coefficients/dimensions of the spoints (read only). Set after spoints are set. Does **not** include the names of the columns of the imaginary parts. """ return self._coeffs.copy() # ************************************************************************** # Settings # **************************************************************************
[docs] def set_progress_bar(self, show: bool, **kwargs) -> None: """Settings for progress bar Args: show: Show progress bar? **kwargs: Keyword arguments for tqdm progress bar Returns: """ self._progress_bar = show self._tqdm_kwargs = kwargs
[docs] def set_dfunction( self, func: Callable, binning: Optional[Sized] = None, sampling: Optional[Sized] = None, normalize=False, xvar="xvar", yvar="yvar", **kwargs ): """Set the function that generates the distributions that are later clustered (e.g. a differential cross section). Args: func: A function that takes the point in parameter space as the first argument (**Note**: The parameters are given in alphabetically order with respect to the parameter name!). It should either return a ``float`` or a ``np.ndarray``. If the ``binning`` or ``sampling`` options are specified, only ``float`` s as return value are allowed. binning: If this parameter is set to an array-like object, we will integrate the function over the specified bins for every point in parameter space. sampling: If this parameter is set to an array-like object, we will apply the function to these points for every point in parameter space. normalize: If a binning is specified, normalize the resulting distribution. xvar: Name of variable on x-axis yvar: Name of variable on y-axis **kwargs: All other keyword arguments are passed to the function. Returns: None """ if normalize and binning is None and sampling is None: raise ValueError( "The setting normalize=True only makes sense if a binning or " "sampling is specified." ) if binning is not None and sampling is not None: raise ValueError("Please specify EITHER sampling OR binning.") # The block below just wants to put some information about the function # in the metadata. Can be ignored if you're only interested in what's # happening. md =["dfunction"] try: md["name"] = func.__name__ md["doc"] = func.__doc__ except AttributeError: try: # For functools.partial objects # noinspection PyUnresolvedReferences md["name"] = "functools.partial({})".format(func.func.__name__) # noinspection PyUnresolvedReferences md["doc"] = func.func.__doc__ except AttributeError: pass md["kwargs"] = failsafe_serialize(kwargs) md["binning"] = binning # This is the important thing: We set all required attributes of the # spoint calculator! self._spoint_calculator.func = func if binning is not None: self._spoint_calculator.binning = binning self._spoint_calculator.binning_mode = "integrate" md["binning"] = list(binning) md["binning_mode"] = "integrate" md["nbins"] = len(binning) - 1 elif sampling is not None: self._spoint_calculator.binning = sampling md["binning"] = list(sampling) self._spoint_calculator.binning_mode = "sample" md["binning_mode"] = "sample" md["nbins"] = len(sampling) md["xvar"] = xvar md["yvar"] = yvar self._spoint_calculator.normalize = normalize self._spoint_calculator.kwargs = kwargs
[docs] def set_spoints_grid(self, values: Dict[str, Iterable[float]]) -> None: """Set a grid of points in sampling space. Args: values: A dictionary of the following form: .. code-block:: python { <coeff name>: [ value_1, ..., value_n ] } where ``value_1``, ..., ``value_n`` can be complex numbers in general. """ # IMPORTANT to keep this order! self._coeffs = sorted(list(values.keys())) # Now we collect all lists of values. values_lists = [values[coeff] for coeff in self._coeffs] # Now we build the cartesian product, i.e. # [a1, a2, ...] x [b1, b2, ...] x ... x [z1, z2, ...] = # [(a1, b1, ..., z1), ..., (a2, b2, ..., z2)] self._spoints = np.array(list(itertools.product(*values_lists)))["spoints"]["grid"] = failsafe_serialize(values)
[docs] def set_spoints_equidist(self, ranges: Dict[str, tuple]) -> None: """Set a list of 'equidistant' points in sampling space. Args: ranges: A dictionary of the following form: .. code-block:: python { <coeff name>: ( <Minimum of coeff>, <Maximum of coeff>, <Number of bins between min and max>, ) } .. note:: In order to add imaginary parts to your coefficients, prepend their name with ``im_`` (you can customize this prefix by setting the :attr:`.imaginary_prefix` attribute to a custom value.) Example: .. code-block:: python s = Scanner() s.set_spoints_equidist( { "a": (-2, 2, 4), "im_a": (-1, 1, 10), }, ... ) Will sample the real part of ``a`` in 4 points between -2 and 2 and the imaginary part of ``a`` in 10 points between -1 and 1. Returns: None """ # Because of our hack with the imaginary prefix, let's first see which # coefficients we really have def is_imaginary(name: str) -> bool: return name.startswith(self.imaginary_prefix) def real_part(name: str) -> str: if is_imaginary(name): return name.replace(self.imaginary_prefix, "", 1) else: return name def imaginary_part(name: str) -> str: if not is_imaginary(name): return self.imaginary_prefix + name else: return name coeffs = list(set(map(real_part, ranges.keys()))) grid_config = {} for coeff in coeffs: # Now let's always collect the values of the real part and of the # imaginary part res = [0.0] ims = [0.0] is_complex = False if real_part(coeff) in ranges: res = list(np.linspace(*ranges[real_part(coeff)])) if imaginary_part(coeff) in ranges: ims = list(np.linspace(*ranges[imaginary_part(coeff)])) is_complex = True # And basically take their cartesian product, alias initialize # the complex number. if is_complex: grid_config[coeff] = [complex(x, y) for x in res for y in ims] else: grid_config[coeff] = res self.set_spoints_grid(grid_config) # Make sure to do this after set_spoints_grid, so we overwrite # the relevant parts. md =["spoints"] md["sampling"] = "equidistant" md["ranges"] = ranges
# todo: Apply to only one dimension?
[docs] def add_spoints_noise(self, generator="gauss", **kwargs) -> None: """Add noise to existing sample points. Args: generator: Random number generator. Default is ``gauss``. Currently supported: ``gauss``. **kwargs: Additional keywords to configure the generator. These keywords are as follows (value assignments are the default values): ``gauss``: ``mean = 0``, ``sigma = 1`` """ if self.spoints is None: raise ValueError( "This method can only be applied after spoints" " have been set." ) if generator == "gauss": gauss_kwargs = {"mean": 0.0, "sigma": 1.0} gauss_kwargs.update(kwargs) rand = np.random.normal( loc=gauss_kwargs["mean"], scale=gauss_kwargs["sigma"], size=self.spoints.shape, ) else: raise ValueError("Unknown generator {}.".format(generator)) if "noise" not in["noise"] = []["noise"].append({"generator": generator, "kwargs": kwargs}) self._spoints += rand
[docs] def set_no_workers(self, no_workers: int) -> None: """Set the number of worker processes to be used. This will usually translate to the number of CPUs being used. Args: no_workers: Number of worker processes Returns: ``None`` """ self._no_workers = no_workers
[docs] def set_imaginary_prefix(self, value: str) -> None: """Set prefix to be used for imaginary parameters in :meth:`set_spoints_grid` and :meth:`set_spoints_equidist`. Args: value: Prefix string Returns: ``None`` """["imaginary_prefix"] = value
# ************************************************************************** # Run # **************************************************************************
[docs] def run(self, data: Data) -> Optional["ScannerResult"]: """Calculate all sample points and writes the result to a dataframe. Args: data: Data object. Returns: :class:`ScannerResult` or None .. warning:: The function set in :meth:`set_dfunction` has to be a globally defined function in order to do multiprocessing, else you will probably run into the error ``Can't pickle local object ...`` that is issued by the python multiprocessing module. If you run into any problems like this, you can always run in single core mode by specifying ``no_workes=1``. """ # todo: rather raise exceptions? if not self._spoints.any(): self.log.error( "No sample points specified. Returning without doing " "anything." ) return if not self._spoint_calculator: self.log.error( "No function specified. Please set it " "using ``Scanner.set_dfunction``. Returning without doing " "anything." ) return no_workers = self._no_workers if not self._no_workers: no_workers = os.cpu_count() if not no_workers: # os.cpu_count() didn't work self.log.warn( "os.cpu_count() not determine number of cores. Fallling " "back to single core mode." ) no_workers = 1 start_time = time.time() if no_workers >= 2: rows = self._run_multicore(no_workers) else: rows = self._run_singlecore() end_time = time.time() run_time = end_time - start_time["run_time"] = run_time return ScannerResult( data=data, rows=rows, spoints=self._spoints,, coeffs=self._coeffs, )
# todo: shouldn't this rather return numpy arrays than List2 def _run_multicore(self, no_workers: int) -> List[List[float]]: """Calculate spoints in parallel processing mode. Args: no_workers: Number of workers. Returns: Rows of the dataframe. """ # pool of worker nodes pool = multiprocessing.Pool(processes=no_workers) # this is the worker function. worker = self._spoint_calculator.calc results = pool.imap(worker, self._spoints) # close the queue for new jobs pool.close() "Started queue with {} job(s) distributed over up to {} " "core(s)/worker(s).".format(len(self._spoints), no_workers) ) rows = [] if self._progress_bar: tqdm_kwargs = dict( desc="Scanning: ", unit=" spoint", total=len(self._spoints) ) tqdm_kwargs.update(self._tqdm_kwargs) iterator =, **tqdm_kwargs) else: iterator = enumerate(results) for index, result in iterator: md =["dfunction"] if not isinstance(result, Iterable): result = [result] if "nbins" not in md: md["nbins"] = len(result) rows.append([*self._spoints[index], *result]) # Wait for completion of all jobs here pool.join() return rows # todo: shouldn't this rather return numpy arrays than List2 def _run_singlecore(self) -> List[List[float]]: """Calculate spoints in single core processing mode. This is sometimes useful because multiprocessing has its quirks. Returns: Rows of the dataframe. """ "Started queue with {} job(s) in single core mode.".format( len(self._spoints) ) ) rows = [] for index, spoint in enumerate(self._spoints), desc="Scanning: ", unit=" spoint", total=len(self._spoints), ): result = self._spoint_calculator.calc(spoint) md =["dfunction"] if not isinstance(result, Iterable): result = [result] if "nbins" not in md: md["nbins"] = len(result) rows.append([*self._spoints[index], *result]) return rows
[docs]class ScannerResult(DataResult):
[docs] def __init__( self, data: Data, rows: List[List[float]], spoints, md, coeffs ): super().__init__(data=data) self._rows = rows self._spoints = spoints = md # type: nested_dict self._coeffs = coeffs
# ************************************************************************** # Convenience properties # ************************************************************************** @property def imaginary_prefix(self) -> str: """Prefix for the name of imaginary parts of coefficients. Also see e.g. :meth:`.set_spoints_equidist`. Read only. """ return["imaginary_prefix"] @property def spoints(self): """Points in parameter space that are sampled (read-only).""" return self._spoints @property def coeffs(self): """The name of the parameters/coefficients/dimensions of the spoints (read only). Set after spoints are set. Does **not** include the names of the columns of the imaginary parts. """ return self._coeffs.copy() # ************************************************************************** # Write # **************************************************************************
[docs] def write(self) -> None: self.log.debug("Converting data to pandas dataframe.") cols = self.coeffs cols.extend( [ "bin{}".format(no_bin) for no_bin in range(["dfunction"]["nbins"]) ] ) # Now we finally write everything to data self._data.df = pd.DataFrame(data=self._rows, columns=cols) # todo: Shouldn't we do that above already? This sounds not so # great performance wise... # Special handling for complex numbers coeffs_with_im = [] for coeff in self.coeffs: coeffs_with_im.append(coeff) if not list(self._data.df[coeff].apply(np.imag).unique()) == [0.0]: values = self._data.df[coeff] self._data.df[coeff] = values.apply(np.real) loc = list(self._data.df.columns).index(coeff) self._data.df.insert( loc + 1, self.imaginary_prefix + coeff, values.apply(np.imag), ) coeffs_with_im.append(self.imaginary_prefix + coeff) else: self._data.df[coeff] = self._data.df[coeff].apply(np.real) = "index" # fixme: Should already be set in worker class["spoints"]["coeffs"] = coeffs_with_im["scan"] ="Integration done.")