diff --git a/src/music_controller/controller.rs b/src/music_controller/controller.rs index 5e0ea55..df4fa20 100644 --- a/src/music_controller/controller.rs +++ b/src/music_controller/controller.rs @@ -73,7 +73,7 @@ impl Controller

{ queue: Queue::default(), config: config_.clone(), library, - player: Box::new(P::new()), + player: Box::new(P::new()?), }) } diff --git a/src/music_player/gstreamer.rs b/src/music_player/gstreamer.rs index 91abfde..ff5d25c 100644 --- a/src/music_player/gstreamer.rs +++ b/src/music_player/gstreamer.rs @@ -1,7 +1,7 @@ // Crate things //use crate::music_controller::config::Config; use crate::music_storage::library::URI; -use crossbeam_channel::unbounded; +use crossbeam_channel::{unbounded, Receiver, Sender}; use std::error::Error; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -13,29 +13,10 @@ use gstreamer::prelude::*; // Extra things use chrono::Duration; -use thiserror::Error; -use super::player::{Player, PlayerError}; +use super::player::{Player, PlayerCommand, PlayerError, PlayerState}; -#[derive(Debug)] -pub enum GstCmd { - Play, - Pause, - Eos, - AboutToFinish, -} - -#[derive(Debug, PartialEq, Eq)] -pub enum GstState { - Playing, - Paused, - Ready, - Buffering(u8), - Null, - VoidPending, -} - -impl From for GstState { +impl From for PlayerState { fn from(value: gst::State) -> Self { match value { gst::State::VoidPending => Self::VoidPending, @@ -47,7 +28,7 @@ impl From for GstState { } } -impl TryInto for GstState { +impl TryInto for PlayerState { fn try_into(self) -> Result> { match self { Self::VoidPending => Ok(gst::State::VoidPending), @@ -63,7 +44,7 @@ impl TryInto for GstState { } #[derive(Debug, PartialEq, Eq)] -enum PlaybackStats { +enum PlaybackInfo { Idle, Switching, Playing{ @@ -77,10 +58,10 @@ enum PlaybackStats { #[derive(Debug)] pub struct GStreamer { source: Option, - //pub message_tx: Sender, - pub message_rx: crossbeam::channel::Receiver, - playback_tx: crossbeam::channel::Sender, + message_rx: crossbeam::channel::Receiver, + playback_tx: crossbeam::channel::Sender, + playbin: Arc>, volume: f64, start: Option, @@ -89,16 +70,163 @@ pub struct GStreamer { position: Arc>>, } +impl From for PlayerError { + fn from(value: gst::StateChangeError) -> Self { + PlayerError::StateChange(value.to_string()) + } +} + +impl From for PlayerError { + fn from(value: glib::BoolError) -> Self { + PlayerError::General(value.to_string()) + } +} + impl GStreamer { - pub fn new() -> Result { + /// Set the playback URI + fn set_source(&mut self, source: &URI) -> Result<(), PlayerError> { + if !source.exists().is_ok_and(|x| x) { + // If the source doesn't exist, gstreamer will crash! + return Err(PlayerError::NotFound) + } + + // Make sure the playback tracker knows the stuff is stopped + self.playback_tx.send(PlaybackInfo::Switching).unwrap(); + + let uri = self.playbin.read().unwrap().property_value("current-uri"); + self.source = Some(source.clone()); + match source { + URI::Cue { start, end, .. } => { + self.playbin + .write() + .unwrap() + .set_property("uri", source.as_uri()); + + // Set the start and end positions of the CUE file + self.start = Some(Duration::from_std(*start).unwrap()); + self.end = Some(Duration::from_std(*end).unwrap()); + + // Send the updated position to the tracker + self.playback_tx.send(PlaybackInfo::Playing{ + start: self.start.unwrap(), + end: self.end.unwrap() + }).unwrap(); + + // Wait for it to be ready, and then move to the proper position + self.play().unwrap(); + let now = std::time::Instant::now(); + while now.elapsed() < std::time::Duration::from_millis(20) { + if self.seek_to(Duration::from_std(*start).unwrap()).is_ok() { + return Ok(()); + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + //panic!("Couldn't seek to beginning of cue track in reasonable time (>20ms)"); + return Err(PlayerError::StateChange("Could not seek to beginning of CUE track".into())) + } + _ => { + self.playbin + .write() + .unwrap() + .set_property("uri", source.as_uri()); + + self.play().unwrap(); + + while uri.get::<&str>().unwrap_or("") + == self.property("current-uri").get::<&str>().unwrap_or("") + || self.position().is_none() + { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + self.start = Some(Duration::seconds(0)); + self.end = self.raw_duration(); + + // Send the updated position to the tracker + self.playback_tx.send(PlaybackInfo::Playing{ + start: self.start.unwrap(), + end: self.end.unwrap() + }).unwrap(); + } + } + + Ok(()) + } + + /// Gets a mutable reference to the playbin element + fn playbin_mut( + &mut self, + ) -> Result, std::sync::PoisonError>> + { + let element = match self.playbin.write() { + Ok(element) => element, + Err(err) => return Err(err), + }; + Ok(element) + } + + /// Gets a read-only reference to the playbin element + fn playbin( + &self, + ) -> Result, std::sync::PoisonError>> + { + let element = match self.playbin.read() { + Ok(element) => element, + Err(err) => return Err(err), + }; + Ok(element) + } + + /// Set volume of the internal playbin player, can be + /// used to bypass the main volume control for seeking + fn set_gstreamer_volume(&mut self, volume: f64) { + self.playbin_mut().unwrap().set_property("volume", volume) + } + + fn set_state(&mut self, state: gst::State) -> Result<(), gst::StateChangeError> { + self.playbin_mut().unwrap().set_state(state)?; + + Ok(()) + } + + fn raw_duration(&self) -> Option { + self.playbin() + .unwrap() + .query_duration::() + .map(|pos| Duration::nanoseconds(pos.nseconds() as i64)) + } + + /// Get the current state of the playback + fn state(&mut self) -> PlayerState { + self.playbin().unwrap().current_state().into() + /* + match *self.buffer.read().unwrap() { + None => self.playbin().unwrap().current_state().into(), + Some(value) => PlayerState::Buffering(value), + } + */ + } + + fn property(&self, property: &str) -> glib::Value { + self.playbin().unwrap().property_value(property) + } +} + +impl Player for GStreamer { + fn new() -> Result { // Initialize GStreamer, maybe figure out how to nicely fail here - gst::init()?; + if let Err(err) = gst::init() { + return Err(PlayerError::Init(err.to_string())) + }; let ctx = glib::MainContext::default(); let _guard = ctx.acquire(); let mainloop = glib::MainLoop::new(Some(&ctx), false); let playbin_arc = Arc::new(RwLock::new( - gst::ElementFactory::make("playbin3").build()?, + match gst::ElementFactory::make("playbin3").build() { + Ok(playbin) => playbin, + Err(error) => return Err(PlayerError::Init(error.to_string())), + } )); let playbin = playbin_arc.clone(); @@ -124,53 +252,11 @@ impl GStreamer { // Set up the thread to monitor the position let (playback_tx, playback_rx) = unbounded(); - let (stat_tx, stat_rx) = unbounded::(); + let (status_tx, status_rx) = unbounded::(); let position_update = Arc::clone(&position); - let _playback_monitor = std::thread::spawn(move || { //TODO: Figure out how to return errors nicely in threads - let mut stats = PlaybackStats::Idle; - let mut pos_temp; - loop { - // Check for new messages or updates about how to proceed - if let Ok(res) = stat_rx.recv_timeout(std::time::Duration::from_millis(100)) { - stats = res - } - pos_temp = playbin_arc - .read() - .unwrap() - .query_position::() - .map(|pos| Duration::nanoseconds(pos.nseconds() as i64)); - - match stats { - PlaybackStats::Playing{start, end} if pos_temp.is_some() => { - // Check if the current playback position is close to the end - let finish_point = end - Duration::milliseconds(250); - if pos_temp.unwrap() >= end { - let _ = playback_tx.try_send(GstCmd::Eos); - playbin_arc - .write() - .unwrap() - .set_state(gst::State::Ready) - .expect("Unable to set the pipeline state"); - } else if pos_temp.unwrap() >= finish_point { - let _ = playback_tx.try_send(GstCmd::AboutToFinish); - } - - // This has to be done AFTER the current time in the file - // is calculated, or everything else is wrong - pos_temp = Some(pos_temp.unwrap() - start) - }, - PlaybackStats::Finished => { - *position_update.write().unwrap() = None; - break - }, - PlaybackStats::Idle | PlaybackStats::Switching => {}, - _ => () - } - - *position_update.write().unwrap() = pos_temp; - } - }); + let _playback_monitor = + std::thread::spawn(|| playback_monitor(playbin_arc, status_rx, playback_tx, position_update)); // Set up the thread to monitor bus messages let playbin_bus_ctrl = Arc::clone(&playbin); @@ -231,7 +317,7 @@ impl GStreamer { source, playbin, message_rx: playback_rx, - playback_tx: stat_tx, + playback_tx: status_tx, volume: 1.0, start: None, end: None, @@ -240,163 +326,65 @@ impl GStreamer { }) } - pub fn source(&self) -> &Option { + fn source(&self) -> &Option { &self.source } - pub fn enqueue_next(&mut self, next_track: &URI) -> Result<(), PlayerError> { + /// Insert a new track to be played. This method should be called at the + /// beginning to start playback of something, and once the [PlayerCommand] + /// indicates the track is about to finish to enqueue gaplessly. + fn enqueue_next(&mut self, next_track: &URI) -> Result<(), PlayerError> { self.set_source(next_track) } - /// Set the playback URI - fn set_source(&mut self, source: &URI) -> Result<(), PlayerError> { - if !source.exists().is_ok_and(|x| x) { - // If the source doesn't exist, gstreamer will crash! - return Err(PlayerError::NotFound) - } - - // Make sure the playback tracker knows the stuff is stopped - self.playback_tx.send(PlaybackStats::Switching).unwrap(); - - let uri = self.playbin.read().unwrap().property_value("current-uri"); - self.source = Some(source.clone()); - match source { - URI::Cue { start, end, .. } => { - self.playbin - .write() - .unwrap() - .set_property("uri", source.as_uri()); - - // Set the start and end positions of the CUE file - self.start = Some(Duration::from_std(*start).unwrap()); - self.end = Some(Duration::from_std(*end).unwrap()); - - // Send the updated position to the tracker - self.playback_tx.send(PlaybackStats::Playing{ - start: self.start.unwrap(), - end: self.end.unwrap() - }).unwrap(); - - // Wait for it to be ready, and then move to the proper position - self.play().unwrap(); - let now = std::time::Instant::now(); - while now.elapsed() < std::time::Duration::from_millis(20) { - if self.seek_to(Duration::from_std(*start).unwrap()).is_ok() { - return Ok(()); - } - std::thread::sleep(std::time::Duration::from_millis(1)); - } - panic!("Couldn't seek to beginning of cue track in reasonable time (>20ms)"); - } - _ => { - self.playbin - .write() - .unwrap() - .set_property("uri", source.as_uri()); - - self.play().unwrap(); - - while uri.get::<&str>().unwrap_or("") - == self.property("current-uri").get::<&str>().unwrap_or("") - || self.position().is_none() - { - std::thread::sleep(std::time::Duration::from_millis(10)); - } - - self.start = Some(Duration::seconds(0)); - self.end = self.raw_duration(); - - // Send the updated position to the tracker - self.playback_tx.send(PlaybackStats::Playing{ - start: self.start.unwrap(), - end: self.end.unwrap() - }).unwrap(); - } - } - - Ok(()) - } - - /// Gets a mutable reference to the playbin element - fn playbin_mut( - &mut self, - ) -> Result, std::sync::PoisonError>> - { - let element = match self.playbin.write() { - Ok(element) => element, - Err(err) => return Err(err), - }; - Ok(element) - } - - /// Gets a read-only reference to the playbin element - fn playbin( - &self, - ) -> Result, std::sync::PoisonError>> - { - let element = match self.playbin.read() { - Ok(element) => element, - Err(err) => return Err(err), - }; - Ok(element) - } - /// Set the playback volume, accepts a float from 0 to 1 - pub fn set_volume(&mut self, volume: f64) { + fn set_volume(&mut self, volume: f64) { self.volume = volume.clamp(0.0, 1.0); self.set_gstreamer_volume(self.volume); } - /// Set volume of the internal playbin player, can be - /// used to bypass the main volume control for seeking - fn set_gstreamer_volume(&mut self, volume: f64) { - self.playbin_mut().unwrap().set_property("volume", volume) - } - /// Returns the current volume level, a float from 0 to 1 - pub fn volume(&mut self) -> f64 { + fn volume(&mut self) -> f64 { self.volume } - fn set_state(&mut self, state: gst::State) -> Result<(), gst::StateChangeError> { - self.playbin_mut().unwrap().set_state(state)?; - + fn ready(&mut self) -> Result<(), PlayerError> { + self.set_state(gst::State::Ready)?; Ok(()) } - pub fn ready(&mut self) -> Result<(), gst::StateChangeError> { - self.set_state(gst::State::Ready) - } - /// If the player is paused or stopped, starts playback - pub fn play(&mut self) -> Result<(), gst::StateChangeError> { - self.set_state(gst::State::Playing) + fn play(&mut self) -> Result<(), PlayerError> { + self.set_state(gst::State::Playing)?; + Ok(()) } /// Pause, if playing - pub fn pause(&mut self) -> Result<(), gst::StateChangeError> { - //*self.paused.write().unwrap() = true; - self.set_state(gst::State::Paused) + fn pause(&mut self) -> Result<(), PlayerError> { + //self.paused = true; + self.set_state(gst::State::Paused)?; + Ok(()) } /// Resume from being paused - pub fn resume(&mut self) -> Result<(), gst::StateChangeError> { - //*self.paused.write().unwrap() = false; - self.set_state(gst::State::Playing) + fn resume(&mut self) -> Result<(), PlayerError> { + //self.paused = false; + self.set_state(gst::State::Playing)?; + Ok(()) } /// Check if playback is paused - pub fn is_paused(&mut self) -> bool { + fn is_paused(&mut self) -> bool { self.playbin().unwrap().current_state() == gst::State::Paused } /// Get the current playback position of the player - pub fn position(&mut self) -> Option { + fn position(&mut self) -> Option { *self.position.read().unwrap() } /// Get the duration of the currently playing track - pub fn duration(&mut self) -> Option { + fn duration(&mut self) -> Option { if self.end.is_some() && self.start.is_some() { Some(self.end.unwrap() - self.start.unwrap()) } else { @@ -404,18 +392,11 @@ impl GStreamer { } } - pub fn raw_duration(&self) -> Option { - self.playbin() - .unwrap() - .query_duration::() - .map(|pos| Duration::nanoseconds(pos.nseconds() as i64)) - } - /// Seek relative to the current position - pub fn seek_by(&mut self, seek_amount: Duration) -> Result<(), Box> { + fn seek_by(&mut self, seek_amount: Duration) -> Result<(), PlayerError> { let time_pos = match *self.position.read().unwrap() { Some(pos) => pos, - None => return Err("No position".into()), + None => return Err(PlayerError::Seek("No position".into())), }; let seek_pos = time_pos + seek_amount; @@ -424,15 +405,15 @@ impl GStreamer { } /// Seek absolutely - pub fn seek_to(&mut self, target_pos: Duration) -> Result<(), Box> { + fn seek_to(&mut self, target_pos: Duration) -> Result<(), PlayerError> { let start = if self.start.is_none() { - return Err("Failed to seek: No START time".into()); + return Err(PlayerError::Seek("No START time".into())); } else { self.start.unwrap() }; let end = if self.end.is_none() { - return Err("Failed to seek: No END time".into()); + return Err(PlayerError::Seek("No END time".into())); } else { self.end.unwrap() }; @@ -451,28 +432,13 @@ impl GStreamer { Ok(()) } - /// Get the current state of the playback - pub fn state(&mut self) -> GstState { - self.playbin().unwrap().current_state().into() - /* - match *self.buffer.read().unwrap() { - None => self.playbin().unwrap().current_state().into(), - Some(value) => PlayerState::Buffering(value), - } - */ - } - - pub fn property(&self, property: &str) -> glib::Value { - self.playbin().unwrap().property_value(property) - } - /// Stop the playback entirely - pub fn stop(&mut self) -> Result<(), gst::StateChangeError> { + fn stop(&mut self) -> Result<(), PlayerError> { self.pause()?; self.ready()?; // Send the updated position to the tracker - self.playback_tx.send(PlaybackStats::Idle).unwrap(); + self.playback_tx.send(PlaybackInfo::Idle).unwrap(); // Set all positions to none *self.position.write().unwrap() = None; @@ -480,9 +446,12 @@ impl GStreamer { self.end = None; Ok(()) } -} -// impl Player for GStreamer {} + /// Return a reference to the player message channel + fn message_channel(&self) -> &crossbeam::channel::Receiver { + &self.message_rx + } +} impl Drop for GStreamer { /// Cleans up the `GStreamer` pipeline and the monitoring @@ -492,6 +461,57 @@ impl Drop for GStreamer { .unwrap() .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - let _ = self.playback_tx.send(PlaybackStats::Finished); + let _ = self.playback_tx.send(PlaybackInfo::Finished); + } +} + +fn playback_monitor( + playbin: Arc>, + status_rx: Receiver, + playback_tx: Sender, + position: Arc>>, +) { + let mut stats = PlaybackInfo::Idle; + let mut pos_temp; + loop { + // Check for new messages to decide how to proceed + if let Ok(result) = status_rx.recv_timeout(std::time::Duration::from_millis(100)) { + stats = result + } + + pos_temp = playbin + .read() + .unwrap() + .query_position::() + .map(|pos| Duration::nanoseconds(pos.nseconds() as i64)); + + match stats { + PlaybackInfo::Playing{start, end} if pos_temp.is_some() => { + // Check if the current playback position is close to the end + let finish_point = end - Duration::milliseconds(250); + if pos_temp.unwrap() >= end { + let _ = playback_tx.try_send(PlayerCommand::EndOfStream); + playbin + .write() + .unwrap() + .set_state(gst::State::Ready) + .expect("Unable to set the pipeline state"); + } else if pos_temp.unwrap() >= finish_point { + let _ = playback_tx.try_send(PlayerCommand::AboutToFinish); + } + + // This has to be done AFTER the current time in the file + // is calculated, or everything else is wrong + pos_temp = Some(pos_temp.unwrap() - start) + }, + PlaybackInfo::Finished => { + *position.write().unwrap() = None; + break + }, + PlaybackInfo::Idle | PlaybackInfo::Switching => {}, + _ => () + } + + *position.write().unwrap() = pos_temp; } } diff --git a/src/music_player/player.rs b/src/music_player/player.rs index ddf79c4..c2e24d9 100644 --- a/src/music_player/player.rs +++ b/src/music_player/player.rs @@ -1,17 +1,16 @@ use chrono::Duration; -use gstreamer as gst; use thiserror::Error; use crate::music_storage::library::URI; #[derive(Error, Debug)] pub enum PlayerError { - #[error("player initialization failed")] - Init(#[from] glib::Error), - #[error("element factory failed to create playbin3")] - Factory(#[from] glib::BoolError), + #[error("player initialization failed: {0}")] + Init(String), #[error("could not change playback state")] - StateChange(#[from] gst::StateChangeError), + StateChange(String), + #[error("seeking failed: {0}")] + Seek(String), #[error("the file or source is not found")] NotFound, #[error("failed to build gstreamer item")] @@ -19,11 +18,31 @@ pub enum PlayerError { #[error("poison error")] Poison, #[error("general player error")] - General, + General(String), +} + +#[derive(Debug, PartialEq, Eq)] +pub enum PlayerState { + Playing, + Paused, + Ready, + Buffering(u8), + Null, + VoidPending, +} + +#[derive(Debug)] +pub enum PlayerCommand { + Play, + Pause, + EndOfStream, + AboutToFinish, } pub trait Player { - fn new() -> Self; + /// Create a new player + fn new() -> Result where Self: Sized; + fn source(&self) -> &Option; fn enqueue_next(&mut self, next_track: &URI) -> Result<(), PlayerError>; @@ -48,9 +67,9 @@ pub trait Player { fn duration(&mut self) -> Option; - fn raw_duration(&self) -> Option; - fn seek_by(&mut self, seek_amount: Duration) -> Result<(), PlayerError>; fn seek_to(&mut self, target_pos: Duration) -> Result<(), PlayerError>; + + fn message_channel(&self) -> &crossbeam::channel::Receiver; }