From 6d4dc1a8075937f9a2a19f886c602d6d122c9ff5 Mon Sep 17 00:00:00 2001 From: Thijs Vermeir Date: Thu, 26 Jun 2008 20:27:00 +0000 Subject: [PATCH] plugins/elements/gstmultiqueue.*: revert extra-size-buffers stuff, caused some race conditions and extra-size-buffers... Original commit message from CVS: * plugins/elements/gstmultiqueue.c: * plugins/elements/gstmultiqueue.h: revert extra-size-buffers stuff, caused some race conditions and extra-size-buffers is not used anymore. Docs needs some updates --- ChangeLog | 7 ++ plugins/elements/gstmultiqueue.c | 150 +++++++++++++++---------------- plugins/elements/gstmultiqueue.h | 1 - 3 files changed, 79 insertions(+), 79 deletions(-) diff --git a/ChangeLog b/ChangeLog index 33b968a130..cea8f2d0aa 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2008-06-26 Thijs Vermeir + + * plugins/elements/gstmultiqueue.c: + * plugins/elements/gstmultiqueue.h: + revert extra-size-buffers stuff, caused some race conditions + and extra-size-buffers is not used anymore. Docs needs some updates + 2008-06-26 Tim-Philipp Müller * win32/common/config.h: diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 1e0b9bab66..291f136bd9 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -2,7 +2,6 @@ * Copyright (C) 2006 Edward Hervey * Copyright (C) 2007 Jan Schmidt * Copyright (C) 2007 Wim Taymans - * Copyright (C) 2008 Thijs Vermeir * * gstmultiqueue.c: * @@ -219,11 +218,11 @@ enum }; #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ - g_mutex_lock (q->qlock); \ + g_mutex_lock (q->qlock); \ } G_STMT_END #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ - g_mutex_unlock (q->qlock); \ + g_mutex_unlock (q->qlock); \ } G_STMT_END static void gst_multi_queue_finalize (GObject * object); @@ -253,8 +252,7 @@ gst_multi_queue_base_init (gpointer g_class) gst_element_class_set_details_simple (gstelement_class, "MultiQueue", - "Generic", "Multiple data queue", "Edward Hervey \n" - "Thijs Vermeir "); + "Generic", "Multiple data queue", "Edward Hervey "); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); gst_element_class_add_pad_template (gstelement_class, @@ -381,14 +379,13 @@ gst_multi_queue_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ - GList * tmp = mq->queues; \ - while (tmp) { \ - GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ +#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \ + GList * tmp = mq->queues; \ + while (tmp) { \ + GstSingleQueue *q = (GstSingleQueue*)tmp->data; \ q->max_size.format = mq->max_size.format; \ - q->extra_size.format = mq->extra_size.format; \ - tmp = g_list_next(tmp); \ - }; \ + tmp = g_list_next(tmp); \ + }; \ } G_STMT_END static void @@ -397,37 +394,38 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, { GstMultiQueue *mq = GST_MULTI_QUEUE (object); - GST_MULTI_QUEUE_MUTEX_LOCK (mq); switch (prop_id) { case ARG_MAX_SIZE_BYTES: + GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->max_size.bytes = g_value_get_uint (value); SET_CHILD_PROPERTY (mq, bytes); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case ARG_MAX_SIZE_BUFFERS: + GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->max_size.visible = g_value_get_uint (value); SET_CHILD_PROPERTY (mq, visible); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case ARG_MAX_SIZE_TIME: + GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->max_size.time = g_value_get_uint64 (value); SET_CHILD_PROPERTY (mq, time); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case ARG_EXTRA_SIZE_BYTES: mq->extra_size.bytes = g_value_get_uint (value); - SET_CHILD_PROPERTY (mq, bytes); break; case ARG_EXTRA_SIZE_BUFFERS: mq->extra_size.visible = g_value_get_uint (value); - SET_CHILD_PROPERTY (mq, visible); break; case ARG_EXTRA_SIZE_TIME: mq->extra_size.time = g_value_get_uint64 (value); - SET_CHILD_PROPERTY (mq, time); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } static void @@ -503,6 +501,7 @@ no_parent: } } + /* * GstElement methods */ @@ -601,7 +600,6 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->srcresult = GST_FLOW_OK; sq->cur_time = 0; sq->max_size.visible = mq->max_size.visible; - sq->extra_size.visible = mq->extra_size.visible; sq->is_eos = FALSE; sq->inextra = FALSE; sq->nextid = 0; @@ -1272,40 +1270,6 @@ compute_high_id (GstMultiQueue * mq) #define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ (sq->max_size.format) <= (value)) -static void -single_queue_overrun_cb_unlocked (GstDataQueue * dq, GstSingleQueue * sq) -{ - GstMultiQueue *mq = sq->mqueue; - GList *tmp; - GstDataQueueSize size; - - gst_data_queue_get_level (sq->queue, &size); - mq->filled = FALSE; - - /* if we have reached max visible we can maybe bump this - * if another queue is empty, skip this if we can't grow anymore - */ - if (IS_FILLED (visible, size.visible) && sq->extra_size.visible > 0) { - for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { - GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; - - if (ssq == sq) - continue; - - if (gst_data_queue_is_empty (ssq->queue)) { - sq->max_size.visible++; - sq->extra_size.visible--; - GST_DEBUG_OBJECT (mq, - "queue %d is empty, bumping single queue %d max visible to %d", - ssq->id, sq->id, sq->max_size.visible); - break; - } - } - /* check if the queue is still full */ - mq->filled = gst_data_queue_is_full (sq->queue); - } -} - /* * GstSingleQueue functions */ @@ -1313,26 +1277,63 @@ static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { GstMultiQueue *mq = sq->mqueue; + GList *tmp; + GstDataQueueSize size; + gboolean filled = FALSE; + + gst_data_queue_get_level (sq->queue, &size); GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); GST_MULTI_QUEUE_MUTEX_LOCK (mq); + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; + GstDataQueueSize ssize; - single_queue_overrun_cb_unlocked (dq, sq); + GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); + if (gst_data_queue_is_empty (ssq->queue)) { + GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); + if (IS_FILLED (visible, size.visible)) { + sq->max_size.visible = size.visible + 1; + GST_DEBUG_OBJECT (mq, + "Another queue is empty, bumping single queue %d max visible to %d", + sq->id, sq->max_size.visible); + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + goto beach; + } + /* check if we reached the hard time/bytes limits */ + gst_data_queue_get_level (ssq->queue, &ssize); + + GST_DEBUG_OBJECT (mq, + "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" + G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, + ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); + + /* if this queue is filled completely we must signal overrun */ + if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) { + GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); + filled = TRUE; + } + } + /* no queues were empty */ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - if (mq->filled) { + /* Overrun is always forwarded, since this is blocking the upstream element */ + if (filled) { GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); - mq->filled = FALSE; } + +beach: + return; } static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { - gboolean all_empty = TRUE; + gboolean empty = TRUE; GstMultiQueue *mq = sq->mqueue; GList *tmp; @@ -1341,36 +1342,29 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GST_MULTI_QUEUE_MUTEX_LOCK (mq); for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { - GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; - if (sq == ssq) - continue; + if (gst_data_queue_is_full (sq->queue)) { + GstDataQueueSize size; - /* prevent data starvation */ - if (gst_data_queue_is_full (ssq->queue)) { - single_queue_overrun_cb_unlocked (dq, ssq); - goto check_filled; - } - - if (!gst_data_queue_is_empty (ssq->queue)) { - all_empty = FALSE; - break; + gst_data_queue_get_level (sq->queue, &size); + if (IS_FILLED (visible, size.visible)) { + sq->max_size.visible = size.visible + 1; + GST_DEBUG_OBJECT (mq, + "queue %d is filled, bumping its max visible to %d", sq->id, + sq->max_size.visible); + gst_data_queue_limits_changed (sq->queue); + } } + if (!gst_data_queue_is_empty (sq->queue)) + empty = FALSE; } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - if (all_empty) { + if (empty) { GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it"); g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0); } - return; - -check_filled: - if (mq->filled) { - GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); - g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0); - mq->filled = FALSE; - } } static gboolean @@ -1448,7 +1442,7 @@ gst_single_queue_new (GstMultiQueue * mqueue) sq->oldid = 0; sq->turn = g_cond_new (); - /* attach to underrun/overrun signals to handle non-starvation */ + /* attach to underrun/overrun signals to handle non-starvation */ g_signal_connect (G_OBJECT (sq->queue), "full", G_CALLBACK (single_queue_overrun_cb), sq); g_signal_connect (G_OBJECT (sq->queue), "empty", diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index 1666b5d4c8..4874563236 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -68,7 +68,6 @@ struct _GstMultiQueue { gint nextnotlinked; /* ID of the next queue not linked (-1 : none) */ gint numwaiting; /* number of not-linked pads waiting */ - gboolean filled; /* overrun detected */ }; struct _GstMultiQueueClass {