queue, queue2: Fix current-level-time report

Do not update timelevel on segment. Segment itself does not tell
anything about the amount of buffered time duration in the element
but buffer timestamp/duration is required to measure actual bufferred time.
Moreover, at the time when new segment is applied to sink/srcpad,
segment.position would point to random value.
Therefore calculating running time using the random value does not
make sense and it will result in wrong timelevel report.

This patch updates queue/queue2's timelevel measuring logic so that
it can be updated only on buffer/buffer-list/gap-event flow.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5430>
This commit is contained in:
Seungha Yang 2023-10-03 02:05:23 +09:00 committed by GStreamer Marge Bot
parent 714c78ac51
commit 96a758abdb
4 changed files with 139 additions and 55 deletions

View file

@ -472,9 +472,10 @@ gst_queue_init (GstQueue * queue)
queue->sinktime = GST_CLOCK_STIME_NONE; queue->sinktime = GST_CLOCK_STIME_NONE;
queue->srctime = GST_CLOCK_STIME_NONE; queue->srctime = GST_CLOCK_STIME_NONE;
queue->sink_start_time = GST_CLOCK_STIME_NONE;
queue->sink_tainted = TRUE; queue->sink_tainted = FALSE;
queue->src_tainted = TRUE; queue->src_tainted = FALSE;
queue->newseg_applied_to_src = FALSE; queue->newseg_applied_to_src = FALSE;
@ -528,7 +529,7 @@ my_segment_to_running_time (GstSegment * segment, GstClockTime val)
static void static void
update_time_level (GstQueue * queue) update_time_level (GstQueue * queue)
{ {
gint64 sink_time, src_time; gint64 sink_time, src_time, sink_start_time;
if (queue->sink_tainted) { if (queue->sink_tainted) {
GST_LOG_OBJECT (queue, "update sink time"); GST_LOG_OBJECT (queue, "update sink time");
@ -538,6 +539,7 @@ update_time_level (GstQueue * queue)
queue->sink_tainted = FALSE; queue->sink_tainted = FALSE;
} }
sink_time = queue->sinktime; sink_time = queue->sinktime;
sink_start_time = queue->sink_start_time;
if (queue->src_tainted) { if (queue->src_tainted) {
GST_LOG_OBJECT (queue, "update src time"); GST_LOG_OBJECT (queue, "update src time");
@ -548,21 +550,31 @@ update_time_level (GstQueue * queue)
} }
src_time = queue->srctime; src_time = queue->srctime;
GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT
GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time)); ", sink-start-time %" GST_STIME_FORMAT,
GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time),
GST_STIME_ARGS (sink_start_time));
if (GST_CLOCK_STIME_IS_VALID (src_time) if (GST_CLOCK_STIME_IS_VALID (sink_time)) {
&& GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time) if (!GST_CLOCK_STIME_IS_VALID (src_time) &&
queue->cur_level.time = sink_time - src_time; GST_CLOCK_STIME_IS_VALID (sink_start_time) &&
else sink_time >= sink_start_time) {
/* If we got input buffers but output thread didn't push any buffer yet */
queue->cur_level.time = sink_time - sink_start_time;
} else if (GST_CLOCK_STIME_IS_VALID (src_time) && sink_time >= src_time) {
queue->cur_level.time = sink_time - src_time;
} else {
queue->cur_level.time = 0;
}
} else {
queue->cur_level.time = 0; queue->cur_level.time = 0;
}
} }
/* take a SEGMENT event and apply the values to segment, updating the time /* take a SEGMENT event and apply the values to segment */
* level of queue. */
static void static void
apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment, apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
gboolean sink) gboolean is_sink)
{ {
gst_event_copy_segment (event, segment); gst_event_copy_segment (event, segment);
@ -576,15 +588,15 @@ apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
segment->stop = -1; segment->stop = -1;
segment->time = 0; segment->time = 0;
} }
if (sink)
queue->sink_tainted = TRUE; /* Will be updated on buffer flows */
else if (is_sink) {
queue->src_tainted = TRUE; queue->sink_tainted = FALSE;
} else {
queue->src_tainted = FALSE;
}
GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment); GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
/* segment can update the time level of the queue */
update_time_level (queue);
} }
static void static void
@ -597,6 +609,11 @@ apply_gap (GstQueue * queue, GstEvent * event,
gst_event_parse_gap (event, &timestamp, &duration); gst_event_parse_gap (event, &timestamp, &duration);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) { if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
if (is_sink && !GST_CLOCK_STIME_IS_VALID (queue->sink_start_time)) {
queue->sink_start_time = my_segment_to_running_time (segment, timestamp);
GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_STIME_FORMAT,
GST_STIME_ARGS (queue->sink_start_time));
}
if (GST_CLOCK_TIME_IS_VALID (duration)) { if (GST_CLOCK_TIME_IS_VALID (duration)) {
timestamp += duration; timestamp += duration;
@ -618,7 +635,7 @@ apply_gap (GstQueue * queue, GstEvent * event,
/* take a buffer and update segment, updating the time level of the queue. */ /* take a buffer and update segment, updating the time level of the queue. */
static void static void
apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
gboolean sink) gboolean is_sink)
{ {
GstClockTime duration, timestamp; GstClockTime duration, timestamp;
@ -630,29 +647,40 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
if (timestamp == GST_CLOCK_TIME_NONE) if (timestamp == GST_CLOCK_TIME_NONE)
timestamp = segment->position; timestamp = segment->position;
if (is_sink && !GST_CLOCK_STIME_IS_VALID (queue->sink_start_time) &&
GST_CLOCK_TIME_IS_VALID (timestamp)) {
queue->sink_start_time = my_segment_to_running_time (segment, timestamp);
GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_STIME_FORMAT,
GST_STIME_ARGS (queue->sink_start_time));
}
/* add duration */ /* add duration */
if (duration != GST_CLOCK_TIME_NONE) if (duration != GST_CLOCK_TIME_NONE)
timestamp += duration; timestamp += duration;
GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT, GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
segment == &queue->sink_segment ? "sink" : "src", is_sink ? "sink" : "src", GST_TIME_ARGS (timestamp));
GST_TIME_ARGS (timestamp));
segment->position = timestamp; segment->position = timestamp;
if (sink) if (is_sink)
queue->sink_tainted = TRUE; queue->sink_tainted = TRUE;
else else
queue->src_tainted = TRUE; queue->src_tainted = TRUE;
/* calc diff with other end */ /* calc diff with other end */
update_time_level (queue); update_time_level (queue);
} }
typedef struct
{
GstClockTime first_timestamp;
GstClockTime timestamp;
} BufListData;
static gboolean static gboolean
buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data) buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
{ {
GstClockTime *timestamp = user_data; BufListData *data = user_data;
GstClockTime btime; GstClockTime btime;
GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
@ -661,13 +689,17 @@ buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
btime = GST_BUFFER_DTS_OR_PTS (*buf); btime = GST_BUFFER_DTS_OR_PTS (*buf);
if (GST_CLOCK_TIME_IS_VALID (btime)) if (GST_CLOCK_TIME_IS_VALID (btime)) {
*timestamp = btime; if (!GST_CLOCK_TIME_IS_VALID (data->first_timestamp))
data->first_timestamp = btime;
data->timestamp = btime;
}
if (GST_BUFFER_DURATION_IS_VALID (*buf)) if (GST_BUFFER_DURATION_IS_VALID (*buf))
*timestamp += GST_BUFFER_DURATION (*buf); data->timestamp += GST_BUFFER_DURATION (*buf);
GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp)); GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (data->timestamp));
return TRUE; return TRUE;
} }
@ -675,21 +707,31 @@ buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
/* take a buffer list and update segment, updating the time level of the queue */ /* take a buffer list and update segment, updating the time level of the queue */
static void static void
apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list, apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
GstSegment * segment, gboolean sink) GstSegment * segment, gboolean is_sink)
{ {
GstClockTime timestamp; BufListData data;
data.first_timestamp = GST_CLOCK_TIME_NONE;
/* if no timestamp is set, assume it's continuous with the previous time */ /* if no timestamp is set, assume it's continuous with the previous time */
timestamp = segment->position; data.timestamp = segment->position;
gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp); gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &data);
if (is_sink && !GST_CLOCK_STIME_IS_VALID (queue->sink_start_time) &&
GST_CLOCK_TIME_IS_VALID (data.first_timestamp)) {
queue->sink_start_time = my_segment_to_running_time (segment,
data.first_timestamp);
GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_STIME_FORMAT,
GST_STIME_ARGS (queue->sink_start_time));
}
GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT, GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (timestamp)); GST_TIME_ARGS (data.timestamp));
segment->position = timestamp; segment->position = data.timestamp;
if (sink) if (is_sink)
queue->sink_tainted = TRUE; queue->sink_tainted = TRUE;
else else
queue->src_tainted = TRUE; queue->src_tainted = TRUE;
@ -727,7 +769,8 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
queue->head_needs_discont = queue->tail_needs_discont = FALSE; queue->head_needs_discont = queue->tail_needs_discont = FALSE;
queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE; queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
queue->sink_tainted = queue->src_tainted = TRUE; queue->sink_start_time = GST_CLOCK_STIME_NONE;
queue->sink_tainted = queue->src_tainted = FALSE;
/* we deleted a lot of something */ /* we deleted a lot of something */
GST_QUEUE_SIGNAL_DEL (queue); GST_QUEUE_SIGNAL_DEL (queue);

View file

@ -99,6 +99,7 @@ struct _GstQueue {
/* position of src/sink */ /* position of src/sink */
GstClockTimeDiff sinktime, srctime; GstClockTimeDiff sinktime, srctime;
GstClockTimeDiff sink_start_time;
/* TRUE if either position needs to be recalculated */ /* TRUE if either position needs to be recalculated */
gboolean sink_tainted, src_tainted; gboolean sink_tainted, src_tainted;

View file

@ -538,8 +538,9 @@ gst_queue2_init (GstQueue2 * queue)
queue->sinktime = GST_CLOCK_TIME_NONE; queue->sinktime = GST_CLOCK_TIME_NONE;
queue->srctime = GST_CLOCK_TIME_NONE; queue->srctime = GST_CLOCK_TIME_NONE;
queue->sink_tainted = TRUE; queue->sink_start_time = GST_CLOCK_TIME_NONE;
queue->src_tainted = TRUE; queue->sink_tainted = FALSE;
queue->src_tainted = FALSE;
queue->srcresult = GST_FLOW_FLUSHING; queue->srcresult = GST_FLOW_FLUSHING;
queue->sinkresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING;
@ -769,19 +770,29 @@ update_time_level (GstQueue2 * queue)
queue->src_tainted = FALSE; queue->src_tainted = FALSE;
} }
GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT
GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime)); ", sink-start-time %" GST_TIME_FORMAT,
GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime),
GST_TIME_ARGS (queue->sink_start_time));
if (queue->sinktime != GST_CLOCK_TIME_NONE if (GST_CLOCK_TIME_IS_VALID (queue->sinktime)) {
&& queue->srctime != GST_CLOCK_TIME_NONE if (!GST_CLOCK_TIME_IS_VALID (queue->srctime) &&
&& queue->sinktime >= queue->srctime) GST_CLOCK_TIME_IS_VALID (queue->sink_start_time) &&
queue->cur_level.time = queue->sinktime - queue->srctime; queue->sinktime >= queue->sink_start_time) {
else /* If we got input buffers but output thread didn't push any buffer yet */
queue->cur_level.time = queue->sinktime - queue->sink_start_time;
} else if (GST_CLOCK_TIME_IS_VALID (queue->srctime) &&
queue->sinktime >= queue->srctime) {
queue->cur_level.time = queue->sinktime - queue->srctime;
} else {
queue->cur_level.time = 0;
}
} else {
queue->cur_level.time = 0; queue->cur_level.time = 0;
}
} }
/* take a SEGMENT event and apply the values to segment, updating the time /* take a SEGMENT event and apply the values to segment */
* level of queue. */
static void static void
apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
gboolean is_sink) gboolean is_sink)
@ -808,13 +819,11 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment); GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
/* Will be updated on buffer flows */
if (is_sink) if (is_sink)
queue->sink_tainted = TRUE; queue->sink_tainted = FALSE;
else else
queue->src_tainted = TRUE; queue->src_tainted = FALSE;
/* segment can update the time level of the queue */
update_time_level (queue);
} }
static void static void
@ -827,6 +836,12 @@ apply_gap (GstQueue2 * queue, GstEvent * event,
gst_event_parse_gap (event, &timestamp, &duration); gst_event_parse_gap (event, &timestamp, &duration);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) { if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
if (is_sink && !GST_CLOCK_TIME_IS_VALID (queue->sink_start_time)) {
queue->sink_start_time = gst_segment_to_running_time (segment,
GST_FORMAT_TIME, timestamp);
GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (queue->sink_start_time));
}
if (GST_CLOCK_TIME_IS_VALID (duration)) { if (GST_CLOCK_TIME_IS_VALID (duration)) {
timestamp += duration; timestamp += duration;
@ -911,6 +926,14 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
if (timestamp == GST_CLOCK_TIME_NONE) if (timestamp == GST_CLOCK_TIME_NONE)
timestamp = segment->position; timestamp = segment->position;
if (is_sink && !GST_CLOCK_TIME_IS_VALID (queue->sink_start_time) &&
GST_CLOCK_TIME_IS_VALID (timestamp)) {
queue->sink_start_time = gst_segment_to_running_time (segment,
GST_FORMAT_TIME, timestamp);
GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (queue->sink_start_time));
}
/* add duration */ /* add duration */
if (duration != GST_CLOCK_TIME_NONE) if (duration != GST_CLOCK_TIME_NONE)
timestamp += duration; timestamp += duration;
@ -931,6 +954,7 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
struct BufListData struct BufListData
{ {
GstClockTime first_timestamp;
GstClockTime timestamp; GstClockTime timestamp;
guint bitrate; guint bitrate;
}; };
@ -949,8 +973,12 @@ buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
btime = GST_BUFFER_DTS_OR_PTS (*buf); btime = GST_BUFFER_DTS_OR_PTS (*buf);
if (GST_CLOCK_TIME_IS_VALID (btime)) if (GST_CLOCK_TIME_IS_VALID (btime)) {
if (!GST_CLOCK_TIME_IS_VALID (bld->first_timestamp))
bld->first_timestamp = btime;
*timestamp = btime; *timestamp = btime;
}
if (GST_BUFFER_DURATION_IS_VALID (*buf)) if (GST_BUFFER_DURATION_IS_VALID (*buf))
*timestamp += GST_BUFFER_DURATION (*buf); *timestamp += GST_BUFFER_DURATION (*buf);
@ -973,6 +1001,8 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
{ {
struct BufListData bld; struct BufListData bld;
bld.first_timestamp = GST_CLOCK_TIME_NONE;
/* if no timestamp is set, assume it's continuous with the previous time */ /* if no timestamp is set, assume it's continuous with the previous time */
bld.timestamp = segment->position; bld.timestamp = segment->position;
@ -989,6 +1019,14 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld); gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
if (is_sink && !GST_CLOCK_TIME_IS_VALID (queue->sink_start_time) &&
GST_CLOCK_TIME_IS_VALID (bld.first_timestamp)) {
queue->sink_start_time = gst_segment_to_running_time (segment,
GST_FORMAT_TIME, bld.first_timestamp);
GST_DEBUG_OBJECT (queue, "Start time updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (queue->sink_start_time));
}
GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (bld.timestamp)); GST_TIME_ARGS (bld.timestamp));
@ -1909,7 +1947,8 @@ gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE; queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
queue->sink_tainted = queue->src_tainted = TRUE; queue->sink_start_time = GST_CLOCK_TIME_NONE;
queue->sink_tainted = queue->src_tainted = FALSE;
if (queue->starting_segment != NULL) if (queue->starting_segment != NULL)
gst_event_unref (queue->starting_segment); gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL; queue->starting_segment = NULL;

View file

@ -86,6 +86,7 @@ struct _GstQueue2
/* Position of src/sink */ /* Position of src/sink */
GstClockTime sinktime, srctime; GstClockTime sinktime, srctime;
GstClockTime sink_start_time;
/* TRUE if either position needs to be recalculated */ /* TRUE if either position needs to be recalculated */
gboolean sink_tainted, src_tainted; gboolean sink_tainted, src_tainted;
/* Bitrates taken from tags */ /* Bitrates taken from tags */