aggregator: Add sink_event_pre_queue() and sink_query_pre_queue() vfuncs

These allow subclasses catching serialized events/queries before they're
queued up.
This commit is contained in:
Sebastian Dröge 2019-08-14 14:25:48 +03:00
parent aebff1fcaa
commit d7d79f2c54
2 changed files with 131 additions and 95 deletions

View file

@ -1586,6 +1586,61 @@ eat:
return res;
}
/* Queue serialized events and let the others go through directly.
* The queued events with be handled from the src-pad task in
* gst_aggregator_do_events_and_queries().
*/
static gboolean
gst_aggregator_default_sink_event_pre_queue (GstAggregator * self,
GstAggregatorPad * aggpad, GstEvent * event)
{
GstFlowReturn ret = GST_FLOW_OK;
if (GST_EVENT_IS_SERIALIZED (event)
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->priv->head_segment);
aggpad->priv->head_position = aggpad->priv->head_segment.position;
update_time_level (aggpad, TRUE);
GST_OBJECT_UNLOCK (aggpad);
}
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
g_queue_push_head (&aggpad->priv->data, event);
SRC_BROADCAST (self);
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
} else {
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
if (!klass->sink_event (self, aggpad, event)) {
/* Copied from GstPad to convert boolean to a GstFlowReturn in
* the event handling func */
ret = GST_FLOW_ERROR;
}
}
return ret;
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (GST_PAD (aggpad), event);
gst_event_unref (event);
return aggpad->priv->flow_return;
}
static gboolean
gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
{
@ -2257,6 +2312,58 @@ gst_aggregator_default_sink_query (GstAggregator * self,
return gst_pad_query_default (pad, GST_OBJECT (self), query);
}
static gboolean
gst_aggregator_default_sink_query_pre_queue (GstAggregator * self,
GstAggregatorPad * aggpad, GstQuery * query)
{
if (GST_QUERY_IS_SERIALIZED (query)) {
GstStructure *s;
gboolean ret = FALSE;
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK) {
SRC_UNLOCK (self);
goto flushing;
}
g_queue_push_head (&aggpad->priv->data, query);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
s = gst_query_writable_structure (query);
if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
gst_structure_remove_field (s, "gst-aggregator-retval");
else
g_queue_remove (&aggpad->priv->data, query);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
return ret;
} else {
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
return klass->sink_query (self, aggpad, query);
}
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
return FALSE;
}
static void
gst_aggregator_finalize (GObject * object)
{
@ -2427,6 +2534,9 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
klass->negotiate = gst_aggregator_default_negotiate;
klass->sink_event_pre_queue = gst_aggregator_default_sink_event_pre_queue;
klass->sink_query_pre_queue = gst_aggregator_default_sink_query_pre_queue;
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
@ -2769,111 +2879,23 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GstQuery * query)
{
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (GST_QUERY_IS_SERIALIZED (query)) {
GstStructure *s;
gboolean ret = FALSE;
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK) {
SRC_UNLOCK (self);
goto flushing;
}
g_queue_push_head (&aggpad->priv->data, query);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
while (!gst_aggregator_pad_queue_is_empty (aggpad)
&& aggpad->priv->flow_return == GST_FLOW_OK) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
}
s = gst_query_writable_structure (query);
if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
gst_structure_remove_field (s, "gst-aggregator-retval");
else
g_queue_remove (&aggpad->priv->data, query);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
return ret;
} else {
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
return klass->sink_query (self, aggpad, query);
}
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
return FALSE;
g_assert (klass->sink_query_pre_queue);
return klass->sink_query_pre_queue (self, aggpad, query);
}
/* Queue serialized events and let the others go through directly.
* The queued events with be handled from the src-pad task in
* gst_aggregator_do_events_and_queries().
*/
static GstFlowReturn
gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstFlowReturn ret = GST_FLOW_OK;
GstAggregator *self = GST_AGGREGATOR (parent);
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (GST_EVENT_IS_SERIALIZED (event)
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
SRC_LOCK (self);
PAD_LOCK (aggpad);
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
GST_OBJECT_LOCK (aggpad);
gst_event_copy_segment (event, &aggpad->priv->head_segment);
aggpad->priv->head_position = aggpad->priv->head_segment.position;
update_time_level (aggpad, TRUE);
GST_OBJECT_UNLOCK (aggpad);
}
GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
g_queue_push_head (&aggpad->priv->data, event);
SRC_BROADCAST (self);
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
} else {
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
if (!klass->sink_event (self, aggpad, event)) {
/* Copied from GstPad to convert boolean to a GstFlowReturn in
* the event handling func */
ret = GST_FLOW_ERROR;
}
}
return ret;
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
SRC_UNLOCK (self);
if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (pad, event);
gst_event_unref (event);
return aggpad->priv->flow_return;
g_assert (klass->sink_event_pre_queue);
return klass->sink_event_pre_queue (self, aggpad, event);
}
static gboolean

View file

@ -237,6 +237,12 @@ struct _GstAggregator
* Allows the subclass to handle the allocation query from upstream.
* @negotiate: Optional.
* Negotiate the caps with the peer (Since: 1.18).
* @sink_event_pre_queue: Optional.
* Called when an event is received on a sink pad before queueing up
* serialized events. The subclass should always chain up (Since: 1.18).
* @sink_query_pre_queue: Optional.
* Called when a query is received on a sink pad before queueing up
* serialized queries. The subclass should always chain up (Since: 1.18).
*
* The aggregator base class will handle in a thread-safe way all manners of
* concurrent flushes, seeks, pad additions and removals, leaving to the
@ -316,8 +322,16 @@ struct _GstAggregatorClass {
gboolean (*negotiate) (GstAggregator * self);
gboolean (*sink_event_pre_queue) (GstAggregator * aggregator,
GstAggregatorPad * aggregator_pad,
GstEvent * event);
gboolean (*sink_query_pre_queue) (GstAggregator * aggregator,
GstAggregatorPad * aggregator_pad,
GstQuery * query);
/*< private >*/
gpointer _gst_reserved[GST_PADDING_LARGE-1];
gpointer _gst_reserved[GST_PADDING_LARGE-3];
};
/************************************