aggregator: Process serialized queries through the queue

This ensures that they really get processed in order with
buffers. Just waiting for the queue to be empty is sometimes not
enough as the buffers are dropped from the pad before the result is
pushed to the next element, sometimes resulting in surprising
re-ordering.
This commit is contained in:
Olivier Crête 2017-05-23 00:53:57 +02:00 committed by Tim-Philipp Müller
parent 0500807b55
commit 474acca0a8

View file

@ -751,6 +751,7 @@ static gboolean
check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
{
GstEvent *event = NULL;
GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
gboolean *processed_event = user_data;
@ -763,11 +764,14 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
pad->priv->eos = TRUE;
}
if (pad->priv->clipped_buffer == NULL &&
GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
!GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers)))
event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers));
if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers)))
query = g_queue_peek_tail (&pad->priv->buffers);
}
PAD_UNLOCK (pad);
if (event) {
if (event || query) {
gboolean ret;
if (processed_event)
@ -776,18 +780,43 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
klass = GST_AGGREGATOR_GET_CLASS (self);
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
gst_event_ref (event);
ret = klass->sink_event (self, pad, event);
PAD_LOCK (pad);
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
pad->priv->negotiated = ret;
if (g_queue_peek_tail (&pad->priv->buffers) == event)
gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
gst_event_unref (event);
if (event) {
gst_event_ref (event);
ret = klass->sink_event (self, pad, event);
PAD_LOCK (pad);
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
pad->priv->negotiated = ret;
if (g_queue_peek_tail (&pad->priv->buffers) == event)
gst_event_unref (g_queue_pop_tail (&pad->priv->buffers));
gst_event_unref (event);
}
if (query) {
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
ret = klass->sink_query (self, pad, query);
PAD_LOCK (pad);
if (g_queue_peek_tail (&pad->priv->buffers) == query) {
GstStructure *s;
s = gst_query_writable_structure (query);
gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
NULL);
g_queue_pop_tail (&pad->priv->buffers);
}
}
PAD_BROADCAST_EVENT (pad);
PAD_UNLOCK (pad);
}
if (query) {
if (processed_event)
*processed_event = TRUE;
if (klass == NULL)
klass = GST_AGGREGATOR_GET_CLASS (self);
}
} while (event != NULL);
return TRUE;
@ -816,7 +845,8 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
!GST_EVENT_IS_STICKY (item->data)) {
gst_mini_object_unref (item->data);
if (!GST_IS_QUERY (item->data))
gst_mini_object_unref (item->data);
g_queue_delete_link (&aggpad->priv->buffers, item);
}
item = next;
@ -2603,31 +2633,53 @@ static gboolean
gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GstQuery * query)
{
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (GST_QUERY_IS_SERIALIZED (query)) {
GstStructure *s;
gboolean ret = FALSE;
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK) {
SRC_UNLOCK (self);
goto flushing;
}
g_queue_push_head (&aggpad->priv->buffers, query);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
s = gst_query_writable_structure (query);
if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
gst_structure_remove_field (s, "gst-aggregator-retval");
else
g_queue_remove (&aggpad->priv->buffers, query);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
return ret;
}
return klass->sink_query (GST_AGGREGATOR (parent),
GST_AGGREGATOR_PAD (pad), query);
return klass->sink_query (self, aggpad, query);
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
return FALSE;
}