aggregator: Always handle serialized events/queries directly before waiting

Otherwise we can start waiting for another pad while one pad already has
events, and potentially also a buffer, that could be handled. That buffer
would then not be accessible to the subclass from
GstAggregator::get_next_time(), as the events are queued in front of it,
and the subclass could not calculate the next time based on the buffers
that are already available.
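
To make this concrete, here is a minimal sketch of the kind of
GstAggregator::get_next_time() implementation that is affected (a
hypothetical subclass, not code from this commit; my_agg_get_next_time is
an invented name). gst_aggregator_pad_peek_buffer() returns NULL while a
serialized event or query still sits in front of the buffer, so before
this change the vfunc could fail to compute a time even though a buffer
was already queued:

#include <gst/base/gstaggregator.h>

/* Hypothetical subclass vfunc: derive the next wakeup time from the
 * buffer queued on the first sink pad. */
static GstClockTime
my_agg_get_next_time (GstAggregator * agg)
{
  GstAggregatorPad *pad = NULL;
  GstBuffer *buf;
  GstClockTime next = GST_CLOCK_TIME_NONE;

  GST_OBJECT_LOCK (agg);
  if (GST_ELEMENT (agg)->sinkpads)
    pad = gst_object_ref (GST_ELEMENT (agg)->sinkpads->data);
  GST_OBJECT_UNLOCK (agg);

  if (!pad)
    return GST_CLOCK_TIME_NONE;

  /* NULL if a serialized event/query is in front of the buffer, which
   * before this fix could still be the case when this vfunc ran. */
  buf = gst_aggregator_pad_peek_buffer (pad);
  if (buf) {
    if (GST_BUFFER_PTS_IS_VALID (buf)) {
      next = GST_BUFFER_PTS (buf);
      if (GST_BUFFER_DURATION_IS_VALID (buf))
        next += GST_BUFFER_DURATION (buf);
    }
    gst_buffer_unref (buf);
  }

  gst_object_unref (pad);
  return next;
}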

As a side effect, this also allows removing the duplicated event handling
code from the aggregate function: pads are now always reported as not
ready when a serialized event or query is at the top of at least one
pad's queue (a standalone sketch of the resulting loop behaviour follows
the diff below).

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/428
Sebastian Dröge 2019-08-19 18:19:50 +03:00
parent d90d771a9a
commit 74797e962f


@@ -398,8 +398,15 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
       pad->priv->clipped_buffer == NULL);
 }

+/* Will return FALSE if there's no buffer available on every non-EOS pad, or
+ * if at least one of the pads has an event or query at the top of its queue.
+ *
+ * Only returns TRUE if all non-EOS pads have a buffer available at the top of
+ * their queue or a clipped buffer already.
+ */
 static gboolean
-gst_aggregator_check_pads_ready (GstAggregator * self)
+gst_aggregator_check_pads_ready (GstAggregator * self,
+    gboolean * have_event_or_query_ret)
 {
   GstAggregatorPad *pad = NULL;
   GList *l, *sinkpads;
@@ -419,18 +426,30 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
     PAD_LOCK (pad);

-    if (pad->priv->num_buffers == 0) {
-      if (!gst_aggregator_pad_queue_is_empty (pad))
-        have_event_or_query = TRUE;
-
-      if (!pad->priv->eos) {
-        have_buffer = FALSE;
+    /* If there's an event or query at the top of the queue and we haven't
+     * yet taken the top buffer out and stored it as clipped_buffer, remember
+     * that and exit the loop. We first have to handle all events/queries
+     * before we handle any buffers. */
+    if (!pad->priv->clipped_buffer
+        && (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))
+            || GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))) {
+      PAD_UNLOCK (pad);
+      have_event_or_query = TRUE;
+      break;
+    }

-        /* If not live we need data on all pads, so leave the loop */
-        if (!self->priv->peer_latency_live) {
-          PAD_UNLOCK (pad);
-          goto pad_not_ready;
-        }
-      }
+    /* Otherwise check if we have a clipped buffer or a buffer at the top of
+     * the queue, and if not then this pad is not ready unless it is also EOS */
+    if (!pad->priv->clipped_buffer
+        && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+      /* We must not have any buffers at all in this pad then, as otherwise we
+       * would've had an event/query at the top of the queue */
+      g_assert (pad->priv->num_buffers == 0);
+
+      /* Only consider this pad as worth waiting for if it's not already EOS.
+       * There's no point in waiting for buffers on EOS pads */
+      if (!pad->priv->eos)
+        have_buffer = FALSE;
     } else if (self->priv->peer_latency_live) {
       /* In live mode, having a single pad with buffers is enough to
        * generate a start time from it. In non-live mode all pads need
@@ -442,7 +461,10 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
     PAD_UNLOCK (pad);
   }

-  if (!have_buffer && !have_event_or_query)
+  if (have_event_or_query)
+    goto pad_not_ready_but_event_or_query;
+
+  if (!have_buffer)
     goto pad_not_ready;

   if (have_buffer)
@@ -450,23 +472,42 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
   GST_OBJECT_UNLOCK (self);
   GST_LOG_OBJECT (self, "pads are ready");
+
+  if (have_event_or_query_ret)
+    *have_event_or_query_ret = have_event_or_query;
+
   return TRUE;

 no_sinkpads:
   {
     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
     GST_OBJECT_UNLOCK (self);
+
+    if (have_event_or_query_ret)
+      *have_event_or_query_ret = have_event_or_query;
+
     return FALSE;
   }

 pad_not_ready:
   {
-    if (have_event_or_query)
-      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
-          " but waking up for serialized event");
-    else
-      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+    GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
     GST_OBJECT_UNLOCK (self);
-    return have_event_or_query;
+
+    if (have_event_or_query_ret)
+      *have_event_or_query_ret = have_event_or_query;
+
+    return FALSE;
+  }
+
+pad_not_ready_but_event_or_query:
+  {
+    GST_LOG_OBJECT (pad,
+        "pad not ready to be aggregated yet, need to handle serialized event or query first");
+    GST_OBJECT_UNLOCK (self);
+
+    if (have_event_or_query_ret)
+      *have_event_or_query_ret = have_event_or_query;
+
+    return FALSE;
   }
 }
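
A note for reading the checks above: GstAggregatorPad pushes incoming
items onto the head of its GQueue, so the oldest item, the one to handle
next, sits at the tail. That is why the "top of the queue" is inspected
with g_queue_peek_tail(). A self-contained GLib sketch of that discipline
(the Item type is invented for illustration; this is not GStreamer code):

#include <glib.h>
#include <stdio.h>

typedef enum { ITEM_EVENT, ITEM_BUFFER } ItemType;
typedef struct { ItemType type; const char *name; } Item;

int
main (void)
{
  GQueue q = G_QUEUE_INIT;
  Item ev = { ITEM_EVENT, "segment-event" };
  Item buf = { ITEM_BUFFER, "buffer-0" };
  Item *front;

  /* Upstream queued an event first, then a buffer. */
  g_queue_push_head (&q, &ev);
  g_queue_push_head (&q, &buf);

  /* The "top" of the pad's queue is the tail of the GQueue. */
  front = g_queue_peek_tail (&q);
  printf ("next item: %s\n", front->name);  /* -> segment-event */

  /* Only once the event is popped does the buffer become visible,
   * which is when get_next_time() can make use of it. */
  g_queue_pop_tail (&q);
  front = g_queue_peek_tail (&q);
  printf ("next item: %s\n", front->name);  /* -> buffer-0 */

  return 0;
}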
@@ -630,6 +671,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   GstClockTime latency;
   GstClockTime start;
   gboolean res;
+  gboolean have_event_or_query = FALSE;

   *timeout = FALSE;
@@ -637,13 +679,21 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   latency = gst_aggregator_get_latency_unlocked (self);

-  if (gst_aggregator_check_pads_ready (self)) {
+  if (gst_aggregator_check_pads_ready (self, &have_event_or_query)) {
     GST_DEBUG_OBJECT (self, "all pads have data");
     SRC_UNLOCK (self);

     return TRUE;
   }

+  /* If we have an event or query, return FALSE immediately instead of
+   * waiting, so that it can be handled first */
+  if (have_event_or_query) {
+    GST_DEBUG_OBJECT (self, "Have serialized event or query to handle first");
+    SRC_UNLOCK (self);
+
+    return FALSE;
+  }
+
   /* Before waiting, check if we're actually still running */
   if (!self->priv->running || !self->priv->send_eos) {
     SRC_UNLOCK (self);
@@ -722,7 +772,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
     }
   }

-  res = gst_aggregator_check_pads_ready (self);
+  res = gst_aggregator_check_pads_ready (self, NULL);
   SRC_UNLOCK (self);

   return res;
@@ -1175,17 +1225,6 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;

-    events_query_data.processed_event = FALSE;
-    events_query_data.flow_ret = GST_FLOW_OK;
-    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
-        gst_aggregator_do_events_and_queries, &events_query_data);
-
-    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
-      goto handle_error;
-
-    if (events_query_data.processed_event)
-      continue;
-
     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
       if (!gst_aggregator_negotiate_unlocked (self)) {
         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
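
The block removed above duplicated the event/query pass that already runs
at the top of the task loop. With gst_aggregator_wait_and_check() now
returning FALSE without waiting whenever a serialized item is queued, the
loop simply comes back around to that single pass. A standalone toy
simulation of the resulting behaviour (plain C; the counters and the
functions are invented stand-ins, not GStreamer internals):

#include <stdbool.h>
#include <stdio.h>

static int pending_events = 2;   /* serialized events queued before buffers */
static int pending_buffers = 1;

/* Stand-in for the new gst_aggregator_wait_and_check(): report "not
 * ready" without blocking while serialized events are still queued. */
static bool
wait_and_check (void)
{
  return pending_events == 0 && pending_buffers > 0;
}

int
main (void)
{
  while (pending_events > 0 || pending_buffers > 0) {
    /* The single event/query handling pass at the top of the loop,
     * standing in for gst_aggregator_do_events_and_queries(). */
    if (pending_events > 0) {
      printf ("handling serialized event\n");
      pending_events--;
    }

    /* With events still queued this returns false immediately, so we
     * loop back to the handling pass above instead of sleeping. */
    if (!wait_and_check ())
      continue;

    printf ("aggregating buffer\n");
    pending_buffers--;
  }
  return 0;
}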