plugins/elements/gstqueue.c: When changing thr max capacity of a leaky queue, immediatly drop buffers instead of wait...

Original commit message from CVS:
Patch by: Jonas Holmberg <jonas dot holmberg at axis dot com>
* plugins/elements/gstqueue.c: (gst_queue_leak_downstream),
(gst_queue_chain), (queue_capacity_change),
(gst_queue_set_property):
When changing thr max capacity of a leaky queue, immediatly drop buffers
instead of waiting for a push on the sinkpad. Fixes #530637.
This commit is contained in:
Jonas Holmberg 2008-04-30 09:35:43 +00:00 committed by Wim Taymans
parent ba8f586a9c
commit 0da9f87257
2 changed files with 46 additions and 26 deletions

View file

@ -1,3 +1,13 @@
2008-04-30 Wim Taymans <wim.taymans@collabora.co.uk>
Patch by: Jonas Holmberg <jonas dot holmberg at axis dot com>
* plugins/elements/gstqueue.c: (gst_queue_leak_downstream),
(gst_queue_chain), (queue_capacity_change),
(gst_queue_set_property):
When changing thr max capacity of a leaky queue, immediatly drop buffers
instead of waiting for a push on the sinkpad. Fixes #530637.
2008-04-30 Stefan Kost <ensonic@users.sf.net> 2008-04-30 Stefan Kost <ensonic@users.sf.net>
* gst/gstdebugutils.c: * gst/gstdebugutils.c:

View file

@ -831,6 +831,26 @@ gst_queue_is_filled (GstQueue * queue)
queue->cur_level.time >= queue->max_size.time))); queue->cur_level.time >= queue->max_size.time)));
} }
static void
gst_queue_leak_downstream (GstQueue * queue)
{
/* for as long as the queue is filled, dequeue an item and discard it */
do {
GstMiniObject *leak;
leak = gst_queue_locked_dequeue (queue);
/* there is nothing to dequeue and the queue is still filled.. This should
* not happen */
g_assert (leak != NULL);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, leaking item %p on downstream end", leak);
gst_buffer_unref (leak);
} while (gst_queue_is_filled (queue));
/* last buffer needs to get a DISCONT flag */
queue->head_needs_discont = TRUE;
}
static GstFlowReturn static GstFlowReturn
gst_queue_chain (GstPad * pad, GstBuffer * buffer) gst_queue_chain (GstPad * pad, GstBuffer * buffer)
{ {
@ -878,25 +898,8 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
/* now we can clean up and exit right away */ /* now we can clean up and exit right away */
goto out_unref; goto out_unref;
case GST_QUEUE_LEAK_DOWNSTREAM: case GST_QUEUE_LEAK_DOWNSTREAM:
{ gst_queue_leak_downstream (queue);
/* for as long as the queue is filled, dequeue an item and discard
* it. */
do {
GstMiniObject *leak;
leak = gst_queue_locked_dequeue (queue);
/* there is nothing to dequeue and the queue is still filled.. This
* should not happen. */
g_assert (leak != NULL);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, leaking item %p on downstream end", leak);
gst_buffer_unref (leak);
} while (gst_queue_is_filled (queue));
/* last buffer needs to get a DISCONT flag */
queue->head_needs_discont = TRUE;
break; break;
}
default: default:
g_warning ("Unknown leaky type, using default"); g_warning ("Unknown leaky type, using default");
/* fall-through */ /* fall-through */
@ -1332,11 +1335,18 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
return ret; return ret;
} }
/* changing the capacity of the queue must wake up static void
* the _chain function, it might have more room now queue_capacity_change (GstQueue * queue)
* to store the buffer/event in the queue */ {
#define QUEUE_CAPACITY_CHANGE(q)\ if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
GST_QUEUE_SIGNAL_DEL (q); gst_queue_leak_downstream (queue);
}
/* changing the capacity of the queue must wake up
* the _chain function, it might have more room now
* to store the buffer/event in the queue */
GST_QUEUE_SIGNAL_DEL (queue);
}
/* Changing the minimum required fill level must /* Changing the minimum required fill level must
* wake up the _loop function as it might now * wake up the _loop function as it might now
@ -1358,15 +1368,15 @@ gst_queue_set_property (GObject * object,
switch (prop_id) { switch (prop_id) {
case ARG_MAX_SIZE_BYTES: case ARG_MAX_SIZE_BYTES:
queue->max_size.bytes = g_value_get_uint (value); queue->max_size.bytes = g_value_get_uint (value);
QUEUE_CAPACITY_CHANGE (queue); queue_capacity_change (queue);
break; break;
case ARG_MAX_SIZE_BUFFERS: case ARG_MAX_SIZE_BUFFERS:
queue->max_size.buffers = g_value_get_uint (value); queue->max_size.buffers = g_value_get_uint (value);
QUEUE_CAPACITY_CHANGE (queue); queue_capacity_change (queue);
break; break;
case ARG_MAX_SIZE_TIME: case ARG_MAX_SIZE_TIME:
queue->max_size.time = g_value_get_uint64 (value); queue->max_size.time = g_value_get_uint64 (value);
QUEUE_CAPACITY_CHANGE (queue); queue_capacity_change (queue);
break; break;
case ARG_MIN_THRESHOLD_BYTES: case ARG_MIN_THRESHOLD_BYTES:
queue->min_threshold.bytes = g_value_get_uint (value); queue->min_threshold.bytes = g_value_get_uint (value);