664 lines
20 KiB
Rust
664 lines
20 KiB
Rust
use anyhow::{Context as _, anyhow, bail};
|
|
use futures_util::{TryStreamExt, future::join_all};
|
|
use rustix::fs::{Statx, statx};
|
|
use std::{
|
|
collections::HashSet,
|
|
io::Write,
|
|
os::unix::fs::MetadataExt,
|
|
path::{Path, PathBuf},
|
|
};
|
|
use tokio::{
|
|
fs::{self},
|
|
io::{self, AsyncReadExt as _, AsyncWriteExt as _},
|
|
};
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tokio_util::io::ReaderStream;
|
|
|
|
use crate::{
|
|
config::Config,
|
|
domain::warren::{
|
|
models::{
|
|
file::{
|
|
AbsoluteFilePath, CatError, CatRequest, CpError, CpRequest, CpResponse, File,
|
|
FileMimeType, FileName, FilePath, FileStream, FileType, LsError, LsRequest,
|
|
LsResponse, MkdirError, MkdirRequest, MvError, MvRequest, RelativeFilePath,
|
|
RmError, RmRequest, SaveError, SaveRequest, SaveResponse, StatError, StatRequest,
|
|
StatResponse, TouchError, TouchRequest,
|
|
},
|
|
warren::UploadFileStream,
|
|
},
|
|
ports::FileSystemRepository,
|
|
},
|
|
};
|
|
|
|
/// Setting name read by `FileSystemConfig::from_env` to override the largest file `cat` may
/// stream directly.
const MAX_FILE_FETCH_BYTES_KEY: &'static str = "MAX_FILE_FETCH_BYTES";
/// Setting name read by `FileSystemConfig::from_env` to override the read-buffer size used
/// while building ZIP archives.
const ZIP_READ_BUFFER_BYTES_KEY: &'static str = "ZIP_READ_BUFFER_BYTES";
|
|
|
|
/// Raw settings used to build a `FileSystem`.
#[derive(Clone, Debug)]
pub struct FileSystemConfig {
    /// Directory every request path is resolved against.
    base_directory: String,
    /// Largest regular file (in bytes) that may be streamed by `cat`.
    max_file_fetch_bytes: u64,
    /// Size (in bytes) of the buffer used to read files into a ZIP archive.
    zip_read_buffer_bytes: usize,
}
|
|
|
|
impl FileSystemConfig {
|
|
pub fn new(
|
|
base_directory: String,
|
|
max_file_fetch_bytes: u64,
|
|
zip_read_buffer_bytes: usize,
|
|
) -> Self {
|
|
Self {
|
|
base_directory,
|
|
max_file_fetch_bytes,
|
|
zip_read_buffer_bytes,
|
|
}
|
|
}
|
|
|
|
pub fn from_env(serve_dir: String) -> anyhow::Result<Self> {
|
|
// 268435456 bytes = 0.25GB
|
|
let max_file_fetch_bytes: u64 = match Config::load_env(MAX_FILE_FETCH_BYTES_KEY) {
|
|
Ok(value) => value.parse()?,
|
|
Err(_) => 268435456,
|
|
};
|
|
|
|
let zip_read_buffer_bytes: usize = match Config::load_env(ZIP_READ_BUFFER_BYTES_KEY) {
|
|
Ok(value) => value.parse()?,
|
|
Err(_) => 4096,
|
|
};
|
|
|
|
Ok(Self::new(
|
|
serve_dir,
|
|
max_file_fetch_bytes,
|
|
zip_read_buffer_bytes,
|
|
))
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct FileSystem {
|
|
base_directory: FilePath,
|
|
max_file_fetch_bytes: u64,
|
|
zip_read_buffer_bytes: usize,
|
|
}
|
|
|
|
impl FileSystem {
|
|
pub fn new(config: FileSystemConfig) -> anyhow::Result<Self> {
|
|
let file_system = Self {
|
|
base_directory: FilePath::new(&config.base_directory)?,
|
|
max_file_fetch_bytes: config.max_file_fetch_bytes,
|
|
zip_read_buffer_bytes: config.zip_read_buffer_bytes,
|
|
};
|
|
|
|
Ok(file_system)
|
|
}
|
|
|
|
/// Combines `self.base_directory` with the specified path
|
|
///
|
|
/// * `path`: The absolute path (absolute in relation to the base directory)
|
|
fn get_target_path(&self, path: &AbsoluteFilePath) -> FilePath {
|
|
self.base_directory.join(&path.as_relative())
|
|
}
|
|
|
|
async fn get_all_files(
|
|
&self,
|
|
absolute_path: &AbsoluteFilePath,
|
|
include_parent: bool,
|
|
) -> anyhow::Result<LsResponse> {
|
|
let dir_path = self.get_target_path(absolute_path).as_ref().to_path_buf();
|
|
|
|
let mut files = Vec::new();
|
|
|
|
let parent = if include_parent {
|
|
let dir_name = FileName::new(
|
|
&dir_path
|
|
.file_name()
|
|
.context("Failed to get directory name")?
|
|
.to_owned()
|
|
.into_string()
|
|
.ok()
|
|
.context("Failed to get directory name")?,
|
|
)?;
|
|
|
|
Some(File::new(
|
|
dir_name,
|
|
FileType::Directory,
|
|
None,
|
|
get_btime(&dir_path),
|
|
))
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let mut dir = fs::read_dir(&dir_path).await?;
|
|
|
|
while let Ok(Some(entry)) = dir.next_entry().await {
|
|
let name = entry
|
|
.file_name()
|
|
.into_string()
|
|
.ok()
|
|
.context("Failed to get file name")?;
|
|
let file_type = {
|
|
let file_type = entry.file_type().await?;
|
|
|
|
if file_type.is_dir() {
|
|
FileType::Directory
|
|
} else if file_type.is_file() {
|
|
FileType::File
|
|
} else {
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let created_at = get_btime(entry.path());
|
|
|
|
let mime_type = match file_type {
|
|
FileType::File => FileMimeType::from_name(&name),
|
|
_ => None,
|
|
};
|
|
|
|
files.push(File::new(
|
|
FileName::new(&name)?,
|
|
file_type,
|
|
mime_type,
|
|
created_at,
|
|
));
|
|
}
|
|
|
|
Ok(LsResponse::new(files, parent))
|
|
}
|
|
|
|
/// Actually created a directory in the underlying file system
|
|
///
|
|
/// * `path`: The directory's absolute path (absolute not in relation to the root file system but `self.base_directory`)
|
|
async fn mkdir(&self, path: &AbsoluteFilePath) -> io::Result<()> {
|
|
let file_path = self.get_target_path(path);
|
|
|
|
if fs::try_exists(&file_path).await? {
|
|
return Err(io::ErrorKind::AlreadyExists.into());
|
|
}
|
|
|
|
fs::create_dir(&file_path).await
|
|
}
|
|
|
|
/// Actually removes a file or directory from the underlying file system
|
|
///
|
|
/// * `path`: The file's absolute path (absolute not in relation to the root file system but `self.base_directory`)
|
|
/// * `force`: Whether to delete directories that are not empty
|
|
async fn rm(&self, path: &AbsoluteFilePath, force: bool) -> io::Result<()> {
|
|
let file_path = self.get_target_path(&path);
|
|
|
|
if fs::metadata(&file_path).await?.is_file() {
|
|
return fs::remove_file(&file_path).await;
|
|
}
|
|
|
|
if force {
|
|
fs::remove_dir_all(&file_path).await
|
|
} else {
|
|
fs::remove_dir(&file_path).await
|
|
}
|
|
}
|
|
|
|
async fn save(
|
|
&self,
|
|
path: &AbsoluteFilePath,
|
|
stream: &mut UploadFileStream<'_>,
|
|
) -> anyhow::Result<Vec<AbsoluteFilePath>> {
|
|
let base_path = self.get_target_path(path);
|
|
|
|
let mut paths = Vec::new();
|
|
|
|
while let Ok(Some(mut upload_file)) = stream.try_next().await {
|
|
let file_name_as_path = RelativeFilePath::from_str(upload_file.file_name().as_str())?;
|
|
let file_path = base_path.join(&file_name_as_path);
|
|
|
|
let mut file = fs::OpenOptions::new()
|
|
.write(true)
|
|
.create(true)
|
|
.open(&file_path)
|
|
.await?;
|
|
|
|
while let Ok(Some(chunk)) = upload_file.try_next().await {
|
|
file.write(&chunk).await?;
|
|
}
|
|
|
|
paths.push(path.clone().join(&file_name_as_path));
|
|
}
|
|
|
|
Ok(paths)
|
|
}
|
|
|
|
async fn cat(&self, paths: &Vec<AbsoluteFilePath>) -> anyhow::Result<FileStream> {
|
|
let paths: Vec<FilePath> = paths
|
|
.into_iter()
|
|
.map(|path| self.get_target_path(path))
|
|
.collect();
|
|
|
|
let path_request = PathRequest::from_paths(paths)?;
|
|
|
|
fn build_zip_stream(
|
|
prefix: String,
|
|
paths: Vec<FilePath>,
|
|
buffer_size: usize,
|
|
) -> FileStream {
|
|
let (sync_tx, sync_rx) =
|
|
std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
|
|
let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(1024);
|
|
|
|
tokio::task::spawn(create_zip(
|
|
prefix,
|
|
paths
|
|
.into_iter()
|
|
.map(|p| PathBuf::from(p.as_str()))
|
|
.collect(),
|
|
sync_tx,
|
|
buffer_size,
|
|
));
|
|
tokio::task::spawn(async move {
|
|
while let Ok(v) = sync_rx.recv() {
|
|
let _ = tx.send(v).await;
|
|
}
|
|
});
|
|
|
|
FileStream::new(FileType::Directory, ReceiverStream::new(rx))
|
|
}
|
|
|
|
match path_request {
|
|
PathRequest::Single(file_path) => {
|
|
let file = fs::OpenOptions::new()
|
|
.create(false)
|
|
.write(false)
|
|
.read(true)
|
|
.open(&file_path)
|
|
.await?;
|
|
|
|
let metadata = file.metadata().await?;
|
|
|
|
if metadata.is_dir() {
|
|
drop(file);
|
|
|
|
return Ok(build_zip_stream(
|
|
file_path.to_string(),
|
|
vec![file_path],
|
|
self.zip_read_buffer_bytes,
|
|
));
|
|
}
|
|
|
|
if metadata.size() > self.max_file_fetch_bytes {
|
|
bail!("File size exceeds configured limit");
|
|
}
|
|
|
|
let stream = FileStream::new(FileType::File, ReaderStream::new(file));
|
|
|
|
Ok(stream)
|
|
}
|
|
PathRequest::Multiple {
|
|
lowest_common_prefix,
|
|
paths,
|
|
} => Ok(build_zip_stream(
|
|
lowest_common_prefix,
|
|
paths,
|
|
self.zip_read_buffer_bytes,
|
|
)),
|
|
}
|
|
}
|
|
|
|
async fn mv(&self, path: &AbsoluteFilePath, target_path: &AbsoluteFilePath) -> io::Result<()> {
|
|
let current_path = self.get_target_path(path);
|
|
let target_path = self.get_target_path(target_path);
|
|
|
|
if !fs::try_exists(¤t_path).await? {
|
|
return Err(io::ErrorKind::NotFound.into());
|
|
}
|
|
|
|
if fs::try_exists(&target_path).await? {
|
|
return Err(io::ErrorKind::AlreadyExists.into());
|
|
}
|
|
|
|
fs::rename(current_path, &target_path).await
|
|
}
|
|
|
|
async fn touch(&self, path: &AbsoluteFilePath) -> io::Result<()> {
|
|
let path = self.get_target_path(path);
|
|
|
|
fs::OpenOptions::new()
|
|
.create(true)
|
|
.write(true)
|
|
.open(&path)
|
|
.await
|
|
.map(|_| ())
|
|
}
|
|
|
|
async fn cp(
|
|
&self,
|
|
path: AbsoluteFilePath,
|
|
target_path: AbsoluteFilePath,
|
|
) -> io::Result<CpResponse> {
|
|
let fs_current_path = self.get_target_path(&path);
|
|
let fs_target_path = self.get_target_path(&target_path);
|
|
|
|
fs::copy(fs_current_path, fs_target_path).await?;
|
|
|
|
Ok(CpResponse::new(path, target_path))
|
|
}
|
|
|
|
async fn stat(&self, path: AbsoluteFilePath) -> anyhow::Result<File> {
|
|
let target_path = self.get_target_path(&path);
|
|
let fs_path = PathBuf::from(target_path.to_string());
|
|
|
|
let metadata = fs::metadata(&fs_path).await?;
|
|
|
|
let name = {
|
|
let file_name = fs_path
|
|
.clone()
|
|
.file_name()
|
|
.context("Failed to get file name")?
|
|
.to_owned()
|
|
.into_string()
|
|
.ok()
|
|
.context("Failed to get file name")?;
|
|
FileName::new(&file_name)?
|
|
};
|
|
|
|
let file_type = {
|
|
let file_type = metadata.file_type();
|
|
|
|
if file_type.is_dir() {
|
|
FileType::Directory
|
|
} else if file_type.is_file() {
|
|
FileType::File
|
|
} else {
|
|
bail!("Invalid file type");
|
|
}
|
|
};
|
|
|
|
let mime_type = match file_type {
|
|
FileType::File => FileMimeType::from_name(name.as_str()),
|
|
_ => None,
|
|
};
|
|
|
|
let created_at = get_btime(&fs_path);
|
|
|
|
Ok(File::new(name, file_type, mime_type, created_at))
|
|
}
|
|
}
|
|
|
|
impl FileSystemRepository for FileSystem {
|
|
async fn ls(&self, request: LsRequest) -> Result<LsResponse, LsError> {
|
|
let files = self
|
|
.get_all_files(request.path(), request.include_parent())
|
|
.await
|
|
.map_err(|err| {
|
|
anyhow!(err).context(format!(
|
|
"Failed to get the files at path: {}",
|
|
request.path()
|
|
))
|
|
})?;
|
|
|
|
Ok(files)
|
|
}
|
|
|
|
async fn cat(&self, request: CatRequest) -> Result<FileStream, CatError> {
|
|
self.cat(request.paths().paths()).await.map_err(|e| {
|
|
anyhow!("Failed to fetch files {:?}: {e:?}", request.paths().paths()).into()
|
|
})
|
|
}
|
|
|
|
async fn mkdir(&self, request: MkdirRequest) -> Result<(), MkdirError> {
|
|
self.mkdir(request.path())
|
|
.await
|
|
.map_err(|e| match e.kind() {
|
|
std::io::ErrorKind::AlreadyExists => MkdirError::Exists,
|
|
_ => anyhow!("Failed to create directory at path: {}", request.path()).into(),
|
|
})
|
|
}
|
|
|
|
async fn rm(&self, request: RmRequest) -> Vec<Result<AbsoluteFilePath, RmError>> {
|
|
let force = request.force();
|
|
let paths: Vec<AbsoluteFilePath> = request.into_paths().into();
|
|
|
|
async fn _rm(
|
|
fs: &FileSystem,
|
|
path: AbsoluteFilePath,
|
|
force: bool,
|
|
) -> Result<AbsoluteFilePath, RmError> {
|
|
fs.rm(&path, force)
|
|
.await
|
|
.map(|_| path.clone())
|
|
.map_err(|e| match e.kind() {
|
|
std::io::ErrorKind::NotFound => RmError::NotFound(path),
|
|
std::io::ErrorKind::DirectoryNotEmpty => RmError::NotEmpty(path),
|
|
_ => anyhow!("Failed to delete file at {}: {e:?}", path).into(),
|
|
})
|
|
}
|
|
|
|
let results: Vec<Result<AbsoluteFilePath, RmError>> = join_all(
|
|
paths
|
|
.into_iter()
|
|
.map(|path| _rm(&self, path, force))
|
|
.collect::<Vec<_>>(),
|
|
)
|
|
.await;
|
|
|
|
results
|
|
}
|
|
|
|
async fn mv(&self, request: MvRequest) -> Result<(), MvError> {
|
|
self.mv(request.path(), request.target_path())
|
|
.await
|
|
.map_err(|e| match e.kind() {
|
|
std::io::ErrorKind::NotFound => MvError::NotFound,
|
|
_ => anyhow!(
|
|
"Failed to move {} to {}: {e:?}",
|
|
request.path(),
|
|
request.target_path()
|
|
)
|
|
.into(),
|
|
})
|
|
}
|
|
|
|
async fn touch(&self, request: TouchRequest) -> Result<(), TouchError> {
|
|
self.touch(request.path())
|
|
.await
|
|
.map_err(|e| match e.kind() {
|
|
std::io::ErrorKind::NotFound => TouchError::NotFound,
|
|
_ => anyhow!("Failed to touch path {}: {e:?}", request.path()).into(),
|
|
})
|
|
}
|
|
|
|
async fn save(&self, request: SaveRequest<'_>) -> Result<SaveResponse, SaveError> {
|
|
let (path, mut stream) = request.unpack();
|
|
Ok(self.save(&path, &mut stream).await.map(SaveResponse::new)?)
|
|
}
|
|
|
|
async fn cp(&self, request: CpRequest) -> Result<CpResponse, CpError> {
|
|
let (path, target_path) = request.into_paths();
|
|
self.cp(path, target_path)
|
|
.await
|
|
.map_err(|e| CpError::Unknown(e.into()))
|
|
}
|
|
|
|
async fn stat(&self, request: StatRequest) -> Result<StatResponse, StatError> {
|
|
let path = request.into_path();
|
|
Ok(self.stat(path).await.map(StatResponse::new)?)
|
|
}
|
|
}
|
|
|
|
// TODO: Use `DirEntry::metadata` once `target=x86_64-unknown-linux-musl` updates from musl 1.2.3 to 1.2.5
|
|
// https://github.com/rust-lang/rust/pull/142682
|
|
fn get_btime<P>(path: P) -> Option<u64>
|
|
where
|
|
P: rustix::path::Arg,
|
|
{
|
|
get_statx(path)
|
|
.ok()
|
|
.map(|statx| statx.stx_btime.tv_sec as u64)
|
|
}
|
|
|
|
fn get_statx<P>(path: P) -> rustix::io::Result<Statx>
|
|
where
|
|
P: rustix::path::Arg,
|
|
{
|
|
unsafe {
|
|
statx(
|
|
std::os::fd::BorrowedFd::borrow_raw(-100),
|
|
path,
|
|
rustix::fs::AtFlags::empty(),
|
|
rustix::fs::StatxFlags::BTIME,
|
|
)
|
|
}
|
|
}
|
|
|
|
async fn walk_dir<P>(dir: P) -> Result<Vec<PathBuf>, tokio::io::Error>
|
|
where
|
|
P: AsRef<Path>,
|
|
{
|
|
let mut dirs = vec![dir.as_ref().to_owned()];
|
|
let mut files = vec![];
|
|
|
|
while !dirs.is_empty() {
|
|
let path = dirs.remove(0);
|
|
match tokio::fs::read_dir(path.clone()).await {
|
|
Ok(mut dir_iter) => {
|
|
while let Some(entry) = dir_iter.next_entry().await? {
|
|
let entry_path_buf = entry.path();
|
|
|
|
if entry_path_buf.is_dir() {
|
|
dirs.push(entry_path_buf);
|
|
} else {
|
|
files.push(entry_path_buf);
|
|
}
|
|
}
|
|
}
|
|
Err(e) => match e.kind() {
|
|
std::io::ErrorKind::NotADirectory => files.push(path),
|
|
_ => return Err(e),
|
|
},
|
|
}
|
|
}
|
|
|
|
Ok(files)
|
|
}
|
|
|
|
/// Creates a ZIP archive from a directory
|
|
///
|
|
/// * `path`: The directory's path
|
|
/// * `tx`: The sender for the new ZIP archive's bytes
|
|
/// * `buffer_size`: The size of the file read buffer. A large buffer increases both the speed and the memory usage
|
|
async fn create_zip(
|
|
prefix: String,
|
|
paths: Vec<PathBuf>,
|
|
tx: std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>,
|
|
buffer_size: usize,
|
|
) -> anyhow::Result<()> {
|
|
let options = zip::write::SimpleFileOptions::default()
|
|
.compression_method(zip::CompressionMethod::Stored)
|
|
.compression_level(None)
|
|
.large_file(true)
|
|
.unix_permissions(0o644);
|
|
|
|
let mut file_buf = vec![0; buffer_size];
|
|
let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx));
|
|
|
|
let mut entries = Vec::new();
|
|
for path in &paths {
|
|
entries.append(&mut walk_dir(&path).await?);
|
|
}
|
|
|
|
for entry_path_buf in entries {
|
|
let entry_path = entry_path_buf.as_path();
|
|
let entry_str = entry_path
|
|
.strip_prefix(&prefix)?
|
|
.to_str()
|
|
.context("Failed to get directory entry name")?;
|
|
|
|
if entry_path.is_dir() {
|
|
zip.add_directory(entry_str, options)?;
|
|
continue;
|
|
}
|
|
|
|
zip.start_file(entry_str, options)?;
|
|
let mut entry_file = tokio::fs::File::open(entry_path).await?;
|
|
|
|
while let Ok(len) = entry_file.read(&mut file_buf).await
|
|
&& len > 0
|
|
{
|
|
zip.write(&file_buf[..len])?;
|
|
}
|
|
}
|
|
|
|
zip.finish()?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
struct ChannelWriter(std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>);
|
|
|
|
impl Write for ChannelWriter {
|
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
let len = buf.len();
|
|
let data = bytes::Bytes::copy_from_slice(buf);
|
|
|
|
self.0
|
|
.send(Ok(data))
|
|
.map(|_| len)
|
|
.map_err(|_| std::io::ErrorKind::Other.into())
|
|
}
|
|
|
|
fn flush(&mut self) -> std::io::Result<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
enum PathRequest {
|
|
Single(FilePath),
|
|
Multiple {
|
|
lowest_common_prefix: String,
|
|
paths: Vec<FilePath>,
|
|
},
|
|
}
|
|
|
|
impl PathRequest {
|
|
fn from_paths(paths: Vec<FilePath>) -> anyhow::Result<Self> {
|
|
let mut input_paths: Vec<FilePath> = HashSet::<FilePath>::from_iter(paths.into_iter())
|
|
.into_iter()
|
|
.collect();
|
|
|
|
if input_paths.len() == 1 {
|
|
return Ok(Self::Single(input_paths.pop().unwrap()));
|
|
}
|
|
|
|
let mut lowest_common_prefix = input_paths
|
|
.first()
|
|
.expect("paths to contain at least 1 entry")
|
|
.to_string();
|
|
|
|
for path in &input_paths {
|
|
let chars = lowest_common_prefix
|
|
.chars()
|
|
.zip(path.as_str().chars())
|
|
.enumerate();
|
|
|
|
for (index, (a, b)) in chars {
|
|
if a != b {
|
|
lowest_common_prefix = lowest_common_prefix[..index].to_string();
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
let lowest_common_prefix = if let Some(last_slash_index) = lowest_common_prefix.rfind("/")
|
|
&& last_slash_index > 1
|
|
{
|
|
lowest_common_prefix.truncate(last_slash_index);
|
|
lowest_common_prefix
|
|
} else {
|
|
lowest_common_prefix
|
|
};
|
|
|
|
Ok(Self::Multiple {
|
|
lowest_common_prefix,
|
|
paths: input_paths,
|
|
})
|
|
}
|
|
}
|