Source code for ismn.download

import requests
import zipfile
import shutil
from pathlib import Path
from ismn.misc import collect_stm_cov, write_overview
from ismn.const import nrt_networks
import re

class ISMNDownloader:
    """Download the ISMN archive through an authenticated web session.

    The workflow has three steps: fetch the login page to obtain the CSRF
    cookie, post the credentials, and stream the archive to ``output_path``.

    Args:
        username: Account name, sent as the ``login`` form field.
        password: Account password, sent as the ``password`` form field.
        output_path: Destination file for the archive. ``~`` is expanded and
            the path is made absolute.
    """

    BASE_URL = "https://ismn.earth"
    LOGIN_URL = f"{BASE_URL}/en/accounts/login/"
    DOWNLOAD_URL = f"{BASE_URL}/en/dataviewer/api/download_archive"

    def __init__(self, username: str, password: str,
                 output_path: str = "ismn_archive.zip"):
        self.username = username
        self.password = password
        self.output_path = Path(output_path).expanduser().resolve()
        # One session for all steps so cookies set by the login page and the
        # login request are sent along with the download request.
        self.session = requests.Session()

    def _get_csrf_token(self) -> str:
        """Fetch the login page and return the ``csrftoken`` cookie value.

        Raises:
            RuntimeError: If the login page request fails or no
                ``csrftoken`` cookie was set on the session.
        """
        print("Step 1: Fetching login page and CSRF token...")
        response = self.session.get(self.LOGIN_URL)
        if not response.ok:
            raise RuntimeError(
                f"Could not load login page (status {response.status_code})")

        # Use .get() so a missing cookie yields a clear error instead of a
        # bare KeyError from the cookie jar.
        csrf_token = self.session.cookies.get("csrftoken")
        if not csrf_token:
            raise RuntimeError(
                "Login page did not set a 'csrftoken' cookie")

        print(f" ✓ CSRF token obtained: {csrf_token[:10]}...")
        return csrf_token

    def _login(self, csrf_token: str) -> None:
        """Post the credentials together with the CSRF token.

        Args:
            csrf_token: Token returned by :meth:`_get_csrf_token`.

        Raises:
            RuntimeError: If the login request returns an error status.
        """
        print("\nStep 2: Logging in...")
        response = self.session.post(
            self.LOGIN_URL,
            data={
                "csrfmiddlewaretoken": csrf_token,
                "login": self.username,
                "password": self.password,
            },
            headers={"Referer": self.LOGIN_URL},
        )
        # NOTE(review): ``response.ok`` only means the final status is < 400.
        # Whether the server answers rejected credentials with an error
        # status cannot be told from this file - confirm against the site.
        if response.ok:
            print(f" ✓ Login successful (status {response.status_code})")
        else:
            raise RuntimeError(
                f"Login failed (status {response.status_code})")

    def _download(self) -> None:
        """Stream the archive to ``self.output_path`` and print progress.

        Raises:
            RuntimeError: If the download request returns an error status,
                or if fewer/more bytes than announced by ``Content-Length``
                were received for a response without content encoding.
        """
        print("\nStep 3: Starting archive download...")
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

        gib = 1024**3
        chunk_size = 1024 * 1024  # 1 MB

        # The context manager releases the streamed connection on every
        # exit path, including the error raises below.
        with self.session.get(self.DOWNLOAD_URL, stream=True) as response:
            if not response.ok:
                raise RuntimeError(
                    f"Download request failed (status {response.status_code})")

            total_size = int(response.headers.get("content-length", 0))
            if total_size:
                print(f" ✓ File size: {total_size / gib:.2f} GB")
            else:
                print(" ! File size unknown (no Content-Length header)")

            downloaded = 0
            with open(self.output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if not chunk:
                        continue
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size:
                        percent = downloaded / total_size * 100
                        downloaded_gb = downloaded / gib
                        total_gb = total_size / gib
                        print(
                            f" Downloading... {downloaded_gb:.2f} / {total_gb:.2f} GB ({percent:.1f}%)",
                            end="\r")
                    else:
                        print(
                            f" Downloaded: {downloaded / gib:.2f} GB",
                            end="\r")

            # Content-Length counts bytes on the wire; it is only comparable
            # to the decoded byte count when no content encoding is applied.
            if (total_size and downloaded != total_size
                    and "content-encoding" not in response.headers):
                raise RuntimeError(
                    f"Incomplete download: received {downloaded} of "
                    f"{total_size} bytes ({self.output_path})")

        print(
            f"\n ✓ Download complete: {self.output_path} ({downloaded / gib:.2f} GB)"
        )

    def run(self) -> None:
        """Run the full workflow: CSRF token, login, then download."""
        csrf_token = self._get_csrf_token()
        self._login(csrf_token)
        self._download()
[docs]class ISMNExtractor: # Matches "<prefix>_<YYYYMMDD>.stm" -> group(1) is everything except the final date _STM_RE = re.compile(r"^(.*)_(\d{8})\.stm$") def __init__(self, archive_path: str, nrt_networks: list[str], output_dir: str = "ISMN"): self.archive_path = Path(archive_path) self.nrt_networks = nrt_networks self.output_dir = Path(output_dir) @classmethod def _stm_prefix(cls, name: str) -> str | None: """Return the filename with the trailing '_YYYYMMDD.stm' stripped, or None if it doesn't match.""" m = cls._STM_RE.match(name) return m.group(1) if m else None def _index_existing_stm(self, network: str) -> dict[tuple[Path, str], Path]: """Map (parent_dir, prefix) -> existing .stm file on disk for the given network.""" index: dict[tuple[Path, str], Path] = {} net_dir = self.output_dir / network if not net_dir.exists(): return index for path in net_dir.rglob("*.stm"): prefix = self._stm_prefix(path.name) if prefix is None: continue index[(path.parent, prefix)] = path return index
[docs] def run(self) -> None: self.output_dir.mkdir(parents=True, exist_ok=True) print(f"Updating {len(self.nrt_networks)} NRT networks from {self.archive_path}...") print(f"Output directory: {self.output_dir.resolve()}\n") totals = {"overwritten": 0, "no_match": 0, "missing_networks": 0} with zipfile.ZipFile(self.archive_path, "r") as zf: all_entries = zf.namelist() for network in self.nrt_networks: matching = [e for e in all_entries if e.startswith(f"{network}/")] if not matching: print(f" ! Network not found in archive: {network}") totals["missing_networks"] += 1 continue # Build an index of the .stm files already present for this network existing_index = self._index_existing_stm(network) print(f"\n Processing {network} ({len(matching)} entries)...") net_overwritten = net_no_match = 0 for entry in matching: # Skip directory entries and anything that isn't a .stm file if entry.endswith("/") or not entry.endswith(".stm"): continue archive_name = Path(entry).name prefix = self._stm_prefix(archive_name) if prefix is None: continue # not a dated .stm file we care about target_dir = (self.output_dir / entry).parent existing = existing_index.get((target_dir, prefix)) if existing is None: # No file with the same prefix on disk -> nothing to update net_no_match += 1 print(f" - no match {archive_name}") continue # Overwrite the existing file, keeping its (old) name with zf.open(entry) as src, open(existing, "wb") as dst: shutil.copyfileobj(src, dst) net_overwritten += 1 print(f" ~ updated {existing.name} (from {archive_name})") totals["overwritten"] += net_overwritten totals["no_match"] += net_no_match print(f" ✓ {network}: {net_overwritten} overwritten, {net_no_match} without a match") print( f"\n✓ All done. " f"{totals['overwritten']} overwritten, {totals['no_match']} without a match." ) if totals["missing_networks"]: print(f" ! 
{totals['missing_networks']} requested network(s) not found in archive.") print("Collecting coverage information from .stm files...") period_to = collect_stm_cov(str(self.output_dir), n_proc=4) print("Writing overview file...") write_overview(self.output_dir, period_to=period_to)
if __name__ == "__main__":
    # Script entry point: update the tree under /ISMN from the local
    # archive file for the networks listed in ``nrt_networks``.
    extractor = ISMNExtractor(
        archive_path="ismn_archive.zip",
        nrt_networks=nrt_networks,
        output_dir="/ISMN",
    )
    extractor.run()