diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index f085abe3bf..70032178ef 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -2339,14 +2339,21 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, * completely, which can not happen if we block on the query.. * Therefore we only potentially block when we are not buffering. */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); - if (QUEUE_IS_USING_QUEUE (queue) && !queue->is_buffering) { - gst_queue2_locked_enqueue (queue, query, GST_QUEUE2_ITEM_TYPE_QUERY); + if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue) + || !queue->use_buffering)) { + if (!g_atomic_int_get (&queue->downstream_may_block)) { + gst_queue2_locked_enqueue (queue, query, + GST_QUEUE2_ITEM_TYPE_QUERY); - STATUS (queue, queue->sinkpad, "wait for QUERY"); - g_cond_wait (&queue->query_handled, &queue->qlock); - if (queue->sinkresult != GST_FLOW_OK) - goto out_flushing; - res = queue->last_query; + STATUS (queue, queue->sinkpad, "wait for QUERY"); + g_cond_wait (&queue->query_handled, &queue->qlock); + if (queue->sinkresult != GST_FLOW_OK) + goto out_flushing; + res = queue->last_query; + } else { + GST_DEBUG_OBJECT (queue, "refusing query, downstream might block"); + res = FALSE; + } } else { GST_DEBUG_OBJECT (queue, "refusing query, we are not using the queue"); @@ -2588,6 +2595,10 @@ gst_queue2_push_one (GstQueue2 * queue) goto no_item; next: + STATUS (queue, queue->srcpad, "We have something dequeud"); + g_atomic_int_set (&queue->downstream_may_block, + item_type == GST_QUEUE2_ITEM_TYPE_BUFFER || + item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST); GST_QUEUE2_MUTEX_UNLOCK (queue); if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { @@ -2596,6 +2607,7 @@ next: buffer = GST_BUFFER_CAST (data); result = gst_pad_push (queue->srcpad, buffer); + g_atomic_int_set (&queue->downstream_may_block, 0); /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); @@ -2627,6 +2639,7 @@ next: buffer_list = GST_BUFFER_LIST_CAST (data); result = gst_pad_push_list (queue->srcpad, buffer_list); + g_atomic_int_set (&queue->downstream_may_block, 0); /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); @@ -2641,7 +2654,9 @@ next: } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) { GstQuery *query = GST_QUERY_CAST (data); + GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query); queue->last_query = gst_pad_peer_query (queue->srcpad, query); + GST_LOG_OBJECT (queue->srcpad, "Peered query"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "did query %p, return %d", query, queue->last_query); g_cond_signal (&queue->query_handled); diff --git a/plugins/elements/gstqueue2.h b/plugins/elements/gstqueue2.h index 3fe5fc45dd..bbd614dd4a 100644 --- a/plugins/elements/gstqueue2.h +++ b/plugins/elements/gstqueue2.h @@ -151,6 +151,8 @@ struct _GstQueue2 guint64 ring_buffer_max_size; guint8 * ring_buffer; + + volatile gint downstream_may_block; }; struct _GstQueue2Class