From 14816cdb49bffb88daea34217eae1638701ae909 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Thu, 19 Jan 2006 13:30:31 +0000 Subject: [PATCH] gst/gstevent.c: Fix docs typo Original commit message from CVS: * gst/gstevent.c: Fix docs typo * plugins/elements/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_loop): Do some refactoring. Doesn't actually change functionality, but makes landing the DRAIN event easier later. --- ChangeLog | 10 +++ gst/gstevent.c | 5 +- plugins/elements/gstqueue.c | 145 +++++++++++++++++++----------------- 3 files changed, 89 insertions(+), 71 deletions(-) diff --git a/ChangeLog b/ChangeLog index e7e0215dc3..92fdd11a1b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +2006-01-19 Jan Schmidt + + * gst/gstevent.c: + Fix docs typo + + * plugins/elements/gstqueue.c: (gst_queue_handle_sink_event), + (gst_queue_chain), (gst_queue_push_one), (gst_queue_loop): + Do some refactoring. Doesn't actually change functionality, + but makes landing the DRAIN event easier later. + 2006-01-19 Tim-Philipp Müller * docs/pwg/advanced-scheduling.xml: diff --git a/gst/gstevent.c b/gst/gstevent.c index 71af9051bf..824aaf2126 100644 --- a/gst/gstevent.c +++ b/gst/gstevent.c @@ -388,8 +388,9 @@ gst_event_new_flush_stop (void) * event on a pad can return UNEXPECTED as a GstFlowReturn when data * after the EOS event arrives. * - * The EOS event will travel up to the sink elements in the pipeline - * which will then post the GST_MESSAGE_EOS on the bus. + * The EOS event will travel down to the sink elements in the pipeline + * which will then post the GST_MESSAGE_EOS on the bus after they have + * finished playing any buffered data. * * When all sinks have posted an EOS message, the EOS message is * forwarded to the application. diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 8f527ebbbe..2cea209f4a 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -58,7 +58,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_debug); #define GST_CAT_DEFAULT (queue_debug) GST_DEBUG_CATEGORY_STATIC (queue_dataflow); -#define STATUS(queue, msg) \ +#define STATUS(queue, pad, msg) \ GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ @@ -145,6 +145,7 @@ static void gst_queue_get_property (GObject * object, static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf); +static gboolean gst_queue_push_one (GstQueue * queue); static void gst_queue_loop (GstPad * pad); static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event); @@ -162,6 +163,8 @@ static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active); static GstStateChangeReturn gst_queue_change_state (GstElement * element, GstStateChange transition); +static gboolean gst_queue_is_empty (GstQueue * queue); +static gboolean gst_queue_is_filled (GstQueue * queue); #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ()) @@ -495,23 +498,6 @@ gst_queue_locked_flush (GstQueue * queue) g_cond_signal (queue->item_del); } -#define STATUS(queue, msg) \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ - "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ - "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ - "-%" G_GUINT64_FORMAT " ns, %u elements", \ - GST_DEBUG_PAD_NAME (pad), \ - queue->cur_level.buffers, \ - queue->min_threshold.buffers, \ - queue->max_size.buffers, \ - queue->cur_level.bytes, \ - queue->min_threshold.bytes, \ - queue->max_size.bytes, \ - queue->cur_level.time, \ - queue->min_threshold.time, \ - queue->max_size.time, \ - queue->queue->length) - static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) { @@ -522,7 +508,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: - STATUS (queue, "received flush start event"); + STATUS (queue, pad, "received flush start event"); /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -539,7 +525,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); goto done; case GST_EVENT_FLUSH_STOP: - STATUS (queue, "received flush stop event"); + STATUS (queue, pad, "received flush stop event"); /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -554,10 +540,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) } GST_QUEUE_MUTEX_UNLOCK (queue); - STATUS (queue, "after flush"); + STATUS (queue, pad, "after flush"); goto done; case GST_EVENT_EOS: - STATUS (queue, "received EOS"); + STATUS (queue, pad, "received EOS"); have_eos = TRUE; break; default: @@ -685,10 +671,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* don't leak. Instead, wait for space to be available */ case GST_QUEUE_NO_LEAK: - STATUS (queue, "pre-full wait"); + STATUS (queue, pad, "pre-full wait"); while (gst_queue_is_filled (queue)) { - STATUS (queue, "waiting for item_del signal from thread using qlock"); + STATUS (queue, pad, + "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); if (queue->srcresult != GST_FLOW_OK) @@ -697,10 +684,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* if there's a pending state change for this queue * or its manager, switch back to iterator so bottom * half of state change executes */ - STATUS (queue, "received item_del signal from thread using qlock"); + STATUS (queue, pad, + "received item_del signal from thread using qlock"); } - STATUS (queue, "post-full wait"); + STATUS (queue, pad, "post-full wait"); GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); @@ -717,7 +705,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE) queue->cur_level.time += GST_BUFFER_DURATION (buffer); - STATUS (queue, "+ level"); + STATUS (queue, pad, "+ level"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); g_cond_signal (queue->item_add); @@ -748,47 +736,11 @@ out_flushing: } } -static void -gst_queue_loop (GstPad * pad) +static gboolean +gst_queue_push_one (GstQueue * queue) { - GstQueue *queue; - GstMiniObject *data; gboolean restart = TRUE; - - queue = GST_QUEUE (GST_PAD_PARENT (pad)); - - /* have to lock for thread-safety */ - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - -restart: - while (gst_queue_is_empty (queue)) { - GST_QUEUE_MUTEX_UNLOCK (queue); - g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - - STATUS (queue, "pre-empty wait"); - while (gst_queue_is_empty (queue)) { - STATUS (queue, "waiting for item_add"); - - GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", - g_thread_self ()); - g_cond_wait (queue->item_add, queue->qlock); - - /* we released the lock in the g_cond above so we might be - * flushing now */ - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; - - GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", - g_thread_self ()); - STATUS (queue, "got item_add signal"); - } - - STATUS (queue, "post-empty wait"); - GST_QUEUE_MUTEX_UNLOCK (queue); - g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - } + GstMiniObject *data; /* There's something in the list now, whatever it is */ data = g_queue_pop_head (queue->queue); @@ -805,7 +757,7 @@ restart: queue->cur_level.time -= GST_BUFFER_DURATION (data); GST_QUEUE_MUTEX_UNLOCK (queue); - result = gst_pad_push (pad, GST_BUFFER (data)); + result = gst_pad_push (queue->srcpad, GST_BUFFER (data)); /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); /* else result of push indicates what happens */ @@ -840,16 +792,71 @@ restart: gst_pad_push_event (queue->srcpad, GST_EVENT (data)); GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); if (restart == TRUE) - goto restart; + return TRUE; } else { g_warning ("Unexpected object in queue %s (refcounting problem?)", GST_OBJECT_NAME (queue)); } - STATUS (queue, "after _get()"); + STATUS (queue, queue->srcpad, "after _get()"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); + + return FALSE; + +out_flushing: + gst_pad_pause_task (queue->srcpad); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because task paused, reason: %s", + gst_flow_get_name (queue->srcresult)); + + return FALSE; /* FALSE == no restart */ +} + +static void +gst_queue_loop (GstPad * pad) +{ + GstQueue *queue; + + queue = GST_QUEUE (GST_PAD_PARENT (pad)); + + /* have to lock for thread-safety */ + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + +restart: + while (gst_queue_is_empty (queue)) { + GST_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + + STATUS (queue, pad, "pre-empty wait"); + while (gst_queue_is_empty (queue)) { + STATUS (queue, pad, "waiting for item_add"); + + GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", + g_thread_self ()); + g_cond_wait (queue->item_add, queue->qlock); + + /* we released the lock in the g_cond above so we might be + * flushing now */ + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + + GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", + g_thread_self ()); + STATUS (queue, pad, "got item_add signal"); + } + + STATUS (queue, pad, "post-empty wait"); + GST_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + } + + if (gst_queue_push_one (queue)) + goto restart; + GST_QUEUE_MUTEX_UNLOCK (queue); return;