Source code for ismn.download
import requests
import zipfile
import zlib
from pathlib import Path
from ismn.misc import collect_stm_cov, write_overview
from ismn.const import nrt_networks
class ISMNDownloader:
    """Download the ISMN data archive through an authenticated web session.

    The download is a three-step process driven by :meth:`run`: fetch the
    login page to obtain a CSRF token, log in with the given credentials,
    and stream the archive to ``output_path``.

    Parameters
    ----------
    username : str
        Account name, sent in the ``login`` field of the login form.
    password : str
        Account password, sent in the ``password`` field of the login form.
    output_path : str, optional (default: "ismn_archive.zip")
        Destination of the downloaded archive. ``~`` is expanded and the
        path is made absolute.
    """

    BASE_URL = "https://ismn.earth"
    LOGIN_URL = f"{BASE_URL}/en/accounts/login/"
    DOWNLOAD_URL = f"{BASE_URL}/en/dataviewer/api/download_archive"

    def __init__(self, username: str, password: str, output_path: str = "ismn_archive.zip"):
        self.username = username
        self.password = password
        self.output_path = Path(output_path).expanduser().resolve()
        # One session for all steps so that cookies set while fetching the
        # login page and logging in are sent with the download request.
        self.session = requests.Session()

    def _get_csrf_token(self) -> str:
        """Fetch the login page and return the ``csrftoken`` cookie value.

        Raises
        ------
        RuntimeError
            If the login page request returns a non-OK status.
        KeyError
            If the session holds no ``csrftoken`` cookie after the request.
        """
        print("Step 1: Fetching login page and CSRF token...")
        response = self.session.get(self.LOGIN_URL)
        if not response.ok:
            raise RuntimeError(f"Could not load login page (status {response.status_code})")
        csrf_token = self.session.cookies["csrftoken"]
        # Only a prefix is printed so the full token does not end up in logs.
        print(f" ✓ CSRF token obtained: {csrf_token[:10]}...")
        return csrf_token

    def _login(self, csrf_token: str) -> None:
        """Post the credentials together with ``csrf_token`` to the login URL.

        Raises
        ------
        RuntimeError
            If the login request returns a non-OK status.
        """
        print("\nStep 2: Logging in...")
        response = self.session.post(
            self.LOGIN_URL,
            data={
                "csrfmiddlewaretoken": csrf_token,
                "login": self.username,
                "password": self.password,
            },
            headers={"Referer": self.LOGIN_URL},
        )
        # NOTE(review): only the HTTP status is checked. If the server answers
        # rejected credentials with a 200 page (e.g. the re-rendered form),
        # this will not notice - confirm against the server's behaviour.
        if response.ok:
            print(f" ✓ Login successful (status {response.status_code})")
        else:
            raise RuntimeError(f"Login failed (status {response.status_code})")

    def _download(self) -> None:
        """Stream the archive to ``self.output_path``, printing progress.

        The data is first written to a sibling ``<name>.part`` file, which is
        renamed to ``output_path`` only after the transfer finished. A failed
        or interrupted download therefore never leaves a truncated archive at
        ``output_path`` and never overwrites an existing one.

        Raises
        ------
        RuntimeError
            If the download request returns a non-OK status, or if fewer or
            more bytes than announced in ``Content-Length`` were received.
        """
        print("\nStep 3: Starting archive download...")
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        # Same directory as the final file, so the rename stays on one
        # filesystem.
        partial_path = self.output_path.with_name(self.output_path.name + ".part")

        # With stream=True the connection is held until the body is consumed
        # or the response is closed; the ``with`` block guarantees the latter
        # on every exit path.
        with self.session.get(self.DOWNLOAD_URL, stream=True) as response:
            if not response.ok:
                raise RuntimeError(f"Download request failed (status {response.status_code})")

            total_size = int(response.headers.get("content-length", 0))
            if total_size:
                print(f" ✓ File size: {total_size / (1024**3):.2f} GB")
            else:
                print(" ! File size unknown (no Content-Length header)")

            chunk_size = 1024 * 1024  # 1 MB
            downloaded = 0
            try:
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size:
                            percent = downloaded / total_size * 100
                            downloaded_gb = downloaded / (1024**3)
                            total_gb = total_size / (1024**3)
                            print(f" Downloading... {downloaded_gb:.2f} / {total_gb:.2f} GB ({percent:.1f}%)", end="\r")
                        else:
                            print(f" Downloaded: {downloaded / (1024**3):.2f} GB", end="\r")

                # Content-Length counts bytes on the wire. If the body was
                # content-encoded, the decoded chunks may legitimately add up
                # to a different size, so the check is skipped in that case.
                content_encoded = bool(response.headers.get("content-encoding"))
                if total_size and not content_encoded and downloaded != total_size:
                    raise RuntimeError(
                        f"Download incomplete: received {downloaded} of {total_size} bytes"
                    )
            except BaseException:
                # Also covers Ctrl-C: remove the partial file, then re-raise.
                partial_path.unlink(missing_ok=True)
                raise

        partial_path.replace(self.output_path)
        print(f"\n ✓ Download complete: {self.output_path} ({downloaded / (1024**3):.2f} GB)")

    def run(self) -> None:
        """Run the full workflow: get CSRF token, log in, download archive."""
        csrf_token = self._get_csrf_token()
        self._login(csrf_token)
        self._download()
class ISMNExtractor:
    """Incrementally extract selected networks from an ISMN zip archive.

    Only archive entries below ``<network>/`` for the requested networks are
    considered. A file is written only if it is missing in ``output_dir`` or
    differs from the archived version in size or CRC-32, so repeated runs
    leave unchanged files untouched.

    Parameters
    ----------
    archive_path : str
        Path to the zip archive to read.
    nrt_networks : list[str]
        Names of the top-level network folders to extract.
    output_dir : str, optional (default: "ISMN")
        Directory the network folders are extracted into.
    n_proc : int, optional (default: 4)
        Forwarded as ``n_proc`` to ``collect_stm_cov`` after extraction
        (presumably the number of worker processes - see that function).
    """

    def __init__(
        self,
        archive_path: str,
        nrt_networks: list[str],
        output_dir: str = "ISMN",
        *,
        n_proc: int = 4,
    ):
        self.archive_path = Path(archive_path)
        self.nrt_networks = nrt_networks
        self.output_dir = Path(output_dir)
        self.n_proc = n_proc

    @staticmethod
    def _file_crc32(path: Path, chunk_size: int = 65536) -> int:
        """Return the CRC-32 of the file at ``path``, read in chunks."""
        crc = 0
        with open(path, "rb") as f:
            while chunk := f.read(chunk_size):
                crc = zlib.crc32(chunk, crc)
        return crc

    def _needs_extraction(self, zinfo: zipfile.ZipInfo, target: Path) -> tuple[bool, str]:
        """Return (needs_extraction, reason).

        ``reason`` is one of "new", "size differs", "content differs" or
        "unchanged". Checks are ordered from cheap to expensive: the file is
        only read for the CRC comparison when it exists and the size matches.
        """
        if not target.exists():
            return True, "new"
        if target.stat().st_size != zinfo.file_size:
            return True, "size differs"
        if self._file_crc32(target) != zinfo.CRC:
            return True, "content differs"
        return False, "unchanged"

    def _extract_files(self, zf: zipfile.ZipFile, entries: list[str]) -> dict[str, int]:
        """Extract the given file entries where needed; return the counts.

        The returned dict has the keys "new", "updated" and "skipped".
        """
        counts = {"new": 0, "updated": 0, "skipped": 0}
        for entry in entries:
            target = self.output_dir / entry
            needs, reason = self._needs_extraction(zf.getinfo(entry), target)
            if not needs:
                counts["skipped"] += 1
                print(f" = skipped {entry}")
                continue
            zf.extract(entry, self.output_dir)
            if reason == "new":
                counts["new"] += 1
                print(f" + new {entry}")
            else:
                counts["updated"] += 1
                print(f" ~ updated {entry} ({reason})")
        return counts

    def _extract_all(self) -> dict[str, int]:
        """Extract all requested networks from the archive.

        Returns
        -------
        totals : dict[str, int]
            Counts over all networks under the keys "new", "updated" and
            "skipped", plus "missing_networks" for requested networks that
            have no entry in the archive.
        """
        totals = {"new": 0, "updated": 0, "skipped": 0, "missing_networks": 0}
        with zipfile.ZipFile(self.archive_path, "r") as zf:
            all_entries = zf.namelist()
            for network in self.nrt_networks:
                prefix = f"{network}/"
                matching = [e for e in all_entries if e.startswith(prefix)]
                if not matching:
                    print(f" ! Network not found in archive: {network}")
                    totals["missing_networks"] += 1
                    continue
                # Directory entries in zips end with "/"; drop them up front
                # so the reported number really is a number of files.
                files = [e for e in matching if not e.endswith("/")]
                print(f"\n Processing {network} ({len(files)} files)...")
                counts = self._extract_files(zf, files)
                for key, value in counts.items():
                    totals[key] += value
                print(
                    f" ✓ {network}: {counts['new']} new, "
                    f"{counts['updated']} updated, {counts['skipped']} unchanged"
                )
        return totals

    def run(self) -> None:
        """Extract the requested networks, then rebuild coverage and overview.

        After extraction, ``collect_stm_cov`` is run on the output directory
        and its result is passed as ``period_to`` to ``write_overview``.
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        print(f"Extracting {len(self.nrt_networks)} NRT networks from {self.archive_path}...")
        print(f"Output directory: {self.output_dir.resolve()}\n")

        totals = self._extract_all()

        print(
            f"\n✓ All done. "
            f"{totals['new']} new, {totals['updated']} updated, {totals['skipped']} unchanged."
        )
        if totals["missing_networks"]:
            print(f" ! {totals['missing_networks']} requested network(s) not found in archive.")

        print("Collecting coverage information from .stm files...")
        period_to = collect_stm_cov(str(self.output_dir), n_proc=self.n_proc)
        print("Writing overview file...")
        write_overview(self.output_dir, period_to=period_to)
if __name__ == "__main__":
    # Script entry point: extract the NRT networks listed in ismn.const from
    # a local "ismn_archive.zip" into the "ISMN" folder.
    extractor = ISMNExtractor(
        archive_path="ismn_archive.zip",
        nrt_networks=nrt_networks,
        output_dir="ISMN",
    )
    extractor.run()