135 lines
5.0 KiB
Python
135 lines
5.0 KiB
Python
"""Reusable async helpers shared across routers."""
|
|
import asyncio
|
|
import logging
|
|
from aiogram import Bot, types
|
|
from aiogram.types import InputMediaPhoto, InputMediaVideo
|
|
import bot_state as state
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_chat_display_name(user: types.User, anonymous_name: str | None = None) -> str:
    """Return the label under which a user's chat session is shown.

    A non-empty ``anonymous_name`` is returned as-is.  Otherwise the label is
    built from the user's full name and numeric id, with the ``@username``
    inserted before the id when the user has one.
    """
    if anonymous_name:
        return anonymous_name

    # The handle fragment is empty for users without a username, which
    # collapses the result to "Full Name (id)".
    handle = f"@{user.username}, " if user.username else ""
    return f"{user.full_name} ({handle}{user.id})"
|
|
|
|
|
|
def get_admin_display_name(user: types.User) -> str:
    """Return the ``Admin: ...`` label for an admin user.

    The label always contains the full name; ``(@username)`` is appended
    only when the user has a username.
    """
    handle_suffix = f" (@{user.username})" if user.username else ""
    return f"Admin: {user.full_name}{handle_suffix}"
|
|
|
|
|
|
def build_message_preview(message: types.Message) -> str:
    """Return a short plain-text preview of a message.

    Text messages are returned verbatim.  Media messages are rendered as a
    bracketed label (``[Photo]``, ``[Video]``, ...) followed, when a caption
    is present, by a newline and the caption.  Voice messages and stickers
    never include the caption; a sticker shows its emoji when it has one.
    Anything else yields ``[Unsupported message]``.
    """

    def labelled(label: str) -> str:
        # Append the caption on its own line only when there is one.
        return f"{label}\n{message.caption}" if message.caption else label

    if message.text:
        return message.text
    if message.photo:
        return labelled("[Photo]")
    if message.video:
        return labelled("[Video]")
    # Telegram also fills ``document`` on animation messages for backward
    # compatibility, so ``animation`` has to be tested first; otherwise every
    # GIF would be reported as a document.
    if message.animation:
        return labelled("[Animation]")
    if message.document:
        return labelled("[Document]")
    if message.voice:
        return "[Voice message]"
    if message.audio:
        return labelled("[Audio]")
    if message.sticker:
        # strip() removes the trailing space left when the sticker has no emoji.
        return f"[Sticker] {message.sticker.emoji or ''}".strip()
    return "[Unsupported message]"
|
|
|
|
|
|
def build_quote(text: str | None) -> str:
|
|
return "\n".join("> " + line for line in (text or "[No text]").splitlines())
|
|
|
|
|
|
async def is_group_admin(bot: Bot, chat_id: int, user_id: int) -> bool:
    """Return whether ``user_id`` is an administrator or the creator of ``chat_id``.

    The membership is looked up with ``bot.get_chat_member``.  Any exception
    raised while looking it up or reading its status is logged as a warning
    (with traceback) and treated as "not an admin", so the check fails closed.
    """
    try:
        member = await bot.get_chat_member(chat_id, user_id)
        return member.status in ("administrator", "creator")
    except Exception:
        # exc_info keeps the underlying cause in the log; without it a failed
        # lookup cannot be diagnosed from the warning alone.
        logger.warning(
            "Failed to check admin status for user %s in chat %s",
            user_id,
            chat_id,
            exc_info=True,
        )
        return False
|
|
|
|
|
|
async def send_media_items(
    bot: Bot, chat_id: int, media: list[dict], caption: str | None = None
) -> None:
    """Send photo/video items to ``chat_id``, grouping them into albums.

    Each entry of ``media`` is a dict with a ``"type"`` key (``"photo"``;
    any other value is sent as a video) and a ``"file_id"`` key.  Items are
    sent in chunks of up to ten as media groups.  ``caption`` is attached,
    with HTML parse mode, to the very first item only.

    A chunk that contains exactly one item is sent as a standalone photo or
    video: Telegram's ``sendMediaGroup`` only accepts 2-10 items, so the
    leftover item of an 11-, 21-, ... item list would otherwise be rejected
    and dropped.

    Send failures are logged and do not propagate; later chunks are still
    attempted.  An empty ``media`` list sends nothing.
    """
    group_limit = 10  # upper bound on items per media group

    for start in range(0, len(media), group_limit):
        chunk = media[start:start + group_limit]
        # Only the first chunk may carry the caption.
        chunk_caption = caption if start == 0 else None

        if len(chunk) == 1:
            await _send_single_media(bot, chat_id, chunk[0], chunk_caption)
            continue

        group = []
        for offset, item in enumerate(chunk):
            item_caption = chunk_caption if offset == 0 else None
            media_cls = InputMediaPhoto if item["type"] == "photo" else InputMediaVideo
            group.append(
                media_cls(
                    media=item["file_id"],
                    caption=item_caption,
                    parse_mode="HTML" if item_caption else None,
                )
            )
        try:
            await bot.send_media_group(chat_id, media=group)
        except Exception:
            logger.exception("Failed to send media group to %s", chat_id)


async def _send_single_media(
    bot: Bot, chat_id: int, item: dict, caption: str | None
) -> None:
    """Send one photo or video on its own, logging (not raising) send failures."""
    parse_mode = "HTML" if caption else None
    try:
        if item["type"] == "photo":
            await bot.send_photo(chat_id, item["file_id"], caption=caption, parse_mode=parse_mode)
        else:
            await bot.send_video(chat_id, item["file_id"], caption=caption, parse_mode=parse_mode)
    except Exception:
        logger.exception("Failed to send single media to %s", chat_id)
|
|
|
|
|
|
def cancel_upload_prompt(user_id: int) -> None:
    """Drop and cancel the pending upload-prompt task for ``user_id``, if any.

    The task is removed from ``state.upload_prompt_tasks`` before it is
    cancelled; calling this for a user with no registered task is a no-op.
    """
    if pending := state.upload_prompt_tasks.pop(user_id, None):
        pending.cancel()
|
|
|
|
|
|
async def schedule_upload_prompt(bot: Bot, chat_id: int, user_id: int) -> None:
    """Schedule the delayed "Media added" prompt for a user.

    Any prompt task already registered for ``user_id`` is cancelled first, so
    repeated calls within the delay leave only the most recent one pending.
    After 0.8 seconds the background task deletes the previously recorded
    prompt message (if one is stored), sends a new prompt with the
    ``confirm_kb`` keyboard and stores its message id in
    ``state.upload_prompt_msg_ids``.  The task is tracked in
    ``state.upload_prompt_tasks`` and unregisters itself when it finishes.
    """
    cancel_upload_prompt(user_id)

    async def _prompt_after_pause() -> None:
        try:
            await asyncio.sleep(0.8)
            # Imported at call time rather than at module load
            # (presumably to avoid a circular import -- confirm).
            from keyboards import confirm_kb

            stale_msg_id = state.upload_prompt_msg_ids.pop(user_id, None)
            if stale_msg_id:
                try:
                    await bot.delete_message(chat_id, stale_msg_id)
                except Exception:
                    # Best effort: a failed delete must not block the new prompt.
                    pass

            prompt = await bot.send_message(
                chat_id,
                "Media added. Send more or confirm submission.",
                reply_markup=confirm_kb,
            )
            state.upload_prompt_msg_ids[user_id] = prompt.message_id
        except asyncio.CancelledError:
            # Cancellation is the expected outcome when a newer prompt
            # supersedes this one; end quietly.
            pass
        except Exception:
            logger.exception("Failed to send upload prompt to %s", chat_id)
        finally:
            # Unregister only if the registry still points at this very task;
            # a newer task for the same user must stay registered.
            if state.upload_prompt_tasks.get(user_id) is pending:
                state.upload_prompt_tasks.pop(user_id, None)

    # The coroutine body does not start until control returns to the event
    # loop, so ``pending`` is bound before the ``finally`` clause reads it.
    pending = asyncio.create_task(_prompt_after_pause())
    state.upload_prompt_tasks[user_id] = pending
|
|
|
|
|
|
async def start_chat_session(
    bot: Bot,
    user: types.User,
    send_func,
    fsm_state,
    anonymous_name: str | None = None,
) -> None:
    """Open a chat session for ``user`` and tell them it has started.

    Clears ``fsm_state``, cancels any pending upload prompt for the user and
    stores a session record (display name plus an ``anonymous`` flag) in
    ``state.chat_sessions``.  The start notice is then delivered through
    ``send_func``; a delivery failure is logged and does not undo the session.

    ``bot`` is accepted but not used by this function.
    """
    await fsm_state.clear()
    cancel_upload_prompt(user.id)

    session = {
        "display_name": get_chat_display_name(user, anonymous_name),
        "anonymous": bool(anonymous_name),
    }
    state.chat_sessions[user.id] = session

    start_notice = (
        "Your chat has started. Every message you send will be forwarded to the admins. "
        "Send /stop to return to the menu."
    )
    try:
        await send_func(start_notice)
    except Exception:
        logger.exception("Failed to send chat-start message to %s", user.id)