aggregator: Delay clipping to output thread

This is required because synchronized events, such as caps or segments,
may only be processed on the output thread.

https://bugzilla.gnome.org/show_bug.cgi?id=781673
Olivier Crête 2016-07-06 16:39:17 -04:00 committed by Tim-Philipp Müller
parent 5c2391a5e0
commit 2ce3234aa0
2 changed files with 90 additions and 44 deletions
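
For background on why the clip vfunc must run on the output thread: a typical
GstAggregatorClass::clip implementation reads the pad's segment, which is only
up to date once the preceding serialized SEGMENT event has been handled. The
following is a minimal, hypothetical sketch of such a vfunc (my_agg_clip is
not part of this commit; it only illustrates the dependency on pad->segment):

/* Hypothetical GstAggregatorClass::clip vfunc; a sketch, not code from
 * this commit. It clips the buffer against pad->segment, which is only
 * valid after the preceding SEGMENT event has been processed, which with
 * this change is guaranteed to have happened on the output thread. */
static GstBuffer *
my_agg_clip (GstAggregator * agg, GstAggregatorPad * pad, GstBuffer * buffer)
{
  GstClockTime pts = GST_BUFFER_PTS (buffer);
  GstClockTime dur = GST_BUFFER_DURATION (buffer);
  guint64 start, stop;

  if (!GST_CLOCK_TIME_IS_VALID (pts) || pad->segment.format != GST_FORMAT_TIME)
    return buffer;              /* nothing to clip against */

  if (!gst_segment_clip (&pad->segment, GST_FORMAT_TIME, pts,
          GST_CLOCK_TIME_IS_VALID (dur) ? pts + dur : GST_CLOCK_TIME_NONE,
          &start, &stop)) {
    /* Buffer falls entirely outside the segment; returning NULL drops it */
    gst_buffer_unref (buffer);
    return NULL;
  }

  buffer = gst_buffer_make_writable (buffer);
  GST_BUFFER_PTS (buffer) = start;
  if (GST_CLOCK_TIME_IS_VALID (dur))
    GST_BUFFER_DURATION (buffer) = stop - start;

  return buffer;
}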

gst-libs/gst/base/gstaggregator.c

@@ -212,12 +212,14 @@ struct _GstAggregatorPadPrivate
   gboolean first_buffer;

   GQueue buffers;
+  GstBuffer *clipped_buffer;
   guint num_buffers;
   GstClockTime head_position;
   GstClockTime tail_position;
   GstClockTime head_time;
   GstClockTime tail_time;
   GstClockTime time_level;
+  GstSegment head_segment;      /* segment before the queue */

   gboolean eos;
@@ -238,7 +240,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
   aggpad->priv->flow_return = GST_FLOW_OK;
   GST_OBJECT_LOCK (aggpad);
   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
-  gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
   GST_OBJECT_UNLOCK (aggpad);
   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
@@ -418,7 +420,8 @@ no_iter:
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
-  return (g_queue_peek_tail (&pad->priv->buffers) == NULL);
+  return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+      pad->priv->clipped_buffer == NULL);
 }

 static gboolean
@@ -751,7 +754,8 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
     pad->priv->pending_eos = FALSE;
     pad->priv->eos = TRUE;
   }
-  if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
+  if (pad->priv->clipped_buffer == NULL &&
+      GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
     event = g_queue_pop_tail (&pad->priv->buffers);
     PAD_BROADCAST_EVENT (pad);
   }
@@ -799,6 +803,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     item = next;
   }
   aggpad->priv->num_buffers = 0;
+  gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);

   PAD_BROADCAST_EVENT (aggpad);
   PAD_UNLOCK (aggpad);
@@ -1028,9 +1033,9 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head)
 {
   if (head) {
     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
-        aggpad->clip_segment.format == GST_FORMAT_TIME)
+        aggpad->priv->head_segment.format == GST_FORMAT_TIME)
       aggpad->priv->head_time =
-          gst_segment_to_running_time (&aggpad->clip_segment,
+          gst_segment_to_running_time (&aggpad->priv->head_segment,
           GST_FORMAT_TIME, aggpad->priv->head_position);
     else
       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
@@ -2091,7 +2096,7 @@ static gboolean
 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
 {
   /* Empty queue always has space */
-  if (aggpad->priv->num_buffers == 0)
+  if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
     return TRUE;

   /* We also want at least two buffers, one is being processed and one is ready
@@ -2141,7 +2146,6 @@ static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
 {
-  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
@@ -2159,15 +2163,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   PAD_UNLOCK (aggpad);

-  if (aggclass->clip && head) {
-    buffer = aggclass->clip (self, aggpad, buffer);
-  }
-
-  if (buffer == NULL) {
-    GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function");
-    goto done;
-  }
-
   buf_pts = GST_BUFFER_PTS (buffer);

   aggpad->priv->first_buffer = FALSE;
@@ -2213,13 +2208,13 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
       break;
     case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
       GST_OBJECT_LOCK (aggpad);
-      if (aggpad->segment.format == GST_FORMAT_TIME) {
+      if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
         start_time = buf_pts;
         if (start_time != -1) {
-          start_time = MAX (start_time, aggpad->segment.start);
+          start_time = MAX (start_time, aggpad->priv->head_segment.start);
           start_time =
-              gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
-              start_time);
+              gst_segment_to_running_time (&aggpad->priv->head_segment,
+              GST_FORMAT_TIME, start_time);
         }
       } else {
         start_time = 0;
@@ -2252,8 +2247,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);

-done:
-
   PAD_FLUSH_UNLOCK (aggpad);

   GST_DEBUG_OBJECT (aggpad, "Done chaining");
@@ -2342,8 +2335,8 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
       GST_OBJECT_LOCK (aggpad);
-      gst_event_copy_segment (event, &aggpad->clip_segment);
-      aggpad->priv->head_position = aggpad->clip_segment.position;
+      gst_event_copy_segment (event, &aggpad->priv->head_segment);
+      aggpad->priv->head_position = aggpad->priv->head_segment.position;
       update_time_level (aggpad, TRUE);
       GST_OBJECT_UNLOCK (aggpad);
     }
@@ -2476,6 +2469,64 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
   gst_aggregator_pad_reset_unlocked (pad);
 }

+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+{
+  pad->priv->num_buffers--;
+  GST_TRACE_OBJECT (pad, "Consuming buffer");
+  if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+    pad->priv->pending_eos = FALSE;
+    pad->priv->eos = TRUE;
+  }
+  PAD_BROADCAST_EVENT (pad);
+}
+
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
+{
+  GstAggregator *self = NULL;
+  GstAggregatorClass *aggclass;
+  GstBuffer *buffer = NULL;
+
+  while (pad->priv->clipped_buffer == NULL &&
+      GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
+    buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+    apply_buffer (pad, buffer, FALSE);
+
+    /* We only take the parent here so that it's not taken if the buffer is
+     * already clipped or if the queue is empty.
+     */
+    if (self == NULL) {
+      self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
+      if (self == NULL) {
+        gst_buffer_unref (buffer);
+        return;
+      }
+
+      aggclass = GST_AGGREGATOR_GET_CLASS (self);
+    }
+
+    if (aggclass->clip) {
+      GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
+      buffer = aggclass->clip (self, pad, buffer);
+
+      if (buffer == NULL) {
+        gst_aggregator_pad_buffer_consumed (pad);
+        GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
+      }
+    }
+
+    pad->priv->clipped_buffer = buffer;
+  }
+
+  if (self)
+    gst_object_unref (self);
+}
+
 /**
  * gst_aggregator_pad_steal_buffer:
  * @pad: the pad to get buffer from
@@ -2488,23 +2539,20 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 GstBuffer *
 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buffer;

   PAD_LOCK (pad);
-  if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
-    buffer = g_queue_pop_tail (&pad->priv->buffers);
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+  buffer = pad->priv->clipped_buffer;
+  pad->priv->clipped_buffer = NULL;

   if (buffer) {
-    apply_buffer (pad, buffer, FALSE);
-    pad->priv->num_buffers--;
-    GST_TRACE_OBJECT (pad, "Consuming buffer");
-    if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
-      pad->priv->pending_eos = FALSE;
-      pad->priv->eos = TRUE;
-    }
-    PAD_BROADCAST_EVENT (pad);
+    gst_aggregator_pad_buffer_consumed (pad);
+    GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }

   PAD_UNLOCK (pad);
   return buffer;
@@ -2543,17 +2591,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 GstBuffer *
 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buffer;

   PAD_LOCK (pad);
-  buffer = g_queue_peek_tail (&pad->priv->buffers);
-  /* The tail should always be a buffer, because if it is an event,
-   * it will be consumed immeditaly in gst_aggregator_steal_buffer */
-  if (GST_IS_BUFFER (buffer))
-    gst_buffer_ref (buffer);
-  else
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+  if (pad->priv->clipped_buffer) {
+    buffer = gst_buffer_ref (pad->priv->clipped_buffer);
+  } else {
     buffer = NULL;
+  }
   PAD_UNLOCK (pad);

   return buffer;

gst-libs/gst/base/gstaggregator.h

@@ -71,8 +71,6 @@ struct _GstAggregatorPad
   /* Protected by the OBJECT_LOCK */
   GstSegment segment;

-  /* Segment to use in the clip function, before the queue */
-  GstSegment clip_segment;
-
   /* < Private > */
   GstAggregatorPadPrivate * priv;