aggregator: Restart srcpad task on stream-start

Re-start srcpad task on stream-start in addition to flush event
so that subclass can process data when new pad is added
after EOS or an input stream is started again with stream-start event

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4783>
This commit is contained in:
Seungha Yang 2023-06-06 03:03:07 +09:00
parent 1aa9e74aaf
commit f78e7b1bef

View file

@ -277,6 +277,9 @@ struct _GstAggregatorPadPrivate
gboolean eos; gboolean eos;
/* number of queued stream-start */
gboolean stream_start_pending;
GMutex lock; GMutex lock;
GCond event_cond; GCond event_cond;
/* This lock prevents a flush start processing happening while /* This lock prevents a flush start processing happening while
@ -305,6 +308,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
aggpad->priv->time_level = 0; aggpad->priv->time_level = 0;
aggpad->priv->first_buffer = TRUE; aggpad->priv->first_buffer = TRUE;
aggpad->priv->waited_once = FALSE; aggpad->priv->waited_once = FALSE;
aggpad->priv->stream_start_pending = FALSE;
} }
static gboolean static gboolean
@ -1100,6 +1104,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
item = next; item = next;
} }
aggpad->priv->num_buffers = 0; aggpad->priv->num_buffers = 0;
aggpad->priv->stream_start_pending = FALSE;
gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
PAD_BROADCAST_EVENT (aggpad); PAD_BROADCAST_EVENT (aggpad);
@ -1375,21 +1380,42 @@ gst_aggregator_negotiate (GstAggregator * self)
return ret; return ret;
} }
static void static gboolean
gst_aggregator_aggregate_func (GstAggregator * self) gst_aggregator_check_pending_new_stream (GstElement * self, GstPad * pad,
gboolean * have_new_stream)
{
GstAggregatorPad *aggpad = (GstAggregatorPad *) pad;
gboolean new_stream = FALSE;
PAD_LOCK (aggpad);
if (aggpad->priv->stream_start_pending)
new_stream = TRUE;
PAD_UNLOCK (aggpad);
if (new_stream) {
*have_new_stream = TRUE;
return FALSE;
}
return TRUE;
}
static GstFlowReturn
gst_aggregator_loop (GstAggregator * self)
{ {
GstAggregatorPrivate *priv = self->priv; GstAggregatorPrivate *priv = self->priv;
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
gboolean timeout = FALSE; gboolean timeout = FALSE;
GstFlowReturn flow_return = GST_FLOW_OK;
if (self->priv->running == FALSE) { if (self->priv->running == FALSE) {
GST_DEBUG_OBJECT (self, "Not running anymore"); GST_DEBUG_OBJECT (self, "Not running anymore");
return; return GST_FLOW_OK;
} }
GST_LOG_OBJECT (self, "Checking aggregate"); GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && priv->running) { while (priv->send_eos && priv->running) {
GstFlowReturn flow_return = GST_FLOW_OK; flow_return = GST_FLOW_OK;
DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK }; DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
@ -1402,10 +1428,14 @@ gst_aggregator_aggregate_func (GstAggregator * self)
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_pad_skip_buffers, NULL); gst_aggregator_pad_skip_buffers, NULL);
SRC_LOCK (self);
if (self->priv->got_eos_event) { if (self->priv->got_eos_event) {
self->priv->got_eos_event = FALSE;
SRC_UNLOCK (self);
gst_aggregator_push_eos (self); gst_aggregator_push_eos (self);
continue; continue;
} }
SRC_UNLOCK (self);
/* Ensure we have buffers ready (either in clipped_buffer or at the head of /* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */ * the queue */
@ -1449,7 +1479,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
/* We don't want to set the pads to flushing, but we want to /* We don't want to set the pads to flushing, but we want to
* stop the thread, so just break here */ * stop the thread, so just break here */
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
break; return GST_FLOW_FLUSHING;
} }
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
@ -1460,6 +1490,11 @@ gst_aggregator_aggregate_func (GstAggregator * self)
handle_error: handle_error:
GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
/* Don't flush buffer/event/queries on EOS. We may do restart pad task
* on new stream-start */
if (flow_return == GST_FLOW_EOS)
return GST_FLOW_EOS;
if (flow_return != GST_FLOW_OK) { if (flow_return != GST_FLOW_OK) {
GList *item; GList *item;
@ -1474,6 +1509,37 @@ gst_aggregator_aggregate_func (GstAggregator * self)
} }
} }
return flow_return;
}
static void
gst_aggregator_aggregate_func (GstAggregator * self)
{
GstAggregatorPrivate *priv = self->priv;
while (1) {
GstFlowReturn ret;
gboolean pending_new_stream = FALSE;
ret = gst_aggregator_loop (self);
SRC_LOCK (self);
if (ret != GST_FLOW_EOS)
break;
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
(GstElementForeachPadFunc) gst_aggregator_check_pending_new_stream,
&pending_new_stream);
if (!pending_new_stream)
break;
GST_INFO_OBJECT (self, "Have pending new stream, keep aggregate");
gst_aggregator_reset_flow_values (self);
priv->send_eos = TRUE;
SRC_UNLOCK (self);
};
/* Pause the task here, the only ways to get here are: /* Pause the task here, the only ways to get here are:
* 1) We're stopping, in which case the task is stopped anyway * 1) We're stopping, in which case the task is stopped anyway
* 2) We got a flow error above, in which case it might take * 2) We got a flow error above, in which case it might take
@ -1482,6 +1548,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
* again without doing anything * again without doing anything
*/ */
gst_pad_pause_task (self->srcpad); gst_pad_pause_task (self->srcpad);
SRC_UNLOCK (self);
} }
static gboolean static gboolean
@ -1761,9 +1828,14 @@ gst_aggregator_default_sink_event (GstAggregator * self,
} }
case GST_EVENT_STREAM_START: case GST_EVENT_STREAM_START:
{ {
SRC_LOCK (self);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Clear EOS on STREAM-START");
aggpad->priv->eos = FALSE; aggpad->priv->eos = FALSE;
aggpad->priv->stream_start_pending = FALSE;
PAD_UNLOCK (aggpad); PAD_UNLOCK (aggpad);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
goto eat; goto eat;
} }
case GST_EVENT_GAP: case GST_EVENT_GAP:
@ -1847,13 +1919,31 @@ static GstFlowReturn
gst_aggregator_default_sink_event_pre_queue (GstAggregator * self, gst_aggregator_default_sink_event_pre_queue (GstAggregator * self,
GstAggregatorPad * aggpad, GstEvent * event) GstAggregatorPad * aggpad, GstEvent * event)
{ {
GstAggregatorPrivate *priv = self->priv;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
GstEventType event_type = GST_EVENT_TYPE (event);
if (GST_EVENT_IS_SERIALIZED (event) if (GST_EVENT_IS_SERIALIZED (event) && event_type != GST_EVENT_FLUSH_STOP) {
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
SRC_LOCK (self); SRC_LOCK (self);
PAD_LOCK (aggpad); PAD_LOCK (aggpad);
if (event_type == GST_EVENT_STREAM_START) {
GstTaskState task_state;
aggpad->priv->flow_return = GST_FLOW_OK;
aggpad->priv->stream_start_pending = TRUE;
task_state = gst_pad_get_task_state (self->srcpad);
if (task_state == GST_TASK_PAUSED) {
GST_DEBUG_OBJECT (aggpad, "Resuming pad task");
priv->send_eos = TRUE;
gst_aggregator_reset_flow_values (self);
gst_aggregator_start_srcpad_task (self);
} else {
GST_DEBUG_OBJECT (aggpad, "Pad task function is running already");
}
}
if (aggpad->priv->flow_return != GST_FLOW_OK) if (aggpad->priv->flow_return != GST_FLOW_OK)
goto flushing; goto flushing;