Source code for clusterking.scan.scanner

#!/usr/bin/env python3

""" Scans the NP parameter space in a grid and also q2, producing the
normalized q2 distribution. """

# std
import functools
import multiprocessing
import os
import shutil
import time
from typing import Callable, Sized

# 3rd party
import numpy as np
import pandas as pd
import tqdm

# ours
from clusterking.data.data import Data
import clusterking.maths.binning
from clusterking.util.metadata import git_info, failsafe_serialize, nested_dict
from clusterking.util.log import get_logger


class SpointCalculator(object):
    """ A class that holds the function with which we calculate each
    point in wilson space. Note that this has to be a separate class from
    Scanner to avoid problems related to multiprocessing's use of the pickle
    library, which are described here:
    https://stackoverflow.com/questions/1412787/
    """
    def __init__(self, func: Callable, binning: Sized, normalize, kwargs):
        self.dfunc = func
        self.dfunc_binning = binning
        self.dfunc_normalize = normalize
        self.dfunc_kwargs = kwargs

    def calc(self, spoint) -> np.ndarray:
        """Calculates one point in Wilson space.

        Args:
            spoint: Wilson coefficients

        Returns:
            np.ndarray of the integration results
        """

        if self.dfunc_binning is not None:
            return clusterking.maths.binning.bin_function(
                functools.partial(self.dfunc, spoint, **self.dfunc_kwargs),
                self.dfunc_binning,
                normalize=self.dfunc_normalize
            )
        else:
            return self.dfunc(spoint, **self.dfunc_kwargs)
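
# Usage sketch (hypothetical, not part of this module): because
# SpointCalculator instances are picklable (as long as ``func`` is defined
# at module level), their bound ``calc`` method can be handed to worker
# processes, which is exactly what Scanner.run does below. Assuming a
# globally defined function ``my_dfunc(spoint, q2)``:
#
#     calculator = SpointCalculator(
#         my_dfunc, binning=[0., 1., 2., 3.], normalize=True, kwargs={}
#     )
#     with multiprocessing.Pool() as pool:
#         results = pool.map(calculator.calc, spoints)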


class Scanner(object):

    # **********************************************************************
    # A: Setup
    # **********************************************************************
    def __init__(self):
        self.log = get_logger("Scanner")

        #: Points in Wilson space
        #: Use self.spoints to access this
        self._spoints = []

        #: Instance of SpointCalculator to perform the calculations of
        #: the Wilson space points.
        self._spoint_calculator = None  # type: SpointCalculator

        self.md = nested_dict()
        self.md["git"] = git_info(self.log)
        self.md["time"] = time.strftime(
            "%a %_d %b %Y %H:%M", time.gmtime()
        )
    # Read-only access
    @property
    def spoints(self):
        """ Points in parameter space that are sampled."""
        return self._spoints
    def set_dfunction(
            self,
            func: Callable,
            binning: Sized = None,
            normalize=False,
            **kwargs
    ):
        """ Set the function that generates the distributions that are later
        clustered (e.g. a differential cross section).

        Args:
            func: A function that takes the point in parameter space as the
                first argument. It should either return a float (if the
                binning option is specified), or a np.array otherwise.
            binning: If this parameter is not set (None), we will use the
                function as is. If it is set to an array-like object, we
                will integrate the function over the bins specified by this
                parameter.
            normalize: If a binning is specified, normalize the resulting
                distribution.
            **kwargs: All other keyword arguments are passed to the function.

        Returns:
            None

        .. warning::

            The function ``func`` has to be a globally defined function, else
            you will probably run into the error
            ``Can't pickle local object ...`` that is issued by the python
            multiprocessing module.
        """
        md = self.md["dfunction"]
        try:
            md["name"] = func.__name__
            md["doc"] = func.__doc__
        except AttributeError:
            try:
                # For functools.partial objects
                # noinspection PyUnresolvedReferences
                md["name"] = "functools.partial({})".format(func.func.__name__)
                # noinspection PyUnresolvedReferences
                md["doc"] = func.func.__doc__
            except AttributeError:
                pass
        md["kwargs"] = failsafe_serialize(kwargs)
        if binning is not None:
            md["nbins"] = len(binning) - 1

        # global spoint calculator
        self._spoint_calculator = SpointCalculator(
            func, binning, normalize, kwargs
        )
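
    # Usage sketch (hypothetical function name ``dbrdq2``, following the q2
    # use case from the module docstring). ``func`` must be defined at module
    # level so that it can be pickled for multiprocessing:
    #
    #     def dbrdq2(spoint, q2):
    #         ...  # differential branching ratio at q2 for the given spoint
    #
    #     s = Scanner()
    #     s.set_dfunction(
    #         dbrdq2, binning=np.linspace(0., 10., 11), normalize=True
    #     )
    #
    # With 11 bin edges, this integrates dbrdq2 over 10 bins and normalizes
    # the resulting distribution for every sampled point in Wilson space.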
    # todo: implement set_spoints in a more general way here!

    # **********************************************************************
    # B: Run
    # **********************************************************************
    def run(self, data: Data, no_workers=None) -> None:
        """Calculate all sample points in parallel and save the result in
        ``data.df``.

        Args:
            data: Data object.
            no_workers: Number of worker nodes/cores. Default: Total number
                of cores.
        """
        if not self._spoints:
            self.log.error(
                "No sample points specified. Returning without doing "
                "anything."
            )
            return
        if not self._spoint_calculator:
            self.log.error(
                "No function specified. Please set it "
                "using ``Scanner.set_dfunction``. Returning without doing "
                "anything."
            )
            return

        if not no_workers:
            no_workers = os.cpu_count()
        if not no_workers:
            # os.cpu_count() didn't work
            self.log.warning(
                "os.cpu_count() could not determine the number of cores. "
                "Falling back to no_workers = 1."
            )
            no_workers = 1

        # pool of worker nodes
        pool = multiprocessing.Pool(processes=no_workers)

        # this is the worker function.
        worker = self._spoint_calculator.calc

        results = pool.imap(worker, self._spoints)

        # close the queue for new jobs
        pool.close()

        self.log.info(
            "Started queue with {} job(s) distributed over up to {} "
            "core(s)/worker(s).".format(len(self._spoints), no_workers)
        )

        rows = []
        for index, result in tqdm.tqdm(
            enumerate(results),
            desc="Scanning: ",
            unit=" spoint",
            total=len(self._spoints),
            ncols=min(100, shutil.get_terminal_size((80, 20)).columns)
        ):
            md = self.md["dfunction"]
            if "nbins" not in md:
                # No binning was given, so the number of bins equals the
                # length of the result array (one column per entry below).
                md["nbins"] = len(result)
            coeff_values = list(self._spoints[index].wc.values.values())
            rows.append([*coeff_values, *result])

        # Wait for completion of all jobs here
        pool.join()

        self.log.debug("Converting data to pandas dataframe.")
        # todo: check that there isn't any trouble with sorting.
        cols = self.md["spoints"]["coeffs"].copy()
        cols.extend([
            "bin{}".format(no_bin)
            for no_bin in range(self.md["dfunction"]["nbins"])
        ])

        # Now we finally write everything to data
        data.df = pd.DataFrame(data=rows, columns=cols)
        data.df.index.name = "index"
        data.md["scan"] = self.md

        self.log.info("Integration done.")
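
# End-to-end sketch (hypothetical; assumes ``self._spoints`` and
# ``self.md["spoints"]["coeffs"]`` have been populated, e.g. by a subclass
# that implements set_spoints as per the todo above):
#
#     s = Scanner()
#     s.set_dfunction(
#         dbrdq2, binning=np.linspace(0., 10., 11), normalize=True
#     )
#     d = Data()
#     s.run(d)
#     print(d.df.head())  # one column per Wilson coefficient, then bin0, ...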