use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream}; use crate::{ database::tracks::{get_track, get_track_full_path}, music::queue::queue_to_track_vec, proto::{ self, player::{ player_server::Player, PauseState, PlayTrackResponse, PlayerStatus, Queue, SeekPositionRequest, SeekPositionResponse, SetVolumeRequest, SetVolumeResponse, SkipToQueueIndexRequest, TrackRequest, }, }, state::GrooveState, }; pub struct PlayerService { state: GrooveState, pool: Pool, } impl PlayerService { pub fn new(state: GrooveState, pool: Pool) -> Self { Self { state, pool } } } #[tonic::async_trait] impl Player for PlayerService { type GetStatusStream = Pin> + Send>>; async fn play_track( &self, request: tonic::Request, ) -> Result, tonic::Status> { let Ok(db) = self.pool.get() else { return Err(tonic::Status::internal("")); }; let input = request.get_ref(); let Ok(track) = get_track(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let _ = state .player .lock() .await .play_track(track.clone(), track_path, true) .await; let response = PlayTrackResponse { track: Some(proto::library::Track::from(track.into())), position: 0, }; Ok(tonic::Response::new(response)) } async fn resume_track( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; state.player.lock().await.resume().await; let response = PauseState { is_paused: false }; Ok(tonic::Response::new(response)) } async fn pause_track( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; state.player.lock().await.pause().await; let response = PauseState { is_paused: true }; Ok(tonic::Response::new(response)) } async fn toggle_pause( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; let is_paused = state.player.lock().await.toggle_pause().await; let response = PauseState { is_paused }; Ok(tonic::Response::new(response)) } async fn seek_position( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let state = self.state.lock().await; state.player.lock().await.set_position(input.position).await; let response = SeekPositionResponse { position: input.position, }; Ok(tonic::Response::new(response)) } async fn set_volume( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let state = self.state.lock().await; state.player.lock().await.set_volume(input.volume).await; let response = SetVolumeResponse { volume: input.volume, }; Ok(tonic::Response::new(response)) } async fn get_status( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.clone(); let (tx, rx) = mpsc::channel::>(128); let sleep_duration = Duration::from_millis(1000); tokio::spawn(async move { while !tx.is_closed() { if let Err(_) = tx .send(Ok(state .lock() .await .player .lock() .await .get_snapshot() .await .into())) .await { break; } tokio::time::sleep(sleep_duration).await; } }); let output_stream = ReceiverStream::new(rx); let response = Box::pin(output_stream) as Self::GetStatusStream; Ok(tonic::Response::new(response)) } async fn add_track_to_queue( &self, request: tonic::Request, ) -> Result, tonic::Status> { let Ok(db) = self.pool.get() else { return Err(tonic::Status::internal("")); }; let input = request.get_ref(); let Ok(track) = get_track(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.add_to_queue(track, &track_path).await { return Err(tonic::Status::internal("")); } let queue = player.queue().await; let response = Queue { tracks: queue_to_track_vec(queue), }; Ok(tonic::Response::new(response)) } async fn play_track_next( &self, request: tonic::Request, ) -> Result, tonic::Status> { let Ok(db) = self.pool.get() else { return Err(tonic::Status::internal("")); }; let input = request.get_ref(); let Ok(track) = get_track(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.play_track_next(track, &track_path).await { return Err(tonic::Status::internal("")); } let queue = player.queue().await; let response = Queue { tracks: queue_to_track_vec(queue), }; Ok(tonic::Response::new(response)) } async fn skip_track( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.skip_track().await { return Err(tonic::Status::internal("")); }; let response = player.get_snapshot().await.into(); Ok(tonic::Response::new(response)) } async fn skip_to_queue_index( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(index) = input.index.try_into() else { return Err(tonic::Status::internal("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.skip_to_queue_index(index).await { return Err(tonic::Status::internal("")); }; let response = player.get_snapshot().await.into(); Ok(tonic::Response::new(response)) } }