diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 36962005b8..e8282de266 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -1586,6 +1586,61 @@ eat: return res; } +/* Queue serialized events and let the others go through directly. + * The queued events with be handled from the src-pad task in + * gst_aggregator_do_events_and_queries(). + */ +static gboolean +gst_aggregator_default_sink_event_pre_queue (GstAggregator * self, + GstAggregatorPad * aggpad, GstEvent * event) +{ + GstFlowReturn ret = GST_FLOW_OK; + + if (GST_EVENT_IS_SERIALIZED (event) + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + SRC_LOCK (self); + PAD_LOCK (aggpad); + + if (aggpad->priv->flow_return != GST_FLOW_OK) + goto flushing; + + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->priv->head_segment); + aggpad->priv->head_position = aggpad->priv->head_segment.position; + update_time_level (aggpad, TRUE); + GST_OBJECT_UNLOCK (aggpad); + } + + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); + g_queue_push_head (&aggpad->priv->data, event); + SRC_BROADCAST (self); + PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + + if (!klass->sink_event (self, aggpad, event)) { + /* Copied from GstPad to convert boolean to a GstFlowReturn in + * the event handling func */ + ret = GST_FLOW_ERROR; + } + } + + return ret; + +flushing: + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); + if (GST_EVENT_IS_STICKY (event)) + gst_pad_store_sticky_event (GST_PAD (aggpad), event); + gst_event_unref (event); + + return aggpad->priv->flow_return; +} + static gboolean gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) { @@ -2257,6 +2312,58 @@ gst_aggregator_default_sink_query (GstAggregator * self, return gst_pad_query_default (pad, GST_OBJECT (self), query); } +static gboolean +gst_aggregator_default_sink_query_pre_queue (GstAggregator * self, + GstAggregatorPad * aggpad, GstQuery * query) +{ + if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); + PAD_LOCK (aggpad); + + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->data, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + + while (!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + PAD_WAIT_EVENT (aggpad); + } + + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->data, query); + + if (aggpad->priv->flow_return != GST_FLOW_OK) + goto flushing; + + PAD_UNLOCK (aggpad); + + return ret; + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + + return klass->sink_query (self, aggpad, query); + } + +flushing: + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + + return FALSE; +} + static void gst_aggregator_finalize (GObject * object) { @@ -2427,6 +2534,9 @@ gst_aggregator_class_init (GstAggregatorClass * klass) klass->negotiate = gst_aggregator_default_negotiate; + klass->sink_event_pre_queue = gst_aggregator_default_sink_event_pre_queue; + klass->sink_query_pre_queue = gst_aggregator_default_sink_query_pre_queue; + gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event); @@ -2769,111 +2879,23 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { GstAggregator *self = GST_AGGREGATOR (parent); + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - if (GST_QUERY_IS_SERIALIZED (query)) { - GstStructure *s; - gboolean ret = FALSE; - - SRC_LOCK (self); - PAD_LOCK (aggpad); - - if (aggpad->priv->flow_return != GST_FLOW_OK) { - SRC_UNLOCK (self); - goto flushing; - } - - g_queue_push_head (&aggpad->priv->data, query); - SRC_BROADCAST (self); - SRC_UNLOCK (self); - - while (!gst_aggregator_pad_queue_is_empty (aggpad) - && aggpad->priv->flow_return == GST_FLOW_OK) { - GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); - PAD_WAIT_EVENT (aggpad); - } - - s = gst_query_writable_structure (query); - if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) - gst_structure_remove_field (s, "gst-aggregator-retval"); - else - g_queue_remove (&aggpad->priv->data, query); - - if (aggpad->priv->flow_return != GST_FLOW_OK) - goto flushing; - - PAD_UNLOCK (aggpad); - - return ret; - } else { - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - return klass->sink_query (self, aggpad, query); - } - -flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", - gst_flow_get_name (aggpad->priv->flow_return)); - PAD_UNLOCK (aggpad); - - return FALSE; + g_assert (klass->sink_query_pre_queue); + return klass->sink_query_pre_queue (self, aggpad, query); } -/* Queue serialized events and let the others go through directly. - * The queued events with be handled from the src-pad task in - * gst_aggregator_do_events_and_queries(). - */ static GstFlowReturn gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) { - GstFlowReturn ret = GST_FLOW_OK; GstAggregator *self = GST_AGGREGATOR (parent); + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - if (GST_EVENT_IS_SERIALIZED (event) - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - SRC_LOCK (self); - PAD_LOCK (aggpad); - - if (aggpad->priv->flow_return != GST_FLOW_OK) - goto flushing; - - if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { - GST_OBJECT_LOCK (aggpad); - gst_event_copy_segment (event, &aggpad->priv->head_segment); - aggpad->priv->head_position = aggpad->priv->head_segment.position; - update_time_level (aggpad, TRUE); - GST_OBJECT_UNLOCK (aggpad); - } - - GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); - g_queue_push_head (&aggpad->priv->data, event); - SRC_BROADCAST (self); - PAD_UNLOCK (aggpad); - SRC_UNLOCK (self); - } else { - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - - if (!klass->sink_event (self, aggpad, event)) { - /* Copied from GstPad to convert boolean to a GstFlowReturn in - * the event handling func */ - ret = GST_FLOW_ERROR; - } - } - - return ret; - -flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", - gst_flow_get_name (aggpad->priv->flow_return)); - PAD_UNLOCK (aggpad); - SRC_UNLOCK (self); - if (GST_EVENT_IS_STICKY (event)) - gst_pad_store_sticky_event (pad, event); - gst_event_unref (event); - - return aggpad->priv->flow_return; + g_assert (klass->sink_event_pre_queue); + return klass->sink_event_pre_queue (self, aggpad, event); } static gboolean diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index 0787c0cdad..56d4b28c95 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -237,6 +237,12 @@ struct _GstAggregator * Allows the subclass to handle the allocation query from upstream. * @negotiate: Optional. * Negotiate the caps with the peer (Since: 1.18). + * @sink_event_pre_queue: Optional. + * Called when an event is received on a sink pad before queueing up + * serialized events. The subclass should always chain up (Since: 1.18). + * @sink_query_pre_queue: Optional. + * Called when a query is received on a sink pad before queueing up + * serialized queries. The subclass should always chain up (Since: 1.18). * * The aggregator base class will handle in a thread-safe way all manners of * concurrent flushes, seeks, pad additions and removals, leaving to the @@ -316,8 +322,16 @@ struct _GstAggregatorClass { gboolean (*negotiate) (GstAggregator * self); + gboolean (*sink_event_pre_queue) (GstAggregator * aggregator, + GstAggregatorPad * aggregator_pad, + GstEvent * event); + + gboolean (*sink_query_pre_queue) (GstAggregator * aggregator, + GstAggregatorPad * aggregator_pad, + GstQuery * query); + /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE-1]; + gpointer _gst_reserved[GST_PADDING_LARGE-3]; }; /************************************