pad-monitor: add lots of locking

When handling elements that spawn multiple threads (hardware
enc/decoders), the pad monitor has to protect its variables specially
because some checks involve iterating over internally linked pads to
add/get some data for comparison (expected events, timestamp ranges,
caps).

Aside from locking its own mutex, the pad monitor can also lock the
parent's mutex when it needs to use data from its internally linked
pads. The locking order should always be parent and then individual
pad-monitor mutexes. This should prevent deadlocks when multiple
pad-monitors from the same element start doing checks at the same time
from different threads.
This commit is contained in:
Thiago Santos 2013-07-24 16:04:03 -03:00
parent 48c49f5071
commit 4a3f06885a

View file

@ -44,6 +44,43 @@ G_DEFINE_TYPE_WITH_CODE (GstQaPadMonitor, gst_qa_pad_monitor,
#define PAD_IS_IN_PUSH_MODE(p) ((p)->mode == GST_ACTIVATE_PUSH)
#define PENDING_FIELDS "pending-fields"
/*
* Locking the parent should always be done before locking the
* pad-monitor to prevent deadlocks in case another monitor from
* another pad on the same element starts an operation that also
* requires locking itself and some other monitors from internally
* linked pads.
*
* An example:
* An element has a sink and a src pad. Some test starts running at sinkpad
* and it locks the parent, and then it locks itself. In case it needs to get
* some information from the srcpad, it is able to lock the srcpad and get it
* because the srcpad should never lock itself before locking the parent (which
* it won't be able as sinkpad already locked it).
*
* As a side one, it is possible that srcpad locks itself without locking the
* parent in case it wants to do a check that won't need to use other internally
* linked pads (sinkpad). But in this case it might lock and unlock freely without
* causing deadlocks.
*/
#define GST_QA_PAD_MONITOR_PARENT_LOCK(m) \
G_STMT_START { \
if (G_LIKELY (GST_QA_MONITOR_GET_PARENT (m))) { \
GST_QA_MONITOR_LOCK (GST_QA_MONITOR_GET_PARENT (m)); \
} else { \
GST_WARNING_OBJECT (m, "No parent found, can't lock"); \
} \
} G_STMT_END
#define GST_QA_PAD_MONITOR_PARENT_UNLOCK(m) \
G_STMT_START { \
if (G_LIKELY (GST_QA_MONITOR_GET_PARENT (m))) { \
GST_QA_MONITOR_UNLOCK (GST_QA_MONITOR_GET_PARENT (m)); \
} else { \
GST_WARNING_OBJECT (m, "No parent found, can't unlock"); \
} \
} G_STMT_END
typedef struct
{
GstClockTime timestamp;
@ -269,7 +306,7 @@ static gboolean
gst_qa_pad_monitor_pad_should_proxy_othercaps (GstQaPadMonitor * monitor)
{
GstQaMonitor *parent = GST_QA_MONITOR_GET_PARENT (monitor);
/* We only know how to handle othercaps checks for decoders so far */
/* We only know how to handle othercaps checks for codecs so far */
return GST_QA_ELEMENT_MONITOR_ELEMENT_IS_DECODER (parent) ||
GST_QA_ELEMENT_MONITOR_ELEMENT_IS_ENCODER (parent);
}
@ -449,7 +486,6 @@ _parent_set_cb (GstObject * object, GstObject * parent, GstQaMonitor * monitor)
GST_DEBUG_PAD_NAME (object)));
}
static void
gst_qa_pad_monitor_dispose (GObject * object)
{
@ -572,12 +608,14 @@ gst_qa_pad_monitor_check_buffer_timestamp_in_received_range (GstQaPadMonitor *
GST_DEBUG_OBJECT (monitor, "Checking pad %s:%s input timestamps",
GST_DEBUG_PAD_NAME (otherpad));
othermonitor = g_object_get_data ((GObject *) otherpad, "qa-monitor");
GST_QA_MONITOR_LOCK (othermonitor);
if (gst_qa_pad_monitor_timestamp_is_in_received_range (othermonitor, ts)
&& gst_qa_pad_monitor_timestamp_is_in_received_range (othermonitor,
ts_end)) {
done = TRUE;
found = TRUE;
}
GST_QA_MONITOR_UNLOCK (othermonitor);
gst_object_unref (otherpad);
has_one = TRUE;
break;
@ -702,9 +740,12 @@ gst_qa_pad_monitor_check_aggregated_return (GstQaPadMonitor * monitor,
othermonitor = g_object_get_data ((GObject *) peerpad, "qa-monitor");
if (othermonitor) {
found_a_pad = TRUE;
GST_QA_MONITOR_LOCK (othermonitor);
aggregated =
_combine_flows (aggregated, othermonitor->last_flow_return);
GST_QA_MONITOR_UNLOCK (othermonitor);
}
gst_object_unref (peerpad);
}
gst_object_unref (otherpad);
@ -761,7 +802,9 @@ gst_qa_pad_monitor_otherpad_add_pending_serialized_event (GstQaPadMonitor *
SerializedEventData *data = g_slice_new0 (SerializedEventData);
data->timestamp = last_ts;
data->event = gst_event_ref (event);
GST_QA_MONITOR_LOCK (othermonitor);
g_ptr_array_add (othermonitor->serialized_events, data);
GST_QA_MONITOR_UNLOCK (othermonitor);
}
break;
case GST_ITERATOR_RESYNC:
@ -803,9 +846,11 @@ gst_qa_pad_monitor_otherpad_add_pending_field (GstQaPadMonitor * monitor,
case GST_ITERATOR_OK:
othermonitor = g_object_get_data ((GObject *) otherpad, "qa-monitor");
if (othermonitor) {
GST_QA_MONITOR_LOCK (othermonitor);
g_assert (othermonitor->pending_setcaps_fields != NULL);
gst_structure_set_value (othermonitor->pending_setcaps_fields,
field, v);
GST_QA_MONITOR_UNLOCK (othermonitor);
}
break;
case GST_ITERATOR_RESYNC:
@ -838,10 +883,12 @@ gst_qa_pad_monitor_otherpad_clear_pending_fields (GstQaPadMonitor * monitor)
case GST_ITERATOR_OK:
othermonitor = g_object_get_data ((GObject *) otherpad, "qa-monitor");
if (othermonitor) {
GST_QA_MONITOR_LOCK (othermonitor);
g_assert (othermonitor->pending_setcaps_fields != NULL);
gst_structure_free (othermonitor->pending_setcaps_fields);
othermonitor->pending_setcaps_fields =
gst_structure_empty_new (PENDING_FIELDS);
GST_QA_MONITOR_UNLOCK (othermonitor);
}
break;
case GST_ITERATOR_RESYNC:
@ -874,12 +921,14 @@ gst_qa_pad_monitor_add_expected_newsegment (GstQaPadMonitor * monitor,
switch (gst_iterator_next (iter, (gpointer *) & otherpad)) {
case GST_ITERATOR_OK:
othermonitor = g_object_get_data ((GObject *) otherpad, "qa-monitor");
GST_QA_MONITOR_LOCK (othermonitor);
if (othermonitor->expected_segment) {
GST_QA_MONITOR_REPORT_WARNING (othermonitor, FALSE, EVENT, EXPECTED,
"expected newsegment event never pushed");
gst_event_unref (othermonitor->expected_segment);
}
othermonitor->expected_segment = gst_event_ref (event);
GST_QA_MONITOR_UNLOCK (othermonitor);
gst_object_unref (otherpad);
break;
case GST_ITERATOR_RESYNC:
@ -1031,8 +1080,12 @@ gst_qa_pad_monitor_sink_event_check (GstQaPadMonitor * pad_monitor,
}
if (handler) {
GST_QA_MONITOR_UNLOCK (pad_monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (pad_monitor);
gst_event_ref (event);
ret = pad_monitor->event_func (pad, event);
GST_QA_PAD_MONITOR_PARENT_LOCK (pad_monitor);
GST_QA_MONITOR_LOCK (pad_monitor);
}
/* post checks */
@ -1107,8 +1160,10 @@ gst_qa_pad_monitor_src_event_check (GstQaPadMonitor * pad_monitor,
}
if (handler) {
GST_QA_MONITOR_UNLOCK (pad_monitor);
gst_event_ref (event);
ret = pad_monitor->event_func (pad, event);
GST_QA_MONITOR_LOCK (pad_monitor);
}
/* post checks */
@ -1136,15 +1191,24 @@ gst_qa_pad_monitor_chain_func (GstPad * pad, GstBuffer * buffer)
g_object_get_data ((GObject *) pad, "qa-monitor");
GstFlowReturn ret;
gst_qa_pad_monitor_check_first_buffer (pad_monitor, buffer);
GST_QA_MONITOR_LOCK (pad_monitor);
gst_qa_pad_monitor_check_first_buffer (pad_monitor, buffer);
gst_qa_pad_monitor_update_buffer_data (pad_monitor, buffer);
GST_QA_MONITOR_UNLOCK (pad_monitor);
ret = pad_monitor->chain_func (pad, buffer);
GST_QA_PAD_MONITOR_PARENT_LOCK (pad_monitor);
GST_QA_MONITOR_LOCK (pad_monitor);
pad_monitor->last_flow_return = ret;
gst_qa_pad_monitor_check_aggregated_return (pad_monitor, ret);
GST_QA_MONITOR_UNLOCK (pad_monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (pad_monitor);
return ret;
}
@ -1153,6 +1217,10 @@ gst_qa_pad_monitor_sink_event_func (GstPad * pad, GstEvent * event)
{
GstQaPadMonitor *pad_monitor =
g_object_get_data ((GObject *) pad, "qa-monitor");
gboolean ret;
GST_QA_PAD_MONITOR_PARENT_LOCK (pad_monitor);
GST_QA_MONITOR_LOCK (pad_monitor);
if (GST_EVENT_IS_SERIALIZED (event)) {
GstClockTime last_ts;
@ -1168,8 +1236,12 @@ gst_qa_pad_monitor_sink_event_func (GstPad * pad, GstEvent * event)
event, last_ts);
}
return gst_qa_pad_monitor_sink_event_check (pad_monitor, event,
ret = gst_qa_pad_monitor_sink_event_check (pad_monitor, event,
pad_monitor->event_func);
GST_QA_MONITOR_UNLOCK (pad_monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (pad_monitor);
return ret;
}
static gboolean
@ -1177,9 +1249,13 @@ gst_qa_pad_monitor_src_event_func (GstPad * pad, GstEvent * event)
{
GstQaPadMonitor *pad_monitor =
g_object_get_data ((GObject *) pad, "qa-monitor");
gboolean ret;
return gst_qa_pad_monitor_src_event_check (pad_monitor, event,
GST_QA_MONITOR_LOCK (pad_monitor);
ret = gst_qa_pad_monitor_src_event_check (pad_monitor, event,
pad_monitor->event_func);
GST_QA_MONITOR_UNLOCK (pad_monitor);
return ret;
}
static gboolean
@ -1220,6 +1296,9 @@ gst_qa_pad_monitor_buffer_probe (GstPad * pad, GstBuffer * buffer,
{
GstQaPadMonitor *monitor = udata;
GST_QA_PAD_MONITOR_PARENT_LOCK (monitor);
GST_QA_MONITOR_LOCK (monitor);
gst_qa_pad_monitor_check_first_buffer (monitor, buffer);
gst_qa_pad_monitor_update_buffer_data (monitor, buffer);
@ -1249,6 +1328,8 @@ gst_qa_pad_monitor_buffer_probe (GstPad * pad, GstBuffer * buffer,
}
}
}
GST_QA_MONITOR_UNLOCK (monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (monitor);
return TRUE;
}
@ -1256,6 +1337,10 @@ static gboolean
gst_qa_pad_monitor_event_probe (GstPad * pad, GstEvent * event, gpointer udata)
{
GstQaPadMonitor *monitor = GST_QA_PAD_MONITOR_CAST (udata);
gboolean ret;
GST_QA_PAD_MONITOR_PARENT_LOCK (monitor);
GST_QA_MONITOR_LOCK (monitor);
if (GST_EVENT_IS_SERIALIZED (event)) {
gint i;
@ -1286,7 +1371,11 @@ gst_qa_pad_monitor_event_probe (GstPad * pad, GstEvent * event, gpointer udata)
/* This so far is just like an event that is flowing downstream,
* so we do the same checks as a sinkpad event handler */
return gst_qa_pad_monitor_sink_event_check (monitor, event, NULL);
ret = gst_qa_pad_monitor_sink_event_check (monitor, event, NULL);
GST_QA_MONITOR_UNLOCK (monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (monitor);
return ret;
}
static GstCaps *
@ -1303,11 +1392,15 @@ gst_qa_pad_monitor_getcaps_func (GstPad * pad)
}
if (ret) {
/* We shouldn't need to lock the parent as this doesn't modify
* other monitors, just does some peer_pad_caps */
GST_QA_MONITOR_LOCK (pad_monitor);
gst_qa_pad_monitor_check_caps_complete (pad_monitor, ret);
if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK) {
gst_qa_pad_monitor_check_caps_fields_proxied (pad_monitor, ret);
}
GST_QA_MONITOR_UNLOCK (pad_monitor);
}
return ret;
@ -1321,6 +1414,9 @@ gst_qa_pad_monitor_setcaps_func (GstPad * pad, GstCaps * caps)
gboolean ret = TRUE;
GstStructure *structure;
GST_QA_PAD_MONITOR_PARENT_LOCK (pad_monitor);
GST_QA_MONITOR_LOCK (pad_monitor);
if (caps) {
structure = gst_caps_get_structure (caps, 0);
if (gst_structure_n_fields (pad_monitor->pending_setcaps_fields)) {
@ -1373,12 +1469,17 @@ gst_qa_pad_monitor_setcaps_func (GstPad * pad, GstCaps * caps)
pad_monitor->pending_setcaps_fields =
gst_structure_empty_new (PENDING_FIELDS);
GST_QA_MONITOR_UNLOCK (pad_monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (pad_monitor);
if (pad_monitor->setcaps_func) {
ret = pad_monitor->setcaps_func (pad, caps);
}
GST_QA_PAD_MONITOR_PARENT_LOCK (pad_monitor);
if (!ret)
gst_qa_pad_monitor_otherpad_clear_pending_fields (pad_monitor);
GST_QA_PAD_MONITOR_PARENT_UNLOCK (pad_monitor);
return ret;
}