diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.c b/subprojects/gstreamer/libs/gst/base/gstaggregator.c index 6bfe04ea29..d60aa14531 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.c +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.c @@ -264,6 +264,12 @@ struct _GstAggregatorPadPrivate guint num_buffers; GstBuffer *peeked_buffer; + /* TRUE if the serialized query is in the proccess of handling at some + * exact moment. This will obligate the sinkpad streaming thread wait + * until the handling finishes. + * Always protected by the PAD_LOCK. */ + gboolean query_in_proccess; + /* used to track fill state of queues, only used with live-src and when * latency property is set to > 0 */ GstClockTime head_position; @@ -975,8 +981,10 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); - if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) { query = g_queue_peek_tail (&pad->priv->data); + pad->priv->query_in_proccess = TRUE; + } } PAD_UNLOCK (pad); if (event || query) { @@ -1011,6 +1019,8 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, NULL); g_queue_pop_tail (&pad->priv->data); } + + pad->priv->query_in_proccess = FALSE; } PAD_BROADCAST_EVENT (pad); @@ -2762,9 +2772,14 @@ gst_aggregator_default_sink_query_pre_queue (GstAggregator * self, SRC_BROADCAST (self); SRC_UNLOCK (self); - while (!gst_aggregator_pad_queue_is_empty (aggpad) - && aggpad->priv->flow_return == GST_FLOW_OK) { - GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + /* Sanity check: aggregator's sink pad can only proccess one serialized + * query at a time. */ + g_warn_if_fail (!aggpad->priv->query_in_proccess); + + while ((!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) || + aggpad->priv->query_in_proccess) { + GST_DEBUG_OBJECT (aggpad, "Waiting for query to be consumed"); PAD_WAIT_EVENT (aggpad); } diff --git a/subprojects/gstreamer/tests/check/libs/aggregator.c b/subprojects/gstreamer/tests/check/libs/aggregator.c index 215b06186c..1f2c5b4e0f 100644 --- a/subprojects/gstreamer/tests/check/libs/aggregator.c +++ b/subprojects/gstreamer/tests/check/libs/aggregator.c @@ -166,6 +166,24 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) #define gst_test_aggregator_parent_class parent_class G_DEFINE_TYPE (GstTestAggregator, gst_test_aggregator, GST_TYPE_AGGREGATOR); +static gboolean gst_aggregator_test_slow_down_sink_query = FALSE; + +static gboolean +gst_aggregator_test_slow_sink_query (GstAggregator * self, + GstAggregatorPad * aggpad, GstQuery * query) +{ + GST_DEBUG ("Handling query %" GST_PTR_FORMAT, query); + if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s = gst_query_writable_structure (query); + + if (gst_aggregator_test_slow_down_sink_query) + g_usleep (G_TIME_SPAN_MILLISECOND * 10); + gst_structure_set (s, "some-int", G_TYPE_INT, 123, NULL); + GST_DEBUG ("Written to the query %" GST_PTR_FORMAT, query); + } + return GST_AGGREGATOR_CLASS (parent_class)->sink_query (self, aggpad, query); +} + static void gst_test_aggregator_class_init (GstTestAggregatorClass * klass) { @@ -193,6 +211,7 @@ gst_test_aggregator_class_init (GstTestAggregatorClass * klass) GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate); base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time; + base_aggregator_class->sink_query = gst_aggregator_test_slow_sink_query; } static void @@ -646,6 +665,60 @@ GST_START_TEST (test_aggregate_handle_queries) GST_END_TEST; +GST_START_TEST (test_aggregate_queries_robustness) +{ + GThread *thread1; + ChainData data1 = { 0, }; + TestData test = { 0, }; + GstCaps *caps; + gint64 start_time; + + gst_aggregator_test_slow_down_sink_query = TRUE; + + _test_data_init (&test, FALSE); + + caps = gst_caps_new_empty_simple ("foo/x-bar"); + _chain_data_init (&data1, test.aggregator, + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), + gst_query_new_allocation (caps, FALSE), NULL); + gst_caps_unref (caps); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + g_usleep (G_TIME_SPAN_MILLISECOND * 5); + for (start_time = g_get_monotonic_time (); + start_time + G_TIME_SPAN_SECOND > g_get_monotonic_time (); + g_usleep (G_TIME_SPAN_MILLISECOND)) { + fail_unless (gst_element_send_event (test.aggregator, + gst_event_new_flush_start ())); + fail_unless (gst_element_send_event (test.aggregator, + gst_event_new_flush_stop (TRUE))); + } + + g_thread_join (thread1); + + _chain_data_clear (&data1); + _test_data_clear (&test); + + gst_aggregator_test_slow_down_sink_query = FALSE; +} + +GST_END_TEST; + #define NUM_BUFFERS 3 static void handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count) @@ -1398,6 +1471,7 @@ gst_aggregator_suite (void) tcase_add_test (general, test_aggregate_gap); tcase_add_test (general, test_aggregate_handle_events); tcase_add_test (general, test_aggregate_handle_queries); + tcase_add_test (general, test_aggregate_queries_robustness); tcase_add_test (general, test_flushing_seek); tcase_add_test (general, test_infinite_seek); tcase_add_test (general, test_infinite_seek_50_src);