task: Introduce gst_task_resume() API

This new API allow resuming a task if it was paused, while leaving it to
stopped stated if it was stopped or not started yet. This new API can be
useful for callback driver workflow, where you basically want to pause and
resume the task when buffers are notified while avoiding the race with a
gst_task_stop() coming from another thread.
This commit is contained in:
Nicolas Dufresne 2020-01-31 11:32:10 -05:00 committed by GStreamer Merge Bot
parent c682579c5e
commit e272ae281f
3 changed files with 104 additions and 23 deletions

View file

@ -657,33 +657,14 @@ start_task (GstTask * task)
return res; return res;
} }
static inline gboolean
/** gst_task_set_state_unlocked (GstTask * task, GstTaskState state)
* gst_task_set_state:
* @task: a #GstTask
* @state: the new task state
*
* Sets the state of @task to @state.
*
* The @task must have a lock associated with it using
* gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or
* this function will return %FALSE.
*
* MT safe.
*
* Returns: %TRUE if the state could be changed.
*/
gboolean
gst_task_set_state (GstTask * task, GstTaskState state)
{ {
GstTaskState old; GstTaskState old;
gboolean res = TRUE; gboolean res = TRUE;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state); GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state);
GST_OBJECT_LOCK (task);
if (state != GST_TASK_STOPPED) if (state != GST_TASK_STOPPED)
if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
goto no_lock; goto no_lock;
@ -709,7 +690,6 @@ gst_task_set_state (GstTask * task, GstTaskState state)
break; break;
} }
} }
GST_OBJECT_UNLOCK (task);
return res; return res;
@ -717,12 +697,41 @@ gst_task_set_state (GstTask * task, GstTaskState state)
no_lock: no_lock:
{ {
GST_WARNING_OBJECT (task, "state %d set on task without a lock", state); GST_WARNING_OBJECT (task, "state %d set on task without a lock", state);
GST_OBJECT_UNLOCK (task);
g_warning ("task without a lock can't be set to state %d", state); g_warning ("task without a lock can't be set to state %d", state);
return FALSE; return FALSE;
} }
} }
/**
* gst_task_set_state:
* @task: a #GstTask
* @state: the new task state
*
* Sets the state of @task to @state.
*
* The @task must have a lock associated with it using
* gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or
* this function will return %FALSE.
*
* MT safe.
*
* Returns: %TRUE if the state could be changed.
*/
gboolean
gst_task_set_state (GstTask * task, GstTaskState state)
{
gboolean res = TRUE;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
GST_OBJECT_LOCK (task);
res = gst_task_set_state_unlocked (task, state);
GST_OBJECT_UNLOCK (task);
return res;
}
/** /**
* gst_task_start: * gst_task_start:
* @task: The #GstTask to start * @task: The #GstTask to start
@ -777,6 +786,32 @@ gst_task_pause (GstTask * task)
return gst_task_set_state (task, GST_TASK_PAUSED); return gst_task_set_state (task, GST_TASK_PAUSED);
} }
/**
* gst_task_resume:
* @task: The #GstTask to resume
*
* Resume @task in case it was paused. If the task was stopped, it will
* remain in that state and this function will return %FALSE.
*
* Returns: %TRUE if the task could be resumed.
*
* MT safe.
* Since: 1.18
*/
gboolean
gst_task_resume (GstTask * task)
{
gboolean res = FALSE;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
GST_OBJECT_LOCK (task);
if (GET_TASK_STATE (task) != GST_TASK_STOPPED)
res = gst_task_set_state_unlocked (task, GST_TASK_STARTED);
GST_OBJECT_UNLOCK (task);
return res;
}
/** /**
* gst_task_join: * gst_task_join:
* @task: The #GstTask to join * @task: The #GstTask to join

View file

@ -207,6 +207,9 @@ gboolean gst_task_stop (GstTask *task);
GST_API GST_API
gboolean gst_task_pause (GstTask *task); gboolean gst_task_pause (GstTask *task);
GST_API
gboolean gst_task_resume (GstTask *task);
GST_API GST_API
gboolean gst_task_join (GstTask *task); gboolean gst_task_join (GstTask *task);

View file

@ -31,6 +31,48 @@ static GRecMutex task_mutex;
#define TEST_RACE_ITERATIONS 1000 #define TEST_RACE_ITERATIONS 1000
static void
task_resume_func (void *data)
{
g_mutex_lock (&task_lock);
g_cond_signal (&task_cond);
g_mutex_unlock (&task_lock);
}
GST_START_TEST (test_resume)
{
GstTask *t;
t = gst_task_new (task_resume_func, &t, NULL);
fail_if (t == NULL);
g_rec_mutex_init (&task_mutex);
gst_task_set_lock (t, &task_mutex);
g_cond_init (&task_cond);
g_mutex_init (&task_lock);
g_mutex_lock (&task_lock);
/* Pause the task, and resume it. */
fail_unless (gst_task_pause (t));
fail_unless (gst_task_resume (t));
while (GST_TASK_STATE (t) != GST_TASK_STARTED)
g_cond_wait (&task_cond, &task_lock);
fail_unless (gst_task_stop (t));
g_mutex_unlock (&task_lock);
fail_unless (gst_task_join (t));
/* Make sure we cannot resume from stopped. */
fail_if (gst_task_resume (t));
gst_object_unref (t);
}
GST_END_TEST;
static void static void
task_signal_pause_func (void *data) task_signal_pause_func (void *data)
{ {
@ -265,6 +307,7 @@ gst_task_suite (void)
tcase_add_test (tc_chain, test_lock_start); tcase_add_test (tc_chain, test_lock_start);
tcase_add_test (tc_chain, test_join); tcase_add_test (tc_chain, test_join);
tcase_add_test (tc_chain, test_pause_stop_race); tcase_add_test (tc_chain, test_pause_stop_race);
tcase_add_test (tc_chain, test_resume);
return s; return s;
} }