basic selection + download multiple files with selection
@@ -2,6 +2,7 @@ use anyhow::{Context as _, anyhow, bail};
use futures_util::TryStreamExt;
use rustix::fs::{Statx, statx};
use std::{
    collections::HashSet,
    io::Write,
    os::unix::fs::MetadataExt,
    path::{Path, PathBuf},
@@ -226,46 +227,79 @@ impl FileSystem {
        Ok(paths)
    }

    async fn cat(&self, path: &AbsoluteFilePath) -> anyhow::Result<FileStream> {
        let path = self.get_target_path(path);
    async fn cat(&self, paths: &Vec<AbsoluteFilePath>) -> anyhow::Result<FileStream> {
        let paths: Vec<FilePath> = paths
            .into_iter()
            .map(|path| self.get_target_path(path))
            .collect();

        let file = fs::OpenOptions::new()
            .create(false)
            .write(false)
            .read(true)
            .open(&path)
            .await?;

        let metadata = file.metadata().await?;

        if metadata.is_dir() {
            drop(file);
        let path_request = PathRequest::from_paths(paths)?;

        fn build_zip_stream(
            prefix: String,
            paths: Vec<FilePath>,
            buffer_size: usize,
        ) -> FileStream {
            let (sync_tx, sync_rx) =
                std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
            let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(1024);

            tokio::task::spawn(create_zip(path, sync_tx, self.zip_read_buffer_bytes));
            tokio::task::spawn(create_zip(
                prefix,
                paths
                    .into_iter()
                    .map(|p| PathBuf::from(p.as_str()))
                    .collect(),
                sync_tx,
                buffer_size,
            ));
            tokio::task::spawn(async move {
                while let Ok(v) = sync_rx.recv() {
                    let _ = tx.send(v).await;
                }
            });

            let stream = FileStream::new(FileType::Directory, ReceiverStream::new(rx));

            return Ok(stream);
            FileStream::new(FileType::Directory, ReceiverStream::new(rx))
        }

        let file_size = metadata.size();
        match path_request {
            PathRequest::Single(file_path) => {
                let file = fs::OpenOptions::new()
                    .create(false)
                    .write(false)
                    .read(true)
                    .open(&file_path)
                    .await?;

        if file_size > self.max_file_fetch_bytes {
            bail!("File size exceeds configured limit");
                let metadata = file.metadata().await?;

                if metadata.is_dir() {
                    drop(file);

                    return Ok(build_zip_stream(
                        file_path.to_string(),
                        vec![file_path],
                        self.zip_read_buffer_bytes,
                    ));
                }

                if metadata.size() > self.max_file_fetch_bytes {
                    bail!("File size exceeds configured limit");
                }

                let stream = FileStream::new(FileType::File, ReaderStream::new(file));

                Ok(stream)
            }
            PathRequest::Multiple {
                lowest_common_prefix,
                paths,
            } => Ok(build_zip_stream(
                lowest_common_prefix,
                paths,
                self.zip_read_buffer_bytes,
            )),
        }

        let stream = FileStream::new(FileType::File, ReaderStream::new(file));

        Ok(stream)
    }

    async fn mv(&self, path: &AbsoluteFilePath, target_path: &AbsoluteFilePath) -> io::Result<()> {
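
The directory/multi-file path above streams a ZIP by bridging a blocking producer to the async response: create_zip writes archive bytes into a std::sync::mpsc sender while a forwarding task feeds a bounded tokio channel exposed as a ReceiverStream. A minimal standalone sketch of that bridge, with a dummy producer thread standing in for create_zip (names and chunk contents are illustrative, not code from this commit):

use tokio_stream::{StreamExt, wrappers::ReceiverStream};

#[tokio::main]
async fn main() {
    // Blocking side: the std channel a synchronous producer writes into.
    let (sync_tx, sync_rx) = std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
    // Async side: a bounded tokio channel that consumers see as a Stream.
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(1024);

    // Stand-in for create_zip: emit a few chunks, then hang up by dropping the sender.
    std::thread::spawn(move || {
        for chunk in ["PK\x03\x04", "...zip bytes..."] {
            let _ = sync_tx.send(Ok(bytes::Bytes::from(chunk)));
        }
    });

    // Forwarding task, as in the diff. Note that sync_rx.recv() is a blocking call,
    // so it occupies one runtime worker while the archive is produced;
    // tokio::task::spawn_blocking is a common alternative when that matters.
    tokio::task::spawn(async move {
        while let Ok(v) = sync_rx.recv() {
            let _ = tx.send(v).await;
        }
    });

    let mut stream = ReceiverStream::new(rx);
    while let Some(chunk) = stream.next().await {
        println!("{:?}", chunk.map(|b| b.len()));
    }
}
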
@@ -364,9 +398,9 @@ impl FileSystemRepository for FileSystem {
    }

    async fn cat(&self, request: CatRequest) -> Result<FileStream, CatError> {
        self.cat(request.path())
            .await
            .map_err(|e| anyhow!("Failed to fetch file {}: {e:?}", request.path()).into())
        self.cat(request.paths().paths()).await.map_err(|e| {
            anyhow!("Failed to fetch files {:?}: {e:?}", request.paths().paths()).into()
        })
    }

    async fn mkdir(&self, request: MkdirRequest) -> Result<(), MkdirError> {
@@ -462,16 +496,23 @@ where
    let mut files = vec![];

    while !dirs.is_empty() {
        let mut dir_iter = tokio::fs::read_dir(dirs.remove(0)).await?;
        let path = dirs.remove(0);
        match tokio::fs::read_dir(path.clone()).await {
            Ok(mut dir_iter) => {
                while let Some(entry) = dir_iter.next_entry().await? {
                    let entry_path_buf = entry.path();

        while let Some(entry) = dir_iter.next_entry().await? {
            let entry_path_buf = entry.path();

            if entry_path_buf.is_dir() {
                dirs.push(entry_path_buf);
            } else {
                files.push(entry_path_buf);
                    if entry_path_buf.is_dir() {
                        dirs.push(entry_path_buf);
                    } else {
                        files.push(entry_path_buf);
                    }
                }
            }
            Err(e) => match e.kind() {
                std::io::ErrorKind::NotADirectory => files.push(path),
                _ => return Err(e),
            },
        }
    }

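The reworked walk above tolerates a selection entry that is not a directory: read_dir fails with ErrorKind::NotADirectory and the path itself is recorded as a single file instead of aborting the walk. A small sketch of that fallback in isolation (hypothetical helper, not the commit's walk_dir):

use std::path::PathBuf;

// Hypothetical helper: return the direct entries of `path` if it is a directory,
// or `path` itself if it turns out to be a regular file.
async fn collect_entry(path: PathBuf) -> std::io::Result<Vec<PathBuf>> {
    match tokio::fs::read_dir(&path).await {
        Ok(mut dir_iter) => {
            let mut files = Vec::new();
            while let Some(entry) = dir_iter.next_entry().await? {
                files.push(entry.path());
            }
            Ok(files)
        }
        // read_dir on a regular file fails with NotADirectory (stable in recent Rust);
        // treat that as "the selection was a single file" rather than an error.
        Err(e) if e.kind() == std::io::ErrorKind::NotADirectory => Ok(vec![path]),
        Err(e) => Err(e),
    }
}
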
@@ -483,14 +524,12 @@ where
/// * `path`: The directory's path
/// * `tx`: The sender for the new ZIP archive's bytes
/// * `buffer_size`: The size of the file read buffer. A large buffer increases both the speed and the memory usage
async fn create_zip<P>(
    path: P,
async fn create_zip(
    prefix: String,
    paths: Vec<PathBuf>,
    tx: std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>,
    buffer_size: usize,
) -> anyhow::Result<()>
where
    P: AsRef<Path>,
{
) -> anyhow::Result<()> {
    let options = zip::write::SimpleFileOptions::default()
        .compression_method(zip::CompressionMethod::Stored)
        .compression_level(None)
@@ -500,12 +539,15 @@ where
    let mut file_buf = vec![0; buffer_size];
    let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx));

    let entries = walk_dir(&path).await?;
    let mut entries = Vec::new();
    for path in &paths {
        entries.append(&mut walk_dir(&path).await?);
    }

    for entry_path_buf in entries {
        let entry_path = entry_path_buf.as_path();
        let entry_str = entry_path
            .strip_prefix(&path)?
            .strip_prefix(&prefix)?
            .to_str()
            .context("Failed to get directory entry name")?;

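Entry names inside the archive are the walked paths made relative to the shared prefix, so the ZIP contains only the sub-tree the user selected. A short illustration of that relativisation with made-up paths, assuming the computed prefix falls on a path-component boundary (Path::strip_prefix compares whole components):

use std::path::Path;

fn main() {
    let prefix = "/data/photos"; // hypothetical lowest common prefix
    for p in ["/data/photos/2024/a.jpg", "/data/photos/b.png"] {
        // The archive entry name is the path with the shared prefix removed.
        let entry = Path::new(p).strip_prefix(prefix).unwrap();
        println!("{}", entry.display()); // "2024/a.jpg" and "b.png"
    }
}
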
@@ -546,3 +588,47 @@ impl Write for ChannelWriter {
        Ok(())
    }
}

enum PathRequest {
    Single(FilePath),
    Multiple {
        lowest_common_prefix: String,
        paths: Vec<FilePath>,
    },
}

impl PathRequest {
    fn from_paths(paths: Vec<FilePath>) -> anyhow::Result<Self> {
        let mut input_paths: Vec<FilePath> = HashSet::<FilePath>::from_iter(paths.into_iter())
            .into_iter()
            .collect();

        if input_paths.len() == 1 {
            return Ok(Self::Single(input_paths.pop().unwrap()));
        }

        let mut lowest_common_prefix = input_paths
            .first()
            .expect("paths to contain at least 1 entry")
            .to_string();

        for path in &input_paths {
            let chars = lowest_common_prefix
                .chars()
                .zip(path.as_str().chars())
                .enumerate();

            for (index, (a, b)) in chars {
                if a != b {
                    lowest_common_prefix = lowest_common_prefix[..index].to_string();
                    break;
                }
            }
        }

        Ok(Self::Multiple {
            lowest_common_prefix,
            paths: input_paths,
        })
    }
}

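PathRequest::from_paths de-duplicates the selection, returns Single for exactly one path, and otherwise derives the longest shared character prefix to act as the archive root. A hypothetical standalone version of that character-wise scan on made-up paths (ASCII assumed, so character and byte indices coincide; zipping the two character streams also caps the result at the shorter path):

// Hypothetical sketch of the character-wise common-prefix idea, not the commit's code.
fn lowest_common_prefix(paths: &[&str]) -> String {
    let mut prefix = paths.first().map(|p| p.to_string()).unwrap_or_default();

    for path in paths {
        // Count how many leading characters the running prefix shares with this path.
        let matching = prefix
            .chars()
            .zip(path.chars())
            .take_while(|(a, b)| a == b)
            .count();
        prefix.truncate(matching);
    }

    prefix
}

fn main() {
    let selection = ["/data/docs/report.pdf", "/data/docs/archive/old.pdf"];
    println!("{}", lowest_common_prefix(&selection)); // prints "/data/docs/"
}
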
@@ -8,7 +8,7 @@ use crate::domain::{
    warren::{
        models::{
            auth_session::requests::FetchAuthSessionResponse,
            file::{AbsoluteFilePath, LsResponse},
            file::{AbsoluteFilePath, AbsoluteFilePathList, LsResponse},
            share::{
                CreateShareResponse, DeleteShareResponse, GetShareResponse, ListSharesResponse,
                ShareCatResponse, ShareLsResponse,
@@ -58,10 +58,10 @@ impl WarrenNotifier for NotifierDebugLogger {
        tracing::debug!("[Notifier] Fetched warren {}", warren.name());
    }

    async fn warren_cat(&self, warren: &Warren, path: &AbsoluteFilePath) {
    async fn warren_cat(&self, warren: &Warren, paths: &AbsoluteFilePathList) {
        tracing::debug!(
            "[Notifier] Fetched file {} in warren {}",
            path,
            "[Notifier] Fetched {} file(s) in warren {}",
            paths.paths().len(),
            warren.name(),
        );
    }
@@ -165,8 +165,8 @@ impl WarrenNotifier for NotifierDebugLogger {

    async fn warren_share_cat(&self, response: &ShareCatResponse) {
        tracing::debug!(
            "[Notifier] Fetched file {} from share {}",
            response.path(),
            "[Notifier] Fetched {} file(s) from share {}",
            response.paths().paths().len(),
            response.share().id(),
        );
    }
@@ -177,8 +177,8 @@ impl FileSystemNotifier for NotifierDebugLogger {
        tracing::debug!("[Notifier] Listed {} file(s)", response.files().len());
    }

    async fn cat(&self, path: &AbsoluteFilePath) {
        tracing::debug!("[Notifier] Fetched file {path}");
    async fn cat(&self, paths: &AbsoluteFilePathList) {
        tracing::debug!("[Notifier] Fetched {} file(s)", paths.paths().len());
    }

    async fn mkdir(&self, path: &AbsoluteFilePath) {
@@ -356,10 +356,11 @@ impl AuthNotifier for NotifierDebugLogger {
        );
    }

    async fn auth_warren_cat(&self, user: &User, warren_id: &Uuid, path: &AbsoluteFilePath) {
    async fn auth_warren_cat(&self, user: &User, warren_id: &Uuid, paths: &AbsoluteFilePathList) {
        tracing::debug!(
            "[Notifier] User {} fetched file {path} in warren {warren_id}",
            "[Notifier] User {} fetched {} file(s) in warren {warren_id}",
            user.id(),
            paths.paths().len(),
        );
    }