rtpbin: Handle ntp-sync=true before everything else

This simplifies the code as it's a much simpler case than the normal
inter-stream synchronization, and interleaving it with that only
reduces readability of the code.

Also improve some debug output in this code path.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6543>
This commit is contained in:
Sebastian Dröge 2024-04-17 13:30:42 +03:00 committed by GStreamer Marge Bot
parent 4b0e75a094
commit 4421c3de75

View file

@ -1481,6 +1481,88 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
return;
}
/* In case of NTP sync we can directly calculate the offset for this stream
* here and return immediately */
if (bin->ntp_sync) {
GstClockTime rtp_running_time, diff_rtp;
GstClockTime local_running_time, local_ntpnstime;
gint64 ntpdiff, rtdiff;
GST_DEBUG_OBJECT (bin, "Doing NTP sync");
if (!GST_CLOCK_TIME_IS_VALID (extrtptime)
|| !GST_CLOCK_TIME_IS_VALID (ntpnstime)
|| extrtptime < base_rtptime) {
GST_DEBUG_OBJECT (bin, "invalidated sync data, bailing out");
return;
}
/* Take the extended rtptime we found in the SR packet and map it to the
* local rtptime. The local rtp time is used to construct timestamps on the
* buffers so we will calculate what running_time corresponds to the RTP
* timestamp in the SR packet. */
diff_rtp = extrtptime - base_rtptime;
GST_DEBUG_OBJECT (bin,
"base RTP time %" G_GUINT64_FORMAT ", SR RTP time %" G_GUINT64_FORMAT
", RTP time difference %" G_GUINT64_FORMAT ", clock-rate %d",
base_rtptime, extrtptime, diff_rtp, clock_rate);
/* calculate local RTP time in GStreamer timestamp units, we essentially
* perform the same conversion that a jitterbuffer would use to convert an
* rtp timestamp into a corresponding gstreamer timestamp. Note that the
* base_time also contains the drift between sender and receiver. */
rtp_running_time =
gst_util_uint64_scale_int (diff_rtp, GST_SECOND, clock_rate);
rtp_running_time += base_time;
GST_DEBUG_OBJECT (bin,
"RTP running time %" GST_TIME_FORMAT ", SR NTP time %" GST_TIME_FORMAT,
GST_TIME_ARGS (rtp_running_time), GST_TIME_ARGS (ntpnstime));
/* For NTP sync we need to first get a snapshot of running_time and NTP
* time. We know at what running_time we play a certain RTP time, we also
* calculated when we would play the RTP time in the SR packet. Now we need
* to know how the running_time and the NTP time relate to each other. */
get_current_times (bin, &local_running_time, &local_ntpnstime);
/* see how far away the NTP time is. This is the difference between the
* current NTP time and the NTP time in the last SR packet. */
ntpdiff = local_ntpnstime - ntpnstime;
/* see how far away the running_time is. This is the difference between the
* current running_time and the running_time of the RTP timestamp in the
* last SR packet. */
rtdiff = local_running_time - rtp_running_time;
GST_DEBUG_OBJECT (bin,
"local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
local_ntpnstime, ntpnstime);
GST_DEBUG_OBJECT (bin,
"local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
G_GUINT64_FORMAT, local_running_time, rtp_running_time);
GST_DEBUG_OBJECT (bin,
"NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
rtdiff);
/* combine to get the final diff to apply to the running_time */
stream->rt_delta = rtdiff - ntpdiff;
GST_DEBUG_OBJECT (bin,
"Calculated ts-offset %" GST_STIME_FORMAT " for SSRC %08x",
GST_STIME_ARGS (stream->rt_delta), stream->ssrc);
stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset,
bin->min_ts_offset, FALSE);
gst_rtp_bin_send_sync_event (stream);
return;
}
/* For all other cases (not RFC7273 and not NTP sync) we have to look how
* all streams of a client relate to each other */
GstRtpBinClient *client;
gboolean created;
GSList *walk;
@ -1548,195 +1630,159 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
"SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
running_time, ntpnstime);
/* recalc inter stream playout offset, but only if there is more than one
* stream or we're doing NTP sync. */
if (bin->ntp_sync) {
gint64 ntpdiff, rtdiff;
guint64 local_ntpnstime;
GstClockTime local_running_time;
/* recalc inter stream playout offset, but only if there is more than one stream. */
gint64 min, rtp_min, clock_base;
gboolean all_sync, use_rtp;
gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
/* For NTP sync we need to first get a snapshot of running_time and NTP
* time. We know at what running_time we play a certain RTP time, we also
* calculated when we would play the RTP time in the SR packet. Now we need
* to know how the running_time and the NTP time relate to each other. */
get_current_times (bin, &local_running_time, &local_ntpnstime);
/* calculate delta between server and receiver. ntpnstime is created by
* converting the ntptime in the last SR packet to a gstreamer timestamp. This
* delta expresses the difference to our timeline and the server timeline. The
* difference in itself doesn't mean much but we can combine the delta of
* multiple streams to create a stream specific offset. */
stream->rt_delta = ntpnstime - running_time;
/* see how far away the NTP time is. This is the difference between the
* current NTP time and the NTP time in the last SR packet. */
ntpdiff = local_ntpnstime - ntpnstime;
/* see how far away the running_time is. This is the difference between the
* current running_time and the running_time of the RTP timestamp in the
* last SR packet. */
rtdiff = local_running_time - running_time;
/* calculate the min of all deltas, ignoring streams that did not yet have a
* valid rt_delta because we did not yet receive an SR packet for those
* streams.
* We calculate the minimum because we would like to only apply positive
* offsets to streams, delaying their playback instead of trying to speed up
* other streams (which might be impossible when we have to create negative
* latencies).
* The stream that has the smallest diff is selected as the reference stream,
* all other streams will have a positive offset to this difference. */
GST_DEBUG_OBJECT (bin,
"local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
local_ntpnstime, ntpnstime);
GST_DEBUG_OBJECT (bin,
"local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
G_GUINT64_FORMAT, local_running_time, running_time);
GST_DEBUG_OBJECT (bin,
"NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
rtdiff);
/* some alternative setting allow ignoring RTCP as much as possible,
* for servers generating bogus ntp timeline */
min = rtp_min = G_MAXINT64;
use_rtp = FALSE;
if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
guint64 ext_base = -1;
gint64 rtp_delta = 0;
/* combine to get the final diff to apply to the running_time */
stream->rt_delta = rtdiff - ntpdiff;
stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset,
bin->min_ts_offset, FALSE);
} else {
gint64 min, rtp_min, clock_base;
gboolean all_sync, use_rtp;
gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
/* calculate delta between server and receiver. ntpnstime is created by
* converting the ntptime in the last SR packet to a gstreamer timestamp. This
* delta expresses the difference to our timeline and the server timeline. The
* difference in itself doesn't mean much but we can combine the delta of
* multiple streams to create a stream specific offset. */
stream->rt_delta = ntpnstime - running_time;
/* calculate the min of all deltas, ignoring streams that did not yet have a
* valid rt_delta because we did not yet receive an SR packet for those
* streams.
* We calculate the minimum because we would like to only apply positive
* offsets to streams, delaying their playback instead of trying to speed up
* other streams (which might be impossible when we have to create negative
* latencies).
* The stream that has the smallest diff is selected as the reference stream,
* all other streams will have a positive offset to this difference. */
/* some alternative setting allow ignoring RTCP as much as possible,
* for servers generating bogus ntp timeline */
min = rtp_min = G_MAXINT64;
use_rtp = FALSE;
if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
guint64 ext_base = -1;
gint64 rtp_delta = 0;
use_rtp = TRUE;
/* convert to extended RTP time */
rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
/* sanity check; base rtp and provided clock_base should be close */
if (rtp_clock_base >= base_rtptime) {
if (rtp_clock_base - base_rtptime < 10 * clock_rate) {
rtp_delta = base_time +
gst_util_uint64_scale_int (rtp_clock_base - base_rtptime,
GST_SECOND, clock_rate);
} else {
use_rtp = FALSE;
}
use_rtp = TRUE;
/* convert to extended RTP time */
rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
/* sanity check; base rtp and provided clock_base should be close */
if (rtp_clock_base >= base_rtptime) {
if (rtp_clock_base - base_rtptime < 10 * clock_rate) {
rtp_delta = base_time +
gst_util_uint64_scale_int (rtp_clock_base - base_rtptime,
GST_SECOND, clock_rate);
} else {
if (base_rtptime - rtp_clock_base < 10 * clock_rate) {
rtp_delta = base_time -
gst_util_uint64_scale_int (base_rtptime - rtp_clock_base,
GST_SECOND, clock_rate);
} else {
use_rtp = FALSE;
}
use_rtp = FALSE;
}
/* warn and bail for clarity out if no sane values */
if (!use_rtp) {
GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
} else {
if (base_rtptime - rtp_clock_base < 10 * clock_rate) {
rtp_delta = base_time -
gst_util_uint64_scale_int (base_rtptime - rtp_clock_base,
GST_SECOND, clock_rate);
} else {
use_rtp = FALSE;
}
}
/* warn and bail for clarity out if no sane values */
if (!use_rtp) {
GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
return;
}
/* store to track changes */
clock_base = rtp_delta;
/* generate a fake as before,
* now equating rtptime obtained from RTP-Info,
* where the large time represent the otherwise irrelevant npt/ntp time */
stream->rtp_delta = (GST_SECOND << 28) - rtp_delta;
} else {
clock_base = rtp_clock_base;
}
all_sync = TRUE;
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
if (!ostream->have_sync) {
all_sync = FALSE;
continue;
}
/* change in current stream's base from previously init'ed value
* leads to reset of all stream's base */
if (stream != ostream && stream->clock_base >= 0 &&
(stream->clock_base != clock_base)) {
GST_DEBUG_OBJECT (bin, "reset upon clock base change");
ostream->clock_base = -100 * GST_SECOND;
ostream->rtp_delta = 0;
}
if (ostream->rt_delta < min)
min = ostream->rt_delta;
if (ostream->rtp_delta < rtp_min)
rtp_min = ostream->rtp_delta;
}
/* arrange to re-sync for each stream upon significant change,
* e.g. post-seek */
all_sync = all_sync && (stream->clock_base == clock_base);
stream->clock_base = clock_base;
/* may need init performed above later on, but nothing more to do now */
if (client->nstreams <= 1)
return;
GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
" all sync %d", client, min, all_sync);
GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
switch (rtcp_sync) {
case GST_RTP_BIN_RTCP_SYNC_RTP:
if (!use_rtp)
break;
GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
"client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
/* fall-through */
case GST_RTP_BIN_RTCP_SYNC_INITIAL:
/* if all have been synced already, do not bother further */
if (all_sync) {
GST_DEBUG_OBJECT (bin, "all streams already synced; done");
return;
}
/* store to track changes */
clock_base = rtp_delta;
/* generate a fake as before,
* now equating rtptime obtained from RTP-Info,
* where the large time represent the otherwise irrelevant npt/ntp time */
stream->rtp_delta = (GST_SECOND << 28) - rtp_delta;
} else {
clock_base = rtp_clock_base;
}
all_sync = TRUE;
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
if (!ostream->have_sync) {
all_sync = FALSE;
continue;
}
/* change in current stream's base from previously init'ed value
* leads to reset of all stream's base */
if (stream != ostream && stream->clock_base >= 0 &&
(stream->clock_base != clock_base)) {
GST_DEBUG_OBJECT (bin, "reset upon clock base change");
ostream->clock_base = -100 * GST_SECOND;
ostream->rtp_delta = 0;
}
if (ostream->rt_delta < min)
min = ostream->rt_delta;
if (ostream->rtp_delta < rtp_min)
rtp_min = ostream->rtp_delta;
}
/* arrange to re-sync for each stream upon significant change,
* e.g. post-seek */
all_sync = all_sync && (stream->clock_base == clock_base);
stream->clock_base = clock_base;
/* may need init performed above later on, but nothing more to do now */
if (client->nstreams <= 1)
return;
GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
" all sync %d", client, min, all_sync);
GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
switch (rtcp_sync) {
case GST_RTP_BIN_RTCP_SYNC_RTP:
if (!use_rtp)
break;
GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
"client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
/* fall-through */
case GST_RTP_BIN_RTCP_SYNC_INITIAL:
/* if all have been synced already, do not bother further */
if (all_sync) {
GST_DEBUG_OBJECT (bin, "all streams already synced; done");
return;
}
break;
default:
break;
}
/* bail out if we adjusted recently enough */
if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
bin->rtcp_sync_interval * GST_MSECOND) {
GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
"previous sender info too recent "
"(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
return;
}
bin->priv->last_ntpnstime = ntpnstime;
/* calculate offsets for each stream */
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
gint64 ts_offset;
/* ignore streams for which we didn't receive an SR packet yet, we
* can't synchronize them yet. We can however sync other streams just
* fine. */
if (!ostream->have_sync)
continue;
/* calculate offset to our reference stream, this should always give a
* positive number. */
if (use_rtp)
ts_offset = ostream->rtp_delta - rtp_min;
else
ts_offset = ostream->rt_delta - min;
stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
bin->min_ts_offset, TRUE);
}
break;
default:
break;
}
/* bail out if we adjusted recently enough */
if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
bin->rtcp_sync_interval * GST_MSECOND) {
GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
"previous sender info too recent "
"(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
return;
}
bin->priv->last_ntpnstime = ntpnstime;
/* calculate offsets for each stream */
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
gint64 ts_offset;
/* ignore streams for which we didn't receive an SR packet yet, we
* can't synchronize them yet. We can however sync other streams just
* fine. */
if (!ostream->have_sync)
continue;
/* calculate offset to our reference stream, this should always give a
* positive number. */
if (use_rtp)
ts_offset = ostream->rtp_delta - rtp_min;
else
ts_offset = ostream->rt_delta - min;
stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
bin->min_ts_offset, TRUE);
}
gst_rtp_bin_send_sync_event (stream);
return;