add a gst_element_set_state_async method that sets the state and starts a thread to make sure the state change comple...

Original commit message from CVS:
* check/gst/gstpipeline.c: (GST_START_TEST):
* docs/gst/gstreamer-sections.txt:
* gst/gstutils.c: (set_state_async_thread_func),
(gst_element_set_state_async):
* gst/gstutils.h:
add a gst_element_set_state_async method that
sets the state and starts a thread to make sure the state
change completes as best as it can
This commit is contained in:
Thomas Vander Stichele 2005-09-12 18:14:03 +00:00
parent 0f69c8a186
commit cf231073c2
6 changed files with 173 additions and 129 deletions

View file

@ -1,3 +1,14 @@
2005-09-12 Thomas Vander Stichele <thomas at apestaart dot org>
* check/gst/gstpipeline.c: (GST_START_TEST):
* docs/gst/gstreamer-sections.txt:
* gst/gstutils.c: (set_state_async_thread_func),
(gst_element_set_state_async):
* gst/gstutils.h:
add a "gst_element_set_state_async" method that
sets the state and starts a thread to make sure the state
change completes as best as it can
2005-09-12 Thomas Vander Stichele <thomas at apestaart dot org> 2005-09-12 Thomas Vander Stichele <thomas at apestaart dot org>
* check/gst/gstpipeline.c: (GST_START_TEST), (gst_pipeline_suite): * check/gst/gstpipeline.c: (GST_START_TEST), (gst_pipeline_suite):

View file

@ -21,26 +21,6 @@
#include <gst/check/gstcheck.h> #include <gst/check/gstcheck.h>
#if 0
static void
pop_messages (GstBus * bus, int count)
{
GstMessage *message;
int i;
GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) {
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
gst_message_unref (message);
}
GST_DEBUG ("popped %d messages", count);
}
#endif
/* an empty pipeline can go to PLAYING in one go */ /* an empty pipeline can go to PLAYING in one go */
GST_START_TEST (test_async_state_change_empty) GST_START_TEST (test_async_state_change_empty)
{ {
@ -87,6 +67,7 @@ GST_START_TEST (test_async_state_change_fake)
GstPipeline *pipeline; GstPipeline *pipeline;
GstElement *src, *sink; GstElement *src, *sink;
GstBus *bus; GstBus *bus;
gboolean done = FALSE;
pipeline = GST_PIPELINE (gst_pipeline_new (NULL)); pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
fail_unless (pipeline != NULL, "Could not create pipeline"); fail_unless (pipeline != NULL, "Could not create pipeline");
@ -100,37 +81,28 @@ GST_START_TEST (test_async_state_change_fake)
bus = gst_pipeline_get_bus (pipeline); bus = gst_pipeline_get_bus (pipeline);
fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline), fail_unless_equals_int (gst_element_set_state_async (GST_ELEMENT (pipeline),
GST_STATE_PLAYING), GST_STATE_CHANGE_ASYNC); GST_STATE_PLAYING), GST_STATE_CHANGE_ASYNC);
#if 0 while (!done) {
/* FIXME: Wim is implementing a set_state_async, which will
* spawn a thread and make sure the pipeline gets to the
* requested final state, or errors out before */
gst_bin_watch_for_state_change (GST_BIN (pipeline));
while ((type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1))) {
GstMessage *message; GstMessage *message;
GstMessageType type;
GstState old, new; GstState old, new;
GstState state, pending; GstMessageType type;
GstStateChange ret;
GTimeVal timeval;
type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
message = gst_bus_pop (bus); message = gst_bus_pop (bus);
gst_message_parse_state_changed (message, &old, &new); gst_message_parse_state_changed (message, &old, &new);
GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
g_print ("message\n"); if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
g_print ("%s: %d -> %d\n", GST_OBJECT_NAME (message->src), old, new); done = TRUE;
gst_message_unref (message); gst_message_unref (message);
timeval.tv_sec = 0;
timeval.tv_usec = 0;
ret = gst_element_get_state (GST_ELEMENT (pipeline), &state, &pending,
&timeval);
} }
#endif
g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);
fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline),
GST_STATE_NULL), GST_STATE_CHANGE_SUCCESS);
gst_object_unref (bus);
gst_object_unref (pipeline); gst_object_unref (pipeline);
} }

View file

@ -499,6 +499,7 @@ gst_element_set_locked_state
gst_element_set_name gst_element_set_name
gst_element_set_parent gst_element_set_parent
gst_element_set_state gst_element_set_state
gst_element_set_state_async
gst_element_state_get_name gst_element_state_get_name
gst_element_sync_state_with_parent gst_element_sync_state_with_parent
gst_element_unlink gst_element_unlink

View file

@ -1887,6 +1887,92 @@ gst_bin_watch_for_state_change (GstBin * bin)
g_thread_pool_push (pool, gst_object_ref (bin), NULL); g_thread_pool_push (pool, gst_object_ref (bin), NULL);
} }
static void
set_state_async_thread_func (GstElement * element, gpointer statep)
{
GstState state = GPOINTER_TO_INT (statep);
GstState current, pending;
GstStateChangeReturn ret = GST_STATE_CHANGE_ASYNC;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"new thread ensuring state change to %s",
gst_element_state_get_name (state));
while (TRUE) {
/* wait indefinitely */
ret = gst_element_get_state (element, &current, &pending, NULL);
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"get_state returned %d, current %s, pending %s", ret,
gst_element_state_get_name (current),
gst_element_state_get_name (pending));
/* can only be SUCCESS or FAILURE */
if (ret == GST_STATE_CHANGE_FAILURE) {
/* we can only break, hopefully an error message was posted as well */
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"FAILURE during state change");
break;
} else if (ret == GST_STATE_CHANGE_SUCCESS) {
if (current == state) {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"successfully reached final state");
break;
}
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"setting target state %s again", gst_element_state_get_name (state));
gst_element_set_state (element, state);
} else {
g_assert_not_reached ();
}
}
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"thread done waiting on state change");
gst_object_unref (element);
}
/**
* gst_element_set_state_async:
* @element: a #GstElement to change state of
* @state: the element's new #GstState
*
* Sets the state of the element. This function will try to set the
* requested state by going through all the intermediary states and calling
* the class's state change function for each. If the state change returns
* #GST_STATE_CHANGE_ASYNC at any time, a thread will be started to
* monitor the state change and make sure the element is brought to the
* requested state.
*
* Returns: Result of the state change using #GstStateChangeReturn.
*
* MT safe.
*/
GstStateChangeReturn
gst_element_set_state_async (GstElement * element, GstState state)
{
GstStateChangeReturn ret;
ret = gst_element_set_state (element, state);
if (ret == GST_STATE_CHANGE_ASYNC) {
static GThreadPool *pool = NULL;
static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"starting new thread to ensure state change to %s",
gst_element_state_get_name (state));
g_static_mutex_lock (&mutex);
if (pool == NULL)
pool = g_thread_pool_new ((GFunc) set_state_async_thread_func,
GINT_TO_POINTER (state), -1, FALSE, NULL);
g_static_mutex_unlock (&mutex);
g_thread_pool_push (pool, gst_object_ref (element), NULL);
}
return ret;
}
static void static void
gst_element_populate_std_props (GObjectClass * klass, const gchar * prop_name, gst_element_populate_std_props (GObjectClass * klass, const gchar * prop_name,
guint arg_id, GParamFlags flags) guint arg_id, GParamFlags flags)

View file

@ -31,7 +31,6 @@ G_BEGIN_DECLS
void gst_util_set_value_from_string (GValue *value, const gchar *value_str); void gst_util_set_value_from_string (GValue *value, const gchar *value_str);
void gst_util_set_object_arg (GObject *object, const gchar *name, const gchar *value); void gst_util_set_object_arg (GObject *object, const gchar *name, const gchar *value);
void gst_util_dump_mem (const guchar *mem, guint size); void gst_util_dump_mem (const guchar *mem, guint size);
guint64 gst_util_uint64_scale (guint64 val, guint64 num, guint64 denom); guint64 gst_util_uint64_scale (guint64 val, guint64 num, guint64 denom);
@ -98,9 +97,9 @@ type_as_function ## _get_type (void) \
* After this you will need to implement interface_as_function ## _supported * After this you will need to implement interface_as_function ## _supported
* and interface_as_function ## _interface_init * and interface_as_function ## _interface_init
*/ */
#define GST_BOILERPLATE_WITH_INTERFACE(type, type_as_function, parent_type, \ #define GST_BOILERPLATE_WITH_INTERFACE(type, type_as_function, \
parent_type_as_macro, interface_type, interface_type_as_macro, \ parent_type, parent_type_as_macro, interface_type, \
interface_as_function) \ interface_type_as_macro, interface_as_function) \
\ \
static void interface_as_function ## _interface_init (interface_type ## Class *klass); \ static void interface_as_function ## _interface_init (interface_type ## Class *klass); \
static gboolean interface_as_function ## _supported (type *object, GType iface_type); \ static gboolean interface_as_function ## _supported (type *object, GType iface_type); \
@ -127,7 +126,8 @@ type_as_function ## _init_interfaces (GType type)
\ \
g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, \ g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, \
&implements_iface_info); \ &implements_iface_info); \
g_type_add_interface_static (type, interface_type_as_macro, &iface_info); \ g_type_add_interface_static (type, interface_type_as_macro, \
&iface_info); \
} \ } \
\ \
GST_BOILERPLATE_FULL (type, type_as_function, parent_type, \ GST_BOILERPLATE_FULL (type, type_as_function, parent_type, \
@ -143,7 +143,8 @@ GST_BOILERPLATE_FULL (type, type_as_function, parent_type,
/* Same as above, but in case there is no implementation, it evaluates /* Same as above, but in case there is no implementation, it evaluates
* to def_return */ * to def_return */
#define GST_CALL_PARENT_WITH_DEFAULT(parent_class_cast, name, args, def_return) \ #define GST_CALL_PARENT_WITH_DEFAULT(parent_class_cast, name, args, \
def_return) \
((parent_class_cast(parent_class)->name != NULL) ? \ ((parent_class_cast(parent_class)->name != NULL) ? \
parent_class_cast(parent_class)->name args : def_return) parent_class_cast(parent_class)->name args : def_return)
@ -312,6 +313,7 @@ void gst_element_unlink_pads (GstElement *src, const
gboolean gst_element_link_pads_filtered (GstElement * src, const gchar * srcpadname, gboolean gst_element_link_pads_filtered (GstElement * src, const gchar * srcpadname,
GstElement * dest, const gchar * destpadname, GstElement * dest, const gchar * destpadname,
GstCaps *filter); GstCaps *filter);
GstStateChangeReturn gst_element_set_state_async (GstElement * element, GstState state);
/* util elementfactory functions */ /* util elementfactory functions */
gboolean gst_element_factory_can_src_caps(GstElementFactory *factory, const GstCaps *caps); gboolean gst_element_factory_can_src_caps(GstElementFactory *factory, const GstCaps *caps);

View file

@ -21,26 +21,6 @@
#include <gst/check/gstcheck.h> #include <gst/check/gstcheck.h>
#if 0
static void
pop_messages (GstBus * bus, int count)
{
GstMessage *message;
int i;
GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) {
fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
message = gst_bus_pop (bus);
gst_message_unref (message);
}
GST_DEBUG ("popped %d messages", count);
}
#endif
/* an empty pipeline can go to PLAYING in one go */ /* an empty pipeline can go to PLAYING in one go */
GST_START_TEST (test_async_state_change_empty) GST_START_TEST (test_async_state_change_empty)
{ {
@ -87,6 +67,7 @@ GST_START_TEST (test_async_state_change_fake)
GstPipeline *pipeline; GstPipeline *pipeline;
GstElement *src, *sink; GstElement *src, *sink;
GstBus *bus; GstBus *bus;
gboolean done = FALSE;
pipeline = GST_PIPELINE (gst_pipeline_new (NULL)); pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
fail_unless (pipeline != NULL, "Could not create pipeline"); fail_unless (pipeline != NULL, "Could not create pipeline");
@ -100,37 +81,28 @@ GST_START_TEST (test_async_state_change_fake)
bus = gst_pipeline_get_bus (pipeline); bus = gst_pipeline_get_bus (pipeline);
fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline), fail_unless_equals_int (gst_element_set_state_async (GST_ELEMENT (pipeline),
GST_STATE_PLAYING), GST_STATE_CHANGE_ASYNC); GST_STATE_PLAYING), GST_STATE_CHANGE_ASYNC);
#if 0 while (!done) {
/* FIXME: Wim is implementing a set_state_async, which will
* spawn a thread and make sure the pipeline gets to the
* requested final state, or errors out before */
gst_bin_watch_for_state_change (GST_BIN (pipeline));
while ((type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1))) {
GstMessage *message; GstMessage *message;
GstMessageType type;
GstState old, new; GstState old, new;
GstState state, pending; GstMessageType type;
GstStateChange ret;
GTimeVal timeval;
type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
message = gst_bus_pop (bus); message = gst_bus_pop (bus);
gst_message_parse_state_changed (message, &old, &new); gst_message_parse_state_changed (message, &old, &new);
GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
g_print ("message\n"); if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
g_print ("%s: %d -> %d\n", GST_OBJECT_NAME (message->src), old, new); done = TRUE;
gst_message_unref (message); gst_message_unref (message);
timeval.tv_sec = 0;
timeval.tv_usec = 0;
ret = gst_element_get_state (GST_ELEMENT (pipeline), &state, &pending,
&timeval);
} }
#endif
g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);
fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline),
GST_STATE_NULL), GST_STATE_CHANGE_SUCCESS);
gst_object_unref (bus);
gst_object_unref (pipeline); gst_object_unref (pipeline);
} }