improve zip memory usage
This commit is contained in:
@@ -30,30 +30,46 @@ use crate::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
const MAX_FILE_FETCH_BYTES: &str = "MAX_FILE_FETCH_BYTES";
|
const MAX_FILE_FETCH_BYTES_KEY: &str = "MAX_FILE_FETCH_BYTES";
|
||||||
|
const ZIP_READ_BUFFER_BYTES_KEY: &str = "ZIP_READ_BUFFER_BYTES";
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct FileSystemConfig {
|
pub struct FileSystemConfig {
|
||||||
base_directory: String,
|
base_directory: String,
|
||||||
max_file_fetch_bytes: u64,
|
max_file_fetch_bytes: u64,
|
||||||
|
zip_read_buffer_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileSystemConfig {
|
impl FileSystemConfig {
|
||||||
pub fn new(base_directory: String, max_file_fetch_bytes: u64) -> Self {
|
pub fn new(
|
||||||
|
base_directory: String,
|
||||||
|
max_file_fetch_bytes: u64,
|
||||||
|
zip_read_buffer_bytes: usize,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
base_directory,
|
base_directory,
|
||||||
max_file_fetch_bytes,
|
max_file_fetch_bytes,
|
||||||
|
zip_read_buffer_bytes,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_env(serve_dir: String) -> anyhow::Result<Self> {
|
pub fn from_env(serve_dir: String) -> anyhow::Result<Self> {
|
||||||
// 268435456 bytes = 0.25GB
|
// 268435456 bytes = 0.25GB
|
||||||
let max_file_fetch_bytes: u64 = match Config::load_env(MAX_FILE_FETCH_BYTES) {
|
let max_file_fetch_bytes: u64 = match Config::load_env(MAX_FILE_FETCH_BYTES_KEY) {
|
||||||
Ok(value) => value.parse()?,
|
Ok(value) => value.parse()?,
|
||||||
Err(_) => 268435456,
|
Err(_) => 268435456,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Self::new(serve_dir, max_file_fetch_bytes))
|
let zip_read_buffer_bytes: usize = match Config::load_env(ZIP_READ_BUFFER_BYTES_KEY) {
|
||||||
|
Ok(value) => value.parse()?,
|
||||||
|
Err(_) => 4096,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Self::new(
|
||||||
|
serve_dir,
|
||||||
|
max_file_fetch_bytes,
|
||||||
|
zip_read_buffer_bytes,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -61,6 +77,7 @@ impl FileSystemConfig {
|
|||||||
pub struct FileSystem {
|
pub struct FileSystem {
|
||||||
base_directory: FilePath,
|
base_directory: FilePath,
|
||||||
max_file_fetch_bytes: u64,
|
max_file_fetch_bytes: u64,
|
||||||
|
zip_read_buffer_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileSystem {
|
impl FileSystem {
|
||||||
@@ -68,6 +85,7 @@ impl FileSystem {
|
|||||||
let file_system = Self {
|
let file_system = Self {
|
||||||
base_directory: FilePath::new(&config.base_directory)?,
|
base_directory: FilePath::new(&config.base_directory)?,
|
||||||
max_file_fetch_bytes: config.max_file_fetch_bytes,
|
max_file_fetch_bytes: config.max_file_fetch_bytes,
|
||||||
|
zip_read_buffer_bytes: config.zip_read_buffer_bytes,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(file_system)
|
Ok(file_system)
|
||||||
@@ -227,7 +245,7 @@ impl FileSystem {
|
|||||||
std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
|
std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(1024);
|
let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(1024);
|
||||||
|
|
||||||
tokio::task::spawn(create_zip(path, sync_tx));
|
tokio::task::spawn(create_zip(path, sync_tx, self.zip_read_buffer_bytes));
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
while let Ok(v) = sync_rx.recv() {
|
while let Ok(v) = sync_rx.recv() {
|
||||||
let _ = tx.send(v).await;
|
let _ = tx.send(v).await;
|
||||||
@@ -460,9 +478,15 @@ where
|
|||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates a ZIP archive from a directory
|
||||||
|
///
|
||||||
|
/// * `path`: The directory's path
|
||||||
|
/// * `tx`: The sender for the new ZIP archive's bytes
|
||||||
|
/// * `buffer_size`: The size of the file read buffer. A large buffer increases both the speed and the memory usage
|
||||||
async fn create_zip<P>(
|
async fn create_zip<P>(
|
||||||
path: P,
|
path: P,
|
||||||
tx: std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>,
|
tx: std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>,
|
||||||
|
buffer_size: usize,
|
||||||
) -> anyhow::Result<()>
|
) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
P: AsRef<Path>,
|
P: AsRef<Path>,
|
||||||
@@ -473,7 +497,7 @@ where
|
|||||||
.large_file(true)
|
.large_file(true)
|
||||||
.unix_permissions(0o644);
|
.unix_permissions(0o644);
|
||||||
|
|
||||||
let mut file_buf = Vec::new();
|
let mut file_buf = vec![0; buffer_size];
|
||||||
let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx));
|
let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx));
|
||||||
|
|
||||||
let entries = walk_dir(&path).await?;
|
let entries = walk_dir(&path).await?;
|
||||||
@@ -493,10 +517,11 @@ where
|
|||||||
zip.start_file(entry_str, options)?;
|
zip.start_file(entry_str, options)?;
|
||||||
let mut entry_file = tokio::fs::File::open(entry_path).await?;
|
let mut entry_file = tokio::fs::File::open(entry_path).await?;
|
||||||
|
|
||||||
entry_file.read_to_end(&mut file_buf).await?;
|
while let Ok(len) = entry_file.read(&mut file_buf).await
|
||||||
|
&& len > 0
|
||||||
zip.write_all(&file_buf)?;
|
{
|
||||||
file_buf.clear();
|
zip.write(&file_buf[..len])?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
zip.finish()?;
|
zip.finish()?;
|
||||||
|
|||||||
Reference in New Issue
Block a user