gst/gstbin.c: Add helper function to find messages.

Original commit message from CVS:
* gst/gstbin.c: (find_message), (bin_replace_message), (is_eos),
(gst_bin_remove_func), (gst_bin_element_set_state),
(bin_handle_async_start), (bin_handle_async_done),
(gst_bin_handle_message_func):
Add helper function to find messages.
Generate the async-done messages together with the state change
messages.
Small cleanups in handling toplevel bins.
This commit is contained in:
Wim Taymans 2007-06-07 10:11:47 +00:00
parent 2a3d26e66e
commit 491de61f5a
2 changed files with 87 additions and 77 deletions

View file

@ -1,3 +1,14 @@
2007-06-07 Wim Taymans <wim@fluendo.com>
* gst/gstbin.c: (find_message), (bin_replace_message), (is_eos),
(gst_bin_remove_func), (gst_bin_element_set_state),
(bin_handle_async_start), (bin_handle_async_done),
(gst_bin_handle_message_func):
Add helper function to find messages.
Generate the async-done messages together with the state change
messages.
Small cleanups in handling toplevel bins.
2007-06-06 Tim-Philipp Müller <tim at centricular dot net>
* libs/gst/base/gstdataqueue.c:

View file

@ -207,7 +207,8 @@ static GstStateChangeReturn gst_bin_change_state_func (GstElement * element,
GstStateChange transition);
static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
GstState * state, GstState * pending, GstClockTime timeout);
static void bin_handle_async_done (GstBin * bin, GstMessage ** message);
static void bin_handle_async_done (GstBin * bin, GstMessage ** smessage,
GstMessage ** amessage);
static void bin_push_state_continue (GstBin * bin);
static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
@ -708,6 +709,21 @@ message_check (GstMessage * message, MessageFind * target)
return (eq ? 0 : 1);
}
static GList *
find_message (GstBin * bin, GstObject * src, GstMessageType types)
{
GList *result;
MessageFind find;
find.src = src;
find.types = types;
result = g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check);
return result;
}
/* with LOCK, returns TRUE if message had a valid SRC, takes ref on
* the message.
*
@ -725,15 +741,8 @@ bin_replace_message (GstBin * bin, GstMessage * message, GstMessageType types)
name = GST_MESSAGE_TYPE_NAME (message);
if ((src = GST_MESSAGE_SRC (message))) {
MessageFind find;
find.src = src;
find.types = types;
/* first find the previous message posted by this element */
previous = g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check);
if (previous) {
if ((previous = find_message (bin, src, types))) {
/* if we found a previous message, replace it */
gst_message_unref (previous->data);
previous->data = message;
@ -800,14 +809,8 @@ is_eos (GstBin * bin)
element = GST_ELEMENT_CAST (walk->data);
if (bin_element_is_sink (element, bin) == 0) {
MessageFind find;
/* check if element posted EOS */
find.src = GST_OBJECT_CAST (element);
find.types = GST_MESSAGE_EOS;
if (g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check)) {
if (find_message (bin, GST_OBJECT_CAST (element), GST_MESSAGE_EOS)) {
GST_DEBUG ("element posted EOS");
} else {
GST_DEBUG ("element did not post EOS yet");
@ -999,9 +1002,10 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
gchar *elem_name;
GstIterator *it;
gboolean is_sink;
GstMessage *clock_message = NULL, *async_message = NULL, *smessage = NULL;
GstMessage *clock_message = NULL, *async_message = NULL, *state_message =
NULL;
GList *walk, *next;
gboolean other_async = FALSE, this_async = FALSE, cont = FALSE;
gboolean other_async = FALSE, this_async = FALSE, toplevel = FALSE;
GST_DEBUG_OBJECT (bin, "element :%s", GST_ELEMENT_NAME (element));
@ -1086,10 +1090,9 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
if (!other_async && this_async) {
GST_DEBUG_OBJECT (bin, "we removed the last async element");
cont = ((GST_OBJECT_PARENT (bin) == NULL) || bin->priv->asynchandling);
if (!cont) {
bin_handle_async_done (bin, &smessage);
async_message = gst_message_new_async_done (GST_OBJECT_CAST (bin));
toplevel = ((GST_OBJECT_PARENT (bin) == NULL) || bin->priv->asynchandling);
if (!toplevel) {
bin_handle_async_done (bin, &state_message, &async_message);
}
}
GST_DEBUG_OBJECT (bin, "marking state dirty");
@ -1099,13 +1102,13 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
if (clock_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message);
if (smessage)
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
if (state_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), state_message);
if (async_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), async_message);
if (cont)
if (toplevel)
bin_push_state_continue (bin);
GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"",
@ -1680,7 +1683,7 @@ reset_degree (GstElement * element, GstBinSortIterator * bit)
}
/* adjust the degree of all elements connected to the given
* element. If an degree of an element drops to 0, it is
* element. If a degree of an element drops to 0, it is
* added to the queue of elements to schedule next.
*
* We have to make sure not to cross the bin boundary this element
@ -1898,7 +1901,6 @@ gst_bin_element_set_state (GstBin * bin, GstElement * element,
GstStateChangeReturn ret;
gboolean locked;
GList *found;
MessageFind find;
/* peel off the locked flag */
GST_OBJECT_LOCK (element);
@ -1909,20 +1911,18 @@ gst_bin_element_set_state (GstBin * bin, GstElement * element,
if (G_UNLIKELY (locked))
goto locked;
GST_OBJECT_LOCK (bin);
/* the element was busy with an upwards async state change, we must wait for
* an ASYNC_DONE message before we attemp to change the state. */
find.src = GST_OBJECT_CAST (element);
find.types = GST_MESSAGE_ASYNC_START;
GST_OBJECT_LOCK (bin);
if ((found = g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check))) {
if ((found =
find_message (bin, GST_OBJECT_CAST (element),
GST_MESSAGE_ASYNC_START))) {
#ifndef GST_DISABLE_GST_DEBUG
GstMessage *message = GST_MESSAGE_CAST (found->data);
#endif
GST_DEBUG_OBJECT (element, "element message %p, %s async busy",
message, GST_ELEMENT_NAME (GST_MESSAGE_SRC (message)));
#endif
/* only wait for upward state changes */
if (next > current) {
/* We found an async element check if we can force its state to change or
@ -2443,6 +2443,8 @@ bin_handle_async_start (GstBin * bin, GstMessage ** message)
old_state = GST_STATE (bin);
/* when we PLAYING we go back to PAUSED, when preroll happens, we go back to
* PLAYING after optionally redistributing the base_time. */
if (old_state > GST_STATE_PAUSED)
new_state = GST_STATE_PAUSED;
else
@ -2474,14 +2476,17 @@ was_busy:
}
}
/* this function is called when there are no more async elements in the bin. We
* post a state changed message and an ASYNC_DONE message. */
static void
bin_handle_async_done (GstBin * bin, GstMessage ** message)
bin_handle_async_done (GstBin * bin, GstMessage ** smessage,
GstMessage ** amessage)
{
GstState pending;
GstStateChangeReturn old_ret;
GstState old_state, old_next;
GstState current, next;
gboolean cont;
gboolean toplevel;
old_ret = GST_STATE_RETURN (bin);
GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
@ -2491,6 +2496,8 @@ bin_handle_async_done (GstBin * bin, GstMessage ** message)
if (pending == GST_STATE_VOID_PENDING)
goto nothing_pending;
*amessage = gst_message_new_async_done (GST_OBJECT_CAST (bin));
old_state = GST_STATE (bin);
/* this is the state we should go to next */
old_next = GST_STATE_NEXT (bin);
@ -2499,12 +2506,12 @@ bin_handle_async_done (GstBin * bin, GstMessage ** message)
/* see if we need to continue the state change on our own. This happens when
* we were asked to do so or when we are the toplevel bin. */
cont = bin->priv->asynchandling || (GST_OBJECT_PARENT (bin) == NULL);
toplevel = ((GST_OBJECT_PARENT (bin) == NULL) || bin->priv->asynchandling);
/* see if we reached the final state. If we are not a toplevel bin we also
* must stop at this state change, the parent will set us to the required
* state eventually. */
if (pending == current || !cont)
if (pending == current || !toplevel)
goto complete;
next = GST_STATE_GET_NEXT (current, pending);
@ -2519,7 +2526,7 @@ bin_handle_async_done (GstBin * bin, GstMessage ** message)
gst_element_state_get_name (old_next),
gst_element_state_get_name (pending));
*message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
*smessage = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, pending);
return;
@ -2543,7 +2550,7 @@ complete:
* We do signal the cond though as a _get_state() might be blocking
* on it. */
if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
*message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
*smessage = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
old_state, old_next, GST_STATE_VOID_PENDING);
}
@ -2658,7 +2665,6 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
break;
case GST_MESSAGE_SEGMENT_DONE:
{
MessageFind find;
gboolean post = FALSE;
GstFormat format;
gint64 position;
@ -2671,11 +2677,7 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
* a segment_done and we can post one on the bus. */
/* we don't care who still has a pending segment start */
find.src = NULL;
find.types = GST_MESSAGE_SEGMENT_START;
if (!g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check)) {
if (!find_message (bin, NULL, GST_MESSAGE_SEGMENT_START)) {
/* nothing found */
post = TRUE;
/* remove all old segment_done messages */
@ -2755,7 +2757,7 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
}
case GST_MESSAGE_ASYNC_START:
{
gboolean forward = FALSE, new_base_time;
gboolean toplevel, new_base_time;
GstState target;
GstMessage *smessage = NULL;
@ -2774,9 +2776,13 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
bin_handle_async_start (bin, &smessage);
/* get our toplevel state */
toplevel = ((GST_OBJECT_PARENT (bin) == NULL)
|| bin->priv->asynchandling);
/* prepare an ASYNC_START message */
if (GST_OBJECT_PARENT (bin) && (!bin->priv->asynchandling)) {
forward = TRUE;
if (!toplevel) {
/* non toplevel bin, prepare async-start for the parent */
message =
gst_message_new_async_start (GST_OBJECT_CAST (bin), new_base_time);
} else {
@ -2787,7 +2793,8 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
if (smessage)
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
if (forward)
if (!toplevel)
/* not toplevel, forward async-start to parent */
goto forward;
else
break;
@ -2803,10 +2810,9 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
}
case GST_MESSAGE_ASYNC_DONE:
{
gboolean done = FALSE, toplevel = FALSE;
gboolean toplevel = FALSE;
GstState target;
MessageFind find;
GstMessage *smessage = NULL;
GstMessage *smessage = NULL, *amessage = NULL;
GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message,
GST_OBJECT_NAME (src));
@ -2819,21 +2825,16 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
/* if there are no more ASYNC_START messages, everybody posted
* a ASYNC_DONE and we can post one on the bus. */
/* we don't care who still has a pending ASYNC_START */
find.src = NULL;
find.types = GST_MESSAGE_ASYNC_START;
if (!g_list_find_custom (bin->messages, &find,
(GCompareFunc) message_check)) {
* a ASYNC_DONE and we can post one on the bus. When checking, we
* don't care who still has a pending ASYNC_START */
if (!find_message (bin, NULL, GST_MESSAGE_ASYNC_START)) {
/* nothing found, remove all old ASYNC_DONE messages */
bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
done = TRUE;
toplevel = bin->priv->asynchandling
|| (GST_OBJECT_PARENT (bin) == NULL);
toplevel = ((GST_OBJECT_PARENT (bin) == NULL)
|| bin->priv->asynchandling);
if (!toplevel)
bin_handle_async_done (bin, &smessage);
bin_handle_async_done (bin, &smessage, &amessage);
}
GST_OBJECT_UNLOCK (bin);
@ -2841,18 +2842,16 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
GST_DEBUG_OBJECT (bin, "posting state change message");
gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
}
if (done) {
if (!toplevel) {
GST_DEBUG_OBJECT (bin,
"all async-done, posting ASYNC_DONE to parent");
/* post our combined ASYNC_DONE when all is ASYNC_DONE. */
gst_element_post_message (GST_ELEMENT_CAST (bin),
gst_message_new_async_done (GST_OBJECT_CAST (bin)));
} else {
/* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
bin_push_state_continue (bin);
}
if (amessage) {
/* post our combined ASYNC_DONE when all is ASYNC_DONE. */
GST_DEBUG_OBJECT (bin, "all async-done, posting ASYNC_DONE to parent");
gst_element_post_message (GST_ELEMENT_CAST (bin), amessage);
}
if (toplevel) {
/* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
bin_push_state_continue (bin);
}
break;