282 lines
12 KiB
Python
282 lines
12 KiB
Python
# modules/auto_vc/auto_vc.py
|
|
import asyncio
|
|
import time
|
|
import discord
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
from modules.common.settings import cfg # ENV-first config helper
|
|
from mod_perms import require_mod_ctx, require_mod_interaction # <- use project mod perms
|
|
|
|
def now() -> float:
    """Return the current wall-clock time as seconds since the Unix epoch."""
    timestamp = time.time()
    return timestamp
|
|
|
|
class AutoVCCog(commands.Cog):
    """
    Auto-VC:
      • When someone joins the trigger voice channel, create a new VC under the target category,
        name it "{prefix} N", and move the member there.
      • When an auto-VC is empty for `delay` seconds, delete it and renumber the remaining ones.
      • Only channels created by this cog are managed (tracked in data_manager['vc_channels']).

    Admin commands:
      /avc_status       -> show current state
      /avc_cleanup_now  -> [MOD] run a cleanup/renumber pass now
      /avc_renumber     -> [MOD] renumber without deleting
    """

    # Seconds a member must wait between two room creations (anti-spam).
    _SPAWN_COOLDOWN = 5.0
    # Seconds between two background cleanup passes.
    _SWEEP_INTERVAL = 30
    # Maximum length of a single message body accepted by Discord.
    _MAX_MESSAGE_LEN = 2000

    def __init__(self, bot):
        """Read configuration, initialise in-memory state and start the sweeper.

        `asyncio.create_task` needs a running event loop, so the cog must be
        constructed from within a coroutine (for example an async setup hook).
        """
        self.bot = bot
        r = cfg(bot)

        # Config (ENV/INI via helper; safe defaults)
        self.trigger_id = r.int('trigger_channel_id', 0)
        self.category_id = r.int('auto_vc_category_id', 0)
        self.prefix = r.get('vc_name_prefix', 'Room')
        self.delay = r.int('auto_vc_cleanup_delay', 30)
        self.modlog_channel_id = r.int('modlog_channel_id', 0)

        # State
        self.empty_since: dict[int, float] = {}    # channel_id -> ts when became empty
        self._vc_cooldowns: dict[int, float] = {}  # user_id -> ts last created (anti-spam)
        self._create_lock = asyncio.Lock()
        self._ops_lock = asyncio.Lock()            # serialize admin ops vs sweeper

        # Background sweeper
        self._task = asyncio.create_task(self._sweeper())

    # ------------- utilities -------------

    def cog_unload(self):
        """Stop the background sweeper when the cog is removed."""
        # getattr guard: __init__ may have failed before the task was created.
        task = getattr(self, '_task', None)
        if task is not None:
            task.cancel()

    def _prefixed(self, num: int) -> str:
        """Return the channel name for room number `num`, e.g. "Room 3"."""
        return f"{self.prefix.strip()} {num}"

    def _vc_records(self, guild_id: int):
        """Return a fresh list of tracked records for this guild from the persistent store."""
        # `or []` tolerates a store that yields None when nothing was saved yet.
        stored = self.bot.data_manager.get('vc_channels') or []
        return [r for r in stored if r.get('guild_id') == guild_id]

    def _find_record(self, guild_id: int, channel_id: int):
        """Return the tracked record for `channel_id` in this guild, or None."""
        for r in self._vc_records(guild_id):
            if r.get('channel_id') == channel_id:
                return r
        return None

    def _remove_record(self, channel_id: int) -> None:
        """Drop every persisted record that points at `channel_id`."""
        # Bind the id as a default argument so the predicate never depends on
        # a loop variable of the caller.
        self.bot.data_manager.remove(
            'vc_channels',
            lambda r, cid=channel_id: r.get('channel_id') == cid,
        )

    async def _log(self, guild: discord.Guild, msg: str):
        """Send `msg` to the configured mod-log channel, falling back to stdout."""
        ch = guild.get_channel(self.modlog_channel_id) if self.modlog_channel_id else None
        if not ch:
            # Not configured, or the configured channel is not in this guild.
            print(f"[AutoVC][{guild.name}] {msg}")
            return
        try:
            await ch.send(msg)
        except Exception:
            # Best effort: logging must never break the calling operation.
            print(f"[AutoVC][{guild.name}] (fallback) {msg}")

    async def _renumber(self, guild: discord.Guild):
        """Rename tracked channels to {prefix} 1..N in stable order (by created_ts).

        Records whose channel no longer exists are pruned first so that they do
        not consume a number and leave gaps in the sequence.
        """
        recs = sorted(self._vc_records(guild.id), key=lambda r: r.get('created_ts', 0))

        live = []
        for rec in recs:
            ch = guild.get_channel(rec['channel_id'])
            if ch is None:
                # prune dead record
                self._remove_record(rec['channel_id'])
                continue
            live.append(ch)

        for i, ch in enumerate(live, start=1):
            desired = self._prefixed(i)
            if ch.name == desired:
                continue
            try:
                await ch.edit(name=desired, reason="Auto-VC renumber")
            except Exception as e:
                # One failed rename must not stop the remaining ones.
                print("[auto_vc] rename failed:", repr(e))

    async def _cleanup_pass(self, guild: discord.Guild):
        """Delete empty tracked channels that exceeded delay and renumber.

        A channel is deleted only after it has been observed empty for at
        least `self.delay` seconds. Its record is kept when deletion fails so
        that a later pass can retry.
        """
        if not self.category_id:
            return
        cat = guild.get_channel(self.category_id)
        if not cat:
            return

        tracked_ids = {r['channel_id'] for r in self._vc_records(guild.id)}
        now_ts = now()
        to_delete: list[discord.VoiceChannel] = []

        # Mark empties & collect deletions
        for ch in cat.voice_channels:
            if ch.id not in tracked_ids:
                continue  # unmanaged room
            if ch.members:
                # Occupied again: forget any earlier emptiness stamp.
                self.empty_since.pop(ch.id, None)
                continue
            started = self.empty_since.get(ch.id)
            if started is None:
                self.empty_since[ch.id] = now_ts
            elif now_ts - started >= self.delay:
                to_delete.append(ch)

        removed_any = False

        # Delete idle channels
        for ch in to_delete:
            if ch.members:
                # Someone joined while earlier deletions were being awaited.
                self.empty_since.pop(ch.id, None)
                continue
            try:
                await ch.delete(reason=f"Auto-VC idle > {self.delay}s")
            except discord.NotFound:
                # Already gone: nothing to delete, but the record is stale.
                pass
            except Exception as e:
                print("[auto_vc] delete failed:", repr(e))
                # Keep the record and the stamp so the next pass retries;
                # purging here would orphan the channel permanently.
                continue
            else:
                await self._log(guild, f"🗑️ Deleted idle room: `{ch.name}`")
            # purge record and emptiness stamp
            self._remove_record(ch.id)
            self.empty_since.pop(ch.id, None)
            removed_any = True

        # purge records for channels that vanished by other means
        for rec in self._vc_records(guild.id):
            cid = rec['channel_id']
            if guild.get_channel(cid) is None:
                self._remove_record(cid)
                self.empty_since.pop(cid, None)
                removed_any = True

        # Renumber whenever the tracked set shrank, whatever the cause.
        if removed_any:
            await self._renumber(guild)

    # ------------- background worker -------------

    async def _sweeper(self):
        """Run a cleanup pass over every guild every `_SWEEP_INTERVAL` seconds."""
        await self.bot.wait_until_ready()
        while not self.bot.is_closed():
            # Serialize with admin ops
            async with self._ops_lock:
                for guild in self.bot.guilds:
                    try:
                        await self._cleanup_pass(guild)
                    except Exception as e:
                        # Isolate failures so one guild cannot starve the others.
                        print("[auto_vc] sweeper loop error:", repr(e))
            await asyncio.sleep(self._SWEEP_INTERVAL)

    # ------------- channel creation -------------

    async def _spawn_and_move(self, member: discord.Member):
        """Create the next numbered room in the target category and move `member` into it."""
        guild = member.guild
        if not self.category_id:
            await self._log(guild, "⚠️ auto_vc_category_id not configured; cannot create rooms.")
            return
        cat = guild.get_channel(self.category_id)
        if not cat:
            await self._log(guild, "⚠️ auto_vc_category_id not found; cannot create rooms.")
            return

        # The lock keeps two concurrent joins from computing the same index.
        async with self._create_lock:
            # Determine next index based on *tracked* channels
            next_index = len(self._vc_records(guild.id)) + 1
            name = self._prefixed(next_index)

            # Create room
            try:
                new_ch = await cat.create_voice_channel(name, reason="Auto-VC spawn")
            except discord.Forbidden:
                await self._log(guild, "❌ Missing permission to create voice channels in the category.")
                return
            except Exception as e:
                await self._log(guild, f"❌ Failed to create voice channel: {e}")
                return

            # Persist record
            self.bot.data_manager.add('vc_channels', {
                'guild_id': guild.id,
                'channel_id': new_ch.id,
                'created_ts': now(),
            })

            # Move the user
            try:
                await member.move_to(new_ch, reason="Auto-VC move")
            except discord.Forbidden:
                await self._log(guild, "⚠️ I need **Move Members** and **Connect** permissions to move users.")
            except Exception as e:
                await self._log(guild, f"⚠️ Could not move {member} into `{name}`: {e}")

            # Start its idle timer if it immediately empties.
            # NOTE(review): the member list may not reflect the move yet at this
            # point; a premature stamp is harmless because the cleanup pass
            # clears it as soon as it sees the room occupied.
            if len(new_ch.members) == 0:
                self.empty_since[new_ch.id] = now()

    # ------------- core flow -------------

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """Spawn a room when `member` joins the trigger channel and stamp rooms that just emptied."""
        guild = member.guild

        # Only an actual channel change counts as a join; mute/deafen/stream
        # updates inside the trigger channel must not spawn more rooms.
        joined_trigger = bool(
            self.trigger_id
            and after.channel is not None
            and after.channel.id == self.trigger_id
            and (before.channel is None or before.channel.id != after.channel.id)
        )

        # Create on trigger join (with per-user cooldown)
        if joined_trigger:
            ts = now()
            last = self._vc_cooldowns.get(member.id, 0.0)
            if ts - last >= self._SPAWN_COOLDOWN:
                # Drop expired entries so the map does not grow without bound.
                self._vc_cooldowns = {
                    uid: t for uid, t in self._vc_cooldowns.items()
                    if ts - t < self._SPAWN_COOLDOWN
                }
                self._vc_cooldowns[member.id] = ts
                try:
                    await self._spawn_and_move(member)
                except Exception as e:
                    print("[auto_vc] spawn/move failed:", repr(e))

        # Mark empties immediately on leave (also when the spawn was skipped
        # because of the cooldown).
        if before.channel and self.category_id:
            ch = before.channel
            if ch.category_id == self.category_id:
                rec = self._find_record(guild.id, ch.id)
                if rec and len(ch.members) == 0:
                    self.empty_since[ch.id] = now()

    # ------------- admin commands -------------

    @app_commands.command(name="avc_status", description="Show Auto-VC status for this guild")
    async def avc_status(self, interaction: discord.Interaction):
        """Reply with the configuration and one line per tracked room."""
        if not interaction.guild:
            return await interaction.response.send_message("Use this in a server.", ephemeral=True)

        g = interaction.guild
        recs = sorted(self._vc_records(g.id), key=lambda r: r.get('created_ts', 0))
        lines = [
            f"Trigger: <#{self.trigger_id or 0}> | Category: <#{self.category_id or 0}> | Prefix: `{self.prefix}` | Delay: {self.delay}s"
        ]
        for idx, rec in enumerate(recs, start=1):
            ch = g.get_channel(rec['channel_id'])
            if ch is None:
                state = "missing"
                name = f"(deleted #{idx})"
            else:
                name = ch.name
                state = f"{len(ch.members)} inside" if ch.members else "empty"
            t = self.empty_since.get(rec['channel_id'])
            # Show the idle time only for rooms that are missing or empty.
            show_idle = t is not None and (ch is None or not ch.members)
            tail = f" | idle {int(now() - t)}s" if show_idle else ""
            lines.append(f"- #{idx}: {name} — {state}{tail}")

        # The header line is always present, so test the records instead.
        if not recs:
            lines.append("No Auto-VC rooms tracked.")

        msg = "Auto-VC status:\n" + "\n".join(lines)
        if len(msg) > self._MAX_MESSAGE_LEN:
            # Longer bodies are rejected by Discord; truncate instead of failing.
            msg = msg[: self._MAX_MESSAGE_LEN - 2] + "\n…"
        await interaction.response.send_message(msg)

    @app_commands.command(name="avc_cleanup_now", description="[MOD] Run an immediate cleanup pass (delete idle rooms & renumber)")
    async def avc_cleanup_now(self, interaction: discord.Interaction):
        """[MOD] Run one cleanup pass for the invoking guild right away."""
        if not interaction.guild:
            return await interaction.response.send_message("Use this in a server.", ephemeral=True)
        if not await require_mod_interaction(interaction, "This command is restricted to moderators."):
            return

        # Defer first: the pass may take longer than the interaction deadline.
        await interaction.response.defer(ephemeral=True)
        async with self._ops_lock:
            await self._cleanup_pass(interaction.guild)
        await self._log(interaction.guild, f"🧹 Cleanup pass invoked by {interaction.user.mention}")
        await interaction.followup.send("Cleanup pass complete.", ephemeral=True)

    @app_commands.command(name="avc_renumber", description="[MOD] Force a renumber of tracked rooms")
    async def avc_renumber(self, interaction: discord.Interaction):
        """[MOD] Renumber the tracked rooms of the invoking guild without deleting any."""
        if not interaction.guild:
            return await interaction.response.send_message("Use this in a server.", ephemeral=True)
        if not await require_mod_interaction(interaction, "This command is restricted to moderators."):
            return

        # Defer first: renaming several channels may take a while.
        await interaction.response.defer(ephemeral=True)
        async with self._ops_lock:
            await self._renumber(interaction.guild)
        await self._log(interaction.guild, f"🔢 Renumber invoked by {interaction.user.mention}")
        await interaction.followup.send("Renumbered.", ephemeral=True)
|
|
|
|
async def setup(bot):
    """Instantiate the Auto-VC cog for `bot` and register it on the bot."""
    cog = AutoVCCog(bot)
    await bot.add_cog(cog)
|