# modules/common/settings.py import os import json import shutil import threading import re from datetime import datetime from urllib.parse import urlparse from typing import Any, Dict, Iterable, Optional, List # ========================= # Public API Exceptions # ========================= class ValidationError(Exception): pass # ========================= # Helpers # ========================= def _clean(s: Optional[str]) -> str: s = (s or "").strip() if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")): s = s[1:-1].strip() return s def _is_intish(x: Any) -> bool: try: int(str(x).strip()) return True except Exception: return False def _to_bool(x: Any) -> bool: s = str(x).strip().lower() if s in ("1", "true", "yes", "on", "y", "t"): return True if s in ("0", "false", "no", "off", "n", "f"): return False raise ValidationError(f"Expected a boolean, got {x!r}") def _to_int(x: Any) -> int: if _is_intish(x): return int(str(x).strip()) raise ValidationError(f"Expected an integer, got {x!r}") def _to_float(x: Any) -> float: try: return float(str(x).strip()) except Exception: raise ValidationError(f"Expected a float, got {x!r}") def _to_str(x: Any) -> str: return str(x) def _to_list_int(x: Any) -> List[int]: if isinstance(x, list): out = [] for v in x: if not _is_intish(v): raise ValidationError(f"List must contain integers; got {v!r}") out.append(int(v)) return out if isinstance(x, str): toks = [t.strip() for t in x.split(",") if t.strip()] try: return [int(t) for t in toks] except Exception: raise ValidationError(f"Could not parse list of integers from {x!r}") raise ValidationError(f"Expected a list of integers, got {type(x).__name__}") # ---- URL / Time / Date validators ---- def _to_url(x: Any) -> str: s = str(x).strip() if not s: raise ValidationError("URL cannot be empty") p = urlparse(s) if p.scheme not in ("http", "https"): raise ValidationError("URL must start with http:// or https://") if not p.netloc: raise ValidationError("URL missing host") if not re.match(r"^([A-Za-z0-9\-.]+|\d{1,3}(?:\.\d{1,3}){3}|localhost)(:\d+)?$", p.netloc): raise ValidationError("URL host looks invalid") return s _TIME_24H_RE = re.compile(r"^(?P[01]?\d|2[0-3]):(?P[0-5]\d)(?::(?P[0-5]\d))?$") def _to_time_24h(x: Any) -> str: s = str(x).strip() m = _TIME_24H_RE.match(s) if not m: raise ValidationError("Time must be HH:MM or HH:MM:SS (24-hour)") hh = int(m.group("h")) mm = int(m.group("m")) # canonical store as HH:MM return f"{hh:02d}:{mm:02d}" def _expand_two_digit_year(two_digit: int, pivot_year: int) -> int: # Map YY -> same century as pivot_year base = pivot_year - (pivot_year % 100) return base + two_digit def _to_date_ymd(x: Any, *, pivot_year: int | None = None) -> str: s = str(x).strip() if not s: raise ValidationError("Date cannot be empty") m = re.match(r"^(?P\d{2}|\d{4})-(?P\d{1,2})-(?P\d{1,2})$", s) if not m: raise ValidationError("Expected date format YYYY-MM-DD or YY-MM-DD") y = m.group("y") month = int(m.group("m")) day = int(m.group("d")) if len(y) == 2: yy = int(y) if pivot_year is not None: year = _expand_two_digit_year(yy, pivot_year) dt = datetime(year, month, day) return dt.strftime("%Y-%m-%d") else: dt = datetime.strptime(s, "%y-%m-%d") return dt.strftime("%Y-%m-%d") year = int(y) dt = datetime(year, month, day) return dt.strftime("%Y-%m-%d") def _to_date_dmy(x: Any, *, pivot_year: int | None = None) -> str: s = str(x).strip() if not s: raise ValidationError("Date cannot be empty") m = re.match(r"^(?P\d{1,2})-(?P\d{1,2})-(?P\d{2}|\d{4})$", s) if not m: raise ValidationError("Expected date format DD-MM-YYYY or DD-MM-YY") day = int(m.group("d")) month = int(m.group("m")) y = m.group("y") if len(y) == 2: yy = int(y) if pivot_year is not None: year = _expand_two_digit_year(yy, pivot_year) dt = datetime(year, month, day) return dt.strftime("%d-%m-%Y") else: dt = datetime.strptime(s, "%d-%m-%y") return dt.strftime("%d-%m-%Y") year = int(y) dt = datetime(year, month, day) return dt.strftime("%d-%m-%Y") # ========================= # Schema (non-sensitive, front-end editable) # ========================= SETTINGS_SCHEMA: Dict[str, Dict[str, Any]] = { # Channels (IDs) "mod_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Moderation command channel."}, "modlog_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "ModLog channel."}, "pirates_list_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Pirates list channel."}, "dd_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Deep Desert updates channel."}, "report_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Reports/approvals channel."}, "userslist_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Users list channel."}, "trigger_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Trigger channel for Auto VC."}, # Roles (IDs) "rules_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Rules-agreed role ID."}, "moderator_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Moderator role ID."}, "intel_mod_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Intel mod role ID."}, "full_access_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Full Access role ID."}, "field_mod_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Field mod role ID."}, "engagement_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Engagement role ID."}, "admin_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Admin role ID."}, # Message IDs "rules_message_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Rules message ID."}, "engagement_message_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Engagement message ID."}, "nickname_message_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Nickname message ID."}, # Emojis (IDs) "emoji_carrier_crawler_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: carrier/crawler."}, "emoji_melange_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: melange."}, "emoji_sand_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: sand."}, "emoji_harvester_crew": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: harvester crew"}, "emoji_escort_crew": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: escort crew"}, "emoji_fedaykin": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: fedaykin - kill squad"}, # Auto-VC "auto_vc_category_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Category to host Auto-VCs."}, "auto_vc_cleanup_delay": {"type": "int", "default": 30, "nonzero": True, "desc": "Seconds before empty Auto-VCs are cleaned up."}, "vc_name_prefix": {"type": "str", "default": "DD Crew", "desc": "Auto-VC name prefix."}, # Pirates / threat model "threat_group_threshold": {"type": "int", "default": 3, "desc": "Threshold for group classification."}, "threat_min_samples_for_stats": {"type": "int", "default": 3, "desc": "Min samples for stats."}, "threat_w_destruction": {"type": "float", "default": 0.40, "desc": "Weight: destruction."}, "threat_w_group": {"type": "float", "default": 0.20, "desc": "Weight: group."}, "threat_w_kill": {"type": "float", "default": 0.30, "desc": "Weight: kill."}, "threat_w_skill": {"type": "float", "default": 0.10, "desc": "Weight: skill."}, # SpicePay "spicepay_base_weight": {"type": "float", "default": 25.0, "desc": "Base weight."}, "spicepay_carrier_bonus": {"type": "float", "default": 12.5, "desc": "Carrier bonus."}, "spicepay_crawler_bonus": {"type": "float", "default": 12.5, "desc": "Crawler bonus."}, "spicepay_lsr_cut_percent": {"type": "float", "default": 10.0, "desc": "SR cut percent."}, # Jobs / loops "user_cards_cron_enabled": {"type": "bool", "default": True, "desc": "Enable user-cards cron."}, "nick_nudge_loop_enabled": {"type": "bool", "default": False, "desc": "Enable nick-nudge loop."}, # Deep Desert fetcher "dd_fetcher": {"type": "str", "default": "playwright", "allowed": ["playwright","requests"], "desc": "Fetcher backend."}, "dd_pw_timeout_ms": {"type": "int", "default": 60000, "desc": "Playwright timeout (ms)."}, "dd_pw_wait_ms": {"type": "int", "default": 0, "desc": "Extra wait after navigation (ms)."}, # Repo (non-secret) "repo_url": {"type": "url", "default": "https://git.rolfsvaag.no/frarol96/shaiwatcher", "desc": "Repository URL."}, "repo_branch": {"type": "str", "default": "main", "desc": "Repository branch."}, "repo_rss": {"type": "url", "default": "https://git.rolfsvaag.no/frarol96/shaiwatcher.rss", "desc": "Repository RSS feed."}, # Admin allow-list for /shaiadmin (besides owner) "admin_user_ids": {"type": "list[int]", "default": [], "desc": "User IDs allowed to use /shaiadmin."}, "admin_role_ids": {"type": "list[int]", "default": [], "desc": "Role IDs allowed to use /shaiadmin."}, # Misc "check_time_utc": {"type": "time_24h", "default": "03:00", "desc": "Daily check time (UTC HH:MM)"}, "ignore_test_level": {"type": "int", "default": 0, "desc": "Test-level ignore flag."}, "lang": {"type": "str", "default": "C.UTF-8", "desc": "Locale (if referenced)."}, # Examples of date keys you may enable later: # "feature_window_start": {"type": "date_ymd", "default": "", "allow_empty": True, "pivot_year": 2000, "desc": "Start date (YYYY-MM-DD or YY-MM-DD)."}, # "event_date_dmy": {"type": "date_dmy", "default": "", "allow_empty": True, "pivot_year": 2000, "desc": "Event date (DD-MM-YYYY or DD-MM-YY)."}, } # ========================= # Env — ONLY the allowed set (plus legacy HOME_GUILD_ID alias) # ========================= def _allowed_env_map() -> Dict[str, str]: env: Dict[str, str] = {} if os.getenv("DISCORD_TOKEN"): env["discord_token"] = _clean(os.getenv("DISCORD_TOKEN")) data_file = os.getenv("DATA_FILE") if data_file: env["data_file"] = _clean(data_file) if os.getenv("DOCS_HOST_IP"): env["docs_host_ip"] = _clean(os.getenv("DOCS_HOST_IP")) if os.getenv("DOCS_HOST_PORT"): env["docs_host_port"] = _clean(os.getenv("DOCS_HOST_PORT")) if os.getenv("HOME_GUILD_ID"): env["home_guild_id"] = _clean(os.getenv("HOME_GUILD_ID")) if os.getenv("REPO_AHTOKEN"): env["repo_ahtoken"] = _clean(os.getenv("REPO_AHTOKEN")) return env # ========================= # On-disk store + globals # ========================= _SETTINGS_LOCK = threading.Lock() _FILE_MAP: Dict[str, Any] = {} _ENV_MAP: Dict[str, str] = {} _SETTINGS_FILE: Optional[str] = None def settings_path() -> str: """Place settings.json next to DATA_FILE if available; otherwise default to ./data/settings.json.""" data_file = os.getenv("DATA_FILE") if data_file: base = os.path.dirname(data_file) or "." return os.path.join(base, "settings.json") return "./data/settings.json" def _ensure_loaded(): global _SETTINGS_FILE, _FILE_MAP, _ENV_MAP with _SETTINGS_LOCK: if _SETTINGS_FILE is not None: return _SETTINGS_FILE = settings_path() _ENV_MAP = _allowed_env_map() if os.path.exists(_SETTINGS_FILE): try: with open(_SETTINGS_FILE, "r", encoding="utf-8") as f: _FILE_MAP = json.load(f) or {} except Exception: _FILE_MAP = {} else: _FILE_MAP = {} _save_locked() changed = False for key, meta in SETTINGS_SCHEMA.items(): if key not in _FILE_MAP: _FILE_MAP[key] = meta.get("default") changed = True if changed: _save_locked() def _save_locked(): global _SETTINGS_FILE, _FILE_MAP path = _SETTINGS_FILE or settings_path() os.makedirs(os.path.dirname(path) or ".", exist_ok=True) tmp = path + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(_FILE_MAP, f, indent=2, ensure_ascii=False) if os.path.exists(path): try: shutil.copy2(path, path + ".bak") except Exception: pass os.replace(tmp, path) def settings_get_all() -> Dict[str, Any]: _ensure_loaded() with _SETTINGS_LOCK: return dict(_FILE_MAP) def _cast_value(name: str, raw: Any, *, enforce_nonzero: bool = True) -> Any: meta = SETTINGS_SCHEMA.get(name) if not meta: raise ValidationError(f"Unknown setting: {name}") allow_empty = bool(meta.get("allow_empty", False)) t = meta.get("type") if t in ("str", "url", "time_24h", "date_ymd", "date_dmy"): s = str(raw).strip() if s == "" and allow_empty: val = "" else: if t == "str": val = _to_str(raw) elif t == "url": val = _to_url(raw) elif t == "time_24h": val = _to_time_24h(raw) elif t == "date_ymd": val = _to_date_ymd(raw, pivot_year=meta.get("pivot_year")) elif t == "date_dmy": val = _to_date_dmy(raw, pivot_year=meta.get("pivot_year")) elif t == "bool": val = _to_bool(raw) elif t == "int": val = _to_int(raw) elif t == "float": val = _to_float(raw) elif t == "list[int]": val = _to_list_int(raw) else: raise ValidationError(f"Unsupported type for {name}: {t}") # enum constraint (always enforced) if "allowed" in meta: allowed = meta["allowed"] if val not in allowed: raise ValidationError(f"`{name}` must be one of {allowed}, got {val!r}") # nonzero constraint (skippable for bulk uploads) if enforce_nonzero and meta.get("nonzero") and isinstance(val, int) and val == 0: raise ValidationError(f"`{name}` must be a non-zero integer.") return val def settings_set(name: str, raw_value: Any) -> bool: _ensure_loaded() with _SETTINGS_LOCK: name = name.lower().strip() if name not in SETTINGS_SCHEMA: raise ValidationError(f"Unknown setting: {name}") new_val = _cast_value(name, raw_value) old_val = _FILE_MAP.get(name, SETTINGS_SCHEMA[name].get("default")) if old_val == new_val: return False _FILE_MAP[name] = new_val _save_locked() return True def settings_reset(name: str) -> None: _ensure_loaded() with _SETTINGS_LOCK: name = name.lower().strip() if name not in SETTINGS_SCHEMA: raise ValidationError(f"Unknown setting: {name}") _FILE_MAP[name] = SETTINGS_SCHEMA[name].get("default") _save_locked() def settings_import_bulk(obj: Dict[str, Any]) -> List[str]: _ensure_loaded() if not isinstance(obj, dict): raise ValidationError("Uploaded JSON must be an object/dict at the top level.") new_map: Dict[str, Any] = dict(_FILE_MAP) changed: List[str] = [] for k, v in obj.items(): if k not in SETTINGS_SCHEMA: raise ValidationError(f"Unknown setting in upload: {k}") # Allow 0 for keys marked nonzero during bulk import (treating as 'unset' sentinel) new_val = _cast_value(k, v, enforce_nonzero=False) if new_map.get(k) != new_val: new_map[k] = new_val changed.append(k) with _SETTINGS_LOCK: if changed: _FILE_MAP.update({k: new_map[k] for k in changed}) _save_locked() return changed # ========================= # Unified read view (keeps cfg(bot) contract) # ========================= class ConfigView: """ Reads: - Schema-managed keys from settings.json - Env: discord_token, data_file, docs_host_ip, docs_host_port, home_guild_id - Fallback to bot.config['DEFAULT'] for anything else (legacy) Helpers: get/int/bool/float/list, to_dict() """ def __init__(self, bot=None): _ensure_loaded() self._env_map = dict(_ENV_MAP) try: self._default = (getattr(bot, "config", {}) or {}).get("DEFAULT", {}) or {} except Exception: self._default = {} def _effective_map(self) -> Dict[str, str]: merged: Dict[str, str] = {} # defaults first for k in getattr(self._default, "keys", lambda: [])(): merged[k] = _clean(str(self._default.get(k, ""))) # env overlay for k, v in self._env_map.items(): merged[k] = _clean(v) # schema values overlay defaults for k, meta in SETTINGS_SCHEMA.items(): v = _FILE_MAP.get(k, meta.get("default")) if isinstance(v, (list, dict)): merged[k] = json.dumps(v, ensure_ascii=False) else: merged[k] = _clean(str(v)) return merged def get(self, key: str, default: str = "") -> str: m = self._effective_map() v = _clean(m.get(key.lower(), "")) return v if v != "" else default def int(self, key: str, default: int = 0) -> int: s = self.get(key, "") try: return int(s) except Exception: return default def float(self, key: str, default: float = 0.0) -> float: s = self.get(key, "") try: return float(s) except Exception: return default def bool(self, key: str, default: bool = False) -> bool: s = self.get(key, "") if s == "": return default s = s.lower() if s in ("1", "true", "yes", "on", "y", "t"): return True if s in ("0", "false", "no", "off", "n", "f"): return False return default def list(self, key: str, default: Optional[Iterable[str]] = None, sep: str = ",") -> Iterable[str]: s = self.get(key, "") if s == "": return list(default or []) parts = [p.strip() for p in s.split(sep)] return [p for p in parts if p] def to_dict(self) -> Dict[str, str]: return dict(self._effective_map()) def cfg(bot=None) -> ConfigView: return ConfigView(bot)