from __future__ import annotations import base64 import hashlib import hmac import json import os import re import sqlite3 import urllib.error import urllib.request from contextlib import contextmanager from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Dict, List, Optional from uuid import uuid4 from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field def load_env_file(path: Path) -> None: if not path.exists(): return for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'")) load_env_file(Path(os.environ.get("ARBEIDSPULS_ENV_FILE", ".env"))) def default_feedback_config_path() -> Path: if "ARBEIDSPULS_FEEDBACK_CONFIG" in os.environ: return Path(os.environ["ARBEIDSPULS_FEEDBACK_CONFIG"]) repo_root_path = Path("backend/feedback_config.json") return repo_root_path if repo_root_path.exists() else Path("feedback_config.json") DATABASE_PATH = Path(os.environ.get("ARBEIDSPULS_SHARE_DB", "backend/share-data.sqlite3")) FEEDBACK_CONFIG_PATH = default_feedback_config_path() EXPIRY_DAYS = int(os.environ.get("ARBEIDSPULS_SHARE_EXPIRY_DAYS", "7")) MAX_PAYLOAD_BYTES = 2 * 1024 * 1024 MAX_CIPHERTEXT_CHARS = 3_000_000 CREATE_RATE_LIMIT_PER_HOUR = 10 READ_RATE_LIMIT_PER_HOUR = 120 CONFIRM_RATE_LIMIT_PER_HOUR = 60 TRUSTED_PROXY_HOSTS = {"127.0.0.1", "::1", "localhost"} ALLOWED_ORIGINS = [ "https://arbeidspuls.rolfsvaag.no", "http://localhost:5173", "http://127.0.0.1:5173", ] app = FastAPI(title="Arbeidspuls secure share API") app.add_middleware( CORSMiddleware, allow_origins=[origin.strip() for origin in os.environ.get("ARBEIDSPULS_CORS_ORIGINS", ",".join(ALLOWED_ORIGINS)).split(",")], allow_credentials=False, allow_methods=["POST", "GET", "OPTIONS"], allow_headers=["Content-Type"], ) rate_bucket: Dict[str, List[datetime]] = {} feedback_rate_bucket: Dict[str, List[datetime]] = {} feedback_config: Dict[str, Any] = {} class ShareCreateRequest(BaseModel): ciphertext: str = Field(min_length=1, max_length=MAX_CIPHERTEXT_CHARS) iv: str = Field(min_length=1, max_length=64) share_schema: str = Field(alias="schema", min_length=1, max_length=80) confirm_token_hash: str = Field(min_length=32, max_length=128) class ConfirmImportRequest(BaseModel): confirm_token: Optional[str] = Field(default=None, max_length=256) class ShareCreateResponse(BaseModel): export_id: str expires_at: str class FeedbackRequest(BaseModel): feedback_type: str = Field(min_length=1, max_length=32) areas: List[str] = Field(min_items=1, max_items=8) message: str = Field(min_length=1, max_length=4500) consent: bool language: str = Field(default="nb-NO", max_length=12) route: str = Field(default="/", max_length=180) viewport_category: str = Field(default="unknown", max_length=24) app_version: str = Field(default="unknown", max_length=32) form_started_at: str = Field(default="", max_length=40) form_completed_at: str = Field(default="", max_length=40) company_website: str = Field(default="", max_length=200) urgent_contact_allowed: bool = False @contextmanager def connect(): DATABASE_PATH.parent.mkdir(parents=True, exist_ok=True) db = sqlite3.connect(DATABASE_PATH) db.row_factory = sqlite3.Row try: yield db db.commit() finally: db.close() def utc_now() -> datetime: return datetime.now(timezone.utc) def iso(value: datetime) -> str: return value.isoformat().replace("+00:00", "Z") def init_db() -> None: with connect() as db: db.execute( """ CREATE TABLE IF NOT EXISTS active_shares ( export_id TEXT PRIMARY KEY, created_at TEXT NOT NULL, expires_at TEXT NOT NULL, ciphertext TEXT NOT NULL, iv TEXT NOT NULL, schema TEXT NOT NULL ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS share_audit ( export_id TEXT PRIMARY KEY, created_at TEXT NOT NULL, deleted_at TEXT NOT NULL, delete_reason TEXT NOT NULL ) """ ) columns = {row["name"] for row in db.execute("PRAGMA table_info(active_shares)").fetchall()} if "confirm_token_hash" not in columns: db.execute("ALTER TABLE active_shares ADD COLUMN confirm_token_hash TEXT") @app.middleware("http") async def security_headers(request: Request, call_next): response = await call_next(request) if request.url.path.startswith("/api/share") or request.url.path.startswith("/api/feedback"): response.headers["Cache-Control"] = "no-store" response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" response.headers["X-Content-Type-Options"] = "nosniff" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=(), payment=()" return response def decoded_size(value: str) -> int: if not re.fullmatch(r"[A-Za-z0-9_-]+", value): raise HTTPException(status_code=400, detail="Invalid base64url payload.") padded = value.replace("-", "+").replace("_", "/") + "=" * (-len(value) % 4) try: return len(base64.b64decode(padded, validate=True)) except Exception as exc: raise HTTPException(status_code=400, detail="Invalid base64url payload.") from exc def cleanup_expired() -> None: now = iso(utc_now()) with connect() as db: rows = db.execute("SELECT export_id, created_at FROM active_shares WHERE expires_at <= ?", (now,)).fetchall() for row in rows: mark_deleted(db, row["export_id"], row["created_at"], "expired") def mark_deleted(db: sqlite3.Connection, export_id: str, created_at: str, reason: str) -> None: db.execute("DELETE FROM active_shares WHERE export_id = ?", (export_id,)) db.execute( """ INSERT OR REPLACE INTO share_audit (export_id, created_at, deleted_at, delete_reason) VALUES (?, ?, ?, ?) """, (export_id, created_at, iso(utc_now()), reason), ) def client_ip(request: Request) -> str: direct_host = request.client.host if request.client else "unknown" if direct_host in TRUSTED_PROXY_HOSTS: real_ip = request.headers.get("x-real-ip", "").strip() if real_ip: return real_ip return direct_host def assert_rate_limit(request: Request, action: str, limit: int) -> None: ip = client_ip(request) bucket_key = f"{action}:{ip}" cutoff = utc_now() - timedelta(hours=1) recent = [created for created in rate_bucket.get(bucket_key, []) if created > cutoff] if len(recent) >= limit: raise HTTPException(status_code=429, detail="Rate limit exceeded.") recent.append(utc_now()) rate_bucket[bucket_key] = recent def token_hash(confirm_token: str) -> str: digest = hashlib.sha256(confirm_token.encode("utf-8")).digest() return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") def load_feedback_config() -> None: global feedback_config if FEEDBACK_CONFIG_PATH.exists(): with FEEDBACK_CONFIG_PATH.open("r", encoding="utf-8") as handle: feedback_config = json.load(handle) else: feedback_config = {} def feedback_enabled() -> bool: return os.environ.get("FEEDBACK_ENABLED", "true").lower() in {"1", "true", "yes", "on"} def feedback_repo_config() -> Dict[str, str]: repo = feedback_config.get("repo", {}) return { "base_url": os.environ.get("GITEA_BASE_URL", repo.get("base_url", "https://git.rolfsvaag.no")).rstrip("/"), "owner": os.environ.get("GITEA_OWNER", repo.get("owner", "Rolfsvaag_Datateknikk")), "repo": os.environ.get("GITEA_REPO", repo.get("repo", "Arbeidspuls")), } def parse_iso(value: str) -> Optional[datetime]: if not value: return None try: return datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None def feedback_rate_key(request: Request) -> str: pepper = os.environ.get("FEEDBACK_RATE_LIMIT_PEPPER", "") if not pepper: pepper = "missing-feedback-rate-limit-pepper" ip = client_ip(request) user_agent = request.headers.get("user-agent", "") digest = hmac.new(pepper.encode("utf-8"), f"{ip}|{user_agent}".encode("utf-8"), hashlib.sha256).hexdigest() return digest def assert_feedback_rate_limit(request: Request) -> None: config = feedback_config.get("rate_limit", {}) key = feedback_rate_key(request) now = utc_now() ten_minute_cutoff = now - timedelta(minutes=10) day_cutoff = now - timedelta(hours=24) recent = [created for created in feedback_rate_bucket.get(key, []) if created > day_cutoff] if len([created for created in recent if created > ten_minute_cutoff]) >= int(config.get("ten_minutes", 3)): raise HTTPException(status_code=429, detail="Rate limit exceeded.") if len(recent) >= int(config.get("twenty_four_hours", 10)): raise HTTPException(status_code=429, detail="Rate limit exceeded.") recent.append(now) feedback_rate_bucket[key] = recent def meaningful_feedback_errors(message: str) -> List[str]: config = feedback_config.get("validation", {}) min_chars = int(config.get("min_chars", 30)) min_words = int(config.get("min_words", 5)) min_letters = int(config.get("min_letters", 15)) max_chars = int(config.get("max_chars", 4000)) text = message.strip() words = [word for word in re.split(r"\s+", text) if word] letters = re.findall(r"[A-Za-zÆØÅæøå]", text) errors = [] if len(text) < min_chars: errors.append("min_chars") if len(text) > max_chars: errors.append("max_chars") if len(words) < min_words: errors.append("min_words") if len(letters) < min_letters: errors.append("min_letters") if re.fullmatch(r"[\d\W_]+", text, re.UNICODE): errors.append("not_meaningful") if re.search(r"(.)\1{11,}", text, re.UNICODE): errors.append("repetition") return errors REDACTION_PATTERNS = [ re.compile(r"[^\s@]+@[^\s@]+\.[^\s@]+"), re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), re.compile(r"\b\d{11}\b"), re.compile(r"\b\d{8,}\b"), re.compile(r"(?:\+?\d[\s().-]*){8,}"), ] def redact_feedback(message: str) -> tuple[str, int, float]: redactions = 0 redacted = message for pattern in REDACTION_PATTERNS: redacted, count = pattern.subn("[removed]", redacted) redactions += count changed_chars = max(0, len(message) - len(redacted.replace("[removed]", ""))) ratio = changed_chars / max(1, len(message)) return redacted, redactions, ratio def validate_feedback_payload(payload: FeedbackRequest) -> tuple[Dict[str, Any], List[int], str]: if not feedback_enabled(): raise HTTPException(status_code=503, detail="Feedback is not enabled.") if not payload.consent: raise HTTPException(status_code=400, detail="Consent is required.") if payload.company_website.strip() or payload.urgent_contact_allowed: return {"fake_success": True}, [], "" completed = parse_iso(payload.form_completed_at) started = parse_iso(payload.form_started_at) min_seconds = int(feedback_config.get("validation", {}).get("min_form_seconds", 5)) if not completed or not started or (completed - started).total_seconds() < min_seconds: raise HTTPException(status_code=400, detail="The form was submitted too quickly.") labels = feedback_config.get("labels", {}) type_config = labels.get("types", {}).get(payload.feedback_type) area_configs = [labels.get("areas", {}).get(area) for area in payload.areas] if not type_config or len(payload.areas) < 1 or any(area is None for area in area_configs): raise HTTPException(status_code=400, detail="Invalid feedback categories.") errors = meaningful_feedback_errors(payload.message) if errors: raise HTTPException(status_code=400, detail="Feedback text does not pass validation.") redacted, redactions, ratio = redact_feedback(payload.message.strip()) validation = feedback_config.get("validation", {}) if redactions > int(validation.get("reject_redactions_over", 3)) or ratio > float(validation.get("reject_redacted_ratio_over", 0.2)): raise HTTPException(status_code=400, detail="Remove identifying or sensitive information before submitting.") label_ids = list(labels.get("always", [])) label_ids.append(int(type_config["id"])) label_ids.extend(int(area["id"]) for area in area_configs if area) return {"type": type_config, "areas": area_configs, "redactions": redactions}, label_ids, redacted def issue_title(payload: FeedbackRequest, redacted_message: str) -> str: type_label = feedback_config.get("labels", {}).get("types", {}).get(payload.feedback_type, {}).get("en", payload.feedback_type) excerpt = re.sub(r"\s+", " ", redacted_message).strip()[:72] return f"{type_label}: {excerpt}" def issue_body(payload: FeedbackRequest, meta: Dict[str, Any], redacted_message: str) -> str: area_labels = ", ".join(f"{area['en']} (`{area['label']}`)" for area in meta["areas"]) route = payload.route.split("?", 1)[0].split("#", 1)[0] or "/" submitted_at = iso(utc_now()) return f"""## In-app feedback This issue was created automatically from the Arbeidspuls in-app feedback form. The user confirmed that the feedback does not contain sensitive or identifiable information, and that it may be processed, stored and displayed publicly as ordinary project feedback. Obvious contact details are automatically removed when detected. ### User feedback {redacted_message} ### Selected categories - Type: {meta["type"]["en"]} (`{meta["type"]["label"]}`) - Areas: {area_labels} ### Non-identifying context - App version: {payload.app_version} - UI language: {payload.language} - Route: {route} - Viewport: {payload.viewport_category} - Submitted at: {submitted_at} _No contact information, IP address, user agent, report data or local data is intentionally included._ """ def create_gitea_issue(title: str, body: str, label_ids: List[int]) -> Dict[str, Any]: token = os.environ.get("GITEA_FEEDBACK_TOKEN") or os.environ.get("GITEA_FEEDBACK_ACCESS_TOKEN") if not token: raise HTTPException(status_code=503, detail="Feedback service is not configured.") repo = feedback_repo_config() url = f"{repo['base_url']}/api/v1/repos/{repo['owner']}/{repo['repo']}/issues" data = json.dumps({"title": title, "body": body, "labels": label_ids}).encode("utf-8") request = urllib.request.Request( url, data=data, headers={ "Authorization": f"token {token}", "Content-Type": "application/json", "Accept": "application/json", }, method="POST", ) try: with urllib.request.urlopen(request, timeout=12) as response: return json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as exc: raise HTTPException(status_code=502, detail="Feedback service could not create issue.") from exc except urllib.error.URLError as exc: raise HTTPException(status_code=502, detail="Feedback service is unavailable.") from exc @app.on_event("startup") def startup() -> None: load_feedback_config() init_db() cleanup_expired() @app.post("/api/share", response_model=ShareCreateResponse) def create_share(payload: ShareCreateRequest, request: Request) -> Dict[str, str]: cleanup_expired() assert_rate_limit(request, "create", CREATE_RATE_LIMIT_PER_HOUR) if decoded_size(payload.ciphertext) > MAX_PAYLOAD_BYTES: raise HTTPException(status_code=413, detail="Payload is too large.") if decoded_size(payload.iv) != 12: raise HTTPException(status_code=400, detail="Invalid IV size.") if not re.fullmatch(r"[A-Za-z0-9_-]+", payload.confirm_token_hash): raise HTTPException(status_code=400, detail="Invalid confirm verifier.") now = utc_now() export_id = str(uuid4()) expires_at = now + timedelta(days=EXPIRY_DAYS) with connect() as db: db.execute( """ INSERT INTO active_shares (export_id, created_at, expires_at, ciphertext, iv, schema, confirm_token_hash) VALUES (?, ?, ?, ?, ?, ?, ?) """, (export_id, iso(now), iso(expires_at), payload.ciphertext, payload.iv, payload.share_schema, payload.confirm_token_hash), ) return {"export_id": export_id, "expires_at": iso(expires_at)} @app.get("/api/share/{export_id}") def get_share(export_id: str, request: Request) -> Dict[str, Any]: cleanup_expired() assert_rate_limit(request, "read", READ_RATE_LIMIT_PER_HOUR) with connect() as db: row = db.execute("SELECT * FROM active_shares WHERE export_id = ?", (export_id,)).fetchone() if row: return { "status": "available", "export_id": row["export_id"], "ciphertext": row["ciphertext"], "iv": row["iv"], "schema": row["schema"], "expires_at": row["expires_at"], } audit = db.execute("SELECT delete_reason FROM share_audit WHERE export_id = ?", (export_id,)).fetchone() if audit and audit["delete_reason"] == "expired": return {"status": "expired", "export_id": export_id} if audit: return {"status": "deleted", "export_id": export_id} raise HTTPException(status_code=404, detail="Share not found.") @app.post("/api/share/{export_id}/confirm-import") def confirm_import(export_id: str, payload: ConfirmImportRequest, request: Request) -> Dict[str, str]: cleanup_expired() assert_rate_limit(request, "confirm", CONFIRM_RATE_LIMIT_PER_HOUR) with connect() as db: row = db.execute("SELECT export_id, created_at, confirm_token_hash FROM active_shares WHERE export_id = ?", (export_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Share is not available.") expected = row["confirm_token_hash"] if not expected or not payload.confirm_token or len(payload.confirm_token) < 16 or token_hash(payload.confirm_token) != expected: raise HTTPException(status_code=403, detail="Invalid confirmation proof.") mark_deleted(db, row["export_id"], row["created_at"], "imported_by_recipient") return {"status": "deleted"} @app.post("/api/feedback") def create_feedback(payload: FeedbackRequest, request: Request) -> Dict[str, Any]: assert_feedback_rate_limit(request) meta, label_ids, redacted_message = validate_feedback_payload(payload) if meta.get("fake_success"): return {"accepted": False} issue = create_gitea_issue(issue_title(payload, redacted_message), issue_body(payload, meta, redacted_message), label_ids) return { "accepted": True, "issue_number": issue.get("number"), "issue_url": issue.get("html_url") }