diff --git a/.gitignore b/.gitignore
index c233395..e7d3710 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@ NOTES.md
sanity/
.offline_data.json
dev/.env.production
+dev/portainer_config.png
# Tools
wrapper/
diff --git a/assets/images/escort_logo.png b/assets/images/escort_logo.png
new file mode 100644
index 0000000..a9c0b77
Binary files /dev/null and b/assets/images/escort_logo.png differ
diff --git a/assets/images/escort_logo.svg b/assets/images/escort_logo.svg
new file mode 100644
index 0000000..c053f03
--- /dev/null
+++ b/assets/images/escort_logo.svg
@@ -0,0 +1,108 @@
+
+
+
+
diff --git a/assets/images/feydakin_logo.png b/assets/images/feydakin_logo.png
new file mode 100644
index 0000000..d1c3145
Binary files /dev/null and b/assets/images/feydakin_logo.png differ
diff --git a/assets/images/feydakin_logo.svg b/assets/images/feydakin_logo.svg
new file mode 100644
index 0000000..7f7a59a
--- /dev/null
+++ b/assets/images/feydakin_logo.svg
@@ -0,0 +1,93 @@
+
+
+
+
diff --git a/assets/images/harvester_logo.png b/assets/images/harvester_logo.png
new file mode 100644
index 0000000..e444fd5
Binary files /dev/null and b/assets/images/harvester_logo.png differ
diff --git a/assets/images/harvester_logo.svg b/assets/images/harvester_logo.svg
new file mode 100644
index 0000000..fa6550b
--- /dev/null
+++ b/assets/images/harvester_logo.svg
@@ -0,0 +1,51 @@
+
+
+
+
diff --git a/bot.py b/bot.py
index b5f87cd..55314d1 100644
--- a/bot.py
+++ b/bot.py
@@ -9,10 +9,9 @@ from modules.common.boot_notice import post_boot_notice
# Version consists of:
# Major.Enhancement.Minor.Patch.Test (Test is alphanumeric; doesn’t trigger auto update)
-VERSION = "0.4.2.1.a1"
+VERSION = "0.5.1.1.a1"
# ---------- Env loading ----------
-
load_dotenv()
def _get_env(name: str, default: str = "") -> str:
@@ -20,11 +19,26 @@ def _get_env(name: str, default: str = "") -> str:
return (v or "").strip().strip('"').strip("'") or default
TOKEN = _get_env("DISCORD_TOKEN")
-DATA_FILE = _get_env("SHAI_DATA") or _get_env("SHAI_DATA_FILE") or "/data/data.json"
+DATA_FILE = _get_env("DATA_FILE") or "./data/data.json"
print("[Config] DISCORD_TOKEN set:", bool(TOKEN))
print("[Config] DATA_FILE:", DATA_FILE)
+# ---------- Ensure data path exists (fallback if not writable) ----------
+data_dir = os.path.dirname(DATA_FILE) or "."
+try:
+ os.makedirs(data_dir, exist_ok=True)
+except PermissionError:
+ fallback = "./data/data.json"
+ print(f"[Config] No permission to create '{data_dir}'. Falling back to {fallback}")
+ DATA_FILE = fallback
+ data_dir = os.path.dirname(DATA_FILE)
+ os.makedirs(data_dir, exist_ok=True)
+
+if not os.path.exists(DATA_FILE):
+ with open(DATA_FILE, "w", encoding="utf-8") as f:
+ f.write("{}")
+
# ---------- Discord intents ----------
intents = discord.Intents.default()
@@ -52,13 +66,10 @@ bot.data_manager = DataManager(DATA_FILE)
# ---------- Self-check: resolve from ENV first, then cfg_helper ----------
def _resolve_channel_id(c, key: str) -> int:
- # 1) ENV always wins
- env_key = f"SHAI_{key.upper()}"
- raw = os.getenv(env_key, "").strip().strip('"').strip("'")
- if raw.isdigit():
- return int(raw)
-
- # 2) Try cfg_helper (if it happens to know)
+ """
+ Resolve channel IDs from the runtime settings store (cfg), with a final
+ fallback to legacy bot.config['DEFAULT'] if present. No SHAI_* env usage.
+ """
try:
v = int(c.int(key, 0))
if v:
@@ -66,9 +77,8 @@ def _resolve_channel_id(c, key: str) -> int:
except Exception:
pass
- # 3) Last resort: legacy bot.config shapes
try:
- # bot.config like dict
+ # legacy DEFAULT mapping (ConfigParser-like or our shim)
v = int(getattr(c, "get", lambda *_: 0)(key, 0))
if v:
return v
@@ -77,7 +87,6 @@ def _resolve_channel_id(c, key: str) -> int:
return 0
-
async def _guild_selfcheck(g: discord.Guild, c):
problems = []
@@ -128,92 +137,47 @@ async def on_ready():
print("[SelfCheck] failed:", repr(e))
# ---------- Slash command scope & sync ----------
- #
- # Toggle here (or set SHAI_SLASH_GUILD_ONLY=true/false):
- guild_only = env_cfg.bool("slash_guild_only", True)
-
- # Choose target guilds for "instant" registration
- target_gids = set()
- for key in ("home_guild_id", "dev_guild_id"):
- val = env_cfg.get(key)
- if val:
- try:
- target_gids.add(int(val))
- except Exception:
- pass
-
try:
- if guild_only and target_gids:
- print(f"[Slash] Mode: GUILD-ONLY to {sorted(target_gids)}")
+ # env_cfg already exists above in on_ready()
+ gid = env_cfg.int("home_guild_id", 0)
- # Copy all currently-loaded global commands to each target guild
- for gid in sorted(target_gids):
- g = bot.get_guild(gid)
- if not g:
- print(f"[Slash] Guild {gid}: not in cache; skipping copy/sync.")
- continue
- bot.tree.copy_global_to(guild=g)
- g_cmds = await bot.tree.sync(guild=g)
- names = ", ".join(f"/{c.name}" for c in g_cmds) if g_cmds else "(none)"
- print(f"[Slash] Synced {len(g_cmds)} commands to {g.name} ({g.id}): {names}")
+ if gid > 0:
+ print(f"[Slash] Mode: GUILD-ONLY → {gid}")
- # Now remove global commands so only guild-scoped remain
+ guild_obj = discord.Object(id=gid)
+
+ # Copy all currently-loaded global commands to HOME guild
+ bot.tree.copy_global_to(guild=guild_obj)
+ g_cmds = await bot.tree.sync(guild=guild_obj)
+ g_names = ", ".join(f"/{c.name}" for c in g_cmds) if g_cmds else "(none)"
+ print(f"[Slash] Synced {len(g_cmds)} commands to guild {gid}: {g_names}")
+
+ # Clear global so only guild-scoped remain
bot.tree.clear_commands(guild=None)
- cleared = await bot.tree.sync() # push empty global set (purges old global copies)
+ cleared = await bot.tree.sync() # push empty global set
print(f"[Slash] Cleared global commands (now {len(cleared)}).")
- else:
- print("[Slash] Mode: GLOBAL")
- # Purge any old per-guild copies in target guilds (to avoid dupes),
- # then sync globally once.
- for gid in sorted(target_gids):
- g = bot.get_guild(gid)
- if not g:
- print(f"[Slash] Guild {gid}: not in cache; skip purge.")
- continue
- bot.tree.clear_commands(guild=g)
- await bot.tree.sync(guild=g)
- print(f"[Slash] Purged guild-specific commands in {g.name} ({g.id}).")
+ # Debug: list actual state after sync
+ try:
+ global_cmds = await bot.tree.fetch_commands()
+ print(f"[Slash] Global commands ({len(global_cmds)}): {', '.join(f'/{c.name}' for c in global_cmds) or '(none)'}")
+ except Exception as e:
+ print("[Slash] Failed to fetch global commands:", repr(e))
+ try:
+ g_cmds = await bot.tree.fetch_commands(guild=guild_obj)
+ print(f"[Slash] Guild {gid} commands ({len(g_cmds)}): {', '.join(f'/{c.name}' for c in g_cmds) or '(none)'}")
+ except Exception as e:
+ print(f"[Slash] Failed to fetch commands for guild {gid}:", repr(e))
+ else:
+ print("[Slash] Mode: GLOBAL (HOME_GUILD_ID not set)")
global_cmds = await bot.tree.sync()
names = ", ".join(f"/{c.name}" for c in global_cmds) if global_cmds else "(none)"
print(f"[Slash] Synced {len(global_cmds)} commands globally: {names}")
- # --- Always print what actually exists after sync ---
- def _fmt_cmds(cmds):
- try:
- return ", ".join(f"/{c.name}" for c in cmds) if cmds else "(none)"
- except Exception:
- return "(unreadable)"
-
- # Global list
- try:
- global_cmds = await bot.tree.fetch_commands()
- print(f"[Slash] Global commands ({len(global_cmds)}): {_fmt_cmds(global_cmds)}")
- except Exception as e:
- print("[Slash] Failed to fetch global commands:", repr(e))
-
- # Guild lists
- for gid in sorted(target_gids):
- g = bot.get_guild(gid)
- if not g:
- print(f"[Slash] Guild {gid}: not in cache; cannot fetch commands.")
- continue
- try:
- g_cmds = await bot.tree.fetch_commands(guild=g)
- print(f"[Slash] {g.name} ({g.id}) guild commands ({len(g_cmds)}): {_fmt_cmds(g_cmds)}")
- except Exception as e:
- print(f"[Slash] Failed to fetch commands for guild {gid}:", repr(e))
-
except Exception as e:
print("[Slash] Sync failed:", repr(e))
- # Post boot status message
- try:
- await post_boot_notice(bot)
- except Exception as e:
- print("[BootNotice] failed:", repr(e))
-
# ---------- Auto-discover extensions ----------
modules_path = pathlib.Path(__file__).parent / "modules"
diff --git a/modules/admin/__init__.py b/modules/admin/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/modules/admin/shaiadmin.py b/modules/admin/shaiadmin.py
new file mode 100644
index 0000000..f7c5b03
--- /dev/null
+++ b/modules/admin/shaiadmin.py
@@ -0,0 +1,168 @@
+# modules/admin/shaiadmin.py
+import io
+import json
+from typing import Any, Dict, List
+
+import discord
+from discord import app_commands
+from discord.ext import commands
+
+from modules.common.settings import (
+ cfg, SETTINGS_SCHEMA, settings_path, settings_get_all,
+ settings_set, settings_reset, settings_import_bulk, ValidationError,
+)
+from mod_perms import is_moderator_member # keep if you want mods as managers
+
+def _user_has_role_ids(member: discord.Member, role_ids: List[int]) -> bool:
+ if not isinstance(member, discord.Member) or not role_ids:
+ return False
+ rset = set(role_ids)
+ return any(r.id in rset for r in member.roles)
+
+async def _is_owner(bot: commands.Bot, user: discord.abc.User) -> bool:
+ try:
+ return await bot.is_owner(user)
+ except Exception:
+ return False
+
+def _get_admin_lists(bot: commands.Bot) -> Dict[str, List[int]]:
+ r = cfg(bot)
+ users, roles = [], []
+ try:
+ users = json.loads(r.get("admin_user_ids", "[]"))
+ except Exception:
+ users = []
+ try:
+ roles = json.loads(r.get("admin_role_ids", "[]"))
+ except Exception:
+ roles = []
+ return {"users": users, "roles": roles}
+
+async def _check_admin(inter: discord.Interaction) -> bool:
+ bot: commands.Bot = inter.client # type: ignore
+ user = inter.user
+ if await _is_owner(bot, user):
+ return True
+ if isinstance(user, discord.Member):
+ lists = _get_admin_lists(bot)
+ if user.id in set(lists["users"]):
+ return True
+ if _user_has_role_ids(user, lists["roles"]):
+ return True
+ if is_moderator_member(user, bot): # optional; remove if not desired
+ return True
+ if not inter.response.is_done():
+ await inter.response.send_message("You don’t have permission to use `/shaiadmin`.", ephemeral=True)
+ else:
+ await inter.followup.send("You don’t have permission to use `/shaiadmin`.", ephemeral=True)
+ return False
+
+
+class ShaiAdminCog(commands.Cog):
+ """Runtime settings administration (file-backed)."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ # Guild-only group; prefix description with [ADMIN]
+ shaiadmin = app_commands.Group(
+ name="shaiadmin",
+ description="[ADMIN] Owner/approved-only settings manager.",
+ guild_only=True,
+ )
+
+ # ---- bound coroutine for autocomplete ----
+ async def ac_setting_keys(self, interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
+ cur = (current or "").lower()
+ keys = [k for k in sorted(SETTINGS_SCHEMA.keys()) if cur in k]
+ return [app_commands.Choice(name=k, value=k) for k in keys[:25]]
+
+ # /shaiadmin set
+ @shaiadmin.command(name="set", description="[ADMIN] Set a setting (validated, persisted, applied).")
+ @app_commands.describe(setting_name="Which setting to change", value="New value (type depends on setting)")
+ @app_commands.autocomplete(setting_name=ac_setting_keys)
+ async def set_value(self, inter: discord.Interaction, setting_name: str, value: str):
+ if not await _check_admin(inter):
+ return
+ await inter.response.defer(ephemeral=True, thinking=True)
+ setting_name = setting_name.lower().strip()
+ try:
+ changed = settings_set(setting_name, value)
+ await inter.followup.send(
+ f"✅ `{setting_name}` updated and applied." if changed else "ℹ️ No change.",
+ ephemeral=True,
+ )
+ except ValidationError as ve:
+ await inter.followup.send(f"❌ {ve}", ephemeral=True)
+ except Exception as e:
+ await inter.followup.send(f"❌ Failed to set `{setting_name}`: {e!r}", ephemeral=True)
+
+ # /shaiadmin unset
+ @shaiadmin.command(name="unset", description="[ADMIN] Reset/unset a setting to its default.")
+ @app_commands.describe(setting_name="Which setting to reset")
+ @app_commands.autocomplete(setting_name=ac_setting_keys)
+ async def unset_value(self, inter: discord.Interaction, setting_name: str):
+ if not await _check_admin(inter):
+ return
+ await inter.response.defer(ephemeral=True, thinking=True)
+ setting_name = setting_name.lower().strip()
+ try:
+ settings_reset(setting_name)
+ await inter.followup.send(f"✅ `{setting_name}` reset to default and applied.", ephemeral=True)
+ except ValidationError as ve:
+ await inter.followup.send(f"❌ {ve}", ephemeral=True)
+ except Exception as e:
+ await inter.followup.send(f"❌ Failed to reset `{setting_name}`: {e!r}", ephemeral=True)
+
+ # /shaiadmin settings (download/upload)
+ settings = app_commands.Group(
+ name="settings",
+ description="[ADMIN] Download or upload the full settings JSON.",
+ parent=shaiadmin,
+ guild_only=True,
+ )
+
+ @settings.command(name="download", description="[ADMIN] Download the current settings.json")
+ async def download(self, inter: discord.Interaction):
+ if not await _check_admin(inter):
+ return
+ await inter.response.defer(ephemeral=True, thinking=True)
+ data = settings_get_all()
+ buf = io.BytesIO(json.dumps(data, indent=2, ensure_ascii=False).encode("utf-8"))
+ buf.seek(0)
+ await inter.followup.send(
+ content=f"📦 Current settings from `{settings_path()}`",
+ file=discord.File(buf, filename="settings.json"),
+ ephemeral=True,
+ )
+
+ @settings.command(name="upload", description="[ADMIN] Upload and apply a settings.json")
+ @app_commands.describe(file="A JSON file exported by /shaiadmin settings download (or matching the schema).")
+ async def upload(self, inter: discord.Interaction, file: discord.Attachment):
+ if not await _check_admin(inter):
+ return
+ await inter.response.defer(ephemeral=True, thinking=True)
+ if not file or not file.filename.lower().endswith(".json"):
+ await inter.followup.send("Please attach a `.json` file.", ephemeral=True)
+ return
+ try:
+ raw = await file.read()
+ obj = json.loads(raw.decode("utf-8"))
+ except Exception:
+ await inter.followup.send("❌ Invalid JSON file.", ephemeral=True)
+ return
+
+ try:
+ changed_keys = settings_import_bulk(obj)
+ await inter.followup.send(
+ f"✅ Uploaded and applied `{len(changed_keys)}` keys: {', '.join(sorted(changed_keys))}."
+ if changed_keys else "ℹ️ No changes detected.",
+ ephemeral=True,
+ )
+ except ValidationError as ve:
+ await inter.followup.send(f"❌ {ve}", ephemeral=True)
+ except Exception as e:
+ await inter.followup.send(f"❌ Upload failed: {e!r}", ephemeral=True)
+
+async def setup(bot: commands.Bot):
+ await bot.add_cog(ShaiAdminCog(bot))
diff --git a/modules/common/boot_notice.py b/modules/common/boot_notice.py
index 29fa1af..94d92e5 100644
--- a/modules/common/boot_notice.py
+++ b/modules/common/boot_notice.py
@@ -1,4 +1,3 @@
-# modules/common/boot_notice.py
import os
import re
import time
@@ -81,18 +80,18 @@ def _parse_repo_url(repo_url: str) -> tuple[str | None, str | None, str | None]:
def _auth_headers_from_cfg(r):
"""
- Build Authorization header using SHAI_REPO_AHTOKEN (cfg: repo_ahtoken).
- Value may be raw; we prefix 'token ' if needed.
- Also supports SHAI_GITEA_TOKEN / SHAI_GITEA_USER as secondary.
+ Build Authorization header using repo auth tokens.
+ - Preferred: cfg('repo_ahtoken') (comes from settings.json or env REPO_AHTOKEN)
+ - Fallbacks: GITEA_TOKEN / GITEA_USER envs (non-SHAI)
"""
- ahtoken = r.get('repo_ahtoken', '').strip() # SHAI_REPO_AHTOKEN
+ ahtoken = r.get('repo_ahtoken', '').strip() # REPO_AHTOKEN via settings/env
if ahtoken:
if not ahtoken.lower().startswith('token '):
ahtoken = f"token {ahtoken}"
return {"Authorization": ahtoken}
- tok = os.getenv("SHAI_GITEA_TOKEN", "").strip()
- usr = os.getenv("SHAI_GITEA_USER", "").strip()
+ tok = os.getenv("GITEA_TOKEN", "").strip()
+ usr = os.getenv("GITEA_USER", "").strip()
if tok and usr:
import base64
b64 = base64.b64encode(f"{usr}:{tok}".encode()).decode()
@@ -102,6 +101,7 @@ def _auth_headers_from_cfg(r):
return {}
async def _http_json(url: str, headers: dict, timeout_sec: int = 10):
+ import aiohttp
timeout = aiohttp.ClientTimeout(total=timeout_sec)
async with aiohttp.ClientSession(timeout=timeout, headers=headers or {}) as sess:
async with sess.get(url) as resp:
@@ -121,6 +121,7 @@ async def _fetch_latest_commit(api_base: str, owner: str, repo: str, branch: str
/api/v1/repos/{owner}/{repo}/commits?sha=main&stat=false&verification=false&files=false&limit=1
If branch is falsy, omit 'sha' to use server default.
"""
+ from urllib.parse import urlencode
params = {
"stat": "false",
"verification": "false",
@@ -181,7 +182,6 @@ async def post_boot_notice(bot):
except Exception as e:
print(f"[boot_notice] wait_until_ready failed: {e}")
-
for guild in bot.guilds:
print(f' - {guild.name} (id: {guild.id})')
@@ -201,9 +201,9 @@ async def post_boot_notice(bot):
print(f"[boot_notice] channel id {modlog_channel_id} not found; skipping.")
return
- repo_url = r.get('repo_url', '') # SHAI_REPO_URL
- branch = r.get('repo_branch', 'main') or None # SHAI_REPO_BRANCH (optional)
- check_time_utc = r.get('check_time_utc', '') # SHAI_CHECK_TIME_UTC (optional)
+ repo_url = r.get('repo_url', '')
+ branch = r.get('repo_branch', 'main') or None
+ check_time_utc = r.get('check_time_utc', '')
headers = _auth_headers_from_cfg(r)
api_base = owner = repo = None
@@ -254,7 +254,7 @@ async def post_boot_notice(bot):
# Build + post status line
status_line = _format_status_line(reason, prev_ver, curr_ver)
- # NEW: If no version change (manual/scheduled), append the running version to the status line,
+ # If no version change (manual/scheduled), append the running version to the status line,
# and DO NOT post the commit message separately.
append_version_only = reason in ("manual", "scheduled")
if append_version_only and curr_ver:
diff --git a/modules/common/settings.py b/modules/common/settings.py
index df2df78..c58ffe5 100644
--- a/modules/common/settings.py
+++ b/modules/common/settings.py
@@ -1,68 +1,453 @@
# modules/common/settings.py
import os
-from typing import Any, Dict, Iterable, Optional
+import json
+import shutil
+import threading
+import re
+from datetime import datetime
+from urllib.parse import urlparse
+from typing import Any, Dict, Iterable, Optional, List
+# =========================
+# Public API Exceptions
+# =========================
+
+class ValidationError(Exception):
+ pass
+
+
+# =========================
+# Helpers
+# =========================
def _clean(s: Optional[str]) -> str:
s = (s or "").strip()
- # strip accidental quotes Portainer sometimes adds
if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
s = s[1:-1].strip()
return s
+def _is_intish(x: Any) -> bool:
+ try:
+ int(str(x).strip())
+ return True
+ except Exception:
+ return False
-def _collect_shai_env() -> Dict[str, str]:
- """
- Build a {key_without_prefix_lower: cleaned_value} mapping
- from all environment variables that start with SHAI_.
- """
- out: Dict[str, str] = {}
- for k, v in os.environ.items():
- if not k.startswith("SHAI_"):
- continue
- key = k[5:].lower() # SHAI_MOD_CHANNEL_ID -> mod_channel_id
- out[key] = _clean(v)
- return out
+def _to_bool(x: Any) -> bool:
+ s = str(x).strip().lower()
+ if s in ("1", "true", "yes", "on", "y", "t"):
+ return True
+ if s in ("0", "false", "no", "off", "n", "f"):
+ return False
+ raise ValidationError(f"Expected a boolean, got {x!r}")
+def _to_int(x: Any) -> int:
+ if _is_intish(x):
+ return int(str(x).strip())
+ raise ValidationError(f"Expected an integer, got {x!r}")
+
+def _to_float(x: Any) -> float:
+ try:
+ return float(str(x).strip())
+ except Exception:
+ raise ValidationError(f"Expected a float, got {x!r}")
+
+def _to_str(x: Any) -> str:
+ return str(x)
+
+def _to_list_int(x: Any) -> List[int]:
+ if isinstance(x, list):
+ out = []
+ for v in x:
+ if not _is_intish(v):
+ raise ValidationError(f"List must contain integers; got {v!r}")
+ out.append(int(v))
+ return out
+ if isinstance(x, str):
+ toks = [t.strip() for t in x.split(",") if t.strip()]
+ try:
+ return [int(t) for t in toks]
+ except Exception:
+ raise ValidationError(f"Could not parse list of integers from {x!r}")
+ raise ValidationError(f"Expected a list of integers, got {type(x).__name__}")
+
+# ---- URL / Time / Date validators ----
+
+def _to_url(x: Any) -> str:
+ s = str(x).strip()
+ if not s:
+ raise ValidationError("URL cannot be empty")
+ p = urlparse(s)
+ if p.scheme not in ("http", "https"):
+ raise ValidationError("URL must start with http:// or https://")
+ if not p.netloc:
+ raise ValidationError("URL missing host")
+ if not re.match(r"^([A-Za-z0-9\-.]+|\d{1,3}(?:\.\d{1,3}){3}|localhost)(:\d+)?$", p.netloc):
+ raise ValidationError("URL host looks invalid")
+ return s
+
+_TIME_24H_RE = re.compile(r"^(?P[01]?\d|2[0-3]):(?P[0-5]\d)(?::(?P[0-5]\d))?$")
+
+def _to_time_24h(x: Any) -> str:
+ s = str(x).strip()
+ m = _TIME_24H_RE.match(s)
+ if not m:
+ raise ValidationError("Time must be HH:MM or HH:MM:SS (24-hour)")
+ hh = int(m.group("h"))
+ mm = int(m.group("m"))
+ # canonical store as HH:MM
+ return f"{hh:02d}:{mm:02d}"
+
+def _expand_two_digit_year(two_digit: int, pivot_year: int) -> int:
+ # Map YY -> same century as pivot_year
+ base = pivot_year - (pivot_year % 100)
+ return base + two_digit
+
+def _to_date_ymd(x: Any, *, pivot_year: int | None = None) -> str:
+ s = str(x).strip()
+ if not s:
+ raise ValidationError("Date cannot be empty")
+ m = re.match(r"^(?P\d{2}|\d{4})-(?P\d{1,2})-(?P\d{1,2})$", s)
+ if not m:
+ raise ValidationError("Expected date format YYYY-MM-DD or YY-MM-DD")
+ y = m.group("y")
+ month = int(m.group("m"))
+ day = int(m.group("d"))
+ if len(y) == 2:
+ yy = int(y)
+ if pivot_year is not None:
+ year = _expand_two_digit_year(yy, pivot_year)
+ dt = datetime(year, month, day)
+ return dt.strftime("%Y-%m-%d")
+ else:
+ dt = datetime.strptime(s, "%y-%m-%d")
+ return dt.strftime("%Y-%m-%d")
+ year = int(y)
+ dt = datetime(year, month, day)
+ return dt.strftime("%Y-%m-%d")
+
+def _to_date_dmy(x: Any, *, pivot_year: int | None = None) -> str:
+ s = str(x).strip()
+ if not s:
+ raise ValidationError("Date cannot be empty")
+ m = re.match(r"^(?P\d{1,2})-(?P\d{1,2})-(?P\d{2}|\d{4})$", s)
+ if not m:
+ raise ValidationError("Expected date format DD-MM-YYYY or DD-MM-YY")
+ day = int(m.group("d"))
+ month = int(m.group("m"))
+ y = m.group("y")
+ if len(y) == 2:
+ yy = int(y)
+ if pivot_year is not None:
+ year = _expand_two_digit_year(yy, pivot_year)
+ dt = datetime(year, month, day)
+ return dt.strftime("%d-%m-%Y")
+ else:
+ dt = datetime.strptime(s, "%d-%m-%y")
+ return dt.strftime("%d-%m-%Y")
+ year = int(y)
+ dt = datetime(year, month, day)
+ return dt.strftime("%d-%m-%Y")
+
+
+# =========================
+# Schema (non-sensitive, front-end editable)
+# =========================
+
+SETTINGS_SCHEMA: Dict[str, Dict[str, Any]] = {
+ # Channels (IDs)
+ "mod_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Moderation command channel."},
+ "modlog_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "ModLog channel."},
+ "pirates_list_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Pirates list channel."},
+ "dd_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Deep Desert updates channel."},
+ "report_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Reports/approvals channel."},
+ "userslist_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Users list channel."},
+ "trigger_channel_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Trigger channel for Auto VC."},
+
+ # Roles (IDs)
+ "rules_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Rules-agreed role ID."},
+ "moderator_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Moderator role ID."},
+ "intel_mod_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Intel mod role ID."},
+ "full_access_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Full Access role ID."},
+ "field_mod_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Field mod role ID."},
+ "engagement_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Engagement role ID."},
+ "admin_role_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Admin role ID."},
+
+ # Message IDs
+ "rules_message_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Rules message ID."},
+ "engagement_message_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Engagement message ID."},
+ "nickname_message_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Nickname message ID."},
+
+ # Emojis (IDs)
+ "emoji_carrier_crawler_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: carrier/crawler."},
+ "emoji_melange_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: melange."},
+ "emoji_sand_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: sand."},
+ "emoji_harvester_crew": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: harvester crew"},
+ "emoji_escort_crew": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: escort crew"},
+ "emoji_fedaykin": {"type": "int", "default": 0, "nonzero": True, "desc": "Custom emoji: fedaykin - kill squad"},
+
+ # Auto-VC
+ "auto_vc_category_id": {"type": "int", "default": 0, "nonzero": True, "desc": "Category to host Auto-VCs."},
+ "auto_vc_cleanup_delay": {"type": "int", "default": 30, "nonzero": True, "desc": "Seconds before empty Auto-VCs are cleaned up."},
+ "vc_name_prefix": {"type": "str", "default": "DD Crew", "desc": "Auto-VC name prefix."},
+
+ # Pirates / threat model
+ "threat_group_threshold": {"type": "int", "default": 3, "desc": "Threshold for group classification."},
+ "threat_min_samples_for_stats": {"type": "int", "default": 3, "desc": "Min samples for stats."},
+ "threat_w_destruction": {"type": "float", "default": 0.40, "desc": "Weight: destruction."},
+ "threat_w_group": {"type": "float", "default": 0.20, "desc": "Weight: group."},
+ "threat_w_kill": {"type": "float", "default": 0.30, "desc": "Weight: kill."},
+ "threat_w_skill": {"type": "float", "default": 0.10, "desc": "Weight: skill."},
+
+ # SpicePay
+ "spicepay_base_weight": {"type": "float", "default": 25.0, "desc": "Base weight."},
+ "spicepay_carrier_bonus": {"type": "float", "default": 12.5, "desc": "Carrier bonus."},
+ "spicepay_crawler_bonus": {"type": "float", "default": 12.5, "desc": "Crawler bonus."},
+ "spicepay_lsr_cut_percent": {"type": "float", "default": 10.0, "desc": "SR cut percent."},
+
+ # Jobs / loops
+ "user_cards_cron_enabled": {"type": "bool", "default": True, "desc": "Enable user-cards cron."},
+ "nick_nudge_loop_enabled": {"type": "bool", "default": False, "desc": "Enable nick-nudge loop."},
+
+ # Deep Desert fetcher
+ "dd_fetcher": {"type": "str", "default": "playwright", "allowed": ["playwright","requests"], "desc": "Fetcher backend."},
+ "dd_pw_timeout_ms": {"type": "int", "default": 60000, "desc": "Playwright timeout (ms)."},
+ "dd_pw_wait_ms": {"type": "int", "default": 0, "desc": "Extra wait after navigation (ms)."},
+
+ # Repo (non-secret)
+ "repo_url": {"type": "url",
+ "default": "https://git.rolfsvaag.no/frarol96/shaiwatcher",
+ "desc": "Repository URL."},
+ "repo_branch": {"type": "str", "default": "main", "desc": "Repository branch."},
+ "repo_rss": {"type": "url",
+ "default": "https://git.rolfsvaag.no/frarol96/shaiwatcher.rss",
+ "desc": "Repository RSS feed."},
+
+ # Admin allow-list for /shaiadmin (besides owner)
+ "admin_user_ids": {"type": "list[int]", "default": [], "desc": "User IDs allowed to use /shaiadmin."},
+ "admin_role_ids": {"type": "list[int]", "default": [], "desc": "Role IDs allowed to use /shaiadmin."},
+
+ # Misc
+ "check_time_utc": {"type": "time_24h", "default": "03:00", "desc": "Daily check time (UTC HH:MM)"},
+ "ignore_test_level": {"type": "int", "default": 0, "desc": "Test-level ignore flag."},
+ "lang": {"type": "str", "default": "C.UTF-8", "desc": "Locale (if referenced)."},
+ # Examples of date keys you may enable later:
+ # "feature_window_start": {"type": "date_ymd", "default": "", "allow_empty": True, "pivot_year": 2000, "desc": "Start date (YYYY-MM-DD or YY-MM-DD)."},
+ # "event_date_dmy": {"type": "date_dmy", "default": "", "allow_empty": True, "pivot_year": 2000, "desc": "Event date (DD-MM-YYYY or DD-MM-YY)."},
+}
+
+# =========================
+# Env — ONLY the allowed set (plus legacy HOME_GUILD_ID alias)
+# =========================
+
+def _allowed_env_map() -> Dict[str, str]:
+ env: Dict[str, str] = {}
+ if os.getenv("DISCORD_TOKEN"):
+ env["discord_token"] = _clean(os.getenv("DISCORD_TOKEN"))
+ data_file = os.getenv("DATA_FILE")
+ if data_file:
+ env["data_file"] = _clean(data_file)
+ if os.getenv("DOCS_HOST_IP"):
+ env["docs_host_ip"] = _clean(os.getenv("DOCS_HOST_IP"))
+ if os.getenv("DOCS_HOST_PORT"):
+ env["docs_host_port"] = _clean(os.getenv("DOCS_HOST_PORT"))
+ if os.getenv("HOME_GUILD_ID"):
+ env["home_guild_id"] = _clean(os.getenv("HOME_GUILD_ID"))
+ if os.getenv("REPO_AHTOKEN"):
+ env["repo_ahtoken"] = _clean(os.getenv("REPO_AHTOKEN"))
+ return env
+
+# =========================
+# On-disk store + globals
+# =========================
+
+_SETTINGS_LOCK = threading.Lock()
+_FILE_MAP: Dict[str, Any] = {}
+_ENV_MAP: Dict[str, str] = {}
+_SETTINGS_FILE: Optional[str] = None
+
+def settings_path() -> str:
+ """Place settings.json next to DATA_FILE if available; otherwise default to ./data/settings.json."""
+ data_file = os.getenv("DATA_FILE")
+ if data_file:
+ base = os.path.dirname(data_file) or "."
+ return os.path.join(base, "settings.json")
+ return "./data/settings.json"
+
+def _ensure_loaded():
+ global _SETTINGS_FILE, _FILE_MAP, _ENV_MAP
+ with _SETTINGS_LOCK:
+ if _SETTINGS_FILE is not None:
+ return
+ _SETTINGS_FILE = settings_path()
+ _ENV_MAP = _allowed_env_map()
+
+ if os.path.exists(_SETTINGS_FILE):
+ try:
+ with open(_SETTINGS_FILE, "r", encoding="utf-8") as f:
+ _FILE_MAP = json.load(f) or {}
+ except Exception:
+ _FILE_MAP = {}
+ else:
+ _FILE_MAP = {}
+ _save_locked()
+
+ changed = False
+ for key, meta in SETTINGS_SCHEMA.items():
+ if key not in _FILE_MAP:
+ _FILE_MAP[key] = meta.get("default")
+ changed = True
+ if changed:
+ _save_locked()
+
+def _save_locked():
+ global _SETTINGS_FILE, _FILE_MAP
+ path = _SETTINGS_FILE or settings_path()
+ os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
+ tmp = path + ".tmp"
+ with open(tmp, "w", encoding="utf-8") as f:
+ json.dump(_FILE_MAP, f, indent=2, ensure_ascii=False)
+ if os.path.exists(path):
+ try:
+ shutil.copy2(path, path + ".bak")
+ except Exception:
+ pass
+ os.replace(tmp, path)
+
+def settings_get_all() -> Dict[str, Any]:
+ _ensure_loaded()
+ with _SETTINGS_LOCK:
+ return dict(_FILE_MAP)
+
+def _cast_value(name: str, raw: Any, *, enforce_nonzero: bool = True) -> Any:
+ meta = SETTINGS_SCHEMA.get(name)
+ if not meta:
+ raise ValidationError(f"Unknown setting: {name}")
+
+ allow_empty = bool(meta.get("allow_empty", False))
+ t = meta.get("type")
+
+ if t in ("str", "url", "time_24h", "date_ymd", "date_dmy"):
+ s = str(raw).strip()
+ if s == "" and allow_empty:
+ val = ""
+ else:
+ if t == "str":
+ val = _to_str(raw)
+ elif t == "url":
+ val = _to_url(raw)
+ elif t == "time_24h":
+ val = _to_time_24h(raw)
+ elif t == "date_ymd":
+ val = _to_date_ymd(raw, pivot_year=meta.get("pivot_year"))
+ elif t == "date_dmy":
+ val = _to_date_dmy(raw, pivot_year=meta.get("pivot_year"))
+ elif t == "bool":
+ val = _to_bool(raw)
+ elif t == "int":
+ val = _to_int(raw)
+ elif t == "float":
+ val = _to_float(raw)
+ elif t == "list[int]":
+ val = _to_list_int(raw)
+ else:
+ raise ValidationError(f"Unsupported type for {name}: {t}")
+
+ # enum constraint (always enforced)
+ if "allowed" in meta:
+ allowed = meta["allowed"]
+ if val not in allowed:
+ raise ValidationError(f"`{name}` must be one of {allowed}, got {val!r}")
+
+ # nonzero constraint (skippable for bulk uploads)
+ if enforce_nonzero and meta.get("nonzero") and isinstance(val, int) and val == 0:
+ raise ValidationError(f"`{name}` must be a non-zero integer.")
+
+ return val
+
+def settings_set(name: str, raw_value: Any) -> bool:
+ _ensure_loaded()
+ with _SETTINGS_LOCK:
+ name = name.lower().strip()
+ if name not in SETTINGS_SCHEMA:
+ raise ValidationError(f"Unknown setting: {name}")
+ new_val = _cast_value(name, raw_value)
+ old_val = _FILE_MAP.get(name, SETTINGS_SCHEMA[name].get("default"))
+ if old_val == new_val:
+ return False
+ _FILE_MAP[name] = new_val
+ _save_locked()
+ return True
+
+def settings_reset(name: str) -> None:
+ _ensure_loaded()
+ with _SETTINGS_LOCK:
+ name = name.lower().strip()
+ if name not in SETTINGS_SCHEMA:
+ raise ValidationError(f"Unknown setting: {name}")
+ _FILE_MAP[name] = SETTINGS_SCHEMA[name].get("default")
+ _save_locked()
+
+def settings_import_bulk(obj: Dict[str, Any]) -> List[str]:
+ _ensure_loaded()
+ if not isinstance(obj, dict):
+ raise ValidationError("Uploaded JSON must be an object/dict at the top level.")
+ new_map: Dict[str, Any] = dict(_FILE_MAP)
+ changed: List[str] = []
+ for k, v in obj.items():
+ if k not in SETTINGS_SCHEMA:
+ raise ValidationError(f"Unknown setting in upload: {k}")
+ # Allow 0 for keys marked nonzero during bulk import (treating as 'unset' sentinel)
+ new_val = _cast_value(k, v, enforce_nonzero=False)
+ if new_map.get(k) != new_val:
+ new_map[k] = new_val
+ changed.append(k)
+ with _SETTINGS_LOCK:
+ if changed:
+ _FILE_MAP.update({k: new_map[k] for k in changed})
+ _save_locked()
+ return changed
+
+# =========================
+# Unified read view (keeps cfg(bot) contract)
+# =========================
class ConfigView:
"""
- Unified config view.
- - Primary: SHAI_* envs (prefix removed, lowercased keys)
- - Secondary: bot.config['DEFAULT'] (if present)
- - Helpers: get/int/bool/float/list
- - Can mirror values back into os.environ as SHAI_* (opt-in)
+ Reads:
+ - Schema-managed keys from settings.json
+ - Env: discord_token, data_file, docs_host_ip, docs_host_port, home_guild_id
+ - Fallback to bot.config['DEFAULT'] for anything else (legacy)
+ Helpers: get/int/bool/float/list, to_dict()
"""
- def __init__(self, bot=None, *, mirror_to_env: bool = False):
- self._env_map = _collect_shai_env()
-
- # Optional: also look into bot.config['DEFAULT'] as a fallback
- self._default: Dict[str, Any] = {}
+ def __init__(self, bot=None):
+ _ensure_loaded()
+ self._env_map = dict(_ENV_MAP)
try:
self._default = (getattr(bot, "config", {}) or {}).get("DEFAULT", {}) or {}
except Exception:
self._default = {}
- if mirror_to_env:
- # Ensure os.environ has SHAI_* for everything we know (don’t clobber existing non-empty)
- for k, v in self._env_map.items():
- env_key = f"SHAI_{k.upper()}"
- if not os.environ.get(env_key):
- os.environ[env_key] = v
+ def _effective_map(self) -> Dict[str, str]:
+ merged: Dict[str, str] = {}
+ # defaults first
+ for k in getattr(self._default, "keys", lambda: [])():
+ merged[k] = _clean(str(self._default.get(k, "")))
+ # env overlay
+ for k, v in self._env_map.items():
+ merged[k] = _clean(v)
+ # schema values overlay defaults
+ for k, meta in SETTINGS_SCHEMA.items():
+ v = _FILE_MAP.get(k, meta.get("default"))
+ if isinstance(v, (list, dict)):
+ merged[k] = json.dumps(v, ensure_ascii=False)
+ else:
+ merged[k] = _clean(str(v))
+ return merged
- # ---- core accessors ----
def get(self, key: str, default: str = "") -> str:
- key = key.lower()
- if key in self._env_map:
- v = _clean(self._env_map[key])
- return v if v != "" else default
-
- # Fallback to DEFAULT mapping (ConfigParser-like or our shim)
- try:
- v = self._default.get(key, "")
- except Exception:
- v = ""
- v = _clean(str(v))
+ m = self._effective_map()
+ v = _clean(m.get(key.lower(), ""))
return v if v != "" else default
def int(self, key: str, default: int = 0) -> int:
@@ -97,23 +482,9 @@ class ConfigView:
parts = [p.strip() for p in s.split(sep)]
return [p for p in parts if p]
- # expose the resolved map if you ever want to dump it for debug
def to_dict(self) -> Dict[str, str]:
- d = dict(self._env_map)
- # Include defaults that aren’t already in env_map
- for k in getattr(self._default, "keys", lambda: [])():
- d.setdefault(k, _clean(str(self._default.get(k, ""))))
- return d
+ return dict(self._effective_map())
-def cfg(bot=None, *, mirror_to_env: bool = False) -> ConfigView:
- """
- Usage in cogs:
- r = cfg(bot)
- trigger_id = r.int('trigger_channel_id', 0)
- prefix = r.get('vc_name_prefix', 'Room')
-
- If you want to also ensure SHAI_* are present in os.environ at runtime:
- r = cfg(bot, mirror_to_env=True)
- """
- return ConfigView(bot, mirror_to_env=mirror_to_env)
+def cfg(bot=None) -> ConfigView:
+ return ConfigView(bot)