From 96a758abdbb9eb4832fed3ea7d23cc0f730a3a56 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Tue, 3 Oct 2023 02:05:23 +0900 Subject: [PATCH] queue, queue2: Fix current-level-time report Do not update timelevel on segment. Segment itself does not tell anything about the amount of buffered time duration in the element but buffer timestamp/duration is required to measure actual bufferred time. Moreover, at the time when new segment is applied to sink/srcpad, segment.position would point to random value. Therefore calculating running time using the random value does not make sense and it will result in wrong timelevel report. This patch updates queue/queue2's timelevel measuring logic so that it can be updated only on buffer/buffer-list/gap-event flow. Part-of: --- .../gstreamer/plugins/elements/gstqueue.c | 117 ++++++++++++------ .../gstreamer/plugins/elements/gstqueue.h | 1 + .../gstreamer/plugins/elements/gstqueue2.c | 75 ++++++++--- .../gstreamer/plugins/elements/gstqueue2.h | 1 + 4 files changed, 139 insertions(+), 55 deletions(-) diff --git a/subprojects/gstreamer/plugins/elements/gstqueue.c b/subprojects/gstreamer/plugins/elements/gstqueue.c index a154c7842a..50c2bf59f2 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue.c +++ b/subprojects/gstreamer/plugins/elements/gstqueue.c @@ -472,9 +472,10 @@ gst_queue_init (GstQueue * queue) queue->sinktime = GST_CLOCK_STIME_NONE; queue->srctime = GST_CLOCK_STIME_NONE; + queue->sink_start_time = GST_CLOCK_STIME_NONE; - queue->sink_tainted = TRUE; - queue->src_tainted = TRUE; + queue->sink_tainted = FALSE; + queue->src_tainted = FALSE; queue->newseg_applied_to_src = FALSE; @@ -528,7 +529,7 @@ my_segment_to_running_time (GstSegment * segment, GstClockTime val) static void update_time_level (GstQueue * queue) { - gint64 sink_time, src_time; + gint64 sink_time, src_time, sink_start_time; if (queue->sink_tainted) { GST_LOG_OBJECT (queue, "update sink time"); @@ -538,6 +539,7 @@ update_time_level (GstQueue * queue) queue->sink_tainted = FALSE; } sink_time = queue->sinktime; + sink_start_time = queue->sink_start_time; if (queue->src_tainted) { GST_LOG_OBJECT (queue, "update src time"); @@ -548,21 +550,31 @@ update_time_level (GstQueue * queue) } src_time = queue->srctime; - GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, - GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time)); + GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT + ", sink-start-time %" GST_STIME_FORMAT, + GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time), + GST_STIME_ARGS (sink_start_time)); - if (GST_CLOCK_STIME_IS_VALID (src_time) - && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time) - queue->cur_level.time = sink_time - src_time; - else + if (GST_CLOCK_STIME_IS_VALID (sink_time)) { + if (!GST_CLOCK_STIME_IS_VALID (src_time) && + GST_CLOCK_STIME_IS_VALID (sink_start_time) && + sink_time >= sink_start_time) { + /* If we got input buffers but output thread didn't push any buffer yet */ + queue->cur_level.time = sink_time - sink_start_time; + } else if (GST_CLOCK_STIME_IS_VALID (src_time) && sink_time >= src_time) { + queue->cur_level.time = sink_time - src_time; + } else { + queue->cur_level.time = 0; + } + } else { queue->cur_level.time = 0; + } } -/* take a SEGMENT event and apply the values to segment, updating the time - * level of queue. */ +/* take a SEGMENT event and apply the values to segment */ static void apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment, - gboolean sink) + gboolean is_sink) { gst_event_copy_segment (event, segment); @@ -576,15 +588,15 @@ apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment, segment->stop = -1; segment->time = 0; } - if (sink) - queue->sink_tainted = TRUE; - else - queue->src_tainted = TRUE; + + /* Will be updated on buffer flows */ + if (is_sink) { + queue->sink_tainted = FALSE; + } else { + queue->src_tainted = FALSE; + } GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment); - - /* segment can update the time level of the queue */ - update_time_level (queue); } static void @@ -597,6 +609,11 @@ apply_gap (GstQueue * queue, GstEvent * event, gst_event_parse_gap (event, ×tamp, &duration); if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + if (is_sink && !GST_CLOCK_STIME_IS_VALID (queue->sink_start_time)) { + queue->sink_start_time = my_segment_to_running_time (segment, timestamp); + GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_STIME_FORMAT, + GST_STIME_ARGS (queue->sink_start_time)); + } if (GST_CLOCK_TIME_IS_VALID (duration)) { timestamp += duration; @@ -618,7 +635,7 @@ apply_gap (GstQueue * queue, GstEvent * event, /* take a buffer and update segment, updating the time level of the queue. */ static void apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, - gboolean sink) + gboolean is_sink) { GstClockTime duration, timestamp; @@ -630,29 +647,40 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, if (timestamp == GST_CLOCK_TIME_NONE) timestamp = segment->position; + if (is_sink && !GST_CLOCK_STIME_IS_VALID (queue->sink_start_time) && + GST_CLOCK_TIME_IS_VALID (timestamp)) { + queue->sink_start_time = my_segment_to_running_time (segment, timestamp); + GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_STIME_FORMAT, + GST_STIME_ARGS (queue->sink_start_time)); + } + /* add duration */ if (duration != GST_CLOCK_TIME_NONE) timestamp += duration; GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT, - segment == &queue->sink_segment ? "sink" : "src", - GST_TIME_ARGS (timestamp)); + is_sink ? "sink" : "src", GST_TIME_ARGS (timestamp)); segment->position = timestamp; - if (sink) + if (is_sink) queue->sink_tainted = TRUE; else queue->src_tainted = TRUE; - /* calc diff with other end */ update_time_level (queue); } +typedef struct +{ + GstClockTime first_timestamp; + GstClockTime timestamp; +} BufListData; + static gboolean buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data) { - GstClockTime *timestamp = user_data; + BufListData *data = user_data; GstClockTime btime; GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT @@ -661,13 +689,17 @@ buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data) GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); btime = GST_BUFFER_DTS_OR_PTS (*buf); - if (GST_CLOCK_TIME_IS_VALID (btime)) - *timestamp = btime; + if (GST_CLOCK_TIME_IS_VALID (btime)) { + if (!GST_CLOCK_TIME_IS_VALID (data->first_timestamp)) + data->first_timestamp = btime; + + data->timestamp = btime; + } if (GST_BUFFER_DURATION_IS_VALID (*buf)) - *timestamp += GST_BUFFER_DURATION (*buf); + data->timestamp += GST_BUFFER_DURATION (*buf); - GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp)); + GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (data->timestamp)); return TRUE; } @@ -675,21 +707,31 @@ buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data) /* take a buffer list and update segment, updating the time level of the queue */ static void apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list, - GstSegment * segment, gboolean sink) + GstSegment * segment, gboolean is_sink) { - GstClockTime timestamp; + BufListData data; + + data.first_timestamp = GST_CLOCK_TIME_NONE; /* if no timestamp is set, assume it's continuous with the previous time */ - timestamp = segment->position; + data.timestamp = segment->position; - gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp); + gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &data); + + if (is_sink && !GST_CLOCK_STIME_IS_VALID (queue->sink_start_time) && + GST_CLOCK_TIME_IS_VALID (data.first_timestamp)) { + queue->sink_start_time = my_segment_to_running_time (segment, + data.first_timestamp); + GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_STIME_FORMAT, + GST_STIME_ARGS (queue->sink_start_time)); + } GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT, - GST_TIME_ARGS (timestamp)); + GST_TIME_ARGS (data.timestamp)); - segment->position = timestamp; + segment->position = data.timestamp; - if (sink) + if (is_sink) queue->sink_tainted = TRUE; else queue->src_tainted = TRUE; @@ -727,7 +769,8 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full) queue->head_needs_discont = queue->tail_needs_discont = FALSE; queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE; - queue->sink_tainted = queue->src_tainted = TRUE; + queue->sink_start_time = GST_CLOCK_STIME_NONE; + queue->sink_tainted = queue->src_tainted = FALSE; /* we deleted a lot of something */ GST_QUEUE_SIGNAL_DEL (queue); diff --git a/subprojects/gstreamer/plugins/elements/gstqueue.h b/subprojects/gstreamer/plugins/elements/gstqueue.h index 2a9cb3612a..cc632ad355 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue.h +++ b/subprojects/gstreamer/plugins/elements/gstqueue.h @@ -99,6 +99,7 @@ struct _GstQueue { /* position of src/sink */ GstClockTimeDiff sinktime, srctime; + GstClockTimeDiff sink_start_time; /* TRUE if either position needs to be recalculated */ gboolean sink_tainted, src_tainted; diff --git a/subprojects/gstreamer/plugins/elements/gstqueue2.c b/subprojects/gstreamer/plugins/elements/gstqueue2.c index 19488e664c..1a5127c457 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue2.c +++ b/subprojects/gstreamer/plugins/elements/gstqueue2.c @@ -538,8 +538,9 @@ gst_queue2_init (GstQueue2 * queue) queue->sinktime = GST_CLOCK_TIME_NONE; queue->srctime = GST_CLOCK_TIME_NONE; - queue->sink_tainted = TRUE; - queue->src_tainted = TRUE; + queue->sink_start_time = GST_CLOCK_TIME_NONE; + queue->sink_tainted = FALSE; + queue->src_tainted = FALSE; queue->srcresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING; @@ -769,19 +770,29 @@ update_time_level (GstQueue2 * queue) queue->src_tainted = FALSE; } - GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, - GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime)); + GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT + ", sink-start-time %" GST_TIME_FORMAT, + GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime), + GST_TIME_ARGS (queue->sink_start_time)); - if (queue->sinktime != GST_CLOCK_TIME_NONE - && queue->srctime != GST_CLOCK_TIME_NONE - && queue->sinktime >= queue->srctime) - queue->cur_level.time = queue->sinktime - queue->srctime; - else + if (GST_CLOCK_TIME_IS_VALID (queue->sinktime)) { + if (!GST_CLOCK_TIME_IS_VALID (queue->srctime) && + GST_CLOCK_TIME_IS_VALID (queue->sink_start_time) && + queue->sinktime >= queue->sink_start_time) { + /* If we got input buffers but output thread didn't push any buffer yet */ + queue->cur_level.time = queue->sinktime - queue->sink_start_time; + } else if (GST_CLOCK_TIME_IS_VALID (queue->srctime) && + queue->sinktime >= queue->srctime) { + queue->cur_level.time = queue->sinktime - queue->srctime; + } else { + queue->cur_level.time = 0; + } + } else { queue->cur_level.time = 0; + } } -/* take a SEGMENT event and apply the values to segment, updating the time - * level of queue. */ +/* take a SEGMENT event and apply the values to segment */ static void apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, gboolean is_sink) @@ -808,13 +819,11 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment); + /* Will be updated on buffer flows */ if (is_sink) - queue->sink_tainted = TRUE; + queue->sink_tainted = FALSE; else - queue->src_tainted = TRUE; - - /* segment can update the time level of the queue */ - update_time_level (queue); + queue->src_tainted = FALSE; } static void @@ -827,6 +836,12 @@ apply_gap (GstQueue2 * queue, GstEvent * event, gst_event_parse_gap (event, ×tamp, &duration); if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + if (is_sink && !GST_CLOCK_TIME_IS_VALID (queue->sink_start_time)) { + queue->sink_start_time = gst_segment_to_running_time (segment, + GST_FORMAT_TIME, timestamp); + GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_TIME_FORMAT, + GST_TIME_ARGS (queue->sink_start_time)); + } if (GST_CLOCK_TIME_IS_VALID (duration)) { timestamp += duration; @@ -911,6 +926,14 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, if (timestamp == GST_CLOCK_TIME_NONE) timestamp = segment->position; + if (is_sink && !GST_CLOCK_TIME_IS_VALID (queue->sink_start_time) && + GST_CLOCK_TIME_IS_VALID (timestamp)) { + queue->sink_start_time = gst_segment_to_running_time (segment, + GST_FORMAT_TIME, timestamp); + GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_TIME_FORMAT, + GST_TIME_ARGS (queue->sink_start_time)); + } + /* add duration */ if (duration != GST_CLOCK_TIME_NONE) timestamp += duration; @@ -931,6 +954,7 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, struct BufListData { + GstClockTime first_timestamp; GstClockTime timestamp; guint bitrate; }; @@ -949,8 +973,12 @@ buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data) GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); btime = GST_BUFFER_DTS_OR_PTS (*buf); - if (GST_CLOCK_TIME_IS_VALID (btime)) + if (GST_CLOCK_TIME_IS_VALID (btime)) { + if (!GST_CLOCK_TIME_IS_VALID (bld->first_timestamp)) + bld->first_timestamp = btime; + *timestamp = btime; + } if (GST_BUFFER_DURATION_IS_VALID (*buf)) *timestamp += GST_BUFFER_DURATION (*buf); @@ -973,6 +1001,8 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, { struct BufListData bld; + bld.first_timestamp = GST_CLOCK_TIME_NONE; + /* if no timestamp is set, assume it's continuous with the previous time */ bld.timestamp = segment->position; @@ -989,6 +1019,14 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld); + if (is_sink && !GST_CLOCK_TIME_IS_VALID (queue->sink_start_time) && + GST_CLOCK_TIME_IS_VALID (bld.first_timestamp)) { + queue->sink_start_time = gst_segment_to_running_time (segment, + GST_FORMAT_TIME, bld.first_timestamp); + GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_TIME_FORMAT, + GST_TIME_ARGS (queue->sink_start_time)); + } + GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, GST_TIME_ARGS (bld.timestamp)); @@ -1909,7 +1947,8 @@ gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp) gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE; - queue->sink_tainted = queue->src_tainted = TRUE; + queue->sink_start_time = GST_CLOCK_TIME_NONE; + queue->sink_tainted = queue->src_tainted = FALSE; if (queue->starting_segment != NULL) gst_event_unref (queue->starting_segment); queue->starting_segment = NULL; diff --git a/subprojects/gstreamer/plugins/elements/gstqueue2.h b/subprojects/gstreamer/plugins/elements/gstqueue2.h index 5e8fa9af1b..bd857fc796 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue2.h +++ b/subprojects/gstreamer/plugins/elements/gstqueue2.h @@ -86,6 +86,7 @@ struct _GstQueue2 /* Position of src/sink */ GstClockTime sinktime, srctime; + GstClockTime sink_start_time; /* TRUE if either position needs to be recalculated */ gboolean sink_tainted, src_tainted; /* Bitrates taken from tags */