aggregator: fix use-after-free in queries processing

Test included.
The problem appears when aggregator drops the query while
it's being proccessed by the klass->sink_query handler.
This can happen on FLUSH_START event. If the query is still
in the queue, it can be safely dropped, but if it's already
in the klass->sink_query() handler, then sink pad has no
choice and has to wait for the proccessing to complete.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5958>
This commit is contained in:
Alexander Slobodeniuk 2023-12-12 23:55:04 +01:00 committed by GStreamer Marge Bot
parent 6744cb0d20
commit f1a3d85e80
2 changed files with 93 additions and 4 deletions

View file

@ -264,6 +264,12 @@ struct _GstAggregatorPadPrivate
guint num_buffers;
GstBuffer *peeked_buffer;
/* TRUE if the serialized query is in the proccess of handling at some
* exact moment. This will obligate the sinkpad streaming thread wait
* until the handling finishes.
* Always protected by the PAD_LOCK. */
gboolean query_in_proccess;
/* used to track fill state of queues, only used with live-src and when
* latency property is set to > 0 */
GstClockTime head_position;
@ -971,8 +977,10 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
!GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) {
query = g_queue_peek_tail (&pad->priv->data);
pad->priv->query_in_proccess = TRUE;
}
}
PAD_UNLOCK (pad);
if (event || query) {
@ -1007,6 +1015,8 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
NULL);
g_queue_pop_tail (&pad->priv->data);
}
pad->priv->query_in_proccess = FALSE;
}
PAD_BROADCAST_EVENT (pad);
@ -2638,9 +2648,14 @@ gst_aggregator_default_sink_query_pre_queue (GstAggregator * self,
SRC_BROADCAST (self);
SRC_UNLOCK (self);
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
/* Sanity check: aggregator's sink pad can only proccess one serialized
* query at a time. */
g_warn_if_fail (!aggpad->priv->query_in_proccess);
while ((!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) ||
aggpad->priv->query_in_proccess) {
GST_DEBUG_OBJECT (aggpad, "Waiting for query to be consumed");
PAD_WAIT_EVENT (aggpad);
}

View file

@ -166,6 +166,24 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
#define gst_test_aggregator_parent_class parent_class
G_DEFINE_TYPE (GstTestAggregator, gst_test_aggregator, GST_TYPE_AGGREGATOR);
static gboolean gst_aggregator_test_slow_down_sink_query = FALSE;
static gboolean
gst_aggregator_test_slow_sink_query (GstAggregator * self,
GstAggregatorPad * aggpad, GstQuery * query)
{
GST_DEBUG ("Handling query %" GST_PTR_FORMAT, query);
if (GST_QUERY_IS_SERIALIZED (query)) {
GstStructure *s = gst_query_writable_structure (query);
if (gst_aggregator_test_slow_down_sink_query)
g_usleep (G_TIME_SPAN_MILLISECOND * 10);
gst_structure_set (s, "some-int", G_TYPE_INT, 123, NULL);
GST_DEBUG ("Written to the query %" GST_PTR_FORMAT, query);
}
return GST_AGGREGATOR_CLASS (parent_class)->sink_query (self, aggpad, query);
}
static void
gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
{
@ -193,6 +211,7 @@ gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate);
base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
base_aggregator_class->sink_query = gst_aggregator_test_slow_sink_query;
}
static void
@ -646,6 +665,60 @@ GST_START_TEST (test_aggregate_handle_queries)
GST_END_TEST;
GST_START_TEST (test_aggregate_queries_robustness)
{
GThread *thread1;
ChainData data1 = { 0, };
TestData test = { 0, };
GstCaps *caps;
gint64 start_time;
gst_aggregator_test_slow_down_sink_query = TRUE;
_test_data_init (&test, FALSE);
caps = gst_caps_new_empty_simple ("foo/x-bar");
_chain_data_init (&data1, test.aggregator,
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE),
gst_query_new_allocation (caps, FALSE), NULL);
gst_caps_unref (caps);
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
g_usleep (G_TIME_SPAN_MILLISECOND * 5);
for (start_time = g_get_monotonic_time ();
start_time + G_TIME_SPAN_SECOND > g_get_monotonic_time ();
g_usleep (G_TIME_SPAN_MILLISECOND)) {
fail_unless (gst_element_send_event (test.aggregator,
gst_event_new_flush_start ()));
fail_unless (gst_element_send_event (test.aggregator,
gst_event_new_flush_stop (TRUE)));
}
g_thread_join (thread1);
_chain_data_clear (&data1);
_test_data_clear (&test);
gst_aggregator_test_slow_down_sink_query = FALSE;
}
GST_END_TEST;
#define NUM_BUFFERS 3
static void
handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
@ -1398,6 +1471,7 @@ gst_aggregator_suite (void)
tcase_add_test (general, test_aggregate_gap);
tcase_add_test (general, test_aggregate_handle_events);
tcase_add_test (general, test_aggregate_handle_queries);
tcase_add_test (general, test_aggregate_queries_robustness);
tcase_add_test (general, test_flushing_seek);
tcase_add_test (general, test_infinite_seek);
tcase_add_test (general, test_infinite_seek_50_src);