aggregator: Unify downstream flow return and flushing

Also means that having a non-OK downstream flow return
wakes up the chain functions.

https://bugzilla.gnome.org/show_bug.cgi?id=747220
This commit is contained in:
Olivier Crête 2015-04-01 22:10:11 -04:00
parent b9a422d8d0
commit 43d4d3c5ca

View file

@ -177,7 +177,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
struct _GstAggregatorPadPrivate
{
/* Following fields are protected by the PAD_LOCK */
gboolean flushing;
GstFlowReturn flow_return;
gboolean pending_flush_start;
gboolean pending_flush_stop;
gboolean pending_eos;
@ -201,7 +201,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
PAD_LOCK (aggpad);
aggpad->priv->pending_eos = FALSE;
aggpad->priv->eos = FALSE;
aggpad->priv->flushing = FALSE;
aggpad->priv->flow_return = GST_FLOW_OK;
PAD_UNLOCK (aggpad);
if (klass->flush)
@ -230,7 +230,6 @@ struct _GstAggregatorPrivate
gboolean flush_seeking;
gboolean pending_flush_start;
gboolean send_eos; /* protected by srcpad stream lock */
GstFlowReturn flow_return;
GstCaps *srccaps; /* protected by the srcpad stream lock */
@ -399,7 +398,6 @@ static void
gst_aggregator_reset_flow_values (GstAggregator * self)
{
GST_OBJECT_LOCK (self);
self->priv->flow_return = GST_FLOW_FLUSHING;
self->priv->send_stream_start = TRUE;
self->priv->send_segment = TRUE;
gst_segment_init (&self->segment, GST_FORMAT_TIME);
@ -631,6 +629,20 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
return res;
}
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return)
{
PAD_LOCK (aggpad);
if (flow_return == GST_FLOW_NOT_LINKED)
aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
else
aggpad->priv->flow_return = flow_return;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
static void
gst_aggregator_aggregate_func (GstAggregator * self)
{
@ -655,10 +667,12 @@ gst_aggregator_aggregate_func (GstAggregator * self)
flow_return = klass->aggregate (self, timeout);
GST_OBJECT_LOCK (self);
if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
priv->flow_return = GST_FLOW_OK;
else
priv->flow_return = flow_return;
if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
/* We don't want to set the pads to flushing, but we want to
* stop the thread, so just break here */
GST_OBJECT_UNLOCK (self);
break;
}
GST_OBJECT_UNLOCK (self);
if (flow_return == GST_FLOW_EOS) {
@ -667,8 +681,18 @@ gst_aggregator_aggregate_func (GstAggregator * self)
GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
if (flow_return != GST_FLOW_OK)
if (flow_return != GST_FLOW_OK) {
GList *item;
GST_OBJECT_LOCK (self);
for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
gst_aggregator_pad_set_flushing (aggpad, flow_return);
}
GST_OBJECT_UNLOCK (self);
break;
}
}
/* Pause the task here, the only ways to get here are:
@ -692,7 +716,6 @@ gst_aggregator_start (GstAggregator * self)
self->priv->send_segment = TRUE;
self->priv->send_eos = TRUE;
self->priv->srccaps = NULL;
self->priv->flow_return = GST_FLOW_OK;
klass = GST_AGGREGATOR_GET_CLASS (self);
@ -789,17 +812,6 @@ gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
return TRUE;
}
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad)
{
PAD_LOCK (aggpad);
aggpad->priv->flushing = TRUE;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
static void
gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GstEvent * event)
@ -807,7 +819,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
gst_aggregator_pad_set_flushing (aggpad);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad);
@ -828,7 +840,6 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
gst_aggregator_stop_srcpad_task (self, event);
priv->flow_return = GST_FLOW_OK;
GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
GST_PAD_STREAM_LOCK (self->srcpad);
@ -1065,7 +1076,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
GST_INFO_OBJECT (pad, "Removing pad");
SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
gst_element_remove_pad (element, pad);
SRC_BROADCAST (self);
@ -1790,7 +1801,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
{
GstBuffer *actual_buf = buffer;
GstAggregator *self = GST_AGGREGATOR (object);
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
GstFlowReturn flow_return;
@ -1800,13 +1810,18 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing;
if (aggpad->priv->pending_eos == TRUE)
goto eos;
while (aggpad->priv->buffer && !aggpad->priv->flushing)
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flushing)
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
@ -1820,6 +1835,9 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (aggpad->priv->buffer)
gst_buffer_unref (aggpad->priv->buffer);
aggpad->priv->buffer = actual_buf;
flow_return = aggpad->priv->flow_return;
PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad);
@ -1828,10 +1846,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
GST_DEBUG_OBJECT (aggpad, "Done chaining");
GST_OBJECT_LOCK (self);
flow_return = priv->flow_return;
GST_OBJECT_UNLOCK (self);
return flow_return;
flushing:
@ -1839,9 +1853,10 @@ flushing:
PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "We are flushing");
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
gst_flow_get_name (flow_return));
return GST_FLOW_FLUSHING;
return flow_return;
eos:
PAD_UNLOCK (aggpad);
@ -1863,10 +1878,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
if (GST_QUERY_IS_SERIALIZED (query)) {
PAD_LOCK (aggpad);
while (aggpad->priv->buffer && !aggpad->priv->flushing)
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flushing)
if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing;
PAD_UNLOCK (aggpad);
@ -1876,8 +1891,9 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GST_AGGREGATOR_PAD (pad), query);
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
return FALSE;
}
@ -1893,10 +1909,10 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
PAD_LOCK (aggpad);
while (aggpad->priv->buffer && !aggpad->priv->flushing)
while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flushing
if (aggpad->priv->flow_return != GST_FLOW_OK
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
goto flushing;
@ -1907,8 +1923,9 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
GST_AGGREGATOR_PAD (pad), event);
flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (pad, event);
gst_event_unref (event);
@ -1922,10 +1939,10 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (active == FALSE) {
gst_aggregator_pad_set_flushing (aggpad);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
} else {
PAD_LOCK (aggpad);
aggpad->priv->flushing = FALSE;
aggpad->priv->flow_return = GST_FLOW_OK;
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}