From 76e8b2ecdad9f5e09c2c9b32f0c679a8b6638431 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 20 Jun 2012 10:31:49 +0200 Subject: [PATCH] task: add GDestroyNotify to _new Add a GDestroyNotify to the user_data we pass to gst_task_new() Change gst_pad_start_task() to also take the notify --- gst/gstpad.c | 10 ++++++---- gst/gstpad.h | 2 +- gst/gsttask.c | 15 ++++++++++----- gst/gsttask.h | 12 ++++++++---- libs/gst/base/gstbaseparse.c | 4 ++-- libs/gst/base/gstbasesink.c | 2 +- libs/gst/base/gstbasesrc.c | 4 ++-- plugins/elements/gstmultiqueue.c | 2 +- plugins/elements/gstqueue.c | 5 +++-- plugins/elements/gstqueue2.c | 5 +++-- plugins/elements/gsttypefindelement.c | 6 +++--- tests/check/gst/gstmessage.c | 2 +- tests/check/gst/gsttask.c | 10 +++++----- 13 files changed, 46 insertions(+), 33 deletions(-) diff --git a/gst/gstpad.c b/gst/gstpad.c index 6ec225e2a8..4d6124d8a4 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -5079,9 +5079,10 @@ static GstTaskThreadCallbacks thr_callbacks = { * gst_pad_start_task: * @pad: the #GstPad to start the task of * @func: the task function to call - * @data: data passed to the task function + * @user_data: user data passed to the task function + * @notify: called when @user_data is no longer referenced * - * Starts a task that repeatedly calls @func with @data. This function + * Starts a task that repeatedly calls @func with @user_data. This function * is mostly used in pad activation functions to start the dataflow. * The #GST_PAD_STREAM_LOCK of @pad will automatically be acquired * before @func is called. @@ -5089,7 +5090,8 @@ static GstTaskThreadCallbacks thr_callbacks = { * Returns: a %TRUE if the task could be started. */ gboolean -gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data) +gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer user_data, + GDestroyNotify notify) { GstTask *task; gboolean res; @@ -5102,7 +5104,7 @@ gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data) GST_OBJECT_LOCK (pad); task = GST_PAD_TASK (pad); if (task == NULL) { - task = gst_task_new (func, data); + task = gst_task_new (func, user_data, notify); gst_task_set_lock (task, GST_PAD_GET_STREAM_LOCK (pad)); gst_task_set_thread_callbacks (task, &thr_callbacks, pad, NULL); GST_INFO_OBJECT (pad, "created task %p", task); diff --git a/gst/gstpad.h b/gst/gstpad.h index 2281ed7ec6..1e049a70e1 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -952,7 +952,7 @@ gboolean gst_pad_send_event (GstPad *pad, GstEvent *event); /* pad tasks */ gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func, - gpointer data); + gpointer user_data, GDestroyNotify notify); gboolean gst_pad_pause_task (GstPad *pad); gboolean gst_pad_stop_task (GstPad *pad); diff --git a/gst/gsttask.c b/gst/gsttask.c index 96c687b771..3900850d2a 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -208,6 +208,9 @@ gst_task_finalize (GObject * object) priv->thr_notify = NULL; priv->thr_user_data = NULL; + if (task->notify) + task->notify (task->user_data); + gst_object_unref (priv->pool); /* task thread cannot be running here since it holds a ref @@ -305,7 +308,7 @@ gst_task_func (GstTask * task) GST_OBJECT_UNLOCK (task); } - task->func (task->data); + task->func (task->user_data); } done: g_rec_mutex_unlock (lock); @@ -365,10 +368,11 @@ gst_task_cleanup_all (void) /** * gst_task_new: * @func: The #GstTaskFunction to use - * @data: (closure): User data to pass to @func + * @user_data: User data to pass to @func + * @notify: the function to call when @user_data is no longer needed. * * Create a new Task that will repeatedly call the provided @func - * with @data as a parameter. Typically the task will run in + * with @user_data as a parameter. Typically the task will run in * a new thread. * * The function cannot be changed after the task has been created. You @@ -386,13 +390,14 @@ gst_task_cleanup_all (void) * MT safe. */ GstTask * -gst_task_new (GstTaskFunction func, gpointer data) +gst_task_new (GstTaskFunction func, gpointer user_data, GDestroyNotify notify) { GstTask *task; task = g_object_newv (GST_TYPE_TASK, 0, NULL); task->func = func; - task->data = data; + task->user_data = user_data; + task->notify = notify; GST_DEBUG ("Created task %p", task); diff --git a/gst/gsttask.h b/gst/gsttask.h index 7f95d7751e..c8a6fabd2a 100644 --- a/gst/gsttask.h +++ b/gst/gsttask.h @@ -35,7 +35,7 @@ G_BEGIN_DECLS * A function that will repeatedly be called in the thread created by * a #GstTask. */ -typedef void (*GstTaskFunction) (void *user_data); +typedef void (*GstTaskFunction) (gpointer user_data); /* --- standard type macros --- */ #define GST_TYPE_TASK (gst_task_get_type ()) @@ -134,7 +134,8 @@ typedef struct { * @cond: used to pause/resume the task * @lock: The lock taken when iterating the task function * @func: the function executed by this task - * @data: data passed to the task function + * @user_data: user_data passed to the task function + * @notify: GDestroyNotify for @user_data * @running: a flag indicating that the task is running * * The #GstTask object. @@ -149,7 +150,8 @@ struct _GstTask { GRecMutex *lock; GstTaskFunction func; - gpointer data; + gpointer user_data; + GDestroyNotify notify; gboolean running; @@ -175,7 +177,9 @@ void gst_task_cleanup_all (void); GType gst_task_get_type (void); -GstTask* gst_task_new (GstTaskFunction func, gpointer data); +GstTask* gst_task_new (GstTaskFunction func, + gpointer user_data, GDestroyNotify notify); + void gst_task_set_lock (GstTask *task, GRecMutex *mutex); GstTaskPool * gst_task_get_pool (GstTask *task); diff --git a/libs/gst/base/gstbaseparse.c b/libs/gst/base/gstbaseparse.c index e2f9d97bbe..1ec76d5397 100644 --- a/libs/gst/base/gstbaseparse.c +++ b/libs/gst/base/gstbaseparse.c @@ -2986,7 +2986,7 @@ gst_base_parse_sink_activate (GstPad * sinkpad, GstObject * parent) goto baseparse_push; return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop, - sinkpad); + sinkpad, NULL); /* fallback */ baseparse_push: { @@ -3916,7 +3916,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) /* Start streaming thread if paused */ gst_pad_start_task (parse->sinkpad, - (GstTaskFunction) gst_base_parse_loop, parse->sinkpad); + (GstTaskFunction) gst_base_parse_loop, parse->sinkpad, NULL); GST_PAD_STREAM_UNLOCK (parse->sinkpad); diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index c770472fe9..ece8a8dfc7 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -3828,7 +3828,7 @@ gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active) if (active) { /* start task */ result = gst_pad_start_task (basesink->sinkpad, - (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad); + (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad, NULL); } else { /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (basesink->sinkpad); diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 2c77b7cb60..050c0bb555 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -1645,7 +1645,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) /* and restart the task in case it got paused explicitly or by * the FLUSH_START event we pushed out. */ tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, - src->srcpad); + src->srcpad, NULL); if (res && !tres) res = FALSE; @@ -3398,7 +3398,7 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) GST_OBJECT_UNLOCK (basesrc->srcpad); if (start) gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop, - basesrc->srcpad); + basesrc->srcpad, NULL); GST_DEBUG_OBJECT (basesrc, "signal"); GST_LIVE_SIGNAL (basesrc); } diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index bd3b69cc0d..445cc717de 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -794,7 +794,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); result = gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, - sq->srcpad); + sq->srcpad, NULL); } return result; } diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index aa494be123..f1438e9ec2 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -735,7 +735,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) queue->eos = FALSE; queue->unexpected = FALSE; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, - queue->srcpad); + queue->srcpad, NULL); GST_QUEUE_MUTEX_UNLOCK (queue); STATUS (queue, pad, "after flush"); @@ -1314,7 +1314,8 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, queue->eos = FALSE; queue->unexpected = FALSE; result = - gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); + gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad, + NULL); GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* step 1, unblock loop function */ diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index fa7747ca13..1908188d0a 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -2139,7 +2139,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, /* reset rate counters */ reset_rate_timer (queue); gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, - queue->srcpad); + queue->srcpad, NULL); GST_QUEUE2_MUTEX_UNLOCK (queue); } else { GST_QUEUE2_MUTEX_LOCK (queue); @@ -2950,7 +2950,8 @@ gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; - result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad); + result = + gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL); GST_QUEUE2_MUTEX_UNLOCK (queue); } else { /* unblock loop function */ diff --git a/plugins/elements/gsttypefindelement.c b/plugins/elements/gsttypefindelement.c index 74d2edeaf7..2ee33a0644 100644 --- a/plugins/elements/gsttypefindelement.c +++ b/plugins/elements/gsttypefindelement.c @@ -460,7 +460,7 @@ gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event) gst_segment_do_seek (&seeksegment, rate, format, flags, cur_type, cur, stop_type, stop, NULL); - flush = !!(flags & GST_SEEK_FLAG_FLUSH); + flush = ! !(flags & GST_SEEK_FLAG_FLUSH); GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment); @@ -504,7 +504,7 @@ gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event) /* restart our task since it might have been stopped when we did the * flush. */ gst_pad_start_task (typefind->sink, - (GstTaskFunction) gst_type_find_element_loop, typefind->sink); + (GstTaskFunction) gst_type_find_element_loop, typefind->sink, NULL); /* streaming can continue now */ GST_PAD_STREAM_UNLOCK (typefind->sink); @@ -1218,7 +1218,7 @@ gst_type_find_element_activate_sink (GstPad * pad, GstObject * parent) /* only start our task if we ourselves decide to start in pull mode */ return gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop, - pad); + pad, NULL); typefind_push: { diff --git a/tests/check/gst/gstmessage.c b/tests/check/gst/gstmessage.c index deea587b21..59a6d63917 100644 --- a/tests/check/gst/gstmessage.c +++ b/tests/check/gst/gstmessage.c @@ -208,7 +208,7 @@ GST_START_TEST (test_parsing) /* create a task with some dummy function, we're not actually going to run * the task here */ - task = gst_task_new ((GstTaskFunction) gst_object_unref, NULL); + task = gst_task_new ((GstTaskFunction) gst_object_unref, NULL, NULL); ASSERT_OBJECT_REFCOUNT (task, "task", 1); diff --git a/tests/check/gst/gsttask.c b/tests/check/gst/gsttask.c index 824f92669f..699219f4ae 100644 --- a/tests/check/gst/gsttask.c +++ b/tests/check/gst/gsttask.c @@ -46,7 +46,7 @@ GST_START_TEST (test_join) GstTask *t; gboolean ret; - t = gst_task_new (task_func2, &t); + t = gst_task_new (task_func2, &t, NULL); fail_if (t == NULL); g_rec_mutex_init (&task_mutex); @@ -90,7 +90,7 @@ GST_START_TEST (test_lock_start) GstTask *t; gboolean ret; - t = gst_task_new (task_func, NULL); + t = gst_task_new (task_func, NULL, NULL); fail_if (t == NULL); g_rec_mutex_init (&task_mutex); @@ -128,7 +128,7 @@ GST_START_TEST (test_lock) GstTask *t; gboolean ret; - t = gst_task_new (task_func, NULL); + t = gst_task_new (task_func, NULL, NULL); fail_if (t == NULL); g_rec_mutex_init (&task_mutex); @@ -156,7 +156,7 @@ GST_START_TEST (test_no_lock) GstTask *t; gboolean ret; - t = gst_task_new (task_func, NULL); + t = gst_task_new (task_func, NULL, NULL); fail_if (t == NULL); /* stop should be possible without lock */ @@ -182,7 +182,7 @@ GST_START_TEST (test_create) { GstTask *t; - t = gst_task_new (task_func, NULL); + t = gst_task_new (task_func, NULL, NULL); fail_if (t == NULL); gst_object_unref (t);