From 702f16f199dd76c53960da2d5d716a1eeddc10fd Mon Sep 17 00:00:00 2001 From: 409 <409dev@protonmail.com> Date: Sat, 30 Aug 2025 02:42:38 +0200 Subject: [PATCH] improve zip downloads --- .../src/lib/domain/warren/models/file/mod.rs | 16 ++++++++----- .../http/handlers/warrens/cat_share.rs | 6 ++--- .../lib/inbound/http/handlers/warrens/mod.rs | 24 +++++++++++++++---- .../http/handlers/warrens/warren_cat.rs | 5 ++-- backend/src/lib/outbound/file_system.rs | 23 +++++++++++------- 5 files changed, 48 insertions(+), 26 deletions(-) diff --git a/backend/src/lib/domain/warren/models/file/mod.rs b/backend/src/lib/domain/warren/models/file/mod.rs index 5624f2e..08f597f 100644 --- a/backend/src/lib/domain/warren/models/file/mod.rs +++ b/backend/src/lib/domain/warren/models/file/mod.rs @@ -82,7 +82,7 @@ impl FileName { } /// A valid file type -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Display)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Display)] #[serde(rename_all = "camelCase")] pub enum FileType { File, @@ -267,23 +267,27 @@ impl From for FilePath { pub type FileStreamInner = Box> + Send + Sync + Unpin + 'static>; -pub struct FileStream(FileStreamInner); +pub struct FileStream(FileType, FileStreamInner); impl FileStream { - pub fn new(stream: S) -> Self + pub fn new(file_type: FileType, stream: S) -> Self where S: Stream> + Send + Sync + Unpin + 'static, { - Self(Box::new(stream)) + Self(file_type, Box::new(stream)) + } + + pub fn file_type(&self) -> FileType { + self.0 } pub fn stream(&self) -> &FileStreamInner { - &self.0 + &self.1 } } impl From for FileStreamInner { fn from(value: FileStream) -> Self { - value.0 + value.1 } } diff --git a/backend/src/lib/inbound/http/handlers/warrens/cat_share.rs b/backend/src/lib/inbound/http/handlers/warrens/cat_share.rs index ee2481b..47c3057 100644 --- a/backend/src/lib/inbound/http/handlers/warrens/cat_share.rs +++ b/backend/src/lib/inbound/http/handlers/warrens/cat_share.rs @@ -1,6 +1,6 @@ use axum::{ - body::Body, extract::{Query, State}, + response::IntoResponse, }; use serde::Deserialize; use thiserror::Error; @@ -74,13 +74,13 @@ pub async fn cat_share( State(state): State>, SharePasswordHeader(password): SharePasswordHeader, Query(request): Query, -) -> Result { +) -> Result { let domain_request = request.try_into_domain(password)?; state .warren_service .warren_share_cat(domain_request) .await - .map(|response| FileStream::from(response).into()) + .map(|response| FileStream::from(response)) .map_err(ApiError::from) } diff --git a/backend/src/lib/inbound/http/handlers/warrens/mod.rs b/backend/src/lib/inbound/http/handlers/warrens/mod.rs index d9504c4..a9d4461 100644 --- a/backend/src/lib/inbound/http/handlers/warrens/mod.rs +++ b/backend/src/lib/inbound/http/handlers/warrens/mod.rs @@ -16,8 +16,9 @@ mod warren_rm; use axum::{ Router, - body::Body, extract::DefaultBodyLimit, + http::{self, HeaderValue, Response}, + response::IntoResponse, routing::{get, post}, }; @@ -68,10 +69,23 @@ impl From<&File> for WarrenFileElement { } } -impl From for Body { - fn from(value: FileStream) -> Self { - let inner: FileStreamInner = value.into(); - Body::from_stream(inner) +impl IntoResponse for FileStream { + fn into_response(self) -> axum::response::Response { + let mut builder = Response::builder().header(http::header::TRANSFER_ENCODING, "chunked"); + + if let Some(headers) = builder.headers_mut() { + if self.file_type() == FileType::Directory { + headers.insert( + http::header::CONTENT_TYPE, + HeaderValue::from_str("application/zip").unwrap(), + ); + } + + headers.remove(http::header::CONTENT_LENGTH); + } + + let inner: FileStreamInner = self.into(); + builder.body(axum::body::Body::from_stream(inner)).unwrap() } } diff --git a/backend/src/lib/inbound/http/handlers/warrens/warren_cat.rs b/backend/src/lib/inbound/http/handlers/warrens/warren_cat.rs index 6301285..7f5ad33 100644 --- a/backend/src/lib/inbound/http/handlers/warrens/warren_cat.rs +++ b/backend/src/lib/inbound/http/handlers/warrens/warren_cat.rs @@ -1,6 +1,6 @@ use axum::{ - body::Body, extract::{Query, State}, + response::IntoResponse, }; use serde::Deserialize; use thiserror::Error; @@ -62,13 +62,12 @@ pub async fn fetch_file( State(state): State>, SessionIdHeader(session): SessionIdHeader, Query(request): Query, -) -> Result { +) -> Result { let domain_request = AuthRequest::new(session, request.try_into_domain()?); state .auth_service .auth_warren_cat(domain_request, state.warren_service.as_ref()) .await - .map(|contents| contents.into()) .map_err(ApiError::from) } diff --git a/backend/src/lib/outbound/file_system.rs b/backend/src/lib/outbound/file_system.rs index 29d69ce..5a13e7f 100644 --- a/backend/src/lib/outbound/file_system.rs +++ b/backend/src/lib/outbound/file_system.rs @@ -225,17 +225,16 @@ impl FileSystem { let (sync_tx, sync_rx) = std::sync::mpsc::channel::>(); - let (tx, rx) = - tokio::sync::mpsc::channel::>(65536); + let (tx, rx) = tokio::sync::mpsc::channel::>(1024); + tokio::task::spawn(create_zip(path, sync_tx)); tokio::task::spawn(async move { while let Ok(v) = sync_rx.recv() { let _ = tx.send(v).await; } }); - tokio::task::spawn(create_zip(path, sync_tx)); - let stream = FileStream::new(ReceiverStream::new(rx)); + let stream = FileStream::new(FileType::Directory, ReceiverStream::new(rx)); return Ok(stream); } @@ -246,7 +245,7 @@ impl FileSystem { bail!("File size exceeds configured limit"); } - let stream = FileStream::new(ReaderStream::new(file)); + let stream = FileStream::new(FileType::File, ReaderStream::new(file)); Ok(stream) } @@ -469,13 +468,17 @@ where P: AsRef, { let options = zip::write::SimpleFileOptions::default() - .compression_method(zip::CompressionMethod::Deflated) + .compression_method(zip::CompressionMethod::Stored) + .compression_level(None) + .large_file(true) .unix_permissions(0o644); let mut file_buf = Vec::new(); let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx)); - for entry_path_buf in walk_dir(&path).await? { + let entries = walk_dir(&path).await?; + + for entry_path_buf in entries { let entry_path = entry_path_buf.as_path(); let entry_str = entry_path .strip_prefix(&path)? @@ -489,12 +492,14 @@ where zip.start_file(entry_str, options)?; let mut entry_file = tokio::fs::File::open(entry_path).await?; + entry_file.read_to_end(&mut file_buf).await?; + zip.write_all(&file_buf)?; file_buf.clear(); } - drop(zip.finish()?); + zip.finish()?; Ok(()) } @@ -504,7 +509,7 @@ struct ChannelWriter(std::sync::mpsc::Sender std::io::Result { let len = buf.len(); - let data = bytes::Bytes::copy_from_slice(&buf[..(len)]); + let data = bytes::Bytes::copy_from_slice(buf); self.0 .send(Ok(data))