diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.c b/subprojects/gstreamer/libs/gst/base/gstaggregator.c index fc5c336224..c0b71139d6 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.c +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.c @@ -277,6 +277,9 @@ struct _GstAggregatorPadPrivate gboolean eos; + /* number of queued stream-start */ + gboolean stream_start_pending; + GMutex lock; GCond event_cond; /* This lock prevents a flush start processing happening while @@ -305,6 +308,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) aggpad->priv->time_level = 0; aggpad->priv->first_buffer = TRUE; aggpad->priv->waited_once = FALSE; + aggpad->priv->stream_start_pending = FALSE; } static gboolean @@ -1100,6 +1104,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, item = next; } aggpad->priv->num_buffers = 0; + aggpad->priv->stream_start_pending = FALSE; gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); PAD_BROADCAST_EVENT (aggpad); @@ -1375,21 +1380,42 @@ gst_aggregator_negotiate (GstAggregator * self) return ret; } -static void -gst_aggregator_aggregate_func (GstAggregator * self) +static gboolean +gst_aggregator_check_pending_new_stream (GstElement * self, GstPad * pad, + gboolean * have_new_stream) +{ + GstAggregatorPad *aggpad = (GstAggregatorPad *) pad; + gboolean new_stream = FALSE; + + PAD_LOCK (aggpad); + if (aggpad->priv->stream_start_pending) + new_stream = TRUE; + PAD_UNLOCK (aggpad); + + if (new_stream) { + *have_new_stream = TRUE; + return FALSE; + } + + return TRUE; +} + +static GstFlowReturn +gst_aggregator_loop (GstAggregator * self) { GstAggregatorPrivate *priv = self->priv; GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); gboolean timeout = FALSE; + GstFlowReturn flow_return = GST_FLOW_OK; if (self->priv->running == FALSE) { GST_DEBUG_OBJECT (self, "Not running anymore"); - return; + return GST_FLOW_OK; } GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { - GstFlowReturn flow_return = GST_FLOW_OK; + flow_return = GST_FLOW_OK; DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK }; gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), @@ -1402,10 +1428,14 @@ gst_aggregator_aggregate_func (GstAggregator * self) gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), gst_aggregator_pad_skip_buffers, NULL); + SRC_LOCK (self); if (self->priv->got_eos_event) { + self->priv->got_eos_event = FALSE; + SRC_UNLOCK (self); gst_aggregator_push_eos (self); continue; } + SRC_UNLOCK (self); /* Ensure we have buffers ready (either in clipped_buffer or at the head of * the queue */ @@ -1449,7 +1479,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) /* We don't want to set the pads to flushing, but we want to * stop the thread, so just break here */ GST_OBJECT_UNLOCK (self); - break; + return GST_FLOW_FLUSHING; } GST_OBJECT_UNLOCK (self); @@ -1460,6 +1490,11 @@ gst_aggregator_aggregate_func (GstAggregator * self) handle_error: GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); + /* Don't flush buffer/event/queries on EOS. We may do restart pad task + * on new stream-start */ + if (flow_return == GST_FLOW_EOS) + return GST_FLOW_EOS; + if (flow_return != GST_FLOW_OK) { GList *item; @@ -1474,6 +1509,37 @@ gst_aggregator_aggregate_func (GstAggregator * self) } } + return flow_return; +} + +static void +gst_aggregator_aggregate_func (GstAggregator * self) +{ + GstAggregatorPrivate *priv = self->priv; + + while (1) { + GstFlowReturn ret; + gboolean pending_new_stream = FALSE; + + ret = gst_aggregator_loop (self); + + SRC_LOCK (self); + if (ret != GST_FLOW_EOS) + break; + + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + (GstElementForeachPadFunc) gst_aggregator_check_pending_new_stream, + &pending_new_stream); + + if (!pending_new_stream) + break; + + GST_INFO_OBJECT (self, "Have pending new stream, keep aggregate"); + gst_aggregator_reset_flow_values (self); + priv->send_eos = TRUE; + SRC_UNLOCK (self); + }; + /* Pause the task here, the only ways to get here are: * 1) We're stopping, in which case the task is stopped anyway * 2) We got a flow error above, in which case it might take @@ -1482,6 +1548,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) * again without doing anything */ gst_pad_pause_task (self->srcpad); + SRC_UNLOCK (self); } static gboolean @@ -1761,9 +1828,14 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_STREAM_START: { + SRC_LOCK (self); PAD_LOCK (aggpad); + GST_DEBUG_OBJECT (aggpad, "Clear EOS on STREAM-START"); aggpad->priv->eos = FALSE; + aggpad->priv->stream_start_pending = FALSE; PAD_UNLOCK (aggpad); + SRC_BROADCAST (self); + SRC_UNLOCK (self); goto eat; } case GST_EVENT_GAP: @@ -1847,13 +1919,31 @@ static GstFlowReturn gst_aggregator_default_sink_event_pre_queue (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) { + GstAggregatorPrivate *priv = self->priv; GstFlowReturn ret = GST_FLOW_OK; + GstEventType event_type = GST_EVENT_TYPE (event); - if (GST_EVENT_IS_SERIALIZED (event) - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + if (GST_EVENT_IS_SERIALIZED (event) && event_type != GST_EVENT_FLUSH_STOP) { SRC_LOCK (self); PAD_LOCK (aggpad); + if (event_type == GST_EVENT_STREAM_START) { + GstTaskState task_state; + aggpad->priv->flow_return = GST_FLOW_OK; + + aggpad->priv->stream_start_pending = TRUE; + + task_state = gst_pad_get_task_state (self->srcpad); + if (task_state == GST_TASK_PAUSED) { + GST_DEBUG_OBJECT (aggpad, "Resuming pad task"); + priv->send_eos = TRUE; + gst_aggregator_reset_flow_values (self); + gst_aggregator_start_srcpad_task (self); + } else { + GST_DEBUG_OBJECT (aggpad, "Pad task function is running already"); + } + } + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing;