use std::{ collections::VecDeque, fs, io::{BufReader, Cursor}, path::Path, sync::Arc, time::Duration, }; use rodio::{Decoder, Sink}; use tokio::{ sync::{ mpsc::{self, Receiver}, Mutex, }, task::JoinHandle, }; use crate::{database::tracks::Track, proto::player::PlayerStatus}; use super::queue::QueuedTrack; pub struct AudioPlayer { sink: Arc>, currently_playing: Arc>>, queue: Arc>>, } impl AudioPlayer { const WATCH_SLEEP_TIME: Duration = Duration::from_millis(20); pub fn new(sink: Sink) -> Self { sink.set_volume(0.5); Self { sink: Arc::new(Mutex::new(sink)), currently_playing: Arc::new(Mutex::new(None)), queue: Arc::new(Mutex::new(VecDeque::new())), } } pub fn start_watching(&mut self) -> (JoinHandle<()>, Receiver>) { let sink = self.sink.clone(); let current_track = self.currently_playing.clone(); let queue = self.queue.clone(); let (tx, rx) = mpsc::channel::>(128); ( tokio::spawn(async move { loop { if let Some(current_track) = current_track.lock().await.as_mut() { let sink = sink.lock().await; if sink.empty() || current_track.duration as u128 <= sink.get_pos().as_millis() { if let Err(_) = tx.send(queue.lock().await.pop_front()).await { break; } } } tokio::time::sleep(Self::WATCH_SLEEP_TIME).await; } }), rx, ) } pub async fn play_track

( &mut self, track: Track, path: P, clear_queue: bool, ) -> Result<(), Box> where P: AsRef, { let sink = self.sink.lock().await; sink.clear(); if clear_queue { self.queue.lock().await.clear(); } let file = BufReader::new(Cursor::new(fs::read(path)?)); let source = Decoder::new(file)?; *self.currently_playing.lock().await = Some(track); sink.append(source); sink.play(); Ok(()) } pub async fn play_track_next( &mut self, track: Track, path: &Path, ) -> Result<(), Box> { let sink = self.sink.lock().await; let mut queue = self.queue.lock().await; if sink.empty() && queue.is_empty() { drop(queue); drop(sink); return self.play_track(track, path, false).await; } let queued_track = QueuedTrack::new(track, path.to_path_buf()); queue.push_front(queued_track); Ok(()) } pub async fn add_to_queue( &mut self, track: Track, path: &Path, ) -> Result<(), Box> { let sink = self.sink.lock().await; let mut queue = self.queue.lock().await; if sink.empty() && queue.is_empty() { drop(queue); drop(sink); return self.play_track(track, path, false).await; } let queued_track = QueuedTrack::new(track, path.to_path_buf()); queue.push_back(queued_track); Ok(()) } pub async fn skip_track(&mut self) -> Result<(), Box> { let mut queue = self.queue.lock().await; let Some(queued_track) = queue.pop_front() else { drop(queue); self.clear().await; return Ok(()); }; drop(queue); self.play_track(queued_track.track, queued_track.path, false) .await } pub async fn skip_to_queue_index( &mut self, index: usize, ) -> Result<(), Box> { let mut queue = self.queue.lock().await; let Some(queued_track) = queue.remove(index) else { drop(queue); self.clear().await; return Ok(()); }; drop(queue); self.play_track(queued_track.track, queued_track.path, false) .await } pub async fn swap_queue_indices(&mut self, a: usize, b: usize) -> bool { let mut queue = self.queue.lock().await; let len = queue.len(); if a >= len || b >= len { return false; } queue.swap(a, b); true } pub async fn resume(&self) { self.sink.lock().await.play(); } pub async fn pause(&self) { self.sink.lock().await.pause(); } /// Toggles the player's pause state and returns the new value pub async fn toggle_pause(&self) -> bool { if self.is_paused().await { self.resume().await; false } else { self.pause().await; true } } pub async fn volume(&self) -> f32 { self.sink.lock().await.volume() } pub async fn set_volume(&self, value: f32) { self.sink.lock().await.set_volume(value); } pub async fn position(&self) -> u128 { self.sink.lock().await.get_pos().as_millis() } pub async fn set_position(&self, position: u64) { let _ = self .sink .lock() .await .try_seek(Duration::from_millis(position)); } pub async fn is_paused(&self) -> bool { self.sink.lock().await.is_paused() } pub async fn currently_playing(&self) -> Option { self.currently_playing.lock().await.clone() } pub async fn queue(&self) -> VecDeque { self.queue.lock().await.clone() } pub async fn clear(&mut self) { self.queue.lock().await.clear(); self.sink.lock().await.clear(); *self.currently_playing.lock().await = None; } pub async fn get_snapshot(&self) -> StatusSnapshot { StatusSnapshot { volume: self.volume().await, position: self.position().await, is_paused: self.is_paused().await, currently_playing: self.currently_playing().await, queue: self.queue().await, } } } #[derive(Debug, Clone)] pub struct StatusSnapshot { pub volume: f32, pub position: u128, pub is_paused: bool, pub currently_playing: Option, pub queue: VecDeque, } impl Into for StatusSnapshot { fn into(self) -> PlayerStatus { PlayerStatus { volume: self.volume, is_paused: self.is_paused, progress: self.position as u64, currently_playing: self.currently_playing.clone().map(|t| t.into()), queue: Vec::from_iter(self.queue.into_iter().map(|t| t.track.into())), } } }