aggregator: delegate buffer skipping to the aggregate thread

As we do that for serialized events as well, and the subclass will
most likely need to access pad->segment to make its decisions,
doing that from the sinkpad's streaming threads was racy.
This commit is contained in:
Mathieu Duponchelle 2018-01-23 22:49:52 +01:00
parent 9f69034d41
commit 117200faeb

View file

@ -126,6 +126,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
@ -774,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
return TRUE;
}
static gboolean
gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
gpointer user_data)
{
GList *item;
GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
GstAggregator *agg = (GstAggregator *) self;
GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
if (!klass->skip_buffer)
return FALSE;
PAD_LOCK (aggpad);
item = g_queue_peek_head_link (&aggpad->priv->data);
while (item) {
GList *next = item->next;
if (GST_IS_BUFFER (item->data)
&& klass->skip_buffer (aggpad, agg, item->data)) {
GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
gst_aggregator_pad_buffer_consumed (aggpad);
gst_buffer_unref (item->data);
g_queue_delete_link (&aggpad->priv->data, item);
} else {
break;
}
item = next;
}
PAD_UNLOCK (aggpad);
return TRUE;
}
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return, gboolean full)
@ -1056,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_do_events_and_queries, NULL);
if (self->priv->peer_latency_live)
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_pad_skip_buffers, NULL);
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */
if (!gst_aggregator_wait_and_check (self, &timeout))
@ -2426,18 +2468,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
{
GstFlowReturn flow_return;
GstClockTime buf_pts;
GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing;
if (klass->skip_buffer && klass->skip_buffer (aggpad, self, buffer))
goto skipped;
PAD_UNLOCK (aggpad);
buf_pts = GST_BUFFER_PTS (buffer);
@ -2541,14 +2577,6 @@ flushing:
gst_buffer_unref (buffer);
return flow_return;
skipped:
PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Skipped buffer %" GST_PTR_FORMAT, buffer);
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
static GstFlowReturn