mpegtsmux: improve m2ts timestamp interpolation

* a known PCR should really belong to a packet holding that PCR
* interpolation can be extended backward for initial packets (PAT, PMT)
This commit is contained in:
Mark Nauwelaerts 2012-06-07 14:38:10 +02:00
parent 8637fd69e5
commit ead42a5e27
4 changed files with 60 additions and 53 deletions

View file

@ -284,7 +284,7 @@ mpegtsmux_reset (MpegTsMux * mux, gboolean alloc)
mux->first = TRUE;
mux->last_flow_ret = GST_FLOW_OK;
mux->first_pcr = TRUE;
mux->previous_pcr = -1;
mux->last_ts = 0;
mux->is_delta = TRUE;
@ -1203,7 +1203,7 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts;
if (new_pcr < 0) {
/* If theres no pcr in current ts packet then just add the packet
/* If there is no pcr in current ts packet then just add the packet
to the adapter for later output when we see a PCR */
GST_LOG_OBJECT (mux, "Accumulating non-PCR packet");
gst_adapter_push (mux->adapter, buf);
@ -1212,57 +1212,53 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
chunk_bytes = gst_adapter_available (mux->adapter);
/* We have a new PCR, output anything in the adapter */
if (mux->first_pcr) {
/* We can't generate sensible timestamps for anything that might
* be in the adapter preceding the first PCR and will hit a divide
* by zero, so empty the adapter. This is probably a null op. */
gst_adapter_clear (mux->adapter);
/* Warn if we threw anything away */
if (chunk_bytes) {
GST_ELEMENT_WARNING (mux, STREAM, MUX,
("Discarding %d bytes from stream preceding first PCR",
chunk_bytes / M2TS_PACKET_LENGTH * NORMAL_TS_PACKET_LENGTH),
(NULL));
chunk_bytes = 0;
}
mux->first_pcr = FALSE;
/* no first interpolation point yet, then this is the one,
* otherwise it is the second interpolation point */
if (mux->previous_pcr < 0 && chunk_bytes) {
mux->previous_pcr = new_pcr;
mux->previous_offset = chunk_bytes;
GST_LOG_OBJECT (mux, "Accumulating non-PCR packet");
gst_adapter_push (mux->adapter, buf);
return TRUE;
}
if (chunk_bytes) {
/* Start the PCR offset counting at 192 bytes: At the end of the packet
* that had the last PCR */
guint64 pcr_bytes = M2TS_PACKET_LENGTH, ts_rate;
/* interpolate if needed, and 2 points available */
if (chunk_bytes && (new_pcr != mux->previous_pcr)) {
gint64 offset = 0;
/* Include the pending packet size to get the ts_rate right */
chunk_bytes += M2TS_PACKET_LENGTH;
GST_LOG_OBJECT (mux, "Processing pending packets; "
"previous pcr %" G_GINT64_FORMAT ", previous offset %d, "
"current pcr %" G_GINT64_FORMAT ", current offset %d",
mux->previous_pcr, (gint) mux->previous_offset,
new_pcr, (gint) chunk_bytes);
/* calculate rate based on latest and previous pcr values */
ts_rate = gst_util_uint64_scale (chunk_bytes, CLOCK_FREQ_SCR,
(new_pcr - mux->previous_pcr));
GST_LOG_OBJECT (mux, "Processing pending packets with ts_rate %"
G_GUINT64_FORMAT, ts_rate);
while (1) {
g_assert (chunk_bytes > mux->previous_offset);
while (offset < chunk_bytes) {
guint64 cur_pcr, ts;
/* Loop, pulling packets of the adapter, updating their 4 byte
* timestamp header and pushing */
/* The header is the bottom 30 bits of the PCR, apparently not
* encoded into base + ext as in the packets themselves, so
* we can just interpolate, mask and insert */
cur_pcr = (mux->previous_pcr +
gst_util_uint64_scale (pcr_bytes, CLOCK_FREQ_SCR, ts_rate));
/* interpolate PCR */
if (G_LIKELY (offset >= mux->previous_offset))
cur_pcr = mux->previous_pcr +
gst_util_uint64_scale (offset - mux->previous_offset,
new_pcr - mux->previous_pcr, chunk_bytes - mux->previous_offset);
else
cur_pcr = mux->previous_pcr -
gst_util_uint64_scale (mux->previous_offset - offset,
new_pcr - mux->previous_pcr, chunk_bytes - mux->previous_offset);
ts = gst_adapter_prev_timestamp (mux->adapter, NULL);
out_buf = gst_adapter_take_buffer (mux->adapter, M2TS_PACKET_LENGTH);
if (G_UNLIKELY (!out_buf))
break;
g_assert (out_buf);
offset += M2TS_PACKET_LENGTH;
gst_buffer_set_caps (out_buf, GST_PAD_CAPS (mux->srcpad));
GST_BUFFER_TIMESTAMP (out_buf) = ts;
/* Write the 4 byte timestamp value, bottom 30 bits only = PCR */
/* The header is the bottom 30 bits of the PCR, apparently not
* encoded into base + ext as in the packets themselves */
GST_WRITE_UINT32_BE (GST_BUFFER_DATA (out_buf), cur_pcr & 0x3FFFFFFF);
GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
@ -1272,7 +1268,6 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
mux->last_flow_ret = ret;
return FALSE;
}
pcr_bytes += M2TS_PACKET_LENGTH;
}
}
@ -1288,7 +1283,10 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
return FALSE;
}
mux->previous_pcr = new_pcr;
if (new_pcr != mux->previous_pcr) {
mux->previous_pcr = new_pcr;
mux->previous_offset = -M2TS_PACKET_LENGTH;
}
return TRUE;
}

View file

@ -153,8 +153,8 @@ struct MpegTsMux {
GstClockTime last_ts;
/* m2ts specific */
gboolean first_pcr;
gint64 previous_pcr;
gint64 previous_offset;
GstAdapter *adapter;
/* output buffer aggregation */

View file

@ -108,6 +108,10 @@
/* Times per second to write PCR */
#define TSMUX_DEFAULT_PCR_FREQ (25)
/* Base for all written PCR and DTS/PTS,
* so we have some slack to go backwards */
#define CLOCK_BASE (TSMUX_CLOCK_FREQ * 10 * 360)
static gboolean tsmux_write_pat (TsMux * mux);
static gboolean tsmux_write_pmt (TsMux * mux, TsMuxProgram * program);
@ -431,13 +435,13 @@ tsmux_find_stream (TsMux * mux, guint16 pid)
}
static gboolean
tsmux_packet_out (TsMux * mux)
tsmux_packet_out (TsMux * mux, gint64 pcr)
{
if (G_UNLIKELY (mux->write_func == NULL))
return TRUE;
return mux->write_func (mux->packet_buf, TSMUX_PACKET_LENGTH,
mux->write_func_data, mux->new_pcr);
mux->write_func_data, pcr);
}
/*
@ -702,18 +706,17 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
guint payload_len, payload_offs;
TsMuxPacketInfo *pi = &stream->pi;
gboolean res;
gint64 cur_pcr = -1;
mux->new_pcr = -1;
g_return_val_if_fail (mux != NULL, FALSE);
g_return_val_if_fail (stream != NULL, FALSE);
if (tsmux_stream_is_pcr (stream)) {
gint64 cur_pcr = 0;
gint64 cur_pts = tsmux_stream_get_pts (stream);
gboolean write_pat;
GList *cur;
cur_pcr = 0;
if (cur_pts != -1) {
TS_DEBUG ("TS for PCR stream is %" G_GINT64_FORMAT, cur_pts);
}
@ -724,6 +727,8 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
cur_pcr = (cur_pts - TSMUX_PCR_OFFSET) *
(TSMUX_SYS_CLOCK_FREQ / TSMUX_CLOCK_FREQ);
cur_pcr += CLOCK_BASE * (TSMUX_SYS_CLOCK_FREQ / TSMUX_CLOCK_FREQ);
/* Need to decide whether to write a new PCR in this packet */
if (stream->last_pcr == -1 ||
(cur_pcr - stream->last_pcr >
@ -733,7 +738,8 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
TSMUX_PACKET_FLAG_ADAPTATION | TSMUX_PACKET_FLAG_WRITE_PCR;
stream->pi.pcr = cur_pcr;
stream->last_pcr = cur_pcr;
mux->new_pcr = cur_pcr;
} else {
cur_pcr = -1;
}
/* check if we need to rewrite pat */
@ -771,8 +777,13 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
}
pi->packet_start_unit_indicator = tsmux_stream_at_pes_start (stream);
if (pi->packet_start_unit_indicator)
if (pi->packet_start_unit_indicator) {
tsmux_stream_initialize_pes_packet (stream);
if (stream->dts != -1)
stream->dts += CLOCK_BASE;
if (stream->pts != -1)
stream->pts += CLOCK_BASE;
}
pi->stream_avail = tsmux_stream_bytes_avail (stream);
if (!tsmux_write_ts_header (mux->packet_buf, pi, &payload_len, &payload_offs))
@ -782,7 +793,7 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
payload_len))
return FALSE;
res = tsmux_packet_out (mux);
res = tsmux_packet_out (mux, cur_pcr);
/* Reset all dynamic flags */
stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER;
@ -853,11 +864,10 @@ tsmux_write_section (TsMux * mux, TsMuxSection * section)
cur_in += payload_len;
payload_remain -= payload_len;
if (G_UNLIKELY (!tsmux_packet_out (mux))) {
mux->new_pcr = -1;
/* we do not write PCR in section */
if (G_UNLIKELY (!tsmux_packet_out (mux, -1))) {
return FALSE;
}
mux->new_pcr = -1;
}
return TRUE;

View file

@ -167,7 +167,6 @@ struct TsMux {
/* scratch space for writing ES_info descriptors */
guint8 es_info_buf[TSMUX_MAX_ES_INFO_LENGTH];
gint64 new_pcr;
};
/* create/free new muxer session */