Source code for ismn.filecollection

# The MIT License (MIT)
#
# Copyright (c) 2021 TU Wien
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging

import os
from tempfile import gettempdir
from pathlib import Path, PurePosixPath
import numpy as np
from tqdm import tqdm
from typing import Union
from multiprocessing import Pool, cpu_count
from operator import itemgetter
import time
from typing import Tuple
import pandas as pd
from collections import OrderedDict

from ismn.base import IsmnRoot
import ismn.const as const
from ismn.filehandlers import DataFile, StaticMetaFile
from ismn.meta import MetaData, MetaVar, Depth


def _read_station_dir(
    root: Union[IsmnRoot, Path, str],
    stat_dir: Union[Path, str],
    temp_root: Path,
    custom_meta_reader: list,
) -> Tuple[dict, list]:
    """
    Parallelizable function to read metadata for files in station dir
    """
    infos = []

    if not isinstance(root, IsmnRoot):
        proc_root = True
        root = IsmnRoot(root)
    else:
        proc_root = False

    csv = root.find_files(stat_dir, "*.csv")

    try:
        if len(csv) == 0:
            raise const.IsmnFileError(
                "Expected 1 csv file for station, found 0. "
                "Use empty static metadata.")
        else:
            if len(csv) > 1:
                infos.append(
                    f"Expected 1 csv file for station, found {len(csv)}. "
                    f"Use first file in dir.")
            static_meta_file = StaticMetaFile(
                root, csv[0], load_metadata=True, temp_root=temp_root)
            station_meta = static_meta_file.metadata
    except const.IsmnFileError as e:
        infos.append(f"Error loading static meta for station: {e}")
        station_meta = MetaData(
            [MetaVar(k, v) for k, v in const.CSV_META_TEMPLATE.items()])

    data_files = root.find_files(stat_dir, "*.stm")

    filelist = []

    for file_path in data_files:
        try:
            f = DataFile(root, file_path, temp_root=temp_root)
        except Exception as e:
            infos.append(f"Error loading ismn file: {e}")
            continue

        f.metadata.merge(station_meta, inplace=True, exclude_empty=False)

        f.metadata = f.metadata.best_meta_for_depth(
            Depth(
                f.metadata["instrument"].depth.start,
                f.metadata["instrument"].depth.end,
            ))

        # If custom metadata readers are available
        if custom_meta_reader is not None:
            for cmr in np.atleast_1d(custom_meta_reader):
                cmeta = cmr.read_metadata(f.metadata)
                if isinstance(cmeta, dict):
                    cmeta = MetaData([MetaVar(k, v) for k, v in cmeta.items()])
                if cmeta is not None:
                    f.metadata.merge(cmeta, inplace=True)

        network = f.metadata["network"].val
        station = f.metadata["station"].val

        filelist.append((network, station, f))

        infos.append(f"Processed file {file_path}")

    if proc_root:
        root.close()

    return filelist, infos


def _load_metadata_df(meta_csv_file: Union[str, Path]) -> pd.DataFrame:
    """
    Load metadata data frame from csv file
    """

    metadata_df = pd.read_csv(
        meta_csv_file,
        index_col=0,
        header=[0, 1],
        low_memory=False,
        engine="c")

    # parse date cols as datetime
    for col in ["timerange_from", "timerange_to"]:
        metadata_df[col, "val"] = pd.to_datetime(metadata_df[col, "val"])

    lvars = []
    for c in metadata_df.columns:
        if c[0] not in lvars:
            lvars.append(c[0])

    # we assume triples for all vars except these, so they must be at the end
    assert lvars[-2:] == [
        "file_path",
        "file_type",
    ], "file_type and file_path must be at the end."

    metadata_df.index.name = "idx"

    return metadata_df


[docs]class IsmnFileCollection(object): """ The IsmnFileCollection class contains a list of file handlers to access data in the given data directory. The file list can be loaded from a previously stored csv file, or built by iterating over all files in the data root. This class also contains function to load filehandlers for certain networks only. Attributes ---------- root : IsmnRoot Root object where data is stored. filelist : collections.OrderedDict A collection of filehandlers and network names temp_root : Path Temporary root dir. """ def __init__(self, root, filelist, temp_root=gettempdir()): """ Parameters ---------- root : IsmnRoot Root object where data is stored. filelist : collections.OrderedDict A collection of filehandler stored in lists with network name as key. temp_root : Path or str, optional (default : gettempdir()) Root directory where a separate subdir for temporary files will be created (and deleted). """ self.root = root self.filelist = filelist self.temp_root = Path(temp_root) os.makedirs(self.temp_root, exist_ok=True) def __repr__(self): return f"{self.__class__.__name__} for {len(self.filelist.keys())} Networks"
[docs] @classmethod def build_from_scratch( cls, data_root, parallel=True, log_path=None, temp_root=gettempdir(), custom_meta_readers=None, ): """ Parameters ---------- data_root : IsmnRoot or str or Path Root path of ISMN files or path to metadata pkl file. i.e. path to the downloaded zip file or the extracted zip directory (faster) or a file list that contains these infos already. parallel : bool, optional (default: True) Speed up metadata collecting with multiple processes. log_path : str or Path, optional (default: None) Path where the log file is created. If None is set, no log file will be written. temp_root : str or Path, (default: gettempdir()) Temporary folder where extracted data is copied during reading from zip archive. custom_meta_readers: tuple, optional (default: None) Custom metadata readers """ t0 = time.time() if isinstance(data_root, IsmnRoot): root = data_root else: root = IsmnRoot(data_root) os.makedirs(temp_root, exist_ok=True) if log_path is not None: log_file = os.path.join(log_path, f"{root.name}.log") else: log_file = None if log_file: os.makedirs(os.path.dirname(log_file), exist_ok=True) logging.basicConfig( filename=log_file, level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) n_proc = 1 if not parallel else cpu_count() logging.info(f"Collecting metadata with {n_proc} processes.") if not parallel: hint = 'Hint: Use `parallel=True` to speed up metadata ' \ 'generation for large datasets' else: hint = '' print( f"Processing metadata for all ismn stations into folder " f"{root.path}.\n" f"This may take a few minutes, but is only done once...\n{hint}" ) process_stat_dirs = [] for net_dir, stat_dirs in root.cont.items(): process_stat_dirs += list(stat_dirs) args = [(root.path if root.zip else root, d, temp_root, custom_meta_readers) for d in process_stat_dirs] pbar = tqdm(total=len(args), desc="Files Processed") fl_elements = [] def update(r): net_stat_fh, infos = r for i in infos: logging.info(i) for elements in net_stat_fh: fl_elements.append(elements) pbar.update() def error(e): logging.error(e) pbar.update() if n_proc == 1: for arg in args: try: r = _read_station_dir(*arg) update(r) except Exception as e: error(e) else: with Pool(n_proc) as pool: for arg in args: pool.apply_async( _read_station_dir, arg, callback=update, error_callback=error, ) pool.close() pool.join() pbar.close() fl_elements.sort(key=itemgetter(0, 1)) # sort by net name, stat name # to ensure alphabetical order... not sure if necessary, slow? filelist = OrderedDict([]) for net, stat, fh in fl_elements: if net not in filelist.keys(): filelist[net] = [] filelist[net].append(fh) t1 = time.time() info = f"Metadata generation finished after {int(t1-t0)} Seconds." if log_file is not None: info += f"\nMetadata and Log stored in {os.path.dirname(log_file)}" logging.info(info) print(info) return cls(root, filelist=filelist)
[docs] @classmethod def from_metadata_df(cls, data_root, metadata_df, temp_root=gettempdir()): """ Load a previously created and stored filelist from :func:`ismn.filecollection.IsmnFileCollection.to_metadata_csv` Parameters ---------- data_root : IsmnRoot or str or Path Path where the ismn data is stored, can also be a zip file metadata_df : pd.DataFrame Metadata frame temp_root : str or Path, optional (default: gettempdir()) Temporary folder where extracted data is copied during reading from zip archive. """ if isinstance(data_root, IsmnRoot): root = data_root else: root = IsmnRoot(data_root) filelist = OrderedDict([]) columns = np.array(list(metadata_df.columns)) for i, row in enumerate(metadata_df.values): # this_nw = row.loc['network', 'val'] vars = np.unique(columns[:-2][:, 0]) vals = row[:-2].reshape(-1, 3) metadata = MetaData([ MetaVar.from_tuple( (vars[i], vals[i][2], vals[i][0], vals[i][1])) for i in range(len(vars)) ]) f = DataFile( root=root, file_path=Path(str(PurePosixPath(row[-2]))), load_metadata=False, temp_root=temp_root, verify_filepath=False, verify_temp_root=False, ) f.metadata = metadata f.file_type = row[-1] this_nw = f.metadata["network"].val if this_nw not in filelist.keys(): filelist[this_nw] = [] filelist[this_nw].append(f) cls.metadata_df = metadata_df return cls(root, filelist=filelist)
[docs] @classmethod def from_metadata_csv(cls, data_root, meta_csv_file, network=None, temp_root=gettempdir()): """ Load a previously created and stored filelist from :func:`ismn.filecollection.IsmnFileCollection.to_metadata_csv` Parameters ---------- data_root : IsmnRoot or str or Path Path where the ismn data is stored, can also be a zip file meta_csv_file : str or Path Csv file where the metadata is stored. network : list, optional (default: None) List of networks that are considered. Filehandlers for other networks are set to None. temp_root : str or Path, optional (default: gettempdir()) Temporary folder where extracted data is copied during reading from zip archive. """ if network is not None: network = np.atleast_1d(network) print(f"Found existing ismn metadata in {meta_csv_file}.") metadata_df = _load_metadata_df(meta_csv_file) if network is not None: metadata_df = metadata_df[np.isin(metadata_df["network"].values, network)] metadata_df.index = range(len(metadata_df.index)) return cls.from_metadata_df( data_root, metadata_df, temp_root=temp_root)
[docs] def to_metadata_csv(self, meta_csv_file): """ Write filehandle metadata from filelist to metdata csv that contains ALL metadata / variables of the filehander. Can be read back in as filelist with filehandlers using :func:`ismn.filecollection.IsmnFileCollection.from_metadata_csv`. Parameters ---------- meta_csv_file : Path or str, optional (default: None) Directory where the csv file with the correct name is crated """ dfs = [] for i, filehandler in enumerate(self.iter_filehandlers()): df = filehandler.metadata.to_pd(True, dropna=False) df[("file_path", "val")] = str(PurePosixPath(filehandler.file_path)) df[("file_type", "val")] = filehandler.file_type df.index = [i] dfs.append(df) i += 1 dfs = pd.concat(dfs, axis=0, sort=True) cols_end = ["file_path", "file_type"] dfs = dfs[[c for c in dfs.columns if c[0] not in cols_end] + [c for c in dfs.columns if c[0] in cols_end]] dfs = dfs.infer_objects(copy=False).fillna(np.nan) os.makedirs(Path(os.path.dirname(meta_csv_file)), exist_ok=True) dfs.to_csv(meta_csv_file)
[docs] def get_filehandler(self, idx): """ Get the nth filehandler in a list of all filehandlers for all networks. e.g. if there are 2 networks, with 3 filehandlers/sensors each, idx=4 will return the first filehandler of the second network. Parameters ---------- idx: int Index of filehandler to read. Returns ------- filehandler : DataFile nth filehandler of all filehandlers in the sorted list. """ fs = 0 for net, files in self.filelist.items(): l = len(files) if fs + l > idx: return files[idx - fs] else: fs += l
[docs] def iter_filehandlers(self, networks=None): """ Iterator over files for networks Parameters ---------- networks : list, optional (default: None) Name of networks to get files for, or None to use all networks. Yields ------- file : DataFile Filehandler with metadata """ for net, files in self.filelist.items(): if (networks is None) or (net in networks): for f in files: yield f yield from () # in case networks is an empty list
[docs] def close(self): # close root and all filehandlers self.root.close() for f in self.iter_filehandlers(): f.close()