From 17c2745cdd366cb17657fe257344fb14c56029ba Mon Sep 17 00:00:00 2001 From: G2-Games Date: Thu, 7 Dec 2023 15:14:03 -0600 Subject: [PATCH] Added buffering and bus message watching --- src/music_player.rs | 132 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 11 deletions(-) diff --git a/src/music_player.rs b/src/music_player.rs index 0bae684..647e470 100644 --- a/src/music_player.rs +++ b/src/music_player.rs @@ -1,6 +1,6 @@ // Crate things //use crate::music_controller::config::Config; -use crate::music_storage::music_db::URI; +use crate::music_storage::music_db::{URI, Tag}; use crossbeam_channel::bounded; use std::error::Error; use std::sync::{Arc, RwLock}; @@ -22,17 +22,55 @@ pub enum PlayerCmd { AboutToFinish, } +#[derive(Debug)] +pub enum PlayerState { + Playing, + Paused, + Ready, + Buffering(u8), + Null, + VoidPending, +} + +impl From for PlayerState { + fn from(value: gst::State) -> Self { + match value { + gst::State::VoidPending => Self::VoidPending, + gst::State::Playing => Self::Playing, + gst::State::Paused => Self::Paused, + gst::State::Ready => Self::Ready, + gst::State::Null => Self::Null, + } + } +} + +impl TryInto for PlayerState { + fn try_into(self) -> Result> { + match self { + Self::VoidPending => Ok(gst::State::VoidPending), + Self::Playing => Ok(gst::State::Playing), + Self::Paused => Ok(gst::State::Paused), + Self::Ready => Ok(gst::State::Ready), + Self::Null => Ok(gst::State::Null), + state => Err(format!("Invalid gst::State: {:?}", state).into()) + } + } + + type Error = Box; +} + /// An instance of a music player with a GStreamer backend pub struct Player { source: Option, //pub message_tx: Sender, pub message_rx: crossbeam::channel::Receiver, playbin: Arc>, - paused: bool, volume: f64, start: Arc>>, end: Arc>>, - pub position: Arc>>, + position: Arc>>, + buffer: Arc>>, + paused: Arc>, } impl Default for Player { @@ -45,6 +83,9 @@ impl Player { pub fn new() -> Self { // Initialize GStreamer gst::init().unwrap(); + let ctx = glib::MainContext::default(); + let _guard = ctx.acquire(); + let mainloop = glib::MainLoop::new(Some(&ctx), false); let playbin_arc = Arc::new(RwLock::new( gst::ElementFactory::make("playbin3").build().unwrap(), @@ -76,11 +117,14 @@ impl Player { let position = Arc::new(RwLock::new(None)); let start = Arc::new(RwLock::new(None)); let end: Arc>> = Arc::new(RwLock::new(None)); + let buffer = Arc::new(RwLock::new(None)); + let paused = Arc::new(RwLock::new(false)); + // Set up the thread to monitor the position let position_update = position.clone(); let start_update = Arc::clone(&start); let end_update = Arc::clone(&end); - let (message_tx, message_rx) = bounded(1); + let (message_tx, message_rx) = bounded(1); //TODO: Maybe figure out a better method than making this bounded std::thread::spawn(move || { loop { let mut pos_temp = playbin_arc @@ -95,7 +139,7 @@ impl Player { { let atf = end_update.read().unwrap().unwrap() - Duration::milliseconds(250); if pos_temp.unwrap() >= end_update.read().unwrap().unwrap() { - message_tx.try_send(PlayerCmd::Eos).unwrap(); + let _ = message_tx.try_send(PlayerCmd::Eos); playbin_arc .write() .unwrap() @@ -114,22 +158,70 @@ impl Player { } } + //println!("{:?}", pos_temp); + *position_update.write().unwrap() = pos_temp; - std::thread::sleep(std::time::Duration::from_millis(50)); + std::thread::sleep(std::time::Duration::from_millis(100)); } }); + // Set up the thread to monitor bus messages + let playbin_bus_ctrl = Arc::clone(&playbin); + let buffer_bus_ctrl = Arc::clone(&buffer); + let paused_bus_ctrl = Arc::clone(&paused); + let bus_watch = playbin + .read() + .unwrap() + .bus() + .expect("Failed to get GStreamer message bus") + .add_watch(move |_bus, msg| { + match msg.view() { + gst::MessageView::Eos(_) => {}, + gst::MessageView::StreamStart(_) => {}, + gst::MessageView::Error(e) => + println!("song {}", e.error()), + gst::MessageView::Tag(tag) => { + if let Some(title) = tag.tags().get::() { + println!(" Title: {}", title.get()); + } + if let Some(album) = tag.tags().get::() { + println!(" Album: {}", album.get()); + } + } + gst::MessageView::Buffering(buffering) => { + let percent = buffering.percent(); + if percent < 100 { + *buffer_bus_ctrl.write().unwrap() = Some(percent as u8); + playbin_bus_ctrl.write().unwrap().set_state(gst::State::Paused).unwrap(); + } else if *paused_bus_ctrl.read().unwrap() == false { + *buffer_bus_ctrl.write().unwrap() = None; + playbin_bus_ctrl.write().unwrap().set_state(gst::State::Playing).unwrap(); + } + } + _ => (), + } + glib::ControlFlow::Continue + }) + .expect("Failed to connect to GStreamer message bus"); + + // Set up a thread to watch the messages + std::thread::spawn(move || { + let _watch = bus_watch; + mainloop.run() + }); + let source = None; Self { source, playbin, message_rx, - paused: false, volume: 1.0, start, end, + paused, position, + buffer, } } @@ -216,13 +308,13 @@ impl Player { /// Pause, if playing pub fn pause(&mut self) -> Result<(), gst::StateChangeError> { - self.paused = true; + *self.paused.write().unwrap() = true; self.set_state(gst::State::Paused) } /// Resume from being paused pub fn resume(&mut self) -> Result<(), gst::StateChangeError> { - self.paused = false; + *self.paused.write().unwrap() = false; self.set_state(gst::State::Playing) } @@ -296,13 +388,31 @@ impl Player { Ok(()) } - pub fn state(&mut self) -> gst::State { - self.playbin.read().unwrap().current_state() + /// Get the current state of the playback + pub fn state(&mut self) -> PlayerState { + match *self.buffer.read().unwrap() { + None => self.playbin.read().unwrap().current_state().into(), + Some(value) => { + PlayerState::Buffering(value) + } + } } pub fn property(&self, property: &str) -> glib::Value { self.playbin.read().unwrap().property_value(property) } + + /// Stop the playback entirely + pub fn stop(&mut self) -> Result<(), gst::StateChangeError> { + self.pause()?; + self.ready()?; + + // Set all positions to none + *self.position.write().unwrap() = None; + *self.start.write().unwrap() = None; + *self.end.write().unwrap() = None; + Ok(()) + } } impl Drop for Player {