matroskademux: support push based mode

Fixes #598610.
This commit is contained in:
Mark Nauwelaerts 2009-12-16 12:43:27 +01:00
parent e4183c6904
commit 900ff7247e
4 changed files with 470 additions and 8 deletions

View file

@ -170,6 +170,29 @@ gst_ebml_read_change_state (GstElement * element, GstStateChange transition)
return ret;
}
/*
* Used in push mode.
* Provided buffer is used as cache, based on offset 0, and no further reads
* will be issued.
*/
void
gst_ebml_read_reset_cache (GstEbmlRead * ebml, GstBuffer * buffer,
guint64 offset)
{
if (ebml->cached_buffer)
gst_buffer_unref (ebml->cached_buffer);
ebml->cached_buffer = buffer;
ebml->push_cache = TRUE;
buffer = gst_buffer_make_metadata_writable (buffer);
GST_BUFFER_OFFSET (buffer) = offset;
ebml->offset = offset;
g_list_foreach (ebml->level, (GFunc) gst_ebml_level_free, NULL);
g_list_free (ebml->level);
ebml->level = NULL;
}
/*
* Return: the amount of levels in the hierarchy that the
* current element lies higher than the previous one.
@ -224,6 +247,13 @@ gst_ebml_read_peek_bytes (GstEbmlRead * ebml, guint size, GstBuffer ** p_buf,
return GST_FLOW_OK;
}
/* not enough data in the cache, free cache and get a new one */
/* never drop pushed cache */
if (ebml->push_cache) {
if (ebml->offset == cache_offset + cache_size)
return GST_FLOW_END;
else
return GST_FLOW_UNEXPECTED;
}
gst_buffer_unref (ebml->cached_buffer);
ebml->cached_buffer = NULL;
}
@ -437,8 +467,17 @@ gst_ebml_peek_id (GstEbmlRead * ebml, guint * level_up, guint32 * id)
next:
off = ebml->offset; /* save offset */
if ((ret = gst_ebml_read_element_id (ebml, id, &level_up_tmp)) != GST_FLOW_OK)
if ((ret = gst_ebml_read_element_id (ebml, id, &level_up_tmp)) != GST_FLOW_OK) {
if (ret != GST_FLOW_END)
return ret;
else {
/* simulate dummy VOID element,
* and have the call stack bail out all the way */
*id = GST_EBML_ID_VOID;
*level_up = G_MAXUINT32 >> 2;
return GST_FLOW_OK;
}
}
ebml->offset = off; /* restore offset */

View file

@ -39,6 +39,9 @@ G_BEGIN_DECLS
#define GST_EBML_READ_GET_CLASS(obj) \
(G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_EBML_READ, GstEbmlReadClass))
/* custom flow return code */
#define GST_FLOW_END GST_FLOW_CUSTOM_SUCCESS
typedef struct _GstEbmlLevel {
guint64 start;
guint64 length;
@ -48,6 +51,7 @@ typedef struct _GstEbmlRead {
GstElement parent;
GstBuffer *cached_buffer;
gboolean push_cache;
GstPad *sinkpad;
guint64 offset;
@ -63,6 +67,10 @@ GType gst_ebml_read_get_type (void);
void gst_ebml_level_free (GstEbmlLevel *level);
void gst_ebml_read_reset_cache (GstEbmlRead * ebml,
GstBuffer * buffer,
guint64 offset);
GstFlowReturn gst_ebml_peek_id (GstEbmlRead *ebml,
guint *level_up,
guint32 *id);

View file

@ -149,6 +149,11 @@ static const GstQueryType *gst_matroska_demux_get_src_query_types (GstPad *
static gboolean gst_matroska_demux_handle_src_query (GstPad * pad,
GstQuery * query);
static gboolean gst_matroska_demux_handle_sink_event (GstPad * pad,
GstEvent * event);
static GstFlowReturn gst_matroska_demux_chain (GstPad * pad,
GstBuffer * buffer);
static GstStateChangeReturn
gst_matroska_demux_change_state (GstElement * element,
GstStateChange transition);
@ -208,6 +213,8 @@ gst_matroska_demux_finalize (GObject * object)
demux->global_tags = NULL;
}
g_object_unref (demux->adapter);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -244,6 +251,10 @@ gst_matroska_demux_init (GstMatroskaDemux * demux,
GST_DEBUG_FUNCPTR (gst_matroska_demux_sink_activate));
gst_pad_set_activatepull_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_matroska_demux_sink_activate_pull));
gst_pad_set_chain_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_matroska_demux_chain));
gst_pad_set_event_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_matroska_demux_handle_sink_event));
gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
GST_EBML_READ (demux)->sinkpad = demux->sinkpad;
@ -255,6 +266,8 @@ gst_matroska_demux_init (GstMatroskaDemux * demux,
demux->index = NULL;
demux->global_tags = NULL;
demux->adapter = gst_adapter_new ();
/* finish off */
gst_matroska_demux_reset (GST_ELEMENT (demux));
}
@ -390,6 +403,10 @@ gst_matroska_demux_reset (GstElement * element)
demux->duration = -1;
demux->last_stop_end = GST_CLOCK_TIME_NONE;
demux->offset = 0;
demux->cluster_time = GST_CLOCK_TIME_NONE;
demux->cluster_offset = 0;
if (demux->close_segment) {
gst_event_unref (demux->close_segment);
demux->close_segment = NULL;
@ -2339,15 +2356,12 @@ gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
}
static GstFlowReturn
gst_matroska_demux_init_stream (GstMatroskaDemux * demux)
gst_matroska_demux_parse_header (GstMatroskaDemux * demux)
{
GstEbmlRead *ebml = GST_EBML_READ (demux);
guint32 id;
GstFlowReturn ret;
gchar *doctype;
guint version;
GstFlowReturn ret;
GST_DEBUG_OBJECT (demux, "Init stream");
if ((ret = gst_ebml_read_header (ebml, &doctype, &version)) != GST_FLOW_OK)
return ret;
@ -2366,6 +2380,21 @@ gst_matroska_demux_init_stream (GstMatroskaDemux * demux)
return GST_FLOW_ERROR;
}
return ret;
}
static GstFlowReturn
gst_matroska_demux_init_stream (GstMatroskaDemux * demux)
{
GstEbmlRead *ebml = GST_EBML_READ (demux);
guint32 id;
GstFlowReturn ret;
GST_DEBUG_OBJECT (demux, "Init stream");
if ((ret = gst_matroska_demux_parse_header (demux)) != GST_FLOW_OK)
return ret;
/* find segment, must be the next element but search as long as
* we find it anyway */
while (TRUE) {
@ -5168,11 +5197,389 @@ pause:
}
}
static inline void
gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes)
{
GstBuffer *buffer;
GST_LOG_OBJECT (demux, "caching %d bytes for parsing", bytes);
buffer = gst_adapter_take_buffer (demux->adapter, bytes);
gst_ebml_read_reset_cache (GST_EBML_READ (demux), buffer, demux->offset);
demux->offset += bytes;
}
static inline void
gst_matroska_demux_flush (GstMatroskaDemux * demux, guint flush)
{
GST_LOG_OBJECT (demux, "skipping %d bytes", flush);
gst_adapter_flush (demux->adapter, flush);
demux->offset += flush;
}
static GstFlowReturn
gst_matroska_demux_peek_id_length (GstMatroskaDemux * demux, guint32 * _id,
guint64 * _length, guint * _needed)
{
guint avail, needed;
const guint8 *buf;
gint len_mask = 0x80, read = 1, n = 1, num_ffs = 0;
guint64 total;
guint8 b;
g_return_val_if_fail (_id != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (_length != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (_needed != NULL, GST_FLOW_ERROR);
/* well ... */
*_id = (guint32) GST_EBML_SIZE_UNKNOWN;
*_length = GST_EBML_SIZE_UNKNOWN;
/* read element id */
needed = 2;
avail = gst_adapter_available (demux->adapter);
if (avail < needed)
goto exit;
buf = gst_adapter_peek (demux->adapter, 1);
b = GST_READ_UINT8 (buf);
total = (guint64) b;
while (read <= 4 && !(total & len_mask)) {
read++;
len_mask >>= 1;
}
if (G_UNLIKELY (read > 4))
goto invalid_id;
/* need id and at least something for subsequent length */
if ((needed = read + 1) > avail)
goto exit;
buf = gst_adapter_peek (demux->adapter, needed);
while (n < read) {
b = GST_READ_UINT8 (buf + n);
total = (total << 8) | b;
++n;
}
*_id = (guint32) total;
/* read element length */
b = GST_READ_UINT8 (buf + n);
total = (guint64) b;
len_mask = 0x80;
read = 1;
while (read <= 8 && !(total & len_mask)) {
read++;
len_mask >>= 1;
}
if (G_UNLIKELY (read > 8))
goto invalid_length;
if ((needed += read - 1) > avail)
goto exit;
if ((total &= (len_mask - 1)) == len_mask - 1)
num_ffs++;
buf = gst_adapter_peek (demux->adapter, needed);
buf += (needed - read);
n = 1;
while (n < read) {
guint8 b = GST_READ_UINT8 (buf + n);
if (G_UNLIKELY (b == 0xff))
num_ffs++;
total = (total << 8) | b;
++n;
}
if (G_UNLIKELY (read == num_ffs))
*_length = G_MAXUINT64;
else
*_length = total;
*_length = total;
exit:
*_needed = needed;
return GST_FLOW_OK;
/* ERRORS */
invalid_id:
{
GST_ERROR_OBJECT (demux,
"Invalid EBML ID size tag (0x%x) at position %" G_GUINT64_FORMAT " (0x%"
G_GINT64_MODIFIER "x)", (guint) b, demux->offset, demux->offset);
return GST_FLOW_ERROR;
}
invalid_length:
{
GST_ERROR_OBJECT (demux,
"Invalid EBML length size tag (0x%x) at position %" G_GUINT64_FORMAT
" (0x%" G_GINT64_MODIFIER "x)", (guint) b, demux->offset,
demux->offset);
return GST_FLOW_ERROR;
}
}
static GstFlowReturn
gst_matroska_demux_chain (GstPad * pad, GstBuffer * buffer)
{
GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (pad));
GstEbmlRead *ebml = GST_EBML_READ (demux);
guint available;
GstFlowReturn ret = GST_FLOW_OK;
guint needed;
guint32 id;
guint64 length;
gchar *name;
gst_adapter_push (demux->adapter, buffer);
buffer = NULL;
next:
available = gst_adapter_available (demux->adapter);
ret = gst_matroska_demux_peek_id_length (demux, &id, &length, &needed);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto parse_failed;
GST_LOG_OBJECT (demux, "Offset %" G_GUINT64_FORMAT ", Element id 0x%x, "
"size %" G_GUINT64_FORMAT ", needed %d, available %d", demux->offset, id,
length, needed, available);
if (needed > available)
return GST_FLOW_OK;
/* only a few blocks are expected/allowed to be large,
* and will be recursed into, whereas others must fit */
if (G_LIKELY (id != GST_MATROSKA_ID_SEGMENT && id != GST_MATROSKA_ID_CLUSTER)) {
if (needed + length > available)
return GST_FLOW_OK;
/* probably happens with 'large pieces' at the end, so consider it EOS */
if (G_UNLIKELY (length > 10 * 1024 * 1024)) {
GST_WARNING_OBJECT (demux, "forcing EOS due to size %" G_GUINT64_FORMAT,
length);
return GST_FLOW_UNEXPECTED;
}
}
switch (demux->state) {
case GST_MATROSKA_DEMUX_STATE_START:
switch (id) {
case GST_EBML_ID_HEADER:
gst_matroska_demux_take (demux, length + needed);
ret = gst_matroska_demux_parse_header (demux);
if (ret != GST_FLOW_OK)
goto parse_failed;
demux->state = GST_MATROSKA_DEMUX_STATE_HEADER;
break;
default:
goto invalid_header;
break;
}
break;
case GST_MATROSKA_DEMUX_STATE_HEADER:
case GST_MATROSKA_DEMUX_STATE_DATA:
switch (id) {
case GST_MATROSKA_ID_SEGMENT:
/* eat segment prefix */
gst_matroska_demux_flush (demux, needed);
GST_DEBUG_OBJECT (demux,
"Found Segment start at offset %" G_GUINT64_FORMAT, ebml->offset);
/* seeks are from the beginning of the segment,
* after the segment ID/length */
demux->ebml_segment_start = demux->offset;
break;
case GST_MATROSKA_ID_SEGMENTINFO:
if (!demux->segmentinfo_parsed) {
gst_matroska_demux_take (demux, length + needed);
ret = gst_matroska_demux_parse_info (demux);
} else {
gst_matroska_demux_flush (demux, needed + length);
}
break;
case GST_MATROSKA_ID_TRACKS:
if (!demux->tracks_parsed) {
gst_matroska_demux_take (demux, length + needed);
ret = gst_matroska_demux_parse_tracks (demux);
} else {
gst_matroska_demux_flush (demux, needed + length);
}
break;
case GST_MATROSKA_ID_CLUSTER:
if (G_UNLIKELY (!demux->tracks_parsed)) {
GST_DEBUG_OBJECT (demux, "Cluster before Track");
goto not_streamable;
} else if (demux->state == GST_MATROSKA_DEMUX_STATE_HEADER) {
demux->state = GST_MATROSKA_DEMUX_STATE_DATA;
GST_DEBUG_OBJECT (demux, "signaling no more pads");
gst_element_no_more_pads (GST_ELEMENT (demux));
/* send initial newsegment */
gst_matroska_demux_send_event (demux,
gst_event_new_new_segment (FALSE, 1.0,
GST_FORMAT_TIME, 0,
(demux->segment.duration >
0) ? demux->segment.duration : -1, 0));
}
demux->cluster_time = GST_CLOCK_TIME_NONE;
demux->cluster_offset = demux->parent.offset;
/* eat cluster prefix */
gst_matroska_demux_flush (demux, needed);
break;
case GST_MATROSKA_ID_CLUSTERTIMECODE:
{
guint64 num;
gst_matroska_demux_take (demux, length + needed);
if ((ret = gst_ebml_read_uint (ebml, &id, &num)) != GST_FLOW_OK)
goto parse_failed;
GST_DEBUG_OBJECT (demux, "ClusterTimeCode: %" G_GUINT64_FORMAT, num);
demux->cluster_time = num;
break;
}
case GST_MATROSKA_ID_BLOCKGROUP:
gst_matroska_demux_take (demux, length + needed);
DEBUG_ELEMENT_START (demux, ebml, "BlockGroup");
if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK) {
ret = gst_matroska_demux_parse_blockgroup_or_simpleblock (demux,
demux->cluster_time, demux->cluster_offset, FALSE);
}
DEBUG_ELEMENT_STOP (demux, ebml, "BlockGroup", ret);
if (ret != GST_FLOW_OK)
goto exit;
break;
case GST_MATROSKA_ID_SIMPLEBLOCK:
gst_matroska_demux_take (demux, length + needed);
DEBUG_ELEMENT_START (demux, ebml, "SimpleBlock");
ret = gst_matroska_demux_parse_blockgroup_or_simpleblock (demux,
demux->cluster_time, demux->cluster_offset, TRUE);
DEBUG_ELEMENT_STOP (demux, ebml, "SimpleBlock", ret);
if (ret != GST_FLOW_OK)
goto exit;
break;
case GST_MATROSKA_ID_ATTACHMENTS:
if (!demux->attachments_parsed) {
gst_matroska_demux_take (demux, length + needed);
ret = gst_matroska_demux_parse_attachments (demux);
} else {
gst_matroska_demux_flush (demux, needed + length);
}
break;
case GST_MATROSKA_ID_TAGS:
gst_matroska_demux_take (demux, length + needed);
ret = gst_matroska_demux_parse_metadata (demux);
break;
case GST_MATROSKA_ID_CHAPTERS:
name = "Cues";
goto skip;
case GST_MATROSKA_ID_SEEKHEAD:
name = "SeekHead";
goto skip;
case GST_MATROSKA_ID_CUES:
name = "Cues";
goto skip;
default:
name = "Unknown";
skip:
GST_DEBUG_OBJECT (demux, "skipping Element 0x%x (%s)", id, name);
gst_matroska_demux_flush (demux, needed + length);
break;
}
break;
}
if (ret != GST_FLOW_OK)
goto parse_failed;
else
goto next;
exit:
return ret;
/* ERRORS */
parse_failed:
{
GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL),
("Failed to parse Element 0x%x", id));
return GST_FLOW_ERROR;
}
not_streamable:
{
GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL),
("File layout does not permit streaming"));
return GST_FLOW_ERROR;
}
invalid_header:
{
GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header"));
return GST_FLOW_ERROR;
}
}
static gboolean
gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
{
gboolean res = TRUE;
GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (pad));
GST_DEBUG_OBJECT (demux,
"have event type %s: %p on sink pad", GST_EVENT_TYPE_NAME (event), event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT:
{
GstFormat format;
gdouble rate, arate;
gint64 start, stop, time = 0;
gboolean update;
GstSegment segment;
/* some debug output */
gst_segment_init (&segment, GST_FORMAT_UNDEFINED);
gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
&start, &stop, &time);
gst_segment_set_newsegment_full (&segment, update, rate, arate, format,
start, stop, time);
GST_DEBUG_OBJECT (demux,
"received format %d newsegment %" GST_SEGMENT_FORMAT, format,
&segment);
/* chain will send initial newsegment after pads have been added */
GST_DEBUG_OBJECT (demux, "eating event");
gst_event_unref (event);
res = TRUE;
break;
}
case GST_EVENT_EOS:
{
if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
gst_event_unref (event);
GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
(NULL), ("got eos and didn't receive a complete header object"));
} else if (demux->num_streams == 0) {
GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
(NULL), ("got eos but no streams (yet)"));
} else {
gst_matroska_demux_send_event (demux, event);
}
break;
}
default:
res = gst_pad_event_default (pad, event);
break;
}
return res;
}
static gboolean
gst_matroska_demux_sink_activate (GstPad * sinkpad)
{
if (gst_pad_check_pull_range (sinkpad))
if (gst_pad_check_pull_range (sinkpad)) {
GST_DEBUG ("going to pull mode");
return gst_pad_activate_pull (sinkpad, TRUE);
} else {
GST_DEBUG ("going to push (streaming) mode");
return gst_pad_activate_push (sinkpad, TRUE);
}
return FALSE;
}

View file

@ -23,6 +23,7 @@
#define __GST_MATROSKA_DEMUX_H__
#include <gst/gst.h>
#include <gst/base/gstadapter.h>
#include "ebml-read.h"
#include "matroska-ids.h"
@ -97,6 +98,13 @@ typedef struct _GstMatroskaDemux {
GstEvent *close_segment;
GstEvent *new_segment;
GstTagList *global_tags;
/* push based mode usual suspects */
guint64 offset;
GstAdapter *adapter;
/* some state saving */
GstClockTime cluster_time;
guint64 cluster_offset;
} GstMatroskaDemux;
typedef struct _GstMatroskaDemuxClass {