import json import threading import urllib.error import urllib.request import pytest from furtka import api, auth, dockerops VALID_MANIFEST = { "name": "fileshare", "display_name": "Network Files", "version": "0.1.0", "description": "SMB share", "volumes": ["files"], "ports": [445], "icon": "icon.svg", } @pytest.fixture def fake_dirs(tmp_path, monkeypatch): apps = tmp_path / "apps" bundled = tmp_path / "bundled" catalog = tmp_path / "catalog" users_file = tmp_path / "users.json" static_www = tmp_path / "www" apps.mkdir() bundled.mkdir() static_www.mkdir() (static_www / "index.html").write_text("landing page") (static_www / "settings").mkdir() (static_www / "settings" / "index.html").write_text("settings page") monkeypatch.setenv("FURTKA_APPS_DIR", str(apps)) monkeypatch.setenv("FURTKA_BUNDLED_APPS_DIR", str(bundled)) monkeypatch.setenv("FURTKA_CATALOG_DIR", str(catalog)) monkeypatch.setenv("FURTKA_USERS_FILE", str(users_file)) monkeypatch.setenv("FURTKA_STATIC_WWW", str(static_www)) # install_runner writes to /var/lib/furtka/install-state.json and # /run/furtka/install.lock by default — redirect into tmp_path so # test code doesn't need root. monkeypatch.setenv("FURTKA_INSTALL_STATE", str(tmp_path / "install-state.json")) monkeypatch.setenv("FURTKA_INSTALL_LOCK", str(tmp_path / "install.lock")) # install_runner caches env vars at import time, so reload it to # pick up the tmp-path env vars this fixture just set. import importlib from furtka import install_runner importlib.reload(install_runner) # Scrub any sessions or lockout counters that leaked from a prior # test — both stores are module-level. auth.SESSIONS.clear() auth.LOCKOUT.clear_all() return apps, bundled @pytest.fixture def admin_session(fake_dirs): """Pre-create an admin account + live session. 
Returns a Cookie header value ready to drop into urllib.request.Request(headers=...).""" auth.create_admin("daniel", "hunter2-pw") session = auth.SESSIONS.create("daniel") return f"{auth.COOKIE_NAME}={session.token}" @pytest.fixture def no_docker(monkeypatch): """Stub docker calls so install/remove can run without a daemon.""" monkeypatch.setattr(dockerops, "ensure_volume", lambda name: True) monkeypatch.setattr(dockerops, "compose_up", lambda app_dir, project: None) monkeypatch.setattr(dockerops, "compose_down", lambda app_dir, project: None) @pytest.fixture def no_systemd_run(monkeypatch): """Stub the systemd-run dispatch in _do_install so tests don't need it. The install endpoint now spawns a background systemd-run unit to do the docker-facing phases. Tests that exercise the install path only care that the sync pre-phase succeeded and the dispatch was attempted with the right args — they shouldn't actually fire up systemd. subprocess.run gets monkeypatched to return a fake success CompletedProcess, and the call args get captured for assertions. 
""" import subprocess calls = [] def fake_run(cmd, check=False, capture_output=False, text=False, **kwargs): calls.append(cmd) return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") monkeypatch.setattr(subprocess, "run", fake_run) return calls def _write_bundled(bundled, name, manifest=None, env_example=None): app = bundled / name app.mkdir() (app / "manifest.json").write_text(json.dumps(manifest or VALID_MANIFEST)) (app / "docker-compose.yaml").write_text("services: {}\n") if env_example is not None: (app / ".env.example").write_text(env_example) return app def test_list_installed_empty(fake_dirs): assert api._list_installed() == [] def test_list_available_empty(fake_dirs): assert api._list_available() == [] def test_list_available_shows_uninstalled(fake_dirs): _, bundled = fake_dirs _write_bundled(bundled, "fileshare") out = api._list_available() assert len(out) == 1 assert out[0]["name"] == "fileshare" assert "display_name" in out[0] # Source field lets the UI later distinguish catalog from bundled seed. assert out[0]["source"] == "bundled" # --- Icon inlining ---------------------------------------------------------- _SIMPLE_SVG = ( '' ) def _write_icon(app_dir, contents, name="icon.svg"): (app_dir / name).write_text(contents) def test_read_icon_svg_returns_content(tmp_path): _write_icon(tmp_path, _SIMPLE_SVG) assert api._read_icon_svg(tmp_path, "icon.svg") == _SIMPLE_SVG def test_read_icon_svg_strips_xml_declaration(tmp_path): _write_icon(tmp_path, '\n' + _SIMPLE_SVG) assert api._read_icon_svg(tmp_path, "icon.svg") == _SIMPLE_SVG def test_read_icon_svg_missing_file_returns_none(tmp_path): assert api._read_icon_svg(tmp_path, "ghost.svg") is None def test_read_icon_svg_no_name_returns_none(tmp_path): assert api._read_icon_svg(tmp_path, None) is None assert api._read_icon_svg(tmp_path, "") is None def test_read_icon_svg_rejects_non_svg(tmp_path): _write_icon(tmp_path, "
hi") assert api._read_icon_svg(tmp_path, "icon.svg") is None def test_read_icon_svg_rejects_oversized(tmp_path): _write_icon(tmp_path, "") assert api._read_icon_svg(tmp_path, "icon.svg") is None def test_read_icon_svg_rejects_script_tag(tmp_path): _write_icon(tmp_path, "") assert api._read_icon_svg(tmp_path, "icon.svg") is None def test_read_icon_svg_rejects_event_handler(tmp_path): _write_icon(tmp_path, '') assert api._read_icon_svg(tmp_path, "icon.svg") is None def test_read_icon_svg_rejects_javascript_url(tmp_path): _write_icon(tmp_path, '') assert api._read_icon_svg(tmp_path, "icon.svg") is None def test_list_available_inlines_icon_svg(fake_dirs): _, bundled = fake_dirs app = _write_bundled(bundled, "fileshare") _write_icon(app, _SIMPLE_SVG) [entry] = api._list_available() assert entry["icon_svg"] == _SIMPLE_SVG def test_list_installed_inlines_icon_svg(fake_dirs, no_docker, no_systemd_run): apps, bundled = fake_dirs app = _write_bundled(bundled, "fileshare", env_example="A=real") _write_icon(app, _SIMPLE_SVG) api._do_install("fileshare") [entry] = api._list_installed() assert entry["icon_svg"] == _SIMPLE_SVG def test_list_available_hides_already_installed(fake_dirs, no_docker, no_systemd_run): apps, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="A=real") status, _ = api._do_install("fileshare") assert status == 202 # async dispatch # Now bundled should NOT include fileshare anymore — the app folder # exists on disk (install_from finished synchronously before the # dispatch), which is what _list_available uses for the "installed" # check. assert api._list_available() == [] # But installed list should. 
installed = api._list_installed() assert len(installed) == 1 and installed[0]["name"] == "fileshare" def test_list_available_prefers_catalog_over_bundled(fake_dirs): _, bundled = fake_dirs catalog_root = bundled.parent / "catalog" / "apps" catalog_root.mkdir(parents=True) _write_bundled(bundled, "fileshare") # A fileshare in the catalog as well — manifest version 0.2.0 to tell apart. catalog_manifest = dict(VALID_MANIFEST, version="0.2.0") cat_app = catalog_root / "fileshare" cat_app.mkdir() (cat_app / "manifest.json").write_text(json.dumps(catalog_manifest)) out = api._list_available() assert len(out) == 1 assert out[0]["source"] == "catalog" assert out[0]["version"] == "0.2.0" def test_install_endpoint_rejects_placeholder(fake_dirs): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="SMB_PASSWORD=changeme") status, body = api._do_install("fileshare") assert status == 400 assert "placeholder" in body["error"] def test_install_endpoint_rejects_unknown_app(fake_dirs): status, body = api._do_install("does-not-exist") assert status == 400 assert "not found" in body["error"] def test_remove_endpoint_unknown(fake_dirs, no_docker): status, body = api._do_remove("ghost") assert status == 404 def test_remove_endpoint_happy_path(fake_dirs, no_docker, no_systemd_run): apps, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="A=real") api._do_install("fileshare") assert (apps / "fileshare").exists() status, body = api._do_remove("fileshare") assert status == 200 assert body["removed"] == "fileshare" assert not (apps / "fileshare").exists() def _request(port, path, cookie=None, method="GET", body=None): headers = {} if cookie is not None: headers["Cookie"] = cookie data = None if body is not None: headers["Content-Type"] = "application/json" data = json.dumps(body).encode() return urllib.request.Request( f"http://127.0.0.1:{port}{path}", data=data, headers=headers, method=method, ) def test_http_get_apps_route(fake_dirs, no_docker, 
admin_session): """Smoke test the actual HTTP server with a real socket, urllib client.""" server = api.HTTPServer(("127.0.0.1", 0), api._Handler) # port 0 → ephemeral port = server.server_address[1] t = threading.Thread(target=server.serve_forever, daemon=True) t.start() try: with urllib.request.urlopen(_request(port, "/api/apps", cookie=admin_session)) as r: assert r.status == 200 data = json.loads(r.read()) assert data == [] with urllib.request.urlopen(_request(port, "/apps", cookie=admin_session)) as r: assert r.status == 200 assert b"Furtka Apps" in r.read() # Unknown route → 404 JSON. try: urllib.request.urlopen(_request(port, "/api/nope", cookie=admin_session)) raise AssertionError("expected 404") except urllib.error.HTTPError as e: assert e.code == 404 finally: server.shutdown() server.server_close() def test_http_post_install_unknown_app(fake_dirs, admin_session): server = api.HTTPServer(("127.0.0.1", 0), api._Handler) port = server.server_address[1] t = threading.Thread(target=server.serve_forever, daemon=True) t.start() try: req = _request( port, "/api/apps/install", cookie=admin_session, method="POST", body={"name": "ghost"}, ) try: urllib.request.urlopen(req) raise AssertionError("expected 400") except urllib.error.HTTPError as e: assert e.code == 400 body = json.loads(e.read()) assert "not found" in body["error"] finally: server.shutdown() server.server_close() # --- Auth guard + login flow ------------------------------------------------ def _start_server(): server = api.HTTPServer(("127.0.0.1", 0), api._Handler) port = server.server_address[1] t = threading.Thread(target=server.serve_forever, daemon=True) t.start() return server, port def test_unauthenticated_api_returns_401(fake_dirs): # No admin_session fixture → no cookie on the request. 
server, port = _start_server() try: try: urllib.request.urlopen(_request(port, "/api/apps")) raise AssertionError("expected 401") except urllib.error.HTTPError as e: assert e.code == 401 body = json.loads(e.read()) assert body["error"] == "not authenticated" finally: server.shutdown() server.server_close() def test_unauthenticated_html_redirects_to_login(fake_dirs): server, port = _start_server() try: # Disable redirect following so we can inspect the 302. opener = urllib.request.build_opener(_NoRedirectHandler()) try: opener.open(_request(port, "/apps")) raise AssertionError("expected 302") except urllib.error.HTTPError as e: assert e.code == 302 assert e.headers["Location"] == "/login" finally: server.shutdown() server.server_close() class _NoRedirectHandler(urllib.request.HTTPRedirectHandler): def redirect_request(self, *args, **kwargs): return None def test_unauth_root_redirects_to_login(fake_dirs): """/ was previously Caddy-direct static HTML, bypassing auth. Now Python serves it and the auth-guard applies — unauth visitor gets bounced to /login just like /apps does.""" server, port = _start_server() try: opener = urllib.request.build_opener(_NoRedirectHandler()) try: opener.open(_request(port, "/")) raise AssertionError("expected 302") except urllib.error.HTTPError as e: assert e.code == 302 assert e.headers["Location"] == "/login" finally: server.shutdown() server.server_close() def test_unauth_settings_redirects_to_login(fake_dirs): server, port = _start_server() try: opener = urllib.request.build_opener(_NoRedirectHandler()) for path in ("/settings", "/settings/"): try: opener.open(_request(port, path)) raise AssertionError(f"expected 302 for {path}") except urllib.error.HTTPError as e: assert e.code == 302 assert e.headers["Location"] == "/login" finally: server.shutdown() server.server_close() def test_authed_root_serves_static_index(fake_dirs, admin_session): server, port = _start_server() try: with urllib.request.urlopen(_request(port, "/", 
cookie=admin_session)) as r: assert r.status == 200 assert r.read() == b"landing page" finally: server.shutdown() server.server_close() def test_authed_settings_serves_static(fake_dirs, admin_session): server, port = _start_server() try: for path in ("/settings", "/settings/"): with urllib.request.urlopen(_request(port, path, cookie=admin_session)) as r: assert r.status == 200 assert r.read() == b"settings page" finally: server.shutdown() server.server_close() def test_authed_root_does_not_serve_apps_html(fake_dirs, admin_session): """Regression guard: the pre-26.14 do_GET had `if self.path in ("/", "/apps", ...)` which served _HTML (the apps page) for / too, since Caddy wasn't proxying / so nobody noticed. Now that Caddy does proxy /, the two paths must serve different content.""" server, port = _start_server() try: with urllib.request.urlopen(_request(port, "/", cookie=admin_session)) as r: root_body = r.read() with urllib.request.urlopen(_request(port, "/apps", cookie=admin_session)) as r: apps_body = r.read() assert root_body != apps_body assert b"Furtka Apps" in apps_body assert b"landing page" in root_body finally: server.shutdown() server.server_close() def test_get_login_renders_login_form_when_admin_exists(fake_dirs): auth.create_admin("daniel", "hunter2-pw") server, port = _start_server() try: with urllib.request.urlopen(_request(port, "/login")) as r: html = r.read().decode() assert r.status == 200 assert "Furtka login" in html # No setup confirm-password field rendered in login mode. 
assert 'id="password2"' not in html assert "Repeat password" not in html finally: server.shutdown() server.server_close() def test_get_login_renders_setup_form_when_no_admin(fake_dirs): server, port = _start_server() try: with urllib.request.urlopen(_request(port, "/login")) as r: html = r.read().decode() assert r.status == 200 assert "Set admin password" in html assert "password2" in html # setup confirm field rendered finally: server.shutdown() server.server_close() def test_get_login_redirects_when_already_authed(fake_dirs, admin_session): server, port = _start_server() try: opener = urllib.request.build_opener(_NoRedirectHandler()) try: opener.open(_request(port, "/login", cookie=admin_session)) raise AssertionError("expected 302") except urllib.error.HTTPError as e: assert e.code == 302 assert e.headers["Location"] == "/apps" finally: server.shutdown() server.server_close() def test_post_login_setup_creates_admin(fake_dirs): server, port = _start_server() try: req = _request( port, "/login", method="POST", body={ "username": "daniel", "password": "a-real-password", "password2": "a-real-password", }, ) with urllib.request.urlopen(req) as r: assert r.status == 200 set_cookie = r.headers["Set-Cookie"] assert auth.COOKIE_NAME in set_cookie assert "HttpOnly" in set_cookie assert "SameSite=Strict" in set_cookie # users.json got written. assert auth.load_users()["admin"]["username"] == "daniel" # And the password really works. assert auth.authenticate("daniel", "a-real-password") is True finally: server.shutdown() server.server_close() def test_post_login_setup_rejects_password_mismatch(fake_dirs): server, port = _start_server() try: req = _request( port, "/login", method="POST", body={"username": "x", "password": "abcdefgh", "password2": "different"}, ) try: urllib.request.urlopen(req) raise AssertionError("expected 400") except urllib.error.HTTPError as e: assert e.code == 400 body = json.loads(e.read()) assert "match" in body["error"].lower() # No admin created. 
assert auth.setup_needed() is True finally: server.shutdown() server.server_close() def test_post_login_setup_rejects_short_password(fake_dirs): server, port = _start_server() try: req = _request( port, "/login", method="POST", body={"username": "x", "password": "short", "password2": "short"}, ) try: urllib.request.urlopen(req) raise AssertionError("expected 400") except urllib.error.HTTPError as e: assert e.code == 400 finally: server.shutdown() server.server_close() def test_post_login_success_with_correct_credentials(fake_dirs): auth.create_admin("daniel", "hunter2-pw") server, port = _start_server() try: req = _request( port, "/login", method="POST", body={"username": "daniel", "password": "hunter2-pw"}, ) with urllib.request.urlopen(req) as r: assert r.status == 200 set_cookie = r.headers["Set-Cookie"] assert auth.COOKIE_NAME in set_cookie finally: server.shutdown() server.server_close() def test_post_login_rejects_wrong_password(fake_dirs): auth.create_admin("daniel", "hunter2-pw") server, port = _start_server() try: req = _request( port, "/login", method="POST", body={"username": "daniel", "password": "nope"}, ) try: urllib.request.urlopen(req) raise AssertionError("expected 401") except urllib.error.HTTPError as e: assert e.code == 401 finally: server.shutdown() server.server_close() def _post_wrong_login(port, username="daniel", password="nope"): req = _request( port, "/login", method="POST", body={"username": username, "password": password}, ) try: urllib.request.urlopen(req) raise AssertionError("expected HTTPError") except urllib.error.HTTPError as e: return e def test_post_login_locks_out_after_repeated_failures(fake_dirs, monkeypatch): auth.create_admin("daniel", "hunter2-pw") # Flatten the 0.5s speed-bump so the test doesn't take 5 seconds. 
monkeypatch.setattr(api.time, "sleep", lambda _s: None) server, port = _start_server() try: for _ in range(auth.LoginAttempts.MAX_FAILURES): err = _post_wrong_login(port) assert err.code == 401 err = _post_wrong_login(port) assert err.code == 429 assert err.headers.get("Retry-After") is not None assert int(err.headers["Retry-After"]) > 0 finally: server.shutdown() server.server_close() def test_post_login_429_masks_correctness(fake_dirs, monkeypatch): """Once locked, the correct password must also get 429 — no oracle.""" auth.create_admin("daniel", "hunter2-pw") monkeypatch.setattr(api.time, "sleep", lambda _s: None) server, port = _start_server() try: for _ in range(auth.LoginAttempts.MAX_FAILURES): _post_wrong_login(port) req = _request( port, "/login", method="POST", body={"username": "daniel", "password": "hunter2-pw"}, ) try: urllib.request.urlopen(req) raise AssertionError("expected 429") except urllib.error.HTTPError as e: assert e.code == 429 finally: server.shutdown() server.server_close() def test_post_login_success_clears_lockout_counter(fake_dirs, monkeypatch): auth.create_admin("daniel", "hunter2-pw") monkeypatch.setattr(api.time, "sleep", lambda _s: None) server, port = _start_server() try: # Get close to the threshold, then log in successfully. for _ in range(auth.LoginAttempts.MAX_FAILURES - 1): _post_wrong_login(port) req = _request( port, "/login", method="POST", body={"username": "daniel", "password": "hunter2-pw"}, ) with urllib.request.urlopen(req) as r: assert r.status == 200 # Counter must have been cleared: another full MAX_FAILURES-1 # fails shouldn't trigger 429. 
for _ in range(auth.LoginAttempts.MAX_FAILURES - 1): err = _post_wrong_login(port) assert err.code == 401 finally: server.shutdown() server.server_close() def test_post_login_setup_not_rate_limited(fake_dirs, monkeypatch): """First-run setup is never auth-ed against a hash, so the lockout must not apply — otherwise a clumsy admin could lock themselves out of a box that has no admin yet.""" monkeypatch.setattr(api.time, "sleep", lambda _s: None) server, port = _start_server() try: # Many mismatched setup submissions (400s) — no 429 should appear. for _ in range(auth.LoginAttempts.MAX_FAILURES + 3): req = _request( port, "/login", method="POST", body={ "username": "daniel", "password": "longenough", "password2": "different", }, ) try: urllib.request.urlopen(req) raise AssertionError("expected 400") except urllib.error.HTTPError as e: assert e.code == 400 # Then a good setup still succeeds. req = _request( port, "/login", method="POST", body={ "username": "daniel", "password": "longenough", "password2": "longenough", }, ) with urllib.request.urlopen(req) as r: assert r.status == 200 finally: server.shutdown() server.server_close() def test_post_logout_revokes_session(fake_dirs, admin_session): server, port = _start_server() try: # Logout returns 200 and clears the cookie. with urllib.request.urlopen( _request(port, "/logout", cookie=admin_session, method="POST", body={}) ) as r: assert r.status == 200 set_cookie = r.headers["Set-Cookie"] assert "Max-Age=0" in set_cookie # Subsequent API call with same cookie → 401 (session revoked). 
try: urllib.request.urlopen(_request(port, "/api/apps", cookie=admin_session)) raise AssertionError("expected 401") except urllib.error.HTTPError as e: assert e.code == 401 finally: server.shutdown() server.server_close() def test_post_to_protected_route_without_auth_is_401(fake_dirs): server, port = _start_server() try: req = _request( port, "/api/apps/install", method="POST", body={"name": "whatever"}, ) try: urllib.request.urlopen(req) raise AssertionError("expected 401") except urllib.error.HTTPError as e: assert e.code == 401 finally: server.shutdown() server.server_close() # --- Settings endpoints ------------------------------------------------------ SETTINGS_MANIFEST = dict( VALID_MANIFEST, description_long="Long help text.", settings=[ { "name": "SMB_USER", "label": "User", "type": "text", "default": "furtka", "required": True, }, {"name": "SMB_PASSWORD", "label": "Pass", "type": "password", "required": True}, ], ) def test_get_settings_bundled(fake_dirs): _, bundled = fake_dirs _write_bundled( bundled, "fileshare", manifest=SETTINGS_MANIFEST, env_example="SMB_USER=furtka\n" ) status, body = api._do_get_settings("fileshare") assert status == 200 assert body["installed"] is False assert body["description_long"] == "Long help text." names = [s["name"] for s in body["settings"]] assert names == ["SMB_USER", "SMB_PASSWORD"] # Password values never leak back. pwd = next(s for s in body["settings"] if s["name"] == "SMB_PASSWORD") assert pwd["value"] == "" # Text value comes from .env.example. 
user = next(s for s in body["settings"] if s["name"] == "SMB_USER") assert user["value"] == "furtka" def test_get_settings_not_found(fake_dirs): status, _ = api._do_get_settings("ghost") assert status == 404 def test_install_with_settings_writes_env_via_api(fake_dirs, no_docker, no_systemd_run): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST) status, body = api._do_install( "fileshare", settings={"SMB_USER": "alice", "SMB_PASSWORD": "s3cret"} ) assert status == 202, body apps, _ = fake_dirs env = (apps / "fileshare" / ".env").read_text() assert "SMB_USER=alice" in env assert "SMB_PASSWORD=s3cret" in env def test_install_with_settings_rejects_empty_required_via_api(fake_dirs, no_docker): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST) status, body = api._do_install("fileshare", settings={"SMB_USER": "a", "SMB_PASSWORD": ""}) assert status == 400 assert "SMB_PASSWORD" in body["error"] def test_update_settings_merges(fake_dirs, no_docker, no_systemd_run): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST) api._do_install("fileshare", settings={"SMB_USER": "alice", "SMB_PASSWORD": "original"}) # Edit flow: submit only the changed password. 
status, body = api._do_update_settings("fileshare", {"SMB_PASSWORD": "newpass"}) assert status == 200, body apps, _ = fake_dirs env = (apps / "fileshare" / ".env").read_text() assert "SMB_USER=alice" in env assert "SMB_PASSWORD=newpass" in env def test_update_settings_unknown_app(fake_dirs): status, _ = api._do_update_settings("ghost", {"SMB_USER": "x"}) assert status == 404 def test_http_get_settings_route(fake_dirs, no_docker, admin_session): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST) server = api.HTTPServer(("127.0.0.1", 0), api._Handler) port = server.server_address[1] t = threading.Thread(target=server.serve_forever, daemon=True) t.start() try: with urllib.request.urlopen( _request(port, "/api/apps/fileshare/settings", cookie=admin_session) ) as r: assert r.status == 200 data = json.loads(r.read()) assert data["name"] == "fileshare" assert len(data["settings"]) == 2 finally: server.shutdown() server.server_close() # --- Update endpoint -------------------------------------------------------- @pytest.fixture def update_docker_stubs(monkeypatch): """Stub the dockerops helpers _do_update touches. 
Tests tune the return values of running_/local_image_id via `state` to steer the comparison.""" state = { "tags": {"samba": "dperson/samba:latest"}, "running": {"samba": "sha256:OLD"}, "local": {"samba": "sha256:OLD"}, "pull_called": 0, "up_called": 0, "pull_raises": None, } def _pull(app_dir, project): state["pull_called"] += 1 if state["pull_raises"]: raise state["pull_raises"] def _up(app_dir, project): state["up_called"] += 1 monkeypatch.setattr(api.dockerops, "compose_pull", _pull) monkeypatch.setattr(api.dockerops, "compose_up", _up) monkeypatch.setattr( api.dockerops, "compose_image_tags", lambda app_dir, project: dict(state["tags"]) ) monkeypatch.setattr( api.dockerops, "running_container_image_id", lambda app_dir, project, service: state["running"].get(service), ) monkeypatch.setattr(api.dockerops, "local_image_id", lambda tag: state["local"].get("samba")) return state def test_update_not_installed(fake_dirs): status, body = api._do_update("ghost") assert status == 404 assert "not installed" in body["error"] def test_update_no_changes(fake_dirs, no_docker, no_systemd_run, update_docker_stubs): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="A=real") api._do_install("fileshare") update_docker_stubs["up_called"] = 0 # reset counter after install status, body = api._do_update("fileshare") assert status == 200 assert body["updated"] is False assert body["services"] == [] assert update_docker_stubs["pull_called"] == 1 assert update_docker_stubs["up_called"] == 0 def test_update_changes_applied(fake_dirs, no_docker, no_systemd_run, update_docker_stubs): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="A=real") api._do_install("fileshare") update_docker_stubs["up_called"] = 0 # reset counter after install # Simulate: pull advanced the local image. 
update_docker_stubs["local"] = {"samba": "sha256:NEW"} status, body = api._do_update("fileshare") assert status == 200 assert body["updated"] is True [change] = body["services"] assert change == { "service": "samba", "from": "sha256:OLD", "to": "sha256:NEW", "tag": "dperson/samba:latest", } assert update_docker_stubs["up_called"] == 1 def test_update_skips_services_not_running( fake_dirs, no_docker, no_systemd_run, update_docker_stubs ): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="A=real") api._do_install("fileshare") update_docker_stubs["up_called"] = 0 # reset counter after install # Container not up at all: running_container_image_id returns None. update_docker_stubs["running"] = {} update_docker_stubs["local"] = {"samba": "sha256:NEW"} status, body = api._do_update("fileshare") assert status == 200 assert body["updated"] is False assert update_docker_stubs["up_called"] == 0 def test_update_returns_502_on_pull_error( fake_dirs, no_docker, no_systemd_run, update_docker_stubs ): _, bundled = fake_dirs _write_bundled(bundled, "fileshare", env_example="A=real") api._do_install("fileshare") update_docker_stubs["up_called"] = 0 # reset counter after install update_docker_stubs["pull_raises"] = api.dockerops.DockerError("no network") status, body = api._do_update("fileshare") assert status == 502 assert "no network" in body["error"] assert update_docker_stubs["up_called"] == 0 # --- Furtka self-update endpoints ------------------------------------------ @pytest.fixture def stub_furtka_updater(monkeypatch): """Stub the updater module so api endpoints don't hit Forgejo / systemd-run.""" state = {"check_called": 0, "apply_called": 0, "status_called": 0} from furtka import updater class _Lock: def close(self): pass def stub_check(): state["check_called"] += 1 return updater.UpdateCheck( current="26.0-alpha", latest="26.1-alpha", update_available=True, tarball_url="https://x/t.tar.gz", sha256_url="https://x/t.tar.gz.sha256", ) def 
def test_furtka_update_check_endpoint(stub_furtka_updater):
    """_do_furtka_check returns 200 with the stubbed version info and checks exactly once."""
    status, body = api._do_furtka_check()
    assert status == 200
    assert body == {
        "current": "26.0-alpha",
        "latest": "26.1-alpha",
        "update_available": True,
    }
    assert stub_furtka_updater["check_called"] == 1


def test_furtka_update_check_reports_updater_errors(monkeypatch):
    """An UpdateError raised by check_update is reported as 502 with the message in the body."""
    from furtka import updater

    def raising():
        raise updater.UpdateError("no network")

    monkeypatch.setattr(updater, "check_update", raising)
    status, body = api._do_furtka_check()
    assert status == 502
    assert "no network" in body["error"]


def test_furtka_update_apply_endpoint_dispatches(stub_furtka_updater):
    """_do_furtka_apply returns 202 "dispatched" and invokes the stubbed subprocess.run once."""
    status, body = api._do_furtka_apply()
    assert status == 202
    assert body["status"] == "dispatched"
    assert stub_furtka_updater["apply_called"] == 1


def test_furtka_update_apply_returns_409_if_locked(monkeypatch):
    """An UpdateError raised by acquire_lock is reported as 409 with the message in the body."""
    from furtka import updater

    def raising():
        raise updater.UpdateError("another update is already in progress")

    monkeypatch.setattr(updater, "acquire_lock", raising)
    status, body = api._do_furtka_apply()
    assert status == 409
    assert "in progress" in body["error"]


def test_furtka_update_status_endpoint(stub_furtka_updater):
    """_do_furtka_status returns 200 with the state dict from the stubbed read_state."""
    status, body = api._do_furtka_status()
    assert status == 200
    assert body == {"stage": "done", "version": "26.1-alpha"}
    assert stub_furtka_updater["status_called"] == 1


def test_http_post_update_route(
    fake_dirs, no_docker, no_systemd_run, update_docker_stubs, admin_session
):
    """POST /api/apps/<name>/update over a real HTTP server returns 200 and the updated service."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api._do_install("fileshare")
    update_docker_stubs["up_called"] = 0  # reset counter after install
    update_docker_stubs["local"] = {"samba": "sha256:NEW"}

    # Port 0 lets the OS pick a free port; read the real one back from the server.
    server = api.HTTPServer(("127.0.0.1", 0), api._Handler)
    port = server.server_address[1]
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()
    try:
        req = _request(
            port,
            "/api/apps/fileshare/update",
            cookie=admin_session,
            method="POST",
            body={},
        )
        with urllib.request.urlopen(req) as r:
            assert r.status == 200
            body = json.loads(r.read())
        assert body["updated"] is True
        assert body["services"][0]["service"] == "samba"
    finally:
        server.shutdown()
        server.server_close()
        # Bounded join so the serving thread does not outlive the test.
        server_thread.join(timeout=5)


def test_http_post_install_with_settings(fake_dirs, no_docker, no_systemd_run, admin_session):
    """POST /api/apps/install with settings returns 202, writes .env, and dispatches once."""
    apps, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", manifest=SETTINGS_MANIFEST)

    # Port 0 lets the OS pick a free port; read the real one back from the server.
    server = api.HTTPServer(("127.0.0.1", 0), api._Handler)
    port = server.server_address[1]
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()
    try:
        req = _request(
            port,
            "/api/apps/install",
            cookie=admin_session,
            method="POST",
            body={
                "name": "fileshare",
                "settings": {"SMB_USER": "alice", "SMB_PASSWORD": "s3cret"},
            },
        )
        with urllib.request.urlopen(req) as r:
            # Async: 202 Accepted + dispatched background job.
            assert r.status == 202
            body = json.loads(r.read())
        assert body["status"] == "dispatched"
        assert body["unit"] == "furtka-install-fileshare"
        # Sync phase wrote the .env before dispatch.
        assert "SMB_PASSWORD=s3cret" in (apps / "fileshare" / ".env").read_text()
        # And systemd-run was called exactly once with the expected cmd.
        assert len(no_systemd_run) == 1
        assert no_systemd_run[0][:4] == [
            "systemd-run",
            "--unit=furtka-install-fileshare",
            "--no-block",
            "--collect",
        ]
        assert no_systemd_run[0][-3:] == ["app", "install-bg", "fileshare"]
    finally:
        server.shutdown()
        server.server_close()
        # Bounded join so the serving thread does not outlive the test.
        server_thread.join(timeout=5)


def test_do_install_returns_409_when_locked(fake_dirs, no_docker, no_systemd_run):
    """_do_install returns 409 while another holder has the install lock."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    # Hold the install lock so _do_install fast-fails.
    fh = api.install_runner.acquire_lock()
    try:
        status, body = api._do_install("fileshare")
        assert status == 409
        assert "in progress" in body["error"]
    finally:
        fh.close()


def test_do_install_returns_409_when_state_reports_running(fake_dirs, no_docker, no_systemd_run):
    """Closes the race window where _do_install had already released the
    fcntl lock (so the systemd-run child could grab it) but a second POST
    tried to start a new install while the first was still mid-flight.
    The state file's non-terminal stage is the reliable "someone else is
    installing" signal."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api.install_runner.write_state("pulling_image", app="jellyfin")
    status, body = api._do_install("fileshare")
    assert status == 409
    assert "in progress" in body["error"]
    assert "jellyfin" in body["error"]
    assert "pulling_image" in body["error"]


def test_do_install_goes_through_after_terminal_state(fake_dirs, no_docker, no_systemd_run):
    """After a successful or failed install, the state file stays at
    done/error — a new install must be accepted, not blocked."""
    _, bundled = fake_dirs
    _write_bundled(bundled, "fileshare", env_example="A=real")
    api.install_runner.write_state("done", app="previous", version="1.0.0")
    status, _ = api._do_install("fileshare")
    assert status == 202
    api.install_runner.write_state("error", app="previous", error="oops")
    status, _ = api._do_install("fileshare")
    assert status == 202


def test_do_install_status_returns_state(fake_dirs):
    """_do_install_status returns 200 with the stage and app last written to the state file."""
    # Write state directly, then GET it via the status handler.
    api.install_runner.write_state("pulling_image", app="jellyfin")
    status, body = api._do_install_status()
    assert status == 200
    assert body["stage"] == "pulling_image"
    assert body["app"] == "jellyfin"


# --- Catalog endpoints ------------------------------------------------------


def test_catalog_status_reports_absent_catalog(fake_dirs, monkeypatch):
    """With no /var/lib/furtka/catalog/ on disk, status reports current=None + empty state."""
    # FURTKA_CATALOG_STATE is not touched by fake_dirs — point it at tmp so we
    # don't hit the production path.
    monkeypatch.setenv("FURTKA_CATALOG_STATE", str(fake_dirs[0].parent / "catalog-state.json"))
    import importlib

    from furtka import catalog as c

    importlib.reload(c)
    status, body = api._do_catalog_status()
    assert status == 200
    assert body["current"] is None
    assert body["state"] == {}


def test_catalog_check_surfaces_forgejo_error(fake_dirs, monkeypatch):
    """A failing forgejo_api call is reported by _do_catalog_check as 502 with the message."""
    monkeypatch.setenv("FURTKA_CATALOG_STATE", str(fake_dirs[0].parent / "catalog-state.json"))
    import importlib

    from furtka import _release_common as _rc
    from furtka import catalog as c

    importlib.reload(c)

    def boom(host, repo, path, *, error_cls=RuntimeError):
        # Raise whichever exception class the caller asked for.
        raise error_cls("forgejo api down")

    monkeypatch.setattr(_rc, "forgejo_api", boom)
    status, body = api._do_catalog_check()
    assert status == 502
    assert "forgejo api down" in body["error"]


# --- Power endpoints --------------------------------------------------------


def test_power_rejects_unknown_action(fake_dirs):
    """An unrecognised action is rejected with 400 and an error mentioning "action"."""
    status, body = api._do_power({"action": "format-harddrive"})
    assert status == 400
    assert "action" in body["error"]


def test_power_rejects_missing_action(fake_dirs):
    """A payload without an action key is rejected with 400."""
    status, _ = api._do_power({})
    assert status == 400


def test_power_reboot_dispatches_systemd_run(fake_dirs, monkeypatch):
    """A reboot request returns 202 and dispatches a delayed, non-blocking systemd-run."""
    seen = []

    class _FakeCompleted:
        returncode = 0
        stdout = ""
        stderr = ""

    def fake_run(cmd, *, check=False, capture_output=False, text=False):
        seen.append(cmd)
        return _FakeCompleted()

    monkeypatch.setattr("subprocess.run", fake_run)
    status, body = api._do_power({"action": "reboot"})
    assert status == 202
    assert body == {"action": "reboot", "scheduled_in_seconds": 3}
    # The dispatched command is a delayed systemd-run that eventually
    # invokes `systemctl reboot`. Asserting the key flags catches
    # accidental regressions (e.g. losing --no-block would block the API
    # thread until the unit completes).
    assert seen[0][:1] == ["systemd-run"]
    assert "--on-active=3s" in seen[0]
    assert "--no-block" in seen[0]
    assert seen[0][-2:] == ["systemctl", "reboot"]


def test_power_poweroff_dispatches_systemctl_poweroff(fake_dirs, monkeypatch):
    """A poweroff request returns 202 and the dispatched command ends in `systemctl poweroff`."""
    seen = []

    class _FakeCompleted:
        returncode = 0

    def fake_run(cmd, **kwargs):
        # Record the command and report success, accepting any keyword arguments.
        seen.append(cmd)
        return _FakeCompleted()

    monkeypatch.setattr("subprocess.run", fake_run)
    status, body = api._do_power({"action": "poweroff"})
    assert status == 202
    assert body["action"] == "poweroff"
    assert seen[0][-2:] == ["systemctl", "poweroff"]


def test_power_surfaces_systemd_run_missing(fake_dirs, monkeypatch):
    """A FileNotFoundError from subprocess.run is reported as 502 naming systemd-run."""

    def boom(*a, **kw):
        raise FileNotFoundError(2, "No such file", "systemd-run")

    monkeypatch.setattr("subprocess.run", boom)
    status, body = api._do_power({"action": "reboot"})
    assert status == 502
    assert "systemd-run" in body["error"]