"""Shared primitives for release-tarball flows.

Both ``furtka.updater`` (core self-update) and ``furtka.catalog`` (apps
catalog sync) pull a tarball from a Forgejo Releases page, verify its SHA256
against the ``.sha256`` sidecar, and extract it with a path-traversal guard.
The helpers here are the single implementation of that dance.

Each error-raising helper accepts an ``error_cls`` kwarg so callers can keep
their domain-specific exception type (``UpdateError``, ``CatalogError``) at
call sites — the helper itself defaults to a neutral ``ReleaseError`` for use
in tests or standalone scripts.
"""

from __future__ import annotations

import hashlib
import http.client
import json
import shutil
import tarfile
import urllib.error
import urllib.request
from pathlib import Path

# Rank of each recognised pre-release stage; a stable release (no suffix)
# ranks above all of them, an unrecognised suffix ranks below all of them.
_STAGE_RANK = {"alpha": 0, "beta": 1, "rc": 2}
_STABLE_RANK = 3
_UNKNOWN_STAGE_RANK = -1

# Chunk size used when hashing files, so large tarballs are never read
# into memory in one piece.
_HASH_CHUNK_SIZE = 1024 * 1024


class ReleaseError(RuntimeError):
    """Neutral failure for release-tarball operations."""


def forgejo_api(
    host: str, repo: str, path: str, *, error_cls: type[Exception] = ReleaseError
) -> dict | list:
    """GET a Forgejo repository API endpoint and return the decoded JSON.

    Args:
        host: Forgejo host name (no scheme); the request always uses HTTPS.
        repo: Repository slug inserted after ``/api/v1/repos/``.
        path: Endpoint path appended verbatim after the repository slug.
        error_cls: Exception type raised on any failure.

    Returns:
        Whatever ``json.loads`` produces for the response body.

    Raises:
        error_cls: On connection/HTTP errors, timeouts, truncated responses,
            or a body that is not valid JSON.
    """
    url = f"https://{host}/api/v1/repos/{repo}{path}"
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read())
    # URLError is an OSError, but timeouts/resets while *reading* the body
    # surface as plain OSError, truncated bodies as HTTPException, and a
    # non-UTF-8 body as UnicodeDecodeError (a ValueError, like
    # JSONDecodeError). All of them must reach the caller as error_cls.
    except (OSError, http.client.HTTPException, ValueError) as e:
        raise error_cls(f"forgejo api {url}: {e}") from e


def download(url: str, dest: Path, *, error_cls: type[Exception] = ReleaseError) -> None:
    """Stream ``url`` into the file ``dest``, creating parent directories.

    Args:
        url: Location to fetch.
        dest: Target file; opened in ``"wb"`` mode, so existing content is
            overwritten.
        error_cls: Exception type raised on failure.

    Raises:
        error_cls: If the request fails, the transfer is interrupted, or the
            destination cannot be written.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    req = urllib.request.Request(url)
    try:
        with urllib.request.urlopen(req, timeout=60) as resp, dest.open("wb") as f:
            shutil.copyfileobj(resp, f)
    # Not just URLError: a socket timeout or reset in the middle of the body
    # is a bare OSError, and a truncated chunked body is an HTTPException.
    except (OSError, http.client.HTTPException) as e:
        raise error_cls(f"download {url}: {e}") from e


def sha256_of(path: Path) -> str:
    """Return the lowercase hex SHA256 digest of the file at ``path``."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_tarball(
    tarball: Path, expected_sha: str, *, error_cls: type[Exception] = ReleaseError
) -> None:
    """Check that ``tarball`` hashes to ``expected_sha``.

    The comparison ignores surrounding whitespace and letter case in
    ``expected_sha``, since hex digests are case-insensitive.

    Raises:
        error_cls: If the digests differ.
    """
    actual = sha256_of(tarball)
    if actual != expected_sha.strip().lower():
        raise error_cls(f"sha256 mismatch: expected {expected_sha}, got {actual}")


def parse_sha256_sidecar(text: str, *, error_cls: type[Exception] = ReleaseError) -> str:
    """Extract the hash from a standard `sha256sum` sidecar line.

    Only the first non-blank line is considered; the hash is its first
    whitespace-separated token.

    Raises:
        error_cls: If ``text`` is empty or whitespace only.
    """
    line = text.strip().split("\n", 1)[0].strip()
    if not line:
        raise error_cls("empty sha256 sidecar")
    return line.split()[0]


def extract_tarball(
    tarball: Path, dest: Path, *, error_cls: type[Exception] = ReleaseError
) -> str:
    """Extract the tarball and return the VERSION read from its root.

    Refuses entries that could escape ``dest`` via absolute paths or ``..``
    segments. Where the running Python's ``tarfile`` supports extraction
    filters (3.12+, and older versions carrying the security backport) the
    stricter ``data`` filter is additionally enabled to catch symlink-escape /
    device-node / setuid tricks that the name check can't see.

    Args:
        tarball: Path to a gzip-compressed tar archive.
        dest: Directory to extract into; created if missing.
        error_cls: Exception type raised on failure.

    Returns:
        The stripped contents of ``dest / "VERSION"``.

    Raises:
        error_cls: If the archive is unreadable or corrupt, contains an
            unsafe entry, fails to extract, or leaves no ``VERSION`` file at
            the root of ``dest``.

    Note:
        ``dest`` is not cleared first, so a ``VERSION`` file left over from
        an earlier extraction into the same directory would satisfy the final
        check.
    """
    dest.mkdir(parents=True, exist_ok=True)
    try:
        with tarfile.open(tarball, "r:gz") as tf:
            for member in tf.getmembers():
                if member.name.startswith(("/", "..")) or ".." in Path(member.name).parts:
                    raise error_cls(f"refusing tarball entry {member.name!r}")
            # Feature-test instead of catching TypeError around extractall():
            # a TypeError raised for any other reason must not silently
            # trigger a second, unfiltered extraction.
            if hasattr(tarfile, "data_filter"):
                tf.extractall(dest, filter="data")
            else:
                tf.extractall(dest)
    # ReadError (not a gzip/tar file), FilterError (rejected by the data
    # filter), I/O failures and truncated streams all become error_cls so
    # callers only need to handle their own exception type.
    except (tarfile.TarError, OSError, EOFError) as e:
        raise error_cls(f"extract {tarball}: {e}") from e
    version_file = dest / "VERSION"
    if not version_file.is_file():
        raise error_cls("tarball has no VERSION file at root")
    return version_file.read_text().strip()


def version_tuple(v: str) -> tuple[int, int, int, str]:
    """CalVer comparator: 26.1-alpha < 26.1-beta < 26.1-rc < 26.1 < 26.2-alpha.

    Pre-release stages sort before the corresponding stable (no-suffix)
    release. Unknown suffixes sort below everything except the malformed
    fallback.

    Returns a tuple of (year, release, stage_rank, suffix). Strings that do
    not parse as ``<int>.<int>[-suffix]`` yield ``(-1, -1, -1, v)``.

    Note:
        Within one stage the raw suffix string is the tie-breaker, so
        numbered suffixes compare lexicographically (``rc10`` < ``rc2``).
    """
    head, _, suffix = v.partition("-")
    try:
        # A missing "." makes the unpacking fail with ValueError, same as a
        # non-numeric component does in int().
        year_str, release_str = head.split(".", 1)
        year = int(year_str)
        release = int(release_str)
    except ValueError:
        return (-1, -1, -1, v)
    if not suffix:
        return (year, release, _STABLE_RANK, "")
    for name, rank in _STAGE_RANK.items():
        if suffix.startswith(name):
            return (year, release, rank, suffix)
    return (year, release, _UNKNOWN_STAGE_RANK, suffix)