feat(auth): rate-limit failed logins with per-(user, IP) lockout
All checks were successful
CI / lint (push) Successful in 1m59s
CI / test (push) Successful in 3m27s
CI / validate-json (push) Successful in 1m56s
CI / markdown-links (push) Successful in 1m24s
Build ISO / build-iso (push) Successful in 26m58s

Ten wrong passwords from the same (username, client-IP) tuple within
15 minutes now return 429 with Retry-After for the next 15 minutes;
authenticate() isn't even called while locked, so the 429 response is
identical whether the password would have been correct — no oracle.

Tuple keying prevents an attacker from one IP from locking the real
admin out of their own box: a different IP (or an ISP reconnect) keeps
them in. The client IP comes from the rightmost X-Forwarded-For entry,
which is what Caddy appends and thus trustworthy (no upstream proxy in
front of Caddy). First-run setup bypasses the lockout — otherwise a
clumsy operator could lock themselves out before an admin exists.

State is in-memory (parallel to SessionStore), so `systemctl restart
furtka` clears a stuck lockout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Daniel Maksymilian Syrnicki 2026-04-22 17:27:14 +02:00
parent e68ed279cc
commit 1cff22658b
4 changed files with 315 additions and 8 deletions

View file

@ -1177,6 +1177,16 @@ class _Handler(BaseHTTPRequestHandler):
f"{auth.COOKIE_NAME}=; HttpOnly; SameSite=Strict; Path=/; Max-Age=0",
)
def _client_ip(self) -> str:
# Caddy's reverse_proxy appends the real TCP peer to X-Forwarded-For;
# the rightmost entry is the one Caddy added, so it's trustworthy
# even if a client spoofed an XFF of their own. Caddy is the edge —
# no upstream proxy in front of it.
xff = self.headers.get("X-Forwarded-For")
if xff:
return xff.rsplit(",", 1)[-1].strip()
return self.client_address[0]
def _handle_login(self, payload):
username = payload.get("username") if isinstance(payload, dict) else None
password = payload.get("password") if isinstance(payload, dict) else None
@ -1200,12 +1210,26 @@ class _Handler(BaseHTTPRequestHandler):
)
auth.create_admin(username, password)
else:
# Tuple-keyed lockout: a flood from one IP can't lock the
# admin out from a different IP. When locked we return the
# same 429 regardless of whether the password is correct —
# no oracle, no timing leak via "would have worked."
lockout_key = (username, self._client_ip())
retry = auth.LOCKOUT.retry_after_seconds(lockout_key)
if retry > 0:
return self._json(
429,
{"error": "too many failed attempts, try again later"},
extra_headers=[("Retry-After", str(retry))],
)
if not auth.authenticate(username, password):
# Cheap brute-force speed bump. werkzeug's PBKDF2 is
# already slow per attempt, but a fixed sleep makes
# "try 1000 passwords over the LAN" even less fun.
# Register before the sleep so concurrent threads see a
# consistent count; keep the sleep so timing can't
# distinguish "locked" from "wrong password."
auth.LOCKOUT.register_failure(lockout_key)
time.sleep(0.5)
return self._json(401, {"error": "invalid username or password"})
auth.LOCKOUT.clear(lockout_key)
session = auth.SESSIONS.create(username)
cookie = self._session_cookie_header(session.token, auth.COOKIE_TTL_SECONDS)

View file

@ -4,7 +4,9 @@ One admin, one password. Passwords are PBKDF2-SHA256 hashed via
``furtka.passwd`` (stdlib-only hashlib.pbkdf2_hmac / hashlib.scrypt),
stored in /var/lib/furtka/users.json with mode 0600. Sessions live in
memory a systemctl restart logs everyone out again, which is fine
for an alpha single-user box.
for an alpha single-user box. The ``LoginAttempts`` store in this
module rate-limits failed logins per (username, IP) and is also
in-memory; a restart clears a stuck lockout.
On upgrade from pre-auth Furtka the users.json file does not exist
yet; the api's GET /login detects this via ``setup_needed()`` and
@ -20,6 +22,7 @@ forward without re-setup; see ``furtka.passwd`` for the scrypt reader.
from __future__ import annotations
import json
import math
import secrets
import threading
from dataclasses import dataclass
@ -176,5 +179,82 @@ class SessionStore:
self._by_token.clear()
class LoginAttempts:
"""In-memory rate-limiter for failed logins, keyed by (username, ip).
Parallels SessionStore: thread-safe, uses ``datetime.now(UTC)`` so the
same ``_FakeDatetime`` test shim works, lives only in memory so a
``systemctl restart furtka`` wipes a stuck lockout. Tuple keying means
a flood from one source IP can't lock the admin out from elsewhere
(different IP different key) the trade-off is that an attacker
can keep probing forever by rotating IPs, but they still eat the
PBKDF2 cost per attempt.
Stored data is a dict[key list[datetime]] of recent failure
timestamps. Every call prunes entries older than ``WINDOW_SECONDS``,
so memory per active key is bounded by ``MAX_FAILURES``.
"""
MAX_FAILURES = 10
WINDOW_SECONDS = 15 * 60
LOCKOUT_SECONDS = 15 * 60
def __init__(
self,
max_failures: int = MAX_FAILURES,
window_seconds: int = WINDOW_SECONDS,
lockout_seconds: int = LOCKOUT_SECONDS,
) -> None:
self._max = max_failures
self._window = timedelta(seconds=window_seconds)
self._lockout = timedelta(seconds=lockout_seconds)
self._fails: dict[tuple[str, str], list[datetime]] = {}
self._lock = threading.Lock()
def _prune_locked(self, key: tuple[str, str], now: datetime) -> list[datetime]:
"""Drop timestamps older than the window; caller holds self._lock."""
cutoff = now - self._window
kept = [ts for ts in self._fails.get(key, ()) if ts >= cutoff]
if kept:
self._fails[key] = kept
else:
self._fails.pop(key, None)
return kept
def register_failure(self, key: tuple[str, str]) -> None:
now = datetime.now(UTC)
with self._lock:
self._prune_locked(key, now)
self._fails.setdefault(key, []).append(now)
def is_locked(self, key: tuple[str, str]) -> bool:
return self.retry_after_seconds(key) > 0
def retry_after_seconds(self, key: tuple[str, str]) -> int:
"""Seconds remaining on an active lockout, or 0 if not locked."""
now = datetime.now(UTC)
with self._lock:
kept = self._prune_locked(key, now)
if len(kept) < self._max:
return 0
# Lockout runs from the oldest retained failure; once it
# falls off the window the key is effectively released.
unlock_at = kept[0] + self._lockout
remaining = (unlock_at - now).total_seconds()
if remaining <= 0:
return 0
return max(1, math.ceil(remaining))
def clear(self, key: tuple[str, str]) -> None:
with self._lock:
self._fails.pop(key, None)
def clear_all(self) -> None:
"""Test helper — wipe all failure state."""
with self._lock:
self._fails.clear()
# Module-level singleton used by the HTTP handler.
SESSIONS = SessionStore()
LOCKOUT = LoginAttempts()

View file

@ -48,9 +48,10 @@ def fake_dirs(tmp_path, monkeypatch):
from furtka import install_runner
importlib.reload(install_runner)
# Scrub any sessions that leaked from a prior test — the SESSIONS
# store is module-level.
# Scrub any sessions or lockout counters that leaked from a prior
# test — both stores are module-level.
auth.SESSIONS.clear()
auth.LOCKOUT.clear_all()
return apps, bundled
@ -600,6 +601,130 @@ def test_post_login_rejects_wrong_password(fake_dirs):
server.server_close()
def _post_wrong_login(port, username="daniel", password="nope"):
req = _request(
port,
"/login",
method="POST",
body={"username": username, "password": password},
)
try:
urllib.request.urlopen(req)
raise AssertionError("expected HTTPError")
except urllib.error.HTTPError as e:
return e
def test_post_login_locks_out_after_repeated_failures(fake_dirs, monkeypatch):
auth.create_admin("daniel", "hunter2-pw")
# Flatten the 0.5s speed-bump so the test doesn't take 5 seconds.
monkeypatch.setattr(api.time, "sleep", lambda _s: None)
server, port = _start_server()
try:
for _ in range(auth.LoginAttempts.MAX_FAILURES):
err = _post_wrong_login(port)
assert err.code == 401
err = _post_wrong_login(port)
assert err.code == 429
assert err.headers.get("Retry-After") is not None
assert int(err.headers["Retry-After"]) > 0
finally:
server.shutdown()
server.server_close()
def test_post_login_429_masks_correctness(fake_dirs, monkeypatch):
"""Once locked, the correct password must also get 429 — no oracle."""
auth.create_admin("daniel", "hunter2-pw")
monkeypatch.setattr(api.time, "sleep", lambda _s: None)
server, port = _start_server()
try:
for _ in range(auth.LoginAttempts.MAX_FAILURES):
_post_wrong_login(port)
req = _request(
port,
"/login",
method="POST",
body={"username": "daniel", "password": "hunter2-pw"},
)
try:
urllib.request.urlopen(req)
raise AssertionError("expected 429")
except urllib.error.HTTPError as e:
assert e.code == 429
finally:
server.shutdown()
server.server_close()
def test_post_login_success_clears_lockout_counter(fake_dirs, monkeypatch):
auth.create_admin("daniel", "hunter2-pw")
monkeypatch.setattr(api.time, "sleep", lambda _s: None)
server, port = _start_server()
try:
# Get close to the threshold, then log in successfully.
for _ in range(auth.LoginAttempts.MAX_FAILURES - 1):
_post_wrong_login(port)
req = _request(
port,
"/login",
method="POST",
body={"username": "daniel", "password": "hunter2-pw"},
)
with urllib.request.urlopen(req) as r:
assert r.status == 200
# Counter must have been cleared: another full MAX_FAILURES-1
# fails shouldn't trigger 429.
for _ in range(auth.LoginAttempts.MAX_FAILURES - 1):
err = _post_wrong_login(port)
assert err.code == 401
finally:
server.shutdown()
server.server_close()
def test_post_login_setup_not_rate_limited(fake_dirs, monkeypatch):
"""First-run setup is never auth-ed against a hash, so the lockout
must not apply otherwise a clumsy admin could lock themselves out
of a box that has no admin yet."""
monkeypatch.setattr(api.time, "sleep", lambda _s: None)
server, port = _start_server()
try:
# Many mismatched setup submissions (400s) — no 429 should appear.
for _ in range(auth.LoginAttempts.MAX_FAILURES + 3):
req = _request(
port,
"/login",
method="POST",
body={
"username": "daniel",
"password": "longenough",
"password2": "different",
},
)
try:
urllib.request.urlopen(req)
raise AssertionError("expected 400")
except urllib.error.HTTPError as e:
assert e.code == 400
# Then a good setup still succeeds.
req = _request(
port,
"/login",
method="POST",
body={
"username": "daniel",
"password": "longenough",
"password2": "longenough",
},
)
with urllib.request.urlopen(req) as r:
assert r.status == 200
finally:
server.shutdown()
server.server_close()
def test_post_logout_revokes_session(fake_dirs, admin_session):
server, port = _start_server()
try:

View file

@ -10,9 +10,11 @@ from furtka import auth
def tmp_users_file(tmp_path, monkeypatch):
path = tmp_path / "users.json"
monkeypatch.setenv("FURTKA_USERS_FILE", str(path))
# Sessions are module-level; wipe between tests so one doesn't leak a
# valid token into the next.
# Sessions and lockout state are module-level; wipe between tests so
# one doesn't leak a valid token (or a stale failure counter) into
# the next.
auth.SESSIONS.clear()
auth.LOCKOUT.clear_all()
return path
@ -150,3 +152,79 @@ class _FakeDatetime:
if tz is None:
return self._fixed.replace(tzinfo=None)
return self._fixed.astimezone(tz)
# ---- Login attempts / lockout ----------------------------------------------
def test_lockout_under_threshold_still_allowed(tmp_users_file):
store = auth.LoginAttempts(max_failures=3, window_seconds=60, lockout_seconds=60)
key = ("daniel", "10.0.0.1")
for _ in range(2):
store.register_failure(key)
assert store.is_locked(key) is False
assert store.retry_after_seconds(key) == 0
def test_lockout_triggers_at_threshold(tmp_users_file):
store = auth.LoginAttempts(max_failures=3, window_seconds=60, lockout_seconds=60)
key = ("daniel", "10.0.0.1")
for _ in range(3):
store.register_failure(key)
assert store.is_locked(key) is True
assert store.retry_after_seconds(key) > 0
assert store.retry_after_seconds(key) <= 60
def test_lockout_window_decay(tmp_users_file, monkeypatch):
store = auth.LoginAttempts(max_failures=3, window_seconds=60, lockout_seconds=60)
key = ("daniel", "10.0.0.1")
for _ in range(3):
store.register_failure(key)
assert store.is_locked(key) is True
# Jump 2 minutes ahead — all failures are older than the window
# and should be pruned on the next check.
monkeypatch.setattr(
auth,
"datetime",
_FakeDatetime(datetime.now(UTC) + timedelta(seconds=121)),
)
assert store.is_locked(key) is False
assert store.retry_after_seconds(key) == 0
def test_lockout_clear_resets(tmp_users_file):
store = auth.LoginAttempts(max_failures=2, window_seconds=60, lockout_seconds=60)
key = ("daniel", "10.0.0.1")
store.register_failure(key)
store.register_failure(key)
assert store.is_locked(key) is True
store.clear(key)
assert store.is_locked(key) is False
assert store.retry_after_seconds(key) == 0
def test_lockout_keys_are_independent(tmp_users_file):
store = auth.LoginAttempts(max_failures=2, window_seconds=60, lockout_seconds=60)
locked = ("daniel", "1.1.1.1")
other_ip = ("daniel", "2.2.2.2")
other_user = ("robert", "1.1.1.1")
store.register_failure(locked)
store.register_failure(locked)
assert store.is_locked(locked) is True
assert store.is_locked(other_ip) is False
assert store.is_locked(other_user) is False
def test_lockout_clear_all_wipes_every_key(tmp_users_file):
store = auth.LoginAttempts(max_failures=2, window_seconds=60, lockout_seconds=60)
a = ("daniel", "1.1.1.1")
b = ("robert", "2.2.2.2")
store.register_failure(a)
store.register_failure(a)
store.register_failure(b)
store.register_failure(b)
assert store.is_locked(a) and store.is_locked(b)
store.clear_all()
assert not store.is_locked(a)
assert not store.is_locked(b)