Fix element not moving to PLAYING state

This commit is contained in:
Rafael Caricio 2021-05-14 16:30:15 +02:00
parent 88dcd68ab0
commit eed4bccab9
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
3 changed files with 104 additions and 89 deletions

View file

@ -8,10 +8,7 @@ use crate::playlist::PlaylistRenderState;
use m3u8_rs::playlist::{MediaPlaylist, MediaPlaylistType, MediaSegment};
use once_cell::sync::Lazy;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path;
use std::sync::{Arc, Mutex, MutexGuard};
use gio::glib::WeakRef;
use std::sync::{Arc, Mutex};
const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
@ -43,7 +40,6 @@ struct Settings {
// TODO: old_locations ? Maybe just use another thread and send msgs with files to delete ?
splitmuxsink: Option<gst::Element>,
giostreamsink: Option<gst::Element>,
muxer: Option<gst::Element>,
video_sink: bool,
audio_sink: bool,
}
@ -61,7 +57,6 @@ impl Default for Settings {
splitmuxsink: None,
giostreamsink: None,
muxer: None,
video_sink: false,
audio_sink: false,
}
@ -101,7 +96,49 @@ impl FlexHlsSink {
}
}
fn start(
&self,
element: &super::FlexHlsSink,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_info!(CAT, obj: element, "Starting");
let settings = self.settings.lock().unwrap();
let target_duration = settings.target_duration as f32;
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
*state = State::Started {
playlist: MediaPlaylist {
version: GST_M3U8_PLAYLIST_VERSION,
target_duration,
media_sequence: 0,
segments: vec![],
discontinuity_sequence: 0,
end_list: false,
playlist_type: Some(MediaPlaylistType::Vod),
i_frames_only: false,
start: None,
independent_segments: false,
unknown_tags: vec![],
},
playlist_render_state: PlaylistRenderState::Init,
playlist_index: 0,
current_segment_location: None,
current_running_time_start: None,
current_segment_file: None,
};
}
Ok(gst::StateChangeSuccess::Success)
}
fn on_format_location(&self, fragment_id: u32) -> Result<String, String> {
gst_info!(
CAT,
"Starting the formatting of the fragment-id: {}",
fragment_id
);
let mut state = self.state.lock().unwrap();
let (current_segment_location, current_segment_file) = match &mut *state {
State::Stopped => return Err("Not in Started state".to_string()),
@ -112,7 +149,7 @@ impl FlexHlsSink {
} => (current_segment_location, current_segment_file),
};
let mut settings = self.settings.lock().unwrap();
let settings = self.settings.lock().unwrap();
let seq_num = format!("{:0>5}", fragment_id);
let segment_file_location = settings
@ -162,51 +199,6 @@ impl FlexHlsSink {
Ok(gio::WriteOutputStream::new(file_stream))
}
fn start(
&self,
element: &super::FlexHlsSink,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Starting");
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
*state = State::Started {
playlist: MediaPlaylist {
version: GST_M3U8_PLAYLIST_VERSION,
target_duration: settings.target_duration as f32,
media_sequence: 0,
segments: vec![],
discontinuity_sequence: 0,
end_list: false,
playlist_type: Some(MediaPlaylistType::Event),
i_frames_only: false,
start: None,
independent_segments: false,
unknown_tags: vec![],
},
playlist_render_state: PlaylistRenderState::Init,
playlist_index: 0,
current_segment_location: None,
current_running_time_start: None,
current_segment_file: None,
};
}
Ok(gst::StateChangeSuccess::Success)
}
fn stop(&self, element: &super::FlexHlsSink) {
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
*state = State::Stopped;
}
gst_debug!(CAT, obj: element, "Stopped");
}
fn write_playlist(
&self,
element: &super::FlexHlsSink,
@ -221,7 +213,7 @@ impl FlexHlsSink {
current_running_time_start,
playlist,
current_segment_location,
playlist_render_state: render_state,
playlist_render_state,
..
} => {
gst_info!(CAT, "COUNT {}", playlist.segments.len());
@ -280,7 +272,7 @@ impl FlexHlsSink {
})?;
// TODO: clean up (delete) old segment files
*render_state = PlaylistRenderState::Started;
*playlist_render_state = PlaylistRenderState::Started;
*current_segment_location = None;
}
};
@ -294,9 +286,19 @@ impl FlexHlsSink {
element: &super::FlexHlsSink,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Preparing to write final playlist");
Ok(self.write_playlist(element, element.current_running_time())?)
}
fn stop(&self, element: &super::FlexHlsSink) {
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
*state = State::Stopped;
}
gst_debug!(CAT, obj: element, "Stopped");
}
}
#[glib::object_subclass]
@ -324,12 +326,10 @@ impl BinImpl for FlexHlsSink {
== Some(settings.splitmuxsink.as_ref().unwrap().upcast_ref())
{
let s = msg.structure().unwrap();
if msg
.structure()
.map(|s| s.name() == "splitmuxsink-fragment-opened")
.unwrap_or(false)
match s.name() {
"splitmuxsink-fragment-opened" => {
if let Ok(fragment_opened_at) = s.get::<gst::ClockTime>("running-time")
{
if let Ok(fragment_opened_at) = s.get::<gst::ClockTime>("running-time") {
let mut state = self.state.lock().unwrap();
match &mut *state {
State::Stopped => return,
@ -340,16 +340,15 @@ impl BinImpl for FlexHlsSink {
};
}
}
if msg
.structure()
.map(|s| s.name() == "splitmuxsink-fragment-closed")
.unwrap_or(false)
{
"splitmuxsink-fragment-closed" => {
let s = msg.structure().unwrap();
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") {
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time")
{
self.write_playlist(element, fragment_closed_at).unwrap();
}
}
_ => {}
}
}
}
_ => self.parent_handle_message(element, msg),
@ -507,7 +506,6 @@ impl ObjectImpl for FlexHlsSink {
.expect("Could not make element splitmuxsink");
let giostreamsink = gst::ElementFactory::make("giostreamsink", Some("giostream_sink"))
.expect("Could not make element giostreamsink");
giostreamsink.set_property("async", &false).unwrap();
let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux"))
.expect("Could not make element mpegtsmux");
@ -527,8 +525,8 @@ impl ObjectImpl for FlexHlsSink {
])
.unwrap();
obj.add(&splitmuxsink).unwrap();
obj.set_element_flags(gst::ElementFlags::SINK);
obj.add(&splitmuxsink).unwrap();
let this = self.clone();
splitmuxsink
@ -547,12 +545,8 @@ impl ObjectImpl for FlexHlsSink {
})
.unwrap();
let temp_stream = gio::MemoryOutputStream::new_resizable();
giostreamsink.set_property("stream", &temp_stream).unwrap();
settings.splitmuxsink = Some(splitmuxsink);
settings.giostreamsink = Some(giostreamsink);
settings.muxer = Some(mux);
}
}
@ -608,19 +602,39 @@ impl ElementImpl for FlexHlsSink {
_ => (),
}
self.parent_change_state(element, transition)?;
let ret = self.parent_change_state(element, transition)?;
match transition {
gst::StateChange::PausedToReady => {
// Turning down
let mut state = self.state.lock().unwrap();
let write_final = match &mut *state {
State::Stopped => false,
State::Started {
playlist,
playlist_render_state,
..
} => {
if *playlist_render_state == PlaylistRenderState::Started {
playlist.end_list = true;
true
} else {
false
}
}
};
if write_final {
self.write_final_playlist(element)?;
}
}
gst::StateChange::ReadyToNull => {
self.stop(element);
}
_ => (),
}
Ok(gst::StateChangeSuccess::Success)
Ok(ret)
}
fn request_new_pad(
@ -628,7 +642,7 @@ impl ElementImpl for FlexHlsSink {
element: &Self::Type,
templ: &gst::PadTemplate,
_name: Option<String>,
caps: Option<&gst::Caps>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut settings = self.settings.lock().unwrap();
match templ.name_template().as_ref().map(|val| val.as_str()) {

View file

@ -12,6 +12,7 @@ impl MediaPlaylist {
}
}
#[derive(Copy, Clone, PartialEq)]
pub enum PlaylistRenderState {
Init,
Started,

View file

@ -29,7 +29,7 @@ fn init() {
fn test_basic_element_with_video_content() {
init();
const BUFFER_NB: i32 = 100;
const BUFFER_NB: i32 = 200;
let pipeline = gst::Pipeline::new(Some("video_pipeline"));