From 9b3f4a5fe620a3c4dca20b20930dbd0b3fe722b3 Mon Sep 17 00:00:00 2001 From: 409 <409dev@protonmail.com> Date: Fri, 29 Aug 2025 23:34:21 +0200 Subject: [PATCH] stream zip archive creation --- backend/Cargo.lock | 1 + backend/Cargo.toml | 3 +- backend/src/bin/backend/main.rs | 2 +- backend/src/lib/outbound/file_system.rs | 105 ++++++++++++++++-------- 4 files changed, 75 insertions(+), 36 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 0975a34..b68560b 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -2806,6 +2806,7 @@ dependencies = [ "sqlx", "thiserror 2.0.12", "tokio", + "tokio-stream", "tokio-util", "tower", "tower-http", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index a0399f6..d879ade 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -38,7 +38,8 @@ sqlx = { version = "0.8.6", features = [ ] } thiserror = "2.0.12" tokio = { version = "1.46.1", features = ["full"] } -tokio-util = "0.7.15" +tokio-stream = "0.1.17" +tokio-util = { version = "0.7.15", features = ["io-util"] } tower = "0.5.2" tower-http = { version = "0.6.6", features = ["cors", "fs", "trace"] } tracing = "0.1.41" diff --git a/backend/src/bin/backend/main.rs b/backend/src/bin/backend/main.rs index 32c818c..d61bb6c 100644 --- a/backend/src/bin/backend/main.rs +++ b/backend/src/bin/backend/main.rs @@ -11,7 +11,7 @@ use warren::{ }, }; -#[tokio::main] +#[tokio::main(flavor = "multi_thread")] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); diff --git a/backend/src/lib/outbound/file_system.rs b/backend/src/lib/outbound/file_system.rs index c8fdc08..29d69ce 100644 --- a/backend/src/lib/outbound/file_system.rs +++ b/backend/src/lib/outbound/file_system.rs @@ -1,16 +1,16 @@ +use anyhow::{Context as _, anyhow, bail}; +use futures_util::TryStreamExt; +use rustix::fs::{Statx, statx}; use std::{ - io::{Cursor, Write}, + io::Write, os::unix::fs::MetadataExt, path::{Path, PathBuf}, }; - -use anyhow::{Context, anyhow, bail}; -use futures_util::TryStreamExt; -use rustix::fs::{Statx, statx}; use tokio::{ fs::{self}, io::{self, AsyncReadExt as _, AsyncWriteExt as _}, }; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; use crate::{ @@ -223,38 +223,19 @@ impl FileSystem { if metadata.is_dir() { drop(file); - let options = zip::write::SimpleFileOptions::default() - .compression_method(zip::CompressionMethod::Zstd) - .unix_permissions(0o755); + let (sync_tx, sync_rx) = + std::sync::mpsc::channel::>(); + let (tx, rx) = + tokio::sync::mpsc::channel::>(65536); - let mut file_buf = Vec::new(); - let zip_buf = Vec::new(); - let cursor = Cursor::new(zip_buf); - let mut zip = zip::ZipWriter::new(cursor); - - for entry_path_buf in walk_dir(&path).await? { - let entry_path = entry_path_buf.as_path(); - let entry_str = entry_path - .strip_prefix(&path)? - .to_str() - .context("Failed to get directory entry name")?; - - if entry_path.is_dir() { - zip.add_directory(entry_str, options)?; - continue; + tokio::task::spawn(async move { + while let Ok(v) = sync_rx.recv() { + let _ = tx.send(v).await; } + }); + tokio::task::spawn(create_zip(path, sync_tx)); - zip.start_file(entry_str, options)?; - let mut entry_file = tokio::fs::File::open(entry_path).await?; - entry_file.read_to_end(&mut file_buf).await?; - zip.write_all(&file_buf)?; - file_buf.clear(); - } - - let mut cursor = zip.finish()?; - - cursor.set_position(0); - let stream = FileStream::new(ReaderStream::new(cursor)); + let stream = FileStream::new(ReceiverStream::new(rx)); return Ok(stream); } @@ -479,3 +460,59 @@ where Ok(files) } + +async fn create_zip

( + path: P, + tx: std::sync::mpsc::Sender>, +) -> anyhow::Result<()> +where + P: AsRef, +{ + let options = zip::write::SimpleFileOptions::default() + .compression_method(zip::CompressionMethod::Deflated) + .unix_permissions(0o644); + + let mut file_buf = Vec::new(); + let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx)); + + for entry_path_buf in walk_dir(&path).await? { + let entry_path = entry_path_buf.as_path(); + let entry_str = entry_path + .strip_prefix(&path)? + .to_str() + .context("Failed to get directory entry name")?; + + if entry_path.is_dir() { + zip.add_directory(entry_str, options)?; + continue; + } + + zip.start_file(entry_str, options)?; + let mut entry_file = tokio::fs::File::open(entry_path).await?; + entry_file.read_to_end(&mut file_buf).await?; + zip.write_all(&file_buf)?; + file_buf.clear(); + } + + drop(zip.finish()?); + + Ok(()) +} + +struct ChannelWriter(std::sync::mpsc::Sender>); + +impl Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let len = buf.len(); + let data = bytes::Bytes::copy_from_slice(&buf[..(len)]); + + self.0 + .send(Ok(data)) + .map(|_| len) + .map_err(|_| std::io::ErrorKind::Other.into()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +}