queue2: Unblock any waiting serialize queries on FLUSH_START

Fixes some deadlocks during flushing.

And store queue items differently to not accidentially read
already unreffed queries when flushing. Queries are owned by
upstream and not us.
This commit is contained in:
Sebastian Dröge 2013-05-27 15:41:14 +02:00
parent b6fac17502
commit cdc429f296

View file

@ -265,6 +265,12 @@ typedef enum
GST_QUEUE2_ITEM_TYPE_QUERY GST_QUEUE2_ITEM_TYPE_QUERY
} GstQueue2ItemType; } GstQueue2ItemType;
typedef struct
{
GstQueue2ItemType type;
GstMiniObject *item;
} GstQueue2Item;
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */ /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
static void static void
@ -467,10 +473,11 @@ gst_queue2_finalize (GObject * object)
GST_DEBUG_OBJECT (queue, "finalizing queue"); GST_DEBUG_OBJECT (queue, "finalizing queue");
while (!g_queue_is_empty (&queue->queue)) { while (!g_queue_is_empty (&queue->queue)) {
GstMiniObject *data = g_queue_pop_head (&queue->queue); GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
if (!GST_IS_QUERY (data)) if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
gst_mini_object_unref (data); gst_mini_object_unref (qitem->item);
g_slice_free (GstQueue2Item, qitem);
} }
queue->last_query = FALSE; queue->last_query = FALSE;
@ -1528,18 +1535,21 @@ gst_queue2_locked_flush (GstQueue2 * queue, gboolean full)
init_ranges (queue); init_ranges (queue);
} else { } else {
while (!g_queue_is_empty (&queue->queue)) { while (!g_queue_is_empty (&queue->queue)) {
GstMiniObject *data = g_queue_pop_head (&queue->queue); GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) && if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT && GST_EVENT_IS_STICKY (qitem->item)
&& GST_EVENT_TYPE (data) != GST_EVENT_EOS) { && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data)); && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
gst_pad_store_sticky_event (queue->srcpad,
GST_EVENT_CAST (qitem->item));
} }
/* Then lose another reference because we are supposed to destroy that /* Then lose another reference because we are supposed to destroy that
data when flushing */ data when flushing */
if (!GST_IS_QUERY (data)) if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
gst_mini_object_unref (data); gst_mini_object_unref (qitem->item);
g_slice_free (GstQueue2Item, qitem);
} }
} }
g_cond_signal (&queue->query_handled); g_cond_signal (&queue->query_handled);
@ -2034,7 +2044,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
update_buffering (queue); update_buffering (queue);
if (QUEUE_IS_USING_QUEUE (queue)) { if (QUEUE_IS_USING_QUEUE (queue)) {
g_queue_push_tail (&queue->queue, item); GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
qitem->type = item_type;
qitem->item = item;
g_queue_push_tail (&queue->queue, qitem);
} else { } else {
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
} }
@ -2062,10 +2075,13 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
{ {
GstMiniObject *item; GstMiniObject *item;
if (!QUEUE_IS_USING_QUEUE (queue)) if (!QUEUE_IS_USING_QUEUE (queue)) {
item = gst_queue2_read_item_from_file (queue); item = gst_queue2_read_item_from_file (queue);
else } else {
item = g_queue_pop_head (&queue->queue); GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
item = qitem->item;
g_slice_free (GstQueue2Item, qitem);
}
if (item == NULL) if (item == NULL)
goto no_item; goto no_item;
@ -2182,6 +2198,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
/* unblock the loop and chain functions */ /* unblock the loop and chain functions */
GST_QUEUE2_SIGNAL_ADD (queue); GST_QUEUE2_SIGNAL_ADD (queue);
GST_QUEUE2_SIGNAL_DEL (queue); GST_QUEUE2_SIGNAL_DEL (queue);
queue->last_query = FALSE;
g_cond_signal (&queue->query_handled);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
/* make sure it pauses, this should happen since we sent /* make sure it pauses, this should happen since we sent
@ -2193,6 +2211,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
/* flush the sink pad */ /* flush the sink pad */
queue->sinkresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING;
GST_QUEUE2_SIGNAL_DEL (queue); GST_QUEUE2_SIGNAL_DEL (queue);
queue->last_query = FALSE;
g_cond_signal (&queue->query_handled);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event); gst_event_unref (event);