hlssink3: Various cleanup

* Simplify state/playlist management
* Fix a bug that segment is not deleted if location contains directory
and playlist-root is unset
* Split playlist update routine into two steps, adding segment
to playlist and playlist write

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1306>
This commit is contained in:
Seungha Yang 2023-09-24 18:19:49 +09:00
parent d8546dd140
commit 0fe69cea9f

View file

@ -18,7 +18,7 @@ use m3u8_rs::MediaPlaylistType;
use std::fs; use std::fs;
use std::io::Write; use std::io::Write;
use std::path; use std::path;
use std::sync::{Arc, Mutex}; use std::sync::Mutex;
const DEFAULT_LOCATION: &str = "segment%05d.ts"; const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
@ -111,7 +111,7 @@ impl Default for Settings {
} }
} }
pub(crate) struct StartedState { struct PlaylistContext {
pdt_base_utc: Option<DateTime<Utc>>, pdt_base_utc: Option<DateTime<Utc>>,
pdt_base_running_time: Option<gst::ClockTime>, pdt_base_running_time: Option<gst::ClockTime>,
playlist: Playlist, playlist: Playlist,
@ -121,45 +121,15 @@ pub(crate) struct StartedState {
old_segment_locations: Vec<String>, old_segment_locations: Vec<String>,
} }
impl StartedState { #[derive(Default)]
fn new( struct State {
target_duration: f32, context: Option<PlaylistContext>,
playlist_type: Option<MediaPlaylistType>,
i_frames_only: bool,
) -> Self {
Self {
pdt_base_utc: None,
pdt_base_running_time: None,
playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
current_segment_location: None,
fragment_opened_at: None,
fragment_running_time: None,
old_segment_locations: Vec::new(),
}
}
fn fragment_duration_since(&self, fragment_closed: gst::ClockTime) -> f32 {
let segment_duration = fragment_closed - self.fragment_opened_at.unwrap();
segment_duration.mseconds() as f32 / 1_000f32
}
}
#[allow(clippy::large_enum_variant)]
enum State {
Stopped,
Started(StartedState),
}
impl Default for State {
fn default() -> Self {
Self::Stopped
}
} }
#[derive(Default)] #[derive(Default)]
pub struct HlsSink3 { pub struct HlsSink3 {
settings: Arc<Mutex<Settings>>, settings: Mutex<Settings>,
state: Arc<Mutex<State>>, state: Mutex<State>,
} }
impl HlsSink3 { impl HlsSink3 {
@ -176,12 +146,24 @@ impl HlsSink3 {
}; };
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let State::Stopped = *state { state.context = Some(PlaylistContext {
*state = State::Started(StartedState::new( pdt_base_utc: None,
target_duration, pdt_base_running_time: None,
playlist_type, playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
i_frames_only, fragment_opened_at: None,
)); fragment_running_time: None,
current_segment_location: None,
old_segment_locations: Vec::new(),
});
}
fn stop(&self) {
let mut state = self.state.lock().unwrap();
if let Some(mut context) = state.context.take() {
if context.playlist.is_rendering() {
context.playlist.stop();
let _ = self.write_playlist(&mut context);
}
} }
} }
@ -193,11 +175,18 @@ impl HlsSink3 {
fragment_id fragment_id
); );
// TODO: Create method in state to simplify this boilerplate: `let state = self.state.started()?` let mut state = self.state.lock().unwrap();
let mut state_guard = self.state.lock().unwrap(); let context = match state.context.as_mut() {
let state = match &mut *state_guard { Some(context) => context,
State::Stopped => return Err("Not in Started state".to_string()), None => {
State::Started(s) => s, gst::error!(
CAT,
imp: self,
"Playlist is not configured",
);
return Err(String::from("Playlist is not configured"));
}
}; };
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -220,7 +209,7 @@ impl HlsSink3 {
segment_file_location segment_file_location
); );
state.current_segment_location = Some(segment_file_location.clone()); context.current_segment_location = Some(segment_file_location.clone());
let fragment_stream = self let fragment_stream = self
.obj() .obj()
@ -238,7 +227,7 @@ impl HlsSink3 {
CAT, CAT,
imp: self, imp: self,
"New segment location: {:?}", "New segment location: {:?}",
state.current_segment_location.as_ref() context.current_segment_location.as_ref()
); );
Ok(segment_file_location) Ok(segment_file_location)
} }
@ -276,31 +265,60 @@ impl HlsSink3 {
}); });
} }
fn write_playlist( fn on_fragment_closed(&self, closed_at: gst::ClockTime, date_time: Option<DateTime<Utc>>) {
&self, let mut state = self.state.lock().unwrap();
fragment_closed_at: Option<gst::ClockTime>, let context = match state.context.as_mut() {
date_time: Option<DateTime<Utc>>, Some(context) => context,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { None => {
gst::info!(CAT, imp: self, "Preparing to write new playlist"); gst::error!(CAT, imp: self, "Playlist is not configured");
return;
let mut state_guard = self.state.lock().unwrap(); }
let state = match &mut *state_guard {
State::Stopped => return Err(gst::StateChangeError),
State::Started(s) => s,
}; };
gst::info!(CAT, imp: self, "COUNT {}", state.playlist.len()); let location = match context.current_segment_location.take() {
Some(location) => location,
None => {
gst::error!(CAT, imp: self, "Unknown segment location");
return;
}
};
// Only add fragment if it's complete. let opened_at = match context.fragment_opened_at.take() {
if let Some(fragment_closed) = fragment_closed_at { Some(opened_at) => opened_at,
let segment_filename = self.segment_filename(state); None => {
state.playlist.add_segment( gst::error!(CAT, imp: self, "Unknown segment duration");
segment_filename.clone(), return;
state.fragment_duration_since(fragment_closed), }
date_time, };
);
state.old_segment_locations.push(segment_filename); let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32;
} let file_name = path::Path::new(&location)
.file_name()
.unwrap()
.to_str()
.unwrap();
let settings = self.settings.lock().unwrap();
let segment_file_name = if let Some(playlist_root) = &settings.playlist_root {
format!("{playlist_root}/{file_name}")
} else {
file_name.to_string()
};
drop(settings);
context
.playlist
.add_segment(segment_file_name, duration, date_time);
context.old_segment_locations.push(location);
let _ = self.write_playlist(context);
}
fn write_playlist(
&self,
context: &mut PlaylistContext,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::info!(CAT, imp: self, "Preparing to write new playlist, COUNT {}", context.playlist.len());
let (playlist_location, max_num_segments, max_playlist_length) = { let (playlist_location, max_num_segments, max_playlist_length) = {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -311,7 +329,7 @@ impl HlsSink3 {
) )
}; };
state.playlist.update_playlist_state(max_playlist_length); context.playlist.update_playlist_state(max_playlist_length);
// Acquires the playlist file handle so we can update it with new content. By default, this // Acquires the playlist file handle so we can update it with new content. By default, this
// is expected to be the same file every time. // is expected to be the same file every time.
@ -327,11 +345,11 @@ impl HlsSink3 {
imp: self, imp: self,
"Could not get stream to write playlist content", "Could not get stream to write playlist content",
); );
gst::StateChangeError gst::FlowError::Error
})? })?
.into_write(); .into_write();
state context
.playlist .playlist
.write_to(&mut playlist_stream) .write_to(&mut playlist_stream)
.map_err(|err| { .map_err(|err| {
@ -341,7 +359,7 @@ impl HlsSink3 {
"Could not write new playlist: {}", "Could not write new playlist: {}",
err.to_string() err.to_string()
); );
gst::StateChangeError gst::FlowError::Error
})?; })?;
playlist_stream.flush().map_err(|err| { playlist_stream.flush().map_err(|err| {
gst::error!( gst::error!(
@ -350,14 +368,14 @@ impl HlsSink3 {
"Could not flush playlist: {}", "Could not flush playlist: {}",
err.to_string() err.to_string()
); );
gst::StateChangeError gst::FlowError::Error
})?; })?;
if state.playlist.is_type_undefined() && max_num_segments > 0 { if context.playlist.is_type_undefined() && max_num_segments > 0 {
// Cleanup old segments from filesystem // Cleanup old segments from filesystem
if state.old_segment_locations.len() > max_num_segments { if context.old_segment_locations.len() > max_num_segments {
for _ in 0..state.old_segment_locations.len() - max_num_segments { for _ in 0..context.old_segment_locations.len() - max_num_segments {
let old_segment_location = state.old_segment_locations.remove(0); let old_segment_location = context.old_segment_locations.remove(0);
if !self if !self
.obj() .obj()
.emit_by_name::<bool>(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location]) .emit_by_name::<bool>(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location])
@ -369,40 +387,7 @@ impl HlsSink3 {
} }
gst::debug!(CAT, imp: self, "Wrote new playlist file!"); gst::debug!(CAT, imp: self, "Wrote new playlist file!");
Ok(gst::StateChangeSuccess::Success) Ok(gst::FlowSuccess::Ok)
}
fn segment_filename(&self, state: &mut StartedState) -> String {
assert!(state.current_segment_location.is_some());
let name = state.current_segment_location.take().unwrap();
let segment_filename = path::Path::new(&name)
.file_name()
.unwrap()
.to_str()
.unwrap();
let settings = self.settings.lock().unwrap();
if let Some(playlist_root) = &settings.playlist_root {
format!("{playlist_root}/{segment_filename}")
} else {
segment_filename.to_string()
}
}
fn write_final_playlist(&self) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::debug!(CAT, imp: self, "Preparing to write final playlist");
self.write_playlist(None, None)
}
fn stop(&self) {
gst::debug!(CAT, imp: self, "Stopping");
let mut state = self.state.lock().unwrap();
if let State::Started(_) = *state {
*state = State::Stopped;
}
gst::debug!(CAT, imp: self, "Stopped");
} }
} }
@ -435,44 +420,39 @@ impl BinImpl for HlsSink3 {
if let Ok(new_fragment_opened_at) = s.get::<gst::ClockTime>("running-time") if let Ok(new_fragment_opened_at) = s.get::<gst::ClockTime>("running-time")
{ {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
match &mut *state { if let Some(context) = state.context.as_mut() {
State::Stopped => {} context.fragment_opened_at = Some(new_fragment_opened_at);
State::Started(state) => { }
state.fragment_opened_at = Some(new_fragment_opened_at)
}
};
} }
} }
"splitmuxsink-fragment-closed" => { "splitmuxsink-fragment-closed" => {
let s = msg.structure().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let mut state_guard = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let state = match &mut *state_guard { let context = match state.context.as_mut() {
State::Stopped => { Some(context) => context,
gst::element_error!( None => {
self.obj(), gst::error!(
gst::StreamError::Failed, CAT,
("Fragment closed in wrong state"), imp: self,
["Fragment closed but element is in stopped state"] "Playlist is not configured",
); );
return; return;
} }
State::Started(state) => state,
}; };
let fragment_pts = state let fragment_pts = context
.fragment_running_time .fragment_running_time
.expect("fragment running time must be set by format-location-full"); .expect("fragment running time must be set by format-location-full");
if state.pdt_base_running_time.is_none() { if context.pdt_base_running_time.is_none() {
state.pdt_base_running_time = state.fragment_running_time; context.pdt_base_running_time = context.fragment_running_time;
} }
// Calculate the mapping from running time to UTC // Calculate the mapping from running time to UTC
// calculate pdt_base_utc for each segment for !pdt_follows_pipeline_clock // calculate pdt_base_utc for each segment for !pdt_follows_pipeline_clock
// when pdt_follows_pipeline_clock is set, we calculate the base time every time // when pdt_follows_pipeline_clock is set, we calculate the base time every time
// this avoids the drift between pdt tag and external clock (if gst clock has skew w.r.t external clock) // this avoids the drift between pdt tag and external clock (if gst clock has skew w.r.t external clock)
if state.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock { if context.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock {
let now_utc = Utc::now(); let now_utc = Utc::now();
let now_gst = settings.giostreamsink.clock().unwrap().time().unwrap(); let now_gst = settings.giostreamsink.clock().unwrap().time().unwrap();
let pts_clock_time = let pts_clock_time =
@ -483,21 +463,21 @@ impl BinImpl for HlsSink3 {
.checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64)) .checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64))
.expect("offsetting the utc with gstreamer clock-diff overflow"); .expect("offsetting the utc with gstreamer clock-diff overflow");
state.pdt_base_utc = Some(pts_utc); context.pdt_base_utc = Some(pts_utc);
} }
let fragment_date_time = if settings.enable_program_date_time let fragment_date_time = if settings.enable_program_date_time
&& state.pdt_base_running_time.is_some() && context.pdt_base_running_time.is_some()
{ {
// Add the diff of running time to UTC time // Add the diff of running time to UTC time
// date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time) // date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time)
state context
.pdt_base_utc .pdt_base_utc
.unwrap() .unwrap()
.checked_add_signed(Duration::nanoseconds( .checked_add_signed(Duration::nanoseconds(
state context
.fragment_running_time .fragment_running_time
.opt_checked_sub(state.pdt_base_running_time) .opt_checked_sub(context.pdt_base_running_time)
.unwrap() .unwrap()
.unwrap() .unwrap()
.nseconds() as i64, .nseconds() as i64,
@ -505,12 +485,11 @@ impl BinImpl for HlsSink3 {
} else { } else {
None None
}; };
drop(state_guard); drop(state);
drop(settings); drop(settings);
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") { if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") {
let _ = self.on_fragment_closed(fragment_closed_at, fragment_date_time);
self.write_playlist(Some(fragment_closed_at), fragment_date_time);
} }
} }
_ => {} _ => {}
@ -768,30 +747,29 @@ impl ObjectImpl for HlsSink3 {
]); ]);
obj.add(&settings.splitmuxsink).unwrap(); obj.add(&settings.splitmuxsink).unwrap();
let state = self.state.clone();
settings settings
.splitmuxsink .splitmuxsink
.connect("format-location-full", false, { .connect("format-location-full", false, {
let self_weak = self.downgrade(); let imp_weak = self.downgrade();
move |args| { move |args| {
let self_ = match self_weak.upgrade() { let imp = match imp_weak.upgrade() {
Some(self_) => self_, Some(imp) => imp,
None => return Some(None::<String>.to_value()), None => return Some(None::<String>.to_value()),
}; };
let fragment_id = args[1].get::<u32>().unwrap(); let fragment_id = args[1].get::<u32>().unwrap();
gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id); gst::info!(CAT, imp: imp, "Got fragment-id: {}", fragment_id);
let mut state_guard = state.lock().unwrap(); let mut state = imp.state.lock().unwrap();
let state = match &mut *state_guard { let context = match state.context.as_mut() {
State::Stopped => { Some(context) => context,
None => {
gst::error!( gst::error!(
CAT, CAT,
imp: self_, imp: imp,
"on format location called with Stopped state" "on format location called with Stopped state"
); );
return Some("unknown_segment".to_value()); return Some("unknown_segment".to_value());
} }
State::Started(s) => s,
}; };
let sample = args[2].get::<gst::Sample>().unwrap(); let sample = args[2].get::<gst::Sample>().unwrap();
@ -802,22 +780,22 @@ impl ObjectImpl for HlsSink3 {
.expect("segment not available") .expect("segment not available")
.downcast_ref::<gst::ClockTime>() .downcast_ref::<gst::ClockTime>()
.expect("no time segment"); .expect("no time segment");
state.fragment_running_time = context.fragment_running_time =
segment.to_running_time(buffer.pts().unwrap()); segment.to_running_time(buffer.pts().unwrap());
} else { } else {
gst::warning!( gst::warning!(
CAT, CAT,
imp: self_, imp: imp,
"buffer null for fragment-id: {}", "buffer null for fragment-id: {}",
fragment_id fragment_id
); );
} }
drop(state_guard); drop(state);
match self_.on_format_location(fragment_id) { match imp.on_format_location(fragment_id) {
Ok(segment_location) => Some(segment_location.to_value()), Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => { Err(err) => {
gst::error!(CAT, imp: self_, "on format-location handler: {}", err); gst::error!(CAT, imp: imp, "on format-location handler: {}", err);
Some("unknown_segment".to_value()) Some("unknown_segment".to_value())
} }
} }
@ -874,7 +852,7 @@ impl ElementImpl for HlsSink3 {
&self, &self,
transition: gst::StateChange, transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if let gst::StateChange::NullToReady = transition { if transition == gst::StateChange::ReadyToPaused {
self.start(); self.start();
} }
@ -883,39 +861,15 @@ impl ElementImpl for HlsSink3 {
match transition { match transition {
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
match &mut *state { if let Some(context) = state.context.as_mut() {
State::Stopped => (), // reset mapping from rt to utc. during pause
State::Started(state) => { // rt is stopped but utc keep moving so need to
// reset mapping from rt to utc. during pause // calculate the mapping again
// rt is stopped but utc keep moving so need to context.pdt_base_running_time = None;
// calculate the mapping again context.pdt_base_utc = None
state.pdt_base_running_time = None;
state.pdt_base_utc = None
}
} }
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
let write_final = {
let mut state = self.state.lock().unwrap();
match &mut *state {
State::Stopped => false,
State::Started(state) => {
if state.playlist.is_rendering() {
state.playlist.stop();
true
} else {
false
}
}
}
};
if write_final {
// Don't fail transitioning to READY if this fails
let _ = self.write_final_playlist();
}
}
gst::StateChange::ReadyToNull => {
self.stop(); self.stop();
} }
_ => (), _ => (),