aggregator: Release pads' peeked buffer when removing the pad or finalizing it

The peeked buffer was always reset after calling ::aggregate() but under
no other circumstances. If a pad was removed after peeking and before
::aggregate() returned then the peeked buffer would be leaked.

This can easily happen if pads are removed from the aggregator from a
pad probe downstream of the source pad but still in the source pad's
streaming thread.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/785>
This commit is contained in:
Sebastian Dröge 2021-04-06 20:56:55 +03:00 committed by Tim-Philipp Müller
parent 102232fa62
commit 934e6bb114
2 changed files with 41 additions and 0 deletions

View file

@ -1936,6 +1936,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
SRC_LOCK (self); SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
gst_element_remove_pad (element, pad); gst_element_remove_pad (element, pad);
self->priv->has_peer_latency = FALSE; self->priv->has_peer_latency = FALSE;
@ -3191,6 +3192,7 @@ gst_aggregator_pad_finalize (GObject * object)
{ {
GstAggregatorPad *pad = (GstAggregatorPad *) object; GstAggregatorPad *pad = (GstAggregatorPad *) object;
gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
g_cond_clear (&pad->priv->event_cond); g_cond_clear (&pad->priv->event_cond);
g_mutex_clear (&pad->priv->flush_lock); g_mutex_clear (&pad->priv->flush_lock);
g_mutex_clear (&pad->priv->lock); g_mutex_clear (&pad->priv->lock);

View file

@ -59,6 +59,7 @@ struct _GstTestAggregator
guint64 timestamp; guint64 timestamp;
gboolean gap_expected; gboolean gap_expected;
gboolean do_flush_on_aggregate; gboolean do_flush_on_aggregate;
gboolean do_remove_pad_on_aggregate;
}; };
struct _GstTestAggregatorClass struct _GstTestAggregatorClass
@ -113,6 +114,14 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
fail_unless (buf == popped_buf); fail_unless (buf == popped_buf);
gst_buffer_unref (buf); gst_buffer_unref (buf);
gst_buffer_unref (popped_buf); gst_buffer_unref (popped_buf);
} else if (testagg->do_remove_pad_on_aggregate) {
buf = gst_aggregator_pad_peek_buffer (pad);
GST_DEBUG_OBJECT (pad, "Removing pad on aggregate");
gst_buffer_unref (buf);
gst_element_release_request_pad (GST_ELEMENT (aggregator),
GST_PAD (pad));
} else { } else {
gst_aggregator_pad_drop_buffer (pad); gst_aggregator_pad_drop_buffer (pad);
} }
@ -1304,6 +1313,35 @@ GST_START_TEST (test_flush_on_aggregate)
GST_END_TEST; GST_END_TEST;
GST_START_TEST (test_remove_pad_on_aggregate)
{
GThread *thread1, *thread2;
ChainData data1 = { 0, };
ChainData data2 = { 0, };
TestData test = { 0, };
_test_data_init (&test, FALSE);
((GstTestAggregator *) test.aggregator)->do_remove_pad_on_aggregate = TRUE;
_chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
_chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
g_main_loop_run (test.ml);
g_source_remove (test.timeout_id);
/* these will return immediately as when the data is popped the threads are
* unlocked and will terminate */
g_thread_join (thread1);
g_thread_join (thread2);
_chain_data_clear (&data1);
_chain_data_clear (&data2);
_test_data_clear (&test);
}
GST_END_TEST;
static Suite * static Suite *
gst_aggregator_suite (void) gst_aggregator_suite (void)
@ -1333,6 +1371,7 @@ gst_aggregator_suite (void)
tcase_add_test (general, test_add_remove); tcase_add_test (general, test_add_remove);
tcase_add_test (general, test_change_state_intensive); tcase_add_test (general, test_change_state_intensive);
tcase_add_test (general, test_flush_on_aggregate); tcase_add_test (general, test_flush_on_aggregate);
tcase_add_test (general, test_remove_pad_on_aggregate);
return suite; return suite;
} }