feat: stream currently playing + volume + toggle pause + seek position

This commit is contained in:
2024-11-24 23:36:24 +01:00
parent 51a57ebd40
commit de9e430828
14 changed files with 314 additions and 62 deletions

View File

@@ -1,8 +1,19 @@
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use std::{pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use crate::{
database::tracks::get_track_full_path, proto::{player_server::Player, PlayTrackRequest, PlayTrackResponse}, state::GrooveState
database::tracks::{get_track, get_track_full_path},
proto::{
self,
player::{
player_server::Player, PauseState, PlayTrackRequest, PlayTrackResponse, PlayerStatus,
SeekPositionRequest, SeekPositionResponse, SetVolumeRequest, SetVolumeResponse,
},
},
state::GrooveState,
};
pub struct PlayerService {
@@ -18,6 +29,8 @@ impl PlayerService {
#[tonic::async_trait]
impl Player for PlayerService {
type GetStatusStream = Pin<Box<dyn Stream<Item = Result<PlayerStatus, tonic::Status>> + Send>>;
async fn play_track(
&self,
request: tonic::Request<PlayTrackRequest>,
@@ -28,15 +41,21 @@ impl Player for PlayerService {
let input = request.get_ref();
let Ok(track) = get_track(&db, input.hash.as_str()) else {
return Err(tonic::Status::not_found(""));
};
let Ok(track_path) = get_track_full_path(&db, input.hash.as_str()) else {
return Err(tonic::Status::not_found(""));
};
let mut state = self.state.write().await;
let mut state = self.state.lock().await;
let _ = state.player.play_song(track_path);
let _ = state.player.play_track(track.clone(), track_path);
let response = PlayTrackResponse {
track: Some(proto::library::Track::from(track.into())),
position: 0,
};
Ok(tonic::Response::new(response))
@@ -45,22 +64,105 @@ impl Player for PlayerService {
async fn resume_track(
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<()>, tonic::Status> {
let mut state = self.state.write().await;
) -> Result<tonic::Response<PauseState>, tonic::Status> {
let state = self.state.lock().await;
state.player.resume();
Ok(tonic::Response::new(()))
let response = PauseState { is_paused: false };
Ok(tonic::Response::new(response))
}
async fn pause_track(
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<()>, tonic::Status> {
let mut state = self.state.write().await;
) -> Result<tonic::Response<PauseState>, tonic::Status> {
let state = self.state.lock().await;
state.player.pause();
Ok(tonic::Response::new(()))
let response = PauseState { is_paused: true };
Ok(tonic::Response::new(response))
}
async fn toggle_pause(
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<PauseState>, tonic::Status> {
let state = self.state.lock().await;
let is_paused = state.player.toggle_pause();
let response = PauseState { is_paused };
Ok(tonic::Response::new(response))
}
async fn seek_position(
&self,
request: tonic::Request<SeekPositionRequest>,
) -> Result<tonic::Response<SeekPositionResponse>, tonic::Status> {
let input = request.get_ref();
let state = self.state.lock().await;
state.player.set_position(input.position);
let response = SeekPositionResponse {
position: input.position,
};
Ok(tonic::Response::new(response))
}
async fn set_volume(
&self,
request: tonic::Request<SetVolumeRequest>,
) -> Result<tonic::Response<SetVolumeResponse>, tonic::Status> {
let input = request.get_ref();
let state = self.state.lock().await;
state.player.set_volume(input.volume);
let response = SetVolumeResponse {
volume: input.volume,
};
Ok(tonic::Response::new(response))
}
async fn get_status(
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<Self::GetStatusStream>, tonic::Status> {
let state = self.state.clone();
let (tx, rx) = mpsc::channel::<Result<PlayerStatus, tonic::Status>>(128);
let sleep_duration = Duration::from_millis(1000);
tokio::spawn(async move {
println!("get_status stream opened");
while !tx.is_closed() {
if let Err(_) = tx
.send(Ok(state.lock().await.player.get_snapshot().into()))
.await
{
break;
}
tokio::time::sleep(sleep_duration).await;
}
println!("get_status stream closed");
});
let output_stream = ReceiverStream::new(rx);
let response = Box::pin(output_stream) as Self::GetStatusStream;
Ok(tonic::Response::new(response))
}
}