From c557f71756ce7d58c75636b434b1a0faabdf8a81 Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Mon, 5 Mar 2012 09:38:57 +0100 Subject: [PATCH] tsdemux: Push packets as early as possible When the PES header tells us how big the outgoing packet is, push the packet downstream as soon as we have the specified size instead of waiting for the beginning of the next packet. Reduces latency and removes issues with very sparse streams (like subtitles and subpictures). --- gst/mpegtsdemux/tsdemux.c | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/gst/mpegtsdemux/tsdemux.c b/gst/mpegtsdemux/tsdemux.c index 9bb05cd5b9..255809edab 100644 --- a/gst/mpegtsdemux/tsdemux.c +++ b/gst/mpegtsdemux/tsdemux.c @@ -128,6 +128,11 @@ struct _TSDemuxStream GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS]; guint8 nbpending; + /* Size of data to push (if known) */ + guint expected_size; + /* Size of currently queued data */ + guint current_size; + /* Current data to be pushed out */ GstBufferList *current; GstBufferListIterator *currentit; @@ -1025,6 +1030,8 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream) memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); stream->nbpending = 0; + stream->expected_size = 0; + stream->current_size = 0; stream->current = NULL; stream->need_newsegment = TRUE; stream->pts = GST_CLOCK_TIME_NONE; @@ -1270,7 +1277,10 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream) } /* Remove PES headers */ - GST_DEBUG ("Moving data forward by %d bytes", header.header_size); + GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)", + header.header_size, header.packet_length, + GST_BUFFER_SIZE (stream->pendingbuffers[0])); + stream->expected_size = header.packet_length; GST_BUFFER_DATA (stream->pendingbuffers[0]) += header.header_size; GST_BUFFER_SIZE (stream->pendingbuffers[0]) -= header.header_size; @@ -1340,12 +1350,14 @@ gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream, GST_LOG ("HEADER: appending data to array"); /* Append to the array */ stream->pendingbuffers[stream->nbpending++] = buf; + stream->current_size += GST_BUFFER_SIZE (buf); /* parse the header */ gst_ts_demux_parse_pes_header (demux, stream); } else if (stream->state == PENDING_PACKET_BUFFER) { GST_LOG ("BUFFER: appending data to bufferlist"); stream->currentlist = g_list_prepend (stream->currentlist, buf); + stream->current_size += GST_BUFFER_SIZE (buf); } @@ -1500,6 +1512,8 @@ beach: memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); stream->nbpending = 0; stream->current = NULL; + stream->expected_size = 0; + stream->current_size = 0; return res; } @@ -1538,9 +1552,14 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream, GST_BUFFER_OFFSET (packet->buffer)); } - if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)) + if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)) { gst_ts_demux_queue_data (demux, stream, packet); - else + GST_DEBUG ("current_size:%d, expected_size:%d", + stream->current_size, stream->expected_size); + /* Finally check if the data we queued completes a packet */ + if (stream->expected_size && stream->current_size == stream->expected_size) + res = gst_ts_demux_push_pending_data (demux, stream); + } else gst_buffer_unref (packet->buffer); return res;