task: add GDestroyNotify to _new

Add a GDestroyNotify to the user_data we pass to gst_task_new()
Change gst_pad_start_task() to also take the notify
This commit is contained in:
Wim Taymans 2012-06-20 10:31:49 +02:00
parent d7d5306009
commit 76e8b2ecda
13 changed files with 46 additions and 33 deletions

View file

@ -5079,9 +5079,10 @@ static GstTaskThreadCallbacks thr_callbacks = {
* gst_pad_start_task: * gst_pad_start_task:
* @pad: the #GstPad to start the task of * @pad: the #GstPad to start the task of
* @func: the task function to call * @func: the task function to call
* @data: data passed to the task function * @user_data: user data passed to the task function
* @notify: called when @user_data is no longer referenced
* *
* Starts a task that repeatedly calls @func with @data. This function * Starts a task that repeatedly calls @func with @user_data. This function
* is mostly used in pad activation functions to start the dataflow. * is mostly used in pad activation functions to start the dataflow.
* The #GST_PAD_STREAM_LOCK of @pad will automatically be acquired * The #GST_PAD_STREAM_LOCK of @pad will automatically be acquired
* before @func is called. * before @func is called.
@ -5089,7 +5090,8 @@ static GstTaskThreadCallbacks thr_callbacks = {
* Returns: a %TRUE if the task could be started. * Returns: a %TRUE if the task could be started.
*/ */
gboolean gboolean
gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data) gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer user_data,
GDestroyNotify notify)
{ {
GstTask *task; GstTask *task;
gboolean res; gboolean res;
@ -5102,7 +5104,7 @@ gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data)
GST_OBJECT_LOCK (pad); GST_OBJECT_LOCK (pad);
task = GST_PAD_TASK (pad); task = GST_PAD_TASK (pad);
if (task == NULL) { if (task == NULL) {
task = gst_task_new (func, data); task = gst_task_new (func, user_data, notify);
gst_task_set_lock (task, GST_PAD_GET_STREAM_LOCK (pad)); gst_task_set_lock (task, GST_PAD_GET_STREAM_LOCK (pad));
gst_task_set_thread_callbacks (task, &thr_callbacks, pad, NULL); gst_task_set_thread_callbacks (task, &thr_callbacks, pad, NULL);
GST_INFO_OBJECT (pad, "created task %p", task); GST_INFO_OBJECT (pad, "created task %p", task);

View file

@ -952,7 +952,7 @@ gboolean gst_pad_send_event (GstPad *pad, GstEvent *event);
/* pad tasks */ /* pad tasks */
gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func, gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func,
gpointer data); gpointer user_data, GDestroyNotify notify);
gboolean gst_pad_pause_task (GstPad *pad); gboolean gst_pad_pause_task (GstPad *pad);
gboolean gst_pad_stop_task (GstPad *pad); gboolean gst_pad_stop_task (GstPad *pad);

View file

@ -208,6 +208,9 @@ gst_task_finalize (GObject * object)
priv->thr_notify = NULL; priv->thr_notify = NULL;
priv->thr_user_data = NULL; priv->thr_user_data = NULL;
if (task->notify)
task->notify (task->user_data);
gst_object_unref (priv->pool); gst_object_unref (priv->pool);
/* task thread cannot be running here since it holds a ref /* task thread cannot be running here since it holds a ref
@ -305,7 +308,7 @@ gst_task_func (GstTask * task)
GST_OBJECT_UNLOCK (task); GST_OBJECT_UNLOCK (task);
} }
task->func (task->data); task->func (task->user_data);
} }
done: done:
g_rec_mutex_unlock (lock); g_rec_mutex_unlock (lock);
@ -365,10 +368,11 @@ gst_task_cleanup_all (void)
/** /**
* gst_task_new: * gst_task_new:
* @func: The #GstTaskFunction to use * @func: The #GstTaskFunction to use
* @data: (closure): User data to pass to @func * @user_data: User data to pass to @func
* @notify: the function to call when @user_data is no longer needed.
* *
* Create a new Task that will repeatedly call the provided @func * Create a new Task that will repeatedly call the provided @func
* with @data as a parameter. Typically the task will run in * with @user_data as a parameter. Typically the task will run in
* a new thread. * a new thread.
* *
* The function cannot be changed after the task has been created. You * The function cannot be changed after the task has been created. You
@ -386,13 +390,14 @@ gst_task_cleanup_all (void)
* MT safe. * MT safe.
*/ */
GstTask * GstTask *
gst_task_new (GstTaskFunction func, gpointer data) gst_task_new (GstTaskFunction func, gpointer user_data, GDestroyNotify notify)
{ {
GstTask *task; GstTask *task;
task = g_object_newv (GST_TYPE_TASK, 0, NULL); task = g_object_newv (GST_TYPE_TASK, 0, NULL);
task->func = func; task->func = func;
task->data = data; task->user_data = user_data;
task->notify = notify;
GST_DEBUG ("Created task %p", task); GST_DEBUG ("Created task %p", task);

View file

@ -35,7 +35,7 @@ G_BEGIN_DECLS
* A function that will repeatedly be called in the thread created by * A function that will repeatedly be called in the thread created by
* a #GstTask. * a #GstTask.
*/ */
typedef void (*GstTaskFunction) (void *user_data); typedef void (*GstTaskFunction) (gpointer user_data);
/* --- standard type macros --- */ /* --- standard type macros --- */
#define GST_TYPE_TASK (gst_task_get_type ()) #define GST_TYPE_TASK (gst_task_get_type ())
@ -134,7 +134,8 @@ typedef struct {
* @cond: used to pause/resume the task * @cond: used to pause/resume the task
* @lock: The lock taken when iterating the task function * @lock: The lock taken when iterating the task function
* @func: the function executed by this task * @func: the function executed by this task
* @data: data passed to the task function * @user_data: user_data passed to the task function
* @notify: GDestroyNotify for @user_data
* @running: a flag indicating that the task is running * @running: a flag indicating that the task is running
* *
* The #GstTask object. * The #GstTask object.
@ -149,7 +150,8 @@ struct _GstTask {
GRecMutex *lock; GRecMutex *lock;
GstTaskFunction func; GstTaskFunction func;
gpointer data; gpointer user_data;
GDestroyNotify notify;
gboolean running; gboolean running;
@ -175,7 +177,9 @@ void gst_task_cleanup_all (void);
GType gst_task_get_type (void); GType gst_task_get_type (void);
GstTask* gst_task_new (GstTaskFunction func, gpointer data); GstTask* gst_task_new (GstTaskFunction func,
gpointer user_data, GDestroyNotify notify);
void gst_task_set_lock (GstTask *task, GRecMutex *mutex); void gst_task_set_lock (GstTask *task, GRecMutex *mutex);
GstTaskPool * gst_task_get_pool (GstTask *task); GstTaskPool * gst_task_get_pool (GstTask *task);

View file

@ -2986,7 +2986,7 @@ gst_base_parse_sink_activate (GstPad * sinkpad, GstObject * parent)
goto baseparse_push; goto baseparse_push;
return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop, return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop,
sinkpad); sinkpad, NULL);
/* fallback */ /* fallback */
baseparse_push: baseparse_push:
{ {
@ -3916,7 +3916,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
/* Start streaming thread if paused */ /* Start streaming thread if paused */
gst_pad_start_task (parse->sinkpad, gst_pad_start_task (parse->sinkpad,
(GstTaskFunction) gst_base_parse_loop, parse->sinkpad); (GstTaskFunction) gst_base_parse_loop, parse->sinkpad, NULL);
GST_PAD_STREAM_UNLOCK (parse->sinkpad); GST_PAD_STREAM_UNLOCK (parse->sinkpad);

View file

@ -3828,7 +3828,7 @@ gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
if (active) { if (active) {
/* start task */ /* start task */
result = gst_pad_start_task (basesink->sinkpad, result = gst_pad_start_task (basesink->sinkpad,
(GstTaskFunction) gst_base_sink_loop, basesink->sinkpad); (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad, NULL);
} else { } else {
/* step 2, make sure streaming finishes */ /* step 2, make sure streaming finishes */
result = gst_pad_stop_task (basesink->sinkpad); result = gst_pad_stop_task (basesink->sinkpad);

View file

@ -1645,7 +1645,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
/* and restart the task in case it got paused explicitly or by /* and restart the task in case it got paused explicitly or by
* the FLUSH_START event we pushed out. */ * the FLUSH_START event we pushed out. */
tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
src->srcpad); src->srcpad, NULL);
if (res && !tres) if (res && !tres)
res = FALSE; res = FALSE;
@ -3398,7 +3398,7 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
GST_OBJECT_UNLOCK (basesrc->srcpad); GST_OBJECT_UNLOCK (basesrc->srcpad);
if (start) if (start)
gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop, gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
basesrc->srcpad); basesrc->srcpad, NULL);
GST_DEBUG_OBJECT (basesrc, "signal"); GST_DEBUG_OBJECT (basesrc, "signal");
GST_LIVE_SIGNAL (basesrc); GST_LIVE_SIGNAL (basesrc);
} }

View file

@ -794,7 +794,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result = result =
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
sq->srcpad); sq->srcpad, NULL);
} }
return result; return result;
} }

View file

@ -735,7 +735,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
queue->eos = FALSE; queue->eos = FALSE;
queue->unexpected = FALSE; queue->unexpected = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad); queue->srcpad, NULL);
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK (queue);
STATUS (queue, pad, "after flush"); STATUS (queue, pad, "after flush");
@ -1314,7 +1314,8 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
queue->eos = FALSE; queue->eos = FALSE;
queue->unexpected = FALSE; queue->unexpected = FALSE;
result = result =
gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
NULL);
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK (queue);
} else { } else {
/* step 1, unblock loop function */ /* step 1, unblock loop function */

View file

@ -2139,7 +2139,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
/* reset rate counters */ /* reset rate counters */
reset_rate_timer (queue); reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
queue->srcpad); queue->srcpad, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
} else { } else {
GST_QUEUE2_MUTEX_LOCK (queue); GST_QUEUE2_MUTEX_LOCK (queue);
@ -2950,7 +2950,8 @@ gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
queue->sinkresult = GST_FLOW_OK; queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE; queue->is_eos = FALSE;
queue->unexpected = FALSE; queue->unexpected = FALSE;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad); result =
gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
} else { } else {
/* unblock loop function */ /* unblock loop function */

View file

@ -460,7 +460,7 @@ gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event)
gst_segment_do_seek (&seeksegment, rate, format, flags, gst_segment_do_seek (&seeksegment, rate, format, flags,
cur_type, cur, stop_type, stop, NULL); cur_type, cur, stop_type, stop, NULL);
flush = !!(flags & GST_SEEK_FLAG_FLUSH); flush = ! !(flags & GST_SEEK_FLAG_FLUSH);
GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment); GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment);
@ -504,7 +504,7 @@ gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event)
/* restart our task since it might have been stopped when we did the /* restart our task since it might have been stopped when we did the
* flush. */ * flush. */
gst_pad_start_task (typefind->sink, gst_pad_start_task (typefind->sink,
(GstTaskFunction) gst_type_find_element_loop, typefind->sink); (GstTaskFunction) gst_type_find_element_loop, typefind->sink, NULL);
/* streaming can continue now */ /* streaming can continue now */
GST_PAD_STREAM_UNLOCK (typefind->sink); GST_PAD_STREAM_UNLOCK (typefind->sink);
@ -1218,7 +1218,7 @@ gst_type_find_element_activate_sink (GstPad * pad, GstObject * parent)
/* only start our task if we ourselves decide to start in pull mode */ /* only start our task if we ourselves decide to start in pull mode */
return gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop, return gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop,
pad); pad, NULL);
typefind_push: typefind_push:
{ {

View file

@ -208,7 +208,7 @@ GST_START_TEST (test_parsing)
/* create a task with some dummy function, we're not actually going to run /* create a task with some dummy function, we're not actually going to run
* the task here */ * the task here */
task = gst_task_new ((GstTaskFunction) gst_object_unref, NULL); task = gst_task_new ((GstTaskFunction) gst_object_unref, NULL, NULL);
ASSERT_OBJECT_REFCOUNT (task, "task", 1); ASSERT_OBJECT_REFCOUNT (task, "task", 1);

View file

@ -46,7 +46,7 @@ GST_START_TEST (test_join)
GstTask *t; GstTask *t;
gboolean ret; gboolean ret;
t = gst_task_new (task_func2, &t); t = gst_task_new (task_func2, &t, NULL);
fail_if (t == NULL); fail_if (t == NULL);
g_rec_mutex_init (&task_mutex); g_rec_mutex_init (&task_mutex);
@ -90,7 +90,7 @@ GST_START_TEST (test_lock_start)
GstTask *t; GstTask *t;
gboolean ret; gboolean ret;
t = gst_task_new (task_func, NULL); t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL); fail_if (t == NULL);
g_rec_mutex_init (&task_mutex); g_rec_mutex_init (&task_mutex);
@ -128,7 +128,7 @@ GST_START_TEST (test_lock)
GstTask *t; GstTask *t;
gboolean ret; gboolean ret;
t = gst_task_new (task_func, NULL); t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL); fail_if (t == NULL);
g_rec_mutex_init (&task_mutex); g_rec_mutex_init (&task_mutex);
@ -156,7 +156,7 @@ GST_START_TEST (test_no_lock)
GstTask *t; GstTask *t;
gboolean ret; gboolean ret;
t = gst_task_new (task_func, NULL); t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL); fail_if (t == NULL);
/* stop should be possible without lock */ /* stop should be possible without lock */
@ -182,7 +182,7 @@ GST_START_TEST (test_create)
{ {
GstTask *t; GstTask *t;
t = gst_task_new (task_func, NULL); t = gst_task_new (task_func, NULL, NULL);
fail_if (t == NULL); fail_if (t == NULL);
gst_object_unref (t); gst_object_unref (t);