multiqueue: Don't do an infinite loop in the loop function

Instead return after every iteration, which makes sure that the
stream lock is released for a short time after every iteration,
task state changes are checked, etc and this allows the task
to be stopped properly.
This commit is contained in:
Sebastian Dröge 2010-08-27 16:52:12 +02:00
parent 8035f13bbf
commit 33082eb9e4

View file

@ -1022,95 +1022,93 @@ gst_multi_queue_loop (GstPad * pad)
sq = (GstSingleQueue *) gst_pad_get_element_private (pad); sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue; mq = sq->mqueue;
do { GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
/* Get something from the queue, blocking until that happens, or we get /* Get something from the queue, blocking until that happens, or we get
* flushed */ * flushed */
if (!(gst_data_queue_pop (sq->queue, &sitem))) if (!(gst_data_queue_pop (sq->queue, &sitem)))
goto out_flushing; goto out_flushing;
item = (GstMultiQueueItem *) sitem; item = (GstMultiQueueItem *) sitem;
newid = item->posid; newid = item->posid;
/* steal the object and destroy the item */ /* steal the object and destroy the item */
object = gst_multi_queue_item_steal_object (item); object = gst_multi_queue_item_steal_object (item);
gst_multi_queue_item_destroy (item); gst_multi_queue_item_destroy (item);
GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
sq->id, newid, oldid); sq->id, newid, oldid);
/* If we're not-linked, we do some extra work because we might need to /* If we're not-linked, we do some extra work because we might need to
* wait before pushing. If we're linked but there's a gap in the IDs, * wait before pushing. If we're linked but there's a gap in the IDs,
* or it's the first loop, or we just passed the previous highid, * or it's the first loop, or we just passed the previous highid,
* we might need to wake some sleeping pad up, so there's extra work * we might need to wake some sleeping pad up, so there's extra work
* there too */ * there too */
if (sq->srcresult == GST_FLOW_NOT_LINKED || if (sq->srcresult == GST_FLOW_NOT_LINKED ||
(oldid == G_MAXUINT32) || (newid != (oldid + 1)) || (oldid == G_MAXUINT32) || (newid != (oldid + 1)) || oldid > mq->highid) {
oldid > mq->highid) { GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", gst_flow_get_name (sq->srcresult));
gst_flow_get_name (sq->srcresult));
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
/* Update the nextid so other threads know when to wake us up */ /* Update the nextid so other threads know when to wake us up */
sq->nextid = newid; sq->nextid = newid;
/* Update the oldid (the last ID we output) for highid tracking */ /* Update the oldid (the last ID we output) for highid tracking */
if (oldid != G_MAXUINT32) if (oldid != G_MAXUINT32)
sq->oldid = oldid; sq->oldid = oldid;
if (sq->srcresult == GST_FLOW_NOT_LINKED) { if (sq->srcresult == GST_FLOW_NOT_LINKED) {
/* Go to sleep until it's time to push this buffer */ /* Go to sleep until it's time to push this buffer */
/* Recompute the highid */ /* Recompute the highid */
compute_high_id (mq); compute_high_id (mq);
while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
"newid %u and highid %u", sq->id, newid, mq->highid); "newid %u and highid %u", sq->id, newid, mq->highid);
/* Wake up all non-linked pads before we sleep */ /* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
mq->numwaiting++;
g_cond_wait (sq->turn, mq->qlock);
mq->numwaiting--;
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
}
/* Re-compute the high_id in case someone else pushed */
compute_high_id (mq);
} else {
compute_high_id (mq);
/* Wake up all non-linked pads */
wake_up_next_non_linked (mq); wake_up_next_non_linked (mq);
mq->numwaiting++;
g_cond_wait (sq->turn, mq->qlock);
mq->numwaiting--;
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
} }
/* We're done waiting, we can clear the nextid */
sq->nextid = 0;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* Re-compute the high_id in case someone else pushed */
compute_high_id (mq);
} else {
compute_high_id (mq);
/* Wake up all non-linked pads */
wake_up_next_non_linked (mq);
} }
/* We're done waiting, we can clear the nextid */
sq->nextid = 0;
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_flow_get_name (sq->srcresult));
/* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object);
sq->srcresult = result;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
&& result != GST_FLOW_UNEXPECTED)
goto out_flushing;
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
oldid = newid;
} }
while (TRUE);
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
/* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object);
sq->srcresult = result;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
&& result != GST_FLOW_UNEXPECTED)
goto out_flushing;
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
oldid = newid;
return;
out_flushing: out_flushing:
{ {