aggregator: refactor flushing logic

Instead of tracking "pending_flush_*" on the pads and the
aggregator, we now simply track the last seqnum for flush start
and flush stop events on the pads, and use it to determine whether
we should enter or exit our flushing state.

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/977
This commit is contained in:
Mathieu Duponchelle 2019-05-22 21:37:43 +02:00
parent 7cc933dec7
commit 25383eaa5c
2 changed files with 132 additions and 117 deletions

View file

@ -234,8 +234,9 @@ struct _GstAggregatorPadPrivate
{ {
/* Following fields are protected by the PAD_LOCK */ /* Following fields are protected by the PAD_LOCK */
GstFlowReturn flow_return; GstFlowReturn flow_return;
gboolean pending_flush_start;
gboolean pending_flush_stop; guint32 last_flush_start_seqnum;
guint32 last_flush_stop_seqnum;
gboolean first_buffer; gboolean first_buffer;
@ -315,13 +316,14 @@ struct _GstAggregatorPrivate
/* Our state is >= PAUSED */ /* Our state is >= PAUSED */
gboolean running; /* protected by src_lock */ gboolean running; /* protected by src_lock */
/* seqnum from seek or segment, /* seqnum from last seek or common seqnum to flush start events received
* to be applied to synthetic segment/eos events */ * on all pads, for flushing without a seek */
gint seqnum; guint32 next_seqnum;
/* seqnum to apply to synthetic segment/eos events */
guint32 seqnum;
gboolean send_stream_start; /* protected by srcpad stream lock */ gboolean send_stream_start; /* protected by srcpad stream lock */
gboolean send_segment; gboolean send_segment;
gboolean flush_seeking; gboolean flushing;
gboolean pending_flush_start;
gboolean send_eos; /* protected by srcpad stream lock */ gboolean send_eos; /* protected by srcpad stream lock */
GstCaps *srccaps; /* protected by the srcpad stream lock */ GstCaps *srccaps; /* protected by the srcpad stream lock */
@ -513,7 +515,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
} }
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (self->priv->send_segment && !self->priv->flush_seeking) { if (self->priv->send_segment && !self->priv->flushing) {
segment = segment =
gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment); gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
@ -528,7 +530,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment); GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
} }
if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) { if (priv->tags && priv->tags_changed && !self->priv->flushing) {
tags = gst_event_new_tag (gst_tag_list_ref (priv->tags)); tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
priv->tags_changed = FALSE; priv->tags_changed = FALSE;
} }
@ -563,13 +565,13 @@ gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
gst_aggregator_push_mandatory_events (self); gst_aggregator_push_mandatory_events (self);
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) { if (!self->priv->flushing && gst_pad_is_active (self->srcpad)) {
GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer); GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
return gst_pad_push (self->srcpad, buffer); return gst_pad_push (self->srcpad, buffer);
} else { } else {
GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)", GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
self->priv->flush_seeking, gst_pad_is_active (self->srcpad)); self->priv->flushing, gst_pad_is_active (self->srcpad));
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
return GST_FLOW_OK; return GST_FLOW_OK;
@ -1156,7 +1158,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
continue; continue;
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { if (flow_return == GST_FLOW_FLUSHING && priv->flushing) {
/* We don't want to set the pads to flushing, but we want to /* We don't want to set the pads to flushing, but we want to
* stop the thread, so just break here */ * stop the thread, so just break here */
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
@ -1218,18 +1220,6 @@ gst_aggregator_start (GstAggregator * self)
return result; return result;
} }
static gboolean
_check_pending_flush_stop (GstAggregatorPad * pad)
{
gboolean res;
PAD_LOCK (pad);
res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
PAD_UNLOCK (pad);
return res;
}
static gboolean static gboolean
gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
{ {
@ -1272,7 +1262,7 @@ gst_aggregator_flush (GstAggregator * self)
GST_DEBUG_OBJECT (self, "Flushing everything"); GST_DEBUG_OBJECT (self, "Flushing everything");
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
priv->send_segment = TRUE; priv->send_segment = TRUE;
priv->flush_seeking = FALSE; priv->flushing = FALSE;
priv->tags_changed = FALSE; priv->tags_changed = FALSE;
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
if (klass->flush) if (klass->flush)
@ -1285,7 +1275,7 @@ gst_aggregator_flush (GstAggregator * self)
/* Called with GstAggregator's object lock held */ /* Called with GstAggregator's object lock held */
static gboolean static gboolean
gst_aggregator_all_flush_stop_received_locked (GstAggregator * self) gst_aggregator_all_flush_stop_received (GstAggregator * self, guint32 seqnum)
{ {
GList *tmp; GList *tmp;
GstAggregatorPad *tmppad; GstAggregatorPad *tmppad;
@ -1293,9 +1283,25 @@ gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) { for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
tmppad = (GstAggregatorPad *) tmp->data; tmppad = (GstAggregatorPad *) tmp->data;
if (_check_pending_flush_stop (tmppad) == FALSE) { if (tmppad->priv->last_flush_stop_seqnum != seqnum)
GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i", return FALSE;
tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop); }
return TRUE;
}
/* Called with GstAggregator's object lock held */
static gboolean
gst_aggregator_all_flush_start_received (GstAggregator * self, guint32 seqnum)
{
GList *tmp;
GstAggregatorPad *tmppad;
for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
tmppad = (GstAggregatorPad *) tmp->data;
if (tmppad->priv->last_flush_start_seqnum != seqnum) {
return FALSE; return FALSE;
} }
} }
@ -1309,41 +1315,36 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
{ {
GstAggregatorPrivate *priv = self->priv; GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv;
guint32 seqnum = gst_event_get_seqnum (event);
gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
PAD_FLUSH_LOCK (aggpad); PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
if (padpriv->pending_flush_start) { padpriv->last_flush_start_seqnum = seqnum;
GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
padpriv->pending_flush_start = FALSE;
padpriv->pending_flush_stop = TRUE;
}
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (priv->flush_seeking) {
/* If flush_seeking we forward the first FLUSH_START */ if (!priv->flushing && gst_aggregator_all_flush_start_received (self, seqnum)) {
if (priv->pending_flush_start) { /* Make sure we don't forward more than one FLUSH_START */
priv->pending_flush_start = FALSE; priv->flushing = TRUE;
priv->next_seqnum = seqnum;
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
gst_aggregator_stop_srcpad_task (self, event); gst_aggregator_stop_srcpad_task (self, event);
GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); GST_INFO_OBJECT (self, "Getting STREAM_LOCK while flushing");
GST_PAD_STREAM_LOCK (self->srcpad); GST_PAD_STREAM_LOCK (self->srcpad);
GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
event = NULL; event = NULL;
} else { } else {
GST_OBJECT_UNLOCK (self);
gst_event_unref (event); gst_event_unref (event);
}
} else {
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
gst_event_unref (event);
} }
PAD_FLUSH_UNLOCK (aggpad); PAD_FLUSH_UNLOCK (aggpad);
} }
@ -1400,17 +1401,30 @@ gst_aggregator_default_sink_event (GstAggregator * self,
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
{ {
gst_aggregator_flush_start (self, aggpad, event); gst_aggregator_flush_start (self, aggpad, event);
/* We forward only in one case: right after flush_seeking */ /* We forward only in one case: right after flushing */
event = NULL; event = NULL;
goto eat; goto eat;
} }
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
{ {
guint32 seqnum = gst_event_get_seqnum (event);
PAD_FLUSH_LOCK (aggpad);
PAD_LOCK (aggpad);
aggpad->priv->last_flush_stop_seqnum = seqnum;
PAD_UNLOCK (aggpad);
/* aggregate might be running if this FLUSH_STOP was not
* sent following a flushing seek, let's make sure we don't
* flush the pad's current buffer before aggregate has returned
*/
GST_PAD_STREAM_LOCK (self->srcpad);
gst_aggregator_pad_flush (aggpad, self); gst_aggregator_pad_flush (aggpad, self);
GST_PAD_STREAM_UNLOCK (self->srcpad);
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (priv->flush_seeking) { if (priv->flushing
g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE); && gst_aggregator_all_flush_stop_received (self, seqnum)) {
if (gst_aggregator_all_flush_stop_received_locked (self)) {
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
/* That means we received FLUSH_STOP/FLUSH_STOP on /* That means we received FLUSH_STOP/FLUSH_STOP on
* all sinkpads -- Seeking is Done... sending FLUSH_STOP */ * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
@ -1422,15 +1436,16 @@ gst_aggregator_default_sink_event (GstAggregator * self,
SRC_BROADCAST (self); SRC_BROADCAST (self);
SRC_UNLOCK (self); SRC_UNLOCK (self);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_INFO_OBJECT (self,
"Flush stopped, releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad); GST_PAD_STREAM_UNLOCK (self->srcpad);
gst_aggregator_start_srcpad_task (self); gst_aggregator_start_srcpad_task (self);
} else { } else {
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
} }
} else {
GST_OBJECT_UNLOCK (self); PAD_FLUSH_UNLOCK (aggpad);
}
/* We never forward the event */ /* We never forward the event */
goto eat; goto eat;
@ -1581,6 +1596,14 @@ gst_aggregator_stop (GstAggregator * agg)
gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL); gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
if (agg->priv->running) {
/* As sinkpads get deactivated after the src pad, we
* may have restarted the source pad task after receiving
* flush events on one of our sinkpads. Stop our src pad
* task again if that is the case */
gst_aggregator_stop_srcpad_task (agg, NULL);
}
return result; return result;
} }
@ -1875,7 +1898,7 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt, gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
flags, start_type, start, stop_type, stop, NULL); flags, start_type, start, stop_type, stop, NULL);
self->priv->seqnum = gst_event_get_seqnum (event); self->priv->next_seqnum = gst_event_get_seqnum (event);
self->priv->first_buffer = FALSE; self->priv->first_buffer = FALSE;
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
@ -1883,7 +1906,6 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
} }
GST_STATE_UNLOCK (element); GST_STATE_UNLOCK (element);
return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element, return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
event); event);
} }
@ -1959,13 +1981,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
gst_query_unref (seeking); gst_query_unref (seeking);
} }
if (evdata->flush) {
PAD_LOCK (aggpad);
aggpad->priv->pending_flush_start = FALSE;
aggpad->priv->pending_flush_stop = FALSE;
PAD_UNLOCK (aggpad);
}
} else { } else {
evdata->one_actually_seeked = TRUE; evdata->one_actually_seeked = TRUE;
} }
@ -1986,24 +2001,6 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
evdata->result = TRUE; evdata->result = TRUE;
evdata->one_actually_seeked = FALSE; evdata->one_actually_seeked = FALSE;
/* We first need to set all pads as flushing in a first pass
* as flush_start flush_stop is sometimes sent synchronously
* while we send the seek event */
if (evdata->flush) {
GList *l;
GST_OBJECT_LOCK (self);
for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
GstAggregatorPad *pad = l->data;
PAD_LOCK (pad);
pad->priv->pending_flush_start = TRUE;
pad->priv->pending_flush_stop = FALSE;
PAD_UNLOCK (pad);
}
GST_OBJECT_UNLOCK (self);
}
gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata); gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
gst_event_unref (evdata->event); gst_event_unref (evdata->event);
@ -2029,18 +2026,26 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
flush = flags & GST_SEEK_FLAG_FLUSH; flush = flags & GST_SEEK_FLAG_FLUSH;
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (flush) { self->priv->next_seqnum = gst_event_get_seqnum (event);
priv->pending_flush_start = TRUE;
priv->flush_seeking = TRUE;
}
gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt, gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
flags, start_type, start, stop_type, stop, NULL); flags, start_type, start, stop_type, stop, NULL);
/* Seeking sets a position */ /* Seeking sets a position */
self->priv->first_buffer = FALSE; self->priv->first_buffer = FALSE;
if (flush)
priv->flushing = TRUE;
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
if (flush) {
GstEvent *event = gst_event_new_flush_start ();
gst_event_set_seqnum (event, self->priv->next_seqnum);
gst_aggregator_stop_srcpad_task (self, event);
}
/* forward the seek upstream */ /* forward the seek upstream */
evdata.event = event; evdata.event = event;
evdata.flush = flush; evdata.flush = flush;
@ -2050,8 +2055,7 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
if (!evdata.result || !evdata.one_actually_seeked) { if (!evdata.result || !evdata.one_actually_seeked) {
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
priv->flush_seeking = FALSE; priv->flushing = FALSE;
priv->pending_flush_start = FALSE;
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
} }
@ -2136,6 +2140,7 @@ gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
/* deactivating */ /* deactivating */
GST_INFO_OBJECT (self, "Deactivating srcpad"); GST_INFO_OBJECT (self, "Deactivating srcpad");
gst_aggregator_stop_srcpad_task (self, FALSE); gst_aggregator_stop_srcpad_task (self, FALSE);
return TRUE; return TRUE;

View file

@ -797,6 +797,7 @@ GST_START_TEST (test_flushing_seek)
ChainData data2 = { 0, }; ChainData data2 = { 0, };
TestData test = { 0, }; TestData test = { 0, };
GstBuffer *buf; GstBuffer *buf;
guint32 seqnum;
_test_data_init (&test, TRUE); _test_data_init (&test, TRUE);
@ -817,16 +818,20 @@ GST_START_TEST (test_flushing_seek)
/* now do a successful flushing seek */ /* now do a successful flushing seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND); GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
seqnum = gst_event_get_seqnum (event);
fail_unless (gst_pad_send_event (test.srcpad, event)); fail_unless (gst_pad_send_event (test.srcpad, event));
/* flushing starts once one of the upstream elements sends the first /* flushing starts when a flushing seek is received, and stops
* FLUSH_START */ * when all sink pads have received FLUSH_STOP */
fail_unless_equals_int (test.flush_start_events, 0); fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0); fail_unless_equals_int (test.flush_stop_events, 0);
/* send a first FLUSH_START on agg:sink_0, will be sent downstream */ /* send a first FLUSH_START on agg:sink_0, nothing will be sent
* downstream */
GST_DEBUG_OBJECT (data2.sinkpad, "send flush_start"); GST_DEBUG_OBJECT (data2.sinkpad, "send flush_start");
fail_unless (gst_pad_push_event (data2.srcpad, gst_event_new_flush_start ())); event = gst_event_new_flush_start ();
gst_event_set_seqnum (event, seqnum);
fail_unless (gst_pad_push_event (data2.srcpad, event));
fail_unless_equals_int (test.flush_start_events, 1); fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0); fail_unless_equals_int (test.flush_stop_events, 0);
@ -834,16 +839,19 @@ GST_START_TEST (test_flushing_seek)
data2.expected_result = GST_FLOW_FLUSHING; data2.expected_result = GST_FLOW_FLUSHING;
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
/* this should send not additional flush_start */ /* this should send no additional flush_start */
GST_DEBUG_OBJECT (data1.sinkpad, "send flush_start"); GST_DEBUG_OBJECT (data1.sinkpad, "send flush_start");
fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ())); event = gst_event_new_flush_start ();
gst_event_set_seqnum (event, seqnum);
fail_unless (gst_pad_push_event (data1.srcpad, event));
fail_unless_equals_int (test.flush_start_events, 1); fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0); fail_unless_equals_int (test.flush_stop_events, 0);
/* the first FLUSH_STOP is not forwarded downstream */ /* the first FLUSH_STOP is not forwarded downstream */
GST_DEBUG_OBJECT (data1.srcpad, "send flush_stop"); GST_DEBUG_OBJECT (data1.srcpad, "send flush_stop");
fail_unless (gst_pad_push_event (data1.srcpad, event = gst_event_new_flush_stop (TRUE);
gst_event_new_flush_stop (TRUE))); gst_event_set_seqnum (event, seqnum);
fail_unless (gst_pad_push_event (data1.srcpad, event));
fail_unless_equals_int (test.flush_start_events, 1); fail_unless_equals_int (test.flush_start_events, 1);
fail_unless_equals_int (test.flush_stop_events, 0); fail_unless_equals_int (test.flush_stop_events, 0);
@ -858,7 +866,9 @@ GST_START_TEST (test_flushing_seek)
/* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
* sent downstream */ * sent downstream */
GST_DEBUG_OBJECT (data2.srcpad, "send flush_stop"); GST_DEBUG_OBJECT (data2.srcpad, "send flush_stop");
gst_pad_push_event (data2.srcpad, gst_event_new_flush_stop (TRUE)); event = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (event, seqnum);
gst_pad_push_event (data2.srcpad, event);
/* and the last FLUSH_STOP is forwarded downstream */ /* and the last FLUSH_STOP is forwarded downstream */
fail_unless_equals_int (test.flush_stop_events, 1); fail_unless_equals_int (test.flush_stop_events, 1);