diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am index f844e47c33..9e47cbdf39 100644 --- a/gst/rtpmanager/Makefile.am +++ b/gst/rtpmanager/Makefile.am @@ -17,6 +17,9 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \ gstrtpjitterbuffer.c \ gstrtpptdemux.c \ gstrtpssrcdemux.c \ + rtpsession.c \ + rtpsource.c \ + rtpstats.c \ gstrtpsession.c nodist_libgstrtpmanager_la_SOURCES = \ @@ -28,11 +31,15 @@ noinst_HEADERS = gstrtpbin.h \ gstrtpjitterbuffer.h \ gstrtpptdemux.h \ gstrtpssrcdemux.h \ + rtpsession.h \ + rtpsource.h \ + rtpstats.h \ gstrtpsession.h libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS) libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS) -libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ +libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ \ + -lgstnetbuffer-@GST_MAJORMINOR@ CLEANFILES = $(BUILT_SOURCES) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 6825e9cc88..9162d76c4b 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -129,7 +129,7 @@ typedef struct _GstRTPBinClient GstRTPBinClient; static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 }; static GstCaps *pt_map_requested (GstElement * element, guint pt, - GstRTPBinStream * stream); + GstRTPBinSession * session); /* Manages the RTP stream for one SSRC. * @@ -215,9 +215,9 @@ static GstRTPBinSession * create_session (GstRTPBin * rtpbin, gint id) { GstRTPBinSession *sess; - GstElement *elem, *demux; + GstElement *session, *demux; - if (!(elem = gst_element_factory_make ("rtpsession", NULL))) + if (!(session = gst_element_factory_make ("rtpsession", NULL))) goto no_session; if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) @@ -227,13 +227,17 @@ create_session (GstRTPBin * rtpbin, gint id) sess->lock = g_mutex_new (); sess->id = id; sess->bin = rtpbin; - sess->session = elem; + sess->session = session; sess->demux = demux; sess->ptmap = g_hash_table_new (NULL, NULL); rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); - gst_bin_add (GST_BIN_CAST (rtpbin), elem); - gst_element_set_state (elem, GST_STATE_PLAYING); + /* provide clock_rate to the session manager when needed */ + g_signal_connect (session, "request-pt-map", + (GCallback) pt_map_requested, sess); + + gst_bin_add (GST_BIN_CAST (rtpbin), session); + gst_element_set_state (session, GST_STATE_PLAYING); gst_bin_add (GST_BIN_CAST (rtpbin), demux); gst_element_set_state (demux, GST_STATE_PLAYING); @@ -247,7 +251,7 @@ no_session: } no_demux: { - gst_object_unref (elem); + gst_object_unref (session); g_warning ("rtpbin: could not create rtpssrcdemux element"); return NULL; } @@ -351,7 +355,7 @@ create_stream (GstRTPBinSession * session, guint32 ssrc) /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", - (GCallback) pt_map_requested, stream); + (GCallback) pt_map_requested, session); gst_bin_add (GST_BIN_CAST (session->bin), buffer); gst_element_set_state (buffer, GST_STATE_PLAYING); @@ -590,14 +594,12 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad, } static GstCaps * -pt_map_requested (GstElement * element, guint pt, GstRTPBinStream * stream) +pt_map_requested (GstElement * element, guint pt, GstRTPBinSession * session) { GstRTPBin *rtpbin; - GstRTPBinSession *session; GstCaps *caps; - rtpbin = stream->bin; - session = stream->session; + rtpbin = session->bin; GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt, session->id); @@ -647,7 +649,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, * demuxer so that it can apply a proper caps on the buffers for the * depayloaders. */ stream->demux_ptreq_sig = g_signal_connect (stream->demux, - "request-pt-map", (GCallback) pt_map_requested, stream); + "request-pt-map", (GCallback) pt_map_requested, session); GST_RTP_SESSION_UNLOCK (session); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index e101039a73..3cbcd62f1e 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -63,6 +63,7 @@ struct _GstRTPJitterBufferClass { GstElementClass parent_class; + /* signals */ GstCaps* (*request_pt_map) (GstRTPJitterBuffer *buffer, guint pt); /*< private > */ diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index cdad7e9bc8..03b0802b65 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -39,7 +39,10 @@ #ifdef HAVE_CONFIG_H #include "config.h" #endif + +#include "gstrtpbin-marshal.h" #include "gstrtpsession.h" +#include "rtpsession.h" GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug); #define GST_CAT_DEFAULT gst_rtp_session_debug @@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src", GST_STATIC_CAPS ("application/x-rtp") ); -static GstStaticPadTemplate rtpsession_rtcp_src_template = -GST_STATIC_PAD_TEMPLATE ("rtcp_src", +static GstStaticPadTemplate rtpsession_send_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtcp_src", GST_PAD_SRC, GST_PAD_REQUEST, GST_STATIC_CAPS ("application/x-rtcp") @@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src", /* signals and args */ enum { - /* FILL ME */ + SIGNAL_REQUEST_PT_MAP, LAST_SIGNAL }; @@ -123,6 +126,31 @@ enum struct _GstRTPSessionPrivate { GMutex *lock; + RTPSession *session; + /* thread for sending out RTCP */ + GstClockID id; + gboolean stop_thread; + GThread *thread; +}; + +/* callbacks to handle actions from the session manager */ +static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); +static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, + gpointer user_data); +static GstClockTime gst_rtp_session_get_time (RTPSession * sess, + gpointer user_data); + +static RTPSessionCallbacks callbacks = { + gst_rtp_session_process_rtp, + gst_rtp_session_send_rtp, + gst_rtp_session_send_rtcp, + gst_rtp_session_clock_rate, + gst_rtp_session_get_time }; /* GObject vmethods */ @@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name); static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad); -/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */ +static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT); @@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass) gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&rtpsession_send_rtp_src_template)); gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&rtpsession_rtcp_src_template)); + gst_static_pad_template_get (&rtpsession_send_rtcp_src_template)); gst_element_class_set_details (element_class, &rtpsession_details); } @@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass) gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; + /** + * GstRTPSession::request-pt-map: + * @sess: the object which received the signal + * @pt: the pt + * + * Request the payload type as #GstCaps for @pt. + */ + gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] = + g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map), + NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1, + G_TYPE_UINT); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass) { rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession); rtpsession->priv->lock = g_mutex_new (); + rtpsession->priv->session = rtp_session_new (); + /* configure callbacks */ + rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession); } static void @@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object) rtpsession = GST_RTP_SESSION (object); g_mutex_free (rtpsession->priv->lock); + g_object_unref (rtpsession->priv->session); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, } } +static void +rtcp_thread (GstRTPSession * rtpsession) +{ + GstClock *clock; + GstClockID id; + + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + if (clock == NULL) + return; + + GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); + + GST_RTP_SESSION_LOCK (rtpsession); + while (!rtpsession->priv->stop_thread) { + gdouble timeout; + GstClockTime target; + + timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session); + GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout); + + target = gst_clock_get_time (clock); + target += GST_SECOND * timeout; + id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target); + GST_RTP_SESSION_UNLOCK (rtpsession); + + gst_clock_id_wait (id, NULL); + + GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout"); + + /* make the session manager produce RTCP, we ignore the result. */ + rtp_session_produce_rtcp (rtpsession->priv->session); + + GST_RTP_SESSION_LOCK (rtpsession); + gst_clock_id_unref (id); + rtpsession->priv->id = NULL; + } + GST_RTP_SESSION_UNLOCK (rtpsession); + + gst_object_unref (clock); + + GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); +} + +static gboolean +start_rtcp_thread (GstRTPSession * rtpsession) +{ + GError *error = NULL; + gboolean res; + + GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread"); + + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->stop_thread = FALSE; + rtpsession->priv->thread = + g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error); + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (error != NULL) { + res = FALSE; + GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message); + g_error_free (error); + } else { + res = TRUE; + } + return res; +} + +static void +stop_rtcp_thread (GstRTPSession * rtpsession) +{ + GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread"); + + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->stop_thread = TRUE; + if (rtpsession->priv->id) + gst_clock_id_unschedule (rtpsession->priv->id); + GST_RTP_SESSION_UNLOCK (rtpsession); + + g_thread_join (rtpsession->priv->thread); +} + static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element, GstStateChange transition) { @@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + stop_rtcp_thread (rtpsession); default: break; } @@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) res = parent_class->change_state (element, transition); switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + if (!start_rtcp_thread (rtpsession)) + goto failed_thread; + break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) break; } return res; + + /* ERRORS */ +failed_thread: + { + return GST_STATE_CHANGE_FAILURE; + } +} + +/* called when the session manager has an RTP packet ready for further + * processing */ +static GstFlowReturn +gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, + GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->recv_rtp_src) { + result = gst_pad_push (rtpsession->recv_rtp_src, buffer); + } else { + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +/* called when the session manager has an RTP packet ready for further + * sending */ +static GstFlowReturn +gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, + GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->send_rtp_src) { + result = gst_pad_push (rtpsession->send_rtp_src, buffer); + } else { + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +/* called when the session manager has an RTCP packet ready for further + * sending */ +static GstFlowReturn +gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, + GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->send_rtcp_src) { + result = gst_pad_push (rtpsession->send_rtcp_src, buffer); + } else { + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + + +/* called when the session manager needs the clock rate */ +static gint +gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, + gpointer user_data) +{ + gint result = -1; + GstRTPSession *rtpsession; + GValue ret = { 0 }; + GValue args[2] = { {0}, {0} }; + GstCaps *caps; + const GstStructure *caps_struct; + + rtpsession = GST_RTP_SESSION_CAST (user_data); + + g_value_init (&args[0], GST_TYPE_ELEMENT); + g_value_set_object (&args[0], rtpsession); + g_value_init (&args[1], G_TYPE_UINT); + g_value_set_uint (&args[1], payload); + + g_value_init (&ret, GST_TYPE_CAPS); + g_value_set_boxed (&ret, NULL); + + g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0, + &ret); + + caps = (GstCaps *) g_value_get_boxed (&ret); + if (!caps) + goto no_caps; + + caps_struct = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (caps_struct, "clock-rate", &result)) + goto no_clock_rate; + + return result; + + /* ERRORS */ +no_caps: + { + GST_DEBUG_OBJECT (rtpsession, "could not get caps"); + return -1; + } +no_clock_rate: + { + GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps"); + return -1; + } +} + +/* called when the session manager needs the time of clock */ +static GstClockTime +gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) +{ + GstClockTime result; + GstRTPSession *rtpsession; + GstClock *clock; + + rtpsession = GST_RTP_SESSION_CAST (user_data); + + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + if (clock) { + result = gst_clock_get_time (clock); + gst_object_unref (clock); + } else + result = GST_CLOCK_TIME_NONE; + + return result; } static GstFlowReturn gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; gboolean ret = FALSE; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received event %s", GST_EVENT_TYPE_NAME (event)); @@ -305,14 +580,15 @@ static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; GstFlowReturn ret; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - /* FIXME, do something */ - ret = gst_pad_push (rtpsession->recv_rtp_src, buffer); + ret = rtp_session_process_rtp (priv->session, buffer); gst_object_unref (rtpsession); @@ -323,9 +599,11 @@ static GstFlowReturn gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; gboolean ret = FALSE; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received event %s", GST_EVENT_TYPE_NAME (event)); @@ -347,14 +625,15 @@ static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; GstFlowReturn ret; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; - /* FIXME, do something */ GST_DEBUG_OBJECT (rtpsession, "received RTCP packet"); - ret = gst_pad_push (rtpsession->sync_src, buffer); + ret = rtp_session_process_rtcp (priv->session, buffer); gst_object_unref (rtpsession); @@ -365,9 +644,11 @@ static GstFlowReturn gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; gboolean ret = FALSE; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received event"); @@ -388,14 +669,15 @@ static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; GstFlowReturn ret; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - /* FIXME, do something */ - ret = gst_pad_push (rtpsession->send_rtp_src, buffer); + ret = rtp_session_send_rtp (priv->session, buffer); gst_object_unref (rtpsession); @@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession) * RTCP packets. */ static GstPad * -create_rtcp_src (GstRTPSession * rtpsession) +create_send_rtcp_src (GstRTPSession * rtpsession) { GST_DEBUG_OBJECT (rtpsession, "creating pad"); - rtpsession->rtcp_src = - gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL); - gst_pad_set_active (rtpsession->rtcp_src, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src); + rtpsession->send_rtcp_src = + gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, + NULL); + gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), + rtpsession->send_rtcp_src); - return rtpsession->rtcp_src; + return rtpsession->send_rtcp_src; } static GstPad * @@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element, goto exists; result = create_send_rtp_sink (rtpsession); - } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) { - if (rtpsession->rtcp_src != NULL) + } else if (templ == gst_element_class_get_pad_template (klass, + "send_rtcp_src")) { + if (rtpsession->send_rtcp_src != NULL) goto exists; - result = create_rtcp_src (rtpsession); + result = create_send_rtcp_src (rtpsession); } else goto wrong_template; diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h index 8b34306457..25bbb6ebd0 100644 --- a/gst/rtpmanager/gstrtpsession.h +++ b/gst/rtpmanager/gstrtpsession.h @@ -32,6 +32,7 @@ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION)) #define GST_IS_RTP_SESSION_CLASS(klass) \ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION)) +#define GST_RTP_SESSION_CAST(obj) ((GstRTPSession *)(obj)) typedef struct _GstRTPSession GstRTPSession; typedef struct _GstRTPSessionClass GstRTPSessionClass; @@ -48,13 +49,16 @@ struct _GstRTPSession { GstPad *recv_rtp_src; GstPad *sync_src; GstPad *send_rtp_src; - GstPad *rtcp_src; + GstPad *send_rtcp_src; GstRTPSessionPrivate *priv; }; struct _GstRTPSessionClass { GstElementClass parent_class; + + /* signals */ + GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt); }; GType gst_rtp_session_get_type (void); diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c new file mode 100644 index 0000000000..2283dc9727 --- /dev/null +++ b/gst/rtpmanager/rtpsession.c @@ -0,0 +1,1026 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include + +#include +#include +#include + +#include "rtpsession.h" + +GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); +#define GST_CAT_DEFAULT rtp_session_debug + +/* signals and args */ +enum +{ + SIGNAL_ON_NEW_SSRC, + SIGNAL_ON_SSRC_COLLISION, + SIGNAL_ON_SSRC_VALIDATED, + SIGNAL_ON_BYE_SSRC, + LAST_SIGNAL +}; + +#define RTP_DEFAULT_BANDWIDTH 64000.0 +#define RTP_DEFAULT_RTCP_BANDWIDTH 1000 + +enum +{ + PROP_0 +}; + +/* GObject vmethods */ +static void rtp_session_finalize (GObject * object); +static void rtp_session_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void rtp_session_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; + +G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); + +static void +rtp_session_class_init (RTPSessionClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + gobject_class->finalize = rtp_session_finalize; + gobject_class->set_property = rtp_session_set_property; + gobject_class->get_property = rtp_session_get_property; + + /** + * RTPSession::on-new-ssrc: + * @session: the object which received the signal + * @src: the new RTPSource + * + * Notify of a new SSRC that entered @session. + */ + rtp_session_signals[SIGNAL_ON_NEW_SSRC] = + g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + G_TYPE_OBJECT); + /** + * RTPSession::on-ssrc_collision: + * @session: the object which received the signal + * @src: the #RTPSource that caused a collision + * + * Notify when we have an SSRC collision + */ + rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] = + g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + G_TYPE_OBJECT); + /** + * RTPSession::on-ssrc_validated: + * @session: the object which received the signal + * @src: the new validated RTPSource + * + * Notify of a new SSRC that became validated. + */ + rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] = + g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + G_TYPE_OBJECT); + /** + * RTPSession::on-bye-ssrc: + * @session: the object which received the signal + * @src: the RTPSource that went away + * + * Notify of an SSRC that became inactive because of a BYE packet. + */ + rtp_session_signals[SIGNAL_ON_BYE_SSRC] = + g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + G_TYPE_OBJECT); + + GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session"); +} + +static void +rtp_session_init (RTPSession * sess) +{ + sess->lock = g_mutex_new (); + sess->ssrcs = + g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref); + sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL); + + /* create an SSRC for this session manager */ + sess->source = rtp_session_create_source (sess); + + rtp_stats_init_defaults (&sess->stats); + + /* default UDP header length */ + sess->header_len = 28; + + GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); +} + +static void +rtp_session_finalize (GObject * object) +{ + RTPSession *sess; + + sess = RTP_SESSION_CAST (object); + + g_mutex_free (sess->lock); + g_hash_table_unref (sess->ssrcs); + g_hash_table_unref (sess->cnames); + g_object_unref (sess->source); + + G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); +} + +static void +rtp_session_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + RTPSession *sess; + + sess = RTP_SESSION (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +rtp_session_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + RTPSession *sess; + + sess = RTP_SESSION (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +on_new_ssrc (RTPSession * sess, RTPSource * source) +{ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source); +} + +static void +on_ssrc_collision (RTPSession * sess, RTPSource * source) +{ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0, + source); +} + +static void +on_ssrc_validated (RTPSession * sess, RTPSource * source) +{ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0, + source); +} + +static void +on_bye_ssrc (RTPSession * sess, RTPSource * source) +{ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source); +} + +/** + * rtp_session_new: + * + * Create a new session object. + * + * Returns: a new #RTPSession. g_object_unref() after usage. + */ +RTPSession * +rtp_session_new (void) +{ + RTPSession *sess; + + sess = g_object_new (RTP_TYPE_SESSION, NULL); + + return sess; +} + +/** + * rtp_session_set_callbacks: + * @sess: an #RTPSession + * @callbacks: callbacks to configure + * @user_data: user data passed in the callbacks + * + * Configure a set of callbacks to be notified of actions. + */ +void +rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, + gpointer user_data) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + sess->callbacks.process_rtp = callbacks->process_rtp; + sess->callbacks.send_rtp = callbacks->send_rtp; + sess->callbacks.send_rtcp = callbacks->send_rtcp; + sess->callbacks.clock_rate = callbacks->clock_rate; + sess->callbacks.get_time = callbacks->get_time; + sess->user_data = user_data; +} + +/** + * rtp_session_set_bandwidth: + * @sess: an #RTPSession + * @bandwidth: the bandwidth allocated + * + * Set the session bandwidth in bytes per second. + */ +void +rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + sess->stats.bandwidth = bandwidth; +} + +/** + * rtp_session_get_bandwidth: + * @sess: an #RTPSession + * + * Get the session bandwidth. + * + * Returns: the session bandwidth. + */ +gdouble +rtp_session_get_bandwidth (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), 0); + + return sess->stats.bandwidth; +} + +/** + * rtp_session_set_rtcp_bandwidth: + * @sess: an #RTPSession + * @bandwidth: the RTCP bandwidth + * + * Set the bandwidth that should be used for RTCP + * messages. + */ +void +rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + sess->stats.rtcp_bandwidth = bandwidth; +} + +/** + * rtp_session_get_rtcp_bandwidth: + * @sess: an #RTPSession + * + * Get the session bandwidth used for RTCP. + * + * Returns: The bandwidth used for RTCP messages. + */ +gdouble +rtp_session_get_rtcp_bandwidth (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0); + + return sess->stats.rtcp_bandwidth; +} + +static GstFlowReturn +source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) +{ + GstFlowReturn result = GST_FLOW_OK; + + if (source == session->source) { + GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc); + if (session->callbacks.send_rtp) + result = + session->callbacks.send_rtp (session, source, buffer, + session->user_data); + else + gst_buffer_unref (buffer); + } else { + GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc); + if (session->callbacks.process_rtp) + result = + session->callbacks.process_rtp (session, source, buffer, + session->user_data); + else + gst_buffer_unref (buffer); + } + return result; +} + +static gint +source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session) +{ + gint result; + + if (session->callbacks.clock_rate) + result = session->callbacks.clock_rate (session, pt, session->user_data); + else + result = -1; + + GST_DEBUG ("got clock-rate %d for pt %d", result, pt); + + return result; +} + +static RTPSourceCallbacks callbacks = { + (RTPSourcePushRTP) source_push_rtp, + (RTPSourceClockRate) source_clock_rate, +}; + +static gboolean +check_collision (RTPSession * sess, RTPSource * source, + RTPArrivalStats * arrival) +{ + /* FIXME, do collision check */ + return FALSE; +} + +static RTPSource * +obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, + RTPArrivalStats * arrival, gboolean rtp) +{ + RTPSource *source; + + source = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); + if (source == NULL) { + /* make new Source in probation and insert */ + source = rtp_source_new (ssrc); + + if (rtp) + source->probation = RTP_DEFAULT_PROBATION; + else + source->probation = 0; + + /* store from address, if any */ + if (arrival->have_address) { + if (rtp) + rtp_source_set_rtp_from (source, &arrival->address); + else + rtp_source_set_rtcp_from (source, &arrival->address); + } + + /* configure a callback on the source */ + rtp_source_set_callbacks (source, &callbacks, sess); + + g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + + /* we have one more source now */ + sess->total_sources++; + *created = TRUE; + } else { + *created = FALSE; + /* check for collision, this updates the address when not previously set */ + if (check_collision (sess, source, arrival)) + on_ssrc_collision (sess, source); + } + return source; +} + +/** + * rtp_session_add_source: + * @sess: a #RTPSession + * @src: #RTPSource to add + * + * Add @src to @session. + * + * Returns: %TRUE on success, %FALSE if a source with the same SSRC already + * existed in the session. + */ +gboolean +rtp_session_add_source (RTPSession * sess, RTPSource * src) +{ + gboolean result = FALSE; + RTPSource *find; + + g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE); + g_return_val_if_fail (src != NULL, FALSE); + + RTP_SESSION_LOCK (sess); + find = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (src->ssrc)); + if (find == NULL) { + g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (src->ssrc), src); + /* we have one more source now */ + sess->total_sources++; + result = TRUE; + } + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_get_num_sources: + * @sess: an #RTPSession + * + * Get the number of sources in @sess. + * + * Returns: The number of sources in @sess. + */ +gint +rtp_session_get_num_sources (RTPSession * sess) +{ + gint result; + + g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE); + + RTP_SESSION_LOCK (sess); + result = sess->total_sources; + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_get_num_active_sources: + * @sess: an #RTPSession + * + * Get the number of active sources in @sess. A source is considered active when + * it has been validated and has not yet received a BYE RTCP message. + * + * Returns: The number of active sources in @sess. + */ +gint +rtp_session_get_num_active_sources (RTPSession * sess) +{ + gint result; + + g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE); + + RTP_SESSION_LOCK (sess); + result = sess->stats.active_sources; + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_get_source_by_ssrc: + * @sess: an #RTPSession + * @ssrc: an SSRC + * + * Find the source with @ssrc in @sess. + * + * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found. + * g_object_unref() after usage. + */ +RTPSource * +rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc) +{ + RTPSource *result; + + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + RTP_SESSION_LOCK (sess); + result = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); + if (result) + g_object_ref (result); + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_get_source_by_cname: + * @sess: a #RTPSession + * @cname: an CNAME + * + * Find the source with @cname in @sess. + * + * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found. + * g_object_unref() after usage. + */ +RTPSource * +rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname) +{ + RTPSource *result; + + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + g_return_val_if_fail (cname != NULL, NULL); + + RTP_SESSION_LOCK (sess); + result = g_hash_table_lookup (sess->cnames, cname); + if (result) + g_object_ref (result); + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_create_source: + * @sess: an #RTPSession + * + * Create an #RTPSource for use in @sess. This function will create a source + * with an ssrc that is currently not used by any participants in the session. + * + * Returns: an #RTPSource. + */ +RTPSource * +rtp_session_create_source (RTPSession * sess) +{ + guint32 ssrc; + RTPSource *source; + + RTP_SESSION_LOCK (sess); + while (TRUE) { + ssrc = g_random_int (); + + /* see if it exists in the session, we're done if it doesn't */ + if (g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)) == NULL) + break; + } + source = rtp_source_new (ssrc); + g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + /* we have one more source now */ + sess->total_sources++; + RTP_SESSION_UNLOCK (sess); + + return source; +} + +/* update the RTPArrivalStats structure with the current time and other bits + * about the current buffer we are handling. + * This function is typically called when a validated packet is received. + */ +static void +update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, + gboolean rtp, GstBuffer * buffer) +{ + /* get time or arrival */ + if (sess->callbacks.get_time) + arrival->time = sess->callbacks.get_time (sess, sess->user_data); + else + arrival->time = GST_CLOCK_TIME_NONE; + + /* update sizes */ + arrival->bytes = GST_BUFFER_SIZE (buffer) + 28; + arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0); + + /* for netbuffer we can store the IP address to check for collisions */ + arrival->have_address = GST_IS_NETBUFFER (buffer); + if (arrival->have_address) { + GstNetBuffer *netbuf = (GstNetBuffer *) buffer; + + memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress)); + } +} + +/** + * rtp_session_process_rtp: + * @sess: and #RTPSession + * @buffer: an RTP buffer + * + * Process an RTP buffer in the session manager. This function takes ownership + * of @buffer. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) +{ + GstFlowReturn result; + guint32 ssrc; + RTPSource *source; + gboolean created; + gboolean prevsender, prevactive; + RTPArrivalStats arrival; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + + if (!gst_rtp_buffer_validate (buffer)) + goto invalid_packet; + + /* update arrival stats */ + update_arrival_stats (sess, &arrival, TRUE, buffer); + + /* get SSRC and look up in session database */ + ssrc = gst_rtp_buffer_get_ssrc (buffer); + + RTP_SESSION_LOCK (sess); + source = obtain_source (sess, ssrc, &created, &arrival, TRUE); + + prevsender = RTP_SOURCE_IS_SENDER (source); + prevactive = RTP_SOURCE_IS_ACTIVE (source); + + /* let source process the packet */ + result = rtp_source_process_rtp (source, buffer, &arrival); + + /* source became active */ + if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) { + sess->stats.active_sources++; + GST_DEBUG ("source: %08x became active, %d active sources", ssrc, + sess->stats.active_sources); + on_ssrc_validated (sess, source); + } + if (prevsender != RTP_SOURCE_IS_SENDER (source)) { + sess->stats.sender_sources++; + GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc, + sess->stats.sender_sources); + } + + if (created) + on_new_ssrc (sess, source); + + /* for validated sources, we add the CSRCs as well */ + if (source->validated) { + guint8 i, count; + + count = gst_rtp_buffer_get_csrc_count (buffer); + + for (i = 0; i < count; i++) { + guint32 csrc; + RTPSource *csrc_src; + + csrc = gst_rtp_buffer_get_csrc (buffer, i); + + /* get source */ + csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE); + if (created) { + GST_DEBUG ("created new CSRC: %08x", csrc); + rtp_source_set_as_csrc (csrc_src); + if (RTP_SOURCE_IS_ACTIVE (csrc_src)) + sess->stats.active_sources++; + on_new_ssrc (sess, source); + } + } + } + RTP_SESSION_UNLOCK (sess); + + return result; + + /* ERRORS */ +invalid_packet: + { + GST_DEBUG ("invalid RTP packet received"); + return GST_FLOW_OK; + } +} + +/* A Sender report contains statistics about how the sender is doing. This + * includes timing informataion about the relation between RTP and NTP + * timestamps is it using and the number of packets/bytes it sent to us. + * + * In this report is also included a set of report blocks related to how this + * sender is receiving data (in case we (or somebody else) is also sending stuff + * to it). This info includes the packet loss, jitter and seqnum. It also + * contains information to calculate the round trip time (LSR/DLSR). + */ +static void +rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, + RTPArrivalStats * arrival) +{ + guint32 senderssrc, rtptime, packet_count, octet_count; + guint64 ntptime; + guint count, i; + RTPSource *source; + gboolean created; + + gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime, + &packet_count, &octet_count); + + RTP_SESSION_LOCK (sess); + source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + + /* first update the source */ + rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count); + + if (created) + on_new_ssrc (sess, source); + + count = gst_rtcp_packet_get_rb_count (packet); + for (i = 0; i < count; i++) { + guint32 ssrc, exthighestseq, jitter, lsr, dlsr; + guint8 fractionlost; + gint32 packetslost; + + gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, + &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + + if (ssrc == sess->source->ssrc) { + /* only deal with report blocks for our session, we update the stats of + * the sender of the TCP message. We could also compare our stats against + * the other sender to see if we are better or worse. */ + rtp_source_process_rb (source, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); + } + } + RTP_SESSION_UNLOCK (sess); +} + +/* A receiver report contains statistics about how a receiver is doing. It + * includes stuff like packet loss, jitter and the seqnum it received last. It + * also contains info to calculate the round trip time. + * + * We are only interested in how the sender of this report is doing wrt to us. + */ +static void +rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, + RTPArrivalStats * arrival) +{ + guint32 senderssrc; + guint count, i; + RTPSource *source; + gboolean created; + + senderssrc = gst_rtcp_packet_rr_get_ssrc (packet); + + GST_DEBUG ("got RR packet: SSRC %08x", senderssrc); + + RTP_SESSION_LOCK (sess); + source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + + if (created) + on_new_ssrc (sess, source); + + count = gst_rtcp_packet_get_rb_count (packet); + for (i = 0; i < count; i++) { + guint32 ssrc, exthighestseq, jitter, lsr, dlsr; + guint8 fractionlost; + gint32 packetslost; + + gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, + &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + + if (ssrc == sess->source->ssrc) { + rtp_source_process_rb (source, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); + } + } + RTP_SESSION_UNLOCK (sess); +} + +/* FIXME, we're just printing this for now... */ +static void +rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, + RTPArrivalStats * arrival) +{ + guint chunks, i, j; + gboolean more_chunks, more_items; + + chunks = gst_rtcp_packet_sdes_get_chunk_count (packet); + GST_DEBUG ("got SDES packet with %d chunks", chunks); + + more_chunks = gst_rtcp_packet_sdes_first_chunk (packet); + i = 0; + while (more_chunks) { + guint32 ssrc; + + ssrc = gst_rtcp_packet_sdes_get_ssrc (packet); + + GST_DEBUG ("chunk %d, SSRC %08x", i, ssrc); + + more_items = gst_rtcp_packet_sdes_first_item (packet); + j = 0; + while (more_items) { + GstRTCPSDESType type; + guint8 len; + gchar *data; + + gst_rtcp_packet_sdes_get_item (packet, &type, &len, &data); + + GST_DEBUG ("item %d, type %d, len %d, data %s", j, type, len, data); + + more_items = gst_rtcp_packet_sdes_next_item (packet); + j++; + } + more_chunks = gst_rtcp_packet_sdes_next_chunk (packet); + i++; + } +} + +/* BYE is sent when a client leaves the session + */ +static void +rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, + RTPArrivalStats * arrival) +{ + guint count, i; + gchar *reason; + + reason = gst_rtcp_packet_bye_get_reason (packet); + GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason)); + + count = gst_rtcp_packet_bye_get_ssrc_count (packet); + for (i = 0; i < count; i++) { + guint32 ssrc; + RTPSource *source; + gboolean created, prevactive, prevsender; + + ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i); + GST_DEBUG ("SSRC: %08x", ssrc); + + /* find src and mark bye, no probation when dealing with RTCP */ + RTP_SESSION_LOCK (sess); + source = obtain_source (sess, ssrc, &created, arrival, FALSE); + + prevactive = RTP_SOURCE_IS_ACTIVE (source); + prevsender = RTP_SOURCE_IS_SENDER (source); + + /* let the source handle the rest */ + rtp_source_process_bye (source, reason); + + if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) { + sess->stats.active_sources--; + GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc, + sess->stats.active_sources); + } + if (prevsender && !RTP_SOURCE_IS_SENDER (source)) { + sess->stats.sender_sources--; + GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc, + sess->stats.sender_sources); + } + + if (created) + on_new_ssrc (sess, source); + + on_bye_ssrc (sess, source); + RTP_SESSION_UNLOCK (sess); + } + g_free (reason); +} + +static void +rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, + RTPArrivalStats * arrival) +{ + GST_DEBUG ("received APP"); +} + +/** + * rtp_session_process_rtcp: + * @sess: and #RTPSession + * @buffer: an RTCP buffer + * + * Process an RTCP buffer in the session manager. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) +{ + GstRTCPPacket packet; + gboolean more; + RTPArrivalStats arrival; + guint size; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + + if (!gst_rtcp_buffer_validate (buffer)) + goto invalid_packet; + + /* update arrival stats */ + update_arrival_stats (sess, &arrival, FALSE, buffer); + + GST_DEBUG ("received RTCP packet"); + + /* get packet size including header overhead */ + size = GST_BUFFER_SIZE (buffer) + sess->header_len; + + /* update average RTCP packet size */ + if (sess->stats.avg_rtcp_packet_size == 0) + sess->stats.avg_rtcp_packet_size = size; + else + sess->stats.avg_rtcp_packet_size = + (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4; + + /* start processing the compound packet */ + more = gst_rtcp_buffer_get_first_packet (buffer, &packet); + while (more) { + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + rtp_session_process_sr (sess, &packet, &arrival); + break; + case GST_RTCP_TYPE_RR: + rtp_session_process_rr (sess, &packet, &arrival); + break; + case GST_RTCP_TYPE_SDES: + rtp_session_process_sdes (sess, &packet, &arrival); + break; + case GST_RTCP_TYPE_BYE: + rtp_session_process_bye (sess, &packet, &arrival); + break; + case GST_RTCP_TYPE_APP: + rtp_session_process_app (sess, &packet, &arrival); + break; + default: + GST_WARNING ("got unknown RTCP packet"); + break; + } + more = gst_rtcp_packet_move_to_next (&packet); + } + + gst_buffer_unref (buffer); + + return GST_FLOW_OK; + + /* ERRORS */ +invalid_packet: + { + GST_DEBUG ("invalid RTCP packet received"); + return GST_FLOW_OK; + } +} + +/** + * rtp_session_send_rtp: + * @sess: and #RTPSession + * @buffer: an RTP buffer + * + * Send the RTP buffer in the session manager. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) +{ + GstFlowReturn result; + RTPSource *source; + gboolean prevsender; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + + source = sess->source; + + prevsender = RTP_SOURCE_IS_SENDER (source); + + /* we use our own source to send */ + result = rtp_source_send_rtp (sess->source, buffer); + + if (RTP_SOURCE_IS_SENDER (source) && !prevsender) + sess->stats.sender_sources++; + + return result; +} + +/** + * rtp_session_get_rtcp_interval: + * @sess: an #RTPSession + * + * Get the interval for sending out the next RTCP packet + * + * Returns: an interval in seconds. + */ +gdouble +rtp_session_get_rtcp_interval (RTPSession * sess) +{ + gdouble result; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + + RTP_SESSION_LOCK (sess); + result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE); + result = rtp_stats_add_rtcp_jitter (&sess->stats, result); + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_produce_rtcp: + * @sess: an #RTPSession + * + * Instruct the session manager to generate RTCP packets with current stats. + * This function will call the #RTPSessionSendRTCP callback, possibly multiple + * times, for each packet that should be processed. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_produce_rtcp (RTPSession * sess) +{ + /* FIXME: implement me */ + return GST_FLOW_NOT_SUPPORTED; +} diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h new file mode 100644 index 0000000000..46062c9972 --- /dev/null +++ b/gst/rtpmanager/rtpsession.h @@ -0,0 +1,206 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_SESSION_H__ +#define __RTP_SESSION_H__ + +#include +#include + +#include "rtpsource.h" + +typedef struct _RTPSession RTPSession; +typedef struct _RTPSessionClass RTPSessionClass; + +#define RTP_TYPE_SESSION (rtp_session_get_type()) +#define RTP_SESSION(sess) (G_TYPE_CHECK_INSTANCE_CAST((sess),RTP_TYPE_SESSION,RTPSession)) +#define RTP_SESSION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SESSION,RTPSessionClass)) +#define RTP_IS_SESSION(sess) (G_TYPE_CHECK_INSTANCE_TYPE((sess),RTP_TYPE_SESSION)) +#define RTP_IS_SESSION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SESSION)) +#define RTP_SESSION_CAST(sess) ((RTPSession *)(sess)) + +#define RTP_SESSION_LOCK(sess) (g_mutex_lock ((sess)->lock)) +#define RTP_SESSION_UNLOCK(sess) (g_mutex_unlock ((sess)->lock)) + +/** + * RTPSessionProcessRTP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTP buffer ready for processing + * @user_data: user data specified when registering + * + * This callback will be called when @sess has @buffer ready for further + * processing. Processing the buffer typically includes decoding and displaying + * the buffer. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSessionSendRTP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTP buffer ready for sending + * @user_data: user data specified when registering + * + * This callback will be called when @sess has @buffer ready for sending to + * all listening participants in this session. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSessionSendRTCP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTCP buffer ready for sending + * @user_data: user data specified when registering + * + * This callback will be called when @sess has @buffer ready for sending to + * all listening participants in this session. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSessionClockRate: + * @sess: an #RTPSession + * @payload: the payload + * @user_data: user data specified when registering + * + * This callback will be called when @sess needs the clock-rate of @payload. + * + * Returns: the clock-rate of @pt. + */ +typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data); + +/** + * RTPSessionGetTime: + * @sess: an #RTPSession + * @user_data: user data specified when registering + * + * This callback will be called when @sess needs the current time in + * nanoseconds. + * + * Returns: a #GstClockTime with the current time in nanoseconds. + */ +typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data); + +/** + * RTPSessionCallbacks: + * @RTPSessionProcessRTP: callback to process RTP packets + * @RTPSessionSendRTP: callback for sending RTP packets + * @RTPSessionSendRTCP: callback for sending RTCP packets + * @RTPSessionGetTime: callback for returning the current time + * + * These callbacks can be installed on the session manager to get notification + * when RTP and RTCP packets are ready for further processing. These callbacks + * are not implemented with signals for performance reasons. + */ +typedef struct { + RTPSessionProcessRTP process_rtp; + RTPSessionSendRTP send_rtp; + RTPSessionSendRTCP send_rtcp; + RTPSessionClockRate clock_rate; + RTPSessionGetTime get_time; +} RTPSessionCallbacks; + +/** + * RTPSession: + * @lock: lock to protect the session + * @source: the source of this session + * @ssrcs: Hashtable of sources indexed by SSRC + * @cnames: Hashtable of sources indexed by CNAME + * @num_sources: the number of sources + * @activecount: the number of active sources + * @callbacks: callbacks + * @user_data: user data passed in callbacks + * + * The RTP session manager object + */ +struct _RTPSession { + GObject object; + + GMutex *lock; + + guint header_len; + + RTPSource *source; + GHashTable *ssrcs; + GHashTable *cnames; + guint total_sources; + + RTPSessionCallbacks callbacks; + gpointer user_data; + + RTPSessionStats stats; +}; + +/** + * RTPSessionClass: + * @on_new_ssrc: emited when a new source is found + * @on_bye_ssrc: emited when a source is gone + * + * The session class. + */ +struct _RTPSessionClass { + GObjectClass parent_class; + + /* signals */ + void (*on_new_ssrc) (RTPSession *sess, RTPSource *source); + void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source); + void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source); + void (*on_bye_ssrc) (RTPSession *sess, RTPSource *source); +}; + +GType rtp_session_get_type (void); + +/* create and configure */ +RTPSession* rtp_session_new (void); +void rtp_session_set_callbacks (RTPSession *sess, + RTPSessionCallbacks *callbacks, + gpointer user_data); +void rtp_session_set_bandwidth (RTPSession *sess, gdouble bandwidth); +gdouble rtp_session_get_bandwidth (RTPSession *sess); +void rtp_session_set_rtcp_fraction (RTPSession *sess, gdouble fraction); +gdouble rtp_session_get_rtcp_fraction (RTPSession *sess); + +/* handling sources */ +gboolean rtp_session_add_source (RTPSession *sess, RTPSource *src); +gint rtp_session_get_num_sources (RTPSession *sess); +gint rtp_session_get_num_active_sources (RTPSession *sess); +RTPSource* rtp_session_get_source_by_ssrc (RTPSession *sess, guint32 ssrc); +RTPSource* rtp_session_get_source_by_cname (RTPSession *sess, const gchar *cname); +RTPSource* rtp_session_create_source (RTPSession *sess); + +/* processing packets from receivers */ +GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer); +GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer *buffer); + +/* processing packets for sending */ +GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer); + +/* get interval for next RTCP interval */ +gdouble rtp_session_get_rtcp_interval (RTPSession *sess); +GstFlowReturn rtp_session_produce_rtcp (RTPSession *sess); + +#endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c new file mode 100644 index 0000000000..36f54381c0 --- /dev/null +++ b/gst/rtpmanager/rtpsource.c @@ -0,0 +1,477 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +#include + +#include +#include + +#include "rtpsource.h" + +GST_DEBUG_CATEGORY_STATIC (rtp_source_debug); +#define GST_CAT_DEFAULT rtp_source_debug + +#define RTP_MAX_PROBATION_LEN 32 + +/* signals and args */ +enum +{ + LAST_SIGNAL +}; + +enum +{ + PROP_0 +}; + +/* GObject vmethods */ +static void rtp_source_finalize (GObject * object); + +/* static guint rtp_source_signals[LAST_SIGNAL] = { 0 }; */ + +G_DEFINE_TYPE (RTPSource, rtp_source, G_TYPE_OBJECT); + +static void +rtp_source_class_init (RTPSourceClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + gobject_class->finalize = rtp_source_finalize; + + GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); +} + +static void +rtp_source_init (RTPSource * src) +{ + /* sources are initialy on probation until we receive enough valid RTP + * packets or a valid RTCP packet */ + src->validated = FALSE; + src->probation = RTP_DEFAULT_PROBATION; + + src->payload = 0; + src->clock_rate = -1; + src->packets = g_queue_new (); + + src->stats.jitter = 0; + src->stats.transit = -1; + src->stats.curr_sr = 0; + src->stats.curr_rr = 0; +} + +static void +rtp_source_finalize (GObject * object) +{ + RTPSource *src; + GstBuffer *buffer; + + src = RTP_SOURCE_CAST (object); + + while ((buffer = g_queue_pop_head (src->packets))) + gst_buffer_unref (buffer); + g_queue_free (src->packets); + + G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object); +} + +/** + * rtp_source_new: + * @ssrc: an SSRC + * + * Create a #RTPSource with @ssrc. + * + * Returns: a new #RTPSource. Use g_object_unref() after usage. + */ +RTPSource * +rtp_source_new (guint32 ssrc) +{ + RTPSource *src; + + src = g_object_new (RTP_TYPE_SOURCE, NULL); + src->ssrc = ssrc; + + return src; +} + +/** + * rtp_source_set_callbacks: + * @src: an #RTPSource + * @cb: callback functions + * @user_data: user data + * + * Set the callbacks for the source. + */ +void +rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb, + gpointer user_data) +{ + g_return_if_fail (RTP_IS_SOURCE (src)); + + src->callbacks.push_rtp = cb->push_rtp; + src->callbacks.clock_rate = cb->clock_rate; + src->user_data = user_data; +} + +/** + * rtp_source_set_as_csrc: + * @src: an #RTPSource + * + * Configure @src as a CSRC, this will validate the RTpSource. + */ +void +rtp_source_set_as_csrc (RTPSource * src) +{ + g_return_if_fail (RTP_IS_SOURCE (src)); + + src->validated = TRUE; + src->is_csrc = TRUE; +} + +/** + * rtp_source_set_rtp_from: + * @src: an #RTPSource + * @address: the RTP address to set + * + * Set that @src is receiving RTP packets from @address. This is used for + * collistion checking. + */ +void +rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address) +{ + g_return_if_fail (RTP_IS_SOURCE (src)); + + src->have_rtp_from = TRUE; + memcpy (&src->rtp_from, address, sizeof (GstNetAddress)); +} + +/** + * rtp_source_set_rtcp_from: + * @src: an #RTPSource + * @address: the RTCP address to set + * + * Set that @src is receiving RTCP packets from @address. This is used for + * collistion checking. + */ +void +rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address) +{ + g_return_if_fail (RTP_IS_SOURCE (src)); + + src->have_rtcp_from = TRUE; + memcpy (&src->rtcp_from, address, sizeof (GstNetAddress)); +} + +static GstFlowReturn +push_packet (RTPSource * src, GstBuffer * buffer) +{ + GstFlowReturn ret = GST_FLOW_OK; + + /* push queued packets first if any */ + while (!g_queue_is_empty (src->packets)) { + GstBuffer *buffer = GST_BUFFER_CAST (g_queue_pop_head (src->packets)); + + GST_DEBUG ("pushing queued packet"); + if (src->callbacks.push_rtp) + src->callbacks.push_rtp (src, buffer, src->user_data); + else + gst_buffer_unref (buffer); + } + GST_DEBUG ("pushing new packet"); + /* push packet */ + if (src->callbacks.push_rtp) + ret = src->callbacks.push_rtp (src, buffer, src->user_data); + else + gst_buffer_unref (buffer); + + return ret; +} + +static gint +get_clock_rate (RTPSource * src, guint8 payload) +{ + if (payload != src->payload) { + gint clock_rate = -1; + + if (src->callbacks.clock_rate) + clock_rate = src->callbacks.clock_rate (src, payload, src->user_data); + + GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate); + + src->clock_rate = clock_rate; + src->payload = payload; + } + return src->clock_rate; +} + +static void +calculate_jitter (RTPSource * src, GstBuffer * buffer, + RTPArrivalStats * arrival) +{ + GstClockTime current; + guint32 rtparrival, transit, rtptime; + gint32 diff; + gint clock_rate; + guint8 pt; + + /* get arrival time */ + if ((current = arrival->time) == GST_CLOCK_TIME_NONE) + goto no_time; + + pt = gst_rtp_buffer_get_payload_type (buffer); + + /* get clockrate */ + if ((clock_rate = get_clock_rate (src, pt)) == -1) + goto no_clock_rate; + + rtptime = gst_rtp_buffer_get_timestamp (buffer); + + /* convert arrival time to RTP timestamp units */ + rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND); + + /* transit time is difference with RTP timestamp */ + transit = rtparrival - rtptime; + /* get diff with previous transit time */ + if (src->stats.transit != -1) + diff = transit - src->stats.transit; + else + diff = 0; + src->stats.transit = transit; + if (diff < 0) + diff = -diff; + /* update jitter */ + src->stats.jitter += diff - ((src->stats.jitter + 8) >> 4); + + src->stats.prev_rtptime = src->stats.last_rtptime; + src->stats.last_rtptime = rtparrival; + + GST_DEBUG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %u", + rtparrival, rtptime, clock_rate, diff, src->stats.jitter); + + return; + + /* ERRORS */ +no_time: + { + GST_WARNING ("cannot get current time"); + return; + } +no_clock_rate: + { + GST_WARNING ("cannot get clock-rate for pt %d", pt); + return; + } +} + +/** + * rtp_source_process_rtp: + * @src: an #RTPSource + * @buffer: an RTP buffer + * + * Let @src handle the incomming RTP @buffer. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, + RTPArrivalStats * arrival) +{ + GstFlowReturn result = GST_FLOW_OK; + + g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + + /* if we are still on probation, check seqnum */ + if (src->probation) { + guint16 seqnr, expected; + + expected = src->stats.max_seqnr + 1; + + /* when in probation, we require consecutive seqnums */ + seqnr = gst_rtp_buffer_get_seq (buffer); + if (seqnr == expected) { + /* expected packet */ + src->probation--; + src->stats.max_seqnr = seqnr; + + GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); + } else { + GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected); + src->probation = RTP_DEFAULT_PROBATION; + src->stats.max_seqnr = seqnr; + } + } + if (src->probation) { + GstBuffer *q; + + GST_DEBUG ("probation %d: queue buffer", src->probation); + /* when still in probation, keep packets in a list. */ + g_queue_push_tail (src->packets, buffer); + /* remove packets from queue if there are too many */ + while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { + q = g_queue_pop_head (src->packets); + gst_object_unref (q); + } + } else { + /* we are not in probation */ + src->stats.octetsreceived += arrival->payload_len; + src->stats.bytesreceived += arrival->bytes; + src->stats.packetsreceived++; + src->is_sender = TRUE; + + GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, + src->stats.packetsreceived, src->stats.octetsreceived); + + /* calculate jitter */ + calculate_jitter (src, buffer, arrival); + + /* we're ready to push the RTP packet now */ + result = push_packet (src, buffer); + } + return result; +} + +/** + * rtp_source_process_bye: + * @src: an #RTPSource + * @reason: the reason for leaving + * + * Notify @src that a BYE packet has been received. This will make the source + * inactive. + */ +void +rtp_source_process_bye (RTPSource * src, const gchar * reason) +{ + g_return_if_fail (RTP_IS_SOURCE (src)); + + GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc, + GST_STR_NULL (reason)); + + /* copy the reason and mark as received_bye */ + g_free (src->bye_reason); + src->bye_reason = g_strdup (reason); + src->received_bye = TRUE; +} + +/** + * rtp_source_send_rtp: + * @src: an #RTPSource + * @buffer: an RTP buffer + * + * Send an RTP @buffer originating from @src. This will make @src a sender. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) +{ + GstFlowReturn result = GST_FLOW_OK; + + g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + + /* we are a sender now */ + src->is_sender = TRUE; + + /* push packet */ + if (src->callbacks.push_rtp) + result = src->callbacks.push_rtp (src, buffer, src->user_data); + else + gst_buffer_unref (buffer); + + return result; +} + +/** + * rtp_source_process_sr: + * @src: an #RTPSource + * @ntptime: the NTP time + * @rtptime: the RTP time + * @packet_count: the packet count + * @octet_count: the octect count + * + * Update the sender report in @src. + */ +void +rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, + guint32 packet_count, guint32 octet_count) +{ + RTPSenderReport *curr; + gint curridx; + + g_return_if_fail (RTP_IS_SOURCE (src)); + + GST_DEBUG ("got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT + ", RTP %u, PC %u, OC %u", src->ssrc, ntptime, rtptime, packet_count, + octet_count); + + curridx = src->stats.curr_sr ^ 1; + curr = &src->stats.sr[curridx]; + + /* update current */ + curr->is_valid = TRUE; + curr->ntptime = ntptime; + curr->rtptime = rtptime; + curr->packet_count = packet_count; + curr->octet_count = octet_count; + + /* make current */ + src->stats.curr_sr = curridx; +} + +/** + * rtp_source_process_rb: + * @src: an #RTPSource + * @fractionlost: fraction lost since last SR/RR + * @packetslost: the cumululative number of packets lost + * @exthighestseq: the extended last sequence number received + * @jitter: the interarrival jitter + * @lsr: the last SR packet from this source + * @dlsr: the delay since last SR packet + * + * Update the report block in @src. + */ +void +rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, + guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr) +{ + RTPReceiverReport *curr; + gint curridx; + + g_return_if_fail (RTP_IS_SOURCE (src)); + + GST_DEBUG ("got RB packet %d: SSRC %08x, FL %u" + ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", src->ssrc, fractionlost, + packetslost, exthighestseq, jitter, lsr, dlsr); + + curridx = src->stats.curr_rr ^ 1; + curr = &src->stats.rr[curridx]; + + /* update current */ + curr->is_valid = TRUE; + curr->fractionlost = fractionlost; + curr->packetslost = packetslost; + curr->exthighestseq = exthighestseq; + curr->jitter = jitter; + curr->lsr = lsr; + curr->dlsr = dlsr; + + /* make current */ + src->stats.curr_rr = curridx; +} diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h new file mode 100644 index 0000000000..d4ae6f5524 --- /dev/null +++ b/gst/rtpmanager/rtpsource.h @@ -0,0 +1,162 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_SOURCE_H__ +#define __RTP_SOURCE_H__ + +#include +#include +#include + +#include "rtpstats.h" + +/* the default number of consecutive RTP packets we need to receive before the + * source is considered valid */ +#define RTP_NO_PROBATION 0 +#define RTP_DEFAULT_PROBATION 2 + +typedef struct _RTPSource RTPSource; +typedef struct _RTPSourceClass RTPSourceClass; + +#define RTP_TYPE_SOURCE (rtp_source_get_type()) +#define RTP_SOURCE(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_SOURCE,RTPSource)) +#define RTP_SOURCE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SOURCE,RTPSourceClass)) +#define RTP_IS_SOURCE(src) (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_SOURCE)) +#define RTP_IS_SOURCE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SOURCE)) +#define RTP_SOURCE_CAST(src) ((RTPSource *)(src)) + +/** + * RTP_SOURCE_IS_ACTIVE: + * @src: an #RTPSource + * + * Check if @src is active. A source is active when it has been validated + * and has not yet received a BYE packet. + */ +#define RTP_SOURCE_IS_ACTIVE(src) (src->validated && !src->received_bye) + +/** + * RTP_SOURCE_IS_SENDER: + * @src: an #RTPSource + * + * Check if @src is a sender. + */ +#define RTP_SOURCE_IS_SENDER(src) (src->is_sender) + +/** + * RTPSourcePushRTP: + * @src: an #RTPSource + * @buffer: the RTP buffer ready for processing + * @user_data: user data specified when registering + * + * This callback will be called when @src has @buffer ready for further + * processing. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSourceClockRate: + * @src: an #RTPSource + * @payload: a payload type + * @user_data: user data specified when registering + * + * This callback will be called when @src needs the clock-rate of the + * @payload. + * + * Returns: a clock-rate for @payload. + */ +typedef gint (*RTPSourceClockRate) (RTPSource *src, guint8 payload, gpointer user_data); + +/** + * RTPSourceCallbacks: + * @push_rtp: a packet becomes available for handling + * @clock_rate: a clock-rate is requested + * @get_time: the current clock time is requested + * + * Callbacks performed by #RTPSource when actions need to be performed. + */ +typedef struct { + RTPSourcePushRTP push_rtp; + RTPSourceClockRate clock_rate; +} RTPSourceCallbacks; + +/** + * RTPSource: + * + * A source in the #RTPSession + */ +struct _RTPSource { + GObject object; + + /*< private >*/ + RTPSourceCallbacks callbacks; + gpointer user_data; + + guint32 ssrc; + gchar *cname; + gint probation; + gboolean validated; + gboolean received_bye; + gchar *bye_reason; + + gboolean is_csrc; + gboolean is_sender; + + gboolean have_rtp_from; + GstNetAddress rtp_from; + gboolean have_rtcp_from; + GstNetAddress rtcp_from; + + guint8 payload; + gint clock_rate; + + GQueue *packets; + + RTPSourceStats stats; +}; + +struct _RTPSourceClass { + GObjectClass parent_class; +}; + +GType rtp_source_get_type (void); + +/* managing lifetime of sources */ +RTPSource* rtp_source_new (guint32 ssrc); + +void rtp_source_set_callbacks (RTPSource *src, RTPSourceCallbacks *cb, gpointer data); +void rtp_source_set_as_csrc (RTPSource *src); + +void rtp_source_set_rtp_from (RTPSource *src, GstNetAddress *address); +void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *address); + +GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); + +GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer); + +/* RTCP messages */ +void rtp_source_process_bye (RTPSource *src, const gchar *reason); +void rtp_source_process_sr (RTPSource *src, guint64 ntptime, guint32 rtptime, + guint32 packet_count, guint32 octet_count); +void rtp_source_process_rb (RTPSource *src, guint8 fractionlost, gint32 packetslost, + guint32 exthighestseq, guint32 jitter, + guint32 lsr, guint32 dlsr); + +#endif /* __RTP_SOURCE_H__ */ diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c new file mode 100644 index 0000000000..b9076eacd5 --- /dev/null +++ b/gst/rtpmanager/rtpstats.c @@ -0,0 +1,111 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include "rtpstats.h" + +/** + * rtp_stats_init_defaults: + * @stats: an #RTPSessionStats struct + * + * Initialize @stats with its default values. + */ +void +rtp_stats_init_defaults (RTPSessionStats * stats) +{ + stats->bandwidth = RTP_STATS_BANDWIDTH; + stats->sender_fraction = RTP_STATS_SENDER_FRACTION; + stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION; + stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH; + stats->min_interval = RTP_STATS_MIN_INTERVAL; +} + +/** + * rtp_stats_calculate_rtcp_interval: + * @stats: an #RTPSessionStats struct + * + * Calculate the RTCP interval. The result of this function is the amount of + * time to wait (in seconds) before sender a new RTCP message. + * + * Returns: the RTCP interval. + */ +gdouble +rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender) +{ + gdouble active, senders, receivers, sfraction; + gboolean avg_rtcp; + gdouble interval; + + active = stats->active_sources; + /* Try to avoid division by zero */ + if (stats->active_sources == 0) + active += 1.0; + + senders = (gdouble) stats->sender_sources; + receivers = (gdouble) (active - senders); + avg_rtcp = (gdouble) stats->avg_rtcp_packet_size; + + sfraction = senders / active; + + GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f", + senders, receivers, avg_rtcp, sfraction); + + if (sfraction <= stats->sender_fraction) { + if (sender) { + interval = + (avg_rtcp * senders) / (stats->sender_fraction * + stats->rtcp_bandwidth); + } else { + interval = + (avg_rtcp * receivers) / ((1.0 - + stats->sender_fraction) * stats->rtcp_bandwidth); + } + } else { + interval = (avg_rtcp * active) / stats->rtcp_bandwidth; + } + + if (interval < stats->min_interval) + interval = stats->min_interval; + + if (!stats->sent_rtcp) + interval /= 2.0; + + return interval; +} + +/** + * rtp_stats_calculate_rtcp_interval: + * @stats: an #RTPSessionStats struct + * @interval: an RTCP interval + * + * Apply a random jitter to the @interval. @interval is typically obtained with + * rtp_stats_calculate_rtcp_interval(). + * + * Returns: the new RTCP interval. + */ +gdouble +rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval) +{ + /* see RFC 3550 p 30 + * To compensate for "unconditional reconsideration" converging to a + * value below the intended average. + */ +#define COMPENSATION (2.71828 - 1.5); + + return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION; +} diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h new file mode 100644 index 0000000000..66aa7bf72e --- /dev/null +++ b/gst/rtpmanager/rtpstats.h @@ -0,0 +1,161 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_STATS_H__ +#define __RTP_STATS_H__ + +#include +#include + +/** + * RTPSenderReport: + * + * A sender report structure. + */ +typedef struct { + gboolean is_valid; + guint64 ntptime; + guint32 rtptime; + guint32 packet_count; + guint32 octet_count; +} RTPSenderReport; + +/** + * RTPReceiverReport: + * + * A receiver report structure. + */ +typedef struct { + gboolean is_valid; + guint32 ssrc; /* who the report is from */ + guint8 fractionlost; + guint32 packetslost; + guint32 exthighestseq; + guint32 jitter; + guint32 lsr; + guint32 dlsr; +} RTPReceiverReport; + +/** + * RTPArrivalStats: + * @time: arrival time of a packet + * @address: address of the sender of the packet + * @bytes: bytes of the packet including lowlevel overhead + * @payload_len: bytes of the RTP payload + * + * Structure holding information about the arrival stats of a packet. + */ +typedef struct { + GstClockTime time; + gboolean have_address; + GstNetAddress address; + guint bytes; + guint payload_len; +} RTPArrivalStats; + +/** + * RTPSourceStats: + * @packetsreceived: number of received packets in total + * @prevpacketsreceived: number of packets received in previous reporting + * interval + * @octetsreceived: number of payload bytes received + * @bytesreceived: number of total bytes received including headers and lower + * protocol level overhead + * @max_seqnr: highest sequence number received + * @transit: previous transit time used for calculating @jitter + * @jitter: current jitter + * @prev_rtptime: previous time when an RTP packet was received + * @prev_rtcptime: previous time when an RTCP packet was received + * @last_rtptime: time when last RTP packet received + * @last_rtcptime: time when last RTCP packet received + * @curr_rr: index of current @rr block + * @rr: previous and current receiver report block + * @curr_sr: index of current @sr block + * @sr: previous and current sender report block + * + * Stats about a source. + */ +typedef struct { + guint64 packetsreceived; + guint64 prevpacketsreceived; + guint64 octetsreceived; + guint64 bytesreceived; + guint16 max_seqnr; + guint32 transit; + guint32 jitter; + + /* when we received stuff */ + GstClockTime prev_rtptime; + GstClockTime prev_rtcptime; + GstClockTime last_rtptime; + GstClockTime last_rtcptime; + + /* sender and receiver reports */ + gint curr_rr; + RTPReceiverReport rr[2]; + gint curr_sr; + RTPSenderReport sr[2]; +} RTPSourceStats; + +#define RTP_STATS_BANDWIDTH 64000.0 +#define RTP_STATS_RTCP_BANDWIDTH 3000.0 +/* + * Minimum average time between RTCP packets from this site (in + * seconds). This time prevents the reports from `clumping' when + * sessions are small and the law of large numbers isn't helping + * to smooth out the traffic. It also keeps the report interval + * from becoming ridiculously small during transient outages like + * a network partition. + */ +#define RTP_STATS_MIN_INTERVAL 5.0 + /* + * Fraction of the RTCP bandwidth to be shared among active + * senders. (This fraction was chosen so that in a typical + * session with one or two active senders, the computed report + * time would be roughly equal to the minimum report time so that + * we don't unnecessarily slow down receiver reports.) The + * receiver fraction must be 1 - the sender fraction. + */ +#define RTP_STATS_SENDER_FRACTION (0.25) +#define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION) + +/** + * RTPSessionStats: + * + * Stats kept for a session and used to produce RTCP packet timeouts. + */ +typedef struct { + gdouble bandwidth; + gdouble sender_fraction; + gdouble receiver_fraction; + gdouble rtcp_bandwidth; + gdouble min_interval; + guint sender_sources; + guint active_sources; + guint avg_rtcp_packet_size; + guint avg_bye_packet_size; + gboolean sent_rtcp; +} RTPSessionStats; + +void rtp_stats_init_defaults (RTPSessionStats *stats); + +gdouble rtp_stats_calculate_rtcp_interval (RTPSessionStats *stats, gboolean sender); +gdouble rtp_stats_add_rtcp_jitter (RTPSessionStats *stats, gdouble interval); + +#endif /* __RTP_STATS_H__ */