rtpssrcdemux: Only forward stick events while holding the sinkpad stream lock

Otherwise we get a race where if the RTCP packet comes in first and while
it is added the pads, the segment event arrives on the RTP stream, the event
may be lost completely and never forwarded.
This commit is contained in:
Olivier Crête 2013-04-02 23:42:23 -04:00
parent 76679f9ae9
commit 6f3734c305

View file

@ -145,6 +145,9 @@ struct _GstRtpSsrcDemuxPad
GstPad *rtp_pad; GstPad *rtp_pad;
GstCaps *caps; GstCaps *caps;
GstPad *rtcp_pad; GstPad *rtcp_pad;
gboolean pushed_initial_rtp_events;
gboolean pushed_initial_rtcp_events;
}; };
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
@ -210,6 +213,25 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
return TRUE; return TRUE;
} }
static void
forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
PadType padtype)
{
struct ForwardStickyEventData fdata;
GstPad *sinkpad;
if (padtype == RTP_PAD)
sinkpad = demux->rtp_sink;
else if (padtype == RTCP_PAD)
sinkpad = demux->rtcp_sink;
else
g_assert_not_reached ();
fdata.ssrc = ssrc;
fdata.pad = pad;
gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
}
static GstPad * static GstPad *
find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
@ -220,31 +242,44 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
GstPadTemplate *templ; GstPadTemplate *templ;
gchar *padname; gchar *padname;
GstRtpSsrcDemuxPad *demuxpad; GstRtpSsrcDemuxPad *demuxpad;
struct ForwardStickyEventData fdata;
GstPad *retpad; GstPad *retpad;
gulong rtp_block, rtcp_block; gulong rtp_block, rtcp_block;
GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
GST_PAD_LOCK (demux); GST_PAD_LOCK (demux);
demuxpad = find_demux_pad_for_ssrc (demux, ssrc); demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
if (demuxpad != NULL) { if (demuxpad != NULL) {
gboolean forward = FALSE;
switch (padtype) { switch (padtype) {
case RTP_PAD: case RTP_PAD:
retpad = gst_object_ref (demuxpad->rtp_pad); retpad = gst_object_ref (demuxpad->rtp_pad);
if (!demuxpad->pushed_initial_rtp_events) {
forward = TRUE;
demuxpad->pushed_initial_rtp_events = TRUE;
}
break; break;
case RTCP_PAD: case RTCP_PAD:
retpad = gst_object_ref (demuxpad->rtcp_pad); retpad = gst_object_ref (demuxpad->rtcp_pad);
if (!demuxpad->pushed_initial_rtcp_events) {
forward = TRUE;
demuxpad->pushed_initial_rtcp_events = TRUE;
}
break; break;
default: default:
retpad = NULL; retpad = NULL;
g_assert_not_reached (); g_assert_not_reached ();
} }
GST_PAD_UNLOCK (demux); GST_PAD_UNLOCK (demux);
if (forward)
forward_initial_events (demux, ssrc, retpad, padtype);
return retpad; return retpad;
} }
GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
klass = GST_ELEMENT_GET_CLASS (demux); klass = GST_ELEMENT_GET_CLASS (demux);
templ = gst_element_class_get_pad_template (klass, "src_%u"); templ = gst_element_class_get_pad_template (klass, "src_%u");
padname = g_strdup_printf ("src_%u", ssrc); padname = g_strdup_printf ("src_%u", ssrc);
@ -262,8 +297,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
demuxpad->rtp_pad = rtp_pad; demuxpad->rtp_pad = rtp_pad;
demuxpad->rtcp_pad = rtcp_pad; demuxpad->rtcp_pad = rtcp_pad;
fdata.ssrc = ssrc;
gst_pad_set_element_private (rtp_pad, demuxpad); gst_pad_set_element_private (rtp_pad, demuxpad);
gst_pad_set_element_private (rtcp_pad, demuxpad); gst_pad_set_element_private (rtcp_pad, demuxpad);
@ -275,18 +308,22 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_use_fixed_caps (rtp_pad); gst_pad_use_fixed_caps (rtp_pad);
gst_pad_set_active (rtp_pad, TRUE); gst_pad_set_active (rtp_pad, TRUE);
fdata.pad = rtp_pad;
gst_pad_sticky_events_foreach (demux->rtp_sink, forward_sticky_events,
&fdata);
gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event); gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_set_iterate_internal_links_function (rtcp_pad, gst_pad_set_iterate_internal_links_function (rtcp_pad,
gst_rtp_ssrc_demux_iterate_internal_links_src); gst_rtp_ssrc_demux_iterate_internal_links_src);
gst_pad_use_fixed_caps (rtcp_pad); gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_active (rtcp_pad, TRUE); gst_pad_set_active (rtcp_pad, TRUE);
fdata.pad = rtcp_pad;
gst_pad_sticky_events_foreach (demux->rtcp_sink, forward_sticky_events, if (padtype == RTP_PAD) {
&fdata); demuxpad->pushed_initial_rtp_events = TRUE;
forward_initial_events (demux, ssrc, rtp_pad, padtype);
} else if (padtype == RTCP_PAD) {
demuxpad->pushed_initial_rtcp_events = TRUE;
forward_initial_events (demux, ssrc, rtcp_pad, padtype);
} else {
g_assert_not_reached ();
}
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
@ -531,7 +568,11 @@ forward_event (GstPad * pad, gpointer user_data)
for (walk = fdata->demux->srcpads; walk; walk = walk->next) { for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
if (pad == dpad->rtp_pad || pad == dpad->rtcp_pad) { /* Only forward the event if the initial events have been through first,
* the initial events should be forwarded before any other event
* or buffer is pushed */
if ((pad == dpad->rtp_pad && dpad->pushed_initial_rtp_events) ||
(pad == dpad->rtcp_pad && dpad->pushed_initial_rtcp_events)) {
newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc); newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc);
break; break;
} }