fixed some interruptability problems with thread and queue

Original commit message from CVS:
fixed some interruptability problems with thread and queue
This commit is contained in:
Erik Walthinsen 2001-05-25 00:29:25 +00:00
parent a74b02deb8
commit a7cd58c92e
7 changed files with 67 additions and 15 deletions

View file

@ -1438,9 +1438,9 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
{ {
GstRealPad *peer = GST_RPAD_PEER (pad); GstRealPad *peer = GST_RPAD_PEER (pad);
g_return_if_fail (peer != NULL);
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
g_return_if_fail (peer != NULL);
if (peer->pushfunc) { if (peer->pushfunc) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling pushfunc &%s of peer pad %s:%s\n", GST_DEBUG (GST_CAT_DATAFLOW, "calling pushfunc &%s of peer pad %s:%s\n",
@ -1465,10 +1465,10 @@ gst_pad_pull (GstPad *pad)
{ {
GstRealPad *peer = GST_RPAD_PEER(pad); GstRealPad *peer = GST_RPAD_PEER(pad);
g_return_val_if_fail (peer != NULL, NULL);
GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
g_return_val_if_fail (peer != NULL, NULL);
if (peer->pullfunc) { if (peer->pullfunc) {
GST_DEBUG (GST_CAT_DATAFLOW,"calling pullfunc %s of peer pad %s:%s\n", GST_DEBUG (GST_CAT_DATAFLOW,"calling pullfunc %s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME(peer->pullfunc),GST_DEBUG_PAD_NAME(peer)); GST_DEBUG_FUNCPTR_NAME(peer->pullfunc),GST_DEBUG_PAD_NAME(peer));

View file

@ -323,8 +323,15 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
// if there's a pending state change for this queue or its manager, switch // if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes // back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) // GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_NONE_PENDING)
{ {
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue); GST_UNLOCK(queue);
cothread_switch(cothread_current_main()); cothread_switch(cothread_current_main());
} }
@ -386,8 +393,15 @@ gst_queue_get (GstPad *pad)
// if there's a pending state change for this queue or its manager, switch // if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes // back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) // GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_NONE_PENDING)
{ {
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue); GST_UNLOCK(queue);
cothread_switch(cothread_current_main()); cothread_switch(cothread_current_main());
} }

View file

@ -75,6 +75,7 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */ gint leaky; /* whether the queue is leaky, and if so at which end */
// GMutex *lock; (optimization?)
GCond *emptycond; GCond *emptycond;
GCond *fullcond; GCond *fullcond;

View file

@ -45,6 +45,8 @@ extern "C" {
(GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_SCHEDULE)) (GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_SCHEDULE))
#define GST_SCHED_PARENT(sched) ((sched)->parent)
//typedef struct _GstSchedule GstSchedule; //typedef struct _GstSchedule GstSchedule;
//typedef struct _GstScheduleClass GstScheduleClass; //typedef struct _GstScheduleClass GstScheduleClass;
typedef struct _GstScheduleChain GstScheduleChain; typedef struct _GstScheduleChain GstScheduleChain;

View file

@ -250,6 +250,7 @@ gst_thread_change_state (GstElement *element)
gboolean stateset = GST_STATE_SUCCESS; gboolean stateset = GST_STATE_SUCCESS;
gint transition; gint transition;
pthread_t self = pthread_self(); pthread_t self = pthread_self();
GstElement *peerelement;
g_return_val_if_fail (GST_IS_THREAD(element), FALSE); g_return_val_if_fail (GST_IS_THREAD(element), FALSE);
// GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME(element)); // GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME(element));
@ -328,10 +329,10 @@ gst_thread_change_state (GstElement *element)
if (pthread_equal(self, thread->thread_id)) if (pthread_equal(self, thread->thread_id))
{ {
//FIXME this should not happen //FIXME this should not happen
g_assert(!pthread_equal(self, thread->thread_id));
GST_DEBUG(GST_CAT_THREAD,"no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to paused\n", GST_DEBUG(GST_CAT_THREAD,"no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to paused\n",
GST_DEBUG_THREAD_ARGS(thread->pid)); GST_DEBUG_THREAD_ARGS(thread->pid));
GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
g_assert(!pthread_equal(self, thread->thread_id));
} }
else else
{ {
@ -357,8 +358,10 @@ gst_thread_change_state (GstElement *element)
// //
//FIXME also make this more efficient by keeping list of managed queues //FIXME also make this more efficient by keeping list of managed queues
THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e)); THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
GST_LOCK(e);
g_cond_signal((GST_QUEUE(e)->emptycond)); g_cond_signal((GST_QUEUE(e)->emptycond));
g_cond_signal((GST_QUEUE(e)->fullcond)); g_cond_signal((GST_QUEUE(e)->fullcond));
GST_UNLOCK(e);
} }
else else
{ {
@ -367,16 +370,32 @@ gst_thread_change_state (GstElement *element)
{ {
GstPad *p = GST_PAD(pads->data); GstPad *p = GST_PAD(pads->data);
pads = g_list_next(pads); pads = g_list_next(pads);
if (GST_IS_REAL_PAD(p) &&
GST_ELEMENT_SCHED(e) != GST_ELEMENT_SCHED(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))) peerelement = GST_PAD_PARENT(GST_PAD_PEER(p));
if (!peerelement) continue; // deal with case where there's no peer
if (!GST_FLAG_IS_SET(peerelement,GST_ELEMENT_DECOUPLED)) {
GST_DEBUG(GST_CAT_THREAD,"peer element isn't DECOUPLED\n");
continue;
}
// FIXME this needs to go away eventually
if (!GST_IS_QUEUE(peerelement)) {
GST_DEBUG(GST_CAT_THREAD,"peer element isn't a Queue\n");
continue;
}
if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
// GST_ELEMENT_SCHED(e) != GST_ELEMENT_SCHED(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p)))))
{ {
THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e)); THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
// FIXME i assume this signals our own (current) thread so don't need to lock // FIXME i assume this signals our own (current) thread so don't need to lock
// FIXME however, this *may* go to yet another thread for which we need locks // FIXME however, this *may* go to yet another thread for which we need locks
// FIXME i'm too tired to deal with this now // FIXME i'm too tired to deal with this now
g_cond_signal(GST_QUEUE(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))->emptycond); GST_LOCK(peerelement);
g_cond_signal(GST_QUEUE(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))->fullcond); g_cond_signal(GST_QUEUE(peerelement)->emptycond);
g_cond_signal(GST_QUEUE(peerelement)->fullcond);
GST_UNLOCK(peerelement);
} }
} }
} }
@ -396,6 +415,7 @@ gst_thread_change_state (GstElement *element)
} }
else else
{ {
// FIXME FIXME we need to interrupt, or reorder the states!
THR_DEBUG("telling thread to pause (ready)\n"); THR_DEBUG("telling thread to pause (ready)\n");
g_mutex_lock(thread->lock); g_mutex_lock(thread->lock);
gst_thread_signal_thread(thread,FALSE); gst_thread_signal_thread(thread,FALSE);
@ -486,7 +506,7 @@ gst_thread_main_loop (void *arg)
THR_DEBUG_MAIN("parent thread has signaled back at top of while\n"); THR_DEBUG_MAIN("parent thread has signaled back at top of while\n");
// now is a good time to change the state of the children and the thread itself // now is a good time to change the state of the children and the thread itself
gst_thread_update_state (thread); gst_thread_update_state (thread);
THR_DEBUG_MAIN("doe changing state, signaling back to parent process\n"); THR_DEBUG_MAIN("done changing state, signaling back to parent process\n");
g_cond_signal (thread->cond); g_cond_signal (thread->cond);
g_mutex_unlock (thread->lock); g_mutex_unlock (thread->lock);
THR_DEBUG_MAIN("done syncing with parent process at top of while\n"); THR_DEBUG_MAIN("done syncing with parent process at top of while\n");

View file

@ -323,8 +323,15 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
// if there's a pending state change for this queue or its manager, switch // if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes // back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) // GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_NONE_PENDING)
{ {
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue); GST_UNLOCK(queue);
cothread_switch(cothread_current_main()); cothread_switch(cothread_current_main());
} }
@ -386,8 +393,15 @@ gst_queue_get (GstPad *pad)
// if there's a pending state change for this queue or its manager, switch // if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes // back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING || if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING) // GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_NONE_PENDING)
{ {
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue); GST_UNLOCK(queue);
cothread_switch(cothread_current_main()); cothread_switch(cothread_current_main());
} }

View file

@ -75,6 +75,7 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */ gint leaky; /* whether the queue is leaky, and if so at which end */
// GMutex *lock; (optimization?)
GCond *emptycond; GCond *emptycond;
GCond *fullcond; GCond *fullcond;