diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index fa508834f0..7a920d4bcf 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -209,6 +209,7 @@ static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq); static void gst_multi_queue_post_buffering (GstMultiQueue * mq); +static void recheck_buffering_status (GstMultiQueue * mq); static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full); @@ -613,36 +614,14 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, break; case PROP_USE_BUFFERING: mq->use_buffering = g_value_get_boolean (value); - if (!mq->use_buffering && mq->buffering) { - GST_MULTI_QUEUE_MUTEX_LOCK (mq); - mq->buffering = FALSE; - GST_DEBUG_OBJECT (mq, "buffering 100 percent"); - SET_PERCENT (mq, 100); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - } - - if (mq->use_buffering) { - GList *tmp; - - GST_MULTI_QUEUE_MUTEX_LOCK (mq); - - tmp = mq->queues; - while (tmp) { - GstSingleQueue *q = (GstSingleQueue *) tmp->data; - update_buffering (mq, q); - gst_data_queue_limits_changed (q->queue); - tmp = g_list_next (tmp); - } - - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - } - gst_multi_queue_post_buffering (mq); + recheck_buffering_status (mq); break; case PROP_LOW_PERCENT: mq->low_percent = g_value_get_int (value); break; case PROP_HIGH_PERCENT: mq->high_percent = g_value_get_int (value); + recheck_buffering_status (mq); break; case PROP_SYNC_BY_RUNNING_TIME: mq->sync_by_running_time = g_value_get_boolean (value); @@ -1043,6 +1022,45 @@ gst_multi_queue_post_buffering (GstMultiQueue * mq) g_mutex_unlock (&mq->buffering_post_lock); } +static void +recheck_buffering_status (GstMultiQueue * mq) +{ + if (!mq->use_buffering && mq->buffering) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + mq->buffering = FALSE; + GST_DEBUG_OBJECT (mq, + "Buffering property disabled, but queue was still buffering; setting percentage to 100%%"); + SET_PERCENT (mq, 100); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } + + if (mq->use_buffering) { + GList *tmp; + gint old_perc; + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + /* force fill level percentage to be recalculated */ + old_perc = mq->percent; + mq->percent = 0; + + tmp = mq->queues; + while (tmp) { + GstSingleQueue *q = (GstSingleQueue *) tmp->data; + update_buffering (mq, q); + gst_data_queue_limits_changed (q->queue); + tmp = g_list_next (tmp); + } + + GST_DEBUG_OBJECT (mq, "Recalculated fill level: old: %d%% new: %d%%", + old_perc, mq->percent); + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } + + gst_multi_queue_post_buffering (mq); +} + static void calculate_interleave (GstMultiQueue * mq) { diff --git a/tests/check/elements/multiqueue.c b/tests/check/elements/multiqueue.c index 736ebad757..6ccbbd869f 100644 --- a/tests/check/elements/multiqueue.c +++ b/tests/check/elements/multiqueue.c @@ -862,6 +862,213 @@ GST_START_TEST (test_sparse_stream) GST_END_TEST; +static gpointer +pad_push_datablock_thread (gpointer data) +{ + GstPad *pad = data; + GstBuffer *buf; + + buf = gst_buffer_new_allocate (NULL, 80 * 1000, NULL); + gst_pad_push (pad, buf); + + return NULL; +} + +static GstPadProbeReturn +block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + return GST_PAD_PROBE_OK; +} + +#define CHECK_FOR_BUFFERING_MSG(PIPELINE, EXPECTED_PERC) \ + G_STMT_START { \ + gint buf_perc; \ + GstMessage *msg; \ + GST_LOG ("waiting for %d%% buffering message", (EXPECTED_PERC)); \ + msg = gst_bus_poll (GST_ELEMENT_BUS (PIPELINE), \ + GST_MESSAGE_BUFFERING | GST_MESSAGE_ERROR, -1); \ + fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR, \ + "Expected BUFFERING message, got ERROR message"); \ + gst_message_parse_buffering (msg, &buf_perc); \ + gst_message_unref (msg); \ + fail_unless (buf_perc == (EXPECTED_PERC), \ + "Got incorrect percentage: %d%% expected: %d%%", buf_perc, \ + (EXPECTED_PERC)); \ + } G_STMT_END + +GST_START_TEST (test_initial_fill_above_high_threshold) +{ + /* This test checks what happens if the first buffer that enters + * the queue immediately fills it above the high-threshold. */ + GstElement *pipe; + GstElement *mq, *fakesink; + GstPad *inputpad; + GstPad *mq_sinkpad; + GstPad *sinkpad; + GstSegment segment; + GThread *thread; + + + /* Setup test pipeline with one multiqueue and one fakesink */ + + pipe = gst_pipeline_new ("testbin"); + mq = gst_element_factory_make ("multiqueue", NULL); + fail_unless (mq != NULL); + gst_bin_add (GST_BIN (pipe), mq); + + fakesink = gst_element_factory_make ("fakesink", NULL); + fail_unless (fakesink != NULL); + gst_bin_add (GST_BIN (pipe), fakesink); + + /* Block fakesink sinkpad flow to ensure the queue isn't emptied + * by the prerolling sink */ + sinkpad = gst_element_get_static_pad (fakesink, "sink"); + gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL, + NULL); + gst_object_unref (sinkpad); + + /* Set size limit to 1000000 byte, low threshold to 1%, high + * threshold to 5%, to make sure that even just one data push + * will exceed both thresholds.*/ + g_object_set (mq, + "use-buffering", (gboolean) TRUE, + "max-size-bytes", (guint) 1000 * 1000, + "max-size-buffers", (guint) 0, + "max-size-time", (guint64) 0, + "extra-size-bytes", (guint) 0, + "extra-size-buffers", (guint) 0, + "extra-size-time", (guint64) 0, + "low-percent", (gint) 1, "high-percent", (gint) 5, NULL); + + gst_segment_init (&segment, GST_FORMAT_TIME); + + inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC); + gst_pad_set_query_function (inputpad, mq_dummypad_query); + + mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u"); + fail_unless (mq_sinkpad != NULL); + fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK); + + gst_pad_set_active (inputpad, TRUE); + + gst_pad_push_event (inputpad, gst_event_new_stream_start ("test")); + gst_pad_push_event (inputpad, gst_event_new_segment (&segment)); + + gst_object_unref (mq_sinkpad); + + fail_unless (gst_element_link (mq, fakesink)); + + /* Start pipeline in paused state to ensure the sink remains + * in preroll mode and blocks */ + gst_element_set_state (pipe, GST_STATE_PAUSED); + + /* Feed data. queue will be filled to 8% (because it pushes 80000 bytes), + * which is above both the low- and the high-threshold. This should + * produce a 100% buffering message. */ + thread = g_thread_new ("push1", pad_push_datablock_thread, inputpad); + g_thread_join (thread); + CHECK_FOR_BUFFERING_MSG (pipe, 100); + + gst_element_set_state (pipe, GST_STATE_NULL); + gst_object_unref (inputpad); + gst_object_unref (pipe); +} + +GST_END_TEST; + +GST_START_TEST (test_high_threshold_change) +{ + /* This test checks what happens if the high threshold is changed to a + * value below the current buffer fill level. Expected behavior is for + * multiqueue to emit a 100% buffering message in that case. */ + GstElement *pipe; + GstElement *mq, *fakesink; + GstPad *inputpad; + GstPad *mq_sinkpad; + GstPad *sinkpad; + GstSegment segment; + GThread *thread; + + + /* Setup test pipeline with one multiqueue and one fakesink */ + + pipe = gst_pipeline_new ("testbin"); + mq = gst_element_factory_make ("multiqueue", NULL); + fail_unless (mq != NULL); + gst_bin_add (GST_BIN (pipe), mq); + + fakesink = gst_element_factory_make ("fakesink", NULL); + fail_unless (fakesink != NULL); + gst_bin_add (GST_BIN (pipe), fakesink); + + /* Block fakesink sinkpad flow to ensure the queue isn't emptied + * by the prerolling sink */ + sinkpad = gst_element_get_static_pad (fakesink, "sink"); + gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL, + NULL); + gst_object_unref (sinkpad); + + g_object_set (mq, + "use-buffering", (gboolean) TRUE, + "max-size-bytes", (guint) 1000 * 1000, + "max-size-buffers", (guint) 0, + "max-size-time", (guint64) 0, + "extra-size-bytes", (guint) 0, + "extra-size-buffers", (guint) 0, + "extra-size-time", (guint64) 0, + "low-percent", (gint) 1, "high-percent", (gint) 99, NULL); + + gst_segment_init (&segment, GST_FORMAT_TIME); + + inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC); + gst_pad_set_query_function (inputpad, mq_dummypad_query); + + mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u"); + fail_unless (mq_sinkpad != NULL); + fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK); + + gst_pad_set_active (inputpad, TRUE); + + gst_pad_push_event (inputpad, gst_event_new_stream_start ("test")); + gst_pad_push_event (inputpad, gst_event_new_segment (&segment)); + + gst_object_unref (mq_sinkpad); + + fail_unless (gst_element_link (mq, fakesink)); + + /* Start pipeline in paused state to ensure the sink remains + * in preroll mode and blocks */ + gst_element_set_state (pipe, GST_STATE_PAUSED); + + /* Feed data. queue will be filled to 8% (because it pushes 80000 bytes), + * which is below the high-threshold, provoking a buffering message. */ + thread = g_thread_new ("push1", pad_push_datablock_thread, inputpad); + g_thread_join (thread); + + /* Check for the buffering message; it should indicate 8% fill level + * (Note that the percentage from the message is normalized, but since + * the high threshold is at 99%, it should still apply) */ + CHECK_FOR_BUFFERING_MSG (pipe, 8); + + /* Set high threshold to half of what it was before. This means that the + * relative fill level doubles. As a result, this should trigger a buffering + * message with a percentage of 16%. */ + g_object_set (mq, "high-percent", (gint) 50, NULL); + CHECK_FOR_BUFFERING_MSG (pipe, 16); + + /* Set high threshold to a value that lies below the current fill level. + * This should trigger a 100% buffering message immediately, even without + * pushing in extra data. */ + g_object_set (mq, "high-percent", (gint) 5, NULL); + CHECK_FOR_BUFFERING_MSG (pipe, 100); + + gst_element_set_state (pipe, GST_STATE_NULL); + gst_object_unref (inputpad); + gst_object_unref (pipe); +} + +GST_END_TEST; + static gpointer pad_push_thread (gpointer data) { @@ -1210,6 +1417,8 @@ multiqueue_suite (void) tcase_add_test (tc_chain, test_not_linked_eos); tcase_add_test (tc_chain, test_sparse_stream); + tcase_add_test (tc_chain, test_initial_fill_above_high_threshold); + tcase_add_test (tc_chain, test_high_threshold_change); tcase_add_test (tc_chain, test_limit_changes); tcase_add_test (tc_chain, test_buffering_with_none_pts);