queue2: do not post buffering messages holding the lock

It might cause deadlocks to post messages while holding the queue2
lock. To avoid this a new boolean flag is set whenever a new
buffering percent is found. The message is posted after the lock
is released.

To make sure the buffering messages are posted in the right order, messages
are posted holding another lock. This prevents 2 threads trying to post
messages at the same time.

https://bugzilla.gnome.org/show_bug.cgi?id=736969
This commit is contained in:
Thiago Santos 2014-09-19 12:02:46 -03:00
parent 7a93e6b005
commit ecf479e3d1
2 changed files with 57 additions and 31 deletions

View file

@ -206,6 +206,16 @@ enum
} \ } \
} G_STMT_END } G_STMT_END
#define SET_PERCENT(q, perc) G_STMT_START { \
if (perc != q->buffering_percent) { \
q->buffering_percent = perc; \
q->percent_changed = TRUE; \
GST_DEBUG_OBJECT (q, "buffering %d percent", perc); \
get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out, \
&q->buffering_left); \
} \
} G_STMT_END
#define _do_init \ #define _do_init \
GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \ GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \ GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
@ -254,6 +264,7 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range); static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
static void update_in_rates (GstQueue2 * queue); static void update_in_rates (GstQueue2 * queue);
static void gst_queue2_post_buffering (GstQueue2 * queue);
typedef enum typedef enum
{ {
@ -453,6 +464,7 @@ gst_queue2_init (GstQueue2 * queue)
g_cond_init (&queue->query_handled); g_cond_init (&queue->query_handled);
queue->last_query = FALSE; queue->last_query = FALSE;
g_mutex_init (&queue->buffering_post_lock);
queue->buffering_percent = 100; queue->buffering_percent = 100;
/* tempfile related */ /* tempfile related */
@ -486,6 +498,7 @@ gst_queue2_finalize (GObject * object)
queue->last_query = FALSE; queue->last_query = FALSE;
g_queue_clear (&queue->queue); g_queue_clear (&queue->queue);
g_mutex_clear (&queue->qlock); g_mutex_clear (&queue->qlock);
g_mutex_clear (&queue->buffering_post_lock);
g_cond_clear (&queue->item_add); g_cond_clear (&queue->item_add);
g_cond_clear (&queue->item_del); g_cond_clear (&queue->item_del);
g_cond_clear (&queue->query_handled); g_cond_clear (&queue->query_handled);
@ -872,11 +885,36 @@ get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
} }
} }
static void
gst_queue2_post_buffering (GstQueue2 * queue)
{
GstMessage *msg = NULL;
g_mutex_lock (&queue->buffering_post_lock);
GST_QUEUE2_MUTEX_LOCK (queue);
if (queue->percent_changed) {
gint percent = queue->buffering_percent;
queue->percent_changed = FALSE;
GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
queue->avg_out, queue->buffering_left);
}
GST_QUEUE2_MUTEX_UNLOCK (queue);
if (msg != NULL)
gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
g_mutex_unlock (&queue->buffering_post_lock);
}
static void static void
update_buffering (GstQueue2 * queue) update_buffering (GstQueue2 * queue)
{ {
gint percent; gint percent;
gboolean post = FALSE;
/* Ensure the variables used to calculate buffering state are up-to-date. */ /* Ensure the variables used to calculate buffering state are up-to-date. */
if (queue->current) if (queue->current)
@ -887,42 +925,19 @@ update_buffering (GstQueue2 * queue)
return; return;
if (queue->is_buffering) { if (queue->is_buffering) {
post = TRUE;
/* if we were buffering see if we reached the high watermark */ /* if we were buffering see if we reached the high watermark */
if (percent >= queue->high_percent) if (percent >= queue->high_percent)
queue->is_buffering = FALSE; queue->is_buffering = FALSE;
SET_PERCENT (queue, percent);
} else { } else {
/* we were not buffering, check if we need to start buffering if we drop /* we were not buffering, check if we need to start buffering if we drop
* below the low threshold */ * below the low threshold */
if (percent < queue->low_percent) { if (percent < queue->low_percent) {
queue->is_buffering = TRUE; queue->is_buffering = TRUE;
post = TRUE; SET_PERCENT (queue, percent);
} }
} }
if (post) {
if (percent == queue->buffering_percent)
post = FALSE;
else
queue->buffering_percent = percent;
}
if (post) {
GstMessage *message;
GstBufferingMode mode;
gint avg_in, avg_out;
gint64 buffering_left;
get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
&buffering_left);
message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
(gint) percent);
gst_message_set_buffering_stats (message, mode,
avg_in, avg_out, buffering_left);
gst_element_post_message (GST_ELEMENT_CAST (queue), message);
}
} }
static void static void
@ -2314,6 +2329,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
goto out_eos; goto out_eos;
gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT); gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
} else { } else {
/* non-serialized events are passed upstream. */ /* non-serialized events are passed upstream. */
ret = gst_pad_push_event (queue->srcpad, event); ret = gst_pad_push_event (queue->srcpad, event);
@ -2390,6 +2406,7 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
res = FALSE; res = FALSE;
} }
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
} else { } else {
res = gst_pad_query_default (pad, parent, query); res = gst_pad_query_default (pad, parent, query);
} }
@ -2490,6 +2507,7 @@ gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
/* put buffer in queue now */ /* put buffer in queue now */
gst_queue2_locked_enqueue (queue, item, item_type); gst_queue2_locked_enqueue (queue, item, item_type);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
return GST_FLOW_OK; return GST_FLOW_OK;
@ -2630,6 +2648,7 @@ next:
item_type == GST_QUEUE2_ITEM_TYPE_BUFFER || item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST); item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer; GstBuffer *buffer;
@ -2749,6 +2768,7 @@ gst_queue2_loop (GstPad * pad)
goto out_flushing; goto out_flushing;
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
return; return;
@ -3109,6 +3129,7 @@ gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
/* FIXME - function will block when the range is not yet available */ /* FIXME - function will block when the range is not yet available */
ret = gst_queue2_create_read (queue, offset, length, buffer); ret = gst_queue2_create_read (queue, offset, length, buffer);
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
return ret; return ret;
@ -3434,13 +3455,10 @@ gst_queue2_set_property (GObject * object,
case PROP_USE_BUFFERING: case PROP_USE_BUFFERING:
queue->use_buffering = g_value_get_boolean (value); queue->use_buffering = g_value_get_boolean (value);
if (!queue->use_buffering && queue->is_buffering) { if (!queue->use_buffering && queue->is_buffering) {
GstMessage *msg = gst_message_new_buffering (GST_OBJECT_CAST (queue),
100);
GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, " GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
"posting 100%% message"); "posting 100%% message");
SET_PERCENT (queue, 100);
queue->is_buffering = FALSE; queue->is_buffering = FALSE;
gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
} }
if (queue->use_buffering) { if (queue->use_buffering) {
@ -3472,6 +3490,7 @@ gst_queue2_set_property (GObject * object,
} }
GST_QUEUE2_MUTEX_UNLOCK (queue); GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
} }
static void static void

View file

@ -153,6 +153,13 @@ struct _GstQueue2
guint8 * ring_buffer; guint8 * ring_buffer;
volatile gint downstream_may_block; volatile gint downstream_may_block;
GstBufferingMode mode;
gint64 buffering_left;
gint avg_in;
gint avg_out;
gboolean percent_changed;
GMutex buffering_post_lock; /* assures only one posted at a time */
}; };
struct _GstQueue2Class struct _GstQueue2Class