diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index a95f4f9308..9bbb755994 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -626,7 +626,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */ static GstMiniObject * -gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer) +gst_queue_locked_dequeue (GstQueue * queue) { GstMiniObject *item; @@ -648,7 +648,6 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer) if (queue->cur_level.buffers == 0) queue->cur_level.time = 0; - *is_buffer = TRUE; } else if (GST_IS_EVENT (item)) { GstEvent *event = GST_EVENT_CAST (item); @@ -671,8 +670,11 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer) default: break; } + } else if (GST_IS_QUERY (item)) { + GstQuery *query = GST_QUERY_CAST (item); - *is_buffer = FALSE; + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "retrieved query %p from queue", query); } else { g_warning ("Unexpected item %p dequeued from queue %s (refcounting problem?)", @@ -776,14 +778,38 @@ out_eos: static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) { + GstQueue *queue = GST_QUEUE_CAST (parent); gboolean res; switch (GST_QUERY_TYPE (query)) { default: - res = gst_pad_query_default (pad, parent, query); + if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) { + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_LOG_OBJECT (queue, "queuing query %p (%s)", query, + GST_QUERY_TYPE_NAME (query)); + g_queue_push_tail (&queue->queue, query); + GST_QUEUE_SIGNAL_ADD (queue); + while (queue->queue.length != 0) { + /* for as long as the queue has items, we know the query is + * not handled yet */ + GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); + } + res = queue->last_query; + GST_QUEUE_MUTEX_UNLOCK (queue); + } else { + res = gst_pad_query_default (pad, parent, query); + } break; } return res; + + /* ERRORS */ +out_flushing: + { + GST_DEBUG_OBJECT (queue, "we are flushing"); + GST_QUEUE_MUTEX_UNLOCK (queue); + return FALSE; + } } static gboolean @@ -820,9 +846,8 @@ gst_queue_leak_downstream (GstQueue * queue) /* for as long as the queue is filled, dequeue an item and discard it */ while (gst_queue_is_filled (queue)) { GstMiniObject *leak; - gboolean is_buffer; - leak = gst_queue_locked_dequeue (queue, &is_buffer); + leak = gst_queue_locked_dequeue (queue); /* there is nothing to dequeue and the queue is still filled.. This should * not happen */ g_assert (leak != NULL); @@ -977,14 +1002,13 @@ gst_queue_push_one (GstQueue * queue) { GstFlowReturn result = GST_FLOW_OK; GstMiniObject *data; - gboolean is_buffer; - data = gst_queue_locked_dequeue (queue, &is_buffer); + data = gst_queue_locked_dequeue (queue); if (data == NULL) goto no_item; next: - if (is_buffer) { + if (GST_IS_BUFFER (data)) { GstBuffer *buffer; buffer = GST_BUFFER_CAST (data); @@ -1013,12 +1037,12 @@ next: * can push again, which is EOS or SEGMENT. If there is nothing in the * queue we can push, we set a flag to make the sinkpad refuse more * buffers with an EOS return value. */ - while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) { - if (is_buffer) { + while ((data = gst_queue_locked_dequeue (queue))) { + if (GST_IS_BUFFER (data)) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS buffer %p", data); gst_buffer_unref (GST_BUFFER_CAST (data)); - } else { + } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); @@ -1032,6 +1056,12 @@ next: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS event %p", event); gst_event_unref (event); + } else if (GST_IS_QUERY (data)) { + GstQuery *query = GST_QUERY_CAST (data); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping query %p because of EOS", query); + queue->last_query = FALSE; } } /* no more items in the queue. Set the unexpected flag so that upstream @@ -1041,7 +1071,7 @@ next: queue->unexpected = TRUE; result = GST_FLOW_OK; } - } else { + } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); @@ -1056,6 +1086,12 @@ next: "pushed EOS event %p, return EOS", event); result = GST_FLOW_EOS; } + } else if (GST_IS_QUERY (data)) { + GstQuery *query = GST_QUERY_CAST (data); + + queue->last_query = gst_pad_peer_query (queue->srcpad, query); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "did query %p, return %d", query, queue->last_query); } return result; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 3b0752c943..57ccc2a645 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -131,6 +131,8 @@ struct _GstQueue { /* whether the first new segment has been applied to src */ gboolean newseg_applied_to_src; + + gboolean last_query; }; struct _GstQueueClass {