multiqueue: avoid returning downstream GST_FLOW_EOS from previous segment to current upstream segment

This commit is contained in:
Mark Nauwelaerts 2015-02-21 17:13:26 +01:00
parent 70603c1d27
commit c3454a85c5

View file

@ -1209,7 +1209,7 @@ done:
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object)
GstMiniObject * object, gboolean * allow_drop)
{
GstFlowReturn result = sq->srcresult;
@ -1226,11 +1226,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
/* Applying the buffer may have made the queue non-full again, unblock it if needed */
gst_data_queue_limits_changed (sq->queue);
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
sq->id, buffer, GST_TIME_ARGS (timestamp));
result = gst_pad_push (sq->srcpad, buffer);
if (G_UNLIKELY (*allow_drop)) {
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
sq->id, buffer, GST_TIME_ARGS (timestamp));
gst_buffer_unref (buffer);
} else {
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
sq->id, buffer, GST_TIME_ARGS (timestamp));
result = gst_pad_push (sq->srcpad, buffer);
}
} else if (GST_IS_EVENT (object)) {
GstEvent *event;
@ -1239,11 +1245,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
result = GST_FLOW_EOS;
if (G_UNLIKELY (*allow_drop))
*allow_drop = FALSE;
break;
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, event, &sq->src_segment);
/* Applying the segment may have made the queue non-full again, unblock it if needed */
gst_data_queue_limits_changed (sq->queue);
if (G_UNLIKELY (*allow_drop)) {
result = GST_FLOW_OK;
*allow_drop = FALSE;
}
break;
case GST_EVENT_GAP:
apply_gap (mq, sq, event, &sq->src_segment);
@ -1254,18 +1266,32 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
break;
}
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing event %p of type %s",
sq->id, event, GST_EVENT_TYPE_NAME (event));
if (G_UNLIKELY (*allow_drop)) {
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Dropping EOS event %p of type %s",
sq->id, event, GST_EVENT_TYPE_NAME (event));
gst_event_unref (event);
} else {
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Pushing event %p of type %s",
sq->id, event, GST_EVENT_TYPE_NAME (event));
gst_pad_push_event (sq->srcpad, event);
gst_pad_push_event (sq->srcpad, event);
}
} else if (GST_IS_QUERY (object)) {
GstQuery *query;
gboolean res;
query = GST_QUERY_CAST (object);
res = gst_pad_peer_query (sq->srcpad, query);
if (G_UNLIKELY (*allow_drop)) {
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Dropping EOS query %p", sq->id, query);
gst_query_unref (query);
res = FALSE;
} else {
res = gst_pad_peer_query (sq->srcpad, query);
}
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->last_query = res;
@ -1354,10 +1380,12 @@ gst_multi_queue_loop (GstPad * pad)
GstClockTime next_time;
gboolean is_buffer;
gboolean do_update_buffering = FALSE;
gboolean dropping = FALSE;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue;
next:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
if (sq->flushing)
@ -1485,7 +1513,7 @@ gst_multi_queue_loop (GstPad * pad)
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object);
result = gst_single_queue_push_one (mq, sq, object, &dropping);
object = NULL;
/* Check if we pushed something already and if this is
@ -1525,6 +1553,25 @@ gst_multi_queue_loop (GstPad * pad)
if (is_buffer)
sq->pushed = TRUE;
/* now hold on a bit;
* can not simply throw this result to upstream, because
* that might already be onto another segment, so we have to make
* sure we are relaying the correct info wrt proper segment */
if (result == GST_FLOW_EOS && !dropping &&
sq->srcresult != GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
dropping = TRUE;
/* pretend we have not seen EOS yet for upstream's sake */
result = sq->srcresult;
} else if (dropping && gst_data_queue_is_empty (sq->queue)) {
/* queue empty, so stop dropping
* we can commit the result we have now,
* which is either OK after a segment, or EOS */
GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
dropping = FALSE;
result = GST_FLOW_EOS;
}
sq->srcresult = result;
sq->last_oldid = newid;
@ -1534,6 +1581,9 @@ gst_multi_queue_loop (GstPad * pad)
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_multi_queue_post_buffering (mq);
if (dropping)
goto next;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
&& result != GST_FLOW_EOS)
goto out_flushing;
@ -1807,6 +1857,11 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, sref, &sq->sink_segment);
gst_event_unref (sref);
/* a new segment allows us to accept more buffers if we got EOS
* from downstream */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->srcresult = GST_FLOW_OK;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
case GST_EVENT_GAP:
apply_gap (mq, sq, sref, &sq->sink_segment);