tsparse: Handle backward and discont timestamps better.

Assume that small backward PCR jumps are just from upstream packet
mis-ordering and don't reset timestamp tracking state - assuming that
things will be OK again shortly.

Make the threshold for detecting discont between sequential buffers
configurable and match the smoothing-latency setting on tsparse
to better cope with data bursts.
This commit is contained in:
Jan Schmidt 2014-10-29 22:58:37 +11:00
parent a49ce685d1
commit 068cba5df6
3 changed files with 49 additions and 19 deletions

View file

@ -50,8 +50,8 @@ G_DEFINE_TYPE_EXTENDED (MpegTSPacketizer2, mpegts_packetizer, G_TYPE_OBJECT, 0,
static void mpegts_packetizer_dispose (GObject * object); static void mpegts_packetizer_dispose (GObject * object);
static void mpegts_packetizer_finalize (GObject * object); static void mpegts_packetizer_finalize (GObject * object);
static GstClockTime calculate_skew (MpegTSPCR * pcr, guint64 pcrtime, static GstClockTime calculate_skew (MpegTSPacketizer2 * packetizer,
GstClockTime time); MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time);
static void _close_current_group (MpegTSPCR * pcrtable); static void _close_current_group (MpegTSPCR * pcrtable);
static void record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable, static void record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
guint64 pcr, guint64 offset); guint64 pcr, guint64 offset);
@ -272,6 +272,7 @@ mpegts_packetizer_init (MpegTSPacketizer2 * packetizer)
packetizer->nb_seen_offsets = 0; packetizer->nb_seen_offsets = 0;
packetizer->refoffset = -1; packetizer->refoffset = -1;
packetizer->last_in_time = GST_CLOCK_TIME_NONE; packetizer->last_in_time = GST_CLOCK_TIME_NONE;
packetizer->pcr_discont_threshold = GST_SECOND;
} }
static void static void
@ -397,7 +398,8 @@ mpegts_packetizer_parse_adaptation_field_control (MpegTSPacketizer2 *
if (packetizer->calculate_skew if (packetizer->calculate_skew
&& GST_CLOCK_TIME_IS_VALID (packetizer->last_in_time)) { && GST_CLOCK_TIME_IS_VALID (packetizer->last_in_time)) {
pcrtable = get_pcr_table (packetizer, packet->pid); pcrtable = get_pcr_table (packetizer, packet->pid);
calculate_skew (pcrtable, packet->pcr, packetizer->last_in_time); calculate_skew (packetizer, pcrtable, packet->pcr,
packetizer->last_in_time);
} }
if (packetizer->calculate_offset) { if (packetizer->calculate_offset) {
if (!pcrtable) if (!pcrtable)
@ -1283,7 +1285,8 @@ mpegts_packetizer_resync (MpegTSPCR * pcr, GstClockTime time,
* Returns: @time adjusted with the clock skew. * Returns: @time adjusted with the clock skew.
*/ */
static GstClockTime static GstClockTime
calculate_skew (MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time) calculate_skew (MpegTSPacketizer2 * packetizer,
MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time)
{ {
guint64 send_diff, recv_diff; guint64 send_diff, recv_diff;
gint64 delta; gint64 delta;
@ -1321,7 +1324,8 @@ calculate_skew (MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time)
send_diff = gstpcrtime - pcr->base_pcrtime; send_diff = gstpcrtime - pcr->base_pcrtime;
} else if (GST_CLOCK_TIME_IS_VALID (time) } else if (GST_CLOCK_TIME_IS_VALID (time)
&& pcr->last_pcrtime - gstpcrtime > 15 * GST_SECOND) { && pcr->last_pcrtime - gstpcrtime > 15 * GST_SECOND) {
/* Assume a reset */ /* Time jumped backward by > 15 seconds, and we have a timestamp
* to use to close the discont. Assume a reset */
GST_DEBUG ("PCR reset"); GST_DEBUG ("PCR reset");
/* Calculate PCR we would have expected for the given input time, /* Calculate PCR we would have expected for the given input time,
* essentially applying the reverse correction process * essentially applying the reverse correction process
@ -1348,10 +1352,22 @@ calculate_skew (MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time)
" corrected pcr time %" GST_TIME_FORMAT, " corrected pcr time %" GST_TIME_FORMAT,
GST_TIME_ARGS (pcr->pcroffset), GST_TIME_ARGS (gstpcrtime)); GST_TIME_ARGS (pcr->pcroffset), GST_TIME_ARGS (gstpcrtime));
} else { } else {
GST_WARNING ("backward timestamps at server but no timestamps"); /* Small jumps backward, assume some arrival jitter and skip it */
send_diff = 0; send_diff = 0;
/* at least try to get a new timestamp.. */
pcr->base_time = GST_CLOCK_TIME_NONE; if (pcr->last_pcrtime - gstpcrtime < GST_SECOND) {
GST_WARNING
("(small) backward timestamps at server or no buffer timestamps. Ignoring.");
/* This will trigger the no_skew logic before but leave other state
* intact */
time = GST_CLOCK_TIME_NONE;
} else {
/* A bigger backward step than packet out-of-order can account for. Reset base PCR time
* to be resynched the next time we see a PCR */
GST_WARNING
("backward timestamps at server or no buffer timestamps. Resync base PCR");
pcr->base_pcrtime = GST_CLOCK_TIME_NONE;
}
} }
} else } else
send_diff = gstpcrtime - pcr->base_pcrtime; send_diff = gstpcrtime - pcr->base_pcrtime;
@ -1396,7 +1412,7 @@ calculate_skew (MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time)
/* if the difference between the sender timeline and the receiver timeline /* if the difference between the sender timeline and the receiver timeline
* changed too quickly we have to resync because the server likely restarted * changed too quickly we have to resync because the server likely restarted
* its timestamps. */ * its timestamps. */
if (ABS (delta - pcr->skew) > GST_SECOND) { if (ABS (delta - pcr->skew) > packetizer->pcr_discont_threshold) {
GST_WARNING ("delta - skew: %" GST_TIME_FORMAT " too big, reset skew", GST_WARNING ("delta - skew: %" GST_TIME_FORMAT " too big, reset skew",
GST_TIME_ARGS (delta - pcr->skew)); GST_TIME_ARGS (delta - pcr->skew));
mpegts_packetizer_resync (pcr, time, gstpcrtime, TRUE); mpegts_packetizer_resync (pcr, time, gstpcrtime, TRUE);
@ -1595,8 +1611,8 @@ _reevaluate_group_pcr_offset (MpegTSPCR * pcrtable, PCROffsetGroup * group)
GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %" GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %"
GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT, GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT,
current->pending[current->last].offset, current->pending[current->last].offset,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current-> GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].
last].pcr)), prevbr); pcr)), prevbr);
} else if (prev->values[prev->last_value].offset) { } else if (prev->values[prev->last_value].offset) {
prevoffset = prev->values[prev->last_value].offset + prev->first_offset; prevoffset = prev->values[prev->last_value].offset + prev->first_offset;
prevpcr = prev->values[prev->last_value].pcr + prev->first_pcr; prevpcr = prev->values[prev->last_value].pcr + prev->first_pcr;
@ -1608,8 +1624,8 @@ _reevaluate_group_pcr_offset (MpegTSPCR * pcrtable, PCROffsetGroup * group)
GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %" GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %"
GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT, GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT,
prev->values[prev->last_value].offset, prev->values[prev->last_value].offset,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prev->values[prev-> GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prev->values[prev->last_value].
last_value].pcr)), prevbr); pcr)), prevbr);
} else { } else {
GST_DEBUG ("Using overall bitrate"); GST_DEBUG ("Using overall bitrate");
prevoffset = prev->values[prev->last_value].offset + prev->first_offset; prevoffset = prev->values[prev->last_value].offset + prev->first_offset;
@ -1918,9 +1934,8 @@ record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
group->first_offset, group->first_offset,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset))); GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT, GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->values[group-> GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->values[group->last_value].
last_value].pcr)), pcr)), group->values[group->last_value].offset);
group->values[group->last_value].offset);
/* Check if before group */ /* Check if before group */
if (offset < group->first_offset) { if (offset < group->first_offset) {
GST_DEBUG ("offset is before that group"); GST_DEBUG ("offset is before that group");
@ -2416,6 +2431,15 @@ mpegts_packetizer_set_reference_offset (MpegTSPacketizer2 * packetizer,
PACKETIZER_GROUP_UNLOCK (packetizer); PACKETIZER_GROUP_UNLOCK (packetizer);
} }
void
mpegts_packetizer_set_pcr_discont_threshold (MpegTSPacketizer2 * packetizer,
GstClockTime threshold)
{
PACKETIZER_GROUP_LOCK (packetizer);
packetizer->pcr_discont_threshold = threshold;
PACKETIZER_GROUP_UNLOCK (packetizer);
}
void void
mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer, mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer,
GstClockTime offset, guint16 pcr_pid) GstClockTime offset, guint16 pcr_pid)

View file

@ -282,6 +282,7 @@ struct _MpegTSPacketizer2 {
guint8 pcrtablelut[0x2000]; guint8 pcrtablelut[0x2000];
MpegTSPCR *observations[MAX_PCR_OBS_CHANNELS]; MpegTSPCR *observations[MAX_PCR_OBS_CHANNELS];
guint8 lastobsid; guint8 lastobsid;
GstClockTime pcr_discont_threshold;
}; };
struct _MpegTSPacketizer2Class { struct _MpegTSPacketizer2Class {
@ -374,6 +375,9 @@ mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer,
G_GNUC_INTERNAL void G_GNUC_INTERNAL void
mpegts_packetizer_set_reference_offset (MpegTSPacketizer2 * packetizer, mpegts_packetizer_set_reference_offset (MpegTSPacketizer2 * packetizer,
guint64 refoffset); guint64 refoffset);
G_GNUC_INTERNAL void
mpegts_packetizer_set_pcr_discont_threshold (MpegTSPacketizer2 * packetizer,
GstClockTime threshold);
G_END_DECLS G_END_DECLS
#endif /* GST_MPEGTS_PACKETIZER_H */ #endif /* GST_MPEGTS_PACKETIZER_H */

View file

@ -250,6 +250,8 @@ mpegts_parse_set_property (GObject * object, guint prop_id,
break; break;
case PROP_SMOOTHING_LATENCY: case PROP_SMOOTHING_LATENCY:
parse->smoothing_latency = GST_USECOND * g_value_get_uint (value); parse->smoothing_latency = GST_USECOND * g_value_get_uint (value);
mpegts_packetizer_set_pcr_discont_threshold (GST_MPEGTS_BASE
(parse)->packetizer, parse->smoothing_latency);
break; break;
case PROP_PCR_PID: case PROP_PCR_PID:
parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value); parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
@ -774,7 +776,7 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
pcr_diff = get_pending_timestamp_diff (parse); pcr_diff = get_pending_timestamp_diff (parse);
} else { /* Case 4 */ } else { /* Case 4 */
start_ts = parse->previous_pcr; start_ts = parse->previous_pcr;
if (pcr > start_ts) if (GST_CLOCK_TIME_IS_VALID (pcr) && pcr > start_ts)
pcr_diff = GST_CLOCK_DIFF (start_ts, pcr); pcr_diff = GST_CLOCK_DIFF (start_ts, pcr);
/* Make sure PCR observations are sufficiently far apart */ /* Make sure PCR observations are sufficiently far apart */
@ -782,8 +784,8 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
return GST_FLOW_OK; return GST_FLOW_OK;
} }
GST_LOG_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT GST_INFO_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT
" duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes\n", " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes",
GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes); GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes);
/* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */ /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */