From d2e5cb43ccf6b2f7e7509fe797e72f3e7b6e073f Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Fri, 10 Nov 2023 22:49:40 +0900 Subject: [PATCH] hlssink3: Various cleanup * Simplify state/playlist management * Fix a bug that segment is not deleted if location contains directory and playlist-root is unset * Split playlist update routine into two steps, adding segment to playlist and playlist write Part-of: --- net/hlssink3/src/imp.rs | 259 +++++++++++++++++----------------------- 1 file changed, 109 insertions(+), 150 deletions(-) diff --git a/net/hlssink3/src/imp.rs b/net/hlssink3/src/imp.rs index 3d07a071c..0b4323fcc 100644 --- a/net/hlssink3/src/imp.rs +++ b/net/hlssink3/src/imp.rs @@ -17,7 +17,7 @@ use once_cell::sync::Lazy; use std::fs; use std::io::Write; use std::path; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; const DEFAULT_LOCATION: &str = "segment%05d.ts"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; @@ -104,49 +104,22 @@ impl Default for Settings { } } -pub(crate) struct StartedState { +struct PlaylistContext { playlist: Playlist, fragment_opened_at: Option, current_segment_location: Option, old_segment_locations: Vec, } -impl StartedState { - fn new( - target_duration: f32, - playlist_type: Option, - i_frames_only: bool, - ) -> Self { - Self { - playlist: Playlist::new(target_duration, playlist_type, i_frames_only), - current_segment_location: None, - fragment_opened_at: None, - old_segment_locations: Vec::new(), - } - } - - fn fragment_duration_since(&self, fragment_closed: gst::ClockTime) -> f32 { - let segment_duration = fragment_closed - self.fragment_opened_at.unwrap(); - segment_duration.mseconds() as f32 / 1_000f32 - } -} - -#[allow(clippy::large_enum_variant)] -enum State { - Stopped, - Started(StartedState), -} - -impl Default for State { - fn default() -> Self { - Self::Stopped - } +#[derive(Default)] +struct State { + context: Option, } #[derive(Default)] pub struct HlsSink3 { - settings: Arc>, - state: Arc>, + settings: Mutex, + state: Mutex, } impl HlsSink3 { @@ -163,12 +136,21 @@ impl HlsSink3 { }; let mut state = self.state.lock().unwrap(); - if let State::Stopped = *state { - *state = State::Started(StartedState::new( - target_duration, - playlist_type, - i_frames_only, - )); + state.context = Some(PlaylistContext { + playlist: Playlist::new(target_duration, playlist_type, i_frames_only), + fragment_opened_at: None, + current_segment_location: None, + old_segment_locations: Vec::new(), + }); + } + + fn stop(&self) { + let mut state = self.state.lock().unwrap(); + if let Some(mut context) = state.context.take() { + if context.playlist.is_rendering() { + context.playlist.stop(); + let _ = self.write_playlist(&mut context); + } } } @@ -180,11 +162,14 @@ impl HlsSink3 { fragment_id ); - // TODO: Create method in state to simplify this boilerplate: `let state = self.state.started()?` - let mut state_guard = self.state.lock().unwrap(); - let state = match &mut *state_guard { - State::Stopped => return Err("Not in Started state".to_string()), - State::Started(s) => s, + let mut state = self.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { + gst::error!(CAT, imp: self, "Playlist is not configured",); + + return Err(String::from("Playlist is not configured")); + } }; let settings = self.settings.lock().unwrap(); @@ -203,7 +188,7 @@ impl HlsSink3 { segment_file_location ); - state.current_segment_location = Some(segment_file_location.clone()); + context.current_segment_location = Some(segment_file_location.clone()); let fragment_stream = self .obj() @@ -221,7 +206,7 @@ impl HlsSink3 { CAT, imp: self, "New segment location: {:?}", - state.current_segment_location.as_ref() + context.current_segment_location.as_ref() ); Ok(segment_file_location) } @@ -259,29 +244,63 @@ impl HlsSink3 { }); } - fn write_playlist( - &self, - fragment_closed_at: Option, - ) -> Result { - gst::info!(CAT, imp: self, "Preparing to write new playlist"); - - let mut state_guard = self.state.lock().unwrap(); - let state = match &mut *state_guard { - State::Stopped => return Err(gst::StateChangeError), - State::Started(s) => s, + fn on_fragment_closed(&self, closed_at: gst::ClockTime) { + let mut state = self.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { + gst::error!(CAT, imp: self, "Playlist is not configured"); + return; + } }; - gst::info!(CAT, imp: self, "COUNT {}", state.playlist.len()); + let location = match context.current_segment_location.take() { + Some(location) => location, + None => { + gst::error!(CAT, imp: self, "Unknown segment location"); + return; + } + }; - // Only add fragment if it's complete. - if let Some(fragment_closed) = fragment_closed_at { - let segment_filename = self.segment_filename(state); - state.playlist.add_segment( - segment_filename.clone(), - state.fragment_duration_since(fragment_closed), - ); - state.old_segment_locations.push(segment_filename); - } + let opened_at = match context.fragment_opened_at.take() { + Some(opened_at) => opened_at, + None => { + gst::error!(CAT, imp: self, "Unknown segment duration"); + return; + } + }; + + let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32; + let file_name = path::Path::new(&location) + .file_name() + .unwrap() + .to_str() + .unwrap(); + + let settings = self.settings.lock().unwrap(); + let segment_file_name = if let Some(playlist_root) = &settings.playlist_root { + format!("{playlist_root}/{file_name}") + } else { + file_name.to_string() + }; + drop(settings); + + context.playlist.add_segment(segment_file_name, duration); + context.old_segment_locations.push(location); + + let _ = self.write_playlist(context); + } + + fn write_playlist( + &self, + context: &mut PlaylistContext, + ) -> Result { + gst::info!( + CAT, + imp: self, + "Preparing to write new playlist, COUNT {}", + context.playlist.len() + ); let (playlist_location, max_num_segments, max_playlist_length) = { let settings = self.settings.lock().unwrap(); @@ -292,7 +311,7 @@ impl HlsSink3 { ) }; - state.playlist.update_playlist_state(max_playlist_length); + context.playlist.update_playlist_state(max_playlist_length); // Acquires the playlist file handle so we can update it with new content. By default, this // is expected to be the same file every time. @@ -308,11 +327,11 @@ impl HlsSink3 { imp: self, "Could not get stream to write playlist content", ); - gst::StateChangeError + gst::FlowError::Error })? .into_write(); - state + context .playlist .write_to(&mut playlist_stream) .map_err(|err| { @@ -322,7 +341,7 @@ impl HlsSink3 { "Could not write new playlist: {}", err.to_string() ); - gst::StateChangeError + gst::FlowError::Error })?; playlist_stream.flush().map_err(|err| { gst::error!( @@ -331,14 +350,14 @@ impl HlsSink3 { "Could not flush playlist: {}", err.to_string() ); - gst::StateChangeError + gst::FlowError::Error })?; - if state.playlist.is_type_undefined() && max_num_segments > 0 { + if context.playlist.is_type_undefined() && max_num_segments > 0 { // Cleanup old segments from filesystem - if state.old_segment_locations.len() > max_num_segments { - for _ in 0..state.old_segment_locations.len() - max_num_segments { - let old_segment_location = state.old_segment_locations.remove(0); + if context.old_segment_locations.len() > max_num_segments { + for _ in 0..context.old_segment_locations.len() - max_num_segments { + let old_segment_location = context.old_segment_locations.remove(0); if !self .obj() .emit_by_name::(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location]) @@ -350,40 +369,7 @@ impl HlsSink3 { } gst::debug!(CAT, imp: self, "Wrote new playlist file!"); - Ok(gst::StateChangeSuccess::Success) - } - - fn segment_filename(&self, state: &mut StartedState) -> String { - assert!(state.current_segment_location.is_some()); - let name = state.current_segment_location.take().unwrap(); - let segment_filename = path::Path::new(&name) - .file_name() - .unwrap() - .to_str() - .unwrap(); - - let settings = self.settings.lock().unwrap(); - if let Some(playlist_root) = &settings.playlist_root { - format!("{}/{}", playlist_root, segment_filename) - } else { - segment_filename.to_string() - } - } - - fn write_final_playlist(&self) -> Result { - gst::debug!(CAT, imp: self, "Preparing to write final playlist"); - self.write_playlist(None) - } - - fn stop(&self) { - gst::debug!(CAT, imp: self, "Stopping"); - - let mut state = self.state.lock().unwrap(); - if let State::Started(_) = *state { - *state = State::Stopped; - } - - gst::debug!(CAT, imp: self, "Stopped"); + Ok(gst::FlowSuccess::Ok) } } @@ -416,18 +402,15 @@ impl BinImpl for HlsSink3 { if let Ok(new_fragment_opened_at) = s.get::("running-time") { let mut state = self.state.lock().unwrap(); - match &mut *state { - State::Stopped => {} - State::Started(state) => { - state.fragment_opened_at = Some(new_fragment_opened_at) - } - }; + if let Some(context) = state.context.as_mut() { + context.fragment_opened_at = Some(new_fragment_opened_at); + } } } "splitmuxsink-fragment-closed" => { let s = msg.structure().unwrap(); if let Ok(fragment_closed_at) = s.get::("running-time") { - let _ = self.write_playlist(Some(fragment_closed_at)); + self.on_fragment_closed(fragment_closed_at); } } _ => {} @@ -670,20 +653,20 @@ impl ObjectImpl for HlsSink3 { obj.add(&settings.splitmuxsink).unwrap(); settings.splitmuxsink.connect("format-location", false, { - let self_weak = self.downgrade(); + let imp_weak = self.downgrade(); move |args| { - let self_ = match self_weak.upgrade() { - Some(self_) => self_, + let imp = match imp_weak.upgrade() { + Some(imp) => imp, None => return Some(None::.to_value()), }; let fragment_id = args[1].get::().unwrap(); - gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id); + gst::info!(CAT, imp: imp, "Got fragment-id: {}", fragment_id); - match self_.on_format_location(fragment_id) { + match imp.on_format_location(fragment_id) { Ok(segment_location) => Some(segment_location.to_value()), Err(err) => { - gst::error!(CAT, imp: self_, "on format-location handler: {}", err); + gst::error!(CAT, imp: imp, "on format-location handler: {}", err); Some("unknown_segment".to_value()) } } @@ -740,38 +723,14 @@ impl ElementImpl for HlsSink3 { &self, transition: gst::StateChange, ) -> Result { - if let gst::StateChange::NullToReady = transition { + if transition == gst::StateChange::ReadyToPaused { self.start(); } let ret = self.parent_change_state(transition)?; - match transition { - gst::StateChange::PausedToReady => { - let write_final = { - let mut state = self.state.lock().unwrap(); - match &mut *state { - State::Stopped => false, - State::Started(state) => { - if state.playlist.is_rendering() { - state.playlist.stop(); - true - } else { - false - } - } - } - }; - - if write_final { - // Don't fail transitioning to READY if this fails - let _ = self.write_final_playlist(); - } - } - gst::StateChange::ReadyToNull => { - self.stop(); - } - _ => (), + if transition == gst::StateChange::PausedToReady { + self.stop(); } Ok(ret)