From 3da8ea1c3dba7465c02bd1833087df5bd16d81dc Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 10 May 2007 15:21:20 +0000 Subject: [PATCH] plugins/elements/gstqueue.*: Be smarter when calculating the current amount of data in the queue by measuring the dif... Original commit message from CVS: * plugins/elements/gstqueue.c: (gst_queue_class_init), (update_time_level), (gst_queue_locked_flush), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_loop): * plugins/elements/gstqueue.h: Be smarter when calculating the current amount of data in the queue by measuring the difference between start and end timestamps (in running time) inside the queue. Fixes #432876. API: GstQueue::pushing to notify elements that we are pushing data again since the running signal is rather broken for this purpose. --- ChangeLog | 13 +++ plugins/elements/gstqueue.c | 179 +++++++++++++++++++++++++++++++----- plugins/elements/gstqueue.h | 6 ++ 3 files changed, 177 insertions(+), 21 deletions(-) diff --git a/ChangeLog b/ChangeLog index 60a9734645..6b36fb5f6f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +2007-05-10 Wim Taymans + + * plugins/elements/gstqueue.c: (gst_queue_class_init), + (update_time_level), (gst_queue_locked_flush), + (gst_queue_handle_sink_event), (gst_queue_chain), + (gst_queue_push_one), (gst_queue_loop): + * plugins/elements/gstqueue.h: + Be smarter when calculating the current amount of data in the queue by + measuring the difference between start and end timestamps (in running + time) inside the queue. Fixes #432876. + API: GstQueue::pushing to notify elements that we are pushing data again + since the running signal is rather broken for this purpose. + 2007-05-10 Stefan Kost * plugins/elements/gstqueue.c (_do_init, gst_queue_signals, diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index ac65eb7695..e19b5b82de 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -106,6 +106,7 @@ enum SIGNAL_UNDERRUN, SIGNAL_RUNNING, SIGNAL_OVERRUN, + SIGNAL_PUSHING, LAST_SIGNAL }; @@ -277,6 +278,17 @@ gst_queue_class_init (GstQueueClass * klass) g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + /** + * GstQueue::pushing: + * @queue: the queue instance + * + * Reports when the queue has enough data to start pushing data again on the + * source pad. + */ + gst_queue_signals[SIGNAL_PUSHING] = + g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /* properties */ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, @@ -489,6 +501,24 @@ gst_queue_acceptcaps (GstPad * pad, GstCaps * caps) return TRUE; } +static void +update_time_level (GstQueue * queue) +{ + gint64 sink_time, src_time; + + sink_time = + gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME, + queue->sink_segment.last_stop); + + src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME, + queue->src_segment.last_stop); + + if (sink_time >= src_time) + queue->cur_level.time = sink_time - src_time; + else + queue->cur_level.time = 0; +} + static void gst_queue_locked_flush (GstQueue * queue) { @@ -505,6 +535,8 @@ gst_queue_locked_flush (GstQueue * queue) queue->min_threshold.buffers = queue->orig_min_threshold.buffers; queue->min_threshold.bytes = queue->orig_min_threshold.bytes; queue->min_threshold.time = queue->orig_min_threshold.time; + gst_segment_init (&queue->sink_segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&queue->src_segment, GST_FORMAT_UNDEFINED); /* we deleted something... */ g_cond_signal (queue->item_del); @@ -559,6 +591,24 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) STATUS (queue, pad, "received EOS"); have_eos = TRUE; break; + case GST_EVENT_NEWSEGMENT: + { + gboolean update; + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time; + + gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, + &start, &stop, &time); + + GST_DEBUG_OBJECT (queue, "received NEWSEGMENT in %s", + gst_format_get_name (format)); + + /* now configure the values */ + gst_segment_set_newsegment_full (&queue->sink_segment, update, + rate, arate, format, start, stop, time); + break; + } default: if (GST_EVENT_IS_SERIALIZED (event)) { /* we put the event in the queue, we don't have to act ourselves */ @@ -611,19 +661,24 @@ gst_queue_is_filled (GstQueue * queue) queue->cur_level.time >= queue->max_size.time))); } - static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer) { GstQueue *queue; + GstClockTime duration, timestamp; queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer)); + "adding buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %" + GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer), + GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration)); /* We make space available if we're "full" according to whatever * the user defined as "full". Note that this only applies to buffers. @@ -679,9 +734,29 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) * to make things read-only. Also keep our list uptodate. */ queue->cur_level.bytes -= GST_BUFFER_SIZE (leak); queue->cur_level.buffers--; - if (GST_BUFFER_DURATION (leak) != GST_CLOCK_TIME_NONE) - queue->cur_level.time -= GST_BUFFER_DURATION (leak); + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + + /* update start time in queue */ + if (queue->src_segment.format == GST_FORMAT_TIME) { + gint64 last_stop; + + if (timestamp != GST_CLOCK_TIME_NONE) + last_stop = timestamp; + else + last_stop = queue->src_segment.last_stop; + + gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME, + last_stop); + + update_time_level (queue); + } else if (duration != GST_CLOCK_TIME_NONE) { + if (queue->cur_level.time > duration) + queue->cur_level.time -= duration; + else + queue->cur_level.time = 0; + } gst_buffer_unref (leak); break; } @@ -724,9 +799,26 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* add buffer to the statistics */ queue->cur_level.buffers++; queue->cur_level.bytes += GST_BUFFER_SIZE (buffer); - if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE) - queue->cur_level.time += GST_BUFFER_DURATION (buffer); + /* update start time in queue */ + if (queue->sink_segment.format == GST_FORMAT_TIME) { + gint64 last_stop; + + if (timestamp != GST_CLOCK_TIME_NONE) + last_stop = timestamp; + else + last_stop = queue->sink_segment.last_stop; + + if (duration != GST_CLOCK_TIME_NONE) + last_stop += duration; + + gst_segment_set_last_stop (&queue->sink_segment, GST_FORMAT_TIME, + last_stop); + + update_time_level (queue); + } else if (duration != GST_CLOCK_TIME_NONE) { + queue->cur_level.time += duration; + } STATUS (queue, pad, "+ level"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); @@ -771,15 +863,38 @@ gst_queue_push_one (GstQueue * queue) if (GST_IS_BUFFER (data)) { GstFlowReturn result; + GstClockTime timestamp, duration; + GstBuffer *buffer = GST_BUFFER (data); /* Update statistics */ queue->cur_level.buffers--; - queue->cur_level.bytes -= GST_BUFFER_SIZE (data); - if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) - queue->cur_level.time -= GST_BUFFER_DURATION (data); + queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer); + + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + + /* update start time in queue */ + if (queue->src_segment.format == GST_FORMAT_TIME) { + gint64 last_stop; + + if (timestamp != GST_CLOCK_TIME_NONE) + last_stop = timestamp; + else + last_stop = queue->src_segment.last_stop; + + gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME, + last_stop); + + update_time_level (queue); + } else if (duration != GST_CLOCK_TIME_NONE) { + if (queue->cur_level.time > duration) + queue->cur_level.time -= duration; + else + queue->cur_level.time = 0; + } GST_QUEUE_MUTEX_UNLOCK (queue); - result = gst_pad_push (queue->srcpad, GST_BUFFER (data)); + result = gst_pad_push (queue->srcpad, buffer); /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); /* else result of push indicates what happens */ @@ -794,19 +909,40 @@ gst_queue_push_one (GstQueue * queue) gst_pad_pause_task (queue->srcpad); } } else if (GST_IS_EVENT (data)) { - if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) { - queue->cur_level.buffers = 0; - queue->cur_level.bytes = 0; - queue->cur_level.time = 0; - /* all incomming data is now unexpected */ - queue->srcresult = GST_FLOW_UNEXPECTED; - /* and we don't need to process anymore */ - GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now"); - gst_pad_pause_task (queue->srcpad); - restart = FALSE; + GstEvent *event = GST_EVENT (data); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + queue->cur_level.buffers = 0; + queue->cur_level.bytes = 0; + queue->cur_level.time = 0; + /* all incomming data is now unexpected */ + queue->srcresult = GST_FLOW_UNEXPECTED; + /* and we don't need to process anymore */ + GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now"); + gst_pad_pause_task (queue->srcpad); + restart = FALSE; + break; + case GST_EVENT_NEWSEGMENT: + { + gboolean update; + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time; + + gst_event_parse_new_segment_full (event, &update, &rate, &arate, + &format, &start, &stop, &time); + + /* now configure the values */ + gst_segment_set_newsegment_full (&queue->src_segment, update, + rate, arate, format, start, stop, time); + break; + } + default: + break; } GST_QUEUE_MUTEX_UNLOCK (queue); - gst_pad_push_event (queue->srcpad, GST_EVENT (data)); + gst_pad_push_event (queue->srcpad, event); GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); if (restart == TRUE) return TRUE; @@ -869,6 +1005,7 @@ restart: STATUS (queue, pad, "post-empty wait"); GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_PUSHING], 0); GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 0de1218cb5..05ca904a0e 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -75,6 +75,10 @@ struct _GstQueue { GstPad *sinkpad; GstPad *srcpad; + /* segments to keep track of timestamps */ + GstSegment sink_segment; + GstSegment src_segment; + /* flowreturn when srcpad is paused */ GstFlowReturn srcresult; @@ -106,6 +110,8 @@ struct _GstQueueClass { void (*running) (GstQueue *queue); void (*overrun) (GstQueue *queue); + void (*pushing) (GstQueue *queue); + gpointer _gst_reserved[GST_PADDING]; };