253 lines
9.2 KiB
Python
253 lines
9.2 KiB
Python
# modules/common/boot_notice.py
|
|
import os
|
|
import re
|
|
import html
|
|
import json
|
|
import discord
|
|
import aiohttp
|
|
import xml.etree.ElementTree as ET
|
|
from urllib.parse import urlparse
|
|
from modules.common.settings import cfg
|
|
|
|
# ---------------- Utilities ----------------
|
|
|
|
def _strip_html_keep_text(s: str) -> str:
|
|
"""Remove HTML tags, unescape entities, collapse excessive blank lines."""
|
|
if not s:
|
|
return ""
|
|
s = re.sub(r'(?i)<\s*br\s*/?\s*>', '\n', s)
|
|
s = re.sub(r'(?i)</\s*p\s*>', '\n', s)
|
|
s = re.sub(r'(?i)<\s*p\s*>', '', s)
|
|
s = re.sub(r'<[^>]+>', '', s)
|
|
s = html.unescape(s)
|
|
s = '\n'.join(line.rstrip() for line in s.splitlines())
|
|
s = re.sub(r'\n{3,}', '\n\n', s).strip()
|
|
return s
|
|
|
|
def _build_status_line(status: str, old_v: str, new_v: str, desc: str) -> str | None:
|
|
status = (status or "").strip()
|
|
old_v = (old_v or "").strip()
|
|
new_v = (new_v or "").strip()
|
|
|
|
if status == "fetched_new":
|
|
line = f"✅ Booted new version: v{old_v or '0.0.0.0'} → **v{new_v}**"
|
|
elif status == "cached_no_update":
|
|
line = f"🟢 Booted cached version: **v{new_v}** — no new update found"
|
|
elif status == "cache_only_error":
|
|
line = f"🟡 Booted cached version: **v{new_v}** — repository not accessible"
|
|
elif status == "scheduled_restart":
|
|
line = "🕒 Scheduled restart executed"
|
|
else:
|
|
return None
|
|
return f"{line}\n_{desc.strip()}_" if desc else line
|
|
|
|
def _only_version_and_details(subject: str | None, body: str | None) -> str | None:
|
|
"""Format to 'Version number' (bold) + 'Version details' (md)."""
|
|
if not subject and not body:
|
|
return None
|
|
version = None
|
|
if subject:
|
|
m = re.search(r'\bv?(\d+\.\d+(?:\.\d+){0,2})\b', subject)
|
|
version = m.group(0) if m else subject.strip()
|
|
if version and body:
|
|
return f"**{version}**\n{body.strip()}"
|
|
if version:
|
|
return f"**{version}**"
|
|
return body.strip() if body else None
|
|
|
|
# ---------------- RSS helpers ----------------
|
|
|
|
async def _fetch_latest_rss_item(rss_url: str):
    """
    Download the feed at *rss_url* and return ``(title, body, link)`` for the
    first ``<item>`` under ``<channel>``; each element may be ``None``.

    Any failure (non-200 response, network/parse error, no item) yields
    ``(None, None, None)``. Gitea RSS often only carries the first line of
    the commit message in <title>/<description>.
    """
    nothing = (None, None, None)
    try:
        session_timeout = aiohttp.ClientTimeout(total=8)
        async with aiohttp.ClientSession(timeout=session_timeout) as session:
            async with session.get(rss_url) as response:
                if response.status != 200:
                    return nothing
                payload = await response.text()

        newest = ET.fromstring(payload).find('./channel/item')
        if newest is None:
            return nothing

        def field(tag):
            # Missing elements are treated the same as empty ones.
            return (newest.findtext(tag) or '').strip()

        title = field('title')
        body = _strip_html_keep_text(field('description')) or None
        link = field('link') or None

        # Activity-style titles ("pushed", "committed", ...) are noise; the
        # first line of the description is a better headline when present.
        looks_noisy = bool(title) and re.search(
            r'\b(pushed|commit|committed)\b', title, re.I
        ) is not None
        if looks_noisy and body:
            headline = body.splitlines()[0].strip()
            if headline:
                title = headline

        return _strip_html_keep_text(title) or None, body, link
    except Exception:
        # Best-effort: the caller treats an all-None tuple as "no feed data".
        return nothing
|
|
|
|
def _parse_gitea_link_for_api(link: str):
|
|
"""
|
|
From a Gitea commit link like:
|
|
https://git.example.com/owner/repo/commit/abcdef...
|
|
derive:
|
|
api_base: https://git.example.com/api/v1
|
|
owner: owner
|
|
repo: repo
|
|
sha: abcdef...
|
|
"""
|
|
try:
|
|
pr = urlparse(link)
|
|
parts = [p for p in pr.path.split('/') if p]
|
|
# Expect: [owner, repo, 'commit', sha]
|
|
if len(parts) >= 4 and parts[2] == 'commit':
|
|
owner, repo, sha = parts[0], parts[1], parts[3]
|
|
api_base = f"{pr.scheme}://{pr.netloc}/api/v1"
|
|
return api_base, owner, repo, sha
|
|
except Exception:
|
|
pass
|
|
return None, None, None, None
|
|
|
|
async def _fetch_gitea_commit_message(commit_link: str, token: str | None):
    """
    Ask the Gitea API for the full commit message behind *commit_link*.

    Two endpoints are tried in order:
      1) /api/v1/repos/{owner}/{repo}/git/commits/{sha}
      2) /api/v1/repos/{owner}/{repo}/commits/{sha}

    When *token* is given it is sent as ``Authorization: token <token>``.
    Returns the message as ``str`` from the first endpoint that answers 200
    with a non-empty message, otherwise ``None``.
    """
    api_base, owner, repo, sha = _parse_gitea_link_for_api(commit_link)
    if not (api_base and owner and repo and sha):
        return None

    auth_headers = {'Authorization': f'token {token}'} if token else {}

    # (endpoint URL, True when the top-level "message" key is consulted before
    # the nested commit.message; the second endpoint checks the nested one first).
    attempts = (
        (f"{api_base}/repos/{owner}/{repo}/git/commits/{sha}", True),
        (f"{api_base}/repos/{owner}/{repo}/commits/{sha}", False),
    )

    request_timeout = aiohttp.ClientTimeout(total=8)
    async with aiohttp.ClientSession(timeout=request_timeout, headers=auth_headers) as session:
        for endpoint, top_level_first in attempts:
            try:
                async with session.get(endpoint) as response:
                    if response.status != 200:
                        continue
                    payload = await response.json()
                    if not isinstance(payload, dict):
                        continue

                    # Common shapes: {"message": "..."} or {"commit": {"message": "..."}}
                    if top_level_first:
                        text = payload.get('message')
                        if not text:
                            nested = payload.get('commit') or {}
                            text = nested.get('message')
                    else:
                        nested = payload.get('commit') or {}
                        text = nested.get('message') or payload.get('message')

                    if text:
                        return str(text)
            except Exception:
                # Best-effort: a failing endpoint just means "try the next one".
                pass

    return None
|
|
|
|
def _split_subject_body(full_message: str):
|
|
"""
|
|
Split a full commit message into (subject, body).
|
|
Subject = first non-empty line; body = rest (preserve markdown).
|
|
"""
|
|
if not full_message:
|
|
return None, None
|
|
lines = [ln.rstrip() for ln in full_message.splitlines()]
|
|
# find first non-empty line
|
|
subject = None
|
|
i = 0
|
|
while i < len(lines) and subject is None:
|
|
if lines[i].strip():
|
|
subject = lines[i].strip()
|
|
i += 1
|
|
body = '\n'.join(lines[i:]).strip() if i < len(lines) else ''
|
|
return subject or None, (body or None)
|
|
|
|
# ---------------- Main entry ----------------
|
|
|
|
# Maximum length of a message's text content accepted by Discord.
# NOTE(review): len() counts code points; confirm Discord's exact counting
# rule if messages full of astral-plane emoji ever hit the limit.
_DISCORD_MESSAGE_LIMIT = 2000


def _clip_for_discord(text: str, limit: int = _DISCORD_MESSAGE_LIMIT) -> str:
    """Shorten *text* to at most *limit* characters, ending in an ellipsis if cut."""
    if len(text) <= limit:
        return text
    return text[:limit - 1].rstrip() + "…"


async def post_boot_notice(bot):
    """
    Post a concise boot status to the modlog channel.

    Inputs are read from environment variables:
      SHAI_BOOT_STATUS  fetched_new | cached_no_update | cache_only_error |
                        scheduled_restart | '' (anything else: post nothing)
      SHAI_BOOT_DESC    optional note appended to the status line
      SHAI_BOOT_OLD     previous version
      SHAI_BOOT_NEW     booted version
      SHAI_REPO_RSS     URL of the repository RSS feed
      SHAI_GITEA_TOKEN  optional Gitea API token

    Behaviour:
    - Always: one status line (if SHAI_BOOT_STATUS is a known value and the
      configured ``modlog_channel_id`` resolves to a channel in some guild).
    - If SHAI_BOOT_STATUS == 'fetched_new' and an RSS URL is set: fetch the
      **full commit message**.
        • Prefer the Gitea API (needs only the commit link from RSS).
        • Fall back to the RSS subject/description if the API fails.
      Then post ONLY: version number (bold) + markdown details.

    Messages are clipped to Discord's content limit so an unusually long
    commit message is posted truncated instead of being rejected. All sends
    are best-effort: a failure is swallowed and never propagates to the caller.
    """
    status = os.getenv("SHAI_BOOT_STATUS", "").strip()
    desc = os.getenv("SHAI_BOOT_DESC", "").strip()
    old_v = os.getenv("SHAI_BOOT_OLD", "").strip()
    new_v = os.getenv("SHAI_BOOT_NEW", "").strip()
    rss = os.getenv("SHAI_REPO_RSS", "").strip()
    token = os.getenv("SHAI_GITEA_TOKEN", "").strip() or None  # optional

    status_line = _build_status_line(status, old_v, new_v, desc)
    if not status_line:
        return  # nothing to say

    # Resolve modlog channel: first guild that knows the configured channel id.
    modlog_channel_id = cfg(bot).int('modlog_channel_id', 0)
    if not modlog_channel_id:
        return
    ch = None
    for g in bot.guilds:
        ch = g.get_channel(modlog_channel_id)
        if ch:
            break
    if not ch:
        return

    # 1) Post the status line
    try:
        await ch.send(
            _clip_for_discord(status_line),
            allowed_mentions=discord.AllowedMentions.none(),
        )
    except Exception:
        # If the channel is not writable there is no point in continuing.
        return

    # 2) If updated, post ONLY the commit message (version + details)
    if status != "fetched_new" or not rss:
        return

    subj, body, link = await _fetch_latest_rss_item(rss)

    # Try to get the full commit message from the Gitea API using the link's SHA
    full_msg = None
    if link:
        full_msg = await _fetch_gitea_commit_message(link, token)

    # If the API failed, fall back to what RSS gave us
    if not full_msg:
        if body and subj and body.startswith(subj):
            # The RSS body repeats the subject; avoid showing it twice.
            body_trimmed = body[len(subj):].lstrip()
            full_msg = f"{subj}\n{body_trimmed}".strip() if body_trimmed else subj
        else:
            combined = "\n".join(x for x in [subj or "", body or ""] if x).strip()
            full_msg = combined or (subj or body or "")

    # Split into subject/body and format as Version + Details
    c_subject, c_body = _split_subject_body(full_msg or "")
    commit_msg = _only_version_and_details(c_subject, c_body)
    if not commit_msg:
        return

    try:
        # Clip first: over-long content would make send() fail, and the
        # failure would be swallowed below, silently dropping the changelog.
        await ch.send(
            _clip_for_discord(commit_msg),
            allowed_mentions=discord.AllowedMentions.none(),
        )
    except Exception:
        # Best-effort: the status line is already posted; ignore failures here.
        pass
|