# data_manager.py import json import threading import shutil import os import time from typing import Callable, Any class DataManager: def __init__(self, json_path: str): self.json_path = json_path self.lock = threading.Lock() self._data = self._load() def _default_payload(self): return { 'agreed_rules': [], 'agreed_engagement': [], 'agreed_nickname': [], 'nick_same_confirmed': [], 'nick_nudged': [], 'nick_dm_map': [], 'pirates': [], 'modlog': [], 'reports': [], 'encounters': [], 'vc_channels': [], 'user_cards': [], 'pirates_list_posts': [], 'spicepay_prefs': [], 'nick_verified': [], 'nick_claim_pending': [], 'nick_reviews': [], 'rr_msg_channels': [], '_counters': {}, # key -> int (metrics) '_events_seen': {}, # optional (kept for other uses) '_counter_last_ts': {}, # key -> last increment unix_ts (timelock) } def _load(self): try: with open(self.json_path, 'r', encoding='utf-8') as f: data = json.load(f) if not isinstance(data, dict): raise ValueError("root is not an object") data.setdefault('_counters', {}) data.setdefault('_events_seen', {}) data.setdefault('_counter_last_ts', {}) return data except FileNotFoundError: default = self._default_payload() self._save(default) return default except Exception: # Backup the broken file if it exists, then start fresh try: if os.path.exists(self.json_path): shutil.copy2(self.json_path, self.json_path + ".corrupt.bak") except Exception: pass default = self._default_payload() self._save(default) return default def _safe_write(self, data: dict): os.makedirs(os.path.dirname(self.json_path) or ".", exist_ok=True) tmp = self.json_path + ".tmp" with open(tmp, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) if os.path.exists(self.json_path): try: shutil.copy2(self.json_path, self.json_path + ".bak") except Exception: pass os.replace(tmp, self.json_path) def _save(self, data: dict): self._safe_write(data) # ------------- list helpers ------------- def get(self, category: str): with self.lock: return list(self._data.get(category, [])) def add(self, category: str, item: Any): with self.lock: self._data.setdefault(category, []).append(item) self._save(self._data) def remove(self, category: str, predicate: Callable[[Any], bool]): with self.lock: arr = self._data.get(category, []) self._data[category] = [i for i in arr if not predicate(i)] self._save(self._data) def update(self, category: str, predicate: Callable[[Any], bool], updater: Callable[[dict], dict]) -> bool: with self.lock: arr = self._data.get(category, []) for idx, item in enumerate(arr): if predicate(item): new_item = dict(item) new_item = updater(new_item) or new_item arr[idx] = new_item self._data[category] = arr self._save(self._data) return True return False # ------------- counters (plain) ------------- def incr_counter(self, key: str, by: int = 1) -> int: with self.lock: c = self._data.setdefault('_counters', {}) c[key] = int(c.get(key, 0)) + int(by) self._save(self._data) return c[key] def get_counter(self, key: str) -> int: with self.lock: return int(self._data.get('_counters', {}).get(key, 0)) def get_all_counters(self, prefix: str = "") -> dict[str, int]: with self.lock: c = dict(self._data.get('_counters', {})) return {k: v for k, v in c.items() if (not prefix or k.startswith(prefix))} # ------------- counters (timelocked) ------------- def incr_counter_timelocked(self, counter_key: str, window_sec: float = 1.0) -> int | None: """ Increment `counter_key` at most once per `window_sec`. Returns the new value if incremented, or None if suppressed by the timelock. """ now = time.time() with self.lock: last_map = self._data.setdefault('_counter_last_ts', {}) last = float(last_map.get(counter_key, 0.0)) if now - last < float(window_sec): # within lock window -> ignore return None # increment and stamp counters = self._data.setdefault('_counters', {}) counters[counter_key] = int(counters.get(counter_key, 0)) + 1 last_map[counter_key] = now # opportunistic pruning for very old stamps (keeps file smaller) if len(last_map) > 5000: cutoff = now - (window_sec * 60) for k in list(last_map.keys()): try: if float(last_map.get(k, 0.0)) < cutoff: last_map.pop(k, None) except Exception: last_map.pop(k, None) self._save(self._data) return counters[counter_key]