Initial commit
This commit is contained in:
17
crates/cgcx-file-pipeline/Cargo.toml
Normal file
17
crates/cgcx-file-pipeline/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "cgcx-file-pipeline"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
|
||||
[dependencies]
|
||||
cgcx-core = { path = "../cgcx-core" }
|
||||
cgcx-crypto = { path = "../cgcx-crypto" }
|
||||
cgcx-storage = { path = "../cgcx-storage" }
|
||||
cgcx-content-typing = { path = "../cgcx-content-typing" }
|
||||
cgcx-db = { path = "../cgcx-db" }
|
||||
cgcx-config = { path = "../cgcx-config" }
|
||||
tokio = { version = "1", features = ["fs", "io-util", "sync"] }
|
||||
tempfile = "3"
|
||||
tracing = "0.1"
|
||||
chrono = "0.4"
|
||||
sodiumoxide = "0.2"
|
||||
285
crates/cgcx-file-pipeline/src/lib.rs
Normal file
285
crates/cgcx-file-pipeline/src/lib.rs
Normal file
@@ -0,0 +1,285 @@
|
||||
use cgcx_config::Config;
|
||||
use cgcx_core::{ContentFile, ContentId, ContentStatus, Content, Result, CgcxError};
|
||||
use cgcx_crypto::{ContentKey, wrap_content_key};
|
||||
use cgcx_db::{Database, ContentRepo, ContentFileRepo};
|
||||
use cgcx_storage::Storage;
|
||||
use cgcx_content_typing::{detect_mime_type, compute_render_flags};
|
||||
use sodiumoxide::crypto::secretstream::xchacha20poly1305::Tag::{Message, Final};
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
||||
use std::collections::HashSet;
|
||||
|
||||
pub use cgcx_crypto::MasterKey;
|
||||
|
||||
pub struct FilePipeline {
|
||||
storage: Storage,
|
||||
db: Database,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
impl FilePipeline {
|
||||
pub fn new(storage: Storage, db: Database, config: Config) -> Self {
|
||||
Self { storage, db, config }
|
||||
}
|
||||
|
||||
pub async fn ingest_file(
|
||||
&self,
|
||||
content_id: &ContentId,
|
||||
file_index: u32,
|
||||
mut source: impl AsyncRead + Unpin,
|
||||
original_name: &str,
|
||||
master_key: &MasterKey,
|
||||
sem: &tokio::sync::Semaphore,
|
||||
) -> Result<ContentFile> {
|
||||
let _permit = sem.acquire().await
|
||||
.map_err(|e| CgcxError::Storage(format!("semaphore acquire failed: {}", e)))?;
|
||||
|
||||
let chunk_size = self.config.storage.chunk_size_bytes;
|
||||
let mut buf = vec![0u8; chunk_size];
|
||||
|
||||
// Read first chunk for MIME detection
|
||||
let n = source.read(&mut buf).await
|
||||
.map_err(|e| CgcxError::Storage(format!("read failed: {}", e)))?;
|
||||
if n == 0 {
|
||||
return Err(CgcxError::BadRequest("empty file".into()));
|
||||
}
|
||||
|
||||
let mime_type = detect_mime_type(&buf[..n], original_name);
|
||||
let render_flags = compute_render_flags(&mime_type, original_name, &buf[..n]);
|
||||
|
||||
let content_key = ContentKey::generate();
|
||||
let mut encrypt_stream = cgcx_crypto::EncryptStream::new(&content_key.key);
|
||||
let header = encrypt_stream.header().clone();
|
||||
|
||||
let named_temp = self.storage.temp_file()?;
|
||||
let temp_path = named_temp.path().to_path_buf();
|
||||
|
||||
let mut total_size: u64 = 0;
|
||||
{
|
||||
let mut temp_file = tokio::fs::File::create(&temp_path).await
|
||||
.map_err(|e| CgcxError::Storage(format!("create temp file: {}", e)))?;
|
||||
|
||||
temp_file.write_all(header.as_ref()).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write header: {}", e)))?;
|
||||
|
||||
let mut pending = n;
|
||||
|
||||
loop {
|
||||
if pending == chunk_size {
|
||||
let new_total = total_size + pending as u64;
|
||||
if new_total > self.config.upload_limits.max_file_size_bytes {
|
||||
return Err(CgcxError::BadRequest(format!(
|
||||
"file too large: {} > {}",
|
||||
new_total, self.config.upload_limits.max_file_size_bytes
|
||||
)));
|
||||
}
|
||||
total_size = new_total;
|
||||
let ciphertext = encrypt_stream.push(&buf[..pending], Message);
|
||||
temp_file.write_all(&(ciphertext.len() as u32).to_le_bytes()).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write length prefix: {}", e)))?;
|
||||
temp_file.write_all(&ciphertext).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write ciphertext: {}", e)))?;
|
||||
pending = 0;
|
||||
}
|
||||
|
||||
let read_n = source.read(&mut buf[pending..]).await
|
||||
.map_err(|e| CgcxError::Storage(format!("read failed: {}", e)))?;
|
||||
|
||||
if read_n == 0 {
|
||||
if pending > 0 {
|
||||
let new_total = total_size + pending as u64;
|
||||
if new_total > self.config.upload_limits.max_file_size_bytes {
|
||||
return Err(CgcxError::BadRequest(format!(
|
||||
"file too large: {} > {}",
|
||||
new_total, self.config.upload_limits.max_file_size_bytes
|
||||
)));
|
||||
}
|
||||
total_size = new_total;
|
||||
let ciphertext = encrypt_stream.push(&buf[..pending], Final);
|
||||
temp_file.write_all(&(ciphertext.len() as u32).to_le_bytes()).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write length prefix: {}", e)))?;
|
||||
temp_file.write_all(&ciphertext).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write ciphertext: {}", e)))?;
|
||||
} else if total_size > 0 {
|
||||
// File ended exactly on a chunk boundary; push empty final tag.
|
||||
let ciphertext = encrypt_stream.push(&[], Final);
|
||||
temp_file.write_all(&(ciphertext.len() as u32).to_le_bytes()).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write length prefix: {}", e)))?;
|
||||
temp_file.write_all(&ciphertext).await
|
||||
.map_err(|e| CgcxError::Storage(format!("write ciphertext: {}", e)))?;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
pending += read_n;
|
||||
}
|
||||
|
||||
temp_file.flush().await
|
||||
.map_err(|e| CgcxError::Storage(format!("flush temp file: {}", e)))?;
|
||||
}
|
||||
|
||||
let encrypted_hash = encrypt_stream.finalize();
|
||||
let ciphertext_size_bytes = self.storage.file_size(&temp_path).await?;
|
||||
|
||||
let final_path = self.storage.file_path(content_id, file_index, &mime_type)?;
|
||||
if let Some(parent) = final_path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await
|
||||
.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
}
|
||||
|
||||
named_temp.persist(&final_path)
|
||||
.map_err(|e| CgcxError::Storage(format!("persist failed: {}", e)))?;
|
||||
|
||||
let encrypted_key_wrapped = wrap_content_key(&content_key.key, master_key);
|
||||
|
||||
let content_file = ContentFile {
|
||||
content_id: content_id.clone(),
|
||||
file_index,
|
||||
original_name: original_name.to_string(),
|
||||
stored_path: final_path,
|
||||
mime_type,
|
||||
size_bytes: total_size,
|
||||
ciphertext_size_bytes,
|
||||
encrypted_key_wrapped,
|
||||
encrypted_hash: encrypted_hash.to_vec(),
|
||||
render_flags,
|
||||
created_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let file_repo = ContentFileRepo::new(self.db.conn());
|
||||
file_repo.insert(&content_file).await?;
|
||||
|
||||
Ok(content_file)
|
||||
}
|
||||
|
||||
pub async fn create_content_entry(
|
||||
&self,
|
||||
content_id: ContentId,
|
||||
user_id: i64,
|
||||
max_views: Option<u64>,
|
||||
allow_download: bool,
|
||||
password_hash: Option<String>,
|
||||
) -> Result<()> {
|
||||
let content = Content {
|
||||
id: content_id,
|
||||
user_id,
|
||||
status: ContentStatus::Staged,
|
||||
view_count: 0,
|
||||
max_views,
|
||||
allow_download,
|
||||
password_hash,
|
||||
created_at: chrono::Utc::now(),
|
||||
deleted_at: None,
|
||||
};
|
||||
let repo = ContentRepo::new(self.db.conn());
|
||||
repo.insert(&content).await
|
||||
}
|
||||
|
||||
pub async fn activate_content(&self, content_id: &ContentId) -> Result<()> {
|
||||
let repo = ContentRepo::new(self.db.conn());
|
||||
repo.set_status(content_id, ContentStatus::Active).await
|
||||
}
|
||||
|
||||
pub async fn delete_content(&self, content_id: &ContentId, keep_disk: bool) -> Result<()> {
|
||||
let file_repo = ContentFileRepo::new(self.db.conn());
|
||||
let files = file_repo.list_by_content(content_id).await?;
|
||||
|
||||
if !keep_disk {
|
||||
for file in &files {
|
||||
if let Err(e) = tokio::fs::remove_file(&file.stored_path).await {
|
||||
tracing::warn!("failed to remove file {:?}: {}", file.stored_path, e);
|
||||
}
|
||||
}
|
||||
if let Some(first) = files.first() {
|
||||
let _ = self.storage.delete_content_files(content_id, &first.mime_type).await;
|
||||
}
|
||||
}
|
||||
|
||||
let repo = ContentRepo::new(self.db.conn());
|
||||
repo.delete_permanent(content_id).await
|
||||
}
|
||||
|
||||
pub async fn cleanup_orphans(&self) -> Result<()> {
|
||||
let cutoff = std::time::SystemTime::now() - std::time::Duration::from_secs(24 * 60 * 60);
|
||||
|
||||
// 1. Clean old temp files
|
||||
let mut entries = tokio::fs::read_dir(self.storage.temp_dir()).await
|
||||
.map_err(|e| CgcxError::Storage(format!("read temp dir: {}", e)))?;
|
||||
while let Some(entry) = entries.next_entry().await
|
||||
.map_err(|e| CgcxError::Storage(format!("read temp dir entry: {}", e)))?
|
||||
{
|
||||
let path = entry.path();
|
||||
if path.extension().and_then(|s| s.to_str()) == Some("tmp") {
|
||||
if let Ok(meta) = entry.metadata().await {
|
||||
if let Ok(modified) = meta.modified() {
|
||||
if modified < cutoff {
|
||||
if let Err(e) = tokio::fs::remove_file(&path).await {
|
||||
tracing::warn!("failed to remove orphan temp file {:?}: {}", path, e);
|
||||
} else {
|
||||
tracing::info!("removed orphan temp file: {:?}", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Clean unreferenced .enc files in storage dirs
|
||||
let file_repo = ContentFileRepo::new(self.db.conn());
|
||||
|
||||
for root in [self.storage.media_dir(), self.storage.documents_dir(), self.storage.text_dir()] {
|
||||
let mut entries = match tokio::fs::read_dir(root).await {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to read storage dir {:?}: {}", root, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
while let Some(entry) = entries.next_entry().await
|
||||
.map_err(|e| CgcxError::Storage(format!("read storage dir entry: {}", e)))?
|
||||
{
|
||||
let dir_path = entry.path();
|
||||
if !dir_path.is_dir() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let content_id_str = dir_path.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("");
|
||||
|
||||
let db_paths: HashSet<std::path::PathBuf> = if ContentId::is_valid(content_id_str) {
|
||||
let content_id = ContentId::new_unchecked(content_id_str.to_string());
|
||||
match file_repo.list_by_content(&content_id).await {
|
||||
Ok(files) => files.into_iter().map(|f| f.stored_path).collect(),
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to list files for {}: {}", content_id, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Invalid content directory – nothing in it can be referenced.
|
||||
HashSet::new()
|
||||
};
|
||||
|
||||
let mut sub_entries = tokio::fs::read_dir(&dir_path).await
|
||||
.map_err(|e| CgcxError::Storage(format!("read content dir: {}", e)))?;
|
||||
while let Some(sub_entry) = sub_entries.next_entry().await
|
||||
.map_err(|e| CgcxError::Storage(format!("read content dir entry: {}", e)))?
|
||||
{
|
||||
let path = sub_entry.path();
|
||||
if path.extension().and_then(|s| s.to_str()) == Some("enc") {
|
||||
if !db_paths.contains(&path) {
|
||||
if let Err(e) = tokio::fs::remove_file(&path).await {
|
||||
tracing::warn!("failed to remove orphan enc file {:?}: {}", path, e);
|
||||
} else {
|
||||
tracing::info!("removed orphan enc file: {:?}", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user