stream zip archive creation
This commit is contained in:
@@ -1,16 +1,16 @@
|
||||
use anyhow::{Context as _, anyhow, bail};
|
||||
use futures_util::TryStreamExt;
|
||||
use rustix::fs::{Statx, statx};
|
||||
use std::{
|
||||
io::{Cursor, Write},
|
||||
io::Write,
|
||||
os::unix::fs::MetadataExt,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use anyhow::{Context, anyhow, bail};
|
||||
use futures_util::TryStreamExt;
|
||||
use rustix::fs::{Statx, statx};
|
||||
use tokio::{
|
||||
fs::{self},
|
||||
io::{self, AsyncReadExt as _, AsyncWriteExt as _},
|
||||
};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use crate::{
|
||||
@@ -223,38 +223,19 @@ impl FileSystem {
|
||||
if metadata.is_dir() {
|
||||
drop(file);
|
||||
|
||||
let options = zip::write::SimpleFileOptions::default()
|
||||
.compression_method(zip::CompressionMethod::Zstd)
|
||||
.unix_permissions(0o755);
|
||||
let (sync_tx, sync_rx) =
|
||||
std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
|
||||
let (tx, rx) =
|
||||
tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(65536);
|
||||
|
||||
let mut file_buf = Vec::new();
|
||||
let zip_buf = Vec::new();
|
||||
let cursor = Cursor::new(zip_buf);
|
||||
let mut zip = zip::ZipWriter::new(cursor);
|
||||
|
||||
for entry_path_buf in walk_dir(&path).await? {
|
||||
let entry_path = entry_path_buf.as_path();
|
||||
let entry_str = entry_path
|
||||
.strip_prefix(&path)?
|
||||
.to_str()
|
||||
.context("Failed to get directory entry name")?;
|
||||
|
||||
if entry_path.is_dir() {
|
||||
zip.add_directory(entry_str, options)?;
|
||||
continue;
|
||||
tokio::task::spawn(async move {
|
||||
while let Ok(v) = sync_rx.recv() {
|
||||
let _ = tx.send(v).await;
|
||||
}
|
||||
});
|
||||
tokio::task::spawn(create_zip(path, sync_tx));
|
||||
|
||||
zip.start_file(entry_str, options)?;
|
||||
let mut entry_file = tokio::fs::File::open(entry_path).await?;
|
||||
entry_file.read_to_end(&mut file_buf).await?;
|
||||
zip.write_all(&file_buf)?;
|
||||
file_buf.clear();
|
||||
}
|
||||
|
||||
let mut cursor = zip.finish()?;
|
||||
|
||||
cursor.set_position(0);
|
||||
let stream = FileStream::new(ReaderStream::new(cursor));
|
||||
let stream = FileStream::new(ReceiverStream::new(rx));
|
||||
|
||||
return Ok(stream);
|
||||
}
|
||||
@@ -479,3 +460,59 @@ where
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
async fn create_zip<P>(
|
||||
path: P,
|
||||
tx: std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let options = zip::write::SimpleFileOptions::default()
|
||||
.compression_method(zip::CompressionMethod::Deflated)
|
||||
.unix_permissions(0o644);
|
||||
|
||||
let mut file_buf = Vec::new();
|
||||
let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx));
|
||||
|
||||
for entry_path_buf in walk_dir(&path).await? {
|
||||
let entry_path = entry_path_buf.as_path();
|
||||
let entry_str = entry_path
|
||||
.strip_prefix(&path)?
|
||||
.to_str()
|
||||
.context("Failed to get directory entry name")?;
|
||||
|
||||
if entry_path.is_dir() {
|
||||
zip.add_directory(entry_str, options)?;
|
||||
continue;
|
||||
}
|
||||
|
||||
zip.start_file(entry_str, options)?;
|
||||
let mut entry_file = tokio::fs::File::open(entry_path).await?;
|
||||
entry_file.read_to_end(&mut file_buf).await?;
|
||||
zip.write_all(&file_buf)?;
|
||||
file_buf.clear();
|
||||
}
|
||||
|
||||
drop(zip.finish()?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ChannelWriter(std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>);
|
||||
|
||||
impl Write for ChannelWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
let len = buf.len();
|
||||
let data = bytes::Bytes::copy_from_slice(&buf[..(len)]);
|
||||
|
||||
self.0
|
||||
.send(Ok(data))
|
||||
.map(|_| len)
|
||||
.map_err(|_| std::io::ErrorKind::Other.into())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user