# Arbeidspuls/backend/app.py
# 2026-05-30 13:42:28 +02:00
#
# 497 lines
# 19 KiB
# Python

from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import re
import sqlite3
import urllib.error
import urllib.request
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import uuid4
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are ignored.
    Whitespace around the key and value is removed, and leading/trailing quote
    characters are stripped from the value. Variables that are already present
    in the environment are never overwritten.

    Args:
        path: Location of the env file. A missing path, or a path that is not
            a regular file (for example a directory), is silently ignored.
    """
    # ``is_file`` instead of ``exists``: a directory at this path must not make
    # ``read_text`` raise while the module is being imported.
    if not path.is_file():
        return
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if not key:
            # A line such as "=value" has no variable name; an empty name
            # cannot be stored in the process environment.
            continue
        os.environ.setdefault(key, value.strip().strip('"').strip("'"))


# Populate the environment before any module-level setting below reads it.
load_env_file(Path(os.environ.get("ARBEIDSPULS_ENV_FILE", ".env")))
def default_feedback_config_path() -> Path:
    """Pick the path of the feedback configuration file.

    Precedence:
        1. The ``ARBEIDSPULS_FEEDBACK_CONFIG`` environment variable, when set.
        2. ``backend/feedback_config.json`` if that file exists relative to
           the current working directory.
        3. ``feedback_config.json`` in the current working directory.

    Returns:
        The selected path; it is not guaranteed to exist.
    """
    override = os.environ.get("ARBEIDSPULS_FEEDBACK_CONFIG")
    if override is not None:
        return Path(override)
    in_repo = Path("backend/feedback_config.json")
    if in_repo.exists():
        return in_repo
    return Path("feedback_config.json")
# SQLite file holding shares and the deletion audit trail (see connect()).
DATABASE_PATH = Path(os.environ.get("ARBEIDSPULS_SHARE_DB", "backend/share-data.sqlite3"))
# Feedback label/validation configuration, read by load_feedback_config().
FEEDBACK_CONFIG_PATH = default_feedback_config_path()
# Lifetime of a share in days; create_share() adds this to the creation time.
EXPIRY_DAYS = int(os.environ.get("ARBEIDSPULS_SHARE_EXPIRY_DAYS", "7"))
# Upper bound on the decoded ciphertext size accepted by create_share().
MAX_PAYLOAD_BYTES = 2 * 1024 * 1024
# Upper bound on the encoded ciphertext length (ShareCreateRequest.ciphertext).
MAX_CIPHERTEXT_CHARS = 3_000_000
# Per-client hourly limits passed to assert_rate_limit() by the share endpoints.
CREATE_RATE_LIMIT_PER_HOUR = 10
READ_RATE_LIMIT_PER_HOUR = 120
CONFIRM_RATE_LIMIT_PER_HOUR = 60
# Direct peers whose "x-real-ip" header client_ip() is willing to trust.
TRUSTED_PROXY_HOSTS = {"127.0.0.1", "::1", "localhost"}
# Default CORS origins, used when ARBEIDSPULS_CORS_ORIGINS is not set.
ALLOWED_ORIGINS = [
"https://arbeidspuls.rolfsvaag.no",
"http://localhost:5173",
"http://127.0.0.1:5173",
]
app = FastAPI(title="Arbeidspuls secure share API")
# CORS origins come from ARBEIDSPULS_CORS_ORIGINS (comma separated) and fall
# back to ALLOWED_ORIGINS. Blank entries, produced by an empty value or a
# trailing comma, are dropped so that "" is never registered as an origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in os.environ.get("ARBEIDSPULS_CORS_ORIGINS", ",".join(ALLOWED_ORIGINS)).split(",")
        if origin.strip()
    ],
    allow_credentials=False,
    allow_methods=["POST", "GET", "OPTIONS"],
    allow_headers=["Content-Type"],
)
# Request timestamps per "<action>:<ip>" key, maintained by assert_rate_limit().
# Plain module-level dict: contents live only in this process's memory.
rate_bucket: Dict[str, List[datetime]] = {}
# Request timestamps per HMAC key (see feedback_rate_key()), maintained by
# assert_feedback_rate_limit().
feedback_rate_bucket: Dict[str, List[datetime]] = {}
# Parsed feedback configuration; replaced wholesale by load_feedback_config().
feedback_config: Dict[str, Any] = {}
class ShareCreateRequest(BaseModel):
    """Request body for ``POST /api/share``.

    The handler checks that ``ciphertext`` and ``iv`` are base64url text and
    that ``confirm_token_hash`` uses the base64url alphabet.
    """

    # Encrypted payload; its decoded size is limited by create_share().
    ciphertext: str = Field(min_length=1, max_length=MAX_CIPHERTEXT_CHARS)
    # Initialisation vector; create_share() requires it to decode to 12 bytes.
    iv: str = Field(min_length=1, max_length=64)
    # Sent on the wire as "schema"; stored alongside the ciphertext.
    share_schema: str = Field(alias="schema", min_length=1, max_length=80)
    # Verifier compared against token_hash(confirm_token) in confirm_import().
    confirm_token_hash: str = Field(min_length=32, max_length=128)
class ConfirmImportRequest(BaseModel):
    """Request body for ``POST /api/share/{export_id}/confirm-import``."""

    # Secret whose hash must match the verifier stored with the share; the
    # handler rejects a missing token or one shorter than 16 characters.
    confirm_token: Optional[str] = Field(default=None, max_length=256)
class ShareCreateResponse(BaseModel):
    """Response body of ``POST /api/share``."""

    # Identifier generated by the server for the new share.
    export_id: str
    # Expiry timestamp as formatted by iso().
    expires_at: str
class FeedbackRequest(BaseModel):
    """Request body for ``POST /api/feedback``.

    Beyond the length limits declared here, the payload is checked further by
    ``validate_feedback_payload``.
    """

    # Keys looked up in the "labels" section of the feedback configuration.
    feedback_type: str = Field(min_length=1, max_length=32)
    areas: List[str] = Field(min_items=1, max_items=8)
    # Free-text feedback; redacted before it is forwarded.
    message: str = Field(min_length=1, max_length=4500)
    # Must be true, otherwise the request is rejected.
    consent: bool
    # Context values that are copied into the generated issue body.
    language: str = Field(default="nb-NO", max_length=12)
    route: str = Field(default="/", max_length=180)
    viewport_category: str = Field(default="unknown", max_length=24)
    app_version: str = Field(default="unknown", max_length=32)
    # ISO timestamps used to reject forms that were filled in too quickly.
    form_started_at: str = Field(default="", max_length=40)
    form_completed_at: str = Field(default="", max_length=40)
    # Trap fields: when either is filled in, validation short-circuits and no
    # issue is created.
    company_website: str = Field(default="", max_length=200)
    urgent_contact_allowed: bool = False
@contextmanager
def connect():
    """Yield a SQLite connection to ``DATABASE_PATH`` with row access by name.

    The parent directory of the database file is created when missing. The
    transaction is committed only if the managed block finishes without
    raising; the connection is closed in every case.
    """
    DATABASE_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DATABASE_PATH)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    except BaseException:
        # Leave without committing; the ``finally`` clause still closes.
        raise
    else:
        connection.commit()
    finally:
        connection.close()
def utc_now() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)
def iso(value: datetime) -> str:
    """Format ``value`` as an ISO 8601 string, writing a UTC offset as ``Z``.

    Microseconds are always written. ``datetime.isoformat()`` omits the
    fractional part when it is zero, which would make timestamps vary in
    width; these strings are stored in SQLite and compared as text, so they
    must sort in chronological order.

    Args:
        value: The moment to format; callers in this module pass UTC values.

    Returns:
        For example ``"2026-05-30T13:42:28.000000Z"``.
    """
    return value.isoformat(timespec="microseconds").replace("+00:00", "Z")
# Create the share tables when they do not exist yet and apply the one
# in-place column migration.
# NOTE(review): indentation is missing in this copy of the file; every
# statement below uses `db`, so all of them are assumed to belong to the
# `with connect() as db:` block.
def init_db() -> None:
with connect() as db:
# Shares that can still be fetched: the encrypted payload plus its metadata.
db.execute(
"""
CREATE TABLE IF NOT EXISTS active_shares (
export_id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
ciphertext TEXT NOT NULL,
iv TEXT NOT NULL,
schema TEXT NOT NULL
)
"""
)
# Record of removed shares (written by mark_deleted); it stores only the id,
# timestamps and the reason, not the payload.
db.execute(
"""
CREATE TABLE IF NOT EXISTS share_audit (
export_id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
deleted_at TEXT NOT NULL,
delete_reason TEXT NOT NULL
)
"""
)
# Migration: the CREATE TABLE above has no confirm_token_hash column, so add it
# when the existing table lacks it. The added column is nullable.
columns = {row["name"] for row in db.execute("PRAGMA table_info(active_shares)").fetchall()}
if "confirm_token_hash" not in columns:
db.execute("ALTER TABLE active_shares ADD COLUMN confirm_token_hash TEXT")
# HTTP middleware that adds caching and hardening headers to responses.
# NOTE(review): indentation is missing in this copy of the file, so it cannot
# be determined here which of the header assignments below are guarded by the
# path check and which apply to every response. Presumably the three caching
# headers are limited to the /api/share and /api/feedback paths and the
# remaining headers are unconditional - confirm against the original file.
@app.middleware("http")
async def security_headers(request: Request, call_next):
# Run the rest of the stack first; headers are added to its response.
response = await call_next(request)
if request.url.path.startswith("/api/share") or request.url.path.startswith("/api/feedback"):
# Tell clients and intermediaries not to store the response.
response.headers["Cache-Control"] = "no-store"
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
# Browser hardening headers.
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=(), payment=()"
return response
def decoded_size(value: str) -> int:
    """Return how many bytes an unpadded base64url string decodes to.

    Args:
        value: Text that may only contain the base64url alphabet
            (``A-Z``, ``a-z``, ``0-9``, ``-`` and ``_``), without padding.

    Returns:
        The length in bytes of the decoded data.

    Raises:
        HTTPException: 400 when ``value`` is empty, contains other characters
            or cannot be decoded.
    """
    if not re.fullmatch(r"[A-Za-z0-9_-]+", value):
        raise HTTPException(status_code=400, detail="Invalid base64url payload.")
    # Map to the standard alphabet and restore the stripped "=" padding.
    padded = value.replace("-", "+").replace("_", "/") + "=" * (-len(value) % 4)
    try:
        return len(base64.b64decode(padded, validate=True))
    except ValueError as exc:
        # binascii.Error, raised for malformed input, is a ValueError subclass;
        # anything else is a programming error and should not be masked.
        raise HTTPException(status_code=400, detail="Invalid base64url payload.") from exc
def cleanup_expired() -> None:
    """Remove every share whose expiry time has passed.

    Each expired share is handed to ``mark_deleted`` with the reason
    ``"expired"``. Expiry is decided by comparing the stored ``expires_at``
    text with the current time as formatted by ``iso``.
    """
    cutoff = iso(utc_now())
    query = "SELECT export_id, created_at FROM active_shares WHERE expires_at <= ?"
    with connect() as db:
        expired_rows = db.execute(query, (cutoff,)).fetchall()
        for expired in expired_rows:
            mark_deleted(db, expired["export_id"], expired["created_at"], "expired")
def mark_deleted(db: sqlite3.Connection, export_id: str, created_at: str, reason: str) -> None:
    """Delete a share and record the deletion in ``share_audit``.

    Args:
        db: Open connection; committing is left to the caller.
        export_id: Identifier of the share to remove.
        created_at: Creation timestamp copied into the audit row.
        reason: Stored as ``delete_reason``.
    """
    removal_sql = "DELETE FROM active_shares WHERE export_id = ?"
    # Adjacent literals spell out the statement text unchanged.
    audit_sql = (
        "\n"
        "INSERT OR REPLACE INTO share_audit (export_id, created_at, deleted_at, delete_reason)\n"
        "VALUES (?, ?, ?, ?)\n"
    )
    db.execute(removal_sql, (export_id,))
    audit_row = (export_id, created_at, iso(utc_now()), reason)
    db.execute(audit_sql, audit_row)
def client_ip(request: Request) -> str:
    """Return the address used to identify the caller.

    The ``x-real-ip`` header is honoured only when the direct peer is listed
    in ``TRUSTED_PROXY_HOSTS`` and the header is non-blank; otherwise the
    peer address is returned, or ``"unknown"`` when there is no peer
    information.
    """
    peer = request.client.host if request.client else "unknown"
    if peer not in TRUSTED_PROXY_HOSTS:
        return peer
    forwarded = request.headers.get("x-real-ip", "").strip()
    return forwarded or peer
def assert_rate_limit(request: Request, action: str, limit: int) -> None:
    """Enforce a per-client sliding one-hour limit for ``action``.

    Args:
        request: Incoming request; the client is identified via ``client_ip``.
        action: Name of the limited operation; part of the bucket key.
        limit: Maximum number of accepted calls within the last hour.

    Raises:
        HTTPException: 429 when the client already made ``limit`` calls in
            the last hour. Rejected calls are not recorded.
    """
    now = utc_now()
    cutoff = now - timedelta(hours=1)
    bucket_key = f"{action}:{client_ip(request)}"
    recent = [stamp for stamp in rate_bucket.get(bucket_key, []) if stamp > cutoff]
    if len(recent) >= limit:
        raise HTTPException(status_code=429, detail="Rate limit exceeded.")
    recent.append(now)
    rate_bucket[bucket_key] = recent
    # A key is otherwise only pruned when the same client returns, so clients
    # seen once would stay in memory forever. Once the table is large, drop
    # every key whose newest entry (entries are appended in time order) has
    # left the window.
    if len(rate_bucket) > 4096:
        for key, stamps in list(rate_bucket.items()):
            if not stamps or stamps[-1] <= cutoff:
                rate_bucket.pop(key, None)
def token_hash(confirm_token: str) -> str:
    """Return the unpadded base64url SHA-256 digest of ``confirm_token``.

    The token is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(confirm_token.encode("utf-8"))
    encoded = base64.b64encode(hasher.digest(), altchars=b"-_")
    return encoded.decode("ascii").rstrip("=")
def load_feedback_config() -> None:
    """(Re)load ``feedback_config`` from ``FEEDBACK_CONFIG_PATH``.

    When the file does not exist the configuration becomes an empty dict.
    Errors while reading or parsing an existing file are not handled here.
    """
    global feedback_config
    if not FEEDBACK_CONFIG_PATH.exists():
        feedback_config = {}
        return
    with FEEDBACK_CONFIG_PATH.open("r", encoding="utf-8") as handle:
        feedback_config = json.load(handle)
def feedback_enabled() -> bool:
    """Tell whether the feedback endpoint is switched on.

    Reads ``FEEDBACK_ENABLED`` (default ``"true"``) and treats ``1``,
    ``true``, ``yes`` and ``on`` as enabled, ignoring letter case.
    """
    flag = os.environ.get("FEEDBACK_ENABLED", "true")
    enabled_values = ("1", "true", "yes", "on")
    return flag.lower() in enabled_values
def feedback_repo_config() -> Dict[str, str]:
    """Return the Gitea repository that receives feedback issues.

    Each value is taken from its environment variable (``GITEA_BASE_URL``,
    ``GITEA_OWNER``, ``GITEA_REPO``), then from the ``repo`` section of the
    feedback configuration, then from a built-in default. Trailing slashes
    are removed from the base URL.
    """
    configured = feedback_config.get("repo", {})
    environment = os.environ
    base_url = environment.get(
        "GITEA_BASE_URL", configured.get("base_url", "https://git.rolfsvaag.no")
    ).rstrip("/")
    owner = environment.get("GITEA_OWNER", configured.get("owner", "Rolfsvaag_Datateknikk"))
    repo_name = environment.get("GITEA_REPO", configured.get("repo", "Arbeidspuls"))
    return {"base_url": base_url, "owner": owner, "repo": repo_name}
def parse_iso(value: str) -> Optional[datetime]:
    """Parse an ISO 8601 timestamp into a timezone-aware ``datetime``.

    A ``Z`` suffix is accepted as UTC. A timestamp without any offset is
    interpreted as UTC, so every result is timezone-aware and two results can
    always be subtracted from each other; mixing naive and aware values would
    raise ``TypeError``.

    Args:
        value: Timestamp text; may be empty.

    Returns:
        The parsed moment, or ``None`` when ``value`` is empty or not a valid
        ISO 8601 timestamp.
    """
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None or parsed.utcoffset() is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def feedback_rate_key(request: Request) -> str:
    """Derive the rate-limit bucket key for a feedback request.

    The key is the hex HMAC-SHA256 of ``"<ip>|<user agent>"``, keyed with
    ``FEEDBACK_RATE_LIMIT_PEPPER``; a fixed placeholder is used when that
    variable is unset or empty.
    """
    pepper = os.environ.get("FEEDBACK_RATE_LIMIT_PEPPER", "") or "missing-feedback-rate-limit-pepper"
    address = client_ip(request)
    agent = request.headers.get("user-agent", "")
    fingerprint = f"{address}|{agent}".encode("utf-8")
    return hmac.new(pepper.encode("utf-8"), fingerprint, hashlib.sha256).hexdigest()
def assert_feedback_rate_limit(request: Request) -> None:
    """Enforce the ten-minute and 24-hour feedback limits for the caller.

    The limits come from the ``rate_limit`` section of the feedback
    configuration (``ten_minutes``, default 3, and ``twenty_four_hours``,
    default 10). Callers are identified by ``feedback_rate_key``.

    Raises:
        HTTPException: 429 when either limit has been reached. Rejected
            calls are not recorded.
    """
    config = feedback_config.get("rate_limit", {})
    key = feedback_rate_key(request)
    now = utc_now()
    ten_minute_cutoff = now - timedelta(minutes=10)
    day_cutoff = now - timedelta(hours=24)
    recent = [created for created in feedback_rate_bucket.get(key, []) if created > day_cutoff]
    burst = sum(1 for created in recent if created > ten_minute_cutoff)
    if burst >= int(config.get("ten_minutes", 3)):
        raise HTTPException(status_code=429, detail="Rate limit exceeded.")
    if len(recent) >= int(config.get("twenty_four_hours", 10)):
        raise HTTPException(status_code=429, detail="Rate limit exceeded.")
    recent.append(now)
    feedback_rate_bucket[key] = recent
    # A key is otherwise only pruned when the same caller returns, so callers
    # seen once would stay in memory forever. Once the table is large, drop
    # every key whose newest entry (entries are appended in time order) is
    # older than the 24-hour window.
    if len(feedback_rate_bucket) > 4096:
        for stale_key, stamps in list(feedback_rate_bucket.items()):
            if not stamps or stamps[-1] <= day_cutoff:
                feedback_rate_bucket.pop(stale_key, None)
def meaningful_feedback_errors(message: str) -> List[str]:
    """Return the codes of every quality rule that ``message`` violates.

    Thresholds come from the ``validation`` section of the feedback
    configuration. The message is stripped before it is measured.

    Returns:
        A list, in this fixed order, of the codes that apply: ``min_chars``,
        ``max_chars``, ``min_words``, ``min_letters``, ``not_meaningful``
        (only digits, punctuation and whitespace) and ``repetition`` (one
        character repeated at least twelve times in a row). Empty when the
        message passes.
    """
    rules = feedback_config.get("validation", {})
    min_chars = int(rules.get("min_chars", 30))
    min_words = int(rules.get("min_words", 5))
    min_letters = int(rules.get("min_letters", 15))
    max_chars = int(rules.get("max_chars", 4000))

    body = message.strip()
    word_count = sum(1 for token in re.split(r"\s+", body) if token)
    letter_count = len(re.findall(r"[A-Za-zÆØÅæøå]", body))

    checks = [
        ("min_chars", len(body) < min_chars),
        ("max_chars", len(body) > max_chars),
        ("min_words", word_count < min_words),
        ("min_letters", letter_count < min_letters),
        ("not_meaningful", re.fullmatch(r"[\d\W_]+", body, re.UNICODE) is not None),
        ("repetition", re.search(r"(.)\1{11,}", body, re.UNICODE) is not None),
    ]
    return [code for code, violated in checks if violated]
# Patterns whose matches are replaced by "[removed]", applied in this order.
REDACTION_PATTERNS = [
    # E-mail addresses.
    re.compile(r"[^\s@]+@[^\s@]+\.[^\s@]+"),
    # Dotted-quad IPv4 addresses.
    re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
    # Stand-alone 11-digit numbers (presumably national identity numbers).
    re.compile(r"\b\d{11}\b"),
    # Any other stand-alone run of eight or more digits.
    re.compile(r"\b\d{8,}\b"),
    # Eight or more digits with optional "+" and separators in between. The
    # match ends on a digit so that whitespace or punctuation following the
    # number stays in the text.
    re.compile(r"\+?\d(?:[\s().-]*\+?\d){7,}"),
]


def redact_feedback(message: str) -> tuple[str, int, float]:
    """Replace contact details and long numbers in ``message``.

    Args:
        message: The feedback text to clean.

    Returns:
        A tuple ``(redacted, redactions, ratio)``: the text with every match
        replaced by ``[removed]``, the number of replacements, and the share
        of the original characters that were replaced (0.0 to 1.0).
    """
    redactions = 0
    removed_chars = 0
    redacted = message

    def _mask(match: re.Match) -> str:
        # Count the removed characters directly instead of deriving them from
        # the result, so a literal "[removed]" typed by the user is not
        # mistaken for a redaction.
        nonlocal removed_chars
        removed_chars += len(match.group(0))
        return "[removed]"

    for pattern in REDACTION_PATTERNS:
        redacted, count = pattern.subn(_mask, redacted)
        redactions += count
    ratio = removed_chars / max(1, len(message))
    return redacted, redactions, ratio
def validate_feedback_payload(payload: FeedbackRequest) -> tuple[Dict[str, Any], List[int], str]:
    """Validate a feedback submission and prepare it for issue creation.

    Args:
        payload: The parsed request body.

    Returns:
        A tuple ``(meta, label_ids, redacted_message)``. ``meta`` holds the
        selected type and area configuration plus the redaction count. When
        one of the trap fields is filled in, ``({"fake_success": True}, [],
        "")`` is returned instead and nothing else is checked.

    Raises:
        HTTPException: 503 when feedback is disabled; 400 when consent is
            missing, the form timing is missing, inconsistent or too short,
            a category is unknown, the text fails the quality rules, or too
            much of it had to be redacted.
    """
    if not feedback_enabled():
        raise HTTPException(status_code=503, detail="Feedback is not enabled.")
    if not payload.consent:
        raise HTTPException(status_code=400, detail="Consent is required.")
    if payload.company_website.strip() or payload.urgent_contact_allowed:
        return {"fake_success": True}, [], ""

    completed = parse_iso(payload.form_completed_at)
    started = parse_iso(payload.form_started_at)
    min_seconds = int(feedback_config.get("validation", {}).get("min_form_seconds", 5))
    elapsed = None
    if completed is not None and started is not None:
        try:
            elapsed = (completed - started).total_seconds()
        except TypeError:
            # One timestamp carried a UTC offset and the other did not; the
            # two cannot be compared, so treat the timing as invalid rather
            # than letting the request fail with a server error.
            elapsed = None
    if elapsed is None or elapsed < min_seconds:
        raise HTTPException(status_code=400, detail="The form was submitted too quickly.")

    labels = feedback_config.get("labels", {})
    type_config = labels.get("types", {}).get(payload.feedback_type)
    area_configs = [labels.get("areas", {}).get(area) for area in payload.areas]
    if not type_config or len(payload.areas) < 1 or any(area is None for area in area_configs):
        raise HTTPException(status_code=400, detail="Invalid feedback categories.")

    errors = meaningful_feedback_errors(payload.message)
    if errors:
        raise HTTPException(status_code=400, detail="Feedback text does not pass validation.")

    redacted, redactions, ratio = redact_feedback(payload.message.strip())
    validation = feedback_config.get("validation", {})
    if redactions > int(validation.get("reject_redactions_over", 3)) or ratio > float(validation.get("reject_redacted_ratio_over", 0.2)):
        raise HTTPException(status_code=400, detail="Remove identifying or sensitive information before submitting.")

    # Labels: the always-applied ones, then the type, then each selected area.
    label_ids = list(labels.get("always", []))
    label_ids.append(int(type_config["id"]))
    label_ids.extend(int(area["id"]) for area in area_configs if area)
    return {"type": type_config, "areas": area_configs, "redactions": redactions}, label_ids, redacted
def issue_title(payload: FeedbackRequest, redacted_message: str) -> str:
    """Build the issue title: ``"<type label>: <start of the message>"``.

    The English label of the feedback type is used when configured, the raw
    type key otherwise. Whitespace runs in the message are collapsed and the
    excerpt is limited to 72 characters.
    """
    known_types = feedback_config.get("labels", {}).get("types", {})
    type_entry = known_types.get(payload.feedback_type, {})
    type_label = type_entry.get("en", payload.feedback_type)
    single_line = re.sub(r"\s+", " ", redacted_message).strip()
    excerpt = single_line[:72]
    return f"{type_label}: {excerpt}"
def issue_body(payload: FeedbackRequest, meta: Dict[str, Any], redacted_message: str) -> str:
    """Render the Markdown body of the feedback issue.

    Args:
        payload: The request; supplies version, language, route and viewport.
        meta: Result of ``validate_feedback_payload`` with ``type`` and
            ``areas`` entries, each providing ``en`` and ``label``.
        redacted_message: The feedback text after redaction.

    Returns:
        The issue body. Query string and fragment are removed from the route,
        and an empty route is shown as ``/``.
    """
    formatted_areas = [f"{area['en']} (`{area['label']}`)" for area in meta["areas"]]
    area_labels = ", ".join(formatted_areas)
    path_only = payload.route.partition("?")[0].partition("#")[0]
    route = path_only if path_only else "/"
    submitted_at = iso(utc_now())
    # NOTE(review): the template has no blank lines between its sections in
    # this copy of the file; confirm that matches the intended Markdown.
    return f"""## In-app feedback
This issue was created automatically from the Arbeidspuls in-app feedback form.
The user confirmed that the feedback does not contain sensitive or identifiable information, and that it may be processed, stored and displayed publicly as ordinary project feedback. Obvious contact details are automatically removed when detected.
### User feedback
{redacted_message}
### Selected categories
- Type: {meta["type"]["en"]} (`{meta["type"]["label"]}`)
- Areas: {area_labels}
### Non-identifying context
- App version: {payload.app_version}
- UI language: {payload.language}
- Route: {route}
- Viewport: {payload.viewport_category}
- Submitted at: {submitted_at}
_No contact information, IP address, user agent, report data or local data is intentionally included._
"""
def create_gitea_issue(title: str, body: str, label_ids: List[int]) -> Dict[str, Any]:
    """Create an issue in the configured Gitea repository.

    Args:
        title: Issue title.
        body: Issue body.
        label_ids: Label identifiers to attach.

    Returns:
        The decoded JSON object returned by Gitea.

    Raises:
        HTTPException: 503 when no access token is configured; 502 when
            Gitea answers with an error status, cannot be reached, times
            out, or returns something that is not a JSON object.
    """
    token = os.environ.get("GITEA_FEEDBACK_TOKEN") or os.environ.get("GITEA_FEEDBACK_ACCESS_TOKEN")
    if not token:
        raise HTTPException(status_code=503, detail="Feedback service is not configured.")
    repo = feedback_repo_config()
    url = f"{repo['base_url']}/api/v1/repos/{repo['owner']}/{repo['repo']}/issues"
    data = json.dumps({"title": title, "body": body, "labels": label_ids}).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=data,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=12) as response:
            issue = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # Must come first: HTTPError is a subclass of URLError.
        raise HTTPException(status_code=502, detail="Feedback service could not create issue.") from exc
    except (urllib.error.URLError, OSError) as exc:
        # OSError also covers timeouts and resets while reading the response,
        # which urllib does not wrap in URLError.
        raise HTTPException(status_code=502, detail="Feedback service is unavailable.") from exc
    except ValueError as exc:
        # Body was not valid UTF-8 or not valid JSON.
        raise HTTPException(status_code=502, detail="Feedback service returned an invalid response.") from exc
    if not isinstance(issue, dict):
        # Callers read fields from the result with ``.get``.
        raise HTTPException(status_code=502, detail="Feedback service returned an invalid response.")
    return issue
@app.on_event("startup")
def startup() -> None:
    """Prepare the service when the application starts.

    Runs, in this order: load the feedback configuration, create or migrate
    the database tables, and purge shares that have already expired.
    """
    for step in (load_feedback_config, init_db, cleanup_expired):
        step()
@app.post("/api/share", response_model=ShareCreateResponse)
def create_share(payload: ShareCreateRequest, request: Request) -> Dict[str, str]:
    """Store an encrypted share and return its id and expiry time.

    Raises:
        HTTPException: 429 when the create limit is reached; 413 when the
            decoded ciphertext exceeds ``MAX_PAYLOAD_BYTES``; 400 when the
            ciphertext or IV is not base64url, the IV is not 12 bytes, or
            the confirm verifier contains other characters.
    """
    cleanup_expired()
    assert_rate_limit(request, "create", CREATE_RATE_LIMIT_PER_HOUR)

    ciphertext_bytes = decoded_size(payload.ciphertext)
    if ciphertext_bytes > MAX_PAYLOAD_BYTES:
        raise HTTPException(status_code=413, detail="Payload is too large.")
    iv_bytes = decoded_size(payload.iv)
    if iv_bytes != 12:
        raise HTTPException(status_code=400, detail="Invalid IV size.")
    if re.fullmatch(r"[A-Za-z0-9_-]+", payload.confirm_token_hash) is None:
        raise HTTPException(status_code=400, detail="Invalid confirm verifier.")

    created = utc_now()
    expires_text = iso(created + timedelta(days=EXPIRY_DAYS))
    export_id = str(uuid4())
    # Adjacent literals spell out the statement text unchanged.
    insert_sql = (
        "\n"
        "INSERT INTO active_shares (export_id, created_at, expires_at, ciphertext, iv, schema, confirm_token_hash)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?)\n"
    )
    record = (
        export_id,
        iso(created),
        expires_text,
        payload.ciphertext,
        payload.iv,
        payload.share_schema,
        payload.confirm_token_hash,
    )
    with connect() as db:
        db.execute(insert_sql, record)
    return {"export_id": export_id, "expires_at": expires_text}
@app.get("/api/share/{export_id}")
def get_share(export_id: str, request: Request) -> Dict[str, Any]:
    """Return a stored share, or report that it has expired or been deleted.

    Returns:
        For an active share, ``status`` ``"available"`` together with the
        stored fields. For a removed share, only ``status`` (``"expired"`` or
        ``"deleted"``) and ``export_id``.

    Raises:
        HTTPException: 429 when the read limit is reached; 404 when the id
            is unknown.
    """
    cleanup_expired()
    assert_rate_limit(request, "read", READ_RATE_LIMIT_PER_HOUR)
    exposed_columns = ("export_id", "ciphertext", "iv", "schema", "expires_at")
    with connect() as db:
        share = db.execute("SELECT * FROM active_shares WHERE export_id = ?", (export_id,)).fetchone()
        if share is not None:
            found: Dict[str, Any] = {"status": "available"}
            for column in exposed_columns:
                found[column] = share[column]
            return found
        audit = db.execute("SELECT delete_reason FROM share_audit WHERE export_id = ?", (export_id,)).fetchone()
        if audit is None:
            raise HTTPException(status_code=404, detail="Share not found.")
        status = "expired" if audit["delete_reason"] == "expired" else "deleted"
        return {"status": status, "export_id": export_id}
@app.post("/api/share/{export_id}/confirm-import")
def confirm_import(export_id: str, payload: ConfirmImportRequest, request: Request) -> Dict[str, str]:
    """Delete a share once the recipient proves knowledge of the confirm token.

    The hash of the submitted token must equal the verifier stored when the
    share was created.

    Returns:
        ``{"status": "deleted"}`` after the share has been removed.

    Raises:
        HTTPException: 429 when the confirm limit is reached; 404 when the
            share is not active; 403 when the proof is missing, shorter than
            16 characters, or does not match.
    """
    cleanup_expired()
    assert_rate_limit(request, "confirm", CONFIRM_RATE_LIMIT_PER_HOUR)
    with connect() as db:
        row = db.execute("SELECT export_id, created_at, confirm_token_hash FROM active_shares WHERE export_id = ?", (export_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Share is not available.")
        expected = row["confirm_token_hash"]
        supplied = payload.confirm_token
        # Compare in constant time so the response time does not reveal how
        # much of the verifier matched. Bytes are used because
        # compare_digest only accepts ASCII-only text.
        proof_ok = (
            bool(expected)
            and bool(supplied)
            and len(supplied) >= 16
            and hmac.compare_digest(token_hash(supplied).encode("utf-8"), expected.encode("utf-8"))
        )
        if not proof_ok:
            raise HTTPException(status_code=403, detail="Invalid confirmation proof.")
        mark_deleted(db, row["export_id"], row["created_at"], "imported_by_recipient")
    return {"status": "deleted"}
@app.post("/api/feedback")
def create_feedback(payload: FeedbackRequest, request: Request) -> Dict[str, Any]:
    """Validate a feedback submission and file it as a Gitea issue.

    Returns:
        ``{"accepted": False}`` when validation flags the submission via a
        trap field (no issue is created). Otherwise ``accepted`` is true and
        ``issue_number`` / ``issue_url`` are taken from the Gitea response.
    """
    assert_feedback_rate_limit(request)
    meta, label_ids, redacted_message = validate_feedback_payload(payload)
    if meta.get("fake_success"):
        return {"accepted": False}
    title = issue_title(payload, redacted_message)
    body = issue_body(payload, meta, redacted_message)
    issue = create_gitea_issue(title, body, label_ids)
    result: Dict[str, Any] = {"accepted": True}
    result["issue_number"] = issue.get("number")
    result["issue_url"] = issue.get("html_url")
    return result