From cb0068581c6069d06c39e6a17892a3eaf503eeb2 Mon Sep 17 00:00:00 2001 From: Matt Howell Date: Fri, 27 Apr 2001 20:55:47 +0000 Subject: [PATCH] initial fix of thread synch for queues and state change Original commit message from CVS: initial fix of thread synch for queues and state change --- gst/gstelement.c | 7 +- gst/gstqueue.c | 134 +++++++++++++++++++++++++----------- gst/gstqueue.h | 2 +- gst/gstscheduler.c | 14 ++++ gst/gstthread.c | 36 ++++++++-- plugins/elements/gstqueue.c | 134 +++++++++++++++++++++++++----------- plugins/elements/gstqueue.h | 2 +- 7 files changed, 240 insertions(+), 89 deletions(-) diff --git a/gst/gstelement.c b/gst/gstelement.c index 361af129f3..e80336106a 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -791,6 +791,7 @@ gst_element_get_factory (GstElement *element) return oclass->elementfactory; } + /** * gst_element_change_state: * @element: element to change state of @@ -822,6 +823,11 @@ GST_ELEMENT_NAME(element),GST_ELEMENT_NAME(GST_ELEMENT_PARENT(element)),GST_ELEM GST_STATE (element) = GST_STATE_PENDING (element); GST_STATE_PENDING (element) = GST_STATE_NONE_PENDING; + // note: queues' state_change is a special case because it needs to lock + // for synchronization (from another thread). since this signal may block + // or (worse) make another state change, the queue needs to unlock before + // calling. thus, gstqueue.c::gst_queue_state_change() blocks, unblocks, + // unlocks, then emits this. gtk_signal_emit (GTK_OBJECT (element), gst_element_signals[STATE_CHANGE], GST_STATE (element)); return TRUE; @@ -1132,4 +1138,3 @@ gst_element_signal_eos (GstElement *element) gtk_signal_emit (GTK_OBJECT (element), gst_element_signals[EOS]); GST_FLAG_SET(element,GST_ELEMENT_COTHREAD_STOPPING); } - diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 258eb928ed..485253b265 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -34,6 +34,7 @@ #include "gst_private.h" #include "gstqueue.h" +#include "gstscheduler.h" GstElementDetails gst_queue_details = { "Queue", @@ -55,7 +56,7 @@ enum { ARG_0, ARG_LEVEL, ARG_MAX_LEVEL, - ARG_BLOCK, +// ARG_BLOCK, }; @@ -115,8 +116,8 @@ gst_queue_class_init (GstQueueClass *klass) GTK_ARG_READABLE, ARG_LEVEL); gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT, GTK_ARG_READWRITE, ARG_MAX_LEVEL); - gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL, - GTK_ARG_READWRITE, ARG_BLOCK); +// gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL, +// GTK_ARG_READWRITE, ARG_BLOCK); gtkobject_class->set_arg = gst_queue_set_arg; gtkobject_class->get_arg = gst_queue_get_arg; @@ -145,7 +146,7 @@ gst_queue_init (GstQueue *queue) queue->queue = NULL; queue->level_buffers = 0; queue->max_buffers = 100; - queue->block = TRUE; +// queue->block = TRUE; queue->level_bytes = 0; queue->size_buffers = 0; queue->size_bytes = 0; @@ -269,10 +270,17 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) GST_DEBUG (0,"queue: %s: chain %d %p\n", name, queue->level_buffers, buf); while (queue->level_buffers >= queue->max_buffers) { + // if there's a pending state change for this queue or its manager, switch + // back to iterator so bottom half of state change executes + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || + GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + { + GST_UNLOCK(queue); + cothread_switch(cothread_current_main()); + } + GST_DEBUG (0,"queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); - //g_cond_timed_wait (queue->fullcond, queue->fulllock, queue->timeval); - //FIXME need to signal other thread in case signals got lost? g_cond_signal (queue->emptycond); g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock); STATUS("%s: O+\n"); @@ -281,21 +289,19 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) /* put the buffer on the tail of the list */ queue->queue = g_slist_append (queue->queue, buf); -// STATUS("%s: +\n"); GST_DEBUG (0,"(%s:%s)+ ",GST_DEBUG_PAD_NAME(pad)); /* if we were empty, but aren't any more, signal a condition */ - tosignal = (queue->level_buffers >= 0); queue->level_buffers++; +// if (queue->level_buffers >= 0) + if (queue->level_buffers == 1) + { + GST_DEBUG (0,"queue: %s signalling emptycond\n", name); + g_cond_signal (queue->emptycond); + } - /* we can unlock now */ GST_DEBUG (0,"queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); - if (tosignal) { -// STATUS("%s: >\n"); - g_cond_signal (queue->emptycond); -// STATUS("%s: >>\n"); - } GST_UNLOCK (queue); } @@ -307,6 +313,8 @@ gst_queue_get (GstPad *pad) GSList *front; const guchar *name; + g_assert(pad != NULL); + g_assert(GST_IS_PAD(pad)); g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); @@ -320,22 +328,32 @@ gst_queue_get (GstPad *pad) GST_DEBUG (0,"queue: %s have queue lock\n", name); // we bail if there's nothing there - if (!queue->level_buffers && !queue->block) { - GST_UNLOCK(queue); - return NULL; - } +// g_assert(queue->block); +// if (!queue->level_buffers && !queue->block) { +// GST_UNLOCK(queue); +// return NULL; +// } while (!queue->level_buffers) { - STATUS("queue: %s U released lock\n"); - //g_cond_timed_wait (queue->emptycond, queue->emptylock, queue->timeval); if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) { + STATUS("queue: %s U released lock\n"); + GST_UNLOCK(queue); gst_pad_set_eos (queue->srcpad); + // this return NULL shouldn't hurt anything... return NULL; } - //FIXME need to signal other thread in case signals got lost? + + // if there's a pending state change for this queue or its manager, switch + // back to iterator so bottom half of state change executes + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || + GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + { + GST_UNLOCK(queue); + cothread_switch(cothread_current_main()); + } + g_cond_signal (queue->fullcond); g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock); -// STATUS("queue: %s U- getting lock\n"); } front = queue->queue; @@ -344,32 +362,34 @@ gst_queue_get (GstPad *pad) queue->queue = g_slist_remove_link (queue->queue, front); g_slist_free (front); +// if (queue->level_buffers < queue->max_buffers) + if (queue->level_buffers == queue->max_buffers) + { + GST_DEBUG (0,"queue: %s signalling fullcond\n", name); + g_cond_signal (queue->fullcond); + } + queue->level_buffers--; -// STATUS("%s: -\n"); GST_DEBUG (0,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); - if (queue->level_buffers < queue->max_buffers) { -// STATUS("%s: < \n"); - g_cond_signal (queue->fullcond); -// STATUS("%s: << \n"); - } GST_UNLOCK(queue); -// GST_DEBUG (0,"queue: %s pushing %d %p \n", name, queue->level_buffers, buf); -// gst_pad_push (queue->srcpad, buf); -// GST_DEBUG (0,"queue: %s pushing %d done \n", name, queue->level_buffers); - return buf; - /* unlock now */ } static GstElementStateReturn gst_queue_change_state (GstElement *element) { GstQueue *queue; + GstElementStateReturn ret; g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE); queue = GST_QUEUE (element); + + // lock the queue so another thread (not in sync with this thread's state) + // can't call this queue's _get (or whatever) + GST_LOCK (queue); + GST_DEBUG (0,"gstqueue: state pending %d\n", GST_STATE_PENDING (element)); /* if going down into NULL state, clear out buffers*/ @@ -380,9 +400,41 @@ gst_queue_change_state (GstElement *element) /* if we haven't failed already, give the parent class a chance to ;-) */ if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + { + gboolean valid_handler = FALSE; + guint state_change_id = gtk_signal_lookup("state_change", GTK_OBJECT_TYPE(element)); - return GST_STATE_SUCCESS; + // determine whether we need to block the parent (element) class' + // STATE_CHANGE signal so we can UNLOCK before returning. we block + // it if we could find the state_change signal AND there's a signal + // handler attached to it. + // + // note: this assumes that change_state() *only* emits state_change signal. + // if element change_state() emits other signals, they need to be blocked + // as well. + if (state_change_id && + gtk_signal_handler_pending(GTK_OBJECT(element), state_change_id, FALSE)) + valid_handler = TRUE; + if (valid_handler) + gtk_signal_handler_block(GTK_OBJECT(element), state_change_id); + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + if (valid_handler) + gtk_signal_handler_unblock(GTK_OBJECT(element), state_change_id); + + // UNLOCK, *then* emit signal (if there's one there) + GST_UNLOCK(queue); + if (valid_handler) + gtk_signal_emit(GTK_OBJECT (element), state_change_id, GST_STATE(element)); + } + else + { + ret = GST_STATE_SUCCESS; + GST_UNLOCK(queue); + } + + return ret; } @@ -400,9 +452,9 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: queue->max_buffers = GTK_VALUE_INT (*arg); break; - case ARG_BLOCK: - queue->block = GTK_VALUE_BOOL (*arg); - break; +// case ARG_BLOCK: +// queue->block = GTK_VALUE_BOOL (*arg); +// break; default: break; } @@ -425,9 +477,9 @@ gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: GTK_VALUE_INT (*arg) = queue->max_buffers; break; - case ARG_BLOCK: - GTK_VALUE_BOOL (*arg) = queue->block; - break; +// case ARG_BLOCK: +// GTK_VALUE_BOOL (*arg) = queue->block; +// break; default: arg->type = GTK_TYPE_INVALID; break; diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 606346735b..c14b622faa 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -61,7 +61,7 @@ struct _GstQueue { gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */ - gboolean block; /* if set to FALSE, _get returns NULL if queue empty */ +// gboolean block; /* if set to FALSE, _get returns NULL if queue empty */ gint level_bytes; /* number of bytes queued here */ gint size_buffers; /* size of queue in buffers */ gint size_bytes; /* size of queue in bytes */ diff --git a/gst/gstscheduler.c b/gst/gstscheduler.c index d7f709e392..a727e2068a 100644 --- a/gst/gstscheduler.c +++ b/gst/gstscheduler.c @@ -1297,6 +1297,20 @@ g_return_val_if_fail (chains != NULL, FALSE); GST_DEBUG (GST_CAT_DATAFLOW,"set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", GST_ELEMENT_NAME (entry),entry); cothread_switch (entry->threadstate); + + // following is a check to see if the chain was interrupted due to a + // top-half state_change(). (i.e., if there's a pending state.) + // + // if it was, return to gstthread.c::gst_thread_main_loop() to + // execute the state change. + GST_DEBUG (GST_CAT_DATAFLOW,"cothread switch ended or interrupted\n"); + if (GST_STATE_PENDING(GST_SCHEDULE(sched)->parent) != GST_STATE_NONE_PENDING) + { + GST_DEBUG (GST_CAT_DATAFLOW,"handle pending state %d\n", + GST_STATE_PENDING(GST_SCHEDULE(sched)->parent)); + return 0; + } + } else { GST_INFO (GST_CAT_DATAFLOW,"no entry into chain!"); } diff --git a/gst/gstthread.c b/gst/gstthread.c index 559e5c179e..eec495fa4a 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -27,7 +27,7 @@ #include "gstthread.h" #include "gstscheduler.h" - +#include "gstqueue.h" GstElementDetails gst_thread_details = { "Threaded container", @@ -252,7 +252,8 @@ gst_thread_change_state (GstElement *element) GST_DEBUG (GST_CAT_THREAD, "creating thread \"%s\"\n", GST_ELEMENT_NAME (GST_ELEMENT (element))); - g_mutex_lock (thread->lock); + g_mutex_lock(thread->lock); + // create the thread pthread_create (&thread->thread_id, NULL, gst_thread_main_loop, thread); @@ -260,8 +261,8 @@ gst_thread_change_state (GstElement *element) // wait for it to 'spin up' GST_DEBUG (GST_CAT_THREAD, "sync: waiting for spinup\n"); g_cond_wait(thread->cond,thread->lock); - g_mutex_unlock(thread->lock); GST_DEBUG (GST_CAT_THREAD, "sync: thread claims to be up\n"); + g_mutex_unlock(thread->lock); } else { GST_INFO (GST_CAT_THREAD, "NOT starting thread \"%s\"", GST_ELEMENT_NAME (GST_ELEMENT (element))); @@ -284,6 +285,30 @@ gst_thread_change_state (GstElement *element) GST_INFO (GST_CAT_THREAD,"pausing thread \"%s\"", GST_ELEMENT_NAME (GST_ELEMENT (element))); + // the following code ensures that the bottom half of thread will run + // to perform each elements' change_state() (by calling gstbin.c:: + // change_state()). + // + the pending state was already set by gstelement.c::set_state() + // + find every queue we manage, and signal its empty and full conditions + { + GList *elements = (element->sched)->elements; + while (elements) + { + GstElement *e = GST_ELEMENT(elements->data); + g_assert(e); + elements = g_list_next(elements); + if (GST_IS_QUEUE(e)) + { + //FIXME make this more efficient by only waking queues that are asleep + //FIXME and only waking the appropriate condition (depending on if it's + //FIXME on up- or down-stream side) + // + //FIXME also make this more efficient by keeping list of managed queues + g_cond_signal((GST_QUEUE(e)->emptycond)); + g_cond_signal((GST_QUEUE(e)->fullcond)); + } + } + } gst_thread_signal_thread(thread,FALSE); break; case GST_STATE_PLAYING_TO_READY: @@ -375,7 +400,6 @@ gst_thread_main_loop (void *arg) GST_DEBUG(0,"sync: removed spinning state due to failed iteration\n"); } } - GST_DEBUG (GST_CAT_THREAD, "sync: waiting at bottom of while for signal from main process\n"); g_mutex_lock (thread->lock); GST_DEBUG (GST_CAT_THREAD, "sync: signaling that the thread is out of the SPINNING loop\n"); @@ -399,6 +423,10 @@ gst_thread_main_loop (void *arg) } // the set flag is to say whether it should set TRUE or FALSE +// +// WARNING: this has synchronization built in! if you remove or add any +// locks, waits, signals, or unlocks you need to be sure they match the +// code above (in gst_thread_main_loop()). basically, don't change anything. static void gst_thread_signal_thread (GstThread *thread, gboolean spinning) { diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 258eb928ed..485253b265 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -34,6 +34,7 @@ #include "gst_private.h" #include "gstqueue.h" +#include "gstscheduler.h" GstElementDetails gst_queue_details = { "Queue", @@ -55,7 +56,7 @@ enum { ARG_0, ARG_LEVEL, ARG_MAX_LEVEL, - ARG_BLOCK, +// ARG_BLOCK, }; @@ -115,8 +116,8 @@ gst_queue_class_init (GstQueueClass *klass) GTK_ARG_READABLE, ARG_LEVEL); gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT, GTK_ARG_READWRITE, ARG_MAX_LEVEL); - gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL, - GTK_ARG_READWRITE, ARG_BLOCK); +// gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL, +// GTK_ARG_READWRITE, ARG_BLOCK); gtkobject_class->set_arg = gst_queue_set_arg; gtkobject_class->get_arg = gst_queue_get_arg; @@ -145,7 +146,7 @@ gst_queue_init (GstQueue *queue) queue->queue = NULL; queue->level_buffers = 0; queue->max_buffers = 100; - queue->block = TRUE; +// queue->block = TRUE; queue->level_bytes = 0; queue->size_buffers = 0; queue->size_bytes = 0; @@ -269,10 +270,17 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) GST_DEBUG (0,"queue: %s: chain %d %p\n", name, queue->level_buffers, buf); while (queue->level_buffers >= queue->max_buffers) { + // if there's a pending state change for this queue or its manager, switch + // back to iterator so bottom half of state change executes + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || + GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + { + GST_UNLOCK(queue); + cothread_switch(cothread_current_main()); + } + GST_DEBUG (0,"queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); - //g_cond_timed_wait (queue->fullcond, queue->fulllock, queue->timeval); - //FIXME need to signal other thread in case signals got lost? g_cond_signal (queue->emptycond); g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock); STATUS("%s: O+\n"); @@ -281,21 +289,19 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) /* put the buffer on the tail of the list */ queue->queue = g_slist_append (queue->queue, buf); -// STATUS("%s: +\n"); GST_DEBUG (0,"(%s:%s)+ ",GST_DEBUG_PAD_NAME(pad)); /* if we were empty, but aren't any more, signal a condition */ - tosignal = (queue->level_buffers >= 0); queue->level_buffers++; +// if (queue->level_buffers >= 0) + if (queue->level_buffers == 1) + { + GST_DEBUG (0,"queue: %s signalling emptycond\n", name); + g_cond_signal (queue->emptycond); + } - /* we can unlock now */ GST_DEBUG (0,"queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); - if (tosignal) { -// STATUS("%s: >\n"); - g_cond_signal (queue->emptycond); -// STATUS("%s: >>\n"); - } GST_UNLOCK (queue); } @@ -307,6 +313,8 @@ gst_queue_get (GstPad *pad) GSList *front; const guchar *name; + g_assert(pad != NULL); + g_assert(GST_IS_PAD(pad)); g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); @@ -320,22 +328,32 @@ gst_queue_get (GstPad *pad) GST_DEBUG (0,"queue: %s have queue lock\n", name); // we bail if there's nothing there - if (!queue->level_buffers && !queue->block) { - GST_UNLOCK(queue); - return NULL; - } +// g_assert(queue->block); +// if (!queue->level_buffers && !queue->block) { +// GST_UNLOCK(queue); +// return NULL; +// } while (!queue->level_buffers) { - STATUS("queue: %s U released lock\n"); - //g_cond_timed_wait (queue->emptycond, queue->emptylock, queue->timeval); if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) { + STATUS("queue: %s U released lock\n"); + GST_UNLOCK(queue); gst_pad_set_eos (queue->srcpad); + // this return NULL shouldn't hurt anything... return NULL; } - //FIXME need to signal other thread in case signals got lost? + + // if there's a pending state change for this queue or its manager, switch + // back to iterator so bottom half of state change executes + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || + GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + { + GST_UNLOCK(queue); + cothread_switch(cothread_current_main()); + } + g_cond_signal (queue->fullcond); g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock); -// STATUS("queue: %s U- getting lock\n"); } front = queue->queue; @@ -344,32 +362,34 @@ gst_queue_get (GstPad *pad) queue->queue = g_slist_remove_link (queue->queue, front); g_slist_free (front); +// if (queue->level_buffers < queue->max_buffers) + if (queue->level_buffers == queue->max_buffers) + { + GST_DEBUG (0,"queue: %s signalling fullcond\n", name); + g_cond_signal (queue->fullcond); + } + queue->level_buffers--; -// STATUS("%s: -\n"); GST_DEBUG (0,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); - if (queue->level_buffers < queue->max_buffers) { -// STATUS("%s: < \n"); - g_cond_signal (queue->fullcond); -// STATUS("%s: << \n"); - } GST_UNLOCK(queue); -// GST_DEBUG (0,"queue: %s pushing %d %p \n", name, queue->level_buffers, buf); -// gst_pad_push (queue->srcpad, buf); -// GST_DEBUG (0,"queue: %s pushing %d done \n", name, queue->level_buffers); - return buf; - /* unlock now */ } static GstElementStateReturn gst_queue_change_state (GstElement *element) { GstQueue *queue; + GstElementStateReturn ret; g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE); queue = GST_QUEUE (element); + + // lock the queue so another thread (not in sync with this thread's state) + // can't call this queue's _get (or whatever) + GST_LOCK (queue); + GST_DEBUG (0,"gstqueue: state pending %d\n", GST_STATE_PENDING (element)); /* if going down into NULL state, clear out buffers*/ @@ -380,9 +400,41 @@ gst_queue_change_state (GstElement *element) /* if we haven't failed already, give the parent class a chance to ;-) */ if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + { + gboolean valid_handler = FALSE; + guint state_change_id = gtk_signal_lookup("state_change", GTK_OBJECT_TYPE(element)); - return GST_STATE_SUCCESS; + // determine whether we need to block the parent (element) class' + // STATE_CHANGE signal so we can UNLOCK before returning. we block + // it if we could find the state_change signal AND there's a signal + // handler attached to it. + // + // note: this assumes that change_state() *only* emits state_change signal. + // if element change_state() emits other signals, they need to be blocked + // as well. + if (state_change_id && + gtk_signal_handler_pending(GTK_OBJECT(element), state_change_id, FALSE)) + valid_handler = TRUE; + if (valid_handler) + gtk_signal_handler_block(GTK_OBJECT(element), state_change_id); + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + if (valid_handler) + gtk_signal_handler_unblock(GTK_OBJECT(element), state_change_id); + + // UNLOCK, *then* emit signal (if there's one there) + GST_UNLOCK(queue); + if (valid_handler) + gtk_signal_emit(GTK_OBJECT (element), state_change_id, GST_STATE(element)); + } + else + { + ret = GST_STATE_SUCCESS; + GST_UNLOCK(queue); + } + + return ret; } @@ -400,9 +452,9 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: queue->max_buffers = GTK_VALUE_INT (*arg); break; - case ARG_BLOCK: - queue->block = GTK_VALUE_BOOL (*arg); - break; +// case ARG_BLOCK: +// queue->block = GTK_VALUE_BOOL (*arg); +// break; default: break; } @@ -425,9 +477,9 @@ gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: GTK_VALUE_INT (*arg) = queue->max_buffers; break; - case ARG_BLOCK: - GTK_VALUE_BOOL (*arg) = queue->block; - break; +// case ARG_BLOCK: +// GTK_VALUE_BOOL (*arg) = queue->block; +// break; default: arg->type = GTK_TYPE_INVALID; break; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 606346735b..c14b622faa 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -61,7 +61,7 @@ struct _GstQueue { gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */ - gboolean block; /* if set to FALSE, _get returns NULL if queue empty */ +// gboolean block; /* if set to FALSE, _get returns NULL if queue empty */ gint level_bytes; /* number of bytes queued here */ gint size_buffers; /* size of queue in buffers */ gint size_bytes; /* size of queue in bytes */