From c71d3128e27cfebdd4817c52e79b5aff70f0fc11 Mon Sep 17 00:00:00 2001 From: 409 Date: Wed, 27 Nov 2024 01:54:53 +0100 Subject: [PATCH] feat!: queue system --- proto/player.proto | 17 +++- src/library.rs | 4 +- src/main.rs | 26 +++++- src/music/mod.rs | 1 + src/music/player.rs | 213 ++++++++++++++++++++++++++++++++++++-------- src/music/queue.rs | 28 ++++++ src/player.rs | 150 ++++++++++++++++++++++++++++--- src/state.rs | 4 +- 8 files changed, 386 insertions(+), 57 deletions(-) create mode 100644 src/music/queue.rs diff --git a/proto/player.proto b/proto/player.proto index 087e54c..e86cd47 100644 --- a/proto/player.proto +++ b/proto/player.proto @@ -6,16 +6,20 @@ import 'library.proto'; package player; service Player { - rpc PlayTrack(PlayTrackRequest) returns (PlayTrackResponse); + rpc PlayTrack(TrackRequest) returns (PlayTrackResponse); rpc ResumeTrack(google.protobuf.Empty) returns (PauseState); rpc PauseTrack(google.protobuf.Empty) returns (PauseState); rpc TogglePause(google.protobuf.Empty) returns (PauseState); rpc GetStatus(google.protobuf.Empty) returns (stream PlayerStatus); rpc SeekPosition(SeekPositionRequest) returns (SeekPositionResponse); rpc SetVolume(SetVolumeRequest) returns (SetVolumeResponse); + rpc PlayTrackNext(TrackRequest) returns (Queue); + rpc AddTrackToQueue(TrackRequest) returns (Queue); + rpc SkipTrack(google.protobuf.Empty) returns (PlayerStatus); + rpc SkipToQueueIndex(SkipToQueueIndexRequest) returns (PlayerStatus); } -message PlayTrackRequest { +message TrackRequest { string hash = 1; } @@ -29,6 +33,11 @@ message PlayerStatus { bool is_paused = 2; float volume = 3; uint64 progress = 4; + repeated library.Track queue = 5; +} + +message Queue { + repeated library.Track tracks = 1; } message PauseState { @@ -50,3 +59,7 @@ message SetVolumeRequest { message SetVolumeResponse { float volume = 1; } + +message SkipToQueueIndexRequest { + uint32 index = 1; +} diff --git a/src/library.rs b/src/library.rs index fad3189..659ac29 100644 --- a/src/library.rs +++ b/src/library.rs @@ -41,7 +41,7 @@ impl Library for LibraryService { let response = TrackList { tracks: tracks - .iter() + .into_iter() .map(|t| Track { hash: t.hash.clone(), name: t.name.clone(), @@ -74,7 +74,7 @@ pub fn index_path( .collect(); let hashmaps: Vec> = entries - .par_iter() + .into_par_iter() .fold( || HashMap::new(), |mut acc: HashMap, entry| { diff --git a/src/main.rs b/src/main.rs index d31a411..6bfba72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use proto::player::player_server::PlayerServer; use proto::settings::settings_server::SettingsServer; use rodio::{OutputStream, Sink}; use state::{GrooveState, GrooveStateData}; +use std::sync::Arc; use tokio::sync::Mutex; use tonic::transport::Server; @@ -48,9 +49,28 @@ async fn main() -> Result<(), Box> { OutputStream::try_default().expect("Error getting audio output stream"); let sink = Sink::try_new(&stream_handle).expect("Error getting audio sink"); - let player = AudioPlayer::new(sink); + let player = Arc::new(Mutex::new(AudioPlayer::new(sink))); + let c_player = player.clone(); - let state = GrooveState::new(Mutex::new(GrooveStateData::new(player))); + let (watch_handle, mut rx) = player.lock().await.start_watching(); + + let player_rx_handle = tokio::spawn(async move { + while let Some(next) = rx.recv().await { + let mut player = player.lock().await; + match next { + Some(queued_track) => { + let _ = player + .play_track(queued_track.track, queued_track.path, false) + .await; + } + None => { + player.clear().await; + } + } + } + }); + + let state = GrooveState::new(Mutex::new(GrooveStateData::new(c_player))); let settings = SettingsService::new(state.clone(), pool.clone()); let library = LibraryService::new(state.clone(), pool.clone()); @@ -72,6 +92,8 @@ async fn main() -> Result<(), Box> { .await?; let _ = cover_server_handle.await; + let _ = watch_handle.await; + let _ = player_rx_handle.await; Ok(()) } diff --git a/src/music/mod.rs b/src/music/mod.rs index 1f78c1c..173a5ee 100644 --- a/src/music/mod.rs +++ b/src/music/mod.rs @@ -1,2 +1,3 @@ pub mod metadata; pub mod player; +pub mod queue; diff --git a/src/music/player.rs b/src/music/player.rs index fe3aea8..75a6aea 100644 --- a/src/music/player.rs +++ b/src/music/player.rs @@ -1,106 +1,248 @@ use std::{ + collections::VecDeque, fs, io::{BufReader, Cursor}, path::Path, + sync::Arc, time::Duration, }; use rodio::{Decoder, Sink}; +use tokio::{ + sync::{ + mpsc::{self, Receiver}, + Mutex, + }, + task::JoinHandle, +}; use crate::{database::tracks::Track, proto::player::PlayerStatus}; +use super::queue::QueuedTrack; + pub struct AudioPlayer { - sink: Sink, - currently_playing: Option, + sink: Arc>, + currently_playing: Arc>>, + queue: Arc>>, } impl AudioPlayer { + const WATCH_SLEEP_TIME: Duration = Duration::from_millis(20); pub fn new(sink: Sink) -> Self { sink.set_volume(0.5); Self { - sink, - currently_playing: None, + sink: Arc::new(Mutex::new(sink)), + currently_playing: Arc::new(Mutex::new(None)), + queue: Arc::new(Mutex::new(VecDeque::new())), } } - pub fn play_track

(&mut self, track: Track, path: P) -> Result<(), Box> + pub fn start_watching(&mut self) -> (JoinHandle<()>, Receiver>) { + let sink = self.sink.clone(); + let current_track = self.currently_playing.clone(); + let queue = self.queue.clone(); + + let (tx, rx) = mpsc::channel::>(128); + + ( + tokio::spawn(async move { + loop { + if let Some(current_track) = current_track.lock().await.as_mut() { + if current_track.duration as u128 <= sink.lock().await.get_pos().as_millis() + { + if let Err(_) = tx.send(queue.lock().await.pop_front()).await { + break; + } + } + } + tokio::time::sleep(Self::WATCH_SLEEP_TIME).await; + } + }), + rx, + ) + } + + pub async fn play_track

( + &mut self, + track: Track, + path: P, + clear_queue: bool, + ) -> Result<(), Box> where P: AsRef, { - self.sink.clear(); + let sink = self.sink.lock().await; + + sink.clear(); + if clear_queue { + self.queue.lock().await.clear(); + } let file = BufReader::new(Cursor::new(fs::read(path)?)); let source = Decoder::new(file)?; - self.currently_playing = Some(track); - self.sink.append(source); + *self.currently_playing.lock().await = Some(track); + sink.append(source); - self.sink.play(); + sink.play(); Ok(()) } - pub fn resume(&self) { - self.sink.play(); + pub async fn play_track_next( + &mut self, + track: Track, + path: &Path, + ) -> Result<(), Box> { + let sink = self.sink.lock().await; + + let mut queue = self.queue.lock().await; + + if sink.empty() && queue.is_empty() { + drop(queue); + drop(sink); + return self.play_track(track, path, false).await; + } + + let queued_track = QueuedTrack::new(track, path.to_path_buf()); + + queue.push_front(queued_track); + + Ok(()) } - pub fn pause(&self) { - self.sink.pause(); + pub async fn add_to_queue( + &mut self, + track: Track, + path: &Path, + ) -> Result<(), Box> { + let sink = self.sink.lock().await; + let mut queue = self.queue.lock().await; + + if sink.empty() && queue.is_empty() { + drop(queue); + drop(sink); + return self.play_track(track, path, false).await; + } + + let queued_track = QueuedTrack::new(track, path.to_path_buf()); + + queue.push_back(queued_track); + + Ok(()) + } + + pub async fn skip_track(&mut self) -> Result<(), Box> { + let mut queue = self.queue.lock().await; + + let Some(queued_track) = queue.pop_front() else { + drop(queue); + self.clear().await; + + return Ok(()); + }; + + drop(queue); + + self.play_track(queued_track.track, queued_track.path, false) + .await + } + + pub async fn skip_to_queue_index( + &mut self, + index: usize, + ) -> Result<(), Box> { + let mut queue = self.queue.lock().await; + + let Some(queued_track) = queue.remove(index) else { + drop(queue); + self.clear().await; + + return Ok(()); + }; + + drop(queue); + self.play_track(queued_track.track, queued_track.path, false) + .await + } + + pub async fn resume(&self) { + self.sink.lock().await.play(); + } + + pub async fn pause(&self) { + self.sink.lock().await.pause(); } /// Toggles the player's pause state and returns the new value - pub fn toggle_pause(&self) -> bool { - if self.is_paused() { - self.resume(); + pub async fn toggle_pause(&self) -> bool { + if self.is_paused().await { + self.resume().await; false } else { - self.pause(); + self.pause().await; true } } - pub fn volume(&self) -> f32 { - self.sink.volume() + pub async fn volume(&self) -> f32 { + self.sink.lock().await.volume() } - pub fn set_volume(&self, value: f32) { - self.sink.set_volume(value); + pub async fn set_volume(&self, value: f32) { + self.sink.lock().await.set_volume(value); } - pub fn position(&self) -> u128 { - self.sink.get_pos().as_millis() + pub async fn position(&self) -> u128 { + self.sink.lock().await.get_pos().as_millis() } - pub fn set_position(&self, position: u64) { - let _ = self.sink.try_seek(Duration::from_millis(position)); + pub async fn set_position(&self, position: u64) { + let _ = self + .sink + .lock() + .await + .try_seek(Duration::from_millis(position)); } - pub fn is_paused(&self) -> bool { - self.sink.is_paused() + pub async fn is_paused(&self) -> bool { + self.sink.lock().await.is_paused() } - pub fn currently_playing(&self) -> Option { - self.currently_playing.clone() + pub async fn currently_playing(&self) -> Option { + self.currently_playing.lock().await.clone() } - pub fn get_snapshot(&self) -> StatusSnapshot { + pub async fn queue(&self) -> VecDeque { + self.queue.lock().await.clone() + } + + pub async fn clear(&mut self) { + self.queue.lock().await.clear(); + self.sink.lock().await.clear(); + *self.currently_playing.lock().await = None; + } + + pub async fn get_snapshot(&self) -> StatusSnapshot { StatusSnapshot { - volume: self.volume(), - position: self.position(), - is_paused: self.is_paused(), - currently_playing: self.currently_playing(), + volume: self.volume().await, + position: self.position().await, + is_paused: self.is_paused().await, + currently_playing: self.currently_playing().await, + queue: self.queue().await, } } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct StatusSnapshot { pub volume: f32, pub position: u128, pub is_paused: bool, pub currently_playing: Option, + pub queue: VecDeque, } impl Into for StatusSnapshot { @@ -110,6 +252,7 @@ impl Into for StatusSnapshot { is_paused: self.is_paused, progress: self.position as u64, currently_playing: self.currently_playing.clone().map(|t| t.into()), + queue: Vec::from_iter(self.queue.into_iter().map(|t| t.track.into())), } } } diff --git a/src/music/queue.rs b/src/music/queue.rs new file mode 100644 index 0000000..eb06b77 --- /dev/null +++ b/src/music/queue.rs @@ -0,0 +1,28 @@ +use std::{collections::VecDeque, path::PathBuf}; + +use crate::{database::tracks::Track, proto}; + +#[derive(Debug, Clone)] +pub struct QueuedTrack { + pub track: Track, + pub path: PathBuf, +} + +impl QueuedTrack { + pub fn new(track: Track, path: PathBuf) -> Self { + Self { track, path } + } +} + +impl From for Track { + fn from(value: QueuedTrack) -> Track { + value.track + } +} + +pub fn queue_to_track_vec(queue: VecDeque) -> Vec { + queue + .into_iter() + .map(|queued_track| Track::from(queued_track).into()) + .collect() +} diff --git a/src/player.rs b/src/player.rs index 51a6a8e..1c7f619 100644 --- a/src/player.rs +++ b/src/player.rs @@ -6,11 +6,13 @@ use tokio_stream::{wrappers::ReceiverStream, Stream}; use crate::{ database::tracks::{get_track, get_track_full_path}, + music::queue::queue_to_track_vec, proto::{ self, player::{ - player_server::Player, PauseState, PlayTrackRequest, PlayTrackResponse, PlayerStatus, + player_server::Player, PauseState, PlayTrackResponse, PlayerStatus, Queue, SeekPositionRequest, SeekPositionResponse, SetVolumeRequest, SetVolumeResponse, + SkipToQueueIndexRequest, TrackRequest, }, }, state::GrooveState, @@ -33,7 +35,7 @@ impl Player for PlayerService { async fn play_track( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { let Ok(db) = self.pool.get() else { return Err(tonic::Status::internal("")); @@ -49,9 +51,14 @@ impl Player for PlayerService { return Err(tonic::Status::not_found("")); }; - let mut state = self.state.lock().await; + let state = self.state.lock().await; - let _ = state.player.play_track(track.clone(), track_path); + let _ = state + .player + .lock() + .await + .play_track(track.clone(), track_path, true) + .await; let response = PlayTrackResponse { track: Some(proto::library::Track::from(track.into())), @@ -63,11 +70,11 @@ impl Player for PlayerService { async fn resume_track( &self, - _request: tonic::Request<()>, + _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; - state.player.resume(); + state.player.lock().await.resume().await; let response = PauseState { is_paused: false }; @@ -76,11 +83,11 @@ impl Player for PlayerService { async fn pause_track( &self, - _request: tonic::Request<()>, + _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; - state.player.pause(); + state.player.lock().await.pause().await; let response = PauseState { is_paused: true }; @@ -89,11 +96,11 @@ impl Player for PlayerService { async fn toggle_pause( &self, - _request: tonic::Request<()>, + _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.lock().await; - let is_paused = state.player.toggle_pause(); + let is_paused = state.player.lock().await.toggle_pause().await; let response = PauseState { is_paused }; @@ -107,7 +114,7 @@ impl Player for PlayerService { let input = request.get_ref(); let state = self.state.lock().await; - state.player.set_position(input.position); + state.player.lock().await.set_position(input.position).await; let response = SeekPositionResponse { position: input.position, @@ -123,7 +130,7 @@ impl Player for PlayerService { let input = request.get_ref(); let state = self.state.lock().await; - state.player.set_volume(input.volume); + state.player.lock().await.set_volume(input.volume).await; let response = SetVolumeResponse { volume: input.volume, @@ -134,7 +141,7 @@ impl Player for PlayerService { async fn get_status( &self, - _request: tonic::Request<()>, + _: tonic::Request<()>, ) -> Result, tonic::Status> { let state = self.state.clone(); @@ -145,7 +152,15 @@ impl Player for PlayerService { tokio::spawn(async move { while !tx.is_closed() { if let Err(_) = tx - .send(Ok(state.lock().await.player.get_snapshot().into())) + .send(Ok(state + .lock() + .await + .player + .lock() + .await + .get_snapshot() + .await + .into())) .await { break; @@ -161,4 +176,111 @@ impl Player for PlayerService { Ok(tonic::Response::new(response)) } + + async fn add_track_to_queue( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let Ok(db) = self.pool.get() else { + return Err(tonic::Status::internal("")); + }; + + let input = request.get_ref(); + + let Ok(track) = get_track(&db, input.hash.as_str()) else { + return Err(tonic::Status::not_found("")); + }; + + let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { + return Err(tonic::Status::not_found("")); + }; + + let state = self.state.lock().await; + let mut player = state.player.lock().await; + + if let Err(_) = player.add_to_queue(track, &track_path).await { + return Err(tonic::Status::internal("")); + } + + let queue = player.queue().await; + + let response = Queue { + tracks: queue_to_track_vec(queue), + }; + + Ok(tonic::Response::new(response)) + } + + async fn play_track_next( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let Ok(db) = self.pool.get() else { + return Err(tonic::Status::internal("")); + }; + + let input = request.get_ref(); + + let Ok(track) = get_track(&db, input.hash.as_str()) else { + return Err(tonic::Status::not_found("")); + }; + + let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { + return Err(tonic::Status::not_found("")); + }; + + let state = self.state.lock().await; + let mut player = state.player.lock().await; + + if let Err(_) = player.play_track_next(track, &track_path).await { + return Err(tonic::Status::internal("")); + } + + let queue = player.queue().await; + + let response = Queue { + tracks: queue_to_track_vec(queue), + }; + + Ok(tonic::Response::new(response)) + } + + async fn skip_track( + &self, + _: tonic::Request<()>, + ) -> Result, tonic::Status> { + let state = self.state.lock().await; + + let mut player = state.player.lock().await; + + if let Err(_) = player.skip_track().await { + return Err(tonic::Status::internal("")); + }; + + let response = player.get_snapshot().await.into(); + + Ok(tonic::Response::new(response)) + } + + async fn skip_to_queue_index( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let input = request.get_ref(); + + let Ok(index) = input.index.try_into() else { + return Err(tonic::Status::internal("")); + }; + + let state = self.state.lock().await; + let mut player = state.player.lock().await; + + if let Err(_) = player.skip_to_queue_index(index).await { + return Err(tonic::Status::internal("")); + }; + + let response = player.get_snapshot().await.into(); + + Ok(tonic::Response::new(response)) + } } diff --git a/src/state.rs b/src/state.rs index 020b888..2792f11 100644 --- a/src/state.rs +++ b/src/state.rs @@ -7,11 +7,11 @@ use crate::music::player::AudioPlayer; pub type GrooveState = Arc>; pub struct GrooveStateData { - pub player: AudioPlayer, + pub player: Arc>, } impl GrooveStateData { - pub fn new(player: AudioPlayer) -> Self { + pub fn new(player: Arc>) -> Self { Self { player } } }