aggregator: add a timeout property determining buffer wait time

Determines the amount of time that a pad will wait for a buffer before
being marked unresponsive.

Network sources may fail to produce buffers for an extended period of time,
currently causing the pipeline to stall possibly indefinitely, waiting for
these buffers to appear.

Subclasses should render unresponsive pads with either silence (audio), the
last (video) frame or what makes the most sense in the given context.
This commit is contained in:
Matthew Waters 2014-10-06 18:23:03 +11:00
parent c23cd9c3be
commit a41bc98b6e
2 changed files with 178 additions and 5 deletions

View file

@ -69,6 +69,9 @@
/* Might become API */
static void gst_aggregator_merge_tags (GstAggregator * aggregator,
const GstTagList * tags, GstTagMergeMode mode);
static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout);
static gint64 gst_aggregator_get_timeout (GstAggregator * agg);
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
@ -145,6 +148,8 @@ struct _GstAggregatorPadPrivate
gboolean pending_eos;
gboolean flushing;
GstClockID timeout_id;
GMutex event_lock;
GCond event_cond;
@ -232,6 +237,15 @@ typedef struct
gboolean one_actually_seeked;
} EventData;
#define DEFAULT_TIMEOUT -1
enum
{
PROP_0,
PROP_TIMEOUT,
PROP_LAST
};
/**
* gst_aggregator_iterate_sinkpads:
* @self: The #GstAggregator
@ -311,13 +325,18 @@ no_iter:
}
static inline gboolean
_check_all_pads_with_data_or_eos (GstAggregator * self,
_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self,
GstAggregatorPad * aggpad)
{
if (aggpad->buffer || aggpad->eos) {
return TRUE;
}
if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) {
/* pad has been deemed unresponsive */
return TRUE;
}
GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
return FALSE;
@ -453,8 +472,8 @@ aggregate_func (GstAggregator * self)
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
NULL) && priv->running) {
(GstAggregatorPadForeachFunc)
_check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) {
GST_TRACE_OBJECT (self, "Actually aggregating!");
priv->flow_return = klass->aggregate (self);
@ -714,10 +733,19 @@ eat:
}
static gboolean
_flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
{
_aggpad_flush (pad, self);
PAD_LOCK_EVENT (pad);
/* remove the timeouts */
if (pad->priv->timeout_id) {
gst_clock_id_unschedule (pad->priv->timeout_id);
gst_clock_id_unref (pad->priv->timeout_id);
pad->priv->timeout_id = NULL;
}
PAD_UNLOCK_EVENT (pad);
return TRUE;
}
@ -727,7 +755,7 @@ _stop (GstAggregator * agg)
_reset_flow_values (agg);
gst_aggregator_iterate_sinkpads (agg,
(GstAggregatorPadForeachFunc) _flush_pad, NULL);
(GstAggregatorPadForeachFunc) _stop_pad, NULL);
if (agg->priv->tags)
gst_tag_list_unref (agg->priv->tags);
@ -795,6 +823,37 @@ _release_pad (GstElement * element, GstPad * pad)
QUEUE_PUSH (self);
}
static gboolean
_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id,
gpointer user_data)
{
GstAggregatorPad *aggpad;
GstAggregator *self;
if (user_data == NULL)
return FALSE;
aggpad = GST_AGGREGATOR_PAD (user_data);
/* avoid holding the last reference to the parent element here */
PAD_LOCK_EVENT (aggpad);
self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad)));
GST_DEBUG_OBJECT (aggpad, "marked unresponsive");
g_atomic_int_set (&aggpad->unresponsive, TRUE);
if (self) {
QUEUE_PUSH (self);
gst_object_unref (self);
}
PAD_UNLOCK_EVENT (aggpad);
return TRUE;
}
static GstPad *
_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
@ -827,6 +886,7 @@ _request_new_pad (GstElement * element,
agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
"name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
g_free (name);
GST_OBJECT_UNLOCK (element);
} else {
@ -1127,6 +1187,7 @@ gst_aggregator_finalize (GObject * object)
{
GstAggregator *self = (GstAggregator *) object;
gst_object_unref (self->clock);
g_mutex_clear (&self->priv->setcaps_lock);
G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
@ -1145,6 +1206,82 @@ gst_aggregator_dispose (GObject * object)
}
}
/**
* gst_aggregator_set_timeout:
* @agg: a #GstAggregator
* @timeout: the new timeout value.
*
* Sets the new timeout value to @timeout. This value is used to limit the
* amount of time a pad waits for data to appear before considering the pad
* as unresponsive.
*/
static void
gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout)
{
g_return_if_fail (GST_IS_AGGREGATOR (agg));
GST_OBJECT_LOCK (agg);
agg->timeout = timeout;
GST_OBJECT_UNLOCK (agg);
}
/**
* gst_aggregator_get_timeout:
* @agg: a #GstAggregator
*
* Gets the timeout value. See gst_aggregator_set_timeout for
* more details.
*
* Returns: The time in nanoseconds to wait for data to arrive on a sink pad
* before a pad is deemed unresponsive. A value of -1 means an
* unlimited time.
*/
static gint64
gst_aggregator_get_timeout (GstAggregator * agg)
{
gint64 res;
g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
GST_OBJECT_LOCK (agg);
res = agg->timeout;
GST_OBJECT_UNLOCK (agg);
return res;
}
static void
gst_aggregator_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstAggregator *agg = GST_AGGREGATOR (object);
switch (prop_id) {
case PROP_TIMEOUT:
gst_aggregator_set_timeout (agg, g_value_get_int64 (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_aggregator_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstAggregator *agg = GST_AGGREGATOR (object);
switch (prop_id) {
case PROP_TIMEOUT:
g_value_set_int64 (value, gst_aggregator_get_timeout (agg));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* GObject vmethods implementations */
static void
gst_aggregator_class_init (GstAggregatorClass * klass)
@ -1173,8 +1310,17 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
gobject_class->set_property = gst_aggregator_set_property;
gobject_class->get_property = gst_aggregator_get_property;
gobject_class->finalize = gst_aggregator_finalize;
gobject_class->dispose = gst_aggregator_dispose;
g_object_class_install_property (gobject_class, PROP_TIMEOUT,
g_param_spec_int64 ("timeout", "Buffer timeout",
"Number of nanoseconds to wait for a buffer to arrive on a sink pad"
"before the pad is deemed unresponsive (-1 unlimited)", -1,
G_MAXINT64, DEFAULT_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
@ -1211,6 +1357,9 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
self->clock = gst_system_clock_obtain ();
self->timeout = -1;
g_mutex_init (&self->priv->setcaps_lock);
}
@ -1250,10 +1399,19 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
GstClockTime timeout = gst_aggregator_get_timeout (self);
GstClockTime now;
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
if (aggpad->priv->timeout_id) {
gst_clock_id_unschedule (aggpad->priv->timeout_id);
gst_clock_id_unref (aggpad->priv->timeout_id);
aggpad->priv->timeout_id = NULL;
}
g_atomic_int_set (&aggpad->unresponsive, FALSE);
PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
@ -1261,6 +1419,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
goto eos;
PAD_LOCK_EVENT (aggpad);
if (aggpad->buffer) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
@ -1283,6 +1442,14 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
QUEUE_PUSH (self);
if (GST_CLOCK_TIME_IS_VALID (timeout)) {
now = gst_clock_get_time (self->clock);
aggpad->priv->timeout_id =
gst_clock_new_single_shot_id (self->clock, now + timeout);
gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout,
gst_object_ref (aggpad), gst_object_unref);
}
GST_DEBUG_OBJECT (aggpad, "Done chaining");
return priv->flow_return;

View file

@ -73,6 +73,7 @@ struct _GstAggregatorPad
GstBuffer * buffer;
GstSegment segment;
gboolean eos;
gboolean unresponsive;
/* < Private > */
GstAggregatorPadPrivate * priv;
@ -137,6 +138,11 @@ struct _GstAggregator
/*< private >*/
GstAggregatorPrivate * priv;
GstClock * clock;
/* properties */
gint64 timeout;
gpointer _gst_reserved[GST_PADDING];
};