From aa5f6b2376d7736b6b996ad773c8661494dc3714 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Thu, 5 Oct 2023 22:40:43 +0900 Subject: [PATCH] multiqueue: Do not update time level on segment Update time level decision logic to be identical to the queue/queue2 Part-of: --- .../plugins/elements/gstmultiqueue.c | 129 +++++++++--------- 1 file changed, 64 insertions(+), 65 deletions(-) diff --git a/subprojects/gstreamer/plugins/elements/gstmultiqueue.c b/subprojects/gstreamer/plugins/elements/gstmultiqueue.c index 1f759d6aaf..9f204c9efe 100644 --- a/subprojects/gstreamer/plugins/elements/gstmultiqueue.c +++ b/subprojects/gstreamer/plugins/elements/gstmultiqueue.c @@ -141,12 +141,10 @@ struct _GstSingleQueue /* segments */ GstSegment sink_segment; GstSegment src_segment; - gboolean has_src_segment; /* preferred over initializing the src_segment to - * UNDEFINED as this doesn't requires adding ifs - * in every segment usage */ /* position of src/sink */ GstClockTimeDiff sinktime, srctime; + GstClockTimeDiff sink_start_time; /* cached input value, used for interleave */ GstClockTimeDiff cached_sinktime; /* TRUE if either position needs to be recalculated */ @@ -1359,8 +1357,6 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq) gst_object_unref (srcpad); } - sq->sink_tainted = sq->src_tainted = TRUE; - return result; } @@ -1375,7 +1371,6 @@ gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq) result = gst_pad_stop_task (srcpad); gst_object_unref (srcpad); } - sq->sink_tainted = sq->src_tainted = TRUE; return result; } @@ -1405,7 +1400,6 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, GST_MULTI_QUEUE_MUTEX_LOCK (mq); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); - sq->has_src_segment = FALSE; /* All pads start off OK for a smooth kick-off */ sq->srcresult = GST_FLOW_OK; sq->pushed = FALSE; @@ -1416,6 +1410,9 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, sq->nextid = 0; sq->oldid = 0; sq->last_oldid = G_MAXUINT32; + sq->sinktime = GST_CLOCK_STIME_NONE; + sq->srctime = GST_CLOCK_STIME_NONE; + sq->sink_start_time = GST_CLOCK_STIME_NONE; sq->next_time = GST_CLOCK_STIME_NONE; sq->last_time = GST_CLOCK_STIME_NONE; sq->cached_sinktime = GST_CLOCK_STIME_NONE; @@ -1429,6 +1426,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, mq->high_time = GST_CLOCK_STIME_NONE; sq->flushing = FALSE; + + sq->sink_tainted = sq->src_tainted = FALSE; GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } } @@ -1698,7 +1697,7 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq) static void update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) { - GstClockTimeDiff sink_time, src_time; + GstClockTimeDiff sink_time, src_time, sink_start_time; if (sq->sink_tainted) { sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment, @@ -1716,56 +1715,48 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) * we set the last_time */ sq->last_time = sink_time; } - if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) { + + sq->sink_tainted = FALSE; + if (sink_time != GST_CLOCK_STIME_NONE) { /* if we have a time, we become untainted and use the time */ - sq->sink_tainted = FALSE; if (mq->use_interleave) { sq->cached_sinktime = sink_time; calculate_interleave (mq, sq); } } - } else + } else { sink_time = sq->sinktime; + } + + sink_start_time = sq->sink_start_time; if (sq->src_tainted) { - GstSegment *segment; - gint64 position; - - if (sq->has_src_segment) { - segment = &sq->src_segment; - position = sq->src_segment.position; - } else { - /* - * If the src pad had no segment yet, use the sink segment - * to avoid signalling overrun if the received sink segment has a - * a position > max-size-time while the src pad time would be the default=0 - * - * This can happen when switching pads on chained/adaptive streams and the - * new chain has a segment with a much larger position - */ - segment = &sq->sink_segment; - position = sq->sink_segment.position; - } - - src_time = sq->srctime = my_segment_to_running_time (segment, position); - /* if we have a time, we become untainted and use the time */ - if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) { - sq->src_tainted = FALSE; - } - } else + src_time = sq->srctime = my_segment_to_running_time (&sq->src_segment, + sq->src_segment.position); + sq->src_tainted = FALSE; + } else { src_time = sq->srctime; + } GST_DEBUG_ID (sq->debug_id, - "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, - GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time)); + "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT + ", sink-start-time %" GST_STIME_FORMAT, GST_STIME_ARGS (sink_time), + GST_STIME_ARGS (src_time), GST_STIME_ARGS (sink_start_time)); - /* This allows for streams with out of order timestamping - sometimes the - * emerging timestamp is later than the arriving one(s) */ - if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) && - GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time)) - sq->cur_time = sink_time - src_time; - else + if (GST_CLOCK_STIME_IS_VALID (sink_time)) { + if (!GST_CLOCK_STIME_IS_VALID (src_time) && + GST_CLOCK_STIME_IS_VALID (sink_start_time) && + sink_time >= sink_start_time) { + /* If we got input buffers but output thread didn't push any buffer yet */ + sq->cur_time = sink_time - sink_start_time; + } else if (GST_CLOCK_STIME_IS_VALID (src_time) && sink_time >= src_time) { + sq->cur_time = sink_time - src_time; + } else { + sq->cur_time = 0; + } + } else { sq->cur_time = 0; + } /* updating the time level can change the buffering state */ update_buffering (mq, sq); @@ -1773,22 +1764,22 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) return; } -/* take a SEGMENT event and apply the values to segment, updating the time - * level of queue. */ +/* take a SEGMENT event and apply the values to segment */ static void apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, GstSegment * segment) { GstClockTimeDiff ppos = 0; + gboolean is_sink = segment == &sq->sink_segment; /* If we switched groups, grab the previous position */ if (segment->rate > 0.0) { - if (segment == &sq->sink_segment && sq->sink_stream_gid_changed) { + if (is_sink && sq->sink_stream_gid_changed) { ppos = gst_segment_to_running_time (segment, GST_FORMAT_TIME, segment->position); sq->sink_stream_gid_changed = FALSE; - } else if (segment == &sq->src_segment && sq->src_stream_gid_changed) { + } else if (!is_sink && sq->src_stream_gid_changed) { ppos = gst_segment_to_running_time (segment, GST_FORMAT_TIME, segment->position); @@ -1823,21 +1814,17 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, else segment->position = segment->stop; - if (segment == &sq->sink_segment) - sq->sink_tainted = TRUE; + /* Will be updated on buffer flows */ + if (is_sink) + sq->sink_tainted = FALSE; else { - sq->has_src_segment = TRUE; - sq->src_tainted = TRUE; + sq->src_tainted = FALSE; } GST_DEBUG_ID (sq->debug_id, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment); - /* segment can update the time level of the queue */ - update_time_level (mq, sq); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - gst_multi_queue_post_buffering (mq); } /* take a buffer and update segment, updating the time level of the queue. */ @@ -1845,6 +1832,8 @@ static void apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, GstClockTime duration, GstSegment * segment) { + gboolean is_sink = segment == &sq->sink_segment; + GST_MULTI_QUEUE_MUTEX_LOCK (mq); /* if no timestamp is set, assume it's continuous with the previous @@ -1852,16 +1841,22 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, if (timestamp == GST_CLOCK_TIME_NONE) timestamp = segment->position; + if (is_sink && !GST_CLOCK_STIME_IS_VALID (sq->sink_start_time)) { + sq->sink_start_time = my_segment_to_running_time (segment, timestamp); + GST_DEBUG_ID (sq->debug_id, "Start time updated to %" GST_STIME_FORMAT, + GST_STIME_ARGS (sq->sink_start_time)); + } + /* add duration */ if (duration != GST_CLOCK_TIME_NONE) timestamp += duration; GST_DEBUG_ID (sq->debug_id, "%s position updated to %" GST_TIME_FORMAT, - segment == &sq->sink_segment ? "sink" : "src", GST_TIME_ARGS (timestamp)); + is_sink ? "sink" : "src", GST_TIME_ARGS (timestamp)); segment->position = timestamp; - if (segment == &sq->sink_segment) + if (is_sink) sq->sink_tainted = TRUE; else sq->src_tainted = TRUE; @@ -1878,12 +1873,18 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, { GstClockTime timestamp; GstClockTime duration; + gboolean is_sink = segment == &sq->sink_segment; GST_MULTI_QUEUE_MUTEX_LOCK (mq); gst_event_parse_gap (event, ×tamp, &duration); if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + if (is_sink && !GST_CLOCK_STIME_IS_VALID (sq->sink_start_time)) { + sq->sink_start_time = my_segment_to_running_time (segment, timestamp); + GST_DEBUG_ID (sq->debug_id, "Start time updated to %" GST_STIME_FORMAT, + GST_STIME_ARGS (sq->sink_start_time)); + } if (GST_CLOCK_TIME_IS_VALID (duration)) { timestamp += duration; @@ -1891,12 +1892,11 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, GST_DEBUG_ID (sq->debug_id, "%s position updated to %" GST_TIME_FORMAT, - segment == &sq->sink_segment ? "sink" : "src", - GST_TIME_ARGS (timestamp)); + is_sink ? "sink" : "src", GST_TIME_ARGS (timestamp)); segment->position = timestamp; - if (segment == &sq->sink_segment) + if (is_sink) sq->sink_tainted = TRUE; else sq->src_tainted = TRUE; @@ -2040,8 +2040,6 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, } case GST_EVENT_SEGMENT: apply_segment (mq, sq, event, &sq->src_segment); - /* Applying the segment may have made the queue non-full again, unblock it if needed */ - gst_data_queue_limits_changed (sq->queue); if (G_UNLIKELY (*allow_drop)) { result = GST_FLOW_OK; *allow_drop = FALSE; @@ -3558,8 +3556,9 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->sinktime = GST_CLOCK_STIME_NONE; sq->srctime = GST_CLOCK_STIME_NONE; - sq->sink_tainted = TRUE; - sq->src_tainted = TRUE; + sq->sink_start_time = GST_CLOCK_STIME_NONE; + sq->sink_tainted = FALSE; + sq->src_tainted = FALSE; sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID; sq->sink_stream_gid_changed = FALSE;