From 22dc57f84ed2f5cdb1013cab94068a3b3034ae79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Fri, 6 Mar 2015 19:50:08 -0500 Subject: [PATCH] aggregator: Queue "latency" buffers at each sink pad. In the case where you have a source giving the GstAggregator smaller buffers than it uses, when it reaches a timeout, it will consume the first buffer, then try to read another buffer for the pad. If the previous element is not fast enough, it may get the next buffer even though it may be queued just before. To prevent that race, the easiest solution is to move the queue inside the GstAggregatorPad itself. It also means that there is no need for strange code cause by increasing the min latency without increasing the max latency proportionally. This also means queuing the synchronized events and possibly acting on them on the src task. https://bugzilla.gnome.org/show_bug.cgi?id=745768 --- libs/gst/base/gstaggregator.c | 405 +++++++++++++++++++++++++++------- libs/gst/base/gstaggregator.h | 2 + 2 files changed, 329 insertions(+), 78 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 6cb73c0c16..a0a8776bb6 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -215,7 +215,13 @@ struct _GstAggregatorPadPrivate gboolean pending_flush_stop; gboolean pending_eos; - GstBuffer *buffer; + GQueue buffers; + GstClockTime head_position; + GstClockTime tail_position; + GstClockTime head_time; + GstClockTime tail_time; + GstClockTime time_level; + gboolean eos; GMutex lock; @@ -235,6 +241,15 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; aggpad->priv->flow_return = GST_FLOW_OK; + GST_OBJECT_LOCK (aggpad); + gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (aggpad); + aggpad->priv->head_position = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; + aggpad->priv->time_level = 0; PAD_UNLOCK (aggpad); if (klass->flush) @@ -287,7 +302,7 @@ struct _GstAggregatorPrivate GstClockTime start_time; /* properties */ - gint64 latency; + gint64 latency; /* protected by both src_lock and all pad locks */ }; typedef struct @@ -312,6 +327,9 @@ enum PROP_LAST }; +static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); + /** * gst_aggregator_iterate_sinkpads: * @self: The #GstAggregator @@ -391,6 +409,12 @@ no_iter: return result; } +static gboolean +gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) +{ + return (g_queue_peek_tail (&pad->priv->buffers) == NULL); +} + static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { @@ -414,10 +438,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self) * generate a start time from it. In non-live mode all pads need * to have a buffer */ - if (self->priv->peer_latency_live && pad->priv->buffer) + if (self->priv->peer_latency_live && + !gst_aggregator_pad_queue_is_empty (pad)) self->priv->first_buffer = FALSE; - if (pad->priv->buffer == NULL && !pad->priv->eos) { + if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) { PAD_UNLOCK (pad); goto pad_not_ready; } @@ -690,16 +715,69 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) return res; } +static gboolean +check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +{ + GstEvent *event = NULL; + GstAggregatorClass *klass = NULL; + gboolean *processed_event = user_data; + + do { + event = NULL; + + PAD_LOCK (pad); + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { + pad->priv->pending_eos = FALSE; + pad->priv->eos = TRUE; + } + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { + event = g_queue_pop_tail (&pad->priv->buffers); + PAD_BROADCAST_EVENT (pad); + } + PAD_UNLOCK (pad); + if (event) { + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + klass->sink_event (self, pad, event); + } + } while (event != NULL); + + return TRUE; +} + static void gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, - GstFlowReturn flow_return) + GstFlowReturn flow_return, gboolean full) { + GList *item; + PAD_LOCK (aggpad); if (flow_return == GST_FLOW_NOT_LINKED) aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); else aggpad->priv->flow_return = flow_return; - gst_buffer_replace (&aggpad->priv->buffer, NULL); + + item = g_queue_peek_head_link (&aggpad->priv->buffers); + while (item) { + GList *next = item->next; + + /* In partial flush, we do like the pad, we get rid of non-sticky events + * and EOS/SEGMENT. + */ + if (full || GST_IS_BUFFER (item->data) || + GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || + GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || + !GST_EVENT_IS_STICKY (item->data)) { + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->buffers, item); + } + item = next; + } + PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); } @@ -719,10 +797,17 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { GstFlowReturn flow_return; + gboolean processed_event = FALSE; + + gst_aggregator_iterate_sinkpads (self, check_events, NULL); if (!gst_aggregator_wait_and_check (self, &timeout)) continue; + gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); + if (processed_event) + continue; + GST_TRACE_OBJECT (self, "Actually aggregating!"); flow_return = klass->aggregate (self, timeout); @@ -748,7 +833,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); - gst_aggregator_pad_set_flushing (aggpad, flow_return); + gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE); } GST_OBJECT_UNLOCK (self); break; @@ -879,7 +964,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstAggregatorPrivate *priv = self->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv; - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); PAD_FLUSH_LOCK (aggpad); PAD_LOCK (aggpad); @@ -914,10 +999,44 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, gst_event_unref (event); } PAD_FLUSH_UNLOCK (aggpad); - - gst_aggregator_pad_drop_buffer (aggpad); } +/* Must be called with the the PAD_LOCK held */ +static void +update_time_level (GstAggregatorPad * aggpad, gboolean head) +{ + if (head) { + if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && + aggpad->clip_segment.format == GST_FORMAT_TIME) + aggpad->priv->head_time = + gst_segment_to_running_time (&aggpad->clip_segment, + GST_FORMAT_TIME, aggpad->priv->head_position); + else + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + } else { + if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && + aggpad->segment.format == GST_FORMAT_TIME) + aggpad->priv->tail_time = + gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, aggpad->priv->tail_position); + else + aggpad->priv->tail_time = aggpad->priv->head_time; + } + + if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE || + aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) { + aggpad->priv->time_level = 0; + return; + } + + if (aggpad->priv->tail_time > aggpad->priv->head_time) + aggpad->priv->time_level = 0; + else + aggpad->priv->time_level = aggpad->priv->head_time - + aggpad->priv->tail_time; +} + + /* GstAggregator vmethods default implementations */ static gboolean gst_aggregator_default_sink_event (GstAggregator * self, @@ -978,7 +1097,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (!aggpad->priv->buffer) { + if (gst_aggregator_pad_queue_is_empty (aggpad)) { aggpad->priv->eos = TRUE; } else { aggpad->priv->pending_eos = TRUE; @@ -991,9 +1110,12 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_SEGMENT: { + PAD_LOCK (aggpad); GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); + update_time_level (aggpad, FALSE); GST_OBJECT_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); GST_OBJECT_LOCK (self); self->priv->seqnum = gst_event_get_seqnum (event); @@ -1006,19 +1128,40 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_GAP: { - GstClockTime pts; + GstClockTime pts, endpts; GstClockTime duration; GstBuffer *gapbuf; gst_event_parse_gap (event, &pts, &duration); gapbuf = gst_buffer_new (); + if (GST_CLOCK_TIME_IS_VALID (duration)) + endpts = pts + duration; + else + endpts = GST_CLOCK_TIME_NONE; + + GST_OBJECT_LOCK (aggpad); + res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts, + &pts, &endpts); + GST_OBJECT_UNLOCK (aggpad); + + if (!res) { + GST_WARNING_OBJECT (self, "GAP event outside segment, dropping"); + goto eat; + } + + if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts)) + duration = endpts - pts; + else + duration = GST_CLOCK_TIME_NONE; + GST_BUFFER_PTS (gapbuf) = pts; GST_BUFFER_DURATION (gapbuf) = duration; GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); - if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) { + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != + GST_FLOW_OK) { GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); res = FALSE; } @@ -1150,7 +1293,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad) GST_INFO_OBJECT (pad, "Removing pad"); SRC_LOCK (self); - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); gst_element_remove_pad (element, pad); self->priv->has_peer_latency = FALSE; @@ -1252,7 +1395,7 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) min += self->priv->sub_latency_min; if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) && GST_CLOCK_TIME_IS_VALID (max)) - max += self->priv->sub_latency_max; + max += self->priv->sub_latency_max + our_latency; else max = GST_CLOCK_TIME_NONE; @@ -1660,41 +1803,36 @@ static void gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) { gboolean changed; - GstClockTime min, max; g_return_if_fail (GST_IS_AGGREGATOR (self)); g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); SRC_LOCK (self); - if (self->priv->peer_latency_live) { - min = self->priv->peer_latency_min; - max = self->priv->peer_latency_max; - /* add our own */ - min += latency; - min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) - && GST_CLOCK_TIME_IS_VALID (max)) - max += self->priv->sub_latency_max; - else - max = GST_CLOCK_TIME_NONE; + changed = (self->priv->latency != latency); - if (GST_CLOCK_TIME_IS_VALID (max) && min > max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the latency in the " - "current pipeline. Limiting to %" G_GINT64_FORMAT, max)); - /* FIXME: This could in theory become negative, but in - * that case all is lost anyway */ - latency -= min - max; - /* FIXME: shouldn't we g_object_notify() the change here? */ + if (changed) { + GList *item; + + GST_OBJECT_LOCK (self); + /* First lock all the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_LOCK (aggpad); } + + self->priv->latency = latency; + + SRC_BROADCAST (self); + + /* Now wake up the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); + } + GST_OBJECT_UNLOCK (self); } - changed = (self->priv->latency != latency); - self->priv->latency = latency; - - if (changed) - SRC_BROADCAST (self); SRC_UNLOCK (self); if (changed) @@ -1902,14 +2040,60 @@ gst_aggregator_get_type (void) return type; } +/* Must be called with PAD lock held */ +static gboolean +gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) +{ + /* Empty queue always has space */ + if (g_queue_get_length (&aggpad->priv->buffers) == 0) + return TRUE; + + /* zero latency, if there is a buffer, it's full */ + if (self->priv->latency == 0) + return FALSE; + + /* Allow no more buffers than the latency */ + return (aggpad->priv->time_level <= self->priv->latency); +} + +/* Must be called with the PAD_LOCK held */ +static void +apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) +{ + GstClockTime timestamp; + + if (GST_BUFFER_DTS_IS_VALID (buffer)) + timestamp = GST_BUFFER_DTS (buffer); + else + timestamp = GST_BUFFER_PTS (buffer); + + if (timestamp == GST_CLOCK_TIME_NONE) { + if (head) + timestamp = aggpad->priv->head_position; + else + timestamp = aggpad->priv->tail_position; + } + + /* add duration */ + if (GST_BUFFER_DURATION_IS_VALID (buffer)) + timestamp += GST_BUFFER_DURATION (buffer); + + if (head) + aggpad->priv->head_position = timestamp; + else + aggpad->priv->tail_position = timestamp; + + update_time_level (aggpad, head); +} + static GstFlowReturn -gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) { GstBuffer *actual_buf = buffer; - GstAggregator *self = GST_AGGREGATOR (object); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); + GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); GstFlowReturn flow_return; + GstClockTime buf_pts; GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); @@ -1923,26 +2107,49 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (aggpad->priv->pending_eos == TRUE) goto eos; - while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) - PAD_WAIT_EVENT (aggpad); - flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); - if (aggclass->clip) { + if (aggclass->clip && head) { aggclass->clip (self, aggpad, buffer, &actual_buf); } - SRC_LOCK (self); - PAD_LOCK (aggpad); - if (aggpad->priv->buffer) - gst_buffer_unref (aggpad->priv->buffer); - aggpad->priv->buffer = actual_buf; + if (actual_buf == NULL) { + GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function"); + goto done; + } - flow_return = aggpad->priv->flow_return; + buf_pts = GST_BUFFER_PTS (actual_buf); + + for (;;) { + SRC_LOCK (self); + PAD_LOCK (aggpad); + if (gst_aggregator_pad_has_space (self, aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { + if (head) + g_queue_push_head (&aggpad->priv->buffers, actual_buf); + else + g_queue_push_tail (&aggpad->priv->buffers, actual_buf); + apply_buffer (aggpad, actual_buf, head); + actual_buf = buffer = NULL; + SRC_BROADCAST (self); + break; + } + + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + SRC_UNLOCK (self); + PAD_WAIT_EVENT (aggpad); + + PAD_UNLOCK (aggpad); + } if (self->priv->first_buffer) { GstClockTime start_time; @@ -1954,7 +2161,7 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) break; case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: if (aggpad->segment.format == GST_FORMAT_TIME) { - start_time = GST_BUFFER_PTS (actual_buf); + start_time = buf_pts; if (start_time != -1) { start_time = MAX (start_time, aggpad->segment.start); start_time = @@ -1990,11 +2197,12 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) } PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); - - SRC_BROADCAST (self); SRC_UNLOCK (self); +done: + + PAD_FLUSH_UNLOCK (aggpad); + GST_DEBUG_OBJECT (aggpad, "Done chaining"); return flow_return; @@ -2003,9 +2211,10 @@ flushing: PAD_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad); - gst_buffer_unref (buffer); GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", gst_flow_get_name (flow_return)); + if (buffer) + gst_buffer_unref (buffer); return flow_return; @@ -2014,11 +2223,18 @@ eos: PAD_FLUSH_UNLOCK (aggpad); gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (pad, "We are EOS already..."); + GST_DEBUG_OBJECT (aggpad, "We are EOS already..."); return GST_FLOW_EOS; } +static GstFlowReturn +gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +{ + return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE); +} + static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) @@ -2029,8 +2245,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, if (GST_QUERY_IS_SERIALIZED (query)) { PAD_LOCK (aggpad); - while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) + while (!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); + } if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; @@ -2052,31 +2271,49 @@ static gboolean gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS - && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) { + /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) { + SRC_LOCK (self); PAD_LOCK (aggpad); - - while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK) - PAD_WAIT_EVENT (aggpad); - if (aggpad->priv->flow_return != GST_FLOW_OK && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) goto flushing; + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->clip_segment); + aggpad->priv->head_position = aggpad->clip_segment.position; + update_time_level (aggpad, TRUE); + GST_OBJECT_UNLOCK (aggpad); + } + + if (!gst_aggregator_pad_queue_is_empty (aggpad) && + GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, + event); + g_queue_push_head (&aggpad->priv->buffers, event); + event = NULL; + SRC_BROADCAST (self); + } PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); } - return klass->sink_event (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), event); + if (event) + return klass->sink_event (self, aggpad, event); + else + return TRUE; flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); if (GST_EVENT_IS_STICKY (event)) gst_pad_store_sticky_event (pad, event); gst_event_unref (event); @@ -2087,10 +2324,14 @@ static gboolean gst_aggregator_pad_activate_mode_func (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (active == FALSE) { - gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING); + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + SRC_BROADCAST (self); + SRC_UNLOCK (self); } else { PAD_LOCK (aggpad); aggpad->priv->flow_return = GST_FLOW_OK; @@ -2138,7 +2379,7 @@ gst_aggregator_pad_dispose (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; - gst_aggregator_pad_drop_buffer (pad); + gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE); G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); } @@ -2162,7 +2403,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - pad->priv->buffer = NULL; + g_queue_init (&pad->priv->buffers); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); @@ -2184,11 +2425,13 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) GstBuffer *buffer = NULL; PAD_LOCK (pad); - if (pad->priv->buffer) { + if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) + buffer = g_queue_pop_tail (&pad->priv->buffers); + + if (buffer) { + apply_buffer (pad, buffer, FALSE); GST_TRACE_OBJECT (pad, "Consuming buffer"); - buffer = pad->priv->buffer; - pad->priv->buffer = NULL; - if (pad->priv->pending_eos) { + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { pad->priv->pending_eos = FALSE; pad->priv->eos = TRUE; } @@ -2236,8 +2479,14 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) GstBuffer *buffer = NULL; PAD_LOCK (pad); - if (pad->priv->buffer) - buffer = gst_buffer_ref (pad->priv->buffer); + buffer = g_queue_peek_tail (&pad->priv->buffers); + /* The tail should always be a buffer, because if it is an event, + * it will be consumed immeditaly in gst_aggregator_steal_buffer */ + + if (GST_IS_BUFFER (buffer)) + gst_buffer_ref (buffer); + else + buffer = NULL; PAD_UNLOCK (pad); return buffer; diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index cdfa9c2dce..9401d0c50b 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -71,6 +71,8 @@ struct _GstAggregatorPad /* Protected by the OBJECT_LOCK */ GstSegment segment; + /* Segment to use in the clip function, before the queue */ + GstSegment clip_segment; /* < Private > */ GstAggregatorPadPrivate * priv;