furtka/furtka/auth.py
Daniel Maksymilian Syrnicki 1cff22658b
All checks were successful
CI / lint (push) Successful in 1m59s
CI / test (push) Successful in 3m27s
CI / validate-json (push) Successful in 1m56s
CI / markdown-links (push) Successful in 1m24s
Build ISO / build-iso (push) Successful in 26m58s
feat(auth): rate-limit failed logins with per-(user, IP) lockout
Ten wrong passwords from the same (username, client-IP) tuple within
15 minutes now return 429 with Retry-After for the next 15 minutes;
authenticate() isn't even called while locked, so the 429 response is
identical whether or not the password would have been correct — no oracle.

Tuple keying prevents an attacker at one IP from locking the real
admin out of their own box: a different IP (or an ISP reconnect) keeps
them in. The client IP comes from the rightmost X-Forwarded-For entry,
which is what Caddy appends and is thus trustworthy (no upstream proxy
in front of Caddy). First-run setup bypasses the lockout — otherwise a
clumsy operator could lock themselves out before an admin exists.

State is in-memory (parallel to SessionStore), so `systemctl restart
furtka` clears a stuck lockout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:27:14 +02:00

260 lines
8.6 KiB
Python

"""Login-guard primitives for the Furtka UI.

One admin, one password. Passwords are PBKDF2-SHA256 hashed via
``furtka.passwd`` (stdlib-only — hashlib.pbkdf2_hmac / hashlib.scrypt),
stored in /var/lib/furtka/users.json with mode 0600. Sessions live in
memory — a systemctl restart logs everyone out again, which is fine
for an alpha single-user box. The ``LoginAttempts`` store in this
module rate-limits failed logins per (username, IP) and is also
in-memory; a restart clears a stuck lockout.

On upgrade from pre-auth Furtka the users.json file does not exist
yet; the API's GET /login handler detects this via ``setup_needed()``
and renders a first-run form that POSTs to /login as if it were a setup
submit. Fresh installs get the file pre-populated by the webinstaller
so the setup step is skipped.

Hash format is compatible with werkzeug.security — 26.11 / 26.12 boxes
that happened to have werkzeug installed can carry their users.json
forward without re-setup; see ``furtka.passwd`` for the scrypt reader.
"""
from __future__ import annotations
import json
import math
import secrets
import threading
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from furtka.passwd import hash_password as _hash_password
from furtka.passwd import verify_password as _verify_password
from furtka.paths import users_file
# Name of the session cookie.
COOKIE_NAME = "furtka_session"
# Default session lifetime in seconds (one week); SessionStore uses it as
# the default for its ``ttl_seconds`` argument.
COOKIE_TTL_SECONDS = 3600 * 24 * 7
def hash_password(plain: str) -> str:
    """Hash ``plain`` for storage by delegating to ``furtka.passwd``.

    The algorithm and its cost parameters are owned by
    ``furtka.passwd.hash_password``; this wrapper only exposes it under
    the auth module's namespace. (Earlier note here: PBKDF2-SHA256 via
    stdlib, 600k iterations per OWASP 2023 — confirm in furtka.passwd.)
    """
    hashed = _hash_password(plain)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check ``plain`` against a stored ``hashed`` value.

    Delegates to ``furtka.passwd.verify_password``. (Earlier note here:
    constant-time compare, accepts both the stdlib format and legacy
    werkzeug hashes — confirm in furtka.passwd.)
    """
    matches = _verify_password(plain, hashed)
    return matches
def load_users() -> dict:
    """Return the users dict, or ``{}`` when no usable users file exists.

    A missing file is the expected state on first boot and on upgrades
    from pre-auth versions — callers treat the empty dict as "setup
    required". The same empty dict is returned when the file cannot be
    read, is not valid UTF-8, is blank, is not valid JSON, or holds a
    JSON value other than an object.
    """
    path = users_file()
    try:
        # EAFP: a missing file raises FileNotFoundError (an OSError), so no
        # separate exists() check is needed. UnicodeDecodeError is a
        # ValueError, not an OSError, and must be caught explicitly or a
        # file with stray bytes would crash the caller.
        raw = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return {}
    if not raw.strip():
        return {}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    # Only a JSON object is a valid users table.
    return data if isinstance(data, dict) else {}
def save_users(users: dict) -> None:
    """Atomically write users.json with mode 0600.

    The data is written to a sibling ``.tmp`` file that is then renamed
    over the real path, so readers only ever see the old or the new
    complete file. The temp file is created and restricted to 0600
    *before* any content is written; writing first and chmod-ing
    afterwards would leave the password hashes readable under the
    default umask for a short window (and indefinitely after a crash).
    """
    path = users_file()
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    # Serialise first so a non-serialisable dict fails before touching disk.
    payload = json.dumps(users, indent=2) + "\n"
    # Create the file empty with a restrictive mode ...
    tmp.touch(mode=0o600, exist_ok=True)
    # ... and force the mode in case a stale .tmp from an earlier crash
    # already existed with wider permissions.
    tmp.chmod(0o600)
    # Opening for write truncates the file but keeps its permissions.
    tmp.write_text(payload)
    tmp.replace(path)
def setup_needed() -> bool:
    """Report whether first-run setup is still required.

    Returns True when the users table has no ``"admin"`` entry, which
    includes the case where the table is empty.
    """
    # An empty dict has no "admin" key, so one membership test covers both cases.
    return "admin" not in load_users()
def create_admin(username: str, password: str) -> None:
    """Replace users.json with a table holding exactly one admin account.

    Any previous content of the users file is discarded. The stored
    record carries the username, the password hash and a UTC creation
    timestamp with seconds precision. Per the module's notes, both the
    webinstaller (post-install) and the /login first-run path call this.
    """
    password_hash = hash_password(password)
    stamp = datetime.now(UTC).isoformat(timespec="seconds")
    record = {
        "username": username,
        "hash": password_hash,
        "created_at": stamp,
    }
    save_users({"admin": record})
def authenticate(username: str, password: str) -> bool:
    """Return True iff the supplied credentials match the admin record.

    Returns False — never raises — when no admin record exists, when the
    record is malformed (not a dict, or missing a non-empty string hash),
    or when the username differs from the stored one.
    """
    admin = load_users().get("admin")
    # A hand-edited or corrupted users.json may hold a non-dict here;
    # treat it like a missing admin instead of crashing on .get().
    if not isinstance(admin, dict) or not admin:
        return False
    if admin.get("username") != username:
        return False
    hashed = admin.get("hash")
    if not isinstance(hashed, str) or not hashed:
        return False
    return verify_password(password, hashed)
@dataclass(frozen=True)
class Session:
    """Immutable record describing one logged-in session."""

    # Random URL-safe token that identifies the session.
    token: str
    # Username the session was created for.
    username: str
    # Instant at (or after) which the session counts as expired.
    expires_at: datetime
class SessionStore:
    """In-memory session table keyed by session token.

    Every access to the table happens under ``self._lock``, so the store
    stays correct whether the HTTP server handles requests on one thread
    or on many. Nothing is persisted: the table is empty again after a
    process restart.
    """

    def __init__(self, ttl_seconds: int = COOKIE_TTL_SECONDS) -> None:
        """Create an empty store whose sessions live ``ttl_seconds`` seconds."""
        self._ttl = timedelta(seconds=ttl_seconds)
        self._by_token: dict[str, Session] = {}
        self._lock = threading.Lock()

    def create(self, username: str) -> Session:
        """Register and return a new session for ``username``.

        Also evicts sessions that have already expired. ``lookup`` only
        drops an expired session when that same token is presented again,
        so without this sweep abandoned tokens would stay in memory
        until the process restarts.
        """
        now = datetime.now(UTC)
        session = Session(
            token=secrets.token_urlsafe(32),
            username=username,
            expires_at=now + self._ttl,
        )
        with self._lock:
            expired = [
                token
                for token, existing in self._by_token.items()
                if now >= existing.expires_at
            ]
            for token in expired:
                del self._by_token[token]
            self._by_token[session.token] = session
        return session

    def lookup(self, token: str | None) -> Session | None:
        """Return the live session for ``token``, or None.

        None is returned for a missing/empty token, an unknown token, and
        an expired session (which is removed from the table as a side
        effect).
        """
        if not token:
            return None
        with self._lock:
            session = self._by_token.get(token)
            if session is None:
                return None
            if datetime.now(UTC) >= session.expires_at:
                # Expired — drop it so repeat lookups stay fast.
                self._by_token.pop(token, None)
                return None
            return session

    def revoke(self, token: str | None) -> None:
        """Forget ``token`` if present; unknown or empty tokens are ignored."""
        if not token:
            return
        with self._lock:
            self._by_token.pop(token, None)

    def clear(self) -> None:
        """Test helper — wipe all sessions."""
        with self._lock:
            self._by_token.clear()
class LoginAttempts:
"""In-memory rate-limiter for failed logins, keyed by (username, ip).
Parallels SessionStore: thread-safe, uses ``datetime.now(UTC)`` so the
same ``_FakeDatetime`` test shim works, lives only in memory so a
``systemctl restart furtka`` wipes a stuck lockout. Tuple keying means
a flood from one source IP can't lock the admin out from elsewhere
(different IP → different key) — the trade-off is that an attacker
can keep probing forever by rotating IPs, but they still eat the
PBKDF2 cost per attempt.
Stored data is a dict[key → list[datetime]] of recent failure
timestamps. Every call prunes entries older than ``WINDOW_SECONDS``,
so memory per active key is bounded by ``MAX_FAILURES``.
"""
MAX_FAILURES = 10
WINDOW_SECONDS = 15 * 60
LOCKOUT_SECONDS = 15 * 60
def __init__(
self,
max_failures: int = MAX_FAILURES,
window_seconds: int = WINDOW_SECONDS,
lockout_seconds: int = LOCKOUT_SECONDS,
) -> None:
self._max = max_failures
self._window = timedelta(seconds=window_seconds)
self._lockout = timedelta(seconds=lockout_seconds)
self._fails: dict[tuple[str, str], list[datetime]] = {}
self._lock = threading.Lock()
def _prune_locked(self, key: tuple[str, str], now: datetime) -> list[datetime]:
"""Drop timestamps older than the window; caller holds self._lock."""
cutoff = now - self._window
kept = [ts for ts in self._fails.get(key, ()) if ts >= cutoff]
if kept:
self._fails[key] = kept
else:
self._fails.pop(key, None)
return kept
def register_failure(self, key: tuple[str, str]) -> None:
now = datetime.now(UTC)
with self._lock:
self._prune_locked(key, now)
self._fails.setdefault(key, []).append(now)
def is_locked(self, key: tuple[str, str]) -> bool:
return self.retry_after_seconds(key) > 0
def retry_after_seconds(self, key: tuple[str, str]) -> int:
"""Seconds remaining on an active lockout, or 0 if not locked."""
now = datetime.now(UTC)
with self._lock:
kept = self._prune_locked(key, now)
if len(kept) < self._max:
return 0
# Lockout runs from the oldest retained failure; once it
# falls off the window the key is effectively released.
unlock_at = kept[0] + self._lockout
remaining = (unlock_at - now).total_seconds()
if remaining <= 0:
return 0
return max(1, math.ceil(remaining))
def clear(self, key: tuple[str, str]) -> None:
with self._lock:
self._fails.pop(key, None)
def clear_all(self) -> None:
"""Test helper — wipe all failure state."""
with self._lock:
self._fails.clear()
# Module-level singletons used by the HTTP handler: the table of active
# sessions and the failed-login rate limiter.
SESSIONS = SessionStore(ttl_seconds=COOKIE_TTL_SECONDS)
LOCKOUT = LoginAttempts()