Major improvement, security handling, file handling +fixes

This commit is contained in:
unknown
2026-05-23 00:13:56 +02:00
parent 2129081599
commit a7b44af91a
25 changed files with 925 additions and 116 deletions

View File

@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc;
use teloxide::{
dispatching::{dialogue::{InMemStorage, Storage}, UpdateFilterExt},
@@ -5,7 +6,7 @@ use teloxide::{
prelude::*,
types::{
InlineKeyboardButton, InlineKeyboardMarkup, Message, MessageId, ParseMode, CallbackQuery,
ChatMemberStatus, UserId, ChatPermissions,
ChatMemberStatus, UserId, ChatPermissions, InputMedia, InputMediaPhoto, InputMediaDocument, InputFile,
},
RequestError,
utils::command::BotCommands,
@@ -55,12 +56,30 @@ pub struct StagedItem {
pub caption: Option<String>,
}
#[derive(Clone, Default, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct UploadOptions {
pub max_views: Option<u64>,
pub allow_download: bool,
pub password: Option<String>,
pub pending_forward_id: Option<i64>,
#[serde(default = "default_show_author")]
pub show_author: bool,
}
fn default_show_author() -> bool {
true
}
impl Default for UploadOptions {
fn default() -> Self {
Self {
max_views: None,
allow_download: true,
password: None,
pending_forward_id: None,
show_author: true,
}
}
}
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
@@ -271,6 +290,8 @@ async fn run_bot() {
let repo = PunishmentRepo::new(db_clone.conn());
match repo.list_expired().await {
Ok(expired) => {
// global_ban propagated punishments are naturally revoked here
// because each chat has its own punishment row.
for p in expired {
let chat_id = ChatId(p.chat_id);
let target = UserId(p.target_user_id as u64);
@@ -365,7 +386,7 @@ async fn handle_message_inner(
let dialogue = BotDialogue { chat_id, storage };
let user_repo = UserRepo::new(ctx.db.conn());
user_repo.ensure_exists(user_id, user.username.as_deref(), &user.first_name).await?;
user_repo.ensure_exists(user_id, user.username.as_deref(), &user.first_name, chat_id.0, Some(&ctx.config.uname_changes_path)).await?;
let db_user = match user_repo.get(user_id).await? {
Some(u) => u,
@@ -421,7 +442,7 @@ async fn handle_message_inner(
/get_id — Get current chat ID.
/get_id <@username> — Search administrators by username.
/get_id <displayname> — Search members in this chat by display name.
/create_submit_forward <dest> <review> [msg] — Create a submission forward.
/create_submit_forward &lt;dest&gt; &lt;review&gt; [msg] — Create a submission forward.
/show_c_forward [page] — List forward links.
/add_blacklist <user_id> — Blacklist a user in all active forwards.
/rm_blacklist <user_id> — Remove a user from blacklist in all active forwards.
@@ -576,6 +597,7 @@ async fn handle_message_inner(
bot.ban_chat_member(chat_id, UserId(target_id)).until_date(until).await?;
let repo = PunishmentRepo::new(ctx.db.conn());
repo.insert(chat_id.0, target_id as i64, "ban", Some(duration_seconds), reason.as_deref(), user_id).await?;
propagate_punishment(&bot, &ctx, chat_id, target_id, "ban", Some(duration_seconds), reason.as_deref(), user_id).await;
bot.send_message(chat_id, format!("Banned <code>{}</code> for {} seconds.", target_id, duration_seconds)).parse_mode(ParseMode::Html).await?;
}
Ok(None) => { /* shouldn't happen with 2 args */ }
@@ -604,6 +626,7 @@ async fn handle_message_inner(
bot.restrict_chat_member(chat_id, UserId(target_id), ChatPermissions::empty()).until_date(until).await?;
let repo = PunishmentRepo::new(ctx.db.conn());
repo.insert(chat_id.0, target_id as i64, "mute", Some(duration_seconds), reason.as_deref(), user_id).await?;
propagate_punishment(&bot, &ctx, chat_id, target_id, "mute", Some(duration_seconds), reason.as_deref(), user_id).await;
bot.send_message(chat_id, format!("Muted <code>{}</code> for {} seconds.", target_id, duration_seconds)).parse_mode(ParseMode::Html).await?;
}
Ok(None) => {}
@@ -628,6 +651,7 @@ async fn handle_message_inner(
bot.restrict_chat_member(chat_id, UserId(target_id), ChatPermissions::empty()).await?;
let repo = PunishmentRepo::new(ctx.db.conn());
repo.insert(chat_id.0, target_id as i64, "mute", None, reason.as_deref(), user_id).await?;
propagate_punishment(&bot, &ctx, chat_id, target_id, "mute", None, reason.as_deref(), user_id).await;
bot.send_message(chat_id, format!("Muted <code>{}</code> indefinitely.", target_id)).parse_mode(ParseMode::Html).await?;
} else {
bot.send_message(chat_id, "Could not resolve target user.").await?;
@@ -648,6 +672,7 @@ async fn handle_message_inner(
bot.ban_chat_member(chat_id, UserId(target_id)).await?;
let repo = PunishmentRepo::new(ctx.db.conn());
repo.insert(chat_id.0, target_id as i64, "ban", None, reason.as_deref(), user_id).await?;
propagate_punishment(&bot, &ctx, chat_id, target_id, "ban", None, reason.as_deref(), user_id).await;
bot.send_message(chat_id, format!("Banned <code>{}</code> permanently.", target_id)).parse_mode(ParseMode::Html).await?;
} else {
bot.send_message(chat_id, "Could not resolve target user.").await?;
@@ -669,6 +694,7 @@ async fn handle_message_inner(
bot.unban_chat_member(chat_id, UserId(target_id)).await?;
let repo = PunishmentRepo::new(ctx.db.conn());
repo.insert(chat_id.0, target_id as i64, "kick", None, reason.as_deref(), user_id).await?;
propagate_punishment(&bot, &ctx, chat_id, target_id, "kick", None, reason.as_deref(), user_id).await;
bot.send_message(chat_id, format!("Kicked <code>{}</code>.", target_id)).parse_mode(ParseMode::Html).await?;
} else {
bot.send_message(chat_id, "Could not resolve target user.").await?;
@@ -763,6 +789,11 @@ async fn handle_message_inner(
}
return Ok(());
}
if !p.is_empty() && p.starts_with("report_") {
let cxid = &p["report_".len()..];
handle_report(&bot, chat_id, user_id, cxid, &dialogue, &ctx).await?;
return Ok(());
}
}
if db_user.accepted_terms_at.is_some() {
return send_main_menu(&bot, chat_id, &dialogue, None).await;
@@ -974,6 +1005,14 @@ async fn handle_callback_inner(
refresh_options_message(&bot, chat_id, &items, &new_options).await?;
}
}
"toggle_author" => {
let state = dialogue.get_or_default().await?;
if let BotState::UploadOptions { items, options } = state {
let new_options = UploadOptions { show_author: !options.show_author, ..options };
dialogue.update(BotState::UploadOptions { items: items.clone(), options: new_options.clone() }).await?;
refresh_options_message(&bot, chat_id, &items, &new_options).await?;
}
}
"set_password" => {
bot.send_message(chat_id, "Send the <code>password</code> (max <b>32</b> chars) or <code>/skip</code> to skip.")
.parse_mode(ParseMode::Html)
@@ -1154,7 +1193,7 @@ async fn send_staging_message(bot: &Bot, chat_id: ChatId, items: &[StagedItem],
let text = if items.is_empty() {
format!("<b>[ Staging {} ]</b>\n\n<i>Send me files to add them.</i>\n\n<code>0/{}</code> items", type_label, max_batch_size)
} else {
let list: String = items.iter().map(|i| format!("• <code>{}</code>\n", i.file_name)).collect();
let list: String = items.iter().map(|i| format!("• <code>{}</code>\n", escape_html(&i.file_name))).collect();
format!("<b>[ Staging {} ]</b> <code>{}/{}</code>\n\n{}", type_label, items.len(), max_batch_size, list)
};
@@ -1304,10 +1343,15 @@ async fn refresh_options_message(
} else {
"Password: <i>None</i>"
};
let author_text = if options.show_author {
"Show author: <b>Yes</b>"
} else {
"Show author: <b>No</b>"
};
let text = format!(
"<b>[ Upload Options ]</b>\n\n{}\n{}\n{}\n\n<i>Confirm when ready.</i>",
destroy_text, download_text, password_text
"<b>[ Upload Options ]</b>\n\n{}\n{}\n{}\n{}\n\n<i>Confirm when ready.</i>",
destroy_text, download_text, password_text, author_text
);
let keyboard = InlineKeyboardMarkup::new(vec![
@@ -1317,6 +1361,7 @@ async fn refresh_options_message(
],
vec![
InlineKeyboardButton::callback("[ Set Password ]", "v1:opt:set_password"),
InlineKeyboardButton::callback("[ Toggle Author ]", "v1:opt:toggle_author"),
],
vec![
InlineKeyboardButton::callback("[ Back ]", "v1:opt:back"),
@@ -1391,8 +1436,10 @@ async fn finalize_upload(
options.max_views,
options.allow_download,
password_hash,
options.show_author,
).await?;
let mut blocked = false;
for (idx, item) in items.iter().enumerate() {
let result = if item.file_id.starts_with("text://") {
let data = item.caption.clone().unwrap_or_default().into_bytes();
@@ -1431,10 +1478,28 @@ async fn finalize_upload(
};
if let Err(e) = result {
if matches!(e, cgcx_core::CgcxError::BlockedHash) {
blocked = true;
break;
}
warn!("Ingest error: {}", e);
}
}
if blocked {
bot.delete_message(chat_id, status_msg.id).await.ok();
for item in &items {
if let Some(rest) = item.file_id.strip_prefix("text://") {
if let Ok(msg_id) = rest.parse::<i32>() {
bot.delete_message(chat_id, MessageId(msg_id)).await.ok();
}
}
}
ctx.pipeline.delete_content(&content_id, false).await.ok();
dialogue.update(BotState::MainMenu { pending_forward_id: None }).await?;
return Ok(());
}
ctx.pipeline.activate_content(&content_id).await?;
if let Some(fid) = options.pending_forward_id {
@@ -1448,16 +1513,73 @@ async fn finalize_upload(
user_id,
forward_def.id
);
let keyboard = InlineKeyboardMarkup::new(vec![vec![
InlineKeyboardButton::callback("[ Approve ]", format!("v1:fwd:approve:{}", submission_id)),
InlineKeyboardButton::callback("[ Ignore ]", format!("v1:fwd:ignore:{}", submission_id)),
InlineKeyboardButton::callback("[ Blacklist User ]", format!("v1:fwd:blk:{}", submission_id)),
]]);
let sent = bot.send_message(ChatId(forward_def.review_group_id), review_text)
let keyboard = InlineKeyboardMarkup::new(vec![
vec![
InlineKeyboardButton::callback("[ Approve ]", format!("v1:fwd:approve:{}", submission_id)),
InlineKeyboardButton::callback("[ Ignore ]", format!("v1:fwd:ignore:{}", submission_id)),
],
vec![
InlineKeyboardButton::callback("[ Blackl. ]", format!("v1:fwd:blk:{}", submission_id)),
InlineKeyboardButton::callback("[ Ban ]", format!("v1:fwd:ban:{}", submission_id)),
InlineKeyboardButton::callback("[ Ban/BL u. ]", format!("v1:fwd:banblk:{}", submission_id)),
],
]);
let sent = bot.send_message(ChatId(forward_def.review_group_id), review_text.clone())
.parse_mode(ParseMode::Html)
.reply_markup(keyboard)
.await?;
forward_repo.set_review_message_id(submission_id, sent.id.0).await?;
// G — Send decrypted media batches to review group
let file_repo = ContentFileRepo::new(ctx.db.conn());
let files = file_repo.list_by_content(&content_id).await?;
if !files.is_empty() {
let mut decrypted = Vec::new();
for file in &files {
match tokio::fs::read(&file.stored_path).await {
Ok(ciphertext) => {
match cgcx_crypto::decrypt_bytes(&ciphertext, &file.encrypted_key_wrapped, &ctx.master_key) {
Ok(bytes) => decrypted.push((file.mime_type.clone(), bytes)),
Err(e) => tracing::warn!("decrypt error for {}: {}", file.file_index, e),
}
}
Err(e) => tracing::warn!("read error for {:?}: {}", file.stored_path, e),
}
}
if !decrypted.is_empty() {
let chunks: Vec<_> = decrypted.chunks(10).collect();
let total = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
let is_last = i == total - 1;
let mut batch: Vec<InputMedia> = Vec::new();
for (mime_type, bytes) in chunk.iter() {
let input_file = InputFile::memory(bytes.clone());
let media = if mime_type.starts_with("image/") {
InputMedia::Photo(InputMediaPhoto::new(input_file))
} else {
InputMedia::Document(InputMediaDocument::new(input_file))
};
batch.push(media);
}
if is_last {
if let Some(last) = batch.last_mut() {
match last {
InputMedia::Photo(p) => {
p.caption = Some(review_text.clone());
p.parse_mode = Some(ParseMode::Html);
}
InputMedia::Document(d) => {
d.caption = Some(review_text.clone());
d.parse_mode = Some(ParseMode::Html);
}
_ => {}
}
}
}
bot.send_media_group(ChatId(forward_def.review_group_id), batch).await?;
}
}
}
}
}
@@ -1486,7 +1608,7 @@ async fn finalize_upload(
);
if let Some(ref pw) = options.password {
let direct_link = format!("{}/?cxid={}&sc={}", base_url, content_id.as_str(), pw);
result_text.push_str(&format!("\n\n<i>Direct Access Link:</i> <code>{}</code>", direct_link));
result_text.push_str(&format!("\n\n<i>Direct Access Link:</i> <code>{}</code>", escape_html(&direct_link)));
}
bot.edit_message_text(chat_id, status_msg.id, result_text)
@@ -1786,22 +1908,124 @@ async fn handle_forward_callback(
// 3. Update content password_hash
let content_repo = ContentRepo::new(ctx.db.conn());
content_repo.update_password_hash(&submission.content_id, Some(&password_hash)).await?;
// 4. Forward content to destination
// 4. Build links
let link = format!("{}/?cxid={}&sc={}", ctx.config.server.base_url, submission.content_id.as_str(), password);
let posted_msg = bot.send_message(
ChatId(forward_def.destination_chat_id),
format!("{}\n\nDirect link: <code>{}</code>", forward_def.forward_message, link)
).parse_mode(ParseMode::Html).await?;
let forward_link = format!("https://t.me/{}?start=submitfwdid{}", ctx.bot_username, forward_def.code);
// 5. DM user
bot.send_message(
ChatId(submission.user_id),
format!("<b>Your submission was approved.</b>\n\nPosted: {}\nDirect access: <code>{}</code>",
format!("https://t.me/c/{}/{}", forward_def.destination_chat_id, posted_msg.id),
link)
).parse_mode(ParseMode::Html).await.ok();
// 5. Resolve author visibility
let content = match content_repo.get(&submission.content_id).await? {
Some(c) => c,
None => {
bot.send_message(chat_id, "Content not found.").await?;
return Ok(());
}
};
let user_repo = UserRepo::new(ctx.db.conn());
let submitter = user_repo.get(submission.user_id).await?;
let author_line = if content.show_author {
if let Some(ref user) = submitter {
if let Some(ref username) = user.telegram_username {
format!("@{} [{}]", escape_html(username), submission.user_id)
} else {
format!("<code>{}</code>", submission.user_id)
}
} else {
format!("<code>{}</code>", submission.user_id)
}
} else {
"<i>anonymous</i>".to_string()
};
// 6. Update review message
let caption = format!(
"{}\n\nSubmitted by: {}\nDirect link: <code>{}</code>\nForward link: <code>{}</code>",
escape_html(&forward_def.forward_message),
author_line,
link,
forward_link
);
// 6. Forward content to destination (media batching or text-only)
let file_repo = ContentFileRepo::new(ctx.db.conn());
let files = file_repo.list_by_content(&submission.content_id).await?;
let posted_link = if files.is_empty() {
let posted_msg = bot.send_message(
ChatId(forward_def.destination_chat_id),
caption
).parse_mode(ParseMode::Html).await?;
format!("https://t.me/c/{}/{}", forward_def.destination_chat_id, posted_msg.id)
} else {
let mut decrypted = Vec::new();
for file in &files {
match tokio::fs::read(&file.stored_path).await {
Ok(ciphertext) => {
match cgcx_crypto::decrypt_bytes(&ciphertext, &file.encrypted_key_wrapped, &ctx.master_key) {
Ok(bytes) => decrypted.push((file.mime_type.clone(), bytes)),
Err(e) => tracing::warn!("decrypt error for {}: {}", file.file_index, e),
}
}
Err(e) => tracing::warn!("read error for {:?}: {}", file.stored_path, e),
}
}
if decrypted.is_empty() {
let posted_msg = bot.send_message(
ChatId(forward_def.destination_chat_id),
caption
).parse_mode(ParseMode::Html).await?;
format!("https://t.me/c/{}/{}", forward_def.destination_chat_id, posted_msg.id)
} else {
let chunks: Vec<_> = decrypted.chunks(10).collect();
let total = chunks.len();
let mut first_msg_id = None;
for (i, chunk) in chunks.iter().enumerate() {
let is_last = i == total - 1;
let mut batch: Vec<InputMedia> = Vec::new();
for (mime_type, bytes) in chunk.iter() {
let input_file = InputFile::memory(bytes.clone());
let media = if mime_type.starts_with("image/") {
InputMedia::Photo(InputMediaPhoto::new(input_file))
} else {
InputMedia::Document(InputMediaDocument::new(input_file))
};
batch.push(media);
}
if is_last {
if let Some(last) = batch.last_mut() {
match last {
InputMedia::Photo(p) => {
p.caption = Some(caption.clone());
p.parse_mode = Some(ParseMode::Html);
}
InputMedia::Document(d) => {
d.caption = Some(caption.clone());
d.parse_mode = Some(ParseMode::Html);
}
_ => {}
}
}
}
let sent = bot.send_media_group(ChatId(forward_def.destination_chat_id), batch).await?;
if first_msg_id.is_none() {
first_msg_id = sent.first().map(|m| m.id);
}
}
if let Some(mid) = first_msg_id {
format!("https://t.me/c/{}/{}", forward_def.destination_chat_id, mid)
} else {
String::new()
}
}
};
// 7. DM user
if !posted_link.is_empty() {
bot.send_message(
ChatId(submission.user_id),
format!("<b>Your submission was approved.</b>\n\nPosted: {}\nDirect access: <code>{}</code>",
posted_link, link)
).parse_mode(ParseMode::Html).await.ok();
}
// 8. Update review message
if let Some(mid) = submission.review_message_id {
bot.edit_message_text(chat_id, MessageId(mid), format!("<b>[ APPROVED ]</b> #{}\nApproved by <code>{}</code>", submission_id, user_id))
.parse_mode(ParseMode::Html)
@@ -1809,7 +2033,7 @@ async fn handle_forward_callback(
.await.ok();
}
// 7. Update status
// 9. Update status
forward_repo.update_status(submission_id, "approved").await?;
}
"ignore" => {
@@ -1832,6 +2056,37 @@ async fn handle_forward_callback(
}
forward_repo.update_status(submission_id, "blacklisted").await?;
}
"ban" => {
let target = UserId(submission.user_id as u64);
let _ = bot.ban_chat_member(ChatId(forward_def.destination_chat_id), target).await;
let _ = bot.ban_chat_member(ChatId(forward_def.review_group_id), target).await;
let repo = PunishmentRepo::new(ctx.db.conn());
let _ = repo.insert(forward_def.destination_chat_id, submission.user_id, "ban", None, None, user_id).await;
let _ = repo.insert(forward_def.review_group_id, submission.user_id, "ban", None, None, user_id).await;
if let Some(mid) = submission.review_message_id {
bot.edit_message_text(chat_id, MessageId(mid), format!("<b>[ BANNED ]</b> #{}\nBanned by <code>{}</code>", submission_id, user_id))
.parse_mode(ParseMode::Html)
.reply_markup(InlineKeyboardMarkup::new(Vec::<Vec<InlineKeyboardButton>>::new()))
.await.ok();
}
forward_repo.update_status(submission_id, "banned").await?;
}
"banblk" => {
let target = UserId(submission.user_id as u64);
let _ = bot.ban_chat_member(ChatId(forward_def.destination_chat_id), target).await;
let _ = bot.ban_chat_member(ChatId(forward_def.review_group_id), target).await;
let repo = PunishmentRepo::new(ctx.db.conn());
let _ = repo.insert(forward_def.destination_chat_id, submission.user_id, "ban", None, None, user_id).await;
let _ = repo.insert(forward_def.review_group_id, submission.user_id, "ban", None, None, user_id).await;
forward_repo.add_to_list(submission.forward_id, submission.user_id, "blacklist").await?;
if let Some(mid) = submission.review_message_id {
bot.edit_message_text(chat_id, MessageId(mid), format!("<b>[ BAN/BL ]</b> #{}\nBanned + Blacklisted by <code>{}</code>", submission_id, user_id))
.parse_mode(ParseMode::Html)
.reply_markup(InlineKeyboardMarkup::new(Vec::<Vec<InlineKeyboardButton>>::new()))
.await.ok();
}
forward_repo.update_status(submission_id, "banblk").await?;
}
_ => {}
}
@@ -1863,6 +2118,12 @@ async fn is_admin_in_chat(bot: &Bot, chat_id: ChatId, user_id: UserId) -> bool {
is_admin(bot, chat_id, user_id).await
}
fn escape_html(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}
async fn handle_get_id_search(
bot: &Bot,
chat_id: ChatId,
@@ -1885,7 +2146,7 @@ async fn handle_get_id_search(
let full_name = format!("{} {}", user.first_name, user.last_name.unwrap_or_default());
let display = full_name.trim().to_lowercase();
if username.contains(search_term) || display.contains(search_term) || user.id.0.to_string() == query {
let name = full_name.trim().to_string();
let name = escape_html(full_name.trim());
matches.push(format!("• <code>{}</code> — {}", user.id.0, name));
}
}
@@ -1907,14 +2168,21 @@ async fn handle_admin_blacklist_uid(
text: &str,
ctx: &BotContext,
) -> HandlerResult {
let uid = text.split_whitespace().nth(1).and_then(|s| s.parse::<i64>().ok());
if let Some(uid) = uid {
ctx.moderation.blacklist(uid).await?;
let user_repo = UserRepo::new(ctx.db.conn());
user_repo.set_role(uid, "banned").await?;
bot.send_message(chat_id, format!("Blacklisted UID <code>{}</code>", uid))
.parse_mode(ParseMode::Html).await?;
if !ctx.config.groups.admin_group_ids.contains(&chat_id.0) {
bot.send_message(chat_id, "This command is only available in the admin group.")
.await?;
return Ok(());
}
let uid = text.split_whitespace().nth(1).and_then(|s| s.parse::<i64>().ok());
let Some(uid) = uid else {
bot.send_message(chat_id, "Usage: /blacklist_uid <user_id>").await?;
return Ok(());
};
ctx.moderation.blacklist(uid).await?;
let user_repo = UserRepo::new(ctx.db.conn());
user_repo.set_role(uid, "banned").await?;
bot.send_message(chat_id, format!("Blacklisted UID <code>{}</code>", uid))
.parse_mode(ParseMode::Html).await?;
Ok(())
}
@@ -1924,14 +2192,21 @@ async fn handle_admin_whitelist_uid(
text: &str,
ctx: &BotContext,
) -> HandlerResult {
let uid = text.split_whitespace().nth(1).and_then(|s| s.parse::<i64>().ok());
if let Some(uid) = uid {
ctx.moderation.remove_blacklist(uid).await?;
let user_repo = UserRepo::new(ctx.db.conn());
user_repo.set_role(uid, "user").await?;
bot.send_message(chat_id, format!("Whitelisted UID <code>{}</code>", uid))
.parse_mode(ParseMode::Html).await?;
if !ctx.config.groups.admin_group_ids.contains(&chat_id.0) {
bot.send_message(chat_id, "This command is only available in the admin group.")
.await?;
return Ok(());
}
let uid = text.split_whitespace().nth(1).and_then(|s| s.parse::<i64>().ok());
let Some(uid) = uid else {
bot.send_message(chat_id, "Usage: /whitelist_uid <user_id>").await?;
return Ok(());
};
ctx.moderation.remove_blacklist(uid).await?;
let user_repo = UserRepo::new(ctx.db.conn());
user_repo.set_role(uid, "user").await?;
bot.send_message(chat_id, format!("Whitelisted UID <code>{}</code>", uid))
.parse_mode(ParseMode::Html).await?;
Ok(())
}
@@ -1966,6 +2241,70 @@ fn parse_duration(parts: &[&str]) -> Result<Option<i64>, String> {
Ok(Some(total as i64))
}
async fn propagate_punishment(
bot: &Bot,
ctx: &BotContext,
current_chat_id: ChatId,
target_user_id: u64,
action_type: &str,
duration_seconds: Option<i64>,
reason: Option<&str>,
created_by: i64,
) {
if !ctx.config.groups.global_ban {
return;
}
let forward_repo = ForwardRepo::new(ctx.db.conn());
let mut chat_ids = HashSet::new();
chat_ids.extend(ctx.config.groups.admin_group_ids.iter().copied());
chat_ids.extend(ctx.config.groups.review_group_ids.iter().copied());
match forward_repo.list_active_chat_ids().await {
Ok(ids) => chat_ids.extend(ids),
Err(e) => tracing::warn!("failed to list active forward chats: {}", e),
}
let target = UserId(target_user_id);
for chat_id in chat_ids {
if chat_id == current_chat_id.0 {
continue;
}
if !is_admin(bot, ChatId(chat_id), ctx.bot_id).await {
continue;
}
let chat = ChatId(chat_id);
let res = match action_type {
"ban" => {
if let Some(dur) = duration_seconds {
let until = chrono::Utc::now() + chrono::Duration::seconds(dur);
bot.ban_chat_member(chat, target).until_date(until).await
} else {
bot.ban_chat_member(chat, target).await
}
}
"mute" => {
if let Some(dur) = duration_seconds {
let until = chrono::Utc::now() + chrono::Duration::seconds(dur);
bot.restrict_chat_member(chat, target, ChatPermissions::empty()).until_date(until).await
} else {
bot.restrict_chat_member(chat, target, ChatPermissions::empty()).await
}
}
"kick" => {
let _ = bot.ban_chat_member(chat, target).await;
bot.unban_chat_member(chat, target).await
}
_ => return,
};
if let Err(e) = res {
tracing::warn!("global_ban {} failed in chat {} for user {}: {}", action_type, chat_id, target_user_id, e);
continue;
}
let repo = PunishmentRepo::new(ctx.db.conn());
if let Err(e) = repo.insert(chat_id, target_user_id as i64, action_type, duration_seconds, reason, created_by).await {
tracing::warn!("failed to record propagated punishment in chat {}: {}", chat_id, e);
}
}
}
async fn resolve_target_user_id(
bot: &Bot,
chat_id: ChatId,

View File

@@ -16,12 +16,18 @@ pub struct Config {
pub frontend: FrontendConfig,
#[serde(default = "default_db_path")]
pub database_path: String,
#[serde(default = "default_uname_changes_path")]
pub uname_changes_path: String,
}
fn default_db_path() -> String {
"data/db.sqlite".to_string()
}
fn default_uname_changes_path() -> String {
"data/uname_changes.json".to_string()
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ContentConfig {
pub keep_content: bool,
@@ -60,6 +66,12 @@ pub struct TelegramConfig {
pub struct GroupsConfig {
pub admin_group_ids: Vec<i64>,
pub review_group_ids: Vec<i64>,
#[serde(default = "default_global_ban")]
pub global_ban: bool,
}
fn default_global_ban() -> bool {
false
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -197,6 +209,9 @@ impl Config {
));
}
// global_ban is always present (bool with default), no further validation needed
let _ = self.groups.global_ban;
match &self.crypto.aes_master_key_source {
KeySource::File { path } if path.as_os_str().is_empty() => {
return Err(cgcx_core::CgcxError::Config(

View File

@@ -28,6 +28,8 @@ pub enum CgcxError {
RateLimited,
#[error("bad request: {0}")]
BadRequest(String),
#[error("blocked hash")]
BlockedHash,
#[error("insufficient storage")]
InsufficientStorage,
#[error("io error: {0}")]

View File

@@ -44,6 +44,7 @@ pub struct Content {
pub max_views: Option<u64>,
pub allow_download: bool,
pub password_hash: Option<String>,
pub show_author: bool,
pub created_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
}
@@ -61,6 +62,8 @@ pub struct ContentFile {
pub encrypted_hash: Vec<u8>,
pub render_flags: u32,
pub created_at: DateTime<Utc>,
pub plaintext_hash: Vec<u8>,
pub ref_count: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -99,6 +99,37 @@ impl DecryptStream {
}
}
pub fn decrypt_bytes(ciphertext: &[u8], wrapped_key: &[u8], master_key: &MasterKey) -> cgcx_core::Result<Vec<u8>> {
let key = unwrap_content_key(wrapped_key, master_key)?;
if ciphertext.len() < 17 {
return Err(cgcx_core::CgcxError::Crypto("ciphertext too short".into()));
}
let header = xchacha20poly1305::Header::from_slice(&ciphertext[..17])
.ok_or_else(|| cgcx_core::CgcxError::Crypto("invalid header".into()))?;
let mut stream = DecryptStream::new(&key, &header)?;
let mut plaintext = Vec::new();
let mut offset = 17;
while offset < ciphertext.len() {
if offset + 4 > ciphertext.len() {
return Err(cgcx_core::CgcxError::Crypto("truncated length prefix".into()));
}
let len = u32::from_le_bytes([
ciphertext[offset],
ciphertext[offset + 1],
ciphertext[offset + 2],
ciphertext[offset + 3],
]) as usize;
offset += 4;
if offset + len > ciphertext.len() {
return Err(cgcx_core::CgcxError::Crypto("truncated ciphertext".into()));
}
let (pt, _tag) = stream.pull(&ciphertext[offset..offset + len])?;
plaintext.extend_from_slice(&pt);
offset += len;
}
Ok(plaintext)
}
pub fn hash_file_at_path(path: &Path) -> cgcx_core::Result<[u8; 32]> {
let mut hasher = Hasher::new();
let mut file = std::fs::File::open(path)?;

View File

@@ -9,5 +9,6 @@ cgcx-config = { path = "../cgcx-config" }
chrono = { version = "0.4", features = ["serde"] }
rusqlite = { version = "0.32", features = ["bundled", "chrono"] }
rusqlite_migration = "1.3"
serde_json = "1.0"
tokio = { version = "1", features = ["sync", "rt"] }
tracing = "0.1"

View File

@@ -49,6 +49,9 @@ impl Database {
rusqlite_migration::M::up(include_str!("../../../migrations/002_indexes.sql")),
rusqlite_migration::M::up(include_str!("../../../migrations/003_forward_system.sql")),
rusqlite_migration::M::up(include_str!("../../../migrations/004_punishments.sql")),
rusqlite_migration::M::up(include_str!("../../../migrations/005_show_author.sql")),
rusqlite_migration::M::up(include_str!("../../../migrations/006_dedup.sql")),
rusqlite_migration::M::up(include_str!("../../../migrations/007_hash_blacklist.sql")),
]);
migrations.to_latest(&mut *conn)
.map_err(|e| CgcxError::Database(format!("migration failed: {}", e)))?;

View File

@@ -12,16 +12,43 @@ impl UserRepo {
Self { conn }
}
pub async fn ensure_exists(&self, id: i64, username: Option<&str>, first_name: &str) -> Result<()> {
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO users (id, telegram_username, first_name) VALUES (?1, ?2, ?3)
ON CONFLICT(id) DO UPDATE SET telegram_username=excluded.telegram_username, first_name=excluded.first_name",
params![id, username, first_name],
).map_err(|e| CgcxError::Database(e.to_string()))?;
pub async fn ensure_exists(&self, id: i64, username: Option<&str>, first_name: &str, chat_id: i64, uname_changes_path: Option<&str>) -> Result<()> {
let old_username = {
let conn = self.conn.lock().await;
let old: Option<String> = conn.query_row(
"SELECT telegram_username FROM users WHERE id = ?1",
params![id],
|row| row.get(0),
).optional().map_err(|e| CgcxError::Database(e.to_string()))?;
conn.execute(
"INSERT INTO users (id, telegram_username, first_name) VALUES (?1, ?2, ?3)
ON CONFLICT(id) DO UPDATE SET telegram_username=excluded.telegram_username, first_name=excluded.first_name",
params![id, username, first_name],
).map_err(|e| CgcxError::Database(e.to_string()))?;
old
};
if let (Some(path), Some(ref old)) = (uname_changes_path, old_username) {
if old.as_str() != username.unwrap_or("") {
Self::log_username_change(id, chat_id, Some(old.as_str()), username, path);
}
}
Ok(())
}
fn log_username_change(user_id: i64, chat_id: i64, old: Option<&str>, new: Option<&str>, path: &str) {
let entry = serde_json::json!({
"timestamp": chrono::Utc::now().to_rfc3339(),
"user_id": user_id,
"old_username": old.unwrap_or(""),
"new_username": new.unwrap_or(""),
"chat_id": chat_id
});
if let Ok(mut file) = std::fs::OpenOptions::new().create(true).append(true).open(path) {
use std::io::Write;
let _ = writeln!(file, "{}", entry);
}
}
pub async fn get(&self, id: i64) -> Result<Option<User>> {
let conn = self.conn.lock().await;
let row = conn.query_row(
@@ -78,8 +105,8 @@ impl ContentRepo {
let conn = self.conn.lock().await;
let status = format!("{:?}", content.status).to_lowercase();
conn.execute(
"INSERT INTO contents (id, user_id, status, view_count, max_views, allow_download, password_hash, created_at, deleted_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
"INSERT INTO contents (id, user_id, status, view_count, max_views, allow_download, password_hash, show_author, created_at, deleted_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
ON CONFLICT(id) DO NOTHING",
params![
content.id.as_str(),
@@ -89,6 +116,7 @@ impl ContentRepo {
content.max_views.map(|v| v as i64),
content.allow_download as i64,
content.password_hash.as_ref(),
content.show_author as i64,
content.created_at,
content.deleted_at,
],
@@ -99,7 +127,7 @@ impl ContentRepo {
pub async fn get(&self, id: &ContentId) -> Result<Option<Content>> {
let conn = self.conn.lock().await;
let row = conn.query_row(
"SELECT id, user_id, status, view_count, max_views, allow_download, password_hash, created_at, deleted_at
"SELECT id, user_id, status, view_count, max_views, allow_download, password_hash, show_author, created_at, deleted_at
FROM contents WHERE id = ?1",
params![id.as_str()],
|row| {
@@ -117,8 +145,9 @@ impl ContentRepo {
max_views: row.get::<_, Option<i64>>(4)?.map(|v| v as u64),
allow_download: row.get::<_, i64>(5)? != 0,
password_hash: row.get(6)?,
created_at: row.get(7)?,
deleted_at: row.get(8)?,
show_author: row.get::<_, i64>(7)? != 0,
created_at: row.get(8)?,
deleted_at: row.get(9)?,
})
},
).optional().map_err(|e| CgcxError::Database(e.to_string()))?;
@@ -128,7 +157,7 @@ impl ContentRepo {
pub async fn list_by_user(&self, user_id: i64, limit: usize, offset: usize) -> Result<Vec<Content>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, user_id, status, view_count, max_views, allow_download, password_hash, created_at, deleted_at
"SELECT id, user_id, status, view_count, max_views, allow_download, password_hash, show_author, created_at, deleted_at
FROM contents WHERE user_id = ?1 AND status != 'deleted' ORDER BY created_at DESC LIMIT ?2 OFFSET ?3"
).map_err(|e| CgcxError::Database(e.to_string()))?;
let rows = stmt.query_map(params![user_id, limit as i64, offset as i64], |row| {
@@ -146,8 +175,9 @@ impl ContentRepo {
max_views: row.get::<_, Option<i64>>(4)?.map(|v| v as u64),
allow_download: row.get::<_, i64>(5)? != 0,
password_hash: row.get(6)?,
created_at: row.get(7)?,
deleted_at: row.get(8)?,
show_author: row.get::<_, i64>(7)? != 0,
created_at: row.get(8)?,
deleted_at: row.get(9)?,
})
}).map_err(|e| CgcxError::Database(e.to_string()))?;
let mut out = Vec::new();
@@ -220,8 +250,8 @@ impl ContentFileRepo {
pub async fn insert(&self, file: &ContentFile) -> Result<()> {
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO content_files (content_id, file_index, original_name, stored_path, mime_type, size_bytes, ciphertext_size_bytes, encrypted_key_wrapped, encrypted_hash, render_flags, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
"INSERT INTO content_files (content_id, file_index, original_name, stored_path, mime_type, size_bytes, ciphertext_size_bytes, encrypted_key_wrapped, encrypted_hash, render_flags, created_at, plaintext_hash, ref_count)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
ON CONFLICT(content_id, file_index) DO NOTHING",
params![
file.content_id.as_str(),
@@ -235,6 +265,8 @@ impl ContentFileRepo {
&file.encrypted_hash,
file.render_flags as i64,
file.created_at,
&file.plaintext_hash,
file.ref_count as i64,
],
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(())
@@ -243,7 +275,7 @@ impl ContentFileRepo {
pub async fn list_by_content(&self, content_id: &ContentId) -> Result<Vec<ContentFile>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT content_id, file_index, original_name, stored_path, mime_type, size_bytes, ciphertext_size_bytes, encrypted_key_wrapped, encrypted_hash, render_flags, created_at
"SELECT content_id, file_index, original_name, stored_path, mime_type, size_bytes, ciphertext_size_bytes, encrypted_key_wrapped, encrypted_hash, render_flags, created_at, plaintext_hash, ref_count
FROM content_files WHERE content_id = ?1 ORDER BY file_index"
).map_err(|e| CgcxError::Database(e.to_string()))?;
let rows = stmt.query_map(params![content_id.as_str()], |row| {
@@ -259,6 +291,8 @@ impl ContentFileRepo {
encrypted_hash: row.get(8)?,
render_flags: row.get::<_, i64>(9)? as u32,
created_at: row.get(10)?,
plaintext_hash: row.get(11)?,
ref_count: row.get::<_, i64>(12)? as u64,
})
}).map_err(|e| CgcxError::Database(e.to_string()))?;
let mut out = Vec::new();
@@ -284,6 +318,100 @@ impl ContentFileRepo {
}
Ok(out)
}
pub async fn find_active_by_plaintext_hash(&self, hash: &[u8]) -> Result<Option<ContentFile>> {
let conn = self.conn.lock().await;
let row = conn.query_row(
"SELECT cf.content_id, cf.file_index, cf.original_name, cf.stored_path, cf.mime_type, cf.size_bytes, cf.ciphertext_size_bytes, cf.encrypted_key_wrapped, cf.encrypted_hash, cf.render_flags, cf.created_at, cf.plaintext_hash, cf.ref_count
FROM content_files cf
JOIN contents c ON c.id = cf.content_id
WHERE cf.plaintext_hash = ?1 AND c.status NOT IN ('deleted', 'blacklisted')
LIMIT 1",
params![hash],
|row| {
Ok(ContentFile {
content_id: ContentId::new_unchecked(row.get(0)?),
file_index: row.get::<_, i64>(1)? as u32,
original_name: row.get(2)?,
stored_path: std::path::PathBuf::from(row.get::<_, String>(3)?),
mime_type: row.get(4)?,
size_bytes: row.get::<_, i64>(5)? as u64,
ciphertext_size_bytes: row.get::<_, i64>(6)? as u64,
encrypted_key_wrapped: row.get(7)?,
encrypted_hash: row.get(8)?,
render_flags: row.get::<_, i64>(9)? as u32,
created_at: row.get(10)?,
plaintext_hash: row.get(11)?,
ref_count: row.get::<_, i64>(12)? as u64,
})
},
).optional().map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(row)
}
pub async fn increment_ref_count(&self, content_id: &ContentId, file_index: u32) -> Result<()> {
let conn = self.conn.lock().await;
conn.execute(
"UPDATE content_files SET ref_count = ref_count + 1 WHERE content_id = ?1 AND file_index = ?2",
params![content_id.as_str(), file_index as i64],
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(())
}
pub async fn decrement_ref_count(&self, content_id: &ContentId, file_index: u32) -> Result<u64> {
let conn = self.conn.lock().await;
conn.execute(
"UPDATE content_files SET ref_count = ref_count - 1 WHERE content_id = ?1 AND file_index = ?2",
params![content_id.as_str(), file_index as i64],
).map_err(|e| CgcxError::Database(e.to_string()))?;
let count: i64 = conn.query_row(
"SELECT ref_count FROM content_files WHERE content_id = ?1 AND file_index = ?2",
params![content_id.as_str(), file_index as i64],
|row| row.get(0),
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(count as u64)
}
pub async fn decrement_ref_count_for_path(&self, stored_path: &std::path::Path) -> Result<Option<u64>> {
let mut conn = self.conn.lock().await;
let tx = conn.transaction().map_err(|e| CgcxError::Database(e.to_string()))?;
let existing: Option<i64> = tx.query_row(
"SELECT ref_count FROM content_files WHERE stored_path = ?1 AND ref_count > 0 LIMIT 1",
params![stored_path.to_str()],
|row| row.get(0),
).optional().map_err(|e| CgcxError::Database(e.to_string()))?;
if let Some(rc) = existing {
tx.execute(
"UPDATE content_files SET ref_count = ref_count - 1 WHERE stored_path = ?1 AND ref_count > 0",
params![stored_path.to_str()],
).map_err(|e| CgcxError::Database(e.to_string()))?;
tx.commit().map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(Some((rc - 1) as u64))
} else {
tx.commit().map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(None)
}
}
pub async fn count_by_path_excluding_content(&self, stored_path: &std::path::Path, content_id: &ContentId) -> Result<usize> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM content_files WHERE stored_path = ?1 AND content_id != ?2",
params![stored_path.to_str(), content_id.as_str()],
|row| row.get(0),
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(count as usize)
}
pub async fn count_by_path(&self, stored_path: &std::path::Path) -> Result<usize> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM content_files WHERE stored_path = ?1",
params![stored_path.to_str()],
|row| row.get(0),
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(count as usize)
}
}
pub struct ReportRepo {
@@ -607,6 +735,25 @@ impl ForwardRepo {
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(())
}
pub async fn list_active_chat_ids(&self) -> Result<Vec<i64>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT DISTINCT source_chat_id FROM forward_definitions WHERE revoked_at IS NULL
UNION
SELECT DISTINCT destination_chat_id FROM forward_definitions WHERE revoked_at IS NULL
UNION
SELECT DISTINCT review_group_id FROM forward_definitions WHERE revoked_at IS NULL"
).map_err(|e| CgcxError::Database(e.to_string()))?;
let rows = stmt.query_map([], |row| {
row.get::<_, i64>(0)
}).map_err(|e| CgcxError::Database(e.to_string()))?;
let mut out = Vec::new();
for r in rows {
out.push(r.map_err(|e| CgcxError::Database(e.to_string()))?);
}
Ok(out)
}
}
pub struct PunishmentRepo {
conn: Arc<Mutex<Connection>>,
@@ -697,3 +844,32 @@ impl PunishmentRepo {
Ok(results)
}
}
pub struct HashBlacklistRepo {
conn: Arc<Mutex<Connection>>,
}
impl HashBlacklistRepo {
pub fn new(conn: Arc<Mutex<Connection>>) -> Self {
Self { conn }
}
pub async fn insert(&self, hash: &[u8], reason: Option<&str>) -> Result<()> {
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO hash_blacklist (hash, reason) VALUES (?1, ?2) ON CONFLICT(hash) DO NOTHING",
params![hash, reason],
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(())
}
pub async fn contains(&self, hash: &[u8]) -> Result<bool> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM hash_blacklist WHERE hash = ?1",
params![hash],
|row| row.get(0),
).map_err(|e| CgcxError::Database(e.to_string()))?;
Ok(count > 0)
}
}

View File

@@ -15,3 +15,4 @@ tempfile = "3"
tracing = "0.1"
chrono = "0.4"
sodiumoxide = "0.2"
blake3 = "1.5"

View File

@@ -1,12 +1,12 @@
use cgcx_config::Config;
use cgcx_core::{ContentFile, ContentId, ContentStatus, Content, Result, CgcxError};
use cgcx_crypto::{ContentKey, wrap_content_key};
use cgcx_db::{Database, ContentRepo, ContentFileRepo};
use cgcx_db::{Database, ContentRepo, ContentFileRepo, HashBlacklistRepo};
use cgcx_storage::Storage;
use cgcx_content_typing::{detect_mime_type, compute_render_flags};
use sodiumoxide::crypto::secretstream::xchacha20poly1305::Tag::{Message, Final};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use std::collections::HashSet;
pub use cgcx_crypto::MasterKey;
@@ -54,6 +54,7 @@ impl FilePipeline {
let temp_path = named_temp.path().to_path_buf();
let mut total_size: u64 = 0;
let mut plaintext_hasher = blake3::Hasher::new();
{
let mut temp_file = tokio::fs::File::create(&temp_path).await
.map_err(|e| CgcxError::Storage(format!("create temp file: {}", e)))?;
@@ -73,6 +74,7 @@ impl FilePipeline {
)));
}
total_size = new_total;
plaintext_hasher.update(&buf[..pending]);
let ciphertext = encrypt_stream.push(&buf[..pending], Message);
temp_file.write_all(&(ciphertext.len() as u32).to_le_bytes()).await
.map_err(|e| CgcxError::Storage(format!("write length prefix: {}", e)))?;
@@ -94,6 +96,7 @@ impl FilePipeline {
)));
}
total_size = new_total;
plaintext_hasher.update(&buf[..pending]);
let ciphertext = encrypt_stream.push(&buf[..pending], Final);
temp_file.write_all(&(ciphertext.len() as u32).to_le_bytes()).await
.map_err(|e| CgcxError::Storage(format!("write length prefix: {}", e)))?;
@@ -117,9 +120,43 @@ impl FilePipeline {
.map_err(|e| CgcxError::Storage(format!("flush temp file: {}", e)))?;
}
let plaintext_hash = plaintext_hasher.finalize();
let encrypted_hash = encrypt_stream.finalize();
let ciphertext_size_bytes = self.storage.file_size(&temp_path).await?;
let file_repo = ContentFileRepo::new(self.db.conn());
let hash_bytes = plaintext_hash.as_bytes();
// N — Hash blacklist enforcement
let blacklist_repo = HashBlacklistRepo::new(self.db.conn());
if blacklist_repo.contains(hash_bytes).await? {
drop(named_temp);
return Err(CgcxError::BlockedHash);
}
// M — Deduplication
if let Some(existing) = file_repo.find_active_by_plaintext_hash(hash_bytes).await? {
drop(named_temp);
let content_file = ContentFile {
content_id: content_id.clone(),
file_index,
original_name: original_name.to_string(),
stored_path: existing.stored_path.clone(),
mime_type: existing.mime_type.clone(),
size_bytes: total_size,
ciphertext_size_bytes: existing.ciphertext_size_bytes,
encrypted_key_wrapped: existing.encrypted_key_wrapped.clone(),
encrypted_hash: existing.encrypted_hash.clone(),
render_flags,
created_at: chrono::Utc::now(),
plaintext_hash: hash_bytes.to_vec(),
ref_count: 0,
};
file_repo.insert(&content_file).await?;
file_repo.increment_ref_count(&existing.content_id, existing.file_index).await?;
return Ok(content_file);
}
let final_path = self.storage.file_path(content_id, file_index, &mime_type)?;
if let Some(parent) = final_path.parent() {
tokio::fs::create_dir_all(parent).await
@@ -143,9 +180,10 @@ impl FilePipeline {
encrypted_hash: encrypted_hash.to_vec(),
render_flags,
created_at: chrono::Utc::now(),
plaintext_hash: hash_bytes.to_vec(),
ref_count: 1,
};
let file_repo = ContentFileRepo::new(self.db.conn());
file_repo.insert(&content_file).await?;
Ok(content_file)
@@ -158,6 +196,7 @@ impl FilePipeline {
max_views: Option<u64>,
allow_download: bool,
password_hash: Option<String>,
show_author: bool,
) -> Result<()> {
let content = Content {
id: content_id,
@@ -167,6 +206,7 @@ impl FilePipeline {
max_views,
allow_download,
password_hash,
show_author,
created_at: chrono::Utc::now(),
deleted_at: None,
};
@@ -185,8 +225,20 @@ impl FilePipeline {
if !keep_disk {
for file in &files {
if let Err(e) = tokio::fs::remove_file(&file.stored_path).await {
tracing::warn!("failed to remove file {:?}: {}", file.stored_path, e);
if file.ref_count > 0 {
if let Err(e) = file_repo.decrement_ref_count(&file.content_id, file.file_index).await {
tracing::warn!("failed to decrement ref_count for {:?}: {}", file.stored_path, e);
}
} else {
if let Err(e) = file_repo.decrement_ref_count_for_path(&file.stored_path).await {
tracing::warn!("failed to decrement owner ref_count for {:?}: {}", file.stored_path, e);
}
}
let remaining = file_repo.count_by_path_excluding_content(&file.stored_path, content_id).await.unwrap_or(1);
if remaining == 0 {
if let Err(e) = tokio::fs::remove_file(&file.stored_path).await {
tracing::warn!("failed to remove file {:?}: {}", file.stored_path, e);
}
}
}
if let Some(first) = files.first() {
@@ -243,24 +295,6 @@ impl FilePipeline {
continue;
}
let content_id_str = dir_path.file_name()
.and_then(|s| s.to_str())
.unwrap_or("");
let db_paths: HashSet<std::path::PathBuf> = if ContentId::is_valid(content_id_str) {
let content_id = ContentId::new_unchecked(content_id_str.to_string());
match file_repo.list_by_content(&content_id).await {
Ok(files) => files.into_iter().map(|f| f.stored_path).collect(),
Err(e) => {
tracing::warn!("failed to list files for {}: {}", content_id, e);
continue;
}
}
} else {
// Invalid content directory nothing in it can be referenced.
HashSet::new()
};
let mut sub_entries = tokio::fs::read_dir(&dir_path).await
.map_err(|e| CgcxError::Storage(format!("read content dir: {}", e)))?;
while let Some(sub_entry) = sub_entries.next_entry().await
@@ -268,7 +302,8 @@ impl FilePipeline {
{
let path = sub_entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("enc") {
if !db_paths.contains(&path) {
let count = file_repo.count_by_path(&path).await.unwrap_or(1);
if count == 0 {
if let Err(e) = tokio::fs::remove_file(&path).await {
tracing::warn!("failed to remove orphan enc file {:?}: {}", path, e);
} else {

View File

@@ -10,7 +10,7 @@ use axum::{
use cgcx_config::Config;
use cgcx_core::{ContentId, CgcxError};
use cgcx_crypto::{unwrap_content_key, DecryptStream, MasterKey};
use cgcx_db::{Database, ContentRepo, ContentFileRepo};
use cgcx_db::{Database, ContentRepo, ContentFileRepo, UserRepo};
use cgcx_storage::Storage;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
@@ -44,6 +44,12 @@ struct HealthResponse {
status: String,
}
#[derive(Serialize)]
struct AuthorInfo {
username: Option<String>,
user_id: i64,
}
#[derive(Serialize)]
struct ContentMetadata {
cxid: String,
@@ -53,6 +59,8 @@ struct ContentMetadata {
current_views: u64,
allow_download: bool,
created_at: String,
total_size: u64,
author: Option<AuthorInfo>,
}
#[derive(Serialize)]
@@ -504,6 +512,20 @@ async fn get_metadata(
let file_repo = ContentFileRepo::new(state.db.conn());
let files = file_repo.list_by_content(&content_id).await?;
let total_size = files.iter().map(|f| f.size_bytes).sum();
let user_repo = UserRepo::new(state.db.conn());
let author = if content.show_author {
match user_repo.get(content.user_id).await? {
Some(user) => Some(AuthorInfo {
username: user.telegram_username,
user_id: user.id,
}),
None => None,
}
} else {
None
};
let body = serde_json::to_vec(&ContentMetadata {
cxid: content.id.to_string(),
@@ -519,6 +541,8 @@ async fn get_metadata(
current_views: content.view_count,
allow_download: content.allow_download,
created_at: content.created_at.to_rfc3339(),
total_size,
author,
}).map_err(|_| CgcxError::BadRequest("json serialization".into()))?;
let mut response = Response::builder()
.status(StatusCode::OK)
@@ -712,21 +736,37 @@ async fn serve_file(
let new_views = repo.increment_views(&content_id).await?;
if let Some(max) = content.max_views {
if new_views >= max {
if !state.config.content.keep_content {
for f in &files {
if let Err(e) = tokio::fs::remove_file(&f.stored_path).await {
tracing::warn!("failed to remove file {:?}: {}", f.stored_path, e);
let db = state.db.clone();
let storage = state.storage.clone();
let content_id = content_id.clone();
let files = files.clone();
let keep_content = state.config.content.keep_content;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
if !keep_content {
let file_repo = ContentFileRepo::new(db.conn());
for f in &files {
if f.ref_count > 0 {
if let Err(e) = file_repo.decrement_ref_count(&f.content_id, f.file_index).await {
tracing::warn!("failed to decrement ref_count: {}", e);
}
} else {
if let Err(e) = file_repo.decrement_ref_count_for_path(&f.stored_path).await {
tracing::warn!("failed to decrement owner ref_count: {}", e);
}
}
let remaining = file_repo.count_by_path_excluding_content(&f.stored_path, &f.content_id).await.unwrap_or(1);
if remaining == 0 {
if let Err(e) = tokio::fs::remove_file(&f.stored_path).await {
tracing::warn!("failed to remove file {:?}: {}", f.stored_path, e);
}
}
}
let _ = storage.delete_content_files(&content_id, "application/octet-stream").await;
}
let _ = state.storage.delete_content_files(&content_id, "application/octet-stream").await;
}
repo.set_status(&content_id, cgcx_core::ContentStatus::Deleted).await?;
let mut response = Response::builder()
.status(StatusCode::GONE)
.body(Body::empty())
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
return Ok(response);
let repo = ContentRepo::new(db.conn());
let _ = repo.set_status(&content_id, cgcx_core::ContentStatus::Deleted).await;
});
}
}
}