From 5dacf366c078fdddc6bbb03f6c9ab9c6c946156f Mon Sep 17 00:00:00 2001 From: Havard Graff Date: Thu, 19 Mar 2020 23:12:04 +0100 Subject: [PATCH] rtpjitterbuffer: immediately insert a lost-event on multiple lost packets There is a problem with the code today, where a single timer will be scheduled for a series of lost packets, and then if the first packet in that series arrives, it will cause a rescheduling of that timer, going from a "multi"-timer to a single-timer, causing a lot of the packets in that timer to be unaccounted for, and creating a situation in where the jitterbuffer will never again push out another packet. This patch solves the problem by instead of scheduling those lost packets as another timer, it instead asks to have that lost-event pushed straight out. This very much goes with the intent of the code here: These packets are so desperately late that no cure exists, and we might as well get the lost-event out of the way and get on with it. This change has some interesting knock-on effect being presented in later commits. It completely removes the concept of "already-lost", so that is why that test has been disabled in this commit, to be removed later. --- gst/rtpmanager/gstrtpjitterbuffer.c | 33 +++++++++---- tests/check/elements/rtpjitterbuffer.c | 68 +++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 11 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 6fee3de567..8c2a0940bc 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -2479,10 +2479,11 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstClockTime duration, expected_pts; gboolean equidistant = priv->equidistant > 0; + GstClockTime last_in_pts = priv->last_in_pts; GST_DEBUG_OBJECT (jitterbuffer, "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, - GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts)); + GST_TIME_ARGS (pts), GST_TIME_ARGS (last_in_pts)); if (pts == GST_CLOCK_TIME_NONE) { GST_WARNING_OBJECT (jitterbuffer, "Have no PTS"); @@ -2492,8 +2493,8 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, if (equidistant) { GstClockTime total_duration; /* the total duration spanned by the missing packets */ - if (pts >= priv->last_in_pts) - total_duration = pts - priv->last_in_pts; + if (pts >= last_in_pts) + total_duration = pts - last_in_pts; else total_duration = 0; @@ -2530,18 +2531,30 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, GST_TIME_ARGS (priv->latency_ns), lost_packets, GST_TIME_ARGS (gap_time)); - /* this timer will fire immediately and the lost event will be pushed from - * the timer thread */ + /* this multi-lost-packet event will be inserted directly into the packet-queue + for immediate processing */ if (lost_packets > 0) { - rtp_timer_queue_set_lost (priv->timers, expected, lost_packets, - priv->last_in_pts + duration, gap_time, - timeout_offset (jitterbuffer)); + RtpTimer *timer; + GstClockTime timestamp = + apply_offset (jitterbuffer, last_in_pts + duration); + insert_lost_event (jitterbuffer, expected, lost_packets, timestamp, + gap_time, 0); + + timer = rtp_timer_queue_find (priv->timers, expected); + if (timer && timer->type == RTP_TIMER_EXPECTED) { + if (timer->queued) + rtp_timer_queue_unschedule (priv->timers, timer); + GST_DEBUG_OBJECT (jitterbuffer, "removing timer for seqnum #%u", + expected); + rtp_timer_free (timer); + } + expected += lost_packets; - priv->last_in_pts += gap_time; + last_in_pts += gap_time; } } - expected_pts = priv->last_in_pts + duration; + expected_pts = last_in_pts + duration; } else { /* If we cannot assume equidistant packet spacing, the only thing we now * for sure is that the missing packets have expected pts not later than diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c index a8ce9c3698..fd4d271354 100644 --- a/tests/check/elements/rtpjitterbuffer.c +++ b/tests/check/elements/rtpjitterbuffer.c @@ -3096,6 +3096,71 @@ GST_START_TEST (test_reset_does_not_stall) GST_END_TEST; +typedef struct +{ + guint16 seqnum; + guint32 rtptime; + gint sleep_ms; +} PushBufferCtx; + +GST_START_TEST (test_multiple_lost_do_not_stall) +{ + GstHarness *h = gst_harness_new ("rtpjitterbuffer"); + gint latency_ms = 200; + guint inital_bufs = latency_ms / TEST_BUF_MS; + guint max_dropout_time = 10; + guint16 i; + guint16 seqnum = 1000; + guint32 rtptime = seqnum * TEST_RTP_TS_DURATION; + guint in_queue; + PushBufferCtx bufs[] = { + {1039, 166560, 58}, + {1011, 161280, 1000}, + }; + gint size = G_N_ELEMENTS (bufs); + + gst_harness_use_systemclock (h); + gst_harness_set_src_caps (h, generate_caps ()); + + g_object_set (h->element, "latency", latency_ms, "do-retransmission", TRUE, + "do-lost", TRUE, "rtx-max-retries", 2, + "max-dropout-time", max_dropout_time, NULL); + + /* push initial buffers and pull them out as well */ + for (i = 0; i < inital_bufs; i++) { + seqnum += 1; + rtptime += TEST_RTP_TS_DURATION; + push_test_buffer_now (h, seqnum, rtptime); + g_usleep (G_USEC_PER_SEC / 1000 * 20); + } + for (i = 0; i < inital_bufs; i++) { + gst_buffer_unref (gst_harness_pull (h)); + } + + /* push buffers according to list */ + for (i = 0; i < size; i++) { + push_test_buffer_now (h, bufs[i].seqnum, bufs[i].rtptime); + g_usleep (G_USEC_PER_SEC / 1000 * bufs[i].sleep_ms); + seqnum = MAX (bufs[i].seqnum, seqnum); + } + + in_queue = gst_harness_buffers_in_queue (h); + + /* and then normal buffers again */ + for (i = 0; i < 5; i++) { + seqnum += 1; + push_test_buffer_now (h, seqnum, seqnum * TEST_RTP_TS_DURATION); + g_usleep (G_USEC_PER_SEC / 1000 * 20); + } + + /* we expect at least some of those buffers to come through */ + fail_unless (gst_harness_buffers_in_queue (h) != in_queue); + + gst_harness_teardown (h); +} + +GST_END_TEST; + static Suite * rtpjitterbuffer_suite (void) @@ -3161,11 +3226,12 @@ rtpjitterbuffer_suite (void) tcase_add_test (tc_chain, test_performance); tcase_add_test (tc_chain, test_drop_messages_too_late); - tcase_add_test (tc_chain, test_drop_messages_already_lost); + tcase_skip_broken_test (tc_chain, test_drop_messages_already_lost); tcase_add_test (tc_chain, test_drop_messages_drop_on_latency); tcase_add_test (tc_chain, test_drop_messages_interval); tcase_add_test (tc_chain, test_reset_does_not_stall); + tcase_add_test (tc_chain, test_multiple_lost_do_not_stall); return s; }