shaiwatcher/modules/user_cards/user_cards.py
Franz Rolfsvaag aab931b543 0.3.9.7.a5
- Forgot some command labels for some mod commands
- `/recreate` -> `/recreate_nick_review` for clarity
  - This recreates reviews for a single user, if applicable
  - The plural command still affects all applicable users as normal
2025-08-11 09:58:44 +02:00

530 lines
21 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/user_cards/user_cards.py
import asyncio
import logging
import time
from typing import Optional, Set, Tuple

import discord
from discord.ext import commands

from modules.common.emoji_accept import is_accept
from modules.common.settings import cfg # ENV-first helper
# Status glyphs rendered in the card fields. They must be non-empty: Discord
# rejects embed fields whose value is an empty string. The glyphs follow the
# legend in the UserCardsCog docstring (Rules / RoE = ✅/❌, Nickname = ❌ | ✔️ | ✅).
CHECK = '✅' # verified
CROSS = '❌' # not done
PENDING = '✔️' # claimed / pending review
# Emoji treated as an "accept" reaction.
ACCEPT = {CHECK, '🫡'}
# Shared AllowedMentions value with every mention type disabled; passed to the
# sends/edits below so text containing member mentions does not ping anyone.
NO_MENTIONS = discord.AllowedMentions.none()
class UserCardsCog(commands.Cog):
    """
    Per-user status cards with live reconcile and offline review triggers.
    Title = Nickname → Display → Global → Username (adds '*' if NO server nickname)
    Fields: Rules / RoE = ✅/❌, Nickname = ❌ | ✔️(pending) | ✅(verified)
    Colors:
    - Green: Rules✅ & RoE✅ & Nickname✅
    - Blue: Rules✅ & RoE✅ & Nickname✔ (pending)
    - Yellow: Some/partial
    - Red: None
    """

    # Logger for best-effort operations whose failures are deliberately swallowed.
    _logger = logging.getLogger(__name__)

    # Number of recent messages scanned when looking for a user's card by footer.
    _HISTORY_SCAN_LIMIT = 400

    def __init__(self, bot):
        self.bot = bot
        r = cfg(bot)

        # Channels / IDs from ENV/INI
        self.userslist_channel_id = r.int('userslist_channel_id', 0)
        self.modlog_channel_id = r.int('modlog_channel_id', 0)
        self.mod_channel_id = r.int('mod_channel_id', 0)

        # reaction-role authoritative messages/roles
        self.rules_msg_id = r.int('rules_message_id', 0)
        self.engage_msg_id = r.int('engagement_message_id', 0)
        self.nick_msg_id = r.int('nickname_message_id', 0)
        self.rules_role_id = r.int('rules_role_id', 0)
        self.engage_role_id = r.int('engagement_role_id', 0)
        self.full_access_role_id = r.int('full_access_role_id', 0)

        self._refresh_locks = {}  # per-user locks to avoid racey double-posts

        # Optional periodic refresh (twice a day)
        self.cron_enabled = r.bool('user_cards_cron_enabled', False)
        self._cron_task = asyncio.create_task(self._periodic_refresh()) if self.cron_enabled else None
        self._startup_task = asyncio.create_task(self._startup_reconcile())

    def cog_unload(self):
        """Cancel the background tasks started in ``__init__``."""
        for task in (self._cron_task, self._startup_task):
            if task:
                task.cancel()

    # ---------- status helpers ----------
    def _rules_ok(self, uid: int) -> bool:
        """Return True if ``uid`` is stored under 'agreed_rules'."""
        return uid in self.bot.data_manager.get('agreed_rules')

    def _engage_ok(self, uid: int) -> bool:
        """Return True if ``uid`` is stored under 'agreed_engagement'."""
        return uid in self.bot.data_manager.get('agreed_engagement')

    def _nick_status(self, uid: int) -> str:
        """Return 'verified', 'pending' or 'none' for the user's nickname claim."""
        dm = self.bot.data_manager
        if uid in dm.get('nick_verified'):
            return 'verified'
        if uid in dm.get('nick_claim_pending') or uid in dm.get('agreed_nickname'):
            return 'pending'
        return 'none'

    def _primary_name(self, m: discord.Member) -> str:
        """Return the card title name; ' *' is appended when no server nickname is set."""
        has_nick = bool(m.nick and m.nick.strip())
        base = m.nick or m.display_name or m.global_name or m.name
        if not has_nick:
            base = f"{base} *"
        return base

    def _secondary_name(self, m: discord.Member) -> str:
        """Return the global name, falling back to the username."""
        return m.global_name or m.name

    def _card_color(self, uid: int) -> discord.Color:
        """Map the user's Rules / RoE / nickname state to the embed colour."""
        r = self._rules_ok(uid)
        e = self._engage_ok(uid)
        n = self._nick_status(uid)
        if r and e and n == 'verified':
            return discord.Color.green()
        if r and e and n == 'pending':
            return discord.Color.blurple()
        if r or e or n != 'none':
            return discord.Color.yellow()
        return discord.Color.red()

    # ---------- persistence ----------
    def _get_card_record(self, user_id: int):
        """Return the stored 'user_cards' record for ``user_id``, or None."""
        for r in self.bot.data_manager.get('user_cards'):
            if r.get('user_id') == user_id:
                return r
        return None

    def _store_card_record(self, user_id: int, message: discord.Message) -> None:
        """Upsert the 'user_cards' record so it points at ``message``."""
        dm = self.bot.data_manager
        dm.remove('user_cards', lambda r: r.get('user_id') == user_id)
        dm.add('user_cards', {'user_id': user_id, 'message_id': message.id, 'channel_id': message.channel.id})

    async def _build_embed(self, member: discord.Member) -> discord.Embed:
        """Build the status embed for ``member``; the footer carries 'UID:<id>'."""
        uid = member.id
        rules = CHECK if self._rules_ok(uid) else CROSS
        roe = CHECK if self._engage_ok(uid) else CROSS

        ns = self._nick_status(uid)
        if ns == 'verified':
            nick_val = CHECK
        elif ns == 'pending':
            nick_val = PENDING
        else:
            nick_val = CROSS

        title_left = self._primary_name(member)
        title_right = self._secondary_name(member)
        title = f"{title_left} ({title_right})"
        join_val = f"<t:{int(member.joined_at.timestamp())}:R>" if member.joined_at else "Unknown"

        embed = discord.Embed(title=title, color=self._card_color(uid))
        embed.add_field(name="Rules", value=rules)
        embed.add_field(name="RoE", value=roe)
        embed.add_field(name="Nickname Set", value=nick_val)
        embed.add_field(name="Joined", value=join_val, inline=False)
        if member.avatar:
            embed.set_thumbnail(url=member.avatar.url)
        # Stable identity so we can find/edit the right card later
        embed.set_footer(text=f"UID:{member.id}")
        return embed

    async def _fetch_message_in(self, guild: discord.Guild, channel_id: int,
                                message_id: int) -> Optional[discord.Message]:
        """Fetch ``message_id`` from ``channel_id``; return None on any failure."""
        ch = guild.get_channel(channel_id)
        if not ch:
            return None
        try:
            return await ch.fetch_message(message_id)
        except Exception:
            return None

    def _is_card_for(self, message: discord.Message, user_id: int) -> bool:
        """Return True if ``message`` is one of our embeds whose footer is 'UID:<user_id>'."""
        if message.author.id != self.bot.user.id or not message.embeds:
            return False
        first = message.embeds[0]
        foot = (first.footer.text or "") if first.footer else ""
        return foot == f"UID:{user_id}"

    async def refresh_card(self, member: discord.Member):
        """
        Idempotent refresh:
        1) Try to edit the tracked message.
        2) If not found, search the channel by footer marker and edit that.
        3) If still not found, post a new one, then delete any stragglers with the same marker.
        """
        if not member or not member.guild or not self.userslist_channel_id:
            return

        async with self._lock_for(member.id):
            channel = member.guild.get_channel(self.userslist_channel_id)
            if not channel:
                return

            record = self._get_card_record(member.id)
            embed = await self._build_embed(member)

            # 1) Attempt to edit the exact tracked message (configured channel, then stored channel)
            msg = None
            stored_ch_id = int(record.get('channel_id')) if record and record.get('channel_id') else None
            if record and record.get('message_id'):
                mid = int(record['message_id'])
                msg = await self._fetch_message_in(member.guild, channel.id, mid)
                if not msg and stored_ch_id and stored_ch_id != channel.id:
                    msg = await self._fetch_message_in(member.guild, stored_ch_id, mid)

            # 2) If we couldn't find by record, try to find by footer marker in channel history
            if not msg:
                msg = await self._find_existing_card(channel, member.id)

            # 3) Edit in place if found
            if msg:
                try:
                    await msg.edit(embed=embed, allowed_mentions=NO_MENTIONS)
                except Exception:
                    self._logger.debug("Editing card for user %s failed", member.id, exc_info=True)
                # upsert record with the definitive location
                self._store_card_record(member.id, msg)
                return

            # 4) Post fresh card
            try:
                new_msg = await channel.send(embed=embed, allowed_mentions=NO_MENTIONS)
            except Exception:
                self._logger.warning("Posting card for user %s failed", member.id, exc_info=True)
                return

            # 5) Clean up any other messages that look like this user's card
            await self._delete_stale_cards(channel, member.id, keep_id=new_msg.id)

            # 6) Persist mapping
            self._store_card_record(member.id, new_msg)

    async def _delete_stale_cards(self, channel: discord.TextChannel, user_id: int, keep_id: int) -> None:
        """Best-effort delete of every recent card for ``user_id`` other than ``keep_id``."""
        try:
            async for m in channel.history(limit=self._HISTORY_SCAN_LIMIT, oldest_first=False):
                if m.id == keep_id or not self._is_card_for(m, user_id):
                    continue
                try:
                    await m.delete()
                except Exception:
                    self._logger.debug("Deleting stale card %s failed", m.id, exc_info=True)
        except Exception:
            self._logger.debug("Scanning for stale cards of user %s failed", user_id, exc_info=True)

    def _lock_for(self, user_id: int) -> asyncio.Lock:
        """Return the per-user refresh lock, creating it on first use."""
        lk = self._refresh_locks.get(user_id)
        if lk is None:
            lk = asyncio.Lock()
            self._refresh_locks[user_id] = lk
        return lk

    async def _find_existing_card(self, channel: discord.TextChannel, user_id: int) -> Optional[discord.Message]:
        """Search recent history for a card we posted for this user (by footer marker)."""
        try:
            async for m in channel.history(limit=self._HISTORY_SCAN_LIMIT, oldest_first=False):
                if self._is_card_for(m, user_id):
                    return m
        except Exception:
            self._logger.debug("History search for card of user %s failed", user_id, exc_info=True)
        return None

    async def _log(self, guild: discord.Guild, content: str):
        """Post ``content`` to the mod-log channel (if reachable) and record it under 'modlog'."""
        if not self.modlog_channel_id:
            return
        ch = guild.get_channel(self.modlog_channel_id)
        if ch:
            try:
                await ch.send(content, allowed_mentions=NO_MENTIONS)
            except Exception:
                self._logger.debug("Sending mod-log entry failed", exc_info=True)
        self.bot.data_manager.add('modlog', {'guild_id': guild.id, 'content': content})

    # ---------- RR message lookup & reactor collection ----------
    def _get_cached_msg_channel_id(self, guild_id: int, message_id: int) -> Optional[int]:
        """Return the cached channel id for a reaction-role message, or None."""
        for r in self.bot.data_manager.get('rr_msg_channels'):
            if r.get('guild_id') == guild_id and r.get('message_id') == message_id:
                return int(r.get('channel_id'))
        return None

    def _cache_msg_channel_id(self, guild_id: int, message_id: int, channel_id: int):
        """Replace the cached channel id for a reaction-role message."""
        dm = self.bot.data_manager
        dm.remove('rr_msg_channels', lambda r: r.get('guild_id') == guild_id and r.get('message_id') == message_id)
        dm.add('rr_msg_channels', {'guild_id': guild_id, 'message_id': int(message_id), 'channel_id': int(channel_id)})

    async def _get_message_by_id(self, guild: discord.Guild, message_id: int) -> Optional[discord.Message]:
        """
        Locate a message by id: try the cached channel first, then probe every
        text channel and cache the hit. Returns None if it cannot be fetched.
        """
        if not message_id:
            return None

        ch_id = self._get_cached_msg_channel_id(guild.id, message_id)
        if ch_id:
            ch = guild.get_channel(ch_id)
            if ch:
                try:
                    return await ch.fetch_message(message_id)
                except Exception:
                    pass  # stale cache entry: fall through to the full scan

        for ch in guild.text_channels:
            try:
                msg = await ch.fetch_message(message_id)
            except discord.HTTPException:
                # Forbidden and NotFound are HTTPException subclasses.
                continue
            self._cache_msg_channel_id(guild.id, message_id, ch.id)
            return msg
        return None

    async def _collect_reactors(self, message: Optional[discord.Message]) -> Set[int]:
        """Return the ids of non-bot users who reacted to ``message`` with an accept emoji."""
        ids: Set[int] = set()
        if not message:
            return ids
        for rxn in message.reactions:
            if is_accept(rxn.emoji):
                async for u in rxn.users(limit=None):
                    if not u.bot:
                        ids.add(u.id)
        return ids

    # ---------- reconcile (creates reviews for offline nickname claims) ----------
    async def _open_review_fallback(self, guild: discord.Guild, member: discord.Member, source: str):
        """If NickNudgeCog.start_nick_review is missing, post a simple review to mod channel."""
        if not self.mod_channel_id:
            return
        ch = guild.get_channel(self.mod_channel_id)
        if not ch:
            return
        try:
            await ch.send(
                f"📝 **Nickname claim review (fallback)**\n"
                f"User: {member.mention} (`{member.id}`)\n"
                f"Source: `{source}`\n"
                f"Current name: **{self._primary_name(member)}**\n"
                f"Please verify and handle manually.",
                allowed_mentions=NO_MENTIONS
            )
        except Exception:
            self._logger.debug("Posting fallback nickname review failed", exc_info=True)

    def _replace_id_set(self, key: str, ids: Set[int]) -> None:
        """Overwrite the collection stored under ``key`` with ``ids``."""
        dm = self.bot.data_manager
        dm.remove(key, lambda _x: True)
        for uid in ids:
            dm.add(key, int(uid))

    async def _sync_role(self, member: discord.Member, role: Optional[discord.Role],
                         should_have: bool, label: str) -> None:
        """Best-effort: add or remove ``role`` so the member matches ``should_have``."""
        if not role:
            return
        try:
            if should_have and role not in member.roles:
                await member.add_roles(role, reason=f"Reconcile: reacted to {label} ✅")
            elif not should_have and role in member.roles:
                await member.remove_roles(role, reason=f"Reconcile: removed {label} ✅")
        except Exception:
            self._logger.debug("Role repair (%s) failed for member %s", label, member.id, exc_info=True)

    async def _reconcile_agreements(self, guild: discord.Guild) -> Tuple[int, int, int, int]:
        """
        Reconcile from live reaction messages.
        Returns (rules_changed, engage_changed, nick_added_claims, nick_removed_claims).
        Also opens nickname reviews for any **unreviewed** claimers (startup/offline).

        A category whose authoritative message cannot be fetched is left untouched:
        a missing message says nothing about who reacted, so it must not be read
        as "everyone withdrew".
        """
        dm = self.bot.data_manager

        rules_msg = await self._get_message_by_id(guild, self.rules_msg_id)
        engage_msg = await self._get_message_by_id(guild, self.engage_msg_id)
        nick_msg = await self._get_message_by_id(guild, self.nick_msg_id)

        old_rules = set(dm.get('agreed_rules'))
        old_engage = set(dm.get('agreed_engagement'))
        old_nick = set(dm.get('agreed_nickname'))

        # Fall back to the stored set when the message is unavailable, so the
        # diff below is empty for that category and nothing is wiped or revoked.
        new_rules = await self._collect_reactors(rules_msg) if rules_msg is not None else set(old_rules)
        new_engage = await self._collect_reactors(engage_msg) if engage_msg is not None else set(old_engage)
        new_nick = await self._collect_reactors(nick_msg) if nick_msg is not None else set(old_nick)

        rules_added, rules_removed = new_rules - old_rules, old_rules - new_rules
        engage_added, engage_removed = new_engage - old_engage, old_engage - new_engage
        nick_added, nick_removed = new_nick - old_nick, old_nick - new_nick

        # Persist live truth for rules/engagement/nickname claim (reaction path)
        if old_rules != new_rules:
            self._replace_id_set('agreed_rules', new_rules)
        if old_engage != new_engage:
            self._replace_id_set('agreed_engagement', new_engage)
        if old_nick != new_nick:
            self._replace_id_set('agreed_nickname', new_nick)

        # Apply/repair roles for Rules/Engagement
        rules_role = guild.get_role(self.rules_role_id)
        engage_role = guild.get_role(self.engage_role_id)
        for uid in (rules_added | rules_removed | engage_added | engage_removed):
            m = guild.get_member(uid)
            if not m or m.bot:
                continue
            await self._sync_role(m, rules_role, uid in new_rules, "Rules")
            await self._sync_role(m, engage_role, uid in new_engage, "RoE")

        # --- Open reviews for *any* unreviewed claimers (startup/offline) ---
        nn = self.bot.get_cog('NickNudgeCog')
        verified_set = set(dm.get('nick_verified'))
        pending_set = set(dm.get('nick_claim_pending'))
        open_review_user_ids = {
            int(r['user_id'])
            for r in dm.get('nick_reviews')
            if r.get('guild_id') == guild.id and r.get('status') == 'pending'
        }

        # Anyone who currently claims (reacted) but isn't verified/pending/under review.
        # Skipped entirely when the nickname message could not be read.
        need_review = []
        if nick_msg is not None:
            need_review = [
                uid for uid in new_nick
                if uid not in verified_set and uid not in pending_set and uid not in open_review_user_ids
            ]

        for uid in need_review:
            dm.add('nick_claim_pending', int(uid))
            m = guild.get_member(uid)
            if m and not m.bot:
                if nn and hasattr(nn, 'start_nick_review'):
                    try:
                        await nn.start_nick_review(guild, m, source="claim_offline")
                    except Exception:
                        self._logger.warning("start_nick_review failed for member %s", uid, exc_info=True)
                else:
                    await self._open_review_fallback(guild, m, source="claim_offline")

        # Anyone who *lost* their reaction -> clear pending/verified
        for uid in nick_removed:
            # Bind uid as a default so the predicate never sees a later loop value.
            dm.remove('nick_claim_pending', lambda x, uid=uid: x == uid)
            dm.remove('nick_verified', lambda x, uid=uid: x == uid)

        # Nudge roles/full access via ReactionRoleCog for anyone impacted
        rr = self.bot.get_cog('ReactionRoleCog')
        impacted = (rules_added | rules_removed | engage_added | engage_removed |
                    set(need_review) | nick_removed)
        for uid in impacted:
            m = guild.get_member(uid)
            if not m or m.bot:
                continue
            try:
                if rr:
                    await rr.maybe_apply_full_access(m)
                else:
                    await self.refresh_card(m)
            except Exception:
                self._logger.debug("Post-reconcile update failed for member %s", uid, exc_info=True)

        return (
            len(rules_added) + len(rules_removed),
            len(engage_added) + len(engage_removed),
            len(nick_added),
            len(nick_removed),
        )

    # ---------- listeners ----------
    @commands.Cog.listener()
    async def on_member_join(self, member: discord.Member):
        await self._log(member.guild, f"📝 User joined: {member.mention} (ID: {member.id})")
        await self.refresh_card(member)

    @commands.Cog.listener()
    async def on_member_update(self, before: discord.Member, after: discord.Member):
        if before.nick != after.nick or before.roles != after.roles:
            await self.refresh_card(after)

    @commands.Cog.listener()
    async def on_user_update(self, before: discord.User, after: discord.User):
        if before.global_name == after.global_name and before.name == after.name:
            return
        # Snapshot: the guild collection may change while we await below.
        for g in list(self.bot.guilds):
            m = g.get_member(after.id)
            if m:
                try:
                    await self.refresh_card(m)
                except Exception:
                    self._logger.debug("Card refresh failed for member %s", after.id, exc_info=True)

    # ---------- periodic + startup ----------
    async def _refresh_all_cards(self, guild: discord.Guild) -> int:
        """
        Refresh the card of every non-bot member; return how many refreshes
        completed without raising. One failing member never stops the rest.
        """
        refreshed = 0
        # Snapshot the member list: it can change size while we await, and
        # iterating the live view would then raise RuntimeError.
        for m in list(guild.members):
            if m.bot:
                continue
            try:
                await self.refresh_card(m)
            except Exception:
                self._logger.warning("Card refresh failed for member %s", m.id, exc_info=True)
            else:
                refreshed += 1
        return refreshed

    async def _startup_reconcile(self):
        """Once the bot is ready: reconcile every guild, then refresh every card."""
        await self.bot.wait_until_ready()
        for g in list(self.bot.guilds):
            try:
                await self._reconcile_agreements(g)
            except Exception:
                self._logger.exception("Startup reconcile failed for guild %s", g.id)
        for g in list(self.bot.guilds):
            await self._refresh_all_cards(g)

    async def _periodic_refresh(self):
        """Reconcile and refresh every guild, then sleep 12 hours; repeat until the bot closes."""
        await self.bot.wait_until_ready()
        while not self.bot.is_closed():
            try:
                for g in list(self.bot.guilds):
                    try:
                        await self._reconcile_agreements(g)
                    except Exception:
                        self._logger.exception("Periodic reconcile failed for guild %s", g.id)
                    await self._refresh_all_cards(g)
            except Exception:
                self._logger.exception("Periodic refresh cycle failed")
            await asyncio.sleep(12 * 60 * 60)  # twice a day

    # ---------- mod command: rescan + live reconcile ----------
    # NOTE: no docstring on the callback on purpose; discord.py would surface it
    # as the command's help text.
    @commands.hybrid_command(
        name="usercards_rescan",
        description="[MOD] Re-check all users and refresh cards"
    )
    @commands.has_permissions(manage_guild=True)
    async def usercards_rescan(self, ctx: commands.Context):
        g = ctx.guild
        if not g:
            return await ctx.reply("Use this in a server.", ephemeral=True)

        # The rescan takes far longer than the interaction acknowledgement
        # window, so acknowledge first.
        await ctx.defer(ephemeral=True)

        rchg, echg, nadd, nrem = await self._reconcile_agreements(g)

        # Rebuild cards
        updated = await self._refresh_all_cards(g)

        await ctx.reply(
            f"Reconciled from messages. Changes — Rules: **{rchg}**, RoE: **{echg}**, "
            f"Nickname (added): **{nadd}**, Nickname (removed): **{nrem}**. "
            f"Refreshed cards for **{updated}** members.",
            ephemeral=True
        )
async def setup(bot):
    """Extension entry point: construct the cog and register it on ``bot``."""
    cog = UserCardsCog(bot)
    await bot.add_cog(cog)