- Correction to the earlier patch. *Testing on the live server is a pain, but I'm too lazy to spin up a dev server.*
157 lines · 5.6 KiB · Python
# data_manager.py
|
|
import json
|
|
import threading
|
|
import shutil
|
|
import os
|
|
import time
|
|
from typing import Callable, Any
|
|
|
|
class DataManager:
|
|
def __init__(self, json_path: str):
|
|
self.json_path = json_path
|
|
self.lock = threading.Lock()
|
|
self._data = self._load()
|
|
|
|
def _default_payload(self):
|
|
return {
|
|
'agreed_rules': [],
|
|
'agreed_engagement': [],
|
|
'agreed_nickname': [],
|
|
'nick_same_confirmed': [],
|
|
'nick_nudged': [],
|
|
'nick_dm_map': [],
|
|
'pirates': [],
|
|
'modlog': [],
|
|
'reports': [],
|
|
'encounters': [],
|
|
'vc_channels': [],
|
|
'user_cards': [],
|
|
'pirates_list_posts': [],
|
|
'spicepay_prefs': [],
|
|
'nick_verified': [],
|
|
'nick_claim_pending': [],
|
|
'nick_reviews': [],
|
|
'rr_msg_channels': [],
|
|
'_counters': {}, # key -> int (metrics)
|
|
'_events_seen': {}, # stamp-key -> unix_ts (dedup window)
|
|
}
|
|
|
|
def _load(self):
|
|
try:
|
|
with open(self.json_path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
if not isinstance(data, dict):
|
|
raise ValueError("root is not an object")
|
|
data.setdefault('_counters', {})
|
|
data.setdefault('_events_seen', {})
|
|
return data
|
|
except FileNotFoundError:
|
|
default = self._default_payload()
|
|
self._save(default)
|
|
return default
|
|
except Exception:
|
|
# Backup the broken file if it exists, then start fresh
|
|
try:
|
|
if os.path.exists(self.json_path):
|
|
shutil.copy2(self.json_path, self.json_path + ".corrupt.bak")
|
|
except Exception:
|
|
pass
|
|
default = self._default_payload()
|
|
self._save(default)
|
|
return default
|
|
|
|
def _safe_write(self, data: dict):
|
|
os.makedirs(os.path.dirname(self.json_path) or ".", exist_ok=True)
|
|
tmp = self.json_path + ".tmp"
|
|
with open(tmp, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, indent=4, ensure_ascii=False)
|
|
if os.path.exists(self.json_path):
|
|
try:
|
|
shutil.copy2(self.json_path, self.json_path + ".bak")
|
|
except Exception:
|
|
pass
|
|
os.replace(tmp, self.json_path)
|
|
|
|
def _save(self, data: dict):
|
|
self._safe_write(data)
|
|
|
|
# ------------- list helpers -------------
|
|
def get(self, category: str):
|
|
with self.lock:
|
|
return list(self._data.get(category, []))
|
|
|
|
def add(self, category: str, item: Any):
|
|
with self.lock:
|
|
self._data.setdefault(category, []).append(item)
|
|
self._save(self._data)
|
|
|
|
def remove(self, category: str, predicate: Callable[[Any], bool]):
|
|
with self.lock:
|
|
arr = self._data.get(category, [])
|
|
self._data[category] = [i for i in arr if not predicate(i)]
|
|
self._save(self._data)
|
|
|
|
def update(self, category: str, predicate: Callable[[Any], bool], updater: Callable[[dict], dict]) -> bool:
|
|
with self.lock:
|
|
arr = self._data.get(category, [])
|
|
for idx, item in enumerate(arr):
|
|
if predicate(item):
|
|
new_item = dict(item)
|
|
new_item = updater(new_item) or new_item
|
|
arr[idx] = new_item
|
|
self._data[category] = arr
|
|
self._save(self._data)
|
|
return True
|
|
return False
|
|
|
|
# ------------- counters -------------
|
|
def incr_counter(self, key: str, by: int = 1) -> int:
|
|
with self.lock:
|
|
c = self._data.setdefault('_counters', {})
|
|
c[key] = int(c.get(key, 0)) + int(by)
|
|
self._save(self._data)
|
|
return c[key]
|
|
|
|
def get_counter(self, key: str) -> int:
|
|
with self.lock:
|
|
return int(self._data.get('_counters', {}).get(key, 0))
|
|
|
|
def get_all_counters(self, prefix: str = "") -> dict[str, int]:
|
|
with self.lock:
|
|
c = dict(self._data.get('_counters', {}))
|
|
return {k: v for k, v in c.items() if (not prefix or k.startswith(prefix))}
|
|
|
|
# ------------- NEW: once-per-event increment with TTL dedup -------------
|
|
def incr_counter_once(self, counter_key: str, event_key: str, window_sec: float = 3.0, namespace: str = "cmd") -> int | None:
|
|
"""
|
|
Atomically: if `event_key` wasn't seen within `window_sec`, mark it seen and increment `counter_key`.
|
|
Returns new counter value, or None if this event is a duplicate.
|
|
"""
|
|
now = time.time()
|
|
stamp = f"{namespace}:{event_key}"
|
|
with self.lock:
|
|
seen = self._data.setdefault('_events_seen', {})
|
|
last = float(seen.get(stamp, 0.0))
|
|
if now - last <= float(window_sec):
|
|
# duplicate; ignore
|
|
return None
|
|
|
|
# mark seen
|
|
seen[stamp] = now
|
|
|
|
# opportunistic pruning to keep file small
|
|
if len(seen) > 5000:
|
|
cutoff = now - (window_sec * 2)
|
|
for k in list(seen.keys()):
|
|
try:
|
|
if float(seen.get(k, 0.0)) < cutoff:
|
|
seen.pop(k, None)
|
|
except Exception:
|
|
seen.pop(k, None)
|
|
|
|
# increment
|
|
counters = self._data.setdefault('_counters', {})
|
|
counters[counter_key] = int(counters.get(counter_key, 0)) + 1
|
|
self._save(self._data)
|
|
return counters[counter_key]
|