hlssink3: Allow adding EXT-X-PROGRAM-DATE-TIME tag.

- connect to `format-location-full` it provide the first
sample of the fragment. preserve the running-time of the
first sample in fragment.
- on fragment-close message, find the mapping of running-time
to UTC time.
- on each subsequent fragment, calculate the offset of the
running-time with first fragment and add offset to base
utc time

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1145>
This commit is contained in:
rajneeshksoni 2023-04-10 22:11:39 +04:00 committed by Sebastian Dröge
parent 95a7a3c0ec
commit 4be24fdcaf
4 changed files with 161 additions and 20 deletions

View file

@ -2186,6 +2186,18 @@
} }
}, },
"properties": { "properties": {
"enable-program-date-time": {
"blurb": "put EXT-X-PROGRAM-DATE-TIME tag in the playlist",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
},
"i-frames-only": { "i-frames-only": {
"blurb": "Each video segments is single iframe, So put EXT-X-I-FRAMES-ONLY tag in the playlist", "blurb": "Each video segments is single iframe, So put EXT-X-I-FRAMES-ONLY tag in the playlist",
"conditionally-available": false, "conditionally-available": false,

View file

@ -16,6 +16,7 @@ gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
once_cell = "1.7.2" once_cell = "1.7.2"
m3u8-rs = "5.0" m3u8-rs = "5.0"
regex = "1" regex = "1"
chrono = "0.4"
[dev-dependencies] [dev-dependencies]
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }

View file

@ -8,6 +8,7 @@
use crate::playlist::{Playlist, SegmentFormatter}; use crate::playlist::{Playlist, SegmentFormatter};
use crate::HlsSink3PlaylistType; use crate::HlsSink3PlaylistType;
use chrono::{DateTime, Duration, Utc};
use gio::prelude::*; use gio::prelude::*;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::glib::once_cell::sync::Lazy; use gst::glib::once_cell::sync::Lazy;
@ -26,6 +27,7 @@ const DEFAULT_TARGET_DURATION: u32 = 15;
const DEFAULT_PLAYLIST_LENGTH: u32 = 5; const DEFAULT_PLAYLIST_LENGTH: u32 = 5;
const DEFAULT_PLAYLIST_TYPE: HlsSink3PlaylistType = HlsSink3PlaylistType::Unspecified; const DEFAULT_PLAYLIST_TYPE: HlsSink3PlaylistType = HlsSink3PlaylistType::Unspecified;
const DEFAULT_I_FRAMES_ONLY_PLAYLIST: bool = false; const DEFAULT_I_FRAMES_ONLY_PLAYLIST: bool = false;
const DEFAULT_PROGRAM_DATE_TIME_TAG: bool = false;
const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true; const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true;
const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream"; const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream";
@ -68,6 +70,7 @@ struct Settings {
max_num_segment_files: usize, max_num_segment_files: usize,
target_duration: u32, target_duration: u32,
i_frames_only: bool, i_frames_only: bool,
enable_program_date_time: bool,
send_keyframe_requests: bool, send_keyframe_requests: bool,
splitmuxsink: gst::Element, splitmuxsink: gst::Element,
@ -97,6 +100,7 @@ impl Default for Settings {
target_duration: DEFAULT_TARGET_DURATION, target_duration: DEFAULT_TARGET_DURATION,
send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS, send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS,
i_frames_only: DEFAULT_I_FRAMES_ONLY_PLAYLIST, i_frames_only: DEFAULT_I_FRAMES_ONLY_PLAYLIST,
enable_program_date_time: DEFAULT_PROGRAM_DATE_TIME_TAG,
splitmuxsink, splitmuxsink,
giostreamsink, giostreamsink,
@ -107,8 +111,11 @@ impl Default for Settings {
} }
pub(crate) struct StartedState { pub(crate) struct StartedState {
base_date_time: Option<DateTime<Utc>>,
base_running_time: Option<gst::ClockTime>,
playlist: Playlist, playlist: Playlist,
fragment_opened_at: Option<gst::ClockTime>, fragment_opened_at: Option<gst::ClockTime>,
fragment_running_time: Option<gst::ClockTime>,
current_segment_location: Option<String>, current_segment_location: Option<String>,
old_segment_locations: Vec<String>, old_segment_locations: Vec<String>,
} }
@ -120,9 +127,12 @@ impl StartedState {
i_frames_only: bool, i_frames_only: bool,
) -> Self { ) -> Self {
Self { Self {
base_date_time: None,
base_running_time: None,
playlist: Playlist::new(target_duration, playlist_type, i_frames_only), playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
current_segment_location: None, current_segment_location: None,
fragment_opened_at: None, fragment_opened_at: None,
fragment_running_time: None,
old_segment_locations: Vec::new(), old_segment_locations: Vec::new(),
} }
} }
@ -257,6 +267,7 @@ impl HlsSink3 {
fn write_playlist( fn write_playlist(
&self, &self,
fragment_closed_at: Option<gst::ClockTime>, fragment_closed_at: Option<gst::ClockTime>,
date_time: Option<DateTime<Utc>>,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::info!(CAT, imp: self, "Preparing to write new playlist"); gst::info!(CAT, imp: self, "Preparing to write new playlist");
@ -274,6 +285,7 @@ impl HlsSink3 {
state.playlist.add_segment( state.playlist.add_segment(
segment_filename.clone(), segment_filename.clone(),
state.fragment_duration_since(fragment_closed), state.fragment_duration_since(fragment_closed),
date_time,
); );
state.old_segment_locations.push(segment_filename); state.old_segment_locations.push(segment_filename);
} }
@ -362,7 +374,7 @@ impl HlsSink3 {
fn write_final_playlist(&self) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { fn write_final_playlist(&self) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::debug!(CAT, imp: self, "Preparing to write final playlist"); gst::debug!(CAT, imp: self, "Preparing to write final playlist");
self.write_playlist(None) self.write_playlist(None, None)
} }
fn stop(&self) { fn stop(&self) {
@ -416,8 +428,66 @@ impl BinImpl for HlsSink3 {
} }
"splitmuxsink-fragment-closed" => { "splitmuxsink-fragment-closed" => {
let s = msg.structure().unwrap(); let s = msg.structure().unwrap();
let settings = self.settings.lock().unwrap();
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
State::Stopped => {
gst::element_error!(
self.obj(),
gst::StreamError::Failed,
("Framented closed in wrong state"),
["Fragment closed but element is in stopped state"]
);
return;
}
State::Started(state) => state,
};
if state.base_running_time.is_none() && state.fragment_running_time.is_some() {
state.base_running_time = state.fragment_running_time;
}
// Calculate the mapping from running time to UTC
if state.base_date_time.is_none() && state.fragment_running_time.is_some() {
let fragment_pts = state.fragment_running_time.unwrap();
let now_utc = Utc::now();
let now_gst = settings.giostreamsink.clock().unwrap().time().unwrap();
let pts_clock_time =
fragment_pts + settings.giostreamsink.base_time().unwrap();
let diff = now_gst.checked_sub(pts_clock_time).unwrap();
let pts_utc = now_utc
.checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64))
.unwrap();
state.base_date_time = Some(pts_utc);
}
let fragment_date_time = if settings.enable_program_date_time
&& state.base_running_time.is_some()
&& state.fragment_running_time.is_some()
{
// Add the diff of running time to UTC time
// date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time)
state.base_date_time.unwrap().checked_add_signed(
Duration::nanoseconds(
state
.fragment_running_time
.opt_checked_sub(state.base_running_time)
.unwrap()
.unwrap()
.nseconds() as i64,
),
)
} else {
None
};
drop(state_guard);
drop(settings);
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") { if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") {
let _ = self.write_playlist(Some(fragment_closed_at)); let _ =
self.write_playlist(Some(fragment_closed_at), fragment_date_time);
} }
} }
_ => {} _ => {}
@ -469,6 +539,11 @@ impl ObjectImpl for HlsSink3 {
.blurb("Each video segments is single iframe, So put EXT-X-I-FRAMES-ONLY tag in the playlist") .blurb("Each video segments is single iframe, So put EXT-X-I-FRAMES-ONLY tag in the playlist")
.default_value(DEFAULT_I_FRAMES_ONLY_PLAYLIST) .default_value(DEFAULT_I_FRAMES_ONLY_PLAYLIST)
.build(), .build(),
glib::ParamSpecBoolean::builder("enable-program-date-time")
.nick("add EXT-X-PROGRAM-DATE-TIME tag")
.blurb("put EXT-X-PROGRAM-DATE-TIME tag in the playlist")
.default_value(DEFAULT_PROGRAM_DATE_TIME_TAG)
.build(),
glib::ParamSpecBoolean::builder("send-keyframe-requests") glib::ParamSpecBoolean::builder("send-keyframe-requests")
.nick("Send Keyframe Requests") .nick("Send Keyframe Requests")
.blurb("Send keyframe requests to ensure correct fragmentation. If this is disabled then the input must have keyframes in regular intervals.") .blurb("Send keyframe requests to ensure correct fragmentation. If this is disabled then the input must have keyframes in regular intervals.")
@ -536,6 +611,9 @@ impl ObjectImpl for HlsSink3 {
); );
} }
} }
"enable-program-date-time" => {
settings.enable_program_date_time = value.get().expect("type checked upstream");
}
"send-keyframe-requests" => { "send-keyframe-requests" => {
settings.send_keyframe_requests = value.get().expect("type checked upstream"); settings.send_keyframe_requests = value.get().expect("type checked upstream");
settings settings
@ -563,6 +641,7 @@ impl ObjectImpl for HlsSink3 {
playlist_type.to_value() playlist_type.to_value()
} }
"i-frames-only" => settings.i_frames_only.to_value(), "i-frames-only" => settings.i_frames_only.to_value(),
"enable-program-date-time" => settings.enable_program_date_time.to_value(),
"send-keyframe-requests" => settings.send_keyframe_requests.to_value(), "send-keyframe-requests" => settings.send_keyframe_requests.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -660,27 +739,60 @@ impl ObjectImpl for HlsSink3 {
]); ]);
obj.add(&settings.splitmuxsink).unwrap(); obj.add(&settings.splitmuxsink).unwrap();
let state = self.state.clone();
settings
.splitmuxsink
.connect("format-location-full", false, {
let self_weak = self.downgrade();
move |args| {
let self_ = match self_weak.upgrade() {
Some(self_) => self_,
None => return Some(None::<String>.to_value()),
};
let fragment_id = args[1].get::<u32>().unwrap();
gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id);
settings.splitmuxsink.connect("format-location", false, { let mut state_guard = state.lock().unwrap();
let self_weak = self.downgrade(); let mut state = match &mut *state_guard {
move |args| { State::Stopped => {
let self_ = match self_weak.upgrade() { gst::error!(
Some(self_) => self_, CAT,
None => return Some(None::<String>.to_value()), imp: self_,
}; "on format location called with Stopped state"
let fragment_id = args[1].get::<u32>().unwrap(); );
return Some("unknown_segment".to_value());
}
State::Started(s) => s,
};
gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id); let sample = args[2].get::<gst::Sample>().unwrap();
let buffer = sample.buffer();
if let Some(buffer) = buffer {
let segment = sample
.segment()
.expect("segment not available")
.downcast_ref::<gst::ClockTime>()
.expect("no time segment");
state.fragment_running_time = segment.to_running_time(buffer.pts().unwrap());
} else {
gst::warning!(
CAT,
imp: self_,
"buffer null for fragment-id: {}",
fragment_id
);
}
drop(state_guard);
match self_.on_format_location(fragment_id) { match self_.on_format_location(fragment_id) {
Ok(segment_location) => Some(segment_location.to_value()), Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => { Err(err) => {
gst::error!(CAT, imp: self_, "on format-location handler: {}", err); gst::error!(CAT, imp: self_, "on format-location handler: {}", err);
Some("unknown_segment".to_value()) Some("unknown_segment".to_value())
}
} }
} }
} });
});
} }
} }
@ -739,6 +851,19 @@ impl ElementImpl for HlsSink3 {
let ret = self.parent_change_state(transition)?; let ret = self.parent_change_state(transition)?;
match transition { match transition {
gst::StateChange::PlayingToPaused => {
let mut state = self.state.lock().unwrap();
match &mut *state {
State::Stopped => (),
State::Started(state) => {
// reset mapping from rt to utc. during pause
// rt is stopped but utc keep moving so need to
// calculate the mapping again
state.base_running_time = None;
state.base_date_time = None
}
}
}
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
let write_final = { let write_final = {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();

View file

@ -6,6 +6,7 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use chrono::{DateTime, Utc};
use gst::glib::once_cell::sync::Lazy; use gst::glib::once_cell::sync::Lazy;
use m3u8_rs::{MediaPlaylist, MediaPlaylistType, MediaSegment}; use m3u8_rs::{MediaPlaylist, MediaPlaylistType, MediaSegment};
use regex::Regex; use regex::Regex;
@ -68,8 +69,10 @@ impl Playlist {
} }
/// Adds a new segment to the playlist. /// Adds a new segment to the playlist.
pub fn add_segment(&mut self, uri: String, duration: f32) { pub fn add_segment(&mut self, uri: String, duration: f32, date_time: Option<DateTime<Utc>>) {
self.start(); self.start();
// TODO: We are adding date-time to each segment, hence during write all the segments have
// program-date-time header.
self.inner.segments.push(MediaSegment { self.inner.segments.push(MediaSegment {
uri, uri,
duration, duration,
@ -78,7 +81,7 @@ impl Playlist {
discontinuity: false, discontinuity: false,
key: None, key: None,
map: None, map: None,
program_date_time: None, program_date_time: date_time.map(|d| d.into()),
daterange: None, daterange: None,
unknown_tags: vec![], unknown_tags: vec![],
}); });