diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 2afa3c34fe..f0dd552a78 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -751,6 +751,7 @@ static gboolean check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) { GstEvent *event = NULL; + GstQuery *query = NULL; GstAggregatorClass *klass = NULL; gboolean *processed_event = user_data; @@ -763,11 +764,14 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) pad->priv->eos = TRUE; } if (pad->priv->clipped_buffer == NULL && - GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { - event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers))) + query = g_queue_peek_tail (&pad->priv->buffers); } PAD_UNLOCK (pad); - if (event) { + if (event || query) { gboolean ret; if (processed_event) @@ -776,18 +780,43 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) klass = GST_AGGREGATOR_GET_CLASS (self); GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - gst_event_ref (event); - ret = klass->sink_event (self, pad, event); - PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) - pad->priv->negotiated = ret; - if (g_queue_peek_tail (&pad->priv->buffers) == event) - gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); - gst_event_unref (event); + if (event) { + gst_event_ref (event); + ret = klass->sink_event (self, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->buffers) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); + gst_event_unref (event); + } + + if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + ret = klass->sink_query (self, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->buffers) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->buffers); + } + } + PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); } + if (query) { + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + } } while (event != NULL); return TRUE; @@ -816,7 +845,8 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || !GST_EVENT_IS_STICKY (item->data)) { - gst_mini_object_unref (item->data); + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); g_queue_delete_link (&aggpad->priv->buffers, item); } item = next; @@ -2603,31 +2633,53 @@ static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); PAD_LOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->buffers, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + while (!gst_aggregator_pad_queue_is_empty (aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->buffers, query); + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); + + return ret; } - return klass->sink_query (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), query); + return klass->sink_query (self, aggpad, query); flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + return FALSE; }