mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-06-07 16:08:51 +00:00
Fix a deadlock that happens if the next element sends an event and the previous element sends a buffer where the queu...
Original commit message from CVS: Fix a deadlock that happens if the next element sends an event and the previous element sends a buffer where the queue is full. See the comment in the code for the rest.
This commit is contained in:
parent
8cf0d17c3f
commit
9111ea764c
2 changed files with 58 additions and 20 deletions
|
@ -377,6 +377,20 @@ gst_queue_locked_flush (GstQueue *queue)
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_queue_handle_pending_events (GstQueue *queue)
|
||||||
|
{
|
||||||
|
/* check for events to send upstream */
|
||||||
|
while (!g_queue_is_empty (queue->events)){
|
||||||
|
GstQueueEventResponse *er = g_queue_pop_head (queue->events);
|
||||||
|
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
|
||||||
|
er->ret = gst_pad_event_default (GST_PAD_PEER (queue->sinkpad), er->event);
|
||||||
|
er->handled = TRUE;
|
||||||
|
g_cond_signal (queue->event_done);
|
||||||
|
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define STATUS(queue, msg) \
|
#define STATUS(queue, msg) \
|
||||||
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
|
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
|
||||||
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
|
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
|
||||||
|
@ -412,15 +426,7 @@ restart:
|
||||||
g_mutex_lock (queue->qlock);
|
g_mutex_lock (queue->qlock);
|
||||||
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
|
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
|
||||||
|
|
||||||
/* check for events to send upstream */
|
gst_queue_handle_pending_events (queue);
|
||||||
while (!g_queue_is_empty (queue->events)){
|
|
||||||
GstQueueEventResponse *er = g_queue_pop_head (queue->events);
|
|
||||||
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
|
|
||||||
er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event);
|
|
||||||
er->handled = TRUE;
|
|
||||||
g_cond_signal (queue->event_done);
|
|
||||||
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* assume don't need to flush this buffer when the queue is filled */
|
/* assume don't need to flush this buffer when the queue is filled */
|
||||||
queue->flush = FALSE;
|
queue->flush = FALSE;
|
||||||
|
@ -569,6 +575,14 @@ restart:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* OK, we've got a serious issue here. Imagine the situation
|
||||||
|
* where the puller (next element) is sending an event here,
|
||||||
|
* so it cannot pull events from the queue, and we cannot
|
||||||
|
* push data further because the queue is 'full' and therefore,
|
||||||
|
* we wait here (and do not handle events): deadlock! to solve
|
||||||
|
* that, we handle pending upstream events here, too. */
|
||||||
|
gst_queue_handle_pending_events (queue);
|
||||||
|
|
||||||
STATUS (queue, "waiting for item_del signal");
|
STATUS (queue, "waiting for item_del signal");
|
||||||
g_cond_wait (queue->item_del, queue->qlock);
|
g_cond_wait (queue->item_del, queue->qlock);
|
||||||
STATUS (queue, "received item_del signal");
|
STATUS (queue, "received item_del signal");
|
||||||
|
@ -719,7 +733,6 @@ restart:
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
|
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
|
|
||||||
g_mutex_unlock (queue->qlock);
|
g_mutex_unlock (queue->qlock);
|
||||||
|
|
||||||
/* FIXME: I suppose this needs to be locked, since the EOS
|
/* FIXME: I suppose this needs to be locked, since the EOS
|
||||||
|
@ -759,9 +772,15 @@ gst_queue_handle_src_event (GstPad *pad,
|
||||||
er.event = event;
|
er.event = event;
|
||||||
er.handled = FALSE;
|
er.handled = FALSE;
|
||||||
g_queue_push_tail (queue->events, &er);
|
g_queue_push_tail (queue->events, &er);
|
||||||
|
GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
|
||||||
|
"Preparing for loop for event handler");
|
||||||
|
/* see the chain function on why this is here - it prevents a deadlock */
|
||||||
|
g_cond_signal (queue->item_del);
|
||||||
while (!er.handled) {
|
while (!er.handled) {
|
||||||
g_cond_wait (queue->event_done, queue->qlock);
|
g_cond_wait (queue->event_done, queue->qlock);
|
||||||
}
|
}
|
||||||
|
GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
|
||||||
|
"Event handled");
|
||||||
res = er.ret;
|
res = er.ret;
|
||||||
} else {
|
} else {
|
||||||
res = gst_pad_event_default (pad, event);
|
res = gst_pad_event_default (pad, event);
|
||||||
|
|
|
@ -377,6 +377,20 @@ gst_queue_locked_flush (GstQueue *queue)
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_queue_handle_pending_events (GstQueue *queue)
|
||||||
|
{
|
||||||
|
/* check for events to send upstream */
|
||||||
|
while (!g_queue_is_empty (queue->events)){
|
||||||
|
GstQueueEventResponse *er = g_queue_pop_head (queue->events);
|
||||||
|
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
|
||||||
|
er->ret = gst_pad_event_default (GST_PAD_PEER (queue->sinkpad), er->event);
|
||||||
|
er->handled = TRUE;
|
||||||
|
g_cond_signal (queue->event_done);
|
||||||
|
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define STATUS(queue, msg) \
|
#define STATUS(queue, msg) \
|
||||||
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
|
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
|
||||||
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
|
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
|
||||||
|
@ -412,15 +426,7 @@ restart:
|
||||||
g_mutex_lock (queue->qlock);
|
g_mutex_lock (queue->qlock);
|
||||||
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
|
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
|
||||||
|
|
||||||
/* check for events to send upstream */
|
gst_queue_handle_pending_events (queue);
|
||||||
while (!g_queue_is_empty (queue->events)){
|
|
||||||
GstQueueEventResponse *er = g_queue_pop_head (queue->events);
|
|
||||||
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
|
|
||||||
er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event);
|
|
||||||
er->handled = TRUE;
|
|
||||||
g_cond_signal (queue->event_done);
|
|
||||||
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* assume don't need to flush this buffer when the queue is filled */
|
/* assume don't need to flush this buffer when the queue is filled */
|
||||||
queue->flush = FALSE;
|
queue->flush = FALSE;
|
||||||
|
@ -569,6 +575,14 @@ restart:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* OK, we've got a serious issue here. Imagine the situation
|
||||||
|
* where the puller (next element) is sending an event here,
|
||||||
|
* so it cannot pull events from the queue, and we cannot
|
||||||
|
* push data further because the queue is 'full' and therefore,
|
||||||
|
* we wait here (and do not handle events): deadlock! to solve
|
||||||
|
* that, we handle pending upstream events here, too. */
|
||||||
|
gst_queue_handle_pending_events (queue);
|
||||||
|
|
||||||
STATUS (queue, "waiting for item_del signal");
|
STATUS (queue, "waiting for item_del signal");
|
||||||
g_cond_wait (queue->item_del, queue->qlock);
|
g_cond_wait (queue->item_del, queue->qlock);
|
||||||
STATUS (queue, "received item_del signal");
|
STATUS (queue, "received item_del signal");
|
||||||
|
@ -719,7 +733,6 @@ restart:
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
|
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
|
|
||||||
g_mutex_unlock (queue->qlock);
|
g_mutex_unlock (queue->qlock);
|
||||||
|
|
||||||
/* FIXME: I suppose this needs to be locked, since the EOS
|
/* FIXME: I suppose this needs to be locked, since the EOS
|
||||||
|
@ -759,9 +772,15 @@ gst_queue_handle_src_event (GstPad *pad,
|
||||||
er.event = event;
|
er.event = event;
|
||||||
er.handled = FALSE;
|
er.handled = FALSE;
|
||||||
g_queue_push_tail (queue->events, &er);
|
g_queue_push_tail (queue->events, &er);
|
||||||
|
GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
|
||||||
|
"Preparing for loop for event handler");
|
||||||
|
/* see the chain function on why this is here - it prevents a deadlock */
|
||||||
|
g_cond_signal (queue->item_del);
|
||||||
while (!er.handled) {
|
while (!er.handled) {
|
||||||
g_cond_wait (queue->event_done, queue->qlock);
|
g_cond_wait (queue->event_done, queue->qlock);
|
||||||
}
|
}
|
||||||
|
GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
|
||||||
|
"Event handled");
|
||||||
res = er.ret;
|
res = er.ret;
|
||||||
} else {
|
} else {
|
||||||
res = gst_pad_event_default (pad, event);
|
res = gst_pad_event_default (pad, event);
|
||||||
|
|
Loading…
Reference in a new issue