# The MIT License (MIT)
#
# Copyright (c) 2021 TU Wien
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
import os
from tempfile import gettempdir
from pathlib import Path, PurePosixPath
import numpy as np
from tqdm import tqdm
from typing import Union
from multiprocessing import Pool, cpu_count
from operator import itemgetter
import time
from typing import Tuple
import pandas as pd
from collections import OrderedDict
from ismn.base import IsmnRoot
import ismn.const as const
from ismn.filehandlers import DataFile, StaticMetaFile
from ismn.meta import MetaData, MetaVar, Depth
def _read_station_dir(
    root: Union[IsmnRoot, Path, str],
    stat_dir: Union[Path, str],
    temp_root: Path,
    custom_meta_reader: Union[list, tuple, None],
) -> Tuple[list, list]:
    """
    Parallelizable function to read metadata for files in station dir

    Parameters
    ----------
    root : IsmnRoot or Path or str
        Data root. Anything that is not already an IsmnRoot is wrapped in a
        new IsmnRoot, which is closed again before this function returns.
    stat_dir : Path or str
        Station directory that is searched for "*.csv" (static metadata)
        and "*.stm" (data) files.
    temp_root : Path
        Passed on as `temp_root` to StaticMetaFile and DataFile.
    custom_meta_reader : list or tuple or None
        Objects providing `read_metadata(metadata)`. Each result (a dict,
        a MetaData object or None) is merged into the file metadata.
        None disables custom metadata.

    Returns
    -------
    filelist : list
        One tuple (network name, station name, DataFile) per data file that
        could be loaded.
    infos : list
        Log messages (str) collected while processing the station dir.
    """
    infos = []

    # Only close the root at the end if it was opened in here.
    proc_root = not isinstance(root, IsmnRoot)
    if proc_root:
        root = IsmnRoot(root)

    try:
        csv = root.find_files(stat_dir, "*.csv")

        try:
            if len(csv) == 0:
                raise const.IsmnFileError(
                    "Expected 1 csv file for station, found 0. "
                    "Use empty static metadata.")
            if len(csv) > 1:
                infos.append(
                    f"Expected 1 csv file for station, found {len(csv)}. "
                    f"Use first file in dir.")
            static_meta_file = StaticMetaFile(
                root, csv[0], load_metadata=True, temp_root=temp_root)
            station_meta = static_meta_file.metadata
        except const.IsmnFileError as e:
            # Fall back to the (empty) template so that data files can
            # still be processed without static metadata.
            infos.append(f"Error loading static meta for station: {e}")
            station_meta = MetaData(
                [MetaVar(k, v) for k, v in const.CSV_META_TEMPLATE.items()])

        data_files = root.find_files(stat_dir, "*.stm")

        filelist = []

        for file_path in data_files:
            try:
                f = DataFile(root, file_path, temp_root=temp_root)
            except Exception as e:
                # A single unreadable file must not abort the whole station.
                infos.append(f"Error loading ismn file: {e}")
                continue

            f.metadata.merge(station_meta, inplace=True, exclude_empty=False)

            # Reduce the merged metadata to the entries selected for the
            # depth range of this file's instrument.
            f.metadata = f.metadata.best_meta_for_depth(
                Depth(
                    f.metadata["instrument"].depth.start,
                    f.metadata["instrument"].depth.end,
                ))

            # If custom metadata readers are available
            if custom_meta_reader is not None:
                for cmr in np.atleast_1d(custom_meta_reader):
                    cmeta = cmr.read_metadata(f.metadata)
                    if isinstance(cmeta, dict):
                        cmeta = MetaData(
                            [MetaVar(k, v) for k, v in cmeta.items()])
                    if cmeta is not None:
                        f.metadata.merge(cmeta, inplace=True)

            network = f.metadata["network"].val
            station = f.metadata["station"].val

            filelist.append((network, station, f))

            infos.append(f"Processed file {file_path}")
    finally:
        # Release the locally opened root even if processing failed.
        if proc_root:
            root.close()

    return filelist, infos
def _load_metadata_df(meta_csv_file: Union[str, Path]) -> pd.DataFrame:
    """
    Load metadata data frame from csv file

    Parameters
    ----------
    meta_csv_file : str or Path
        CSV file with two header rows (read as 2-level columns) and the
        index in the first column.

    Returns
    -------
    metadata_df : pd.DataFrame
        Metadata frame with index name "idx" and the "val" columns of
        "timerange_from" and "timerange_to" converted to datetime.

    Raises
    ------
    AssertionError
        If the last two first-level column names are not
        "file_path", "file_type" (in this order).
    """
    metadata_df = pd.read_csv(
        meta_csv_file,
        index_col=0,
        header=[0, 1],
        low_memory=False,
        engine="c",
    )

    # parse date cols as datetime
    for col in ("timerange_from", "timerange_to"):
        metadata_df[col, "val"] = pd.to_datetime(metadata_df[col, "val"])

    # Unique first-level column names in order of first appearance.
    lvars = list(dict.fromkeys(c[0] for c in metadata_df.columns))

    # we assume triples for all vars except these, so they must be at the end
    # Raised explicitly (instead of `assert`) so that the check is not
    # stripped when running with `python -O`; the exception type is kept.
    if lvars[-2:] != ["file_path", "file_type"]:
        raise AssertionError("file_type and file_path must be at the end.")

    metadata_df.index.name = "idx"

    return metadata_df
class IsmnFileCollection(object):
    """
    The IsmnFileCollection class contains a list of file handlers to access
    data in the given data directory. The file list can be built by iterating
    over all files in the data root.
    This class also contains functions to iterate over the filehandlers of
    certain networks only.

    Attributes
    ----------
    root : IsmnRoot
        Root object where data is stored.
    filelist : collections.OrderedDict
        A collection of filehandlers and network names
    temp_root : Path
        Temporary root dir.
    """

    def __init__(self, root, filelist, temp_root=gettempdir()):
        """
        Parameters
        ----------
        root : IsmnRoot
            Root object where data is stored.
        filelist : collections.OrderedDict
            A collection of filehandler stored in lists with network name
            as key.
        temp_root : Path or str, optional (default : gettempdir())
            Root directory for temporary files. It is created if it does
            not exist yet.
        """
        self.root = root
        self.filelist = filelist
        self.temp_root = Path(temp_root)

        os.makedirs(self.temp_root, exist_ok=True)

    def __repr__(self):
        return f"{self.__class__.__name__} for {len(self.filelist)} Networks"

    @classmethod
    def build_from_scratch(
        cls,
        data_root,
        parallel=True,
        log_path=None,
        temp_root=gettempdir(),
        custom_meta_readers=None,
    ):
        """
        Build the file collection by reading the metadata of all station
        directories found in the data root.

        Parameters
        ----------
        data_root : IsmnRoot or str or Path
            Root of the ISMN data, i.e. an IsmnRoot object or the path to
            the downloaded zip file or the extracted zip directory (faster).
        parallel : bool, optional (default: True)
            Speed up metadata collecting with multiple processes
            (one per CPU as reported by cpu_count()).
        log_path : str or Path, optional (default: None)
            Path where the log file is created. If None is set, no log file
            will be written.
        temp_root : str or Path, (default: gettempdir())
            Temporary folder where extracted data is copied during reading
            from zip archive. Also used as temp_root of the returned
            collection.
        custom_meta_readers: tuple, optional (default: None)
            Custom metadata readers

        Returns
        -------
        collection : IsmnFileCollection
            Collection with one list of filehandlers per network, sorted by
            network name and station name.
        """
        t0 = time.time()

        if isinstance(data_root, IsmnRoot):
            root = data_root
        else:
            root = IsmnRoot(data_root)

        os.makedirs(temp_root, exist_ok=True)

        if log_path is not None:
            log_file = os.path.join(log_path, f"{root.name}.log")
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
        else:
            log_file = None

        # NOTE: configures the root logger; with filename=None the messages
        # go to the default stream handler instead of a file.
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format="%(levelname)s %(asctime)s %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        n_proc = cpu_count() if parallel else 1

        logging.info(f"Collecting metadata with {n_proc} processes.")

        if not parallel:
            hint = 'Hint: Use `parallel=True` to speed up metadata ' \
                   'generation for large datasets'
        else:
            hint = ''

        print(
            f"Processing metadata for all ismn stations into folder "
            f"{root.path}.\n"
            f"This may take a few minutes, but is only done once...\n{hint}"
        )

        process_stat_dirs = []
        for stat_dirs in root.cont.values():
            process_stat_dirs.extend(stat_dirs)

        # For zip archives only the path is handed to the workers, so that
        # each call opens (and closes) its own root.
        args = [(root.path if root.zip else root, d, temp_root,
                 custom_meta_readers) for d in process_stat_dirs]

        pbar = tqdm(total=len(args), desc="Files Processed")

        fl_elements = []

        def update(r):
            # Collect the results of one station dir and log its messages.
            net_stat_fh, infos = r
            for i in infos:
                logging.info(i)
            fl_elements.extend(net_stat_fh)
            pbar.update()

        def error(e):
            # A failing station dir is logged but does not stop the build.
            logging.error(e)
            pbar.update()

        try:
            if n_proc == 1:
                for arg in args:
                    try:
                        r = _read_station_dir(*arg)
                        update(r)
                    except Exception as e:
                        error(e)
            else:
                with Pool(n_proc) as pool:
                    for arg in args:
                        pool.apply_async(
                            _read_station_dir,
                            arg,
                            callback=update,
                            error_callback=error,
                        )

                    pool.close()
                    pool.join()
        finally:
            # Always release the progress bar, also on interruption.
            pbar.close()

        # sort by net name, stat name to ensure alphabetical order
        fl_elements.sort(key=itemgetter(0, 1))

        filelist = OrderedDict([])
        for net, stat, fh in fl_elements:
            filelist.setdefault(net, []).append(fh)

        t1 = time.time()
        info = f"Metadata generation finished after {int(t1-t0)} Seconds."
        if log_file is not None:
            info += f"\nMetadata and Log stored in {os.path.dirname(log_file)}"

        logging.info(info)
        print(info)

        # Hand over temp_root so that the collection uses the same temporary
        # directory as the filehandlers created above.
        return cls(root, filelist=filelist, temp_root=temp_root)

    def get_filehandler(self, idx):
        """
        Get the nth filehandler in a list of all filehandlers for all
        networks (0-based, in the order of the networks in the filelist).
        e.g. if there are 2 networks, with 3 filehandlers/sensors each, idx=3
        will return the first filehandler of the second network.

        Parameters
        ----------
        idx: int
            Index of filehandler to read.

        Returns
        -------
        filehandler : DataFile or None
            nth filehandler of all filehandlers in the sorted list, or None
            if idx is negative or larger than the number of filehandlers.
        """
        if idx < 0:
            # Negative indices would otherwise pick a handler from the first
            # network only, which is never what the caller asked for.
            return None

        offset = 0  # number of filehandlers in the networks passed so far
        for files in self.filelist.values():
            n_files = len(files)
            if idx < offset + n_files:
                return files[idx - offset]
            offset += n_files

        return None

    def iter_filehandlers(self, networks=None):
        """
        Iterator over files for networks

        Parameters
        ----------
        networks : list, optional (default: None)
            Name of networks to get files for, or None to use all networks.
            An empty list yields nothing.

        Yields
        -------
        file : DataFile
            Filehandler with metadata
        """
        for net, files in self.filelist.items():
            if (networks is None) or (net in networks):
                yield from files

    def close(self):
        """
        Close the root and all filehandlers of the collection.
        """
        self.root.close()
        for f in self.iter_filehandlers():
            f.close()