# Source code for clusterking.data.dfmd

#!/usr/bin/env python3

# std
import copy
import json
import logging
import pandas as pd
from pathlib import PurePath, Path
from typing import Union

# ours
from clusterking.util.metadata import nested_dict
from clusterking.util.log import get_logger
from clusterking.util.cli import handle_overwrite


# fixme @caveat below: perhaps we should simply do that ourselves then?
#   Unused objects should be garbage collected anyhow
class DFMD(object):
    """ This class bundles a pandas dataframe together with metadata and
    provides methods to load from and write these two to files.
    """

    # todo: Use @classmethod instead of so much logic?
    def __init__(self, *args, log=None, **kwargs):
        """
        There are five different ways to initialize this class:

        1. Initialize it empty: ``DFMD()``.
        2. From another DFMD object ``my_dfmd``: ``DFMD(my_dfmd)`` or
           ``DFMD(dfmd=my_dfmd)``.
        3. From a directory path and a project name:
           ``DFMD("path/to/io", "my_name")`` or
           ``DFMD(directory="path/to/io", name="my_name"``
        4. From a dataframe and a metadata object (a nested dictionary like
           object) or paths to corresponding files:
           ``DFMD(df=my_df, md=my_metadata)`` or
           ``DFMD(df="/path/to/df.csv", md=my_metadata)`` etc.

        .. warning::

            If you use ``df=<pd.DataFrame>`` or ``md=<dict like>``, please
            be aware that this will not copy these objects, i.e. any changes
            that are done to these objects subsequently will affect both the
            original DataFrame/metadata and self.df or self.md. To avoid
            this, use ``pd.DataFrame.copy()`` or ``dict.copy()`` to create a
            deepcopy.

        Args:
            log: instance of ``logging.Logger`` or name of logger to be
                created
            *args: See above
            **kwargs: See above
        """
        # These are the three attributes of this class
        #: This will hold all the configuration that we will write out
        self.md = None
        #: Pandas dataframe to hold all of the results
        self.df = None  # type: pd.DataFrame
        #: instance of ``logging.Logger``
        self.log = None

        # First check if the user wants to initialize this class using
        # positional arguments. Handling of keyword arguments is done below.
        if len(args) == 0 and len(kwargs) == 0:
            # Initialize blank
            self.md = nested_dict()
            self.df = pd.DataFrame()
            self.log = None
        elif len(args) == 1:
            # Assume that we were given a DFMD object
            dfmd = args[0]
            self.md = dfmd.md
            self.df = dfmd.df
            self.log = dfmd.log
        # Handling this here, because it also sets the logger and we have to
        # be careful which logger specification takes priority
        if "dfmd" in kwargs:
            dfmd = kwargs["dfmd"]
            self.md = dfmd.md
            self.df = dfmd.df
            self.log = dfmd.log
        # Now we can set up the logger (because all other initializations
        # don't copy it)
        if isinstance(log, logging.Logger):
            self.log = log
        elif isinstance(log, str):
            self.log = get_logger(log)
        elif log is None:
            if not self.log:
                self.log = get_logger("DFMD")
        else:
            raise ValueError(
                "Unsupported type '{}' for 'log' argument.".format(
                    type(log)
                )
            )
        if len(args) == 2:
            # Assume we initialize from directory and name
            self.load(directory=args[0], name=args[1])
        elif len(args) >= 3:
            # fix: typo "intialization" corrected in the error message
            raise ValueError(
                "Got {} positional parameters and don't know what to do with"
                " them. Please check the signature of the "
                "initialization.".format(len(args))
            )
        # Now we turn to the kwargs
        # First we check if all keyword arguments are known
        known_kwargs = {
            "dfmd",
            "df",
            "md",
            "directory",
            "name",
        }
        unknown_kwargs = set(kwargs) - known_kwargs
        if unknown_kwargs:
            raise ValueError(
                "Unsupported keyword arguments: {}.".format(
                    ", ".join(list(unknown_kwargs))
                )
            )
        # fix: typo "initalization" corrected in the error message
        mixed_error = ValueError(
            "It looks like you are mixing initialization signatures. Please "
            "check the documentation about how to initialize the DFMD class."
        )
        # Now we go through all keyword arguments and try to execute them if
        # they make sense, else we throw mixed_error.
        if "df" in kwargs:
            if self.df is not None:
                raise mixed_error
            df = kwargs["df"]
            if isinstance(df, (PurePath, str)):
                self.load_df(df)
            elif isinstance(df, pd.DataFrame):
                self.df = kwargs["df"]
            else:
                raise ValueError(
                    "Unsupported type for df: '{}'.".format(type(df))
                )
        if "md" in kwargs:
            if self.md:
                raise mixed_error
            md = kwargs["md"]
            if isinstance(md, (PurePath, str)):
                self.load_md(md)
            elif isinstance(md, dict):
                # fixme: no, we need something more clever, because now it's
                # just gonna be a normal dict instead of a nested_dict?
                self.md = md
            else:
                # fix: this error message previously said 'df' instead of
                # 'md' (copy-paste error)
                raise ValueError(
                    "Unsupported type for md: '{}'.".format(type(md))
                )
        if "directory" in kwargs:
            if "name" not in kwargs or self.md or self.df is not None:
                raise mixed_error
            self.load(kwargs["directory"], kwargs["name"])

    # **************************************************************************
    # Paths
    # **************************************************************************

    # fix: the docstrings of get_df_path and get_md_path were swapped
    # (get_df_path returns the dataframe csv path, get_md_path the metadata
    # json path, as their return statements show).

    @staticmethod
    def get_df_path(directory: Union[PurePath, str], name: str) -> Path:
        """ Return path to dataframe csv file based on directory and
        project name.

        Args:
            directory: Path to input/output directory
            name: Name of project

        Returns:
            Path to dataframe csv file.
        """
        return Path(directory) / (name + "_data.csv")

    @staticmethod
    def get_md_path(directory: Union[PurePath, str], name: str) -> Path:
        """ Return path to metadata json file based on directory and
        project name.

        Args:
            directory: Path to input/output directory
            name: Name of project

        Returns:
            Path to metadata json file.
        """
        return Path(directory) / (name + "_metadata.json")

    # **************************************************************************
    # Loading
    # **************************************************************************

    def load_md(self, md_path: Union[PurePath, str]) -> None:
        """ Load metadata from json file generated by
        :py:meth:`~clusterking.data.dfmd.DFMD.write_md`.
        """
        md_path = Path(md_path)
        self.log.debug(
            "Loading metadata from '{}'.".format(md_path.resolve())
        )
        with md_path.open() as metadata_file:
            md = json.load(metadata_file)
        # Make sure that we still have nested_dict as type for the metadata:
        self.md = nested_dict()
        self.md.update(md)
        self.log.debug("Done.")

    def load_df(self, df_path: Union[PurePath, str]) -> None:
        """ Load dataframe from csv file created by
        :py:meth:`~clusterking.data.dfmd.DFMD.write_df`.
        """
        df_path = Path(df_path)
        self.log.debug(
            "Loading scanner data from '{}'.".format(df_path.resolve())
        )
        with df_path.open() as data_file:
            self.df = pd.read_csv(data_file)
        # The csv was written including the index column; restore it.
        self.df.set_index("index", inplace=True)
        self.log.debug("Loading done.")

    def load(self, directory: Union[PurePath, str], name: str) -> None:
        """ Load from input files which have been generated from
        :py:meth:`~clusterking.data.dfmd.DFMD.write`.

        Args:
            directory: Path to input/output directory
            name: Name of project

        Returns:
            None
        """
        self.load_df(self.get_df_path(directory, name))
        self.load_md(self.get_md_path(directory, name))

    # **************************************************************************
    # Writing
    # **************************************************************************

    def write_md(self, md_path: Union[PurePath, str], overwrite="ask"):
        """ Write out metadata. The file can later be read in using
        :py:meth:`~clusterking.data.dfmd.DFMD.load_md`.

        Args:
            md_path: Path to the output json file
            overwrite: How to proceed if output file already exists:
                'ask', 'overwrite', 'raise'

        Returns:
            None
        """
        md_path = Path(md_path)
        self.log.info("Will write metadata to '{}'.".format(md_path))
        if not md_path.parent.is_dir():
            self.log.debug(
                "Creating directory '{}'.".format(md_path.parent)
            )
            md_path.parent.mkdir(parents=True)
        handle_overwrite([md_path], behavior=overwrite, log=self.log)
        with md_path.open("w") as metadata_file:
            json.dump(self.md, metadata_file, sort_keys=True, indent=4)
        self.log.debug("Done")

    def write_df(self, df_path, overwrite="ask"):
        """ Write out dataframe. The file can later be read in using
        :py:meth:`~clusterking.data.dfmd.DFMD.load_df`.

        Args:
            df_path: Path to the output csv file
            overwrite: How to proceed if output file already exists:
                'ask', 'overwrite', 'raise'

        Returns:
            None
        """
        df_path = Path(df_path)
        self.log.info("Will write dataframe to '{}'.".format(df_path))
        if not df_path.parent.is_dir():
            self.log.debug(
                "Creating directory '{}'.".format(df_path.parent)
            )
            df_path.parent.mkdir(parents=True)
        handle_overwrite([df_path], behavior=overwrite, log=self.log)
        if self.df.empty:
            # Deliberately only a warning-style log, not an exception:
            # writing an empty dataframe is suspicious but not fatal.
            self.log.error(
                "Dataframe seems to be empty. Still writing out anyway."
            )
        with df_path.open("w") as data_file:
            self.df.to_csv(data_file)
        self.log.debug("Done")

    def write(
        self,
        directory: Union[PurePath, str],
        name: str,
        overwrite="ask",
    ) -> None:
        """ Write to input files that can be later loaded with
        :py:meth:`~clusterking.data.dfmd.DFMD.load`.

        Args:
            directory: Path to input/output directory
            name: Name of project
            overwrite: How to proceed if output file already exists:
                'ask', 'overwrite', 'raise'

        Returns:
            None
        """
        df_path = self.get_df_path(directory, name)
        md_path = self.get_md_path(directory, name)
        # Ask/raise about both files up front, then let the individual
        # writers run unconditionally so the user is not prompted twice.
        handle_overwrite(
            [df_path, md_path], behavior=overwrite, log=self.log
        )
        self.write_df(df_path, overwrite="overwrite")
        self.write_md(md_path, overwrite="overwrite")

    def copy(self, deep=True):
        """ Make a copy of this object.

        Args:
            deep: Make a deep copy (default True). If this is disabled, any
                change to the copy will also affect the original.

        Returns:
            New object.
        """
        if deep:
            return copy.deepcopy(self)
        else:
            return copy.copy(self)

    # **************************************************************************
    # Magic methods
    # **************************************************************************

    def __copy__(self):
        return type(self)(df=copy.copy(self.df), md=copy.copy(self.md))

    def __deepcopy__(self, memo):
        new = type(self)(
            df=copy.deepcopy(self.df, memo),
            md=copy.deepcopy(self.md, memo),
        )
        memo[id(self)] = new
        return new