diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index 6cb73c0c16..a0a8776bb6 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -215,7 +215,13 @@ struct _GstAggregatorPadPrivate gboolean pending_flush_stop; gboolean pending_eos; - GstBuffer *buffer; + GQueue buffers; + GstClockTime head_position; + GstClockTime tail_position; + GstClockTime head_time; + GstClockTime tail_time; + GstClockTime time_level; + gboolean eos; GMutex lock; @@ -235,6 +241,15 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; aggpad->priv->flow_return = GST_FLOW_OK; + GST_OBJECT_LOCK (aggpad); + gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (aggpad); + aggpad->priv->head_position = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; + aggpad->priv->time_level = 0; PAD_UNLOCK (aggpad); if (klass->flush) @@ -287,7 +302,7 @@ struct _GstAggregatorPrivate GstClockTime start_time; /* properties */ - gint64 latency; + gint64 latency; /* protected by both src_lock and all pad locks */ }; typedef struct @@ -312,6 +327,9 @@ enum PROP_LAST }; +static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); + /** * gst_aggregator_iterate_sinkpads: * @self: The #GstAggregator @@ -391,6 +409,12 @@ no_iter: return result; } +static gboolean +gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) +{ + return (g_queue_peek_tail (&pad->priv->buffers) == NULL); +} + static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { @@ -414,10 +438,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self) * generate a start time from it. In non-live mode all pads need * to have a buffer */ - if (self->priv->peer_latency_live && pad->priv->buffer) + if (self->priv->peer_latency_live && + !gst_aggregator_pad_queue_is_empty (pad)) self->priv->first_buffer = FALSE; - if (pad->priv->buffer == NULL && !pad->priv->eos) { + if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) { PAD_UNLOCK (pad); goto pad_not_ready; } @@ -690,16 +715,69 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) return res; } +static gboolean +check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +{ + GstEvent *event = NULL; + GstAggregatorClass *klass = NULL; + gboolean *processed_event = user_data; + + do { + event = NULL; + + PAD_LOCK (pad); + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { + pad->priv->pending_eos = FALSE; + pad->priv->eos = TRUE; + } + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { + event = g_queue_pop_tail (&pad->priv->buffers); + PAD_BROADCAST_EVENT (pad); + } + PAD_UNLOCK (pad); + if (event) { + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + klass->sink_event (self, pad, event); + } + } while (event != NULL); + + return TRUE; +} + static void gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, - GstFlowReturn flow_return) + GstFlowReturn flow_return, gboolean full) { + GList *item; + PAD_LOCK (aggpad); if (flow_return == GST_FLOW_NOT_LINKED) aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); else aggpad->priv->flow_return = flow_return; - gst_buffer_replace (&aggpad->priv->buffer, NULL); + + item = g_queue_peek_head_link (&aggpad->priv->buffers); + while (item) { + GList *next = item->next; + + /* In partial flush, we do like the pad, we get rid of non-sticky events + * and EOS/SEGMENT. + */ + if (full || GST_IS_BUFFER (item->data) || + GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || + GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || + !GST_EVENT_IS_STICKY (item->data)) { + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->buffers, item); + } + item = next; + } + PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); } @@ -719,10 +797,17 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { GstFlowReturn flow_return; + gboolean processed_event = FALSE; + + gst_aggregator_iterate_sinkpads (self, check_events, NULL); if (!gst_aggregator_wait_and_check (self, &timeout)) continue; + gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); + if (processed_event) + continue; + GST_TRACE_OBJECT (self, "Actually aggregating!"); flow_return = klass->aggregate (self, timeout); @@ -748,7 +833,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); - gst_aggregator_pad_set_flushing (aggpad, flow_return); + gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE); } GST_OBJECT_UNLOCK (self); break; @@ -879,7 +964,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstAggregatorPrivate *priv = self->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv; - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); PAD_FLUSH_LOCK (aggpad); PAD_LOCK (aggpad); @@ -914,10 +999,44 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, gst_event_unref (event); } PAD_FLUSH_UNLOCK (aggpad); - - gst_aggregator_pad_drop_buffer (aggpad); } +/* Must be called with the the PAD_LOCK held */ +static void +update_time_level (GstAggregatorPad * aggpad, gboolean head) +{ + if (head) { + if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && + aggpad->clip_segment.format == GST_FORMAT_TIME) + aggpad->priv->head_time = + gst_segment_to_running_time (&aggpad->clip_segment, + GST_FORMAT_TIME, aggpad->priv->head_position); + else + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + } else { + if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && + aggpad->segment.format == GST_FORMAT_TIME) + aggpad->priv->tail_time = + gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, aggpad->priv->tail_position); + else + aggpad->priv->tail_time = aggpad->priv->head_time; + } + + if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE || + aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) { + aggpad->priv->time_level = 0; + return; + } + + if (aggpad->priv->tail_time > aggpad->priv->head_time) + aggpad->priv->time_level = 0; + else + aggpad->priv->time_level = aggpad->priv->head_time - + aggpad->priv->tail_time; +} + + /* GstAggregator vmethods default implementations */ static gboolean gst_aggregator_default_sink_event (GstAggregator * self, @@ -978,7 +1097,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (!aggpad->priv->buffer) { + if (gst_aggregator_pad_queue_is_empty (aggpad)) { aggpad->priv->eos = TRUE; } else { aggpad->priv->pending_eos = TRUE; @@ -991,9 +1110,12 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_SEGMENT: { + PAD_LOCK (aggpad); GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); + update_time_level (aggpad, FALSE); GST_OBJECT_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); GST_OBJECT_LOCK (self); self->priv->seqnum = gst_event_get_seqnum (event); @@ -1006,19 +1128,40 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_GAP: { - GstClockTime pts; + GstClockTime pts, endpts; GstClockTime duration; GstBuffer *gapbuf; gst_event_parse_gap (event, &pts, &duration); gapbuf = gst_buffer_new (); + if (GST_CLOCK_TIME_IS_VALID (duration)) + endpts = pts + duration; + else + endpts = GST_CLOCK_TIME_NONE; + + GST_OBJECT_LOCK (aggpad); + res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts, + &pts, &endpts); + GST_OBJECT_UNLOCK (aggpad); + + if (!res) { + GST_WARNING_OBJECT (self, "GAP event outside segment, dropping"); + goto eat; + } + + if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts)) + duration = endpts - pts; + else + duration = GST_CLOCK_TIME_NONE; + GST_BUFFER_PTS (gapbuf) = pts; GST_BUFFER_DURATION (gapbuf) = duration; GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); - if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) { + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != + GST_FLOW_OK) { GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); res = FALSE; } @@ -1150,7 +1293,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad) GST_INFO_OBJECT (pad, "Removing pad"); SRC_LOCK (self); - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); gst_element_remove_pad (element, pad); self->priv->has_peer_latency = FALSE; @@ -1252,7 +1395,7 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) min += self->priv->sub_latency_min; if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) && GST_CLOCK_TIME_IS_VALID (max)) - max += self->priv->sub_latency_max; + max += self->priv->sub_latency_max + our_latency; else max = GST_CLOCK_TIME_NONE; @@ -1660,41 +1803,36 @@ static void gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) { gboolean changed; - GstClockTime min, max; g_return_if_fail (GST_IS_AGGREGATOR (self)); g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); SRC_LOCK (self); - if (self->priv->peer_latency_live) { - min = self->priv->peer_latency_min; - max = self->priv->peer_latency_max; - /* add our own */ - min += latency; - min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) - && GST_CLOCK_TIME_IS_VALID (max)) - max += self->priv->sub_latency_max; - else - max = GST_CLOCK_TIME_NONE; + changed = (self->priv->latency != latency); - if (GST_CLOCK_TIME_IS_VALID (max) && min > max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the latency in the " - "current pipeline. Limiting to %" G_GINT64_FORMAT, max)); - /* FIXME: This could in theory become negative, but in - * that case all is lost anyway */ - latency -= min - max; - /* FIXME: shouldn't we g_object_notify() the change here? */ + if (changed) { + GList *item; + + GST_OBJECT_LOCK (self); + /* First lock all the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_LOCK (aggpad); } + + self->priv->latency = latency; + + SRC_BROADCAST (self); + + /* Now wake up the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); + } + GST_OBJECT_UNLOCK (self); } - changed = (self->priv->latency != latency); - self->priv->latency = latency; - - if (changed) - SRC_BROADCAST (self); SRC_UNLOCK (self); if (changed) @@ -1902,14 +2040,60 @@ gst_aggregator_get_type (void) return type; } +/* Must be called with PAD lock held */ +static gboolean +gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) +{ + /* Empty queue always has space */ + if (g_queue_get_length (&aggpad->priv->buffers) == 0) + return TRUE; + + /* zero latency, if there is a buffer, it's full */ + if (self->priv->latency == 0) + return FALSE; + + /* Allow no more buffers than the latency */ + return (aggpad->priv->time_level <= self->priv->latency); +} + +/* Must be called with the PAD_LOCK held */ +static void +apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) +{ + GstClockTime timestamp; + + if (GST_BUFFER_DTS_IS_VALID (buffer)) + timestamp = GST_BUFFER_DTS (buffer); + else + timestamp = GST_BUFFER_PTS (buffer); + + if (timestamp == GST_CLOCK_TIME_NONE) { + if (head) + timestamp = aggpad->priv->head_position; + else + timestamp = aggpad->priv->tail_position; + } + + /* add duration */ + if (GST_BUFFER_DURATION_IS_VALID (buffer)) + timestamp += GST_BUFFER_DURATION (buffer); + + if (head) + aggpad->priv->head_position = timestamp; + else + aggpad->priv->tail_position = timestamp; + + update_time_level (aggpad, head); +} + static GstFlowReturn -gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) { GstBuffer *actual_buf = buffer; - GstAggregator *self = GST_AGGREGATOR (object); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); + GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); GstFlowReturn flow_return; + GstClockTime buf_pts; GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); @@ -1923,26 +2107,49 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (aggpad->priv->pending_eos == TRUE) goto eos; - while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) - PAD_WAIT_EVENT (aggpad); - flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); - if (aggclass->clip) { + if (aggclass->clip && head) { aggclass->clip (self, aggpad, buffer, &actual_buf); } - SRC_LOCK (self); - PAD_LOCK (aggpad); - if (aggpad->priv->buffer) - gst_buffer_unref (aggpad->priv->buffer); - aggpad->priv->buffer = actual_buf; + if (actual_buf == NULL) { + GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function"); + goto done; + } - flow_return = aggpad->priv->flow_return; + buf_pts = GST_BUFFER_PTS (actual_buf); + + for (;;) { + SRC_LOCK (self); + PAD_LOCK (aggpad); + if (gst_aggregator_pad_has_space (self, aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { + if (head) + g_queue_push_head (&aggpad->priv->buffers, actual_buf); + else + g_queue_push_tail (&aggpad->priv->buffers, actual_buf); + apply_buffer (aggpad, actual_buf, head); + actual_buf = buffer = NULL; + SRC_BROADCAST (self); + break; + } + + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + SRC_UNLOCK (self); + PAD_WAIT_EVENT (aggpad); + + PAD_UNLOCK (aggpad); + } if (self->priv->first_buffer) { GstClockTime start_time; @@ -1954,7 +2161,7 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) break; case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: if (aggpad->segment.format == GST_FORMAT_TIME) { - start_time = GST_BUFFER_PTS (actual_buf); + start_time = buf_pts; if (start_time != -1) { start_time = MAX (start_time, aggpad->segment.start); start_time = @@ -1990,11 +2197,12 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) } PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); - - SRC_BROADCAST (self); SRC_UNLOCK (self); +done: + + PAD_FLUSH_UNLOCK (aggpad); + GST_DEBUG_OBJECT (aggpad, "Done chaining"); return flow_return; @@ -2003,9 +2211,10 @@ flushing: PAD_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad); - gst_buffer_unref (buffer); GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", gst_flow_get_name (flow_return)); + if (buffer) + gst_buffer_unref (buffer); return flow_return; @@ -2014,11 +2223,18 @@ eos: PAD_FLUSH_UNLOCK (aggpad); gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (pad, "We are EOS already..."); + GST_DEBUG_OBJECT (aggpad, "We are EOS already..."); return GST_FLOW_EOS; } +static GstFlowReturn +gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +{ + return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE); +} + static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) @@ -2029,8 +2245,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, if (GST_QUERY_IS_SERIALIZED (query)) { PAD_LOCK (aggpad); - while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) + while (!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); + } if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; @@ -2052,31 +2271,49 @@ static gboolean gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS - && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) { + /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) { + SRC_LOCK (self); PAD_LOCK (aggpad); - - while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) - PAD_WAIT_EVENT (aggpad); - if (aggpad->priv->flow_return != GST_FLOW_OK && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) goto flushing; + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->clip_segment); + aggpad->priv->head_position = aggpad->clip_segment.position; + update_time_level (aggpad, TRUE); + GST_OBJECT_UNLOCK (aggpad); + } + + if (!gst_aggregator_pad_queue_is_empty (aggpad) && + GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, + event); + g_queue_push_head (&aggpad->priv->buffers, event); + event = NULL; + SRC_BROADCAST (self); + } PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); } - return klass->sink_event (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), event); + if (event) + return klass->sink_event (self, aggpad, event); + else + return TRUE; flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); if (GST_EVENT_IS_STICKY (event)) gst_pad_store_sticky_event (pad, event); gst_event_unref (event); @@ -2087,10 +2324,14 @@ static gboolean gst_aggregator_pad_activate_mode_func (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (active == FALSE) { - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + SRC_BROADCAST (self); + SRC_UNLOCK (self); } else { PAD_LOCK (aggpad); aggpad->priv->flow_return = GST_FLOW_OK; @@ -2138,7 +2379,7 @@ gst_aggregator_pad_dispose (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; - gst_aggregator_pad_drop_buffer (pad); + gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE); G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); } @@ -2162,7 +2403,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - pad->priv->buffer = NULL; + g_queue_init (&pad->priv->buffers); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); @@ -2184,11 +2425,13 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) GstBuffer *buffer = NULL; PAD_LOCK (pad); - if (pad->priv->buffer) { + if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) + buffer = g_queue_pop_tail (&pad->priv->buffers); + + if (buffer) { + apply_buffer (pad, buffer, FALSE); GST_TRACE_OBJECT (pad, "Consuming buffer"); - buffer = pad->priv->buffer; - pad->priv->buffer = NULL; - if (pad->priv->pending_eos) { + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { pad->priv->pending_eos = FALSE; pad->priv->eos = TRUE; } @@ -2236,8 +2479,14 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) GstBuffer *buffer = NULL; PAD_LOCK (pad); - if (pad->priv->buffer) - buffer = gst_buffer_ref (pad->priv->buffer); + buffer = g_queue_peek_tail (&pad->priv->buffers); + /* The tail should always be a buffer, because if it is an event, + * it will be consumed immeditaly in gst_aggregator_steal_buffer */ + + if (GST_IS_BUFFER (buffer)) + gst_buffer_ref (buffer); + else + buffer = NULL; PAD_UNLOCK (pad); return buffer; diff --git a/gst-libs/gst/base/gstaggregator.h b/gst-libs/gst/base/gstaggregator.h index cdfa9c2dce..9401d0c50b 100644 --- a/gst-libs/gst/base/gstaggregator.h +++ b/gst-libs/gst/base/gstaggregator.h @@ -71,6 +71,8 @@ struct _GstAggregatorPad /* Protected by the OBJECT_LOCK */ GstSegment segment; + /* Segment to use in the clip function, before the queue */ + GstSegment clip_segment; /* < Private > */ GstAggregatorPadPrivate * priv; diff --git a/gst/audiomixer/gstaudioaggregator.c b/gst/audiomixer/gstaudioaggregator.c index b7e5cff674..351b1d70a4 100644 --- a/gst/audiomixer/gstaudioaggregator.c +++ b/gst/audiomixer/gstaudioaggregator.c @@ -723,7 +723,7 @@ gst_audio_aggregator_do_clip (GstAggregator * agg, bpf = GST_AUDIO_INFO_BPF (&pad->info); GST_OBJECT_LOCK (bpad); - *out = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf); + *out = gst_audio_buffer_clip (buffer, &bpad->clip_segment, rate, bpf); GST_OBJECT_UNLOCK (bpad); return GST_FLOW_OK;