mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-02-17 11:45:25 +00:00
multiqueue: do not post messages holding the lock
It might cause deadlocks to post messages while holding the multiqueue lock. To avoid this a new boolean flag is set whenever a new buffering percent is found. The message is posted after the lock can be released. To make sure the buffering messages are posted in the right order, messages are posted holding another lock. This prevents 2 threads trying to post messages at the same time. https://bugzilla.gnome.org/show_bug.cgi?id=736295
This commit is contained in:
parent
a7d7d9305f
commit
3f83ca9c43
2 changed files with 50 additions and 24 deletions
|
@ -200,6 +200,7 @@ static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
|
|||
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
|
||||
|
||||
static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
|
||||
static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
|
||||
|
||||
static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
|
||||
|
||||
|
@ -271,6 +272,14 @@ enum
|
|||
g_mutex_unlock (&q->qlock); \
|
||||
} G_STMT_END
|
||||
|
||||
#define SET_PERCENT(mq, perc) G_STMT_START { \
|
||||
if (perc != mq->percent) { \
|
||||
mq->percent = perc; \
|
||||
mq->percent_changed = TRUE; \
|
||||
GST_DEBUG_OBJECT (mq, "buffering %d percent", perc); \
|
||||
} \
|
||||
} G_STMT_END
|
||||
|
||||
static void gst_multi_queue_finalize (GObject * object);
|
||||
static void gst_multi_queue_set_property (GObject * object,
|
||||
guint prop_id, const GValue * value, GParamSpec * pspec);
|
||||
|
@ -461,6 +470,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue)
|
|||
mqueue->high_time = GST_CLOCK_TIME_NONE;
|
||||
|
||||
g_mutex_init (&mqueue->qlock);
|
||||
g_mutex_init (&mqueue->buffering_post_lock);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -475,6 +485,7 @@ gst_multi_queue_finalize (GObject * object)
|
|||
|
||||
/* free/unref instance data */
|
||||
g_mutex_clear (&mqueue->qlock);
|
||||
g_mutex_clear (&mqueue->buffering_post_lock);
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||
}
|
||||
|
@ -502,6 +513,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
|
|||
mq->max_size.bytes = g_value_get_uint (value);
|
||||
SET_CHILD_PROPERTY (mq, bytes);
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
break;
|
||||
case PROP_MAX_SIZE_BUFFERS:
|
||||
{
|
||||
|
@ -537,6 +549,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
|
|||
}
|
||||
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -545,6 +558,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
|
|||
mq->max_size.time = g_value_get_uint64 (value);
|
||||
SET_CHILD_PROPERTY (mq, time);
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
break;
|
||||
case PROP_EXTRA_SIZE_BYTES:
|
||||
mq->extra_size.bytes = g_value_get_uint (value);
|
||||
|
@ -558,13 +572,11 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
|
|||
case PROP_USE_BUFFERING:
|
||||
mq->use_buffering = g_value_get_boolean (value);
|
||||
if (!mq->use_buffering && mq->buffering) {
|
||||
GstMessage *message;
|
||||
|
||||
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||
mq->buffering = FALSE;
|
||||
GST_DEBUG_OBJECT (mq, "buffering 100 percent");
|
||||
message = gst_message_new_buffering (GST_OBJECT_CAST (mq), 100);
|
||||
|
||||
gst_element_post_message (GST_ELEMENT_CAST (mq), message);
|
||||
SET_PERCENT (mq, 100);
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
}
|
||||
|
||||
if (mq->use_buffering) {
|
||||
|
@ -583,6 +595,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
|
|||
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
}
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
break;
|
||||
case PROP_LOW_PERCENT:
|
||||
mq->low_percent = g_value_get_int (value);
|
||||
|
@ -775,6 +788,7 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
|
|||
SET_CHILD_PROPERTY (mqueue, visible);
|
||||
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
|
||||
gst_multi_queue_post_buffering (mqueue);
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -906,7 +920,6 @@ static void
|
|||
update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
|
||||
{
|
||||
gint percent;
|
||||
gboolean post = FALSE;
|
||||
|
||||
/* nothing to dowhen we are not in buffering mode */
|
||||
if (!mq->use_buffering)
|
||||
|
@ -915,19 +928,13 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
|
|||
percent = get_percentage (sq);
|
||||
|
||||
if (mq->buffering) {
|
||||
post = TRUE;
|
||||
if (percent >= mq->high_percent) {
|
||||
mq->buffering = FALSE;
|
||||
}
|
||||
/* make sure it increases */
|
||||
percent = MAX (mq->percent, percent);
|
||||
|
||||
if (percent == mq->percent)
|
||||
/* don't post if nothing changed */
|
||||
post = FALSE;
|
||||
else
|
||||
/* else keep last value we posted */
|
||||
mq->percent = percent;
|
||||
SET_PERCENT (mq, percent);
|
||||
} else {
|
||||
GList *iter;
|
||||
gboolean is_buffering = TRUE;
|
||||
|
@ -944,26 +951,37 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
|
|||
|
||||
if (is_buffering && percent < mq->low_percent) {
|
||||
mq->buffering = TRUE;
|
||||
mq->percent = percent;
|
||||
post = TRUE;
|
||||
SET_PERCENT (mq, percent);
|
||||
}
|
||||
}
|
||||
if (post) {
|
||||
GstMessage *message;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_multi_queue_post_buffering (GstMultiQueue * mq)
|
||||
{
|
||||
GstMessage *msg = NULL;
|
||||
|
||||
g_mutex_lock (&mq->buffering_post_lock);
|
||||
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||
if (mq->percent_changed) {
|
||||
gint percent = mq->percent;
|
||||
|
||||
mq->percent_changed = FALSE;
|
||||
|
||||
/* scale to high percent so that it becomes the 100% mark */
|
||||
percent = percent * 100 / mq->high_percent;
|
||||
/* clip */
|
||||
if (percent > 100)
|
||||
percent = 100;
|
||||
|
||||
GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
|
||||
message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
|
||||
|
||||
gst_element_post_message (GST_ELEMENT_CAST (mq), message);
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
|
||||
GST_DEBUG_OBJECT (mq, "Going to post buffering: %d%%", percent);
|
||||
msg = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
|
||||
}
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
|
||||
if (msg != NULL)
|
||||
gst_element_post_message (GST_ELEMENT_CAST (mq), msg);
|
||||
|
||||
g_mutex_unlock (&mq->buffering_post_lock);
|
||||
}
|
||||
|
||||
/* calculate the diff between running time on the sink and src of the queue.
|
||||
|
@ -1064,6 +1082,7 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
|
|||
update_time_level (mq, sq);
|
||||
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
}
|
||||
|
||||
/* take a buffer and update segment, updating the time level of the queue. */
|
||||
|
@ -1095,6 +1114,7 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
|
|||
/* calc diff with other end */
|
||||
update_time_level (mq, sq);
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
}
|
||||
|
||||
static GstClockTime
|
||||
|
@ -1473,6 +1493,7 @@ gst_multi_queue_loop (GstPad * pad)
|
|||
update_buffering (mq, sq);
|
||||
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
|
||||
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
|
||||
&& result != GST_FLOW_EOS)
|
||||
|
@ -1732,6 +1753,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
|
|||
update_buffering (mq, sq);
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||
single_queue_overrun_cb (sq->queue, sq);
|
||||
gst_multi_queue_post_buffering (mq);
|
||||
break;
|
||||
case GST_EVENT_SEGMENT:
|
||||
apply_segment (mq, sq, sref, &sq->sink_segment);
|
||||
|
@ -2200,6 +2222,7 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
|
|||
GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue);
|
||||
update_buffering (sq->mqueue, sq);
|
||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue);
|
||||
gst_multi_queue_post_buffering (sq->mqueue);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
@ -74,6 +74,9 @@ struct _GstMultiQueue {
|
|||
/* GstMultiQueueSize, counter and highid */
|
||||
|
||||
gint numwaiting; /* number of not-linked pads waiting */
|
||||
|
||||
gboolean percent_changed;
|
||||
GMutex buffering_post_lock; /* assures only one posted at a time */
|
||||
};
|
||||
|
||||
struct _GstMultiQueueClass {
|
||||
|
|
Loading…
Reference in a new issue