From 0fe69cea9ff15b427051fcfb734b3f9b1723d8d6 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Sun, 24 Sep 2023 18:19:49 +0900 Subject: [PATCH] hlssink3: Various cleanup * Simplify state/playlist management * Fix a bug that segment is not deleted if location contains directory and playlist-root is unset * Split playlist update routine into two steps, adding segment to playlist and playlist write Part-of: --- net/hlssink3/src/imp.rs | 334 +++++++++++++++++----------------------- 1 file changed, 144 insertions(+), 190 deletions(-) diff --git a/net/hlssink3/src/imp.rs b/net/hlssink3/src/imp.rs index 1d5b40bf..12fa2376 100644 --- a/net/hlssink3/src/imp.rs +++ b/net/hlssink3/src/imp.rs @@ -18,7 +18,7 @@ use m3u8_rs::MediaPlaylistType; use std::fs; use std::io::Write; use std::path; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; const DEFAULT_LOCATION: &str = "segment%05d.ts"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; @@ -111,7 +111,7 @@ impl Default for Settings { } } -pub(crate) struct StartedState { +struct PlaylistContext { pdt_base_utc: Option>, pdt_base_running_time: Option, playlist: Playlist, @@ -121,45 +121,15 @@ pub(crate) struct StartedState { old_segment_locations: Vec, } -impl StartedState { - fn new( - target_duration: f32, - playlist_type: Option, - i_frames_only: bool, - ) -> Self { - Self { - pdt_base_utc: None, - pdt_base_running_time: None, - playlist: Playlist::new(target_duration, playlist_type, i_frames_only), - current_segment_location: None, - fragment_opened_at: None, - fragment_running_time: None, - old_segment_locations: Vec::new(), - } - } - - fn fragment_duration_since(&self, fragment_closed: gst::ClockTime) -> f32 { - let segment_duration = fragment_closed - self.fragment_opened_at.unwrap(); - segment_duration.mseconds() as f32 / 1_000f32 - } -} - -#[allow(clippy::large_enum_variant)] -enum State { - Stopped, - Started(StartedState), -} - -impl Default for State { - fn default() -> Self { - Self::Stopped - } +#[derive(Default)] +struct State { + context: Option, } #[derive(Default)] pub struct HlsSink3 { - settings: Arc>, - state: Arc>, + settings: Mutex, + state: Mutex, } impl HlsSink3 { @@ -176,12 +146,24 @@ impl HlsSink3 { }; let mut state = self.state.lock().unwrap(); - if let State::Stopped = *state { - *state = State::Started(StartedState::new( - target_duration, - playlist_type, - i_frames_only, - )); + state.context = Some(PlaylistContext { + pdt_base_utc: None, + pdt_base_running_time: None, + playlist: Playlist::new(target_duration, playlist_type, i_frames_only), + fragment_opened_at: None, + fragment_running_time: None, + current_segment_location: None, + old_segment_locations: Vec::new(), + }); + } + + fn stop(&self) { + let mut state = self.state.lock().unwrap(); + if let Some(mut context) = state.context.take() { + if context.playlist.is_rendering() { + context.playlist.stop(); + let _ = self.write_playlist(&mut context); + } } } @@ -193,11 +175,18 @@ impl HlsSink3 { fragment_id ); - // TODO: Create method in state to simplify this boilerplate: `let state = self.state.started()?` - let mut state_guard = self.state.lock().unwrap(); - let state = match &mut *state_guard { - State::Stopped => return Err("Not in Started state".to_string()), - State::Started(s) => s, + let mut state = self.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { + gst::error!( + CAT, + imp: self, + "Playlist is not configured", + ); + + return Err(String::from("Playlist is not configured")); + } }; let settings = self.settings.lock().unwrap(); @@ -220,7 +209,7 @@ impl HlsSink3 { segment_file_location ); - state.current_segment_location = Some(segment_file_location.clone()); + context.current_segment_location = Some(segment_file_location.clone()); let fragment_stream = self .obj() @@ -238,7 +227,7 @@ impl HlsSink3 { CAT, imp: self, "New segment location: {:?}", - state.current_segment_location.as_ref() + context.current_segment_location.as_ref() ); Ok(segment_file_location) } @@ -276,31 +265,60 @@ impl HlsSink3 { }); } - fn write_playlist( - &self, - fragment_closed_at: Option, - date_time: Option>, - ) -> Result { - gst::info!(CAT, imp: self, "Preparing to write new playlist"); - - let mut state_guard = self.state.lock().unwrap(); - let state = match &mut *state_guard { - State::Stopped => return Err(gst::StateChangeError), - State::Started(s) => s, + fn on_fragment_closed(&self, closed_at: gst::ClockTime, date_time: Option>) { + let mut state = self.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { + gst::error!(CAT, imp: self, "Playlist is not configured"); + return; + } }; - gst::info!(CAT, imp: self, "COUNT {}", state.playlist.len()); + let location = match context.current_segment_location.take() { + Some(location) => location, + None => { + gst::error!(CAT, imp: self, "Unknown segment location"); + return; + } + }; - // Only add fragment if it's complete. - if let Some(fragment_closed) = fragment_closed_at { - let segment_filename = self.segment_filename(state); - state.playlist.add_segment( - segment_filename.clone(), - state.fragment_duration_since(fragment_closed), - date_time, - ); - state.old_segment_locations.push(segment_filename); - } + let opened_at = match context.fragment_opened_at.take() { + Some(opened_at) => opened_at, + None => { + gst::error!(CAT, imp: self, "Unknown segment duration"); + return; + } + }; + + let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32; + let file_name = path::Path::new(&location) + .file_name() + .unwrap() + .to_str() + .unwrap(); + + let settings = self.settings.lock().unwrap(); + let segment_file_name = if let Some(playlist_root) = &settings.playlist_root { + format!("{playlist_root}/{file_name}") + } else { + file_name.to_string() + }; + drop(settings); + + context + .playlist + .add_segment(segment_file_name, duration, date_time); + context.old_segment_locations.push(location); + + let _ = self.write_playlist(context); + } + + fn write_playlist( + &self, + context: &mut PlaylistContext, + ) -> Result { + gst::info!(CAT, imp: self, "Preparing to write new playlist, COUNT {}", context.playlist.len()); let (playlist_location, max_num_segments, max_playlist_length) = { let settings = self.settings.lock().unwrap(); @@ -311,7 +329,7 @@ impl HlsSink3 { ) }; - state.playlist.update_playlist_state(max_playlist_length); + context.playlist.update_playlist_state(max_playlist_length); // Acquires the playlist file handle so we can update it with new content. By default, this // is expected to be the same file every time. @@ -327,11 +345,11 @@ impl HlsSink3 { imp: self, "Could not get stream to write playlist content", ); - gst::StateChangeError + gst::FlowError::Error })? .into_write(); - state + context .playlist .write_to(&mut playlist_stream) .map_err(|err| { @@ -341,7 +359,7 @@ impl HlsSink3 { "Could not write new playlist: {}", err.to_string() ); - gst::StateChangeError + gst::FlowError::Error })?; playlist_stream.flush().map_err(|err| { gst::error!( @@ -350,14 +368,14 @@ impl HlsSink3 { "Could not flush playlist: {}", err.to_string() ); - gst::StateChangeError + gst::FlowError::Error })?; - if state.playlist.is_type_undefined() && max_num_segments > 0 { + if context.playlist.is_type_undefined() && max_num_segments > 0 { // Cleanup old segments from filesystem - if state.old_segment_locations.len() > max_num_segments { - for _ in 0..state.old_segment_locations.len() - max_num_segments { - let old_segment_location = state.old_segment_locations.remove(0); + if context.old_segment_locations.len() > max_num_segments { + for _ in 0..context.old_segment_locations.len() - max_num_segments { + let old_segment_location = context.old_segment_locations.remove(0); if !self .obj() .emit_by_name::(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location]) @@ -369,40 +387,7 @@ impl HlsSink3 { } gst::debug!(CAT, imp: self, "Wrote new playlist file!"); - Ok(gst::StateChangeSuccess::Success) - } - - fn segment_filename(&self, state: &mut StartedState) -> String { - assert!(state.current_segment_location.is_some()); - let name = state.current_segment_location.take().unwrap(); - let segment_filename = path::Path::new(&name) - .file_name() - .unwrap() - .to_str() - .unwrap(); - - let settings = self.settings.lock().unwrap(); - if let Some(playlist_root) = &settings.playlist_root { - format!("{playlist_root}/{segment_filename}") - } else { - segment_filename.to_string() - } - } - - fn write_final_playlist(&self) -> Result { - gst::debug!(CAT, imp: self, "Preparing to write final playlist"); - self.write_playlist(None, None) - } - - fn stop(&self) { - gst::debug!(CAT, imp: self, "Stopping"); - - let mut state = self.state.lock().unwrap(); - if let State::Started(_) = *state { - *state = State::Stopped; - } - - gst::debug!(CAT, imp: self, "Stopped"); + Ok(gst::FlowSuccess::Ok) } } @@ -435,44 +420,39 @@ impl BinImpl for HlsSink3 { if let Ok(new_fragment_opened_at) = s.get::("running-time") { let mut state = self.state.lock().unwrap(); - match &mut *state { - State::Stopped => {} - State::Started(state) => { - state.fragment_opened_at = Some(new_fragment_opened_at) - } - }; + if let Some(context) = state.context.as_mut() { + context.fragment_opened_at = Some(new_fragment_opened_at); + } } } "splitmuxsink-fragment-closed" => { - let s = msg.structure().unwrap(); - let settings = self.settings.lock().unwrap(); - let mut state_guard = self.state.lock().unwrap(); - let state = match &mut *state_guard { - State::Stopped => { - gst::element_error!( - self.obj(), - gst::StreamError::Failed, - ("Fragment closed in wrong state"), - ["Fragment closed but element is in stopped state"] + let mut state = self.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { + gst::error!( + CAT, + imp: self, + "Playlist is not configured", ); + return; } - State::Started(state) => state, }; - let fragment_pts = state + let fragment_pts = context .fragment_running_time .expect("fragment running time must be set by format-location-full"); - if state.pdt_base_running_time.is_none() { - state.pdt_base_running_time = state.fragment_running_time; + if context.pdt_base_running_time.is_none() { + context.pdt_base_running_time = context.fragment_running_time; } // Calculate the mapping from running time to UTC // calculate pdt_base_utc for each segment for !pdt_follows_pipeline_clock // when pdt_follows_pipeline_clock is set, we calculate the base time every time // this avoids the drift between pdt tag and external clock (if gst clock has skew w.r.t external clock) - if state.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock { + if context.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock { let now_utc = Utc::now(); let now_gst = settings.giostreamsink.clock().unwrap().time().unwrap(); let pts_clock_time = @@ -483,21 +463,21 @@ impl BinImpl for HlsSink3 { .checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64)) .expect("offsetting the utc with gstreamer clock-diff overflow"); - state.pdt_base_utc = Some(pts_utc); + context.pdt_base_utc = Some(pts_utc); } let fragment_date_time = if settings.enable_program_date_time - && state.pdt_base_running_time.is_some() + && context.pdt_base_running_time.is_some() { // Add the diff of running time to UTC time // date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time) - state + context .pdt_base_utc .unwrap() .checked_add_signed(Duration::nanoseconds( - state + context .fragment_running_time - .opt_checked_sub(state.pdt_base_running_time) + .opt_checked_sub(context.pdt_base_running_time) .unwrap() .unwrap() .nseconds() as i64, @@ -505,12 +485,11 @@ impl BinImpl for HlsSink3 { } else { None }; - drop(state_guard); + drop(state); drop(settings); if let Ok(fragment_closed_at) = s.get::("running-time") { - let _ = - self.write_playlist(Some(fragment_closed_at), fragment_date_time); + self.on_fragment_closed(fragment_closed_at, fragment_date_time); } } _ => {} @@ -768,30 +747,29 @@ impl ObjectImpl for HlsSink3 { ]); obj.add(&settings.splitmuxsink).unwrap(); - let state = self.state.clone(); settings .splitmuxsink .connect("format-location-full", false, { - let self_weak = self.downgrade(); + let imp_weak = self.downgrade(); move |args| { - let self_ = match self_weak.upgrade() { - Some(self_) => self_, + let imp = match imp_weak.upgrade() { + Some(imp) => imp, None => return Some(None::.to_value()), }; let fragment_id = args[1].get::().unwrap(); - gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id); + gst::info!(CAT, imp: imp, "Got fragment-id: {}", fragment_id); - let mut state_guard = state.lock().unwrap(); - let state = match &mut *state_guard { - State::Stopped => { + let mut state = imp.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { gst::error!( CAT, - imp: self_, + imp: imp, "on format location called with Stopped state" ); return Some("unknown_segment".to_value()); } - State::Started(s) => s, }; let sample = args[2].get::().unwrap(); @@ -802,22 +780,22 @@ impl ObjectImpl for HlsSink3 { .expect("segment not available") .downcast_ref::() .expect("no time segment"); - state.fragment_running_time = + context.fragment_running_time = segment.to_running_time(buffer.pts().unwrap()); } else { gst::warning!( CAT, - imp: self_, + imp: imp, "buffer null for fragment-id: {}", fragment_id ); } - drop(state_guard); + drop(state); - match self_.on_format_location(fragment_id) { + match imp.on_format_location(fragment_id) { Ok(segment_location) => Some(segment_location.to_value()), Err(err) => { - gst::error!(CAT, imp: self_, "on format-location handler: {}", err); + gst::error!(CAT, imp: imp, "on format-location handler: {}", err); Some("unknown_segment".to_value()) } } @@ -874,7 +852,7 @@ impl ElementImpl for HlsSink3 { &self, transition: gst::StateChange, ) -> Result { - if let gst::StateChange::NullToReady = transition { + if transition == gst::StateChange::ReadyToPaused { self.start(); } @@ -883,39 +861,15 @@ impl ElementImpl for HlsSink3 { match transition { gst::StateChange::PlayingToPaused => { let mut state = self.state.lock().unwrap(); - match &mut *state { - State::Stopped => (), - State::Started(state) => { - // reset mapping from rt to utc. during pause - // rt is stopped but utc keep moving so need to - // calculate the mapping again - state.pdt_base_running_time = None; - state.pdt_base_utc = None - } + if let Some(context) = state.context.as_mut() { + // reset mapping from rt to utc. during pause + // rt is stopped but utc keep moving so need to + // calculate the mapping again + context.pdt_base_running_time = None; + context.pdt_base_utc = None } } gst::StateChange::PausedToReady => { - let write_final = { - let mut state = self.state.lock().unwrap(); - match &mut *state { - State::Stopped => false, - State::Started(state) => { - if state.playlist.is_rendering() { - state.playlist.stop(); - true - } else { - false - } - } - } - }; - - if write_final { - // Don't fail transitioning to READY if this fails - let _ = self.write_final_playlist(); - } - } - gst::StateChange::ReadyToNull => { self.stop(); } _ => (),