multiqueue: Recalculate fill level after changing high-threshold

This ensures the following special case is handled properly:

1. Queue is empty
2. Data is pushed, fill level is below the current high-threshold
3. high-threshold is set to a level that is below the current fill level

Since mq->percent wasn't being properly recalculated in step #3, the
multiqueue switched off its buffering state when new data was pushed
in, and never posted a 100% buffering message. The application would
have received a <100% buffering message from step #2, but would never
see 100%.

Fix this by recalculating the current fill level percentage during
high-threshold property changes in the same manner as it is done when
use-buffering is modified.

https://bugzilla.gnome.org/show_bug.cgi?id=763757
This commit is contained in:
Carlos Rafael Giani 2016-04-14 00:09:44 +02:00 committed by Sebastian Dröge
parent 895332e056
commit a1d2f387c6
2 changed files with 251 additions and 24 deletions

View file

@@ -209,6 +209,7 @@ static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
static void recheck_buffering_status (GstMultiQueue * mq);
static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
@@ -613,36 +614,14 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
break;
case PROP_USE_BUFFERING:
mq->use_buffering = g_value_get_boolean (value);
if (!mq->use_buffering && mq->buffering) {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->buffering = FALSE;
GST_DEBUG_OBJECT (mq, "buffering 100 percent");
SET_PERCENT (mq, 100);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
if (mq->use_buffering) {
GList *tmp;
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
tmp = mq->queues;
while (tmp) {
GstSingleQueue *q = (GstSingleQueue *) tmp->data;
update_buffering (mq, q);
gst_data_queue_limits_changed (q->queue);
tmp = g_list_next (tmp);
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
gst_multi_queue_post_buffering (mq);
recheck_buffering_status (mq);
break;
case PROP_LOW_PERCENT:
mq->low_percent = g_value_get_int (value);
break;
case PROP_HIGH_PERCENT:
mq->high_percent = g_value_get_int (value);
recheck_buffering_status (mq);
break;
case PROP_SYNC_BY_RUNNING_TIME:
mq->sync_by_running_time = g_value_get_boolean (value);
@@ -1043,6 +1022,45 @@ gst_multi_queue_post_buffering (GstMultiQueue * mq)
g_mutex_unlock (&mq->buffering_post_lock);
}
/* Re-evaluate the buffering state and current fill level percentage
 * after a property change (use-buffering or high-percent), posting a
 * fresh buffering message if the percentage changed. */
static void
recheck_buffering_status (GstMultiQueue * mq)
{
  if (!mq->use_buffering && mq->buffering) {
    /* Buffering was just switched off while still in the buffering
     * state: report 100% so the application resumes playback. */
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
    mq->buffering = FALSE;
    GST_DEBUG_OBJECT (mq,
        "Buffering property disabled, but queue was still buffering; setting percentage to 100%%");
    SET_PERCENT (mq, 100);
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  }

  if (mq->use_buffering) {
    GList *iter;
    gint prev_percent;

    GST_MULTI_QUEUE_MUTEX_LOCK (mq);

    /* force fill level percentage to be recalculated */
    prev_percent = mq->percent;
    mq->percent = 0;

    for (iter = mq->queues; iter != NULL; iter = g_list_next (iter)) {
      GstSingleQueue *sq = (GstSingleQueue *) iter->data;

      update_buffering (mq, sq);
      gst_data_queue_limits_changed (sq->queue);
    }

    GST_DEBUG_OBJECT (mq, "Recalculated fill level: old: %d%% new: %d%%",
        prev_percent, mq->percent);

    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  }

  gst_multi_queue_post_buffering (mq);
}
static void
calculate_interleave (GstMultiQueue * mq)
{

View file

@@ -862,6 +862,213 @@ GST_START_TEST (test_sparse_stream)
GST_END_TEST;
/* Thread body: pushes one fixed-size 80000-byte buffer through the
 * given pad, then exits.  Run in a separate thread so the test can
 * join it once the push has completed. */
static gpointer
pad_push_datablock_thread (gpointer data)
{
  GstPad *target = data;
  GstBuffer *block;

  block = gst_buffer_new_allocate (NULL, 80 * 1000, NULL);
  gst_pad_push (target, block);

  return NULL;
}
/* Blocking pad probe callback: installed with GST_PAD_PROBE_TYPE_BLOCK,
 * so returning GST_PAD_PROBE_OK keeps the pad blocked and prevents the
 * downstream sink from draining the queue during the tests. */
static GstPadProbeReturn
block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
  (void) pad;
  (void) info;
  (void) user_data;

  return GST_PAD_PROBE_OK;
}
/* Blocks on the pipeline's bus until a BUFFERING (or ERROR) message
 * arrives, then asserts that the reported buffering percentage equals
 * EXPECTED_PERC.  Fails the test case if an ERROR message is seen or
 * if the percentage differs. */
#define CHECK_FOR_BUFFERING_MSG(PIPELINE, EXPECTED_PERC) \
G_STMT_START { \
gint buf_perc; \
GstMessage *msg; \
GST_LOG ("waiting for %d%% buffering message", (EXPECTED_PERC)); \
msg = gst_bus_poll (GST_ELEMENT_BUS (PIPELINE), \
GST_MESSAGE_BUFFERING | GST_MESSAGE_ERROR, -1); \
fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR, \
"Expected BUFFERING message, got ERROR message"); \
gst_message_parse_buffering (msg, &buf_perc); \
gst_message_unref (msg); \
fail_unless (buf_perc == (EXPECTED_PERC), \
"Got incorrect percentage: %d%% expected: %d%%", buf_perc, \
(EXPECTED_PERC)); \
} G_STMT_END
GST_START_TEST (test_initial_fill_above_high_threshold)
{
  /* Verify that a single push which immediately fills the queue past
   * the high-threshold produces a 100% buffering message right away. */
  GstElement *pipeline, *mqueue, *sink;
  GstPad *srcpad, *queue_sinkpad, *sink_sinkpad;
  GstSegment segment;
  GThread *pusher;

  /* Build the test pipeline: multiqueue feeding a fakesink */
  pipeline = gst_pipeline_new ("testbin");

  mqueue = gst_element_factory_make ("multiqueue", NULL);
  fail_unless (mqueue != NULL);
  gst_bin_add (GST_BIN (pipeline), mqueue);

  sink = gst_element_factory_make ("fakesink", NULL);
  fail_unless (sink != NULL);
  gst_bin_add (GST_BIN (pipeline), sink);

  /* Block the fakesink sink pad so the prerolling sink cannot drain
   * the queue */
  sink_sinkpad = gst_element_get_static_pad (sink, "sink");
  gst_pad_add_probe (sink_sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL,
      NULL);
  gst_object_unref (sink_sinkpad);

  /* Size limit 1000000 bytes, low threshold 1%, high threshold 5%:
   * a single 80000-byte push exceeds both thresholds at once. */
  g_object_set (mqueue,
      "use-buffering", (gboolean) TRUE,
      "max-size-bytes", (guint) 1000 * 1000,
      "max-size-buffers", (guint) 0,
      "max-size-time", (guint64) 0,
      "extra-size-bytes", (guint) 0,
      "extra-size-buffers", (guint) 0,
      "extra-size-time", (guint64) 0,
      "low-percent", (gint) 1, "high-percent", (gint) 5, NULL);

  gst_segment_init (&segment, GST_FORMAT_TIME);

  srcpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
  gst_pad_set_query_function (srcpad, mq_dummypad_query);

  queue_sinkpad = gst_element_get_request_pad (mqueue, "sink_%u");
  fail_unless (queue_sinkpad != NULL);
  fail_unless (gst_pad_link (srcpad, queue_sinkpad) == GST_PAD_LINK_OK);

  gst_pad_set_active (srcpad, TRUE);

  gst_pad_push_event (srcpad, gst_event_new_stream_start ("test"));
  gst_pad_push_event (srcpad, gst_event_new_segment (&segment));

  gst_object_unref (queue_sinkpad);

  fail_unless (gst_element_link (mqueue, sink));

  /* PAUSED keeps the sink in preroll, where the probe blocks it */
  gst_element_set_state (pipeline, GST_STATE_PAUSED);

  /* One 80000-byte push fills the queue to 8%, above both thresholds;
   * this must produce a 100% buffering message. */
  pusher = g_thread_new ("push1", pad_push_datablock_thread, srcpad);
  g_thread_join (pusher);

  CHECK_FOR_BUFFERING_MSG (pipeline, 100);

  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (srcpad);
  gst_object_unref (pipeline);
}

GST_END_TEST;
GST_START_TEST (test_high_threshold_change)
{
  /* Verify behavior when high-percent is lowered below the current
   * fill level: multiqueue must emit a 100% buffering message without
   * requiring any additional data to be pushed. */
  GstElement *pipeline, *mqueue, *sink;
  GstPad *srcpad, *queue_sinkpad, *sink_sinkpad;
  GstSegment segment;
  GThread *pusher;

  /* Build the test pipeline: multiqueue feeding a fakesink */
  pipeline = gst_pipeline_new ("testbin");

  mqueue = gst_element_factory_make ("multiqueue", NULL);
  fail_unless (mqueue != NULL);
  gst_bin_add (GST_BIN (pipeline), mqueue);

  sink = gst_element_factory_make ("fakesink", NULL);
  fail_unless (sink != NULL);
  gst_bin_add (GST_BIN (pipeline), sink);

  /* Block the fakesink sink pad so the prerolling sink cannot drain
   * the queue */
  sink_sinkpad = gst_element_get_static_pad (sink, "sink");
  gst_pad_add_probe (sink_sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL,
      NULL);
  gst_object_unref (sink_sinkpad);

  /* Size limit 1000000 bytes; high threshold starts at 99% so the
   * initial push stays well below it. */
  g_object_set (mqueue,
      "use-buffering", (gboolean) TRUE,
      "max-size-bytes", (guint) 1000 * 1000,
      "max-size-buffers", (guint) 0,
      "max-size-time", (guint64) 0,
      "extra-size-bytes", (guint) 0,
      "extra-size-buffers", (guint) 0,
      "extra-size-time", (guint64) 0,
      "low-percent", (gint) 1, "high-percent", (gint) 99, NULL);

  gst_segment_init (&segment, GST_FORMAT_TIME);

  srcpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
  gst_pad_set_query_function (srcpad, mq_dummypad_query);

  queue_sinkpad = gst_element_get_request_pad (mqueue, "sink_%u");
  fail_unless (queue_sinkpad != NULL);
  fail_unless (gst_pad_link (srcpad, queue_sinkpad) == GST_PAD_LINK_OK);

  gst_pad_set_active (srcpad, TRUE);

  gst_pad_push_event (srcpad, gst_event_new_stream_start ("test"));
  gst_pad_push_event (srcpad, gst_event_new_segment (&segment));

  gst_object_unref (queue_sinkpad);

  fail_unless (gst_element_link (mqueue, sink));

  /* PAUSED keeps the sink in preroll, where the probe blocks it */
  gst_element_set_state (pipeline, GST_STATE_PAUSED);

  /* Push 80000 bytes -> 8% absolute fill, below the 99% high
   * threshold, provoking a buffering message. */
  pusher = g_thread_new ("push1", pad_push_datablock_thread, srcpad);
  g_thread_join (pusher);

  /* The message percentage is normalized against the high threshold,
   * but with the threshold at 99% it still reads 8%. */
  CHECK_FOR_BUFFERING_MSG (pipeline, 8);

  /* Halving the high threshold doubles the relative fill level, so the
   * next buffering message must report 16%. */
  g_object_set (mqueue, "high-percent", (gint) 50, NULL);
  CHECK_FOR_BUFFERING_MSG (pipeline, 16);

  /* Dropping the high threshold below the current fill level must
   * yield an immediate 100% buffering message, without extra data. */
  g_object_set (mqueue, "high-percent", (gint) 5, NULL);
  CHECK_FOR_BUFFERING_MSG (pipeline, 100);

  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (srcpad);
  gst_object_unref (pipeline);
}

GST_END_TEST;
static gpointer
pad_push_thread (gpointer data)
{
@@ -1210,6 +1417,8 @@ multiqueue_suite (void)
tcase_add_test (tc_chain, test_not_linked_eos);
tcase_add_test (tc_chain, test_sparse_stream);
tcase_add_test (tc_chain, test_initial_fill_above_high_threshold);
tcase_add_test (tc_chain, test_high_threshold_change);
tcase_add_test (tc_chain, test_limit_changes);
tcase_add_test (tc_chain, test_buffering_with_none_pts);