mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-23 10:11:08 +00:00
gst/rtpmanager/gstrtpbin.c: Link to the right pads regardless of which one was created first in the ssrc demuxer.
Original commit message from CVS: * gst/rtpmanager/gstrtpbin.c: (new_ssrc_pad_found): Link to the right pads regardless of which one was created first in the ssrc demuxer. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop): * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_process_rtp), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_chain_send_rtp): * gst/rtpmanager/rtpsource.c: (calculate_jitter): Improve debugging. * gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_finalize), (gst_rtp_ssrc_demux_sink_event), (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain), (gst_rtp_ssrc_demux_internal_links): * gst/rtpmanager/gstrtpssrcdemux.h: Fix race in creating the RTP and RTCP pads when a new SSRC is detected.
This commit is contained in:
parent
b2aa36cb0d
commit
56d5832287
6 changed files with 56 additions and 24 deletions
|
@ -1404,8 +1404,11 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
|
||||||
|
|
||||||
/* get pad and link */
|
/* get pad and link */
|
||||||
GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer");
|
GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer");
|
||||||
|
padname = g_strdup_printf ("src_%d", ssrc);
|
||||||
|
srcpad = gst_element_get_pad (element, padname);
|
||||||
|
g_free (padname);
|
||||||
sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
|
sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
|
||||||
gst_pad_link (pad, sinkpad);
|
gst_pad_link (srcpad, sinkpad);
|
||||||
gst_object_unref (sinkpad);
|
gst_object_unref (sinkpad);
|
||||||
|
|
||||||
/* get the RTCP sync pad */
|
/* get the RTCP sync pad */
|
||||||
|
@ -1434,7 +1437,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
|
||||||
no_stream:
|
no_stream:
|
||||||
{
|
{
|
||||||
GST_RTP_SESSION_UNLOCK (session);
|
GST_RTP_SESSION_UNLOCK (session);
|
||||||
GST_DEBUG ("could not create stream");
|
GST_DEBUG_OBJECT (session->bin, "could not create stream");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -797,6 +797,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
guint16 seqnum;
|
guint16 seqnum;
|
||||||
GstFlowReturn ret = GST_FLOW_OK;
|
GstFlowReturn ret = GST_FLOW_OK;
|
||||||
GstClockTime timestamp;
|
GstClockTime timestamp;
|
||||||
|
guint64 latency_ts;
|
||||||
|
|
||||||
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
|
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
|
@ -849,7 +850,6 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
* latency is set, we just pump it in the queue and let the other end push it
|
* latency is set, we just pump it in the queue and let the other end push it
|
||||||
* out as fast as possible. */
|
* out as fast as possible. */
|
||||||
if (priv->latency_ms && priv->drop_on_latency) {
|
if (priv->latency_ms && priv->drop_on_latency) {
|
||||||
guint64 latency_ts;
|
|
||||||
|
|
||||||
latency_ts =
|
latency_ts =
|
||||||
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
|
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
|
||||||
|
@ -1053,8 +1053,8 @@ again:
|
||||||
if (priv->next_seqnum != -1) {
|
if (priv->next_seqnum != -1) {
|
||||||
/* we expected next_seqnum but received something else, that's a gap */
|
/* we expected next_seqnum but received something else, that's a gap */
|
||||||
GST_WARNING_OBJECT (jitterbuffer,
|
GST_WARNING_OBJECT (jitterbuffer,
|
||||||
"Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
|
"Sequence number GAP detected: expected %d instead of %d",
|
||||||
seqnum);
|
priv->next_seqnum, seqnum);
|
||||||
} else {
|
} else {
|
||||||
/* we don't know what the next_seqnum should be, wait for the last
|
/* we don't know what the next_seqnum should be, wait for the last
|
||||||
* possible moment to push this buffer, maybe we get an earlier seqnum
|
* possible moment to push this buffer, maybe we get an earlier seqnum
|
||||||
|
|
|
@ -781,11 +781,11 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
|
||||||
rtpsession = GST_RTP_SESSION (user_data);
|
rtpsession = GST_RTP_SESSION (user_data);
|
||||||
priv = rtpsession->priv;
|
priv = rtpsession->priv;
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (rtpsession, "reading receiving RTP packet");
|
|
||||||
|
|
||||||
if (rtpsession->recv_rtp_src) {
|
if (rtpsession->recv_rtp_src) {
|
||||||
|
GST_DEBUG_OBJECT (rtpsession, "pushing received RTP packet");
|
||||||
result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
|
result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
|
||||||
} else {
|
} else {
|
||||||
|
GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
result = GST_FLOW_OK;
|
result = GST_FLOW_OK;
|
||||||
}
|
}
|
||||||
|
@ -1114,10 +1114,22 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
|
ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
|
||||||
|
if (ret != GST_FLOW_OK)
|
||||||
|
goto push_error;
|
||||||
|
|
||||||
|
|
||||||
|
done:
|
||||||
gst_object_unref (rtpsession);
|
gst_object_unref (rtpsession);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
|
/* ERRORS */
|
||||||
|
push_error:
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (rtpsession, "process returned %s",
|
||||||
|
gst_flow_get_name (ret));
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
|
@ -1286,10 +1298,21 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
|
ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
|
||||||
|
if (ret != GST_FLOW_OK)
|
||||||
|
goto push_error;
|
||||||
|
|
||||||
|
done:
|
||||||
gst_object_unref (rtpsession);
|
gst_object_unref (rtpsession);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
|
/* ERRORS */
|
||||||
|
push_error:
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (rtpsession, "process returned %s",
|
||||||
|
gst_flow_get_name (ret));
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Create sinkpad to receive RTP packets from senders. This will also create a
|
/* Create sinkpad to receive RTP packets from senders. This will also create a
|
||||||
|
|
|
@ -96,6 +96,9 @@ static GstElementDetails gst_rtp_ssrc_demux_details = {
|
||||||
"Wim Taymans <wim@fluendo.com>"
|
"Wim Taymans <wim@fluendo.com>"
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define GST_PAD_LOCK(obj) (g_mutex_lock ((obj)->padlock))
|
||||||
|
#define GST_PAD_UNLOCK(obj) (g_mutex_unlock ((obj)->padlock))
|
||||||
|
|
||||||
/* signals */
|
/* signals */
|
||||||
enum
|
enum
|
||||||
{
|
{
|
||||||
|
@ -159,6 +162,7 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* with PAD_LOCK */
|
||||||
static GstRtpSsrcDemuxPad *
|
static GstRtpSsrcDemuxPad *
|
||||||
create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
|
create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
|
||||||
GstClockTime timestamp)
|
GstClockTime timestamp)
|
||||||
|
@ -202,9 +206,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
|
||||||
|
|
||||||
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
|
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
|
||||||
|
|
||||||
/* unlock to perform the remainder and to fire our signal */
|
|
||||||
GST_OBJECT_UNLOCK (demux);
|
|
||||||
|
|
||||||
/* copy caps from input */
|
/* copy caps from input */
|
||||||
gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink));
|
gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink));
|
||||||
gst_pad_use_fixed_caps (rtp_pad);
|
gst_pad_use_fixed_caps (rtp_pad);
|
||||||
|
@ -227,8 +228,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
|
||||||
g_signal_emit (G_OBJECT (demux),
|
g_signal_emit (G_OBJECT (demux),
|
||||||
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
|
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
|
||||||
|
|
||||||
GST_OBJECT_LOCK (demux);
|
|
||||||
|
|
||||||
return demuxpad;
|
return demuxpad;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,6 +303,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
|
||||||
gst_rtp_ssrc_demux_rtcp_sink_event);
|
gst_rtp_ssrc_demux_rtcp_sink_event);
|
||||||
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
|
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
|
||||||
|
|
||||||
|
demux->padlock = g_mutex_new ();
|
||||||
|
|
||||||
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
|
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,6 +328,7 @@ gst_rtp_ssrc_demux_finalize (GObject * object)
|
||||||
GstRtpSsrcDemux *demux;
|
GstRtpSsrcDemux *demux;
|
||||||
|
|
||||||
demux = GST_RTP_SSRC_DEMUX (object);
|
demux = GST_RTP_SSRC_DEMUX (object);
|
||||||
|
g_mutex_free (demux->padlock);
|
||||||
|
|
||||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||||
}
|
}
|
||||||
|
@ -349,14 +351,14 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
|
||||||
GSList *walk;
|
GSList *walk;
|
||||||
|
|
||||||
res = TRUE;
|
res = TRUE;
|
||||||
GST_OBJECT_LOCK (demux);
|
GST_PAD_LOCK (demux);
|
||||||
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
|
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
|
||||||
GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
|
GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
|
||||||
|
|
||||||
gst_event_ref (event);
|
gst_event_ref (event);
|
||||||
res &= gst_pad_push_event (pad->rtp_pad, event);
|
res &= gst_pad_push_event (pad->rtp_pad, event);
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
gst_event_unref (event);
|
gst_event_unref (event);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -381,13 +383,13 @@ gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
|
||||||
GSList *walk;
|
GSList *walk;
|
||||||
|
|
||||||
res = TRUE;
|
res = TRUE;
|
||||||
GST_OBJECT_LOCK (demux);
|
GST_PAD_LOCK (demux);
|
||||||
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
|
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
|
||||||
GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
|
GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
|
||||||
|
|
||||||
res &= gst_pad_push_event (pad->rtcp_pad, event);
|
res &= gst_pad_push_event (pad->rtcp_pad, event);
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -412,7 +414,7 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
|
GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
|
||||||
|
|
||||||
GST_OBJECT_LOCK (demux);
|
GST_PAD_LOCK (demux);
|
||||||
dpad = find_demux_pad_for_ssrc (demux, ssrc);
|
dpad = find_demux_pad_for_ssrc (demux, ssrc);
|
||||||
if (dpad == NULL) {
|
if (dpad == NULL) {
|
||||||
if (!(dpad =
|
if (!(dpad =
|
||||||
|
@ -420,7 +422,7 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
|
||||||
GST_BUFFER_TIMESTAMP (buf))))
|
GST_BUFFER_TIMESTAMP (buf))))
|
||||||
goto create_failed;
|
goto create_failed;
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
|
|
||||||
/* push to srcpad */
|
/* push to srcpad */
|
||||||
ret = gst_pad_push (dpad->rtp_pad, buf);
|
ret = gst_pad_push (dpad->rtp_pad, buf);
|
||||||
|
@ -440,7 +442,7 @@ create_failed:
|
||||||
{
|
{
|
||||||
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
|
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
|
||||||
("Could not create new pad"));
|
("Could not create new pad"));
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
return GST_FLOW_ERROR;
|
return GST_FLOW_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -479,14 +481,14 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
|
GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
|
||||||
|
|
||||||
GST_OBJECT_LOCK (demux);
|
GST_PAD_LOCK (demux);
|
||||||
dpad = find_demux_pad_for_ssrc (demux, ssrc);
|
dpad = find_demux_pad_for_ssrc (demux, ssrc);
|
||||||
if (dpad == NULL) {
|
if (dpad == NULL) {
|
||||||
GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
|
GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
|
||||||
if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1)))
|
if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1)))
|
||||||
goto create_failed;
|
goto create_failed;
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
|
|
||||||
/* push to srcpad */
|
/* push to srcpad */
|
||||||
ret = gst_pad_push (dpad->rtcp_pad, buf);
|
ret = gst_pad_push (dpad->rtcp_pad, buf);
|
||||||
|
@ -506,7 +508,7 @@ create_failed:
|
||||||
{
|
{
|
||||||
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
|
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
|
||||||
("Could not create new pad"));
|
("Could not create new pad"));
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
return GST_FLOW_ERROR;
|
return GST_FLOW_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -539,7 +541,7 @@ gst_rtp_ssrc_demux_internal_links (GstPad * pad)
|
||||||
|
|
||||||
demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
|
demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
GST_OBJECT_LOCK (demux);
|
GST_PAD_LOCK (demux);
|
||||||
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
|
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
|
||||||
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
|
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
|
||||||
|
|
||||||
|
@ -555,7 +557,7 @@ gst_rtp_ssrc_demux_internal_links (GstPad * pad)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (demux);
|
GST_PAD_UNLOCK (demux);
|
||||||
|
|
||||||
gst_object_unref (demux);
|
gst_object_unref (demux);
|
||||||
return res;
|
return res;
|
||||||
|
|
|
@ -40,6 +40,8 @@ struct _GstRtpSsrcDemux
|
||||||
|
|
||||||
GstPad *rtp_sink;
|
GstPad *rtp_sink;
|
||||||
GstPad *rtcp_sink;
|
GstPad *rtcp_sink;
|
||||||
|
|
||||||
|
GMutex *padlock;
|
||||||
GSList *srcpads;
|
GSList *srcpads;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -284,6 +284,8 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
|
||||||
|
|
||||||
pt = gst_rtp_buffer_get_payload_type (buffer);
|
pt = gst_rtp_buffer_get_payload_type (buffer);
|
||||||
|
|
||||||
|
GST_DEBUG ("SSRC %08x got payload %d", src->ssrc, pt);
|
||||||
|
|
||||||
/* get clockrate */
|
/* get clockrate */
|
||||||
if ((clock_rate = get_clock_rate (src, pt)) == -1)
|
if ((clock_rate = get_clock_rate (src, pt)) == -1)
|
||||||
goto no_clock_rate;
|
goto no_clock_rate;
|
||||||
|
|
Loading…
Reference in a new issue