ndisrc: Move timestamp handling from demuxer to source

This allows putting correct timestamps on buffers coming out of the
source already instead of leaving them unset until the demuxer.

And also calculate timestamps for metadata buffers.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1718>
This commit is contained in:
Sebastian Dröge 2024-08-07 14:01:01 +03:00 committed by GStreamer Marge Bot
parent 1c48d7065d
commit eb0a44fe67
3 changed files with 532 additions and 508 deletions

View file

@ -1,11 +1,14 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use atomic_refcell::AtomicRefCell;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use std::cmp;
use std::collections::VecDeque;
use std::sync::Mutex; use std::sync::Mutex;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -66,7 +69,10 @@ impl Default for Settings {
#[derive(Default)] #[derive(Default)]
struct State { struct State {
receiver: Option<Receiver>, receiver: Option<Receiver>,
// Audio/video/metadata time observations
timestamp_mode: TimestampMode, timestamp_mode: TimestampMode,
observations_timestamp: [Observations; 3],
observations_timecode: [Observations; 3],
current_latency: Option<gst::ClockTime>, current_latency: Option<gst::ClockTime>,
} }
@ -536,7 +542,58 @@ impl BaseSrcImpl for NdiSrc {
let mut gst_buffer = gst::Buffer::new(); let mut gst_buffer = gst::Buffer::new();
{ {
let buffer_ref = gst_buffer.get_mut().unwrap(); let buffer_ref = gst_buffer.get_mut().unwrap();
NdiSrcMeta::add(buffer_ref, ndi_buffer, state.timestamp_mode); let ((pts, duration, resync), discont) = match ndi_buffer {
Buffer::Audio {
ref frame,
discont,
receive_time_gst,
receive_time_real,
} => (
self.calculate_audio_timestamp(
&mut state,
receive_time_gst,
receive_time_real,
frame,
),
discont,
),
Buffer::Video {
ref frame,
discont,
receive_time_gst,
receive_time_real,
} => (
self.calculate_video_timestamp(
&mut state,
receive_time_gst,
receive_time_real,
frame,
),
discont,
),
Buffer::Metadata {
ref frame,
receive_time_gst,
receive_time_real,
} => (
self.calculate_metadata_timestamp(
&mut state,
receive_time_gst,
receive_time_real,
frame,
),
false,
),
};
buffer_ref.set_pts(pts);
buffer_ref.set_duration(duration);
if resync {
buffer_ref.set_flags(gst::BufferFlags::RESYNC);
}
if discont {
buffer_ref.set_flags(gst::BufferFlags::DISCONT);
}
NdiSrcMeta::add(buffer_ref, ndi_buffer);
} }
drop(state); drop(state);
@ -555,3 +612,465 @@ impl BaseSrcImpl for NdiSrc {
} }
} }
} }
impl NdiSrc {
#[allow(clippy::too_many_arguments)]
fn calculate_timestamp(
&self,
state: &mut State,
idx: usize,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
timestamp: i64,
timecode: i64,
duration: Option<gst::ClockTime>,
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
gst::ClockTime::NONE
} else {
Some((timestamp as u64 * 100).nseconds())
};
let timecode = (timecode as u64 * 100).nseconds();
gst::log!(
CAT,
imp = self,
"Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
timecode,
timestamp.display(),
duration.display(),
receive_time_gst.display(),
receive_time_real,
);
let res_timestamp = state.observations_timestamp[idx].process(
self.obj().upcast_ref(),
timestamp,
receive_time_gst,
duration,
);
let res_timecode = state.observations_timecode[idx].process(
self.obj().upcast_ref(),
Some(timecode),
receive_time_gst,
duration,
);
let (pts, duration, discont) = match state.timestamp_mode {
TimestampMode::ReceiveTimeTimecode => match res_timecode {
Some((pts, duration, discont)) => (pts, duration, discont),
None => {
gst::warning!(CAT, imp = self, "Can't calculate timestamp");
(receive_time_gst, duration, false)
}
},
TimestampMode::ReceiveTimeTimestamp => match res_timestamp {
Some((pts, duration, discont)) => (pts, duration, discont),
None => {
if timestamp.is_some() {
gst::warning!(CAT, imp = self, "Can't calculate timestamp");
}
(receive_time_gst, duration, false)
}
},
TimestampMode::Timecode => (timecode, duration, false),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time_gst, duration, false),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
let timestamp = timestamp.unwrap();
if receive_time_real > timestamp {
let diff = receive_time_real - timestamp;
if diff > receive_time_gst {
(gst::ClockTime::ZERO, duration, false)
} else {
(receive_time_gst - diff, duration, false)
}
} else {
let diff = timestamp - receive_time_real;
(receive_time_gst + diff, duration, false)
}
}
TimestampMode::ReceiveTime => (receive_time_gst, duration, false),
TimestampMode::Auto => {
res_timecode
.or(res_timestamp)
.unwrap_or((receive_time_gst, duration, false))
}
};
gst::log!(
CAT,
imp = self,
"Calculated PTS {}, duration {}",
pts.display(),
duration.display(),
);
(pts, duration, discont)
}
fn calculate_video_timestamp(
&self,
state: &mut State,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
video_frame: &crate::ndi::VideoFrame,
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
let duration = gst::ClockTime::SECOND.mul_div_floor(
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
);
self.calculate_timestamp(
state,
0,
receive_time_gst,
receive_time_real,
video_frame.timestamp(),
video_frame.timecode(),
duration,
)
}
fn calculate_audio_timestamp(
&self,
state: &mut State,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
audio_frame: &crate::ndi::AudioFrame,
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
let duration = gst::ClockTime::SECOND.mul_div_floor(
audio_frame.no_samples() as u64,
audio_frame.sample_rate() as u64,
);
self.calculate_timestamp(
state,
1,
receive_time_gst,
receive_time_real,
audio_frame.timestamp(),
audio_frame.timecode(),
duration,
)
}
fn calculate_metadata_timestamp(
&self,
state: &mut State,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
metadata_frame: &crate::ndi::MetadataFrame,
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
self.calculate_timestamp(
state,
2,
receive_time_gst,
receive_time_real,
ndisys::NDIlib_recv_timestamp_undefined,
metadata_frame.timecode(),
gst::ClockTime::NONE,
)
}
}
const PREFILL_WINDOW_LENGTH: usize = 12;
const WINDOW_LENGTH: u64 = 512;
const WINDOW_DURATION: u64 = 2_000_000_000;
#[derive(Default)]
struct Observations(AtomicRefCell<ObservationsInner>);
struct ObservationsInner {
base_remote_time: Option<u64>,
base_local_time: Option<u64>,
deltas: VecDeque<i64>,
min_delta: i64,
skew: i64,
filling: bool,
window_size: usize,
// Remote/local times for workaround around fundamentally wrong slopes
// This is not reset below and has a bigger window.
times: VecDeque<(u64, u64)>,
slope_correction: (u64, u64),
}
impl Default for ObservationsInner {
fn default() -> ObservationsInner {
ObservationsInner {
base_local_time: None,
base_remote_time: None,
deltas: VecDeque::new(),
min_delta: 0,
skew: 0,
filling: true,
window_size: 0,
times: VecDeque::new(),
slope_correction: (1, 1),
}
}
}
impl ObservationsInner {
fn reset(&mut self) {
self.base_local_time = None;
self.base_remote_time = None;
self.deltas = VecDeque::new();
self.min_delta = 0;
self.skew = 0;
self.filling = true;
self.window_size = 0;
}
}
impl Observations {
// Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
// Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
// http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
fn process(
&self,
element: &gst::Element,
remote_time: Option<gst::ClockTime>,
local_time: gst::ClockTime,
duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let remote_time = remote_time?.nseconds();
let local_time = local_time.nseconds();
let mut inner = self.0.borrow_mut();
gst::trace!(
CAT,
obj = element,
"Local time {}, remote time {}, slope correct {}/{}",
local_time.nseconds(),
remote_time.nseconds(),
inner.slope_correction.0,
inner.slope_correction.1,
);
inner.times.push_back((remote_time, local_time));
while inner
.times
.back()
.unwrap()
.1
.saturating_sub(inner.times.front().unwrap().1)
> WINDOW_DURATION
{
let _ = inner.times.pop_front();
}
// Static remote times
if inner.slope_correction.1 == 0 {
return None;
}
let remote_time =
remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
let (base_remote_time, base_local_time) =
match (inner.base_remote_time, inner.base_local_time) {
(Some(remote), Some(local)) => (remote, local),
_ => {
gst::debug!(
CAT,
obj = element,
"Initializing base time: local {}, remote {}",
local_time.nseconds(),
remote_time.nseconds(),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return Some((local_time.nseconds(), duration, true));
}
};
if inner.times.len() < PREFILL_WINDOW_LENGTH {
return Some((local_time.nseconds(), duration, false));
}
// Check if the slope is simply wrong and try correcting
{
let local_diff = inner
.times
.back()
.unwrap()
.1
.saturating_sub(inner.times.front().unwrap().1);
let remote_diff = inner
.times
.back()
.unwrap()
.0
.saturating_sub(inner.times.front().unwrap().0);
if remote_diff == 0 {
inner.reset();
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
// Static remote times
inner.slope_correction = (0, 0);
return None;
} else {
let slope = local_diff as f64 / remote_diff as f64;
let scaled_slope =
slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
// Check for some obviously wrong slopes and try to correct for that
if !(0.5..1.5).contains(&scaled_slope) {
gst::warning!(
CAT,
obj = element,
"Too small/big slope {}, resetting",
scaled_slope
);
let discont = !inner.deltas.is_empty();
inner.reset();
if (0.0005..0.0015).contains(&slope) {
// Remote unit was actually 0.1ns
inner.slope_correction = (1, 1000);
} else if (0.005..0.015).contains(&slope) {
// Remote unit was actually 1ns
inner.slope_correction = (1, 100);
} else if (0.05..0.15).contains(&slope) {
// Remote unit was actually 10ns
inner.slope_correction = (1, 10);
} else if (5.0..15.0).contains(&slope) {
// Remote unit was actually 1us
inner.slope_correction = (10, 1);
} else if (50.0..150.0).contains(&slope) {
// Remote unit was actually 10us
inner.slope_correction = (100, 1);
} else if (50.0..150.0).contains(&slope) {
// Remote unit was actually 100us
inner.slope_correction = (1000, 1);
} else if (50.0..150.0).contains(&slope) {
// Remote unit was actually 1ms
inner.slope_correction = (10000, 1);
} else {
inner.slope_correction = (1, 1);
}
let remote_time = inner
.times
.back()
.unwrap()
.0
.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
gst::debug!(
CAT,
obj = element,
"Initializing base time: local {}, remote {}, slope correction {}/{}",
local_time.nseconds(),
remote_time.nseconds(),
inner.slope_correction.0,
inner.slope_correction.1,
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return Some((local_time.nseconds(), duration, discont));
}
}
}
let remote_diff = remote_time.saturating_sub(base_remote_time);
let local_diff = local_time.saturating_sub(base_local_time);
let delta = (local_diff as i64) - (remote_diff as i64);
gst::trace!(
CAT,
obj = element,
"Local diff {}, remote diff {}, delta {}",
local_diff.nseconds(),
remote_diff.nseconds(),
delta,
);
if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000)
{
gst::warning!(
CAT,
obj = element,
"Delta {} too far from skew {}, resetting",
delta,
inner.skew
);
let discont = !inner.deltas.is_empty();
gst::debug!(
CAT,
obj = element,
"Initializing base time: local {}, remote {}",
local_time.nseconds(),
remote_time.nseconds(),
);
inner.reset();
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return Some((local_time.nseconds(), duration, discont));
}
if inner.filling {
if inner.deltas.is_empty() || delta < inner.min_delta {
inner.min_delta = delta;
}
inner.deltas.push_back(delta);
if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
inner.window_size = inner.deltas.len();
inner.skew = inner.min_delta;
inner.filling = false;
} else {
let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
let perc_window = (inner.deltas.len() as u64)
.mul_div_floor(100, WINDOW_LENGTH)
.unwrap() as i64;
let perc = cmp::max(perc_time, perc_window);
inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
}
} else {
let old = inner.deltas.pop_front().unwrap();
inner.deltas.push_back(delta);
if delta <= inner.min_delta {
inner.min_delta = delta;
} else if old == inner.min_delta {
inner.min_delta = inner.deltas.iter().copied().min().unwrap();
}
inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
}
let out_time = base_local_time + remote_diff;
let out_time = if inner.skew < 0 {
out_time.saturating_sub((-inner.skew) as u64)
} else {
out_time + (inner.skew as u64)
};
gst::trace!(
CAT,
obj = element,
"Skew {}, min delta {}",
inner.skew,
inner.min_delta
);
gst::trace!(CAT, obj = element, "Outputting {}", out_time.nseconds());
Some((out_time.nseconds(), duration, false))
}
}

View file

@ -1,19 +1,18 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use atomic_refcell::AtomicRefCell;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst_video::prelude::*; use gst_video::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{cmp, collections::VecDeque, sync::Mutex}; use std::sync::Mutex;
use byte_slice_cast::*; use byte_slice_cast::*;
use crate::{ use crate::{
ndi_cc_meta::NDICCMetaDecoder, ndi_cc_meta::NDICCMetaDecoder,
ndisrcmeta::{self, Buffer}, ndisrcmeta::{self, Buffer},
ndisys, TimestampMode, ndisys,
}; };
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -41,11 +40,6 @@ struct State {
ndi_cc_decoder: Option<NDICCMetaDecoder>, ndi_cc_decoder: Option<NDICCMetaDecoder>,
pending_metadata: Vec<crate::ndi::MetadataFrame>, pending_metadata: Vec<crate::ndi::MetadataFrame>,
// Audio/video time observations
timestamp_mode: TimestampMode,
observations_timestamp: [Observations; 2],
observations_timecode: [Observations; 2],
} }
impl Default for State { impl Default for State {
@ -67,10 +61,6 @@ impl Default for State {
ndi_cc_decoder: None, ndi_cc_decoder: None,
pending_metadata: Vec::new(), pending_metadata: Vec::new(),
timestamp_mode: TimestampMode::Auto,
observations_timestamp: [Observations::default(), Observations::default()],
observations_timecode: [Observations::default(), Observations::default()],
} }
} }
} }
@ -212,6 +202,11 @@ impl NdiSrcDemux {
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, imp = self, "Handling buffer {:?}", buffer); gst::log!(CAT, imp = self, "Handling buffer {:?}", buffer);
let pts = buffer.pts();
let duration = buffer.duration();
let resync = buffer.flags().contains(gst::BufferFlags::RESYNC);
let discont = buffer.flags().contains(gst::BufferFlags::DISCONT);
let mut meta = buffer let mut meta = buffer
.make_mut() .make_mut()
.meta_mut::<ndisrcmeta::NdiSrcMeta>() .meta_mut::<ndisrcmeta::NdiSrcMeta>()
@ -490,51 +485,15 @@ impl NdiSrcDemux {
let srcpad; let srcpad;
let buffer; let buffer;
match ndi_buffer { match ndi_buffer {
Buffer::Audio { Buffer::Audio { frame, .. } => {
frame,
discont,
receive_time_gst,
receive_time_real,
} => {
srcpad = state.audio_pad.clone().unwrap(); srcpad = state.audio_pad.clone().unwrap();
let (pts, duration, resync) = self
.calculate_audio_timestamp(
&mut state,
receive_time_gst,
receive_time_real,
&frame,
)
.ok_or_else(|| {
gst::debug!(CAT, imp = self, "Flushing, dropping buffer");
gst::FlowError::Flushing
})?;
buffer = self.create_audio_buffer(&state, pts, duration, discont, resync, frame)?; buffer = self.create_audio_buffer(&state, pts, duration, discont, resync, frame)?;
gst::log!(CAT, imp = self, "Produced audio buffer {:?}", buffer); gst::log!(CAT, imp = self, "Produced audio buffer {:?}", buffer);
} }
Buffer::Video { Buffer::Video { frame, .. } => {
frame,
discont,
receive_time_gst,
receive_time_real,
} => {
srcpad = state.video_pad.clone().unwrap(); srcpad = state.video_pad.clone().unwrap();
let (pts, duration, resync) = self
.calculate_video_timestamp(
&mut state,
receive_time_gst,
receive_time_real,
&frame,
)
.ok_or_else(|| {
gst::debug!(CAT, imp = self, "Flushing, dropping buffer");
gst::FlowError::Flushing
})?;
buffer = buffer =
self.create_video_buffer(&mut state, pts, duration, discont, resync, frame)?; self.create_video_buffer(&mut state, pts, duration, discont, resync, frame)?;
gst::log!(CAT, imp = self, "Produced video buffer {:?}", buffer); gst::log!(CAT, imp = self, "Produced video buffer {:?}", buffer);
} }
Buffer::Metadata { frame, .. } => { Buffer::Metadata { frame, .. } => {
@ -610,126 +569,6 @@ impl NdiSrcDemux {
} }
impl NdiSrcDemux { impl NdiSrcDemux {
#[allow(clippy::too_many_arguments)]
fn calculate_timestamp(
&self,
state: &mut State,
is_audio: bool,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
timestamp: i64,
timecode: i64,
duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
gst::ClockTime::NONE
} else {
Some((timestamp as u64 * 100).nseconds())
};
let timecode = (timecode as u64 * 100).nseconds();
gst::log!(
CAT,
imp = self,
"Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
timecode,
timestamp.display(),
duration.display(),
receive_time_gst.display(),
receive_time_real,
);
let res_timestamp = state.observations_timestamp[usize::from(!is_audio)].process(
self.obj().upcast_ref(),
timestamp,
receive_time_gst,
duration,
);
let res_timecode = state.observations_timecode[usize::from(!is_audio)].process(
self.obj().upcast_ref(),
Some(timecode),
receive_time_gst,
duration,
);
let (pts, duration, discont) = match state.timestamp_mode {
TimestampMode::ReceiveTimeTimecode => match res_timecode {
Some((pts, duration, discont)) => (pts, duration, discont),
None => {
gst::warning!(CAT, imp = self, "Can't calculate timestamp");
(receive_time_gst, duration, false)
}
},
TimestampMode::ReceiveTimeTimestamp => match res_timestamp {
Some((pts, duration, discont)) => (pts, duration, discont),
None => {
if timestamp.is_some() {
gst::warning!(CAT, imp = self, "Can't calculate timestamp");
}
(receive_time_gst, duration, false)
}
},
TimestampMode::Timecode => (timecode, duration, false),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time_gst, duration, false),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
let timestamp = timestamp?;
if receive_time_real > timestamp {
let diff = receive_time_real - timestamp;
if diff > receive_time_gst {
(gst::ClockTime::ZERO, duration, false)
} else {
(receive_time_gst - diff, duration, false)
}
} else {
let diff = timestamp - receive_time_real;
(receive_time_gst + diff, duration, false)
}
}
TimestampMode::ReceiveTime => (receive_time_gst, duration, false),
TimestampMode::Auto => {
res_timecode
.or(res_timestamp)
.unwrap_or((receive_time_gst, duration, false))
}
};
gst::log!(
CAT,
imp = self,
"Calculated PTS {}, duration {}",
pts.display(),
duration.display(),
);
Some((pts, duration, discont))
}
fn calculate_video_timestamp(
&self,
state: &mut State,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
video_frame: &crate::ndi::VideoFrame,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let duration = gst::ClockTime::SECOND.mul_div_floor(
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
);
self.calculate_timestamp(
state,
false,
receive_time_gst,
receive_time_real,
video_frame.timestamp(),
video_frame.timecode(),
duration,
)
}
fn create_video_buffer_pool(&self, video_info: &gst_video::VideoInfo) -> gst::BufferPool { fn create_video_buffer_pool(&self, video_info: &gst_video::VideoInfo) -> gst::BufferPool {
let pool = gst_video::VideoBufferPool::new(); let pool = gst_video::VideoBufferPool::new();
let mut config = pool.config(); let mut config = pool.config();
@ -958,7 +797,7 @@ impl NdiSrcDemux {
fn create_video_buffer( fn create_video_buffer(
&self, &self,
state: &mut State, state: &mut State,
pts: gst::ClockTime, pts: Option<gst::ClockTime>,
duration: Option<gst::ClockTime>, duration: Option<gst::ClockTime>,
discont: bool, discont: bool,
resync: bool, resync: bool,
@ -1356,29 +1195,6 @@ impl NdiSrcDemux {
} }
} }
fn calculate_audio_timestamp(
&self,
state: &mut State,
receive_time_gst: gst::ClockTime,
receive_time_real: gst::ClockTime,
audio_frame: &crate::ndi::AudioFrame,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let duration = gst::ClockTime::SECOND.mul_div_floor(
audio_frame.no_samples() as u64,
audio_frame.sample_rate() as u64,
);
self.calculate_timestamp(
state,
true,
receive_time_gst,
receive_time_real,
audio_frame.timestamp(),
audio_frame.timecode(),
duration,
)
}
fn create_audio_info( fn create_audio_info(
&self, &self,
audio_frame: &crate::ndi::AudioFrame, audio_frame: &crate::ndi::AudioFrame,
@ -1461,7 +1277,7 @@ impl NdiSrcDemux {
fn create_audio_buffer( fn create_audio_buffer(
&self, &self,
state: &State, state: &State,
pts: gst::ClockTime, pts: Option<gst::ClockTime>,
duration: Option<gst::ClockTime>, duration: Option<gst::ClockTime>,
discont: bool, discont: bool,
resync: bool, resync: bool,
@ -1761,302 +1577,3 @@ impl VideoInfo {
} }
} }
} }
const PREFILL_WINDOW_LENGTH: usize = 12;
const WINDOW_LENGTH: u64 = 512;
const WINDOW_DURATION: u64 = 2_000_000_000;
#[derive(Default)]
struct Observations(AtomicRefCell<ObservationsInner>);
struct ObservationsInner {
base_remote_time: Option<u64>,
base_local_time: Option<u64>,
deltas: VecDeque<i64>,
min_delta: i64,
skew: i64,
filling: bool,
window_size: usize,
// Remote/local times for workaround around fundamentally wrong slopes
// This is not reset below and has a bigger window.
times: VecDeque<(u64, u64)>,
slope_correction: (u64, u64),
}
impl Default for ObservationsInner {
fn default() -> ObservationsInner {
ObservationsInner {
base_local_time: None,
base_remote_time: None,
deltas: VecDeque::new(),
min_delta: 0,
skew: 0,
filling: true,
window_size: 0,
times: VecDeque::new(),
slope_correction: (1, 1),
}
}
}
impl ObservationsInner {
fn reset(&mut self) {
self.base_local_time = None;
self.base_remote_time = None;
self.deltas = VecDeque::new();
self.min_delta = 0;
self.skew = 0;
self.filling = true;
self.window_size = 0;
}
}
impl Observations {
// Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
// Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
// http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
fn process(
&self,
element: &gst::Element,
remote_time: Option<gst::ClockTime>,
local_time: gst::ClockTime,
duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let remote_time = remote_time?.nseconds();
let local_time = local_time.nseconds();
let mut inner = self.0.borrow_mut();
gst::trace!(
CAT,
obj = element,
"Local time {}, remote time {}, slope correct {}/{}",
local_time.nseconds(),
remote_time.nseconds(),
inner.slope_correction.0,
inner.slope_correction.1,
);
inner.times.push_back((remote_time, local_time));
while inner
.times
.back()
.unwrap()
.1
.saturating_sub(inner.times.front().unwrap().1)
> WINDOW_DURATION
{
let _ = inner.times.pop_front();
}
// Static remote times
if inner.slope_correction.1 == 0 {
return None;
}
let remote_time =
remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
let (base_remote_time, base_local_time) =
match (inner.base_remote_time, inner.base_local_time) {
(Some(remote), Some(local)) => (remote, local),
_ => {
gst::debug!(
CAT,
obj = element,
"Initializing base time: local {}, remote {}",
local_time.nseconds(),
remote_time.nseconds(),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return Some((local_time.nseconds(), duration, true));
}
};
if inner.times.len() < PREFILL_WINDOW_LENGTH {
return Some((local_time.nseconds(), duration, false));
}
// Check if the slope is simply wrong and try correcting
{
let local_diff = inner
.times
.back()
.unwrap()
.1
.saturating_sub(inner.times.front().unwrap().1);
let remote_diff = inner
.times
.back()
.unwrap()
.0
.saturating_sub(inner.times.front().unwrap().0);
if remote_diff == 0 {
inner.reset();
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
// Static remote times
inner.slope_correction = (0, 0);
return None;
} else {
let slope = local_diff as f64 / remote_diff as f64;
let scaled_slope =
slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
// Check for some obviously wrong slopes and try to correct for that
if !(0.5..1.5).contains(&scaled_slope) {
gst::warning!(
CAT,
obj = element,
"Too small/big slope {}, resetting",
scaled_slope
);
let discont = !inner.deltas.is_empty();
inner.reset();
if (0.0005..0.0015).contains(&slope) {
// Remote unit was actually 0.1ns
inner.slope_correction = (1, 1000);
} else if (0.005..0.015).contains(&slope) {
// Remote unit was actually 1ns
inner.slope_correction = (1, 100);
} else if (0.05..0.15).contains(&slope) {
// Remote unit was actually 10ns
inner.slope_correction = (1, 10);
} else if (5.0..15.0).contains(&slope) {
// Remote unit was actually 1us
inner.slope_correction = (10, 1);
} else if (50.0..150.0).contains(&slope) {
// Remote unit was actually 10us
inner.slope_correction = (100, 1);
} else if (50.0..150.0).contains(&slope) {
// Remote unit was actually 100us
inner.slope_correction = (1000, 1);
} else if (50.0..150.0).contains(&slope) {
// Remote unit was actually 1ms
inner.slope_correction = (10000, 1);
} else {
inner.slope_correction = (1, 1);
}
let remote_time = inner
.times
.back()
.unwrap()
.0
.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
gst::debug!(
CAT,
obj = element,
"Initializing base time: local {}, remote {}, slope correction {}/{}",
local_time.nseconds(),
remote_time.nseconds(),
inner.slope_correction.0,
inner.slope_correction.1,
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return Some((local_time.nseconds(), duration, discont));
}
}
}
let remote_diff = remote_time.saturating_sub(base_remote_time);
let local_diff = local_time.saturating_sub(base_local_time);
let delta = (local_diff as i64) - (remote_diff as i64);
gst::trace!(
CAT,
obj = element,
"Local diff {}, remote diff {}, delta {}",
local_diff.nseconds(),
remote_diff.nseconds(),
delta,
);
if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000)
{
gst::warning!(
CAT,
obj = element,
"Delta {} too far from skew {}, resetting",
delta,
inner.skew
);
let discont = !inner.deltas.is_empty();
gst::debug!(
CAT,
obj = element,
"Initializing base time: local {}, remote {}",
local_time.nseconds(),
remote_time.nseconds(),
);
inner.reset();
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return Some((local_time.nseconds(), duration, discont));
}
if inner.filling {
if inner.deltas.is_empty() || delta < inner.min_delta {
inner.min_delta = delta;
}
inner.deltas.push_back(delta);
if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
inner.window_size = inner.deltas.len();
inner.skew = inner.min_delta;
inner.filling = false;
} else {
let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
let perc_window = (inner.deltas.len() as u64)
.mul_div_floor(100, WINDOW_LENGTH)
.unwrap() as i64;
let perc = cmp::max(perc_time, perc_window);
inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
}
} else {
let old = inner.deltas.pop_front().unwrap();
inner.deltas.push_back(delta);
if delta <= inner.min_delta {
inner.min_delta = delta;
} else if old == inner.min_delta {
inner.min_delta = inner.deltas.iter().copied().min().unwrap();
}
inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
}
let out_time = base_local_time + remote_diff;
let out_time = if inner.skew < 0 {
out_time.saturating_sub((-inner.skew) as u64)
} else {
out_time + (inner.skew as u64)
};
gst::trace!(
CAT,
obj = element,
"Skew {}, min delta {}",
inner.skew,
inner.min_delta
);
gst::trace!(CAT, obj = element, "Outputting {}", out_time.nseconds());
Some((out_time.nseconds(), duration, false))
}
}

View file

@ -5,7 +5,6 @@ use std::fmt;
use std::mem; use std::mem;
use crate::ndi::{AudioFrame, MetadataFrame, VideoFrame}; use crate::ndi::{AudioFrame, MetadataFrame, VideoFrame};
use crate::TimestampMode;
#[repr(transparent)] #[repr(transparent)]
pub struct NdiSrcMeta(imp::NdiSrcMeta); pub struct NdiSrcMeta(imp::NdiSrcMeta);
@ -27,9 +26,7 @@ pub enum Buffer {
}, },
Metadata { Metadata {
frame: MetadataFrame, frame: MetadataFrame,
#[allow(unused)]
receive_time_gst: gst::ClockTime, receive_time_gst: gst::ClockTime,
#[allow(unused)]
receive_time_real: gst::ClockTime, receive_time_real: gst::ClockTime,
}, },
} }
@ -41,15 +38,11 @@ impl NdiSrcMeta {
pub fn add( pub fn add(
buffer: &mut gst::BufferRef, buffer: &mut gst::BufferRef,
ndi_buffer: Buffer, ndi_buffer: Buffer,
timestamp_mode: TimestampMode,
) -> gst::MetaRefMut<Self, gst::meta::Standalone> { ) -> gst::MetaRefMut<Self, gst::meta::Standalone> {
unsafe { unsafe {
// Manually dropping because gst_buffer_add_meta() takes ownership of the // Manually dropping because gst_buffer_add_meta() takes ownership of the
// content of the struct // content of the struct
let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams { let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams { ndi_buffer });
ndi_buffer,
timestamp_mode,
});
let meta = gst::ffi::gst_buffer_add_meta( let meta = gst::ffi::gst_buffer_add_meta(
buffer.as_mut_ptr(), buffer.as_mut_ptr(),
@ -83,8 +76,6 @@ impl fmt::Debug for NdiSrcMeta {
} }
mod imp { mod imp {
use crate::TimestampMode;
use super::Buffer; use super::Buffer;
use glib::translate::*; use glib::translate::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -93,14 +84,12 @@ mod imp {
pub(super) struct NdiSrcMetaParams { pub(super) struct NdiSrcMetaParams {
pub ndi_buffer: Buffer, pub ndi_buffer: Buffer,
pub timestamp_mode: TimestampMode,
} }
#[repr(C)] #[repr(C)]
pub struct NdiSrcMeta { pub struct NdiSrcMeta {
parent: gst::ffi::GstMeta, parent: gst::ffi::GstMeta,
pub(super) ndi_buffer: Option<Buffer>, pub(super) ndi_buffer: Option<Buffer>,
pub(super) timestamp_mode: TimestampMode,
} }
pub(super) fn ndi_src_meta_api_get_type() -> glib::Type { pub(super) fn ndi_src_meta_api_get_type() -> glib::Type {
@ -129,7 +118,6 @@ mod imp {
let params = ptr::read(params as *const NdiSrcMetaParams); let params = ptr::read(params as *const NdiSrcMetaParams);
ptr::write(&mut meta.ndi_buffer, Some(params.ndi_buffer)); ptr::write(&mut meta.ndi_buffer, Some(params.ndi_buffer));
ptr::write(&mut meta.timestamp_mode, params.timestamp_mode);
true.into_glib() true.into_glib()
} }