migrate to sqlite

NOTE: extension loading crashes docker (for some reason)
This commit is contained in:
2025-09-07 15:09:14 +02:00
parent 5c3057e998
commit a1c9832515
32 changed files with 536 additions and 256 deletions

View File

@@ -2,4 +2,4 @@ pub mod file_system;
pub mod metrics_debug_logger;
pub mod notifier_debug_logger;
pub mod oidc;
pub mod postgres;
pub mod sqlite;

View File

@@ -1,95 +0,0 @@
use std::{str::FromStr as _, time::Duration};
use anyhow::Context as _;
use sqlx::{
ConnectOptions as _, Connection as _, PgConnection, PgPool,
postgres::{PgConnectOptions, PgPoolOptions},
};
use tokio::task::JoinHandle;
pub mod auth;
pub mod share;
pub mod warrens;
#[derive(Debug, Clone)]
pub struct PostgresConfig {
database_url: String,
database_name: String,
}
impl PostgresConfig {
pub fn new(database_url: String, database_name: String) -> Self {
Self {
database_url,
database_name,
}
}
}
#[derive(Debug, Clone)]
pub struct Postgres {
pool: PgPool,
}
impl Postgres {
pub async fn new(config: PostgresConfig) -> anyhow::Result<Self> {
let opts = PgConnectOptions::from_str(&config.database_url)?.disable_statement_logging();
let mut connection = PgConnection::connect_with(&opts)
.await
.context("Failed to connect to the PostgreSQL database")?;
match sqlx::query("SELECT datname FROM pg_database WHERE datname = $1")
.bind(&config.database_name)
.fetch_one(&mut connection)
.await
{
Ok(_) => (),
Err(sqlx::Error::RowNotFound) => {
sqlx::query(&format!("CREATE DATABASE {}", config.database_name))
.execute(&mut connection)
.await?;
}
Err(e) => return Err(e.into()),
};
connection.close().await?;
let pool = PgPoolOptions::new()
.connect_with(opts.database(&config.database_name))
.await?;
sqlx::migrate!("./migrations").run(&pool).await?;
// 3600 seconds = 1 hour
Self::start_cleanup_tasks(pool.clone(), Duration::from_secs(3600));
Ok(Self { pool })
}
pub(super) fn start_cleanup_tasks(pool: PgPool, interval: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
{
let Ok(mut connection) = pool.acquire().await else {
break;
};
if let Ok(count) = Self::delete_expired_auth_sessions(&mut connection).await {
tracing::debug!("Removed {count} expired auth session(s)");
}
if let Ok(count) = Self::delete_expired_shares(&mut connection).await {
tracing::debug!("Deleted {count} expired share(s)");
}
}
tokio::time::sleep(interval).await;
}
tracing::debug!("Session cleanup task stopped");
})
}
}
pub(super) fn is_not_found_error(err: &sqlx::Error) -> bool {
matches!(err, sqlx::Error::RowNotFound)
}

View File

@@ -7,7 +7,7 @@ use argon2::{
},
};
use chrono::Utc;
use sqlx::{Acquire as _, PgConnection};
use sqlx::{Acquire as _, SqliteConnection};
use uuid::Uuid;
use crate::domain::warren::{
@@ -40,9 +40,9 @@ use crate::domain::warren::{
ports::{AuthRepository, WarrenService},
};
use super::{Postgres, is_not_found_error};
use super::{Sqlite, is_not_found_error};
impl AuthRepository for Postgres {
impl AuthRepository for Sqlite {
async fn create_user(&self, request: CreateUserRequest) -> Result<User, CreateUserError> {
let mut connection = self
.pool
@@ -368,9 +368,9 @@ impl AuthRepository for Postgres {
}
}
impl Postgres {
impl Sqlite {
pub(super) async fn delete_expired_auth_sessions(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
) -> Result<u64, sqlx::Error> {
let delete_count = sqlx::query(
"
@@ -389,7 +389,7 @@ impl Postgres {
async fn create_user(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
name: &UserName,
email: &UserEmail,
password: &UserPassword,
@@ -431,7 +431,7 @@ impl Postgres {
async fn create_or_update_user(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
sub: &String,
name: &UserName,
email: &UserEmail,
@@ -546,7 +546,7 @@ impl Postgres {
async fn edit_user(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
id: &Uuid,
name: &UserName,
email: &UserEmail,
@@ -592,7 +592,7 @@ impl Postgres {
async fn delete_user_sessions(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_id: &Uuid,
) -> Result<u64, sqlx::Error> {
let rows_affected = sqlx::query(
@@ -613,7 +613,7 @@ impl Postgres {
async fn delete_user_from_database(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_id: &Uuid,
) -> Result<User, sqlx::Error> {
let user: User = sqlx::query_as(
@@ -635,7 +635,7 @@ impl Postgres {
async fn get_user_from_id(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
id: &Uuid,
) -> Result<User, sqlx::Error> {
let user: User = sqlx::query_as(
@@ -657,7 +657,7 @@ impl Postgres {
async fn get_user_from_email(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
email: &UserEmail,
) -> Result<User, sqlx::Error> {
let user: User = sqlx::query_as(
@@ -698,7 +698,7 @@ impl Postgres {
async fn create_session(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user: &User,
expiration: &SessionExpirationTime,
) -> anyhow::Result<AuthSession> {
@@ -721,7 +721,7 @@ impl Postgres {
) VALUES (
$1,
$2,
TO_TIMESTAMP($3::double precision / 1000)
datetime($3, 'unixepoch')
)
RETURNING
*
@@ -729,7 +729,7 @@ impl Postgres {
)
.bind(session_id)
.bind(user.id())
.bind(expiration_time)
.bind(expiration_time / 1000)
.fetch_one(&mut *tx)
.await?;
@@ -740,7 +740,7 @@ impl Postgres {
async fn get_auth_session(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
session_id: &AuthSessionId,
) -> Result<AuthSession, sqlx::Error> {
let session: AuthSession = sqlx::query_as(
@@ -762,7 +762,7 @@ impl Postgres {
async fn get_user_warrens(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_id: &Uuid,
) -> Result<Vec<UserWarren>, sqlx::Error> {
let user_warrens: Vec<UserWarren> = sqlx::query_as(
@@ -784,7 +784,7 @@ impl Postgres {
async fn get_all_user_warrens(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
) -> Result<Vec<UserWarren>, sqlx::Error> {
let user_warrens: Vec<UserWarren> = sqlx::query_as(
"
@@ -802,7 +802,7 @@ impl Postgres {
async fn get_user_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_id: &Uuid,
warren_id: &Uuid,
) -> Result<UserWarren, sqlx::Error> {
@@ -825,7 +825,10 @@ impl Postgres {
Ok(ids)
}
async fn fetch_users(&self, connection: &mut PgConnection) -> Result<Vec<User>, sqlx::Error> {
async fn fetch_users(
&self,
connection: &mut SqliteConnection,
) -> Result<Vec<User>, sqlx::Error> {
let users: Vec<User> = sqlx::query_as(
"
SELECT
@@ -844,7 +847,7 @@ impl Postgres {
async fn add_user_to_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_warren: &UserWarren,
) -> Result<UserWarren, sqlx::Error> {
let user_warren: UserWarren = sqlx::query_as(
@@ -855,14 +858,22 @@ impl Postgres {
can_list_files,
can_read_files,
can_modify_files,
can_delete_files
can_delete_files,
can_list_shares,
can_create_shares,
can_modify_shares,
can_delete_shares
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6
$6,
$7,
$8,
$9,
$10
)
RETURNING
*
@@ -874,6 +885,10 @@ impl Postgres {
.bind(user_warren.can_read_files())
.bind(user_warren.can_modify_files())
.bind(user_warren.can_delete_files())
.bind(user_warren.can_list_shares())
.bind(user_warren.can_create_shares())
.bind(user_warren.can_modify_shares())
.bind(user_warren.can_delete_shares())
.fetch_one(connection)
.await?;
@@ -882,7 +897,7 @@ impl Postgres {
async fn update_user_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_warren: &UserWarren,
) -> Result<UserWarren, sqlx::Error> {
let user_warren: UserWarren = sqlx::query_as(
@@ -923,7 +938,7 @@ impl Postgres {
async fn remove_user_from_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
user_id: &Uuid,
warren_id: &Uuid,
) -> Result<UserWarren, sqlx::Error> {

View File

@@ -0,0 +1,74 @@
use std::{str::FromStr as _, time::Duration};
use sqlx::{
ConnectOptions as _, SqlitePool,
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
};
use tokio::task::JoinHandle;
pub mod auth;
pub mod share;
pub mod warrens;
#[derive(Debug, Clone)]
pub struct SqliteConfig {
database_url: String,
}
impl SqliteConfig {
pub fn new(database_url: String) -> Self {
Self { database_url }
}
}
#[derive(Debug, Clone)]
pub struct Sqlite {
pool: SqlitePool,
}
impl Sqlite {
pub async fn new(config: SqliteConfig) -> anyhow::Result<Self> {
let opts = SqliteConnectOptions::from_str(&config.database_url)?
.create_if_missing(true)
.extension_with_entrypoint(
"/var/lib/warren/sqlite_extensions/uuid",
"sqlite3_uuid_init",
)
.disable_statement_logging();
let pool = SqlitePoolOptions::new().connect_with(opts).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
// 3600 seconds = 1 hour
Self::start_cleanup_tasks(pool.clone(), Duration::from_secs(3600));
Ok(Self { pool })
}
pub(super) fn start_cleanup_tasks(pool: SqlitePool, interval: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
{
let Ok(mut connection) = pool.acquire().await else {
break;
};
if let Ok(count) = Self::delete_expired_auth_sessions(&mut connection).await {
tracing::debug!("Removed {count} expired auth session(s)");
}
if let Ok(count) = Self::delete_expired_shares(&mut connection).await {
tracing::debug!("Deleted {count} expired share(s)");
}
}
tokio::time::sleep(interval).await;
}
tracing::debug!("Session cleanup task stopped");
})
}
}
pub(super) fn is_not_found_error(err: &sqlx::Error) -> bool {
matches!(err, sqlx::Error::RowNotFound)
}

View File

@@ -1,10 +1,10 @@
use anyhow::anyhow;
use argon2::{
Argon2, PasswordHash, PasswordVerifier as _,
password_hash::{PasswordHasher as _, SaltString, rand_core::OsRng},
password_hash::{PasswordHasher as _, SaltString},
};
use chrono::{NaiveDateTime, Utc};
use sqlx::{Acquire as _, PgConnection};
use sqlx::{Acquire as _, SqliteConnection};
use thiserror::Error;
use uuid::Uuid;
@@ -17,7 +17,7 @@ use crate::domain::warren::models::{
warren::HasWarrenId as _,
};
use super::{Postgres, is_not_found_error};
use super::{Sqlite, is_not_found_error};
#[derive(sqlx::FromRow)]
struct ShareRow {
@@ -62,7 +62,7 @@ impl TryFrom<ShareRow> for Share {
}
pub(super) async fn get_share(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
request: GetShareRequest,
) -> anyhow::Result<Share> {
let share_row: ShareRow = sqlx::query_as(
@@ -90,7 +90,7 @@ pub(super) async fn get_share(
}
pub(super) async fn list_shares(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
request: ListSharesRequest,
) -> anyhow::Result<Vec<Share>> {
let share_rows: Vec<ShareRow> = sqlx::query_as(
@@ -126,13 +126,13 @@ pub(super) async fn list_shares(
}
pub(super) async fn create_share(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
request: CreateShareRequest,
) -> anyhow::Result<Share> {
let mut tx = connection.begin().await?;
let password_hash = if let Some(password) = request.base().password() {
let salt = SaltString::generate(&mut OsRng);
let salt = SaltString::generate(&mut argon2::password_hash::rand_core::OsRng);
let argon2 = Argon2::default();
Some(
@@ -164,7 +164,7 @@ pub(super) async fn create_share(
$2,
$3,
$4,
TO_TIMESTAMP($5::double precision / 1000)
datetime($5, 'unixepoch')
)
RETURNING
*
@@ -174,7 +174,7 @@ pub(super) async fn create_share(
.bind(request.warren_id())
.bind(request.base().path())
.bind(password_hash)
.bind(expires_at)
.bind(expires_at.map(|v| v / 1000))
.fetch_one(&mut *tx)
.await?;
@@ -184,7 +184,7 @@ pub(super) async fn create_share(
}
pub(super) async fn delete_share(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
request: DeleteShareRequest,
) -> anyhow::Result<Share> {
let mut tx = connection.begin().await?;
@@ -209,7 +209,7 @@ pub(super) async fn delete_share(
}
pub(super) async fn verify_password(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
request: VerifySharePasswordRequest,
) -> Result<Share, VerifySharePasswordError> {
let share_row: ShareRow = sqlx::query_as(
@@ -264,9 +264,9 @@ pub(super) async fn verify_password(
}
}
impl Postgres {
impl Sqlite {
pub(super) async fn delete_expired_shares(
connection: &mut PgConnection,
connection: &mut SqliteConnection,
) -> Result<u64, sqlx::Error> {
let delete_count = sqlx::query(
"

View File

@@ -1,5 +1,5 @@
use anyhow::{Context as _, anyhow};
use sqlx::{Acquire as _, PgConnection};
use sqlx::{Acquire as _, SqliteConnection};
use uuid::Uuid;
use crate::domain::warren::{
@@ -21,9 +21,9 @@ use crate::domain::warren::{
ports::WarrenRepository,
};
use super::{Postgres, is_not_found_error};
use super::{Sqlite, is_not_found_error};
impl WarrenRepository for Postgres {
impl WarrenRepository for Sqlite {
async fn create_warren(
&self,
request: CreateWarrenRequest,
@@ -220,10 +220,10 @@ impl WarrenRepository for Postgres {
}
}
impl Postgres {
impl Sqlite {
async fn create_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
name: &WarrenName,
path: &AbsoluteFilePath,
) -> Result<Warren, sqlx::Error> {
@@ -254,7 +254,7 @@ impl Postgres {
async fn edit_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
id: &Uuid,
name: &WarrenName,
path: &AbsoluteFilePath,
@@ -287,7 +287,7 @@ impl Postgres {
async fn delete_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
id: &Uuid,
) -> Result<Warren, sqlx::Error> {
let mut tx = connection.begin().await?;
@@ -313,7 +313,7 @@ impl Postgres {
async fn get_warren(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
id: &Uuid,
) -> Result<Warren, sqlx::Error> {
let warren: Warren = sqlx::query_as(
@@ -335,20 +335,28 @@ impl Postgres {
async fn fetch_warrens(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
ids: &[Uuid],
) -> Result<Vec<Warren>, sqlx::Error> {
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Postgres, Warren>(
let mut ids_as_string = ids.into_iter().fold(String::new(), |mut acc, id| {
let encoded = hex::encode(id.as_bytes());
acc.push_str("x'");
acc.push_str(encoded.as_str());
acc.push_str("',");
acc
});
ids_as_string.pop();
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Sqlite, Warren>(&format!(
"
SELECT
*
FROM
warrens
WHERE
id = ANY($1)
id IN ({ids_as_string})
",
)
.bind(ids)
))
.fetch_all(&mut *connection)
.await?;
@@ -357,9 +365,9 @@ impl Postgres {
async fn fetch_all_warrens(
&self,
connection: &mut PgConnection,
connection: &mut SqliteConnection,
) -> Result<Vec<Warren>, sqlx::Error> {
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Postgres, Warren>(
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Sqlite, Warren>(
"
SELECT
*