tsdemux: Wait for valid PCR/offset obvervations

It is quite possible that we might get PTS/DTS before the first
PCR/Offset observation.

In order to end up with valid timestamp we wait until at least one
stream was able to get a proper running-time for any PTS/DTS.
Until then, we queue up the pending buffers to push out.

Once we see a first valid timestamp, we re-evaluate the amount of
running-time elapsed (based on returned inital running-time and amount
of data/DTS queued up) for any given stream.

Taking the biggest amount of elapsed time, we set that on the packetizer
as the initial offset and recalculate all pending buffers running-time
PTS/DTS.

Note: The buffer queueing system can also be used later on for the
dvb fast start proposal (where we queue up all stream packets before
seeing PAT/PMT and then push them once we know if they belong to the
chosen program).
This commit is contained in:
Edward Hervey 2013-07-29 08:10:07 +02:00
parent a4ee1abb15
commit 0e9ce593bf

View file

@ -97,6 +97,16 @@ typedef enum
* Drop all incoming buffers */ * Drop all incoming buffers */
} PendingPacketState; } PendingPacketState;
/* Pending buffer */
typedef struct
{
/* The fully reconstructed buffer */
GstBuffer *buffer;
/* Raw PTS/DTS (in 90kHz units) */
guint64 pts, dts;
} PendingBuffer;
typedef struct _TSDemuxStream TSDemuxStream; typedef struct _TSDemuxStream TSDemuxStream;
struct _TSDemuxStream struct _TSDemuxStream
@ -104,35 +114,49 @@ struct _TSDemuxStream
MpegTSBaseStream stream; MpegTSBaseStream stream;
GstPad *pad; GstPad *pad;
/* Whether the pad was added or not */ /* Whether the pad was added or not */
gboolean active; gboolean active;
/* TRUE if we are waiting for a valid timestamp */
gboolean pending_ts;
/* the return of the latest push */ /* the return of the latest push */
GstFlowReturn flow_return; GstFlowReturn flow_return;
/* Output data */ /* Output data */
PendingPacketState state; PendingPacketState state;
/* Data to push (allocated) */ /* Data being reconstructed (allocated) */
guint8 *data; guint8 *data;
/* Size of data to push (if known) */ /* Size of data being reconstructed (if known, else 0) */
guint expected_size; guint expected_size;
/* Size of currently queued data */ /* Amount of bytes in current ->data */
guint current_size; guint current_size;
/* Size of ->data */
guint allocated_size; guint allocated_size;
/* Current PTS/DTS for this stream */ /* Current PTS/DTS for this stream (in running time) */
GstClockTime pts; GstClockTime pts;
GstClockTime dts; GstClockTime dts;
/* Current PTS/DTS for this stream (in 90kHz unit) */
guint64 raw_pts, raw_dts;
/* Whether this stream needs to send a newsegment */ /* Whether this stream needs to send a newsegment */
gboolean need_newsegment; gboolean need_newsegment;
/* The value to use when calculating the newsegment */
GstClockTime first_dts;
GstTagList *taglist; GstTagList *taglist;
gint continuity_counter; gint continuity_counter;
/* List of pending buffers */
GList *pending;
}; };
#define VIDEO_CAPS \ #define VIDEO_CAPS \
@ -1059,6 +1083,10 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
stream->need_newsegment = TRUE; stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE; stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE; stream->dts = GST_CLOCK_TIME_NONE;
stream->raw_pts = -1;
stream->raw_dts = -1;
stream->pending_ts = TRUE;
stream->first_dts = GST_CLOCK_TIME_NONE;
stream->continuity_counter = CONTINUITY_UNSET; stream->continuity_counter = CONTINUITY_UNSET;
} }
stream->flow_return = GST_FLOW_OK; stream->flow_return = GST_FLOW_OK;
@ -1121,8 +1149,6 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
static void static void
gst_ts_demux_stream_flush (TSDemuxStream * stream) gst_ts_demux_stream_flush (TSDemuxStream * stream)
{ {
stream->pts = GST_CLOCK_TIME_NONE;
GST_DEBUG ("flushing stream %p", stream); GST_DEBUG ("flushing stream %p", stream);
if (stream->data) if (stream->data)
@ -1135,6 +1161,9 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream)
stream->need_newsegment = TRUE; stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE; stream->pts = GST_CLOCK_TIME_NONE;
stream->dts = GST_CLOCK_TIME_NONE; stream->dts = GST_CLOCK_TIME_NONE;
stream->first_dts = GST_CLOCK_TIME_NONE;
stream->raw_pts = -1;
stream->raw_dts = -1;
if (stream->flow_return == GST_FLOW_FLUSHING) { if (stream->flow_return == GST_FLOW_FLUSHING) {
stream->flow_return = GST_FLOW_OK; stream->flow_return = GST_FLOW_OK;
} }
@ -1190,6 +1219,7 @@ gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
{ {
MpegTSBaseStream *bs = (MpegTSBaseStream *) stream; MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
stream->raw_pts = pts;
if (pts == -1) { if (pts == -1) {
stream->pts = GST_CLOCK_TIME_NONE; stream->pts = GST_CLOCK_TIME_NONE;
return; return;
@ -1223,6 +1253,7 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
{ {
MpegTSBaseStream *bs = (MpegTSBaseStream *) stream; MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
stream->raw_dts = dts;
if (dts == -1) { if (dts == -1) {
stream->dts = GST_CLOCK_TIME_NONE; stream->dts = GST_CLOCK_TIME_NONE;
return; return;
@ -1250,6 +1281,131 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
} }
} }
/* This is called when we haven't got a valid initial PTS/DTS on all streams */
static gboolean
check_pending_buffers (GstTSDemux * demux, TSDemuxStream * stream)
{
gboolean have_observation = FALSE;
/* The biggest offset */
guint64 offset = 0;
GList *tmp;
/* 1. Go over all streams */
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
/* 1.1 check if at least one stream got a valid DTS */
if ((tmpstream->raw_dts != -1 && tmpstream->dts != GST_CLOCK_TIME_NONE) ||
(tmpstream->raw_pts != -1 && tmpstream->pts != GST_CLOCK_TIME_NONE)) {
have_observation = TRUE;
break;
}
}
/* 2. If we don't have a valid value yet, break out */
if (have_observation == FALSE)
return FALSE;
/* 3. Go over all streams that have current/pending data */
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
PendingBuffer *pend;
guint64 firstval, lastval, ts;
/* 3.1 Calculate the offset between current DTS and first DTS */
if (tmpstream->pending == NULL || tmpstream->state == PENDING_PACKET_EMPTY)
continue;
/* If we don't have any pending data, the offset is 0 for this stream */
if (tmpstream->pending == NULL)
break;
if (tmpstream->raw_dts != -1)
lastval = tmpstream->raw_dts;
else if (tmpstream->raw_pts != -1)
lastval = tmpstream->raw_pts;
else {
GST_WARNING ("Don't have a last DTS/PTS to use for offset recalculation");
continue;
}
pend = tmpstream->pending->data;
if (pend->dts != -1)
firstval = pend->dts;
else if (pend->pts != -1)
firstval = pend->pts;
else {
GST_WARNING
("Don't have a first DTS/PTS to use for offset recalculation");
continue;
}
/* 3.2 Add to the offset the report TS for the current DTS */
ts = mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
MPEGTIME_TO_GSTTIME (lastval), demux->program->pcr_pid);
if (ts == GST_CLOCK_TIME_NONE) {
GST_WARNING ("THIS SHOULD NOT HAPPEN !");
continue;
}
ts += MPEGTIME_TO_GSTTIME (lastval - firstval);
/* 3.3 If that offset is bigger than the current offset, store it */
if (ts > offset)
offset = ts;
}
GST_DEBUG ("New initial pcr_offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (offset));
/* 4. Set the offset on the packetizer */
mpegts_packetizer_set_current_pcr_offset (MPEG_TS_BASE_PACKETIZER (demux),
offset, demux->program->pcr_pid);
/* 4. Go over all streams */
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
stream->pending_ts = FALSE;
/* 4.1 Set pending_ts for FALSE */
/* 4.2 Recalculate PTS/DTS (in running time) for pending data */
if (stream->pending) {
GList *tmp2;
for (tmp2 = stream->pending; tmp2; tmp2 = tmp2->next) {
PendingBuffer *pend = (PendingBuffer *) tmp2->data;
if (pend->pts != -1)
GST_BUFFER_PTS (pend->buffer) =
mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
MPEGTIME_TO_GSTTIME (pend->pts), demux->program->pcr_pid);
if (pend->dts != -1)
GST_BUFFER_DTS (pend->buffer) =
mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
MPEGTIME_TO_GSTTIME (pend->dts), demux->program->pcr_pid);
/* 4.2.2 Set first_dts to TS of lowest DTS (for segment) */
if (stream->first_dts == GST_CLOCK_TIME_NONE) {
if (GST_BUFFER_DTS (pend->buffer) != GST_CLOCK_TIME_NONE)
stream->first_dts = GST_BUFFER_DTS (pend->buffer);
else if (GST_BUFFER_PTS (pend->buffer) != GST_CLOCK_TIME_NONE)
stream->first_dts = GST_BUFFER_PTS (pend->buffer);
}
}
}
/* Recalculate PTS/DTS (in running time) for current data */
if (stream->state != PENDING_PACKET_EMPTY) {
if (stream->raw_dts != -1) {
stream->dts =
mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
MPEGTIME_TO_GSTTIME (stream->raw_dts), demux->program->pcr_pid);
if (stream->first_dts == GST_CLOCK_TIME_NONE)
stream->first_dts = stream->dts;
}
if (stream->raw_pts != -1) {
stream->pts =
mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
MPEGTIME_TO_GSTTIME (stream->raw_pts), demux->program->pcr_pid);
if (stream->first_dts == GST_CLOCK_TIME_NONE)
stream->first_dts = stream->pts;
}
}
}
return TRUE;
}
static void static void
gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream, gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
guint8 * data, guint32 length, guint64 bufferoffset) guint8 * data, guint32 length, guint64 bufferoffset)
@ -1270,6 +1426,17 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset); gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset); gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
if (G_UNLIKELY (stream->pending_ts &&
(stream->pts != GST_CLOCK_TIME_NONE
|| stream->dts != GST_CLOCK_TIME_NONE))) {
GST_DEBUG ("Got pts/dts update, rechecking all streams");
check_pending_buffers (demux, stream);
} else if (stream->first_dts == GST_CLOCK_TIME_NONE) {
if (GST_CLOCK_TIME_IS_VALID (stream->dts))
stream->first_dts = stream->dts;
else if (GST_CLOCK_TIME_IS_VALID (stream->pts))
stream->first_dts = stream->pts;
}
GST_DEBUG_OBJECT (demux, GST_DEBUG_OBJECT (demux,
"stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT, "stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT,
@ -1413,13 +1580,10 @@ calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
TSDemuxStream *pstream = (TSDemuxStream *) tmp->data; TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) { if (GST_CLOCK_TIME_IS_VALID (pstream->first_dts)) {
if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts) if (!GST_CLOCK_TIME_IS_VALID (lowest_pts)
lowest_pts = pstream->pts; || pstream->first_dts < lowest_pts)
} lowest_pts = pstream->first_dts;
if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) {
if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts)
lowest_pts = pstream->dts;
} }
} }
if (GST_CLOCK_TIME_IS_VALID (lowest_pts)) if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
@ -1515,24 +1679,48 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
goto beach; goto beach;
} }
if (G_UNLIKELY (!stream->active))
activate_pad_for_stream (demux, stream);
if (G_UNLIKELY (stream->pad == NULL)) {
g_free (stream->data);
goto beach;
}
if (G_UNLIKELY (demux->program == NULL)) { if (G_UNLIKELY (demux->program == NULL)) {
GST_LOG_OBJECT (demux, "No program"); GST_LOG_OBJECT (demux, "No program");
g_free (stream->data); g_free (stream->data);
goto beach; goto beach;
} }
buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
if (G_UNLIKELY (stream->pending_ts && !check_pending_buffers (demux, stream))) {
PendingBuffer *pend;
pend = g_slice_new0 (PendingBuffer);
pend->buffer = buffer;
pend->pts = stream->raw_pts;
pend->dts = stream->raw_dts;
stream->pending = g_list_append (stream->pending, pend);
GST_DEBUG ("Not enough information to push buffers yet, storing buffer");
goto beach;
}
if (G_UNLIKELY (!stream->active))
activate_pad_for_stream (demux, stream);
if (G_UNLIKELY (stream->need_newsegment)) if (G_UNLIKELY (stream->need_newsegment))
calculate_and_push_newsegment (demux, stream); calculate_and_push_newsegment (demux, stream);
buffer = gst_buffer_new_wrapped (stream->data, stream->current_size); /* FIXME : Push pending buffers if any */
if (G_UNLIKELY (stream->pending)) {
GList *tmp;
for (tmp = stream->pending; tmp; tmp = tmp->next) {
PendingBuffer *pend = (PendingBuffer *) tmp->data;
GST_DEBUG_OBJECT (stream->pad,
"Pushing pending buffer PTS:%" GST_TIME_FORMAT " DTS:%"
GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (pend->buffer)),
GST_TIME_ARGS (GST_BUFFER_DTS (pend->buffer)));
res = gst_pad_push (stream->pad, pend->buffer);
g_slice_free (PendingBuffer, pend);
}
g_list_free (stream->pending);
stream->pending = NULL;
}
GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT, GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->pts)); GST_TIME_ARGS (stream->pts));