element: Add gst_element_call_async()

This calls a function from another thread, asynchronously. This is to be
used for cases when a state change has to be performed from a streaming
thread, directly via gst_element_set_state() or indirectly e.g. via SEEK
events.

Calling those functions directly from the streaming thread will cause
deadlocks in many situations, as they might involve waiting for the
streaming thread to shut down from this very streaming thread.

This is mostly a convenience function around a GThreadPool and is for example
used by GstBin to continue asynchronous state changes.

https://bugzilla.gnome.org/show_bug.cgi?id=760532
This commit is contained in:
Sebastian Dröge 2016-02-28 12:06:40 +02:00
parent a4db38ab05
commit 8177173db0
6 changed files with 97 additions and 31 deletions

View file

@ -917,6 +917,10 @@ gst_element_add_property_notify_watch
gst_element_add_property_deep_notify_watch gst_element_add_property_deep_notify_watch
gst_element_remove_property_notify_watch gst_element_remove_property_notify_watch
<SUBSECTION element-call-async>
GstElementCallAsyncFunc
gst_element_call_async
<SUBSECTION Standard> <SUBSECTION Standard>
GST_ELEMENT GST_ELEMENT
GST_IS_ELEMENT GST_IS_ELEMENT

View file

@ -201,7 +201,6 @@ struct _GstBinPrivate
typedef struct typedef struct
{ {
GstBin *bin;
guint32 cookie; guint32 cookie;
GstState pending; GstState pending;
} BinContinueData; } BinContinueData;
@ -221,7 +220,7 @@ static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
static void bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret, static void bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
gboolean flag_pending, GstClockTime running_time); gboolean flag_pending, GstClockTime running_time);
static void bin_handle_async_start (GstBin * bin); static void bin_handle_async_start (GstBin * bin);
static void bin_push_state_continue (BinContinueData * data); static void bin_push_state_continue (GstBin * bin, BinContinueData * data);
static void bin_do_eos (GstBin * bin); static void bin_do_eos (GstBin * bin);
static gboolean gst_bin_add_func (GstBin * bin, GstElement * element); static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
@ -249,7 +248,7 @@ static gboolean gst_bin_do_latency_func (GstBin * bin);
static void bin_remove_messages (GstBin * bin, GstObject * src, static void bin_remove_messages (GstBin * bin, GstObject * src,
GstMessageType types); GstMessageType types);
static void gst_bin_continue_func (BinContinueData * data); static void gst_bin_continue_func (GstBin * bin, BinContinueData * data);
static gint bin_element_is_sink (GstElement * child, GstBin * bin); static gint bin_element_is_sink (GstElement * child, GstBin * bin);
static gint bin_element_is_src (GstElement * child, GstBin * bin); static gint bin_element_is_src (GstElement * child, GstBin * bin);
@ -358,7 +357,6 @@ gst_bin_class_init (GstBinClass * klass)
{ {
GObjectClass *gobject_class; GObjectClass *gobject_class;
GstElementClass *gstelement_class; GstElementClass *gstelement_class;
GError *err;
gobject_class = (GObjectClass *) klass; gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass; gstelement_class = (GstElementClass *) klass;
@ -470,14 +468,6 @@ gst_bin_class_init (GstBinClass * klass)
klass->handle_message = GST_DEBUG_FUNCPTR (gst_bin_handle_message_func); klass->handle_message = GST_DEBUG_FUNCPTR (gst_bin_handle_message_func);
klass->do_latency = GST_DEBUG_FUNCPTR (gst_bin_do_latency_func); klass->do_latency = GST_DEBUG_FUNCPTR (gst_bin_do_latency_func);
GST_DEBUG ("creating bin thread pool");
err = NULL;
klass->pool =
g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
if (err != NULL) {
g_critical ("could not alloc threadpool %s", err->message);
}
} }
static void static void
@ -3025,13 +3015,11 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
* their state, this function will attempt to bring the bin to the next state. * their state, this function will attempt to bring the bin to the next state.
*/ */
static void static void
gst_bin_continue_func (BinContinueData * data) gst_bin_continue_func (GstBin * bin, BinContinueData * data)
{ {
GstBin *bin;
GstState current, next, pending; GstState current, next, pending;
GstStateChange transition; GstStateChange transition;
bin = data->bin;
pending = data->pending; pending = data->pending;
GST_DEBUG_OBJECT (bin, "waiting for state lock"); GST_DEBUG_OBJECT (bin, "waiting for state lock");
@ -3065,8 +3053,6 @@ gst_bin_continue_func (BinContinueData * data)
GST_STATE_UNLOCK (bin); GST_STATE_UNLOCK (bin);
GST_DEBUG_OBJECT (bin, "state continue done"); GST_DEBUG_OBJECT (bin, "state continue done");
gst_object_unref (bin);
g_slice_free (BinContinueData, data);
return; return;
interrupted: interrupted:
@ -3074,8 +3060,6 @@ interrupted:
GST_OBJECT_UNLOCK (bin); GST_OBJECT_UNLOCK (bin);
GST_STATE_UNLOCK (bin); GST_STATE_UNLOCK (bin);
GST_DEBUG_OBJECT (bin, "state continue aborted due to intervening change"); GST_DEBUG_OBJECT (bin, "state continue aborted due to intervening change");
gst_object_unref (bin);
g_slice_free (BinContinueData, data);
return; return;
} }
} }
@ -3095,17 +3079,18 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
} }
static void static void
bin_push_state_continue (BinContinueData * data) free_bin_continue_data (BinContinueData * data)
{ {
GstBinClass *klass; g_slice_free (BinContinueData, data);
GstBin *bin; }
/* ref was taken */
bin = data->bin;
klass = GST_BIN_GET_CLASS (bin);
static void
bin_push_state_continue (GstBin * bin, BinContinueData * data)
{
GST_DEBUG_OBJECT (bin, "pushing continue on thread pool"); GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
g_thread_pool_push (klass->pool, data, NULL); gst_element_call_async (GST_ELEMENT_CAST (bin),
(GstElementCallAsyncFunc) gst_bin_continue_func, data,
(GDestroyNotify) free_bin_continue_data);
} }
/* an element started an async state change, if we were not busy with a state /* an element started an async state change, if we were not busy with a state
@ -3268,8 +3253,6 @@ bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
cont = g_slice_new (BinContinueData); cont = g_slice_new (BinContinueData);
/* ref to the bin */
cont->bin = gst_object_ref (bin);
/* cookie to detect concurrent state change */ /* cookie to detect concurrent state change */
cont->cookie = GST_ELEMENT_CAST (bin)->state_cookie; cont->cookie = GST_ELEMENT_CAST (bin)->state_cookie;
/* pending target state */ /* pending target state */
@ -3300,7 +3283,7 @@ bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
if (cont) { if (cont) {
/* toplevel, start continue state */ /* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue"); GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
bin_push_state_continue (cont); bin_push_state_continue (bin, cont);
} else { } else {
GST_DEBUG_OBJECT (bin, "state change complete"); GST_DEBUG_OBJECT (bin, "state change complete");
GST_STATE_BROADCAST (bin); GST_STATE_BROADCAST (bin);

View file

@ -152,7 +152,7 @@ struct _GstBinClass {
GstElementClass parent_class; GstElementClass parent_class;
/*< private >*/ /*< private >*/
GThreadPool *pool; GThreadPool *pool; /* deprecated */
/* signals */ /* signals */
void (*element_added) (GstBin *bin, GstElement *child); void (*element_added) (GstBin *bin, GstElement *child);

View file

@ -147,9 +147,13 @@ static GstPadTemplate
* gst_element_class_get_request_pad_template (GstElementClass * * gst_element_class_get_request_pad_template (GstElementClass *
element_class, const gchar * name); element_class, const gchar * name);
static void gst_element_call_async_func (gpointer data, gpointer user_data);
static GstObjectClass *parent_class = NULL; static GstObjectClass *parent_class = NULL;
static guint gst_element_signals[LAST_SIGNAL] = { 0 }; static guint gst_element_signals[LAST_SIGNAL] = { 0 };
static GThreadPool *gst_element_pool = NULL;
/* this is used in gstelementfactory.c:gst_element_register() */ /* this is used in gstelementfactory.c:gst_element_register() */
GQuark __gst_elementclass_factory = 0; GQuark __gst_elementclass_factory = 0;
@ -187,6 +191,7 @@ static void
gst_element_class_init (GstElementClass * klass) gst_element_class_init (GstElementClass * klass)
{ {
GObjectClass *gobject_class; GObjectClass *gobject_class;
GError *err = NULL;
gobject_class = (GObjectClass *) klass; gobject_class = (GObjectClass *) klass;
@ -247,6 +252,15 @@ gst_element_class_init (GstElementClass * klass)
klass->set_context = GST_DEBUG_FUNCPTR (gst_element_set_context_default); klass->set_context = GST_DEBUG_FUNCPTR (gst_element_set_context_default);
klass->elementfactory = NULL; klass->elementfactory = NULL;
GST_DEBUG ("creating element thread pool");
gst_element_pool =
g_thread_pool_new ((GFunc) gst_element_call_async_func, NULL, -1, FALSE,
&err);
if (err != NULL) {
g_critical ("could not alloc threadpool %s", err->message);
g_clear_error (&err);
}
} }
static void static void
@ -3364,3 +3378,60 @@ gst_element_remove_property_notify_watch (GstElement * element, gulong watch_id)
{ {
g_signal_handler_disconnect (element, watch_id); g_signal_handler_disconnect (element, watch_id);
} }
typedef struct
{
GstElement *element;
GstElementCallAsyncFunc func;
gpointer user_data;
GDestroyNotify destroy_notify;
} GstElementCallAsyncData;
static void
gst_element_call_async_func (gpointer data, gpointer user_data)
{
GstElementCallAsyncData *async_data = data;
async_data->func (async_data->element, async_data->user_data);
if (async_data->destroy_notify)
async_data->destroy_notify (async_data->user_data);
gst_object_unref (async_data->element);
g_free (async_data);
}
/**
* gst_element_call_async:
* @element: a #GstElement
* @func: Function to call asynchronously from another thread
* @user_data: Data to pass to @func
* @destroy_notify: GDestroyNotify for @user_data
*
* Calls @func from another thread and passes @user_data to it. This is to be
* used for cases when a state change has to be performed from a streaming
* thread, directly via gst_element_set_state() or indirectly e.g. via SEEK
* events.
*
* Calling those functions directly from the streaming thread will cause
* deadlocks in many situations, as they might involve waiting for the
* streaming thread to shut down from this very streaming thread.
*
* MT safe.
*
* Since: 1.10
*/
void
gst_element_call_async (GstElement * element, GstElementCallAsyncFunc func,
gpointer user_data, GDestroyNotify destroy_notify)
{
GstElementCallAsyncData *async_data;
g_return_if_fail (GST_IS_ELEMENT (element));
async_data = g_new0 (GstElementCallAsyncData, 1);
async_data->element = gst_object_ref (element);
async_data->func = func;
async_data->user_data = user_data;
async_data->destroy_notify = destroy_notify;
g_thread_pool_push (gst_element_pool, async_data, NULL);
}

View file

@ -816,6 +816,13 @@ GstStateChangeReturn gst_element_continue_state (GstElement * element,
GstStateChangeReturn ret); GstStateChangeReturn ret);
void gst_element_lost_state (GstElement * element); void gst_element_lost_state (GstElement * element);
typedef void (*GstElementCallAsyncFunc) (GstElement * element,
gpointer user_data);
void gst_element_call_async (GstElement * element,
GstElementCallAsyncFunc func, gpointer user_data,
GDestroyNotify destroy_notify);
/* factory management */ /* factory management */
GstElementFactory* gst_element_get_factory (GstElement *element); GstElementFactory* gst_element_get_factory (GstElement *element);

View file

@ -478,6 +478,7 @@ EXPORTS
gst_element_add_pad gst_element_add_pad
gst_element_add_property_deep_notify_watch gst_element_add_property_deep_notify_watch
gst_element_add_property_notify_watch gst_element_add_property_notify_watch
gst_element_call_async
gst_element_change_state gst_element_change_state
gst_element_class_add_metadata gst_element_class_add_metadata
gst_element_class_add_pad_template gst_element_class_add_pad_template