mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-26 00:58:12 +00:00
plugins/elements/gstqueue.*: When downstream returns UNEXPECTED from pushing a buffer, don't try to push more buffers...
Original commit message from CVS: * plugins/elements/gstqueue.c: (gst_queue_locked_enqueue), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push): * plugins/elements/gstqueue.h: When downstream returns UNEXPECTED from pushing a buffer, don't try to push more buffers but allow pushing of EOS and NEWSEGMENT. Add some more debug info here and there. Fixes #476514.
This commit is contained in:
parent
4987f5a70f
commit
f49cb86d16
3 changed files with 103 additions and 2 deletions
11
ChangeLog
11
ChangeLog
|
@ -1,3 +1,14 @@
|
|||
2007-09-14 Wim Taymans <wim.taymans@gmail.com>
|
||||
|
||||
* plugins/elements/gstqueue.c: (gst_queue_locked_enqueue),
|
||||
(gst_queue_handle_sink_event), (gst_queue_chain),
|
||||
(gst_queue_push_one), (gst_queue_handle_src_query),
|
||||
(gst_queue_sink_activate_push), (gst_queue_src_activate_push):
|
||||
* plugins/elements/gstqueue.h:
|
||||
When downstream returns UNEXPECTED from pushing a buffer, don't try to
|
||||
push more buffers but allow pushing of EOS and NEWSEGMENT.
|
||||
Add some more debug info here and there. Fixes #476514.
|
||||
|
||||
2007-09-14 Wim Taymans <wim.taymans@gmail.com>
|
||||
|
||||
* libs/gst/base/gstbasesink.c: (gst_base_sink_init),
|
||||
|
|
|
@ -616,7 +616,7 @@ gst_queue_locked_flush (GstQueue * queue)
|
|||
GST_QUEUE_SIGNAL_DEL (queue);
|
||||
}
|
||||
|
||||
/* enqueue an item an update the level stats */
|
||||
/* enqueue an item an update the level stats, with QUEUE_LOCK */
|
||||
static void
|
||||
gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
||||
{
|
||||
|
@ -636,9 +636,15 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
|||
/* Zero the thresholds, this makes sure the queue is completely
|
||||
* filled and we can read all data from the queue. */
|
||||
GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
|
||||
/* mark the queue as EOS. This prevents us from accepting more data. */
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
|
||||
queue->eos = TRUE;
|
||||
break;
|
||||
case GST_EVENT_NEWSEGMENT:
|
||||
apply_segment (queue, event, &queue->sink_segment);
|
||||
/* a new segment allows us to accept more buffers if we got UNEXPECTED
|
||||
* from downstream */
|
||||
queue->unexpected = FALSE;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@ -747,6 +753,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
|||
GST_QUEUE_MUTEX_LOCK (queue);
|
||||
gst_queue_locked_flush (queue);
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
queue->eos = FALSE;
|
||||
queue->unexpected = FALSE;
|
||||
if (gst_pad_is_linked (queue->srcpad)) {
|
||||
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
||||
queue->srcpad);
|
||||
|
@ -762,6 +770,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
|||
if (GST_EVENT_IS_SERIALIZED (event)) {
|
||||
/* serialized events go in the queue */
|
||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||
/* refuse more events on EOS */
|
||||
if (queue->eos)
|
||||
goto out_eos;
|
||||
gst_queue_locked_enqueue (queue, event);
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
} else {
|
||||
|
@ -776,6 +787,15 @@ done:
|
|||
/* ERRORS */
|
||||
out_flushing:
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"refusing event, we are flushing");
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
gst_buffer_unref (event);
|
||||
return FALSE;
|
||||
}
|
||||
out_eos:
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
gst_buffer_unref (event);
|
||||
return FALSE;
|
||||
|
@ -815,6 +835,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
|||
|
||||
/* we have to lock the queue since we span threads */
|
||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||
/* when we received EOS, we refuse any more data */
|
||||
if (queue->eos)
|
||||
goto out_eos;
|
||||
if (queue->unexpected)
|
||||
goto out_unexpected;
|
||||
|
||||
timestamp = GST_BUFFER_TIMESTAMP (buffer);
|
||||
duration = GST_BUFFER_DURATION (buffer);
|
||||
|
@ -910,6 +935,25 @@ out_flushing:
|
|||
|
||||
return ret;
|
||||
}
|
||||
out_eos:
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
|
||||
gst_buffer_unref (buffer);
|
||||
|
||||
return GST_FLOW_UNEXPECTED;
|
||||
}
|
||||
out_unexpected:
|
||||
{
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"exit because we received UNEXPECTED");
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
|
||||
gst_buffer_unref (buffer);
|
||||
|
||||
return GST_FLOW_UNEXPECTED;
|
||||
}
|
||||
}
|
||||
|
||||
/* dequeue an item from the queue an push it downstream. This functions returns
|
||||
|
@ -924,6 +968,7 @@ gst_queue_push_one (GstQueue * queue)
|
|||
if (data == NULL)
|
||||
goto no_item;
|
||||
|
||||
next:
|
||||
if (GST_IS_BUFFER (data)) {
|
||||
GstBuffer *buffer = GST_BUFFER_CAST (data);
|
||||
|
||||
|
@ -933,6 +978,42 @@ gst_queue_push_one (GstQueue * queue)
|
|||
|
||||
/* need to check for srcresult here as well */
|
||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||
|
||||
if (result == GST_FLOW_UNEXPECTED) {
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"got UNEXPECTED from downstream");
|
||||
/* stop pushing buffers, we dequeue all items until we see an item that we
|
||||
* can push again, which is EOS or NEWSEGMENT. If there is nothing in the
|
||||
* queue we can push, we set a flag to make the sinkpad refuse more
|
||||
* buffers with an UNEXPECTED return value. */
|
||||
while ((data = gst_queue_locked_dequeue (queue))) {
|
||||
if (GST_IS_BUFFER (data)) {
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"dropping UNEXPECTED buffer %p", data);
|
||||
gst_buffer_unref (GST_BUFFER_CAST (data));
|
||||
} else if (GST_IS_EVENT (data)) {
|
||||
GstEvent *event = GST_EVENT_CAST (data);
|
||||
GstEventType type = GST_EVENT_TYPE (event);
|
||||
|
||||
if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
|
||||
/* we found a pushable item in the queue, push it out */
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"pushing pushable event %s after UNEXPECTED %p",
|
||||
GST_EVENT_TYPE_NAME (event));
|
||||
goto next;
|
||||
}
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"dropping UNEXPECTED event %p", event);
|
||||
gst_event_unref (event);
|
||||
}
|
||||
}
|
||||
/* no more items in the queue. Set the unexpected flag so that upstream
|
||||
* make us refuse any more buffers on the sinkpad. Since we will still
|
||||
* accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
|
||||
* task function does not shut down. */
|
||||
queue->unexpected = TRUE;
|
||||
result = GST_FLOW_OK;
|
||||
}
|
||||
} else if (GST_IS_EVENT (data)) {
|
||||
GstEvent *event = GST_EVENT_CAST (data);
|
||||
GstEventType type = GST_EVENT_TYPE (event);
|
||||
|
@ -943,8 +1024,11 @@ gst_queue_push_one (GstQueue * queue)
|
|||
|
||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||
/* if we're EOS, return UNEXPECTED so that the task pauses. */
|
||||
if (type == GST_EVENT_EOS)
|
||||
if (type == GST_EVENT_EOS) {
|
||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||
"pushed EOS event %p, return UNEXPECTED", event);
|
||||
result = GST_FLOW_UNEXPECTED;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
|
@ -1106,6 +1190,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
|||
if (active) {
|
||||
GST_QUEUE_MUTEX_LOCK (queue);
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
queue->eos = FALSE;
|
||||
queue->unexpected = FALSE;
|
||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||
} else {
|
||||
/* step 1, unblock chain function */
|
||||
|
@ -1131,6 +1217,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
|||
if (active) {
|
||||
GST_QUEUE_MUTEX_LOCK (queue);
|
||||
queue->srcresult = GST_FLOW_OK;
|
||||
queue->eos = FALSE;
|
||||
queue->unexpected = FALSE;
|
||||
/* we do not start the task yet if the pad is not connected */
|
||||
if (gst_pad_is_linked (pad))
|
||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
||||
|
|
|
@ -87,6 +87,8 @@ struct _GstQueue {
|
|||
|
||||
/* flowreturn when srcpad is paused */
|
||||
GstFlowReturn srcresult;
|
||||
gboolean unexpected;
|
||||
gboolean eos;
|
||||
|
||||
/* the queue of data we're keeping our grubby hands on */
|
||||
GQueue *queue;
|
||||
|
|
Loading…
Reference in a new issue