diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 9503c2a97c..e382ba24c6 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -139,6 +139,7 @@ struct _GstSingleQueue guint id; /* group of streams to which this queue belongs to */ guint groupid; + GstClockTimeDiff group_high_time; GstMultiQueue *mqueue; @@ -216,7 +217,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue); static void wake_up_next_non_linked (GstMultiQueue * mq); static void compute_high_id (GstMultiQueue * mq); -static GstClockTimeDiff compute_high_time (GstMultiQueue * mq, guint groupid); +static void compute_high_time (GstMultiQueue * mq, guint groupid); static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); @@ -1019,6 +1020,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, sq->next_time = GST_CLOCK_STIME_NONE; sq->last_time = GST_CLOCK_STIME_NONE; sq->cached_sinktime = GST_CLOCK_STIME_NONE; + sq->group_high_time = GST_CLOCK_STIME_NONE; gst_data_queue_set_flushing (sq->queue, FALSE); /* We will become active again on the next buffer/gap */ @@ -1733,30 +1735,37 @@ next: sq->oldid = sq->last_oldid; if (sq->srcresult == GST_FLOW_NOT_LINKED) { - GstClockTimeDiff high_time; + gboolean should_wait; /* Go to sleep until it's time to push this buffer */ /* Recompute the highid */ compute_high_id (mq); /* Recompute the high time */ - high_time = compute_high_time (mq, sq->groupid); + compute_high_time (mq, sq->groupid); GST_DEBUG_OBJECT (mq, "groupid %d high_time %" GST_STIME_FORMAT " next_time %" - GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (high_time), + GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time), GST_STIME_ARGS (next_time)); - while (((mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (next_time) - && (high_time == GST_CLOCK_STIME_NONE - || next_time > high_time)) - || (!mq->sync_by_running_time && newid > mq->highid)) - && sq->srcresult == GST_FLOW_NOT_LINKED) { + if (mq->sync_by_running_time) + /* In this case we only need to wait if: + * 1) there is a time against which to wait + * 2) and either we have gone over the high_time or there is no + * high_time */ + should_wait = GST_CLOCK_STIME_IS_VALID (next_time) && + (sq->group_high_time == GST_CLOCK_STIME_NONE + || next_time > sq->group_high_time); + else + should_wait = newid > mq->highid; + + while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) { GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " "newid %u, highid %u, next_time %" GST_STIME_FORMAT ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid, - GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time)); + GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time)); /* Wake up all non-linked pads before we sleep */ wake_up_next_non_linked (mq); @@ -1771,13 +1780,20 @@ next: } /* Recompute the high time and ID */ - high_time = compute_high_time (mq, sq->groupid); + compute_high_time (mq, sq->groupid); compute_high_id (mq); GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid, - GST_STIME_ARGS (next_time), GST_STIME_ARGS (high_time)); + GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time)); + + if (mq->sync_by_running_time) + should_wait = GST_CLOCK_STIME_IS_VALID (next_time) && + (sq->group_high_time == GST_CLOCK_STIME_NONE + || next_time > sq->group_high_time); + else + should_wait = newid > mq->highid; } /* Re-compute the high_id in case someone else pushed */ @@ -1836,6 +1852,7 @@ next: sq->id); compute_high_id (mq); + compute_high_time (mq, sq->groupid); do_update_buffering = TRUE; /* maybe no-one is waiting */ @@ -2419,8 +2436,9 @@ wake_up_next_non_linked (GstMultiQueue * mq) for (tmp = mq->queues; tmp; tmp = tmp->next) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; if (sq->srcresult == GST_FLOW_NOT_LINKED + && GST_CLOCK_STIME_IS_VALID (sq->group_high_time) && GST_CLOCK_STIME_IS_VALID (sq->next_time) - && sq->next_time <= mq->high_time) { + && sq->next_time <= sq->group_high_time) { GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); g_cond_signal (&sq->turn); } @@ -2482,7 +2500,7 @@ compute_high_id (GstMultiQueue * mq) } /* WITH LOCK TAKEN */ -static GstClockTimeDiff +static void compute_high_time (GstMultiQueue * mq, guint groupid) { /* The high-time is either the highest last time among the linked @@ -2493,11 +2511,13 @@ compute_high_time (GstMultiQueue * mq, guint groupid) GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE; GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE; GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE; + GstClockTimeDiff res; /* Number of streams which belong to groupid */ guint group_count = 0; if (!mq->sync_by_running_time) - return GST_CLOCK_STIME_NONE; + /* return GST_CLOCK_STIME_NONE; */ + return; for (tmp = mq->queues; tmp; tmp = tmp->next) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; @@ -2557,11 +2577,17 @@ compute_high_time (GstMultiQueue * mq, guint groupid) /* If there's only one stream of a given type, use the global high */ if (group_count < 2) - return mq->high_time; + res = mq->high_time; + else if (group_high == GST_CLOCK_STIME_NONE) + res = group_low; + else + res = group_high; - if (group_high == GST_CLOCK_STIME_NONE) - return group_low; - return group_high; + for (tmp = mq->queues; tmp; tmp = tmp->next) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + if (groupid == sq->groupid) + sq->group_high_time = res; + } } #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \ @@ -2807,6 +2833,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) mqueue->nbqueues++; sq->id = temp_id; sq->groupid = DEFAULT_PAD_GROUP_ID; + sq->group_high_time = GST_CLOCK_STIME_NONE; mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq); mqueue->queues_cookie++;