rtprtxsend: use a GSequence to implement the buffer queue

This has the advantage that searching the queue to find the
buffer with the requested seqnum is done with binary search.
This commit is contained in:
George Kiagiadakis 2013-11-04 20:05:03 +02:00 committed by Wim Taymans
parent 487fa8c989
commit 51edc07127
2 changed files with 35 additions and 41 deletions

View file

@ -165,8 +165,8 @@ static void
gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx, gboolean full) gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx, gboolean full)
{ {
g_mutex_lock (&rtx->lock); g_mutex_lock (&rtx->lock);
g_queue_foreach (rtx->queue, (GFunc) buffer_queue_item_free, NULL); g_sequence_remove_range (g_sequence_get_begin_iter (rtx->queue),
g_queue_clear (rtx->queue); g_sequence_get_end_iter (rtx->queue));
g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (rtx->pending); g_queue_clear (rtx->pending);
rtx->master_ssrc = 0; rtx->master_ssrc = 0;
@ -183,7 +183,7 @@ gst_rtp_rtx_send_finalize (GObject * object)
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object); GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
gst_rtp_rtx_send_reset (rtx, TRUE); gst_rtp_rtx_send_reset (rtx, TRUE);
g_queue_free (rtx->queue); g_sequence_free (rtx->queue);
g_queue_free (rtx->pending); g_queue_free (rtx->pending);
g_mutex_clear (&rtx->lock); g_mutex_clear (&rtx->lock);
@ -215,7 +215,7 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain)); GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
rtx->queue = g_queue_new (); rtx->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
rtx->pending = g_queue_new (); rtx->pending = g_queue_new ();
g_mutex_init (&rtx->lock); g_mutex_init (&rtx->lock);
@ -241,29 +241,14 @@ choose_ssrc (GstRtpRtxSend * rtx)
return ssrc; return ssrc;
} }
typedef struct static gint
buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
gpointer user_data)
{ {
GstRtpRtxSend *rtx; /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
guint seqnum; * it returns negative when seqnum1 > seqnum2 and we want negative
gboolean found; * when b > a, i.e. a is smaller, so it comes first in the sequence */
} RTXData; return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
/* traverse queue history and try to find the buffer that the
* requested seqnum */
static void
push_seqnum (BufferQueueItem * item, RTXData * data)
{
GstRtpRtxSend *rtx = data->rtx;
if (data->found)
return;
/* data->seqnum comes from the request */
if (item->seqnum == data->seqnum) {
data->found = TRUE;
GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer));
}
} }
static gboolean static gboolean
@ -281,7 +266,6 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
guint32 seqnum = 0; guint32 seqnum = 0;
guint ssrc = 0; guint ssrc = 0;
RTXData data;
/* retrieve seqnum of the packet that need to be restransmisted */ /* retrieve seqnum of the packet that need to be restransmisted */
if (!gst_structure_get_uint (s, "seqnum", &seqnum)) if (!gst_structure_get_uint (s, "seqnum", &seqnum))
@ -298,12 +282,20 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
g_mutex_lock (&rtx->lock); g_mutex_lock (&rtx->lock);
/* check if request is for us */ /* check if request is for us */
if (rtx->master_ssrc == ssrc) { if (rtx->master_ssrc == ssrc) {
GSequenceIter *iter;
BufferQueueItem search_item;
/* update statistics */
++rtx->num_rtx_requests; ++rtx->num_rtx_requests;
data.rtx = rtx;
data.seqnum = seqnum; search_item.seqnum = seqnum;
data.found = FALSE; iter = g_sequence_lookup (rtx->queue, &search_item,
/* TODO do a binary search because rtx->queue is sorted by seq num */ (GCompareDataFunc) buffer_queue_items_cmp, NULL);
g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data); if (iter) {
BufferQueueItem *item = g_sequence_get (iter);
GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer));
}
} }
g_mutex_unlock (&rtx->lock); g_mutex_unlock (&rtx->lock);
@ -326,8 +318,8 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
rtx->rtx_ssrc = choose_ssrc (rtx); rtx->rtx_ssrc = choose_ssrc (rtx);
/* clear buffers we already saved */ /* clear buffers we already saved */
g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL); g_sequence_remove_range (g_sequence_get_begin_iter (rtx->queue),
g_queue_clear (rtx->queue); g_sequence_get_end_iter (rtx->queue));
/* clear buffers that are about to be retransmited */ /* clear buffers that are about to be retransmited */
g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
@ -394,8 +386,10 @@ gst_rtp_rtx_send_get_ts_diff (GstRtpRtxSend * self)
BufferQueueItem *high_buf, *low_buf; BufferQueueItem *high_buf, *low_buf;
guint32 result; guint32 result;
high_buf = g_queue_peek_head (self->queue); high_buf =
low_buf = g_queue_peek_tail (self->queue); g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
(self->queue)));
low_buf = g_sequence_get (g_sequence_get_begin_iter (self->queue));
if (!high_buf || !low_buf || high_buf == low_buf) if (!high_buf || !low_buf || high_buf == low_buf)
return 0; return 0;
@ -521,16 +515,16 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
item->seqnum = seqnum; item->seqnum = seqnum;
item->timestamp = rtptime; item->timestamp = rtptime;
item->buffer = gst_buffer_ref (buffer); item->buffer = gst_buffer_ref (buffer);
g_queue_push_head (rtx->queue, item); g_sequence_append (rtx->queue, item);
/* remove oldest packets from history if they are too many */ /* remove oldest packets from history if they are too many */
if (rtx->max_size_packets) { if (rtx->max_size_packets) {
while (g_queue_get_length (rtx->queue) > rtx->max_size_packets) while (g_sequence_get_length (rtx->queue) > rtx->max_size_packets)
buffer_queue_item_free (g_queue_pop_tail (rtx->queue)); g_sequence_remove (g_sequence_get_begin_iter (rtx->queue));
} }
if (rtx->max_size_time) { if (rtx->max_size_time) {
while (gst_rtp_rtx_send_get_ts_diff (rtx) > rtx->max_size_time) while (gst_rtp_rtx_send_get_ts_diff (rtx) > rtx->max_size_time)
buffer_queue_item_free (g_queue_pop_tail (rtx->queue)); g_sequence_remove (g_sequence_get_begin_iter (rtx->queue));
} }
/* within lock, get packets that have to be retransmited */ /* within lock, get packets that have to be retransmited */

View file

@ -48,7 +48,7 @@ struct _GstRtpRtxSend
GMutex lock; GMutex lock;
/* history of rtp packets */ /* history of rtp packets */
GQueue *queue; GSequence *queue;
/* rtp packets that will be pushed upon next buffer */ /* rtp packets that will be pushed upon next buffer */
GQueue *pending; GQueue *pending;