Add pull mode to mpegpsdemux and report duration reading first and last PTS. Some random cleanups.

This commit is contained in:
Josep Torra 2009-02-02 23:12:07 +01:00
parent 18e2ffa484
commit c8eb591688
4 changed files with 899 additions and 143 deletions

View file

@ -190,6 +190,17 @@
#define MPEG_MUX_RATE_MULT 50 #define MPEG_MUX_RATE_MULT 50
/* sync:4 == 00xx ! pts:3 ! 1 ! pts:15 ! 1 | pts:15 ! 1 */
#define READ_TS(data, target, lost_sync_label) \
if ((*data & 0x01) != 0x01) goto lost_sync_label; \
target = ((guint64) (*data++ & 0x0E)) << 29; \
target |= ((guint64) (*data++ )) << 22; \
if ((*data & 0x01) != 0x01) goto lost_sync_label; \
target |= ((guint64) (*data++ & 0xFE)) << 14; \
target |= ((guint64) (*data++ )) << 7; \
if ((*data & 0x01) != 0x01) goto lost_sync_label; \
target |= ((guint64) (*data++ & 0xFE)) >> 1;
/* some extra GstFlowReturn values used internally */ /* some extra GstFlowReturn values used internally */
#define GST_FLOW_NEED_MORE_DATA -100 #define GST_FLOW_NEED_MORE_DATA -100
#define GST_FLOW_LOST_SYNC -101 #define GST_FLOW_LOST_SYNC -101

View file

@ -52,6 +52,15 @@
#define MAX_DVD_AUDIO_STREAMS 8 #define MAX_DVD_AUDIO_STREAMS 8
#define MAX_DVD_SUBPICTURE_STREAMS 32 #define MAX_DVD_SUBPICTURE_STREAMS 32
#define BLOCK_SZ 4096
#define SCAN_SZ 80
typedef enum
{
SCAN_SCR,
SCAN_DTS,
SCAN_PTS
} SCAN_MODE;
/* We clamp scr delta with 0 so negative bytes won't be possible */ /* We clamp scr delta with 0 so negative bytes won't be possible */
#define GSTTIME_TO_BYTES(time) \ #define GSTTIME_TO_BYTES(time) \
@ -179,9 +188,16 @@ static void gst_flups_demux_finalize (GstFluPSDemux * demux);
static void gst_flups_demux_reset (GstFluPSDemux * demux); static void gst_flups_demux_reset (GstFluPSDemux * demux);
static gboolean gst_flups_demux_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_flups_demux_sink_event (GstPad * pad, GstEvent * event);
static GstFlowReturn gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_flups_demux_sink_activate (GstPad * sinkpad);
static gboolean gst_flups_demux_sink_activate_push (GstPad * sinkpad,
gboolean active);
static gboolean gst_flups_demux_sink_activate_pull (GstPad * sinkpad,
gboolean active);
static void gst_flups_demux_loop (GstPad * pad);
static gboolean gst_flups_demux_src_event (GstPad * pad, GstEvent * event); static gboolean gst_flups_demux_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_flups_demux_src_query (GstPad * pad, GstQuery * query); static gboolean gst_flups_demux_src_query (GstPad * pad, GstQuery * query);
static GstFlowReturn gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer);
static GstStateChangeReturn gst_flups_demux_change_state (GstElement * element, static GstStateChangeReturn gst_flups_demux_change_state (GstElement * element,
GstStateChange transition); GstStateChange transition);
@ -259,8 +275,17 @@ gst_flups_demux_init (GstFluPSDemux * demux)
GstFluPSDemuxClass *klass = GST_FLUPS_DEMUX_GET_CLASS (demux); GstFluPSDemuxClass *klass = GST_FLUPS_DEMUX_GET_CLASS (demux);
demux->sinkpad = gst_pad_new_from_template (klass->sink_template, "sink"); demux->sinkpad = gst_pad_new_from_template (klass->sink_template, "sink");
gst_pad_set_event_function (demux->sinkpad, gst_flups_demux_sink_event); gst_pad_set_event_function (demux->sinkpad,
gst_pad_set_chain_function (demux->sinkpad, gst_flups_demux_chain); GST_DEBUG_FUNCPTR (gst_flups_demux_sink_event));
gst_pad_set_chain_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_flups_demux_chain));
gst_pad_set_activate_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_flups_demux_sink_activate));
gst_pad_set_activatepull_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_flups_demux_sink_activate_pull));
gst_pad_set_activatepush_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_flups_demux_sink_activate_push));
gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
demux->streams = demux->streams =
@ -392,8 +417,10 @@ gst_flups_demux_create_stream (GstFluPSDemux * demux, gint id, gint stream_type)
stream->notlinked = FALSE; stream->notlinked = FALSE;
stream->type = stream_type; stream->type = stream_type;
stream->pad = gst_pad_new_from_template (template, name); stream->pad = gst_pad_new_from_template (template, name);
gst_pad_set_event_function (stream->pad, gst_flups_demux_src_event); gst_pad_set_event_function (stream->pad,
gst_pad_set_query_function (stream->pad, gst_flups_demux_src_query); GST_DEBUG_FUNCPTR (gst_flups_demux_src_event));
gst_pad_set_query_function (stream->pad,
GST_DEBUG_FUNCPTR (gst_flups_demux_src_query));
gst_pad_use_fixed_caps (stream->pad); gst_pad_use_fixed_caps (stream->pad);
gst_pad_set_caps (stream->pad, caps); gst_pad_set_caps (stream->pad, caps);
gst_caps_unref (caps); gst_caps_unref (caps);
@ -510,12 +537,19 @@ gst_flups_demux_send_data (GstFluPSDemux * demux, GstFluPSStream * stream,
gst_pad_push_event (stream->pad, newsegment); gst_pad_push_event (stream->pad, newsegment);
stream->need_segment = FALSE; stream->need_segment = FALSE;
if (!demux->is_segment_open) {
GST_DEBUG_OBJECT (demux, "segment opened");
demux->is_segment_open = TRUE;
}
} }
/* OK, sent new segment now prepare the buffer for sending */ /* OK, sent new segment now prepare the buffer for sending */
/* caps */ /* caps */
gst_buffer_set_caps (buf, GST_PAD_CAPS (stream->pad)); gst_buffer_set_caps (buf, GST_PAD_CAPS (stream->pad));
GST_BUFFER_TIMESTAMP (buf) = timestamp; GST_BUFFER_TIMESTAMP (buf) = timestamp;
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
gst_segment_set_last_stop (&demux->src_segment, GST_FORMAT_TIME, timestamp);
}
/* Set the buffer discont flag, and clear discont state on the stream */ /* Set the buffer discont flag, and clear discont state on the stream */
if (stream->discont) { if (stream->discont) {
@ -682,6 +716,52 @@ gst_flups_demux_handle_dvd_event (GstFluPSDemux * demux, GstEvent * event)
return TRUE; return TRUE;
} }
static void
gst_flups_demux_flush (GstFluPSDemux * demux)
{
GST_DEBUG_OBJECT (demux, "flushing demuxer");
gst_adapter_clear (demux->adapter);
gst_adapter_clear (demux->rev_adapter);
gst_pes_filter_drain (&demux->filter);
demux->adapter_offset = G_MAXUINT64;
demux->current_scr = G_MAXUINT64;
demux->bytes_since_scr = 0;
}
static void
gst_flups_demux_close_segment (GstFluPSDemux * demux)
{
if (G_UNLIKELY (!demux->is_segment_open)) {
GST_DEBUG_OBJECT (demux, "no running segment to close");
return;
}
#if POST_10_10
GST_INFO_OBJECT (demux, "closing running segment %" GST_SEGMENT_FORMAT,
&demux->src_segment);
#endif
/* Close the current segment for a linear playback */
if (demux->src_segment.rate >= 0) {
/* for forward playback, we played from start to last_stop */
gst_flups_demux_send_event (demux, gst_event_new_new_segment (TRUE,
demux->src_segment.rate, demux->src_segment.format,
demux->src_segment.start, demux->src_segment.last_stop,
demux->src_segment.time));
} else {
gint64 stop;
if ((stop = demux->src_segment.stop) == -1)
stop = demux->src_segment.duration;
/* for reverse playback, we played from stop to last_stop. */
gst_flups_demux_send_event (demux, gst_event_new_new_segment (TRUE,
demux->src_segment.rate, demux->src_segment.format,
demux->src_segment.last_stop, stop, demux->src_segment.last_stop));
}
demux->is_segment_open = FALSE;
}
static gboolean static gboolean
gst_flups_demux_sink_event (GstPad * pad, GstEvent * event) gst_flups_demux_sink_event (GstPad * pad, GstEvent * event)
{ {
@ -696,14 +776,8 @@ gst_flups_demux_sink_event (GstPad * pad, GstEvent * event)
break; break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
gst_flups_demux_send_event (demux, event); gst_flups_demux_send_event (demux, event);
gst_segment_init (&demux->sink_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&demux->sink_segment, GST_FORMAT_UNDEFINED);
gst_adapter_clear (demux->adapter); gst_flups_demux_flush (demux);
gst_adapter_clear (demux->rev_adapter);
demux->adapter_offset = G_MAXUINT64;
gst_pes_filter_drain (&demux->filter);
demux->current_scr = G_MAXUINT64;
demux->bytes_since_scr = 0;
break; break;
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
{ {
@ -712,6 +786,9 @@ gst_flups_demux_sink_event (GstPad * pad, GstEvent * event)
GstFormat format; GstFormat format;
gint64 start, stop, time; gint64 start, stop, time;
/* Close current segment */
gst_flups_demux_close_segment (demux);
#ifdef HAVE_NEWSEG_FULL #ifdef HAVE_NEWSEG_FULL
{ {
gdouble arate; gdouble arate;
@ -786,16 +863,9 @@ gst_flups_demux_sink_event (GstPad * pad, GstEvent * event)
} }
static gboolean static gboolean
gst_flups_demux_src_event (GstPad * pad, GstEvent * event) gst_flups_demux_handle_seek_push (GstFluPSDemux * demux, GstEvent * event)
{ {
gboolean res = FALSE; gboolean res = FALSE;
GstFluPSDemux *demux;
demux = GST_FLUPS_DEMUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
{
gdouble rate; gdouble rate;
GstFormat format; GstFormat format;
GstSeekFlags flags; GstSeekFlags flags;
@ -849,8 +919,165 @@ gst_flups_demux_src_event (GstPad * pad, GstEvent * event)
done: done:
gst_event_unref (event); gst_event_unref (event);
break; return res;
not_supported:
{
gst_event_unref (event);
return FALSE;
} }
}
static gboolean
gst_flups_demux_handle_seek_pull (GstFluPSDemux * demux, GstEvent * event)
{
GstFormat format;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
gdouble rate;
gboolean update, flush, keyframe;
GstSegment seeksegment;
GstClockTime first_pts = MPEGTIME_TO_GSTTIME (demux->first_pts);
gst_event_parse_seek (event, &rate, &format, &flags,
&start_type, &start, &stop_type, &stop);
if (format != GST_FORMAT_TIME)
goto wrong_format;
/* We need to convert to byte based seek and we need a scr_rate for that. */
if (demux->scr_rate_n == G_MAXUINT64 || demux->scr_rate_d == G_MAXUINT64)
goto no_scr_rate;
flush = flags & GST_SEEK_FLAG_FLUSH;
keyframe = flags & GST_SEEK_FLAG_KEY_UNIT;
if (flush) {
/* Flush start up and downstream to make sure data flow and loops are
idle */
gst_flups_demux_send_event (demux, gst_event_new_flush_start ());
gst_pad_push_event (demux->sinkpad, gst_event_new_flush_start ());
} else {
/* Pause the pulling task */
gst_pad_pause_task (demux->sinkpad);
}
/* Take the stream lock */
GST_PAD_STREAM_LOCK (demux->sinkpad);
if (flush) {
/* Stop flushing upstream we need to pull */
gst_pad_push_event (demux->sinkpad, gst_event_new_flush_stop ());
}
/* Work on a copy until we are sure the seek succeeded. */
memcpy (&seeksegment, &demux->src_segment, sizeof (GstSegment));
#if POST_10_10
GST_DEBUG_OBJECT (demux, "segment before configure %" GST_SEGMENT_FORMAT,
&demux->src_segment);
#endif
/* Apply the seek to our segment */
gst_segment_set_seek (&seeksegment, rate, format, flags,
start_type, start, stop_type, stop, &update);
if (flush || seeksegment.last_stop != demux->src_segment.last_stop) {
/* Do the actual seeking */
#if POST_10_10
GST_INFO_OBJECT (demux, "sink segment configured %" GST_SEGMENT_FORMAT,
&demux->sink_segment);
#endif
gst_segment_set_last_stop (&demux->sink_segment, GST_FORMAT_BYTES,
MIN (GSTTIME_TO_BYTES (seeksegment.last_stop),
demux->sink_segment.stop));
#if POST_10_10
GST_INFO_OBJECT (demux, "sink segment configured %" GST_SEGMENT_FORMAT,
&demux->sink_segment);
#endif
}
/* check the limits */
if (seeksegment.start < first_pts)
seeksegment.start = first_pts;
if (seeksegment.last_stop < first_pts)
seeksegment.last_stop = first_pts;
/* update the rate in our src segment */
demux->sink_segment.rate = rate;
#if POST_10_10
GST_DEBUG_OBJECT (demux, "seek segment configured %" GST_SEGMENT_FORMAT,
&seeksegment);
#endif
if (flush) {
/* Stop flushing, the sinks are at time 0 now */
gst_flups_demux_send_event (demux, gst_event_new_flush_stop ());
} else {
gst_flups_demux_close_segment (demux);
}
if (flush || seeksegment.last_stop != demux->src_segment.last_stop) {
gst_flups_demux_flush (demux);
}
/* Ok seek succeeded, take the newly configured segment */
memcpy (&demux->src_segment, &seeksegment, sizeof (GstSegment));
/* Notify about the start of a new segment */
if (demux->src_segment.flags & GST_SEEK_FLAG_SEGMENT) {
gst_element_post_message (GST_ELEMENT (demux),
gst_message_new_segment_start (GST_OBJECT (demux),
demux->src_segment.format, demux->src_segment.last_stop));
}
/* Tell all the stream a new segment is needed */
gst_flups_demux_mark_discont (demux, TRUE, TRUE);
gst_pad_start_task (demux->sinkpad,
(GstTaskFunction) gst_flups_demux_loop, demux->sinkpad);
GST_PAD_STREAM_UNLOCK (demux->sinkpad);
gst_event_unref (event);
return TRUE;
/* ERRORS */
wrong_format:
{
GST_WARNING_OBJECT (demux, "we only support seeking in TIME or BYTES "
"formats");
gst_event_unref (event);
return FALSE;
}
no_scr_rate:
{
GST_WARNING_OBJECT (demux, "seek not possible, no scr_rate");
gst_event_unref (event);
return FALSE;
}
}
static gboolean
gst_flups_demux_src_event (GstPad * pad, GstEvent * event)
{
gboolean res = FALSE;
GstFluPSDemux *demux;
demux = GST_FLUPS_DEMUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
if (demux->random_access) {
res = gst_flups_demux_handle_seek_pull (demux, event);
} else {
res = gst_flups_demux_handle_seek_push (demux, event);
}
break;
default: default:
res = gst_pad_push_event (demux->sinkpad, event); res = gst_pad_push_event (demux->sinkpad, event);
break; break;
@ -859,14 +1086,6 @@ gst_flups_demux_src_event (GstPad * pad, GstEvent * event)
gst_object_unref (demux); gst_object_unref (demux);
return res; return res;
not_supported:
{
gst_object_unref (demux);
gst_event_unref (event);
return FALSE;
}
} }
static gboolean static gboolean
@ -884,7 +1103,6 @@ gst_flups_demux_src_query (GstPad * pad, GstQuery * query)
case GST_QUERY_POSITION: case GST_QUERY_POSITION:
{ {
GstFormat format; GstFormat format;
gint64 position;
gst_query_parse_position (query, &format, NULL); gst_query_parse_position (query, &format, NULL);
@ -893,17 +1111,11 @@ gst_flups_demux_src_query (GstPad * pad, GstQuery * query)
format); format);
goto not_supported; goto not_supported;
} }
if (demux->current_scr == G_MAXUINT64 || demux->first_scr == G_MAXUINT64) {
GST_DEBUG_OBJECT (demux, "position not possible, no current_scr");
goto not_supported;
}
position = MPEGTIME_TO_GSTTIME (demux->current_scr - demux->first_scr);
GST_LOG_OBJECT (demux, "Position at GStreamer Time:%" GST_TIME_FORMAT, GST_LOG_OBJECT (demux, "Position at GStreamer Time:%" GST_TIME_FORMAT,
GST_TIME_ARGS (position)); GST_TIME_ARGS (demux->src_segment.last_stop));
gst_query_set_position (query, format, position); gst_query_set_position (query, format, demux->src_segment.last_stop);
res = TRUE; res = TRUE;
break; break;
} }
@ -915,6 +1127,14 @@ gst_flups_demux_src_query (GstPad * pad, GstQuery * query)
gst_query_parse_duration (query, &format, NULL); gst_query_parse_duration (query, &format, NULL);
if (G_LIKELY (format == GST_FORMAT_TIME &&
GST_CLOCK_TIME_IS_VALID (demux->src_segment.duration))) {
gst_query_set_duration (query, GST_FORMAT_TIME,
demux->src_segment.duration);
res = TRUE;
break;
}
if ((peer = gst_pad_get_peer (demux->sinkpad)) == NULL) { if ((peer = gst_pad_get_peer (demux->sinkpad)) == NULL) {
GST_DEBUG_OBJECT (demux, "duration not possible, no peer"); GST_DEBUG_OBJECT (demux, "duration not possible, no peer");
goto not_supported; goto not_supported;
@ -1694,13 +1914,534 @@ need_data:
} }
} }
static gboolean static inline gboolean
gst_flups_demux_is_pes_sync (guint32 sync) gst_flups_demux_is_pes_sync (guint32 sync)
{ {
return ((sync & 0xfc) == 0xbc) || return ((sync & 0xfc) == 0xbc) ||
((sync & 0xe0) == 0xc0) || ((sync & 0xf0) == 0xe0); ((sync & 0xe0) == 0xc0) || ((sync & 0xf0) == 0xe0);
} }
static inline gboolean
gst_flups_demux_scan_ts (GstFluPSDemux * demux, const guint8 * data,
SCAN_MODE mode, guint64 * rts)
{
gboolean ret = FALSE;
guint32 scr1, scr2;
guint64 scr;
guint64 pts, dts;
guint32 code;
/* read the 4 bytes for the sync code */
code = GST_READ_UINT32_BE (data);
if (G_LIKELY (code != ID_PS_PACK_START_CODE))
goto beach;
/* skip start code */
data += 4;
scr1 = GUINT32_FROM_BE (*(guint32 *) data);
scr2 = GUINT32_FROM_BE (*(guint32 *) (data + 4));
/* start parsing the stream */
if ((*data & 0xc0) == 0x40) {
guint32 scr_ext;
guint32 next32;
guint8 stuffing_bytes;
/* :2=01 ! scr:3 ! marker:1==1 ! scr:15 ! marker:1==1 ! scr:15 */
/* check markers */
if ((scr1 & 0xc4000400) != 0x44000400)
goto beach;
scr = ((guint64) scr1 & 0x38000000) << 3;
scr |= ((guint64) scr1 & 0x03fff800) << 4;
scr |= ((guint64) scr1 & 0x000003ff) << 5;
scr |= ((guint64) scr2 & 0xf8000000) >> 27;
/* marker:1==1 ! scr_ext:9 ! marker:1==1 */
if ((scr2 & 0x04010000) != 0x04010000)
goto beach;
scr_ext = (scr2 & 0x03fe0000) >> 17;
if (scr_ext) {
scr = (scr * 300 + scr_ext % 300) / 300;
}
/* SCR has been converted into units of 90Khz ticks to make it comparable
to DTS/PTS, that also implies 1 tick rounding error */
data += 6;
/* PMR:22 ! :2==11 ! reserved:5 ! stuffing_len:3 */
next32 = (GUINT32_FROM_BE ((*(guint32 *) data)));
if ((next32 & 0x00000300) != 0x00000300)
goto beach;
stuffing_bytes = (next32 & 0x07);
data += 4;
while (stuffing_bytes--) {
if (*data++ != 0xff)
goto beach;
}
} else {
/* check markers */
if ((scr1 & 0xf1000100) != 0x21000100)
goto beach;
if ((scr2 & 0x01800001) != 0x01800001)
goto beach;
/* :4=0010 ! scr:3 ! marker:1==1 ! scr:15 ! marker:1==1 ! scr:15 ! marker:1==1 */
scr = ((guint64) scr1 & 0x0e000000) << 5;
scr |= ((guint64) scr1 & 0x00fffe00) << 6;
scr |= ((guint64) scr1 & 0x000000ff) << 7;
scr |= ((guint64) scr2 & 0xfe000000) >> 25;
data += 8;
}
if (mode == SCAN_SCR) {
*rts = scr;
ret = TRUE;
}
/* read the 4 bytes for the PES sync code */
code = GST_READ_UINT32_BE (data);
if (!gst_flups_demux_is_pes_sync (code))
goto beach;
switch (code) {
case ID_PS_PROGRAM_STREAM_MAP:
case ID_PRIVATE_STREAM_2:
case ID_ECM_STREAM:
case ID_EMM_STREAM:
case ID_PROGRAM_STREAM_DIRECTORY:
case ID_DSMCC_STREAM:
case ID_ITU_TREC_H222_TYPE_E_STREAM:
case ID_PADDING_STREAM:
goto beach;
default:
break;
}
/* skip sync code and size */
data += 6;
pts = dts = -1;
/* stuffing bits, first two bits are '10' for mpeg2 pes so this code is
* not triggered. */
while (TRUE) {
if (*data != 0xff)
break;
data++;
}
/* STD buffer size, never for mpeg2 */
if ((*data & 0xc0) == 0x40)
data += 3;
/* PTS but no DTS, never for mpeg2 */
if ((*data & 0xf0) == 0x20) {
READ_TS (data, pts, beach);
}
/* PTS and DTS, never for mpeg2 */
else if ((*data & 0xf0) == 0x30) {
READ_TS (data, pts, beach);
READ_TS (data, dts, beach);
} else if ((*data & 0xc0) == 0x80) {
/* mpeg2 case */
guchar flags;
/* 2: '10'
* 2: PES_scrambling_control
* 1: PES_priority
* 1: data_alignment_indicator
* 1: copyright
* 1: original_or_copy
*/
flags = *data++;
if ((flags & 0xc0) != 0x80)
goto beach;
/* 2: PTS_DTS_flags
* 1: ESCR_flag
* 1: ES_rate_flag
* 1: DSM_trick_mode_flag
* 1: additional_copy_info_flag
* 1: PES_CRC_flag
* 1: PES_extension_flag
*/
flags = *data++;
/* 8: PES_header_data_length */
data++;
/* only DTS: this is invalid */
if ((flags & 0xc0) == 0x40)
goto beach;
/* check for PTS */
if ((flags & 0x80)) {
READ_TS (data, pts, beach);
}
/* check for DTS */
if ((flags & 0x40)) {
READ_TS (data, dts, beach);
}
}
if (mode == SCAN_DTS && dts != -1) {
*rts = dts;
ret = TRUE;
}
if (mode == SCAN_PTS && pts != -1) {
*rts = pts;
ret = TRUE;
}
beach:
return ret;
}
static inline void
gst_flups_demux_scan_forward_ts (GstFluPSDemux * demux, guint64 * pos,
SCAN_MODE mode, guint64 * rts)
{
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buffer = NULL;
guint64 offset = *pos;
gboolean found = FALSE;
guint64 ts = 0;
/* read some data */
ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer);
offset += BLOCK_SZ - SCAN_SZ + 1;
do {
const guint8 *data = GST_BUFFER_DATA (buffer);
do {
found = gst_flups_demux_scan_ts (demux, data++, mode, &ts);
} while (!found && data < (GST_BUFFER_DATA (buffer) + BLOCK_SZ - SCAN_SZ));
/* done with the buffer, unref it */
gst_buffer_unref (buffer);
if (found) {
*rts = ts;
*pos = offset - 1;
break;
}
ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer);
offset += BLOCK_SZ;
} while (offset < (demux->sink_segment.stop * 0.10));
}
static inline void
gst_flups_demux_scan_backward_ts (GstFluPSDemux * demux, guint64 * pos,
SCAN_MODE mode, guint64 * rts)
{
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buffer = NULL;
guint64 offset = *pos;
gboolean found = FALSE;
guint64 ts = 0;
/* read some data */
ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer);
offset -= (BLOCK_SZ - SCAN_SZ + 1);
do {
const guint8 *data = GST_BUFFER_DATA (buffer) + BLOCK_SZ - SCAN_SZ;
do {
found = gst_flups_demux_scan_ts (demux, data--, mode, &ts);
} while (!found && data > GST_BUFFER_DATA (buffer));
/* done with the buffer, unref it */
gst_buffer_unref (buffer);
if (found) {
*rts = ts;
*pos = offset + 1;
break;
}
ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer);
offset -= BLOCK_SZ;
} while (offset > (demux->sink_segment.stop * 0.90));
}
static inline gboolean
gst_flups_sink_get_duration (GstFluPSDemux * demux)
{
gboolean res = FALSE;
GstPad *peer;
GstFormat format = GST_FORMAT_BYTES;
gint64 length = 0;
guint64 offset;
/* init the sink segment */
gst_segment_init (&demux->sink_segment, format);
/* get peer to figure out length */
if ((peer = gst_pad_get_peer (demux->sinkpad)) == NULL)
goto beach;
res = gst_pad_query_duration (peer, &format, &length);
gst_object_unref (peer);
if (!res || length <= 0)
goto beach;
GST_DEBUG_OBJECT (demux, "file length %" G_GINT64_FORMAT, length);
/* update the sink segment */
demux->sink_segment.stop = length;
gst_segment_set_duration (&demux->sink_segment, format, length);
gst_segment_set_last_stop (&demux->sink_segment, format, 0);
/* Scan for notorious SCR and PTS to calculate the duration */
/* scan for first SCR in the stream */
offset = demux->sink_segment.start;
gst_flups_demux_scan_forward_ts (demux, &offset, SCAN_SCR, &demux->first_scr);
demux->base_time = MPEGTIME_TO_GSTTIME (demux->first_scr);
GST_DEBUG_OBJECT (demux, "First SCR: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT
" in packet starting at %" G_GUINT64_FORMAT,
demux->first_scr, GST_TIME_ARGS (demux->base_time), offset);
/* scan for last SCR in the stream */
offset = demux->sink_segment.stop - BLOCK_SZ;
gst_flups_demux_scan_backward_ts (demux, &offset, SCAN_SCR, &demux->last_scr);
GST_DEBUG_OBJECT (demux, "Last SCR: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT
" in packet starting at %" G_GUINT64_FORMAT,
demux->last_scr, GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (demux->last_scr)),
offset);
/* scan for first PTS in the stream */
offset = demux->sink_segment.start;
gst_flups_demux_scan_forward_ts (demux, &offset, SCAN_PTS, &demux->first_pts);
GST_DEBUG_OBJECT (demux, "First PTS: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT
" in packet starting at %" G_GUINT64_FORMAT,
demux->first_pts, GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (demux->first_pts)),
offset);
/* scan for last PTS in the stream */
offset = demux->sink_segment.stop - BLOCK_SZ;
gst_flups_demux_scan_backward_ts (demux, &offset, SCAN_PTS, &demux->last_pts);
GST_DEBUG_OBJECT (demux, "Last PTS: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT
" in packet starting at %" G_GUINT64_FORMAT,
demux->last_pts, GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (demux->last_pts)),
offset);
if (G_LIKELY (demux->first_pts != G_MAXUINT64 &&
demux->last_pts != G_MAXUINT64)) {
/* update the src segment */
demux->src_segment.start = MPEGTIME_TO_GSTTIME (demux->first_pts);
demux->src_segment.stop = -1;
gst_segment_set_duration (&demux->src_segment, GST_FORMAT_TIME,
MPEGTIME_TO_GSTTIME (demux->last_pts - demux->first_pts));
gst_segment_set_last_stop (&demux->src_segment, GST_FORMAT_TIME,
demux->src_segment.start);
}
#if POST_10_10
GST_INFO_OBJECT (demux, "sink segment configured %" GST_SEGMENT_FORMAT,
&demux->sink_segment);
GST_INFO_OBJECT (demux, "src segment configured %" GST_SEGMENT_FORMAT,
&demux->src_segment);
#endif
res = TRUE;
beach:
return res;
}
static inline GstFlowReturn
gst_flups_demux_pull_block (GstPad * pad, GstFluPSDemux * demux,
guint64 offset, guint size)
{
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buffer;
ret = gst_pad_pull_range (pad, offset, size, &buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_DEBUG_OBJECT (demux, "pull range at %" G_GUINT64_FORMAT
" size %u failed", offset, size);
goto beach;
} else
GST_LOG_OBJECT (demux, "pull range at %" G_GUINT64_FORMAT
" size %u done", offset, size);
if (demux->sink_segment.rate < 0) {
GST_LOG_OBJECT (demux, "setting discont flag on backward rate");
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
}
ret = gst_flups_demux_chain (pad, buffer);
beach:
return ret;
}
static void
gst_flups_demux_loop (GstPad * pad)
{
GstFluPSDemux *demux;
GstFlowReturn ret = GST_FLOW_OK;
guint offset = 0;
demux = GST_FLUPS_DEMUX (gst_pad_get_parent (pad));
if (G_UNLIKELY (demux->sink_segment.format == GST_FORMAT_UNDEFINED))
gst_flups_sink_get_duration (demux);
offset = demux->sink_segment.last_stop;
if (demux->sink_segment.rate >= 0) {
guint size = BLOCK_SZ;
if (G_LIKELY (demux->sink_segment.stop != -1)) {
size = MIN (size, demux->sink_segment.stop - offset);
}
/* pull in data */
ret = gst_flups_demux_pull_block (pad, demux, offset, size);
/* pause if something went wrong */
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto pause;
/* update our position */
offset += size;
gst_segment_set_last_stop (&demux->sink_segment, GST_FORMAT_BYTES, offset);
/* check EOS condition */
if ((demux->src_segment.flags & GST_SEEK_FLAG_SEGMENT) &&
((demux->sink_segment.last_stop >= demux->sink_segment.stop) ||
(demux->src_segment.stop != -1 &&
demux->src_segment.last_stop >= demux->src_segment.stop))) {
ret = GST_FLOW_UNEXPECTED;
goto pause;
}
} else { /* Reverse playback */
guint size = MIN (offset, BLOCK_SZ);
/* pull in data */
ret = gst_flups_demux_pull_block (pad, demux, offset - size, size);
/* pause if something went wrong */
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto pause;
/* update our position */
offset -= size;
gst_segment_set_last_stop (&demux->sink_segment, GST_FORMAT_BYTES, offset);
/* check EOS condition */
if (demux->sink_segment.last_stop <= demux->sink_segment.start ||
demux->src_segment.last_stop <= demux->src_segment.start) {
ret = GST_FLOW_UNEXPECTED;
goto pause;
}
}
gst_object_unref (demux);
return;
pause:
{
const gchar *reason = gst_flow_get_name (ret);
GST_LOG_OBJECT (demux, "pausing task, reason %s", reason);
gst_pad_pause_task (pad);
if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
if (ret == GST_FLOW_UNEXPECTED) {
/* perform EOS logic */
gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
if (demux->src_segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop;
/* for segment playback we need to post when (in stream time)
* we stopped, this is either stop (when set) or the duration. */
if ((stop = demux->src_segment.stop) == -1)
stop = demux->src_segment.duration;
if (demux->sink_segment.rate >= 0) {
GST_LOG_OBJECT (demux, "Sending segment done, at end of segment");
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_segment_done (GST_OBJECT_CAST (demux),
GST_FORMAT_TIME, stop));
} else { /* Reverse playback */
GST_LOG_OBJECT (demux, "Sending segment done, at beginning of "
"segment");
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_segment_done (GST_OBJECT_CAST (demux),
GST_FORMAT_TIME, demux->src_segment.start));
}
} else {
/* normal playback, send EOS to all linked pads */
gst_element_no_more_pads (GST_ELEMENT (demux));
GST_LOG_OBJECT (demux, "Sending EOS, at end of stream");
if (!gst_flups_demux_send_event (demux, gst_event_new_eos ())) {
GST_WARNING_OBJECT (demux, "failed pushing EOS on streams");
GST_ELEMENT_ERROR (demux, STREAM, FAILED,
("Internal data stream error."), ("Can't push EOS downstream"));
}
}
} else {
GST_ELEMENT_ERROR (demux, STREAM, FAILED,
("Internal data stream error."),
("stream stopped, reason %s", reason));
gst_flups_demux_send_event (demux, gst_event_new_eos ());
}
}
gst_object_unref (demux);
return;
}
}
/* If we can pull that's prefered */
static gboolean
gst_flups_demux_sink_activate (GstPad * sinkpad)
{
if (gst_pad_check_pull_range (sinkpad)) {
return gst_pad_activate_pull (sinkpad, TRUE);
} else {
return gst_pad_activate_push (sinkpad, TRUE);
}
}
/* This function gets called when we activate ourselves in push mode. */
static gboolean
gst_flups_demux_sink_activate_push (GstPad * sinkpad, gboolean active)
{
GstFluPSDemux *demux;
demux = GST_FLUPS_DEMUX (gst_pad_get_parent (sinkpad));
demux->random_access = FALSE;
gst_object_unref (demux);
return TRUE;
}
/* this function gets called when we activate ourselves in pull mode.
* We can perform random access to the resource and we start a task
* to start reading */
static gboolean
gst_flups_demux_sink_activate_pull (GstPad * sinkpad, gboolean active)
{
GstFluPSDemux *demux;
demux = GST_FLUPS_DEMUX (gst_pad_get_parent (sinkpad));
if (active) {
GST_DEBUG ("pull mode activated");
demux->random_access = TRUE;
gst_object_unref (demux);
return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_flups_demux_loop,
sinkpad);
} else {
demux->random_access = FALSE;
gst_object_unref (demux);
return gst_pad_stop_task (sinkpad);
}
}
static GstFlowReturn static GstFlowReturn
gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer) gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer)
{ {
@ -1855,10 +2596,14 @@ gst_flups_demux_change_state (GstElement * element, GstStateChange transition)
(GstPESFilterResync) gst_flups_demux_resync_cb, demux); (GstPESFilterResync) gst_flups_demux_resync_cb, demux);
demux->filter.gather_pes = TRUE; demux->filter.gather_pes = TRUE;
demux->first_scr = G_MAXUINT64; demux->first_scr = G_MAXUINT64;
demux->last_scr = G_MAXUINT64;
demux->current_scr = G_MAXUINT64; demux->current_scr = G_MAXUINT64;
demux->base_time = G_MAXUINT64; demux->base_time = G_MAXUINT64;
demux->scr_rate_n = G_MAXUINT64; demux->scr_rate_n = G_MAXUINT64;
demux->scr_rate_d = G_MAXUINT64; demux->scr_rate_d = G_MAXUINT64;
demux->first_pts = G_MAXUINT64;
demux->last_pts = G_MAXUINT64;
demux->is_segment_open = FALSE;
break; break;
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
demux->current_scr = G_MAXUINT64; demux->current_scr = G_MAXUINT64;
@ -1866,11 +2611,14 @@ gst_flups_demux_change_state (GstElement * element, GstStateChange transition)
demux->next_pts = G_MAXUINT64; demux->next_pts = G_MAXUINT64;
demux->next_dts = G_MAXUINT64; demux->next_dts = G_MAXUINT64;
demux->first_scr = G_MAXUINT64; demux->first_scr = G_MAXUINT64;
demux->last_scr = G_MAXUINT64;
demux->base_time = G_MAXUINT64; demux->base_time = G_MAXUINT64;
demux->scr_rate_n = G_MAXUINT64; demux->scr_rate_n = G_MAXUINT64;
demux->scr_rate_d = G_MAXUINT64; demux->scr_rate_d = G_MAXUINT64;
demux->need_no_more_pads = TRUE; demux->need_no_more_pads = TRUE;
demux->first_pts = G_MAXUINT64;
demux->last_pts = G_MAXUINT64;
demux->is_segment_open = FALSE;
gst_flups_demux_reset_psm (demux); gst_flups_demux_reset_psm (demux);
gst_segment_init (&demux->sink_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&demux->sink_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&demux->src_segment, GST_FORMAT_TIME); gst_segment_init (&demux->src_segment, GST_FORMAT_TIME);

View file

@ -50,14 +50,12 @@
#include "gstpesfilter.h" #include "gstpesfilter.h"
G_BEGIN_DECLS G_BEGIN_DECLS
#define GST_TYPE_FLUPS_DEMUX (gst_flups_demux_get_type()) #define GST_TYPE_FLUPS_DEMUX (gst_flups_demux_get_type())
#define GST_FLUPS_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_FLUPS_DEMUX,GstFluPSDemux)) #define GST_FLUPS_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_FLUPS_DEMUX,GstFluPSDemux))
#define GST_FLUPS_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_FLUPS_DEMUX,GstFluPSDemuxClass)) #define GST_FLUPS_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_FLUPS_DEMUX,GstFluPSDemuxClass))
#define GST_FLUPS_DEMUX_GET_CLASS(klass) (G_TYPE_INSTANCE_GET_CLASS((klass),GST_TYPE_FLUPS_DEMUX,GstFluPSDemuxClass)) #define GST_FLUPS_DEMUX_GET_CLASS(klass) (G_TYPE_INSTANCE_GET_CLASS((klass),GST_TYPE_FLUPS_DEMUX,GstFluPSDemuxClass))
#define GST_IS_FLUPS_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_FLUPS_DEMUX)) #define GST_IS_FLUPS_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_FLUPS_DEMUX))
#define GST_IS_FLUPS_DEMUX_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_FLUPS_DEMUX)) #define GST_IS_FLUPS_DEMUX_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_FLUPS_DEMUX))
typedef struct _GstFluPSStream GstFluPSStream; typedef struct _GstFluPSStream GstFluPSStream;
typedef struct _GstFluPSDemux GstFluPSDemux; typedef struct _GstFluPSDemux GstFluPSDemux;
typedef struct _GstFluPSDemuxClass GstFluPSDemuxClass; typedef struct _GstFluPSDemuxClass GstFluPSDemuxClass;
@ -65,20 +63,23 @@ typedef struct _GstFluPSDemuxClass GstFluPSDemuxClass;
#define GST_FLUPS_DEMUX_MAX_STREAMS 256 #define GST_FLUPS_DEMUX_MAX_STREAMS 256
#define GST_FLUPS_DEMUX_MAX_PSM 256 #define GST_FLUPS_DEMUX_MAX_PSM 256
typedef enum { typedef enum
{
GST_FLUPS_DEMUX_SYNC_AUTO = 0, GST_FLUPS_DEMUX_SYNC_AUTO = 0,
GST_FLUPS_DEMUX_SYNC_SCR = 1, GST_FLUPS_DEMUX_SYNC_SCR = 1,
GST_FLUPS_DEMUX_SYNC_DTS = 2 GST_FLUPS_DEMUX_SYNC_DTS = 2
} GstFluPSDemuxSync; } GstFluPSDemuxSync;
typedef enum { typedef enum
{
STATE_FLUPS_DEMUX_NEED_SYNC, STATE_FLUPS_DEMUX_NEED_SYNC,
STATE_FLUPS_DEMUX_SYNCED, STATE_FLUPS_DEMUX_SYNCED,
STATE_FLUPS_DEMUX_NEED_MORE_DATA, STATE_FLUPS_DEMUX_NEED_MORE_DATA,
} GstFluPSDemuxState; } GstFluPSDemuxState;
/* Information associated with a single FluPS stream. */ /* Information associated with a single FluPS stream. */
struct _GstFluPSStream { struct _GstFluPSStream
{
GstPad *pad; GstPad *pad;
gint id; gint id;
@ -90,10 +91,12 @@ struct _GstFluPSStream {
gboolean need_segment; gboolean need_segment;
}; };
struct _GstFluPSDemux { struct _GstFluPSDemux
{
GstElement parent; GstElement parent;
GstPad *sinkpad; GstPad *sinkpad;
gboolean random_access; /* If we operate in pull mode */
GstAdapter *adapter; GstAdapter *adapter;
GstAdapter *rev_adapter; GstAdapter *rev_adapter;
@ -103,6 +106,7 @@ struct _GstFluPSDemux {
gint64 mux_rate; gint64 mux_rate;
guint64 first_scr; guint64 first_scr;
guint64 last_scr;
guint64 first_dts; guint64 first_dts;
guint64 base_time; guint64 base_time;
guint64 current_scr; guint64 current_scr;
@ -114,10 +118,14 @@ struct _GstFluPSDemux {
guint64 first_scr_offset; guint64 first_scr_offset;
guint64 last_scr_offset; guint64 last_scr_offset;
guint64 first_pts;
guint64 last_pts;
gint16 psm[GST_FLUPS_DEMUX_MAX_PSM]; gint16 psm[GST_FLUPS_DEMUX_MAX_PSM];
GstSegment sink_segment; GstSegment sink_segment;
GstSegment src_segment; GstSegment src_segment;
gboolean is_segment_open;
/* stream output */ /* stream output */
GstFluPSStream *current_stream; GstFluPSStream *current_stream;
@ -134,7 +142,8 @@ struct _GstFluPSDemux {
GstEvent *lang_codes; GstEvent *lang_codes;
}; };
struct _GstFluPSDemuxClass { struct _GstFluPSDemuxClass
{
GstElementClass parent_class; GstElementClass parent_class;
GstPadTemplate *sink_template; GstPadTemplate *sink_template;
@ -148,5 +157,4 @@ GType gst_flups_demux_get_type (void);
gboolean gst_flups_demux_plugin_init (GstPlugin * plugin); gboolean gst_flups_demux_plugin_init (GstPlugin * plugin);
G_END_DECLS G_END_DECLS
#endif /* __GST_FLUPS_DEMUX_H__ */ #endif /* __GST_FLUPS_DEMUX_H__ */

View file

@ -97,17 +97,6 @@ gst_pes_filter_set_callbacks (GstPESFilter * filter,
filter->user_data = user_data; filter->user_data = user_data;
} }
/* sync:4 == 00xx ! pts:3 ! 1 ! pts:15 ! 1 | pts:15 ! 1 */
#define READ_TS(data, target, lost_sync_label) \
if ((*data & 0x01) != 0x01) goto lost_sync_label; \
target = ((guint64) (*data++ & 0x0E)) << 29; \
target |= ((guint64) (*data++ )) << 22; \
if ((*data & 0x01) != 0x01) goto lost_sync_label; \
target |= ((guint64) (*data++ & 0xFE)) << 14; \
target |= ((guint64) (*data++ )) << 7; \
if ((*data & 0x01) != 0x01) goto lost_sync_label; \
target |= ((guint64) (*data++ & 0xFE)) >> 1;
static gboolean static gboolean
gst_pes_filter_is_sync (guint32 sync) gst_pes_filter_is_sync (guint32 sync)
{ {