All checks were successful
Build ISO / build-iso (push) Successful in 17m28s
CI / lint (push) Successful in 27s
CI / test (push) Successful in 59s
CI / validate-json (push) Successful in 23s
CI / markdown-links (push) Successful in 15s
Release / release (push) Successful in 11m38s
Three interlocking issues that made 26.11/26.12 effectively un-upgradable from pre-auth versions without manual pacman + symlink surgery. Caught while SSH-testing the .196 VM which landed on a rollback loop after every Update-now click. 1. auth.py imported werkzeug.security, but the target system runs core as bare system Python — neither flask nor werkzeug are pip-installed. Fresh 26.11+ boxes died on import. Replaced with a 50-line stdlib `furtka/passwd.py` using hashlib.pbkdf2_hmac for new hashes and parsing werkzeug's `scrypt:N:r:p$salt$hex` format for backward-read so existing users.json survives. 2. updater._health_check pinged /api/apps expecting 200. Post- auth, /api/apps returns 401 for unauth requests → HTTPError caught as URLError → retry loop → 30s timeout → rollback. Now any 2xx-4xx counts as "server alive"; only 5xx / connection errors fail. Server responding at all is proof it came back up. 3. _do_install released the fcntl lock between sync pre-validation and the systemd-run dispatch. A second POST could slip in, pass the lock check, return 202, and leave its install-bg child to die silently on the in-child lock. Now the API also reads install-state.json and refuses 409 on non-terminal stages — the state file is the reliable signal, the fcntl lock is defence in depth. Test coverage: - tests/test_passwd.py (new, 6 cases): roundtrip, salt uniqueness, format shape, werkzeug scrypt backward-compat against a real hash captured from the .196 box, malformed + non-string rejection. - tests/test_updater.py: +3 cases for _health_check — 4xx=healthy, 5xx=unhealthy, URLError retry loop. - tests/test_api.py: +2 cases for install 409 on non-terminal state + 202 after terminal. All 267 tests green, ruff check + format clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1088 lines
36 KiB
Python
1088 lines
36 KiB
Python
import json
|
|
import threading
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
import pytest
|
|
|
|
from furtka import api, auth, dockerops
|
|
|
|
# Minimal manifest the API accepts. Used as the default payload by
# _write_bundled and extended via dict(VALID_MANIFEST, ...) by the
# catalog- and settings-related tests below.
VALID_MANIFEST = {
    "name": "fileshare",
    "display_name": "Network Files",
    "version": "0.1.0",
    "description": "SMB share",
    "volumes": ["files"],
    "ports": [445],
    "icon": "icon.svg",
}
|
|
|
|
|
|
@pytest.fixture
def fake_dirs(tmp_path, monkeypatch):
    """Redirect every furtka directory/file env var into tmp_path.

    Returns (apps, bundled) as Paths. Also reloads install_runner (which
    caches env vars at import time) and clears the module-level session
    store so tests stay isolated from one another.
    """
    apps = tmp_path / "apps"
    bundled = tmp_path / "bundled"
    catalog = tmp_path / "catalog"
    users_file = tmp_path / "users.json"
    apps.mkdir()
    bundled.mkdir()
    # NOTE(review): `catalog` is deliberately not created here — only the
    # catalog-specific test creates it (with parents=True); presumably the
    # API tolerates a missing catalog dir. Confirm before relying on it.
    monkeypatch.setenv("FURTKA_APPS_DIR", str(apps))
    monkeypatch.setenv("FURTKA_BUNDLED_APPS_DIR", str(bundled))
    monkeypatch.setenv("FURTKA_CATALOG_DIR", str(catalog))
    monkeypatch.setenv("FURTKA_USERS_FILE", str(users_file))
    # install_runner writes to /var/lib/furtka/install-state.json and
    # /run/furtka/install.lock by default — redirect into tmp_path so
    # test code doesn't need root.
    monkeypatch.setenv("FURTKA_INSTALL_STATE", str(tmp_path / "install-state.json"))
    monkeypatch.setenv("FURTKA_INSTALL_LOCK", str(tmp_path / "install.lock"))
    # install_runner caches env vars at import time, so reload it to
    # pick up the tmp-path env vars this fixture just set.
    import importlib

    from furtka import install_runner

    importlib.reload(install_runner)
    # Scrub any sessions that leaked from a prior test — the SESSIONS
    # store is module-level.
    auth.SESSIONS.clear()
    return apps, bundled
|
|
|
|
|
|
@pytest.fixture
def admin_session(fake_dirs):
    """Create the admin account plus a live session and return the Cookie
    header value a request needs to pass the auth guard."""
    auth.create_admin("daniel", "hunter2-pw")
    sess = auth.SESSIONS.create("daniel")
    return auth.COOKIE_NAME + "=" + sess.token
|
|
|
|
|
|
@pytest.fixture
def no_docker(monkeypatch):
    """Replace the docker-facing helpers with no-ops so the install and
    remove paths can run on machines without a docker daemon."""

    def _ensure_volume(name):
        return True

    def _noop(app_dir, project):
        return None

    monkeypatch.setattr(dockerops, "ensure_volume", _ensure_volume)
    monkeypatch.setattr(dockerops, "compose_up", _noop)
    monkeypatch.setattr(dockerops, "compose_down", _noop)
|
|
|
|
|
|
@pytest.fixture
def no_systemd_run(monkeypatch):
    """Capture (instead of executing) the systemd-run dispatch in _do_install.

    The install endpoint hands the docker-facing phases to a background
    systemd-run unit. For tests it is enough to know the sync pre-phase
    succeeded and that the dispatch was attempted with the right argv,
    so subprocess.run is replaced with a recorder that always reports
    success. The returned list collects every argv for assertions.
    """
    import subprocess

    recorded = []

    def _record(cmd, check=False, capture_output=False, text=False, **kwargs):
        recorded.append(cmd)
        return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")

    monkeypatch.setattr(subprocess, "run", _record)
    return recorded
|
|
|
|
|
|
def _write_bundled(bundled, name, manifest=None, env_example=None):
|
|
app = bundled / name
|
|
app.mkdir()
|
|
(app / "manifest.json").write_text(json.dumps(manifest or VALID_MANIFEST))
|
|
(app / "docker-compose.yaml").write_text("services: {}\n")
|
|
if env_example is not None:
|
|
(app / ".env.example").write_text(env_example)
|
|
return app
|
|
|
|
|
|
def test_list_installed_empty(fake_dirs):
    """No apps installed → empty installed list."""
    assert api._list_installed() == []


def test_list_available_empty(fake_dirs):
    """No bundled/catalog apps → empty available list."""
    assert api._list_available() == []


def test_list_available_shows_uninstalled(fake_dirs):
    """A bundled app that isn't installed shows up as available."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare")
    [entry] = api._list_available()
    assert entry["name"] == "fileshare"
    assert "display_name" in entry
    # Source field lets the UI later distinguish catalog from bundled seed.
    assert entry["source"] == "bundled"
|
|
|
|
|
|
# --- Icon inlining ----------------------------------------------------------
|
|
|
|
_SIMPLE_SVG = (
|
|
'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M0 0h10v10H0z"/></svg>'
|
|
)
|
|
|
|
|
|
def _write_icon(app_dir, contents, name="icon.svg"):
|
|
(app_dir / name).write_text(contents)
|
|
|
|
|
|
def test_read_icon_svg_returns_content(tmp_path):
    """A plain SVG file round-trips unchanged."""
    (tmp_path / "icon.svg").write_text(_SIMPLE_SVG)
    assert api._read_icon_svg(tmp_path, "icon.svg") == _SIMPLE_SVG


def test_read_icon_svg_strips_xml_declaration(tmp_path):
    declaration = '<?xml version="1.0" encoding="UTF-8"?>\n'
    (tmp_path / "icon.svg").write_text(declaration + _SIMPLE_SVG)
    assert api._read_icon_svg(tmp_path, "icon.svg") == _SIMPLE_SVG


def test_read_icon_svg_missing_file_returns_none(tmp_path):
    assert api._read_icon_svg(tmp_path, "ghost.svg") is None


def test_read_icon_svg_no_name_returns_none(tmp_path):
    # Both "no icon declared" spellings behave the same.
    for empty_name in (None, ""):
        assert api._read_icon_svg(tmp_path, empty_name) is None


def test_read_icon_svg_rejects_non_svg(tmp_path):
    (tmp_path / "icon.svg").write_text("<html><body>hi</body></html>")
    assert api._read_icon_svg(tmp_path, "icon.svg") is None


def test_read_icon_svg_rejects_oversized(tmp_path):
    (tmp_path / "icon.svg").write_text("<svg>" + ("x" * (17 * 1024)) + "</svg>")
    assert api._read_icon_svg(tmp_path, "icon.svg") is None


def test_read_icon_svg_rejects_script_tag(tmp_path):
    (tmp_path / "icon.svg").write_text("<svg><script>alert(1)</script></svg>")
    assert api._read_icon_svg(tmp_path, "icon.svg") is None


def test_read_icon_svg_rejects_event_handler(tmp_path):
    (tmp_path / "icon.svg").write_text('<svg onload="alert(1)"><path/></svg>')
    assert api._read_icon_svg(tmp_path, "icon.svg") is None


def test_read_icon_svg_rejects_javascript_url(tmp_path):
    (tmp_path / "icon.svg").write_text('<svg><a href="javascript:alert(1)"/></svg>')
    assert api._read_icon_svg(tmp_path, "icon.svg") is None
|
|
|
|
|
|
def test_list_available_inlines_icon_svg(fake_dirs):
    """Available listing carries the app icon inline as SVG text."""
    _, bundled = fake_dirs
    app_dir = _write_bundled(bundled, "fileshare")
    _write_icon(app_dir, _SIMPLE_SVG)
    [entry] = api._list_available()
    assert entry["icon_svg"] == _SIMPLE_SVG


def test_list_installed_inlines_icon_svg(fake_dirs, no_docker, no_systemd_run):
    """Installed listing carries the app icon inline as SVG text too."""
    apps, bundled = fake_dirs
    app_dir = _write_bundled(bundled, "fileshare", env_example="A=real")
    _write_icon(app_dir, _SIMPLE_SVG)
    api._do_install("fileshare")
    [entry] = api._list_installed()
    assert entry["icon_svg"] == _SIMPLE_SVG
|
|
|
|
|
|
def test_list_available_hides_already_installed(fake_dirs, no_docker, no_systemd_run):
    """Once installed, an app moves from the available to the installed list."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    status, _ = api._do_install("fileshare")
    assert status == 202  # async dispatch
    # The app folder exists on disk already (install_from finished
    # synchronously before the dispatch), which is what _list_available
    # uses for the "installed" check — so fileshare must vanish from the
    # available list...
    assert api._list_available() == []
    # ...and appear in the installed list.
    assert [a["name"] for a in api._list_installed()] == ["fileshare"]
|
|
|
|
|
|
def test_list_available_prefers_catalog_over_bundled(fake_dirs):
    """When an app exists in both catalog and bundled seed, catalog wins."""
    _, bundled = fake_dirs
    catalog_apps = bundled.parent / "catalog" / "apps"
    catalog_apps.mkdir(parents=True)
    _write_bundled(bundled, "fileshare")
    # A fileshare in the catalog as well — manifest version 0.2.0 to tell apart.
    newer_manifest = dict(VALID_MANIFEST, version="0.2.0")
    entry_dir = catalog_apps / "fileshare"
    entry_dir.mkdir()
    (entry_dir / "manifest.json").write_text(json.dumps(newer_manifest))

    [entry] = api._list_available()
    assert entry["source"] == "catalog"
    assert entry["version"] == "0.2.0"
|
|
|
|
|
|
def test_install_endpoint_rejects_placeholder(fake_dirs):
    """Installing with an untouched placeholder value in .env.example → 400."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="SMB_PASSWORD=changeme")
    status, resp = api._do_install("fileshare")
    assert status == 400
    assert "placeholder" in resp["error"]


def test_install_endpoint_rejects_unknown_app(fake_dirs):
    """Installing a name that exists nowhere → 400 with 'not found'."""
    status, resp = api._do_install("does-not-exist")
    assert status == 400
    assert "not found" in resp["error"]
|
|
|
|
|
|
def test_remove_endpoint_unknown(fake_dirs, no_docker):
    """Removing an app that was never installed yields 404."""
    # `_` instead of an unused `body` binding — matches the convention the
    # other not-found tests in this file use (test_get_settings_not_found,
    # test_update_settings_unknown_app).
    status, _ = api._do_remove("ghost")
    assert status == 404


def test_remove_endpoint_happy_path(fake_dirs, no_docker, no_systemd_run):
    """Install then remove: 200, body names the app, app dir is gone."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    assert (apps / "fileshare").exists()
    status, body = api._do_remove("fileshare")
    assert status == 200
    assert body["removed"] == "fileshare"
    assert not (apps / "fileshare").exists()
|
|
|
|
|
|
def _request(port, path, cookie=None, method="GET", body=None):
|
|
headers = {}
|
|
if cookie is not None:
|
|
headers["Cookie"] = cookie
|
|
data = None
|
|
if body is not None:
|
|
headers["Content-Type"] = "application/json"
|
|
data = json.dumps(body).encode()
|
|
return urllib.request.Request(
|
|
f"http://127.0.0.1:{port}{path}",
|
|
data=data,
|
|
headers=headers,
|
|
method=method,
|
|
)
|
|
|
|
|
|
def test_http_get_apps_route(fake_dirs, no_docker, admin_session):
    """Smoke test the actual HTTP server with a real socket, urllib client."""
    # Use the shared _start_server helper instead of duplicating the
    # server/thread boilerplate — consistent with the auth-flow tests.
    server, port = _start_server()
    try:
        with urllib.request.urlopen(_request(port, "/api/apps", cookie=admin_session)) as r:
            assert r.status == 200
            data = json.loads(r.read())
            assert data == []
        with urllib.request.urlopen(_request(port, "/", cookie=admin_session)) as r:
            assert r.status == 200
            assert b"Furtka Apps" in r.read()
        # Unknown route → 404 JSON.
        try:
            urllib.request.urlopen(_request(port, "/api/nope", cookie=admin_session))
            raise AssertionError("expected 404")
        except urllib.error.HTTPError as e:
            assert e.code == 404
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_http_post_install_unknown_app(fake_dirs, admin_session):
    """POST /api/apps/install with an unknown name → 400 + JSON error body."""
    # Use the shared _start_server helper instead of duplicating the
    # server/thread boilerplate — consistent with the auth-flow tests.
    server, port = _start_server()
    try:
        req = _request(
            port,
            "/api/apps/install",
            cookie=admin_session,
            method="POST",
            body={"name": "ghost"},
        )
        try:
            urllib.request.urlopen(req)
            raise AssertionError("expected 400")
        except urllib.error.HTTPError as e:
            assert e.code == 400
            body = json.loads(e.read())
            assert "not found" in body["error"]
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
# --- Auth guard + login flow ------------------------------------------------
|
|
|
|
|
|
def _start_server():
    """Spin up the API server on an ephemeral port in a daemon thread.

    Returns (server, port). Callers are responsible for shutdown() and
    server_close() in a finally block.
    """
    srv = api.HTTPServer(("127.0.0.1", 0), api._Handler)
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    return srv, srv.server_address[1]
|
|
|
|
|
|
def test_unauthenticated_api_returns_401(fake_dirs):
    """API routes without a session cookie are rejected with a JSON 401."""
    # No admin_session fixture → no cookie on the request.
    server, port = _start_server()
    try:
        try:
            urllib.request.urlopen(_request(port, "/api/apps"))
            raise AssertionError("expected 401")
        except urllib.error.HTTPError as err:
            assert err.code == 401
            assert json.loads(err.read())["error"] == "not authenticated"
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_unauthenticated_html_redirects_to_login(fake_dirs):
    """HTML routes without a session send the browser to /login."""
    server, port = _start_server()
    try:
        # Disable redirect following so we can inspect the 302.
        opener = urllib.request.build_opener(_NoRedirectHandler())
        try:
            opener.open(_request(port, "/apps"))
            raise AssertionError("expected 302")
        except urllib.error.HTTPError as err:
            assert err.code == 302
            assert err.headers["Location"] == "/login"
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Opener handler that refuses to follow redirects so tests can inspect
    the raw 3xx response (urllib then surfaces it as an HTTPError)."""

    def redirect_request(self, *args, **kwargs):
        # Returning None aborts the redirect instead of issuing a new request.
        return None
|
|
|
|
|
|
def test_get_login_renders_login_form_when_admin_exists(fake_dirs):
    """With an admin configured, /login renders the plain login form."""
    auth.create_admin("daniel", "hunter2-pw")
    server, port = _start_server()
    try:
        with urllib.request.urlopen(_request(port, "/login")) as r:
            assert r.status == 200
            html = r.read().decode()
        assert "Furtka login" in html
        # No setup confirm-password field rendered in login mode.
        assert 'id="password2"' not in html
        assert "Repeat password" not in html
    finally:
        server.shutdown()
        server.server_close()


def test_get_login_renders_setup_form_when_no_admin(fake_dirs):
    """Without an admin, /login renders the first-run setup form instead."""
    server, port = _start_server()
    try:
        with urllib.request.urlopen(_request(port, "/login")) as r:
            assert r.status == 200
            html = r.read().decode()
        assert "Set admin password" in html
        assert "password2" in html  # setup confirm field rendered
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_get_login_redirects_when_already_authed(fake_dirs, admin_session):
    """An authenticated visit to /login bounces straight to /apps."""
    server, port = _start_server()
    try:
        opener = urllib.request.build_opener(_NoRedirectHandler())
        try:
            opener.open(_request(port, "/login", cookie=admin_session))
            raise AssertionError("expected 302")
        except urllib.error.HTTPError as err:
            assert err.code == 302
            assert err.headers["Location"] == "/apps"
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_post_login_setup_creates_admin(fake_dirs):
    """First-run setup POST creates the admin and sets a hardened cookie."""
    server, port = _start_server()
    try:
        payload = {
            "username": "daniel",
            "password": "a-real-password",
            "password2": "a-real-password",
        }
        req = _request(port, "/login", method="POST", body=payload)
        with urllib.request.urlopen(req) as r:
            assert r.status == 200
            set_cookie = r.headers["Set-Cookie"]
            for fragment in (auth.COOKIE_NAME, "HttpOnly", "SameSite=Strict"):
                assert fragment in set_cookie
        # users.json got written.
        assert auth.load_users()["admin"]["username"] == "daniel"
        # And the password really works.
        assert auth.authenticate("daniel", "a-real-password") is True
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_post_login_setup_rejects_password_mismatch(fake_dirs):
    """Setup POST with differing passwords → 400 and no admin created."""
    server, port = _start_server()
    try:
        payload = {"username": "x", "password": "abcdefgh", "password2": "different"}
        req = _request(port, "/login", method="POST", body=payload)
        try:
            urllib.request.urlopen(req)
            raise AssertionError("expected 400")
        except urllib.error.HTTPError as err:
            assert err.code == 400
            assert "match" in json.loads(err.read())["error"].lower()
        # No admin created.
        assert auth.setup_needed() is True
    finally:
        server.shutdown()
        server.server_close()


def test_post_login_setup_rejects_short_password(fake_dirs):
    """Setup POST with a too-short password → 400."""
    server, port = _start_server()
    try:
        payload = {"username": "x", "password": "short", "password2": "short"}
        req = _request(port, "/login", method="POST", body=payload)
        try:
            urllib.request.urlopen(req)
            raise AssertionError("expected 400")
        except urllib.error.HTTPError as err:
            assert err.code == 400
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_post_login_success_with_correct_credentials(fake_dirs):
    """A valid login POST answers 200 and sets the session cookie."""
    auth.create_admin("daniel", "hunter2-pw")
    server, port = _start_server()
    try:
        req = _request(
            port, "/login", method="POST", body={"username": "daniel", "password": "hunter2-pw"}
        )
        with urllib.request.urlopen(req) as r:
            assert r.status == 200
            assert auth.COOKIE_NAME in r.headers["Set-Cookie"]
    finally:
        server.shutdown()
        server.server_close()


def test_post_login_rejects_wrong_password(fake_dirs):
    """A login POST with a bad password is rejected with 401."""
    auth.create_admin("daniel", "hunter2-pw")
    server, port = _start_server()
    try:
        req = _request(
            port, "/login", method="POST", body={"username": "daniel", "password": "nope"}
        )
        try:
            urllib.request.urlopen(req)
            raise AssertionError("expected 401")
        except urllib.error.HTTPError as err:
            assert err.code == 401
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_post_logout_revokes_session(fake_dirs, admin_session):
    """POST /logout clears the cookie and kills the session server-side."""
    server, port = _start_server()
    try:
        logout = _request(port, "/logout", cookie=admin_session, method="POST", body={})
        with urllib.request.urlopen(logout) as r:
            assert r.status == 200
            assert "Max-Age=0" in r.headers["Set-Cookie"]
        # The old cookie is now dead: the next API call must be rejected.
        try:
            urllib.request.urlopen(_request(port, "/api/apps", cookie=admin_session))
            raise AssertionError("expected 401")
        except urllib.error.HTTPError as err:
            assert err.code == 401
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_post_to_protected_route_without_auth_is_401(fake_dirs):
    """Protected POST endpoints also enforce the auth guard."""
    server, port = _start_server()
    try:
        req = _request(port, "/api/apps/install", method="POST", body={"name": "whatever"})
        try:
            urllib.request.urlopen(req)
            raise AssertionError("expected 401")
        except urllib.error.HTTPError as err:
            assert err.code == 401
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
# --- Settings endpoints ------------------------------------------------------
|
|
|
|
# Manifest variant with a typed settings schema — drives the settings
# endpoints below. SMB_USER carries a default; SMB_PASSWORD is required
# and has none.
SETTINGS_MANIFEST = dict(
    VALID_MANIFEST,
    description_long="Long help text.",
    settings=[
        {
            "name": "SMB_USER",
            "label": "User",
            "type": "text",
            "default": "furtka",
            "required": True,
        },
        {"name": "SMB_PASSWORD", "label": "Pass", "type": "password", "required": True},
    ],
)
|
|
|
|
|
|
def test_get_settings_bundled(fake_dirs):
    """Settings schema for an uninstalled bundled app is fully described."""
    _, bundled = fake_dirs
    _write_bundled(
        bundled, "fileshare", manifest=SETTINGS_MANIFEST, env_example="SMB_USER=furtka\n"
    )
    status, body = api._do_get_settings("fileshare")
    assert status == 200
    assert body["installed"] is False
    assert body["description_long"] == "Long help text."
    by_name = {s["name"]: s for s in body["settings"]}
    assert list(by_name) == ["SMB_USER", "SMB_PASSWORD"]
    # Password values never leak back.
    assert by_name["SMB_PASSWORD"]["value"] == ""
    # Text value comes from .env.example.
    assert by_name["SMB_USER"]["value"] == "furtka"


def test_get_settings_not_found(fake_dirs):
    """Settings lookup for an unknown app → 404."""
    status, _ = api._do_get_settings("ghost")
    assert status == 404
|
|
|
|
|
|
def test_install_with_settings_writes_env_via_api(fake_dirs, no_docker, no_systemd_run):
    """Install with settings persists them into the app's .env file."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    status, resp = api._do_install(
        "fileshare", settings={"SMB_USER": "alice", "SMB_PASSWORD": "s3cret"}
    )
    assert status == 202, resp
    env_text = (apps / "fileshare" / ".env").read_text()
    assert "SMB_USER=alice" in env_text
    assert "SMB_PASSWORD=s3cret" in env_text


def test_install_with_settings_rejects_empty_required_via_api(fake_dirs, no_docker):
    """An empty value for a required setting fails validation with 400."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    status, resp = api._do_install("fileshare", settings={"SMB_USER": "a", "SMB_PASSWORD": ""})
    assert status == 400
    assert "SMB_PASSWORD" in resp["error"]
|
|
|
|
|
|
def test_update_settings_merges(fake_dirs, no_docker, no_systemd_run):
    """Partial settings updates merge with the existing .env contents."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    api._do_install("fileshare", settings={"SMB_USER": "alice", "SMB_PASSWORD": "original"})
    # Edit flow: submit only the changed password.
    status, resp = api._do_update_settings("fileshare", {"SMB_PASSWORD": "newpass"})
    assert status == 200, resp
    env_text = (apps / "fileshare" / ".env").read_text()
    assert "SMB_USER=alice" in env_text
    assert "SMB_PASSWORD=newpass" in env_text


def test_update_settings_unknown_app(fake_dirs):
    """Settings update for an unknown app → 404."""
    status, _ = api._do_update_settings("ghost", {"SMB_USER": "x"})
    assert status == 404
|
|
|
|
|
|
def test_http_get_settings_route(fake_dirs, no_docker, admin_session):
    """GET /api/apps/<name>/settings over a real socket returns the schema."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)
    # Use the shared _start_server helper instead of duplicating the
    # server/thread boilerplate — consistent with the auth-flow tests.
    server, port = _start_server()
    try:
        with urllib.request.urlopen(
            _request(port, "/api/apps/fileshare/settings", cookie=admin_session)
        ) as r:
            assert r.status == 200
            data = json.loads(r.read())
            assert data["name"] == "fileshare"
            assert len(data["settings"]) == 2
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
# --- Update endpoint --------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def update_docker_stubs(monkeypatch):
    """Stub the dockerops helpers _do_update touches. Tests tune the return
    values of running_/local_image_id via `state` to steer the comparison."""
    # Shared mutable state: image-id maps, call counters, and an optional
    # exception the stubbed pull should raise.
    state = {
        "tags": {"samba": "dperson/samba:latest"},
        "running": {"samba": "sha256:OLD"},
        "local": {"samba": "sha256:OLD"},
        "pull_called": 0,
        "up_called": 0,
        "pull_raises": None,
    }

    def _pull(app_dir, project):
        # Counts first so a failing pull is still recorded as attempted.
        state["pull_called"] += 1
        if state["pull_raises"]:
            raise state["pull_raises"]

    def _up(app_dir, project):
        state["up_called"] += 1

    monkeypatch.setattr(api.dockerops, "compose_pull", _pull)
    monkeypatch.setattr(api.dockerops, "compose_up", _up)
    # dict(...) returns a copy so tests mutating state["tags"] later don't
    # alias a value already handed out.
    monkeypatch.setattr(
        api.dockerops, "compose_image_tags", lambda app_dir, project: dict(state["tags"])
    )
    monkeypatch.setattr(
        api.dockerops,
        "running_container_image_id",
        lambda app_dir, project, service: state["running"].get(service),
    )
    # NOTE: this stub ignores the tag argument — every lookup resolves to
    # the "samba" entry. Fine while the test manifests have one service.
    monkeypatch.setattr(api.dockerops, "local_image_id", lambda tag: state["local"].get("samba"))
    return state
|
|
|
|
|
|
def test_update_not_installed(fake_dirs):
    """Updating an app that isn't installed → 404."""
    status, resp = api._do_update("ghost")
    assert status == 404
    assert "not installed" in resp["error"]


def test_update_no_changes(fake_dirs, no_docker, no_systemd_run, update_docker_stubs):
    """Pull ran but image ids match → nothing restarted, empty change list."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    status, resp = api._do_update("fileshare")
    assert (status, resp["updated"], resp["services"]) == (200, False, [])
    assert update_docker_stubs["pull_called"] == 1
    assert update_docker_stubs["up_called"] == 0
|
|
|
|
|
|
def test_update_changes_applied(fake_dirs, no_docker, no_systemd_run, update_docker_stubs):
    """A pull that advances the local image triggers a restart and reports it."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    # Simulate: pull advanced the local image.
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}
    status, resp = api._do_update("fileshare")
    assert status == 200
    assert resp["updated"] is True
    [change] = resp["services"]
    expected = {
        "service": "samba",
        "from": "sha256:OLD",
        "to": "sha256:NEW",
        "tag": "dperson/samba:latest",
    }
    assert change == expected
    assert update_docker_stubs["up_called"] == 1
|
|
|
|
|
|
def test_update_skips_services_not_running(
    fake_dirs, no_docker, no_systemd_run, update_docker_stubs
):
    """Stopped services are left alone even when a newer image exists."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    # Container not up at all: running_container_image_id returns None.
    update_docker_stubs["running"] = {}
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}
    status, resp = api._do_update("fileshare")
    assert (status, resp["updated"]) == (200, False)
    assert update_docker_stubs["up_called"] == 0
|
|
|
|
|
|
def test_update_returns_502_on_pull_error(
    fake_dirs, no_docker, no_systemd_run, update_docker_stubs
):
    """A DockerError during pull surfaces as 502 and nothing is restarted."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    update_docker_stubs["pull_raises"] = api.dockerops.DockerError("no network")
    status, resp = api._do_update("fileshare")
    assert status == 502
    assert "no network" in resp["error"]
    assert update_docker_stubs["up_called"] == 0
|
|
|
|
|
|
# --- Furtka self-update endpoints ------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def stub_furtka_updater(monkeypatch):
    """Stub the updater module so api endpoints don't hit Forgejo / systemd-run."""
    # Call counters the tests assert on.
    state = {"check_called": 0, "apply_called": 0, "status_called": 0}

    from furtka import updater

    # Stand-in for whatever acquire_lock normally returns; only close() is
    # needed by the code paths these tests exercise.
    class _Lock:
        def close(self):
            pass

    def stub_check():
        state["check_called"] += 1
        return updater.UpdateCheck(
            current="26.0-alpha",
            latest="26.1-alpha",
            update_available=True,
            tarball_url="https://x/t.tar.gz",
            sha256_url="https://x/t.tar.gz.sha256",
        )

    def stub_acquire_lock():
        return _Lock()

    def stub_read_state():
        state["status_called"] += 1
        return {"stage": "done", "version": "26.1-alpha"}

    import subprocess

    def stub_subprocess_run(*args, **kwargs):
        state["apply_called"] += 1

        # Duck-typed CompletedProcess substitute: success with empty output.
        class _Result:
            returncode = 0
            stdout = ""
            stderr = ""

        return _Result()

    monkeypatch.setattr(updater, "check_update", stub_check)
    monkeypatch.setattr(updater, "acquire_lock", stub_acquire_lock)
    monkeypatch.setattr(updater, "read_state", stub_read_state)
    # NOTE(review): this patches subprocess.run module-globally, so ANY
    # subprocess call made while the fixture is active increments
    # apply_called — fine for these tests, but worth remembering.
    monkeypatch.setattr(subprocess, "run", stub_subprocess_run)
    return state
|
|
|
|
|
|
def test_furtka_update_check_endpoint(stub_furtka_updater):
    """Check endpoint reports current/latest/availability from the updater."""
    status, body = api._do_furtka_check()
    assert status == 200
    expected = {
        "current": "26.0-alpha",
        "latest": "26.1-alpha",
        "update_available": True,
    }
    assert body == expected
    assert stub_furtka_updater["check_called"] == 1


def test_furtka_update_check_reports_updater_errors(monkeypatch):
    """An UpdateError from the updater maps to a 502 with its message."""
    from furtka import updater

    def explode():
        raise updater.UpdateError("no network")

    monkeypatch.setattr(updater, "check_update", explode)
    status, body = api._do_furtka_check()
    assert status == 502
    assert "no network" in body["error"]
|
|
|
|
|
|
def test_furtka_update_apply_endpoint_dispatches(stub_furtka_updater):
    """Apply endpoint answers 202 and dispatches exactly one background run."""
    status, body = api._do_furtka_apply()
    assert (status, body["status"]) == (202, "dispatched")
    assert stub_furtka_updater["apply_called"] == 1


def test_furtka_update_apply_returns_409_if_locked(monkeypatch):
    """A held update lock maps to 409 with an in-progress error."""
    from furtka import updater

    def refuse():
        raise updater.UpdateError("another update is already in progress")

    monkeypatch.setattr(updater, "acquire_lock", refuse)
    status, body = api._do_furtka_apply()
    assert status == 409
    assert "in progress" in body["error"]
|
|
|
|
|
|
def test_furtka_update_status_endpoint(stub_furtka_updater):
    """Status endpoint relays the updater's persisted state verbatim."""
    status, body = api._do_furtka_status()
    assert (status, body) == (200, {"stage": "done", "version": "26.1-alpha"})
    assert stub_furtka_updater["status_called"] == 1
|
|
|
|
|
|
def test_http_post_update_route(
    fake_dirs, no_docker, no_systemd_run, update_docker_stubs, admin_session
):
    """POST /api/apps/<name>/update over a real socket applies the update."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}
    # Use the shared _start_server helper instead of duplicating the
    # server/thread boilerplate — consistent with the auth-flow tests.
    server, port = _start_server()
    try:
        req = _request(
            port,
            "/api/apps/fileshare/update",
            cookie=admin_session,
            method="POST",
            body={},
        )
        with urllib.request.urlopen(req) as r:
            assert r.status == 200
            body = json.loads(r.read())
            assert body["updated"] is True
            assert body["services"][0]["service"] == "samba"
    finally:
        server.shutdown()
        server.server_close()
|
|
|
|
|
|
def test_http_post_install_with_settings(fake_dirs, no_docker, no_systemd_run, admin_session):
    """POST /api/apps/install with settings: 202, .env written synchronously,
    and exactly one systemd-run dispatch for the background phase."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)

    srv = api.HTTPServer(("127.0.0.1", 0), api._Handler)
    srv_port = srv.server_address[1]
    worker = threading.Thread(target=srv.serve_forever, daemon=True)
    worker.start()
    try:
        req = _request(
            srv_port,
            "/api/apps/install",
            cookie=admin_session,
            method="POST",
            body={
                "name": "fileshare",
                "settings": {"SMB_USER": "alice", "SMB_PASSWORD": "s3cret"},
            },
        )
        with urllib.request.urlopen(req) as resp:
            # Async: 202 Accepted + dispatched background job.
            assert resp.status == 202
            payload = json.loads(resp.read())
        assert payload["status"] == "dispatched"
        assert payload["unit"] == "furtka-install-fileshare"

        # Sync phase wrote the .env before dispatch.
        assert "SMB_PASSWORD=s3cret" in (apps / "fileshare" / ".env").read_text()

        # And systemd-run was called exactly once with the expected cmd.
        assert len(no_systemd_run) == 1
        dispatched = no_systemd_run[0]
        assert dispatched[:4] == [
            "systemd-run",
            "--unit=furtka-install-fileshare",
            "--no-block",
            "--collect",
        ]
        assert dispatched[-3:] == ["app", "install-bg", "fileshare"]
    finally:
        srv.shutdown()
        srv.server_close()
|
|
|
|
|
|
def test_do_install_returns_409_when_locked(fake_dirs, no_docker, no_systemd_run):
    """While the fcntl install lock is held, _do_install fast-fails with 409."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")

    lock_fh = api.install_runner.acquire_lock()
    try:
        code, payload = api._do_install("fileshare")
        assert code == 409
        assert "in progress" in payload["error"]
    finally:
        lock_fh.close()
|
|
|
|
|
|
def test_do_install_returns_409_when_state_reports_running(fake_dirs, no_docker, no_systemd_run):
    """Closes the race window where _do_install had already released
    the fcntl lock (so the systemd-run child could grab it) but a
    second POST tried to start a new install while the first was still
    mid-flight. The state file's non-terminal stage is the reliable
    "someone else is installing" signal."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")

    # Simulate another install mid-flight via the state file alone.
    api.install_runner.write_state("pulling_image", app="jellyfin")

    code, payload = api._do_install("fileshare")

    assert code == 409
    # The error names both the app and the stage so the caller can see
    # exactly what is blocking the new install.
    assert "in progress" in payload["error"]
    assert "jellyfin" in payload["error"]
    assert "pulling_image" in payload["error"]
|
|
|
|
|
|
def test_do_install_goes_through_after_terminal_state(fake_dirs, no_docker, no_systemd_run):
    """After a successful or failed install, the state file stays at
    done/error — a new install must be accepted, not blocked."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")

    # Both terminal stages must unblock a fresh install.
    terminal_states = (
        ("done", {"app": "previous", "version": "1.0.0"}),
        ("error", {"app": "previous", "error": "oops"}),
    )
    for stage, extra in terminal_states:
        api.install_runner.write_state(stage, **extra)
        code, _ = api._do_install("fileshare")
        assert code == 202
|
|
|
|
|
|
def test_do_install_status_returns_state(fake_dirs):
    """The status handler reads back whatever the state file holds."""
    api.install_runner.write_state("pulling_image", app="jellyfin")

    code, payload = api._do_install_status()

    assert code == 200
    assert payload["stage"] == "pulling_image"
    assert payload["app"] == "jellyfin"
|
|
|
|
|
|
# --- Catalog endpoints ------------------------------------------------------
|
|
|
|
|
|
def test_catalog_status_reports_absent_catalog(fake_dirs, monkeypatch):
    """With no /var/lib/furtka/catalog/ on disk, status reports current=None + empty state."""
    # FURTKA_CATALOG_STATE is not touched by fake_dirs — point it at tmp so we
    # don't hit the production path.
    monkeypatch.setenv("FURTKA_CATALOG_STATE", str(fake_dirs[0].parent / "catalog-state.json"))
    import importlib

    from furtka import catalog as catalog_mod

    # Reload so module-level env reads pick up the redirected state path.
    importlib.reload(catalog_mod)

    code, payload = api._do_catalog_status()

    assert code == 200
    assert payload["current"] is None
    assert payload["state"] == {}
|
|
|
|
|
|
def test_catalog_check_surfaces_forgejo_error(fake_dirs, monkeypatch):
    """A failing Forgejo API call comes back to the client as a 502."""
    monkeypatch.setenv("FURTKA_CATALOG_STATE", str(fake_dirs[0].parent / "catalog-state.json"))
    import importlib

    from furtka import _release_common as _rc
    from furtka import catalog as catalog_mod

    # Reload so module-level env reads pick up the redirected state path.
    importlib.reload(catalog_mod)

    def boom(host, repo, path, *, error_cls=RuntimeError):
        raise error_cls("forgejo api down")

    monkeypatch.setattr(_rc, "forgejo_api", boom)

    code, payload = api._do_catalog_check()

    assert code == 502
    assert "forgejo api down" in payload["error"]
|
|
|
|
|
|
# --- Power endpoints --------------------------------------------------------
|
|
|
|
|
|
def test_power_rejects_unknown_action(fake_dirs):
    """Actions outside the whitelist are rejected with 400."""
    code, payload = api._do_power({"action": "format-harddrive"})

    assert code == 400
    assert "action" in payload["error"]
|
|
|
|
|
|
def test_power_rejects_missing_action(fake_dirs):
    """An empty body (no action key) is a 400, not a crash."""
    code, _payload = api._do_power({})

    assert code == 400
|
|
|
|
|
|
def test_power_reboot_dispatches_systemd_run(fake_dirs, monkeypatch):
    """Reboot schedules a delayed, non-blocking systemd-run → systemctl reboot."""
    dispatched = []

    class _Completed:
        returncode = 0
        stdout = ""
        stderr = ""

    def record_run(cmd, *, check=False, capture_output=False, text=False):
        dispatched.append(cmd)
        return _Completed()

    monkeypatch.setattr("subprocess.run", record_run)

    code, payload = api._do_power({"action": "reboot"})

    assert code == 202
    assert payload == {"action": "reboot", "scheduled_in_seconds": 3}
    # The dispatched command is a delayed systemd-run that eventually
    # invokes `systemctl reboot`. Asserting the key flags catches
    # accidental regressions (e.g. losing --no-block would block the API
    # thread until the unit completes).
    cmd = dispatched[0]
    assert cmd[:1] == ["systemd-run"]
    assert "--on-active=3s" in cmd
    assert "--no-block" in cmd
    assert cmd[-2:] == ["systemctl", "reboot"]
|
|
|
|
|
|
def test_power_poweroff_dispatches_systemctl_poweroff(fake_dirs, monkeypatch):
    """Poweroff is accepted (202) and ends in `systemctl poweroff`."""
    seen = []

    class _FakeCompleted:
        returncode = 0

    def record_run(cmd, **kw):
        seen.append(cmd)
        return _FakeCompleted()

    monkeypatch.setattr("subprocess.run", record_run)

    code, payload = api._do_power({"action": "poweroff"})

    assert code == 202
    assert payload["action"] == "poweroff"
    assert seen[0][-2:] == ["systemctl", "poweroff"]
|
|
|
|
|
|
def test_power_surfaces_systemd_run_missing(fake_dirs, monkeypatch):
    """A missing systemd-run binary is surfaced to the client as a 502."""

    def missing_binary(*args, **kwargs):
        raise FileNotFoundError(2, "No such file", "systemd-run")

    monkeypatch.setattr("subprocess.run", missing_binary)

    code, payload = api._do_power({"action": "reboot"})

    assert code == 502
    assert "systemd-run" in payload["error"]
|