mpegtsmux: allow for aligned output

... and refactor packet handling some more in the process.

Fixes #651805.
This commit is contained in:
Mark Nauwelaerts 2012-06-07 17:38:51 +02:00
parent ead42a5e27
commit 0407e21ee3
2 changed files with 140 additions and 92 deletions

View file

@ -108,9 +108,13 @@ enum
ARG_PROG_MAP, ARG_PROG_MAP,
ARG_M2TS_MODE, ARG_M2TS_MODE,
ARG_PAT_INTERVAL, ARG_PAT_INTERVAL,
ARG_PMT_INTERVAL ARG_PMT_INTERVAL,
ARG_ALIGNMENT
}; };
#define MPEGTSMUX_DEFAULT_ALIGNMENT -1
#define MPEGTSMUX_DEFAULT_M2TS FALSE
static GstStaticPadTemplate mpegtsmux_sink_factory = static GstStaticPadTemplate mpegtsmux_sink_factory =
GST_STATIC_PAD_TEMPLATE ("sink_%d", GST_STATIC_PAD_TEMPLATE ("sink_%d",
GST_PAD_SINK, GST_PAD_SINK,
@ -151,7 +155,8 @@ static gboolean new_packet_cb (guint8 * data, guint len, void *user_data,
gint64 new_pcr); gint64 new_pcr);
static void release_buffer_cb (guint8 * data, void *user_data); static void release_buffer_cb (guint8 * data, void *user_data);
static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux, static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux,
guint8 * data, guint len); GstBuffer * buf);
static GstFlowReturn mpegtsmux_push_packets (MpegTsMux * mux, gboolean force);
static void mpegtsdemux_prepare_srcpad (MpegTsMux * mux); static void mpegtsdemux_prepare_srcpad (MpegTsMux * mux);
static GstFlowReturn mpegtsmux_collected (GstCollectPads2 * pads, static GstFlowReturn mpegtsmux_collected (GstCollectPads2 * pads,
@ -207,8 +212,8 @@ mpegtsmux_class_init (MpegTsMuxClass * klass)
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_M2TS_MODE, g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_M2TS_MODE,
g_param_spec_boolean ("m2ts-mode", "M2TS(192 bytes) Mode", g_param_spec_boolean ("m2ts-mode", "M2TS(192 bytes) Mode",
"Set to TRUE to output Blu-Ray disc format with 192 byte packets. " "Set to TRUE to output Blu-Ray disc format with 192 byte packets. "
"FALSE for standard TS format with 188 byte packets.", FALSE, "FALSE for standard TS format with 188 byte packets.",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); MPEGTSMUX_DEFAULT_M2TS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PAT_INTERVAL, g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PAT_INTERVAL,
g_param_spec_uint ("pat-interval", "PAT interval", g_param_spec_uint ("pat-interval", "PAT interval",
@ -221,6 +226,13 @@ mpegtsmux_class_init (MpegTsMuxClass * klass)
"Set the interval (in ticks of the 90kHz clock) for writing out the PMT table", "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL, 1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ALIGNMENT,
g_param_spec_int ("alignment", "packet alignment",
"Number of packets per buffer (padded with dummy packets on EOS) "
"(-1 = auto, 0 = all available packets)",
-1, G_MAXINT, MPEGTSMUX_DEFAULT_ALIGNMENT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
} }
static void static void
@ -240,12 +252,14 @@ mpegtsmux_init (MpegTsMux * mux, MpegTsMuxClass * g_class)
mux); mux);
mux->adapter = gst_adapter_new (); mux->adapter = gst_adapter_new ();
mux->out_adapter = gst_adapter_new ();
/* properties */ /* properties */
mux->m2ts_mode = FALSE; mux->m2ts_mode = MPEGTSMUX_DEFAULT_M2TS;
mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL; mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL; mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
mux->prog_map = NULL; mux->prog_map = NULL;
mux->alignment = MPEGTSMUX_DEFAULT_ALIGNMENT;
/* initial state */ /* initial state */
mpegtsmux_reset (mux, TRUE); mpegtsmux_reset (mux, TRUE);
@ -293,6 +307,9 @@ mpegtsmux_reset (MpegTsMux * mux, gboolean alloc)
mux->force_key_unit_event = NULL; mux->force_key_unit_event = NULL;
mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE; mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
gst_adapter_clear (mux->adapter);
gst_adapter_clear (mux->out_adapter);
if (mux->tsmux) { if (mux->tsmux) {
tsmux_free (mux->tsmux); tsmux_free (mux->tsmux);
mux->tsmux = NULL; mux->tsmux = NULL;
@ -333,10 +350,13 @@ mpegtsmux_dispose (GObject * object)
mpegtsmux_reset (mux, FALSE); mpegtsmux_reset (mux, FALSE);
if (mux->adapter) { if (mux->adapter) {
gst_adapter_clear (mux->adapter);
g_object_unref (mux->adapter); g_object_unref (mux->adapter);
mux->adapter = NULL; mux->adapter = NULL;
} }
if (mux->out_adapter) {
g_object_unref (mux->out_adapter);
mux->out_adapter = NULL;
}
if (mux->collect) { if (mux->collect) {
gst_object_unref (mux->collect); gst_object_unref (mux->collect);
mux->collect = NULL; mux->collect = NULL;
@ -388,6 +408,9 @@ gst_mpegtsmux_set_property (GObject * object, guint prop_id,
walk = g_slist_next (walk); walk = g_slist_next (walk);
} }
break; break;
case ARG_ALIGNMENT:
mux->alignment = g_value_get_int (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -413,6 +436,9 @@ gst_mpegtsmux_get_property (GObject * object, guint prop_id,
case ARG_PMT_INTERVAL: case ARG_PMT_INTERVAL:
g_value_set_uint (value, mux->pmt_interval); g_value_set_uint (value, mux->pmt_interval);
break; break;
case ARG_ALIGNMENT:
g_value_set_int (value, mux->alignment);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -997,7 +1023,7 @@ mpegtsmux_collected (GstCollectPads2 * pads, MpegTsMux * mux)
} }
} }
/* flush packet cache */ /* flush packet cache */
mpegtsmux_collect_packet (mux, NULL, 0); mpegtsmux_push_packets (mux, FALSE);
} else { } else {
/* FIXME: Drain all remaining streams */ /* FIXME: Drain all remaining streams */
/* At EOS */ /* At EOS */
@ -1137,70 +1163,106 @@ new_packet_common_init (MpegTsMux * mux, GstBuffer * buf, guint8 * data,
} }
static GstFlowReturn static GstFlowReturn
mpegtsmux_collect_packet (MpegTsMux * mux, guint8 * data, guint len) mpegtsmux_push_packets (MpegTsMux * mux, gboolean force)
{ {
gint align = mux->alignment;
gint av, packet_size;
GstBuffer *buf; GstBuffer *buf;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
GstClockTime ts;
GST_LOG_OBJECT (mux, "collecting packet size %d", len); if (mux->m2ts_mode) {
again: packet_size = M2TS_PACKET_LENGTH;
if (!mux->out_buffer && data) { if (align < 0)
/* some extra spare slack for header overhead and PAT/PMT */ align = 32;
mux->out_buffer = } else {
gst_buffer_new_and_alloc (MAX (mux->last_size * 2 + 1024, len)); packet_size = NORMAL_TS_PACKET_LENGTH;
mux->out_offset = 0; if (align < 0)
GST_DEBUG_OBJECT (mux, "created new packet cache size %d", align = 0;
GST_BUFFER_SIZE (mux->out_buffer));
} }
buf = mux->out_buffer; av = gst_adapter_available (mux->out_adapter);
if (data && (len + mux->out_offset <= GST_BUFFER_SIZE (buf))) { GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
GST_LOG_OBJECT (mux, "collecting packet");
memcpy (GST_BUFFER_DATA (buf) + mux->out_offset, data, len); if (!align)
mux->out_offset += len; align = av;
ret = GST_FLOW_OK; else
} else if (buf) { align *= packet_size;
GST_LOG_OBJECT (mux, "pushing packet cache");
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts; GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
GST_BUFFER_SIZE (buf) = mux->out_offset; if (G_LIKELY ((align <= av) && av)) {
new_packet_common_init (mux, buf, NULL, 0); GST_LOG_OBJECT (mux, "pushing %d aligned bytes", av - (av % align));
mux->out_buffer = NULL; ts = gst_adapter_prev_timestamp (mux->out_adapter, NULL);
mux->out_offset = 0; buf = gst_adapter_take_buffer (mux->out_adapter, av - (av % align));
g_assert (buf);
GST_BUFFER_TIMESTAMP (buf) = ts;
gst_buffer_set_caps (buf, GST_PAD_CAPS (mux->srcpad));
ret = gst_pad_push (mux->srcpad, buf);
av = av % align;
}
if (av && force) {
guint8 *data;
guint32 header;
gint dummy;
GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
buf = gst_buffer_new_and_alloc (align);
ts = gst_adapter_prev_timestamp (mux->out_adapter, NULL);
gst_adapter_copy (mux->out_adapter, GST_BUFFER_DATA (buf), 0, av);
gst_adapter_clear (mux->out_adapter);
GST_BUFFER_TIMESTAMP (buf) = ts;
gst_buffer_set_caps (buf, GST_PAD_CAPS (mux->srcpad));
data = GST_BUFFER_DATA (buf) + av;
header = GST_READ_UINT32_BE (data - packet_size);
dummy = (GST_BUFFER_SIZE (buf) - av) / packet_size;
GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
for (; dummy > 0; dummy--) {
gint offset;
if (packet_size > NORMAL_TS_PACKET_LENGTH) {
GST_WRITE_UINT32_BE (data, header);
/* simply increase header a bit and never mind too much */
header++;
offset = 4;
} else {
offset = 0;
}
GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
/* null packet PID */
GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
/* no adaptation field exists | continuity counter undefined */
GST_WRITE_UINT8 (data + offset + 3, 0x10);
/* payload */
memset (data + offset + 4, 0, NORMAL_TS_PACKET_LENGTH - 4);
data += packet_size;
}
ret = gst_pad_push (mux->srcpad, buf); ret = gst_pad_push (mux->srcpad, buf);
if (data)
goto again;
} }
return ret; return ret;
} }
static gboolean static GstFlowReturn
new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr) mpegtsmux_collect_packet (MpegTsMux * mux, GstBuffer * buf)
{ {
GstBuffer *buf, *out_buf; GST_LOG_OBJECT (mux, "collecting packet size %d", GST_BUFFER_SIZE (buf));
GstFlowReturn ret; gst_adapter_push (mux->out_adapter, buf);
return GST_FLOW_OK;
}
static gboolean
new_packet_m2ts (MpegTsMux * mux, GstBuffer * buf, gint64 new_pcr)
{
GstBuffer *out_buf;
int chunk_bytes; int chunk_bytes;
/* FIXME also collect packets GST_LOG_OBJECT (mux, "Have buffer with new_pcr=%" G_GINT64_FORMAT, new_pcr);
* although no more separate timestamp for each one then ?? */
GST_LOG_OBJECT (mux, "Have buffer with new_pcr=%" G_GINT64_FORMAT " size %d",
new_pcr, len);
buf = gst_buffer_new_and_alloc (M2TS_PACKET_LENGTH);
if (G_UNLIKELY (buf == NULL)) {
GST_ELEMENT_ERROR (mux, STREAM, MUX,
("Failed allocating output buffer"), (NULL));
mux->last_flow_ret = GST_FLOW_ERROR;
return FALSE;
}
/* copies the TS data of 188 bytes to the m2ts buffer at an offset
of 4 bytes to leave space for writing the timestamp later */
memcpy (GST_BUFFER_DATA (buf) + 4, data, len);
/* After copying the data into the buffer, do other common init (flags and streamheaders) */
new_packet_common_init (mux, buf, data, len);
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts;
if (new_pcr < 0) { if (new_pcr < 0) {
/* If there is no pcr in current ts packet then just add the packet /* If there is no pcr in current ts packet then just add the packet
@ -1263,11 +1325,7 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %" GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, cur_pcr); G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, cur_pcr);
ret = gst_pad_push (mux->srcpad, out_buf); mpegtsmux_collect_packet (mux, out_buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
mux->last_flow_ret = ret;
return FALSE;
}
} }
} }
@ -1277,11 +1335,7 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %" GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, new_pcr); G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, new_pcr);
ret = gst_pad_push (mux->srcpad, buf); mpegtsmux_collect_packet (mux, buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
mux->last_flow_ret = ret;
return FALSE;
}
if (new_pcr != mux->previous_pcr) { if (new_pcr != mux->previous_pcr) {
mux->previous_pcr = new_pcr; mux->previous_pcr = new_pcr;
@ -1291,38 +1345,30 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
return TRUE; return TRUE;
} }
static gboolean /* Called when the TsMux has prepared a packet for output. Return FALSE
new_packet_normal_ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr) * on error */
{
GstFlowReturn ret;
/* Output a normal TS packet */
GST_LOG_OBJECT (mux, "Pushing a packet of length %d", len);
/* do common init (flags and streamheaders) */
new_packet_common_init (mux, NULL, data, len);
ret = mpegtsmux_collect_packet (mux, data, len);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
mux->last_flow_ret = ret;
return FALSE;
}
return TRUE;
}
static gboolean static gboolean
new_packet_cb (guint8 * data, guint len, void *user_data, gint64 new_pcr) new_packet_cb (guint8 * data, guint len, void *user_data, gint64 new_pcr)
{ {
/* Called when the TsMux has prepared a packet for output. Return FALSE
* on error */
MpegTsMux *mux = (MpegTsMux *) user_data; MpegTsMux *mux = (MpegTsMux *) user_data;
gint offset = 0;
GstBuffer *buf;
if (mux->m2ts_mode == TRUE) { if (mux->m2ts_mode == TRUE)
return new_packet_m2ts (mux, data, len, new_pcr); offset = 4;
}
return new_packet_normal_ts (mux, data, len, new_pcr); buf = gst_buffer_new_and_alloc (NORMAL_TS_PACKET_LENGTH + offset);
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts;
memcpy (GST_BUFFER_DATA (buf) + offset, data, len);
/* do common init (flags and streamheaders) */
new_packet_common_init (mux, buf, data, len);
if (offset)
return new_packet_m2ts (mux, buf, new_pcr);
else
mpegtsmux_collect_packet (mux, buf);
return TRUE;
} }
static void static void

View file

@ -139,6 +139,7 @@ struct MpegTsMux {
GstStructure *prog_map; GstStructure *prog_map;
guint pat_interval; guint pat_interval;
guint pmt_interval; guint pmt_interval;
gint alignment;
/* state */ /* state */
gboolean first; gboolean first;
@ -158,6 +159,7 @@ struct MpegTsMux {
GstAdapter *adapter; GstAdapter *adapter;
/* output buffer aggregation */ /* output buffer aggregation */
GstAdapter *out_adapter;
GstBuffer *out_buffer; GstBuffer *out_buffer;
gint out_offset; gint out_offset;
gint last_size; gint last_size;