From 4d96e5b8341732326243777a3667956fb4c9daf0 Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Mon, 30 Nov 2015 17:09:43 +0100 Subject: [PATCH] multiqueue: Use signed clock values for running time calculation This improves the accuracy of queue levels and when to push buffers for buffers falling outside of the segment https://bugzilla.gnome.org/show_bug.cgi?id=757193 --- plugins/elements/gstmultiqueue.c | 157 +++++++++++++++++-------------- plugins/elements/gstmultiqueue.h | 4 +- 2 files changed, 89 insertions(+), 72 deletions(-) diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 1df1068928..db9f445dbe 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -149,7 +149,7 @@ struct _GstSingleQueue * in every segment usage */ /* position of src/sink */ - GstClockTime sinktime, srctime; + GstClockTimeDiff sinktime, srctime; /* cached input value, used for interleave */ GstClockTimeDiff cached_sinktime; /* TRUE if either position needs to be recalculated */ @@ -168,8 +168,8 @@ struct _GstSingleQueue guint32 nextid; /* ID of the next object waiting to be pushed */ guint32 oldid; /* ID of the last object pushed (last in a series) */ guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */ - GstClockTime next_time; /* End running time of next buffer to be pushed */ - GstClockTime last_time; /* Start running time of last pushed buffer */ + GstClockTimeDiff next_time; /* End running time of next buffer to be pushed */ + GstClockTimeDiff last_time; /* Start running time of last pushed buffer */ GCond turn; /* SingleQueue turn waiting conditional */ /* for serialized queries */ @@ -292,6 +292,23 @@ enum } \ } G_STMT_END +/* Convenience function */ +static inline GstClockTimeDiff +my_segment_to_running_time (GstSegment * segment, GstClockTime val) +{ + GstClockTimeDiff res = GST_CLOCK_STIME_NONE; + + if (GST_CLOCK_TIME_IS_VALID (val)) { + gboolean sign = + gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); + if (sign > 0) + res = val; + else if (sign < 0) + res = -val; + } + return res; +} + static void gst_multi_queue_finalize (GObject * object); static void gst_multi_queue_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -494,7 +511,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue) mqueue->counter = 1; mqueue->highid = -1; - mqueue->high_time = GST_CLOCK_TIME_NONE; + mqueue->high_time = GST_CLOCK_STIME_NONE; g_mutex_init (&mqueue->qlock); g_mutex_init (&mqueue->buffering_post_lock); @@ -904,13 +921,13 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, sq->nextid = 0; sq->oldid = 0; sq->last_oldid = G_MAXUINT32; - sq->next_time = GST_CLOCK_TIME_NONE; - sq->last_time = GST_CLOCK_TIME_NONE; - sq->cached_sinktime = GST_CLOCK_TIME_NONE; + sq->next_time = GST_CLOCK_STIME_NONE; + sq->last_time = GST_CLOCK_STIME_NONE; + sq->cached_sinktime = GST_CLOCK_STIME_NONE; gst_data_queue_set_flushing (sq->queue, FALSE); /* Reset high time to be recomputed next */ - mq->high_time = GST_CLOCK_TIME_NONE; + mq->high_time = GST_CLOCK_STIME_NONE; sq->flushing = FALSE; GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); @@ -1090,10 +1107,10 @@ beach: GST_DEBUG_OBJECT (mq, "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%" GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT - " last_interleave_update:%" GST_TIME_FORMAT, GST_TIME_ARGS (low), - GST_TIME_ARGS (high), GST_TIME_ARGS (interleave), + " last_interleave_update:%" GST_STIME_FORMAT, GST_STIME_ARGS (low), + GST_STIME_ARGS (high), GST_TIME_ARGS (interleave), GST_TIME_ARGS (mq->interleave), - GST_TIME_ARGS (mq->last_interleave_update)); + GST_STIME_ARGS (mq->last_interleave_update)); } @@ -1103,26 +1120,25 @@ beach: static void update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) { - gint64 sink_time, src_time; + GstClockTimeDiff sink_time, src_time; if (sq->sink_tainted) { - sink_time = sq->sinktime = - gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, + sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment, sq->sink_segment.position); GST_DEBUG_OBJECT (mq, "queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%" - GST_TIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position), - GST_TIME_ARGS (sink_time)); + GST_STIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position), + GST_STIME_ARGS (sink_time)); - if (G_UNLIKELY (sq->last_time == GST_CLOCK_TIME_NONE)) { + if (G_UNLIKELY (sq->last_time == GST_CLOCK_STIME_NONE)) { /* If the single queue still doesn't have a last_time set, this means * that nothing has been pushed out yet. * In order for the high_time computation to be as efficient as possible, * we set the last_time */ sq->last_time = sink_time; } - if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE)) { + if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) { /* if we have a time, we become untainted and use the time */ sq->sink_tainted = FALSE; if (mq->use_interleave) { @@ -1153,22 +1169,22 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) position = sq->sink_segment.position; } - src_time = sq->srctime = - gst_segment_to_running_time (segment, GST_FORMAT_TIME, position); + src_time = sq->srctime = my_segment_to_running_time (segment, position); /* if we have a time, we become untainted and use the time */ - if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE)) { + if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) { sq->src_tainted = FALSE; } } else src_time = sq->srctime; GST_DEBUG_OBJECT (mq, - "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, - GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); + "queue %d, sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, sq->id, + GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time)); /* This allows for streams with out of order timestamping - sometimes the * emerging timestamp is later than the arriving one(s) */ - if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time)) + if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) && + GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time)) sq->cur_time = sink_time - src_time; else sq->cur_time = 0; @@ -1281,10 +1297,10 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, gst_multi_queue_post_buffering (mq); } -static GstClockTime +static GstClockTimeDiff get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end) { - GstClockTime time = GST_CLOCK_TIME_NONE; + GstClockTimeDiff time = GST_CLOCK_STIME_NONE; if (GST_IS_BUFFER (object)) { GstBuffer *buf = GST_BUFFER_CAST (object); @@ -1295,7 +1311,7 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end) btime += GST_BUFFER_DURATION (buf); if (btime > segment->stop) btime = segment->stop; - time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, btime); + time = my_segment_to_running_time (segment, btime); } } else if (GST_IS_BUFFER_LIST (object)) { GstBufferList *list = GST_BUFFER_LIST_CAST (object); @@ -1310,9 +1326,9 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end) if (GST_CLOCK_TIME_IS_VALID (btime)) { if (end && GST_BUFFER_DURATION_IS_VALID (buf)) btime += GST_BUFFER_DURATION (buf); - if (time > segment->stop) + if (btime > segment->stop) btime = segment->stop; - time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, btime); + time = my_segment_to_running_time (segment, btime); if (!end) goto done; } else if (!end) { @@ -1329,7 +1345,7 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end) gst_event_parse_segment (event, &new_segment); if (new_segment->format == GST_FORMAT_TIME) { time = - gst_segment_to_running_time (new_segment, GST_FORMAT_TIME, + my_segment_to_running_time ((GstSegment *) new_segment, new_segment->start); } } @@ -1509,7 +1525,7 @@ gst_multi_queue_loop (GstPad * pad) GstMiniObject *object = NULL; guint32 newid; GstFlowReturn result; - GstClockTime next_time; + GstClockTimeDiff next_time; gboolean is_buffer; gboolean do_update_buffering = FALSE; gboolean dropping = FALSE; @@ -1537,7 +1553,7 @@ next: is_buffer = GST_IS_BUFFER (object); - /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */ + /* Get running time of the item. Events will have GST_CLOCK_STIME_NONE */ next_time = get_running_time (&sq->src_segment, object, FALSE); GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", @@ -1565,7 +1581,7 @@ next: /* Update the nextid so other threads know when to wake us up */ sq->nextid = newid; /* Take into account the extra cache time since we're unlinked */ - if (GST_CLOCK_TIME_IS_VALID (next_time)) + if (GST_CLOCK_STIME_IS_VALID (next_time)) next_time += mq->unlinked_cache_time; sq->next_time = next_time; @@ -1581,17 +1597,17 @@ next: /* Recompute the high time */ compute_high_time (mq); - while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE && - (mq->high_time == GST_CLOCK_TIME_NONE + while (((mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (next_time) + && (mq->high_time == GST_CLOCK_STIME_NONE || next_time > mq->high_time)) || (!mq->sync_by_running_time && newid > mq->highid)) && sq->srcresult == GST_FLOW_NOT_LINKED) { GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " - "newid %u, highid %u, next_time %" GST_TIME_FORMAT - ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid, - GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time)); + "newid %u, highid %u, next_time %" GST_STIME_FORMAT + ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid, + GST_STIME_ARGS (next_time), GST_STIME_ARGS (mq->high_time)); /* Wake up all non-linked pads before we sleep */ wake_up_next_non_linked (mq); @@ -1609,9 +1625,9 @@ next: compute_high_time (mq); GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " - "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT - ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid, - GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time)); + "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT + ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid, + GST_STIME_ARGS (next_time), GST_STIME_ARGS (mq->high_time)); } /* Re-compute the high_id in case someone else pushed */ @@ -1625,7 +1641,7 @@ next: } /* We're done waiting, we can clear the nextid and nexttime */ sq->nextid = 0; - sq->next_time = GST_CLOCK_TIME_NONE; + sq->next_time = GST_CLOCK_STIME_NONE; } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); @@ -1638,10 +1654,10 @@ next: /* Update time stats */ GST_MULTI_QUEUE_MUTEX_LOCK (mq); next_time = get_running_time (&sq->src_segment, object, TRUE); - if (next_time != GST_CLOCK_TIME_NONE) { - if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time) + if (GST_CLOCK_STIME_IS_VALID (next_time)) { + if (sq->last_time == GST_CLOCK_STIME_NONE || sq->last_time < next_time) sq->last_time = next_time; - if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) { + if (mq->high_time == GST_CLOCK_STIME_NONE || mq->high_time <= next_time) { /* Wake up all non-linked pads now that we advanced the high time */ mq->high_time = next_time; wake_up_next_non_linked (mq); @@ -1987,16 +2003,15 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) sref = gst_event_ref (event); if (mq->use_interleave) { GstClockTime val, dur; + GstClockTime stime; gst_event_parse_gap (event, &val, &dur); if (GST_CLOCK_TIME_IS_VALID (val)) { GST_MULTI_QUEUE_MUTEX_LOCK (mq); if (GST_CLOCK_TIME_IS_VALID (dur)) val += dur; - val = - gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, - val); - if (GST_CLOCK_TIME_IS_VALID (val)) { - sq->cached_sinktime = val; + stime = my_segment_to_running_time (&sq->sink_segment, val); + if (GST_CLOCK_STIME_IS_VALID (stime)) { + sq->cached_sinktime = stime; calculate_interleave (mq); } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); @@ -2250,12 +2265,12 @@ wake_up_next_non_linked (GstMultiQueue * mq) if (mq->numwaiting < 1) return; - if (mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE) { + if (mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (mq->high_time)) { /* Else figure out which singlequeue(s) need waking up */ for (tmp = mq->queues; tmp; tmp = tmp->next) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; if (sq->srcresult == GST_FLOW_NOT_LINKED - && sq->next_time != GST_CLOCK_TIME_NONE + && GST_CLOCK_STIME_IS_VALID (sq->next_time) && sq->next_time <= mq->high_time) { GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); g_cond_signal (&sq->turn); @@ -2325,8 +2340,8 @@ compute_high_time (GstMultiQueue * mq) * pads, or if all pads are not-linked, it's the lowest nex time of * not-linked pad */ GList *tmp; - GstClockTime highest = GST_CLOCK_TIME_NONE; - GstClockTime lowest = GST_CLOCK_TIME_NONE; + GstClockTimeDiff highest = GST_CLOCK_STIME_NONE; + GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE; if (!mq->sync_by_running_time) return; @@ -2335,41 +2350,43 @@ compute_high_time (GstMultiQueue * mq) GstSingleQueue *sq = (GstSingleQueue *) tmp->data; GST_LOG_OBJECT (mq, - "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%" - GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time), - GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult)); + "inspecting sq:%d , next_time:%" GST_STIME_FORMAT ", last_time:%" + GST_STIME_FORMAT ", srcresult:%s", sq->id, + GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time), + gst_flow_get_name (sq->srcresult)); if (sq->srcresult == GST_FLOW_NOT_LINKED) { /* No need to consider queues which are not waiting */ - if (sq->next_time == GST_CLOCK_TIME_NONE) { + if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) { GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); continue; } - if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest) + if (lowest == GST_CLOCK_STIME_NONE || sq->next_time < lowest) lowest = sq->next_time; } else if (sq->srcresult != GST_FLOW_EOS) { /* If we don't have a global high time, or the global high time * is lower than this single queue's last outputted time, store * the queue's one, unless the singlequeue is at EOS (srcresult * = EOS) */ - if (highest == GST_CLOCK_TIME_NONE - || (sq->last_time != GST_CLOCK_TIME_NONE && sq->last_time > highest)) + if (highest == GST_CLOCK_STIME_NONE + || (sq->last_time != GST_CLOCK_STIME_NONE && sq->last_time > highest)) highest = sq->last_time; } GST_LOG_OBJECT (mq, - "highest now %" GST_TIME_FORMAT " lowest %" GST_TIME_FORMAT, - GST_TIME_ARGS (highest), GST_TIME_ARGS (lowest)); + "highest now %" GST_STIME_FORMAT " lowest %" GST_STIME_FORMAT, + GST_STIME_ARGS (highest), GST_STIME_ARGS (lowest)); } - if (highest == GST_CLOCK_TIME_NONE) + if (highest == GST_CLOCK_STIME_NONE) mq->high_time = lowest; else mq->high_time = highest; GST_LOG_OBJECT (mq, - "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %" - GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest)); + "High time is now : %" GST_STIME_FORMAT ", lowest non-linked %" + GST_STIME_FORMAT, GST_STIME_ARGS (mq->high_time), + GST_STIME_ARGS (lowest)); } #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \ @@ -2641,13 +2658,13 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->nextid = 0; sq->oldid = 0; - sq->next_time = GST_CLOCK_TIME_NONE; - sq->last_time = GST_CLOCK_TIME_NONE; + sq->next_time = GST_CLOCK_STIME_NONE; + sq->last_time = GST_CLOCK_STIME_NONE; g_cond_init (&sq->turn); g_cond_init (&sq->query_handled); - sq->sinktime = GST_CLOCK_TIME_NONE; - sq->srctime = GST_CLOCK_TIME_NONE; + sq->sinktime = GST_CLOCK_STIME_NONE; + sq->srctime = GST_CLOCK_STIME_NONE; sq->sink_tainted = TRUE; sq->src_tainted = TRUE; diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index dac704fcfb..98e4bc1d52 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -68,7 +68,7 @@ struct _GstMultiQueue { guint counter; /* incoming object counter, use atomic accesses */ guint32 highid; /* contains highest id of last outputted object */ - GstClockTime high_time; /* highest start running time */ + GstClockTimeDiff high_time; /* highest start running time */ GMutex qlock; /* Global queue lock (vs object lock or individual */ /* queues lock). Protects nbqueues, queues, global */ @@ -80,7 +80,7 @@ struct _GstMultiQueue { GMutex buffering_post_lock; /* assures only one posted at a time */ GstClockTime interleave; /* Input interleave */ - GstClockTime last_interleave_update; + GstClockTimeDiff last_interleave_update; GstClockTime unlinked_cache_time; };