diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 3503d22b9e..f18bbdbf99 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -69,8 +69,9 @@ /* Might become API */ static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); -static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout); -static gint64 gst_aggregator_get_timeout (GstAggregator * agg); +static void gst_aggregator_set_latency_property (GstAggregator * agg, + gint64 latency); +static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); GST_DEBUG_CATEGORY_STATIC (aggregator_debug); @@ -141,6 +142,48 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); g_thread_self()); \ } G_STMT_END +#define SRC_STREAM_LOCK(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p", \ + g_thread_self()); \ + g_mutex_lock(&self->priv->src_lock); \ + GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_STREAM_UNLOCK(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p", \ + g_thread_self()); \ + g_mutex_unlock(&self->priv->src_lock); \ + GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_STREAM_WAIT(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p", \ + g_thread_self()); \ + g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \ + GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_STREAM_BROADCAST(self) { \ + GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ + g_thread_self()); \ + g_cond_broadcast(&(self->priv->src_cond)); \ + } + +#define KICK_SRC_THREAD(self) \ + do { \ + SRC_STREAM_LOCK (self); \ + GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \ + g_thread_self ()); \ + if (self->priv->aggregate_id) \ + gst_clock_id_unschedule (self->priv->aggregate_id); \ + self->priv->n_kicks++; \ + SRC_STREAM_BROADCAST (self); \ + SRC_STREAM_UNLOCK (self); \ + } while (0) + struct _GstAggregatorPadPrivate { gboolean pending_flush_start; @@ -148,8 +191,6 @@ struct _GstAggregatorPadPrivate gboolean pending_eos; gboolean flushing; - GstClockID timeout_id; - GMutex event_lock; GCond event_cond; @@ -175,38 +216,10 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) *************************************/ static GstElementClass *aggregator_parent_class = NULL; -#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue) - -#define QUEUE_PUSH(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \ - g_thread_self()); \ - g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \ -} G_STMT_END - -#define QUEUE_POP(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \ - g_thread_self()); \ - g_async_queue_pop (AGGREGATOR_QUEUE (self)); \ - GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \ - g_thread_self()); \ -} G_STMT_END - -#define QUEUE_FLUSH(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \ - g_thread_self()); \ - g_async_queue_lock (AGGREGATOR_QUEUE (self)); \ - while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \ - g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \ - GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \ - g_thread_self()); \ -} G_STMT_END - struct _GstAggregatorPrivate { gint padcount; - GAsyncQueue *queue; - /* Our state is >= PAUSED */ gboolean running; @@ -230,6 +243,12 @@ struct _GstAggregatorPrivate gboolean latency_live; GstClockTime latency_min; GstClockTime latency_max; + + /* aggregate */ + GstClockID aggregate_id; + gint n_kicks; + GMutex src_lock; + GCond src_cond; }; typedef struct @@ -241,12 +260,12 @@ typedef struct gboolean one_actually_seeked; } EventData; -#define DEFAULT_TIMEOUT -1 +#define DEFAULT_LATENCY -1 enum { PROP_0, - PROP_TIMEOUT, + PROP_LATENCY, PROP_LAST }; @@ -329,18 +348,13 @@ no_iter: } static inline gboolean -_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self, - GstAggregatorPad * aggpad) +_check_all_pads_with_data_or_eos (GstAggregator * self, + GstAggregatorPad * aggpad, gpointer user_data) { if (aggpad->buffer || aggpad->eos) { return TRUE; } - if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) { - /* pad has been deemed unresponsive */ - return TRUE; - } - GST_LOG_OBJECT (aggpad, "Not ready to be aggregated"); return FALSE; @@ -460,6 +474,95 @@ _push_eos (GstAggregator * self) gst_pad_push_event (self->srcpad, event); } +static GstClockTime +gst_aggregator_get_next_time (GstAggregator * self) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + + if (klass->get_next_time) + return klass->get_next_time (self); + + return GST_CLOCK_TIME_NONE; +} + +/* called with the src STREAM lock */ +static gboolean +_wait_and_check (GstAggregator * self) +{ + GstClockTime latency_max, latency_min; + GstClockTime start; + gboolean live; + + gst_aggregator_get_latency (self, &live, &latency_min, &latency_max); + + if (gst_aggregator_iterate_sinkpads (self, + (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, + NULL)) { + GST_DEBUG_OBJECT (self, "all pads have data"); + return TRUE; + } + + SRC_STREAM_LOCK (self); + start = gst_aggregator_get_next_time (self); + + if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) + || !GST_CLOCK_TIME_IS_VALID (start)) { + while (self->priv->n_kicks <= 0) + SRC_STREAM_WAIT (self); + self->priv->n_kicks--; + } else { + GstClockTime time; + GstClockReturn status; + + GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (start)); + + time = GST_ELEMENT_CAST (self)->base_time + start; + + if (GST_CLOCK_TIME_IS_VALID (latency_max)) { + time += latency_max; + } else if (GST_CLOCK_TIME_IS_VALID (latency_min)) { + time += latency_min; + } else { + time += self->latency; + } + + GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %" + GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT + " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT + " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time), + GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time), + GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max), + GST_TIME_ARGS (latency_min), + GST_TIME_ARGS (gst_clock_get_time (GST_ELEMENT_CLOCK (self)))); + + self->priv->aggregate_id = + gst_clock_new_single_shot_id (GST_ELEMENT_CLOCK (self), time); + SRC_STREAM_UNLOCK (self); + + status = gst_clock_id_wait (self->priv->aggregate_id, NULL); + + SRC_STREAM_LOCK (self); + if (self->priv->aggregate_id) { + gst_clock_id_unref (self->priv->aggregate_id); + self->priv->aggregate_id = NULL; + } + self->priv->n_kicks--; + + GST_DEBUG_OBJECT (self, "clock returned %d", status); + + /* we timed out */ + if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { + SRC_STREAM_UNLOCK (self); + return TRUE; + } + } + SRC_STREAM_UNLOCK (self); + + return gst_aggregator_iterate_sinkpads (self, + (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL); +} + static void aggregate_func (GstAggregator * self) { @@ -468,22 +571,19 @@ aggregate_func (GstAggregator * self) if (self->priv->running == FALSE) { GST_DEBUG_OBJECT (self, "Not running anymore"); - return; } - QUEUE_POP (self); - GST_LOG_OBJECT (self, "Checking aggregate"); - while (priv->send_eos && gst_aggregator_iterate_sinkpads (self, - (GstAggregatorPadForeachFunc) - _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) { + while (priv->send_eos && priv->running) { + if (!_wait_and_check (self)) + continue; + GST_TRACE_OBJECT (self, "Actually aggregating!"); priv->flow_return = klass->aggregate (self); if (priv->flow_return == GST_FLOW_EOS) { - QUEUE_FLUSH (self); _push_eos (self); } @@ -497,7 +597,6 @@ aggregate_func (GstAggregator * self) if (priv->flow_return != GST_FLOW_OK) break; } - } static gboolean @@ -528,14 +627,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) flush_start ? "Pausing" : "Stopping"); self->priv->running = FALSE; - QUEUE_PUSH (self); + KICK_SRC_THREAD (self); if (flush_start) { res = gst_pad_push_event (self->srcpad, flush_start); } gst_pad_stop_task (self->srcpad); - QUEUE_FLUSH (self); + KICK_SRC_THREAD (self); return res; } @@ -546,6 +645,7 @@ _start_srcpad_task (GstAggregator * self) GST_INFO_OBJECT (self, "Starting srcpad task"); self->priv->running = TRUE; + self->priv->n_kicks = 0; gst_pad_start_task (GST_PAD (self->srcpad), (GstTaskFunction) aggregate_func, self, NULL); } @@ -662,7 +762,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) gst_pad_push_event (self->srcpad, event); priv->send_eos = TRUE; event = NULL; - QUEUE_PUSH (self); + KICK_SRC_THREAD (self); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_PAD_STREAM_UNLOCK (self->srcpad); @@ -690,7 +790,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) } PAD_UNLOCK_EVENT (aggpad); - QUEUE_PUSH (self); + KICK_SRC_THREAD (self); goto eat; } case GST_EVENT_SEGMENT: @@ -741,15 +841,6 @@ _stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) { _aggpad_flush (pad, self); - PAD_LOCK_EVENT (pad); - /* remove the timeouts */ - if (pad->priv->timeout_id) { - gst_clock_id_unschedule (pad->priv->timeout_id); - gst_clock_id_unref (pad->priv->timeout_id); - pad->priv->timeout_id = NULL; - } - PAD_UNLOCK_EVENT (pad); - return TRUE; } @@ -811,9 +902,9 @@ failure: static void _release_pad (GstElement * element, GstPad * pad) { + GstAggregator *self = GST_AGGREGATOR (element); GstBuffer *tmpbuf; - GstAggregator *self = GST_AGGREGATOR (element); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GST_INFO_OBJECT (pad, "Removing pad"); @@ -823,39 +914,7 @@ _release_pad (GstElement * element, GstPad * pad) gst_buffer_replace (&tmpbuf, NULL); gst_element_remove_pad (element, pad); - /* Something changed make sure we try to aggregate */ - QUEUE_PUSH (self); -} - -static gboolean -_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id, - gpointer user_data) -{ - GstAggregatorPad *aggpad; - GstAggregator *self; - - if (user_data == NULL) - return FALSE; - - aggpad = GST_AGGREGATOR_PAD (user_data); - - /* avoid holding the last reference to the parent element here */ - PAD_LOCK_EVENT (aggpad); - - self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad))); - - GST_DEBUG_OBJECT (aggpad, "marked unresponsive"); - - g_atomic_int_set (&aggpad->unresponsive, TRUE); - - if (self) { - QUEUE_PUSH (self); - gst_object_unref (self); - } - - PAD_UNLOCK_EVENT (aggpad); - - return TRUE; + KICK_SRC_THREAD (self); } static GstPad * @@ -970,11 +1029,11 @@ gst_aggregator_get_latency (GstAggregator * self, gboolean * live, min = self->priv->latency_min; max = self->priv->latency_max; - if (GST_CLOCK_TIME_IS_VALID (self->timeout)) { + if (GST_CLOCK_TIME_IS_VALID (self->latency)) { if (GST_CLOCK_TIME_IS_VALID (min)) - min += self->timeout; + min += self->latency; if (GST_CLOCK_TIME_IS_VALID (max)) - max += self->timeout; + max += self->latency; } if (live) @@ -998,13 +1057,13 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query) gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _latency_query, &data); - if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) && - self->timeout > data.max) { + if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) && + self->latency > data.max) { GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Timeout too big"), - ("The requested timeout value is too big for the latency in the " - "current pipeline. Limiting to %" G_GINT64_FORMAT, data.max)); - self->timeout = data.max; + ("%s", "Latency too big"), + ("The requested latency value is too big for the current pipeline. " + "Limiting to %" G_GINT64_FORMAT, data.max)); + self->latency = data.max; } self->priv->latency_live = data.live; @@ -1012,11 +1071,11 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query) self->priv->latency_max = data.max; /* add our own */ - if (GST_CLOCK_TIME_IS_VALID (self->timeout)) { + if (GST_CLOCK_TIME_IS_VALID (self->latency)) { if (GST_CLOCK_TIME_IS_VALID (data.min)) - data.min += self->timeout; + data.min += self->latency; if (GST_CLOCK_TIME_IS_VALID (data.max)) - data.max += self->timeout; + data.max += self->latency; } GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT @@ -1317,6 +1376,8 @@ gst_aggregator_finalize (GObject * object) gst_object_unref (self->clock); g_mutex_clear (&self->priv->setcaps_lock); + g_mutex_clear (&self->priv->src_lock); + g_cond_clear (&self->priv->src_cond); G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); } @@ -1324,51 +1385,44 @@ gst_aggregator_finalize (GObject * object) static void gst_aggregator_dispose (GObject * object) { - GstAggregator *self = (GstAggregator *) object; - G_OBJECT_CLASS (aggregator_parent_class)->dispose (object); - - if (AGGREGATOR_QUEUE (self)) { - g_async_queue_unref (AGGREGATOR_QUEUE (self)); - AGGREGATOR_QUEUE (self) = NULL; - } } -/** - * gst_aggregator_set_timeout: +/* + * gst_aggregator_set_latency_property: * @agg: a #GstAggregator - * @timeout: the new timeout value. + * @latency: the new latency value. * - * Sets the new timeout value to @timeout. This value is used to limit the + * Sets the new latency value to @latency. This value is used to limit the * amount of time a pad waits for data to appear before considering the pad * as unresponsive. */ static void -gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout) +gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) { g_return_if_fail (GST_IS_AGGREGATOR (self)); GST_OBJECT_LOCK (self); if (self->priv->latency_live && self->priv->latency_max != 0 && - GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) { + GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) { GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Timeout too big"), - ("The requested timeout value is too big for the latency in the " + ("%s", "Latency too big"), + ("The requested latency value is too big for the latency in the " "current pipeline. Limiting to %" G_GINT64_FORMAT, self->priv->latency_max)); - timeout = self->priv->latency_max; + latency = self->priv->latency_max; } - self->timeout = timeout; + self->latency = latency; GST_OBJECT_UNLOCK (self); } -/** - * gst_aggregator_get_timeout: +/* + * gst_aggregator_get_latency_property: * @agg: a #GstAggregator * - * Gets the timeout value. See gst_aggregator_set_timeout for + * Gets the latency value. See gst_aggregator_set_latency for * more details. * * Returns: The time in nanoseconds to wait for data to arrive on a sink pad @@ -1376,14 +1430,14 @@ gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout) * unlimited time. */ static gint64 -gst_aggregator_get_timeout (GstAggregator * agg) +gst_aggregator_get_latency_property (GstAggregator * agg) { gint64 res; g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); GST_OBJECT_LOCK (agg); - res = agg->timeout; + res = agg->latency; GST_OBJECT_UNLOCK (agg); return res; @@ -1396,8 +1450,8 @@ gst_aggregator_set_property (GObject * object, guint prop_id, GstAggregator *agg = GST_AGGREGATOR (object); switch (prop_id) { - case PROP_TIMEOUT: - gst_aggregator_set_timeout (agg, g_value_get_int64 (value)); + case PROP_LATENCY: + gst_aggregator_set_latency_property (agg, g_value_get_int64 (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1412,8 +1466,8 @@ gst_aggregator_get_property (GObject * object, guint prop_id, GstAggregator *agg = GST_AGGREGATOR (object); switch (prop_id) { - case PROP_TIMEOUT: - g_value_set_int64 (value, gst_aggregator_get_timeout (agg)); + case PROP_LATENCY: + g_value_set_int64 (value, gst_aggregator_get_latency_property (agg)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1454,12 +1508,12 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gobject_class->finalize = gst_aggregator_finalize; gobject_class->dispose = gst_aggregator_dispose; - g_object_class_install_property (gobject_class, PROP_TIMEOUT, - g_param_spec_int64 ("timeout", "Buffer timeout", + g_object_class_install_property (gobject_class, PROP_LATENCY, + g_param_spec_int64 ("latency", "Buffer latency", "Number of nanoseconds to wait for a buffer to arrive on a sink pad" "before the pad is deemed unresponsive (-1 unlimited)", -1, (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1), - DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void @@ -1488,7 +1542,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) self->priv->latency_max = GST_CLOCK_TIME_NONE; _reset_flow_values (self); - AGGREGATOR_QUEUE (self) = g_async_queue_new (); self->srcpad = gst_pad_new_from_template (pad_template, "src"); gst_pad_set_event_function (self->srcpad, @@ -1501,9 +1554,11 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); self->clock = gst_system_clock_obtain (); - self->timeout = -1; + self->latency = -1; g_mutex_init (&self->priv->setcaps_lock); + g_mutex_init (&self->priv->src_lock); + g_cond_init (&self->priv->src_cond); } /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init @@ -1542,16 +1597,8 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) GstAggregatorPrivate *priv = self->priv; GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); - GstClockTime timeout = gst_aggregator_get_timeout (self); - GstClockTime now; GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - if (aggpad->priv->timeout_id) { - gst_clock_id_unschedule (aggpad->priv->timeout_id); - gst_clock_id_unref (aggpad->priv->timeout_id); - aggpad->priv->timeout_id = NULL; - } - g_atomic_int_set (&aggpad->unresponsive, FALSE); PAD_STREAM_LOCK (aggpad); @@ -1583,15 +1630,9 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) PAD_UNLOCK_EVENT (aggpad); PAD_STREAM_UNLOCK (aggpad); - QUEUE_PUSH (self); - - if (GST_CLOCK_TIME_IS_VALID (timeout)) { - now = gst_clock_get_time (self->clock); - aggpad->priv->timeout_id = - gst_clock_new_single_shot_id (self->clock, now + timeout); - gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout, - gst_object_ref (aggpad), gst_object_unref); - } + if (gst_aggregator_iterate_sinkpads (self, + (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) + KICK_SRC_THREAD (self); GST_DEBUG_OBJECT (aggpad, "Done chaining"); diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index 8bf8fbd9af..19b04e982c 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -73,7 +73,6 @@ struct _GstAggregatorPad GstBuffer * buffer; GstSegment segment; gboolean eos; - gboolean unresponsive; /* < Private > */ GstAggregatorPadPrivate * priv; @@ -141,7 +140,7 @@ struct _GstAggregator GstClock * clock; /* properties */ - gint64 timeout; + gint64 latency; gpointer _gst_reserved[GST_PADDING]; }; @@ -189,6 +188,12 @@ struct _GstAggregator * Should be linked up first. Called when the element goes from * READY to PAUSED. The subclass should get ready to process * aggregated buffers. + * @get_next_time: Optional. + * Called when the element needs to know the time of the next + * rendered buffer for live pipelines. This causes deadline + * based aggregation to occur. Defaults to returning + * GST_CLOCK_TIME_NONE causing the element to wait for buffers + * on all sink pads before aggregating. * * The aggregator base class will handle in a thread-safe way all manners of * concurrent flushes, seeks, pad additions and removals, leaving to the @@ -238,6 +243,8 @@ struct _GstAggregatorClass { gboolean (*start) (GstAggregator * aggregator); + GstClockTime (*get_next_time) (GstAggregator * aggregator); + /*< private >*/ gpointer _gst_reserved[GST_PADDING]; };