Delete modules/auto_vc/auto_vc.py

This commit is contained in:
frarol96 2025-08-10 15:10:56 +00:00
parent 1a89e21ebf
commit 42385d19f6

View File

@ -1,254 +0,0 @@
# modules/auto_vc/auto_vc.py
import asyncio
import time
import discord
from discord.ext import commands
def now() -> float:
return time.time()
class AutoVCCog(commands.Cog):
"""
Auto-VC:
When someone joins the trigger voice channel, create a new VC under the target category,
name it "{prefix} N", and move the member there.
When an auto-VC is empty for `delay` seconds, delete it and renumber the remaining ones.
Only channels created by this cog are managed (tracked in data_manager['vc_channels']).
Admin commands:
/avc_status -> show current state
/avc_cleanup_now -> run a cleanup/renumber pass now
/avc_renumber -> renumber without deleting
"""
def __init__(self, bot):
self.bot = bot
cfg = bot.config['DEFAULT']
# Config
self.trigger_id = int(cfg['trigger_channel_id'])
self.category_id = int(cfg['auto_vc_category_id'])
self.prefix = cfg['vc_name_prefix']
self.delay = int(cfg.get('auto_vc_cleanup_delay', 30))
self.modlog_channel_id = int(cfg.get('modlog_channel_id', '0')) if cfg.get('modlog_channel_id') else 0
# State
self.empty_since: dict[int, float] = {} # channel_id -> ts when became empty
self._vc_cooldowns: dict[int, float] = {} # user_id -> ts last created (anti-spam)
self._create_lock = asyncio.Lock()
# Background sweeper
self._task = asyncio.create_task(self._sweeper())
# ------------- utilities -------------
def cog_unload(self):
try:
self._task.cancel()
except Exception:
pass
def _prefixed(self, num: int) -> str:
return f"{self.prefix.strip()} {num}"
def _vc_records(self, guild_id: int):
"""Return list of tracked records for this guild from persistent store."""
return [r for r in self.bot.data_manager.get('vc_channels') if r.get('guild_id') == guild_id]
def _find_record(self, guild_id: int, channel_id: int):
for r in self._vc_records(guild_id):
if r.get('channel_id') == channel_id:
return r
return None
async def _log(self, guild: discord.Guild, msg: str):
if not self.modlog_channel_id:
print(f"[AutoVC][{guild.name}] {msg}")
return
ch = guild.get_channel(self.modlog_channel_id)
if ch:
try:
await ch.send(msg)
except Exception:
print(f"[AutoVC][{guild.name}] (fallback) {msg}")
else:
print(f"[AutoVC][{guild.name}] {msg}")
async def _renumber(self, guild: discord.Guild):
"""Rename tracked channels to {prefix} 1..N in stable order (by creation_ts)."""
recs = sorted(self._vc_records(guild.id), key=lambda r: r.get('created_ts', 0))
for i, rec in enumerate(recs, start=1):
ch = guild.get_channel(rec['channel_id'])
if not ch:
# prune dead record
self.bot.data_manager.remove('vc_channels', lambda x: x.get('channel_id') == rec['channel_id'])
continue
desired = self._prefixed(i)
if ch.name != desired:
try:
await ch.edit(name=desired, reason="Auto-VC renumber")
except Exception as e:
print("[auto_vc] rename failed:", repr(e))
async def _cleanup_pass(self, guild: discord.Guild):
"""Delete empty tracked channels that exceeded delay and renumber."""
cat = guild.get_channel(self.category_id)
if not cat:
return
tracked_ids = {r['channel_id'] for r in self._vc_records(guild.id)}
now_ts = now()
to_delete: list[discord.VoiceChannel] = []
# Mark empties & collect deletions
for ch in cat.voice_channels:
if ch.id not in tracked_ids:
continue # unmanaged room
if len(ch.members) == 0:
started = self.empty_since.get(ch.id)
if started is None:
self.empty_since[ch.id] = now_ts
elif now_ts - started >= self.delay:
to_delete.append(ch)
else:
self.empty_since.pop(ch.id, None)
# Delete idle channels
for ch in to_delete:
try:
await ch.delete(reason=f"Auto-VC idle > {self.delay}s")
await self._log(guild, f"🗑️ Deleted idle room: `{ch.name}`")
except Exception as e:
print("[auto_vc] delete failed:", repr(e))
# purge record and emptiness stamp
self.bot.data_manager.remove('vc_channels', lambda r, cid=ch.id: r.get('channel_id') == cid)
self.empty_since.pop(ch.id, None)
# purge records for channels that vanished by other means
for rec in list(self._vc_records(guild.id)):
if not guild.get_channel(rec['channel_id']):
self.bot.data_manager.remove('vc_channels', lambda r, cid=rec['channel_id']: r.get('channel_id') == cid)
if to_delete:
await self._renumber(guild)
# ------------- background worker -------------
async def _sweeper(self):
await self.bot.wait_until_ready()
while not self.bot.is_closed():
try:
for guild in self.bot.guilds:
await self._cleanup_pass(guild)
except Exception as e:
print("[auto_vc] sweeper loop error:", repr(e))
await asyncio.sleep(30)
# ------------- channel creation -------------
async def _spawn_and_move(self, member: discord.Member):
guild = member.guild
cat = guild.get_channel(self.category_id)
if not cat:
await self._log(guild, "⚠️ auto_vc_category_id not found; cannot create rooms.")
return
async with self._create_lock:
# Determine next index based on *tracked* channels
recs = sorted(self._vc_records(guild.id), key=lambda r: r.get('created_ts', 0))
next_index = len(recs) + 1
name = self._prefixed(next_index)
# Create room
try:
new_ch = await cat.create_voice_channel(name, reason="Auto-VC spawn")
except discord.Forbidden:
await self._log(guild, "❌ Missing permission to create voice channels in the category.")
return
except Exception as e:
await self._log(guild, f"❌ Failed to create voice channel: {e}")
return
# Persist record
self.bot.data_manager.add('vc_channels', {
'guild_id': guild.id,
'channel_id': new_ch.id,
'created_ts': now()
})
# Move the user
try:
await member.move_to(new_ch, reason="Auto-VC move")
except discord.Forbidden:
await self._log(guild, "⚠️ I need **Move Members** and **Connect** permissions to move users.")
except Exception as e:
await self._log(guild, f"⚠️ Could not move {member} into `{name}`: {e}")
# Start its idle timer if it immediately empties
if len(new_ch.members) == 0:
self.empty_since[new_ch.id] = now()
# ------------- core flow -------------
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
guild = member.guild
# Create on trigger join (with 5s per-user cooldown)
if after.channel and after.channel.id == self.trigger_id:
last = self._vc_cooldowns.get(member.id, 0.0)
if now() - last < 5.0:
return
self._vc_cooldowns[member.id] = now()
try:
await self._spawn_and_move(member)
except Exception as e:
print("[auto_vc] spawn/move failed:", repr(e))
# Mark empties immediately on leave
if before.channel:
ch = before.channel
if ch.category_id == self.category_id:
rec = self._find_record(guild.id, ch.id)
if rec and len(ch.members) == 0:
self.empty_since[ch.id] = now()
# ------------- admin commands -------------
@commands.hybrid_command(name="avc_status", description="Show Auto-VC status for this guild")
@commands.has_permissions(manage_guild=True)
async def avc_status(self, ctx: commands.Context):
g = ctx.guild
recs = sorted(self._vc_records(g.id), key=lambda r: r.get('created_ts', 0))
lines = [
f"Trigger: <#{self.trigger_id}> | Category: <#{self.category_id}> | Prefix: `{self.prefix}` | Delay: {self.delay}s"
]
for idx, rec in enumerate(recs, start=1):
ch = g.get_channel(rec['channel_id'])
if not ch:
state = "missing"
name = f"(deleted #{idx})"
else:
name = ch.name
state = f"{len(ch.members)} inside" if len(ch.members) else "empty"
t = self.empty_since.get(rec['channel_id'])
tail = f" | idle {int(now()-t)}s" if t and (not ch or (ch and not ch.members)) else ""
lines.append(f"- #{idx}: {name}{state}{tail}")
msg = "Auto-VC status:\n" + "\n".join(lines) if lines else "No Auto-VC rooms tracked."
await ctx.reply(msg)
@commands.hybrid_command(name="avc_cleanup_now", description="Run an immediate cleanup pass (delete idle rooms & renumber)")
@commands.has_permissions(manage_guild=True)
async def avc_cleanup_now(self, ctx: commands.Context):
await self._cleanup_pass(ctx.guild)
await ctx.reply("Cleanup pass complete.")
@commands.hybrid_command(name="avc_renumber", description="Force a renumber of tracked rooms")
@commands.has_permissions(manage_guild=True)
async def avc_renumber(self, ctx: commands.Context):
await self._renumber(ctx.guild)
await ctx.reply("Renumbered.")
async def setup(bot):
await bot.add_cog(AutoVCCog(bot))