use std::sync::mpsc::{self, Sender, Receiver}; use std::sync::{Arc, RwLock}; use std::thread; use std::io::SeekFrom; use async_std::io::ReadExt; use async_std::task; use futures::future::join_all; use symphonia::core::codecs::{CODEC_TYPE_NULL, DecoderOptions, Decoder}; use symphonia::core::formats::{FormatOptions, FormatReader, SeekMode, SeekTo}; use symphonia::core::io::{MediaSourceStream, MediaSource}; use symphonia::core::meta::MetadataOptions; use symphonia::core::probe::Hint; use symphonia::core::errors::Error; use symphonia::core::units::{Time, TimeBase}; use futures::AsyncBufRead; use crate::music_controller::config::Config; use crate::music_player::music_output::AudioStream; use crate::music_processor::music_processor::MusicProcessor; use crate::music_storage::music_db::{URI, Song}; use crate::music_tracker::music_tracker::{MusicTracker, LastFM, DiscordRPCConfig, DiscordRPC, TrackerError}; // Struct that controls playback of music pub struct MusicPlayer { pub music_processor: MusicProcessor, player_status: PlayerStatus, music_trackers: Vec>, current_song: Arc>>, message_sender: Sender, status_receiver: Receiver, config: Arc>, } #[derive(Clone, Copy, Debug)] pub enum PlayerStatus { Playing(f64), Paused, Stopped, Error, } #[derive(Debug, Clone)] pub enum DecoderMessage { OpenSong(Song), Play, Pause, Stop, SeekTo(u64), DSP(DSPMessage) } #[derive(Clone)] pub enum TrackerMessage { Track(Song), TrackNow(Song) } #[derive(Debug, Clone)] pub enum DSPMessage { UpdateProcessor(Box) } // Holds a song decoder reader, etc struct SongHandler { pub reader: Box, pub decoder: Box, pub time_base: Option, pub duration: Option, } // TODO: actual error handling here impl SongHandler { pub fn new(uri: &URI) -> Result { // Opens remote/local source and creates MediaSource for symphonia let config = RemoteOptions {media_buffer_len: 10000, forward_buffer_len: 10000}; let src: Box = match uri { URI::Local(path) => { match std::fs::File::open(path) { Ok(file) => Box::new(file), Err(_) => return Err(()), } }, URI::Remote(_, location) => { match RemoteSource::new(location.as_ref(), &config) { Ok(remote_source) => Box::new(remote_source), Err(_) => return Err(()), } }, }; let mss = MediaSourceStream::new(src, Default::default()); // Use default metadata and format options let meta_opts: MetadataOptions = Default::default(); let fmt_opts: FormatOptions = Default::default(); let mut hint = Hint::new(); let probed = symphonia::default::get_probe().format(&hint, mss, &fmt_opts, &meta_opts).expect("Unsupported format"); let mut reader = probed.format; let track = reader.tracks() .iter() .find(|t| t.codec_params.codec != CODEC_TYPE_NULL) .expect("no supported audio tracks"); let time_base = track.codec_params.time_base; let duration = track.codec_params.n_frames; let dec_opts: DecoderOptions = Default::default(); let mut decoder = symphonia::default::get_codecs().make(&track.codec_params, &dec_opts) .expect("unsupported codec"); return Ok(SongHandler {reader, decoder, time_base, duration}); } } impl MusicPlayer { pub fn new(config: Arc>) -> Self { // Creates mpsc channels to communicate with music player threads let (message_sender, message_receiver) = mpsc::channel(); let (status_sender, status_receiver) = mpsc::channel(); let current_song = Arc::new(RwLock::new(None)); MusicPlayer::start_player(message_receiver, status_sender, config.clone(), current_song.clone()); MusicPlayer { music_processor: MusicProcessor::new(), music_trackers: Vec::new(), player_status: PlayerStatus::Stopped, current_song, message_sender, status_receiver, config, } } fn start_tracker(status_sender: Sender>, tracker_receiver: Receiver, config: Arc>) { thread::spawn(move || { let global_config = &*config.read().unwrap(); // Sets local config for trackers to detect changes let local_config = global_config.clone(); let mut trackers: Vec> = Vec::new(); // Updates local trackers to the music controller config let update_trackers = |trackers: &mut Vec>|{ if let Some(lastfm_config) = global_config.lastfm.clone() { trackers.push(Box::new(LastFM::new(&lastfm_config))); } if let Some(discord_config) = global_config.discord.clone() { trackers.push(Box::new(DiscordRPC::new(&discord_config))); } }; update_trackers(&mut trackers); loop { if let message = tracker_receiver.recv() { if local_config != global_config { update_trackers(&mut trackers); } let mut results = Vec::new(); task::block_on(async { let mut futures = Vec::new(); for tracker in trackers.iter_mut() { match message.clone() { Ok(TrackerMessage::Track(song)) => futures.push(tracker.track_song(song)), Ok(TrackerMessage::TrackNow(song)) => futures.push(tracker.track_now(song)), Err(_) => {}, } } results = join_all(futures).await; }); for result in results { status_sender.send(result).unwrap_or_default() } } } }); } // Opens and plays song with given path in separate thread fn start_player(message_receiver: Receiver, status_sender: Sender, config: Arc>, current_song: Arc>>) { // Creates thread that audio is decoded in thread::spawn(move || { let current_song = current_song; let mut song_handler = None; let mut seek_time: Option = None; let mut audio_output: Option> = None; let mut music_processor = MusicProcessor::new(); let (tracker_sender, tracker_receiver): (Sender, Receiver) = mpsc::channel(); let (tracker_status_sender, tracker_status_receiver): (Sender>, Receiver>) = mpsc::channel(); MusicPlayer::start_tracker(tracker_status_sender, tracker_receiver, config); let mut song_tracked = false; let mut song_time = 0.0; let mut paused = true; 'main_decode: loop { 'handle_message: loop { let message = if paused { // Pauses playback by blocking on waiting for new player messages match message_receiver.recv() { Ok(message) => Some(message), Err(_) => None, } } else { // Resumes playback by not blocking match message_receiver.try_recv() { Ok(message) => Some(message), Err(_) => break 'handle_message, } }; // Handles message received from MusicPlayer struct match message { Some(DecoderMessage::OpenSong(song)) => { let song_uri = song.path.clone(); match SongHandler::new(&song_uri) { Ok(new_handler) => { song_handler = Some(new_handler); *current_song.write().unwrap() = Some(song); paused = false; song_tracked = false; } Err(_) => status_sender.send(PlayerStatus::Error).unwrap(), } } Some(DecoderMessage::Play) => { if song_handler.is_some() { paused = false; } } Some(DecoderMessage::Pause) => { paused = true; status_sender.send(PlayerStatus::Paused).unwrap(); } Some(DecoderMessage::SeekTo(time)) => seek_time = Some(time), Some(DecoderMessage::DSP(dsp_message)) => { match dsp_message { DSPMessage::UpdateProcessor(new_processor) => music_processor = *new_processor, } } // Exits main decode loop and subsequently ends thread Some(DecoderMessage::Stop) => { status_sender.send(PlayerStatus::Stopped).unwrap(); break 'main_decode } None => {}, } status_sender.send(PlayerStatus::Error).unwrap(); } // In theory this check should not need to occur? if let (Some(song_handler), current_song) = (&mut song_handler, &*current_song.read().unwrap()) { match seek_time { Some(time) => { let seek_to = SeekTo::Time { time: Time::from(time), track_id: Some(0) }; song_handler.reader.seek(SeekMode::Accurate, seek_to).unwrap(); seek_time = None; } None => {} //Nothing to do! } let packet = match song_handler.reader.next_packet() { Ok(packet) => packet, Err(Error::ResetRequired) => panic!(), //TODO, Err(err) => { // Unrecoverable? panic!("{}", err); } }; if let (Some(time_base), Some(song)) = (song_handler.time_base, current_song) { let time_units = time_base.calc_time(packet.ts); song_time = time_units.seconds as f64 + time_units.frac; // Tracks song now if song has just started if song_time == 0.0 { tracker_sender.send(TrackerMessage::TrackNow(song.clone())).unwrap(); } if let Some(duration) = song_handler.duration { let song_duration = time_base.calc_time(duration); let song_duration_secs = song_duration.seconds as f64 + song_duration.frac; // Tracks song if current time is past half of total song duration or past 4 minutes if (song_duration_secs / 2.0 < song_time || song_time > 240.0) && !song_tracked { song_tracked = true; tracker_sender.send(TrackerMessage::Track(song.clone())).unwrap(); } } } status_sender.send(PlayerStatus::Playing(song_time)).unwrap(); match song_handler.decoder.decode(&packet) { Ok(decoded) => { // Opens audio stream if there is not one if audio_output.is_none() { let spec = *decoded.spec(); let duration = decoded.capacity() as u64; audio_output.replace(crate::music_player::music_output::open_stream(spec, duration).unwrap()); } // Handles audio normally provided there is an audio stream if let Some(ref mut audio_output) = audio_output { // Changes buffer of the MusicProcessor if the packet has a differing capacity or spec if music_processor.audio_buffer.capacity() != decoded.capacity() ||music_processor.audio_buffer.spec() != decoded.spec() { let spec = *decoded.spec(); let duration = decoded.capacity() as u64; music_processor.set_buffer(duration, spec); } let transformed_audio = music_processor.process(&decoded); // Writes transformed packet to audio out audio_output.write(transformed_audio).unwrap() } }, Err(Error::IoError(_)) => { // rest in peace packet continue; }, Err(Error::DecodeError(_)) => { // may you one day be decoded continue; }, Err(err) => { // Unrecoverable, though shouldn't panic here panic!("{}", err); } } } } }); } // Updates status by checking on messages from spawned thread fn update_player(&mut self) { for message in self.status_receiver.try_recv() { self.player_status = message; } } pub fn get_current_song(&self) -> Option{ match self.current_song.try_read() { Ok(song) => return (*song).clone(), Err(_) => return None, } } // Sends message to spawned thread pub fn send_message(&mut self, message: DecoderMessage) { self.update_player(); // Checks that message sender exists before sending a message off self.message_sender.send(message).unwrap(); } pub fn get_status(&mut self) -> PlayerStatus { self.update_player(); return self.player_status; } } // TODO: Make the buffer length do anything /// Options for remote sources /// /// media_buffer_len is how many bytes are to be buffered in totala /// /// forward_buffer is how many bytes can ahead of the seek position without the remote source being read from pub struct RemoteOptions { media_buffer_len: u64, forward_buffer_len: u64, } impl Default for RemoteOptions { fn default() -> Self { RemoteOptions { media_buffer_len: 100000, forward_buffer_len: 1024, } } } /// A remote source of media struct RemoteSource { reader: Box, media_buffer: Vec, forward_buffer_len: u64, offset: u64, } impl RemoteSource { /// Creates a new RemoteSource with given uri and configuration pub fn new(uri: &str, config: &RemoteOptions) -> Result { let mut response = task::block_on(async { return surf::get(uri).await; })?; let reader = response.take_body().into_reader(); Ok(RemoteSource { reader, media_buffer: Vec::new(), forward_buffer_len: config.forward_buffer_len, offset: 0, }) } } // TODO: refactor this + buffer into the buffer passed into the function, not a newly allocated one impl std::io::Read for RemoteSource { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { // Reads bytes into the media buffer if the offset is within the specified distance from the end of the buffer if self.media_buffer.len() as u64 - self.offset < self.forward_buffer_len { let mut buffer = [0; 1024]; let read_bytes = task::block_on(async { match self.reader.read_exact(&mut buffer).await { Ok(_) => { self.media_buffer.extend_from_slice(&buffer); return Ok(()); }, Err(err) => return Err(err), } }); match read_bytes { Err(err) => return Err(err), _ => {}, } } // Reads bytes from the media buffer into the buffer given by let mut bytes_read = 0; for location in 0..1024 { if (location + self.offset as usize) < self.media_buffer.len() { buf[location] = self.media_buffer[location + self.offset as usize]; bytes_read += 1; } } self.offset += bytes_read; return Ok(bytes_read as usize); } } impl std::io::Seek for RemoteSource { // Seeks to a given position // Seeking past the internal buffer's length results in the seeking to the end of content fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { match pos { // Offset is set to given position SeekFrom::Start(pos) => { if pos > self.media_buffer.len() as u64{ self.offset = self.media_buffer.len() as u64; } else { self.offset = pos; } return Ok(self.offset); }, // Offset is set to length of buffer + given position SeekFrom::End(pos) => { if self.media_buffer.len() as u64 + pos as u64 > self.media_buffer.len() as u64 { self.offset = self.media_buffer.len() as u64; } else { self.offset = self.media_buffer.len() as u64 + pos as u64; } return Ok(self.offset); }, // Offset is set to current offset + given position SeekFrom::Current(pos) => { if self.offset + pos as u64 > self.media_buffer.len() as u64{ self.offset = self.media_buffer.len() as u64; } else { self.offset += pos as u64 } return Ok(self.offset); }, } } } impl MediaSource for RemoteSource { fn is_seekable(&self) -> bool { return true; } fn byte_len(&self) -> Option { return None; } }