rtpjitterbuffer: immediately insert a lost-event on multiple lost packets

There is a problem with the code today, where a single timer will
be scheduled for a series of lost packets, and then if the first packet
in that series arrives, it will cause a rescheduling of that timer, going
from a "multi"-timer to a single-timer, causing a lot of the packets
in that timer to be unaccounted for, and creating a situation in where
the jitterbuffer will never again push out another packet.

This patch solves the problem by instead of scheduling those lost packets
as another timer, it instead asks to have that lost-event pushed straight
out.

This very much goes with the intent of the code here: These packets are
so desperately late that no cure exists, and we might as well get the
lost-event out of the way and get on with it.

This change has some interesting knock-on effect being presented in
later commits. It completely removes the concept of "already-lost", so
that is why that test has been disabled in this commit, to be
removed later.
This commit is contained in:
Havard Graff 2020-03-19 23:12:04 +01:00 committed by GStreamer Merge Bot
parent d045b40db9
commit 5dacf366c0
2 changed files with 90 additions and 11 deletions

View file

@ -2479,10 +2479,11 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstClockTime duration, expected_pts;
gboolean equidistant = priv->equidistant > 0;
GstClockTime last_in_pts = priv->last_in_pts;
GST_DEBUG_OBJECT (jitterbuffer,
"pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
GST_TIME_ARGS (pts), GST_TIME_ARGS (last_in_pts));
if (pts == GST_CLOCK_TIME_NONE) {
GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
@ -2492,8 +2493,8 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
if (equidistant) {
GstClockTime total_duration;
/* the total duration spanned by the missing packets */
if (pts >= priv->last_in_pts)
total_duration = pts - priv->last_in_pts;
if (pts >= last_in_pts)
total_duration = pts - last_in_pts;
else
total_duration = 0;
@ -2530,18 +2531,30 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
GST_TIME_ARGS (priv->latency_ns), lost_packets,
GST_TIME_ARGS (gap_time));
/* this timer will fire immediately and the lost event will be pushed from
* the timer thread */
/* this multi-lost-packet event will be inserted directly into the packet-queue
for immediate processing */
if (lost_packets > 0) {
rtp_timer_queue_set_lost (priv->timers, expected, lost_packets,
priv->last_in_pts + duration, gap_time,
timeout_offset (jitterbuffer));
RtpTimer *timer;
GstClockTime timestamp =
apply_offset (jitterbuffer, last_in_pts + duration);
insert_lost_event (jitterbuffer, expected, lost_packets, timestamp,
gap_time, 0);
timer = rtp_timer_queue_find (priv->timers, expected);
if (timer && timer->type == RTP_TIMER_EXPECTED) {
if (timer->queued)
rtp_timer_queue_unschedule (priv->timers, timer);
GST_DEBUG_OBJECT (jitterbuffer, "removing timer for seqnum #%u",
expected);
rtp_timer_free (timer);
}
expected += lost_packets;
priv->last_in_pts += gap_time;
last_in_pts += gap_time;
}
}
expected_pts = priv->last_in_pts + duration;
expected_pts = last_in_pts + duration;
} else {
/* If we cannot assume equidistant packet spacing, the only thing we now
* for sure is that the missing packets have expected pts not later than

View file

@ -3096,6 +3096,71 @@ GST_START_TEST (test_reset_does_not_stall)
GST_END_TEST;
typedef struct
{
guint16 seqnum;
guint32 rtptime;
gint sleep_ms;
} PushBufferCtx;
GST_START_TEST (test_multiple_lost_do_not_stall)
{
GstHarness *h = gst_harness_new ("rtpjitterbuffer");
gint latency_ms = 200;
guint inital_bufs = latency_ms / TEST_BUF_MS;
guint max_dropout_time = 10;
guint16 i;
guint16 seqnum = 1000;
guint32 rtptime = seqnum * TEST_RTP_TS_DURATION;
guint in_queue;
PushBufferCtx bufs[] = {
{1039, 166560, 58},
{1011, 161280, 1000},
};
gint size = G_N_ELEMENTS (bufs);
gst_harness_use_systemclock (h);
gst_harness_set_src_caps (h, generate_caps ());
g_object_set (h->element, "latency", latency_ms, "do-retransmission", TRUE,
"do-lost", TRUE, "rtx-max-retries", 2,
"max-dropout-time", max_dropout_time, NULL);
/* push initial buffers and pull them out as well */
for (i = 0; i < inital_bufs; i++) {
seqnum += 1;
rtptime += TEST_RTP_TS_DURATION;
push_test_buffer_now (h, seqnum, rtptime);
g_usleep (G_USEC_PER_SEC / 1000 * 20);
}
for (i = 0; i < inital_bufs; i++) {
gst_buffer_unref (gst_harness_pull (h));
}
/* push buffers according to list */
for (i = 0; i < size; i++) {
push_test_buffer_now (h, bufs[i].seqnum, bufs[i].rtptime);
g_usleep (G_USEC_PER_SEC / 1000 * bufs[i].sleep_ms);
seqnum = MAX (bufs[i].seqnum, seqnum);
}
in_queue = gst_harness_buffers_in_queue (h);
/* and then normal buffers again */
for (i = 0; i < 5; i++) {
seqnum += 1;
push_test_buffer_now (h, seqnum, seqnum * TEST_RTP_TS_DURATION);
g_usleep (G_USEC_PER_SEC / 1000 * 20);
}
/* we expect at least some of those buffers to come through */
fail_unless (gst_harness_buffers_in_queue (h) != in_queue);
gst_harness_teardown (h);
}
GST_END_TEST;
static Suite *
rtpjitterbuffer_suite (void)
@ -3161,11 +3226,12 @@ rtpjitterbuffer_suite (void)
tcase_add_test (tc_chain, test_performance);
tcase_add_test (tc_chain, test_drop_messages_too_late);
tcase_add_test (tc_chain, test_drop_messages_already_lost);
tcase_skip_broken_test (tc_chain, test_drop_messages_already_lost);
tcase_add_test (tc_chain, test_drop_messages_drop_on_latency);
tcase_add_test (tc_chain, test_drop_messages_interval);
tcase_add_test (tc_chain, test_reset_does_not_stall);
tcase_add_test (tc_chain, test_multiple_lost_do_not_stall);
return s;
}