From 9c95072048feee72fdd00e05b5b2d77f939ce9d5 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Mon, 8 Aug 2011 12:15:20 +0200 Subject: [PATCH] rtpbin: alternative inter-stream syncing methods ... at least if not syncing to NPT time: * either sync using RTCP SR data (as currently) * only perform the above once using initial RTCP SR packets * discard RTCP and sync by equating provided stream's clock-base rtptime, as provided by jitterbuffer (typically obtained from RTP-Info in RTSP). --- gst/rtpmanager/gstrtpbin.c | 171 ++++++++++++++++++++++++++++++++++--- gst/rtpmanager/gstrtpbin.h | 1 + 2 files changed, 161 insertions(+), 11 deletions(-) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index e9b3164b64..c3ca387f9d 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -248,6 +248,7 @@ enum #define DEFAULT_AUTOREMOVE FALSE #define DEFAULT_BUFFER_MODE RTP_JITTER_BUFFER_MODE_SLAVE #define DEFAULT_USE_PIPELINE_CLOCK FALSE +#define DEFAULT_RTCP_SYNC GST_RTP_BIN_RTCP_SYNC_ALWAYS #define DEFAULT_RTCP_SYNC_INTERVAL 0 enum @@ -258,6 +259,7 @@ enum PROP_DO_LOST, PROP_IGNORE_PT, PROP_NTP_SYNC, + PROP_RTCP_SYNC, PROP_RTCP_SYNC_INTERVAL, PROP_AUTOREMOVE, PROP_BUFFER_MODE, @@ -265,6 +267,31 @@ enum PROP_LAST }; +enum +{ + GST_RTP_BIN_RTCP_SYNC_ALWAYS, + GST_RTP_BIN_RTCP_SYNC_INITIAL, + GST_RTP_BIN_RTCP_SYNC_RTP +}; + +#define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) +static GType +gst_rtp_bin_rtcp_sync_get_type (void) +{ + static GType rtcp_sync_type = 0; + static const GEnumValue rtcp_sync_types[] = { + {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"}, + {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"}, + {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"}, + {0, NULL, NULL}, + }; + + if (!rtcp_sync_type) { + rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types); + } + return rtcp_sync_type; +} + /* helper objects */ typedef struct _GstRtpBinSession GstRtpBinSession; typedef struct _GstRtpBinStream GstRtpBinStream; @@ -315,6 +342,9 @@ struct _GstRtpBinStream gboolean have_sync; /* mapping to local RTP and NTP time */ gint64 rt_delta; + gint64 rtp_delta; + /* base rtptime in gst time */ + gint64 clock_base; }; #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock) @@ -780,6 +810,8 @@ gst_rtp_bin_reset_sync (GstRtpBin * rtpbin) * lip-sync */ stream->have_sync = FALSE; stream->rt_delta = 0; + stream->rtp_delta = 0; + stream->clock_base = -100 * GST_SECOND; } } GST_RTP_BIN_UNLOCK (rtpbin); @@ -984,7 +1016,8 @@ stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream, static void gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, guint8 * data, guint64 ntptime, guint64 last_extrtptime, - guint64 base_rtptime, guint64 base_time, guint clock_rate) + guint64 base_rtptime, guint64 base_time, guint clock_rate, + gint64 rtp_clock_base) { GstRtpBinClient *client; gboolean created; @@ -1027,8 +1060,9 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, GST_DEBUG_OBJECT (bin, "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT - ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", base_rtptime, - last_extrtptime, local_rtp, clock_rate); + ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, " + "clock-base %" G_GINT64_FORMAT, base_rtptime, + last_extrtptime, local_rtp, clock_rate, rtp_clock_base); /* calculate local RTP time in gstreamer timestamp, we essentially perform the * same conversion that a jitterbuffer would use to convert an rtp timestamp @@ -1075,8 +1109,10 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, stream->rt_delta = rtdiff - ntpdiff; stream_set_ts_offset (bin, stream, stream->rt_delta); - } else if (client->nstreams > 1) { - gint64 min; + } else { + gint64 min, rtp_min, clock_base = stream->clock_base; + gboolean all_sync, use_rtp; + gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync); /* calculate delta between server and receiver. last_unix is created by * converting the ntptime in the last SR packet to a gstreamer timestamp. This @@ -1094,19 +1130,104 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, * latencies). * The stream that has the smallest diff is selected as the reference stream, * all other streams will have a positive offset to this difference. */ - min = G_MAXINT64; + + /* some alternative setting allow ignoring RTCP as much as possible, + * for servers generating bogus ntp timeline */ + min = rtp_min = G_MAXINT64; + use_rtp = FALSE; + if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) { + guint64 ext_base; + + use_rtp = TRUE; + /* signed version for convienience */ + clock_base = base_rtptime; + /* deal with possible wrap-around */ + ext_base = base_rtptime; + rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base); + /* sanity check; base rtp and provided clock_base should be close */ + if (rtp_clock_base >= clock_base) { + if (rtp_clock_base - clock_base < 10 * clock_rate) { + rtp_clock_base = base_time + + gst_util_uint64_scale_int (rtp_clock_base - clock_base, + GST_SECOND, clock_rate); + } else { + use_rtp = FALSE; + } + } else { + if (clock_base - rtp_clock_base < 10 * clock_rate) { + rtp_clock_base = base_time - + gst_util_uint64_scale_int (clock_base - rtp_clock_base, + GST_SECOND, clock_rate); + } else { + use_rtp = FALSE; + } + } + /* warn and bail for clarity out if no sane values */ + if (!use_rtp) { + GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime"); + return; + } + /* store to track changes */ + clock_base = rtp_clock_base; + /* generate a fake as before, + * now equating rtptime obtained from RTP-Info, + * where the large time represent the otherwise irrelevant npt/ntp time */ + stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base; + } + for (walk = client->streams; walk; walk = g_slist_next (walk)) { GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; - if (!ostream->have_sync) + if (!ostream->have_sync) { + all_sync = FALSE; continue; + } + + /* change in current stream's base from previously init'ed value + * leads to reset of all stream's base */ + if (stream != ostream && stream->clock_base >= 0 && + (stream->clock_base != clock_base)) { + GST_DEBUG_OBJECT (bin, "reset upon clock base change"); + ostream->clock_base = -100 * GST_SECOND; + ostream->rtp_delta = 0; + } if (ostream->rt_delta < min) min = ostream->rt_delta; + if (ostream->rtp_delta < rtp_min) + rtp_min = ostream->rtp_delta; } - GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client, - min); + /* arrange to re-sync for each stream upon significant change, + * e.g. post-seek */ + all_sync = (stream->clock_base == clock_base); + stream->clock_base = clock_base; + + /* may need init performed above later on, but nothing more to do now */ + if (client->nstreams <= 1) + return; + + GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT + " all sync %d", client, min, all_sync); + GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp); + + switch (rtcp_sync) { + case GST_RTP_BIN_RTCP_SYNC_RTP: + if (!use_rtp) + break; + GST_DEBUG_OBJECT (bin, "using rtp generated reports; " + "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min); + /* fall-through */ + case GST_RTP_BIN_RTCP_SYNC_INITIAL: + /* if all have been synced already, do not bother further */ + if (all_sync) { + GST_DEBUG_OBJECT (bin, "all streams already synced; done"); + return; + } + break; + default: + break; + } /* bail out if we adjusted recently enough */ if (all_sync && (last_unix - bin->priv->last_unix) < @@ -1131,7 +1252,10 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, /* calculate offset to our reference stream, this should always give a * positive number. */ - ts_offset = ostream->rt_delta - min; + if (use_rtp) + ts_offset = ostream->rtp_delta - rtp_min; + else + ts_offset = ostream->rt_delta - min; stream_set_ts_offset (bin, ostream, ts_offset); } @@ -1164,6 +1288,7 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s, guint64 base_rtptime; guint64 base_time; guint clock_rate; + guint64 clock_base; guint64 extrtptime; GstBuffer *buffer; @@ -1179,6 +1304,7 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s, g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime")); base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time")); clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate")); + clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base")); extrtptime = g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime")); buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer")); @@ -1231,7 +1357,8 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s, GST_RTP_BIN_LOCK (bin); /* associate the stream to CNAME */ gst_rtp_bin_associate (bin, stream, len, data, - ntptime, extrtptime, base_rtptime, base_time, clock_rate); + ntptime, extrtptime, base_rtptime, base_time, clock_rate, + clock_base); GST_RTP_BIN_UNLOCK (bin); } } @@ -1275,7 +1402,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->have_sync = FALSE; stream->rt_delta = 0; + stream->rtp_delta = 0; stream->percent = 100; + stream->clock_base = -100 * GST_SECOND; session->streams = g_slist_prepend (session->streams, stream); /* provide clock_rate to the jitterbuffer when needed */ @@ -1692,6 +1821,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin::rtcp-sync: + * + * If not synchronizing (directly) to the NTP clock, determines how to sync + * the various streams. + * + * Since: 0.10.31 + */ + g_object_class_install_property (gobject_class, PROP_RTCP_SYNC, + g_param_spec_enum ("rtcp-sync", "RTCP Sync", + "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE, + DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstRtpBin::rtcp-sync-interval: * @@ -1734,6 +1876,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass) rtpbin->do_lost = DEFAULT_DO_LOST; rtpbin->ignore_pt = DEFAULT_IGNORE_PT; rtpbin->ntp_sync = DEFAULT_NTP_SYNC; + rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC; rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL; rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE; rtpbin->buffer_mode = DEFAULT_BUFFER_MODE; @@ -1850,6 +1993,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, case PROP_NTP_SYNC: rtpbin->ntp_sync = g_value_get_boolean (value); break; + case PROP_RTCP_SYNC: + g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value)); + break; case PROP_RTCP_SYNC_INTERVAL: rtpbin->rtcp_sync_interval = g_value_get_uint (value); break; @@ -1915,6 +2061,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_NTP_SYNC: g_value_set_boolean (value, rtpbin->ntp_sync); break; + case PROP_RTCP_SYNC: + g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync)); + break; case PROP_RTCP_SYNC_INTERVAL: g_value_set_uint (value, rtpbin->rtcp_sync_interval); break; diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 7cc845c160..a9157871d3 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -50,6 +50,7 @@ struct _GstRtpBin { gboolean do_lost; gboolean ignore_pt; gboolean ntp_sync; + gint rtcp_sync; guint rtcp_sync_interval; RTPJitterBufferMode buffer_mode; gboolean buffering;