Added buffering and bus message watching

This commit is contained in:
G2-Games 2023-12-07 15:14:03 -06:00
parent 5c2b718d79
commit 17c2745cdd

View file

@ -1,6 +1,6 @@
// Crate things
//use crate::music_controller::config::Config;
use crate::music_storage::music_db::URI;
use crate::music_storage::music_db::{URI, Tag};
use crossbeam_channel::bounded;
use std::error::Error;
use std::sync::{Arc, RwLock};
@ -22,17 +22,55 @@ pub enum PlayerCmd {
AboutToFinish,
}
#[derive(Debug)]
pub enum PlayerState {
Playing,
Paused,
Ready,
Buffering(u8),
Null,
VoidPending,
}
impl From<gst::State> for PlayerState {
fn from(value: gst::State) -> Self {
match value {
gst::State::VoidPending => Self::VoidPending,
gst::State::Playing => Self::Playing,
gst::State::Paused => Self::Paused,
gst::State::Ready => Self::Ready,
gst::State::Null => Self::Null,
}
}
}
impl TryInto<gst::State> for PlayerState {
fn try_into(self) -> Result<gst::State, Box<dyn Error>> {
match self {
Self::VoidPending => Ok(gst::State::VoidPending),
Self::Playing => Ok(gst::State::Playing),
Self::Paused => Ok(gst::State::Paused),
Self::Ready => Ok(gst::State::Ready),
Self::Null => Ok(gst::State::Null),
state => Err(format!("Invalid gst::State: {:?}", state).into())
}
}
type Error = Box<dyn Error>;
}
/// An instance of a music player with a GStreamer backend
pub struct Player {
source: Option<URI>,
//pub message_tx: Sender<PlayerCmd>,
pub message_rx: crossbeam::channel::Receiver<PlayerCmd>,
playbin: Arc<RwLock<Element>>,
paused: bool,
volume: f64,
start: Arc<RwLock<Option<Duration>>>,
end: Arc<RwLock<Option<Duration>>>,
pub position: Arc<RwLock<Option<Duration>>>,
position: Arc<RwLock<Option<Duration>>>,
buffer: Arc<RwLock<Option<u8>>>,
paused: Arc<RwLock<bool>>,
}
impl Default for Player {
@ -45,6 +83,9 @@ impl Player {
pub fn new() -> Self {
// Initialize GStreamer
gst::init().unwrap();
let ctx = glib::MainContext::default();
let _guard = ctx.acquire();
let mainloop = glib::MainLoop::new(Some(&ctx), false);
let playbin_arc = Arc::new(RwLock::new(
gst::ElementFactory::make("playbin3").build().unwrap(),
@ -76,11 +117,14 @@ impl Player {
let position = Arc::new(RwLock::new(None));
let start = Arc::new(RwLock::new(None));
let end: Arc<RwLock<Option<Duration>>> = Arc::new(RwLock::new(None));
let buffer = Arc::new(RwLock::new(None));
let paused = Arc::new(RwLock::new(false));
// Set up the thread to monitor the position
let position_update = position.clone();
let start_update = Arc::clone(&start);
let end_update = Arc::clone(&end);
let (message_tx, message_rx) = bounded(1);
let (message_tx, message_rx) = bounded(1); //TODO: Maybe figure out a better method than making this bounded
std::thread::spawn(move || {
loop {
let mut pos_temp = playbin_arc
@ -95,7 +139,7 @@ impl Player {
{
let atf = end_update.read().unwrap().unwrap() - Duration::milliseconds(250);
if pos_temp.unwrap() >= end_update.read().unwrap().unwrap() {
message_tx.try_send(PlayerCmd::Eos).unwrap();
let _ = message_tx.try_send(PlayerCmd::Eos);
playbin_arc
.write()
.unwrap()
@ -114,22 +158,70 @@ impl Player {
}
}
//println!("{:?}", pos_temp);
*position_update.write().unwrap() = pos_temp;
std::thread::sleep(std::time::Duration::from_millis(50));
std::thread::sleep(std::time::Duration::from_millis(100));
}
});
// Set up the thread to monitor bus messages
let playbin_bus_ctrl = Arc::clone(&playbin);
let buffer_bus_ctrl = Arc::clone(&buffer);
let paused_bus_ctrl = Arc::clone(&paused);
let bus_watch = playbin
.read()
.unwrap()
.bus()
.expect("Failed to get GStreamer message bus")
.add_watch(move |_bus, msg| {
match msg.view() {
gst::MessageView::Eos(_) => {},
gst::MessageView::StreamStart(_) => {},
gst::MessageView::Error(e) =>
println!("song {}", e.error()),
gst::MessageView::Tag(tag) => {
if let Some(title) = tag.tags().get::<gst::tags::Title>() {
println!(" Title: {}", title.get());
}
if let Some(album) = tag.tags().get::<gst::tags::Album>() {
println!(" Album: {}", album.get());
}
}
gst::MessageView::Buffering(buffering) => {
let percent = buffering.percent();
if percent < 100 {
*buffer_bus_ctrl.write().unwrap() = Some(percent as u8);
playbin_bus_ctrl.write().unwrap().set_state(gst::State::Paused).unwrap();
} else if *paused_bus_ctrl.read().unwrap() == false {
*buffer_bus_ctrl.write().unwrap() = None;
playbin_bus_ctrl.write().unwrap().set_state(gst::State::Playing).unwrap();
}
}
_ => (),
}
glib::ControlFlow::Continue
})
.expect("Failed to connect to GStreamer message bus");
// Set up a thread to watch the messages
std::thread::spawn(move || {
let _watch = bus_watch;
mainloop.run()
});
let source = None;
Self {
source,
playbin,
message_rx,
paused: false,
volume: 1.0,
start,
end,
paused,
position,
buffer,
}
}
@ -216,13 +308,13 @@ impl Player {
/// Pause, if playing
pub fn pause(&mut self) -> Result<(), gst::StateChangeError> {
self.paused = true;
*self.paused.write().unwrap() = true;
self.set_state(gst::State::Paused)
}
/// Resume from being paused
pub fn resume(&mut self) -> Result<(), gst::StateChangeError> {
self.paused = false;
*self.paused.write().unwrap() = false;
self.set_state(gst::State::Playing)
}
@ -296,13 +388,31 @@ impl Player {
Ok(())
}
pub fn state(&mut self) -> gst::State {
self.playbin.read().unwrap().current_state()
/// Get the current state of the playback
pub fn state(&mut self) -> PlayerState {
match *self.buffer.read().unwrap() {
None => self.playbin.read().unwrap().current_state().into(),
Some(value) => {
PlayerState::Buffering(value)
}
}
}
pub fn property(&self, property: &str) -> glib::Value {
self.playbin.read().unwrap().property_value(property)
}
/// Stop the playback entirely
pub fn stop(&mut self) -> Result<(), gst::StateChangeError> {
self.pause()?;
self.ready()?;
// Set all positions to none
*self.position.write().unwrap() = None;
*self.start.write().unwrap() = None;
*self.end.write().unwrap() = None;
Ok(())
}
}
impl Drop for Player {