mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-26 11:41:09 +00:00
aggregator: fix use-after-free in queries processing
Test included. The problem appears when aggregator drops the query while it's being proccessed by the klass->sink_query handler. This can happen on FLUSH_START event. If the query is still in the queue, it can be safely dropped, but if it's already in the klass->sink_query() handler, then sink pad has no choice and has to wait for the proccessing to complete. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5765>
This commit is contained in:
parent
94e6a6e3de
commit
97e748466e
2 changed files with 93 additions and 4 deletions
|
@ -264,6 +264,12 @@ struct _GstAggregatorPadPrivate
|
|||
guint num_buffers;
|
||||
GstBuffer *peeked_buffer;
|
||||
|
||||
/* TRUE if the serialized query is in the proccess of handling at some
|
||||
* exact moment. This will obligate the sinkpad streaming thread wait
|
||||
* until the handling finishes.
|
||||
* Always protected by the PAD_LOCK. */
|
||||
gboolean query_in_proccess;
|
||||
|
||||
/* used to track fill state of queues, only used with live-src and when
|
||||
* latency property is set to > 0 */
|
||||
GstClockTime head_position;
|
||||
|
@ -975,8 +981,10 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
|
|||
!GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
|
||||
if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
|
||||
event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
|
||||
if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
|
||||
if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) {
|
||||
query = g_queue_peek_tail (&pad->priv->data);
|
||||
pad->priv->query_in_proccess = TRUE;
|
||||
}
|
||||
}
|
||||
PAD_UNLOCK (pad);
|
||||
if (event || query) {
|
||||
|
@ -1011,6 +1019,8 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
|
|||
NULL);
|
||||
g_queue_pop_tail (&pad->priv->data);
|
||||
}
|
||||
|
||||
pad->priv->query_in_proccess = FALSE;
|
||||
}
|
||||
|
||||
PAD_BROADCAST_EVENT (pad);
|
||||
|
@ -2762,9 +2772,14 @@ gst_aggregator_default_sink_query_pre_queue (GstAggregator * self,
|
|||
SRC_BROADCAST (self);
|
||||
SRC_UNLOCK (self);
|
||||
|
||||
while (!gst_aggregator_pad_queue_is_empty (aggpad)
|
||||
&& aggpad->priv->flow_return == GST_FLOW_OK) {
|
||||
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
||||
/* Sanity check: aggregator's sink pad can only proccess one serialized
|
||||
* query at a time. */
|
||||
g_warn_if_fail (!aggpad->priv->query_in_proccess);
|
||||
|
||||
while ((!gst_aggregator_pad_queue_is_empty (aggpad)
|
||||
&& aggpad->priv->flow_return == GST_FLOW_OK) ||
|
||||
aggpad->priv->query_in_proccess) {
|
||||
GST_DEBUG_OBJECT (aggpad, "Waiting for query to be consumed");
|
||||
PAD_WAIT_EVENT (aggpad);
|
||||
}
|
||||
|
||||
|
|
|
@ -166,6 +166,24 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
|
|||
#define gst_test_aggregator_parent_class parent_class
|
||||
G_DEFINE_TYPE (GstTestAggregator, gst_test_aggregator, GST_TYPE_AGGREGATOR);
|
||||
|
||||
static gboolean gst_aggregator_test_slow_down_sink_query = FALSE;
|
||||
|
||||
static gboolean
|
||||
gst_aggregator_test_slow_sink_query (GstAggregator * self,
|
||||
GstAggregatorPad * aggpad, GstQuery * query)
|
||||
{
|
||||
GST_DEBUG ("Handling query %" GST_PTR_FORMAT, query);
|
||||
if (GST_QUERY_IS_SERIALIZED (query)) {
|
||||
GstStructure *s = gst_query_writable_structure (query);
|
||||
|
||||
if (gst_aggregator_test_slow_down_sink_query)
|
||||
g_usleep (G_TIME_SPAN_MILLISECOND * 10);
|
||||
gst_structure_set (s, "some-int", G_TYPE_INT, 123, NULL);
|
||||
GST_DEBUG ("Written to the query %" GST_PTR_FORMAT, query);
|
||||
}
|
||||
return GST_AGGREGATOR_CLASS (parent_class)->sink_query (self, aggpad, query);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
|
||||
{
|
||||
|
@ -193,6 +211,7 @@ gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
|
|||
GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate);
|
||||
|
||||
base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
|
||||
base_aggregator_class->sink_query = gst_aggregator_test_slow_sink_query;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -646,6 +665,60 @@ GST_START_TEST (test_aggregate_handle_queries)
|
|||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_aggregate_queries_robustness)
|
||||
{
|
||||
GThread *thread1;
|
||||
ChainData data1 = { 0, };
|
||||
TestData test = { 0, };
|
||||
GstCaps *caps;
|
||||
gint64 start_time;
|
||||
|
||||
gst_aggregator_test_slow_down_sink_query = TRUE;
|
||||
|
||||
_test_data_init (&test, FALSE);
|
||||
|
||||
caps = gst_caps_new_empty_simple ("foo/x-bar");
|
||||
_chain_data_init (&data1, test.aggregator,
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE),
|
||||
gst_query_new_allocation (caps, FALSE), NULL);
|
||||
gst_caps_unref (caps);
|
||||
|
||||
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
|
||||
g_usleep (G_TIME_SPAN_MILLISECOND * 5);
|
||||
for (start_time = g_get_monotonic_time ();
|
||||
start_time + G_TIME_SPAN_SECOND > g_get_monotonic_time ();
|
||||
g_usleep (G_TIME_SPAN_MILLISECOND)) {
|
||||
fail_unless (gst_element_send_event (test.aggregator,
|
||||
gst_event_new_flush_start ()));
|
||||
fail_unless (gst_element_send_event (test.aggregator,
|
||||
gst_event_new_flush_stop (TRUE)));
|
||||
}
|
||||
|
||||
g_thread_join (thread1);
|
||||
|
||||
_chain_data_clear (&data1);
|
||||
_test_data_clear (&test);
|
||||
|
||||
gst_aggregator_test_slow_down_sink_query = FALSE;
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
#define NUM_BUFFERS 3
|
||||
static void
|
||||
handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
|
||||
|
@ -1398,6 +1471,7 @@ gst_aggregator_suite (void)
|
|||
tcase_add_test (general, test_aggregate_gap);
|
||||
tcase_add_test (general, test_aggregate_handle_events);
|
||||
tcase_add_test (general, test_aggregate_handle_queries);
|
||||
tcase_add_test (general, test_aggregate_queries_robustness);
|
||||
tcase_add_test (general, test_flushing_seek);
|
||||
tcase_add_test (general, test_infinite_seek);
|
||||
tcase_add_test (general, test_infinite_seek_50_src);
|
||||
|
|
Loading…
Reference in a new issue