mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-06-07 07:58:51 +00:00
queue: Fix handling of serialized queries
During FLUSH_START the query needs to be unblocked already, otherwise it can lead to deadlocks if the FLUSH_START is the result of something done from the streaming thread of the srcpad (the queue will never be emptied!).
This commit is contained in:
parent
cdc429f296
commit
6f042f13ae
2 changed files with 57 additions and 36 deletions
|
@ -219,6 +219,13 @@ static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
|
||||||
static gboolean gst_queue_is_empty (GstQueue * queue);
|
static gboolean gst_queue_is_empty (GstQueue * queue);
|
||||||
static gboolean gst_queue_is_filled (GstQueue * queue);
|
static gboolean gst_queue_is_filled (GstQueue * queue);
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
gboolean is_query;
|
||||||
|
GstMiniObject *item;
|
||||||
|
} GstQueueItem;
|
||||||
|
|
||||||
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
|
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
|
||||||
|
|
||||||
static GType
|
static GType
|
||||||
|
@ -437,6 +444,7 @@ gst_queue_init (GstQueue * queue)
|
||||||
g_mutex_init (&queue->qlock);
|
g_mutex_init (&queue->qlock);
|
||||||
g_cond_init (&queue->item_add);
|
g_cond_init (&queue->item_add);
|
||||||
g_cond_init (&queue->item_del);
|
g_cond_init (&queue->item_del);
|
||||||
|
g_cond_init (&queue->query_handled);
|
||||||
|
|
||||||
queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
|
queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
|
||||||
|
|
||||||
|
@ -456,22 +464,23 @@ gst_queue_init (GstQueue * queue)
|
||||||
static void
|
static void
|
||||||
gst_queue_finalize (GObject * object)
|
gst_queue_finalize (GObject * object)
|
||||||
{
|
{
|
||||||
GstMiniObject *data;
|
|
||||||
GstQueue *queue = GST_QUEUE (object);
|
GstQueue *queue = GST_QUEUE (object);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (queue, "finalizing queue");
|
GST_DEBUG_OBJECT (queue, "finalizing queue");
|
||||||
|
|
||||||
while (!gst_queue_array_is_empty (queue->queue)) {
|
while (!gst_queue_array_is_empty (queue->queue)) {
|
||||||
data = gst_queue_array_pop_head (queue->queue);
|
GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
|
||||||
/* FIXME: if it's a query, shouldn't we unref that too? */
|
/* FIXME: if it's a query, shouldn't we unref that too? */
|
||||||
if (!GST_IS_QUERY (data))
|
if (!qitem->is_query)
|
||||||
gst_mini_object_unref (data);
|
gst_mini_object_unref (qitem->item);
|
||||||
|
g_slice_free (GstQueueItem, qitem);
|
||||||
}
|
}
|
||||||
gst_queue_array_free (queue->queue);
|
gst_queue_array_free (queue->queue);
|
||||||
|
|
||||||
g_mutex_clear (&queue->qlock);
|
g_mutex_clear (&queue->qlock);
|
||||||
g_cond_clear (&queue->item_add);
|
g_cond_clear (&queue->item_add);
|
||||||
g_cond_clear (&queue->item_del);
|
g_cond_clear (&queue->item_del);
|
||||||
|
g_cond_clear (&queue->query_handled);
|
||||||
|
|
||||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||||
}
|
}
|
||||||
|
@ -575,21 +584,22 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
|
||||||
static void
|
static void
|
||||||
gst_queue_locked_flush (GstQueue * queue, gboolean full)
|
gst_queue_locked_flush (GstQueue * queue, gboolean full)
|
||||||
{
|
{
|
||||||
GstMiniObject *data;
|
|
||||||
|
|
||||||
while (!gst_queue_array_is_empty (queue->queue)) {
|
while (!gst_queue_array_is_empty (queue->queue)) {
|
||||||
data = gst_queue_array_pop_head (queue->queue);
|
GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
|
||||||
|
|
||||||
/* Then lose another reference because we are supposed to destroy that
|
/* Then lose another reference because we are supposed to destroy that
|
||||||
data when flushing */
|
data when flushing */
|
||||||
if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
|
if (!full && GST_IS_EVENT (qitem->item) && GST_EVENT_IS_STICKY (qitem->item)
|
||||||
GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
|
&& GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
|
||||||
&& GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
|
&& GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
|
||||||
gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
|
gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
|
||||||
}
|
}
|
||||||
if (!GST_IS_QUERY (data))
|
if (!qitem->is_query)
|
||||||
gst_mini_object_unref (data);
|
gst_mini_object_unref (qitem->item);
|
||||||
|
g_slice_free (GstQueueItem, qitem);
|
||||||
}
|
}
|
||||||
queue->last_query = FALSE;
|
queue->last_query = FALSE;
|
||||||
|
g_cond_signal (&queue->query_handled);
|
||||||
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
|
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
|
||||||
queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
|
queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
|
||||||
queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
|
queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
|
||||||
|
@ -616,8 +626,12 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
|
||||||
queue->cur_level.bytes += gst_buffer_get_size (buffer);
|
queue->cur_level.bytes += gst_buffer_get_size (buffer);
|
||||||
apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
|
apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
|
||||||
|
|
||||||
if (item)
|
if (item) {
|
||||||
gst_queue_array_push_tail (queue->queue, item);
|
GstQueueItem *qitem = g_slice_new (GstQueueItem);
|
||||||
|
qitem->item = item;
|
||||||
|
qitem->is_query = FALSE;
|
||||||
|
gst_queue_array_push_tail (queue->queue, qitem);
|
||||||
|
}
|
||||||
GST_QUEUE_SIGNAL_ADD (queue);
|
GST_QUEUE_SIGNAL_ADD (queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -654,8 +668,12 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (item)
|
if (item) {
|
||||||
gst_queue_array_push_tail (queue->queue, item);
|
GstQueueItem *qitem = g_slice_new (GstQueueItem);
|
||||||
|
qitem->item = item;
|
||||||
|
qitem->is_query = FALSE;
|
||||||
|
gst_queue_array_push_tail (queue->queue, qitem);
|
||||||
|
}
|
||||||
GST_QUEUE_SIGNAL_ADD (queue);
|
GST_QUEUE_SIGNAL_ADD (queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -663,12 +681,16 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
|
||||||
static GstMiniObject *
|
static GstMiniObject *
|
||||||
gst_queue_locked_dequeue (GstQueue * queue)
|
gst_queue_locked_dequeue (GstQueue * queue)
|
||||||
{
|
{
|
||||||
|
GstQueueItem *qitem;
|
||||||
GstMiniObject *item;
|
GstMiniObject *item;
|
||||||
|
|
||||||
item = gst_queue_array_pop_head (queue->queue);
|
qitem = gst_queue_array_pop_head (queue->queue);
|
||||||
if (item == NULL)
|
if (qitem == NULL)
|
||||||
goto no_item;
|
goto no_item;
|
||||||
|
|
||||||
|
item = qitem->item;
|
||||||
|
g_slice_free (GstQueueItem, qitem);
|
||||||
|
|
||||||
if (GST_IS_BUFFER (item)) {
|
if (GST_IS_BUFFER (item)) {
|
||||||
GstBuffer *buffer = GST_BUFFER_CAST (item);
|
GstBuffer *buffer = GST_BUFFER_CAST (item);
|
||||||
|
|
||||||
|
@ -748,6 +770,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
|
||||||
/* unblock the loop and chain functions */
|
/* unblock the loop and chain functions */
|
||||||
GST_QUEUE_SIGNAL_ADD (queue);
|
GST_QUEUE_SIGNAL_ADD (queue);
|
||||||
GST_QUEUE_SIGNAL_DEL (queue);
|
GST_QUEUE_SIGNAL_DEL (queue);
|
||||||
|
queue->last_query = FALSE;
|
||||||
|
g_cond_signal (&queue->query_handled);
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* make sure it pauses, this should happen since we sent
|
/* make sure it pauses, this should happen since we sent
|
||||||
|
@ -830,16 +854,19 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
|
||||||
switch (GST_QUERY_TYPE (query)) {
|
switch (GST_QUERY_TYPE (query)) {
|
||||||
default:
|
default:
|
||||||
if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
|
if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
|
||||||
|
GstQueueItem *qitem;
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
|
GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
|
||||||
GST_QUERY_TYPE_NAME (query));
|
GST_QUERY_TYPE_NAME (query));
|
||||||
gst_queue_array_push_tail (queue->queue, query);
|
qitem = g_slice_new (GstQueueItem);
|
||||||
|
qitem->item = GST_MINI_OBJECT_CAST (query);
|
||||||
|
qitem->is_query = TRUE;
|
||||||
|
gst_queue_array_push_tail (queue->queue, qitem);
|
||||||
GST_QUEUE_SIGNAL_ADD (queue);
|
GST_QUEUE_SIGNAL_ADD (queue);
|
||||||
while (!gst_queue_array_is_empty (queue->queue)) {
|
g_cond_wait (&queue->query_handled, &queue->qlock);
|
||||||
/* for as long as the queue has items, we know the query is
|
if (queue->srcresult != GST_FLOW_OK)
|
||||||
* not handled yet */
|
goto out_flushing;
|
||||||
GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
|
|
||||||
}
|
|
||||||
res = queue->last_query;
|
res = queue->last_query;
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
|
@ -852,16 +879,7 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
|
||||||
/* ERRORS */
|
/* ERRORS */
|
||||||
out_flushing:
|
out_flushing:
|
||||||
{
|
{
|
||||||
gint index;
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (queue, "we are flushing");
|
GST_DEBUG_OBJECT (queue, "we are flushing");
|
||||||
|
|
||||||
/* Remove query from queue if still there, since we hold no ref to it */
|
|
||||||
index = gst_queue_array_find (queue->queue, NULL, query);
|
|
||||||
|
|
||||||
if (index >= 0)
|
|
||||||
gst_queue_array_drop_element (queue->queue, index);
|
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK (queue);
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
|
@ -870,7 +888,7 @@ out_flushing:
|
||||||
static gboolean
|
static gboolean
|
||||||
gst_queue_is_empty (GstQueue * queue)
|
gst_queue_is_empty (GstQueue * queue)
|
||||||
{
|
{
|
||||||
GstMiniObject *head;
|
GstQueueItem *head;
|
||||||
|
|
||||||
if (gst_queue_array_is_empty (queue->queue))
|
if (gst_queue_array_is_empty (queue->queue))
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
@ -880,7 +898,7 @@ gst_queue_is_empty (GstQueue * queue)
|
||||||
* we would block forever on serialized queries.
|
* we would block forever on serialized queries.
|
||||||
*/
|
*/
|
||||||
head = gst_queue_array_peek_head (queue->queue);
|
head = gst_queue_array_peek_head (queue->queue);
|
||||||
if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head))
|
if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
|
||||||
/* It is possible that a max size is reached before all min thresholds are.
|
/* It is possible that a max size is reached before all min thresholds are.
|
||||||
|
@ -1134,6 +1152,7 @@ next:
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"dropping query %p because of EOS", query);
|
"dropping query %p because of EOS", query);
|
||||||
queue->last_query = FALSE;
|
queue->last_query = FALSE;
|
||||||
|
g_cond_signal (&queue->query_handled);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* no more items in the queue. Set the unexpected flag so that upstream
|
/* no more items in the queue. Set the unexpected flag so that upstream
|
||||||
|
@ -1162,6 +1181,7 @@ next:
|
||||||
GstQuery *query = GST_QUERY_CAST (data);
|
GstQuery *query = GST_QUERY_CAST (data);
|
||||||
|
|
||||||
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
|
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
|
||||||
|
g_cond_signal (&queue->query_handled);
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"did query %p, return %d", query, queue->last_query);
|
"did query %p, return %d", query, queue->last_query);
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,6 +133,7 @@ struct _GstQueue {
|
||||||
/* whether the first new segment has been applied to src */
|
/* whether the first new segment has been applied to src */
|
||||||
gboolean newseg_applied_to_src;
|
gboolean newseg_applied_to_src;
|
||||||
|
|
||||||
|
GCond query_handled;
|
||||||
gboolean last_query;
|
gboolean last_query;
|
||||||
|
|
||||||
gboolean flush_on_eos; /* flush on EOS */
|
gboolean flush_on_eos; /* flush on EOS */
|
||||||
|
|
Loading…
Reference in a new issue