mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-06-07 07:58:51 +00:00
rtprtxqueue: Implement support for buffer lists
This commit is contained in:
parent
1c27002ebd
commit
57ff27f8c8
1 changed files with 51 additions and 5 deletions
|
@ -64,6 +64,8 @@ static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
|
||||||
GstEvent * event);
|
GstEvent * event);
|
||||||
static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
|
static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
|
||||||
GstBuffer * buffer);
|
GstBuffer * buffer);
|
||||||
|
static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
|
||||||
|
GstObject * parent, GstBufferList * list);
|
||||||
|
|
||||||
static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
|
static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
|
||||||
element, GstStateChange transition);
|
element, GstStateChange transition);
|
||||||
|
@ -159,6 +161,8 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
|
||||||
GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
|
GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
|
||||||
gst_pad_set_chain_function (rtx->sinkpad,
|
gst_pad_set_chain_function (rtx->sinkpad,
|
||||||
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
|
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
|
||||||
|
gst_pad_set_chain_list_function (rtx->sinkpad,
|
||||||
|
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
|
||||||
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
|
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
|
||||||
|
|
||||||
rtx->queue = g_queue_new ();
|
rtx->queue = g_queue_new ();
|
||||||
|
@ -246,6 +250,16 @@ do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
|
||||||
gst_pad_push (rtx->srcpad, buffer);
|
gst_pad_push (rtx->srcpad, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Must be called with rtx->lock */
|
||||||
|
static void
|
||||||
|
shrink_queue (GstRTPRtxQueue * rtx)
|
||||||
|
{
|
||||||
|
if (rtx->max_size_packets) {
|
||||||
|
while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
|
||||||
|
gst_buffer_unref (g_queue_pop_tail (rtx->queue));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
|
gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
|
||||||
{
|
{
|
||||||
|
@ -257,11 +271,7 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
|
||||||
|
|
||||||
g_mutex_lock (&rtx->lock);
|
g_mutex_lock (&rtx->lock);
|
||||||
g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
|
g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
|
||||||
|
shrink_queue (rtx);
|
||||||
if (rtx->max_size_packets) {
|
|
||||||
while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
|
|
||||||
gst_buffer_unref (g_queue_pop_tail (rtx->queue));
|
|
||||||
}
|
|
||||||
|
|
||||||
pending = rtx->pending;
|
pending = rtx->pending;
|
||||||
rtx->pending = NULL;
|
rtx->pending = NULL;
|
||||||
|
@ -275,6 +285,42 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
|
||||||
|
{
|
||||||
|
GQueue *queue = user_data;
|
||||||
|
|
||||||
|
g_queue_push_head (queue, gst_buffer_ref (*buffer));
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static GstFlowReturn
|
||||||
|
gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
|
||||||
|
GstBufferList * list)
|
||||||
|
{
|
||||||
|
GstRTPRtxQueue *rtx;
|
||||||
|
GstFlowReturn ret;
|
||||||
|
GList *pending;
|
||||||
|
|
||||||
|
rtx = GST_RTP_RTX_QUEUE (parent);
|
||||||
|
|
||||||
|
g_mutex_lock (&rtx->lock);
|
||||||
|
gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
|
||||||
|
shrink_queue (rtx);
|
||||||
|
|
||||||
|
pending = rtx->pending;
|
||||||
|
rtx->pending = NULL;
|
||||||
|
g_mutex_unlock (&rtx->lock);
|
||||||
|
|
||||||
|
g_list_foreach (pending, (GFunc) do_push, rtx);
|
||||||
|
g_list_free (pending);
|
||||||
|
|
||||||
|
ret = gst_pad_push_list (rtx->srcpad, list);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
gst_rtp_rtx_queue_get_property (GObject * object,
|
gst_rtp_rtx_queue_get_property (GObject * object,
|
||||||
guint prop_id, GValue * value, GParamSpec * pspec)
|
guint prop_id, GValue * value, GParamSpec * pspec)
|
||||||
|
|
Loading…
Reference in a new issue