75 lines
3.1 KiB
Python
75 lines
3.1 KiB
Python
"""Outer middlewares: username tracker + TOS gate."""
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from aiogram import BaseMiddleware, Bot, types
|
|
import bot_state as state
|
|
from config import CHAT_CONTACT_ADMIN_GROUP_ID, CHATROOM_SEMIPUBLIC_GROUP_ID
|
|
from keyboards import confirm_tos_kb
|
|
from persistence import save_confirmed_users
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
TOS_TEXT = (
|
|
"Hey! \n\n"
|
|
"This bot is affiliated with services offering extreme contents and services involving "
|
|
"topics / touching on topics such as "
|
|
"political controversies, gore, self-injury etc. \n\n"
|
|
"If you acknowledge that and want to proceed, "
|
|
"please tap 'Yes'.\n\n"
|
|
"This is solely a trigger warning that will only show up once."
|
|
)
|
|
|
|
|
|
class UsernameTrackerMiddleware(BaseMiddleware):
|
|
async def __call__(self, handler, event, data):
|
|
user: types.User | None = data.get("event_from_user")
|
|
if user:
|
|
old = state.known_usernames.get(user.id, "UNSET")
|
|
if old != "UNSET" and old != user.username:
|
|
old_m = f'@{old}' if old else str(user.id)
|
|
new_m = f'@{user.username}' if user.username else str(user.id)
|
|
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
|
text = f"{old_m} changed username to {new_m} at {now}."
|
|
bot: Bot = data["bot"]
|
|
for chat_id in (CHATROOM_SEMIPUBLIC_GROUP_ID, CHAT_CONTACT_ADMIN_GROUP_ID):
|
|
try:
|
|
await bot.send_message(chat_id, text, parse_mode="HTML")
|
|
except Exception:
|
|
logger.warning("Failed to broadcast username change to %s", chat_id)
|
|
state.known_usernames[user.id] = user.username
|
|
return await handler(event, data)
|
|
|
|
|
|
class TosMiddleware(BaseMiddleware):
|
|
"""Block all private interactions until the user confirms the TOS."""
|
|
|
|
async def __call__(self, handler, event, data):
|
|
user: types.User | None = data.get("event_from_user")
|
|
if not user:
|
|
return await handler(event, data)
|
|
|
|
is_private = (
|
|
(isinstance(event, types.Message) and event.chat.type == "private")
|
|
or (isinstance(event, types.CallbackQuery) and event.message and event.message.chat.type == "private")
|
|
)
|
|
if not is_private:
|
|
return await handler(event, data)
|
|
|
|
# Always let the TOS confirmation callback through FIRST
|
|
if isinstance(event, types.CallbackQuery) and event.data == "tos_confirm":
|
|
return await handler(event, data)
|
|
|
|
if user.id not in state.confirmed_users:
|
|
bot: Bot = data["bot"]
|
|
try:
|
|
await bot.send_message(user.id, TOS_TEXT, parse_mode="HTML", reply_markup=confirm_tos_kb)
|
|
except Exception:
|
|
logger.warning("Failed to send TOS to user %s", user.id)
|
|
if isinstance(event, types.CallbackQuery):
|
|
try:
|
|
await event.answer()
|
|
except Exception:
|
|
pass
|
|
return # gate: do NOT call handler
|
|
|
|
return await handler(event, data) |