queue/queue2/multiqueue: When flushing, make sure to not lose any sticky events

https://bugzilla.gnome.org/show_bug.cgi?id=688824
This commit is contained in:
Sebastian Dröge 2013-05-27 13:01:43 +02:00
parent 73895c05b1
commit b6fac17502
3 changed files with 65 additions and 16 deletions

View file

@ -198,6 +198,8 @@ static void compute_high_time (GstMultiQueue * mq);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u", static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_SINK, GST_PAD_SINK,
GST_PAD_REQUEST, GST_PAD_REQUEST,
@ -733,7 +735,8 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
} }
static gboolean static gboolean
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
gboolean full)
{ {
gboolean result; gboolean result;
@ -760,7 +763,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->sink_tainted = sq->src_tainted = TRUE; sq->sink_tainted = sq->src_tainted = TRUE;
} else { } else {
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
gst_data_queue_flush (sq->queue); gst_single_queue_flush_queue (sq, full);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */ /* All pads start off not-linked for a smooth kick-off */
@ -1372,7 +1375,7 @@ out_flushing:
* so empty this one and trigger dynamic queue growth. At * so empty this one and trigger dynamic queue growth. At
* this point the srcresult is not OK, NOT_LINKED * this point the srcresult is not OK, NOT_LINKED
* or EOS, i.e. a real failure */ * or EOS, i.e. a real failure */
gst_data_queue_flush (sq->queue); gst_single_queue_flush_queue (sq, FALSE);
single_queue_underrun_cb (sq->queue, sq); single_queue_underrun_cb (sq->queue, sq);
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad); gst_pad_pause_task (sq->srcpad);
@ -1511,7 +1514,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
res = gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (sq->srcpad, event);
gst_single_queue_flush (mq, sq, TRUE); gst_single_queue_flush (mq, sq, TRUE, FALSE);
goto done; goto done;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
@ -1520,7 +1523,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
res = gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (sq->srcpad, event);
gst_single_queue_flush (mq, sq, FALSE); gst_single_queue_flush (mq, sq, FALSE, FALSE);
goto done; goto done;
case GST_EVENT_SEGMENT: case GST_EVENT_SEGMENT:
/* take ref because the queue will take ownership and we need the event /* take ref because the queue will take ownership and we need the event
@ -1646,9 +1649,9 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
switch (mode) { switch (mode) {
case GST_PAD_MODE_PUSH: case GST_PAD_MODE_PUSH:
if (active) { if (active) {
result = gst_single_queue_flush (mq, sq, FALSE); result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
} else { } else {
result = gst_single_queue_flush (mq, sq, TRUE); result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
/* make sure streaming finishes */ /* make sure streaming finishes */
result |= gst_pad_stop_task (pad); result |= gst_pad_stop_task (pad);
} }
@ -1951,6 +1954,41 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
return res; return res;
} }
static void
gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
{
GstDataQueueItem *sitem;
gboolean was_flushing = FALSE;
while (!gst_data_queue_is_empty (sq->queue)) {
GstMiniObject *data;
/* FIXME: If this fails here although the queue is not empty,
* we're flushing... but we want to rescue all sticky
* events nonetheless.
*/
if (!gst_data_queue_pop (sq->queue, &sitem)) {
was_flushing = TRUE;
gst_data_queue_set_flushing (sq->queue, FALSE);
continue;
}
data = sitem->object;
if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
&& GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
}
sitem->destroy (sitem);
}
gst_data_queue_flush (sq->queue);
if (was_flushing)
gst_data_queue_set_flushing (sq->queue, TRUE);
}
static void static void
gst_single_queue_free (GstSingleQueue * sq) gst_single_queue_free (GstSingleQueue * sq)
{ {

View file

@ -209,7 +209,7 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent, static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
GstQuery * query); GstQuery * query);
static void gst_queue_locked_flush (GstQueue * queue); static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active); GstPadMode mode, gboolean active);
@ -573,7 +573,7 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
} }
static void static void
gst_queue_locked_flush (GstQueue * queue) gst_queue_locked_flush (GstQueue * queue, gboolean full)
{ {
GstMiniObject *data; GstMiniObject *data;
@ -581,6 +581,11 @@ gst_queue_locked_flush (GstQueue * queue)
data = gst_queue_array_pop_head (queue->queue); data = gst_queue_array_pop_head (queue->queue);
/* Then lose another reference because we are supposed to destroy that /* Then lose another reference because we are supposed to destroy that
data when flushing */ data when flushing */
if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
&& GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
}
if (!GST_IS_QUERY (data)) if (!GST_IS_QUERY (data))
gst_mini_object_unref (data); gst_mini_object_unref (data);
} }
@ -627,7 +632,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
/* Zero the thresholds, this makes sure the queue is completely /* Zero the thresholds, this makes sure the queue is completely
* filled and we can read all data from the queue. */ * filled and we can read all data from the queue. */
if (queue->flush_on_eos) if (queue->flush_on_eos)
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue, FALSE);
else else
GST_QUEUE_CLEAR_LEVEL (queue->min_threshold); GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
/* mark the queue as EOS. This prevents us from accepting more data. */ /* mark the queue as EOS. This prevents us from accepting more data. */
@ -758,7 +763,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_pad_push_event (queue->srcpad, event); gst_pad_push_event (queue->srcpad, event);
GST_QUEUE_MUTEX_LOCK (queue); GST_QUEUE_MUTEX_LOCK (queue);
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue, FALSE);
queue->srcresult = GST_FLOW_OK; queue->srcresult = GST_FLOW_OK;
queue->eos = FALSE; queue->eos = FALSE;
queue->unexpected = FALSE; queue->unexpected = FALSE;
@ -1228,7 +1233,7 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (ret)); "pause task, reason: %s", gst_flow_get_name (ret));
if (ret == GST_FLOW_FLUSHING) if (ret == GST_FLOW_FLUSHING)
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue, FALSE);
else else
GST_QUEUE_SIGNAL_DEL (queue); GST_QUEUE_SIGNAL_DEL (queue);
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK (queue);
@ -1377,7 +1382,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
/* step 1, unblock chain function */ /* step 1, unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue); GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_FLUSHING; queue->srcresult = GST_FLOW_FLUSHING;
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue, TRUE);
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK (queue);
} }
result = TRUE; result = TRUE;

View file

@ -1520,7 +1520,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
} }
static void static void
gst_queue2_locked_flush (GstQueue2 * queue) gst_queue2_locked_flush (GstQueue2 * queue, gboolean full)
{ {
if (!QUEUE_IS_USING_QUEUE (queue)) { if (!QUEUE_IS_USING_QUEUE (queue)) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) if (QUEUE_IS_USING_TEMP_FILE (queue))
@ -1530,6 +1530,12 @@ gst_queue2_locked_flush (GstQueue2 * queue)
while (!g_queue_is_empty (&queue->queue)) { while (!g_queue_is_empty (&queue->queue)) {
GstMiniObject *data = g_queue_pop_head (&queue->queue); GstMiniObject *data = g_queue_pop_head (&queue->queue);
if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
&& GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
}
/* Then lose another reference because we are supposed to destroy that /* Then lose another reference because we are supposed to destroy that
data when flushing */ data when flushing */
if (!GST_IS_QUERY (data)) if (!GST_IS_QUERY (data))
@ -2202,7 +2208,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
gst_pad_push_event (queue->srcpad, event); gst_pad_push_event (queue->srcpad, event);
GST_QUEUE2_MUTEX_LOCK (queue); GST_QUEUE2_MUTEX_LOCK (queue);
gst_queue2_locked_flush (queue); gst_queue2_locked_flush (queue, FALSE);
queue->srcresult = GST_FLOW_OK; queue->srcresult = GST_FLOW_OK;
queue->sinkresult = GST_FLOW_OK; queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE; queue->is_eos = FALSE;
@ -3043,7 +3049,7 @@ gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (queue, "deactivating push mode"); GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_FLUSHING; queue->srcresult = GST_FLOW_FLUSHING;
queue->sinkresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING;
gst_queue2_locked_flush (queue); gst_queue2_locked_flush (queue, TRUE);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
} }
result = TRUE; result = TRUE;