# modules/dd/dd_loot_table.py from __future__ import annotations import asyncio import hashlib import os import re import time from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Tuple, Literal import aiohttp import discord from discord.ext import commands from discord import app_commands from modules.common.settings import cfg DD_FALLBACK_CHANNEL = 1404764793377652807 DD_URL = "https://dune.gaming.tools/deep-desert" OWNER_ID = 203190147582394369 # for error notices def _log(*a): print("[DD]", *a) def _utcnow() -> datetime: return datetime.now(timezone.utc) def _this_week_anchor(now: Optional[datetime] = None) -> datetime: if now is None: now = _utcnow() target_wd = 1 # Tue cur_wd = now.weekday() delta_days = (cur_wd - target_wd) % 7 anchor_date = (now - timedelta(days=delta_days)).date() anchor_dt = datetime(anchor_date.year, anchor_date.month, anchor_date.day, 3, 0, 0, tzinfo=timezone.utc) if now < anchor_dt: anchor_dt -= timedelta(days=7) return anchor_dt def _next_week_anchor(after: Optional[datetime] = None) -> datetime: return _this_week_anchor(after) + timedelta(days=7) def _backoff_delay_secs(waiting_since: float, now_ts: float) -> int: waited = max(0.0, now_ts - waiting_since) if waited < 3600: return 5 * 60 if waited < 3 * 3600: return 15 * 60 if waited < 6 * 3600: return 30 * 60 if waited < 24 * 3600: return 60 * 60 return 3 * 60 * 60 @dataclass class DDState: channel_id: int message_id: Optional[int] disabled: bool # hashes last_hash: str # current cycle prev_hash: str # previous cycle last_post_hash: str # hash of the message content currently posted week_anchor_ts: int last_success_ts: int waiting_since_ts: int last_attempt_ts: int @classmethod def from_dm(cls, dm) -> "DDState": rows = dm.get("dd_state") row = rows[0] if rows else {} env_raw = os.getenv("SHAI_DD_CHANNEL_ID", "").strip().strip('"').strip("'") env_cid = int(env_raw) if env_raw.isdigit() else 0 try: stored_cid = int(row.get("channel_id") or 0) except Exception: stored_cid = 0 chosen_cid = env_cid or stored_cid or DD_FALLBACK_CHANNEL return cls( channel_id=chosen_cid, message_id=row.get("message_id"), disabled=bool(row.get("disabled", False)), last_hash=str(row.get("last_hash", "")), prev_hash=str(row.get("prev_hash", "")), last_post_hash=str(row.get("last_post_hash", "")), week_anchor_ts=int(row.get("week_anchor_ts", 0)), last_success_ts=int(row.get("last_success_ts", 0)), waiting_since_ts=int(row.get("waiting_since_ts", 0)), last_attempt_ts=int(row.get("last_attempt_ts", 0)), ) def to_row(self) -> Dict[str, Any]: return { "channel_id": self.channel_id, "message_id": self.message_id, "disabled": self.disabled, "last_hash": self.last_hash, "prev_hash": self.prev_hash, "last_post_hash": self.last_post_hash, "week_anchor_ts": self.week_anchor_ts, "last_success_ts": self.last_success_ts, "waiting_since_ts": self.waiting_since_ts, "last_attempt_ts": self.last_attempt_ts, } # ---------- parsing ---------- _USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" ) DETAILS_BLOCK_RE = re.compile(r"]*>.*?", re.I | re.S) NAME_SPAN_RE = re.compile(r"]*>.*?]*>(?P[^<]+).*?", re.I | re.S) ROW_RE = re.compile( r']*class="[^"]*flex[^"]*items-center[^"]*gap-2[^"]*"[^>]*>\s*' r']*class="[^"]*w-8[^"]*text-center[^"]*"[^>]*>\s*(?P[A-Z]\d+)\s*\s*' r']*>\s*(?P[^<]+?)\s*.*?' r']*class="[^"]*ml-auto[^"]*"[^>]*>.*?' r']*class="[^"]*w-10[^"]*text-center[^"]*"[^>]*>\s*(?P[^<]+?)\s*\s*' r']*>\s*(?P~?\d+%|\d+\.\d+%)\s*.*?' r'\s*', re.I | re.S, ) def _parse_dd_html(html: str) -> List[Dict[str, str]]: results: List[Dict[str, str]] = [] for dmatch in DETAILS_BLOCK_RE.finditer(html or ""): block = dmatch.group(0) nmatch = NAME_SPAN_RE.search(block) if not nmatch: continue name = " ".join(nmatch.group("name").split()) for rmatch in ROW_RE.finditer(block): grid = " ".join(rmatch.group("grid").split()) loc = " ".join(rmatch.group("loc").split()) amt = " ".join(rmatch.group("amt").split()) chance = " ".join(rmatch.group("chance").split()) results.append({"name": name, "grid": grid, "loc": loc, "amount": amt, "chance": chance}) return results def _hash_text(s: str) -> str: return hashlib.sha1(s.encode("utf-8")).hexdigest() def _hash_records(rows) -> str: rows = _sanitize_rows(rows) m = hashlib.sha256() for r in rows: m.update(f"{r['name']}|{r['grid']}|{r['loc']}|{r['amount']}|{r['chance']}\n".encode("utf-8")) return m.hexdigest() # ---------- formatters ---------- def _as_str(v) -> str: """Coerce any value (incl. lists/tuples) to a compact string.""" if isinstance(v, str): return v if isinstance(v, (list, tuple, set)): try: return ", ".join(map(str, v)) except Exception: return str(v) return str(v) def _sanitize_rows(rows): """Return rows with all fields as trimmed strings; safe for hashing/formatting.""" out = [] for r in rows or []: out.append({ "name": _as_str(r.get("name", "")).strip(), "grid": _as_str(r.get("grid", "")).strip().upper(), "loc": _as_str(r.get("loc", "")).strip(), "amount": _as_str(r.get("amount", "")).strip().replace("–", "-"), "chance": _as_str(r.get("chance", "")).strip().replace(" ", ""), }) return out def _abbr_loc(loc: str) -> str: """Shorten common locations to save characters.""" m = { "Imperial Testing Station": "Imp. Testing Station", "Large Shipwreck": "L. Shipwreck", "Small Shipwreck": "S. Shipwreck", } return m.get(loc.strip(), loc.strip()) def _grid_sort_key(g: str): """Sort grids like A1, A2, B10 naturally.""" g = g.strip().upper() if not g: return ("Z", 999) letter, num = g[0], g[1:] try: n = int(num) except Exception: n = 999 return (letter, n) def _fit_discord_message(lines: list[str], header: str, budget: int = 1900) -> str: """Join lines under budget with a truncation notice if needed.""" out = [header] total = len(header) + 1 dropped = 0 for ln in lines: ln_len = len(ln) + 1 if total + ln_len > budget: dropped += 1 continue out.append(ln) total += ln_len if dropped: out.append(f"... _(truncated {dropped} lines)_") return "\n".join(out) def _fmt_waiting(anchor_dt: datetime) -> str: when = anchor_dt.strftime("%Y-%m-%d %H:%M UTC") return ("**Deep Desert — Weekly Uniques**\n" f"_Reset detected (week starting **{when}**)._\n" "Waiting for the new loot table to appear...\n" "This message will update automatically once the new data is available.") def _fmt_error(anchor_dt: datetime, note: str) -> str: when = anchor_dt.strftime("%Y-%m-%d %H:%M UTC") return ("**Deep Desert — Weekly Uniques**\n" f"_Week starting **{when}**._\n" f"⚠️ {note}\n" f"<@{OWNER_ID}> will investigate.") def _fmt_rows(rows, anchor_dt: datetime) -> str: from collections import OrderedDict rows = _sanitize_rows(rows) def _abbr_loc(loc: str) -> str: m = { "Imperial Testing Station": "Imp. Testing Station", "Large Shipwreck": "L. Shipwreck", "Small Shipwreck": "S. Shipwreck", } return m.get(loc, loc) def _grid_sort_key(g: str): g = (g or "").upper() if not g: return ("Z", 999) letter, num = g[0], g[1:] try: n = int(num) except: n = 999 return (letter, n) # item -> location -> (amount, chance) -> [grids] grouped: "OrderedDict[str, OrderedDict[str, Dict[Tuple[str, str], List[str]]]]" = OrderedDict() for r in sorted(rows, key=lambda x: (x["name"], _abbr_loc(x["loc"]), _grid_sort_key(x["grid"]))): item, loc, grid, amt, ch = r["name"], _abbr_loc(r["loc"]), r["grid"], r["amount"], r["chance"] grouped.setdefault(item, OrderedDict()).setdefault(loc, {}).setdefault((amt, ch), []).append(grid) lines = [] for item, loc_map in grouped.items(): lines.append(f"- **{item}**") for loc, by_ac in loc_map.items(): lines.append(f" - {loc}") def _sort_ac(k): amt, ch = k try: chv = float(ch.lstrip("~").rstrip("%")) except Exception: chv = -1.0 return (-chv, amt) for (amt, ch), grids in sorted(by_ac.items(), key=_sort_ac): gstr = ", ".join(sorted(set(grids), key=_grid_sort_key)) lines.append(f" - {gstr} - {amt} ({ch})") when = anchor_dt.strftime("%Y-%m-%d %H:%M UTC") header = f"**Deep Desert — Weekly Uniques** _(week starting **{when}**)_" return _fit_discord_message(lines, header, budget=1900) # ---------- HTTP fetchers ---------- async def _fetch_via_aiohttp(session: aiohttp.ClientSession, url: str) -> str: headers = { "User-Agent": _USER_AGENT, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Cache-Control": "no-cache", "Pragma": "no-cache", } timeout = aiohttp.ClientTimeout(total=20, sock_connect=10, sock_read=10) async with session.get(url, headers=headers, allow_redirects=True, timeout=timeout) as resp: text = await resp.text() if resp.status >= 400: raise aiohttp.ClientResponseError( request_info=resp.request_info, history=resp.history, status=resp.status, message=f"HTTP {resp.status}", headers=resp.headers ) return text # ---------- Playwright (headless) ---------- class _PlaywrightPool: """Lazy, optional Playwright Chromium pool (single context).""" def __init__(self): self.apw = None self.browser = None self.context = None self.enabled = False async def ensure(self) -> bool: if self.enabled and self.apw and self.browser and self.context: return True try: from playwright.async_api import async_playwright # type: ignore except Exception: return False self.apw = await async_playwright().start() # flags for container/root environments + reduce automation signals self.browser = await self.apw.chromium.launch( headless=True, args=[ "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--disable-blink-features=AutomationControlled", ], ) self.context = await self.browser.new_context( user_agent=_USER_AGENT, locale="en-US", timezone_id="UTC", java_script_enabled=True, ignore_https_errors=True, viewport={"width": 1366, "height": 900}, extra_http_headers={ "Accept-Language": "en-US,en;q=0.9", "Upgrade-Insecure-Requests": "1", }, ) # Minimal stealth: remove webdriver and add a few common props await self.context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); Object.defineProperty(navigator, 'platform', { get: () => 'Win32' }); Object.defineProperty(navigator, 'plugins', { get: () => [1,2,3,4,5] }); """) self.enabled = True return True async def close(self): try: if self.context: await self.context.close() finally: try: if self.browser: await self.browser.close() finally: try: if self.apw: await self.apw.stop() finally: self.apw = self.browser = self.context = None self.enabled = False async def fetch(self, url: str, timeout_ms: Optional[int] = None, wait: Optional[str] = None) -> str: """ Fetch fully rendered HTML with tolerant waiting against Cloudflare. Env overrides: SHAI_DD_PW_TIMEOUT_MS (default 45000) SHAI_DD_PW_WAIT = domcontentloaded|load|networkidle (default domcontentloaded) """ if not await self.ensure(): raise RuntimeError("playwright-unavailable") timeout_ms = int(os.getenv("SHAI_DD_PW_TIMEOUT_MS", "45000") or "45000") if timeout_ms is None else timeout_ms wait_mode = (os.getenv("SHAI_DD_PW_WAIT", "domcontentloaded") or "domcontentloaded").lower() if wait: wait_mode = wait page = await self.context.new_page() # Keep media traffic low but don't block fonts/CSS/JS (CF sometimes needs them) async def _route(route): rt = route.request.resource_type if rt in ("media", "video", "audio"): await route.abort() else: await route.continue_() await page.route("**/*", _route) # Step 1: navigate, but don't require networkidle (CF pages rarely go "idle") await page.goto(url, wait_until=wait_mode, timeout=timeout_ms) # Step 2: loop for CF auto-redirect and app hydration # We'll try up to ~35s total here. end_by = time.time() + max(20, timeout_ms / 1000 - 5) last_details = 0 while time.time() < end_by: html = await page.content() u = page.url # If we're still on a CF challenge or "just a moment" page, give it a bit if ("cdn-cgi/challenge" in u) or ("cf-chl" in u) or ("Just a moment" in html) or ("Please wait" in html): await page.wait_for_timeout(2500) continue # Check if our target content looks present try: count = await page.locator("details").count() except Exception: count = 0 last_details = max(last_details, count) if count > 0: break await page.wait_for_timeout(1500) html = await page.content() await page.close() return html # ---------- Cog ---------- class DDLootTableCog(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot r = cfg(bot) self.dd_url = r.get("dd_url", DD_URL) try: self.channel_id_default = int(r.get("dd_channel_id", DD_FALLBACK_CHANNEL)) except Exception: self.channel_id_default = DD_FALLBACK_CHANNEL self._task: Optional[asyncio.Task] = None self._session: Optional[aiohttp.ClientSession] = None self._pw = _PlaywrightPool() self._last_debug: str = "" async def cog_load(self): self._session = aiohttp.ClientSession() if self._task is None: self._task = asyncio.create_task(self._runner(), name="DDLootTableRunner") _log("cog loaded; runner started:", bool(self._task), "url:", self.dd_url) async def cog_unload(self): t, self._task = self._task, None if t: t.cancel() s, self._session = self._session, None if s: await s.close() try: await self._pw.close() except Exception: pass _log("cog unloaded; runner/task closed") # ---- state ---- def _load_state(self) -> DDState: st = DDState.from_dm(self.bot.data_manager) env_raw = os.getenv("SHAI_DD_CHANNEL_ID", "").strip().strip('"').strip("'") env_cid = int(env_raw) if env_raw.isdigit() else 0 if env_cid and env_cid != st.channel_id: st.channel_id = env_cid self._save_state(st.to_row()) _log(f"channel id overridden by ENV -> {env_cid}") _log(f"state loaded: ch={st.channel_id} msg={st.message_id} disabled={st.disabled}") return st def _save_state(self, patch: Dict[str, Any]) -> None: dm = self.bot.data_manager rows = dm.get("dd_state") if not rows: dm.add("dd_state", patch); return def pred(_): return True def upd(d): d.update(patch); return d dm.update("dd_state", pred, upd) # ---- message helpers ---- async def _resolve_channel(self, channel_id: int) -> Optional[discord.TextChannel]: ch = self.bot.get_channel(channel_id) if ch is None: try: ch = await self.bot.fetch_channel(channel_id) except Exception: ch = None if not isinstance(ch, discord.TextChannel): return None me = ch.guild.me if me: p = ch.permissions_for(me) if not (p.read_messages and p.send_messages): _log(f"missing perms in #{ch.name} ({ch.id})") return ch async def _ensure_message(self, st: DDState, content_if_create: Optional[str]) -> Optional[discord.Message]: ch = await self._resolve_channel(st.channel_id) if not ch: _log("target channel not found/invalid:", st.channel_id) return None if st.message_id: try: return await ch.fetch_message(st.message_id) except discord.NotFound: st.message_id = None self._save_state({"message_id": None}) except discord.Forbidden: _log("cannot fetch message (no history); will NOT create a new one") return None except Exception as e: _log("fetch_message failed:", repr(e)) return None if content_if_create is None: return None try: msg = await ch.send(content_if_create) st.message_id = msg.id st.last_post_hash = _hash_text(content_if_create) self._save_state({"channel_id": st.channel_id, "message_id": msg.id, "last_post_hash": st.last_post_hash}) return msg except Exception as e: _log("failed to create message:", repr(e)) return None async def _set_message(self, st: DDState, content: str) -> Optional[int]: """Create-or-edit the single managed message. Returns message_id (if known) and stores last_post_hash.""" msg = await self._ensure_message(st, content_if_create=content if not st.message_id else None) if not msg: return None try: await msg.edit(content=content) st.last_post_hash = _hash_text(content) self._save_state({"last_post_hash": st.last_post_hash}) except discord.NotFound: st.message_id = None self._save_state({"message_id": None}) msg2 = await self._ensure_message(st, content_if_create=content) if msg2: try: await msg2.edit(content=content) st.last_post_hash = _hash_text(content) self._save_state({"message_id": msg2.id, "last_post_hash": st.last_post_hash}) except Exception: pass except discord.Forbidden: _log("edit forbidden; single-message mode keeps state") except Exception as e: _log("edit failed:", repr(e)) return st.message_id # ---- fetch orchestration ---- async def _fetch_dd_html_any(self) -> Tuple[str, str]: """Return (html, backend_tag). Preference: env → playwright(if available) → aiohttp.""" prefer = os.getenv("SHAI_DD_FETCHER", "").lower() # prefer Playwright if prefer in {"playwright", "pw", "browser"}: if await self._pw.ensure(): html = await self._pw.fetch(self.dd_url) return html, "playwright" else: # opportunistic: try Playwright first if available try: if await self._pw.ensure(): html = await self._pw.fetch(self.dd_url) return html, "playwright" except Exception: pass # fallback: aiohttp (may 403) html = await _fetch_via_aiohttp(self._session, self.dd_url) return html, "aiohttp" async def _attempt_fetch(self) -> Tuple[bool, List[Dict[str, str]], str]: import asyncio self._last_debug = "" if not self._session: self._last_debug = "internal: no HTTP session" return (False, [], "unable to check for new loot (will retry)") try: html, backend = await self._fetch_dd_html_any() self._last_debug = f"ok via {backend}" except aiohttp.ClientResponseError as e: self._last_debug = f"http {getattr(e,'status','?')} (aiohttp)" return (False, [], "unable to check for new loot (will retry)") except asyncio.TimeoutError: self._last_debug = "timeout" return (False, [], "unable to check for new loot (will retry)") except Exception as e: self._last_debug = f"{e.__class__.__name__}: {e}" return (False, [], "unable to check for new loot (will retry)") try: rows = _parse_dd_html(html) if not rows: self._last_debug = "parse: zero rows" return (False, [], "no loot entries detected yet (will retry)") clean = [] for r in rows: name = r["name"].strip() grid = r["grid"].strip().upper() loc = r["loc"].strip() amt = r["amount"].strip().replace("–", "-") chance = r["chance"].strip().replace(" ", "") if not name or not re.match(r"^[A-Z]\d+$", grid): continue clean.append({"name": name, "grid": grid, "loc": loc, "amount": amt, "chance": chance}) if not clean: self._last_debug = "parse: filtered to zero rows" return (False, [], "loot data format changed (will retry)") return (True, clean, "") except Exception as e: self._last_debug = f"parse error: {e.__class__.__name__}: {e}" return (False, [], "loot data parse error (will retry)") # ---- manual kick ---- async def _manual_kick_once(self, st: DDState) -> str: anchor_dt = _this_week_anchor() # always show "waiting" briefly so users see it's been kicked mid = await self._set_message(st, _fmt_waiting(anchor_dt)) if mid and not st.message_id: st.message_id = mid self._save_state(st.to_row()) ok, rows, note = await self._attempt_fetch() if not ok or not rows: if note: await self._set_message(st, _fmt_error(anchor_dt, note)) return f"Fetch failed: {note or 'unknown error'}" new_hash = _hash_records(rows) if st.prev_hash and new_hash == st.prev_hash: # still last week's data; keep waiting await self._set_message(st, _fmt_waiting(anchor_dt)) return "Data unchanged from previous cycle; still waiting." table = _fmt_rows(rows, anchor_dt) if st.last_hash and new_hash == st.last_hash: # same as what we already posted this cycle → ensure table is visible await self._set_message(st, table) return "Data unchanged; table ensured." # fresh for this cycle st.last_hash = new_hash st.last_success_ts = int(time.time()) self._save_state(st.to_row()) await self._set_message(st, table) return "Posted fresh data." # ---- runner ---- async def _runner(self): await self.bot.wait_until_ready() _log("runner loop started") while not self.bot.is_closed(): try: st = self._load_state() if st.disabled: await asyncio.sleep(300); continue now_dt = _utcnow() this_anchor_dt = _this_week_anchor(now_dt) this_anchor_ts = int(this_anchor_dt.timestamp()) next_anchor_dt = _next_week_anchor(now_dt) if st.week_anchor_ts != this_anchor_ts: # roll current → prev; reset current st.prev_hash = st.last_hash or st.prev_hash st.last_hash = "" st.week_anchor_ts = this_anchor_ts st.last_success_ts = 0 st.waiting_since_ts = this_anchor_ts st.last_attempt_ts = 0 self._save_state(st.to_row()) mid = await self._set_message(st, _fmt_waiting(this_anchor_dt)) if mid and not st.message_id: st.message_id = mid self._save_state(st.to_row()) _log("new week anchor -> waiting UPDATED (single-message)") if st.last_success_ts >= this_anchor_ts and st.last_success_ts < int(next_anchor_dt.timestamp()): await asyncio.sleep(min(3600, max(60, int(next_anchor_dt.timestamp() - time.time())))) continue if st.waiting_since_ts == 0: st.waiting_since_ts = this_anchor_ts delay = _backoff_delay_secs(st.waiting_since_ts, time.time()) if st.last_attempt_ts == 0 or (time.time() - st.last_attempt_ts) >= delay: ok, rows, note = await self._attempt_fetch() st.last_attempt_ts = int(time.time()) self._save_state(st.to_row()) if ok and rows: new_hash = _hash_records(rows) # 1) identical to last cycle → keep waiting; keep polling if st.prev_hash and new_hash == st.prev_hash: waiting = _fmt_waiting(this_anchor_dt) if st.last_post_hash != _hash_text(waiting): await self._set_message(st, waiting) _log("data equals prev week; still waiting") # no success_ts update; try again with backoff else: table = _fmt_rows(rows, this_anchor_dt) # 2) same as current hash → ensure table is visible (flip off any waiting message) if st.last_hash and new_hash == st.last_hash: if st.last_post_hash != _hash_text(table): await self._set_message(st, table) _log("data same as already posted; ensured table visible") # already have success this cycle; sleep a bit longer await asyncio.sleep(900) continue # 3) fresh data for this cycle → post table, mark success st.last_hash = new_hash st.last_success_ts = int(time.time()) self._save_state(st.to_row()) await self._set_message(st, table) _log("updated weekly uniques (fresh data)") await asyncio.sleep(900) continue else: if note: await self._set_message(st, _fmt_error(this_anchor_dt, note)) _log("fetch failed:", note, "| debug:", self._last_debug) await asyncio.sleep(30) except asyncio.CancelledError: break except Exception as e: _log("runner error:", repr(e)); await asyncio.sleep(30) _log("runner loop stopped") # ---- command ---- @app_commands.command(name="dd_update", description="[MOD] Control the Deep Desert weekly loot updater") @app_commands.describe(action="stop/resume/start", reason="Optional reason") async def dd_update(self, interaction: discord.Interaction, action: Literal["stop", "resume", "start"], reason: Optional[str] = None): st = self._load_state() is_owner = bool(interaction.guild and interaction.user.id == getattr(interaction.guild, "owner_id", 0)) if action == "start": perms_ok = is_owner else: perms = interaction.user.guild_permissions if interaction.guild else None perms_ok = bool(is_owner or (perms and perms.manage_guild)) if not perms_ok: return await interaction.response.send_message("You don't have permission to do that.", ephemeral=True) if action == "stop": st.disabled = True; self._save_state(st.to_row()) msg = "DD updater stopped."; if reason: msg += f" Reason: {reason}" return await interaction.response.send_message(msg, ephemeral=True) if action == "resume": st.disabled = False; self._save_state(st.to_row()) return await interaction.response.send_message("DD updater resumed.", ephemeral=True) # start (owner-only) st.disabled = False now_dt = _utcnow() st.week_anchor_ts = int(_this_week_anchor(now_dt).timestamp()) st.waiting_since_ts = int(time.time()) st.last_attempt_ts = 0 self._save_state(st.to_row()) ch = await self._resolve_channel(st.channel_id) if not ch: return await interaction.response.send_message( f"Manual start queued, but the target channel is invalid or missing.\n" f"Set **SHAI_DD_CHANNEL_ID** to a valid text channel ID (current: `{st.channel_id}`).", ephemeral=True ) await interaction.response.defer(ephemeral=True) status = await self._manual_kick_once(st) dbg = f" (debug: {self._last_debug})" if self._last_debug else "" await interaction.followup.send(f"Manual start triggered. {status}{dbg}", ephemeral=True) async def setup(bot: commands.Bot): await bot.add_cog(DDLootTableCog(bot))