shaiwatcher/modules/docs_site/docs_site.py
Franz Rolfsvaag 1ede582a76 0.4.2.0.a1
- DD cycle data fetching
  - ShaiWatcher will now keep an updated loot table of the unique items in the DD each week
    The bot will **only** edit its message if already present, which should reduce message spam
  - Added command `/dd_update` to control the update behaviour. stop|resume|start [reason_text]
- Docsite changes
  - Added "ADMIN" tags to commands, signifying owner-only commands
  - Owner-only commands are now filtered under the "moderator" category
  - Added docs for `/dd_update`
- Logging
  - Added logging info for more verbose info relating to configuration and installation
2025-08-16 06:39:01 +02:00

846 lines
30 KiB
Python

import json
import threading
import traceback
import os
import re
import time
import importlib
from pathlib import Path
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Tuple
import discord
from discord.ext import commands
from discord import app_commands
from modules.common.settings import cfg
# Wall-clock timestamp taken at module import; used as the uptime baseline
# when no positive boot timestamp is available from the bot's boot state.
_START_TS = time.time()
# =============================
# Safe JSON helpers
# =============================
# Types that json.dumps can emit without any conversion.
_PRIMITIVES = (str, int, float, bool, type(None))


def _to_primitive(obj: Any, depth: int = 0) -> Any:
    """Recursively coerce *obj* into JSON-friendly primitives.

    Lists, tuples and sets become lists; dict keys are stringified; objects
    exposing a ``value`` attribute are unwrapped through it; everything else
    (and anything nested deeper than 6 levels) falls back to ``str(obj)``.
    """
    if depth > 6:
        # Depth cap: stop recursing and stringify whatever is left.
        return str(obj)
    if isinstance(obj, _PRIMITIVES):
        return obj

    deeper = depth + 1
    if isinstance(obj, (list, tuple, set)):
        return [_to_primitive(item, deeper) for item in obj]

    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            try:
                converted[str(key)] = _to_primitive(value, deeper)
            except Exception:
                # Best effort: keep the key, degrade the value to text.
                converted[str(key)] = str(value)
        return converted

    if hasattr(obj, "value"):
        try:
            return _to_primitive(obj.value, deeper)
        except Exception:
            pass
    return str(obj)
def _json_dumps_safe(payload: Any) -> bytes:
    """Serialize *payload* to compact UTF-8 JSON after coercing it to primitives."""
    text = json.dumps(
        _to_primitive(payload),
        ensure_ascii=False,
        separators=(",", ":"),
    )
    return text.encode("utf-8")
# =============================
# Version / uptime helpers
# =============================
def _project_root() -> Path:
    """Return the third ancestor of this module's resolved file path."""
    this_file = Path(__file__).resolve()
    return this_file.parents[2]
def _read_version_from_file() -> Optional[str]:
    """Parse ``VERSION = "x.y.z"`` out of ``bot.py`` in the project root.

    Returns the quoted value of the first matching assignment, or ``None``
    when the file is missing, has no such line, or cannot be read (the
    error is printed, not raised).
    """
    try:
        candidate = _project_root() / "bot.py"
        if candidate.is_file():
            source = candidate.read_text(encoding="utf-8", errors="ignore")
            match = re.search(r'^\s*VERSION\s*=\s*["\']([^"\']+)["\']', source, re.M)
            if match:
                return match.group(1)
    except Exception:
        traceback.print_exc()
    return None
def _get_version_from_botpy() -> Optional[str]:
    """Import the ``bot`` module and return its ``VERSION`` as a string.

    Returns ``None`` when the import fails or ``VERSION`` is absent or is
    not a str/int/float.
    """
    try:
        version = getattr(importlib.import_module("bot"), "VERSION", None)
        return str(version) if isinstance(version, (str, int, float)) else None
    except Exception:
        return None
def _get_boot_state(bot: commands.Bot) -> Dict[str, Any]:
    """Return the last entry of the bot data manager's ``boot_state`` value.

    Yields ``{}`` when the bot has no ``data_manager``, when ``boot_state``
    is empty/falsy, or when the stored value cannot be indexed.
    """
    manager = getattr(bot, "data_manager", None)
    if not manager or not manager.get("boot_state"):
        return {}
    try:
        history = manager.get("boot_state") or [{}]
        return history[-1]
    except Exception:
        return {}
def _status_payload(bot: commands.Bot) -> Dict[str, Any]:
    """Build the ``{"uptime_seconds", "version"}`` payload for the status API.

    Uptime is measured from the boot state's ``last_boot_ts`` when positive,
    otherwise from module import time. The version comes from the boot
    state's ``last_version``, then ``bot.py`` on disk, then the imported
    ``bot`` module, then ``SHAI_VERSION`` (default ``"unknown"``).
    """
    state = _get_boot_state(bot)
    current = time.time()

    booted_at = float(state.get("last_boot_ts", 0.0)) if state else 0.0
    baseline = booted_at if booted_at > 0 else _START_TS
    uptime = max(0, int(current - baseline))

    version = state.get("last_version") if state else None
    version = version or (
        _read_version_from_file()
        or _get_version_from_botpy()
        or os.getenv("SHAI_VERSION", "unknown")
    )
    return {"uptime_seconds": uptime, "version": str(version)}
# =============================
# Mod detection (heuristics)
# =============================
def _looks_like_mod_check(fn) -> bool:
    """Heuristically decide whether *fn* is a moderator-permission check.

    True when the callable's ``__qualname__`` contains one of the known
    check names, or its ``__module__`` contains ``"mod_perms"``.
    """
    markers = (
        "is_moderator_member",
        "is_moderator_userid",
        "require_mod_ctx",
        "require_mod_interaction",
    )
    try:
        qualname = getattr(fn, "__qualname__", "") or ""
        for marker in markers:
            if marker in qualname:
                return True
        module_name = getattr(fn, "__module__", "") or ""
        return "mod_perms" in module_name
    except Exception:
        return False
def _is_perm_check(fn) -> Tuple[bool, List[str]]:
    """Extract permission names captured in *fn*'s closure.

    Any dict found in a closure cell is treated as a permission mapping and
    its truthy keys are collected. Returns ``(found_any, names)``.
    """
    found: List[str] = []
    try:
        code = getattr(fn, "__code__", None)
        if code and code.co_freevars and getattr(fn, "__closure__", None):
            for cell in fn.__closure__ or []:
                contents = getattr(cell, "cell_contents", None)
                if not isinstance(contents, dict):
                    continue
                found.extend(str(name) for name, enabled in contents.items() if enabled)
    except Exception:
        # Introspection is best effort; keep whatever was collected so far.
        pass
    return bool(found), found
# =============================
# Introspection
# =============================
def _bot_prefix(bot: commands.Bot) -> str:
    """Return one representative command prefix for display in usage strings.

    ``command_prefix`` may be a string, an iterable of strings, or a callable
    returning either. For iterables the first entry is used. Falls back to
    ``"!"`` when the prefix is missing, empty, produced asynchronously, or
    resolving it raises.
    """
    import inspect  # local import: not part of this file's import header

    fallback = "!"
    try:
        value = getattr(bot, "command_prefix", fallback)
        if callable(value):
            value = value(bot, None)
            if inspect.isawaitable(value):
                # An async prefix callable cannot be awaited from this
                # synchronous helper; close the coroutine to avoid a
                # "never awaited" warning and use the fallback.
                close = getattr(value, "close", None)
                if callable(close):
                    close()
                return fallback
        if isinstance(value, str):
            return value
        try:
            candidates = list(value)
        except TypeError:
            # Not iterable: keep the original stringification behaviour.
            return str(value)
        return str(candidates[0]) if candidates else fallback
    except Exception:
        return fallback
def _is_mod_command_prefix(cmd: commands.Command) -> Tuple[bool, List[str]]:
    """Inspect a prefix command's checks for moderator gating.

    Returns ``(is_mod, permissions)`` where ``permissions`` is the sorted,
    de-duplicated list of permission names found in check closures.
    """
    flagged = False
    collected: List[str] = []
    try:
        for check in getattr(cmd, "checks", []) or []:
            if _looks_like_mod_check(check):
                flagged = True
            has_perms, names = _is_perm_check(check)
            if has_perms:
                collected += names
    except Exception:
        pass
    return flagged, sorted(set(collected))
def _is_mod_command_slash(cmd: app_commands.Command) -> Tuple[bool, List[str]]:
    """Report whether any check on a slash command looks like a moderator check.

    Both the ``checks`` and ``_checks`` attributes are consulted. The
    permission list is always empty for slash commands.
    """
    try:
        for attr in ("checks", "_checks"):
            for check in getattr(cmd, attr, []) or []:
                if _looks_like_mod_check(check):
                    return True, []
    except Exception:
        pass
    return False, []
def _command_usage_prefix(cmd: commands.Command, prefix: str) -> str:
    """Render a usage string for a prefix command.

    An explicit ``cmd.usage`` is returned verbatim. Otherwise the string is
    ``<prefix><qualified name>`` followed by ``<required>``, ``[optional]``
    and ``[variadic...]`` parameters. If parameter introspection fails, the
    result is just ``<prefix><name>``.
    """
    if cmd.usage:
        return cmd.usage
    try:
        tokens = []
        for name, param in (cmd.clean_params or {}).items():
            kind = getattr(param, "kind", None)
            if kind and str(param.kind).lower().startswith("var"):
                tokens.append(f"[{name}...]")
            elif param.default is param.empty:
                tokens.append(f"<{name}>")
            else:
                tokens.append(f"[{name}]")
        suffix = (" " + " ".join(tokens)) if tokens else ""
        return f"{prefix}{cmd.qualified_name}" + suffix
    except Exception:
        return f"{prefix}{cmd.name}"
def _command_usage_slash_like(cmd_name: str, options: Optional[List[Any]] = None) -> str:
    """Render ``/<cmd_name>`` followed by its options.

    Each option is shown as ``<name>`` when its ``required`` attribute is
    truthy and ``[name]`` otherwise; the label comes from ``name``, then
    ``display_name``, then the literal ``"arg"``. Any failure yields the
    bare ``/<cmd_name>``.
    """
    try:
        rendered = []
        for option in options or []:
            label = getattr(option, "name", None) or getattr(option, "display_name", None) or "arg"
            if getattr(option, "required", False):
                rendered.append(f"<{label}>")
            else:
                rendered.append(f"[{label}]")
        if not rendered:
            return f"/{cmd_name}"
        return f"/{cmd_name}" + " " + " ".join(rendered)
    except Exception:
        return f"/{cmd_name}"
def _command_usage_slash(cmd: app_commands.Command) -> str:
    """Render a usage string for a slash command object.

    Options are taken from the first truthy attribute among ``options``,
    ``parameters`` and ``_params``; slashes in the name are shown as spaces.
    """
    try:
        options = None
        for attr in ("options", "parameters", "_params"):
            options = getattr(cmd, attr, None)
            if options:
                break
        shown_name = (cmd.name or "").replace("/", " ")
        return _command_usage_slash_like(shown_name, options)
    except Exception:
        return f"/{cmd.name}"
def _iter_all_app_commands(bot: commands.Bot):
    """Return ``(scope_tag, top_level_command)`` pairs from every command tree.

    Global commands carry an empty scope tag; guild commands carry the guild
    id as a string. Lookup failures for a tree are treated as "no commands".
    """
    pairs = []
    try:
        pairs.extend(("", cmd) for cmd in bot.tree.get_commands())
    except Exception:
        pass

    for guild in list(getattr(bot, "guilds", []) or []):
        try:
            guild_cmds = bot.tree.get_commands(guild=guild)
        except TypeError:
            # Retry with a bare snowflake object when the guild itself is rejected.
            try:
                guild_cmds = bot.tree.get_commands(guild=discord.Object(id=guild.id))
            except Exception:
                guild_cmds = []
        except Exception:
            guild_cmds = []
        pairs.extend((str(guild.id), cmd) for cmd in guild_cmds or [])
    return pairs
def _gather_prefix_and_hybrid(bot: commands.Bot) -> List[Dict[str, Any]]:
    """Build one schema row per visible prefix or hybrid command on the bot.

    Hidden commands are skipped. Hybrid commands also get a slash usage
    string and inherit moderator gating from their app-command twin. A
    command that raises during inspection is logged and left out.
    """
    rows: List[Dict[str, Any]] = []
    emitted = set()
    prefix = _bot_prefix(bot)

    for cmd in getattr(bot, "commands", []) or []:
        try:
            if getattr(cmd, "hidden", False):
                continue

            hybrid = isinstance(cmd, commands.HybridCommand)
            mod_only, perms = _is_mod_command_prefix(cmd)
            slash_usage = None

            if hybrid:
                try:
                    twin = getattr(cmd, "app_command", None)
                    if isinstance(twin, app_commands.Command):
                        slash_usage = _command_usage_slash(twin)
                        if _is_mod_command_slash(twin)[0]:
                            mod_only = True
                    # A "[mod]" marker in the twin's description also gates the command.
                    description = getattr(twin, "description", "") or ""
                    if "[mod]" in description.lower():
                        mod_only = True
                except Exception:
                    pass

            prefix_usage = _command_usage_prefix(cmd, prefix)
            qualified = cmd.qualified_name  # also the execution-counter key
            row = {
                "type": "hybrid" if hybrid else "prefix",
                "name": qualified,
                "display_name": qualified,
                "help": (cmd.help or "").strip(),
                "brief": (cmd.brief or "").strip(),
                "usage": None if hybrid else prefix_usage,
                "usage_prefix": prefix_usage,
                "usage_slash": slash_usage,
                "cog": getattr(cmd.cog, "qualified_name", None) if getattr(cmd, "cog", None) else None,
                "module": getattr(getattr(cmd, "callback", None), "__module__", None),
                "moderator_only": bool(mod_only),
                "admin_only": False,
                "required_permissions": perms,
                "counter_key": qualified,
                "exec_count": _cmd_counter(bot, qualified),
            }

            marker = ("px", row["name"])
            if marker in emitted:
                continue
            rows.append(row)
            emitted.add(marker)
        except Exception:
            traceback.print_exc()
            continue
    return rows
def _walk_app_tree(node: Any, prefix: str = "") -> List[Tuple[str, app_commands.Command]]:
    """Flatten an app-command tree into ``(path, leaf_command)`` pairs.

    Paths are slash-joined, e.g. ``/group/sub``. Nodes that are neither a
    command nor a group contribute nothing.
    """
    if isinstance(node, app_commands.Command):
        return [(f"{prefix}/{node.name}", node)]

    leaves: List[Tuple[str, app_commands.Command]] = []
    if isinstance(node, app_commands.Group):
        base = f"{prefix}/{node.name}"
        for child in list(getattr(node, "commands", []) or []):
            leaves += _walk_app_tree(child, base)
    return leaves
def _safe_extras(obj: Any) -> Optional[Dict[str, Any]]:
    """Return ``obj.extras`` as JSON-safe data, or ``None`` when empty/absent.

    Non-dict extras are wrapped as ``{"value": ...}``.
    """
    extras = getattr(obj, "extras", None)
    if not extras:
        return None
    primitive = _to_primitive(extras)
    return primitive if isinstance(extras, dict) else {"value": primitive}
def _gather_slash(bot: commands.Bot) -> List[Dict[str, Any]]:
    """Build one schema row per unique slash-command leaf across all trees.

    Leaves are de-duplicated by their slash-joined path, so a command
    registered both globally and per guild appears once. A leaf that raises
    during inspection is logged and left out.
    """
    leaves: List[Tuple[str, app_commands.Command, str]] = []
    try:
        for scope, top in _iter_all_app_commands(bot):
            leaves.extend((scope, leaf, path) for path, leaf in _walk_app_tree(top, prefix=""))
    except Exception:
        traceback.print_exc()

    rows: List[Dict[str, Any]] = []
    visited = set()
    for _scope, leaf, path in leaves:
        try:
            canon = path.lstrip("/")  # e.g. 'power/restart'
            if canon in visited:
                continue
            visited.add(canon)

            mod_only, perms = _is_mod_command_slash(leaf)
            binding = getattr(leaf, "binding", None)
            callback = getattr(leaf, "callback", None)
            display = canon.replace("/", " ")
            options = (
                getattr(leaf, "options", None)
                or getattr(leaf, "parameters", None)
                or getattr(leaf, "_params", None)
            )
            usage = _command_usage_slash_like(display, options)
            # Prefer the leaf's own qualified name as the counter key.
            counter_key = getattr(leaf, "qualified_name", None) or display

            rows.append({
                "type": "slash",
                "name": "/" + canon,
                "display_name": "/" + display,
                "help": (getattr(leaf, "description", "") or "").strip(),
                "brief": "",
                "usage": usage,
                "usage_prefix": None,
                "usage_slash": usage,
                "cog": binding.__class__.__name__ if binding else None,
                "module": getattr(callback, "__module__", None) if callback else None,
                "moderator_only": bool(mod_only),
                "admin_only": False,
                "required_permissions": perms,
                "extras": _safe_extras(leaf),
                "dm_permission": getattr(leaf, "dm_permission", None),
                "counter_key": counter_key,
                "exec_count": _cmd_counter(bot, counter_key),
            })
        except Exception:
            traceback.print_exc()
            continue
    return rows
def _cmd_counter(bot, qualified_name: str) -> int:
    """Look up the ``cmd::<qualified_name>`` counter on the bot's data manager.

    Returns 0 when the bot has no (truthy) ``data_manager``.
    """
    manager = getattr(bot, "data_manager", None)
    if not manager:
        return 0
    return manager.get_counter(f"cmd::{qualified_name}")
# =============================
# Details loader & master JSON
# =============================
# Directory scanned for per-command documentation files (.md / .html).
_DETAILS_DIR = _project_root() / "assets" / "docs" / "commands"
# JSON file holding the merged per-command docs mapping; read and rewritten
# by the loader below.
_MASTER_PATH = _DETAILS_DIR / "__commands__.json"
# In-process cache: "map" is the merged docs mapping, "sig" the directory
# signature it was built from.
_DOCS_CACHE: Dict[str, Any] = {"map": {}, "sig": ""}
def _row_key_candidates(row: Dict[str, Any]) -> List[str]:
    """List docs-lookup keys for a schema row, most specific first.

    Order: ``<Cog>.<base>`` (when the row has a cog), the full slash path
    without its leading slash (slash rows only), then the bare base name.
    The base is the last path segment for slash rows and the first word of
    the name otherwise.
    """
    cog = str(row.get("cog") or "").strip()
    name = str(row.get("name", ""))
    is_slash = row.get("type") == "slash"

    base = name.lstrip("/").split("/")[-1] if is_slash else name.split(" ")[0]

    candidates = [f"{cog}.{base}"] if cog else []
    if is_slash:
        candidates.append(name.lstrip("/"))
    candidates.append(base)
    return candidates
def _scan_doc_files(directory: Optional[Path] = None) -> Dict[str, Dict[str, Any]]:
    """Scan a docs directory for per-command documentation files.

    Recognised files (names starting with ``_`` are ignored):
      - <key>.md            -> details_md
      - <key>.html          -> details_html
      - <key>.brief.html    -> brief_html
      - <key>.details.html  -> details_html (wins over <key>.html)

    Args:
        directory: Directory to scan; defaults to ``_DETAILS_DIR``.

    Returns:
        Mapping of key -> {field: file text}. Unreadable files are logged
        and skipped. Empty when the directory does not exist.
    """
    root = _DETAILS_DIR if directory is None else Path(directory)
    out: Dict[str, Dict[str, Any]] = {}
    if not root.is_dir():
        return out

    def _put(key: str, field: str, text: str) -> None:
        out.setdefault(key, {})[field] = text

    # .md legacy
    for p in sorted(root.glob("*.md")):
        if p.name.startswith("_"):
            continue
        try:
            _put(p.stem, "details_md", p.read_text(encoding="utf-8"))
        except Exception:
            traceback.print_exc()

    # .html brief/details. Keys that received an explicit <key>.details.html
    # are remembered so a plain <key>.html can never overwrite them,
    # whatever order the directory listing returns the files in.
    explicit_details = set()
    for p in sorted(root.glob("*.html")):
        if p.name.startswith("_"):
            continue
        name = p.stem
        try:
            txt = p.read_text(encoding="utf-8")
        except Exception:
            traceback.print_exc()
            continue
        if name.endswith(".brief"):
            _put(name[: -len(".brief")], "brief_html", txt)
        elif name.endswith(".details"):
            key = name[: -len(".details")]
            _put(key, "details_html", txt)
            explicit_details.add(key)
        elif name not in explicit_details:
            _put(name, "details_html", txt)
    return out
def _load_master_json() -> Dict[str, Dict[str, Any]]:
    """Read the master docs JSON into a ``{key: {field: value}}`` mapping.

    Entries whose value is not a dict are dropped. A missing, blank or
    unparsable file yields ``{}`` (parse errors are printed, not raised).
    """
    if not _MASTER_PATH.is_file():
        return {}
    try:
        text = _MASTER_PATH.read_text(encoding="utf-8")
        if not text.strip():
            return {}
        parsed = json.loads(text) or {}
        return {
            str(key): dict(entry)
            for key, entry in parsed.items()
            if isinstance(entry, dict)
        }
    except Exception:
        traceback.print_exc()
        return {}
def _write_master_json(mapping: Dict[str, Dict[str, Any]]) -> None:
    """Persist *mapping* to the master docs JSON, creating the directory if needed.

    Failures are printed and swallowed; the caller is never interrupted.
    """
    try:
        _DETAILS_DIR.mkdir(parents=True, exist_ok=True)
        with _MASTER_PATH.open("w", encoding="utf-8") as handle:
            json.dump(mapping, handle, ensure_ascii=False, indent=2)
    except Exception:
        traceback.print_exc()
def _dir_signature() -> str:
    """Fingerprint the docs directory from entry names and whole-second mtimes.

    Entries that cannot be stat'ed are skipped. The master file's mtime is
    appended under the ``__master__`` tag when the file exists.
    """
    entries = []
    if _DETAILS_DIR.is_dir():
        for item in sorted(_DETAILS_DIR.glob("*")):
            try:
                stamp = int(item.stat().st_mtime)
            except Exception:
                continue
            entries.append(f"{item.name}:{stamp}")
    try:
        if _MASTER_PATH.exists():
            entries.append(f"__master__:{int(_MASTER_PATH.stat().st_mtime)}")
    except Exception:
        pass
    return "|".join(entries)
def _load_external_docs() -> Dict[str, Dict[str, Any]]:
    """Return the merged docs mapping (master JSON overlaid with scanned files).

    The result is cached against the docs directory signature. Fields found
    in scanned files override the same fields from the master JSON; the
    merged mapping is written back to the master file when it differs from
    what the master already contained.
    """
    sig = _dir_signature()
    if _DOCS_CACHE.get("sig") == sig:
        return _DOCS_CACHE["map"]

    master = _load_master_json()
    merged: Dict[str, Dict[str, Any]] = {k: dict(v) for k, v in master.items()}
    for key, fields in _scan_doc_files().items():
        merged.setdefault(key, {}).update(fields)

    if merged != master:
        _write_master_json(merged)
        # The write changes the master file's mtime, which is part of the
        # signature. Re-take it so our own write does not invalidate the
        # cache (and trigger another rewrite) on the very next call.
        sig = _dir_signature()

    _DOCS_CACHE["map"] = merged
    _DOCS_CACHE["sig"] = sig
    return merged
def _augment_with_external_docs(rows: List[Dict[str, Any]]) -> None:
    """Fill missing documentation fields on each row, in place.

    ``details_md`` is taken from the row's own ``extras`` first. Any of
    ``details_md`` / ``brief_html`` / ``details_html`` / ``doc_meta`` still
    missing are then copied from the external docs mapping, using the keys
    produced by ``_row_key_candidates``. Existing values are never replaced.
    """
    mapping = _load_external_docs()
    for r in rows:
        # Markdown embedded in the command's extras takes precedence.
        if not r.get("details_md") and isinstance(r.get("extras"), dict):
            dm = r["extras"].get("details_md")
            if isinstance(dm, str) and dm.strip():
                r["details_md"] = dm
        if not r.get("details_md") or not r.get("brief_html") or not r.get("details_html"):
            for key in _row_key_candidates(r):
                if key in mapping and isinstance(mapping[key], dict):
                    m = mapping[key]
                    if not r.get("details_md") and isinstance(m.get("details_md"), str):
                        r["details_md"] = m["details_md"]
                    if not r.get("brief_html") and isinstance(m.get("brief_html"), str):
                        r["brief_html"] = m["brief_html"]
                    if not r.get("details_html") and isinstance(m.get("details_html"), str):
                        r["details_html"] = m["details_html"]
                    if not r.get("doc_meta"):
                        # Everything that is not a docs body is exposed as metadata.
                        meta = {kk: vv for kk, vv in m.items() if kk not in {"details_md","brief_html","details_html"}}
                        if meta:
                            r["doc_meta"] = meta
                    # NOTE(review): the source's indentation was lost; this assumes the
                    # loop stops at the first candidate key present in the mapping —
                    # confirm against the original file.
                    break
# =============================
# Merge hybrids with their slash twins
# =============================
def _merge_hybrid_slash(rows: List[Dict[str, Any]]) -> None:
    """Fold each slash row into its hybrid twin and drop the slash row, in place.

    Twins are matched on ``(cog, lower-cased base name)``. The hybrid row
    keeps its own values and only picks up what it lacks (slash usage, help,
    extras, docs bodies). Moderator/admin flags and required permissions are
    unioned.

    Execution counts are summed only when the two rows use different
    ``counter_key`` values. With the same key, both ``exec_count`` values
    were read from the same counter, and adding them would double the total.
    """
    hybrid_index: Dict[Tuple[str, str], int] = {}
    for i, row in enumerate(rows):
        if row.get("type") == "hybrid":
            cog = (row.get("cog") or "").strip()
            base = str(row.get("name") or "").split(" ")[-1].lower()
            hybrid_index[(cog, base)] = i

    absorbed: List[int] = []
    for i, slash in enumerate(rows):
        if slash.get("type") != "slash":
            continue
        base = str(slash.get("name") or "").lstrip("/").split("/")[-1].lower()
        cog = (slash.get("cog") or "").strip()
        hi = hybrid_index.get((cog, base))
        if hi is None:
            continue
        hybrid = rows[hi]

        # Copy over fields the hybrid row is missing.
        for field in ("usage_slash", "help", "extras", "details_md", "brief_html", "details_html"):
            if not hybrid.get(field) and slash.get(field):
                hybrid[field] = slash[field]

        # Restrictions are sticky: either twin being gated gates the pair.
        if slash.get("moderator_only"):
            hybrid["moderator_only"] = True
        if slash.get("admin_only"):
            hybrid["admin_only"] = True
        if slash.get("required_permissions"):
            hybrid["required_permissions"] = sorted(
                set((hybrid.get("required_permissions") or []) + slash["required_permissions"])
            )

        hybrid_key = hybrid.get("counter_key")
        if hybrid_key is None or hybrid_key != slash.get("counter_key"):
            hybrid["exec_count"] = int(hybrid.get("exec_count", 0) or 0) + int(slash.get("exec_count", 0) or 0)

        absorbed.append(i)

    # Delete from the end so earlier indices stay valid.
    for i in sorted(absorbed, reverse=True):
        del rows[i]
# =============================
# Schema builder
# =============================
def build_command_schema(bot: commands.Bot) -> Dict[str, Any]:
    """Assemble the full command schema served by the docs site.

    Rows are gathered from prefix/hybrid and slash commands, flagged as
    moderator/admin from ``[mod]``/``[admin]`` markers, required permissions
    and the extras ``category``, enriched with external docs, merged
    (hybrid + slash twin) and sorted by display name. Admin-only rows are
    listed in the "moderator" section.
    """
    rows = _gather_prefix_and_hybrid(bot) + _gather_slash(bot)

    mod_categories = {"mod", "moderator", "staff"}
    admin_categories = {"admin", "administrator", "owner"}

    for row in rows:
        # Textual markers in help/brief.
        try:
            hint = f"{row.get('help') or ''} {row.get('brief') or ''}".lower()
            if "[mod]" in hint:
                row["moderator_only"] = True
            if "[admin]" in hint:
                row["admin_only"] = True
        except Exception:
            pass

        if row.get("required_permissions"):
            row["moderator_only"] = True

        # Explicit category declared through the command's extras.
        try:
            extras = row.get("extras") or {}
            if isinstance(extras, dict):
                category = str(extras.get("category", "")).lower()
                if category in mod_categories:
                    row["moderator_only"] = True
                if category in admin_categories:
                    row["admin_only"] = True
        except Exception:
            pass

    _augment_with_external_docs(rows)
    _merge_hybrid_slash(rows)

    # Alphabetical by command name (leading slash ignored), not by cog.
    rows.sort(key=lambda r: (r.get("display_name") or r.get("name") or "").lstrip("/").strip().lower())

    restricted: List[Dict[str, Any]] = []
    public: List[Dict[str, Any]] = []
    for row in rows:
        if row.get("moderator_only") or row.get("admin_only"):
            restricted.append(row)
        else:
            public.append(row)

    return {
        "title": "ShaiWatcher Commands",
        "count": len(rows),
        "sections": {"user": public, "moderator": restricted},
        "all": rows,
    }
# =============================
# Static asset serving
# =============================
def _static_root() -> Path:
    """Return the ``assets/docs`` directory under the project root."""
    return _project_root().joinpath("assets", "docs")
def _guess_mime(p: Path) -> str:
    """Map a file's (case-insensitive) suffix to a Content-Type value.

    Unknown suffixes are served as ``application/octet-stream``.
    """
    known = {
        ".svg": "image/svg+xml; charset=utf-8",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".gif": "image/gif",
        ".css": "text/css; charset=utf-8",
        ".js": "application/javascript; charset=utf-8",
        ".json": "application/json; charset=utf-8",
        ".md": "text/markdown; charset=utf-8",
        ".txt": "text/plain; charset=utf-8",
        ".ico": "image/x-icon",
        ".html": "text/html; charset=utf-8",
    }
    suffix = p.suffix.lower()
    if suffix in known:
        return known[suffix]
    return "application/octet-stream"
# =============================
# HTTP + UI
# =============================
class _DocsHandler(BaseHTTPRequestHandler):
    """HTTP handler for the docs site.

    Routes:
      - ``/``                      command page (template + inline schema)
      - ``/api/commands``          command schema as JSON
      - ``/api/status``            uptime/version as JSON
      - ``/healthz``               plain ``ok``
      - ``/assets/docs/<path>``    static files under the static root
      - favicon / manifest routes  served from the static root

    Every route answers 503 until the bot reports ready, unless
    ``force_ready`` is set.
    """

    # Shared configuration, assigned on the class before the server starts.
    bot: commands.Bot = None
    title: str = "ShaiWatcher Commands"
    force_ready: bool = False
    support_url: Optional[str] = None
    support_label: Optional[str] = None

    _FAVICON_ROUTES = (
        "/favicon.ico",
        "/favicon-16x16.png",
        "/favicon-32x32.png",
        "/apple-touch-icon.png",
        "/site.webmanifest",
    )
    _DEFAULT_SUPPORT_LABEL = "Buy me a ☕"

    def _set(self, code=200, content_type="text/html; charset=utf-8"):
        """Send the status line and headers for an uncached response."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Cache-Control", "no-store")
        self.end_headers()

    def _send_text(self, code: int, body: bytes) -> None:
        """Send a short plain-text response."""
        self._set(code, "text/plain; charset=utf-8")
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Silence the default per-request logging."""
        return

    def _serve_file(self, fs_path: Path, cache: str = "public, max-age=86400, immutable"):
        """Send a file with a cacheable 200 response.

        The file is read before any header goes out, so a read error can
        still be reported as a clean error response by the caller.
        """
        payload = fs_path.read_bytes()
        self.send_response(200)
        self.send_header("Content-Type", _guess_mime(fs_path))
        self.send_header("Cache-Control", cache)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    @staticmethod
    def _resolve_under(root: Path, rel: str) -> Optional[Path]:
        """Resolve *rel* against *root*; return ``None`` if it escapes *root*.

        Containment is checked on path components (``relative_to``), so a
        sibling directory that merely shares a name prefix with the root is
        rejected on every Python version.
        """
        base = root.resolve()
        target = (base / rel).resolve()
        try:
            target.relative_to(base)
        except ValueError:
            return None
        return target

    @staticmethod
    def _render_page(template: str, data_json: str, title: str,
                     support_url: Optional[str], support_label: Optional[str]) -> str:
        """Fill the template placeholders and inline the schema JSON.

        Placeholders are substituted in the template before the data is
        injected, so command text can never be mistaken for a placeholder.
        The JSON is made safe for a ``<script>`` element by escaping the
        characters that could close the element or end a JS string literal.
        """
        url = support_url or ""
        label = support_label or _DocsHandler._DEFAULT_SUPPORT_LABEL
        page = (template.replace("__TITLE__", str(title))
                .replace("__SUPPORT_URL__", url)
                .replace("__SUPPORT_LABEL__", label)
                .replace("__SUPPORT_VIS__", "block" if url else "none"))
        safe_json = (data_json.replace("<", "\\u003c")
                     .replace(">", "\\u003e")
                     .replace("&", "\\u0026")
                     .replace("\u2028", "\\u2028")
                     .replace("\u2029", "\\u2029"))
        injected = f"<script>window.__DATA__={safe_json};</script>"
        # Insert __DATA__ just before the first </head>.
        return page.replace("</head>", injected + "</head>", 1)

    def _serve_static(self, rel: str) -> None:
        """Serve a file from the static root, or 404 if missing or outside it."""
        try:
            fs_path = self._resolve_under(_static_root(), rel)
            if fs_path is not None and fs_path.is_file():
                self._serve_file(fs_path)
            else:
                self._send_text(404, b"not found")
        except Exception:
            traceback.print_exc()
            self._send_text(500, b"internal error")

    def _serve_index(self) -> None:
        """Serve the command page with the schema inlined as ``window.__DATA__``."""
        tpl_path = _static_root() / "cmd.html"
        if tpl_path.is_file():
            template = tpl_path.read_text(encoding="utf-8")
        else:
            # Fallback minimal page if template missing
            template = "<!doctype html><meta charset='utf-8'><title>Docs</title><body><div id='alerts'>Missing assets/docs/cmd.html</div></body>"
        try:
            schema = build_command_schema(self.bot)
            data_json = json.dumps(_to_primitive(schema), ensure_ascii=False, separators=(",", ":"))
            page = self._render_page(template, data_json, self.title,
                                     self.support_url, self.support_label)
            self._set()
            self.wfile.write(page.encode("utf-8"))
        except Exception:
            traceback.print_exc()
            self._send_text(500, b"internal error")

    def do_GET(self):
        """Dispatch a GET request to the matching route."""
        try:
            ready = bool(self.force_ready) or (getattr(self, "bot", None) and self.bot.is_ready())
            if not ready:
                self._send_text(503, b"warming up")
                return

            path = urlparse(self.path).path

            # Root favicon / PWA convenience routes
            if path in self._FAVICON_ROUTES:
                self._serve_static(path.lstrip("/"))
                return

            # Static assets under /assets/docs/...
            if path.startswith("/assets/docs/"):
                self._serve_static(path[len("/assets/docs/"):])
                return

            if path == "/":
                self._serve_index()
                return

            if path == "/api/status":
                payload = _status_payload(self.bot)
                self._set(200, "application/json; charset=utf-8")
                self.wfile.write(_json_dumps_safe(payload))
                return

            if path == "/healthz":
                self._send_text(200, b"ok")
                return

            if path == "/api/commands":
                body = _json_dumps_safe(build_command_schema(self.bot))
                self._set(200, "application/json; charset=utf-8")
                self.wfile.write(body)
                return

            self._send_text(404, b"not found")
        except Exception:
            traceback.print_exc()
            try:
                self._send_text(500, b"internal error")
            except Exception:
                # The connection is likely gone; nothing more to report.
                pass
def _start_server(bot: commands.Bot, host: str, port: int, title: str):
    """Configure the handler class and serve the docs site on a daemon thread.

    ``SHAI_OFFLINE`` set to ``1``/``true``/``yes`` makes the site answer
    before the bot reports ready.
    """
    handler = _DocsHandler
    handler.bot = bot
    handler.title = title
    handler.support_url = getattr(bot, "docs_support_url", None)
    handler.support_label = getattr(bot, "docs_support_label", None)
    handler.force_ready = os.getenv("SHAI_OFFLINE", "").lower() in {"1", "true", "yes"}

    httpd = ThreadingHTTPServer((host, port), handler)

    def _serve():
        try:
            httpd.serve_forever(poll_interval=0.5)
        except Exception:
            traceback.print_exc()

    threading.Thread(target=_serve, name="DocsSite", daemon=True).start()
    print(f"[DocsSite] Listening on http://{host}:{port} (title='{title}', offline={_DocsHandler.force_ready})")
class DocsSite(commands.Cog):
    """Tiny Swagger-like docs site for bot commands."""

    def __init__(self, bot: commands.Bot):
        """Read the docs-site settings and start the HTTP server."""
        self.bot = bot
        settings = cfg(bot)

        self.host = settings.get("docs_host", "0.0.0.0")  # SHAI_DOCS_HOST
        self.port = settings.int("docs_port", 8910)  # SHAI_DOCS_PORT
        self.title = settings.get("docs_title", "ShaiWatcher Commands")  # SHAI_DOCS_TITLE

        # Support link config
        self.support_url = settings.get(
            "docs_support_url",
            "https://throne.com/ookamikuntv/item/39590391-c582-4c5d-8795-fe6f1925eaae",
        )
        self.support_label = settings.get("docs_support_label", "Buy me a ☕")

        # Expose to handler (the server reads these off the bot object).
        bot.docs_support_url = self.support_url
        bot.docs_support_label = self.support_label

        _start_server(bot, self.host, self.port, self.title)

    def force_ready(self, value: bool = True):
        """Make the site answer (or stop answering) regardless of bot readiness."""
        _DocsHandler.force_ready = True if value else False
async def setup(bot: commands.Bot):
    """Extension entry point: create the docs-site cog and register it on the bot."""
    cog = DocsSite(bot)
    await bot.add_cog(cog)