Source code for ismn.misc

import os
import warnings
from datetime import datetime
from glob import glob

from repurpose.process import parallel_process


def _is_sm_file(path: str) -> bool:
    """Return True if the 4th underscore-separated filename field is "sm"."""
    stem = os.path.splitext(os.path.basename(path))[0]
    parts = stem.split('_')
    return len(parts) >= 4 and parts[3] == 'sm'


def _read_last_line(path: str, chunk_size: int = 1024) -> str:
    """Return the last non-blank line of a file, reading only its tail."""
    with open(path, 'rb') as fh:
        fh.seek(0, os.SEEK_END)
        filesize = fh.tell()
        bufsize = min(chunk_size, filesize)
        while True:
            fh.seek(-bufsize, os.SEEK_END)
            # Strip trailing whitespace *before* looking for a newline:
            # trailing blank lines must not be mistaken for the line break
            # that precedes the last data line, otherwise a truncated
            # fragment of that line would be returned.
            text = fh.read(bufsize).decode('utf-8', errors='replace').rstrip()
            if '\n' in text or bufsize >= filesize:
                break
            bufsize = min(bufsize * 2, filesize)
    # rfind() yields -1 when there is no newline, so the slice then starts
    # at 0 and the whole (single-line) text is returned.
    return text[text.rfind('\n') + 1:]


def _stm_end_date(f: str) -> datetime:
    """Parse the date token that starts the last line of one .stm file."""
    last_line = _read_last_line(f)
    if not last_line:
        raise ValueError(f"Empty last line in {f}")
    return datetime.strptime(last_line.split()[0], '%Y/%m/%d')


def collect_stm_cov(data_path: str, n_proc: int = 1,
                    progressbar: bool = False) -> datetime:
    """
    Open all .stm time series files in a directory (slow) and detect the
    latest end date across all files.

    Files are filtered to those whose 4th underscore-separated field is
    "sm" (e.g. FMI_FMI_SOD023_sm_0.050000_..._19500101_20260511.stm).
    The end date is read from the first whitespace-separated token of the
    last non-blank line of each file (format "YYYY/MM/DD HH:MM ..."). Only
    the date token is parsed, so the time of day is not part of the result.

    Parameters
    ----------
    data_path : str
        Path where the .stm files are stored (searched recursively).
    n_proc : int, optional (default: 1)
        Number of parallel workers (threads, since this is I/O-bound).
    progressbar : bool, optional (default: False)
        Show progress bar when looping through files.

    Returns
    -------
    last_end : datetime
        The most recent end date found across all .stm files.

    Raises
    ------
    ValueError
        If no matching .stm file is found, if a file has no non-blank
        last line, or if the first token of a last line is not a
        "YYYY/MM/DD" date.
    """
    candidates = glob(
        os.path.join(data_path, '**', '*_*_*_sm_*.stm'), recursive=True)
    # The glob wildcard also matches underscores, so the pattern alone cannot
    # pin "sm" to the 4th field. Drop files where "sm" sits elsewhere instead
    # of letting a single one abort the whole scan.
    fl = [f for f in candidates if _is_sm_file(f)]

    if not fl:
        raise ValueError(f"No matching .stm files found in {data_path}")

    ends = parallel_process(
        _stm_end_date,
        ITER_KWARGS=dict(f=fl),
        show_progress_bars=progressbar,
        backend='threading',
        n_proc=n_proc)

    return max(ends)
def write_overview(data_path: str, period_to: datetime,
                   product: str = "ISMN", version: str = "v202505") -> str:
    """
    Write (or overwrite) an overview.yml file inside `data_path`, describing
    the dataset's product, version, and latest end date.

    The values are written verbatim as ``key: value`` lines; no YAML
    quoting or escaping is applied to `product` or `version`.

    Parameters
    ----------
    data_path : str
        The data directory where overview.yml will be written.
    period_to : datetime
        Latest end date of the dataset. Written as ``YYYY-MM-DD``.
    product : str, optional
        Product name. Default "ISMN".
    version : str, optional
        Product version. Default "v202505".

    Returns
    -------
    out_path : str
        Absolute path of the written overview.yml file.
    """
    out_path = os.path.join(os.path.abspath(data_path), "overview.yml")

    content = (
        f"product: {product}\n"
        f"version: {version}\n"
        f"period_to: {period_to.strftime('%Y-%m-%d')}\n"
    )

    # Pin the encoding so the file's bytes do not depend on the platform's
    # locale default (non-ASCII product/version names would otherwise be
    # written differently, or fail, on some systems).
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write(content)

    return out_path
if __name__ == "__main__":
    # Ad-hoc manual run: scan a local ISMN directory with 4 worker threads
    # and print the most recent end date found.
    latest_end = collect_stm_cov("/tmp/ISMN", n_proc=4, progressbar=True)
    print(latest_end)