gstreamer/gst/rtpmanager/gstrtpssrcdemux.c
Olivier Crête 6f3734c305 rtpssrcdemux: Only forward stick events while holding the sinkpad stream lock
Otherwise we get a race where if the RTCP packet comes in first and while
it is added the pads, the segment event arrives on the RTP stream, the event
may be lost completely and never forwarded.
2013-04-02 23:42:42 -04:00

950 lines
25 KiB
C

/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
*
* RTP SSRC demuxer
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-gstrtpssrcdemux
*
* gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
* packets. Its main purpose is to allow an application to easily receive and
* decode an RTP stream with multiple SSRCs.
*
* For each SSRC that is detected, a new pad will be created and the
* #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
* gst-launch-1.0 udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
* ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
* to fakesink, discarding the other SSRCs.
* </refsect2>
*
* Last reviewed on 2007-05-28 (0.10.5)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>
#include "gstrtpbin-marshal.h"
#include "gstrtpssrcdemux.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
/* generic templates */
static GstStaticPadTemplate rtp_ssrc_demux_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp")
);
static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template =
GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtcp")
);
static GstStaticPadTemplate rtp_ssrc_demux_src_template =
GST_STATIC_PAD_TEMPLATE ("src_%u",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS ("application/x-rtp")
);
static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS ("application/x-rtcp")
);
#define GST_PAD_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock))
#define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
typedef enum
{
RTP_PAD,
RTCP_PAD
} PadType;
/* signals */
enum
{
SIGNAL_NEW_SSRC_PAD,
SIGNAL_REMOVED_SSRC_PAD,
SIGNAL_CLEAR_SSRC,
LAST_SIGNAL
};
#define gst_rtp_ssrc_demux_parent_class parent_class
G_DEFINE_TYPE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GST_TYPE_ELEMENT);
/* GObject vmethods */
static void gst_rtp_ssrc_demux_dispose (GObject * object);
static void gst_rtp_ssrc_demux_finalize (GObject * object);
/* GstElement vmethods */
static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement *
element, GstStateChange transition);
static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
guint32 ssrc);
/* sinkpad stuff */
static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
GstBuffer * buf);
static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
GstObject * parent, GstBuffer * buf);
static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
pad, GstObject * parent);
/* srcpad stuff */
static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
GstObject * parent);
static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
/*
* Item for storing GstPad <-> SSRC pairs.
*/
struct _GstRtpSsrcDemuxPad
{
guint32 ssrc;
GstPad *rtp_pad;
GstCaps *caps;
GstPad *rtcp_pad;
gboolean pushed_initial_rtp_events;
gboolean pushed_initial_rtcp_events;
};
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found
*/
static GstRtpSsrcDemuxPad *
find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
{
GSList *walk;
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
if (pad->ssrc == ssrc)
return pad;
}
return NULL;
}
static GstEvent *
add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
{
/* Set the ssrc on the output caps */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
GstCaps *newcaps;
GstStructure *s;
gst_event_parse_caps (event, &caps);
newcaps = gst_caps_copy (caps);
s = gst_caps_get_structure (newcaps, 0);
gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
event = gst_event_new_caps (newcaps);
gst_caps_unref (newcaps);
break;
}
default:
gst_event_ref (event);
break;
}
return event;
}
struct ForwardStickyEventData
{
GstPad *pad;
guint32 ssrc;
};
static gboolean
forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
struct ForwardStickyEventData *data = user_data;
GstEvent *newevent;
newevent = add_ssrc_and_ref (*event, data->ssrc);
gst_pad_push_event (data->pad, newevent);
return TRUE;
}
static void
forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
PadType padtype)
{
struct ForwardStickyEventData fdata;
GstPad *sinkpad;
if (padtype == RTP_PAD)
sinkpad = demux->rtp_sink;
else if (padtype == RTCP_PAD)
sinkpad = demux->rtcp_sink;
else
g_assert_not_reached ();
fdata.ssrc = ssrc;
fdata.pad = pad;
gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
}
static GstPad *
find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
PadType padtype)
{
GstPad *rtp_pad, *rtcp_pad;
GstElementClass *klass;
GstPadTemplate *templ;
gchar *padname;
GstRtpSsrcDemuxPad *demuxpad;
GstPad *retpad;
gulong rtp_block, rtcp_block;
GST_PAD_LOCK (demux);
demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
if (demuxpad != NULL) {
gboolean forward = FALSE;
switch (padtype) {
case RTP_PAD:
retpad = gst_object_ref (demuxpad->rtp_pad);
if (!demuxpad->pushed_initial_rtp_events) {
forward = TRUE;
demuxpad->pushed_initial_rtp_events = TRUE;
}
break;
case RTCP_PAD:
retpad = gst_object_ref (demuxpad->rtcp_pad);
if (!demuxpad->pushed_initial_rtcp_events) {
forward = TRUE;
demuxpad->pushed_initial_rtcp_events = TRUE;
}
break;
default:
retpad = NULL;
g_assert_not_reached ();
}
GST_PAD_UNLOCK (demux);
if (forward)
forward_initial_events (demux, ssrc, retpad, padtype);
return retpad;
}
GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
klass = GST_ELEMENT_GET_CLASS (demux);
templ = gst_element_class_get_pad_template (klass, "src_%u");
padname = g_strdup_printf ("src_%u", ssrc);
rtp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname);
templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
padname = g_strdup_printf ("rtcp_src_%u", ssrc);
rtcp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname);
/* wrap in structure and add to list */
demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
demuxpad->ssrc = ssrc;
demuxpad->rtp_pad = rtp_pad;
demuxpad->rtcp_pad = rtcp_pad;
gst_pad_set_element_private (rtp_pad, demuxpad);
gst_pad_set_element_private (rtcp_pad, demuxpad);
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
gst_pad_set_iterate_internal_links_function (rtp_pad,
gst_rtp_ssrc_demux_iterate_internal_links_src);
gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_use_fixed_caps (rtp_pad);
gst_pad_set_active (rtp_pad, TRUE);
gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_set_iterate_internal_links_function (rtcp_pad,
gst_rtp_ssrc_demux_iterate_internal_links_src);
gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_active (rtcp_pad, TRUE);
if (padtype == RTP_PAD) {
demuxpad->pushed_initial_rtp_events = TRUE;
forward_initial_events (demux, ssrc, rtp_pad, padtype);
} else if (padtype == RTCP_PAD) {
demuxpad->pushed_initial_rtcp_events = TRUE;
forward_initial_events (demux, ssrc, rtcp_pad, padtype);
} else {
g_assert_not_reached ();
}
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
switch (padtype) {
case RTP_PAD:
retpad = gst_object_ref (demuxpad->rtp_pad);
break;
case RTCP_PAD:
retpad = gst_object_ref (demuxpad->rtcp_pad);
break;
default:
retpad = NULL;
g_assert_not_reached ();
}
gst_object_ref (rtp_pad);
gst_object_ref (rtcp_pad);
rtp_block = gst_pad_add_probe (rtp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
NULL, NULL, NULL);
rtcp_block = gst_pad_add_probe (rtcp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
NULL, NULL, NULL);
GST_PAD_UNLOCK (demux);
g_signal_emit (G_OBJECT (demux),
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
gst_pad_remove_probe (rtp_pad, rtp_block);
gst_pad_remove_probe (rtcp_pad, rtcp_block);
gst_object_unref (rtp_pad);
gst_object_unref (rtcp_pad);
return retpad;
}
static void
gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
{
GObjectClass *gobject_klass;
GstElementClass *gstelement_klass;
GstRtpSsrcDemuxClass *gstrtpssrcdemux_klass;
gobject_klass = (GObjectClass *) klass;
gstelement_klass = (GstElementClass *) klass;
gstrtpssrcdemux_klass = (GstRtpSsrcDemuxClass *) klass;
gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
/**
* GstRtpSsrcDemux::new-ssrc-pad:
* @demux: the object which received the signal
* @ssrc: the SSRC of the pad
* @pad: the new pad.
*
* Emited when a new SSRC pad has been created.
*/
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
g_signal_new ("new-ssrc-pad",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
/**
* GstRtpSsrcDemux::removed-ssrc-pad:
* @demux: the object which received the signal
* @ssrc: the SSRC of the pad
* @pad: the removed pad.
*
* Emited when a SSRC pad has been removed.
*/
gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
g_signal_new ("removed-ssrc-pad",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
/**
* GstRtpSsrcDemux::clear-ssrc:
* @demux: the object which received the signal
* @ssrc: the SSRC of the pad
*
* Action signal to remove the pad for SSRC.
*/
gst_rtp_ssrc_demux_signals[SIGNAL_CLEAR_SSRC] =
g_signal_new ("clear-ssrc",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
gstelement_klass->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
gstrtpssrcdemux_klass->clear_ssrc =
GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
gst_element_class_add_pad_template (gstelement_klass,
gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
gst_element_class_add_pad_template (gstelement_klass,
gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
gst_element_class_add_pad_template (gstelement_klass,
gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
gst_element_class_add_pad_template (gstelement_klass,
gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
"Demux/Network/RTP",
"Splits RTP streams based on the SSRC",
"Wim Taymans <wim.taymans@gmail.com>");
GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
"rtpssrcdemux", 0, "RTP SSRC demuxer");
}
static void
gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
{
GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
demux->rtp_sink =
gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
"sink"), "sink");
gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
gst_pad_set_iterate_internal_links_function (demux->rtp_sink,
gst_rtp_ssrc_demux_iterate_internal_links_sink);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
demux->rtcp_sink =
gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
"rtcp_sink"), "rtcp_sink");
gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_sink_event);
gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
gst_rtp_ssrc_demux_iterate_internal_links_sink);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
g_rec_mutex_init (&demux->padlock);
}
static void
gst_rtp_ssrc_demux_reset (GstRtpSsrcDemux * demux)
{
GSList *walk;
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
gst_pad_set_active (dpad->rtp_pad, FALSE);
gst_pad_set_active (dpad->rtcp_pad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
g_free (dpad);
}
g_slist_free (demux->srcpads);
demux->srcpads = NULL;
}
static void
gst_rtp_ssrc_demux_dispose (GObject * object)
{
GstRtpSsrcDemux *demux;
demux = GST_RTP_SSRC_DEMUX (object);
gst_rtp_ssrc_demux_reset (demux);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_rtp_ssrc_demux_finalize (GObject * object)
{
GstRtpSsrcDemux *demux;
demux = GST_RTP_SSRC_DEMUX (object);
g_rec_mutex_clear (&demux->padlock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
{
GstRtpSsrcDemuxPad *dpad;
GST_PAD_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) {
GST_PAD_UNLOCK (demux);
goto unknown_pad;
}
GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
demux->srcpads = g_slist_remove (demux->srcpads, dpad);
GST_PAD_UNLOCK (demux);
gst_pad_set_active (dpad->rtp_pad, FALSE);
gst_pad_set_active (dpad->rtcp_pad, FALSE);
g_signal_emit (G_OBJECT (demux),
gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD], 0, ssrc,
dpad->rtp_pad);
gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
g_free (dpad);
return;
/* ERRORS */
unknown_pad:
{
GST_WARNING_OBJECT (demux, "unknown SSRC %08x", ssrc);
return;
}
}
struct ForwardEventData
{
GstRtpSsrcDemux *demux;
GstEvent *event;
gboolean res;
GstPad *pad;
};
static gboolean
forward_event (GstPad * pad, gpointer user_data)
{
struct ForwardEventData *fdata = user_data;
GSList *walk = NULL;
GstEvent *newevent = NULL;
GST_PAD_LOCK (fdata->demux);
for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
/* Only forward the event if the initial events have been through first,
* the initial events should be forwarded before any other event
* or buffer is pushed */
if ((pad == dpad->rtp_pad && dpad->pushed_initial_rtp_events) ||
(pad == dpad->rtcp_pad && dpad->pushed_initial_rtcp_events)) {
newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc);
break;
}
}
GST_PAD_UNLOCK (fdata->demux);
if (newevent)
fdata->res &= gst_pad_push_event (pad, newevent);
return TRUE;
}
static gboolean
gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstRtpSsrcDemux *demux;
struct ForwardEventData fdata;
demux = GST_RTP_SSRC_DEMUX (parent);
fdata.demux = demux;
fdata.pad = pad;
fdata.event = event;
fdata.res = TRUE;
gst_pad_forward (pad, forward_event, &fdata);
gst_event_unref (event);
return fdata.res;
}
static GstFlowReturn
gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstFlowReturn ret;
GstRtpSsrcDemux *demux;
guint32 ssrc;
GstRTPBuffer rtp = { NULL };
GstPad *srcpad;
GstRtpSsrcDemuxPad *dpad;
demux = GST_RTP_SSRC_DEMUX (parent);
if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
goto invalid_payload;
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
gst_rtp_buffer_unmap (&rtp);
GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
if (srcpad == NULL)
goto create_failed;
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
if (ret != GST_FLOW_OK) {
/* check if the ssrc still there, may have been removed */
GST_PAD_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL || dpad->rtp_pad != srcpad) {
/* SSRC was removed during the push ... ignore the error */
ret = GST_FLOW_OK;
}
GST_PAD_UNLOCK (demux);
}
gst_object_unref (srcpad);
return ret;
/* ERRORS */
invalid_payload:
{
/* this is fatal and should be filtered earlier */
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
("Dropping invalid RTP payload"));
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
create_failed:
{
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
("Could not create new pad"));
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
}
static GstFlowReturn
gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
GstBuffer * buf)
{
GstFlowReturn ret;
GstRtpSsrcDemux *demux;
guint32 ssrc;
GstRTCPPacket packet;
GstRTCPBuffer rtcp = { NULL, };
GstPad *srcpad;
GstRtpSsrcDemuxPad *dpad;
demux = GST_RTP_SSRC_DEMUX (parent);
if (!gst_rtcp_buffer_validate (buf))
goto invalid_rtcp;
gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
gst_rtcp_buffer_unmap (&rtcp);
goto invalid_rtcp;
}
/* first packet must be SR or RR or else the validate would have failed */
switch (gst_rtcp_packet_get_type (&packet)) {
case GST_RTCP_TYPE_SR:
/* get the ssrc so that we can route it to the right source pad */
gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
NULL);
break;
default:
goto unexpected_rtcp;
}
gst_rtcp_buffer_unmap (&rtcp);
GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
if (srcpad == NULL)
goto create_failed;
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
if (ret != GST_FLOW_OK) {
/* check if the ssrc still there, may have been removed */
GST_PAD_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL || dpad->rtcp_pad != srcpad) {
/* SSRC was removed during the push ... ignore the error */
ret = GST_FLOW_OK;
}
GST_PAD_UNLOCK (demux);
}
gst_object_unref (srcpad);
return ret;
/* ERRORS */
invalid_rtcp:
{
/* this is fatal and should be filtered earlier */
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
("Dropping invalid RTCP packet"));
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
unexpected_rtcp:
{
GST_DEBUG_OBJECT (demux, "dropping unexpected RTCP packet");
gst_buffer_unref (buf);
return GST_FLOW_OK;
}
create_failed:
{
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
("Could not create new pad"));
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
}
static GstRtpSsrcDemuxPad *
find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
{
GSList *walk;
for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
return dpad;
}
}
return NULL;
}
static gboolean
gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstRtpSsrcDemux *demux;
const GstStructure *s;
demux = GST_RTP_SSRC_DEMUX (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CUSTOM_UPSTREAM:
case GST_EVENT_CUSTOM_BOTH:
case GST_EVENT_CUSTOM_BOTH_OOB:
s = gst_event_get_structure (event);
if (s && !gst_structure_has_field (s, "ssrc")) {
GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
if (dpad) {
GstStructure *ws;
event = gst_event_make_writable (event);
ws = gst_event_writable_structure (event);
gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
}
}
break;
default:
break;
}
return gst_pad_event_default (pad, parent, event);
}
static GstIterator *
gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
{
GstRtpSsrcDemux *demux;
GstPad *otherpad = NULL;
GstIterator *it = NULL;
GSList *current;
demux = GST_RTP_SSRC_DEMUX (parent);
GST_PAD_LOCK (demux);
for (current = demux->srcpads; current; current = g_slist_next (current)) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
if (pad == dpad->rtp_pad) {
otherpad = demux->rtp_sink;
break;
} else if (pad == dpad->rtcp_pad) {
otherpad = demux->rtcp_sink;
break;
}
}
if (otherpad) {
GValue val = { 0, };
g_value_init (&val, GST_TYPE_PAD);
g_value_set_object (&val, otherpad);
it = gst_iterator_new_single (GST_TYPE_PAD, &val);
g_value_unset (&val);
}
GST_PAD_UNLOCK (demux);
return it;
}
/* Should return 0 for elements to be included */
static gint
src_pad_compare_func (gconstpointer a, gconstpointer b)
{
GstPad *pad = GST_PAD (g_value_get_object (a));
const gchar *prefix = g_value_get_string (b);
gint res;
/* 0 means equal means we accept the pad, accepted if there is a name
* and it starts with the prefix */
GST_OBJECT_LOCK (pad);
res = !GST_PAD_NAME (pad) || !g_str_has_prefix (GST_PAD_NAME (pad), prefix);
GST_OBJECT_UNLOCK (pad);
return res;
}
static GstIterator *
gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
GstObject * parent)
{
GstRtpSsrcDemux *demux;
GstIterator *it = NULL;
GValue gval = { 0, };
demux = GST_RTP_SSRC_DEMUX (parent);
g_value_init (&gval, G_TYPE_STRING);
if (pad == demux->rtp_sink)
g_value_set_static_string (&gval, "src_");
else if (pad == demux->rtcp_sink)
g_value_set_static_string (&gval, "rtcp_src_");
else
g_assert_not_reached ();
it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
it = gst_iterator_filter (it, src_pad_compare_func, &gval);
return it;
}
static gboolean
gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
GstQuery * query)
{
GstRtpSsrcDemux *demux;
gboolean res = FALSE;
demux = GST_RTP_SSRC_DEMUX (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
{
if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
gboolean live;
GstClockTime min_latency, max_latency;
GstRtpSsrcDemuxPad *demuxpad;
demuxpad = gst_pad_get_element_private (pad);
gst_query_parse_latency (query, &live, &min_latency, &max_latency);
GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency));
GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc);
gst_query_set_latency (query, live, min_latency, max_latency);
}
break;
}
default:
res = gst_pad_query_default (pad, parent, query);
break;
}
return res;
}
static GstStateChangeReturn
gst_rtp_ssrc_demux_change_state (GstElement * element,
GstStateChange transition)
{
GstStateChangeReturn ret;
GstRtpSsrcDemux *demux;
demux = GST_RTP_SSRC_DEMUX (element);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
case GST_STATE_CHANGE_READY_TO_PAUSED:
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_rtp_ssrc_demux_reset (demux);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
default:
break;
}
return ret;
}