V0.1.1 release, close to actual release. Bug & security fixes/improvements.
This commit is contained in:
@@ -6,7 +6,7 @@ use teloxide::{
|
||||
prelude::*,
|
||||
types::{
|
||||
InlineKeyboardButton, InlineKeyboardMarkup, Message, MessageId, ParseMode, CallbackQuery,
|
||||
ChatMemberStatus, UserId, ChatPermissions, InputMedia, InputMediaPhoto, InputMediaDocument, InputFile,
|
||||
ChatMemberStatus, UserId, ChatPermissions, InputMedia, InputMediaPhoto, InputMediaDocument, InputMediaVideo, InputMediaAudio, InputFile,
|
||||
},
|
||||
RequestError,
|
||||
utils::command::BotCommands,
|
||||
@@ -37,15 +37,14 @@ pub enum BotState {
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
#[derive(Default)]
|
||||
pub enum UploadType {
|
||||
#[default]
|
||||
Media,
|
||||
Document,
|
||||
Text,
|
||||
}
|
||||
|
||||
impl Default for UploadType {
|
||||
fn default() -> Self { UploadType::Media }
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct StagedItem {
|
||||
@@ -402,7 +401,7 @@ async fn handle_message_inner(
|
||||
}
|
||||
|
||||
// Admin commands in groups
|
||||
if msg.chat.is_group() || msg.chat.is_supergroup() {
|
||||
if msg.chat.is_group() || msg.chat.is_supergroup() || msg.chat.is_channel() {
|
||||
if let Some(text) = msg.text() {
|
||||
let cmd = text.split_whitespace().next().unwrap_or("");
|
||||
match cmd {
|
||||
@@ -418,14 +417,14 @@ async fn handle_message_inner(
|
||||
}
|
||||
"/blacklist_uid" => {
|
||||
tracing::info!("admin command /blacklist_uid chat={} user={}", chat_id, user_id);
|
||||
if is_admin(&bot, msg.chat.id, user.id).await {
|
||||
if ctx.config.groups.admin_group_ids.contains(&chat_id.0) && is_admin(&bot, msg.chat.id, user.id).await {
|
||||
handle_admin_blacklist_uid(&bot, chat_id, text, &ctx).await?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
"/whitelist_uid" => {
|
||||
tracing::info!("admin command /whitelist_uid chat={} user={}", chat_id, user_id);
|
||||
if is_admin(&bot, msg.chat.id, user.id).await {
|
||||
if ctx.config.groups.admin_group_ids.contains(&chat_id.0) && is_admin(&bot, msg.chat.id, user.id).await {
|
||||
handle_admin_whitelist_uid(&bot, chat_id, text, &ctx).await?;
|
||||
}
|
||||
return Ok(());
|
||||
@@ -436,18 +435,18 @@ async fn handle_message_inner(
|
||||
let help_text = r#"<b>Admin Commands</b>
|
||||
|
||||
/reload — Reload moderation lists.
|
||||
/blacklist_uid <ID> — Blacklist a user ID.
|
||||
/whitelist_uid <ID> — Remove a user from blacklist.
|
||||
/blacklist_uid [ID] — Blacklist a user ID.
|
||||
/whitelist_uid [ID] — Remove a user from blacklist.
|
||||
/help — Show this message.
|
||||
/get_id — Get current chat ID.
|
||||
/get_id <@username> — Search administrators by username.
|
||||
/get_id <displayname> — Search members in this chat by display name.
|
||||
/get_id [@username] — Search administrators by username.
|
||||
/get_id [displayname] — Search members in this chat by display name.
|
||||
/create_submit_forward <dest> <review> [msg] — Create a submission forward.
|
||||
/show_c_forward [page] — List forward links.
|
||||
/add_blacklist <user_id> — Blacklist a user in all active forwards.
|
||||
/rm_blacklist <user_id> — Remove a user from blacklist in all active forwards.
|
||||
/sban @user <dur> <unit> [reason] — Ban for duration
|
||||
/smute @user <dur> <unit> [reason] — Mute for duration
|
||||
/add_blacklist [user_id] — Blacklist a user in all active forwards.
|
||||
/rm_blacklist [user_id] — Remove a user from blacklist in all active forwards.
|
||||
/sban @user [dur] [unit] [reason] — Ban for duration
|
||||
/smute @user [dur] [unit] [reason] — Mute for duration
|
||||
/mute @user [reason] — Mute indefinitely
|
||||
/pban @user [reason] — Permanent ban
|
||||
/kick @user [reason] — Kick from group
|
||||
@@ -769,20 +768,18 @@ async fn handle_message_inner(
|
||||
if let Some(def) = forward_repo.get_by_code(code).await? {
|
||||
if def.revoked_at.is_some() {
|
||||
bot.send_message(chat_id, "This submission link has been revoked.").await?;
|
||||
} else if forward_repo.is_allowed(def.id, user_id).await? {
|
||||
dialogue.update(BotState::SubmitMode { forward_id: def.id, code: code.to_string() }).await?;
|
||||
let keyboard = InlineKeyboardMarkup::new(vec![vec![
|
||||
InlineKeyboardButton::callback("[ Continue ]", "v1:submit:continue"),
|
||||
InlineKeyboardButton::callback("[ Exit ]", "v1:submit:exit"),
|
||||
]]);
|
||||
bot.send_message(chat_id, "<b>[ Submission Mode ]</b>\n\nYou are about to submit content to a forward.\n\n<i>Continue to upload content for submission, or exit to return to the main menu.</i>")
|
||||
.parse_mode(ParseMode::Html)
|
||||
.reply_markup(keyboard)
|
||||
.await?;
|
||||
} else {
|
||||
if forward_repo.is_allowed(def.id, user_id).await? {
|
||||
dialogue.update(BotState::SubmitMode { forward_id: def.id, code: code.to_string() }).await?;
|
||||
let keyboard = InlineKeyboardMarkup::new(vec![vec![
|
||||
InlineKeyboardButton::callback("[ Continue ]", "v1:submit:continue"),
|
||||
InlineKeyboardButton::callback("[ Exit ]", "v1:submit:exit"),
|
||||
]]);
|
||||
bot.send_message(chat_id, "<b>[ Submission Mode ]</b>\n\nYou are about to submit content to a forward.\n\n<i>Continue to upload content for submission, or exit to return to the main menu.</i>")
|
||||
.parse_mode(ParseMode::Html)
|
||||
.reply_markup(keyboard)
|
||||
.await?;
|
||||
} else {
|
||||
bot.send_message(chat_id, "You are not allowed to use this submission link.").await?;
|
||||
}
|
||||
bot.send_message(chat_id, "You are not allowed to use this submission link.").await?;
|
||||
}
|
||||
} else {
|
||||
bot.send_message(chat_id, "Invalid submission link.").await?;
|
||||
@@ -968,7 +965,7 @@ async fn handle_callback_inner(
|
||||
..Default::default()
|
||||
};
|
||||
dialogue.update(BotState::UploadOptions { items, options: options.clone() }).await?;
|
||||
refresh_options_message(&bot, chat_id, &vec![], &options).await?;
|
||||
refresh_options_message(&bot, chat_id, &[], &options).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1556,6 +1553,10 @@ async fn finalize_upload(
|
||||
let input_file = InputFile::memory(bytes.clone());
|
||||
let media = if mime_type.starts_with("image/") {
|
||||
InputMedia::Photo(InputMediaPhoto::new(input_file))
|
||||
} else if mime_type.starts_with("video/") {
|
||||
InputMedia::Video(InputMediaVideo::new(input_file))
|
||||
} else if mime_type.starts_with("audio/") {
|
||||
InputMedia::Audio(InputMediaAudio::new(input_file))
|
||||
} else {
|
||||
InputMedia::Document(InputMediaDocument::new(input_file))
|
||||
};
|
||||
@@ -1572,6 +1573,14 @@ async fn finalize_upload(
|
||||
d.caption = Some(review_text.clone());
|
||||
d.parse_mode = Some(ParseMode::Html);
|
||||
}
|
||||
InputMedia::Video(v) => {
|
||||
v.caption = Some(review_text.clone());
|
||||
v.parse_mode = Some(ParseMode::Html);
|
||||
}
|
||||
InputMedia::Audio(a) => {
|
||||
a.caption = Some(review_text.clone());
|
||||
a.parse_mode = Some(ParseMode::Html);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1630,7 +1639,7 @@ async fn show_previous_uploads(
|
||||
let repo = ContentRepo::new(ctx.db.conn());
|
||||
let total = repo.count_by_user(user_id).await?;
|
||||
let items = repo.list_by_user(user_id, 10, page * 10).await?;
|
||||
let total_pages = (total + 9) / 10;
|
||||
let total_pages = total.div_ceil(10);
|
||||
|
||||
if items.is_empty() {
|
||||
bot.send_message(chat_id, "<i>You have no uploads.</i>")
|
||||
@@ -1765,32 +1774,29 @@ async fn handle_admin_callback(
|
||||
ctx: &BotContext,
|
||||
) -> HandlerResult {
|
||||
tracing::info!("handle_admin_callback user={} action={}", user_id, parts[2]);
|
||||
match parts[2] {
|
||||
"delcontent" => {
|
||||
let cxid = parts[3];
|
||||
let content_id = ContentId::try_from(cxid)?;
|
||||
let content_repo = ContentRepo::new(ctx.db.conn());
|
||||
let content = match content_repo.get(&content_id).await? {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
bot.send_message(chat_id, "<b>Content not found.</b>")
|
||||
.parse_mode(ParseMode::Html).await?;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let is_admin = is_admin_in_chat(bot, chat_id, UserId(user_id as u64)).await;
|
||||
if !is_admin && content.user_id != user_id {
|
||||
bot.send_message(chat_id, "<b>Unauthorized.</b>")
|
||||
if parts[2] == "delcontent" {
|
||||
let cxid = parts[3];
|
||||
let content_id = ContentId::try_from(cxid)?;
|
||||
let content_repo = ContentRepo::new(ctx.db.conn());
|
||||
let content = match content_repo.get(&content_id).await? {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
bot.send_message(chat_id, "<b>Content not found.</b>")
|
||||
.parse_mode(ParseMode::Html).await?;
|
||||
return Ok(());
|
||||
}
|
||||
ctx.pipeline.delete_content(&content_id, ctx.config.content.keep_content).await.ok();
|
||||
content_repo.set_status(&content_id, ContentStatus::Deleted).await.ok();
|
||||
bot.send_message(chat_id, format!("Content <code>{}</code> deleted.", cxid))
|
||||
};
|
||||
let is_admin = is_admin_in_chat(bot, chat_id, UserId(user_id as u64)).await;
|
||||
if !is_admin && content.user_id != user_id {
|
||||
bot.send_message(chat_id, "<b>Unauthorized.</b>")
|
||||
.parse_mode(ParseMode::Html).await?;
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
ctx.pipeline.delete_content(&content_id, ctx.config.content.keep_content).await.ok();
|
||||
content_repo.set_status(&content_id, ContentStatus::Deleted).await.ok();
|
||||
bot.send_message(chat_id, format!("Content <code>{}</code> deleted.", cxid))
|
||||
.parse_mode(ParseMode::Html).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !is_admin_in_chat(bot, chat_id, UserId(user_id as u64)).await {
|
||||
@@ -1936,13 +1942,17 @@ async fn handle_forward_callback(
|
||||
"<i>anonymous</i>".to_string()
|
||||
};
|
||||
|
||||
let caption = format!(
|
||||
let mut caption = format!(
|
||||
"{}\n\nSubmitted by: {}\nDirect link: <code>{}</code>\nForward link: <code>{}</code>",
|
||||
escape_html(&forward_def.forward_message),
|
||||
author_line,
|
||||
link,
|
||||
forward_link
|
||||
);
|
||||
// Telegram media caption limit is 1024 characters
|
||||
if caption.chars().count() > 1024 {
|
||||
caption = caption.chars().take(1024).collect();
|
||||
}
|
||||
|
||||
// 6. Forward content to destination (media batching or text-only)
|
||||
let file_repo = ContentFileRepo::new(ctx.db.conn());
|
||||
@@ -1983,6 +1993,10 @@ async fn handle_forward_callback(
|
||||
let input_file = InputFile::memory(bytes.clone());
|
||||
let media = if mime_type.starts_with("image/") {
|
||||
InputMedia::Photo(InputMediaPhoto::new(input_file))
|
||||
} else if mime_type.starts_with("video/") {
|
||||
InputMedia::Video(InputMediaVideo::new(input_file))
|
||||
} else if mime_type.starts_with("audio/") {
|
||||
InputMedia::Audio(InputMediaAudio::new(input_file))
|
||||
} else {
|
||||
InputMedia::Document(InputMediaDocument::new(input_file))
|
||||
};
|
||||
@@ -1999,6 +2013,14 @@ async fn handle_forward_callback(
|
||||
d.caption = Some(caption.clone());
|
||||
d.parse_mode = Some(ParseMode::Html);
|
||||
}
|
||||
InputMedia::Video(v) => {
|
||||
v.caption = Some(caption.clone());
|
||||
v.parse_mode = Some(ParseMode::Html);
|
||||
}
|
||||
InputMedia::Audio(a) => {
|
||||
a.caption = Some(caption.clone());
|
||||
a.parse_mode = Some(ParseMode::Html);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -2131,11 +2153,7 @@ async fn handle_get_id_search(
|
||||
_ctx: &BotContext,
|
||||
) -> HandlerResult {
|
||||
let query_lower = query.to_lowercase();
|
||||
let search_term = if query_lower.starts_with('@') {
|
||||
&query_lower[1..]
|
||||
} else {
|
||||
&query_lower
|
||||
};
|
||||
let search_term = query_lower.strip_prefix('@').unwrap_or(&query_lower);
|
||||
let mut matches = vec![];
|
||||
|
||||
// Try administrators first (bots usually can see them)
|
||||
@@ -2168,11 +2186,6 @@ async fn handle_admin_blacklist_uid(
|
||||
text: &str,
|
||||
ctx: &BotContext,
|
||||
) -> HandlerResult {
|
||||
if !ctx.config.groups.admin_group_ids.contains(&chat_id.0) {
|
||||
bot.send_message(chat_id, "This command is only available in the admin group.")
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
let uid = text.split_whitespace().nth(1).and_then(|s| s.parse::<i64>().ok());
|
||||
let Some(uid) = uid else {
|
||||
bot.send_message(chat_id, "Usage: /blacklist_uid <user_id>").await?;
|
||||
@@ -2192,11 +2205,6 @@ async fn handle_admin_whitelist_uid(
|
||||
text: &str,
|
||||
ctx: &BotContext,
|
||||
) -> HandlerResult {
|
||||
if !ctx.config.groups.admin_group_ids.contains(&chat_id.0) {
|
||||
bot.send_message(chat_id, "This command is only available in the admin group.")
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
let uid = text.split_whitespace().nth(1).and_then(|s| s.parse::<i64>().ok());
|
||||
let Some(uid) = uid else {
|
||||
bot.send_message(chat_id, "Usage: /whitelist_uid <user_id>").await?;
|
||||
@@ -2241,6 +2249,7 @@ fn parse_duration(parts: &[&str]) -> Result<Option<i64>, String> {
|
||||
Ok(Some(total as i64))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn propagate_punishment(
|
||||
bot: &Bot,
|
||||
ctx: &BotContext,
|
||||
|
||||
@@ -37,3 +37,4 @@ password-hash = "0.5"
|
||||
hmac = "0.12"
|
||||
sha2 = "0.10"
|
||||
subtle = "2.5"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
|
||||
|
||||
@@ -10,7 +10,7 @@ use axum::{
|
||||
use cgcx_config::Config;
|
||||
use cgcx_core::{ContentId, CgcxError};
|
||||
use cgcx_crypto::{unwrap_content_key, DecryptStream, MasterKey};
|
||||
use cgcx_db::{Database, ContentRepo, ContentFileRepo, UserRepo};
|
||||
use cgcx_db::{Database, ContentRepo, ContentFileRepo, UserRepo, ReportRepo};
|
||||
use cgcx_storage::Storage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::IpAddr;
|
||||
@@ -37,6 +37,7 @@ struct AppState {
|
||||
master_key: Arc<MasterKey>,
|
||||
cookie_secret: Vec<u8>,
|
||||
allowed_roots: Arc<Vec<std::path::PathBuf>>,
|
||||
http_client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -77,6 +78,11 @@ struct VerifyPasswordRequest {
|
||||
password: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ReportRequest {
|
||||
reason: String,
|
||||
}
|
||||
|
||||
fn deserialize_download_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
@@ -239,6 +245,12 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
info!("Server database opened at: {:?}", std::fs::canonicalize(&db_path).unwrap_or_else(|_| db_path.clone()));
|
||||
db.run_migrations().await?;
|
||||
|
||||
// Seed a dummy web-reporter user so web-submitted reports satisfy the FK constraint.
|
||||
let user_repo = UserRepo::new(db.conn());
|
||||
if let Err(e) = user_repo.ensure_exists(0, Some("web"), "Web Reporter", 0, None).await {
|
||||
tracing::warn!("Failed to seed web reporter user: {}", e);
|
||||
}
|
||||
|
||||
let storage = Arc::new(Storage::new(config.storage.paths.clone()));
|
||||
storage.ensure_dirs().await?;
|
||||
|
||||
@@ -251,12 +263,14 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
let cookie_secret = blake3::hash(master_key.as_bytes()).as_bytes().to_vec();
|
||||
|
||||
let allowed_roots = Arc::new(vec![
|
||||
tokio::fs::canonicalize(&config.storage.paths.media).await.map_err(|e| CgcxError::Io(e))?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.documents).await.map_err(|e| CgcxError::Io(e))?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.text).await.map_err(|e| CgcxError::Io(e))?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.temp).await.map_err(|e| CgcxError::Io(e))?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.media).await.map_err(CgcxError::Io)?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.documents).await.map_err(CgcxError::Io)?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.text).await.map_err(CgcxError::Io)?,
|
||||
tokio::fs::canonicalize(&config.storage.paths.temp).await.map_err(CgcxError::Io)?,
|
||||
]);
|
||||
|
||||
let http_client = reqwest::Client::new();
|
||||
|
||||
let state = AppState {
|
||||
db,
|
||||
storage,
|
||||
@@ -264,6 +278,7 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
master_key: Arc::new(master_key),
|
||||
cookie_secret,
|
||||
allowed_roots,
|
||||
http_client,
|
||||
};
|
||||
|
||||
let mut governor_builder = tower_governor::governor::GovernorConfigBuilder::default();
|
||||
@@ -331,6 +346,7 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
.route("/api/content/:cxid", get(get_metadata))
|
||||
.route("/api/content/:cxid/file/:file_idx", get(serve_file))
|
||||
.route("/api/content/:cxid/file/:file_idx/raw", get(serve_raw_file))
|
||||
.route("/api/content/:cxid/report", post(report_content))
|
||||
.merge(password_route)
|
||||
.nest_service("/assets", static_service)
|
||||
.fallback(fallback)
|
||||
@@ -371,8 +387,8 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
|
||||
let addr = format!("{}:{}", config.server.bind_address, config.server.port);
|
||||
info!("Server listening on http://{}", addr);
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| CgcxError::Io(e))?;
|
||||
axum::serve(listener, app).await.map_err(|e| CgcxError::Io(e))?;
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await.map_err(CgcxError::Io)?;
|
||||
axum::serve(listener, app).await.map_err(CgcxError::Io)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -610,11 +626,83 @@ fn client_ip_from_headers(headers: &HeaderMap) -> String {
|
||||
"unknown".to_string()
|
||||
}
|
||||
|
||||
async fn report_content(
|
||||
State(state): State<AppState>,
|
||||
Path(cxid): Path<String>,
|
||||
Json(req): Json<ReportRequest>,
|
||||
) -> AppResult<impl IntoResponse> {
|
||||
tracing::info!("report_content: cxid={}", cxid);
|
||||
let content_id = ContentId::try_from(cxid.as_str())?;
|
||||
|
||||
let repo = ContentRepo::new(state.db.conn());
|
||||
let content = repo.get(&content_id).await?.ok_or(CgcxError::NotFound)?;
|
||||
|
||||
if content.status == cgcx_core::ContentStatus::Deleted || content.status == cgcx_core::ContentStatus::Blacklisted {
|
||||
return Err(CgcxError::NotFound.into());
|
||||
}
|
||||
|
||||
let file_repo = ContentFileRepo::new(state.db.conn());
|
||||
let files = file_repo.list_by_content(&content_id).await?;
|
||||
let file_count = files.len();
|
||||
|
||||
let report_repo = ReportRepo::new(state.db.conn());
|
||||
let report_id = report_repo.insert(&content_id, 0, &req.reason).await?;
|
||||
|
||||
let bot_token = &state.config.telegram.bot_token;
|
||||
let api_base = state.config.telegram.api_url.as_deref().unwrap_or("https://api.telegram.org");
|
||||
|
||||
let report_text = format!(
|
||||
"<b>[ NEW REPORT ]</b> #{}\n\nCXID: <code>{}</code>\nReporter: <i>web</i>\nOwner: <code>{}</code>\nUploaded: <i>{}</i>\nFiles: <b>{}</b>",
|
||||
report_id,
|
||||
cxid,
|
||||
content.user_id,
|
||||
content.created_at.format("%Y-%m-%d %H:%M"),
|
||||
file_count
|
||||
);
|
||||
|
||||
let keyboard = serde_json::json!({
|
||||
"inline_keyboard": [
|
||||
[
|
||||
{"text": "[ Rmv + Ban ]", "callback_data": format!("v1:admin:delblk:{}", report_id)},
|
||||
{"text": "[ Delete Only ]", "callback_data": format!("v1:admin:del:{}", report_id)}
|
||||
],
|
||||
[
|
||||
{"text": "[ Blacklist Only ]", "callback_data": format!("v1:admin:blk:{}", report_id)},
|
||||
{"text": "[ Ignore ]", "callback_data": format!("v1:admin:ign:{}", report_id)}
|
||||
]
|
||||
]
|
||||
});
|
||||
|
||||
for &group_id in &state.config.groups.review_group_ids {
|
||||
let url = format!("{}/bot{}/sendMessage", api_base, bot_token);
|
||||
let payload = serde_json::json!({
|
||||
"chat_id": group_id,
|
||||
"text": report_text,
|
||||
"parse_mode": "HTML",
|
||||
"reply_markup": keyboard
|
||||
});
|
||||
|
||||
match state.http_client.post(&url).json(&payload).send().await {
|
||||
Ok(resp) => {
|
||||
if !resp.status().is_success() {
|
||||
tracing::warn!("Failed to send report notification to group {}: HTTP {}", group_id, resp.status());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to send report notification to group {}: {}", group_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
async fn serve_file(
|
||||
State(state): State<AppState>,
|
||||
Path((cxid, file_idx)): Path<(String, u32)>,
|
||||
Query(query): Query<FileQuery>,
|
||||
headers: HeaderMap,
|
||||
method: Method,
|
||||
) -> AppResult<impl IntoResponse> {
|
||||
tracing::info!("serve_file: cxid={} file_idx={}", cxid, file_idx);
|
||||
let content_id = ContentId::try_from(cxid.as_str())?;
|
||||
@@ -712,7 +800,7 @@ async fn serve_file(
|
||||
|
||||
// Parse Range header
|
||||
let range = if let Some(range_hdr) = headers.get(header::RANGE) {
|
||||
if let Some(hdr_str) = range_hdr.to_str().ok() {
|
||||
if let Ok(hdr_str) = range_hdr.to_str() {
|
||||
match parse_range(hdr_str, file.size_bytes) {
|
||||
Some(r) => Some(r),
|
||||
None => {
|
||||
@@ -732,7 +820,8 @@ async fn serve_file(
|
||||
|
||||
let is_range = range.is_some();
|
||||
let is_conditional = headers.contains_key(header::IF_NONE_MATCH);
|
||||
if !is_range && !is_conditional {
|
||||
let is_head = method == Method::HEAD;
|
||||
if !is_range && !is_conditional && !is_head {
|
||||
let new_views = repo.increment_views(&content_id).await?;
|
||||
if let Some(max) = content.max_views {
|
||||
if new_views >= max {
|
||||
@@ -750,10 +839,8 @@ async fn serve_file(
|
||||
if let Err(e) = file_repo.decrement_ref_count(&f.content_id, f.file_index).await {
|
||||
tracing::warn!("failed to decrement ref_count: {}", e);
|
||||
}
|
||||
} else {
|
||||
if let Err(e) = file_repo.decrement_ref_count_for_path(&f.stored_path).await {
|
||||
tracing::warn!("failed to decrement owner ref_count: {}", e);
|
||||
}
|
||||
} else if let Err(e) = file_repo.decrement_ref_count_for_path(&f.stored_path).await {
|
||||
tracing::warn!("failed to decrement owner ref_count: {}", e);
|
||||
}
|
||||
let remaining = file_repo.count_by_path_excluding_content(&f.stored_path, &f.content_id).await.unwrap_or(1);
|
||||
if remaining == 0 {
|
||||
@@ -836,6 +923,7 @@ async fn serve_raw_file(
|
||||
Path((cxid, file_idx)): Path<(String, u32)>,
|
||||
Query(query): Query<ScQuery>,
|
||||
headers: HeaderMap,
|
||||
method: Method,
|
||||
) -> AppResult<impl IntoResponse> {
|
||||
tracing::info!("serve_raw_file: cxid={} file_idx={}", cxid, file_idx);
|
||||
let content_id = ContentId::try_from(cxid.as_str())?;
|
||||
@@ -901,6 +989,44 @@ async fn serve_raw_file(
|
||||
return Err(CgcxError::Forbidden.into());
|
||||
}
|
||||
|
||||
let is_head = method == Method::HEAD;
|
||||
if !is_head {
|
||||
let new_views = repo.increment_views(&content_id).await?;
|
||||
if let Some(max) = content.max_views {
|
||||
if new_views >= max {
|
||||
let db = state.db.clone();
|
||||
let storage = state.storage.clone();
|
||||
let content_id = content_id.clone();
|
||||
let files = files.clone();
|
||||
let keep_content = state.config.content.keep_content;
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
if !keep_content {
|
||||
let file_repo = ContentFileRepo::new(db.conn());
|
||||
for f in &files {
|
||||
if f.ref_count > 0 {
|
||||
if let Err(e) = file_repo.decrement_ref_count(&f.content_id, f.file_index).await {
|
||||
tracing::warn!("failed to decrement ref_count: {}", e);
|
||||
}
|
||||
} else if let Err(e) = file_repo.decrement_ref_count_for_path(&f.stored_path).await {
|
||||
tracing::warn!("failed to decrement owner ref_count: {}", e);
|
||||
}
|
||||
let remaining = file_repo.count_by_path_excluding_content(&f.stored_path, &f.content_id).await.unwrap_or(1);
|
||||
if remaining == 0 {
|
||||
if let Err(e) = tokio::fs::remove_file(&f.stored_path).await {
|
||||
tracing::warn!("failed to remove file {:?}: {}", f.stored_path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = storage.delete_content_files(&content_id, "application/octet-stream").await;
|
||||
}
|
||||
let repo = ContentRepo::new(db.conn());
|
||||
let _ = repo.set_status(&content_id, cgcx_core::ContentStatus::Deleted).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Decrypt entire file into memory
|
||||
let mut f = tokio::fs::File::open(&file.stored_path).await.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
let mut header_buf = [0u8; 24];
|
||||
|
||||
Reference in New Issue
Block a user