From eed4bccab91147dc387749c54f9692ebadaddaa9 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Fri, 14 May 2021 16:30:15 +0200 Subject: [PATCH] Fix element not moving to PLAYING state --- src/imp.rs | 190 +++++++++++++++++++++++-------------------- src/playlist.rs | 1 + tests/flexhlssink.rs | 2 +- 3 files changed, 104 insertions(+), 89 deletions(-) diff --git a/src/imp.rs b/src/imp.rs index 249691a..385c01f 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -8,10 +8,7 @@ use crate::playlist::PlaylistRenderState; use m3u8_rs::playlist::{MediaPlaylist, MediaPlaylistType, MediaSegment}; use once_cell::sync::Lazy; use std::fs::{File, OpenOptions}; -use std::io::Write; -use std::path; -use std::sync::{Arc, Mutex, MutexGuard}; -use gio::glib::WeakRef; +use std::sync::{Arc, Mutex}; const DEFAULT_LOCATION: &str = "segment%05d.ts"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; @@ -43,7 +40,6 @@ struct Settings { // TODO: old_locations ? Maybe just use another thread and send msgs with files to delete ? splitmuxsink: Option, giostreamsink: Option, - muxer: Option, video_sink: bool, audio_sink: bool, } @@ -61,7 +57,6 @@ impl Default for Settings { splitmuxsink: None, giostreamsink: None, - muxer: None, video_sink: false, audio_sink: false, } @@ -101,7 +96,49 @@ impl FlexHlsSink { } } + fn start( + &self, + element: &super::FlexHlsSink, + ) -> Result { + gst_info!(CAT, obj: element, "Starting"); + + let settings = self.settings.lock().unwrap(); + let target_duration = settings.target_duration as f32; + + let mut state = self.state.lock().unwrap(); + if let State::Stopped = *state { + *state = State::Started { + playlist: MediaPlaylist { + version: GST_M3U8_PLAYLIST_VERSION, + target_duration, + media_sequence: 0, + segments: vec![], + discontinuity_sequence: 0, + end_list: false, + playlist_type: Some(MediaPlaylistType::Vod), + i_frames_only: false, + start: None, + independent_segments: false, + unknown_tags: vec![], + }, + playlist_render_state: PlaylistRenderState::Init, + playlist_index: 0, + current_segment_location: None, + current_running_time_start: None, + current_segment_file: None, + }; + } + + Ok(gst::StateChangeSuccess::Success) + } + fn on_format_location(&self, fragment_id: u32) -> Result { + gst_info!( + CAT, + "Starting the formatting of the fragment-id: {}", + fragment_id + ); + let mut state = self.state.lock().unwrap(); let (current_segment_location, current_segment_file) = match &mut *state { State::Stopped => return Err("Not in Started state".to_string()), @@ -112,7 +149,7 @@ impl FlexHlsSink { } => (current_segment_location, current_segment_file), }; - let mut settings = self.settings.lock().unwrap(); + let settings = self.settings.lock().unwrap(); let seq_num = format!("{:0>5}", fragment_id); let segment_file_location = settings @@ -162,51 +199,6 @@ impl FlexHlsSink { Ok(gio::WriteOutputStream::new(file_stream)) } - fn start( - &self, - element: &super::FlexHlsSink, - ) -> Result { - gst_debug!(CAT, obj: element, "Starting"); - - let settings = self.settings.lock().unwrap(); - let mut state = self.state.lock().unwrap(); - if let State::Stopped = *state { - *state = State::Started { - playlist: MediaPlaylist { - version: GST_M3U8_PLAYLIST_VERSION, - target_duration: settings.target_duration as f32, - media_sequence: 0, - segments: vec![], - discontinuity_sequence: 0, - end_list: false, - playlist_type: Some(MediaPlaylistType::Event), - i_frames_only: false, - start: None, - independent_segments: false, - unknown_tags: vec![], - }, - playlist_render_state: PlaylistRenderState::Init, - playlist_index: 0, - current_segment_location: None, - current_running_time_start: None, - current_segment_file: None, - }; - } - - Ok(gst::StateChangeSuccess::Success) - } - - fn stop(&self, element: &super::FlexHlsSink) { - gst_debug!(CAT, obj: element, "Stopping"); - - let mut state = self.state.lock().unwrap(); - if let State::Started { .. } = *state { - *state = State::Stopped; - } - - gst_debug!(CAT, obj: element, "Stopped"); - } - fn write_playlist( &self, element: &super::FlexHlsSink, @@ -221,7 +213,7 @@ impl FlexHlsSink { current_running_time_start, playlist, current_segment_location, - playlist_render_state: render_state, + playlist_render_state, .. } => { gst_info!(CAT, "COUNT {}", playlist.segments.len()); @@ -280,7 +272,7 @@ impl FlexHlsSink { })?; // TODO: clean up (delete) old segment files - *render_state = PlaylistRenderState::Started; + *playlist_render_state = PlaylistRenderState::Started; *current_segment_location = None; } }; @@ -294,9 +286,19 @@ impl FlexHlsSink { element: &super::FlexHlsSink, ) -> Result { gst_debug!(CAT, obj: element, "Preparing to write final playlist"); - Ok(self.write_playlist(element, element.current_running_time())?) } + + fn stop(&self, element: &super::FlexHlsSink) { + gst_debug!(CAT, obj: element, "Stopping"); + + let mut state = self.state.lock().unwrap(); + if let State::Started { .. } = *state { + *state = State::Stopped; + } + + gst_debug!(CAT, obj: element, "Stopped"); + } } #[glib::object_subclass] @@ -324,31 +326,28 @@ impl BinImpl for FlexHlsSink { == Some(settings.splitmuxsink.as_ref().unwrap().upcast_ref()) { let s = msg.structure().unwrap(); - if msg - .structure() - .map(|s| s.name() == "splitmuxsink-fragment-opened") - .unwrap_or(false) - { - if let Ok(fragment_opened_at) = s.get::("running-time") { - let mut state = self.state.lock().unwrap(); - match &mut *state { - State::Stopped => return, - State::Started { - current_running_time_start, - .. - } => *current_running_time_start = Some(fragment_opened_at), - }; + match s.name() { + "splitmuxsink-fragment-opened" => { + if let Ok(fragment_opened_at) = s.get::("running-time") + { + let mut state = self.state.lock().unwrap(); + match &mut *state { + State::Stopped => return, + State::Started { + current_running_time_start, + .. + } => *current_running_time_start = Some(fragment_opened_at), + }; + } } - } - if msg - .structure() - .map(|s| s.name() == "splitmuxsink-fragment-closed") - .unwrap_or(false) - { - let s = msg.structure().unwrap(); - if let Ok(fragment_closed_at) = s.get::("running-time") { - self.write_playlist(element, fragment_closed_at).unwrap(); + "splitmuxsink-fragment-closed" => { + let s = msg.structure().unwrap(); + if let Ok(fragment_closed_at) = s.get::("running-time") + { + self.write_playlist(element, fragment_closed_at).unwrap(); + } } + _ => {} } } } @@ -507,7 +506,6 @@ impl ObjectImpl for FlexHlsSink { .expect("Could not make element splitmuxsink"); let giostreamsink = gst::ElementFactory::make("giostreamsink", Some("giostream_sink")) .expect("Could not make element giostreamsink"); - giostreamsink.set_property("async", &false).unwrap(); let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux")) .expect("Could not make element mpegtsmux"); @@ -527,8 +525,8 @@ impl ObjectImpl for FlexHlsSink { ]) .unwrap(); - obj.add(&splitmuxsink).unwrap(); obj.set_element_flags(gst::ElementFlags::SINK); + obj.add(&splitmuxsink).unwrap(); let this = self.clone(); splitmuxsink @@ -547,12 +545,8 @@ impl ObjectImpl for FlexHlsSink { }) .unwrap(); - let temp_stream = gio::MemoryOutputStream::new_resizable(); - giostreamsink.set_property("stream", &temp_stream).unwrap(); - settings.splitmuxsink = Some(splitmuxsink); settings.giostreamsink = Some(giostreamsink); - settings.muxer = Some(mux); } } @@ -608,11 +602,31 @@ impl ElementImpl for FlexHlsSink { _ => (), } - self.parent_change_state(element, transition)?; + let ret = self.parent_change_state(element, transition)?; match transition { gst::StateChange::PausedToReady => { - self.write_final_playlist(element)?; + // Turning down + let mut state = self.state.lock().unwrap(); + let write_final = match &mut *state { + State::Stopped => false, + State::Started { + playlist, + playlist_render_state, + .. + } => { + if *playlist_render_state == PlaylistRenderState::Started { + playlist.end_list = true; + true + } else { + false + } + } + }; + + if write_final { + self.write_final_playlist(element)?; + } } gst::StateChange::ReadyToNull => { self.stop(element); @@ -620,7 +634,7 @@ impl ElementImpl for FlexHlsSink { _ => (), } - Ok(gst::StateChangeSuccess::Success) + Ok(ret) } fn request_new_pad( @@ -628,7 +642,7 @@ impl ElementImpl for FlexHlsSink { element: &Self::Type, templ: &gst::PadTemplate, _name: Option, - caps: Option<&gst::Caps>, + _caps: Option<&gst::Caps>, ) -> Option { let mut settings = self.settings.lock().unwrap(); match templ.name_template().as_ref().map(|val| val.as_str()) { diff --git a/src/playlist.rs b/src/playlist.rs index 5afbdb3..7f3c22e 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -12,6 +12,7 @@ impl MediaPlaylist { } } +#[derive(Copy, Clone, PartialEq)] pub enum PlaylistRenderState { Init, Started, diff --git a/tests/flexhlssink.rs b/tests/flexhlssink.rs index 7df4f6f..9f3a7f5 100644 --- a/tests/flexhlssink.rs +++ b/tests/flexhlssink.rs @@ -29,7 +29,7 @@ fn init() { fn test_basic_element_with_video_content() { init(); - const BUFFER_NB: i32 = 100; + const BUFFER_NB: i32 = 200; let pipeline = gst::Pipeline::new(Some("video_pipeline"));