queue2: split out draining of queue on FLOW_UNEXPECTED into separate function

This commit is contained in:
Tim-Philipp Müller 2011-11-03 13:02:36 +00:00
parent c9df12754d
commit d663e18402

View file

@ -2215,6 +2215,52 @@ out_unexpected:
} }
} }
static GstMiniObject *
gst_queue2_dequeue_on_unexpected (GstQueue2 * queue,
GstQueue2ItemType * item_type)
{
GstMiniObject *data;
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream");
/* stop pushing buffers, we dequeue all items until we see an item that we
* can push again, which is EOS or NEWSEGMENT. If there is nothing in the
* queue we can push, we set a flag to make the sinkpad refuse more
* buffers with an UNEXPECTED return value until we receive something
* pushable again or we get flushed. */
while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping UNEXPECTED buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
} else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
/* we found a pushable item in the queue, push it out */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pushing pushable event %s after UNEXPECTED",
GST_EVENT_TYPE_NAME (event));
return data;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping UNEXPECTED event %p", event);
gst_event_unref (event);
} else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping UNEXPECTED buffer list %p", data);
gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
* make us refuse any more buffers on the sinkpad. Since we will still
* accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
* task function does not shut down. */
queue->unexpected = TRUE;
return NULL;
}
/* dequeue an item from the queue an push it downstream. This functions returns /* dequeue an item from the queue an push it downstream. This functions returns
* the result of the push. */ * the result of the push. */
static GstFlowReturn static GstFlowReturn
@ -2248,40 +2294,9 @@ next:
/* need to check for srcresult here as well */ /* need to check for srcresult here as well */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
if (result == GST_FLOW_UNEXPECTED) { if (result == GST_FLOW_UNEXPECTED) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue, data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
"got UNEXPECTED from downstream"); if (data != NULL)
/* stop pushing buffers, we dequeue all items until we see an item that we goto next;
* can push again, which is EOS or NEWSEGMENT. If there is nothing in the
* queue we can push, we set a flag to make the sinkpad refuse more
* buffers with an UNEXPECTED return value until we receive something
* pushable again or we get flushed. */
while ((data = gst_queue2_locked_dequeue (queue, &item_type))) {
if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping UNEXPECTED buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
} else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
/* we found a pushable item in the queue, push it out */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pushing pushable event %s after UNEXPECTED",
GST_EVENT_TYPE_NAME (event));
goto next;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping UNEXPECTED event %p", event);
gst_event_unref (event);
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
* make us refuse any more buffers on the sinkpad. Since we will still
* accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
* task function does not shut down. */
queue->unexpected = TRUE;
result = GST_FLOW_OK;
} }
} else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event = GST_EVENT_CAST (data); GstEvent *event = GST_EVENT_CAST (data);