refactor file system operations

the most notable improvement is that uploads are now using streams so
they no longer require the entire file to be stored in memory
This commit is contained in:
2025-07-28 22:38:28 +02:00
parent bb79ea56f8
commit 23fdd55612
36 changed files with 1567 additions and 2088 deletions

View File

@@ -1,6 +1,7 @@
use std::os::unix::fs::MetadataExt;
use anyhow::{Context, anyhow, bail};
use futures_util::TryStreamExt;
use rustix::fs::statx;
use tokio::{
fs,
@@ -11,12 +12,14 @@ use tokio_util::io::ReaderStream;
use crate::{
config::Config,
domain::warren::{
models::file::{
AbsoluteFilePath, CreateDirectoryError, CreateDirectoryRequest, CreateFileError,
CreateFileRequest, DeleteDirectoryError, DeleteDirectoryRequest, DeleteFileError,
DeleteFileRequest, FetchFileError, FetchFileRequest, File, FileMimeType, FileName,
FilePath, FileStream, FileType, ListFilesError, ListFilesRequest, RenameEntryError,
RenameEntryRequest,
models::{
file::{
AbsoluteFilePath, CatError, CatRequest, File, FileMimeType, FileName, FilePath,
FileStream, FileType, LsError, LsRequest, MkdirError, MkdirRequest, MvError,
MvRequest, RelativeFilePath, RmError, RmRequest, SaveError, SaveRequest,
SaveResponse, TouchError, TouchRequest,
},
warren::UploadFileStream,
},
ports::FileSystemRepository,
},
@@ -129,49 +132,65 @@ impl FileSystem {
/// Actually created a directory in the underlying file system
///
/// * `path`: The directory's absolute path (absolute not in relation to the root file system but `self.base_directory`)
async fn create_dir(&self, path: &AbsoluteFilePath) -> io::Result<FilePath> {
async fn mkdir(&self, path: &AbsoluteFilePath) -> io::Result<()> {
let file_path = self.get_target_path(path);
if fs::try_exists(&file_path).await? {
return Err(io::ErrorKind::AlreadyExists.into());
}
fs::create_dir(&file_path).await?;
Ok(file_path)
fs::create_dir(&file_path).await
}
/// Actually removes a directory from the underlying file system
/// Actually removes a file or directory from the underlying file system
///
/// * `path`: The directory's absolute path (absolute not in relation to the root file system but `self.base_directory`)
/// * `force`: Whether to delete directories that are not empty
async fn remove_dir(&self, path: &AbsoluteFilePath, force: bool) -> io::Result<FilePath> {
async fn rm(&self, path: &AbsoluteFilePath, force: bool) -> io::Result<()> {
let file_path = self.get_target_path(path);
if force {
fs::remove_dir_all(&file_path).await?;
} else {
fs::remove_dir(&file_path).await?;
if fs::metadata(&file_path).await?.is_file() {
return fs::remove_file(&file_path).await;
}
Ok(file_path)
if force {
fs::remove_dir_all(&file_path).await
} else {
fs::remove_dir(&file_path).await
}
}
async fn write_file(&self, path: &AbsoluteFilePath, data: &[u8]) -> anyhow::Result<FilePath> {
let path = self.get_target_path(path);
async fn save(
&self,
path: &AbsoluteFilePath,
stream: &mut UploadFileStream<'_>,
) -> anyhow::Result<Vec<AbsoluteFilePath>> {
let base_path = self.get_target_path(path);
let mut file = fs::OpenOptions::new()
.write(true)
.create(true)
.open(&path)
.await?;
let paths = Vec::new();
file.write_all(data).await?;
while let Ok(Some(mut upload_file)) = stream.try_next().await {
// TODO: Refactor this result question mark chain thing
let file_name_as_path: RelativeFilePath =
FilePath::new(upload_file.file_name().as_str())?.try_into()?;
let file_path = base_path.join(&file_name_as_path);
Ok(path)
let mut file = fs::OpenOptions::new()
.write(true)
.create(true)
.open(&file_path)
.await?;
while let Ok(Some(chunk)) = upload_file.try_next().await {
tracing::info!("Writing chunk (len: {}) to {file_path}", chunk.len());
file.write(&chunk).await?;
}
}
Ok(paths)
}
async fn fetch_file(&self, path: &AbsoluteFilePath) -> anyhow::Result<FileStream> {
async fn cat(&self, path: &AbsoluteFilePath) -> anyhow::Result<FileStream> {
let path = self.get_target_path(path);
let file = fs::OpenOptions::new()
@@ -192,45 +211,30 @@ impl FileSystem {
Ok(stream)
}
/// Actually removes a file from the underlying file system
///
/// * `path`: The file's absolute path (absolute not in relation to the root file system but `self.base_directory`)
async fn remove_file(&self, path: &AbsoluteFilePath) -> anyhow::Result<FilePath> {
let path = self.get_target_path(path);
fs::remove_file(&path).await?;
Ok(path)
}
async fn rename(
&self,
path: &AbsoluteFilePath,
new_name: &FileName,
) -> anyhow::Result<FilePath> {
async fn mv(&self, path: &AbsoluteFilePath, target_path: &AbsoluteFilePath) -> io::Result<()> {
let current_path = self.get_target_path(path);
let target_path = self.get_target_path(target_path);
let new_path = {
let mut c = current_path.to_string();
let last_slash_index = c.rfind('/').unwrap();
c.drain((last_slash_index + 1)..);
c.push_str(new_name.as_str());
FilePath::new(&c)?
};
if fs::try_exists(&new_path).await? {
bail!("File exists");
if !fs::try_exists(&current_path).await? {
return Err(io::ErrorKind::NotFound.into());
}
fs::rename(current_path, &new_path).await?;
if fs::try_exists(&target_path).await? {
return Err(io::ErrorKind::AlreadyExists.into());
}
Ok(new_path)
fs::rename(current_path, &target_path).await
}
async fn touch(&self, path: &AbsoluteFilePath) -> io::Result<()> {
let path = self.get_target_path(path);
tokio::fs::File::create(&path).await.map(|_| ())
}
}
impl FileSystemRepository for FileSystem {
async fn list_files(&self, request: ListFilesRequest) -> Result<Vec<File>, ListFilesError> {
async fn ls(&self, request: LsRequest) -> Result<Vec<File>, LsError> {
let files = self.get_all_files(request.path()).await.map_err(|err| {
anyhow!(err).context(format!(
"Failed to get the files at path: {}",
@@ -241,81 +245,56 @@ impl FileSystemRepository for FileSystem {
Ok(files)
}
async fn create_directory(
&self,
request: CreateDirectoryRequest,
) -> Result<FilePath, CreateDirectoryError> {
match self.create_dir(request.path()).await {
Ok(path) => Ok(path),
Err(e) => match e.kind() {
std::io::ErrorKind::AlreadyExists => Err(CreateDirectoryError::Exists),
_ => Err(anyhow!("Failed to create directory at path: {}", request.path()).into()),
},
}
}
async fn delete_directory(
&self,
request: DeleteDirectoryRequest,
) -> Result<FilePath, DeleteDirectoryError> {
match self.remove_dir(request.path(), request.force()).await {
Ok(deleted_path) => return Ok(deleted_path),
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => Err(DeleteDirectoryError::NotFound),
std::io::ErrorKind::DirectoryNotEmpty => Err(DeleteDirectoryError::NotEmpty),
_ => Err(anyhow!("Failed to delete directory at {}: {e:?}", request.path()).into()),
},
}
}
async fn create_file(&self, request: CreateFileRequest) -> Result<FilePath, CreateFileError> {
let file_path = self
.write_file(request.path(), request.data())
async fn cat(&self, request: CatRequest) -> Result<FileStream, CatError> {
self.cat(request.path())
.await
.map_err(|e| {
anyhow!(
"Failed to write {} byte(s) to path {}: {e:?}",
request.data().len(),
request.path()
)
})?;
Ok(file_path)
.map_err(|e| anyhow!("Failed to fetch file {}: {e:?}", request.path()).into())
}
async fn fetch_file(&self, request: FetchFileRequest) -> Result<FileStream, FetchFileError> {
let contents = self
.fetch_file(request.path())
async fn mkdir(&self, request: MkdirRequest) -> Result<(), MkdirError> {
self.mkdir(request.path())
.await
.map_err(|e| anyhow!("Failed to fetch file {}: {e:?}", request.path()))?;
Ok(contents)
.map_err(|e| match e.kind() {
std::io::ErrorKind::AlreadyExists => MkdirError::Exists,
_ => anyhow!("Failed to create directory at path: {}", request.path()).into(),
})
}
async fn delete_file(&self, request: DeleteFileRequest) -> Result<FilePath, DeleteFileError> {
let deleted_path = self
.remove_file(request.path())
async fn rm(&self, request: RmRequest) -> Result<(), RmError> {
self.rm(request.path(), request.force())
.await
.context(format!("Failed to delete file at {}", request.path()))?;
Ok(deleted_path)
.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => RmError::NotFound,
std::io::ErrorKind::DirectoryNotEmpty => RmError::NotEmpty,
_ => anyhow!("Failed to delete file at {}: {e:?}", request.path()).into(),
})
}
async fn rename_entry(
&self,
request: RenameEntryRequest,
) -> Result<FilePath, RenameEntryError> {
let new_path = self
.rename(request.path(), request.new_name())
async fn mv(&self, request: MvRequest) -> Result<(), MvError> {
self.mv(request.path(), request.target_path())
.await
.map_err(|e| {
anyhow!(
"Failed to rename {} to {}: {e:?}",
.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => MvError::NotFound,
_ => anyhow!(
"Failed to move {} to {}: {e:?}",
request.path(),
request.new_name()
request.target_path()
)
})?;
.into(),
})
}
Ok(new_path)
async fn touch(&self, request: TouchRequest) -> Result<(), TouchError> {
self.touch(request.path())
.await
.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => TouchError::NotFound,
_ => anyhow!("Failed to touch path {}: {e:?}", request.path()).into(),
})
}
async fn save(&self, request: SaveRequest<'_>) -> Result<SaveResponse, SaveError> {
let (path, mut stream) = request.unpack();
Ok(self.save(&path, &mut stream).await.map(SaveResponse::new)?)
}
}