mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 13:31:00 +00:00
ndisrc: Improve handling of broken sources with regards to timestamping
- NDI HX Camera Android in the past used 1ns instead of 100ns as unit for timecodes/timestamps. - NDI HX Camera iOS uses 0 for all timecodes and the same non-zero value for all audio timestamps Detect such situations and try to compensate for them. Also add a new "auto" timestamping mode that prefers to use timecodes and otherwise falls back to timestamps or receive times. Fixes https://github.com/teltek/gst-plugin-ndi/issues/79
This commit is contained in:
parent
a3c752830b
commit
9c10ba87df
3 changed files with 173 additions and 47 deletions
|
@ -24,16 +24,18 @@ use once_cell::sync::Lazy;
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
#[enum_type(name = "GstNdiTimestampMode")]
|
#[enum_type(name = "GstNdiTimestampMode")]
|
||||||
pub enum TimestampMode {
|
pub enum TimestampMode {
|
||||||
|
#[enum_value(name = "Auto", nick = "auto")]
|
||||||
|
Auto = 0,
|
||||||
#[enum_value(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")]
|
#[enum_value(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")]
|
||||||
ReceiveTimeTimecode = 0,
|
ReceiveTimeTimecode = 1,
|
||||||
#[enum_value(name = "Receive Time / Timestamp", nick = "receive-time-vs-timestamp")]
|
#[enum_value(name = "Receive Time / Timestamp", nick = "receive-time-vs-timestamp")]
|
||||||
ReceiveTimeTimestamp = 1,
|
ReceiveTimeTimestamp = 2,
|
||||||
#[enum_value(name = "NDI Timecode", nick = "timecode")]
|
#[enum_value(name = "NDI Timecode", nick = "timecode")]
|
||||||
Timecode = 2,
|
Timecode = 3,
|
||||||
#[enum_value(name = "NDI Timestamp", nick = "timestamp")]
|
#[enum_value(name = "NDI Timestamp", nick = "timestamp")]
|
||||||
Timestamp = 3,
|
Timestamp = 4,
|
||||||
#[enum_value(name = "Receive Time", nick = "receive-time")]
|
#[enum_value(name = "Receive Time", nick = "receive-time")]
|
||||||
ReceiveTime = 4,
|
ReceiveTime = 5,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
|
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
|
||||||
|
|
|
@ -53,7 +53,7 @@ impl Default for Settings {
|
||||||
max_queue_length: 10,
|
max_queue_length: 10,
|
||||||
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
|
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
|
||||||
color_format: RecvColorFormat::UyvyBgra,
|
color_format: RecvColorFormat::UyvyBgra,
|
||||||
timestamp_mode: TimestampMode::ReceiveTimeTimecode,
|
timestamp_mode: TimestampMode::Auto,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Receiver(Arc<ReceiverInner>);
|
pub struct Receiver(Arc<ReceiverInner>);
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
@ -226,6 +225,7 @@ struct ReceiverQueueInner {
|
||||||
timeout: bool,
|
timeout: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const PREFILL_WINDOW_LENGTH: usize = 12;
|
||||||
const WINDOW_LENGTH: u64 = 512;
|
const WINDOW_LENGTH: u64 = 512;
|
||||||
const WINDOW_DURATION: u64 = 2_000_000_000;
|
const WINDOW_DURATION: u64 = 2_000_000_000;
|
||||||
|
|
||||||
|
@ -240,6 +240,11 @@ struct ObservationsInner {
|
||||||
skew: i64,
|
skew: i64,
|
||||||
filling: bool,
|
filling: bool,
|
||||||
window_size: usize,
|
window_size: usize,
|
||||||
|
|
||||||
|
// Remote/local times for workaround around fundamentally wrong slopes
|
||||||
|
// This is not reset below and has a bigger window.
|
||||||
|
times: VecDeque<(u64, u64)>,
|
||||||
|
slope_correction: (u64, u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ObservationsInner {
|
impl Default for ObservationsInner {
|
||||||
|
@ -252,10 +257,24 @@ impl Default for ObservationsInner {
|
||||||
skew: 0,
|
skew: 0,
|
||||||
filling: true,
|
filling: true,
|
||||||
window_size: 0,
|
window_size: 0,
|
||||||
|
times: VecDeque::new(),
|
||||||
|
slope_correction: (1, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ObservationsInner {
|
||||||
|
fn reset(&mut self) {
|
||||||
|
self.base_local_time = None;
|
||||||
|
self.base_remote_time = None;
|
||||||
|
self.deltas = VecDeque::new();
|
||||||
|
self.min_delta = 0;
|
||||||
|
self.skew = 0;
|
||||||
|
self.filling = true;
|
||||||
|
self.window_size = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Observations {
|
impl Observations {
|
||||||
// Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
|
// Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
|
||||||
// Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
|
// Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
|
||||||
|
@ -266,22 +285,41 @@ impl Observations {
|
||||||
remote_time: Option<gst::ClockTime>,
|
remote_time: Option<gst::ClockTime>,
|
||||||
local_time: gst::ClockTime,
|
local_time: gst::ClockTime,
|
||||||
duration: Option<gst::ClockTime>,
|
duration: Option<gst::ClockTime>,
|
||||||
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
|
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
|
||||||
let remote_time = match remote_time {
|
let remote_time = remote_time?.nseconds();
|
||||||
None => return (local_time, duration, false),
|
|
||||||
Some(remote_time) => remote_time.nseconds(),
|
|
||||||
};
|
|
||||||
let local_time = local_time.nseconds();
|
let local_time = local_time.nseconds();
|
||||||
|
|
||||||
|
let mut inner = self.0.borrow_mut();
|
||||||
|
|
||||||
gst_trace!(
|
gst_trace!(
|
||||||
CAT,
|
CAT,
|
||||||
obj: element,
|
obj: element,
|
||||||
"Local time {}, remote time {}",
|
"Local time {}, remote time {}, slope correct {}/{}",
|
||||||
gst::ClockTime::from_nseconds(local_time),
|
gst::ClockTime::from_nseconds(local_time),
|
||||||
gst::ClockTime::from_nseconds(remote_time),
|
gst::ClockTime::from_nseconds(remote_time),
|
||||||
|
inner.slope_correction.0,
|
||||||
|
inner.slope_correction.1,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut inner = self.0.borrow_mut();
|
inner.times.push_back((remote_time, local_time));
|
||||||
|
while inner
|
||||||
|
.times
|
||||||
|
.back()
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.saturating_sub(inner.times.front().unwrap().1)
|
||||||
|
> WINDOW_DURATION
|
||||||
|
{
|
||||||
|
let _ = inner.times.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Static remote times
|
||||||
|
if inner.slope_correction.1 == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let remote_time =
|
||||||
|
remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
|
||||||
|
|
||||||
let (base_remote_time, base_local_time) =
|
let (base_remote_time, base_local_time) =
|
||||||
match (inner.base_remote_time, inner.base_local_time) {
|
match (inner.base_remote_time, inner.base_local_time) {
|
||||||
|
@ -297,10 +335,102 @@ impl Observations {
|
||||||
inner.base_remote_time = Some(remote_time);
|
inner.base_remote_time = Some(remote_time);
|
||||||
inner.base_local_time = Some(local_time);
|
inner.base_local_time = Some(local_time);
|
||||||
|
|
||||||
return (gst::ClockTime::from_nseconds(local_time), duration, true);
|
return Some((gst::ClockTime::from_nseconds(local_time), duration, true));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if inner.times.len() < PREFILL_WINDOW_LENGTH {
|
||||||
|
return Some((gst::ClockTime::from_nseconds(local_time), duration, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the slope is simply wrong and try correcting
|
||||||
|
{
|
||||||
|
let local_diff = inner
|
||||||
|
.times
|
||||||
|
.back()
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.saturating_sub(inner.times.front().unwrap().1);
|
||||||
|
let remote_diff = inner
|
||||||
|
.times
|
||||||
|
.back()
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.saturating_sub(inner.times.front().unwrap().0);
|
||||||
|
|
||||||
|
if remote_diff == 0 {
|
||||||
|
inner.reset();
|
||||||
|
inner.base_remote_time = Some(remote_time);
|
||||||
|
inner.base_local_time = Some(local_time);
|
||||||
|
|
||||||
|
// Static remote times
|
||||||
|
inner.slope_correction = (0, 0);
|
||||||
|
return None;
|
||||||
|
} else {
|
||||||
|
let slope = local_diff as f64 / remote_diff as f64;
|
||||||
|
let scaled_slope =
|
||||||
|
slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
|
||||||
|
|
||||||
|
// Check for some obviously wrong slopes and try to correct for that
|
||||||
|
if !(0.5..1.5).contains(&scaled_slope) {
|
||||||
|
gst_warning!(
|
||||||
|
CAT,
|
||||||
|
obj: element,
|
||||||
|
"Too small/big slope {}, resetting",
|
||||||
|
scaled_slope
|
||||||
|
);
|
||||||
|
|
||||||
|
let discont = !inner.deltas.is_empty();
|
||||||
|
inner.reset();
|
||||||
|
|
||||||
|
if (0.0005..0.0015).contains(&slope) {
|
||||||
|
// Remote unit was actually 0.1ns
|
||||||
|
inner.slope_correction = (1, 1000);
|
||||||
|
} else if (0.005..0.015).contains(&slope) {
|
||||||
|
// Remote unit was actually 1ns
|
||||||
|
inner.slope_correction = (1, 100);
|
||||||
|
} else if (0.05..0.15).contains(&slope) {
|
||||||
|
// Remote unit was actually 10ns
|
||||||
|
inner.slope_correction = (1, 10);
|
||||||
|
} else if (5.0..15.0).contains(&slope) {
|
||||||
|
// Remote unit was actually 1us
|
||||||
|
inner.slope_correction = (10, 1);
|
||||||
|
} else if (50.0..150.0).contains(&slope) {
|
||||||
|
// Remote unit was actually 10us
|
||||||
|
inner.slope_correction = (100, 1);
|
||||||
|
} else if (50.0..150.0).contains(&slope) {
|
||||||
|
// Remote unit was actually 100us
|
||||||
|
inner.slope_correction = (1000, 1);
|
||||||
|
} else if (50.0..150.0).contains(&slope) {
|
||||||
|
// Remote unit was actually 1ms
|
||||||
|
inner.slope_correction = (10000, 1);
|
||||||
|
} else {
|
||||||
|
inner.slope_correction = (1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let remote_time = inner
|
||||||
|
.times
|
||||||
|
.back()
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
|
||||||
|
gst_debug!(
|
||||||
|
CAT,
|
||||||
|
obj: element,
|
||||||
|
"Initializing base time: local {}, remote {}, slope correction {}/{}",
|
||||||
|
gst::ClockTime::from_nseconds(local_time),
|
||||||
|
gst::ClockTime::from_nseconds(remote_time),
|
||||||
|
inner.slope_correction.0,
|
||||||
|
inner.slope_correction.1,
|
||||||
|
);
|
||||||
|
inner.base_remote_time = Some(remote_time);
|
||||||
|
inner.base_local_time = Some(local_time);
|
||||||
|
|
||||||
|
return Some((gst::ClockTime::from_nseconds(local_time), duration, discont));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let remote_diff = remote_time.saturating_sub(base_remote_time);
|
let remote_diff = remote_time.saturating_sub(base_remote_time);
|
||||||
let local_diff = local_time.saturating_sub(base_local_time);
|
let local_diff = local_time.saturating_sub(base_local_time);
|
||||||
let delta = (local_diff as i64) - (remote_diff as i64);
|
let delta = (local_diff as i64) - (remote_diff as i64);
|
||||||
|
@ -314,33 +444,6 @@ impl Observations {
|
||||||
delta,
|
delta,
|
||||||
);
|
);
|
||||||
|
|
||||||
if remote_diff > 0 && local_diff > 0 {
|
|
||||||
let slope = (local_diff as f64) / (remote_diff as f64);
|
|
||||||
if !(0.8..1.2).contains(&slope) {
|
|
||||||
gst_warning!(
|
|
||||||
CAT,
|
|
||||||
obj: element,
|
|
||||||
"Too small/big slope {}, resetting",
|
|
||||||
slope
|
|
||||||
);
|
|
||||||
|
|
||||||
let discont = !inner.deltas.is_empty();
|
|
||||||
*inner = ObservationsInner::default();
|
|
||||||
|
|
||||||
gst_debug!(
|
|
||||||
CAT,
|
|
||||||
obj: element,
|
|
||||||
"Initializing base time: local {}, remote {}",
|
|
||||||
gst::ClockTime::from_nseconds(local_time),
|
|
||||||
gst::ClockTime::from_nseconds(remote_time),
|
|
||||||
);
|
|
||||||
inner.base_remote_time = Some(remote_time);
|
|
||||||
inner.base_local_time = Some(local_time);
|
|
||||||
|
|
||||||
return (gst::ClockTime::from_nseconds(local_time), duration, discont);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|
if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|
||||||
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000)
|
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000)
|
||||||
{
|
{
|
||||||
|
@ -353,7 +456,6 @@ impl Observations {
|
||||||
);
|
);
|
||||||
|
|
||||||
let discont = !inner.deltas.is_empty();
|
let discont = !inner.deltas.is_empty();
|
||||||
*inner = ObservationsInner::default();
|
|
||||||
|
|
||||||
gst_debug!(
|
gst_debug!(
|
||||||
CAT,
|
CAT,
|
||||||
|
@ -362,10 +464,12 @@ impl Observations {
|
||||||
gst::ClockTime::from_nseconds(local_time),
|
gst::ClockTime::from_nseconds(local_time),
|
||||||
gst::ClockTime::from_nseconds(remote_time),
|
gst::ClockTime::from_nseconds(remote_time),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
inner.reset();
|
||||||
inner.base_remote_time = Some(remote_time);
|
inner.base_remote_time = Some(remote_time);
|
||||||
inner.base_local_time = Some(local_time);
|
inner.base_local_time = Some(local_time);
|
||||||
|
|
||||||
return (gst::ClockTime::from_nseconds(local_time), duration, discont);
|
return Some((gst::ClockTime::from_nseconds(local_time), duration, discont));
|
||||||
}
|
}
|
||||||
|
|
||||||
if inner.filling {
|
if inner.filling {
|
||||||
|
@ -421,7 +525,7 @@ impl Observations {
|
||||||
gst::ClockTime::from_nseconds(out_time)
|
gst::ClockTime::from_nseconds(out_time)
|
||||||
);
|
);
|
||||||
|
|
||||||
(gst::ClockTime::from_nseconds(out_time), duration, false)
|
Some((gst::ClockTime::from_nseconds(out_time), duration, false))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -820,8 +924,23 @@ impl Receiver {
|
||||||
);
|
);
|
||||||
|
|
||||||
let (pts, duration, discont) = match self.0.timestamp_mode {
|
let (pts, duration, discont) = match self.0.timestamp_mode {
|
||||||
TimestampMode::ReceiveTimeTimecode => res_timecode,
|
TimestampMode::ReceiveTimeTimecode => match res_timecode {
|
||||||
TimestampMode::ReceiveTimeTimestamp => res_timestamp,
|
Some((pts, duration, discont)) => (pts, duration, discont),
|
||||||
|
None => {
|
||||||
|
gst_warning!(CAT, obj: element, "Can't calculate timestamp");
|
||||||
|
(receive_time, duration, false)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
TimestampMode::ReceiveTimeTimestamp => match res_timestamp {
|
||||||
|
Some((pts, duration, discont)) => (pts, duration, discont),
|
||||||
|
None => {
|
||||||
|
if timestamp.is_some() {
|
||||||
|
gst_warning!(CAT, obj: element, "Can't calculate timestamp");
|
||||||
|
}
|
||||||
|
|
||||||
|
(receive_time, duration, false)
|
||||||
|
}
|
||||||
|
},
|
||||||
TimestampMode::Timecode => (timecode, duration, false),
|
TimestampMode::Timecode => (timecode, duration, false),
|
||||||
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false),
|
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false),
|
||||||
TimestampMode::Timestamp => {
|
TimestampMode::Timestamp => {
|
||||||
|
@ -840,6 +959,11 @@ impl Receiver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TimestampMode::ReceiveTime => (receive_time, duration, false),
|
TimestampMode::ReceiveTime => (receive_time, duration, false),
|
||||||
|
TimestampMode::Auto => {
|
||||||
|
res_timecode
|
||||||
|
.or(res_timestamp)
|
||||||
|
.unwrap_or((receive_time, duration, false))
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
gst_log!(
|
gst_log!(
|
||||||
|
|
Loading…
Reference in a new issue