From d133f1548eb3ee6cea8ea4dec4fc91cdda9d7c46 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 17 Sep 2007 16:22:17 +0000 Subject: [PATCH] gst/playback/gstqueue2.c: Also fix #476514 for queue2. Original commit message from CVS: * gst/playback/gstqueue2.c: (update_buffering), (gst_queue_locked_flush), (gst_queue_locked_enqueue), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_src_activate_pull): Also fix #476514 for queue2. --- ChangeLog | 9 ++++ gst/playback/gstqueue2.c | 97 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 100 insertions(+), 6 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7e0a3e6f99..d6739203a4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2007-09-17 Wim Taymans + + * gst/playback/gstqueue2.c: (update_buffering), + (gst_queue_locked_flush), (gst_queue_locked_enqueue), + (gst_queue_handle_sink_event), (gst_queue_chain), + (gst_queue_push_one), (gst_queue_sink_activate_push), + (gst_queue_src_activate_push), (gst_queue_src_activate_pull): + Also fix #476514 for queue2. + 2007-09-16 Wim Taymans * gst-libs/gst/rtp/gstbasertpdepayload.c: diff --git a/gst/playback/gstqueue2.c b/gst/playback/gstqueue2.c index 44627518b7..d478cda914 100644 --- a/gst/playback/gstqueue2.c +++ b/gst/playback/gstqueue2.c @@ -158,6 +158,7 @@ struct _GstQueue /* flowreturn when srcpad is paused */ GstFlowReturn srcresult; gboolean is_eos; + gboolean unexpected; /* the queue of data we're keeping our hands on */ GQueue *queue; @@ -166,11 +167,13 @@ struct _GstQueue GstQueueSize max_level; /* max. amount of data allowed in the queue */ gboolean use_buffering; gboolean use_rate_estimate; + GstClockTime buffering_interval; gint low_percent; /* low/high watermarks for buffering */ gint high_percent; /* current buffering state */ gboolean is_buffering; + guint buffering_iteration; /* for measuring input/output rates */ guint64 bytes_in; @@ -682,6 +685,7 @@ update_buffering (GstQueue * queue) * below the low threshold */ if (percent < queue->low_percent) { queue->is_buffering = TRUE; + queue->buffering_iteration++; post = TRUE; } } @@ -985,7 +989,6 @@ gst_queue_locked_flush (GstQueue * queue) GST_QUEUE_CLEAR_LEVEL (queue->cur_level); gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); - queue->is_eos = FALSE; if (queue->starting_segment != NULL) gst_event_unref (queue->starting_segment); queue->starting_segment = NULL; @@ -1041,6 +1044,9 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) queue->segment_event_received = TRUE; queue->starting_segment = event; } + /* a new segment allows us to accept more buffers if we got UNEXPECTED + * from downstream */ + queue->unexpected = FALSE; break; default: if (QUEUE_IS_USING_TEMP_FILE (queue)) @@ -1182,6 +1188,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) GST_QUEUE_MUTEX_LOCK (queue); gst_queue_locked_flush (queue); queue->srcresult = GST_FLOW_OK; + queue->is_eos = FALSE; + queue->unexpected = FALSE; /* reset rate counters */ reset_rate_timer (queue); gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, @@ -1193,6 +1201,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + /* refuse more events on EOS */ + if (queue->is_eos) + goto out_eos; gst_queue_locked_enqueue (queue, event); GST_QUEUE_MUTEX_UNLOCK (queue); } else { @@ -1207,10 +1218,16 @@ done: /* ERRORS */ out_flushing: { - GST_DEBUG_OBJECT (queue, "we are flushing"); + GST_DEBUG_OBJECT (queue, "refusing event, we are flushing"); GST_QUEUE_MUTEX_UNLOCK (queue); - - gst_buffer_unref (event); + gst_event_unref (event); + return FALSE; + } +out_eos: + { + GST_DEBUG_OBJECT (queue, "refusing event, we are EOS"); + GST_QUEUE_MUTEX_UNLOCK (queue); + gst_event_unref (event); return FALSE; } } @@ -1275,6 +1292,12 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + /* when we received EOS, we refuse more data */ + if (queue->is_eos) + goto out_eos; + /* when we received unexpected from downstream, refuse more buffers */ + if (queue->unexpected) + goto out_unexpected; /* We make space available if we're "full" according to whatever * the user defined as "full". */ @@ -1299,11 +1322,27 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); GST_QUEUE_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); return ret; } +out_eos: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); + GST_QUEUE_MUTEX_UNLOCK (queue); + gst_buffer_unref (buffer); + + return GST_FLOW_UNEXPECTED; + } +out_unexpected: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because we received UNEXPECTED"); + GST_QUEUE_MUTEX_UNLOCK (queue); + gst_buffer_unref (buffer); + + return GST_FLOW_UNEXPECTED; + } } /* dequeue an item from the queue an push it downstream. This functions returns @@ -1318,6 +1357,7 @@ gst_queue_push_one (GstQueue * queue) if (data == NULL) goto no_item; +next: if (GST_IS_BUFFER (data)) { GstBuffer *buffer = GST_BUFFER_CAST (data); @@ -1327,6 +1367,42 @@ gst_queue_push_one (GstQueue * queue) /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + if (result == GST_FLOW_UNEXPECTED) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "got UNEXPECTED from downstream"); + /* stop pushing buffers, we dequeue all items until we see an item that we + * can push again, which is EOS or NEWSEGMENT. If there is nothing in the + * queue we can push, we set a flag to make the sinkpad refuse more + * buffers with an UNEXPECTED return value until we receive something + * pushable again or we get flushed. */ + while ((data = gst_queue_locked_dequeue (queue))) { + if (GST_IS_BUFFER (data)) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED buffer %p", data); + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (GST_IS_EVENT (data)) { + GstEvent *event = GST_EVENT_CAST (data); + GstEventType type = GST_EVENT_TYPE (event); + + if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) { + /* we found a pushable item in the queue, push it out */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushing pushable event %s after UNEXPECTED %p", + GST_EVENT_TYPE_NAME (event)); + goto next; + } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED event %p", event); + gst_event_unref (event); + } + } + /* no more items in the queue. Set the unexpected flag so that upstream + * make us refuse any more buffers on the sinkpad. Since we will still + * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the + * task function does not shut down. */ + queue->unexpected = TRUE; + result = GST_FLOW_OK; + } } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); @@ -1337,8 +1413,11 @@ gst_queue_push_one (GstQueue * queue) GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); /* if we're EOS, return UNEXPECTED so that the task pauses. */ - if (type == GST_EVENT_EOS) + if (type == GST_EVENT_EOS) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushed EOS event %p, return UNEXPECTED", event); result = GST_FLOW_UNEXPECTED; + } } return result; @@ -1557,6 +1636,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) GST_QUEUE_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; + queue->is_eos = FALSE; + queue->unexpected = FALSE; reset_rate_timer (queue); GST_QUEUE_MUTEX_UNLOCK (queue); } else { @@ -1587,6 +1668,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) GST_QUEUE_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; + queue->is_eos = FALSE; + queue->unexpected = FALSE; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); GST_QUEUE_MUTEX_UNLOCK (queue); } else { @@ -1621,6 +1704,8 @@ gst_queue_src_activate_pull (GstPad * pad, gboolean active) GST_QUEUE_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating pull mode"); queue->srcresult = GST_FLOW_OK; + queue->is_eos = FALSE; + queue->unexpected = FALSE; result = TRUE; GST_QUEUE_MUTEX_UNLOCK (queue); } else {