From 6a8d2c45f6d0b0d8d2ef25a639ff35787af4544e Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Thu, 23 Feb 2012 14:46:09 -0300 Subject: [PATCH] mpegtspacketizer: Estimate clock skew and retimestamps buffers more precisly Apply the EPTLA algotithm to estimate clock skew. Reusing code from -good/gst/rtpmanager/rtpjitterbuffer.c --- gst/mpegtsdemux/mpegtsbase.c | 3 + gst/mpegtsdemux/mpegtspacketizer.c | 314 +++++++++++++++++++++++++++++ gst/mpegtsdemux/mpegtspacketizer.h | 19 ++ 3 files changed, 336 insertions(+) diff --git a/gst/mpegtsdemux/mpegtsbase.c b/gst/mpegtsdemux/mpegtsbase.c index 2b480c02cf..193e84ac70 100644 --- a/gst/mpegtsdemux/mpegtsbase.c +++ b/gst/mpegtsdemux/mpegtsbase.c @@ -1258,6 +1258,9 @@ query_upstream_latency (MpegTSBase * base) GST_WARNING_OBJECT (base, "Failed to query upstream latency"); gst_query_unref (query); base->query_latency = TRUE; + + /* Calculate clock skew for live streams only */ + base->packetizer->calculate_skew = base->upstream_live; } static inline GstFlowReturn diff --git a/gst/mpegtsdemux/mpegtspacketizer.c b/gst/mpegtsdemux/mpegtspacketizer.c index 85eac76628..a7ce40f79f 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.c +++ b/gst/mpegtsdemux/mpegtspacketizer.c @@ -28,6 +28,8 @@ * with newer GLib versions (>= 2.31.0) */ #define GLIB_DISABLE_DEPRECATION_WARNINGS +/* Skew calculation pameters */ +#define MAX_TIME (2 * GST_SECOND) #include "mpegtspacketizer.h" #include "gstmpegdesc.h" @@ -78,6 +80,9 @@ static gchar *convert_to_utf8 (const gchar * text, gint length, guint start, static gchar *get_encoding (const gchar * text, guint * start_text, gboolean * is_multibyte); static gchar *get_encoding_and_convert (const gchar * text, guint length); +static GstClockTime calculate_skew (MpegTSPacketizer2 * packetizer, + guint32 pcrtime, GstClockTime time); +static void mpegts_packetizer_reset_skew (MpegTSPacketizer2 * packetizer); #define CONTINUITY_UNSET 255 #define MAX_CONTINUITY 15 @@ -165,6 +170,8 @@ mpegts_packetizer_init (MpegTSPacketizer2 * packetizer) packetizer->empty = TRUE; packetizer->streams = g_new0 (MpegTSPacketizerStream *, 8192); packetizer->know_packet_size = FALSE; + packetizer->calculate_skew = FALSE; + mpegts_packetizer_reset_skew (packetizer); } static void @@ -262,6 +269,9 @@ mpegts_packetizer_parse_adaptation_field_control (MpegTSPacketizer2 * /* PCR */ if (afcflags & MPEGTS_AFC_PCR_FLAG) { packet->pcr = mpegts_packetizer_compute_pcr (data); + if (packetizer->calculate_skew) + packet->pcr = calculate_skew (packetizer, packet->pcr, + GST_BUFFER_TIMESTAMP (packet->buffer)); *data += 6; } @@ -2736,3 +2746,307 @@ failed: return g_strndup (text, length - start_text); } } + +/** + * mpegts_packetizer_reset_skew: + * @packetizer: an #MpegTSPacketizer2 + * + * Reset the skew calculations in @packetizer. + */ +static void +mpegts_packetizer_reset_skew (MpegTSPacketizer2 * packetizer) +{ + packetizer->base_time = GST_CLOCK_TIME_NONE; + packetizer->base_pcrtime = GST_CLOCK_TIME_NONE; + packetizer->last_pcrtime = GST_CLOCK_TIME_NONE; + packetizer->window_pos = 0; + packetizer->window_filling = TRUE; + packetizer->window_min = 0; + packetizer->skew = 0; + packetizer->prev_send_diff = GST_CLOCK_TIME_NONE; + packetizer->prev_out_time = GST_CLOCK_TIME_NONE; + GST_DEBUG ("reset skew correction"); +} + +static void +mpegts_packetizer_resync (MpegTSPacketizer2 * packetizer, GstClockTime time, + GstClockTime gstpcrtime, gboolean reset_skew) +{ + packetizer->base_time = time; + packetizer->base_pcrtime = gstpcrtime; + packetizer->prev_out_time = GST_CLOCK_TIME_NONE; + packetizer->prev_send_diff = GST_CLOCK_TIME_NONE; + if (reset_skew) { + packetizer->window_filling = TRUE; + packetizer->window_pos = 0; + packetizer->window_min = 0; + packetizer->window_size = 0; + packetizer->skew = 0; + } +} + + +/* Code mostly copied from -good/gst/rtpmanager/rtpjitterbuffer.c */ + +/* For the clock skew we use a windowed low point averaging algorithm as can be + * found in Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation + * over Network Delays": + * http://www.grame.fr/Ressources/pub/TR-050601.pdf + * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 + * + * The idea is that the jitter is composed of: + * + * J = N + n + * + * N : a constant network delay. + * n : random added noise. The noise is concentrated around 0 + * + * In the receiver we can track the elapsed time at the sender with: + * + * send_diff(i) = (Tsi - Ts0); + * + * Tsi : The time at the sender at packet i + * Ts0 : The time at the sender at the first packet + * + * This is the difference between the RTP timestamp in the first received packet + * and the current packet. + * + * At the receiver we have to deal with the jitter introduced by the network. + * + * recv_diff(i) = (Tri - Tr0) + * + * Tri : The time at the receiver at packet i + * Tr0 : The time at the receiver at the first packet + * + * Both of these values contain a jitter Ji, a jitter for packet i, so we can + * write: + * + * recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0)) + * + * Cri : The time of the clock at the receiver for packet i + * D + ni : The jitter when receiving packet i + * + * We see that the network delay is irrelevant here as we can elliminate D: + * + * recv_diff(i) = (Cri + ni) - (Cr0 + n0)) + * + * The drift is now expressed as: + * + * Drift(i) = recv_diff(i) - send_diff(i); + * + * We now keep the W latest values of Drift and find the minimum (this is the + * one with the lowest network jitter and thus the one which is least affected + * by it). We average this lowest value to smooth out the resulting network skew. + * + * Both the window and the weighting used for averaging influence the accuracy + * of the drift estimation. Finding the correct parameters turns out to be a + * compromise between accuracy and inertia. + * + * We use a 2 second window or up to 512 data points, which is statistically big + * enough to catch spikes (FIXME, detect spikes). + * We also use a rather large weighting factor (125) to smoothly adapt. During + * startup, when filling the window, we use a parabolic weighting factor, the + * more the window is filled, the faster we move to the detected possible skew. + * + * Returns: @time adjusted with the clock skew. + */ +static GstClockTime +calculate_skew (MpegTSPacketizer2 * packetizer, guint32 pcrtime, + GstClockTime time) +{ + guint64 send_diff, recv_diff; + gint64 delta; + gint64 old; + gint pos, i; + GstClockTime gstpcrtime, out_time; + guint64 slope; + + gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime); + + /* keep track of the last extended pcrtime */ + packetizer->last_pcrtime = gstpcrtime; + + /* first time, lock on to time and gstpcrtime */ + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (packetizer->base_time))) { + packetizer->base_time = time; + packetizer->prev_out_time = GST_CLOCK_TIME_NONE; + GST_DEBUG ("Taking new base time %" GST_TIME_FORMAT, GST_TIME_ARGS (time)); + } + + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (packetizer->base_pcrtime))) { + packetizer->base_pcrtime = gstpcrtime; + packetizer->prev_send_diff = -1; + GST_DEBUG ("Taking new base pcrtime %" GST_TIME_FORMAT, + GST_TIME_ARGS (gstpcrtime)); + } + + if (G_LIKELY (gstpcrtime >= packetizer->base_pcrtime)) + send_diff = gstpcrtime - packetizer->base_pcrtime; + else if (GST_CLOCK_TIME_IS_VALID (time)) { + /* elapsed time at sender, timestamps can go backwards and thus be smaller + * than our base time, take a new base time in that case. */ + GST_WARNING ("backward timestamps at server, taking new base time"); + mpegts_packetizer_resync (packetizer, time, gstpcrtime, FALSE); + send_diff = 0; + } else { + GST_WARNING ("backward timestamps at server but no timestamps"); + send_diff = 0; + /* at least try to get a new timestamp.. */ + packetizer->base_time = -1; + } + + GST_DEBUG ("gstpcr %" GST_TIME_FORMAT ", base %" + GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, + GST_TIME_ARGS (gstpcrtime), GST_TIME_ARGS (packetizer->base_pcrtime), + GST_TIME_ARGS (send_diff)); + + /* we don't have an arrival timestamp so we can't do skew detection. we + * should still apply a timestamp based on RTP timestamp and base_time */ + if (!GST_CLOCK_TIME_IS_VALID (time) + || !GST_CLOCK_TIME_IS_VALID (packetizer->base_time)) + goto no_skew; + + /* elapsed time at receiver, includes the jitter */ + recv_diff = time - packetizer->base_time; + + /* measure the diff */ + delta = ((gint64) recv_diff) - ((gint64) send_diff); + + /* measure the slope, this gives a rought estimate between the sender speed + * and the receiver speed. This should be approximately 8, higher values + * indicate a burst (especially when the connection starts) */ + slope = recv_diff > 0 ? (send_diff * 8) / recv_diff : 8; + + GST_DEBUG ("time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT ", recv_diff %" + GST_TIME_FORMAT ", slope %" G_GUINT64_FORMAT, GST_TIME_ARGS (time), + GST_TIME_ARGS (packetizer->base_time), GST_TIME_ARGS (recv_diff), slope); + + /* if the difference between the sender timeline and the receiver timeline + * changed too quickly we have to resync because the server likely restarted + * its timestamps. */ + if (ABS (delta - packetizer->skew) > GST_SECOND) { + GST_WARNING ("delta - skew: %" GST_TIME_FORMAT " too big, reset skew", + GST_TIME_ARGS (delta - packetizer->skew)); + mpegts_packetizer_resync (packetizer, time, gstpcrtime, TRUE); + send_diff = 0; + delta = 0; + } + + pos = packetizer->window_pos; + + if (G_UNLIKELY (packetizer->window_filling)) { + /* we are filling the window */ + GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta); + packetizer->window[pos++] = delta; + /* calc the min delta we observed */ + if (G_UNLIKELY (pos == 1 || delta < packetizer->window_min)) + packetizer->window_min = delta; + + if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) { + packetizer->window_size = pos; + + /* window filled */ + GST_DEBUG ("min %" G_GINT64_FORMAT, packetizer->window_min); + + /* the skew is now the min */ + packetizer->skew = packetizer->window_min; + packetizer->window_filling = FALSE; + } else { + gint perc_time, perc_window, perc; + + /* figure out how much we filled the window, this depends on the amount of + * time we have or the max number of points we keep. */ + perc_time = send_diff * 100 / MAX_TIME; + perc_window = pos * 100 / MAX_WINDOW; + perc = MAX (perc_time, perc_window); + + /* make a parabolic function, the closer we get to the MAX, the more value + * we give to the scaling factor of the new value */ + perc = perc * perc; + + /* quickly go to the min value when we are filling up, slowly when we are + * just starting because we're not sure it's a good value yet. */ + packetizer->skew = + (perc * packetizer->window_min + ((10000 - + perc) * packetizer->skew)) / 10000; + packetizer->window_size = pos + 1; + } + } else { + /* pick old value and store new value. We keep the previous value in order + * to quickly check if the min of the window changed */ + old = packetizer->window[pos]; + packetizer->window[pos++] = delta; + + if (G_UNLIKELY (delta <= packetizer->window_min)) { + /* if the new value we inserted is smaller or equal to the current min, + * it becomes the new min */ + packetizer->window_min = delta; + } else if (G_UNLIKELY (old == packetizer->window_min)) { + gint64 min = G_MAXINT64; + + /* if we removed the old min, we have to find a new min */ + for (i = 0; i < packetizer->window_size; i++) { + /* we found another value equal to the old min, we can stop searching now */ + if (packetizer->window[i] == old) { + min = old; + break; + } + if (packetizer->window[i] < min) + min = packetizer->window[i]; + } + packetizer->window_min = min; + } + /* average the min values */ + packetizer->skew = + (packetizer->window_min + (124 * packetizer->skew)) / 125; + GST_DEBUG ("delta %" G_GINT64_FORMAT ", new min: %" G_GINT64_FORMAT, delta, + packetizer->window_min); + } + /* wrap around in the window */ + if (G_UNLIKELY (pos >= packetizer->window_size)) + pos = 0; + + packetizer->window_pos = pos; + +no_skew: + /* the output time is defined as the base timestamp plus the PCR time + * adjusted for the clock skew .*/ + if (packetizer->base_time != -1) { + out_time = packetizer->base_time + send_diff; + /* skew can be negative and we don't want to make invalid timestamps */ + if (packetizer->skew < 0 && out_time < -packetizer->skew) { + out_time = 0; + } else { + out_time += packetizer->skew; + } + /* check if timestamps are not going backwards, we can only check this if we + * have a previous out time and a previous send_diff */ + if (G_LIKELY (packetizer->prev_out_time != -1 + && packetizer->prev_send_diff != -1)) { + /* now check for backwards timestamps */ + if (G_UNLIKELY ( + /* if the server timestamps went up and the out_time backwards */ + (send_diff > packetizer->prev_send_diff + && out_time < packetizer->prev_out_time) || + /* if the server timestamps went backwards and the out_time forwards */ + (send_diff < packetizer->prev_send_diff + && out_time > packetizer->prev_out_time) || + /* if the server timestamps did not change */ + send_diff == packetizer->prev_send_diff)) { + GST_DEBUG ("backwards timestamps, using previous time"); + out_time = GSTTIME_TO_MPEGTIME (out_time); + } + } + } else { + /* We simply use the pcrtime without applying any skew compensation */ + out_time = pcrtime; + } + + packetizer->prev_out_time = out_time; + packetizer->prev_send_diff = send_diff; + + GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT, + packetizer->skew, GST_TIME_ARGS (out_time)); + + return out_time; +} diff --git a/gst/mpegtsdemux/mpegtspacketizer.h b/gst/mpegtsdemux/mpegtspacketizer.h index 832862577d..84da5f647a 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.h +++ b/gst/mpegtsdemux/mpegtspacketizer.h @@ -28,6 +28,8 @@ #include #include +#include "gstmpegdefs.h" + #define MPEGTS_NORMAL_PACKETSIZE 188 #define MPEGTS_M2TS_PACKETSIZE 192 #define MPEGTS_DVB_ASI_PACKETSIZE 204 @@ -39,6 +41,8 @@ #define MPEGTS_AFC_PCR_FLAG 0x10 #define MPEGTS_AFC_OPCR_FLAG 0x08 +#define MAX_WINDOW 512 + G_BEGIN_DECLS #define GST_TYPE_MPEGTS_PACKETIZER \ @@ -79,6 +83,21 @@ struct _MpegTSPacketizer2 { /* current offset of the tip of the adapter */ guint64 offset; gboolean empty; + + /* clock skew calculation */ + gboolean calculate_skew; + GstClockTime base_time; + GstClockTime base_pcrtime; + GstClockTime base_extrtp; + GstClockTime prev_out_time; + GstClockTime last_pcrtime; + gint64 window[MAX_WINDOW]; + guint window_pos; + guint window_size; + gboolean window_filling; + gint64 window_min; + gint64 skew; + gint64 prev_send_diff; }; struct _MpegTSPacketizer2Class {