From 56d583228740f37745db800a010b3f7b4ce6f164 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 17 Sep 2007 02:01:41 +0000 Subject: [PATCH] gst/rtpmanager/gstrtpbin.c: Link to the right pads regardless of which one was created first in the ssrc demuxer. Original commit message from CVS: * gst/rtpmanager/gstrtpbin.c: (new_ssrc_pad_found): Link to the right pads regardless of which one was created first in the ssrc demuxer. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop): * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_process_rtp), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_chain_send_rtp): * gst/rtpmanager/rtpsource.c: (calculate_jitter): Improve debugging. * gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_finalize), (gst_rtp_ssrc_demux_sink_event), (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain), (gst_rtp_ssrc_demux_internal_links): * gst/rtpmanager/gstrtpssrcdemux.h: Fix race in creating the RTP and RTCP pads when a new SSRC is detected. --- gst/rtpmanager/gstrtpbin.c | 7 ++++-- gst/rtpmanager/gstrtpjitterbuffer.c | 6 ++--- gst/rtpmanager/gstrtpsession.c | 27 ++++++++++++++++++++-- gst/rtpmanager/gstrtpssrcdemux.c | 36 +++++++++++++++-------------- gst/rtpmanager/gstrtpssrcdemux.h | 2 ++ gst/rtpmanager/rtpsource.c | 2 ++ 6 files changed, 56 insertions(+), 24 deletions(-) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index cdbdaf6422..035f4cc909 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -1404,8 +1404,11 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, /* get pad and link */ GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); + padname = g_strdup_printf ("src_%d", ssrc); + srcpad = gst_element_get_pad (element, padname); + g_free (padname); sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); - gst_pad_link (pad, sinkpad); + gst_pad_link (srcpad, sinkpad); gst_object_unref (sinkpad); /* get the RTCP sync pad */ @@ -1434,7 +1437,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, no_stream: { GST_RTP_SESSION_UNLOCK (session); - GST_DEBUG ("could not create stream"); + GST_DEBUG_OBJECT (session->bin, "could not create stream"); return; } } diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 57be42ec56..d27c2aa1ce 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -797,6 +797,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) guint16 seqnum; GstFlowReturn ret = GST_FLOW_OK; GstClockTime timestamp; + guint64 latency_ts; jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); @@ -849,7 +850,6 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) * latency is set, we just pump it in the queue and let the other end push it * out as fast as possible. */ if (priv->latency_ms && priv->drop_on_latency) { - guint64 latency_ts; latency_ts = gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000); @@ -1053,8 +1053,8 @@ again: if (priv->next_seqnum != -1) { /* we expected next_seqnum but received something else, that's a gap */ GST_WARNING_OBJECT (jitterbuffer, - "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum, - seqnum); + "Sequence number GAP detected: expected %d instead of %d", + priv->next_seqnum, seqnum); } else { /* we don't know what the next_seqnum should be, wait for the last * possible moment to push this buffer, maybe we get an earlier seqnum diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 6d4cf8b031..83210ff8ab 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -781,11 +781,11 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; - GST_DEBUG_OBJECT (rtpsession, "reading receiving RTP packet"); - if (rtpsession->recv_rtp_src) { + GST_DEBUG_OBJECT (rtpsession, "pushing received RTP packet"); result = gst_pad_push (rtpsession->recv_rtp_src, buffer); } else { + GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet"); gst_buffer_unref (buffer); result = GST_FLOW_OK; } @@ -1114,10 +1114,22 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) } ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); + if (ret != GST_FLOW_OK) + goto push_error; + +done: gst_object_unref (rtpsession); return ret; + + /* ERRORS */ +push_error: + { + GST_DEBUG_OBJECT (rtpsession, "process returned %s", + gst_flow_get_name (ret)); + goto done; + } } static GstFlowReturn @@ -1286,10 +1298,21 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) } ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime); + if (ret != GST_FLOW_OK) + goto push_error; +done: gst_object_unref (rtpsession); return ret; + + /* ERRORS */ +push_error: + { + GST_DEBUG_OBJECT (rtpsession, "process returned %s", + gst_flow_get_name (ret)); + goto done; + } } /* Create sinkpad to receive RTP packets from senders. This will also create a diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 2f1e9e67a2..a5956d1cf4 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -96,6 +96,9 @@ static GstElementDetails gst_rtp_ssrc_demux_details = { "Wim Taymans " }; +#define GST_PAD_LOCK(obj) (g_mutex_lock ((obj)->padlock)) +#define GST_PAD_UNLOCK(obj) (g_mutex_unlock ((obj)->padlock)) + /* signals */ enum { @@ -159,6 +162,7 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) return NULL; } +/* with PAD_LOCK */ static GstRtpSsrcDemuxPad * create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, GstClockTime timestamp) @@ -202,9 +206,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); - /* unlock to perform the remainder and to fire our signal */ - GST_OBJECT_UNLOCK (demux); - /* copy caps from input */ gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink)); gst_pad_use_fixed_caps (rtp_pad); @@ -227,8 +228,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, g_signal_emit (G_OBJECT (demux), gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); - GST_OBJECT_LOCK (demux); - return demuxpad; } @@ -304,6 +303,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux, gst_rtp_ssrc_demux_rtcp_sink_event); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); + demux->padlock = g_mutex_new (); + gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED); } @@ -327,6 +328,7 @@ gst_rtp_ssrc_demux_finalize (GObject * object) GstRtpSsrcDemux *demux; demux = GST_RTP_SSRC_DEMUX (object); + g_mutex_free (demux->padlock); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -349,14 +351,14 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event) GSList *walk; res = TRUE; - GST_OBJECT_LOCK (demux); + GST_PAD_LOCK (demux); for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; gst_event_ref (event); res &= gst_pad_push_event (pad->rtp_pad, event); } - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); gst_event_unref (event); break; } @@ -381,13 +383,13 @@ gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event) GSList *walk; res = TRUE; - GST_OBJECT_LOCK (demux); + GST_PAD_LOCK (demux); for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; res &= gst_pad_push_event (pad->rtcp_pad, event); } - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); break; } } @@ -412,7 +414,7 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); - GST_OBJECT_LOCK (demux); + GST_PAD_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { if (!(dpad = @@ -420,7 +422,7 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GST_BUFFER_TIMESTAMP (buf)))) goto create_failed; } - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); /* push to srcpad */ ret = gst_pad_push (dpad->rtp_pad, buf); @@ -440,7 +442,7 @@ create_failed: { GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); gst_buffer_unref (buf); return GST_FLOW_ERROR; } @@ -479,14 +481,14 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc); - GST_OBJECT_LOCK (demux); + GST_PAD_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1))) goto create_failed; } - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); /* push to srcpad */ ret = gst_pad_push (dpad->rtcp_pad, buf); @@ -506,7 +508,7 @@ create_failed: { GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); gst_buffer_unref (buf); return GST_FLOW_ERROR; } @@ -539,7 +541,7 @@ gst_rtp_ssrc_demux_internal_links (GstPad * pad) demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); - GST_OBJECT_LOCK (demux); + GST_PAD_LOCK (demux); for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; @@ -555,7 +557,7 @@ gst_rtp_ssrc_demux_internal_links (GstPad * pad) break; } } - GST_OBJECT_UNLOCK (demux); + GST_PAD_UNLOCK (demux); gst_object_unref (demux); return res; diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index 88aeed8038..10c569d9ce 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -40,6 +40,8 @@ struct _GstRtpSsrcDemux GstPad *rtp_sink; GstPad *rtcp_sink; + + GMutex *padlock; GSList *srcpads; }; diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 4ffc6bbcba..1d7eb24a06 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -284,6 +284,8 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, pt = gst_rtp_buffer_get_payload_type (buffer); + GST_DEBUG ("SSRC %08x got payload %d", src->ssrc, pt); + /* get clockrate */ if ((clock_rate = get_clock_rate (src, pt)) == -1) goto no_clock_rate;