diff --git a/ChangeLog b/ChangeLog index 2de3cb0f7d..af63663809 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,79 @@ +2007-09-03 Wim Taymans + + * gst/rtpmanager/gstrtpbin-marshal.list: + * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_get_client), + (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (create_stream), + (gst_rtp_bin_init), (caps_changed), (new_ssrc_pad_found), + (create_recv_rtp), (create_recv_rtcp), (create_send_rtp): + * gst/rtpmanager/gstrtpbin.h: + Updated example pipelines in docs. + Handle sync_rtcp buffers from the SSRC demuxer to perform lip-sync. + Set the default latency correctly. + Add some more points where we can get caps. + + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_class_init), + (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_loop), + (gst_rtp_jitter_buffer_query), + (gst_rtp_jitter_buffer_set_property), + (gst_rtp_jitter_buffer_get_property): + Add ts-offset property to control timestamping. + + * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), + (gst_rtp_session_init), (gst_rtp_session_set_property), + (gst_rtp_session_get_property), (get_current_ntp_ns_time), + (rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state), + (gst_rtp_session_send_rtcp), (gst_rtp_session_sync_rtcp), + (gst_rtp_session_cache_caps), (gst_rtp_session_clock_rate), + (gst_rtp_session_sink_setcaps), (gst_rtp_session_chain_recv_rtp), + (gst_rtp_session_event_send_rtp_sink), + (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), + (create_recv_rtcp_sink), (create_send_rtp_sink), + (create_send_rtcp_src): + Various cleanups. + Feed rtpsession manager with NTP time based on pipeline clock when + handling RTP packets and RTCP timeouts. + Perform all RTCP with the system clock. + Set caps on RTCP outgoing buffers. + + * gst/rtpmanager/gstrtpssrcdemux.c: (find_demux_pad_for_ssrc), + (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_base_init), + (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event), + (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), + (gst_rtp_ssrc_demux_rtcp_chain): + * gst/rtpmanager/gstrtpssrcdemux.h: + Also demux RTCP messages. + + * gst/rtpmanager/rtpsession.c: (rtp_session_set_callbacks), + (update_arrival_stats), (rtp_session_process_rtp), + (rtp_session_process_rb), (rtp_session_process_sr), + (rtp_session_process_rr), (rtp_session_process_rtcp), + (rtp_session_send_rtp), (rtp_session_send_bye), + (session_start_rtcp), (session_report_blocks), (session_cleanup), + (rtp_session_on_timeout): + * gst/rtpmanager/rtpsession.h: + Remove the get_time callback, the GStreamer part will feed us with + enough timing information. + Split sync timing and RTCP timing information. + Factor out common RB handling for SR and RR. + Send out SR RTCP packets for lip-sync. + Move SR and RR packet info generation to the source. + + * gst/rtpmanager/rtpsource.c: (rtp_source_init), + (rtp_source_update_caps), (get_clock_rate), (calculate_jitter), + (rtp_source_process_rtp), (rtp_source_send_rtp), + (rtp_source_process_sr), (rtp_source_process_rb), + (rtp_source_get_new_sr), (rtp_source_get_new_rb), + (rtp_source_get_last_sr): + * gst/rtpmanager/rtpsource.h: + * gst/rtpmanager/rtpstats.h: + Use caps on incomming buffers to get timing information when they are + there. + Calculate clock scew of the receiver compared to the sender and adjust + the rtp timestamps. + Calculate the round trip in sources. + Do SR and RR calculations in the source. + 2007-09-03 Renato Filho * configure.ac: diff --git a/gst/rtpmanager/gstrtpbin-marshal.list b/gst/rtpmanager/gstrtpbin-marshal.list index ca760d82bc..e5c5fc428a 100644 --- a/gst/rtpmanager/gstrtpbin-marshal.list +++ b/gst/rtpmanager/gstrtpbin-marshal.list @@ -3,3 +3,4 @@ BOXED:UINT BOXED:UINT,UINT VOID:UINT,OBJECT VOID:UINT,UINT +VOID:OBJECT,OBJECT diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index cb2a9f0b3c..a4ba67c3ea 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -79,12 +79,12 @@ * * gst-launch gstrtpbin name=rtpbin \ * v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \ - * rtpbin.send_rtp_src_0 ! udpsink port=5000 \ - * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false \ - * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \ - * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \ - * rtpbin.send_rtp_src_1 ! udpsink port=5002 \ - * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false \ + * rtpbin.send_rtp_src_0 ! udpsink port=5000 \ + * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false \ + * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \ + * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \ + * rtpbin.send_rtp_src_1 ! udpsink port=5002 \ + * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false \ * udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1 * * Encode and payload H263 video captured from a v4l2src. Encode and payload AMR @@ -94,21 +94,22 @@ * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003. * RTCP packets for session 0 are received on port 5005 and RTCP for session 1 * is received on port 5007. Since RTCP packets from the sender should be sent - * as soon as possible, sync=false is configured on udpsink. + * as soon as possible and do not participate in preroll, sync=false and + * async=false is configured on udpsink * * * - * gst-launch -v gstrtpbin name=rtpbin \ + * gst-launch -v gstrtpbin name=rtpbin \ * udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \ - * port=5000 ! rtpbin.recv_rtp_sink_0 \ - * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \ - * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \ - * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false \ - * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \ - * port=5002 ! rtpbin.recv_rtp_sink_1 \ - * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \ - * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \ - * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false + * port=5000 ! rtpbin.recv_rtp_sink_0 \ + * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \ + * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \ + * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false \ + * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \ + * port=5002 ! rtpbin.recv_rtp_sink_1 \ + * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \ + * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \ + * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false * * Receive H263 on port 5000, send it through rtpbin in session 0, depayload, * decode and display the video. @@ -122,7 +123,7 @@ * * * - * Last reviewed on 2007-08-28 (0.10.6) + * Last reviewed on 2007-08-30 (0.10.6) */ #ifdef HAVE_CONFIG_H @@ -130,6 +131,9 @@ #endif #include +#include +#include + #include "gstrtpbin-marshal.h" #include "gstrtpbin.h" @@ -187,6 +191,14 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", GST_STATIC_CAPS ("application/x-rtp") ); +/* padtemplate for the internal pad */ +static GstStaticPadTemplate rtpbin_sync_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink_%d", + GST_PAD_SINK, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtcp") + ); + #define GST_RTP_BIN_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) @@ -242,16 +254,37 @@ struct _GstRtpBinStream { /* the SSRC of this stream */ guint32 ssrc; + /* parent bin */ GstRtpBin *bin; + /* the session this SSRC belongs to */ GstRtpBinSession *session; + /* the jitterbuffer of the SSRC */ GstElement *buffer; + /* the PT demuxer of the SSRC */ GstElement *demux; gulong demux_newpad_sig; gulong demux_ptreq_sig; + + /* the internal pad we use to get RTCP sync messages */ + GstPad *sync_pad; + gboolean have_sync; + guint64 last_unix; + guint64 last_extrtptime; + + /* mapping to local RTP and NTP time */ + guint64 local_rtp; + guint64 local_unix; + gint64 unix_delta; + + /* for lip-sync */ + guint64 clock_base; + gint clock_rate; + gint64 ts_offset; + gint64 prev_ts_offset; }; #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock) @@ -289,12 +322,28 @@ struct _GstRtpBinSession GstPad *recv_rtp_sink; GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; - GstPad *recv_rtcp_src; + GstPad *sync_src; GstPad *send_rtp_sink; GstPad *send_rtp_src; GstPad *send_rtcp_src; }; +/* Manages the RTP streams that come from one client and should therefore be + * synchronized. + */ +struct _GstRtpBinClient +{ + /* the common CNAME for the streams */ + gchar *cname; + guint cname_len; + + /* the streams */ + guint nstreams; + GSList *streams; + + gint64 min_delta; +}; + /* find a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * find_session_by_id (GstRtpBin * rtpbin, gint id) @@ -513,6 +562,271 @@ gst_rtp_bin_clear_pt_map (GstRtpBin * bin) GST_RTP_BIN_UNLOCK (bin); } +static GstRtpBinClient * +gst_rtp_bin_get_client (GstRtpBin * bin, guint8 len, guint8 * data, + gboolean * created) +{ + GstRtpBinClient *result = NULL; + GSList *walk; + + for (walk = bin->clients; walk; walk = g_slist_next (walk)) { + GstRtpBinClient *client = (GstRtpBinClient *) walk->data; + + if (len != client->cname_len) + continue; + + if (!strncmp ((gchar *) data, client->cname, client->cname_len)) { + GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client, + client->cname); + result = client; + break; + } + } + + /* nothing found, create one */ + if (result == NULL) { + result = g_new0 (GstRtpBinClient, 1); + result->cname = g_strndup ((gchar *) data, len); + result->cname_len = len; + result->min_delta = G_MAXINT64; + bin->clients = g_slist_prepend (bin->clients, result); + GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result, + result->cname); + } + return result; +} + +/* associate a stream to the given CNAME. This will make sure all streams for + * that CNAME are synchronized together. */ +static void +gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, + guint8 * data) +{ + GstRtpBinClient *client; + gboolean created; + GSList *walk; + + /* first find or create the CNAME */ + client = gst_rtp_bin_get_client (bin, len, data, &created); + + /* find stream in the client */ + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + if (ostream == stream) + break; + } + /* not found, add it to the list */ + if (walk == NULL) { + GST_DEBUG_OBJECT (bin, + "new association of SSRC %08x with client %p with CNAME %s", + stream->ssrc, client, client->cname); + client->streams = g_slist_prepend (client->streams, stream); + client->nstreams++; + } else { + GST_DEBUG_OBJECT (bin, + "found association of SSRC %08x with client %p with CNAME %s", + stream->ssrc, client, client->cname); + } + + /* we can only continue if we know the local clock-base and clock-rate */ + if (stream->clock_base == -1) + goto no_clock_base; + if (stream->clock_rate <= 0) + goto no_clock_rate; + + /* map last RTP time to local timeline using our clock-base */ + stream->local_rtp = stream->last_extrtptime - stream->clock_base; + + GST_DEBUG_OBJECT (bin, + "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT + ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base, + stream->last_extrtptime, stream->local_rtp, stream->clock_rate); + + /* calculate local NTP time in gstreamer timestamp */ + stream->local_unix = + gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND, + stream->clock_rate); + /* calculate delta between server and receiver */ + stream->unix_delta = stream->last_unix - stream->local_unix; + + GST_DEBUG_OBJECT (bin, + "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT + ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix, + stream->unix_delta); + + /* recalc inter stream playout offset, but only if there are more than one + * stream. */ + if (client->nstreams > 1) { + gint64 min; + + /* calculate the min of all deltas */ + min = G_MAXINT64; + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + if (ostream->unix_delta < min) + min = ostream->unix_delta; + } + + GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client, + min); + + /* calculate offsets for each stream */ + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + ostream->ts_offset = ostream->unix_delta - min; + + /* delta changed, see how much */ + if (ostream->prev_ts_offset != ostream->ts_offset) { + gint64 diff; + + if (ostream->prev_ts_offset > ostream->ts_offset) + diff = ostream->prev_ts_offset - ostream->ts_offset; + else + diff = ostream->ts_offset - ostream->prev_ts_offset; + + /* only change diff when it changed more than 1 millisecond. This + * compensates for rounding errors in NTP to RTP timestamp + * conversions */ + if (diff > GST_MSECOND) + g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL); + + ostream->prev_ts_offset = ostream->ts_offset; + } + GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, + ostream->ssrc, ostream->ts_offset); + } + } + return; + +no_clock_base: + { + GST_WARNING_OBJECT (bin, "we have no clock-base"); + return; + } +no_clock_rate: + { + GST_WARNING_OBJECT (bin, "we have no clock-rate"); + return; + } +} + +#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \ + for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \ + (b) = gst_rtcp_packet_move_to_next ((packet))) + +#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \ + for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \ + (b) = gst_rtcp_packet_sdes_next_item ((packet))) + +#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \ + for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \ + (b) = gst_rtcp_packet_sdes_next_entry ((packet))) + +static GstFlowReturn +gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstRtpBinStream *stream; + GstRtpBin *bin; + GstRTCPPacket packet; + guint32 ssrc; + guint64 ntptime; + guint32 rtptime; + gboolean have_sr, have_sdes; + gboolean more; + + stream = gst_pad_get_element_private (pad); + bin = stream->bin; + + GST_DEBUG_OBJECT (bin, "received sync packet"); + + if (!gst_rtcp_buffer_validate (buffer)) + goto invalid_rtcp; + + have_sr = FALSE; + have_sdes = FALSE; + GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) { + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + /* only parse first. There is only supposed to be one SR in the packet + * but we will deal with malformed packets gracefully */ + if (have_sr) + break; + /* get NTP and RTP times */ + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime, + NULL, NULL); + + GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc); + /* ignore SR that is not ours */ + if (ssrc != stream->ssrc) + continue; + + have_sr = TRUE; + + /* store values in the stream */ + stream->have_sync = TRUE; + stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); + /* use extended timestamp */ + gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime); + break; + case GST_RTCP_TYPE_SDES: + { + gboolean more_items, more_entries; + + /* only deal with first SDES, there is only supposed to be one SDES in + * the RTCP packet but we deal with bad packets gracefully. Also bail + * out if we have not seen an SR item yet. */ + if (have_sdes || !have_sr) + break; + + GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) { + /* skip items that are not about the SSRC of the sender */ + if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc) + continue; + + /* find the CNAME entry */ + GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) { + GstRTCPSDESType type; + guint8 len; + guint8 *data; + + gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data); + + if (type == GST_RTCP_SDES_CNAME) { + stream->clock_base = GST_BUFFER_OFFSET (buffer); + /* associate the stream to CNAME */ + gst_rtp_bin_associate (bin, stream, len, data); + } + } + } + have_sdes = TRUE; + break; + } + default: + /* we can ignore these packets */ + break; + } + } + + gst_buffer_unref (buffer); + + return ret; + + /* ERRORS */ +invalid_rtcp: + { + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL), + ("invalid RTCP packet received")); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } +} + /* create a new stream with @ssrc in @session. Must be called with * RTP_SESSION_LOCK. */ static GstRtpBinStream * @@ -520,6 +834,8 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) { GstElement *buffer, *demux; GstRtpBinStream *stream; + GstPadTemplate *templ; + gchar *padname; if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL))) goto no_jitterbuffer; @@ -533,8 +849,22 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->session = session; stream->buffer = buffer; stream->demux = demux; + stream->last_extrtptime = -1; + stream->have_sync = FALSE; session->streams = g_slist_prepend (session->streams, stream); + /* make an internal sinkpad for RTCP sync packets. Take ownership of the + * pad. We will link this pad later. */ + padname = g_strdup_printf ("sync_%d", ssrc); + templ = gst_static_pad_template_get (&rtpbin_sync_sink_template); + stream->sync_pad = gst_pad_new_from_template (templ, padname); + gst_object_unref (templ); + gst_object_ref (stream->sync_pad); + gst_object_sink (stream->sync_pad); + gst_pad_set_element_private (stream->sync_pad, stream); + gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain); + gst_pad_set_active (stream->sync_pad, TRUE); + /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", (GCallback) pt_map_requested, session); @@ -566,17 +896,6 @@ no_demux: } } -/* Manages the RTP streams that come from one client and should therefore be - * synchronized. - */ -struct _GstRtpBinClient -{ - /* the common CNAME for the streams */ - gchar *cname; - /* the streams */ - GSList *streams; -}; - /* GObject vmethods */ static void gst_rtp_bin_finalize (GObject * object); static void gst_rtp_bin_set_property (GObject * object, guint prop_id, @@ -762,6 +1081,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass) rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); rtpbin->priv->bin_lock = g_mutex_new (); rtpbin->provided_clock = gst_system_clock_obtain (); + rtpbin->latency = DEFAULT_LATENCY_MS; } static void @@ -908,13 +1228,45 @@ no_caps: } } +/* emited when caps changed for the session */ +static void +caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session) +{ + GstRtpBin *bin; + GstCaps *caps; + gint payload; + const GstStructure *s; + + bin = session->bin; + + g_object_get (pad, "caps", &caps, NULL); + + if (caps == NULL) + return; + + GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps); + + s = gst_caps_get_structure (caps, 0); + + /* get payload, finish when it's not there */ + if (!gst_structure_get_int (s, "payload", &payload)) + return; + + GST_RTP_SESSION_LOCK (session); + GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload); + g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps); + GST_RTP_SESSION_UNLOCK (session); +} + /* a new pad (SSRC) was created in @session */ static void new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, GstRtpBinSession * session) { GstRtpBinStream *stream; - GstPad *sinkpad; + GstPad *sinkpad, *srcpad; + gchar *padname; + GstCaps *caps; GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); @@ -925,12 +1277,38 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, if (!stream) goto no_stream; + /* get the caps of the pad, we need the clock-rate and base_time if any. */ + if ((caps = gst_pad_get_caps (pad))) { + const GstStructure *s; + guint val; + + GST_DEBUG_OBJECT (session->bin, "pad has caps %" GST_PTR_FORMAT, caps); + + s = gst_caps_get_structure (caps, 0); + + if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate)) + stream->clock_rate = -1; + + if (gst_structure_get_uint (s, "clock-base", &val)) + stream->clock_base = val; + else + stream->clock_base = -1; + } + /* get pad and link */ GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); gst_pad_link (pad, sinkpad); gst_object_unref (sinkpad); + /* get the RTCP sync pad */ + GST_DEBUG_OBJECT (session->bin, "linking sync pad"); + padname = g_strdup_printf ("rtcp_src_%d", ssrc); + srcpad = gst_element_get_pad (element, padname); + g_free (padname); + gst_pad_link (srcpad, stream->sync_pad); + gst_object_unref (srcpad); + /* connect to the new-pad signal of the payload demuxer, this will expose the * new pad by ghosting it. */ stream->demux_newpad_sig = g_signal_connect (stream->demux, @@ -992,6 +1370,9 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_sink == NULL) goto pad_failed; + g_signal_connect (session->recv_rtp_sink, "notify::caps", + (GCallback) caps_changed, session); + GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad"); /* get srcpad, link to SSRCDemux */ session->recv_rtp_src = @@ -999,8 +1380,9 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_src == NULL) goto pad_failed; - GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad"); + GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad"); sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad"); lres = gst_pad_link (session->recv_rtp_src, sinkdpad); gst_object_unref (sinkdpad); if (lres != GST_PAD_LINK_OK) @@ -1057,11 +1439,8 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, GstPad *result; guint sessid; GstRtpBinSession *session; - -#if 0 GstPad *sinkdpad; GstPadLinkReturn lres; -#endif /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1) @@ -1083,29 +1462,25 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink != NULL) goto existed; - GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); - /* get recv_rtp pad and store */ + GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); session->recv_rtcp_sink = gst_element_get_request_pad (session->session, "recv_rtcp_sink"); if (session->recv_rtcp_sink == NULL) goto pad_failed; -#if 0 /* get srcpad, link to SSRCDemux */ GST_DEBUG_OBJECT (rtpbin, "getting sync src pad"); - session->recv_rtcp_src = - gst_element_get_static_pad (session->session, "sync_src"); - if (session->recv_rtcp_src == NULL) + session->sync_src = gst_element_get_static_pad (session->session, "sync_src"); + if (session->sync_src == NULL) goto pad_failed; - GST_DEBUG_OBJECT (rtpbin, "linking sync to demux"); - sinkdpad = gst_element_get_static_pad (session->demux, "sink"); - lres = gst_pad_link (session->recv_rtcp_src, sinkdpad); + GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad"); + sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink"); + lres = gst_pad_link (session->sync_src, sinkdpad); gst_object_unref (sinkdpad); if (lres != GST_PAD_LINK_OK) goto link_failed; -#endif result = gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ); @@ -1136,13 +1511,11 @@ pad_failed: g_warning ("gstrtpbin: failed to get session pad"); return NULL; } -#if 0 link_failed: { g_warning ("gstrtpbin: failed to link pads"); return NULL; } -#endif } /* Create a pad for sending RTP for the session in @name. Must be called with @@ -1180,6 +1553,9 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink == NULL) goto pad_failed; + g_signal_connect (session->send_rtp_sink, "notify::caps", + (GCallback) caps_changed, session); + result = gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); gst_pad_set_active (result, TRUE); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 4dd755f128..874167ccf9 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -48,6 +48,9 @@ struct _GstRtpBin { /* clock we provide */ GstClock *provided_clock; + /* a list of clients, these are streams with the same CNAME */ + GSList *clients; + /*< private >*/ GstRtpBinPrivate *priv; }; diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index bd1c07ead3..a23fbb87a1 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -99,12 +99,14 @@ enum #define DEFAULT_LATENCY_MS 200 #define DEFAULT_DROP_ON_LATENCY FALSE +#define DEFAULT_TS_OFFSET 0 enum { PROP_0, PROP_LATENCY, - PROP_DROP_ON_LATENCY + PROP_DROP_ON_LATENCY, + PROP_TS_OFFSET }; #define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock)) @@ -137,6 +139,7 @@ struct _GstRtpJitterBufferPrivate /* properties */ guint latency_ms; gboolean drop_on_latency; + gint64 ts_offset; /* the last seqnum we pushed out */ guint32 last_popped_seqnum; @@ -150,6 +153,7 @@ struct _GstRtpJitterBufferPrivate gint32 clock_rate; gint64 clock_base; guint64 exttimestamp; + gint64 prev_ts_offset; /* when we are shutting down */ GstFlowReturn srcresult; @@ -277,6 +281,16 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) "Drop buffers when maximum latency is reached", "Tells the jitterbuffer to never exceed the given latency in size", DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE)); + /** + * GstRtpJitterBuffer::ts-offset: + * + * Adjust RTP timestamps in the jitterbuffer with offset. + */ + g_object_class_install_property (gobject_class, PROP_TS_OFFSET, + g_param_spec_int64 ("ts-offset", + "Timestamp Offset", + "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64, + G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE)); /** * GstRtpJitterBuffer::request-pt-map: * @buffer: the object which received the signal @@ -421,7 +435,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, { GstRtpJitterBufferPrivate *priv; GstStructure *caps_struct; - const GValue *value; + guint val; priv = jitterbuffer->priv; @@ -443,22 +457,22 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, /* gah, clock-base is uint. If we don't have a base, we will use the first * buffer timestamp as the base time. This will screw up sync but it's better * than nothing. */ - value = gst_structure_get_value (caps_struct, "clock-base"); - if (value && G_VALUE_HOLDS_UINT (value)) { - priv->clock_base = g_value_get_uint (value); - GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, - priv->clock_base); - } else + if (gst_structure_get_uint (caps_struct, "clock-base", &val)) + priv->clock_base = val; + else priv->clock_base = -1; + GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, + priv->clock_base); + /* first expected seqnum */ - value = gst_structure_get_value (caps_struct, "seqnum-base"); - if (value && G_VALUE_HOLDS_UINT (value)) { - priv->next_seqnum = g_value_get_uint (value); - GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum); - } else + if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) + priv->next_seqnum = val; + else priv->next_seqnum = -1; + GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum); + return TRUE; /* ERRORS */ @@ -929,6 +943,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) GstClockTime timestamp; gint64 running_time; guint64 exttimestamp; + gint ts_offset_rtp; priv = jitterbuffer->priv; @@ -996,8 +1011,11 @@ again: exttimestamp, priv->clock_base); /* if no clock_base was given, take first ts as base */ - if (priv->clock_base == -1) + if (priv->clock_base == -1) { + GST_DEBUG_OBJECT (jitterbuffer, + "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp); priv->clock_base = exttimestamp; + } /* take rtp timestamp offset into account, this can wrap around */ exttimestamp -= priv->clock_base; @@ -1089,6 +1107,34 @@ push_buffer: outbuf = gst_buffer_make_metadata_writable (outbuf); GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); } + + /* apply the timestamp offset */ + if (priv->ts_offset > 0) + ts_offset_rtp = + gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate, + GST_SECOND); + else if (priv->ts_offset < 0) + ts_offset_rtp = + -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate, + GST_SECOND); + else + ts_offset_rtp = 0; + + if (ts_offset_rtp != 0) { + guint32 timestamp; + + /* if the offset changed, mark with discont */ + if (priv->ts_offset != priv->prev_ts_offset) { + GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + priv->prev_ts_offset = priv->ts_offset; + } + + timestamp = gst_rtp_buffer_get_timestamp (outbuf); + timestamp += ts_offset_rtp; + gst_rtp_buffer_set_timestamp (outbuf, timestamp); + } + /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; @@ -1158,6 +1204,7 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) GstClockTime min_latency, max_latency; gboolean us_live; GstPad *peer; + GstClockTime our_latency; if ((peer = gst_pad_get_peer (priv->sinkpad))) { if ((res = gst_pad_query (peer, query))) { @@ -1172,11 +1219,16 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) priv->peer_latency = min_latency; JBUF_UNLOCK (priv); - min_latency += priv->latency_ms * GST_MSECOND; + our_latency = ((guint64) priv->latency_ms) * GST_MSECOND; + + GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT, + GST_TIME_ARGS (our_latency)); + + min_latency += our_latency; /* max_latency can be -1, meaning there is no upper limit for the * latency. */ if (max_latency != -1) - max_latency += priv->latency_ms * GST_MSECOND; + max_latency += our_latency * GST_MSECOND; GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, @@ -1199,7 +1251,11 @@ static void gst_rtp_jitter_buffer_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (object); + priv = jitterbuffer->priv; switch (prop_id) { case PROP_LATENCY: @@ -1208,23 +1264,29 @@ gst_rtp_jitter_buffer_set_property (GObject * object, /* FIXME, not threadsafe */ new_latency = g_value_get_uint (value); - old_latency = jitterbuffer->priv->latency_ms; + old_latency = priv->latency_ms; - jitterbuffer->priv->latency_ms = new_latency; + priv->latency_ms = new_latency; /* post message if latency changed, this will inform the parent pipeline * that a latency reconfiguration is possible/needed. */ if (new_latency != old_latency) { + GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_latency * GST_MSECOND)); + gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer))); } break; } case PROP_DROP_ON_LATENCY: - { - jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value); + priv->drop_on_latency = g_value_get_boolean (value); + break; + case PROP_TS_OFFSET: + JBUF_LOCK (priv); + priv->ts_offset = g_value_get_int64 (value); + JBUF_UNLOCK (priv); break; - } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1235,14 +1297,23 @@ static void gst_rtp_jitter_buffer_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (object); + priv = jitterbuffer->priv; switch (prop_id) { case PROP_LATENCY: - g_value_set_uint (value, jitterbuffer->priv->latency_ms); + g_value_set_uint (value, priv->latency_ms); break; case PROP_DROP_ON_LATENCY: - g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency); + g_value_set_boolean (value, priv->drop_on_latency); + break; + case PROP_TS_OFFSET: + JBUF_LOCK (priv); + g_value_set_int64 (value, priv->ts_offset); + JBUF_UNLOCK (priv); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 985a371343..e716682c6b 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -132,6 +132,8 @@ #include "config.h" #endif +#include + #include "gstrtpbin-marshal.h" #include "gstrtpsession.h" #include "rtpsession.h" @@ -214,7 +216,8 @@ enum enum { - PROP_0 + PROP_0, + PROP_NTP_NS_BASE }; #define GST_RTP_SESSION_GET_PRIVATE(obj) \ @@ -234,8 +237,10 @@ struct _GstRtpSessionPrivate GThread *thread; /* caps mapping */ - guint8 pt; - gint clock_rate; + GHashTable *ptmap; + + /* NTP base time */ + guint64 ntpnsbase; }; /* callbacks to handle actions from the session manager */ @@ -245,18 +250,18 @@ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gpointer user_data); -static GstClockTime gst_rtp_session_get_time (RTPSession * sess, - gpointer user_data); static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); static RTPSessionCallbacks callbacks = { gst_rtp_session_process_rtp, gst_rtp_session_send_rtp, gst_rtp_session_send_rtcp, + gst_rtp_session_sync_rtcp, gst_rtp_session_clock_rate, - gst_rtp_session_get_time, gst_rtp_session_reconsider }; @@ -363,6 +368,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; + /** * GstRtpSession::request-pt-map: * @sess: the object which received the signal @@ -490,6 +496,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) (GCallback) on_bye_timeout, rtpsession); g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); + rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); } static void @@ -513,6 +520,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, rtpsession = GST_RTP_SESSION (object); switch (prop_id) { + case PROP_NTP_NS_BASE: + rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -528,26 +538,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, rtpsession = GST_RTP_SESSION (object); switch (prop_id) { + case PROP_NTP_NS_BASE: + g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static guint64 +get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) +{ + guint64 ntpnstime; + + if (clock) { + /* get current NTP time */ + ntpnstime = gst_clock_get_time (clock); + /* convert to running time */ + ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); + /* add NTP base offset */ + ntpnstime += rtpsession->priv->ntpnsbase; + } else + ntpnstime = -1; + + return ntpnstime; +} + static void rtcp_thread (GstRtpSession * rtpsession) { - GstClock *clock; + GstClock *sysclock, *clock; GstClockID id; GstClockTime current_time; GstClockTime next_timeout; + guint64 ntpnstime; - /* RTCP timeouts we use the system clock */ - clock = gst_system_clock_obtain (); - if (clock == NULL) - goto no_clock; + /* for RTCP timeouts we use the system clock */ + sysclock = gst_system_clock_obtain (); + if (sysclock == NULL) + goto no_sysclock; - current_time = gst_clock_get_time (clock); + /* to get the current NTP time, we use the pipeline clock */ + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + + current_time = gst_clock_get_time (sysclock); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); @@ -568,7 +603,7 @@ rtcp_thread (GstRtpSession * rtpsession) break; id = rtpsession->priv->id = - gst_clock_new_single_shot_id (clock, next_timeout); + gst_clock_new_single_shot_id (sysclock, next_timeout); GST_RTP_SESSION_UNLOCK (rtpsession); res = gst_clock_id_wait (id, NULL); @@ -581,7 +616,10 @@ rtcp_thread (GstRtpSession * rtpsession) break; /* update current time */ - current_time = gst_clock_get_time (clock); + current_time = gst_clock_get_time (sysclock); + + /* get current NTP time */ + ntpnstime = get_current_ntp_ns_time (rtpsession, clock); /* we get unlocked because we need to perform reconsideration, don't perform * the timeout but get a new reporting estimate. */ @@ -590,18 +628,18 @@ rtcp_thread (GstRtpSession * rtpsession) /* perform actions, we ignore result. Release lock because it might push. */ GST_RTP_SESSION_UNLOCK (rtpsession); - rtp_session_on_timeout (rtpsession->priv->session, current_time); + rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime); GST_RTP_SESSION_LOCK (rtpsession); } GST_RTP_SESSION_UNLOCK (rtpsession); - gst_object_unref (clock); + gst_object_unref (sysclock); GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); return; /* ERRORS */ -no_clock: +no_sysclock: { GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL), ("Could not get system clock")); @@ -662,7 +700,6 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - priv->clock_rate = -1; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; @@ -677,17 +714,9 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - { - GstClockTime base_time; - - base_time = GST_ELEMENT_CAST (rtpsession)->base_time; - - rtp_session_set_base_time (priv->session, base_time); - if (!start_rtcp_thread (rtpsession)) goto failed_thread; break; - } case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -774,6 +803,15 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, priv = rtpsession->priv; if (rtpsession->send_rtcp_src) { + GstCaps *caps; + + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->send_rtcp_src, caps); + gst_caps_unref (caps); + } + gst_buffer_set_caps (buffer, caps); GST_DEBUG_OBJECT (rtpsession, "sending RTCP"); result = gst_pad_push (rtpsession->send_rtcp_src, buffer); } else { @@ -784,30 +822,59 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, return result; } -static gboolean -gst_rtp_session_parse_caps (GstRtpSession * rtpsession, GstCaps * caps) +/* called when the session manager has an SR RTCP packet ready for handling + * inter stream synchronisation */ +static GstFlowReturn +gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRtpSession *rtpsession; + GstRtpSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->sync_src) { + GstCaps *caps; + + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->sync_src, caps); + gst_caps_unref (caps); + } + gst_buffer_set_caps (buffer, caps); + GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP"); + result = gst_pad_push (rtpsession->sync_src, buffer); + } else { + GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad"); + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +static void +gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps) { GstRtpSessionPrivate *priv; - const GstStructure *caps_struct; + const GstStructure *s; + gint payload; priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "parsing caps"); - caps_struct = gst_caps_get_structure (caps, 0); - if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate)) - goto no_clock_rate; + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "payload", &payload)) + return; - GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", priv->clock_rate); + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + return; - return TRUE; - - /* ERRORS */ -no_clock_rate: - { - GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); - return FALSE; - } + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps); } /* called when the session manager needs the clock rate */ @@ -821,13 +888,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, GValue ret = { 0 }; GValue args[2] = { {0}, {0} }; GstCaps *caps; + const GstStructure *s; rtpsession = GST_RTP_SESSION_CAST (user_data); priv = rtpsession->priv; - /* if we have it, return it */ - if (priv->clock_rate != -1) - return priv->clock_rate; + GST_RTP_SESSION_LOCK (rtpsession); + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + goto done; g_value_init (&args[0], GST_TYPE_ELEMENT); g_value_set_object (&args[0], rtpsession); @@ -844,10 +913,16 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, if (!caps) goto no_caps; - if (!gst_rtp_session_parse_caps (rtpsession, caps)) - goto parse_failed; + gst_rtp_session_cache_caps (rtpsession, caps); - result = priv->clock_rate; + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "clock-rate", &result)) + goto no_clock_rate; + + GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result); + +done: + GST_RTP_SESSION_UNLOCK (rtpsession); return result; @@ -855,35 +930,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, no_caps: { GST_DEBUG_OBJECT (rtpsession, "could not get caps"); - return -1; + goto done; } -parse_failed: +no_clock_rate: { - GST_DEBUG_OBJECT (rtpsession, "failed to parse caps"); - return -1; + GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); + goto done; } } -/* called when the session manager needs the time of clock */ -static GstClockTime -gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) -{ - GstClockTime result; - GstRtpSession *rtpsession; - GstClock *clock; - - rtpsession = GST_RTP_SESSION_CAST (user_data); - - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - if (clock) { - result = gst_clock_get_time (clock); - gst_object_unref (clock); - } else - result = GST_CLOCK_TIME_NONE; - - return result; -} - /* called when the session manager asks us to reconsider the timeout */ static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data) @@ -925,18 +980,19 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) static gboolean gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps) { - gboolean res; GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; - res = gst_rtp_session_parse_caps (rtpsession, caps); + GST_RTP_SESSION_LOCK (rtpsession); + gst_rtp_session_cache_caps (rtpsession, caps); + GST_RTP_SESSION_UNLOCK (rtpsession); gst_object_unref (rtpsession); - return res; + return TRUE; } /* receive a packet from a sender, send it to the RTP session manager and @@ -948,13 +1004,17 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstFlowReturn ret; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_process_rtp (priv->session, buffer); + ntpnstime = + get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); + + ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); gst_object_unref (rtpsession); @@ -1051,8 +1111,6 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) gst_segment_set_newsegment_full (segment, update, rate, arate, format, start, stop, time); - rtp_session_set_timestamp_sync (priv->session, start); - /* push event forward */ ret = gst_pad_push_event (rtpsession->send_rtp_src, event); break; @@ -1075,13 +1133,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstFlowReturn ret; + GstClockTime timestamp; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_send_rtp (priv->session, buffer); + /* get NTP time when this packet was captured, this depends on the timestamp. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* convert to running time using the segment start value. */ + ntpnstime = timestamp - rtpsession->send_rtp_seg.start; + ntpnstime += priv->ntpnsbase; + } else + ntpnstime = -1; + + ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime); gst_object_unref (rtpsession); @@ -1113,6 +1182,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); @@ -1142,6 +1212,7 @@ create_recv_rtcp_sink (GstRtpSession * rtpsession) rtpsession->sync_src = gst_pad_new_from_static_template (&rtpsession_sync_src_template, "sync_src"); + gst_pad_use_fixed_caps (rtpsession->sync_src); gst_pad_set_active (rtpsession->sync_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); @@ -1172,6 +1243,7 @@ create_send_rtp_sink (GstRtpSession * rtpsession) rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, "send_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtp_src); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); @@ -1190,6 +1262,7 @@ create_send_rtcp_src (GstRtpSession * rtpsession) rtpsession->send_rtcp_src = gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, "send_rtcp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtcp_src); gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtcp_src); diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 539b03c75d..5457bc3513 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -52,6 +52,7 @@ #include #include +#include #include "gstrtpbin-marshal.h" #include "gstrtpssrcdemux.h" @@ -67,6 +68,13 @@ GST_STATIC_PAD_TEMPLATE ("sink", GST_STATIC_CAPS ("application/x-rtp") ); +static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtcp") + ); + static GstStaticPadTemplate rtp_ssrc_demux_src_template = GST_STATIC_PAD_TEMPLATE ("src_%d", GST_PAD_SRC, @@ -74,6 +82,13 @@ GST_STATIC_PAD_TEMPLATE ("src_%d", GST_STATIC_CAPS ("application/x-rtp") ); +static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtcp") + ); + static GstElementDetails gst_rtp_ssrc_demux_details = { "RTP SSRC Demux", "Demux/Network/RTP", @@ -103,6 +118,11 @@ static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement * static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event); +static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, + GstBuffer * buf); +static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, + GstEvent * event); + /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); @@ -113,59 +133,78 @@ static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; */ struct _GstRtpSsrcDemuxPad { - GstPad *pad; guint32 ssrc; + GstPad *rtp_pad; GstCaps *caps; + GstPad *rtcp_pad; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found */ -static GstPad * -find_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +static GstRtpSsrcDemuxPad * +find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GSList *walk; - for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) { + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; if (pad->ssrc == ssrc) - return pad->pad; + return pad; } return NULL; } -static GstPad * -create_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +static GstRtpSsrcDemuxPad * +create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { - GstPad *result; + GstPad *rtp_pad, *rtcp_pad; GstElementClass *klass; GstPadTemplate *templ; gchar *padname; GstRtpSsrcDemuxPad *demuxpad; + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + klass = GST_ELEMENT_GET_CLASS (demux); templ = gst_element_class_get_pad_template (klass, "src_%d"); padname = g_strdup_printf ("src_%d", ssrc); - result = gst_pad_new_from_template (templ, padname); + rtp_pad = gst_pad_new_from_template (templ, padname); + g_free (padname); + + templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d"); + padname = g_strdup_printf ("rtcp_src_%d", ssrc); + rtcp_pad = gst_pad_new_from_template (templ, padname); g_free (padname); /* wrap in structure and add to list */ demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; - demuxpad->pad = result; - demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad); + demuxpad->rtp_pad = rtp_pad; + demuxpad->rtcp_pad = rtcp_pad; + + demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + GST_OBJECT_UNLOCK (demux); /* copy caps from input */ - gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink)); + gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink)); + gst_pad_use_fixed_caps (rtp_pad); + gst_pad_set_caps (rtcp_pad, GST_PAD_CAPS (demux->rtcp_sink)); + gst_pad_use_fixed_caps (rtcp_pad); - gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event); - gst_pad_set_active (result, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (demux), result); + gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); + gst_pad_set_active (rtp_pad, TRUE); + gst_pad_set_active (rtcp_pad, TRUE); + + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); g_signal_emit (G_OBJECT (demux), - gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result); + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); - return result; + GST_OBJECT_LOCK (demux); + + return demuxpad; } static void @@ -175,8 +214,12 @@ gst_rtp_ssrc_demux_base_init (gpointer g_class) gst_element_class_add_pad_template (gstelement_klass, gst_static_pad_template_get (&rtp_ssrc_demux_sink_template)); + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template)); gst_element_class_add_pad_template (gstelement_klass, gst_static_pad_template_get (&rtp_ssrc_demux_src_template)); + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template)); gst_element_class_set_details (gstelement_klass, &gst_rtp_ssrc_demux_details); } @@ -226,6 +269,14 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux, gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain); gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink); + + demux->rtcp_sink = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "rtcp_sink"), "rtcp_sink"); + gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain); + gst_pad_set_event_function (demux->rtcp_sink, + gst_rtp_ssrc_demux_rtcp_sink_event); + gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); } static void @@ -249,21 +300,63 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_NEWSEGMENT: default: - res = gst_pad_event_default (pad, event); + { + GSList *walk; + + res = TRUE; + GST_OBJECT_LOCK (demux); + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; + + gst_event_ref (event); + res &= gst_pad_push_event (pad->rtp_pad, event); + } + GST_OBJECT_UNLOCK (demux); + gst_event_unref (event); break; + } } gst_object_unref (demux); return res; } +static gboolean +gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event) +{ + GstRtpSsrcDemux *demux; + gboolean res = FALSE; + + demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_NEWSEGMENT: + default: + { + GSList *walk; + + res = TRUE; + GST_OBJECT_LOCK (demux); + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; + + res &= gst_pad_push_event (pad->rtcp_pad, event); + } + GST_OBJECT_UNLOCK (demux); + break; + } + } + gst_object_unref (demux); + return res; +} + static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) { GstFlowReturn ret; GstRtpSsrcDemux *demux; guint32 ssrc; - GstPad *srcpad; + GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad)); @@ -274,16 +367,16 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); - srcpad = find_rtp_pad_for_ssrc (demux, ssrc); - if (srcpad == NULL) { - GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); - srcpad = create_rtp_pad_for_ssrc (demux, ssrc); - if (!srcpad) + GST_OBJECT_LOCK (demux); + dpad = find_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) { + if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) goto create_failed; } + GST_OBJECT_UNLOCK (demux); /* push to srcpad */ - ret = gst_pad_push (srcpad, buf); + ret = gst_pad_push (dpad->rtp_pad, buf); return ret; @@ -298,9 +391,74 @@ invalid_payload: } create_failed: { - /* this is not fatal yet */ GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); + GST_OBJECT_UNLOCK (demux); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +} + +static GstFlowReturn +gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) +{ + GstFlowReturn ret; + GstRtpSsrcDemux *demux; + guint32 ssrc; + GstRtpSsrcDemuxPad *dpad; + GstRTCPPacket packet; + + demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad)); + + if (!gst_rtcp_buffer_validate (buf)) + goto invalid_rtcp; + + if (!gst_rtcp_buffer_get_first_packet (buf, &packet)) + goto invalid_rtcp; + + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, + NULL); + break; + case GST_RTCP_TYPE_RR: + ssrc = gst_rtcp_packet_rr_get_ssrc (&packet); + break; + default: + goto invalid_rtcp; + } + + GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc); + + GST_OBJECT_LOCK (demux); + dpad = find_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) { + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) + goto create_failed; + } + GST_OBJECT_UNLOCK (demux); + + /* push to srcpad */ + ret = gst_pad_push (dpad->rtcp_pad, buf); + + return ret; + + /* ERRORS */ +invalid_rtcp: + { + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), + ("Dropping invalid RTCP packet")); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +create_failed: + { + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), + ("Could not create new pad")); + GST_OBJECT_UNLOCK (demux); gst_buffer_unref (buf); return GST_FLOW_ERROR; } diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index 5d93330d18..bea2769d7a 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -37,7 +37,8 @@ struct _GstRtpSsrcDemux GstElement parent; GstPad *rtp_sink; - GSList *rtp_srcpads; + GstPad *rtcp_sink; + GSList *srcpads; }; struct _GstRtpSsrcDemuxClass diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 275e7c7492..e7f72b403d 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -23,6 +23,8 @@ #include #include +#include "gstrtpbin-marshal.h" + #include "rtpsession.h" GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); @@ -332,8 +334,8 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.process_rtp = callbacks->process_rtp; sess->callbacks.send_rtp = callbacks->send_rtp; sess->callbacks.send_rtcp = callbacks->send_rtcp; + sess->callbacks.sync_rtcp = callbacks->sync_rtcp; sess->callbacks.clock_rate = callbacks->clock_rate; - sess->callbacks.get_time = callbacks->get_time; sess->callbacks.reconsider = callbacks->reconsider; sess->user_data = user_data; } @@ -911,13 +913,14 @@ rtp_session_create_source (RTPSession * sess) */ static void update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, - gboolean rtp, GstBuffer * buffer) + gboolean rtp, GstBuffer * buffer, guint64 ntpnstime) { - /* get time or arrival */ - if (sess->callbacks.get_time) - arrival->time = sess->callbacks.get_time (sess, sess->user_data); - else - arrival->time = GST_CLOCK_TIME_NONE; + GTimeVal current; + + /* get time of arrival */ + g_get_current_time (¤t); + arrival->time = GST_TIMEVAL_TO_TIME (current); + arrival->ntpnstime = ntpnstime; /* get packet size including header overhead */ arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len; @@ -941,6 +944,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * rtp_session_process_rtp: * @sess: and #RTPSession * @buffer: an RTP buffer + * @ntpnstime: the NTP arrival time in nanoseconds * * Process an RTP buffer in the session manager. This function takes ownership * of @buffer. @@ -948,7 +952,8 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) +rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, + guint64 ntpnstime) { GstFlowReturn result; guint32 ssrc; @@ -965,7 +970,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, TRUE, buffer); + update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime); /* ignore more RTP packets when we left the session */ if (sess->source->received_bye) @@ -1047,6 +1052,33 @@ ignore: } } +static void +rtp_session_process_rb (RTPSession * sess, RTPSource * source, + GstRTCPPacket * packet, RTPArrivalStats * arrival) +{ + guint count, i; + + count = gst_rtcp_packet_get_rb_count (packet); + for (i = 0; i < count; i++) { + guint32 ssrc, exthighestseq, jitter, lsr, dlsr; + guint8 fractionlost; + gint32 packetslost; + + gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, + &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + + GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter); + + if (ssrc == sess->source->ssrc) { + /* only deal with report blocks for our session, we update the stats of + * the sender of the RTCP message. We could also compare our stats against + * the other sender to see if we are better or worse. */ + rtp_source_process_rb (source, arrival->time, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); + } + } +} + /* A Sender report contains statistics about how the sender is doing. This * includes timing informataion such as the relation between RTP and NTP * timestamps and the number of packets/bytes it sent to us. @@ -1062,7 +1094,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, { guint32 senderssrc, rtptime, packet_count, octet_count; guint64 ntptime; - guint count, i; RTPSource *source; gboolean created, prevsender; @@ -1074,11 +1105,13 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + GST_BUFFER_OFFSET (packet->buffer) = source->clock_base; + prevsender = RTP_SOURCE_IS_SENDER (source); /* first update the source */ - rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count, - arrival->time); + rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count, + octet_count); if (prevsender != RTP_SOURCE_IS_SENDER (source)) { sess->stats.sender_sources++; @@ -1089,25 +1122,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - count = gst_rtcp_packet_get_rb_count (packet); - for (i = 0; i < count; i++) { - guint32 ssrc, exthighestseq, jitter, lsr, dlsr; - guint8 fractionlost; - gint32 packetslost; - - gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, - &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); - - GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter); - - if (ssrc == sess->source->ssrc) { - /* only deal with report blocks for our session, we update the stats of - * the sender of the RTCP message. We could also compare our stats against - * the other sender to see if we are better or worse. */ - rtp_source_process_rb (source, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - } - } + rtp_session_process_rb (sess, source, packet, arrival); } /* A receiver report contains statistics about how a receiver is doing. It @@ -1121,7 +1136,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, RTPArrivalStats * arrival) { guint32 senderssrc; - guint count, i; RTPSource *source; gboolean created; @@ -1134,20 +1148,7 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - count = gst_rtcp_packet_get_rb_count (packet); - for (i = 0; i < count; i++) { - guint32 ssrc, exthighestseq, jitter, lsr, dlsr; - guint8 fractionlost; - gint32 packetslost; - - gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, - &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); - - if (ssrc == sess->source->ssrc) { - rtp_source_process_rb (source, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - } - } + rtp_session_process_rb (sess, source, packet, arrival); } /* FIXME, we're just printing this for now... */ @@ -1280,7 +1281,8 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, * @sess: and #RTPSession * @buffer: an RTCP buffer * - * Process an RTCP buffer in the session manager. + * Process an RTCP buffer in the session manager. This function takes ownership + * of @buffer. * * Returns: a #GstFlowReturn. */ @@ -1288,8 +1290,9 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) { GstRTCPPacket packet; - gboolean more, is_bye = FALSE; + gboolean more, is_bye = FALSE, is_sr = FALSE; RTPArrivalStats arrival; + GstFlowReturn result = GST_FLOW_OK; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1301,7 +1304,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, FALSE, buffer); + update_arrival_stats (sess, &arrival, FALSE, buffer, -1); if (sess->sent_bye) goto ignore; @@ -1322,6 +1325,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) switch (type) { case GST_RTCP_TYPE_SR: rtp_session_process_sr (sess, &packet, &arrival); + is_sr = TRUE; break; case GST_RTCP_TYPE_RR: rtp_session_process_rr (sess, &packet, &arrival); @@ -1357,14 +1361,20 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) } RTP_SESSION_UNLOCK (sess); - gst_buffer_unref (buffer); + /* notify caller of sr packets in the callback */ + if (is_sr && sess->callbacks.sync_rtcp) + result = sess->callbacks.sync_rtcp (sess, sess->source, buffer, + sess->user_data); + else + gst_buffer_unref (buffer); - return GST_FLOW_OK; + return result; /* ERRORS */ invalid_packet: { GST_DEBUG ("invalid RTCP packet received"); + gst_buffer_unref (buffer); return GST_FLOW_OK; } ignore: @@ -1380,6 +1390,7 @@ ignore: * rtp_session_send_rtp: * @sess: an #RTPSession * @buffer: an RTP buffer + * @ntptime: the NTP time of when this buffer was captured. * * Send the RTP buffer in the session manager. This function takes ownership of * @buffer. @@ -1387,11 +1398,12 @@ ignore: * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) +rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime) { GstFlowReturn result; RTPSource *source; gboolean prevsender; + GTimeVal current; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1405,14 +1417,13 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) source = sess->source; /* update last activity */ - if (sess->callbacks.get_time) - source->last_rtp_activity = - sess->callbacks.get_time (sess, sess->user_data); + g_get_current_time (¤t); + source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current); prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ - result = rtp_source_send_rtp (sess->source, buffer); + result = rtp_source_send_rtp (sess->source, buffer, ntptime); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; @@ -1429,36 +1440,6 @@ invalid_packet: } } -/** - * rtp_session_set_send_sync - * @sess: an #RTPSession - * @base_time: the clock base time - * @start_time: the timestamp start time - * - * Establish a relation between the times returned by the get_time callback and - * the buffer timestamps. This information is used to convert the NTP times to - * RTP timestamps. - */ -void -rtp_session_set_base_time (RTPSession * sess, GstClockTime base_time) -{ - g_return_if_fail (RTP_IS_SESSION (sess)); - - RTP_SESSION_LOCK (sess); - sess->base_time = base_time; - RTP_SESSION_UNLOCK (sess); -} - -void -rtp_session_set_timestamp_sync (RTPSession * sess, GstClockTime start_timestamp) -{ - g_return_if_fail (RTP_IS_SESSION (sess)); - - RTP_SESSION_LOCK (sess); - sess->start_timestamp = start_timestamp; - RTP_SESSION_UNLOCK (sess); -} - static GstClockTime calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, gboolean first) @@ -1498,6 +1479,7 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) GstFlowReturn result = GST_FLOW_OK; RTPSource *source; GstClockTime current, interval; + GTimeVal curtv; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -1518,10 +1500,8 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) sess->sent_bye = FALSE; /* get current time */ - if (sess->callbacks.get_time) - current = sess->callbacks.get_time (sess, sess->user_data); - else - current = 0; + g_get_current_time (&curtv); + current = GST_TIMEVAL_TO_TIME (curtv); /* reschedule transmission */ sess->last_rtcp_send_time = current; @@ -1543,12 +1523,12 @@ done: /** * rtp_session_next_timeout: * @sess: an #RTPSession - * @time: the current time + * @time: the current system time * * Get the next time we should perform session maintenance tasks. * * Returns: a time when rtp_session_on_timeout() should be called with the - * current time. + * current system time. */ GstClockTime rtp_session_next_timeout (RTPSession * sess, GstClockTime time) @@ -1588,6 +1568,7 @@ typedef struct RTPSession *sess; GstBuffer *rtcp; GstClockTime time; + guint64 ntpnstime; GstClockTime interval; GstRTCPPacket packet; gboolean is_bye; @@ -1605,60 +1586,22 @@ session_start_rtcp (RTPSession * sess, ReportData * data) if (RTP_SOURCE_IS_SENDER (own)) { guint64 ntptime; guint32 rtptime; - GstClockTime running_time; - GstClockTimeDiff diff; + guint32 packet_count, octet_count; /* we are a sender, create SR */ GST_DEBUG ("create SR for SSRC %08x", own->ssrc); gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); - /* use the sync params to interpollate the date->time member to rtptime. We - * use the last sent timestamp and rtptime as reference points. We assume - * that the slope of the rtptime vs timestamp curve is 1, which is certainly - * sufficient for the frequency at which we report SR and the rate we send - * out RTP packets. */ - rtptime = own->last_rtptime; - GST_DEBUG ("last_timestamp %" GST_TIME_FORMAT ", last_rtptime %" - G_GUINT32_FORMAT, GST_TIME_ARGS (own->last_timestamp), rtptime); + /* get latest stats */ + rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime, + &packet_count, &octet_count); + /* store stats */ + rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count, + octet_count); - if (own->clock_rate != -1) { - /* Start by calculating the running_time of the timestamp, this is a result - * in nanoseconds. */ - running_time = - (own->last_timestamp - sess->start_timestamp) + sess->base_time; - - /* get the diff with the SR time */ - diff = GST_CLOCK_DIFF (running_time, data->time); - - /* now translate the diff to RTP time, handle positive and negative cases. - * If there is no diff, we already set rtptime correctly above. */ - if (diff > 0) { - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); - rtptime += gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND); - } else { - diff = -diff; - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); - rtptime -= gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND); - } - } else { - GST_WARNING ("no clock-rate, cannot interpollate rtp time"); - } - - /* convert clock time to NTP time. upper 32 bits should contain the seconds - * and the lower 32 bits, the fractions of a second. */ - ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND); - /* conversion from unix timestamp (seconds since 1970) to NTP (seconds - * since 1900). FIXME nothing says that the time is in unix timestamps. */ - ntptime += (2208988800LL << 32); - - GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, - (guint32) (ntptime >> 32), (guint32) (ntptime & 0xffffffff), rtptime); - - /* fill in sender report info, FIXME RTP timestamps missing */ + /* fill in sender report info */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent); + ntptime, rtptime, packet_count, octet_count); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); @@ -1681,63 +1624,18 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { /* only report about other sender sources */ if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) { - RTPSourceStats *stats; - guint64 extended_max, expected; - guint64 expected_interval, received_interval, ntptime; - gint64 lost, lost_interval; - guint32 fraction, LSR, DLSR; - GstClockTime time; + guint8 fractionlost; + gint32 packetslost; + guint32 exthighestseq, jitter; + guint32 lsr, dlsr; - stats = &source->stats; - - extended_max = stats->cycles + stats->max_seq; - expected = extended_max - stats->base_seq + 1; - - GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT - ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT, - extended_max, expected, stats->packets_received, stats->base_seq); - - lost = expected - stats->packets_received; - lost = CLAMP (lost, -0x800000, 0x7fffff); - - expected_interval = expected - stats->prev_expected; - stats->prev_expected = expected; - received_interval = stats->packets_received - stats->prev_received; - stats->prev_received = stats->packets_received; - - lost_interval = expected_interval - received_interval; - - if (expected_interval == 0 || lost_interval <= 0) - fraction = 0; - else - fraction = (lost_interval << 8) / expected_interval; - - GST_DEBUG ("add RR for SSRC %08x", source->ssrc); - /* we scaled the jitter up for additional precision */ - GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT - ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost, - extended_max, stats->jitter >> 4); - - if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) { - GstClockTime diff; - - /* LSR is middle bits of the last ntptime */ - LSR = (ntptime >> 16) & 0xffffffff; - diff = data->time - time; - GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff)); - /* DLSR, delay since last SR is expressed in 1/65536 second units */ - DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND); - } else { - /* No valid SR received, LSR/DLSR are set to 0 then */ - GST_DEBUG ("no valid SR received"); - LSR = 0; - DLSR = 0; - } - GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR); + /* get new stats */ + rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost, + &exthighestseq, &jitter, &lsr, &dlsr); /* packet is not yet filled, add report block for this source. */ - gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost, - extended_max, stats->jitter >> 4, LSR, DLSR); + gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); } } } @@ -1784,7 +1682,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (is_sender) { if (data->time > source->last_rtp_activity) { interval = MAX (data->interval * 2, 5 * GST_SECOND); - if (data->time - source->last_rtp_activity > interval) { GST_DEBUG ("sender source %08x timed out and became receiver, last %" GST_TIME_FORMAT, source->ssrc, @@ -1897,6 +1794,8 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) /** * rtp_session_on_timeout: * @sess: an #RTPSession + * @time: the current system time + * @ntpnstime: the current NTP time in nanoseconds * * Perform maintenance actions after the timeout obtained with * rtp_session_next_timeout() expired. @@ -1910,21 +1809,23 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_on_timeout (RTPSession * sess, GstClockTime time) +rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; ReportData data; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT, + GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime)); + data.sess = sess; data.rtcp = NULL; data.time = time; + data.ntpnstime = ntpnstime; data.is_bye = FALSE; data.has_sdes = FALSE; - GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time)); - RTP_SESSION_LOCK (sess); /* get a new interval, we need this for various cleanups etc */ data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp); diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 9380b55e72..d7dbb784f2 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -81,6 +81,20 @@ typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, Gs */ typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); +/** + * RTPSessionSyncRTCP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTCP buffer ready for sending + * @user_data: user data specified when registering + * + * This callback will be called when @sess has and SR @buffer ready for doing + * synchronisation between streams. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionSyncRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + /** * RTPSessionClockRate: * @sess: an #RTPSession @@ -93,18 +107,6 @@ typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, G */ typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data); -/** - * RTPSessionGetTime: - * @sess: an #RTPSession - * @user_data: user data specified when registering - * - * This callback will be called when @sess needs the current time in - * nanoseconds. - * - * Returns: a #GstClockTime with the current time in nanoseconds. - */ -typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data); - /** * RTPSessionReconsider: * @sess: an #RTPSession @@ -121,7 +123,7 @@ typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data); * @RTPSessionProcessRTP: callback to process RTP packets * @RTPSessionSendRTP: callback for sending RTP packets * @RTPSessionSendRTCP: callback for sending RTCP packets - * @RTPSessionGetTime: callback for returning the current time + * @RTPSessionSyncRTCP: callback for handling SR packets * @RTPSessionReconsider: callback for reconsidering the timeout * * These callbacks can be installed on the session manager to get notification @@ -132,8 +134,8 @@ typedef struct { RTPSessionProcessRTP process_rtp; RTPSessionSendRTP send_rtp; RTPSessionSendRTCP send_rtcp; + RTPSessionSyncRTCP sync_rtcp; RTPSessionClockRate clock_rate; - RTPSessionGetTime get_time; RTPSessionReconsider reconsider; } RTPSessionCallbacks; @@ -190,8 +192,7 @@ struct _RTPSession { RTPSessionStats stats; - /* for mapping RTP time to NTP time */ - GstClockTime start_timestamp; + /* for mapping clock time to NTP time */ GstClockTime base_time; }; @@ -250,18 +251,17 @@ RTPSource* rtp_session_get_source_by_cname (RTPSession *sess, const gcha RTPSource* rtp_session_create_source (RTPSession *sess); /* processing packets from receivers */ -GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer); +GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer, guint64 ntpnstime); GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer *buffer); /* processing packets for sending */ -GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer); -void rtp_session_set_base_time (RTPSession *sess, GstClockTime base_time); -void rtp_session_set_timestamp_sync (RTPSession *sess, GstClockTime start_timestamp); +GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer, guint64 ntptime); + /* stopping the session */ GstFlowReturn rtp_session_send_bye (RTPSession *sess, const gchar *reason); /* get interval for next RTCP interval */ GstClockTime rtp_session_next_timeout (RTPSession *sess, GstClockTime time); -GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time); +GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time, guint64 ntpnstime); #endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 24bb84665a..63543358e5 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -68,7 +68,12 @@ rtp_source_init (RTPSource * src) src->payload = 0; src->clock_rate = -1; + src->clock_base = -1; + src->skew_base_ntpnstime = -1; + src->ext_rtptime = -1; + src->prev_ext_rtptime = -1; src->packets = g_queue_new (); + src->seqnum_base = -1; src->stats.cycles = -1; src->stats.jitter = 0; @@ -111,6 +116,44 @@ rtp_source_new (guint32 ssrc) return src; } +/** + * rtp_source_update_caps: + * @src: an #RTPSource + * @caps: a #GstCaps + * + * Parse @caps and store all relevant information in @source. + */ +void +rtp_source_update_caps (RTPSource * src, GstCaps * caps) +{ + GstStructure *s; + guint val; + gint ival; + + /* nothing changed, return */ + if (src->caps == caps) + return; + + s = gst_caps_get_structure (caps, 0); + + if (gst_structure_get_int (s, "payload", &ival)) + src->payload = ival; + GST_DEBUG ("got payload %d", src->payload); + + gst_structure_get_int (s, "clock-rate", &src->clock_rate); + GST_DEBUG ("got clock-rate %d", src->clock_rate); + + if (gst_structure_get_uint (s, "clock-base", &val)) + src->clock_base = val; + GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base); + + if (gst_structure_get_uint (s, "seqnum-base", &val)) + src->seqnum_base = val; + GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base); + + gst_caps_replace (&src->caps, caps); +} + /** * rtp_source_set_callbacks: * @src: an #RTPSource @@ -207,7 +250,7 @@ push_packet (RTPSource * src, GstBuffer * buffer) static gint get_clock_rate (RTPSource * src, guint8 payload) { - if (payload != src->payload) { + if (src->clock_rate == -1) { gint clock_rate = -1; if (src->callbacks.clock_rate) @@ -216,8 +259,9 @@ get_clock_rate (RTPSource * src, guint8 payload) GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate); src->clock_rate = clock_rate; - src->payload = payload; } + src->payload = payload; + return src->clock_rate; } @@ -225,14 +269,17 @@ static void calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPArrivalStats * arrival) { - GstClockTime current; + guint64 ntpnstime; guint32 rtparrival, transit, rtptime; + guint64 ext_rtptime; gint32 diff; gint clock_rate; guint8 pt; + guint64 rtpdiff, ntpdiff; + gint64 skew; /* get arrival time */ - if ((current = arrival->time) == GST_CLOCK_TIME_NONE) + if ((ntpnstime = arrival->ntpnstime) == GST_CLOCK_TIME_NONE) goto no_time; pt = gst_rtp_buffer_get_payload_type (buffer); @@ -243,8 +290,56 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, rtptime = gst_rtp_buffer_get_timestamp (buffer); - /* convert arrival time to RTP timestamp units */ - rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND); + /* convert to extended timestamp right away */ + ext_rtptime = gst_rtp_buffer_ext_timestamp (&src->ext_rtptime, rtptime); + + /* no clock-base, take first rtptime as base */ + if (src->clock_base == -1) { + GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime); + src->clock_base = rtptime; + } + + if (src->skew_base_ntpnstime == -1) { + /* lock on first observed NTP and RTP time, they should increment in-sync or + * we have a clock skew. */ + GST_DEBUG ("using base_ntpnstime of %" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime)); + src->skew_base_ntpnstime = ntpnstime; + src->skew_base_rtptime = rtptime; + src->prev_ext_rtptime = ext_rtptime; + src->avg_skew = 0; + } else if (src->prev_ext_rtptime < ext_rtptime) { + /* get elapsed rtptime but only when the previous rtptime was stricly smaller + * than the new one. */ + rtpdiff = ext_rtptime - src->skew_base_rtptime; + /* get NTP diff and convert to RTP time, this is always positive */ + ntpdiff = ntpnstime - src->skew_base_ntpnstime; + ntpdiff = gst_util_uint64_scale_int (ntpdiff, clock_rate, GST_SECOND); + + /* see how the NTP and RTP relate any deviation from 0 means that they drift + * out of sync and we must compensate. */ + skew = ntpdiff - rtpdiff; + /* average out the skew to get a smooth value. */ + src->avg_skew = (31 * src->avg_skew + skew) / 32; + + GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, + src->avg_skew); + if (src->avg_skew != 0) { + guint32 timestamp; + + /* patch the buffer RTP timestamp with the skew */ + GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew); + timestamp = gst_rtp_buffer_get_timestamp (buffer); + timestamp += src->avg_skew; + gst_rtp_buffer_set_timestamp (buffer, timestamp); + } + /* store previous extended timestamp */ + src->prev_ext_rtptime = ext_rtptime; + } + + /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't + * care about the absolute value, just the difference. */ + rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND); /* transit time is difference with RTP timestamp */ transit = rtparrival - rtptime; @@ -324,6 +419,8 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, seqnr = gst_rtp_buffer_get_seq (buffer); + rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); + if (stats->cycles == -1) { GST_DEBUG ("received first buffer"); /* first time we heard of this source */ @@ -389,6 +486,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, } } else { /* duplicate or reordered packet, will be filtered by jitterbuffer. */ + GST_WARNING ("duplicate or reordered packet"); } src->stats.octets_received += arrival->payload_len; @@ -401,7 +499,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, GST_DEBUG ("seq %d, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, seqnr, src->stats.packets_received, src->stats.octets_received); - /* calculate jitter */ + /* calculate jitter and perform skew correction */ calculate_jitter (src, buffer, arrival); /* we're ready to push the RTP packet now */ @@ -444,25 +542,27 @@ rtp_source_process_bye (RTPSource * src, const gchar * reason) * rtp_source_send_rtp: * @src: an #RTPSource * @buffer: an RTP buffer + * @ntpnstime: the NTP time when this buffer was captured in nanoseconds * * Send an RTP @buffer originating from @src. This will make @src a sender. * This function takes ownership of @buffer and modifies the SSRC in the RTP - * packet to that of @src. + * packet to that of @src when needed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) +rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; guint len; - GstClockTime timestamp; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); len = gst_rtp_buffer_get_payload_len (buffer); + rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); + /* we are a sender now */ src->is_sender = TRUE; @@ -471,18 +571,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) src->stats.octets_sent += len; /* we keep track of the last received RTP timestamp and the corresponding - * GStreamer timestamp so that we can convert NTP time to RTP time when - * sending SR reports */ + * NTP timestamp so that we can use this info when constructing SR reports */ src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer); - - /* the timestamp can be undefined, in that case we use any previously - * received timestamp */ - timestamp = GST_BUFFER_TIMESTAMP (buffer); - if (timestamp != -1) - src->last_timestamp = timestamp; - - if (src->clock_rate == -1) - get_clock_rate (src, gst_rtp_buffer_get_payload_type (buffer)); + src->last_ntpnstime = ntpnstime; /* push packet */ if (src->callbacks.push_rtp) { @@ -496,7 +587,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) * get the correct SSRC. */ buffer = gst_buffer_make_writable (buffer); - GST_DEBUG ("updating SSRC from %u to %u", ssrc, src->ssrc); + GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc); gst_rtp_buffer_set_ssrc (buffer, src->ssrc); } GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT, @@ -513,17 +604,17 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) /** * rtp_source_process_sr: * @src: an #RTPSource + * @time: time of packet arrival * @ntptime: the NTP time * @rtptime: the RTP time * @packet_count: the packet count * @octet_count: the octect count - * @time: time of packet arrival * * Update the sender report in @src. */ void -rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, - guint32 packet_count, guint32 octet_count, GstClockTime time) +rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime, + guint32 rtptime, guint32 packet_count, guint32 octet_count) { RTPSenderReport *curr; gint curridx; @@ -556,6 +647,7 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, /** * rtp_source_process_rb: * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 * @fractionlost: fraction lost since last SR/RR * @packetslost: the cumululative number of packets lost * @exthighestseq: the extended last sequence number received @@ -566,18 +658,20 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, * Update the report block in @src. */ void -rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, - guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr) +rtp_source_process_rb (RTPSource * src, GstClockTime time, guint8 fractionlost, + gint32 packetslost, guint32 exthighestseq, guint32 jitter, guint32 lsr, + guint32 dlsr) { RTPReceiverReport *curr; gint curridx; + guint32 ntp, A; g_return_if_fail (RTP_IS_SOURCE (src)); - GST_DEBUG ("got RB packet: SSRC %08x, FL %" G_GUINT32_FORMAT "" - ", PL %d, HS %" G_GUINT32_FORMAT ", JITTER %" G_GUINT32_FORMAT - ", LSR %08x, DLSR %08x", src->ssrc, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); + GST_DEBUG ("got RB packet: SSRC %08x, FL %2x, PL %d, HS %" G_GUINT32_FORMAT + ", jitter %" G_GUINT32_FORMAT ", LSR %04x:%04x, DLSR %04x:%04x", + src->ssrc, fractionlost, packetslost, exthighestseq, jitter, lsr >> 16, + lsr & 0xffff, dlsr >> 16, dlsr & 0xffff); curridx = src->stats.curr_rr ^ 1; curr = &src->stats.rr[curridx]; @@ -591,26 +685,198 @@ rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, curr->lsr = lsr; curr->dlsr = dlsr; + /* calculate round trip */ + ntp = (gst_rtcp_unix_to_ntp (time) >> 16) & 0xffffffff; + A = ntp - dlsr; + A -= lsr; + curr->round_trip = A; + + GST_DEBUG ("NTP %04x:%04x, round trip %04x:%04x", ntp >> 16, ntp & 0xffff, + A >> 16, A & 0xffff); + /* make current */ src->stats.curr_rr = curridx; } /** - * rtp_source_get_last_sr: + * rtp_source_get_new_sr: * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 * @ntptime: the NTP time * @rtptime: the RTP time * @packet_count: the packet count * @octet_count: the octect count + * + * Get new values to put into a new SR report from this source. + * + * Returns: %TRUE on success. + */ +gboolean +rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime, + guint64 * ntptime, guint32 * rtptime, guint32 * packet_count, + guint32 * octet_count) +{ + guint32 t_rtp; + guint64 t_current_ntp; + GstClockTimeDiff diff; + + g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE); + + /* use the sync params to interpollate the date->time member to rtptime. We + * use the last sent timestamp and rtptime as reference points. We assume + * that the slope of the rtptime vs timestamp curve is 1, which is certainly + * sufficient for the frequency at which we report SR and the rate we send + * out RTP packets. */ + t_rtp = src->last_rtptime; + + GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %" + G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp); + + if (src->clock_rate != -1) { + /* get the diff with the SR time */ + diff = GST_CLOCK_DIFF (src->last_ntpnstime, ntpnstime); + + /* now translate the diff to RTP time, handle positive and negative cases. + * If there is no diff, we already set rtptime correctly above. */ + if (diff > 0) { + GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff)); + t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); + } else { + diff = -diff; + GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff)); + t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); + } + } else { + GST_WARNING ("no clock-rate, cannot interpollate rtp time"); + } + + t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND); + + GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, + (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff), + t_rtp); + + if (ntptime) + *ntptime = t_current_ntp; + if (rtptime) + *rtptime = t_rtp; + if (packet_count) + *packet_count = src->stats.packets_sent; + if (octet_count) + *octet_count = src->stats.octets_sent; + + return TRUE; +} + +/** + * rtp_source_get_new_rb: + * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 + * @fractionlost: fraction lost since last SR/RR + * @packetslost: the cumululative number of packets lost + * @exthighestseq: the extended last sequence number received + * @jitter: the interarrival jitter + * @lsr: the last SR packet from this source + * @dlsr: the delay since last SR packet + * + * Get the values of the last RB report set with rtp_source_process_rb(). + * + * Returns: %TRUE on success. + */ +gboolean +rtp_source_get_new_rb (RTPSource * src, GstClockTime time, + guint8 * fractionlost, gint32 * packetslost, guint32 * exthighestseq, + guint32 * jitter, guint32 * lsr, guint32 * dlsr) +{ + RTPSourceStats *stats; + guint64 extended_max, expected; + guint64 expected_interval, received_interval, ntptime; + gint64 lost, lost_interval; + guint32 fraction, LSR, DLSR; + GstClockTime sr_time; + + stats = &src->stats; + + extended_max = stats->cycles + stats->max_seq; + expected = extended_max - stats->base_seq + 1; + + GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT + ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT, + extended_max, expected, stats->packets_received, stats->base_seq); + + lost = expected - stats->packets_received; + lost = CLAMP (lost, -0x800000, 0x7fffff); + + expected_interval = expected - stats->prev_expected; + stats->prev_expected = expected; + received_interval = stats->packets_received - stats->prev_received; + stats->prev_received = stats->packets_received; + + lost_interval = expected_interval - received_interval; + + if (expected_interval == 0 || lost_interval <= 0) + fraction = 0; + else + fraction = (lost_interval << 8) / expected_interval; + + GST_DEBUG ("add RR for SSRC %08x", src->ssrc); + /* we scaled the jitter up for additional precision */ + GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT + ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost, + extended_max, stats->jitter >> 4); + + if (rtp_source_get_last_sr (src, &sr_time, &ntptime, NULL, NULL, NULL)) { + GstClockTime diff; + + /* LSR is middle 32 bits of the last ntptime */ + LSR = (ntptime >> 16) & 0xffffffff; + diff = time - sr_time; + GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff)); + /* DLSR, delay since last SR is expressed in 1/65536 second units */ + DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND); + } else { + /* No valid SR received, LSR/DLSR are set to 0 then */ + GST_DEBUG ("no valid SR received"); + LSR = 0; + DLSR = 0; + } + GST_DEBUG ("LSR %04x:%04x, DLSR %04x:%04x", LSR >> 16, LSR & 0xffff, + DLSR >> 16, DLSR & 0xffff); + + if (fractionlost) + *fractionlost = fraction; + if (packetslost) + *packetslost = lost; + if (exthighestseq) + *exthighestseq = extended_max; + if (jitter) + *jitter = stats->jitter >> 4; + if (lsr) + *lsr = LSR; + if (dlsr) + *dlsr = DLSR; + + return TRUE; +} + +/** + * rtp_source_get_last_sr: + * @src: an #RTPSource * @time: time of packet arrival + * @ntptime: the NTP time + * @rtptime: the RTP time + * @packet_count: the packet count + * @octet_count: the octect count * * Get the values of the last sender report as set with rtp_source_process_sr(). * * Returns: %TRUE if there was a valid SR report. */ gboolean -rtp_source_get_last_sr (RTPSource * src, guint64 * ntptime, guint32 * rtptime, - guint32 * packet_count, guint32 * octet_count, GstClockTime * time) +rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime, + guint32 * rtptime, guint32 * packet_count, guint32 * octet_count) { RTPSenderReport *curr; diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index 7920b6f4fe..be7934617b 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -134,13 +134,25 @@ struct _RTPSource { GstNetAddress rtcp_from; guint8 payload; + GstCaps *caps; gint clock_rate; + gint32 seqnum_base; + + gint64 clock_base; + + /* to calculate the clock skew */ + guint64 skew_base_ntpnstime; + guint64 skew_base_rtptime; + gint64 avg_skew; + guint64 ext_rtptime; + guint64 prev_ext_rtptime; GstClockTime bye_time; GstClockTime last_activity; GstClockTime last_rtp_activity; - GstClockTime last_timestamp; + GstClockTime last_rtptime; + GstClockTime last_ntpnstime; GQueue *packets; @@ -158,6 +170,7 @@ GType rtp_source_get_type (void); /* managing lifetime of sources */ RTPSource* rtp_source_new (guint32 ssrc); +void rtp_source_update_caps (RTPSource *src, GstCaps *caps); void rtp_source_set_callbacks (RTPSource *src, RTPSourceCallbacks *cb, gpointer data); void rtp_source_set_as_csrc (RTPSource *src); @@ -168,18 +181,24 @@ void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *addres /* handling RTP */ GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); -GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer); +GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer, guint64 ntpnstime); /* RTCP messages */ void rtp_source_process_bye (RTPSource *src, const gchar *reason); -void rtp_source_process_sr (RTPSource *src, guint64 ntptime, guint32 rtptime, - guint32 packet_count, guint32 octet_count, GstClockTime time); -void rtp_source_process_rb (RTPSource *src, guint8 fractionlost, gint32 packetslost, - guint32 exthighestseq, guint32 jitter, +void rtp_source_process_sr (RTPSource *src, GstClockTime time, guint64 ntptime, + guint32 rtptime, guint32 packet_count, guint32 octet_count); +void rtp_source_process_rb (RTPSource *src, GstClockTime time, guint8 fractionlost, + gint32 packetslost, guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr); -gboolean rtp_source_get_last_sr (RTPSource *src, guint64 *ntptime, guint32 *rtptime, - guint32 *packet_count, guint32 *octet_count, GstClockTime *time); +gboolean rtp_source_get_new_sr (RTPSource *src, GstClockTime time, guint64 *ntptime, + guint32 *rtptime, guint32 *packet_count, guint32 *octet_count); +gboolean rtp_source_get_new_rb (RTPSource *src, GstClockTime time, guint8 *fractionlost, + gint32 *packetslost, guint32 *exthighestseq, guint32 *jitter, + guint32 *lsr, guint32 *dlsr); + +gboolean rtp_source_get_last_sr (RTPSource *src, GstClockTime *time, guint64 *ntptime, + guint32 *rtptime, guint32 *packet_count, guint32 *octet_count); gboolean rtp_source_get_last_rb (RTPSource *src, guint8 *fractionlost, gint32 *packetslost, guint32 *exthighestseq, guint32 *jitter, guint32 *lsr, guint32 *dlsr); diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h index 0ee1ed1ea0..e2e4e397dc 100644 --- a/gst/rtpmanager/rtpstats.h +++ b/gst/rtpmanager/rtpstats.h @@ -51,6 +51,7 @@ typedef struct { guint32 jitter; guint32 lsr; guint32 dlsr; + guint32 round_trip; } RTPReceiverReport; /** @@ -64,6 +65,7 @@ typedef struct { */ typedef struct { GstClockTime time; + guint64 ntpnstime; gboolean have_address; GstNetAddress address; guint bytes;