# modules/docs_site/docs_site.py import json import threading import traceback import os import re import time import importlib from pathlib import Path from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse from typing import Any, Dict, List, Optional, Tuple import discord from discord.ext import commands from discord import app_commands from modules.common.settings import cfg _START_TS = time.time() # ============================= # Safe JSON helpers # ============================= _PRIMITIVES = (str, int, float, bool, type(None)) def _to_primitive(obj: Any, depth: int = 0) -> Any: if depth > 6: return str(obj) if isinstance(obj, _PRIMITIVES): return obj if isinstance(obj, (list, tuple, set)): return [_to_primitive(x, depth + 1) for x in obj] if isinstance(obj, dict): out = {} for k, v in obj.items(): try: out[str(k)] = _to_primitive(v, depth + 1) except Exception: out[str(k)] = str(v) return out if hasattr(obj, "value"): try: return _to_primitive(getattr(obj, "value"), depth + 1) except Exception: pass return str(obj) def _json_dumps_safe(payload: Any) -> bytes: return json.dumps(_to_primitive(payload), ensure_ascii=False, separators=(",", ":")).encode("utf-8") # ============================= # Version / uptime helpers # ============================= def _project_root() -> Path: return Path(__file__).resolve().parents[2] def _read_version_from_file() -> Optional[str]: """Strictly parse VERSION from bot.py: VERSION = "x.y.z".""" try: bot_py = _project_root() / "bot.py" if not bot_py.is_file(): return None text = bot_py.read_text(encoding="utf-8", errors="ignore") m = re.search(r'^\s*VERSION\s*=\s*["\']([^"\']+)["\']', text, re.M) if m: return m.group(1) except Exception: traceback.print_exc() return None def _get_version_from_botpy() -> Optional[str]: try: m = importlib.import_module("bot") v = getattr(m, "VERSION", None) if isinstance(v, (str, int, float)): return str(v) except Exception: pass return None def _get_boot_state(bot: commands.Bot) -> Dict[str, Any]: dm = getattr(bot, "data_manager", None) if dm and dm.get("boot_state"): try: return (dm.get("boot_state") or [{}])[-1] except Exception: return {} return {} def _status_payload(bot: commands.Bot) -> Dict[str, Any]: st = _get_boot_state(bot) now = time.time() boot_ts = float(st.get("last_boot_ts", 0.0)) if st else 0.0 if boot_ts > 0: uptime = max(0, int(now - boot_ts)) else: uptime = max(0, int(now - _START_TS)) ver = st.get("last_version") if st else None if not ver: ver = _read_version_from_file() or _get_version_from_botpy() or os.getenv("SHAI_VERSION", "unknown") return {"uptime_seconds": uptime, "version": str(ver)} # ============================= # Mod detection (heuristics) # ============================= def _looks_like_mod_check(fn) -> bool: try: qn = getattr(fn, "__qualname__", "") or "" mod_names = ( "is_moderator_member", "is_moderator_userid", "require_mod_ctx", "require_mod_interaction", ) if any(m in qn for m in mod_names): return True mod = getattr(fn, "__module__", "") or "" if "mod_perms" in mod: return True except Exception: pass return False def _is_perm_check(fn) -> Tuple[bool, List[str]]: perms = [] try: code = getattr(fn, "__code__", None) if code and code.co_freevars and getattr(fn, "__closure__", None): for cell in fn.__closure__ or []: val = getattr(cell, "cell_contents", None) if isinstance(val, dict): for k, v in val.items(): if v: perms.append(str(k)) except Exception: pass return (len(perms) > 0), perms # ============================= # Introspection # ============================= def _bot_prefix(bot: commands.Bot) -> str: p = getattr(bot, "command_prefix", "!") try: if callable(p): val = p(bot, None) if isinstance(val, (list, tuple)) and val: return str(val[0]) return str(val) return str(p) except Exception: return "!" def _is_mod_command_prefix(cmd: commands.Command) -> Tuple[bool, List[str]]: is_mod = False perms: List[str] = [] try: for chk in getattr(cmd, "checks", []) or []: if _looks_like_mod_check(chk): is_mod = True p_flag, p_list = _is_perm_check(chk) if p_flag: perms.extend(p_list) except Exception: pass return is_mod, sorted(set(perms)) def _is_mod_command_slash(cmd: app_commands.Command) -> Tuple[bool, List[str]]: is_mod = False try: for chk in getattr(cmd, "checks", []) or []: if _looks_like_mod_check(chk): is_mod = True for chk in getattr(cmd, "_checks", []) or []: if _looks_like_mod_check(chk): is_mod = True except Exception: pass return is_mod, [] def _command_usage_prefix(cmd: commands.Command, prefix: str) -> str: if cmd.usage: return cmd.usage try: params = [] for k, p in (cmd.clean_params or {}).items(): if getattr(p, "kind", None) and str(p.kind).lower().startswith("var"): params.append(f"[{k}...]") elif p.default is p.empty: params.append(f"<{k}>") else: params.append(f"[{k}]") return f"{prefix}{cmd.qualified_name}" + ((" " + " ".join(params)) if params else "") except Exception: return f"{prefix}{cmd.name}" def _command_usage_slash_like(cmd_name: str, options: Optional[List[Any]] = None) -> str: try: parts = [f"/{cmd_name}"] opts = [] seq = options or [] for opt in seq: n = getattr(opt, "name", None) or getattr(opt, "display_name", None) or "arg" req = bool(getattr(opt, "required", False)) opts.append(f"<{n}>" if req else f"[{n}]") if opts: parts.append(" " + " ".join(opts)) return "".join(parts) except Exception: return f"/{cmd_name}" def _command_usage_slash(cmd: app_commands.Command) -> str: try: options = getattr(cmd, "options", None) or getattr(cmd, "parameters", None) or getattr(cmd, "_params", None) return _command_usage_slash_like((cmd.name or "").replace("/", " "), options) except Exception: return f"/{cmd.name}" def _iter_all_app_commands(bot: commands.Bot): """Yield (scope_tag, top_level_command) including global and per-guild trees.""" out = [] # Global try: for cmd in bot.tree.get_commands(): out.append(("", cmd)) except Exception: pass # Per-guild for g in list(getattr(bot, "guilds", []) or []): try: cmds = bot.tree.get_commands(guild=g) except TypeError: try: cmds = bot.tree.get_commands(guild=discord.Object(id=g.id)) except Exception: cmds = [] except Exception: cmds = [] for cmd in cmds or []: out.append((str(g.id), cmd)) return out def _gather_prefix_and_hybrid(bot: commands.Bot) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] seen = set() prefix = _bot_prefix(bot) for cmd in getattr(bot, "commands", []) or []: try: if getattr(cmd, "hidden", False): continue is_hybrid = isinstance(cmd, commands.HybridCommand) ctype = "hybrid" if is_hybrid else "prefix" is_mod, perms = _is_mod_command_prefix(cmd) usage_slash = None if is_hybrid: try: app_cmd = getattr(cmd, "app_command", None) if isinstance(app_cmd, app_commands.Command): usage_slash = _command_usage_slash(app_cmd) s_mod, _ = _is_mod_command_slash(app_cmd) if s_mod: is_mod = True desc = (getattr(app_cmd, "description", "") or "") if "[mod]" in desc.lower(): is_mod = True except Exception: pass usage_prefix = _command_usage_prefix(cmd, prefix) row = { "type": ctype, "name": cmd.qualified_name, "display_name": cmd.qualified_name, # bare name "help": (cmd.help or "").strip(), "brief": (cmd.brief or "").strip(), "usage": usage_prefix if not is_hybrid else None, "usage_prefix": usage_prefix, "usage_slash": usage_slash, "cog": getattr(cmd.cog, "qualified_name", None) if getattr(cmd, "cog", None) else None, "module": getattr(getattr(cmd, "callback", None), "__module__", None), "moderator_only": bool(is_mod), "required_permissions": perms, } key = ("px", row["name"]) if key not in seen: rows.append(row) seen.add(key) except Exception: traceback.print_exc() continue return rows def _walk_app_tree(node: Any, prefix: str = "") -> List[Tuple[str, app_commands.Command]]: out: List[Tuple[str, app_commands.Command]] = [] if isinstance(node, app_commands.Command): out.append((f"{prefix}/{node.name}", node)) return out if isinstance(node, app_commands.Group): base = f"{prefix}/{node.name}" for sub in list(getattr(node, "commands", []) or []): out.extend(_walk_app_tree(sub, base)) return out def _safe_extras(obj: Any) -> Optional[Dict[str, Any]]: d = getattr(obj, "extras", None) if not d: return None if not isinstance(d, dict): return {"value": _to_primitive(d)} return _to_primitive(d) def _gather_slash(bot: commands.Bot) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] collected: List[Tuple[str, app_commands.Command, str]] = [] try: for scope, top in _iter_all_app_commands(bot): for path, leaf in _walk_app_tree(top, prefix=""): collected.append((scope, leaf, path)) except Exception: traceback.print_exc() seen_paths = set() for scope, leaf, path in collected: try: canon = path.lstrip("/") # power/restart if canon in seen_paths: continue seen_paths.add(canon) is_mod, perms = _is_mod_command_slash(leaf) binding = getattr(leaf, "binding", None) callback = getattr(leaf, "callback", None) display = canon.replace("/", " ") options = getattr(leaf, "options", None) or getattr(leaf, "parameters", None) or getattr(leaf, "_params", None) usage_full = _command_usage_slash_like(display, options) row = { "type": "slash", "name": "/" + canon, "display_name": "/" + display, "help": (getattr(leaf, "description", "") or "").strip(), "brief": "", "usage": usage_full, "usage_prefix": None, "usage_slash": usage_full, "cog": binding.__class__.__name__ if binding else None, "module": getattr(callback, "__module__", None) if callback else None, "moderator_only": bool(is_mod), "required_permissions": perms, "extras": _safe_extras(leaf), "dm_permission": getattr(leaf, "dm_permission", None), } rows.append(row) except Exception: traceback.print_exc() continue return rows # ============================= # Details loader & master JSON # ============================= _DETAILS_DIR = _project_root() / "assets" / "docs" / "commands" _MASTER_PATH = _DETAILS_DIR / "__commands__.json" _DOCS_CACHE: Dict[str, Any] = {"map": {}, "sig": ""} def _row_key_candidates(row: Dict[str, Any]) -> List[str]: c = str(row.get("cog") or "").strip() if row.get("type") == "slash": base = str(row.get("name", "")).lstrip("/").split("/")[-1] else: base = str(row.get("name", "")).split(" ")[0] keys = [] if c: keys.append(f"{c}.{base}") if row.get("type") == "slash": keys.append(str(row.get("name", "")).lstrip("/")) keys.append(base) return keys def _scan_md_files() -> Dict[str, Dict[str, Any]]: out: Dict[str, Dict[str, Any]] = {} if not _DETAILS_DIR.is_dir(): return out for p in _DETAILS_DIR.glob("*.md"): if p.name.startswith("_"): continue try: out[p.stem] = {"details_md": p.read_text(encoding="utf-8")} except Exception: traceback.print_exc() continue return out def _load_master_json() -> Dict[str, Dict[str, Any]]: if not _MASTER_PATH.is_file(): return {} try: raw_text = _MASTER_PATH.read_text(encoding="utf-8") if not raw_text.strip(): return {} raw_any = json.loads(raw_text) or {} out: Dict[str, Dict[str, Any]] = {} for k, v in raw_any.items(): if isinstance(v, dict): out[str(k)] = {kk: vv for kk, vv in v.items()} return out except Exception: traceback.print_exc() return {} def _write_master_json(mapping: Dict[str, Dict[str, Any]]) -> None: try: _DETAILS_DIR.mkdir(parents=True, exist_ok=True) with _MASTER_PATH.open("w", encoding="utf-8") as f: json.dump(mapping, f, ensure_ascii=False, indent=2) except Exception: traceback.print_exc() def _dir_signature() -> str: parts = [] if _DETAILS_DIR.is_dir(): for p in sorted(_DETAILS_DIR.glob("*")): try: parts.append(f"{p.name}:{int(p.stat().st_mtime)}") except Exception: continue try: if _MASTER_PATH.exists(): parts.append(f"__master__:{int(_MASTER_PATH.stat().st_mtime)}") except Exception: pass return "|".join(parts) def _load_external_docs() -> Dict[str, Dict[str, Any]]: sig = _dir_signature() if _DOCS_CACHE.get("sig") == sig: return _DOCS_CACHE["map"] master = _load_master_json() md_map = _scan_md_files() merged: Dict[str, Dict[str, Any]] = {k: dict(v) for k, v in master.items()} for k, v in md_map.items(): if k not in merged: merged[k] = {} merged[k]["details_md"] = v.get("details_md", "") _write_master_json(merged) _DOCS_CACHE["map"] = merged _DOCS_CACHE["sig"] = sig return merged def _augment_with_external_docs(rows: List[Dict[str, Any]]) -> None: mapping = _load_external_docs() for r in rows: if not r.get("details_md") and isinstance(r.get("extras"), dict): dm = r["extras"].get("details_md") if isinstance(dm, str) and dm.strip(): r["details_md"] = dm if not r.get("details_md"): for key in _row_key_candidates(r): if key in mapping and isinstance(mapping[key], dict): dm = mapping[key].get("details_md") if isinstance(dm, str) and dm.strip(): r["details_md"] = dm meta = {kk: vv for kk, vv in mapping[key].items() if kk != "details_md"} if meta: r["doc_meta"] = meta break # ============================= # Merge hybrids with their slash twins # ============================= def _merge_hybrid_slash(rows: List[Dict[str, Any]]) -> None: idx_by_hybrid: Dict[Tuple[str, str], int] = {} for i, r in enumerate(rows): if r.get("type") == "hybrid": cog = (r.get("cog") or "").strip() base = str(r.get("name") or "").split(" ")[-1].lower() idx_by_hybrid[(cog, base)] = i to_remove: List[int] = [] for i, r in enumerate(rows): if r.get("type") != "slash": continue base = str(r.get("name") or "").lstrip("/").split("/")[-1].lower() cog = (r.get("cog") or "").strip() key = (cog, base) hi = idx_by_hybrid.get(key) if hi is None: continue h = rows[hi] if not h.get("usage_slash") and r.get("usage_slash"): h["usage_slash"] = r["usage_slash"] if not h.get("help") and r.get("help"): h["help"] = r["help"] if r.get("moderator_only"): h["moderator_only"] = True if r.get("required_permissions"): h["required_permissions"] = sorted(set((h.get("required_permissions") or []) + r["required_permissions"])) if not h.get("extras") and r.get("extras"): h["extras"] = r["extras"] if r.get("details_md") and not h.get("details_md"): h["details_md"] = r["details_md"] to_remove.append(i) for i in sorted(to_remove, reverse=True): rows.pop(i) # ============================= # Schema builder # ============================= def build_command_schema(bot: commands.Bot) -> Dict[str, Any]: px = _gather_prefix_and_hybrid(bot) sl = _gather_slash(bot) all_rows = px + sl # Additional moderator checks & sanitization for row in all_rows: try: helptext = f"{row.get('help') or ''} {row.get('brief') or ''}" if "[mod]" in helptext.lower(): row["moderator_only"] = True except Exception: pass if row.get("required_permissions"): row["moderator_only"] = True try: ex = row.get("extras") or {} if isinstance(ex, dict) and str(ex.get("category", "")).lower() in {"mod", "moderator", "staff"}: row["moderator_only"] = True except Exception: pass _augment_with_external_docs(all_rows) _merge_hybrid_slash(all_rows) mods = [r for r in all_rows if r.get("moderator_only")] users = [r for r in all_rows if not r.get("moderator_only")] return { "title": "ShaiWatcher Commands", "count": len(all_rows), "sections": { "user": users, "moderator": mods, }, "all": all_rows, } # ============================= # Static asset serving # ============================= def _static_root() -> Path: return _project_root() / "assets" / "docs" def _guess_mime(p: Path) -> str: ext = p.suffix.lower() return { ".svg": "image/svg+xml; charset=utf-8", ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".webp": "image/webp", ".gif": "image/gif", ".css": "text/css; charset=utf-8", ".js": "application/javascript; charset=utf-8", ".json": "application/json; charset=utf-8", ".md": "text/markdown; charset=utf-8", ".txt": "text/plain; charset=utf-8", }.get(ext, "application/octet-stream") # ============================= # HTTP + UI # ============================= _HTML = """ __TITLE__

__TITLE__

Sections: User · Moderator · All
""" class _DocsHandler(BaseHTTPRequestHandler): bot: commands.Bot = None title: str = "ShaiWatcher Commands" force_ready: bool = False def _set(self, code=200, content_type="text/html; charset=utf-8"): self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Cache-Control", "no-store") self.end_headers() def log_message(self, fmt, *args): return def do_GET(self): try: ready = bool(self.force_ready) or (getattr(self, "bot", None) and self.bot.is_ready()) if not ready: self._set(503, "text/plain; charset=utf-8") self.wfile.write(b"warming up") return path = urlparse(self.path).path # Serve static assets from /assets/docs/* if path.startswith("/assets/docs/"): try: root = _static_root().resolve() rel = path[len("/assets/docs/"):] fs_path = (root / rel).resolve() try: if not fs_path.is_relative_to(root): raise ValueError("outside root") except AttributeError: if str(root) not in str(fs_path): raise ValueError("outside root") if fs_path.is_file(): mime = _guess_mime(fs_path) self.send_response(200) self.send_header("Content-Type", mime) self.send_header("Cache-Control", "public, max-age=86400, immutable") self.end_headers() with fs_path.open("rb") as f: self.wfile.write(f.read()) return self._set(404, "text/plain; charset=utf-8") self.wfile.write(b"not found") return except Exception: traceback.print_exc() self._set(500, "text/plain; charset=utf-8") self.wfile.write(b"internal error") return if path == "/": self._set() html = _HTML.replace("__TITLE__", self.title) try: schema = build_command_schema(self.bot) inline = json.dumps(_to_primitive(schema), ensure_ascii=False, separators=(",", ":")) html = html.replace("", f"") # Inject support link + visibility support_url = getattr(_DocsHandler, "support_url", "") or "" support_label = getattr(_DocsHandler, "support_label", "Buy me a ☕") vis = "block" if support_url else "none" html = (html .replace("__SUPPORT_URL__", support_url) .replace("__SUPPORT_LABEL__", support_label) .replace("__SUPPORT_VIS__", vis) ) # Fallback to values stored on the cog (we’ll wire them below in _start_server) support_url = support_url or getattr(_DocsHandler, "support_url", "") support_label = support_label or getattr(_DocsHandler, "support_label", "Buy me a ☕") vis = "block" if support_url else "none" html = (html .replace("__SUPPORT_URL__", support_url) .replace("__SUPPORT_LABEL__", support_label) .replace("__SUPPORT_VIS__", vis) ) except Exception: traceback.print_exc() self.wfile.write(html.encode("utf-8")) return if path == "/api/status": payload = _status_payload(self.bot) self._set(200, "application/json; charset=utf-8") self.wfile.write(_json_dumps_safe(payload)) return if path == "/healthz": self._set(200, "text/plain; charset=utf-8") self.wfile.write(b"ok") return if path == "/api/commands": schema = build_command_schema(self.bot) payload = _json_dumps_safe(schema) self._set(200, "application/json; charset=utf-8") self.wfile.write(payload) return self._set(404, "text/plain; charset=utf-8") self.wfile.write(b"not found") except Exception: traceback.print_exc() try: self._set(500, "text/plain; charset=utf-8") self.wfile.write(b"internal error") except Exception: pass def _start_server(bot: commands.Bot, host: str, port: int, title: str): _DocsHandler.bot = bot _DocsHandler.title = title _DocsHandler.support_url = getattr(bot, "docs_support_url", None) _DocsHandler.support_label = getattr(bot, "docs_support_label", None) _DocsHandler.force_ready = os.getenv("SHAI_OFFLINE", "").lower() in {"1", "true", "yes"} httpd = ThreadingHTTPServer((host, port), _DocsHandler) def _run(): try: httpd.serve_forever(poll_interval=0.5) except Exception: traceback.print_exc() t = threading.Thread(target=_run, name="DocsSite", daemon=True) t.start() print(f"[DocsSite] Listening on http://{host}:{port} (title='{title}', offline={_DocsHandler.force_ready})") class DocsSite(commands.Cog): """Tiny Swagger-like docs site for bot commands.""" def __init__(self, bot: commands.Bot): self.bot = bot r = cfg(bot) self.host = r.get("docs_host", "0.0.0.0") # SHAI_DOCS_HOST self.port = r.int("docs_port", 8910) # SHAI_DOCS_PORT self.title = r.get("docs_title", "ShaiWatcher Commands") # SHAI_DOCS_TITLE # Support link config self.support_url = r.get("docs_support_url", "https://throne.com/ookamikuntv/item/39590391-c582-4c5d-8795-fe6f1925eaae") self.support_label = r.get("docs_support_label", "Buy me a ☕") # Expose to handler via bot (read in _start_server) self.bot.docs_support_url = self.support_url self.bot.docs_support_label = self.support_label _start_server(self.bot, self.host, self.port, self.title) def force_ready(self, value: bool = True): _DocsHandler.force_ready = bool(value) async def setup(bot: commands.Bot): await bot.add_cog(DocsSite(bot))