rtpbin: Untangle NTP-based and RTP-Info based stream association

Both were entangled previously and very hard to follow what happens
under which conditions. Now as a very first step the code decides which
of the two cases it is going to apply, and then proceeds accordingly.
This also avoids calculating completely invalid values along the way and
even printing them int the debug output.

Also improve debug output in various places.

This shouldn't cause any behaviour changes.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6543>
This commit is contained in:
Sebastian Dröge 2024-04-17 13:44:38 +03:00 committed by GStreamer Marge Bot
parent 7d0c7144ba
commit 155c3fb3b2
2 changed files with 165 additions and 150 deletions

View file

@ -403,7 +403,7 @@ gst_rtp_bin_rtcp_sync_get_type (void)
static const GEnumValue rtcp_sync_types[] = {
{GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
{GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
{GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
{GST_RTP_BIN_RTCP_SYNC_RTP_INFO, "rtp-info", "rtp-info"},
{0, NULL, NULL},
};
@ -1562,20 +1562,18 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
/* For all other cases (not RFC7273 and not NTP sync) we have to look how
* all streams of a client relate to each other */
GstRtpBinClient *client;
GSList *walk;
GstClockTime running_time, running_time_rtp;
/* first find or create the CNAME */
client = get_client (bin, len, data);
GstRtpBinClient *client = get_client (bin, len, data);
/* find stream in the client */
GSList *walk;
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
if (ostream == stream)
break;
}
/* not found, add it to the list */
if (walk == NULL) {
GST_DEBUG_OBJECT (bin,
@ -1589,175 +1587,188 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
stream->ssrc, client, client->cname);
}
if (!GST_CLOCK_TIME_IS_VALID (extrtptime)) {
GST_DEBUG_OBJECT (bin, "invalidated sync data");
if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
/* we don't need that data, so carry on,
* but make some values look saner */
extrtptime = base_rtptime;
} else {
/* nothing we can do with this data in this case */
GST_DEBUG_OBJECT (bin, "bailing out");
return;
/* First decide whether to do RTP-Info or RTCP / NTP header extension sync */
gboolean rtp_info_sync = FALSE;
GST_DEBUG_OBJECT (bin, "RTCP sync mode %d", bin->rtcp_sync);
switch (bin->rtcp_sync) {
case GST_RTP_BIN_RTCP_SYNC_ALWAYS:
case GST_RTP_BIN_RTCP_SYNC_INITIAL:{
if (!GST_CLOCK_TIME_IS_VALID (extrtptime)
|| !GST_CLOCK_TIME_IS_VALID (ntpnstime)
|| extrtptime < base_rtptime) {
GST_DEBUG_OBJECT (bin, "invalidated sync data, bailing out");
return;
}
GST_DEBUG_OBJECT (bin, "Doing RTCP sync");
break;
}
case GST_RTP_BIN_RTCP_SYNC_RTP_INFO:{
rtp_info_sync = TRUE;
GST_DEBUG_OBJECT (bin, "Doing RTP-Info sync");
break;
}
}
/* Take the extended rtptime we found in the SR packet and map it to the
* local rtptime. The local rtp time is used to construct timestamps on the
* buffers so we will calculate what running_time corresponds to the RTP
* timestamp in the SR packet. */
running_time_rtp = extrtptime - base_rtptime;
GST_DEBUG_OBJECT (bin,
"base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
"clock-base %" G_GINT64_FORMAT, base_rtptime,
extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
/* calculate local RTP time in gstreamer timestamp, we essentially perform the
* same conversion that a jitterbuffer would use to convert an rtp timestamp
* into a corresponding gstreamer timestamp. Note that the base_time also
* contains the drift between sender and receiver. */
running_time =
gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
running_time += base_time;
gboolean all_sync = TRUE;
gint64 min = G_MAXINT64, rtp_min = G_MAXINT64;
stream->have_sync = TRUE;
GST_DEBUG_OBJECT (bin,
"SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
running_time, ntpnstime);
/* recalc inter stream playout offset based on the selected mode */
if (rtp_info_sync) {
guint64 ext_base = -1;
guint64 rtp_clock_base_ext;
gint64 diff_rtp;
/* recalc inter stream playout offset, but only if there is more than one stream. */
gint64 min, rtp_min, clock_base;
gboolean all_sync, use_rtp;
gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
GST_DEBUG_OBJECT (bin,
"base RTP time %" G_GUINT64_FORMAT ", clock-rate %d, " "clock-base %"
G_GUINT64_FORMAT, base_rtptime, clock_rate, rtp_clock_base);
/* calculate delta between server and receiver. ntpnstime is created by
* converting the ntptime in the last SR packet to a gstreamer timestamp. This
* delta expresses the difference to our timeline and the server timeline. The
* difference in itself doesn't mean much but we can combine the delta of
* multiple streams to create a stream specific offset. */
stream->rt_delta = ntpnstime - running_time;
/* convert to extended RTP time */
rtp_clock_base_ext =
gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
/* calculate the min of all deltas, ignoring streams that did not yet have a
* valid rt_delta because we did not yet receive an SR packet for those
* streams.
if (rtp_clock_base_ext >= base_rtptime) {
diff_rtp =
gst_util_uint64_scale_int (rtp_clock_base_ext - base_rtptime,
GST_SECOND, clock_rate);
} else {
diff_rtp =
-gst_util_uint64_scale_int (base_rtptime - rtp_clock_base_ext,
GST_SECOND, clock_rate);
}
GST_DEBUG_OBJECT (bin, "RTP time difference %" GST_STIME_FORMAT,
GST_STIME_ARGS (diff_rtp));
/* Calculate the difference between NPT start (0) and the local time */
stream->rtp_delta = 0 - (base_time + diff_rtp);
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
/* if not set yet for the other stream then ignore it */
if (!ostream->have_sync) {
all_sync = FALSE;
continue;
}
/* change in current stream's base from previously init'ed value leads
* to reset of all stream's base, e.g. after a seek */
if (stream != ostream && stream->clock_base >= 0 &&
(stream->clock_base != rtp_clock_base)) {
GST_DEBUG_OBJECT (bin, "reset upon clock base change");
ostream->clock_base = -100 * GST_SECOND;
ostream->rtp_delta = 0;
}
if (ostream->rtp_delta < rtp_min)
rtp_min = ostream->rtp_delta;
}
} else {
GstClockTime rtp_running_time;
guint64 diff_rtp;
/* Take the extended rtptime we found in the SR packet and map it to the
* local rtptime. The local rtp time is used to construct timestamps on the
* buffers so we will calculate what running_time corresponds to the RTP
* timestamp in the SR packet.
*
* Note: this is always positive!
*/
diff_rtp = extrtptime - base_rtptime;
GST_DEBUG_OBJECT (bin,
"base RTP time %" G_GUINT64_FORMAT ", SR RTP time %" G_GUINT64_FORMAT
", RTP time difference %" G_GUINT64_FORMAT ", clock-rate %d",
base_rtptime, extrtptime, diff_rtp, clock_rate);
/* calculate local RTP time in gstreamer timestamp, we essentially perform the
* same conversion that a jitterbuffer would use to convert an rtp timestamp
* into a corresponding gstreamer timestamp. Note that the base_time also
* contains the drift between sender and receiver. */
rtp_running_time =
gst_util_uint64_scale_int (diff_rtp, GST_SECOND, clock_rate);
rtp_running_time += base_time;
GST_DEBUG_OBJECT (bin,
"SR RTP running time %" GST_TIME_FORMAT ", SR NTP %" GST_TIME_FORMAT,
GST_TIME_ARGS (rtp_running_time), GST_TIME_ARGS (ntpnstime));
/* calculate delta between server and receiver. ntpnstime is created by
* converting the ntptime in the last SR packet to a gstreamer timestamp. This
* delta expresses the difference to our timeline and the server timeline. The
* difference in itself doesn't mean much but we can combine the delta of
* multiple streams to create a stream specific offset.
*/
stream->rt_delta = ntpnstime - rtp_running_time;
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
/* if not set yet for the other stream then ignore it */
if (!ostream->have_sync) {
all_sync = FALSE;
continue;
}
if (ostream->rt_delta < min)
min = ostream->rt_delta;
}
}
/* Above we calculated the min of all deltas for either mode, ignoring
* streams that did not yet have a valid delta because we did not yet
* receive an SR packet for those streams yet (RTCP mode) or no RTP base
* time (RTP-Info mode).
*
* We calculate the minimum because we would like to only apply positive
* offsets to streams, delaying their playback instead of trying to speed up
* other streams (which might be impossible when we have to create negative
* latencies).
*
* The stream that has the smallest diff is selected as the reference stream,
* all other streams will have a positive offset to this difference. */
* all other streams will have a positive offset to this difference.
*
* we also assume that RTP-Info and RTCP sync give the same results.
*/
/* some alternative setting allow ignoring RTCP as much as possible,
* for servers generating bogus ntp timeline */
min = rtp_min = G_MAXINT64;
use_rtp = FALSE;
if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
guint64 ext_base = -1;
gint64 rtp_delta = 0;
use_rtp = TRUE;
/* convert to extended RTP time */
rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
/* sanity check; base rtp and provided clock_base should be close */
if (rtp_clock_base >= base_rtptime) {
if (rtp_clock_base - base_rtptime < 10 * clock_rate) {
rtp_delta = base_time +
gst_util_uint64_scale_int (rtp_clock_base - base_rtptime,
GST_SECOND, clock_rate);
} else {
use_rtp = FALSE;
}
} else {
if (base_rtptime - rtp_clock_base < 10 * clock_rate) {
rtp_delta = base_time -
gst_util_uint64_scale_int (base_rtptime - rtp_clock_base,
GST_SECOND, clock_rate);
} else {
use_rtp = FALSE;
}
}
/* warn and bail for clarity out if no sane values */
if (!use_rtp) {
GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
return;
}
/* store to track changes */
clock_base = rtp_delta;
/* generate a fake as before,
* now equating rtptime obtained from RTP-Info,
* where the large time represent the otherwise irrelevant npt/ntp time */
stream->rtp_delta = (GST_SECOND << 28) - rtp_delta;
} else {
clock_base = rtp_clock_base;
}
all_sync = TRUE;
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
if (!ostream->have_sync) {
all_sync = FALSE;
continue;
}
/* change in current stream's base from previously init'ed value
* leads to reset of all stream's base */
if (stream != ostream && stream->clock_base >= 0 &&
(stream->clock_base != clock_base)) {
GST_DEBUG_OBJECT (bin, "reset upon clock base change");
ostream->clock_base = -100 * GST_SECOND;
ostream->rtp_delta = 0;
}
if (ostream->rt_delta < min)
min = ostream->rt_delta;
if (ostream->rtp_delta < rtp_min)
rtp_min = ostream->rtp_delta;
}
/* arrange to re-sync for each stream upon significant change,
* e.g. post-seek */
all_sync = all_sync && (stream->clock_base == clock_base);
stream->clock_base = clock_base;
/* arrange to re-sync for each stream upon significant change, e.g. post-seek
* or when the RTP base time changes because of a jitterbuffer reset */
all_sync = all_sync && stream->clock_base == rtp_clock_base;
stream->clock_base = rtp_clock_base;
/* may need init performed above later on, but nothing more to do now */
if (client->nstreams <= 1)
return;
GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
" all sync %d", client, min, all_sync);
GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
GST_DEBUG_OBJECT (bin,
"client %p RTP-Info sync %d, min delta %" G_GINT64_FORMAT
", min RTP delta %" G_GINT64_FORMAT ", all sync %d", client,
rtp_info_sync, min, rtp_min, all_sync);
switch (rtcp_sync) {
case GST_RTP_BIN_RTCP_SYNC_RTP:
if (!use_rtp)
break;
GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
"client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
/* fall-through */
case GST_RTP_BIN_RTCP_SYNC_INITIAL:
/* if all have been synced already, do not bother further */
if (all_sync) {
GST_DEBUG_OBJECT (bin, "all streams already synced; done");
return;
}
break;
default:
break;
/* if all have been synced already, do not bother further */
if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_INITIAL && all_sync) {
GST_DEBUG_OBJECT (bin, "all streams already synced; done");
return;
}
/* bail out if we adjusted recently enough */
if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
if (all_sync && !rtp_info_sync && GST_CLOCK_TIME_IS_VALID (ntpnstime)
&& GST_CLOCK_TIME_IS_VALID (bin->priv->last_ntpnstime)
&& ntpnstime - bin->priv->last_ntpnstime <
bin->rtcp_sync_interval * GST_MSECOND) {
GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
"previous sender info too recent "
"(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
GST_DEBUG_OBJECT (bin,
"discarding RTCP sender packet for sync; "
"previous sender info too recent " "(previous NTP %" G_GUINT64_FORMAT
")", bin->priv->last_ntpnstime);
return;
}
bin->priv->last_ntpnstime = ntpnstime;
if (!rtp_info_sync)
bin->priv->last_ntpnstime = ntpnstime;
/* calculate offsets for each stream */
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
@ -1772,11 +1783,15 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
/* calculate offset to our reference stream, this should always give a
* positive number. */
if (use_rtp)
if (rtp_info_sync)
ts_offset = ostream->rtp_delta - rtp_min;
else
ts_offset = ostream->rt_delta - min;
GST_DEBUG_OBJECT (bin,
"Calculated ts-offset %" GST_STIME_FORMAT " for SSRC %08x",
GST_STIME_ARGS (ts_offset), ostream->ssrc);
stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
bin->min_ts_offset, TRUE);
}

View file

@ -41,7 +41,7 @@ typedef enum
{
GST_RTP_BIN_RTCP_SYNC_ALWAYS,
GST_RTP_BIN_RTCP_SYNC_INITIAL,
GST_RTP_BIN_RTCP_SYNC_RTP
GST_RTP_BIN_RTCP_SYNC_RTP_INFO
} GstRTCPSync;
typedef struct _GstRtpBin GstRtpBin;