gst/gstqueue.c: Don't drop buffers on interrupt. Rather, ignore fulness status and preserve stream (#159676).

Original commit message from CVS:
* gst/gstqueue.c: (gst_queue_locked_flush), (gst_queue_chain),
(gst_queue_get), (gst_queue_handle_src_event):
Don't drop buffers on interrupt. Rather, ignore fulness status
and preserve stream (#159676).
(gst_queue_handle_src_query):
Don't go below zero.
This commit is contained in:
Ronald S. Bultje 2005-03-01 16:29:01 +00:00
parent 84f4a18302
commit cea7e759d2
3 changed files with 67 additions and 14 deletions

View file

@ -1,3 +1,12 @@
2005-03-01 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* gst/gstqueue.c: (gst_queue_locked_flush), (gst_queue_chain),
(gst_queue_get), (gst_queue_handle_src_event):
Don't drop buffers on interrupt. Rather, ignore fulness status
and preserve stream (#159676).
(gst_queue_handle_src_query):
Don't go below zero.
2005-02-27 Phil Blundell <pb@nexus.co.uk> 2005-02-27 Phil Blundell <pb@nexus.co.uk>
Reviewed by: Maciej Katafiasz <mathrick@freedesktop.org> Reviewed by: Maciej Katafiasz <mathrick@freedesktop.org>

View file

@ -474,6 +474,8 @@ gst_queue_link_src (GstPad * pad, const GstCaps * caps)
static void static void
gst_queue_locked_flush (GstQueue * queue) gst_queue_locked_flush (GstQueue * queue)
{ {
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Flushing contents...");
while (!g_queue_is_empty (queue->queue)) { while (!g_queue_is_empty (queue->queue)) {
GstData *data = g_queue_pop_head (queue->queue); GstData *data = g_queue_pop_head (queue->queue);
@ -569,7 +571,9 @@ restart:
if (GST_IS_BUFFER (data)) if (GST_IS_BUFFER (data))
GST_CAT_LOG_OBJECT (queue_dataflow, queue, GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); "adding buffer %p of size %d and time %" GST_TIME_FORMAT,
data, GST_BUFFER_SIZE (data),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data)));
/* We make space available if we're "full" according to whatever /* We make space available if we're "full" according to whatever
* the user defined as "full". Note that this only applies to buffers. * the user defined as "full". Note that this only applies to buffers.
@ -660,7 +664,7 @@ restart:
GST_QUEUE_MUTEX_UNLOCK; GST_QUEUE_MUTEX_UNLOCK;
sched = gst_pad_get_scheduler (queue->sinkpad); sched = gst_pad_get_scheduler (queue->sinkpad);
if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) { if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) {
goto out_unref; goto ignore_interrupt;
} }
/* if we got here because we were unlocked after a /* if we got here because we were unlocked after a
* flush, we don't need to add the buffer to the * flush, we don't need to add the buffer to the
@ -693,6 +697,7 @@ restart:
} }
} }
ignore_interrupt:
/* OK, we've got a serious issue here. Imagine the situation /* OK, we've got a serious issue here. Imagine the situation
* where the puller (next element) is sending an event here, * where the puller (next element) is sending an event here,
* so it cannot pull events from the queue, and we cannot * so it cannot pull events from the queue, and we cannot
@ -701,9 +706,16 @@ restart:
* that, we handle pending upstream events here, too. */ * that, we handle pending upstream events here, too. */
gst_queue_handle_pending_events (queue); gst_queue_handle_pending_events (queue);
STATUS (queue, "waiting for item_del signal from thread using qlock"); if (!queue->interrupt) {
STATUS (queue,
"waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock); g_cond_wait (queue->item_del, queue->qlock);
STATUS (queue, "received item_del signal from thread using qlock"); STATUS (queue, "received item_del signal from thread using qlock");
} else {
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"Not waiting, just adding buffer, after interrupt (bad!)");
break;
}
} }
STATUS (queue, "post-full wait"); STATUS (queue, "post-full wait");
@ -848,6 +860,9 @@ restart:
queue->cur_level.bytes -= GST_BUFFER_SIZE (data); queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
queue->cur_level.time -= GST_BUFFER_DURATION (data); queue->cur_level.time -= GST_BUFFER_DURATION (data);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"Got buffer of time %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data)));
} }
/* Now that we're done, we can lose our own reference to /* Now that we're done, we can lose our own reference to
@ -947,6 +962,7 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue);
} }
break;
default: default:
break; break;
} }
@ -974,10 +990,16 @@ gst_queue_handle_src_query (GstPad * pad,
/* FIXME: this code assumes that there's no discont in the queue */ /* FIXME: this code assumes that there's no discont in the queue */
switch (*fmt) { switch (*fmt) {
case GST_FORMAT_BYTES: case GST_FORMAT_BYTES:
if (*value >= queue->cur_level.bytes)
*value -= queue->cur_level.bytes; *value -= queue->cur_level.bytes;
else
*value = 0;
break; break;
case GST_FORMAT_TIME: case GST_FORMAT_TIME:
if (*value >= queue->cur_level.time)
*value -= queue->cur_level.time; *value -= queue->cur_level.time;
else
*value = 0;
break; break;
default: default:
/* FIXME */ /* FIXME */

View file

@ -474,6 +474,8 @@ gst_queue_link_src (GstPad * pad, const GstCaps * caps)
static void static void
gst_queue_locked_flush (GstQueue * queue) gst_queue_locked_flush (GstQueue * queue)
{ {
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Flushing contents...");
while (!g_queue_is_empty (queue->queue)) { while (!g_queue_is_empty (queue->queue)) {
GstData *data = g_queue_pop_head (queue->queue); GstData *data = g_queue_pop_head (queue->queue);
@ -569,7 +571,9 @@ restart:
if (GST_IS_BUFFER (data)) if (GST_IS_BUFFER (data))
GST_CAT_LOG_OBJECT (queue_dataflow, queue, GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"adding buffer %p of size %d", data, GST_BUFFER_SIZE (data)); "adding buffer %p of size %d and time %" GST_TIME_FORMAT,
data, GST_BUFFER_SIZE (data),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data)));
/* We make space available if we're "full" according to whatever /* We make space available if we're "full" according to whatever
* the user defined as "full". Note that this only applies to buffers. * the user defined as "full". Note that this only applies to buffers.
@ -660,7 +664,7 @@ restart:
GST_QUEUE_MUTEX_UNLOCK; GST_QUEUE_MUTEX_UNLOCK;
sched = gst_pad_get_scheduler (queue->sinkpad); sched = gst_pad_get_scheduler (queue->sinkpad);
if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) { if (!sched || gst_scheduler_interrupt (sched, GST_ELEMENT (queue))) {
goto out_unref; goto ignore_interrupt;
} }
/* if we got here because we were unlocked after a /* if we got here because we were unlocked after a
* flush, we don't need to add the buffer to the * flush, we don't need to add the buffer to the
@ -693,6 +697,7 @@ restart:
} }
} }
ignore_interrupt:
/* OK, we've got a serious issue here. Imagine the situation /* OK, we've got a serious issue here. Imagine the situation
* where the puller (next element) is sending an event here, * where the puller (next element) is sending an event here,
* so it cannot pull events from the queue, and we cannot * so it cannot pull events from the queue, and we cannot
@ -701,9 +706,16 @@ restart:
* that, we handle pending upstream events here, too. */ * that, we handle pending upstream events here, too. */
gst_queue_handle_pending_events (queue); gst_queue_handle_pending_events (queue);
STATUS (queue, "waiting for item_del signal from thread using qlock"); if (!queue->interrupt) {
STATUS (queue,
"waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock); g_cond_wait (queue->item_del, queue->qlock);
STATUS (queue, "received item_del signal from thread using qlock"); STATUS (queue, "received item_del signal from thread using qlock");
} else {
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"Not waiting, just adding buffer, after interrupt (bad!)");
break;
}
} }
STATUS (queue, "post-full wait"); STATUS (queue, "post-full wait");
@ -848,6 +860,9 @@ restart:
queue->cur_level.bytes -= GST_BUFFER_SIZE (data); queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
queue->cur_level.time -= GST_BUFFER_DURATION (data); queue->cur_level.time -= GST_BUFFER_DURATION (data);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"Got buffer of time %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (data)));
} }
/* Now that we're done, we can lose our own reference to /* Now that we're done, we can lose our own reference to
@ -947,6 +962,7 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue);
} }
break;
default: default:
break; break;
} }
@ -974,10 +990,16 @@ gst_queue_handle_src_query (GstPad * pad,
/* FIXME: this code assumes that there's no discont in the queue */ /* FIXME: this code assumes that there's no discont in the queue */
switch (*fmt) { switch (*fmt) {
case GST_FORMAT_BYTES: case GST_FORMAT_BYTES:
if (*value >= queue->cur_level.bytes)
*value -= queue->cur_level.bytes; *value -= queue->cur_level.bytes;
else
*value = 0;
break; break;
case GST_FORMAT_TIME: case GST_FORMAT_TIME:
if (*value >= queue->cur_level.time)
*value -= queue->cur_level.time; *value -= queue->cur_level.time;
else
*value = 0;
break; break;
default: default:
/* FIXME */ /* FIXME */