Files
Telegram_ShBot/routers/group.py
2026-05-13 23:38:18 +02:00

186 lines
7.5 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Handlers for the semipublic chatroom and backup group."""
import asyncio
import logging
from aiogram import F, Router, types
from aiogram.filters import Command
import bot_state as state_store
from config import (
BLACKLIST_MODE,
CHATROOM_PRIVATE_BACKUP_GROUP_ID,
CHATROOM_SEMIPUBLIC_GROUP_ID,
PURGE_INTERVAL_HOURS,
INVITELINK_ARCH,
INVITELINK_CHAT,
BOT_USERNAME,
)
from filters import contains_link, process_blacklisted_message
from hashing import register_file
from utils import is_group_admin
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router (named "group") on which the handlers defined in this module register.
router = Router(name="group")
# ── Background purge task ─────────────────────────────────────────────────────
async def purge_chatroom_semipublic_group(bot) -> None:
    """Periodically delete tracked non-bot messages from the semipublic group.

    Runs forever. Each cycle sleeps ``PURGE_INTERVAL_HOURS`` hours and then
    walks a snapshot of ``state_store.chatroom_semipublic_group_messages``
    (message id -> "was sent by a bot" flag). Entries flagged as bot messages
    are skipped and stay tracked. Every other entry is deleted through
    ``bot.delete_message`` and then removed from the tracker, whether or not
    the deletion succeeded.

    Args:
        bot: Object exposing an awaitable ``delete_message(chat_id, message_id)``.
    """
    while True:
        await asyncio.sleep(PURGE_INTERVAL_HOURS * 3600)
        # Iterate over a copy: the mapping is mutated below, and other code may
        # add entries while this coroutine is suspended in an await.
        tracked = list(state_store.chatroom_semipublic_group_messages.items())
        for message_id, is_bot_msg in tracked:
            if is_bot_msg:
                continue
            try:
                await bot.delete_message(CHATROOM_SEMIPUBLIC_GROUP_ID, message_id)
            except Exception:
                # Still best effort (the loop must not die on one bad message),
                # but leave a trace instead of swallowing the error silently.
                logger.debug(
                    "Could not purge message %s from %s",
                    message_id,
                    CHATROOM_SEMIPUBLIC_GROUP_ID,
                    exc_info=True,
                )
            state_store.chatroom_semipublic_group_messages.pop(message_id, None)
# ── Backup-group tracker ──────────────────────────────────────────────────────
# NOTE: registration order matters. aiogram offers a message to a router's
# handlers in registration order and stops at the first one whose filters
# match. The tracker below matches every message in the backup group, so the
# narrower /reindex handler must be registered before it to be reachable.
@router.message(
    F.chat.id == CHATROOM_PRIVATE_BACKUP_GROUP_ID,
    # Text messages only: media carrying a "/reindex" caption still falls
    # through to the tracker and gets registered as before.
    F.text,
    Command("reindex"),
)
async def reindex_backup(message: types.Message):
    """Reply to ``/reindex`` in the backup group with the hash-cache size.

    Only acts when ``is_group_admin`` confirms the sender for the backup
    group; otherwise returns without replying. The reply reports
    ``len(state_store.backup_hashes)`` and explains how to trigger a
    historical reindex manually.
    """
    if message.from_user is None:
        return
    if not await is_group_admin(message.bot, CHATROOM_PRIVATE_BACKUP_GROUP_ID, message.from_user.id):
        return
    count = len(state_store.backup_hashes)
    await message.reply(
        f" Currently <b>{count}</b> files indexed in the hash cache.\n\n"
        "The Bot API does not expose a bulk message history endpoint. "
        "To do a full historical reindex, forward all older media back into "
        "this group — the bot will register each file automatically as it arrives.",
        parse_mode="HTML",
    )


@router.message(F.chat.id == CHATROOM_PRIVATE_BACKUP_GROUP_ID)
async def track_backup_group(message: types.Message):
    """Register the media file carried by a backup-group message.

    Passes one ``file_unique_id`` to ``register_file``: the last entry of
    ``message.photo`` (presumably the largest size; confirm against the Bot
    API), else the video, else the document. Messages without any of these
    are consumed without further action.
    """
    if message.photo:
        register_file(message.photo[-1].file_unique_id)
    elif message.video:
        register_file(message.video.file_unique_id)
    elif message.document:
        register_file(message.document.file_unique_id)
# ── Welcome / join request ────────────────────────────────────────────────────
@router.message(F.new_chat_members)
async def welcome(message: types.Message):
    """Replace the join service message with one greeting per new member.

    While holding ``state_store.welcome_lock``: deletes the triggering service
    message, deletes the greetings previously recorded for this chat in
    ``state_store.welcome_messages``, sends one HTML greeting per entry in
    ``message.new_chat_members``, and records the ids of the greetings that
    were sent. Deletion failures are ignored; send failures are logged.
    """
    bot = message.bot
    chat_id = message.chat.id

    async with state_store.welcome_lock:
        # Remove the "user joined" service message itself (best effort).
        try:
            await message.delete()
        except Exception:
            pass

        # Remove the greetings left over from the previous join (best effort).
        for stale_id in state_store.welcome_messages.get(chat_id, []):
            try:
                await bot.delete_message(chat_id, stale_id)
            except Exception:
                pass

        posted_ids = []
        for member in message.new_chat_members:
            # Address the member by @username when set, else by numeric id.
            if member.username:
                mention = f'@{member.username}'
            else:
                mention = str(member.id)

            text = (
                f"{mention} welcome to the official s3lfharm archive.\n\n"
                "You can view media by checking pinned messages or the media section.\n\n"
                "Feel free to share your own s3lfharm imagery using "
                f'<a href="https://t.me/{BOT_USERNAME}">this bot</a>.\n\n'
                f'\U0001f48b <a href="{INVITELINK_ARCH}">Join the public archive</a>\n\n'
                f'<blockquote>S3LF HARM</blockquote>'
            )

            try:
                greeting = await bot.send_message(
                    chat_id,
                    text,
                    parse_mode="HTML",
                    disable_web_page_preview=True,
                )
                posted_ids.append(greeting.message_id)
            except Exception:
                logger.exception("Failed to send welcome in %s", chat_id)

        # Remember this batch so the next join can clean it up.
        state_store.welcome_messages[chat_id] = posted_ids
@router.chat_join_request()
async def auto_approve_join_request(request: types.ChatJoinRequest):
    """Approve every incoming chat join request and log the outcome.

    Calls ``approve_chat_join_request`` for the requesting user in the
    request's chat. Success is logged at info level; any exception is logged
    with its traceback and not re-raised.
    """
    applicant_id = request.from_user.id
    try:
        await request.bot.approve_chat_join_request(request.chat.id, applicant_id)
    except Exception:
        logger.exception("Failed to approve join request from %s", applicant_id)
    else:
        logger.info("Approved user %s to join %s", applicant_id, request.chat.id)
# ── Auto-delete Telegram service/system messages in semipublic group ──────────
# Content types treated as service/system messages in the semipublic group.
_SEMIPUBLIC_SERVICE_CONTENT_TYPES = {
    types.ContentType.NEW_CHAT_MEMBERS,
    types.ContentType.LEFT_CHAT_MEMBER,
    types.ContentType.NEW_CHAT_TITLE,
    types.ContentType.NEW_CHAT_PHOTO,
    types.ContentType.DELETE_CHAT_PHOTO,
    types.ContentType.GROUP_CHAT_CREATED,
    types.ContentType.SUPERGROUP_CHAT_CREATED,
    types.ContentType.MESSAGE_AUTO_DELETE_TIMER_CHANGED,
    types.ContentType.PINNED_MESSAGE,
}


@router.message(
    F.chat.id == CHATROOM_SEMIPUBLIC_GROUP_ID,
    F.content_type.in_(_SEMIPUBLIC_SERVICE_CONTENT_TYPES),
)
async def delete_service_messages(message: types.Message):
    """Delete a service/system message posted in the semipublic group.

    Deletion is best effort: any exception raised by ``message.delete()`` is
    ignored.
    """
    try:
        await message.delete()
    except Exception:
        # Nothing to recover; the message simply stays in the chat.
        return
# ── Semipublic group moderation ───────────────────────────────────────────────
async def _delete_semipublic_message(message: types.Message) -> None:
    """Best-effort delete of *message*; stop tracking it once it is gone."""
    try:
        await message.delete()
    except Exception:
        # Keep the tracker entry so the periodic purge can retry later, and
        # leave a trace instead of swallowing the failure silently.
        logger.debug(
            "Could not delete message %s in %s",
            message.message_id,
            message.chat.id,
            exc_info=True,
        )
        return
    # Already deleted: no reason for the purge task to try again.
    state_store.chatroom_semipublic_group_messages.pop(message.message_id, None)


@router.message(F.chat.id == CHATROOM_SEMIPUBLIC_GROUP_ID)
async def handle_semipublic_message(message: types.Message):
    """Track and moderate a message posted in the semipublic group.

    Steps, in order:

    1. Record the message id in
       ``state_store.chatroom_semipublic_group_messages`` together with a flag
       telling whether a bot sent it. Messages without a sender, or sent by a
       bot, are not processed further.
    2. Blacklist check, applied to everyone including admins: when
       ``process_blacklisted_message`` reports a hit, the message is deleted.
       If ``BLACKLIST_MODE == 1`` a notice with the censored text is posted.
    3. Link check, skipped for group admins: messages with a ``url`` or
       ``text_link`` entity, or text matched by ``contains_link``, are deleted.
    """
    import html  # local import: keeps this block self-contained

    author = message.from_user
    state_store.chatroom_semipublic_group_messages[message.message_id] = bool(
        author and author.is_bot
    )
    if not author or author.is_bot:
        return

    # Media messages carry their text in the caption.
    text = message.text or message.caption or ""

    # Blacklist check (all users including admins).
    censored_text, was_censored = process_blacklisted_message(text)
    if was_censored:
        await _delete_semipublic_message(message)
        if BLACKLIST_MODE == 1:
            sender = f'@{author.username}' if author.username else str(author.id)
            # The censored text is user-controlled and sent with
            # parse_mode="HTML": unescaped "<" or "&" would make Telegram
            # reject the notice or let the user inject markup (e.g. links).
            # NOTE(review): assumes process_blacklisted_message returns plain
            # text, not HTML markup; confirm against filters.py.
            safe_text = html.escape(censored_text, quote=False)
            try:
                await message.bot.send_message(
                    CHATROOM_SEMIPUBLIC_GROUP_ID,
                    f"Censored text from {sender}\n<blockquote>{safe_text}</blockquote>",
                    parse_mode="HTML",
                )
            except Exception:
                logger.exception("Failed to send censor notice")
        return

    # Link check: admins are fully exempt.
    if await is_group_admin(message.bot, CHATROOM_SEMIPUBLIC_GROUP_ID, author.id):
        return  # admin: keep message as-is, no further checks

    has_link_entity = any(
        e.type in ("url", "text_link")
        for e in (message.entities or []) + (message.caption_entities or [])
    )
    if has_link_entity or contains_link(text):
        await _delete_semipublic_message(message)