tsdemux: GAP detection

All pads of a stream are now added at the beginning. In order to cope with
streams that don't get any data (forever or for a long time) we detect gaps
and push out GAP events when needed.

Cleanups and commenting by Jan Schmidt <jan@centricular.com>

https://bugzilla.gnome.org/show_bug.cgi?id=734040
This commit is contained in:
Edward Hervey 2014-10-20 12:30:50 +02:00
parent 89455b7106
commit b59a9262c0
3 changed files with 152 additions and 42 deletions

View file

@ -121,6 +121,19 @@ flush_observations (MpegTSPacketizer2 * packetizer)
packetizer->lastobsid = 0;
}
GstClockTime
mpegts_packetizer_get_current_time (MpegTSPacketizer2 * packetizer,
guint16 pcr_pid)
{
MpegTSPCR *pcrtable = get_pcr_table (packetizer, pcr_pid);
if (pcrtable == NULL)
return GST_CLOCK_TIME_NONE;
return mpegts_packetizer_pts_to_ts (packetizer, pcrtable->last_pcrtime,
pcr_pid);
}
static inline MpegTSPacketizerStreamSubtable *
find_subtable (GSList * subtables, guint8 table_id, guint16 subtable_extension)
{
@ -1874,6 +1887,7 @@ record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
packetizer->nb_seen_offsets += 1;
pcrtable->last_pcrtime = PCRTIME_TO_GSTTIME (pcr);
/* FIXME : Invert logic later (probability is higher that we have a
* current estimator) */

View file

@ -365,6 +365,9 @@ mpegts_packetizer_ts_to_offset (MpegTSPacketizer2 * packetizer,
G_GNUC_INTERNAL GstClockTime
mpegts_packetizer_pts_to_ts (MpegTSPacketizer2 * packetizer,
GstClockTime pts, guint16 pcr_pid);
G_GNUC_INTERNAL GstClockTime
mpegts_packetizer_get_current_time (MpegTSPacketizer2 * packetizer,
guint16 pcr_pid);
G_GNUC_INTERNAL void
mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer,
GstClockTime offset, guint16 pcr_pid);

View file

@ -165,6 +165,13 @@ struct _TSDemuxStream
GstClockTime pts;
GstClockTime dts;
/* Reference PTS used to detect gaps */
GstClockTime gap_ref_pts;
/* Number of outputted buffers */
guint32 nb_out_buffers;
/* Reference number of buffers for gaps */
guint32 gap_ref_buffers;
/* Current PTS/DTS for this stream (in 90kHz unit) */
guint64 raw_pts, raw_dts;
@ -295,6 +302,8 @@ static void gst_ts_demux_stream_flush (TSDemuxStream * stream,
GstTSDemux * demux);
static gboolean push_event (MpegTSBase * base, GstEvent * event);
static void gst_ts_demux_check_and_sync_streams (GstTSDemux * demux,
GstClockTime time);
static void
_extra_init (void)
@ -1402,10 +1411,13 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
stream->discont = TRUE;
stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE;
stream->first_dts = GST_CLOCK_TIME_NONE;
stream->raw_pts = -1;
stream->raw_dts = -1;
stream->pending_ts = TRUE;
stream->first_dts = GST_CLOCK_TIME_NONE;
stream->nb_out_buffers = 0;
stream->gap_ref_buffers = 0;
stream->gap_ref_pts = GST_CLOCK_TIME_NONE;
stream->continuity_counter = CONTINUITY_UNSET;
}
}
@ -1458,31 +1470,18 @@ gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
static void
activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
{
GList *tmp;
gboolean alldone = TRUE;
if (stream->pad) {
GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p",
GST_DEBUG_PAD_NAME (stream->pad), stream);
gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
stream->active = TRUE;
GST_DEBUG_OBJECT (stream->pad, "done adding pad");
/* Check if all pads were activated, and if so emit no-more-pads */
for (tmp = tsdemux->program->stream_list; tmp; tmp = tmp->next) {
stream = (TSDemuxStream *) tmp->data;
if (stream->pad && !stream->active)
alldone = FALSE;
}
if (alldone) {
GST_DEBUG_OBJECT (tsdemux, "All pads were activated, emit no-more-pads");
gst_element_no_more_pads ((GstElement *) tsdemux);
}
} else
} else {
GST_WARNING_OBJECT (tsdemux,
"stream %p (pid 0x%04x, type:0x%03x) has no pad", stream,
((MpegTSBaseStream *) stream)->pid,
((MpegTSBaseStream *) stream)->stream_type);
}
}
static void
@ -1505,6 +1504,9 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream, GstTSDemux * tsdemux)
stream->raw_pts = -1;
stream->raw_dts = -1;
stream->pending_ts = TRUE;
stream->nb_out_buffers = 0;
stream->gap_ref_buffers = 0;
stream->gap_ref_pts = GST_CLOCK_TIME_NONE;
stream->continuity_counter = CONTINUITY_UNSET;
}
@ -1529,6 +1531,7 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
if (demux->requested_program_number == program->program_number ||
(demux->requested_program_number == -1 && demux->program_number == -1)) {
GList *tmp;
GST_LOG ("program %d started", program->program_number);
demux->program_number = program->program_number;
@ -1538,7 +1541,12 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
* an update newsegment */
demux->calculate_update_segment = !program->initial_program;
/* FIXME : When do we emit no_more_pads ? */
/* Add all streams, then fire no-more-pads */
for (tmp = program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
activate_pad_for_stream (demux, stream);
}
gst_element_no_more_pads ((GstElement *) demux);
}
}
@ -1985,32 +1993,98 @@ calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
}
push_new_segment:
if (demux->update_segment) {
GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
gst_event_ref (demux->update_segment);
gst_pad_push_event (stream->pad, demux->update_segment);
}
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
stream = (TSDemuxStream *) tmp->data;
if (stream->pad == NULL)
continue;
if (demux->update_segment) {
GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
gst_event_ref (demux->update_segment);
gst_pad_push_event (stream->pad, demux->update_segment);
}
if (demux->segment_event) {
GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
gst_event_ref (demux->segment_event);
gst_pad_push_event (stream->pad, demux->segment_event);
}
if (demux->segment_event) {
GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
gst_event_ref (demux->segment_event);
gst_pad_push_event (stream->pad, demux->segment_event);
}
if (demux->global_tags) {
gst_pad_push_event (stream->pad,
gst_event_new_tag (gst_tag_list_ref (demux->global_tags)));
}
if (demux->global_tags) {
gst_pad_push_event (stream->pad,
gst_event_new_tag (gst_tag_list_ref (demux->global_tags)));
}
/* Push pending tags */
if (stream->taglist) {
GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT,
stream->taglist);
gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist));
stream->taglist = NULL;
}
/* Push pending tags */
if (stream->taglist) {
GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT,
stream->taglist);
gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist));
stream->taglist = NULL;
}
stream->need_newsegment = FALSE;
stream->need_newsegment = FALSE;
}
}
static void
gst_ts_demux_check_and_sync_streams (GstTSDemux * demux, GstClockTime time)
{
GList *tmp;
GST_DEBUG_OBJECT (demux,
"Recheck streams and sync to at least: %" GST_TIME_FORMAT,
GST_TIME_ARGS (time));
if (G_UNLIKELY (demux->program == NULL))
return;
/* Go over each stream and update it to at least 'time' time.
* For each stream, the pad stores the buffer counter the last time
* a gap check occurred (gap_ref_buffers) and a gap_ref_pts timestamp
* that is either the PTS from the stream or the PCR the pad was updated
* to.
*
* We can check nb_out_buffers to see if any buffers were pushed since then.
* This means we can detect buffers passing without PTSes fine and still generate
* gaps.
*
* If there haven't been any buffers pushed on this stream since the last
* gap check, push a gap event updating to the indicated input PCR time
* and update the pad's tracking.
*
* If there have been buffers pushed, update the reference buffer count
* and but don't push a gap event
*/
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *ps = (TSDemuxStream *) tmp->data;
GST_DEBUG_OBJECT (ps->pad,
"0x%04x, PTS:%" GST_TIME_FORMAT " REFPTS:%" GST_TIME_FORMAT " Gap:%"
GST_TIME_FORMAT " nb_buffers: %d (ref:%d)",
((MpegTSBaseStream *) ps)->pid, GST_TIME_ARGS (ps->pts),
GST_TIME_ARGS (ps->gap_ref_pts),
GST_TIME_ARGS (ps->pts - ps->gap_ref_pts), ps->nb_out_buffers,
ps->gap_ref_buffers);
if (ps->pad == NULL)
continue;
if (ps->nb_out_buffers == ps->gap_ref_buffers && ps->gap_ref_pts != ps->pts) {
/* Do initial setup of pad if needed - segment etc */
GST_DEBUG_OBJECT (ps->pad,
"Stream needs update. Pushing GAP event to TS %" GST_TIME_FORMAT,
GST_TIME_ARGS (time));
if (G_UNLIKELY (ps->need_newsegment))
calculate_and_push_newsegment (demux, ps);
/* Now send gap event */
gst_pad_push_event (ps->pad, gst_event_new_gap (time, 0));
}
/* Update GAP tracking vars so we don't re-check this stream for a while */
ps->gap_ref_pts = time;
if (ps->pts != GST_CLOCK_TIME_NONE && ps->pts > time)
ps->gap_ref_pts = ps->pts;
ps->gap_ref_buffers = ps->nb_out_buffers;
}
}
static GstFlowReturn
@ -2087,9 +2161,6 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
}
}
if (G_UNLIKELY (!stream->active))
activate_pad_for_stream (demux, stream);
if (G_UNLIKELY (stream->need_newsegment))
calculate_and_push_newsegment (demux, stream);
@ -2109,6 +2180,7 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
stream->discont = FALSE;
res = gst_pad_push (stream->pad, pend->buffer);
stream->nb_out_buffers += 1;
g_slice_free (PendingBuffer, pend);
}
g_list_free (stream->pending);
@ -2146,10 +2218,31 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
stream->discont = FALSE;
res = gst_pad_push (stream->pad, buffer);
/* Record that a buffer was pushed */
stream->nb_out_buffers += 1;
GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
res = gst_flow_combiner_update_flow (demux->flowcombiner, res);
GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
/* GAP / sparse stream tracking */
if (G_UNLIKELY (stream->gap_ref_pts == GST_CLOCK_TIME_NONE))
stream->gap_ref_pts = stream->pts;
else {
/* Look if the stream PTS has advanced 2 seconds since the last
* gap check, and sync streams if it has. The first stream to
* hit this will trigger a gap check */
if (G_UNLIKELY (stream->pts != GST_CLOCK_TIME_NONE &&
stream->pts > stream->gap_ref_pts + 2 * GST_SECOND)) {
GstClockTime curpcr =
mpegts_packetizer_get_current_time (MPEG_TS_BASE_PACKETIZER (demux),
demux->program->pcr_pid);
if (curpcr == GST_CLOCK_TIME_NONE || curpcr < 800 * GST_MSECOND)
goto beach;
curpcr -= 800 * GST_MSECOND;
gst_ts_demux_check_and_sync_streams (demux, curpcr);
}
}
beach:
/* Reset everything */
GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));