mpegtsmux: Add SCTE-35 support

This adds two properties:
* scte-35-pid: If not 0, enables the SCTE-35 support for the current
  program. This will write the proper PMT and send SCTE-35 NULL
  commands (i.e. heartbeats) at a regular interval
* scte-35-null-interval: This specifies the interval at which the
  NULL commands should be sent

Sending SCTE-35 commands is done by creating the appropriate SCTE-35
GstMpegtsSection and then sending them on the muxer. See the
associated example
This commit is contained in:
Edward Hervey 2019-09-26 17:45:31 +02:00 committed by Edward Hervey
parent 6a9108884c
commit ef16d7558f
8 changed files with 342 additions and 11 deletions

View file

@ -1,9 +1,11 @@
/*
* gst-scte-section.h -
* Copyright (C) 2013, CableLabs, Louisville, CO 80027
* (c) 2019, Centricular ltd
*
* Authors:
* RUIH Team <ruih@cablelabs.com>
* Edward Hervey <edward@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public

View file

@ -195,8 +195,12 @@ enum
PROP_SI_INTERVAL,
PROP_BITRATE,
PROP_PCR_INTERVAL,
PROP_SCTE_35_PID,
PROP_SCTE_35_NULL_INTERVAL
};
#define DEFAULT_SCTE_35_PID 0
#define BASETSMUX_DEFAULT_ALIGNMENT -1
#define CLOCK_BASE 9LL
@ -726,8 +730,11 @@ gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
if (ts_pad->prog == NULL)
goto no_program;
tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
g_hash_table_insert (mux->programs,
GINT_TO_POINTER (ts_pad->prog_id), ts_pad->prog);
tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
tsmux_program_set_scte35_interval (ts_pad->prog,
mux->scte35_null_interval);
g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
ts_pad->prog);
}
if (ts_pad->stream == NULL) {
@ -1052,6 +1059,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
gint64 dts = GST_CLOCK_STIME_NONE;
gboolean delta = TRUE, header = FALSE;
StreamData *stream_data;
GstMpegtsSection *scte_section = NULL;
GST_DEBUG_OBJECT (mux, "Pads collected");
@ -1128,6 +1136,16 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
GST_OBJECT_LOCK (mux);
scte_section = mux->pending_scte35_section;
mux->pending_scte35_section = NULL;
GST_OBJECT_UNLOCK (mux);
if (G_UNLIKELY (scte_section)) {
GST_DEBUG_OBJECT (mux, "Sending pending SCTE section");
if (!tsmux_send_section (mux->tsmux, scte_section))
GST_ERROR_OBJECT (mux, "Error sending SCTE section !");
}
if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
GST_DEBUG_OBJECT (mux, "Buffer has PTS %" GST_TIME_FORMAT " pts %"
@ -1249,8 +1267,18 @@ gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
if (section) {
GST_DEBUG ("Received event with mpegts section");
/* TODO: Check that the section type is supported */
tsmux_add_mpegts_si_section (mux->tsmux, section);
if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
/* Will be sent from the streaming threads */
GST_DEBUG_OBJECT (mux, "Storing SCTE event");
GST_OBJECT_LOCK (element);
if (mux->pending_scte35_section)
gst_mpegts_section_unref (mux->pending_scte35_section);
mux->pending_scte35_section = section;
GST_OBJECT_UNLOCK (element);
} else {
/* TODO: Check that the section type is supported */
tsmux_add_mpegts_si_section (mux->tsmux, section);
}
gst_event_unref (event);
@ -1710,6 +1738,12 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
if (mux->tsmux)
tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
break;
case PROP_SCTE_35_PID:
mux->scte35_pid = g_value_get_uint (value);
break;
case PROP_SCTE_35_NULL_INTERVAL:
mux->scte35_null_interval = g_value_get_uint (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1744,6 +1778,12 @@ gst_base_ts_mux_get_property (GObject * object, guint prop_id,
case PROP_PCR_INTERVAL:
g_value_set_uint (value, mux->pcr_interval);
break;
case PROP_SCTE_35_PID:
g_value_set_uint (value, mux->scte35_pid);
break;
case PROP_SCTE_35_NULL_INTERVAL:
g_value_set_uint (value, mux->scte35_null_interval);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1878,6 +1918,20 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
1, G_MAXUINT, TSMUX_DEFAULT_PCR_INTERVAL,
(GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SCTE_35_PID,
g_param_spec_uint ("scte-35-pid", "SCTE-35 PID",
"PID to use for inserting SCTE-35 packets (0: unused)",
0, G_MAXUINT, DEFAULT_SCTE_35_PID,
(GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
g_object_class_install_property (G_OBJECT_CLASS (klass),
PROP_SCTE_35_NULL_INTERVAL, g_param_spec_uint ("scte-35-null-interval",
"SCTE-35 NULL packet interval",
"Set the interval (in ticks of the 90kHz clock) for writing SCTE-35 NULL (heartbeat) packets."
" (only valid if scte-35-pid is different from 0)", 1, G_MAXUINT,
TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL,
(GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
&gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
}
@ -1895,6 +1949,8 @@ gst_base_ts_mux_init (GstBaseTsMux * mux)
mux->prog_map = NULL;
mux->alignment = BASETSMUX_DEFAULT_ALIGNMENT;
mux->bitrate = TSMUX_DEFAULT_BITRATE;
mux->scte35_pid = DEFAULT_SCTE_35_PID;
mux->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
mux->automatic_alignment = 0;

View file

@ -174,11 +174,14 @@ struct GstBaseTsMux {
guint si_interval;
guint64 bitrate;
guint pcr_interval;
guint scte35_pid;
guint scte35_null_interval;
/* state */
gboolean first;
GstClockTime pending_key_unit_ts;
GstEvent *force_key_unit_event;
GstMpegtsSection *pending_scte35_section;
/* write callback handling/state */
GstFlowReturn last_flow_ret;

View file

@ -112,6 +112,7 @@
static gboolean tsmux_write_pat (TsMux * mux);
static gboolean tsmux_write_pmt (TsMux * mux, TsMuxProgram * program);
static gboolean tsmux_write_scte_null (TsMux * mux, TsMuxProgram * program);
static void
tsmux_section_free (TsMuxSection * section)
{
@ -361,6 +362,7 @@ tsmux_add_mpegts_si_section (TsMux * mux, GstMpegtsSection * section)
return TRUE;
}
/**
* tsmux_free:
* @mux: a #TsMux
@ -451,6 +453,11 @@ tsmux_program_new (TsMux * mux, gint prog_id)
program->pmt_pid = mux->next_pmt_pid++;
program->pcr_stream = NULL;
/* SCTE35 is disabled by default */
program->scte35_pid = 0;
program->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
program->next_scte35_pcr = -1;
program->streams = g_array_sized_new (FALSE, TRUE, sizeof (TsMuxStream *), 1);
mux->programs = g_list_prepend (mux->programs, program);
@ -495,6 +502,22 @@ tsmux_get_pmt_interval (TsMuxProgram * program)
return program->pmt_interval;
}
/**
* tsmux_program_set_scte35_interval:
* @program: a #TsMuxProgram
* @freq: a new SCTE-35 NULL interval
*
* Set the interval (in cycles of the 90kHz clock) for sending out the SCTE-35
* NULL command. This is only effective is the SCTE-35 PID is not 0.
*/
void
tsmux_program_set_scte35_interval (TsMuxProgram * program, guint interval)
{
g_return_if_fail (program != NULL);
program->scte35_null_interval = interval;
}
/**
* tsmux_resend_pmt:
* @program: a #TsMuxProgram
@ -509,6 +532,55 @@ tsmux_resend_pmt (TsMuxProgram * program)
program->next_pmt_pcr = -1;
}
/**
* tsmux_program_set_scte35_pid:
* @program: a #TsMuxProgram
* @pid: The pid to use, or 0 to deactivate usage.
*
* Set the @pid to use for sending SCTE-35 packets on the given
* @program.
*
* This needs to be called as early as possible if SCTE-35 sections
* are even going to be used with the given @program so that the PMT
* can be properly configured.
*/
void
tsmux_program_set_scte35_pid (TsMuxProgram * program, guint16 pid)
{
TsMuxSection *section;
GstMpegtsSCTESIT *sit;
g_return_if_fail (program != NULL);
program->scte35_pid = pid;
/* Create/Update the section */
if (program->scte35_null_section) {
tsmux_section_free (program->scte35_null_section);
program->scte35_null_section = NULL;
}
if (pid != 0) {
program->scte35_null_section = section = g_slice_new0 (TsMuxSection);
section->pi.pid = pid;
sit = gst_mpegts_scte_null_new ();
section->section = gst_mpegts_section_from_scte_sit (sit, pid);
}
}
/**
* tsmux_program_get_scte35_pid:
* @program: a #TsMuxProgram
*
* Get the PID configured for sending SCTE-35 packets.
*
* Returns: the configured SCTE-35 PID, or 0 if not active.
*/
guint16
tsmux_program_get_scte35_pid (TsMuxProgram * program)
{
g_return_val_if_fail (program != NULL, 0);
return program->scte35_pid;
}
/**
* tsmux_program_add_stream:
* @program: a #TsMuxProgram
@ -931,8 +1003,9 @@ tsmux_write_ts_header (TsMux * mux, guint8 * buf, TsMuxPacketInfo * pi,
return TRUE;
}
/* The unused_arg is needed for g_hash_table_foreach() */
static gboolean
tsmux_section_write_packet (GstMpegtsSectionType * type,
tsmux_section_write_packet (gpointer unused_arg,
TsMuxSection * section, TsMux * mux)
{
GstBuffer *section_buffer;
@ -1056,6 +1129,38 @@ fail:
return FALSE;
}
/**
* tsmux_send_section:
* @mux: a #TsMux
* @section: (transfer full): a #GstMpegtsSection to add
*
* Send a @section immediately on the stream.
*
* Returns: %TRUE on success, %FALSE otherwise
*/
gboolean
tsmux_send_section (TsMux * mux, GstMpegtsSection * section)
{
gboolean ret;
TsMuxSection tsmux_section;
g_return_val_if_fail (mux != NULL, FALSE);
g_return_val_if_fail (section != NULL, FALSE);
memset (&tsmux_section, 0, sizeof (tsmux_section));
GST_DEBUG ("Sending mpegts section with type %d to mux",
section->section_type);
tsmux_section.section = section;
tsmux_section.pi.pid = section->pid;
ret = tsmux_section_write_packet (NULL, &tsmux_section, mux);
gst_mpegts_section_unref (section);
return ret;
}
static gboolean
tsmux_write_si (TsMux * mux)
{
@ -1183,6 +1288,31 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
cur_pcr = get_current_pcr (mux, cur_ts);
}
if (program->scte35_pid != 0) {
gboolean write_scte_null = FALSE;
if (program->next_scte35_pcr == -1)
write_scte_null = TRUE;
else if (cur_pcr > program->next_scte35_pcr)
write_scte_null = TRUE;
if (write_scte_null) {
GST_DEBUG ("next scte35 pcr %" G_GINT64_FORMAT,
program->next_scte35_pcr);
if (program->next_scte35_pcr == -1)
program->next_scte35_pcr =
cur_pcr + program->scte35_null_interval * 300;
else
program->next_scte35_pcr += program->scte35_null_interval * 300;
GST_DEBUG ("next scte35 NOW pcr %" G_GINT64_FORMAT,
program->next_scte35_pcr);
if (!tsmux_write_scte_null (mux, program))
return FALSE;
cur_pcr = get_current_pcr (mux, cur_ts);
}
}
}
return TRUE;
@ -1358,6 +1488,8 @@ tsmux_program_free (TsMuxProgram * program)
/* Free PMT section */
if (program->pmt.section)
gst_mpegts_section_unref (program->pmt.section);
if (program->scte35_null_section)
tsmux_section_free (program->scte35_null_section);
g_array_free (program->streams, TRUE);
g_slice_free (TsMuxProgram, program);
@ -1403,8 +1535,7 @@ tsmux_write_pat (TsMux * mux)
mux->pat_changed = FALSE;
}
return tsmux_section_write_packet (GINT_TO_POINTER (GST_MPEGTS_SECTION_PAT),
&mux->pat, mux);
return tsmux_section_write_packet (NULL, &mux->pat, mux);
}
static gboolean
@ -1459,6 +1590,12 @@ tsmux_write_pmt (TsMux * mux, TsMuxProgram * program)
g_ptr_array_add (pmt->descriptors, descriptor);
#endif
/* Will SCTE-35 be potentially used ? */
if (program->scte35_pid != 0) {
descriptor = gst_mpegts_descriptor_from_registration ("CUEI", NULL, 0);
g_ptr_array_add (pmt->descriptors, descriptor);
}
/* Write out the entries */
for (i = 0; i < program->streams->len; i++) {
GstMpegtsPMTStream *pmt_stream;
@ -1475,6 +1612,14 @@ tsmux_write_pmt (TsMux * mux, TsMuxProgram * program)
g_ptr_array_add (pmt->streams, pmt_stream);
}
/* Will SCTE-35 be potentially used ? */
if (program->scte35_pid != 0) {
GstMpegtsPMTStream *pmt_stream = gst_mpegts_pmt_stream_new ();
pmt_stream->stream_type = GST_MPEGTS_STREAM_TYPE_SCTE_SIT;
pmt_stream->pid = program->scte35_pid;
g_ptr_array_add (pmt->streams, pmt_stream);
}
TS_DEBUG ("PMT for program %d has %d streams",
program->pgm_number, program->streams->len);
@ -1490,8 +1635,15 @@ tsmux_write_pmt (TsMux * mux, TsMuxProgram * program)
program->pmt.section->version_number = program->pmt_version++;
}
return tsmux_section_write_packet (GINT_TO_POINTER (GST_MPEGTS_SECTION_PMT),
&program->pmt, mux);
return tsmux_section_write_packet (NULL, &program->pmt, mux);
}
static gboolean
tsmux_write_scte_null (TsMux * mux, TsMuxProgram * program)
{
/* SCTE-35 NULL section is created when PID is set */
GST_LOG ("Writing SCTE NULL packet");
return tsmux_section_write_packet (NULL, program->scte35_null_section, mux);
}
void

View file

@ -128,6 +128,14 @@ struct TsMuxProgram {
/* PID to write the PMT */
guint16 pmt_pid;
TsMuxSection *scte35_null_section;
/* SCTE-35 pid (0 if inactive/unused) */
guint16 scte35_pid;
/* Interval between SCTE-35 NULL packets in MPEG PTS clock time */
guint scte35_null_interval;
/* Next SCTE-35 position, 27 MHz */
gint64 next_scte35_pcr;
/* stream which carries the PCR */
TsMuxStream *pcr_stream;
@ -214,6 +222,10 @@ void tsmux_program_free (TsMuxProgram *program);
void tsmux_set_pmt_interval (TsMuxProgram *program, guint interval);
guint tsmux_get_pmt_interval (TsMuxProgram *program);
void tsmux_resend_pmt (TsMuxProgram *program);
void tsmux_program_set_scte35_pid (TsMuxProgram *program, guint16 pid);
guint16 tsmux_program_get_scte35_pid (TsMuxProgram *program);
void tsmux_program_set_scte35_interval (TsMuxProgram *mux, guint interval);
/* SI table management */
void tsmux_set_si_interval (TsMux *mux, guint interval);
@ -221,6 +233,9 @@ guint tsmux_get_si_interval (TsMux *mux);
void tsmux_resend_si (TsMux *mux);
gboolean tsmux_add_mpegts_si_section (TsMux * mux, GstMpegtsSection * section);
/* One-time sections */
gboolean tsmux_send_section (TsMux *mux, GstMpegtsSection *section);
/* stream management */
TsMuxStream * tsmux_create_stream (TsMux *mux, guint stream_type, guint16 pid, gchar *language);
TsMuxStream * tsmux_find_stream (TsMux *mux, guint16 pid);

View file

@ -123,6 +123,8 @@ G_BEGIN_DECLS
#define TSMUX_DEFAULT_SI_INTERVAL (TSMUX_CLOCK_FREQ / 10)
/* PCR interval (1/25th sec) */
#define TSMUX_DEFAULT_PCR_INTERVAL (TSMUX_CLOCK_FREQ / 25)
/* SCTE-35 NULL Interval (5mins) */
#define TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL (TSMUX_CLOCK_FREQ * 300)
/* Bitrate (bits per second) */
#define TSMUX_DEFAULT_BITRATE 0

View file

@ -1,4 +1,4 @@
foreach fname : ['ts-parser.c', 'ts-section-writer.c']
foreach fname : ['ts-parser.c', 'ts-section-writer.c', 'ts-scte-writer.c']
exe_name = fname.split('.').get(0).underscorify()
executable(exe_name,

View file

@ -0,0 +1,101 @@
#include <gst/gst.h>
#include <gst/mpegts/mpegts.h>
/* 45s stream
* Send scte-35 NULL packets every 5s
* Use PID 123 for SCTE-35 */
#define PIPELINE_STR "videotestsrc is-live=True num-buffers=1350 ! video/x-raw,framerate=30/1 ! x264enc tune=zerolatency ! queue ! mpegtsmux name=mux scte-35-pid=123 scte-35-null-interval=450000 ! filesink location=test-scte.ts"
static void
_on_bus_message (GstBus * bus, GstMessage * message, GMainLoop * mainloop)
{
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:
case GST_MESSAGE_EOS:
g_main_loop_quit (mainloop);
break;
default:
break;
}
}
static void
send_splice (GstElement * mux, gboolean out)
{
GstMpegtsSCTESIT *sit;
GstMpegtsSection *section;
g_print ("Sending Splice %s event\n", out ? "Out" : "In");
/* Splice is at 5s for 30s */
if (out)
sit = gst_mpegts_scte_splice_out_new (1, 5 * 90000, 30 * 90000);
else
sit = gst_mpegts_scte_splice_in_new (2, 35 * 90000);
section = gst_mpegts_section_from_scte_sit (sit, 123);
gst_mpegts_section_send_event (section, mux);
gst_mpegts_section_unref (section);
}
static gboolean
send_splice_in (GstElement * mux)
{
send_splice (mux, FALSE);
return G_SOURCE_REMOVE;
}
static gboolean
send_splice_out (GstElement * mux)
{
send_splice (mux, TRUE);
/* In 30s send the splice-in one */
g_timeout_add_seconds (30, (GSourceFunc) send_splice_in, mux);
return G_SOURCE_REMOVE;
}
int
main (int argc, char **argv)
{
GstElement *pipeline = NULL;
GError *error = NULL;
GstBus *bus;
GMainLoop *mainloop;
GstElement *mux;
gst_init (&argc, &argv);
gst_mpegts_initialize ();
pipeline = gst_parse_launch (PIPELINE_STR, &error);
if (error) {
g_print ("pipeline could not be constructed: %s\n", error->message);
g_clear_error (&error);
return 1;
}
mainloop = g_main_loop_new (NULL, FALSE);
mux = gst_bin_get_by_name (GST_BIN (pipeline), "mux");
/* Send splice-out 1s in */
g_timeout_add_seconds (1, (GSourceFunc) send_splice_out, mux);
gst_object_unref (mux);
/* Put a bus handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
gst_bus_add_signal_watch (bus);
g_signal_connect (bus, "message", (GCallback) _on_bus_message, mainloop);
/* Start pipeline */
gst_element_set_state (pipeline, GST_STATE_PLAYING);
g_main_loop_run (mainloop);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);
gst_object_unref (bus);
return 0;
}