multiqueue: Fix high_time wakeup logic

When calculating the high_time, cache the group value in each singlequeue.

This fixes the issue by which wake_up_next_non_linked() would use the global
high-time to decide whether to wake-up a waiting thread, instead of the group
one, resulting in those threads constantly spinning.

Tidy up a bit the waiting logic while we're at it.

With this patch, we go from 212% playing a 8 audio / 8 video file down to less
than 10% (most of it being the video decoding).

https://bugzilla.gnome.org/show_bug.cgi?id=770225
This commit is contained in:
Edward Hervey 2016-08-23 14:57:33 +09:00 committed by Sebastian Dröge
parent 3cae933579
commit 3117525cb6

View file

@ -139,6 +139,7 @@ struct _GstSingleQueue
guint id; guint id;
/* group of streams to which this queue belongs to */ /* group of streams to which this queue belongs to */
guint groupid; guint groupid;
GstClockTimeDiff group_high_time;
GstMultiQueue *mqueue; GstMultiQueue *mqueue;
@ -216,7 +217,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue);
static void wake_up_next_non_linked (GstMultiQueue * mq); static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq); static void compute_high_id (GstMultiQueue * mq);
static GstClockTimeDiff compute_high_time (GstMultiQueue * mq, guint groupid); static void compute_high_time (GstMultiQueue * mq, guint groupid);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
@ -1019,6 +1020,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
sq->next_time = GST_CLOCK_STIME_NONE; sq->next_time = GST_CLOCK_STIME_NONE;
sq->last_time = GST_CLOCK_STIME_NONE; sq->last_time = GST_CLOCK_STIME_NONE;
sq->cached_sinktime = GST_CLOCK_STIME_NONE; sq->cached_sinktime = GST_CLOCK_STIME_NONE;
sq->group_high_time = GST_CLOCK_STIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE); gst_data_queue_set_flushing (sq->queue, FALSE);
/* We will become active again on the next buffer/gap */ /* We will become active again on the next buffer/gap */
@ -1733,30 +1735,37 @@ next:
sq->oldid = sq->last_oldid; sq->oldid = sq->last_oldid;
if (sq->srcresult == GST_FLOW_NOT_LINKED) { if (sq->srcresult == GST_FLOW_NOT_LINKED) {
GstClockTimeDiff high_time; gboolean should_wait;
/* Go to sleep until it's time to push this buffer */ /* Go to sleep until it's time to push this buffer */
/* Recompute the highid */ /* Recompute the highid */
compute_high_id (mq); compute_high_id (mq);
/* Recompute the high time */ /* Recompute the high time */
high_time = compute_high_time (mq, sq->groupid); compute_high_time (mq, sq->groupid);
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"groupid %d high_time %" GST_STIME_FORMAT " next_time %" "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (high_time), GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time),
GST_STIME_ARGS (next_time)); GST_STIME_ARGS (next_time));
while (((mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (next_time) if (mq->sync_by_running_time)
&& (high_time == GST_CLOCK_STIME_NONE /* In this case we only need to wait if:
|| next_time > high_time)) * 1) there is a time against which to wait
|| (!mq->sync_by_running_time && newid > mq->highid)) * 2) and either we have gone over the high_time or there is no
&& sq->srcresult == GST_FLOW_NOT_LINKED) { * high_time */
should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
(sq->group_high_time == GST_CLOCK_STIME_NONE
|| next_time > sq->group_high_time);
else
should_wait = newid > mq->highid;
while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"queue %d sleeping for not-linked wakeup with " "queue %d sleeping for not-linked wakeup with "
"newid %u, highid %u, next_time %" GST_STIME_FORMAT "newid %u, highid %u, next_time %" GST_STIME_FORMAT
", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid, ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time)); GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
/* Wake up all non-linked pads before we sleep */ /* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq); wake_up_next_non_linked (mq);
@ -1771,13 +1780,20 @@ next:
} }
/* Recompute the high time and ID */ /* Recompute the high time and ID */
high_time = compute_high_time (mq, sq->groupid); compute_high_time (mq, sq->groupid);
compute_high_id (mq); compute_high_id (mq);
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid, ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time)); GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
if (mq->sync_by_running_time)
should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
(sq->group_high_time == GST_CLOCK_STIME_NONE
|| next_time > sq->group_high_time);
else
should_wait = newid > mq->highid;
} }
/* Re-compute the high_id in case someone else pushed */ /* Re-compute the high_id in case someone else pushed */
@ -1836,6 +1852,7 @@ next:
sq->id); sq->id);
compute_high_id (mq); compute_high_id (mq);
compute_high_time (mq, sq->groupid);
do_update_buffering = TRUE; do_update_buffering = TRUE;
/* maybe no-one is waiting */ /* maybe no-one is waiting */
@ -2419,8 +2436,9 @@ wake_up_next_non_linked (GstMultiQueue * mq)
for (tmp = mq->queues; tmp; tmp = tmp->next) { for (tmp = mq->queues; tmp; tmp = tmp->next) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data; GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (sq->srcresult == GST_FLOW_NOT_LINKED if (sq->srcresult == GST_FLOW_NOT_LINKED
&& GST_CLOCK_STIME_IS_VALID (sq->group_high_time)
&& GST_CLOCK_STIME_IS_VALID (sq->next_time) && GST_CLOCK_STIME_IS_VALID (sq->next_time)
&& sq->next_time <= mq->high_time) { && sq->next_time <= sq->group_high_time) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
g_cond_signal (&sq->turn); g_cond_signal (&sq->turn);
} }
@ -2482,7 +2500,7 @@ compute_high_id (GstMultiQueue * mq)
} }
/* WITH LOCK TAKEN */ /* WITH LOCK TAKEN */
static GstClockTimeDiff static void
compute_high_time (GstMultiQueue * mq, guint groupid) compute_high_time (GstMultiQueue * mq, guint groupid)
{ {
/* The high-time is either the highest last time among the linked /* The high-time is either the highest last time among the linked
@ -2493,11 +2511,13 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE; GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE; GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE;
GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE; GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE;
GstClockTimeDiff res;
/* Number of streams which belong to groupid */ /* Number of streams which belong to groupid */
guint group_count = 0; guint group_count = 0;
if (!mq->sync_by_running_time) if (!mq->sync_by_running_time)
return GST_CLOCK_STIME_NONE; /* return GST_CLOCK_STIME_NONE; */
return;
for (tmp = mq->queues; tmp; tmp = tmp->next) { for (tmp = mq->queues; tmp; tmp = tmp->next) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data; GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
@ -2557,11 +2577,17 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
/* If there's only one stream of a given type, use the global high */ /* If there's only one stream of a given type, use the global high */
if (group_count < 2) if (group_count < 2)
return mq->high_time; res = mq->high_time;
else if (group_high == GST_CLOCK_STIME_NONE)
res = group_low;
else
res = group_high;
if (group_high == GST_CLOCK_STIME_NONE) for (tmp = mq->queues; tmp; tmp = tmp->next) {
return group_low; GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
return group_high; if (groupid == sq->groupid)
sq->group_high_time = res;
}
} }
#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \ #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
@ -2807,6 +2833,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
mqueue->nbqueues++; mqueue->nbqueues++;
sq->id = temp_id; sq->id = temp_id;
sq->groupid = DEFAULT_PAD_GROUP_ID; sq->groupid = DEFAULT_PAD_GROUP_ID;
sq->group_high_time = GST_CLOCK_STIME_NONE;
mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq); mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
mqueue->queues_cookie++; mqueue->queues_cookie++;