diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index a3d9b2bb4c..ba1b22d507 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -46,6 +46,24 @@ * has been taken with pop_buffer (), a new buffer can be queued * on that pad. * + * * When gst_aggregator_pad_peek_buffer() or gst_aggregator_pad_has_buffer() + * are called, a reference is taken to the returned buffer, which stays + * valid until either: + * + * - gst_aggregator_pad_pop_buffer() is called, in which case the caller + * is guaranteed that the buffer they receive is the same as the peeked + * buffer. + * - gst_aggregator_pad_drop_buffer() is called, in which case the caller + * is guaranteed that the dropped buffer is the one that was peeked. + * - the subclass implementation of #GstAggregatorClass.aggregate returns. + * + * Subsequent calls to gst_aggregator_pad_peek_buffer() or + * gst_aggregator_pad_has_buffer() return / check the same buffer that was + * returned / checked, until one of the conditions listed above is met. + * + * Subclasses are only allowed to call these methods from the aggregate + * thread. + * * * If the subclass wishes to push a buffer downstream in its aggregate * implementation, it should do so through the * gst_aggregator_finish_buffer() method. This method will take care @@ -127,7 +145,7 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, - GstBuffer * buffer); + GstBuffer * buffer, gboolean dequeued); GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -239,6 +257,7 @@ struct _GstAggregatorPadPrivate GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; guint num_buffers; + GstBuffer *peeked_buffer; /* used to track fill state of queues, only used with live-src and when * latency property is set to > 0 */ @@ -951,7 +970,8 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, if (GST_IS_BUFFER (item->data) && klass->skip_buffer (aggpad, agg, item->data)) { GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); - gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data)); + gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data), + TRUE); gst_buffer_unref (item->data); g_queue_delete_link (&aggpad->priv->data, item); } else { @@ -966,6 +986,22 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, return TRUE; } +static gboolean +gst_aggregator_pad_reset_peeked_buffer (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GstAggregatorPad *aggpad = (GstAggregatorPad *) epad; + + PAD_LOCK (aggpad); + + gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL); + + PAD_UNLOCK (aggpad); + + return TRUE; +} + + static void gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GstFlowReturn flow_return, gboolean full) @@ -1300,8 +1336,11 @@ gst_aggregator_aggregate_func (GstAggregator * self) /* Ensure we have buffers ready (either in clipped_buffer or at the head of * the queue */ - if (!gst_aggregator_wait_and_check (self, &timeout)) + if (!gst_aggregator_wait_and_check (self, &timeout)) { + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_pad_reset_peeked_buffer, NULL); continue; + } if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { if (!gst_aggregator_negotiate_unlocked (self)) { @@ -1319,6 +1358,9 @@ gst_aggregator_aggregate_func (GstAggregator * self) flow_return = klass->aggregate (self, timeout); } + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_pad_reset_peeked_buffer, NULL); + if (!priv->selected_samples_called_or_warned) { GST_FIXME_OBJECT (self, "Subclass should call gst_aggregator_selected_samples() from its " @@ -3235,9 +3277,12 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) /* Must be called with the PAD_LOCK held */ static void -gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer, + gboolean dequeued) { - pad->priv->num_buffers--; + if (dequeued) + pad->priv->num_buffers--; + if (buffer && pad->priv->emit_signals) { g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], 0, buffer); @@ -3278,7 +3323,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) buffer = aggclass->clip (self, pad, buffer); if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad, buffer); + gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); } } @@ -3302,22 +3347,44 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) GstBuffer * gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer; + GstBuffer *buffer = NULL; PAD_LOCK (pad); - if (pad->priv->flow_return != GST_FLOW_OK) { - PAD_UNLOCK (pad); - return NULL; + /* If the subclass has already peeked a buffer, we guarantee + * that it receives the same buffer, no matter if the pad has + * errored out / been flushed in the meantime. + */ + if (pad->priv->peeked_buffer) { + buffer = pad->priv->peeked_buffer; + goto done; } - gst_aggregator_pad_clip_buffer_unlocked (pad); + if (pad->priv->flow_return != GST_FLOW_OK) + goto done; + gst_aggregator_pad_clip_buffer_unlocked (pad); buffer = pad->priv->clipped_buffer; +done: if (buffer) { - pad->priv->clipped_buffer = NULL; - gst_aggregator_pad_buffer_consumed (pad, buffer); + if (pad->priv->clipped_buffer != NULL) { + /* Here we still hold a reference to both the clipped buffer + * and possibly the peeked buffer, we transfer the first and + * potentially release the second + */ + gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE); + pad->priv->clipped_buffer = NULL; + gst_buffer_replace (&pad->priv->peeked_buffer, NULL); + } else { + /* Here our clipped buffer has already been released, for + * example because of a flush. We thus transfer the reference + * to the peeked buffer to the caller, and we don't decrement + * pad.num_buffers as it has already been done elsewhere + */ + gst_aggregator_pad_buffer_consumed (pad, buffer, FALSE); + pad->priv->peeked_buffer = NULL; + } GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } @@ -3359,24 +3426,29 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) GstBuffer * gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer; + GstBuffer *buffer = NULL; PAD_LOCK (pad); - if (pad->priv->flow_return != GST_FLOW_OK) { - PAD_UNLOCK (pad); - return NULL; + if (pad->priv->peeked_buffer) { + buffer = gst_buffer_ref (pad->priv->peeked_buffer); + goto done; } + if (pad->priv->flow_return != GST_FLOW_OK) + goto done; + gst_aggregator_pad_clip_buffer_unlocked (pad); if (pad->priv->clipped_buffer) { buffer = gst_buffer_ref (pad->priv->clipped_buffer); + pad->priv->peeked_buffer = gst_buffer_ref (buffer); } else { buffer = NULL; } - PAD_UNLOCK (pad); +done: + PAD_UNLOCK (pad); return buffer; } @@ -3398,8 +3470,15 @@ gst_aggregator_pad_has_buffer (GstAggregatorPad * pad) gboolean has_buffer; PAD_LOCK (pad); - gst_aggregator_pad_clip_buffer_unlocked (pad); - has_buffer = (pad->priv->clipped_buffer != NULL); + + if (pad->priv->peeked_buffer) { + has_buffer = TRUE; + } else { + gst_aggregator_pad_clip_buffer_unlocked (pad); + has_buffer = (pad->priv->clipped_buffer != NULL); + if (has_buffer) + pad->priv->peeked_buffer = gst_buffer_ref (pad->priv->clipped_buffer); + } PAD_UNLOCK (pad); return has_buffer; diff --git a/tests/check/libs/aggregator.c b/tests/check/libs/aggregator.c index 342402453b..24df76865a 100644 --- a/tests/check/libs/aggregator.c +++ b/tests/check/libs/aggregator.c @@ -58,6 +58,7 @@ struct _GstTestAggregator guint64 timestamp; gboolean gap_expected; + gboolean do_flush_on_aggregate; }; struct _GstTestAggregatorClass @@ -100,7 +101,21 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) testagg->gap_expected = FALSE; } - gst_aggregator_pad_drop_buffer (pad); + if (testagg->do_flush_on_aggregate) { + GstBuffer *popped_buf; + buf = gst_aggregator_pad_peek_buffer (pad); + + GST_DEBUG_OBJECT (pad, "Flushing on aggregate"); + + gst_pad_send_event (GST_PAD (pad), gst_event_new_flush_start ()); + popped_buf = gst_aggregator_pad_pop_buffer (pad); + + fail_unless (buf == popped_buf); + gst_buffer_unref (buf); + gst_buffer_unref (popped_buf); + } else { + gst_aggregator_pad_drop_buffer (pad); + } g_value_reset (&value); break; @@ -1259,6 +1274,37 @@ GST_START_TEST (test_change_state_intensive) GST_END_TEST; +GST_START_TEST (test_flush_on_aggregate) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + ((GstTestAggregator *) test.aggregator)->do_flush_on_aggregate = TRUE; + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; + + static Suite * gst_aggregator_suite (void) { @@ -1286,6 +1332,7 @@ gst_aggregator_suite (void) tcase_add_test (general, test_timeout_pipeline_with_wait); tcase_add_test (general, test_add_remove); tcase_add_test (general, test_change_state_intensive); + tcase_add_test (general, test_flush_on_aggregate); return suite; }