From bf00ee46de8507af084cbf0f664fa5b6634164c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Fri, 11 Jan 2019 17:53:43 -0500 Subject: [PATCH] rtpjitterbuffer: Limit size to 2^15 packets If it goes over 2^15 packets, it will think it has rolled over and start dropping all packets. So make sure the seqnum distance is not too big. But let's not limit it to a number that is too small to avoid emptying it needlessly if there is a spurious huge sequence number, let's allow at least 10k packets in any case. --- gst/rtpmanager/gstrtpjitterbuffer.c | 34 +++++++++++++++++++++++ gst/rtpmanager/rtpjitterbuffer.c | 43 +++++++++++++++++++++++++++++ gst/rtpmanager/rtpjitterbuffer.h | 1 + 3 files changed, 78 insertions(+) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 9909af3292..5ec9eeeedd 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -200,6 +200,20 @@ enum (g_mutex_unlock (&(priv)->jbuf_lock)); \ } G_STMT_END +#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \ + GST_DEBUG ("waiting queue"); \ + (priv)->waiting_queue++; \ + g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \ + (priv)->waiting_queue--; \ + GST_DEBUG ("waiting queue done"); \ +} G_STMT_END +#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \ + if (G_UNLIKELY ((priv)->waiting_queue)) { \ + GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \ + g_cond_signal (&(priv)->jbuf_queue); \ + } \ +} G_STMT_END + #define JBUF_WAIT_TIMER(priv) G_STMT_START { \ GST_DEBUG ("waiting timer"); \ (priv)->waiting_timer++; \ @@ -263,6 +277,8 @@ struct _GstRtpJitterBufferPrivate RTPJitterBuffer *jbuf; GMutex jbuf_lock; + gboolean waiting_queue; + GCond jbuf_queue; gboolean waiting_timer; GCond jbuf_timer; gboolean waiting_event; @@ -1025,6 +1041,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) priv->rtx_stats_timers = timer_queue_new (); priv->jbuf = rtp_jitter_buffer_new (); g_mutex_init (&priv->jbuf_lock); + g_cond_init (&priv->jbuf_queue); g_cond_init (&priv->jbuf_timer); g_cond_init (&priv->jbuf_event); g_cond_init (&priv->jbuf_query); @@ -1130,6 +1147,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object) g_array_free (priv->timers, TRUE); timer_queue_free (priv->rtx_stats_timers); g_mutex_clear (&priv->jbuf_lock); + g_cond_clear (&priv->jbuf_queue); g_cond_clear (&priv->jbuf_timer); g_cond_clear (&priv->jbuf_event); g_cond_clear (&priv->jbuf_query); @@ -1578,6 +1596,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer) /* this unblocks any waiting pops on the src pad task */ JBUF_SIGNAL_EVENT (priv); JBUF_SIGNAL_QUERY (priv, FALSE); + JBUF_SIGNAL_QUEUE (priv); JBUF_UNLOCK (priv); } @@ -1683,6 +1702,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, /* block until we go to PLAYING */ priv->blocked = TRUE; priv->timer_running = TRUE; + priv->srcresult = GST_FLOW_OK; priv->timer_thread = g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer); JBUF_UNLOCK (priv); @@ -1721,9 +1741,11 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, JBUF_LOCK (priv); gst_buffer_replace (&priv->last_sr, NULL); priv->timer_running = FALSE; + priv->srcresult = GST_FLOW_FLUSHING; unschedule_current_timer (jitterbuffer); JBUF_SIGNAL_TIMER (priv); JBUF_SIGNAL_QUERY (priv, FALSE); + JBUF_SIGNAL_QUEUE (priv); JBUF_UNLOCK (priv); g_thread_join (priv->timer_thread); priv->timer_thread = NULL; @@ -3131,6 +3153,17 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, timer->num_rtx_received++; } + /* At 2^15, we would detect a seqnum rollover too early, therefore + * limit the queue size. But let's not limit it to a number that is + * too small to avoid emptying it needlessly if there is a spurious huge + * sequence number, let's allow at least 10k packets in any case. */ + while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 && + rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 && + priv->srcresult == GST_FLOW_OK) + JBUF_WAIT_QUEUE (priv); + if (priv->srcresult != GST_FLOW_OK) + goto out_flushing; + /* let's check if this buffer is too late, we can only accept packets with * bigger seqnum than the one we last pushed. */ if (G_LIKELY (priv->last_popped_seqnum != -1)) { @@ -4190,6 +4223,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) JBUF_LOCK_CHECK (priv, flushing); do { result = handle_next_buffer (jitterbuffer); + JBUF_SIGNAL_QUEUE (priv); if (G_LIKELY (result == GST_FLOW_WAIT)) { /* now wait for the next event */ JBUF_WAIT_EVENT (priv, flushing); diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index f1187e2b69..0308a538fe 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -1227,6 +1227,49 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf) return result; } + +/** + * rtp_jitter_buffer_get_seqnum_diff: + * @jbuf: an #RTPJitterBuffer + * + * Get the difference between the seqnum of first and last packet in the + * jitterbuffer. + * + * Returns: The difference expressed in seqnum. + */ +guint16 +rtp_jitter_buffer_get_seqnum_diff (RTPJitterBuffer * jbuf) +{ + guint32 high_seqnum, low_seqnum; + RTPJitterBufferItem *high_buf, *low_buf; + guint16 result; + + g_return_val_if_fail (jbuf != NULL, 0); + + high_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets); + low_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets); + + while (high_buf && high_buf->seqnum == -1) + high_buf = (RTPJitterBufferItem *) high_buf->prev; + + while (low_buf && low_buf->seqnum == -1) + low_buf = (RTPJitterBufferItem *) low_buf->next; + + if (!high_buf || !low_buf || high_buf == low_buf) + return 0; + + high_seqnum = high_buf->seqnum; + low_seqnum = low_buf->seqnum; + + /* it needs to work if ts wraps */ + if (high_seqnum >= low_seqnum) { + result = (guint32) (high_seqnum - low_seqnum); + } else { + result = (guint32) (high_seqnum + G_MAXUINT16 + 1 - low_seqnum); + } + return result; +} + /** * rtp_jitter_buffer_get_sync: * @jbuf: an #RTPJitterBuffer diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index 16db644720..b65b6038e9 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -184,6 +184,7 @@ gint rtp_jitter_buffer_get_percent (RTPJitterBuffer * jbuf guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf); guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); +guint16 rtp_jitter_buffer_get_seqnum_diff (RTPJitterBuffer * jbuf); void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime, guint64 *timestamp, guint32 *clock_rate,