decodebin: aggregate buffering messages

Aggregate buffering messages to only post the lower value
to avoid setting pipeline to playing while any multiqueue
is still buffering.

There are 3 scenarios where the entries should be removed from
the list:

1) When decodebin is set to READY
2) When an element posts a 100% buffering (already implemented)
3) When a multiqueue is removed from decodebin.

For item 3 we don't need to handle it because this should only
happen when either 1 is hapenning or when it is playing a
chained file, for which number 2 should have happened for the
previous stream to finish

https://bugzilla.gnome.org/show_bug.cgi?id=726423
This commit is contained in:
Thiago Santos 2014-03-16 14:27:30 -03:00
parent ba87655628
commit 783195ccef
2 changed files with 173 additions and 0 deletions

View file

@ -183,6 +183,8 @@ struct _GstDecodeBin
gboolean expose_allstreams; /* Whether to expose unknow type streams or not */
GList *filtered; /* elements for which error messages are filtered */
GList *buffering_status; /* element currently buffering messages */
};
struct _GstDecodeBinClass
@ -4563,6 +4565,9 @@ gst_decode_bin_change_state (GstElement * element, GstStateChange transition)
dbin->decode_chain = NULL;
}
EXPOSE_UNLOCK (dbin);
g_list_free_full (dbin->buffering_status,
(GDestroyNotify) gst_message_unref);
dbin->buffering_status = NULL;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
default:
@ -4597,6 +4602,80 @@ gst_decode_bin_handle_message (GstBin * bin, GstMessage * msg)
GST_OBJECT_LOCK (dbin);
drop = (g_list_find (dbin->filtered, GST_MESSAGE_SRC (msg)) != NULL);
GST_OBJECT_UNLOCK (dbin);
} else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_BUFFERING) {
gint perc, msg_perc;
gint smaller_perc = 100;
GstMessage *smaller = NULL;
GList *found = NULL;
GList *iter;
/* buffering messages must be aggregated as there might be multiple
* multiqueue in the pipeline and their independent buffering messages
* will confuse the application
*
* decodebin keeps a list of messages received from elements that are
* buffering.
* Rules are:
* 1) Always post the smaller buffering %
* 2) If an element posts a 100% buffering message, remove it from the list
* 3) When there are no more messages on the list, post 100% message
* 4) When an element posts a new buffering message, update the one
* on the list to this new value
*/
gst_message_parse_buffering (msg, &msg_perc);
/*
* Single loop for 2 things:
* 1) Look for a message with the same source
* 1.1) If the received message is 100%, remove it from the list
* 2) Find the minimum buffering from the list
*/
for (iter = dbin->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
found = iter;
if (msg_perc < 100) {
gst_message_unref (iter->data);
bufstats = iter->data = gst_message_ref (msg);
} else {
GList *current = iter;
/* remove the element here and avoid confusing the loop */
iter = g_list_next (iter);
gst_message_unref (current->data);
dbin->buffering_status =
g_list_delete_link (dbin->buffering_status, current);
continue;
}
}
gst_message_parse_buffering (bufstats, &perc);
if (perc < smaller_perc) {
smaller_perc = perc;
smaller = bufstats;
}
iter = g_list_next (iter);
}
if (found == NULL && msg_perc < 100) {
if (msg_perc < smaller_perc) {
smaller_perc = msg_perc;
smaller = msg;
}
dbin->buffering_status =
g_list_prepend (dbin->buffering_status, gst_message_ref (msg));
}
/* now compute the buffering message that should be posted */
if (smaller_perc == 100) {
g_assert (dbin->buffering_status == NULL);
/* we are posting the original received msg */
} else {
gst_message_replace (&msg, smaller);
}
}
if (drop)

View file

@ -623,6 +623,99 @@ GST_START_TEST (test_parser_negotiation)
GST_END_TEST;
GST_START_TEST (test_buffering_aggregation)
{
GstElement *pipe, *decodebin;
GstMessage *msg;
GstElement *mq0, *mq1, *mq2;
gint perc;
pipe = gst_pipeline_new (NULL);
fail_unless (pipe != NULL, "failed to create pipeline");
decodebin = gst_element_factory_make ("decodebin", "decodebin");
fail_unless (decodebin != NULL, "Failed to create decodebin element");
fail_unless (gst_bin_add (GST_BIN (pipe), decodebin));
/* to simulate the buffering scenarios we stuff 2 multiqueues inside
* decodebin. This is hacky, but sould make decodebin handle its buffering
* messages all the same */
mq0 = gst_element_factory_make ("multiqueue", NULL);
mq1 = gst_element_factory_make ("multiqueue", NULL);
mq2 = gst_element_factory_make ("multiqueue", NULL);
fail_unless (gst_bin_add (GST_BIN (decodebin), mq0));
fail_unless (gst_bin_add (GST_BIN (decodebin), mq1));
fail_unless (gst_bin_add (GST_BIN (decodebin), mq2));
fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_READY),
GST_STATE_CHANGE_SUCCESS);
fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_PAUSED),
GST_STATE_CHANGE_ASYNC);
/* currently we shoud have no buffering messages */
msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
fail_unless (msg == NULL);
/* only a single element buffering, the buffering percent should be the
* same as it */
gst_element_post_message (mq0, gst_message_new_buffering (GST_OBJECT (mq0),
50));
msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
fail_unless (msg != NULL);
fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0);
gst_message_parse_buffering (msg, &perc);
fail_unless (perc == 50);
gst_message_unref (msg);
/* two elements buffering, the buffering percent should be the
* lowest one */
gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1),
20));
msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
fail_unless (msg != NULL);
fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
gst_message_parse_buffering (msg, &perc);
fail_unless (perc == 20);
gst_message_unref (msg);
/* a 100% message should be ignored */
gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2),
100));
msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
fail_unless (msg != NULL);
fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
gst_message_parse_buffering (msg, &perc);
fail_unless (perc == 20);
gst_message_unref (msg);
/* a new buffering message is posted with a higher value, go with the 20 */
gst_element_post_message (mq2, gst_message_new_buffering (GST_OBJECT (mq2),
80));
msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
fail_unless (msg != NULL);
fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq1);
gst_message_parse_buffering (msg, &perc);
fail_unless (perc == 20);
gst_message_unref (msg);
/* The mq1 finishes buffering, new buffering status is now 50% from mq0 */
gst_element_post_message (mq1, gst_message_new_buffering (GST_OBJECT (mq1),
100));
msg = gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_BUFFERING, 0);
fail_unless (msg != NULL);
fail_unless (GST_MESSAGE_SRC (msg) == (GstObject *) mq0);
gst_message_parse_buffering (msg, &perc);
fail_unless (perc == 50);
gst_message_unref (msg);
gst_element_set_state (pipe, GST_STATE_NULL);
gst_object_unref (pipe);
}
GST_END_TEST;
static Suite *
decodebin_suite (void)
{
@ -634,6 +727,7 @@ decodebin_suite (void)
tcase_add_test (tc_chain, test_reuse_without_decoders);
tcase_add_test (tc_chain, test_mp3_parser_loop);
tcase_add_test (tc_chain, test_parser_negotiation);
tcase_add_test (tc_chain, test_buffering_aggregation);
return s;
}