From 9441e711df7a40135efd28ea7f9d1b50bfc59ef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 29 Mar 2012 14:45:41 +0200 Subject: [PATCH] multiqueue: Wake up all not-linked streams when a stream switches from linked to not-linked We reset all the waiting streams, let them push another buffer to see if they're now active again. This allows faster switching between streams and prevents deadlocks if downstream does any waiting too. Also improve locking a bit, srcresult must be protected by the multiqueue lock too because it's used/set from random threads. --- plugins/elements/gstmultiqueue.c | 80 ++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 10 deletions(-) diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index e882ff775c..9891b9364d 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -140,6 +140,12 @@ struct _GstSingleQueue /* flowreturn of previous srcpad push */ GstFlowReturn srcresult; + /* If something was actually pushed on + * this pad after flushing/pad activation + * and the srcresult corresponds to something + * real + */ + gboolean pushed; /* segments */ GstSegment sink_segment; @@ -748,27 +754,29 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->id); if (flush) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); sq->srcresult = GST_FLOW_WRONG_STATE; gst_data_queue_set_flushing (sq->queue, TRUE); sq->flushing = TRUE; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* wake up non-linked task */ GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", sq->id); - GST_MULTI_QUEUE_MUTEX_LOCK (mq); g_cond_signal (sq->turn); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); result = gst_pad_pause_task (sq->srcpad); sq->sink_tainted = sq->src_tainted = TRUE; } else { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); gst_data_queue_flush (sq->queue); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); /* All pads start off not-linked for a smooth kick-off */ sq->srcresult = GST_FLOW_OK; + sq->pushed = FALSE; sq->cur_time = 0; sq->max_size.visible = mq->max_size.visible; sq->is_eos = FALSE; @@ -780,11 +788,10 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) gst_data_queue_set_flushing (sq->queue, FALSE); /* Reset high time to be recomputed next */ - GST_MULTI_QUEUE_MUTEX_LOCK (mq); mq->high_time = GST_CLOCK_TIME_NONE; - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); sq->flushing = FALSE; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); result = @@ -1218,14 +1225,13 @@ gst_multi_queue_loop (GstPad * pad) * or it's the first loop, or we just passed the previous highid, * we might need to wake some sleeping pad up, so there's extra work * there too */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); if (sq->srcresult == GST_FLOW_NOT_LINKED || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) || sq->last_oldid > mq->highid) { GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", gst_flow_get_name (sq->srcresult)); - GST_MULTI_QUEUE_MUTEX_LOCK (mq); - /* Check again if we're flushing after the lock is taken, * the flush flag might have been changed in the meantime */ if (sq->flushing) { @@ -1292,9 +1298,8 @@ gst_multi_queue_loop (GstPad * pad) /* We're done waiting, we can clear the nextid and nexttime */ sq->nextid = 0; sq->next_time = GST_CLOCK_TIME_NONE; - - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); if (sq->flushing) goto out_flushing; @@ -1303,6 +1308,7 @@ gst_multi_queue_loop (GstPad * pad) gst_flow_get_name (sq->srcresult)); /* Update time stats */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); next_time = get_running_time (&sq->src_segment, object, FALSE); if (next_time != GST_CLOCK_TIME_NONE) { if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time) @@ -1313,10 +1319,51 @@ gst_multi_queue_loop (GstPad * pad) wake_up_next_non_linked (mq); } } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* Try to push out the new object */ result = gst_single_queue_push_one (mq, sq, object); + + /* Check if we pushed something already and if this is + * now a switch from an active to a non-active stream. + * + * If it is, we reset all the waiting streams, let them + * push another buffer to see if they're now active again. + * This allows faster switching between streams and prevents + * deadlocks if downstream does any waiting too. + */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + if (sq->pushed && sq->srcresult == GST_FLOW_OK + && result == GST_FLOW_NOT_LINKED) { + GList *tmp; + + GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active", + sq->id); + + compute_high_id (mq); + + /* maybe no-one is waiting */ + if (mq->numwaiting > 0) { + /* Else figure out which singlequeue(s) need waking up */ + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data; + + if (sq2->srcresult == GST_FLOW_NOT_LINKED) { + GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id); + sq2->pushed = FALSE; + sq2->srcresult = GST_FLOW_OK; + g_cond_signal (sq2->turn); + } + } + } + } + + if (GST_IS_BUFFER (object)) + sq->pushed = TRUE; sq->srcresult = result; + sq->last_oldid = newid; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + object = NULL; if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED @@ -1326,8 +1373,6 @@ gst_multi_queue_loop (GstPad * pad) GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", gst_flow_get_name (sq->srcresult)); - sq->last_oldid = newid; - return; out_flushing: @@ -1422,16 +1467,30 @@ static gboolean gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) { GstSingleQueue *sq; + GstMultiQueue *mq; sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = (GstMultiQueue *) gst_pad_get_parent (pad); + + /* mq is NULL if the pad is activated/deactivated before being + * added to the multiqueue */ + if (mq) + GST_MULTI_QUEUE_MUTEX_LOCK (mq); if (active) { /* All pads start off linked until they push one buffer */ sq->srcresult = GST_FLOW_OK; + sq->pushed = FALSE; } else { sq->srcresult = GST_FLOW_WRONG_STATE; gst_data_queue_flush (sq->queue); } + + if (mq) { + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + gst_object_unref (mq); + } + return TRUE; } @@ -1944,6 +2003,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id) sq->mqueue = mqueue; sq->srcresult = GST_FLOW_WRONG_STATE; + sq->pushed = FALSE; sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction) single_queue_check_full, (GstDataQueueFullCallback) single_queue_overrun_cb,