aggregator: make the src pad task drive the pipeline for live pipelines

This removes the use of GAsyncQueue and replaces it with an explicit
GMutex, GCond and wakeup count, which is used for the non-live case.

For live pipelines, the aggregator waits on the clock until either
data arrives on all sink pads or the expected output buffer time
plus the timeout/latency is reached, at which point the subclass
produces a buffer.

https://bugzilla.gnome.org/show_bug.cgi?id=741146
Matthew Waters, 2014-12-05 18:19:54 +11:00; committed by Tim-Philipp Müller
parent d391c9cfed
commit f4b86a6d8c
2 changed files with 206 additions and 158 deletions
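One user-visible consequence of this change is that the element property is renamed from "timeout" to "latency" (see the gstaggregator.c hunks below). As an illustrative sketch of the application-side update — the aggregator variable here is a hypothetical element instance derived from GstAggregator, not code from this commit:

    /* Sketch: set the renamed "latency" property introduced by this commit.
     * The value is in nanoseconds; -1 keeps the default (unlimited). */
    g_object_set (aggregator, "latency", (gint64) (50 * GST_MSECOND), NULL);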

gstaggregator.c

@@ -69,8 +69,9 @@
 /* Might become API */
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
-static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout);
-static gint64 gst_aggregator_get_timeout (GstAggregator * agg);
+static void gst_aggregator_set_latency_property (GstAggregator * agg,
+    gint64 latency);
+static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
 
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
@@ -141,6 +142,48 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
       g_thread_self()); \
   } G_STMT_END
 
+#define SRC_STREAM_LOCK(self) G_STMT_START { \
+  GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p", \
+      g_thread_self()); \
+  g_mutex_lock(&self->priv->src_lock); \
+  GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p", \
+      g_thread_self()); \
+  } G_STMT_END
+
+#define SRC_STREAM_UNLOCK(self) G_STMT_START { \
+  GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p", \
+      g_thread_self()); \
+  g_mutex_unlock(&self->priv->src_lock); \
+  GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p", \
+      g_thread_self()); \
+  } G_STMT_END
+
+#define SRC_STREAM_WAIT(self) G_STMT_START { \
+  GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p", \
+      g_thread_self()); \
+  g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \
+  GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p", \
+      g_thread_self()); \
+  } G_STMT_END
+
+#define SRC_STREAM_BROADCAST(self) { \
+  GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
+      g_thread_self()); \
+  g_cond_broadcast(&(self->priv->src_cond)); \
+  }
+
+#define KICK_SRC_THREAD(self) \
+  do { \
+    SRC_STREAM_LOCK (self); \
+    GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \
+        g_thread_self ()); \
+    if (self->priv->aggregate_id) \
+      gst_clock_id_unschedule (self->priv->aggregate_id); \
+    self->priv->n_kicks++; \
+    SRC_STREAM_BROADCAST (self); \
+    SRC_STREAM_UNLOCK (self); \
+  } while (0)
+
 struct _GstAggregatorPadPrivate
 {
   gboolean pending_flush_start;
@@ -148,8 +191,6 @@ struct _GstAggregatorPadPrivate
   gboolean pending_eos;
   gboolean flushing;
 
-  GstClockID timeout_id;
-
   GMutex event_lock;
   GCond event_cond;
@@ -175,38 +216,10 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
  *************************************/
 static GstElementClass *aggregator_parent_class = NULL;
 
-#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue)
-
-#define QUEUE_PUSH(self) G_STMT_START { \
-  GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \
-      g_thread_self()); \
-  g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \
-} G_STMT_END
-
-#define QUEUE_POP(self) G_STMT_START { \
-  GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \
-      g_thread_self()); \
-  g_async_queue_pop (AGGREGATOR_QUEUE (self)); \
-  GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \
-      g_thread_self()); \
-} G_STMT_END
-
-#define QUEUE_FLUSH(self) G_STMT_START { \
-  GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \
-      g_thread_self()); \
-  g_async_queue_lock (AGGREGATOR_QUEUE (self)); \
-  while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \
-  g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \
-  GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \
-      g_thread_self()); \
-} G_STMT_END
-
 struct _GstAggregatorPrivate
 {
   gint padcount;
-  GAsyncQueue *queue;
 
   /* Our state is >= PAUSED */
   gboolean running;
@@ -230,6 +243,12 @@ struct _GstAggregatorPrivate
   gboolean latency_live;
   GstClockTime latency_min;
   GstClockTime latency_max;
+
+  /* aggregate */
+  GstClockID aggregate_id;
+  gint n_kicks;
+  GMutex src_lock;
+  GCond src_cond;
 };
 
 typedef struct
@@ -241,12 +260,12 @@ typedef struct
   gboolean one_actually_seeked;
 } EventData;
 
-#define DEFAULT_TIMEOUT -1
+#define DEFAULT_LATENCY -1
 
 enum
 {
   PROP_0,
-  PROP_TIMEOUT,
+  PROP_LATENCY,
   PROP_LAST
 };
@@ -329,18 +348,13 @@ no_iter:
 }
 
 static inline gboolean
-_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self,
-    GstAggregatorPad * aggpad)
+_check_all_pads_with_data_or_eos (GstAggregator * self,
+    GstAggregatorPad * aggpad, gpointer user_data)
 {
   if (aggpad->buffer || aggpad->eos) {
     return TRUE;
   }
 
-  if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) {
-    /* pad has been deemed unresponsive */
-    return TRUE;
-  }
-
   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
 
   return FALSE;
@@ -460,6 +474,95 @@ _push_eos (GstAggregator * self)
   gst_pad_push_event (self->srcpad, event);
 }
 
+static GstClockTime
+gst_aggregator_get_next_time (GstAggregator * self)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
+
+  if (klass->get_next_time)
+    return klass->get_next_time (self);
+
+  return GST_CLOCK_TIME_NONE;
+}
+
+/* called with the src STREAM lock */
+static gboolean
+_wait_and_check (GstAggregator * self)
+{
+  GstClockTime latency_max, latency_min;
+  GstClockTime start;
+  gboolean live;
+
+  gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
+
+  if (gst_aggregator_iterate_sinkpads (self,
+          (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
+          NULL)) {
+    GST_DEBUG_OBJECT (self, "all pads have data");
+    return TRUE;
+  }
+
+  SRC_STREAM_LOCK (self);
+  start = gst_aggregator_get_next_time (self);
+
+  if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
+      || !GST_CLOCK_TIME_IS_VALID (start)) {
+    while (self->priv->n_kicks <= 0)
+      SRC_STREAM_WAIT (self);
+    self->priv->n_kicks--;
+  } else {
+    GstClockTime time;
+    GstClockReturn status;
+
+    GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (start));
+
+    time = GST_ELEMENT_CAST (self)->base_time + start;
+
+    if (GST_CLOCK_TIME_IS_VALID (latency_max)) {
+      time += latency_max;
+    } else if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
+      time += latency_min;
+    } else {
+      time += self->latency;
+    }
+
+    GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
+        GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
+        " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
+        " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
+        GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
+        GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
+        GST_TIME_ARGS (latency_min),
+        GST_TIME_ARGS (gst_clock_get_time (GST_ELEMENT_CLOCK (self))));
+
+    self->priv->aggregate_id =
+        gst_clock_new_single_shot_id (GST_ELEMENT_CLOCK (self), time);
+    SRC_STREAM_UNLOCK (self);
+
+    status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
+
+    SRC_STREAM_LOCK (self);
+    if (self->priv->aggregate_id) {
+      gst_clock_id_unref (self->priv->aggregate_id);
+      self->priv->aggregate_id = NULL;
+    }
+    self->priv->n_kicks--;
+
+    GST_DEBUG_OBJECT (self, "clock returned %d", status);
+
+    /* we timed out */
+    if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
+      SRC_STREAM_UNLOCK (self);
+      return TRUE;
+    }
+  }
+  SRC_STREAM_UNLOCK (self);
+
+  return gst_aggregator_iterate_sinkpads (self,
+      (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
+}
+
 static void
 aggregate_func (GstAggregator * self)
 {
@@ -468,22 +571,19 @@ aggregate_func (GstAggregator * self)
   if (self->priv->running == FALSE) {
     GST_DEBUG_OBJECT (self, "Not running anymore");
     return;
   }
 
-  QUEUE_POP (self);
-
   GST_LOG_OBJECT (self, "Checking aggregate");
-  while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
-          (GstAggregatorPadForeachFunc)
-          _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) {
+  while (priv->send_eos && priv->running) {
+    if (!_wait_and_check (self))
+      continue;
+
     GST_TRACE_OBJECT (self, "Actually aggregating!");
 
     priv->flow_return = klass->aggregate (self);
 
     if (priv->flow_return == GST_FLOW_EOS) {
-      QUEUE_FLUSH (self);
       _push_eos (self);
     }
 
@@ -497,7 +597,6 @@ aggregate_func (GstAggregator * self)
     if (priv->flow_return != GST_FLOW_OK)
       break;
   }
-
 }
 
 static gboolean
@@ -528,14 +627,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
       flush_start ? "Pausing" : "Stopping");
 
   self->priv->running = FALSE;
-  QUEUE_PUSH (self);
+  KICK_SRC_THREAD (self);
 
   if (flush_start) {
     res = gst_pad_push_event (self->srcpad, flush_start);
   }
 
   gst_pad_stop_task (self->srcpad);
-  QUEUE_FLUSH (self);
+  KICK_SRC_THREAD (self);
 
   return res;
 }
@@ -546,6 +645,7 @@ _start_srcpad_task (GstAggregator * self)
   GST_INFO_OBJECT (self, "Starting srcpad task");
 
   self->priv->running = TRUE;
+  self->priv->n_kicks = 0;
   gst_pad_start_task (GST_PAD (self->srcpad),
       (GstTaskFunction) aggregate_func, self, NULL);
 }
@@ -662,7 +762,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
         gst_pad_push_event (self->srcpad, event);
         priv->send_eos = TRUE;
         event = NULL;
-        QUEUE_PUSH (self);
+        KICK_SRC_THREAD (self);
 
         GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
         GST_PAD_STREAM_UNLOCK (self->srcpad);
@@ -690,7 +790,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
       }
       PAD_UNLOCK_EVENT (aggpad);
 
-      QUEUE_PUSH (self);
+      KICK_SRC_THREAD (self);
       goto eat;
     }
     case GST_EVENT_SEGMENT:
@@ -741,15 +841,6 @@ _stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
 {
   _aggpad_flush (pad, self);
 
-  PAD_LOCK_EVENT (pad);
-  /* remove the timeouts */
-  if (pad->priv->timeout_id) {
-    gst_clock_id_unschedule (pad->priv->timeout_id);
-    gst_clock_id_unref (pad->priv->timeout_id);
-    pad->priv->timeout_id = NULL;
-  }
-  PAD_UNLOCK_EVENT (pad);
-
   return TRUE;
 }
@@ -811,9 +902,9 @@ failure:
 static void
 _release_pad (GstElement * element, GstPad * pad)
 {
-  GstAggregator *self = GST_AGGREGATOR (element);
   GstBuffer *tmpbuf;
 
+  GstAggregator *self = GST_AGGREGATOR (element);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   GST_INFO_OBJECT (pad, "Removing pad");
@@ -823,39 +914,7 @@ _release_pad (GstElement * element, GstPad * pad)
   gst_buffer_replace (&tmpbuf, NULL);
   gst_element_remove_pad (element, pad);
 
-  /* Something changed make sure we try to aggregate */
-  QUEUE_PUSH (self);
-}
-
-static gboolean
-_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id,
-    gpointer user_data)
-{
-  GstAggregatorPad *aggpad;
-  GstAggregator *self;
-
-  if (user_data == NULL)
-    return FALSE;
-
-  aggpad = GST_AGGREGATOR_PAD (user_data);
-
-  /* avoid holding the last reference to the parent element here */
-  PAD_LOCK_EVENT (aggpad);
-
-  self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad)));
-
-  GST_DEBUG_OBJECT (aggpad, "marked unresponsive");
-
-  g_atomic_int_set (&aggpad->unresponsive, TRUE);
-
-  if (self) {
-    QUEUE_PUSH (self);
-    gst_object_unref (self);
-  }
-
-  PAD_UNLOCK_EVENT (aggpad);
-
-  return TRUE;
+  KICK_SRC_THREAD (self);
 }
 
 static GstPad *
@@ -970,11 +1029,11 @@ gst_aggregator_get_latency (GstAggregator * self, gboolean * live,
   min = self->priv->latency_min;
   max = self->priv->latency_max;
 
-  if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+  if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
     if (GST_CLOCK_TIME_IS_VALID (min))
-      min += self->timeout;
+      min += self->latency;
     if (GST_CLOCK_TIME_IS_VALID (max))
-      max += self->timeout;
+      max += self->latency;
   }
 
   if (live)
@@ -998,13 +1057,13 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   gst_aggregator_iterate_sinkpads (self,
       (GstAggregatorPadForeachFunc) _latency_query, &data);
 
-  if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) &&
-      self->timeout > data.max) {
+  if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
+      self->latency > data.max) {
     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
-        ("%s", "Timeout too big"),
-        ("The requested timeout value is too big for the latency in the "
-            "current pipeline. Limiting to %" G_GINT64_FORMAT, data.max));
-    self->timeout = data.max;
+        ("%s", "Latency too big"),
+        ("The requested latency value is too big for the current pipeline. "
+            "Limiting to %" G_GINT64_FORMAT, data.max));
+    self->latency = data.max;
   }
 
   self->priv->latency_live = data.live;
@@ -1012,11 +1071,11 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   self->priv->latency_max = data.max;
 
   /* add our own */
-  if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+  if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
     if (GST_CLOCK_TIME_IS_VALID (data.min))
-      data.min += self->timeout;
+      data.min += self->latency;
     if (GST_CLOCK_TIME_IS_VALID (data.max))
-      data.max += self->timeout;
+      data.max += self->latency;
   }
 
   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
@@ -1317,6 +1376,8 @@ gst_aggregator_finalize (GObject * object)
   gst_object_unref (self->clock);
 
   g_mutex_clear (&self->priv->setcaps_lock);
+  g_mutex_clear (&self->priv->src_lock);
+  g_cond_clear (&self->priv->src_cond);
 
   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
 }
@@ -1324,51 +1385,44 @@ gst_aggregator_finalize (GObject * object)
 static void
 gst_aggregator_dispose (GObject * object)
 {
-  GstAggregator *self = (GstAggregator *) object;
-
   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
-
-  if (AGGREGATOR_QUEUE (self)) {
-    g_async_queue_unref (AGGREGATOR_QUEUE (self));
-    AGGREGATOR_QUEUE (self) = NULL;
-  }
 }
 
-/**
- * gst_aggregator_set_timeout:
+/*
+ * gst_aggregator_set_latency_property:
  * @agg: a #GstAggregator
- * @timeout: the new timeout value.
+ * @latency: the new latency value.
  *
- * Sets the new timeout value to @timeout. This value is used to limit the
+ * Sets the new latency value to @latency. This value is used to limit the
  * amount of time a pad waits for data to appear before considering the pad
 * as unresponsive.
 */
 static void
-gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
+gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
 {
   g_return_if_fail (GST_IS_AGGREGATOR (self));
 
   GST_OBJECT_LOCK (self);
 
   if (self->priv->latency_live && self->priv->latency_max != 0 &&
-      GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) {
+      GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
-        ("%s", "Timeout too big"),
-        ("The requested timeout value is too big for the latency in the "
+        ("%s", "Latency too big"),
+        ("The requested latency value is too big for the latency in the "
            "current pipeline. Limiting to %" G_GINT64_FORMAT,
            self->priv->latency_max));
-    timeout = self->priv->latency_max;
+    latency = self->priv->latency_max;
   }
 
-  self->timeout = timeout;
+  self->latency = latency;
   GST_OBJECT_UNLOCK (self);
 }
 
-/**
- * gst_aggregator_get_timeout:
+/*
+ * gst_aggregator_get_latency_property:
 * @agg: a #GstAggregator
 *
- * Gets the timeout value. See gst_aggregator_set_timeout for
+ * Gets the latency value. See gst_aggregator_set_latency for
 * more details.
 *
 * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
@@ -1376,14 +1430,14 @@ gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
 * unlimited time.
 */
 static gint64
-gst_aggregator_get_timeout (GstAggregator * agg)
+gst_aggregator_get_latency_property (GstAggregator * agg)
 {
   gint64 res;
 
   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
 
   GST_OBJECT_LOCK (agg);
-  res = agg->timeout;
+  res = agg->latency;
   GST_OBJECT_UNLOCK (agg);
 
   return res;
@@ -1396,8 +1450,8 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
   GstAggregator *agg = GST_AGGREGATOR (object);
 
   switch (prop_id) {
-    case PROP_TIMEOUT:
-      gst_aggregator_set_timeout (agg, g_value_get_int64 (value));
+    case PROP_LATENCY:
+      gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1412,8 +1466,8 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
   GstAggregator *agg = GST_AGGREGATOR (object);
 
   switch (prop_id) {
-    case PROP_TIMEOUT:
-      g_value_set_int64 (value, gst_aggregator_get_timeout (agg));
+    case PROP_LATENCY:
+      g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1454,12 +1508,12 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   gobject_class->finalize = gst_aggregator_finalize;
   gobject_class->dispose = gst_aggregator_dispose;
 
-  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
-      g_param_spec_int64 ("timeout", "Buffer timeout",
+  g_object_class_install_property (gobject_class, PROP_LATENCY,
+      g_param_spec_int64 ("latency", "Buffer latency",
          "Number of nanoseconds to wait for a buffer to arrive on a sink pad"
          "before the pad is deemed unresponsive (-1 unlimited)", -1,
          (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
-          DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
@@ -1488,7 +1542,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   self->priv->latency_max = GST_CLOCK_TIME_NONE;
   _reset_flow_values (self);
 
-  AGGREGATOR_QUEUE (self) = g_async_queue_new ();
-
   self->srcpad = gst_pad_new_from_template (pad_template, "src");
 
   gst_pad_set_event_function (self->srcpad,
@@ -1501,9 +1554,11 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
   self->clock = gst_system_clock_obtain ();
-  self->timeout = -1;
+  self->latency = -1;
 
   g_mutex_init (&self->priv->setcaps_lock);
+  g_mutex_init (&self->priv->src_lock);
+  g_cond_init (&self->priv->src_cond);
 }
 
 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@@ -1542,16 +1597,8 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
-  GstClockTime timeout = gst_aggregator_get_timeout (self);
-  GstClockTime now;
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
-  if (aggpad->priv->timeout_id) {
-    gst_clock_id_unschedule (aggpad->priv->timeout_id);
-    gst_clock_id_unref (aggpad->priv->timeout_id);
-    aggpad->priv->timeout_id = NULL;
-  }
-  g_atomic_int_set (&aggpad->unresponsive, FALSE);
-
   PAD_STREAM_LOCK (aggpad);
@@ -1583,15 +1630,9 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   PAD_UNLOCK_EVENT (aggpad);
   PAD_STREAM_UNLOCK (aggpad);
 
-  QUEUE_PUSH (self);
-
-  if (GST_CLOCK_TIME_IS_VALID (timeout)) {
-    now = gst_clock_get_time (self->clock);
-    aggpad->priv->timeout_id =
-        gst_clock_new_single_shot_id (self->clock, now + timeout);
-    gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout,
-        gst_object_ref (aggpad), gst_object_unref);
-  }
+  if (gst_aggregator_iterate_sinkpads (self,
+          (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
+    KICK_SRC_THREAD (self);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
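The wakeup count mentioned in the commit message replaces the GAsyncQueue push/pop pairs with the standard counted-condition-variable pattern. A minimal standalone sketch of the idea, with illustrative names rather than the commit's macros:

    /* Waiter (src pad task): sleep until at least one kick is pending,
     * then consume it. */
    g_mutex_lock (&lock);
    while (n_kicks <= 0)
      g_cond_wait (&cond, &lock);
    n_kicks--;
    g_mutex_unlock (&lock);

    /* Kicker (buffers, events, pad release): record the wakeup and signal
     * the waiter, as KICK_SRC_THREAD does above. */
    g_mutex_lock (&lock);
    n_kicks++;
    g_cond_broadcast (&cond);
    g_mutex_unlock (&lock);

Counting the kicks, rather than signalling alone, ensures a wakeup that arrives while the task is busy aggregating is not lost.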

gstaggregator.h

@@ -73,7 +73,6 @@ struct _GstAggregatorPad
   GstBuffer * buffer;
   GstSegment segment;
   gboolean eos;
-  gboolean unresponsive;
 
   /* < Private > */
   GstAggregatorPadPrivate * priv;
@@ -141,7 +140,7 @@ struct _GstAggregator
   GstClock * clock;
 
   /* properties */
-  gint64 timeout;
+  gint64 latency;
 
   gpointer _gst_reserved[GST_PADDING];
 };
@@ -189,6 +188,12 @@ struct _GstAggregator
 *                 Should be linked up first. Called when the element goes from
 *                 READY to PAUSED. The subclass should get ready to process
 *                 aggregated buffers.
+ * @get_next_time: Optional.
+ *                 Called when the element needs to know the time of the next
+ *                 rendered buffer for live pipelines. This causes deadline
+ *                 based aggregation to occur. Defaults to returning
+ *                 GST_CLOCK_TIME_NONE causing the element to wait for buffers
+ *                 on all sink pads before aggregating.
 *
 * The aggregator base class will handle in a thread-safe way all manners of
 * concurrent flushes, seeks, pad additions and removals, leaving to the
@@ -238,6 +243,8 @@ struct _GstAggregatorClass {
   gboolean (*start) (GstAggregator * aggregator);
 
+  GstClockTime (*get_next_time) (GstAggregator * aggregator);
+
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
 };