From efdcc6c8eb792cf526c61341dc4e5e7b5d64bff2 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Mon, 6 Oct 2014 18:23:03 +1100 Subject: [PATCH] aggregator: add a timeout property determining buffer wait time Determines the amount of time that a pad will wait for a buffer before being marked unresponsive. Network sources may fail to produce buffers for an extended period of time, currently causing the pipeline to stall possibly indefinitely, waiting for these buffers to appear. Subclasses should render unresponsive pads with either silence (audio), the last (video) frame or what makes the most sense in the given context. --- libs/gst/base/gstaggregator.c | 177 +++++++++++++++++++++++++++++++++- libs/gst/base/gstaggregator.h | 6 ++ 2 files changed, 178 insertions(+), 5 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index dce7180737..42c0d8e3b5 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -69,6 +69,9 @@ /* Might become API */ static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout); +static gint64 gst_aggregator_get_timeout (GstAggregator * agg); + GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -145,6 +148,8 @@ struct _GstAggregatorPadPrivate gboolean pending_eos; gboolean flushing; + GstClockID timeout_id; + GMutex event_lock; GCond event_cond; @@ -232,6 +237,15 @@ typedef struct gboolean one_actually_seeked; } EventData; +#define DEFAULT_TIMEOUT -1 + +enum +{ + PROP_0, + PROP_TIMEOUT, + PROP_LAST +}; + /** * gst_aggregator_iterate_sinkpads: * @self: The #GstAggregator @@ -311,13 +325,18 @@ no_iter: } static inline gboolean -_check_all_pads_with_data_or_eos (GstAggregator * self, +_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self, GstAggregatorPad * aggpad) { if (aggpad->buffer || aggpad->eos) { return TRUE; } + if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) { + /* pad has been deemed unresponsive */ + return TRUE; + } + GST_LOG_OBJECT (aggpad, "Not ready to be aggregated"); return FALSE; @@ -453,8 +472,8 @@ aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && gst_aggregator_iterate_sinkpads (self, - (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, - NULL) && priv->running) { + (GstAggregatorPadForeachFunc) + _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) { GST_TRACE_OBJECT (self, "Actually aggregating!"); priv->flow_return = klass->aggregate (self); @@ -714,10 +733,19 @@ eat: } static gboolean -_flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) +_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) { _aggpad_flush (pad, self); + PAD_LOCK_EVENT (pad); + /* remove the timeouts */ + if (pad->priv->timeout_id) { + gst_clock_id_unschedule (pad->priv->timeout_id); + gst_clock_id_unref (pad->priv->timeout_id); + pad->priv->timeout_id = NULL; + } + PAD_UNLOCK_EVENT (pad); + return TRUE; } @@ -727,7 +755,7 @@ _stop (GstAggregator * agg) _reset_flow_values (agg); gst_aggregator_iterate_sinkpads (agg, - (GstAggregatorPadForeachFunc) _flush_pad, NULL); + (GstAggregatorPadForeachFunc) _stop_pad, NULL); if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); @@ -795,6 +823,37 @@ _release_pad (GstElement * element, GstPad * pad) QUEUE_PUSH (self); } +static gboolean +_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstAggregatorPad *aggpad; + GstAggregator *self; + + if (user_data == NULL) + return FALSE; + + aggpad = GST_AGGREGATOR_PAD (user_data); + + /* avoid holding the last reference to the parent element here */ + PAD_LOCK_EVENT (aggpad); + + self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad))); + + GST_DEBUG_OBJECT (aggpad, "marked unresponsive"); + + g_atomic_int_set (&aggpad->unresponsive, TRUE); + + if (self) { + QUEUE_PUSH (self); + gst_object_unref (self); + } + + PAD_UNLOCK_EVENT (aggpad); + + return TRUE; +} + static GstPad * _request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) @@ -827,6 +886,7 @@ _request_new_pad (GstElement * element, agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); + GST_OBJECT_UNLOCK (element); } else { @@ -1127,6 +1187,7 @@ gst_aggregator_finalize (GObject * object) { GstAggregator *self = (GstAggregator *) object; + gst_object_unref (self->clock); g_mutex_clear (&self->priv->setcaps_lock); G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); @@ -1145,6 +1206,82 @@ gst_aggregator_dispose (GObject * object) } } +/** + * gst_aggregator_set_timeout: + * @agg: a #GstAggregator + * @timeout: the new timeout value. + * + * Sets the new timeout value to @timeout. This value is used to limit the + * amount of time a pad waits for data to appear before considering the pad + * as unresponsive. + */ +static void +gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout) +{ + g_return_if_fail (GST_IS_AGGREGATOR (agg)); + + GST_OBJECT_LOCK (agg); + agg->timeout = timeout; + GST_OBJECT_UNLOCK (agg); +} + +/** + * gst_aggregator_get_timeout: + * @agg: a #GstAggregator + * + * Gets the timeout value. See gst_aggregator_set_timeout for + * more details. + * + * Returns: The time in nanoseconds to wait for data to arrive on a sink pad + * before a pad is deemed unresponsive. A value of -1 means an + * unlimited time. + */ +static gint64 +gst_aggregator_get_timeout (GstAggregator * agg) +{ + gint64 res; + + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + + GST_OBJECT_LOCK (agg); + res = agg->timeout; + GST_OBJECT_UNLOCK (agg); + + return res; +} + +static void +gst_aggregator_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_TIMEOUT: + gst_aggregator_set_timeout (agg, g_value_get_int64 (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_TIMEOUT: + g_value_set_int64 (value, gst_aggregator_get_timeout (agg)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + /* GObject vmethods implementations */ static void gst_aggregator_class_init (GstAggregatorClass * klass) @@ -1173,8 +1310,17 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state); + gobject_class->set_property = gst_aggregator_set_property; + gobject_class->get_property = gst_aggregator_get_property; gobject_class->finalize = gst_aggregator_finalize; gobject_class->dispose = gst_aggregator_dispose; + + g_object_class_install_property (gobject_class, PROP_TIMEOUT, + g_param_spec_int64 ("timeout", "Buffer timeout", + "Number of nanoseconds to wait for a buffer to arrive on a sink pad" + "before the pad is deemed unresponsive (-1 unlimited)", -1, + G_MAXINT64, DEFAULT_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void @@ -1211,6 +1357,9 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + self->clock = gst_system_clock_obtain (); + self->timeout = -1; + g_mutex_init (&self->priv->setcaps_lock); } @@ -1250,10 +1399,19 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) GstAggregatorPrivate *priv = self->priv; GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); + GstClockTime timeout = gst_aggregator_get_timeout (self); + GstClockTime now; GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); + if (aggpad->priv->timeout_id) { + gst_clock_id_unschedule (aggpad->priv->timeout_id); + gst_clock_id_unref (aggpad->priv->timeout_id); + aggpad->priv->timeout_id = NULL; + } + g_atomic_int_set (&aggpad->unresponsive, FALSE); PAD_STREAM_LOCK (aggpad); + if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; @@ -1261,6 +1419,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) goto eos; PAD_LOCK_EVENT (aggpad); + if (aggpad->buffer) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); @@ -1283,6 +1442,14 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) QUEUE_PUSH (self); + if (GST_CLOCK_TIME_IS_VALID (timeout)) { + now = gst_clock_get_time (self->clock); + aggpad->priv->timeout_id = + gst_clock_new_single_shot_id (self->clock, now + timeout); + gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout, + gst_object_ref (aggpad), gst_object_unref); + } + GST_DEBUG_OBJECT (aggpad, "Done chaining"); return priv->flow_return; diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index 507da136cb..837fdbb34f 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -73,6 +73,7 @@ struct _GstAggregatorPad GstBuffer * buffer; GstSegment segment; gboolean eos; + gboolean unresponsive; /* < Private > */ GstAggregatorPadPrivate * priv; @@ -137,6 +138,11 @@ struct _GstAggregator /*< private >*/ GstAggregatorPrivate * priv; + GstClock * clock; + + /* properties */ + gint64 timeout; + gpointer _gst_reserved[GST_PADDING]; };