configure.ac: Disable rtpmanager for now because it depends on CVS -base.

Original commit message from CVS:
* configure.ac:
Disable rtpmanager for now because it depends on CVS -base.
* gst/rtpmanager/Makefile.am:
Added new files for session manager.
* gst/rtpmanager/gstrtpjitterbuffer.h:
* gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
(create_stream), (pt_map_requested), (new_ssrc_pad_found):
Some cleanups.
the session manager can now also request a pt-map.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
(gst_rtp_session_class_init), (gst_rtp_session_init),
(gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread),
(stop_rtcp_thread), (gst_rtp_session_change_state),
(gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp),
(gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate),
(gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_event_recv_rtcp_sink),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_event_send_rtp_sink),
(gst_rtp_session_chain_send_rtp), (create_send_rtcp_src),
(gst_rtp_session_request_new_pad):
* gst/rtpmanager/gstrtpsession.h:
We can ask for pt-map now too when the session manager needs it.
Hook up to the new session manager, implement the needed callbacks for
pushing data, getting clock time and requesting clock-rates.
Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to
be send to clients.
Add code to start and stop the thread that will schedule RTCP through
the session manager.
* gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
(rtp_session_init), (rtp_session_finalize),
(rtp_session_set_property), (rtp_session_get_property),
(on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated),
(on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks),
(rtp_session_set_bandwidth), (rtp_session_get_bandwidth),
(rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth),
(source_push_rtp), (source_clock_rate), (check_collision),
(obtain_source), (rtp_session_add_source),
(rtp_session_get_num_sources),
(rtp_session_get_num_active_sources),
(rtp_session_get_source_by_ssrc),
(rtp_session_get_source_by_cname), (rtp_session_create_source),
(update_arrival_stats), (rtp_session_process_rtp),
(rtp_session_process_sr), (rtp_session_process_rr),
(rtp_session_process_sdes), (rtp_session_process_bye),
(rtp_session_process_app), (rtp_session_process_rtcp),
(rtp_session_send_rtp), (rtp_session_get_rtcp_interval),
(rtp_session_produce_rtcp):
* gst/rtpmanager/rtpsession.h:
The advanced beginnings of the main session manager that handles the
participant database of RTPSources, SSRC probation, SSRC collisions,
parse RTCP to update source stats. etc..
* gst/rtpmanager/rtpsource.c: (rtp_source_class_init),
(rtp_source_init), (rtp_source_finalize), (rtp_source_new),
(rtp_source_set_callbacks), (rtp_source_set_as_csrc),
(rtp_source_set_rtp_from), (rtp_source_set_rtcp_from),
(push_packet), (get_clock_rate), (calculate_jitter),
(rtp_source_process_rtp), (rtp_source_process_bye),
(rtp_source_send_rtp), (rtp_source_process_sr),
(rtp_source_process_rb):
* gst/rtpmanager/rtpsource.h:
Object that encapsulates an SSRC and its state in the database.
Calculates the jitter and transit times of data packets.
* gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
(rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter):
* gst/rtpmanager/rtpstats.h:
Various stats regarding the session and sources.
Used to calculate the RTCP interval.
This commit is contained in:
Wim Taymans 2007-04-18 18:58:53 +00:00
parent 6cbfc31aae
commit 1d75a69ccf
14 changed files with 2556 additions and 37 deletions

View file

@ -1,3 +1,81 @@
2007-04-18 Wim Taymans <wim@fluendo.com>
* configure.ac:
Disable rtpmanager for now because it depends on CVS -base.
* gst/rtpmanager/Makefile.am:
Added new files for session manager.
* gst/rtpmanager/gstrtpjitterbuffer.h:
* gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
(create_stream), (pt_map_requested), (new_ssrc_pad_found):
Some cleanups.
the session manager can now also request a pt-map.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
(gst_rtp_session_class_init), (gst_rtp_session_init),
(gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread),
(stop_rtcp_thread), (gst_rtp_session_change_state),
(gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp),
(gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate),
(gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_event_recv_rtcp_sink),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_event_send_rtp_sink),
(gst_rtp_session_chain_send_rtp), (create_send_rtcp_src),
(gst_rtp_session_request_new_pad):
* gst/rtpmanager/gstrtpsession.h:
We can ask for pt-map now too when the session manager needs it.
Hook up to the new session manager, implement the needed callbacks for
pushing data, getting clock time and requesting clock-rates.
Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to
be send to clients.
Add code to start and stop the thread that will schedule RTCP through
the session manager.
* gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
(rtp_session_init), (rtp_session_finalize),
(rtp_session_set_property), (rtp_session_get_property),
(on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated),
(on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks),
(rtp_session_set_bandwidth), (rtp_session_get_bandwidth),
(rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth),
(source_push_rtp), (source_clock_rate), (check_collision),
(obtain_source), (rtp_session_add_source),
(rtp_session_get_num_sources),
(rtp_session_get_num_active_sources),
(rtp_session_get_source_by_ssrc),
(rtp_session_get_source_by_cname), (rtp_session_create_source),
(update_arrival_stats), (rtp_session_process_rtp),
(rtp_session_process_sr), (rtp_session_process_rr),
(rtp_session_process_sdes), (rtp_session_process_bye),
(rtp_session_process_app), (rtp_session_process_rtcp),
(rtp_session_send_rtp), (rtp_session_get_rtcp_interval),
(rtp_session_produce_rtcp):
* gst/rtpmanager/rtpsession.h:
The advanced beginnings of the main session manager that handles the
participant database of RTPSources, SSRC probation, SSRC collisions,
parse RTCP to update source stats. etc..
* gst/rtpmanager/rtpsource.c: (rtp_source_class_init),
(rtp_source_init), (rtp_source_finalize), (rtp_source_new),
(rtp_source_set_callbacks), (rtp_source_set_as_csrc),
(rtp_source_set_rtp_from), (rtp_source_set_rtcp_from),
(push_packet), (get_clock_rate), (calculate_jitter),
(rtp_source_process_rtp), (rtp_source_process_bye),
(rtp_source_send_rtp), (rtp_source_process_sr),
(rtp_source_process_rb):
* gst/rtpmanager/rtpsource.h:
Object that encapsulates an SSRC and its state in the database.
Calculates the jitter and transit times of data packets.
* gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
(rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter):
* gst/rtpmanager/rtpstats.h:
Various stats regarding the session and sources.
Used to calculate the RTCP interval.
2007-04-17 Tim-Philipp Müller <tim at centricular dot net>
* gst/app/Makefile.am:

2
common

@ -1 +1 @@
Subproject commit 9097e252e477e18182f08a032d8860bdee9a0416
Subproject commit e05f45f13961b851501ca8938aa2049fa96c7b11

View file

@ -95,7 +95,6 @@ GST_PLUGINS_ALL="\
nuvdemux \
real \
replaygain \
rtpmanager \
spectrum \
speed \
qtdemux \

View file

@ -17,6 +17,9 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
gstrtpjitterbuffer.c \
gstrtpptdemux.c \
gstrtpssrcdemux.c \
rtpsession.c \
rtpsource.c \
rtpstats.c \
gstrtpsession.c
nodist_libgstrtpmanager_la_SOURCES = \
@ -28,11 +31,15 @@ noinst_HEADERS = gstrtpbin.h \
gstrtpjitterbuffer.h \
gstrtpptdemux.h \
gstrtpssrcdemux.h \
rtpsession.h \
rtpsource.h \
rtpstats.h \
gstrtpsession.h
libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS)
libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS)
libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@
libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ \
-lgstnetbuffer-@GST_MAJORMINOR@
CLEANFILES = $(BUILT_SOURCES)

View file

@ -129,7 +129,7 @@ typedef struct _GstRTPBinClient GstRTPBinClient;
static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
static GstCaps *pt_map_requested (GstElement * element, guint pt,
GstRTPBinStream * stream);
GstRTPBinSession * session);
/* Manages the RTP stream for one SSRC.
*
@ -215,9 +215,9 @@ static GstRTPBinSession *
create_session (GstRTPBin * rtpbin, gint id)
{
GstRTPBinSession *sess;
GstElement *elem, *demux;
GstElement *session, *demux;
if (!(elem = gst_element_factory_make ("rtpsession", NULL)))
if (!(session = gst_element_factory_make ("rtpsession", NULL)))
goto no_session;
if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
@ -227,13 +227,17 @@ create_session (GstRTPBin * rtpbin, gint id)
sess->lock = g_mutex_new ();
sess->id = id;
sess->bin = rtpbin;
sess->session = elem;
sess->session = session;
sess->demux = demux;
sess->ptmap = g_hash_table_new (NULL, NULL);
rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
gst_bin_add (GST_BIN_CAST (rtpbin), elem);
gst_element_set_state (elem, GST_STATE_PLAYING);
/* provide clock_rate to the session manager when needed */
g_signal_connect (session, "request-pt-map",
(GCallback) pt_map_requested, sess);
gst_bin_add (GST_BIN_CAST (rtpbin), session);
gst_element_set_state (session, GST_STATE_PLAYING);
gst_bin_add (GST_BIN_CAST (rtpbin), demux);
gst_element_set_state (demux, GST_STATE_PLAYING);
@ -247,7 +251,7 @@ no_session:
}
no_demux:
{
gst_object_unref (elem);
gst_object_unref (session);
g_warning ("rtpbin: could not create rtpssrcdemux element");
return NULL;
}
@ -351,7 +355,7 @@ create_stream (GstRTPBinSession * session, guint32 ssrc)
/* provide clock_rate to the jitterbuffer when needed */
g_signal_connect (buffer, "request-pt-map",
(GCallback) pt_map_requested, stream);
(GCallback) pt_map_requested, session);
gst_bin_add (GST_BIN_CAST (session->bin), buffer);
gst_element_set_state (buffer, GST_STATE_PLAYING);
@ -590,14 +594,12 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,
}
static GstCaps *
pt_map_requested (GstElement * element, guint pt, GstRTPBinStream * stream)
pt_map_requested (GstElement * element, guint pt, GstRTPBinSession * session)
{
GstRTPBin *rtpbin;
GstRTPBinSession *session;
GstCaps *caps;
rtpbin = stream->bin;
session = stream->session;
rtpbin = session->bin;
GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
session->id);
@ -647,7 +649,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
* demuxer so that it can apply a proper caps on the buffers for the
* depayloaders. */
stream->demux_ptreq_sig = g_signal_connect (stream->demux,
"request-pt-map", (GCallback) pt_map_requested, stream);
"request-pt-map", (GCallback) pt_map_requested, session);
GST_RTP_SESSION_UNLOCK (session);

View file

@ -63,6 +63,7 @@ struct _GstRTPJitterBufferClass
{
GstElementClass parent_class;
/* signals */
GstCaps* (*request_pt_map) (GstRTPJitterBuffer *buffer, guint pt);
/*< private > */

View file

@ -39,7 +39,10 @@
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstrtpbin-marshal.h"
#include "gstrtpsession.h"
#include "rtpsession.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
#define GST_CAT_DEFAULT gst_rtp_session_debug
@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
GST_STATIC_CAPS ("application/x-rtp")
);
static GstStaticPadTemplate rtpsession_rtcp_src_template =
GST_STATIC_PAD_TEMPLATE ("rtcp_src",
static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
GST_PAD_SRC,
GST_PAD_REQUEST,
GST_STATIC_CAPS ("application/x-rtcp")
@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",
/* signals and args */
enum
{
/* FILL ME */
SIGNAL_REQUEST_PT_MAP,
LAST_SIGNAL
};
@ -123,6 +126,31 @@ enum
struct _GstRTPSessionPrivate
{
GMutex *lock;
RTPSession *session;
/* thread for sending out RTCP */
GstClockID id;
gboolean stop_thread;
GThread *thread;
};
/* callbacks to handle actions from the session manager */
static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
gpointer user_data);
static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
gpointer user_data);
static RTPSessionCallbacks callbacks = {
gst_rtp_session_process_rtp,
gst_rtp_session_send_rtp,
gst_rtp_session_send_rtcp,
gst_rtp_session_clock_rate,
gst_rtp_session_get_time
};
/* GObject vmethods */
@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name);
static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */
static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass)
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&rtpsession_rtcp_src_template));
gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
gst_element_class_set_details (element_class, &rtpsession_details);
}
@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
/**
* GstRTPSession::request-pt-map:
* @sess: the object which received the signal
* @pt: the pt
*
* Request the payload type as #GstCaps for @pt.
*/
gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map),
NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
G_TYPE_UINT);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
gstelement_class->request_new_pad =
@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
{
rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
rtpsession->priv->lock = g_mutex_new ();
rtpsession->priv->session = rtp_session_new ();
/* configure callbacks */
rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
}
static void
@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object)
rtpsession = GST_RTP_SESSION (object);
g_mutex_free (rtpsession->priv->lock);
g_object_unref (rtpsession->priv->session);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
}
}
static void
rtcp_thread (GstRTPSession * rtpsession)
{
GstClock *clock;
GstClockID id;
clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
if (clock == NULL)
return;
GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
GST_RTP_SESSION_LOCK (rtpsession);
while (!rtpsession->priv->stop_thread) {
gdouble timeout;
GstClockTime target;
timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session);
GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout);
target = gst_clock_get_time (clock);
target += GST_SECOND * timeout;
id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target);
GST_RTP_SESSION_UNLOCK (rtpsession);
gst_clock_id_wait (id, NULL);
GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout");
/* make the session manager produce RTCP, we ignore the result. */
rtp_session_produce_rtcp (rtpsession->priv->session);
GST_RTP_SESSION_LOCK (rtpsession);
gst_clock_id_unref (id);
rtpsession->priv->id = NULL;
}
GST_RTP_SESSION_UNLOCK (rtpsession);
gst_object_unref (clock);
GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
}
static gboolean
start_rtcp_thread (GstRTPSession * rtpsession)
{
GError *error = NULL;
gboolean res;
GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
GST_RTP_SESSION_LOCK (rtpsession);
rtpsession->priv->stop_thread = FALSE;
rtpsession->priv->thread =
g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
GST_RTP_SESSION_UNLOCK (rtpsession);
if (error != NULL) {
res = FALSE;
GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
g_error_free (error);
} else {
res = TRUE;
}
return res;
}
static void
stop_rtcp_thread (GstRTPSession * rtpsession)
{
GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
GST_RTP_SESSION_LOCK (rtpsession);
rtpsession->priv->stop_thread = TRUE;
if (rtpsession->priv->id)
gst_clock_id_unschedule (rtpsession->priv->id);
GST_RTP_SESSION_UNLOCK (rtpsession);
g_thread_join (rtpsession->priv->thread);
}
static GstStateChangeReturn
gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
{
@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
stop_rtcp_thread (rtpsession);
default:
break;
}
@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
res = parent_class->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
if (!start_rtcp_thread (rtpsession))
goto failed_thread;
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
break;
}
return res;
/* ERRORS */
failed_thread:
{
return GST_STATE_CHANGE_FAILURE;
}
}
/* called when the session manager has an RTP packet ready for further
* processing */
static GstFlowReturn
gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
GstBuffer * buffer, gpointer user_data)
{
GstFlowReturn result;
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
if (rtpsession->recv_rtp_src) {
result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
} else {
gst_buffer_unref (buffer);
result = GST_FLOW_OK;
}
return result;
}
/* called when the session manager has an RTP packet ready for further
* sending */
static GstFlowReturn
gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
GstBuffer * buffer, gpointer user_data)
{
GstFlowReturn result;
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
if (rtpsession->send_rtp_src) {
result = gst_pad_push (rtpsession->send_rtp_src, buffer);
} else {
gst_buffer_unref (buffer);
result = GST_FLOW_OK;
}
return result;
}
/* called when the session manager has an RTCP packet ready for further
* sending */
static GstFlowReturn
gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
GstBuffer * buffer, gpointer user_data)
{
GstFlowReturn result;
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
if (rtpsession->send_rtcp_src) {
result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
} else {
gst_buffer_unref (buffer);
result = GST_FLOW_OK;
}
return result;
}
/* called when the session manager needs the clock rate */
static gint
gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
gpointer user_data)
{
gint result = -1;
GstRTPSession *rtpsession;
GValue ret = { 0 };
GValue args[2] = { {0}, {0} };
GstCaps *caps;
const GstStructure *caps_struct;
rtpsession = GST_RTP_SESSION_CAST (user_data);
g_value_init (&args[0], GST_TYPE_ELEMENT);
g_value_set_object (&args[0], rtpsession);
g_value_init (&args[1], G_TYPE_UINT);
g_value_set_uint (&args[1], payload);
g_value_init (&ret, GST_TYPE_CAPS);
g_value_set_boxed (&ret, NULL);
g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
&ret);
caps = (GstCaps *) g_value_get_boxed (&ret);
if (!caps)
goto no_caps;
caps_struct = gst_caps_get_structure (caps, 0);
if (!gst_structure_get_int (caps_struct, "clock-rate", &result))
goto no_clock_rate;
return result;
/* ERRORS */
no_caps:
{
GST_DEBUG_OBJECT (rtpsession, "could not get caps");
return -1;
}
no_clock_rate:
{
GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps");
return -1;
}
}
/* called when the session manager needs the time of clock */
static GstClockTime
gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
{
GstClockTime result;
GstRTPSession *rtpsession;
GstClock *clock;
rtpsession = GST_RTP_SESSION_CAST (user_data);
clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
if (clock) {
result = gst_clock_get_time (clock);
gst_object_unref (clock);
} else
result = GST_CLOCK_TIME_NONE;
return result;
}
static GstFlowReturn
gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
@ -305,14 +580,15 @@ static GstFlowReturn
gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
/* FIXME, do something */
ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
ret = rtp_session_process_rtp (priv->session, buffer);
gst_object_unref (rtpsession);
@ -323,9 +599,11 @@ static GstFlowReturn
gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
@ -347,14 +625,15 @@ static GstFlowReturn
gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
/* FIXME, do something */
GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
ret = gst_pad_push (rtpsession->sync_src, buffer);
ret = rtp_session_process_rtcp (priv->session, buffer);
gst_object_unref (rtpsession);
@ -365,9 +644,11 @@ static GstFlowReturn
gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event");
@ -388,14 +669,15 @@ static GstFlowReturn
gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
/* FIXME, do something */
ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
ret = rtp_session_send_rtp (priv->session, buffer);
gst_object_unref (rtpsession);
@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession)
* RTCP packets.
*/
static GstPad *
create_rtcp_src (GstRTPSession * rtpsession)
create_send_rtcp_src (GstRTPSession * rtpsession)
{
GST_DEBUG_OBJECT (rtpsession, "creating pad");
rtpsession->rtcp_src =
gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
gst_pad_set_active (rtpsession->rtcp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
rtpsession->send_rtcp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
NULL);
gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->send_rtcp_src);
return rtpsession->rtcp_src;
return rtpsession->send_rtcp_src;
}
static GstPad *
@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element,
goto exists;
result = create_send_rtp_sink (rtpsession);
} else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) {
if (rtpsession->rtcp_src != NULL)
} else if (templ == gst_element_class_get_pad_template (klass,
"send_rtcp_src")) {
if (rtpsession->send_rtcp_src != NULL)
goto exists;
result = create_rtcp_src (rtpsession);
result = create_send_rtcp_src (rtpsession);
} else
goto wrong_template;

View file

@ -32,6 +32,7 @@
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION))
#define GST_IS_RTP_SESSION_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION))
#define GST_RTP_SESSION_CAST(obj) ((GstRTPSession *)(obj))
typedef struct _GstRTPSession GstRTPSession;
typedef struct _GstRTPSessionClass GstRTPSessionClass;
@ -48,13 +49,16 @@ struct _GstRTPSession {
GstPad *recv_rtp_src;
GstPad *sync_src;
GstPad *send_rtp_src;
GstPad *rtcp_src;
GstPad *send_rtcp_src;
GstRTPSessionPrivate *priv;
};
struct _GstRTPSessionClass {
GstElementClass parent_class;
/* signals */
GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt);
};
GType gst_rtp_session_get_type (void);

1026
gst/rtpmanager/rtpsession.c Normal file

File diff suppressed because it is too large Load diff

206
gst/rtpmanager/rtpsession.h Normal file
View file

@ -0,0 +1,206 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __RTP_SESSION_H__
#define __RTP_SESSION_H__
#include <gst/gst.h>
#include <gst/netbuffer/gstnetbuffer.h>
#include "rtpsource.h"
typedef struct _RTPSession RTPSession;
typedef struct _RTPSessionClass RTPSessionClass;
#define RTP_TYPE_SESSION (rtp_session_get_type())
#define RTP_SESSION(sess) (G_TYPE_CHECK_INSTANCE_CAST((sess),RTP_TYPE_SESSION,RTPSession))
#define RTP_SESSION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SESSION,RTPSessionClass))
#define RTP_IS_SESSION(sess) (G_TYPE_CHECK_INSTANCE_TYPE((sess),RTP_TYPE_SESSION))
#define RTP_IS_SESSION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SESSION))
#define RTP_SESSION_CAST(sess) ((RTPSession *)(sess))
#define RTP_SESSION_LOCK(sess) (g_mutex_lock ((sess)->lock))
#define RTP_SESSION_UNLOCK(sess) (g_mutex_unlock ((sess)->lock))
/**
* RTPSessionProcessRTP:
* @sess: an #RTPSession
* @src: the #RTPSource
* @buffer: the RTP buffer ready for processing
* @user_data: user data specified when registering
*
* This callback will be called when @sess has @buffer ready for further
* processing. Processing the buffer typically includes decoding and displaying
* the buffer.
*
* Returns: a #GstFlowReturn.
*/
typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
/**
* RTPSessionSendRTP:
* @sess: an #RTPSession
* @src: the #RTPSource
* @buffer: the RTP buffer ready for sending
* @user_data: user data specified when registering
*
* This callback will be called when @sess has @buffer ready for sending to
* all listening participants in this session.
*
* Returns: a #GstFlowReturn.
*/
typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
/**
* RTPSessionSendRTCP:
* @sess: an #RTPSession
* @src: the #RTPSource
* @buffer: the RTCP buffer ready for sending
* @user_data: user data specified when registering
*
* This callback will be called when @sess has @buffer ready for sending to
* all listening participants in this session.
*
* Returns: a #GstFlowReturn.
*/
typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
/**
* RTPSessionClockRate:
* @sess: an #RTPSession
* @payload: the payload
* @user_data: user data specified when registering
*
* This callback will be called when @sess needs the clock-rate of @payload.
*
* Returns: the clock-rate of @pt.
*/
typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data);
/**
* RTPSessionGetTime:
* @sess: an #RTPSession
* @user_data: user data specified when registering
*
* This callback will be called when @sess needs the current time in
* nanoseconds.
*
* Returns: a #GstClockTime with the current time in nanoseconds.
*/
typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data);
/**
* RTPSessionCallbacks:
* @RTPSessionProcessRTP: callback to process RTP packets
* @RTPSessionSendRTP: callback for sending RTP packets
* @RTPSessionSendRTCP: callback for sending RTCP packets
* @RTPSessionGetTime: callback for returning the current time
*
* These callbacks can be installed on the session manager to get notification
* when RTP and RTCP packets are ready for further processing. These callbacks
* are not implemented with signals for performance reasons.
*/
typedef struct {
RTPSessionProcessRTP process_rtp;
RTPSessionSendRTP send_rtp;
RTPSessionSendRTCP send_rtcp;
RTPSessionClockRate clock_rate;
RTPSessionGetTime get_time;
} RTPSessionCallbacks;
/**
* RTPSession:
* @lock: lock to protect the session
* @source: the source of this session
* @ssrcs: Hashtable of sources indexed by SSRC
* @cnames: Hashtable of sources indexed by CNAME
* @num_sources: the number of sources
* @activecount: the number of active sources
* @callbacks: callbacks
* @user_data: user data passed in callbacks
*
* The RTP session manager object
*/
struct _RTPSession {
GObject object;
GMutex *lock;
guint header_len;
RTPSource *source;
GHashTable *ssrcs;
GHashTable *cnames;
guint total_sources;
RTPSessionCallbacks callbacks;
gpointer user_data;
RTPSessionStats stats;
};
/**
* RTPSessionClass:
* @on_new_ssrc: emited when a new source is found
* @on_bye_ssrc: emited when a source is gone
*
* The session class.
*/
struct _RTPSessionClass {
GObjectClass parent_class;
/* signals */
void (*on_new_ssrc) (RTPSession *sess, RTPSource *source);
void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source);
void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source);
void (*on_bye_ssrc) (RTPSession *sess, RTPSource *source);
};
GType rtp_session_get_type (void);
/* create and configure */
RTPSession* rtp_session_new (void);
void rtp_session_set_callbacks (RTPSession *sess,
RTPSessionCallbacks *callbacks,
gpointer user_data);
void rtp_session_set_bandwidth (RTPSession *sess, gdouble bandwidth);
gdouble rtp_session_get_bandwidth (RTPSession *sess);
void rtp_session_set_rtcp_fraction (RTPSession *sess, gdouble fraction);
gdouble rtp_session_get_rtcp_fraction (RTPSession *sess);
/* handling sources */
gboolean rtp_session_add_source (RTPSession *sess, RTPSource *src);
gint rtp_session_get_num_sources (RTPSession *sess);
gint rtp_session_get_num_active_sources (RTPSession *sess);
RTPSource* rtp_session_get_source_by_ssrc (RTPSession *sess, guint32 ssrc);
RTPSource* rtp_session_get_source_by_cname (RTPSession *sess, const gchar *cname);
RTPSource* rtp_session_create_source (RTPSession *sess);
/* processing packets from receivers */
GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer);
GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer *buffer);
/* processing packets for sending */
GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer);
/* get interval for next RTCP interval */
gdouble rtp_session_get_rtcp_interval (RTPSession *sess);
GstFlowReturn rtp_session_produce_rtcp (RTPSession *sess);
#endif /* __RTP_SESSION_H__ */

477
gst/rtpmanager/rtpsource.c Normal file
View file

@ -0,0 +1,477 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>
#include "rtpsource.h"
GST_DEBUG_CATEGORY_STATIC (rtp_source_debug);
#define GST_CAT_DEFAULT rtp_source_debug
#define RTP_MAX_PROBATION_LEN 32
/* signals and args */
enum
{
LAST_SIGNAL
};
enum
{
PROP_0
};
/* GObject vmethods */
static void rtp_source_finalize (GObject * object);
/* static guint rtp_source_signals[LAST_SIGNAL] = { 0 }; */
G_DEFINE_TYPE (RTPSource, rtp_source, G_TYPE_OBJECT);
static void
rtp_source_class_init (RTPSourceClass * klass)
{
GObjectClass *gobject_class;
gobject_class = (GObjectClass *) klass;
gobject_class->finalize = rtp_source_finalize;
GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
}
static void
rtp_source_init (RTPSource * src)
{
/* sources are initialy on probation until we receive enough valid RTP
* packets or a valid RTCP packet */
src->validated = FALSE;
src->probation = RTP_DEFAULT_PROBATION;
src->payload = 0;
src->clock_rate = -1;
src->packets = g_queue_new ();
src->stats.jitter = 0;
src->stats.transit = -1;
src->stats.curr_sr = 0;
src->stats.curr_rr = 0;
}
static void
rtp_source_finalize (GObject * object)
{
RTPSource *src;
GstBuffer *buffer;
src = RTP_SOURCE_CAST (object);
while ((buffer = g_queue_pop_head (src->packets)))
gst_buffer_unref (buffer);
g_queue_free (src->packets);
G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object);
}
/**
* rtp_source_new:
* @ssrc: an SSRC
*
* Create a #RTPSource with @ssrc.
*
* Returns: a new #RTPSource. Use g_object_unref() after usage.
*/
RTPSource *
rtp_source_new (guint32 ssrc)
{
RTPSource *src;
src = g_object_new (RTP_TYPE_SOURCE, NULL);
src->ssrc = ssrc;
return src;
}
/**
* rtp_source_set_callbacks:
* @src: an #RTPSource
* @cb: callback functions
* @user_data: user data
*
* Set the callbacks for the source.
*/
void
rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb,
gpointer user_data)
{
g_return_if_fail (RTP_IS_SOURCE (src));
src->callbacks.push_rtp = cb->push_rtp;
src->callbacks.clock_rate = cb->clock_rate;
src->user_data = user_data;
}
/**
* rtp_source_set_as_csrc:
* @src: an #RTPSource
*
* Configure @src as a CSRC, this will validate the RTpSource.
*/
void
rtp_source_set_as_csrc (RTPSource * src)
{
g_return_if_fail (RTP_IS_SOURCE (src));
src->validated = TRUE;
src->is_csrc = TRUE;
}
/**
* rtp_source_set_rtp_from:
* @src: an #RTPSource
* @address: the RTP address to set
*
* Set that @src is receiving RTP packets from @address. This is used for
* collistion checking.
*/
void
rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address)
{
g_return_if_fail (RTP_IS_SOURCE (src));
src->have_rtp_from = TRUE;
memcpy (&src->rtp_from, address, sizeof (GstNetAddress));
}
/**
* rtp_source_set_rtcp_from:
* @src: an #RTPSource
* @address: the RTCP address to set
*
* Set that @src is receiving RTCP packets from @address. This is used for
* collistion checking.
*/
void
rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address)
{
g_return_if_fail (RTP_IS_SOURCE (src));
src->have_rtcp_from = TRUE;
memcpy (&src->rtcp_from, address, sizeof (GstNetAddress));
}
static GstFlowReturn
push_packet (RTPSource * src, GstBuffer * buffer)
{
GstFlowReturn ret = GST_FLOW_OK;
/* push queued packets first if any */
while (!g_queue_is_empty (src->packets)) {
GstBuffer *buffer = GST_BUFFER_CAST (g_queue_pop_head (src->packets));
GST_DEBUG ("pushing queued packet");
if (src->callbacks.push_rtp)
src->callbacks.push_rtp (src, buffer, src->user_data);
else
gst_buffer_unref (buffer);
}
GST_DEBUG ("pushing new packet");
/* push packet */
if (src->callbacks.push_rtp)
ret = src->callbacks.push_rtp (src, buffer, src->user_data);
else
gst_buffer_unref (buffer);
return ret;
}
static gint
get_clock_rate (RTPSource * src, guint8 payload)
{
if (payload != src->payload) {
gint clock_rate = -1;
if (src->callbacks.clock_rate)
clock_rate = src->callbacks.clock_rate (src, payload, src->user_data);
GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate);
src->clock_rate = clock_rate;
src->payload = payload;
}
return src->clock_rate;
}
static void
calculate_jitter (RTPSource * src, GstBuffer * buffer,
RTPArrivalStats * arrival)
{
GstClockTime current;
guint32 rtparrival, transit, rtptime;
gint32 diff;
gint clock_rate;
guint8 pt;
/* get arrival time */
if ((current = arrival->time) == GST_CLOCK_TIME_NONE)
goto no_time;
pt = gst_rtp_buffer_get_payload_type (buffer);
/* get clockrate */
if ((clock_rate = get_clock_rate (src, pt)) == -1)
goto no_clock_rate;
rtptime = gst_rtp_buffer_get_timestamp (buffer);
/* convert arrival time to RTP timestamp units */
rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND);
/* transit time is difference with RTP timestamp */
transit = rtparrival - rtptime;
/* get diff with previous transit time */
if (src->stats.transit != -1)
diff = transit - src->stats.transit;
else
diff = 0;
src->stats.transit = transit;
if (diff < 0)
diff = -diff;
/* update jitter */
src->stats.jitter += diff - ((src->stats.jitter + 8) >> 4);
src->stats.prev_rtptime = src->stats.last_rtptime;
src->stats.last_rtptime = rtparrival;
GST_DEBUG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %u",
rtparrival, rtptime, clock_rate, diff, src->stats.jitter);
return;
/* ERRORS */
no_time:
{
GST_WARNING ("cannot get current time");
return;
}
no_clock_rate:
{
GST_WARNING ("cannot get clock-rate for pt %d", pt);
return;
}
}
/**
* rtp_source_process_rtp:
* @src: an #RTPSource
* @buffer: an RTP buffer
*
* Let @src handle the incomming RTP @buffer.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
RTPArrivalStats * arrival)
{
GstFlowReturn result = GST_FLOW_OK;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
/* if we are still on probation, check seqnum */
if (src->probation) {
guint16 seqnr, expected;
expected = src->stats.max_seqnr + 1;
/* when in probation, we require consecutive seqnums */
seqnr = gst_rtp_buffer_get_seq (buffer);
if (seqnr == expected) {
/* expected packet */
src->probation--;
src->stats.max_seqnr = seqnr;
GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
} else {
GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected);
src->probation = RTP_DEFAULT_PROBATION;
src->stats.max_seqnr = seqnr;
}
}
if (src->probation) {
GstBuffer *q;
GST_DEBUG ("probation %d: queue buffer", src->probation);
/* when still in probation, keep packets in a list. */
g_queue_push_tail (src->packets, buffer);
/* remove packets from queue if there are too many */
while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
q = g_queue_pop_head (src->packets);
gst_object_unref (q);
}
} else {
/* we are not in probation */
src->stats.octetsreceived += arrival->payload_len;
src->stats.bytesreceived += arrival->bytes;
src->stats.packetsreceived++;
src->is_sender = TRUE;
GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
src->stats.packetsreceived, src->stats.octetsreceived);
/* calculate jitter */
calculate_jitter (src, buffer, arrival);
/* we're ready to push the RTP packet now */
result = push_packet (src, buffer);
}
return result;
}
/**
* rtp_source_process_bye:
* @src: an #RTPSource
* @reason: the reason for leaving
*
* Notify @src that a BYE packet has been received. This will make the source
* inactive.
*/
void
rtp_source_process_bye (RTPSource * src, const gchar * reason)
{
g_return_if_fail (RTP_IS_SOURCE (src));
GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc,
GST_STR_NULL (reason));
/* copy the reason and mark as received_bye */
g_free (src->bye_reason);
src->bye_reason = g_strdup (reason);
src->received_bye = TRUE;
}
/**
* rtp_source_send_rtp:
* @src: an #RTPSource
* @buffer: an RTP buffer
*
* Send an RTP @buffer originating from @src. This will make @src a sender.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
{
GstFlowReturn result = GST_FLOW_OK;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
/* we are a sender now */
src->is_sender = TRUE;
/* push packet */
if (src->callbacks.push_rtp)
result = src->callbacks.push_rtp (src, buffer, src->user_data);
else
gst_buffer_unref (buffer);
return result;
}
/**
* rtp_source_process_sr:
* @src: an #RTPSource
* @ntptime: the NTP time
* @rtptime: the RTP time
* @packet_count: the packet count
* @octet_count: the octect count
*
* Update the sender report in @src.
*/
void
rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
guint32 packet_count, guint32 octet_count)
{
RTPSenderReport *curr;
gint curridx;
g_return_if_fail (RTP_IS_SOURCE (src));
GST_DEBUG ("got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT
", RTP %u, PC %u, OC %u", src->ssrc, ntptime, rtptime, packet_count,
octet_count);
curridx = src->stats.curr_sr ^ 1;
curr = &src->stats.sr[curridx];
/* update current */
curr->is_valid = TRUE;
curr->ntptime = ntptime;
curr->rtptime = rtptime;
curr->packet_count = packet_count;
curr->octet_count = octet_count;
/* make current */
src->stats.curr_sr = curridx;
}
/**
* rtp_source_process_rb:
* @src: an #RTPSource
* @fractionlost: fraction lost since last SR/RR
* @packetslost: the cumululative number of packets lost
* @exthighestseq: the extended last sequence number received
* @jitter: the interarrival jitter
* @lsr: the last SR packet from this source
* @dlsr: the delay since last SR packet
*
* Update the report block in @src.
*/
void
rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost,
guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr)
{
RTPReceiverReport *curr;
gint curridx;
g_return_if_fail (RTP_IS_SOURCE (src));
GST_DEBUG ("got RB packet %d: SSRC %08x, FL %u"
", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", src->ssrc, fractionlost,
packetslost, exthighestseq, jitter, lsr, dlsr);
curridx = src->stats.curr_rr ^ 1;
curr = &src->stats.rr[curridx];
/* update current */
curr->is_valid = TRUE;
curr->fractionlost = fractionlost;
curr->packetslost = packetslost;
curr->exthighestseq = exthighestseq;
curr->jitter = jitter;
curr->lsr = lsr;
curr->dlsr = dlsr;
/* make current */
src->stats.curr_rr = curridx;
}

162
gst/rtpmanager/rtpsource.h Normal file
View file

@ -0,0 +1,162 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __RTP_SOURCE_H__
#define __RTP_SOURCE_H__
#include <gst/gst.h>
#include <gst/rtp/gstrtcpbuffer.h>
#include <gst/netbuffer/gstnetbuffer.h>
#include "rtpstats.h"
/* the default number of consecutive RTP packets we need to receive before the
* source is considered valid */
#define RTP_NO_PROBATION 0
#define RTP_DEFAULT_PROBATION 2
typedef struct _RTPSource RTPSource;
typedef struct _RTPSourceClass RTPSourceClass;
#define RTP_TYPE_SOURCE (rtp_source_get_type())
#define RTP_SOURCE(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_SOURCE,RTPSource))
#define RTP_SOURCE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SOURCE,RTPSourceClass))
#define RTP_IS_SOURCE(src) (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_SOURCE))
#define RTP_IS_SOURCE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SOURCE))
#define RTP_SOURCE_CAST(src) ((RTPSource *)(src))
/**
* RTP_SOURCE_IS_ACTIVE:
* @src: an #RTPSource
*
* Check if @src is active. A source is active when it has been validated
* and has not yet received a BYE packet.
*/
#define RTP_SOURCE_IS_ACTIVE(src) (src->validated && !src->received_bye)
/**
* RTP_SOURCE_IS_SENDER:
* @src: an #RTPSource
*
* Check if @src is a sender.
*/
#define RTP_SOURCE_IS_SENDER(src) (src->is_sender)
/**
* RTPSourcePushRTP:
* @src: an #RTPSource
* @buffer: the RTP buffer ready for processing
* @user_data: user data specified when registering
*
* This callback will be called when @src has @buffer ready for further
* processing.
*
* Returns: a #GstFlowReturn.
*/
typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data);
/**
* RTPSourceClockRate:
* @src: an #RTPSource
* @payload: a payload type
* @user_data: user data specified when registering
*
* This callback will be called when @src needs the clock-rate of the
* @payload.
*
* Returns: a clock-rate for @payload.
*/
typedef gint (*RTPSourceClockRate) (RTPSource *src, guint8 payload, gpointer user_data);
/**
* RTPSourceCallbacks:
* @push_rtp: a packet becomes available for handling
* @clock_rate: a clock-rate is requested
* @get_time: the current clock time is requested
*
* Callbacks performed by #RTPSource when actions need to be performed.
*/
typedef struct {
RTPSourcePushRTP push_rtp;
RTPSourceClockRate clock_rate;
} RTPSourceCallbacks;
/**
* RTPSource:
*
* A source in the #RTPSession
*/
struct _RTPSource {
GObject object;
/*< private >*/
RTPSourceCallbacks callbacks;
gpointer user_data;
guint32 ssrc;
gchar *cname;
gint probation;
gboolean validated;
gboolean received_bye;
gchar *bye_reason;
gboolean is_csrc;
gboolean is_sender;
gboolean have_rtp_from;
GstNetAddress rtp_from;
gboolean have_rtcp_from;
GstNetAddress rtcp_from;
guint8 payload;
gint clock_rate;
GQueue *packets;
RTPSourceStats stats;
};
struct _RTPSourceClass {
GObjectClass parent_class;
};
GType rtp_source_get_type (void);
/* managing lifetime of sources */
RTPSource* rtp_source_new (guint32 ssrc);
void rtp_source_set_callbacks (RTPSource *src, RTPSourceCallbacks *cb, gpointer data);
void rtp_source_set_as_csrc (RTPSource *src);
void rtp_source_set_rtp_from (RTPSource *src, GstNetAddress *address);
void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *address);
GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival);
GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer);
/* RTCP messages */
void rtp_source_process_bye (RTPSource *src, const gchar *reason);
void rtp_source_process_sr (RTPSource *src, guint64 ntptime, guint32 rtptime,
guint32 packet_count, guint32 octet_count);
void rtp_source_process_rb (RTPSource *src, guint8 fractionlost, gint32 packetslost,
guint32 exthighestseq, guint32 jitter,
guint32 lsr, guint32 dlsr);
#endif /* __RTP_SOURCE_H__ */

111
gst/rtpmanager/rtpstats.c Normal file
View file

@ -0,0 +1,111 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "rtpstats.h"
/**
* rtp_stats_init_defaults:
* @stats: an #RTPSessionStats struct
*
* Initialize @stats with its default values.
*/
void
rtp_stats_init_defaults (RTPSessionStats * stats)
{
stats->bandwidth = RTP_STATS_BANDWIDTH;
stats->sender_fraction = RTP_STATS_SENDER_FRACTION;
stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION;
stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH;
stats->min_interval = RTP_STATS_MIN_INTERVAL;
}
/**
* rtp_stats_calculate_rtcp_interval:
* @stats: an #RTPSessionStats struct
*
* Calculate the RTCP interval. The result of this function is the amount of
* time to wait (in seconds) before sender a new RTCP message.
*
* Returns: the RTCP interval.
*/
gdouble
rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender)
{
gdouble active, senders, receivers, sfraction;
gboolean avg_rtcp;
gdouble interval;
active = stats->active_sources;
/* Try to avoid division by zero */
if (stats->active_sources == 0)
active += 1.0;
senders = (gdouble) stats->sender_sources;
receivers = (gdouble) (active - senders);
avg_rtcp = (gdouble) stats->avg_rtcp_packet_size;
sfraction = senders / active;
GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f",
senders, receivers, avg_rtcp, sfraction);
if (sfraction <= stats->sender_fraction) {
if (sender) {
interval =
(avg_rtcp * senders) / (stats->sender_fraction *
stats->rtcp_bandwidth);
} else {
interval =
(avg_rtcp * receivers) / ((1.0 -
stats->sender_fraction) * stats->rtcp_bandwidth);
}
} else {
interval = (avg_rtcp * active) / stats->rtcp_bandwidth;
}
if (interval < stats->min_interval)
interval = stats->min_interval;
if (!stats->sent_rtcp)
interval /= 2.0;
return interval;
}
/**
* rtp_stats_calculate_rtcp_interval:
* @stats: an #RTPSessionStats struct
* @interval: an RTCP interval
*
* Apply a random jitter to the @interval. @interval is typically obtained with
* rtp_stats_calculate_rtcp_interval().
*
* Returns: the new RTCP interval.
*/
gdouble
rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval)
{
/* see RFC 3550 p 30
* To compensate for "unconditional reconsideration" converging to a
* value below the intended average.
*/
#define COMPENSATION (2.71828 - 1.5);
return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION;
}

161
gst/rtpmanager/rtpstats.h Normal file
View file

@ -0,0 +1,161 @@
/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __RTP_STATS_H__
#define __RTP_STATS_H__
#include <gst/gst.h>
#include <gst/netbuffer/gstnetbuffer.h>
/**
* RTPSenderReport:
*
* A sender report structure.
*/
typedef struct {
gboolean is_valid;
guint64 ntptime;
guint32 rtptime;
guint32 packet_count;
guint32 octet_count;
} RTPSenderReport;
/**
* RTPReceiverReport:
*
* A receiver report structure.
*/
typedef struct {
gboolean is_valid;
guint32 ssrc; /* who the report is from */
guint8 fractionlost;
guint32 packetslost;
guint32 exthighestseq;
guint32 jitter;
guint32 lsr;
guint32 dlsr;
} RTPReceiverReport;
/**
* RTPArrivalStats:
* @time: arrival time of a packet
* @address: address of the sender of the packet
* @bytes: bytes of the packet including lowlevel overhead
* @payload_len: bytes of the RTP payload
*
* Structure holding information about the arrival stats of a packet.
*/
typedef struct {
GstClockTime time;
gboolean have_address;
GstNetAddress address;
guint bytes;
guint payload_len;
} RTPArrivalStats;
/**
* RTPSourceStats:
* @packetsreceived: number of received packets in total
* @prevpacketsreceived: number of packets received in previous reporting
* interval
* @octetsreceived: number of payload bytes received
* @bytesreceived: number of total bytes received including headers and lower
* protocol level overhead
* @max_seqnr: highest sequence number received
* @transit: previous transit time used for calculating @jitter
* @jitter: current jitter
* @prev_rtptime: previous time when an RTP packet was received
* @prev_rtcptime: previous time when an RTCP packet was received
* @last_rtptime: time when last RTP packet received
* @last_rtcptime: time when last RTCP packet received
* @curr_rr: index of current @rr block
* @rr: previous and current receiver report block
* @curr_sr: index of current @sr block
* @sr: previous and current sender report block
*
* Stats about a source.
*/
typedef struct {
guint64 packetsreceived;
guint64 prevpacketsreceived;
guint64 octetsreceived;
guint64 bytesreceived;
guint16 max_seqnr;
guint32 transit;
guint32 jitter;
/* when we received stuff */
GstClockTime prev_rtptime;
GstClockTime prev_rtcptime;
GstClockTime last_rtptime;
GstClockTime last_rtcptime;
/* sender and receiver reports */
gint curr_rr;
RTPReceiverReport rr[2];
gint curr_sr;
RTPSenderReport sr[2];
} RTPSourceStats;
#define RTP_STATS_BANDWIDTH 64000.0
#define RTP_STATS_RTCP_BANDWIDTH 3000.0
/*
* Minimum average time between RTCP packets from this site (in
* seconds). This time prevents the reports from `clumping' when
* sessions are small and the law of large numbers isn't helping
* to smooth out the traffic. It also keeps the report interval
* from becoming ridiculously small during transient outages like
* a network partition.
*/
#define RTP_STATS_MIN_INTERVAL 5.0
/*
* Fraction of the RTCP bandwidth to be shared among active
* senders. (This fraction was chosen so that in a typical
* session with one or two active senders, the computed report
* time would be roughly equal to the minimum report time so that
* we don't unnecessarily slow down receiver reports.) The
* receiver fraction must be 1 - the sender fraction.
*/
#define RTP_STATS_SENDER_FRACTION (0.25)
#define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION)
/**
* RTPSessionStats:
*
* Stats kept for a session and used to produce RTCP packet timeouts.
*/
typedef struct {
gdouble bandwidth;
gdouble sender_fraction;
gdouble receiver_fraction;
gdouble rtcp_bandwidth;
gdouble min_interval;
guint sender_sources;
guint active_sources;
guint avg_rtcp_packet_size;
guint avg_bye_packet_size;
gboolean sent_rtcp;
} RTPSessionStats;
void rtp_stats_init_defaults (RTPSessionStats *stats);
gdouble rtp_stats_calculate_rtcp_interval (RTPSessionStats *stats, gboolean sender);
gdouble rtp_stats_add_rtcp_jitter (RTPSessionStats *stats, gdouble interval);
#endif /* __RTP_STATS_H__ */