diff --git a/bot.py b/bot.py index a96f3bd..f066d74 100644 --- a/bot.py +++ b/bot.py @@ -9,7 +9,7 @@ from modules.common.boot_notice import post_boot_notice # Version consists of: # Major.Enhancement.Minor.Patch.Test (Test is alphanumeric; doesn’t trigger auto update) -VERSION = "0.3.9.5.a3" +VERSION = "0.3.9.5.a4" # ---------- Env loading ---------- diff --git a/modules/common/boot_notice.py b/modules/common/boot_notice.py index aad0260..13741c0 100644 --- a/modules/common/boot_notice.py +++ b/modules/common/boot_notice.py @@ -1,18 +1,16 @@ # modules/common/boot_notice.py import os import re -import base64 -import json import time from datetime import datetime, timezone -from urllib.parse import urlparse +from urllib.parse import urlparse, urlencode import discord import aiohttp from modules.common.settings import cfg -# ---------------- Version helpers ---------------- +# ---------- Version helpers ---------- _VERSION_RE = re.compile(r'\b\d+\.\d+\.\d+\.\d+(?:\.[A-Za-z0-9]+)?\b') @@ -26,7 +24,6 @@ def _split_subject_body(full_message: str) -> tuple[str | None, str | None]: if not full_message: return None, None lines = [ln.rstrip() for ln in full_message.splitlines()] - # subject = first non-empty line subject = None i = 0 while i < len(lines) and subject is None: @@ -37,19 +34,14 @@ def _split_subject_body(full_message: str) -> tuple[str | None, str | None]: return subject or None, (body or None) def _cmp_versions(a: str | None, b: str | None) -> int: - """ - Compare versions like 1.2.3.4.a2 (last segment may be alnum). - Returns -1 if ab. - If either is None, treat as equal to avoid false positives. - """ + """Compare 1.2.3.4.a2 style; if either missing, treat as equal (0).""" if not a or not b: return 0 - pa = a.split('.') - pb = b.split('.') + pa, pb = a.split('.'), b.split('.') while len(pa) < 5: pa.append('0') while len(pb) < 5: pb.append('0') - def part_key(x: str): + def key(x: str): if x.isdigit(): return (int(x), '', 1) m = re.match(r'(\d+)(.*)', x) @@ -58,7 +50,7 @@ def _cmp_versions(a: str | None, b: str | None) -> int: return (0, x, 3) for xa, xb in zip(pa, pb): - ka, kb = part_key(xa), part_key(xb) + ka, kb = key(xa), key(xb) if ka[0] != kb[0]: return 1 if ka[0] > kb[0] else -1 if ka[2] != kb[2]: @@ -67,7 +59,7 @@ def _cmp_versions(a: str | None, b: str | None) -> int: return 1 if ka[1] > kb[1] else -1 return 0 -# ---------------- Gitea helpers ---------------- +# ---------- Gitea helpers ---------- def _parse_repo_url(repo_url: str) -> tuple[str | None, str | None, str | None]: """ @@ -75,11 +67,10 @@ def _parse_repo_url(repo_url: str) -> tuple[str | None, str | None, str | None]: api_base = https://host/api/v1 """ try: - pr = urlparse(repo_url) + pr = urlparse(repo_url.strip().rstrip('/')) parts = [p for p in pr.path.split('/') if p] if len(parts) >= 2: - owner = parts[0] - repo = parts[1] + owner, repo = parts[0], parts[1] if repo.endswith('.git'): repo = repo[:-4] api_base = f"{pr.scheme}://{pr.netloc}/api/v1" @@ -88,68 +79,74 @@ def _parse_repo_url(repo_url: str) -> tuple[str | None, str | None, str | None]: pass return None, None, None -async def _gitea_get_json(url: str, token: str | None, user: str | None, timeout_sec: int = 10): - headers = {} - if token and user: - # Basic auth with user:token - cred = base64.b64encode(f"{user}:{token}".encode()).decode() - headers['Authorization'] = f"Basic {cred}" - elif token: - headers['Authorization'] = f"token {token}" +def _auth_headers_from_cfg(r): + """ + Build Authorization header using SHAI_REPO_AHTOKEN (cfg: repo_ahtoken). + The value may be raw; we prefix 'token ' if needed. + Also supports SHAI_GITEA_TOKEN / SHAI_GITEA_USER as secondary. + """ + ahtoken = r.get('repo_ahtoken', '').strip() # SHAI_REPO_AHTOKEN + if ahtoken: + if not ahtoken.lower().startswith('token '): + ahtoken = f"token {ahtoken}" + return {"Authorization": ahtoken} + # Optional secondary envs for future private usage + tok = os.getenv("SHAI_GITEA_TOKEN", "").strip() + usr = os.getenv("SHAI_GITEA_USER", "").strip() + if tok and usr: + import base64 + b64 = base64.b64encode(f"{usr}:{tok}".encode()).decode() + return {"Authorization": f"Basic {b64}"} + if tok: + return {"Authorization": f"token {tok}"} + return {} + +async def _http_json(url: str, headers: dict, timeout_sec: int = 10): timeout = aiohttp.ClientTimeout(total=timeout_sec) - async with aiohttp.ClientSession(timeout=timeout, headers=headers) as sess: + async with aiohttp.ClientSession(timeout=timeout, headers=headers or {}) as sess: async with sess.get(url) as resp: + ctype = resp.headers.get("Content-Type", "") if resp.status != 200: text = await resp.text() - raise RuntimeError(f"Gitea GET {url} -> {resp.status}: {text[:200]}") - # Gitea returns either dict or list depending on endpoint - ctype = resp.headers.get("Content-Type", "") + raise RuntimeError(f"Gitea GET {url} -> {resp.status} ({ctype}): {text[:200]}") if "application/json" not in ctype: text = await resp.text() - raise RuntimeError(f"Gitea GET {url} non-JSON: {ctype} {text[:200]}") + raise RuntimeError(f"Gitea GET {url} non-JSON {ctype}: {text[:200]}") return await resp.json() -async def _fetch_latest_commit(api_base: str, owner: str, repo: str, branch: str, - token: str | None, user: str | None) -> tuple[str | None, str | None, str | None]: +async def _fetch_latest_commit(api_base: str, owner: str, repo: str, branch: str | None, + headers: dict) -> tuple[str | None, str | None, str | None]: """ - Returns (sha, subject, body) for the latest commit on branch using the - list-commits endpoint which includes the full commit message: - /api/v1/repos/{owner}/{repo}/commits?sha={branch}&limit=1&stat=false&verification=false&files=false + Returns (sha, subject, body) for the latest commit using the list-commits endpoint: + /api/v1/repos/{owner}/{repo}/commits?sha=main&stat=false&verification=false&files=false&limit=1 + If branch is falsy, omits 'sha' and lets server default. """ - url = ( - f"{api_base}/repos/{owner}/{repo}/commits" - f"?sha={branch}&limit=1&stat=false&verification=false&files=false" - ) - data = await _gitea_get_json(url, token, user) + params = { + "stat": "false", + "verification": "false", + "files": "false", + "limit": "1", + } + if branch: + params["sha"] = branch + url = f"{api_base}/repos/{owner}/{repo}/commits?{urlencode(params)}" + + data = await _http_json(url, headers) if not isinstance(data, list) or not data: - raise RuntimeError("Commits list empty or invalid.") + raise RuntimeError("Commits list empty or invalid") + latest = data[0] sha = latest.get("sha") or latest.get("id") message = "" commit_obj = latest.get("commit") or {} if isinstance(commit_obj, dict): message = commit_obj.get("message") or "" - if not message: - # Extremely unlikely on this endpoint, but try secondary fetch - if sha: - for endpoint in ( - f"{api_base}/repos/{owner}/{repo}/git/commits/{sha}", - f"{api_base}/repos/{owner}/{repo}/commits/{sha}", - ): - try: - det = await _gitea_get_json(endpoint, token, user) - if isinstance(det, dict): - m = det.get("message") or det.get("commit", {}).get("message") - if m: - message = str(m) - break - except Exception: - continue - subject, body = _split_subject_body(message) + + subject, body = _split_subject_body(message or "") return sha, (subject or ""), (body or "") -# ---------------- Boot reason inference ---------------- +# ---------- Boot reason inference ---------- def _is_near_scheduled(now_utc: datetime, hhmm_utc: str | None, window_min: int = 5) -> bool: if not hhmm_utc: @@ -159,8 +156,7 @@ def _is_near_scheduled(now_utc: datetime, hhmm_utc: str | None, window_min: int except Exception: return False sched = now_utc.replace(hour=hh, minute=mm, second=0, microsecond=0) - delta = abs((now_utc - sched).total_seconds()) - return delta <= window_min * 60 + return abs((now_utc - sched).total_seconds()) <= window_min * 60 def _format_status_line(kind: str, old_ver: str | None, new_ver: str | None) -> str: if kind == "updated": @@ -173,25 +169,20 @@ def _format_status_line(kind: str, old_ver: str | None, new_ver: str | None) -> return f"⚠️ Version rollback detected: **{old_ver or 'unknown'}** → **{new_ver or 'unknown'}**" return "🟢 Bot started" -# ---------------- Main entry ---------------- +# ---------- Main entry ---------- async def post_boot_notice(bot): """ - Always post a boot status to the modlog channel. - - Waits for ready. - - Infers reason (updated/scheduled/manual/rollback). - - Fetches latest commit (full message) via list-commits endpoint. - - Posts status + commit message (Version bold + md details). - - Pings guild owner only on rollback. - - Persists last sha/version in data_manager['boot_state']. + Always posts a startup status + the latest commit message (Version + md body) + to the modlog channel. Uses SHAI_REPO_URL and SHAI_REPO_AHTOKEN via cfg(). """ try: await bot.wait_until_ready() except Exception as e: print(f"[boot_notice] wait_until_ready failed: {e}") - # Resolve modlog channel - modlog_channel_id = cfg(bot).int('modlog_channel_id', 0) + r = cfg(bot) + modlog_channel_id = r.int('modlog_channel_id', 0) if not modlog_channel_id: print("[boot_notice] modlog_channel_id not configured; skipping.") return @@ -206,37 +197,34 @@ async def post_boot_notice(bot): print(f"[boot_notice] channel id {modlog_channel_id} not found; skipping.") return - # Repo info - r = cfg(bot) - repo_url = r.get('repo_url', '') - branch = r.get('repo_branch', 'main') - check_time_utc = r.get('check_time_utc', '') # e.g., "03:00" - now_utc = datetime.now(timezone.utc) + repo_url = r.get('repo_url', '') # SHAI_REPO_URL + # Branch optional; if empty, we omit 'sha=' + branch = r.get('repo_branch', 'main') or None # SHAI_REPO_BRANCH (optional) + check_time_utc = r.get('check_time_utc', '') # SHAI_CHECK_TIME_UTC (optional, e.g. "03:00") + headers = _auth_headers_from_cfg(r) api_base = owner = repo = None if repo_url: api_base, owner, repo = _parse_repo_url(repo_url) + if not all([api_base, owner, repo]): + print(f"[boot_notice] failed to parse repo_url={repo_url!r}") else: print("[boot_notice] repo_url missing; commit lookup skipped.") - token = os.getenv("SHAI_GITEA_TOKEN", "").strip() or None - user = os.getenv("SHAI_GITEA_USER", "").strip() or None - # State dm = getattr(bot, "data_manager", None) if not dm: print("[boot_notice] data_manager missing on bot; cannot persist state.") return - prev = (dm.get('boot_state') or [{}])[-1] if dm.get('boot_state') else {} prev_sha = prev.get('last_sha') or None prev_ver = prev.get('last_version') or None - # Fetch latest commit (sha, subject, body) + # Fetch latest commit sha = subject = body = None if api_base and owner and repo: try: - sha, subject, body = await _fetch_latest_commit(api_base, owner, repo, branch, token, user) + sha, subject, body = await _fetch_latest_commit(api_base, owner, repo, branch, headers) except Exception as e: print(f"[boot_notice] fetch latest commit failed: {e}") else: @@ -245,34 +233,31 @@ async def post_boot_notice(bot): curr_ver = _extract_version(subject) if subject else None # Decide reason - reason = "manual" - mention_owner = False - + now_utc = datetime.now(timezone.utc) if prev_ver and curr_ver: cmpv = _cmp_versions(prev_ver, curr_ver) if cmpv < 0: - reason = "updated" + reason, ping_owner = "updated", False elif cmpv > 0: - reason = "rollback" - mention_owner = True + reason, ping_owner = "rollback", True else: - reason = "scheduled" if _is_near_scheduled(now_utc, check_time_utc) else "manual" + reason, ping_owner = ("scheduled" if _is_near_scheduled(now_utc, check_time_utc) else "manual"), False else: if prev_sha and sha and prev_sha != sha: - reason = "updated" + reason, ping_owner = "updated", False else: - reason = "scheduled" if _is_near_scheduled(now_utc, check_time_utc) else "manual" + reason, ping_owner = ("scheduled" if _is_near_scheduled(now_utc, check_time_utc) else "manual"), False - # Post status line + # Post status status_line = _format_status_line(reason, prev_ver, curr_ver) try: allowed = discord.AllowedMentions( everyone=False, - users=True if mention_owner else False, + users=True if (ping_owner and ch.guild and ch.guild.owner_id) else False, roles=False, replied_user=False ) - if mention_owner and ch.guild and ch.guild.owner_id: + if ping_owner and ch.guild and ch.guild.owner_id: status_line = f"{status_line}\n<@{ch.guild.owner_id}>" await ch.send(status_line, allowed_mentions=allowed) except Exception as e: @@ -281,7 +266,7 @@ async def post_boot_notice(bot): # Post commit message (Version + md details) try: - title = curr_ver or (subject or "Latest commit") + title = (curr_ver or subject or "Latest commit").strip() if title or body: commit_msg = f"**{title}**\n{body}" if body else f"**{title}**" await ch.send(commit_msg, allowed_mentions=discord.AllowedMentions.none()) @@ -290,12 +275,11 @@ async def post_boot_notice(bot): # Persist state try: - new_state = { + dm.add('boot_state', { 'last_sha': sha, 'last_version': curr_ver, 'last_subject': subject, 'last_boot_ts': time.time(), - } - dm.add('boot_state', new_state) + }) except Exception as e: print(f"[boot_notice] failed to persist boot_state: {e}")