use deadpool_sqlite::Pool; use std::{path::PathBuf, pin::Pin, time::Duration}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream}; use crate::{ database::tracks::{ get_playlist, get_specific_tracks, get_specific_tracks_full_path, get_track, get_track_full_path, Track, }, music::queue::queue_to_track_vec, proto::{ self, player::{ player_server::Player, PauseState, PlayPlaylistRequest, PlayTrackResponse, PlayerStatus, Queue, SeekPositionRequest, SeekPositionResponse, SetVolumeRequest, SetVolumeResponse, SkipToQueueIndexRequest, SwapQueueIndicesRequest, TrackRequest, TracksRequest, }, }, state::GrooveState, }; pub struct PlayerService { state: GrooveState, pool: Pool, } impl PlayerService { pub fn new(state: GrooveState, pool: Pool) -> Self { Self { state, pool } } } #[tonic::async_trait] impl Player for PlayerService { type GetStatusStream = Pin> + Send>>; async fn play_track( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("Could not get track")); }; let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("Could not get track file path")); }; let state = self.state.lock().await; let player = &mut state.player.lock().await; let _ = player.play_track(track.clone(), track_path, true); let response = PlayTrackResponse { track: Some(proto::library::Track::from(track.into())), position: 0, }; Ok(tonic::Response::new(response)) } async fn resume_track( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; state.player.lock().await.resume(); let response = PauseState { is_paused: false }; Ok(tonic::Response::new(response)) } async fn pause_track( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; state.player.lock().await.pause(); let response = PauseState { is_paused: true }; Ok(tonic::Response::new(response)) } async fn toggle_pause( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; let is_paused = state.player.lock().await.toggle_pause(); let response = PauseState { is_paused }; Ok(tonic::Response::new(response)) } async fn seek_position( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let state = self.state.lock().await; state.player.lock().await.set_position(input.position); let response = SeekPositionResponse { position: input.position, }; Ok(tonic::Response::new(response)) } async fn set_volume( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let state = self.state.lock().await; state.player.lock().await.set_volume(input.volume); let response = SetVolumeResponse { volume: input.volume, }; Ok(tonic::Response::new(response)) } async fn get_status( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.clone(); let (tx, rx) = mpsc::channel::>(128); let sleep_duration = Duration::from_millis(1000); tokio::spawn(async move { while !tx.is_closed() { if let Err(_) = tx .send(Ok(state .lock() .await .player .lock() .await .get_snapshot() .into())) .await { break; } tokio::time::sleep(sleep_duration).await; } }); let output_stream = ReceiverStream::new(rx); let response = Box::pin(output_stream) as Self::GetStatusStream; Ok(tonic::Response::new(response)) } async fn add_track_to_queue( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.add_to_queue(track, &track_path).await { return Err(tonic::Status::internal("")); } let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), }; Ok(tonic::Response::new(response)) } async fn add_tracks_to_queue( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(tracks) = get_specific_tracks(&self.pool, &input.tracks).await else { return Err(tonic::Status::not_found("")); }; let Ok(track_paths) = get_specific_tracks_full_path(&self.pool, &input.tracks).await else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; let tracks_and_paths: Vec<(Track, PathBuf)> = tracks.into_iter().zip(track_paths.into_iter()).collect(); if let Err(_) = player.add_tracks_to_queue(tracks_and_paths, false).await { return Err(tonic::Status::internal("")); } let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), }; Ok(tonic::Response::new(response)) } async fn play_playlist( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(playlist) = get_playlist(&self.pool, input.id).await else { return Err(tonic::Status::not_found("")); }; let Ok(track_paths) = get_specific_tracks_full_path( &self.pool, &playlist.tracks.iter().map(|t| t.hash.clone()).collect(), ) .await else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; let tracks = playlist.tracks.iter().map(|t| t.clone().into()); let tracks_and_paths: Vec<(Track, PathBuf)> = tracks .into_iter() .zip(track_paths.into_iter()) .skip(input.starting_rank.unwrap_or(0) as usize) .collect(); if let Err(_) = player.add_tracks_to_queue(tracks_and_paths, true).await { return Err(tonic::Status::internal("")); } let response = player.get_snapshot().into(); Ok(tonic::Response::new(response)) } async fn play_track_next( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else { return Err(tonic::Status::not_found("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.play_track_next(track, &track_path) { return Err(tonic::Status::internal("")); } let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), }; Ok(tonic::Response::new(response)) } async fn skip_track( &self, _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.skip_track() { return Err(tonic::Status::internal("")); }; let response = player.get_snapshot().into(); Ok(tonic::Response::new(response)) } async fn skip_to_queue_index( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(index) = input.index.try_into() else { return Err(tonic::Status::internal("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if let Err(_) = player.skip_to_queue_index(index) { return Err(tonic::Status::internal("")); }; let response = player.get_snapshot().into(); Ok(tonic::Response::new(response)) } async fn swap_queue_indices( &self, request: tonic::Request, ) -> Result, tonic::Status> { let input = request.get_ref(); let Ok(a) = input.a.try_into() else { return Err(tonic::Status::internal("")); }; let Ok(b) = input.b.try_into() else { return Err(tonic::Status::internal("")); }; let state = self.state.lock().await; let mut player = state.player.lock().await; if !player.swap_queue_indices(a, b) { return Err(tonic::Status::internal("")); }; let queue = player.queue(); let response = Queue { tracks: queue_to_track_vec(queue), }; Ok(tonic::Response::new(response)) } }