rtpjitterbuffer: Cancel timers instead of just unlocking loop thread

When the queue is full (and adding more packets would risk a seqnum
roll-over), the best approach is to just start pushing out packets
from the other side.  Just pushing out the packets results in the
timers being left hanging with old seqnums, so it's safer to just
execute them immediately in this case. It does limit the timer space
to the time it takes to receiver about 32k packets, but without
extended sequence number, this is the best RTP can do.

This also results in the test no longer needed to have timeouts or
timers as pushing packets in drives everything.

Fixes #619
This commit is contained in:
Olivier Crête 2019-09-26 18:39:48 -04:00 committed by Nicolas Dufresne
parent 4a9f42430a
commit a24596423a
2 changed files with 15 additions and 39 deletions

View file

@ -2939,8 +2939,15 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
* sequence number, let's allow at least 10k packets in any case. */ * sequence number, let's allow at least 10k packets in any case. */
while (rtp_jitter_buffer_is_full (priv->jbuf) && while (rtp_jitter_buffer_is_full (priv->jbuf) &&
priv->srcresult == GST_FLOW_OK) { priv->srcresult == GST_FLOW_OK) {
RtpTimer *timer = rtp_timer_queue_peek_earliest (priv->timers);
while (timer) {
timer->timeout = -1;
if (timer->type == RTP_TIMER_DEADLINE)
break;
timer = rtp_timer_get_next (timer);
}
update_current_timer (jitterbuffer); update_current_timer (jitterbuffer);
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUEUE (priv); JBUF_WAIT_QUEUE (priv);
} }
if (priv->srcresult != GST_FLOW_OK) if (priv->srcresult != GST_FLOW_OK)

View file

@ -2647,65 +2647,34 @@ GST_START_TEST (test_performance)
GST_END_TEST; GST_END_TEST;
static gpointer
generate_harness_buffer (gpointer data)
{
GstHarness *h = data;
guint i;
for (i = 32766; i < 41000; i++)
gst_harness_push (h, generate_test_buffer (1000 + i));
return NULL;
}
GST_START_TEST (test_fill_queue) GST_START_TEST (test_fill_queue)
{ {
GstHarness *h = gst_harness_new ("rtpjitterbuffer"); GstHarness *h = gst_harness_new ("rtpjitterbuffer");
const gint num_consecutive = 40000; const gint num_consecutive = 40000;
gint i;
GstSegment segment;
GThread *t;
GstBuffer *buf; GstBuffer *buf;
gint i;
gst_harness_use_testclock (h);
gst_segment_init (&segment, GST_FORMAT_TIME);
gst_harness_set_src_caps (h, generate_caps ()); gst_harness_set_src_caps (h, generate_caps ());
gst_harness_play (h); gst_harness_play (h);
gst_harness_push (h, generate_test_buffer (1000)); gst_harness_push (h, generate_test_buffer (1000));
gst_harness_push (h, generate_test_buffer (1002)); /* Skip 1001 */
for (i = 2; i < num_consecutive; i++)
for (i = 3; i < 32766; i++)
gst_harness_push (h, generate_test_buffer (1000 + i)); gst_harness_push (h, generate_test_buffer (1000 + i));
t = g_thread_new ("fill-queue-test-push", generate_harness_buffer, h);
/* Just give a chance to the thread to start and to try to push one packet */
g_usleep (100 * 1000);
fail_unless (gst_harness_crank_single_clock_wait (h));
buf = gst_harness_pull (h); buf = gst_harness_pull (h);
fail_unless_equals_int (1000, get_rtp_seq_num (buf)); fail_unless_equals_int (1000, get_rtp_seq_num (buf));
gst_buffer_unref (buf); gst_buffer_unref (buf);
/* 1001 is skipped */
/* Gap at 1001 here */ for (i = 2; i < num_consecutive; i++) {
fail_unless (gst_harness_crank_single_clock_wait (h));
buf = gst_harness_pull (h);
fail_unless_equals_int (1002, get_rtp_seq_num (buf));
gst_buffer_unref (buf);
for (i = 3; i < num_consecutive; i++) {
GstBuffer *buf = gst_harness_pull (h); GstBuffer *buf = gst_harness_pull (h);
fail_unless_equals_int (1000 + i, get_rtp_seq_num (buf)); fail_unless_equals_int (1000 + i, get_rtp_seq_num (buf));
gst_buffer_unref (buf); gst_buffer_unref (buf);
} }
g_thread_join (t);
gst_harness_teardown (h); gst_harness_teardown (h);
} }