diff --git a/ChangeLog b/ChangeLog index f2885cc9fb..576e8e6535 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2006-08-23 Wim Taymans + + * gst/gstpad.c: (gst_pad_start_task), (gst_pad_pause_task), + (gst_pad_stop_task): + Improve debugging for task functions. + + * gst/gsttask.c: (gst_task_func), (gst_task_set_lock), + (gst_task_start), (gst_task_pause), (gst_task_join): + Make sure that the task function started and finished after a + join(). + Don't try to push the task function on the threadpool multiple + times. + Improve the g_warning message with some useful suggestions + about how to fix the problem. + 2006-08-23 Wim Taymans * gst/gstutils.c: (gst_pad_proxy_getcaps): diff --git a/gst/gstpad.c b/gst/gstpad.c index 33600db920..b95c6c1752 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -4104,12 +4104,15 @@ gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data) g_return_val_if_fail (GST_IS_PAD (pad), FALSE); g_return_val_if_fail (func != NULL, FALSE); + GST_DEBUG_OBJECT (pad, "start task"); + GST_OBJECT_LOCK (pad); task = GST_PAD_TASK (pad); if (task == NULL) { task = gst_task_create (func, data); gst_task_set_lock (task, GST_PAD_GET_STREAM_LOCK (pad)); GST_PAD_TASK (pad) = task; + GST_DEBUG_OBJECT (pad, "created task"); } gst_task_start (task); GST_OBJECT_UNLOCK (pad); @@ -4134,6 +4137,8 @@ gst_pad_pause_task (GstPad * pad) g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + GST_DEBUG_OBJECT (pad, "pause task"); + GST_OBJECT_LOCK (pad); task = GST_PAD_TASK (pad); if (task == NULL) @@ -4177,6 +4182,8 @@ gst_pad_stop_task (GstPad * pad) g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + GST_DEBUG_OBJECT (pad, "stop task"); + GST_OBJECT_LOCK (pad); task = GST_PAD_TASK (pad); if (task == NULL) @@ -4197,6 +4204,7 @@ gst_pad_stop_task (GstPad * pad) no_task: { + GST_DEBUG_OBJECT (pad, "no task"); GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_LOCK (pad); @@ -4211,6 +4219,7 @@ join_failed: * the task's thread. We install the task again so that it will be stopped * again from the right thread next time hopefully. */ GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "join failed"); /* we can only install this task if there was no other task */ if (GST_PAD_TASK (pad) == NULL) GST_PAD_TASK (pad) = task; diff --git a/gst/gsttask.c b/gst/gsttask.c index 8e49738945..8458ad538a 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -162,7 +162,6 @@ gst_task_func (GstTask * task, GstTaskClass * tclass) lock = GST_TASK_GET_LOCK (task); if (G_UNLIKELY (lock == NULL)) goto no_lock; - task->running = TRUE; task->abidata.ABI.thread = tself; GST_OBJECT_UNLOCK (task); @@ -198,11 +197,18 @@ done: GST_OBJECT_UNLOCK (task); g_static_rec_mutex_unlock (lock); - /* now we allow messing with the lock again */ GST_OBJECT_LOCK (task); - task->running = FALSE; task->abidata.ABI.thread = NULL; + exit: + /* now we allow messing with the lock again by setting the running flag to + * FALSE. Together with the SIGNAL this is the sign for the _join() to + * complete. + * Note that we still have not dropped the final ref on the task. We could + * check here if there is a pending join() going on and drop the last ref + * before releasing the lock as we can be sure that a ref is held by the + * caller of the join(). */ + task->running = FALSE; GST_TASK_SIGNAL (task); GST_OBJECT_UNLOCK (task); @@ -256,6 +262,9 @@ gst_task_cleanup_all (void) * with @data as a parameter. Typically the task will run in * a new thread. * + * The function cannot be changed after the task has been created. You + * must create a new GstTask to change the function. + * * Returns: A new #GstTask. * * MT safe. @@ -291,7 +300,7 @@ void gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex) { GST_OBJECT_LOCK (task); - if (task->running) + if (G_UNLIKELY (task->running)) goto is_running; GST_TASK_GET_LOCK (task) = mutex; GST_OBJECT_UNLOCK (task); @@ -362,11 +371,20 @@ gst_task_start (GstTask * task) { GstTaskClass *tclass; - tclass = GST_TASK_GET_CLASS (task); + /* If the task already has a thread scheduled we don't have to do + * anything. */ + if (task->running) + break; /* new task, push on threadpool. We ref before so * that it remains alive while on the threadpool. */ gst_object_ref (task); + /* mark task as running so that a join will wait until we schedule + * and exit the task function. */ + task->running = TRUE; + + tclass = GST_TASK_GET_CLASS (task); + g_static_mutex_lock (&pool_lock); g_thread_pool_push (tclass->pool, task, NULL); g_static_mutex_unlock (&pool_lock); @@ -468,9 +486,14 @@ gst_task_pause (GstTask * task) { GstTaskClass *tclass; - tclass = GST_TASK_GET_CLASS (task); + if (task->running) + break; gst_object_ref (task); + task->running = TRUE; + + tclass = GST_TASK_GET_CLASS (task); + g_static_mutex_lock (&pool_lock); g_thread_pool_push (tclass->pool, task, NULL); g_static_mutex_unlock (&pool_lock); @@ -505,7 +528,8 @@ no_lock: * The task will automatically be stopped with this call. * * This function cannot be called from within a task function as this - * will cause a deadlock. + * would cause a deadlock. The function will detect this and print a + * g_warning. * * Returns: TRUE if the task could be joined. * @@ -525,11 +549,15 @@ gst_task_join (GstTask * task) /* we don't use a real thread join here because we are using * threadpools */ GST_OBJECT_LOCK (task); - if (tself == task->abidata.ABI.thread) + if (G_UNLIKELY (tself == task->abidata.ABI.thread)) goto joining_self; task->state = GST_TASK_STOPPED; + /* signal the state change for when it was blocked in PAUSED. */ GST_TASK_SIGNAL (task); - while (task->running) + /* we set the running flag when pushing the task on the threadpool. + * This means that the task function might not be called when we try + * to join it here. */ + while (G_LIKELY (task->running)) GST_TASK_WAIT (task); GST_OBJECT_UNLOCK (task); @@ -542,7 +570,10 @@ joining_self: { GST_WARNING_OBJECT (task, "trying to join task from its thread"); GST_OBJECT_UNLOCK (task); - g_warning ("trying to join task %p from its thread would deadlock", task); + g_warning ("\nTrying to join task %p from its thread would deadlock.\n" + "You cannot change the state of an element from its streaming\n" + "thread. Use g_idle_add() or post a GstMessage on the bus to\n" + "schedule the state change from the main thread.\n", task); return FALSE; } }