rtpjittterbuffer: Port from TimerQueue to RtpTimerQueue

This commit is contained in:
Nicolas Dufresne 2019-06-14 14:29:36 -04:00 committed by Olivier Crête
parent f5e3280dbe
commit d4b2231de2

View file

@ -265,12 +265,6 @@ enum
#define GST_BUFFER_IS_RETRANSMISSION(buffer) \ #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION) GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
typedef struct TimerQueue
{
GQueue *timers;
GHashTable *hashtable;
} TimerQueue;
struct _GstRtpJitterBufferPrivate struct _GstRtpJitterBufferPrivate
{ {
GstPad *sinkpad, *srcpad; GstPad *sinkpad, *srcpad;
@ -340,7 +334,7 @@ struct _GstRtpJitterBufferPrivate
guint32 next_in_seqnum; guint32 next_in_seqnum;
GArray *timers; GArray *timers;
TimerQueue *rtx_stats_timers; RtpTimerQueue *rtx_stats_timers;
/* start and stop ranges */ /* start and stop ranges */
GstClockTime npt_start; GstClockTime npt_start;
@ -495,9 +489,6 @@ static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
const RtpTimer * timer, GstClockTime dts, gboolean success); const RtpTimer * timer, GstClockTime dts, gboolean success);
static TimerQueue *timer_queue_new (void);
static void timer_queue_free (TimerQueue * queue);
static void static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{ {
@ -966,7 +957,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
priv->avg_jitter = 0; priv->avg_jitter = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID; priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->timers = g_array_new (FALSE, TRUE, sizeof (RtpTimer)); priv->timers = g_array_new (FALSE, TRUE, sizeof (RtpTimer));
priv->rtx_stats_timers = timer_queue_new (); priv->rtx_stats_timers = rtp_timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new (); priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock); g_mutex_init (&priv->jbuf_lock);
g_cond_init (&priv->jbuf_queue); g_cond_init (&priv->jbuf_queue);
@ -1051,7 +1042,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
priv = jitterbuffer->priv; priv = jitterbuffer->priv;
g_array_free (priv->timers, TRUE); g_array_free (priv->timers, TRUE);
timer_queue_free (priv->rtx_stats_timers); g_object_unref (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock); g_mutex_clear (&priv->jbuf_lock);
g_cond_clear (&priv->jbuf_queue); g_cond_clear (&priv->jbuf_queue);
g_cond_clear (&priv->jbuf_timer); g_cond_clear (&priv->jbuf_timer);
@ -1989,68 +1980,6 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
return timestamp; return timestamp;
} }
static TimerQueue *
timer_queue_new (void)
{
TimerQueue *queue;
queue = g_slice_new (TimerQueue);
queue->timers = g_queue_new ();
queue->hashtable = g_hash_table_new (NULL, NULL);
return queue;
}
static void
timer_queue_free (TimerQueue * queue)
{
if (!queue)
return;
g_hash_table_destroy (queue->hashtable);
g_queue_free_full (queue->timers, g_free);
g_slice_free (TimerQueue, queue);
}
static void
timer_queue_append (TimerQueue * queue, const RtpTimer * timer,
GstClockTime timeout, gboolean lost)
{
RtpTimer *copy;
copy = g_memdup (timer, sizeof (*timer));
copy->timeout = timeout;
copy->type = lost ? RTP_TIMER_LOST : RTP_TIMER_EXPECTED;
copy->idx = -1;
GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
copy->seqnum, GST_TIME_ARGS (copy->timeout));
g_queue_push_tail (queue->timers, copy);
g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
}
static void
timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
{
RtpTimer *test;
test = g_queue_peek_head (queue->timers);
while (test && test->timeout < timeout) {
GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
GST_TIME_ARGS (timeout));
g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
g_free (g_queue_pop_head (queue->timers));
test = g_queue_peek_head (queue->timers);
}
}
static RtpTimer *
timer_queue_find (TimerQueue * queue, guint16 seqnum)
{
return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
}
static RtpTimer * static RtpTimer *
find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
{ {
@ -2361,13 +2290,15 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
} }
if (!is_rtx || timer->num_rtx_retry > 1) { if (!is_rtx || timer->num_rtx_retry > 1) {
RtpTimer *stats_timer = rtp_timer_dup (timer);
/* Store timer in order to record stats when/if the retransmitted /* Store timer in order to record stats when/if the retransmitted
* packet arrives. We should also store timer information if we've * packet arrives. We should also store timer information if we've
* requested retransmission more than once since we may receive * requested retransmission more than once since we may receive
* several retransmitted packets. For accuracy we should update the * several retransmitted packets. For accuracy we should update the
* stats also when the redundant retransmitted packets arrives. */ * stats also when the redundant retransmitted packets arrives. */
timer_queue_append (priv->rtx_stats_timers, timer, stats_timer->timeout = pts + priv->rtx_stats_timeout * GST_MSECOND;
pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE); stats_timer->type = RTP_TIMER_EXPECTED;
rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
} }
} }
} }
@ -3101,7 +3032,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
timer = find_timer (jitterbuffer, seqnum); timer = find_timer (jitterbuffer, seqnum);
if (GST_BUFFER_IS_RETRANSMISSION (buffer)) { if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
if (!timer) if (!timer)
timer = timer_queue_find (priv->rtx_stats_timers, seqnum); timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
if (timer) if (timer)
timer->num_rtx_received++; timer->num_rtx_received++;
} }
@ -3931,9 +3862,12 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
rtp_jitter_buffer_free_item (item); rtp_jitter_buffer_free_item (item);
if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) { if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
RtpTimer *stats_timer = rtp_timer_dup (timer);
/* Store info to update stats if the packet arrives too late */ /* Store info to update stats if the packet arrives too late */
timer_queue_append (priv->rtx_stats_timers, timer, /* TODO this one could be zero-copy when the timers array is replaced */
now + priv->rtx_stats_timeout * GST_MSECOND, TRUE); stats_timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
stats_timer->type = RTP_TIMER_LOST;
rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
} }
remove_timer (jitterbuffer, timer); remove_timer (jitterbuffer, timer);
@ -4046,7 +3980,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
/* Clear expired rtx-stats timers */ /* Clear expired rtx-stats timers */
if (priv->do_retransmission) if (priv->do_retransmission)
timer_queue_clear_until (priv->rtx_stats_timers, now); rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
/* Iterate "normal" timers */ /* Iterate "normal" timers */
len = priv->timers->len; len = priv->timers->len;