Merge pull request #73 from sdroege/ndisrc-timestamp-tracking-improvements

Various improvements to timestamp tracking code in the source
This commit is contained in:
Samuel Alonso Rodriguez 2021-10-01 10:50:36 +02:00 committed by GitHub
commit c98b626f69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 239 additions and 199 deletions

View file

@ -32,6 +32,8 @@ pub enum TimestampMode {
Timecode = 2,
#[genum(name = "NDI Timestamp", nick = "timestamp")]
Timestamp = 3,
#[genum(name = "Receive Time", nick = "receive-time")]
ReceiveTime = 4,
}
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {

View file

@ -475,7 +475,10 @@ impl BaseSrcImpl for NdiSrc {
let settings = self.settings.lock().unwrap();
if let Some(latency) = state.current_latency {
let min = if settings.timestamp_mode != TimestampMode::Timecode {
let min = if matches!(
settings.timestamp_mode,
TimestampMode::ReceiveTimeTimecode | TimestampMode::ReceiveTimeTimestamp
) {
latency
} else {
gst::ClockTime::ZERO

View file

@ -1,6 +1,6 @@
use glib::prelude::*;
use gst::prelude::*;
use gst::{gst_debug, gst_error, gst_log, gst_warning};
use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning};
use gst_video::prelude::*;
use byte_slice_cast::AsMutSliceOf;
@ -74,29 +74,34 @@ struct ReceiverQueueInner {
timeout: bool,
}
// 100 frames observations window over which we calculate the timestamp drift
// between sender and receiver. A bigger window allows more smoothing out of
// network effects
const WINDOW_LENGTH: usize = 100;
const WINDOW_LENGTH: u64 = 512;
const WINDOW_DURATION: u64 = 2_000_000_000;
#[derive(Clone)]
struct Observations(Arc<Mutex<ObservationsInner>>);
struct ObservationsInner {
// NDI timestamp - GStreamer clock time tuples
values: Vec<(u64, u64)>,
values_tmp: [(u64, u64); WINDOW_LENGTH],
current_mapping: TimeMapping,
next_mapping: TimeMapping,
time_mapping_pending: bool,
// How many frames we skipped since last observation
// we took
skip_count: usize,
// How many frames we skip in this period. once skip_count
// reaches this, we take another observation
skip_period: usize,
// How many observations are left until we update the skip_period
// again. This is always initialized to WINDOW_LENGTH
skip_period_update_in: usize,
struct ObservationsInner {
base_remote_time: Option<u64>,
base_local_time: Option<u64>,
deltas: VecDeque<i64>,
min_delta: i64,
skew: i64,
filling: bool,
window_size: usize,
}
impl Default for ObservationsInner {
fn default() -> ObservationsInner {
ObservationsInner {
base_local_time: None,
base_remote_time: None,
deltas: VecDeque::new(),
min_delta: 0,
skew: 0,
filling: true,
window_size: 0,
}
}
}
#[derive(Clone, Copy, Debug)]
@ -109,174 +114,175 @@ struct TimeMapping {
impl Observations {
fn new() -> Self {
Self(Arc::new(Mutex::new(ObservationsInner {
values: Vec::with_capacity(WINDOW_LENGTH),
values_tmp: [(0, 0); WINDOW_LENGTH],
current_mapping: TimeMapping::default(),
next_mapping: TimeMapping::default(),
time_mapping_pending: false,
skip_count: 0,
skip_period: 1,
skip_period_update_in: WINDOW_LENGTH,
})))
Self(Arc::new(Mutex::new(ObservationsInner::default())))
}
// Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
// Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
// http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
fn process(
&self,
element: &gst_base::BaseSrc,
time: (Option<gst::ClockTime>, gst::ClockTime),
duration: Option<gst::ClockTime>,
) -> (gst::ClockTime, Option<gst::ClockTime>) {
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
if time.0.is_none() {
return (time.1, duration);
return (time.1, duration, false);
}
let time = (time.0.unwrap(), time.1);
let remote_time = time.0.nseconds();
let local_time = time.1.nseconds();
let mut inner = self.0.lock().unwrap();
let ObservationsInner {
ref mut values,
ref mut values_tmp,
ref mut current_mapping,
ref mut next_mapping,
ref mut time_mapping_pending,
ref mut skip_count,
ref mut skip_period,
ref mut skip_period_update_in,
} = *inner;
if values.is_empty() {
current_mapping.xbase = time.0.nseconds();
current_mapping.b = time.1.nseconds();
current_mapping.num = 1;
current_mapping.den = 1;
}
if *skip_count == 0 {
*skip_count += 1;
if *skip_count >= *skip_period {
*skip_count = 0;
}
*skip_period_update_in -= 1;
if *skip_period_update_in == 0 {
*skip_period_update_in = WINDOW_LENGTH;
// Start by first updating every frame, then every second frame, then every third
// frame, etc. until we update once every quarter second
let framerate = gst::ClockTime::SECOND
.checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds())
.unwrap_or(25) as usize;
if *skip_period < framerate / 4 + 1 {
*skip_period += 1;
} else {
*skip_period = framerate / 4 + 1;
}
}
assert!(values.len() <= WINDOW_LENGTH);
if values.len() == WINDOW_LENGTH {
values.remove(0);
}
values.push((time.0.nseconds(), time.1.nseconds()));
if let Some((num, den, b, xbase, r_squared)) =
gst::calculate_linear_regression(values, Some(values_tmp))
{
next_mapping.xbase = xbase;
next_mapping.b = b;
next_mapping.num = num;
next_mapping.den = den;
*time_mapping_pending = true;
gst_debug!(
CAT,
obj: element,
"Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})",
next_mapping.num as f64 / next_mapping.den as f64,
gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from_nseconds(next_mapping.b),
r_squared,
);
}
} else {
*skip_count += 1;
if *skip_count >= *skip_period {
*skip_count = 0;
}
}
if *time_mapping_pending {
let expected = gst::Clock::adjust_with_calibration(
time.0,
gst::ClockTime::from_nseconds(current_mapping.xbase),
gst::ClockTime::from_nseconds(current_mapping.b),
gst::ClockTime::from_nseconds(current_mapping.num),
gst::ClockTime::from_nseconds(current_mapping.den),
);
let new_calculated = gst::Clock::adjust_with_calibration(
time.0,
gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from_nseconds(next_mapping.b),
gst::ClockTime::from_nseconds(next_mapping.num),
gst::ClockTime::from_nseconds(next_mapping.den),
);
let diff = if new_calculated > expected {
new_calculated - expected
} else {
expected - new_calculated
};
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND),
2 * gst::ClockTime::MSECOND,
);
if diff > max_diff {
gst_debug!(
CAT,
obj: element,
"New time mapping causes difference {} but only {} allowed",
diff,
max_diff,
);
if new_calculated > expected {
current_mapping.b = (expected + max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
} else {
current_mapping.b = (expected - max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
}
} else {
*current_mapping = *next_mapping;
}
}
let converted_timestamp = gst::Clock::adjust_with_calibration(
time.0,
gst::ClockTime::from_nseconds(current_mapping.xbase),
gst::ClockTime::from_nseconds(current_mapping.b),
gst::ClockTime::from_nseconds(current_mapping.num),
gst::ClockTime::from_nseconds(current_mapping.den),
);
let converted_duration =
duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den));
gst_debug!(
gst_trace!(
CAT,
obj: element,
"Converted timestamp {}/{} to {}, duration {} to {}",
time.0,
time.1,
converted_timestamp.display(),
duration.display(),
converted_duration.display(),
"Local time {}, remote time {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
(converted_timestamp, converted_duration)
let mut inner = self.0.lock().unwrap();
let (base_remote_time, base_local_time) =
match (inner.base_remote_time, inner.base_local_time) {
(Some(remote), Some(local)) => (remote, local),
_ => {
gst_debug!(
CAT,
obj: element,
"Initializing base time: local {}, remote {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return (gst::ClockTime::from_nseconds(local_time), duration, true);
}
};
let remote_diff = remote_time.saturating_sub(base_remote_time);
let local_diff = local_time.saturating_sub(base_local_time);
let delta = (local_diff as i64) - (remote_diff as i64);
gst_trace!(
CAT,
obj: element,
"Local diff {}, remote diff {}, delta {}",
gst::ClockTime::from_nseconds(local_diff),
gst::ClockTime::from_nseconds(remote_diff),
delta,
);
if remote_diff > 0 && local_diff > 0 {
let slope = (local_diff as f64) / (remote_diff as f64);
if slope < 0.8 || slope > 1.2 {
gst_warning!(
CAT,
obj: element,
"Too small/big slope {}, resetting",
slope
);
let discont = !inner.deltas.is_empty();
*inner = ObservationsInner::default();
gst_debug!(
CAT,
obj: element,
"Initializing base time: local {}, remote {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return (gst::ClockTime::from_nseconds(local_time), duration, discont);
}
}
if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000)
{
gst_warning!(
CAT,
obj: element,
"Delta {} too far from skew {}, resetting",
delta,
inner.skew
);
let discont = !inner.deltas.is_empty();
*inner = ObservationsInner::default();
gst_debug!(
CAT,
obj: element,
"Initializing base time: local {}, remote {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return (gst::ClockTime::from_nseconds(local_time), duration, discont);
}
if inner.filling {
if inner.deltas.is_empty() || delta < inner.min_delta {
inner.min_delta = delta;
}
inner.deltas.push_back(delta);
if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
inner.window_size = inner.deltas.len();
inner.skew = inner.min_delta;
inner.filling = false;
} else {
let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
let perc_window = (inner.deltas.len() as u64)
.mul_div_floor(100, WINDOW_LENGTH)
.unwrap() as i64;
let perc = cmp::max(perc_time, perc_window);
inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
}
} else {
let old = inner.deltas.pop_front().unwrap();
inner.deltas.push_back(delta);
if delta <= inner.min_delta {
inner.min_delta = delta;
} else if old == inner.min_delta {
inner.min_delta = inner.deltas.iter().copied().min().unwrap();
}
inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
}
let out_time = base_local_time + remote_diff;
let out_time = if inner.skew < 0 {
out_time.saturating_sub((-inner.skew) as u64)
} else {
out_time + (inner.skew as u64)
};
gst_trace!(
CAT,
obj: element,
"Skew {}, min delta {}",
inner.skew,
inner.min_delta
);
gst_trace!(
CAT,
obj: element,
"Outputting {}",
gst::ClockTime::from_nseconds(out_time)
);
(gst::ClockTime::from_nseconds(out_time), duration, false)
}
}
@ -285,8 +291,8 @@ impl Default for TimeMapping {
Self {
xbase: 0,
b: 0,
num: 1,
den: 1,
num: 0,
den: 0,
}
}
}
@ -493,6 +499,8 @@ impl Receiver {
}
fn receive_thread(receiver: &Weak<ReceiverInner>, recv: RecvInstance) {
let mut first_video_frame = true;
let mut first_audio_frame = true;
let mut first_frame = true;
let mut timer = time::Instant::now();
@ -553,11 +561,31 @@ impl Receiver {
}
Ok(Some(Frame::Video(frame))) => {
first_frame = false;
receiver.create_video_buffer_and_info(&element, frame)
let mut buffer = receiver.create_video_buffer_and_info(&element, frame);
if first_video_frame {
if let Ok(Buffer::Video(ref mut buffer, _)) = buffer {
buffer
.get_mut()
.unwrap()
.set_flags(gst::BufferFlags::DISCONT);
first_video_frame = false;
}
}
buffer
}
Ok(Some(Frame::Audio(frame))) => {
first_frame = false;
receiver.create_audio_buffer_and_info(&element, frame)
let mut buffer = receiver.create_audio_buffer_and_info(&element, frame);
if first_audio_frame {
if let Ok(Buffer::Video(ref mut buffer, _)) = buffer {
buffer
.get_mut()
.unwrap()
.set_flags(gst::BufferFlags::DISCONT);
first_audio_frame = false;
}
}
buffer
}
Ok(Some(Frame::Metadata(frame))) => {
if let Some(metadata) = frame.metadata() {
@ -623,14 +651,8 @@ impl Receiver {
timestamp: i64,
timecode: i64,
duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let clock = element.clock()?;
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.time()?;
let base_time = element.base_time()?;
let receive_time = now - base_time;
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let receive_time = element.current_running_time()?;
let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000);
let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
@ -651,7 +673,7 @@ impl Receiver {
real_time_now,
);
let (pts, duration) = match self.0.timestamp_mode {
let (pts, duration, discont) = match self.0.timestamp_mode {
TimestampMode::ReceiveTimeTimecode => {
self.0
.observations
@ -662,23 +684,24 @@ impl Receiver {
.observations
.process(element, (timestamp, receive_time), duration)
}
TimestampMode::Timecode => (timecode, duration),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
TimestampMode::Timecode => (timecode, duration, false),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
let timestamp = timestamp?;
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
(gst::ClockTime::ZERO, duration)
(gst::ClockTime::ZERO, duration, false)
} else {
(receive_time - diff, duration)
(receive_time - diff, duration, false)
}
} else {
let diff = timestamp - real_time_now;
(receive_time + diff, duration)
(receive_time + diff, duration, false)
}
}
TimestampMode::ReceiveTime => (receive_time, duration, false),
};
gst_log!(
@ -689,7 +712,7 @@ impl Receiver {
duration.display(),
);
Some((pts, duration))
Some((pts, duration, discont))
}
fn create_video_buffer_and_info(
@ -699,7 +722,7 @@ impl Receiver {
) -> Result<Buffer, gst::FlowError> {
gst_debug!(CAT, obj: element, "Received video frame {:?}", video_frame);
let (pts, duration) = self
let (pts, duration, discont) = self
.calculate_video_timestamp(element, &video_frame)
.ok_or_else(|| {
gst_debug!(CAT, obj: element, "Flushing, dropping buffer");
@ -708,7 +731,13 @@ impl Receiver {
let info = self.create_video_info(element, &video_frame)?;
let buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame);
let mut buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame);
if discont {
buffer
.get_mut()
.unwrap()
.set_flags(gst::BufferFlags::RESYNC);
}
gst_log!(CAT, obj: element, "Produced video buffer {:?}", buffer);
@ -719,7 +748,7 @@ impl Receiver {
&self,
element: &gst_base::BaseSrc,
video_frame: &VideoFrame,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let duration = gst::ClockTime::SECOND.mul_div_floor(
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
@ -1055,7 +1084,7 @@ impl Receiver {
) -> Result<Buffer, gst::FlowError> {
gst_debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame);
let (pts, duration) = self
let (pts, duration, discont) = self
.calculate_audio_timestamp(element, &audio_frame)
.ok_or_else(|| {
gst_debug!(CAT, obj: element, "Flushing, dropping buffer");
@ -1064,7 +1093,13 @@ impl Receiver {
let info = self.create_audio_info(element, &audio_frame)?;
let buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame);
let mut buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame);
if discont {
buffer
.get_mut()
.unwrap()
.set_flags(gst::BufferFlags::RESYNC);
}
gst_log!(CAT, obj: element, "Produced audio buffer {:?}", buffer);
@ -1075,7 +1110,7 @@ impl Receiver {
&self,
element: &gst_base::BaseSrc,
audio_frame: &AudioFrame,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let duration = gst::ClockTime::SECOND.mul_div_floor(
audio_frame.no_samples() as u64,
audio_frame.sample_rate() as u64,