mpegtsdemux: push based seeking based on PCR

buffer timestamps are converted to GstClockTime to cover pcr/pts wraps.
multiple pcr/pts wraps are handled with an index which ensures at most
a single pcr wraparound between two entries.
the last seen pcr is recorded to have a nearby index point for short seeks
resuming playback might be delayed if the postion is not a keyframe

TODO: replace manual packet scanning and parsing in the initial duration estimation
This commit is contained in:
Janne Grunau 2011-02-22 12:33:56 +01:00 committed by Edward Hervey
parent f89a0abca0
commit 15391b29e1
6 changed files with 807 additions and 52 deletions

View file

@ -187,6 +187,8 @@ mpegts_base_class_init (MpegTSBaseClass * klass)
static void
mpegts_base_reset (MpegTSBase * base)
{
MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
mpegts_packetizer_clear (base->packetizer);
memset (base->is_pes, 0, 8192);
memset (base->known_psi, 0, 8192);
@ -206,6 +208,8 @@ mpegts_base_reset (MpegTSBase * base)
/* base->pat = NULL; */
/* pmt pids will be added and removed dynamically */
if (klass->reset)
klass->reset (base);
}
static void
@ -1013,8 +1017,8 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event)
gst_event_unref (event);
res = FALSE;
break;
case GST_EVENT_FLUSH_STOP:
mpegts_packetizer_clear (base->packetizer);
case GST_EVENT_FLUSH_START:
mpegts_packetizer_flush (base->packetizer);
/* Passthrough */
default:
res = GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, event);
@ -1181,6 +1185,9 @@ mpegts_base_loop (MpegTSBase * base)
goto error;
}
break;
case BASE_MODE_PUSHING:
GST_WARNING ("wrong BASE_MODE_PUSHING mode in pull loop");
break;
}
return;
@ -1201,6 +1208,92 @@ error:
}
}
gboolean
mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
GstEvent * event)
{
MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
GstFlowReturn ret = GST_FLOW_ERROR;
gdouble rate;
gboolean flush;
GstFormat format;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
if (format != GST_FORMAT_TIME)
return FALSE;
GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
" stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
GST_TIME_ARGS (stop));
flush = flags & GST_SEEK_FLAG_FLUSH;
if (base->mode == BASE_MODE_PUSHING) {
GST_ERROR ("seeking in push mode not supported");
goto done;
}
/* stop streaming, either by flushing or by pausing the task */
base->mode = BASE_MODE_SEEKING;
if (flush) {
GST_DEBUG_OBJECT (base, "sending flush start");
gst_pad_push_event (base->sinkpad, gst_event_new_flush_start ());
GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base,
gst_event_new_flush_start ());
} else
gst_pad_pause_task (base->sinkpad);
/* wait for streaming to finish */
GST_PAD_STREAM_LOCK (base->sinkpad);
if (flush) {
/* send a FLUSH_STOP for the sinkpad, since we need data for seeking */
GST_DEBUG_OBJECT (base, "sending flush stop");
gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ());
}
if (flags & (GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SEGMENT |
GST_SEEK_FLAG_SKIP)) {
GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
goto done;
}
if (format == GST_FORMAT_TIME) {
/* If the subclass can seek, do that */
if (klass->seek) {
ret = klass->seek (base, event);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_WARNING ("seeking failed %s", gst_flow_get_name (ret));
goto done;
}
} else {
GST_WARNING ("subclass has no seek implementation");
goto done;
}
}
if (flush) {
/* if we sent a FLUSH_START, we now send a FLUSH_STOP */
GST_DEBUG_OBJECT (base, "sending flush stop");
//gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ());
GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base,
gst_event_new_flush_stop ());
}
//else
done:
gst_pad_start_task (base->sinkpad, (GstTaskFunction) mpegts_base_loop, base);
GST_PAD_STREAM_UNLOCK (base->sinkpad);
return ret == GST_FLOW_OK;
}
static gboolean
mpegts_base_sink_activate (GstPad * pad)
{
@ -1227,6 +1320,8 @@ mpegts_base_sink_activate_pull (GstPad * pad, gboolean active)
static gboolean
mpegts_base_sink_activate_push (GstPad * pad, gboolean active)
{
MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad));
base->mode = BASE_MODE_PUSHING;
return TRUE;
}
@ -1243,6 +1338,8 @@ mpegts_base_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
mpegts_base_reset (base);
if (base->mode != BASE_MODE_PUSHING)
base->mode = BASE_MODE_SCANNING;
break;
default:
break;

View file

@ -74,7 +74,8 @@ struct _MpegTSBaseProgram
typedef enum {
BASE_MODE_SCANNING,
BASE_MODE_SEEKING,
BASE_MODE_STREAMING
BASE_MODE_STREAMING,
BASE_MODE_PUSHING
} MpegTSBaseMode;
struct _MpegTSBase {
@ -124,6 +125,7 @@ struct _MpegTSBaseClass {
GstElementClass parent_class;
/* Virtual methods */
void (*reset) (MpegTSBase *base);
GstFlowReturn (*push) (MpegTSBase *base, MpegTSPacketizerPacket *packet, MpegTSPacketizerSection * section);
gboolean (*push_event) (MpegTSBase *base, GstEvent * event);
/* program_started gets called when program's pmt arrives for first time */
@ -139,6 +141,9 @@ struct _MpegTSBaseClass {
/* find_timestamps is called to find PCR */
GstFlowReturn (*find_timestamps) (MpegTSBase * base, guint64 initoff, guint64 *offset);
/* seek is called to wait for seeking */
GstFlowReturn (*seek) (MpegTSBase * base, GstEvent * event);
/* signals */
void (*pat_info) (GstStructure *pat);
void (*pmt_info) (GstStructure *pmt);
@ -155,6 +160,8 @@ MpegTSBaseProgram *mpegts_base_add_program (MpegTSBase * base, gint program_numb
guint8 *mpegts_get_descriptor_from_stream (MpegTSBaseStream * stream, guint8 tag);
guint8 *mpegts_get_descriptor_from_program (MpegTSBaseProgram * program, guint8 tag);
gboolean
mpegts_base_handle_seek_event(MpegTSBase * base, GstPad * pad, GstEvent * event);
gboolean gst_mpegtsbase_plugin_init (GstPlugin * plugin);

View file

@ -2086,6 +2086,24 @@ mpegts_packetizer_clear (MpegTSPacketizer2 * packetizer)
packetizer->empty = TRUE;
}
void
mpegts_packetizer_flush (MpegTSPacketizer2 * packetizer)
{
if (packetizer->streams) {
int i;
for (i = 0; i < 8192; i++) {
if (packetizer->streams[i]) {
gst_adapter_flush (packetizer->streams[i]->section_adapter,
packetizer->streams[i]->section_adapter->size);
}
}
}
gst_adapter_flush (packetizer->adapter, packetizer->adapter->size);
packetizer->offset = 0;
packetizer->empty = TRUE;
}
void
mpegts_packetizer_remove_stream (MpegTSPacketizer2 * packetizer, gint16 pid)
{

View file

@ -138,6 +138,7 @@ GType mpegts_packetizer_get_type(void);
MpegTSPacketizer2 *mpegts_packetizer_new (void);
void mpegts_packetizer_clear (MpegTSPacketizer2 *packetizer);
void mpegts_packetizer_flush (MpegTSPacketizer2 *packetizer);
void mpegts_packetizer_push (MpegTSPacketizer2 *packetizer, GstBuffer *buffer);
gboolean mpegts_packetizer_has_packets (MpegTSPacketizer2 *packetizer);
MpegTSPacketizerPacketReturn mpegts_packetizer_next_packet (MpegTSPacketizer2 *packetizer,

View file

@ -30,6 +30,8 @@
#include <stdlib.h>
#include <string.h>
#include <glib.h>
#include "mpegtsbase.h"
#include "tsdemux.h"
#include "gstmpegdesc.h"
@ -44,6 +46,12 @@
/* Size of the pendingbuffers array. */
#define TS_MAX_PENDING_BUFFERS 256
#define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
/* small PCR for wrap detection */
#define PCR_SMALL 17775000
/* maximal PCR time */
#define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
#define GST_CAT_DEFAULT ts_demux_debug
@ -173,6 +181,7 @@ static void
gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
static void
gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
static void gst_ts_demux_reset (MpegTSBase * base);
static GstFlowReturn
gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
MpegTSPacketizerSection * section);
@ -181,6 +190,10 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
MpegTSBaseProgram * program);
static void
gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event);
static GstFlowReturn
find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length,
TSPcrOffset * pcroffset);
static GstFlowReturn
find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset);
static void gst_ts_demux_set_property (GObject * object, guint prop_id,
@ -189,8 +202,9 @@ static void gst_ts_demux_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_ts_demux_finalize (GObject * object);
static GstFlowReturn
process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset,
guint numpcr, gboolean isinitial);
static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux);
static gboolean push_event (MpegTSBase * base, GstEvent * event);
static void _extra_init (GType type);
@ -254,6 +268,7 @@ gst_ts_demux_class_init (GstTSDemuxClass * klass)
ts_class = GST_MPEGTS_BASE_CLASS (klass);
ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
@ -261,6 +276,7 @@ gst_ts_demux_class_init (GstTSDemuxClass * klass)
ts_class->stream_added = gst_ts_demux_stream_added;
ts_class->stream_removed = gst_ts_demux_stream_removed;
ts_class->find_timestamps = GST_DEBUG_FUNCPTR (find_timestamps);
ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
}
static void
@ -270,6 +286,32 @@ gst_ts_demux_init (GstTSDemux * demux, GstTSDemuxClass * klass)
demux->program_number = -1;
demux->duration = GST_CLOCK_TIME_NONE;
GST_MPEGTS_BASE (demux)->stream_size = sizeof (TSDemuxStream);
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
demux->first_pcr = (TSPcrOffset) {
GST_CLOCK_TIME_NONE, 0, 0};
demux->cur_pcr = (TSPcrOffset) {
0};
demux->last_pcr = (TSPcrOffset) {
0};
}
static void
gst_ts_demux_reset (MpegTSBase * base)
{
GstTSDemux *demux = (GstTSDemux *) base;
g_array_free (demux->index, TRUE);
demux->index = NULL;
demux->index_size = 0;
demux->need_newsegment = TRUE;
demux->program_number = -1;
demux->duration = GST_CLOCK_TIME_NONE;
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
demux->first_pcr = (TSPcrOffset) {
GST_CLOCK_TIME_NONE, 0, 0};
demux->cur_pcr = (TSPcrOffset) {
0};
demux->last_pcr = (TSPcrOffset) {
0};
}
static void
@ -324,6 +366,7 @@ gst_ts_demux_srcpad_query_types (GstPad * pad)
{
static const GstQueryType query_types[] = {
GST_QUERY_DURATION,
GST_QUERY_SEEKING,
0
};
@ -334,40 +377,327 @@ static gboolean
gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
{
gboolean res = TRUE;
GstFormat format;
GstTSDemux *demux;
demux = GST_TS_DEMUX (gst_pad_get_parent (pad));
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_DURATION:
{
GstFormat format;
GST_DEBUG ("query duration");
gst_query_parse_duration (query, &format, NULL);
/* can only get position in time */
if (format != GST_FORMAT_TIME)
goto wrong_format;
gst_query_set_duration (query, GST_FORMAT_TIME, demux->duration);
if (format == GST_FORMAT_TIME) {
gst_query_set_duration (query, GST_FORMAT_TIME,
demux->segment.duration);
} else {
GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
res = FALSE;
}
break;
case GST_QUERY_SEEKING:
GST_DEBUG ("query seeking");
gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
if (format == GST_FORMAT_TIME) {
gst_query_set_seeking (query, GST_FORMAT_TIME,
demux->parent.mode != BASE_MODE_PUSHING, 0,
demux->segment.duration);
} else {
GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
res = FALSE;
}
break;
}
default:
res = gst_pad_query_default (pad, query);
break;
}
done:
gst_object_unref (demux);
return res;
wrong_format:
{
GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
res = FALSE;
goto done;
}
}
static inline GstClockTime
calculate_gsttime (TSPcrOffset * start, guint64 pcr)
{
GstClockTime time = start->gsttime;
if (start->pcr > pcr)
time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE - start->pcr) +
PCRTIME_TO_GSTTIME (pcr);
else
time += PCRTIME_TO_GSTTIME (pcr - start->pcr);
return time;
}
static gint
TSPcrOffset_find (gconstpointer a, gconstpointer b, gpointer user_data)
{
/* GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
/* GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */
/* GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
/* GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */
if (((TSPcrOffset *) a)->gsttime < ((TSPcrOffset *) b)->gsttime)
return -1;
else if (((TSPcrOffset *) a)->gsttime > ((TSPcrOffset *) b)->gsttime)
return 1;
else
return 0;
}
static GstFlowReturn
gst_ts_demux_perform_seek (MpegTSBase * base, GstSegment * segment)
{
GstTSDemux *demux = (GstTSDemux *) base;
GstFlowReturn res = GST_FLOW_ERROR;
int loop_cnt = 0;
double bias = 1.0;
gint64 desired_offset;
gint64 seekpos = 0;
gint64 time_diff;
GstClockTime seektime;
TSPcrOffset seekpcroffset, pcr_start, pcr_stop, *tmp;
desired_offset = segment->last_stop;
seektime = desired_offset + demux->first_pcr.gsttime;
seekpcroffset.gsttime = seektime;
GST_DEBUG ("seeking to %" GST_TIME_FORMAT, GST_TIME_ARGS (seektime));
gst_ts_demux_flush_streams (demux);
if (G_UNLIKELY (!demux->index)) {
GST_ERROR ("no index");
goto done;
}
/* get the first index entry before the seek position */
tmp = gst_util_array_binary_search (demux->index->data, demux->index_size,
sizeof (*tmp), TSPcrOffset_find, GST_SEARCH_MODE_BEFORE, &seekpcroffset,
NULL);
if (G_UNLIKELY (!tmp)) {
GST_ERROR ("value not found");
goto done;
}
pcr_start = *tmp;
pcr_stop = *(++tmp);
if (G_UNLIKELY (!pcr_stop.offset)) {
GST_ERROR ("invalid entry");
goto done;
}
/* check if the last recorded pcr can be used */
if (pcr_start.offset < demux->cur_pcr.offset
&& demux->cur_pcr.offset < pcr_stop.offset) {
demux->cur_pcr.gsttime = calculate_gsttime (&pcr_start, demux->cur_pcr.pcr);
if (demux->cur_pcr.gsttime < seekpcroffset.gsttime)
pcr_start = demux->cur_pcr;
else
pcr_stop = demux->cur_pcr;
}
GST_DEBUG ("start %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
GST_TIME_ARGS (pcr_start.gsttime), pcr_start.offset);
GST_DEBUG ("stop %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
GST_TIME_ARGS (pcr_stop.gsttime), pcr_stop.offset);
time_diff = seektime - pcr_start.gsttime;
seekpcroffset = pcr_start;
GST_DEBUG ("cur %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT
" time diff: %" G_GINT64_FORMAT,
GST_TIME_ARGS (demux->cur_pcr.gsttime), demux->cur_pcr.offset, time_diff);
/* seek loop */
while (loop_cnt++ < 10 && (time_diff < 0 || time_diff > 333 * GST_MSECOND)) {
gint64 duration = pcr_stop.gsttime - pcr_start.gsttime;
gint64 size = pcr_stop.offset - pcr_start.offset;
seekpos =
pcr_start.offset + size * bias * ((double) (seektime -
pcr_start.gsttime) / duration);
/* look a litle bit behind */
seekpos =
MAX (pcr_start.offset + 188, seekpos - 55 * MPEGTS_MAX_PACKETSIZE);
GST_DEBUG ("looking for time: %" GST_TIME_FORMAT " .. %" GST_TIME_FORMAT
" .. %" GST_TIME_FORMAT " bias = %g",
GST_TIME_ARGS (pcr_start.gsttime),
GST_TIME_ARGS (seektime), GST_TIME_ARGS (pcr_stop.gsttime), bias);
GST_DEBUG ("looking in bytes: %" G_GINT64_FORMAT " .. %" G_GINT64_FORMAT
" .. %" G_GINT64_FORMAT, pcr_start.offset, seekpos, pcr_stop.offset,
bias);
res =
find_pcr_packet (&demux->parent, seekpos, 4000 * MPEGTS_MAX_PACKETSIZE,
&seekpcroffset);
if (G_UNLIKELY (res == GST_FLOW_UNEXPECTED)) {
seekpos =
MAX ((gint64) pcr_start.offset,
seekpos - 2000 * MPEGTS_MAX_PACKETSIZE) + 188;
res =
find_pcr_packet (&demux->parent, seekpos,
8000 * MPEGTS_MAX_PACKETSIZE, &seekpcroffset);
}
if (G_UNLIKELY (res != GST_FLOW_OK)) {
GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
goto done;
}
seekpcroffset.gsttime = calculate_gsttime (&pcr_start, seekpcroffset.pcr);
bias =
1.0 + MAX (-.3, MIN (.3,
((double) seektime - seekpcroffset.gsttime) / duration));
/* validate */
if (G_UNLIKELY ((seekpcroffset.gsttime < pcr_start.gsttime) ||
(seekpcroffset.gsttime > pcr_stop.gsttime))) {
GST_ERROR ("Unexpected timestamp found, seeking failed! %"
GST_TIME_FORMAT, GST_TIME_ARGS (seekpcroffset.gsttime));
res = GST_FLOW_ERROR;
goto done;
}
if (seekpcroffset.gsttime > seektime) {
pcr_stop = seekpcroffset;
} else {
pcr_start = seekpcroffset;
}
time_diff = seektime - pcr_start.gsttime;
GST_DEBUG ("looking: %" GST_TIME_FORMAT " found: %" GST_TIME_FORMAT
" diff = %" G_GINT64_FORMAT, GST_TIME_ARGS (seektime),
GST_TIME_ARGS (seekpcroffset.gsttime), time_diff);
}
GST_DEBUG ("seeking finished after %d loops", loop_cnt);
segment->last_stop = seekpcroffset.gsttime;
segment->time = seekpcroffset.gsttime;
/* we stop at the end */
if (segment->stop == -1)
segment->stop = segment->duration;
demux->need_newsegment = TRUE;
demux->parent.seek_offset = seekpcroffset.offset;
GST_DEBUG ("seeked to postion:%" GST_TIME_FORMAT,
GST_TIME_ARGS (seekpcroffset.gsttime));
res = GST_FLOW_OK;
done:
return res;
}
static GstFlowReturn
gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
{
GstTSDemux *demux = (GstTSDemux *) base;
GstFlowReturn res = GST_FLOW_ERROR;
gdouble rate;
gboolean accurate, flush;
GstFormat format;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
GstSegment seeksegment;
gboolean update;
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
if (format != GST_FORMAT_TIME) {
goto done;
}
GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
" stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
GST_TIME_ARGS (stop));
accurate = flags & GST_SEEK_FLAG_ACCURATE;
flush = flags & GST_SEEK_FLAG_FLUSH;
if (flags & (GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SEGMENT |
GST_SEEK_FLAG_SKIP)) {
GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
goto done;
}
/* copy segment, we need this because we still need the old
* segment when we close the current segment. */
memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
/* configure the segment with the seek variables */
GST_DEBUG_OBJECT (demux, "configuring seek");
GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
" last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
GST_TIME_ARGS (seeksegment.last_stop),
GST_TIME_ARGS (seeksegment.duration));
gst_segment_set_seek (&seeksegment, rate, format, flags, start_type, start,
stop_type, stop, &update);
GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
" last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
GST_TIME_ARGS (seeksegment.last_stop),
GST_TIME_ARGS (seeksegment.duration));
res = gst_ts_demux_perform_seek (base, &seeksegment);
if (G_UNLIKELY (res != GST_FLOW_OK)) {
GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
goto done;
}
/* commit the new segment */
memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_segment_start (GST_OBJECT_CAST (demux),
demux->segment.format, demux->segment.last_stop));
}
done:
return res;
}
static gboolean
gst_ts_demux_srcpad_event (GstPad * pad, GstEvent * event)
{
gboolean res = TRUE;
GstTSDemux *demux = GST_TS_DEMUX (gst_pad_get_parent (pad));
GST_DEBUG_OBJECT (pad, "Got event %s",
gst_event_type_get_name (GST_EVENT_TYPE (event)));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
if (!res) {
GST_WARNING ("seeking failed");
}
gst_event_unref (event);
break;
default:
res = gst_pad_event_default (pad, event);
}
gst_object_unref (demux);
return res;
}
static gboolean
push_event (MpegTSBase * base, GstEvent * event)
@ -675,6 +1005,7 @@ create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
gst_pad_set_caps (pad, caps);
gst_pad_set_query_type_function (pad, gst_ts_demux_srcpad_query_types);
gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
gst_caps_unref (caps);
}
@ -725,6 +1056,33 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
GST_WARNING_OBJECT (tsdemux, "stream %p has no pad", stream);
}
static void
gst_ts_demux_stream_flush (TSDemuxStream * stream)
{
gint i;
stream->pts = GST_CLOCK_TIME_NONE;
for (i = 0; i < stream->nbpending; i++)
gst_buffer_unref (stream->pendingbuffers[i]);
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
stream->nbpending = 0;
stream->current = NULL;
}
static void
gst_ts_demux_flush_streams (GstTSDemux * demux)
{
gint i;
for (i = 0; i < 0x2000; i++) {
if (demux->program->streams[i]) {
gst_ts_demux_stream_flush ((TSDemuxStream *) demux->program->streams[i]);
}
}
}
static void
gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
{
@ -832,6 +1190,171 @@ process_section (MpegTSBase * base)
return done;
}
static gboolean
process_pes (MpegTSBase * base, TSPcrOffset * pcroffset)
{
gboolean based, done = FALSE;
MpegTSPacketizerPacket packet;
MpegTSPacketizerPacketReturn pret;
GstTSDemux *demux = GST_TS_DEMUX (base);
guint16 pcr_pid = 0;
while ((!done)
&& ((pret =
mpegts_packetizer_next_packet (base->packetizer,
&packet)) != PACKET_NEED_MORE)) {
if (G_UNLIKELY (pret == PACKET_BAD))
/* bad header, skip the packet */
goto next;
if (demux->program != NULL) {
pcr_pid = demux->program->pcr_pid;
}
/* base PSI data */
if (packet.payload != NULL && mpegts_base_is_psi (base, &packet)) {
MpegTSPacketizerSection section;
based =
mpegts_packetizer_push_section (base->packetizer, &packet, &section);
if (G_UNLIKELY (!based))
/* bad section data */
goto next;
if (G_LIKELY (section.complete)) {
/* section complete */
GST_DEBUG ("Section Complete");
based = mpegts_base_handle_psi (base, &section);
gst_buffer_unref (section.buffer);
if (G_UNLIKELY (!based))
/* bad PSI table */
goto next;
}
}
if (packet.pid == pcr_pid && (packet.adaptation_field_control & 0x02)
&& (packet.afc_flags & MPEGTS_AFC_PCR_FLAG)) {
GST_DEBUG ("PCR[0x%x]: %" G_GINT64_FORMAT, packet.pid, packet.pcr);
pcroffset->pcr = packet.pcr;
pcroffset->offset = packet.offset;
done = TRUE;
}
next:
mpegts_packetizer_clear_packet (base->packetizer, &packet);
}
return done;
}
static GstFlowReturn
find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length,
TSPcrOffset * pcroffset)
{
GstFlowReturn ret = GST_FLOW_OK;
GstTSDemux *demux = GST_TS_DEMUX (base);
MpegTSBaseProgram *program;
GstBuffer *buf;
gboolean done = FALSE;
guint64 scan_offset = 0;
GST_DEBUG ("Scanning for PCR between:%" G_GINT64_FORMAT
" and the end:%" G_GINT64_FORMAT, offset, offset + length);
/* Get the program */
program = demux->program;
if (G_UNLIKELY (program == NULL))
return GST_FLOW_ERROR;
mpegts_packetizer_flush (base->packetizer);
while (!done && scan_offset < length) {
ret =
gst_pad_pull_range (base->sinkpad, offset + scan_offset,
50 * MPEGTS_MAX_PACKETSIZE, &buf);
if (ret != GST_FLOW_OK)
goto beach;
mpegts_packetizer_push (base->packetizer, buf);
done = process_pes (base, pcroffset);
scan_offset += 50 * MPEGTS_MAX_PACKETSIZE;
}
if (!done || scan_offset >= length) {
GST_WARNING ("No PCR found!");
ret = GST_FLOW_ERROR;
goto beach;
}
beach:
mpegts_packetizer_flush (base->packetizer);
return ret;
}
static gboolean
verify_timestamps (MpegTSBase * base, TSPcrOffset * first, TSPcrOffset * last)
{
GstTSDemux *demux = GST_TS_DEMUX (base);
guint64 length = 4000 * MPEGTS_MAX_PACKETSIZE;
guint64 offset = PCR_WRAP_SIZE_128KBPS;
demux->index =
g_array_sized_new (TRUE, TRUE, sizeof (*first),
2 + 1 + ((last->offset - first->offset) / PCR_WRAP_SIZE_128KBPS));
first->gsttime = PCRTIME_TO_GSTTIME (first->pcr);
demux->index = g_array_append_val (demux->index, *first);
demux->index_size++;
demux->first_pcr = *first;
demux->index_pcr = *first;
GST_DEBUG ("first time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
" offset: %" G_GINT64_FORMAT
" last pcr: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
GST_TIME_ARGS (first->gsttime),
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (first->pcr)), first->offset,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset);
while (offset + length < last->offset) {
TSPcrOffset half;
GstFlowReturn ret;
gint tries = 0;
retry:
ret = find_pcr_packet (base, offset, length, &half);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_WARNING ("no pcr found, retrying");
if (tries++ < 3) {
offset += length;
length *= 2;
goto retry;
}
return FALSE;
}
half.gsttime = calculate_gsttime (first, half.pcr);
GST_DEBUG ("add half time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
" offset: %" G_GINT64_FORMAT,
GST_TIME_ARGS (half.gsttime),
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (half.pcr)), half.offset);
demux->index = g_array_append_val (demux->index, half);
demux->index_size++;
length = 4000 * MPEGTS_MAX_PACKETSIZE;
offset += PCR_WRAP_SIZE_128KBPS;
*first = half;
}
last->gsttime = calculate_gsttime (first, last->pcr);
GST_DEBUG ("add last time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
" offset: %" G_GINT64_FORMAT,
GST_TIME_ARGS (last->gsttime),
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset);
demux->index = g_array_append_val (demux->index, *last);
demux->index_size++;
demux->last_pcr = *last;
return TRUE;
}
static GstFlowReturn
find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
@ -844,7 +1367,7 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
gint64 total_bytes;
guint64 scan_offset;
guint i = 0;
GstClockTime initial, final;
TSPcrOffset initial, final;
GstTSDemux *demux = GST_TS_DEMUX (base);
GST_DEBUG ("Scanning for timestamps");
@ -875,6 +1398,8 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
mpegts_packetizer_clear (base->packetizer);
/* Remove current program so we ensure looking for a PAT when scanning the
* for the final PCR */
gst_structure_free (base->pat);
base->pat = NULL;
mpegts_base_remove_program (base, demux->current_program_number);
if (ret != GST_FLOW_OK && ret != GST_FLOW_UNEXPECTED) {
@ -922,8 +1447,11 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
goto beach;
}
demux->duration = final - initial;
verify_timestamps (base, &initial, &final);
gst_segment_set_duration (&demux->segment, GST_FORMAT_TIME,
demux->last_pcr.gsttime - demux->first_pcr.gsttime);
demux->duration = demux->last_pcr.gsttime - demux->first_pcr.gsttime;
GST_DEBUG ("Done, duration:%" GST_TIME_FORMAT,
GST_TIME_ARGS (demux->duration));
@ -931,13 +1459,15 @@ beach:
mpegts_packetizer_clear (base->packetizer);
/* Remove current program */
gst_structure_free (base->pat);
base->pat = NULL;
mpegts_base_remove_program (base, demux->current_program_number);
return ret;
}
static GstFlowReturn
process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset,
guint numpcr, gboolean isinitial)
{
GstTSDemux *demux = GST_TS_DEMUX (base);
@ -988,15 +1518,31 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
offset = 0;
size = GST_BUFFER_SIZE (buf);
/* FIXME : We should jump to next packet instead of scanning everything */
while ((size >= br.size) && (nbpcr < numpcr)
&& (offset =
gst_byte_reader_masked_scan_uint32 (&br, pcrmask, pcrpattern,
offset, size)) != -1) {
resync:
offset = gst_byte_reader_masked_scan_uint32 (&br, 0xff000000, 0x47000000,
0, base->packetsize);
if (offset == -1)
continue;
while ((nbpcr < numpcr) && (size >= base->packetsize)) {
guint32 header = GST_READ_UINT32_BE (br.data + offset);
if ((header >> 24) != 0x47)
goto resync;
if ((header & pcrmask) != pcrpattern) {
/* Move offset forward by 1 packet */
size -= base->packetsize;
offset += base->packetsize;
continue;
}
/* Potential PCR */
/* GST_DEBUG ("offset %" G_GUINT64_FORMAT, GST_BUFFER_OFFSET (buf) + offset);
GST_MEMDUMP ("something", GST_BUFFER_DATA (buf) + offset, 16);*/
if ((*(br.data + offset + 5)) & 0x10) {
if ((*(br.data + offset + 5)) & MPEGTS_AFC_PCR_FLAG) {
guint64 lpcr = mpegts_packetizer_compute_pcr (br.data + offset + 6);
GST_INFO ("Found PCR %" G_GUINT64_FORMAT " %" GST_TIME_FORMAT
@ -1011,6 +1557,9 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
if (nbpcr > 1) {
if (pcrs[nbpcr] == pcrs[nbpcr - 1]) {
GST_WARNING ("Found same PCR at different offset");
} else if (pcrs[nbpcr] < pcrs[nbpcr - 1]) {
GST_WARNING ("Found PCR wraparound");
nbpcr += 1;
} else if ((pcrs[nbpcr] - pcrs[nbpcr - 1]) >
(guint64) 10 * 60 * 27000000) {
GST_WARNING ("PCR differs with previous PCR by more than 10 mins");
@ -1019,20 +1568,22 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
} else
nbpcr += 1;
}
/* Move offset forward by 1 */
size -= offset + 1;
offset += 1;
/* Move offset forward by 1 packet */
size -= base->packetsize;
offset += base->packetsize;
}
}
beach:
GST_DEBUG ("Found %d PCR", nbpcr);
if (nbpcr) {
if (isinitial)
*pcr = PCRTIME_TO_GSTTIME (pcrs[0]);
else
*pcr = PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1]);
if (isinitial) {
pcroffset->pcr = pcrs[0];
pcroffset->offset = pcroffs[0];
} else {
pcroffset->pcr = pcrs[nbpcr - 1];
pcroffset->offset = pcroffs[nbpcr - 1];
}
GST_DEBUG ("pcrdiff:%" GST_TIME_FORMAT " offsetdiff %" G_GUINT64_FORMAT,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1] - pcrs[0])),
pcroffs[nbpcr - 1] - pcroffs[0]);
@ -1061,6 +1612,18 @@ gst_ts_demux_record_pcr (GstTSDemux * demux, TSDemuxStream * stream,
G_GUINT64_FORMAT, bs->pid,
GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr)), offset);
if (G_LIKELY (bs->pid == demux->program->pcr_pid)) {
demux->cur_pcr.gsttime = GST_CLOCK_TIME_NONE;
demux->cur_pcr.offset = offset;
demux->cur_pcr.pcr = pcr;
/* set first_pcr in push mode */
if (G_UNLIKELY (!demux->first_pcr.gsttime == GST_CLOCK_TIME_NONE)) {
demux->first_pcr.gsttime = PCRTIME_TO_GSTTIME (pcr);
demux->first_pcr.offset = offset;
demux->first_pcr.pcr = pcr;
}
}
if (G_UNLIKELY (demux->emit_statistics)) {
GstStructure *st;
st = gst_structure_id_empty_new (QUARK_TSDEMUX);
@ -1139,6 +1702,36 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
}
}
static inline GstClockTime
calc_gsttime_from_pts (TSPcrOffset * start, guint64 pts)
{
GstClockTime time = start->gsttime - PCRTIME_TO_GSTTIME (start->pcr);
if (start->pcr > pts * 300)
time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE) + MPEGTIME_TO_GSTTIME (pts);
else
time += MPEGTIME_TO_GSTTIME (pts);
return time;
}
static gint
TSPcrOffset_find_offset (gconstpointer a, gconstpointer b, gpointer user_data)
{
/* GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
/* GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */
/* GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
/* GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */
if (((TSPcrOffset *) a)->offset < ((TSPcrOffset *) b)->offset)
return -1;
else if (((TSPcrOffset *) a)->offset > ((TSPcrOffset *) b)->offset)
return 1;
else
return 0;
}
static GstFlowReturn
gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
{
@ -1214,12 +1807,32 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
/* PTS 32 */
if ((p2 & 0x80)) { /* PTS */
GstClockTime time;
guint64 offset = GST_BUFFER_OFFSET (stream->pendingbuffers[0]);
READ_TS (data, pts, discont);
gst_ts_demux_record_pts (demux, stream, pts,
GST_BUFFER_OFFSET (stream->pendingbuffers[0]));
gst_ts_demux_record_pts (demux, stream, pts, offset);
length -= 4;
GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
MPEGTIME_TO_GSTTIME (pts);
if (demux->index_pcr.offset + PCR_WRAP_SIZE_128KBPS + 1000 * 128 < offset
|| (demux->index_pcr.offset > offset)) {
/* find next entry */
TSPcrOffset *next;
demux->index_pcr.offset = offset;
next = gst_util_array_binary_search (demux->index->data,
demux->index_size, sizeof (*next), TSPcrOffset_find_offset,
GST_SEARCH_MODE_BEFORE, &demux->index_pcr, NULL);
if (next) {
GST_INFO ("new index_pcr %" GST_TIME_FORMAT " offset: %"
G_GINT64_FORMAT, GST_TIME_ARGS (next->gsttime), next->offset);
demux->index_pcr = *next;
}
}
time = calc_gsttime_from_pts (&demux->index_pcr, pts);
GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) = time;
if (!GST_CLOCK_TIME_IS_VALID (stream->pts)) {
stream->pts = GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]);
@ -1344,7 +1957,6 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
guint i;
GstClockTime tinypts = GST_CLOCK_TIME_NONE;
GstClockTime stop = GST_CLOCK_TIME_NONE;
GstEvent *newsegmentevent;
GST_DEBUG ("stream:%p, pid:0x%04x stream_type:%d state:%d pad:%s:%s",
@ -1381,24 +1993,27 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
tinypts))
tinypts = ((TSDemuxStream *) demux->program->streams[i])->pts;
}
}
if (GST_CLOCK_TIME_IS_VALID (demux->duration))
stop = tinypts + demux->duration;
GST_DEBUG ("Sending newsegment event");
GST_DEBUG ("segment: tinypts: %" GST_TIME_FORMAT " stop: %"
GST_TIME_FORMAT " time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (tinypts),
GST_TIME_ARGS (demux->first_pcr.gsttime + demux->duration),
GST_TIME_ARGS (tinypts - demux->first_pcr.gsttime));
newsegmentevent =
gst_event_new_new_segment (0, 1.0, GST_FORMAT_TIME, tinypts, stop,
0);
gst_event_new_new_segment (0, 1.0, GST_FORMAT_TIME, tinypts,
demux->first_pcr.gsttime + demux->duration,
tinypts - demux->first_pcr.gsttime);
push_event ((MpegTSBase *) demux, newsegmentevent);
demux->need_newsegment = FALSE;
}
GST_DEBUG_OBJECT (stream->pad, "Pushing buffer list ");
GST_DEBUG_OBJECT (stream->pad,
"Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (gst_buffer_list_get
(stream->current, 0, 0))));
res = gst_pad_push_list (stream->pad, stream->current);
GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));

View file

@ -48,6 +48,14 @@ G_BEGIN_DECLS
#define GST_TS_DEMUX_CAST(obj) ((GstTSDemux*) obj)
typedef struct _GstTSDemux GstTSDemux;
typedef struct _GstTSDemuxClass GstTSDemuxClass;
typedef struct _TSPcrOffset TSPcrOffset;
struct _TSPcrOffset
{
guint64 gsttime;
guint64 pcr;
guint64 offset;
};
struct _GstTSDemux
{
@ -62,7 +70,16 @@ struct _GstTSDemux
MpegTSBaseProgram *program; /* Current program */
guint current_program_number;
gboolean need_newsegment;
GstSegment segment;
GstClockTime duration; /* Total duration */
/* pcr wrap and seeking */
GArray *index;
gint index_size;
TSPcrOffset first_pcr;
TSPcrOffset last_pcr;
TSPcrOffset cur_pcr;
TSPcrOffset index_pcr;
};
struct _GstTSDemuxClass