furtka/tests/test_auth.py
Daniel Maksymilian Syrnicki 1cff22658b
All checks were successful
CI / lint (push) Successful in 1m59s
CI / test (push) Successful in 3m27s
CI / validate-json (push) Successful in 1m56s
CI / markdown-links (push) Successful in 1m24s
Build ISO / build-iso (push) Successful in 26m58s
feat(auth): rate-limit failed logins with per-(user, IP) lockout
Ten wrong passwords from the same (username, client-IP) tuple within
15 minutes now return 429 with Retry-After for the next 15 minutes;
authenticate() isn't even called while locked, so the 429 response is
identical whether or not the password would have been correct — no oracle.

Tuple keying prevents an attacker from one IP from locking the real
admin out of their own box: a different IP (or an ISP reconnect) keeps
them in. The client IP comes from the rightmost X-Forwarded-For entry,
which is what Caddy appends and thus trustworthy (no upstream proxy in
front of Caddy). First-run setup bypasses the lockout — otherwise a
clumsy operator could lock themselves out before an admin exists.

State is in-memory (parallel to SessionStore), so `systemctl restart
furtka` clears a stuck lockout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:27:14 +02:00

230 lines
7.2 KiB
Python

import json
from datetime import UTC, datetime, timedelta
import pytest
from furtka import auth
@pytest.fixture
def tmp_users_file(tmp_path, monkeypatch):
    """Point ``FURTKA_USERS_FILE`` at a per-test path and reset auth state.

    Returns the path to ``users.json`` inside pytest's ``tmp_path``; the
    fixture itself does not create the file.
    """
    users_path = tmp_path / "users.json"
    monkeypatch.setenv("FURTKA_USERS_FILE", str(users_path))
    # SESSIONS and LOCKOUT are module-level objects on ``auth``. Reset both
    # so neither a still-valid token nor an old failure counter from a
    # previous test can carry over into this one.
    auth.SESSIONS.clear()
    auth.LOCKOUT.clear_all()
    return users_path
def test_hash_password_roundtrip():
    """A hash differs from the plaintext and verifies only the right password."""
    password = "hunter2"
    hashed = auth.hash_password(password)
    # The hash must never be the plaintext password itself.
    assert hashed != password
    matches_correct = auth.verify_password(password, hashed)
    assert matches_correct is True
    matches_wrong = auth.verify_password("hunter3", hashed)
    assert matches_wrong is False
def test_hash_password_is_salted():
    """Hashing the same password twice gives distinct hashes that both verify."""
    password = "same"
    first = auth.hash_password(password)
    second = auth.hash_password(password)
    # Identical input, different output: each call must differ.
    assert first != second
    # Either hash still validates the original password.
    for hashed in (first, second):
        assert auth.verify_password(password, hashed)
def test_load_users_returns_empty_when_missing(tmp_users_file):
    """``load_users`` yields an empty dict when the users file does not exist."""
    file_present = tmp_users_file.exists()
    assert not file_present
    users = auth.load_users()
    assert users == {}
def test_load_users_returns_empty_on_junk(tmp_users_file):
    """``load_users`` yields an empty dict when the file is not valid JSON."""
    malformed = "{not json"
    tmp_users_file.write_text(malformed)
    users = auth.load_users()
    assert users == {}
def test_load_users_returns_empty_on_non_dict(tmp_users_file):
    """``load_users`` yields an empty dict when the JSON top level is a list."""
    json_list = "[]"
    tmp_users_file.write_text(json_list)
    users = auth.load_users()
    assert users == {}
def test_save_users_atomic_and_0600(tmp_users_file):
    """``save_users`` creates the file with mode 0600 and readable JSON.

    Only the resulting file is inspected; atomicity of the write itself is
    not asserted here.
    """
    payload = {"admin": {"hash": "x", "username": "daniel"}}
    auth.save_users(payload)
    assert tmp_users_file.exists()
    # Mask down to the permission bits before comparing.
    permission_bits = tmp_users_file.stat().st_mode & 0o777
    assert permission_bits == 0o600, f"expected 0o600, got {oct(permission_bits)}"
    on_disk = json.loads(tmp_users_file.read_text())
    assert on_disk["admin"]["username"] == "daniel"
def test_setup_needed_true_on_missing_file(tmp_users_file):
    """``setup_needed`` is True when nothing has been written to the users file."""
    needs_setup = auth.setup_needed()
    assert needs_setup is True
def test_setup_needed_true_on_empty_dict(tmp_users_file):
    """``setup_needed`` is True when the users file holds an empty JSON object."""
    empty_object = "{}"
    tmp_users_file.write_text(empty_object)
    needs_setup = auth.setup_needed()
    assert needs_setup is True
def test_setup_needed_false_when_admin_exists(tmp_users_file):
    """``setup_needed`` is False once ``create_admin`` has been called."""
    auth.create_admin("daniel", "secret-pw")
    needs_setup = auth.setup_needed()
    assert needs_setup is False
def test_create_admin_overwrites_file(tmp_users_file):
    """A second ``create_admin`` call replaces the stored admin username."""
    for username, password in (("daniel", "secret-pw"), ("robert", "new-pw")):
        auth.create_admin(username, password)
    stored = auth.load_users()
    # The most recent call wins.
    assert stored["admin"]["username"] == "robert"
def test_authenticate_happy(tmp_users_file):
    """The admin's own username and password authenticate successfully."""
    auth.create_admin("daniel", "secret-pw")
    accepted = auth.authenticate("daniel", "secret-pw")
    assert accepted is True
def test_authenticate_wrong_username(tmp_users_file):
    """The right password under a different username is rejected."""
    auth.create_admin("daniel", "secret-pw")
    accepted = auth.authenticate("robert", "secret-pw")
    assert accepted is False
def test_authenticate_wrong_password(tmp_users_file):
    """The right username with a wrong password is rejected."""
    auth.create_admin("daniel", "secret-pw")
    accepted = auth.authenticate("daniel", "wrong")
    assert accepted is False
def test_authenticate_no_admin(tmp_users_file):
    """Authentication fails when no admin has been created."""
    accepted = auth.authenticate("daniel", "anything")
    assert accepted is False
# ---- Session store ---------------------------------------------------------
def test_session_create_and_lookup(tmp_users_file):
    """A created session has a username and token, and its token looks it up."""
    created = auth.SESSIONS.create("daniel")
    assert created.username == "daniel"
    # The token must be truthy (not empty, not None).
    assert created.token
    found = auth.SESSIONS.lookup(created.token)
    assert found is not None
    assert found.username == "daniel"
def test_session_lookup_unknown_token(tmp_users_file):
    """Looking up a token that was never issued returns None."""
    found = auth.SESSIONS.lookup("not-a-real-token")
    assert found is None
def test_session_lookup_none_token(tmp_users_file):
    """Looking up ``None`` or an empty string returns None."""
    for missing_token in (None, ""):
        assert auth.SESSIONS.lookup(missing_token) is None
def test_session_revoke(tmp_users_file):
    """A revoked token no longer resolves to a session."""
    session = auth.SESSIONS.create("daniel")
    token = session.token
    auth.SESSIONS.revoke(token)
    assert auth.SESSIONS.lookup(token) is None
def test_session_expires(tmp_users_file, monkeypatch):
    """A session from a zero-TTL store is not returned once the clock advances."""
    # A TTL of 0 seconds means a new session should count as expired as
    # soon as it is looked up.
    zero_ttl_store = auth.SessionStore(ttl_seconds=0)
    session = zero_ttl_store.create("daniel")
    # Move the clock one second forward so a strict "later than expiry"
    # comparison is certain to fire (presumably a ``>`` check in ``auth``).
    one_second_later = datetime.now(UTC) + timedelta(seconds=1)
    monkeypatch.setattr(auth, "datetime", _FakeDatetime(one_second_later))
    # ``lookup`` is expected to resolve ``auth.datetime`` when it is called,
    # so it sees the patched clock. The expired session must be dropped.
    assert zero_ttl_store.lookup(session.token) is None
class _FakeDatetime:
"""Tiny shim — only `.now(tz)` is used from SessionStore."""
def __init__(self, fixed_utc):
self._fixed = fixed_utc
def now(self, tz=None):
if tz is None:
return self._fixed.replace(tzinfo=None)
return self._fixed.astimezone(tz)
# ---- Login attempts / lockout ----------------------------------------------
def test_lockout_under_threshold_still_allowed(tmp_users_file):
    """Two failures against a limit of three leave the key unlocked."""
    attempts = auth.LoginAttempts(max_failures=3, window_seconds=60, lockout_seconds=60)
    who = ("daniel", "10.0.0.1")
    # Two failures: one short of ``max_failures``.
    attempts.register_failure(who)
    attempts.register_failure(who)
    assert attempts.is_locked(who) is False
    assert attempts.retry_after_seconds(who) == 0
def test_lockout_triggers_at_threshold(tmp_users_file):
    """Reaching ``max_failures`` locks the key with a positive, bounded retry delay."""
    attempts = auth.LoginAttempts(max_failures=3, window_seconds=60, lockout_seconds=60)
    who = ("daniel", "10.0.0.1")
    # Exactly ``max_failures`` failures.
    attempts.register_failure(who)
    attempts.register_failure(who)
    attempts.register_failure(who)
    assert attempts.is_locked(who) is True
    # The retry delay is positive and no larger than ``lockout_seconds``.
    assert attempts.retry_after_seconds(who) > 0
    assert attempts.retry_after_seconds(who) <= 60
def test_lockout_window_decay(tmp_users_file, monkeypatch):
    """A lock no longer applies once the clock is past the window and lockout."""
    attempts = auth.LoginAttempts(max_failures=3, window_seconds=60, lockout_seconds=60)
    who = ("daniel", "10.0.0.1")
    attempts.register_failure(who)
    attempts.register_failure(who)
    attempts.register_failure(who)
    assert attempts.is_locked(who) is True
    # Jump just over two minutes ahead. Every recorded failure is then older
    # than the 60-second window and is expected to be pruned on the next check.
    two_minutes_on = datetime.now(UTC) + timedelta(seconds=121)
    monkeypatch.setattr(auth, "datetime", _FakeDatetime(two_minutes_on))
    assert attempts.is_locked(who) is False
    assert attempts.retry_after_seconds(who) == 0
def test_lockout_clear_resets(tmp_users_file):
    """``clear(key)`` unlocks a locked key and zeroes its retry delay."""
    attempts = auth.LoginAttempts(max_failures=2, window_seconds=60, lockout_seconds=60)
    who = ("daniel", "10.0.0.1")
    # Two failures hit the limit of two.
    for _ in range(2):
        attempts.register_failure(who)
    assert attempts.is_locked(who) is True
    attempts.clear(who)
    assert attempts.is_locked(who) is False
    assert attempts.retry_after_seconds(who) == 0
def test_lockout_keys_are_independent(tmp_users_file):
    """Locking one (user, IP) key does not lock the same user or IP under another key."""
    attempts = auth.LoginAttempts(max_failures=2, window_seconds=60, lockout_seconds=60)
    locked_key = ("daniel", "1.1.1.1")
    same_user_other_ip = ("daniel", "2.2.2.2")
    other_user_same_ip = ("robert", "1.1.1.1")
    # Only ``locked_key`` records failures.
    for _ in range(2):
        attempts.register_failure(locked_key)
    assert attempts.is_locked(locked_key) is True
    for untouched_key in (same_user_other_ip, other_user_same_ip):
        assert attempts.is_locked(untouched_key) is False
def test_lockout_clear_all_wipes_every_key(tmp_users_file):
    """``clear_all`` unlocks every key that was locked."""
    attempts = auth.LoginAttempts(max_failures=2, window_seconds=60, lockout_seconds=60)
    first_key = ("daniel", "1.1.1.1")
    second_key = ("robert", "2.2.2.2")
    # Push each key to the failure limit, one key after the other.
    for key in (first_key, second_key):
        for _ in range(2):
            attempts.register_failure(key)
    # Separate asserts so a failure shows which key was not locked.
    assert attempts.is_locked(first_key)
    assert attempts.is_locked(second_key)
    attempts.clear_all()
    for key in (first_key, second_key):
        assert not attempts.is_locked(key)