diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index dce7180737..42c0d8e3b5 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -69,6 +69,9 @@ /* Might become API */ static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout); +static gint64 gst_aggregator_get_timeout (GstAggregator * agg); + GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -145,6 +148,8 @@ struct _GstAggregatorPadPrivate gboolean pending_eos; gboolean flushing; + GstClockID timeout_id; + GMutex event_lock; GCond event_cond; @@ -232,6 +237,15 @@ typedef struct gboolean one_actually_seeked; } EventData; +#define DEFAULT_TIMEOUT -1 + +enum +{ + PROP_0, + PROP_TIMEOUT, + PROP_LAST +}; + /** * gst_aggregator_iterate_sinkpads: * @self: The #GstAggregator @@ -311,13 +325,18 @@ no_iter: } static inline gboolean -_check_all_pads_with_data_or_eos (GstAggregator * self, +_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self, GstAggregatorPad * aggpad) { if (aggpad->buffer || aggpad->eos) { return TRUE; } + if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) { + /* pad has been deemed unresponsive */ + return TRUE; + } + GST_LOG_OBJECT (aggpad, "Not ready to be aggregated"); return FALSE; @@ -453,8 +472,8 @@ aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && gst_aggregator_iterate_sinkpads (self, - (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, - NULL) && priv->running) { + (GstAggregatorPadForeachFunc) + _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) { GST_TRACE_OBJECT (self, "Actually aggregating!"); priv->flow_return = klass->aggregate (self); @@ -714,10 +733,19 @@ eat: } static gboolean -_flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) +_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) { _aggpad_flush (pad, self); + PAD_LOCK_EVENT (pad); + /* remove the timeouts */ + if (pad->priv->timeout_id) { + gst_clock_id_unschedule (pad->priv->timeout_id); + gst_clock_id_unref (pad->priv->timeout_id); + pad->priv->timeout_id = NULL; + } + PAD_UNLOCK_EVENT (pad); + return TRUE; } @@ -727,7 +755,7 @@ _stop (GstAggregator * agg) _reset_flow_values (agg); gst_aggregator_iterate_sinkpads (agg, - (GstAggregatorPadForeachFunc) _flush_pad, NULL); + (GstAggregatorPadForeachFunc) _stop_pad, NULL); if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); @@ -795,6 +823,37 @@ _release_pad (GstElement * element, GstPad * pad) QUEUE_PUSH (self); } +static gboolean +_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstAggregatorPad *aggpad; + GstAggregator *self; + + if (user_data == NULL) + return FALSE; + + aggpad = GST_AGGREGATOR_PAD (user_data); + + /* avoid holding the last reference to the parent element here */ + PAD_LOCK_EVENT (aggpad); + + self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad))); + + GST_DEBUG_OBJECT (aggpad, "marked unresponsive"); + + g_atomic_int_set (&aggpad->unresponsive, TRUE); + + if (self) { + QUEUE_PUSH (self); + gst_object_unref (self); + } + + PAD_UNLOCK_EVENT (aggpad); + + return TRUE; +} + static GstPad * _request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) @@ -827,6 +886,7 @@ _request_new_pad (GstElement * element, agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); + GST_OBJECT_UNLOCK (element); } else { @@ -1127,6 +1187,7 @@ gst_aggregator_finalize (GObject * object) { GstAggregator *self = (GstAggregator *) object; + gst_object_unref (self->clock); g_mutex_clear (&self->priv->setcaps_lock); G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); @@ -1145,6 +1206,82 @@ gst_aggregator_dispose (GObject * object) } } +/** + * gst_aggregator_set_timeout: + * @agg: a #GstAggregator + * @timeout: the new timeout value. + * + * Sets the new timeout value to @timeout. This value is used to limit the + * amount of time a pad waits for data to appear before considering the pad + * as unresponsive. + */ +static void +gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout) +{ + g_return_if_fail (GST_IS_AGGREGATOR (agg)); + + GST_OBJECT_LOCK (agg); + agg->timeout = timeout; + GST_OBJECT_UNLOCK (agg); +} + +/** + * gst_aggregator_get_timeout: + * @agg: a #GstAggregator + * + * Gets the timeout value. See gst_aggregator_set_timeout for + * more details. + * + * Returns: The time in nanoseconds to wait for data to arrive on a sink pad + * before a pad is deemed unresponsive. A value of -1 means an + * unlimited time. + */ +static gint64 +gst_aggregator_get_timeout (GstAggregator * agg) +{ + gint64 res; + + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + + GST_OBJECT_LOCK (agg); + res = agg->timeout; + GST_OBJECT_UNLOCK (agg); + + return res; +} + +static void +gst_aggregator_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_TIMEOUT: + gst_aggregator_set_timeout (agg, g_value_get_int64 (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_TIMEOUT: + g_value_set_int64 (value, gst_aggregator_get_timeout (agg)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + /* GObject vmethods implementations */ static void gst_aggregator_class_init (GstAggregatorClass * klass) @@ -1173,8 +1310,17 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state); + gobject_class->set_property = gst_aggregator_set_property; + gobject_class->get_property = gst_aggregator_get_property; gobject_class->finalize = gst_aggregator_finalize; gobject_class->dispose = gst_aggregator_dispose; + + g_object_class_install_property (gobject_class, PROP_TIMEOUT, + g_param_spec_int64 ("timeout", "Buffer timeout", + "Number of nanoseconds to wait for a buffer to arrive on a sink pad" + "before the pad is deemed unresponsive (-1 unlimited)", -1, + G_MAXINT64, DEFAULT_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void @@ -1211,6 +1357,9 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + self->clock = gst_system_clock_obtain (); + self->timeout = -1; + g_mutex_init (&self->priv->setcaps_lock); } @@ -1250,10 +1399,19 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) GstAggregatorPrivate *priv = self->priv; GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); + GstClockTime timeout = gst_aggregator_get_timeout (self); + GstClockTime now; GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); + if (aggpad->priv->timeout_id) { + gst_clock_id_unschedule (aggpad->priv->timeout_id); + gst_clock_id_unref (aggpad->priv->timeout_id); + aggpad->priv->timeout_id = NULL; + } + g_atomic_int_set (&aggpad->unresponsive, FALSE); PAD_STREAM_LOCK (aggpad); + if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; @@ -1261,6 +1419,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) goto eos; PAD_LOCK_EVENT (aggpad); + if (aggpad->buffer) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); @@ -1283,6 +1442,14 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) QUEUE_PUSH (self); + if (GST_CLOCK_TIME_IS_VALID (timeout)) { + now = gst_clock_get_time (self->clock); + aggpad->priv->timeout_id = + gst_clock_new_single_shot_id (self->clock, now + timeout); + gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout, + gst_object_ref (aggpad), gst_object_unref); + } + GST_DEBUG_OBJECT (aggpad, "Done chaining"); return priv->flow_return; diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index 507da136cb..837fdbb34f 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -73,6 +73,7 @@ struct _GstAggregatorPad GstBuffer * buffer; GstSegment segment; gboolean eos; + gboolean unresponsive; /* < Private > */ GstAggregatorPadPrivate * priv; @@ -137,6 +138,11 @@ struct _GstAggregator /*< private >*/ GstAggregatorPrivate * priv; + GstClock * clock; + + /* properties */ + gint64 timeout; + gpointer _gst_reserved[GST_PADDING]; };