queue2: Add support for serialized queries if using a memory queue

This commit is contained in:
Sebastian Dröge 2013-05-24 19:22:22 +02:00
parent 46270f8925
commit e692993e79
2 changed files with 65 additions and 6 deletions

View file

@ -261,7 +261,8 @@ typedef enum
GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
GST_QUEUE2_ITEM_TYPE_BUFFER,
GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
GST_QUEUE2_ITEM_TYPE_EVENT
GST_QUEUE2_ITEM_TYPE_EVENT,
GST_QUEUE2_ITEM_TYPE_QUERY
} GstQueue2ItemType;
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
@ -440,6 +441,9 @@ gst_queue2_init (GstQueue2 * queue)
g_cond_init (&queue->item_del);
g_queue_init (&queue->queue);
g_cond_init (&queue->query_handled);
queue->last_query = FALSE;
queue->buffering_percent = 100;
/* tempfile related */
@ -465,13 +469,16 @@ gst_queue2_finalize (GObject * object)
while (!g_queue_is_empty (&queue->queue)) {
GstMiniObject *data = g_queue_pop_head (&queue->queue);
gst_mini_object_unref (data);
if (!GST_IS_QUERY (data))
gst_mini_object_unref (data);
}
queue->last_query = FALSE;
g_queue_clear (&queue->queue);
g_mutex_clear (&queue->qlock);
g_cond_clear (&queue->item_add);
g_cond_clear (&queue->item_del);
g_cond_clear (&queue->query_handled);
g_timer_destroy (queue->in_timer);
g_timer_destroy (queue->out_timer);
@ -1525,9 +1532,12 @@ gst_queue2_locked_flush (GstQueue2 * queue)
/* Then lose another reference because we are supposed to destroy that
data when flushing */
gst_mini_object_unref (data);
if (!GST_IS_QUERY (data))
gst_mini_object_unref (data);
}
}
g_cond_signal (&queue->query_handled);
queue->last_query = FALSE;
GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
@ -2001,6 +2011,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
goto unexpected_event;
break;
}
} else if (GST_IS_QUERY (item)) {
/* Can't happen as we check that in the caller */
if (!QUEUE_IS_USING_QUEUE (queue))
g_assert_not_reached ();
} else {
g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
item, GST_OBJECT_NAME (queue));
@ -2116,7 +2130,10 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
/* update the buffering */
if (queue->use_buffering)
update_buffering (queue);
} else if (GST_IS_QUERY (item)) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved query %p from queue", item);
*item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
@ -2248,19 +2265,45 @@ static gboolean
gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
GstQuery * query)
{
GstQueue2 *queue;
gboolean res;
queue = GST_QUEUE2 (parent);
switch (GST_QUERY_TYPE (query)) {
default:
if (GST_QUERY_IS_SERIALIZED (query)) {
GST_WARNING_OBJECT (pad, "unhandled serialized query");
res = FALSE;
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
/* serialized events go in the queue */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
if (QUEUE_IS_USING_QUEUE (queue)) {
gst_queue2_locked_enqueue (queue, query, GST_QUEUE2_ITEM_TYPE_QUERY);
STATUS (queue, queue->sinkpad, "wait for QUERY");
g_cond_wait (&queue->query_handled, &queue->qlock);
if (queue->sinkresult != GST_FLOW_OK)
goto out_flushing;
res = queue->last_query;
} else {
GST_DEBUG_OBJECT (queue,
"refusing query, we are not using the queue");
res = FALSE;
}
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
res = gst_pad_query_default (pad, parent, query);
}
break;
}
return res;
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
GST_QUEUE2_MUTEX_UNLOCK (queue);
return FALSE;
}
}
static gboolean
@ -2454,6 +2497,10 @@ gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS buffer list %p", data);
gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
} else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
queue->last_query = FALSE;
g_cond_signal (&queue->query_handled);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
@ -2528,6 +2575,15 @@ next:
* to the caller so that the task function does not shut down */
result = GST_FLOW_OK;
}
} else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
GstQuery *query = GST_QUERY_CAST (data);
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"did query %p, return %d", query, queue->last_query);
g_cond_signal (&queue->query_handled);
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
result = GST_FLOW_OK;
}
return result;

View file

@ -97,6 +97,9 @@ struct _GstQueue2
/* the queue of data we're keeping our hands on */
GQueue queue;
GCond query_handled;
gboolean last_query; /* result of last serialized query */
GstQueue2Size cur_level; /* currently in the queue */
GstQueue2Size max_level; /* max. amount of data allowed in the queue */
gboolean use_buffering;