From b1e2b088796ab44ae047a76960d65ef5ace8f627 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 5 Apr 2007 13:54:23 +0000 Subject: [PATCH] gst/rtpmanager/gstrtpbin.*: Add debugging category. Original commit message from CVS: * gst/rtpmanager/gstrtpbin.c: (find_session_by_id), (create_session), (find_stream_by_ssrc), (create_stream), (gst_rtp_bin_class_init), (new_payload_found), (new_ssrc_pad_found), (create_recv_rtp), (create_recv_rtcp), (create_send_rtp), (create_rtcp): * gst/rtpmanager/gstrtpbin.h: Add debugging category. Added RTPStream to manage stream per SSRC, each with its own jitterbuffer and ptdemux. Added SSRCDemux. Connect to various SSRC and PT signals and create ghostpads, link stuff. * gst/rtpmanager/gstrtpmanager.c: (plugin_init): Added rtpbin to elements. * gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_chain): Fix caps and forward GstFlowReturn * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), (gst_rtp_session_event_recv_rtp_sink), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_event_recv_rtcp_sink), (gst_rtp_session_chain_recv_rtcp), (gst_rtp_session_event_send_rtp_sink), (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), (create_recv_rtcp_sink), (create_send_rtp_sink), (create_rtcp_src), (gst_rtp_session_request_new_pad): Add debug category. Add event handling * gst/rtpmanager/gstrtpssrcdemux.c: (find_rtp_pad_for_ssrc), (create_rtp_pad_for_ssrc), (gst_rtp_ssrc_demux_class_init), (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_change_state): * gst/rtpmanager/gstrtpssrcdemux.h: Add debug category. Add new-pt-pad signal. --- ChangeLog | 41 +++++ gst/rtpmanager/gstrtpbin.c | 285 +++++++++++++++++++++++++++++-- gst/rtpmanager/gstrtpbin.h | 4 +- gst/rtpmanager/gstrtpmanager.c | 4 + gst/rtpmanager/gstrtpptdemux.c | 50 +++--- gst/rtpmanager/gstrtpsession.c | 102 ++++++++++- gst/rtpmanager/gstrtpssrcdemux.c | 50 ++++-- gst/rtpmanager/gstrtpssrcdemux.h | 7 +- 8 files changed, 487 insertions(+), 56 deletions(-) diff --git a/ChangeLog b/ChangeLog index af63770c6b..52cb42481e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,44 @@ +2007-04-05 Wim Taymans + + * gst/rtpmanager/gstrtpbin.c: (find_session_by_id), + (create_session), (find_stream_by_ssrc), (create_stream), + (gst_rtp_bin_class_init), (new_payload_found), + (new_ssrc_pad_found), (create_recv_rtp), (create_recv_rtcp), + (create_send_rtp), (create_rtcp): + * gst/rtpmanager/gstrtpbin.h: + Add debugging category. + Added RTPStream to manage stream per SSRC, each with its own + jitterbuffer and ptdemux. + Added SSRCDemux. + Connect to various SSRC and PT signals and create ghostpads, link stuff. + + + * gst/rtpmanager/gstrtpmanager.c: (plugin_init): + Added rtpbin to elements. + + * gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_chain): + Fix caps and forward GstFlowReturn + + * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), + (gst_rtp_session_event_recv_rtp_sink), + (gst_rtp_session_chain_recv_rtp), + (gst_rtp_session_event_recv_rtcp_sink), + (gst_rtp_session_chain_recv_rtcp), + (gst_rtp_session_event_send_rtp_sink), + (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), + (create_recv_rtcp_sink), (create_send_rtp_sink), (create_rtcp_src), + (gst_rtp_session_request_new_pad): + Add debug category. + Add event handling + + * gst/rtpmanager/gstrtpssrcdemux.c: (find_rtp_pad_for_ssrc), + (create_rtp_pad_for_ssrc), (gst_rtp_ssrc_demux_class_init), + (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_chain), + (gst_rtp_ssrc_demux_change_state): + * gst/rtpmanager/gstrtpssrcdemux.h: + Add debug category. + Add new-pt-pad signal. + 2007-04-05 Thomas Vander Stichele submitted by: Mogens Jaeger diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index c1ec71307f..d63321f6a8 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -43,6 +43,10 @@ #include "gstrtpbin.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug); +#define GST_CAT_DEFAULT gst_rtp_bin_debug + + /* elementfactory information */ static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin", "Filter/Editor/Video", @@ -98,6 +102,7 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", struct _GstRTPBinPrivate { + guint foo; }; /* signals and args */ @@ -113,30 +118,72 @@ enum }; /* helper objects */ -typedef struct +typedef struct _GstRTPBinSession GstRTPBinSession; +typedef struct _GstRTPBinStream GstRTPBinStream; +typedef struct _GstRTPBinClient GstRTPBinClient; + +/* Manages the RTP stream for one SSRC. + * + * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer. + * If we see an SDES RTCP packet that links multiple SSRCs together based on a + * common CNAME, we create a GstRTPBinClient structure to group the SSRCs + * together (see below). + */ +struct _GstRTPBinStream +{ + /* the SSRC of this stream */ + guint32 ssrc; + /* parent bin */ + GstRTPBin *bin; + /* the session this SSRC belongs to */ + GstRTPBinSession *session; + /* the jitterbuffer of the SSRC */ + GstElement *buffer; + /* the PT demuxer of the SSRC */ + GstElement *demux; + gulong demux_newpad_sig; +}; + +/* Manages the receiving end of the packets. + * + * There is one such structure for each RTP session (audio/video/...). + * We get the RTP/RTCP packets and stuff them into the session manager. From + * there they are pushed into an SSRC demuxer that splits the stream based on + * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with + * the GstRTPBinStream above). + */ +struct _GstRTPBinSession { /* session id */ gint id; + /* the parent bin */ + GstRTPBin *bin; /* the session element */ GstElement *session; /* the SSRC demuxer */ - GstElement *ssrcdemux; + GstElement *demux; + gulong demux_newpad_sig; + + /* list of GstRTPBinStream */ + GSList *streams; /* the pads of the session */ GstPad *recv_rtp_sink; + GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; + GstPad *recv_rtcp_src; GstPad *send_rtp_sink; + GstPad *send_rtp_src; GstPad *rtcp_src; - -} GstRTPBinSession; +}; /* find a session with the given id */ static GstRTPBinSession * find_session_by_id (GstRTPBin * rtpbin, gint id) { - GList *walk; + GSList *walk; - for (walk = rtpbin->sessions; walk; walk = g_list_next (walk)) { + for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) { GstRTPBinSession *sess = (GstRTPBinSession *) walk->data; if (sess->id == id) @@ -150,14 +197,25 @@ static GstRTPBinSession * create_session (GstRTPBin * rtpbin, gint id) { GstRTPBinSession *sess; - GstElement *elem; + GstElement *elem, *demux; if (!(elem = gst_element_factory_make ("rtpsession", NULL))) goto no_session; + if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) + goto no_demux; + sess = g_new0 (GstRTPBinSession, 1); sess->id = id; + sess->bin = rtpbin; sess->session = elem; + sess->demux = demux; + rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); + + gst_bin_add (GST_BIN_CAST (rtpbin), elem); + gst_element_set_state (elem, GST_STATE_PLAYING); + gst_bin_add (GST_BIN_CAST (rtpbin), demux); + gst_element_set_state (demux, GST_STATE_PLAYING); return sess; @@ -167,8 +225,85 @@ no_session: g_warning ("rtpbin: could not create rtpsession element"); return NULL; } +no_demux: + { + gst_object_unref (elem); + g_warning ("rtpbin: could not create rtpssrcdemux element"); + return NULL; + } } +#if 0 +static GstRTPBinStream * +find_stream_by_ssrc (GstRTPBinSession * session, guint32 ssrc) +{ + GSList *walk; + + for (walk = session->streams; walk; walk = g_slist_next (walk)) { + GstRTPBinStream *stream = (GstRTPBinStream *) walk->data; + + if (stream->ssrc == ssrc) + return stream; + } + return NULL; +} +#endif + +static GstRTPBinStream * +create_stream (GstRTPBinSession * session, guint32 ssrc) +{ + GstElement *buffer, *demux; + GstRTPBinStream *stream; + + if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL))) + goto no_jitterbuffer; + + if (!(demux = gst_element_factory_make ("rtpptdemux", NULL))) + goto no_demux; + + stream = g_new0 (GstRTPBinStream, 1); + stream->ssrc = ssrc; + stream->bin = session->bin; + stream->session = session; + stream->buffer = buffer; + stream->demux = demux; + session->streams = g_slist_prepend (session->streams, stream); + + gst_bin_add (GST_BIN_CAST (session->bin), buffer); + gst_element_set_state (buffer, GST_STATE_PLAYING); + gst_bin_add (GST_BIN_CAST (session->bin), demux); + gst_element_set_state (demux, GST_STATE_PLAYING); + + /* link stuff */ + gst_element_link (buffer, demux); + + return stream; + + /* ERRORS */ +no_jitterbuffer: + { + g_warning ("rtpbin: could not create rtpjitterbuffer element"); + return NULL; + } +no_demux: + { + gst_object_unref (buffer); + g_warning ("rtpbin: could not create rtpptdemux element"); + return NULL; + } +} + +/* Manages the RTP streams that come from one client and should therefore be + * synchronized. + */ +struct _GstRTPBinClient +{ + /* the common CNAME for the streams */ + gchar *cname; + /* the streams */ + GSList *streams; +}; + /* GObject vmethods */ static void gst_rtp_bin_finalize (GObject * object); static void gst_rtp_bin_set_property (GObject * object, guint prop_id, @@ -230,6 +365,8 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass) gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin"); } static void @@ -312,22 +449,88 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) return res; } +/* a new pad (SSRC) was created in @session */ +static void +new_payload_found (GstElement * element, guint pt, GstPad * pad, + GstRTPBinStream * stream) +{ + GstRTPBin *rtpbin; + GstElementClass *klass; + GstPadTemplate *templ; + gchar *padname; + GstPad *gpad; + + rtpbin = stream->bin; + + GST_DEBUG ("new payload pad %d", pt); + + /* ghost the pad to the parent */ + klass = GST_ELEMENT_GET_CLASS (rtpbin); + templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d"); + padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d", + stream->session->id, stream->ssrc, pt); + gpad = gst_ghost_pad_new_from_template (padname, pad, templ); + g_free (padname); + + gst_pad_set_active (gpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); +} + +/* a new pad (SSRC) was created in @session */ +static void +new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, + GstRTPBinSession * session) +{ + GstRTPBinStream *stream; + GstPad *sinkpad; + + GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); + + /* create new stream */ + stream = create_stream (session, ssrc); + if (!stream) + goto no_stream; + + /* get pad and link */ + GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); + sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); + gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + + /* connect to the new-pad signal of the payload demuxer */ + stream->demux_newpad_sig = g_signal_connect (stream->demux, + "new-payload-type", (GCallback) new_payload_found, stream); + + return; + + /* ERRORS */ +no_stream: + { + GST_DEBUG ("could not create stream"); + return; + } +} + /* Create a pad for receiving RTP for the session in @name */ static GstPad * create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) { - GstPad *result; + GstPad *result, *sinkdpad; guint sessid; GstRTPBinSession *session; + GstPadLinkReturn lres; /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1) goto no_name; + GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); + /* get or create session */ session = find_session_by_id (rtpbin, sessid); if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); /* create session now */ session = create_session (rtpbin, sessid); if (session == NULL) @@ -337,18 +540,37 @@ create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_sink != NULL) goto existed; + GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad"); /* get recv_rtp pad and store */ session->recv_rtp_sink = gst_element_get_request_pad (session->session, "recv_rtp_sink"); if (session->recv_rtp_sink == NULL) goto pad_failed; + GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad"); + /* get srcpad, link to SSRCDemux */ + session->recv_rtp_src = + gst_element_get_static_pad (session->session, "recv_rtp_src"); + if (session->recv_rtp_src == NULL) + goto pad_failed; + + GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad"); + sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + lres = gst_pad_link (session->recv_rtp_src, sinkdpad); + gst_object_unref (sinkdpad); + if (lres != GST_PAD_LINK_OK) + goto link_failed; + + /* connect to the new-ssrc-pad signal of the demuxer */ + session->demux_newpad_sig = g_signal_connect (session->demux, + "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session); + + GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); result = gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); - /* FIXME, get srcpad, link to SSRCDemux */ - return result; /* ERRORS */ @@ -372,6 +594,11 @@ pad_failed: g_warning ("rtpbin: failed to get session pad"); return NULL; } +link_failed: + { + g_warning ("rtpbin: failed to link pads"); + return NULL; + } } /* Create a pad for receiving RTCP for the session in @name @@ -384,10 +611,17 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, guint sessid; GstRTPBinSession *session; +#if 0 + GstPad *sinkdpad; + GstPadLinkReturn lres; +#endif + /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1) goto no_name; + GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); + /* get the session, it must exist or we error */ session = find_session_by_id (rtpbin, sessid); if (!session) @@ -397,18 +631,35 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink != NULL) goto existed; + GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); + /* get recv_rtp pad and store */ session->recv_rtcp_sink = gst_element_get_request_pad (session->session, "recv_rtcp_sink"); if (session->recv_rtcp_sink == NULL) goto pad_failed; +#if 0 + /* get srcpad, link to SSRCDemux */ + GST_DEBUG_OBJECT (rtpbin, "getting sync src pad"); + session->recv_rtcp_src = + gst_element_get_static_pad (session->session, "sync_src"); + if (session->recv_rtcp_src == NULL) + goto pad_failed; + + GST_DEBUG_OBJECT (rtpbin, "linking sync to demux"); + sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + lres = gst_pad_link (session->recv_rtcp_src, sinkdpad); + gst_object_unref (sinkdpad); + if (lres != GST_PAD_LINK_OK) + goto link_failed; +#endif + result = gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); - /* FIXME, get srcpad, link to SSRCDemux */ - return result; /* ERRORS */ @@ -433,6 +684,13 @@ pad_failed: g_warning ("rtpbin: failed to get session pad"); return NULL; } +#if 0 +link_failed: + { + g_warning ("rtpbin: failed to link pads"); + return NULL; + } +#endif } /* Create a pad for sending RTP for the session in @name @@ -471,6 +729,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) result = gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); /* get srcpad */ @@ -484,6 +743,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d"); srcghost = gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ); + gst_pad_set_active (srcghost, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost); g_free (gname); @@ -546,6 +806,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) goto pad_failed; result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); return result; diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 517c117858..ccd57a78ab 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -38,10 +38,10 @@ typedef struct _GstRTPBinClass GstRTPBinClass; typedef struct _GstRTPBinPrivate GstRTPBinPrivate; struct _GstRTPBin { - GstBin element; + GstBin bin; /* a list of session */ - GList *sessions; + GSList *sessions; /*< private >*/ GstRTPBinPrivate *priv; diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index d71d850c60..1490c4cbea 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -21,6 +21,7 @@ #include "config.h" #endif +#include "gstrtpbin.h" #include "gstrtpclient.h" #include "gstrtpjitterbuffer.h" #include "gstrtpptdemux.h" @@ -30,6 +31,9 @@ static gboolean plugin_init (GstPlugin * plugin) { + if (!gst_element_register (plugin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN)) + return FALSE; + if (!gst_element_register (plugin, "rtpclient", GST_RANK_NONE, GST_TYPE_RTP_CLIENT)) return FALSE; diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c index 5950f619fe..d7ff34d920 100644 --- a/gst/rtpmanager/gstrtpptdemux.c +++ b/gst/rtpmanager/gstrtpptdemux.c @@ -56,15 +56,15 @@ static GstStaticPadTemplate rtp_pt_demux_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/x-rtp, " - "payload = (int) [ 0, 255 ], " "clock-rate = (int) [ 0, 2147483647 ]") + GST_STATIC_CAPS ("application/x-rtp") ); static GstStaticPadTemplate rtp_pt_demux_src_template = -GST_STATIC_PAD_TEMPLATE ("src%d", +GST_STATIC_PAD_TEMPLATE ("src_%d", GST_PAD_SRC, GST_PAD_SOMETIMES, - GST_STATIC_CAPS_ANY); + GST_STATIC_CAPS ("application/x-rtp, " "payload = (int) [ 0, 255 ]") + ); GST_DEBUG_CATEGORY_STATIC (gst_rtp_pt_demux_debug); #define GST_CAT_DEFAULT gst_rtp_pt_demux_debug @@ -191,10 +191,13 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) rtpdemux = GST_RTP_PT_DEMUX (GST_OBJECT_PARENT (pad)); - g_return_val_if_fail (gst_rtp_buffer_validate (buf), GST_FLOW_ERROR); + if (!gst_rtp_buffer_validate (buf)) + goto invalid_buffer; pt = gst_rtp_buffer_get_payload_type (buf); + GST_DEBUG_OBJECT (rtpdemux, "received buffer for pt %d", pt); + srcpad = find_pad_for_pt (rtpdemux, pt); if (srcpad == NULL) { /* new PT, create a src pad */ @@ -205,15 +208,14 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) GstRTPPtDemuxPad *rtpdemuxpad; klass = GST_ELEMENT_GET_CLASS (rtpdemux); - templ = gst_element_class_get_pad_template (klass, "src%d"); - padname = g_strdup_printf ("src%d", pt); + templ = gst_element_class_get_pad_template (klass, "src_%d"); + padname = g_strdup_printf ("src_%d", pt); srcpad = gst_pad_new_from_template (templ, padname); g_free (padname); caps = gst_pad_get_caps (srcpad); caps = gst_caps_make_writable (caps); - gst_caps_append_structure (caps, - gst_structure_new ("payload", "payload", G_TYPE_INT, pt, NULL)); + gst_caps_set_simple (caps, "payload", G_TYPE_INT, pt, NULL); gst_pad_set_caps (srcpad, caps); /* XXX: set _link () function */ @@ -221,17 +223,15 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) gst_pad_set_active (srcpad, TRUE); gst_element_add_pad (element, srcpad); - if (srcpad) { - GST_DEBUG ("Adding pt=%d to the list.", pt); - rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1); - rtpdemuxpad->pt = pt; - rtpdemuxpad->pad = srcpad; - rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad); + GST_DEBUG ("Adding pt=%d to the list.", pt); + rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1); + rtpdemuxpad->pt = pt; + rtpdemuxpad->pad = srcpad; + rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad); - GST_DEBUG ("emitting new-payload_type for pt %d", pt); - g_signal_emit (G_OBJECT (rtpdemux), - gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad); - } + GST_DEBUG ("emitting new-payload_type for pt %d", pt); + g_signal_emit (G_OBJECT (rtpdemux), + gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad); } if (pt != rtpdemux->last_pt) { @@ -246,9 +246,19 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) /* push to srcpad */ if (srcpad) - gst_pad_push (srcpad, GST_BUFFER (buf)); + ret = gst_pad_push (srcpad, GST_BUFFER (buf)); return ret; + + /* ERRORS */ +invalid_buffer: + { + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (rtpdemux, STREAM, DECODE, (NULL), + ("Dropping invalid RTP payload")); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } } static GstCaps * diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 47df756f04..5d7508a636 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -41,6 +41,9 @@ #endif #include "gstrtpsession.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug); +#define GST_CAT_DEFAULT gst_rtp_session_debug + /* elementfactory information */ static const GstElementDetails rtpsession_details = GST_ELEMENT_DETAILS ("RTP Session", @@ -174,6 +177,9 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug, + "rtpsession", 0, "RTP Session"); } static void @@ -255,6 +261,26 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) return res; } +static GstFlowReturn +gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) +{ + GstRTPSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (rtpsession, "received event"); + + switch (GST_EVENT_TYPE (event)) { + default: + ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); + break; + } + gst_object_unref (rtpsession); + + return ret; +} + /* receive a packet from a sender, send it to the RTP session manager and * forward the packet on the rtp_src pad */ @@ -266,6 +292,8 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); + /* FIXME, do something */ ret = gst_pad_push (rtpsession->recv_rtp_src, buffer); @@ -274,6 +302,26 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) return ret; } +static GstFlowReturn +gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event) +{ + GstRTPSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (rtpsession, "received event"); + + switch (GST_EVENT_TYPE (event)) { + default: + ret = gst_pad_push_event (rtpsession->sync_src, event); + break; + } + gst_object_unref (rtpsession); + + return ret; +} + /* Receive an RTCP packet from a sender, send it to the RTP session manager and * forward the SR packets to the sync_src pad. */ @@ -286,6 +334,8 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); /* FIXME, do something */ + GST_DEBUG_OBJECT (rtpsession, "received RTCP packet"); + ret = gst_pad_push (rtpsession->sync_src, buffer); gst_object_unref (rtpsession); @@ -293,6 +343,26 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) return ret; } +static GstFlowReturn +gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) +{ + GstRTPSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (rtpsession, "received event"); + + switch (GST_EVENT_TYPE (event)) { + default: + ret = gst_pad_push_event (rtpsession->send_rtp_src, event); + break; + } + gst_object_unref (rtpsession); + + return ret; +} + /* Recieve an RTP packet to be send to the receivers, send to RTP session * manager and forward to send_rtp_src. */ @@ -304,6 +374,8 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); + /* FIXME, do something */ ret = gst_pad_push (rtpsession->send_rtp_src, buffer); @@ -319,17 +391,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) static GstPad * create_recv_rtp_sink (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad"); + rtpsession->recv_rtp_sink = gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template, NULL); gst_pad_set_chain_function (rtpsession->recv_rtp_sink, gst_rtp_session_chain_recv_rtp); + gst_pad_set_event_function (rtpsession->recv_rtp_sink, + gst_rtp_session_event_recv_rtp_sink); + gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_sink); + GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad"); rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, - NULL); + "recv_rtp_src"); + gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); return rtpsession->recv_rtp_sink; @@ -341,16 +420,24 @@ create_recv_rtp_sink (GstRTPSession * rtpsession) static GstPad * create_recv_rtcp_sink (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad"); + rtpsession->recv_rtcp_sink = gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template, NULL); gst_pad_set_chain_function (rtpsession->recv_rtcp_sink, gst_rtp_session_chain_recv_rtcp); + gst_pad_set_event_function (rtpsession->recv_rtcp_sink, + gst_rtp_session_event_recv_rtcp_sink); + gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtcp_sink); + GST_DEBUG_OBJECT (rtpsession, "creating sync src pad"); rtpsession->sync_src = - gst_pad_new_from_static_template (&rtpsession_sync_src_template, NULL); + gst_pad_new_from_static_template (&rtpsession_sync_src_template, + "sync_src"); + gst_pad_set_active (rtpsession->sync_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); return rtpsession->recv_rtcp_sink; @@ -362,17 +449,23 @@ create_recv_rtcp_sink (GstRTPSession * rtpsession) static GstPad * create_send_rtp_sink (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating pad"); + rtpsession->send_rtp_sink = gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template, NULL); gst_pad_set_chain_function (rtpsession->send_rtp_sink, gst_rtp_session_chain_send_rtp); + gst_pad_set_event_function (rtpsession->send_rtp_sink, + gst_rtp_session_event_send_rtp_sink); + gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtcp_sink); rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, NULL); + gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); return rtpsession->send_rtp_sink; @@ -385,8 +478,11 @@ create_send_rtp_sink (GstRTPSession * rtpsession) static GstPad * create_rtcp_src (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating pad"); + rtpsession->rtcp_src = gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL); + gst_pad_set_active (rtpsession->rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src); return rtpsession->rtcp_src; @@ -406,6 +502,8 @@ gst_rtp_session_request_new_pad (GstElement * element, rtpsession = GST_RTP_SESSION (element); klass = GST_ELEMENT_GET_CLASS (element); + GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name)); + /* figure out the template */ if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) { if (rtpsession->recv_rtp_sink != NULL) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index fe6f1bee75..3237100cfc 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -28,6 +28,9 @@ #include "gstrtpssrcdemux.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug); +#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug + /* generic templates */ static GstStaticPadTemplate rtp_ssrc_demux_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", @@ -50,12 +53,10 @@ static GstElementDetails gst_rtp_ssrc_demux_details = { "Wim Taymans " }; -GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug); -#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug - /* signals */ enum { + SIGNAL_NEW_SSRC_PAD, LAST_SIGNAL }; @@ -77,7 +78,7 @@ static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event); /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); -/* static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; */ +static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; /** * Item for storing GstPad <-> SSRC pairs. @@ -91,11 +92,11 @@ struct _GstRTPSsrcDemuxPad /* find a src pad for a given SSRC, returns NULL if the SSRC was not found */ static GstPad * -find_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) +find_rtp_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) { GSList *walk; - for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) { GstRTPSsrcDemuxPad *pad = (GstRTPSsrcDemuxPad *) walk->data; if (pad->ssrc == ssrc) @@ -105,7 +106,7 @@ find_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) } static GstPad * -create_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) +create_rtp_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) { GstPad *result; GstElementClass *klass; @@ -123,15 +124,18 @@ create_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) demuxpad = g_new0 (GstRTPSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; demuxpad->pad = result; - demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad); /* copy caps from input */ - gst_pad_set_caps (result, GST_PAD_CAPS (demux->sinkpad)); + gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink)); gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event); gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (demux), result); + g_signal_emit (G_OBJECT (demux), + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result); + return result; } @@ -159,6 +163,13 @@ gst_rtp_ssrc_demux_class_init (GstRTPSsrcDemuxClass * klass) gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_finalize); + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] = + g_signal_new ("new-ssrc-pad", + G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstRTPSsrcDemuxClass, new_ssrc_pad), + NULL, NULL, g_cclosure_marshal_VOID__UINT_POINTER, + G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_PAD); + gstelement_klass->change_state = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state); @@ -172,12 +183,12 @@ gst_rtp_ssrc_demux_init (GstRTPSsrcDemux * demux, { GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux); - demux->sinkpad = + demux->rtp_sink = gst_pad_new_from_template (gst_element_class_get_pad_template (klass, "sink"), "sink"); - gst_pad_set_chain_function (demux->sinkpad, gst_rtp_ssrc_demux_chain); - gst_pad_set_event_function (demux->sinkpad, gst_rtp_ssrc_demux_sink_event); - gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->sinkpad); + gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain); + gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event); + gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink); } static void @@ -224,9 +235,12 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) ssrc = gst_rtp_buffer_get_ssrc (buf); - srcpad = find_pad_for_ssrc (demux, ssrc); + GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); + + srcpad = find_rtp_pad_for_ssrc (demux, ssrc); if (srcpad == NULL) { - srcpad = create_pad_for_ssrc (demux, ssrc); + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + srcpad = create_rtp_pad_for_ssrc (demux, ssrc); if (!srcpad) goto create_failed; } @@ -239,11 +253,11 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) /* ERRORS */ invalid_payload: { - /* this is not fatal yet */ - GST_ELEMENT_WARNING (demux, STREAM, DECODE, (NULL), + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Dropping invalid RTP payload")); gst_buffer_unref (buf); - return GST_FLOW_OK; + return GST_FLOW_ERROR; } create_failed: { diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index 6e1c230394..475d2f540e 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -36,13 +36,16 @@ struct _GstRTPSsrcDemux { GstElement parent; - GstPad *sinkpad; - GSList *srcpads; + GstPad *rtp_sink; + GSList *rtp_srcpads; }; struct _GstRTPSsrcDemuxClass { GstElementClass parent_class; + + /* signals */ + void (*new_ssrc_pad) (GstElement *element, guint32 ssrc, GstPad *pad); }; GType gst_rtp_ssrc_demux_get_type (void);