aggregator: add latency query handling

This commit is contained in:
Matthew Waters 2014-10-06 21:46:24 +11:00 committed by Tim-Philipp Müller
parent efdcc6c8eb
commit 3c8198e99f

View file

@ -226,6 +226,10 @@ struct _GstAggregatorPrivate
/* Lock to prevent two src setcaps from happening at the same time */
GMutex setcaps_lock;
gboolean latency_live;
GstClockTime latency_min;
GstClockTime latency_max;
};
typedef struct
@ -904,6 +908,88 @@ _request_new_pad (GstElement * element,
return GST_PAD (agg_pad);
}
typedef struct
{
GstClockTime min, max;
gboolean live;
} LatencyData;
static gboolean
_latency_query (GstAggregator * self, GstPad * pad, gpointer user_data)
{
LatencyData *data = user_data;
GstClockTime min, max;
GstQuery *query;
gboolean live, res;
query = gst_query_new_latency ();
res = gst_pad_peer_query (pad, query);
if (res) {
gst_query_parse_latency (query, &live, &min, &max);
GST_LOG_OBJECT (self, "got latency live:%s min:%" G_GINT64_FORMAT
" max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
if (min > data->min)
data->min = min;
if (max != GST_CLOCK_TIME_NONE &&
((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
(data->max == GST_CLOCK_TIME_NONE)))
data->max = max;
data->live |= live;
}
gst_query_unref (query);
return TRUE;
}
static gboolean
gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
{
LatencyData data;
data.min = 0;
data.max = GST_CLOCK_TIME_NONE;
data.live = FALSE;
/* query upstream's latency */
gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _latency_query, &data);
if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) &&
self->timeout > data.max) {
GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
("%s", "Timeout too big"),
("The requested timeout value is too big for the latency in the "
"current pipeline. Limiting to %" G_GINT64_FORMAT, data.max));
self->timeout = data.max;
}
self->priv->latency_live = data.live;
self->priv->latency_min = data.min;
self->priv->latency_max = data.max;
/* add our own */
if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
if (GST_CLOCK_TIME_IS_VALID (data.min))
data.min += self->timeout;
if (GST_CLOCK_TIME_IS_VALID (data.max))
data.max += self->timeout;
}
GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
" max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
data.max);
gst_query_set_latency (query, data.live, data.min, data.max);
return TRUE;
}
static gboolean
_send_event (GstElement * element, GstEvent * event)
{
@ -951,6 +1037,10 @@ _src_query (GstAggregator * self, GstQuery * query)
goto discard;
}
case GST_QUERY_LATENCY:
{
return gst_aggregator_query_latency (self, query);
}
default:
break;
}
@ -1216,13 +1306,24 @@ gst_aggregator_dispose (GObject * object)
* as unresponsive.
*/
static void
gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout)
gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
{
g_return_if_fail (GST_IS_AGGREGATOR (agg));
g_return_if_fail (GST_IS_AGGREGATOR (self));
GST_OBJECT_LOCK (agg);
agg->timeout = timeout;
GST_OBJECT_UNLOCK (agg);
GST_OBJECT_LOCK (self);
if (self->priv->latency_live && self->priv->latency_max != 0 &&
GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) {
GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
("%s", "Timeout too big"),
("The requested timeout value is too big for the latency in the "
"current pipeline. Limiting to %" G_GINT64_FORMAT,
self->priv->latency_max));
timeout = self->priv->latency_max;
}
self->timeout = timeout;
GST_OBJECT_UNLOCK (self);
}
/**
@ -1343,6 +1444,10 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
priv->padcount = -1;
priv->tags_changed = FALSE;
self->priv->latency_live = FALSE;
self->priv->latency_min = 0;
self->priv->latency_max = GST_CLOCK_TIME_NONE;
_reset_flow_values (self);
AGGREGATOR_QUEUE (self) = g_async_queue_new ();