Files
warren/backend/src/lib/outbound/file_system.rs
2025-08-30 02:42:38 +02:00

524 lines
15 KiB
Rust

use anyhow::{Context as _, anyhow, bail};
use futures_util::TryStreamExt;
use rustix::fs::{Statx, statx};
use std::{
io::Write,
os::unix::fs::MetadataExt,
path::{Path, PathBuf},
};
use tokio::{
fs::{self},
io::{self, AsyncReadExt as _, AsyncWriteExt as _},
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use crate::{
config::Config,
domain::warren::{
models::{
file::{
AbsoluteFilePath, CatError, CatRequest, CpError, CpRequest, CpResponse, File,
FileMimeType, FileName, FilePath, FileStream, FileType, LsError, LsRequest,
LsResponse, MkdirError, MkdirRequest, MvError, MvRequest, RelativeFilePath,
RmError, RmRequest, SaveError, SaveRequest, SaveResponse, StatError, StatRequest,
StatResponse, TouchError, TouchRequest,
},
warren::UploadFileStream,
},
ports::FileSystemRepository,
},
};
/// Name of the environment variable that overrides the single-file fetch limit.
const MAX_FILE_FETCH_BYTES: &str = "MAX_FILE_FETCH_BYTES";
/// Fetch limit used when `MAX_FILE_FETCH_BYTES` is not available: 256 MiB (268435456 bytes).
const DEFAULT_MAX_FILE_FETCH_BYTES: u64 = 256 * 1024 * 1024;

/// Settings used to construct a `FileSystem`.
#[derive(Debug, Clone)]
pub struct FileSystemConfig {
    /// Directory that every request path is resolved against.
    base_directory: String,
    /// Maximum size, in bytes, of a single regular file that may be fetched.
    max_file_fetch_bytes: u64,
}

impl FileSystemConfig {
    /// Creates a configuration from explicit values.
    pub fn new(base_directory: String, max_file_fetch_bytes: u64) -> Self {
        Self {
            base_directory,
            max_file_fetch_bytes,
        }
    }

    /// Creates a configuration serving `serve_dir`, reading the fetch limit from the
    /// `MAX_FILE_FETCH_BYTES` environment setting.
    ///
    /// If `Config::load_env` returns an error for the setting, the default of 256 MiB is used.
    ///
    /// # Errors
    ///
    /// Fails if the setting is present but is not a valid `u64`. The error names the
    /// variable and the offending value.
    pub fn from_env(serve_dir: String) -> anyhow::Result<Self> {
        let max_file_fetch_bytes = match Config::load_env(MAX_FILE_FETCH_BYTES) {
            Ok(value) => value.parse::<u64>().with_context(|| {
                format!("Invalid value for {MAX_FILE_FETCH_BYTES}: {value:?}")
            })?,
            Err(_) => DEFAULT_MAX_FILE_FETCH_BYTES,
        };
        Ok(Self::new(serve_dir, max_file_fetch_bytes))
    }
}
/// [`FileSystemRepository`] implementation backed by the local file system.
///
/// All request paths are resolved relative to `base_directory`.
#[derive(Debug, Clone)]
pub struct FileSystem {
    /// Directory that every request path is resolved against.
    base_directory: FilePath,
    /// Maximum size, in bytes, of a regular file that `cat` will stream.
    max_file_fetch_bytes: u64,
}

impl FileSystem {
    /// Builds a [`FileSystem`] from `config`.
    ///
    /// # Errors
    ///
    /// Fails if `config.base_directory` is rejected by `FilePath::new`.
    pub fn new(config: FileSystemConfig) -> anyhow::Result<Self> {
        Ok(Self {
            base_directory: FilePath::new(&config.base_directory)?,
            max_file_fetch_bytes: config.max_file_fetch_bytes,
        })
    }

    /// Combines `self.base_directory` with the specified path.
    ///
    /// * `path`: The absolute path (absolute in relation to the base directory)
    fn get_target_path(&self, path: &AbsoluteFilePath) -> FilePath {
        self.base_directory.join(&path.as_relative())
    }

    /// Lists the directories and regular files directly inside `absolute_path`.
    ///
    /// Entries that are neither a directory nor a regular file (for example symlinks,
    /// which `DirEntry::file_type` does not follow) are skipped. When `include_parent` is
    /// set, the listed directory itself is also returned as the response's parent entry.
    ///
    /// # Errors
    ///
    /// Fails if the directory or any of its entries cannot be read, or if a name is not
    /// valid UTF-8 or is rejected by `FileName::new`.
    async fn get_all_files(
        &self,
        absolute_path: &AbsoluteFilePath,
        include_parent: bool,
    ) -> anyhow::Result<LsResponse> {
        let dir_path = self.get_target_path(absolute_path).as_ref().to_path_buf();
        let parent = if include_parent {
            let dir_name = FileName::new(
                &dir_path
                    .file_name()
                    .context("Failed to get directory name")?
                    .to_owned()
                    .into_string()
                    .ok()
                    .context("Failed to get directory name")?,
            )?;
            Some(File::new(
                dir_name,
                FileType::Directory,
                None,
                get_btime(&dir_path),
            ))
        } else {
            None
        };

        let mut files = Vec::new();
        let mut dir = fs::read_dir(&dir_path).await?;
        // Propagate read errors instead of silently returning a truncated listing.
        while let Some(entry) = dir.next_entry().await? {
            let name = entry
                .file_name()
                .into_string()
                .ok()
                .context("Failed to get file name")?;
            let file_type = {
                let file_type = entry.file_type().await?;
                if file_type.is_dir() {
                    FileType::Directory
                } else if file_type.is_file() {
                    FileType::File
                } else {
                    continue;
                }
            };
            let created_at = get_btime(entry.path());
            let mime_type = match file_type {
                FileType::File => FileMimeType::from_name(&name),
                _ => None,
            };
            files.push(File::new(
                FileName::new(&name)?,
                file_type,
                mime_type,
                created_at,
            ));
        }
        Ok(LsResponse::new(files, parent))
    }

    /// Creates a directory in the underlying file system.
    ///
    /// * `path`: The directory's absolute path (absolute not in relation to the root file system but `self.base_directory`)
    ///
    /// Fails with `io::ErrorKind::AlreadyExists` if something already exists at `path`.
    async fn mkdir(&self, path: &AbsoluteFilePath) -> io::Result<()> {
        let file_path = self.get_target_path(path);
        // `create_dir` itself reports `AlreadyExists` for an existing path, so a separate
        // (and racy) existence check beforehand is unnecessary.
        fs::create_dir(&file_path).await
    }

    /// Removes a file or directory from the underlying file system.
    ///
    /// * `path`: The entry's absolute path (absolute not in relation to the root file system but `self.base_directory`)
    /// * `force`: Whether to delete directories that are not empty
    async fn rm(&self, path: &AbsoluteFilePath, force: bool) -> io::Result<()> {
        let file_path = self.get_target_path(path);
        // `symlink_metadata` does not follow symlinks. A link, even one pointing at a
        // directory, is therefore removed as the link itself rather than treated as its target.
        if !fs::symlink_metadata(&file_path).await?.is_dir() {
            return fs::remove_file(&file_path).await;
        }
        if force {
            fs::remove_dir_all(&file_path).await
        } else {
            fs::remove_dir(&file_path).await
        }
    }

    /// Writes every uploaded file from `stream` into the directory `path`.
    ///
    /// Existing files with the same name are overwritten (truncated first). Returns the
    /// absolute paths of the saved files.
    ///
    /// # Errors
    ///
    /// Fails if the upload stream yields an error, or if a file cannot be opened, written
    /// or flushed. Files written before the failure are left on disk.
    async fn save(
        &self,
        path: &AbsoluteFilePath,
        stream: &mut UploadFileStream<'_>,
    ) -> anyhow::Result<Vec<AbsoluteFilePath>> {
        let base_path = self.get_target_path(path);
        let mut paths = Vec::new();
        loop {
            let mut upload_file = match stream.try_next().await {
                Ok(Some(upload_file)) => upload_file,
                Ok(None) => break,
                // A broken upload must not be reported as a successful save.
                Err(_) => bail!("Failed to read the next uploaded file for {path}"),
            };
            let file_name_as_path = RelativeFilePath::from_str(upload_file.file_name().as_str())?;
            let file_path = base_path.join(&file_name_as_path);
            // `truncate` ensures overwriting a longer file leaves no stale trailing bytes.
            let mut file = fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&file_path)
                .await?;
            loop {
                let chunk = match upload_file.try_next().await {
                    Ok(Some(chunk)) => chunk,
                    Ok(None) => break,
                    Err(_) => bail!("Failed to read uploaded data for a file in {path}"),
                };
                // `write` may accept only part of the buffer; `write_all` writes the whole chunk.
                file.write_all(&chunk).await?;
            }
            // Tokio files complete writes in the background. Flushing waits for them and
            // surfaces their errors here instead of losing them on drop.
            file.flush().await?;
            paths.push(path.clone().join(&file_name_as_path));
        }
        Ok(paths)
    }

    /// Streams the entry at `path`.
    ///
    /// Regular files are streamed as-is and must not exceed `self.max_file_fetch_bytes`.
    /// Directories are streamed as a ZIP archive built on the fly by `create_zip`. The size
    /// limit is not applied to directory archives.
    async fn cat(&self, path: &AbsoluteFilePath) -> anyhow::Result<FileStream> {
        let path = self.get_target_path(path);
        let file = fs::OpenOptions::new().read(true).open(&path).await?;
        let metadata = file.metadata().await?;
        if metadata.is_dir() {
            drop(file);
            // `create_zip` writes through a synchronous `Write` adapter and produces chunks
            // on a std channel. `std::sync::mpsc::Receiver::recv` blocks, so the bridge to
            // the async channel runs on a blocking thread rather than a runtime worker.
            let (sync_tx, sync_rx) =
                std::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>();
            let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(1024);
            let error_tx = sync_tx.clone();
            tokio::task::spawn(async move {
                // Report archive failures to the consumer instead of silently ending the
                // stream with a truncated archive.
                if let Err(err) = create_zip(path, sync_tx).await {
                    let _ = error_tx.send(Err(std::io::Error::other(format!("{err:#}"))));
                }
            });
            tokio::task::spawn_blocking(move || {
                while let Ok(chunk) = sync_rx.recv() {
                    // Once the consumer is gone, stop forwarding. Dropping `sync_rx` then
                    // makes further archive writes fail, which ends the zipping work.
                    if tx.blocking_send(chunk).is_err() {
                        break;
                    }
                }
            });
            return Ok(FileStream::new(FileType::Directory, ReceiverStream::new(rx)));
        }
        let file_size = metadata.size();
        if file_size > self.max_file_fetch_bytes {
            bail!("File size exceeds configured limit");
        }
        Ok(FileStream::new(FileType::File, ReaderStream::new(file)))
    }

    /// Renames `path` to `target_path`.
    ///
    /// Fails with `NotFound` if the source is missing, and with `AlreadyExists` if the
    /// target exists. `rename` would otherwise replace it. The checks and the rename are
    /// not atomic.
    async fn mv(&self, path: &AbsoluteFilePath, target_path: &AbsoluteFilePath) -> io::Result<()> {
        let current_path = self.get_target_path(path);
        let target_path = self.get_target_path(target_path);
        if !fs::try_exists(&current_path).await? {
            return Err(io::ErrorKind::NotFound.into());
        }
        if fs::try_exists(&target_path).await? {
            return Err(io::ErrorKind::AlreadyExists.into());
        }
        fs::rename(current_path, &target_path).await
    }

    /// Creates an empty file at `path` if none exists. An existing file's contents are left
    /// unchanged.
    async fn touch(&self, path: &AbsoluteFilePath) -> io::Result<()> {
        let path = self.get_target_path(path);
        fs::OpenOptions::new()
            .create(true)
            .write(true)
            // Never clobber an existing file's contents.
            .truncate(false)
            .open(&path)
            .await
            .map(|_| ())
    }

    /// Copies the file at `path` to `target_path`, overwriting any existing target file.
    async fn cp(
        &self,
        path: AbsoluteFilePath,
        target_path: AbsoluteFilePath,
    ) -> io::Result<CpResponse> {
        let fs_current_path = self.get_target_path(&path);
        let fs_target_path = self.get_target_path(&target_path);
        fs::copy(fs_current_path, fs_target_path).await?;
        Ok(CpResponse::new(path, target_path))
    }

    /// Returns name, type, MIME type and creation time of the entry at `path`.
    ///
    /// # Errors
    ///
    /// Fails if the entry cannot be stat'ed, its name is not valid UTF-8, or it is neither a
    /// directory nor a regular file (after following symlinks).
    async fn stat(&self, path: AbsoluteFilePath) -> anyhow::Result<File> {
        // Convert directly rather than round-tripping through `to_string()`.
        let fs_path = self.get_target_path(&path).as_ref().to_path_buf();
        let metadata = fs::metadata(&fs_path).await?;
        let name = FileName::new(
            &fs_path
                .file_name()
                .context("Failed to get file name")?
                .to_owned()
                .into_string()
                .ok()
                .context("Failed to get file name")?,
        )?;
        let file_type = {
            let file_type = metadata.file_type();
            if file_type.is_dir() {
                FileType::Directory
            } else if file_type.is_file() {
                FileType::File
            } else {
                bail!("Invalid file type");
            }
        };
        let mime_type = match file_type {
            FileType::File => FileMimeType::from_name(name.as_str()),
            _ => None,
        };
        let created_at = get_btime(&fs_path);
        Ok(File::new(name, file_type, mime_type, created_at))
    }
}
/// Adapts the inherent [`FileSystem`] operations to the [`FileSystemRepository`] port.
///
/// Known `io::ErrorKind`s are mapped to dedicated domain error variants. Everything else
/// is wrapped in an `anyhow` error that names the affected path and includes the cause.
impl FileSystemRepository for FileSystem {
    async fn ls(&self, request: LsRequest) -> Result<LsResponse, LsError> {
        let files = self
            .get_all_files(request.path(), request.include_parent())
            .await
            .map_err(|err| {
                anyhow!(err).context(format!(
                    "Failed to get the files at path: {}",
                    request.path()
                ))
            })?;
        Ok(files)
    }

    async fn cat(&self, request: CatRequest) -> Result<FileStream, CatError> {
        self.cat(request.path())
            .await
            .map_err(|e| anyhow!("Failed to fetch file {}: {e:?}", request.path()).into())
    }

    async fn mkdir(&self, request: MkdirRequest) -> Result<(), MkdirError> {
        self.mkdir(request.path())
            .await
            .map_err(|e| match e.kind() {
                std::io::ErrorKind::AlreadyExists => MkdirError::Exists,
                // Keep the underlying I/O error, as the other operations do.
                _ => anyhow!("Failed to create directory at {}: {e:?}", request.path()).into(),
            })
    }

    async fn rm(&self, request: RmRequest) -> Result<(), RmError> {
        self.rm(request.path(), request.force())
            .await
            .map_err(|e| match e.kind() {
                std::io::ErrorKind::NotFound => RmError::NotFound,
                std::io::ErrorKind::DirectoryNotEmpty => RmError::NotEmpty,
                _ => anyhow!("Failed to delete file at {}: {e:?}", request.path()).into(),
            })
    }

    async fn mv(&self, request: MvRequest) -> Result<(), MvError> {
        self.mv(request.path(), request.target_path())
            .await
            .map_err(|e| match e.kind() {
                std::io::ErrorKind::NotFound => MvError::NotFound,
                _ => anyhow!(
                    "Failed to move {} to {}: {e:?}",
                    request.path(),
                    request.target_path()
                )
                .into(),
            })
    }

    async fn touch(&self, request: TouchRequest) -> Result<(), TouchError> {
        self.touch(request.path())
            .await
            .map_err(|e| match e.kind() {
                std::io::ErrorKind::NotFound => TouchError::NotFound,
                _ => anyhow!("Failed to touch path {}: {e:?}", request.path()).into(),
            })
    }

    async fn save(&self, request: SaveRequest<'_>) -> Result<SaveResponse, SaveError> {
        let (path, mut stream) = request.unpack();
        Ok(self.save(&path, &mut stream).await.map(SaveResponse::new)?)
    }

    async fn cp(&self, request: CpRequest) -> Result<CpResponse, CpError> {
        let (path, target_path) = request.into_paths();
        self.cp(path, target_path)
            .await
            .map_err(|e| CpError::Unknown(e.into()))
    }

    async fn stat(&self, request: StatRequest) -> Result<StatResponse, StatError> {
        let path = request.into_path();
        Ok(self.stat(path).await.map(StatResponse::new)?)
    }
}
// TODO: Use `DirEntry::metadata` once `target=x86_64-unknown-linux-musl` updates from musl 1.2.3 to 1.2.5
// https://github.com/rust-lang/rust/pull/142682
/// Returns the birth (creation) time of `path` in whole seconds since the Unix epoch.
///
/// Returns `None` in three cases. First, when `statx` fails. Second, when the file system
/// does not report a birth time: the kernel then clears `STATX_BTIME` in `stx_mask`, and
/// `stx_btime` must be ignored. Third, when the timestamp lies before the epoch and cannot
/// be represented as `u64`.
fn get_btime<P>(path: P) -> Option<u64>
where
    P: rustix::path::Arg,
{
    let statx = get_statx(path).ok()?;
    if statx.stx_mask & rustix::fs::StatxFlags::BTIME.bits() == 0 {
        return None;
    }
    u64::try_from(statx.stx_btime.tv_sec).ok()
}
/// Runs `statx(2)` on `path`, requesting the birth-time field (`STATX_BTIME`).
///
/// Relative paths are resolved against the current working directory. Symlinks are
/// followed, because no `AT_SYMLINK_NOFOLLOW` flag is passed.
fn get_statx<P>(path: P) -> rustix::io::Result<Statx>
where
    P: rustix::path::Arg,
{
    // `rustix::fs::CWD` is the safe handle for `AT_FDCWD`. It avoids an `unsafe`
    // `BorrowedFd::borrow_raw` on the hard-coded Linux value `-100`.
    statx(
        rustix::fs::CWD,
        path,
        rustix::fs::AtFlags::empty(),
        rustix::fs::StatxFlags::BTIME,
    )
}
/// Recursively collects the regular files below `dir`, breadth-first.
///
/// Only directories and regular files are considered. Symbolic links and special files
/// (FIFOs, sockets, devices) are skipped. Not following links rules out endless traversal
/// of symlink cycles and escaping `dir` through a link. Skipping special files avoids later
/// opening a FIFO, which would block indefinitely. Directories themselves are not part of
/// the result.
///
/// # Errors
///
/// Returns the first I/O error hit while reading a directory or an entry's type.
async fn walk_dir<P>(dir: P) -> Result<Vec<PathBuf>, tokio::io::Error>
where
    P: AsRef<Path>,
{
    // A queue keeps the original breadth-first order with O(1) pops from the front.
    let mut pending = std::collections::VecDeque::from([dir.as_ref().to_path_buf()]);
    let mut files = Vec::new();
    while let Some(current) = pending.pop_front() {
        let mut entries = tokio::fs::read_dir(current).await?;
        while let Some(entry) = entries.next_entry().await? {
            // `DirEntry::file_type` does not follow symlinks, unlike `Path::is_dir`, and it
            // avoids a blocking `stat` call inside async code.
            let file_type = entry.file_type().await?;
            if file_type.is_dir() {
                pending.push_back(entry.path());
            } else if file_type.is_file() {
                files.push(entry.path());
            }
        }
    }
    Ok(files)
}
/// Writes a ZIP archive (entries stored, uncompressed) of the files found under `path` to `tx`.
///
/// Entry names are the file paths relative to `path`. File contents are copied in
/// fixed-size chunks, so memory use does not grow with the size of the largest file.
///
/// # Errors
///
/// Fails if walking the directory, opening or reading a file, or writing the archive
/// fails. Writing fails once the receiving end of `tx` has been dropped.
async fn create_zip<P>(
    path: P,
    tx: std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>,
) -> anyhow::Result<()>
where
    P: AsRef<Path>,
{
    /// Size of the buffer used to copy each file into the archive.
    const CHUNK_SIZE: usize = 64 * 1024;

    let options = zip::write::SimpleFileOptions::default()
        .compression_method(zip::CompressionMethod::Stored)
        .compression_level(None)
        .large_file(true)
        .unix_permissions(0o644);
    let mut chunk = vec![0u8; CHUNK_SIZE];
    let mut zip = zip::write::ZipWriter::new_stream(ChannelWriter(tx));
    let entries = walk_dir(&path).await?;
    for entry_path_buf in entries {
        let entry_path = entry_path_buf.as_path();
        let entry_str = entry_path
            .strip_prefix(&path)?
            .to_str()
            .context("Failed to get directory entry name")?;
        // Async metadata lookup instead of a blocking `Path::is_dir` on the runtime thread.
        if tokio::fs::metadata(entry_path).await?.is_dir() {
            zip.add_directory(entry_str, options)?;
            continue;
        }
        zip.start_file(entry_str, options)?;
        let mut entry_file = tokio::fs::File::open(entry_path).await?;
        // Stream the file instead of buffering it whole: entries may be very large.
        loop {
            let bytes_read = entry_file.read(&mut chunk).await?;
            if bytes_read == 0 {
                break;
            }
            zip.write_all(&chunk[..bytes_read])?;
        }
    }
    zip.finish()?;
    Ok(())
}
/// Synchronous [`Write`] sink that forwards every write, copied into a [`bytes::Bytes`]
/// chunk, to a std `mpsc` channel.
///
/// `Sender::send` on an unbounded std channel never blocks. It fails only when the
/// receiver has been dropped, which is reported as `ErrorKind::BrokenPipe`.
struct ChannelWriter(std::sync::mpsc::Sender<Result<bytes::Bytes, std::io::Error>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Don't push empty chunks into the output stream.
        if buf.is_empty() {
            return Ok(0);
        }
        self.0
            .send(Ok(bytes::Bytes::copy_from_slice(buf)))
            .map(|()| buf.len())
            .map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel receiver dropped")
            })
    }

    /// Data is handed to the channel on every `write`, so there is nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}