aggregator: Unify downstream flow return and flushing

Also means that having a non-OK downstream flow return
wakes up the chain functions.

https://bugzilla.gnome.org/show_bug.cgi?id=747220
This commit is contained in:
Olivier Crête 2015-04-01 22:10:11 -04:00 committed by Tim-Philipp Müller
parent 30eb13a16d
commit e73a173224

View file

@ -177,7 +177,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
struct _GstAggregatorPadPrivate struct _GstAggregatorPadPrivate
{ {
/* Following fields are protected by the PAD_LOCK */ /* Following fields are protected by the PAD_LOCK */
gboolean flushing; GstFlowReturn flow_return;
gboolean pending_flush_start; gboolean pending_flush_start;
gboolean pending_flush_stop; gboolean pending_flush_stop;
gboolean pending_eos; gboolean pending_eos;
@ -201,7 +201,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
aggpad->priv->pending_eos = FALSE; aggpad->priv->pending_eos = FALSE;
aggpad->priv->eos = FALSE; aggpad->priv->eos = FALSE;
aggpad->priv->flushing = FALSE; aggpad->priv->flow_return = GST_FLOW_OK;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
if (klass->flush) if (klass->flush)
@ -230,7 +230,6 @@ struct _GstAggregatorPrivate
gboolean flush_seeking; gboolean flush_seeking;
gboolean pending_flush_start; gboolean pending_flush_start;
gboolean send_eos; /* protected by srcpad stream lock */ gboolean send_eos; /* protected by srcpad stream lock */
GstFlowReturn flow_return;
GstCaps *srccaps; /* protected by the srcpad stream lock */ GstCaps *srccaps; /* protected by the srcpad stream lock */
@ -399,7 +398,6 @@ static void
gst_aggregator_reset_flow_values (GstAggregator * self) gst_aggregator_reset_flow_values (GstAggregator * self)
{ {
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
self->priv->flow_return = GST_FLOW_FLUSHING;
self->priv->send_stream_start = TRUE; self->priv->send_stream_start = TRUE;
self->priv->send_segment = TRUE; self->priv->send_segment = TRUE;
gst_segment_init (&self->segment, GST_FORMAT_TIME); gst_segment_init (&self->segment, GST_FORMAT_TIME);
@ -631,6 +629,20 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
return res; return res;
} }
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return)
{
PAD_LOCK (aggpad);
if (flow_return == GST_FLOW_NOT_LINKED)
aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
else
aggpad->priv->flow_return = flow_return;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
static void static void
gst_aggregator_aggregate_func (GstAggregator * self) gst_aggregator_aggregate_func (GstAggregator * self)
{ {
@ -655,10 +667,12 @@ gst_aggregator_aggregate_func (GstAggregator * self)
flow_return = klass->aggregate (self, timeout); flow_return = klass->aggregate (self, timeout);
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
priv->flow_return = GST_FLOW_OK; /* We don't want to set the pads to flushing, but we want to
else * stop the thread, so just break here */
priv->flow_return = flow_return; GST_OBJECT_UNLOCK (self);
break;
}
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
if (flow_return == GST_FLOW_EOS) { if (flow_return == GST_FLOW_EOS) {
@ -667,8 +681,18 @@ gst_aggregator_aggregate_func (GstAggregator * self)
GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
if (flow_return != GST_FLOW_OK) if (flow_return != GST_FLOW_OK) {
GList *item;
GST_OBJECT_LOCK (self);
for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
gst_aggregator_pad_set_flushing (aggpad, flow_return);
}
GST_OBJECT_UNLOCK (self);
break; break;
}
} }
/* Pause the task here, the only ways to get here are: /* Pause the task here, the only ways to get here are:
@ -692,7 +716,6 @@ gst_aggregator_start (GstAggregator * self)
self->priv->send_segment = TRUE; self->priv->send_segment = TRUE;
self->priv->send_eos = TRUE; self->priv->send_eos = TRUE;
self->priv->srccaps = NULL; self->priv->srccaps = NULL;
self->priv->flow_return = GST_FLOW_OK;
klass = GST_AGGREGATOR_GET_CLASS (self); klass = GST_AGGREGATOR_GET_CLASS (self);
@ -789,17 +812,6 @@ gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
return TRUE; return TRUE;
} }
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad)
{
PAD_LOCK (aggpad);
aggpad->priv->flushing = TRUE;
gst_buffer_replace (&aggpad->priv->buffer, NULL);
PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad);
}
static void static void
gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GstEvent * event) GstEvent * event)
@ -807,7 +819,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GstAggregatorPrivate *priv = self->priv; GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv;
gst_aggregator_pad_set_flushing (aggpad); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
PAD_FLUSH_LOCK (aggpad); PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
@ -828,7 +840,6 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
gst_aggregator_stop_srcpad_task (self, event); gst_aggregator_stop_srcpad_task (self, event);
priv->flow_return = GST_FLOW_OK;
GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
GST_PAD_STREAM_LOCK (self->srcpad); GST_PAD_STREAM_LOCK (self->srcpad);
@ -1065,7 +1076,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
GST_INFO_OBJECT (pad, "Removing pad"); GST_INFO_OBJECT (pad, "Removing pad");
SRC_LOCK (self); SRC_LOCK (self);
gst_aggregator_pad_set_flushing (aggpad); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
gst_element_remove_pad (element, pad); gst_element_remove_pad (element, pad);
SRC_BROADCAST (self); SRC_BROADCAST (self);
@ -1790,7 +1801,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
{ {
GstBuffer *actual_buf = buffer; GstBuffer *actual_buf = buffer;
GstAggregator *self = GST_AGGREGATOR (object); GstAggregator *self = GST_AGGREGATOR (object);
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
GstFlowReturn flow_return; GstFlowReturn flow_return;
@ -1800,13 +1810,18 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
PAD_FLUSH_LOCK (aggpad); PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing;
if (aggpad->priv->pending_eos == TRUE) if (aggpad->priv->pending_eos == TRUE)
goto eos; goto eos;
while (aggpad->priv->buffer && !aggpad->priv->flushing) while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad); PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flushing) flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
goto flushing; goto flushing;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
@ -1820,6 +1835,9 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (aggpad->priv->buffer) if (aggpad->priv->buffer)
gst_buffer_unref (aggpad->priv->buffer); gst_buffer_unref (aggpad->priv->buffer);
aggpad->priv->buffer = actual_buf; aggpad->priv->buffer = actual_buf;
flow_return = aggpad->priv->flow_return;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
@ -1828,10 +1846,6 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
GST_DEBUG_OBJECT (aggpad, "Done chaining"); GST_DEBUG_OBJECT (aggpad, "Done chaining");
GST_OBJECT_LOCK (self);
flow_return = priv->flow_return;
GST_OBJECT_UNLOCK (self);
return flow_return; return flow_return;
flushing: flushing:
@ -1839,9 +1853,10 @@ flushing:
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "We are flushing"); GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
gst_flow_get_name (flow_return));
return GST_FLOW_FLUSHING; return flow_return;
eos: eos:
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
@ -1863,10 +1878,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
if (GST_QUERY_IS_SERIALIZED (query)) { if (GST_QUERY_IS_SERIALIZED (query)) {
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
while (aggpad->priv->buffer && !aggpad->priv->flushing) while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad); PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flushing) if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing; goto flushing;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
@ -1876,8 +1891,9 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
GST_AGGREGATOR_PAD (pad), query); GST_AGGREGATOR_PAD (pad), query);
flushing: flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
return FALSE; return FALSE;
} }
@ -1893,10 +1909,10 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
while (aggpad->priv->buffer && !aggpad->priv->flushing) while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
PAD_WAIT_EVENT (aggpad); PAD_WAIT_EVENT (aggpad);
if (aggpad->priv->flushing if (aggpad->priv->flow_return != GST_FLOW_OK
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
goto flushing; goto flushing;
@ -1907,8 +1923,9 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
GST_AGGREGATOR_PAD (pad), event); GST_AGGREGATOR_PAD (pad), event);
flushing: flushing:
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
gst_flow_get_name (aggpad->priv->flow_return));
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
if (GST_EVENT_IS_STICKY (event)) if (GST_EVENT_IS_STICKY (event))
gst_pad_store_sticky_event (pad, event); gst_pad_store_sticky_event (pad, event);
gst_event_unref (event); gst_event_unref (event);
@ -1922,10 +1939,10 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (active == FALSE) { if (active == FALSE) {
gst_aggregator_pad_set_flushing (aggpad); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
} else { } else {
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
aggpad->priv->flushing = FALSE; aggpad->priv->flow_return = GST_FLOW_OK;
PAD_BROADCAST_EVENT (aggpad); PAD_BROADCAST_EVENT (aggpad);
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
} }