From b59a9262c069d8f0c149ce4c9185c9a439ad6ece Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Mon, 20 Oct 2014 12:30:50 +0200 Subject: [PATCH] tsdemux: GAP detection All pads of a stream are now added at the beginning. In order to cope with streams that don't get any data (forever or for a long time) we detect gaps and push out GAP events when needed. Cleanups and commenting by Jan Schmidt https://bugzilla.gnome.org/show_bug.cgi?id=734040 --- gst/mpegtsdemux/mpegtspacketizer.c | 14 +++ gst/mpegtsdemux/mpegtspacketizer.h | 3 + gst/mpegtsdemux/tsdemux.c | 177 ++++++++++++++++++++++------- 3 files changed, 152 insertions(+), 42 deletions(-) diff --git a/gst/mpegtsdemux/mpegtspacketizer.c b/gst/mpegtsdemux/mpegtspacketizer.c index 84cc72e07c..55eae074d6 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.c +++ b/gst/mpegtsdemux/mpegtspacketizer.c @@ -121,6 +121,19 @@ flush_observations (MpegTSPacketizer2 * packetizer) packetizer->lastobsid = 0; } +GstClockTime +mpegts_packetizer_get_current_time (MpegTSPacketizer2 * packetizer, + guint16 pcr_pid) +{ + MpegTSPCR *pcrtable = get_pcr_table (packetizer, pcr_pid); + + if (pcrtable == NULL) + return GST_CLOCK_TIME_NONE; + + return mpegts_packetizer_pts_to_ts (packetizer, pcrtable->last_pcrtime, + pcr_pid); +} + static inline MpegTSPacketizerStreamSubtable * find_subtable (GSList * subtables, guint8 table_id, guint16 subtable_extension) { @@ -1874,6 +1887,7 @@ record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable, packetizer->nb_seen_offsets += 1; + pcrtable->last_pcrtime = PCRTIME_TO_GSTTIME (pcr); /* FIXME : Invert logic later (probability is higher that we have a * current estimator) */ diff --git a/gst/mpegtsdemux/mpegtspacketizer.h b/gst/mpegtsdemux/mpegtspacketizer.h index f107b80a0a..f6fb41d90d 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.h +++ b/gst/mpegtsdemux/mpegtspacketizer.h @@ -365,6 +365,9 @@ mpegts_packetizer_ts_to_offset (MpegTSPacketizer2 * packetizer, G_GNUC_INTERNAL GstClockTime mpegts_packetizer_pts_to_ts (MpegTSPacketizer2 * packetizer, GstClockTime pts, guint16 pcr_pid); +G_GNUC_INTERNAL GstClockTime +mpegts_packetizer_get_current_time (MpegTSPacketizer2 * packetizer, + guint16 pcr_pid); G_GNUC_INTERNAL void mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer, GstClockTime offset, guint16 pcr_pid); diff --git a/gst/mpegtsdemux/tsdemux.c b/gst/mpegtsdemux/tsdemux.c index bd196c37b5..aba2c5a43f 100644 --- a/gst/mpegtsdemux/tsdemux.c +++ b/gst/mpegtsdemux/tsdemux.c @@ -165,6 +165,13 @@ struct _TSDemuxStream GstClockTime pts; GstClockTime dts; + /* Reference PTS used to detect gaps */ + GstClockTime gap_ref_pts; + /* Number of outputted buffers */ + guint32 nb_out_buffers; + /* Reference number of buffers for gaps */ + guint32 gap_ref_buffers; + /* Current PTS/DTS for this stream (in 90kHz unit) */ guint64 raw_pts, raw_dts; @@ -295,6 +302,8 @@ static void gst_ts_demux_stream_flush (TSDemuxStream * stream, GstTSDemux * demux); static gboolean push_event (MpegTSBase * base, GstEvent * event); +static void gst_ts_demux_check_and_sync_streams (GstTSDemux * demux, + GstClockTime time); static void _extra_init (void) @@ -1402,10 +1411,13 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream, stream->discont = TRUE; stream->pts = GST_CLOCK_TIME_NONE; stream->dts = GST_CLOCK_TIME_NONE; + stream->first_dts = GST_CLOCK_TIME_NONE; stream->raw_pts = -1; stream->raw_dts = -1; stream->pending_ts = TRUE; - stream->first_dts = GST_CLOCK_TIME_NONE; + stream->nb_out_buffers = 0; + stream->gap_ref_buffers = 0; + stream->gap_ref_pts = GST_CLOCK_TIME_NONE; stream->continuity_counter = CONTINUITY_UNSET; } } @@ -1458,31 +1470,18 @@ gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream) static void activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream) { - GList *tmp; - gboolean alldone = TRUE; - if (stream->pad) { GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p", GST_DEBUG_PAD_NAME (stream->pad), stream); gst_element_add_pad ((GstElement *) tsdemux, stream->pad); stream->active = TRUE; GST_DEBUG_OBJECT (stream->pad, "done adding pad"); - - /* Check if all pads were activated, and if so emit no-more-pads */ - for (tmp = tsdemux->program->stream_list; tmp; tmp = tmp->next) { - stream = (TSDemuxStream *) tmp->data; - if (stream->pad && !stream->active) - alldone = FALSE; - } - if (alldone) { - GST_DEBUG_OBJECT (tsdemux, "All pads were activated, emit no-more-pads"); - gst_element_no_more_pads ((GstElement *) tsdemux); - } - } else + } else { GST_WARNING_OBJECT (tsdemux, "stream %p (pid 0x%04x, type:0x%03x) has no pad", stream, ((MpegTSBaseStream *) stream)->pid, ((MpegTSBaseStream *) stream)->stream_type); + } } static void @@ -1505,6 +1504,9 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream, GstTSDemux * tsdemux) stream->raw_pts = -1; stream->raw_dts = -1; stream->pending_ts = TRUE; + stream->nb_out_buffers = 0; + stream->gap_ref_buffers = 0; + stream->gap_ref_pts = GST_CLOCK_TIME_NONE; stream->continuity_counter = CONTINUITY_UNSET; } @@ -1529,6 +1531,7 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program) if (demux->requested_program_number == program->program_number || (demux->requested_program_number == -1 && demux->program_number == -1)) { + GList *tmp; GST_LOG ("program %d started", program->program_number); demux->program_number = program->program_number; @@ -1538,7 +1541,12 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program) * an update newsegment */ demux->calculate_update_segment = !program->initial_program; - /* FIXME : When do we emit no_more_pads ? */ + /* Add all streams, then fire no-more-pads */ + for (tmp = program->stream_list; tmp; tmp = tmp->next) { + TSDemuxStream *stream = (TSDemuxStream *) tmp->data; + activate_pad_for_stream (demux, stream); + } + gst_element_no_more_pads ((GstElement *) demux); } } @@ -1985,32 +1993,98 @@ calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream) } push_new_segment: - if (demux->update_segment) { - GST_DEBUG_OBJECT (stream->pad, "Pushing update segment"); - gst_event_ref (demux->update_segment); - gst_pad_push_event (stream->pad, demux->update_segment); - } + for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { + stream = (TSDemuxStream *) tmp->data; + if (stream->pad == NULL) + continue; + if (demux->update_segment) { + GST_DEBUG_OBJECT (stream->pad, "Pushing update segment"); + gst_event_ref (demux->update_segment); + gst_pad_push_event (stream->pad, demux->update_segment); + } - if (demux->segment_event) { - GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event"); - gst_event_ref (demux->segment_event); - gst_pad_push_event (stream->pad, demux->segment_event); - } + if (demux->segment_event) { + GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event"); + gst_event_ref (demux->segment_event); + gst_pad_push_event (stream->pad, demux->segment_event); + } - if (demux->global_tags) { - gst_pad_push_event (stream->pad, - gst_event_new_tag (gst_tag_list_ref (demux->global_tags))); - } + if (demux->global_tags) { + gst_pad_push_event (stream->pad, + gst_event_new_tag (gst_tag_list_ref (demux->global_tags))); + } - /* Push pending tags */ - if (stream->taglist) { - GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT, - stream->taglist); - gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist)); - stream->taglist = NULL; - } + /* Push pending tags */ + if (stream->taglist) { + GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT, + stream->taglist); + gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist)); + stream->taglist = NULL; + } - stream->need_newsegment = FALSE; + stream->need_newsegment = FALSE; + } +} + +static void +gst_ts_demux_check_and_sync_streams (GstTSDemux * demux, GstClockTime time) +{ + GList *tmp; + + GST_DEBUG_OBJECT (demux, + "Recheck streams and sync to at least: %" GST_TIME_FORMAT, + GST_TIME_ARGS (time)); + + if (G_UNLIKELY (demux->program == NULL)) + return; + + /* Go over each stream and update it to at least 'time' time. + * For each stream, the pad stores the buffer counter the last time + * a gap check occurred (gap_ref_buffers) and a gap_ref_pts timestamp + * that is either the PTS from the stream or the PCR the pad was updated + * to. + * + * We can check nb_out_buffers to see if any buffers were pushed since then. + * This means we can detect buffers passing without PTSes fine and still generate + * gaps. + * + * If there haven't been any buffers pushed on this stream since the last + * gap check, push a gap event updating to the indicated input PCR time + * and update the pad's tracking. + * + * If there have been buffers pushed, update the reference buffer count + * and but don't push a gap event + */ + for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { + TSDemuxStream *ps = (TSDemuxStream *) tmp->data; + GST_DEBUG_OBJECT (ps->pad, + "0x%04x, PTS:%" GST_TIME_FORMAT " REFPTS:%" GST_TIME_FORMAT " Gap:%" + GST_TIME_FORMAT " nb_buffers: %d (ref:%d)", + ((MpegTSBaseStream *) ps)->pid, GST_TIME_ARGS (ps->pts), + GST_TIME_ARGS (ps->gap_ref_pts), + GST_TIME_ARGS (ps->pts - ps->gap_ref_pts), ps->nb_out_buffers, + ps->gap_ref_buffers); + if (ps->pad == NULL) + continue; + + if (ps->nb_out_buffers == ps->gap_ref_buffers && ps->gap_ref_pts != ps->pts) { + /* Do initial setup of pad if needed - segment etc */ + GST_DEBUG_OBJECT (ps->pad, + "Stream needs update. Pushing GAP event to TS %" GST_TIME_FORMAT, + GST_TIME_ARGS (time)); + if (G_UNLIKELY (ps->need_newsegment)) + calculate_and_push_newsegment (demux, ps); + + /* Now send gap event */ + gst_pad_push_event (ps->pad, gst_event_new_gap (time, 0)); + } + + /* Update GAP tracking vars so we don't re-check this stream for a while */ + ps->gap_ref_pts = time; + if (ps->pts != GST_CLOCK_TIME_NONE && ps->pts > time) + ps->gap_ref_pts = ps->pts; + ps->gap_ref_buffers = ps->nb_out_buffers; + } } static GstFlowReturn @@ -2087,9 +2161,6 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) } } - if (G_UNLIKELY (!stream->active)) - activate_pad_for_stream (demux, stream); - if (G_UNLIKELY (stream->need_newsegment)) calculate_and_push_newsegment (demux, stream); @@ -2109,6 +2180,7 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) stream->discont = FALSE; res = gst_pad_push (stream->pad, pend->buffer); + stream->nb_out_buffers += 1; g_slice_free (PendingBuffer, pend); } g_list_free (stream->pending); @@ -2146,10 +2218,31 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) stream->discont = FALSE; res = gst_pad_push (stream->pad, buffer); + /* Record that a buffer was pushed */ + stream->nb_out_buffers += 1; GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res)); res = gst_flow_combiner_update_flow (demux->flowcombiner, res); GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res)); + /* GAP / sparse stream tracking */ + if (G_UNLIKELY (stream->gap_ref_pts == GST_CLOCK_TIME_NONE)) + stream->gap_ref_pts = stream->pts; + else { + /* Look if the stream PTS has advanced 2 seconds since the last + * gap check, and sync streams if it has. The first stream to + * hit this will trigger a gap check */ + if (G_UNLIKELY (stream->pts != GST_CLOCK_TIME_NONE && + stream->pts > stream->gap_ref_pts + 2 * GST_SECOND)) { + GstClockTime curpcr = + mpegts_packetizer_get_current_time (MPEG_TS_BASE_PACKETIZER (demux), + demux->program->pcr_pid); + if (curpcr == GST_CLOCK_TIME_NONE || curpcr < 800 * GST_MSECOND) + goto beach; + curpcr -= 800 * GST_MSECOND; + gst_ts_demux_check_and_sync_streams (demux, curpcr); + } + } + beach: /* Reset everything */ GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));