373 lines
11 KiB
Rust
373 lines
11 KiB
Rust
use deadpool_sqlite::Pool;
|
|
use std::{path::PathBuf, pin::Pin, time::Duration};
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::{wrappers::ReceiverStream, Stream};
|
|
|
|
use crate::{
|
|
database::tracks::{
|
|
get_playlist, get_specific_tracks, get_specific_tracks_full_path, get_track,
|
|
get_track_full_path, Track,
|
|
},
|
|
music::queue::queue_to_track_vec,
|
|
proto::{
|
|
self,
|
|
player::{
|
|
player_server::Player, PauseState, PlayPlaylistRequest, PlayTrackResponse,
|
|
PlayerStatus, Queue, SeekPositionRequest, SeekPositionResponse, SetVolumeRequest,
|
|
SetVolumeResponse, SkipToQueueIndexRequest, SwapQueueIndicesRequest, TrackRequest,
|
|
TracksRequest,
|
|
},
|
|
},
|
|
state::GrooveState,
|
|
};
|
|
|
|
pub struct PlayerService {
|
|
state: GrooveState,
|
|
pool: Pool,
|
|
}
|
|
|
|
impl PlayerService {
|
|
pub fn new(state: GrooveState, pool: Pool) -> Self {
|
|
Self { state, pool }
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl Player for PlayerService {
|
|
type GetStatusStream = Pin<Box<dyn Stream<Item = Result<PlayerStatus, tonic::Status>> + Send>>;
|
|
|
|
async fn play_track(
|
|
&self,
|
|
request: tonic::Request<TrackRequest>,
|
|
) -> Result<tonic::Response<PlayTrackResponse>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else {
|
|
return Err(tonic::Status::not_found("Could not get track"));
|
|
};
|
|
|
|
let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else {
|
|
return Err(tonic::Status::not_found("Could not get track file path"));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
|
|
let player = &mut state.player.lock().await;
|
|
|
|
let _ = player.play_track(track.clone(), track_path, true);
|
|
|
|
let response = PlayTrackResponse {
|
|
track: Some(proto::library::Track::from(track.into())),
|
|
position: 0,
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn resume_track(
|
|
&self,
|
|
_: tonic::Request<()>,
|
|
) -> Result<tonic::Response<PauseState>, tonic::Status> {
|
|
let state = self.state.lock().await;
|
|
|
|
state.player.lock().await.resume();
|
|
|
|
let response = PauseState { is_paused: false };
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn pause_track(
|
|
&self,
|
|
_: tonic::Request<()>,
|
|
) -> Result<tonic::Response<PauseState>, tonic::Status> {
|
|
let state = self.state.lock().await;
|
|
|
|
state.player.lock().await.pause();
|
|
|
|
let response = PauseState { is_paused: true };
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn toggle_pause(
|
|
&self,
|
|
_: tonic::Request<()>,
|
|
) -> Result<tonic::Response<PauseState>, tonic::Status> {
|
|
let state = self.state.lock().await;
|
|
|
|
let is_paused = state.player.lock().await.toggle_pause();
|
|
|
|
let response = PauseState { is_paused };
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn seek_position(
|
|
&self,
|
|
request: tonic::Request<SeekPositionRequest>,
|
|
) -> Result<tonic::Response<SeekPositionResponse>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
let state = self.state.lock().await;
|
|
|
|
state.player.lock().await.set_position(input.position);
|
|
|
|
let response = SeekPositionResponse {
|
|
position: input.position,
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn set_volume(
|
|
&self,
|
|
request: tonic::Request<SetVolumeRequest>,
|
|
) -> Result<tonic::Response<SetVolumeResponse>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
let state = self.state.lock().await;
|
|
|
|
state.player.lock().await.set_volume(input.volume);
|
|
|
|
let response = SetVolumeResponse {
|
|
volume: input.volume,
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn get_status(
|
|
&self,
|
|
_: tonic::Request<()>,
|
|
) -> Result<tonic::Response<Self::GetStatusStream>, tonic::Status> {
|
|
let state = self.state.clone();
|
|
|
|
let (tx, rx) = mpsc::channel::<Result<PlayerStatus, tonic::Status>>(128);
|
|
|
|
let sleep_duration = Duration::from_millis(1000);
|
|
|
|
tokio::spawn(async move {
|
|
while !tx.is_closed() {
|
|
if let Err(_) = tx
|
|
.send(Ok(state
|
|
.lock()
|
|
.await
|
|
.player
|
|
.lock()
|
|
.await
|
|
.get_snapshot()
|
|
.into()))
|
|
.await
|
|
{
|
|
break;
|
|
}
|
|
|
|
tokio::time::sleep(sleep_duration).await;
|
|
}
|
|
});
|
|
|
|
let output_stream = ReceiverStream::new(rx);
|
|
|
|
let response = Box::pin(output_stream) as Self::GetStatusStream;
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn add_track_to_queue(
|
|
&self,
|
|
request: tonic::Request<TrackRequest>,
|
|
) -> Result<tonic::Response<Queue>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
let mut player = state.player.lock().await;
|
|
|
|
if let Err(_) = player.add_to_queue(track, &track_path).await {
|
|
return Err(tonic::Status::internal(""));
|
|
}
|
|
|
|
let queue = player.queue();
|
|
|
|
let response = Queue {
|
|
tracks: queue_to_track_vec(queue),
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn add_tracks_to_queue(
|
|
&self,
|
|
request: tonic::Request<TracksRequest>,
|
|
) -> Result<tonic::Response<Queue>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(tracks) = get_specific_tracks(&self.pool, &input.tracks).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let Ok(track_paths) = get_specific_tracks_full_path(&self.pool, &input.tracks).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
let mut player = state.player.lock().await;
|
|
|
|
let tracks_and_paths: Vec<(Track, PathBuf)> =
|
|
tracks.into_iter().zip(track_paths.into_iter()).collect();
|
|
|
|
if let Err(_) = player.add_tracks_to_queue(tracks_and_paths, false).await {
|
|
return Err(tonic::Status::internal(""));
|
|
}
|
|
|
|
let queue = player.queue();
|
|
|
|
let response = Queue {
|
|
tracks: queue_to_track_vec(queue),
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn play_playlist(
|
|
&self,
|
|
request: tonic::Request<PlayPlaylistRequest>,
|
|
) -> Result<tonic::Response<PlayerStatus>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(playlist) = get_playlist(&self.pool, input.id).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let Ok(track_paths) = get_specific_tracks_full_path(
|
|
&self.pool,
|
|
&playlist.tracks.iter().map(|t| t.hash.clone()).collect(),
|
|
)
|
|
.await
|
|
else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
let mut player = state.player.lock().await;
|
|
|
|
let tracks = playlist.tracks.iter().map(|t| t.clone().into());
|
|
|
|
let tracks_and_paths: Vec<(Track, PathBuf)> =
|
|
tracks.into_iter().zip(track_paths.into_iter()).collect();
|
|
|
|
if let Err(_) = player.add_tracks_to_queue(tracks_and_paths, true).await {
|
|
return Err(tonic::Status::internal(""));
|
|
}
|
|
|
|
let response = player.get_snapshot().into();
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn play_track_next(
|
|
&self,
|
|
request: tonic::Request<TrackRequest>,
|
|
) -> Result<tonic::Response<Queue>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(track) = get_track(&self.pool, input.hash.as_str()).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let Ok(track_path) = get_track_full_path(&self.pool, input.hash.as_str()).await else {
|
|
return Err(tonic::Status::not_found(""));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
let mut player = state.player.lock().await;
|
|
|
|
if let Err(_) = player.play_track_next(track, &track_path) {
|
|
return Err(tonic::Status::internal(""));
|
|
}
|
|
|
|
let queue = player.queue();
|
|
|
|
let response = Queue {
|
|
tracks: queue_to_track_vec(queue),
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn skip_track(
|
|
&self,
|
|
_: tonic::Request<()>,
|
|
) -> Result<tonic::Response<PlayerStatus>, tonic::Status> {
|
|
let state = self.state.lock().await;
|
|
|
|
let mut player = state.player.lock().await;
|
|
|
|
if let Err(_) = player.skip_track() {
|
|
return Err(tonic::Status::internal(""));
|
|
};
|
|
|
|
let response = player.get_snapshot().into();
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn skip_to_queue_index(
|
|
&self,
|
|
request: tonic::Request<SkipToQueueIndexRequest>,
|
|
) -> Result<tonic::Response<PlayerStatus>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(index) = input.index.try_into() else {
|
|
return Err(tonic::Status::internal(""));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
let mut player = state.player.lock().await;
|
|
|
|
if let Err(_) = player.skip_to_queue_index(index) {
|
|
return Err(tonic::Status::internal(""));
|
|
};
|
|
|
|
let response = player.get_snapshot().into();
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
|
|
async fn swap_queue_indices(
|
|
&self,
|
|
request: tonic::Request<SwapQueueIndicesRequest>,
|
|
) -> Result<tonic::Response<Queue>, tonic::Status> {
|
|
let input = request.get_ref();
|
|
|
|
let Ok(a) = input.a.try_into() else {
|
|
return Err(tonic::Status::internal(""));
|
|
};
|
|
|
|
let Ok(b) = input.b.try_into() else {
|
|
return Err(tonic::Status::internal(""));
|
|
};
|
|
|
|
let state = self.state.lock().await;
|
|
let mut player = state.player.lock().await;
|
|
|
|
if !player.swap_queue_indices(a, b) {
|
|
return Err(tonic::Status::internal(""));
|
|
};
|
|
|
|
let queue = player.queue();
|
|
|
|
let response = Queue {
|
|
tracks: queue_to_track_vec(queue),
|
|
};
|
|
|
|
Ok(tonic::Response::new(response))
|
|
}
|
|
}
|