rtpssrcdemux: Avoid taking streamlock out-of-band

In this change we now protect the internal srcpads list using the
stream lock and limit usage of the internal stream lock to
preventing data flowing on the other src pad type while creating
and signalling the new pad.

This fixes a deadlock with RTPBin shutdown lock. These two locks would
end up being taken in two different order, which caused a deadlock. More
generally, we should not rely on a streamlock when handling out-of-band
data, so as a side effect, we should not take a stream lock when
iterating internal links.
This commit is contained in:
Nicolas Dufresne 2019-05-21 15:25:03 -04:00
parent a493bcd549
commit f7c712d0b8
2 changed files with 125 additions and 38 deletions

View file

@ -145,6 +145,7 @@ struct _GstRtpSsrcDemuxPad
}; };
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
* MUST be called with object lock
*/ */
static GstRtpSsrcDemuxPad * static GstRtpSsrcDemuxPad *
find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
@ -160,6 +161,38 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
return NULL; return NULL;
} }
/* returns a reference to the pad if found, %NULL otherwise */
static GstPad *
get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype)
{
GstRtpSsrcDemuxPad *demuxpad;
GstPad *retpad;
GST_OBJECT_LOCK (demux);
demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
if (!demuxpad) {
GST_OBJECT_UNLOCK (demux);
return NULL;
}
switch (padtype) {
case RTP_PAD:
retpad = gst_object_ref (demuxpad->rtp_pad);
break;
case RTCP_PAD:
retpad = gst_object_ref (demuxpad->rtcp_pad);
break;
default:
retpad = NULL;
g_assert_not_reached ();
}
GST_OBJECT_UNLOCK (demux);
return retpad;
}
static GstEvent * static GstEvent *
add_ssrc_and_ref (GstEvent * event, guint32 ssrc) add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
{ {
@ -229,6 +262,7 @@ forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata); gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
} }
/* MUST only be called from streaming thread */
static GstPad * static GstPad *
find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
PadType padtype) PadType padtype)
@ -242,22 +276,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
INTERNAL_STREAM_LOCK (demux); INTERNAL_STREAM_LOCK (demux);
demuxpad = find_demux_pad_for_ssrc (demux, ssrc); retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype);
if (demuxpad != NULL) { if (retpad != NULL) {
switch (padtype) {
case RTP_PAD:
retpad = gst_object_ref (demuxpad->rtp_pad);
break;
case RTCP_PAD:
retpad = gst_object_ref (demuxpad->rtcp_pad);
break;
default:
retpad = NULL;
g_assert_not_reached ();
}
INTERNAL_STREAM_UNLOCK (demux); INTERNAL_STREAM_UNLOCK (demux);
return retpad; return retpad;
} }
@ -283,7 +304,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
gst_pad_set_element_private (rtp_pad, demuxpad); gst_pad_set_element_private (rtp_pad, demuxpad);
gst_pad_set_element_private (rtcp_pad, demuxpad); gst_pad_set_element_private (rtcp_pad, demuxpad);
GST_OBJECT_LOCK (demux);
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
GST_OBJECT_UNLOCK (demux);
gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query); gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
gst_pad_set_iterate_internal_links_function (rtp_pad, gst_pad_set_iterate_internal_links_function (rtp_pad,
@ -316,17 +339,11 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
g_assert_not_reached (); g_assert_not_reached ();
} }
gst_object_ref (rtp_pad);
gst_object_ref (rtcp_pad);
g_signal_emit (G_OBJECT (demux), g_signal_emit (G_OBJECT (demux),
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
INTERNAL_STREAM_UNLOCK (demux); INTERNAL_STREAM_UNLOCK (demux);
gst_object_unref (rtp_pad);
gst_object_unref (rtcp_pad);
return retpad; return retpad;
} }
@ -486,17 +503,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
{ {
GstRtpSsrcDemuxPad *dpad; GstRtpSsrcDemuxPad *dpad;
INTERNAL_STREAM_LOCK (demux); GST_OBJECT_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc); dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) { if (dpad == NULL) {
INTERNAL_STREAM_UNLOCK (demux); GST_OBJECT_UNLOCK (demux);
goto unknown_pad; goto unknown_pad;
} }
GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc); GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
demux->srcpads = g_slist_remove (demux->srcpads, dpad); demux->srcpads = g_slist_remove (demux->srcpads, dpad);
INTERNAL_STREAM_UNLOCK (demux); GST_OBJECT_UNLOCK (demux);
gst_pad_set_active (dpad->rtp_pad, FALSE); gst_pad_set_active (dpad->rtp_pad, FALSE);
gst_pad_set_active (dpad->rtcp_pad, FALSE); gst_pad_set_active (dpad->rtcp_pad, FALSE);
@ -535,7 +552,7 @@ forward_event (GstPad * pad, gpointer user_data)
GSList *walk = NULL; GSList *walk = NULL;
GstEvent *newevent = NULL; GstEvent *newevent = NULL;
INTERNAL_STREAM_LOCK (fdata->demux); GST_OBJECT_LOCK (fdata->demux);
for (walk = fdata->demux->srcpads; walk; walk = walk->next) { for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
@ -544,7 +561,7 @@ forward_event (GstPad * pad, gpointer user_data)
break; break;
} }
} }
INTERNAL_STREAM_UNLOCK (fdata->demux); GST_OBJECT_UNLOCK (fdata->demux);
if (newevent) if (newevent)
fdata->res &= gst_pad_push_event (pad, newevent); fdata->res &= gst_pad_push_event (pad, newevent);
@ -582,7 +599,6 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
guint32 ssrc; guint32 ssrc;
GstRTPBuffer rtp = { NULL }; GstRTPBuffer rtp = { NULL };
GstPad *srcpad; GstPad *srcpad;
GstRtpSsrcDemuxPad *dpad;
demux = GST_RTP_SSRC_DEMUX (parent); demux = GST_RTP_SSRC_DEMUX (parent);
@ -602,14 +618,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
ret = gst_pad_push (srcpad, buf); ret = gst_pad_push (srcpad, buf);
if (ret != GST_FLOW_OK) { if (ret != GST_FLOW_OK) {
GstPad *active_pad;
/* check if the ssrc still there, may have been removed */ /* check if the ssrc still there, may have been removed */
INTERNAL_STREAM_LOCK (demux); active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL || dpad->rtp_pad != srcpad) { if (active_pad == NULL || active_pad != srcpad) {
/* SSRC was removed during the push ... ignore the error */ /* SSRC was removed during the push ... ignore the error */
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
} }
INTERNAL_STREAM_UNLOCK (demux);
g_clear_object (&active_pad);
} }
gst_object_unref (srcpad); gst_object_unref (srcpad);
@ -644,7 +663,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
GstRTCPPacket packet; GstRTCPPacket packet;
GstRTCPBuffer rtcp = { NULL, }; GstRTCPBuffer rtcp = { NULL, };
GstPad *srcpad; GstPad *srcpad;
GstRtpSsrcDemuxPad *dpad;
demux = GST_RTP_SSRC_DEMUX (parent); demux = GST_RTP_SSRC_DEMUX (parent);
@ -689,14 +707,16 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
ret = gst_pad_push (srcpad, buf); ret = gst_pad_push (srcpad, buf);
if (ret != GST_FLOW_OK) { if (ret != GST_FLOW_OK) {
GstPad *active_pad;
/* check if the ssrc still there, may have been removed */ /* check if the ssrc still there, may have been removed */
INTERNAL_STREAM_LOCK (demux); active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
dpad = find_demux_pad_for_ssrc (demux, ssrc); if (active_pad == NULL || active_pad != srcpad) {
if (dpad == NULL || dpad->rtcp_pad != srcpad) {
/* SSRC was removed during the push ... ignore the error */ /* SSRC was removed during the push ... ignore the error */
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
} }
INTERNAL_STREAM_UNLOCK (demux);
g_clear_object (&active_pad);
} }
gst_object_unref (srcpad); gst_object_unref (srcpad);
@ -786,7 +806,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
demux = GST_RTP_SSRC_DEMUX (parent); demux = GST_RTP_SSRC_DEMUX (parent);
INTERNAL_STREAM_LOCK (demux); GST_OBJECT_LOCK (demux);
for (current = demux->srcpads; current; current = g_slist_next (current)) { for (current = demux->srcpads; current; current = g_slist_next (current)) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data; GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
@ -807,7 +827,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
g_value_unset (&val); g_value_unset (&val);
} }
INTERNAL_STREAM_UNLOCK (demux); GST_OBJECT_UNLOCK (demux);
return it; return it;
} }

View file

@ -174,6 +174,72 @@ GST_START_TEST (test_event_forwarding)
GST_END_TEST; GST_END_TEST;
typedef struct
{
gint ready;
GMutex mutex;
GCond cond;
} LockTestContext;
static void
new_ssrc_pad_cb (GstElement * element, guint ssrc, GstPad * pad,
LockTestContext * ctx)
{
g_message ("Signalling ready");
g_atomic_int_set (&ctx->ready, 1);
g_message ("Waiting no more ready");
while (g_atomic_int_get (&ctx->ready))
g_usleep (G_USEC_PER_SEC / 100);
g_mutex_lock (&ctx->mutex);
g_mutex_unlock (&ctx->mutex);
}
static gpointer
push_buffer_func (gpointer user_data)
{
GstHarness *h = user_data;
gst_harness_push (h, create_buffer (0, 0xdeadbeef));
return NULL;
}
GST_START_TEST (test_oob_event_locking)
{
GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
LockTestContext ctx = { FALSE, };
GThread *thread;
g_mutex_init (&ctx.mutex);
g_cond_init (&ctx.cond);
gst_harness_set_src_caps_str (h, "application/x-rtp");
g_signal_connect (h->element,
"new-ssrc-pad", G_CALLBACK (new_ssrc_pad_cb), &ctx);
thread = g_thread_new ("streaming-thread", push_buffer_func, h);
g_mutex_lock (&ctx.mutex);
g_message ("Waiting for ready");
while (!g_atomic_int_get (&ctx.ready))
g_usleep (G_USEC_PER_SEC / 100);
g_message ("Signal no more ready");
g_atomic_int_set (&ctx.ready, 0);
gst_harness_push_event (h,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL));
g_mutex_unlock (&ctx.mutex);
g_thread_join (thread);
g_mutex_clear (&ctx.mutex);
g_cond_clear (&ctx.cond);
gst_harness_teardown (h);
}
GST_END_TEST;
static Suite * static Suite *
rtpssrcdemux_suite (void) rtpssrcdemux_suite (void)
{ {
@ -182,6 +248,7 @@ rtpssrcdemux_suite (void)
suite_add_tcase (s, tc_chain); suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_event_forwarding); tcase_add_test (tc_chain, test_event_forwarding);
tcase_add_test (tc_chain, test_oob_event_locking);
return s; return s;
} }