"""Reusable async helpers shared across routers.""" import asyncio import logging from aiogram import Bot, types from aiogram.types import InputMediaPhoto, InputMediaVideo import bot_state as state logger = logging.getLogger(__name__) def get_chat_display_name(user: types.User, anonymous_name: str | None = None) -> str: if anonymous_name: return anonymous_name if user.username: return f"{user.full_name} (@{user.username}, {user.id})" return f"{user.full_name} ({user.id})" def get_admin_display_name(user: types.User) -> str: if user.username: return f"Admin: {user.full_name} (@{user.username})" return f"Admin: {user.full_name}" def build_message_preview(message: types.Message) -> str: if message.text: return message.text if message.photo: return f"[Photo]\n{message.caption}" if message.caption else "[Photo]" if message.video: return f"[Video]\n{message.caption}" if message.caption else "[Video]" if message.document: return f"[Document]\n{message.caption}" if message.caption else "[Document]" if message.voice: return "[Voice message]" if message.audio: return f"[Audio]\n{message.caption}" if message.caption else "[Audio]" if message.sticker: return f"[Sticker] {message.sticker.emoji or ''}".strip() if message.animation: return f"[Animation]\n{message.caption}" if message.caption else "[Animation]" return "[Unsupported message]" def build_quote(text: str | None) -> str: return "\n".join("> " + line for line in (text or "[No text]").splitlines()) async def is_group_admin(bot: Bot, chat_id: int, user_id: int) -> bool: try: member = await bot.get_chat_member(chat_id, user_id) return member.status in ("administrator", "creator") except Exception: logger.warning("Failed to check admin status for user %s in chat %s", user_id, chat_id) return False async def send_media_items( bot: Bot, chat_id: int, media: list[dict], caption: str | None = None ) -> None: if len(media) == 1: item = media[0] pm = "HTML" if caption else None try: if item["type"] == "photo": await bot.send_photo(chat_id, item["file_id"], caption=caption, parse_mode=pm) else: await bot.send_video(chat_id, item["file_id"], caption=caption, parse_mode=pm) except Exception: logger.exception("Failed to send single media to %s", chat_id) return for i in range(0, len(media), 10): chunk = media[i:i + 10] group = [] for j, item in enumerate(chunk): ic = caption if i == 0 and j == 0 else None if item["type"] == "photo": group.append(InputMediaPhoto(media=item["file_id"], caption=ic, parse_mode="HTML" if ic else None)) else: group.append(InputMediaVideo(media=item["file_id"], caption=ic, parse_mode="HTML" if ic else None)) try: await bot.send_media_group(chat_id, media=group) except Exception: logger.exception("Failed to send media group to %s", chat_id) def cancel_upload_prompt(user_id: int) -> None: task = state.upload_prompt_tasks.pop(user_id, None) if task: task.cancel() async def schedule_upload_prompt(bot: Bot, chat_id: int, user_id: int) -> None: cancel_upload_prompt(user_id) async def _delayed() -> None: try: await asyncio.sleep(0.8) from keyboards import confirm_kb prev_msg_id = state.upload_prompt_msg_ids.pop(user_id, None) if prev_msg_id: try: await bot.delete_message(chat_id, prev_msg_id) except Exception: pass sent = await bot.send_message( chat_id, "Media added. Send more or confirm submission.", reply_markup=confirm_kb, ) state.upload_prompt_msg_ids[user_id] = sent.message_id except asyncio.CancelledError: pass except Exception: logger.exception("Failed to send upload prompt to %s", chat_id) finally: if state.upload_prompt_tasks.get(user_id) is task: state.upload_prompt_tasks.pop(user_id, None) task = asyncio.create_task(_delayed()) state.upload_prompt_tasks[user_id] = task async def start_chat_session( bot: Bot, user: types.User, send_func, fsm_state, anonymous_name: str | None = None, ) -> None: await fsm_state.clear() cancel_upload_prompt(user.id) state.chat_sessions[user.id] = { "display_name": get_chat_display_name(user, anonymous_name), "anonymous": bool(anonymous_name), } try: await send_func( "Your chat has started. Every message you send will be forwarded to the admins. " "Send /stop to return to the menu." ) except Exception: logger.exception("Failed to send chat-start message to %s", user.id)