rtpbin: add support for AUX sender and receiver

AUX elements are elements that can be inserted into the rtpbin
pipeline right before or after 1 or more session elements.

The AUX elements are essential for implementing functionality such
as error correction (FEC) and retransmission (RTX).

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=711087
This commit is contained in:
Wim Taymans 2013-12-31 12:31:25 +01:00
parent 841f9ad050
commit d08e05b4ef
2 changed files with 412 additions and 150 deletions

View file

@ -259,6 +259,9 @@ enum
SIGNAL_NEW_JITTERBUFFER,
SIGNAL_REQUEST_AUX_SENDER,
SIGNAL_REQUEST_AUX_RECEIVER,
LAST_SIGNAL
};
@ -1983,6 +1986,41 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
new_jitterbuffer), NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
/**
* GstRtpBin::request-aux-sender:
* @rtpbin: the object which received the signal
* @session: the session
*
* Request an AUX sender element for the given @session. The AUX
* element will be added to the bin.
*
* If no handler is connected, no AUX element will be used.
*
* Since: 1.4
*/
gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
request_aux_sender), _gst_element_accumulator, NULL,
g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
/**
* GstRtpBin::request-aux-receiver:
* @rtpbin: the object which received the signal
* @session: the session
*
* Request an AUX receiver element for the given @session. The AUX
* element will be added to the bin.
*
* If no handler is connected, no AUX element will be used.
*
* Since: 1.4
*/
gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
request_aux_receiver), _gst_element_accumulator, NULL,
g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
g_object_class_install_property (gobject_class, PROP_SDES,
g_param_spec_boxed ("sdes", "SDES",
"The SDES items of this session",
@ -2856,6 +2894,91 @@ no_stream:
}
}
static gboolean
complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session)
{
gchar *gname;
guint sessid = session->id;
GstPad *recv_rtp_sink;
GstElement *decoder;
GstElementClass *klass;
GstPadTemplate *templ;
/* get recv_rtp pad and store */
session->recv_rtp_sink =
gst_element_get_request_pad (session->session, "recv_rtp_sink");
if (session->recv_rtp_sink == NULL)
goto pad_failed;
g_signal_connect (session->recv_rtp_sink, "notify::caps",
(GCallback) caps_changed, session);
GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
if (decoder) {
GstPad *decsrc, *decsink;
GstPadLinkReturn ret;
GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
decsink = gst_element_get_static_pad (decoder, "rtp_sink");
if (decsink == NULL)
goto dec_sink_failed;
recv_rtp_sink = decsink;
decsrc = gst_element_get_static_pad (decoder, "rtp_src");
if (decsrc == NULL)
goto dec_src_failed;
ret = gst_pad_link (decsrc, session->recv_rtp_sink);
gst_object_unref (decsrc);
if (ret != GST_PAD_LINK_OK)
goto dec_link_failed;
} else {
GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
recv_rtp_sink = gst_object_ref (session->recv_rtp_sink);
}
GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
klass = GST_ELEMENT_GET_CLASS (rtpbin);
gname = g_strdup_printf ("recv_rtp_sink_%u", sessid);
templ = gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u");
session->recv_rtp_sink_ghost =
gst_ghost_pad_new_from_template (gname, recv_rtp_sink, templ);
gst_object_unref (recv_rtp_sink);
gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
g_free (gname);
return TRUE;
/* ERRORS */
pad_failed:
{
g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
return FALSE;
}
dec_sink_failed:
{
g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
return FALSE;
}
dec_src_failed:
{
g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
gst_object_unref (recv_rtp_sink);
return FALSE;
}
dec_link_failed:
{
g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid);
gst_object_unref (recv_rtp_sink);
return FALSE;
}
}
/* Create a pad for receiving RTP for the session in @name. Must be called with
* RTP_BIN_LOCK.
*/
@ -2863,8 +2986,8 @@ static GstPad *
create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
{
guint sessid;
GstElement *decoder;
GstPad *sinkdpad, *decsink;
GstElement *aux;
GstPad *recv_rtp_src;
GstRtpBinSession *session;
/* first get the session number */
@ -2887,69 +3010,61 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->recv_rtp_sink_ghost != NULL)
return session->recv_rtp_sink_ghost;
GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
/* get recv_rtp pad and store */
session->recv_rtp_sink =
gst_element_get_request_pad (session->session, "recv_rtp_sink");
if (session->recv_rtp_sink == NULL)
goto pad_failed;
/* setup the session sink pad */
if (!complete_session_sink (rtpbin, session))
goto session_sink_failed;
g_signal_connect (session->recv_rtp_sink, "notify::caps",
(GCallback) caps_changed, session);
GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
if (decoder) {
GstPad *decsrc;
GstPadLinkReturn ret;
GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
decsink = gst_element_get_static_pad (decoder, "rtp_sink");
decsrc = gst_element_get_static_pad (decoder, "rtp_src");
if (decsink == NULL)
goto dec_sink_failed;
if (decsrc == NULL)
goto dec_src_failed;
ret = gst_pad_link (decsrc, session->recv_rtp_sink);
gst_object_unref (decsrc);
if (ret != GST_PAD_LINK_OK)
goto dec_link_failed;
} else {
GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
decsink = gst_object_ref (session->recv_rtp_sink);
}
GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
/* get srcpad, link to SSRCDemux */
session->recv_rtp_src =
gst_element_get_static_pad (session->session, "recv_rtp_src");
if (session->recv_rtp_src == NULL)
goto src_pad_failed;
goto pad_failed;
GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
sinkdpad = gst_element_get_static_pad (session->demux, "sink");
GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
gst_pad_link_full (session->recv_rtp_src, sinkdpad,
GST_PAD_LINK_CHECK_NOTHING);
gst_object_unref (sinkdpad);
/* find out if we need AUX elements or if we can go into the SSRC demuxer
* directly */
aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
if (aux) {
gchar *pname;
GstPad *auxsink;
GstPadLinkReturn ret;
/* connect to the new-ssrc-pad signal of the SSRC demuxer */
session->demux_newpad_sig = g_signal_connect (session->demux,
"new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
session->demux_padremoved_sig = g_signal_connect (session->demux,
"removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
session->recv_rtp_sink_ghost =
gst_ghost_pad_new_from_template (name, decsink, templ);
gst_object_unref (decsink);
gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
pname = g_strdup_printf ("sink_%d", sessid);
auxsink = gst_element_get_static_pad (aux, pname);
g_free (pname);
if (auxsink == NULL)
goto aux_sink_failed;
ret = gst_pad_link (session->recv_rtp_src, auxsink);
gst_object_unref (auxsink);
if (ret != GST_PAD_LINK_OK)
goto aux_link_failed;
/* this can be NULL when this AUX element is not to be linked to
* an SSRC demuxer */
pname = g_strdup_printf ("src_%d", sessid);
recv_rtp_src = gst_element_get_static_pad (aux, pname);
g_free (pname);
} else {
recv_rtp_src = gst_object_ref (session->recv_rtp_src);
}
if (recv_rtp_src) {
GstPad *sinkdpad;
GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
sinkdpad = gst_element_get_static_pad (session->demux, "sink");
GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
gst_object_unref (recv_rtp_src);
gst_object_unref (sinkdpad);
/* connect to the new-ssrc-pad signal of the SSRC demuxer */
session->demux_newpad_sig = g_signal_connect (session->demux,
"new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
session->demux_padremoved_sig = g_signal_connect (session->demux,
"removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
}
return session->recv_rtp_sink_ghost;
/* ERRORS */
@ -2963,32 +3078,24 @@ create_error:
/* create_session already warned */
return NULL;
}
session_sink_failed:
{
/* warning already done */
return NULL;
}
pad_failed:
{
g_warning ("rtpbin: failed to get session rtp_sink pad");
g_warning ("rtpbin: failed to get session recv_rtp_src pad");
return NULL;
}
dec_sink_failed:
aux_sink_failed:
{
g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid);
return NULL;
}
dec_src_failed:
aux_link_failed:
{
g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
gst_object_unref (decsink);
return NULL;
}
dec_link_failed:
{
g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid);
gst_object_unref (decsink);
return NULL;
}
src_pad_failed:
{
g_warning ("rtpbin: failed to get session rtp_src pad");
gst_object_unref (decsink);
g_warning ("rtpbin: failed to link AUX pad to session %d", sessid);
return NULL;
}
}
@ -3168,18 +3275,195 @@ remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
}
}
static gboolean
complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
{
gchar *gname;
guint sessid = session->id;
GstPad *send_rtp_src;
GstElement *encoder;
GstElementClass *klass;
GstPadTemplate *templ;
/* get srcpad */
session->send_rtp_src =
gst_element_get_static_pad (session->session, "send_rtp_src");
if (session->send_rtp_src == NULL)
goto no_srcpad;
GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
if (encoder) {
gchar *ename;
GstPad *encsrc, *encsink;
GstPadLinkReturn ret;
GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
ename = g_strdup_printf ("rtp_src_%d", sessid);
encsrc = gst_element_get_static_pad (encoder, ename);
g_free (ename);
if (encsrc == NULL)
goto enc_src_failed;
send_rtp_src = encsrc;
ename = g_strdup_printf ("rtp_sink_%d", sessid);
encsink = gst_element_get_static_pad (encoder, ename);
g_free (ename);
if (encsink == NULL)
goto enc_sink_failed;
ret = gst_pad_link (session->send_rtp_src, encsink);
gst_object_unref (encsink);
if (ret != GST_PAD_LINK_OK)
goto enc_link_failed;
} else {
GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
send_rtp_src = gst_object_ref (session->send_rtp_src);
}
/* ghost the new source pad */
klass = GST_ELEMENT_GET_CLASS (rtpbin);
gname = g_strdup_printf ("send_rtp_src_%u", sessid);
templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
session->send_rtp_src_ghost =
gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
gst_object_unref (send_rtp_src);
gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
g_free (gname);
return TRUE;
/* ERRORS */
no_srcpad:
{
g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
return FALSE;
}
enc_src_failed:
{
g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
return FALSE;
}
enc_sink_failed:
{
g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
gst_object_unref (send_rtp_src);
return FALSE;
}
enc_link_failed:
{
g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid);
gst_object_unref (send_rtp_src);
return FALSE;
}
}
static gboolean
setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
{
GstPad *pad;
gchar *name;
guint sessid;
GstRtpBinSession *session = user_data, *newsess;
GstRtpBin *rtpbin = session->bin;
GstPadLinkReturn ret;
pad = g_value_get_object (item);
name = gst_pad_get_name (pad);
if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
goto no_name;
g_free (name);
newsess = find_session_by_id (rtpbin, sessid);
if (newsess == NULL) {
/* create new session */
newsess = create_session (rtpbin, sessid);
if (newsess == NULL)
goto create_error;
} else if (newsess->send_rtp_sink != NULL)
goto existing_session;
/* get send_rtp pad and store */
newsess->send_rtp_sink =
gst_element_get_request_pad (newsess->session, "send_rtp_sink");
if (newsess->send_rtp_sink == NULL)
goto pad_failed;
ret = gst_pad_link (pad, newsess->send_rtp_sink);
if (ret != GST_PAD_LINK_OK)
goto aux_link_failed;
if (!complete_session_src (rtpbin, newsess))
goto session_src_failed;
return TRUE;
/* ERRORS */
no_name:
{
GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
g_free (name);
return TRUE;
}
create_error:
{
/* create_session already warned */
return FALSE;
}
existing_session:
{
g_warning ("rtpbin: session %d is already a sender", sessid);
return FALSE;
}
pad_failed:
{
g_warning ("rtpbin: failed to get session pad for session %d", sessid);
return FALSE;
}
aux_link_failed:
{
g_warning ("rtpbin: failed to link AUX for session %d", sessid);
return FALSE;
}
session_src_failed:
{
g_warning ("rtpbin: failed to complete AUX for session %d", sessid);
return FALSE;
}
}
static gboolean
setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
GstElement * aux)
{
GstIterator *it;
GValue result = { 0, };
GstIteratorResult res;
it = gst_element_iterate_src_pads (aux);
res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
gst_iterator_free (it);
return res == GST_ITERATOR_DONE;
}
/* Create a pad for sending RTP for the session in @name. Must be called with
* RTP_BIN_LOCK.
*/
static GstPad *
create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
{
gchar *gname;
gchar *pname;
guint sessid;
GstPad *encsrc;
GstElement *encoder;
GstPad *send_rtp_sink;
GstElement *aux;
GstRtpBinSession *session;
GstElementClass *klass;
/* first get the session number */
if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
@ -3198,64 +3482,41 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->send_rtp_sink_ghost != NULL)
return session->send_rtp_sink_ghost;
/* get send_rtp pad and store */
session->send_rtp_sink =
gst_element_get_request_pad (session->session, "send_rtp_sink");
if (session->send_rtp_sink == NULL)
goto pad_failed;
/* check if we are already using this session as a sender */
if (session->send_rtp_sink != NULL)
goto existing_session;
session->send_rtp_sink_ghost =
gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
if (aux) {
GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
if (!setup_aux_sender (rtpbin, session, aux))
goto aux_session_failed;
/* get srcpad */
session->send_rtp_src =
gst_element_get_static_pad (session->session, "send_rtp_src");
if (session->send_rtp_src == NULL)
goto no_srcpad;
pname = g_strdup_printf ("sink_%d", sessid);
send_rtp_sink = gst_element_get_static_pad (aux, pname);
g_free (pname);
GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
if (encoder) {
gchar *ename;
GstPad *encsink;
GstPadLinkReturn ret;
GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
ename = g_strdup_printf ("rtp_sink_%d", sessid);
encsink = gst_element_get_static_pad (encoder, ename);
g_free (ename);
ename = g_strdup_printf ("rtp_src_%d", sessid);
encsrc = gst_element_get_static_pad (encoder, ename);
g_free (ename);
if (encsrc == NULL)
goto enc_src_failed;
if (encsink == NULL)
goto enc_sink_failed;
ret = gst_pad_link (session->send_rtp_src, encsink);
gst_object_unref (encsink);
if (ret != GST_PAD_LINK_OK)
goto enc_link_failed;
if (send_rtp_sink == NULL)
goto aux_sink_failed;
} else {
GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
encsrc = gst_object_ref (session->send_rtp_src);
/* get send_rtp pad and store */
session->send_rtp_sink =
gst_element_get_request_pad (session->session, "send_rtp_sink");
if (session->send_rtp_sink == NULL)
goto pad_failed;
if (!complete_session_src (rtpbin, session))
goto session_src_failed;
send_rtp_sink = gst_object_ref (session->send_rtp_sink);
}
/* ghost the new source pad */
klass = GST_ELEMENT_GET_CLASS (rtpbin);
gname = g_strdup_printf ("send_rtp_src_%u", sessid);
templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
session->send_rtp_src_ghost =
gst_ghost_pad_new_from_template (gname, encsrc, templ);
gst_object_unref (encsrc);
gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
g_free (gname);
session->send_rtp_sink_ghost =
gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
gst_object_unref (send_rtp_sink);
gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
return session->send_rtp_sink_ghost;
@ -3270,31 +3531,29 @@ create_error:
/* create_session already warned */
return NULL;
}
existing_session:
{
g_warning ("rtpbin: session %d is already in use", sessid);
return NULL;
}
aux_session_failed:
{
g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid);
return NULL;
}
aux_sink_failed:
{
g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid);
return NULL;
}
pad_failed:
{
g_warning ("rtpbin: failed to get session pad for session %d", sessid);
return NULL;
}
no_srcpad:
session_src_failed:
{
g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
return NULL;
}
enc_src_failed:
{
g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
return NULL;
}
enc_sink_failed:
{
g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
gst_object_unref (encsrc);
return NULL;
}
enc_link_failed:
{
g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid);
gst_object_unref (encsrc);
g_warning ("rtpbin: failed to setup source pads for session %d", sessid);
return NULL;
}
}

View file

@ -103,6 +103,9 @@ struct _GstRtpBinClass {
GstElement* (*request_rtp_decoder) (GstRtpBin *rtpbin, guint session);
GstElement* (*request_rtcp_encoder) (GstRtpBin *rtpbin, guint session);
GstElement* (*request_rtcp_decoder) (GstRtpBin *rtpbin, guint session);
GstElement* (*request_aux_sender) (GstRtpBin *rtpbin, guint session);
GstElement* (*request_aux_receiver) (GstRtpBin *rtpbin, guint session);
};
GType gst_rtp_bin_get_type (void);