multiqueue: Use signed clock values for running time calculation

This improves the accuracy of queue levels and when to push buffers
for buffers falling outside of the segment

https://bugzilla.gnome.org/show_bug.cgi?id=757193
This commit is contained in:
Edward Hervey 2015-11-30 17:09:43 +01:00 committed by Edward Hervey
parent c51d2b2a37
commit 4d96e5b834
2 changed files with 89 additions and 72 deletions

View file

@ -149,7 +149,7 @@ struct _GstSingleQueue
* in every segment usage */
/* position of src/sink */
GstClockTime sinktime, srctime;
GstClockTimeDiff sinktime, srctime;
/* cached input value, used for interleave */
GstClockTimeDiff cached_sinktime;
/* TRUE if either position needs to be recalculated */
@ -168,8 +168,8 @@ struct _GstSingleQueue
guint32 nextid; /* ID of the next object waiting to be pushed */
guint32 oldid; /* ID of the last object pushed (last in a series) */
guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */
GstClockTime next_time; /* End running time of next buffer to be pushed */
GstClockTime last_time; /* Start running time of last pushed buffer */
GstClockTimeDiff next_time; /* End running time of next buffer to be pushed */
GstClockTimeDiff last_time; /* Start running time of last pushed buffer */
GCond turn; /* SingleQueue turn waiting conditional */
/* for serialized queries */
@ -292,6 +292,23 @@ enum
} \
} G_STMT_END
/* Convenience function */
static inline GstClockTimeDiff
my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
if (GST_CLOCK_TIME_IS_VALID (val)) {
gboolean sign =
gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
if (sign > 0)
res = val;
else if (sign < 0)
res = -val;
}
return res;
}
static void gst_multi_queue_finalize (GObject * object);
static void gst_multi_queue_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
@ -494,7 +511,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue)
mqueue->counter = 1;
mqueue->highid = -1;
mqueue->high_time = GST_CLOCK_TIME_NONE;
mqueue->high_time = GST_CLOCK_STIME_NONE;
g_mutex_init (&mqueue->qlock);
g_mutex_init (&mqueue->buffering_post_lock);
@ -904,13 +921,13 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
sq->nextid = 0;
sq->oldid = 0;
sq->last_oldid = G_MAXUINT32;
sq->next_time = GST_CLOCK_TIME_NONE;
sq->last_time = GST_CLOCK_TIME_NONE;
sq->cached_sinktime = GST_CLOCK_TIME_NONE;
sq->next_time = GST_CLOCK_STIME_NONE;
sq->last_time = GST_CLOCK_STIME_NONE;
sq->cached_sinktime = GST_CLOCK_STIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
/* Reset high time to be recomputed next */
mq->high_time = GST_CLOCK_TIME_NONE;
mq->high_time = GST_CLOCK_STIME_NONE;
sq->flushing = FALSE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
@ -1090,10 +1107,10 @@ beach:
GST_DEBUG_OBJECT (mq,
"low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
" last_interleave_update:%" GST_TIME_FORMAT, GST_TIME_ARGS (low),
GST_TIME_ARGS (high), GST_TIME_ARGS (interleave),
" last_interleave_update:%" GST_STIME_FORMAT, GST_STIME_ARGS (low),
GST_STIME_ARGS (high), GST_TIME_ARGS (interleave),
GST_TIME_ARGS (mq->interleave),
GST_TIME_ARGS (mq->last_interleave_update));
GST_STIME_ARGS (mq->last_interleave_update));
}
@ -1103,26 +1120,25 @@ beach:
static void
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
{
gint64 sink_time, src_time;
GstClockTimeDiff sink_time, src_time;
if (sq->sink_tainted) {
sink_time = sq->sinktime =
gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment,
sq->sink_segment.position);
GST_DEBUG_OBJECT (mq,
"queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
GST_TIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
GST_TIME_ARGS (sink_time));
GST_STIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
GST_STIME_ARGS (sink_time));
if (G_UNLIKELY (sq->last_time == GST_CLOCK_TIME_NONE)) {
if (G_UNLIKELY (sq->last_time == GST_CLOCK_STIME_NONE)) {
/* If the single queue still doesn't have a last_time set, this means
* that nothing has been pushed out yet.
* In order for the high_time computation to be as efficient as possible,
* we set the last_time */
sq->last_time = sink_time;
}
if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE)) {
if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) {
/* if we have a time, we become untainted and use the time */
sq->sink_tainted = FALSE;
if (mq->use_interleave) {
@ -1153,22 +1169,22 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
position = sq->sink_segment.position;
}
src_time = sq->srctime =
gst_segment_to_running_time (segment, GST_FORMAT_TIME, position);
src_time = sq->srctime = my_segment_to_running_time (segment, position);
/* if we have a time, we become untainted and use the time */
if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE)) {
if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) {
sq->src_tainted = FALSE;
}
} else
src_time = sq->srctime;
GST_DEBUG_OBJECT (mq,
"queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
"queue %d, sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, sq->id,
GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
/* This allows for streams with out of order timestamping - sometimes the
* emerging timestamp is later than the arriving one(s) */
if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) &&
GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time))
sq->cur_time = sink_time - src_time;
else
sq->cur_time = 0;
@ -1281,10 +1297,10 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
gst_multi_queue_post_buffering (mq);
}
static GstClockTime
static GstClockTimeDiff
get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
{
GstClockTime time = GST_CLOCK_TIME_NONE;
GstClockTimeDiff time = GST_CLOCK_STIME_NONE;
if (GST_IS_BUFFER (object)) {
GstBuffer *buf = GST_BUFFER_CAST (object);
@ -1295,7 +1311,7 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
btime += GST_BUFFER_DURATION (buf);
if (btime > segment->stop)
btime = segment->stop;
time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, btime);
time = my_segment_to_running_time (segment, btime);
}
} else if (GST_IS_BUFFER_LIST (object)) {
GstBufferList *list = GST_BUFFER_LIST_CAST (object);
@ -1310,9 +1326,9 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
if (GST_CLOCK_TIME_IS_VALID (btime)) {
if (end && GST_BUFFER_DURATION_IS_VALID (buf))
btime += GST_BUFFER_DURATION (buf);
if (time > segment->stop)
if (btime > segment->stop)
btime = segment->stop;
time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, btime);
time = my_segment_to_running_time (segment, btime);
if (!end)
goto done;
} else if (!end) {
@ -1329,7 +1345,7 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
gst_event_parse_segment (event, &new_segment);
if (new_segment->format == GST_FORMAT_TIME) {
time =
gst_segment_to_running_time (new_segment, GST_FORMAT_TIME,
my_segment_to_running_time ((GstSegment *) new_segment,
new_segment->start);
}
}
@ -1509,7 +1525,7 @@ gst_multi_queue_loop (GstPad * pad)
GstMiniObject *object = NULL;
guint32 newid;
GstFlowReturn result;
GstClockTime next_time;
GstClockTimeDiff next_time;
gboolean is_buffer;
gboolean do_update_buffering = FALSE;
gboolean dropping = FALSE;
@ -1537,7 +1553,7 @@ next:
is_buffer = GST_IS_BUFFER (object);
/* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
/* Get running time of the item. Events will have GST_CLOCK_STIME_NONE */
next_time = get_running_time (&sq->src_segment, object, FALSE);
GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
@ -1565,7 +1581,7 @@ next:
/* Update the nextid so other threads know when to wake us up */
sq->nextid = newid;
/* Take into account the extra cache time since we're unlinked */
if (GST_CLOCK_TIME_IS_VALID (next_time))
if (GST_CLOCK_STIME_IS_VALID (next_time))
next_time += mq->unlinked_cache_time;
sq->next_time = next_time;
@ -1581,17 +1597,17 @@ next:
/* Recompute the high time */
compute_high_time (mq);
while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
(mq->high_time == GST_CLOCK_TIME_NONE
while (((mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (next_time)
&& (mq->high_time == GST_CLOCK_STIME_NONE
|| next_time > mq->high_time))
|| (!mq->sync_by_running_time && newid > mq->highid))
&& sq->srcresult == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mq,
"queue %d sleeping for not-linked wakeup with "
"newid %u, highid %u, next_time %" GST_TIME_FORMAT
", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
"newid %u, highid %u, next_time %" GST_STIME_FORMAT
", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
GST_STIME_ARGS (next_time), GST_STIME_ARGS (mq->high_time));
/* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
@ -1609,9 +1625,9 @@ next:
compute_high_time (mq);
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
"wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
GST_STIME_ARGS (next_time), GST_STIME_ARGS (mq->high_time));
}
/* Re-compute the high_id in case someone else pushed */
@ -1625,7 +1641,7 @@ next:
}
/* We're done waiting, we can clear the nextid and nexttime */
sq->nextid = 0;
sq->next_time = GST_CLOCK_TIME_NONE;
sq->next_time = GST_CLOCK_STIME_NONE;
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
@ -1638,10 +1654,10 @@ next:
/* Update time stats */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
next_time = get_running_time (&sq->src_segment, object, TRUE);
if (next_time != GST_CLOCK_TIME_NONE) {
if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
if (GST_CLOCK_STIME_IS_VALID (next_time)) {
if (sq->last_time == GST_CLOCK_STIME_NONE || sq->last_time < next_time)
sq->last_time = next_time;
if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
if (mq->high_time == GST_CLOCK_STIME_NONE || mq->high_time <= next_time) {
/* Wake up all non-linked pads now that we advanced the high time */
mq->high_time = next_time;
wake_up_next_non_linked (mq);
@ -1987,16 +2003,15 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
sref = gst_event_ref (event);
if (mq->use_interleave) {
GstClockTime val, dur;
GstClockTime stime;
gst_event_parse_gap (event, &val, &dur);
if (GST_CLOCK_TIME_IS_VALID (val)) {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (GST_CLOCK_TIME_IS_VALID (dur))
val += dur;
val =
gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
val);
if (GST_CLOCK_TIME_IS_VALID (val)) {
sq->cached_sinktime = val;
stime = my_segment_to_running_time (&sq->sink_segment, val);
if (GST_CLOCK_STIME_IS_VALID (stime)) {
sq->cached_sinktime = stime;
calculate_interleave (mq);
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
@ -2250,12 +2265,12 @@ wake_up_next_non_linked (GstMultiQueue * mq)
if (mq->numwaiting < 1)
return;
if (mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE) {
if (mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (mq->high_time)) {
/* Else figure out which singlequeue(s) need waking up */
for (tmp = mq->queues; tmp; tmp = tmp->next) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (sq->srcresult == GST_FLOW_NOT_LINKED
&& sq->next_time != GST_CLOCK_TIME_NONE
&& GST_CLOCK_STIME_IS_VALID (sq->next_time)
&& sq->next_time <= mq->high_time) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
g_cond_signal (&sq->turn);
@ -2325,8 +2340,8 @@ compute_high_time (GstMultiQueue * mq)
* pads, or if all pads are not-linked, it's the lowest nex time of
* not-linked pad */
GList *tmp;
GstClockTime highest = GST_CLOCK_TIME_NONE;
GstClockTime lowest = GST_CLOCK_TIME_NONE;
GstClockTimeDiff highest = GST_CLOCK_STIME_NONE;
GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
if (!mq->sync_by_running_time)
return;
@ -2335,41 +2350,43 @@ compute_high_time (GstMultiQueue * mq)
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
GST_LOG_OBJECT (mq,
"inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
"inspecting sq:%d , next_time:%" GST_STIME_FORMAT ", last_time:%"
GST_STIME_FORMAT ", srcresult:%s", sq->id,
GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time),
gst_flow_get_name (sq->srcresult));
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
/* No need to consider queues which are not waiting */
if (sq->next_time == GST_CLOCK_TIME_NONE) {
if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
continue;
}
if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
if (lowest == GST_CLOCK_STIME_NONE || sq->next_time < lowest)
lowest = sq->next_time;
} else if (sq->srcresult != GST_FLOW_EOS) {
/* If we don't have a global high time, or the global high time
* is lower than this single queue's last outputted time, store
* the queue's one, unless the singlequeue is at EOS (srcresult
* = EOS) */
if (highest == GST_CLOCK_TIME_NONE
|| (sq->last_time != GST_CLOCK_TIME_NONE && sq->last_time > highest))
if (highest == GST_CLOCK_STIME_NONE
|| (sq->last_time != GST_CLOCK_STIME_NONE && sq->last_time > highest))
highest = sq->last_time;
}
GST_LOG_OBJECT (mq,
"highest now %" GST_TIME_FORMAT " lowest %" GST_TIME_FORMAT,
GST_TIME_ARGS (highest), GST_TIME_ARGS (lowest));
"highest now %" GST_STIME_FORMAT " lowest %" GST_STIME_FORMAT,
GST_STIME_ARGS (highest), GST_STIME_ARGS (lowest));
}
if (highest == GST_CLOCK_TIME_NONE)
if (highest == GST_CLOCK_STIME_NONE)
mq->high_time = lowest;
else
mq->high_time = highest;
GST_LOG_OBJECT (mq,
"High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
"High time is now : %" GST_STIME_FORMAT ", lowest non-linked %"
GST_STIME_FORMAT, GST_STIME_ARGS (mq->high_time),
GST_STIME_ARGS (lowest));
}
#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
@ -2641,13 +2658,13 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
sq->nextid = 0;
sq->oldid = 0;
sq->next_time = GST_CLOCK_TIME_NONE;
sq->last_time = GST_CLOCK_TIME_NONE;
sq->next_time = GST_CLOCK_STIME_NONE;
sq->last_time = GST_CLOCK_STIME_NONE;
g_cond_init (&sq->turn);
g_cond_init (&sq->query_handled);
sq->sinktime = GST_CLOCK_TIME_NONE;
sq->srctime = GST_CLOCK_TIME_NONE;
sq->sinktime = GST_CLOCK_STIME_NONE;
sq->srctime = GST_CLOCK_STIME_NONE;
sq->sink_tainted = TRUE;
sq->src_tainted = TRUE;

View file

@ -68,7 +68,7 @@ struct _GstMultiQueue {
guint counter; /* incoming object counter, use atomic accesses */
guint32 highid; /* contains highest id of last outputted object */
GstClockTime high_time; /* highest start running time */
GstClockTimeDiff high_time; /* highest start running time */
GMutex qlock; /* Global queue lock (vs object lock or individual */
/* queues lock). Protects nbqueues, queues, global */
@ -80,7 +80,7 @@ struct _GstMultiQueue {
GMutex buffering_post_lock; /* assures only one posted at a time */
GstClockTime interleave; /* Input interleave */
GstClockTime last_interleave_update;
GstClockTimeDiff last_interleave_update;
GstClockTime unlinked_cache_time;
};