aggregator: Query latency on first incoming buffer.

And keep on querying upstream until we get a reply.

Also, the _get_latency_unlocked() method required being calld
with a private lock, so removed the _unlocked() variant from the API.
And it now returns GST_CLOCK_TIME_NONE when the element is not live as
we think that 0 upstream latency is possible.

https://bugzilla.gnome.org/show_bug.cgi?id=745768
This commit is contained in:
Olivier Crête 2015-03-06 21:12:52 -05:00 committed by Tim-Philipp Müller
parent 46e33e84fd
commit 5a0e2c4c47
2 changed files with 90 additions and 70 deletions

View file

@ -87,6 +87,8 @@ static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
*/ */
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
GST_DEBUG_CATEGORY_STATIC (aggregator_debug); GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug #define GST_CAT_DEFAULT aggregator_debug
@ -236,9 +238,10 @@ struct _GstAggregatorPrivate
GstTagList *tags; GstTagList *tags;
gboolean tags_changed; gboolean tags_changed;
gboolean latency_live; /* protected by src_lock */ gboolean peer_latency_live; /* protected by src_lock */
GstClockTime latency_min; /* protected by src_lock */ GstClockTime peer_latency_min; /* protected by src_lock */
GstClockTime latency_max; /* protected by src_lock */ GstClockTime peer_latency_max; /* protected by src_lock */
gboolean has_peer_latency;
GstClockTime sub_latency_min; /* protected by src_lock */ GstClockTime sub_latency_min; /* protected by src_lock */
GstClockTime sub_latency_max; /* protected by src_lock */ GstClockTime sub_latency_max; /* protected by src_lock */
@ -535,15 +538,15 @@ gst_aggregator_get_next_time (GstAggregator * self)
static gboolean static gboolean
gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
{ {
GstClockTime latency_max, latency_min; GstClockTime latency;
GstClockTime start; GstClockTime start;
gboolean live, res; gboolean res;
*timeout = FALSE; *timeout = FALSE;
SRC_LOCK (self); SRC_LOCK (self);
gst_aggregator_get_latency_unlocked (self, &live, &latency_min, &latency_max); latency = gst_aggregator_get_latency_unlocked (self);
if (gst_aggregator_check_pads_ready (self)) { if (gst_aggregator_check_pads_ready (self)) {
GST_DEBUG_OBJECT (self, "all pads have data"); GST_DEBUG_OBJECT (self, "all pads have data");
@ -561,8 +564,9 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
start = gst_aggregator_get_next_time (self); start = gst_aggregator_get_next_time (self);
if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) if (!GST_CLOCK_TIME_IS_VALID (latency) ||
|| !GST_CLOCK_TIME_IS_VALID (start)) { !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
!GST_CLOCK_TIME_IS_VALID (start)) {
/* We wake up here when something happened, and below /* We wake up here when something happened, and below
* then check if we're ready now. If we return FALSE, * then check if we're ready now. If we return FALSE,
* we will be directly called again. * we will be directly called again.
@ -585,15 +589,14 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
time = base_time + start; time = base_time + start;
time += latency_min; time += latency;
GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %" GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
" latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
" current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time), GST_TIME_ARGS (time),
GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time), GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max), GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
GST_TIME_ARGS (latency_min),
GST_TIME_ARGS (gst_clock_get_time (clock))); GST_TIME_ARGS (gst_clock_get_time (clock)));
@ -987,6 +990,10 @@ gst_aggregator_stop (GstAggregator * agg)
else else
result = TRUE; result = TRUE;
agg->priv->has_peer_latency = FALSE;
agg->priv->peer_latency_live = FALSE;
agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
if (agg->priv->tags) if (agg->priv->tags)
gst_tag_list_unref (agg->priv->tags); gst_tag_list_unref (agg->priv->tags);
agg->priv->tags = NULL; agg->priv->tags = NULL;
@ -1110,8 +1117,10 @@ gst_aggregator_request_new_pad (GstElement * element,
return GST_PAD (agg_pad); return GST_PAD (agg_pad);
} }
/* Must be called with SRC_LOCK held */
static gboolean static gboolean
gst_aggregator_query_latency (GstAggregator * self, GstQuery * query) gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
{ {
gboolean query_ret, live; gboolean query_ret, live;
GstClockTime our_latency, min, max; GstClockTime our_latency, min, max;
@ -1125,13 +1134,11 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
gst_query_parse_latency (query, &live, &min, &max); gst_query_parse_latency (query, &live, &min, &max);
SRC_LOCK (self);
our_latency = self->priv->latency; our_latency = self->priv->latency;
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
SRC_UNLOCK (self);
return FALSE; return FALSE;
} }
@ -1140,13 +1147,13 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
GST_TIME_FORMAT ". Add queues or other buffering elements.", GST_TIME_FORMAT ". Add queues or other buffering elements.",
GST_TIME_ARGS (max), GST_TIME_ARGS (min))); GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
SRC_UNLOCK (self);
return FALSE; return FALSE;
} }
self->priv->latency_live = live; self->priv->peer_latency_live = live;
self->priv->latency_min = min; self->priv->peer_latency_min = min;
self->priv->latency_max = max; self->priv->peer_latency_max = max;
self->priv->has_peer_latency = TRUE;
/* add our own */ /* add our own */
min += our_latency; min += our_latency;
@ -1170,7 +1177,6 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
} }
SRC_BROADCAST (self); SRC_BROADCAST (self);
SRC_UNLOCK (self);
GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
" max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
@ -1180,47 +1186,63 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
return query_ret; return query_ret;
} }
/** /*
* gst_aggregator_get_latency_unlocked: * MUST be called with the src_lock held.
* @self: a #GstAggregator
* @live: (out) (allow-none): whether @self is live
* @min_latency: (out) (allow-none): the configured minimum latency of @self
* @max_latency: (out) (allow-none): the configured maximum latency of @self
* *
* Retreives the latency values reported by @self in response to the latency * See gst_aggregator_get_latency() for doc
* query. */
static GstClockTime
gst_aggregator_get_latency_unlocked (GstAggregator * self)
{
GstClockTime latency;
g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
if (!self->priv->has_peer_latency) {
GstQuery *query = gst_query_new_latency ();
gboolean ret;
ret = gst_aggregator_query_latency_unlocked (self, query);
gst_query_unref (query);
if (!ret)
return GST_CLOCK_TIME_NONE;
}
if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
return GST_CLOCK_TIME_NONE;
/* latency_min is never GST_CLOCK_TIME_NONE by construction */
latency = self->priv->peer_latency_min;
/* add our own */
latency += self->priv->latency;
latency += self->priv->sub_latency_min;
return latency;
}
/**
* gst_aggregator_get_latency:
* @self: a #GstAggregator
*
* Retrieves the latency values reported by @self in response to the latency
* query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
* will not wait for the clock.
* *
* Typically only called by subclasses. * Typically only called by subclasses.
* *
* MUST be called with the src_lock held. * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
*/ */
void GstClockTime
gst_aggregator_get_latency_unlocked (GstAggregator * self, gboolean * live, gst_aggregator_get_latency (GstAggregator * self)
GstClockTime * min_latency, GstClockTime * max_latency)
{ {
GstClockTime min, max; GstClockTime ret;
g_return_if_fail (GST_IS_AGGREGATOR (self)); SRC_LOCK (self);
ret = gst_aggregator_get_latency_unlocked (self);
SRC_UNLOCK (self);
/* latency_min is never GST_CLOCK_TIME_NONE by construction */ return ret;
min = self->priv->latency_min;
max = self->priv->latency_max;
/* add our own */
min += self->priv->latency;
min += self->priv->sub_latency_min;
if (GST_CLOCK_TIME_IS_VALID (max)
&& GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max))
max += self->priv->sub_latency_max;
else
max = GST_CLOCK_TIME_NONE;
if (live)
*live = self->priv->latency_live;
if (min_latency)
*min_latency = min;
if (max_latency)
*max_latency = max;
} }
static gboolean static gboolean
@ -1271,17 +1293,17 @@ gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
gst_query_set_seeking (query, format, FALSE, 0, -1); gst_query_set_seeking (query, format, FALSE, 0, -1);
res = TRUE; res = TRUE;
goto discard; break;
} }
case GST_QUERY_LATENCY: case GST_QUERY_LATENCY:
return gst_aggregator_query_latency (self, query); SRC_LOCK (self);
default: res = gst_aggregator_query_latency_unlocked (self, query);
SRC_UNLOCK (self);
break; break;
default:
return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
} }
return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
discard:
return res; return res;
} }
@ -1549,9 +1571,9 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
SRC_LOCK (self); SRC_LOCK (self);
if (self->priv->latency_live) { if (self->priv->peer_latency_live) {
min = self->priv->latency_min; min = self->priv->peer_latency_min;
max = self->priv->latency_max; max = self->priv->peer_latency_max;
/* add our own */ /* add our own */
min += latency; min += latency;
min += self->priv->sub_latency_min; min += self->priv->sub_latency_min;
@ -1707,9 +1729,10 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
priv->padcount = -1; priv->padcount = -1;
priv->tags_changed = FALSE; priv->tags_changed = FALSE;
self->priv->latency_live = FALSE; self->priv->peer_latency_live = FALSE;
self->priv->latency_min = self->priv->sub_latency_min = 0; self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
self->priv->latency_max = self->priv->sub_latency_max = 0; self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
self->priv->has_peer_latency = FALSE;
gst_aggregator_reset_flow_values (self); gst_aggregator_reset_flow_values (self);
self->srcpad = gst_pad_new_from_template (pad_template, "src"); self->srcpad = gst_pad_new_from_template (pad_template, "src");

View file

@ -271,10 +271,7 @@ gboolean gst_aggregator_iterate_sinkpads (GstAggregator
GstAggregatorPadForeachFunc func, GstAggregatorPadForeachFunc func,
gpointer user_data); gpointer user_data);
void gst_aggregator_get_latency_unlocked (GstAggregator * self, GstClockTime gst_aggregator_get_latency (GstAggregator * self);
gboolean * live,
GstClockTime * min,
GstClockTime * max);
G_END_DECLS G_END_DECLS