furtka/tests/test_api.py
Daniel Maksymilian Syrnicki 470823b347
All checks were successful
Build ISO / build-iso (push) Successful in 17m30s
CI / lint (push) Successful in 27s
CI / test (push) Successful in 43s
CI / validate-json (push) Successful in 31s
CI / markdown-links (push) Successful in 15s
Release / release (push) Successful in 11m38s
feat(auth): login-guard the Furtka UI with a cookie session
One-admin, one-password model — all of /apps, /api/*, /, and
/settings/ now require a signed-in session. Passwords are werkzeug
PBKDF2-hashed in /var/lib/furtka/users.json (mode 0600, atomic write
via the same .tmp+chmod+rename dance installer.write_env uses).
Sessions are secrets.token_urlsafe(32) tokens held in a module-level
SessionStore dict (thread-safe lock included for when we swap to
ThreadingHTTPServer). Cookies are HttpOnly, SameSite=Strict, and
Path=/, with Secure set when X-Forwarded-Proto from Caddy says HTTPS.

Two bootstrap paths:
  * Fresh install — webinstaller step-1 collects Linux user + password,
    the chroot post-install step hashes the password and writes
    users.json on the target partition. First browser visit lands on
    /login with the account already present.
  * Upgrade from 26.10-alpha — no users.json yet, so /login detects
    setup_needed() and renders a first-run setup form. POST creates
    the admin and immediately logs in.

POST /logout revokes the server session and clears the cookie.
Unauthenticated HTML requests 302 to /login; unauthenticated API
requests 401 JSON so fetch() callers see a clean error. A sleep(0.5)
on failed logins is the brute-force speed bump on top of werkzeug's
~600k-iter PBKDF2.

Caddyfile gains /login* and /logout* handle blocks in the shared
furtka_routes snippet so both :80 and the HTTPS hostname block
forward the auth endpoints to localhost:7000. Without this Caddy
would 404 from the static file server.

Test surface:
  * tests/test_auth.py (new, 19 cases): hash roundtrip, users.json
    I/O, session create/lookup/expire/revoke.
  * tests/test_api.py: new admin_session fixture; existing HTTP
    tests updated to send the cookie; new tests cover login setup,
    login success, wrong-password 401, logout revocation, and the
    guard's 302/401 split.
  * tests/test_webinstaller_assets.py: new case that unpacks the
    users.json _write_file_cmd body and verifies the werkzeug hash
    round-trips against the step-1 password.

Bumped version to 26.11-alpha and rolled CHANGELOG. Also folded in
the ruff-format fix that was pending from 26.10-alpha's lint red.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 13:01:17 +02:00

978 lines
32 KiB
Python

import json
import threading
import urllib.error
import urllib.request
import pytest
from furtka import api, auth, dockerops
# Minimal app manifest the API accepts as-is; tests needing extra fields
# derive variants via dict(VALID_MANIFEST, ...).
VALID_MANIFEST = {
    "name": "fileshare",
    "display_name": "Network Files",
    "version": "0.1.0",
    "description": "SMB share",
    "volumes": ["files"],
    "ports": [445],
    "icon": "icon.svg",
}
@pytest.fixture
def fake_dirs(tmp_path, monkeypatch):
    """Point every FURTKA_* path env var at a fresh tmp tree.

    Creates the apps/ and bundled/ directories (catalog dir and users file
    are only referenced, not created) and returns (apps_dir, bundled_dir).
    """
    apps_dir = tmp_path / "apps"
    bundled_dir = tmp_path / "bundled"
    apps_dir.mkdir()
    bundled_dir.mkdir()
    env_paths = {
        "FURTKA_APPS_DIR": apps_dir,
        "FURTKA_BUNDLED_APPS_DIR": bundled_dir,
        "FURTKA_CATALOG_DIR": tmp_path / "catalog",
        "FURTKA_USERS_FILE": tmp_path / "users.json",
    }
    for key, path in env_paths.items():
        monkeypatch.setenv(key, str(path))
    # The SESSIONS store is module-level — drop anything a prior test leaked.
    auth.SESSIONS.clear()
    return apps_dir, bundled_dir
@pytest.fixture
def admin_session(fake_dirs):
    """Create an admin account plus a live session.

    Yields a Cookie header value ready for urllib.request.Request(headers=...).
    """
    auth.create_admin("daniel", "hunter2-pw")
    token = auth.SESSIONS.create("daniel").token
    return f"{auth.COOKIE_NAME}={token}"
@pytest.fixture
def no_docker(monkeypatch):
    """Neutralize docker calls so install/remove can run without a daemon."""
    stubs = {
        "ensure_volume": lambda name: True,
        "compose_up": lambda app_dir, project: None,
        "compose_down": lambda app_dir, project: None,
    }
    for attr, fn in stubs.items():
        monkeypatch.setattr(dockerops, attr, fn)
def _write_bundled(bundled, name, manifest=None, env_example=None):
app = bundled / name
app.mkdir()
(app / "manifest.json").write_text(json.dumps(manifest or VALID_MANIFEST))
(app / "docker-compose.yaml").write_text("services: {}\n")
if env_example is not None:
(app / ".env.example").write_text(env_example)
return app
def test_list_installed_empty(fake_dirs):
    """A fresh apps dir yields no installed apps."""
    assert api._list_installed() == []
def test_list_available_empty(fake_dirs):
    """No bundled or catalog apps -> empty available list."""
    assert api._list_available() == []
def test_list_available_shows_uninstalled(fake_dirs):
    """Bundled-but-not-installed apps appear in the available list."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare")
    entries = api._list_available()
    assert len(entries) == 1
    entry = entries[0]
    assert entry["name"] == "fileshare"
    assert "display_name" in entry
    # Source field lets the UI later distinguish catalog from bundled seed.
    assert entry["source"] == "bundled"
# --- Icon inlining ----------------------------------------------------------

# Smallest well-formed SVG used as the canonical "good" icon in these tests.
_SIMPLE_SVG = (
    '<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M0 0h10v10H0z"/></svg>'
)
def _write_icon(app_dir, contents, name="icon.svg"):
(app_dir / name).write_text(contents)
def test_read_icon_svg_returns_content(tmp_path):
    """A plain valid SVG file is returned verbatim."""
    _write_icon(tmp_path, _SIMPLE_SVG)
    result = api._read_icon_svg(tmp_path, "icon.svg")
    assert result == _SIMPLE_SVG
def test_read_icon_svg_strips_xml_declaration(tmp_path):
    """A leading <?xml ...?> declaration is stripped from the result."""
    declared = '<?xml version="1.0" encoding="UTF-8"?>\n' + _SIMPLE_SVG
    _write_icon(tmp_path, declared)
    assert api._read_icon_svg(tmp_path, "icon.svg") == _SIMPLE_SVG
def test_read_icon_svg_missing_file_returns_none(tmp_path):
    """A nonexistent icon file yields None instead of raising."""
    assert api._read_icon_svg(tmp_path, "ghost.svg") is None
def test_read_icon_svg_no_name_returns_none(tmp_path):
    """Falsy icon names short-circuit to None."""
    for bad_name in (None, ""):
        assert api._read_icon_svg(tmp_path, bad_name) is None
def test_read_icon_svg_rejects_non_svg(tmp_path):
    """Non-SVG content behind an .svg name is rejected."""
    _write_icon(tmp_path, "<html><body>hi</body></html>")
    result = api._read_icon_svg(tmp_path, "icon.svg")
    assert result is None
def test_read_icon_svg_rejects_oversized(tmp_path):
    """Icons past the size cap (17 KiB of padding here) are dropped."""
    padding = "x" * (17 * 1024)
    _write_icon(tmp_path, f"<svg>{padding}</svg>")
    assert api._read_icon_svg(tmp_path, "icon.svg") is None
def test_read_icon_svg_rejects_script_tag(tmp_path):
    """An inline <script> element disqualifies the icon."""
    _write_icon(tmp_path, "<svg><script>alert(1)</script></svg>")
    result = api._read_icon_svg(tmp_path, "icon.svg")
    assert result is None
def test_read_icon_svg_rejects_event_handler(tmp_path):
    """Inline event-handler attributes (onload=...) disqualify the icon."""
    _write_icon(tmp_path, '<svg onload="alert(1)"><path/></svg>')
    result = api._read_icon_svg(tmp_path, "icon.svg")
    assert result is None
def test_read_icon_svg_rejects_javascript_url(tmp_path):
    """javascript: URLs inside the SVG disqualify the icon."""
    _write_icon(tmp_path, '<svg><a href="javascript:alert(1)"/></svg>')
    result = api._read_icon_svg(tmp_path, "icon.svg")
    assert result is None
def test_list_available_inlines_icon_svg(fake_dirs):
    """Available listings carry the icon SVG inline."""
    _, bundled = fake_dirs
    app_dir = _write_bundled(bundled, "fileshare")
    _write_icon(app_dir, _SIMPLE_SVG)
    [entry] = api._list_available()
    assert entry["icon_svg"] == _SIMPLE_SVG
def test_list_installed_inlines_icon_svg(fake_dirs, no_docker):
    """Installed listings carry the icon SVG inline too."""
    apps, bundled = fake_dirs
    app_dir = _write_bundled(bundled, "fileshare", env_example="A=real")
    _write_icon(app_dir, _SIMPLE_SVG)
    api._do_install("fileshare")
    [entry] = api._list_installed()
    assert entry["icon_svg"] == _SIMPLE_SVG
def test_list_available_hides_already_installed(fake_dirs, no_docker):
    """Once installed, an app leaves 'available' and joins 'installed'."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    status, _ = api._do_install("fileshare")
    assert status == 200
    # Bundled listing no longer advertises fileshare...
    assert api._list_available() == []
    # ...but the installed listing does.
    installed = api._list_installed()
    assert len(installed) == 1
    assert installed[0]["name"] == "fileshare"
def test_list_available_prefers_catalog_over_bundled(fake_dirs):
    """When an app exists in catalog AND bundled, the catalog copy wins."""
    _, bundled = fake_dirs
    catalog_apps = bundled.parent / "catalog" / "apps"
    catalog_apps.mkdir(parents=True)
    _write_bundled(bundled, "fileshare")
    # Catalog twin carries version 0.2.0 so the two copies can be told apart.
    catalog_dir = catalog_apps / "fileshare"
    catalog_dir.mkdir()
    catalog_dir.joinpath("manifest.json").write_text(
        json.dumps(dict(VALID_MANIFEST, version="0.2.0"))
    )
    entries = api._list_available()
    assert len(entries) == 1
    assert entries[0]["source"] == "catalog"
    assert entries[0]["version"] == "0.2.0"
def test_install_endpoint_rejects_placeholder(fake_dirs):
    """A placeholder value left in .env.example fails the install with 400."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="SMB_PASSWORD=changeme")
    status, body = api._do_install("fileshare")
    assert status == 400
    assert "placeholder" in body["error"]
def test_install_endpoint_rejects_unknown_app(fake_dirs):
    """Installing an app no source knows about is a 400."""
    status, body = api._do_install("does-not-exist")
    assert status == 400
    assert "not found" in body["error"]
def test_remove_endpoint_unknown(fake_dirs, no_docker):
    """Removing a never-installed app is a 404."""
    status, _ = api._do_remove("ghost")
    assert status == 404
def test_remove_endpoint_happy_path(fake_dirs, no_docker):
    """Install then remove: the app dir appears and then disappears."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    installed_dir = apps / "fileshare"
    assert installed_dir.exists()
    status, body = api._do_remove("fileshare")
    assert status == 200
    assert body["removed"] == "fileshare"
    assert not installed_dir.exists()
def _request(port, path, cookie=None, method="GET", body=None):
headers = {}
if cookie is not None:
headers["Cookie"] = cookie
data = None
if body is not None:
headers["Content-Type"] = "application/json"
data = json.dumps(body).encode()
return urllib.request.Request(
f"http://127.0.0.1:{port}{path}",
data=data,
headers=headers,
method=method,
)
def test_http_get_apps_route(fake_dirs, no_docker, admin_session):
    """Smoke test the actual HTTP server with a real socket + urllib client."""
    # Consistency fix: reuse the shared _start_server helper instead of
    # duplicating the server/thread boilerplate inline.
    server, port = _start_server()
    try:
        with urllib.request.urlopen(_request(port, "/api/apps", cookie=admin_session)) as r:
            assert r.status == 200
            assert json.loads(r.read()) == []
        with urllib.request.urlopen(_request(port, "/", cookie=admin_session)) as r:
            assert r.status == 200
            assert b"Furtka Apps" in r.read()
        # Unknown route -> 404 JSON.
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(_request(port, "/api/nope", cookie=admin_session))
        assert exc.value.code == 404
    finally:
        server.shutdown()
        server.server_close()
def test_http_post_install_unknown_app(fake_dirs, admin_session):
    """POST /api/apps/install for an unknown app returns 400 over HTTP."""
    # Consistency fix: use the shared _start_server helper rather than the
    # inline server/thread boilerplate.
    server, port = _start_server()
    try:
        req = _request(
            port,
            "/api/apps/install",
            cookie=admin_session,
            method="POST",
            body={"name": "ghost"},
        )
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(req)
        assert exc.value.code == 400
        assert "not found" in json.loads(exc.value.read())["error"]
    finally:
        server.shutdown()
        server.server_close()
# --- Auth guard + login flow ------------------------------------------------
def _start_server():
    """Start the API server on an ephemeral port in a daemon thread.

    Returns (server, port); callers must shutdown()/server_close() it.
    """
    server = api.HTTPServer(("127.0.0.1", 0), api._Handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server, server.server_address[1]
def test_unauthenticated_api_returns_401(fake_dirs):
    """API requests without a session cookie get a 401 JSON error."""
    # No admin_session fixture -> no cookie on the request.
    server, port = _start_server()
    try:
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(_request(port, "/api/apps"))
        assert exc.value.code == 401
        assert json.loads(exc.value.read())["error"] == "not authenticated"
    finally:
        server.shutdown()
        server.server_close()
def test_unauthenticated_html_redirects_to_login(fake_dirs):
    """Browser-facing routes 302 to /login when no session cookie is sent."""
    server, port = _start_server()
    try:
        # Follow no redirects so the 302 itself is observable.
        opener = urllib.request.build_opener(_NoRedirectHandler())
        with pytest.raises(urllib.error.HTTPError) as exc:
            opener.open(_request(port, "/apps"))
        assert exc.value.code == 302
        assert exc.value.headers["Location"] == "/login"
    finally:
        server.shutdown()
        server.server_close()
class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
def redirect_request(self, *args, **kwargs):
return None
def test_get_login_renders_login_form_when_admin_exists(fake_dirs):
    """With an admin on disk, GET /login shows the login (not setup) form."""
    auth.create_admin("daniel", "hunter2-pw")
    server, port = _start_server()
    try:
        with urllib.request.urlopen(_request(port, "/login")) as resp:
            assert resp.status == 200
            page = resp.read().decode()
        assert "Furtka login" in page
        # Setup-only confirm-password widgets must be absent in login mode.
        assert 'id="password2"' not in page
        assert "Repeat password" not in page
    finally:
        server.shutdown()
        server.server_close()
def test_get_login_renders_setup_form_when_no_admin(fake_dirs):
    """Without users.json, GET /login renders the first-run setup form."""
    server, port = _start_server()
    try:
        with urllib.request.urlopen(_request(port, "/login")) as resp:
            assert resp.status == 200
            page = resp.read().decode()
        assert "Set admin password" in page
        # The confirm-password field only exists in setup mode.
        assert "password2" in page
    finally:
        server.shutdown()
        server.server_close()
def test_get_login_redirects_when_already_authed(fake_dirs, admin_session):
    """An authenticated GET /login bounces straight to /apps."""
    server, port = _start_server()
    try:
        opener = urllib.request.build_opener(_NoRedirectHandler())
        with pytest.raises(urllib.error.HTTPError) as exc:
            opener.open(_request(port, "/login", cookie=admin_session))
        assert exc.value.code == 302
        assert exc.value.headers["Location"] == "/apps"
    finally:
        server.shutdown()
        server.server_close()
def test_post_login_setup_creates_admin(fake_dirs):
    """First-run POST /login creates the admin, persists it, sets the cookie."""
    server, port = _start_server()
    try:
        payload = {
            "username": "daniel",
            "password": "a-real-password",
            "password2": "a-real-password",
        }
        req = _request(port, "/login", method="POST", body=payload)
        with urllib.request.urlopen(req) as resp:
            assert resp.status == 200
            cookie = resp.headers["Set-Cookie"]
        for fragment in (auth.COOKIE_NAME, "HttpOnly", "SameSite=Strict"):
            assert fragment in cookie
        # users.json was written with the submitted account...
        assert auth.load_users()["admin"]["username"] == "daniel"
        # ...and the password verifies against the stored hash.
        assert auth.authenticate("daniel", "a-real-password") is True
    finally:
        server.shutdown()
        server.server_close()
def test_post_login_setup_rejects_password_mismatch(fake_dirs):
    """Setup POST with differing passwords is a 400 and creates nothing."""
    server, port = _start_server()
    try:
        payload = {"username": "x", "password": "abcdefgh", "password2": "different"}
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(_request(port, "/login", method="POST", body=payload))
        assert exc.value.code == 400
        assert "match" in json.loads(exc.value.read())["error"].lower()
        # Still no admin account after the failed setup.
        assert auth.setup_needed() is True
    finally:
        server.shutdown()
        server.server_close()
def test_post_login_setup_rejects_short_password(fake_dirs):
    """Setup POST with a too-short password is rejected with 400."""
    server, port = _start_server()
    try:
        payload = {"username": "x", "password": "short", "password2": "short"}
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(_request(port, "/login", method="POST", body=payload))
        assert exc.value.code == 400
    finally:
        server.shutdown()
        server.server_close()
def test_post_login_success_with_correct_credentials(fake_dirs):
    """Correct credentials log in and set the session cookie."""
    auth.create_admin("daniel", "hunter2-pw")
    server, port = _start_server()
    try:
        creds = {"username": "daniel", "password": "hunter2-pw"}
        req = _request(port, "/login", method="POST", body=creds)
        with urllib.request.urlopen(req) as resp:
            assert resp.status == 200
            assert auth.COOKIE_NAME in resp.headers["Set-Cookie"]
    finally:
        server.shutdown()
        server.server_close()
def test_post_login_rejects_wrong_password(fake_dirs):
    """A bad password yields 401 (no session cookie)."""
    auth.create_admin("daniel", "hunter2-pw")
    server, port = _start_server()
    try:
        creds = {"username": "daniel", "password": "nope"}
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(_request(port, "/login", method="POST", body=creds))
        assert exc.value.code == 401
    finally:
        server.shutdown()
        server.server_close()
def test_post_logout_revokes_session(fake_dirs, admin_session):
    """POST /logout clears the cookie and kills the server-side session."""
    server, port = _start_server()
    try:
        req = _request(port, "/logout", cookie=admin_session, method="POST", body={})
        with urllib.request.urlopen(req) as resp:
            assert resp.status == 200
            assert "Max-Age=0" in resp.headers["Set-Cookie"]
        # The revoked cookie no longer authenticates API calls.
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(_request(port, "/api/apps", cookie=admin_session))
        assert exc.value.code == 401
    finally:
        server.shutdown()
        server.server_close()
def test_post_to_protected_route_without_auth_is_401(fake_dirs):
    """POSTs to guarded API routes without a cookie get 401, not a redirect."""
    server, port = _start_server()
    try:
        req = _request(port, "/api/apps/install", method="POST", body={"name": "whatever"})
        with pytest.raises(urllib.error.HTTPError) as exc:
            urllib.request.urlopen(req)
        assert exc.value.code == 401
    finally:
        server.shutdown()
        server.server_close()
# --- Settings endpoints ------------------------------------------------------
# VALID_MANIFEST extended with a settings schema: a prefilled text field and
# a required password field, exercised by the settings-endpoint tests below.
SETTINGS_MANIFEST = dict(
    VALID_MANIFEST,
    description_long="Long help text.",
    settings=[
        {
            "name": "SMB_USER",
            "label": "User",
            "type": "text",
            "default": "furtka",
            "required": True,
        },
        {"name": "SMB_PASSWORD", "label": "Pass", "type": "password", "required": True},
    ],
)
def test_get_settings_bundled(fake_dirs):
    """Settings for an uninstalled bundled app merge schema + .env.example."""
    _, bundled = fake_dirs
    _write_bundled(
        bundled, "fileshare", manifest=SETTINGS_MANIFEST, env_example="SMB_USER=furtka\n"
    )
    status, body = api._do_get_settings("fileshare")
    assert status == 200
    assert body["installed"] is False
    assert body["description_long"] == "Long help text."
    by_name = {s["name"]: s for s in body["settings"]}
    assert list(by_name) == ["SMB_USER", "SMB_PASSWORD"]
    # Password values never leak back to the client.
    assert by_name["SMB_PASSWORD"]["value"] == ""
    # Text values are seeded from .env.example.
    assert by_name["SMB_USER"]["value"] == "furtka"
def test_get_settings_not_found(fake_dirs):
    """An unknown app name yields 404 from the settings endpoint."""
    status, _ = api._do_get_settings("ghost")
    assert status == 404
def test_install_with_settings_writes_env_via_api(fake_dirs, no_docker):
    """Submitted settings land in the installed app's .env file."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    status, body = api._do_install(
        "fileshare", settings={"SMB_USER": "alice", "SMB_PASSWORD": "s3cret"}
    )
    assert status == 200, body
    env_text = (apps / "fileshare" / ".env").read_text()
    assert "SMB_USER=alice" in env_text
    assert "SMB_PASSWORD=s3cret" in env_text
def test_install_with_settings_rejects_empty_required_via_api(fake_dirs, no_docker):
    """An empty required setting fails the install, naming the field."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    submitted = {"SMB_USER": "a", "SMB_PASSWORD": ""}
    status, body = api._do_install("fileshare", settings=submitted)
    assert status == 400
    assert "SMB_PASSWORD" in body["error"]
def test_update_settings_merges(fake_dirs, no_docker):
    """Editing only one setting keeps the others intact in .env."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    api._do_install("fileshare", settings={"SMB_USER": "alice", "SMB_PASSWORD": "original"})
    # Edit flow: submit just the changed password.
    status, body = api._do_update_settings("fileshare", {"SMB_PASSWORD": "newpass"})
    assert status == 200, body
    env_text = (apps / "fileshare" / ".env").read_text()
    assert "SMB_USER=alice" in env_text
    assert "SMB_PASSWORD=newpass" in env_text
def test_update_settings_unknown_app(fake_dirs):
    """Updating settings of an unknown app is a 404."""
    status, _ = api._do_update_settings("ghost", {"SMB_USER": "x"})
    assert status == 404
def test_http_get_settings_route(fake_dirs, no_docker, admin_session):
    """GET /api/apps/<name>/settings over a real socket returns the schema."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    # Consistency fix: use the shared _start_server helper rather than the
    # inline server/thread boilerplate duplicated elsewhere in this file.
    server, port = _start_server()
    try:
        with urllib.request.urlopen(
            _request(port, "/api/apps/fileshare/settings", cookie=admin_session)
        ) as r:
            assert r.status == 200
            data = json.loads(r.read())
        assert data["name"] == "fileshare"
        assert len(data["settings"]) == 2
    finally:
        server.shutdown()
        server.server_close()
# --- Update endpoint --------------------------------------------------------
@pytest.fixture
def update_docker_stubs(monkeypatch):
    """Stub every dockerops helper _do_update touches.

    Tests steer the image-id comparison by mutating the returned `state`
    dict (the running_/local_image_id values) and can make the pull fail
    via state["pull_raises"].
    """
    state = {
        "tags": {"samba": "dperson/samba:latest"},
        "running": {"samba": "sha256:OLD"},
        "local": {"samba": "sha256:OLD"},
        "pull_called": 0,
        "up_called": 0,
        "pull_raises": None,
    }

    def fake_pull(app_dir, project):
        state["pull_called"] += 1
        if state["pull_raises"]:
            raise state["pull_raises"]

    def fake_up(app_dir, project):
        state["up_called"] += 1

    def fake_tags(app_dir, project):
        return dict(state["tags"])

    def fake_running(app_dir, project, service):
        return state["running"].get(service)

    def fake_local(tag):
        # Deliberately keyed on "samba" regardless of tag — mirrors the
        # single-service fixture data above.
        return state["local"].get("samba")

    monkeypatch.setattr(api.dockerops, "compose_pull", fake_pull)
    monkeypatch.setattr(api.dockerops, "compose_up", fake_up)
    monkeypatch.setattr(api.dockerops, "compose_image_tags", fake_tags)
    monkeypatch.setattr(api.dockerops, "running_container_image_id", fake_running)
    monkeypatch.setattr(api.dockerops, "local_image_id", fake_local)
    return state
def test_update_not_installed(fake_dirs):
    """Updating an app that isn't installed is a 404."""
    status, body = api._do_update("ghost")
    assert status == 404
    assert "not installed" in body["error"]
def test_update_no_changes(fake_dirs, no_docker, update_docker_stubs):
    """A pull that changes nothing reports updated=False and skips compose up."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # install itself calls compose_up
    status, body = api._do_update("fileshare")
    assert status == 200
    assert body["updated"] is False
    assert body["services"] == []
    assert update_docker_stubs["pull_called"] == 1
    assert update_docker_stubs["up_called"] == 0
def test_update_changes_applied(fake_dirs, no_docker, update_docker_stubs):
    """A pull that advances the local image triggers compose up and reports it."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # install itself calls compose_up
    # Pretend the pull advanced the local image id.
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}
    status, body = api._do_update("fileshare")
    assert status == 200
    assert body["updated"] is True
    [svc_change] = body["services"]
    expected = {
        "service": "samba",
        "from": "sha256:OLD",
        "to": "sha256:NEW",
        "tag": "dperson/samba:latest",
    }
    assert svc_change == expected
    assert update_docker_stubs["up_called"] == 1
def test_update_skips_services_not_running(fake_dirs, no_docker, update_docker_stubs):
    """Services with no running container are skipped even when images moved."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # install itself calls compose_up
    # Container not up at all: running_container_image_id returns None.
    update_docker_stubs["running"] = {}
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}
    status, body = api._do_update("fileshare")
    assert status == 200
    assert body["updated"] is False
    assert update_docker_stubs["up_called"] == 0
def test_update_returns_502_on_pull_error(fake_dirs, no_docker, update_docker_stubs):
    """A failing pull is surfaced as 502 and compose up never runs."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # install itself calls compose_up
    update_docker_stubs["pull_raises"] = api.dockerops.DockerError("no network")
    status, body = api._do_update("fileshare")
    assert status == 502
    assert "no network" in body["error"]
    assert update_docker_stubs["up_called"] == 0
# --- Furtka self-update endpoints ------------------------------------------
@pytest.fixture
def stub_furtka_updater(monkeypatch):
    """Stub the updater module so api endpoints never hit Forgejo / systemd-run."""
    import subprocess

    from furtka import updater

    state = {"check_called": 0, "apply_called": 0, "status_called": 0}

    class _FakeLock:
        def close(self):
            pass

    class _FakeResult:
        returncode = 0
        stdout = ""
        stderr = ""

    def fake_check():
        state["check_called"] += 1
        return updater.UpdateCheck(
            current="26.0-alpha",
            latest="26.1-alpha",
            update_available=True,
            tarball_url="https://x/t.tar.gz",
            sha256_url="https://x/t.tar.gz.sha256",
        )

    def fake_read_state():
        state["status_called"] += 1
        return {"stage": "done", "version": "26.1-alpha"}

    def fake_subprocess_run(*args, **kwargs):
        state["apply_called"] += 1
        return _FakeResult()

    monkeypatch.setattr(updater, "check_update", fake_check)
    monkeypatch.setattr(updater, "acquire_lock", lambda: _FakeLock())
    monkeypatch.setattr(updater, "read_state", fake_read_state)
    monkeypatch.setattr(subprocess, "run", fake_subprocess_run)
    return state
def test_furtka_update_check_endpoint(stub_furtka_updater):
    """Check endpoint relays current/latest/availability from the updater."""
    status, body = api._do_furtka_check()
    assert status == 200
    expected = {
        "current": "26.0-alpha",
        "latest": "26.1-alpha",
        "update_available": True,
    }
    assert body == expected
    assert stub_furtka_updater["check_called"] == 1
def test_furtka_update_check_reports_updater_errors(monkeypatch):
    """UpdateError from the updater surfaces as a 502."""
    from furtka import updater

    def raising():
        raise updater.UpdateError("no network")

    monkeypatch.setattr(updater, "check_update", raising)
    status, body = api._do_furtka_check()
    assert status == 502
    assert "no network" in body["error"]
def test_furtka_update_apply_endpoint_dispatches(stub_furtka_updater):
    """Apply endpoint dispatches the updater run and answers 202."""
    status, body = api._do_furtka_apply()
    assert status == 202
    assert body["status"] == "dispatched"
    assert stub_furtka_updater["apply_called"] == 1
def test_furtka_update_apply_returns_409_if_locked(monkeypatch):
    """A held update lock turns into a 409 conflict."""
    from furtka import updater

    def raising():
        raise updater.UpdateError("another update is already in progress")

    monkeypatch.setattr(updater, "acquire_lock", raising)
    status, body = api._do_furtka_apply()
    assert status == 409
    assert "in progress" in body["error"]
def test_furtka_update_status_endpoint(stub_furtka_updater):
    """Status endpoint relays the updater's persisted state."""
    status, body = api._do_furtka_status()
    assert status == 200
    assert body == {"stage": "done", "version": "26.1-alpha"}
    assert stub_furtka_updater["status_called"] == 1
def test_http_post_update_route(fake_dirs, no_docker, update_docker_stubs, admin_session):
    """POST /api/apps/<name>/update over HTTP applies pending image changes."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}
    # Consistency fix: reuse the shared _start_server helper rather than
    # duplicating the server/thread boilerplate inline.
    server, port = _start_server()
    try:
        req = _request(
            port,
            "/api/apps/fileshare/update",
            cookie=admin_session,
            method="POST",
            body={},
        )
        with urllib.request.urlopen(req) as r:
            assert r.status == 200
            body = json.loads(r.read())
        assert body["updated"] is True
        assert body["services"][0]["service"] == "samba"
    finally:
        server.shutdown()
        server.server_close()
def test_http_post_install_with_settings(fake_dirs, no_docker, admin_session):
    """POST /api/apps/install with settings writes them into the app's .env."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    # Consistency fix: shared _start_server helper replaces the inline
    # server/thread boilerplate.
    server, port = _start_server()
    try:
        req = _request(
            port,
            "/api/apps/install",
            cookie=admin_session,
            method="POST",
            body={
                "name": "fileshare",
                "settings": {"SMB_USER": "alice", "SMB_PASSWORD": "s3cret"},
            },
        )
        with urllib.request.urlopen(req) as r:
            assert r.status == 200
        assert "SMB_PASSWORD=s3cret" in (apps / "fileshare" / ".env").read_text()
    finally:
        server.shutdown()
        server.server_close()
# --- Catalog endpoints ------------------------------------------------------
def test_catalog_status_reports_absent_catalog(fake_dirs, monkeypatch):
    """No catalog on disk -> status reports current=None and empty state."""
    # fake_dirs doesn't cover FURTKA_CATALOG_STATE; point it at tmp so the
    # production path is never touched.
    state_path = fake_dirs[0].parent / "catalog-state.json"
    monkeypatch.setenv("FURTKA_CATALOG_STATE", str(state_path))
    import importlib

    from furtka import catalog as c

    importlib.reload(c)
    status, body = api._do_catalog_status()
    assert status == 200
    assert body["current"] is None
    assert body["state"] == {}
def test_catalog_check_surfaces_forgejo_error(fake_dirs, monkeypatch):
    """Forgejo API failures bubble up from the catalog check as 502."""
    state_path = fake_dirs[0].parent / "catalog-state.json"
    monkeypatch.setenv("FURTKA_CATALOG_STATE", str(state_path))
    import importlib

    from furtka import _release_common as _rc
    from furtka import catalog as c

    importlib.reload(c)

    def boom(host, repo, path, *, error_cls=RuntimeError):
        raise error_cls("forgejo api down")

    monkeypatch.setattr(_rc, "forgejo_api", boom)
    status, body = api._do_catalog_check()
    assert status == 502
    assert "forgejo api down" in body["error"]
# --- Power endpoints --------------------------------------------------------
def test_power_rejects_unknown_action(fake_dirs):
    """Unrecognized power actions are rejected with 400."""
    status, body = api._do_power({"action": "format-harddrive"})
    assert status == 400
    assert "action" in body["error"]
def test_power_rejects_missing_action(fake_dirs):
    """A payload without an action key is a 400."""
    status, _ = api._do_power({})
    assert status == 400
def test_power_reboot_dispatches_systemd_run(fake_dirs, monkeypatch):
    """Reboot schedules a delayed, non-blocking systemd-run unit."""
    dispatched = []

    class _FakeCompleted:
        returncode = 0
        stdout = ""
        stderr = ""

    def fake_run(cmd, *, check=False, capture_output=False, text=False):
        dispatched.append(cmd)
        return _FakeCompleted()

    monkeypatch.setattr("subprocess.run", fake_run)
    status, body = api._do_power({"action": "reboot"})
    assert status == 202
    assert body == {"action": "reboot", "scheduled_in_seconds": 3}
    # The dispatched command is a delayed systemd-run that eventually invokes
    # `systemctl reboot`. Pinning the key flags catches regressions — e.g.
    # losing --no-block would block the API thread until the unit completes.
    cmd = dispatched[0]
    assert cmd[:1] == ["systemd-run"]
    assert "--on-active=3s" in cmd
    assert "--no-block" in cmd
    assert cmd[-2:] == ["systemctl", "reboot"]
def test_power_poweroff_dispatches_systemctl_poweroff(fake_dirs, monkeypatch):
    """Poweroff ends in a `systemctl poweroff` dispatch."""
    dispatched = []

    class _FakeCompleted:
        returncode = 0

    def fake_run(cmd, **kwargs):
        dispatched.append(cmd)
        return _FakeCompleted()

    monkeypatch.setattr("subprocess.run", fake_run)
    status, body = api._do_power({"action": "poweroff"})
    assert status == 202
    assert body["action"] == "poweroff"
    assert dispatched[0][-2:] == ["systemctl", "poweroff"]
def test_power_surfaces_systemd_run_missing(fake_dirs, monkeypatch):
    """A missing systemd-run binary surfaces as a 502 naming the tool."""

    def boom(*args, **kwargs):
        raise FileNotFoundError(2, "No such file", "systemd-run")

    monkeypatch.setattr("subprocess.run", boom)
    status, body = api._do_power({"action": "reboot"})
    assert status == 502
    assert "systemd-run" in body["error"]