basic file sharing

This commit is contained in:
2025-08-29 15:32:23 +02:00
parent c8b52a5b3b
commit 284d805590
84 changed files with 3969 additions and 375 deletions

View File

@@ -1,8 +1,8 @@
use std::os::unix::fs::MetadataExt;
use std::{os::unix::fs::MetadataExt, path::PathBuf};
use anyhow::{Context, anyhow, bail};
use futures_util::TryStreamExt;
use rustix::fs::statx;
use rustix::fs::{Statx, statx};
use tokio::{
fs,
io::{self, AsyncWriteExt as _},
@@ -17,7 +17,8 @@ use crate::{
AbsoluteFilePath, CatError, CatRequest, CpError, CpRequest, CpResponse, File,
FileMimeType, FileName, FilePath, FileStream, FileType, LsError, LsRequest,
LsResponse, MkdirError, MkdirRequest, MvError, MvRequest, RelativeFilePath,
RmError, RmRequest, SaveError, SaveRequest, SaveResponse, TouchError, TouchRequest,
RmError, RmRequest, SaveError, SaveRequest, SaveResponse, StatError, StatRequest,
StatResponse, TouchError, TouchRequest,
},
warren::UploadFileStream,
},
@@ -242,7 +243,12 @@ impl FileSystem {
async fn touch(&self, path: &AbsoluteFilePath) -> io::Result<()> {
let path = self.get_target_path(path);
fs::File::create(&path).await.map(|_| ())
fs::OpenOptions::new()
.create(true)
.write(true)
.open(&path)
.await
.map(|_| ())
}
async fn cp(
@@ -257,6 +263,46 @@ impl FileSystem {
Ok(CpResponse::new(path, target_path))
}
async fn stat(&self, path: AbsoluteFilePath) -> anyhow::Result<File> {
let target_path = self.get_target_path(&path);
let fs_path = PathBuf::from(target_path.to_string());
let metadata = fs::metadata(&fs_path).await?;
let name = {
let file_name = fs_path
.clone()
.file_name()
.context("Failed to get file name")?
.to_owned()
.into_string()
.ok()
.context("Failed to get file name")?;
FileName::new(&file_name)?
};
let file_type = {
let file_type = metadata.file_type();
if file_type.is_dir() {
FileType::Directory
} else if file_type.is_file() {
FileType::File
} else {
bail!("Invalid file type");
}
};
let mime_type = match file_type {
FileType::File => FileMimeType::from_name(name.as_str()),
_ => None,
};
let created_at = get_btime(&fs_path);
Ok(File::new(name, file_type, mime_type, created_at))
}
}
impl FileSystemRepository for FileSystem {
@@ -333,11 +379,25 @@ impl FileSystemRepository for FileSystem {
.await
.map_err(|e| CpError::Unknown(e.into()))
}
async fn stat(&self, request: StatRequest) -> Result<StatResponse, StatError> {
let path = request.into_path();
Ok(self.stat(path).await.map(StatResponse::new)?)
}
}
// TODO: Use `DirEntry::metadata` once `target=x86_64-unknown-linux-musl` updates from musl 1.2.3 to 1.2.5
// https://github.com/rust-lang/rust/pull/142682
fn get_btime<P>(path: P) -> Option<u64>
where
P: rustix::path::Arg,
{
get_statx(path)
.ok()
.map(|statx| statx.stx_btime.tv_sec as u64)
}
fn get_statx<P>(path: P) -> rustix::io::Result<Statx>
where
P: rustix::path::Arg,
{
@@ -349,6 +409,4 @@ where
rustix::fs::StatxFlags::BTIME,
)
}
.ok()
.map(|statx| statx.stx_btime.tv_sec as u64)
}

View File

@@ -110,6 +110,47 @@ impl WarrenMetrics for MetricsDebugLogger {
async fn record_warren_cp_failure(&self) {
tracing::debug!("[Metrics] Warren entry cp failed");
}
async fn record_warren_get_share_success(&self) {
tracing::debug!("[Metrics] Warren get share succeeded");
}
async fn record_warren_get_share_failure(&self) {
tracing::debug!("[Metrics] Warren get share failed");
}
async fn record_warren_share_creation_success(&self) {
tracing::debug!("[Metrics] Warren share creation succeeded");
}
async fn record_warren_share_creation_failure(&self) {
tracing::debug!("[Metrics] Warren share creation failed");
}
async fn record_warren_share_list_success(&self) {
tracing::debug!("[Metrics] Warren share list succeeded");
}
async fn record_warren_share_list_failure(&self) {
tracing::debug!("[Metrics] Warren share list failed");
}
async fn record_warren_share_deletion_success(&self) {
tracing::debug!("[Metrics] Warren share deletion succeeded");
}
async fn record_warren_share_deletion_failure(&self) {
tracing::debug!("[Metrics] Warren share deletion failed");
}
async fn record_warren_share_ls_success(&self) {
tracing::debug!("[Metrics] Warren share ls succeeded");
}
async fn record_warren_share_ls_failure(&self) {
tracing::debug!("[Metrics] Warren share ls failed");
}
async fn record_warren_share_cat_success(&self) {
tracing::debug!("[Metrics] Warren share cat succeeded");
}
async fn record_warren_share_cat_failure(&self) {
tracing::debug!("[Metrics] Warren share cat failed");
}
}
impl FileSystemMetrics for MetricsDebugLogger {
@@ -168,6 +209,13 @@ impl FileSystemMetrics for MetricsDebugLogger {
async fn record_cp_failure(&self) {
tracing::debug!("[Metrics] Cp failed");
}
async fn record_stat_success(&self) {
tracing::debug!("[Metrics] Stat succeeded");
}
async fn record_stat_failure(&self) {
tracing::debug!("[Metrics] Stat failed");
}
}
impl AuthMetrics for MetricsDebugLogger {
@@ -359,6 +407,27 @@ impl AuthMetrics for MetricsDebugLogger {
async fn record_auth_warren_cp_failure(&self) {
tracing::debug!("[Metrics] Auth warren cp failed");
}
async fn record_auth_share_creation_success(&self) {
tracing::debug!("[Metrics] Auth warren share creation succeeded");
}
async fn record_auth_share_creation_failure(&self) {
tracing::debug!("[Metrics] Auth warren share creation failed");
}
async fn record_auth_share_list_success(&self) {
tracing::debug!("[Metrics] Auth warren share list succeeded");
}
async fn record_auth_share_list_failure(&self) {
tracing::debug!("[Metrics] Auth warren share list failed");
}
async fn record_auth_share_deletion_success(&self) {
tracing::debug!("[Metrics] Auth warren share deletion succeeded");
}
async fn record_auth_share_deletion_failure(&self) {
tracing::debug!("[Metrics] Auth warren share deletion failed");
}
}
impl OidcMetrics for MetricsDebugLogger {

View File

@@ -9,6 +9,10 @@ use crate::domain::{
models::{
auth_session::requests::FetchAuthSessionResponse,
file::{AbsoluteFilePath, LsResponse},
share::{
CreateShareResponse, DeleteShareResponse, GetShareResponse, ListSharesResponse,
ShareCatResponse, ShareLsResponse,
},
user::{
ListAllUsersAndWarrensResponse, LoginUserOidcResponse, LoginUserResponse, User,
},
@@ -115,6 +119,57 @@ impl WarrenNotifier for NotifierDebugLogger {
response.warren().name()
);
}
async fn warren_share_created(&self, response: &CreateShareResponse) {
tracing::debug!(
"[Notifier] Created share for file {} in warren {}",
response.share().path(),
response.share().warren_id(),
);
}
async fn warren_shares_listed(&self, response: &ListSharesResponse) {
tracing::debug!(
"[Notifier] Listed {} share(s) for file {} in warren {}",
response.shares().len(),
response.path(),
response.warren().id(),
);
}
async fn warren_share_deleted(&self, response: &DeleteShareResponse) {
tracing::debug!(
"[Notifier] Deleted share {} for file {} in warren {}",
response.share().id(),
response.share().path(),
response.share().warren_id(),
);
}
async fn got_warren_share(&self, response: &GetShareResponse) {
tracing::debug!(
"[Notifier] Got share {} from warren {}",
response.share().id(),
response.share().warren_id()
);
}
async fn warren_share_ls(&self, response: &ShareLsResponse) {
tracing::debug!(
"[Notifier] Listed {} file(s) in share {} at path {}",
response.base().files().len(),
response.share().id(),
response.path()
);
}
async fn warren_share_cat(&self, response: &ShareCatResponse) {
tracing::debug!(
"[Notifier] Fetched file {} from share {}",
response.path(),
response.share().id(),
);
}
}
impl FileSystemNotifier for NotifierDebugLogger {
@@ -149,6 +204,10 @@ impl FileSystemNotifier for NotifierDebugLogger {
async fn cp(&self, path: &AbsoluteFilePath, target_path: &AbsoluteFilePath) {
tracing::debug!("[Notifier] Copied file {} to {}", path, target_path);
}
async fn stat(&self, path: &AbsoluteFilePath) {
tracing::debug!("[Notifier] Got stats for file {}", path);
}
}
impl AuthNotifier for NotifierDebugLogger {
@@ -368,6 +427,35 @@ impl AuthNotifier for NotifierDebugLogger {
user.id()
)
}
async fn auth_warren_share_created(&self, user: &User, response: &CreateShareResponse) {
tracing::debug!(
"[Notifier] Created share for file {} in warren {} for authenticated user {}",
response.share().path(),
response.share().warren_id(),
user.id(),
);
}
async fn auth_warren_shares_listed(&self, user: &User, response: &ListSharesResponse) {
tracing::debug!(
"[Notifier] Listed {} share(s) for file {} in warren {} for authenticated user {}",
response.shares().len(),
response.path(),
response.warren().id(),
user.id(),
);
}
async fn auth_warren_share_deleted(&self, user: &User, response: &DeleteShareResponse) {
tracing::debug!(
"[Notifier] Deleted share {} for file {} in warren {} for authenticated user {}",
response.share().id(),
response.share().path(),
response.share().warren_id(),
user.id(),
);
}
}
impl OidcNotifier for NotifierDebugLogger {

View File

@@ -1,5 +1,3 @@
use std::time::Duration;
use anyhow::{Context as _, anyhow, bail};
use argon2::{
Argon2, PasswordHash, PasswordVerifier as _,
@@ -9,8 +7,7 @@ use argon2::{
},
};
use chrono::Utc;
use sqlx::{Acquire as _, PgConnection, PgPool};
use tokio::task::JoinHandle;
use sqlx::{Acquire as _, PgConnection};
use uuid::Uuid;
use crate::domain::warren::{
@@ -372,27 +369,7 @@ impl AuthRepository for Postgres {
}
impl Postgres {
pub(super) fn start_session_cleanup_task(pool: PgPool, interval: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
{
let Ok(mut connection) = pool.acquire().await else {
break;
};
if let Ok(count) = Self::delete_expired_auth_sessions(&mut connection).await {
tracing::debug!("Removed {count} expired auth session(s)");
}
}
tokio::time::sleep(interval).await;
}
tracing::debug!("Session cleanup task stopped");
})
}
async fn delete_expired_auth_sessions(
pub(super) async fn delete_expired_auth_sessions(
connection: &mut PgConnection,
) -> Result<u64, sqlx::Error> {
let delete_count = sqlx::query(
@@ -916,7 +893,11 @@ impl Postgres {
can_list_files = $3,
can_read_files = $4,
can_modify_files = $5,
can_delete_files = $6
can_delete_files = $6,
can_list_shares = $7,
can_create_shares = $8,
can_modify_shares = $9,
can_delete_shares = $10
WHERE
user_id = $1 AND
warren_id = $2
@@ -930,6 +911,10 @@ impl Postgres {
.bind(user_warren.can_read_files())
.bind(user_warren.can_modify_files())
.bind(user_warren.can_delete_files())
.bind(user_warren.can_list_shares())
.bind(user_warren.can_create_shares())
.bind(user_warren.can_modify_shares())
.bind(user_warren.can_delete_shares())
.fetch_one(connection)
.await?;

View File

@@ -5,7 +5,9 @@ use sqlx::{
ConnectOptions as _, Connection as _, PgConnection, PgPool,
postgres::{PgConnectOptions, PgPoolOptions},
};
use tokio::task::JoinHandle;
pub mod auth;
pub mod share;
pub mod warrens;
#[derive(Debug, Clone)]
@@ -58,10 +60,34 @@ impl Postgres {
sqlx::migrate!("./migrations").run(&pool).await?;
// 3600 seconds = 1 hour
Self::start_session_cleanup_task(pool.clone(), Duration::from_secs(3600));
Self::start_cleanup_tasks(pool.clone(), Duration::from_secs(3600));
Ok(Self { pool })
}
pub(super) fn start_cleanup_tasks(pool: PgPool, interval: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
{
let Ok(mut connection) = pool.acquire().await else {
break;
};
if let Ok(count) = Self::delete_expired_auth_sessions(&mut connection).await {
tracing::debug!("Removed {count} expired auth session(s)");
}
if let Ok(count) = Self::delete_expired_shares(&mut connection).await {
tracing::debug!("Deleted {count} expired share(s)");
}
}
tokio::time::sleep(interval).await;
}
tracing::debug!("Session cleanup task stopped");
})
}
}
pub(super) fn is_not_found_error(err: &sqlx::Error) -> bool {

View File

@@ -0,0 +1,285 @@
use anyhow::anyhow;
use argon2::{
Argon2, PasswordHash, PasswordVerifier as _,
password_hash::{PasswordHasher as _, SaltString, rand_core::OsRng},
};
use chrono::{NaiveDateTime, Utc};
use sqlx::{Acquire as _, PgConnection};
use thiserror::Error;
use uuid::Uuid;
use crate::domain::warren::models::{
file::{AbsoluteFilePathError, FilePath, FilePathError},
share::{
CreateShareRequest, DeleteShareRequest, GetShareRequest, ListSharesRequest, Share,
VerifySharePasswordError, VerifySharePasswordRequest,
},
warren::HasWarrenId as _,
};
use super::{Postgres, is_not_found_error};
#[derive(sqlx::FromRow)]
struct ShareRow {
id: Uuid,
creator_id: Uuid,
warren_id: Uuid,
path: String,
password_hash: Option<String>,
expires_at: Option<NaiveDateTime>,
created_at: NaiveDateTime,
}
#[derive(Debug, Error)]
enum TryFromShareRowError {
#[error(transparent)]
FilePath(#[from] FilePathError),
#[error(transparent)]
AbsoluteFilePath(#[from] AbsoluteFilePathError),
}
impl TryFrom<ShareRow> for Share {
type Error = TryFromShareRowError;
fn try_from(value: ShareRow) -> Result<Self, Self::Error> {
Ok(Share::new(
value.id,
value.creator_id,
value.warren_id,
FilePath::new(&value.path)?.try_into()?,
value.password_hash,
value
.expires_at
.map(|nt| nt.and_utc().timestamp_millis() as u64),
value.created_at.and_utc().timestamp_millis() as u64,
))
}
}
pub(super) async fn get_share(
connection: &mut PgConnection,
request: GetShareRequest,
) -> anyhow::Result<Share> {
let share_row: ShareRow = sqlx::query_as(
"
SELECT
id,
creator_id,
warren_id,
path,
password_hash,
expires_at,
created_at
FROM
shares
WHERE
id = $1 AND
(expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
",
)
.bind(request.share_id())
.fetch_one(connection)
.await?;
Ok(Share::try_from(share_row)?)
}
pub(super) async fn list_shares(
connection: &mut PgConnection,
request: ListSharesRequest,
) -> anyhow::Result<Vec<Share>> {
let share_rows: Vec<ShareRow> = sqlx::query_as(
"
SELECT
id,
creator_id,
warren_id,
path,
password_hash,
expires_at,
created_at
FROM
shares
WHERE
warren_id = $1 AND
path = $2
ORDER BY
created_at DESC
",
)
.bind(request.warren_id())
.bind(request.path())
.fetch_all(connection)
.await?;
let shares = share_rows
.into_iter()
.map(Share::try_from)
.collect::<Result<Vec<Share>, TryFromShareRowError>>()?;
Ok(shares)
}
pub(super) async fn create_share(
connection: &mut PgConnection,
request: CreateShareRequest,
) -> anyhow::Result<Share> {
let mut tx = connection.begin().await?;
let password_hash = if let Some(password) = request.base().password() {
let salt = SaltString::generate(&mut OsRng);
let argon2 = Argon2::default();
Some(
argon2
.hash_password(password.as_str().as_bytes(), &salt)
.map(|h| h.to_string())
.map_err(|e| anyhow!("Failed to hash file password: {e:?}"))?,
)
} else {
None
};
let expires_at = if let Some(lifetime) = request.base().lifetime() {
Some(Utc::now().timestamp_millis() + i64::try_from(lifetime)?.saturating_mul(1000))
} else {
None
};
let share: ShareRow = sqlx::query_as(
"
INSERT INTO shares (
creator_id,
warren_id,
path,
password_hash,
expires_at
) VALUES (
$1,
$2,
$3,
$4,
TO_TIMESTAMP($5::double precision / 1000)
)
RETURNING
*
",
)
.bind(request.creator_id())
.bind(request.warren_id())
.bind(request.base().path())
.bind(password_hash)
.bind(expires_at)
.fetch_one(&mut *tx)
.await?;
tx.commit().await?;
Ok(Share::try_from(share)?)
}
pub(super) async fn delete_share(
connection: &mut PgConnection,
request: DeleteShareRequest,
) -> anyhow::Result<Share> {
let mut tx = connection.begin().await?;
let share_row: ShareRow = sqlx::query_as(
"
DELETE FROM
shares
WHERE
id = $1
RETURNING
*
",
)
.bind(request.share_id())
.fetch_one(&mut *tx)
.await?;
tx.commit().await?;
Ok(Share::try_from(share_row)?)
}
pub(super) async fn verify_password(
connection: &mut PgConnection,
request: VerifySharePasswordRequest,
) -> Result<Share, VerifySharePasswordError> {
let share_row: ShareRow = sqlx::query_as(
"
SELECT
*
FROM
shares
WHERE
id = $1 AND
(expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
",
)
.bind(request.share_id())
.fetch_one(connection)
.await
.map_err(|e| {
if is_not_found_error(&e) {
VerifySharePasswordError::NotFound
} else {
anyhow!(e).into()
}
})?;
let share = Share::try_from(share_row).map_err(|e| anyhow!(e))?;
match (request.password(), share.password_hash()) {
// If the share doesn't have a password hash or the request provided a password but the share doesn't have one we do
// not care
(None, None) | (Some(_), None) => Ok(share),
// If there is a password hash and the request provided a password we have to check if they
// match
(Some(password), Some(hash)) => {
let argon = Argon2::default();
let hash = PasswordHash::new(hash)
.map_err(|e| VerifySharePasswordError::Unknown(anyhow!(e)))?;
argon
.verify_password(password.as_str().as_bytes(), &hash)
.map_err(|e| match e {
argon2::password_hash::Error::Password => {
VerifySharePasswordError::IncorrectPassword
}
_ => VerifySharePasswordError::Unknown(anyhow!(e)),
})?;
Ok(share)
}
// If the request didn't provide a password but the share has a password hash the access
// should be denied
(None, Some(_hash)) => Err(VerifySharePasswordError::IncorrectPassword),
}
}
impl Postgres {
pub(super) async fn delete_expired_shares(
connection: &mut PgConnection,
) -> Result<u64, sqlx::Error> {
let delete_count = sqlx::query(
"
DELETE FROM
shares
WHERE
expires_at <= CURRENT_TIMESTAMP
",
)
.execute(connection)
.await?
.rows_affected();
Ok(delete_count)
}
}

View File

@@ -5,6 +5,12 @@ use uuid::Uuid;
use crate::domain::warren::{
models::{
file::AbsoluteFilePath,
share::{
CreateShareError, CreateShareRequest, CreateShareResponse, DeleteShareError,
DeleteShareRequest, DeleteShareResponse, GetShareError, GetShareRequest,
ListSharesError, ListSharesRequest, ListSharesResponse, Share,
VerifySharePasswordError, VerifySharePasswordRequest, VerifySharePasswordResponse,
},
warren::{
CreateWarrenError, CreateWarrenRequest, DeleteWarrenError, DeleteWarrenRequest,
EditWarrenError, EditWarrenRequest, FetchWarrenError, FetchWarrenRequest,
@@ -132,6 +138,86 @@ impl WarrenRepository for Postgres {
Ok(warren)
}
async fn get_warren_share(&self, request: GetShareRequest) -> Result<Share, GetShareError> {
let mut connection = self
.pool
.acquire()
.await
.context("Failed to get a PostgreSQL connection")?;
super::share::get_share(&mut connection, request)
.await
.map_err(Into::into)
}
async fn create_warren_share(
&self,
request: CreateShareRequest,
) -> Result<CreateShareResponse, CreateShareError> {
let mut connection = self
.pool
.acquire()
.await
.context("Failed to get a PostgreSQL connection")?;
super::share::create_share(&mut connection, request)
.await
.map(CreateShareResponse::new)
.map_err(Into::into)
}
async fn list_warren_shares(
&self,
request: ListSharesRequest,
) -> Result<ListSharesResponse, ListSharesError> {
let warren = self.fetch_warren((&request).into()).await?;
let mut connection = self
.pool
.acquire()
.await
.context("Failed to get a PostgreSQL connection")?;
let path = request.path().clone();
super::share::list_shares(&mut connection, request)
.await
.map(|shares| ListSharesResponse::new(warren, path, shares))
.map_err(Into::into)
}
async fn delete_warren_share(
&self,
request: DeleteShareRequest,
) -> Result<DeleteShareResponse, DeleteShareError> {
let mut connection = self
.pool
.acquire()
.await
.context("Failed to get a PostgreSQL connection")?;
super::share::delete_share(&mut connection, request)
.await
.map(DeleteShareResponse::new)
.map_err(Into::into)
}
async fn verify_warren_share_password(
&self,
request: VerifySharePasswordRequest,
) -> Result<VerifySharePasswordResponse, VerifySharePasswordError> {
let mut connection = self
.pool
.acquire()
.await
.context("Failed to get a PostgreSQL connection")?;
super::share::verify_password(&mut connection, request)
.await
.map(VerifySharePasswordResponse::new)
.map_err(Into::into)
}
}
impl Postgres {