diff --git a/ChangeLog b/ChangeLog index 7270390a97..4cd2c20079 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,19 @@ +2007-01-24 Edward Hervey + + * plugins/elements/gstmultiqueue.c: + (gst_multi_queue_loop): + Small fix. + (single_queue_overrun_cb), (single_queue_underrun_cb), + (single_queue_check_full), (gst_single_queue_new): + Implement single queue growth system. + This uses the extra-size properties, and will grow single queues by + that much if one goes full whereas there are others empty. This is + called extra-mode in the code. + When a single queue's levels go back below the initial max-size + limits, it is no longer in extra-mode. This is to ensure we don't + consume too much memory. + Fixes #399875 + 2007-01-23 Tim-Philipp Müller * gst/gst.c: (gst_init_get_option_group): diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index a7987d6c3a..19b369d447 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -53,6 +53,7 @@ struct _GstSingleQueue /* queue of data */ GstDataQueue *queue; GstDataQueueSize max_size, extra_size; + gboolean inextra; /* TRUE if the queue is currently in extradata mode */ /* Protected by global lock */ guint32 nextid; /* ID of the next object waiting to be pushed */ @@ -496,7 +497,7 @@ gst_multi_queue_loop (GstPad * pad) guint32 oldid = -1; sq = (GstSingleQueue *) gst_pad_get_element_private (pad); - mq = (GstMultiQueue *) gst_pad_get_parent (pad); + mq = sq->mqueue; restart: GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); @@ -576,7 +577,6 @@ restart: beach: - gst_object_unref (mq); return; out_flushing: @@ -904,17 +904,44 @@ compute_next_non_linked (GstMultiQueue * mq) */ static void -single_queue_overrun_cb (GstSingleQueue * sq, GstMultiQueue * mq) +single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { - GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); + GstMultiQueue *mq = sq->mqueue; + GList *tmp; + + GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id); + + if (!sq->inextra) { + /* Check if at least one other queue is empty... */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; + + if (gst_data_queue_is_empty (ssq->queue)) { + /* ... if so set sq->inextra to TRUE and don't emit overrun signal */ + GST_DEBUG_OBJECT (mq, + "Another queue is empty, bumping single queue into extra data mode"); + sq->inextra = TRUE; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + goto beach; + } + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } + /* Overrun is always forwarded, since this is blocking the upstream element */ - g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); + g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN], + 0); + +beach: + return; } static void -single_queue_underrun_cb (GstSingleQueue * sq, GstMultiQueue * mq) +single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { gboolean empty = TRUE; + GstMultiQueue *mq = sq->mqueue; GList *tmp; GST_LOG_OBJECT (mq, @@ -944,12 +971,32 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, { gboolean res; + /* In all cases (extra mode or not), we check how the queue current level + * compares to max_size. */ res = (((sq->max_size.visible != 0) && sq->max_size.visible < visible) || ((sq->max_size.bytes != 0) && sq->max_size.bytes < bytes) || ((sq->max_size.time != 0) && sq->max_size.time < time)); + if (G_UNLIKELY (sq->inextra)) { + /* If we're in extra mode, one of two things can happen to check for + * fullness: */ + + if (!res) + /* #1 : Either we are not full against normal max_size levels, in which + * case we can go out of extra mode. */ + sq->inextra = FALSE; + else + /* #2 : Or else, the check should be done against max_size + extra_size */ + res = (((sq->max_size.visible != 0) && + (sq->max_size.visible + sq->extra_size.visible) < visible) || + ((sq->max_size.bytes != 0) && + (sq->max_size.bytes + sq->extra_size.bytes) < bytes) || + ((sq->max_size.time != 0) && + (sq->max_size.time + sq->extra_size.time) < time)); + + } return res; } @@ -993,10 +1040,10 @@ gst_single_queue_new (GstMultiQueue * mqueue) * OR should this be handled when we check if the queue is full/empty before pushing/popping ? */ g_signal_connect (G_OBJECT (sq->queue), "full", - G_CALLBACK (single_queue_overrun_cb), mqueue); + G_CALLBACK (single_queue_overrun_cb), sq); g_signal_connect (G_OBJECT (sq->queue), "empty", - G_CALLBACK (single_queue_underrun_cb), mqueue); + G_CALLBACK (single_queue_underrun_cb), sq); tmp = g_strdup_printf ("sink%d", sq->id); sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);