multiqueue: Wake up all not-linked streams when a stream switches from linked to not-linked

We reset all the waiting streams, let them push another buffer to
see if they're now active again. This allows faster switching
between streams and prevents deadlocks if downstream does any
waiting too.

Also improve locking a bit, srcresult must be protected by the
multiqueue lock too because it's used/set from random threads.
This commit is contained in:
Sebastian Dröge 2012-03-29 14:45:41 +02:00
parent 17e691421f
commit 9441e711df

View file

@ -140,6 +140,12 @@ struct _GstSingleQueue
/* flowreturn of previous srcpad push */ /* flowreturn of previous srcpad push */
GstFlowReturn srcresult; GstFlowReturn srcresult;
/* If something was actually pushed on
* this pad after flushing/pad activation
* and the srcresult corresponds to something
* real
*/
gboolean pushed;
/* segments */ /* segments */
GstSegment sink_segment; GstSegment sink_segment;
@ -748,27 +754,29 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->id); sq->id);
if (flush) { if (flush) {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->srcresult = GST_FLOW_WRONG_STATE; sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
sq->flushing = TRUE; sq->flushing = TRUE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* wake up non-linked task */ /* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id); sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (sq->turn); g_cond_signal (sq->turn);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
result = gst_pad_pause_task (sq->srcpad); result = gst_pad_pause_task (sq->srcpad);
sq->sink_tainted = sq->src_tainted = TRUE; sq->sink_tainted = sq->src_tainted = TRUE;
} else { } else {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
gst_data_queue_flush (sq->queue); gst_data_queue_flush (sq->queue);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */ /* All pads start off not-linked for a smooth kick-off */
sq->srcresult = GST_FLOW_OK; sq->srcresult = GST_FLOW_OK;
sq->pushed = FALSE;
sq->cur_time = 0; sq->cur_time = 0;
sq->max_size.visible = mq->max_size.visible; sq->max_size.visible = mq->max_size.visible;
sq->is_eos = FALSE; sq->is_eos = FALSE;
@ -780,11 +788,10 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
gst_data_queue_set_flushing (sq->queue, FALSE); gst_data_queue_set_flushing (sq->queue, FALSE);
/* Reset high time to be recomputed next */ /* Reset high time to be recomputed next */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->high_time = GST_CLOCK_TIME_NONE; mq->high_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
sq->flushing = FALSE; sq->flushing = FALSE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result = result =
@ -1218,14 +1225,13 @@ gst_multi_queue_loop (GstPad * pad)
* or it's the first loop, or we just passed the previous highid, * or it's the first loop, or we just passed the previous highid,
* we might need to wake some sleeping pad up, so there's extra work * we might need to wake some sleeping pad up, so there's extra work
* there too */ * there too */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (sq->srcresult == GST_FLOW_NOT_LINKED if (sq->srcresult == GST_FLOW_NOT_LINKED
|| (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
|| sq->last_oldid > mq->highid) { || sq->last_oldid > mq->highid) {
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
/* Check again if we're flushing after the lock is taken, /* Check again if we're flushing after the lock is taken,
* the flush flag might have been changed in the meantime */ * the flush flag might have been changed in the meantime */
if (sq->flushing) { if (sq->flushing) {
@ -1292,9 +1298,8 @@ gst_multi_queue_loop (GstPad * pad)
/* We're done waiting, we can clear the nextid and nexttime */ /* We're done waiting, we can clear the nextid and nexttime */
sq->nextid = 0; sq->nextid = 0;
sq->next_time = GST_CLOCK_TIME_NONE; sq->next_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
if (sq->flushing) if (sq->flushing)
goto out_flushing; goto out_flushing;
@ -1303,6 +1308,7 @@ gst_multi_queue_loop (GstPad * pad)
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
/* Update time stats */ /* Update time stats */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
next_time = get_running_time (&sq->src_segment, object, FALSE); next_time = get_running_time (&sq->src_segment, object, FALSE);
if (next_time != GST_CLOCK_TIME_NONE) { if (next_time != GST_CLOCK_TIME_NONE) {
if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time) if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
@ -1313,10 +1319,51 @@ gst_multi_queue_loop (GstPad * pad)
wake_up_next_non_linked (mq); wake_up_next_non_linked (mq);
} }
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Try to push out the new object */ /* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object); result = gst_single_queue_push_one (mq, sq, object);
/* Check if we pushed something already and if this is
* now a switch from an active to a non-active stream.
*
* If it is, we reset all the waiting streams, let them
* push another buffer to see if they're now active again.
* This allows faster switching between streams and prevents
* deadlocks if downstream does any waiting too.
*/
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (sq->pushed && sq->srcresult == GST_FLOW_OK
&& result == GST_FLOW_NOT_LINKED) {
GList *tmp;
GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
sq->id);
compute_high_id (mq);
/* maybe no-one is waiting */
if (mq->numwaiting > 0) {
/* Else figure out which singlequeue(s) need waking up */
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
sq2->pushed = FALSE;
sq2->srcresult = GST_FLOW_OK;
g_cond_signal (sq2->turn);
}
}
}
}
if (GST_IS_BUFFER (object))
sq->pushed = TRUE;
sq->srcresult = result; sq->srcresult = result;
sq->last_oldid = newid;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
object = NULL; object = NULL;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
@ -1326,8 +1373,6 @@ gst_multi_queue_loop (GstPad * pad)
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
sq->last_oldid = newid;
return; return;
out_flushing: out_flushing:
@ -1422,16 +1467,30 @@ static gboolean
gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
{ {
GstSingleQueue *sq; GstSingleQueue *sq;
GstMultiQueue *mq;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad); sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = (GstMultiQueue *) gst_pad_get_parent (pad);
/* mq is NULL if the pad is activated/deactivated before being
* added to the multiqueue */
if (mq)
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (active) { if (active) {
/* All pads start off linked until they push one buffer */ /* All pads start off linked until they push one buffer */
sq->srcresult = GST_FLOW_OK; sq->srcresult = GST_FLOW_OK;
sq->pushed = FALSE;
} else { } else {
sq->srcresult = GST_FLOW_WRONG_STATE; sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_flush (sq->queue); gst_data_queue_flush (sq->queue);
} }
if (mq) {
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_object_unref (mq);
}
return TRUE; return TRUE;
} }
@ -1944,6 +2003,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id)
sq->mqueue = mqueue; sq->mqueue = mqueue;
sq->srcresult = GST_FLOW_WRONG_STATE; sq->srcresult = GST_FLOW_WRONG_STATE;
sq->pushed = FALSE;
sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction) sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
single_queue_check_full, single_queue_check_full,
(GstDataQueueFullCallback) single_queue_overrun_cb, (GstDataQueueFullCallback) single_queue_overrun_cb,