plugins/elements/gstqueue.*: Be smarter when calculating the current amount of data in the queue by measuring the dif...

Original commit message from CVS:
* plugins/elements/gstqueue.c: (gst_queue_class_init),
(update_time_level), (gst_queue_locked_flush),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_push_one), (gst_queue_loop):
* plugins/elements/gstqueue.h:
Be smarter when calculating the current amount of data in the queue by
measuring the difference between start and end timestamps (in running
time) inside the queue. Fixes #432876.
API: GstQueue::pushing to notify elements that we are pushing data again
since the running signal is rather broken for this purpose.
This commit is contained in:
Wim Taymans 2007-05-10 15:21:20 +00:00
parent 687b7ad009
commit 3da8ea1c3d
3 changed files with 177 additions and 21 deletions

View file

@ -1,3 +1,16 @@
2007-05-10 Wim Taymans <wim@fluendo.com>
* plugins/elements/gstqueue.c: (gst_queue_class_init),
(update_time_level), (gst_queue_locked_flush),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_push_one), (gst_queue_loop):
* plugins/elements/gstqueue.h:
Be smarter when calculating the current amount of data in the queue by
measuring the difference between start and end timestamps (in running
time) inside the queue. Fixes #432876.
API: GstQueue::pushing to notify elements that we are pushing data again
since the running signal is rather broken for this purpose.
2007-05-10 Stefan Kost <ensonic@users.sf.net>
* plugins/elements/gstqueue.c (_do_init, gst_queue_signals,

View file

@ -106,6 +106,7 @@ enum
SIGNAL_UNDERRUN,
SIGNAL_RUNNING,
SIGNAL_OVERRUN,
SIGNAL_PUSHING,
LAST_SIGNAL
};
@ -277,6 +278,17 @@ gst_queue_class_init (GstQueueClass * klass)
g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
/**
* GstQueue::pushing:
* @queue: the queue instance
*
* Reports when the queue has enough data to start pushing data again on the
* source pad.
*/
gst_queue_signals[SIGNAL_PUSHING] =
g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
/* properties */
g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
@ -489,6 +501,24 @@ gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
return TRUE;
}
static void
update_time_level (GstQueue * queue)
{
gint64 sink_time, src_time;
sink_time =
gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
queue->sink_segment.last_stop);
src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
queue->src_segment.last_stop);
if (sink_time >= src_time)
queue->cur_level.time = sink_time - src_time;
else
queue->cur_level.time = 0;
}
static void
gst_queue_locked_flush (GstQueue * queue)
{
@ -505,6 +535,8 @@ gst_queue_locked_flush (GstQueue * queue)
queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
queue->min_threshold.time = queue->orig_min_threshold.time;
gst_segment_init (&queue->sink_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&queue->src_segment, GST_FORMAT_UNDEFINED);
/* we deleted something... */
g_cond_signal (queue->item_del);
@ -559,6 +591,24 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
STATUS (queue, pad, "received EOS");
have_eos = TRUE;
break;
case GST_EVENT_NEWSEGMENT:
{
gboolean update;
GstFormat format;
gdouble rate, arate;
gint64 start, stop, time;
gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
&start, &stop, &time);
GST_DEBUG_OBJECT (queue, "received NEWSEGMENT in %s",
gst_format_get_name (format));
/* now configure the values */
gst_segment_set_newsegment_full (&queue->sink_segment, update,
rate, arate, format, start, stop, time);
break;
}
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
/* we put the event in the queue, we don't have to act ourselves */
@ -611,19 +661,24 @@ gst_queue_is_filled (GstQueue * queue)
queue->cur_level.time >= queue->max_size.time)));
}
static GstFlowReturn
gst_queue_chain (GstPad * pad, GstBuffer * buffer)
{
GstQueue *queue;
GstClockTime duration, timestamp;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
"adding buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
/* We make space available if we're "full" according to whatever
* the user defined as "full". Note that this only applies to buffers.
@ -679,9 +734,29 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
* to make things read-only. Also keep our list uptodate. */
queue->cur_level.bytes -= GST_BUFFER_SIZE (leak);
queue->cur_level.buffers--;
if (GST_BUFFER_DURATION (leak) != GST_CLOCK_TIME_NONE)
queue->cur_level.time -= GST_BUFFER_DURATION (leak);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
/* update start time in queue */
if (queue->src_segment.format == GST_FORMAT_TIME) {
gint64 last_stop;
if (timestamp != GST_CLOCK_TIME_NONE)
last_stop = timestamp;
else
last_stop = queue->src_segment.last_stop;
gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
last_stop);
update_time_level (queue);
} else if (duration != GST_CLOCK_TIME_NONE) {
if (queue->cur_level.time > duration)
queue->cur_level.time -= duration;
else
queue->cur_level.time = 0;
}
gst_buffer_unref (leak);
break;
}
@ -724,9 +799,26 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
/* add buffer to the statistics */
queue->cur_level.buffers++;
queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
queue->cur_level.time += GST_BUFFER_DURATION (buffer);
/* update start time in queue */
if (queue->sink_segment.format == GST_FORMAT_TIME) {
gint64 last_stop;
if (timestamp != GST_CLOCK_TIME_NONE)
last_stop = timestamp;
else
last_stop = queue->sink_segment.last_stop;
if (duration != GST_CLOCK_TIME_NONE)
last_stop += duration;
gst_segment_set_last_stop (&queue->sink_segment, GST_FORMAT_TIME,
last_stop);
update_time_level (queue);
} else if (duration != GST_CLOCK_TIME_NONE) {
queue->cur_level.time += duration;
}
STATUS (queue, pad, "+ level");
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
@ -771,15 +863,38 @@ gst_queue_push_one (GstQueue * queue)
if (GST_IS_BUFFER (data)) {
GstFlowReturn result;
GstClockTime timestamp, duration;
GstBuffer *buffer = GST_BUFFER (data);
/* Update statistics */
queue->cur_level.buffers--;
queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
queue->cur_level.time -= GST_BUFFER_DURATION (data);
queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
/* update start time in queue */
if (queue->src_segment.format == GST_FORMAT_TIME) {
gint64 last_stop;
if (timestamp != GST_CLOCK_TIME_NONE)
last_stop = timestamp;
else
last_stop = queue->src_segment.last_stop;
gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
last_stop);
update_time_level (queue);
} else if (duration != GST_CLOCK_TIME_NONE) {
if (queue->cur_level.time > duration)
queue->cur_level.time -= duration;
else
queue->cur_level.time = 0;
}
GST_QUEUE_MUTEX_UNLOCK (queue);
result = gst_pad_push (queue->srcpad, GST_BUFFER (data));
result = gst_pad_push (queue->srcpad, buffer);
/* need to check for srcresult here as well */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
/* else result of push indicates what happens */
@ -794,19 +909,40 @@ gst_queue_push_one (GstQueue * queue)
gst_pad_pause_task (queue->srcpad);
}
} else if (GST_IS_EVENT (data)) {
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
queue->cur_level.buffers = 0;
queue->cur_level.bytes = 0;
queue->cur_level.time = 0;
/* all incomming data is now unexpected */
queue->srcresult = GST_FLOW_UNEXPECTED;
/* and we don't need to process anymore */
GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
gst_pad_pause_task (queue->srcpad);
restart = FALSE;
GstEvent *event = GST_EVENT (data);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
queue->cur_level.buffers = 0;
queue->cur_level.bytes = 0;
queue->cur_level.time = 0;
/* all incomming data is now unexpected */
queue->srcresult = GST_FLOW_UNEXPECTED;
/* and we don't need to process anymore */
GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
gst_pad_pause_task (queue->srcpad);
restart = FALSE;
break;
case GST_EVENT_NEWSEGMENT:
{
gboolean update;
GstFormat format;
gdouble rate, arate;
gint64 start, stop, time;
gst_event_parse_new_segment_full (event, &update, &rate, &arate,
&format, &start, &stop, &time);
/* now configure the values */
gst_segment_set_newsegment_full (&queue->src_segment, update,
rate, arate, format, start, stop, time);
break;
}
default:
break;
}
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_pad_push_event (queue->srcpad, GST_EVENT (data));
gst_pad_push_event (queue->srcpad, event);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
if (restart == TRUE)
return TRUE;
@ -869,6 +1005,7 @@ restart:
STATUS (queue, pad, "post-empty wait");
GST_QUEUE_MUTEX_UNLOCK (queue);
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_PUSHING], 0);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
}

View file

@ -75,6 +75,10 @@ struct _GstQueue {
GstPad *sinkpad;
GstPad *srcpad;
/* segments to keep track of timestamps */
GstSegment sink_segment;
GstSegment src_segment;
/* flowreturn when srcpad is paused */
GstFlowReturn srcresult;
@ -106,6 +110,8 @@ struct _GstQueueClass {
void (*running) (GstQueue *queue);
void (*overrun) (GstQueue *queue);
void (*pushing) (GstQueue *queue);
gpointer _gst_reserved[GST_PADDING];
};