mpegtsbase/tsdemux: Refactor seek and segment handling

All calculations go through the mpegtspacketizer
Remove unused variables/code
This commit is contained in:
Edward Hervey 2012-03-01 18:05:17 +01:00
parent 77ece06a3f
commit 097f09b823
5 changed files with 264 additions and 220 deletions

View file

@ -206,6 +206,8 @@
GST_MSECOND/10, CLOCK_BASE))
#define GSTTIME_TO_MPEGTIME(time) (gst_util_uint64_scale ((time), \
CLOCK_BASE, GST_MSECOND/10))
#define GSTTIME_TO_PCRTIME(time) (gst_util_uint64_scale ((time), \
300 * CLOCK_BASE, GST_MSECOND/10))
#define MPEG_MUX_RATE_MULT 50

View file

@ -221,9 +221,7 @@ mpegts_base_reset (MpegTSBase * base)
base->mode = BASE_MODE_STREAMING;
base->seen_pat = FALSE;
base->first_pat_offset = -1;
base->in_gap = 0;
base->first_buf_ts = GST_CLOCK_TIME_NONE;
base->seek_offset = -1;
base->upstream_live = FALSE;
base->query_latency = FALSE;
@ -621,7 +619,7 @@ mpegts_base_deactivate_program (MpegTSBase * base, MpegTSBaseProgram * program)
static void
mpegts_base_activate_program (MpegTSBase * base, MpegTSBaseProgram * program,
guint16 pmt_pid, GstStructure * pmt_info)
guint16 pmt_pid, GstStructure * pmt_info, gboolean initial_program)
{
guint i, nbstreams;
guint pcr_pid;
@ -666,8 +664,8 @@ mpegts_base_activate_program (MpegTSBase * base, MpegTSBaseProgram * program,
mpegts_base_program_add_stream (base, program, (guint16) pcr_pid, -1, NULL);
MPEGTS_BIT_SET (base->is_pes, pcr_pid);
program->active = TRUE;
program->initial_program = initial_program;
klass = GST_MPEGTS_BASE_GET_CLASS (base);
if (klass->program_started != NULL)
@ -855,6 +853,7 @@ mpegts_base_apply_pmt (MpegTSBase * base,
{
MpegTSBaseProgram *program, *old_program;
guint program_number;
gboolean initial_program = TRUE;
/* FIXME : not so sure this is valid anymore */
if (G_UNLIKELY (base->seen_pat == FALSE)) {
@ -889,11 +888,13 @@ mpegts_base_apply_pmt (MpegTSBase * base,
/* Desactivate the old program */
mpegts_base_deactivate_program (base, old_program);
mpegts_base_free_program (old_program);
initial_program = FALSE;
} else
program = old_program;
/* First activate program */
mpegts_base_activate_program (base, program, pmt_pid, pmt_info);
mpegts_base_activate_program (base, program, pmt_pid, pmt_info,
initial_program);
/* if (program->pmt_info) */
/* gst_structure_free (program->pmt_info); */
@ -986,9 +987,10 @@ mpegts_base_handle_psi (MpegTSBase * base, MpegTSPacketizerSection * section)
mpegts_base_apply_pat (base, structure);
if (base->seen_pat == FALSE) {
base->seen_pat = TRUE;
base->first_pat_offset = GST_BUFFER_OFFSET (section->buffer);
GST_DEBUG ("First PAT offset: %" G_GUINT64_FORMAT,
base->first_pat_offset);
GST_BUFFER_OFFSET (section->buffer));
mpegts_packetizer_set_reference_offset (base->packetizer,
GST_BUFFER_OFFSET (section->buffer));
}
} else
@ -1215,8 +1217,6 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event)
gst_segment_set_newsegment_full (&base->segment, update, rate,
applied_rate, format, start, stop, position);
gst_event_unref (event);
base->in_gap = GST_CLOCK_TIME_NONE;
base->first_buf_ts = GST_CLOCK_TIME_NONE;
}
break;
case GST_EVENT_EOS:
@ -1231,7 +1231,6 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_FLUSH_STOP:
gst_segment_init (&base->segment, GST_FORMAT_UNDEFINED);
base->seen_pat = FALSE;
base->first_pat_offset = -1;
/* Passthrough */
default:
res = GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, event);
@ -1256,9 +1255,6 @@ query_upstream_latency (MpegTSBase * base)
GST_WARNING_OBJECT (base, "Failed to query upstream latency");
gst_query_unref (query);
base->query_latency = TRUE;
/* Calculate clock skew for live streams only */
base->packetizer->calculate_skew = base->upstream_live;
}
static inline GstFlowReturn
@ -1293,13 +1289,6 @@ mpegts_base_chain (GstPad * pad, GstBuffer * buf)
query_upstream_latency (base);
}
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (base->first_buf_ts)) &&
GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
base->first_buf_ts = GST_BUFFER_TIMESTAMP (buf);
GST_DEBUG_OBJECT (base, "first buffer timestamp %" GST_TIME_FORMAT,
GST_TIME_ARGS (base->first_buf_ts));
}
mpegts_packetizer_push (base->packetizer, buf);
while (((pret = mpegts_packetizer_next_packet (base->packetizer,
&packet)) != PACKET_NEED_MORE) && res == GST_FLOW_OK) {
@ -1347,17 +1336,20 @@ mpegts_base_scan (MpegTSBase * base)
GstFlowReturn ret;
GstBuffer *buf;
guint i;
MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
gboolean done = FALSE;
MpegTSPacketizerPacketReturn pret;
gint64 tmpval;
guint64 upstream_size, seek_pos;
GstFormat format;
guint initial_pcr_seen;
GST_DEBUG ("Scanning for initial sync point");
/* Find initial sync point */
for (i = 0; i < 10; i++) {
GST_DEBUG ("Grabbing %d => %d", i * 50 * MPEGTS_MAX_PACKETSIZE,
50 * MPEGTS_MAX_PACKETSIZE);
/* Find initial sync point and at least 5 PCR values */
for (i = 0; i < 10 && !done; i++) {
GST_DEBUG ("Grabbing %d => %d", i * 65536, 65536);
ret = gst_pad_pull_range (base->sinkpad, i * 50 * MPEGTS_MAX_PACKETSIZE,
50 * MPEGTS_MAX_PACKETSIZE, &buf);
ret = gst_pad_pull_range (base->sinkpad, i * 65536, 65536, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto beach;
@ -1365,33 +1357,80 @@ mpegts_base_scan (MpegTSBase * base)
mpegts_packetizer_push (base->packetizer, buf);
if (mpegts_packetizer_has_packets (base->packetizer)) {
/* Mark the initial sync point and remember the packetsize */
base->initial_sync_point = base->seek_offset = base->packetizer->offset;
GST_DEBUG ("Sync point is now %" G_GUINT64_FORMAT, base->seek_offset);
base->packetsize = base->packetizer->packet_size;
/* If the subclass can seek for timestamps, do that */
if (klass->find_timestamps) {
guint64 offset;
mpegts_packetizer_clear (base->packetizer);
ret = klass->find_timestamps (base, 0, &offset);
base->initial_sync_point = base->seek_offset =
base->packetizer->offset = base->first_pat_offset;
if (base->seek_offset == -1) {
/* Mark the initial sync point and remember the packetsize */
base->seek_offset = base->packetizer->offset;
GST_DEBUG ("Sync point is now %" G_GUINT64_FORMAT, base->seek_offset);
base->packetsize = base->packetizer->packet_size;
}
while (1) {
/* Eat up all packets */
pret = mpegts_packetizer_process_next_packet (base->packetizer);
if (pret == PACKET_NEED_MORE)
break;
if (pret != PACKET_BAD &&
mpegts_packetizer_get_seen_pcr (base->packetizer) >= 5) {
GST_DEBUG ("Got enough initial PCR");
done = TRUE;
break;
}
}
goto beach;
}
}
GST_WARNING ("Didn't find initial sync point");
ret = GST_FLOW_ERROR;
initial_pcr_seen = mpegts_packetizer_get_seen_pcr (base->packetizer);
if (G_UNLIKELY (initial_pcr_seen == 0))
goto no_initial_pcr;
GST_DEBUG ("Seen %d initial PCR", initial_pcr_seen);
/* Now send data from the end */
mpegts_packetizer_clear (base->packetizer);
/* Get the size of upstream */
format = GST_FORMAT_BYTES;
if (!gst_pad_query_peer_duration (base->sinkpad, &format, &tmpval))
goto beach;
upstream_size = tmpval;
done = FALSE;
/* Find last PCR value */
for (seek_pos = MAX (0, upstream_size - 655360);
seek_pos < upstream_size && !done; seek_pos += 65536) {
GST_DEBUG ("Grabbing %d => %d", seek_pos, 65536);
ret = gst_pad_pull_range (base->sinkpad, seek_pos, 65536, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto beach;
/* Push to packetizer */
mpegts_packetizer_push (base->packetizer, buf);
if (mpegts_packetizer_has_packets (base->packetizer)) {
while (1) {
/* Eat up all packets */
pret = mpegts_packetizer_process_next_packet (base->packetizer);
if (pret == PACKET_NEED_MORE)
break;
if (pret != PACKET_BAD &&
mpegts_packetizer_get_seen_pcr (base->packetizer) >
initial_pcr_seen) {
GST_DEBUG ("Got last PCR");
done = TRUE;
break;
}
}
}
}
beach:
mpegts_packetizer_clear (base->packetizer);
return ret;
no_initial_pcr:
mpegts_packetizer_clear (base->packetizer);
GST_WARNING_OBJECT (base, "Couldn't find any PCR within the first %d bytes",
10 * 65536);
return GST_FLOW_ERROR;
}
@ -1409,7 +1448,7 @@ mpegts_base_loop (MpegTSBase * base)
GST_DEBUG ("Changing to Streaming");
break;
case BASE_MODE_SEEKING:
/* FIXME : yes, we should do something here */
/* FIXME : unclear if we still need mode_seeking... */
base->mode = BASE_MODE_STREAMING;
break;
case BASE_MODE_STREAMING:
@ -1492,6 +1531,8 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
flush = flags & GST_SEEK_FLAG_FLUSH;
if (base->mode == BASE_MODE_PUSHING) {
/* FIXME : Actually ... it is supported, we just need to convert
* the seek event to BYTES */
GST_ERROR ("seeking in push mode not supported");
goto push_mode;
}
@ -1505,6 +1546,7 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
gst_event_new_flush_start ());
} else
gst_pad_pause_task (base->sinkpad);
/* wait for streaming to finish */
GST_PAD_STREAM_LOCK (base->sinkpad);
@ -1512,6 +1554,8 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
/* send a FLUSH_STOP for the sinkpad, since we need data for seeking */
GST_DEBUG_OBJECT (base, "sending flush stop");
gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ());
/* And actually flush our pending data */
mpegts_base_flush (base);
}
if (flags & (GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_SKIP)) {
@ -1526,11 +1570,9 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
ret = klass->seek (base, event);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_WARNING ("seeking failed %s", gst_flow_get_name (ret));
goto done;
}
} else {
GST_WARNING ("subclass has no seek implementation");
goto done;
}
}
@ -1568,6 +1610,7 @@ mpegts_base_sink_activate_pull (GstPad * pad, gboolean active)
MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad));
if (active) {
base->mode = BASE_MODE_SCANNING;
base->packetizer->calculate_offset = TRUE;
return gst_pad_start_task (pad, (GstTaskFunction) mpegts_base_loop, base);
} else
return gst_pad_stop_task (pad);
@ -1578,6 +1621,7 @@ mpegts_base_sink_activate_push (GstPad * pad, gboolean active)
{
MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad));
base->mode = BASE_MODE_PUSHING;
base->packetizer->calculate_skew = TRUE;
return TRUE;
}

View file

@ -78,6 +78,8 @@ struct _MpegTSBaseProgram
/* TRUE if the program is currently being used */
gboolean active;
/* TRUE if this is the first program created */
gboolean initial_program;
};
typedef enum {
@ -98,9 +100,6 @@ struct _MpegTSBase {
/* pull-based behaviour */
MpegTSBaseMode mode;
/* location of first sync point */
guint64 initial_sync_point;
/* Current pull offset (also set by seek handler) */
guint64 seek_offset;
@ -132,13 +131,6 @@ struct _MpegTSBase {
/* Whether we saw a PAT yet */
gboolean seen_pat;
/* Offset from the origin to the first PAT (pullmode) */
guint64 first_pat_offset;
/* interpolation gap between the upstream timestamp and the pts */
GstClockTime in_gap;
GstClockTime first_buf_ts;
/* Whether upstream is live or not */
gboolean upstream_live;
/* Whether we queried the upstream latency or not */

View file

@ -69,8 +69,17 @@
/* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
* either accurately or for the next timestamp
*/
#define SEEK_TIMESTAMP_OFFSET (1000 * GST_MSECOND)
#define SEEK_TIMESTAMP_OFFSET (500 * GST_MSECOND)
#define SEGMENT_FORMAT "[format:%s, rate:%f, start:%" \
GST_TIME_FORMAT", stop:%"GST_TIME_FORMAT", time:%"GST_TIME_FORMAT \
", accum:%"GST_TIME_FORMAT", last_stop:%"GST_TIME_FORMAT \
", duration:%"GST_TIME_FORMAT"]"
#define SEGMENT_ARGS(a) gst_format_get_name((a).format), (a).rate, \
GST_TIME_ARGS((a).start), GST_TIME_ARGS((a).stop), \
GST_TIME_ARGS((a).time), GST_TIME_ARGS((a).accum), \
GST_TIME_ARGS((a).last_stop), GST_TIME_ARGS((a).duration)
GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
@ -128,9 +137,15 @@ struct _TSDemuxStream
/* Raw value of current PTS/DTS */
guint64 raw_pts;
guint64 raw_dts;
/* PTS/DTS with rollover fixed */
guint64 fixed_pts;
guint64 fixed_dts;
/* Number of rollover seen for PTS/DTS (default:0) */
guint nb_pts_rollover;
guint nb_dts_rollover;
/* Whether this stream needs to send a newsegment */
gboolean need_newsegment;
};
#define VIDEO_CAPS \
@ -204,8 +219,6 @@ enum
};
/* Pad functions */
static const GstQueryType *gst_ts_demux_srcpad_query_types (GstPad * pad);
static gboolean gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query);
/* mpegtsbase methods */
@ -304,12 +317,19 @@ gst_ts_demux_reset (MpegTSBase * base)
{
GstTSDemux *demux = (GstTSDemux *) base;
demux->need_newsegment = TRUE;
demux->program_number = -1;
demux->duration = GST_CLOCK_TIME_NONE;
demux->calculate_update_segment = FALSE;
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
if (demux->segment_event) {
gst_event_unref (demux->segment_event);
demux->segment_event = NULL;
}
if (demux->update_segment) {
gst_event_unref (demux->update_segment);
demux->update_segment = NULL;
}
}
static void
@ -389,9 +409,21 @@ gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
GST_DEBUG ("query duration");
gst_query_parse_duration (query, &format, NULL);
if (format == GST_FORMAT_TIME) {
if (!gst_pad_peer_query (base->sinkpad, query))
gst_query_set_duration (query, GST_FORMAT_TIME,
demux->segment.duration);
if (!gst_pad_peer_query (base->sinkpad, query)) {
gint64 val;
format = GST_FORMAT_BYTES;
if (!gst_pad_query_peer_duration (base->sinkpad, &format, &val))
res = FALSE;
else {
GstClockTime dur =
mpegts_packetizer_offset_to_ts (base->packetizer, val);
if (GST_CLOCK_TIME_IS_VALID (dur))
gst_query_set_duration (query, GST_FORMAT_TIME, dur);
else
res = FALSE;
}
}
} else {
GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
res = FALSE;
@ -453,13 +485,6 @@ gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
}
static GstFlowReturn
gst_ts_demux_perform_seek (MpegTSBase * base, GstSegment * segment)
{
return GST_FLOW_ERROR;
}
static GstFlowReturn
gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
{
@ -472,6 +497,7 @@ gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
gint64 start, stop;
GstSegment seeksegment;
gboolean update;
guint64 start_offset;
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
@ -492,31 +518,35 @@ gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
/* copy segment, we need this because we still need the old
* segment when we close the current segment. */
memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
if (demux->segment_event) {
gst_event_unref (demux->segment_event);
demux->segment_event = NULL;
}
/* configure the segment with the seek variables */
GST_DEBUG_OBJECT (demux, "configuring seek");
GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
" last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
GST_TIME_ARGS (seeksegment.last_stop),
GST_TIME_ARGS (seeksegment.duration));
GST_DEBUG ("seeksegment before set_seek " SEGMENT_FORMAT,
SEGMENT_ARGS (seeksegment));
gst_segment_set_seek (&seeksegment, rate, format, flags, start_type, start,
stop_type, stop, &update);
GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
" last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
GST_TIME_ARGS (seeksegment.last_stop),
GST_TIME_ARGS (seeksegment.duration));
res = gst_ts_demux_perform_seek (base, &seeksegment);
if (G_UNLIKELY (res != GST_FLOW_OK)) {
GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
GST_DEBUG ("seeksegment after set_seek " SEGMENT_FORMAT,
SEGMENT_ARGS (seeksegment));
/* Convert start/stop to offset */
start_offset =
mpegts_packetizer_ts_to_offset (base->packetizer, MAX (0,
start - SEEK_TIMESTAMP_OFFSET));
if (G_UNLIKELY (start_offset == -1)) {
GST_WARNING ("Couldn't convert start position to an offset");
goto done;
}
/* record offset */
base->seek_offset = start_offset;
res = GST_FLOW_OK;
/* commit the new segment */
memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
@ -927,10 +957,13 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
if (bstream->stream_type != 0xff)
stream->pad = create_pad_for_stream (base, bstream, program);
stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE;
stream->raw_pts = 0;
stream->raw_dts = 0;
stream->fixed_pts = 0;
stream->fixed_dts = 0;
stream->nb_pts_rollover = 0;
stream->nb_dts_rollover = 0;
}
@ -940,24 +973,14 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
static void
gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
{
GstTSDemux *demux = GST_TS_DEMUX (base);
TSDemuxStream *stream = (TSDemuxStream *) bstream;
if (stream->pad) {
if (gst_pad_is_active (stream->pad)) {
gboolean need_newsegment = demux->need_newsegment;
/* We must not send the newsegment when flushing the pending data
on the removed stream. We should only push it when the newly added
stream finishes parsing its PTS */
demux->need_newsegment = FALSE;
/* Flush out all data */
GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream);
demux->need_newsegment = need_newsegment;
GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
gst_pad_push_event (stream->pad, gst_event_new_eos ());
GST_DEBUG_OBJECT (stream->pad, "Deactivating and removing pad");
@ -998,6 +1021,15 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream)
stream->nbpending = 0;
stream->current = NULL;
stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE;
stream->raw_pts = 0;
stream->raw_dts = 0;
stream->fixed_pts = 0;
stream->fixed_dts = 0;
stream->nb_pts_rollover = 0;
stream->nb_dts_rollover = 0;
}
static void
@ -1023,6 +1055,20 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
demux->program_number = program->program_number;
demux->program = program;
/* If this is not the initial program, we need to calculate
* an update newsegment */
demux->calculate_update_segment = !program->initial_program;
/* If we have an upstream time segment and it's the initial program, just use that */
if (program->initial_program && base->segment.format == GST_FORMAT_TIME) {
demux->segment = base->segment;
demux->segment_event =
gst_event_new_new_segment_full (FALSE, base->segment.rate,
base->segment.applied_rate, GST_FORMAT_TIME, base->segment.start,
base->segment.stop, base->segment.time);
GST_EVENT_SRC (demux->segment_event) = gst_object_ref (demux);
}
/* Activate all stream pads, pads will already have been created */
if (base->mode != BASE_MODE_SCANNING) {
for (tmp = program->stream_list; tmp; tmp = tmp->next)
@ -1032,7 +1078,6 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
/* Inform scanner we have got our program */
demux->current_program_number = program->program_number;
demux->need_newsegment = TRUE;
}
}
@ -1113,8 +1158,8 @@ gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
/* Compute PTS in GstClockTime */
stream->raw_pts = pts;
stream->pts =
MPEGTIME_TO_GSTTIME (pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE);
stream->fixed_pts = pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE;
stream->pts = MPEGTIME_TO_GSTTIME (stream->fixed_pts);
GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
bs->pid, stream->raw_pts, GST_TIME_ARGS (stream->pts));
@ -1159,8 +1204,8 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
/* Compute DTS in GstClockTime */
stream->raw_dts = dts;
stream->dts =
MPEGTIME_TO_GSTTIME (dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE);
stream->fixed_dts = dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE;
stream->dts = MPEGTIME_TO_GSTTIME (stream->fixed_dts);
GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
bs->pid, stream->raw_dts, GST_TIME_ARGS (stream->dts));
@ -1215,46 +1260,12 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
GST_TIME_ARGS (stream->pts),
GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS)));
/* safe default if insufficient upstream info */
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (base->in_gap) &&
GST_CLOCK_TIME_IS_VALID (base->first_buf_ts) &&
base->mode == BASE_MODE_PUSHING &&
base->segment.format == GST_FORMAT_TIME)) {
/* Find the earliest current PTS we're going to push */
GstClockTime firstpts = GST_CLOCK_TIME_NONE;
GList *tmp;
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
if (!GST_CLOCK_TIME_IS_VALID (firstpts) || pstream->pts < firstpts)
firstpts = pstream->pts;
}
base->in_gap = base->first_buf_ts - firstpts;
GST_DEBUG_OBJECT (base, "upstream segment start %" GST_TIME_FORMAT
", first buffer timestamp: %" GST_TIME_FORMAT
", first PTS: %" GST_TIME_FORMAT
", interpolation gap: %" GST_TIME_FORMAT,
GST_TIME_ARGS (base->segment.start),
GST_TIME_ARGS (base->first_buf_ts), GST_TIME_ARGS (firstpts),
GST_TIME_ARGS (base->in_gap));
}
if (!GST_CLOCK_TIME_IS_VALID (base->in_gap))
base->in_gap = 0;
if (base->upstream_live) {
{
MpegTSPacketizer2 *packetizer = base->packetizer;
if (GST_CLOCK_TIME_IS_VALID (packetizer->base_time))
GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
stream->pts - packetizer->base_pcrtime + packetizer->base_time +
packetizer->skew;
else
GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) = GST_CLOCK_TIME_NONE;
} else
GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
stream->pts + base->in_gap;
mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
}
GST_DEBUG ("buf %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0])));
}
@ -1346,85 +1357,77 @@ static void
calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
{
MpegTSBase *base = (MpegTSBase *) demux;
GstEvent *newsegmentevent;
gint64 start = 0, stop = GST_CLOCK_TIME_NONE, position = 0;
GstClockTime firstpts = GST_CLOCK_TIME_NONE;
GstClockTime lowest_pts = GST_CLOCK_TIME_NONE;
GstClockTime firstts = GST_CLOCK_TIME_NONE;
GList *tmp;
GST_DEBUG ("Creating new newsegment for stream %p", stream);
/* Outgoing newsegment values
* start : The first/start PTS
* stop : The last PTS (or -1)
* position : The stream time corresponding to start
*
* Except for live mode with incoming GST_TIME_FORMAT newsegment where
* it is the same values as that incoming newsegment (and we convert the
* PTS to that remote clock).
*/
/* 1) If we need to calculate an update newsegment, do it
* 2) If we need to calculate a new newsegment, do it
* 3) If an update_segment is valid, push it
* 4) If a newsegment is valid, push it */
/* Speedup : if we don't need to calculate anything, go straight to pushing */
if (!demux->calculate_update_segment && demux->segment_event)
goto push_new_segment;
/* Calculate the 'new_start' value, used for both updates and newsegment */
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
if (!GST_CLOCK_TIME_IS_VALID (firstpts) || pstream->pts < firstpts)
firstpts = pstream->pts;
if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) {
if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts)
lowest_pts = pstream->pts;
}
if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) {
if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts)
lowest_pts = pstream->dts;
}
}
if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
firstts = mpegts_packetizer_pts_to_ts (base->packetizer, lowest_pts);
GST_DEBUG ("lowest_pts %" G_GUINT64_FORMAT " => clocktime %" GST_TIME_FORMAT,
lowest_pts, GST_TIME_ARGS (firstts));
if (demux->calculate_update_segment) {
GST_DEBUG ("Calculating update segment");
/* If we have a valid segment, create an update of that */
if (demux->segment.format == GST_FORMAT_TIME) {
GST_DEBUG ("Re-using segment " SEGMENT_FORMAT,
SEGMENT_ARGS (demux->segment));
demux->update_segment =
gst_event_new_new_segment_full (TRUE, demux->segment.rate,
demux->segment.applied_rate, GST_FORMAT_TIME, demux->segment.start,
firstts, demux->segment.time);
GST_EVENT_SRC (demux->update_segment) = gst_object_ref (demux);
}
demux->calculate_update_segment = FALSE;
}
if (base->mode == BASE_MODE_PUSHING) {
/* FIXME : We're just ignore the upstream format for the time being */
/* FIXME : We should use base->segment.format and a upstream latency query
* to decide if we need to use live values or not */
GST_DEBUG ("push-based. base Segment start:%" GST_TIME_FORMAT " duration:%"
GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
GST_TIME_ARGS (base->segment.start),
GST_TIME_ARGS (base->segment.duration),
GST_TIME_ARGS (base->segment.stop), GST_TIME_ARGS (base->segment.time));
GST_DEBUG ("push-based. demux Segment start:%" GST_TIME_FORMAT " duration:%"
GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
GST_TIME_ARGS (demux->segment.start),
GST_TIME_ARGS (demux->segment.duration),
GST_TIME_ARGS (demux->segment.stop),
GST_TIME_ARGS (demux->segment.time));
GST_DEBUG ("stream pts: %" GST_TIME_FORMAT " first pts: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (firstpts));
if (base->segment.format == GST_FORMAT_TIME) {
start = base->segment.start;
stop = base->segment.stop;
}
if (!base->upstream_live) {
/* Shift the start depending on our position in the stream */
start += firstpts + base->in_gap - base->first_buf_ts;
}
position = start;
} else {
/* pull mode */
GST_DEBUG ("pull-based. Segment start:%" GST_TIME_FORMAT " duration:%"
GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
GST_TIME_ARGS (demux->segment.start),
GST_TIME_ARGS (demux->segment.duration),
GST_TIME_ARGS (demux->segment.time));
/* FIXME : This is not entirely correct. We should be using the PTS time
* realm and not the PCR one. Doesn't matter *too* much if PTS/PCR values
* aren't too far apart, but still. */
/* FIXME : EDWARD : Removed previous first pcr gsttime */
start = demux->segment.start;
stop = demux->segment.duration;
position = demux->segment.time;
if (!demux->segment_event) {
GST_DEBUG ("Calculating actual segment");
/* FIXME : Set proper values */
demux->segment_event =
gst_event_new_new_segment_full (FALSE, 1.0, 1.0, GST_FORMAT_TIME,
firstts, GST_CLOCK_TIME_NONE, firstts);
GST_EVENT_SRC (demux->segment_event) = gst_object_ref (demux);
}
GST_DEBUG ("new segment: start: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT " time: %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
newsegmentevent =
gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, stop,
position);
push_new_segment:
if (demux->update_segment) {
GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
gst_event_ref (demux->update_segment);
gst_pad_push_event (stream->pad, demux->update_segment);
}
push_event ((MpegTSBase *) demux, newsegmentevent);
if (demux->segment_event) {
GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
gst_event_ref (demux->segment_event);
gst_pad_push_event (stream->pad, demux->segment_event);
}
demux->need_newsegment = FALSE;
stream->need_newsegment = FALSE;
}
static GstFlowReturn
@ -1461,7 +1464,7 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
goto beach;
}
if (G_UNLIKELY (demux->need_newsegment))
if (G_UNLIKELY (stream->need_newsegment))
calculate_and_push_newsegment (demux, stream);
/* We have a confirmed buffer, let's push it out */
@ -1475,12 +1478,9 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->pts));
if (GST_CLOCK_TIME_IS_VALID (stream->pts)
&& !GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (firstbuffer))
&& GST_CLOCK_TIME_IS_VALID (packetizer->base_time)) {
&& !GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (firstbuffer)))
GST_BUFFER_TIMESTAMP (firstbuffer) =
stream->pts - packetizer->base_pcrtime + packetizer->base_time +
packetizer->skew;
}
mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
GST_DEBUG_OBJECT (stream->pad,
"Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
@ -1493,7 +1493,7 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
beach:
/* Reset everything */
GST_LOG ("Resetting to EMPTY");
GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
stream->state = PENDING_PACKET_EMPTY;
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
stream->nbpending = 0;
@ -1536,7 +1536,7 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
GST_BUFFER_OFFSET (packet->buffer));
}
if (packet->payload)
if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED))
gst_ts_demux_queue_data (demux, stream, packet);
else
gst_buffer_unref (packet->buffer);
@ -1549,7 +1549,6 @@ gst_ts_demux_flush (MpegTSBase * base)
{
GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
demux->need_newsegment = TRUE;
gst_ts_demux_flush_streams (demux);
}

View file

@ -61,11 +61,18 @@ struct _GstTSDemux
/*< private >*/
MpegTSBaseProgram *program; /* Current program */
guint current_program_number;
gboolean need_newsegment;
/* Downstream segment */
/* segments to be sent */
GstSegment segment;
GstClockTime duration; /* Total duration */
GstEvent *segment_event;
/* Set when program change */
gboolean calculate_update_segment;
/* update segment is */
GstEvent *update_segment;
/* Full stream duration */
GstClockTime duration;
};
struct _GstTSDemuxClass