rtpjitterbuffer: Limit size to 2^15 packets

If it goes over 2^15 packets, it will think it has rolled over
and start dropping all packets. So make sure the seqnum distance is not too big.

But let's not limit it to a number that is too small to avoid emptying it
needlessly if there is a spurious huge sequence number, let's allow at
least 10k packets in any case.
This commit is contained in:
Olivier Crête 2019-01-11 17:53:43 -05:00 committed by Olivier Crête
parent 086bad4643
commit bf00ee46de
3 changed files with 78 additions and 0 deletions

View file

@ -200,6 +200,20 @@ enum
(g_mutex_unlock (&(priv)->jbuf_lock)); \
} G_STMT_END
#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \
GST_DEBUG ("waiting queue"); \
(priv)->waiting_queue++; \
g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \
(priv)->waiting_queue--; \
GST_DEBUG ("waiting queue done"); \
} G_STMT_END
#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \
if (G_UNLIKELY ((priv)->waiting_queue)) { \
GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
g_cond_signal (&(priv)->jbuf_queue); \
} \
} G_STMT_END
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
GST_DEBUG ("waiting timer"); \
(priv)->waiting_timer++; \
@ -263,6 +277,8 @@ struct _GstRtpJitterBufferPrivate
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
gboolean waiting_queue;
GCond jbuf_queue;
gboolean waiting_timer;
GCond jbuf_timer;
gboolean waiting_event;
@ -1025,6 +1041,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
priv->rtx_stats_timers = timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
g_cond_init (&priv->jbuf_queue);
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
@ -1130,6 +1147,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
g_array_free (priv->timers, TRUE);
timer_queue_free (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock);
g_cond_clear (&priv->jbuf_queue);
g_cond_clear (&priv->jbuf_timer);
g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query);
@ -1578,6 +1596,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL_EVENT (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
}
@ -1683,6 +1702,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
priv->srcresult = GST_FLOW_OK;
priv->timer_thread =
g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
@ -1721,9 +1741,11 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
priv->timer_running = FALSE;
priv->srcresult = GST_FLOW_FLUSHING;
unschedule_current_timer (jitterbuffer);
JBUF_SIGNAL_TIMER (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread);
priv->timer_thread = NULL;
@ -3131,6 +3153,17 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
timer->num_rtx_received++;
}
/* At 2^15, we would detect a seqnum rollover too early, therefore
* limit the queue size. But let's not limit it to a number that is
* too small to avoid emptying it needlessly if there is a spurious huge
* sequence number, let's allow at least 10k packets in any case. */
while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
priv->srcresult == GST_FLOW_OK)
JBUF_WAIT_QUEUE (priv);
if (priv->srcresult != GST_FLOW_OK)
goto out_flushing;
/* let's check if this buffer is too late, we can only accept packets with
* bigger seqnum than the one we last pushed. */
if (G_LIKELY (priv->last_popped_seqnum != -1)) {
@ -4190,6 +4223,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
JBUF_SIGNAL_QUEUE (priv);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
JBUF_WAIT_EVENT (priv, flushing);

View file

@ -1227,6 +1227,49 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
return result;
}
/**
* rtp_jitter_buffer_get_seqnum_diff:
* @jbuf: an #RTPJitterBuffer
*
* Get the difference between the seqnum of first and last packet in the
* jitterbuffer.
*
* Returns: The difference expressed in seqnum.
*/
guint16
rtp_jitter_buffer_get_seqnum_diff (RTPJitterBuffer * jbuf)
{
guint32 high_seqnum, low_seqnum;
RTPJitterBufferItem *high_buf, *low_buf;
guint16 result;
g_return_val_if_fail (jbuf != NULL, 0);
high_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets);
low_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets);
while (high_buf && high_buf->seqnum == -1)
high_buf = (RTPJitterBufferItem *) high_buf->prev;
while (low_buf && low_buf->seqnum == -1)
low_buf = (RTPJitterBufferItem *) low_buf->next;
if (!high_buf || !low_buf || high_buf == low_buf)
return 0;
high_seqnum = high_buf->seqnum;
low_seqnum = low_buf->seqnum;
/* it needs to work if ts wraps */
if (high_seqnum >= low_seqnum) {
result = (guint32) (high_seqnum - low_seqnum);
} else {
result = (guint32) (high_seqnum + G_MAXUINT16 + 1 - low_seqnum);
}
return result;
}
/**
* rtp_jitter_buffer_get_sync:
* @jbuf: an #RTPJitterBuffer

View file

@ -184,6 +184,7 @@ gint rtp_jitter_buffer_get_percent (RTPJitterBuffer * jbuf
guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf);
guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf);
guint16 rtp_jitter_buffer_get_seqnum_diff (RTPJitterBuffer * jbuf);
void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime,
guint64 *timestamp, guint32 *clock_rate,