rtx: various improvements

Use locking
Don't push from the event handler, collected packets in a queue and push from
the chain function.
Clear queues on shutdown.
This commit is contained in:
Wim Taymans 2013-08-21 16:53:59 +02:00
parent ee15bc9284
commit 89b9019e3e
2 changed files with 50 additions and 2 deletions

View file

@ -98,12 +98,26 @@ gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state); GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
} }
static void
gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
{
g_mutex_lock (&rtx->lock);
g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (rtx->queue);
g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
g_list_free (rtx->pending);
rtx->pending = NULL;
g_mutex_unlock (&rtx->lock);
}
static void static void
gst_rtp_rtx_queue_finalize (GObject * object) gst_rtp_rtx_queue_finalize (GObject * object)
{ {
GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object); GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
gst_rtp_rtx_queue_reset (rtx, TRUE);
g_queue_free (rtx->queue); g_queue_free (rtx->queue);
g_mutex_clear (&rtx->lock);
G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object); G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
} }
@ -131,14 +145,15 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain)); GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
rtx->queue = g_queue_new (); rtx->queue = g_queue_new ();
g_mutex_init (&rtx->lock);
} }
typedef struct typedef struct
{ {
GstRTPRtxQueue *rtx; GstRTPRtxQueue *rtx;
guint seqnum; guint seqnum;
gboolean found;
} RTXData; } RTXData;
static void static void
@ -148,6 +163,9 @@ push_seqnum (GstBuffer * buffer, RTXData * data)
GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT; GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
guint16 seqnum; guint16 seqnum;
if (data->found)
return;
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer)) if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
return; return;
@ -155,7 +173,9 @@ push_seqnum (GstBuffer * buffer, RTXData * data)
gst_rtp_buffer_unmap (&rtpbuffer); gst_rtp_buffer_unmap (&rtpbuffer);
if (seqnum == data->seqnum) { if (seqnum == data->seqnum) {
gst_pad_push (rtx->srcpad, gst_buffer_ref (buffer)); data->found = TRUE;
GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
} }
} }
@ -178,9 +198,15 @@ gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (!gst_structure_get_uint (s, "seqnum", &seqnum)) if (!gst_structure_get_uint (s, "seqnum", &seqnum))
seqnum = -1; seqnum = -1;
GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
g_mutex_lock (&rtx->lock);
data.rtx = rtx; data.rtx = rtx;
data.seqnum = seqnum; data.seqnum = seqnum;
data.found = FALSE;
g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data); g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
g_mutex_unlock (&rtx->lock);
gst_event_unref (event); gst_event_unref (event);
res = TRUE; res = TRUE;
} else { } else {
@ -195,18 +221,32 @@ gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
return res; return res;
} }
static void
do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
{
gst_pad_push (rtx->srcpad, buffer);
}
static GstFlowReturn static GstFlowReturn
gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{ {
GstRTPRtxQueue *rtx; GstRTPRtxQueue *rtx;
GstFlowReturn ret; GstFlowReturn ret;
GList *pending;
rtx = GST_RTP_RTX_QUEUE (parent); rtx = GST_RTP_RTX_QUEUE (parent);
g_mutex_lock (&rtx->lock);
g_queue_push_head (rtx->queue, gst_buffer_ref (buffer)); g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
while (g_queue_get_length (rtx->queue) > 100) { while (g_queue_get_length (rtx->queue) > 100) {
gst_buffer_unref (g_queue_pop_tail (rtx->queue)); gst_buffer_unref (g_queue_pop_tail (rtx->queue));
} }
pending = rtx->pending;
rtx->pending = NULL;
g_mutex_unlock (&rtx->lock);
g_list_foreach (pending, (GFunc) do_push, rtx);
g_list_free (pending);
ret = gst_pad_push (rtx->srcpad, buffer); ret = gst_pad_push (rtx->srcpad, buffer);
@ -239,6 +279,9 @@ static GstStateChangeReturn
gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition) gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
{ {
GstStateChangeReturn ret; GstStateChangeReturn ret;
GstRTPRtxQueue *rtx;
rtx = GST_RTP_RTX_QUEUE (element);
switch (transition) { switch (transition) {
default: default:
@ -250,6 +293,9 @@ gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
transition); transition);
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_rtp_rtx_queue_reset (rtx, TRUE);
break;
default: default:
break; break;
} }

View file

@ -52,7 +52,9 @@ struct _GstRTPRtxQueue
GstPad *sinkpad; GstPad *sinkpad;
GstPad *srcpad; GstPad *srcpad;
GMutex lock;
GQueue *queue; GQueue *queue;
GList *pending;
}; };
struct _GstRTPRtxQueueClass struct _GstRTPRtxQueueClass