refactor postgres repo into multiple files
This commit is contained in:
@@ -1,18 +1,13 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
use anyhow::{Context as _, anyhow};
|
||||
use argon2::{
|
||||
Argon2, PasswordHash, PasswordVerifier,
|
||||
Argon2, PasswordHash, PasswordVerifier as _,
|
||||
password_hash::{
|
||||
PasswordHasher, SaltString,
|
||||
PasswordHasher as _, SaltString,
|
||||
rand_core::{OsRng, RngCore as _},
|
||||
},
|
||||
};
|
||||
use chrono::Utc;
|
||||
use sqlx::{
|
||||
ConnectOptions as _, Connection as _, PgConnection, PgPool,
|
||||
postgres::{PgConnectOptions, PgPoolOptions},
|
||||
};
|
||||
use sqlx::{Acquire as _, PgConnection};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::domain::warren::{
|
||||
@@ -39,128 +34,312 @@ use crate::domain::warren::{
|
||||
FetchUserWarrensRequest, ListUserWarrensError, ListUserWarrensRequest,
|
||||
},
|
||||
},
|
||||
warren::{
|
||||
FetchWarrenError, FetchWarrenRequest, FetchWarrensError, FetchWarrensRequest,
|
||||
ListWarrensError, ListWarrensRequest, Warren,
|
||||
warren::ListWarrensRequest,
|
||||
},
|
||||
},
|
||||
ports::{AuthRepository, WarrenRepository, WarrenService},
|
||||
ports::{AuthRepository, WarrenService},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PostgresConfig {
|
||||
database_url: String,
|
||||
database_name: String,
|
||||
}
|
||||
use super::{Postgres, is_not_found_error};
|
||||
|
||||
impl PostgresConfig {
|
||||
pub fn new(database_url: String, database_name: String) -> Self {
|
||||
Self {
|
||||
database_url,
|
||||
database_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AuthRepository for Postgres {
|
||||
async fn create_user(&self, request: CreateUserRequest) -> Result<User, CreateUserError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Postgres {
|
||||
pool: PgPool,
|
||||
let user = self
|
||||
.create_user(
|
||||
&mut connection,
|
||||
request.name(),
|
||||
request.email(),
|
||||
request.password(),
|
||||
request.admin(),
|
||||
)
|
||||
.await
|
||||
.context(format!("Failed to create user"))?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
async fn edit_user(&self, request: EditUserRequest) -> Result<User, EditUserError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user = self
|
||||
.edit_user(
|
||||
&mut connection,
|
||||
request.user_id(),
|
||||
request.name(),
|
||||
request.email(),
|
||||
request.password(),
|
||||
request.admin(),
|
||||
)
|
||||
.await
|
||||
.context(format!("Failed to edit user"))?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
async fn delete_user(&self, request: DeleteUserRequest) -> Result<User, DeleteUserError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
self.delete_user_from_database(&mut connection, request.user_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
DeleteUserError::NotFound
|
||||
} else {
|
||||
DeleteUserError::Unknown(anyhow!(e))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn verify_user_password(
|
||||
&self,
|
||||
request: VerifyUserPasswordRequest,
|
||||
) -> Result<User, VerifyUserPasswordError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user = self
|
||||
.get_user_from_email(&mut connection, request.email())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
VerifyUserPasswordError::NotFound(request.email().clone())
|
||||
} else {
|
||||
VerifyUserPasswordError::Unknown(anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
|
||||
self.check_user_password_against_hash(request.password(), user.password_hash())?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
async fn create_auth_session(
|
||||
&self,
|
||||
request: CreateAuthSessionRequest,
|
||||
) -> Result<AuthSession, CreateAuthSessionError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let session = self
|
||||
.create_session(&mut connection, request.user(), request.expiration())
|
||||
.await
|
||||
.context("Failed to create session")?;
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn fetch_auth_session(
|
||||
&self,
|
||||
request: FetchAuthSessionRequest,
|
||||
) -> Result<FetchAuthSessionResponse, FetchAuthSessionError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let session = self
|
||||
.get_auth_session(&mut connection, request.session_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
FetchAuthSessionError::NotFound
|
||||
} else {
|
||||
anyhow!("Failed to get auth session: {e:?}").into()
|
||||
}
|
||||
})?;
|
||||
let user = self
|
||||
.get_user_from_id(&mut connection, session.user_id())
|
||||
.await
|
||||
.context("Failed to get user")?;
|
||||
|
||||
Ok(FetchAuthSessionResponse::new(session, user))
|
||||
}
|
||||
|
||||
async fn create_user_warren(
|
||||
&self,
|
||||
request: CreateUserWarrenRequest,
|
||||
) -> Result<UserWarren, CreateUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warren = self
|
||||
.add_user_to_warren(&mut connection, request.user_warren())
|
||||
.await
|
||||
.context("Failed to create user warren")?;
|
||||
|
||||
Ok(user_warren)
|
||||
}
|
||||
|
||||
async fn edit_user_warren(
|
||||
&self,
|
||||
request: EditUserWarrenRequest,
|
||||
) -> Result<UserWarren, EditUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warren = self
|
||||
.update_user_warren(&mut connection, request.user_warren())
|
||||
.await
|
||||
.context("Failed to edit user warren")?;
|
||||
|
||||
Ok(user_warren)
|
||||
}
|
||||
|
||||
async fn delete_user_warren(
|
||||
&self,
|
||||
request: DeleteUserWarrenRequest,
|
||||
) -> Result<UserWarren, DeleteUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warren = self
|
||||
.remove_user_from_warren(&mut connection, request.user_id(), request.warren_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
DeleteUserWarrenError::NotFound
|
||||
} else {
|
||||
anyhow!("Failed to delete user warren: {e:?}").into()
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(user_warren)
|
||||
}
|
||||
|
||||
async fn fetch_user_warrens(
|
||||
&self,
|
||||
request: FetchUserWarrensRequest,
|
||||
) -> Result<Vec<UserWarren>, FetchUserWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warrens = self
|
||||
.get_user_warrens(&mut connection, request.user_id())
|
||||
.await
|
||||
.context("Failed to get user warrens")?;
|
||||
|
||||
Ok(user_warrens)
|
||||
}
|
||||
|
||||
async fn list_user_warrens(
|
||||
&self,
|
||||
_request: ListUserWarrensRequest,
|
||||
) -> Result<Vec<UserWarren>, ListUserWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warrens = self
|
||||
.get_all_user_warrens(&mut connection)
|
||||
.await
|
||||
.context("Failed to get all user warrens")?;
|
||||
|
||||
Ok(user_warrens)
|
||||
}
|
||||
|
||||
async fn fetch_user_warren(
|
||||
&self,
|
||||
request: FetchUserWarrenRequest,
|
||||
) -> Result<UserWarren, FetchUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
self.get_user_warren(&mut connection, request.user_id(), request.warren_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
FetchUserWarrenError::NotFound
|
||||
} else {
|
||||
FetchUserWarrenError::Unknown(anyhow!(e))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_users(&self, _request: ListUsersRequest) -> Result<Vec<User>, ListUsersError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let users = self
|
||||
.fetch_users(&mut connection)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
|
||||
Ok(users)
|
||||
}
|
||||
|
||||
async fn list_all_users_and_warrens<WS: WarrenService>(
|
||||
&self,
|
||||
_request: ListAllUsersAndWarrensRequest,
|
||||
warren_service: &WS,
|
||||
) -> Result<ListAllUsersAndWarrensResponse, ListAllUsersAndWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let users = self
|
||||
.fetch_users(&mut connection)
|
||||
.await
|
||||
.context("Failed to fetch all users")?;
|
||||
let user_warrens = self
|
||||
.get_all_user_warrens(&mut connection)
|
||||
.await
|
||||
.context("Failed to fetch all user warrens")?;
|
||||
let warrens = warren_service
|
||||
.list_warrens(ListWarrensRequest::new())
|
||||
.await
|
||||
.context("Failed to get all warrens")?;
|
||||
|
||||
Ok(ListAllUsersAndWarrensResponse::new(
|
||||
users,
|
||||
user_warrens,
|
||||
warrens,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Postgres {
|
||||
pub async fn new(config: PostgresConfig) -> anyhow::Result<Self> {
|
||||
let opts = PgConnectOptions::from_str(&config.database_url)?.disable_statement_logging();
|
||||
|
||||
let mut connection = PgConnection::connect_with(&opts)
|
||||
.await
|
||||
.context("Failed to connect to the PostgreSQL database")?;
|
||||
|
||||
match sqlx::query("SELECT datname FROM pg_database WHERE datname = $1")
|
||||
.bind(&config.database_name)
|
||||
.fetch_one(&mut connection)
|
||||
.await
|
||||
{
|
||||
Ok(_) => (),
|
||||
Err(sqlx::Error::RowNotFound) => {
|
||||
sqlx::query(&format!("CREATE DATABASE {}", config.database_name))
|
||||
.execute(&mut connection)
|
||||
.await?;
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
connection.close().await?;
|
||||
|
||||
let pool = PgPoolOptions::new()
|
||||
.connect_with(opts.database(&config.database_name))
|
||||
.await?;
|
||||
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||
|
||||
Ok(Self { pool })
|
||||
}
|
||||
|
||||
async fn get_warren(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
id: &Uuid,
|
||||
) -> Result<Warren, sqlx::Error> {
|
||||
let warren: Warren = sqlx::query_as(
|
||||
"
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
warrens
|
||||
WHERE
|
||||
id = $1
|
||||
",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_one(connection)
|
||||
.await?;
|
||||
|
||||
Ok(warren)
|
||||
}
|
||||
|
||||
async fn fetch_warrens(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
ids: &[Uuid],
|
||||
) -> Result<Vec<Warren>, sqlx::Error> {
|
||||
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Postgres, Warren>(
|
||||
"
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
warrens
|
||||
WHERE
|
||||
id = ANY($1)
|
||||
",
|
||||
)
|
||||
.bind(ids)
|
||||
.fetch_all(&mut *connection)
|
||||
.await?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn fetch_all_warrens(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
) -> Result<Vec<Warren>, sqlx::Error> {
|
||||
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Postgres, Warren>(
|
||||
"
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
warrens
|
||||
",
|
||||
)
|
||||
.fetch_all(&mut *connection)
|
||||
.await?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn create_user(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
@@ -602,369 +781,6 @@ impl Postgres {
|
||||
}
|
||||
}
|
||||
|
||||
impl WarrenRepository for Postgres {
|
||||
async fn fetch_warrens(
|
||||
&self,
|
||||
request: FetchWarrensRequest,
|
||||
) -> Result<Vec<Warren>, FetchWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let warrens = self
|
||||
.fetch_warrens(&mut connection, request.ids())
|
||||
.await
|
||||
.map_err(|err| anyhow!(err).context("Failed to fetch warrens"))?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn list_warrens(
|
||||
&self,
|
||||
_request: ListWarrensRequest,
|
||||
) -> Result<Vec<Warren>, ListWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let warrens = self
|
||||
.fetch_all_warrens(&mut connection)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err).context("Failed to list all warrens"))?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn fetch_warren(&self, request: FetchWarrenRequest) -> Result<Warren, FetchWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let warren = self
|
||||
.get_warren(&mut connection, request.id())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
if is_not_found_error(&err) {
|
||||
return FetchWarrenError::NotFound(request.id().clone());
|
||||
}
|
||||
|
||||
anyhow!(err)
|
||||
.context(format!("Failed to fetch warren with id {:?}", request.id()))
|
||||
.into()
|
||||
})?;
|
||||
|
||||
Ok(warren)
|
||||
}
|
||||
}
|
||||
|
||||
impl AuthRepository for Postgres {
|
||||
async fn create_user(&self, request: CreateUserRequest) -> Result<User, CreateUserError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user = self
|
||||
.create_user(
|
||||
&mut connection,
|
||||
request.name(),
|
||||
request.email(),
|
||||
request.password(),
|
||||
request.admin(),
|
||||
)
|
||||
.await
|
||||
.context(format!("Failed to create user"))?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
async fn edit_user(&self, request: EditUserRequest) -> Result<User, EditUserError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user = self
|
||||
.edit_user(
|
||||
&mut connection,
|
||||
request.user_id(),
|
||||
request.name(),
|
||||
request.email(),
|
||||
request.password(),
|
||||
request.admin(),
|
||||
)
|
||||
.await
|
||||
.context(format!("Failed to edit user"))?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
async fn delete_user(&self, request: DeleteUserRequest) -> Result<User, DeleteUserError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
self.delete_user_from_database(&mut connection, request.user_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
DeleteUserError::NotFound
|
||||
} else {
|
||||
DeleteUserError::Unknown(anyhow!(e))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn verify_user_password(
|
||||
&self,
|
||||
request: VerifyUserPasswordRequest,
|
||||
) -> Result<User, VerifyUserPasswordError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user = self
|
||||
.get_user_from_email(&mut connection, request.email())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
VerifyUserPasswordError::NotFound(request.email().clone())
|
||||
} else {
|
||||
VerifyUserPasswordError::Unknown(anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
|
||||
self.check_user_password_against_hash(request.password(), user.password_hash())?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
async fn create_auth_session(
|
||||
&self,
|
||||
request: CreateAuthSessionRequest,
|
||||
) -> Result<AuthSession, CreateAuthSessionError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let session = self
|
||||
.create_session(&mut connection, request.user(), request.expiration())
|
||||
.await
|
||||
.context("Failed to create session")?;
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn fetch_auth_session(
|
||||
&self,
|
||||
request: FetchAuthSessionRequest,
|
||||
) -> Result<FetchAuthSessionResponse, FetchAuthSessionError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let session = self
|
||||
.get_auth_session(&mut connection, request.session_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
FetchAuthSessionError::NotFound
|
||||
} else {
|
||||
anyhow!("Failed to get auth session: {e:?}").into()
|
||||
}
|
||||
})?;
|
||||
let user = self
|
||||
.get_user_from_id(&mut connection, session.user_id())
|
||||
.await
|
||||
.context("Failed to get user")?;
|
||||
|
||||
Ok(FetchAuthSessionResponse::new(session, user))
|
||||
}
|
||||
|
||||
async fn create_user_warren(
|
||||
&self,
|
||||
request: CreateUserWarrenRequest,
|
||||
) -> Result<UserWarren, CreateUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warren = self
|
||||
.add_user_to_warren(&mut connection, request.user_warren())
|
||||
.await
|
||||
.context("Failed to create user warren")?;
|
||||
|
||||
Ok(user_warren)
|
||||
}
|
||||
|
||||
async fn edit_user_warren(
|
||||
&self,
|
||||
request: EditUserWarrenRequest,
|
||||
) -> Result<UserWarren, EditUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warren = self
|
||||
.update_user_warren(&mut connection, request.user_warren())
|
||||
.await
|
||||
.context("Failed to edit user warren")?;
|
||||
|
||||
Ok(user_warren)
|
||||
}
|
||||
|
||||
async fn delete_user_warren(
|
||||
&self,
|
||||
request: DeleteUserWarrenRequest,
|
||||
) -> Result<UserWarren, DeleteUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warren = self
|
||||
.remove_user_from_warren(&mut connection, request.user_id(), request.warren_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
DeleteUserWarrenError::NotFound
|
||||
} else {
|
||||
anyhow!("Failed to delete user warren: {e:?}").into()
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(user_warren)
|
||||
}
|
||||
|
||||
async fn fetch_user_warrens(
|
||||
&self,
|
||||
request: FetchUserWarrensRequest,
|
||||
) -> Result<Vec<UserWarren>, FetchUserWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warrens = self
|
||||
.get_user_warrens(&mut connection, request.user_id())
|
||||
.await
|
||||
.context("Failed to get user warrens")?;
|
||||
|
||||
Ok(user_warrens)
|
||||
}
|
||||
|
||||
async fn list_user_warrens(
|
||||
&self,
|
||||
_request: ListUserWarrensRequest,
|
||||
) -> Result<Vec<UserWarren>, ListUserWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let user_warrens = self
|
||||
.get_all_user_warrens(&mut connection)
|
||||
.await
|
||||
.context("Failed to get all user warrens")?;
|
||||
|
||||
Ok(user_warrens)
|
||||
}
|
||||
|
||||
async fn fetch_user_warren(
|
||||
&self,
|
||||
request: FetchUserWarrenRequest,
|
||||
) -> Result<UserWarren, FetchUserWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
self.get_user_warren(&mut connection, request.user_id(), request.warren_id())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if is_not_found_error(&e) {
|
||||
FetchUserWarrenError::NotFound
|
||||
} else {
|
||||
FetchUserWarrenError::Unknown(anyhow!(e))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_users(&self, _request: ListUsersRequest) -> Result<Vec<User>, ListUsersError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let users = self
|
||||
.fetch_users(&mut connection)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
|
||||
Ok(users)
|
||||
}
|
||||
|
||||
async fn list_all_users_and_warrens<WS: WarrenService>(
|
||||
&self,
|
||||
_request: ListAllUsersAndWarrensRequest,
|
||||
warren_service: &WS,
|
||||
) -> Result<ListAllUsersAndWarrensResponse, ListAllUsersAndWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let users = self
|
||||
.fetch_users(&mut connection)
|
||||
.await
|
||||
.context("Failed to fetch all users")?;
|
||||
let user_warrens = self
|
||||
.get_all_user_warrens(&mut connection)
|
||||
.await
|
||||
.context("Failed to fetch all user warrens")?;
|
||||
let warrens = warren_service
|
||||
.list_warrens(ListWarrensRequest::new())
|
||||
.await
|
||||
.context("Failed to get all warrens")?;
|
||||
|
||||
Ok(ListAllUsersAndWarrensResponse::new(
|
||||
users,
|
||||
user_warrens,
|
||||
warrens,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn is_not_found_error(err: &sqlx::Error) -> bool {
|
||||
matches!(err, sqlx::Error::RowNotFound)
|
||||
}
|
||||
|
||||
fn hash_password(password: &UserPassword) -> Result<String, argon2::password_hash::Error> {
|
||||
let salt = SaltString::generate(&mut OsRng);
|
||||
let argon2 = Argon2::default();
|
||||
67
backend/src/lib/outbound/postgres/mod.rs
Normal file
67
backend/src/lib/outbound/postgres/mod.rs
Normal file
@@ -0,0 +1,67 @@
|
||||
use std::str::FromStr as _;
|
||||
|
||||
use anyhow::Context as _;
|
||||
use sqlx::{
|
||||
ConnectOptions as _, Connection as _, PgConnection, PgPool,
|
||||
postgres::{PgConnectOptions, PgPoolOptions},
|
||||
};
|
||||
|
||||
pub mod auth;
|
||||
pub mod warrens;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PostgresConfig {
|
||||
database_url: String,
|
||||
database_name: String,
|
||||
}
|
||||
|
||||
impl PostgresConfig {
|
||||
pub fn new(database_url: String, database_name: String) -> Self {
|
||||
Self {
|
||||
database_url,
|
||||
database_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Postgres {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl Postgres {
|
||||
pub async fn new(config: PostgresConfig) -> anyhow::Result<Self> {
|
||||
let opts = PgConnectOptions::from_str(&config.database_url)?.disable_statement_logging();
|
||||
|
||||
let mut connection = PgConnection::connect_with(&opts)
|
||||
.await
|
||||
.context("Failed to connect to the PostgreSQL database")?;
|
||||
|
||||
match sqlx::query("SELECT datname FROM pg_database WHERE datname = $1")
|
||||
.bind(&config.database_name)
|
||||
.fetch_one(&mut connection)
|
||||
.await
|
||||
{
|
||||
Ok(_) => (),
|
||||
Err(sqlx::Error::RowNotFound) => {
|
||||
sqlx::query(&format!("CREATE DATABASE {}", config.database_name))
|
||||
.execute(&mut connection)
|
||||
.await?;
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
connection.close().await?;
|
||||
|
||||
let pool = PgPoolOptions::new()
|
||||
.connect_with(opts.database(&config.database_name))
|
||||
.await?;
|
||||
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||
|
||||
Ok(Self { pool })
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn is_not_found_error(err: &sqlx::Error) -> bool {
|
||||
matches!(err, sqlx::Error::RowNotFound)
|
||||
}
|
||||
138
backend/src/lib/outbound/postgres/warrens.rs
Normal file
138
backend/src/lib/outbound/postgres/warrens.rs
Normal file
@@ -0,0 +1,138 @@
|
||||
use anyhow::{Context as _, anyhow};
|
||||
use sqlx::PgConnection;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::domain::warren::{
|
||||
models::warren::{
|
||||
FetchWarrenError, FetchWarrenRequest, FetchWarrensError, FetchWarrensRequest,
|
||||
ListWarrensError, ListWarrensRequest, Warren,
|
||||
},
|
||||
ports::WarrenRepository,
|
||||
};
|
||||
|
||||
use super::{Postgres, is_not_found_error};
|
||||
|
||||
impl WarrenRepository for Postgres {
|
||||
async fn fetch_warrens(
|
||||
&self,
|
||||
request: FetchWarrensRequest,
|
||||
) -> Result<Vec<Warren>, FetchWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let warrens = self
|
||||
.fetch_warrens(&mut connection, request.ids())
|
||||
.await
|
||||
.map_err(|err| anyhow!(err).context("Failed to fetch warrens"))?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn list_warrens(
|
||||
&self,
|
||||
_request: ListWarrensRequest,
|
||||
) -> Result<Vec<Warren>, ListWarrensError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let warrens = self
|
||||
.fetch_all_warrens(&mut connection)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err).context("Failed to list all warrens"))?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn fetch_warren(&self, request: FetchWarrenRequest) -> Result<Warren, FetchWarrenError> {
|
||||
let mut connection = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.context("Failed to get a PostgreSQL connection")?;
|
||||
|
||||
let warren = self
|
||||
.get_warren(&mut connection, request.id())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
if is_not_found_error(&err) {
|
||||
return FetchWarrenError::NotFound(request.id().clone());
|
||||
}
|
||||
|
||||
anyhow!(err)
|
||||
.context(format!("Failed to fetch warren with id {:?}", request.id()))
|
||||
.into()
|
||||
})?;
|
||||
|
||||
Ok(warren)
|
||||
}
|
||||
}
|
||||
|
||||
impl Postgres {
|
||||
async fn get_warren(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
id: &Uuid,
|
||||
) -> Result<Warren, sqlx::Error> {
|
||||
let warren: Warren = sqlx::query_as(
|
||||
"
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
warrens
|
||||
WHERE
|
||||
id = $1
|
||||
",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_one(connection)
|
||||
.await?;
|
||||
|
||||
Ok(warren)
|
||||
}
|
||||
|
||||
async fn fetch_warrens(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
ids: &[Uuid],
|
||||
) -> Result<Vec<Warren>, sqlx::Error> {
|
||||
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Postgres, Warren>(
|
||||
"
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
warrens
|
||||
WHERE
|
||||
id = ANY($1)
|
||||
",
|
||||
)
|
||||
.bind(ids)
|
||||
.fetch_all(&mut *connection)
|
||||
.await?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
|
||||
async fn fetch_all_warrens(
|
||||
&self,
|
||||
connection: &mut PgConnection,
|
||||
) -> Result<Vec<Warren>, sqlx::Error> {
|
||||
let warrens: Vec<Warren> = sqlx::query_as::<sqlx::Postgres, Warren>(
|
||||
"
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
warrens
|
||||
",
|
||||
)
|
||||
.fetch_all(&mut *connection)
|
||||
.await?;
|
||||
|
||||
Ok(warrens)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user