From cea7e759d2a01bbdf8a41e3ba488273e0a11cc75 Mon Sep 17 00:00:00 2001 From: "Ronald S. Bultje" Date: Tue, 1 Mar 2005 16:29:01 +0000 Subject: [PATCH] gst/gstqueue.c: Don't drop buffers on interrupt. Rather, ignore fulness status and preserve stream (#159676). Original commit message from CVS: * gst/gstqueue.c: (gst_queue_locked_flush), (gst_queue_chain), (gst_queue_get), (gst_queue_handle_src_event): Don't drop buffers on interrupt. Rather, ignore fulness status and preserve stream (#159676). (gst_queue_handle_src_query): Don't go below zero. --- ChangeLog | 9 +++++++++ gst/gstqueue.c | 36 +++++++++++++++++++++++++++++------- plugins/elements/gstqueue.c | 36 +++++++++++++++++++++++++++++------- 3 files changed, 67 insertions(+), 14 deletions(-) diff --git a/ChangeLog b/ChangeLog index 06c98ebc12..2a1448f0ea 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2005-03-01 Ronald S. Bultje + + * gst/gstqueue.c: (gst_queue_locked_flush), (gst_queue_chain), + (gst_queue_get), (gst_queue_handle_src_event): + Don't drop buffers on interrupt. Rather, ignore fulness status + and preserve stream (#159676). + (gst_queue_handle_src_query): + Don't go below zero. + 2005-02-27 Phil Blundell Reviewed by: Maciej Katafiasz diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 23866344a8..3d21e8a028 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -474,6 +474,8 @@ gst_queue_link_src (GstPad * pad, const GstCaps * caps) static void gst_queue_locked_flush (GstQueue * queue) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Flushing contents..."); + while (!g_queue_is_empty (queue->queue)) { GstData *data = g_queue_pop_head (queue->queue); @@ -569,7 +571,9 @@ restart: if (GST_IS_BUFFER (data)) GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); + "adding buffer %p of size %d and time %" GST_TIME_FORMAT, + data, GST_BUFFER_SIZE (data), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data))); /* We make space available if we're "full" according to whatever * the user defined as "full". Note that this only applies to buffers. @@ -660,7 +664,7 @@ restart: GST_QUEUE_MUTEX_UNLOCK; sched = gst_pad_get_scheduler (queue->sinkpad); if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) { - goto out_unref; + goto ignore_interrupt; } /* if we got here because we were unlocked after a * flush, we don't need to add the buffer to the @@ -693,6 +697,7 @@ restart: } } + ignore_interrupt: /* OK, we've got a serious issue here. Imagine the situation * where the puller (next element) is sending an event here, * so it cannot pull events from the queue, and we cannot @@ -701,9 +706,16 @@ restart: * that, we handle pending upstream events here, too. */ gst_queue_handle_pending_events (queue); - STATUS (queue, "waiting for item_del signal from thread using qlock"); - g_cond_wait (queue->item_del, queue->qlock); - STATUS (queue, "received item_del signal from thread using qlock"); + if (!queue->interrupt) { + STATUS (queue, + "waiting for item_del signal from thread using qlock"); + g_cond_wait (queue->item_del, queue->qlock); + STATUS (queue, "received item_del signal from thread using qlock"); + } else { + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "Not waiting, just adding buffer, after interrupt (bad!)"); + break; + } } STATUS (queue, "post-full wait"); @@ -848,6 +860,9 @@ restart: queue->cur_level.bytes -= GST_BUFFER_SIZE (data); if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) queue->cur_level.time -= GST_BUFFER_DURATION (data); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "Got buffer of time %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data))); } /* Now that we're done, we can lose our own reference to @@ -947,6 +962,7 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { gst_queue_locked_flush (queue); } + break; default: break; } @@ -974,10 +990,16 @@ gst_queue_handle_src_query (GstPad * pad, /* FIXME: this code assumes that there's no discont in the queue */ switch (*fmt) { case GST_FORMAT_BYTES: - *value -= queue->cur_level.bytes; + if (*value >= queue->cur_level.bytes) + *value -= queue->cur_level.bytes; + else + *value = 0; break; case GST_FORMAT_TIME: - *value -= queue->cur_level.time; + if (*value >= queue->cur_level.time) + *value -= queue->cur_level.time; + else + *value = 0; break; default: /* FIXME */ diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 23866344a8..3d21e8a028 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -474,6 +474,8 @@ gst_queue_link_src (GstPad * pad, const GstCaps * caps) static void gst_queue_locked_flush (GstQueue * queue) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Flushing contents..."); + while (!g_queue_is_empty (queue->queue)) { GstData *data = g_queue_pop_head (queue->queue); @@ -569,7 +571,9 @@ restart: if (GST_IS_BUFFER (data)) GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); + "adding buffer %p of size %d and time %" GST_TIME_FORMAT, + data, GST_BUFFER_SIZE (data), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data))); /* We make space available if we're "full" according to whatever * the user defined as "full". Note that this only applies to buffers. @@ -660,7 +664,7 @@ restart: GST_QUEUE_MUTEX_UNLOCK; sched = gst_pad_get_scheduler (queue->sinkpad); if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) { - goto out_unref; + goto ignore_interrupt; } /* if we got here because we were unlocked after a * flush, we don't need to add the buffer to the @@ -693,6 +697,7 @@ restart: } } + ignore_interrupt: /* OK, we've got a serious issue here. Imagine the situation * where the puller (next element) is sending an event here, * so it cannot pull events from the queue, and we cannot @@ -701,9 +706,16 @@ restart: * that, we handle pending upstream events here, too. */ gst_queue_handle_pending_events (queue); - STATUS (queue, "waiting for item_del signal from thread using qlock"); - g_cond_wait (queue->item_del, queue->qlock); - STATUS (queue, "received item_del signal from thread using qlock"); + if (!queue->interrupt) { + STATUS (queue, + "waiting for item_del signal from thread using qlock"); + g_cond_wait (queue->item_del, queue->qlock); + STATUS (queue, "received item_del signal from thread using qlock"); + } else { + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "Not waiting, just adding buffer, after interrupt (bad!)"); + break; + } } STATUS (queue, "post-full wait"); @@ -848,6 +860,9 @@ restart: queue->cur_level.bytes -= GST_BUFFER_SIZE (data); if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) queue->cur_level.time -= GST_BUFFER_DURATION (data); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "Got buffer of time %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data))); } /* Now that we're done, we can lose our own reference to @@ -947,6 +962,7 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { gst_queue_locked_flush (queue); } + break; default: break; } @@ -974,10 +990,16 @@ gst_queue_handle_src_query (GstPad * pad, /* FIXME: this code assumes that there's no discont in the queue */ switch (*fmt) { case GST_FORMAT_BYTES: - *value -= queue->cur_level.bytes; + if (*value >= queue->cur_level.bytes) + *value -= queue->cur_level.bytes; + else + *value = 0; break; case GST_FORMAT_TIME: - *value -= queue->cur_level.time; + if (*value >= queue->cur_level.time) + *value -= queue->cur_level.time; + else + *value = 0; break; default: /* FIXME */