aggregator: Queue "latency" buffers at each sink pad.

In the case where you have a source giving the GstAggregator smaller
buffers than it uses, when it reaches a timeout, it will consume the
first buffer, then try to read another buffer for the pad. If the
previous element is not fast enough, it may not get the next buffer even
though it may have been queued just before. To prevent that race, the easiest
solution is to move the queue inside the GstAggregatorPad itself. It
also means that there is no need for strange code caused by increasing
the min latency without increasing the max latency proportionally.

This also means queuing the synchronized events and possibly acting
on them on the src task.

https://bugzilla.gnome.org/show_bug.cgi?id=745768
This commit is contained in:
Olivier Crête 2015-03-06 19:50:08 -05:00 committed by Tim-Philipp Müller
parent 5db97caef7
commit 22dc57f84e
2 changed files with 329 additions and 78 deletions

View file

@ -215,7 +215,13 @@ struct _GstAggregatorPadPrivate
gboolean pending_flush_stop; gboolean pending_flush_stop;
gboolean pending_eos; gboolean pending_eos;
GstBuffer *buffer; GQueue buffers;
GstClockTime head_position;
GstClockTime tail_position;
GstClockTime head_time;
GstClockTime tail_time;
GstClockTime time_level;
gboolean eos; gboolean eos;
GMutex lock; GMutex lock;
@ -235,6 +241,15 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
aggpad->priv->pending_eos = FALSE; aggpad->priv->pending_eos = FALSE;
aggpad->priv->eos = FALSE; aggpad->priv->eos = FALSE;
aggpad->priv->flow_return = GST_FLOW_OK; aggpad->priv->flow_return = GST_FLOW_OK;
GST_OBJECT_LOCK (aggpad);
gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (aggpad);
aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
aggpad->priv->time_level = 0;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
if (klass->flush) if (klass->flush)
@ -287,7 +302,7 @@ struct _GstAggregatorPrivate
GstClockTime start_time; GstClockTime start_time;
/* properties */ /* properties */
gint64 latency; gint64 latency; /* protected by both src_lock and all pad locks */
}; };
typedef struct typedef struct
@ -312,6 +327,9 @@ enum
PROP_LAST PROP_LAST
}; };
static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
/** /**
* gst_aggregator_iterate_sinkpads: * gst_aggregator_iterate_sinkpads:
* @self: The #GstAggregator * @self: The #GstAggregator
@ -391,6 +409,12 @@ no_iter:
return result; return result;
} }
/* Returns TRUE when no buffer or event is queued on @pad.
 * Normally called with the PAD_LOCK held — confirm at call sites. */
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
  return g_queue_is_empty (&pad->priv->buffers);
}
static gboolean static gboolean
gst_aggregator_check_pads_ready (GstAggregator * self) gst_aggregator_check_pads_ready (GstAggregator * self)
{ {
@ -414,10 +438,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
* generate a start time from it. In non-live mode all pads need * generate a start time from it. In non-live mode all pads need
* to have a buffer * to have a buffer
*/ */
if (self->priv->peer_latency_live && pad->priv->buffer) if (self->priv->peer_latency_live &&
!gst_aggregator_pad_queue_is_empty (pad))
self->priv->first_buffer = FALSE; self->priv->first_buffer = FALSE;
if (pad->priv->buffer == NULL && !pad->priv->eos) { if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) {
PAD_UNLOCK (pad); PAD_UNLOCK (pad);
goto pad_not_ready; goto pad_not_ready;
} }
@ -690,16 +715,69 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
return res; return res;
} }
/* Pop every event sitting at the consumable end of @pad's queue and
 * dispatch it to the subclass sink_event vfunc; also promote a pending
 * EOS to a real EOS once the queue has drained.  @user_data is an
 * optional (gboolean *) that is set to TRUE when at least one event was
 * handled.  Always returns TRUE so it can be used as a
 * gst_aggregator_iterate_sinkpads() callback that visits all pads. */
static gboolean
check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
{
  GstEvent *event = NULL;
  GstAggregatorClass *klass = NULL;
  gboolean *processed_event = user_data;

  do {
    event = NULL;

    PAD_LOCK (pad);
    /* Queue fully consumed: a deferred EOS can now take effect */
    if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
      pad->priv->pending_eos = FALSE;
      pad->priv->eos = TRUE;
    }
    /* The tail is the oldest item; only pop it if it is an event */
    if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
      event = g_queue_pop_tail (&pad->priv->buffers);
      /* Wake a sinkpad thread that may be waiting for queue space */
      PAD_BROADCAST_EVENT (pad);
    }
    PAD_UNLOCK (pad);
    /* Deliver outside the PAD_LOCK: sink_event may take other locks */
    if (event) {
      if (processed_event)
        *processed_event = TRUE;
      /* Look the class up lazily, only once an event actually shows up */
      if (klass == NULL)
        klass = GST_AGGREGATOR_GET_CLASS (self);

      GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
      klass->sink_event (self, pad, event);
    }
  } while (event != NULL);

  return TRUE;
}
static void static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return) GstFlowReturn flow_return, gboolean full)
{ {
GList *item;
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
if (flow_return == GST_FLOW_NOT_LINKED) if (flow_return == GST_FLOW_NOT_LINKED)
aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
else else
aggpad->priv->flow_return = flow_return; aggpad->priv->flow_return = flow_return;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
item = g_queue_peek_head_link (&aggpad->priv->buffers);
while (item) {
GList *next = item->next;
/* In partial flush, we do like the pad, we get rid of non-sticky events
* and EOS/SEGMENT.
*/
if (full || GST_IS_BUFFER (item->data) ||
GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
!GST_EVENT_IS_STICKY (item->data)) {
gst_mini_object_unref (item->data);
g_queue_delete_link (&aggpad->priv->buffers, item);
}
item = next;
}
PAD_BROADCAST_EVENT (aggpad); PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
} }
@ -719,10 +797,17 @@ gst_aggregator_aggregate_func (GstAggregator * self)
GST_LOG_OBJECT (self, "Checking aggregate"); GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && priv->running) { while (priv->send_eos && priv->running) {
GstFlowReturn flow_return; GstFlowReturn flow_return;
gboolean processed_event = FALSE;
gst_aggregator_iterate_sinkpads (self, check_events, NULL);
if (!gst_aggregator_wait_and_check (self, &timeout)) if (!gst_aggregator_wait_and_check (self, &timeout))
continue; continue;
gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
if (processed_event)
continue;
GST_TRACE_OBJECT (self, "Actually aggregating!"); GST_TRACE_OBJECT (self, "Actually aggregating!");
flow_return = klass->aggregate (self, timeout); flow_return = klass->aggregate (self, timeout);
@ -748,7 +833,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
gst_aggregator_pad_set_flushing (aggpad, flow_return); gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
} }
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
break; break;
@ -879,7 +964,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GstAggregatorPrivate *priv = self->priv; GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv;
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
PAD_FLUSH_LOCK (aggpad); PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
@ -914,10 +999,44 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
gst_event_unref (event); gst_event_unref (event);
} }
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
gst_aggregator_pad_drop_buffer (aggpad);
} }
/* Recompute aggpad->priv->time_level: the amount of queued data expressed
 * as the difference between the head and tail running times.
 * @head selects which end of the queue just changed.
 * Must be called with the PAD_LOCK held. */
static void
update_time_level (GstAggregatorPad * aggpad, gboolean head)
{
  if (head) {
    /* Head = newest data arriving from upstream; converted with the
     * pre-queue clip_segment (updated on the sinkpad thread) */
    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
        aggpad->clip_segment.format == GST_FORMAT_TIME)
      aggpad->priv->head_time =
          gst_segment_to_running_time (&aggpad->clip_segment,
          GST_FORMAT_TIME, aggpad->priv->head_position);
    else
      aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
  } else {
    /* Tail = oldest data, being consumed by the src task; converted with
     * the post-queue segment */
    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
        aggpad->segment.format == GST_FORMAT_TIME)
      aggpad->priv->tail_time =
          gst_segment_to_running_time (&aggpad->segment,
          GST_FORMAT_TIME, aggpad->priv->tail_position);
    else
      /* No usable tail timestamp yet: treat the queue as time-empty */
      aggpad->priv->tail_time = aggpad->priv->head_time;
  }

  /* Either end unknown -> we cannot measure a level; report zero */
  if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
      aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
    aggpad->priv->time_level = 0;
    return;
  }

  /* Clamp at zero instead of underflowing when tail ran past head */
  if (aggpad->priv->tail_time > aggpad->priv->head_time)
    aggpad->priv->time_level = 0;
  else
    aggpad->priv->time_level = aggpad->priv->head_time -
        aggpad->priv->tail_time;
}
/* GstAggregator vmethods default implementations */ /* GstAggregator vmethods default implementations */
static gboolean static gboolean
gst_aggregator_default_sink_event (GstAggregator * self, gst_aggregator_default_sink_event (GstAggregator * self,
@ -978,7 +1097,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
*/ */
SRC_LOCK (self); SRC_LOCK (self);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
if (!aggpad->priv->buffer) { if (gst_aggregator_pad_queue_is_empty (aggpad)) {
aggpad->priv->eos = TRUE; aggpad->priv->eos = TRUE;
} else { } else {
aggpad->priv->pending_eos = TRUE; aggpad->priv->pending_eos = TRUE;
@ -991,9 +1110,12 @@ gst_aggregator_default_sink_event (GstAggregator * self,
} }
case GST_EVENT_SEGMENT: case GST_EVENT_SEGMENT:
{ {
PAD_LOCK (aggpad);
GST_OBJECT_LOCK (aggpad); GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->segment); gst_event_copy_segment (event, &aggpad->segment);
update_time_level (aggpad, FALSE);
GST_OBJECT_UNLOCK (aggpad); GST_OBJECT_UNLOCK (aggpad);
PAD_UNLOCK (aggpad);
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
self->priv->seqnum = gst_event_get_seqnum (event); self->priv->seqnum = gst_event_get_seqnum (event);
@ -1006,19 +1128,40 @@ gst_aggregator_default_sink_event (GstAggregator * self,
} }
case GST_EVENT_GAP: case GST_EVENT_GAP:
{ {
GstClockTime pts; GstClockTime pts, endpts;
GstClockTime duration; GstClockTime duration;
GstBuffer *gapbuf; GstBuffer *gapbuf;
gst_event_parse_gap (event, &pts, &duration); gst_event_parse_gap (event, &pts, &duration);
gapbuf = gst_buffer_new (); gapbuf = gst_buffer_new ();
if (GST_CLOCK_TIME_IS_VALID (duration))
endpts = pts + duration;
else
endpts = GST_CLOCK_TIME_NONE;
GST_OBJECT_LOCK (aggpad);
res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
&pts, &endpts);
GST_OBJECT_UNLOCK (aggpad);
if (!res) {
GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
goto eat;
}
if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
duration = endpts - pts;
else
duration = GST_CLOCK_TIME_NONE;
GST_BUFFER_PTS (gapbuf) = pts; GST_BUFFER_PTS (gapbuf) = pts;
GST_BUFFER_DURATION (gapbuf) = duration; GST_BUFFER_DURATION (gapbuf) = duration;
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) { if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
res = FALSE; res = FALSE;
} }
@ -1150,7 +1293,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
GST_INFO_OBJECT (pad, "Removing pad"); GST_INFO_OBJECT (pad, "Removing pad");
SRC_LOCK (self); SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
gst_element_remove_pad (element, pad); gst_element_remove_pad (element, pad);
self->priv->has_peer_latency = FALSE; self->priv->has_peer_latency = FALSE;
@ -1252,7 +1395,7 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
min += self->priv->sub_latency_min; min += self->priv->sub_latency_min;
if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
&& GST_CLOCK_TIME_IS_VALID (max)) && GST_CLOCK_TIME_IS_VALID (max))
max += self->priv->sub_latency_max; max += self->priv->sub_latency_max + our_latency;
else else
max = GST_CLOCK_TIME_NONE; max = GST_CLOCK_TIME_NONE;
@ -1660,41 +1803,36 @@ static void
gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
{ {
gboolean changed; gboolean changed;
GstClockTime min, max;
g_return_if_fail (GST_IS_AGGREGATOR (self)); g_return_if_fail (GST_IS_AGGREGATOR (self));
g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
SRC_LOCK (self); SRC_LOCK (self);
if (self->priv->peer_latency_live) { changed = (self->priv->latency != latency);
min = self->priv->peer_latency_min;
max = self->priv->peer_latency_max;
/* add our own */
min += latency;
min += self->priv->sub_latency_min;
if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
&& GST_CLOCK_TIME_IS_VALID (max))
max += self->priv->sub_latency_max;
else
max = GST_CLOCK_TIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (max) && min > max) { if (changed) {
GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, GList *item;
("%s", "Latency too big"),
("The requested latency value is too big for the latency in the " GST_OBJECT_LOCK (self);
"current pipeline. Limiting to %" G_GINT64_FORMAT, max)); /* First lock all the pads */
/* FIXME: This could in theory become negative, but in for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
* that case all is lost anyway */ GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
latency -= min - max; PAD_LOCK (aggpad);
/* FIXME: shouldn't we g_object_notify() the change here? */
} }
self->priv->latency = latency;
SRC_BROADCAST (self);
/* Now wake up the pads */
for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
GST_OBJECT_UNLOCK (self);
} }
changed = (self->priv->latency != latency);
self->priv->latency = latency;
if (changed)
SRC_BROADCAST (self);
SRC_UNLOCK (self); SRC_UNLOCK (self);
if (changed) if (changed)
@ -1902,14 +2040,60 @@ gst_aggregator_get_type (void)
return type; return type;
} }
/* Decide whether @aggpad's queue may accept one more item.
 * Must be called with PAD lock held. */
static gboolean
gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
{
  guint queued = g_queue_get_length (&aggpad->priv->buffers);

  /* An empty queue can always take one item */
  if (queued == 0)
    return TRUE;

  /* With zero configured latency, a single queued item fills the pad */
  if (self->priv->latency == 0)
    return FALSE;

  /* Otherwise accept data until the queued duration exceeds the latency */
  return (aggpad->priv->time_level <= self->priv->latency);
}
/* Advance the head (or tail) position of @aggpad past @buffer and refresh
 * the pad's time level.  Prefers DTS over PTS; when the buffer carries no
 * usable timestamp, the previous position for that end is kept as the base.
 * Must be called with the PAD_LOCK held. */
static void
apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{
  GstClockTime ts = GST_BUFFER_DTS_IS_VALID (buffer) ?
      GST_BUFFER_DTS (buffer) : GST_BUFFER_PTS (buffer);

  /* Untimestamped buffer: continue from the end we are updating */
  if (!GST_CLOCK_TIME_IS_VALID (ts))
    ts = head ? aggpad->priv->head_position : aggpad->priv->tail_position;

  /* The position advances to the end of the buffer, duration included */
  if (GST_BUFFER_DURATION_IS_VALID (buffer))
    ts += GST_BUFFER_DURATION (buffer);

  if (head)
    aggpad->priv->head_position = ts;
  else
    aggpad->priv->tail_position = ts;

  update_time_level (aggpad, head);
}
static GstFlowReturn static GstFlowReturn
gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{ {
GstBuffer *actual_buf = buffer; GstBuffer *actual_buf = buffer;
GstAggregator *self = GST_AGGREGATOR (object); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
GstFlowReturn flow_return; GstFlowReturn flow_return;
GstClockTime buf_pts;
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
@ -1923,26 +2107,49 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (aggpad->priv->pending_eos == TRUE) if (aggpad->priv->pending_eos == TRUE)
goto eos; goto eos;
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
flow_return = aggpad->priv->flow_return; flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK) if (flow_return != GST_FLOW_OK)
goto flushing; goto flushing;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
if (aggclass->clip) { if (aggclass->clip && head) {
aggclass->clip (self, aggpad, buffer, &actual_buf); aggclass->clip (self, aggpad, buffer, &actual_buf);
} }
SRC_LOCK (self); if (actual_buf == NULL) {
PAD_LOCK (aggpad); GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
if (aggpad->priv->buffer) goto done;
gst_buffer_unref (aggpad->priv->buffer); }
aggpad->priv->buffer = actual_buf;
flow_return = aggpad->priv->flow_return; buf_pts = GST_BUFFER_PTS (actual_buf);
for (;;) {
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (gst_aggregator_pad_has_space (self, aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
g_queue_push_head (&aggpad->priv->buffers, actual_buf);
else
g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
apply_buffer (aggpad, actual_buf, head);
actual_buf = buffer = NULL;
SRC_BROADCAST (self);
break;
}
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK) {
SRC_UNLOCK (self);
goto flushing;
}
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
SRC_UNLOCK (self);
PAD_WAIT_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
if (self->priv->first_buffer) { if (self->priv->first_buffer) {
GstClockTime start_time; GstClockTime start_time;
@ -1954,7 +2161,7 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
break; break;
case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
if (aggpad->segment.format == GST_FORMAT_TIME) { if (aggpad->segment.format == GST_FORMAT_TIME) {
start_time = GST_BUFFER_PTS (actual_buf); start_time = buf_pts;
if (start_time != -1) { if (start_time != -1) {
start_time = MAX (start_time, aggpad->segment.start); start_time = MAX (start_time, aggpad->segment.start);
start_time = start_time =
@ -1990,11 +2197,12 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
} }
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad);
SRC_BROADCAST (self);
SRC_UNLOCK (self); SRC_UNLOCK (self);
done:
PAD_FLUSH_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Done chaining"); GST_DEBUG_OBJECT (aggpad, "Done chaining");
return flow_return; return flow_return;
@ -2003,9 +2211,10 @@ flushing:
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
gst_flow_get_name (flow_return)); gst_flow_get_name (flow_return));
if (buffer)
gst_buffer_unref (buffer);
return flow_return; return flow_return;
@ -2014,11 +2223,18 @@ eos:
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (pad, "We are EOS already..."); GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
return GST_FLOW_EOS; return GST_FLOW_EOS;
} }
/* GstPadChainFunction installed on sink pads: every upstream buffer enters
 * here and is pushed at the head of the pad's queue (head = TRUE). */
static GstFlowReturn
gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
{
  return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
      GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
}
static gboolean static gboolean
gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GstQuery * query) GstQuery * query)
@ -2029,8 +2245,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
if (GST_QUERY_IS_SERIALIZED (query)) { if (GST_QUERY_IS_SERIALIZED (query)) {
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad); PAD_WAIT_EVENT (aggpad);
}
if (aggpad->priv->flow_return != GST_FLOW_OK) if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing; goto flushing;
@ -2052,31 +2271,49 @@ static gboolean
gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
GstEvent * event) GstEvent * event)
{ {
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) { /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
SRC_LOCK (self);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK if (aggpad->priv->flow_return != GST_FLOW_OK
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
goto flushing; goto flushing;
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->clip_segment);
aggpad->priv->head_position = aggpad->clip_segment.position;
update_time_level (aggpad, TRUE);
GST_OBJECT_UNLOCK (aggpad);
}
if (!gst_aggregator_pad_queue_is_empty (aggpad) &&
GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
event);
g_queue_push_head (&aggpad->priv->buffers, event);
event = NULL;
SRC_BROADCAST (self);
}
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
} }
return klass->sink_event (GST_AGGREGATOR (parent), if (event)
GST_AGGREGATOR_PAD (pad), event); return klass->sink_event (self, aggpad, event);
else
return TRUE;
flushing: flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
gst_flow_get_name (aggpad->priv->flow_return)); gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
if (GST_EVENT_IS_STICKY (event)) if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (pad, event); gst_pad_store_sticky_event (pad, event);
gst_event_unref (event); gst_event_unref (event);
@ -2087,10 +2324,14 @@ static gboolean
gst_aggregator_pad_activate_mode_func (GstPad * pad, gst_aggregator_pad_activate_mode_func (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active) GstObject * parent, GstPadMode mode, gboolean active)
{ {
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (active == FALSE) { if (active == FALSE) {
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
} else { } else {
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
aggpad->priv->flow_return = GST_FLOW_OK; aggpad->priv->flow_return = GST_FLOW_OK;
@ -2138,7 +2379,7 @@ gst_aggregator_pad_dispose (GObject * object)
{ {
GstAggregatorPad *pad = (GstAggregatorPad *) object; GstAggregatorPad *pad = (GstAggregatorPad *) object;
gst_aggregator_pad_drop_buffer (pad); gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
} }
@ -2162,7 +2403,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
GstAggregatorPadPrivate); GstAggregatorPadPrivate);
pad->priv->buffer = NULL; g_queue_init (&pad->priv->buffers);
g_cond_init (&pad->priv->event_cond); g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->flush_lock); g_mutex_init (&pad->priv->flush_lock);
@ -2184,11 +2425,13 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
GstBuffer *buffer = NULL; GstBuffer *buffer = NULL;
PAD_LOCK (pad); PAD_LOCK (pad);
if (pad->priv->buffer) { if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
buffer = g_queue_pop_tail (&pad->priv->buffers);
if (buffer) {
apply_buffer (pad, buffer, FALSE);
GST_TRACE_OBJECT (pad, "Consuming buffer"); GST_TRACE_OBJECT (pad, "Consuming buffer");
buffer = pad->priv->buffer; if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
pad->priv->buffer = NULL;
if (pad->priv->pending_eos) {
pad->priv->pending_eos = FALSE; pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE; pad->priv->eos = TRUE;
} }
@ -2236,8 +2479,14 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
GstBuffer *buffer = NULL; GstBuffer *buffer = NULL;
PAD_LOCK (pad); PAD_LOCK (pad);
if (pad->priv->buffer) buffer = g_queue_peek_tail (&pad->priv->buffers);
buffer = gst_buffer_ref (pad->priv->buffer); /* The tail should always be a buffer, because if it is an event,
* it will be consumed immeditaly in gst_aggregator_steal_buffer */
if (GST_IS_BUFFER (buffer))
gst_buffer_ref (buffer);
else
buffer = NULL;
PAD_UNLOCK (pad); PAD_UNLOCK (pad);
return buffer; return buffer;

View file

@ -71,6 +71,8 @@ struct _GstAggregatorPad
/* Protected by the OBJECT_LOCK */ /* Protected by the OBJECT_LOCK */
GstSegment segment; GstSegment segment;
/* Segment to use in the clip function, before the queue */
GstSegment clip_segment;
/* < Private > */ /* < Private > */
GstAggregatorPadPrivate * priv; GstAggregatorPadPrivate * priv;