From a7cd58c92eb9e6a7f9f8e0de8c3b94960f952ad6 Mon Sep 17 00:00:00 2001 From: Erik Walthinsen Date: Fri, 25 May 2001 00:29:25 +0000 Subject: [PATCH] fixed some interruptability problems with thread and queue Original commit message from CVS: fixed some interruptability problems with thread and queue --- gst/gstpad.c | 8 ++++---- gst/gstqueue.c | 18 ++++++++++++++++-- gst/gstqueue.h | 1 + gst/gstscheduler.h | 2 ++ gst/gstthread.c | 34 +++++++++++++++++++++++++++------- plugins/elements/gstqueue.c | 18 ++++++++++++++++-- plugins/elements/gstqueue.h | 1 + 7 files changed, 67 insertions(+), 15 deletions(-) diff --git a/gst/gstpad.c b/gst/gstpad.c index 47441d91aa..d073693f52 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -1438,9 +1438,9 @@ gst_pad_push (GstPad *pad, GstBuffer *buf) { GstRealPad *peer = GST_RPAD_PEER (pad); - g_return_if_fail (peer != NULL); - GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); + + g_return_if_fail (peer != NULL); if (peer->pushfunc) { GST_DEBUG (GST_CAT_DATAFLOW, "calling pushfunc &%s of peer pad %s:%s\n", @@ -1465,10 +1465,10 @@ gst_pad_pull (GstPad *pad) { GstRealPad *peer = GST_RPAD_PEER(pad); - g_return_val_if_fail (peer != NULL, NULL); - GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); + g_return_val_if_fail (peer != NULL, NULL); + if (peer->pullfunc) { GST_DEBUG (GST_CAT_DATAFLOW,"calling pullfunc %s of peer pad %s:%s\n", GST_DEBUG_FUNCPTR_NAME(peer->pullfunc),GST_DEBUG_PAD_NAME(peer)); diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 7c255b97cd..896d240845 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -323,8 +323,15 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || - GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != +GST_STATE_NONE_PENDING) { + GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n"); + if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n"); GST_UNLOCK(queue); cothread_switch(cothread_current_main()); } @@ -386,8 +393,15 @@ gst_queue_get (GstPad *pad) // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || - GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != +GST_STATE_NONE_PENDING) { + GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n"); + if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n"); GST_UNLOCK(queue); cothread_switch(cothread_current_main()); } diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 5f695f6667..085d5ac122 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -75,6 +75,7 @@ struct _GstQueue { gint leaky; /* whether the queue is leaky, and if so at which end */ +// GMutex *lock; (optimization?) GCond *emptycond; GCond *fullcond; diff --git a/gst/gstscheduler.h b/gst/gstscheduler.h index d980c21c0f..cdc9052219 100644 --- a/gst/gstscheduler.h +++ b/gst/gstscheduler.h @@ -45,6 +45,8 @@ extern "C" { (GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_SCHEDULE)) +#define GST_SCHED_PARENT(sched) ((sched)->parent) + //typedef struct _GstSchedule GstSchedule; //typedef struct _GstScheduleClass GstScheduleClass; typedef struct _GstScheduleChain GstScheduleChain; diff --git a/gst/gstthread.c b/gst/gstthread.c index aec3f4bb44..87c20d699a 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -250,6 +250,7 @@ gst_thread_change_state (GstElement *element) gboolean stateset = GST_STATE_SUCCESS; gint transition; pthread_t self = pthread_self(); + GstElement *peerelement; g_return_val_if_fail (GST_IS_THREAD(element), FALSE); // GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME(element)); @@ -328,10 +329,10 @@ gst_thread_change_state (GstElement *element) if (pthread_equal(self, thread->thread_id)) { //FIXME this should not happen - g_assert(!pthread_equal(self, thread->thread_id)); GST_DEBUG(GST_CAT_THREAD,"no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to paused\n", GST_DEBUG_THREAD_ARGS(thread->pid)); GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); + g_assert(!pthread_equal(self, thread->thread_id)); } else { @@ -357,8 +358,10 @@ gst_thread_change_state (GstElement *element) // //FIXME also make this more efficient by keeping list of managed queues THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e)); + GST_LOCK(e); g_cond_signal((GST_QUEUE(e)->emptycond)); g_cond_signal((GST_QUEUE(e)->fullcond)); + GST_UNLOCK(e); } else { @@ -367,16 +370,32 @@ gst_thread_change_state (GstElement *element) { GstPad *p = GST_PAD(pads->data); pads = g_list_next(pads); - if (GST_IS_REAL_PAD(p) && - GST_ELEMENT_SCHED(e) != GST_ELEMENT_SCHED(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))) + + peerelement = GST_PAD_PARENT(GST_PAD_PEER(p)); + if (!peerelement) continue; // deal with case where there's no peer + + if (!GST_FLAG_IS_SET(peerelement,GST_ELEMENT_DECOUPLED)) { + GST_DEBUG(GST_CAT_THREAD,"peer element isn't DECOUPLED\n"); + continue; + } + + // FIXME this needs to go away eventually + if (!GST_IS_QUEUE(peerelement)) { + GST_DEBUG(GST_CAT_THREAD,"peer element isn't a Queue\n"); + continue; + } + + if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread)) +// GST_ELEMENT_SCHED(e) != GST_ELEMENT_SCHED(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))) { THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e)); // FIXME i assume this signals our own (current) thread so don't need to lock // FIXME however, this *may* go to yet another thread for which we need locks // FIXME i'm too tired to deal with this now - g_cond_signal(GST_QUEUE(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))->emptycond); - g_cond_signal(GST_QUEUE(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))->fullcond); - + GST_LOCK(peerelement); + g_cond_signal(GST_QUEUE(peerelement)->emptycond); + g_cond_signal(GST_QUEUE(peerelement)->fullcond); + GST_UNLOCK(peerelement); } } } @@ -396,6 +415,7 @@ gst_thread_change_state (GstElement *element) } else { + // FIXME FIXME we need to interrupt, or reorder the states! THR_DEBUG("telling thread to pause (ready)\n"); g_mutex_lock(thread->lock); gst_thread_signal_thread(thread,FALSE); @@ -486,7 +506,7 @@ gst_thread_main_loop (void *arg) THR_DEBUG_MAIN("parent thread has signaled back at top of while\n"); // now is a good time to change the state of the children and the thread itself gst_thread_update_state (thread); - THR_DEBUG_MAIN("doe changing state, signaling back to parent process\n"); + THR_DEBUG_MAIN("done changing state, signaling back to parent process\n"); g_cond_signal (thread->cond); g_mutex_unlock (thread->lock); THR_DEBUG_MAIN("done syncing with parent process at top of while\n"); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 7c255b97cd..896d240845 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -323,8 +323,15 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || - GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != +GST_STATE_NONE_PENDING) { + GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n"); + if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n"); GST_UNLOCK(queue); cothread_switch(cothread_current_main()); } @@ -386,8 +393,15 @@ gst_queue_get (GstPad *pad) // if there's a pending state change for this queue or its manager, switch // back to iterator so bottom half of state change executes if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || - GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) +GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != +GST_STATE_NONE_PENDING) { + GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n"); + if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n"); + if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) + GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n"); GST_UNLOCK(queue); cothread_switch(cothread_current_main()); } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 5f695f6667..085d5ac122 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -75,6 +75,7 @@ struct _GstQueue { gint leaky; /* whether the queue is leaky, and if so at which end */ +// GMutex *lock; (optimization?) GCond *emptycond; GCond *fullcond;