GstTask: use GstTaskPool for managing threads

Use the new GstTaskPool to handle streaming threads.
This commit is contained in:
Wim Taymans 2009-05-12 00:25:11 +02:00 committed by Wim Taymans
parent 4d326be6cf
commit 5fdd5e0a1e
2 changed files with 51 additions and 19 deletions

View file

@ -79,6 +79,9 @@ struct _GstTaskPrivate
gboolean prio_set;
GThreadPriority priority;
GstTaskPool *pool;
gpointer id;
};
static void gst_task_class_init (GstTaskClass * klass);
@ -96,6 +99,20 @@ static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init);
static void
init_klass_pool (GstTaskClass * klass)
{
g_static_mutex_lock (&pool_lock);
if (klass->pool) {
gst_task_pool_cleanup (klass->pool);
gst_object_unref (klass->pool);
}
klass->pool = gst_task_pool_new ();
gst_task_pool_set_func (klass->pool, (GFunc) gst_task_func, klass);
gst_task_pool_prepare (klass->pool, NULL);
g_static_mutex_unlock (&pool_lock);
}
static void
gst_task_class_init (GstTaskClass * klass)
{
@ -107,13 +124,16 @@ gst_task_class_init (GstTaskClass * klass)
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
klass->pool = g_thread_pool_new (
(GFunc) gst_task_func, klass, -1, FALSE, NULL);
init_klass_pool (klass);
}
static void
gst_task_init (GstTask * task)
{
GstTaskClass *klass;
klass = GST_TASK_GET_CLASS (task);
task->priv = GST_TASK_GET_PRIVATE (task);
task->running = FALSE;
task->abidata.ABI.thread = NULL;
@ -121,6 +141,12 @@ gst_task_init (GstTask * task)
task->cond = g_cond_new ();
task->state = GST_TASK_STOPPED;
task->priv->prio_set = FALSE;
/* use the default klass pool for this task, users can
* override this later */
g_static_mutex_lock (&pool_lock);
task->priv->pool = gst_object_ref (klass->pool);
g_static_mutex_lock (&pool_lock);
}
static void
@ -260,18 +286,7 @@ gst_task_cleanup_all (void)
GstTaskClass *klass;
if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
g_static_mutex_lock (&pool_lock);
if (klass->pool) {
/* Shut down all the threads, we still process the ones scheduled
* because the unref happens in the thread function.
* Also wait for currently running ones to finish. */
g_thread_pool_free (klass->pool, FALSE, TRUE);
/* create new pool, so we can still do something after this
* call. */
klass->pool = g_thread_pool_new (
(GFunc) gst_task_func, klass, -1, FALSE, NULL);
}
g_static_mutex_unlock (&pool_lock);
init_klass_pool (klass);
}
}
@ -474,9 +489,7 @@ start_task (GstTask * task)
tclass = GST_TASK_GET_CLASS (task);
/* push on the thread pool */
g_static_mutex_lock (&pool_lock);
g_thread_pool_push (tclass->pool, task, &error);
g_static_mutex_unlock (&pool_lock);
priv->id = gst_task_pool_push (priv->pool, task, &error);
if (error != NULL) {
g_warning ("failed to create thread: %s", error->message);
@ -628,7 +641,12 @@ gst_task_pause (GstTask * task)
gboolean
gst_task_join (GstTask * task)
{
GThread *tself;
GThread *tself, *thread;
GstTaskPrivate *priv;
gpointer id;
GstTaskPool *pool = NULL;
priv = task->priv;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
@ -649,8 +667,21 @@ gst_task_join (GstTask * task)
* to join it here. */
while (G_LIKELY (task->running))
GST_TASK_WAIT (task);
/* clean the thread */
task->abidata.ABI.thread = NULL;
/* get the id and pool to join */
if ((id = priv->id)) {
if ((pool = priv->pool))
gst_object_ref (pool);
priv->id = NULL;
}
GST_OBJECT_UNLOCK (task);
if (pool) {
gst_task_pool_join (pool, id);
gst_object_unref (pool);
}
GST_DEBUG_OBJECT (task, "Joined task %p", task);
return TRUE;

View file

@ -24,6 +24,7 @@
#define __GST_TASK_H__
#include <gst/gstobject.h>
#include <gst/gsttaskpool.h>
G_BEGIN_DECLS
@ -168,7 +169,7 @@ struct _GstTaskClass {
GstObjectClass parent_class;
/*< private >*/
GThreadPool *pool;
GstTaskPool *pool;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];