multiqueue: Exit loop function if the pad is flushing

Fixes possible deadlocks when flushing an unlinked pad that waits
for other pads to advance.
This commit is contained in:
Sebastian Dröge 2011-03-21 16:28:37 +01:00
parent a789096c04
commit 14eb517849

View file

@ -148,6 +148,7 @@ struct _GstSingleQueue
GstDataQueueSize max_size, extra_size; GstDataQueueSize max_size, extra_size;
GstClockTime cur_time; GstClockTime cur_time;
gboolean is_eos; gboolean is_eos;
gboolean flushing;
/* Protected by global lock */ /* Protected by global lock */
guint32 nextid; /* ID of the next object waiting to be pushed */ guint32 nextid; /* ID of the next object waiting to be pushed */
@ -661,6 +662,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->srcresult = GST_FLOW_WRONG_STATE; sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_set_flushing (sq->queue, TRUE);
sq->flushing = TRUE;
/* wake up non-linked task */ /* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id); sq->id);
@ -685,6 +688,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->last_oldid = G_MAXUINT32; sq->last_oldid = G_MAXUINT32;
gst_data_queue_set_flushing (sq->queue, FALSE); gst_data_queue_set_flushing (sq->queue, FALSE);
sq->flushing = FALSE;
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result = result =
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
@ -1025,6 +1030,9 @@ gst_multi_queue_loop (GstPad * pad)
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
if (sq->flushing)
goto out_flushing;
/* Get something from the queue, blocking until that happens, or we get /* Get something from the queue, blocking until that happens, or we get
* flushed */ * flushed */
if (!(gst_data_queue_pop (sq->queue, &sitem))) if (!(gst_data_queue_pop (sq->queue, &sitem)))
@ -1077,6 +1085,11 @@ gst_multi_queue_loop (GstPad * pad)
g_cond_wait (sq->turn, mq->qlock); g_cond_wait (sq->turn, mq->qlock);
mq->numwaiting--; mq->numwaiting--;
if (sq->flushing) {
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
goto out_flushing;
}
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u and highid %u", sq->id, newid, mq->highid); "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
} }
@ -1094,6 +1107,9 @@ gst_multi_queue_loop (GstPad * pad)
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
} }
if (sq->flushing)
goto out_flushing;
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult)); gst_flow_get_name (sq->srcresult));
@ -1664,6 +1680,7 @@ gst_single_queue_new (GstMultiQueue * mqueue)
(GstDataQueueFullCallback) single_queue_overrun_cb, (GstDataQueueFullCallback) single_queue_overrun_cb,
(GstDataQueueEmptyCallback) single_queue_underrun_cb, sq); (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
sq->is_eos = FALSE; sq->is_eos = FALSE;
sq->flushing = FALSE;
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);