# modules/pirates_list/pirates_list.py
import asyncio

import discord
from discord.ext import commands

from mod_perms import require_mod_ctx  # ctx-aware mod gate


class PiratesListCog(commands.Cog):
    """
    Maintains a compact, alphabetized pirates list in the configured channel.

    Entry format:
    - **{Character}** (*{Account}*) [{Threat}%]
     - In group: {never/rarely/...}. Destructive: {never/rarely/...}. Encounters: N. Last: {relative time, or — when unknown}

    Posts are chunked to stay <2000 chars. On refresh the previously posted
    messages are edited in place; extra messages are sent only when more
    chunks are needed and surplus messages are deleted when fewer are needed.
    """

    # Soft per-message budget; Discord's hard limit is 2000 characters.
    CHUNK_LIMIT = 1900

    def __init__(self, bot):
        self.bot = bot
        cfg = bot.config["DEFAULT"]
        self.list_channel_id = int(cfg["pirates_list_channel_id"])
        self.group_threshold = self._cfg_int(cfg, "threat_group_threshold", 3)
        self.min_samples = self._cfg_int(cfg, "threat_min_samples_for_stats", 3)

        # serialize refreshes per guild
        self._locks = {}
        # send settings: never ping on posted content
        self._no_mentions = discord.AllowedMentions.none()

    # ----------------- utils -----------------
    @staticmethod
    def _cfg_int(cfg, key: str, default: int) -> int:
        """Read an integer option from *cfg*, falling back to *default* when it is missing or malformed."""
        try:
            return int(cfg.get(key, str(default)))
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _safe_int(value, default: int = 0) -> int:
        """Convert *value* to int, returning *default* when it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    def _lock_for(self, guild_id: int) -> asyncio.Lock:
        """Return the refresh lock for *guild_id*, creating it on first use."""
        lock = self._locks.get(guild_id)
        if lock is None:
            lock = self._locks[guild_id] = asyncio.Lock()
        return lock

    @staticmethod
    def _esc(s: str) -> str:
        """Escape markdown and neuter @mentions in user-provided strings."""
        safe = discord.utils.escape_markdown(str(s))
        return safe.replace("@", "@\u200b")

    def _encounters_for(self, pirate: dict):
        """Return encounters whose identifier matches the pirate's character or account name.

        Matching is case-insensitive. Empty names are ignored so that an
        encounter with a blank identifier never matches a pirate that merely
        lacks a name. Entries that do not behave like a mapping are skipped.
        """
        acct_l = str(pirate.get("account_name", "")).lower()
        char_l = str(pirate.get("character_name", "")).lower()
        wanted = {acct_l, char_l} - {""}
        if not wanted:
            return []

        out = []
        for e in self.bot.data_manager.get("encounters") or []:
            try:
                ident = str(e.get("identifier", "")).lower()
            except (AttributeError, TypeError):
                continue
            if ident in wanted:
                out.append(e)
        return out

    def _bucket(self, rate: float, samples: int) -> str:
        """Map a 0..1 *rate* to a frequency word; "unknown" when there are fewer than ``min_samples`` samples."""
        if samples < self.min_samples:
            return "unknown"
        if rate <= 0:
            return "never"
        if rate <= 0.25:
            return "rarely"
        if rate <= 0.60:
            return "sometimes"
        if rate <= 0.85:
            return "often"
        return "always"

    @staticmethod
    def _latest_timestamp(encs):
        """Return the newest encounter timestamp as an int, or None when no encounter has a usable one."""
        stamps = []
        for e in encs:
            try:
                stamps.append(int(float(e.get("timestamp", 0))))
            except (TypeError, ValueError, OverflowError):
                # One malformed timestamp must not hide the valid ones.
                continue
        return max(stamps) if stamps else None

    def _format_entry(self, p: dict) -> str:
        """Render the two-line list entry for pirate record *p*."""
        encs = self._encounters_for(p)
        total = len(encs)

        if total:
            in_group = sum(
                1 for e in encs
                if self._safe_int(e.get("group_size"), 0) >= self.group_threshold
            )
            destructive = sum(1 for e in encs if bool(e.get("destruction")))
            group_rate = in_group / total
            destroy_rate = destructive / total
        else:
            group_rate = destroy_rate = 0.0

        groups_str = self._bucket(group_rate, total)
        destr_str = self._bucket(destroy_rate, total)
        last_ts = self._latest_timestamp(encs)

        char = self._esc(p.get("character_name", "Unknown"))
        acct = self._esc(p.get("account_name", "Unknown#00000"))
        threat = self._safe_int(p.get("threat_level", 0))

        # Discord renders <t:UNIX:R> client-side as a relative time ("3 days ago").
        last_str = f"<t:{last_ts}:R>" if last_ts else "—"

        line1 = f"- **{char}** (*{acct}*) [{threat}%]"
        line2 = f" - In group: {groups_str}. Destructive: {destr_str}. Encounters: {total}. Last: {last_str}"
        return f"{line1}\n{line2}"

    @classmethod
    def _chunk_entries(cls, entries, limit=None):
        """Pack *entries* (one per list item) into newline-joined blocks of at most *limit* characters."""
        limit = cls.CHUNK_LIMIT if limit is None else limit
        chunks = []
        block, size = [], 0
        for entry in entries:
            if len(entry) > limit:
                # A single oversized entry could never be sent; truncate rather than fail the post.
                entry = entry[: limit - 1] + "…"
            added = len(entry) + 1  # +1 for the joining newline
            if block and size + added > limit:
                chunks.append("\n".join(block))
                block, size = [], 0
            block.append(entry)
            size += added
        if block:
            chunks.append("\n".join(block))
        return chunks

    def _record_post(self, guild_id: int, message_id: int) -> None:
        """Persist a newly sent list message so later refreshes can edit it."""
        self.bot.data_manager.add("pirates_list_posts", {
            "guild_id": guild_id,
            "channel_id": self.list_channel_id,
            "message_id": message_id,
        })

    def _forget_post(self, message_id) -> None:
        """Drop the stored record(s) for *message_id* (compared as strings to tolerate int/str ids)."""
        wanted = str(message_id)
        self.bot.data_manager.remove(
            "pirates_list_posts",
            lambda rec: str(rec.get("message_id")) == wanted,
        )

    async def _delete_post(self, msg) -> None:
        """Delete a surplus list message; forget its record only once the message is really gone."""
        try:
            await msg.delete()
        except discord.NotFound:
            pass  # already gone; the record is stale
        except Exception as e:
            # Keep the record so the next refresh can retry instead of orphaning the message.
            print("[pirates_list] delete extra failed:", repr(e))
            return
        self._forget_post(msg.id)

    async def refresh_list(self, guild: discord.Guild):
        """Edit list messages in place; only send extra messages when we need more chunks (new pirates)."""
        # ---- serialize per guild ----
        async with self._lock_for(guild.id):
            channel = guild.get_channel(self.list_channel_id)
            if not channel:
                print("[pirates_list] list channel not found:", self.list_channel_id)
                return

            dm = self.bot.data_manager
            allow = self._no_mentions

            # ---- load & prune existing posts for this guild/channel ----
            records = [
                r for r in (dm.get("pirates_list_posts") or [])
                if r.get("guild_id") == guild.id
                and r.get("channel_id") == self.list_channel_id
            ]

            # fetch messages in stored order (drop any that vanished)
            msgs = []
            for r in records:
                mid = r.get("message_id")
                try:
                    msgs.append(await channel.fetch_message(int(mid)))
                except (discord.NotFound, TypeError, ValueError):
                    # message deleted, or the stored id is unusable: prune dead record
                    self._forget_post(mid)
                except Exception as e:
                    # Transient/permission failure: pruning here would make us forget a
                    # live post and send a duplicate, so leave everything untouched.
                    print("[pirates_list] fetch failed; refresh aborted:", repr(e))
                    return

            # ---- build fresh, sorted contents ----
            pirates = sorted(
                dm.get("pirates") or [],
                key=lambda x: (
                    str(x.get("character_name", "")).lower(),
                    str(x.get("account_name", "")).lower(),
                ),
            )

            # Empty state
            if not pirates:
                placeholder = "_No verified pirates yet._"
                if msgs:
                    # edit first, delete the rest
                    if msgs[0].content != placeholder:
                        try:
                            await msgs[0].edit(content=placeholder, allowed_mentions=allow)
                        except Exception as e:
                            print("[pirates_list] edit placeholder failed:", repr(e))
                    for extra in msgs[1:]:
                        await self._delete_post(extra)
                else:
                    try:
                        m = await channel.send(placeholder, allowed_mentions=allow)
                        self._record_post(guild.id, m.id)
                    except Exception as e:
                        print("[pirates_list] send placeholder failed:", repr(e))
                return

            # Chunk into <2000 char blocks (~1900 for margin)
            chunks = self._chunk_entries(self._format_entry(p) for p in pirates)

            # ---- edit existing messages with new content ----
            for msg, chunk in zip(msgs, chunks):
                if msg.content != chunk:
                    try:
                        await msg.edit(content=chunk, allowed_mentions=allow)
                    except Exception as e:
                        print("[pirates_list] edit block failed:", repr(e))

            # ---- if we need *more* messages (usually after adding a pirate), send them ----
            for chunk in chunks[len(msgs):]:
                try:
                    m = await channel.send(chunk, allowed_mentions=allow)
                    self._record_post(guild.id, m.id)
                except Exception as e:
                    print("[pirates_list] send block failed:", repr(e))

            # ---- if we need fewer messages (e.g., pirate removed), delete extras ----
            for surplus in msgs[len(chunks):]:
                await self._delete_post(surplus)

    # Manual refresh command (hybrid: works as /pirates_list_refresh and !pirates_list_refresh)
    @commands.hybrid_command(name="pirates_list_refresh", description="Rebuild the compact pirates list")
    @commands.cooldown(1, 10, commands.BucketType.guild)  # tiny anti-spam
    async def pirates_list_refresh(self, ctx: commands.Context):
        """Moderator-only command that rebuilds the pirates list for the invoking guild."""
        if not await require_mod_ctx(ctx, "This command is restricted to moderators."):
            return
        if not ctx.guild:
            return await ctx.reply("Use this in a server.", ephemeral=True)

        is_slash = getattr(ctx, "interaction", None) is not None
        # A refresh performs several API round-trips; acknowledge the interaction
        # first so it does not expire before we can reply.
        if is_slash and not ctx.interaction.response.is_done():
            await ctx.defer(ephemeral=True)

        await self.refresh_list(ctx.guild)
        await ctx.reply("Pirates list refreshed.", ephemeral=is_slash)


async def setup(bot):
    """Extension entry point: register the cog on *bot*."""
    await bot.add_cog(PiratesListCog(bot))