aggregator: Delay clipping to output thread

This is required because the synchronized events like caps or segments
may only be processed on the output thread.

https://bugzilla.gnome.org/show_bug.cgi?id=781673
This commit is contained in:
Olivier Crête 2016-07-06 16:39:17 -04:00
parent a6710944e8
commit 4cec1925e3
3 changed files with 91 additions and 45 deletions

View file

@ -752,7 +752,7 @@ gst_audio_aggregator_do_clip (GstAggregator * agg,
bpf = GST_AUDIO_INFO_BPF (&pad->info); bpf = GST_AUDIO_INFO_BPF (&pad->info);
GST_OBJECT_LOCK (bpad); GST_OBJECT_LOCK (bpad);
buffer = gst_audio_buffer_clip (buffer, &bpad->clip_segment, rate, bpf); buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
GST_OBJECT_UNLOCK (bpad); GST_OBJECT_UNLOCK (bpad);
return buffer; return buffer;

View file

@ -212,12 +212,14 @@ struct _GstAggregatorPadPrivate
gboolean first_buffer; gboolean first_buffer;
GQueue buffers; GQueue buffers;
GstBuffer *clipped_buffer;
guint num_buffers; guint num_buffers;
GstClockTime head_position; GstClockTime head_position;
GstClockTime tail_position; GstClockTime tail_position;
GstClockTime head_time; GstClockTime head_time;
GstClockTime tail_time; GstClockTime tail_time;
GstClockTime time_level; GstClockTime time_level;
GstSegment head_segment; /* segment before the queue */
gboolean eos; gboolean eos;
@ -238,7 +240,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
aggpad->priv->flow_return = GST_FLOW_OK; aggpad->priv->flow_return = GST_FLOW_OK;
GST_OBJECT_LOCK (aggpad); GST_OBJECT_LOCK (aggpad);
gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (aggpad); GST_OBJECT_UNLOCK (aggpad);
aggpad->priv->head_position = GST_CLOCK_TIME_NONE; aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
@ -418,7 +420,8 @@ no_iter:
static gboolean static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{ {
return (g_queue_peek_tail (&pad->priv->buffers) == NULL); return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
pad->priv->clipped_buffer == NULL);
} }
static gboolean static gboolean
@ -751,7 +754,8 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
pad->priv->pending_eos = FALSE; pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE; pad->priv->eos = TRUE;
} }
if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { if (pad->priv->clipped_buffer == NULL &&
GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
event = g_queue_pop_tail (&pad->priv->buffers); event = g_queue_pop_tail (&pad->priv->buffers);
PAD_BROADCAST_EVENT (pad); PAD_BROADCAST_EVENT (pad);
} }
@ -799,6 +803,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
item = next; item = next;
} }
aggpad->priv->num_buffers = 0; aggpad->priv->num_buffers = 0;
gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
PAD_BROADCAST_EVENT (aggpad); PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
@ -1028,9 +1033,9 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head)
{ {
if (head) { if (head) {
if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
aggpad->clip_segment.format == GST_FORMAT_TIME) aggpad->priv->head_segment.format == GST_FORMAT_TIME)
aggpad->priv->head_time = aggpad->priv->head_time =
gst_segment_to_running_time (&aggpad->clip_segment, gst_segment_to_running_time (&aggpad->priv->head_segment,
GST_FORMAT_TIME, aggpad->priv->head_position); GST_FORMAT_TIME, aggpad->priv->head_position);
else else
aggpad->priv->head_time = GST_CLOCK_TIME_NONE; aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
@ -2091,7 +2096,7 @@ static gboolean
gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
{ {
/* Empty queue always has space */ /* Empty queue always has space */
if (aggpad->priv->num_buffers == 0) if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
return TRUE; return TRUE;
/* We also want at least two buffers, one is being processed and one is ready /* We also want at least two buffers, one is being processed and one is ready
@ -2141,7 +2146,6 @@ static GstFlowReturn
gst_aggregator_pad_chain_internal (GstAggregator * self, gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{ {
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
GstFlowReturn flow_return; GstFlowReturn flow_return;
GstClockTime buf_pts; GstClockTime buf_pts;
@ -2159,15 +2163,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
if (aggclass->clip && head) {
buffer = aggclass->clip (self, aggpad, buffer);
}
if (buffer == NULL) {
GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function");
goto done;
}
buf_pts = GST_BUFFER_PTS (buffer); buf_pts = GST_BUFFER_PTS (buffer);
aggpad->priv->first_buffer = FALSE; aggpad->priv->first_buffer = FALSE;
@ -2213,13 +2208,13 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
break; break;
case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
GST_OBJECT_LOCK (aggpad); GST_OBJECT_LOCK (aggpad);
if (aggpad->segment.format == GST_FORMAT_TIME) { if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
start_time = buf_pts; start_time = buf_pts;
if (start_time != -1) { if (start_time != -1) {
start_time = MAX (start_time, aggpad->segment.start); start_time = MAX (start_time, aggpad->priv->head_segment.start);
start_time = start_time =
gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME, gst_segment_to_running_time (&aggpad->priv->head_segment,
start_time); GST_FORMAT_TIME, start_time);
} }
} else { } else {
start_time = 0; start_time = 0;
@ -2252,8 +2247,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
SRC_UNLOCK (self); SRC_UNLOCK (self);
done:
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Done chaining"); GST_DEBUG_OBJECT (aggpad, "Done chaining");
@ -2342,8 +2335,8 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad); GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->clip_segment); gst_event_copy_segment (event, &aggpad->priv->head_segment);
aggpad->priv->head_position = aggpad->clip_segment.position; aggpad->priv->head_position = aggpad->priv->head_segment.position;
update_time_level (aggpad, TRUE); update_time_level (aggpad, TRUE);
GST_OBJECT_UNLOCK (aggpad); GST_OBJECT_UNLOCK (aggpad);
} }
@ -2476,6 +2469,64 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
gst_aggregator_pad_reset_unlocked (pad); gst_aggregator_pad_reset_unlocked (pad);
} }
/* Must be called with the PAD_LOCK held */
static void
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
{
pad->priv->num_buffers--;
GST_TRACE_OBJECT (pad, "Consuming buffer");
if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE;
}
PAD_BROADCAST_EVENT (pad);
}
/* Must be called with the PAD_LOCK held */
static void
gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
{
GstAggregator *self = NULL;
GstAggregatorClass *aggclass;
GstBuffer *buffer = NULL;
while (pad->priv->clipped_buffer == NULL &&
GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
buffer = g_queue_pop_tail (&pad->priv->buffers);
apply_buffer (pad, buffer, FALSE);
/* We only take the parent here so that it's not taken if the buffer is
* already clipped or if the queue is empty.
*/
if (self == NULL) {
self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
if (self == NULL) {
gst_buffer_unref (buffer);
return;
}
aggclass = GST_AGGREGATOR_GET_CLASS (self);
}
if (aggclass->clip) {
GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
buffer = aggclass->clip (self, pad, buffer);
if (buffer == NULL) {
gst_aggregator_pad_buffer_consumed (pad);
GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
}
}
pad->priv->clipped_buffer = buffer;
}
if (self)
gst_object_unref (self);
}
/** /**
* gst_aggregator_pad_steal_buffer: * gst_aggregator_pad_steal_buffer:
* @pad: the pad to get buffer from * @pad: the pad to get buffer from
@ -2488,23 +2539,20 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
GstBuffer * GstBuffer *
gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
{ {
GstBuffer *buffer = NULL; GstBuffer *buffer;
PAD_LOCK (pad); PAD_LOCK (pad);
if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
buffer = g_queue_pop_tail (&pad->priv->buffers); gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
pad->priv->clipped_buffer = NULL;
if (buffer) { if (buffer) {
apply_buffer (pad, buffer, FALSE); gst_aggregator_pad_buffer_consumed (pad);
pad->priv->num_buffers--;
GST_TRACE_OBJECT (pad, "Consuming buffer");
if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE;
}
PAD_BROADCAST_EVENT (pad);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
} }
PAD_UNLOCK (pad); PAD_UNLOCK (pad);
return buffer; return buffer;
@ -2543,17 +2591,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
GstBuffer * GstBuffer *
gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
{ {
GstBuffer *buffer = NULL; GstBuffer *buffer;
PAD_LOCK (pad); PAD_LOCK (pad);
buffer = g_queue_peek_tail (&pad->priv->buffers);
/* The tail should always be a buffer, because if it is an event,
* it will be consumed immeditaly in gst_aggregator_steal_buffer */
if (GST_IS_BUFFER (buffer)) gst_aggregator_pad_clip_buffer_unlocked (pad);
gst_buffer_ref (buffer);
else if (pad->priv->clipped_buffer) {
buffer = gst_buffer_ref (pad->priv->clipped_buffer);
} else {
buffer = NULL; buffer = NULL;
}
PAD_UNLOCK (pad); PAD_UNLOCK (pad);
return buffer; return buffer;

View file

@ -71,8 +71,6 @@ struct _GstAggregatorPad
/* Protected by the OBJECT_LOCK */ /* Protected by the OBJECT_LOCK */
GstSegment segment; GstSegment segment;
/* Segment to use in the clip function, before the queue */
GstSegment clip_segment;
/* < Private > */ /* < Private > */
GstAggregatorPadPrivate * priv; GstAggregatorPadPrivate * priv;