diff --git a/gst/gstqueue.c b/gst/gstqueue.c index efff3c940f..b9a6d94644 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -377,6 +377,20 @@ gst_queue_locked_flush (GstQueue *queue) g_cond_signal (queue->item_del); } +static void +gst_queue_handle_pending_events (GstQueue *queue) +{ + /* check for events to send upstream */ + while (!g_queue_is_empty (queue->events)){ + GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); + er->ret = gst_pad_event_default (GST_PAD_PEER (queue->sinkpad), er->event); + er->handled = TRUE; + g_cond_signal (queue->event_done); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); + } +} + #define STATUS(queue, msg) \ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ @@ -412,15 +426,7 @@ restart: g_mutex_lock (queue->qlock); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); - /* check for events to send upstream */ - while (!g_queue_is_empty (queue->events)){ - GstQueueEventResponse *er = g_queue_pop_head (queue->events); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); - er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event); - er->handled = TRUE; - g_cond_signal (queue->event_done); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); - } + gst_queue_handle_pending_events (queue); /* assume don't need to flush this buffer when the queue is filled */ queue->flush = FALSE; @@ -569,6 +575,14 @@ restart: } } + /* OK, we've got a serious issue here. Imagine the situation + * where the puller (next element) is sending an event here, + * so it cannot pull events from the queue, and we cannot + * push data further because the queue is 'full' and therefore, + * we wait here (and do not handle events): deadlock! to solve + * that, we handle pending upstream events here, too. */ + gst_queue_handle_pending_events (queue); + STATUS (queue, "waiting for item_del signal"); g_cond_wait (queue->item_del, queue->qlock); STATUS (queue, "received item_del signal"); @@ -719,7 +733,6 @@ restart: GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del"); g_cond_signal (queue->item_del); - g_mutex_unlock (queue->qlock); /* FIXME: I suppose this needs to be locked, since the EOS @@ -759,9 +772,15 @@ gst_queue_handle_src_event (GstPad *pad, er.event = event; er.handled = FALSE; g_queue_push_tail (queue->events, &er); + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Preparing for loop for event handler"); + /* see the chain function on why this is here - it prevents a deadlock */ + g_cond_signal (queue->item_del); while (!er.handled) { g_cond_wait (queue->event_done, queue->qlock); } + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Event handled"); res = er.ret; } else { res = gst_pad_event_default (pad, event); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index efff3c940f..b9a6d94644 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -377,6 +377,20 @@ gst_queue_locked_flush (GstQueue *queue) g_cond_signal (queue->item_del); } +static void +gst_queue_handle_pending_events (GstQueue *queue) +{ + /* check for events to send upstream */ + while (!g_queue_is_empty (queue->events)){ + GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); + er->ret = gst_pad_event_default (GST_PAD_PEER (queue->sinkpad), er->event); + er->handled = TRUE; + g_cond_signal (queue->event_done); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); + } +} + #define STATUS(queue, msg) \ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ @@ -412,15 +426,7 @@ restart: g_mutex_lock (queue->qlock); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); - /* check for events to send upstream */ - while (!g_queue_is_empty (queue->events)){ - GstQueueEventResponse *er = g_queue_pop_head (queue->events); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); - er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event); - er->handled = TRUE; - g_cond_signal (queue->event_done); - GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); - } + gst_queue_handle_pending_events (queue); /* assume don't need to flush this buffer when the queue is filled */ queue->flush = FALSE; @@ -569,6 +575,14 @@ restart: } } + /* OK, we've got a serious issue here. Imagine the situation + * where the puller (next element) is sending an event here, + * so it cannot pull events from the queue, and we cannot + * push data further because the queue is 'full' and therefore, + * we wait here (and do not handle events): deadlock! to solve + * that, we handle pending upstream events here, too. */ + gst_queue_handle_pending_events (queue); + STATUS (queue, "waiting for item_del signal"); g_cond_wait (queue->item_del, queue->qlock); STATUS (queue, "received item_del signal"); @@ -719,7 +733,6 @@ restart: GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del"); g_cond_signal (queue->item_del); - g_mutex_unlock (queue->qlock); /* FIXME: I suppose this needs to be locked, since the EOS @@ -759,9 +772,15 @@ gst_queue_handle_src_event (GstPad *pad, er.event = event; er.handled = FALSE; g_queue_push_tail (queue->events, &er); + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Preparing for loop for event handler"); + /* see the chain function on why this is here - it prevents a deadlock */ + g_cond_signal (queue->item_del); while (!er.handled) { g_cond_wait (queue->event_done, queue->qlock); } + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Event handled"); res = er.ret; } else { res = gst_pad_event_default (pad, event);