From b6fac17502957bb4e10e73926da87f289a1d94b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 27 May 2013 13:01:43 +0200 Subject: [PATCH] queue/queue2/multiqueue: When flushing, make sure to not lose any sticky events https://bugzilla.gnome.org/show_bug.cgi?id=688824 --- plugins/elements/gstmultiqueue.c | 52 +++++++++++++++++++++++++++----- plugins/elements/gstqueue.c | 17 +++++++---- plugins/elements/gstqueue2.c | 12 ++++++-- 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index fd5fe2e120..1f258e80be 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -198,6 +198,8 @@ static void compute_high_time (GstMultiQueue * mq); static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); +static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full); + static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST, @@ -733,7 +735,8 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition) } static gboolean -gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) +gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, + gboolean full) { gboolean result; @@ -760,7 +763,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->sink_tainted = sq->src_tainted = TRUE; } else { GST_MULTI_QUEUE_MUTEX_LOCK (mq); - gst_data_queue_flush (sq->queue); + gst_single_queue_flush_queue (sq, full); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); /* All pads start off not-linked for a smooth kick-off */ @@ -1372,7 +1375,7 @@ out_flushing: * so empty this one and trigger dynamic queue growth. At * this point the srcresult is not OK, NOT_LINKED * or EOS, i.e. a real failure */ - gst_data_queue_flush (sq->queue); + gst_single_queue_flush_queue (sq, FALSE); single_queue_underrun_cb (sq->queue, sq); gst_data_queue_set_flushing (sq->queue, TRUE); gst_pad_pause_task (sq->srcpad); @@ -1511,7 +1514,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) res = gst_pad_push_event (sq->srcpad, event); - gst_single_queue_flush (mq, sq, TRUE); + gst_single_queue_flush (mq, sq, TRUE, FALSE); goto done; case GST_EVENT_FLUSH_STOP: @@ -1520,7 +1523,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) res = gst_pad_push_event (sq->srcpad, event); - gst_single_queue_flush (mq, sq, FALSE); + gst_single_queue_flush (mq, sq, FALSE, FALSE); goto done; case GST_EVENT_SEGMENT: /* take ref because the queue will take ownership and we need the event @@ -1646,9 +1649,9 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent, switch (mode) { case GST_PAD_MODE_PUSH: if (active) { - result = gst_single_queue_flush (mq, sq, FALSE); + result = gst_single_queue_flush (mq, sq, FALSE, TRUE); } else { - result = gst_single_queue_flush (mq, sq, TRUE); + result = gst_single_queue_flush (mq, sq, TRUE, TRUE); /* make sure streaming finishes */ result |= gst_pad_stop_task (pad); } @@ -1951,6 +1954,41 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, return res; } +static void +gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full) +{ + GstDataQueueItem *sitem; + gboolean was_flushing = FALSE; + + while (!gst_data_queue_is_empty (sq->queue)) { + GstMiniObject *data; + + /* FIXME: If this fails here although the queue is not empty, + * we're flushing... but we want to rescue all sticky + * events nonetheless. + */ + if (!gst_data_queue_pop (sq->queue, &sitem)) { + was_flushing = TRUE; + gst_data_queue_set_flushing (sq->queue, FALSE); + continue; + } + + data = sitem->object; + + if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) && + GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT + && GST_EVENT_TYPE (data) != GST_EVENT_EOS) { + gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data)); + } + + sitem->destroy (sitem); + } + + gst_data_queue_flush (sq->queue); + if (was_flushing) + gst_data_queue_set_flushing (sq->queue, TRUE); +} + static void gst_single_queue_free (GstSingleQueue * sq) { diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index d3780ba235..834826571c 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -209,7 +209,7 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent, static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query); -static void gst_queue_locked_flush (GstQueue * queue); +static void gst_queue_locked_flush (GstQueue * queue, gboolean full); static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active); @@ -573,7 +573,7 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, } static void -gst_queue_locked_flush (GstQueue * queue) +gst_queue_locked_flush (GstQueue * queue, gboolean full) { GstMiniObject *data; @@ -581,6 +581,11 @@ gst_queue_locked_flush (GstQueue * queue) data = gst_queue_array_pop_head (queue->queue); /* Then lose another reference because we are supposed to destroy that data when flushing */ + if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) && + GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT + && GST_EVENT_TYPE (data) != GST_EVENT_EOS) { + gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data)); + } if (!GST_IS_QUERY (data)) gst_mini_object_unref (data); } @@ -627,7 +632,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) /* Zero the thresholds, this makes sure the queue is completely * filled and we can read all data from the queue. */ if (queue->flush_on_eos) - gst_queue_locked_flush (queue); + gst_queue_locked_flush (queue, FALSE); else GST_QUEUE_CLEAR_LEVEL (queue->min_threshold); /* mark the queue as EOS. This prevents us from accepting more data. */ @@ -758,7 +763,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_pad_push_event (queue->srcpad, event); GST_QUEUE_MUTEX_LOCK (queue); - gst_queue_locked_flush (queue); + gst_queue_locked_flush (queue, FALSE); queue->srcresult = GST_FLOW_OK; queue->eos = FALSE; queue->unexpected = FALSE; @@ -1228,7 +1233,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (ret)); if (ret == GST_FLOW_FLUSHING) - gst_queue_locked_flush (queue); + gst_queue_locked_flush (queue, FALSE); else GST_QUEUE_SIGNAL_DEL (queue); GST_QUEUE_MUTEX_UNLOCK (queue); @@ -1377,7 +1382,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, /* step 1, unblock chain function */ GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_FLUSHING; - gst_queue_locked_flush (queue); + gst_queue_locked_flush (queue, TRUE); GST_QUEUE_MUTEX_UNLOCK (queue); } result = TRUE; diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index c1f1ce662f..90395dff7d 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -1520,7 +1520,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue) } static void -gst_queue2_locked_flush (GstQueue2 * queue) +gst_queue2_locked_flush (GstQueue2 * queue, gboolean full) { if (!QUEUE_IS_USING_QUEUE (queue)) { if (QUEUE_IS_USING_TEMP_FILE (queue)) @@ -1530,6 +1530,12 @@ gst_queue2_locked_flush (GstQueue2 * queue) while (!g_queue_is_empty (&queue->queue)) { GstMiniObject *data = g_queue_pop_head (&queue->queue); + if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) && + GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT + && GST_EVENT_TYPE (data) != GST_EVENT_EOS) { + gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data)); + } + /* Then lose another reference because we are supposed to destroy that data when flushing */ if (!GST_IS_QUERY (data)) @@ -2202,7 +2208,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, gst_pad_push_event (queue->srcpad, event); GST_QUEUE2_MUTEX_LOCK (queue); - gst_queue2_locked_flush (queue); + gst_queue2_locked_flush (queue, FALSE); queue->srcresult = GST_FLOW_OK; queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; @@ -3043,7 +3049,7 @@ gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING; - gst_queue2_locked_flush (queue); + gst_queue2_locked_flush (queue, TRUE); GST_QUEUE2_MUTEX_UNLOCK (queue); } result = TRUE;