diff --git a/Cargo.lock b/Cargo.lock index b726943..97d84af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,7 @@ dependencies = [ "sha2", "symphonia", "tokio", + "tokio-stream", "tonic", "tonic-build", "tonic-web", diff --git a/Cargo.toml b/Cargo.toml index e2d4b11..ba89c50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ rusqlite = { version = "0.32.1", features = ["bundled"] } sha2 = "0.10.8" symphonia = { version = "0.5.4", features = ["mp3"] } tokio = { version = "1.41.1", features = ["full"] } +tokio-stream = "0.1.16" tonic = "0.12.3" tonic-web = "0.12.3" tower-http = { version = "0.6.2", features = ["cors", "fs"] } diff --git a/proto/library.proto b/proto/library.proto index 3787455..ee0ed1d 100644 --- a/proto/library.proto +++ b/proto/library.proto @@ -17,4 +17,5 @@ message Track { string name = 2; string artist_name = 3; uint64 artist_id = 4; + uint64 duration = 5; } diff --git a/proto/player.proto b/proto/player.proto index 837622d..087e54c 100644 --- a/proto/player.proto +++ b/proto/player.proto @@ -1,13 +1,18 @@ syntax = "proto3"; import 'google/protobuf/empty.proto'; +import 'library.proto'; package player; service Player { rpc PlayTrack(PlayTrackRequest) returns (PlayTrackResponse); - rpc ResumeTrack(google.protobuf.Empty) returns (google.protobuf.Empty); - rpc PauseTrack(google.protobuf.Empty) returns (google.protobuf.Empty); + rpc ResumeTrack(google.protobuf.Empty) returns (PauseState); + rpc PauseTrack(google.protobuf.Empty) returns (PauseState); + rpc TogglePause(google.protobuf.Empty) returns (PauseState); + rpc GetStatus(google.protobuf.Empty) returns (stream PlayerStatus); + rpc SeekPosition(SeekPositionRequest) returns (SeekPositionResponse); + rpc SetVolume(SetVolumeRequest) returns (SetVolumeResponse); } message PlayTrackRequest { @@ -15,4 +20,33 @@ message PlayTrackRequest { } message PlayTrackResponse { + library.Track track = 1; + uint64 position = 2; +} + +message PlayerStatus { + optional library.Track currently_playing = 1; + bool is_paused = 2; + float volume = 3; + uint64 progress = 4; +} + +message PauseState { + bool is_paused = 1; +} + +message SeekPositionRequest { + uint64 position = 1; +} + +message SeekPositionResponse { + uint64 position = 1; +} + +message SetVolumeRequest { + float volume = 1; +} + +message SetVolumeResponse { + float volume = 1; } diff --git a/src/checksum.rs b/src/checksum.rs index 550d9b1..1416764 100644 --- a/src/checksum.rs +++ b/src/checksum.rs @@ -1,4 +1,4 @@ -use sha2::{Digest, Sha256, self}; +use sha2::{self, Digest, Sha256}; pub fn generate_hash(content: impl AsRef<[u8]>) -> String { let mut hasher = Sha256::new(); diff --git a/src/covers.rs b/src/covers.rs index 141c9dc..abc0f98 100644 --- a/src/covers.rs +++ b/src/covers.rs @@ -21,7 +21,6 @@ pub fn get_all_cover_hashes() -> Vec { let _ = fs::create_dir_all(base_path); } - let walkdir = walkdir::WalkDir::new(path).min_depth(1).max_depth(1); let hashes: Vec = walkdir diff --git a/src/database/mod.rs b/src/database/mod.rs index 36662d1..02d976e 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,6 +1,6 @@ +pub mod artists; pub mod paths; pub mod tracks; -pub mod artists; use r2d2::{Pool, PooledConnection}; use r2d2_sqlite::SqliteConnectionManager; @@ -37,21 +37,21 @@ pub fn initialize_database( [], )?; - connection - .execute( - " + connection.execute( + " CREATE TABLE IF NOT EXISTS tracks ( hash TEXT PRIMARY KEY NOT NULL, library_path_id INTEGER NOT NULL, name TEXT NOT NULL, artist_id INTEGER NOT NULL, path TEXT NOT NULL, + duration INTEGER NOT NULL, FOREIGN KEY (library_path_id) REFERENCES library_paths (id) ON DELETE CASCADE, FOREIGN KEY (artist_id) REFERENCES artists (id) ON DELETE CASCADE ); ", - [], - )?; + [], + )?; Ok(()) } diff --git a/src/database/tracks.rs b/src/database/tracks.rs index ac08a82..23b7bc1 100644 --- a/src/database/tracks.rs +++ b/src/database/tracks.rs @@ -11,31 +11,58 @@ use rusqlite::{params, Row}; use crate::{ covers::{get_all_cover_hashes, write_cover}, music::metadata::TrackMetadata, + proto, }; use super::artists::get_artists; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct Track { pub hash: String, pub name: String, pub artist_name: String, pub artist_id: u64, + pub duration: u64, +} + +impl Into for Track { + fn into(self) -> proto::library::Track { + proto::library::Track { + hash: self.hash, + name: self.name, + artist_name: self.artist_name, + artist_id: self.artist_id, + duration: self.duration, + } + } +} + +impl From for Track { + fn from(value: proto::library::Track) -> Self { + Track { + hash: value.hash, + name: value.name, + artist_name: value.artist_name, + artist_id: value.artist_id, + duration: value.duration, + } + } } fn map_track(row: &Row) -> Result { Ok(Track { hash: row.get(0)?, name: row.get(1)?, - artist_id: row.get(2)?, - artist_name: row.get(3)?, + duration: row.get(2)?, + artist_id: row.get(3)?, + artist_name: row.get(4)?, }) } pub fn get_tracks( connection: &PooledConnection, ) -> Result, rusqlite::Error> { - let mut statement = connection.prepare("SELECT t.hash, t.name, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id")?; + let mut statement = connection.prepare("SELECT t.hash, t.name, t.duration, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id")?; let rows = statement.query_map([], map_track)?; let mut tracks: Vec = Vec::new(); @@ -53,7 +80,7 @@ pub fn get_track( connection: &PooledConnection, hash: &str, ) -> Result { - connection.query_row("SELECT t.hash, t.name, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id WHERE t.hash = ?1", [hash], map_track) + connection.query_row("SELECT t.hash, t.name, t.duration, t.artist_id, a.name AS artist_name FROM tracks t INNER JOIN artists a ON a.id = t.artist_id WHERE t.hash = ?1", [hash], map_track) } pub fn get_track_full_path( @@ -122,7 +149,7 @@ pub fn insert_tracks( { let mut statement = - tx.prepare("INSERT OR REPLACE INTO tracks (hash, library_path_id, name, artist_id, path) VALUES (?1, ?2, ?3, ?4, ?5)")?; + tx.prepare("INSERT OR REPLACE INTO tracks (hash, library_path_id, name, artist_id, path, duration) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")?; for (hash, meta) in tracks { statement.execute(params![ @@ -131,6 +158,7 @@ pub fn insert_tracks( meta.name, artist_names_to_id[&meta.artist_name], meta.path, + meta.total_seconds * 1000 ])?; if let Some(cover) = meta.cover { diff --git a/src/library.rs b/src/library.rs index 1476e68..fad3189 100644 --- a/src/library.rs +++ b/src/library.rs @@ -9,7 +9,7 @@ use crate::{ checksum::generate_hash, database::tracks::{get_tracks, insert_tracks}, music::metadata::{extract_track_data, TrackMetadata}, - proto::{self, library_server::Library}, + proto::library::{library_server::Library, Track, TrackList}, state::GrooveState, }; @@ -30,7 +30,7 @@ impl Library for LibraryService { async fn list_tracks( &self, _request: tonic::Request<()>, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { let Ok(db) = self.pool.get() else { return Err(tonic::Status::internal("")); }; @@ -39,16 +39,17 @@ impl Library for LibraryService { return Err(tonic::Status::internal("")); }; - let response = proto::TrackList { + let response = TrackList { tracks: tracks .iter() - .map(|t| proto::Track { + .map(|t| Track { hash: t.hash.clone(), name: t.name.clone(), artist_name: t.artist_name.clone(), artist_id: t.artist_id, + duration: t.duration, }) - .collect::>(), + .collect::>(), }; Ok(tonic::Response::new(response)) diff --git a/src/main.rs b/src/main.rs index 7385a02..d31a411 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,12 +3,12 @@ use database::{establish_connection, initialize_database}; use library::LibraryService; use music::player::AudioPlayer; use player::PlayerService; -use proto::library_server::LibraryServer; -use proto::player_server::PlayerServer; -use proto::settings_server::SettingsServer; +use proto::library::library_server::LibraryServer; +use proto::player::player_server::PlayerServer; +use proto::settings::settings_server::SettingsServer; use rodio::{OutputStream, Sink}; use state::{GrooveState, GrooveStateData}; -use tokio::sync::RwLock; +use tokio::sync::Mutex; use tonic::transport::Server; pub mod checksum; @@ -23,9 +23,15 @@ pub mod state; use settings::SettingsService; pub mod proto { - tonic::include_proto!("settings"); - tonic::include_proto!("library"); - tonic::include_proto!("player"); + pub mod settings { + tonic::include_proto!("settings"); + } + pub mod library { + tonic::include_proto!("library"); + } + pub mod player { + tonic::include_proto!("player"); + } } #[tokio::main] @@ -44,7 +50,7 @@ async fn main() -> Result<(), Box> { let player = AudioPlayer::new(sink); - let state = GrooveState::new(RwLock::new(GrooveStateData::new(player))); + let state = GrooveState::new(Mutex::new(GrooveStateData::new(player))); let settings = SettingsService::new(state.clone(), pool.clone()); let library = LibraryService::new(state.clone(), pool.clone()); diff --git a/src/music/player.rs b/src/music/player.rs index e823fb4..fe3aea8 100644 --- a/src/music/player.rs +++ b/src/music/player.rs @@ -1,28 +1,40 @@ use std::{ fs, - io::{BufReader, Cursor}, path::Path, + io::{BufReader, Cursor}, + path::Path, + time::Duration, }; -use rodio::{Decoder, Sink, Source}; +use rodio::{Decoder, Sink}; + +use crate::{database::tracks::Track, proto::player::PlayerStatus}; pub struct AudioPlayer { - pub sink: Sink, + sink: Sink, + currently_playing: Option, } impl AudioPlayer { pub fn new(sink: Sink) -> Self { + sink.set_volume(0.5); + Self { - sink + sink, + currently_playing: None, } } - pub fn play_song

(&mut self, path: P) -> Result<(), Box> where P: AsRef { + pub fn play_track

(&mut self, track: Track, path: P) -> Result<(), Box> + where + P: AsRef, + { self.sink.clear(); let file = BufReader::new(Cursor::new(fs::read(path)?)); - let source = Decoder::new(file)?.amplify(0.2); + let source = Decoder::new(file)?; + self.currently_playing = Some(track); self.sink.append(source); self.sink.play(); @@ -30,11 +42,74 @@ impl AudioPlayer { Ok(()) } - pub fn resume(&mut self) { + pub fn resume(&self) { self.sink.play(); } - pub fn pause(&mut self) { + pub fn pause(&self) { self.sink.pause(); } + + /// Toggles the player's pause state and returns the new value + pub fn toggle_pause(&self) -> bool { + if self.is_paused() { + self.resume(); + false + } else { + self.pause(); + true + } + } + + pub fn volume(&self) -> f32 { + self.sink.volume() + } + + pub fn set_volume(&self, value: f32) { + self.sink.set_volume(value); + } + + pub fn position(&self) -> u128 { + self.sink.get_pos().as_millis() + } + + pub fn set_position(&self, position: u64) { + let _ = self.sink.try_seek(Duration::from_millis(position)); + } + + pub fn is_paused(&self) -> bool { + self.sink.is_paused() + } + + pub fn currently_playing(&self) -> Option { + self.currently_playing.clone() + } + + pub fn get_snapshot(&self) -> StatusSnapshot { + StatusSnapshot { + volume: self.volume(), + position: self.position(), + is_paused: self.is_paused(), + currently_playing: self.currently_playing(), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct StatusSnapshot { + pub volume: f32, + pub position: u128, + pub is_paused: bool, + pub currently_playing: Option, +} + +impl Into for StatusSnapshot { + fn into(self) -> PlayerStatus { + PlayerStatus { + volume: self.volume, + is_paused: self.is_paused, + progress: self.position as u64, + currently_playing: self.currently_playing.clone().map(|t| t.into()), + } + } } diff --git a/src/player.rs b/src/player.rs index aa97a90..2523238 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,8 +1,19 @@ use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; +use std::{pin::Pin, time::Duration}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; use crate::{ - database::tracks::get_track_full_path, proto::{player_server::Player, PlayTrackRequest, PlayTrackResponse}, state::GrooveState + database::tracks::{get_track, get_track_full_path}, + proto::{ + self, + player::{ + player_server::Player, PauseState, PlayTrackRequest, PlayTrackResponse, PlayerStatus, + SeekPositionRequest, SeekPositionResponse, SetVolumeRequest, SetVolumeResponse, + }, + }, + state::GrooveState, }; pub struct PlayerService { @@ -18,6 +29,8 @@ impl PlayerService { #[tonic::async_trait] impl Player for PlayerService { + type GetStatusStream = Pin> + Send>>; + async fn play_track( &self, request: tonic::Request, @@ -28,15 +41,21 @@ impl Player for PlayerService { let input = request.get_ref(); + let Ok(track) = get_track(&db, input.hash.as_str()) else { + return Err(tonic::Status::not_found("")); + }; + let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else { return Err(tonic::Status::not_found("")); }; - let mut state = self.state.write().await; + let mut state = self.state.lock().await; - let _ = state.player.play_song(track_path); + let _ = state.player.play_track(track.clone(), track_path); let response = PlayTrackResponse { + track: Some(proto::library::Track::from(track.into())), + position: 0, }; Ok(tonic::Response::new(response)) @@ -45,22 +64,105 @@ impl Player for PlayerService { async fn resume_track( &self, _request: tonic::Request<()>, - ) -> Result, tonic::Status> { - let mut state = self.state.write().await; + ) -> Result, tonic::Status> { + let state = self.state.lock().await; state.player.resume(); - Ok(tonic::Response::new(())) + let response = PauseState { is_paused: false }; + + Ok(tonic::Response::new(response)) } - + async fn pause_track( &self, _request: tonic::Request<()>, - ) -> Result, tonic::Status> { - let mut state = self.state.write().await; + ) -> Result, tonic::Status> { + let state = self.state.lock().await; state.player.pause(); - Ok(tonic::Response::new(())) + let response = PauseState { is_paused: true }; + + Ok(tonic::Response::new(response)) + } + + async fn toggle_pause( + &self, + _request: tonic::Request<()>, + ) -> Result, tonic::Status> { + let state = self.state.lock().await; + + let is_paused = state.player.toggle_pause(); + + let response = PauseState { is_paused }; + + Ok(tonic::Response::new(response)) + } + + async fn seek_position( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let input = request.get_ref(); + let state = self.state.lock().await; + + state.player.set_position(input.position); + + let response = SeekPositionResponse { + position: input.position, + }; + + Ok(tonic::Response::new(response)) + } + + async fn set_volume( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let input = request.get_ref(); + let state = self.state.lock().await; + + state.player.set_volume(input.volume); + + let response = SetVolumeResponse { + volume: input.volume, + }; + + Ok(tonic::Response::new(response)) + } + + async fn get_status( + &self, + _request: tonic::Request<()>, + ) -> Result, tonic::Status> { + let state = self.state.clone(); + + let (tx, rx) = mpsc::channel::>(128); + + let sleep_duration = Duration::from_millis(1000); + + tokio::spawn(async move { + println!("get_status stream opened"); + + while !tx.is_closed() { + if let Err(_) = tx + .send(Ok(state.lock().await.player.get_snapshot().into())) + .await + { + break; + } + + tokio::time::sleep(sleep_duration).await; + } + + println!("get_status stream closed"); + }); + + let output_stream = ReceiverStream::new(rx); + + let response = Box::pin(output_stream) as Self::GetStatusStream; + + Ok(tonic::Response::new(response)) } } diff --git a/src/settings.rs b/src/settings.rs index 7d40ec3..f704663 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -3,9 +3,13 @@ use crate::database::paths::{ }; use crate::library::index_path; use crate::proto; +use crate::proto::settings::{ + AddPathRequest, AddPathResponse, DeletePathRequest, DeletePathResponse, RefreshPathRequest, + RefreshPathResponse, +}; use crate::state::GrooveState; -use proto::settings_server::Settings; +use proto::settings::{settings_server::Settings, LibraryPath, SettingsData}; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; @@ -26,7 +30,7 @@ impl Settings for SettingsService { async fn list_paths( &self, _request: tonic::Request<()>, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { let Ok(db) = self.pool.get() else { return Err(tonic::Status::internal("")); }; @@ -35,10 +39,10 @@ impl Settings for SettingsService { return Err(tonic::Status::internal("")); }; - let response = proto::SettingsData { + let response = SettingsData { library_paths: library_paths .iter() - .map(|p| proto::LibraryPath { + .map(|p| LibraryPath { id: p.id, path: p.path.clone(), }) @@ -50,8 +54,8 @@ impl Settings for SettingsService { async fn add_path( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { let input = request.into_inner(); let Ok(db) = self.pool.get() else { @@ -66,15 +70,15 @@ impl Settings for SettingsService { return Err(tonic::Status::internal("")); } - let response = proto::AddPathResponse { id: 0 }; + let response = AddPathResponse { id: 0 }; Ok(tonic::Response::new(response)) } async fn delete_path( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { let input = request.into_inner(); let Ok(db) = self.pool.get() else { @@ -89,15 +93,15 @@ impl Settings for SettingsService { return Err(tonic::Status::not_found("")); } - let response = proto::DeletePathResponse {}; + let response = DeletePathResponse {}; Ok(tonic::Response::new(response)) } async fn refresh_path( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { let input = request.into_inner(); let Ok(db) = self.pool.get() else { @@ -110,7 +114,7 @@ impl Settings for SettingsService { let _ = index_path(library_path.path.into(), db, library_path.id); - let response = proto::RefreshPathResponse {}; + let response = RefreshPathResponse {}; Ok(tonic::Response::new(response)) } diff --git a/src/state.rs b/src/state.rs index 4174460..020b888 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::Mutex; use crate::music::player::AudioPlayer; -pub type GrooveState = Arc>; +pub type GrooveState = Arc>; pub struct GrooveStateData { pub player: AudioPlayer,