From f73fb55d1e50d332752dbc712c08c5bf772a9224 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 20 Jul 2005 10:58:10 +0000 Subject: [PATCH] gst/base/gstbasesrc.*: Add a gboolean to decide when to push out a discont. Original commit message from CVS: * gst/base/gstbasesrc.c: (gst_base_src_init), (gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start): * gst/base/gstbasesrc.h: Add a gboolean to decide when to push out a discont. * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_set_property), (gst_queue_get_property): Some cleanups. * tests/threadstate/threadstate1.c: (main): Make a thread test compile and run... very silly.. --- ChangeLog | 17 +++++ gst/base/gstbasesrc.c | 11 +-- gst/base/gstbasesrc.h | 1 + gst/gstqueue.c | 114 ++++++++++++++----------------- libs/gst/base/gstbasesrc.c | 11 +-- libs/gst/base/gstbasesrc.h | 1 + plugins/elements/gstqueue.c | 114 ++++++++++++++----------------- tests/threadstate/threadstate1.c | 12 ++-- 8 files changed, 141 insertions(+), 140 deletions(-) diff --git a/ChangeLog b/ChangeLog index 9a285518b6..c296dd17a2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,20 @@ +2005-07-20 Wim Taymans + + * gst/base/gstbasesrc.c: (gst_base_src_init), + (gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start): + * gst/base/gstbasesrc.h: + Add a gboolean to decide when to push out a discont. + + * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain), + (gst_queue_loop), (gst_queue_handle_src_query), + (gst_queue_sink_activate_push), (gst_queue_src_activate_push), + (gst_queue_set_property), (gst_queue_get_property): + Some cleanups. + + * tests/threadstate/threadstate1.c: (main): + Make a thread test compile and run... very silly.. + + 2005-07-20 Ronald S. Bultje * docs/manual/appendix-porting.xml: diff --git a/gst/base/gstbasesrc.c b/gst/base/gstbasesrc.c index 6d830d3c2b..cdacbd024d 100644 --- a/gst/base/gstbasesrc.c +++ b/gst/base/gstbasesrc.c @@ -195,6 +195,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->segment_start = -1; basesrc->segment_end = -1; + basesrc->need_discont = TRUE; basesrc->blocksize = DEFAULT_BLOCKSIZE; basesrc->clock_id = NULL; @@ -432,8 +433,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event) goto error; } - /* now send discont */ - gst_base_src_send_discont (src); + /* now make sure the discont will be send */ + src->need_discont = TRUE; /* and restart the task */ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, @@ -669,9 +670,10 @@ gst_base_src_loop (GstPad * pad) src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); - if (src->offset == 0) { + if (src->need_discont) { /* now send discont */ gst_base_src_send_discont (src); + src->need_discont = FALSE; } ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf); @@ -871,7 +873,6 @@ gst_base_src_start (GstBaseSrc * basesrc) /* start in the beginning */ basesrc->offset = 0; - basesrc->segment_start = 0; /* figure out the size */ if (bclass->get_size) { @@ -886,7 +887,9 @@ gst_base_src_start (GstBaseSrc * basesrc) GST_DEBUG ("size %d %lld", result, basesrc->size); /* we always run to the end */ + basesrc->segment_start = 0; basesrc->segment_end = basesrc->size; + basesrc->need_discont = TRUE; /* check if we can seek, updates ->seekable */ gst_base_src_is_seekable (basesrc); diff --git a/gst/base/gstbasesrc.h b/gst/base/gstbasesrc.h index f8713b1f7a..0676b91709 100644 --- a/gst/base/gstbasesrc.h +++ b/gst/base/gstbasesrc.h @@ -92,6 +92,7 @@ struct _GstBaseSrc { gint64 segment_start; /* start and end positions for seeking */ gint64 segment_end; gboolean segment_loop; + gboolean need_discont; guint64 offset; /* current offset in the resource */ guint64 size; /* total size of the resource */ diff --git a/gst/gstqueue.c b/gst/gstqueue.c index f8d3708576..971d4c4518 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -95,21 +95,27 @@ enum /* FILL ME */ }; -#define GST_QUEUE_MUTEX_LOCK G_STMT_START { \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ +#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ + GST_CAT_LOG_OBJECT (queue_dataflow, q, \ "locking qlock from thread %p", \ g_thread_self ()); \ - g_mutex_lock (queue->qlock); \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ + g_mutex_lock (q->qlock); \ + GST_CAT_LOG_OBJECT (queue_dataflow, q, \ "locked qlock from thread %p", \ g_thread_self ()); \ } G_STMT_END -#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START { \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ +#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \ + GST_QUEUE_MUTEX_LOCK (q); \ + if (q->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ + GST_CAT_LOG_OBJECT (queue_dataflow, q, \ "unlocking qlock from thread %p", \ g_thread_self ()); \ - g_mutex_unlock (queue->qlock); \ + g_mutex_unlock (q->qlock); \ } G_STMT_END @@ -468,25 +474,25 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH: STATUS (queue, "received flush event"); - /* forward event */ + /* forward event, re first as we're going to use it still */ gst_event_ref (event); gst_pad_push_event (queue->srcpad, event); if (GST_EVENT_FLUSH_DONE (event)) { - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); gst_queue_locked_flush (queue); queue->srcresult = GST_FLOW_OK; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); + + STATUS (queue, "after flush"); } else { /* now unblock the chain function */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; /* unblock the loop function */ g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK; - - STATUS (queue, "after flush"); + GST_QUEUE_MUTEX_UNLOCK (queue); /* make sure it pauses */ gst_pad_pause_task (queue->srcpad); @@ -504,11 +510,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) break; } - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); g_queue_push_tail (queue->queue, event); g_cond_signal (queue->item_add); + GST_QUEUE_MUTEX_UNLOCK (queue); - GST_QUEUE_MUTEX_UNLOCK; done: return TRUE; @@ -546,10 +552,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); /* we have to lock the queue since we span threads */ - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer)); @@ -558,12 +561,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) * the user defined as "full". Note that this only applies to buffers. * We always handle events and they don't count in our statistics. */ while (gst_queue_is_filled (queue)) { - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); /* how are we going to make space for this buffer? */ switch (queue->leaky) { @@ -637,12 +637,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) } STATUS (queue, "post-full wait"); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); break; } @@ -660,14 +657,14 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); return GST_FLOW_OK; /* special conditions */ out_unref: { - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); @@ -679,7 +676,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %d", ret); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); @@ -697,19 +694,13 @@ gst_queue_loop (GstPad * pad) queue = GST_QUEUE (GST_PAD_PARENT (pad)); /* have to lock for thread-safety */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); restart: - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; - while (gst_queue_is_empty (queue)) { - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); STATUS (queue, "pre-empty wait"); while (gst_queue_is_empty (queue)) { @@ -730,12 +721,9 @@ restart: } STATUS (queue, "post-empty wait"); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); } /* There's something in the list now, whatever it is */ @@ -752,9 +740,9 @@ restart: if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) queue->cur_level.time -= GST_BUFFER_DURATION (data); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); result = gst_pad_push (pad, GST_BUFFER (data)); - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); /* can opt to check for srcresult here but the push should * return an error value that is more accurate */ if (result != GST_FLOW_OK) { @@ -775,9 +763,9 @@ restart: gst_pad_pause_task (queue->srcpad); restart = FALSE; } - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); gst_pad_push_event (queue->srcpad, GST_EVENT (data)); - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); if (restart == TRUE) goto restart; } @@ -786,7 +774,7 @@ restart: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); return; @@ -794,7 +782,7 @@ out_flushing: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %d", queue->srcresult); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); return; } @@ -877,11 +865,11 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) result = TRUE; } else { /* step 1, unblock chain and loop functions */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -900,16 +888,16 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* step 1, unblock chain and loop functions */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -965,7 +953,7 @@ gst_queue_set_property (GObject * object, /* someone could change levels here, and since this * affects the get/put funcs, we need to lock for safety. */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); switch (prop_id) { case ARG_MAX_SIZE_BYTES: @@ -1000,7 +988,7 @@ gst_queue_set_property (GObject * object, break; } - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); } static void @@ -1009,7 +997,7 @@ gst_queue_get_property (GObject * object, { GstQueue *queue = GST_QUEUE (object); - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); switch (prop_id) { case ARG_CUR_LEVEL_BYTES: @@ -1053,5 +1041,5 @@ gst_queue_get_property (GObject * object, break; } - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); } diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 6d830d3c2b..cdacbd024d 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -195,6 +195,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->segment_start = -1; basesrc->segment_end = -1; + basesrc->need_discont = TRUE; basesrc->blocksize = DEFAULT_BLOCKSIZE; basesrc->clock_id = NULL; @@ -432,8 +433,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event) goto error; } - /* now send discont */ - gst_base_src_send_discont (src); + /* now make sure the discont will be send */ + src->need_discont = TRUE; /* and restart the task */ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, @@ -669,9 +670,10 @@ gst_base_src_loop (GstPad * pad) src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); - if (src->offset == 0) { + if (src->need_discont) { /* now send discont */ gst_base_src_send_discont (src); + src->need_discont = FALSE; } ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf); @@ -871,7 +873,6 @@ gst_base_src_start (GstBaseSrc * basesrc) /* start in the beginning */ basesrc->offset = 0; - basesrc->segment_start = 0; /* figure out the size */ if (bclass->get_size) { @@ -886,7 +887,9 @@ gst_base_src_start (GstBaseSrc * basesrc) GST_DEBUG ("size %d %lld", result, basesrc->size); /* we always run to the end */ + basesrc->segment_start = 0; basesrc->segment_end = basesrc->size; + basesrc->need_discont = TRUE; /* check if we can seek, updates ->seekable */ gst_base_src_is_seekable (basesrc); diff --git a/libs/gst/base/gstbasesrc.h b/libs/gst/base/gstbasesrc.h index f8713b1f7a..0676b91709 100644 --- a/libs/gst/base/gstbasesrc.h +++ b/libs/gst/base/gstbasesrc.h @@ -92,6 +92,7 @@ struct _GstBaseSrc { gint64 segment_start; /* start and end positions for seeking */ gint64 segment_end; gboolean segment_loop; + gboolean need_discont; guint64 offset; /* current offset in the resource */ guint64 size; /* total size of the resource */ diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index f8d3708576..971d4c4518 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -95,21 +95,27 @@ enum /* FILL ME */ }; -#define GST_QUEUE_MUTEX_LOCK G_STMT_START { \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ +#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ + GST_CAT_LOG_OBJECT (queue_dataflow, q, \ "locking qlock from thread %p", \ g_thread_self ()); \ - g_mutex_lock (queue->qlock); \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ + g_mutex_lock (q->qlock); \ + GST_CAT_LOG_OBJECT (queue_dataflow, q, \ "locked qlock from thread %p", \ g_thread_self ()); \ } G_STMT_END -#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START { \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ +#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \ + GST_QUEUE_MUTEX_LOCK (q); \ + if (q->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ + GST_CAT_LOG_OBJECT (queue_dataflow, q, \ "unlocking qlock from thread %p", \ g_thread_self ()); \ - g_mutex_unlock (queue->qlock); \ + g_mutex_unlock (q->qlock); \ } G_STMT_END @@ -468,25 +474,25 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH: STATUS (queue, "received flush event"); - /* forward event */ + /* forward event, re first as we're going to use it still */ gst_event_ref (event); gst_pad_push_event (queue->srcpad, event); if (GST_EVENT_FLUSH_DONE (event)) { - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); gst_queue_locked_flush (queue); queue->srcresult = GST_FLOW_OK; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); + + STATUS (queue, "after flush"); } else { /* now unblock the chain function */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; /* unblock the loop function */ g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK; - - STATUS (queue, "after flush"); + GST_QUEUE_MUTEX_UNLOCK (queue); /* make sure it pauses */ gst_pad_pause_task (queue->srcpad); @@ -504,11 +510,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) break; } - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); g_queue_push_tail (queue->queue, event); g_cond_signal (queue->item_add); + GST_QUEUE_MUTEX_UNLOCK (queue); - GST_QUEUE_MUTEX_UNLOCK; done: return TRUE; @@ -546,10 +552,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); /* we have to lock the queue since we span threads */ - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer)); @@ -558,12 +561,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) * the user defined as "full". Note that this only applies to buffers. * We always handle events and they don't count in our statistics. */ while (gst_queue_is_filled (queue)) { - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); /* how are we going to make space for this buffer? */ switch (queue->leaky) { @@ -637,12 +637,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) } STATUS (queue, "post-full wait"); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); break; } @@ -660,14 +657,14 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add"); g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); return GST_FLOW_OK; /* special conditions */ out_unref: { - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); @@ -679,7 +676,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %d", ret); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); @@ -697,19 +694,13 @@ gst_queue_loop (GstPad * pad) queue = GST_QUEUE (GST_PAD_PARENT (pad)); /* have to lock for thread-safety */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); restart: - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; - while (gst_queue_is_empty (queue)) { - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); STATUS (queue, "pre-empty wait"); while (gst_queue_is_empty (queue)) { @@ -730,12 +721,9 @@ restart: } STATUS (queue, "post-empty wait"); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); - GST_QUEUE_MUTEX_LOCK; - - if (queue->srcresult != GST_FLOW_OK) - goto out_flushing; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); } /* There's something in the list now, whatever it is */ @@ -752,9 +740,9 @@ restart: if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) queue->cur_level.time -= GST_BUFFER_DURATION (data); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); result = gst_pad_push (pad, GST_BUFFER (data)); - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); /* can opt to check for srcresult here but the push should * return an error value that is more accurate */ if (result != GST_FLOW_OK) { @@ -775,9 +763,9 @@ restart: gst_pad_pause_task (queue->srcpad); restart = FALSE; } - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); gst_pad_push_event (queue->srcpad, GST_EVENT (data)); - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); if (restart == TRUE) goto restart; } @@ -786,7 +774,7 @@ restart: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); return; @@ -794,7 +782,7 @@ out_flushing: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %d", queue->srcresult); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); return; } @@ -877,11 +865,11 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) result = TRUE; } else { /* step 1, unblock chain and loop functions */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -900,16 +888,16 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* step 1, unblock chain and loop functions */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -965,7 +953,7 @@ gst_queue_set_property (GObject * object, /* someone could change levels here, and since this * affects the get/put funcs, we need to lock for safety. */ - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); switch (prop_id) { case ARG_MAX_SIZE_BYTES: @@ -1000,7 +988,7 @@ gst_queue_set_property (GObject * object, break; } - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); } static void @@ -1009,7 +997,7 @@ gst_queue_get_property (GObject * object, { GstQueue *queue = GST_QUEUE (object); - GST_QUEUE_MUTEX_LOCK; + GST_QUEUE_MUTEX_LOCK (queue); switch (prop_id) { case ARG_CUR_LEVEL_BYTES: @@ -1053,5 +1041,5 @@ gst_queue_get_property (GObject * object, break; } - GST_QUEUE_MUTEX_UNLOCK; + GST_QUEUE_MUTEX_UNLOCK (queue); } diff --git a/tests/threadstate/threadstate1.c b/tests/threadstate/threadstate1.c index 2b1e5faabc..58d8b87191 100644 --- a/tests/threadstate/threadstate1.c +++ b/tests/threadstate/threadstate1.c @@ -10,13 +10,13 @@ int main (int argc, char *argv[]) { GstElement *fakesrc, *fakesink; - GstElement *thread; + GstElement *pipeline; gint x; gst_init (&argc, &argv); - thread = gst_thread_new ("thread"); - g_assert (thread != NULL); + pipeline = gst_pipeline_new ("pipeline"); + g_assert (pipeline != NULL); fakesrc = gst_element_factory_make ("fakesrc", "fake_source"); g_assert (fakesrc != NULL); @@ -24,16 +24,16 @@ main (int argc, char *argv[]) fakesink = gst_element_factory_make ("fakesink", "fake_sink"); g_assert (fakesink != NULL); - gst_bin_add_many (GST_BIN (thread), fakesrc, fakesink, NULL); + gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL); gst_element_link (fakesrc, fakesink); for (x = 0; x < 10; x++) { g_print ("playing %d\n", x); - gst_element_set_state (GST_ELEMENT (thread), GST_STATE_PLAYING); + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING); g_usleep (G_USEC_PER_SEC); g_print ("pausing %d\n", x); - gst_element_set_state (GST_ELEMENT (thread), GST_STATE_PAUSED); + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED); g_usleep (G_USEC_PER_SEC); }