mpegtsmux: Improve PCR/SI scheduling.

Change PCR / SI scheduling so that instead of checking if
the current PCR is larger than the next target time, instead
check if the PCR of the next packet would be too late, so PCR
and SI are always scheduled earlier than the target, not later.

There are still cases where PCR can be written too late though,
because we don't check before each output packet.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/2073>
This commit is contained in:
Jan Schmidt 2021-03-11 18:05:25 +11:00 committed by GStreamer Marge Bot
parent 49c61338d6
commit 3201575d2a

View file

@ -101,6 +101,10 @@
#define TSMUX_DEFAULT_NETWORK_ID 0x0001 #define TSMUX_DEFAULT_NETWORK_ID 0x0001
#define TSMUX_DEFAULT_TS_ID 0x0001 #define TSMUX_DEFAULT_TS_ID 0x0001
/* The last byte of the PCR in the header defines the byte position
* at which PCR should be calculated */
#define PCR_BYTE_OFFSET 11
/* HACK: We use a fixed buffering offset for the PCR at the moment - /* HACK: We use a fixed buffering offset for the PCR at the moment -
* this is the amount 'in advance' of the stream that the PCR sits. * this is the amount 'in advance' of the stream that the PCR sits.
* 1/8 second atm */ * 1/8 second atm */
@ -1248,6 +1252,7 @@ ts_to_pcr (gint64 ts)
return (ts - TSMUX_PCR_OFFSET) * (TSMUX_SYS_CLOCK_FREQ / TSMUX_CLOCK_FREQ); return (ts - TSMUX_PCR_OFFSET) * (TSMUX_SYS_CLOCK_FREQ / TSMUX_CLOCK_FREQ);
} }
/* Calculate the PCR to write into the current packet */
static gint64 static gint64
get_current_pcr (TsMux * mux, gint64 cur_ts) get_current_pcr (TsMux * mux, gint64 cur_ts)
{ {
@ -1257,25 +1262,48 @@ get_current_pcr (TsMux * mux, gint64 cur_ts)
if (mux->first_pcr_ts == G_MININT64) { if (mux->first_pcr_ts == G_MININT64) {
g_assert (cur_ts != G_MININT64); g_assert (cur_ts != G_MININT64);
mux->first_pcr_ts = cur_ts; mux->first_pcr_ts = cur_ts;
GST_DEBUG ("First PCR offset is %" G_GUINT64_FORMAT, cur_ts);
} }
return ts_to_pcr (mux->first_pcr_ts) + return ts_to_pcr (mux->first_pcr_ts) +
gst_util_uint64_scale (mux->n_bytes * 8, TSMUX_SYS_CLOCK_FREQ, gst_util_uint64_scale ((mux->n_bytes + PCR_BYTE_OFFSET) * 8,
mux->bitrate); TSMUX_SYS_CLOCK_FREQ, mux->bitrate);
}
/* Predict the PCR at the next packet if possible */
static gint64
get_next_pcr (TsMux * mux, gint64 cur_ts)
{
if (!mux->bitrate)
return ts_to_pcr (cur_ts);
if (mux->first_pcr_ts == G_MININT64) {
g_assert (cur_ts != G_MININT64);
mux->first_pcr_ts = cur_ts;
GST_DEBUG ("First PCR offset is %" G_GUINT64_FORMAT, cur_ts);
}
return ts_to_pcr (mux->first_pcr_ts) +
gst_util_uint64_scale ((mux->n_bytes + TSMUX_PACKET_LENGTH +
PCR_BYTE_OFFSET) * 8, TSMUX_SYS_CLOCK_FREQ, mux->bitrate);
} }
static gint64 static gint64
write_new_pcr (TsMux * mux, TsMuxStream * stream, gint64 cur_pcr) write_new_pcr (TsMux * mux, TsMuxStream * stream, gint64 cur_pcr,
gint64 next_pcr)
{ {
if (stream->next_pcr == -1 || cur_pcr > stream->next_pcr) { if (stream->next_pcr == -1 || next_pcr > stream->next_pcr) {
stream->pi.flags |= stream->pi.flags |=
TSMUX_PACKET_FLAG_ADAPTATION | TSMUX_PACKET_FLAG_WRITE_PCR; TSMUX_PACKET_FLAG_ADAPTATION | TSMUX_PACKET_FLAG_WRITE_PCR;
stream->pi.pcr = cur_pcr; stream->pi.pcr = cur_pcr;
if (stream->next_pcr == -1) if (stream->next_pcr != -1 && cur_pcr >= stream->next_pcr) {
stream->next_pcr = cur_pcr + mux->pcr_interval * 300; GST_WARNING ("Writing PCR %" G_GUINT64_FORMAT " missed the target %"
else G_GUINT64_FORMAT " by %f ms", cur_pcr, stream->next_pcr,
stream->next_pcr += mux->pcr_interval * 300; (double) (cur_pcr - stream->next_pcr) / 27000.0);
}
/* Next PCR deadline is now plus the scheduled interval */
stream->next_pcr = cur_pcr + mux->pcr_interval * 300;
} else { } else {
cur_pcr = -1; cur_pcr = -1;
} }
@ -1289,48 +1317,48 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
gboolean write_pat; gboolean write_pat;
gboolean write_si; gboolean write_si;
GList *cur; GList *cur;
gint64 cur_pcr; gint64 next_pcr;
cur_pcr = get_current_pcr (mux, cur_ts); next_pcr = get_next_pcr (mux, cur_ts);
/* check if we need to rewrite pat */ /* check if we need to rewrite pat */
if (mux->next_pat_pcr == -1 || mux->pat_changed) if (mux->next_pat_pcr == -1 || mux->pat_changed)
write_pat = TRUE; write_pat = TRUE;
else if (cur_pcr > mux->next_pat_pcr) else if (next_pcr > mux->next_pat_pcr)
write_pat = TRUE; write_pat = TRUE;
else else
write_pat = FALSE; write_pat = FALSE;
if (write_pat) { if (write_pat) {
if (mux->next_pat_pcr == -1) if (mux->next_pat_pcr == -1)
mux->next_pat_pcr = cur_pcr + mux->pat_interval * 300; mux->next_pat_pcr = next_pcr + mux->pat_interval * 300;
else else
mux->next_pat_pcr += mux->pat_interval * 300; mux->next_pat_pcr += mux->pat_interval * 300;
if (!tsmux_write_pat (mux)) if (!tsmux_write_pat (mux))
return FALSE; return FALSE;
cur_pcr = get_current_pcr (mux, cur_ts); next_pcr = get_next_pcr (mux, cur_ts);
} }
/* check if we need to rewrite sit */ /* check if we need to rewrite sit */
if (mux->next_si_pcr == -1 || mux->si_changed) if (mux->next_si_pcr == -1 || mux->si_changed)
write_si = TRUE; write_si = TRUE;
else if (cur_pcr > mux->next_si_pcr) else if (next_pcr > mux->next_si_pcr)
write_si = TRUE; write_si = TRUE;
else else
write_si = FALSE; write_si = FALSE;
if (write_si) { if (write_si) {
if (mux->next_si_pcr == -1) if (mux->next_si_pcr == -1)
mux->next_si_pcr = cur_pcr + mux->si_interval * 300; mux->next_si_pcr = next_pcr + mux->si_interval * 300;
else else
mux->next_si_pcr += mux->si_interval * 300; mux->next_si_pcr += mux->si_interval * 300;
if (!tsmux_write_si (mux)) if (!tsmux_write_si (mux))
return FALSE; return FALSE;
cur_pcr = get_current_pcr (mux, cur_ts); next_pcr = get_current_pcr (mux, cur_ts);
} }
/* check if we need to rewrite any of the current pmts */ /* check if we need to rewrite any of the current pmts */
@ -1340,28 +1368,28 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
if (program->next_pmt_pcr == -1 || program->pmt_changed) if (program->next_pmt_pcr == -1 || program->pmt_changed)
write_pmt = TRUE; write_pmt = TRUE;
else if (cur_pcr > program->next_pmt_pcr) else if (next_pcr > program->next_pmt_pcr)
write_pmt = TRUE; write_pmt = TRUE;
else else
write_pmt = FALSE; write_pmt = FALSE;
if (write_pmt) { if (write_pmt) {
if (program->next_pmt_pcr == -1) if (program->next_pmt_pcr == -1)
program->next_pmt_pcr = cur_pcr + program->pmt_interval * 300; program->next_pmt_pcr = next_pcr + program->pmt_interval * 300;
else else
program->next_pmt_pcr += program->pmt_interval * 300; program->next_pmt_pcr += program->pmt_interval * 300;
if (!tsmux_write_pmt (mux, program)) if (!tsmux_write_pmt (mux, program))
return FALSE; return FALSE;
cur_pcr = get_current_pcr (mux, cur_ts); next_pcr = get_current_pcr (mux, cur_ts);
} }
if (program->scte35_pid != 0) { if (program->scte35_pid != 0) {
gboolean write_scte_null = FALSE; gboolean write_scte_null = FALSE;
if (program->next_scte35_pcr == -1) if (program->next_scte35_pcr == -1)
write_scte_null = TRUE; write_scte_null = TRUE;
else if (cur_pcr > program->next_scte35_pcr) else if (next_pcr > program->next_scte35_pcr)
write_scte_null = TRUE; write_scte_null = TRUE;
if (write_scte_null) { if (write_scte_null) {
@ -1369,7 +1397,7 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
program->next_scte35_pcr); program->next_scte35_pcr);
if (program->next_scte35_pcr == -1) if (program->next_scte35_pcr == -1)
program->next_scte35_pcr = program->next_scte35_pcr =
cur_pcr + program->scte35_null_interval * 300; next_pcr + program->scte35_null_interval * 300;
else else
program->next_scte35_pcr += program->scte35_null_interval * 300; program->next_scte35_pcr += program->scte35_null_interval * 300;
GST_DEBUG ("next scte35 NOW pcr %" G_GINT64_FORMAT, GST_DEBUG ("next scte35 NOW pcr %" G_GINT64_FORMAT,
@ -1378,7 +1406,7 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
if (!tsmux_write_scte_null (mux, program)) if (!tsmux_write_scte_null (mux, program))
return FALSE; return FALSE;
cur_pcr = get_current_pcr (mux, cur_ts); next_pcr = get_current_pcr (mux, cur_ts);
} }
} }
} }
@ -1393,66 +1421,73 @@ pad_stream (TsMux * mux, TsMuxStream * stream, gint64 cur_ts)
GstBuffer *buf = NULL; GstBuffer *buf = NULL;
GstMapInfo map; GstMapInfo map;
gboolean ret = TRUE; gboolean ret = TRUE;
GstClockTimeDiff diff;
guint64 start_n_bytes;
if (!mux->bitrate) if (!mux->bitrate)
goto done; goto done;
if (!GST_CLOCK_STIME_IS_VALID (cur_ts))
goto done;
if (!GST_CLOCK_STIME_IS_VALID (stream->first_ts))
stream->first_ts = cur_ts;
diff = GST_CLOCK_DIFF (stream->first_ts, cur_ts);
if (diff == 0)
goto done;
start_n_bytes = mux->n_bytes;
do { do {
if (GST_CLOCK_STIME_IS_VALID (cur_ts)) { GST_LOG ("Transport stream bitrate: %" G_GUINT64_FORMAT " over %"
GstClockTimeDiff diff; G_GUINT64_FORMAT " bytes, duration %" GST_TIME_FORMAT,
gst_util_uint64_scale (mux->n_bytes * 8, TSMUX_CLOCK_FREQ, diff),
mux->n_bytes, GST_TIME_ARGS (diff * GST_SECOND / TSMUX_CLOCK_FREQ));
if (!GST_CLOCK_STIME_IS_VALID (stream->first_ts)) /* calculate what the overall bitrate will be if we add 1 more packet */
stream->first_ts = cur_ts; bitrate =
gst_util_uint64_scale ((mux->n_bytes + TSMUX_PACKET_LENGTH) * 8,
TSMUX_CLOCK_FREQ, diff);
diff = GST_CLOCK_DIFF (stream->first_ts, cur_ts); if (bitrate <= mux->bitrate) {
gint64 new_pcr;
guint payload_len, payload_offs;
if (diff) { if (!tsmux_get_buffer (mux, &buf)) {
bitrate = ret = FALSE;
gst_util_uint64_scale (mux->n_bytes * 8, TSMUX_CLOCK_FREQ, diff); goto done;
GST_LOG ("Transport stream bitrate: %" G_GUINT64_FORMAT, bitrate);
if (bitrate < mux->bitrate) {
gint64 new_pcr;
guint payload_len, payload_offs;
GST_LOG ("Padding transport stream");
if (!rewrite_si (mux, cur_ts)) {
ret = FALSE;
goto done;
}
if (!tsmux_get_buffer (mux, &buf)) {
ret = FALSE;
goto done;
}
gst_buffer_map (buf, &map, GST_MAP_READ);
if ((new_pcr =
write_new_pcr (mux, stream, get_current_pcr (mux,
cur_ts)) != -1))
tsmux_write_ts_header (mux, map.data, &stream->pi, &payload_len,
&payload_offs, 0);
else
tsmux_write_null_ts_header (map.data);
gst_buffer_unmap (buf, &map);
stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER;
if (!(ret = tsmux_packet_out (mux, buf, new_pcr)))
goto done;
}
} else {
break;
} }
} else {
break; gst_buffer_map (buf, &map, GST_MAP_READ);
if ((new_pcr =
write_new_pcr (mux, stream, get_current_pcr (mux,
cur_ts), get_next_pcr (mux, cur_ts)) != -1)) {
GST_LOG ("Writing PCR-only packet on PID 0x%04x", stream->pi.pid);
tsmux_write_ts_header (mux, map.data, &stream->pi, &payload_len,
&payload_offs, 0);
} else {
GST_LOG ("Writing null stuffing packet");
if (!rewrite_si (mux, cur_ts)) {
ret = FALSE;
goto done;
}
tsmux_write_null_ts_header (map.data);
}
gst_buffer_unmap (buf, &map);
stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER;
if (!(ret = tsmux_packet_out (mux, buf, new_pcr)))
goto done;
} }
} while (bitrate < mux->bitrate); } while (bitrate < mux->bitrate);
if (mux->n_bytes != start_n_bytes) {
GST_LOG ("Finished padding the mux");
}
done: done:
return ret; return ret;
} }
@ -1481,7 +1516,6 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
if (tsmux_stream_is_pcr (stream)) { if (tsmux_stream_is_pcr (stream)) {
gint64 cur_ts = CLOCK_BASE; gint64 cur_ts = CLOCK_BASE;
if (tsmux_stream_get_dts (stream) != G_MININT64) if (tsmux_stream_get_dts (stream) != G_MININT64)
cur_ts += tsmux_stream_get_dts (stream); cur_ts += tsmux_stream_get_dts (stream);
else else
@ -1493,7 +1527,9 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
if (!pad_stream (mux, stream, cur_ts)) if (!pad_stream (mux, stream, cur_ts))
goto fail; goto fail;
new_pcr = write_new_pcr (mux, stream, get_current_pcr (mux, cur_ts)); new_pcr =
write_new_pcr (mux, stream, get_current_pcr (mux, cur_ts),
get_next_pcr (mux, cur_ts));
} }
pi->packet_start_unit_indicator = tsmux_stream_at_pes_start (stream); pi->packet_start_unit_indicator = tsmux_stream_at_pes_start (stream);