From 634c147ee99d6303ebcc6797669ea21363236103 Mon Sep 17 00:00:00 2001 From: 409 Date: Fri, 29 Nov 2024 03:21:48 +0100 Subject: [PATCH] fix: player deadlock related to `start_watching` --- Cargo.lock | 95 +++++++++--------- Cargo.toml | 4 +- src/database/artists.rs | 10 +- src/database/mod.rs | 28 +++--- src/database/paths.rs | 31 ++++-- src/database/tracks.rs | 33 ++++--- src/library.rs | 17 ++-- src/main.rs | 12 +-- src/music/player.rs | 209 +++++++++++++++++----------------------- src/player.rs | 73 ++++++-------- src/settings.rs | 33 ++----- 11 files changed, 254 insertions(+), 291 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 531cb0e..48faa84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -512,6 +512,46 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c87e182de0887fd5361989c677c4e8f5000cd9491d6d563161a8f3a5519fc7f" +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + +[[package]] +name = "deadpool-sqlite" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "656f14fc1ab819c65f332045ea7cb38841bbe551f3b2bc7e3abefb559af4155c" +dependencies = [ + "deadpool", + "deadpool-sync", + "rusqlite", +] + +[[package]] +name = "deadpool-sync" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524bc3df0d57e98ecd022e21ba31166c2625e7d3e5bcc4510efaeeab4abcab04" +dependencies = [ + "deadpool-runtime", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -706,13 +746,13 @@ version = "0.1.0" dependencies = [ "axum", "blake3", + "deadpool", + "deadpool-sqlite", "dotenvy", "hex", "home", "image", "prost", - "r2d2", - "r2d2_sqlite", "rayon", "rodio", "rusqlite", @@ -1347,6 +1387,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_enum" version = "0.7.3" @@ -1644,28 +1694,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - -[[package]] -name = "r2d2_sqlite" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb14dba8247a6a15b7fdbc7d389e2e6f03ee9f184f87117706d509c092dfe846" -dependencies = [ - "r2d2", - "rusqlite", - "uuid", -] - [[package]] name = "rand" version = "0.8.5" @@ -1883,15 +1911,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -2556,16 +2575,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" -[[package]] -name = "uuid" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" -dependencies = [ - "getrandom", - "rand", -] - [[package]] name = "v_frame" version = "0.3.8" diff --git a/Cargo.toml b/Cargo.toml index fdd70f5..c3685af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,13 +6,13 @@ edition = "2021" [dependencies] axum = "0.7.9" blake3 = "1.5.4" +deadpool = "0.12.1" +deadpool-sqlite = "0.9.0" dotenvy = "0.15.7" hex = "0.4.3" home = "0.5.9" image = "0.25.5" prost = "0.13.3" -r2d2 = "0.8.10" -r2d2_sqlite = { version = "0.25.0", features = ["bundled"] } rayon = "1.10.0" rodio = "0.20.1" rusqlite = { version = "0.32.1", features = ["bundled"] } diff --git a/src/database/artists.rs b/src/database/artists.rs index cacb030..c81c82b 100644 --- a/src/database/artists.rs +++ b/src/database/artists.rs @@ -1,5 +1,4 @@ -use r2d2::PooledConnection; -use r2d2_sqlite::SqliteConnectionManager; +use deadpool_sqlite::Pool; use rusqlite::Row; #[derive(Debug)] @@ -15,9 +14,12 @@ fn map_artist(row: &Row) -> Result { }) } -pub fn get_artists( - connection: &PooledConnection, +pub async fn get_artists( + pool: &Pool, ) -> Result, rusqlite::Error> { + let manager = pool.get().await.unwrap(); + let connection = manager.lock().unwrap(); + let mut statement = connection.prepare("SELECT id, name FROM artists")?; let rows = statement.query_map([], map_artist)?; diff --git a/src/database/mod.rs b/src/database/mod.rs index 04884bc..a8cbbb9 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,25 +1,27 @@ +use deadpool_sqlite::{Config, Pool}; + pub mod artists; pub mod paths; pub mod tracks; -use r2d2::{Pool, PooledConnection}; -use r2d2_sqlite::SqliteConnectionManager; - -pub fn establish_connection() -> Pool { +pub fn establish_connection() -> Pool { dotenvy::dotenv().ok(); let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let manager = SqliteConnectionManager::file(database_url); - let pool = Pool::new(manager).expect("Error creating SQLite pool"); + let cfg = Config::new(database_url); + let pool = cfg.create_pool(deadpool::Runtime::Tokio1).expect("Error creating SQLite pool"); pool } -pub fn initialize_database( - connection: &PooledConnection, -) -> Result<(), r2d2_sqlite::rusqlite::Error> { - connection.execute( +pub async fn initialize_database( + pool: &Pool, +) -> Result<(), rusqlite::Error> { + let manager = pool.get().await.unwrap(); + let conn = manager.lock().unwrap(); + + conn.execute( " CREATE TABLE IF NOT EXISTS library_paths ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, @@ -29,7 +31,7 @@ pub fn initialize_database( [], )?; - connection.execute( + conn.execute( "CREATE TABLE IF NOT EXISTS artists ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name TEXT NOT NULL @@ -37,7 +39,7 @@ pub fn initialize_database( [], )?; - connection.execute( + conn.execute( "CREATE TABLE IF NOT EXISTS albums ( name TEXT NOT NULL, artist_id INTEGER NOT NULL, @@ -47,7 +49,7 @@ pub fn initialize_database( [], )?; - connection.execute( + conn.execute( " CREATE TABLE IF NOT EXISTS tracks ( hash TEXT PRIMARY KEY NOT NULL, diff --git a/src/database/paths.rs b/src/database/paths.rs index e448e11..c081f18 100644 --- a/src/database/paths.rs +++ b/src/database/paths.rs @@ -1,5 +1,4 @@ -use r2d2::PooledConnection; -use r2d2_sqlite::SqliteConnectionManager; +use deadpool_sqlite::Pool; use rusqlite::Row; #[derive(Debug)] @@ -17,9 +16,12 @@ fn map_library_path(row: &Row) -> Result { }) } -pub fn get_library_paths( - connection: &PooledConnection, +pub async fn get_library_paths( + pool: &Pool, ) -> Result, rusqlite::Error> { + let manager = pool.get().await.unwrap(); + let connection = manager.lock().unwrap(); + let mut statement = connection.prepare("SELECT id, path FROM library_paths")?; let rows = statement.query_map([], map_library_path)?; @@ -34,10 +36,13 @@ pub fn get_library_paths( Ok(paths) } -pub fn get_library_path( - connection: &PooledConnection, +pub async fn get_library_path( + pool: &Pool, id: u64, ) -> Result { + let manager = pool.get().await.unwrap(); + let connection = manager.lock().unwrap(); + Ok(connection.query_row( "SELECT id, path FROM library_paths WHERE id = ?1", [id], @@ -45,19 +50,25 @@ pub fn get_library_path( )?) } -pub fn insert_library_path( - connection: &PooledConnection, +pub async fn insert_library_path( + pool: &Pool, path: LibraryPathInsertData, ) -> Result { + let manager = pool.get().await.unwrap(); + let connection = manager.lock().unwrap(); + let result = connection.execute("INSERT INTO library_paths (path) VALUES (?1)", [path])?; Ok(result > 0) } -pub fn delete_library_path( - connection: &PooledConnection, +pub async fn delete_library_path( + pool: &Pool, path_id: u64, ) -> Result { + let manager = pool.get().await.unwrap(); + let connection = manager.lock().unwrap(); + let result = connection.execute("DELETE FROM library_paths WHERE id = ?1", [path_id])?; Ok(result > 0) diff --git a/src/database/tracks.rs b/src/database/tracks.rs index dd44578..d8c363e 100644 --- a/src/database/tracks.rs +++ b/src/database/tracks.rs @@ -3,8 +3,7 @@ use std::{ path::{Path, PathBuf}, }; -use r2d2::PooledConnection; -use r2d2_sqlite::SqliteConnectionManager; +use deadpool_sqlite::Pool; use rusqlite::{params, Row}; use crate::{ @@ -58,9 +57,12 @@ fn map_track(row: &Row) -> Result { }) } -pub fn get_tracks( - connection: &PooledConnection, +pub async fn get_tracks( + pool: &Pool, ) -> Result, rusqlite::Error> { + let manager = pool.get().await.unwrap(); + let connection = manager.lock().unwrap(); + let mut statement = connection.prepare("SELECT t.hash, t.name, t.duration, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id")?; let rows = statement.query_map([], map_track)?; @@ -75,17 +77,23 @@ pub fn get_tracks( Ok(tracks) } -pub fn get_track( - connection: &PooledConnection, +pub async fn get_track( + pool: &Pool, hash: &str, ) -> Result { + let manager = pool.get().await.unwrap(); + let connection = &manager.lock().unwrap(); + connection.query_row("SELECT t.hash, t.name, t.duration, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id WHERE t.hash = ?1", [hash], map_track) } -pub fn get_track_full_path( - connection: &PooledConnection, +pub async fn get_track_full_path( + pool: &Pool, hash: &str, ) -> Result { + let manager = pool.get().await.unwrap(); + let connection = &manager.lock().unwrap(); + let (relative_path, library_path): (String, String) = connection.query_row( "SELECT t.path, l.path AS library_root FROM tracks t INNER JOIN library_paths l ON t.library_path_id = l.id WHERE t.hash = ?1", [hash], @@ -98,14 +106,14 @@ pub fn get_track_full_path( Ok(path) } -pub fn insert_tracks( - mut connection: PooledConnection, +pub async fn insert_tracks( + pool: &Pool, tracks: HashMap, library_path_id: u64, ) -> Result<(), rusqlite::Error> { let existing_covers = get_all_cover_hashes(); - let artists = get_artists(&connection)?; + let artists = get_artists(pool).await?; let mut artist_names_to_id: HashMap = HashMap::new(); for artist in artists { @@ -124,6 +132,9 @@ pub fn insert_tracks( new_artists.push(meta.artist_name.clone()); } + let manager = pool.get().await.unwrap(); + let mut connection = manager.lock().unwrap(); + // BEGIN TRANSACTION let tx = connection.transaction()?; diff --git a/src/library.rs b/src/library.rs index 72b14d7..a1fe446 100644 --- a/src/library.rs +++ b/src/library.rs @@ -1,8 +1,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Instant}; +use deadpool_sqlite::Pool; use tokio::{sync::Mutex, task::JoinSet}; -use r2d2::{Pool, PooledConnection}; -use r2d2_sqlite::SqliteConnectionManager; use walkdir::{DirEntry, WalkDir}; use crate::{ @@ -16,11 +15,11 @@ use crate::{ pub struct LibraryService { #[allow(dead_code)] state: GrooveState, - pool: Pool, + pool: Pool, } impl LibraryService { - pub fn new(state: GrooveState, pool: Pool) -> Self { + pub fn new(state: GrooveState, pool: Pool) -> Self { Self { state, pool } } } @@ -31,11 +30,7 @@ impl Library for LibraryService { &self, _request: tonic::Request<()>, ) -> Result, tonic::Status> { - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - - let Ok(tracks) = get_tracks(&db) else { + let Ok(tracks) = get_tracks(&self.pool).await else { return Err(tonic::Status::internal("")); }; @@ -58,7 +53,7 @@ impl Library for LibraryService { pub async fn index_path( path: PathBuf, - db: PooledConnection, + db: &Pool, path_id: u64, ) -> Result<(), rusqlite::Error> { let home = home::home_dir().unwrap(); @@ -103,7 +98,7 @@ pub async fn index_path( let now = Instant::now(); - insert_tracks(db, Arc::try_unwrap(tracks).unwrap().into_inner(), path_id)?; + insert_tracks(db, Arc::try_unwrap(tracks).unwrap().into_inner(), path_id).await?; let elapsed = now.elapsed(); diff --git a/src/main.rs b/src/main.rs index 6bfba72..d796793 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ pub mod settings; pub mod state; use settings::SettingsService; +use music::player::start_watching; pub mod proto { pub mod settings { @@ -39,9 +40,7 @@ pub mod proto { async fn main() -> Result<(), Box> { let pool = &mut establish_connection(); - let connection = pool.get().unwrap(); - - initialize_database(&connection).expect("Error initializing database"); + initialize_database(&pool).await.expect("Error initializing database"); let address = "[::1]:39993".parse()?; @@ -52,7 +51,7 @@ async fn main() -> Result<(), Box> { let player = Arc::new(Mutex::new(AudioPlayer::new(sink))); let c_player = player.clone(); - let (watch_handle, mut rx) = player.lock().await.start_watching(); + let (watch_handle, mut rx) = start_watching(player.clone()); let player_rx_handle = tokio::spawn(async move { while let Some(next) = rx.recv().await { @@ -60,11 +59,10 @@ async fn main() -> Result<(), Box> { match next { Some(queued_track) => { let _ = player - .play_track(queued_track.track, queued_track.path, false) - .await; + .play_track(queued_track.track, queued_track.path, false); } None => { - player.clear().await; + player.clear(); } } } diff --git a/src/music/player.rs b/src/music/player.rs index 76e23e6..7705c5e 100644 --- a/src/music/player.rs +++ b/src/music/player.rs @@ -21,9 +21,9 @@ use crate::{database::tracks::Track, proto::player::PlayerStatus}; use super::queue::QueuedTrack; pub struct AudioPlayer { - sink: Arc>, - currently_playing: Arc>>, - queue: Arc>>, + sink: Sink, + currently_playing: Option, + queue: VecDeque, } impl AudioPlayer { @@ -32,41 +32,13 @@ impl AudioPlayer { sink.set_volume(0.5); Self { - sink: Arc::new(Mutex::new(sink)), - currently_playing: Arc::new(Mutex::new(None)), - queue: Arc::new(Mutex::new(VecDeque::new())), + sink, + currently_playing: None, + queue: VecDeque::new(), } } - pub fn start_watching(&mut self) -> (JoinHandle<()>, Receiver>) { - let sink = self.sink.clone(); - let current_track = self.currently_playing.clone(); - let queue = self.queue.clone(); - - let (tx, rx) = mpsc::channel::>(128); - - ( - tokio::spawn(async move { - loop { - if let Some(current_track) = current_track.lock().await.as_mut() { - let sink = sink.lock().await; - if sink.empty() - || current_track.duration as u128 <= sink.get_pos().as_millis() - { - if let Err(_) = tx.send(queue.lock().await.pop_front()).await { - break; - } - } - } - - tokio::time::sleep(Self::WATCH_SLEEP_TIME).await; - } - }), - rx, - ) - } - - pub async fn play_track

( + pub fn play_track

( &mut self, track: Track, path: P, @@ -75,43 +47,36 @@ impl AudioPlayer { where P: AsRef, { - let sink = self.sink.lock().await; + self.currently_playing = Some(track); - sink.clear(); + self.sink.clear(); if clear_queue { - self.queue.lock().await.clear(); + self.queue.clear(); } let file = BufReader::new(Cursor::new(fs::read(path)?)); let source = Decoder::new(file)?; - *self.currently_playing.lock().await = Some(track); - sink.append(source); + self.sink.append(source); - sink.play(); + self.sink.play(); Ok(()) } - pub async fn play_track_next( + pub fn play_track_next( &mut self, track: Track, path: &Path, ) -> Result<(), Box> { - let sink = self.sink.lock().await; - - let mut queue = self.queue.lock().await; - - if sink.empty() && queue.is_empty() { - drop(queue); - drop(sink); - return self.play_track(track, path, false).await; + if self.sink.empty() && self.queue.is_empty() { + return self.play_track(track, path, false); } let queued_track = QueuedTrack::new(track, path.to_path_buf()); - queue.push_front(queued_track); + self.queue.push_front(queued_track); Ok(()) } @@ -121,138 +86,142 @@ impl AudioPlayer { track: Track, path: &Path, ) -> Result<(), Box> { - let sink = self.sink.lock().await; - let mut queue = self.queue.lock().await; - - if sink.empty() && queue.is_empty() { - drop(queue); - drop(sink); - return self.play_track(track, path, false).await; + if self.sink.empty() && self.queue.is_empty() { + return self.play_track(track, path, false); } let queued_track = QueuedTrack::new(track, path.to_path_buf()); - queue.push_back(queued_track); + self.queue.push_back(queued_track); Ok(()) } - pub async fn skip_track(&mut self) -> Result<(), Box> { - let mut queue = self.queue.lock().await; - - let Some(queued_track) = queue.pop_front() else { - drop(queue); - self.clear().await; + pub fn skip_track(&mut self) -> Result<(), Box> { + let Some(queued_track) = self.queue.pop_front() else { + self.clear(); return Ok(()); }; - drop(queue); - self.play_track(queued_track.track, queued_track.path, false) - .await } - pub async fn skip_to_queue_index( - &mut self, - index: usize, - ) -> Result<(), Box> { - let mut queue = self.queue.lock().await; - - let Some(queued_track) = queue.remove(index) else { - drop(queue); - self.clear().await; + pub fn skip_to_queue_index(&mut self, index: usize) -> Result<(), Box> { + let Some(queued_track) = self.queue.remove(index) else { + self.clear(); return Ok(()); }; - drop(queue); self.play_track(queued_track.track, queued_track.path, false) - .await } - pub async fn swap_queue_indices(&mut self, a: usize, b: usize) -> bool { - let mut queue = self.queue.lock().await; - - let len = queue.len(); + pub fn swap_queue_indices(&mut self, a: usize, b: usize) -> bool { + let len = self.queue.len(); if a >= len || b >= len { return false; } - queue.swap(a, b); + self.queue.swap(a, b); true } - pub async fn resume(&self) { - self.sink.lock().await.play(); + pub fn resume(&self) { + self.sink.play(); } - pub async fn pause(&self) { - self.sink.lock().await.pause(); + pub fn pause(&self) { + self.sink.pause() } /// Toggles the player's pause state and returns the new value - pub async fn toggle_pause(&self) -> bool { - if self.is_paused().await { - self.resume().await; + pub fn toggle_pause(&self) -> bool { + if self.is_paused() { + self.resume(); false } else { - self.pause().await; + self.pause(); true } } - pub async fn volume(&self) -> f32 { - self.sink.lock().await.volume() + pub fn volume(&self) -> f32 { + self.sink.volume() } - pub async fn set_volume(&self, value: f32) { - self.sink.lock().await.set_volume(value); + pub fn set_volume(&self, value: f32) { + self.sink.set_volume(value); } - pub async fn position(&self) -> u128 { - self.sink.lock().await.get_pos().as_millis() + pub fn position(&self) -> u128 { + self.sink.get_pos().as_millis() } - pub async fn set_position(&self, position: u64) { - let _ = self - .sink - .lock() - .await - .try_seek(Duration::from_millis(position)); + pub fn set_position(&self, position: u64) { + let _ = self.sink.try_seek(Duration::from_millis(position)); } - pub async fn is_paused(&self) -> bool { - self.sink.lock().await.is_paused() + pub fn is_paused(&self) -> bool { + self.sink.is_paused() } - pub async fn currently_playing(&self) -> Option { - self.currently_playing.lock().await.clone() + pub fn currently_playing(&self) -> Option { + self.currently_playing.clone() } - pub async fn queue(&self) -> VecDeque { - self.queue.lock().await.clone() + pub fn queue(&self) -> VecDeque { + self.queue.clone() } - pub async fn clear(&mut self) { - self.queue.lock().await.clear(); - self.sink.lock().await.clear(); - *self.currently_playing.lock().await = None; + pub fn clear(&mut self) { + self.queue.clear(); + self.sink.clear(); + self.currently_playing = None; } - pub async fn get_snapshot(&self) -> StatusSnapshot { + pub fn get_snapshot(&self) -> StatusSnapshot { StatusSnapshot { - volume: self.volume().await, - position: self.position().await, - is_paused: self.is_paused().await, - currently_playing: self.currently_playing().await, - queue: self.queue().await, + volume: self.volume(), + position: self.position(), + is_paused: self.is_paused(), + currently_playing: self.currently_playing(), + queue: self.queue(), } } } +pub fn start_watching( + player: Arc>, +) -> (JoinHandle<()>, Receiver>) { + let (tx, rx) = mpsc::channel::>(128); + + ( + tokio::spawn(async move { + loop { + { + let mut player = player.lock().await; + + if let Some(current_track) = player.currently_playing() { + if player.sink.empty() + || current_track.duration as u128 <= player.position() + { + if let Err(_) = tx.send(player.queue.pop_front()).await { + break; + } + } + } + } + + tokio::time::sleep(AudioPlayer::WATCH_SLEEP_TIME).await; + } + }), + rx, + ) +} + #[derive(Debug, Clone)] pub struct StatusSnapshot { pub volume: f32, diff --git a/src/player.rs b/src/player.rs index 9da4f72..d89fa81 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,5 +1,4 @@ -use r2d2::Pool; -use r2d2_sqlite::SqliteConnectionManager; +use deadpool_sqlite::Pool; use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream}; @@ -20,11 +19,11 @@ use crate::{ pub struct PlayerService { state: GrooveState, - pool: Pool, + pool: Pool, } impl PlayerService { - pub fn new(state: GrooveState, pool: Pool) -> Self { + pub fn new(state: GrooveState, pool: Pool) -> Self { Self { state, pool } } } @@ -37,28 +36,21 @@ impl Player for PlayerService { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - let input = request.get_ref(); - let Ok(track) = get_track(&db, input.hash.as_str()) else { - return Err(tonic::Status::not_found("")); + let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else { + return Err(tonic::Status::not_found("Could not get track")); }; - let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { - return Err(tonic::Status::not_found("")); + let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else { + return Err(tonic::Status::not_found("Could not get track file path")); }; let state = self.state.lock().await; - let _ = state - .player - .lock() - .await - .play_track(track.clone(), track_path, true) - .await; + let player = &mut state.player.lock().await; + + let _ = player.play_track(track.clone(), track_path, true); let response = PlayTrackResponse { track: Some(proto::library::Track::from(track.into())), @@ -74,7 +66,7 @@ impl Player for PlayerService { ) -> Result, tonic::Status> { let state = self.state.lock().await; - state.player.lock().await.resume().await; + state.player.lock().await.resume(); let response = PauseState { is_paused: false }; @@ -87,7 +79,7 @@ impl Player for PlayerService { ) -> Result, tonic::Status> { let state = self.state.lock().await; - state.player.lock().await.pause().await; + state.player.lock().await.pause(); let response = PauseState { is_paused: true }; @@ -100,7 +92,7 @@ impl Player for PlayerService { ) -> Result, tonic::Status> { let state = self.state.lock().await; - let is_paused = state.player.lock().await.toggle_pause().await; + let is_paused = state.player.lock().await.toggle_pause(); let response = PauseState { is_paused }; @@ -114,7 +106,7 @@ impl Player for PlayerService { let input = request.get_ref(); let state = self.state.lock().await; - state.player.lock().await.set_position(input.position).await; + state.player.lock().await.set_position(input.position); let response = SeekPositionResponse { position: input.position, @@ -130,7 +122,7 @@ impl Player for PlayerService { let input = request.get_ref(); let state = self.state.lock().await; - state.player.lock().await.set_volume(input.volume).await; + state.player.lock().await.set_volume(input.volume); let response = SetVolumeResponse { volume: input.volume, @@ -159,7 +151,6 @@ impl Player for PlayerService { .lock() .await .get_snapshot() - .await .into())) .await { @@ -181,17 +172,13 @@ impl Player for PlayerService { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - let input = request.get_ref(); - let Ok(track) = get_track(&db, input.hash.as_str()) else { + let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; - let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { + let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; @@ -202,7 +189,7 @@ impl Player for PlayerService { return Err(tonic::Status::internal("")); } - let queue = player.queue().await; + let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), @@ -215,28 +202,24 @@ impl Player for PlayerService { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - let input = request.get_ref(); - let Ok(track) = get_track(&db, input.hash.as_str()) else { + let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; - let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { + let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; - if let Err(_) = player.play_track_next(track, &track_path).await { + if let Err(_) = player.play_track_next(track, &track_path) { return Err(tonic::Status::internal("")); } - let queue = player.queue().await; + let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), @@ -253,11 +236,11 @@ impl Player for PlayerService { let mut player = state.player.lock().await; - if let Err(_) = player.skip_track().await { + if let Err(_) = player.skip_track() { return Err(tonic::Status::internal("")); }; - let response = player.get_snapshot().await.into(); + let response = player.get_snapshot().into(); Ok(tonic::Response::new(response)) } @@ -275,11 +258,11 @@ impl Player for PlayerService { let state = self.state.lock().await; let mut player = state.player.lock().await; - if let Err(_) = player.skip_to_queue_index(index).await { + if let Err(_) = player.skip_to_queue_index(index) { return Err(tonic::Status::internal("")); }; - let response = player.get_snapshot().await.into(); + let response = player.get_snapshot().into(); Ok(tonic::Response::new(response)) } @@ -301,11 +284,11 @@ impl Player for PlayerService { let state = self.state.lock().await; let mut player = state.player.lock().await; - if !player.swap_queue_indices(a, b).await { + if !player.swap_queue_indices(a, b) { return Err(tonic::Status::internal("")); }; - let queue = player.queue().await; + let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), diff --git a/src/settings.rs b/src/settings.rs index 3e5aeff..9f0e664 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -9,18 +9,17 @@ use crate::proto::settings::{ }; use crate::state::GrooveState; +use deadpool_sqlite::Pool; use proto::settings::{settings_server::Settings, LibraryPath, SettingsData}; -use r2d2::Pool; -use r2d2_sqlite::SqliteConnectionManager; pub struct SettingsService { #[allow(dead_code)] state: GrooveState, - pool: Pool, + pool: Pool, } impl SettingsService { - pub fn new(state: GrooveState, pool: Pool) -> Self { + pub fn new(state: GrooveState, pool: Pool) -> Self { Self { state, pool } } } @@ -31,11 +30,7 @@ impl Settings for SettingsService { &self, _request: tonic::Request<()>, ) -> Result, tonic::Status> { - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - - let Ok(library_paths) = get_library_paths(&db) else { + let Ok(library_paths) = get_library_paths(&self.pool).await else { return Err(tonic::Status::internal("")); }; @@ -58,11 +53,7 @@ impl Settings for SettingsService { ) -> Result, tonic::Status> { let input = request.into_inner(); - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - - let Ok(insert_result) = insert_library_path(&db, input.path) else { + let Ok(insert_result) = insert_library_path(&self.pool, input.path).await else { return Err(tonic::Status::internal("")); }; @@ -81,11 +72,7 @@ impl Settings for SettingsService { ) -> Result, tonic::Status> { let input = request.into_inner(); - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - - let Ok(delete_result) = delete_library_path(&db, input.id) else { + let Ok(delete_result) = delete_library_path(&self.pool, input.id).await else { return Err(tonic::Status::internal("")); }; @@ -104,15 +91,11 @@ impl Settings for SettingsService { ) -> Result, tonic::Status> { let input = request.into_inner(); - let Ok(db) = self.pool.get() else { - return Err(tonic::Status::internal("")); - }; - - let Ok(library_path) = get_library_path(&db, input.id) else { + let Ok(library_path) = get_library_path(&self.pool, input.id).await else { return Err(tonic::Status::not_found("")); }; - let _ = index_path(library_path.path.into(), db, library_path.id).await; + let _ = index_path(library_path.path.into(), &self.pool, library_path.id).await; let response = RefreshPathResponse {};