From dd0b86bf11ef23c56bdc1879c66e1fe6338f876e Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 12 May 2022 01:09:58 +0200 Subject: [PATCH] ts-jitterbuffer: improve scheduling of lost events When we have detected that packets are equidistant and have determined a packet spacing, we can schedule lost events "on time" instead of pushing them at the same time as the next received packet. Part-of: --- generic/threadshare/src/jitterbuffer/ffi.rs | 1 + generic/threadshare/src/jitterbuffer/imp.rs | 287 +++++++++++++++--- .../src/jitterbuffer/jitterbuffer.rs | 4 + 3 files changed, 255 insertions(+), 37 deletions(-) diff --git a/generic/threadshare/src/jitterbuffer/ffi.rs b/generic/threadshare/src/jitterbuffer/ffi.rs index 4bde73ef3..006cd5b3f 100644 --- a/generic/threadshare/src/jitterbuffer/ffi.rs +++ b/generic/threadshare/src/jitterbuffer/ffi.rs @@ -86,6 +86,7 @@ extern "C" { gap: c_int, is_rtx: gboolean, ) -> GstClockTime; + pub fn ts_rtp_jitter_buffer_num_packets(jbuf: *mut RTPJitterBuffer) -> c_uint; pub fn ts_rtp_jitter_buffer_insert( jbuf: *mut RTPJitterBuffer, item: *mut RTPJitterBufferItem, diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 45ef86e53..a147a2cd0 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -153,6 +153,7 @@ impl SinkHandler { state.last_popped_seqnum = None; state.last_popped_pts = None; + state.last_popped_buffer_pts = None; inner.last_in_seqnum = None; inner.last_rtptime = None; @@ -515,15 +516,25 @@ impl SinkHandler { let mut state = jb.state.lock().unwrap(); - let (latency, context_wait) = { + let (latency, context_wait, do_lost, max_dropout_time) = { let settings = jb.settings.lock().unwrap(); - (settings.latency, settings.context_wait) + ( + settings.latency, + settings.context_wait, + settings.do_lost, + gst::ClockTime::from_mseconds(settings.max_dropout_time as u64), + ) }; // Reschedule if needed - let (_, next_wakeup) = - jb.src_pad_handler - .next_wakeup(&jb.obj(), &state, latency, context_wait); + let (_, next_wakeup) = jb.src_pad_handler.next_wakeup( + &jb.obj(), + &state, + do_lost, + latency, + context_wait, + max_dropout_time, + ); if let Some((next_wakeup, _)) = next_wakeup { if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { if previous_next_wakeup.is_none() @@ -650,6 +661,7 @@ impl SrcHandler { pts: impl Into>, discont: &mut bool, ) -> Vec { + let pts = pts.into(); let (latency, do_lost) = { let jb = element.imp(); let settings = jb.settings.lock().unwrap(); @@ -678,8 +690,13 @@ impl SrcHandler { let gap = gap as u64; // FIXME reason why we can expect Some for the 2 lines below let mut last_popped_pts = state.last_popped_pts.unwrap(); - let interval = pts.into().unwrap().saturating_sub(last_popped_pts); - let spacing = interval / (gap + 1); + + let spacing = if state.equidistant > 3 { + state.packet_spacing + } else { + let interval = pts.unwrap().saturating_sub(last_popped_pts); + interval / (gap + 1) + }; *discont = true; @@ -711,6 +728,10 @@ impl SrcHandler { gst::ClockTime::ZERO }; + if timestamp.opt_gt(pts).unwrap_or(false) { + break; + } + state.last_popped_pts = Some(timestamp); if do_lost { @@ -790,6 +811,7 @@ impl SrcHandler { }; state.last_popped_pts = buffer.pts(); + state.last_popped_buffer_pts = buffer.pts(); if state.last_popped_pts.is_some() { state.position = state.last_popped_pts; } @@ -821,12 +843,69 @@ impl SrcHandler { jb.src_pad.push(buffer).await } + // If there is a gap between the next seqnum we must output and the seqnum of + // the earliest item currently stored, we may want to wake up earlier in order + // to push the corresponding lost event, provided we are reasonably sure that + // packets are equidistant and we have calculated a packet spacing. + fn next_lost_wakeup( + &self, + state: &State, + do_lost: bool, + latency: gst::ClockTime, + context_wait: gst::ClockTime, + max_dropout_time: gst::ClockTime, + ) -> Option { + // No reason to wake up if we've already pushed longer than + // max_dropout_time consecutive PacketLost events + let dropout_time = state + .last_popped_pts + .opt_saturating_sub(state.last_popped_buffer_pts); + if dropout_time.opt_gt(max_dropout_time).unwrap_or(false) { + return None; + } + + if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() { + if let Some(last_popped_pts) = state.last_popped_pts { + if let Some(earliest) = state.earliest_seqnum { + if let Some(gap) = state + .last_popped_seqnum + .map(|last| gst_rtp::compare_seqnum(last, earliest)) + { + if gap > 1 { + return Some(last_popped_pts + latency - context_wait / 2); + } + } + } else { + return Some(last_popped_pts + latency - context_wait / 2); + } + } + } + + None + } + + // The time we should wake up at in order to push our earliest item on time + fn next_packet_wakeup( + &self, + state: &State, + latency: gst::ClockTime, + context_wait: gst::ClockTime, + ) -> Option { + state.earliest_pts.map(|earliest_pts| { + (earliest_pts + latency) + .saturating_sub(state.packet_spacing) + .saturating_sub(context_wait / 2) + }) + } + fn next_wakeup( &self, element: &super::JitterBuffer, state: &State, + do_lost: bool, latency: gst::ClockTime, context_wait: gst::ClockTime, + max_dropout_time: gst::ClockTime, ) -> ( Option, Option<(Option, Duration)>, @@ -849,29 +928,26 @@ impl SrcHandler { return (now, Some((now, Duration::ZERO))); } - if state.earliest_pts.is_none() { - return (now, None); + if let Some(next_wakeup) = self + .next_lost_wakeup(state, do_lost, latency, context_wait, max_dropout_time) + .or_else(|| self.next_packet_wakeup(state, latency, context_wait)) + { + let delay = next_wakeup + .opt_saturating_sub(now) + .unwrap_or(gst::ClockTime::ZERO); + + gst::debug!( + CAT, + obj = element, + "Next wakeup at {} with delay {}", + next_wakeup.display(), + delay + ); + + (now, Some((Some(next_wakeup), delay.into()))) + } else { + (now, None) } - - let next_wakeup = state.earliest_pts.map(|earliest_pts| { - (earliest_pts + latency) - .saturating_sub(state.packet_spacing) - .saturating_sub(context_wait / 2) - }); - - let delay = next_wakeup - .opt_saturating_sub(now) - .unwrap_or(gst::ClockTime::ZERO); - - gst::debug!( - CAT, - obj = element, - "Next wakeup at {} with delay {}", - next_wakeup.display(), - delay - ); - - (now, Some((next_wakeup, delay.into()))) } } @@ -977,6 +1053,8 @@ struct State { last_popped_seqnum: Option, last_popped_pts: Option, + /* Not affected by PacketLost events */ + last_popped_buffer_pts: Option, stats: Stats, @@ -1005,6 +1083,7 @@ impl Default for State { last_popped_seqnum: None, last_popped_pts: None, + last_popped_buffer_pts: None, stats: Stats::default(), @@ -1073,9 +1152,14 @@ impl TaskImpl for JitterBufferTask { fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { let jb = self.element.imp(); - let (latency, context_wait) = { + let (latency, context_wait, do_lost, max_dropout_time) = { let settings = jb.settings.lock().unwrap(); - (settings.latency, settings.context_wait) + ( + settings.latency, + settings.context_wait, + settings.do_lost, + gst::ClockTime::from_mseconds(settings.max_dropout_time as u64), + ) }; loop { @@ -1084,8 +1168,10 @@ impl TaskImpl for JitterBufferTask { let (_, next_wakeup) = self.src_pad_handler.next_wakeup( &self.element, &state, + do_lost, latency, context_wait, + max_dropout_time, ); let (delay_fut, abort_handle) = match next_wakeup { @@ -1121,15 +1207,17 @@ impl TaskImpl for JitterBufferTask { } } - let (head_pts, head_seq) = { - let state = jb.state.lock().unwrap(); + let (head_pts, head_seq, lost_events) = { + let mut state = jb.state.lock().unwrap(); // // Check earliest PTS as we have just taken the lock let (now, next_wakeup) = self.src_pad_handler.next_wakeup( &self.element, &state, + do_lost, latency, context_wait, + max_dropout_time, ); gst::debug!( @@ -1150,10 +1238,128 @@ impl TaskImpl for JitterBufferTask { } let (head_pts, head_seq) = state.jbuf.peek(); + let mut events = vec![]; - (head_pts, head_seq) + // We may have woken up in order to push lost events on time + // (see next_packet_wakeup()) + if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() { + loop { + // Make sure we don't push longer than max_dropout_time + // consecutive PacketLost events + let dropout_time = state + .last_popped_pts + .opt_saturating_sub(state.last_popped_buffer_pts); + if dropout_time.opt_gt(max_dropout_time).unwrap_or(false) { + break; + } + + if let Some((lost_seq, lost_pts)) = + state.last_popped_seqnum.and_then(|last| { + if let Some(last_popped_pts) = state.last_popped_pts { + let next = last.wrapping_add(1); + if (last_popped_pts + latency - context_wait / 2) + .opt_lt(now) + .unwrap_or(false) + { + if let Some(earliest) = state.earliest_seqnum { + if next != earliest { + Some(( + next, + last_popped_pts + state.packet_spacing, + )) + } else { + None + } + } else { + Some((next, last_popped_pts + state.packet_spacing)) + } + } else { + None + } + } else { + None + } + }) + { + if (lost_pts + latency).opt_lt(now).unwrap_or(false) { + /* We woke up to push the next lost event exactly on time, yet + * clearly we are now too late to do so. This may have happened + * because of a seqnum jump on the input side or some other + * condition, but in any case we want to let the regular + * generate_lost_events method take over, with its lost events + * aggregation logic. + */ + break; + } + + if lost_pts.opt_gt(state.earliest_pts).unwrap_or(false) { + /* Don't let our logic carry us too far in the future */ + break; + } + + let s = gst::Structure::builder("GstRTPPacketLost") + .field("seqnum", lost_seq as u32) + .field("timestamp", lost_pts) + .field("duration", state.packet_spacing) + .field("retry", 0) + .build(); + + events.push(gst::event::CustomDownstream::new(s)); + state.stats.num_lost += 1; + state.last_popped_pts = Some(lost_pts); + state.last_popped_seqnum = Some(lost_seq); + } else { + break; + } + } + } + + (head_pts, head_seq, events) }; + { + // Push any lost events we may have woken up to push on schedule + for event in lost_events { + gst::debug!( + CAT, + obj = jb.src_pad.gst_pad(), + "Pushing lost event {:?}", + event + ); + let _ = jb.src_pad.push_event(event).await; + } + + let state = jb.state.lock().unwrap(); + // + // Now recheck earliest PTS as we have just retaken the lock and may + // have advanced last_popped_* fields + let (now, next_wakeup) = self.src_pad_handler.next_wakeup( + &self.element, + &state, + do_lost, + latency, + context_wait, + max_dropout_time, + ); + + gst::debug!( + CAT, + obj = &self.element, + "Woke up at {}, earliest_pts {}", + now.display(), + state.earliest_pts.display() + ); + + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup.opt_gt(now).unwrap_or(false) { + // Reschedule and wait a bit longer in the next iteration + return Ok(()); + } + } else { + return Ok(()); + } + } + let res = self.src_pad_handler.pop_and_push(&self.element).await; { @@ -1162,9 +1368,14 @@ impl TaskImpl for JitterBufferTask { state.last_res = res; if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum { - let (earliest_pts, earliest_seqnum) = state.jbuf.find_earliest(); - state.earliest_pts = earliest_pts; - state.earliest_seqnum = earliest_seqnum; + if state.jbuf.num_packets() > 0 { + let (earliest_pts, earliest_seqnum) = state.jbuf.find_earliest(); + state.earliest_pts = earliest_pts; + state.earliest_seqnum = earliest_seqnum; + } else { + state.earliest_pts = None; + state.earliest_seqnum = None; + } } if res.is_ok() { @@ -1173,8 +1384,10 @@ impl TaskImpl for JitterBufferTask { let (now, next_wakeup) = self.src_pad_handler.next_wakeup( &self.element, &state, + do_lost, latency, context_wait, + max_dropout_time, ); if let Some((Some(next_wakeup), _)) = next_wakeup { if now.is_some_and(|now| next_wakeup > now) { diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs index 5b08b803e..15c5d4352 100644 --- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs @@ -363,6 +363,10 @@ impl RTPJitterBuffer { pub fn reset_skew(&self) { unsafe { ffi::ts_rtp_jitter_buffer_reset_skew(self.to_glib_none().0) } } + + pub fn num_packets(&self) -> u32 { + unsafe { ffi::ts_rtp_jitter_buffer_num_packets(self.to_glib_none().0) } + } } impl Default for RTPJitterBuffer {