mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-25 03:01:03 +00:00
aggregator: make peek() has() pop() drop() buffer API threadsafe
Enforce that the last buffer that was peeked (or had its existence checked) on a pad is the one that gets popped / dropped, resetting at the end of each aggregation cycle. Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/603 Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/608>
This commit is contained in:
parent
347fff4ec1
commit
52aa6a9dda
2 changed files with 147 additions and 21 deletions
|
@ -46,6 +46,24 @@
|
|||
* has been taken with pop_buffer (), a new buffer can be queued
|
||||
* on that pad.
|
||||
*
|
||||
* * When gst_aggregator_pad_peek_buffer() or gst_aggregator_pad_has_buffer()
|
||||
* are called, a reference is taken to the returned buffer, which stays
|
||||
* valid until either:
|
||||
*
|
||||
* - gst_aggregator_pad_pop_buffer() is called, in which case the caller
|
||||
* is guaranteed that the buffer they receive is the same as the peeked
|
||||
* buffer.
|
||||
* - gst_aggregator_pad_drop_buffer() is called, in which case the caller
|
||||
* is guaranteed that the dropped buffer is the one that was peeked.
|
||||
* - the subclass implementation of #GstAggregatorClass.aggregate returns.
|
||||
*
|
||||
* Subsequent calls to gst_aggregator_pad_peek_buffer() or
|
||||
* gst_aggregator_pad_has_buffer() return / check the same buffer that was
|
||||
* returned / checked, until one of the conditions listed above is met.
|
||||
*
|
||||
* Subclasses are only allowed to call these methods from the aggregate
|
||||
* thread.
|
||||
*
|
||||
* * If the subclass wishes to push a buffer downstream in its aggregate
|
||||
* implementation, it should do so through the
|
||||
* gst_aggregator_finish_buffer() method. This method will take care
|
||||
|
@ -127,7 +145,7 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
|
|||
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
|
||||
|
||||
static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
|
||||
GstBuffer * buffer);
|
||||
GstBuffer * buffer, gboolean dequeued);
|
||||
|
||||
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
|
||||
#define GST_CAT_DEFAULT aggregator_debug
|
||||
|
@ -239,6 +257,7 @@ struct _GstAggregatorPadPrivate
|
|||
GQueue data; /* buffers, events and queries */
|
||||
GstBuffer *clipped_buffer;
|
||||
guint num_buffers;
|
||||
GstBuffer *peeked_buffer;
|
||||
|
||||
/* used to track fill state of queues, only used with live-src and when
|
||||
* latency property is set to > 0 */
|
||||
|
@ -951,7 +970,8 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
|
|||
if (GST_IS_BUFFER (item->data)
|
||||
&& klass->skip_buffer (aggpad, agg, item->data)) {
|
||||
GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
|
||||
gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
|
||||
gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data),
|
||||
TRUE);
|
||||
gst_buffer_unref (item->data);
|
||||
g_queue_delete_link (&aggpad->priv->data, item);
|
||||
} else {
|
||||
|
@ -966,6 +986,22 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
|
|||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_aggregator_pad_reset_peeked_buffer (GstElement * self, GstPad * epad,
|
||||
gpointer user_data)
|
||||
{
|
||||
GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
|
||||
|
||||
PAD_LOCK (aggpad);
|
||||
|
||||
gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
|
||||
|
||||
PAD_UNLOCK (aggpad);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
|
||||
GstFlowReturn flow_return, gboolean full)
|
||||
|
@ -1300,8 +1336,11 @@ gst_aggregator_aggregate_func (GstAggregator * self)
|
|||
|
||||
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
|
||||
* the queue */
|
||||
if (!gst_aggregator_wait_and_check (self, &timeout))
|
||||
if (!gst_aggregator_wait_and_check (self, &timeout)) {
|
||||
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
|
||||
gst_aggregator_pad_reset_peeked_buffer, NULL);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
|
||||
if (!gst_aggregator_negotiate_unlocked (self)) {
|
||||
|
@ -1319,6 +1358,9 @@ gst_aggregator_aggregate_func (GstAggregator * self)
|
|||
flow_return = klass->aggregate (self, timeout);
|
||||
}
|
||||
|
||||
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
|
||||
gst_aggregator_pad_reset_peeked_buffer, NULL);
|
||||
|
||||
if (!priv->selected_samples_called_or_warned) {
|
||||
GST_FIXME_OBJECT (self,
|
||||
"Subclass should call gst_aggregator_selected_samples() from its "
|
||||
|
@ -3235,9 +3277,12 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
|
|||
|
||||
/* Must be called with the PAD_LOCK held */
|
||||
static void
|
||||
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
|
||||
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
|
||||
gboolean dequeued)
|
||||
{
|
||||
if (dequeued)
|
||||
pad->priv->num_buffers--;
|
||||
|
||||
if (buffer && pad->priv->emit_signals) {
|
||||
g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
|
||||
0, buffer);
|
||||
|
@ -3278,7 +3323,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
|
|||
buffer = aggclass->clip (self, pad, buffer);
|
||||
|
||||
if (buffer == NULL) {
|
||||
gst_aggregator_pad_buffer_consumed (pad, buffer);
|
||||
gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
|
||||
GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
|
||||
}
|
||||
}
|
||||
|
@ -3302,22 +3347,44 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
|
|||
GstBuffer *
|
||||
gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
GstBuffer *buffer = NULL;
|
||||
|
||||
PAD_LOCK (pad);
|
||||
|
||||
if (pad->priv->flow_return != GST_FLOW_OK) {
|
||||
PAD_UNLOCK (pad);
|
||||
return NULL;
|
||||
/* If the subclass has already peeked a buffer, we guarantee
|
||||
* that it receives the same buffer, no matter if the pad has
|
||||
* errored out / been flushed in the meantime.
|
||||
*/
|
||||
if (pad->priv->peeked_buffer) {
|
||||
buffer = pad->priv->peeked_buffer;
|
||||
goto done;
|
||||
}
|
||||
|
||||
gst_aggregator_pad_clip_buffer_unlocked (pad);
|
||||
if (pad->priv->flow_return != GST_FLOW_OK)
|
||||
goto done;
|
||||
|
||||
gst_aggregator_pad_clip_buffer_unlocked (pad);
|
||||
buffer = pad->priv->clipped_buffer;
|
||||
|
||||
done:
|
||||
if (buffer) {
|
||||
if (pad->priv->clipped_buffer != NULL) {
|
||||
/* Here we still hold a reference to both the clipped buffer
|
||||
* and possibly the peeked buffer, we transfer the first and
|
||||
* potentially release the second
|
||||
*/
|
||||
gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
|
||||
pad->priv->clipped_buffer = NULL;
|
||||
gst_aggregator_pad_buffer_consumed (pad, buffer);
|
||||
gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
|
||||
} else {
|
||||
/* Here our clipped buffer has already been released, for
|
||||
* example because of a flush. We thus transfer the reference
|
||||
* to the peeked buffer to the caller, and we don't decrement
|
||||
* pad.num_buffers as it has already been done elsewhere
|
||||
*/
|
||||
gst_aggregator_pad_buffer_consumed (pad, buffer, FALSE);
|
||||
pad->priv->peeked_buffer = NULL;
|
||||
}
|
||||
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
|
||||
}
|
||||
|
||||
|
@ -3359,24 +3426,29 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
|
|||
GstBuffer *
|
||||
gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
GstBuffer *buffer = NULL;
|
||||
|
||||
PAD_LOCK (pad);
|
||||
|
||||
if (pad->priv->flow_return != GST_FLOW_OK) {
|
||||
PAD_UNLOCK (pad);
|
||||
return NULL;
|
||||
if (pad->priv->peeked_buffer) {
|
||||
buffer = gst_buffer_ref (pad->priv->peeked_buffer);
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (pad->priv->flow_return != GST_FLOW_OK)
|
||||
goto done;
|
||||
|
||||
gst_aggregator_pad_clip_buffer_unlocked (pad);
|
||||
|
||||
if (pad->priv->clipped_buffer) {
|
||||
buffer = gst_buffer_ref (pad->priv->clipped_buffer);
|
||||
pad->priv->peeked_buffer = gst_buffer_ref (buffer);
|
||||
} else {
|
||||
buffer = NULL;
|
||||
}
|
||||
PAD_UNLOCK (pad);
|
||||
|
||||
done:
|
||||
PAD_UNLOCK (pad);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
|
@ -3398,8 +3470,15 @@ gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
|
|||
gboolean has_buffer;
|
||||
|
||||
PAD_LOCK (pad);
|
||||
|
||||
if (pad->priv->peeked_buffer) {
|
||||
has_buffer = TRUE;
|
||||
} else {
|
||||
gst_aggregator_pad_clip_buffer_unlocked (pad);
|
||||
has_buffer = (pad->priv->clipped_buffer != NULL);
|
||||
if (has_buffer)
|
||||
pad->priv->peeked_buffer = gst_buffer_ref (pad->priv->clipped_buffer);
|
||||
}
|
||||
PAD_UNLOCK (pad);
|
||||
|
||||
return has_buffer;
|
||||
|
|
|
@ -58,6 +58,7 @@ struct _GstTestAggregator
|
|||
|
||||
guint64 timestamp;
|
||||
gboolean gap_expected;
|
||||
gboolean do_flush_on_aggregate;
|
||||
};
|
||||
|
||||
struct _GstTestAggregatorClass
|
||||
|
@ -100,7 +101,21 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
|
|||
testagg->gap_expected = FALSE;
|
||||
}
|
||||
|
||||
if (testagg->do_flush_on_aggregate) {
|
||||
GstBuffer *popped_buf;
|
||||
buf = gst_aggregator_pad_peek_buffer (pad);
|
||||
|
||||
GST_DEBUG_OBJECT (pad, "Flushing on aggregate");
|
||||
|
||||
gst_pad_send_event (GST_PAD (pad), gst_event_new_flush_start ());
|
||||
popped_buf = gst_aggregator_pad_pop_buffer (pad);
|
||||
|
||||
fail_unless (buf == popped_buf);
|
||||
gst_buffer_unref (buf);
|
||||
gst_buffer_unref (popped_buf);
|
||||
} else {
|
||||
gst_aggregator_pad_drop_buffer (pad);
|
||||
}
|
||||
|
||||
g_value_reset (&value);
|
||||
break;
|
||||
|
@ -1259,6 +1274,37 @@ GST_START_TEST (test_change_state_intensive)
|
|||
|
||||
GST_END_TEST;
|
||||
|
||||
GST_START_TEST (test_flush_on_aggregate)
|
||||
{
|
||||
GThread *thread1, *thread2;
|
||||
ChainData data1 = { 0, };
|
||||
ChainData data2 = { 0, };
|
||||
TestData test = { 0, };
|
||||
|
||||
_test_data_init (&test, FALSE);
|
||||
((GstTestAggregator *) test.aggregator)->do_flush_on_aggregate = TRUE;
|
||||
_chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
|
||||
_chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
|
||||
|
||||
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
|
||||
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
|
||||
|
||||
g_main_loop_run (test.ml);
|
||||
g_source_remove (test.timeout_id);
|
||||
|
||||
/* these will return immediately as when the data is popped the threads are
|
||||
* unlocked and will terminate */
|
||||
g_thread_join (thread1);
|
||||
g_thread_join (thread2);
|
||||
|
||||
_chain_data_clear (&data1);
|
||||
_chain_data_clear (&data2);
|
||||
_test_data_clear (&test);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
|
||||
static Suite *
|
||||
gst_aggregator_suite (void)
|
||||
{
|
||||
|
@ -1286,6 +1332,7 @@ gst_aggregator_suite (void)
|
|||
tcase_add_test (general, test_timeout_pipeline_with_wait);
|
||||
tcase_add_test (general, test_add_remove);
|
||||
tcase_add_test (general, test_change_state_intensive);
|
||||
tcase_add_test (general, test_flush_on_aggregate);
|
||||
|
||||
return suite;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue