- DD cycle data fetching
  - ShaiWatcher now keeps an up-to-date loot table of the unique items in the DD each week. The bot will **only** edit its message if one is already present, which should reduce message spam.
  - Added the `/dd_update` command to control the update behaviour: `stop|resume|start [reason_text]`.
- Docsite changes
  - Added "ADMIN" tags to commands, signifying owner-only commands.
  - Owner-only commands are now filtered under the "moderator" category.
  - Added docs for `/dd_update`.
- Logging
  - Added more verbose logging for configuration and installation.
846 lines
30 KiB
Python
846 lines
30 KiB
Python
import json
|
|
import threading
|
|
import traceback
|
|
import os
|
|
import re
|
|
import time
|
|
import importlib
|
|
from pathlib import Path
|
|
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
|
from urllib.parse import urlparse
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
|
|
from modules.common.settings import cfg
|
|
|
|
# Module import time; _status_payload uses it as the uptime origin when no boot timestamp is recorded.
_START_TS = time.time()
|
|
|
|
# =============================
|
|
# Safe JSON helpers
|
|
# =============================
|
|
|
|
# Types that _to_primitive returns unchanged.
_PRIMITIVES = (str, int, float, bool, type(None))
|
|
|
|
def _to_primitive(obj: Any, depth: int = 0) -> Any:
    """Reduce *obj* to JSON-friendly primitives.

    Args:
        obj: Arbitrary value to convert.
        depth: Current recursion depth; callers normally leave it at 0.

    Returns:
        ``obj`` unchanged when it is already a primitive; a list for
        list/tuple/set/frozenset input; a dict with stringified keys for
        dict input; the converted ``.value`` for objects exposing one
        (for example enum members); otherwise ``str(obj)``.
    """
    # Primitives pass through at any depth, so the depth cap below can never
    # turn a number, bool or None into its string form.
    if isinstance(obj, _PRIMITIVES):
        return obj
    # Depth cap: anything non-primitive nested deeper than 6 levels is stringified.
    if depth > 6:
        return str(obj)
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [_to_primitive(item, depth + 1) for item in obj]
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            try:
                converted[str(key)] = _to_primitive(value, depth + 1)
            except Exception:
                # Best effort: one bad value must not sink the whole mapping.
                converted[str(key)] = str(value)
        return converted
    if hasattr(obj, "value"):
        try:
            return _to_primitive(getattr(obj, "value"), depth + 1)
        except Exception:
            pass  # fall through to the generic string form
    return str(obj)
|
|
|
|
def _json_dumps_safe(payload: Any) -> bytes:
    """Serialize *payload* as compact UTF-8 JSON after reducing it with ``_to_primitive``."""
    plain = _to_primitive(payload)
    text = json.dumps(plain, ensure_ascii=False, separators=(",", ":"))
    return text.encode("utf-8")
|
|
|
|
|
|
# =============================
|
|
# Version / uptime helpers
|
|
# =============================
|
|
|
|
def _project_root() -> Path:
    """Return the directory three levels above this file's resolved location."""
    this_file = Path(__file__).resolve()
    return this_file.parents[2]
|
|
|
|
def _read_version_from_file() -> Optional[str]:
    """Parse ``VERSION = "x.y.z"`` out of ``bot.py`` in the project root.

    Returns the quoted value of the first matching assignment, or ``None``
    when the file is missing, has no such line, or cannot be read (the
    error is printed, not raised).
    """
    version: Optional[str] = None
    try:
        candidate = _project_root() / "bot.py"
        if candidate.is_file():
            source = candidate.read_text(encoding="utf-8", errors="ignore")
            found = re.search(r'^\s*VERSION\s*=\s*["\']([^"\']+)["\']', source, re.M)
            if found:
                version = found.group(1)
    except Exception:
        traceback.print_exc()
    return version
|
|
|
|
def _get_version_from_botpy() -> Optional[str]:
    """Import the ``bot`` module and return its ``VERSION`` as a string.

    Returns ``None`` when the import fails or ``VERSION`` is absent or is
    not a str/int/float.
    """
    try:
        version = getattr(importlib.import_module("bot"), "VERSION", None)
        return str(version) if isinstance(version, (str, int, float)) else None
    except Exception:
        return None
|
|
|
|
def _get_boot_state(bot: commands.Bot) -> Dict[str, Any]:
    """Return the last entry of the data manager's ``boot_state`` record.

    Returns an empty dict when the bot has no ``data_manager``, when the
    record is empty, or when reading the last entry fails.
    """
    manager = getattr(bot, "data_manager", None)
    if not manager or not manager.get("boot_state"):
        return {}
    try:
        history = manager.get("boot_state") or [{}]
        return history[-1]
    except Exception:
        return {}
|
|
|
|
def _status_payload(bot: commands.Bot) -> Dict[str, Any]:
    """Build the payload served at ``/api/status``.

    Args:
        bot: Bot whose boot state (via ``_get_boot_state``) is consulted.

    Returns:
        ``{"uptime_seconds": int, "version": str}``. Uptime is measured from
        the recorded ``last_boot_ts`` when that is a positive finite number,
        otherwise from this module's import time. The version comes from the
        recorded ``last_version``, then ``bot.py`` (file parse, then import),
        then the ``SHAI_VERSION`` environment variable, then ``"unknown"``.
    """
    state = _get_boot_state(bot)
    if not isinstance(state, dict):
        # A malformed record must not take the status endpoint down.
        state = {}

    try:
        boot_ts = float(state.get("last_boot_ts") or 0.0)
    except (TypeError, ValueError):
        # Stored value is not numeric (e.g. a stray string or container).
        boot_ts = 0.0
    # NaN fails both comparisons; infinity would overflow int() below.
    if not 0 < boot_ts < float("inf"):
        boot_ts = _START_TS
    uptime = max(0, int(time.time() - boot_ts))

    version = state.get("last_version")
    if not version:
        version = (
            _read_version_from_file()
            or _get_version_from_botpy()
            or os.getenv("SHAI_VERSION", "unknown")
        )
    return {"uptime_seconds": uptime, "version": str(version)}
|
|
|
|
|
|
# =============================
|
|
# Mod detection (heuristics)
|
|
# =============================
|
|
|
|
def _looks_like_mod_check(fn) -> bool:
    """Heuristically decide whether *fn* is a moderator check.

    True when the callable's ``__qualname__`` contains one of the known
    check names, or when its ``__module__`` contains ``"mod_perms"``.
    Any error while inspecting *fn* yields False.
    """
    known_checks = (
        "is_moderator_member",
        "is_moderator_userid",
        "require_mod_ctx",
        "require_mod_interaction",
    )
    try:
        qualname = getattr(fn, "__qualname__", "") or ""
        for marker in known_checks:
            if marker in qualname:
                return True
        module_name = getattr(fn, "__module__", "") or ""
        if "mod_perms" in module_name:
            return True
        return False
    except Exception:
        return False
|
|
|
|
def _is_perm_check(fn) -> Tuple[bool, List[str]]:
    """Look for a permissions mapping captured in *fn*'s closure.

    Every dict found in a closure cell contributes the keys whose values
    are truthy. Returns ``(found_any, names)``; inspection errors are
    swallowed and whatever was collected so far is returned.
    """
    granted = []
    try:
        code_obj = getattr(fn, "__code__", None)
        if code_obj and code_obj.co_freevars and getattr(fn, "__closure__", None):
            for cell in fn.__closure__ or []:
                captured = getattr(cell, "cell_contents", None)
                if not isinstance(captured, dict):
                    continue
                for name, enabled in captured.items():
                    if enabled:
                        granted.append(str(name))
    except Exception:
        pass
    return bool(granted), granted
|
|
|
|
|
|
# =============================
|
|
# Introspection
|
|
# =============================
|
|
|
|
def _bot_prefix(bot: commands.Bot) -> str:
    """Return one representative command prefix for display in usage strings.

    ``bot.command_prefix`` may be a string, a collection of strings, or a
    callable returning either. A callable is invoked as ``prefix(bot, None)``
    because no message is available here.

    Returns:
        The prefix string; the first entry when a collection is given; and
        ``"!"`` when the attribute is missing, the collection is empty, the
        callable is a coroutine function, or anything raises.
    """
    try:
        raw = getattr(bot, "command_prefix", "!")
        if callable(raw):
            raw = raw(bot, None)
        if hasattr(raw, "__await__"):
            # An async prefix callable cannot be awaited from this sync
            # context; close the coroutine so it does not warn, then fall back.
            closer = getattr(raw, "close", None)
            if callable(closer):
                closer()
            return "!"
        if isinstance(raw, str):
            return raw
        if isinstance(raw, (list, tuple, set, frozenset)):
            for candidate in raw:
                return str(candidate)
            return "!"  # empty collection
        return str(raw)
    except Exception:
        return "!"
|
|
|
|
def _is_mod_command_prefix(cmd: commands.Command) -> Tuple[bool, List[str]]:
    """Inspect a prefix command's checks for moderator gating.

    Returns ``(is_mod, perms)``: ``is_mod`` is True when any check looks
    like a moderator check, and ``perms`` is the sorted, de-duplicated list
    of permission names found in check closures.
    """
    flagged = False
    needed = set()
    try:
        for check in getattr(cmd, "checks", []) or []:
            if _looks_like_mod_check(check):
                flagged = True
            has_perms, names = _is_perm_check(check)
            if has_perms:
                needed.update(names)
    except Exception:
        pass
    return flagged, sorted(needed)
|
|
|
|
def _is_mod_command_slash(cmd: app_commands.Command) -> Tuple[bool, List[str]]:
    """Inspect a slash command's ``checks`` and ``_checks`` for moderator gating.

    Returns ``(is_mod, [])``; the permission list is always empty here.
    """
    flagged = False
    try:
        for attr in ("checks", "_checks"):
            for check in getattr(cmd, attr, []) or []:
                if _looks_like_mod_check(check):
                    flagged = True
    except Exception:
        pass
    return flagged, []
|
|
|
|
def _command_usage_prefix(cmd: commands.Command, prefix: str) -> str:
    """Build the usage line for a prefix command.

    An explicit ``cmd.usage`` is returned verbatim. Otherwise the line is
    the prefix plus qualified name followed by one token per parameter:
    ``[name...]`` for variadic, ``<name>`` for required, ``[name]`` for
    optional. Falls back to ``prefix + cmd.name`` on any error.
    """
    if cmd.usage:
        return cmd.usage
    try:
        tokens = []
        for name, param in (cmd.clean_params or {}).items():
            kind = getattr(param, "kind", None)
            if kind and str(param.kind).lower().startswith("var"):
                tokens.append(f"[{name}...]")
            elif param.default is param.empty:
                tokens.append(f"<{name}>")
            else:
                tokens.append(f"[{name}]")
        head = f"{prefix}{cmd.qualified_name}"
        return " ".join([head, *tokens])
    except Exception:
        return f"{prefix}{cmd.name}"
|
|
|
|
def _command_usage_slash_like(cmd_name: str, options: Optional[List[Any]] = None) -> str:
    """Render ``/<cmd_name>`` followed by one token per option.

    Each option is named by its ``name``, else ``display_name``, else
    ``"arg"``, and shown as ``<name>`` when ``required`` is truthy or
    ``[name]`` otherwise. Falls back to the bare ``/<cmd_name>`` on error.
    """
    try:
        rendered = []
        for option in options or []:
            label = getattr(option, "name", None) or getattr(option, "display_name", None) or "arg"
            if bool(getattr(option, "required", False)):
                rendered.append(f"<{label}>")
            else:
                rendered.append(f"[{label}]")
        usage = f"/{cmd_name}"
        if rendered:
            usage += " " + " ".join(rendered)
        return usage
    except Exception:
        return f"/{cmd_name}"
|
|
|
|
def _command_usage_slash(cmd: app_commands.Command) -> str:
    """Render the usage line of a slash command.

    Options come from the first truthy attribute among ``options``,
    ``parameters`` and ``_params``; slashes in the command name are shown
    as spaces. Falls back to ``/<name>`` on error.
    """
    try:
        options = None
        for attr in ("options", "parameters", "_params"):
            options = getattr(cmd, attr, None)
            if options:
                break
        shown_name = (cmd.name or "").replace("/", " ")
        return _command_usage_slash_like(shown_name, options)
    except Exception:
        return f"/{cmd.name}"
|
|
|
|
def _iter_all_app_commands(bot: commands.Bot):
    """Return ``(scope_tag, top_level_command)`` pairs for the global and per-guild trees.

    Global commands carry an empty scope tag; guild commands carry the
    guild id as a string. Lookup failures contribute no entries.
    """

    def _guild_commands(guild):
        # Retry with a bare discord.Object when the tree rejects the guild
        # argument with a TypeError.
        try:
            return bot.tree.get_commands(guild=guild)
        except TypeError:
            try:
                return bot.tree.get_commands(guild=discord.Object(id=guild.id))
            except Exception:
                return []
        except Exception:
            return []

    pairs = []
    try:
        for command in bot.tree.get_commands():
            pairs.append(("", command))
    except Exception:
        pass

    for guild in list(getattr(bot, "guilds", []) or []):
        for command in _guild_commands(guild) or []:
            pairs.append((str(guild.id), command))
    return pairs
|
|
|
|
def _gather_prefix_and_hybrid(bot: commands.Bot) -> List[Dict[str, Any]]:
    """Describe every non-hidden command in ``bot.commands`` as a row dict.

    Hybrid commands also get a slash usage line and inherit moderator
    status from their app command (its checks or a ``[mod]`` marker in its
    description). A command that fails to describe is reported via
    traceback and skipped. Rows are de-duplicated by qualified name.
    """
    rows: List[Dict[str, Any]] = []
    seen_names = set()
    prefix = _bot_prefix(bot)

    for cmd in getattr(bot, "commands", []) or []:
        try:
            if getattr(cmd, "hidden", False):
                continue

            is_hybrid = isinstance(cmd, commands.HybridCommand)
            is_mod, perms = _is_mod_command_prefix(cmd)

            usage_slash = None
            if is_hybrid:
                try:
                    slash_twin = getattr(cmd, "app_command", None)
                    if isinstance(slash_twin, app_commands.Command):
                        usage_slash = _command_usage_slash(slash_twin)
                        twin_is_mod, _ = _is_mod_command_slash(slash_twin)
                        description = getattr(slash_twin, "description", "") or ""
                        if twin_is_mod or "[mod]" in description.lower():
                            is_mod = True
                except Exception:
                    pass

            usage_prefix = _command_usage_prefix(cmd, prefix)
            # The qualified name doubles as the execution-counter key.
            qualified = cmd.qualified_name
            cog_name = getattr(cmd.cog, "qualified_name", None) if getattr(cmd, "cog", None) else None

            row = {
                "type": "hybrid" if is_hybrid else "prefix",
                "name": cmd.qualified_name,
                "display_name": cmd.qualified_name,
                "help": (cmd.help or "").strip(),
                "brief": (cmd.brief or "").strip(),
                "usage": None if is_hybrid else usage_prefix,
                "usage_prefix": usage_prefix,
                "usage_slash": usage_slash,
                "cog": cog_name,
                "module": getattr(getattr(cmd, "callback", None), "__module__", None),
                "moderator_only": bool(is_mod),
                "admin_only": False,
                "required_permissions": perms,
                "counter_key": qualified,
                "exec_count": _cmd_counter(bot, qualified),
            }
            if row["name"] in seen_names:
                continue
            seen_names.add(row["name"])
            rows.append(row)
        except Exception:
            traceback.print_exc()
            continue
    return rows
|
|
|
|
def _walk_app_tree(node: Any, prefix: str = "") -> List[Tuple[str, app_commands.Command]]:
    """Flatten an app-command tree into ``(path, command)`` pairs.

    Paths look like ``/group/sub``; groups are expanded depth-first in
    declaration order, and nodes that are neither a command nor a group
    are ignored.
    """
    found: List[Tuple[str, app_commands.Command]] = []
    pending = [(node, prefix)]
    while pending:
        current, parent_path = pending.pop()
        if isinstance(current, app_commands.Command):
            found.append((f"{parent_path}/{current.name}", current))
        elif isinstance(current, app_commands.Group):
            group_path = f"{parent_path}/{current.name}"
            children = list(getattr(current, "commands", []) or [])
            # Push in reverse so children are popped in their original order.
            pending.extend((child, group_path) for child in reversed(children))
    return found
|
|
|
|
def _safe_extras(obj: Any) -> Optional[Dict[str, Any]]:
    """Return ``obj.extras`` reduced to primitives, or ``None`` when absent or empty.

    A non-dict ``extras`` value is wrapped as ``{"value": ...}``.
    """
    extras = getattr(obj, "extras", None)
    if not extras:
        return None
    plain = _to_primitive(extras)
    return plain if isinstance(extras, dict) else {"value": plain}
|
|
|
|
def _gather_slash(bot: commands.Bot) -> List[Dict[str, Any]]:
    """Describe every leaf slash command (global and per-guild) as a row dict.

    Commands are de-duplicated by canonical path (for example
    ``power/restart``), so a command registered in several scopes appears
    once. A leaf that fails to describe is reported via traceback and skipped.
    """
    leaves: List[Tuple[str, app_commands.Command, str]] = []
    try:
        for scope, top_level in _iter_all_app_commands(bot):
            for path, leaf in _walk_app_tree(top_level, prefix=""):
                leaves.append((scope, leaf, path))
    except Exception:
        traceback.print_exc()

    rows: List[Dict[str, Any]] = []
    emitted = set()
    for _scope, leaf, path in leaves:
        try:
            canon = path.lstrip("/")  # e.g. 'power/restart'
            if canon in emitted:
                continue
            emitted.add(canon)

            is_mod, perms = _is_mod_command_slash(leaf)
            binding = getattr(leaf, "binding", None)
            callback = getattr(leaf, "callback", None)

            display = canon.replace("/", " ")
            options = (
                getattr(leaf, "options", None)
                or getattr(leaf, "parameters", None)
                or getattr(leaf, "_params", None)
            )
            usage = _command_usage_slash_like(display, options)

            # Prefer the command's own qualified name as the counter key.
            counter_key = getattr(leaf, "qualified_name", None) or display

            rows.append({
                "type": "slash",
                "name": "/" + canon,
                "display_name": "/" + display,
                "help": (getattr(leaf, "description", "") or "").strip(),
                "brief": "",
                "usage": usage,
                "usage_prefix": None,
                "usage_slash": usage,
                "cog": binding.__class__.__name__ if binding else None,
                "module": getattr(callback, "__module__", None) if callback else None,
                "moderator_only": bool(is_mod),
                "admin_only": False,
                "required_permissions": perms,
                "extras": _safe_extras(leaf),
                "dm_permission": getattr(leaf, "dm_permission", None),
                "counter_key": counter_key,
                "exec_count": _cmd_counter(bot, counter_key),
            })
        except Exception:
            traceback.print_exc()
            continue
    return rows
|
|
|
|
def _cmd_counter(bot, qualified_name: str) -> int:
    """Return the data manager's ``cmd::<qualified_name>`` counter, or 0 without a data manager."""
    manager = getattr(bot, "data_manager", None)
    if not manager:
        return 0
    return manager.get_counter(f"cmd::{qualified_name}")
|
|
|
|
# =============================
|
|
# Details loader & master JSON
|
|
# =============================
|
|
|
|
# Directory scanned for per-command documentation files (.md / .html).
_DETAILS_DIR = _project_root() / "assets" / "docs" / "commands"
|
|
# Aggregated docs mapping: read by _load_master_json, rewritten by _write_master_json.
_MASTER_PATH = _DETAILS_DIR / "__commands__.json"
|
|
|
|
# In-process cache used by _load_external_docs: "map" holds the merged docs, "sig" the directory signature it was built for.
_DOCS_CACHE: Dict[str, Any] = {"map": {}, "sig": ""}
|
|
|
|
def _row_key_candidates(row: Dict[str, Any]) -> List[str]:
    """List the documentation keys to try for *row*, most specific first.

    Order: ``<cog>.<base>`` (only when the row has a cog), the full slash
    path without its leading slash (slash rows only), then the bare base
    name. The base is the last path segment for slash rows and the first
    word of the name otherwise.
    """
    is_slash = row.get("type") == "slash"
    raw_name = str(row.get("name", ""))
    cog_name = str(row.get("cog") or "").strip()

    if is_slash:
        slash_path = raw_name.lstrip("/")
        base = slash_path.split("/")[-1]
    else:
        slash_path = None
        base = raw_name.split(" ")[0]

    candidates = [f"{cog_name}.{base}"] if cog_name else []
    if is_slash:
        candidates.append(slash_path)
    candidates.append(base)
    return candidates
|
|
|
|
def _scan_doc_files() -> Dict[str, Dict[str, Any]]:
    """Collect per-command documentation files from ``_DETAILS_DIR``.

    Recognised files (names starting with ``_`` are skipped):
      - <key>.md            -> details_md
      - <key>.html          -> details_html
      - <key>.brief.html    -> brief_html
      - <key>.details.html  -> details_html (wins over <key>.html)

    Returns:
        Mapping of key to the fields found for it; empty when the directory
        does not exist. Unreadable files are reported via traceback and
        skipped.
    """
    found: Dict[str, Dict[str, Any]] = {}
    if not _DETAILS_DIR.is_dir():
        return found

    def _put(key: str, field: str, text: str) -> None:
        # An empty key (e.g. a file literally named ".brief.html") documents nothing.
        if key:
            found.setdefault(key, {})[field] = text

    # Legacy markdown details. Sorted so results do not depend on directory order.
    for path in sorted(_DETAILS_DIR.glob("*.md")):
        if path.name.startswith("_"):
            continue
        try:
            _put(path.stem, "details_md", path.read_text(encoding="utf-8"))
        except Exception:
            traceback.print_exc()

    # Keys whose details_html came from an explicit <key>.details.html file;
    # a plain <key>.html must never overwrite those, whichever file is seen first.
    explicit_details = set()
    for path in sorted(_DETAILS_DIR.glob("*.html")):
        if path.name.startswith("_"):
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except Exception:
            traceback.print_exc()
            continue

        stem = path.stem
        if stem.endswith(".brief"):
            _put(stem[: -len(".brief")], "brief_html", text)
        elif stem.endswith(".details"):
            key = stem[: -len(".details")]
            _put(key, "details_html", text)
            explicit_details.add(key)
        elif stem not in explicit_details:
            _put(stem, "details_html", text)

    return found
|
|
|
|
def _load_master_json() -> Dict[str, Dict[str, Any]]:
    """Read the master docs mapping from ``_MASTER_PATH``.

    Only top-level entries whose value is a dict are kept (shallow-copied,
    with keys stringified). Returns an empty dict when the file is missing,
    blank, or unreadable; read and parse errors are printed, not raised.
    """
    if not _MASTER_PATH.is_file():
        return {}
    try:
        content = _MASTER_PATH.read_text(encoding="utf-8")
        if not content.strip():
            return {}
        parsed = json.loads(content) or {}
        return {
            str(key): dict(entry.items())
            for key, entry in parsed.items()
            if isinstance(entry, dict)
        }
    except Exception:
        traceback.print_exc()
        return {}
|
|
|
|
def _write_master_json(mapping: Dict[str, Dict[str, Any]]) -> None:
    """Persist *mapping* to ``_MASTER_PATH`` as indented UTF-8 JSON.

    The payload is serialized before the file is touched and then swapped
    in with an atomic rename. A serialization error therefore leaves the
    previous file intact, and a concurrent reader never sees a half-written
    file. Failures are printed, not raised.
    """
    tmp_path = None
    try:
        payload = json.dumps(mapping, ensure_ascii=False, indent=2)
        _DETAILS_DIR.mkdir(parents=True, exist_ok=True)
        # Per-process/per-thread temp name so concurrent writers do not collide.
        tmp_path = _MASTER_PATH.with_name(
            f"{_MASTER_PATH.name}.{os.getpid()}.{threading.get_ident()}.tmp"
        )
        tmp_path.write_text(payload, encoding="utf-8")
        os.replace(tmp_path, _MASTER_PATH)
    except Exception:
        traceback.print_exc()
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                pass  # nothing was written, or it was already moved
|
|
|
|
def _dir_signature() -> str:
    """Fingerprint the docs directory for cache invalidation.

    Joins ``<name>:<mtime seconds>`` for every entry of ``_DETAILS_DIR``
    (sorted), plus a ``__master__`` entry for the master file when it
    exists. Entries that cannot be stat'ed are left out.
    """

    def _stamp(label: str, path: Path) -> str:
        return f"{label}:{int(path.stat().st_mtime)}"

    stamps = []
    if _DETAILS_DIR.is_dir():
        for entry in sorted(_DETAILS_DIR.glob("*")):
            try:
                stamps.append(_stamp(entry.name, entry))
            except Exception:
                continue
    try:
        if _MASTER_PATH.exists():
            stamps.append(_stamp("__master__", _MASTER_PATH))
    except Exception:
        pass
    return "|".join(stamps)
|
|
|
|
def _load_external_docs() -> Dict[str, Dict[str, Any]]:
    """Return the merged docs mapping (master JSON overlaid with scanned files).

    The result is cached in ``_DOCS_CACHE`` and reused while the directory
    signature is unchanged. When the merge differs from what the master
    file already holds, the master file is rewritten.

    Returns:
        Mapping of doc key to its fields; scanned files override master
        entries field by field.
    """
    signature = _dir_signature()
    if _DOCS_CACHE.get("sig") == signature:
        return _DOCS_CACHE["map"]

    master = _load_master_json()
    merged: Dict[str, Dict[str, Any]] = {key: dict(fields) for key, fields in master.items()}
    for key, fields in _scan_doc_files().items():
        merged.setdefault(key, {}).update(fields)

    if merged != master:
        _write_master_json(merged)
        # The write changes the master file's mtime. Take the signature again
        # so this write does not invalidate the cache on the next call.
        signature = _dir_signature()

    _DOCS_CACHE["map"] = merged
    _DOCS_CACHE["sig"] = signature
    return merged
|
|
|
|
def _augment_with_external_docs(rows: List[Dict[str, Any]]) -> None:
    """Fill in missing documentation fields on *rows* in place.

    ``details_md`` is first taken from the row's own ``extras``. Any of
    ``details_md``/``brief_html``/``details_html`` still missing is then
    copied from the first external docs entry matching one of the row's
    key candidates; the entry's remaining fields become ``doc_meta``.
    """
    doc_fields = ("details_md", "brief_html", "details_html")
    mapping = _load_external_docs()

    for row in rows:
        extras = row.get("extras")
        if not row.get("details_md") and isinstance(extras, dict):
            inline_md = row["extras"].get("details_md")
            if isinstance(inline_md, str) and inline_md.strip():
                row["details_md"] = inline_md

        if row.get("details_md") and row.get("brief_html") and row.get("details_html"):
            continue  # nothing left to fill in

        for key in _row_key_candidates(row):
            if key not in mapping or not isinstance(mapping[key], dict):
                continue
            entry = mapping[key]
            for field in doc_fields:
                if not row.get(field) and isinstance(entry.get(field), str):
                    row[field] = entry[field]
            if not row.get("doc_meta"):
                meta = {name: value for name, value in entry.items() if name not in {"details_md", "brief_html", "details_html"}}
                if meta:
                    row["doc_meta"] = meta
            break  # first matching key wins
|
|
|
|
|
|
# =============================
|
|
# Merge hybrids with their slash twins
|
|
# =============================
|
|
|
|
def _merge_hybrid_slash(rows: List[Dict[str, Any]]) -> None:
    """Fold each slash row into the hybrid row it duplicates, in place.

    A slash row matches a hybrid row when both share the same cog and the
    same lower-cased base name (last word of the hybrid name, last path
    segment of the slash name). The hybrid row inherits whatever it lacks
    (slash usage, help, extras, docs), gains the moderator/admin flags and
    permissions of its twin, and the slash row is removed from *rows*.

    Execution counts are summed only when the two rows report different
    ``counter_key`` values. Rows sharing a key read the same counter, so
    adding them would count every execution twice.
    """
    hybrid_index: Dict[Tuple[str, str], int] = {}
    for position, row in enumerate(rows):
        if row.get("type") == "hybrid":
            cog = (row.get("cog") or "").strip()
            base = str(row.get("name") or "").split(" ")[-1].lower()
            hybrid_index[(cog, base)] = position

    absorbed: List[int] = []
    for position, slash in enumerate(rows):
        if slash.get("type") != "slash":
            continue
        base = str(slash.get("name") or "").lstrip("/").split("/")[-1].lower()
        cog = (slash.get("cog") or "").strip()
        target = hybrid_index.get((cog, base))
        if target is None:
            continue
        hybrid = rows[target]

        # Fields the hybrid row only takes when it has nothing of its own.
        for field in ("usage_slash", "help", "extras", "details_md", "brief_html", "details_html"):
            if not hybrid.get(field) and slash.get(field):
                hybrid[field] = slash[field]

        # Restrictions are additive: either twin being gated gates the command.
        if slash.get("moderator_only"):
            hybrid["moderator_only"] = True
        if slash.get("admin_only"):
            hybrid["admin_only"] = True
        if slash.get("required_permissions"):
            hybrid["required_permissions"] = sorted(
                set((hybrid.get("required_permissions") or []) + slash["required_permissions"])
            )

        hybrid_key = hybrid.get("counter_key")
        shares_counter = hybrid_key is not None and hybrid_key == slash.get("counter_key")
        if not shares_counter:
            hybrid["exec_count"] = int(hybrid.get("exec_count", 0) or 0) + int(slash.get("exec_count", 0) or 0)

        absorbed.append(position)

    # Pop from the end so earlier indices stay valid.
    for position in sorted(absorbed, reverse=True):
        rows.pop(position)
|
|
|
|
|
|
# =============================
|
|
# Schema builder
|
|
# =============================
|
|
|
|
def build_command_schema(bot: commands.Bot) -> Dict[str, Any]:
    """Assemble the full command schema served by the docs site.

    Gathers prefix/hybrid and slash rows, flags moderator/admin commands
    from ``[mod]``/``[admin]`` markers in help text, required permissions
    and the ``extras["category"]`` value, attaches external docs, merges
    hybrid commands with their slash twins and sorts by display name.

    Returns:
        Dict with ``title``, ``count``, ``sections`` (``user`` and
        ``moderator`` row lists; admin-only rows go under ``moderator``)
        and ``all``.
    """
    mod_categories = {"mod", "moderator", "staff"}
    admin_categories = {"admin", "administrator", "owner"}

    all_rows = _gather_prefix_and_hybrid(bot) + _gather_slash(bot)

    for row in all_rows:
        # Textual markers in help/brief.
        try:
            text = f"{row.get('help') or ''} {row.get('brief') or ''}".lower()
            if "[mod]" in text:
                row["moderator_only"] = True
            if "[admin]" in text:
                row["admin_only"] = True
        except Exception:
            pass

        # Any explicit permission requirement counts as moderator-only.
        if row.get("required_permissions"):
            row["moderator_only"] = True

        # Category declared through the command's extras.
        try:
            extras = row.get("extras") or {}
            if isinstance(extras, dict):
                category = str(extras.get("category", "")).lower()
                if category in mod_categories:
                    row["moderator_only"] = True
                if category in admin_categories:
                    row["admin_only"] = True
        except Exception:
            pass

    _augment_with_external_docs(all_rows)
    _merge_hybrid_slash(all_rows)

    # Alphabetical by command name (leading slash ignored), not by cog.
    all_rows.sort(key=lambda r: (r.get("display_name") or r.get("name") or "").lstrip("/").strip().lower())

    staff_rows: List[Dict[str, Any]] = []
    user_rows: List[Dict[str, Any]] = []
    for row in all_rows:
        restricted = row.get("moderator_only") or row.get("admin_only")
        (staff_rows if restricted else user_rows).append(row)

    return {
        "title": "ShaiWatcher Commands",
        "count": len(all_rows),
        "sections": {"user": user_rows, "moderator": staff_rows},
        "all": all_rows,
    }
|
|
|
|
# =============================
|
|
# Static asset serving
|
|
# =============================
|
|
|
|
def _static_root() -> Path:
    """Return the ``assets/docs`` directory under the project root."""
    return _project_root().joinpath("assets", "docs")
|
|
|
|
def _guess_mime(p: Path) -> str:
    """Map a file's extension (case-insensitive) to a Content-Type value.

    Unknown extensions map to ``application/octet-stream``.
    """
    known_types = {
        ".svg": "image/svg+xml; charset=utf-8",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".gif": "image/gif",
        ".css": "text/css; charset=utf-8",
        ".js": "application/javascript; charset=utf-8",
        ".json": "application/json; charset=utf-8",
        ".md": "text/markdown; charset=utf-8",
        ".txt": "text/plain; charset=utf-8",
        ".ico": "image/x-icon",
        ".html": "text/html; charset=utf-8",
    }
    suffix = p.suffix.lower()
    if suffix in known_types:
        return known_types[suffix]
    return "application/octet-stream"
|
|
|
|
|
|
# =============================
|
|
# HTTP + UI
|
|
# =============================
|
|
|
|
class _DocsHandler(BaseHTTPRequestHandler):
    """GET handler for the docs site.

    Routes:
      - ``/``                 HTML page (``assets/docs/cmd.html``) with the command schema inlined
      - ``/api/commands``     command schema as JSON
      - ``/api/status``       uptime/version as JSON
      - ``/healthz``          plain ``ok``
      - ``/assets/docs/...``  static files below the static root
      - a few root-level icon/manifest files

    Every route answers 503 until the bot is ready or ``force_ready`` is set.
    Configuration lives in class attributes that ``_start_server`` assigns.
    """

    bot: commands.Bot = None
    title: str = "ShaiWatcher Commands"
    force_ready: bool = False
    support_url: Optional[str] = None
    support_label: Optional[str] = None

    # Root-level convenience routes served straight from the static root.
    _ROOT_FILES = (
        "/favicon.ico",
        "/favicon-16x16.png",
        "/favicon-32x32.png",
        "/apple-touch-icon.png",
        "/site.webmanifest",
    )

    def _set(self, code=200, content_type="text/html; charset=utf-8"):
        """Send the status line and headers for an uncached response."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Cache-Control", "no-store")
        self.end_headers()

    def log_message(self, fmt, *args):
        """Silence the default per-request logging."""
        return

    def _respond(self, code: int, body: bytes, content_type: str = "text/plain; charset=utf-8") -> None:
        """Send headers via ``_set`` followed by *body*."""
        self._set(code, content_type)
        self.wfile.write(body)

    def _serve_file(self, fs_path: Path, cache: str = "public, max-age=86400, immutable"):
        """Send *fs_path* with a MIME type guessed from its extension."""
        # Read first, so an unreadable file becomes an error response
        # instead of a 200 with an empty body.
        data = fs_path.read_bytes()
        self.send_response(200)
        self.send_header("Content-Type", _guess_mime(fs_path))
        self.send_header("Content-Length", str(len(data)))
        self.send_header("Cache-Control", cache)
        self.end_headers()
        self.wfile.write(data)

    @staticmethod
    def _resolve_under(root: Path, rel: str) -> Optional[Path]:
        """Resolve *rel* below *root*; return ``None`` when it escapes *root* or is invalid.

        Containment is checked on resolved path components with
        ``relative_to``. A substring comparison would accept a sibling such
        as ``<root>-private``.
        """
        try:
            base = root.resolve()
            candidate = (base / rel).resolve()
            candidate.relative_to(base)
        except (ValueError, OSError):
            return None
        return candidate

    def _serve_static(self, rel: str) -> None:
        """Serve the file at *rel* below the static root, or answer 404."""
        fs_path = self._resolve_under(_static_root(), rel)
        if fs_path is not None and fs_path.is_file():
            self._serve_file(fs_path)
        else:
            self._respond(404, b"not found")

    def _render_index(self) -> str:
        """Return the index page with placeholders filled and the schema inlined."""
        tpl_path = _static_root() / "cmd.html"
        if tpl_path.is_file():
            page = tpl_path.read_text(encoding="utf-8")
        else:
            # Fallback minimal page if the template is missing.
            page = "<!doctype html><meta charset='utf-8'><title>Docs</title><body><div id='alerts'>Missing assets/docs/cmd.html</div></body>"

        support_url = getattr(_DocsHandler, "support_url", "") or ""
        # The class attribute exists but may be None, so a getattr default would never apply.
        support_label = getattr(_DocsHandler, "support_label", None) or "Buy me a ☕"
        visibility = "block" if support_url else "none"

        # Fill placeholders before injecting data, so command text that
        # happens to contain a placeholder token is left untouched.
        page = (page.replace("__TITLE__", str(self.title or ""))
                    .replace("__SUPPORT_URL__", str(support_url))
                    .replace("__SUPPORT_LABEL__", str(support_label))
                    .replace("__SUPPORT_VIS__", visibility))

        schema = build_command_schema(self.bot)
        inline = json.dumps(_to_primitive(schema), ensure_ascii=False, separators=(",", ":"))
        # "<" only occurs inside JSON strings; escaping it keeps "</script>"
        # or "<!--" in help text from ending the script element. U+2028/2029
        # are escaped because older JS engines reject them raw in literals.
        inline = (inline.replace("<", "\\u003c")
                        .replace("\u2028", "\\u2028")
                        .replace("\u2029", "\\u2029"))
        return page.replace("</head>", f"<script>window.__DATA__={inline};</script></head>", 1)

    def do_GET(self):
        """Dispatch a GET request to one of the routes listed on the class."""
        from urllib.parse import unquote  # local: only urlparse is imported at module level

        try:
            ready = bool(self.force_ready) or (getattr(self, "bot", None) and self.bot.is_ready())
            if not ready:
                self._respond(503, b"warming up")
                return

            path = urlparse(self.path).path

            if path in self._ROOT_FILES:
                self._serve_static(path.lstrip("/"))
                return

            if path.startswith("/assets/docs/"):
                # Decode percent-escapes so names with spaces or non-ASCII
                # characters match; containment is checked after decoding.
                self._serve_static(unquote(path[len("/assets/docs/"):]))
                return

            if path == "/":
                self._respond(200, self._render_index().encode("utf-8"), "text/html; charset=utf-8")
                return

            if path == "/api/status":
                self._respond(200, _json_dumps_safe(_status_payload(self.bot)), "application/json; charset=utf-8")
                return

            if path == "/healthz":
                self._respond(200, b"ok")
                return

            if path == "/api/commands":
                payload = _json_dumps_safe(build_command_schema(self.bot))
                self._respond(200, payload, "application/json; charset=utf-8")
                return

            self._respond(404, b"not found")
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-response; there is nobody to answer.
            return
        except Exception:
            traceback.print_exc()
            try:
                self._respond(500, b"internal error")
            except Exception:
                pass
|
|
|
|
def _start_server(bot: commands.Bot, host: str, port: int, title: str):
    """Configure ``_DocsHandler`` and serve it from a daemon thread.

    The handler's class attributes receive the bot, title, support link
    settings read from the bot, and the offline flag taken from the
    ``SHAI_OFFLINE`` environment variable (``1``/``true``/``yes``).
    """
    offline = os.getenv("SHAI_OFFLINE", "").lower() in {"1", "true", "yes"}

    _DocsHandler.bot = bot
    _DocsHandler.title = title
    _DocsHandler.support_url = getattr(bot, "docs_support_url", None)
    _DocsHandler.support_label = getattr(bot, "docs_support_label", None)
    _DocsHandler.force_ready = offline

    server = ThreadingHTTPServer((host, port), _DocsHandler)

    def _serve():
        # Keep a crash in the serving loop from passing silently.
        try:
            server.serve_forever(poll_interval=0.5)
        except Exception:
            traceback.print_exc()

    worker = threading.Thread(target=_serve, name="DocsSite", daemon=True)
    worker.start()
    print(f"[DocsSite] Listening on http://{host}:{port} (title='{title}', offline={_DocsHandler.force_ready})")
|
|
|
|
class DocsSite(commands.Cog):
    """Cog that hosts a small Swagger-style documentation site for the bot's commands."""

    def __init__(self, bot: commands.Bot):
        """Read the docs-site settings, publish the support link on the bot and start the server."""
        self.bot = bot
        settings = cfg(bot)

        # Listener settings.
        self.host = settings.get("docs_host", "0.0.0.0")  # SHAI_DOCS_HOST
        self.port = settings.int("docs_port", 8910)  # SHAI_DOCS_PORT
        self.title = settings.get("docs_title", "ShaiWatcher Commands")  # SHAI_DOCS_TITLE

        # Support link shown on the page.
        self.support_url = settings.get(
            "docs_support_url",
            "https://throne.com/ookamikuntv/item/39590391-c582-4c5d-8795-fe6f1925eaae",
        )
        self.support_label = settings.get("docs_support_label", "Buy me a ☕")

        # _start_server reads these attributes back off the bot.
        self.bot.docs_support_url = self.support_url
        self.bot.docs_support_label = self.support_label

        _start_server(self.bot, self.host, self.port, self.title)

    def force_ready(self, value: bool = True):
        """Make the site answer requests regardless of the bot's ready state (or stop doing so)."""
        _DocsHandler.force_ready = bool(value)
|
|
|
|
async def setup(bot: commands.Bot):
    """Extension entry point: create the ``DocsSite`` cog and add it to *bot*."""
    cog = DocsSite(bot)
    await bot.add_cog(cog)
|