multiqueue: Do not update time level on segment

Update time level decision logic to be identical to the queue/queue2

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5430>
This commit is contained in:
Seungha Yang 2023-10-05 22:40:43 +09:00 committed by GStreamer Marge Bot
parent 650e7cc4b2
commit aa5f6b2376

View file

@ -141,12 +141,10 @@ struct _GstSingleQueue
/* segments */
GstSegment sink_segment;
GstSegment src_segment;
gboolean has_src_segment; /* preferred over initializing the src_segment to
* UNDEFINED as this doesn't requires adding ifs
* in every segment usage */
/* position of src/sink */
GstClockTimeDiff sinktime, srctime;
GstClockTimeDiff sink_start_time;
/* cached input value, used for interleave */
GstClockTimeDiff cached_sinktime;
/* TRUE if either position needs to be recalculated */
@ -1359,8 +1357,6 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
gst_object_unref (srcpad);
}
sq->sink_tainted = sq->src_tainted = TRUE;
return result;
}
@ -1375,7 +1371,6 @@ gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
result = gst_pad_stop_task (srcpad);
gst_object_unref (srcpad);
}
sq->sink_tainted = sq->src_tainted = TRUE;
return result;
}
@ -1405,7 +1400,6 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
sq->has_src_segment = FALSE;
/* All pads start off OK for a smooth kick-off */
sq->srcresult = GST_FLOW_OK;
sq->pushed = FALSE;
@ -1416,6 +1410,9 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
sq->nextid = 0;
sq->oldid = 0;
sq->last_oldid = G_MAXUINT32;
sq->sinktime = GST_CLOCK_STIME_NONE;
sq->srctime = GST_CLOCK_STIME_NONE;
sq->sink_start_time = GST_CLOCK_STIME_NONE;
sq->next_time = GST_CLOCK_STIME_NONE;
sq->last_time = GST_CLOCK_STIME_NONE;
sq->cached_sinktime = GST_CLOCK_STIME_NONE;
@ -1429,6 +1426,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
mq->high_time = GST_CLOCK_STIME_NONE;
sq->flushing = FALSE;
sq->sink_tainted = sq->src_tainted = FALSE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
}
@ -1698,7 +1697,7 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
static void
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
{
GstClockTimeDiff sink_time, src_time;
GstClockTimeDiff sink_time, src_time, sink_start_time;
if (sq->sink_tainted) {
sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment,
@ -1716,56 +1715,48 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
* we set the last_time */
sq->last_time = sink_time;
}
if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) {
sq->sink_tainted = FALSE;
if (sink_time != GST_CLOCK_STIME_NONE) {
/* if we have a time, we become untainted and use the time */
sq->sink_tainted = FALSE;
if (mq->use_interleave) {
sq->cached_sinktime = sink_time;
calculate_interleave (mq, sq);
}
}
} else
} else {
sink_time = sq->sinktime;
}
sink_start_time = sq->sink_start_time;
if (sq->src_tainted) {
GstSegment *segment;
gint64 position;
if (sq->has_src_segment) {
segment = &sq->src_segment;
position = sq->src_segment.position;
} else {
/*
* If the src pad had no segment yet, use the sink segment
* to avoid signalling overrun if the received sink segment has a
* a position > max-size-time while the src pad time would be the default=0
*
* This can happen when switching pads on chained/adaptive streams and the
* new chain has a segment with a much larger position
*/
segment = &sq->sink_segment;
position = sq->sink_segment.position;
}
src_time = sq->srctime = my_segment_to_running_time (segment, position);
/* if we have a time, we become untainted and use the time */
if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) {
sq->src_tainted = FALSE;
}
} else
src_time = sq->srctime = my_segment_to_running_time (&sq->src_segment,
sq->src_segment.position);
sq->src_tainted = FALSE;
} else {
src_time = sq->srctime;
}
GST_DEBUG_ID (sq->debug_id,
"sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
"sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT
", sink-start-time %" GST_STIME_FORMAT, GST_STIME_ARGS (sink_time),
GST_STIME_ARGS (src_time), GST_STIME_ARGS (sink_start_time));
/* This allows for streams with out of order timestamping - sometimes the
* emerging timestamp is later than the arriving one(s) */
if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) &&
GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time))
sq->cur_time = sink_time - src_time;
else
if (GST_CLOCK_STIME_IS_VALID (sink_time)) {
if (!GST_CLOCK_STIME_IS_VALID (src_time) &&
GST_CLOCK_STIME_IS_VALID (sink_start_time) &&
sink_time >= sink_start_time) {
/* If we got input buffers but output thread didn't push any buffer yet */
sq->cur_time = sink_time - sink_start_time;
} else if (GST_CLOCK_STIME_IS_VALID (src_time) && sink_time >= src_time) {
sq->cur_time = sink_time - src_time;
} else {
sq->cur_time = 0;
}
} else {
sq->cur_time = 0;
}
/* updating the time level can change the buffering state */
update_buffering (mq, sq);
@ -1773,22 +1764,22 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
return;
}
/* take a SEGMENT event and apply the values to segment, updating the time
* level of queue. */
/* take a SEGMENT event and apply the values to segment */
static void
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
GstSegment * segment)
{
GstClockTimeDiff ppos = 0;
gboolean is_sink = segment == &sq->sink_segment;
/* If we switched groups, grab the previous position */
if (segment->rate > 0.0) {
if (segment == &sq->sink_segment && sq->sink_stream_gid_changed) {
if (is_sink && sq->sink_stream_gid_changed) {
ppos =
gst_segment_to_running_time (segment, GST_FORMAT_TIME,
segment->position);
sq->sink_stream_gid_changed = FALSE;
} else if (segment == &sq->src_segment && sq->src_stream_gid_changed) {
} else if (!is_sink && sq->src_stream_gid_changed) {
ppos =
gst_segment_to_running_time (segment, GST_FORMAT_TIME,
segment->position);
@ -1823,21 +1814,17 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
else
segment->position = segment->stop;
if (segment == &sq->sink_segment)
sq->sink_tainted = TRUE;
/* Will be updated on buffer flows */
if (is_sink)
sq->sink_tainted = FALSE;
else {
sq->has_src_segment = TRUE;
sq->src_tainted = TRUE;
sq->src_tainted = FALSE;
}
GST_DEBUG_ID (sq->debug_id,
"configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
/* segment can update the time level of the queue */
update_time_level (mq, sq);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_multi_queue_post_buffering (mq);
}
/* take a buffer and update segment, updating the time level of the queue. */
@ -1845,6 +1832,8 @@ static void
apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
GstClockTime duration, GstSegment * segment)
{
gboolean is_sink = segment == &sq->sink_segment;
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
/* if no timestamp is set, assume it's continuous with the previous
@ -1852,16 +1841,22 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
if (timestamp == GST_CLOCK_TIME_NONE)
timestamp = segment->position;
if (is_sink && !GST_CLOCK_STIME_IS_VALID (sq->sink_start_time)) {
sq->sink_start_time = my_segment_to_running_time (segment, timestamp);
GST_DEBUG_ID (sq->debug_id, "Start time updated to %" GST_STIME_FORMAT,
GST_STIME_ARGS (sq->sink_start_time));
}
/* add duration */
if (duration != GST_CLOCK_TIME_NONE)
timestamp += duration;
GST_DEBUG_ID (sq->debug_id, "%s position updated to %" GST_TIME_FORMAT,
segment == &sq->sink_segment ? "sink" : "src", GST_TIME_ARGS (timestamp));
is_sink ? "sink" : "src", GST_TIME_ARGS (timestamp));
segment->position = timestamp;
if (segment == &sq->sink_segment)
if (is_sink)
sq->sink_tainted = TRUE;
else
sq->src_tainted = TRUE;
@ -1878,12 +1873,18 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
{
GstClockTime timestamp;
GstClockTime duration;
gboolean is_sink = segment == &sq->sink_segment;
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
gst_event_parse_gap (event, &timestamp, &duration);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
if (is_sink && !GST_CLOCK_STIME_IS_VALID (sq->sink_start_time)) {
sq->sink_start_time = my_segment_to_running_time (segment, timestamp);
GST_DEBUG_ID (sq->debug_id, "Start time updated to %" GST_STIME_FORMAT,
GST_STIME_ARGS (sq->sink_start_time));
}
if (GST_CLOCK_TIME_IS_VALID (duration)) {
timestamp += duration;
@ -1891,12 +1892,11 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
GST_DEBUG_ID (sq->debug_id,
"%s position updated to %" GST_TIME_FORMAT,
segment == &sq->sink_segment ? "sink" : "src",
GST_TIME_ARGS (timestamp));
is_sink ? "sink" : "src", GST_TIME_ARGS (timestamp));
segment->position = timestamp;
if (segment == &sq->sink_segment)
if (is_sink)
sq->sink_tainted = TRUE;
else
sq->src_tainted = TRUE;
@ -2040,8 +2040,6 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
}
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, event, &sq->src_segment);
/* Applying the segment may have made the queue non-full again, unblock it if needed */
gst_data_queue_limits_changed (sq->queue);
if (G_UNLIKELY (*allow_drop)) {
result = GST_FLOW_OK;
*allow_drop = FALSE;
@ -3558,8 +3556,9 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
sq->sinktime = GST_CLOCK_STIME_NONE;
sq->srctime = GST_CLOCK_STIME_NONE;
sq->sink_tainted = TRUE;
sq->src_tainted = TRUE;
sq->sink_start_time = GST_CLOCK_STIME_NONE;
sq->sink_tainted = FALSE;
sq->src_tainted = FALSE;
sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID;
sq->sink_stream_gid_changed = FALSE;