hlssink3: Support NTP Timestamp in GstBuffer

We want to enable retrieving the segment start timestamp from the GST
buffer instead of calculating it.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2017>
This commit is contained in:
eipachte 2024-12-24 14:07:32 +02:00 committed by GStreamer Marge Bot
parent eb5c8276c0
commit a82a4f6d0f
5 changed files with 169 additions and 30 deletions

View file

@ -4540,7 +4540,7 @@
"writable": true "writable": true
}, },
"pdt-follows-pipeline-clock": { "pdt-follows-pipeline-clock": {
"blurb": "As there might be drift between the wallclock and pipeline clock, this controls whether the Program-Date-Time markers should follow the pipeline clock rate (true), or be skewed to match the wallclock rate (false).", "blurb": "As there might be drift between the wallclock and pipeline clock, this controls whether the Program-Date-Time markers should follow the pipeline clock rate (true), or be skewed to match the wallclock rate (false) (deprecated).",
"conditionally-available": false, "conditionally-available": false,
"construct": false, "construct": false,
"construct-only": false, "construct-only": false,
@ -4588,6 +4588,18 @@
"readable": true, "readable": true,
"type": "gchararray", "type": "gchararray",
"writable": true "writable": true
},
"program-date-time-reference": {
"blurb": "Sets the reference for program date time. Pipeline: Use pipeline clock. System: Use system clock (As there might be drift between the wallclock and pipeline clock), BufferReferenceTimestamp: Use buffer time from reference timestamp meta.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "pipeline (0)",
"mutable": "null",
"readable": true,
"type": "GstHlsProgramDateTimeReference",
"writable": true
} }
}, },
"signals": { "signals": {
@ -4623,6 +4635,26 @@
} }
} }
}, },
"GstHlsProgramDateTimeReference": {
"kind": "enum",
"values": [
{
"desc": "Pipeline: Use the pipeline clock",
"name": "pipeline",
"value": "0"
},
{
"desc": "System: Use the system clock",
"name": "system",
"value": "1"
},
{
"desc": "BufferReferenceTimestamp: Use the buffer reference timestamp",
"name": "buffer-reference-timestamp",
"value": "2"
}
]
},
"GstHlsSink3PlaylistType": { "GstHlsSink3PlaylistType": {
"kind": "enum", "kind": "enum",
"values": [ "values": [

View file

@ -26,6 +26,8 @@ const DEFAULT_PLAYLIST_LENGTH: u32 = 5;
const DEFAULT_PROGRAM_DATE_TIME_TAG: bool = false; const DEFAULT_PROGRAM_DATE_TIME_TAG: bool = false;
const DEFAULT_CLOCK_TRACKING_FOR_PDT: bool = true; const DEFAULT_CLOCK_TRACKING_FOR_PDT: bool = true;
const DEFAULT_ENDLIST: bool = true; const DEFAULT_ENDLIST: bool = true;
const DEFAULT_PROGRAM_DATE_TIME_REFERENCE: HlsProgramDateTimeReference =
HlsProgramDateTimeReference::Pipeline;
const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream"; const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream";
const SIGNAL_GET_FRAGMENT_STREAM: &str = "get-fragment-stream"; const SIGNAL_GET_FRAGMENT_STREAM: &str = "get-fragment-stream";
@ -39,13 +41,29 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
) )
}); });
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstHlsProgramDateTimeReference")]
#[non_exhaustive]
pub enum HlsProgramDateTimeReference {
#[enum_value(name = "Pipeline: Use the pipeline clock", nick = "pipeline")]
Pipeline = 0,
#[enum_value(name = "System: Use the system clock", nick = "system")]
System = 1,
#[enum_value(
name = "BufferReferenceTimestamp: Use the buffer reference timestamp",
nick = "buffer-reference-timestamp"
)]
BufferReferenceTimestamp = 2,
}
struct Settings { struct Settings {
playlist_location: String, playlist_location: String,
playlist_root: Option<String>, playlist_root: Option<String>,
playlist_length: u32, playlist_length: u32,
max_num_segment_files: usize, max_num_segment_files: usize,
enable_program_date_time: bool, enable_program_date_time: bool,
pdt_follows_pipeline_clock: bool, program_date_time_reference: HlsProgramDateTimeReference,
enable_endlist: bool, enable_endlist: bool,
} }
@ -57,7 +75,7 @@ impl Default for Settings {
playlist_length: DEFAULT_PLAYLIST_LENGTH, playlist_length: DEFAULT_PLAYLIST_LENGTH,
max_num_segment_files: DEFAULT_MAX_NUM_SEGMENT_FILES as usize, max_num_segment_files: DEFAULT_MAX_NUM_SEGMENT_FILES as usize,
enable_program_date_time: DEFAULT_PROGRAM_DATE_TIME_TAG, enable_program_date_time: DEFAULT_PROGRAM_DATE_TIME_TAG,
pdt_follows_pipeline_clock: DEFAULT_CLOCK_TRACKING_FOR_PDT, program_date_time_reference: DEFAULT_PROGRAM_DATE_TIME_REFERENCE,
enable_endlist: DEFAULT_ENDLIST, enable_endlist: DEFAULT_ENDLIST,
} }
} }
@ -126,14 +144,18 @@ impl ObjectImpl for HlsBaseSink {
.blurb("Length of HLS playlist. To allow players to conform to section 6.3.3 of the HLS specification, this should be at least 3. If set to 0, the playlist will be infinite.") .blurb("Length of HLS playlist. To allow players to conform to section 6.3.3 of the HLS specification, this should be at least 3. If set to 0, the playlist will be infinite.")
.default_value(DEFAULT_PLAYLIST_LENGTH) .default_value(DEFAULT_PLAYLIST_LENGTH)
.build(), .build(),
glib::ParamSpecBoolean::builder("enable-program-date-time") glib::ParamSpecBoolean::builder("enable-program-date-time")
.nick("add EXT-X-PROGRAM-DATE-TIME tag") .nick("add EXT-X-PROGRAM-DATE-TIME tag")
.blurb("put EXT-X-PROGRAM-DATE-TIME tag in the playlist") .blurb("put EXT-X-PROGRAM-DATE-TIME tag in the playlist")
.default_value(DEFAULT_PROGRAM_DATE_TIME_TAG) .default_value(DEFAULT_PROGRAM_DATE_TIME_TAG)
.build(), .build(),
glib::ParamSpecEnum::builder_with_default("program-date-time-reference", DEFAULT_PROGRAM_DATE_TIME_REFERENCE)
.nick("Program Date Time Reference")
.blurb("Sets the reference for program date time. Pipeline: Use pipeline clock. System: Use system clock (As there might be drift between the wallclock and pipeline clock), BufferReferenceTimestamp: Use buffer time from reference timestamp meta.")
.build(),
glib::ParamSpecBoolean::builder("pdt-follows-pipeline-clock") glib::ParamSpecBoolean::builder("pdt-follows-pipeline-clock")
.nick("Whether Program-Date-Time should follow the pipeline clock") .nick("Whether Program-Date-Time should follow the pipeline clock")
.blurb("As there might be drift between the wallclock and pipeline clock, this controls whether the Program-Date-Time markers should follow the pipeline clock rate (true), or be skewed to match the wallclock rate (false).") .blurb("As there might be drift between the wallclock and pipeline clock, this controls whether the Program-Date-Time markers should follow the pipeline clock rate (true), or be skewed to match the wallclock rate (false) (deprecated).")
.default_value(DEFAULT_CLOCK_TRACKING_FOR_PDT) .default_value(DEFAULT_CLOCK_TRACKING_FOR_PDT)
.build(), .build(),
glib::ParamSpecBoolean::builder("enable-endlist") glib::ParamSpecBoolean::builder("enable-endlist")
@ -172,7 +194,16 @@ impl ObjectImpl for HlsBaseSink {
settings.enable_program_date_time = value.get().expect("type checked upstream"); settings.enable_program_date_time = value.get().expect("type checked upstream");
} }
"pdt-follows-pipeline-clock" => { "pdt-follows-pipeline-clock" => {
settings.pdt_follows_pipeline_clock = value.get().expect("type checked upstream"); gst::warning!(CAT, "The 'pdt-follows-pipeline-clock' property is deprecated. Use 'program-date-time-reference' instead.");
settings.program_date_time_reference =
if value.get().expect("type checked upstream") {
HlsProgramDateTimeReference::Pipeline
} else {
HlsProgramDateTimeReference::System
};
}
"program-date-time-reference" => {
settings.program_date_time_reference = value.get().expect("type checked upstream");
} }
"enable-endlist" => { "enable-endlist" => {
settings.enable_endlist = value.get().expect("type checked upstream"); settings.enable_endlist = value.get().expect("type checked upstream");
@ -192,7 +223,10 @@ impl ObjectImpl for HlsBaseSink {
} }
"playlist-length" => settings.playlist_length.to_value(), "playlist-length" => settings.playlist_length.to_value(),
"enable-program-date-time" => settings.enable_program_date_time.to_value(), "enable-program-date-time" => settings.enable_program_date_time.to_value(),
"pdt-follows-pipeline-clock" => settings.pdt_follows_pipeline_clock.to_value(), "pdt-follows-pipeline-clock" => (settings.program_date_time_reference
== HlsProgramDateTimeReference::Pipeline)
.to_value(),
"program-date-time-reference" => settings.program_date_time_reference.to_value(),
"enable-endlist" => settings.enable_endlist.to_value(), "enable-endlist" => settings.enable_endlist.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -367,6 +401,7 @@ impl HlsBaseSink {
location: &str, location: &str,
running_time: Option<gst::ClockTime>, running_time: Option<gst::ClockTime>,
duration: gst::ClockTime, duration: gst::ClockTime,
timestamp: Option<DateTime<Utc>>,
mut segment: MediaSegment, mut segment: MediaSegment,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
@ -387,10 +422,12 @@ impl HlsBaseSink {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
// Calculate the mapping from running time to UTC // Calculate the mapping from running time to UTC
// calculate pdt_base_utc for each segment for !pdt_follows_pipeline_clock // calculate pdt_base_utc for each segment if program_date_time_reference == System
// when pdt_follows_pipeline_clock is set, we calculate the base time every time // when following system clock, we calculate the base time every time
// this avoids the drift between pdt tag and external clock (if gst clock has skew w.r.t external clock) // this avoids the drift between pdt tag and external clock (if gst clock has skew w.r.t external clock)
if context.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock { if context.pdt_base_utc.is_none()
|| settings.program_date_time_reference == HlsProgramDateTimeReference::System
{
let obj = self.obj(); let obj = self.obj();
let now_utc = Utc::now(); let now_utc = Utc::now();
let now_gst = obj.clock().unwrap().time().unwrap(); let now_gst = obj.clock().unwrap().time().unwrap();
@ -405,22 +442,32 @@ impl HlsBaseSink {
} }
if settings.enable_program_date_time { if settings.enable_program_date_time {
// Add the diff of running time to UTC time // if program_date_time_reference == BufferReferenceTimestamp and timestamp is provided, use the timestamp provided in the buffer
// date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time) if settings.program_date_time_reference
let date_time = == HlsProgramDateTimeReference::BufferReferenceTimestamp
context && timestamp.is_some()
.pdt_base_utc {
.unwrap() if let Some(timestamp) = timestamp {
.checked_add_signed(Duration::nanoseconds( segment.program_date_time = Some(timestamp.into());
running_time }
.opt_checked_sub(context.pdt_base_running_time) } else {
.unwrap() // Add the diff of running time to UTC time
.unwrap() // date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time)
.nseconds() as i64, let date_time =
)); context
.pdt_base_utc
.unwrap()
.checked_add_signed(Duration::nanoseconds(
running_time
.opt_checked_sub(context.pdt_base_running_time)
.unwrap()
.unwrap()
.nseconds() as i64,
));
if let Some(date_time) = date_time { if let Some(date_time) = date_time {
segment.program_date_time = Some(date_time.into()); segment.program_date_time = Some(date_time.into());
}
} }
} }
} }
@ -432,12 +479,18 @@ impl HlsBaseSink {
} }
self.write_playlist(context).inspect(|_res| { self.write_playlist(context).inspect(|_res| {
let s = gst::Structure::builder("hls-segment-added") let mut s = gst::Structure::builder("hls-segment-added")
.field("location", location) .field("location", location)
.field("running-time", running_time.unwrap()) .field("running-time", running_time.unwrap())
.field("duration", duration) .field("duration", duration);
.build(); if let Some(ts) = timestamp {
self.post_message(gst::message::Element::builder(s).src(&*self.obj()).build()); s = s.field("timestamp", ts.timestamp());
};
self.post_message(
gst::message::Element::builder(s.build())
.src(&*self.obj())
.build(),
);
}) })
} }

View file

@ -10,6 +10,7 @@ use crate::hlsbasesink::HlsBaseSinkImpl;
use crate::hlssink3::HlsSink3PlaylistType; use crate::hlssink3::HlsSink3PlaylistType;
use crate::playlist::Playlist; use crate::playlist::Playlist;
use crate::HlsBaseSink; use crate::HlsBaseSink;
use chrono::{DateTime, Utc};
use gio::prelude::*; use gio::prelude::*;
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
@ -461,6 +462,7 @@ impl HlsCmafSink {
duration: gst::ClockTime, duration: gst::ClockTime,
running_time: Option<gst::ClockTime>, running_time: Option<gst::ClockTime>,
location: String, location: String,
timestamp: Option<DateTime<Utc>>,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let uri = base_imp!(self).get_segment_uri(&location, None); let uri = base_imp!(self).get_segment_uri(&location, None);
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
@ -476,6 +478,7 @@ impl HlsCmafSink {
&location, &location,
running_time, running_time,
duration, duration,
timestamp,
MediaSegment { MediaSegment {
uri, uri,
duration: duration.mseconds() as f32 / 1_000f32, duration: duration.mseconds() as f32 / 1_000f32,
@ -558,6 +561,6 @@ impl HlsCmafSink {
gst::FlowError::Error gst::FlowError::Error
})?; })?;
self.add_segment(duration, running_time, location) self.add_segment(duration, running_time, location, None)
} }
} }

View file

@ -11,6 +11,7 @@ use crate::hlsbasesink::HlsBaseSinkImpl;
use crate::hlssink3::HlsSink3PlaylistType; use crate::hlssink3::HlsSink3PlaylistType;
use crate::playlist::Playlist; use crate::playlist::Playlist;
use crate::HlsBaseSink; use crate::HlsBaseSink;
use chrono::{DateTime, Utc};
use gio::prelude::*; use gio::prelude::*;
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
@ -35,6 +36,40 @@ macro_rules! base_imp {
}; };
} }
/// Offset between NTP and UNIX epoch in seconds.
/// NTP = UNIX + NTP_UNIX_OFFSET.
const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
/// Reference timestamp meta caps for NTP timestamps.
static NTP_CAPS: LazyLock<gst::Caps> =
LazyLock::new(|| gst::Caps::builder("timestamp/x-ntp").build());
/// Reference timestamp meta caps for UNIX timestamps.
static UNIX_CAPS: LazyLock<gst::Caps> =
LazyLock::new(|| gst::Caps::builder("timestamp/x-unix").build());
/// Returns the UTC time of the buffer in the UNIX epoch.
fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<DateTime<Utc>> {
buffer
.iter_meta::<gst::ReferenceTimestampMeta>()
.find_map(|meta| {
if meta.reference().can_intersect(&UNIX_CAPS) {
Some(meta.timestamp())
} else if meta.reference().can_intersect(&NTP_CAPS) {
meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds())
} else {
None
}
})
.and_then(|clock_time| {
let time_nsec = clock_time.nseconds();
let one_sec = gst::ClockTime::SECOND.nseconds();
let timestamp_secs = (time_nsec / one_sec).try_into().ok()?;
let timestamp_nanos = (time_nsec % one_sec).try_into().ok()?;
DateTime::<Utc>::from_timestamp(timestamp_secs, timestamp_nanos)
})
}
impl From<HlsSink3PlaylistType> for Option<MediaPlaylistType> { impl From<HlsSink3PlaylistType> for Option<MediaPlaylistType> {
fn from(pl_type: HlsSink3PlaylistType) -> Self { fn from(pl_type: HlsSink3PlaylistType) -> Self {
use HlsSink3PlaylistType::*; use HlsSink3PlaylistType::*;
@ -125,6 +160,7 @@ struct HlsSink3State {
fragment_opened_at: Option<gst::ClockTime>, fragment_opened_at: Option<gst::ClockTime>,
fragment_running_time: Option<gst::ClockTime>, fragment_running_time: Option<gst::ClockTime>,
current_segment_location: Option<String>, current_segment_location: Option<String>,
fragment_start_timestamp: Option<DateTime<Utc>>,
} }
#[derive(Default)] #[derive(Default)]
@ -255,6 +291,9 @@ impl ObjectImpl for HlsSink3 {
let sample = args[2].get::<gst::Sample>().unwrap(); let sample = args[2].get::<gst::Sample>().unwrap();
let buffer = sample.buffer(); let buffer = sample.buffer();
let buffer_timestamp = buffer.and_then(get_utc_time_from_buffer);
let running_time = if let Some(buffer) = buffer { let running_time = if let Some(buffer) = buffer {
let segment = sample let segment = sample
.segment() .segment()
@ -272,6 +311,13 @@ impl ObjectImpl for HlsSink3 {
None None
}; };
{
let mut state = imp.state.lock().unwrap();
if state.fragment_start_timestamp.is_none() {
state.fragment_start_timestamp = buffer_timestamp;
}
}
match imp.on_format_location(fragment_id, running_time) { match imp.on_format_location(fragment_id, running_time) {
Ok(segment_location) => Some(segment_location.to_value()), Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => { Err(err) => {
@ -578,6 +624,8 @@ impl HlsSink3 {
}; };
let running_time = state.fragment_running_time; let running_time = state.fragment_running_time;
let fragment_start_timestamp = state.fragment_start_timestamp.take();
drop(state); drop(state);
let obj = self.obj(); let obj = self.obj();
@ -587,6 +635,7 @@ impl HlsSink3 {
&location, &location,
running_time, running_time,
duration, duration,
fragment_start_timestamp,
MediaSegment { MediaSegment {
uri, uri,
duration: duration_msec, duration: duration_msec,

View file

@ -28,6 +28,8 @@ pub fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
{ {
use gst::prelude::*; use gst::prelude::*;
HlsBaseSink::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); HlsBaseSink::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
hlsbasesink::HlsProgramDateTimeReference::static_type()
.mark_as_plugin_api(gst::PluginAPIFlags::empty());
} }
hlssink3::register(plugin)?; hlssink3::register(plugin)?;