multiqueue: Handle gapless input

When dealing with gapless input (i.e. streams with changing group-id in
GST_EVENT_STREAM_START), we need to take into account the elapsed
running-time (if applicable) in order to properly calculate levels and output
time. Without doing this all incoming data from future groups would be
considered as being "late" and would be consumed immediately.

This does **NOT** modify the actual segment and buffer times, and is only used
internally.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>
This commit is contained in:
Edward Hervey 2022-07-21 15:26:14 +02:00 committed by GStreamer Marge Bot
parent f785b3d584
commit 4b8f411c5d

View file

@ -147,6 +147,15 @@ struct _GstSingleQueue
/* TRUE if either position needs to be recalculated */
gboolean sink_tainted, src_tainted;
/* stream group id */
guint32 sink_stream_gid;
guint32 src_stream_gid;
/* TRUE if the stream group-id changed. Resetted to FALSE the next time the
* segment is calculated */
gboolean sink_stream_gid_changed;
gboolean src_stream_gid_changed;
/* queue of data */
GstDataQueue *queue;
GstDataQueueSize max_size, extra_size;
@ -1343,6 +1352,9 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
}
sq->sink_tainted = sq->src_tainted = TRUE;
sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID;
sq->sink_stream_gid_changed = FALSE;
sq->src_stream_gid_changed = FALSE;
return result;
}
@ -1358,6 +1370,9 @@ gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
gst_object_unref (srcpad);
}
sq->sink_tainted = sq->src_tainted = TRUE;
sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID;
sq->sink_stream_gid_changed = FALSE;
sq->src_stream_gid_changed = FALSE;
return result;
}
@ -1762,6 +1777,23 @@ static void
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
GstSegment * segment)
{
GstClockTimeDiff ppos = 0;
/* If we switched groups, grab the previous position */
if (segment->rate > 0.0) {
if (segment == &sq->sink_segment && sq->sink_stream_gid_changed) {
ppos =
gst_segment_to_running_time (segment, GST_FORMAT_TIME,
segment->position);
sq->sink_stream_gid_changed = FALSE;
} else if (segment == &sq->src_segment && sq->src_stream_gid_changed) {
ppos =
gst_segment_to_running_time (segment, GST_FORMAT_TIME,
segment->position);
sq->src_stream_gid_changed = FALSE;
}
}
gst_event_copy_segment (event, segment);
/* now configure the values, we use these to track timestamps on the
@ -1776,12 +1808,19 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
}
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (ppos) {
GST_DEBUG_OBJECT (mq, "queue %d, Applying base of %" GST_TIME_FORMAT,
sq->id, GST_TIME_ARGS (ppos));
segment->base = ppos;
}
/* Make sure we have a valid initial segment position (and not garbage
* from upstream) */
if (segment->rate > 0.0)
segment->position = segment->start;
else
segment->position = segment->stop;
if (segment == &sq->sink_segment)
sq->sink_tainted = TRUE;
else {
@ -1982,10 +2021,21 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
*allow_drop = FALSE;
break;
case GST_EVENT_STREAM_START:
{
guint32 group_id;
if (gst_event_parse_group_id (event, &group_id)) {
if (sq->src_stream_gid == GST_GROUP_ID_INVALID) {
sq->src_stream_gid = group_id;
} else if (group_id != sq->src_stream_gid) {
sq->src_stream_gid = group_id;
sq->src_stream_gid_changed = TRUE;
}
}
result = GST_FLOW_OK;
if (G_UNLIKELY (*allow_drop))
*allow_drop = FALSE;
break;
}
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, event, &sq->src_segment);
/* Applying the segment may have made the queue non-full again, unblock it if needed */
@ -2607,6 +2657,15 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (type) {
case GST_EVENT_STREAM_START:
{
guint32 group_id;
if (gst_event_parse_group_id (event, &group_id)) {
if (sq->sink_stream_gid == GST_GROUP_ID_INVALID) {
sq->sink_stream_gid = group_id;
} else if (group_id != sq->sink_stream_gid) {
sq->sink_stream_gid = group_id;
sq->sink_stream_gid_changed = TRUE;
}
}
if (mq->sync_by_running_time) {
GstStreamFlags stream_flags;
gst_event_parse_stream_flags (event, &stream_flags);
@ -3473,6 +3532,10 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
sq->sink_tainted = TRUE;
sq->src_tainted = TRUE;
sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID;
sq->sink_stream_gid_changed = FALSE;
sq->src_stream_gid_changed = FALSE;
name = g_strdup_printf ("sink_%u", sq->id);
templ = gst_static_pad_template_get (&sinktemplate);
sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,