aggregator: Queue "latency" buffers at each sink pad.

In the case where you have a source giving the GstAggregator smaller
buffers than it uses, when it reaches a timeout, it will consume the
first buffer, then try to read another buffer for the pad. If the
previous element is not fast enough, it may get the next buffer even
though it may be queued just before. To prevent that race, the easiest
solution is to move the queue inside the GstAggregatorPad itself. It
also means that there is no need for strange code cause by increasing
the min latency without increasing the max latency proportionally.

This also means queuing the synchronized events and possibly acting
on them on the src task.

https://bugzilla.gnome.org/show_bug.cgi?id=745768
This commit is contained in:
Olivier Crête 2015-03-06 19:50:08 -05:00
parent e4a1db2287
commit 6efc106a67
3 changed files with 330 additions and 79 deletions

View file

@ -215,7 +215,13 @@ struct _GstAggregatorPadPrivate
gboolean pending_flush_stop;
gboolean pending_eos;
GstBuffer *buffer;
GQueue buffers;
GstClockTime head_position;
GstClockTime tail_position;
GstClockTime head_time;
GstClockTime tail_time;
GstClockTime time_level;
gboolean eos;
GMutex lock;
@ -235,6 +241,15 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
aggpad->priv->pending_eos = FALSE;
aggpad->priv->eos = FALSE;
aggpad->priv->flow_return = GST_FLOW_OK;
GST_OBJECT_LOCK (aggpad);
gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (aggpad);
aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
aggpad->priv->time_level = 0;
PAD_UNLOCK (aggpad);
if (klass->flush)
@ -287,7 +302,7 @@ struct _GstAggregatorPrivate
GstClockTime start_time;
/* properties */
gint64 latency;
gint64 latency; /* protected by both src_lock and all pad locks */
};
typedef struct
@ -312,6 +327,9 @@ enum
PROP_LAST
};
static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
/**
* gst_aggregator_iterate_sinkpads:
* @self: The #GstAggregator
@ -391,6 +409,12 @@ no_iter:
return result;
}
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
return (g_queue_peek_tail (&pad->priv->buffers) == NULL);
}
static gboolean
gst_aggregator_check_pads_ready (GstAggregator * self)
{
@ -414,10 +438,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
* generate a start time from it. In non-live mode all pads need
* to have a buffer
*/
if (self->priv->peer_latency_live && pad->priv->buffer)
if (self->priv->peer_latency_live &&
!gst_aggregator_pad_queue_is_empty (pad))
self->priv->first_buffer = FALSE;
if (pad->priv->buffer == NULL && !pad->priv->eos) {
if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) {
PAD_UNLOCK (pad);
goto pad_not_ready;
}
@ -690,16 +715,69 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
return res;
}
static gboolean
check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
{
GstEvent *event = NULL;
GstAggregatorClass *klass = NULL;
gboolean *processed_event = user_data;
do {
event = NULL;
PAD_LOCK (pad);
if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE;
}
if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
event = g_queue_pop_tail (&pad->priv->buffers);
PAD_BROADCAST_EVENT (pad);
}
PAD_UNLOCK (pad);
if (event) {
if (processed_event)
*processed_event = TRUE;
if (klass == NULL)
klass = GST_AGGREGATOR_GET_CLASS (self);
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
klass->sink_event (self, pad, event);
}
} while (event != NULL);
return TRUE;
}
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return)
GstFlowReturn flow_return, gboolean full)
{
GList *item;
PAD_LOCK (aggpad);
if (flow_return == GST_FLOW_NOT_LINKED)
aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
else
aggpad->priv->flow_return = flow_return;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
item = g_queue_peek_head_link (&aggpad->priv->buffers);
while (item) {
GList *next = item->next;
/* In partial flush, we do like the pad, we get rid of non-sticky events
* and EOS/SEGMENT.
*/
if (full || GST_IS_BUFFER (item->data) ||
GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
!GST_EVENT_IS_STICKY (item->data)) {
gst_mini_object_unref (item->data);
g_queue_delete_link (&aggpad->priv->buffers, item);
}
item = next;
}
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
@ -719,10 +797,17 @@ gst_aggregator_aggregate_func (GstAggregator * self)
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && priv->running) {
GstFlowReturn flow_return;
gboolean processed_event = FALSE;
gst_aggregator_iterate_sinkpads (self, check_events, NULL);
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
if (processed_event)
continue;
GST_TRACE_OBJECT (self, "Actually aggregating!");
flow_return = klass->aggregate (self, timeout);
@ -748,7 +833,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
gst_aggregator_pad_set_flushing (aggpad, flow_return);
gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
}
GST_OBJECT_UNLOCK (self);
break;
@ -879,7 +964,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad);
@ -914,10 +999,44 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
gst_event_unref (event);
}
PAD_FLUSH_UNLOCK (aggpad);
gst_aggregator_pad_drop_buffer (aggpad);
}
/* Must be called with the the PAD_LOCK held */
static void
update_time_level (GstAggregatorPad * aggpad, gboolean head)
{
if (head) {
if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
aggpad->clip_segment.format == GST_FORMAT_TIME)
aggpad->priv->head_time =
gst_segment_to_running_time (&aggpad->clip_segment,
GST_FORMAT_TIME, aggpad->priv->head_position);
else
aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
} else {
if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
aggpad->segment.format == GST_FORMAT_TIME)
aggpad->priv->tail_time =
gst_segment_to_running_time (&aggpad->segment,
GST_FORMAT_TIME, aggpad->priv->tail_position);
else
aggpad->priv->tail_time = aggpad->priv->head_time;
}
if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
aggpad->priv->time_level = 0;
return;
}
if (aggpad->priv->tail_time > aggpad->priv->head_time)
aggpad->priv->time_level = 0;
else
aggpad->priv->time_level = aggpad->priv->head_time -
aggpad->priv->tail_time;
}
/* GstAggregator vmethods default implementations */
static gboolean
gst_aggregator_default_sink_event (GstAggregator * self,
@ -978,7 +1097,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
*/
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (!aggpad->priv->buffer) {
if (gst_aggregator_pad_queue_is_empty (aggpad)) {
aggpad->priv->eos = TRUE;
} else {
aggpad->priv->pending_eos = TRUE;
@ -991,9 +1110,12 @@ gst_aggregator_default_sink_event (GstAggregator * self,
}
case GST_EVENT_SEGMENT:
{
PAD_LOCK (aggpad);
GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->segment);
update_time_level (aggpad, FALSE);
GST_OBJECT_UNLOCK (aggpad);
PAD_UNLOCK (aggpad);
GST_OBJECT_LOCK (self);
self->priv->seqnum = gst_event_get_seqnum (event);
@ -1006,19 +1128,40 @@ gst_aggregator_default_sink_event (GstAggregator * self,
}
case GST_EVENT_GAP:
{
GstClockTime pts;
GstClockTime pts, endpts;
GstClockTime duration;
GstBuffer *gapbuf;
gst_event_parse_gap (event, &pts, &duration);
gapbuf = gst_buffer_new ();
if (GST_CLOCK_TIME_IS_VALID (duration))
endpts = pts + duration;
else
endpts = GST_CLOCK_TIME_NONE;
GST_OBJECT_LOCK (aggpad);
res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
&pts, &endpts);
GST_OBJECT_UNLOCK (aggpad);
if (!res) {
GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
goto eat;
}
if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
duration = endpts - pts;
else
duration = GST_CLOCK_TIME_NONE;
GST_BUFFER_PTS (gapbuf) = pts;
GST_BUFFER_DURATION (gapbuf) = duration;
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) {
if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
res = FALSE;
}
@ -1150,7 +1293,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
GST_INFO_OBJECT (pad, "Removing pad");
SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
gst_element_remove_pad (element, pad);
self->priv->has_peer_latency = FALSE;
@ -1252,7 +1395,7 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
min += self->priv->sub_latency_min;
if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
&& GST_CLOCK_TIME_IS_VALID (max))
max += self->priv->sub_latency_max;
max += self->priv->sub_latency_max + our_latency;
else
max = GST_CLOCK_TIME_NONE;
@ -1660,41 +1803,36 @@ static void
gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
{
gboolean changed;
GstClockTime min, max;
g_return_if_fail (GST_IS_AGGREGATOR (self));
g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
SRC_LOCK (self);
if (self->priv->peer_latency_live) {
min = self->priv->peer_latency_min;
max = self->priv->peer_latency_max;
/* add our own */
min += latency;
min += self->priv->sub_latency_min;
if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
&& GST_CLOCK_TIME_IS_VALID (max))
max += self->priv->sub_latency_max;
else
max = GST_CLOCK_TIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (max) && min > max) {
GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
("%s", "Latency too big"),
("The requested latency value is too big for the latency in the "
"current pipeline. Limiting to %" G_GINT64_FORMAT, max));
/* FIXME: This could in theory become negative, but in
* that case all is lost anyway */
latency -= min - max;
/* FIXME: shouldn't we g_object_notify() the change here? */
}
}
changed = (self->priv->latency != latency);
if (changed) {
GList *item;
GST_OBJECT_LOCK (self);
/* First lock all the pads */
for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
PAD_LOCK (aggpad);
}
self->priv->latency = latency;
if (changed)
SRC_BROADCAST (self);
/* Now wake up the pads */
for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
GST_OBJECT_UNLOCK (self);
}
SRC_UNLOCK (self);
if (changed)
@ -1902,14 +2040,60 @@ gst_aggregator_get_type (void)
return type;
}
/* Must be called with PAD lock held */
static gboolean
gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
{
/* Empty queue always has space */
if (g_queue_get_length (&aggpad->priv->buffers) == 0)
return TRUE;
/* zero latency, if there is a buffer, it's full */
if (self->priv->latency == 0)
return FALSE;
/* Allow no more buffers than the latency */
return (aggpad->priv->time_level <= self->priv->latency);
}
/* Must be called with the PAD_LOCK held */
static void
apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{
GstClockTime timestamp;
if (GST_BUFFER_DTS_IS_VALID (buffer))
timestamp = GST_BUFFER_DTS (buffer);
else
timestamp = GST_BUFFER_PTS (buffer);
if (timestamp == GST_CLOCK_TIME_NONE) {
if (head)
timestamp = aggpad->priv->head_position;
else
timestamp = aggpad->priv->tail_position;
}
/* add duration */
if (GST_BUFFER_DURATION_IS_VALID (buffer))
timestamp += GST_BUFFER_DURATION (buffer);
if (head)
aggpad->priv->head_position = timestamp;
else
aggpad->priv->tail_position = timestamp;
update_time_level (aggpad, head);
}
static GstFlowReturn
gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
{
GstBuffer *actual_buf = buffer;
GstAggregator *self = GST_AGGREGATOR (object);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
GstFlowReturn flow_return;
GstClockTime buf_pts;
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
@ -1923,26 +2107,49 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (aggpad->priv->pending_eos == TRUE)
goto eos;
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
if (aggclass->clip) {
if (aggclass->clip && head) {
aggclass->clip (self, aggpad, buffer, &actual_buf);
}
if (actual_buf == NULL) {
GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
goto done;
}
buf_pts = GST_BUFFER_PTS (actual_buf);
for (;;) {
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (aggpad->priv->buffer)
gst_buffer_unref (aggpad->priv->buffer);
aggpad->priv->buffer = actual_buf;
if (gst_aggregator_pad_has_space (self, aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
if (head)
g_queue_push_head (&aggpad->priv->buffers, actual_buf);
else
g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
apply_buffer (aggpad, actual_buf, head);
actual_buf = buffer = NULL;
SRC_BROADCAST (self);
break;
}
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK) {
SRC_UNLOCK (self);
goto flushing;
}
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
SRC_UNLOCK (self);
PAD_WAIT_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
if (self->priv->first_buffer) {
GstClockTime start_time;
@ -1954,7 +2161,7 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
break;
case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
if (aggpad->segment.format == GST_FORMAT_TIME) {
start_time = GST_BUFFER_PTS (actual_buf);
start_time = buf_pts;
if (start_time != -1) {
start_time = MAX (start_time, aggpad->segment.start);
start_time =
@ -1990,11 +2197,12 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
}
PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
done:
PAD_FLUSH_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Done chaining");
return flow_return;
@ -2003,9 +2211,10 @@ flushing:
PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
gst_flow_get_name (flow_return));
if (buffer)
gst_buffer_unref (buffer);
return flow_return;
@ -2014,11 +2223,18 @@ eos:
PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (pad, "We are EOS already...");
GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
return GST_FLOW_EOS;
}
static GstFlowReturn
gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
{
return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
}
static gboolean
gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GstQuery * query)
@ -2029,8 +2245,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
if (GST_QUERY_IS_SERIALIZED (query)) {
PAD_LOCK (aggpad);
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
@ -2052,31 +2271,49 @@ static gboolean
gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
/* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
SRC_LOCK (self);
PAD_LOCK (aggpad);
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
goto flushing;
PAD_UNLOCK (aggpad);
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->clip_segment);
aggpad->priv->head_position = aggpad->clip_segment.position;
update_time_level (aggpad, TRUE);
GST_OBJECT_UNLOCK (aggpad);
}
return klass->sink_event (GST_AGGREGATOR (parent),
GST_AGGREGATOR_PAD (pad), event);
if (!gst_aggregator_pad_queue_is_empty (aggpad) &&
GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
event);
g_queue_push_head (&aggpad->priv->buffers, event);
event = NULL;
SRC_BROADCAST (self);
}
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
}
if (event)
return klass->sink_event (self, aggpad, event);
else
return TRUE;
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (pad, event);
gst_event_unref (event);
@ -2087,10 +2324,14 @@ static gboolean
gst_aggregator_pad_activate_mode_func (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active)
{
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (active == FALSE) {
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
} else {
PAD_LOCK (aggpad);
aggpad->priv->flow_return = GST_FLOW_OK;
@ -2138,7 +2379,7 @@ gst_aggregator_pad_dispose (GObject * object)
{
GstAggregatorPad *pad = (GstAggregatorPad *) object;
gst_aggregator_pad_drop_buffer (pad);
gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
}
@ -2162,7 +2403,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
GstAggregatorPadPrivate);
pad->priv->buffer = NULL;
g_queue_init (&pad->priv->buffers);
g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->flush_lock);
@ -2184,11 +2425,13 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
GstBuffer *buffer = NULL;
PAD_LOCK (pad);
if (pad->priv->buffer) {
if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
buffer = g_queue_pop_tail (&pad->priv->buffers);
if (buffer) {
apply_buffer (pad, buffer, FALSE);
GST_TRACE_OBJECT (pad, "Consuming buffer");
buffer = pad->priv->buffer;
pad->priv->buffer = NULL;
if (pad->priv->pending_eos) {
if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
pad->priv->pending_eos = FALSE;
pad->priv->eos = TRUE;
}
@ -2236,8 +2479,14 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
GstBuffer *buffer = NULL;
PAD_LOCK (pad);
if (pad->priv->buffer)
buffer = gst_buffer_ref (pad->priv->buffer);
buffer = g_queue_peek_tail (&pad->priv->buffers);
/* The tail should always be a buffer, because if it is an event,
* it will be consumed immeditaly in gst_aggregator_steal_buffer */
if (GST_IS_BUFFER (buffer))
gst_buffer_ref (buffer);
else
buffer = NULL;
PAD_UNLOCK (pad);
return buffer;

View file

@ -71,6 +71,8 @@ struct _GstAggregatorPad
/* Protected by the OBJECT_LOCK */
GstSegment segment;
/* Segment to use in the clip function, before the queue */
GstSegment clip_segment;
/* < Private > */
GstAggregatorPadPrivate * priv;

View file

@ -723,7 +723,7 @@ gst_audio_aggregator_do_clip (GstAggregator * agg,
bpf = GST_AUDIO_INFO_BPF (&pad->info);
GST_OBJECT_LOCK (bpad);
*out = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
*out = gst_audio_buffer_clip (buffer, &bpad->clip_segment, rate, bpf);
GST_OBJECT_UNLOCK (bpad);
return GST_FLOW_OK;