ts-jitterbuffer: improve scheduling of lost events

When we have detected that packets are equidistant and have
determined a packet spacing, we can schedule lost events "on time"
instead of pushing them at the same time as the next received
packet.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/756>
Authored by Mathieu Duponchelle on 2022-05-12 01:09:58 +02:00, committed by GStreamer Marge Bot
parent 8dd8d67974
commit dd0b86bf11
3 changed files with 255 additions and 37 deletions


@@ -86,6 +86,7 @@ extern "C" {
gap: c_int,
is_rtx: gboolean,
) -> GstClockTime;
pub fn ts_rtp_jitter_buffer_num_packets(jbuf: *mut RTPJitterBuffer) -> c_uint;
pub fn ts_rtp_jitter_buffer_insert(
jbuf: *mut RTPJitterBuffer,
item: *mut RTPJitterBufferItem,


@@ -153,6 +153,7 @@ impl SinkHandler {
state.last_popped_seqnum = None;
state.last_popped_pts = None;
state.last_popped_buffer_pts = None;
inner.last_in_seqnum = None;
inner.last_rtptime = None;
@@ -515,15 +516,25 @@ impl SinkHandler {
let mut state = jb.state.lock().unwrap();
- let (latency, context_wait) = {
let (latency, context_wait, do_lost, max_dropout_time) = {
let settings = jb.settings.lock().unwrap();
- (settings.latency, settings.context_wait)
(
settings.latency,
settings.context_wait,
settings.do_lost,
gst::ClockTime::from_mseconds(settings.max_dropout_time as u64),
)
};
// Reschedule if needed
- let (_, next_wakeup) =
- jb.src_pad_handler
- .next_wakeup(&jb.obj(), &state, latency, context_wait);
let (_, next_wakeup) = jb.src_pad_handler.next_wakeup(
&jb.obj(),
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
if let Some((next_wakeup, _)) = next_wakeup {
if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
if previous_next_wakeup.is_none()
@@ -650,6 +661,7 @@ impl SrcHandler {
pts: impl Into<Option<gst::ClockTime>>,
discont: &mut bool,
) -> Vec<gst::Event> {
let pts = pts.into();
let (latency, do_lost) = {
let jb = element.imp();
let settings = jb.settings.lock().unwrap();
@@ -678,8 +690,13 @@ impl SrcHandler {
let gap = gap as u64;
// FIXME reason why we can expect Some for the 2 lines below
let mut last_popped_pts = state.last_popped_pts.unwrap();
- let interval = pts.into().unwrap().saturating_sub(last_popped_pts);
- let spacing = interval / (gap + 1);
let spacing = if state.equidistant > 3 {
state.packet_spacing
} else {
let interval = pts.unwrap().saturating_sub(last_popped_pts);
interval / (gap + 1)
};
*discont = true;
@@ -711,6 +728,10 @@ impl SrcHandler {
gst::ClockTime::ZERO
};
if timestamp.opt_gt(pts).unwrap_or(false) {
break;
}
state.last_popped_pts = Some(timestamp);
if do_lost {
@@ -790,6 +811,7 @@ impl SrcHandler {
};
state.last_popped_pts = buffer.pts();
state.last_popped_buffer_pts = buffer.pts();
if state.last_popped_pts.is_some() {
state.position = state.last_popped_pts;
}
@@ -821,12 +843,69 @@ impl SrcHandler {
jb.src_pad.push(buffer).await
}
// If there is a gap between the next seqnum we must output and the seqnum of
// the earliest item currently stored, we may want to wake up earlier in order
// to push the corresponding lost event, provided we are reasonably sure that
// packets are equidistant and we have calculated a packet spacing.
fn next_lost_wakeup(
&self,
state: &State,
do_lost: bool,
latency: gst::ClockTime,
context_wait: gst::ClockTime,
max_dropout_time: gst::ClockTime,
) -> Option<gst::ClockTime> {
// No reason to wake up if we've already pushed longer than
// max_dropout_time consecutive PacketLost events
let dropout_time = state
.last_popped_pts
.opt_saturating_sub(state.last_popped_buffer_pts);
if dropout_time.opt_gt(max_dropout_time).unwrap_or(false) {
return None;
}
if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() {
if let Some(last_popped_pts) = state.last_popped_pts {
if let Some(earliest) = state.earliest_seqnum {
if let Some(gap) = state
.last_popped_seqnum
.map(|last| gst_rtp::compare_seqnum(last, earliest))
{
if gap > 1 {
return Some(last_popped_pts + latency - context_wait / 2);
}
}
} else {
return Some(last_popped_pts + latency - context_wait / 2);
}
}
}
None
}
// The time we should wake up at in order to push our earliest item on time
fn next_packet_wakeup(
&self,
state: &State,
latency: gst::ClockTime,
context_wait: gst::ClockTime,
) -> Option<gst::ClockTime> {
state.earliest_pts.map(|earliest_pts| {
(earliest_pts + latency)
.saturating_sub(state.packet_spacing)
.saturating_sub(context_wait / 2)
})
}
fn next_wakeup(
&self,
element: &super::JitterBuffer,
state: &State,
do_lost: bool,
latency: gst::ClockTime,
context_wait: gst::ClockTime,
max_dropout_time: gst::ClockTime,
) -> (
Option<gst::ClockTime>,
Option<(Option<gst::ClockTime>, Duration)>,
@@ -849,29 +928,26 @@ impl SrcHandler {
return (now, Some((now, Duration::ZERO)));
}
- if state.earliest_pts.is_none() {
- return (now, None);
if let Some(next_wakeup) = self
.next_lost_wakeup(state, do_lost, latency, context_wait, max_dropout_time)
.or_else(|| self.next_packet_wakeup(state, latency, context_wait))
{
let delay = next_wakeup
.opt_saturating_sub(now)
.unwrap_or(gst::ClockTime::ZERO);
gst::debug!(
CAT,
obj = element,
"Next wakeup at {} with delay {}",
next_wakeup.display(),
delay
);
(now, Some((Some(next_wakeup), delay.into())))
} else {
(now, None)
}
- let next_wakeup = state.earliest_pts.map(|earliest_pts| {
- (earliest_pts + latency)
- .saturating_sub(state.packet_spacing)
- .saturating_sub(context_wait / 2)
- });
- let delay = next_wakeup
- .opt_saturating_sub(now)
- .unwrap_or(gst::ClockTime::ZERO);
- gst::debug!(
- CAT,
- obj = element,
- "Next wakeup at {} with delay {}",
- next_wakeup.display(),
- delay
- );
- (now, Some((next_wakeup, delay.into())))
}
}
@@ -977,6 +1053,8 @@ struct State {
last_popped_seqnum: Option<u16>,
last_popped_pts: Option<gst::ClockTime>,
/* Not affected by PacketLost events */
last_popped_buffer_pts: Option<gst::ClockTime>,
stats: Stats,
@@ -1005,6 +1083,7 @@ impl Default for State {
last_popped_seqnum: None,
last_popped_pts: None,
last_popped_buffer_pts: None,
stats: Stats::default(),
@@ -1073,9 +1152,14 @@ impl TaskImpl for JitterBufferTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let jb = self.element.imp();
- let (latency, context_wait) = {
let (latency, context_wait, do_lost, max_dropout_time) = {
let settings = jb.settings.lock().unwrap();
- (settings.latency, settings.context_wait)
(
settings.latency,
settings.context_wait,
settings.do_lost,
gst::ClockTime::from_mseconds(settings.max_dropout_time as u64),
)
};
loop {
@@ -1084,8 +1168,10 @@ impl TaskImpl for JitterBufferTask {
let (_, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
let (delay_fut, abort_handle) = match next_wakeup {
@@ -1121,15 +1207,17 @@ impl TaskImpl for JitterBufferTask {
}
}
- let (head_pts, head_seq) = {
- let state = jb.state.lock().unwrap();
let (head_pts, head_seq, lost_events) = {
let mut state = jb.state.lock().unwrap();
//
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
gst::debug!(
@@ -1150,10 +1238,128 @@ impl TaskImpl for JitterBufferTask {
}
let (head_pts, head_seq) = state.jbuf.peek();
let mut events = vec![];
- (head_pts, head_seq)
// We may have woken up in order to push lost events on time
// (see next_lost_wakeup())
if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() {
loop {
// Make sure we don't push longer than max_dropout_time
// consecutive PacketLost events
let dropout_time = state
.last_popped_pts
.opt_saturating_sub(state.last_popped_buffer_pts);
if dropout_time.opt_gt(max_dropout_time).unwrap_or(false) {
break;
}
if let Some((lost_seq, lost_pts)) =
state.last_popped_seqnum.and_then(|last| {
if let Some(last_popped_pts) = state.last_popped_pts {
let next = last.wrapping_add(1);
if (last_popped_pts + latency - context_wait / 2)
.opt_lt(now)
.unwrap_or(false)
{
if let Some(earliest) = state.earliest_seqnum {
if next != earliest {
Some((
next,
last_popped_pts + state.packet_spacing,
))
} else {
None
}
} else {
Some((next, last_popped_pts + state.packet_spacing))
}
} else {
None
}
} else {
None
}
})
{
if (lost_pts + latency).opt_lt(now).unwrap_or(false) {
/* We woke up to push the next lost event exactly on time, yet
* clearly we are now too late to do so. This may have happened
* because of a seqnum jump on the input side or some other
* condition, but in any case we want to let the regular
* generate_lost_events method take over, with its lost events
* aggregation logic.
*/
break;
}
if lost_pts.opt_gt(state.earliest_pts).unwrap_or(false) {
/* Don't let our logic carry us too far in the future */
break;
}
let s = gst::Structure::builder("GstRTPPacketLost")
.field("seqnum", lost_seq as u32)
.field("timestamp", lost_pts)
.field("duration", state.packet_spacing)
.field("retry", 0)
.build();
events.push(gst::event::CustomDownstream::new(s));
state.stats.num_lost += 1;
state.last_popped_pts = Some(lost_pts);
state.last_popped_seqnum = Some(lost_seq);
} else {
break;
}
}
}
(head_pts, head_seq, events)
};
{
// Push any lost events we may have woken up to push on schedule
for event in lost_events {
gst::debug!(
CAT,
obj = jb.src_pad.gst_pad(),
"Pushing lost event {:?}",
event
);
let _ = jb.src_pad.push_event(event).await;
}
let state = jb.state.lock().unwrap();
//
// Now recheck earliest PTS as we have just retaken the lock and may
// have advanced last_popped_* fields
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
gst::debug!(
CAT,
obj = &self.element,
"Woke up at {}, earliest_pts {}",
now.display(),
state.earliest_pts.display()
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup.opt_gt(now).unwrap_or(false) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
}
let res = self.src_pad_handler.pop_and_push(&self.element).await;
{
@@ -1162,9 +1368,14 @@ impl TaskImpl for JitterBufferTask {
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
- let (earliest_pts, earliest_seqnum) = state.jbuf.find_earliest();
- state.earliest_pts = earliest_pts;
- state.earliest_seqnum = earliest_seqnum;
if state.jbuf.num_packets() > 0 {
let (earliest_pts, earliest_seqnum) = state.jbuf.find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
} else {
state.earliest_pts = None;
state.earliest_seqnum = None;
}
}
if res.is_ok() {
@@ -1173,8 +1384,10 @@ impl TaskImpl for JitterBufferTask {
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
if let Some((Some(next_wakeup), _)) = next_wakeup {
if now.is_some_and(|now| next_wakeup > now) {


@@ -363,6 +363,10 @@ impl RTPJitterBuffer {
pub fn reset_skew(&self) {
unsafe { ffi::ts_rtp_jitter_buffer_reset_skew(self.to_glib_none().0) }
}
pub fn num_packets(&self) -> u32 {
unsafe { ffi::ts_rtp_jitter_buffer_num_packets(self.to_glib_none().0) }
}
}
impl Default for RTPJitterBuffer {