use std::os::unix::fs::MetadataExt; use anyhow::{Context, anyhow, bail}; use futures_util::TryStreamExt; use rustix::fs::statx; use tokio::{ fs, io::{self, AsyncWriteExt as _}, }; use tokio_util::io::ReaderStream; use crate::{ config::Config, domain::warren::{ models::{ file::{ AbsoluteFilePath, CatError, CatRequest, File, FileMimeType, FileName, FilePath, FileStream, FileType, LsError, LsRequest, LsResponse, MkdirError, MkdirRequest, MvError, MvRequest, RelativeFilePath, RmError, RmRequest, SaveError, SaveRequest, SaveResponse, TouchError, TouchRequest, }, warren::UploadFileStream, }, ports::FileSystemRepository, }, }; const MAX_FILE_FETCH_BYTES: &str = "MAX_FILE_FETCH_BYTES"; #[derive(Debug, Clone)] pub struct FileSystemConfig { base_directory: String, max_file_fetch_bytes: u64, } impl FileSystemConfig { pub fn new(base_directory: String, max_file_fetch_bytes: u64) -> Self { Self { base_directory, max_file_fetch_bytes, } } pub fn from_env(serve_dir: String) -> anyhow::Result { // 268435456 bytes = 0.25GB let max_file_fetch_bytes: u64 = match Config::load_env(MAX_FILE_FETCH_BYTES) { Ok(value) => value.parse()?, Err(_) => 268435456, }; Ok(Self::new(serve_dir, max_file_fetch_bytes)) } } #[derive(Debug, Clone)] pub struct FileSystem { base_directory: FilePath, max_file_fetch_bytes: u64, } impl FileSystem { pub fn new(config: FileSystemConfig) -> anyhow::Result { let file_system = Self { base_directory: FilePath::new(&config.base_directory)?, max_file_fetch_bytes: config.max_file_fetch_bytes, }; Ok(file_system) } /// Combines `self.base_directory` with the specified path /// /// * `path`: The absolute path (absolute in relation to the base directory) fn get_target_path(&self, path: &AbsoluteFilePath) -> FilePath { self.base_directory.join(&path.as_relative()) } async fn get_all_files( &self, absolute_path: &AbsoluteFilePath, include_parent: bool, ) -> anyhow::Result { let dir_path = self.get_target_path(absolute_path).as_ref().to_path_buf(); let mut files = Vec::new(); let parent = if include_parent { let dir_name = FileName::new( &dir_path .file_name() .context("Failed to get directory name")? .to_owned() .into_string() .ok() .context("Failed to get directory name")?, )?; Some(File::new( dir_name, FileType::Directory, None, get_btime(&dir_path), )) } else { None }; let mut dir = fs::read_dir(&dir_path).await?; while let Ok(Some(entry)) = dir.next_entry().await { let name = entry .file_name() .into_string() .ok() .context("Failed to get file name")?; let file_type = { let file_type = entry.file_type().await?; if file_type.is_dir() { FileType::Directory } else if file_type.is_file() { FileType::File } else { continue; } }; let created_at = get_btime(entry.path()); let mime_type = match file_type { FileType::File => FileMimeType::from_name(&name), _ => None, }; files.push(File::new( FileName::new(&name)?, file_type, mime_type, created_at, )); } Ok(LsResponse::new(files, parent)) } /// Actually created a directory in the underlying file system /// /// * `path`: The directory's absolute path (absolute not in relation to the root file system but `self.base_directory`) async fn mkdir(&self, path: &AbsoluteFilePath) -> io::Result<()> { let file_path = self.get_target_path(path); if fs::try_exists(&file_path).await? { return Err(io::ErrorKind::AlreadyExists.into()); } fs::create_dir(&file_path).await } /// Actually removes a file or directory from the underlying file system /// /// * `path`: The directory's absolute path (absolute not in relation to the root file system but `self.base_directory`) /// * `force`: Whether to delete directories that are not empty async fn rm(&self, path: &AbsoluteFilePath, force: bool) -> io::Result<()> { let file_path = self.get_target_path(path); if fs::metadata(&file_path).await?.is_file() { return fs::remove_file(&file_path).await; } if force { fs::remove_dir_all(&file_path).await } else { fs::remove_dir(&file_path).await } } async fn save( &self, path: &AbsoluteFilePath, stream: &mut UploadFileStream<'_>, ) -> anyhow::Result> { let base_path = self.get_target_path(path); let mut paths = Vec::new(); while let Ok(Some(mut upload_file)) = stream.try_next().await { let file_name_as_path = RelativeFilePath::from_str(upload_file.file_name().as_str())?; let file_path = base_path.join(&file_name_as_path); let mut file = fs::OpenOptions::new() .write(true) .create(true) .open(&file_path) .await?; while let Ok(Some(chunk)) = upload_file.try_next().await { file.write(&chunk).await?; } paths.push(path.clone().join(&file_name_as_path)); } Ok(paths) } async fn cat(&self, path: &AbsoluteFilePath) -> anyhow::Result { let path = self.get_target_path(path); let file = fs::OpenOptions::new() .create(false) .write(false) .read(true) .open(&path) .await?; let file_size = file.metadata().await?.size(); if file_size > self.max_file_fetch_bytes { bail!("File size exceeds configured limit"); } let stream = FileStream::new(ReaderStream::new(file)); Ok(stream) } async fn mv(&self, path: &AbsoluteFilePath, target_path: &AbsoluteFilePath) -> io::Result<()> { let current_path = self.get_target_path(path); let target_path = self.get_target_path(target_path); if !fs::try_exists(¤t_path).await? { return Err(io::ErrorKind::NotFound.into()); } if fs::try_exists(&target_path).await? { return Err(io::ErrorKind::AlreadyExists.into()); } fs::rename(current_path, &target_path).await } async fn touch(&self, path: &AbsoluteFilePath) -> io::Result<()> { let path = self.get_target_path(path); tokio::fs::File::create(&path).await.map(|_| ()) } } impl FileSystemRepository for FileSystem { async fn ls(&self, request: LsRequest) -> Result { let files = self .get_all_files(request.path(), request.include_parent()) .await .map_err(|err| { anyhow!(err).context(format!( "Failed to get the files at path: {}", request.path() )) })?; Ok(files) } async fn cat(&self, request: CatRequest) -> Result { self.cat(request.path()) .await .map_err(|e| anyhow!("Failed to fetch file {}: {e:?}", request.path()).into()) } async fn mkdir(&self, request: MkdirRequest) -> Result<(), MkdirError> { self.mkdir(request.path()) .await .map_err(|e| match e.kind() { std::io::ErrorKind::AlreadyExists => MkdirError::Exists, _ => anyhow!("Failed to create directory at path: {}", request.path()).into(), }) } async fn rm(&self, request: RmRequest) -> Result<(), RmError> { self.rm(request.path(), request.force()) .await .map_err(|e| match e.kind() { std::io::ErrorKind::NotFound => RmError::NotFound, std::io::ErrorKind::DirectoryNotEmpty => RmError::NotEmpty, _ => anyhow!("Failed to delete file at {}: {e:?}", request.path()).into(), }) } async fn mv(&self, request: MvRequest) -> Result<(), MvError> { self.mv(request.path(), request.target_path()) .await .map_err(|e| match e.kind() { std::io::ErrorKind::NotFound => MvError::NotFound, _ => anyhow!( "Failed to move {} to {}: {e:?}", request.path(), request.target_path() ) .into(), }) } async fn touch(&self, request: TouchRequest) -> Result<(), TouchError> { self.touch(request.path()) .await .map_err(|e| match e.kind() { std::io::ErrorKind::NotFound => TouchError::NotFound, _ => anyhow!("Failed to touch path {}: {e:?}", request.path()).into(), }) } async fn save(&self, request: SaveRequest<'_>) -> Result { let (path, mut stream) = request.unpack(); Ok(self.save(&path, &mut stream).await.map(SaveResponse::new)?) } } // TODO: Use `DirEntry::metadata` once `target=x86_64-unknown-linux-musl` updates from musl 1.2.3 to 1.2.5 // https://github.com/rust-lang/rust/pull/142682 fn get_btime

(path: P) -> Option where P: rustix::path::Arg, { unsafe { statx( std::os::fd::BorrowedFd::borrow_raw(-100), path, rustix::fs::AtFlags::empty(), rustix::fs::StatxFlags::BTIME, ) } .ok() .map(|statx| statx.stx_btime.tv_sec as u64) }