Huge refactor, submission system addition & security improvements. +Implementation of moderation cmds.
This commit is contained in:
@@ -21,6 +21,7 @@ tower-http = { version = "0.6", features = ["fs", "trace", "cors", "compression-
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "fs", "sync"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
tracing-appender = "0.2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
|
||||
@@ -26,6 +26,7 @@ use tower_http::{
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use sodiumoxide::crypto::secretstream::xchacha20poly1305::Tag::Final as TagFinal;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -68,9 +69,42 @@ struct VerifyPasswordRequest {
|
||||
password: String,
|
||||
}
|
||||
|
||||
fn deserialize_download_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct DownloadBoolVisitor;
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for DownloadBoolVisitor {
|
||||
type Value = bool;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("a boolean or string representing a boolean")
|
||||
}
|
||||
|
||||
fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> {
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> {
|
||||
Ok(matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes"))
|
||||
}
|
||||
|
||||
fn visit_string<E: serde::de::Error>(self, v: String) -> Result<Self::Value, E> {
|
||||
self.visit_str(&v)
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> {
|
||||
Ok(v == 1)
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_any(DownloadBoolVisitor)
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct FileQuery {
|
||||
#[serde(default)]
|
||||
#[serde(default, deserialize_with = "deserialize_download_bool")]
|
||||
download: bool,
|
||||
#[serde(rename = "sc", default)]
|
||||
sc: Option<String>,
|
||||
@@ -87,6 +121,11 @@ struct ByteRange {
|
||||
end: Option<u64>,
|
||||
}
|
||||
|
||||
enum AuthSource {
|
||||
Cookie,
|
||||
QueryParam,
|
||||
}
|
||||
|
||||
struct AppError(CgcxError);
|
||||
|
||||
impl From<CgcxError> for AppError {
|
||||
@@ -119,7 +158,57 @@ type AppResult<T> = Result<T, AppError>;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> cgcx_core::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
let config = Arc::new(Config::load()?);
|
||||
config.validate()?;
|
||||
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(&config.logging.level));
|
||||
|
||||
let console_layer = tracing_subscriber::fmt::layer();
|
||||
|
||||
let _file_guard = if config.logging.file_enabled {
|
||||
let log_path = std::path::Path::new(&config.logging.file_path);
|
||||
let log_dir = log_path.parent()
|
||||
.and_then(|p| p.to_str())
|
||||
.unwrap_or("data/logs");
|
||||
let log_prefix = log_path.file_name()
|
||||
.and_then(|f| f.to_str())
|
||||
.unwrap_or("cgcx-server.log");
|
||||
std::fs::create_dir_all(log_dir).ok();
|
||||
match tracing_appender::rolling::Builder::new()
|
||||
.rotation(tracing_appender::rolling::Rotation::DAILY)
|
||||
.filename_prefix(log_prefix)
|
||||
.max_log_files(config.logging.max_files)
|
||||
.build(log_dir)
|
||||
{
|
||||
Ok(file_appender) => {
|
||||
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
|
||||
let file_layer = tracing_subscriber::fmt::layer()
|
||||
.with_writer(non_blocking)
|
||||
.with_ansi(false);
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(console_layer)
|
||||
.with(file_layer)
|
||||
.init();
|
||||
Some(guard)
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to create rolling file appender at {}: {}. Falling back to console only.", log_dir, e);
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(console_layer)
|
||||
.init();
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(console_layer)
|
||||
.init();
|
||||
None
|
||||
};
|
||||
|
||||
// Log panics so we can diagnose 500s that CatchPanicLayer swallows.
|
||||
std::panic::set_hook(Box::new(|info| {
|
||||
@@ -134,9 +223,6 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
tracing::error!("PANIC at {}: {}", location, msg);
|
||||
}));
|
||||
|
||||
let config = Arc::new(Config::load()?);
|
||||
config.validate()?;
|
||||
|
||||
let db_path = std::path::PathBuf::from(&config.database_path);
|
||||
if let Some(parent) = db_path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await.ok();
|
||||
@@ -236,6 +322,7 @@ async fn main() -> cgcx_core::Result<()> {
|
||||
.route("/api/health", get(health))
|
||||
.route("/api/content/:cxid", get(get_metadata))
|
||||
.route("/api/content/:cxid/file/:file_idx", get(serve_file))
|
||||
.route("/api/content/:cxid/file/:file_idx/raw", get(serve_raw_file))
|
||||
.merge(password_route)
|
||||
.nest_service("/assets", static_service)
|
||||
.fallback(fallback)
|
||||
@@ -299,7 +386,7 @@ async fn security_headers(req: axum::http::Request<Body>, next: Next) -> Respons
|
||||
let headers = response.headers_mut();
|
||||
headers.insert(
|
||||
header::CONTENT_SECURITY_POLICY,
|
||||
HeaderValue::from_static("default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: blob:; media-src 'self' blob:; connect-src 'self'; frame-ancestors 'none'; base-uri 'self'; form-action 'self';"),
|
||||
HeaderValue::from_static("default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: blob:; media-src 'self' blob:; connect-src 'self'; object-src 'self'; frame-ancestors 'none'; base-uri 'self'; form-action 'self';"),
|
||||
);
|
||||
headers.insert(header::X_CONTENT_TYPE_OPTIONS, HeaderValue::from_static("nosniff"));
|
||||
headers.insert(header::X_FRAME_OPTIONS, HeaderValue::from_static("DENY"));
|
||||
@@ -328,19 +415,19 @@ fn password_from_request(
|
||||
cxid: &str,
|
||||
password_hash: Option<&str>,
|
||||
cookie_secret: &[u8],
|
||||
) -> bool {
|
||||
) -> Option<AuthSource> {
|
||||
if let Some(sc) = query_sc {
|
||||
if let Some(hash) = password_hash {
|
||||
use argon2::{Argon2, PasswordHash, PasswordVerifier};
|
||||
if let Ok(parsed_hash) = PasswordHash::new(hash) {
|
||||
if Argon2::default().verify_password(sc.as_bytes(), &parsed_hash).is_ok() {
|
||||
return true;
|
||||
return Some(AuthSource::QueryParam);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
headers
|
||||
if headers
|
||||
.get_all(header::COOKIE)
|
||||
.iter()
|
||||
.any(|v| {
|
||||
@@ -351,6 +438,31 @@ fn password_from_request(
|
||||
})
|
||||
}).unwrap_or(false)
|
||||
})
|
||||
{
|
||||
return Some(AuthSource::Cookie);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn add_auth_cookie(
|
||||
response: &mut Response,
|
||||
auth_source: &Option<AuthSource>,
|
||||
cxid: &str,
|
||||
cookie_secret: &[u8],
|
||||
) -> AppResult<()> {
|
||||
if let Some(AuthSource::QueryParam) = auth_source {
|
||||
let cookie_value = make_cookie_value(cxid, cookie_secret);
|
||||
let cookie = format!(
|
||||
"cgcx_pw={}; Max-Age=3600; SameSite=Strict; HttpOnly; Path=/",
|
||||
cookie_value
|
||||
);
|
||||
response.headers_mut().insert(
|
||||
header::SET_COOKIE,
|
||||
HeaderValue::from_str(&cookie).map_err(|_| CgcxError::Storage("invalid cookie header".into()))?,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_metadata(
|
||||
@@ -378,12 +490,17 @@ async fn get_metadata(
|
||||
}
|
||||
}
|
||||
|
||||
if content.password_hash.is_some() {
|
||||
if !password_from_request(&headers, query.sc.as_deref(), &cxid, content.password_hash.as_deref(), &state.cookie_secret) {
|
||||
tracing::warn!("get_metadata returning Unauthorized for cxid={}", cxid);
|
||||
return Err(CgcxError::Unauthorized.into());
|
||||
let auth_source = if content.password_hash.is_some() {
|
||||
match password_from_request(&headers, query.sc.as_deref(), &cxid, content.password_hash.as_deref(), &state.cookie_secret) {
|
||||
Some(source) => Some(source),
|
||||
None => {
|
||||
tracing::warn!("get_metadata returning Unauthorized for cxid={}", cxid);
|
||||
return Err(CgcxError::Unauthorized.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let file_repo = ContentFileRepo::new(state.db.conn());
|
||||
let files = file_repo.list_by_content(&content_id).await?;
|
||||
@@ -403,11 +520,13 @@ async fn get_metadata(
|
||||
allow_download: content.allow_download,
|
||||
created_at: content.created_at.to_rfc3339(),
|
||||
}).map_err(|_| CgcxError::BadRequest("json serialization".into()))?;
|
||||
Ok(Response::builder()
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.body(Body::from(body))
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?)
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn verify_password(
|
||||
@@ -451,6 +570,22 @@ async fn verify_password(
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?)
|
||||
}
|
||||
|
||||
fn client_ip_from_headers(headers: &HeaderMap) -> String {
|
||||
if let Some(xff) = headers.get("x-forwarded-for") {
|
||||
if let Ok(s) = xff.to_str() {
|
||||
if let Some(ip) = s.split(',').next() {
|
||||
return ip.trim().to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(xri) = headers.get("x-real-ip") {
|
||||
if let Ok(s) = xri.to_str() {
|
||||
return s.trim().to_string();
|
||||
}
|
||||
}
|
||||
"unknown".to_string()
|
||||
}
|
||||
|
||||
async fn serve_file(
|
||||
State(state): State<AppState>,
|
||||
Path((cxid, file_idx)): Path<(String, u32)>,
|
||||
@@ -476,15 +611,22 @@ async fn serve_file(
|
||||
}
|
||||
}
|
||||
|
||||
if content.password_hash.is_some() {
|
||||
if !password_from_request(&headers, query.sc.as_deref(), &cxid, content.password_hash.as_deref(), &state.cookie_secret) {
|
||||
tracing::warn!("serve_file returning Unauthorized for cxid={}", cxid);
|
||||
return Err(CgcxError::Unauthorized.into());
|
||||
let auth_source = if content.password_hash.is_some() {
|
||||
match password_from_request(&headers, query.sc.as_deref(), &cxid, content.password_hash.as_deref(), &state.cookie_secret) {
|
||||
Some(source) => Some(source),
|
||||
None => {
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::warn!("serve_file returning Unauthorized for cxid={} file_idx={} ip={}", cxid, file_idx, ip);
|
||||
return Err(CgcxError::Unauthorized.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if query.download && !content.allow_download {
|
||||
tracing::warn!("serve_file returning Forbidden (download not allowed) for cxid={}", cxid);
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::warn!("serve_file returning Forbidden (download not allowed) for cxid={} file_idx={} ip={}", cxid, file_idx, ip);
|
||||
return Err(CgcxError::Forbidden.into());
|
||||
}
|
||||
|
||||
@@ -502,26 +644,30 @@ async fn serve_file(
|
||||
} else {
|
||||
format!("inline; filename=\"{}\"", sanitized_name)
|
||||
};
|
||||
return Ok(Response::builder()
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, content_type)
|
||||
.header(header::CONTENT_DISPOSITION, disposition)
|
||||
.header(header::ETAG, etag)
|
||||
.header(header::CONTENT_LENGTH, "0")
|
||||
.header(header::CACHE_CONTROL, "private, no-store, max-age=0")
|
||||
.header(header::CACHE_CONTROL, if content.password_hash.is_some() { "private, no-store, max-age=0" } else { "private, max-age=60" })
|
||||
.body(Body::empty())
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?);
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Path traversal validation
|
||||
let canonical_path = tokio::fs::canonicalize(&file.stored_path).await
|
||||
.map_err(|e| {
|
||||
tracing::error!("canonicalize failed for {:?}: {}", file.stored_path, e);
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::error!("canonicalize failed for {:?}: {} | ip={} cxid={} file_idx={}", file.stored_path, e, ip, cxid, file_idx);
|
||||
CgcxError::Storage("invalid stored path".into())
|
||||
})?;
|
||||
if !state.allowed_roots.iter().any(|root| canonical_path.starts_with(root)) {
|
||||
tracing::error!("Path traversal blocked: {:?}", canonical_path);
|
||||
tracing::warn!("serve_file returning Forbidden (path traversal) for cxid={}", cxid);
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::error!("Path traversal blocked: {:?} | ip={} cxid={} file_idx={}", canonical_path, ip, cxid, file_idx);
|
||||
tracing::warn!("serve_file returning Forbidden (path traversal) for cxid={} file_idx={} ip={}", cxid, file_idx, ip);
|
||||
return Err(CgcxError::Forbidden.into());
|
||||
}
|
||||
|
||||
@@ -530,11 +676,13 @@ async fn serve_file(
|
||||
// If-None-Match check (skip increment)
|
||||
if let Some(inm) = headers.get(header::IF_NONE_MATCH) {
|
||||
if inm.to_str().ok().map(|s| s == etag).unwrap_or(false) {
|
||||
return Ok(Response::builder()
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::NOT_MODIFIED)
|
||||
.header(header::ETAG, etag.clone())
|
||||
.body(Body::empty())
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?);
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -573,10 +721,12 @@ async fn serve_file(
|
||||
let _ = state.storage.delete_content_files(&content_id, "application/octet-stream").await;
|
||||
}
|
||||
repo.set_status(&content_id, cgcx_core::ContentStatus::Deleted).await?;
|
||||
return Ok(Response::builder()
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::GONE)
|
||||
.body(Body::empty())
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?);
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -636,7 +786,140 @@ async fn serve_file(
|
||||
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
|
||||
let body = Body::from_stream(body_stream);
|
||||
|
||||
Ok(response.body(body).map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?)
|
||||
let mut response = response.body(body).map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn serve_raw_file(
|
||||
State(state): State<AppState>,
|
||||
Path((cxid, file_idx)): Path<(String, u32)>,
|
||||
Query(query): Query<ScQuery>,
|
||||
headers: HeaderMap,
|
||||
) -> AppResult<impl IntoResponse> {
|
||||
tracing::info!("serve_raw_file: cxid={} file_idx={}", cxid, file_idx);
|
||||
let content_id = ContentId::try_from(cxid.as_str())?;
|
||||
let repo = ContentRepo::new(state.db.conn());
|
||||
let content = repo.get(&content_id).await?.ok_or(CgcxError::NotFound)?;
|
||||
|
||||
if content.status == cgcx_core::ContentStatus::Deleted || content.status == cgcx_core::ContentStatus::Blacklisted {
|
||||
tracing::warn!("serve_raw_file returning NotFound for cxid={}", cxid);
|
||||
return Err(CgcxError::NotFound.into());
|
||||
}
|
||||
|
||||
if let Some(max) = content.max_views {
|
||||
if content.view_count >= max {
|
||||
return Ok(Response::builder()
|
||||
.status(StatusCode::GONE)
|
||||
.body(Body::empty())
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?);
|
||||
}
|
||||
}
|
||||
|
||||
let auth_source = if content.password_hash.is_some() {
|
||||
match password_from_request(&headers, query.sc.as_deref(), &cxid, content.password_hash.as_deref(), &state.cookie_secret) {
|
||||
Some(source) => Some(source),
|
||||
None => {
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::warn!("serve_raw_file returning Unauthorized for cxid={} file_idx={} ip={}", cxid, file_idx, ip);
|
||||
return Err(CgcxError::Unauthorized.into());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let file_repo = ContentFileRepo::new(state.db.conn());
|
||||
let files = file_repo.list_by_content(&content_id).await?;
|
||||
let file = files.iter().find(|f| f.file_index == file_idx).ok_or(CgcxError::NotFound)?;
|
||||
|
||||
// Handle zero-size files early
|
||||
if file.size_bytes == 0 {
|
||||
let sanitized_name = sanitize_content_disposition(&file.original_name);
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
|
||||
.header(header::CONTENT_DISPOSITION, format!("inline; filename=\"{}\"", sanitized_name))
|
||||
.header(header::CONTENT_LENGTH, "0")
|
||||
.body(Body::empty())
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Path traversal validation
|
||||
let canonical_path = tokio::fs::canonicalize(&file.stored_path).await
|
||||
.map_err(|e| {
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::error!("canonicalize failed for {:?}: {} | ip={} cxid={} file_idx={}", file.stored_path, e, ip, cxid, file_idx);
|
||||
CgcxError::Storage("invalid stored path".into())
|
||||
})?;
|
||||
if !state.allowed_roots.iter().any(|root| canonical_path.starts_with(root)) {
|
||||
let ip = client_ip_from_headers(&headers);
|
||||
tracing::error!("Path traversal blocked: {:?} | ip={} cxid={} file_idx={}", canonical_path, ip, cxid, file_idx);
|
||||
tracing::warn!("serve_raw_file returning Forbidden (path traversal) for cxid={} file_idx={} ip={}", cxid, file_idx, ip);
|
||||
return Err(CgcxError::Forbidden.into());
|
||||
}
|
||||
|
||||
// Decrypt entire file into memory
|
||||
let mut f = tokio::fs::File::open(&file.stored_path).await.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
let mut header_buf = [0u8; 24];
|
||||
f.read_exact(&mut header_buf).await.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
|
||||
let content_key = unwrap_content_key(&file.encrypted_key_wrapped, &state.master_key)?;
|
||||
let header = sodiumoxide::crypto::secretstream::xchacha20poly1305::Header::from_slice(&header_buf)
|
||||
.ok_or_else(|| CgcxError::Crypto("invalid header".into()))?;
|
||||
let mut decrypt_stream = DecryptStream::new(&content_key, &header)?;
|
||||
|
||||
let mut plaintext = Vec::with_capacity(file.size_bytes as usize);
|
||||
let mut len_buf = [0u8; 4];
|
||||
let mut saw_final = false;
|
||||
|
||||
loop {
|
||||
if f.read_exact(&mut len_buf).await.is_err() {
|
||||
break; // EOF at message boundary
|
||||
}
|
||||
let msg_len = u32::from_le_bytes(len_buf) as usize;
|
||||
if msg_len > 50_000_000 {
|
||||
return Err(AppError(CgcxError::Crypto("message length exceeds sanity bound".into())));
|
||||
}
|
||||
let mut msg_buf = vec![0u8; msg_len];
|
||||
f.read_exact(&mut msg_buf).await.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
|
||||
match decrypt_stream.pull(&msg_buf) {
|
||||
Ok((chunk, tag)) => {
|
||||
plaintext.extend_from_slice(&chunk);
|
||||
if tag == TagFinal {
|
||||
saw_final = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(AppError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !saw_final {
|
||||
return Err(AppError(CgcxError::Crypto("stream ended without Final tag".into())));
|
||||
}
|
||||
|
||||
let computed_hash = decrypt_stream.finalize().to_vec();
|
||||
if computed_hash != file.encrypted_hash {
|
||||
tracing::error!(target: "critical", "BLAKE3 integrity mismatch for raw file {:?}: expected {} got {}", file.stored_path, hex::encode(&file.encrypted_hash), hex::encode(&computed_hash));
|
||||
return Err(AppError(CgcxError::Crypto("BLAKE3 integrity mismatch".into())));
|
||||
}
|
||||
|
||||
let text = String::from_utf8_lossy(&plaintext);
|
||||
let sanitized_name = sanitize_content_disposition(&file.original_name);
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
|
||||
.header(header::CONTENT_DISPOSITION, format!("inline; filename=\"{}\"", sanitized_name))
|
||||
.body(Body::from(text.into_owned()))
|
||||
.map_err(|e| CgcxError::Storage(format!("response build failed: {}", e)))?;
|
||||
add_auth_cookie(&mut response, &auth_source, &cxid, &state.cookie_secret)?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn stream_decrypted_file(
|
||||
@@ -644,7 +927,7 @@ async fn stream_decrypted_file(
|
||||
master_key: Arc<MasterKey>,
|
||||
wrapped_key: Vec<u8>,
|
||||
tx: tokio::sync::mpsc::Sender<Result<Vec<u8>, std::io::Error>>,
|
||||
_range: Option<ByteRange>,
|
||||
range: Option<ByteRange>,
|
||||
_file_size: u64,
|
||||
expected_hash: Vec<u8>,
|
||||
) -> cgcx_core::Result<()> {
|
||||
@@ -658,6 +941,92 @@ async fn stream_decrypted_file(
|
||||
let mut decrypt_stream = DecryptStream::new(&content_key, &header)?;
|
||||
|
||||
let mut len_buf = [0u8; 4];
|
||||
|
||||
if let Some(ref r) = range {
|
||||
let range_start = r.start;
|
||||
let range_len = r.end.map(|e| e - r.start + 1);
|
||||
let mut skipped_plaintext: u64 = 0;
|
||||
let mut sent: u64 = 0;
|
||||
|
||||
loop {
|
||||
if file.read_exact(&mut len_buf).await.is_err() {
|
||||
break; // EOF at message boundary
|
||||
}
|
||||
let ciphertext_len = u32::from_le_bytes(len_buf) as usize;
|
||||
if ciphertext_len > 50_000_000 {
|
||||
let _ = tx.send(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "message too large"))).await;
|
||||
return Err(CgcxError::Crypto("message length exceeds sanity bound".into()));
|
||||
}
|
||||
let plaintext_len = ciphertext_len.saturating_sub(16) as u64;
|
||||
|
||||
if skipped_plaintext + plaintext_len <= range_start {
|
||||
// Advance the decrypt stream state by reading and decrypting the
|
||||
// skipped chunk, then discarding the plaintext. XChaCha20-Poly1305
|
||||
// secretstream is stateful and must be processed sequentially.
|
||||
let mut skip_buf = vec![0u8; ciphertext_len];
|
||||
file.read_exact(&mut skip_buf).await.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
match decrypt_stream.pull(&skip_buf) {
|
||||
Ok((_, tag)) => {
|
||||
if tag == TagFinal {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))).await;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
skipped_plaintext += plaintext_len;
|
||||
continue;
|
||||
}
|
||||
|
||||
let trim_start = if skipped_plaintext < range_start {
|
||||
(range_start - skipped_plaintext) as usize
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let mut msg_buf = vec![0u8; ciphertext_len];
|
||||
file.read_exact(&mut msg_buf).await.map_err(|e| CgcxError::Storage(e.to_string()))?;
|
||||
|
||||
match decrypt_stream.pull(&msg_buf) {
|
||||
Ok((plaintext, tag)) => {
|
||||
let start = trim_start.min(plaintext.len());
|
||||
let mut slice = &plaintext[start..];
|
||||
if let Some(max_total) = range_len {
|
||||
let remaining = (max_total - sent) as usize;
|
||||
if slice.len() > remaining {
|
||||
slice = &slice[..remaining];
|
||||
}
|
||||
}
|
||||
if !slice.is_empty() {
|
||||
if tx.send(Ok(slice.to_vec())).await.is_err() {
|
||||
return Ok(()); // client disconnected
|
||||
}
|
||||
sent += slice.len() as u64;
|
||||
}
|
||||
if let Some(max_total) = range_len {
|
||||
if sent >= max_total {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
if tag == TagFinal {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))).await;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
skipped_plaintext += plaintext_len;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Full-file streaming
|
||||
let mut saw_final = false;
|
||||
loop {
|
||||
if file.read_exact(&mut len_buf).await.is_err() {
|
||||
|
||||
Reference in New Issue
Block a user