diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index 483613f3f3..80a9857505 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -2215,6 +2215,52 @@ out_unexpected: } } +static GstMiniObject * +gst_queue2_dequeue_on_unexpected (GstQueue2 * queue, + GstQueue2ItemType * item_type) +{ + GstMiniObject *data; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream"); + + /* stop pushing buffers, we dequeue all items until we see an item that we + * can push again, which is EOS or NEWSEGMENT. If there is nothing in the + * queue we can push, we set a flag to make the sinkpad refuse more + * buffers with an UNEXPECTED return value until we receive something + * pushable again or we get flushed. */ + while ((data = gst_queue2_locked_dequeue (queue, item_type))) { + if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED buffer %p", data); + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { + GstEvent *event = GST_EVENT_CAST (data); + GstEventType type = GST_EVENT_TYPE (event); + + if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) { + /* we found a pushable item in the queue, push it out */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushing pushable event %s after UNEXPECTED", + GST_EVENT_TYPE_NAME (event)); + return data; + } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED event %p", event); + gst_event_unref (event); + } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED buffer list %p", data); + gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data)); + } + } + /* no more items in the queue. Set the unexpected flag so that upstream + * make us refuse any more buffers on the sinkpad. Since we will still + * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the + * task function does not shut down. */ + queue->unexpected = TRUE; + return NULL; +} + /* dequeue an item from the queue an push it downstream. This functions returns * the result of the push. */ static GstFlowReturn @@ -2248,40 +2294,9 @@ next: /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); if (result == GST_FLOW_UNEXPECTED) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "got UNEXPECTED from downstream"); - /* stop pushing buffers, we dequeue all items until we see an item that we - * can push again, which is EOS or NEWSEGMENT. If there is nothing in the - * queue we can push, we set a flag to make the sinkpad refuse more - * buffers with an UNEXPECTED return value until we receive something - * pushable again or we get flushed. */ - while ((data = gst_queue2_locked_dequeue (queue, &item_type))) { - if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping UNEXPECTED buffer %p", data); - gst_buffer_unref (GST_BUFFER_CAST (data)); - } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { - GstEvent *event = GST_EVENT_CAST (data); - GstEventType type = GST_EVENT_TYPE (event); - - if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) { - /* we found a pushable item in the queue, push it out */ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "pushing pushable event %s after UNEXPECTED", - GST_EVENT_TYPE_NAME (event)); - goto next; - } - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping UNEXPECTED event %p", event); - gst_event_unref (event); - } - } - /* no more items in the queue. Set the unexpected flag so that upstream - * make us refuse any more buffers on the sinkpad. Since we will still - * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the - * task function does not shut down. */ - queue->unexpected = TRUE; - result = GST_FLOW_OK; + data = gst_queue2_dequeue_on_unexpected (queue, &item_type); + if (data != NULL) + goto next; } } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { GstEvent *event = GST_EVENT_CAST (data);