fix: player deadlock related to start_watching

This commit is contained in:
2024-11-29 03:21:48 +01:00
parent e60c2d15d2
commit 634c147ee9
11 changed files with 254 additions and 291 deletions

95
Cargo.lock generated
View File

@@ -512,6 +512,46 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c87e182de0887fd5361989c677c4e8f5000cd9491d6d563161a8f3a5519fc7f"
[[package]]
name = "deadpool"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed"
dependencies = [
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
dependencies = [
"tokio",
]
[[package]]
name = "deadpool-sqlite"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "656f14fc1ab819c65f332045ea7cb38841bbe551f3b2bc7e3abefb559af4155c"
dependencies = [
"deadpool",
"deadpool-sync",
"rusqlite",
]
[[package]]
name = "deadpool-sync"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524bc3df0d57e98ecd022e21ba31166c2625e7d3e5bcc4510efaeeab4abcab04"
dependencies = [
"deadpool-runtime",
]
[[package]]
name = "dotenvy"
version = "0.15.7"
@@ -706,13 +746,13 @@ version = "0.1.0"
dependencies = [
"axum",
"blake3",
"deadpool",
"deadpool-sqlite",
"dotenvy",
"hex",
"home",
"image",
"prost",
"r2d2",
"r2d2_sqlite",
"rayon",
"rodio",
"rusqlite",
@@ -1347,6 +1387,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "num_enum"
version = "0.7.3"
@@ -1644,28 +1694,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_sqlite"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb14dba8247a6a15b7fdbc7d389e2e6f03ee9f184f87117706d509c092dfe846"
dependencies = [
"r2d2",
"rusqlite",
"uuid",
]
[[package]]
name = "rand"
version = "0.8.5"
@@ -1883,15 +1911,6 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -2556,16 +2575,6 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "uuid"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
"rand",
]
[[package]]
name = "v_frame"
version = "0.3.8"

View File

@@ -6,13 +6,13 @@ edition = "2021"
[dependencies]
axum = "0.7.9"
blake3 = "1.5.4"
deadpool = "0.12.1"
deadpool-sqlite = "0.9.0"
dotenvy = "0.15.7"
hex = "0.4.3"
home = "0.5.9"
image = "0.25.5"
prost = "0.13.3"
r2d2 = "0.8.10"
r2d2_sqlite = { version = "0.25.0", features = ["bundled"] }
rayon = "1.10.0"
rodio = "0.20.1"
rusqlite = { version = "0.32.1", features = ["bundled"] }

View File

@@ -1,5 +1,4 @@
use r2d2::PooledConnection;
use r2d2_sqlite::SqliteConnectionManager;
use deadpool_sqlite::Pool;
use rusqlite::Row;
#[derive(Debug)]
@@ -15,9 +14,12 @@ fn map_artist(row: &Row) -> Result<Artist, rusqlite::Error> {
})
}
pub fn get_artists(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn get_artists(
pool: &Pool,
) -> Result<Vec<Artist>, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = manager.lock().unwrap();
let mut statement = connection.prepare("SELECT id, name FROM artists")?;
let rows = statement.query_map([], map_artist)?;

View File

@@ -1,25 +1,27 @@
use deadpool_sqlite::{Config, Pool};
pub mod artists;
pub mod paths;
pub mod tracks;
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
pub fn establish_connection() -> Pool<SqliteConnectionManager> {
pub fn establish_connection() -> Pool {
dotenvy::dotenv().ok();
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let manager = SqliteConnectionManager::file(database_url);
let pool = Pool::new(manager).expect("Error creating SQLite pool");
let cfg = Config::new(database_url);
let pool = cfg.create_pool(deadpool::Runtime::Tokio1).expect("Error creating SQLite pool");
pool
}
pub fn initialize_database(
connection: &PooledConnection<SqliteConnectionManager>,
) -> Result<(), r2d2_sqlite::rusqlite::Error> {
connection.execute(
pub async fn initialize_database(
pool: &Pool,
) -> Result<(), rusqlite::Error> {
let manager = pool.get().await.unwrap();
let conn = manager.lock().unwrap();
conn.execute(
"
CREATE TABLE IF NOT EXISTS library_paths (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
@@ -29,7 +31,7 @@ pub fn initialize_database(
[],
)?;
connection.execute(
conn.execute(
"CREATE TABLE IF NOT EXISTS artists (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
name TEXT NOT NULL
@@ -37,7 +39,7 @@ pub fn initialize_database(
[],
)?;
connection.execute(
conn.execute(
"CREATE TABLE IF NOT EXISTS albums (
name TEXT NOT NULL,
artist_id INTEGER NOT NULL,
@@ -47,7 +49,7 @@ pub fn initialize_database(
[],
)?;
connection.execute(
conn.execute(
"
CREATE TABLE IF NOT EXISTS tracks (
hash TEXT PRIMARY KEY NOT NULL,

View File

@@ -1,5 +1,4 @@
use r2d2::PooledConnection;
use r2d2_sqlite::SqliteConnectionManager;
use deadpool_sqlite::Pool;
use rusqlite::Row;
#[derive(Debug)]
@@ -17,9 +16,12 @@ fn map_library_path(row: &Row) -> Result<LibraryPath, rusqlite::Error> {
})
}
pub fn get_library_paths(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn get_library_paths(
pool: &Pool,
) -> Result<Vec<LibraryPath>, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = manager.lock().unwrap();
let mut statement = connection.prepare("SELECT id, path FROM library_paths")?;
let rows = statement.query_map([], map_library_path)?;
@@ -34,10 +36,13 @@ pub fn get_library_paths(
Ok(paths)
}
pub fn get_library_path(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn get_library_path(
pool: &Pool,
id: u64,
) -> Result<LibraryPath, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = manager.lock().unwrap();
Ok(connection.query_row(
"SELECT id, path FROM library_paths WHERE id = ?1",
[id],
@@ -45,19 +50,25 @@ pub fn get_library_path(
)?)
}
pub fn insert_library_path(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn insert_library_path(
pool: &Pool,
path: LibraryPathInsertData,
) -> Result<bool, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = manager.lock().unwrap();
let result = connection.execute("INSERT INTO library_paths (path) VALUES (?1)", [path])?;
Ok(result > 0)
}
pub fn delete_library_path(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn delete_library_path(
pool: &Pool,
path_id: u64,
) -> Result<bool, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = manager.lock().unwrap();
let result = connection.execute("DELETE FROM library_paths WHERE id = ?1", [path_id])?;
Ok(result > 0)

View File

@@ -3,8 +3,7 @@ use std::{
path::{Path, PathBuf},
};
use r2d2::PooledConnection;
use r2d2_sqlite::SqliteConnectionManager;
use deadpool_sqlite::Pool;
use rusqlite::{params, Row};
use crate::{
@@ -58,9 +57,12 @@ fn map_track(row: &Row) -> Result<Track, rusqlite::Error> {
})
}
pub fn get_tracks(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn get_tracks(
pool: &Pool,
) -> Result<Vec<Track>, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = manager.lock().unwrap();
let mut statement = connection.prepare("SELECT t.hash, t.name, t.duration, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id")?;
let rows = statement.query_map([], map_track)?;
@@ -75,17 +77,23 @@ pub fn get_tracks(
Ok(tracks)
}
pub fn get_track(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn get_track(
pool: &Pool,
hash: &str,
) -> Result<Track, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = &manager.lock().unwrap();
connection.query_row("SELECT t.hash, t.name, t.duration, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id WHERE t.hash = ?1", [hash], map_track)
}
pub fn get_track_full_path(
connection: &PooledConnection<SqliteConnectionManager>,
pub async fn get_track_full_path(
pool: &Pool,
hash: &str,
) -> Result<PathBuf, rusqlite::Error> {
let manager = pool.get().await.unwrap();
let connection = &manager.lock().unwrap();
let (relative_path, library_path): (String, String) = connection.query_row(
"SELECT t.path, l.path AS library_root FROM tracks t INNER JOIN library_paths l ON t.library_path_id = l.id WHERE t.hash = ?1",
[hash],
@@ -98,14 +106,14 @@ pub fn get_track_full_path(
Ok(path)
}
pub fn insert_tracks(
mut connection: PooledConnection<SqliteConnectionManager>,
pub async fn insert_tracks(
pool: &Pool,
tracks: HashMap<String, TrackMetadata>,
library_path_id: u64,
) -> Result<(), rusqlite::Error> {
let existing_covers = get_all_cover_hashes();
let artists = get_artists(&connection)?;
let artists = get_artists(pool).await?;
let mut artist_names_to_id: HashMap<String, u64> = HashMap::new();
for artist in artists {
@@ -124,6 +132,9 @@ pub fn insert_tracks(
new_artists.push(meta.artist_name.clone());
}
let manager = pool.get().await.unwrap();
let mut connection = manager.lock().unwrap();
// BEGIN TRANSACTION
let tx = connection.transaction()?;

View File

@@ -1,8 +1,7 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Instant};
use deadpool_sqlite::Pool;
use tokio::{sync::Mutex, task::JoinSet};
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use walkdir::{DirEntry, WalkDir};
use crate::{
@@ -16,11 +15,11 @@ use crate::{
pub struct LibraryService {
#[allow(dead_code)]
state: GrooveState,
pool: Pool<SqliteConnectionManager>,
pool: Pool,
}
impl LibraryService {
pub fn new(state: GrooveState, pool: Pool<SqliteConnectionManager>) -> Self {
pub fn new(state: GrooveState, pool: Pool) -> Self {
Self { state, pool }
}
}
@@ -31,11 +30,7 @@ impl Library for LibraryService {
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<TrackList>, tonic::Status> {
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let Ok(tracks) = get_tracks(&db) else {
let Ok(tracks) = get_tracks(&self.pool).await else {
return Err(tonic::Status::internal(""));
};
@@ -58,7 +53,7 @@ impl Library for LibraryService {
pub async fn index_path(
path: PathBuf,
db: PooledConnection<SqliteConnectionManager>,
db: &Pool,
path_id: u64,
) -> Result<(), rusqlite::Error> {
let home = home::home_dir().unwrap();
@@ -103,7 +98,7 @@ pub async fn index_path(
let now = Instant::now();
insert_tracks(db, Arc::try_unwrap(tracks).unwrap().into_inner(), path_id)?;
insert_tracks(db, Arc::try_unwrap(tracks).unwrap().into_inner(), path_id).await?;
let elapsed = now.elapsed();

View File

@@ -22,6 +22,7 @@ pub mod settings;
pub mod state;
use settings::SettingsService;
use music::player::start_watching;
pub mod proto {
pub mod settings {
@@ -39,9 +40,7 @@ pub mod proto {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = &mut establish_connection();
let connection = pool.get().unwrap();
initialize_database(&connection).expect("Error initializing database");
initialize_database(&pool).await.expect("Error initializing database");
let address = "[::1]:39993".parse()?;
@@ -52,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let player = Arc::new(Mutex::new(AudioPlayer::new(sink)));
let c_player = player.clone();
let (watch_handle, mut rx) = player.lock().await.start_watching();
let (watch_handle, mut rx) = start_watching(player.clone());
let player_rx_handle = tokio::spawn(async move {
while let Some(next) = rx.recv().await {
@@ -60,11 +59,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match next {
Some(queued_track) => {
let _ = player
.play_track(queued_track.track, queued_track.path, false)
.await;
.play_track(queued_track.track, queued_track.path, false);
}
None => {
player.clear().await;
player.clear();
}
}
}

View File

@@ -21,9 +21,9 @@ use crate::{database::tracks::Track, proto::player::PlayerStatus};
use super::queue::QueuedTrack;
pub struct AudioPlayer {
sink: Arc<Mutex<Sink>>,
currently_playing: Arc<Mutex<Option<Track>>>,
queue: Arc<Mutex<VecDeque<QueuedTrack>>>,
sink: Sink,
currently_playing: Option<Track>,
queue: VecDeque<QueuedTrack>,
}
impl AudioPlayer {
@@ -32,41 +32,13 @@ impl AudioPlayer {
sink.set_volume(0.5);
Self {
sink: Arc::new(Mutex::new(sink)),
currently_playing: Arc::new(Mutex::new(None)),
queue: Arc::new(Mutex::new(VecDeque::new())),
sink,
currently_playing: None,
queue: VecDeque::new(),
}
}
pub fn start_watching(&mut self) -> (JoinHandle<()>, Receiver<Option<QueuedTrack>>) {
let sink = self.sink.clone();
let current_track = self.currently_playing.clone();
let queue = self.queue.clone();
let (tx, rx) = mpsc::channel::<Option<QueuedTrack>>(128);
(
tokio::spawn(async move {
loop {
if let Some(current_track) = current_track.lock().await.as_mut() {
let sink = sink.lock().await;
if sink.empty()
|| current_track.duration as u128 <= sink.get_pos().as_millis()
{
if let Err(_) = tx.send(queue.lock().await.pop_front()).await {
break;
}
}
}
tokio::time::sleep(Self::WATCH_SLEEP_TIME).await;
}
}),
rx,
)
}
pub async fn play_track<P>(
pub fn play_track<P>(
&mut self,
track: Track,
path: P,
@@ -75,43 +47,36 @@ impl AudioPlayer {
where
P: AsRef<Path>,
{
let sink = self.sink.lock().await;
self.currently_playing = Some(track);
sink.clear();
self.sink.clear();
if clear_queue {
self.queue.lock().await.clear();
self.queue.clear();
}
let file = BufReader::new(Cursor::new(fs::read(path)?));
let source = Decoder::new(file)?;
*self.currently_playing.lock().await = Some(track);
sink.append(source);
self.sink.append(source);
sink.play();
self.sink.play();
Ok(())
}
pub async fn play_track_next(
pub fn play_track_next(
&mut self,
track: Track,
path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
let sink = self.sink.lock().await;
let mut queue = self.queue.lock().await;
if sink.empty() && queue.is_empty() {
drop(queue);
drop(sink);
return self.play_track(track, path, false).await;
if self.sink.empty() && self.queue.is_empty() {
return self.play_track(track, path, false);
}
let queued_track = QueuedTrack::new(track, path.to_path_buf());
queue.push_front(queued_track);
self.queue.push_front(queued_track);
Ok(())
}
@@ -121,138 +86,142 @@ impl AudioPlayer {
track: Track,
path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
let sink = self.sink.lock().await;
let mut queue = self.queue.lock().await;
if sink.empty() && queue.is_empty() {
drop(queue);
drop(sink);
return self.play_track(track, path, false).await;
if self.sink.empty() && self.queue.is_empty() {
return self.play_track(track, path, false);
}
let queued_track = QueuedTrack::new(track, path.to_path_buf());
queue.push_back(queued_track);
self.queue.push_back(queued_track);
Ok(())
}
pub async fn skip_track(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut queue = self.queue.lock().await;
let Some(queued_track) = queue.pop_front() else {
drop(queue);
self.clear().await;
pub fn skip_track(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let Some(queued_track) = self.queue.pop_front() else {
self.clear();
return Ok(());
};
drop(queue);
self.play_track(queued_track.track, queued_track.path, false)
.await
}
pub async fn skip_to_queue_index(
&mut self,
index: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let mut queue = self.queue.lock().await;
let Some(queued_track) = queue.remove(index) else {
drop(queue);
self.clear().await;
pub fn skip_to_queue_index(&mut self, index: usize) -> Result<(), Box<dyn std::error::Error>> {
let Some(queued_track) = self.queue.remove(index) else {
self.clear();
return Ok(());
};
drop(queue);
self.play_track(queued_track.track, queued_track.path, false)
.await
}
pub async fn swap_queue_indices(&mut self, a: usize, b: usize) -> bool {
let mut queue = self.queue.lock().await;
let len = queue.len();
pub fn swap_queue_indices(&mut self, a: usize, b: usize) -> bool {
let len = self.queue.len();
if a >= len || b >= len {
return false;
}
queue.swap(a, b);
self.queue.swap(a, b);
true
}
pub async fn resume(&self) {
self.sink.lock().await.play();
pub fn resume(&self) {
self.sink.play();
}
pub async fn pause(&self) {
self.sink.lock().await.pause();
pub fn pause(&self) {
self.sink.pause()
}
/// Toggles the player's pause state and returns the new value
pub async fn toggle_pause(&self) -> bool {
if self.is_paused().await {
self.resume().await;
pub fn toggle_pause(&self) -> bool {
if self.is_paused() {
self.resume();
false
} else {
self.pause().await;
self.pause();
true
}
}
pub async fn volume(&self) -> f32 {
self.sink.lock().await.volume()
pub fn volume(&self) -> f32 {
self.sink.volume()
}
pub async fn set_volume(&self, value: f32) {
self.sink.lock().await.set_volume(value);
pub fn set_volume(&self, value: f32) {
self.sink.set_volume(value);
}
pub async fn position(&self) -> u128 {
self.sink.lock().await.get_pos().as_millis()
pub fn position(&self) -> u128 {
self.sink.get_pos().as_millis()
}
pub async fn set_position(&self, position: u64) {
let _ = self
.sink
.lock()
.await
.try_seek(Duration::from_millis(position));
pub fn set_position(&self, position: u64) {
let _ = self.sink.try_seek(Duration::from_millis(position));
}
pub async fn is_paused(&self) -> bool {
self.sink.lock().await.is_paused()
pub fn is_paused(&self) -> bool {
self.sink.is_paused()
}
pub async fn currently_playing(&self) -> Option<Track> {
self.currently_playing.lock().await.clone()
pub fn currently_playing(&self) -> Option<Track> {
self.currently_playing.clone()
}
pub async fn queue(&self) -> VecDeque<QueuedTrack> {
self.queue.lock().await.clone()
pub fn queue(&self) -> VecDeque<QueuedTrack> {
self.queue.clone()
}
pub async fn clear(&mut self) {
self.queue.lock().await.clear();
self.sink.lock().await.clear();
*self.currently_playing.lock().await = None;
pub fn clear(&mut self) {
self.queue.clear();
self.sink.clear();
self.currently_playing = None;
}
pub async fn get_snapshot(&self) -> StatusSnapshot {
pub fn get_snapshot(&self) -> StatusSnapshot {
StatusSnapshot {
volume: self.volume().await,
position: self.position().await,
is_paused: self.is_paused().await,
currently_playing: self.currently_playing().await,
queue: self.queue().await,
volume: self.volume(),
position: self.position(),
is_paused: self.is_paused(),
currently_playing: self.currently_playing(),
queue: self.queue(),
}
}
}
pub fn start_watching(
player: Arc<Mutex<AudioPlayer>>,
) -> (JoinHandle<()>, Receiver<Option<QueuedTrack>>) {
let (tx, rx) = mpsc::channel::<Option<QueuedTrack>>(128);
(
tokio::spawn(async move {
loop {
{
let mut player = player.lock().await;
if let Some(current_track) = player.currently_playing() {
if player.sink.empty()
|| current_track.duration as u128 <= player.position()
{
if let Err(_) = tx.send(player.queue.pop_front()).await {
break;
}
}
}
}
tokio::time::sleep(AudioPlayer::WATCH_SLEEP_TIME).await;
}
}),
rx,
)
}
#[derive(Debug, Clone)]
pub struct StatusSnapshot {
pub volume: f32,

View File

@@ -1,5 +1,4 @@
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use deadpool_sqlite::Pool;
use std::{pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};
@@ -20,11 +19,11 @@ use crate::{
pub struct PlayerService {
state: GrooveState,
pool: Pool<SqliteConnectionManager>,
pool: Pool,
}
impl PlayerService {
pub fn new(state: GrooveState, pool: Pool<SqliteConnectionManager>) -> Self {
pub fn new(state: GrooveState, pool: Pool) -> Self {
Self { state, pool }
}
}
@@ -37,28 +36,21 @@ impl Player for PlayerService {
&self,
request: tonic::Request<TrackRequest>,
) -> Result<tonic::Response<PlayTrackResponse>, tonic::Status> {
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let input = request.get_ref();
let Ok(track) = get_track(&db, input.hash.as_str()) else {
return Err(tonic::Status::not_found(""));
let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else {
return Err(tonic::Status::not_found("Could not get track"));
};
let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else {
return Err(tonic::Status::not_found(""));
let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else {
return Err(tonic::Status::not_found("Could not get track file path"));
};
let state = self.state.lock().await;
let _ = state
.player
.lock()
.await
.play_track(track.clone(), track_path, true)
.await;
let player = &mut state.player.lock().await;
let _ = player.play_track(track.clone(), track_path, true);
let response = PlayTrackResponse {
track: Some(proto::library::Track::from(track.into())),
@@ -74,7 +66,7 @@ impl Player for PlayerService {
) -> Result<tonic::Response<PauseState>, tonic::Status> {
let state = self.state.lock().await;
state.player.lock().await.resume().await;
state.player.lock().await.resume();
let response = PauseState { is_paused: false };
@@ -87,7 +79,7 @@ impl Player for PlayerService {
) -> Result<tonic::Response<PauseState>, tonic::Status> {
let state = self.state.lock().await;
state.player.lock().await.pause().await;
state.player.lock().await.pause();
let response = PauseState { is_paused: true };
@@ -100,7 +92,7 @@ impl Player for PlayerService {
) -> Result<tonic::Response<PauseState>, tonic::Status> {
let state = self.state.lock().await;
let is_paused = state.player.lock().await.toggle_pause().await;
let is_paused = state.player.lock().await.toggle_pause();
let response = PauseState { is_paused };
@@ -114,7 +106,7 @@ impl Player for PlayerService {
let input = request.get_ref();
let state = self.state.lock().await;
state.player.lock().await.set_position(input.position).await;
state.player.lock().await.set_position(input.position);
let response = SeekPositionResponse {
position: input.position,
@@ -130,7 +122,7 @@ impl Player for PlayerService {
let input = request.get_ref();
let state = self.state.lock().await;
state.player.lock().await.set_volume(input.volume).await;
state.player.lock().await.set_volume(input.volume);
let response = SetVolumeResponse {
volume: input.volume,
@@ -159,7 +151,6 @@ impl Player for PlayerService {
.lock()
.await
.get_snapshot()
.await
.into()))
.await
{
@@ -181,17 +172,13 @@ impl Player for PlayerService {
&self,
request: tonic::Request<TrackRequest>,
) -> Result<tonic::Response<Queue>, tonic::Status> {
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let input = request.get_ref();
let Ok(track) = get_track(&db, input.hash.as_str()) else {
let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else {
return Err(tonic::Status::not_found(""));
};
let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else {
let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else {
return Err(tonic::Status::not_found(""));
};
@@ -202,7 +189,7 @@ impl Player for PlayerService {
return Err(tonic::Status::internal(""));
}
let queue = player.queue().await;
let queue = player.queue();
let response = Queue {
tracks: queue_to_track_vec(queue),
@@ -215,28 +202,24 @@ impl Player for PlayerService {
&self,
request: tonic::Request<TrackRequest>,
) -> Result<tonic::Response<Queue>, tonic::Status> {
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let input = request.get_ref();
let Ok(track) = get_track(&db, input.hash.as_str()) else {
let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else {
return Err(tonic::Status::not_found(""));
};
let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else {
let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else {
return Err(tonic::Status::not_found(""));
};
let state = self.state.lock().await;
let mut player = state.player.lock().await;
if let Err(_) = player.play_track_next(track, &track_path).await {
if let Err(_) = player.play_track_next(track, &track_path) {
return Err(tonic::Status::internal(""));
}
let queue = player.queue().await;
let queue = player.queue();
let response = Queue {
tracks: queue_to_track_vec(queue),
@@ -253,11 +236,11 @@ impl Player for PlayerService {
let mut player = state.player.lock().await;
if let Err(_) = player.skip_track().await {
if let Err(_) = player.skip_track() {
return Err(tonic::Status::internal(""));
};
let response = player.get_snapshot().await.into();
let response = player.get_snapshot().into();
Ok(tonic::Response::new(response))
}
@@ -275,11 +258,11 @@ impl Player for PlayerService {
let state = self.state.lock().await;
let mut player = state.player.lock().await;
if let Err(_) = player.skip_to_queue_index(index).await {
if let Err(_) = player.skip_to_queue_index(index) {
return Err(tonic::Status::internal(""));
};
let response = player.get_snapshot().await.into();
let response = player.get_snapshot().into();
Ok(tonic::Response::new(response))
}
@@ -301,11 +284,11 @@ impl Player for PlayerService {
let state = self.state.lock().await;
let mut player = state.player.lock().await;
if !player.swap_queue_indices(a, b).await {
if !player.swap_queue_indices(a, b) {
return Err(tonic::Status::internal(""));
};
let queue = player.queue().await;
let queue = player.queue();
let response = Queue {
tracks: queue_to_track_vec(queue),

View File

@@ -9,18 +9,17 @@ use crate::proto::settings::{
};
use crate::state::GrooveState;
use deadpool_sqlite::Pool;
use proto::settings::{settings_server::Settings, LibraryPath, SettingsData};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
pub struct SettingsService {
#[allow(dead_code)]
state: GrooveState,
pool: Pool<SqliteConnectionManager>,
pool: Pool,
}
impl SettingsService {
pub fn new(state: GrooveState, pool: Pool<SqliteConnectionManager>) -> Self {
pub fn new(state: GrooveState, pool: Pool) -> Self {
Self { state, pool }
}
}
@@ -31,11 +30,7 @@ impl Settings for SettingsService {
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<SettingsData>, tonic::Status> {
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let Ok(library_paths) = get_library_paths(&db) else {
let Ok(library_paths) = get_library_paths(&self.pool).await else {
return Err(tonic::Status::internal(""));
};
@@ -58,11 +53,7 @@ impl Settings for SettingsService {
) -> Result<tonic::Response<AddPathResponse>, tonic::Status> {
let input = request.into_inner();
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let Ok(insert_result) = insert_library_path(&db, input.path) else {
let Ok(insert_result) = insert_library_path(&self.pool, input.path).await else {
return Err(tonic::Status::internal(""));
};
@@ -81,11 +72,7 @@ impl Settings for SettingsService {
) -> Result<tonic::Response<DeletePathResponse>, tonic::Status> {
let input = request.into_inner();
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let Ok(delete_result) = delete_library_path(&db, input.id) else {
let Ok(delete_result) = delete_library_path(&self.pool, input.id).await else {
return Err(tonic::Status::internal(""));
};
@@ -104,15 +91,11 @@ impl Settings for SettingsService {
) -> Result<tonic::Response<RefreshPathResponse>, tonic::Status> {
let input = request.into_inner();
let Ok(db) = self.pool.get() else {
return Err(tonic::Status::internal(""));
};
let Ok(library_path) = get_library_path(&db, input.id) else {
let Ok(library_path) = get_library_path(&self.pool, input.id).await else {
return Err(tonic::Status::not_found(""));
};
let _ = index_path(library_path.path.into(), db, library_path.id).await;
let _ = index_path(library_path.path.into(), &self.pool, library_path.id).await;
let response = RefreshPathResponse {};