check/generic/states.c: Make sure all tasks are stopped.

Original commit message from CVS:
* check/generic/states.c: (GST_START_TEST):
Make sure all tasks are stopped.

* check/gst/gstbin.c: (GST_START_TEST):
Unref after usage for proper valgrinding.

* gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
Really wait for the task to stop before destroying the
mutex.

* gst/gstqueue.c: (gst_queue_sink_activate_push),
(gst_queue_src_activate_push):
Small cleanups. Don't stop the task when we did not start
it.

* gst/gsttask.c: (gst_task_get_type), (gst_task_init),
(gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
(gst_task_get_state), (gst_task_start), (gst_task_pause),
(gst_task_join):
* gst/gsttask.h:
Protect the stream lock with the object lock.
Disallow setting the stream lock when running.
Add cleanup_all to wait for the threadpool to finish.
Remove code to autoallocate a mutex if none was provided.
Add _join() to wait for a task to stop.
Protect the thread pool with a global lock.
This commit is contained in:
Wim Taymans 2005-08-24 20:49:53 +00:00
parent ec8ec3da8a
commit 1e40e471d9
10 changed files with 219 additions and 40 deletions

View file

@ -1,3 +1,32 @@
2005-08-24 Wim Taymans <wim@fluendo.com>
* check/generic/states.c: (GST_START_TEST):
Make sure all tasks are stopped.
* check/gst/gstbin.c: (GST_START_TEST):
Unref after usage for proper valgrinding.
* gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
Really wait for the task to stop before destroying the
mutex.
* gst/gstqueue.c: (gst_queue_sink_activate_push),
(gst_queue_src_activate_push):
Small cleanups. Don't stop the task when we did not start
it.
* gst/gsttask.c: (gst_task_get_type), (gst_task_init),
(gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
(gst_task_get_state), (gst_task_start), (gst_task_pause),
(gst_task_join):
* gst/gsttask.h:
Protect the stream lock with the object lock.
Disallow setting the stream lock when running.
Add cleanup_all to wait for the threadpool to finish.
Remove code to autoallocate a mutex if none was provided.
Add _join() to wait for a task to stop.
Protect the thread pool with a global lock.
2005-08-24 Wim Taymans <wim@fluendo.com>
* gst/base/gstbasesink.c: (gst_base_sink_handle_object),

View file

@ -50,6 +50,8 @@ GST_START_TEST (test_state_changes)
gst_element_set_state (element, GST_STATE_PAUSED);
gst_element_set_state (element, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (element));
gst_task_cleanup_all ();
}
}

View file

@ -423,6 +423,10 @@ GST_START_TEST (test_add_linked)
/* check if pads really are linked */
fail_unless (gst_pad_is_linked (srcpad));
fail_unless (gst_pad_is_linked (sinkpad));
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
gst_object_unref (pipeline);
}
GST_END_TEST;

View file

@ -295,6 +295,14 @@ static void
gst_pad_finalize (GObject * object)
{
GstPad *pad = GST_PAD (object);
GstTask *task;
/* in case the task is still around, clean it up */
if ((task = GST_PAD_TASK (pad))) {
gst_task_join (task);
GST_PAD_TASK (pad) = NULL;
gst_object_unref (task);
}
if (pad->stream_rec_lock) {
g_static_rec_mutex_free (pad->stream_rec_lock);
@ -1878,8 +1886,8 @@ gst_pad_default_fixate (GQuark field_id, const GValue * value, gpointer data)
* @pad: a #GstPad to fixate
* @caps: the #GstCaps to fixate
*
* Fixate a caps on the given pad. Modifies the caps in place, so you should be
* that the caps are actually writable (see gst_caps_make_writable()).
* Fixate a caps on the given pad. Modifies the caps in place, so you should
* make sure that the caps are actually writable (see gst_caps_make_writable()).
*/
void
gst_pad_fixate_caps (GstPad * pad, GstCaps * caps)
@ -3759,7 +3767,11 @@ no_task:
* @pad: the #GstPad to stop the task of
*
* Stop the task of @pad. This function will also make sure that the
* function executed by the task will effectively stop.
* function executed by the task will effectively stop if not called
* from the GstTaskFunction.
*
* This function will deadlock if called from the GstTaskFunction of
* the task. Use #gst_task_pause() instead.
*
* Returns: a TRUE if the task could be stopped or FALSE when the pad
* has no task.
@ -3782,6 +3794,8 @@ gst_pad_stop_task (GstPad * pad)
GST_STREAM_LOCK (pad);
GST_STREAM_UNLOCK (pad);
gst_task_join (task);
gst_object_unref (task);
return TRUE;

View file

@ -927,25 +927,27 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
static gboolean
gst_queue_sink_activate_push (GstPad * pad, gboolean active)
{
gboolean result = FALSE;
gboolean result = TRUE;
GstQueue *queue;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
result = TRUE;
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* step 1, unblock chain and loop functions */
/* step 1, unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK (queue);
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
/* and make sure the chain function finishes */
GST_STREAM_LOCK (pad);
GST_STREAM_UNLOCK (pad);
}
gst_object_unref (queue);
return result;
@ -971,9 +973,10 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
}
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* step 1, unblock chain and loop functions */
/* step 1, unblock loop function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
/* the item add signal will unblock */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK (queue);

View file

@ -25,6 +25,9 @@
#include "gstinfo.h"
#include "gsttask.h"
GST_DEBUG_CATEGORY (task_debug);
#define GST_CAT_DEFAULT (task_debug)
static void gst_task_class_init (GstTaskClass * klass);
static void gst_task_init (GstTask * task);
static void gst_task_finalize (GObject * object);
@ -33,6 +36,8 @@ static void gst_task_func (GstTask * task, GstTaskClass * tclass);
static GstObjectClass *parent_class = NULL;
static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
GType
gst_task_get_type (void)
{
@ -54,6 +59,8 @@ gst_task_get_type (void)
_gst_task_type =
g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
}
return _gst_task_type;
}
@ -76,6 +83,7 @@ gst_task_class_init (GstTaskClass * klass)
static void
gst_task_init (GstTask * task)
{
task->running = FALSE;
task->lock = NULL;
task->cond = g_cond_new ();
task->state = GST_TASK_STOPPED;
@ -97,16 +105,28 @@ gst_task_finalize (GObject * object)
static void
gst_task_func (GstTask * task, GstTaskClass * tclass)
{
GStaticRecMutex *lock;
GST_DEBUG ("Entering task %p, thread %p", task, g_thread_self ());
/* we have to grab the lock to get the mutex. We also
* mark our state running so that nobody can mess with
* the mutex. */
GST_LOCK (task);
if (task->state == GST_TASK_STOPPED)
goto exit;
lock = GST_TASK_GET_LOCK (task);
task->running = TRUE;
GST_UNLOCK (task);
/* locking order is TASK_LOCK, LOCK */
GST_TASK_LOCK (task);
g_static_rec_mutex_lock (lock);
GST_LOCK (task);
while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
gint t;
t = GST_TASK_UNLOCK_FULL (task);
t = g_static_rec_mutex_unlock_full (lock);
if (t <= 0) {
g_warning ("wrong STREAM_LOCK count %d", t);
}
@ -115,7 +135,7 @@ gst_task_func (GstTask * task, GstTaskClass * tclass)
GST_UNLOCK (task);
/* locking order.. */
if (t > 0)
GST_TASK_LOCK_FULL (task, t);
g_static_rec_mutex_lock_full (lock, t);
GST_LOCK (task);
if (task->state == GST_TASK_STOPPED)
@ -129,13 +149,49 @@ gst_task_func (GstTask * task, GstTaskClass * tclass)
}
done:
GST_UNLOCK (task);
GST_TASK_UNLOCK (task);
g_static_rec_mutex_unlock (lock);
/* now we allow messing with the lock again */
GST_LOCK (task);
task->running = FALSE;
exit:
GST_TASK_SIGNAL (task);
GST_UNLOCK (task);
GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
gst_object_unref (task);
}
/**
* gst_task_cleanup_all:
*
* Wait for all tasks to be stopped. This is mainly used internally
* to ensure proper cleanup of internal datastructures in testsuites.
*
* MT safe.
*/
void
gst_task_cleanup_all (void)
{
GstTaskClass *klass;
if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
g_static_mutex_lock (&pool_lock);
if (klass->pool) {
/* Shut down all the threads, we still process the ones scheduled
* because the unref happens in the thread function.
* Also wait for currently running ones to finish. */
g_thread_pool_free (klass->pool, FALSE, TRUE);
/* create new pool, so we can still do something after this
* call. */
klass->pool = g_thread_pool_new (
(GFunc) gst_task_func, klass, -1, FALSE, NULL);
}
g_static_mutex_unlock (&pool_lock);
}
}
/**
* gst_task_create:
* @func: The #GstTaskFunction to use
@ -176,8 +232,19 @@ void
gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
{
GST_LOCK (task);
task->lock = mutex;
if (task->running)
goto is_running;
GST_TASK_GET_LOCK (task) = mutex;
GST_UNLOCK (task);
return;
/* ERRORS */
is_running:
{
g_warning ("cannot call set_lock on a running task");
GST_UNLOCK (task);
}
}
@ -218,39 +285,51 @@ gst_task_get_state (GstTask * task)
gboolean
gst_task_start (GstTask * task)
{
GstTaskClass *tclass;
GstTaskState old;
GStaticRecMutex *lock;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
tclass = GST_TASK_GET_CLASS (task);
GST_DEBUG_OBJECT (task, "Starting task %p", task);
GST_LOCK (task);
if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) {
lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (lock);
GST_TASK_GET_LOCK (task) = lock;
}
if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
goto no_lock;
old = task->state;
task->state = GST_TASK_STARTED;
switch (old) {
case GST_TASK_STOPPED:
{
GstTaskClass *tclass;
tclass = GST_TASK_GET_CLASS (task);
/* new task, push on threadpool. We ref before so
* that it remains alive while on the threadpool. */
gst_object_ref (task);
g_static_mutex_lock (&pool_lock);
g_thread_pool_push (tclass->pool, task, NULL);
g_static_mutex_unlock (&pool_lock);
break;
}
case GST_TASK_PAUSED:
/* PAUSE to PLAY, signal */
GST_TASK_SIGNAL (task);
break;
case GST_TASK_STARTED:
/* was OK */
break;
}
GST_UNLOCK (task);
return TRUE;
/* ERRORS */
no_lock:
{
g_warning ("starting task without a lock");
return FALSE;
}
}
/**
@ -305,13 +384,10 @@ gst_task_stop (GstTask * task)
gboolean
gst_task_pause (GstTask * task)
{
GstTaskClass *tclass;
GstTaskState old;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
tclass = GST_TASK_GET_CLASS (task);
GST_DEBUG_OBJECT (task, "Pausing task %p", task);
GST_LOCK (task);
@ -319,9 +395,17 @@ gst_task_pause (GstTask * task)
task->state = GST_TASK_PAUSED;
switch (old) {
case GST_TASK_STOPPED:
{
GstTaskClass *tclass;
tclass = GST_TASK_GET_CLASS (task);
gst_object_ref (task);
g_static_mutex_lock (&pool_lock);
g_thread_pool_push (tclass->pool, task, NULL);
g_static_mutex_unlock (&pool_lock);
break;
}
case GST_TASK_PAUSED:
break;
case GST_TASK_STARTED:
@ -331,3 +415,35 @@ gst_task_pause (GstTask * task)
return TRUE;
}
/**
* gst_task_join:
* @task: The #GstTask to join
*
* Joins @task. After this call, it is safe to unref the task
* and clean up the lock set with #gst_task_set_lock().
*
* The task will automatically be stopped with this call.
*
* This function cannot be called from within a task function.
*
* Returns: TRUE if the task could be joined.
*
* MT safe.
*/
gboolean
gst_task_join (GstTask * task)
{
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
GST_DEBUG_OBJECT (task, "Joining task %p", task);
GST_LOCK (task);
task->state = GST_TASK_STOPPED;
GST_TASK_SIGNAL (task);
while (task->running)
GST_TASK_WAIT (task);
GST_UNLOCK (task);
return TRUE;
}

View file

@ -55,10 +55,6 @@ typedef enum {
#define GST_TASK_BROADCAST(task) g_cond_breadcast(GST_TASK_GET_COND (task))
#define GST_TASK_GET_LOCK(task) (GST_TASK_CAST(task)->lock)
#define GST_TASK_LOCK(task) g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
#define GST_TASK_UNLOCK(task) g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
#define GST_TASK_UNLOCK_FULL(task) g_static_rec_mutex_unlock_full(GST_TASK_GET_LOCK(task))
#define GST_TASK_LOCK_FULL(task,t) g_static_rec_mutex_lock_full(GST_TASK_GET_LOCK(task),(t))
struct _GstTask {
GstObject object;
@ -72,6 +68,8 @@ struct _GstTask {
GstTaskFunction func;
gpointer data;
gboolean running;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
@ -86,6 +84,8 @@ struct _GstTaskClass {
gpointer _gst_reserved[GST_PADDING];
};
void gst_task_cleanup_all (void);
GType gst_task_get_type (void);
GstTask* gst_task_create (GstTaskFunction func, gpointer data);
@ -97,6 +97,8 @@ gboolean gst_task_start (GstTask *task);
gboolean gst_task_stop (GstTask *task);
gboolean gst_task_pause (GstTask *task);
gboolean gst_task_join (GstTask *task);
G_END_DECLS
#endif /* __GST_TASK_H__ */

View file

@ -927,25 +927,27 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
static gboolean
gst_queue_sink_activate_push (GstPad * pad, gboolean active)
{
gboolean result = FALSE;
gboolean result = TRUE;
GstQueue *queue;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
result = TRUE;
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* step 1, unblock chain and loop functions */
/* step 1, unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK (queue);
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
/* and make sure the chain function finishes */
GST_STREAM_LOCK (pad);
GST_STREAM_UNLOCK (pad);
}
gst_object_unref (queue);
return result;
@ -971,9 +973,10 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
}
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* step 1, unblock chain and loop functions */
/* step 1, unblock loop function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
/* the item add signal will unblock */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK (queue);

View file

@ -50,6 +50,8 @@ GST_START_TEST (test_state_changes)
gst_element_set_state (element, GST_STATE_PAUSED);
gst_element_set_state (element, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (element));
gst_task_cleanup_all ();
}
}

View file

@ -423,6 +423,10 @@ GST_START_TEST (test_add_linked)
/* check if pads really are linked */
fail_unless (gst_pad_is_linked (srcpad));
fail_unless (gst_pad_is_linked (sinkpad));
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
gst_object_unref (pipeline);
}
GST_END_TEST;