gst/gstbin.c: Move the common code for posting state-change messages into one function.

Original commit message from CVS:
* gst/gstbin.c: (gst_bin_add_func), (gst_bin_remove_func),
(gst_bin_change_state_func), (bin_push_state_continue),
(bin_handle_async_start), (bin_handle_async_done),
(gst_bin_handle_message_func):
Move the common code for posting state-change messages into
one function.
Broadcast the state signal after we posted the messages.
Mark the bin as busy when it's doing a state-change.
Make sure async-start/done messages don't interfere with the bin's
state when it's busy.
After the state change, let the bin check which elements completed the
state change while it was busy so that it can update its state.
This commit is contained in:
Wim Taymans 2007-06-19 10:41:33 +00:00
parent 1c0bafd86f
commit e599053491
2 changed files with 100 additions and 75 deletions

View file

@ -1,3 +1,18 @@
2007-06-19 Wim Taymans <wim@fluendo.com>
* gst/gstbin.c: (gst_bin_add_func), (gst_bin_remove_func),
(gst_bin_change_state_func), (bin_push_state_continue),
(bin_handle_async_start), (bin_handle_async_done),
(gst_bin_handle_message_func):
Move the common code for posting state-change messages into
one function.
Broadcast the state signal after we posted the messages.
Mark the bin as busy when it's doing a state-change.
Make sure async-start/done messages don't interfere with the bin's
state when it's busy.
After the state change, let the bin check which elements completed the
state change while it was busy so that it can update its state.
2007-06-19 Jan Schmidt <thaytan@mad.scientist.com> 2007-06-19 Jan Schmidt <thaytan@mad.scientist.com>
* docs/random/release: * docs/random/release:

View file

@ -214,9 +214,8 @@ static GstStateChangeReturn gst_bin_change_state_func (GstElement * element,
GstStateChange transition); GstStateChange transition);
static GstStateChangeReturn gst_bin_get_state_func (GstElement * element, static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
GstState * state, GstState * pending, GstClockTime timeout); GstState * state, GstState * pending, GstClockTime timeout);
static void bin_handle_async_done (GstBin * bin, GstMessage ** smessage, static void bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret);
GstMessage ** amessage, BinContinueData ** cont, GstStateChangeReturn ret); static void bin_handle_async_start (GstBin * bin);
static void bin_handle_async_start (GstBin * bin, GstMessage ** message);
static void bin_push_state_continue (BinContinueData * data); static void bin_push_state_continue (BinContinueData * data);
static gboolean gst_bin_add_func (GstBin * bin, GstElement * element); static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
@ -854,9 +853,7 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
gchar *elem_name; gchar *elem_name;
GstIterator *it; GstIterator *it;
gboolean is_sink; gboolean is_sink;
GstMessage *clock_message = NULL, *async_message = NULL, *state_message = GstMessage *clock_message = NULL;
NULL;
BinContinueData *cont = NULL;
GstStateChangeReturn ret; GstStateChangeReturn ret;
GST_DEBUG_OBJECT (bin, "element :%s", GST_ELEMENT_NAME (element)); GST_DEBUG_OBJECT (bin, "element :%s", GST_ELEMENT_NAME (element));
@ -916,10 +913,10 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
switch (ret) { switch (ret) {
case GST_STATE_CHANGE_ASYNC: case GST_STATE_CHANGE_ASYNC:
bin_handle_async_start (bin, &state_message); bin_handle_async_start (bin);
break; break;
case GST_STATE_CHANGE_NO_PREROLL: case GST_STATE_CHANGE_NO_PREROLL:
bin_handle_async_done (bin, &state_message, &async_message, &cont, ret); bin_handle_async_done (bin, ret);
break; break;
case GST_STATE_CHANGE_FAILURE: case GST_STATE_CHANGE_FAILURE:
break; break;
@ -939,18 +936,9 @@ no_state_recalc:
gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin)); gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin));
GST_OBJECT_UNLOCK (bin); GST_OBJECT_UNLOCK (bin);
if (state_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), state_message);
if (async_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), async_message);
if (clock_message) if (clock_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message); gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message);
if (cont)
bin_push_state_continue (cont);
/* unlink all linked pads */ /* unlink all linked pads */
it = gst_element_iterate_pads (element); it = gst_element_iterate_pads (element);
gst_iterator_foreach (it, (GFunc) unlink_pads, element); gst_iterator_foreach (it, (GFunc) unlink_pads, element);
@ -1046,9 +1034,7 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
gchar *elem_name; gchar *elem_name;
GstIterator *it; GstIterator *it;
gboolean is_sink, othersink, found; gboolean is_sink, othersink, found;
GstMessage *clock_message = NULL, *async_message = NULL, *state_message = GstMessage *clock_message = NULL;
NULL;
BinContinueData *cont = NULL;
GList *walk, *next; GList *walk, *next;
gboolean other_async, this_async, have_no_preroll; gboolean other_async, this_async, have_no_preroll;
GstStateChangeReturn ret; GstStateChangeReturn ret;
@ -1179,7 +1165,7 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
else else
ret = GST_STATE_CHANGE_SUCCESS; ret = GST_STATE_CHANGE_SUCCESS;
bin_handle_async_done (bin, &state_message, &async_message, &cont, ret); bin_handle_async_done (bin, ret);
} else { } else {
GST_DEBUG_OBJECT (bin, GST_DEBUG_OBJECT (bin,
"recalc state preroll: %d, other async: %d, this async %d", "recalc state preroll: %d, other async: %d, this async %d",
@ -1204,15 +1190,6 @@ no_state_recalc:
if (clock_message) if (clock_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message); gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message);
if (state_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), state_message);
if (async_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), async_message);
if (cont)
bin_push_state_continue (cont);
GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"", GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"",
elem_name); elem_name);
g_free (elem_name); g_free (elem_name);
@ -2036,6 +2013,10 @@ gst_bin_change_state_func (GstElement * element, GstStateChange transition)
break; break;
} }
GST_OBJECT_LOCK (bin);
bin->polling = TRUE;
GST_OBJECT_UNLOCK (bin);
/* iterate in state change order */ /* iterate in state change order */
it = gst_bin_iterate_sorted (bin); it = gst_bin_iterate_sorted (bin);
@ -2121,6 +2102,31 @@ restart:
done: done:
gst_iterator_free (it); gst_iterator_free (it);
GST_OBJECT_LOCK (bin);
bin->polling = FALSE;
if (ret != GST_STATE_CHANGE_ASYNC) {
GST_DEBUG_OBJECT (bin, "no async elements");
goto state_end;
}
if (GST_STATE_TARGET (bin) <= GST_STATE_READY) {
GST_DEBUG_OBJECT (bin, "target state %s <= READY",
gst_element_state_get_name (GST_STATE_TARGET (bin)));
goto state_end;
}
GST_DEBUG_OBJECT (bin, "check async elements");
/* check if all elements managed to commit their state already */
if (!find_message (bin, NULL, GST_MESSAGE_ASYNC_START)) {
/* nothing found, remove all old ASYNC_DONE messages */
bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
GST_DEBUG_OBJECT (bin, "async elements commited");
bin_handle_async_done (bin, GST_STATE_CHANGE_SUCCESS);
}
state_end:
GST_OBJECT_UNLOCK (bin);
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"done changing bin's state from %s to %s, now in %s, ret %s", "done changing bin's state from %s to %s, now in %s, ret %s",
gst_element_state_get_name (current), gst_element_state_get_name (current),
@ -2260,24 +2266,22 @@ bin_push_state_continue (BinContinueData * data)
bin = data->bin; bin = data->bin;
klass = GST_BIN_GET_CLASS (bin); klass = GST_BIN_GET_CLASS (bin);
GST_OBJECT_LOCK (bin);
GST_DEBUG_OBJECT (bin, "pushing continue on thread pool"); GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
g_thread_pool_push (klass->pool, data, NULL); g_thread_pool_push (klass->pool, data, NULL);
GST_OBJECT_UNLOCK (bin);
} }
/* an element started an async state change, if we were not busy with a state /* an element started an async state change, if we were not busy with a state
* change, we perform a lost state. * change, we perform a lost state.
*/ */
static void static void
bin_handle_async_start (GstBin * bin, GstMessage ** message) bin_handle_async_start (GstBin * bin)
{ {
GstState old_state, new_state; GstState old_state, new_state;
if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE) if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE)
goto had_error; goto had_error;
if (GST_STATE_PENDING (bin) != GST_STATE_VOID_PENDING) if (bin->polling || GST_STATE_PENDING (bin) != GST_STATE_VOID_PENDING)
goto was_busy; goto was_busy;
/* async starts are ignored when we are NO_PREROLL */ /* async starts are ignored when we are NO_PREROLL */
@ -2301,9 +2305,14 @@ bin_handle_async_start (GstBin * bin, GstMessage ** message)
GST_STATE_NEXT (bin) = new_state; GST_STATE_NEXT (bin) = new_state;
GST_STATE_PENDING (bin) = new_state; GST_STATE_PENDING (bin) = new_state;
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC; GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
GST_OBJECT_UNLOCK (bin);
*message = gst_message_new_state_changed (GST_OBJECT_CAST (bin), /* post message */
new_state, new_state, new_state); gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_state_changed (GST_OBJECT_CAST (bin),
new_state, new_state, new_state));
GST_OBJECT_LOCK (bin);
return; return;
@ -2327,19 +2336,23 @@ was_no_preroll:
/* this function is called when there are no more async elements in the bin. We /* this function is called when there are no more async elements in the bin. We
* post a state changed message and an ASYNC_DONE message. */ * post a state changed message and an ASYNC_DONE message. */
static void static void
bin_handle_async_done (GstBin * bin, GstMessage ** smessage, bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret)
GstMessage ** amessage, BinContinueData ** cont, GstStateChangeReturn ret)
{ {
GstState current, pending, target; GstState current, pending, target;
GstStateChangeReturn old_ret; GstStateChangeReturn old_ret;
GstState old_state, old_next; GstState old_state, old_next;
gboolean toplevel; gboolean toplevel;
GstMessage *smessage = NULL, *amessage = NULL;
BinContinueData *cont = NULL;
if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE) if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE)
goto had_error; goto had_error;
pending = GST_STATE_PENDING (bin); pending = GST_STATE_PENDING (bin);
if (bin->polling)
goto was_busy;
/* check if there is something to commit */ /* check if there is something to commit */
if (pending == GST_STATE_VOID_PENDING) if (pending == GST_STATE_VOID_PENDING)
goto nothing_pending; goto nothing_pending;
@ -2351,7 +2364,7 @@ bin_handle_async_done (GstBin * bin, GstMessage ** smessage,
target = GST_STATE_TARGET (bin); target = GST_STATE_TARGET (bin);
pending = GST_STATE_PENDING (bin) = target; pending = GST_STATE_PENDING (bin) = target;
*amessage = gst_message_new_async_done (GST_OBJECT_CAST (bin)); amessage = gst_message_new_async_done (GST_OBJECT_CAST (bin));
old_state = GST_STATE (bin); old_state = GST_STATE (bin);
/* this is the state we should go to next */ /* this is the state we should go to next */
@ -2383,35 +2396,50 @@ bin_handle_async_done (GstBin * bin, GstMessage ** smessage,
pending = GST_STATE_VOID_PENDING; pending = GST_STATE_VOID_PENDING;
GST_STATE_PENDING (bin) = pending; GST_STATE_PENDING (bin) = pending;
GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING; GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
GST_STATE_BROADCAST (bin);
} else { } else {
BinContinueData *data;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
"continue state change, pending %s", "continue state change, pending %s",
gst_element_state_get_name (pending)); gst_element_state_get_name (pending));
data = g_new0 (BinContinueData, 1); cont = g_new0 (BinContinueData, 1);
/* ref to the bin */ /* ref to the bin */
data->bin = gst_object_ref (bin); cont->bin = gst_object_ref (bin);
/* cookie to detect concurrent state change */ /* cookie to detect concurrent state change */
data->cookie = GST_ELEMENT_CAST (bin)->state_cookie; cont->cookie = GST_ELEMENT_CAST (bin)->state_cookie;
/* pending target state */ /* pending target state */
data->pending = pending; cont->pending = pending;
/* mark busy */ /* mark busy */
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC; GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
*cont = data;
} }
if (old_next != GST_STATE_PLAYING) { if (old_next != GST_STATE_PLAYING) {
if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) { if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
*smessage = gst_message_new_state_changed (GST_OBJECT_CAST (bin), smessage = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, pending); old_state, old_next, pending);
} }
} }
GST_OBJECT_UNLOCK (bin);
if (smessage) {
GST_DEBUG_OBJECT (bin, "posting state change message");
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
}
if (amessage) {
/* post our combined ASYNC_DONE when all is ASYNC_DONE. */
GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE to parent");
gst_element_post_message (GST_ELEMENT_CAST (bin), amessage);
}
GST_OBJECT_LOCK (bin);
if (cont) {
/* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
bin_push_state_continue (cont);
} else {
GST_DEBUG_OBJECT (bin, "state change complete");
GST_STATE_BROADCAST (bin);
}
return; return;
had_error: had_error:
@ -2419,6 +2447,11 @@ had_error:
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "we had an error"); GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "we had an error");
return; return;
} }
was_busy:
{
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "state change busy");
return;
}
nothing_pending: nothing_pending:
{ {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending"); GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
@ -2625,7 +2658,6 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
{ {
gboolean toplevel, new_base_time; gboolean toplevel, new_base_time;
GstState target; GstState target;
GstMessage *smessage = NULL;
GST_DEBUG_OBJECT (bin, "ASYNC_START message %p, %s", message, GST_DEBUG_OBJECT (bin, "ASYNC_START message %p, %s", message,
GST_OBJECT_NAME (src)); GST_OBJECT_NAME (src));
@ -2641,7 +2673,7 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
/* takes ownership of the message */ /* takes ownership of the message */
bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START); bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
bin_handle_async_start (bin, &smessage); bin_handle_async_start (bin);
/* get our toplevel state */ /* get our toplevel state */
toplevel = ((GST_OBJECT_PARENT (bin) == NULL) toplevel = ((GST_OBJECT_PARENT (bin) == NULL)
@ -2658,9 +2690,6 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
} }
GST_OBJECT_UNLOCK (bin); GST_OBJECT_UNLOCK (bin);
if (smessage)
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
if (!toplevel) if (!toplevel)
/* not toplevel, forward async-start to parent */ /* not toplevel, forward async-start to parent */
goto forward; goto forward;
@ -2679,8 +2708,6 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
case GST_MESSAGE_ASYNC_DONE: case GST_MESSAGE_ASYNC_DONE:
{ {
GstState target; GstState target;
GstMessage *smessage = NULL, *amessage = NULL;
BinContinueData *cont = NULL;
GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message, GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message,
GST_OBJECT_NAME (src)); GST_OBJECT_NAME (src));
@ -2699,26 +2726,9 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
/* nothing found, remove all old ASYNC_DONE messages */ /* nothing found, remove all old ASYNC_DONE messages */
bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE); bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
bin_handle_async_done (bin, &smessage, &amessage, &cont, bin_handle_async_done (bin, GST_STATE_CHANGE_SUCCESS);
GST_STATE_CHANGE_SUCCESS);
} }
GST_OBJECT_UNLOCK (bin); GST_OBJECT_UNLOCK (bin);
if (smessage) {
GST_DEBUG_OBJECT (bin, "posting state change message");
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
}
if (amessage) {
/* post our combined ASYNC_DONE when all is ASYNC_DONE. */
GST_DEBUG_OBJECT (bin, "all async-done, posting ASYNC_DONE to parent");
gst_element_post_message (GST_ELEMENT_CAST (bin), amessage);
}
if (cont) {
/* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
bin_push_state_continue (cont);
}
break; break;
ignore_done_message: ignore_done_message: