directory downloads (zipped)

This commit is contained in:
2025-08-29 21:20:44 +02:00
parent 76713db985
commit 3498a2926c
11 changed files with 368 additions and 47 deletions

View File

@@ -1,6 +1,7 @@
mod requests;
use bytes::Bytes;
use futures_util::Stream;
pub use requests::*;
use tokio_util::io::ReaderStream;
use std::path::Path;
@@ -263,20 +264,25 @@ impl From<AbsoluteFilePath> for FilePath {
}
}
#[derive(Debug)]
pub struct FileStream(ReaderStream<tokio::fs::File>);
pub type FileStreamInner =
Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + Unpin + 'static>;
pub struct FileStream(FileStreamInner);
impl FileStream {
pub fn new(stream: ReaderStream<tokio::fs::File>) -> Self {
Self(stream)
pub fn new<S>(stream: S) -> Self
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + Unpin + 'static,
{
Self(Box::new(stream))
}
pub fn stream(&self) -> &ReaderStream<tokio::fs::File> {
pub fn stream(&self) -> &FileStreamInner {
&self.0
}
}
impl From<FileStream> for ReaderStream<tokio::fs::File> {
impl From<FileStream> for FileStreamInner {
fn from(value: FileStream) -> Self {
value.0
}

View File

@@ -50,7 +50,6 @@ impl From<&ShareCatRequest> for VerifySharePasswordRequest {
}
}
#[derive(Debug)]
pub struct ShareCatResponse {
share: Share,
warren: Warren,

View File

@@ -74,9 +74,9 @@ where
return Ok(Self(None));
};
SharePassword::new(cookie.value())
Ok(SharePassword::new(cookie.value())
.map(|v| Self(Some(v)))
.map_err(|_| ApiError::BadRequest("Invalid password".to_string()))
.unwrap_or(Self(None)))
}
// Debug build
else {

View File

@@ -16,6 +16,7 @@ mod warren_rm;
use axum::{
Router,
body::Body,
extract::DefaultBodyLimit,
routing::{get, post},
};
@@ -41,7 +42,7 @@ use warren_mv::warren_mv;
use crate::{
domain::warren::{
models::file::{File, FileMimeType, FileType},
models::file::{File, FileMimeType, FileStream, FileStreamInner, FileType},
ports::{AuthService, WarrenService},
},
inbound::http::AppState,
@@ -67,6 +68,13 @@ impl From<&File> for WarrenFileElement {
}
}
impl From<FileStream> for Body {
fn from(value: FileStream) -> Self {
let inner: FileStreamInner = value.into();
Body::from_stream(inner)
}
}
pub fn routes<WS: WarrenService, AS: AuthService>() -> Router<AppState<WS, AS>> {
Router::new()
.route("/", get(list_warrens))

View File

@@ -4,14 +4,13 @@ use axum::{
};
use serde::Deserialize;
use thiserror::Error;
use tokio_util::io::ReaderStream;
use uuid::Uuid;
use crate::{
domain::warren::{
models::{
auth_session::AuthRequest,
file::{AbsoluteFilePathError, CatRequest, FilePath, FilePathError, FileStream},
file::{AbsoluteFilePathError, CatRequest, FilePath, FilePathError},
warren::WarrenCatRequest,
},
ports::{AuthService, WarrenService},
@@ -59,12 +58,6 @@ impl WarrenCatHttpRequestBody {
}
}
impl From<FileStream> for Body {
fn from(value: FileStream) -> Self {
Body::from_stream::<ReaderStream<tokio::fs::File>>(value.into())
}
}
pub async fn fetch_file<WS: WarrenService, AS: AuthService>(
State(state): State<AppState<WS, AS>>,
SessionIdHeader(session): SessionIdHeader,

View File

@@ -1,11 +1,15 @@
use std::{os::unix::fs::MetadataExt, path::PathBuf};
use std::{
io::{Cursor, Write},
os::unix::fs::MetadataExt,
path::{Path, PathBuf},
};
use anyhow::{Context, anyhow, bail};
use futures_util::TryStreamExt;
use rustix::fs::{Statx, statx};
use tokio::{
fs,
io::{self, AsyncWriteExt as _},
fs::{self},
io::{self, AsyncReadExt as _, AsyncWriteExt as _},
};
use tokio_util::io::ReaderStream;
@@ -214,7 +218,48 @@ impl FileSystem {
.open(&path)
.await?;
let file_size = file.metadata().await?.size();
let metadata = file.metadata().await?;
if metadata.is_dir() {
drop(file);
let options = zip::write::SimpleFileOptions::default()
.compression_method(zip::CompressionMethod::Zstd)
.unix_permissions(0o755);
let mut file_buf = Vec::new();
let zip_buf = Vec::new();
let cursor = Cursor::new(zip_buf);
let mut zip = zip::ZipWriter::new(cursor);
for entry_path_buf in walk_dir(&path).await? {
let entry_path = entry_path_buf.as_path();
let entry_str = entry_path
.strip_prefix(&path)?
.to_str()
.context("Failed to get directory entry name")?;
if entry_path.is_dir() {
zip.add_directory(entry_str, options)?;
continue;
}
zip.start_file(entry_str, options)?;
let mut entry_file = tokio::fs::File::open(entry_path).await?;
entry_file.read_to_end(&mut file_buf).await?;
zip.write_all(&file_buf)?;
file_buf.clear();
}
let mut cursor = zip.finish()?;
cursor.set_position(0);
let stream = FileStream::new(ReaderStream::new(cursor));
return Ok(stream);
}
let file_size = metadata.size();
if file_size > self.max_file_fetch_bytes {
bail!("File size exceeds configured limit");
@@ -410,3 +455,27 @@ where
)
}
}
async fn walk_dir<P>(dir: P) -> Result<Vec<PathBuf>, tokio::io::Error>
where
P: AsRef<Path>,
{
let mut dirs = vec![dir.as_ref().to_owned()];
let mut files = vec![];
while !dirs.is_empty() {
let mut dir_iter = tokio::fs::read_dir(dirs.remove(0)).await?;
while let Some(entry) = dir_iter.next_entry().await? {
let entry_path_buf = entry.path();
if entry_path_buf.is_dir() {
dirs.push(entry_path_buf);
} else {
files.push(entry_path_buf);
}
}
}
Ok(files)
}