multiqueue: Stop using the gst_pad_element_private API

There was a race where we could still get the pad event function
called when its private member were already unset, leading to
a segfault in the event handler:

```
0  gst_multi_queue_src_event (pad=<optimized out>, parent=<optimized out>, event=0x7f3ff0007600) at ../subprojects/gstreamer/plugins/elements/gstmultiqueue.c:2534
2534          ret = gst_pad_push_event (sq->sinkpad, event);
[Current thread is 1 (Thread 0x7f406c0258c0 (LWP 21925))]
(gdb) bt
0  0x00007f4062ec1399 in gst_multi_queue_src_event (pad=<optimized out>, parent=<optimized out>, event=0x7f3ff0007600 [GstEvent]) at ../subprojects/gstreamer/plugins/elements/gstmultiqueue.c:2534
1  0x00007f406b40f46d in gst_validate_pad_monitor_src_event_check (handler=0x7f4062ec1360 <gst_multi_queue_src_event>, event=0x7f3ff0007600 [GstEvent], parent=0x7f3fcc01f090 [GstMultiQueue|multiqueue167], pad_monitor=0x7f3fe809e7c0 [GstValidatePadMonitor|validatepadmonitor2213]) at ../subprojects/gst-devtools/validate/gst/validate/gst-validate-pad-monitor.c:2101
2  0x00007f406b40f46d in gst_validate_pad_monitor_src_event_func (pad=<optimized out>, parent=0x7f3fcc01f090 [GstMultiQueue|multiqueue167], event=0x7f3ff0007600 [GstEvent]) at ../subprojects/gst-devtools/validate/gst/validate/gst-validate-pad-monitor.c:2374
3  0x00007f406b904387 in gst_pad_send_event_unchecked (pad=pad@entry=0x7f3fdc027650 [GstPad|src_0], event=event@entry=0x7f3ff0007600 [GstEvent], type=<optimized out>, type@entry=GST_PAD_PROBE_TYPE_EVENT_UPSTREAM) at ../subprojects/gstreamer/gst/gstpad.c:5772
4  0x00007f406b90481b in gst_pad_push_event_unchecked (pad=pad@entry=0x7f4058182fc0 [GstPad|sink], event=event@entry=0x7f3ff0007600 [GstEvent], type=type@entry=GST_PAD_PROBE_TYPE_EVENT_UPSTREAM) at ../subprojects/gstreamer/gst/gstpad.c:5417
5  0x00007f406b90f016 in gst_pad_push_event (pad=0x7f4058182fc0 [GstPad|sink], event=event@entry=0x7f3ff0007600 [GstEvent]) at ../subprojects/gstreamer/gst/gstpad.c:5554
6  0x00007f406a1c99ba in gst_video_decoder_src_event_default (decoder=0x7f3fe81c6060 [GstTheoraDec|theoradec46], event=<optimized out>) at ../subprojects/gst-plugins-base/gst-libs/gst/video/gstvideodecoder.c:1532
7  0x00007f406b40f46d in gst_validate_pad_monitor_src_event_check (handler=0x7f406a1ca270 <gst_video_decoder_src_event>, event=0x7f3ff0007600 [GstEvent], parent=0x7f3fe81c6060 [GstTheoraDec|theoradec46], pad_monitor=0x7f4028163aa0 [GstValidatePadMonitor|validatepadmonitor2216]) at ../subprojects/gst-devtools/validate/gst/validate/gst-validate-pad-monitor.c:2101
8  0x00007f406b40f46d in gst_validate_pad_monitor_src_event_func (pad=<optimized out>, parent=0x7f3fe81c6060 [GstTheoraDec|theoradec46], event=0x7f3ff0007600 [GstEvent]) at ../subprojects/gst-devtools/validate/gst/validate/gst-validate-pad-monitor.c:2374
```

This make the GstSingleQueue a MiniObject, mainly so it is properly
refcounted.

This also make use of the GstMultiQueuePad class for srcpads which
is totally valid as srcpads and sinkpads share the same SingleQueue
object.
This commit is contained in:
Thibault Saunier 2019-06-22 23:46:35 -04:00
parent c34fe82b5b
commit 6a20fcc97a

View file

@ -99,8 +99,7 @@
#include "gstmultiqueue.h"
#include <gst/glib-compat-private.h>
/**
* GstSingleQueue:
/* GstSingleQueue:
* @sinkpad: associated sink #GstPad
* @srcpad: associated source #GstPad
*
@ -111,6 +110,8 @@ typedef struct _GstSingleQueue GstSingleQueue;
struct _GstSingleQueue
{
volatile gint refcount;
/* unique identifier of the queue */
guint id;
/* group of streams to which this queue belongs to */
@ -173,7 +174,6 @@ struct _GstSingleQueue
GstClockTime interleave; /* Calculated interleve within the thread */
};
/* Extension of GstDataQueueItem structure for our usage */
typedef struct _GstMultiQueueItem GstMultiQueueItem;
@ -191,7 +191,8 @@ struct _GstMultiQueueItem
};
static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
static void gst_single_queue_free (GstSingleQueue * squeue);
static void gst_single_queue_unref (GstSingleQueue * squeue);
static GstSingleQueue *gst_single_queue_ref (GstSingleQueue * squeue);
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
@ -342,8 +343,10 @@ gst_multiqueue_pad_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_PAD_GROUP_ID:
GST_OBJECT_LOCK (pad->sq->mqueue);
if (pad->sq)
g_value_set_uint (value, pad->sq->groupid);
GST_OBJECT_UNLOCK (pad->sq->mqueue);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -359,10 +362,10 @@ gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_PAD_GROUP_ID:
GST_OBJECT_LOCK (pad);
GST_OBJECT_LOCK (pad->sq->mqueue);
if (pad->sq)
pad->sq->groupid = g_value_get_uint (value);
GST_OBJECT_UNLOCK (pad);
GST_OBJECT_UNLOCK (pad->sq->mqueue);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -370,6 +373,16 @@ gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
}
}
static void
gst_multiqueue_pad_finalize (GObject * object)
{
GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
gst_single_queue_unref (pad->sq);
G_OBJECT_CLASS (gst_multiqueue_pad_parent_class)->finalize (object);
}
static void
gst_multiqueue_pad_class_init (GstMultiQueuePadClass * klass)
{
@ -377,6 +390,7 @@ gst_multiqueue_pad_class_init (GstMultiQueuePadClass * klass)
gobject_class->set_property = gst_multiqueue_pad_set_property;
gobject_class->get_property = gst_multiqueue_pad_get_property;
gobject_class->finalize = gst_multiqueue_pad_finalize;
/**
* GstMultiQueuePad:group-id:
@ -632,7 +646,8 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
"Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
&sinktemplate, GST_TYPE_MULTIQUEUE_PAD);
gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
&srctemplate, GST_TYPE_MULTIQUEUE_PAD);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
@ -678,8 +693,7 @@ gst_multi_queue_finalize (GObject * object)
{
GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
g_list_free (mqueue->queues);
g_list_free_full (mqueue->queues, (GDestroyNotify) gst_single_queue_unref);
mqueue->queues = NULL;
mqueue->queues_cookie++;
@ -892,7 +906,7 @@ gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
GValue val = { 0, };
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
squeue = gst_pad_get_element_private (pad);
squeue = GST_MULTIQUEUE_PAD (pad)->sq;
if (!squeue)
goto out;
@ -984,11 +998,8 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
gst_pad_set_active (sq->srcpad, FALSE);
gst_pad_set_active (sq->sinkpad, FALSE);
gst_pad_set_element_private (sq->srcpad, NULL);
gst_pad_set_element_private (sq->sinkpad, NULL);
gst_element_remove_pad (element, sq->srcpad);
gst_element_remove_pad (element, sq->sinkpad);
gst_single_queue_free (sq);
}
static GstStateChangeReturn
@ -1791,7 +1802,7 @@ gst_multi_queue_loop (GstPad * pad)
gboolean do_update_buffering = FALSE;
gboolean dropping = FALSE;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = sq->mqueue;
next:
@ -2099,7 +2110,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
guint32 curid;
GstClockTime timestamp, duration;
sq = gst_pad_get_element_private (pad);
sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = sq->mqueue;
/* if eos, we are always full, so avoid hanging incoming indefinitely */
@ -2179,7 +2190,7 @@ gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
GstSingleQueue *sq;
GstMultiQueue *mq;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = (GstMultiQueue *) gst_pad_get_parent (pad);
/* mq is NULL if the pad is activated/deactivated before being
@ -2240,7 +2251,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
GstEventType type;
GstEvent *sref = NULL;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = (GstMultiQueue *) parent;
type = GST_EVENT_TYPE (event);
@ -2415,7 +2426,7 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
GstSingleQueue *sq;
GstMultiQueue *mq;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = (GstMultiQueue *) parent;
switch (GST_QUERY_TYPE (query)) {
@ -2490,7 +2501,7 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
GstSingleQueue *sq;
gboolean result;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
sq = GST_MULTIQUEUE_PAD (pad)->sq;
mq = sq->mqueue;
GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
@ -2515,7 +2526,7 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
static gboolean
gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstSingleQueue *sq = gst_pad_get_element_private (pad);
GstSingleQueue *sq = GST_MULTIQUEUE_PAD (pad)->sq;
GstMultiQueue *mq = sq->mqueue;
gboolean ret;
@ -2934,14 +2945,25 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
}
static void
gst_single_queue_free (GstSingleQueue * sq)
gst_single_queue_unref (GstSingleQueue * sq)
{
/* DRAIN QUEUE */
gst_data_queue_flush (sq->queue);
g_object_unref (sq->queue);
g_cond_clear (&sq->turn);
g_cond_clear (&sq->query_handled);
g_free (sq);
if (g_atomic_int_dec_and_test (&sq->refcount)) {
/* DRAIN QUEUE */
gst_data_queue_flush (sq->queue);
g_object_unref (sq->queue);
g_cond_clear (&sq->turn);
g_cond_clear (&sq->query_handled);
g_free (sq);
}
}
static GstSingleQueue *
gst_single_queue_ref (GstSingleQueue * squeue)
{
g_atomic_int_inc (&squeue->refcount);
return squeue;
}
static GstSingleQueue *
@ -2975,6 +2997,8 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
}
sq = g_new0 (GstSingleQueue, 1);
sq->refcount = 1;
mqueue->nbqueues++;
sq->id = temp_id;
sq->groupid = DEFAULT_PAD_GROUP_ID;
@ -3044,9 +3068,15 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
GST_OBJECT_FLAG_SET (sq->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
name = g_strdup_printf ("src_%u", sq->id);
sq->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
templ = gst_static_pad_template_get (&srctemplate);
sq->srcpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
"direction", templ->direction, "template", templ, NULL);
gst_object_unref (templ);
g_free (name);
mqpad = (GstMultiQueuePad *) sq->srcpad;
mqpad->sq = gst_single_queue_ref (sq);
gst_pad_set_activatemode_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode));
gst_pad_set_event_function (sq->srcpad,
@ -3057,9 +3087,6 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
GST_OBJECT_FLAG_SET (sq->srcpad, GST_PAD_FLAG_PROXY_CAPS);
gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
/* only activate the pads when we are not in the NULL state