Source code for clusterking.plots.plot_bundles

#!/usr/bin/env python3

# std
import logging
import random
from typing import List, Iterable, Union, Optional, Dict, Any
from distutils.version import StrictVersion

# 3rd party
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import pandas as pd
import matplotlib.animation as animation

# ours
from clusterking.util.log import get_logger
from clusterking.plots.plot_histogram import plot_histogram, plot_histogram_fill
from clusterking.plots.colors import ColorScheme


def get_random_indizes(start: int, stop: int, n: int) -> List[int]:
    """Generate a list of n distinct (!) random integers.

    Args:
        start: Minimum of index (start <= index)
        stop: Maximum of index (index < stop)
        n: Number of distinct random indizes to be generated

    Returns:
        List `number` many (different) random indizes
    """
    indizes = set()
    iterations = 0
    while len(indizes) < n:
        indizes.add(random.randrange(start, stop))
        if iterations >= 10 * n:
            print(
                "Did not manage to generate enough different random "
                "integers (only {} of {}).".format(len(indizes), n)
            )
            break
    return sorted(list(indizes))


[docs]class BundlePlot(object): """Plotting class to plot distributions by cluster in order to analyse which distributions get assigned to which cluster."""
[docs] def __init__(self, data): """ Args: data: :py:class:`~clusterking.data.data.Data` object """ #: logging.Logger object self.log = get_logger("BundlePlot", sh_level=logging.WARNING) #: pandas dataframe self.data = data #: Name of the column holding the cluster number self.cluster_column = "cluster" self.bpoint_column = "bpoint" #: Color scheme # fixme: this will be problematic if I reinitialize this if self._has_clusters: self.color_scheme = ColorScheme(self._clusters) else: self.color_scheme = ColorScheme([0]) #: Draw legend? self.draw_legend = True #: Override default titles with this title. If None, the default title #: is used. self.title = None #: Instance of matplotlib.axes.Axes self.ax = None
@property def fig(self): """Instance of matplotlib.pyplot.figure""" return self.ax.get_figure() @property def xrange(self): """Range of the xaxis""" return self.data._dist_xrange @property def xlabel(self): if self.data._dist_vars[0]: return self.data._get_axis_label(self.data._dist_vars[0]) return None @property def ylabel(self): if self.data._dist_vars[1]: return self.data._get_axis_label(self.data._dist_vars[1]) return None @property def _bins(self): if self.data.md["scan"]["dfunction"]["binning_mode"] == "integrate": return self.data.md["scan"]["dfunction"]["binning"] return np.array(range(0, self.data.nbins + 1)) # ************************************************************************** # Internal helpers # ************************************************************************** @property def _has_bpoints(self): """Do we have benchmark points?""" return self.bpoint_column in self.data.df.columns @property def _has_clusters(self): """Do we have clustered data?""" return self.cluster_column in self.data.df.columns @property def _clusters(self): """Return array of all distinct clusters.""" return self.data.clusters(cluster_column=self.cluster_column) def _filter_clusters(self, clusters: Iterable[int]) -> List[int]: """Return list of existing clusters only.""" clusters = list(set(clusters)) selection = [c for c in clusters if c in self._clusters] removed = [c for c in clusters if c not in self._clusters] if removed: self.log.warning( "The cluster(s) {} does not exist in data, " "so I removed them.".format( ", ".join(map(str, sorted(removed))) ) ) return selection def _interpret_cluster_input(self, clusters=None) -> List[int]: """Flexible handling of user specifications for clusters. Args: clusters: Either None (all clusters) a single int (just that cluster or a list of clusters). Returns: List of selected clusters. If no clusters are available at all, an empty list is returned. Raises: If clusters are requested but None are available, a ValueError is raised """ if not self._has_clusters: if clusters is None: return [] else: raise ValueError( "No cluster information available, but individual clusters" " were requested." ) if isinstance(clusters, int): clusters = [clusters] if not clusters: clusters = self._clusters return self._filter_clusters(clusters) # todo: getting the bpoint should be a different function def _get_df_cluster( self, cluster: Union[None, int], bpoint=None, bpoint_return_index=False ) -> pd.DataFrame: """Return only the rows corresponding to one cluster in the dataframe and only the columns that correspond to the bins. Args: cluster: Name of the cluster. If ``None``, no cluster selection will be done. bpoint: If True, return benchmark point, if False, return all non- benchmark points, if None, return everything. Returns: pandas.DataFrame as described above """ # to avoid long line: cc = self.cluster_column bc = self.data.bin_cols if cluster is not None: df = self.data.df[self.data.df[cc] == cluster] else: df = self.data.df if bpoint is None: return df[bc] elif bpoint is False: if self._has_bpoints: return df[df[self.bpoint_column] == False][bc] else: return df[bc] elif bpoint is True: if self._has_bpoints: df_bp = df[df[self.bpoint_column] == True] assert len(df_bp) == 1 if not bpoint_return_index: return df_bp[bc] else: return df_bp[bc], df_bp.index[0] else: return pd.DataFrame() else: raise ValueError("Invalid argument bpoint=={}".format(bpoint)) def _get_df_cluster_err_high(self, index): indices = list(self.data.df.index) loc = indices.index(index) # todo: this is horribly inefficient return self.data.err()[loc] def _get_df_cluster_err_low(self, *args, **kwargs): return self._get_df_cluster_err_high(*args, **kwargs) def _set_ax(self, ax, title): """Set up axes.""" if len(self.data.df) == 0: raise ValueError( "No data to plot. Please check if your dataframe contains " "any row." ) if self.title is not None: title = self.title if not ax: fig, ax = plt.subplots() self.ax = ax ax.set_title(title) if self.xlabel: ax.set_xlabel(self.xlabel) if self.ylabel: ax.set_ylabel(self.ylabel) # ************************************************************************** # Plots # ************************************************************************** # -------------------------------------------------------------------------- # Legend # -------------------------------------------------------------------------- def _draw_legend(self, clusters=None): # todo: Should be multi column legend if we have too many patches... if not self._has_clusters: return if not self.draw_legend: return clusters = self._interpret_cluster_input(clusters) if len(clusters) <= 1: return legend_elements = [] for cluster in clusters: color = self.color_scheme.get_cluster_color(cluster) # pycharm can't seem to find patches: # noinspection PyUnresolvedReferences p = matplotlib.patches.Patch( facecolor=color, edgecolor=color, label=cluster ) legend_elements.append(p) self.ax.legend( handles=legend_elements, loc="best", title="Clusters", frameon=False ) # -------------------------------------------------------------------------- # Benchmark points + more lines # -------------------------------------------------------------------------- def _plot_bundles( self, cluster: Union[None, int], nlines=0, benchmark=True, hist_kwargs: Optional[Dict[str, Any]] = None, hist_kwargs_bp: Optional[Dict[str, Any]] = None, ) -> None: """Main implementation of self.plot_bundles (private method). This method will be called for each cluster in self.plot_bundles. Args: cluster: Number of cluster to be plotted nlines: Number of example distributions of the cluster to be plotted hist_kwargs: See :meth:`plot_bundles`, hist_kwargs_bp: See :meth:`plot_bundles` Returns: None """ if hist_kwargs is None: hist_kwargs = {} if hist_kwargs_bp is None: hist_kwargs = hist_kwargs.copy() df_cluster_no_bp = self._get_df_cluster(cluster, bpoint=False) if len(df_cluster_no_bp) < nlines: self.log.warning( "Not enough rows for cluster {} " "Only plotting {} lines.".format(cluster, len(df_cluster_no_bp)) ) nlines = len(df_cluster_no_bp) df_cluster_bp = self._get_df_cluster(cluster, bpoint=True) indizes = get_random_indizes(0, len(df_cluster_no_bp), nlines) if cluster is None: color = self.color_scheme.get_cluster_color(0) colors = self.color_scheme.get_cluster_colors_faded(0, nlines) else: color = self.color_scheme.get_cluster_color(cluster) colors = self.color_scheme.get_cluster_colors_faded(cluster, nlines) if nlines == 1 and not benchmark: # Do not use faded out color if we just plot one line colors = [color] for i, index in enumerate(indizes): data = np.squeeze(df_cluster_no_bp.iloc[[index]].values) plot_histogram( self.ax, self._bins, data, color=colors[i], linestyle="-", **hist_kwargs ) if self._has_bpoints and benchmark: plot_histogram( self.ax, self._bins, df_cluster_bp.values, color=color, **hist_kwargs )
[docs] def plot_bundles( self, clusters: Optional[Union[None, int, Iterable[int]]] = None, nlines=None, ax=None, bpoints=True, hist_kwargs: Optional[Dict[str, Any]] = None, hist_kwargs_bp: Optional[Dict[str, Any]] = None, ) -> None: """Plot several examples of distributions for each cluster specified Args: clusters: List of clusters to selected or single cluster. If None (default), all clusters are chosen. nlines: Number of example distributions of each cluster to be plotted. Defaults to 0 if we plot benchmark points and 3 otherwise. ax: Instance of matplotlib.axes.Axes to be plotted on. If None (default), a new axes object and figure is initialized and saved as self.ax and self.fig. bpoints: Draw benchmark curve hist_kwargs: Keyword arguments passed on to :meth:`~clusterking.plots.plot_histogram.plot_histogram` hist_kwargs_bp: Like ``hist_kwargs`` but used for benchmark points. If ``None``, ``hist_kwargs`` is used. Returns: None """ clusters = self._interpret_cluster_input(clusters) if nlines is None: if self._has_bpoints and bpoints: nlines = 0 else: nlines = 3 _title = [] if self._has_bpoints: _title.append("benchmark point(s)") if nlines: if self._has_bpoints: _title.append("+") _title.append("{} sample point(s) ".format(nlines)) if clusters: _title.append( "for cluster(s) {}".format( ", ".join(map(str, sorted(clusters))) ) ) self._set_ax(ax, " ".join(_title)) # pycharm might be confused about the type of `clusters`: # noinspection PyTypeChecker for cluster in clusters: self._plot_bundles( cluster, nlines=nlines, benchmark=bpoints, hist_kwargs=hist_kwargs, hist_kwargs_bp=hist_kwargs_bp, ) if not clusters: self._plot_bundles( cluster=None, nlines=nlines, benchmark=False, hist_kwargs=hist_kwargs, hist_kwargs_bp=hist_kwargs_bp, ) self._draw_legend(clusters)
# todo: doc
[docs] def animate_bundle(self, cluster, n, benchmark=True): # There seems to be some underlying magic here with fig fig = plt.figure() ax = fig.gca() self.ax = ax linestyle = "-" if benchmark: self._plot_bundles(cluster, 0, benchmark=True) linestyle = "--" ims = [] df_cluster_no_bp = self._get_df_cluster(cluster, bpoint=False) color = self.color_scheme.get_cluster_color(cluster) for i in range(n): index = random.randrange(0, len(df_cluster_no_bp)) contents = np.squeeze(df_cluster_no_bp.iloc[[index]].values) contents = np.append(contents, contents[-1]) edges = np.arange(len(contents)) ims.append( plt.step( edges, contents, where="post", color=color, linestyle=linestyle, ) ) # self._set_ax(None, "Animated sample points") anim = animation.ArtistAnimation( fig, ims, interval=500, repeat_delay=3000, blit=True ) # In order to display this in the notebook, use # from IPython.display import HTML # HTML(anim.to_html5_video()) return anim
# -------------------------------------------------------------------------- # Minima/Maxima of bin content for each cluster # -------------------------------------------------------------------------- def _plot_minmax( self, cluster: Union[None, int], bpoints=True, hist_kwargs: Optional[Dict[str, Any]] = None, fill_kwargs: Optional[Dict[str, Any]] = None, ) -> None: """Main implementation of self.plot_minmax. This method will be called for each cluster in self.plot_minmax. Args: cluster: Name of cluster to be plotted or None if there are no clusters bpoints: Plot benchmark points hist_kwargs: See :meth:`plot_minmax` fill_kwargs: See :meth:`plot_minmax` Returns: None """ df_cluster = self._get_df_cluster(cluster) maxima = list(df_cluster.max().values) minima = list(df_cluster.min().values) if fill_kwargs is None: fill_kwargs = {} if cluster is not None: color = self.color_scheme.get_cluster_color(cluster) else: color = self.color_scheme.get_cluster_color(0) for i in range(len(maxima)): x = self._bins[i : (i + 2)] y1 = [minima[i], minima[i]] y2 = [maxima[i], maxima[i]] fb_kwargs = dict( facecolor=color, interpolate=False, alpha=0.3, hatch="////", color=color, ) fb_kwargs.update(fill_kwargs) self.ax.fill_between(x, y1, y2, **fb_kwargs) if bpoints: self._plot_bundles( cluster, nlines=0, benchmark=True, hist_kwargs=hist_kwargs )
[docs] def plot_minmax( self, clusters: Optional[Union[int, Iterable[int]]] = None, ax=None, bpoints=True, hist_kwargs: Optional[Dict[str, Any]] = None, fill_kwargs: Optional[Dict[str, Any]] = None, ) -> None: """Plot the minimum and maximum of each bin for the specified clusters. Args: clusters: List of clusters to selected or single cluster. If None (default), all clusters are chosen. ax: Instance of ``matplotlib.axes.Axes`` to plot on. If None, a new one is instantiated. bpoints: Plot benchmark points hist_kwargs: Keyword arguments to :meth:`~clusterking.plots.plot_histogram.plot_histogram` fill_kwargs: Keyword arguments to`matplotlib.pyplot.fill_between` Returns: None """ clusters = self._interpret_cluster_input(clusters) _title = ["Minima and maxima of the bin contents"] if self._has_clusters: _title.append( "for cluster(s) {}".format( ", ".join(map(str, sorted(clusters))) ) ) self._set_ax(ax, " ".join(_title)) # pycharm might be confused about the type of `clusters`: # noinspection PyTypeChecker for cluster in clusters: self._plot_minmax( cluster, bpoints=bpoints, hist_kwargs=hist_kwargs, fill_kwargs=fill_kwargs, ) if not clusters: self._plot_minmax( None, bpoints=bpoints, hist_kwargs=hist_kwargs, fill_kwargs=fill_kwargs, ) self._draw_legend(clusters)
# -------------------------------------------------------------------------- # Plot with errors # -------------------------------------------------------------------------- def _err_plot( self, cluster: Union[None, int], bpoints=True, hist_kwargs: Optional[Dict[str, Any]] = None, hist_fill_kwargs: Optional[Dict[str, Any]] = None, ) -> None: """Main implementation of :meth:`err_plot`` Args: cluster: Name of cluster to be plotted or None if there are no clusters bpoints: Plot benchmark points? If False or benchmark points are not available, distributions corresponding to random sample points are chosen. hist_kwargs: See :meth:`err_plot` hist_fill_kwargs: See :meth:`err_plot` Returns: None """ if hist_kwargs is None: hist_kwargs = {} if hist_fill_kwargs is None: hist_fill_kwargs = {} if bpoints and self._has_bpoints: data, index = self._get_df_cluster( cluster, bpoint=True, bpoint_return_index=True ) else: df_cluster_no_bp = self._get_df_cluster(cluster, bpoint=False) i = get_random_indizes(0, len(df_cluster_no_bp), 1)[0] data = df_cluster_no_bp.iloc[[i]] index = data.index[0] data = np.squeeze(data.values) err_high = self._get_df_cluster_err_high(index=index) err_low = self._get_df_cluster_err_low(index=index) if cluster is not None: color = self.color_scheme.get_cluster_color(cluster) light_color = self.color_scheme.get_err_color(cluster) else: color = self.color_scheme.get_cluster_color(0) light_color = self.color_scheme.get_err_color(0) hist_kw = dict(color=color, linestyle="-") hist_kw.update(hist_kwargs) plot_histogram(self.ax, self._bins, data, **hist_kw) hf_kw = dict(color=light_color) hf_kw.update(hist_fill_kwargs) plot_histogram_fill( self.ax, self._bins, data - err_low, data + err_high, **hf_kw )
[docs] def err_plot( self, clusters: Optional[Union[None, int, Iterable[int]]] = None, ax=None, bpoints=True, hist_kwargs: Optional[Dict[str, Any]] = None, hist_fill_kwargs: Optional[Dict[str, Any]] = None, ): """Plot distributions with errors. Args: clusters: List of clusters to selected or single cluster. If None (default), all clusters are chosen. ax: Instance of `matplotlib.axes.Axes` to plot on. If None, a new one is instantiated. bpoints: Plot benchmark points? If False or benchmark points are not available, distributions corresponding to random sample points are chosen. hist_kwargs: Keyword arguments to :meth:`~clusterking.plots.plot_histogram.plot_histogram` hist_fill_kwargs: Keyword arguments to :meth:`~clusterking.plots.plot_histogram.plot_histogram_fill` Returns: None """ clusters = self._interpret_cluster_input(clusters) _title = [] if self._has_bpoints and bpoints: _title.append("Benchmark point") else: _title.append("Random sample point") if clusters: _title.append( "for cluster(s) {}".format( ", ".join(map(str, sorted(clusters))) ) ) self._set_ax(ax, " ".join(_title)) # pycharm might be confused about the type of `clusters`: # noinspection PyTypeChecker for cluster in clusters: self._err_plot( cluster, bpoints=bpoints, hist_kwargs=hist_kwargs, hist_fill_kwargs=hist_fill_kwargs, ) if not clusters: self._err_plot( cluster=None, bpoints=False, hist_kwargs=hist_kwargs, hist_fill_kwargs=hist_fill_kwargs, ) self._draw_legend(clusters)
# -------------------------------------------------------------------------- # Box plots # -------------------------------------------------------------------------- def _box_plot( self, cluster, whiskers=1.5, bpoints=True, boxplot_kwargs=None, hist_kwargs=None, ) -> None: """Main implementation of :meth:`box_plot`. Gets called for every cluster specified in :meth:`box_plot`. Args: cluster: Name of cluster to be plotted whiskers: Length of the whiskers of the box plot. See self.box_plot for more information. Default: 1.5 (matplotlib default) boxplot_kwargs: See :meth:`box_plot` hist_kwargs: See :meth:`box_plot` Returns: None """ if boxplot_kwargs is None: boxplot_kwargs = {} if hist_kwargs is None: hist_kwargs = {} df_cluster = self._get_df_cluster(cluster) data = df_cluster.values if cluster is not None: color = self.color_scheme.get_cluster_color(cluster) else: color = self.color_scheme.get_cluster_color(0) bins = self._bins positions = 1 / 2 * (np.array(bins[1:]) + np.array(bins[:-1])) boxplot_options = dict( notch=False, positions=positions, # np.array(range(len(data.T))) + 0.5, vert=True, patch_artist=True, boxprops=dict(facecolor=color, color=color, alpha=0.3), capprops=dict(color=color), whiskerprops=dict(color=color), flierprops=dict(color=color, markeredgecolor=color), medianprops=dict(color=color), whis=whiskers, # extend the range of the whiskers ) boxplot_options.update(boxplot_kwargs) if StrictVersion(matplotlib.__version__) < StrictVersion("3.1"): boxplot_options["manage_xticks"] = False else: boxplot_options["manage_ticks"] = False self.ax.boxplot(data, **boxplot_options) if bpoints: self._plot_bundles(cluster, nlines=0, hist_kwargs=hist_kwargs)
[docs] def box_plot( self, clusters: Optional[Union[int, Iterable[int]]] = None, ax=None, whiskers=2.5, bpoints=True, boxplot_kwargs: Optional[Dict[str, Any]] = None, hist_kwargs: Optional[Dict[str, Any]] = None, ) -> None: """Box plot of the bin contents of the distributions corresponding to selected clusters. Args: clusters: List of clusters to selected or single cluster. If None (default), all clusters are chosen. ax: Instance of matplotlib.axes.Axes to plot on. If None, a new one is instantiated. whiskers: Length of the whiskers of the box plot in units of IQR (interquartile range, containing 50% of all values). Default 2.5. bpoints: Draw benchmarks? boxplot_kwargs: Arguments to `matplotlib.pyplot.boxplot` hist_kwargs: Keyword arguments to :meth:`~clusterking.plots.plot_histogram.plot_histogram` """ clusters = self._interpret_cluster_input(clusters) _title = ["Box plot of the bin contents"] if self._has_clusters: _title.append( "for cluster(s) {}".format( ", ".join(map(str, sorted(clusters))) ) ) _title.append("\nWhisker length set to {}*IQR".format(whiskers)) self._set_ax(ax, " ".join(_title)) # pycharm might be confused about the type of `clusters`: # noinspection PyTypeChecker for cluster in clusters: self._box_plot( cluster, whiskers=whiskers, bpoints=bpoints, boxplot_kwargs=boxplot_kwargs, ) if not clusters: self._box_plot( None, whiskers=whiskers, bpoints=bpoints, hist_kwargs=hist_kwargs, ) self._draw_legend(clusters)