queue: add support for serialized queries

This commit is contained in:
Wim Taymans 2012-03-14 15:42:33 +01:00
parent fb2fc331de
commit 2e864c4722
2 changed files with 51 additions and 13 deletions

View file

@ -626,7 +626,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
/* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
static GstMiniObject *
gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
gst_queue_locked_dequeue (GstQueue * queue)
{
GstMiniObject *item;
@ -648,7 +648,6 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
if (queue->cur_level.buffers == 0)
queue->cur_level.time = 0;
*is_buffer = TRUE;
} else if (GST_IS_EVENT (item)) {
GstEvent *event = GST_EVENT_CAST (item);
@ -671,8 +670,11 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
default:
break;
}
} else if (GST_IS_QUERY (item)) {
GstQuery *query = GST_QUERY_CAST (item);
*is_buffer = FALSE;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved query %p from queue", query);
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
@ -776,14 +778,38 @@ out_eos:
static gboolean
gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstQueue *queue = GST_QUEUE_CAST (parent);
gboolean res;
switch (GST_QUERY_TYPE (query)) {
default:
res = gst_pad_query_default (pad, parent, query);
if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
GST_QUERY_TYPE_NAME (query));
g_queue_push_tail (&queue->queue, query);
GST_QUEUE_SIGNAL_ADD (queue);
while (queue->queue.length != 0) {
/* for as long as the queue has items, we know the query is
* not handled yet */
GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
}
res = queue->last_query;
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
res = gst_pad_query_default (pad, parent, query);
}
break;
}
return res;
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
GST_QUEUE_MUTEX_UNLOCK (queue);
return FALSE;
}
}
static gboolean
@ -820,9 +846,8 @@ gst_queue_leak_downstream (GstQueue * queue)
/* for as long as the queue is filled, dequeue an item and discard it */
while (gst_queue_is_filled (queue)) {
GstMiniObject *leak;
gboolean is_buffer;
leak = gst_queue_locked_dequeue (queue, &is_buffer);
leak = gst_queue_locked_dequeue (queue);
/* there is nothing to dequeue and the queue is still filled.. This should
* not happen */
g_assert (leak != NULL);
@ -977,14 +1002,13 @@ gst_queue_push_one (GstQueue * queue)
{
GstFlowReturn result = GST_FLOW_OK;
GstMiniObject *data;
gboolean is_buffer;
data = gst_queue_locked_dequeue (queue, &is_buffer);
data = gst_queue_locked_dequeue (queue);
if (data == NULL)
goto no_item;
next:
if (is_buffer) {
if (GST_IS_BUFFER (data)) {
GstBuffer *buffer;
buffer = GST_BUFFER_CAST (data);
@ -1013,12 +1037,12 @@ next:
* can push again, which is EOS or SEGMENT. If there is nothing in the
* queue we can push, we set a flag to make the sinkpad refuse more
* buffers with an EOS return value. */
while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) {
if (is_buffer) {
while ((data = gst_queue_locked_dequeue (queue))) {
if (GST_IS_BUFFER (data)) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
} else {
} else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
@ -1032,6 +1056,12 @@ next:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS event %p", event);
gst_event_unref (event);
} else if (GST_IS_QUERY (data)) {
GstQuery *query = GST_QUERY_CAST (data);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping query %p because of EOS", query);
queue->last_query = FALSE;
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
@ -1041,7 +1071,7 @@ next:
queue->unexpected = TRUE;
result = GST_FLOW_OK;
}
} else {
} else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
@ -1056,6 +1086,12 @@ next:
"pushed EOS event %p, return EOS", event);
result = GST_FLOW_EOS;
}
} else if (GST_IS_QUERY (data)) {
GstQuery *query = GST_QUERY_CAST (data);
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"did query %p, return %d", query, queue->last_query);
}
return result;

View file

@ -131,6 +131,8 @@ struct _GstQueue {
/* whether the first new segment has been applied to src */
gboolean newseg_applied_to_src;
gboolean last_query;
};
struct _GstQueueClass {