diff --git a/gst/rtpmanager/gstrtprtxqueue.c b/gst/rtpmanager/gstrtprtxqueue.c index ca299c201b..23fcc2da55 100644 --- a/gst/rtpmanager/gstrtprtxqueue.c +++ b/gst/rtpmanager/gstrtprtxqueue.c @@ -98,12 +98,26 @@ gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state); } +static void +gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full) +{ + g_mutex_lock (&rtx->lock); + g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (rtx->queue); + g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); + g_list_free (rtx->pending); + rtx->pending = NULL; + g_mutex_unlock (&rtx->lock); +} + static void gst_rtp_rtx_queue_finalize (GObject * object) { GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object); + gst_rtp_rtx_queue_reset (rtx, TRUE); g_queue_free (rtx->queue); + g_mutex_clear (&rtx->lock); G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object); } @@ -131,14 +145,15 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx) GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain)); gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); - rtx->queue = g_queue_new (); + g_mutex_init (&rtx->lock); } typedef struct { GstRTPRtxQueue *rtx; guint seqnum; + gboolean found; } RTXData; static void @@ -148,6 +163,9 @@ push_seqnum (GstBuffer * buffer, RTXData * data) GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT; guint16 seqnum; + if (data->found) + return; + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer)) return; @@ -155,7 +173,9 @@ push_seqnum (GstBuffer * buffer, RTXData * data) gst_rtp_buffer_unmap (&rtpbuffer); if (seqnum == data->seqnum) { - gst_pad_push (rtx->srcpad, gst_buffer_ref (buffer)); + data->found = TRUE; + GST_DEBUG_OBJECT (rtx, "found %d", seqnum); + rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer)); } } @@ -178,9 +198,15 @@ gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event) if (!gst_structure_get_uint (s, "seqnum", &seqnum)) seqnum = -1; + GST_DEBUG_OBJECT (rtx, "request %d", seqnum); + + g_mutex_lock (&rtx->lock); data.rtx = rtx; data.seqnum = seqnum; + data.found = FALSE; g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data); + g_mutex_unlock (&rtx->lock); + gst_event_unref (event); res = TRUE; } else { @@ -195,18 +221,32 @@ gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event) return res; } +static void +do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx) +{ + gst_pad_push (rtx->srcpad, buffer); +} + static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstRTPRtxQueue *rtx; GstFlowReturn ret; + GList *pending; rtx = GST_RTP_RTX_QUEUE (parent); + g_mutex_lock (&rtx->lock); g_queue_push_head (rtx->queue, gst_buffer_ref (buffer)); while (g_queue_get_length (rtx->queue) > 100) { gst_buffer_unref (g_queue_pop_tail (rtx->queue)); } + pending = rtx->pending; + rtx->pending = NULL; + g_mutex_unlock (&rtx->lock); + + g_list_foreach (pending, (GFunc) do_push, rtx); + g_list_free (pending); ret = gst_pad_push (rtx->srcpad, buffer); @@ -239,6 +279,9 @@ static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition) { GstStateChangeReturn ret; + GstRTPRtxQueue *rtx; + + rtx = GST_RTP_RTX_QUEUE (element); switch (transition) { default: @@ -250,6 +293,9 @@ gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition) transition); switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rtp_rtx_queue_reset (rtx, TRUE); + break; default: break; } diff --git a/gst/rtpmanager/gstrtprtxqueue.h b/gst/rtpmanager/gstrtprtxqueue.h index 258efb300f..671a959178 100644 --- a/gst/rtpmanager/gstrtprtxqueue.h +++ b/gst/rtpmanager/gstrtprtxqueue.h @@ -52,7 +52,9 @@ struct _GstRTPRtxQueue GstPad *sinkpad; GstPad *srcpad; + GMutex lock; GQueue *queue; + GList *pending; }; struct _GstRTPRtxQueueClass