tsmux: Ability for streams to disappear and reappear

Until now, any streams in tsmux had to be present when the element
started its first buffer. Now they can appear at any point during the
stream, or even disappear and reappear later using the same PID.
This commit is contained in:
Vivia Nikolaidou 2020-04-10 19:54:31 +03:00 committed by GStreamer Merge Bot
parent 97c05d3f4b
commit fecd38c8f6
6 changed files with 246 additions and 100 deletions

View file

@ -688,89 +688,78 @@ not_negotiated:
}
static GstFlowReturn
gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
{
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
gchar *name = NULL;
gchar *pcr_name;
GstFlowReturn ret = GST_FLOW_OK;
GList *walk = GST_ELEMENT (mux)->sinkpads;
/* Create the streams */
while (walk) {
GstPad *pad = GST_PAD (walk->data);
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
gchar *name = NULL;
gchar *pcr_name;
walk = g_list_next (walk);
if (ts_pad->prog_id == -1) {
name = GST_PAD_NAME (pad);
if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map,
name)) {
gint idx;
gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
if (!ret) {
GST_ELEMENT_ERROR (mux, STREAM, MUX,
("Reading program map failed. Assuming default"), (NULL));
idx = DEFAULT_PROG_ID;
}
if (idx < 0) {
GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
"than zero; DEFAULT_PROGRAM = %d is used instead",
idx, name, DEFAULT_PROG_ID);
idx = DEFAULT_PROG_ID;
}
ts_pad->prog_id = idx;
} else {
ts_pad->prog_id = DEFAULT_PROG_ID;
if (ts_pad->prog_id == -1) {
name = GST_PAD_NAME (pad);
if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) {
gint idx;
gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
if (!ret) {
GST_ELEMENT_ERROR (mux, STREAM, MUX,
("Reading program map failed. Assuming default"), (NULL));
idx = DEFAULT_PROG_ID;
}
}
ts_pad->prog =
(TsMuxProgram *) g_hash_table_lookup (mux->programs,
GINT_TO_POINTER (ts_pad->prog_id));
if (ts_pad->prog == NULL) {
ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
if (ts_pad->prog == NULL)
goto no_program;
tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
tsmux_program_set_scte35_interval (ts_pad->prog,
mux->scte35_null_interval);
g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
ts_pad->prog);
}
if (ts_pad->stream == NULL) {
ret = gst_base_ts_mux_create_stream (mux, ts_pad);
if (ret != GST_FLOW_OK)
goto no_stream;
}
if (ts_pad->prog->pcr_stream == NULL) {
/* Take the first stream of the program for the PCR */
GST_DEBUG_OBJECT (ts_pad,
"Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
ts_pad->pid, ts_pad->prog_id);
tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
}
/* Check for user-specified PCR PID */
pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) {
const gchar *sink_name =
gst_structure_get_string (mux->prog_map, pcr_name);
if (!g_strcmp0 (name, sink_name)) {
GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
"program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
if (idx < 0) {
GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
"than zero; DEFAULT_PROGRAM = %d is used instead",
idx, name, DEFAULT_PROG_ID);
idx = DEFAULT_PROG_ID;
}
ts_pad->prog_id = idx;
} else {
ts_pad->prog_id = DEFAULT_PROG_ID;
}
g_free (pcr_name);
}
return GST_FLOW_OK;
ts_pad->prog =
(TsMuxProgram *) g_hash_table_lookup (mux->programs,
GINT_TO_POINTER (ts_pad->prog_id));
if (ts_pad->prog == NULL) {
ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
if (ts_pad->prog == NULL)
goto no_program;
tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval);
g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
ts_pad->prog);
}
if (ts_pad->stream == NULL) {
ret = gst_base_ts_mux_create_stream (mux, ts_pad);
if (ret != GST_FLOW_OK)
goto no_stream;
}
if (ts_pad->prog->pcr_stream == NULL) {
/* Take the first stream of the program for the PCR */
GST_DEBUG_OBJECT (ts_pad,
"Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
ts_pad->pid, ts_pad->prog_id);
tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
}
/* Check for user-specified PCR PID */
pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) {
const gchar *sink_name = gst_structure_get_string (mux->prog_map, pcr_name);
if (!g_strcmp0 (name, sink_name)) {
GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
"program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
}
}
g_free (pcr_name);
return ret;
/* ERRORS */
no_program:
@ -787,6 +776,25 @@ no_stream:
}
}
static GstFlowReturn
gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
{
GstFlowReturn ret = GST_FLOW_OK;
GList *walk = GST_ELEMENT (mux)->sinkpads;
/* Create the streams */
while (walk) {
GstPad *pad = GST_PAD (walk->data);
ret = gst_base_ts_mux_create_pad_stream (mux, pad);
if (ret != GST_FLOW_OK)
return ret;
walk = g_list_next (walk);
}
return GST_FLOW_OK;
}
static void
new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
guint len)
@ -1078,8 +1086,22 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
}
prog = best->prog;
if (prog == NULL)
goto no_program;
if (prog == NULL) {
GList *cur;
gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
tsmux_resend_pat (mux->tsmux);
tsmux_resend_si (mux->tsmux);
prog = best->prog;
g_assert_nonnull (prog);
/* output PMT for each program */
for (cur = mux->tsmux->programs; cur; cur = cur->next) {
TsMuxProgram *program = (TsMuxProgram *) cur->data;
tsmux_resend_pmt (program);
}
}
g_assert (buf != NULL);
@ -1213,15 +1235,6 @@ write_fail:
{
return mux->last_flow_ret;
}
no_program:
{
if (buf)
gst_buffer_unref (buf);
GST_ELEMENT_ERROR (mux, STREAM, MUX,
("Stream on pad %" GST_PTR_FORMAT
" is not associated with any program", best), (NULL));
return GST_FLOW_ERROR;
}
}
/* GstElement implementation */
@ -1259,6 +1272,37 @@ stream_exists:
}
}
static void
gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
if (mux->tsmux) {
GList *cur;
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
gint pid = ts_pad->pid;
if (ts_pad->prog->pcr_stream == ts_pad->stream) {
tsmux_stream_pcr_unref (ts_pad->prog->pcr_stream);
ts_pad->prog->pcr_stream = NULL;
}
if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) {
g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id));
}
tsmux_resend_pat (mux->tsmux);
tsmux_resend_si (mux->tsmux);
/* output PMT for each program */
for (cur = mux->tsmux->programs; cur; cur = cur->next) {
TsMuxProgram *program = (TsMuxProgram *) cur->data;
tsmux_resend_pmt (program);
}
}
gst_element_remove_pad (element, pad);
}
static gboolean
gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
{
@ -1863,6 +1907,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
gobject_class->constructed = gst_base_ts_mux_constructed;
gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
gstelement_class->release_pad = gst_base_ts_mux_release_pad;
gstelement_class->send_event = gst_base_ts_mux_send_event;
gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps;

View file

@ -467,6 +467,25 @@ tsmux_program_new (TsMux * mux, gint prog_id)
return program;
}
gboolean
tsmux_program_delete (TsMux * mux, TsMuxProgram * program)
{
g_return_val_if_fail (mux != NULL, FALSE);
if (mux->nb_programs == 0)
return FALSE;
if (!program)
return FALSE;
mux->programs = g_list_remove (mux->programs, program);
mux->nb_programs--;
mux->pat_changed = TRUE;
tsmux_program_free ((TsMuxProgram *) program);
return TRUE;
}
/**
* tsmux_set_pmt_interval:
* @program: a #TsMuxProgram
@ -594,6 +613,8 @@ tsmux_program_add_stream (TsMuxProgram * program, TsMuxStream * stream)
g_return_if_fail (program != NULL);
g_return_if_fail (stream != NULL);
stream->program_array_index = program->streams->len;
g_array_append_val (program->streams, stream);
program->pmt_changed = TRUE;
}
@ -720,6 +741,35 @@ tsmux_find_stream (TsMux * mux, guint16 pid)
return found;
}
gboolean
tsmux_remove_stream (TsMux * mux, guint16 pid, TsMuxProgram * program)
{
GList *cur;
gboolean ret = FALSE;
g_return_val_if_fail (mux != NULL, FALSE);
for (cur = mux->streams; cur; cur = cur->next) {
TsMuxStream *stream = (TsMuxStream *) cur->data;
if (tsmux_stream_get_pid (stream) == pid) {
if (program->streams->len == 1) {
tsmux_program_delete (mux, program);
ret = TRUE;
} else {
program->streams =
g_array_remove_index (program->streams,
stream->program_array_index);
}
mux->streams = g_list_remove (mux->streams, stream);
tsmux_stream_free (stream);
return ret;
}
}
return ret;
}
static gboolean
tsmux_get_buffer (TsMux * mux, GstBuffer ** buf)
{

View file

@ -225,6 +225,7 @@ void tsmux_resend_pmt (TsMuxProgram *program);
void tsmux_program_set_scte35_pid (TsMuxProgram *program, guint16 pid);
guint16 tsmux_program_get_scte35_pid (TsMuxProgram *program);
void tsmux_program_set_scte35_interval (TsMuxProgram *mux, guint interval);
gboolean tsmux_program_delete (TsMux *mux, TsMuxProgram *program);
/* SI table management */
@ -239,6 +240,7 @@ gboolean tsmux_send_section (TsMux *mux, GstMpegtsSection *s
/* stream management */
TsMuxStream * tsmux_create_stream (TsMux *mux, guint stream_type, guint16 pid, gchar *language);
TsMuxStream * tsmux_find_stream (TsMux *mux, guint16 pid);
gboolean tsmux_remove_stream (TsMux *mux, guint16 pid, TsMuxProgram *program);
void tsmux_program_add_stream (TsMuxProgram *program, TsMuxStream *stream);
void tsmux_program_set_pcr_stream (TsMuxProgram *program, TsMuxStream *stream);

View file

@ -133,6 +133,7 @@ tsmux_stream_new (guint16 pid, guint stream_type)
stream->pes_payload_size = 0;
stream->cur_pes_payload_size = 0;
stream->pes_bytes_written = 0;
stream->program_array_index = -1;
switch (stream_type) {
case TSMUX_ST_VIDEO_MPEG1:

View file

@ -166,6 +166,8 @@ struct TsMuxStream {
guint8 id;
/* extended stream id (13818-1 Amdt 2) */
guint8 id_extended;
/* array index in program array */
gint program_array_index;
gboolean is_video_stream;

View file

@ -148,30 +148,18 @@ cleanup_tsmux (GstElement * mux, const gchar * sinkname)
}
static void
check_tsmux_pad (GstStaticPadTemplate * srctemplate,
check_tsmux_pad_given_muxer (GstElement * mux,
const gchar * src_caps_string, gint pes_id, gint pmt_id,
const gchar * sinkname, CheckOutputBuffersFunc check_func, guint n_bufs,
gssize input_buf_size, guint alignment)
CheckOutputBuffersFunc check_func, guint n_bufs, gssize input_buf_size)
{
GstClockTime ts;
GstElement *mux;
GstBuffer *inbuffer, *outbuffer;
GstCaps *caps;
gint num_buffers;
gint i;
gint pmt_pid = -1, el_pid = -1, pcr_pid = -1, packets = 0;
gchar *padname;
GstQuery *drain;
mux = setup_tsmux (srctemplate, sinkname, &padname);
if (alignment != 0)
g_object_set (mux, "alignment", alignment, NULL);
fail_unless (gst_element_set_state (mux,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
"could not set to playing");
caps = gst_caps_from_string (src_caps_string);
gst_check_setup_events (mysrcpad, mux, caps, GST_FORMAT_TIME);
gst_caps_unref (caps);
@ -348,11 +336,68 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate,
g_list_free (buffers);
buffers = NULL;
}
static void
check_tsmux_pad (GstStaticPadTemplate * srctemplate,
const gchar * src_caps_string, gint pes_id, gint pmt_id,
const gchar * sinkname, CheckOutputBuffersFunc check_func, guint n_bufs,
gssize input_buf_size, guint alignment)
{
gchar *padname;
GstElement *mux;
mux = setup_tsmux (srctemplate, sinkname, &padname);
if (alignment != 0)
g_object_set (mux, "alignment", alignment, NULL);
fail_unless (gst_element_set_state (mux,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
"could not set to playing");
check_tsmux_pad_given_muxer (mux, src_caps_string, pes_id, pmt_id,
check_func, n_bufs, input_buf_size);
cleanup_tsmux (mux, padname);
g_free (padname);
}
GST_START_TEST (test_reappearing_pad)
{
gchar *padname;
GstElement *mux;
GstPad *pad;
mux = gst_check_setup_element ("mpegtsmux");
mysrcpad = setup_src_pad (mux, &video_src_template, "sink_%d", &padname);
mysinkpad = gst_check_setup_sink_pad (mux, &sink_template);
gst_pad_set_active (mysrcpad, TRUE);
gst_pad_set_active (mysinkpad, TRUE);
fail_unless (gst_element_set_state (mux,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
"could not set to playing");
check_tsmux_pad_given_muxer (mux, VIDEO_CAPS_STRING, 0xE0, 0x1b, NULL, 1, 1);
pad = gst_element_get_static_pad (mux, padname);
gst_pad_set_active (mysrcpad, FALSE);
gst_object_unref (pad);
teardown_src_pad (mux, padname);
gst_element_release_request_pad (mux, pad);
g_free (padname);
mysrcpad = setup_src_pad (mux, &video_src_template, "sink_%d", &padname);
gst_pad_set_active (mysrcpad, TRUE);
check_tsmux_pad_given_muxer (mux, VIDEO_CAPS_STRING, 0xE0, 0x1b, NULL, 1, 1);
cleanup_tsmux (mux, padname);
g_free (padname);
}
GST_END_TEST;
GST_START_TEST (test_video)
{
@ -491,6 +536,7 @@ mpegtsmux_suite (void)
tcase_add_test (tc_chain, test_multiple_state_change);
tcase_add_test (tc_chain, test_align);
tcase_add_test (tc_chain, test_keyframe_flag_propagation);
tcase_add_test (tc_chain, test_reappearing_pad);
return s;
}