From 15b694fc7a029ee18354af9458b4c43a52bd8fc8 Mon Sep 17 00:00:00 2001 From: Thijs Vermeir Date: Sun, 22 Jun 2008 19:19:35 +0000 Subject: [PATCH] plugins/elements/gstmultiqueue.c: Add functionality to extra-size-buffers property. Original commit message from CVS: * plugins/elements/gstmultiqueue.c: Add functionality to extra-size-buffers property. --- ChangeLog | 5 ++ plugins/elements/gstmultiqueue.c | 115 ++++++++++++++----------------- 2 files changed, 57 insertions(+), 63 deletions(-) diff --git a/ChangeLog b/ChangeLog index 83b8f5c6c4..730b03c758 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +2008-06-22 Thijs Vermeir + + * plugins/elements/gstmultiqueue.c: + Add functionality to extra-size-buffers property. + 2008-06-22 Thijs Vermeir * plugins/elements/gstmultiqueue.c: diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 291f136bd9..abe8c18382 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -2,6 +2,7 @@ * Copyright (C) 2006 Edward Hervey * Copyright (C) 2007 Jan Schmidt * Copyright (C) 2007 Wim Taymans + * Copyright (C) 2008 Thijs Vermeir * * gstmultiqueue.c: * @@ -218,11 +219,11 @@ enum }; #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ - g_mutex_lock (q->qlock); \ + g_mutex_lock (q->qlock); \ } G_STMT_END #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ - g_mutex_unlock (q->qlock); \ + g_mutex_unlock (q->qlock); \ } G_STMT_END static void gst_multi_queue_finalize (GObject * object); @@ -252,7 +253,8 @@ gst_multi_queue_base_init (gpointer g_class) gst_element_class_set_details_simple (gstelement_class, "MultiQueue", - "Generic", "Multiple data queue", "Edward Hervey "); + "Generic", "Multiple data queue", "Edward Hervey \n" + "Thijs Vermeir "); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); gst_element_class_add_pad_template (gstelement_class, @@ -379,13 +381,14 @@ gst_multi_queue_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ - GList * tmp = mq->queues; \ - while (tmp) { \ - GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ +#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ + GList * tmp = mq->queues; \ + while (tmp) { \ + GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ q->max_size.format = mq->max_size.format; \ - tmp = g_list_next(tmp); \ - }; \ + q->extra_size.format = mq->extra_size.format; \ + tmp = g_list_next(tmp); \ + }; \ } G_STMT_END static void @@ -394,38 +397,37 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, { GstMultiQueue *mq = GST_MULTI_QUEUE (object); + GST_MULTI_QUEUE_MUTEX_LOCK (mq); switch (prop_id) { case ARG_MAX_SIZE_BYTES: - GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->max_size.bytes = g_value_get_uint (value); SET_CHILD_PROPERTY (mq, bytes); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case ARG_MAX_SIZE_BUFFERS: - GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->max_size.visible = g_value_get_uint (value); SET_CHILD_PROPERTY (mq, visible); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case ARG_MAX_SIZE_TIME: - GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->max_size.time = g_value_get_uint64 (value); SET_CHILD_PROPERTY (mq, time); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case ARG_EXTRA_SIZE_BYTES: mq->extra_size.bytes = g_value_get_uint (value); + SET_CHILD_PROPERTY (mq, bytes); break; case ARG_EXTRA_SIZE_BUFFERS: mq->extra_size.visible = g_value_get_uint (value); + SET_CHILD_PROPERTY (mq, visible); break; case ARG_EXTRA_SIZE_TIME: mq->extra_size.time = g_value_get_uint64 (value); + SET_CHILD_PROPERTY (mq, time); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } static void @@ -501,7 +503,6 @@ no_parent: } } - /* * GstElement methods */ @@ -600,6 +601,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->srcresult = GST_FLOW_OK; sq->cur_time = 0; sq->max_size.visible = mq->max_size.visible; + sq->extra_size.visible = mq->extra_size.visible; sq->is_eos = FALSE; sq->inextra = FALSE; sq->nextid = 0; @@ -1279,61 +1281,48 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GstMultiQueue *mq = sq->mqueue; GList *tmp; GstDataQueueSize size; - gboolean filled = FALSE; + gboolean filled = TRUE; gst_data_queue_get_level (sq->queue, &size); GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); GST_MULTI_QUEUE_MUTEX_LOCK (mq); - for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { - GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; - GstDataQueueSize ssize; - GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); + /* if we have reached max visible we can maybe bump this + * if another queue is empty, skip this if we can't grow anymore + */ + if (IS_FILLED (visible, size.visible) && sq->extra_size.visible > 0) { + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; - if (gst_data_queue_is_empty (ssq->queue)) { - GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); - if (IS_FILLED (visible, size.visible)) { - sq->max_size.visible = size.visible + 1; + if (ssq == sq) + continue; + + if (gst_data_queue_is_empty (ssq->queue)) { + sq->max_size.visible++; + sq->extra_size.visible--; GST_DEBUG_OBJECT (mq, - "Another queue is empty, bumping single queue %d max visible to %d", - sq->id, sq->max_size.visible); + "queue %d is empty, bumping single queue %d max visible to %d", + ssq->id, sq->id, sq->max_size.visible); + break; } - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - goto beach; - } - /* check if we reached the hard time/bytes limits */ - gst_data_queue_get_level (ssq->queue, &ssize); - - GST_DEBUG_OBJECT (mq, - "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" - G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, - ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); - - /* if this queue is filled completely we must signal overrun */ - if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) { - GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); - filled = TRUE; } + /* check if the queue is still full */ + filled = gst_data_queue_is_full (sq->queue); } - /* no queues were empty */ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - /* Overrun is always forwarded, since this is blocking the upstream element */ if (filled) { GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); } - -beach: - return; } static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { - gboolean empty = TRUE; + gboolean all_empty = TRUE; GstMultiQueue *mq = sq->mqueue; GList *tmp; @@ -1342,26 +1331,26 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GST_MULTI_QUEUE_MUTEX_LOCK (mq); for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { - GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; - if (gst_data_queue_is_full (sq->queue)) { - GstDataQueueSize size; + if (sq == ssq) + continue; - gst_data_queue_get_level (sq->queue, &size); - if (IS_FILLED (visible, size.visible)) { - sq->max_size.visible = size.visible + 1; - GST_DEBUG_OBJECT (mq, - "queue %d is filled, bumping its max visible to %d", sq->id, - sq->max_size.visible); - gst_data_queue_limits_changed (sq->queue); - } + /* prevent data starvation */ + if (gst_data_queue_is_full (ssq->queue)) { + single_queue_overrun_cb (dq, ssq); + all_empty = FALSE; + break; + } + + if (!gst_data_queue_is_empty (ssq->queue)) { + all_empty = FALSE; + break; } - if (!gst_data_queue_is_empty (sq->queue)) - empty = FALSE; } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - if (empty) { + if (all_empty) { GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0); } @@ -1442,7 +1431,7 @@ gst_single_queue_new (GstMultiQueue * mqueue) sq->oldid = 0; sq->turn = g_cond_new (); - /* attach to underrun/overrun signals to handle non-starvation */ + /* attach to underrun/overrun signals to handle non-starvation */ g_signal_connect (G_OBJECT (sq->queue), "full", G_CALLBACK (single_queue_overrun_cb), sq); g_signal_connect (G_OBJECT (sq->queue), "empty",