From d14c4c4a678f2874d7c59e2be6d6887a324ae5b1 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 19 Mar 2007 10:47:56 +0000 Subject: [PATCH] docs/gst/gstreamer-sections.txt: Add new element field and method. Original commit message from CVS: * docs/gst/gstreamer-sections.txt: Add new element field and method. * gst/gstbin.c: (gst_bin_class_init), (gst_bin_init), (bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func), (gst_bin_recalc_state), (gst_bin_get_state_func), (gst_bin_element_set_state), (gst_bin_change_state_func), (gst_bin_continue_func), (bin_bus_handler), (bin_push_state_continue), (bin_handle_async_start), (bin_handle_async_done), (gst_bin_handle_message_func): Make async state changes a bit smarter by using new ASYNC_START and ASYNC_DONE messages. This reduces the number of times we run the state recalculation thread. Don't change state of element with a pending ASYNC_START message. Deprecate STATE_DIRTY messages. * gst/gstelement.c: (gst_element_init), (gst_element_send_event), (gst_element_get_state_func), (gst_element_continue_state), (gst_element_lost_state), (gst_element_set_state_func), (gst_element_change_state): * gst/gstelement.h: Keep the state that was last set by the app in a new element field. Don't allow state changes when handling an element event. Post ASYNC_START and ASYNC_DONE messages. Change lost_state so that we go to PAUSED and wait for the parent to set us to PLAYING again (so latency calculation can be performed) Export gst_element_change_state() method so that subclasses can use it. API: gst_element_change_state() API: GST_STATE_TARGET * gst/gstpipeline.c: (gst_pipeline_class_init), (reset_stream_time), (gst_pipeline_change_state), (gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time): Using the new ASYNC_START message we can reset the base_time when needed. This can then be used to implement base_time redistribution in flushing seeks so that we can remove the explicit seek handling. Perform latency query and configuration when going to PLAYING. * libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state), (gst_base_sink_query), (gst_base_sink_change_state): Post new ASYNC_START/ASYNC_DONE messages. * tests/check/generic/sinks.c: (GST_START_TEST): Fix test because the bin will not set the async element to PLAYING right away. * tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST): Make the message check a little stronger. Handle ASYNC messages. * tests/check/pipelines/cleanup.c: (GST_START_TEST): * tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST): Expect ASYNC_DONE messages. --- ChangeLog | 56 ++ docs/gst/gstreamer-sections.txt | 2 + gst/gstbin.c | 606 +++++++++++++++++--- gst/gstelement.c | 93 ++- gst/gstelement.h | 21 +- gst/gstpipeline.c | 229 ++++---- libs/gst/base/gstbasesink.c | 65 ++- tests/check/generic/sinks.c | 2 +- tests/check/gst/gstbin.c | 44 +- tests/check/pipelines/cleanup.c | 5 +- tests/check/pipelines/simple-launch-lines.c | 16 +- 11 files changed, 879 insertions(+), 260 deletions(-) diff --git a/ChangeLog b/ChangeLog index e8034693e9..84ffe41c24 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,59 @@ +2007-03-19 Wim Taymans + + * docs/gst/gstreamer-sections.txt: + Add new element field and method. + + * gst/gstbin.c: (gst_bin_class_init), (gst_bin_init), + (bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func), + (gst_bin_recalc_state), (gst_bin_get_state_func), + (gst_bin_element_set_state), (gst_bin_change_state_func), + (gst_bin_continue_func), (bin_bus_handler), + (bin_push_state_continue), (bin_handle_async_start), + (bin_handle_async_done), (gst_bin_handle_message_func): + Make async state changes a bit smarter by using new ASYNC_START and + ASYNC_DONE messages. This reduces the number of times we run the state + recalculation thread. + Don't change state of element with a pending ASYNC_START message. + Deprecate STATE_DIRTY messages. + + * gst/gstelement.c: (gst_element_init), (gst_element_send_event), + (gst_element_get_state_func), (gst_element_continue_state), + (gst_element_lost_state), (gst_element_set_state_func), + (gst_element_change_state): + * gst/gstelement.h: + Keep the state that was last set by the app in a new element field. + Don't allow state changes when handling an element event. + Post ASYNC_START and ASYNC_DONE messages. + Change lost_state so that we go to PAUSED and wait for the parent to set + us to PLAYING again (so latency calculation can be performed) + Export gst_element_change_state() method so that subclasses can use it. + API: gst_element_change_state() + API: GST_STATE_TARGET + + * gst/gstpipeline.c: (gst_pipeline_class_init), + (reset_stream_time), (gst_pipeline_change_state), + (gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time): + Using the new ASYNC_START message we can reset the base_time when + needed. This can then be used to implement base_time redistribution in + flushing seeks so that we can remove the explicit seek handling. + Perform latency query and configuration when going to PLAYING. + + * libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state), + (gst_base_sink_query), (gst_base_sink_change_state): + Post new ASYNC_START/ASYNC_DONE messages. + + * tests/check/generic/sinks.c: (GST_START_TEST): + Fix test because the bin will not set the async element to PLAYING right + away. + + * tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST): + Make the message check a little stronger. + Handle ASYNC messages. + + * tests/check/pipelines/cleanup.c: (GST_START_TEST): + * tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST): + Expect ASYNC_DONE messages. + 2007-03-19 Wim Taymans * docs/gst/gstreamer-sections.txt: diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index d5bbdb6c2d..0d42b589d4 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -419,6 +419,7 @@ GST_STATE_GET_NEXT GST_STATE_NEXT GST_STATE_PENDING GST_STATE_RETURN +GST_STATE_TARGET GST_STATE_TRANSITION GST_STATE_TRANSITION_CURRENT GST_STATE_TRANSITION_NEXT @@ -500,6 +501,7 @@ gst_element_lost_state gst_element_state_get_name gst_element_state_change_return_get_name gst_element_sync_state_with_parent +gst_element_change_state gst_element_found_tags diff --git a/gst/gstbin.c b/gst/gstbin.c index 34c91b9884..9eab23aaf7 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -187,11 +187,12 @@ GST_ELEMENT_DETAILS ("Generic bin", static void gst_bin_dispose (GObject * object); -static void gst_bin_recalc_state (GstBin * bin, gboolean force); static GstStateChangeReturn gst_bin_change_state_func (GstElement * element, GstStateChange transition); static GstStateChangeReturn gst_bin_get_state_func (GstElement * element, GstState * state, GstState * pending, GstClockTime timeout); +static void bin_handle_async_done (GstBin * bin, GstMessage ** message); +static void bin_push_state_continue (GstBin * bin); static gboolean gst_bin_add_func (GstBin * bin, GstElement * element); static gboolean gst_bin_remove_func (GstBin * bin, GstElement * element); @@ -215,7 +216,7 @@ static void gst_bin_restore_thyself (GstObject * object, xmlNodePtr self); static void bin_remove_messages (GstBin * bin, GstObject * src, GstMessageType types); -static void gst_bin_recalc_func (GstBin * child, gpointer data); +static void gst_bin_continue_func (GstBin * child, gpointer data); static gint bin_element_is_sink (GstElement * child, GstBin * bin); static gint bin_element_is_src (GstElement * child, GstBin * bin); @@ -398,7 +399,7 @@ gst_bin_class_init (GstBinClass * klass) GST_DEBUG ("creating bin thread pool"); err = NULL; klass->pool = - g_thread_pool_new ((GFunc) gst_bin_recalc_func, NULL, -1, FALSE, &err); + g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err); if (err != NULL) { g_critical ("could alloc threadpool %s", err->message); } @@ -413,9 +414,8 @@ gst_bin_init (GstBin * bin) bin->children = NULL; bin->children_cookie = 0; bin->messages = NULL; - bin->polling = FALSE; - bin->state_dirty = FALSE; bin->provided_clock = NULL; + bin->state_dirty = FALSE; bin->clock_dirty = FALSE; /* Set up a bus for listening to child elements */ @@ -680,12 +680,13 @@ bin_remove_messages (GstBin * bin, GstObject * src, GstMessageType types) if (message_check (message, &find) == 0) { GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), - "deleting message of types %d", types); + "deleting message %p of types 0x%08x", message, types); bin->messages = g_list_delete_link (bin->messages, walk); gst_message_unref (message); } else { GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), - "not deleting message of type %d", GST_MESSAGE_TYPE (message)); + "not deleting message %p of type 0x%08x", message, + GST_MESSAGE_TYPE (message)); } } } @@ -803,6 +804,7 @@ gst_bin_add_func (GstBin * bin, GstElement * element) * that is not important right now. When the pipeline goes to PLAYING, * a new clock will be selected */ gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin)); + GST_DEBUG_OBJECT (bin, "marking state dirty"); bin->state_dirty = TRUE; GST_OBJECT_UNLOCK (bin); @@ -848,7 +850,6 @@ had_parent: } } - /** * gst_bin_add: * @bin: a #GstBin @@ -906,7 +907,9 @@ gst_bin_remove_func (GstBin * bin, GstElement * element) gchar *elem_name; GstIterator *it; gboolean is_sink; - GstMessage *clock_message = NULL; + GstMessage *clock_message = NULL, *async_message = NULL, *smessage = NULL; + GList *walk, *next; + gboolean other_async = FALSE, this_async = FALSE, cont = FALSE; GST_OBJECT_LOCK (element); /* Check if the element is already being removed and immediately @@ -958,12 +961,58 @@ gst_bin_remove_func (GstBin * bin, GstElement * element) gst_message_new_clock_lost (GST_OBJECT_CAST (bin), bin->provided_clock); } + /* remove messages for the element, if there was a pending ASYNC_START + * message we must see if removing the element caused the bin to lose its + * async state. */ + for (walk = bin->messages; walk; walk = next) { + GstMessage *message = (GstMessage *) walk->data; + GstElement *src = GST_ELEMENT_CAST (GST_MESSAGE_SRC (message)); + + next = g_list_next (walk); + + if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ASYNC_START) { + if (src == element) + this_async = TRUE; + else + other_async = TRUE; + + GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), + "looking at message %p", message); + } + if (src == element) { + /* delete all message types */ + GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), + "deleting message %p of element \"%s\"", message, elem_name); + bin->messages = g_list_delete_link (bin->messages, walk); + gst_message_unref (message); + } + } + /* all other elements were not async and we removed the async one, + * post a ASYNC_DONE message because we are not async anymore now. */ + if (!other_async && this_async) { + GST_DEBUG_OBJECT (bin, "we removed the last async element"); + + cont = GST_OBJECT_PARENT (bin) == NULL; + if (!cont) { + bin_handle_async_done (bin, &smessage); + async_message = gst_message_new_async_done (GST_OBJECT_CAST (bin)); + } + } + GST_DEBUG_OBJECT (bin, "marking state dirty"); bin->state_dirty = TRUE; GST_OBJECT_UNLOCK (bin); - if (clock_message) { + if (clock_message) gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message); - } + + if (smessage) + gst_element_post_message (GST_ELEMENT_CAST (bin), smessage); + + if (async_message) + gst_element_post_message (GST_ELEMENT_CAST (bin), async_message); + + if (cont) + bin_push_state_continue (bin); GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"", elem_name); @@ -1277,26 +1326,6 @@ gst_bin_iterate_sources (GstBin * bin) return result; } -/* - * MT safe - */ -static GstStateChangeReturn -gst_bin_get_state_func (GstElement * element, GstState * state, - GstState * pending, GstClockTime timeout) -{ - GstBin *bin = GST_BIN (element); - GstStateChangeReturn ret; - - GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state"); - - /* do a non forced recalculation of the state */ - gst_bin_recalc_state (bin, FALSE); - - ret = parent_class->get_state (element, state, pending, timeout); - - return ret; -} - static void gst_bin_recalc_state (GstBin * bin, gboolean force) { @@ -1404,6 +1433,7 @@ restart: done: bin->polling = FALSE; + GST_STATE_RETURN (bin) = ret; GST_OBJECT_UNLOCK (bin); /* now we can take the state lock, it is possible that new elements @@ -1461,6 +1491,26 @@ unknown_state: } } +/* + * MT safe + */ +static GstStateChangeReturn +gst_bin_get_state_func (GstElement * element, GstState * state, + GstState * pending, GstClockTime timeout) +{ + GstBin *bin = GST_BIN (element); + GstStateChangeReturn ret; + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state"); + + /* do a non forced recalculation of the state */ + gst_bin_recalc_state (bin, FALSE); + + ret = parent_class->get_state (element, state, pending, timeout); + + return ret; +} + /*********************************************** * Topologically sorted iterator * see http://en.wikipedia.org/wiki/Topological_sorting @@ -1746,10 +1796,13 @@ gst_bin_iterate_sorted (GstBin * bin) } static GstStateChangeReturn -gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending) +gst_bin_element_set_state (GstBin * bin, GstElement * element, + GstClockTime base_time, GstState current, GstState next) { GstStateChangeReturn ret; gboolean locked; + GList *found; + MessageFind find; /* peel off the locked flag */ GST_OBJECT_LOCK (element); @@ -1757,18 +1810,53 @@ gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending) GST_OBJECT_UNLOCK (element); /* skip locked elements */ - if (G_UNLIKELY (locked)) { - GST_DEBUG_OBJECT (element, - "element is locked, pretending state change succeeded"); - ret = GST_STATE_CHANGE_SUCCESS; - goto done; + if (G_UNLIKELY (locked)) + goto locked; + + /* the element was busy with an upwards async state change, we must wait for + * an ASYNC_DONE message before we attemp to change the state. */ + find.src = GST_OBJECT_CAST (element); + find.types = GST_MESSAGE_ASYNC_START; + + GST_OBJECT_LOCK (bin); + if ((found = g_list_find_custom (bin->messages, &find, + (GCompareFunc) message_check))) { + GstMessage *message = GST_MESSAGE_CAST (found->data); + GstObject *src = GST_MESSAGE_SRC (message); + + GST_DEBUG_OBJECT (element, "element message %p, %s async busy", + message, GST_ELEMENT_NAME (src)); + /* only wait for upward state changes */ + if (next > current) + goto was_busy; } + GST_OBJECT_UNLOCK (bin); + + GST_DEBUG_OBJECT (bin, + "setting element %s to %s, base_time %" GST_TIME_FORMAT, + GST_ELEMENT_NAME (element), gst_element_state_get_name (next), + GST_TIME_ARGS (base_time)); + + /* set base_time on child */ + gst_element_set_base_time (element, base_time); /* change state */ - ret = gst_element_set_state (element, pending); + ret = gst_element_set_state (element, next); -done: return ret; + +locked: + { + GST_DEBUG_OBJECT (element, + "element is locked, pretending state change succeeded"); + return GST_STATE_CHANGE_SUCCESS; + } +was_busy: + { + GST_DEBUG_OBJECT (element, "element is was busy delaying state change"); + GST_OBJECT_UNLOCK (bin); + return GST_STATE_CHANGE_ASYNC; + } } /* gst_iterator_fold functions for pads_activate @@ -1925,11 +2013,8 @@ restart: child = GST_ELEMENT_CAST (data); - /* set base_time on child */ - gst_element_set_base_time (child, base_time); - - /* set state now */ - ret = gst_bin_element_set_state (bin, child, next); + /* set state and base_time now */ + ret = gst_bin_element_set_state (bin, child, base_time, current, next); switch (ret) { case GST_STATE_CHANGE_SUCCESS: @@ -1939,11 +2024,13 @@ restart: gst_element_state_get_name (next)); break; case GST_STATE_CHANGE_ASYNC: + { GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "child '%s' is changing state asynchronously to %s", GST_ELEMENT_NAME (child), gst_element_state_get_name (next)); have_async = TRUE; break; + } case GST_STATE_CHANGE_FAILURE: GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "child '%s' failed to go to state %d(%s)", @@ -1991,10 +2078,11 @@ done: gst_iterator_free (it); GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, - "done changing bin's state from %s to %s, now in %s, ret %d", + "done changing bin's state from %s to %s, now in %s, ret %s", gst_element_state_get_name (current), gst_element_state_get_name (next), - gst_element_state_get_name (GST_STATE (element)), ret); + gst_element_state_get_name (GST_STATE (element)), + gst_element_state_change_return_get_name (ret)); return ret; @@ -2067,20 +2155,137 @@ gst_bin_send_event (GstElement * element, GstEvent * event) } static void -gst_bin_recalc_func (GstBin * bin, gpointer data) +gst_bin_continue_func (GstBin * bin, gpointer data) { - GST_DEBUG_OBJECT (bin, "doing state recalc"); + GstState current, next, pending, target, old_state, old_next; + GstStateChangeReturn old_ret, ret; + GstStateChange transition; + gboolean busy, post; + + GST_DEBUG_OBJECT (bin, "waiting for state lock"); GST_STATE_LOCK (bin); - gst_bin_recalc_state (bin, FALSE); + + GST_DEBUG_OBJECT (bin, "doing state continue"); + GST_OBJECT_LOCK (bin); + + old_ret = GST_STATE_RETURN (bin); + GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS; + busy = (old_ret == GST_STATE_CHANGE_ASYNC); + target = GST_STATE_TARGET (bin); + pending = GST_STATE_PENDING (bin); + + /* check if there is something to commit */ + if (pending == GST_STATE_VOID_PENDING) + goto nothing_pending; + + old_state = GST_STATE (bin); + /* this is the state we should go to next */ + old_next = GST_STATE_NEXT (bin); + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "busy %d, target %s", + busy, gst_element_state_get_name (target)); + + if (old_next == GST_STATE_PLAYING) { + post = FALSE; + } else { + post = TRUE; + } + + /* we're going to PLAYING, always do the PAUSED->PLAYING state change */ + if (target == GST_STATE_PLAYING) { + next = GST_STATE_PAUSED; + GST_STATE_PENDING (bin) = pending = target; + } else { + next = old_next; + } + + /* update current state */ + current = GST_STATE (bin) = next; + + /* see if we reached the final state */ + if (pending == current) + goto complete; + + next = GST_STATE_GET_NEXT (current, pending); + transition = (GstStateChange) GST_STATE_TRANSITION (current, next); + + GST_STATE_NEXT (bin) = next; + /* mark busy */ + GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC; + GST_OBJECT_UNLOCK (bin); + + if (post) { + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, + "committing state from %s to %s, pending %s", + gst_element_state_get_name (old_state), + gst_element_state_get_name (old_next), + gst_element_state_get_name (pending)); + + gst_element_post_message (GST_ELEMENT_CAST (bin), + gst_message_new_state_changed (GST_OBJECT_CAST (bin), + old_state, old_next, pending)); + } + + if (busy) { + GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE"); + gst_element_post_message (GST_ELEMENT_CAST (bin), + gst_message_new_async_done (GST_OBJECT_CAST (bin))); + } + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, + "continue state change %s to %s, final %s", + gst_element_state_get_name (current), + gst_element_state_get_name (next), gst_element_state_get_name (pending)); + + ret = gst_element_change_state (GST_ELEMENT_CAST (bin), transition); + +done: GST_STATE_UNLOCK (bin); - GST_DEBUG_OBJECT (bin, "state recalc done"); + GST_DEBUG_OBJECT (bin, "state continue done"); gst_object_unref (bin); + + return; + +nothing_pending: + { + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending"); + GST_OBJECT_UNLOCK (bin); + goto done; + } +complete: + { + GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING; + GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING; + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change to %s", + gst_element_state_get_name (pending)); + GST_OBJECT_UNLOCK (bin); + + /* don't post silly messages with the same state. This can happen + * when an element state is changed to what it already was. For bins + * this can be the result of a lost state, which we check with the + * previous return value. + * We do signal the cond though as a _get_state() might be blocking + * on it. */ + if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) { + gst_element_post_message (GST_ELEMENT_CAST (bin), + gst_message_new_state_changed (GST_OBJECT_CAST (bin), + old_state, old_next, GST_STATE_VOID_PENDING)); + } + + GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE"); + gst_element_post_message (GST_ELEMENT_CAST (bin), + gst_message_new_async_done (GST_OBJECT_CAST (bin))); + + GST_STATE_BROADCAST (bin); + + goto done; + } } static GstBusSyncReply bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin) { - GstBinClass *bclass; bclass = GST_BIN_GET_CLASS (bin); @@ -2092,6 +2297,139 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin) return GST_BUS_DROP; } +static void +bin_push_state_continue (GstBin * bin) +{ + GstBinClass *klass; + + /* mark the bin dirty */ + GST_OBJECT_LOCK (bin); + klass = GST_BIN_GET_CLASS (bin); + GST_DEBUG_OBJECT (bin, "pushing continue on thread pool"); + gst_object_ref (bin); + g_thread_pool_push (klass->pool, bin, NULL); + GST_OBJECT_UNLOCK (bin); +} + +/* an element started an async state change, if we were not busy with a state + * change, we perform a lost state. + */ +static void +bin_handle_async_start (GstBin * bin, GstMessage ** message) +{ + GstState old_state, new_state; + + if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE) + goto had_error; + + if (GST_STATE_PENDING (bin) != GST_STATE_VOID_PENDING) + goto was_busy; + + old_state = GST_STATE (bin); + + if (old_state > GST_STATE_PAUSED) + new_state = GST_STATE_PAUSED; + else + new_state = old_state; + + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, + "lost state of %s, new %s", gst_element_state_get_name (old_state), + gst_element_state_get_name (new_state)); + + GST_STATE (bin) = new_state; + GST_STATE_NEXT (bin) = new_state; + GST_STATE_PENDING (bin) = new_state; + GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC; + + *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin), + new_state, new_state, new_state); + + return; + +had_error: + { + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "we had an error"); + return; + } +was_busy: + { + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "state change busy"); + return; + } +} + +static void +bin_handle_async_done (GstBin * bin, GstMessage ** message) +{ + GstState pending; + GstStateChangeReturn old_ret; + GstState old_state, old_next; + GstState current, next; + + old_ret = GST_STATE_RETURN (bin); + GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS; + pending = GST_STATE_PENDING (bin); + + /* check if there is something to commit */ + if (pending == GST_STATE_VOID_PENDING) + goto nothing_pending; + + old_state = GST_STATE (bin); + /* this is the state we should go to next */ + old_next = GST_STATE_NEXT (bin); + /* update current state */ + current = GST_STATE (bin) = old_next; + + /* see if we reached the final state */ + if (pending == current) + goto complete; + + next = GST_STATE_GET_NEXT (current, pending); + + GST_STATE_NEXT (bin) = next; + /* mark busy */ + GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC; + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, + "committing state from %s to %s, pending %s", + gst_element_state_get_name (old_state), + gst_element_state_get_name (old_next), + gst_element_state_get_name (pending)); + + *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin), + old_state, old_next, pending); + + return; + +nothing_pending: + { + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending"); + return; + } +complete: + { + GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING; + GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING; + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change"); + + /* don't post silly messages with the same state. This can happen + * when an element state is changed to what it already was. For bins + * this can be the result of a lost state, which we check with the + * previous return value. + * We do signal the cond though as a _get_state() might be blocking + * on it. */ + if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) { + *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin), + old_state, old_next, GST_STATE_VOID_PENDING); + } + + GST_STATE_BROADCAST (bin); + + return; + } +} + /* handle child messages: * * This method is called synchronously when a child posts a message on @@ -2139,15 +2477,29 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin) * This message is never sent to the application but is forwarded to * the parent. * + * GST_MESSAGE_ASYNC_START: Create an internal ELEMENT message that stores + * the state of the element and the fact that the element will need a + * new base_time. + * + * GST_MESSAGE_ASYNC_DONE: Find the internal ELEMENT message we kept for the + * element when it posted ASYNC_START. If all elements are done, post a + * ASYNC_DONE message to the parent. + * * OTHER: post upwards. */ static void gst_bin_handle_message_func (GstBin * bin, GstMessage * message) { - GST_DEBUG_OBJECT (bin, "[msg %p] handling child message of type %s", - message, GST_MESSAGE_TYPE_NAME (message)); + GstObject *src; + GstMessageType type; - switch (GST_MESSAGE_TYPE (message)) { + src = GST_MESSAGE_SRC (message); + type = GST_MESSAGE_TYPE (message); + + GST_DEBUG_OBJECT (bin, "[msg %p] handling child %s message of type %s", + message, GST_ELEMENT_NAME (src), GST_MESSAGE_TYPE_NAME (message)); + + switch (type) { case GST_MESSAGE_EOS: { gboolean eos; @@ -2168,44 +2520,11 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message) } case GST_MESSAGE_STATE_DIRTY: { - GstObject *src; - GstBinClass *klass; - - src = GST_MESSAGE_SRC (message); - - GST_DEBUG_OBJECT (bin, "%s gave state dirty", GST_ELEMENT_NAME (src)); - - /* mark the bin dirty */ - GST_OBJECT_LOCK (bin); - GST_DEBUG_OBJECT (bin, "marking dirty"); - bin->state_dirty = TRUE; - - if (GST_OBJECT_PARENT (bin)) - goto not_toplevel; + GST_WARNING_OBJECT (bin, "received deprecated STATE_DIRTY message"); /* free message */ gst_message_unref (message); - - klass = GST_BIN_GET_CLASS (bin); - if (!bin->polling) { - GST_DEBUG_OBJECT (bin, "pushing recalc on thread pool"); - gst_object_ref (bin); - g_thread_pool_push (klass->pool, bin, NULL); - } else { - GST_DEBUG_OBJECT (bin, - "state recalc already in progress, not pushing new recalc"); - } - GST_OBJECT_UNLOCK (bin); break; - - /* non toplevel bins just forward the message and don't start - * a recalc themselves */ - not_toplevel: - { - GST_OBJECT_UNLOCK (bin); - GST_DEBUG_OBJECT (bin, "not toplevel, forwarding"); - goto forward; - } } case GST_MESSAGE_SEGMENT_START: GST_OBJECT_LOCK (bin); @@ -2311,6 +2630,117 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message) gst_message_unref (message); break; } + case GST_MESSAGE_ASYNC_START: + { + gboolean forward = FALSE, new_base_time; + GstState target; + GstMessage *smessage = NULL; + + GST_DEBUG_OBJECT (bin, "ASYNC_START message %p, %s", message, + GST_OBJECT_NAME (src)); + + gst_message_parse_async_start (message, &new_base_time); + + GST_OBJECT_LOCK (bin); + /* we ignore the message if we are going to <= READY */ + target = GST_STATE_TARGET (bin); + if (target <= GST_STATE_READY) + goto ignore_start_message; + + bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START); + + bin_handle_async_start (bin, &smessage); + + /* prepare an ASYNC_START message */ + if (GST_OBJECT_PARENT (bin)) { + forward = TRUE; + message = + gst_message_new_async_start (GST_OBJECT_CAST (bin), new_base_time); + } else { + message = NULL; + } + GST_OBJECT_UNLOCK (bin); + + if (smessage) + gst_element_post_message (GST_ELEMENT_CAST (bin), smessage); + + if (forward) + goto forward; + else + break; + + ignore_start_message: + { + GST_DEBUG_OBJECT (bin, "ignoring message, target %s", + gst_element_state_get_name (target)); + GST_OBJECT_UNLOCK (bin); + gst_message_unref (message); + break; + } + } + case GST_MESSAGE_ASYNC_DONE: + { + gboolean done = FALSE, toplevel = FALSE; + GstState target; + MessageFind find; + GstMessage *smessage = NULL; + + GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message, + GST_OBJECT_NAME (src)); + + GST_OBJECT_LOCK (bin); + target = GST_STATE_TARGET (bin); + /* ignore messages if we are shutting down */ + if (target <= GST_STATE_READY) + goto ignore_done_message; + + bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START); + /* if there are no more ASYNC_START messages, everybody posted + * a ASYNC_DONE and we can post one on the bus. */ + + /* we don't care who still has a pending ASYNC_START */ + find.src = NULL; + find.types = GST_MESSAGE_ASYNC_START; + + if (!g_list_find_custom (bin->messages, &find, + (GCompareFunc) message_check)) { + /* nothing found, remove all old ASYNC_DONE messages */ + bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE); + done = TRUE; + toplevel = GST_OBJECT_PARENT (bin) == NULL; + if (!toplevel) + bin_handle_async_done (bin, &smessage); + } + GST_OBJECT_UNLOCK (bin); + + if (smessage) { + GST_DEBUG_OBJECT (bin, "posting state change message"); + gst_element_post_message (GST_ELEMENT_CAST (bin), smessage); + } + if (done) { + if (!toplevel) { + GST_DEBUG_OBJECT (bin, + "all async-done, posting ASYNC_DONE to parent"); + /* post our combined ASYNC_DONE when all is ASYNC_DONE. */ + gst_element_post_message (GST_ELEMENT_CAST (bin), + gst_message_new_async_done (GST_OBJECT_CAST (bin))); + } else { + /* toplevel, start continue state */ + GST_DEBUG_OBJECT (bin, "all async-done, starting state continue"); + bin_push_state_continue (bin); + } + } + break; + + ignore_done_message: + { + GST_DEBUG_OBJECT (bin, "ignoring message, target %s", + gst_element_state_get_name (target)); + GST_OBJECT_UNLOCK (bin); + gst_message_unref (message); + break; + } + } default: goto forward; } diff --git a/gst/gstelement.c b/gst/gstelement.c index be458ca5b4..30880f85d9 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -82,6 +82,7 @@ #include #include "gstelement.h" +#include "gstenumtypes.h" #include "gstbus.h" #include "gstmarshal.h" #include "gsterror.h" @@ -118,8 +119,6 @@ static void gst_element_base_class_finalize (gpointer g_class); static void gst_element_dispose (GObject * object); static void gst_element_finalize (GObject * object); -static GstStateChangeReturn gst_element_change_state (GstElement * element, - GstStateChange transition); static GstStateChangeReturn gst_element_change_state_func (GstElement * element, GstStateChange transition); static GstStateChangeReturn gst_element_get_state_func (GstElement * element, @@ -254,6 +253,7 @@ static void gst_element_init (GstElement * element) { GST_STATE (element) = GST_STATE_NULL; + GST_STATE_TARGET (element) = GST_STATE_NULL; GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING; GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING; GST_STATE_RETURN (element) = GST_STATE_CHANGE_SUCCESS; @@ -1295,6 +1295,7 @@ gst_element_send_event (GstElement * element, GstEvent * event) oclass = GST_ELEMENT_GET_CLASS (element); + GST_STATE_LOCK (element); if (oclass->send_event) { GST_CAT_DEBUG (GST_CAT_ELEMENT_PADS, "send %s event on element %s", GST_EVENT_TYPE_NAME (event), GST_ELEMENT_NAME (element)); @@ -1302,6 +1303,8 @@ gst_element_send_event (GstElement * element, GstEvent * event) } else { result = gst_element_default_send_event (element, event); } + GST_STATE_UNLOCK (element); + return result; } @@ -1762,6 +1765,9 @@ gst_element_get_state_func (GstElement * element, GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s", gst_element_state_change_return_get_name (ret)); + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s", + gst_element_state_change_return_get_name (ret)); + /* we got an error, report immediatly */ if (ret == GST_STATE_CHANGE_FAILURE) goto done; @@ -1984,14 +1990,14 @@ nothing_aborted: GstStateChangeReturn gst_element_continue_state (GstElement * element, GstStateChangeReturn ret) { - GstState pending; - GstState old_ret, old_state, old_next; - GstState current, next; + GstStateChangeReturn old_ret; + GstState old_state, old_next; + GstState current, next, pending; GstMessage *message; GstStateChange transition; GST_OBJECT_LOCK (element); - old_ret = (GstState) GST_STATE_RETURN (element); + old_ret = GST_STATE_RETURN (element); GST_STATE_RETURN (element) = ret; pending = GST_STATE_PENDING (element); @@ -2047,7 +2053,8 @@ complete: GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING; GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING; - GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "completed state change"); + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "completed state change to %s", gst_element_state_get_name (pending)); GST_OBJECT_UNLOCK (element); /* don't post silly messages with the same state. This can happen @@ -2076,10 +2083,16 @@ complete: * element is copied to the pending state so that any call to * gst_element_get_state() will return %GST_STATE_CHANGE_ASYNC. * + * An ASYNC_START message is posted with an indication to distribute a new + * base_time to the element. + * If the element was PLAYING, it will go to PAUSED. The element + * will be restored to its PLAYING state by the parent pipeline when it + * prerolls again. + * * This is mostly used for elements that lost their preroll buffer - * in the %GST_STATE_PAUSED state after a flush, they become %GST_STATE_PAUSED - * again if a new preroll buffer is queued. - * This function can only be called when the element is currently + * in the %GST_STATE_PAUSED or %GST_STATE_PLAYING state after a flush, + * they will go to their pending state again when a new preroll buffer is + * queued. This function can only be called when the element is currently * not in error or an async state change. * * This function is used internally and should normally not be called from @@ -2090,7 +2103,7 @@ complete: void gst_element_lost_state (GstElement * element) { - GstState current_state; + GstState old_state, new_state; GstMessage *message; g_return_if_fail (GST_IS_ELEMENT (element)); @@ -2100,22 +2113,31 @@ gst_element_lost_state (GstElement * element) GST_STATE_RETURN (element) == GST_STATE_CHANGE_FAILURE) goto nothing_lost; - current_state = GST_STATE (element); + old_state = GST_STATE (element); + + /* when we were PLAYING, the new state is PAUSED. We will also not + * automatically go to PLAYING but let the parent bin(s) set us to PLAYING + * when we preroll. */ + if (old_state > GST_STATE_PAUSED) + new_state = GST_STATE_PAUSED; + else + new_state = old_state; GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, - "lost state of %s", gst_element_state_get_name (current_state)); + "lost state of %s to %s", gst_element_state_get_name (old_state), + gst_element_state_get_name (new_state)); - GST_STATE_NEXT (element) = current_state; - GST_STATE_PENDING (element) = current_state; + GST_STATE (element) = new_state; + GST_STATE_NEXT (element) = new_state; + GST_STATE_PENDING (element) = new_state; GST_STATE_RETURN (element) = GST_STATE_CHANGE_ASYNC; GST_OBJECT_UNLOCK (element); message = gst_message_new_state_changed (GST_OBJECT_CAST (element), - current_state, current_state, current_state); + new_state, new_state, new_state); gst_element_post_message (element, message); - /* and mark us dirty */ - message = gst_message_new_state_dirty (GST_OBJECT_CAST (element)); + message = gst_message_new_async_start (GST_OBJECT_CAST (element), TRUE); gst_element_post_message (element, message); return; @@ -2200,7 +2222,9 @@ gst_element_set_state_func (GstElement * element, GstState state) /* increment state cookie so that we can track each state change */ element->state_cookie++; - /* this is the (new) state we should go to */ + /* this is the (new) state we should go to. TARGET is the last state we set on + * the element. */ + GST_STATE_TARGET (element) = state; GST_STATE_PENDING (element) = state; GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, @@ -2271,8 +2295,19 @@ was_busy: } } -/* with STATE_LOCK */ -static GstStateChangeReturn +/** + * gst_element_change_state: + * @element: a #GstElement + * @transition: the requested transition + * + * Perform @transition on @element. + * + * This function must be called with STATE_LOCK held and is mainly used + * internally. + * + * Returns: the #GstStateChangeReturn of the state transition. + */ +GstStateChangeReturn gst_element_change_state (GstElement * element, GstStateChange transition) { GstElementClass *oclass; @@ -2300,22 +2335,26 @@ gst_element_change_state (GstElement * element, GstStateChange transition) gst_element_abort_state (element); break; case GST_STATE_CHANGE_ASYNC: + { + GstState target; + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "element will change state ASYNC"); - /* if we go upwards, we give the app a change to wait for - * completion */ - if (current < next) + target = GST_STATE_TARGET (element); + + if (target > GST_STATE_READY) goto async; /* else we just continue the state change downwards */ GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, - "forcing commit state %s < %s", - gst_element_state_get_name (current), - gst_element_state_get_name (next)); + "forcing commit state %s <= %s", + gst_element_state_get_name (target), + gst_element_state_get_name (GST_STATE_READY)); ret = gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS); break; + } case GST_STATE_CHANGE_SUCCESS: GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "element changed state SUCCESS"); diff --git a/gst/gstelement.h b/gst/gstelement.h index 9aad5e5151..d5181acb1f 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -119,6 +119,16 @@ typedef enum { */ #define GST_STATE_PENDING(elem) (GST_ELEMENT_CAST(elem)->pending_state) +/** + * GST_STATE_TARGET: + * @elem: a #GstElement to return the target state for. + * + * This macro returns the target #GstState of the element. + * + * Since: 0.10.13 + */ +#define GST_STATE_TARGET(elem) (GST_ELEMENT_CAST(elem)->abidata.ABI.target_state) + /** * GST_STATE_RETURN: * @elem: a #GstElement to return the last state result for. @@ -427,7 +437,14 @@ struct _GstElement guint32 pads_cookie; /*< private >*/ - gpointer _gst_reserved[GST_PADDING]; + union { + struct { + /* state set by application */ + GstState target_state; + } ABI; + /* adding + 0 to mark ABI change to be undone later */ + gpointer _gst_reserved[GST_PADDING + 0]; + } abidata; }; /** @@ -626,6 +643,8 @@ GstStateChangeReturn gst_element_get_state (GstElement * element, GstStateChangeReturn gst_element_set_state (GstElement *element, GstState state); void gst_element_abort_state (GstElement * element); +GstStateChangeReturn gst_element_change_state (GstElement * element, + GstStateChange transition); GstStateChangeReturn gst_element_continue_state (GstElement * element, GstStateChangeReturn ret); void gst_element_lost_state (GstElement * element); diff --git a/gst/gstpipeline.c b/gst/gstpipeline.c index a28f5ec609..215078eed7 100644 --- a/gst/gstpipeline.c +++ b/gst/gstpipeline.c @@ -120,6 +120,8 @@ struct _GstPipelinePrivate { /* with LOCK */ gboolean auto_flush_bus; + + GstClockTime new_stream_time; }; @@ -133,13 +135,12 @@ static void gst_pipeline_set_property (GObject * object, guint prop_id, static void gst_pipeline_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static gboolean gst_pipeline_send_event (GstElement * element, - GstEvent * event); - static GstClock *gst_pipeline_provide_clock_func (GstElement * element); static GstStateChangeReturn gst_pipeline_change_state (GstElement * element, GstStateChange transition); +static void gst_pipeline_handle_message (GstBin * bin, GstMessage * message); + static GstBinClass *parent_class = NULL; /* static guint gst_pipeline_signals[LAST_SIGNAL] = { 0 }; */ @@ -185,6 +186,7 @@ gst_pipeline_class_init (gpointer g_class, gpointer class_data) { GObjectClass *gobject_class = G_OBJECT_CLASS (g_class); GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); + GstBinClass *gstbin_class = GST_BIN_CLASS (g_class); GstPipelineClass *klass = GST_PIPELINE_CLASS (g_class); parent_class = g_type_class_peek_parent (klass); @@ -226,11 +228,13 @@ gst_pipeline_class_init (gpointer g_class, gpointer class_data) gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pipeline_dispose); - gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_pipeline_send_event); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_pipeline_change_state); gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_pipeline_provide_clock_func); + + gstbin_class->handle_message = + GST_DEBUG_FUNCPTR (gst_pipeline_handle_message); } static void @@ -311,90 +315,19 @@ gst_pipeline_get_property (GObject * object, guint prop_id, } } -/* default pipeline seeking code: - * - * If the pipeline is PLAYING and a flushing seek is done, set - * the pipeline to PAUSED before doing the seek. - * - * A flushing seek also resets the stream time to 0 so that when - * we go back to PLAYING after the seek, the base_time is recalculated - * and redistributed to the elements. - */ -static gboolean -do_pipeline_seek (GstElement * element, GstEvent * event) +/* set the stream time to 0 */ +static void +reset_stream_time (GstPipeline * pipeline) { - gdouble rate; - GstSeekFlags flags; - gboolean flush; - gboolean was_playing = FALSE; - gboolean res; - - /* we are only interested in the FLUSH flag of the seek event. */ - gst_event_parse_seek (event, &rate, NULL, &flags, NULL, NULL, NULL, NULL); - - flush = flags & GST_SEEK_FLAG_FLUSH; - - /* if flushing seek, get the current state */ - if (flush) { - GstState state; - - /* need to call _get_state() since a bin state is only updated - * with this call. */ - gst_element_get_state (element, &state, NULL, 0); - was_playing = (state == GST_STATE_PLAYING); - - if (was_playing) { - /* and PAUSE when the pipeline was PLAYING, we don't need - * to wait for the state change to complete since we are going - * to flush out any preroll sample anyway. */ - gst_element_set_state (element, GST_STATE_PAUSED); - } + GST_OBJECT_LOCK (pipeline); + if (pipeline->stream_time != GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (pipeline, "reset stream_time to 0"); + pipeline->stream_time = 0; + pipeline->priv->new_stream_time = TRUE; + } else { + GST_DEBUG_OBJECT (pipeline, "application asked to not reset stream_time"); } - - /* let parent class implement the seek behaviour */ - res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event); - - /* if flushing seek restore previous state */ - if (flush) { - gboolean need_reset; - - GST_OBJECT_LOCK (element); - need_reset = GST_PIPELINE (element)->stream_time != GST_CLOCK_TIME_NONE; - GST_OBJECT_UNLOCK (element); - - /* need to reset the stream time to 0 after a successfull flushing seek, - * unless the user explicitly disabled this behavior by setting stream - * time to NONE */ - if (need_reset && res) - gst_pipeline_set_new_stream_time (GST_PIPELINE (element), 0); - - if (was_playing) - /* and continue playing, this might return ASYNC in which case the - * application can wait for the PREROLL to complete after the seek. - */ - gst_element_set_state (element, GST_STATE_PLAYING); - } - return res; -} - -static gboolean -gst_pipeline_send_event (GstElement * element, GstEvent * event) -{ - gboolean res; - GstEventType event_type = GST_EVENT_TYPE (event); - - switch (event_type) { - case GST_EVENT_SEEK: - /* do the default seek handling */ - res = do_pipeline_seek (element, event); - break; - default: - /* else parent implements the defaults */ - res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event); - break; - } - - return res; + GST_OBJECT_UNLOCK (pipeline); } /** @@ -433,8 +366,11 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PAUSED_TO_PLAYING: { GstClockTime new_base_time; + GstQuery *query; + GstClockTime min_latency, max_latency; GstClockTime start_time, stream_time, delay; - gboolean new_clock; + gboolean new_clock, update; + gboolean res; GST_DEBUG_OBJECT (element, "selecting clock and base_time"); @@ -452,6 +388,8 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition) GST_OBJECT_LOCK (element); new_clock = element->clock != clock; stream_time = pipeline->stream_time; + update = pipeline->priv->new_stream_time; + pipeline->priv->new_stream_time = FALSE; delay = pipeline->delay; GST_OBJECT_UNLOCK (element); @@ -468,25 +406,68 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition) gst_message_new_new_clock (GST_OBJECT_CAST (element), clock)); } - if (stream_time != GST_CLOCK_TIME_NONE - && start_time != GST_CLOCK_TIME_NONE) { - new_base_time = start_time - stream_time + delay; - GST_DEBUG_OBJECT (element, - "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT - ", base_time %" GST_TIME_FORMAT, - GST_TIME_ARGS (stream_time), GST_TIME_ARGS (start_time), - GST_TIME_ARGS (new_base_time)); - } else - new_base_time = GST_CLOCK_TIME_NONE; - if (clock) gst_object_unref (clock); - if (new_base_time != GST_CLOCK_TIME_NONE) - gst_element_set_base_time (element, new_base_time); - else + /* stream time changed, either with a PAUSED or a flush, we need to update + * the base time */ + if (update) { + GST_DEBUG_OBJECT (pipeline, "stream_time changed, updating base time"); + + if (stream_time != GST_CLOCK_TIME_NONE + && start_time != GST_CLOCK_TIME_NONE) { + new_base_time = start_time - stream_time + delay; + GST_DEBUG_OBJECT (element, + "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT + ", base_time %" GST_TIME_FORMAT, + GST_TIME_ARGS (stream_time), GST_TIME_ARGS (start_time), + GST_TIME_ARGS (new_base_time)); + } else + new_base_time = GST_CLOCK_TIME_NONE; + + if (new_base_time != GST_CLOCK_TIME_NONE) + gst_element_set_base_time (element, new_base_time); + else + GST_DEBUG_OBJECT (pipeline, + "NOT adjusting base_time because stream_time is NONE"); + } else { GST_DEBUG_OBJECT (pipeline, - "NOT adjusting base time because stream time is NONE"); + "NOT adjusting base_time because we selected one before"); + } + + /* determine latency in this pipeline */ + GST_DEBUG_OBJECT (element, "querying pipeline latency"); + query = gst_query_new_latency (); + if (gst_element_query (element, query)) { + gboolean live; + + gst_query_parse_latency (query, &live, &min_latency, &max_latency); + + GST_DEBUG_OBJECT (element, + "configuring min latency %" GST_TIME_FORMAT ", max latency %" + GST_TIME_FORMAT ", live %d", GST_TIME_ARGS (min_latency), + GST_TIME_ARGS (max_latency), live); + + /* configure latency on elements */ + res = + gst_element_send_event (element, + gst_event_new_latency (min_latency)); + if (res) { + GST_INFO_OBJECT (element, "configured latency of %" GST_TIME_FORMAT, + GST_TIME_ARGS (min_latency)); + } else { + GST_WARNING_OBJECT (element, + "failed to configure latency of %" GST_TIME_FORMAT, + GST_TIME_ARGS (min_latency)); + GST_ELEMENT_WARNING (element, CORE, CLOCK, (NULL), + ("Failed to configure latency of %" GST_TIME_FORMAT, + GST_TIME_ARGS (min_latency))); + } + } else { + /* this is not a real problem, we just don't configure any latency. */ + GST_WARNING_OBJECT (element, "failed to query pipeline latency"); + } + gst_query_unref (query); break; } case GST_STATE_CHANGE_PLAYING_TO_PAUSED: @@ -503,17 +484,7 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_READY_TO_PAUSED: { - gboolean need_reset; - - /* only reset the stream_time when the application did not - * specify a stream_time explicitly */ - GST_OBJECT_LOCK (element); - need_reset = pipeline->stream_time != GST_CLOCK_TIME_NONE; - GST_OBJECT_UNLOCK (element); - - if (need_reset) - gst_pipeline_set_new_stream_time (pipeline, 0); - + reset_stream_time (pipeline); break; } case GST_STATE_CHANGE_PAUSED_TO_PLAYING: @@ -532,8 +503,11 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition) GST_OBJECT_LOCK (element); /* store the current stream time */ - if (pipeline->stream_time != GST_CLOCK_TIME_NONE) + if (pipeline->stream_time != GST_CLOCK_TIME_NONE) { pipeline->stream_time = now - element->base_time; + pipeline->priv->new_stream_time = TRUE; + } + GST_DEBUG_OBJECT (element, "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT, @@ -574,6 +548,32 @@ invalid_clock: } } +static void +gst_pipeline_handle_message (GstBin * bin, GstMessage * message) +{ + GstPipeline *pipeline = GST_PIPELINE_CAST (bin); + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ASYNC_START: + { + gboolean new_base_time; + + gst_message_parse_async_start (message, &new_base_time); + + /* reset our stream time if we need to distribute a new base_time to the + * children. */ + if (new_base_time) + reset_stream_time (pipeline); + + break; + } + default: + break; + } + GST_BIN_CLASS (parent_class)->handle_message (bin, message); +} + + /** * gst_pipeline_get_bus: * @pipeline: a #GstPipeline @@ -614,6 +614,7 @@ gst_pipeline_set_new_stream_time (GstPipeline * pipeline, GstClockTime time) GST_OBJECT_LOCK (pipeline); pipeline->stream_time = time; + pipeline->priv->new_stream_time = TRUE; GST_OBJECT_UNLOCK (pipeline); GST_DEBUG_OBJECT (pipeline, "set new stream_time to %" GST_TIME_FORMAT, diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index ca9da77ab7..3ac30de8f9 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -192,6 +192,8 @@ struct _GstBaseSinkPrivate /* latency stuff */ GstClockTime latency; + + gboolean commited; }; #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size)) @@ -888,9 +890,10 @@ gst_base_sink_commit_state (GstBaseSink * basesink) { /* commit state and proceed to next pending state */ GstState current, next, pending, post_pending; - GstMessage *message; gboolean post_paused = FALSE; + gboolean post_async_done = FALSE; gboolean post_playing = FALSE; + gboolean sync; /* we are certainly not playing async anymore now */ basesink->playing_async = FALSE; @@ -900,6 +903,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink) next = GST_STATE_NEXT (basesink); pending = GST_STATE_PENDING (basesink); post_pending = pending; + sync = basesink->sync; switch (pending) { case GST_STATE_PLAYING: @@ -912,6 +916,8 @@ gst_base_sink_commit_state (GstBaseSink * basesink) GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING"); basesink->need_preroll = FALSE; + post_async_done = TRUE; + basesink->priv->commited = TRUE; post_playing = TRUE; /* post PAUSED too when we were READY */ if (current == GST_STATE_READY) { @@ -929,6 +935,8 @@ gst_base_sink_commit_state (GstBaseSink * basesink) case GST_STATE_PAUSED: GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED"); post_paused = TRUE; + post_async_done = TRUE; + basesink->priv->commited = TRUE; post_pending = GST_STATE_VOID_PENDING; break; case GST_STATE_READY: @@ -947,19 +955,18 @@ gst_base_sink_commit_state (GstBaseSink * basesink) GST_OBJECT_UNLOCK (basesink); if (post_paused) { - message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), - current, next, post_pending); - gst_element_post_message (GST_ELEMENT_CAST (basesink), message); + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + current, next, post_pending)); + } + if (post_async_done) { + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_async_done (GST_OBJECT_CAST (basesink))); } if (post_playing) { - message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), - next, pending, GST_STATE_VOID_PENDING); - gst_element_post_message (GST_ELEMENT_CAST (basesink), message); - } - /* and mark dirty */ - if (post_paused || post_playing) { gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_state_dirty (GST_OBJECT_CAST (basesink))); + gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + next, pending, GST_STATE_VOID_PENDING)); } GST_STATE_BROADCAST (basesink); @@ -2724,6 +2731,13 @@ gst_base_sink_query (GstElement * element, GstQuery * query) gboolean live, us_live; GstClockTime min, max; + GST_PAD_PREROLL_LOCK (basesink->sinkpad); + if (!gst_base_sink_is_prerolled (basesink)) { + GST_DEBUG_OBJECT (basesink, "not prerolled yet, can't report latency"); + res = FALSE; + goto done; + } + if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min, &max))) { @@ -2737,6 +2751,8 @@ gst_base_sink_query (GstElement * element, GstQuery * query) } gst_query_set_latency (query, live, min, max); } + done: + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; } case GST_QUERY_JITTER: @@ -2780,6 +2796,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED: /* need to complete preroll before this state change completes, there * is no data flow in READY so we can safely assume we need to preroll. */ + GST_PAD_PREROLL_LOCK (basesink->sinkpad); GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll"); gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); gst_segment_init (basesink->abidata.ABI.clip_segment, @@ -2795,7 +2812,11 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) basesink->priv->latency = 0; basesink->eos = FALSE; gst_base_sink_reset_qos (basesink); + basesink->priv->commited = FALSE; ret = GST_STATE_CHANGE_ASYNC; + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE)); + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: GST_PAD_PREROLL_LOCK (basesink->sinkpad); @@ -2817,7 +2838,10 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll"); basesink->need_preroll = TRUE; basesink->playing_async = TRUE; + basesink->priv->commited = FALSE; ret = GST_STATE_CHANGE_ASYNC; + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE)); } GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; @@ -2857,7 +2881,10 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) } else { GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll"); basesink->playing_async = TRUE; + basesink->priv->commited = FALSE; ret = GST_STATE_CHANGE_ASYNC; + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE)); } GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT ", dropped: %" G_GUINT64_FORMAT, basesink->priv->rendered, @@ -2867,8 +2894,24 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; case GST_STATE_CHANGE_PAUSED_TO_READY: + GST_PAD_PREROLL_LOCK (basesink->sinkpad); + if (!basesink->priv->commited) { + GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done"); + + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY)); + + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_async_done (GST_OBJECT_CAST (basesink))); + + basesink->priv->commited = TRUE; + } else { + GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll"); + } basesink->priv->current_sstart = 0; basesink->priv->current_sstop = 0; + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; case GST_STATE_CHANGE_READY_TO_NULL: if (bclass->stop) diff --git a/tests/check/generic/sinks.c b/tests/check/generic/sinks.c index 7cf5bea4c9..bacfd7f014 100644 --- a/tests/check/generic/sinks.c +++ b/tests/check/generic/sinks.c @@ -313,7 +313,7 @@ GST_START_TEST (test_livesrc_sink) if (n_sink == 2) { fail_unless (old == GST_STATE_READY, "unexpected old"); fail_unless (new == GST_STATE_PAUSED, "unexpected new"); - fail_unless (pending == GST_STATE_PLAYING, "unexpected pending"); + fail_unless (pending == GST_STATE_VOID_PENDING, "unexpected pending"); } else if (n_sink == 1) { fail_unless (old == GST_STATE_PAUSED, "unexpected old"); fail_unless (new == GST_STATE_PLAYING, "unexpected new"); diff --git a/tests/check/gst/gstbin.c b/tests/check/gst/gstbin.c index 61612b39ae..4843717e5f 100644 --- a/tests/check/gst/gstbin.c +++ b/tests/check/gst/gstbin.c @@ -22,6 +22,21 @@ #include +static void +pop_async_done (GstBus * bus) +{ + GstMessage *message; + + GST_DEBUG ("popping async-done message"); + message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1); + + fail_unless (message && GST_MESSAGE_TYPE (message) + == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE"); + + gst_message_unref (message); + GST_DEBUG ("popped message"); +} + static void pop_messages (GstBus * bus, int count) { @@ -277,7 +292,7 @@ GST_START_TEST (test_message_state_changed_children) fail_unless (pending == GST_STATE_VOID_PENDING); /* wait for async thread to settle down */ - while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 2) + while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 3) THREAD_SWITCH (); /* each object is referenced by a message; @@ -291,6 +306,7 @@ GST_START_TEST (test_message_state_changed_children) ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 2, 3); pop_messages (bus, 3); + pop_async_done (bus); fail_if ((gst_bus_pop (bus)) != NULL); ASSERT_OBJECT_REFCOUNT (bus, "bus", 2); @@ -390,6 +406,7 @@ GST_START_TEST (test_watch_for_state_change) fail_unless (ret == GST_STATE_CHANGE_SUCCESS); pop_messages (bus, 6); + pop_async_done (bus); fail_unless (gst_bus_have_pending (bus) == FALSE, "Unexpected messages on bus"); @@ -400,10 +417,12 @@ GST_START_TEST (test_watch_for_state_change) pop_messages (bus, 3); /* this one might return either SUCCESS or ASYNC, likely SUCCESS */ - gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED); + ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED); gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE); pop_messages (bus, 3); + if (ret == GST_STATE_CHANGE_ASYNC) + pop_async_done (bus); fail_unless (gst_bus_have_pending (bus) == FALSE, "Unexpected messages on bus"); @@ -521,6 +540,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink) src = gst_element_factory_make ("fakesrc", NULL); fail_if (src == NULL, "Could not create fakesrc"); + g_object_set (src, "num-buffers", 5, NULL); identity = gst_element_factory_make ("identity", NULL); fail_if (identity == NULL, "Could not create identity"); @@ -552,6 +572,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink) /* READY => PAUSED */ /* because of pre-rolling, sink will return ASYNC on state * change and change state later when it has a buffer */ + GST_DEBUG ("popping READY -> PAUSED messages"); ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED, 105); #if 0 @@ -561,20 +582,21 @@ GST_START_TEST (test_children_state_change_order_flagged_sink) * in which case the sink will commit its new state before the source ... */ ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 106); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107); +#else + + pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */ ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED, 108); - + pop_async_done (bus); +#endif /* PAUSED => PLAYING */ + GST_DEBUG ("popping PAUSED -> PLAYING messages"); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 109); ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING, 110); ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 111); ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING, 112); -#else - pop_messages (bus, 3); /* pop remaining ready => paused messages off the bus */ - pop_messages (bus, 4); /* pop paused => playing messages off the bus */ -#endif /* don't set to NULL that will set the bus flushing and kill our messages */ ret = gst_element_set_state (pipeline, GST_STATE_READY); @@ -656,6 +678,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink) /* READY => PAUSED */ /* because of pre-rolling, sink will return ASYNC on state * change and change state later when it has a buffer */ + GST_DEBUG ("popping READY -> PAUSED messages"); ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED, 205); #if 0 @@ -665,19 +688,20 @@ GST_START_TEST (test_children_state_change_order_semi_sink) * in which case the sink will commit its new state before the source ... */ ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207); +#else + pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */ ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED, 208); + pop_async_done (bus); /* PAUSED => PLAYING */ + GST_DEBUG ("popping PAUSED -> PLAYING messages"); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 209); ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING, 210); ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 211); ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING, 212); -#else - pop_messages (bus, 3); /* pop remaining ready => paused messages off the bus */ - pop_messages (bus, 4); /* pop paused => playing messages off the bus */ #endif /* don't set to NULL that will set the bus flushing and kill our messages */ diff --git a/tests/check/pipelines/cleanup.c b/tests/check/pipelines/cleanup.c index ca7f19a6a0..eeafe907ce 100644 --- a/tests/check/pipelines/cleanup.c +++ b/tests/check/pipelines/cleanup.c @@ -88,8 +88,9 @@ GST_START_TEST (test_pipeline_unref) sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink"); fail_if (sink == NULL); - run_pipeline (pipeline, s, GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, - GST_MESSAGE_EOS); + run_pipeline (pipeline, s, + GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS); while (GST_OBJECT_REFCOUNT_VALUE (src) > 1) THREAD_SWITCH (); ASSERT_OBJECT_REFCOUNT (src, "src", 1); diff --git a/tests/check/pipelines/simple-launch-lines.c b/tests/check/pipelines/simple-launch-lines.c index 630637bf3a..8c1306d172 100644 --- a/tests/check/pipelines/simple-launch-lines.c +++ b/tests/check/pipelines/simple-launch-lines.c @@ -97,24 +97,28 @@ GST_START_TEST (test_2_elements) s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=true"; run_pipeline (setup_pipeline (s), s, - GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN); + GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN); s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false"; run_pipeline (setup_pipeline (s), s, - GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN); + GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN); s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true"; run_pipeline (setup_pipeline (s), s, - GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS); + GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS); s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false"; run_pipeline (setup_pipeline (s), s, - GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS); + GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS); s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false"; ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s, - GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, - GST_MESSAGE_UNKNOWN)); + GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN)); } GST_END_TEST;