tsdemux: Push packets as early as possible

When the PES header tells us how big the outgoing packet is, push the
packet downstream as soon as we have the specified size instead of waiting
for the beginning of the next packet.
Reduces latency and removes issues with very sparse streams (like subtitles
and subpictures).
This commit is contained in:
Edward Hervey 2012-03-05 09:38:57 +01:00
parent 4565bb1c01
commit c557f71756

View file

@ -128,6 +128,11 @@ struct _TSDemuxStream
GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS]; GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS];
guint8 nbpending; guint8 nbpending;
/* Size of data to push (if known) */
guint expected_size;
/* Size of currently queued data */
guint current_size;
/* Current data to be pushed out */ /* Current data to be pushed out */
GstBufferList *current; GstBufferList *current;
GstBufferListIterator *currentit; GstBufferListIterator *currentit;
@ -1025,6 +1030,8 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream)
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
stream->nbpending = 0; stream->nbpending = 0;
stream->expected_size = 0;
stream->current_size = 0;
stream->current = NULL; stream->current = NULL;
stream->need_newsegment = TRUE; stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE; stream->pts = GST_CLOCK_TIME_NONE;
@ -1270,7 +1277,10 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
} }
/* Remove PES headers */ /* Remove PES headers */
GST_DEBUG ("Moving data forward by %d bytes", header.header_size); GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
header.header_size, header.packet_length,
GST_BUFFER_SIZE (stream->pendingbuffers[0]));
stream->expected_size = header.packet_length;
GST_BUFFER_DATA (stream->pendingbuffers[0]) += header.header_size; GST_BUFFER_DATA (stream->pendingbuffers[0]) += header.header_size;
GST_BUFFER_SIZE (stream->pendingbuffers[0]) -= header.header_size; GST_BUFFER_SIZE (stream->pendingbuffers[0]) -= header.header_size;
@ -1340,12 +1350,14 @@ gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
GST_LOG ("HEADER: appending data to array"); GST_LOG ("HEADER: appending data to array");
/* Append to the array */ /* Append to the array */
stream->pendingbuffers[stream->nbpending++] = buf; stream->pendingbuffers[stream->nbpending++] = buf;
stream->current_size += GST_BUFFER_SIZE (buf);
/* parse the header */ /* parse the header */
gst_ts_demux_parse_pes_header (demux, stream); gst_ts_demux_parse_pes_header (demux, stream);
} else if (stream->state == PENDING_PACKET_BUFFER) { } else if (stream->state == PENDING_PACKET_BUFFER) {
GST_LOG ("BUFFER: appending data to bufferlist"); GST_LOG ("BUFFER: appending data to bufferlist");
stream->currentlist = g_list_prepend (stream->currentlist, buf); stream->currentlist = g_list_prepend (stream->currentlist, buf);
stream->current_size += GST_BUFFER_SIZE (buf);
} }
@ -1500,6 +1512,8 @@ beach:
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
stream->nbpending = 0; stream->nbpending = 0;
stream->current = NULL; stream->current = NULL;
stream->expected_size = 0;
stream->current_size = 0;
return res; return res;
} }
@ -1538,9 +1552,14 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
GST_BUFFER_OFFSET (packet->buffer)); GST_BUFFER_OFFSET (packet->buffer));
} }
if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)) if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)) {
gst_ts_demux_queue_data (demux, stream, packet); gst_ts_demux_queue_data (demux, stream, packet);
else GST_DEBUG ("current_size:%d, expected_size:%d",
stream->current_size, stream->expected_size);
/* Finally check if the data we queued completes a packet */
if (stream->expected_size && stream->current_size == stream->expected_size)
res = gst_ts_demux_push_pending_data (demux, stream);
} else
gst_buffer_unref (packet->buffer); gst_buffer_unref (packet->buffer);
return res; return res;