docs/gst/gstreamer-sections.txt: Add new element field and method.

Original commit message from CVS:
* docs/gst/gstreamer-sections.txt:
Add new element field and method.
* gst/gstbin.c: (gst_bin_class_init), (gst_bin_init),
(bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func),
(gst_bin_recalc_state), (gst_bin_get_state_func),
(gst_bin_element_set_state), (gst_bin_change_state_func),
(gst_bin_continue_func), (bin_bus_handler),
(bin_push_state_continue), (bin_handle_async_start),
(bin_handle_async_done), (gst_bin_handle_message_func):
Make async state changes a bit smarter by using new ASYNC_START and
ASYNC_DONE messages. This reduces the number of times we run the state
recalculation thread.
Don't change state of element with a pending ASYNC_START message.
Deprecate STATE_DIRTY messages.
* gst/gstelement.c: (gst_element_init), (gst_element_send_event),
(gst_element_get_state_func), (gst_element_continue_state),
(gst_element_lost_state), (gst_element_set_state_func),
(gst_element_change_state):
* gst/gstelement.h:
Keep the state that was last set by the app in a new element field.
Don't allow state changes when handling an element event.
Post ASYNC_START and ASYNC_DONE messages.
Change lost_state so that we go to PAUSED and wait for the parent to set
us to PLAYING again (so latency calculation can be performed)
Export gst_element_change_state() method so that subclasses can use it.
API: gst_element_change_state()
API: GST_STATE_TARGET
* gst/gstpipeline.c: (gst_pipeline_class_init),
(reset_stream_time), (gst_pipeline_change_state),
(gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time):
Using the new ASYNC_START message we can reset the base_time when
needed. This can then be used to implement base_time redistribution in
flushing seeks so that we can remove the explicit seek handling.
Perform latency query and configuration when going to PLAYING.
* libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state),
(gst_base_sink_query), (gst_base_sink_change_state):
Post new ASYNC_START/ASYNC_DONE messages.
* tests/check/generic/sinks.c: (GST_START_TEST):
Fix test because the bin will not set the async element to PLAYING right
away.
* tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST):
Make the message check a little stronger.
Handle ASYNC messages.
* tests/check/pipelines/cleanup.c: (GST_START_TEST):
* tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST):
Expect ASYNC_DONE messages.
This commit is contained in:
Wim Taymans 2007-03-19 10:47:56 +00:00
parent d66263996d
commit d14c4c4a67
11 changed files with 879 additions and 260 deletions

View file

@ -1,3 +1,59 @@
2007-03-19 Wim Taymans <wim@fluendo.com>
* docs/gst/gstreamer-sections.txt:
Add new element field and method.
* gst/gstbin.c: (gst_bin_class_init), (gst_bin_init),
(bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func),
(gst_bin_recalc_state), (gst_bin_get_state_func),
(gst_bin_element_set_state), (gst_bin_change_state_func),
(gst_bin_continue_func), (bin_bus_handler),
(bin_push_state_continue), (bin_handle_async_start),
(bin_handle_async_done), (gst_bin_handle_message_func):
Make async state changes a bit smarter by using new ASYNC_START and
ASYNC_DONE messages. This reduces the number of times we run the state
recalculation thread.
Don't change state of element with a pending ASYNC_START message.
Deprecate STATE_DIRTY messages.
* gst/gstelement.c: (gst_element_init), (gst_element_send_event),
(gst_element_get_state_func), (gst_element_continue_state),
(gst_element_lost_state), (gst_element_set_state_func),
(gst_element_change_state):
* gst/gstelement.h:
Keep the state that was last set by the app in a new element field.
Don't allow state changes when handling an element event.
Post ASYNC_START and ASYNC_DONE messages.
Change lost_state so that we go to PAUSED and wait for the parent to set
us to PLAYING again (so latency calculation can be performed)
Export gst_element_change_state() method so that subclasses can use it.
API: gst_element_change_state()
API: GST_STATE_TARGET
* gst/gstpipeline.c: (gst_pipeline_class_init),
(reset_stream_time), (gst_pipeline_change_state),
(gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time):
Using the new ASYNC_START message we can reset the base_time when
needed. This can then be used to implement base_time redistribution in
flushing seeks so that we can remove the explicit seek handling.
Perform latency query and configuration when going to PLAYING.
* libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state),
(gst_base_sink_query), (gst_base_sink_change_state):
Post new ASYNC_START/ASYNC_DONE messages.
* tests/check/generic/sinks.c: (GST_START_TEST):
Fix test because the bin will not set the async element to PLAYING right
away.
* tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST):
Make the message check a little stronger.
Handle ASYNC messages.
* tests/check/pipelines/cleanup.c: (GST_START_TEST):
* tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST):
Expect ASYNC_DONE messages.
2007-03-19 Wim Taymans <wim@fluendo.com>
* docs/gst/gstreamer-sections.txt:

View file

@ -419,6 +419,7 @@ GST_STATE_GET_NEXT
GST_STATE_NEXT
GST_STATE_PENDING
GST_STATE_RETURN
GST_STATE_TARGET
GST_STATE_TRANSITION
GST_STATE_TRANSITION_CURRENT
GST_STATE_TRANSITION_NEXT
@ -500,6 +501,7 @@ gst_element_lost_state
gst_element_state_get_name
gst_element_state_change_return_get_name
gst_element_sync_state_with_parent
gst_element_change_state
<SUBSECTION element-tags>
gst_element_found_tags

View file

@ -187,11 +187,12 @@ GST_ELEMENT_DETAILS ("Generic bin",
static void gst_bin_dispose (GObject * object);
static void gst_bin_recalc_state (GstBin * bin, gboolean force);
static GstStateChangeReturn gst_bin_change_state_func (GstElement * element,
GstStateChange transition);
static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
GstState * state, GstState * pending, GstClockTime timeout);
static void bin_handle_async_done (GstBin * bin, GstMessage ** message);
static void bin_push_state_continue (GstBin * bin);
static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
static gboolean gst_bin_remove_func (GstBin * bin, GstElement * element);
@ -215,7 +216,7 @@ static void gst_bin_restore_thyself (GstObject * object, xmlNodePtr self);
static void bin_remove_messages (GstBin * bin, GstObject * src,
GstMessageType types);
static void gst_bin_recalc_func (GstBin * child, gpointer data);
static void gst_bin_continue_func (GstBin * child, gpointer data);
static gint bin_element_is_sink (GstElement * child, GstBin * bin);
static gint bin_element_is_src (GstElement * child, GstBin * bin);
@ -398,7 +399,7 @@ gst_bin_class_init (GstBinClass * klass)
GST_DEBUG ("creating bin thread pool");
err = NULL;
klass->pool =
g_thread_pool_new ((GFunc) gst_bin_recalc_func, NULL, -1, FALSE, &err);
g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
if (err != NULL) {
g_critical ("could alloc threadpool %s", err->message);
}
@ -413,9 +414,8 @@ gst_bin_init (GstBin * bin)
bin->children = NULL;
bin->children_cookie = 0;
bin->messages = NULL;
bin->polling = FALSE;
bin->state_dirty = FALSE;
bin->provided_clock = NULL;
bin->state_dirty = FALSE;
bin->clock_dirty = FALSE;
/* Set up a bus for listening to child elements */
@ -680,12 +680,13 @@ bin_remove_messages (GstBin * bin, GstObject * src, GstMessageType types)
if (message_check (message, &find) == 0) {
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
"deleting message of types %d", types);
"deleting message %p of types 0x%08x", message, types);
bin->messages = g_list_delete_link (bin->messages, walk);
gst_message_unref (message);
} else {
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
"not deleting message of type %d", GST_MESSAGE_TYPE (message));
"not deleting message %p of type 0x%08x", message,
GST_MESSAGE_TYPE (message));
}
}
}
@ -803,6 +804,7 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
* that is not important right now. When the pipeline goes to PLAYING,
* a new clock will be selected */
gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin));
GST_DEBUG_OBJECT (bin, "marking state dirty");
bin->state_dirty = TRUE;
GST_OBJECT_UNLOCK (bin);
@ -848,7 +850,6 @@ had_parent:
}
}
/**
* gst_bin_add:
* @bin: a #GstBin
@ -906,7 +907,9 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
gchar *elem_name;
GstIterator *it;
gboolean is_sink;
GstMessage *clock_message = NULL;
GstMessage *clock_message = NULL, *async_message = NULL, *smessage = NULL;
GList *walk, *next;
gboolean other_async = FALSE, this_async = FALSE, cont = FALSE;
GST_OBJECT_LOCK (element);
/* Check if the element is already being removed and immediately
@ -958,12 +961,58 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
gst_message_new_clock_lost (GST_OBJECT_CAST (bin), bin->provided_clock);
}
/* remove messages for the element, if there was a pending ASYNC_START
* message we must see if removing the element caused the bin to lose its
* async state. */
for (walk = bin->messages; walk; walk = next) {
GstMessage *message = (GstMessage *) walk->data;
GstElement *src = GST_ELEMENT_CAST (GST_MESSAGE_SRC (message));
next = g_list_next (walk);
if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ASYNC_START) {
if (src == element)
this_async = TRUE;
else
other_async = TRUE;
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
"looking at message %p", message);
}
if (src == element) {
/* delete all message types */
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
"deleting message %p of element \"%s\"", message, elem_name);
bin->messages = g_list_delete_link (bin->messages, walk);
gst_message_unref (message);
}
}
/* all other elements were not async and we removed the async one,
* post a ASYNC_DONE message because we are not async anymore now. */
if (!other_async && this_async) {
GST_DEBUG_OBJECT (bin, "we removed the last async element");
cont = GST_OBJECT_PARENT (bin) == NULL;
if (!cont) {
bin_handle_async_done (bin, &smessage);
async_message = gst_message_new_async_done (GST_OBJECT_CAST (bin));
}
}
GST_DEBUG_OBJECT (bin, "marking state dirty");
bin->state_dirty = TRUE;
GST_OBJECT_UNLOCK (bin);
if (clock_message) {
if (clock_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message);
}
if (smessage)
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
if (async_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), async_message);
if (cont)
bin_push_state_continue (bin);
GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"",
elem_name);
@ -1277,26 +1326,6 @@ gst_bin_iterate_sources (GstBin * bin)
return result;
}
/*
* MT safe
*/
static GstStateChangeReturn
gst_bin_get_state_func (GstElement * element, GstState * state,
GstState * pending, GstClockTime timeout)
{
GstBin *bin = GST_BIN (element);
GstStateChangeReturn ret;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state");
/* do a non forced recalculation of the state */
gst_bin_recalc_state (bin, FALSE);
ret = parent_class->get_state (element, state, pending, timeout);
return ret;
}
static void
gst_bin_recalc_state (GstBin * bin, gboolean force)
{
@ -1404,6 +1433,7 @@ restart:
done:
bin->polling = FALSE;
GST_STATE_RETURN (bin) = ret;
GST_OBJECT_UNLOCK (bin);
/* now we can take the state lock, it is possible that new elements
@ -1461,6 +1491,26 @@ unknown_state:
}
}
/*
* MT safe
*/
static GstStateChangeReturn
gst_bin_get_state_func (GstElement * element, GstState * state,
GstState * pending, GstClockTime timeout)
{
GstBin *bin = GST_BIN (element);
GstStateChangeReturn ret;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state");
/* do a non forced recalculation of the state */
gst_bin_recalc_state (bin, FALSE);
ret = parent_class->get_state (element, state, pending, timeout);
return ret;
}
/***********************************************
* Topologically sorted iterator
* see http://en.wikipedia.org/wiki/Topological_sorting
@ -1746,10 +1796,13 @@ gst_bin_iterate_sorted (GstBin * bin)
}
static GstStateChangeReturn
gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending)
gst_bin_element_set_state (GstBin * bin, GstElement * element,
GstClockTime base_time, GstState current, GstState next)
{
GstStateChangeReturn ret;
gboolean locked;
GList *found;
MessageFind find;
/* peel off the locked flag */
GST_OBJECT_LOCK (element);
@ -1757,18 +1810,53 @@ gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending)
GST_OBJECT_UNLOCK (element);
/* skip locked elements */
if (G_UNLIKELY (locked)) {
GST_DEBUG_OBJECT (element,
"element is locked, pretending state change succeeded");
ret = GST_STATE_CHANGE_SUCCESS;
goto done;
if (G_UNLIKELY (locked))
goto locked;
/* the element was busy with an upwards async state change, we must wait for
* an ASYNC_DONE message before we attemp to change the state. */
find.src = GST_OBJECT_CAST (element);
find.types = GST_MESSAGE_ASYNC_START;
GST_OBJECT_LOCK (bin);
if ((found = g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check))) {
GstMessage *message = GST_MESSAGE_CAST (found->data);
GstObject *src = GST_MESSAGE_SRC (message);
GST_DEBUG_OBJECT (element, "element message %p, %s async busy",
message, GST_ELEMENT_NAME (src));
/* only wait for upward state changes */
if (next > current)
goto was_busy;
}
GST_OBJECT_UNLOCK (bin);
GST_DEBUG_OBJECT (bin,
"setting element %s to %s, base_time %" GST_TIME_FORMAT,
GST_ELEMENT_NAME (element), gst_element_state_get_name (next),
GST_TIME_ARGS (base_time));
/* set base_time on child */
gst_element_set_base_time (element, base_time);
/* change state */
ret = gst_element_set_state (element, pending);
ret = gst_element_set_state (element, next);
done:
return ret;
locked:
{
GST_DEBUG_OBJECT (element,
"element is locked, pretending state change succeeded");
return GST_STATE_CHANGE_SUCCESS;
}
was_busy:
{
GST_DEBUG_OBJECT (element, "element is was busy delaying state change");
GST_OBJECT_UNLOCK (bin);
return GST_STATE_CHANGE_ASYNC;
}
}
/* gst_iterator_fold functions for pads_activate
@ -1925,11 +2013,8 @@ restart:
child = GST_ELEMENT_CAST (data);
/* set base_time on child */
gst_element_set_base_time (child, base_time);
/* set state now */
ret = gst_bin_element_set_state (bin, child, next);
/* set state and base_time now */
ret = gst_bin_element_set_state (bin, child, base_time, current, next);
switch (ret) {
case GST_STATE_CHANGE_SUCCESS:
@ -1939,11 +2024,13 @@ restart:
gst_element_state_get_name (next));
break;
case GST_STATE_CHANGE_ASYNC:
{
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"child '%s' is changing state asynchronously to %s",
GST_ELEMENT_NAME (child), gst_element_state_get_name (next));
have_async = TRUE;
break;
}
case GST_STATE_CHANGE_FAILURE:
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"child '%s' failed to go to state %d(%s)",
@ -1991,10 +2078,11 @@ done:
gst_iterator_free (it);
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"done changing bin's state from %s to %s, now in %s, ret %d",
"done changing bin's state from %s to %s, now in %s, ret %s",
gst_element_state_get_name (current),
gst_element_state_get_name (next),
gst_element_state_get_name (GST_STATE (element)), ret);
gst_element_state_get_name (GST_STATE (element)),
gst_element_state_change_return_get_name (ret));
return ret;
@ -2067,20 +2155,137 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
}
static void
gst_bin_recalc_func (GstBin * bin, gpointer data)
gst_bin_continue_func (GstBin * bin, gpointer data)
{
GST_DEBUG_OBJECT (bin, "doing state recalc");
GstState current, next, pending, target, old_state, old_next;
GstStateChangeReturn old_ret, ret;
GstStateChange transition;
gboolean busy, post;
GST_DEBUG_OBJECT (bin, "waiting for state lock");
GST_STATE_LOCK (bin);
gst_bin_recalc_state (bin, FALSE);
GST_DEBUG_OBJECT (bin, "doing state continue");
GST_OBJECT_LOCK (bin);
old_ret = GST_STATE_RETURN (bin);
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
busy = (old_ret == GST_STATE_CHANGE_ASYNC);
target = GST_STATE_TARGET (bin);
pending = GST_STATE_PENDING (bin);
/* check if there is something to commit */
if (pending == GST_STATE_VOID_PENDING)
goto nothing_pending;
old_state = GST_STATE (bin);
/* this is the state we should go to next */
old_next = GST_STATE_NEXT (bin);
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "busy %d, target %s",
busy, gst_element_state_get_name (target));
if (old_next == GST_STATE_PLAYING) {
post = FALSE;
} else {
post = TRUE;
}
/* we're going to PLAYING, always do the PAUSED->PLAYING state change */
if (target == GST_STATE_PLAYING) {
next = GST_STATE_PAUSED;
GST_STATE_PENDING (bin) = pending = target;
} else {
next = old_next;
}
/* update current state */
current = GST_STATE (bin) = next;
/* see if we reached the final state */
if (pending == current)
goto complete;
next = GST_STATE_GET_NEXT (current, pending);
transition = (GstStateChange) GST_STATE_TRANSITION (current, next);
GST_STATE_NEXT (bin) = next;
/* mark busy */
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
GST_OBJECT_UNLOCK (bin);
if (post) {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
"committing state from %s to %s, pending %s",
gst_element_state_get_name (old_state),
gst_element_state_get_name (old_next),
gst_element_state_get_name (pending));
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, pending));
}
if (busy) {
GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE");
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_async_done (GST_OBJECT_CAST (bin)));
}
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
"continue state change %s to %s, final %s",
gst_element_state_get_name (current),
gst_element_state_get_name (next), gst_element_state_get_name (pending));
ret = gst_element_change_state (GST_ELEMENT_CAST (bin), transition);
done:
GST_STATE_UNLOCK (bin);
GST_DEBUG_OBJECT (bin, "state recalc done");
GST_DEBUG_OBJECT (bin, "state continue done");
gst_object_unref (bin);
return;
nothing_pending:
{
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
GST_OBJECT_UNLOCK (bin);
goto done;
}
complete:
{
GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING;
GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change to %s",
gst_element_state_get_name (pending));
GST_OBJECT_UNLOCK (bin);
/* don't post silly messages with the same state. This can happen
* when an element state is changed to what it already was. For bins
* this can be the result of a lost state, which we check with the
* previous return value.
* We do signal the cond though as a _get_state() might be blocking
* on it. */
if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, GST_STATE_VOID_PENDING));
}
GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE");
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_async_done (GST_OBJECT_CAST (bin)));
GST_STATE_BROADCAST (bin);
goto done;
}
}
static GstBusSyncReply
bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
{
GstBinClass *bclass;
bclass = GST_BIN_GET_CLASS (bin);
@ -2092,6 +2297,139 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
return GST_BUS_DROP;
}
static void
bin_push_state_continue (GstBin * bin)
{
GstBinClass *klass;
/* mark the bin dirty */
GST_OBJECT_LOCK (bin);
klass = GST_BIN_GET_CLASS (bin);
GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
gst_object_ref (bin);
g_thread_pool_push (klass->pool, bin, NULL);
GST_OBJECT_UNLOCK (bin);
}
/* an element started an async state change, if we were not busy with a state
* change, we perform a lost state.
*/
static void
bin_handle_async_start (GstBin * bin, GstMessage ** message)
{
GstState old_state, new_state;
if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE)
goto had_error;
if (GST_STATE_PENDING (bin) != GST_STATE_VOID_PENDING)
goto was_busy;
old_state = GST_STATE (bin);
if (old_state > GST_STATE_PAUSED)
new_state = GST_STATE_PAUSED;
else
new_state = old_state;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"lost state of %s, new %s", gst_element_state_get_name (old_state),
gst_element_state_get_name (new_state));
GST_STATE (bin) = new_state;
GST_STATE_NEXT (bin) = new_state;
GST_STATE_PENDING (bin) = new_state;
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
*message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
new_state, new_state, new_state);
return;
had_error:
{
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "we had an error");
return;
}
was_busy:
{
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "state change busy");
return;
}
}
static void
bin_handle_async_done (GstBin * bin, GstMessage ** message)
{
GstState pending;
GstStateChangeReturn old_ret;
GstState old_state, old_next;
GstState current, next;
old_ret = GST_STATE_RETURN (bin);
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
pending = GST_STATE_PENDING (bin);
/* check if there is something to commit */
if (pending == GST_STATE_VOID_PENDING)
goto nothing_pending;
old_state = GST_STATE (bin);
/* this is the state we should go to next */
old_next = GST_STATE_NEXT (bin);
/* update current state */
current = GST_STATE (bin) = old_next;
/* see if we reached the final state */
if (pending == current)
goto complete;
next = GST_STATE_GET_NEXT (current, pending);
GST_STATE_NEXT (bin) = next;
/* mark busy */
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
"committing state from %s to %s, pending %s",
gst_element_state_get_name (old_state),
gst_element_state_get_name (old_next),
gst_element_state_get_name (pending));
*message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, pending);
return;
nothing_pending:
{
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
return;
}
complete:
{
GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING;
GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change");
/* don't post silly messages with the same state. This can happen
* when an element state is changed to what it already was. For bins
* this can be the result of a lost state, which we check with the
* previous return value.
* We do signal the cond though as a _get_state() might be blocking
* on it. */
if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
*message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, GST_STATE_VOID_PENDING);
}
GST_STATE_BROADCAST (bin);
return;
}
}
/* handle child messages:
*
* This method is called synchronously when a child posts a message on
@ -2139,15 +2477,29 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
* This message is never sent to the application but is forwarded to
* the parent.
*
* GST_MESSAGE_ASYNC_START: Create an internal ELEMENT message that stores
* the state of the element and the fact that the element will need a
* new base_time.
*
* GST_MESSAGE_ASYNC_DONE: Find the internal ELEMENT message we kept for the
* element when it posted ASYNC_START. If all elements are done, post a
* ASYNC_DONE message to the parent.
*
* OTHER: post upwards.
*/
static void
gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
{
GST_DEBUG_OBJECT (bin, "[msg %p] handling child message of type %s",
message, GST_MESSAGE_TYPE_NAME (message));
GstObject *src;
GstMessageType type;
switch (GST_MESSAGE_TYPE (message)) {
src = GST_MESSAGE_SRC (message);
type = GST_MESSAGE_TYPE (message);
GST_DEBUG_OBJECT (bin, "[msg %p] handling child %s message of type %s",
message, GST_ELEMENT_NAME (src), GST_MESSAGE_TYPE_NAME (message));
switch (type) {
case GST_MESSAGE_EOS:
{
gboolean eos;
@ -2168,44 +2520,11 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
}
case GST_MESSAGE_STATE_DIRTY:
{
GstObject *src;
GstBinClass *klass;
src = GST_MESSAGE_SRC (message);
GST_DEBUG_OBJECT (bin, "%s gave state dirty", GST_ELEMENT_NAME (src));
/* mark the bin dirty */
GST_OBJECT_LOCK (bin);
GST_DEBUG_OBJECT (bin, "marking dirty");
bin->state_dirty = TRUE;
if (GST_OBJECT_PARENT (bin))
goto not_toplevel;
GST_WARNING_OBJECT (bin, "received deprecated STATE_DIRTY message");
/* free message */
gst_message_unref (message);
klass = GST_BIN_GET_CLASS (bin);
if (!bin->polling) {
GST_DEBUG_OBJECT (bin, "pushing recalc on thread pool");
gst_object_ref (bin);
g_thread_pool_push (klass->pool, bin, NULL);
} else {
GST_DEBUG_OBJECT (bin,
"state recalc already in progress, not pushing new recalc");
}
GST_OBJECT_UNLOCK (bin);
break;
/* non toplevel bins just forward the message and don't start
* a recalc themselves */
not_toplevel:
{
GST_OBJECT_UNLOCK (bin);
GST_DEBUG_OBJECT (bin, "not toplevel, forwarding");
goto forward;
}
}
case GST_MESSAGE_SEGMENT_START:
GST_OBJECT_LOCK (bin);
@ -2311,6 +2630,117 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
gst_message_unref (message);
break;
}
case GST_MESSAGE_ASYNC_START:
{
gboolean forward = FALSE, new_base_time;
GstState target;
GstMessage *smessage = NULL;
GST_DEBUG_OBJECT (bin, "ASYNC_START message %p, %s", message,
GST_OBJECT_NAME (src));
gst_message_parse_async_start (message, &new_base_time);
GST_OBJECT_LOCK (bin);
/* we ignore the message if we are going to <= READY */
target = GST_STATE_TARGET (bin);
if (target <= GST_STATE_READY)
goto ignore_start_message;
bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
bin_handle_async_start (bin, &smessage);
/* prepare an ASYNC_START message */
if (GST_OBJECT_PARENT (bin)) {
forward = TRUE;
message =
gst_message_new_async_start (GST_OBJECT_CAST (bin), new_base_time);
} else {
message = NULL;
}
GST_OBJECT_UNLOCK (bin);
if (smessage)
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
if (forward)
goto forward;
else
break;
ignore_start_message:
{
GST_DEBUG_OBJECT (bin, "ignoring message, target %s",
gst_element_state_get_name (target));
GST_OBJECT_UNLOCK (bin);
gst_message_unref (message);
break;
}
}
case GST_MESSAGE_ASYNC_DONE:
{
gboolean done = FALSE, toplevel = FALSE;
GstState target;
MessageFind find;
GstMessage *smessage = NULL;
GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message,
GST_OBJECT_NAME (src));
GST_OBJECT_LOCK (bin);
target = GST_STATE_TARGET (bin);
/* ignore messages if we are shutting down */
if (target <= GST_STATE_READY)
goto ignore_done_message;
bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
/* if there are no more ASYNC_START messages, everybody posted
* a ASYNC_DONE and we can post one on the bus. */
/* we don't care who still has a pending ASYNC_START */
find.src = NULL;
find.types = GST_MESSAGE_ASYNC_START;
if (!g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check)) {
/* nothing found, remove all old ASYNC_DONE messages */
bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
done = TRUE;
toplevel = GST_OBJECT_PARENT (bin) == NULL;
if (!toplevel)
bin_handle_async_done (bin, &smessage);
}
GST_OBJECT_UNLOCK (bin);
if (smessage) {
GST_DEBUG_OBJECT (bin, "posting state change message");
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
}
if (done) {
if (!toplevel) {
GST_DEBUG_OBJECT (bin,
"all async-done, posting ASYNC_DONE to parent");
/* post our combined ASYNC_DONE when all is ASYNC_DONE. */
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_async_done (GST_OBJECT_CAST (bin)));
} else {
/* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
bin_push_state_continue (bin);
}
}
break;
ignore_done_message:
{
GST_DEBUG_OBJECT (bin, "ignoring message, target %s",
gst_element_state_get_name (target));
GST_OBJECT_UNLOCK (bin);
gst_message_unref (message);
break;
}
}
default:
goto forward;
}

View file

@ -82,6 +82,7 @@
#include <gobject/gvaluecollector.h>
#include "gstelement.h"
#include "gstenumtypes.h"
#include "gstbus.h"
#include "gstmarshal.h"
#include "gsterror.h"
@ -118,8 +119,6 @@ static void gst_element_base_class_finalize (gpointer g_class);
static void gst_element_dispose (GObject * object);
static void gst_element_finalize (GObject * object);
static GstStateChangeReturn gst_element_change_state (GstElement * element,
GstStateChange transition);
static GstStateChangeReturn gst_element_change_state_func (GstElement * element,
GstStateChange transition);
static GstStateChangeReturn gst_element_get_state_func (GstElement * element,
@ -254,6 +253,7 @@ static void
gst_element_init (GstElement * element)
{
GST_STATE (element) = GST_STATE_NULL;
GST_STATE_TARGET (element) = GST_STATE_NULL;
GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
GST_STATE_RETURN (element) = GST_STATE_CHANGE_SUCCESS;
@ -1295,6 +1295,7 @@ gst_element_send_event (GstElement * element, GstEvent * event)
oclass = GST_ELEMENT_GET_CLASS (element);
GST_STATE_LOCK (element);
if (oclass->send_event) {
GST_CAT_DEBUG (GST_CAT_ELEMENT_PADS, "send %s event on element %s",
GST_EVENT_TYPE_NAME (event), GST_ELEMENT_NAME (element));
@ -1302,6 +1303,8 @@ gst_element_send_event (GstElement * element, GstEvent * event)
} else {
result = gst_element_default_send_event (element, event);
}
GST_STATE_UNLOCK (element);
return result;
}
@ -1762,6 +1765,9 @@ gst_element_get_state_func (GstElement * element,
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s",
gst_element_state_change_return_get_name (ret));
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s",
gst_element_state_change_return_get_name (ret));
/* we got an error, report immediatly */
if (ret == GST_STATE_CHANGE_FAILURE)
goto done;
@ -1984,14 +1990,14 @@ nothing_aborted:
GstStateChangeReturn
gst_element_continue_state (GstElement * element, GstStateChangeReturn ret)
{
GstState pending;
GstState old_ret, old_state, old_next;
GstState current, next;
GstStateChangeReturn old_ret;
GstState old_state, old_next;
GstState current, next, pending;
GstMessage *message;
GstStateChange transition;
GST_OBJECT_LOCK (element);
old_ret = (GstState) GST_STATE_RETURN (element);
old_ret = GST_STATE_RETURN (element);
GST_STATE_RETURN (element) = ret;
pending = GST_STATE_PENDING (element);
@ -2047,7 +2053,8 @@ complete:
GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "completed state change");
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"completed state change to %s", gst_element_state_get_name (pending));
GST_OBJECT_UNLOCK (element);
/* don't post silly messages with the same state. This can happen
@ -2076,10 +2083,16 @@ complete:
* element is copied to the pending state so that any call to
* gst_element_get_state() will return %GST_STATE_CHANGE_ASYNC.
*
* An ASYNC_START message is posted with an indication to distribute a new
* base_time to the element.
* If the element was PLAYING, it will go to PAUSED. The element
* will be restored to its PLAYING state by the parent pipeline when it
* prerolls again.
*
* This is mostly used for elements that lost their preroll buffer
* in the %GST_STATE_PAUSED state after a flush, they become %GST_STATE_PAUSED
* again if a new preroll buffer is queued.
* This function can only be called when the element is currently
* in the %GST_STATE_PAUSED or %GST_STATE_PLAYING state after a flush,
* they will go to their pending state again when a new preroll buffer is
* queued. This function can only be called when the element is currently
* not in error or an async state change.
*
* This function is used internally and should normally not be called from
@ -2090,7 +2103,7 @@ complete:
void
gst_element_lost_state (GstElement * element)
{
GstState current_state;
GstState old_state, new_state;
GstMessage *message;
g_return_if_fail (GST_IS_ELEMENT (element));
@ -2100,22 +2113,31 @@ gst_element_lost_state (GstElement * element)
GST_STATE_RETURN (element) == GST_STATE_CHANGE_FAILURE)
goto nothing_lost;
current_state = GST_STATE (element);
old_state = GST_STATE (element);
/* when we were PLAYING, the new state is PAUSED. We will also not
* automatically go to PLAYING but let the parent bin(s) set us to PLAYING
* when we preroll. */
if (old_state > GST_STATE_PAUSED)
new_state = GST_STATE_PAUSED;
else
new_state = old_state;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"lost state of %s", gst_element_state_get_name (current_state));
"lost state of %s to %s", gst_element_state_get_name (old_state),
gst_element_state_get_name (new_state));
GST_STATE_NEXT (element) = current_state;
GST_STATE_PENDING (element) = current_state;
GST_STATE (element) = new_state;
GST_STATE_NEXT (element) = new_state;
GST_STATE_PENDING (element) = new_state;
GST_STATE_RETURN (element) = GST_STATE_CHANGE_ASYNC;
GST_OBJECT_UNLOCK (element);
message = gst_message_new_state_changed (GST_OBJECT_CAST (element),
current_state, current_state, current_state);
new_state, new_state, new_state);
gst_element_post_message (element, message);
/* and mark us dirty */
message = gst_message_new_state_dirty (GST_OBJECT_CAST (element));
message = gst_message_new_async_start (GST_OBJECT_CAST (element), TRUE);
gst_element_post_message (element, message);
return;
@ -2200,7 +2222,9 @@ gst_element_set_state_func (GstElement * element, GstState state)
/* increment state cookie so that we can track each state change */
element->state_cookie++;
/* this is the (new) state we should go to */
/* this is the (new) state we should go to. TARGET is the last state we set on
* the element. */
GST_STATE_TARGET (element) = state;
GST_STATE_PENDING (element) = state;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
@ -2271,8 +2295,19 @@ was_busy:
}
}
/* with STATE_LOCK */
static GstStateChangeReturn
/**
* gst_element_change_state:
* @element: a #GstElement
* @transition: the requested transition
*
* Perform @transition on @element.
*
* This function must be called with STATE_LOCK held and is mainly used
* internally.
*
* Returns: the #GstStateChangeReturn of the state transition.
*/
GstStateChangeReturn
gst_element_change_state (GstElement * element, GstStateChange transition)
{
GstElementClass *oclass;
@ -2300,22 +2335,26 @@ gst_element_change_state (GstElement * element, GstStateChange transition)
gst_element_abort_state (element);
break;
case GST_STATE_CHANGE_ASYNC:
{
GstState target;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"element will change state ASYNC");
/* if we go upwards, we give the app a change to wait for
* completion */
if (current < next)
target = GST_STATE_TARGET (element);
if (target > GST_STATE_READY)
goto async;
/* else we just continue the state change downwards */
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"forcing commit state %s < %s",
gst_element_state_get_name (current),
gst_element_state_get_name (next));
"forcing commit state %s <= %s",
gst_element_state_get_name (target),
gst_element_state_get_name (GST_STATE_READY));
ret = gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
break;
}
case GST_STATE_CHANGE_SUCCESS:
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"element changed state SUCCESS");

View file

@ -119,6 +119,16 @@ typedef enum {
*/
#define GST_STATE_PENDING(elem) (GST_ELEMENT_CAST(elem)->pending_state)
/**
* GST_STATE_TARGET:
* @elem: a #GstElement to return the target state for.
*
* This macro returns the target #GstState of the element.
*
* Since: 0.10.13
*/
#define GST_STATE_TARGET(elem) (GST_ELEMENT_CAST(elem)->abidata.ABI.target_state)
/**
* GST_STATE_RETURN:
* @elem: a #GstElement to return the last state result for.
@ -427,7 +437,14 @@ struct _GstElement
guint32 pads_cookie;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
union {
struct {
/* state set by application */
GstState target_state;
} ABI;
/* adding + 0 to mark ABI change to be undone later */
gpointer _gst_reserved[GST_PADDING + 0];
} abidata;
};
/**
@ -626,6 +643,8 @@ GstStateChangeReturn gst_element_get_state (GstElement * element,
GstStateChangeReturn gst_element_set_state (GstElement *element, GstState state);
void gst_element_abort_state (GstElement * element);
GstStateChangeReturn gst_element_change_state (GstElement * element,
GstStateChange transition);
GstStateChangeReturn gst_element_continue_state (GstElement * element,
GstStateChangeReturn ret);
void gst_element_lost_state (GstElement * element);

View file

@ -120,6 +120,8 @@ struct _GstPipelinePrivate
{
/* with LOCK */
gboolean auto_flush_bus;
GstClockTime new_stream_time;
};
@ -133,13 +135,12 @@ static void gst_pipeline_set_property (GObject * object, guint prop_id,
static void gst_pipeline_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static gboolean gst_pipeline_send_event (GstElement * element,
GstEvent * event);
static GstClock *gst_pipeline_provide_clock_func (GstElement * element);
static GstStateChangeReturn gst_pipeline_change_state (GstElement * element,
GstStateChange transition);
static void gst_pipeline_handle_message (GstBin * bin, GstMessage * message);
static GstBinClass *parent_class = NULL;
/* static guint gst_pipeline_signals[LAST_SIGNAL] = { 0 }; */
@ -185,6 +186,7 @@ gst_pipeline_class_init (gpointer g_class, gpointer class_data)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
GstBinClass *gstbin_class = GST_BIN_CLASS (g_class);
GstPipelineClass *klass = GST_PIPELINE_CLASS (g_class);
parent_class = g_type_class_peek_parent (klass);
@ -226,11 +228,13 @@ gst_pipeline_class_init (gpointer g_class, gpointer class_data)
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pipeline_dispose);
gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_pipeline_send_event);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_pipeline_change_state);
gstelement_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_pipeline_provide_clock_func);
gstbin_class->handle_message =
GST_DEBUG_FUNCPTR (gst_pipeline_handle_message);
}
static void
@ -311,90 +315,19 @@ gst_pipeline_get_property (GObject * object, guint prop_id,
}
}
/* default pipeline seeking code:
*
* If the pipeline is PLAYING and a flushing seek is done, set
* the pipeline to PAUSED before doing the seek.
*
* A flushing seek also resets the stream time to 0 so that when
* we go back to PLAYING after the seek, the base_time is recalculated
* and redistributed to the elements.
*/
static gboolean
do_pipeline_seek (GstElement * element, GstEvent * event)
/* set the stream time to 0 */
static void
reset_stream_time (GstPipeline * pipeline)
{
gdouble rate;
GstSeekFlags flags;
gboolean flush;
gboolean was_playing = FALSE;
gboolean res;
/* we are only interested in the FLUSH flag of the seek event. */
gst_event_parse_seek (event, &rate, NULL, &flags, NULL, NULL, NULL, NULL);
flush = flags & GST_SEEK_FLAG_FLUSH;
/* if flushing seek, get the current state */
if (flush) {
GstState state;
/* need to call _get_state() since a bin state is only updated
* with this call. */
gst_element_get_state (element, &state, NULL, 0);
was_playing = (state == GST_STATE_PLAYING);
if (was_playing) {
/* and PAUSE when the pipeline was PLAYING, we don't need
* to wait for the state change to complete since we are going
* to flush out any preroll sample anyway. */
gst_element_set_state (element, GST_STATE_PAUSED);
GST_OBJECT_LOCK (pipeline);
if (pipeline->stream_time != GST_CLOCK_TIME_NONE) {
GST_DEBUG_OBJECT (pipeline, "reset stream_time to 0");
pipeline->stream_time = 0;
pipeline->priv->new_stream_time = TRUE;
} else {
GST_DEBUG_OBJECT (pipeline, "application asked to not reset stream_time");
}
}
/* let parent class implement the seek behaviour */
res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
/* if flushing seek restore previous state */
if (flush) {
gboolean need_reset;
GST_OBJECT_LOCK (element);
need_reset = GST_PIPELINE (element)->stream_time != GST_CLOCK_TIME_NONE;
GST_OBJECT_UNLOCK (element);
/* need to reset the stream time to 0 after a successfull flushing seek,
* unless the user explicitly disabled this behavior by setting stream
* time to NONE */
if (need_reset && res)
gst_pipeline_set_new_stream_time (GST_PIPELINE (element), 0);
if (was_playing)
/* and continue playing, this might return ASYNC in which case the
* application can wait for the PREROLL to complete after the seek.
*/
gst_element_set_state (element, GST_STATE_PLAYING);
}
return res;
}
static gboolean
gst_pipeline_send_event (GstElement * element, GstEvent * event)
{
gboolean res;
GstEventType event_type = GST_EVENT_TYPE (event);
switch (event_type) {
case GST_EVENT_SEEK:
/* do the default seek handling */
res = do_pipeline_seek (element, event);
break;
default:
/* else parent implements the defaults */
res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
break;
}
return res;
GST_OBJECT_UNLOCK (pipeline);
}
/**
@ -433,8 +366,11 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
{
GstClockTime new_base_time;
GstQuery *query;
GstClockTime min_latency, max_latency;
GstClockTime start_time, stream_time, delay;
gboolean new_clock;
gboolean new_clock, update;
gboolean res;
GST_DEBUG_OBJECT (element, "selecting clock and base_time");
@ -452,6 +388,8 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
GST_OBJECT_LOCK (element);
new_clock = element->clock != clock;
stream_time = pipeline->stream_time;
update = pipeline->priv->new_stream_time;
pipeline->priv->new_stream_time = FALSE;
delay = pipeline->delay;
GST_OBJECT_UNLOCK (element);
@ -468,6 +406,14 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
gst_message_new_new_clock (GST_OBJECT_CAST (element), clock));
}
if (clock)
gst_object_unref (clock);
/* stream time changed, either with a PAUSED or a flush, we need to update
* the base time */
if (update) {
GST_DEBUG_OBJECT (pipeline, "stream_time changed, updating base time");
if (stream_time != GST_CLOCK_TIME_NONE
&& start_time != GST_CLOCK_TIME_NONE) {
new_base_time = start_time - stream_time + delay;
@ -479,14 +425,49 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
} else
new_base_time = GST_CLOCK_TIME_NONE;
if (clock)
gst_object_unref (clock);
if (new_base_time != GST_CLOCK_TIME_NONE)
gst_element_set_base_time (element, new_base_time);
else
GST_DEBUG_OBJECT (pipeline,
"NOT adjusting base time because stream time is NONE");
"NOT adjusting base_time because stream_time is NONE");
} else {
GST_DEBUG_OBJECT (pipeline,
"NOT adjusting base_time because we selected one before");
}
/* determine latency in this pipeline */
GST_DEBUG_OBJECT (element, "querying pipeline latency");
query = gst_query_new_latency ();
if (gst_element_query (element, query)) {
gboolean live;
gst_query_parse_latency (query, &live, &min_latency, &max_latency);
GST_DEBUG_OBJECT (element,
"configuring min latency %" GST_TIME_FORMAT ", max latency %"
GST_TIME_FORMAT ", live %d", GST_TIME_ARGS (min_latency),
GST_TIME_ARGS (max_latency), live);
/* configure latency on elements */
res =
gst_element_send_event (element,
gst_event_new_latency (min_latency));
if (res) {
GST_INFO_OBJECT (element, "configured latency of %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency));
} else {
GST_WARNING_OBJECT (element,
"failed to configure latency of %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency));
GST_ELEMENT_WARNING (element, CORE, CLOCK, (NULL),
("Failed to configure latency of %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency)));
}
} else {
/* this is not a real problem, we just don't configure any latency. */
GST_WARNING_OBJECT (element, "failed to query pipeline latency");
}
gst_query_unref (query);
break;
}
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
@ -503,17 +484,7 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
{
gboolean need_reset;
/* only reset the stream_time when the application did not
* specify a stream_time explicitly */
GST_OBJECT_LOCK (element);
need_reset = pipeline->stream_time != GST_CLOCK_TIME_NONE;
GST_OBJECT_UNLOCK (element);
if (need_reset)
gst_pipeline_set_new_stream_time (pipeline, 0);
reset_stream_time (pipeline);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@ -532,8 +503,11 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
GST_OBJECT_LOCK (element);
/* store the current stream time */
if (pipeline->stream_time != GST_CLOCK_TIME_NONE)
if (pipeline->stream_time != GST_CLOCK_TIME_NONE) {
pipeline->stream_time = now - element->base_time;
pipeline->priv->new_stream_time = TRUE;
}
GST_DEBUG_OBJECT (element,
"stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
", base_time %" GST_TIME_FORMAT,
@ -574,6 +548,32 @@ invalid_clock:
}
}
static void
gst_pipeline_handle_message (GstBin * bin, GstMessage * message)
{
GstPipeline *pipeline = GST_PIPELINE_CAST (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ASYNC_START:
{
gboolean new_base_time;
gst_message_parse_async_start (message, &new_base_time);
/* reset our stream time if we need to distribute a new base_time to the
* children. */
if (new_base_time)
reset_stream_time (pipeline);
break;
}
default:
break;
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}
/**
* gst_pipeline_get_bus:
* @pipeline: a #GstPipeline
@ -614,6 +614,7 @@ gst_pipeline_set_new_stream_time (GstPipeline * pipeline, GstClockTime time)
GST_OBJECT_LOCK (pipeline);
pipeline->stream_time = time;
pipeline->priv->new_stream_time = TRUE;
GST_OBJECT_UNLOCK (pipeline);
GST_DEBUG_OBJECT (pipeline, "set new stream_time to %" GST_TIME_FORMAT,

View file

@ -192,6 +192,8 @@ struct _GstBaseSinkPrivate
/* latency stuff */
GstClockTime latency;
gboolean commited;
};
#define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
@ -888,9 +890,10 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
{
/* commit state and proceed to next pending state */
GstState current, next, pending, post_pending;
GstMessage *message;
gboolean post_paused = FALSE;
gboolean post_async_done = FALSE;
gboolean post_playing = FALSE;
gboolean sync;
/* we are certainly not playing async anymore now */
basesink->playing_async = FALSE;
@ -900,6 +903,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
next = GST_STATE_NEXT (basesink);
pending = GST_STATE_PENDING (basesink);
post_pending = pending;
sync = basesink->sync;
switch (pending) {
case GST_STATE_PLAYING:
@ -912,6 +916,8 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
basesink->need_preroll = FALSE;
post_async_done = TRUE;
basesink->priv->commited = TRUE;
post_playing = TRUE;
/* post PAUSED too when we were READY */
if (current == GST_STATE_READY) {
@ -929,6 +935,8 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
case GST_STATE_PAUSED:
GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
post_paused = TRUE;
post_async_done = TRUE;
basesink->priv->commited = TRUE;
post_pending = GST_STATE_VOID_PENDING;
break;
case GST_STATE_READY:
@ -947,19 +955,18 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
GST_OBJECT_UNLOCK (basesink);
if (post_paused) {
message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
current, next, post_pending);
gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
current, next, post_pending));
}
if (post_async_done) {
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
}
if (post_playing) {
message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
next, pending, GST_STATE_VOID_PENDING);
gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
}
/* and mark dirty */
if (post_paused || post_playing) {
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
next, pending, GST_STATE_VOID_PENDING));
}
GST_STATE_BROADCAST (basesink);
@ -2724,6 +2731,13 @@ gst_base_sink_query (GstElement * element, GstQuery * query)
gboolean live, us_live;
GstClockTime min, max;
GST_PAD_PREROLL_LOCK (basesink->sinkpad);
if (!gst_base_sink_is_prerolled (basesink)) {
GST_DEBUG_OBJECT (basesink, "not prerolled yet, can't report latency");
res = FALSE;
goto done;
}
if ((res =
gst_base_sink_query_latency (basesink, &live, &us_live, &min,
&max))) {
@ -2737,6 +2751,8 @@ gst_base_sink_query (GstElement * element, GstQuery * query)
}
gst_query_set_latency (query, live, min, max);
}
done:
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
}
case GST_QUERY_JITTER:
@ -2780,6 +2796,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_READY_TO_PAUSED:
/* need to complete preroll before this state change completes, there
* is no data flow in READY so we can safely assume we need to preroll. */
GST_PAD_PREROLL_LOCK (basesink->sinkpad);
GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll");
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (basesink->abidata.ABI.clip_segment,
@ -2795,7 +2812,11 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
basesink->priv->latency = 0;
basesink->eos = FALSE;
gst_base_sink_reset_qos (basesink);
basesink->priv->commited = FALSE;
ret = GST_STATE_CHANGE_ASYNC;
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
GST_PAD_PREROLL_LOCK (basesink->sinkpad);
@ -2817,7 +2838,10 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll");
basesink->need_preroll = TRUE;
basesink->playing_async = TRUE;
basesink->priv->commited = FALSE;
ret = GST_STATE_CHANGE_ASYNC;
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
}
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
@ -2857,7 +2881,10 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
} else {
GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll");
basesink->playing_async = TRUE;
basesink->priv->commited = FALSE;
ret = GST_STATE_CHANGE_ASYNC;
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
}
GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
", dropped: %" G_GUINT64_FORMAT, basesink->priv->rendered,
@ -2867,8 +2894,24 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
GST_PAD_PREROLL_LOCK (basesink->sinkpad);
if (!basesink->priv->commited) {
GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
basesink->priv->commited = TRUE;
} else {
GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
}
basesink->priv->current_sstart = 0;
basesink->priv->current_sstop = 0;
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (bclass->stop)

View file

@ -313,7 +313,7 @@ GST_START_TEST (test_livesrc_sink)
if (n_sink == 2) {
fail_unless (old == GST_STATE_READY, "unexpected old");
fail_unless (new == GST_STATE_PAUSED, "unexpected new");
fail_unless (pending == GST_STATE_PLAYING, "unexpected pending");
fail_unless (pending == GST_STATE_VOID_PENDING, "unexpected pending");
} else if (n_sink == 1) {
fail_unless (old == GST_STATE_PAUSED, "unexpected old");
fail_unless (new == GST_STATE_PLAYING, "unexpected new");

View file

@ -22,6 +22,21 @@
#include <gst/check/gstcheck.h>
static void
pop_async_done (GstBus * bus)
{
GstMessage *message;
GST_DEBUG ("popping async-done message");
message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1);
fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
gst_message_unref (message);
GST_DEBUG ("popped message");
}
static void
pop_messages (GstBus * bus, int count)
{
@ -277,7 +292,7 @@ GST_START_TEST (test_message_state_changed_children)
fail_unless (pending == GST_STATE_VOID_PENDING);
/* wait for async thread to settle down */
while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 2)
while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 3)
THREAD_SWITCH ();
/* each object is referenced by a message;
@ -291,6 +306,7 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 2, 3);
pop_messages (bus, 3);
pop_async_done (bus);
fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@ -390,6 +406,7 @@ GST_START_TEST (test_watch_for_state_change)
fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
pop_messages (bus, 6);
pop_async_done (bus);
fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus");
@ -400,10 +417,12 @@ GST_START_TEST (test_watch_for_state_change)
pop_messages (bus, 3);
/* this one might return either SUCCESS or ASYNC, likely SUCCESS */
gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
pop_messages (bus, 3);
if (ret == GST_STATE_CHANGE_ASYNC)
pop_async_done (bus);
fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus");
@ -521,6 +540,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
src = gst_element_factory_make ("fakesrc", NULL);
fail_if (src == NULL, "Could not create fakesrc");
g_object_set (src, "num-buffers", 5, NULL);
identity = gst_element_factory_make ("identity", NULL);
fail_if (identity == NULL, "Could not create identity");
@ -552,6 +572,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
/* READY => PAUSED */
/* because of pre-rolling, sink will return ASYNC on state
* change and change state later when it has a buffer */
GST_DEBUG ("popping READY -> PAUSED messages");
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED,
105);
#if 0
@ -561,20 +582,21 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
* in which case the sink will commit its new state before the source ... */
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 106);
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
#else
pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
108);
pop_async_done (bus);
#endif
/* PAUSED => PLAYING */
GST_DEBUG ("popping PAUSED -> PLAYING messages");
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 109);
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING,
110);
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 111);
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING,
112);
#else
pop_messages (bus, 3); /* pop remaining ready => paused messages off the bus */
pop_messages (bus, 4); /* pop paused => playing messages off the bus */
#endif
/* don't set to NULL that will set the bus flushing and kill our messages */
ret = gst_element_set_state (pipeline, GST_STATE_READY);
@ -656,6 +678,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
/* READY => PAUSED */
/* because of pre-rolling, sink will return ASYNC on state
* change and change state later when it has a buffer */
GST_DEBUG ("popping READY -> PAUSED messages");
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED,
205);
#if 0
@ -665,19 +688,20 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
* in which case the sink will commit its new state before the source ... */
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
#else
pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
208);
pop_async_done (bus);
/* PAUSED => PLAYING */
GST_DEBUG ("popping PAUSED -> PLAYING messages");
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 209);
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING,
210);
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 211);
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING,
212);
#else
pop_messages (bus, 3); /* pop remaining ready => paused messages off the bus */
pop_messages (bus, 4); /* pop paused => playing messages off the bus */
#endif
/* don't set to NULL that will set the bus flushing and kill our messages */

View file

@ -88,8 +88,9 @@ GST_START_TEST (test_pipeline_unref)
sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink");
fail_if (sink == NULL);
run_pipeline (pipeline, s, GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
GST_MESSAGE_EOS);
run_pipeline (pipeline, s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
while (GST_OBJECT_REFCOUNT_VALUE (src) > 1)
THREAD_SWITCH ();
ASSERT_OBJECT_REFCOUNT (src, "src", 1);

View file

@ -97,24 +97,28 @@ GST_START_TEST (test_2_elements)
s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=true";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN);
s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN);
s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false";
run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false";
ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
GST_MESSAGE_UNKNOWN));
GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN));
}
GST_END_TEST;