mpegtsmux: optimize output by aggregating packets

... rather than pushing each (small) packet in a separate buffer.
m2ts mode not yet optimized though.
This commit is contained in:
Mark Nauwelaerts 2012-05-14 11:38:21 +02:00 committed by Sebastian Dröge
parent dda551e723
commit 74c73eba05
2 changed files with 77 additions and 26 deletions

View file

@ -149,6 +149,8 @@ static void mpegtsmux_dispose (GObject * object);
static gboolean new_packet_cb (guint8 * data, guint len, void *user_data,
gint64 new_pcr);
static void release_buffer_cb (guint8 * data, void *user_data);
static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux,
guint8 * data, guint len);
static void mpegtsdemux_prepare_srcpad (MpegTsMux * mux);
static GstFlowReturn mpegtsmux_collected (GstCollectPads * pads,
@ -293,6 +295,7 @@ mpegtsmux_dispose (GObject * object)
mux->streamheader = NULL;
}
gst_event_replace (&mux->force_key_unit_event, NULL);
gst_buffer_replace (&mux->out_buffer, NULL);
GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
}
@ -946,6 +949,7 @@ mpegtsmux_collected (GstCollectPads * pads, MpegTsMux * mux)
best->queued_buf = NULL;
mux->is_delta = delta;
mux->last_size = GST_BUFFER_SIZE (buf);
while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
/* Failed writing data for some reason. Set appropriate error */
@ -956,6 +960,8 @@ mpegtsmux_collected (GstCollectPads * pads, MpegTsMux * mux)
goto write_fail;
}
}
/* flush packet cache */
mpegtsmux_collect_packet (mux, NULL, 0);
if (prog->pcr_stream == best->stream) {
mux->last_ts = best->last_ts;
}
@ -1050,33 +1056,80 @@ new_packet_common_init (MpegTsMux * mux, GstBuffer * buf, guint8 * data,
guint len)
{
/* Packets should be at least 188 bytes, but check anyway */
g_return_if_fail (len >= 2);
g_return_if_fail (len >= 2 || !data);
if (!mux->streamheader_sent) {
if (!mux->streamheader_sent && data) {
guint pid = ((data[1] & 0x1f) << 8) | data[2];
/* if it's a PAT or a PMT */
if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
mux->streamheader =
g_list_append (mux->streamheader, gst_buffer_copy (buf));
GstBuffer *hbuf;
if (!buf) {
hbuf = gst_buffer_new_and_alloc (len);
memcpy (GST_BUFFER_DATA (hbuf), data, len);
} else {
hbuf = gst_buffer_copy (buf);
}
mux->streamheader = g_list_append (mux->streamheader, hbuf);
} else if (mux->streamheader) {
mpegtsdemux_set_header_on_caps (mux);
mux->streamheader_sent = TRUE;
}
}
/* Set the caps on the buffer only after possibly setting the stream headers
* into the pad caps above */
gst_buffer_set_caps (buf, GST_PAD_CAPS (mux->srcpad));
if (buf) {
/* Set the caps on the buffer only after possibly setting the stream headers
* into the pad caps above */
gst_buffer_set_caps (buf, GST_PAD_CAPS (mux->srcpad));
if (mux->is_delta) {
GST_LOG_OBJECT (mux, "marking as delta unit");
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
} else {
GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
mux->is_delta = TRUE;
if (mux->is_delta) {
GST_LOG_OBJECT (mux, "marking as delta unit");
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
} else {
GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
mux->is_delta = TRUE;
}
}
}
static GstFlowReturn
mpegtsmux_collect_packet (MpegTsMux * mux, guint8 * data, guint len)
{
GstBuffer *buf;
GstFlowReturn ret = GST_FLOW_OK;
GST_LOG_OBJECT (mux, "collecting packet size %d", len);
again:
if (!mux->out_buffer && data) {
/* some extra spare slack for header overhead and PAT/PMT */
mux->out_buffer =
gst_buffer_new_and_alloc (MAX (mux->last_size * 2 + 1024, len));
mux->out_offset = 0;
GST_DEBUG_OBJECT (mux, "created new packet cache size %d",
GST_BUFFER_SIZE (mux->out_buffer));
}
buf = mux->out_buffer;
if (data && (len + mux->out_offset <= GST_BUFFER_SIZE (buf))) {
GST_LOG_OBJECT (mux, "collecting packet");
memcpy (GST_BUFFER_DATA (buf) + mux->out_offset, data, len);
mux->out_offset += len;
ret = GST_FLOW_OK;
} else if (buf) {
GST_LOG_OBJECT (mux, "pushing packet cache");
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts;
GST_BUFFER_SIZE (buf) = mux->out_offset;
new_packet_common_init (mux, buf, NULL, 0);
mux->out_buffer = NULL;
mux->out_offset = 0;
ret = gst_pad_push (mux->srcpad, buf);
if (data)
goto again;
}
return ret;
}
static gboolean
new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
{
@ -1084,6 +1137,9 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
GstFlowReturn ret;
int chunk_bytes;
/* FIXME also collect packets
* although no more separate timestamp for each one then ?? */
GST_LOG_OBJECT (mux, "Have buffer with new_pcr=%" G_GINT64_FORMAT " size %d",
new_pcr, len);
@ -1195,24 +1251,15 @@ new_packet_m2ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
static gboolean
new_packet_normal_ts (MpegTsMux * mux, guint8 * data, guint len, gint64 new_pcr)
{
GstBuffer *buf;
GstFlowReturn ret;
/* Output a normal TS packet */
GST_LOG_OBJECT (mux, "Outputting a packet of length %d", len);
buf = gst_buffer_new_and_alloc (len);
if (G_UNLIKELY (buf == NULL)) {
mux->last_flow_ret = GST_FLOW_ERROR;
return FALSE;
}
GST_LOG_OBJECT (mux, "Pushing a packet of length %d", len);
memcpy (GST_BUFFER_DATA (buf), data, len);
/* After copying the data into the buffer, do other common init (flags and streamheaders) */
new_packet_common_init (mux, buf, data, len);
/* do common init (flags and streamheaders) */
new_packet_common_init (mux, NULL, data, len);
ret = mpegtsmux_collect_packet (mux, data, len);
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts;
ret = gst_pad_push (mux->srcpad, buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
mux->last_flow_ret = ret;
return FALSE;

View file

@ -130,6 +130,10 @@ struct MpegTsMux {
gboolean streamheader_sent;
GstClockTime pending_key_unit_ts;
GstEvent *force_key_unit_event;
GstBuffer *out_buffer;
gint out_offset;
gint last_size;
};
struct MpegTsMuxClass {